"""
Module **sesf.objtransfer.imp_dpr.node** has class ``Node`` that implements
a node of an objtransfer service implementation for N addresses, N |geq| 2.
The implementation is based on the distributed path-reversal algorithm.
It uses the msgtransfer service for N addresses, choosing, by default,
the msgtransfer implementation in sesf.msgtransfer.imp_0.node.Node.
Run the N commands ``python user_node.py N j k --use imp_dpr/node.py N j k``
for ``j`` ranging over 0..N-1 to start, exercise and close the objtransfer
service spanning N locations, using the default msgtransfer implementation.
(Or run ``python ../start_nodes.py N user_node.py --use imp_dpr.node``.)
"""
from threading import Thread, Lock, Condition
import argparse, random, importlib, time, sys
class Node():
    """One node of a distributed program that uses distributed path reversal
    to implement the distributed object transfer service over a msgtransfer
    service.

    Args:
      - argv: argument list of the form
        ``node.py num_nodes myid initial_owner [--use msgtransfer_node_argv]``;
        ``argv[0]`` is ignored. Node ids are 0, 1, ..., num_nodes-1.
        If ``--use`` is absent, the msgtransfer node is created from
        module ``sesf.msgtransfer.imp_0.node``.
      - testerproxy: optional object whose ``inform_tester(event)`` method
        is called on local events (when running with a service tester).

    Messages:
      - ['OBJ', oval]: conveys the object; oval is its value
      - ['REQ', j]: request for object by node j
      - ['END', j]: request by node j for recipient to end
      - ['ENDACK', j]: response by node j to receiving an END message
    """

    def __init__(self, argv, testerproxy=None):
        # argv: node.py num_nodes myid initial_owner --use msgtransfer_node_argv
        print("create node from args", argv)
        p = argparse.ArgumentParser()
        p.add_argument("n", type=int, help='number of nodes')
        p.add_argument("myid", type=int, help='id of this node, in 0..n-1')
        p.add_argument("initial_owner", type=int,
                       help='node id of initial owner')
        p.add_argument("--use", nargs=argparse.REMAINDER,
                       help='zero or more "--use node_module args" strings')
        args = p.parse_args(argv[1:])
        print("create_node_from args:", args)

        self.testerproxy = testerproxy
        self.n = args.n
        self.myid = args.myid
        self.initial_owner = args.initial_owner

        if args.use is None:
            print("Using default mt_node module: sesf.msgtransfer.imp_0.node")
            args.use = ['sesf.msgtransfer.imp_0.node',
                        str(args.n), str(args.myid)]
        # args.use[0] names the module; the whole list is passed on as the
        # msgtransfer node's own argv.
        nodemodule = importlib.import_module(args.use[0])
        print("creating_node_from_module", nodemodule)
        mt_node = nodemodule.Node(args.use)
        print("mt_node", mt_node)

        self.ending = False
        # number of END messages sent by end() that are not yet acked
        self.unacked_endmsgs = args.n - 1
        self.mt_node = mt_node

        # true if object is here (in transit to) or with local user
        self.owner = (self.myid == self.initial_owner)
        # (True, oval) if object is here (in transit to user), o/w (False,)
        self.objbuf = (False,)
        # true if request is here (in transit to user) and not ending,
        # o/w False
        self.reqbuf = False
        # next pointer: id of the node to which rel() sends the object;
        # set when a REQ arrives while lastptr is None
        self.nextptr = None
        # last pointer: id of the node of the last request received by
        # this node (None at the initial owner and after a local acq)
        if self.myid == self.initial_owner:
            self.lastptr = None
        else:
            self.lastptr = self.initial_owner

        # lock and conditions for atomic update of local state
        self.lck = Lock()
        self.condition_obj = Condition(self.lck)
        self.condition_req = Condition(self.lck)
        self.condition_end = Condition(self.lck)

        # thread to receive messages from mt_node
        Thread(name="rcvthread", target=self.do_recv).start()

    def inform_tester(self, event):
        """Inform tester of local event if running with service tester."""
        if self.testerproxy is not None:
            self.testerproxy.inform_tester(event)

    def acq(self):
        """Acquire object and return (True, objvalue) or (False,) if ending.
        Blocking."""
        with self.lck:
            if self.ending:
                return (False,)
            # send the request along the last pointer, then clear it
            self.inform_tester((self.myid, 'sendreq', self.myid, self.lastptr))
            self.mt_node.send(self.lastptr, ['REQ', self.myid])
            self.lastptr = None
        with self.condition_obj:
            # wait until do_recv stores the object or the node starts ending
            while not (self.objbuf[0] or self.ending):
                self.condition_obj.wait()
            rval = self.objbuf
            self.objbuf = (False,)
            return rval

    def recvreq(self):
        """Input. Return True if a request has been received and not ending.
        Return False if ending. Blocking."""
        with self.condition_req:
            while not (self.reqbuf or self.ending):
                self.condition_req.wait()
            rval = not self.ending
            self.reqbuf = False
            return rval

    def rel(self, oval):
        """Input. Release object with value oval (can not be None).
        Sends ['OBJ', oval] to the node in nextptr and returns None."""
        self.inform_tester((self.myid, 'sendobj', None, self.nextptr))
        with self.lck:
            self.mt_node.send(self.nextptr, ['OBJ', oval])
            self.nextptr = None
            self.owner = False
        return None

    def end(self):
        """Input. Send END msg to every other node.
        Return when all END msgs are acked. Blocking."""
        with self.lck:
            for j in range(self.n):
                if j != self.myid:
                    self.mt_node.send(j, ['END', self.myid])
        with self.condition_end:
            while self.unacked_endmsgs > 0:
                self.condition_end.wait()
        print("ENDING mt_node", self.myid, flush=True)
        self.mt_node.end()

    def do_recv(self):
        """Receive a message (blocking) and process it. Repeat until node ends.
        Run by a local thread.

        Raises:
          Exception: if mt_node.recv() reports that mt_node has ended, or
            if a message with an unknown header is received.
        """
        while not self.ending:
            # mt_node.recv() can, in general, return (False,) or (True, msg).
            # (False,) means mt_node is ended; this should not happen here coz
            # mt_node is closed only after dist_objtransfer_pathreversal ends.
            # Check the flag before unpacking: (False,) carries no message.
            rval = self.mt_node.recv()
            if not rval[0]:
                raise Exception("mt_node ended prematurely")
            msghdr, msgdat = rval[1]
            with self.lck:
                if msghdr == 'OBJ':
                    self.objbuf = (True, msgdat)
                    self.owner = True
                    self.condition_obj.notify()
                elif msghdr == 'REQ':
                    if self.lastptr is not None:
                        # forward the request along the last pointer
                        self.inform_tester(
                            (self.myid, 'fwdreq', msgdat, self.lastptr))
                        self.mt_node.send(self.lastptr, [msghdr, msgdat])
                        self.lastptr = msgdat
                    else:
                        # request stops here: requester gets the object
                        # at the next rel()
                        self.lastptr = msgdat
                        self.nextptr = msgdat
                        self.reqbuf = True
                        self.inform_tester(
                            (self.myid, 'recvreq', msgdat, None))
                        self.condition_req.notify()
                elif msghdr == 'END':
                    self.ending = True
                    self.condition_obj.notify()
                    self.condition_req.notify()
                    self.mt_node.send(msgdat, ['ENDACK', self.myid])
                elif msghdr == 'ENDACK':
                    self.unacked_endmsgs -= 1
                    if self.unacked_endmsgs == 0:
                        self.condition_end.notify()
                        self.ending = True
                        # wake local acq/recvreq waiters, as the END
                        # branch does
                        self.condition_obj.notify()
                        self.condition_req.notify()
                else:
                    raise Exception("unknown msg header:", msghdr)
########### END class Node ##########################