moved as much api2 handling as possible into apibridge.py

This commit is contained in:
Rod A Santiago 2016-08-11 16:51:46 -07:00
parent cf2912e8a4
commit 3cf9c80d15
2 changed files with 164 additions and 84 deletions

View file

@ -10,7 +10,7 @@
import core_pb2 import core_pb2
import struct import struct, threading, traceback, sys
from core.api import coreapi, corewrapper from core.api import coreapi, corewrapper
from core.experiments import ExperimentStore from core.experiments import ExperimentStore
@ -25,21 +25,93 @@ API2HDRSIZ = struct.calcsize(API2HDRFMT)
class CoreApiBridge(object): class CoreApiBridge(object):
@staticmethod def __init__(self, handler):
def Api2toLegacy(data, handler): # The collector is used for gathering node messages sent by the core session,
# for example, during INSTANTIATION as nodes are started until RUNTIME.
self.collector = None
# The currently associated (added or joined) experiment
self.assocExperiment = None
# Mutex
self.lock = threading.Lock()
# Reference to the owning handler in the core-daemon
self.handler = handler
def recvmsg(self):
''' Receive data, parse a CoreMessage and queue it onto an existing
session handler's queue, if available.
'''
try:
hdr = self.handler.request.recv(API2HDRSIZ)
except Exception, e:
raise IOError, "error receiving API 2 header (%s)" % e
if len(hdr) != API2HDRSIZ:
if len(hdr) == 0:
raise EOFError, "client disconnected"
else:
raise IOError, "invalid message header size"
dataToRead = struct.unpack(API2HDRFMT, hdr)[0]
data = ""
while len(data) < dataToRead:
data += self.handler.request.recv(dataToRead - len(data))
msgs = self.processApi2Message(data)
return msgs
def dispatchreplies(self, replies):
''' Dispatch a reply to a previously received message.
'''
api2Replies = self.processLegacyCoreMessage(replies)
if api2Replies:
for reply in api2Replies:
try:
# send to API2 client
self.handler.request.sendall(reply)
except Exception, e:
self.warn("Error sending reply data: %s" % e)
def sendall(self, data):
''' The daemon calls this method with legacy API data. Convert first
API2 then send.
'''
try:
msgs = self.processLegacyCoreMessage((data,))
if msgs:
for msg in msgs:
self.handler.request.sendall(msg)
except Exception, e:
print "-"*60
traceback.print_exc(file=sys.stdout)
print "-"*60
raise e
def processApi2Message(self, data):
message = core_pb2.CoreMessage() message = core_pb2.CoreMessage()
message.ParseFromString(data) message.ParseFromString(data)
if message.HasField('session'): if message.HasField('session'):
return CoreApiBridge.translateApi2SessionMsg(message.session) return self.processApi2SessionMsg(message.session,
message.purpose)
if message.HasField('experiment'): if message.HasField('experiment'):
return CoreApiBridge.handleApi2ExperimentMsg(message.experiment, return self.processApi2ExperimentMsg(message.experiment,
message.purpose, message.purpose)
handler)
if message.HasField('event'): if message.HasField('event'):
return CoreApiBridge.translateEvent(message.event) return self.processApi2Event(message.event,
message.purpose)
@staticmethod def processLegacyCoreMessage(self, messages):
def LegacytoApi2(messages, handler):
api2msgs = [] api2msgs = []
for msgstr in messages: for msgstr in messages:
# Unpack the message # Unpack the message
@ -90,6 +162,7 @@ class CoreApiBridge(object):
else: else:
newMsg.session.all_exps.add(str(sid)) newMsg.session.all_exps.add(str(sid))
''' '''
newMsg.purpose = core_pb2.ADD newMsg.purpose = core_pb2.ADD
api2msgs.append(CoreApiBridge.packApi2(newMsg)) api2msgs.append(CoreApiBridge.packApi2(newMsg))
elif parser.getType() == legacy.CORE_API_EVENT_MSG: elif parser.getType() == legacy.CORE_API_EVENT_MSG:
@ -108,9 +181,16 @@ class CoreApiBridge(object):
newMsg.purpose = core_pb2.STATE_CHANGE newMsg.purpose = core_pb2.STATE_CHANGE
newMsg.event.state = core_pb2.RUNTIME newMsg.event.state = core_pb2.RUNTIME
api2msgs.append(CoreApiBridge.packApi2(newMsg)) api2msgs.append(CoreApiBridge.packApi2(newMsg))
with self.lock:
if self.collector:
self.collector.experiment.running = True
else:
raise RuntimeError, "runtime entered without an instantiated experiment"
api2msgs.append(CoreApiBridge.packApi2(self.collector))
self.collector = None
elif parser.getType() == legacy.CORE_API_NODE_MSG: elif parser.getType() == legacy.CORE_API_NODE_MSG:
node = parser.createWrapper() node = parser.createWrapper()
'''
print "Node:" print "Node:"
print "\tnumber=", node.getNumber() print "\tnumber=", node.getNumber()
print "\ttype=", node.getType() print "\ttype=", node.getType()
@ -132,47 +212,93 @@ class CoreApiBridge(object):
print "\talt=", node.getAlt() print "\talt=", node.getAlt()
print "\ticon=", node.getIcon() print "\ticon=", node.getIcon()
print "\topaque=", node.getOpaque() print "\topaque=", node.getOpaque()
'''
if handler.session.getstate() == legacy.CORE_EVENT_INSTANTIATION_STATE: if handler.session.getstate() == legacy.CORE_EVENT_INSTANTIATION_STATE:
''' '''
api2_node=None
api2_dev=None
with self.lock:
for a_node in self.assocExperiment.nodes:
if a_node.idx == node.getNumber():
api2_node = core_pb2.Node()
break
if not api2_node:
for a_device in self.assocExperiment.devices:
if a_device.idx == node.getNumber():
api2_dev = core_pb2.Device()
break
if api2_node:
api2_obj = api2_node
else:
api2_obj = api2_dev
if not api2_obj:
raise RuntimeError, "Node %d not in experiment" % (node.getNumber())
api2_obj.idx = node.getNumber()
if node.getEmuid() is not None: api2_obj.emu_id=node.getEmuid()
if node.getName() is not None: api2_obj.name=node.getName()
if node.getXpos() is not None: api2_obj.location.x_pos=node.getXpos()
if node.getYpos() is not None: api2_obj.location.y_pos=node.getYpos()
if self.collector:
if api_node:
self.collector.experiment.nodes.add().CopyFrom(api_node)
else:
self.collector.experiment.devices.add().CopyFrom(api_device)
newMsg = core_pb2.CoreMessage() newMsg = core_pb2.CoreMessage()
newMsg.node.idx=node.getNumber()
if node.getEmuid(): newMsg.node.emu_id=node.getEmuid()
if node.getName(): newMsg.node.name=node.getName()
if node.getXpos(): newMsg.node.x_pos=node.getXpos()
if node.getYpos(): newMsg.node.y_pos=node.getYpos()
# TODO: Collect all node updates into an experiment message
newMsg.purpose = core_pb2.MODIFY newMsg.purpose = core_pb2.MODIFY
if api_node:
newMsg.node = api_node
else:
newMsg.device = api_device
api2msgs.append(CoreApiBridge.packApi2(newMsg)) api2msgs.append(CoreApiBridge.packApi2(newMsg))
else: else:
print "received message type", parser.getType() print "received message type", parser.getType()
return api2msgs return api2msgs
@staticmethod @staticmethod
def packApi2(message): def packApi2(message):
''' Pack an API2 message for transmission
'''
data = message.SerializeToString() data = message.SerializeToString()
return struct.pack(API2HDRFMT, len(data)) + data return struct.pack(API2HDRFMT, len(data)) + data
@staticmethod def processApi2SessionMsg(self, message, purpose):
def translateApi2SessionMsg(message):
print 'Received session request message' print 'Received session request message'
msgs = [] if purpose == core_pb2.ADD:
msgs.append(wrapper.RegMsg.instantiate(0, gui='true')) legacymsgs = []
return msgs legacymsgs.append(wrapper.RegMsg.instantiate(0, gui='true'))
return legacymsgs
# The response will be sent to the API2 client when a legacy session message is received from the daemon
elif purpose == core_pb2.DELETE:
# TODO: shutdown session
pass
else:
print 'Received invalid purpose for SESSION'
@staticmethod def processApi2ExperimentMsg(self, exp, purpose):
def handleApi2ExperimentMsg(exp, purpose, handler):
if purpose == core_pb2.ADD: if purpose == core_pb2.ADD:
if ExperimentStore.addExperiment(exp): if ExperimentStore.addExperiment(exp):
response = core_pb2.CoreMessage() response = core_pb2.CoreMessage()
response.experiment.experimentId = exp.experimentId; response.experiment.experimentId = exp.experimentId;
response.purpose = purpose response.purpose = purpose
handler.request.sendall(CoreApiBridge.packApi2(response))
return CoreApiBridge.translateExperimentMsg(exp) # Start a collector for gathering node messages instantiated in the core session
with self.lock:
if not self.collector:
self.assocExperiment = exp
self.collector = response
else:
raise RuntimeError, "Instantiation of experiment while another is active"
self.handler.request.sendall(CoreApiBridge.packApi2(response)) # TODO: Fix this
return self.translateApi2ExperimentMsg(exp)
else: else:
return CoreApiBridge.Api2Error("unable to add experiment") return self.Api2Error("unable to add experiment")
elif purpose == core_pb2.MODIFY: elif purpose == core_pb2.MODIFY:
# Detect if a change in state is requested # Detect if a change in state is requested
if exp.HasField('running'): if exp.HasField('running'):
@ -194,8 +320,7 @@ class CoreApiBridge(object):
print "Unsupported experiment modification received" print "Unsupported experiment modification received"
@staticmethod def translateApi2ExperimentMsg(self, message):
def translateExperimentMsg(message):
print 'Received experiment message' print 'Received experiment message'
msgs = [] msgs = []
# Flag need to be 0 otherwise CORE will not enter runtime state (per JavaAdaptor, need verification) # Flag need to be 0 otherwise CORE will not enter runtime state (per JavaAdaptor, need verification)
@ -214,14 +339,10 @@ class CoreApiBridge(object):
dataTypes=(9,9,9,9,9,9), dataTypes=(9,9,9,9,9,9),
dataValues='0|0| 47.5766974863|-122.125920191|0.0|150.0')) dataValues='0|0| 47.5766974863|-122.125920191|0.0|150.0'))
# TODO
# Send control net configuration # Send control net configuration
# TODO
# send node types # send node types
# TODO
# send services # send services
# TODO
# send nodes # send nodes
devices = {} devices = {}
@ -234,13 +355,6 @@ class CoreApiBridge(object):
legacy.CORE_API_ADD_FLAG|legacy.CORE_API_STR_FLAG, legacy.CORE_API_ADD_FLAG|legacy.CORE_API_STR_FLAG,
node.idx, node.idx,
str(node.name))) str(node.name)))
'''
for iface in node.interfaces:
msgs.append(wrapper.IfaceMsg.instantiate(legacy.CORE_API_ADD_FLAG,
node.idx,
iface.idx,
ip4=iface.ip4_addr))
'''
for device in message.devices: for device in message.devices:
if device.idx in devices: if device.idx in devices:
@ -276,17 +390,13 @@ class CoreApiBridge(object):
if1ip4=if0.ip4_addr if if0.HasField("ip4_addr") else None, if1ip4=if0.ip4_addr if if0.HasField("ip4_addr") else None,
if2ip4=if1.ip4_addr if if1.HasField("ip4_addr") else None)) if2ip4=if1.ip4_addr if if1.HasField("ip4_addr") else None))
# send metadata
# TODO # TODO
# send metadata
return msgs return msgs
@staticmethod def processApi2Event(self, event, purpose):
def translateEvent(event):
print 'Received event' print 'Received event'

View file

@ -1395,7 +1395,7 @@ class CoreApi2RequestHandler(CoreRequestHandler):
self.nodestatusreq = {} self.nodestatusreq = {}
self.master = False self.master = False
self.session = None self.session = None
self.experiment = None self.bridge = apibridge.CoreApiBridge(self)
self.verbose = bool(server.mainserver.cfg['verbose'].lower() == "true") self.verbose = bool(server.mainserver.cfg['verbose'].lower() == "true")
self.debug = bool(server.mainserver.cfg['debug'].lower() == "true") self.debug = bool(server.mainserver.cfg['debug'].lower() == "true")
SocketServer.BaseRequestHandler.__init__(self, request, SocketServer.BaseRequestHandler.__init__(self, request,
@ -1414,8 +1414,7 @@ class CoreApi2RequestHandler(CoreRequestHandler):
self.session.connect(self) self.session.connect(self)
while True: while True:
try: try:
data = self.recvmsg() msgs = self.recvmsg()
msgs = apibridge.CoreApiBridge.Api2toLegacy(data, self)
if msgs: if msgs:
for msg in msgs: for msg in msgs:
self.session.broadcast(self, msg) self.session.broadcast(self, msg)
@ -1429,27 +1428,14 @@ class CoreApi2RequestHandler(CoreRequestHandler):
def dispatchreplies(self, replies): def dispatchreplies(self, replies):
''' Dispatch a reply to a previously received message. ''' Dispatch a reply to a previously received message.
''' '''
api2Replies = apibridge.CoreApiBridge.LegacytoApi2( self.bridge.dispatchreplies(replies)
replies, self)
if api2Replies:
for reply in api2Replies:
try:
# send to API2 client
self.request.sendall(reply)
except Exception, e:
self.warn("Error sending reply data: %s" % e)
def sendall(self, data): def sendall(self, data):
''' The daemon calls this method with legacy API data. Convert first ''' The daemon calls this method with legacy API data. Convert first
API2 then send. API2 then send.
''' '''
self.bridge.sendall(data)
msgs = apibridge.CoreApiBridge.LegacytoApi2(
(data,), self.session.getstate())
if msgs:
for msg in msgs:
self.request.sendall(msg)
def finish(self): def finish(self):
if self.verbose: if self.verbose:
@ -1465,23 +1451,7 @@ class CoreApi2RequestHandler(CoreRequestHandler):
session handler's queue, if available. session handler's queue, if available.
''' '''
try: return self.bridge.recvmsg()
hdr = self.request.recv(apibridge.API2HDRSIZ)
except Exception, e:
raise IOError, "error receiving API 2 header (%s)" % e
if len(hdr) != apibridge.API2HDRSIZ:
if len(hdr) == 0:
raise EOFError, "client disconnected"
else:
raise IOError, "invalid message header size"
dataToRead = struct.unpack(apibridge.API2HDRFMT, hdr)[0]
data = ""
while len(data) < dataToRead:
data += self.request.recv(dataToRead - len(data))
return data