From 3359aeb323bf95db89a12e4d5fa382c73b19be13 Mon Sep 17 00:00:00 2001 From: Rod A Santiago Date: Fri, 12 Aug 2016 10:35:12 -0700 Subject: [PATCH] moved from api to misc directory --- daemon/core/{api => misc}/apibridge.py | 123 +++++++++++-------------- 1 file changed, 55 insertions(+), 68 deletions(-) rename daemon/core/{api => misc}/apibridge.py (82%) diff --git a/daemon/core/api/apibridge.py b/daemon/core/misc/apibridge.py similarity index 82% rename from daemon/core/api/apibridge.py rename to daemon/core/misc/apibridge.py index de395bf9..c0ed6c0a 100644 --- a/daemon/core/api/apibridge.py +++ b/daemon/core/misc/apibridge.py @@ -9,22 +9,16 @@ -import core_pb2 -import struct, threading, traceback, sys +import coreapi2 +import threading, traceback, sys from core.api import coreapi, corewrapper from core.experiments import ExperimentStore wrapper = corewrapper legacy = coreapi -API2HDRFMT = "H" -API2HDRSIZ = struct.calcsize(API2HDRFMT) - - - - class CoreApiBridge(object): - + def __init__(self, handler): # The collector is used for gathering node messages sent by the core session, # for example, during INSTANTIATION as nodes are started until RUNTIME. @@ -45,12 +39,14 @@ class CoreApiBridge(object): session handler's queue, if available. ''' + data = coreapi2.recvAndUnpack(self.handler.request.recv) + ''' try: - hdr = self.handler.request.recv(API2HDRSIZ) + hdr = self.handler.request.recv(coreapi2.API2HDRSIZ) except Exception, e: raise IOError, "error receiving API 2 header (%s)" % e - if len(hdr) != API2HDRSIZ: + if len(hdr) != coreapi2.API2HDRSIZ: if len(hdr) == 0: raise EOFError, "client disconnected" else: @@ -60,7 +56,7 @@ class CoreApiBridge(object): data = "" while len(data) < dataToRead: data += self.handler.request.recv(dataToRead - len(data)) - + ''' msgs = self.processApi2Message(data) return msgs @@ -75,7 +71,10 @@ class CoreApiBridge(object): # send to API2 client self.handler.request.sendall(reply) except Exception, e: - self.warn("Error sending reply data: %s" % e) + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + raise e def sendall(self, data): @@ -99,7 +98,7 @@ class CoreApiBridge(object): def processApi2Message(self, data): - message = core_pb2.CoreMessage() + message = coreapi2.CoreMessage() message.ParseFromString(data) if message.HasField('session'): return self.processApi2SessionMsg(message.session, @@ -143,7 +142,7 @@ class CoreApiBridge(object): ''' sessions = sessMsg.getNumber().split("|") port_num = int(sessions[0]) - newMsg = core_pb2.CoreMessage() + newMsg = coreapi2.CoreMessage() newMsg.session.clientId = 'client' + sessions[0] newMsg.session.port_num = port_num @@ -163,8 +162,8 @@ class CoreApiBridge(object): newMsg.session.all_exps.add(str(sid)) ''' - newMsg.purpose = core_pb2.ADD - api2msgs.append(CoreApiBridge.packApi2(newMsg)) + newMsg.purpose = coreapi2.ADD + api2msgs.append(coreapi2.pack(newMsg)) elif parser.getType() == legacy.CORE_API_EVENT_MSG: event = parser.createWrapper() ''' @@ -177,16 +176,16 @@ class CoreApiBridge(object): print "\tsessions=", event.getSession() ''' if event.getType() == legacy.CORE_EVENT_RUNTIME_STATE: - newMsg = core_pb2.CoreMessage() - newMsg.purpose = core_pb2.STATE_CHANGE - newMsg.event.state = core_pb2.RUNTIME - api2msgs.append(CoreApiBridge.packApi2(newMsg)) + newMsg = coreapi2.CoreMessage() + newMsg.purpose = coreapi2.STATE_CHANGE + newMsg.event.state = coreapi2.RUNTIME + api2msgs.append(coreapi2.pack(newMsg)) with self.lock: if self.collector: self.collector.experiment.running = True else: raise RuntimeError, "runtime entered without an instantiated experiment" - api2msgs.append(CoreApiBridge.packApi2(self.collector)) + api2msgs.append(coreapi2.pack(self.collector)) self.collector = None elif parser.getType() == legacy.CORE_API_NODE_MSG: @@ -219,62 +218,50 @@ class CoreApiBridge(object): api2_node=None api2_dev=None with self.lock: - for a_node in self.assocExperiment.nodes: - if a_node.idx == node.getNumber(): - api2_node = core_pb2.Node() - break - if not api2_node: - for a_device in self.assocExperiment.devices: - if a_device.idx == node.getNumber(): - api2_dev = core_pb2.Device() - break - if api2_node: - api2_obj = api2_node - else: - api2_obj = api2_dev + if self.assocExperiment: + nodeOrDev = None + newMsg = None + if not self.collector: + newMsg = coreapi2.CoreMessage() - if not api2_obj: - raise RuntimeError, "Node %d not in experiment" % (node.getNumber()) + if coreapi2.findNodeByIdx(self.assocExperiment, node.getNumber()): + if self.collector: + nodeOrDev = coreapi2.getNodeByIdx(self.collector.experiment, node.getNumber()) + else: + nodeOrDev = newMsg.node + elif coreapi2.findDeviceByIdx(self.assocExperiment, node.getNumber()): + if self.collector: + nodeOrDev = coreapi2.getDeviceByIdx(self.collector.experiment, node.getNumber()) + else: + nodeOrDev = newMsg.device - api2_obj.idx = node.getNumber() - if node.getEmuid() is not None: api2_obj.emu_id=node.getEmuid() - if node.getName() is not None: api2_obj.name=node.getName() - if node.getXpos() is not None: api2_obj.location.x_pos=node.getXpos() - if node.getYpos() is not None: api2_obj.location.y_pos=node.getYpos() - if self.collector: - if api_node: - self.collector.experiment.nodes.add().CopyFrom(api_node) + if nodeOrDev: + nodeOrDev.idx = node.getNumber() + if node.getEmuid() is not None: nodeOrDev.emu_id=node.getEmuid() + if node.getName() is not None: nodeOrDev.name=node.getName() + if node.getXpos() is not None: nodeOrDev.location.x_pos=node.getXpos() + if node.getYpos() is not None: nodeOrDev.location.y_pos=node.getYpos() + + if newMsg: + newMsg.purpose = coreapi2.MODIFY + api2msgs.append(coreapi2.pack(newMsg)) else: - self.collector.experiment.devices.add().CopyFrom(api_device) - - newMsg = core_pb2.CoreMessage() - newMsg.purpose = core_pb2.MODIFY - if api_node: - newMsg.node = api_node - else: - newMsg.device = api_device - api2msgs.append(CoreApiBridge.packApi2(newMsg)) - + print "Received update for unknown node or device", node.getNumber() + else: + print "Received update for node or device without an associated experiment", node.getNumber() else: print "received message type", parser.getType() return api2msgs - @staticmethod - def packApi2(message): - ''' Pack an API2 message for transmission - ''' - data = message.SerializeToString() - return struct.pack(API2HDRFMT, len(data)) + data - def processApi2SessionMsg(self, message, purpose): print 'Received session request message' - if purpose == core_pb2.ADD: + if purpose == coreapi2.ADD: legacymsgs = [] legacymsgs.append(wrapper.RegMsg.instantiate(0, gui='true')) return legacymsgs # The response will be sent to the API2 client when a legacy session message is received from the daemon - elif purpose == core_pb2.DELETE: + elif purpose == coreapi2.DELETE: # TODO: shutdown session pass else: @@ -282,9 +269,9 @@ class CoreApiBridge(object): def processApi2ExperimentMsg(self, exp, purpose): - if purpose == core_pb2.ADD: + if purpose == coreapi2.ADD: if ExperimentStore.addExperiment(exp): - response = core_pb2.CoreMessage() + response = coreapi2.CoreMessage() response.experiment.experimentId = exp.experimentId; response.purpose = purpose @@ -295,11 +282,11 @@ class CoreApiBridge(object): self.collector = response else: raise RuntimeError, "Instantiation of experiment while another is active" - self.handler.request.sendall(CoreApiBridge.packApi2(response)) # TODO: Fix this + self.handler.request.sendall(coreapi2.pack(response)) # TODO: Fix this return self.translateApi2ExperimentMsg(exp) else: return self.Api2Error("unable to add experiment") - elif purpose == core_pb2.MODIFY: + elif purpose == coreapi2.MODIFY: # Detect if a change in state is requested if exp.HasField('running'): if exp.running: