.. highlight:: python
   :linenothreshold: 50

A message-transfer service spanning two locations
===================================================

.. image:: msgtransfer2-config.svg
   :height: 240 px
   :align: center

This section defines a message-transfer service between two locations,
identified by addresses 0 and 1, and a distributed program that implements
the service using TCP sockets. The distributed program starts two instances
of a "node" program, one at each address. Each node provides three functions
to the local user: one to *send* a message, one to *receive* a message, and
one to *end* the service. The programs use TCP's connection opening and
closing, and add message framing to TCP's byte-stream data transfer.

The intended service is described, first informally and then in pseudo-Python
and Python. We obtain a service model program and a service tester program,
the latter of which is used to test our implementation. This example shows
how Sesf handles distributed services and implementations, and service
functions that return (non-void) values.

|

Intended service informally stated
-----------------------------------

The service spans two locations, identified by addresses 0 and 1. At each
address, three functions can be called by (a local user in) the environment.

- ``send(msg)``: send message ``msg`` to the other address.

  - Return ``False`` only if the service is closed, else return ``True``.
  - Call only if no ``send()`` call is ongoing at this address and only if
    no previous ``send()`` call at this address has returned ``False``.

- ``recv()``: receive a message sent at the other end.

  - Return ``(False,)`` only if the service is closed, return ``(True, msg)``
    only if incoming message ``msg`` is available, else block.
  - Call only if no ``recv()`` call is ongoing at this address and only if
    no previous ``recv()`` call at this address has returned ``(False,)``.
  - Messages are received in the order they were sent at the other address.

- ``end()``: Close the service.
  - Return ``None``.
  - Call only if ``end()`` has not been called at any address.

This, though informal, completely and unambiguously defines the intended
service of the distributed program. The function calls define the inputs,
the function returns define the outputs, and the constraints on function
calls and return values define the set of acceptable sequences of inputs
and outputs. Later we will cast it into a Python class.

|

An implementation using TCP
----------------------------

.. automodule:: msgtransfer2.imp_0.node

.. autoclass:: msgtransfer2.imp_0.node.Node

**Initialization**

The first step is to establish a TCP connection with the peer. node_0 starts
accepting on a TCP server socket on port ``accptngport0`` (hardcoded to
6000). node_1 starts a TCP client socket connecting to node_0's IP address
and port number, retrying until success. At some point, each side's TCP
socket becomes open. Then node_j creates some buffers: ``sbuf``, to hold an
outgoing packet; ``rbuf``, to hold an incoming packet; and ``rcvdmsgqueue``,
to hold incoming ``(flag, msg)`` tuples. Then node_j starts a local thread,
named *recv*, to receive bytes from TCP. Then it returns, giving the caller
access to its three "external" functions: ``send(msg)``, ``recv()`` and
``end()``.

**Packet structure**

A packet containing a user message ``msg`` has the following structure: an
8-byte *header* and variable-sized *data* consisting of the serialized
(pickled) ``msg``. The 8-byte header consists of a 2-byte ``srcid`` (not
used), a 2-byte ``dstid`` (not used), and a 4-byte ``datalen``, equal to the
size in bytes of the data. Thus the serialized ``msg`` can be at most
2\ :sup:`32` - 1 bytes. There is also an "END" packet (used in closing). It
has only a header, with ``datalen`` of 0.

**User-callable functions**

A ``send(msg)`` call casts ``msg`` into a packet, sends the byte sequence
over TCP, and returns ``(True,)`` if the TCP send succeeded and ``(False,)``
otherwise.
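The packet framing described above can be sketched with the standard
``struct`` and ``pickle`` modules. This is only an illustration: the field
layout follows the text, but the names and exact code in ``imp_0/node.py``
may differ.

```python
import pickle
import struct

# 8-byte header: 2-byte srcid, 2-byte dstid, 4-byte datalen (network order).
# Names are illustrative; the actual node.py code may differ.
HEADER = struct.Struct("!HHI")

def make_packet(srcid, dstid, msg):
    """Serialize msg with pickle and prepend the 8-byte header."""
    data = pickle.dumps(msg)
    return HEADER.pack(srcid, dstid, len(data)) + data

def parse_header(header):
    """Return (srcid, dstid, datalen) parsed from an 8-byte header."""
    return HEADER.unpack(header)

# An END packet is a bare header with datalen 0.
END_PACKET = HEADER.pack(0, 0, 0)
```

A receiver first reads exactly 8 bytes, parses ``datalen`` from them, and
then reads exactly that many more bytes before unpickling the message.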
A ``recv()`` call returns the next ``(flag, msg)`` tuple from
``rcvdmsgqueue``, blocking until the latter is not empty.

An ``end()`` call sends an END packet and returns, after which ``send()``
calls have no effect. (The peer responds to the END packet with its own END
packet, and when that is received this system closes its TCP socket and
ends.)

**Receiving packets**

The local thread *recv* repeatedly does the following:

- receive 8 bytes (a packet header) and get the value of ``datalen``;
- if the value is not zero: receive that many bytes (the packet data);
  unpickle them to a message, say ``msg``; and add ``(True, msg)`` to
  ``rcvdmsgqueue``;
- if the value of ``datalen`` is zero (an END packet): if ``end()`` was not
  called locally, send an END packet; add ``(False, None)`` to
  ``rcvdmsgqueue``; and close the TCP socket. (node_j would typically end at
  some point after this.)

Running the processes on different machines
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The two processes must be at network locations reachable from each other via
TCP. Class ``Node`` assumes that its peer node is on the same OS: so the two
nodes have the same IP address ("localhost") and differ only in their TCP
port numbers. This is convenient but not necessary. To run the two processes
on different machines, each ``Node`` instance needs the IP address of the
other (easily done). But you also have to deal with any intervening
firewalls (and network administrators). Also, **remote testing would not be
as safe because these programs are not secure (no authentication, pickle for
serializing) and they would be exposed to your network.**

|

Start_nodes utility
-----------------------

The script :doc:`start_nodes.py ` allows you to enter one command to start
any number of nodes, each running as a process on the same OS in its own
shell. Instead of entering ``python imp_0/node.py 2 0`` and
``python imp_0/node.py 2 1`` in different shells, you can run
``python start_nodes.py 2 imp_0/node.py`` in a shell.
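A minimal sketch of such a launcher is below. The shell-launching prefix and
the ``dry_run`` flag are assumptions for illustration; the real
``start_nodes.py`` differs in its details.

```python
import subprocess
import sys

def start_nodes(numnodes, nodefile, dry_run=False):
    """Start numnodes node processes, each in its own shell window.

    The shell-launching prefix below is for Linux/xterm; other platforms
    need different Popen arguments (e.g., ['cmd', '/c', 'start'] on
    Windows). dry_run (added here for illustration) builds the commands
    without launching anything.
    """
    cmds = []
    for j in range(numnodes):
        # each node gets the total node count and its own address
        cmd = ["xterm", "-e", sys.executable, nodefile, str(numnodes), str(j)]
        cmds.append(cmd)
        if not dry_run:
            subprocess.Popen(cmd)
    return cmds
```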
The script uses function ``Popen`` from the ``subprocess`` module to start a
process in a new shell. (You may need to edit the arguments of the ``Popen``
call for your operating system. More recent versions of Python may take care
of this.)

|

Starting the msgtransfer2 nodes from user nodes
---------------------------------------------------

.. automodule:: sesf.msgtransfer2.user_node

.. autoclass:: UserNode

|

Service program (abstract version)
---------------------------------------

We now define a "pseudo-Python" class ``Service`` in
:doc:`msgtransfer2.service.ppy `, that expresses the intended service
formally. We say "pseudo-Python" because it has non-Python constructs
(namely, ``CIC``, ``CU``, ``RETURN``, ``ROC``, ``RU``), which are explained
below. Conceptually, the class is a program that can, at any point in its
execution, receive any (and only an) acceptable input and can return any
acceptable output. The class has the following structure.

.. code-block:: python

   class Service():
       def __init__(self): ...
       def send(self, j, msg): ...
       def recv(self, j): ...
       def end(self, j): ...

The class has three non-init functions: ``send(j, msg)``, ``recv(j)`` and
``end(j)``, corresponding to the functions ``send(msg)``, ``recv()`` and
``end()`` at address ``j`` of an implementation of the service.

.. note:: **Convention:** Throughout the class, parameter ``j`` is an
   address (0 or 1).

The ``__init__`` function defines variables adequate to express when a
non-init function can be called and the possible values it can return.
Every non-init function has a *call part* followed by a *return part*. Each
part is executed atomically. The call part checks whether the function call
is valid, and if so updates the instance variables. The return part checks
whether the function can return, and if so generates a return value, if
any, and updates the instance variables.

Here is the function ``__init__()``:

.. literalinclude:: ../../sesf/msgtransfer2/service.ppy
   :pyobject: Service.__init__

Here is the function ``send(j, msg)``, with the call and return parts
indicated by the comments at right.

.. literalinclude:: ../../sesf/msgtransfer2/service.ppy
   :pyobject: Service.send

The call part has two components: ``CIC``, which stands for **call input
condition**, and ``CU``, which stands for **call update**. When a thread
comes to the call part, it does the following in one atomic step: evaluate
the call input condition; if true (which means the call is valid), execute
the call update, else abort.

The return part has three components: ``RETURN``, which defines a **return
parameter**, ``rval``; ``ROC``, which stands for **return output
condition**; and ``RU``, which stands for **return update**. A thread at
``RETURN`` is blocked unless the return output condition is true for some
value, say ``x``, of ``rval``, in which case it does the following
atomically: set ``rval`` to ``x``; execute the return update (which returns
``(x,)``). *Thus the return output condition implicitly assigns a value to
the return parameter*. Short-circuit evaluation does **not** apply here: if
``ending`` is true, the function can return ``True`` or ``False``, and the
choice is non-deterministic.

Here is the function ``recv(j)``:

.. literalinclude:: ../../sesf/msgtransfer2/service.ppy
   :pyobject: Service.recv

The return parameter is a tuple ``(flag, msg)``. The first disjunct in the
return output condition can be chosen if ``ending`` is true; it sets
``flag`` to false and ``msg`` to ``None``. The second disjunct can be chosen
if there is an incoming message to be received; it sets ``flag`` to true and
``msg`` to the next "in-order" message sent at the remote address. The two
disjuncts are not mutually exclusive: if both are true, either can happen
and the choice is non-deterministic.

Here is the function ``end(j)``:

.. literalinclude:: ../../sesf/msgtransfer2/service.ppy
   :pyobject: Service.end

|

**Non-determinism**

As noted above, when a return condition allows more than one value for a
return parameter, one of these values is *non-deterministically* chosen.
This non-determinism is essential for defining services of multi-threaded
programs. Encoding it by a *boolean* condition makes it easy to understand
the service program and to use it in proving that an implementation
satisfies the service.

|

Service program (concrete version)
------------------------------------------------------

Recall that we want the service to yield executable servicetester and
service programs. To obtain a service program, we need to implement the
non-Python constructs in Python. The call part can easily be done using
locks and condition variables. The return part is not as easy because of the
return output condition: we have to get Python code equivalent to the
non-deterministic assignment of return values. We do this in two steps.

The first step is to obtain a version of class ``Service`` in which each
component (CIC, CU, ROC, RU) of a function is represented by a separate
Python function. Consider a function ``f`` in the pseudo-Python class:

.. code-block:: python

   def f(self, j, args):
       CIC:
           ...
       CU:
           ...
       RETURN(rval):
           ROC:
               ...
           RU:
               ...

This function is broken up into four functions. (Below, ``f.CIC`` refers to
the body of ``CIC`` in function ``f``; ditto for ``f.CU``, ``f.ROC`` and
``f.RU``.)

.. code-block:: python

   def f_CIC(self, j, args):
       return f.CIC

   def f_CU(self, j, args):
       f.CU

   def f_ROC(self, j):
       if f.ROC does not hold for any value of rval:
           return (0,)
       else:
           randomly select a value x of rval such that f.ROC holds
           return (1, x)

   def f_RU(self, j, args, rval):
       f.RU

Except for ``f_ROC``, the above functions can be obtained mechanically.
Given these functions, it is straightforward to use standard Python
synchronization constructs (e.g., locks and condition variables) to obtain
the service model.
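For msgtransfer2's ``recv``, the non-mechanical ``f_ROC`` step can be
sketched as follows. The state variables used here (``ending`` and
``inbox``) are illustrative assumptions, not the actual ``service.ppy``
names.

```python
import random

class RecvParts:
    """Illustrative state for msgtransfer2's recv: an 'ending' flag and
    per-address queues of undelivered incoming messages. These names are
    assumptions, not the real service.ppy variables."""
    def __init__(self):
        self.ending = False
        self.inbox = {0: [], 1: []}  # inbox[j]: messages sent to address j

    def recv_ROC(self, j):
        """Return (0,) if recv(j) cannot currently return, else
        (1, (flag, msg)) for a randomly chosen allowed return value."""
        choices = []
        if self.ending:                 # first disjunct: service is closing
            choices.append((False, None))
        if self.inbox[j]:               # second disjunct: message available
            choices.append((True, self.inbox[j][0]))
        if not choices:
            return (0,)
        return (1, random.choice(choices))
```

When both disjuncts hold, ``random.choice`` realizes the non-deterministic
choice between returning ``(False, None)`` and delivering the next message.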
|

Obtaining the **servicetester program** is similar, but with the roles of
the call condition and return condition switched. That is, the tester can
make a call to an implementation function ``f`` iff ``f.COC`` is true for
some value *x* of the function parameters, and those values become the
arguments of the call. When the implementation returns from the call, the
tester has to check that the return satisfies ``f.RIC``. That is, for each
call condition we need an *output* version (COC), and for each return
condition we need an *input* version (RIC).

|

.. automodule:: msgtransfer2.serviceparts

.. autoclass:: ServiceParts

Consider a function ``f`` in the pseudo-Python class:

.. code-block:: python

   def f(self, j, args):
       CIC:
           ...
       CU:
           ...
       RETURN(rval):
           ROC:
               ...
           RU:
               ...

To make the "synchronization harness", the model class can use a lock, say
``lck``, and an associated condition variable, say ``cond``. Then the
function ``f`` in the model class would be:

.. code-block:: python

   def f(self, j, args):
       # do_service_input
       with cond:
           icval = f_CIC(j, args)
           if not icval:
               raise Exception("input call condition violated")
           f_CU(j, args)
           cond.notify_all()

       # do_service_output
       with cond:
           while True:
               oval = f_ROC(j)
               if oval == (0,):
                   # return output condition unsatisfiable
                   cond.wait()
               else:
                   # oval == (1, x):
                   # return output condition satisfied with output value x
                   break
           oval = oval[1:]   # oval == (x,)
           f_RU(j, oval)
           cond.notify_all()
           return oval

It's convenient to treat the ``# do_service_input`` part as a function with
``f_CIC`` and ``f_CU`` as parameters. Similarly, treat the
``# do_service_output`` part as a function with ``f_ROC`` and ``f_RU`` as
parameters. These functions (with a slight tweak so that ``oval`` is always
a tuple) are here:

.. automodule:: util.service

.. autofunction:: do_service_input

.. autofunction:: do_service_output

|

What remains is to redirect user j's calls of msgtransfer2 functions to
calls of the corresponding functions in the model.
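Before turning to that wiring, the two harness helpers just described might
look like this. This is a sketch under assumed signatures; the actual
``util.service`` code may differ.

```python
import threading

def do_service_input(cond, f_CIC, f_CU, args):
    """Atomically check the call input condition, then apply the call
    update. Raises if the call is invalid."""
    with cond:
        if not f_CIC(*args):
            raise Exception("input call condition violated")
        f_CU(*args)
        cond.notify_all()

def do_service_output(cond, f_ROC, f_RU, args):
    """Wait until the return output condition yields a value, apply the
    return update, and return the chosen value as a tuple."""
    with cond:
        while True:
            oval = f_ROC(*args)
            if oval[0] == 1:   # (1, x): condition satisfied with value x
                break
            cond.wait()        # (0,): condition unsatisfiable; wait
        rval = oval[1:]
        f_RU(*args, rval)
        cond.notify_all()
        return rval
```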
RPC (remote procedure call) is a natural choice here (because the users are
distributed). We use Pyro4. The framework has three parts.

- A Pyro4 *nameserver* runs in the background.
- The model starts a Pyro4 *daemon* (to receive RPCs) and registers itself
  with the nameserver (with name ``sesf.msgtransfer2.service``).
- Each user j gets the address of daemon ``sesf.msgtransfer2.service`` from
  the nameserver and creates three *proxies* for the model. User j can call
  a function of the model by prefixing the call with a proxy's name. Why
  three proxies? Because user j can potentially have three concurrent calls
  to the model (one for each of ``send()``, ``recv()`` and ``end()``).

Service node
^^^^^^^^^^^^^^^^^^^^^

Instead of adding Pyro4 code to the node that uses the service (e.g., the
``msgtransfer2.user_node`` node), we put the Pyro4 code in a class
``msgtransfer2.service_node.Node``, and have the user node use the service
node (instead of an implementation node).

.. automodule:: msgtransfer2.service_node

.. autoclass:: Node

Service program
^^^^^^^^^^^^^^^^^^^^^

.. automodule:: msgtransfer2.service

.. autoclass:: Service

Running user nodes over the service
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. image:: msgtransfer2-users+servicemodel.svg
   :height: 330 px
   :align: center

The msgtransfer2 user node class described above can make use of the service
(instead of an implementation).

- Start a Pyro4 nameserver if one is not already running (run
  ``python -m Pyro4.naming &``).
- Run ``python servicemodel.py`` to start the service program.
- Run ``python user_node.py 2 0 --use service_node 2 0``.
- Run ``python user_node.py 2 1 --use service_node 2 1``.
- Or run ``python start_nodes.py 2 user_node.py --use service_node`` instead
  of the last two commands.
Servicetester (abstract version)
-----------------------------------------

TODO

Servicetester (concrete version)
--------------------------------------------

Recall that a servicetester program is a program that can be used in place
of the users of the service, for testing a (distributed) implementation of
the service. The servicetester class, ``ServiceTester``, would have the
signature of the pseudo-Python service but with *inputs and outputs
exchanged*. For each function ``f`` in the pseudo-Python service, the tester
would have a function, say ``do_f`` (the name doesn't matter), whose body
would be a "synchronization harness" in which the functions ``f_COC``,
``f_CU``, ``f_RIC``, ``f_RU`` (from the Python service) and an **RPC call**
to node_j's function ``f`` are inserted.

Consider a function ``f`` in the pseudo-Python class:

.. code-block:: python

   def f(self, j, args):
       CIC:
           ...
       CU:
           ...
       RETURN(rval):
           ROC:
               ...
           RU:
               ...

The function ``do_f`` would be as follows, where the synchronization harness
uses lock ``lck`` and associated condition variable ``cond``, and
``rpc_mp[j]`` is an RPC proxy for implementation node j.

.. code-block:: python

   def do_f(self, j):
       # do_service_output
       with cond:
           while True:
               oval = f_COC(j)
               if oval == (0,):
                   # call output condition unsatisfiable
                   cond.wait()
               else:
                   # oval == (1, x):
                   # call output condition satisfied with args value x
                   break
           oval = oval[1:]   # oval == (x,)
           f_CU(j, oval)
           cond.notify_all()

       # call the implementation function f(oval) at address j;
       # rpc_mp[j] is an RPC proxy for node_j
       irval = rpc_mp[j].f(oval)

       # do_service_input
       with cond:
           rcval = f_RIC(j, irval)
           if not rcval:
               raise Exception("return input condition violated")
           f_RU(j, irval)
           cond.notify_all()

       # irval may be relevant to ending the tester
       return irval

|

What remains is the Pyro4 RPC glue. The framework has three parts.

- A Pyro4 *nameserver* runs in the background.
- In the implementation, node j starts a Pyro4 *daemon* (to receive RPCs)
  and registers itself with the nameserver.
- The servicetester, in its ``__init__`` function, gets the addresses of
  these daemons from the nameserver and creates three *proxies* for each
  node j. (These are the proxies used in the ``do_f`` functions to call the
  ``send``, ``recv`` and ``end`` functions of node j.)

class TestNode
^^^^^^^^^^^^^^^^^^^^^

Instead of adding Pyro4 code to class ``Node``, we put it in a wrapper class
``TestNode``, to minimize modifications to ``Msgtransfer2_imp0``.

.. py:module:: msgtransfer2.test_node

.. autoclass:: TestNode

class ServiceTester
^^^^^^^^^^^^^^^^^^^^^

.. automodule:: msgtransfer2.servicetester

.. autoclass:: ServiceTester

.. image:: msgtransfer2-servicetester+imp.svg
   :height: 340 px
   :align: center

|

Checker for implementation imp_0
---------------------------------------------------------------

.. py:module:: msgtransfer2.imp_0.checker

.. autoclass:: Checker