Source code for msgtransfer.test_node

"""
Module **sesf.msgtransfer.test_node** has class TestNode that implements
an RPC wrapper for a node of an implementation of msgtransfer service.
Run ``python test_node.py N j --use imp_0.imp N j`` to start a 
process with a TestNode object wrapping an ``imp_0.imp`` implementation node.
"""

import Pyro4, importlib, argparse, time, sys
from sesf.util.pyro import (start_pyrodaemon)
from threading import Thread


[docs]@Pyro4.expose class TestNode(): # msgtransfer.test_node.TestNode def __init__(self, argv): parser = argparse.ArgumentParser() parser.add_argument("n", type=int) parser.add_argument("myid", type=int) parser.add_argument("--use", nargs=argparse.REMAINDER, help="one or more --use node_descriptors") args = parser.parse_args() print("args", args, flush=True) # args.n not needed for test_node self.myid = args.myid self.use = args.use self.pyrodaemon = None self.testerproxy = None self.mt_node = None self.pyrodaemon_thread = Thread( target=start_pyrodaemon, args=(self, "sesf.msgtransfer.test_node_"+str(self.myid))) self.pyrodaemon_thread.start() def start_testnode(self): stp = Pyro4.Proxy("PYRONAME:sesf.msgtransfer.servicetester") print("got servicetester proxy", stp, flush=True) imp_nodemodule = importlib.import_module(self.use[0]) print("creating_node_from_module", imp_nodemodule, flush=True) self.mt_node = imp_nodemodule.Node(self.use, stp) return def send(self, dst, msg): print("send call", dst, msg, flush=True) # return self.mt_node.send(dst, msg) x = self.mt_node.send(dst, msg) print("send ret", x, flush=True) return x def recv(self): return self.mt_node.recv() def end(self): # import pdb; pdb.set_trace() return self.mt_node.end() @Pyro4.oneway def end_pyrodaemon(self): self.pyrodaemon.shutdown()
##### END class TestNode of module sesf.msgtransfer.test_node ##### if __name__ == '__main__': testnode = TestNode(sys.argv) testnode.pyrodaemon_thread.join() time.sleep(10)