Source code for distlock2.servicetester

Module **sesf.distlock2.servicetester** has class ``ServiceTester``
that implements the distlock2 servicetester.
This can test a distributed implementation of the distlock2 service.
For example, to test a distributed system of two imp_0 nodes:

- Start a Pyro4 nameserver if one is not already running:
  run ``python -m Pyro4.naming &``.

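- Run ``python <node_script> 2 0 --use imp_0.node 2 0``
  and ``python <node_script> 2 1 --use imp_0.node 2 1``,
  where ``<node_script>`` stands for the script that starts one test node
  (the script name is missing from this copy of the text).
  (Or use the single-command launcher: ``python <launcher_script> 2 --use imp_0.node``;
  its script name, and possibly further arguments, are likewise missing here.)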

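- Run ``python servicetester.py 2 <num_acqrels>`` to test against the service,
  where ``<num_acqrels>`` is the number of acquire-release operations each
  node performs.
  Or, run ``python servicetester.py 2 <num_acqrels> --checker imp_0.checker 2`` to
  test against the service and the implementation-specific assertions
  in ``imp_0.checker.Checker`` (if present).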

from sesf.distlock2.serviceparts import ServiceParts
import importlib, argparse, time, Pyro4, sys, random
from sesf.util.service import (do_service_input, do_service_output)
from sesf.util.pyro import (start_pyrodaemon, end_testerpyronet)
from threading import Thread, Lock, Condition

def random_delay(max_seconds=1.0):
    """Sleep for a random duration in ``[0, max_seconds)`` seconds.

    Called by the tester threads between steps so that the interleaving of
    calls to the two nodes varies from run to run.

    NOTE(review): the body of this function was missing from this copy of
    the file. Sleeping for a uniformly random time of up to one second is
    an assumption based on the function name and the module's ``time`` and
    ``random`` imports -- confirm against the upstream source.

    Args:
        max_seconds: exclusive upper bound of the delay, in seconds. Must be
            non-negative; ``0`` makes the call return without delay.
    """
    time.sleep(random.random() * max_seconds)

class ServiceTester():
    """Tester that drives two distlock2 nodes through acq/rel calls.

    One thread per node obtains a Pyro4 proxy for that node, starts it, and
    then repeatedly calls ``acq()`` and ``rel()`` on it. Every call and
    return is passed through ``do_service_output`` / ``do_service_input``
    together with the matching functions of ``ServiceParts``.
    An optional implementation-specific checker receives events that nodes
    report through ``inform_tester``.
    """

    def __init__(self, num_nodes, num_acqrels, checker_argv):
        """Set up tester state, start the Pyro daemon and the node threads.

        Args:
            num_nodes: accepted for interface compatibility but not used;
                the tester always works with exactly two nodes (ids 0, 1).
            num_acqrels: number of acq-rel repetitions per node.
            checker_argv: ``None`` for no checker, otherwise a sequence
                whose first element is the name of a module providing a
                ``Checker`` class; the whole sequence is passed on to it.
        """
        self.nodeproxies = {}
        self.pyrodaemon = None
        # number of nodes must be 2; their ids are 0 and 1
        self.num_nodes = 2
        self.num_acqrels = num_acqrels
        self.serviceparts = ServiceParts()
        self.lck = Lock()
        self.cond = Condition(self.lck)
        self.threads = {}

        # initialize checker if needed
        if checker_argv is None:
            self.checker = None
        else:
            checkermodule = importlib.import_module(checker_argv[0])
            self.checker = checkermodule.Checker(self, checker_argv)
        # NOTE(review): indentation was lost in this copy of the file; this
        # print is assumed to run whether or not a checker is configured.
        print(self.checker, flush=True)

        print('Starting pyronet', flush=True)
        Thread(target=start_pyrodaemon,
               args=(self, 'sesf.distlock2.servicetester')).start()

        # Start the per-node threads in random order.
        nodes_start_order = list(range(self.num_nodes))
        random.shuffle(nodes_start_order)
        for j in nodes_start_order:
            self.threads[j] = Thread(target=self.start_testnode, args=(j,))
            self.threads[j].start()

    @Pyro4.expose
    def inform_tester(self, event):
        """Pass ``event`` to the implementation checker, if there is one.

        RPC-called by a node. The checker is invoked while holding
        ``self.lck``.
        """
        if self.checker:
            with self.lck:
                self.checker.handle_event(event)

    def start_testnode(self, j):
        """Connect to node ``j``, start it, then run its acq/rel loop.

        This is the target of the thread created for node ``j``.
        """
        random_delay()
        # The proxy is kept in a one-element list, as in the original code.
        self.nodeproxies[j] = [Pyro4.Proxy(
            "PYRONAME:" + 'sesf.distlock2.testnode_' + str(j))]
        self.nodeproxies[j][0].start_testnode()
        print("start_testnode", j, "returned", flush=True)
        self.do_acq_rel_repeatedly(j)

    def do_acq(self, j):
        """Issue one ``acq()`` call to node ``j``.

        The call step goes through ``do_service_output`` with the service's
        ``acq_COC`` / ``acq_CU``, and the return step through
        ``do_service_input`` with ``acq_RIC`` / ``acq_RU`` (presumably the
        call condition/update and return condition/update of the service
        model -- confirm in ``sesf.util.service``).
        """
        x = self.serviceparts
        do_service_output(j, x.acq_COC, x.acq_CU, self.cond)
        random_delay()
        self.nodeproxies[j][0].acq()
        random_delay()
        do_service_input(j, x.acq_RIC, x.acq_RU, (), self.cond)

    def do_rel(self, j):
        """Issue one ``rel()`` call to node ``j``.

        Mirrors ``do_acq`` using the service's ``rel_COC`` / ``rel_CU`` for
        the call step and ``rel_RIC`` / ``rel_RU`` for the return step.
        """
        x = self.serviceparts
        do_service_output(j, x.rel_COC, x.rel_CU, self.cond)
        random_delay()
        self.nodeproxies[j][0].rel()
        random_delay()
        do_service_input(j, x.rel_RIC, x.rel_RU, (), self.cond)

    def do_acq_rel_repeatedly(self, j):
        """Call do_acq(j) and do_rel(j) for num_acqrels repetitions."""
        for i in range(self.num_acqrels):
            random_delay()
            print(j, "do_acq call()", i, flush=True)  # DEBUG
            self.do_acq(j)
            print(j, "do_rel call()", i, flush=True)  # DEBUG
            self.do_rel(j)
        print(" finished do_acq_rel_repeatedly", j, flush=True)

    def end_test(self):
        """Wait for threads executing do_acq_rel_repeatedly(j), j = 0,1,
        to end. Call end() of a randomly chosen node.
        """
        for j in range(self.num_nodes):
            self.threads[j].join()
        # Pick the node to end from the actual node count (always 2 here).
        endj = random.randrange(self.num_nodes)
        print(endj, "end call()", flush=True)  # DEBUG
        self.nodeproxies[endj][0].end()
########## end class ServiceTester #############


if __name__ == '__main__':
    # Command line: num_nodes num_acqrels [--checker MODULE [ARGS ...]]
    cli = argparse.ArgumentParser()
    for arg_name, arg_help in (
            ("num_nodes", "number of nodes; must be 2"),
            ("num_acqrels", "number of acq-rel ops")):
        cli.add_argument(arg_name, type=int, help=arg_help)
    # Everything after --checker is collected and handed to the checker.
    cli.add_argument(
        "--checker",
        nargs=argparse.REMAINDER,
        help="checker module and args")
    opts = cli.parse_args()

    # Only announce the override here; ServiceTester is still constructed
    # with the value given on the command line.
    if not opts.num_nodes == 2:
        print("changing number of nodes to 2", flush=True)

    tester = ServiceTester(opts.num_nodes, opts.num_acqrels, opts.checker)
    tester.end_test()
    end_testerpyronet(tester, tester.num_nodes)
    # NOTE(review): presumably leaves time for the Pyro network to wind
    # down before the process exits -- confirm.
    time.sleep(10)