# Source code for sesf.objtransfer.imp_dpr.checker

# sesf module: objtransfer.imp_distpathreversal.checker

from threading import Thread, Semaphore
import argparse, time
# from sesf.objtransfer.imp_distpathreversal.checker_art import CheckerArt

def myprint(*values):
  """Print *values* like the built-in print, flushing the stream right away."""
  print(*values, flush=True)

def create_checker_from_argv(tester, argv):
  """Build a Checker from a list of command-line style arguments.

  Args:
    tester: passed through unchanged as the Checker's first argument.
    argv: list of argument strings, in order: n (number of nodes) and
      initial_owner; both are parsed as integers.

  Returns:
    Checker(tester, n, initial_owner).
  """
  cli = argparse.ArgumentParser()
  positionals = (
    ("n", "number of nodes"),
    ("initial_owner", "initial owner"),
  )
  for arg_name, arg_help in positionals:
    cli.add_argument(arg_name, type=int, help=arg_help)
  parsed = cli.parse_args(argv)
  return Checker(tester, parsed.n, parsed.initial_owner)


class Checker:
  """Maintain the global state of the distributed path-reversal system.

  Provides a function that takes a given event at a node and checks
  whether the resulting state change preserves desired global invariants.
  The distributed objtransfer tester creates an instance of this class
  when testing a distributed path-reversal-based implementation.

  State:
  - the[j]: node j status: 't' (thinking) | 'h' (hungry) | 'e' (eating)
  - objdest: msg [OBJ,oval]'s location or destination if in transit
  - lastptr[j]: node j's last pointer; None if not pointing to a node
  - nextptr[j]: node j's next pointer; None if not pointing to a node
  - reqdest[j]: msg [REQ,j]'s destination; None if [REQ,j] not in transit
  - pr_graph[j]: [k such that (k,j) is a pr-edge, ie, reqdest[k] == j
    or nextptr[j] == k or (lastptr[k] == j and k thinking)].
    Note: pr_graph[j] allows duplicates (but there shouldn't be any)
  - event, sem_full, sem_empty: one-slot producer-consumer buffer filled
    by set_event() and drained by handle_event() when called without an
    event.

  The graph art (dpr_art / CheckerArt) is currently disabled.

  Events: [event location, event type, msg parameter, msg destination]
  - [j, 'sendreq', j, k]: node j sends [REQ,j] to node k
  - [j, 'fwdreq', i, k]: node j receives [REQ,i], forwards it to node k
  - [j, 'recvreq', i, None]: node j receives [REQ,i] (does not forward it)
  - [j, 'sendobj', oval, k]: node j sends [OBJ, oval] to node k
  - [j, 'recvobj', oval, None]: node j receives [OBJ,oval]
  """

  def __init__(self, tester, n, initial_owner):
    """Set up the initial global state.

    Args:
      tester: object whose log_event(event) method is called for every
        handled request/object-send event.
      n: number of nodes (node ids are 0..n-1).
      initial_owner: id of the node that initially holds the object.
    """
    # set higher for more diagnostic printing; 0 for no printing
    self.debugprintlevel = 1
    # tester that created this; needed for logging
    self.tester = tester
    self.n = n
    self.initial_owner = initial_owner
    # initially: owner is eating, others are thinking
    self.the = ['t' for k in range(n)]
    self.the[initial_owner] = 'e'
    self.objdest = initial_owner
    # initially: last pointers form an in-tree rooted at initial owner
    self.lastptr = [initial_owner for k in range(n)]
    self.lastptr[initial_owner] = None
    # next pointers void, no request or object in transit
    self.nextptr = [None for k in range(n)]
    self.reqdest = [None for k in range(n)]
    self.pr_graph = [[] for k in range(n)]
    self.pr_graph[initial_owner] = [k for k in range(n) if k != initial_owner]
    # last reported event from dist pathreversal implementation
    self.event = None
    # for producer-consumer access to self.event; set_event() needs both,
    # so they must exist (they were previously left uncreated)
    self.sem_full = Semaphore(0)
    self.sem_empty = Semaphore(1)

  def debugprint(self, level=1, *args):
    """Print *args* if level <= self.debugprintlevel."""
    if self.debugprintlevel >= level:
      print(*args, flush=True)

  def check_assertions(self):
    """Check assertions (currently only one assertion):
    - pr_graph is a directed in-tree rooted at node objdest

    Raises:
      Exception: if not all n nodes are reachable from objdest in pr_graph.
    """
    if self.n != self.dfs(self.pr_graph, self.objdest):
      raise Exception("pr_graph is NOT an in-tree rooted at node objdest")

  def dfs(self, graph, startnode):
    """Return number of nodes reachable from startnode (itself included).

    Iterative depth-first search, tolerant of duplicate edges (multigraph).

    Args:
      graph: graph[nodeid] is the list of nodes directly reachable from
        nodeid (a dict or a list indexed by node id).
      startnode: node at which to start depth-first-search
    """
    # a set gives O(1) membership tests; only the count is returned, so
    # visiting order does not matter
    visited, stack = set(), [startnode]
    while stack:
      vertex = stack.pop()
      if vertex not in visited:
        visited.add(vertex)
        stack.extend(k for k in graph[vertex] if k not in visited)
    return len(visited)

  def replace_predge(self, j1, k1, j2, k2):
    """Replace pr-edge (j1,k1) by pr-edge (j2,k2).

    Raises:
      Exception: if edge (j1,k1) is not present, or if (j2,k2) is a
        different edge that is already present.
    """
    self.debugprint(3, "replace_predge:", j1, k1, j2, k2)
    if j1 not in self.pr_graph[k1]:
      raise Exception("remove pr_edge", j1, k1, "failed: not present")
    if j1 == j2 and k1 == k2:
      # remove and add same edge: no change
      return
    if j2 in self.pr_graph[k2]:
      raise Exception("add pr_edge", j2, k2, "failed: already present")
    self.pr_graph[k1].remove(j1)
    self.pr_graph[k2].append(j2)
    self.debugprint(3, "ending pr_graph:", self.pr_graph)

  def set_event(self, event):
    """Place event in self.event (via prod-cons synch).

    Blocks while a previously placed event has not yet been taken by
    handle_event(). Called from dist_objtransfer_tester.
    """
    self.sem_empty.acquire()
    self.event = event
    self.sem_full.release()

  def _take_event(self):
    """Block until set_event() has placed an event; return it and free the slot."""
    self.sem_full.acquire()
    event = self.event
    self.sem_empty.release()
    return event

  def handle_event(self, event=None):
    """Update the state by *event*, log it via the tester, check assertions.

    Original author's note: called by servicetester with lock held.

    Args:
      event: (location, type, msg parameter, msg destination). If omitted
        (None), the event is taken from the buffer filled by set_event(),
        blocking until one is available.

    Raises:
      Exception: on an unknown event type, on an inconsistent pr-edge
        replacement, or if the pr_graph assertion fails afterwards.
    """
    if event is None:
      event = self._take_event()
    self.debugprint(3, "HANDLE_EVENT:", event)
    eloc, etype, edata, edest = event

    if etype == 'sendreq':
      self.the[eloc] = 'h'
      # lnr: (l,eloc,edest) -> (r,eloc,edest)
      self.lastptr[eloc] = None
      self.reqdest[eloc] = edest
      # pr: eloc was thinking: (prl,eloc,edest) -> (prr,eloc,edest)
      self.replace_predge(eloc, edest, eloc, edest)
      self.tester.log_event(event)

    elif etype == 'recvreq':
      # lnr: (r,edata,eloc) -> (n,eloc,edata), (l,eloc,edata)
      self.reqdest[edata] = None
      self.lastptr[eloc] = edata
      self.nextptr[eloc] = edata
      # pr: eloc is not thinking: (prr,edata,eloc) -> (prn,edata,eloc)
      self.replace_predge(edata, eloc, edata, eloc)
      self.tester.log_event(event)

    elif etype == 'fwdreq':
      self.lastptr[eloc] = edata
      self.reqdest[edata] = edest
      # fwdreq implies eloc.lastptr was not None when req was recvd
      # lnr: (r,edata,eloc), (l,eloc,edest)-> (r,edata,edest), (l,eloc,edata)
      # pr: (prr,edata,eloc) -> (prr,edata,edest)
      #     if eloc thinking: (prl,eloc,edest) -> (prl, eloc,edata)
      self.replace_predge(edata, eloc, edata, edest)
      if self.the[eloc] == 't':
        self.replace_predge(eloc, edest, eloc, edata)
        self.tester.log_event((eloc, 'fwdreq_prl_change', edata, edest))
      else:
        self.tester.log_event((eloc, 'fwdreq_prl_nochange', edata, edest))

    elif etype == 'sendobj':
      self.nextptr[eloc] = None
      self.objdest = edest
      self.the[eloc] = 't'
      self.the[edest] = 'e'
      # lnr: (n,eloc,edest) -> none
      # pr: (prn, edest,eloc) -> (prl, eloc,lastptr[eloc])
      self.replace_predge(edest, eloc, eloc, self.lastptr[eloc])
      # KLUGE: setting edata to eloc.lastptr in logged event
      self.tester.log_event((eloc, 'sendobj', self.lastptr[eloc], edest))

    elif etype == 'recvobj':
      pass

    else:
      # report the event actually being handled, not the buffer slot
      raise Exception("unknown event:", event)

    self.check_assertions()
########## end class Checker #############


class _DemoTester:
  """Minimal stand-in for the tester: prints every event the checker logs."""

  def log_event(self, event):
    myprint("logged:", event)


def run_test(dprc):
  """Drive *dprc* through a fixed request / forward / object-transfer scenario.

  Node ids are taken relative to dprc.initial_owner (io(0) is the initial
  owner). Nodes io(2) and io(3) both request the object from io(0); io(0)
  keeps the first request, forwards the second to io(2), and finally sends
  the object to io(2).

  Each event is passed straight to dprc.handle_event(event), which takes
  the event as its argument, so no second thread, semaphores or sleeps are
  needed to hand events over.

  Args:
    dprc: checker exposing n, initial_owner and handle_event(event).
      The scenario uses io(3), so at least 4 nodes are required.
  """
  def io(j):
    return (dprc.initial_owner + j) % dprc.n

  if dprc.n < 4:
    # message now agrees with the condition actually tested
    myprint("run_test: n must be at least 4")
    return

  global run_test_over
  scenario = [
    (io(2), 'sendreq', io(2), io(0)),
    (io(3), 'sendreq', io(3), io(0)),
    (io(0), 'recvreq', io(2), None),
    (io(0), 'fwdreq', io(3), io(2)),
    (io(2), 'recvreq', io(3), None),
    (io(0), 'sendobj', None, io(2)),
  ]
  for event in scenario:
    dprc.handle_event(event)
  # kept for code that inspects this module-level flag
  run_test_over = True


####### main ##########################################################
if __name__ == '__main__':
  run_test_over = False
  # Checker takes (tester, n, initial_owner); the tester must provide
  # log_event, so a printing stand-in is supplied
  dpr_checker = Checker(_DemoTester(), 4, 2)
  run_test(dpr_checker)
  myprint("run_test over")