# Source code for sesf.objtransfer.imp_dpr.node

"""
Module **sesf.objtransfer.imp_dpr.node** has class ``Node`` that implements
a node of an objtransfer service implementation for N addresses, N |geq| 2.
The implementation is based on the distributed path-reversal algorithm.
It uses the msgtransfer service for N addresses, choosing, by default,
the msgtransfer implementation in sesf.msgtransfer.imp_0.node.Node.

Run the N commands ``python user_node.py N j k --use imp_dpr/node.py N j k``
for ``j`` ranging over 0..N-1 to start, exercise and close the objtransfer
service spanning N locations, using the default msgtransfer implementation.
(Or run ``python ../start_nodes.py N user_node.py --use imp_dpr.node``.)
"""

from threading import Thread, Lock, Condition
import argparse, random, importlib, time, sys


class Node():
    """One node of a distributed object-transfer service implementation.

    The node uses distributed path reversal over a msgtransfer service to
    pass a single object among n nodes with ids 0, 1, ..., n-1.

    Constructor args:
    - argv: list of strings of the form
      ``[node.py, n, myid, initial_owner, --use, mt_module, mt_args...]``;
      ``argv[0]`` is ignored.  ``n`` is the number of nodes, ``myid`` the id
      of this node and ``initial_owner`` the id of the node that initially
      holds the object.  Everything after ``--use`` is handed unchanged to
      ``Node`` of the named msgtransfer module; when ``--use`` is absent,
      ``sesf.msgtransfer.imp_0.node`` is used with args ``n`` and ``myid``.
    - testerproxy: optional object with an ``inform_tester(event)`` method
      that is told of local events; None when running without a tester.

    Messages exchanged with other nodes:
    - ['OBJ', oval]: conveys the object; oval is its value
    - ['REQ', j]: request for object by node j
    - ['END', j]: request by node j for recipient to end
    - ['ENDACK', j]: response by node j to receiving an END message
    """

    def __init__(self, argv, testerproxy=None):
        # argv: node.py num_nodes myid initial_owner --use msgtransfer_node_argv
        print("create node from args", argv)
        p = argparse.ArgumentParser()
        p.add_argument("n", type=int, help='number of nodes')
        p.add_argument("myid", type=int, help='id of this node, in 0..n-1')
        p.add_argument("initial_owner", type=int,
                       help='node id of initial owner')
        p.add_argument("--use", nargs=argparse.REMAINDER,
                       help='zero or more "--use node_module args" strings')
        args = p.parse_args(argv[1:])
        print("create_node_from args:", args)

        self.testerproxy = testerproxy
        self.n = args.n
        self.myid = args.myid
        self.initial_owner = args.initial_owner

        if args.use is None:
            print("Using default mt_node module: sesf.msgtransfer.imp_0.node")
            args.use = ['sesf.msgtransfer.imp_0.node',
                        str(args.n), str(args.myid)]
        nodemodule = importlib.import_module(args.use[0])
        print("creating_node_from_module", nodemodule)
        mt_node = nodemodule.Node(args.use)
        print("mt_node", mt_node)

        # Set to True once an END message or the last ENDACK is received.
        self.ending = False
        # Number of ENDACK messages still awaited by a local end() call.
        self.unacked_endmsgs = args.n - 1
        # Node of the underlying msgtransfer service.
        self.mt_node = mt_node

        # True if object is here (in transit to user) or with local user.
        self.owner = (self.myid == self.initial_owner)
        # (True, oval) if object is here (in transit to user), o/w (False,).
        self.objbuf = (False,)
        # True if request is here (in transit to user) and not ending.
        self.reqbuf = False
        # Next pointer: id of the node that rel() sends the object to.
        self.nextptr = None
        # Last pointer: id of the node to which this node sends or forwards
        # a REQ; None when there is no node to forward to.
        if self.myid == self.initial_owner:
            self.lastptr = None
        else:
            self.lastptr = self.initial_owner

        # Lock and conditions for atomic update of local state.
        self.lck = Lock()
        self.condition_obj = Condition(self.lck)
        self.condition_req = Condition(self.lck)
        self.condition_end = Condition(self.lck)

        # Thread to receive messages from mt_node.
        Thread(name="rcvthread", target=self.do_recv).start()

    def inform_tester(self, event):
        """Inform tester of local event if running with service tester."""
        if self.testerproxy is not None:
            self.testerproxy.inform_tester(event)

    def acq(self):
        """Acquire object and return (True, objvalue), or (False,) if ending.

        Sends a REQ carrying this node's id to ``lastptr`` and then blocks
        until the object arrives or the node starts ending.
        """
        with self.lck:
            if self.ending:
                return (False,)
            self.inform_tester((self.myid, 'sendreq', self.myid, self.lastptr))
            self.mt_node.send(self.lastptr, ['REQ', self.myid])
            self.lastptr = None
        with self.condition_obj:
            while not (self.objbuf[0] or self.ending):
                self.condition_obj.wait()
            # objbuf is still (False,) here if the wait ended due to ending.
            rval = self.objbuf
            self.objbuf = (False,)
            return rval

    def recvreq(self):
        """Input. Return True if a request has been received and not ending.

        Return False if ending. Blocking.
        """
        with self.condition_req:
            while not (self.reqbuf or self.ending):
                self.condition_req.wait()
            rval = not self.ending
            self.reqbuf = False
            return rval

    def rel(self, oval):
        """Input. Release object with value oval (can not be None).

        Sends ['OBJ', oval] to ``nextptr`` and clears ``nextptr`` and
        ``owner``.  Returns None.
        """
        self.inform_tester((self.myid, 'sendobj', None, self.nextptr))
        with self.lck:
            self.mt_node.send(self.nextptr, ['OBJ', oval])
            self.nextptr = None
            self.owner = False
        return None

    def end(self):
        """Input. Send END msg to every other node, then end mt_node.

        Returns when all END msgs are acked and mt_node.end() has returned.
        Blocking.
        """
        with self.lck:
            for j in range(self.n):
                if j != self.myid:
                    self.mt_node.send(j, ['END', self.myid])
        with self.condition_end:
            while self.unacked_endmsgs > 0:
                self.condition_end.wait()
        print("ENDING mt_node", self.myid, flush=True)
        self.mt_node.end()

    def do_recv(self):
        """Receive a message (blocking) and process it. Repeat until ending.

        Run by a local thread.

        Raises:
            Exception: if mt_node.recv() reports that mt_node has ended, or
                if a message with an unknown header is received.
        """
        while not self.ending:
            # mt_node.recv() can, in general, return (False,) or (True, msg).
            # (False,) means mt_node is ended; this should not happen here
            # because mt_node is closed only after this node ends.
            rval = self.mt_node.recv()
            # Check the flag before unpacking: (False,) carries no message,
            # so unpacking it first would raise ValueError instead.
            if not rval[0]:
                raise Exception("mt_node ended prematurely")
            msghdr, msgdat = rval[1]
            with self.lck:
                if msghdr == 'OBJ':
                    self.objbuf = (True, msgdat)
                    self.owner = True
                    self.condition_obj.notify()
                elif msghdr == 'REQ':
                    if self.lastptr is not None:
                        # Not the end of the request path: pass the request
                        # on and point lastptr at the requester.
                        self.inform_tester(
                            (self.myid, 'fwdreq', msgdat, self.lastptr))
                        self.mt_node.send(self.lastptr, [msghdr, msgdat])
                        self.lastptr = msgdat
                    else:
                        # Request stops here: remember the requester as the
                        # next recipient of the object and tell local user.
                        self.lastptr = msgdat
                        self.nextptr = msgdat
                        self.reqbuf = True
                        self.inform_tester(
                            (self.myid, 'recvreq', msgdat, None))
                        self.condition_req.notify()
                elif msghdr == 'END':
                    self.ending = True
                    # Wake local callers blocked in acq() and recvreq().
                    self.condition_obj.notify()
                    self.condition_req.notify()
                    self.mt_node.send(msgdat, ['ENDACK', self.myid])
                elif msghdr == 'ENDACK':
                    self.unacked_endmsgs -= 1
                    # Keep receiving until every other node has acked.
                    if self.unacked_endmsgs == 0:
                        self.condition_end.notify()
                        self.ending = True
                        self.condition_req.notify()
                else:
                    raise Exception("unknown msg header:", msghdr)
########### END class Node ##########################