From 73eea80f51338ad41779c34ceeb23ae073797ca4 Mon Sep 17 00:00:00 2001 From: "Blake J. Harnden" Date: Mon, 7 Aug 2017 15:37:41 -0700 Subject: [PATCH] attempt to fix missing updates for sdt, moved broker specific code to broker --- daemon/core/broker.py | 80 +++++++++++++++++++--- daemon/core/conf.py | 2 - daemon/core/corehandlers.py | 7 +- daemon/core/coreobj.py | 14 ++-- daemon/core/emane/emanemanager.py | 17 ++--- daemon/core/mobility.py | 6 -- daemon/core/netns/nodes.py | 3 - daemon/core/sdt.py | 44 +++++++++++-- daemon/core/service.py | 3 +- daemon/core/session.py | 106 +++++------------------------- daemon/sbin/core-daemon | 3 +- 11 files changed, 146 insertions(+), 139 deletions(-) diff --git a/daemon/core/broker.py b/daemon/core/broker.py index 7507218a..294fcd65 100644 --- a/daemon/core/broker.py +++ b/daemon/core/broker.py @@ -34,11 +34,11 @@ from core.netns.vnet import GreTapBridge from core.phys.pnodes import PhysicalNode -# TODO: name conflict with main core server, probably should rename -class CoreServer(object): +class CoreDistributedServer(object): """ Represents CORE daemon servers for communication. """ + def __init__(self, name, host, port): """ Creates a CoreServer instance. @@ -121,6 +121,7 @@ class CoreBroker(ConfigurableManager): self.physical_nodes = set() # allows for other message handlers to process API messages (e.g. EMANE) self.handlers = set() + self.handlers.add(self.handle_distributed) # dict with tunnel key to tunnel device mapping self.tunnels = {} self.dorecvloop = False @@ -223,7 +224,7 @@ class CoreBroker(ConfigurableManager): and forwarded. Return value of zero indicates the socket has closed and should be removed from the self.servers dict. - :param CoreServer server: server to receive from + :param CoreDistributedServer server: server to receive from :return: message length :rtype: int """ @@ -302,7 +303,7 @@ class CoreBroker(ConfigurableManager): del self.servers[name] logger.info("adding server: %s @ %s:%s" % (name, host, port)) - server = CoreServer(name, host, port) + server = CoreDistributedServer(name, host, port) if host is not None and port is not None: try: server.connect() @@ -316,7 +317,7 @@ class CoreBroker(ConfigurableManager): """ Remove a server and hang up any connection. - :param CoreServer server: server to delete + :param CoreDistributedServer server: server to delete :return: nothing """ with self.servers_lock: @@ -336,7 +337,7 @@ class CoreBroker(ConfigurableManager): :param str name: name of server to retrieve :return: server for given name - :rtype: CoreServer + :rtype: CoreDistributedServer """ with self.servers_lock: return self.servers.get(name) @@ -347,7 +348,7 @@ class CoreBroker(ConfigurableManager): :param sock: socket associated with a server :return: core server associated wit the socket - :rtype: CoreServer + :rtype: CoreDistributedServer """ with self.servers_lock: for server in self.servers.itervalues(): @@ -540,7 +541,7 @@ class CoreBroker(ConfigurableManager): """ Record a node number to emulation server mapping. - :param CoreServer server: core server to associate node with + :param CoreDistributedServer server: core server to associate node with :param int nodenum: node id :return: nothing """ @@ -562,7 +563,7 @@ class CoreBroker(ConfigurableManager): Remove a node number to emulation server mapping. Return the number of nodes left on this server. - :param CoreServer server: server to remove from node map + :param CoreDistributedServer server: server to remove from node map :param int nodenum: node id :return: number of nodes left on server :rtype: int @@ -1056,7 +1057,7 @@ class CoreBroker(ConfigurableManager): VnodeClient class. :param str nodestr: node string - :param CoreServer server: core server + :param CoreDistributedServer server: core server :return: nothing """ serverstr = "%s %s %s" % (server.name, server.host, server.port) @@ -1107,3 +1108,62 @@ class CoreBroker(ConfigurableManager): if not server.instantiation_complete: return False return True + + def handle_distributed(self, message): + """ + Handle the session options config message as it has reached the + broker. Options requiring modification for distributed operation should + be handled here. + + :param message: message to handle + :return: nothing + """ + if not self.session.master: + return + + if message.message_type != MessageTypes.CONFIG.value or message.get_tlv(ConfigTlvs.OBJECT.value) != "session": + return + + values_str = message.get_tlv(ConfigTlvs.VALUES.value) + if values_str is None: + return + + value_strings = values_str.split('|') + for value_string in value_strings: + key, value = value_string.split('=', 1) + if key == "controlnet": + self.handle_distributed_control_net(message, value_strings, value_strings.index(value_string)) + + def handle_distributed_control_net(self, message, values, index): + """ + Modify Config Message if multiple control network prefixes are + defined. Map server names to prefixes and repack the message before + it is forwarded to slave servers. + + :param message: message to handle + :param list values: values to handle + :param int index: index ti get key value from + :return: nothing + """ + key_value = values[index] + key, value = key_value.split('=', 1) + control_nets = value.split() + + if len(control_nets) < 2: + logger.warn("multiple controlnet prefixes do not exist") + return + + servers = self.session.broker.getservernames() + if len(servers) < 2: + logger.warn("not distributed") + return + + servers.remove("localhost") + # master always gets first prefix + servers.insert(0, "localhost") + # create list of "server1:ctrlnet1 server2:ctrlnet2 ..." + control_nets = map(lambda x: "%s:%s" % (x[0], x[1]), zip(servers, control_nets)) + values[index] = "controlnet=%s" % (" ".join(control_nets)) + values_str = "|".join(values) + message.tlvdata[ConfigTlvs.VALUES.value] = values_str + message.repack() diff --git a/daemon/core/conf.py b/daemon/core/conf.py index 8d195394..97337b82 100644 --- a/daemon/core/conf.py +++ b/daemon/core/conf.py @@ -301,7 +301,6 @@ class Configurable(object): :return: tuple of default values :rtype: tuple """ - # TODO: why the need for a tuple? return tuple(map(lambda x: x[2], cls.config_matrix)) @classmethod @@ -312,7 +311,6 @@ class Configurable(object): :return: tuple of name values :rtype: tuple """ - # TODO: why the need for a tuple? return tuple(map(lambda x: x[0], cls.config_matrix)) @classmethod diff --git a/daemon/core/corehandlers.py b/daemon/core/corehandlers.py index 8b9957df..96af2f82 100644 --- a/daemon/core/corehandlers.py +++ b/daemon/core/corehandlers.py @@ -945,8 +945,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler): Remove a link. """ if node1 and node2: - # TODO: fix this for the case where ifindex[1,2] are - # not specified + # TODO: fix this for the case where ifindex[1,2] are not specified # a wired unlink event, delete the connecting bridge netif1 = node1.netif(interface_index1) netif2 = node2.netif(interface_index2) @@ -1211,7 +1210,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler): # find the session containing this client and set the session to master for session in self.server.sessions.itervalues(): if self in session.broker.session_clients: - logger.info("SESSION SET TO MASTER!: %s", session.session_id) + logger.info("setting session to master: %s", session.session_id) session.master = True break @@ -1746,8 +1745,6 @@ class BaseAuxRequestHandler(CoreRequestHandler): messages = self.receive_message() if messages: for message in messages: - # TODO: do we really want to broadcast node and link messages from a client to other clients? - # self.session.broadcast(self, message) self.handle_message(message) except EOFError: break diff --git a/daemon/core/coreobj.py b/daemon/core/coreobj.py index c85b56a1..05a99219 100644 --- a/daemon/core/coreobj.py +++ b/daemon/core/coreobj.py @@ -190,18 +190,21 @@ class PyCoreObj(object): self.ifindex += 1 return ifindex - def data(self, message_type): + def data(self, message_type, lat=None, lon=None, alt=None): """ Build a data object for this node. :param message_type: purpose for the data object we are creating + :param float lat: latitude + :param float lon: longitude + :param float alt: altitude :return: node data object :rtype: core.data.NodeData """ if self.apitype is None: return None - x, y, z = self.getposition() + x, y, _ = self.getposition() model = None if hasattr(self, "type"): @@ -229,6 +232,9 @@ class PyCoreObj(object): opaque=self.opaque, x_position=x, y_position=y, + latitude=lat, + longitude=lon, + altitude=alt, model=model, emulation_server=emulation_server, services=services @@ -254,7 +260,6 @@ class PyCoreNode(PyCoreObj): Base class for CORE nodes. """ - # TODO: start seems like it should go away def __init__(self, session, objid=None, name=None, start=True): """ Create a PyCoreNode instance. @@ -414,7 +419,6 @@ class PyCoreNet(PyCoreObj): """ linktype = LinkTypes.WIRED.value - # TODO: remove start if appropriate def __init__(self, session, objid, name, start=True): """ Create a PyCoreNet instance. @@ -502,8 +506,6 @@ class PyCoreNet(PyCoreObj): interface2_ip6 = ipaddress.IpAddress(af=family, address=ipl) interface2_ip6_mask = mask - # TODO: not currently used - # loss = netif.getparam('loss') link_data = LinkData( message_type=flags, node1_id=self.objid, diff --git a/daemon/core/emane/emanemanager.py b/daemon/core/emane/emanemanager.py index 335c0db1..729815b9 100644 --- a/daemon/core/emane/emanemanager.py +++ b/daemon/core/emane/emanemanager.py @@ -1094,25 +1094,25 @@ class EmaneManager(ConfigurableManager): alt = attrs["altitude"] self.handlelocationeventtoxyz(txnemid, lat, long, alt) - def handlelocationeventtoxyz(self, nemid, lat, long, alt): + def handlelocationeventtoxyz(self, nemid, lat, lon, alt): """ Convert the (NEM ID, lat, long, alt) from a received location event into a node and x,y,z coordinate values, sending a Node Message. Returns True if successfully parsed and a Node Message was sent. """ # convert nemid to node number - (emanenode, netif) = self.nemlookup(nemid) + emanenode, netif = self.nemlookup(nemid) if netif is None: - logger.info("location event for unknown NEM %s" % nemid) + logger.info("location event for unknown NEM %s", nemid) return False n = netif.node.objid # convert from lat/long/alt to x,y,z coordinates - x, y, z = self.session.location.getxyz(lat, long, alt) + x, y, z = self.session.location.getxyz(lat, lon, alt) x = int(x) y = int(y) z = int(z) logger.info("location event NEM %s (%s, %s, %s) -> (%s, %s, %s)", - nemid, lat, long, alt, x, y, z) + nemid, lat, lon, alt, x, y, z) try: if (x.bit_length() > 16) or (y.bit_length() > 16) or \ (z.bit_length() > 16) or (x < 0) or (y < 0) or (z < 0): @@ -1123,7 +1123,7 @@ class EmaneManager(ConfigurableManager): return False except AttributeError: # int.bit_length() not present on Python 2.6 - logger.exception("error wusing bit_length") + logger.exception("error using bit_length") # generate a node message for this location update try: @@ -1134,12 +1134,9 @@ class EmaneManager(ConfigurableManager): # don"t use node.setposition(x,y,z) which generates an event node.position.set(x, y, z) - node_data = node.data(message_type=0) + node_data = node.data(message_type=0, lat=lat, lon=lon, alt=alt) self.session.broadcast_node(node_data) - # TODO: determinehow to add SDT handlers - # self.session.sdt.updatenodegeo(node.objid, lat, long, alt) - return True def emanerunning(self, node): diff --git a/daemon/core/mobility.py b/daemon/core/mobility.py index e3075055..c32b68b0 100644 --- a/daemon/core/mobility.py +++ b/daemon/core/mobility.py @@ -607,9 +607,6 @@ class BasicRangeModel(WirelessModel): link_data = self.create_link_data(netif, netif2, message_type) self.session.broadcast_link(link_data) - # TODO: account for SDT wanting to listen as well - # self.session.sdt.updatelink(netif.node.objid, netif2.node.objid, flags, wireless=True) - def all_link_data(self, flags): """ Return a list of wireless link messages for when the GUI reconnects. @@ -913,9 +910,6 @@ class WayPointMobility(WirelessModel): node_data = node.data(message_type=0) self.session.broadcast_node(node_data) - # TODO: determine how to add handler for SDT - # self.session.sdt.updatenode(node.objid, flags=0, x=x, y=y, z=z) - def setendtime(self): """ Set self.endtime to the time of the last waypoint in the queue of diff --git a/daemon/core/netns/nodes.py b/daemon/core/netns/nodes.py index 202c0622..d678c930 100644 --- a/daemon/core/netns/nodes.py +++ b/daemon/core/netns/nodes.py @@ -249,8 +249,6 @@ class PtpNet(LxBrNet): interface2_ip6 = ipaddress.IpAddress(af=family, address=ipl) interface2_ip6_mask = mask - # TODO: not currently used - # loss=netif.getparam("loss") link_data = LinkData( message_type=flags, node1_id=if1.node.objid, @@ -510,7 +508,6 @@ class RJ45Node(PyCoreNode, PyCoreNetIf): """ PyCoreNetIf.detachnet(self) - # TODO: parameters are not used def newnetif(self, net=None, addrlist=None, hwaddr=None, ifindex=None, ifname=None): """ This is called when linking with another node. Since this node diff --git a/daemon/core/sdt.py b/daemon/core/sdt.py index 1e14c110..75d20f93 100644 --- a/daemon/core/sdt.py +++ b/daemon/core/sdt.py @@ -71,6 +71,42 @@ class Sdt(object): self.remotes = {} session.broker.handlers.add(self.handle_distributed) + # add handler for node updates + self.session.node_handlers.append(self.handle_node_update) + + # add handler for link updates + self.session.link_handlers.append(self.handle_link_update) + + def handle_node_update(self, node_data): + """ + Handler for node updates, specifically for updating their location. + + :param core.data.NodeData node_data: node data being updated + :return: nothing + """ + x = node_data.x_position + y = node_data.y_position + lat = node_data.latitude + lon = node_data.longitude + alt = node_data.altitude + + if all([lat, lon, alt]): + self.updatenodegeo(node_data.id, node_data.latitude, node_data.longitude, node_data.altitude) + + if node_data.message_type == 0: + # TODO: z is not currently supported by node messages + self.updatenode(node_data.id, 0, x, y, 0) + + def handle_link_update(self, link_data): + """ + Handler for link updates, checking for wireless link/unlink messages. + + :param core.data.LinkData link_data: link data being updated + :return: nothing + """ + if link_data.link_type == LinkTypes.WIRELESS.value: + self.updatelink(link_data.node1_id, link_data.node2_id, link_data.message_type, wireless=True) + def is_enabled(self): """ Check for "enablesdt" session option. Return False by default if @@ -180,6 +216,7 @@ class Sdt(object): :return: nothing """ + logger.info("SDT shutdown!") self.cmd("clear all") self.disconnect() self.showerror = True @@ -227,8 +264,8 @@ class Sdt(object): return if x is None or y is None: return - lat, long, alt = self.session.location.getgeo(x, y, z) - pos = "pos %.6f,%.6f,%.6f" % (long, lat, alt) + lat, lon, alt = self.session.location.getgeo(x, y, z) + pos = "pos %.6f,%.6f,%.6f" % (lon, lat, alt) if flags & MessageFlags.ADD.value: if icon is not None: type = name @@ -327,7 +364,6 @@ class Sdt(object): for n2num, wl in r.links: self.updatelink(n1num, n2num, MessageFlags.ADD.value, wl) - # TODO: remove the need for this def handle_distributed(self, message): """ Broker handler for processing CORE API messages as they are @@ -342,7 +378,6 @@ class Sdt(object): elif message.message_type == MessageTypes.NODE.value: return self.handlenodemsg(message) - # TODO: remove the need for this def handlenodemsg(self, msg): """ Process a Node Message to add/delete or move a node on @@ -403,7 +438,6 @@ class Sdt(object): remote.pos = (x, y, z) self.updatenode(nodenum, msg.flags, x, y, z, name, type, icon) - # TODO: remove the need for this def handlelinkmsg(self, msg): """ Process a Link Message to add/remove links on the SDT display. diff --git a/daemon/core/service.py b/daemon/core/service.py index f8ab6251..1f7f513f 100644 --- a/daemon/core/service.py +++ b/daemon/core/service.py @@ -867,7 +867,8 @@ class CoreServices(ConfigurableManager): if len(cfgfiles) > 0: for filename in cfgfiles: if filename[:7] == "file:///": - raise NotImplementedError # TODO + # TODO: implement this + raise NotImplementedError cfg = self.getservicefiledata(s, filename) if cfg is None: cfg = s.generateconfig(node, filename, services) diff --git a/daemon/core/session.py b/daemon/core/session.py index b9ac4e12..175e00c3 100644 --- a/daemon/core/session.py +++ b/daemon/core/session.py @@ -27,13 +27,11 @@ from core.data import EventData from core.data import ExceptionData from core.data import FileData from core.emane.emanemanager import EmaneManager -from core.enumerations import ConfigDataTypes, EventTlvs +from core.enumerations import ConfigDataTypes from core.enumerations import ConfigFlags -from core.enumerations import ConfigTlvs from core.enumerations import EventTypes from core.enumerations import ExceptionLevels from core.enumerations import MessageFlags -from core.enumerations import MessageTypes from core.enumerations import NodeTypes from core.enumerations import RegisterTlvs from core.location import CoreLocation @@ -156,6 +154,15 @@ class Session(object): self.master = False + # handlers for broadcasting information + self.event_handlers = [] + self.exception_handlers = [] + self.node_handlers = [] + self.link_handlers = [] + self.file_handlers = [] + self.config_handlers = [] + self.shutdown_handlers = [] + # setup broker self.broker = CoreBroker(session=self) self.add_config_object(CoreBroker.name, CoreBroker.config_type, self.broker.configure) @@ -192,15 +199,6 @@ class Session(object): self.metadata = SessionMetaData() self.add_config_object(SessionMetaData.name, SessionMetaData.config_type, self.metadata.configure) - # handlers for broadcasting information - self.event_handlers = [] - self.exception_handlers = [] - self.node_handlers = [] - self.link_handlers = [] - self.file_handlers = [] - self.config_handlers = [] - self.shutdown_handlers = [] - def shutdown(self): """ Shutdown all emulation objects and remove the session directory. @@ -326,10 +324,6 @@ class Session(object): event_data = EventData(event_type=state, time="%s" % time.time()) self.broadcast_event(event_data) - # also inform slave servers - # TODO: deal with broker, potentially broker should really live within the core server/handlers - # self.broker.handlerawmsg(message) - def write_state(self, state): """ Write the current state to a state file in the session dir. @@ -915,8 +909,7 @@ class Session(object): if node_count == 0: shutdown = True self.set_state(state=EventTypes.SHUTDOWN_STATE.value) - # TODO: this seems redundant as it occurs during shutdown as well - self.sdt.shutdown() + return shutdown def short_session_id(self): @@ -935,16 +928,13 @@ class Session(object): """ with self._objects_lock: for obj in self.objects.itervalues(): - # TODO: determine instance type we need to check, due to method issue below + # TODO: PyCoreNode is not the type to check, but there are two types, due to bsd and netns if isinstance(obj, nodes.PyCoreNode) and not nodeutils.is_node(obj, NodeTypes.RJ45): # add a control interface if configured logger.info("booting node: %s - %s", obj.objid, obj.name) self.add_remove_control_interface(node=obj, remove=False) obj.boot() - # TODO(blake): send node emu ids back - # self.sendnodeemuid(obj.objid) - self.update_control_interface_hosts() def validate_nodes(self): @@ -955,7 +945,7 @@ class Session(object): """ with self._objects_lock: for obj in self.objects.itervalues(): - # TODO: this can be extended to validate everything, bad node check here as well + # TODO: issues with checking PyCoreNode alone, validate is not a method # such as vnoded process, bridges, etc. if not isinstance(obj, nodes.PyCoreNode): continue @@ -1015,7 +1005,7 @@ class Session(object): return -1 def get_control_net_object(self, net_index): - # TODO: all nodes use an integer id and now this wants to use a string =( + # TODO: all nodes use an integer id and now this wants to use a string object_id = "ctrl%dnet" % net_index return self.get_object(object_id) @@ -1123,7 +1113,8 @@ class Session(object): updown_script=updown_script, serverintf=server_interface) # tunnels between controlnets will be built with Broker.addnettunnels() - # TODO: potentialy remove documentation saying object ids are ints + # TODO: potentially remove documentation saying object ids are ints + # TODO: need to move broker code out of the session object self.broker.addnet(object_id) for server in self.broker.getservers(): self.broker.addnodemap(server, object_id) @@ -1377,7 +1368,6 @@ class SessionConfig(ConfigurableManager, Configurable): """ ConfigurableManager.__init__(self) self.session = session - self.session.broker.handlers.add(self.handle_distributed) self.reset() def reset(self): @@ -1423,70 +1413,6 @@ class SessionConfig(ConfigurableManager, Configurable): return self.config_data(0, node_id, type_flags, values) - # TODO: update logic to not be tied to old style messages - def handle_distributed(self, message): - """ - Handle the session options config message as it has reached the - broker. Options requiring modification for distributed operation should - be handled here. - - :param message: message to handle - :return: nothing - """ - if not self.session.master: - return - - if message.message_type != MessageTypes.CONFIG.value or message.get_tlv(ConfigTlvs.OBJECT.value) != "session": - return - - values_str = message.get_tlv(ConfigTlvs.VALUES.value) - if values_str is None: - return - - value_strings = values_str.split('|') - if not self.haskeyvalues(value_strings): - return - - for value_string in value_strings: - key, value = value_string.split('=', 1) - if key == "controlnet": - self.handle_distributed_control_net(message, value_strings, value_strings.index(value_string)) - - # TODO: update logic to not be tied to old style messages - def handle_distributed_control_net(self, message, values, index): - """ - Modify Config Message if multiple control network prefixes are - defined. Map server names to prefixes and repack the message before - it is forwarded to slave servers. - - :param message: message to handle - :param list values: values to handle - :param int index: index ti get key value from - :return: nothing - """ - key_value = values[index] - key, value = key_value.split('=', 1) - control_nets = value.split() - - if len(control_nets) < 2: - logger.warn("multiple controlnet prefixes do not exist") - return - - servers = self.session.broker.getservernames() - if len(servers) < 2: - logger.warn("not distributed") - return - - servers.remove("localhost") - # master always gets first prefix - servers.insert(0, "localhost") - # create list of "server1:ctrlnet1 server2:ctrlnet2 ..." - control_nets = map(lambda x: "%s:%s" % (x[0], x[1]), zip(servers, control_nets)) - values[index] = "controlnet=%s" % (" ".join(control_nets)) - values_str = "|".join(values) - message.tlvdata[ConfigTlvs.VALUES.value] = values_str - message.repack() - class SessionMetaData(ConfigurableManager): """ diff --git a/daemon/sbin/core-daemon b/daemon/sbin/core-daemon index ab492f13..78a4cc57 100755 --- a/daemon/sbin/core-daemon +++ b/daemon/sbin/core-daemon @@ -296,7 +296,8 @@ def exec_file(cfg): tlvdata = coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, filename) msg = coreapi.CoreRegMessage.pack(MessageFlags.ADD.value, tlvdata) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.connect(("localhost", int(cfg["port"]))) # TODO: connect address option + # TODO: connect address option + sock.connect(("localhost", int(cfg["port"]))) sock.sendall(msg) return 0