From cf2912e8a47db60dfc07424a2fcb72d4561bfa9d Mon Sep 17 00:00:00 2001 From: Rod A Santiago Date: Wed, 10 Aug 2016 17:27:15 -0700 Subject: [PATCH] API2 add/modify experiment support. --- daemon/core/api/apibridge.py | 82 ++++++++++++++++++++++++++++++++---- daemon/sbin/core-daemon | 12 ++++-- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/daemon/core/api/apibridge.py b/daemon/core/api/apibridge.py index 71a5c0fc..b760533d 100644 --- a/daemon/core/api/apibridge.py +++ b/daemon/core/api/apibridge.py @@ -12,6 +12,7 @@ import core_pb2 import struct from core.api import coreapi, corewrapper +from core.experiments import ExperimentStore wrapper = corewrapper legacy = coreapi @@ -25,24 +26,27 @@ API2HDRSIZ = struct.calcsize(API2HDRFMT) class CoreApiBridge(object): @staticmethod - def Api2toLegacy(data): + def Api2toLegacy(data, handler): message = core_pb2.CoreMessage() message.ParseFromString(data) if message.HasField('session'): return CoreApiBridge.translateApi2SessionMsg(message.session) if message.HasField('experiment'): - return CoreApiBridge.translateExperimentMsg(message.experiment) + return CoreApiBridge.handleApi2ExperimentMsg(message.experiment, + message.purpose, + handler) if message.HasField('event'): return CoreApiBridge.translateEvent(message.event) @staticmethod - def LegacytoApi2(messages, sessionState): + def LegacytoApi2(messages, handler): api2msgs = [] for msgstr in messages: # Unpack the message parser = wrapper.CoreMessageParser(msgstr) if parser.getType() == legacy.CORE_API_REG_MSG: regMsg = parser.createWrapper() + ''' print "RegisterMessage" print "\twireless=", regMsg.getWireless() print "\tmobility=", regMsg.getMobility() @@ -51,8 +55,10 @@ class CoreApiBridge(object): print "\tgui=", regMsg.getGui() print "\temul=", regMsg.getEmulsrv() print "\tsess=", regMsg.getSession() + ''' elif parser.getType() == legacy.CORE_API_SESS_MSG: sessMsg = parser.createWrapper() + ''' print "SessionMessage" print "\tnumber=", sessMsg.getNumber() print "\tname=", sessMsg.getName() @@ -62,14 +68,33 @@ class CoreApiBridge(object): print "\tthumb=", sessMsg.getThumb() print "\tuser=", sessMsg.getUser() print "\topaque=", sessMsg.getOpaque() + ''' sessions = sessMsg.getNumber().split("|") port_num = int(sessions[0]) newMsg = core_pb2.CoreMessage() newMsg.session.clientId = 'client' + sessions[0] newMsg.session.port_num = port_num + + # List active experiments in the server + ''' + for sid in sessions: + sid = int(sid) + if sid == 0: + continue + session = session.server.getsession(sessionid=sid, useexisting=True) + if session is None: + print "Invalid session ID received from daemon" + continue + if hasattr(session, 'experiment'): + newMsg.session.all_exps.add(session.experiment.id) + else: + newMsg.session.all_exps.add(str(sid)) + ''' + newMsg.purpose = core_pb2.ADD api2msgs.append(CoreApiBridge.packApi2(newMsg)) elif parser.getType() == legacy.CORE_API_EVENT_MSG: event = parser.createWrapper() + ''' print "Event:" print "\tnode=", event.getNode() print "\ttype=", event.getType() @@ -77,12 +102,15 @@ class CoreApiBridge(object): print "\tdata=", event.getData() print "\ttime=", event.getTime() print "\tsessions=", event.getSession() + ''' if event.getType() == legacy.CORE_EVENT_RUNTIME_STATE: newMsg = core_pb2.CoreMessage() - newMsg.event.newstate = core_pb2.Event.RUNTIME + newMsg.purpose = core_pb2.STATE_CHANGE + newMsg.event.state = core_pb2.RUNTIME api2msgs.append(CoreApiBridge.packApi2(newMsg)) elif parser.getType() == legacy.CORE_API_NODE_MSG: node = parser.createWrapper() + ''' print "Node:" print "\tnumber=", node.getNumber() print "\ttype=", node.getType() @@ -104,12 +132,18 @@ class CoreApiBridge(object): print "\talt=", node.getAlt() print "\ticon=", node.getIcon() print "\topaque=", node.getOpaque() + if handler.session.getstate() == legacy.CORE_EVENT_INSTANTIATION_STATE: + ''' + + newMsg = core_pb2.CoreMessage() newMsg.node.idx=node.getNumber() if node.getEmuid(): newMsg.node.emu_id=node.getEmuid() if node.getName(): newMsg.node.name=node.getName() if node.getXpos(): newMsg.node.x_pos=node.getXpos() if node.getYpos(): newMsg.node.y_pos=node.getYpos() + # TODO: Collect all node updates into an experiment message + newMsg.purpose = core_pb2.MODIFY api2msgs.append(CoreApiBridge.packApi2(newMsg)) else: print "received message type", parser.getType() @@ -128,6 +162,37 @@ class CoreApiBridge(object): return msgs + @staticmethod + def handleApi2ExperimentMsg(exp, purpose, handler): + if purpose == core_pb2.ADD: + if ExperimentStore.addExperiment(exp): + response = core_pb2.CoreMessage() + response.experiment.experimentId = exp.experimentId; + response.purpose = purpose + handler.request.sendall(CoreApiBridge.packApi2(response)) + return CoreApiBridge.translateExperimentMsg(exp) + else: + return CoreApiBridge.Api2Error("unable to add experiment") + elif purpose == core_pb2.MODIFY: + # Detect if a change in state is requested + if exp.HasField('running'): + if exp.running: + # TODO: Check for a state transition + # transition to instantiation state (legacy) + msgs = [] + msgs.append(wrapper.EventMsg.instantiate( + legacy.CORE_EVENT_INSTANTIATION_STATE)) + return msgs + else: + # TODO: Check for transition from running to not running + # transition to data collection state (legacy) + msgs = [] + msgs.append(wrapper.EventMsg.instantiate( + legacy.CORE_EVENT_DATACOLLECT_STATE)) + return msgs + else: + print "Unsupported experiment modification received" + @staticmethod def translateExperimentMsg(message): @@ -138,8 +203,10 @@ class CoreApiBridge(object): 0, "0", nodecount=str(len(message.nodes) + len(message.devices)))) # Quickly transition through the definition and configuration states - msgs.append(wrapper.EventMsg.instantiate(legacy.CORE_EVENT_DEFINITION_STATE)) - msgs.append(wrapper.EventMsg.instantiate(legacy.CORE_EVENT_CONFIGURATION_STATE)) + msgs.append(wrapper.EventMsg.instantiate( + legacy.CORE_EVENT_DEFINITION_STATE)) + msgs.append(wrapper.EventMsg.instantiate( + legacy.CORE_EVENT_CONFIGURATION_STATE)) # Send location # TODO: Add this info to the Experiment @@ -213,9 +280,6 @@ class CoreApiBridge(object): # TODO - # transition to instantiation state - # TODO - msgs.append(wrapper.EventMsg.instantiate(legacy.CORE_EVENT_INSTANTIATION_STATE)) return msgs diff --git a/daemon/sbin/core-daemon b/daemon/sbin/core-daemon index 70e3815e..3295e1d0 100755 --- a/daemon/sbin/core-daemon +++ b/daemon/sbin/core-daemon @@ -1395,6 +1395,7 @@ class CoreApi2RequestHandler(CoreRequestHandler): self.nodestatusreq = {} self.master = False self.session = None + self.experiment = None self.verbose = bool(server.mainserver.cfg['verbose'].lower() == "true") self.debug = bool(server.mainserver.cfg['debug'].lower() == "true") SocketServer.BaseRequestHandler.__init__(self, request, @@ -1414,7 +1415,7 @@ class CoreApi2RequestHandler(CoreRequestHandler): while True: try: data = self.recvmsg() - msgs = apibridge.CoreApiBridge.Api2toLegacy(data) + msgs = apibridge.CoreApiBridge.Api2toLegacy(data, self) if msgs: for msg in msgs: self.session.broadcast(self, msg) @@ -1429,7 +1430,7 @@ class CoreApi2RequestHandler(CoreRequestHandler): ''' Dispatch a reply to a previously received message. ''' api2Replies = apibridge.CoreApiBridge.LegacytoApi2( - replies, self.session.getstate()) + replies, self) if api2Replies: for reply in api2Replies: try: @@ -1450,10 +1451,15 @@ class CoreApi2RequestHandler(CoreRequestHandler): for msg in msgs: self.request.sendall(msg) - def finish(self): + if self.verbose: + self.info("API2 client disconnected") + + if self.session: + self.session.disconnect(self) return SocketServer.BaseRequestHandler.finish(self) + def recvmsg(self): ''' Receive data, parse a CoreMessage and queue it onto an existing session handler's queue, if available.