Source code for sesf.msgtransfer.user_node
"""
Module **sesf.msgtransfer.user_node** has class ``UserNode`` that
implements a user node that starts a given msgtransfer node.
If the peer user nodes are also started, the collection of user nodes
exercise the msgtransfer service provided by the msgtransfer nodes.
For example, to start an N-process system using imp_0 msgtransfer nodes,
run the N shell commands ``python user_node.py N j --use imp_0.node N j``
where ``j`` ranges over 0, 1, ..., N-1.
(Or run ``python ../start_nodes.py N user_node.py --use imp_0.node``.)
The user nodes use the service provided by the two imp_0 msgtransfer nodes
to exchange a few messages, then end the msgtransfer service and the
N processes.
The user nodes can also make use of a running msgtransfer *servicemodel*,
instead of a msgtransfer implementation. Simply replace ``imp_0.node``
by ``service_node`` in the above commands.
"""
from threading import Thread
import importlib, random, time, sys, argparse
[docs]class UserNode():
def __init__(self, argv):
print('starting msgtransfer.user_node', argv, flush=True)
# import pdb; pdb.set_trace()
p = argparse.ArgumentParser(
usage='python user_node.py [-h] n myid --use node_argv',
description='create UserNode(n, myid) using node_argv/default node')
p.add_argument("n", type=int, help='number of nodes')
p.add_argument("myid", type=int, help='id of this node, in 0..n-1')
p.add_argument("--use", nargs=argparse.REMAINDER,
help='zero or more "--use node_argv" strings')
args = p.parse_args(argv[1:])
print("create node from args:", args)
if args.use == None:
print("Using node from module sesf.msgtransfer.imp_0.node")
nodemodule = importlib.import_module('sesf.msgtransfer.imp_0.node')
mt_node = nodemodule.Node(
['imp_0.node', str(args.n), str(args.myid)])
else:
nodemodule = importlib.import_module(args.use[0])
print("Using node from module", nodemodule)
mt_node = nodemodule.Node(args.use)
print("msgtransfer node", mt_node)
self.mt_node = mt_node
self.n = args.n
self.myid = args.myid
self.recv_thread = Thread(target=self.do_recvs)
self.recv_thread.start()
def do_recvs(self):
while True:
rval = self.mt_node.recv()
print("user rcvd", rval, flush=True)
if rval == (False,):
break
def do_sends(self):
def random_destination():
distance = random.randint(1, self.n-1)
return (self.myid + distance) % self.n
for i in range(5):
time.sleep(4*random.random())
# msg = [self.myid, random.randint(0, 10000)]
msg = random.randint(0, 1000)
print("user send", msg, flush=True)
rval = self.mt_node.send(random_destination(), msg)
if not rval:
break
##### END class UserNode in module msgtransfer.user_node ##################
if __name__ == '__main__':
usernode = UserNode(sys.argv)
usernode.do_sends()
if usernode.myid == 0:
usernode.mt_node.end()
usernode.recv_thread.join()
print("user node ended", flush=True)
time.sleep(10)