changes to support not modifying controlnet configuration messages and avoid issues with setting the master meane config when dealing with distributed emane

This commit is contained in:
Blake Harnden 2019-06-18 10:33:16 -07:00
parent 0af3629ac6
commit 948b1126ba
6 changed files with 25 additions and 80 deletions

View file

@ -121,7 +121,6 @@ class CoreBroker(object):
self.physical_nodes = set() self.physical_nodes = set()
# allows for other message handlers to process API messages (e.g. EMANE) # allows for other message handlers to process API messages (e.g. EMANE)
self.handlers = set() self.handlers = set()
self.handlers.add(self.handle_distributed)
# dict with tunnel key to tunnel device mapping # dict with tunnel key to tunnel device mapping
self.tunnels = {} self.tunnels = {}
self.dorecvloop = False self.dorecvloop = False
@ -1049,62 +1048,3 @@ class CoreBroker(object):
if not server.instantiation_complete: if not server.instantiation_complete:
return False return False
return True return True
def handle_distributed(self, message):
"""
Handle the session options config message as it has reached the
broker. Options requiring modification for distributed operation should
be handled here.
:param message: message to handle
:return: nothing
"""
if not self.session.master:
return
if message.message_type != MessageTypes.CONFIG.value or message.get_tlv(ConfigTlvs.OBJECT.value) != "session":
return
values_str = message.get_tlv(ConfigTlvs.VALUES.value)
if values_str is None:
return
value_strings = values_str.split("|")
for value_string in value_strings:
key, _value = value_string.split("=", 1)
if key == "controlnet":
self.handle_distributed_control_net(message, value_strings, value_strings.index(value_string))
def handle_distributed_control_net(self, message, values, index):
"""
Modify Config Message if multiple control network prefixes are
defined. Map server names to prefixes and repack the message before
it is forwarded to slave servers.
:param message: message to handle
:param list values: values to handle
:param int index: index ti get key value from
:return: nothing
"""
key_value = values[index]
_key, value = key_value.split("=", 1)
control_nets = value.split()
if len(control_nets) < 2:
logging.warning("multiple controlnet prefixes do not exist")
return
servers = self.session.broker.getservernames()
if len(servers) < 2:
logging.warning("not distributed")
return
servers.remove("localhost")
# master always gets first prefix
servers.insert(0, "localhost")
# create list of "server1:ctrlnet1 server2:ctrlnet2 ..."
control_nets = map(lambda x: "%s:%s" % (x[0], x[1]), zip(servers, control_nets))
values[index] = "controlnet=%s" % (" ".join(control_nets))
values_str = "|".join(values)
message.tlv_data[ConfigTlvs.VALUES.value] = values_str
message.repack()

View file

@ -510,7 +510,6 @@ class CoreHandler(socketserver.BaseRequestHandler):
:param message: message for replies :param message: message for replies
:return: nothing :return: nothing
""" """
logging.debug("dispatching replies: %s", replies)
for reply in replies: for reply in replies:
message_type, message_flags, message_length = coreapi.CoreMessage.unpack_header(reply) message_type, message_flags, message_length = coreapi.CoreMessage.unpack_header(reply)
try: try:
@ -524,7 +523,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
reply_message = "CoreMessage (type %d flags %d length %d)" % ( reply_message = "CoreMessage (type %d flags %d length %d)" % (
message_type, message_flags, message_length) message_type, message_flags, message_length)
logging.debug("dispatch reply:\n%s", reply_message) logging.debug("sending reply:\n%s", reply_message)
try: try:
self.sendall(reply) self.sendall(reply)

View file

@ -2,6 +2,7 @@
emane.py: definition of an Emane class for implementing configuration control of an EMANE emulation. emane.py: definition of an Emane class for implementing configuration control of an EMANE emulation.
""" """
import copy
import logging import logging
import os import os
import threading import threading
@ -443,10 +444,13 @@ class EmaneManager(ModelManager):
continue continue
platformid += 1 platformid += 1
# create temporary config for updating distributed nodes
typeflags = ConfigFlags.UPDATE.value typeflags = ConfigFlags.UPDATE.value
self.set_config("platform_id_start", str(platformid)) config = copy.deepcopy(self.get_configs())
self.set_config("nem_id_start", str(nemid)) config["platform_id_start"] = str(platformid)
config_data = ConfigShim.config_data(0, None, typeflags, self.emane_config, self.get_configs()) config["nem_id_start"] = str(nemid)
config_data = ConfigShim.config_data(0, None, typeflags, self.emane_config, config)
message = dataconversion.convert_config(config_data) message = dataconversion.convert_config(config_data)
server.sock.send(message) server.sock.send(message)
# increment nemid for next server by number of interfaces # increment nemid for next server by number of interfaces
@ -477,26 +481,26 @@ class EmaneManager(ModelManager):
be configured. This generates configuration for slave control nets be configured. This generates configuration for slave control nets
using the default list of prefixes. using the default list of prefixes.
""" """
session = self.session
# slave server # slave server
session = self.session
if not session.master: if not session.master:
return return
servers = session.broker.getservernames()
# not distributed # not distributed
servers = session.broker.getservernames()
if len(servers) < 2: if len(servers) < 2:
return return
prefix = session.options.get_config("controlnet")
prefixes = prefix.split()
# normal Config messaging will distribute controlnets # normal Config messaging will distribute controlnets
if len(prefixes) >= len(servers): prefix = session.options.get_config("controlnet", default="")
return prefixes = prefix.split()
if len(prefixes) < len(servers):
logging.info("setting up default controlnet prefixes for distributed (%d configured)", len(prefixes))
prefix = ctrlnet.DEFAULT_PREFIX_LIST[0]
# this generates a config message having controlnet prefix assignments # this generates a config message having controlnet prefix assignments
logging.info("Setting up default controlnet prefixes for distributed (%d configured)" % len(prefixes)) logging.info("setting up controlnet prefixes for distributed: %s", prefix)
prefixes = ctrlnet.DEFAULT_PREFIX_LIST[0] vals = "controlnet=%s" % prefix
vals = 'controlnet="%s"' % prefixes
tlvdata = b"" tlvdata = b""
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.OBJECT.value, "session") tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.OBJECT.value, "session")
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.TYPE.value, 0) tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.TYPE.value, 0)
@ -504,6 +508,7 @@ class EmaneManager(ModelManager):
rawmsg = coreapi.CoreConfMessage.pack(0, tlvdata) rawmsg = coreapi.CoreConfMessage.pack(0, tlvdata)
msghdr = rawmsg[:coreapi.CoreMessage.header_len] msghdr = rawmsg[:coreapi.CoreMessage.header_len]
msg = coreapi.CoreConfMessage(flags=0, hdr=msghdr, data=rawmsg[coreapi.CoreMessage.header_len:]) msg = coreapi.CoreConfMessage(flags=0, hdr=msghdr, data=rawmsg[coreapi.CoreMessage.header_len:])
logging.debug("sending controlnet message:\n%s", msg)
self.session.broker.handle_message(msg) self.session.broker.handle_message(msg)
def check_node_models(self): def check_node_models(self):

View file

@ -165,16 +165,16 @@ class EmaneNode(EmaneNet):
nemid = self.getnemid(netif) nemid = self.getnemid(netif)
ifname = netif.localname ifname = netif.localname
if nemid is None: if nemid is None:
logging.info("nemid for %s is unknown" % ifname) logging.info("nemid for %s is unknown", ifname)
return return
lat, long, alt = self.session.location.getgeo(x, y, z) lat, lon, alt = self.session.location.getgeo(x, y, z)
logging.info("setnemposition %s (%s) x,y,z=(%d,%d,%s)(%.6f,%.6f,%.6f)", ifname, nemid, x, y, z, lat, long, alt) logging.info("setnemposition %s (%s) x,y,z=(%d,%d,%s)(%.6f,%.6f,%.6f)", ifname, nemid, x, y, z, lat, lon, alt)
event = LocationEvent() event = LocationEvent()
# altitude must be an integer or warning is printed # altitude must be an integer or warning is printed
# unused: yaw, pitch, roll, azimuth, elevation, velocity # unused: yaw, pitch, roll, azimuth, elevation, velocity
alt = int(round(alt)) alt = int(round(alt))
event.append(nemid, latitude=lat, longitude=long, altitude=alt) event.append(nemid, latitude=lat, longitude=lon, altitude=alt)
self.session.emane.service.publish(0, event) self.session.emane.service.publish(0, event)
def setnempositions(self, moved_netifs): def setnempositions(self, moved_netifs):

View file

@ -1498,7 +1498,7 @@ class Session(object):
break break
if not prefix: if not prefix:
logging.error("Control network prefix not found for server '%s'" % servers[0]) logging.error("control network prefix not found for server: %s", servers[0])
assign_address = False assign_address = False
try: try:
prefix = prefixes[0].split(':', 1)[1] prefix = prefixes[0].split(':', 1)[1]

View file

@ -108,7 +108,7 @@ def build_node_platform_xml(emane_manager, control_net, node, nem_id, platform_x
:return: the next nem id that can be used for creating platform xml files :return: the next nem id that can be used for creating platform xml files
:rtype: int :rtype: int
""" """
logging.debug("building emane platform xml for node(%s): %s", node, node.name) logging.debug("building emane platform xml for node(%s) nem_id(%s): %s", node, nem_id, node.name)
nem_entries = {} nem_entries = {}
if node.model is None: if node.model is None:
@ -116,6 +116,7 @@ def build_node_platform_xml(emane_manager, control_net, node, nem_id, platform_x
return nem_entries return nem_entries
for netif in node.netifs(): for netif in node.netifs():
logging.debug("building platform xml for interface(%s) nem_id(%s)", netif.name, nem_id)
# build nem xml # build nem xml
nem_definition = nem_file_name(node.model, netif) nem_definition = nem_file_name(node.model, netif)
nem_element = etree.Element("nem", id=str(nem_id), name=netif.localname, definition=nem_definition) nem_element = etree.Element("nem", id=str(nem_id), name=netif.localname, definition=nem_definition)