5. A message-transfer service spanning N locations

[Figure: msgtransfer configuration]

This section extends the treatment in the previous section to N locations, where N ≥ 2. We obtain a service model, a servicetester program, and a distributed program that implements the service using TCP. For no good reason, there is also a program that plays the log of a test (example video).


Intended service informally stated

The service spans N locations, identified by addresses 0, 1, …, N-1, for any N ≥ 2. At each address j, three functions are provided to the user:

  • send(k, msg): send message msg to address k.
    • Return False only if the service is closed, else return True.
    • Call only if no send() call is ongoing at address j and no previous send() call at address j has returned False.
  • recv(): receive a message sent at some other address.
    • Return (False,) only if the service is closed, return (True, msg) only if incoming message msg is available, else block.
    • Call only if no recv() call is ongoing at this address and no previous recv() call at this address has returned (False,).
    • For every address k, messages received from k are in the order in which they were sent at address k. But messages sent to j from different addresses can be received interleaved in any order.
  • end(): close the service.
    • Return None.
    • Call only if end() has not been called at any address.
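To make the informal rules concrete, here is a minimal single-process sketch of the intended service. It is not part of sesf: the class name ToyMsgTransfer is hypothetical, and each function takes the calling address j explicitly. One FIFO inbox per address keeps messages from each sender in send order, while messages from different senders interleave in whatever order they were enqueued.

```python
import queue
import threading


class ToyMsgTransfer:
    """Hypothetical in-process model of the informal msgtransfer service (not sesf code)."""

    def __init__(self, n):
        assert n >= 2
        self.n = n
        self.closed = False
        self.lock = threading.Lock()
        # One FIFO inbox per address: per-sender order is preserved,
        # messages from different senders interleave.
        self.inboxes = [queue.Queue() for _ in range(n)]

    def send(self, j, k, msg):
        # Assumption: the destination is a valid address other than j.
        assert 0 <= k < self.n and k != j
        with self.lock:
            if self.closed:
                return False
            self.inboxes[k].put((True, msg))
            return True

    def recv(self, j):
        # Blocks until a message or the close marker is available.
        return self.inboxes[j].get()

    def end(self, j):
        with self.lock:
            self.closed = True
            # The close marker is queued behind any messages already sent,
            # and no send can slip in after it because the lock is held.
            for inbox in self.inboxes:
                inbox.put((False,))
        return None
```

In this model a message sent before end() is still delivered ahead of (False,); the informal service permits this but does not require it.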

An implementation using TCP

Module sesf.msgtransfer.imp_0.node has class Node that implements a node of a msgtransfer service implementation for N addresses, N ≥ 2. Run the N commands python user_node.py N j --use imp_0.node N j, for j ranging over 0, …, N-1, to start, exercise and close a msgtransfer service spanning N locations. (Or run python ../start_nodes.py N user_node.py --use imp_0.node.)

Process j starts a user node with address j, which starts an imp_0 node with address j. The N imp_0 nodes implement a msgtransfer service over N addresses. This service is exercised by the user nodes. Each user node has two concurrent threads, one sending messages and one receiving messages; after a while, the main thread in the process with address 0 calls its imp_0 node’s end(), causing the network to shut down.

class msgtransfer.imp_0.node.Node(argv, testerproxy=None)

An instance of this class is a node of a distributed system that implements the msgtransfer service. It assumes that it is in a network of num_nodes nodes (in different processes) and has address myid, where myid is a value in 0, …, num_nodes-1.

For brevity, below we say “n” for num_nodes, “j” for myid, and “Node(j)” for the Node instance with address j. If Node(0), …, Node(n-1) have been started, they form a ring of TCP links, with each Node(j) connected to Node((j+1)%n).

During initialization, Node(j) forms TCP connections with Node((j-1)%n) and Node((j+1)%n) as follows:

  • If j is 0: Node(0) connects to Node(1) at port accptngport0 + 1 (= 6001), retrying until success. Then it waits to accept a TCP connection, which would be from Node(n-1), on port accptngport0 (= 6000).
  • If j is in 1, …, n-1: Node(j) waits to accept a TCP connection (which would be from Node(j-1)) on port accptngport0 + j. After it accepts this connection, Node(j) connects to Node(j+1) at port accptngport0 + j + 1 (retrying until success), except for Node(n-1), which connects to Node(0) at port accptngport0.
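The port arithmetic and connection order above can be sketched as follows. The helper names ring_ports and connect_with_retry are hypothetical, and the sketch assumes all nodes run on one host; it only illustrates why the ring forms without deadlock: every node except Node(0) accepts before connecting, so Node(0)'s early connect starts a chain of accepts and connects that closes the ring.

```python
import socket
import time

ACCPTNGPORT0 = 6000  # hard-coded base port, as described above


def ring_ports(j, n, base=ACCPTNGPORT0):
    """Return (accept_port, connect_port, connects_first) for Node(j).

    Node(j) accepts from Node((j-1)%n) on base+j and connects to
    Node((j+1)%n) on base+(j+1)%n. Only Node(0) connects before it
    accepts; if every node accepted first, all would wait forever.
    """
    accept_port = base + j
    connect_port = base + (j + 1) % n
    return accept_port, connect_port, j == 0


def connect_with_retry(host, port, delay=0.1):
    """Hypothetical helper: retry a TCP connect until the peer is listening."""
    while True:
        try:
            return socket.create_connection((host, port))
        except ConnectionRefusedError:
            time.sleep(delay)
```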

After Node(j) has opened its two TCP connections, it is ready to accept calls to its send, recv and end functions. At this point, Node(j) maintains the following variables:

  • ending: True iff end has been called or END msg has been received.
  • accptngport0 = 6000 (hard-coded).
  • accptngsocket = TCP socket to accept connect requests (at port accptngport0 + j).
  • outsocket = TCP socket connected to Node(j+1).
  • insocket = TCP socket connected to Node(j-1).
  • sbuf (bytearray): buffer to hold an outgoing packet.
  • sbufview: memoryview of sbuf (slicing sbufview is cheap).
  • rbuf (bytearray): buffer to hold an incoming packet.
  • rbufview: memoryview of rbuf.
  • rcvdmsgqueue: queue of incoming (flag,msg) tuples for local user.
  • testerproxy: Used iff Node(j) is started within a test node, in which case Node(j) can call testerproxy.inform_tester(e) to inform the servicetester of local event e.
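The point of keeping rbufview alongside rbuf is that a receiving loop can fill the preallocated buffer in place. A minimal sketch, assuming a hypothetical helper recv_exact (not taken from imp_0): socket.recv_into() writes directly into a slice of the memoryview, so no intermediate bytes objects are created.

```python
import socket


def recv_exact(sock, view, nbytes):
    """Fill view[:nbytes] from sock; return False if the peer closed early.

    Hypothetical helper: slicing a memoryview is cheap (no copy), so each
    recv_into() call writes straight into the right part of the buffer.
    """
    got = 0
    while got < nbytes:
        n = sock.recv_into(view[got:nbytes])
        if n == 0:  # peer closed the connection
            return False
        got += n
    return True
```

For example, with rbuf = bytearray(size) and rbufview = memoryview(rbuf), a node could first read the 8-byte header with recv_exact(insocket, rbufview, 8) and then the datalen bytes of data.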

Each user message is placed in a packet with an 8-byte header (consisting of 2-byte src id, 2-byte dst id, 4-byte datalen) and data of length datalen (consisting of the serialized user message).
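A sketch of the packet format using the standard struct module. The field sizes come from the text above; network (big-endian) byte order and pickle as the serializer are assumptions, and make_packet and parse_packet are hypothetical names.

```python
import pickle
import struct

# Assumptions: big-endian byte order and pickle serialization; only the
# field sizes (2 + 2 + 4 = 8 bytes) are given in the description.
HEADER = struct.Struct("!HHI")  # src id, dst id, datalen


def make_packet(src, dst, msg):
    data = pickle.dumps(msg)
    return HEADER.pack(src, dst, len(data)) + data


def parse_packet(packet):
    src, dst, datalen = HEADER.unpack_from(packet, 0)
    data = packet[HEADER.size:HEADER.size + datalen]
    return src, dst, pickle.loads(data)
```

With 2-byte ids, addresses are limited to 0, …, 65535.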

Packets travel only in one direction of the ring. A message sent by Node(j) to Node(k) goes via nodes j+1, j+2, …, k (all modulo n). Each node has a local thread that receives packets from insocket (from the previous node) and either puts them in rcvdmsgqueue or forwards them to outsocket (to the next node).
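The routing rule can be sketched in a few lines. Both function names are hypothetical; handle_incoming takes the delivery and forwarding actions as callables so that the per-packet decision of the receiving thread is visible on its own.

```python
def ring_path(j, k, n):
    """Nodes a packet from Node(j) to Node(k) visits, in order, after leaving j."""
    return [(j + i) % n for i in range(1, (k - j) % n + 1)]


def handle_incoming(myid, dst, packet, deliver, forward):
    """Per-packet decision of the receiving thread (hypothetical helper)."""
    if dst == myid:
        deliver(packet)  # e.g. put (True, msg) in rcvdmsgqueue
    else:
        forward(packet)  # e.g. send on outsocket to the next node
```

A packet from j to k makes (k-j)%n hops, so in the worst case (k = j-1) it crosses n-1 links.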

The end function should be called at one address only. The call initiates an END message that travels along the ring, closing each node it encounters. The END message has a datalen field of 0.
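Because a user message always has non-empty serialized data, a datalen of 0 is enough to mark END. A sketch, assuming a big-endian 8-byte header and pickle serialization (neither is confirmed by the text); which src and dst values imp_0 puts in an END header is not stated, so the arguments here are placeholders, and all names are hypothetical.

```python
import pickle
import struct

HEADER = struct.Struct("!HHI")  # assumed layout: src id, dst id, datalen


def make_end_packet(src, dst):
    # An END packet is a bare header whose datalen field is 0.
    return HEADER.pack(src, dst, 0)


def is_end(header_bytes):
    _, _, datalen = HEADER.unpack(header_bytes)
    # pickle.dumps() never yields an empty byte string, so datalen == 0
    # cannot be confused with a user message.
    return datalen == 0


def closing_order(j, n):
    """Order in which an END started at Node(j) reaches the nodes of the ring."""
    return [(j + i) % n for i in range(n)]
```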


Starting the msgtransfer nodes from user nodes

Module sesf.msgtransfer.user_node has class UserNode that implements a user node that starts a given msgtransfer node. If the peer user nodes are also started, the collection of user nodes exercises the msgtransfer service provided by the msgtransfer nodes.

For example, to start an N-process system using imp_0 msgtransfer nodes, run the N shell commands python user_node.py N j --use imp_0.node N j where j ranges over 0, 1, …, N-1. (Or run python ../start_nodes.py N user_node.py --use imp_0.node.)

The user nodes use the service provided by the N imp_0 msgtransfer nodes to exchange a few messages, then end the msgtransfer service and the N processes.

The user nodes can also make use of a running msgtransfer servicemodel, instead of a msgtransfer implementation. Simply replace imp_0.node by service_node in the above commands.

class sesf.msgtransfer.user_node.UserNode(argv)

Service program (abstract version)

Extending the (pseudo-Python) service program for two addresses to more than two addresses has interesting consequences. Suppose the program maintains the following variables for all addresses j and k: $sent_{j,k}$, the sequence of messages sent at j to k; and $rcvd_k$, the sequence of messages received at k (from all addresses). Consider the receive function at, say, address 0. Its return condition would say that message m can be returned iff there is a merge s of $sent_{1,0}, \ldots, sent_{N-1,0}$ such that $rcvd_0$ concatenated with m is a prefix of s.
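To see why this condition is costly, here is a hypothetical direct check (not sesf code). It must search over the ways of attributing each received message to a source, because equal messages from different sources make greedy matching unreliable. Memoizing on the per-source counts helps, but the state space still grows with the product of the sequence lengths.

```python
from functools import lru_cache


def is_merge_prefix(r, seqs):
    """True iff r is a prefix of some merge (interleaving) of the sequences in seqs."""
    r = tuple(r)
    seqs = [tuple(s) for s in seqs]

    @lru_cache(maxsize=None)
    def search(counts):
        i = sum(counts)  # number of elements of r matched so far
        if i == len(r):
            return True
        for src, s in enumerate(seqs):
            c = counts[src]
            if c < len(s) and s[c] == r[i]:
                if search(counts[:src] + (c + 1,) + counts[src + 1:]):
                    return True
        return False

    return search((0,) * len(seqs))


def can_return(rcvd0, m, sent_to_0):
    """Merge-based return condition of recv at address 0.

    sent_to_0 lists the sequences sent_{1,0}, ..., sent_{N-1,0}.
    """
    return is_merge_prefix(list(rcvd0) + [m], sent_to_0)
```

For example, with sent_to_0 = [["a", "b"], ["a", "c"]] and rcvd0 = ["a"], returning "c" is allowed only because the first "a" can be attributed to the second sender; a greedy match against the first sender would wrongly reject it.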

This works but it’s not convenient. The concrete service and servicetester programs would be computationally expensive. Using the abstract service and servicetester program in proofs would be messy.

A better approach is to allow internal non-determinism in the service program. Instead of $rcvd_k$, suppose the program maintains $nrcvd_{j,k}$, the number of messages received at k from j. Also add a parameter j to the return part of recv(0). Then the return condition can say that message m can be returned iff there is a j such that m equals $sent_{j,0}[nrcvd_{j,0}]$. If more than one j satisfies this condition, a particular j is chosen non-deterministically. Furthermore, the non-determinism is internal because the environment does not see the chosen value of j (although it may be able to infer it later, after receiving more messages).
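A sketch of the counter-based condition, generalized to a receiving address k, with the non-deterministic choice of the source j realized by a random choice. The function names and the dict-of-lists representation (keyed by (source, destination) pairs) are assumptions for illustration. Each step scans only N sources instead of searching over merges.

```python
import random


def recv_candidates(sent, nrcvd, k, n):
    """Sources j whose next undelivered message to address k is available."""
    return [j for j in range(n)
            if j != k and nrcvd[(j, k)] < len(sent[(j, k)])]


def recv_step(sent, nrcvd, k, n, rng=random):
    """One return of recv at address k, or None if recv would block.

    j is chosen non-deterministically (here: at random) among the
    candidates; it updates nrcvd but is not returned, so it stays internal.
    """
    candidates = recv_candidates(sent, nrcvd, k, n)
    if not candidates:
        return None
    j = rng.choice(candidates)
    m = sent[(j, k)][nrcvd[(j, k)]]
    nrcvd[(j, k)] += 1
    return m
```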

Using this approach, the service is formally defined by the pseudo-Python class Service in msgtransfer.service.ppy. As usual, each function has a parameter j identifying the address at which the function is located.


Service program (concrete version)

Module sesf.msgtransfer.serviceparts has class ServiceParts, which has the component functions of sesf.msgtransfer.service.

class msgtransfer.serviceparts.ServiceParts(num_nodes)

Recall the internal parameter k (the source address) in the return part of the pseudo-Python service function recv(j). Note what happens to it in the Python version of the service. In function recv_RCI(j,rval), the input version of recv(j).RC, a value for k is provided by the implementation’s rval. In function recv_RCO(j), the output version of recv(j).RC, the function generates a value for k.


Module sesf.msgtransfer.service_node has class Node that has the signature of a msgtransfer implementation node and redirects incoming calls to RPC calls to a running service with pyrodaemon sesf.msgtransfer.service.

class sesf.msgtransfer.service_node.Node(argv)

Module sesf.msgtransfer.service has class Service, which is the concrete version of the msgtransfer service. It is RPC-accessible via pyrodaemon sesf.msgtransfer.service. Run python service.py <num_nodes> to start a Service instance for num_nodes addresses, which can then be accessed by a distributed program that uses the msgtransfer service.

class sesf.msgtransfer.service.Service(num_nodes)

Note that function recv(j) does not return the value of the internal parameter k generated by recv_RCO(j).

Running user nodes over the service

The msgtransfer user node class can make use of the service model (instead of an implementation).

  • Start a Pyro4 nameserver if one is not already running: enter python -m Pyro4.naming & in a shell.
  • Run python service.py N, to start the service program for N addresses.
  • Run python user_node.py N j --use service_node N j for j ranging over 0, 1, …, N-1. (Or run python ../start_nodes.py N user_node.py --use service_node.)

Servicetester (abstract version)

TODO

Servicetester (concrete version)

Recall that the service tester is a program that can be used in place of the users of the service, typically for testing a (distributed) implementation of the service. For testing, each node of the implementation is wrapped in a “test node”. The service tester makes RPC calls to the test nodes. A test node redirects incoming RPC calls to its implementation node.

The service tester can also use a checker, if one is provided with the implementation, to check implementation-specific assertions. In this case, each implementation node can inform the servicetester of local events by calling the node’s inform_tester() function. This function has no effect if the implementation node has not been started by a test node.
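The "no effect unless started by a test node" behavior amounts to a None check on testerproxy. A minimal sketch with a hypothetical class name, showing only this hook:

```python
class NodeWithTesterHook:
    """Hypothetical skeleton showing only the testerproxy hook."""

    def __init__(self, testerproxy=None):
        # Set only when the node is started inside a test node.
        self.testerproxy = testerproxy

    def inform_tester(self, e):
        # No effect unless a test node supplied a tester proxy.
        if self.testerproxy is not None:
            self.testerproxy.inform_tester(e)
```

Because the hook is a no-op by default, the same node code can run unchanged under user nodes and under the servicetester.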


class TestNode

Module sesf.msgtransfer.test_node has class TestNode that implements an RPC wrapper for a node of an implementation of the msgtransfer service. Run python test_node.py N j --use imp_0.node N j to start a process with a TestNode object wrapping an imp_0.node implementation node.

class msgtransfer.test_node.TestNode(argv)

class ServiceTester

Module sesf.msgtransfer.servicetester has class ServiceTester that implements the msgtransfer servicetester. This can test a distributed implementation of the msgtransfer service. For example, to test a distributed system of N imp_0 implementation nodes:

  • Start a Pyro4 nameserver if one is not already running: run python -m Pyro4.naming &.
  • Run python test_node.py N j --use imp_0.node N j for j ranging over 0, 1, …, N-1. (Or run python ../start_nodes.py N test_node.py --use imp_0.node.)
  • Run python servicetester.py N to test only against the service.
  • Or run python servicetester.py N --checker imp_0.checker N to test against the service and the implementation-specific assertions in imp_0.checker.Checker.
  • Or run python servicetester.py N --log <logfile> --checker imp_0.checker N to also generate an event log in file <logfile> (for example, arthistory.pickle), which can later be played by running python logplayer.py <logfile>.

class msgtransfer.servicetester.ServiceTester(argv, test_duration=0.5)

Note that the return part of function do_recv(j) expects the implementation-supplied rval to have a value for the internal parameter k of recv_RCI(j). This is easily achieved by having the servicetester include the value in the message it sends (in the call part of send(j)); no modification to the implementation node’s code is needed.


Checker for implementation imp_0

class msgtransfer.imp_0.checker.Checker(servicetester, argv)

Maintain the global state of a distributed system of msgtransfer.imp_0 nodes and check some assertions. Created by msgtransfer.servicetester when testing the distributed system. Maintains the following variables:

  • servicetester: the servicetester object that started this checker object
  • n: number of (msgtransfer.imp_0) nodes
  • sent: sent state maintained by servicetester.serviceparts object
  • transit_counts: map indexed by node pairs of lists of ints. transit_counts[(j,k)] is a list of (k-j)%n+1 ints, where list entry p is the number of packets of flow (j,k) that have passed by node (j+p)%n.

Function handle_event(e):

  • RPC-called by a node to report an event e at the node. Here e is a (p,j,k,msg) tuple indicating the generation or arrival at node p of a packet with srcid j, dstid k, and data equal to serialized msg.
  • Update the global state according to e and check that the following assertion still holds: for every event (p, j, k, msg): sent[(j,k)][transit_counts[(j,k)][(p-j)%n]] == msg
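A sketch of the transit-count bookkeeping, under the assumption that sent[(j,k)] already contains a message when its generation event arrives. The class name is hypothetical and only the assertion above is checked. Node p sits at offset (p-j)%n on the path of flow (j,k), so that entry of transit_counts indexes the message that node p should see next.

```python
class CheckerSketch:
    """Hypothetical stand-in for imp_0's Checker: tracks transit_counts only."""

    def __init__(self, n, sent):
        self.n = n
        self.sent = sent  # sent[(j, k)]: messages sent at j to k, in order
        # transit_counts[(j, k)][p]: packets of flow (j, k) seen at node (j+p)%n
        self.transit_counts = {
            (j, k): [0] * ((k - j) % n + 1)
            for j in range(n) for k in range(n) if j != k
        }

    def handle_event(self, e):
        p, j, k, msg = e
        counts = self.transit_counts[(j, k)]
        offset = (p - j) % self.n  # position of node p on the path from j
        idx = counts[offset]
        flow = self.sent[(j, k)]
        if idx >= len(flow) or flow[idx] != msg:
            raise AssertionError(f"node {p}: flow {(j, k)} out of order")
        counts[offset] += 1
```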

Log player

Module sesf.msgtransfer.logplayer can play a log file generated by msgtransfer.servicetester. Run python logplayer.py <logfile> --sart <tester_art>

class msgtransfer.logplayer.Artbase