3. A message-transfer service spanning two locations


This section defines a message-transfer service between two locations, identified by addresses 0 and 1, and a distributed program that implements the service using TCP sockets. The distributed program starts two instances of a “node” program, one at each address. Each node provides three functions to the local user: one to send a message, one to receive a message, and one to end the service. The programs use TCP’s connection opening and closing, and add message framing to TCP’s byte-stream data transfer.

The intended service is described, first informally and then in pseudo-Python and Python. We obtain a service model program and a service tester program; the latter is used to test our implementation. This example shows how Sesf handles distributed services and implementations, and service functions that return (non-void) values.

Intended service informally stated

The service spans two locations, identified by addresses 0 and 1. At each address, three functions can be called by (a local user in) the environment.

  • send(msg): send message msg to the other address.
    • Return False only if the service is closed, else return True.
    • Call only if no send() call is ongoing at this address and only if no previous send() call at this address has returned False.
  • recv(): receive a message sent at the other end.
    • Return (False,) only if the service is closed, return (True, msg) only if incoming message msg is available, else block.
    • Call only if no recv() call is ongoing at this address and only if no previous recv() call at this address has returned (False,).
    • Messages are received in the order they were sent at the other address.
  • end(): Close the service.
    • Return None.
    • Call only if end() has not been called at any address.

This, though informal, completely and unambiguously defines the intended service of the distributed program. The function calls define the inputs, the function returns define the outputs, and the constraints on function calls and return values define the set of acceptable sequences of inputs and outputs. Later we will cast it into a Python class.

An implementation using TCP

Module sesf.msgtransfer2.imp_0.node has class Node that implements a node of a msgtransfer2 implementation.

  • Run python user_node.py 2 0 --use imp_0.node 2 0 to start a process executing a user node with address 0 that starts an imp_0 node with address 0.
  • Run python user_node.py 2 1 --use imp_0.node 2 1 to start a process executing a user node with address 1 that starts an imp_0 node with address 1.
  • Or run python start_nodes.py 2 user_node.py --use imp_0.node instead of the above two commands.

The two imp_0 node instances connect via TCP, and the resulting distributed system implements the msgtransfer2 service, which is used by the two user nodes. Each user node has two concurrent threads, one sending messages and one receiving messages; after a while, the main thread in the process with node 0 calls its imp_0 node’s end(), causing the two processes to end.

class msgtransfer2.imp_0.node.Node(argv, testerproxy=None)[source]

A node of a distributed system that implements the msgtransfer2 service. The distributed system consists of two Node instances with addresses 0 and 1. For brevity, refer to the node at address j as node_j, where j ranges over 0 and 1.

After node_j becomes connected, it maintains the following variables:

  • sbuf (bytearray): buffer to hold an outgoing packet.
  • sbufview: memoryview of sbuf (slicing sbufview is cheap, slicing sbuf is not).
  • rbuf (bytearray): buffer to hold an incoming packet.
  • rbufview: memoryview of rbuf.
  • rcvdmsgqueue: queue of incoming (flag,msg) tuples for local user.
  • local thread named recv: receives packets from TCP socket and puts them in rcvdmsgqueue.


The first step is to establish a TCP connection with its peer. node_0 starts accepting on a TCP server socket on port accptngport0 (hardcoded to 6000). node_1 starts a TCP client socket connecting to node_0’s ip address and port number, retrying until success. At some point, each side’s TCP socket becomes open.
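The client side's retry-until-success loop can be sketched as follows. This is a minimal sketch; the function name and retry parameters are illustrative, not the ones Node uses, apart from the hardcoded accepting port 6000 mentioned above.

```python
import socket
import time

ACCPTNGPORT0 = 6000  # node_0's hardcoded accepting port (per the text)

def connect_with_retry(host, port, delay=0.1, attempts=50):
    """Client side (node_1): keep trying to connect until node_0 is accepting."""
    for _ in range(attempts):
        try:
            return socket.create_connection((host, port))
        except OSError:
            time.sleep(delay)  # peer not accepting yet; retry
    raise ConnectionError("could not reach peer")
```

The retry loop absorbs the race between starting the two processes: whichever order they start in, node_1 eventually finds node_0's server socket accepting.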

Then node_j creates some buffers: sbuf, to hold an outgoing packet; rbuf, to hold an incoming packet; and rcvdmsgqueue, to hold incoming (flag,msg) tuples. Then node_j starts a local thread, named recv, to receive bytes from TCP. Then it returns, giving the caller access to its three “external” functions: send(msg), recv(), and end().

Packet structure

A packet containing a user message msg has the following structure: 8-byte header and variable-sized data consisting of the serialized (pickled) msg. The 8-byte header consists of: a 2-byte srcid (not used), a 2-byte dstid (not used), and a 4-byte datalen, equal to the size in bytes of the data. Thus the serialized msg can be at most 2^32 - 1 bytes.

There is also an “END” packet (used in closing). It has only a header with datalen of 0.
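The framing can be sketched with struct and pickle. The field order follows the text; the byte order ("!", network order) and the function names are assumptions.

```python
import pickle
import struct

HDR = struct.Struct("!HHI")  # 2-byte srcid, 2-byte dstid, 4-byte datalen

def make_packet(msg, srcid=0, dstid=0):
    """Frame msg as an 8-byte header followed by the pickled data."""
    data = pickle.dumps(msg)
    return HDR.pack(srcid, dstid, len(data)) + data

def make_end_packet(srcid=0, dstid=0):
    """An END packet is a header with datalen 0 and no data."""
    return HDR.pack(srcid, dstid, 0)

def parse_packet(buf):
    """Return (msg, remaining bytes); msg is None for an END packet."""
    srcid, dstid, datalen = HDR.unpack_from(buf)
    end = HDR.size + datalen
    msg = pickle.loads(buf[HDR.size:end]) if datalen else None
    return msg, buf[end:]
```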

User-callable functions

A send(msg) call casts msg into a packet, sends the byte sequence over TCP, and returns True if the TCP send succeeded and False otherwise.

A recv() call returns the next (flag,msg) tuple from rcvdmsgqueue, blocking until the latter is not empty.

An end() call sends an END packet and returns, after which send() calls have no effect. (The peer responds to the END packet with its own END packet, and when that is received this system closes its TCP socket and ends.)

Receiving packets

The local thread recv repeatedly does the following:

  • receive 8 bytes (packet header) and get the value of datalen;
  • if the value is not zero: receive that many bytes (packet data); unpickle it to a message, say msg; and add (True,msg) to rcvdmsgqueue;
  • if the value of datalen is zero (END packet): if end() was not called locally, send an END packet; add (False,None) to rcvdmsgqueue, and close the TCP socket. (node_j would typically end at some point after this.)
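The "receive 8 bytes" and "receive that many bytes" steps each need a loop, since a single TCP recv may deliver fewer bytes than asked for. A sketch (the function name and buffer handling are illustrative):

```python
import socket

def recv_exactly(sock, n):
    """Read exactly n bytes from a connected TCP socket."""
    buf = bytearray(n)
    view = memoryview(buf)   # slicing a memoryview is cheap (no copy)
    got = 0
    while got < n:
        k = sock.recv_into(view[got:])
        if k == 0:
            raise ConnectionError("peer closed the connection")
        got += k
    return bytes(buf)
```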

Running the processes on different machines

The two processes must be at network locations reachable from each other via TCP. Class Node assumes that its peer node is on the same OS: so they have the same IP address (“localhost”) and differ only in their TCP port numbers. This is convenient but not necessary.

To run the two processes on different machines, each Node instance needs the IP address of the other (easily done). But you also have to deal with any intervening firewalls (and network administrators). Also, remote testing would not be as safe because these programs are not secure (no authentication, pickle for serializing) and they would be exposed to your network.

Start_nodes utility

The script start_nodes.py allows you to enter one command to start any number of nodes, each running as a process on the same OS in its own shell. Instead of entering python imp_0/node.py 2 0 and python imp_0/node.py 2 1 in different shells, you can run python start_nodes.py 2 imp_0/node.py in a shell. The script uses function Popen from the subprocess module to start a process in a new shell. (You may need to edit the arguments of the Popen call for your operating system. More recent versions of Python may take care of this.)
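A minimal sketch of such a launcher, omitting the per-OS shell flags; the function name and argument layout here are assumptions, not the actual start_nodes.py interface:

```python
import subprocess
import sys

def start_nodes(n, script, extra_args=()):
    """Start n node processes, passing each the node count n and its address j.
    (The real script also opens each process in its own shell, which needs
    OS-specific Popen arguments.)"""
    procs = []
    for j in range(n):
        cmd = [sys.executable, script, str(n), str(j), *extra_args]
        procs.append(subprocess.Popen(cmd))
    return procs
```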

Starting the msgtransfer2 nodes from user nodes

Module sesf.msgtransfer2.user_node has class UserNode that implements a user node that starts a given msgtransfer2 node. If the peer user node is also started, the two user nodes exercise the msgtransfer2 service provided by the two msgtransfer2 nodes. For example:

  • Run python user_node.py 2 0 --use imp_0.node 2 0 to start a process that starts UserNode 0 which starts msgtransfer2.imp_0.Node 0.
  • Run python user_node.py 2 1 --use imp_0.node 2 1 to start a process that starts UserNode 1 which starts msgtransfer2.imp_0.Node 1.
  • Or, instead of the above two commands, run python ../start_nodes.py 2 user_node.py --use imp_0.node.

The two user nodes use the service provided by the two msgtransfer2 nodes to exchange a few messages, then end the msgtransfer2 service and the two processes.

The user nodes can also make use of a running msgtransfer2 service model, instead of a msgtransfer2 implementation. Simply replace imp_0.node by service_node in the above commands.

class sesf.msgtransfer2.user_node.UserNode(argv)[source]

Service program (abstract version)

We now define a “pseudo-Python” class Service in msgtransfer2.service.ppy, that expresses the intended service formally. We say “pseudo-Python” because it has non-Python constructs (namely, CIC, CU, RETURN, ROC, RU), which are explained below. Conceptually, the class is a program that can, at any point in its execution, receive any acceptable input (and only acceptable inputs) and return any acceptable output.

The class has the following structure.

class Service():
  def __init__(self):
    <instance variables>

  def send(self, j, msg):
    <call part>
    <return part>

  def recv(self, j):
    <call part>
    <return part>

  def end(self, j):
    <call part>
    <return part>

The class has three non-init functions: send(j,msg), recv(j) and end(j), corresponding to the functions send(msg), recv() and end() at address j of an implementation of the service.


Convention: Throughout the class, parameter j is an address (0 or 1).

The __init__ function defines variables adequate to express when a non-init function can be called and the possible values it can return. Every non-init function has a call part followed by a return part. Each part is executed atomically. The call part checks whether the function call is valid, and if so updates the instance variables. The return part checks whether the function can return, and if so generates a return value, if any, and updates the instance variables.

Here is the function __init__():

  def __init__(self):
    """Throughout, j ranges over 0..1:
    - ending is true iff end(0) or end(1) has been called.
    - ongoing_send[j] is true iff a send(j,.) call is ongoing.
    - closed_send[j] is true iff a send(j,.) call has returned False.
    - ongoing_recv[j] is true iff a recv(j) call is ongoing.
    - closed_recv[j] is true iff a recv(j) call has returned (False,).
    - sent[j] is the sequence of messages sent in send(j,.) calls.
    - nrcvd[j] is the number of messages returned in recv(j) calls.
    """
    self.ending = False
    self.ongoing_send = [False, False]
    self.closed_send = [False, False]
    self.ongoing_recv = [False, False]
    self.closed_recv = [False, False]
    self.sent = [[], []]
    self.nrcvd = [0, 0]

Here is the function send(j, msg), with the call and return parts indicated by the comments at right.

  def send(self, j, msg):
    CIC:                                                    # call part
      j in (0, 1) and                                       #  "  "  "
      not self.ongoing_send[j] and not self.closed_send[j]  #  "  "  "
    CU:                                                     #  "  "  "
      self.sent[j].append(msg)                              #  "  "  "
      self.ongoing_send[j] = True                           #  "  "  "

    RETURN(rval):                                           # return part
      ROC:                                                  #  "  "  "
        rval == True or (rval == False and self.ending)     #  "  "  "
      RU:                                                   #  "  "  "
        self.ongoing_send[j] = False                        #  "  "  "
        if not rval:                                        #  "  "  "
          self.closed_send[j] = True                        #  "  "  "
        return rval                                         #  "  "  "

The call part has two components: CIC, short for call input condition; and CU, short for call update. When a thread comes to the call part, it does the following in one atomic step: evaluate the call input condition; if it is true (which means the call is valid), execute the call update; otherwise abort.

The return part has three components: RETURN, which defines a return parameter, rval; ROC, short for return output condition; and RU, short for return update. A thread at RETURN is blocked unless the return output condition is true for some value, say x, of rval, in which case it does the following atomically: set rval to x and execute the return update (which returns x). Thus the return output condition implicitly assigns a value to the return parameter. Short-circuit evaluation does not apply here: if ending is true, the function can return True or False, and the choice is non-deterministic.
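This non-deterministic choice can be mimicked in plain Python by enumerating the candidate return values and picking one that satisfies the condition. A sketch, where the helper name is ours:

```python
import random

def choose_rval(candidates, roc):
    """Pick a return value satisfying the return output condition roc,
    or signal unsatisfiability with (0,) so the caller can block."""
    valid = [v for v in candidates if roc(v)]
    if not valid:
        return (0,)
    return (1, random.choice(valid))

# send's ROC: True is always allowed; False is allowed only once ending holds
ending = False
tag, rval = choose_rval([True, False],
                        lambda r: r is True or (r is False and ending))
```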

Here is the function recv(j):

  def recv(self, j):
    CIC:
      (j in (0, 1) and
       not self.ongoing_recv[j] and not self.closed_recv[j])
    CU:
      self.ongoing_recv[j] = True

    RETURN((flag, msg)):
      # rval is (False,) | (True, msg)
      ROC:
        ((flag == False and self.ending) or
         (flag == True and self.nrcvd[j] < len(self.sent[1-j])
          and msg == self.sent[1-j][self.nrcvd[j]]))
      RU:
        self.ongoing_recv[j] = False
        if flag == False:
          self.closed_recv[j] = True
          return (flag,)
        self.nrcvd[j] += 1
        return (flag, msg)

The return parameter is a tuple (flag, msg). The first disjunct in the return condition can be chosen if ending is true; it sets flag to false and msg to None. The second disjunct can be chosen if there is an incoming message to be received; it sets flag to true and msg to the next “in-order” message sent at the remote address. The two disjuncts are not mutually exclusive: if both are true, either can happen and the choice is non-deterministic.
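The return output condition of recv can be read as enumerating the currently allowed return values. A sketch over the service's state variables (the helper name is ours):

```python
def recv_options(j, ending, nrcvd, sent):
    """Return values that recv(j) may produce in the given service state."""
    options = []
    if ending:
        options.append((False,))          # first disjunct: service is ending
    if nrcvd[j] < len(sent[1 - j]):       # second disjunct: message available
        options.append((True, sent[1 - j][nrcvd[j]]))
    return options
```

When both disjuncts hold, the list has two entries, reflecting the non-deterministic choice.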

Here is the function end(j):

  def end(self, j):
    CIC:
      j in (0, 1) and not self.ending
    CU:
      self.ending = True

    RETURN():
      ROC:
        True
      RU:
        return



As noted above, when a return condition allows more than one value for a return parameter, one of these values is non-deterministically chosen. This non-determinism is essential for defining services of multi-threaded programs. Encoding it by a boolean condition makes it easy to understand the service program and to use it in proving that an implementation satisfies the service.

Service program (concrete version)

Recall that we want the service to yield executable servicetester and service programs.

To obtain a service program, we need to implement the non-Python constructs in Python. The call part can be easily done using locks and condition variables. The return part is not as easy because of the return condition: we have to get Python code equivalent to the non-deterministic assignment of return values.

We do this in two steps. The first step is to obtain a version of class Service in which each component (CIC, CU, ROC, RU) of a function is represented by a separate Python function.

Consider a function f in the pseudo-Python class:

def f(self, j, args):
  CIC: ...
  CU:  ...
    ROC: ...
    RU:  ...

This function is broken up into four functions. (Below, f.CIC refers to the body of CIC in function f; ditto for f.CU, f.ROC, f.RU.)

def f_CIC(self, j, args):
  return f.CIC

def f_CU(self, j, args):
  f.CU

def f_ROC(self, j):
  if f.ROC does not hold for any value of rval:
    return (0,)
  randomly select a value x of rval such that f.ROC holds
  return (1, x)

def f_RU(self, j, args, rval):
  f.RU

Barring f_ROC, the above functions can be obtained mechanically. Given these functions, it is straightforward to use standard Python synchronization constructs (e.g., locks and condition variables) to obtain the service model.

Obtaining the servicetester program is similar, but with the roles of the call condition and return condition switched. That is, the tester can make a call to an implementation function f iff f.COC is true for some value x of the function parameters, and those values become the arguments of the call. When the implementation returns from the call, the tester has to check that the return value satisfies f.RIC. That is, for each call condition we need an output version (COC), and for each return condition we need an input version (RIC).

Module sesf.msgtransfer2.serviceparts has class ServiceParts, which has the component functions of sesf.msgtransfer2.service.ppy. Each function f in the pseudo-Python class gives rise to six functions: f_CIC, f_COC, f_CU, f_RIC, f_ROC, f_RU.

class msgtransfer2.serviceparts.ServiceParts[source]

Consider a function f in the pseudo-Python class:

def f(self, j, args):
  CIC: ...
  CU:  ...
    ROC: ...
    RU:  ...

To make the “synchronization harness”, the model class can use a lock, say lck, and an associated condition variable, say cond. Then the function f in the model class would be:

def f(self, j, args):

  # do_service_input
  with cond:
    icval = f_CIC(j, args)
    if not icval:
      raise Exception("input call condition violated")
    f_CU(j, args)
    cond.notify_all()

  # do_service_output
  with cond:
    while True:
      oval = f_ROC(j)
      if oval == (0,):
        # output return condition unsatisfiable: wait for a state change
        cond.wait()
      else:
        # oval == (1, x):
        # output return condition satisfied with output value x
        break
    # oval == (1, x)
    oval = oval[1:]
    # oval == (x,)
    f_RU(j, args, oval)
    cond.notify_all()
    return oval

It’s convenient to treat the # do_service_input part as a function with f_CIC and f_CU as parameters. Similarly, treat the # do_service_output part as a function with f_ROC and f_RU as parameters. These functions (with a slight tweak so that oval is always a tuple) are here:

util.service.do_service_input(j, check_cond, do_update, args, cond)[source]

Service input-part factory, used in service model and service tester. The arguments are:

  • j (int): node id for the input step.
  • check_cond (fn): CIC or RIC function from service.
  • do_update (fn): CU or RU function from service.
  • args: tuple of arguments supplied by implementation; (None,) if none.
  • cond (Condition): condition used in service model or service tester.
util.service.do_service_output(j, get_args, do_update, cond)[source]

Service output-part factory, used in service model and service tester. The arguments are:

  • j (int): node id for the output step.
  • get_args (fn): COC or ROC function from service.
  • do_update (fn): CU or RU function from service.
  • cond (Condition): condition used in service tester or service model.
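A plausible shape for do_service_output, using a threading.Condition to block until the output condition becomes satisfiable. This is a sketch of the harness described above, not the actual util.service code:

```python
import threading

def do_service_output(j, get_args, do_update, cond):
    """Wait until get_args(j) yields (1, x), then apply the update and return (x,)."""
    with cond:
        while True:
            oval = get_args(j)
            if oval != (0,):
                break            # oval == (1, x): condition satisfied
            cond.wait()          # blocked; retry after some input/output step
        rval = oval[1:]          # keep the value as a tuple
        do_update(j, rval)
        cond.notify_all()        # wake threads whose conditions may now hold
        return rval
```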

What remains is to redirect user j’s calls of msgtransfer2 functions to calls of the corresponding functions in the model. RPC (remote procedure call) is a natural choice here (because the users are distributed). We use Pyro4. The framework has three parts.

  • A Pyro4 nameserver runs in the background.
  • The model starts a Pyro4 daemon (to receive RPCs) and registers itself with the nameserver (with name sesf.msgtransfer2.service).
  • Each user j gets the address of daemon sesf.msgtransfer2.service from the nameserver and creates three proxies for the model. User j can call a function of the model by prefixing the call with a proxy’s name. Why three proxies? Because user j can potentially have three concurrent calls to the model (one for each of send(), recv() and end()).

Service node

Instead of adding Pyro4 code to the node that uses the service (e.g., msgtransfer2.user_node), we put the Pyro4 code in a class msgtransfer2.service_node.Node, and have the user node use the service node (instead of an implementation node).

Module sesf.msgtransfer2.service_node has class Node that has the signature of a msgtransfer2 implementation node and redirects incoming calls to RPC calls to a running msgtransfer2.service with pyrodaemon sesf.msgtransfer2.service.

class msgtransfer2.service_node.Node(argv)[source]

Service program

Module sesf.msgtransfer2.service has class Service, which is the concrete version of the msgtransfer2 service. It is RPC-accessible via pyrodaemon sesf.msgtransfer2.service. Run python service.py to start a Service instance, which can then be accessed by a distributed program that uses the msgtransfer2 service.

class msgtransfer2.service.Service[source]

Running user nodes over the service


The msgtransfer2 user node class described above can make use of the service (instead of an implementation).

  • Start a Pyro4 nameserver if one is not already running (run python -m Pyro4.naming &).
  • Run python service.py, to start the service program.
  • Run python user_node.py 2 0 --use service_node 2 0.
  • Run python user_node.py 2 1 --use service_node 2 1.
  • Or run python start_nodes.py 2 user_node.py --use service_node instead of the last two commands.

Servicetester (abstract version)


Servicetester (concrete version)

Recall that a servicetester program is a program that can be used in place of the users of the service, for testing a (distributed) implementation of the service.

The servicetester class, ServiceTester, would have the signature of the pseudo-Python service but with inputs and outputs exchanged. For each function f in the pseudo-Python service, the tester would have a function, say do_f (the name doesn’t matter), whose body would be a “synchronization harness” in which functions f_COC, f_CU, f_RIC, f_RU (from the Python service) and an RPC call to node_j.f are inserted.

Consider a function f in the pseudo-Python class:

def f(self, j, args):
  CIC: ...
  CU:  ...
    ROC: ...
    RU:  ...

The function do_f would be as follows, where the synchronization harness uses lock lck and associated condition variable cond, and rpc_mp[j] is an RPC proxy for implementation node j.

def do_f(self, j):

  # do_service_output
  with cond:
    while True:
      oval = f_COC(j)
      if oval == (0,):
        # output call condition unsatisfiable: wait for a state change
        cond.wait()
      else:
        # oval == (1, x):
        # output call condition satisfied with args value x
        break
    # oval == (1, x)
    oval = oval[1:]
    # oval == (x,)
    f_CU(j, oval)
    cond.notify_all()

  # call the implementation function f at address j;
  # rpc_mp[j] is an RPC proxy for node_j
  irval = rpc_mp[j].f(oval)

  # do_service_input
  with cond:
    rcval = f_RIC(j, irval)
    if not rcval:
      raise Exception("return condition violated")
    f_RU(j, irval)
    cond.notify_all()

  # irval may be relevant to ending the tester
  return irval

What remains is the Pyro4 RPC glue. The framework has three parts.

  • A Pyro4 nameserver runs in the background.
  • In the implementation, node j starts a Pyro4 daemon (to receive RPCs) and registers itself with the nameserver.
  • The service-tester, in its __init__ function, gets the addresses of these daemons from the nameserver and creates three proxies for each node j. (These are the proxies used in the do_f functions to call the send, recv and end functions of node j.)

class TestNode

Instead of adding Pyro4 code to class Node, we put it in a wrapper class TestNode, to minimize modifications to the implementation node class.

class msgtransfer2.test_node.TestNode(myid, use)[source]

class ServiceTester

Module sesf.msgtransfer2.servicetester has class ServiceTester that implements the msgtransfer2 servicetester. This can test a distributed implementation of the msgtransfer2 service. For example, to test a distributed system of two imp_0 nodes:

  • Start a Pyro4 nameserver if one is not already running: run python -m Pyro4.naming &.
  • Run python test_node.py 2 0 --use imp_0.node 2 0 and python test_node.py 2 1 --use imp_0.node 2 1. (Or python start_nodes.py 2 test_node.py --use imp_0.node.)
  • Run python servicetester.py 2 --checker imp_0.checker 2 to test against the service and the implementation-specific assertions in imp_0.checker.Checker. (Or python servicetester.py 2 to test only against the service.)
class msgtransfer2.servicetester.ServiceTester(num_nodes, checker_argv, test_duration=3)[source]

Checker for implementation imp_0

class msgtransfer2.imp_0.checker.Checker(servicetester, argv)[source]

Maintain the global state of a distributed system of msgtransfer2.imp_0 nodes and check some assertions. Created by msgtransfer2.servicetester when testing the distributed system. Maintains the following variables:

  • tester: the tester object that started this checker object
  • sent: sent state maintained by tester (actually tester.service) object
  • rcvd_counts: list of two integers initially both 0. rcvd_counts[j] is number of packets received at node j, for j in 0..1.

Has a function handle_event(e) that takes an event e reported by a node, updates the global state, and checks whether an assertion still holds.

  • RPC-called by a node to report an event e at the node. e is a ('checker','rcvd',j,msg) tuple, indicating the arrival at node j of a packet whose data is msg serialized.
  • Assertion checked: for every event (‘checker’, ‘rcvd’, j, msg): sent[1-j][rcvd_counts[j]] == msg
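The checker's assertion can be sketched as follows, assuming sent[j] is the tester-maintained list of messages sent at address j; the class name is ours, while the attributes follow the description above:

```python
class CheckerSketch:
    """In-order delivery check: the message received at node j must be the
    next unreceived message sent at node 1-j."""
    def __init__(self, sent):
        self.sent = sent                 # sent[j]: messages sent at address j
        self.rcvd_counts = [0, 0]        # packets received at each node

    def handle_event(self, e):
        tag, kind, j, msg = e
        assert (tag, kind) == ('checker', 'rcvd')
        assert self.sent[1 - j][self.rcvd_counts[j]] == msg
        self.rcvd_counts[j] += 1
```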