"""
Module **sesf.msgtransfer.service** has class ``Service``, which is the
concrete version of the msgtransfer service. RPC-accessible via pyrodaemon
`sesf.msgtransfer.service`. Run ``python service.py <num_nodes>`` to
start a Service instance for ``num_nodes`` addresses, which can then be
accessed by a distributed program that uses the msgtransfer service.
"""
from sesf.msgtransfer.serviceparts import ServiceParts
import Pyro4, argparse, time, random
from sesf.util.service import (do_service_output, do_service_input)
from sesf.util.pyro import (start_pyrodaemon)
from threading import Thread, Lock, Condition
@Pyro4.expose
class Service:
    """Concrete msgtransfer service, made remotely callable through Pyro4.

    Every public method takes the caller's address ``j`` as its first
    argument and is carried out in two steps on the shared ``ServiceParts``
    object: a "call" step (``do_service_input``) followed, after a random
    pause, by a "return" step (``do_service_output``). Both steps are given
    the same condition variable ``self.cond``.

    NOTE(review): the ``*_CIC``/``*_CU``/``*_ROC``/``*_RU`` attributes of
    ``ServiceParts`` are presumably the call/return condition and update
    functions of each service function -- confirm in
    ``sesf.msgtransfer.serviceparts``.
    """

    def __init__(self, num_nodes):
        """Create the service state and start the Pyro4 daemon thread.

        Args:
            num_nodes: number of msgtransfer addresses; passed on to
                ``ServiceParts``.
        """
        self.num_nodes = num_nodes
        self.serviceparts = ServiceParts(num_nodes)
        self.lck = Lock()
        # Single condition (built on self.lck) shared by all service calls.
        self.cond = Condition(self.lck)
        # The daemon runs in its own thread, registered under the name
        # 'sesf.msgtransfer.service'.
        # NOTE(review): end() relies on start_pyrodaemon having stored the
        # daemon on this object as ``self.pyrodaemon`` -- confirm in
        # sesf.util.pyro.
        Thread(name='pyrodaemon', target=start_pyrodaemon,
               args=(self, 'sesf.msgtransfer.service')).start()

    def send(self, j, k, msg):
        """Perform a send of ``msg`` to address ``k`` on behalf of address ``j``.

        Returns:
            Whatever ``do_service_output`` yields for the send return step.
        """
        x = self.serviceparts
        do_service_input(j, x.send_CIC, x.send_CU, (k, msg), self.cond)
        # Random pause (up to 2 seconds) between the call and return steps.
        time.sleep(random.random()*2)
        rval = do_service_output(j, x.send_ROC, x.send_RU, self.cond)
        return rval

    def recv(self, j):
        """Perform a receive on behalf of address ``j``.

        Returns:
            ``(False,)`` or ``(True, msg)``: the first element of the return
            step's result, with the trailing internal parameter ``k``
            removed in the ``True`` case.
        """
        x = self.serviceparts
        do_service_input(j, x.recv_CIC, x.recv_CU, (), self.cond)
        # Random pause (up to 2 seconds) between the call and return steps.
        time.sleep(random.random()*2)
        rval = do_service_output(j, x.recv_ROC, x.recv_RU, self.cond)
        # rval == ((False,),) | ((True, msg, k),)
        # k is an internal parameter; remove it from rval
        rval = rval[0]
        if rval != (False,):
            rval = rval[:-1]
        return rval

    @Pyro4.oneway
    def end(self, j):
        """Perform an end call for address ``j``, then shut the daemon down.

        Marked ``@Pyro4.oneway``. After the return step the method sleeps
        7 seconds and then calls ``self.pyrodaemon.shutdown()``.

        Returns:
            Whatever ``do_service_output`` yields for the end return step.
        """
        x = self.serviceparts
        do_service_input(j, x.end_CIC, x.end_CU, (), self.cond)
        # Random pause (up to 1 second) between the call and return steps.
        time.sleep(random.random()*1)
        rval = do_service_output(j, x.end_ROC, x.end_RU, self.cond)
        # Fixed 7-second wait before shutting down -- presumably a grace
        # period for other calls still in progress; TODO confirm.
        time.sleep(7)
        self.pyrodaemon.shutdown()
        return rval
###### End Service #########################################
if __name__ == '__main__':
    # Command line: one positional integer, the number of msgtransfer nodes.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "num_nodes",
        type=int,
        help="number of msgtransfer nodes",
    )
    node_count = cli.parse_args().num_nodes
    print("number of nodes:", node_count, flush=True)
    # Constructing the Service also starts its pyrodaemon thread.
    msgtransfer_service = Service(node_count)