handled config messages from the daemon;handled link messages from the daemon; handled modified session message from an api2 client to connect with an experiment that has already been added by other clients
This commit is contained in:
parent
a6e7c0da7e
commit
1dfacdbaea
1 changed files with 148 additions and 23 deletions
|
@ -54,6 +54,8 @@ emulationDict = {
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
class CoreApiBridge(object):
|
||||
|
||||
def __init__(self, handler):
|
||||
|
@ -61,7 +63,9 @@ class CoreApiBridge(object):
|
|||
# for example, during INSTANTIATION as nodes are started until RUNTIME.
|
||||
self.collector = None
|
||||
|
||||
# The currently associated (added or joined) experiment
|
||||
# The currently associated (client added) experiment.
|
||||
# This will be use to get contextual information for partial messages as nodes
|
||||
# in the experiments are instantiated by the daemon.
|
||||
self.assocExperiment = None
|
||||
|
||||
# Mutex
|
||||
|
@ -160,6 +164,10 @@ class CoreApiBridge(object):
|
|||
|
||||
elif parser.getType() == legacy.CORE_API_NODE_MSG:
|
||||
self.processLegacyNodeMsg(parser.createWrapper(), api2msgs)
|
||||
|
||||
elif parser.getType() == legacy.CORE_API_CONF_MSG:
|
||||
self.processLegacyConfigMsg(parser.createWrapper(), api2msgs)
|
||||
|
||||
else:
|
||||
self.warn("received message type %s" % (parser.getType()))
|
||||
return api2msgs
|
||||
|
@ -261,6 +269,50 @@ class CoreApiBridge(object):
|
|||
api2msgs.append(coreapi2.pack(self.collector))
|
||||
self.collector = None
|
||||
|
||||
def processLegacyConfigMsg(self, confMsg, api2msgs):
|
||||
'''
|
||||
Intercept an outgoing config message generated by the CORE daemon and generate the equivalent
|
||||
API2 messages to send to the client
|
||||
'''
|
||||
|
||||
print "Config:"
|
||||
print "\tobj=", confMsg.getObj()
|
||||
print "\tnode=", confMsg.getNode()
|
||||
print "\ttype=", confMsg.getType()
|
||||
print "\tdata=", confMsg.getData()
|
||||
print "\tvalues=", confMsg.getValues()
|
||||
print "\tcaptions=", confMsg.getCaptions()
|
||||
print "\tbitmap=", confMsg.getBitmap()
|
||||
print "\tposs values=", confMsg.getPossible()
|
||||
print "\tgroups=", confMsg.getGroups()
|
||||
print "\tsession=", confMsg.getSession()
|
||||
print "\tnetid=", confMsg.getNetid()
|
||||
print "\topaque=", confMsg.getOpaque()
|
||||
|
||||
|
||||
# The CONFIG message will have its 'object' field set to the string literal
|
||||
# "session" if it is ending a stream of node and link messages sent to a
|
||||
# client that has joined a running experiment (legacy:session).
|
||||
if confMsg.getObj() == "session":
|
||||
# TODO: Process the values field
|
||||
|
||||
# Send what has been collected to the client
|
||||
with self.lock:
|
||||
if self.collector:
|
||||
self.collector.experiment.running = True
|
||||
self.collector.purpose = coreapi2.MODIFY
|
||||
else:
|
||||
raise RuntimeError, "runtime entered without an instantiated experiment"
|
||||
api2msgs.append(coreapi2.pack(self.collector))
|
||||
self.assocExperiment = self.collector.experiment
|
||||
self.collector = None
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def processLegacyNodeMsg(self, nodeMsg, api2msgs):
|
||||
'''
|
||||
Intercept an outgoing legacy node message generated by the CORE daemon and generate the equivalent
|
||||
|
@ -292,15 +344,20 @@ class CoreApiBridge(object):
|
|||
api2_node=None
|
||||
api2_dev=None
|
||||
with self.lock:
|
||||
#if self.assocExperiment:
|
||||
nodeOrDev = None
|
||||
|
||||
# If collection is active (i.e. joining a session is in progress) collect node
|
||||
# information into an experiment message. Otherise, send as an independent Node or
|
||||
# Device message.
|
||||
newMsg = None
|
||||
if not self.collector:
|
||||
newMsg = coreapi2.CoreMessage()
|
||||
|
||||
# The legacy API uses a Node message to convey everything from hubs to hosts. But it does
|
||||
# always have type information in it.
|
||||
# Check if the legacy message updates an existing API2 node or a device
|
||||
# The legacy API uses a Node message to convey everything from hubs to hosts.
|
||||
# But it does always have type information in it. Get context information
|
||||
# from assocExperiment for the newly instantiated node or device being conveyed
|
||||
# by the Node message.
|
||||
# Determine if the legacy message if for a node or a device
|
||||
nodeOrDev = None
|
||||
isNode = self.assocExperiment and coreapi2.findNodeByIdx(self.assocExperiment,
|
||||
nodeMsg.getNumber())
|
||||
isDev = self.assocExperiment and coreapi2.findDeviceByIdx(self.assocExperiment,
|
||||
|
@ -314,8 +371,8 @@ class CoreApiBridge(object):
|
|||
raise RuntimeError, "Inconsistent device types."
|
||||
|
||||
if not isNode and not isDev and nodeMsg.getType() is not None:
|
||||
isNode = isNode(nodeMsg.getType())
|
||||
isDev = isDev(nodeMsg.getType())
|
||||
isNode = nodeMsg.getType() in nodeTypesSet
|
||||
isDev = nodeMsg.getType() in deviceTypesSet
|
||||
|
||||
# Add the node/device to either the experiment object in the collector for transmission as
|
||||
# part of a API2 session/experiment, or to a new message for independent API2 Node or
|
||||
|
@ -360,14 +417,75 @@ class CoreApiBridge(object):
|
|||
api2msgs.append(coreapi2.pack(newMsg))
|
||||
else:
|
||||
self.warn("Received update for unknown node or device %d" % (nodeMsg.getNumber()))
|
||||
|
||||
|
||||
def processLegacyLinkMsg(self, linkMsg, api2msgs):
|
||||
'''
|
||||
Intercept an outgoing legacy link message generated by the CORE daemon and generate the equivalent
|
||||
API2 messages to send to the client
|
||||
'''
|
||||
|
||||
# When collecting information for a complete experment (i.e. when joining an experiment),
|
||||
# add information from legacy Link messages to the experiment message being constructed.
|
||||
# Otherwise, send as an independent Channel message.
|
||||
newMsg = None
|
||||
if not self.collector:
|
||||
newMsg = coreapi2.CoreMessage()
|
||||
|
||||
|
||||
if not self.collector.experiment:
|
||||
raise RuntimeError, 'Invalid collector'
|
||||
|
||||
if not self.collector.experiment.network:
|
||||
self.collector.experiment.network.name = "default"
|
||||
self.collector.experiment.network.idx = 1
|
||||
|
||||
|
||||
# Endpoint 1
|
||||
if coreapi2.findNodeByIdx(self.collector.experiment, linkMsg.getN1number()):
|
||||
ep1 = coreapi2.getNodeInterfaceByIdx(self.collector.experiment,
|
||||
linkMsg.getN1number(),
|
||||
linkMsg.getIf1num())
|
||||
else:
|
||||
ep1 = coreapi2.getDeviceInterfaceByIdx(self.collector.experiment,
|
||||
linkMsg.getN1number(),
|
||||
linkMsg.getIf1num())
|
||||
if linkMsg.getIf1ip4(): ep1.ip4_addr = linkMsg.getIf1ip4()
|
||||
if linkMsg.getIf1ip4mask(): ep1.ip4_mask = linkMsg.getIf1ip4mask()
|
||||
# TODO: Add IPv6 fields updates
|
||||
|
||||
# Endpoint 2
|
||||
if coreapi2.findNodeByIdx(self.collector.experiment, linkMsg.getN2number()):
|
||||
ep2 = coreapi2.getNodeInterfaceByIdx(self.collector.experiment,
|
||||
linkMsg.getN2number(),
|
||||
linkMsg.getIf2num())
|
||||
else:
|
||||
ep2 = coreapi2.getDeviceInterfaceByIdx(self.collector.experiment,
|
||||
linkMsg.getN2number(),
|
||||
linkMsg.getIf2num())
|
||||
if linkMsg.getIf2ip4(): ep2.ip4_addr = linkMsg.getIf2ip4()
|
||||
if linkMsg.getIf2ip4mask(): ep2.ip4_mask = linkMsg.getIf2ip4mask()
|
||||
# TODO: Add IPv6 fields updates
|
||||
|
||||
# Capture updated link characteristics onto the Channel message
|
||||
channel = coreapi2.getChannel(self.collector.network, ep1_interface, ep2_interface)
|
||||
if linkMsg.getDelay() : channel.delay = linkMsg.getDelay()
|
||||
if linkMsg.getBw(): channel.bandwidth = linkMsg.getBw()
|
||||
if linkMsg.getPer(): channel.per = linkMsg.getPer()
|
||||
if linkMsg.getDup(): channel.dups = linkMsg.getDup()
|
||||
if linkMsg.getJitter(): channel.jitter = linkMsg.getJitter()
|
||||
if linkMsg.getMer(): channel.mer = linkMsg.getMer()
|
||||
if linkMsg.getBurst(): channel.burst = linkMsg.getBurst()
|
||||
#TODO: Add remaining channel parameters
|
||||
|
||||
|
||||
|
||||
def processApi2SessionMsg(self, message, purpose):
|
||||
legacymsgs = []
|
||||
if purpose == coreapi2.ADD:
|
||||
if self.handler.debug:
|
||||
self.info('Received ADD session request message')
|
||||
|
||||
legacymsgs = []
|
||||
legacymsgs.append(wrapper.RegMsg.instantiate(0, gui='true'))
|
||||
return legacymsgs
|
||||
# The response will be sent to the API2 client when a legacy session message is received from the daemon
|
||||
|
@ -375,7 +493,6 @@ class CoreApiBridge(object):
|
|||
if self.handler.debug:
|
||||
self.info('Received MODIFY session request message')
|
||||
|
||||
legacymsgs = []
|
||||
if message.HasField("experiment"):
|
||||
exp = message.experiment
|
||||
if exp.HasField("experimentId"):
|
||||
|
@ -384,19 +501,24 @@ class CoreApiBridge(object):
|
|||
response.experiment.experimentId = exp.experimentId;
|
||||
response.purpose = purpose
|
||||
with self.lock:
|
||||
self.assocExperiment = exp
|
||||
# Start a collector for nodes, devices and channels in the
|
||||
self.collector = response
|
||||
if expId.startswith('_'):
|
||||
try:
|
||||
legacySessNo = int(expId[1:])
|
||||
legacymsgs.append(wrapper.ConfMsg.instantiate("all",
|
||||
coreapi.CONF_TYPE_FLAGS_RESET))
|
||||
legacymsgs.append(wrapper.RegMsg.instantiate(0, gui='true'))
|
||||
legacymsgs.append(wrapper.SessionMsg.instantiate(0, legacySessNo))
|
||||
except:
|
||||
# TODO: get legacy session number from experimentId if running, or pass back
|
||||
# non-running experiment components
|
||||
pass
|
||||
if self.handler.debug:
|
||||
self.info('request connection to session %d' % (legacySessNo))
|
||||
msg = wrapper.SessionMsg.instantiate(
|
||||
coreapi.CORE_API_ADD_FLAG | coreapi.CORE_API_STR_FLAG,
|
||||
str(legacySessNo), nodecount="0")
|
||||
legacymsgs.append(msg)
|
||||
|
||||
except Exception, e:
|
||||
if self.handler.debug:
|
||||
self.info("-"*60)
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
self.info("-"*60)
|
||||
raise e
|
||||
else:
|
||||
# TODO: get legacy session number from experimentId if running, or pass back
|
||||
# non-running experiment components
|
||||
|
@ -406,11 +528,12 @@ class CoreApiBridge(object):
|
|||
else:
|
||||
self.warn("session modify request without an experiment")
|
||||
|
||||
|
||||
return legacymsgs
|
||||
elif purpose == coreapi2.DELETE:
|
||||
# TODO: shutdown session
|
||||
pass
|
||||
legacymsgs.append(wrapper.ConfMsg.instantiate("all",
|
||||
coreapi.CONF_TYPE_FLAGS_RESET))
|
||||
# TODO: Remove experiment from dictionary
|
||||
|
||||
else:
|
||||
self.warn('Received invalid purpose for SESSION')
|
||||
|
||||
|
@ -422,7 +545,9 @@ class CoreApiBridge(object):
|
|||
response.experiment.experimentId = exp.experimentId;
|
||||
response.purpose = purpose
|
||||
|
||||
# Start a collector for gathering node messages instantiated in the core session
|
||||
# Associate the newly added experiment with this bridge.
|
||||
# Start a collector for gathering messages for nodes and links instantiated
|
||||
# by the daemon
|
||||
with self.lock:
|
||||
if not self.collector:
|
||||
self.assocExperiment = exp
|
||||
|
|
Loading…
Reference in a new issue