# Source code for distlock2.imp_0.node

"""
Module **sesf.distlock2.imp_0.node** has class ``Node`` that implements
a node of a distlock2 implementation making use of a msgtransfer2 node.
The following commands start a distributed system of two processes,
each with a ``Node`` instance that uses a msgtransfer2 node.

- ``python imp_0/node.py 2 0 --use sesf.msgtransfer2.imp_0.node 2 0``
- ``python imp_0/node.py 2 1 --use sesf.msgtransfer2.imp_0.node 2 1``
- Or instead of the above, ``python ../start_nodes.py 2 imp_0/node.py
  --use sesf.msgtransfer2.imp_0.node``.

The two processes exercise the distlock2 service provided by the two
distlock2 nodes and then end the distlock2 service (which also ends
the msgtransfer2 service being used).

If the commands do not indicate a msgtransfer2 implementation (ie, the
"use" argument is not present), the distlock2 nodes use a default
msgtransfer2 implementation.

The distlock2 nodes can also make use of a running msgtransfer2
service model, instead of a msgtransfer2 implementation. Just replace
``msgtransfer2.imp_0.node`` by ``msgtransfer2.model_node`` in the above
commands.
"""

from threading import Thread, Lock, Condition
import argparse, time, random, sys, importlib

class Node:
    """
    A node of a distributed system that implements the distlock2 service.
    The distributed system consists of two Node instances with addresses
    0 and 1. For brevity, refer to the node at address j as node_j, where
    j ranges over 0 and 1.

    After node_j completes initialization, it maintains the following
    variables:

    - ``num_nodes``: number of nodes, as given on the command line.
    - ``myid``: id of this node; 0 or 1.
    - ``mtnode``: msgtransfer2 node used by this node.
    - ``testerproxy``: optional object informed of status changes.
    - ``status``: 'T' ('thinking'), 'H' ('hungry'), 'E' ('eating');
      initially 'T'.
    - ``token``: True iff node holds token; initially true iff j=0.
    - ``req``: True iff node holds request for token; initially true
      iff j=1.
    - ``lck``, ``cv``: lock and its condition variable guarding
      ``status``, ``token`` and ``req``.
    - ``recv_thread``: local thread that receives messages from peer.
    """

    def __init__(self, argv, testerproxy=None):
        """
        Parse ``argv``, create the msgtransfer2 node, initialize the lock
        state and start the receive thread.

        :param argv: argument list in command-line form. ``argv[0]`` is
            ignored; the rest is ``num_nodes myid [--use module
            [module_args ...]]``. Everything after ``--use`` is handed
            unchanged to ``module.Node`` as its own argv. Without
            ``--use``, ``sesf.msgtransfer2.imp_0.node`` is used.
        :param testerproxy: optional object whose ``inform_tester(event)``
            is called on every status change; ``None`` disables this.
        """
        p = argparse.ArgumentParser(
            usage='python node.py [-h] n myid --use node_argv')
        p.add_argument("num_nodes", type=int,
                       help='number of nodes: must be 2')
        p.add_argument("myid", type=int, help='id of this node: 0 or 1')
        p.add_argument("--use", nargs=argparse.REMAINDER,
                       help='zero or more "--use node_argv" strings')
        args = p.parse_args(argv[1:])
        print("create_node_from args:", args)

        if args.use is None:
            # No "--use": fall back to the default msgtransfer2 node. The
            # first argv element only fills the program-name slot.
            print("Using sesf.msgtransfer2.imp_0.node for msgtransfer2 node")
            nodemodule = importlib.import_module(
                'sesf.msgtransfer2.imp_0.node')
            mtnode = nodemodule.Node(
                ['placeholder', str(args.num_nodes), str(args.myid)])
        else:
            # args.use[0] is the module name; the whole list doubles as
            # the argv of that module's Node.
            nodemodule = importlib.import_module(args.use[0])
            print("creating_node_from_module", nodemodule)
            mtnode = nodemodule.Node(args.use)
        print("msgtransfer node", mtnode)

        self.testerproxy = testerproxy
        print('starting distlock2.imp_0.node', args.myid, flush=True)
        self.num_nodes = args.num_nodes
        self.myid = args.myid
        self.mtnode = mtnode
        self.status = 'T'
        # Node 0 starts with the token, node 1 with the request.
        self.token = (self.myid == 0)
        self.req = not self.token
        self.lck = Lock()
        self.cv = Condition(self.lck)
        self.recv_thread = Thread(target=self.do_recvs)
        self.recv_thread.start()

    def inform_tester(self, event):
        """Inform tester of local event; no-op when there is no tester."""
        # Identity test: a proxy object may define its own ==/!=.
        if self.testerproxy is not None:
            self.testerproxy.inform_tester(event)

    def do_recvs(self):
        """
        Receive loop run by ``recv_thread``: handle 'REQ' and 'TOKEN'
        messages from the peer until ``mtnode.recv()`` returns
        ``(False,)``.
        """
        while True:
            msgtuple = self.mtnode.recv()
            if msgtuple == (False,):
                # NOTE(review): presumably signals that the msgtransfer2
                # service has ended; the reason for the 10 s pause before
                # the thread exits is not evident from this file.
                time.sleep(10)
                return
            _flag, msg = msgtuple
            with self.cv:
                if msg == 'REQ':
                    self.req = True
                    # Hand the token over at once only if idle; otherwise
                    # rel() sends it later because ``req`` is now set.
                    if self.status == 'T':
                        self.mtnode.send('TOKEN')
                        self.token = False
                elif msg == 'TOKEN':
                    self.token = True
                    self.cv.notify()

    def acq(self):
        """
        Acquire the lock: become hungry, obtain the token from the peer if
        this node does not hold it, then become eating. Blocks until the
        token is held.
        """
        with self.cv:
            self.status = 'H'
            self.inform_tester((self.myid, 'H'))
            if not self.token:
                # REQ is sent only while this node holds the request.
                # NOTE(review): do_recvs sets ``req`` without notifying
                # ``cv``, so this wait is woken only by a TOKEN arrival;
                # confirm this state cannot occur with one caller per node.
                while not self.req:
                    self.cv.wait()
                self.mtnode.send('REQ')
                self.req = False
                while not self.token:
                    self.cv.wait()
            self.status = 'E'
            self.inform_tester((self.myid, 'E'))

    def rel(self):
        """
        Release the lock: become thinking and, if the peer's request is
        held, send the token to the peer.
        """
        with self.cv:
            self.status = 'T'
            self.inform_tester((self.myid, 'T'))
            if self.req:
                self.mtnode.send('TOKEN')
                self.token = False

    def end(self):
        """End the service by ending the underlying msgtransfer2 node."""
        self.mtnode.end()
############# END class sesf.distlock2.imp_0.node.Node #################