# sesf module: objtransfer.imp_distpathreversal.checker
from threading import Thread, Semaphore
import argparse, time
# from sesf.objtransfer.imp_distpathreversal.checker_art import CheckerArt
def myprint(*values):
    """Print *values* like the built-in print(), flushing stdout immediately.

    Flushing on every call keeps output from concurrently running threads
    visible as soon as it is produced.
    """
    print(*values, flush=True)
def create_checker_from_argv(tester, argv):
    """Build a Checker from command-line style arguments.

    Args:
      - tester: object handed to Checker as its tester (used for logging)
      - argv: list of argument strings: [n, initial_owner]

    Returns:
      Checker(tester, n, initial_owner)

    Malformed arguments, or an initial_owner outside 0..n-1, make argparse
    print a usage message and raise SystemExit.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("n", type=int, help="number of nodes")
    parser.add_argument("initial_owner", type=int, help="initial owner")
    args = parser.parse_args(argv)
    # Node ids are list indices 0..n-1; a negative id would silently wrap
    # around through Python's negative indexing, so reject it up front.
    if not 0 <= args.initial_owner < args.n:
        parser.error("initial_owner must be in the range 0..n-1")
    return Checker(tester, args.n, args.initial_owner)
class Checker():
    """Maintain the global state of the distributed path-reversal system.
    Provides a function that takes a given event at a node and checks
    whether the resulting state change preserves desired global invariants.
    The distributed objtransfer tester creates an instance of this class
    when testing a distributed path-reversal-based implementation.
    (The graphical display of the lr and pr graphs, CheckerArt / dpr_art,
    is currently disabled.)

    State:
      - the[j]: node j status: 't' (thinking) | 'h' (hungry) | 'e' (eating)
      - objdest: msg [OBJ,oval]'s location or destination if in transit
      - lastptr[j]: node j's last pointer; None if not pointing to a node
      - nextptr[j]: node j's next pointer; None if not pointing to a node
      - reqdest[j]: msg [REQ,j]'s destination; None if [REQ,j] not in transit
      - pr_graph[j]: [k such that (k,j) is a pr-edge, ie, reqdest[k] == j
        or nextptr[j] == k or (lastptr[k] == j and k thinking)].
        Note: pr_graph[j] allows duplicates (but there shouldn't be any)
      - event: one-slot mailbox written by set_event()
      - sem_empty, sem_full: semaphores guarding the mailbox
        (sem_empty is 1 while the slot is free, sem_full is 1 while it
        holds an event that has not been consumed yet)

    Events:
      [event location, event type, msg parameter, msg destination]
      - [j, 'sendreq', j, k]: node j sends [REQ,j] to node k
      - [j, 'fwdreq', i, k]: node j receives [REQ,i], forwards it to node k
      - [j, 'recvreq', i, None]: node j receives [REQ,i] (does not forward it)
      - [j, 'sendobj', oval, k]: node j sends [OBJ, oval] to node k
      - [j, 'recvobj', oval, None]: node j receives [OBJ,oval]
    """

    def __init__(self, tester, n, initial_owner):
        """Set up the initial global state.

        Args:
          - tester: object whose log_event(event) is called for each
            handled event
          - n: number of nodes (node ids are 0..n-1)
          - initial_owner: id of the node that initially holds the object

        Raises IndexError if initial_owner is not in 0..n-1.
        """
        # A negative id would be accepted by list indexing and yield a
        # graph that can never satisfy the in-tree assertion; fail early.
        if not 0 <= initial_owner < n:
            raise IndexError(
                "initial_owner {} not in range 0..{}".format(
                    initial_owner, n - 1))
        # set higher for more diagnostic printing; 0 for no printing
        self.debugprintlevel = 1
        # tester that created this; needed for logging
        self.tester = tester
        self.n = n
        self.initial_owner = initial_owner
        # initially: owner is eating, others are thinking
        self.the = ['t'] * n
        self.the[initial_owner] = 'e'
        self.objdest = initial_owner
        # initially: last pointers form an in-tree rooted at initial owner
        self.lastptr = [initial_owner] * n
        self.lastptr[initial_owner] = None
        # next pointers void, no request or object in transit
        self.nextptr = [None] * n
        self.reqdest = [None] * n
        self.pr_graph = [[] for _ in range(n)]
        self.pr_graph[initial_owner] = [k for k in range(n)
                                        if k != initial_owner]
        # last event reported through set_event()
        self.event = None
        # for producer-consumer access to self.event (used by set_event)
        self.sem_full = Semaphore(0)
        self.sem_empty = Semaphore(1)

    def debugprint(self, level=1, *args):
        """Print if level <= self.debugprintlevel"""
        if self.debugprintlevel >= level:
            print(*args, flush=True)

    def check_assertions(self):
        """Check assertions (currently only one assertion):
          - pr_graph is a directed in-tree rooted at node objdest

        Raises Exception if fewer (or more) than n nodes are reachable
        from objdest along pr_graph.
        """
        if self.n != self.dfs(self.pr_graph, self.objdest):
            raise Exception("pr_graph is NOT an in-tree rooted at node objdest")

    def dfs(self, graph, startnode):
        """Return number of nodes reachable from startnode (inclusive).

        Iterative depth-first search; tolerates duplicate edges.

        Args:
          - graph: graph[nodeid] is the list of nodes reachable in one
            step from nodeid
          - startnode: node at which to start depth-first-search
        """
        seen = set()
        stack = [startnode]
        while stack:
            vertex = stack.pop()
            if vertex in seen:
                continue
            seen.add(vertex)
            stack.extend(k for k in graph[vertex] if k not in seen)
        return len(seen)

    def replace_predge(self, j1, k1, j2, k2):
        """Remove predge (j1,k1); raise exception if edge not present.
        Add predge (j2,k2); raise exception if edge already present.
        Replacing an edge by itself only checks that it is present.
        """
        self.debugprint(3, "replace_predge:", j1, k1, j2, k2)
        if j1 not in self.pr_graph[k1]:
            raise Exception("remove pr_edge", j1, k1, "failed: not present")
        if j1 == j2 and k1 == k2:
            # remove and add same edge: no change
            return
        if j2 in self.pr_graph[k2]:
            raise Exception("add pr_edge", j2, k2, "failed: already present")
        self.pr_graph[k1].remove(j1)
        self.pr_graph[k2].append(j2)
        self.debugprint(3, "ending pr_graph:", self.pr_graph)
        return None

    def set_event(self, event):
        """Place event in self.event (via prod-cons synch).

        Blocks until the slot is free. The consumer is expected to acquire
        sem_full, read self.event, and then release sem_empty.
        """
        self.sem_empty.acquire()
        self.event = event
        self.sem_full.release()

    def handle_event(self, event):
        """Update the state by *event*, log it via tester.log_event, and
        check the global assertions.

        Args:
          - event: (location, type, msg parameter, msg destination);
            see the class docstring for the event types

        Raises Exception on an unknown event type, on an inconsistent
        pr-edge change, or if the resulting state violates an assertion.
        """
        self.debugprint(3, "HANDLE_EVENT:", event)
        eloc, etype, edata, edest = event
        if etype == 'sendreq':
            self.the[eloc] = 'h'
            # lnr: (l,eloc,edest) -> (r,eloc,edest)
            self.lastptr[eloc] = None
            self.reqdest[eloc] = edest
            # pr: eloc was thinking: (prl,eloc,edest) -> (prr,eloc,edest)
            self.replace_predge(eloc, edest, eloc, edest)
            self.tester.log_event(event)
        elif etype == 'recvreq':
            # lnr: (r,edata,eloc) -> (n,eloc,edata), (l,eloc,edata)
            self.reqdest[edata] = None
            self.lastptr[eloc] = edata
            self.nextptr[eloc] = edata
            # pr: eloc is not thinking: (prr,edata,eloc) -> (prn,edata,eloc)
            self.replace_predge(edata, eloc, edata, eloc)
            self.tester.log_event(event)
        elif etype == 'fwdreq':
            self.lastptr[eloc] = edata
            self.reqdest[edata] = edest
            # fwdreq implies eloc.lastptr was not None when req was recvd
            # lnr: (r,edata,eloc), (l,eloc,edest)
            #      -> (r,edata,edest), (l,eloc,edata)
            # pr: (prr,edata,eloc) -> (prr,edata,edest)
            #     if eloc thinking: (prl,eloc,edest) -> (prl, eloc,edata)
            self.replace_predge(edata, eloc, edata, edest)
            if self.the[eloc] == 't':
                self.replace_predge(eloc, edest, eloc, edata)
                self.tester.log_event(
                    (eloc, 'fwdreq_prl_change', edata, edest))
            else:
                self.tester.log_event(
                    (eloc, 'fwdreq_prl_nochange', edata, edest))
        elif etype == 'sendobj':
            self.nextptr[eloc] = None
            self.objdest = edest
            self.the[eloc] = 't'
            self.the[edest] = 'e'
            # lnr: (n,eloc,edest) -> none
            # pr: (prn, edest,eloc) -> (prl, eloc,lastptr[eloc])
            self.replace_predge(edest, eloc, eloc, self.lastptr[eloc])
            # KLUGE: setting edata to eloc.lastptr in logged event
            self.tester.log_event(
                (eloc, 'sendobj', self.lastptr[eloc], edest))
        elif etype == 'recvobj':
            # nothing to update: objdest and the[] were set at 'sendobj'
            pass
        else:
            # report the event that was actually passed in, not the
            # set_event() mailbox
            raise Exception("unknown event:", event)
        self.check_assertions()
########## end class Checker #############
def run_test(dprc):
    """Feed a fixed six-event scenario to checker *dprc* via set_event().

    Node ids are taken relative to dprc.initial_owner, so the scenario
    works for any owner; it uses offsets 0, 2 and 3 and therefore needs
    at least 4 nodes. One event is submitted per second.

    Sets the module-level flag run_test_over to True just before the
    final event is submitted.
    """
    global run_test_over

    def io(j):
        # node at offset j from the initial owner (mod n)
        return (dprc.initial_owner + j) % dprc.n

    if dprc.n < 4:
        myprint("run_test: n must be at least 4")
        return

    leading_events = [
        (io(2), 'sendreq', io(2), io(0)),
        (io(3), 'sendreq', io(3), io(0)),
        (io(0), 'recvreq', io(2), None),
        (io(0), 'fwdreq', io(3), io(2)),
        (io(2), 'recvreq', io(3), None),
    ]
    for event in leading_events:
        dprc.set_event(event)
        time.sleep(1)
    # set this here (not at return) because of producer-consumer synch
    run_test_over = True
    dprc.set_event((io(0), 'sendobj', None, io(2)))
    time.sleep(1)
    return
####### main ##########################################################
if __name__ == '__main__':
    # Stand-alone demo: a producer thread (run_test) submits events through
    # set_event(); this thread consumes them and runs the checker on each.

    class _PrintTester:
        """Minimal tester: Checker only calls log_event() on its tester."""

        def log_event(self, event):
            myprint("log_event:", event)

    dpr_checker = Checker(_PrintTester(), 4, 2)
    # set_event() hands events over through sem_empty/sem_full; install
    # fresh ones here so the demo starts from a known, empty slot.
    dpr_checker.sem_empty = Semaphore(1)
    dpr_checker.sem_full = Semaphore(0)
    time.sleep(1)
    run_test_over = False
    # daemon: if a check fails in this thread, do not leave the process
    # hanging on a producer blocked in set_event()
    thrd1 = Thread(name="runtest", target=run_test, args=(dpr_checker,),
                   daemon=True)
    thrd1.start()
    while not run_test_over:
        # consumer side of the one-slot mailbox
        dpr_checker.sem_full.acquire()
        dpr_checker.handle_event(dpr_checker.event)
        dpr_checker.sem_empty.release()
    myprint("run_test over")