API2 add/modify experiment support.

This commit is contained in:
Rod A Santiago 2016-08-10 17:27:15 -07:00
parent 3ba2b685b6
commit cf2912e8a4
2 changed files with 82 additions and 12 deletions

View file

@ -12,6 +12,7 @@
import core_pb2 import core_pb2
import struct import struct
from core.api import coreapi, corewrapper from core.api import coreapi, corewrapper
from core.experiments import ExperimentStore
wrapper = corewrapper wrapper = corewrapper
legacy = coreapi legacy = coreapi
@ -25,24 +26,27 @@ API2HDRSIZ = struct.calcsize(API2HDRFMT)
class CoreApiBridge(object): class CoreApiBridge(object):
@staticmethod @staticmethod
def Api2toLegacy(data): def Api2toLegacy(data, handler):
message = core_pb2.CoreMessage() message = core_pb2.CoreMessage()
message.ParseFromString(data) message.ParseFromString(data)
if message.HasField('session'): if message.HasField('session'):
return CoreApiBridge.translateApi2SessionMsg(message.session) return CoreApiBridge.translateApi2SessionMsg(message.session)
if message.HasField('experiment'): if message.HasField('experiment'):
return CoreApiBridge.translateExperimentMsg(message.experiment) return CoreApiBridge.handleApi2ExperimentMsg(message.experiment,
message.purpose,
handler)
if message.HasField('event'): if message.HasField('event'):
return CoreApiBridge.translateEvent(message.event) return CoreApiBridge.translateEvent(message.event)
@staticmethod @staticmethod
def LegacytoApi2(messages, sessionState): def LegacytoApi2(messages, handler):
api2msgs = [] api2msgs = []
for msgstr in messages: for msgstr in messages:
# Unpack the message # Unpack the message
parser = wrapper.CoreMessageParser(msgstr) parser = wrapper.CoreMessageParser(msgstr)
if parser.getType() == legacy.CORE_API_REG_MSG: if parser.getType() == legacy.CORE_API_REG_MSG:
regMsg = parser.createWrapper() regMsg = parser.createWrapper()
'''
print "RegisterMessage" print "RegisterMessage"
print "\twireless=", regMsg.getWireless() print "\twireless=", regMsg.getWireless()
print "\tmobility=", regMsg.getMobility() print "\tmobility=", regMsg.getMobility()
@ -51,8 +55,10 @@ class CoreApiBridge(object):
print "\tgui=", regMsg.getGui() print "\tgui=", regMsg.getGui()
print "\temul=", regMsg.getEmulsrv() print "\temul=", regMsg.getEmulsrv()
print "\tsess=", regMsg.getSession() print "\tsess=", regMsg.getSession()
'''
elif parser.getType() == legacy.CORE_API_SESS_MSG: elif parser.getType() == legacy.CORE_API_SESS_MSG:
sessMsg = parser.createWrapper() sessMsg = parser.createWrapper()
'''
print "SessionMessage" print "SessionMessage"
print "\tnumber=", sessMsg.getNumber() print "\tnumber=", sessMsg.getNumber()
print "\tname=", sessMsg.getName() print "\tname=", sessMsg.getName()
@ -62,14 +68,33 @@ class CoreApiBridge(object):
print "\tthumb=", sessMsg.getThumb() print "\tthumb=", sessMsg.getThumb()
print "\tuser=", sessMsg.getUser() print "\tuser=", sessMsg.getUser()
print "\topaque=", sessMsg.getOpaque() print "\topaque=", sessMsg.getOpaque()
'''
sessions = sessMsg.getNumber().split("|") sessions = sessMsg.getNumber().split("|")
port_num = int(sessions[0]) port_num = int(sessions[0])
newMsg = core_pb2.CoreMessage() newMsg = core_pb2.CoreMessage()
newMsg.session.clientId = 'client' + sessions[0] newMsg.session.clientId = 'client' + sessions[0]
newMsg.session.port_num = port_num newMsg.session.port_num = port_num
# List active experiments in the server
'''
for sid in sessions:
sid = int(sid)
if sid == 0:
continue
session = session.server.getsession(sessionid=sid, useexisting=True)
if session is None:
print "Invalid session ID received from daemon"
continue
if hasattr(session, 'experiment'):
newMsg.session.all_exps.add(session.experiment.id)
else:
newMsg.session.all_exps.add(str(sid))
'''
newMsg.purpose = core_pb2.ADD
api2msgs.append(CoreApiBridge.packApi2(newMsg)) api2msgs.append(CoreApiBridge.packApi2(newMsg))
elif parser.getType() == legacy.CORE_API_EVENT_MSG: elif parser.getType() == legacy.CORE_API_EVENT_MSG:
event = parser.createWrapper() event = parser.createWrapper()
'''
print "Event:" print "Event:"
print "\tnode=", event.getNode() print "\tnode=", event.getNode()
print "\ttype=", event.getType() print "\ttype=", event.getType()
@ -77,12 +102,15 @@ class CoreApiBridge(object):
print "\tdata=", event.getData() print "\tdata=", event.getData()
print "\ttime=", event.getTime() print "\ttime=", event.getTime()
print "\tsessions=", event.getSession() print "\tsessions=", event.getSession()
'''
if event.getType() == legacy.CORE_EVENT_RUNTIME_STATE: if event.getType() == legacy.CORE_EVENT_RUNTIME_STATE:
newMsg = core_pb2.CoreMessage() newMsg = core_pb2.CoreMessage()
newMsg.event.newstate = core_pb2.Event.RUNTIME newMsg.purpose = core_pb2.STATE_CHANGE
newMsg.event.state = core_pb2.RUNTIME
api2msgs.append(CoreApiBridge.packApi2(newMsg)) api2msgs.append(CoreApiBridge.packApi2(newMsg))
elif parser.getType() == legacy.CORE_API_NODE_MSG: elif parser.getType() == legacy.CORE_API_NODE_MSG:
node = parser.createWrapper() node = parser.createWrapper()
'''
print "Node:" print "Node:"
print "\tnumber=", node.getNumber() print "\tnumber=", node.getNumber()
print "\ttype=", node.getType() print "\ttype=", node.getType()
@ -104,12 +132,18 @@ class CoreApiBridge(object):
print "\talt=", node.getAlt() print "\talt=", node.getAlt()
print "\ticon=", node.getIcon() print "\ticon=", node.getIcon()
print "\topaque=", node.getOpaque() print "\topaque=", node.getOpaque()
if handler.session.getstate() == legacy.CORE_EVENT_INSTANTIATION_STATE:
'''
newMsg = core_pb2.CoreMessage() newMsg = core_pb2.CoreMessage()
newMsg.node.idx=node.getNumber() newMsg.node.idx=node.getNumber()
if node.getEmuid(): newMsg.node.emu_id=node.getEmuid() if node.getEmuid(): newMsg.node.emu_id=node.getEmuid()
if node.getName(): newMsg.node.name=node.getName() if node.getName(): newMsg.node.name=node.getName()
if node.getXpos(): newMsg.node.x_pos=node.getXpos() if node.getXpos(): newMsg.node.x_pos=node.getXpos()
if node.getYpos(): newMsg.node.y_pos=node.getYpos() if node.getYpos(): newMsg.node.y_pos=node.getYpos()
# TODO: Collect all node updates into an experiment message
newMsg.purpose = core_pb2.MODIFY
api2msgs.append(CoreApiBridge.packApi2(newMsg)) api2msgs.append(CoreApiBridge.packApi2(newMsg))
else: else:
print "received message type", parser.getType() print "received message type", parser.getType()
@ -128,6 +162,37 @@ class CoreApiBridge(object):
return msgs return msgs
@staticmethod
def handleApi2ExperimentMsg(exp, purpose, handler):
if purpose == core_pb2.ADD:
if ExperimentStore.addExperiment(exp):
response = core_pb2.CoreMessage()
response.experiment.experimentId = exp.experimentId;
response.purpose = purpose
handler.request.sendall(CoreApiBridge.packApi2(response))
return CoreApiBridge.translateExperimentMsg(exp)
else:
return CoreApiBridge.Api2Error("unable to add experiment")
elif purpose == core_pb2.MODIFY:
# Detect if a change in state is requested
if exp.HasField('running'):
if exp.running:
# TODO: Check for a state transition
# transition to instantiation state (legacy)
msgs = []
msgs.append(wrapper.EventMsg.instantiate(
legacy.CORE_EVENT_INSTANTIATION_STATE))
return msgs
else:
# TODO: Check for transition from running to not running
# transition to data collection state (legacy)
msgs = []
msgs.append(wrapper.EventMsg.instantiate(
legacy.CORE_EVENT_DATACOLLECT_STATE))
return msgs
else:
print "Unsupported experiment modification received"
@staticmethod @staticmethod
def translateExperimentMsg(message): def translateExperimentMsg(message):
@ -138,8 +203,10 @@ class CoreApiBridge(object):
0, "0", 0, "0",
nodecount=str(len(message.nodes) + len(message.devices)))) nodecount=str(len(message.nodes) + len(message.devices))))
# Quickly transition through the definition and configuration states # Quickly transition through the definition and configuration states
msgs.append(wrapper.EventMsg.instantiate(legacy.CORE_EVENT_DEFINITION_STATE)) msgs.append(wrapper.EventMsg.instantiate(
msgs.append(wrapper.EventMsg.instantiate(legacy.CORE_EVENT_CONFIGURATION_STATE)) legacy.CORE_EVENT_DEFINITION_STATE))
msgs.append(wrapper.EventMsg.instantiate(
legacy.CORE_EVENT_CONFIGURATION_STATE))
# Send location # Send location
# TODO: Add this info to the Experiment # TODO: Add this info to the Experiment
@ -213,9 +280,6 @@ class CoreApiBridge(object):
# TODO # TODO
# transition to instantiation state
# TODO
msgs.append(wrapper.EventMsg.instantiate(legacy.CORE_EVENT_INSTANTIATION_STATE))
return msgs return msgs

View file

@ -1395,6 +1395,7 @@ class CoreApi2RequestHandler(CoreRequestHandler):
self.nodestatusreq = {} self.nodestatusreq = {}
self.master = False self.master = False
self.session = None self.session = None
self.experiment = None
self.verbose = bool(server.mainserver.cfg['verbose'].lower() == "true") self.verbose = bool(server.mainserver.cfg['verbose'].lower() == "true")
self.debug = bool(server.mainserver.cfg['debug'].lower() == "true") self.debug = bool(server.mainserver.cfg['debug'].lower() == "true")
SocketServer.BaseRequestHandler.__init__(self, request, SocketServer.BaseRequestHandler.__init__(self, request,
@ -1414,7 +1415,7 @@ class CoreApi2RequestHandler(CoreRequestHandler):
while True: while True:
try: try:
data = self.recvmsg() data = self.recvmsg()
msgs = apibridge.CoreApiBridge.Api2toLegacy(data) msgs = apibridge.CoreApiBridge.Api2toLegacy(data, self)
if msgs: if msgs:
for msg in msgs: for msg in msgs:
self.session.broadcast(self, msg) self.session.broadcast(self, msg)
@ -1429,7 +1430,7 @@ class CoreApi2RequestHandler(CoreRequestHandler):
''' Dispatch a reply to a previously received message. ''' Dispatch a reply to a previously received message.
''' '''
api2Replies = apibridge.CoreApiBridge.LegacytoApi2( api2Replies = apibridge.CoreApiBridge.LegacytoApi2(
replies, self.session.getstate()) replies, self)
if api2Replies: if api2Replies:
for reply in api2Replies: for reply in api2Replies:
try: try:
@ -1450,10 +1451,15 @@ class CoreApi2RequestHandler(CoreRequestHandler):
for msg in msgs: for msg in msgs:
self.request.sendall(msg) self.request.sendall(msg)
def finish(self): def finish(self):
if self.verbose:
self.info("API2 client disconnected")
if self.session:
self.session.disconnect(self)
return SocketServer.BaseRequestHandler.finish(self) return SocketServer.BaseRequestHandler.finish(self)
def recvmsg(self): def recvmsg(self):
''' Receive data, parse a CoreMessage and queue it onto an existing ''' Receive data, parse a CoreMessage and queue it onto an existing
session handler's queue, if available. session handler's queue, if available.