Source code for msgtransfer2.servicetester

"""
Module **sesf.msgtransfer2.servicetester** has class ``ServiceTester``
that implements the msgtransfer2 servicetester.
This can test a distributed implementation of the msgtransfer2 service.
For example, to test a distributed system of two imp_0 nodes:

- Start a Pyro4 nameserver if one is not already running:
  run ``python -m Pyro4.naming &``.

- Run ``python test_node.py 2 0 --use imp_0.node 2 0``
  and ``python test_node.py 2 1 --use imp_0.node 2 1``.
  (Or ``python start_nodes.py 2 test_node.py --use imp_0.node``.)

- Run ``python servicetester.py 2 --checker imp_0.checker 2`` to
  test against the service and the implementation-specific assertions
  in ``imp_0.checker.Checker``.
  (Or ``python servicetester.py 2`` to test only against the service.)
"""

from sesf.msgtransfer2.serviceparts import ServiceParts
import Pyro4, argparse, time, random, importlib
from sesf.util.service import (do_service_input, do_service_output)
from sesf.util.pyro import (start_pyrodaemon, end_testerpyronet)
from threading import Thread, Lock, Condition, Semaphore



def random_delay():
  time.sleep(random.random()*1)

[docs]class ServiceTester(): def __init__(self, num_nodes, checker_argv, test_duration=3): self.nodeproxies = {} self.pyrodaemon = None # num_nodes must be 2; their ids are 0 and 1 self.num_nodes = 2 self.enderid = random.randint(0, 1) self.nthreads = [2 if j != self.enderid else 3 for j in range(self.num_nodes)] self.test_duration = test_duration self.serviceparts = ServiceParts() self.lck = Lock() self.cond = Condition(self.lck) self.awaitend = Semaphore(0) # initialize checker if needed if checker_argv == None: self.checker = None else: checkermodule = importlib.import_module(checker_argv[0]) self.checker = checkermodule.Checker(self, checker_argv) print(self.checker, flush=True) print('Starting pyronet', flush=True) Thread(target=start_pyrodaemon, args=(self, 'msgtransfer2_tester')).start() nodes_start_order = [0, 1] random.shuffle(nodes_start_order) for j in nodes_start_order: Thread(target=self.start_testnode, args=(j,), daemon=True).start() @Pyro4.expose def inform_tester(self, event): # rpc-called by a node to convey event to imptester if self.checker: with self.lck: self.checker.handle_event(event) def start_testnode(self, j): random_delay() self.nodeproxies[j] = [ Pyro4.Proxy("PYRONAME:" + 'msgtransfer2_testnode_' + str(j)) for h in range(3)] self.nodeproxies[j][0].start_testnode() print("start_testnode", j, "returned", flush=True) Thread( name='send_'+str(j), target=self.do_send_repeatedly, args=(j,), daemon=True).start() Thread( name='recv_'+str(j), target=self.do_recv_repeatedly, args=(j,), daemon=True).start() if self.enderid == j: self.do_end(j) self.check_node_termination(j) print("ender finished notifying", j, flush=True) def do_send(self, j): x = self.serviceparts call_args = do_service_output(j, x.send_COC, x.send_CU, self.cond) print(j, "do_send call(", *call_args, ")", flush=True) # DEBUG random_delay() i_rval = self.nodeproxies[j][0].send(*call_args) random_delay() # print(j, "do_send return", i_rval, flush=True) # DEBUG do_service_input(j, x.send_RIC, x.send_RU, (i_rval,), self.cond) return i_rval def do_send_repeatedly(self, j): while True: # for n in range(4): i_rval = self.do_send(j) if i_rval == False: break self.check_node_termination(j) def do_recv(self, j): x = self.serviceparts random_delay() call_args = do_service_output(j, x.recv_COC, x.recv_CU, self.cond) # print(j, "do_recv call(", *call_args, ")", flush=True) # DEBUG i_rval = self.nodeproxies[j][1].recv(*call_args) # i_rval: (False,) | (True, msg) print(j, "do_recv return", i_rval, flush=True) # DEBUG random_delay() do_service_input(j, x.recv_RIC, x.recv_RU, (i_rval,), self.cond) return i_rval def do_recv_repeatedly(self, j): """Repeatedly call node_j's recv and update global snapshot. Stop when recv returns None (ie, node_j insocket has closed). """ while True: rval = self.do_recv(j) if rval == (False,): break self.check_node_termination(j) def do_end(self, j): time.sleep(self.test_duration) x = self.serviceparts call_args = do_service_output(j, x.end_COC, x.end_CU, self.cond) print(j, "do_end call(", *call_args, ")", flush=True) # DEBUG i_rval = self.nodeproxies[j][2].end() # can omit below coz end(j)'s retcond and retupdate are vacuous do_service_input(j, x.end_RIC, x.end_RU, (i_rval,), self.cond) def check_node_termination(self, j): with self.lck: self.nthreads[j] -= 1 # print("check node termination: nthreads", self.nthreads, # flush=True) if sum(self.nthreads) == 0: self.awaitend.release() time.sleep(2) def print_sent_nrcvd(self): """Print the sent and nrcvd of both nodes.""" print("__________________________________________________", flush=True) print("Printing sent and nrcvd", flush=True) print(" sent[0]:", self.serviceparts.sent[0], flush=True) print(" nrcvd[1]", self.serviceparts.nrcvd[1], flush=True) print(" -------------------", flush=True) print(" sent[1]:", self.serviceparts.sent[1], flush=True) print(" nrcvd[0]", self.serviceparts.nrcvd[0], flush=True) print("__________________________________________________", flush=True)
########## end class ServiceTester ############# if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("num_nodes", type=int, help="number of nodes") parser.add_argument("--checker", nargs=argparse.REMAINDER, help="checker module and args") args = parser.parse_args() if args.num_nodes != 2: print("changing number of nodes to 2", flush=True) print("Starting msgtransfer2.servicetester with checker", args.checker, flush=True) tester = ServiceTester(args.num_nodes, args.checker, 4) # import pdb; pdb.set_trace() tester.awaitend.acquire() time.sleep(1) end_testerpyronet(tester, tester.num_nodes) tester.print_sent_nrcvd() time.sleep(1)