diff --git a/daemon/core/api/tlv/corehandlers.py b/daemon/core/api/tlv/corehandlers.py index 60ddfcca..8f995920 100644 --- a/daemon/core/api/tlv/corehandlers.py +++ b/daemon/core/api/tlv/corehandlers.py @@ -1192,9 +1192,9 @@ class CoreHandler(socketserver.BaseRequestHandler): for server in server_list: server_items = server.split(":") name, host, _ = server_items[:3] - self.session.add_distributed(name, host) + self.session.distributed.add_server(name, host) elif message_type == ConfigFlags.RESET: - self.session.shutdown_distributed() + self.session.distributed.shutdown() def handle_config_services(self, message_type, config_data): replies = [] diff --git a/daemon/core/emane/emanemanager.py b/daemon/core/emane/emanemanager.py index e4208189..91553b5a 100644 --- a/daemon/core/emane/emanemanager.py +++ b/daemon/core/emane/emanemanager.py @@ -142,9 +142,7 @@ class EmaneManager(ModelManager): args = "emane --version" emane_version = utils.check_cmd(args) logging.info("using EMANE: %s", emane_version) - for name in self.session.servers: - server = self.session.servers[name] - server.remote_cmd(args) + self.session.distributed.execute(lambda x: x.remote_cmd(args)) # load default emane models self.load_models(EMANE_MODELS) @@ -518,11 +516,11 @@ class EmaneManager(ModelManager): dev = self.get_config("eventservicedevice") emanexml.create_event_service_xml(group, port, dev, self.session.session_dir) - for name in self.session.servers: - conn = self.session.servers[name] - emanexml.create_event_service_xml( - group, port, dev, self.session.session_dir, conn + self.session.distributed.execute( + lambda x: emanexml.create_event_service_xml( + group, port, dev, self.session.session_dir, x ) + ) def startdaemons(self): """ @@ -598,9 +596,7 @@ class EmaneManager(ModelManager): emanecmd += " -f %s" % os.path.join(path, "emane.log") emanecmd += " %s" % os.path.join(path, "platform.xml") utils.check_cmd(emanecmd, cwd=path) - for name in self.session.servers: - server = self.session.servers[name] - server.remote_cmd(emanecmd, cwd=path) + self.session.distributed.execute(lambda x: x.remote_cmd(emanecmd, cwd=path)) logging.info("host emane daemon running: %s", emanecmd) def stopdaemons(self): @@ -625,10 +621,8 @@ class EmaneManager(ModelManager): try: utils.check_cmd(kill_emaned) utils.check_cmd(kill_transortd) - for name in self.session.servers: - server = self.session[name] - server.remote_cmd(kill_emaned) - server.remote_cmd(kill_transortd) + self.session.distributed.execute(lambda x: x.remote_cmd(kill_emaned)) + self.session.distributed.execute(lambda x: x.remote_cmd(kill_transortd)) except CoreCommandError: logging.exception("error shutting down emane daemons") diff --git a/daemon/core/emulator/distributed.py b/daemon/core/emulator/distributed.py index 2df33541..c6218441 100644 --- a/daemon/core/emulator/distributed.py +++ b/daemon/core/emulator/distributed.py @@ -5,12 +5,17 @@ Defines distributed server functionality. import logging import os import threading +from collections import OrderedDict from tempfile import NamedTemporaryFile from fabric import Connection from invoke import UnexpectedExit +from core import utils from core.errors import CoreCommandError +from core.nodes.interface import GreTap +from core.nodes.ipaddress import IpAddress +from core.nodes.network import CoreNetwork, CtrlNet LOCK = threading.Lock() @@ -93,3 +98,150 @@ class DistributedServer(object): temp.close() self.conn.put(temp.name, destination) os.unlink(temp.name) + + +class DistributedController(object): + def __init__(self, session): + """ + Create + + :param session: + """ + self.session = session + self.servers = OrderedDict() + self.tunnels = {} + self.address = self.session.options.get_config( + "distributed_address", default=None + ) + + def add_server(self, name, host): + """ + Add distributed server configuration. + + :param str name: distributed server name + :param str host: distributed server host address + :return: nothing + """ + server = DistributedServer(name, host) + self.servers[name] = server + cmd = "mkdir -p %s" % self.session.session_dir + server.remote_cmd(cmd) + + def execute(self, func): + """ + Convenience for executing logic against all distributed servers. + + :param func: function to run, that takes a DistributedServer as a parameter + :return: nothing + """ + for name in self.servers: + server = self.servers[name] + func(server) + + def shutdown(self): + """ + Shutdown logic for dealing with distributed tunnels and server session + directories. + + :return: nothing + """ + # shutdown all tunnels + for key in self.tunnels: + tunnels = self.tunnels[key] + for tunnel in tunnels: + tunnel.shutdown() + + # remove all remote session directories + for name in self.servers: + server = self.servers[name] + cmd = "rm -rf %s" % self.session.session_dir + server.remote_cmd(cmd) + + # clear tunnels + self.tunnels.clear() + + def start(self): + """ + Start distributed network tunnels. + + :return: nothing + """ + for node_id in self.session.nodes: + node = self.session.nodes[node_id] + + if not isinstance(node, CoreNetwork): + continue + + if isinstance(node, CtrlNet) and node.serverintf is not None: + continue + + for name in self.servers: + server = self.servers[name] + self.create_gre_tunnel(node, server) + + def create_gre_tunnel(self, node, server): + """ + Create gre tunnel using a pair of gre taps between the local and remote server. + + + :param core.nodes.network.CoreNetwork node: node to create gre tunnel for + :param core.emulator.distributed.DistributedServer server: server to create + tunnel for + :return: local and remote gre taps created for tunnel + :rtype: tuple + """ + host = server.host + key = self.tunnel_key(node.id, IpAddress.to_int(host)) + tunnel = self.tunnels.get(key) + if tunnel is not None: + return tunnel + + # local to server + logging.info( + "local tunnel node(%s) to remote(%s) key(%s)", node.name, host, key + ) + local_tap = GreTap(session=self.session, remoteip=host, key=key) + local_tap.net_client.create_interface(node.brname, local_tap.localname) + + # server to local + logging.info( + "remote tunnel node(%s) to local(%s) key(%s)", node.name, self.address, key + ) + remote_tap = GreTap( + session=self.session, remoteip=self.address, key=key, server=server + ) + remote_tap.net_client.create_interface(node.brname, remote_tap.localname) + + # save tunnels for shutdown + tunnel = (local_tap, remote_tap) + self.tunnels[key] = tunnel + return tunnel + + def tunnel_key(self, n1_id, n2_id): + """ + Compute a 32-bit key used to uniquely identify a GRE tunnel. + The hash(n1num), hash(n2num) values are used, so node numbers may be + None or string values (used for e.g. "ctrlnet"). + + :param int n1_id: node one id + :param int n2_id: node two id + :return: tunnel key for the node pair + :rtype: int + """ + logging.debug("creating tunnel key for: %s, %s", n1_id, n2_id) + key = ( + (self.session.id << 16) ^ utils.hashkey(n1_id) ^ (utils.hashkey(n2_id) << 8) + ) + return key & 0xFFFFFFFF + + def get_tunnel(self, n1_id, n2_id): + """ + Return the GreTap between two nodes if it exists. + + :param int n1_id: node one id + :param int n2_id: node two id + :return: gre tap between nodes or None + """ + key = self.tunnel_key(n1_id, n2_id) + logging.debug("checking for tunnel key(%s) in: %s", key, self.tunnels) + return self.tunnels.get(key) diff --git a/daemon/core/emulator/session.py b/daemon/core/emulator/session.py index fe371c44..d962da28 100644 --- a/daemon/core/emulator/session.py +++ b/daemon/core/emulator/session.py @@ -18,7 +18,7 @@ from core import constants, utils from core.emane.emanemanager import EmaneManager from core.emane.nodes import EmaneNet from core.emulator.data import EventData, ExceptionData, NodeData -from core.emulator.distributed import DistributedServer +from core.emulator.distributed import DistributedController from core.emulator.emudata import ( IdGen, LinkOptions, @@ -34,11 +34,9 @@ from core.location.event import EventLoop from core.location.mobility import MobilityManager from core.nodes.base import CoreNetworkBase, CoreNode, CoreNodeBase from core.nodes.docker import DockerNode -from core.nodes.interface import GreTap -from core.nodes.ipaddress import IpAddress, MacAddress +from core.nodes.ipaddress import MacAddress from core.nodes.lxd import LxcNode from core.nodes.network import ( - CoreNetwork, CtrlNet, GreTapBridge, HubNode, @@ -137,10 +135,8 @@ class Session(object): self.options.set_config(key, value) self.metadata = SessionMetaData() - # distributed servers - self.servers = {} - self.tunnels = {} - self.address = self.options.get_config("distributed_address", default=None) + # distributed support and logic + self.distributed = DistributedController(self) # initialize session feature helpers self.location = CoreLocation() @@ -158,123 +154,6 @@ class Session(object): "host": ("DefaultRoute", "SSH"), } - def add_distributed(self, name, host): - """ - Add distributed server configuration. - - :param str name: distributed server name - :param str host: distributed server host address - :return: nothing - """ - server = DistributedServer(name, host) - self.servers[name] = server - cmd = "mkdir -p %s" % self.session_dir - server.remote_cmd(cmd) - - def shutdown_distributed(self): - """ - Shutdown logic for dealing with distributed tunnels and server session - directories. - - :return: nothing - """ - # shutdown all tunnels - for key in self.tunnels: - tunnels = self.tunnels[key] - for tunnel in tunnels: - tunnel.shutdown() - - # remove all remote session directories - for name in self.servers: - server = self.servers[name] - cmd = "rm -rf %s" % self.session_dir - server.remote_cmd(cmd) - - # clear tunnels - self.tunnels.clear() - - def start_distributed(self): - """ - Start distributed network tunnels. - - :return: nothing - """ - for node_id in self.nodes: - node = self.nodes[node_id] - - if not isinstance(node, CoreNetwork): - continue - - if isinstance(node, CtrlNet) and node.serverintf is not None: - continue - - for name in self.servers: - server = self.servers[name] - self.create_gre_tunnel(node, server) - - def create_gre_tunnel(self, node, server): - """ - Create gre tunnel using a pair of gre taps between the local and remote server. - - - :param core.nodes.network.CoreNetwork node: node to create gre tunnel for - :param core.emulator.distributed.DistributedServer server: server to create - tunnel for - :return: local and remote gre taps created for tunnel - :rtype: tuple - """ - host = server.host - key = self.tunnelkey(node.id, IpAddress.to_int(host)) - tunnel = self.tunnels.get(key) - if tunnel is not None: - return tunnel - - # local to server - logging.info( - "local tunnel node(%s) to remote(%s) key(%s)", node.name, host, key - ) - local_tap = GreTap(session=self, remoteip=host, key=key) - local_tap.net_client.create_interface(node.brname, local_tap.localname) - - # server to local - logging.info( - "remote tunnel node(%s) to local(%s) key(%s)", node.name, self.address, key - ) - remote_tap = GreTap(session=self, remoteip=self.address, key=key, server=server) - remote_tap.net_client.create_interface(node.brname, remote_tap.localname) - - # save tunnels for shutdown - tunnel = (local_tap, remote_tap) - self.tunnels[key] = tunnel - return tunnel - - def tunnelkey(self, n1_id, n2_id): - """ - Compute a 32-bit key used to uniquely identify a GRE tunnel. - The hash(n1num), hash(n2num) values are used, so node numbers may be - None or string values (used for e.g. "ctrlnet"). - - :param int n1_id: node one id - :param int n2_id: node two id - :return: tunnel key for the node pair - :rtype: int - """ - logging.debug("creating tunnel key for: %s, %s", n1_id, n2_id) - key = (self.id << 16) ^ utils.hashkey(n1_id) ^ (utils.hashkey(n2_id) << 8) - return key & 0xFFFFFFFF - - def gettunnel(self, n1_id, n2_id): - """ - Return the GreTap between two nodes if it exists. - - :param int n1_id: node one id - :param int n2_id: node two id - :return: gre tap between nodes or None - """ - key = self.tunnelkey(n1_id, n2_id) - logging.debug("checking for tunnel key(%s) in: %s", key, self.tunnels) - return self.tunnels.get(key) - @classmethod def get_node_class(cls, _type): """ @@ -324,7 +203,7 @@ class Session(object): node_two = self.get_node(node_two_id) # both node ids are provided - tunnel = self.gettunnel(node_one_id, node_two_id) + tunnel = self.distributed.get_tunnel(node_one_id, node_two_id) logging.debug("tunnel between nodes: %s", tunnel) if isinstance(tunnel, GreTapBridge): net_one = tunnel @@ -789,7 +668,7 @@ class Session(object): name = "%s%s" % (node_class.__name__, _id) # verify distributed server - server = self.servers.get(node_options.emulation_server) + server = self.distributed.servers.get(node_options.emulation_server) if node_options.emulation_server is not None and server is None: raise CoreError( "invalid distributed server: %s" % node_options.emulation_server @@ -1003,7 +882,7 @@ class Session(object): :return: nothing """ self.delete_nodes() - self.shutdown_distributed() + self.distributed.shutdown() self.del_hooks() self.emane.reset() @@ -1082,7 +961,7 @@ class Session(object): # remove and shutdown all nodes and tunnels self.delete_nodes() - self.shutdown_distributed() + self.distributed.shutdown() # remove this sessions working directory preserve = self.options.get_config("preservedir") == "1" @@ -1594,7 +1473,7 @@ class Session(object): self.add_remove_control_interface(node=None, remove=False) # initialize distributed tunnels - self.start_distributed() + self.distributed.start() # instantiate will be invoked again upon Emane configure if self.emane.startup() == self.emane.NOT_READY: diff --git a/daemon/core/nodes/network.py b/daemon/core/nodes/network.py index 931622bb..98bec198 100644 --- a/daemon/core/nodes/network.py +++ b/daemon/core/nodes/network.py @@ -289,9 +289,7 @@ class CoreNetwork(CoreNetworkBase): """ logging.info("network node(%s) cmd", self.name) output = utils.check_cmd(args, env, cwd, wait) - for name in self.session.servers: - server = self.session.servers[name] - server.remote_cmd(args, env, cwd, wait) + self.session.distributed.execute(lambda x: x.remote_cmd(args, env, cwd, wait)) return output def startup(self): @@ -778,8 +776,9 @@ class CtrlNet(CoreNetwork): current = "%s/%s" % (address, self.prefix.prefixlen) net_client = get_net_client(use_ovs, utils.check_cmd) net_client.create_address(self.brname, current) - for name in self.session.servers: - server = self.session.servers[name] + servers = self.session.distributed.servers + for name in servers: + server = servers[name] address -= 1 current = "%s/%s" % (address, self.prefix.prefixlen) net_client = get_net_client(use_ovs, server.remote_cmd) diff --git a/daemon/core/nodes/physical.py b/daemon/core/nodes/physical.py index 37a2eb54..0f9e0217 100644 --- a/daemon/core/nodes/physical.py +++ b/daemon/core/nodes/physical.py @@ -166,7 +166,7 @@ class PhysicalNode(CoreNodeBase): if self.up: # this is reached when this node is linked to a network node # tunnel to net not built yet, so build it now and adopt it - _, remote_tap = self.session.create_gre_tunnel(net, self.server) + _, remote_tap = self.session.distributed.create_gre_tunnel(net, self.server) self.adoptnetif(remote_tap, ifindex, hwaddr, addrlist) return ifindex else: diff --git a/daemon/core/xml/emanexml.py b/daemon/core/xml/emanexml.py index 881ff373..41319ea4 100644 --- a/daemon/core/xml/emanexml.py +++ b/daemon/core/xml/emanexml.py @@ -314,9 +314,9 @@ def build_transport_xml(emane_manager, node, transport_type): file_name = transport_file_name(node.id, transport_type) file_path = os.path.join(emane_manager.session.session_dir, file_name) create_file(transport_element, doc_name, file_path) - for name in emane_manager.session.servers: - server = emane_manager.session.servers[name] - create_file(transport_element, doc_name, file_path, server) + emane_manager.session.distributed.execute( + lambda x: create_file(transport_element, doc_name, file_path, x) + ) def create_phy_xml(emane_model, config, file_path, server): @@ -342,9 +342,9 @@ def create_phy_xml(emane_model, config, file_path, server): create_file(phy_element, "phy", file_path, server) else: create_file(phy_element, "phy", file_path) - for name in emane_model.session.servers: - server = emane_model.session.servers[name] - create_file(phy_element, "phy", file_path, server) + emane_model.session.distributed.execute( + lambda x: create_file(phy_element, "phy", file_path, x) + ) def create_mac_xml(emane_model, config, file_path, server): @@ -372,9 +372,9 @@ def create_mac_xml(emane_model, config, file_path, server): create_file(mac_element, "mac", file_path, server) else: create_file(mac_element, "mac", file_path) - for name in emane_model.session.servers: - server = emane_model.session.servers[name] - create_file(mac_element, "mac", file_path, server) + emane_model.session.distributed.execute( + lambda x: create_file(mac_element, "mac", file_path, x) + ) def create_nem_xml( @@ -410,9 +410,9 @@ def create_nem_xml( create_file(nem_element, "nem", nem_file, server) else: create_file(nem_element, "nem", nem_file) - for name in emane_model.session.servers: - server = emane_model.session.servers[name] - create_file(nem_element, "nem", nem_file, server) + emane_model.session.distributed.execute( + lambda x: create_file(nem_element, "nem", nem_file, x) + ) def create_event_service_xml(group, port, device, file_directory, server=None): diff --git a/daemon/examples/python/distributed.py b/daemon/examples/python/distributed.py index 8bcf2972..8eb23b2c 100644 --- a/daemon/examples/python/distributed.py +++ b/daemon/examples/python/distributed.py @@ -20,7 +20,7 @@ def main(): # initialize distributed server_name = "core2" - session.add_distributed(server_name, remote) + session.distributed.add_server(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) diff --git a/daemon/examples/python/distributed_emane.py b/daemon/examples/python/distributed_emane.py index c64d1f0c..4ef50ccb 100644 --- a/daemon/examples/python/distributed_emane.py +++ b/daemon/examples/python/distributed_emane.py @@ -27,7 +27,7 @@ def main(): # initialize distributed server_name = "core2" - session.add_distributed(server_name, remote) + session.distributed.add_server(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) diff --git a/daemon/examples/python/distributed_lxd.py b/daemon/examples/python/distributed_lxd.py index 8bafeb7a..130942ea 100644 --- a/daemon/examples/python/distributed_lxd.py +++ b/daemon/examples/python/distributed_lxd.py @@ -20,7 +20,7 @@ def main(): # initialize distributed server_name = "core2" - session.add_distributed(server_name, remote) + session.distributed.add_server(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) diff --git a/daemon/examples/python/distributed_ptp.py b/daemon/examples/python/distributed_ptp.py index b0f27c28..62e7df64 100644 --- a/daemon/examples/python/distributed_ptp.py +++ b/daemon/examples/python/distributed_ptp.py @@ -20,7 +20,7 @@ def main(): # initialize distributed server_name = "core2" - session.add_distributed(server_name, remote) + session.distributed.add_server(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) diff --git a/daemon/examples/python/distributed_switches.py b/daemon/examples/python/distributed_switches.py index bc13bf2c..f9b69757 100644 --- a/daemon/examples/python/distributed_switches.py +++ b/daemon/examples/python/distributed_switches.py @@ -16,7 +16,7 @@ def main(): # initialize distributed server_name = "core2" - session.add_distributed(server_name, remote) + session.distributed.add_server(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) diff --git a/daemon/examples/python/distributed_wlan.py b/daemon/examples/python/distributed_wlan.py index f8af1f5f..10f25aa8 100644 --- a/daemon/examples/python/distributed_wlan.py +++ b/daemon/examples/python/distributed_wlan.py @@ -21,7 +21,7 @@ def main(): # initialize distributed server_name = "core2" - session.add_distributed(server_name, remote) + session.distributed.add_server(server_name, remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) diff --git a/daemon/tests/test_gui.py b/daemon/tests/test_gui.py index 02e634be..c07e2bd3 100644 --- a/daemon/tests/test_gui.py +++ b/daemon/tests/test_gui.py @@ -763,11 +763,11 @@ class TestGui: (ConfigTlvs.VALUES, "%s:%s:%s" % (server, host, port)), ], ) - coreserver.session.add_distributed = mock.MagicMock() + coreserver.session.distributed.add_server = mock.MagicMock() coreserver.request_handler.handle_message(message) - coreserver.session.add_distributed.assert_called_once_with(server, host) + coreserver.session.distributed.add_server.assert_called_once_with(server, host) def test_config_services_request_all(self, coreserver): message = coreapi.CoreConfMessage.create(