diff --git a/daemon/core/api/apibridge.py b/daemon/core/api/apibridge.py index b760533d..de395bf9 100644 --- a/daemon/core/api/apibridge.py +++ b/daemon/core/api/apibridge.py @@ -10,7 +10,7 @@ import core_pb2 -import struct +import struct, threading, traceback, sys from core.api import coreapi, corewrapper from core.experiments import ExperimentStore @@ -24,22 +24,94 @@ API2HDRSIZ = struct.calcsize(API2HDRFMT) class CoreApiBridge(object): + + def __init__(self, handler): + # The collector is used for gathering node messages sent by the core session, + # for example, during INSTANTIATION as nodes are started until RUNTIME. + self.collector = None - @staticmethod - def Api2toLegacy(data, handler): + # The currently associated (added or joined) experiment + self.assocExperiment = None + + # Mutex + self.lock = threading.Lock() + + # Reference to the owning handler in the core-daemon + self.handler = handler + + + def recvmsg(self): + ''' Receive data, parse a CoreMessage and queue it onto an existing + session handler's queue, if available. + ''' + + try: + hdr = self.handler.request.recv(API2HDRSIZ) + except Exception, e: + raise IOError, "error receiving API 2 header (%s)" % e + + if len(hdr) != API2HDRSIZ: + if len(hdr) == 0: + raise EOFError, "client disconnected" + else: + raise IOError, "invalid message header size" + + dataToRead = struct.unpack(API2HDRFMT, hdr)[0] + data = "" + while len(data) < dataToRead: + data += self.handler.request.recv(dataToRead - len(data)) + + msgs = self.processApi2Message(data) + + return msgs + + def dispatchreplies(self, replies): + ''' Dispatch a reply to a previously received message. + ''' + api2Replies = self.processLegacyCoreMessage(replies) + if api2Replies: + for reply in api2Replies: + try: + # send to API2 client + self.handler.request.sendall(reply) + except Exception, e: + self.warn("Error sending reply data: %s" % e) + + + def sendall(self, data): + ''' The daemon calls this method with legacy API data. Convert first + API2 then send. + ''' + + try: + msgs = self.processLegacyCoreMessage((data,)) + if msgs: + for msg in msgs: + self.handler.request.sendall(msg) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + raise e + + + + + + def processApi2Message(self, data): message = core_pb2.CoreMessage() message.ParseFromString(data) if message.HasField('session'): - return CoreApiBridge.translateApi2SessionMsg(message.session) + return self.processApi2SessionMsg(message.session, + message.purpose) if message.HasField('experiment'): - return CoreApiBridge.handleApi2ExperimentMsg(message.experiment, - message.purpose, - handler) + return self.processApi2ExperimentMsg(message.experiment, + message.purpose) if message.HasField('event'): - return CoreApiBridge.translateEvent(message.event) + return self.processApi2Event(message.event, + message.purpose) - @staticmethod - def LegacytoApi2(messages, handler): + def processLegacyCoreMessage(self, messages): api2msgs = [] for msgstr in messages: # Unpack the message @@ -90,6 +162,7 @@ class CoreApiBridge(object): else: newMsg.session.all_exps.add(str(sid)) ''' + newMsg.purpose = core_pb2.ADD api2msgs.append(CoreApiBridge.packApi2(newMsg)) elif parser.getType() == legacy.CORE_API_EVENT_MSG: @@ -108,9 +181,16 @@ class CoreApiBridge(object): newMsg.purpose = core_pb2.STATE_CHANGE newMsg.event.state = core_pb2.RUNTIME api2msgs.append(CoreApiBridge.packApi2(newMsg)) + with self.lock: + if self.collector: + self.collector.experiment.running = True + else: + raise RuntimeError, "runtime entered without an instantiated experiment" + api2msgs.append(CoreApiBridge.packApi2(self.collector)) + self.collector = None + elif parser.getType() == legacy.CORE_API_NODE_MSG: node = parser.createWrapper() - ''' print "Node:" print "\tnumber=", node.getNumber() print "\ttype=", node.getType() @@ -132,47 +212,93 @@ class CoreApiBridge(object): print "\talt=", node.getAlt() print "\ticon=", node.getIcon() print "\topaque=", node.getOpaque() + ''' if handler.session.getstate() == legacy.CORE_EVENT_INSTANTIATION_STATE: ''' - + api2_node=None + api2_dev=None + with self.lock: + for a_node in self.assocExperiment.nodes: + if a_node.idx == node.getNumber(): + api2_node = core_pb2.Node() + break + if not api2_node: + for a_device in self.assocExperiment.devices: + if a_device.idx == node.getNumber(): + api2_dev = core_pb2.Device() + break + if api2_node: + api2_obj = api2_node + else: + api2_obj = api2_dev + + if not api2_obj: + raise RuntimeError, "Node %d not in experiment" % (node.getNumber()) + + api2_obj.idx = node.getNumber() + if node.getEmuid() is not None: api2_obj.emu_id=node.getEmuid() + if node.getName() is not None: api2_obj.name=node.getName() + if node.getXpos() is not None: api2_obj.location.x_pos=node.getXpos() + if node.getYpos() is not None: api2_obj.location.y_pos=node.getYpos() + + if self.collector: + if api_node: + self.collector.experiment.nodes.add().CopyFrom(api_node) + else: + self.collector.experiment.devices.add().CopyFrom(api_device) + newMsg = core_pb2.CoreMessage() - newMsg.node.idx=node.getNumber() - if node.getEmuid(): newMsg.node.emu_id=node.getEmuid() - if node.getName(): newMsg.node.name=node.getName() - if node.getXpos(): newMsg.node.x_pos=node.getXpos() - if node.getYpos(): newMsg.node.y_pos=node.getYpos() - # TODO: Collect all node updates into an experiment message newMsg.purpose = core_pb2.MODIFY + if api_node: + newMsg.node = api_node + else: + newMsg.device = api_device api2msgs.append(CoreApiBridge.packApi2(newMsg)) + else: print "received message type", parser.getType() return api2msgs @staticmethod def packApi2(message): + ''' Pack an API2 message for transmission + ''' data = message.SerializeToString() return struct.pack(API2HDRFMT, len(data)) + data - @staticmethod - def translateApi2SessionMsg(message): + def processApi2SessionMsg(self, message, purpose): print 'Received session request message' - msgs = [] - msgs.append(wrapper.RegMsg.instantiate(0, gui='true')) - return msgs + if purpose == core_pb2.ADD: + legacymsgs = [] + legacymsgs.append(wrapper.RegMsg.instantiate(0, gui='true')) + return legacymsgs + # The response will be sent to the API2 client when a legacy session message is received from the daemon + elif purpose == core_pb2.DELETE: + # TODO: shutdown session + pass + else: + print 'Received invalid purpose for SESSION' - @staticmethod - def handleApi2ExperimentMsg(exp, purpose, handler): + def processApi2ExperimentMsg(self, exp, purpose): if purpose == core_pb2.ADD: if ExperimentStore.addExperiment(exp): response = core_pb2.CoreMessage() response.experiment.experimentId = exp.experimentId; response.purpose = purpose - handler.request.sendall(CoreApiBridge.packApi2(response)) - return CoreApiBridge.translateExperimentMsg(exp) + + # Start a collector for gathering node messages instantiated in the core session + with self.lock: + if not self.collector: + self.assocExperiment = exp + self.collector = response + else: + raise RuntimeError, "Instantiation of experiment while another is active" + self.handler.request.sendall(CoreApiBridge.packApi2(response)) # TODO: Fix this + return self.translateApi2ExperimentMsg(exp) else: - return CoreApiBridge.Api2Error("unable to add experiment") + return self.Api2Error("unable to add experiment") elif purpose == core_pb2.MODIFY: # Detect if a change in state is requested if exp.HasField('running'): @@ -194,8 +320,7 @@ class CoreApiBridge(object): print "Unsupported experiment modification received" - @staticmethod - def translateExperimentMsg(message): + def translateApi2ExperimentMsg(self, message): print 'Received experiment message' msgs = [] # Flag need to be 0 otherwise CORE will not enter runtime state (per JavaAdaptor, need verification) @@ -214,14 +339,10 @@ class CoreApiBridge(object): dataTypes=(9,9,9,9,9,9), dataValues='0|0| 47.5766974863|-122.125920191|0.0|150.0')) + # TODO # Send control net configuration - # TODO - # send node types - # TODO - # send services - # TODO # send nodes devices = {} @@ -234,13 +355,6 @@ class CoreApiBridge(object): legacy.CORE_API_ADD_FLAG|legacy.CORE_API_STR_FLAG, node.idx, str(node.name))) - ''' - for iface in node.interfaces: - msgs.append(wrapper.IfaceMsg.instantiate(legacy.CORE_API_ADD_FLAG, - node.idx, - iface.idx, - ip4=iface.ip4_addr)) - ''' for device in message.devices: if device.idx in devices: @@ -276,17 +390,13 @@ class CoreApiBridge(object): if1ip4=if0.ip4_addr if if0.HasField("ip4_addr") else None, if2ip4=if1.ip4_addr if if1.HasField("ip4_addr") else None)) - # send metadata # TODO - - - + # send metadata return msgs - @staticmethod - def translateEvent(event): + def processApi2Event(self, event, purpose): print 'Received event' diff --git a/daemon/sbin/core-daemon b/daemon/sbin/core-daemon index 3295e1d0..0ef7073b 100755 --- a/daemon/sbin/core-daemon +++ b/daemon/sbin/core-daemon @@ -1395,7 +1395,7 @@ class CoreApi2RequestHandler(CoreRequestHandler): self.nodestatusreq = {} self.master = False self.session = None - self.experiment = None + self.bridge = apibridge.CoreApiBridge(self) self.verbose = bool(server.mainserver.cfg['verbose'].lower() == "true") self.debug = bool(server.mainserver.cfg['debug'].lower() == "true") SocketServer.BaseRequestHandler.__init__(self, request, @@ -1414,8 +1414,7 @@ class CoreApi2RequestHandler(CoreRequestHandler): self.session.connect(self) while True: try: - data = self.recvmsg() - msgs = apibridge.CoreApiBridge.Api2toLegacy(data, self) + msgs = self.recvmsg() if msgs: for msg in msgs: self.session.broadcast(self, msg) @@ -1429,27 +1428,14 @@ class CoreApi2RequestHandler(CoreRequestHandler): def dispatchreplies(self, replies): ''' Dispatch a reply to a previously received message. ''' - api2Replies = apibridge.CoreApiBridge.LegacytoApi2( - replies, self) - if api2Replies: - for reply in api2Replies: - try: - # send to API2 client - self.request.sendall(reply) - except Exception, e: - self.warn("Error sending reply data: %s" % e) + self.bridge.dispatchreplies(replies) def sendall(self, data): ''' The daemon calls this method with legacy API data. Convert first API2 then send. ''' - - msgs = apibridge.CoreApiBridge.LegacytoApi2( - (data,), self.session.getstate()) - if msgs: - for msg in msgs: - self.request.sendall(msg) + self.bridge.sendall(data) def finish(self): if self.verbose: @@ -1465,30 +1451,14 @@ class CoreApi2RequestHandler(CoreRequestHandler): session handler's queue, if available. ''' - try: - hdr = self.request.recv(apibridge.API2HDRSIZ) - except Exception, e: - raise IOError, "error receiving API 2 header (%s)" % e - - if len(hdr) != apibridge.API2HDRSIZ: - if len(hdr) == 0: - raise EOFError, "client disconnected" - else: - raise IOError, "invalid message header size" - - dataToRead = struct.unpack(apibridge.API2HDRFMT, hdr)[0] - data = "" - while len(data) < dataToRead: - data += self.request.recv(dataToRead - len(data)) - - return data + return self.bridge.recvmsg() def queuemsg(self, msg): raise Exception, "TO BE IMPLEMENTED if needed" % msg.typestr() - +