6. A distributed object-transfer service

We define an object-transfer service spanning N locations, identified by addresses 0, 1, …, N-1. The service lets users share a mutable object (e.g., an integer or a list). A user can acquire the object, modify it, and release it when asked to by the service (typically because another user wants it). The service ensures the following:

  • At most one user holds the object at any time.
  • When a user acquires the object, its value is equal to the value it had when it was last released.
  • A user asking for the object eventually acquires the object provided no user holds the object indefinitely.

The service differs from a distributed lock service in that the object is mutable and acquire should return the value in the last release. Also, the object ordinarily resides with a user and is in the service only when moving from one user to another.

At each address j, four functions are provided to the user:

  • acq(): acquire the object.
    • Return (False,) only if the service is ending. Otherwise return (True,oval), indicating that the caller has acquired the object, where oval is the object’s value.
    • Call only if the user at j does not hold the object, no acq() call is ongoing at address j, and no previous acq() call at j has returned (False,).
  • rel(oval): release the object with value oval.
    • Return None.
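    • Call only if the user at j holds the object, a recvreq() call at j has returned True since the previous rel() call at j (if any), and the user at j has not called end().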
  • recvreq(): receive a request from the service to release the object.
    • Return False only if the service is ending. Return True only if the user at j holds the object or has an ongoing acq() call.
    • Call only if no recvreq() call is ongoing at address j and no previous recvreq() call at j has returned False.
    • For every address k, messages?? received from k are in the order in which they were sent at address k. Subject to this constraint, messages?? sent to j from different addresses can be received with any interleaving.
  • end(): Close the service.
    • Return None.
    • Call only if end() has not been called at any address.
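
The call discipline above can be illustrated with a minimal single-process sketch. StubNode is a hypothetical stand-in for one address of the service (it is not part of sesf), and user_loop is an assumed usage pattern in which the user waits for a release request before every rel() call.

```python
class StubNode:
    """Hypothetical stand-in for one address of the service (not sesf code).

    It hands out the object `rounds` times, then behaves as if the
    service were ending.
    """

    def __init__(self, oval, rounds):
        self.oval = oval
        self.rounds = rounds
        self.held = False

    def acq(self):
        assert not self.held          # call only if the object is not held
        if self.rounds == 0:
            return (False,)           # simulated "service is ending"
        self.rounds -= 1
        self.held = True
        return (True, self.oval)

    def recvreq(self):
        assert self.held              # True is allowed only while holding
        return True                   # the stub always asks for the object back

    def rel(self, oval):
        assert self.held              # call only if the object is held
        self.held = False
        self.oval = oval


def user_loop(node, update):
    """Acquire, modify, and release until acq() or recvreq() signals ending.

    Returns the number of updates performed.
    """
    count = 0
    while True:
        rval = node.acq()
        if rval == (False,):
            return count
        oval = update(rval[1])
        count += 1
        if not node.recvreq():        # ending: no further rel() is requested
            return count
        node.rel(oval)


node = StubNode(0, 3)
updates = user_loop(node, lambda v: v + 1)
print(updates, node.oval)
```

A real user would typically run recvreq() in its own thread, since the specification allows a recvreq() call to be ongoing while an acq() call is ongoing at the same address.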

Service program (abstract version)

The above service is formally defined by the pseudo-Python class DistObjtransferService below (with the standard conventions applying to parameter j).

class DistObjtransferService():
  def __init__(self, nids, initial_owner):
    """nids (int): number of nodes.
    initial_owner: node whose owner initially holds the object.
    objstatus: (True,oval) if obj in transit with value oval, else (False,).
    hasobj[j]: True iff node j's owner holds object.
    hasreq[j]: True iff node j's owner has a pending request.
    ender: id of node whose owner called end.
    ending: True iff end has been called.
    ongoing_acq[j]: True iff acq(j) call is ongoing.
    closed_acq[j]: True iff acq(j) call has returned (False,).
    ongoing_recvreq[j]: True iff recvreq(j) call is ongoing.
    closed_recvreq[j]: True iff recvreq(j) call has returned False.
    """
    self.nids = nids
    self.initial_owner = initial_owner
    self.objstatus = (False,)
    self.hasobj = [k == initial_owner for k in range(nids)]
    self.hasreq = [False for k in range(nids)]
    self.ender = None
    self.ending = False
    self.ongoing_acq = [False for k in range(nids)]
    self.closed_acq = [False for k in range(nids)]
    self.ongoing_recvreq = [False for k in range(nids)]
    self.closed_recvreq = [False for k in range(nids)]

  input acq(self, j):    # return (False,) | (True, oval)
    CC:
      0 <= j < self.nids and not self.hasobj[j] and
      not self.ongoing_acq[j] and not self.closed_acq[j]
    CU:
      self.ongoing_acq[j] = True
    RETURN(rval):
      # rval is (False,) or (True, oval)
      RC:
        rval == self.objstatus and (rval != (False,) or self.ending)
      RU:
        self.ongoing_acq[j] = False
        self.objstatus = (False,)
        if rval != (False,):
          self.hasobj[j] = True
        else:
          self.hasobj[j] = False
          self.closed_acq[j] = True

  input rel(self, j, oval):    # return None
    CC:
      0 <= j < self.nids and self.hasobj[j] and
      self.hasreq[j] and self.ender != j
    CU:
      self.hasobj[j] = False
      self.hasreq[j] = False
      self.objstatus = (True, oval)
    RETURN():
      RC:
        True
      RU:
        return None

  input recvreq(self, j):    # return True | False
    CC:
      0 <= j < self.nids and not self.ongoing_recvreq[j] and
      not self.closed_recvreq[j]
    CU:
      self.ongoing_recvreq[j] = True
    RETURN(rval):
      RC:
       (rval == False and self.ending) or
       # note: can return True even if no one else is asking for obj
       (rval == True and (self.hasobj[j] or self.ongoing_acq[j]))
      RU:
        self.hasreq[j] = rval
        self.ongoing_recvreq[j] = False
        if not rval:
          self.closed_recvreq[j] = True
        return rval

  input end(self, j):    # return None
    CC:
      0 <= j < self.nids and not self.ending
    CU:
      self.ending = True
      self.ender = j
    RETURN():
      RC:
        True
      RU:
        return None
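
The call and return conditions above can be exercised with a small sequential monitor written in plain Python. This is a sketch under stated assumptions, not sesf code: the class name SpecMonitor and the split of each function into a `_call` and a `_return` method are hypothetical, and each CC or RC is turned into an assertion followed by the corresponding update.

```python
class SpecMonitor:
    """Hypothetical monitor: asserts each CC/RC of the abstract service."""

    def __init__(self, nids, initial_owner):
        self.nids = nids
        self.objstatus = (False,)
        self.hasobj = [k == initial_owner for k in range(nids)]
        self.hasreq = [False] * nids
        self.ender = None
        self.ending = False
        self.ongoing_acq = [False] * nids
        self.closed_acq = [False] * nids
        self.ongoing_recvreq = [False] * nids
        self.closed_recvreq = [False] * nids

    def acq_call(self, j):
        assert 0 <= j < self.nids and not self.hasobj[j]
        assert not self.ongoing_acq[j] and not self.closed_acq[j]
        self.ongoing_acq[j] = True

    def acq_return(self, j, rval):
        assert self.ongoing_acq[j]
        assert rval == self.objstatus and (rval != (False,) or self.ending)
        self.ongoing_acq[j] = False
        self.objstatus = (False,)
        if rval != (False,):
            self.hasobj[j] = True
        else:
            self.closed_acq[j] = True

    def rel_call(self, j, oval):
        assert 0 <= j < self.nids and self.hasobj[j]
        assert self.hasreq[j] and self.ender != j
        self.hasobj[j] = False
        self.hasreq[j] = False
        self.objstatus = (True, oval)

    def recvreq_call(self, j):
        assert 0 <= j < self.nids
        assert not self.ongoing_recvreq[j] and not self.closed_recvreq[j]
        self.ongoing_recvreq[j] = True

    def recvreq_return(self, j, rval):
        assert self.ongoing_recvreq[j]
        assert (not rval and self.ending) or (
            rval and (self.hasobj[j] or self.ongoing_acq[j]))
        self.hasreq[j] = rval
        self.ongoing_recvreq[j] = False
        if not rval:
            self.closed_recvreq[j] = True

    def end_call(self, j):
        assert 0 <= j < self.nids and not self.ending
        self.ending = True
        self.ender = j


# Node 1 asks for the object; node 0 is asked to release it and does so.
m = SpecMonitor(2, 0)
m.acq_call(1)
m.recvreq_call(0)
m.recvreq_return(0, True)
m.rel_call(0, 42)
m.acq_return(1, (True, 42))
print(m.hasobj, m.objstatus)
```

Feeding the monitor a sequence that violates a condition (for example, rel_call(1, 43) before any recvreq at address 1 has returned True) raises AssertionError.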

Service program (concrete version)

TODO

Servicetester (abstract version)

TODO

Servicetester (concrete version)

class sesf.objtransfer.servicetester.ServiceTester(num_nodes, initial_owner, checker_argv, log, min_acqs_todo=4)

The tester includes an instance of class DistPathreversalChecker, which checks some assertions of the particular implementation given below. When running the tester against another implementation, comment out or replace the checker.

Each node of the implementation has to be run with the following testing wrapper:

class sesf.objtransfer.test_node.TestNode(myid, use)

Implementation imp_dpr (distributed path-reversal, uses msgtransfer service)

Module sesf.objtransfer.imp_dpr.node has class Node that implements a node of an objtransfer service implementation for N addresses, N ≥ 2. The implementation is based on the distributed path-reversal algorithm. It uses the msgtransfer service for N addresses, choosing, by default, the msgtransfer implementation in sesf.msgtransfer.imp_0.node.Node.

Run the N commands python user_node.py N j k --use imp_dpr/node.py N j k for j ranging over 0..N-1 to start, exercise and close the objtransfer service spanning N locations, using the default msgtransfer implementation. (Or run python ../start_nodes.py N user_node.py --use imp_dpr.node.)

class sesf.objtransfer.imp_dpr.node.Node(argv, testerproxy=None)

One node of a distributed program that uses distributed pathreversal to implement the distributed object transfer service over a msgtransfer service.

Args:

  • mt_node: used by this node to exchange messages with other nodes
  • n: number of nodes; their ids are 0, 1, …, n-1
  • myid: id of this node, in 0..n-1
  • initial_owner: id of the initial owner’s node

Messages:

  • ['OBJ', oval]: conveys the object; oval is its value
  • ['REQ', j]: request for object by node j
  • ['END', j]: request by node j for recipient to end
  • ['ENDACK', j]: response by node j to receiving an END message
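
As an illustration of the four message shapes, the following sketch validates a received message before it is dispatched. The function name classify and its error handling are hypothetical; the actual Node class may process messages differently.

```python
def classify(msg, n):
    """Return (kind, payload) for a well-formed message, else raise ValueError.

    msg is a two-element list such as ['REQ', 2]; n is the number of nodes.
    """
    if not isinstance(msg, (list, tuple)) or len(msg) != 2:
        raise ValueError(f"malformed message: {msg!r}")
    kind, payload = msg
    if kind == 'OBJ':
        # The object's value can be anything, so it is not checked.
        return kind, payload
    if kind in ('REQ', 'END', 'ENDACK'):
        # These messages carry a node id, which must lie in 0..n-1.
        if isinstance(payload, int) and 0 <= payload < n:
            return kind, payload
    raise ValueError(f"malformed message: {msg!r}")


print(classify(['REQ', 2], 3))
print(classify(['OBJ', [1, 2]], 3))
```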

Checker for this implementation

The following checks some internal invariants of this distributed implementation and displays some dynamic graphs (for which it uses matplotlib). It can be used in the tester when testing this implementation.

class sesf.objtransfer.imp_dpr.checker.Checker(tester, n, initial_owner)

Maintains the global state of the distributed path-reversal system (including displays of lr and pr graphs). Provides a function that takes a given event at a node and checks whether the resulting state change preserves desired global invariants. The distributed objtransfer tester creates an instance of this class when testing a distributed path-reversal-based implementation.

State:

  • the[j]: node j status: 't' (thinking) | 'h' (hungry) | 'e' (eating)
  • objdest: msg [OBJ,oval]’s location or destination if in transit
  • lastptr[j]: node j’s last pointer; None if not pointing to a node
  • nextptr[j]: node j’s next pointer; None if not pointing to a node
  • reqdest[j]: msg [REQ,j]’s destination; None if [REQ,j] not in transit
  • pr_graph[j]: [k such that (k,j) is a pr-edge, i.e., reqdest[k] == j or nextptr[j] == k or (lastptr[k] == j and k is thinking)]. Note: pr_graph[j] allows duplicates (but there should not be any)
  • dpr_art: art for dynamic dist_pathreversal graphs
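
The pr_graph definition above can be written out as a short function. This is a sketch, not the Checker's code: build_pr_graph is a hypothetical name, and the status list plays the role of the[j]. Unlike the stored pr_graph, the comprehension lists each k at most once per j.

```python
def build_pr_graph(status, lastptr, nextptr, reqdest):
    """Return pr_graph, where pr_graph[j] lists every k with a pr-edge (k, j).

    (k, j) is a pr-edge if [REQ,k] is in transit to j, or j's next pointer
    is k, or k is thinking and its last pointer is j.
    """
    n = len(status)
    return [
        [k for k in range(n)
         if reqdest[k] == j
         or nextptr[j] == k
         or (lastptr[k] == j and status[k] == 't')]
        for j in range(n)
    ]


# Node 0 holds the object; nodes 1 and 2 are thinking and point at node 0.
before = build_pr_graph(['e', 't', 't'], [None, 0, 0],
                        [None, None, None], [None, None, None])

# Node 1 becomes hungry: it clears its last pointer and sends [REQ,1] to node 0.
after = build_pr_graph(['e', 'h', 't'], [None, None, 0],
                       [None, None, None], [None, 0, None])
print(before, after)
```

In both states the edges into node 0 are the same: the edge from node 1 changes from a last-pointer edge to an in-transit request edge.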

Events: [event location, event type, msg parameter, msg destination]

  • [j, 'sendreq', j, k]: node j sends [REQ,j] to node k
  • [j, 'fwdreq', i, k]: node j receives [REQ,i], forwards it to node k
  • [j, 'recvreq', i, None]: node j receives [REQ,i] (does not forward it)
  • [j, 'sendobj', oval, k]: node j sends [OBJ, oval] to node k
  • [j, 'recvobj', oval, None]: node j receives [OBJ,oval]
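
The effect of these events on reqdest and objdest can be sketched as follows. The function apply_event and its consistency checks are assumptions inferred from the state definitions above; they are not taken from the Checker's source, which also updates the pointers and the graphs.

```python
def apply_event(event, reqdest, objdest):
    """Apply one event to reqdest (in place) and return the new objdest.

    Raises AssertionError if the event is inconsistent with the tracked state.
    """
    j, etype, param, dest = event
    if etype == 'sendreq':
        assert param == j and reqdest[j] is None   # [REQ,j] not yet in transit
        reqdest[j] = dest
    elif etype == 'fwdreq':
        assert reqdest[param] == j                 # [REQ,i] was heading to j
        reqdest[param] = dest
    elif etype == 'recvreq':
        assert reqdest[param] == j and dest is None
        reqdest[param] = None                      # [REQ,i] leaves the network
    elif etype == 'sendobj':
        assert objdest == j                        # only the holder can send
        objdest = dest
    elif etype == 'recvobj':
        assert objdest == j and dest is None
    else:
        raise ValueError(f"unknown event type: {etype!r}")
    return objdest


# Node 2 requests the object; the request travels 2 -> 1 -> 0, and node 0
# then sends the object (value 7) to node 2.
reqdest, objdest = [None, None, None], 0
trace = [
    [2, 'sendreq', 2, 1],
    [1, 'fwdreq', 2, 0],
    [0, 'recvreq', 2, None],
    [0, 'sendobj', 7, 2],
    [2, 'recvobj', 7, None],
]
for ev in trace:
    objdest = apply_event(ev, reqdest, objdest)
print(reqdest, objdest)
```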