# Source code for msgtransfer.imp_0.node

"""
Module **sesf.msgtransfer.imp_0.node** has class ``Node`` that implements
a node of a msgtransfer service implementation for N addresses, N |geq| 2.
Run the N commands ``python user_node.py N j --use imp_0/node.py N j``
for ``j`` ranging over 0..N-1 to start, exercise and close a msgtransfer
service spanning N locations.
(Or run ``python ../start_nodes.py N user_node.py --use imp_0.node``.)

Process j starts a user node with address j that starts an imp_0 node
with address j. The N imp_0 nodes implement a msgtransfer service over
N addresses. This service is exercised by the user nodes. Each user node
has two concurrent threads, one sending messages and one receiving messages;
after a while, the main thread in the process with address 0 calls
its imp_0 node's ``end()``, causing the network to shutdown.
"""

import pickle as serializer
import socket, struct, time, argparse, sys
from queue import Queue
from threading import Thread, Lock
from random import randint


# def myprint(*args):
#   print(*args, flush=True)

# from stackoverflow: because gethostname() may give an incorrect address
def getmyipaddr():
  """Return the local IP address of the interface used for outbound traffic.

  A UDP socket is "connected" to 8.8.8.8; this only selects a local
  interface/route and does not send any packet. The local end of that
  association is then read back with ``getsockname()``.

  Raises ``OSError`` if no route to the probe address is available.
  """
  # The context manager closes the probe socket; without it every call
  # leaks a file descriptor until garbage collection.
  with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    s.connect(('8.8.8.8', 0))  # UDP address, doesn't send packets
    return s.getsockname()[0]

class Node:
  """
  An instance of this is a node of a distributed system that implements
  the msgtransfer service. Assumes it is in a network of *num_nodes* nodes
  (in different processes), and has address *myid*, where *myid* is a
  value in 0, ..., *num_nodes*-1.
  For brevity, below we say "n" for *num_nodes*, "j" for *myid*,
  and "Node(j)" for the ``Node`` instance with address j.

  If Node(0), ..., Node(n-1) have been started, they form a ring of
  TCP links, with Node(j) connected to Node((j+1)%n).

  During initialization, Node(j) forms TCP connections with
  Node((j-1)%n) and Node((j+1)%n) as follows:

  - If j is 0: Node(0) connects to Node(1) at port 6001, retrying until
    success. Then it waits to accept a TCP connect, which would be from
    Node(n-1), on port ``accptngport0`` (= 6000).
  - If j is in 1, ..., n-1: Node(j) waits to accept a TCP connect (which
    would be from Node(j-1)) on port ``accptngport0`` + j. After it accepts
    this connect, Node(j) connects to Node(j+1) at port
    ``accptngport0``+j+1 (retrying until success), except for Node(n-1),
    which connects to Node(0) at port ``accptngport0``.

  After Node(j) has opened its two TCP connections, it is ready to accept
  calls to its ``send``, ``recv`` and ``end`` functions. At this point,
  Node(j) maintains the following variables:

  - ``ending``: True iff ``end`` has been called or END msg has been
    received.
  - ``accptngport0`` = 6000 (hard-coded).
  - ``accptngsocket`` = TCP socket to accept connect requests
    (at port ``accptngport0 + j``). Closed when the END msg is handled.
  - ``outsocket`` = TCP socket connected to Node(j+1).
  - ``outsocket_lock``: serializes writes to ``outsocket`` between the
    user thread (``send``/``end``) and the local receive thread.
  - ``insocket`` = TCP socket connected to Node(j-1).
  - ``sbuf`` (bytearray): buffer to hold an outgoing packet.
  - ``sbufview``: memoryview of ``sbuf`` (slicing ``sbufview`` is cheap).
  - ``rbuf`` (bytearray): buffer to hold an incoming packet.
  - ``rbufview``: memoryview of ``rbuf``.
  - ``rcvdmsgqueue``: queue of incoming ``(flag,msg)`` tuples for local
    user.
  - ``testerproxy``: Used iff Node(j) is started within a test node, in
    which case Node(j) can call ``testerproxy.inform_tester(e)`` to inform
    the servicetester of local event ``e``.

  Each user message is placed in a packet with an 8-byte header
  (consisting of 2-byte src id, 2-byte dst id, 4-byte datalen)
  and data of length datalen (consisting of the serialized user message).

  Packets travel only in one direction of the ring. A message sent by
  Node(j) to Node(k) goes via nodes j+1, j+2, ..., k.
  Each node has a local thread that receives packets from ``insocket``
  (from the previous node) and puts them in ``rcvdmsgqueue`` or forwards
  them to ``outsocket`` (to the next node).

  The end function should be called at one address only. The call
  initiates an END message that travels along the ring, closing each node
  it encounters. The END message has a datalen field of 0.

  NOTE(security): user messages are (de)serialized with ``pickle``.
  Unpickling data from a peer can execute arbitrary code, so this
  implementation must only be run among trusted nodes.
  """

  def __init__(self, argv, testerproxy=None):
    """Parse ``argv`` (``[prog, num_nodes, myid]``), join the ring and
    start the receive thread.

    ``argv[0]`` is ignored; ``argv[1:]`` must hold the two integers
    *num_nodes* and *myid*. Blocks until both TCP links are established.
    """
    print("msgtransfer.imp_0.node init", argv, testerproxy, flush=True)
    self.testerproxy = testerproxy

    p = argparse.ArgumentParser()
    p.add_argument("num_nodes", type=int)
    p.add_argument("myid", type=int)
    args = p.parse_args(argv[1:])
    self.num_nodes = args.num_nodes
    self.myid = args.myid

    # for multi-machine network: use socket.gethostname() or getmyipaddr()
    self.myipaddr = 'localhost'
    print("My ip addr", self.myipaddr, flush=True)

    self.ending = False

    # maxpktlen in bytes: increase this if needed (can go up to 2**32 - 8)
    self.maxpktlen = 1024
    # pkt header 8 bytes: src(h:2), dst(h:2), datalen(i:4)
    self.pkthdrlen = 8
    self.maxpktdatalen = self.maxpktlen - self.pkthdrlen

    # accptngport number for mt 0: HARD-CODED
    self.accptngport0 = 6000

    # server socket: to accept connect request (from prev mtnode)
    self.accptngsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.accptngsocket.bind((self.myipaddr, self.accptngport0 + self.myid))
    self.accptngsocket.listen(1)

    # client socket: connected to next msgtransfer, to send packets
    self.outsocket = socket.socket()
    # for atomic updates when user and recv threads try simultaneous sends
    self.outsocket_lock = Lock()

    # client socket: connected to prev mtnode (via accept), to recv packets
    self.insocket = None

    # join the network: connect to next mtnode and prev mtnode
    self.join_network()

    # buffer to hold outgoing packet from this user
    self.sbuf = bytearray(self.maxpktlen)
    self.sbufview = memoryview(self.sbuf)

    # buffer to hold incoming packet (from prev mtnode)
    self.rbuf = bytearray(self.maxpktlen)
    self.rbufview = memoryview(self.rbuf)

    # queue for messages for local user received from insocket
    self.rcvdmsgqueue = Queue()

    # start thread receiving on insocket
    Thread(name="recv", target=self.recv_pkt_loop,
           args=(self.rbufview,)).start()
    print('mt_node', self.myid, 'started', flush=True)

  def inform_tester(self, event):
    """Inform tester of local event (no-op when there is no tester)."""
    if self.testerproxy is not None:
      print("NODE: CALLING inform_tester:", event, flush=True)
      self.testerproxy.inform_tester(event)

  #### BEGIN join network code ###############################
  def join_network(self):
    """Open both ring links. Node 0 connects first and then accepts;
    every other node accepts first and then connects, so the ring
    closes without deadlock."""
    if self.myid == 0:
      self.connect_next()
      self.accept_prev()
    else:
      self.accept_prev()
      self.connect_next()

  def connect_next(self):
    """Connect ``outsocket`` to the next node, retrying once per second
    until the connect succeeds, then report the link to the tester."""
    nextid = (self.myid + 1) % self.num_nodes
    nextaddr = (self.myipaddr, self.accptngport0 + nextid)
    while True:
      try:
        self.outsocket.connect(nextaddr)
      except OSError:
        print('mtnode', self.myid, 'connect req to', nextid, 'refused',
              flush=True)
        # After a failed connect() the socket state is unspecified
        # (POSIX); some platforms reject a second connect() on it.
        # Retry with a fresh socket.
        self.outsocket.close()
        self.outsocket = socket.socket()
        # sleep a bit and try again
        time.sleep(1)
      else:
        break
    print('msgtransfer', self.myid, 'connect req to', nextid, 'accepted',
          flush=True)
    # Outside the retry loop: a tester failure must not be mistaken for a
    # refused connection.
    self.inform_tester(('checker', 'linkup', self.myid, nextid))

  def accept_prev(self):
    """Block until the previous node connects; store the accepted socket
    in ``insocket`` and the peer address in ``addrin``."""
    (self.insocket, self.addrin) = self.accptngsocket.accept()
    print('msgtransfer', self.myid, 'accepted connect req from',
          self.addrin, flush=True)
  #### END join network code ##################################

  #### BEGIN leave network code ##################################
  def close_outsocket_rcvdmsgqueue(self):
    """Send the END packet to the next node, close ``outsocket`` and put
    the ``(False,)`` sentinel in ``rcvdmsgqueue``.

    Idempotent: only the first call does anything. If sending END fails,
    the exception propagates, but the socket is still closed and the
    sentinel is still queued so a blocked ``recv`` caller is released.
    """
    with self.outsocket_lock:
      if self.ending:
        # Already shut down (end() called twice, or END already handled).
        return
      self.ending = True
      try:
        # END msg: header with datalen 0
        self.send_bytes(self.pkthdrlen, struct.pack('!hhi', 0, 0, 0))
        self.outsocket.shutdown(socket.SHUT_RDWR)
      finally:
        self.outsocket.close()
        print('msgtransfer', self.myid, 'outsocket closed', flush=True)
        self.rcvdmsgqueue.put((False,))
        print('rcvdmsgqueue put closed', flush=True)

  def end(self):
    """Initiate shutdown of the ring. User calls this (at one node only)."""
    print('END CALLED', flush=True)
    self.close_outsocket_rcvdmsgqueue()
    print('END RETURNED', flush=True)

  def handle_endmsg(self):
    """Handle a received END packet: propagate it (unless this node has
    already shut its outgoing side), then close the incoming link and the
    listening socket."""
    try:
      # No unlocked check of self.ending here: the callee tests it under
      # outsocket_lock, which avoids racing with a concurrent end().
      self.close_outsocket_rcvdmsgqueue()
    finally:
      self.insocket.close()
      self.accptngsocket.close()
    previd = (self.myid - 1) % self.num_nodes
    self.inform_tester(('checker', 'linkdown', previd, self.myid))
    print('msgtransfer', self.myid, 'insocket closed', flush=True)
  #### END leave network code ##################################

  def send(self, dst, msg):
    """Send msg to dst. User calls this.

    Returns True if the packet was written to ``outsocket``.
    Returns False if *dst* is not a valid remote address, the serialized
    message is empty, the node is ending, or the write failed.
    Returns None if the serialized message exceeds ``maxpktdatalen``
    (kept distinct from False for backward compatibility).
    """
    if not (0 <= dst < self.num_nodes) or dst == self.myid:
      print('send: invalid destination', dst, flush=True)
      return False
    data = serializer.dumps(msg)
    datalen = len(data)
    if datalen == 0:
      print('send: null msg', flush=True)
      return False
    if datalen > self.maxpktdatalen:
      print('send: serialized msg size is too large', datalen, flush=True)
      return None
    # construct packet header
    hdr = struct.pack('!hhi', self.myid, dst, datalen)
    pktlen = self.pkthdrlen + datalen
    with self.outsocket_lock:
      if self.ending:
        return False
      # place packet in sbuf (two slice copies; no temporary hdr+data)
      self.sbufview[:self.pkthdrlen] = hdr
      self.sbufview[self.pkthdrlen:pktlen] = data
      try:
        self.inform_tester(('checker', 'sent', self.myid, dst, msg))
        self.send_bytes(pktlen, self.sbufview)
      except Exception:
        # here if outsocket is closed (eg, another thread doing end())
        return False
    return True

  def send_bytes(self, nbytes, bufview):
    """Send bufview[0:nbytes] bytes on outsocket.

    Caller must hold ``outsocket_lock``. Raises ``Exception`` if the
    socket accepts zero bytes, and lets ``OSError`` from the socket
    propagate.
    """
    totalsent = 0
    while totalsent < nbytes:
      nsent = self.outsocket.send(bufview[totalsent:nbytes])
      if nsent == 0:
        raise Exception('send_bytes: outsocket closed')
      totalsent += nsent

  def recv(self):
    """Return the next ``(True, msg)`` tuple for the local user, blocking
    until one is available; return ``(False,)`` once the node has ended.
    User calls this."""
    try:
      rval = self.rcvdmsgqueue.get()
    except Exception:
      print('recv: rcvdmsgqueue closed', flush=True)
      return (False,)
    return rval

  def recv_pkt_loop(self, bufview):
    """Repeatedly receive whole packet and either add to rcvdmsgqueue
    or send to next msgtransfer, until insocket is closed or END msg
    is recvd."""
    while True:
      try:
        (src, dst, datalen) = self.recv_pkt(bufview)
      except Exception as err:
        raise Exception('recv_pkt: insocket closed') from err
      if datalen == 0:
        # END msg
        self.handle_endmsg()
        break
      pktlen = self.pkthdrlen + datalen
      if dst == self.myid:
        # NOTE(security): unpickling peer data; trusted networks only.
        msg = serializer.loads(bufview[self.pkthdrlen:pktlen])
        self.rcvdmsgqueue.put((True, msg))
        self.inform_tester(('checker', 'rcvd', self.myid, dst, msg))
      else:
        if self.testerproxy is not None:
          # The payload is decoded only for the tester's benefit; a plain
          # forwarder relays the raw bytes untouched.
          msg = serializer.loads(bufview[self.pkthdrlen:pktlen])
          self.inform_tester(('checker', 'fwd', self.myid, dst, msg))
        with self.outsocket_lock:
          if not self.ending:
            self.send_bytes(pktlen, bufview)

  def recv_pkt(self, bufview):
    """Receive incoming packet (bytes: hdr(8) + data) into bufview.
    Return hdr tuple ``(src, dst, datalen)``.

    Raises ``ValueError`` if the header's datalen is negative or does not
    fit in ``bufview``; raises ``RuntimeError`` if the connection breaks.
    """
    # receive pkt header, and unpack src, dst, datalen
    self.recv_bytes(self.pkthdrlen, bufview)
    (src, dst, datalen) = struct.unpack('!hhi', bufview[:self.pkthdrlen])
    if datalen < 0 or datalen > len(bufview) - self.pkthdrlen:
      raise ValueError('recv_pkt: invalid datalen %d' % datalen)
    if datalen > 0:
      self.recv_bytes(datalen, bufview[self.pkthdrlen:])
    return (src, dst, datalen)

  def recv_bytes(self, nbytes, bufview):
    """Receive exactly nbytes from insocket into the start of bufview.

    Raises ``RuntimeError`` if the peer closes before nbytes arrive.
    """
    while nbytes > 0:
      nrcvd = self.insocket.recv_into(bufview, nbytes)
      if nrcvd == 0:
        raise RuntimeError('insocket connection broken')
      bufview = bufview[nrcvd:]   # slicing views is cheap
      nbytes -= nrcvd
##### END class Node in module msgtransfer.imp_0.node ##################