Source code for sesf.msgtransfer.user_node

"""
Module **sesf.msgtransfer.user_node** has class ``UserNode`` that
implements a user node that starts a given msgtransfer node.
If the peer user nodes are also started, the collection of user nodes
exercise the msgtransfer service provided by the msgtransfer nodes.

For example, to start an N-process system using imp_0 msgtransfer nodes,
run the N shell commands ``python user_node.py N j --use imp_0.node N j``
where ``j`` ranges over 0, 1, ..., N-1.
(Or run ``python ../start_nodes.py N user_node.py --use imp_0.node``.)

The user nodes use the service provided by the two imp_0 msgtransfer nodes
to exchange a few messages, then end the msgtransfer service and the
N processes.

The user nodes can also make use of a running msgtransfer *servicemodel*,
instead of a msgtransfer implementation. Simply replace ``imp_0.node``
by ``service_node`` in the above commands.
"""

from threading import Thread
import importlib, random, time, sys, argparse

[docs]class UserNode(): def __init__(self, argv): print('starting msgtransfer.user_node', argv, flush=True) # import pdb; pdb.set_trace() p = argparse.ArgumentParser( usage='python user_node.py [-h] n myid --use node_argv', description='create UserNode(n, myid) using node_argv/default node') p.add_argument("n", type=int, help='number of nodes') p.add_argument("myid", type=int, help='id of this node, in 0..n-1') p.add_argument("--use", nargs=argparse.REMAINDER, help='zero or more "--use node_argv" strings') args = p.parse_args(argv[1:]) print("create node from args:", args) if args.use == None: print("Using node from module sesf.msgtransfer.imp_0.node") nodemodule = importlib.import_module('sesf.msgtransfer.imp_0.node') mt_node = nodemodule.Node( ['imp_0.node', str(args.n), str(args.myid)]) else: nodemodule = importlib.import_module(args.use[0]) print("Using node from module", nodemodule) mt_node = nodemodule.Node(args.use) print("msgtransfer node", mt_node) self.mt_node = mt_node self.n = args.n self.myid = args.myid self.recv_thread = Thread(target=self.do_recvs) self.recv_thread.start() def do_recvs(self): while True: rval = self.mt_node.recv() print("user rcvd", rval, flush=True) if rval == (False,): break def do_sends(self): def random_destination(): distance = random.randint(1, self.n-1) return (self.myid + distance) % self.n for i in range(5): time.sleep(4*random.random()) # msg = [self.myid, random.randint(0, 10000)] msg = random.randint(0, 1000) print("user send", msg, flush=True) rval = self.mt_node.send(random_destination(), msg) if not rval: break
##### END class UserNode in module msgtransfer.user_node ################## if __name__ == '__main__': usernode = UserNode(sys.argv) usernode.do_sends() if usernode.myid == 0: usernode.mt_node.end() usernode.recv_thread.join() print("user node ended", flush=True) time.sleep(10)