# Source code for rwlock.servicetester

"""
Module **sesf.rwlock.servicetester** has class ``ServiceTester`` that
implements the read-write-lock servicetester. This can test an 
implementation against the rwlock service and, optionally, against
a checker with implementation-specific assertions.

For example, assuming a running Pyro4 nameserver and a running test_imp 
using ``rwlock.imp_0.imp``,

``python servicetester.py <num_users> <num_ops> --checker imp_0.checker``

tests the rwlock imp_0 implementation with `num_users` user threads, each
doing `num_ops` read or write operations, and uses ``imp_0.checker`` to
evaluate implementation-specific assertions.
"""

from sesf.rwlock.serviceparts import ServiceParts
from threading import Thread, Lock, Condition, Semaphore, get_ident
import time, random, argparse, importlib, Pyro4
from sesf.util.pyro import start_pyrodaemon

def random_delay(max_delay=1):
  """
  Sleep for a random duration in the range [0, *max_delay*) seconds.

  The duration is ``random.random() * max_delay``, so the default of 1
  reproduces the original behaviour (a pause of up to one second).

  - ``max_delay`` (number): upper bound of the pause, in seconds. It is
    passed (scaled) straight to ``time.sleep``, which rejects negative
    durations with ``ValueError``.
  """
  time.sleep(random.random() * max_delay)

class ServiceTester():
  """
  Read-write-lock servicetester to test an implementation, driving it with
  *num_users* user threads each doing *num_ops* (randomly chosen) read or
  write operations, and optionally using *checkermodule.Checker*.
  It maintains the following variables:

  - ``num_users`` (int): number of user threads in testing
  - ``num_ops`` (int): number of read or write operations per user thread.
  - ``checker`` (obj): instance of ``checker.Checker``, or ``None`` when no
    checker module was given.
  - ``pyrodaemon``: Pyro4 daemon `sesf.rwlock.servicetester`. Initialised to
    ``None`` here; presumably assigned by ``start_pyrodaemon`` (defined in
    ``sesf.util.pyro``, not visible in this module).
  - ``serviceparts`` (obj): read-write-lock serviceparts object
  - ``impproxies``: list of Pyro proxies to the implementation.
  - ``lck``, ``cond``: lock and condition variable for atomicity of
    call and return parts.
  - ``awaitend`` (semaphore): main thread waits here for test to be over.

  Functions: j in 0..num_users-1

  - ``do_acqr(j)``: do a valid impproxies[j].acqr call and validate return
  - ``do_relr(j)``: do a valid impproxies[j].relr call and validate return
  - ``do_acqw(j)``: do a valid impproxies[j].acqw call and validate return
  - ``do_relw(j)``: do a valid impproxies[j].relw call and validate return
  - ``do_ops(j)``: issue *num_ops* read or write (randomly chosen) operations
  - ``do_test()``: start ``do_ops(0)``, ..., ``do_ops(num_users-1)``
    in separate threads.
  - ``inform_tester(event)``: RPC-called by implementation to convey
    event to checker.
  """

  def __init__(self, num_users, num_ops, checkermodule=None):
    """
    Instantiate attributes, obtain *num_users* proxies for
    `sesf.rwlock.test_imp`, run ``start_pyrodaemon`` for this object under
    the name `sesf.rwlock.servicetester` in a new thread, call
    ``start_testimp`` on the implementation, and start ``do_test`` in a
    new (daemon) thread.

    - ``num_users`` (int): number of user threads.
    - ``num_ops`` (int): number of operations per user thread.
    - ``checkermodule`` (str or None): importable module name providing a
      ``Checker`` class; ``None`` disables the checker.
    """
    self.pyrodaemon = None
    self.serviceparts = ServiceParts()
    self.lck = Lock()
    self.cond = Condition(self.lck)
    self.awaitend = Semaphore(0)
    self.num_users = num_users
    self.num_ops = num_ops
    if checkermodule is None:
      self.checker = None
    else:
      checkermodule = importlib.import_module(checkermodule)
      self.checker = checkermodule.Checker(self)
    # NOTE(review): the original indentation of this print was lost; it is
    # placed after the if/else, so it also prints "None" without a checker.
    print(self.checker)
    # One proxy per user thread, all addressing the same registered name.
    self.impproxies = [Pyro4.Proxy("PYRONAME:" + 'sesf.rwlock.test_imp')
                       for _ in range(self.num_users)]
    print('Starting pyronet', flush=True)
    Thread(name='pyrodaemon_thread',
           target=start_pyrodaemon,
           args=(self, 'sesf.rwlock.servicetester')).start()
    self.impproxies[0].start_testimp()
    print("start_testimp returned")
    Thread(target=self.do_test, daemon=True).start()

  @Pyro4.expose
  def inform_tester(self, event):
    """RPC-called by implementation to convey event to checker."""
    if self.checker:
      # Hold the same lock as the call/return parts so the checker sees
      # events atomically with respect to serviceparts updates.
      with self.lck:
        self.checker.handle_event(event)

  def _call_and_validate(self, j, op, delay_before_call=False):
    """
    Issue a valid ``impproxies[j].<op>()`` call and validate its return.

    Steps, for *op* in ``acqr``, ``relr``, ``acqw``, ``relw``:

    1. Under ``cond``, wait until ``serviceparts.<op>_CIC()`` holds, then
       run ``serviceparts.<op>_CU()`` and wake other waiters.
    2. Call the implementation's ``<op>()`` with a random delay either
       before it (*delay_before_call* true) or after it (default).
    3. Under ``cond``, require ``serviceparts.<op>_ROC()``, then run
       ``serviceparts.<op>_RU()`` and wake other waiters.

    Raises ``Exception('<op> ROC violated')`` if the return condition
    does not hold.
    """
    parts = self.serviceparts
    ## await <op>.CIC: <op>.CU
    with self.cond:
      call_allowed = getattr(parts, op + '_CIC')
      while not call_allowed():
        self.cond.wait()
      getattr(parts, op + '_CU')()
      self.cond.notify_all()
    if delay_before_call:
      random_delay()
    ## call implementation's <op>()
    getattr(self.impproxies[j], op)()
    if not delay_before_call:
      random_delay()
    ## check <op>.ROC: <op>.RU
    with self.cond:
      if not getattr(parts, op + '_ROC')():
        raise Exception(op + ' ROC violated')
      getattr(parts, op + '_RU')()
      self.cond.notify_all()

  def do_acqr(self, j):
    """Issue a valid impproxies[j].acqr() call and validate return."""
    self._call_and_validate(j, 'acqr')

  def do_relr(self, j):
    """Issue a valid impproxies[j].relr() call and validate return."""
    self._call_and_validate(j, 'relr')

  def do_acqw(self, j):
    """Issue a valid impproxies[j].acqw() call and validate return."""
    self._call_and_validate(j, 'acqw')

  def do_relw(self, j):
    """Issue a valid impproxies[j].relw() call and validate return."""
    # relw is the one operation whose random delay precedes the call.
    self._call_and_validate(j, 'relw', delay_before_call=True)

  def do_ops(self, j):
    """
    Issue *num_ops* read or write (randomly chosen) operations via
    impproxies[j].

    ``awaitend`` is released exactly once when this user finishes, even if
    an operation raises, so the thread waiting on ``awaitend`` is never
    left blocked; the exception still propagates out of this thread.
    """
    try:
      print('starting user', get_ident())
      for _ in range(self.num_ops):
        random_delay()
        # randomly select read or write operation
        if random.randint(0, 1):
          self.do_acqr(j)
          print('read', 'wset =', self.serviceparts.wset,
                ' rset =', self.serviceparts.rset, flush=True)
          random_delay()
          self.do_relr(j)
        else:
          self.do_acqw(j)
          print('write:', 'wset =', self.serviceparts.wset,
                ' rset =', self.serviceparts.rset, flush=True)
          random_delay()
          self.do_relw(j)
    finally:
      self.awaitend.release()

  def do_test(self):
    """Start do_ops(0), ..., do_ops(num_users-1) in separate threads."""
    print('starting', self.num_users, 'users, each doing',
          self.num_ops, 'read or write operations')
    for j in range(self.num_users):
      Thread(target=self.do_ops, args=(j,)).start()
### End class ServiceTester ###########################

# Script entry point: build a tester from the command line, wait for every
# user thread to finish, then shut the Pyro machinery down.
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument("num_users", type=int, help="number of user threads")
  parser.add_argument("num_ops", type=int,
                      help="number of read or write operations per user")
  parser.add_argument("--checker", type=str, help="checker module")
  args = parser.parse_args()
  tester = ServiceTester(args.num_users, args.num_ops, args.checker)
  # Each user thread releases awaitend once when it is done.
  for _ in range(tester.num_users):
    tester.awaitend.acquire()
  print("ENDING_PYRONET", flush=True)
  tester.impproxies[0].end_pyrodaemon()
  # pyrodaemon starts out as None in ServiceTester.__init__; only shut it
  # down if something has assigned a daemon to it.
  if tester.pyrodaemon is not None:
    tester.pyrodaemon.shutdown()