moved from api to misc directory
This commit is contained in:
parent
ea3becb38b
commit
3359aeb323
1 changed files with 55 additions and 68 deletions
|
@ -9,22 +9,16 @@
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
import core_pb2
|
import coreapi2
|
||||||
import struct, threading, traceback, sys
|
import threading, traceback, sys
|
||||||
from core.api import coreapi, corewrapper
|
from core.api import coreapi, corewrapper
|
||||||
from core.experiments import ExperimentStore
|
from core.experiments import ExperimentStore
|
||||||
|
|
||||||
wrapper = corewrapper
|
wrapper = corewrapper
|
||||||
legacy = coreapi
|
legacy = coreapi
|
||||||
|
|
||||||
API2HDRFMT = "H"
|
|
||||||
API2HDRSIZ = struct.calcsize(API2HDRFMT)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class CoreApiBridge(object):
|
class CoreApiBridge(object):
|
||||||
|
|
||||||
def __init__(self, handler):
|
def __init__(self, handler):
|
||||||
# The collector is used for gathering node messages sent by the core session,
|
# The collector is used for gathering node messages sent by the core session,
|
||||||
# for example, during INSTANTIATION as nodes are started until RUNTIME.
|
# for example, during INSTANTIATION as nodes are started until RUNTIME.
|
||||||
|
@ -45,12 +39,14 @@ class CoreApiBridge(object):
|
||||||
session handler's queue, if available.
|
session handler's queue, if available.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
data = coreapi2.recvAndUnpack(self.handler.request.recv)
|
||||||
|
'''
|
||||||
try:
|
try:
|
||||||
hdr = self.handler.request.recv(API2HDRSIZ)
|
hdr = self.handler.request.recv(coreapi2.API2HDRSIZ)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
raise IOError, "error receiving API 2 header (%s)" % e
|
raise IOError, "error receiving API 2 header (%s)" % e
|
||||||
|
|
||||||
if len(hdr) != API2HDRSIZ:
|
if len(hdr) != coreapi2.API2HDRSIZ:
|
||||||
if len(hdr) == 0:
|
if len(hdr) == 0:
|
||||||
raise EOFError, "client disconnected"
|
raise EOFError, "client disconnected"
|
||||||
else:
|
else:
|
||||||
|
@ -60,7 +56,7 @@ class CoreApiBridge(object):
|
||||||
data = ""
|
data = ""
|
||||||
while len(data) < dataToRead:
|
while len(data) < dataToRead:
|
||||||
data += self.handler.request.recv(dataToRead - len(data))
|
data += self.handler.request.recv(dataToRead - len(data))
|
||||||
|
'''
|
||||||
msgs = self.processApi2Message(data)
|
msgs = self.processApi2Message(data)
|
||||||
|
|
||||||
return msgs
|
return msgs
|
||||||
|
@ -75,7 +71,10 @@ class CoreApiBridge(object):
|
||||||
# send to API2 client
|
# send to API2 client
|
||||||
self.handler.request.sendall(reply)
|
self.handler.request.sendall(reply)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.warn("Error sending reply data: %s" % e)
|
print "-"*60
|
||||||
|
traceback.print_exc(file=sys.stdout)
|
||||||
|
print "-"*60
|
||||||
|
raise e
|
||||||
|
|
||||||
|
|
||||||
def sendall(self, data):
|
def sendall(self, data):
|
||||||
|
@ -99,7 +98,7 @@ class CoreApiBridge(object):
|
||||||
|
|
||||||
|
|
||||||
def processApi2Message(self, data):
|
def processApi2Message(self, data):
|
||||||
message = core_pb2.CoreMessage()
|
message = coreapi2.CoreMessage()
|
||||||
message.ParseFromString(data)
|
message.ParseFromString(data)
|
||||||
if message.HasField('session'):
|
if message.HasField('session'):
|
||||||
return self.processApi2SessionMsg(message.session,
|
return self.processApi2SessionMsg(message.session,
|
||||||
|
@ -143,7 +142,7 @@ class CoreApiBridge(object):
|
||||||
'''
|
'''
|
||||||
sessions = sessMsg.getNumber().split("|")
|
sessions = sessMsg.getNumber().split("|")
|
||||||
port_num = int(sessions[0])
|
port_num = int(sessions[0])
|
||||||
newMsg = core_pb2.CoreMessage()
|
newMsg = coreapi2.CoreMessage()
|
||||||
newMsg.session.clientId = 'client' + sessions[0]
|
newMsg.session.clientId = 'client' + sessions[0]
|
||||||
newMsg.session.port_num = port_num
|
newMsg.session.port_num = port_num
|
||||||
|
|
||||||
|
@ -163,8 +162,8 @@ class CoreApiBridge(object):
|
||||||
newMsg.session.all_exps.add(str(sid))
|
newMsg.session.all_exps.add(str(sid))
|
||||||
'''
|
'''
|
||||||
|
|
||||||
newMsg.purpose = core_pb2.ADD
|
newMsg.purpose = coreapi2.ADD
|
||||||
api2msgs.append(CoreApiBridge.packApi2(newMsg))
|
api2msgs.append(coreapi2.pack(newMsg))
|
||||||
elif parser.getType() == legacy.CORE_API_EVENT_MSG:
|
elif parser.getType() == legacy.CORE_API_EVENT_MSG:
|
||||||
event = parser.createWrapper()
|
event = parser.createWrapper()
|
||||||
'''
|
'''
|
||||||
|
@ -177,16 +176,16 @@ class CoreApiBridge(object):
|
||||||
print "\tsessions=", event.getSession()
|
print "\tsessions=", event.getSession()
|
||||||
'''
|
'''
|
||||||
if event.getType() == legacy.CORE_EVENT_RUNTIME_STATE:
|
if event.getType() == legacy.CORE_EVENT_RUNTIME_STATE:
|
||||||
newMsg = core_pb2.CoreMessage()
|
newMsg = coreapi2.CoreMessage()
|
||||||
newMsg.purpose = core_pb2.STATE_CHANGE
|
newMsg.purpose = coreapi2.STATE_CHANGE
|
||||||
newMsg.event.state = core_pb2.RUNTIME
|
newMsg.event.state = coreapi2.RUNTIME
|
||||||
api2msgs.append(CoreApiBridge.packApi2(newMsg))
|
api2msgs.append(coreapi2.pack(newMsg))
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if self.collector:
|
if self.collector:
|
||||||
self.collector.experiment.running = True
|
self.collector.experiment.running = True
|
||||||
else:
|
else:
|
||||||
raise RuntimeError, "runtime entered without an instantiated experiment"
|
raise RuntimeError, "runtime entered without an instantiated experiment"
|
||||||
api2msgs.append(CoreApiBridge.packApi2(self.collector))
|
api2msgs.append(coreapi2.pack(self.collector))
|
||||||
self.collector = None
|
self.collector = None
|
||||||
|
|
||||||
elif parser.getType() == legacy.CORE_API_NODE_MSG:
|
elif parser.getType() == legacy.CORE_API_NODE_MSG:
|
||||||
|
@ -219,62 +218,50 @@ class CoreApiBridge(object):
|
||||||
api2_node=None
|
api2_node=None
|
||||||
api2_dev=None
|
api2_dev=None
|
||||||
with self.lock:
|
with self.lock:
|
||||||
for a_node in self.assocExperiment.nodes:
|
if self.assocExperiment:
|
||||||
if a_node.idx == node.getNumber():
|
nodeOrDev = None
|
||||||
api2_node = core_pb2.Node()
|
newMsg = None
|
||||||
break
|
if not self.collector:
|
||||||
if not api2_node:
|
newMsg = coreapi2.CoreMessage()
|
||||||
for a_device in self.assocExperiment.devices:
|
|
||||||
if a_device.idx == node.getNumber():
|
|
||||||
api2_dev = core_pb2.Device()
|
|
||||||
break
|
|
||||||
if api2_node:
|
|
||||||
api2_obj = api2_node
|
|
||||||
else:
|
|
||||||
api2_obj = api2_dev
|
|
||||||
|
|
||||||
if not api2_obj:
|
if coreapi2.findNodeByIdx(self.assocExperiment, node.getNumber()):
|
||||||
raise RuntimeError, "Node %d not in experiment" % (node.getNumber())
|
if self.collector:
|
||||||
|
nodeOrDev = coreapi2.getNodeByIdx(self.collector.experiment, node.getNumber())
|
||||||
|
else:
|
||||||
|
nodeOrDev = newMsg.node
|
||||||
|
elif coreapi2.findDeviceByIdx(self.assocExperiment, node.getNumber()):
|
||||||
|
if self.collector:
|
||||||
|
nodeOrDev = coreapi2.getDeviceByIdx(self.collector.experiment, node.getNumber())
|
||||||
|
else:
|
||||||
|
nodeOrDev = newMsg.device
|
||||||
|
|
||||||
api2_obj.idx = node.getNumber()
|
|
||||||
if node.getEmuid() is not None: api2_obj.emu_id=node.getEmuid()
|
|
||||||
if node.getName() is not None: api2_obj.name=node.getName()
|
|
||||||
if node.getXpos() is not None: api2_obj.location.x_pos=node.getXpos()
|
|
||||||
if node.getYpos() is not None: api2_obj.location.y_pos=node.getYpos()
|
|
||||||
|
|
||||||
if self.collector:
|
if nodeOrDev:
|
||||||
if api_node:
|
nodeOrDev.idx = node.getNumber()
|
||||||
self.collector.experiment.nodes.add().CopyFrom(api_node)
|
if node.getEmuid() is not None: nodeOrDev.emu_id=node.getEmuid()
|
||||||
|
if node.getName() is not None: nodeOrDev.name=node.getName()
|
||||||
|
if node.getXpos() is not None: nodeOrDev.location.x_pos=node.getXpos()
|
||||||
|
if node.getYpos() is not None: nodeOrDev.location.y_pos=node.getYpos()
|
||||||
|
|
||||||
|
if newMsg:
|
||||||
|
newMsg.purpose = coreapi2.MODIFY
|
||||||
|
api2msgs.append(coreapi2.pack(newMsg))
|
||||||
else:
|
else:
|
||||||
self.collector.experiment.devices.add().CopyFrom(api_device)
|
print "Received update for unknown node or device", node.getNumber()
|
||||||
|
else:
|
||||||
newMsg = core_pb2.CoreMessage()
|
print "Received update for node or device without an associated experiment", node.getNumber()
|
||||||
newMsg.purpose = core_pb2.MODIFY
|
|
||||||
if api_node:
|
|
||||||
newMsg.node = api_node
|
|
||||||
else:
|
|
||||||
newMsg.device = api_device
|
|
||||||
api2msgs.append(CoreApiBridge.packApi2(newMsg))
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print "received message type", parser.getType()
|
print "received message type", parser.getType()
|
||||||
return api2msgs
|
return api2msgs
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def packApi2(message):
|
|
||||||
''' Pack an API2 message for transmission
|
|
||||||
'''
|
|
||||||
data = message.SerializeToString()
|
|
||||||
return struct.pack(API2HDRFMT, len(data)) + data
|
|
||||||
|
|
||||||
def processApi2SessionMsg(self, message, purpose):
|
def processApi2SessionMsg(self, message, purpose):
|
||||||
print 'Received session request message'
|
print 'Received session request message'
|
||||||
if purpose == core_pb2.ADD:
|
if purpose == coreapi2.ADD:
|
||||||
legacymsgs = []
|
legacymsgs = []
|
||||||
legacymsgs.append(wrapper.RegMsg.instantiate(0, gui='true'))
|
legacymsgs.append(wrapper.RegMsg.instantiate(0, gui='true'))
|
||||||
return legacymsgs
|
return legacymsgs
|
||||||
# The response will be sent to the API2 client when a legacy session message is received from the daemon
|
# The response will be sent to the API2 client when a legacy session message is received from the daemon
|
||||||
elif purpose == core_pb2.DELETE:
|
elif purpose == coreapi2.DELETE:
|
||||||
# TODO: shutdown session
|
# TODO: shutdown session
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
@ -282,9 +269,9 @@ class CoreApiBridge(object):
|
||||||
|
|
||||||
|
|
||||||
def processApi2ExperimentMsg(self, exp, purpose):
|
def processApi2ExperimentMsg(self, exp, purpose):
|
||||||
if purpose == core_pb2.ADD:
|
if purpose == coreapi2.ADD:
|
||||||
if ExperimentStore.addExperiment(exp):
|
if ExperimentStore.addExperiment(exp):
|
||||||
response = core_pb2.CoreMessage()
|
response = coreapi2.CoreMessage()
|
||||||
response.experiment.experimentId = exp.experimentId;
|
response.experiment.experimentId = exp.experimentId;
|
||||||
response.purpose = purpose
|
response.purpose = purpose
|
||||||
|
|
||||||
|
@ -295,11 +282,11 @@ class CoreApiBridge(object):
|
||||||
self.collector = response
|
self.collector = response
|
||||||
else:
|
else:
|
||||||
raise RuntimeError, "Instantiation of experiment while another is active"
|
raise RuntimeError, "Instantiation of experiment while another is active"
|
||||||
self.handler.request.sendall(CoreApiBridge.packApi2(response)) # TODO: Fix this
|
self.handler.request.sendall(coreapi2.pack(response)) # TODO: Fix this
|
||||||
return self.translateApi2ExperimentMsg(exp)
|
return self.translateApi2ExperimentMsg(exp)
|
||||||
else:
|
else:
|
||||||
return self.Api2Error("unable to add experiment")
|
return self.Api2Error("unable to add experiment")
|
||||||
elif purpose == core_pb2.MODIFY:
|
elif purpose == coreapi2.MODIFY:
|
||||||
# Detect if a change in state is requested
|
# Detect if a change in state is requested
|
||||||
if exp.HasField('running'):
|
if exp.HasField('running'):
|
||||||
if exp.running:
|
if exp.running:
|
Loading…
Reference in a new issue