Source code for sesf.objtransfer.servicetester

"""
Module **sesf.objtransfer.servicetester** has class ``ServiceTester``
that implements the objtransfer servicetester.
This can test a distributed implementation of the objtransfer service.
For example, to test a distributed system of N imp_0 implementation nodes:

- Start a Pyro4 nameserver if one is not already running:
  run ``python -m Pyro4.naming &``.

- Run ``python test_node.py N j k --use imp_0.node N j k``
  for ``j`` (node's id) ranging over 0, 1, ..., N-1
  and ``k`` (initial owner) some fixed value in 0, 1, ..., N-1. 
  (Or run ``python start_nodes.py N test_node.py --use imp_0.node k``.)

- Run ``python servicetester.py N k`` to test only against the service.

- Run ``python servicetester.py N k --checker imp_0.checker N k`` to
  test against the service and the implementation-specific assertions
  in ``imp_0.checker.Checker``.

- Or run ``python servicetester.py N --log logfile--checker imp_0.checker N``
  to also generate an event log in file ``logfile``, which can later be 
  played by running ``python imp0/checker_art.py logfile``.
"""

from sesf.objtransfer.serviceparts import ServiceParts
from sesf.util.service import (do_service_output, do_service_input)
from sesf.util.pyro import (start_pyrodaemon, end_testerpyronet)
from threading import Thread, Lock, Condition, Semaphore
import Pyro4, importlib, argparse, time, pickle, random


def random_delay():
  """Increase the multiplier (currently 1) for delays of higher variance."""
  time.sleep(random.random()*1)


[docs]class ServiceTester(): def __init__(self, num_nodes, initial_owner, checker_argv, log, min_acqs_todo=4): """ - ``num_nodes``: number of nodes - ``initial_owner``: id of node whose user holds object initially. - ``checker_argv``: checker module and args; None if no checker. - ``log``: True if event logging enabled (to file arthistory.pickle). - ``min_num_acqs``: minimum number of acqs done by each node. """ self.nodeproxies = {} self.daemon = None self.num_nodes = num_nodes self.initial_owner = initial_owner self.enderid = random.randint(0, num_nodes-1) self.ended = False # modify below to vary number of acqs done by each node self.acqs_todo = min_acqs_todo + random.randint(1, 5) self.nthreads = [0 for j in range(num_nodes)] self.serviceparts = ServiceParts(num_nodes, initial_owner) # for atomic update and synchronization of service state self.lck = Lock() self.cond = Condition(self.lck) # for termination of servicetester self.awaitend = Semaphore(0) self.logfile = log self.eventlog = [('service', 'init', [num_nodes, initial_owner])] if log else None print("checker_argv:", checker_argv, flush=True) if checker_argv == None: self.checker = None else: checkermodule = importlib.import_module(checker_argv[0]) self.checker = checkermodule.create_checker_from_argv( self, checker_argv[1:]) print(self.checker, flush=True) print('Starting pyronet', flush=True) Thread(name='daemon_thread', target=start_pyrodaemon, args=(self, 'objtransfer_tester')).start() nodes_start_order = list(range(num_nodes)) random.shuffle(nodes_start_order) for j in nodes_start_order: Thread(target=self.start_testnode, args=(j,), daemon=True).start() print("started tester", "num_nodes:", self.num_nodes, "initial_owner", self.initial_owner, "ender", self.enderid, "acqs_todo", self.acqs_todo, flush=True) @Pyro4.expose def inform_tester(self, event): """rpc-called by a node to convey event to checker.""" print("inform_tester", event, flush=True) if self.checker: with self.lck: # self.log_event(event) self.checker.handle_event(event) def log_event(self, event): if self.eventlog: self.eventlog.append(event) def log_event_lock(self, event): if self.eventlog: with self.lck: self.eventlog.append(event) def start_testnode(self, j): random_delay() self.nodeproxies[j] = [ Pyro4.Proxy("PYRONAME:" + 'ot_test_node_' + str(j)) for h in range(2)] self.log_event_lock(('service', 'node', 'starting', j)) self.nodeproxies[j][0].start_testnode() print("START_TESTNODE", j, "returned", flush=True) time.sleep(random.random()*3) self.log_event_lock(('service', 'node', 'started', j)) Thread(name="do_acq_rel_end_" + str(j), target=self.do_acq_rel_end_loop, args=(j,), daemon=True).start() Thread(name="do_recvreq_" + str(j), target=self.do_recvreq_loop, args=(j,), daemon=True).start() # to detect when proxy-invoking threads have ended with self.lck: self.nthreads[j] = 2 def do_acq(self, j): x = self.serviceparts args = do_service_output(j, x.acq_COC, x.acq_CU, self.cond) random_delay() rval = self.nodeproxies[j][0].acq(*args) random_delay() do_service_input(j, x.acq_RIC, x.acq_RU, (rval,), self.cond) return rval def do_rel(self, j): x = self.serviceparts args = do_service_output(j, x.rel_COC, x.rel_CU, self.cond) random_delay() rval = self.nodeproxies[j][0].rel(*args) random_delay() do_service_input(j, x.rel_RIC, x.rel_RU, (rval,), self.cond) return rval def do_recvreq(self, j): x = self.serviceparts args = do_service_output(j, x.recvreq_COC, x.recvreq_CU, self.cond) random_delay() rval = self.nodeproxies[j][1].recvreq() random_delay() do_service_input(j, x.recvreq_RIC, x.recvreq_RU, (rval,), self.cond) return rval def do_end(self, j): x = self.serviceparts args = do_service_output(j, x.end_COC, x.end_CU, self.cond) random_delay() rval = self.nodeproxies[j][0].end() random_delay() do_service_input(j, x.end_RIC, x.end_RU, (rval,), self.cond) return rval def do_acq_rel_end_loop(self, j): if j == self.initial_owner: random_delay() # do_req blocks if a request has not arrived self.do_rel(j) # acq-awaitreq-rel loop while True: random_delay() # acquire object rval = self.do_acq(j) if rval == (False,): break oval = rval[1] random_delay() if j == self.enderid: self.acqs_todo -= 1 if self.acqs_todo <= 0: self.do_end(j) with self.lck: self.ended = True break self.do_rel(j) self.check_node_termination(j) def do_recvreq_loop(self, j): while True: rval = self.do_recvreq(j) if not rval: break self.check_node_termination(j) def check_node_termination(self, j): with self.lck: self.nthreads[j] -= 1 # print("check_node_termination: nthreads", self.nthreads, flush=True) if self.nthreads[j] == 0: self.log_event(('service', 'node', 'ended', j)) if sum(self.nthreads) == 0: self.log_event('END') self.awaitend.release() time.sleep(2)
########## end class ServiceTester ############# if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--log', type=str, help='logfile') parser.add_argument("num_nodes", type=int, help="number of ot_nodes") parser.add_argument("initial_owner", type=int, help="id of the initial owner's node") parser.add_argument("--checker", nargs=argparse.REMAINDER, help="checker module and args") args = parser.parse_args() print("Starting objtransfer servicetester:", " num_nodes:", args.num_nodes, " initial_owner:", args.initial_owner, " logging:", args.log, " checker:", args.checker, flush=True) servicetester = ServiceTester( args.num_nodes, args.initial_owner, args.checker, args.log) servicetester.awaitend.acquire() time.sleep(1) if servicetester.logfile: with open(servicetester.logfile, 'wb') as outfile: pickle.dump(servicetester.eventlog, outfile) end_testerpyronet(servicetester, args.num_nodes) time.sleep(5)