# # CORE # Copyright (c)2016 the Boeing Company. # See the LICENSE file included in this distribution. # # authors: Rod Santiago # John Kharouta # import threading, traceback, sys from core.api import coreapi, corewrapper, coreapi2 from core.experiments import ExperimentStore wrapper = corewrapper legacy = coreapi class CoreApiBridge(object): def __init__(self, handler): # The collector is used for gathering node messages sent by the core session, # for example, during INSTANTIATION as nodes are started until RUNTIME. self.collector = None # The currently associated (added or joined) experiment self.assocExperiment = None # Mutex self.lock = threading.Lock() # Reference to the owning handler in the core-daemon self.handler = handler def recvmsg(self): ''' Receive data, parse a CoreMessage and queue it onto an existing session handler's queue, if available. ''' data = coreapi2.recvAndUnpack(self.handler.request.recv) msgs = self.processApi2Message(data) return msgs def dispatchreplies(self, replies): ''' Dispatch a reply to a previously received message. ''' api2Replies = self.processLegacyCoreMessage(replies) if api2Replies: for reply in api2Replies: try: # send to API2 client self.handler.request.sendall(reply) except Exception, e: print "-"*60 traceback.print_exc(file=sys.stdout) print "-"*60 raise e def sendall(self, data): ''' The daemon calls this method with legacy API data. Convert first API2 then send. ''' try: msgs = self.processLegacyCoreMessage((data,)) if msgs: for msg in msgs: self.handler.request.sendall(msg) except Exception, e: print "-"*60 traceback.print_exc(file=sys.stdout) print "-"*60 raise e def processApi2Message(self, data): message = coreapi2.CoreMessage() message.ParseFromString(data) if message.HasField('session'): return self.processApi2SessionMsg(message.session, message.purpose) if message.HasField('experiment'): return self.processApi2ExperimentMsg(message.experiment, message.purpose) if message.HasField('event'): return self.processApi2Event(message.event, message.purpose) def processLegacyCoreMessage(self, messages): api2msgs = [] for msgstr in messages: # Unpack the message parser = wrapper.CoreMessageParser(msgstr) if parser.getType() == legacy.CORE_API_REG_MSG: regMsg = parser.createWrapper() ''' print "RegisterMessage" print "\twireless=", regMsg.getWireless() print "\tmobility=", regMsg.getMobility() print "\tutility=", regMsg.getUtility() print "\texec=", regMsg.getExecsrv() print "\tgui=", regMsg.getGui() print "\temul=", regMsg.getEmulsrv() print "\tsess=", regMsg.getSession() ''' elif parser.getType() == legacy.CORE_API_SESS_MSG: sessMsg = parser.createWrapper() ''' print "SessionMessage" print "\tnumber=", sessMsg.getNumber() print "\tname=", sessMsg.getName() print "\tfile=", sessMsg.getFile() print "\tnodecount=", sessMsg.getNodecount() print "\tdate=", sessMsg.getDate() print "\tthumb=", sessMsg.getThumb() print "\tuser=", sessMsg.getUser() print "\topaque=", sessMsg.getOpaque() ''' sessions = sessMsg.getNumber().split("|") port_num = int(sessions[0]) newMsg = coreapi2.CoreMessage() newMsg.session.clientId = 'client' + sessions[0] newMsg.session.port_num = port_num # List active experiments in the server ''' for sid in sessions: sid = int(sid) if sid == 0: continue session = session.server.getsession(sessionid=sid, useexisting=True) if session is None: print "Invalid session ID received from daemon" continue if hasattr(session, 'experiment'): newMsg.session.all_exps.add(session.experiment.id) else: newMsg.session.all_exps.add(str(sid)) ''' newMsg.purpose = coreapi2.ADD api2msgs.append(coreapi2.pack(newMsg)) elif parser.getType() == legacy.CORE_API_EVENT_MSG: event = parser.createWrapper() ''' print "Event:" print "\tnode=", event.getNode() print "\ttype=", event.getType() print "\tname=", event.getName() print "\tdata=", event.getData() print "\ttime=", event.getTime() print "\tsessions=", event.getSession() ''' if event.getType() == legacy.CORE_EVENT_RUNTIME_STATE: newMsg = coreapi2.CoreMessage() newMsg.purpose = coreapi2.STATE_CHANGE newMsg.event.state = coreapi2.RUNTIME api2msgs.append(coreapi2.pack(newMsg)) with self.lock: if self.collector: self.collector.experiment.running = True self.collector.purpose = coreapi2.MODIFY else: raise RuntimeError, "runtime entered without an instantiated experiment" api2msgs.append(coreapi2.pack(self.collector)) self.collector = None elif parser.getType() == legacy.CORE_API_NODE_MSG: node = parser.createWrapper() print "Node:" print "\tnumber=", node.getNumber() print "\ttype=", node.getType() print "\tname=", node.getName() print "\tipaddr=", node.getIpaddr() print "\tmacaddr=", node.getMacaddr() print "\tip6addr=", node.getIp6addr() print "\tmodel=", node.getModel() print "\temusrv=", node.getEmusrv() print "\tsession=", node.getSession() print "\txpos=", node.getXpos() print "\typos=", node.getYpos() print "\tcanvas=", node.getCanvas() print "\temuid=", node.getEmuid() print "\tnetid=", node.getNetid() print "\tservices=", node.getServices() print "\tlat=", node.getLat() print "\tlon=", node.getLong() print "\talt=", node.getAlt() print "\ticon=", node.getIcon() print "\topaque=", node.getOpaque() ''' if handler.session.getstate() == legacy.CORE_EVENT_INSTANTIATION_STATE: ''' api2_node=None api2_dev=None with self.lock: if self.assocExperiment: nodeOrDev = None newMsg = None if not self.collector: newMsg = coreapi2.CoreMessage() if coreapi2.findNodeByIdx(self.assocExperiment, node.getNumber()): if self.collector: nodeOrDev = coreapi2.getNodeByIdx(self.collector.experiment, node.getNumber()) else: nodeOrDev = newMsg.node elif coreapi2.findDeviceByIdx(self.assocExperiment, node.getNumber()): if self.collector: nodeOrDev = coreapi2.getDeviceByIdx(self.collector.experiment, node.getNumber()) else: nodeOrDev = newMsg.device if nodeOrDev: nodeOrDev.idx = node.getNumber() if node.getEmuid() is not None: nodeOrDev.emu_id=node.getEmuid() if node.getName() is not None: nodeOrDev.name=node.getName() if node.getXpos() is not None: nodeOrDev.location.x_pos=node.getXpos() if node.getYpos() is not None: nodeOrDev.location.y_pos=node.getYpos() if newMsg: newMsg.purpose = coreapi2.MODIFY api2msgs.append(coreapi2.pack(newMsg)) else: print "Received update for unknown node or device", node.getNumber() else: print "Received update for node or device without an associated experiment", node.getNumber() else: print "received message type", parser.getType() return api2msgs def processApi2SessionMsg(self, message, purpose): print 'Received session request message' if purpose == coreapi2.ADD: legacymsgs = [] legacymsgs.append(wrapper.RegMsg.instantiate(0, gui='true')) return legacymsgs # The response will be sent to the API2 client when a legacy session message is received from the daemon elif purpose == coreapi2.MODIFY: pass elif purpose == coreapi2.DELETE: # TODO: shutdown session pass else: print 'Received invalid purpose for SESSION' def processApi2ExperimentMsg(self, exp, purpose): if purpose == coreapi2.ADD: if ExperimentStore.addExperiment(exp): response = coreapi2.CoreMessage() response.experiment.experimentId = exp.experimentId; response.purpose = purpose # Start a collector for gathering node messages instantiated in the core session with self.lock: if not self.collector: self.assocExperiment = exp self.collector = response else: raise RuntimeError, "Instantiation of experiment while another is active" self.handler.request.sendall(coreapi2.pack(response)) return self.translateApi2ExperimentMsg(exp) else: return self.Api2Error("unable to add experiment") elif purpose == coreapi2.MODIFY: # Detect if a change in state is requested if exp.HasField('running'): if exp.running: # TODO: Check for a state transition # transition to instantiation state (legacy) msgs = [] msgs.append(wrapper.EventMsg.instantiate( legacy.CORE_EVENT_INSTANTIATION_STATE)) return msgs else: # TODO: Check for transition from running to not running # transition to data collection state (legacy) msgs = [] msgs.append(wrapper.EventMsg.instantiate( legacy.CORE_EVENT_DATACOLLECT_STATE)) return msgs else: print "Unsupported experiment modification received" def translateApi2ExperimentMsg(self, message): print 'Received experiment message' msgs = [] # Flag need to be 0 otherwise CORE will not enter runtime state (per JavaAdaptor, need verification) msgs.append(wrapper.SessionMsg.instantiate( 0, "0", nodecount=str(len(message.nodes) + len(message.devices)))) # Quickly transition through the definition and configuration states msgs.append(wrapper.EventMsg.instantiate( legacy.CORE_EVENT_DEFINITION_STATE)) msgs.append(wrapper.EventMsg.instantiate( legacy.CORE_EVENT_CONFIGURATION_STATE)) # Send location # TODO: Add this info to the Experiment msgs.append(wrapper.ConfMsg.instantiate(obj="location", dataTypes=(9,9,9,9,9,9), dataValues='0|0| 47.5766974863|-122.125920191|0.0|150.0')) # TODO # Send control net configuration # send node types # send services # send nodes devices = {} for node in message.nodes: if node.idx in devices: raise IOError, "received experiment with node/device duplicates" devices[node.idx] = node # TODO: Add other fields msgs.append(wrapper.NodeMsg.instantiate( legacy.CORE_API_ADD_FLAG|legacy.CORE_API_STR_FLAG, node.idx, str(node.name))) for device in message.devices: if device.idx in devices: raise IOError, "received experiment with node/device duplicates" devices[device.idx] = device # TODO: Add other fields msgs.append(wrapper.NodeMsg.instantiate( legacy.CORE_API_ADD_FLAG|legacy.CORE_API_STR_FLAG, device.idx, str(device.name), type = legacy.CORE_NODE_SWITCH)) # TODO: Update this later for network in message.networks: for channel in network.channels: if len(channel.endpoints) == 2: ep0 = channel.endpoints[0] ep1 = channel.endpoints[1] if ep0.dev_idx not in devices or ep1.dev_idx not in devices: raise IOError, "received channel message with invalid first endpoint device (%d)" % (ep0.dev_idx) if ep1.dev_idx not in devices: raise IOError, "received channel message with invalid second endpoint device (%d)" % (ep1.dev_idx) if ep0.intf_idx in devices[ep0.dev_idx].interfaces: raise IOError, "received channel message with invalid first endpoint interface (%d)" % (ep0.intf_idx) if ep1.intf_idx in devices[ep1.dev_idx].interfaces: raise IOError, "received channel message with invalid second endpoint interface (%d)" % (ep1.intf_idx) if0 = devices[ep0.dev_idx].interfaces[ep0.intf_idx] if1 = devices[ep1.dev_idx].interfaces[ep1.intf_idx] msgs.append(wrapper.LinkMsg.instantiate(legacy.CORE_API_ADD_FLAG, ep0.dev_idx,ep0.intf_idx, ep1.dev_idx,ep1.intf_idx, if1ip4=if0.ip4_addr if if0.HasField("ip4_addr") else None, if2ip4=if1.ip4_addr if if1.HasField("ip4_addr") else None)) # TODO # send metadata return msgs def processApi2Event(self, event, purpose): print 'Received event'