"""
Module **sesf.objtransfer.servicetester** has class ``ServiceTester``
that implements the objtransfer servicetester.
This can test a distributed implementation of the objtransfer service.
For example, to test a distributed system of N imp_0 implementation nodes:
- Start a Pyro4 nameserver if one is not already running:
run ``python -m Pyro4.naming &``.
- Run ``python test_node.py N j k --use imp_0.node N j k``
for ``j`` (node's id) ranging over 0, 1, ..., N-1
and ``k`` (initial owner) some fixed value in 0, 1, ..., N-1.
(Or run ``python start_nodes.py N test_node.py --use imp_0.node k``.)
- Run ``python servicetester.py N k`` to test only against the service.
- Run ``python servicetester.py N k --checker imp_0.checker N k`` to
test against the service and the implementation-specific assertions
in ``imp_0.checker.Checker``.
- Or run ``python servicetester.py N k --log logfile --checker imp_0.checker N k``
to also generate an event log in file ``logfile``, which can later be
played by running ``python imp0/checker_art.py logfile``.
"""
from sesf.objtransfer.serviceparts import ServiceParts
from sesf.util.service import (do_service_output, do_service_input)
from sesf.util.pyro import (start_pyrodaemon, end_testerpyronet)
from threading import Thread, Lock, Condition, Semaphore
import Pyro4, importlib, argparse, time, pickle, random
def random_delay(multiplier=1):
    """Sleep for a random time drawn uniformly from [0, multiplier) seconds.

    - ``multiplier``: upper bound (in seconds) of the delay; pass a larger
      value for delays of higher variance. Defaults to 1, the value that
      was previously hard-coded.
    """
    time.sleep(random.random() * multiplier)
class ServiceTester:
    """Tester for a distributed implementation of the objtransfer service.

    For every node the tester creates two Pyro4 proxies and two threads:
    one thread issues acq/rel (and, for one randomly chosen "ender" node,
    end) calls, the other issues recvreq calls.  Every call is bracketed
    by ``do_service_output`` / ``do_service_input`` on the shared
    ``ServiceParts`` instance.  ``awaitend`` is released once the
    per-node thread counters sum to zero.
    """

    def __init__(self, num_nodes, initial_owner, checker_argv, log,
                 min_acqs_todo=4):
        """
        - ``num_nodes``: number of nodes.
        - ``initial_owner``: id of node whose user holds object initially.
        - ``checker_argv``: checker module and args; None if no checker.
        - ``log``: name of the file the event log is pickled to by the
          script entry point; a falsy value disables event logging.
        - ``min_acqs_todo``: the ender node calls end after
          ``min_acqs_todo`` plus a random 1..5 acquisitions.
        """
        # node id -> [proxy for acq/rel/end calls, proxy for recvreq calls]
        self.nodeproxies = {}
        # NOTE(review): presumably assigned by start_pyrodaemon -- confirm.
        self.daemon = None
        self.num_nodes = num_nodes
        self.initial_owner = initial_owner
        # id of the single node that will issue the end call
        self.enderid = random.randint(0, num_nodes-1)
        self.ended = False
        # modify below to vary number of acqs done by the ender node
        self.acqs_todo = min_acqs_todo + random.randint(1, 5)
        # nthreads[j]: number of live proxy-invoking threads of node j
        self.nthreads = [0] * num_nodes
        self.serviceparts = ServiceParts(num_nodes, initial_owner)

        # for atomic update and synchronization of service state
        self.lck = Lock()
        self.cond = Condition(self.lck)
        # for termination of servicetester
        self.awaitend = Semaphore(0)

        self.logfile = log
        self.eventlog = [('service', 'init',
                          [num_nodes, initial_owner])] if log else None

        print("checker_argv:", checker_argv, flush=True)
        if checker_argv is None:
            self.checker = None
        else:
            # checker_argv[0] names the module; the rest are its arguments
            checkermodule = importlib.import_module(checker_argv[0])
            self.checker = checkermodule.create_checker_from_argv(
                self, checker_argv[1:])
        print(self.checker, flush=True)

        print('Starting pyronet', flush=True)
        Thread(name='daemon_thread', target=start_pyrodaemon,
               args=(self, 'objtransfer_tester')).start()

        # start the nodes in a random order, each in its own thread
        nodes_start_order = list(range(num_nodes))
        random.shuffle(nodes_start_order)
        for j in nodes_start_order:
            Thread(target=self.start_testnode, args=(j,), daemon=True).start()
        print("started tester", "num_nodes:", self.num_nodes,
              "initial_owner", self.initial_owner,
              "ender", self.enderid, "acqs_todo", self.acqs_todo, flush=True)

    @Pyro4.expose
    def inform_tester(self, event):
        """rpc-called by a node to convey event to checker."""
        print("inform_tester", event, flush=True)
        if self.checker:
            # the checker handles one event at a time, under the tester lock
            with self.lck:
                self.checker.handle_event(event)

    def log_event(self, event):
        """Append ``event`` to the event log, if logging is enabled.

        Does not acquire ``self.lck``; see ``log_event_lock`` for the
        variant that does.
        """
        if self.eventlog is not None:
            self.eventlog.append(event)

    def log_event_lock(self, event):
        """Like ``log_event`` but holds ``self.lck`` while appending."""
        if self.eventlog is not None:
            with self.lck:
                self.log_event(event)

    def start_testnode(self, j):
        """Start test node ``j`` and launch its two call-issuing threads."""
        random_delay()
        # Two proxies per node: index 0 serves acq/rel/end calls, index 1
        # serves recvreq calls (presumably so that a blocked recvreq does
        # not hold up the other calls -- confirm against the node code).
        self.nodeproxies[j] = [
            Pyro4.Proxy("PYRONAME:" + 'ot_test_node_' + str(j))
            for _ in range(2)]
        self.log_event_lock(('service', 'node', 'starting', j))
        self.nodeproxies[j][0].start_testnode()
        print("START_TESTNODE", j, "returned", flush=True)
        time.sleep(random.random()*3)
        self.log_event_lock(('service', 'node', 'started', j))
        # To detect when proxy-invoking threads have ended.  The count is
        # set BEFORE the threads start: a thread that finished before the
        # assignment would otherwise have its decrement overwritten, and
        # the count for node j would never return to zero.
        with self.lck:
            self.nthreads[j] = 2
        Thread(name="do_acq_rel_end_" + str(j),
               target=self.do_acq_rel_end_loop, args=(j,),
               daemon=True).start()
        Thread(name="do_recvreq_" + str(j),
               target=self.do_recvreq_loop, args=(j,),
               daemon=True).start()

    def do_acq(self, j):
        """Issue an acq call at node ``j`` and return the node's reply.

        The call arguments come from ``do_service_output``; the reply is
        handed to ``do_service_input`` before being returned.
        """
        x = self.serviceparts
        args = do_service_output(j, x.acq_COC, x.acq_CU, self.cond)
        random_delay()
        rval = self.nodeproxies[j][0].acq(*args)
        random_delay()
        do_service_input(j, x.acq_RIC, x.acq_RU, (rval,), self.cond)
        return rval

    def do_rel(self, j):
        """Issue a rel call at node ``j`` and return the node's reply.

        The call arguments come from ``do_service_output``; the reply is
        handed to ``do_service_input`` before being returned.
        """
        x = self.serviceparts
        args = do_service_output(j, x.rel_COC, x.rel_CU, self.cond)
        random_delay()
        rval = self.nodeproxies[j][0].rel(*args)
        random_delay()
        do_service_input(j, x.rel_RIC, x.rel_RU, (rval,), self.cond)
        return rval

    def do_recvreq(self, j):
        """Issue a recvreq call at node ``j`` and return the node's reply.

        Uses the node's second proxy.  The value returned by
        ``do_service_output`` is not forwarded: recvreq is called without
        arguments.
        """
        x = self.serviceparts
        args = do_service_output(j, x.recvreq_COC, x.recvreq_CU, self.cond)
        random_delay()
        rval = self.nodeproxies[j][1].recvreq()
        random_delay()
        do_service_input(j, x.recvreq_RIC, x.recvreq_RU, (rval,), self.cond)
        return rval

    def do_end(self, j):
        """Issue an end call at node ``j`` and return the node's reply.

        The value returned by ``do_service_output`` is not forwarded: end
        is called without arguments.
        """
        x = self.serviceparts
        args = do_service_output(j, x.end_COC, x.end_CU, self.cond)
        random_delay()
        rval = self.nodeproxies[j][0].end()
        random_delay()
        do_service_input(j, x.end_RIC, x.end_RU, (rval,), self.cond)
        return rval

    def do_acq_rel_end_loop(self, j):
        """Thread body: repeatedly acquire and release the object at node ``j``.

        The initial owner starts with a release.  The loop stops when acq
        returns ``(False,)`` or, at the ender node, after ``acqs_todo``
        acquisitions, at which point end is called instead of rel.
        """
        if j == self.initial_owner:
            random_delay()
            # do_rel presumably blocks if a request has not arrived
            self.do_rel(j)
        # acq-awaitreq-rel loop
        while True:
            random_delay()
            # acquire object
            rval = self.do_acq(j)
            if rval == (False,):
                break
            # second element of the reply; not otherwise used by the tester
            oval = rval[1]
            random_delay()
            if j == self.enderid:
                # only the ender's thread touches acqs_todo
                self.acqs_todo -= 1
                if self.acqs_todo <= 0:
                    self.do_end(j)
                    with self.lck:
                        self.ended = True
                    break
            self.do_rel(j)
        self.check_node_termination(j)

    def do_recvreq_loop(self, j):
        """Thread body: call recvreq at node ``j`` until it returns a falsy value."""
        while True:
            rval = self.do_recvreq(j)
            if not rval:
                break
        self.check_node_termination(j)

    def check_node_termination(self, j):
        """Record that one thread of node ``j`` has ended.

        Logs the node as ended when its thread count reaches zero, and
        logs 'END' and releases ``awaitend`` when the counts of all nodes
        sum to zero.
        """
        with self.lck:
            self.nthreads[j] -= 1
            if self.nthreads[j] == 0:
                self.log_event(('service', 'node', 'ended', j))
            if sum(self.nthreads) == 0:
                self.log_event('END')
                self.awaitend.release()
        # pause before the calling thread exits; done outside the lock so
        # other threads are not blocked while this one sleeps
        time.sleep(2)
########## end class ServiceTester #############
if __name__ == '__main__':
    # Usage: servicetester.py [--log LOGFILE] num_nodes initial_owner
    #                         [--checker MODULE [ARGS ...]]
    parser = argparse.ArgumentParser()
    parser.add_argument('--log', type=str, help='logfile')
    parser.add_argument("num_nodes", type=int, help="number of ot_nodes")
    parser.add_argument("initial_owner", type=int,
                        help="id of the initial owner's node")
    # REMAINDER: everything after --checker is handed to the checker module
    parser.add_argument("--checker", nargs=argparse.REMAINDER,
                        help="checker module and args")
    args = parser.parse_args()

    # Reject values the tester cannot work with instead of failing later:
    # node ids range over 0, 1, ..., num_nodes-1.
    if args.num_nodes < 1:
        parser.error("num_nodes must be at least 1")
    if not 0 <= args.initial_owner < args.num_nodes:
        parser.error("initial_owner must be in 0, 1, ..., num_nodes-1")

    print("Starting objtransfer servicetester:",
          " num_nodes:", args.num_nodes,
          " initial_owner:", args.initial_owner,
          " logging:", args.log,
          " checker:", args.checker,
          flush=True)

    servicetester = ServiceTester(
        args.num_nodes, args.initial_owner, args.checker, args.log)

    # block until every node's call-issuing threads have ended
    servicetester.awaitend.acquire()
    time.sleep(1)

    # save the event log, if one was requested with --log
    if servicetester.logfile:
        with open(servicetester.logfile, 'wb') as outfile:
            pickle.dump(servicetester.eventlog, outfile)

    end_testerpyronet(servicetester, args.num_nodes)
    time.sleep(5)