From b2d2705849cb2f547586214790495ccb4bc19996 Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Tue, 15 Oct 2019 14:13:42 -0700 Subject: [PATCH] removed broker from session, updated most places using broker to use alternative logic to compensate where needed --- daemon/core/__init__.py | 3 + daemon/core/api/tlv/coreapi.py | 18 -- daemon/core/api/tlv/corehandlers.py | 72 +++---- daemon/core/emane/emanemanager.py | 182 ++---------------- daemon/core/emulator/distributed.py | 8 +- daemon/core/emulator/session.py | 111 ++++------- daemon/core/location/mobility.py | 93 --------- daemon/core/nodes/base.py | 12 +- daemon/core/nodes/network.py | 4 +- daemon/core/plugins/sdt.py | 2 +- daemon/core/xml/corexmldeployment.py | 4 - daemon/core/xml/emanexml.py | 24 +-- daemon/examples/python/distributed.py | 16 +- daemon/examples/python/distributed_emane.py | 26 +-- daemon/examples/python/distributed_ptp.py | 13 +- .../examples/python/distributed_switches.py | 11 +- daemon/examples/python/distributed_wlan.py | 16 +- daemon/tests/conftest.py | 1 - daemon/tests/test_gui.py | 6 +- 19 files changed, 151 insertions(+), 471 deletions(-) diff --git a/daemon/core/__init__.py b/daemon/core/__init__.py index c847c8dc..40ca3604 100644 --- a/daemon/core/__init__.py +++ b/daemon/core/__init__.py @@ -2,3 +2,6 @@ import logging.config # setup default null handler logging.getLogger(__name__).addHandler(logging.NullHandler()) + +# disable paramiko logging +logging.getLogger("paramiko").setLevel(logging.WARNING) diff --git a/daemon/core/api/tlv/coreapi.py b/daemon/core/api/tlv/coreapi.py index 63747642..1e1de8be 100644 --- a/daemon/core/api/tlv/coreapi.py +++ b/daemon/core/api/tlv/coreapi.py @@ -15,7 +15,6 @@ from core.api.tlv import structutils from core.emulator.enumerations import ( ConfigTlvs, EventTlvs, - EventTypes, ExceptionTlvs, ExecuteTlvs, FileTlvs, @@ -1017,20 +1016,3 @@ def str_to_list(value): return None return value.split("|") - - -def state_name(value): - """ - Helper to convert state number into state name using event types. - - :param int value: state value to derive name from - :return: state name - :rtype: str - """ - - try: - value = EventTypes(value).name - except ValueError: - value = "unknown" - - return value diff --git a/daemon/core/api/tlv/corehandlers.py b/daemon/core/api/tlv/corehandlers.py index 6ff7f55b..4b4e7c1e 100644 --- a/daemon/core/api/tlv/corehandlers.py +++ b/daemon/core/api/tlv/corehandlers.py @@ -86,6 +86,7 @@ class CoreHandler(socketserver.BaseRequestHandler): self.master = False self.session = None + self.session_clients = {} # core emulator self.coreemu = server.coreemu @@ -138,8 +139,9 @@ class CoreHandler(socketserver.BaseRequestHandler): if self.session: # remove client from session broker and shutdown if there are no clients self.remove_session_handlers() - self.session.broker.session_clients.remove(self) - if not self.session.broker.session_clients and not self.session.is_active(): + clients = self.session_clients[self.session.id] + clients.remove(self) + if not clients and not self.session.is_active(): logging.info( "no session clients left and not active, initiating shutdown" ) @@ -407,9 +409,7 @@ class CoreHandler(socketserver.BaseRequestHandler): tlv_data += coreapi.CoreRegisterTlv.pack( RegisterTlvs.EMULATION_SERVER.value, "core-daemon" ) - tlv_data += coreapi.CoreRegisterTlv.pack( - self.session.broker.config_type, self.session.broker.name - ) + tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.UTILITY.value, "broker") tlv_data += coreapi.CoreRegisterTlv.pack( self.session.location.config_type, self.session.location.name ) @@ -533,10 +533,6 @@ class CoreHandler(socketserver.BaseRequestHandler): :param message: message to handle :return: nothing """ - if self.session and self.session.broker.handle_message(message): - logging.debug("message not being handled locally") - return - logging.debug( "%s handling message:\n%s", threading.currentThread().getName(), message ) @@ -606,12 +602,11 @@ class CoreHandler(socketserver.BaseRequestHandler): self.session = self.coreemu.create_session(port, master=False) logging.debug("created new session for client: %s", self.session.id) - # TODO: hack to associate this handler with this sessions broker for broadcasting - # TODO: broker needs to be pulled out of session to the server/handler level if self.master: logging.debug("session set to master") self.session.master = True - self.session.broker.session_clients.append(self) + clients = self.session_clients.setdefault(self.session.id, []) + clients.append(self) # add handlers for various data self.add_session_handlers() @@ -643,7 +638,8 @@ class CoreHandler(socketserver.BaseRequestHandler): ]: continue - for client in self.session.broker.session_clients: + clients = self.session_clients[self.session.id] + for client in clients: if client == self: continue @@ -734,6 +730,7 @@ class CoreHandler(socketserver.BaseRequestHandler): node_options.icon = message.get_tlv(NodeTlvs.ICON.value) node_options.canvas = message.get_tlv(NodeTlvs.CANVAS.value) node_options.opaque = message.get_tlv(NodeTlvs.OPAQUE.value) + node_options.emulation_server = message.get_tlv(NodeTlvs.EMULATION_SERVER.value) services = message.get_tlv(NodeTlvs.SERVICES.value) if services: @@ -1027,8 +1024,9 @@ class CoreHandler(socketserver.BaseRequestHandler): # find the session containing this client and set the session to master for _id in self.coreemu.sessions: - session = self.coreemu.sessions[_id] - if self in session.broker.session_clients: + clients = self.session_clients[_id] + if self in clients: + session = self.coreemu.sessions[_id] logging.debug("setting session to master: %s", session.id) session.master = True break @@ -1077,7 +1075,7 @@ class CoreHandler(socketserver.BaseRequestHandler): self.handle_config_location(message_type, config_data) elif config_data.object == self.session.metadata.name: replies = self.handle_config_metadata(message_type, config_data) - elif config_data.object == self.session.broker.name: + elif config_data.object == "broker": self.handle_config_broker(message_type, config_data) elif config_data.object == self.session.services.name: replies = self.handle_config_services(message_type, config_data) @@ -1182,7 +1180,6 @@ class CoreHandler(socketserver.BaseRequestHandler): def handle_config_broker(self, message_type, config_data): if message_type not in [ConfigFlags.REQUEST, ConfigFlags.RESET]: - session_id = config_data.session if not config_data.data_values: logging.info("emulation server data missing") else: @@ -1194,29 +1191,10 @@ class CoreHandler(socketserver.BaseRequestHandler): for server in server_list: server_items = server.split(":") - name, host, port = server_items[:3] - - if host == "": - host = None - - if port == "": - port = None - else: - port = int(port) - - if session_id is not None: - # receive session ID and my IP from master - self.session.broker.session_id_master = int( - session_id.split("|")[0] - ) - self.session.broker.myip = host - host = None - port = None - - # this connects to the server immediately; maybe we should wait - # or spin off a new "client" thread here - self.session.broker.addserver(name, host, port) - self.session.broker.setupserver(name) + name, host, _ = server_items[:3] + self.session.add_distributed(name, host) + elif message_type == ConfigFlags.RESET: + self.session.shutdown_distributed() def handle_config_services(self, message_type, config_data): replies = [] @@ -1842,11 +1820,9 @@ class CoreHandler(socketserver.BaseRequestHandler): # remove client from session broker and shutdown if needed self.remove_session_handlers() - self.session.broker.session_clients.remove(self) - if ( - not self.session.broker.session_clients - and not self.session.is_active() - ): + clients = self.session_clients[self.session.id] + clients.remove(self) + if not clients and not self.session.is_active(): self.coreemu.delete_session(self.session.id) # set session to join @@ -1855,7 +1831,8 @@ class CoreHandler(socketserver.BaseRequestHandler): # add client to session broker and set master if needed if self.master: self.session.master = True - self.session.broker.session_clients.append(self) + clients = self.session_clients.setdefault(self.session.id, []) + clients.append(self) # add broadcast handlers logging.info("adding session broadcast handlers") @@ -2139,7 +2116,8 @@ class CoreUdpHandler(CoreHandler): if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)): return - for client in self.session.broker.session_clients: + clients = self.session_clients[self.session.id] + for client in clients: try: client.sendall(message.raw_message) except IOError: diff --git a/daemon/core/emane/emanemanager.py b/daemon/core/emane/emanemanager.py index f48d2e2e..e4208189 100644 --- a/daemon/core/emane/emanemanager.py +++ b/daemon/core/emane/emanemanager.py @@ -2,14 +2,12 @@ emane.py: definition of an Emane class for implementing configuration control of an EMANE emulation. """ -import copy import logging import os import threading from core import utils -from core.api.tlv import coreapi, dataconversion -from core.config import ConfigGroup, ConfigShim, Configuration, ModelManager +from core.config import ConfigGroup, Configuration, ModelManager from core.emane import emanemanifest from core.emane.bypass import EmaneBypassModel from core.emane.commeffect import EmaneCommEffectModel @@ -18,14 +16,7 @@ from core.emane.ieee80211abg import EmaneIeee80211abgModel from core.emane.nodes import EmaneNet from core.emane.rfpipe import EmaneRfPipeModel from core.emane.tdma import EmaneTdmaModel -from core.emulator.enumerations import ( - ConfigDataTypes, - ConfigFlags, - ConfigTlvs, - MessageFlags, - MessageTypes, - RegisterTlvs, -) +from core.emulator.enumerations import ConfigDataTypes, RegisterTlvs from core.errors import CoreCommandError, CoreError from core.xml import emanexml @@ -75,8 +66,6 @@ class EmaneManager(ModelManager): self.session = session self._emane_nets = {} self._emane_node_lock = threading.Lock() - self._ifccounts = {} - self._ifccountslock = threading.Lock() # port numbers are allocated from these counters self.platformport = self.session.options.get_config_int( "emane_platform_port", 8100 @@ -91,7 +80,6 @@ class EmaneManager(ModelManager): self.emane_config = EmaneGlobalModel(session) self.set_configs(self.emane_config.default_values()) - session.broker.handlers.add(self.handledistributed) self.service = None self.event_device = None self.emane_check() @@ -154,8 +142,8 @@ class EmaneManager(ModelManager): args = "emane --version" emane_version = utils.check_cmd(args) logging.info("using EMANE: %s", emane_version) - for host in self.session.servers: - server = self.session.servers[host] + for name in self.session.servers: + server = self.session.servers[name] server.remote_cmd(args) # load default emane models @@ -282,7 +270,6 @@ class EmaneManager(ModelManager): return EmaneManager.NOT_NEEDED # control network bridge required for EMANE 0.9.2 - # - needs to be configured before checkdistributed() for distributed # - needs to exist when eventservice binds to it (initeventservice) if self.session.master: otadev = self.get_config("otamanagerdevice") @@ -297,10 +284,9 @@ class EmaneManager(ModelManager): ) return EmaneManager.NOT_READY - ctrlnet = self.session.add_remove_control_net( + self.session.add_remove_control_net( net_index=netidx, remove=False, conf_required=False ) - self.distributedctrlnet(ctrlnet) eventdev = self.get_config("eventservicedevice") logging.debug("emane event service device: eventdev(%s)", eventdev) if eventdev != otadev: @@ -313,18 +299,9 @@ class EmaneManager(ModelManager): ) return EmaneManager.NOT_READY - ctrlnet = self.session.add_remove_control_net( + self.session.add_remove_control_net( net_index=netidx, remove=False, conf_required=False ) - self.distributedctrlnet(ctrlnet) - - if self.checkdistributed(): - # we are slave, but haven't received a platformid yet - platform_id_start = "platform_id_start" - default_values = self.emane_config.default_values() - value = self.get_config(platform_id_start) - if value == default_values[platform_id_start]: - return EmaneManager.NOT_READY self.check_node_models() return EmaneManager.SUCCESS @@ -413,9 +390,6 @@ class EmaneManager(ModelManager): """ stop all EMANE daemons """ - with self._ifccountslock: - self._ifccounts.clear() - with self._emane_node_lock: if not self._emane_nets: return @@ -424,92 +398,6 @@ class EmaneManager(ModelManager): self.stopdaemons() self.stopeventmonitor() - def handledistributed(self, message): - """ - Broker handler for processing CORE API messages as they are - received. This is used to snoop the Link add messages to get NEM - counts of NEMs that exist on other servers. - """ - if ( - message.message_type == MessageTypes.LINK.value - and message.flags & MessageFlags.ADD.value - ): - nn = message.node_numbers() - # first node is always link layer node in Link add message - if nn[0] in self.session.broker.network_nodes: - serverlist = self.session.broker.getserversbynode(nn[1]) - for server in serverlist: - with self._ifccountslock: - if server not in self._ifccounts: - self._ifccounts[server] = 1 - else: - self._ifccounts[server] += 1 - - def checkdistributed(self): - """ - Check for EMANE nodes that exist on multiple emulation servers and - coordinate the NEM id and port number space. - If we are the master EMANE node, return False so initialization will - proceed as normal; otherwise slaves return True here and - initialization is deferred. - """ - # check with the session if we are the "master" Emane object? - master = False - - with self._emane_node_lock: - if self._emane_nets: - master = self.session.master - logging.info("emane check distributed as master: %s.", master) - - # we are not the master Emane object, wait for nem id and ports - if not master: - return True - - nemcount = 0 - with self._emane_node_lock: - for key in self._emane_nets: - emane_node = self._emane_nets[key] - nemcount += emane_node.numnetif() - - nemid = int(self.get_config("nem_id_start")) - nemid += nemcount - - platformid = int(self.get_config("platform_id_start")) - - # build an ordered list of servers so platform ID is deterministic - servers = [] - for key in sorted(self._emane_nets): - for server in self.session.broker.getserversbynode(key): - if server not in servers: - servers.append(server) - - servers.sort(key=lambda x: x.name) - for server in servers: - if server.name == "localhost": - continue - - if server.sock is None: - continue - - platformid += 1 - - # create temporary config for updating distributed nodes - typeflags = ConfigFlags.UPDATE.value - config = copy.deepcopy(self.get_configs()) - config["platform_id_start"] = str(platformid) - config["nem_id_start"] = str(nemid) - config_data = ConfigShim.config_data( - 0, None, typeflags, self.emane_config, config - ) - message = dataconversion.convert_config(config_data) - server.sock.send(message) - # increment nemid for next server by number of interfaces - with self._ifccountslock: - if server in self._ifccounts: - nemid += self._ifccounts[server] - - return False - def buildxml(self): """ Build XML files required to run EMANE on each node. @@ -526,52 +414,6 @@ class EmaneManager(ModelManager): self.buildnemxml() self.buildeventservicexml() - # TODO: remove need for tlv messaging - def distributedctrlnet(self, ctrlnet): - """ - Distributed EMANE requires multiple control network prefixes to - be configured. This generates configuration for slave control nets - using the default list of prefixes. - """ - # slave server - session = self.session - if not session.master: - return - - # not distributed - servers = session.broker.getservernames() - if len(servers) < 2: - return - - # normal Config messaging will distribute controlnets - prefix = session.options.get_config("controlnet", default="") - prefixes = prefix.split() - if len(prefixes) < len(servers): - logging.info( - "setting up default controlnet prefixes for distributed (%d configured)", - len(prefixes), - ) - prefix = ctrlnet.DEFAULT_PREFIX_LIST[0] - prefixes = prefix.split() - servers.remove("localhost") - servers.insert(0, "localhost") - prefix = " ".join("%s:%s" % (s, prefixes[i]) for i, s in enumerate(servers)) - - # this generates a config message having controlnet prefix assignments - logging.info("setting up controlnet prefixes for distributed: %s", prefix) - vals = "controlnet=%s" % prefix - tlvdata = b"" - tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.OBJECT.value, "session") - tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.TYPE.value, 0) - tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.VALUES.value, vals) - rawmsg = coreapi.CoreConfMessage.pack(0, tlvdata) - msghdr = rawmsg[: coreapi.CoreMessage.header_len] - msg = coreapi.CoreConfMessage( - flags=0, hdr=msghdr, data=rawmsg[coreapi.CoreMessage.header_len :] - ) - logging.debug("sending controlnet message:\n%s", msg) - self.session.broker.handle_message(msg) - def check_node_models(self): """ Associate EMANE model classes with EMANE network nodes. @@ -676,8 +518,8 @@ class EmaneManager(ModelManager): dev = self.get_config("eventservicedevice") emanexml.create_event_service_xml(group, port, dev, self.session.session_dir) - for server in self.session.servers: - conn = self.session.servers[server] + for name in self.session.servers: + conn = self.session.servers[name] emanexml.create_event_service_xml( group, port, dev, self.session.session_dir, conn ) @@ -756,8 +598,8 @@ class EmaneManager(ModelManager): emanecmd += " -f %s" % os.path.join(path, "emane.log") emanecmd += " %s" % os.path.join(path, "platform.xml") utils.check_cmd(emanecmd, cwd=path) - for host in self.session.servers: - server = self.session.servers[host] + for name in self.session.servers: + server = self.session.servers[name] server.remote_cmd(emanecmd, cwd=path) logging.info("host emane daemon running: %s", emanecmd) @@ -783,8 +625,8 @@ class EmaneManager(ModelManager): try: utils.check_cmd(kill_emaned) utils.check_cmd(kill_transortd) - for host in self.session.servers: - server = self.session[host] + for name in self.session.servers: + server = self.session[name] server.remote_cmd(kill_emaned) server.remote_cmd(kill_transortd) except CoreCommandError: diff --git a/daemon/core/emulator/distributed.py b/daemon/core/emulator/distributed.py index 19594ae1..4e258937 100644 --- a/daemon/core/emulator/distributed.py +++ b/daemon/core/emulator/distributed.py @@ -20,12 +20,14 @@ class DistributedServer(object): Provides distributed server interactions. """ - def __init__(self, host): + def __init__(self, name, host): """ Create a DistributedServer instance. + :param str name: convenience name to associate with host :param str host: host to connect to """ + self.name = name self.host = host self.conn = Connection(host, user="root") self.lock = threading.Lock() @@ -36,8 +38,8 @@ class DistributedServer(object): :param str cmd: command to run :param dict env: environment for remote command, default is None - :param str cwd: directory to run command in, defaults to None, which is the user's - home directory + :param str cwd: directory to run command in, defaults to None, which is the + user's home directory :param bool wait: True to wait for status, False to background process :return: stdout when success :rtype: str diff --git a/daemon/core/emulator/session.py b/daemon/core/emulator/session.py index a0b4ec38..1445cb8f 100644 --- a/daemon/core/emulator/session.py +++ b/daemon/core/emulator/session.py @@ -15,8 +15,6 @@ import time from multiprocessing.pool import ThreadPool from core import constants, utils -from core.api.tlv import coreapi -from core.api.tlv.broker import CoreBroker from core.emane.emanemanager import EmaneManager from core.emane.nodes import EmaneNet from core.emulator.data import EventData, ExceptionData, NodeData @@ -142,10 +140,9 @@ class Session(object): # distributed servers self.servers = {} self.tunnels = {} - self.address = None + self.address = self.options.get_config("distributed_address", default=None) # initialize session feature helpers - self.broker = CoreBroker(session=self) self.location = CoreLocation() self.mobility = MobilityManager(session=self) self.services = CoreServices(session=self) @@ -161,9 +158,9 @@ class Session(object): "host": ("DefaultRoute", "SSH"), } - def add_distributed(self, host): - server = DistributedServer(host) - self.servers[host] = server + def add_distributed(self, name, host): + server = DistributedServer(name, host) + self.servers[name] = server cmd = "mkdir -p %s" % self.session_dir server.remote_cmd(cmd) @@ -175,8 +172,8 @@ class Session(object): tunnel.shutdown() # remove all remote session directories - for host in self.servers: - server = self.servers[host] + for name in self.servers: + server = self.servers[name] cmd = "rm -rf %s" % self.session_dir server.remote_cmd(cmd) @@ -193,8 +190,9 @@ class Session(object): if isinstance(node, CtrlNet) and node.serverintf is not None: continue - for host in self.servers: - server = self.servers[host] + for name in self.servers: + server = self.servers[name] + host = server.host key = self.tunnelkey(node_id, IpAddress.to_int(host)) # local to server @@ -219,23 +217,35 @@ class Session(object): ) # save tunnels for shutdown - self.tunnels[key] = [local_tap, remote_tap] + self.tunnels[key] = (local_tap, remote_tap) - def tunnelkey(self, n1num, n2num): + def tunnelkey(self, n1_id, n2_id): """ Compute a 32-bit key used to uniquely identify a GRE tunnel. The hash(n1num), hash(n2num) values are used, so node numbers may be None or string values (used for e.g. "ctrlnet"). - :param int n1num: node one id - :param int n2num: node two id + :param int n1_id: node one id + :param int n2_id: node two id :return: tunnel key for the node pair :rtype: int """ - logging.debug("creating tunnel key for: %s, %s", n1num, n2num) - key = (self.id << 16) ^ utils.hashkey(n1num) ^ (utils.hashkey(n2num) << 8) + logging.debug("creating tunnel key for: %s, %s", n1_id, n2_id) + key = (self.id << 16) ^ utils.hashkey(n1_id) ^ (utils.hashkey(n2_id) << 8) return key & 0xFFFFFFFF + def gettunnel(self, n1_id, n2_id): + """ + Return the GreTap between two nodes if it exists. + + :param int n1_id: node one id + :param int n2_id: node two id + :return: gre tap between nodes or None + """ + key = self.tunnelkey(n1_id, n2_id) + logging.debug("checking for tunnel key(%s) in: %s", key, self.tunnels) + return self.tunnels.get(key) + @classmethod def get_node_class(cls, _type): """ @@ -285,7 +295,7 @@ class Session(object): node_two = self.get_node(node_two_id) # both node ids are provided - tunnel = self.broker.gettunnel(node_one_id, node_two_id) + tunnel = self.gettunnel(node_one_id, node_two_id) logging.debug("tunnel between nodes: %s", tunnel) if isinstance(tunnel, GreTapBridge): net_one = tunnel @@ -958,13 +968,13 @@ class Session(object): def clear(self): """ - Clear all CORE session data. (objects, hooks, broker) + Clear all CORE session data. (nodes, hooks, etc) :return: nothing """ self.delete_nodes() + self.shutdown_distributed() self.del_hooks() - self.broker.reset() self.emane.reset() def start_events(self): @@ -1038,17 +1048,16 @@ class Session(object): # shutdown/cleanup feature helpers self.emane.shutdown() - self.broker.shutdown() self.sdt.shutdown() - # delete all current nodes + # remove and shutdown all nodes and tunnels self.delete_nodes() + self.shutdown_distributed() # remove this sessions working directory preserve = self.options.get_config("preservedir") == "1" if not preserve: shutil.rmtree(self.session_dir, ignore_errors=True) - self.shutdown_distributed() # call session shutdown handlers for handler in self.shutdown_handlers: @@ -1160,7 +1169,7 @@ class Session(object): """ try: state_file = open(self._state_file, "w") - state_file.write("%d %s\n" % (state, coreapi.state_name(state))) + state_file.write("%d %s\n" % (state, EventTypes(self.state).name)) state_file.close() except IOError: logging.exception("error writing state file: %s", state) @@ -1278,7 +1287,7 @@ class Session(object): hook(state) except Exception: message = "exception occured when running %s state hook: %s" % ( - coreapi.state_name(state), + EventTypes(self.state).name, hook, ) logging.exception(message) @@ -1549,11 +1558,10 @@ class Session(object): # write current nodes out to session directory file self.write_nodes() - # create control net interfaces and broker network tunnels + # create control net interfaces and network tunnels # which need to exist for emane to sync on location events # in distributed scenarios self.add_remove_control_interface(node=None, remove=False) - self.broker.startup() # initialize distributed tunnels self.initialize_distributed() @@ -1566,9 +1574,6 @@ class Session(object): self.boot_nodes() self.mobility.startup() - # set broker local instantiation to complete - self.broker.local_instantiation_complete() - # notify listeners that instantiation is complete event = EventData(event_type=EventTypes.INSTANTIATION_COMPLETE.value) self.broadcast_event(event) @@ -1606,21 +1611,16 @@ class Session(object): have entered runtime (time=0). """ # this is called from instantiate() after receiving an event message - # for the instantiation state, and from the broker when distributed - # nodes have been started + # for the instantiation state logging.debug( "session(%s) checking if not in runtime state, current state: %s", self.id, - coreapi.state_name(self.state), + EventTypes(self.state).name, ) if self.state == EventTypes.RUNTIME_STATE.value: logging.info("valid runtime state found, returning") return - # check to verify that all nodes and networks are running - if not self.broker.instantiation_complete(): - return - # start event loop and set to runtime self.event_loop.run() self.set_state(EventTypes.RUNTIME_STATE, send_event=True) @@ -1830,37 +1830,11 @@ class Session(object): except IndexError: # no server name. possibly only one server prefix = prefixes[0] - else: - # slave servers have their name and localhost in the serverlist - servers = self.broker.getservernames() - servers.remove("localhost") - prefix = None - for server_prefix in prefixes: - try: - # split each entry into server and prefix - server, p = server_prefix.split(":") - except ValueError: - server = "" - p = None - - if server == servers[0]: - # the server name in the list matches this server - prefix = p - break - - if not prefix: - logging.error( - "control network prefix not found for server: %s", servers[0] - ) - assign_address = False - try: - prefix = prefixes[0].split(":", 1)[1] - except IndexError: - prefix = prefixes[0] # len(prefixes) == 1 else: - # TODO: can we get the server name from the servers.conf or from the node assignments? + # TODO: can we get the server name from the servers.conf or from the node + # assignments?o # with one prefix, only master gets a ctrlnet address assign_address = self.master prefix = prefixes[0] @@ -1882,13 +1856,6 @@ class Session(object): serverintf=server_interface, ) - # tunnels between controlnets will be built with Broker.addnettunnels() - # TODO: potentially remove documentation saying node ids are ints - # TODO: need to move broker code out of the session object - self.broker.addnet(_id) - for server in self.broker.getservers(): - self.broker.addnodemap(server, _id) - return control_net def add_remove_control_interface( diff --git a/daemon/core/location/mobility.py b/daemon/core/location/mobility.py index 2f323783..eae46ce4 100644 --- a/daemon/core/location/mobility.py +++ b/daemon/core/location/mobility.py @@ -19,13 +19,9 @@ from core.emulator.enumerations import ( EventTypes, LinkTypes, MessageFlags, - MessageTypes, - NodeTlvs, RegisterTlvs, ) from core.errors import CoreError -from core.nodes.base import CoreNodeBase -from core.nodes.ipaddress import IpAddress class MobilityManager(ModelManager): @@ -48,11 +44,6 @@ class MobilityManager(ModelManager): self.models[BasicRangeModel.name] = BasicRangeModel self.models[Ns2ScriptedMobility.name] = Ns2ScriptedMobility - # dummy node objects for tracking position of nodes on other servers - self.phys = {} - self.physnets = {} - self.session.broker.handlers.add(self.physnodehandlelink) - def reset(self): """ Clear out all current configurations. @@ -93,9 +84,6 @@ class MobilityManager(ModelManager): model_class = self.models[model_name] self.set_model(node, model_class, config) - if self.session.master: - self.installphysnodes(node) - if node.mobility: self.session.event_loop.add_event(0.0, node.mobility.startup) @@ -209,87 +197,6 @@ class MobilityManager(ModelManager): if node.model: node.model.update(moved, moved_netifs) - def addphys(self, netnum, node): - """ - Keep track of PhysicalNodes and which network they belong to. - - :param int netnum: network number - :param core.coreobj.PyCoreNode node: node to add physical network to - :return: nothing - """ - node_id = node.id - self.phys[node_id] = node - if netnum not in self.physnets: - self.physnets[netnum] = [node_id] - else: - self.physnets[netnum].append(node_id) - - # TODO: remove need for handling old style message - - def physnodehandlelink(self, message): - """ - Broker handler. Snoop Link add messages to get - node numbers of PhyiscalNodes and their nets. - Physical nodes exist only on other servers, but a shadow object is - created here for tracking node position. - - :param message: link message to handle - :return: nothing - """ - if ( - message.message_type == MessageTypes.LINK.value - and message.flags & MessageFlags.ADD.value - ): - nn = message.node_numbers() - # first node is always link layer node in Link add message - if nn[0] not in self.session.broker.network_nodes: - return - if nn[1] in self.session.broker.physical_nodes: - # record the fact that this PhysicalNode is linked to a net - dummy = CoreNodeBase( - session=self.session, _id=nn[1], name="n%d" % nn[1], start=False - ) - self.addphys(nn[0], dummy) - - # TODO: remove need to handling old style messages - def physnodeupdateposition(self, message): - """ - Snoop node messages belonging to physical nodes. The dummy object - in self.phys[] records the node position. - - :param message: message to handle - :return: nothing - """ - nodenum = message.node_numbers()[0] - try: - dummy = self.phys[nodenum] - nodexpos = message.get_tlv(NodeTlvs.X_POSITION.value) - nodeypos = message.get_tlv(NodeTlvs.Y_POSITION.value) - dummy.setposition(nodexpos, nodeypos, None) - except KeyError: - logging.exception("error retrieving physical node: %s", nodenum) - - def installphysnodes(self, net): - """ - After installing a mobility model on a net, include any physical - nodes that we have recorded. Use the GreTap tunnel to the physical node - as the node's interface. - - :param net: network to install - :return: nothing - """ - node_ids = self.physnets.get(net.id, []) - for node_id in node_ids: - node = self.phys[node_id] - # TODO: fix this bad logic, relating to depending on a break to get a valid server - for server in self.session.broker.getserversbynode(node_id): - break - netif = self.session.broker.gettunnel(net.id, IpAddress.to_int(server.host)) - node.addnetif(netif, 0) - netif.node = node - x, y, z = netif.node.position.get() - netif.poshook(netif, x, y, z) - class WirelessModel(ConfigurableOptions): """ diff --git a/daemon/core/nodes/base.py b/daemon/core/nodes/base.py index 7758e4af..a9e8dc43 100644 --- a/daemon/core/nodes/base.py +++ b/daemon/core/nodes/base.py @@ -199,7 +199,9 @@ class NodeBase(object): x, y, _ = self.getposition() model = self.type - emulation_server = self.server.host + emulation_server = None + if self.server is not None: + emulation_server = self.server.host services = self.services if services is not None: @@ -593,7 +595,13 @@ class CoreNode(CoreNodeBase): :param str sh: shell to execute command in :return: str """ - return self.client.create_cmd(sh) + terminal = self.client.create_cmd(sh) + if self.server is None: + return terminal + else: + return "ssh -X -f {host} xterm -e {terminal}".format( + host=self.server.host, terminal=terminal + ) def privatedir(self, path): """ diff --git a/daemon/core/nodes/network.py b/daemon/core/nodes/network.py index 4d68ccaf..f25c800c 100644 --- a/daemon/core/nodes/network.py +++ b/daemon/core/nodes/network.py @@ -288,8 +288,8 @@ class CoreNetwork(CoreNetworkBase): """ logging.info("network node(%s) cmd", self.name) output = utils.check_cmd(args, env, cwd, wait) - for host in self.session.servers: - server = self.session.servers[host] + for name in self.session.servers: + server = self.session.servers[name] server.remote_cmd(args, env, cwd, wait) return output diff --git a/daemon/core/plugins/sdt.py b/daemon/core/plugins/sdt.py index 32800eea..fd674b45 100644 --- a/daemon/core/plugins/sdt.py +++ b/daemon/core/plugins/sdt.py @@ -76,7 +76,7 @@ class Sdt(object): # node information for remote nodes not in session._objs # local nodes also appear here since their obj may not exist yet self.remotes = {} - session.broker.handlers.add(self.handle_distributed) + # session.broker.handlers.add(self.handle_distributed) # add handler for node updates self.session.node_handlers.append(self.handle_node_update) diff --git a/daemon/core/xml/corexmldeployment.py b/daemon/core/xml/corexmldeployment.py index ee316ffc..0a81b75e 100644 --- a/daemon/core/xml/corexmldeployment.py +++ b/daemon/core/xml/corexmldeployment.py @@ -107,10 +107,6 @@ class CoreXmlDeployment(object): def add_deployment(self): physical_host = self.add_physical_host(socket.gethostname()) - # TODO: handle other servers - # servers = self.session.broker.getservernames() - # servers.remove("localhost") - for node_id in self.session.nodes: node = self.session.nodes[node_id] if isinstance(node, CoreNodeBase): diff --git a/daemon/core/xml/emanexml.py b/daemon/core/xml/emanexml.py index 0005c378..881ff373 100644 --- a/daemon/core/xml/emanexml.py +++ b/daemon/core/xml/emanexml.py @@ -314,9 +314,9 @@ def build_transport_xml(emane_manager, node, transport_type): file_name = transport_file_name(node.id, transport_type) file_path = os.path.join(emane_manager.session.session_dir, file_name) create_file(transport_element, doc_name, file_path) - for server in emane_manager.session.servers: - conn = emane_manager.session.servers[server] - create_file(transport_element, doc_name, file_path, conn) + for name in emane_manager.session.servers: + server = emane_manager.session.servers[name] + create_file(transport_element, doc_name, file_path, server) def create_phy_xml(emane_model, config, file_path, server): @@ -342,9 +342,9 @@ def create_phy_xml(emane_model, config, file_path, server): create_file(phy_element, "phy", file_path, server) else: create_file(phy_element, "phy", file_path) - for server in emane_model.session.servers: - conn = emane_model.session.servers[server] - create_file(phy_element, "phy", file_path, conn) + for name in emane_model.session.servers: + server = emane_model.session.servers[name] + create_file(phy_element, "phy", file_path, server) def create_mac_xml(emane_model, config, file_path, server): @@ -372,9 +372,9 @@ def create_mac_xml(emane_model, config, file_path, server): create_file(mac_element, "mac", file_path, server) else: create_file(mac_element, "mac", file_path) - for server in emane_model.session.servers: - conn = emane_model.session.servers[server] - create_file(mac_element, "mac", file_path, conn) + for name in emane_model.session.servers: + server = emane_model.session.servers[name] + create_file(mac_element, "mac", file_path, server) def create_nem_xml( @@ -410,9 +410,9 @@ def create_nem_xml( create_file(nem_element, "nem", nem_file, server) else: create_file(nem_element, "nem", nem_file) - for server in emane_model.session.servers: - conn = emane_model.session.servers[server] - create_file(nem_element, "nem", nem_file, conn) + for name in emane_model.session.servers: + server = emane_model.session.servers[name] + create_file(nem_element, "nem", nem_file, server) def create_event_service_xml(group, port, device, file_directory, server=None): diff --git a/daemon/examples/python/distributed.py b/daemon/examples/python/distributed.py index ca9ca928..8bcf2972 100644 --- a/daemon/examples/python/distributed.py +++ b/daemon/examples/python/distributed.py @@ -8,21 +8,19 @@ from core.emulator.enumerations import EventTypes, NodeTypes def main(): + address = sys.argv[1] + remote = sys.argv[2] + # ip generator for example prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16") # create emulator instance for creating sessions and utility methods - coreemu = CoreEmu() + coreemu = CoreEmu({"controlnet": "172.16.0.0/24", "distributed_address": address}) session = coreemu.create_session() - # set controlnet - session.options.set_config("controlnet", "172.16.0.0/24") - # initialize distributed - address = sys.argv[1] - remote = sys.argv[2] - session.address = address - session.add_distributed(remote) + server_name = "core2" + session.add_distributed(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) @@ -31,7 +29,7 @@ def main(): node_one = session.add_node() switch = session.add_node(_type=NodeTypes.SWITCH) options = NodeOptions() - options.emulation_server = remote + options.emulation_server = server_name node_two = session.add_node(node_options=options) # create node interfaces and link diff --git a/daemon/examples/python/distributed_emane.py b/daemon/examples/python/distributed_emane.py index 1ffe5795..c64d1f0c 100644 --- a/daemon/examples/python/distributed_emane.py +++ b/daemon/examples/python/distributed_emane.py @@ -9,25 +9,25 @@ from core.emulator.enumerations import EventTypes, NodeTypes def main(): + address = sys.argv[1] + remote = sys.argv[2] + # ip generator for example prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16") # create emulator instance for creating sessions and utility methods - coreemu = CoreEmu() + coreemu = CoreEmu( + { + "controlnet": "core1:172.16.1.0/24 core2:172.16.2.0/24 core3:172.16.3.0/24 " + "core4:172.16.4.0/24 core5:172.16.5.0/24", + "distributed_address": address, + } + ) session = coreemu.create_session() - # set controlnet - session.options.set_config( - "controlnet", - "core1:172.16.1.0/24 core2:172.16.2.0/24 core3:172.16.3.0/24 " - "core4:172.16.4.0/24 core5:172.16.5.0/24", - ) - # initialize distributed - address = sys.argv[1] - remote = sys.argv[2] - session.address = address - session.add_distributed(remote) + server_name = "core2" + session.add_distributed(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) @@ -38,7 +38,7 @@ def main(): node_one = session.add_node(node_options=options) emane_net = session.add_node(_type=NodeTypes.EMANE) session.emane.set_model(emane_net, EmaneIeee80211abgModel) - options.emulation_server = remote + options.emulation_server = server_name node_two = session.add_node(node_options=options) # create node interfaces and link diff --git a/daemon/examples/python/distributed_ptp.py b/daemon/examples/python/distributed_ptp.py index 2b611816..b0f27c28 100644 --- a/daemon/examples/python/distributed_ptp.py +++ b/daemon/examples/python/distributed_ptp.py @@ -8,18 +8,19 @@ from core.emulator.enumerations import EventTypes def main(): + address = sys.argv[1] + remote = sys.argv[2] + # ip generator for example prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16") # create emulator instance for creating sessions and utility methods - coreemu = CoreEmu() + coreemu = CoreEmu({"distributed_address": address}) session = coreemu.create_session() # initialize distributed - address = sys.argv[1] - remote = sys.argv[2] - session.address = address - session.add_distributed(remote) + server_name = "core2" + session.add_distributed(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) @@ -27,7 +28,7 @@ def main(): # create local node, switch, and remote nodes options = NodeOptions() node_one = session.add_node(node_options=options) - options.emulation_server = remote + options.emulation_server = server_name node_two = session.add_node(node_options=options) # create node interfaces and link diff --git a/daemon/examples/python/distributed_switches.py b/daemon/examples/python/distributed_switches.py index b7ed166b..bc13bf2c 100644 --- a/daemon/examples/python/distributed_switches.py +++ b/daemon/examples/python/distributed_switches.py @@ -7,15 +7,16 @@ from core.emulator.enumerations import EventTypes, NodeTypes def main(): + address = sys.argv[1] + remote = sys.argv[2] + # create emulator instance for creating sessions and utility methods - coreemu = CoreEmu() + coreemu = CoreEmu({"distributed_address": address}) session = coreemu.create_session() # initialize distributed - address = sys.argv[1] - remote = sys.argv[2] - session.address = address - session.add_distributed(remote) + server_name = "core2" + session.add_distributed(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) diff --git a/daemon/examples/python/distributed_wlan.py b/daemon/examples/python/distributed_wlan.py index ca64ee01..f8af1f5f 100644 --- a/daemon/examples/python/distributed_wlan.py +++ b/daemon/examples/python/distributed_wlan.py @@ -9,21 +9,19 @@ from core.location.mobility import BasicRangeModel def main(): + address = sys.argv[1] + remote = sys.argv[2] + # ip generator for example prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16") # create emulator instance for creating sessions and utility methods - coreemu = CoreEmu() + coreemu = CoreEmu({"distributed_address": address}) session = coreemu.create_session() - # set controlnet - # session.options.set_config("controlnet", "172.16.0.0/24") - # initialize distributed - address = sys.argv[1] - remote = sys.argv[2] - session.address = address - session.add_distributed(remote) + server_name = "core2" + session.add_distributed(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) @@ -31,7 +29,7 @@ def main(): # create local node, switch, and remote nodes options = NodeOptions() options.set_position(0, 0) - options.emulation_server = remote + options.emulation_server = server_name node_one = session.add_node(node_options=options) wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) session.mobility.set_model(wlan, BasicRangeModel) diff --git a/daemon/tests/conftest.py b/daemon/tests/conftest.py index 001233bb..ead3c2b4 100644 --- a/daemon/tests/conftest.py +++ b/daemon/tests/conftest.py @@ -58,7 +58,6 @@ class CoreServerTest(object): self.request_handler = CoreHandler(request_mock, "", self.server) self.request_handler.session = self.session self.request_handler.add_session_handlers() - self.session.broker.session_clients.append(self.request_handler) # have broker handle a configuration state change self.session.set_state(EventTypes.DEFINITION_STATE) diff --git a/daemon/tests/test_gui.py b/daemon/tests/test_gui.py index caff15fe..02e634be 100644 --- a/daemon/tests/test_gui.py +++ b/daemon/tests/test_gui.py @@ -763,13 +763,11 @@ class TestGui: (ConfigTlvs.VALUES, "%s:%s:%s" % (server, host, port)), ], ) - coreserver.session.broker.addserver = mock.MagicMock() - coreserver.session.broker.setupserver = mock.MagicMock() + coreserver.session.add_distributed = mock.MagicMock() coreserver.request_handler.handle_message(message) - coreserver.session.broker.addserver.assert_called_once_with(server, host, port) - coreserver.session.broker.setupserver.assert_called_once_with(server) + coreserver.session.add_distributed.assert_called_once_with(server, host) def test_config_services_request_all(self, coreserver): message = coreapi.CoreConfMessage.create(