# Source code for sesf.msgtransfer.servicetester

"""
Module **sesf.msgtransfer.servicetester** has class ``ServiceTester``
that implements the msgtransfer servicetester.
This can test a distributed implementation of the msgtransfer service.
For example, to test a distributed system of N imp_0 implementation nodes:

- Start a Pyro4 nameserver if one is not already running:
  run ``python -m Pyro4.naming &``.

- Run ``python test_node.py N j --use imp_0.node N j``
  for ``j`` ranging over 0, 1, ..., N-1.
  (Or run ``python start_nodes.py N test_node.py --use imp_0.node``.)

- Run ``python servicetester.py N`` to test only against the service.

- Or run ``python servicetester.py N --checker imp_0.checker N`` to
  test against the service and the implementation-specific assertions
  in ``imp_0.checker.Checker``.

- Or run
  ``python servicetester.py N --log <logfile> --checker imp_0.checker N``
  to also generate an event log in file ``<logfile>``, which
  can later be played by running ``python logplayer.py <logfile>``.

"""

### There is code for a command-line "--imp" option to specify the 
### implementation to test. But I'm not sure of this, so I've commented
### it out using "##IMP"

from sesf.msgtransfer.serviceparts import ServiceParts
##IMP from sesf.start_nodes import start_nodes

# general imports
import importlib, argparse, time, Pyro4, sys
from sesf.util.service import (do_service_output, do_service_input)
from sesf.util.pyro import (end_testerpyronet, start_pyrodaemon)
from threading import Thread, Lock, Condition, Semaphore
from random import randint, shuffle
import pickle


class ServiceTester():
    """Tester for a distributed implementation of the msgtransfer service.

    For every node j the tester opens Pyro4 proxies to
    ``sesf.msgtransfer.test_node_<j>`` and drives the node's ``send``,
    ``recv`` and (for one randomly chosen node) ``end`` functions from
    separate threads, passing every call and return through the service
    model held in ``self.serviceparts``.
    """

    def __init__(self, argv, test_duration=0.5):
        """Parse the command line and start the tester threads.

        argv: servicetester.py num_nodes
                               [--log logfile]
                               [--checker checkermodule and any args]
        ##IMP                  [--imp test_node.py --use ...]

        test_duration: seconds that ``do_end`` sleeps before calling the
        ender node's ``end()``.

        Starts one thread running ``start_pyrodaemon`` and, in random
        order, one daemon thread per node running ``start_testnode``.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("num_nodes", type=int, help="number of nodes")
        parser.add_argument('--log', type=str, help='logfile')
        # nargs='+' (not '*'): a bare "--checker" would otherwise yield []
        # and crash below with IndexError on args.checker[0].
        parser.add_argument("--checker", nargs='+',
                            help="checker module and any args")
        ##IMP parser.add_argument("--imp", nargs=argparse.REMAINDER,
        ##IMP                     help="node_argv")
        args = parser.parse_args(argv[1:])
        print("Starting msgtransfer servicetester:",
              "num_nodes:", args.num_nodes, " logging:", args.log,
              " checker:", args.checker, flush=True)
        ##IMP print("Testing implementation:", args.imp)
        ##IMP if args.imp:
        ##IMP   start_nodes(['placeholder', str(args.num_nodes)] + args.imp)
        ##IMP   time.sleep(3)

        # nodeproxies[j]: list of three Pyro4 proxies to test node j;
        # [0] is used for start_testnode/send, [1] for recv, [2] for end.
        self.nodeproxies = {}
        # NOTE(review): presumably assigned by start_pyrodaemon -- confirm.
        self.daemon = None
        self.log = args.log
        self.num_nodes = args.num_nodes
        # Node whose end() is called by the tester.
        self.enderid = randint(0, self.num_nodes-1)
        # nthreads[j]: tester threads of node j that have not yet called
        # check_node_termination: the send and recv threads, plus the
        # start_testnode thread for the ender (it runs do_end).
        self.nthreads = [2 if j != self.enderid else 3
                         for j in range(self.num_nodes)]
        self.test_duration = test_duration
        self.serviceparts = ServiceParts(self.num_nodes)
        # lck guards eventlog, nthreads and checker calls; cond shares it.
        self.lck = Lock()
        self.cond = Condition(self.lck)
        # Released once every tester thread of every node has finished.
        self.awaitend = Semaphore(0)
        # Event log is kept only when --log was given.
        self.eventlog = ([('service', 'init', [str(self.num_nodes)])]
                         if args.log else None)

        self.checker = args.checker
        if self.checker is not None:
            # First --checker word is the module; the whole list is handed
            # to its Checker class.
            checkermodule = importlib.import_module(args.checker[0])
            self.checker = checkermodule.Checker(self, args.checker)
            print(self.checker, flush=True)

        print('Starting pyronet', flush=True)
        Thread(name='pyrodaemon_thread', target=start_pyrodaemon,
               args=(self, 'sesf.msgtransfer.servicetester')).start()

        nodes_start_order = list(range(self.num_nodes))
        shuffle(nodes_start_order)
        for j in nodes_start_order:
            Thread(target=self.start_testnode, args=(j,),
                   daemon=True).start()

    @Pyro4.expose
    def inform_tester(self, event):
        """Rpc-called by a node to convey ``event`` to the tester.

        The event is logged and passed to the checker, under ``self.lck``.
        NOTE(review): nothing is logged when no checker was given, even if
        --log is set -- confirm this is intended.
        """
        print("INFORM CALL", event, flush=True)
        if self.checker:
            with self.lck:
                self.log_event(event)
                self.checker.handle_event(event)

    def log_event(self, event):
        """Append ``event`` to the event log, if logging is enabled.

        Does not take ``self.lck``; used where the caller already holds it.
        """
        if self.eventlog:
            self.eventlog.append(event)

    def log_event_lock(self, event):
        """Append ``event`` to the event log under ``self.lck``, if logging
        is enabled."""
        if self.eventlog:
            with self.lck:
                self.eventlog.append(event)

    def start_testnode(self, j):
        """Start test node j, then start its send and recv threads.

        If j is the ender, this thread also calls ``do_end(j)`` and then
        accounts for its own termination.
        """
        self.nodeproxies[j] = [
            Pyro4.Proxy("PYRONAME:" + 'sesf.msgtransfer.test_node_' + str(j))
            for _ in range(3)]
        self.log_event_lock(('service', 'node', 'starting', j))
        self.nodeproxies[j][0].start_testnode()
        print("START_TESTNODE", j, "returned", flush=True)
        self.log_event_lock(('service', 'node', 'started', j))
        Thread(name='send_'+str(j), target=self.do_send_repeatedly,
               args=(j,), daemon=True).start()
        Thread(name='recv_'+str(j), target=self.do_recv_repeatedly,
               args=(j,), daemon=True).start()
        if self.enderid == j:
            self.do_end(j)
            self.check_node_termination(j)
            print("ENDER finished notifying", j, flush=True)

    def do_send(self, j):
        """Do one send call at node j and return the node's return value.

        The call arguments come from ``do_service_output`` and the return
        value is handed to ``do_service_input``.
        NOTE(review): presumably these apply the service's call and return
        parts for send -- confirm in sesf.util.service.
        """
        x = self.serviceparts
        call_args = do_service_output(j, x.send_COC, x.send_CU, self.cond)
        # print("do_send", j, call_args, flush=True)
        self.log_event_lock(('service', 'msg', 'send', j, *call_args))
        i_rval = self.nodeproxies[j][0].send(*call_args)
        do_service_input(j, x.send_RIC, x.send_RU, (i_rval,), self.cond)
        return i_rval

    def do_send_repeatedly(self, j):
        """Repeatedly call ``do_send(j)``, sleeping 0 or 1 second before
        each call, until it returns False; then record this thread's
        termination."""
        while True:   # for n in range(4):
            time.sleep(randint(0,1))
            i_rval = self.do_send(j)
            if i_rval == False:
                break
        self.check_node_termination(j)

    def do_recv(self, j):
        """Do one recv call at node j and return the (adjusted) result.

        Returns ``(False,)`` or ``(True, msg, sender_id)``.
        """
        x = self.serviceparts
        call_args = do_service_output(j, x.recv_COC, x.recv_CU, self.cond)
        i_rval = self.nodeproxies[j][1].recv(*call_args)
        # i_rval: (False,) | (True, msg), where msg is [sender_id, rest_of_msg]
        # if (True, msg), change to (True, msg, sender_id), coz sender_id
        # equals the internal-nondeterminism argument
        if i_rval[0]:
            i_rval = i_rval + (i_rval[1][0],)
        do_service_input(j, x.recv_RIC, x.recv_RU, (i_rval,), self.cond)
        # print("do_recv ret", j, i_rval, flush=True)
        if i_rval[0]:
            self.log_event_lock(
                ('service', 'msg', 'recv', i_rval[1][0], j, i_rval[1]))
        return i_rval

    def do_recv_repeatedly(self, j):
        """Repeatedly call ``do_recv(j)`` until it returns ``(False,)``;
        then record this thread's termination."""
        while True:   # for k in range(5):
            rval = self.do_recv(j)
            if rval == (False,):
                break
        self.check_node_termination(j)

    def do_end(self, j):
        """Sleep for ``self.test_duration`` seconds, then call node j's
        ``end()`` through the service."""
        time.sleep(self.test_duration)
        x = self.serviceparts
        # The returned call args are not used: end() is called without
        # arguments.
        call_args = do_service_output(j, x.end_COC, x.end_CU, self.cond)
        # print("calling msgtransfer", j, "end()", flush=True)
        i_rval = self.nodeproxies[j][2].end()
        # can omit below coz end(j)'s retcond and retupdate are vacuous
        do_service_input(j, x.end_RIC, x.end_RU, (i_rval,), self.cond)
        print("DO_END RETURNED", j, flush=True)

    def check_node_termination(self, j):
        """Record that one tester thread of node j has finished.

        Logs the node as ended when its last thread finishes, and releases
        ``self.awaitend`` when every thread of every node has finished.
        """
        with self.lck:
            self.nthreads[j] -= 1
            # print("CHECK NODE TERMINATION: nthreads", self.nthreads,
            #       flush=True)
            if self.nthreads[j] == 0:
                self.log_event(('service', 'node', 'ended', j))
            if sum(self.nthreads) == 0:
                self.awaitend.release()
                # NOTE(review): the __main__ script sleeps 1 second after
                # acquiring awaitend and then pickles eventlog, so 'END'
                # appears to be appended after the dump -- confirm intent.
                time.sleep(2)
                self.log_event('END')

    def print_sent_rcvd_histories(self):
        """Print the sent and rcvd histories for every pair of msgtransfers."""
        print("_________________________________________________________",
              flush=True)
        print("Printing send and received histories", flush=True)
        print(' ', flush=True)
        for i in range(self.num_nodes):
            for j in range(self.num_nodes):
                if i != j:
                    strij = str(i) + ',' + str(j)
                    print("sent[" + strij + "]:",
                          self.serviceparts.sent[(i,j)], flush=True)
                    print("nrcvd[" + strij + "]:",
                          self.serviceparts.nrcvd[(i,j)], flush=True)
                    print("-------------------", flush=True)
        print("_________________________________________________________",
              flush=True)
########## end class ServiceTester ##################################


if __name__ == '__main__':
    # The tester parses sys.argv itself and starts all of its threads
    # from the constructor.
    tester = ServiceTester(sys.argv)

    # Block until the tester signals that every node thread has finished,
    # then pause for a second before writing anything out.
    tester.awaitend.acquire()
    time.sleep(1)

    # Save the event log only when a --log file was given.
    log_path = tester.log
    if log_path:
        with open(log_path, 'wb') as log_file:
            pickle.dump(tester.eventlog, log_file)

    print("ENDING_PYRONET", flush=True)
    end_testerpyronet(tester, tester.num_nodes)
    tester.print_sent_rcvd_histories()