From b7b0e4222c5ee8312133a3d332528a9fed0dea2d Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Tue, 8 Oct 2019 15:09:26 -0700 Subject: [PATCH] updates for basic working distrbuted network using fabric --- daemon/core/emulator/distributed.py | 27 ++++++ daemon/core/emulator/session.py | 83 +++++++++++++++++-- daemon/core/nodes/base.py | 75 ++++++++--------- daemon/core/nodes/interface.py | 46 ++++++---- daemon/core/nodes/network.py | 39 +++++++-- daemon/core/utils.py | 2 +- daemon/examples/python/distributed.py | 43 +++++----- .../examples/python/distributed_switches.py | 42 ++++++++++ 8 files changed, 261 insertions(+), 96 deletions(-) create mode 100644 daemon/core/emulator/distributed.py create mode 100644 daemon/examples/python/distributed_switches.py diff --git a/daemon/core/emulator/distributed.py b/daemon/core/emulator/distributed.py new file mode 100644 index 00000000..104d939d --- /dev/null +++ b/daemon/core/emulator/distributed.py @@ -0,0 +1,27 @@ +import logging + +from core.errors import CoreCommandError + + +def remote_cmd(server, cmd, env=None): + """ + Run command remotely using server connection. + + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost + :param str cmd: command to run + :param dict env: environment for remote command, default is None + :return: stdout when success + :rtype: str + :raises CoreCommandError: when a non-zero exit status occurs + """ + logging.info("remote cmd server(%s): %s", server, cmd) + if env is None: + result = server.run(cmd, hide=False) + else: + result = server.run(cmd, hide=False, env=env, replace_env=True) + if result.exited: + raise CoreCommandError( + result.exited, result.command, result.stdout, result.stderr + ) + return result.stdout.strip() diff --git a/daemon/core/emulator/session.py b/daemon/core/emulator/session.py index e0afc53c..9eb02a07 100644 --- a/daemon/core/emulator/session.py +++ b/daemon/core/emulator/session.py @@ -37,9 +37,11 @@ from core.location.event import EventLoop from core.location.mobility import MobilityManager from core.nodes.base import CoreNetworkBase, CoreNode, CoreNodeBase from core.nodes.docker import DockerNode -from core.nodes.ipaddress import MacAddress +from core.nodes.interface import GreTap +from core.nodes.ipaddress import IpAddress, MacAddress from core.nodes.lxd import LxcNode from core.nodes.network import ( + CoreNetwork, CtrlNet, GreTapBridge, HubNode, @@ -148,6 +150,8 @@ class Session(object): # distributed servers self.servers = {} + self.tunnels = {} + self.address = None # initialize default node services self.services.default_services = { @@ -161,19 +165,81 @@ class Session(object): def add_distributed(self, server): conn = Connection(server, user="root") self.servers[server] = conn - - def init_distributed(self): - for server in self.servers: - conn = self.servers[server] - cmd = "mkdir -p %s" % self.session_dir - conn.run(cmd, hide=False) + cmd = "mkdir -p %s" % self.session_dir + conn.run(cmd, hide=False) def shutdown_distributed(self): + # shutdown all tunnels + for key in self.tunnels: + tunnels = self.tunnels[key] + for tunnel in tunnels: + tunnel.shutdown() + + # remove all remote session directories for server in self.servers: conn = self.servers[server] cmd = "rm -rf %s" % self.session_dir conn.run(cmd, hide=False) + # clear tunnels + self.tunnels.clear() + + def initialize_distributed(self): + for node_id in self.nodes: + node = self.nodes[node_id] + + if not isinstance(node, CoreNetwork): + continue + + if isinstance(node, CtrlNet) and node.serverintf is not None: + continue + + for server in self.servers: + conn = self.servers[server] + key = self.tunnelkey(node_id, IpAddress.to_int(server)) + + # local to server + logging.info( + "local tunnel node(%s) to remote(%s) key(%s)", + node.name, + server, + key, + ) + local_tap = GreTap(session=self, remoteip=server, key=key) + local_tap.net_client.create_interface(node.brname, local_tap.localname) + + # server to local + logging.info( + "remote tunnel node(%s) to local(%s) key(%s)", + node.name, + self.address, + key, + ) + remote_tap = GreTap( + session=self, remoteip=self.address, key=key, server=conn + ) + remote_tap.net_client.create_interface( + node.brname, remote_tap.localname + ) + + # save tunnels for shutdown + self.tunnels[key] = [local_tap, remote_tap] + + def tunnelkey(self, n1num, n2num): + """ + Compute a 32-bit key used to uniquely identify a GRE tunnel. + The hash(n1num), hash(n2num) values are used, so node numbers may be + None or string values (used for e.g. "ctrlnet"). + + :param int n1num: node one id + :param int n2num: node two id + :return: tunnel key for the node pair + :rtype: int + """ + logging.debug("creating tunnel key for: %s, %s", n1num, n2num) + key = (self.id << 16) ^ utils.hashkey(n1num) ^ (utils.hashkey(n2num) << 8) + return key & 0xFFFFFFFF + @classmethod def get_node_class(cls, _type): """ @@ -1493,6 +1559,9 @@ class Session(object): self.add_remove_control_interface(node=None, remove=False) self.broker.startup() + # initialize distributed tunnels + self.initialize_distributed() + # instantiate will be invoked again upon Emane configure if self.emane.startup() == self.emane.NOT_READY: return diff --git a/daemon/core/nodes/base.py b/daemon/core/nodes/base.py index 21324c59..82915b38 100644 --- a/daemon/core/nodes/base.py +++ b/daemon/core/nodes/base.py @@ -14,6 +14,7 @@ from socket import AF_INET, AF_INET6 from tempfile import NamedTemporaryFile from core import constants, utils +from core.emulator import distributed from core.emulator.data import LinkData, NodeData from core.emulator.enumerations import LinkTypes, NodeTypes from core.errors import CoreCommandError @@ -95,39 +96,7 @@ class NodeBase(object): :rtype: str :raises CoreCommandError: when a non-zero exit status occurs """ - logging.info("net cmd server(%s): %s", self.server, args) - if self.server is None: - return utils.check_cmd(args, env=env) - else: - args = " ".join(args) - return self.remote_cmd(args, env=env) - - def remote_cmd(self, cmd, env=None): - """ - Run command remotely using server connection. - - :param str cmd: command to run - :param dict env: environment for remote command, default is None - :return: stdout when success - :rtype: str - :raises CoreCommandError: when a non-zero exit status occurs - """ - if env is None: - result = self.server.run(cmd, hide=False) - else: - logging.info("command env: %s", env) - result = self.server.run(cmd, hide=False, env=env, replace_env=True) - if result.exited: - raise CoreCommandError( - result.exited, result.command, result.stdout, result.stderr - ) - - logging.info( - "fabric result:\n\tstdout: %s\n\tstderr: %s", - result.stdout.strip(), - result.stderr.strip(), - ) - return result.stdout.strip() + raise NotImplementedError def setposition(self, x=None, y=None, z=None): """ @@ -279,7 +248,8 @@ class CoreNodeBase(NodeBase): :param int _id: object id :param str name: object name :param bool start: boolean for starting - :param str server: remote server node will run on, default is None for localhost + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost """ super(CoreNodeBase, self).__init__(session, _id, name, start, server) self.services = [] @@ -412,6 +382,23 @@ class CoreNodeBase(NodeBase): return common + def net_cmd(self, args, env=None): + """ + Runs a command that is used to configure and setup the network on the host + system. + + :param list[str]|str args: command to run + :param dict env: environment to run command with + :return: combined stdout and stderr + :rtype: str + :raises CoreCommandError: when a non-zero exit status occurs + """ + if self.server is None: + return utils.check_cmd(args, env=env) + else: + args = " ".join(args) + return distributed.remote_cmd(self.server, args, env=env) + def node_net_cmd(self, args): """ Runs a command that is used to configure and setup the network within a @@ -493,7 +480,8 @@ class CoreNode(CoreNodeBase): :param str nodedir: node directory :param str bootsh: boot shell to use :param bool start: start flag - :param str server: remote server node will run on, default is None for localhost + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost """ super(CoreNode, self).__init__(session, _id, name, start, server) self.nodedir = nodedir @@ -653,13 +641,13 @@ class CoreNode(CoreNodeBase): :rtype: str :raises CoreCommandError: when a non-zero exit status occurs """ - logging.info("net cmd server(%s): %s", self.server, args) if self.server is None: + logging.info("node(%s) cmd: %s", self.name, args) return self.check_cmd(args) else: args = self.client._cmd_args() + args args = " ".join(args) - return self.remote_cmd(args) + return distributed.remote_cmd(self.server, args) def check_cmd(self, args): """ @@ -753,7 +741,11 @@ class CoreNode(CoreNodeBase): raise ValueError("interface name (%s) too long" % name) veth = Veth( - node=self, name=name, localname=localname, net=net, start=self.up + node=self, + name=name, + localname=localname, + start=self.up, + server=self.server, ) if self.up: @@ -806,9 +798,7 @@ class CoreNode(CoreNodeBase): sessionid = self.session.short_session_id() localname = "tap%s.%s.%s" % (self.id, ifindex, sessionid) name = ifname - tuntap = TunTap( - node=self, name=name, localname=localname, net=net, start=self.up - ) + tuntap = TunTap(node=self, name=name, localname=localname, start=self.up) try: self.addnetif(tuntap, ifindex) @@ -1057,7 +1047,8 @@ class CoreNetworkBase(NodeBase): :param int _id: object id :param str name: object name :param bool start: should object start - :param str server: remote server node will run on, default is None for localhost + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost """ super(CoreNetworkBase, self).__init__(session, _id, name, start, server) self._linked = {} diff --git a/daemon/core/nodes/interface.py b/daemon/core/nodes/interface.py index 51859e3a..8b73b1b7 100644 --- a/daemon/core/nodes/interface.py +++ b/daemon/core/nodes/interface.py @@ -7,6 +7,7 @@ import time from builtins import int, range from core import utils +from core.emulator import distributed from core.errors import CoreCommandError from core.nodes.netclient import LinuxNetClient @@ -16,13 +17,15 @@ class CoreInterface(object): Base class for network interfaces. """ - def __init__(self, node, name, mtu): + def __init__(self, node, name, mtu, server=None): """ Creates a PyCoreNetIf instance. :param core.nodes.base.CoreNode node: node for interface :param str name: interface name :param mtu: mtu value + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost """ self.node = node @@ -42,7 +45,15 @@ class CoreInterface(object): self.netindex = None # index used to find flow data self.flow_id = None - self.net_client = LinuxNetClient(utils.check_cmd) + self.server = server + self.net_client = LinuxNetClient(self.net_cmd) + + def net_cmd(self, args): + if self.server is None: + return utils.check_cmd(args) + else: + args = " ".join(args) + return distributed.remote_cmd(self.server, args) def startup(self): """ @@ -191,8 +202,7 @@ class Veth(CoreInterface): Provides virtual ethernet functionality for core nodes. """ - # TODO: network is not used, why was it needed? - def __init__(self, node, name, localname, mtu=1500, net=None, start=True): + def __init__(self, node, name, localname, mtu=1500, server=None, start=True): """ Creates a VEth instance. @@ -200,12 +210,13 @@ class Veth(CoreInterface): :param str name: interface name :param str localname: interface local name :param mtu: interface mtu - :param net: network + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost :param bool start: start flag :raises CoreCommandError: when there is a command exception """ # note that net arg is ignored - CoreInterface.__init__(self, node=node, name=name, mtu=mtu) + CoreInterface.__init__(self, node, name, mtu, server) self.localname = localname self.up = False if start: @@ -251,8 +262,7 @@ class TunTap(CoreInterface): TUN/TAP virtual device in TAP mode """ - # TODO: network is not used, why was it needed? - def __init__(self, node, name, localname, mtu=1500, net=None, start=True): + def __init__(self, node, name, localname, mtu=1500, server=None, start=True): """ Create a TunTap instance. @@ -260,10 +270,11 @@ class TunTap(CoreInterface): :param str name: interface name :param str localname: local interface name :param mtu: interface mtu - :param core.nodes.base.CoreNetworkBase net: related network + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost :param bool start: start flag """ - CoreInterface.__init__(self, node=node, name=name, mtu=mtu) + CoreInterface.__init__(self, node, name, mtu, server) self.localname = localname self.up = False self.transport_type = "virtual" @@ -427,6 +438,7 @@ class GreTap(CoreInterface): ttl=255, key=None, start=True, + server=None, ): """ Creates a GreTap instance. @@ -441,9 +453,11 @@ class GreTap(CoreInterface): :param ttl: ttl value :param key: gre tap key :param bool start: start flag + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost :raises CoreCommandError: when there is a command exception """ - CoreInterface.__init__(self, node=node, name=name, mtu=mtu) + CoreInterface.__init__(self, node, name, mtu, server) self.session = session if _id is None: # from PyCoreObj @@ -460,9 +474,13 @@ class GreTap(CoreInterface): if remoteip is None: raise ValueError("missing remote IP required for GRE TAP device") - self.net_client.create_gretap( - self.localname, str(remoteip), str(localip), str(ttl), str(key) - ) + if localip is not None: + localip = str(localip) + if ttl is not None: + ttl = str(ttl) + if key is not None: + key = str(key) + self.net_client.create_gretap(self.localname, str(remoteip), localip, ttl, key) self.net_client.device_up(self.localname) self.up = True diff --git a/daemon/core/nodes/network.py b/daemon/core/nodes/network.py index 444ded56..3ec84282 100644 --- a/daemon/core/nodes/network.py +++ b/daemon/core/nodes/network.py @@ -10,6 +10,7 @@ import time from socket import AF_INET, AF_INET6 from core import constants, utils +from core.emulator import distributed from core.emulator.data import LinkData from core.emulator.enumerations import LinkTypes, NodeTypes, RegisterTlvs from core.errors import CoreCommandError, CoreError @@ -291,7 +292,8 @@ class CoreNetwork(CoreNetworkBase): :param int _id: object id :param str name: object name :param bool start: start flag - :param str server: remote server node will run on, default is None for localhost + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost :param policy: network policy """ CoreNetworkBase.__init__(self, session, _id, name, start, server) @@ -307,6 +309,27 @@ class CoreNetwork(CoreNetworkBase): self.startup() ebq.startupdateloop(self) + def net_cmd(self, args, env=None): + """ + Runs a command that is used to configure and setup the network on the host + system. + + :param list[str]|str args: command to run + :param dict env: environment to run command with + :return: combined stdout and stderr + :rtype: str + :raises CoreCommandError: when a non-zero exit status occurs + """ + logging.info("network node(%s) cmd", self.name) + output = utils.check_cmd(args, env=env) + + args = " ".join(args) + for server in self.session.servers: + conn = self.session.servers[server] + distributed.remote_cmd(conn, args, env=env) + + return output + def startup(self): """ Linux bridge starup logic. @@ -381,11 +404,11 @@ class CoreNetwork(CoreNetworkBase): """ Attach a network interface. - :param core.netns.vnode.VEth netif: network interface to attach + :param core.nodes.interface.Veth netif: network interface to attach :return: nothing """ if self.up: - self.net_client.create_interface(self.brname, netif.localname) + netif.net_client.create_interface(self.brname, netif.localname) CoreNetworkBase.attach(self, netif) @@ -397,7 +420,7 @@ class CoreNetwork(CoreNetworkBase): :return: nothing """ if self.up: - self.net_client.delete_interface(self.brname, netif.localname) + netif.net_client.delete_interface(self.brname, netif.localname) CoreNetworkBase.detach(self, netif) @@ -591,13 +614,11 @@ class CoreNetwork(CoreNetworkBase): if len(name) >= 16: raise ValueError("interface name %s too long" % name) - netif = Veth( - node=None, name=name, localname=localname, mtu=1500, net=self, start=self.up - ) + netif = Veth(node=None, name=name, localname=localname, mtu=1500, start=self.up) self.attach(netif) if net.up: # this is similar to net.attach() but uses netif.name instead of localname - self.net_client.create_interface(net.brname, netif.name) + netif.net_client.create_interface(net.brname, netif.name) i = net.newifindex() net._netif[i] = netif with net._linked_lock: @@ -666,6 +687,8 @@ class GreTapBridge(CoreNetwork): :param ttl: ttl value :param key: gre tap key :param bool start: start flag + :param fabric.connection.Connection server: remote server node will run on, + default is None for localhost """ CoreNetwork.__init__(self, session, _id, name, False, server, policy) self.grekey = key diff --git a/daemon/core/utils.py b/daemon/core/utils.py index 20d2384e..8e59a050 100644 --- a/daemon/core/utils.py +++ b/daemon/core/utils.py @@ -263,7 +263,7 @@ def check_cmd(args, **kwargs): kwargs["stdout"] = subprocess.PIPE kwargs["stderr"] = subprocess.STDOUT args = split_args(args) - logging.debug("command: %s", args) + logging.info("command: %s", args) try: p = subprocess.Popen(args, **kwargs) stdout, _ = p.communicate() diff --git a/daemon/examples/python/distributed.py b/daemon/examples/python/distributed.py index 5b5174f6..feb5e8bb 100644 --- a/daemon/examples/python/distributed.py +++ b/daemon/examples/python/distributed.py @@ -1,51 +1,46 @@ import logging import pdb +import sys from core.emulator.coreemu import CoreEmu -from core.emulator.emudata import NodeOptions -from core.emulator.enumerations import EventTypes +from core.emulator.emudata import IpPrefixes, NodeOptions +from core.emulator.enumerations import EventTypes, NodeTypes def main(): # ip generator for example - # prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16") + prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16") # create emulator instance for creating sessions and utility methods coreemu = CoreEmu() session = coreemu.create_session() # initialize distributed - session.add_distributed("core2") - session.init_distributed() + address = sys.argv[1] + remote = sys.argv[2] + session.address = address + session.add_distributed(remote) # must be in configuration state for nodes to start, when using "node_add" below session.set_state(EventTypes.CONFIGURATION_STATE) - # create switch network node - # switch = session.add_node(_type=NodeTypes.SWITCH) - - # create nodes + # create local node, switch, and remote nodes + node_one = session.add_node() + switch = session.add_node(_type=NodeTypes.SWITCH) options = NodeOptions() - options.emulation_server = "core2" - session.add_node(node_options=options) - # interface = prefixes.create_interface(node_one) - # session.add_link(node_one.id, switch.id, interface_one=interface) + options.emulation_server = remote + node_two = session.add_node(node_options=options) - session.add_node() - # interface = prefixes.create_interface(node_two) - # session.add_link(node_two.id, switch.id, interface_one=interface) + # create not interfaces and link + interface_one = prefixes.create_interface(node_one) + interface_two = prefixes.create_interface(node_two) + session.add_link(node_one.id, switch.id, interface_one=interface_one) + session.add_link(node_two.id, switch.id, interface_one=interface_two) # instantiate session session.instantiate() - # print("starting iperf server on node: %s" % node_one.name) - # node_one.cmd(["iperf", "-s", "-D"]) - # node_one_address = prefixes.ip4_address(node_one) - # - # print("node %s connecting to %s" % (node_two.name, node_one_address)) - # node_two.client.icmd(["iperf", "-t", "10", "-c", node_one_address]) - # node_one.cmd(["killall", "-9", "iperf"]) - + # pause script for verification pdb.set_trace() # shutdown session diff --git a/daemon/examples/python/distributed_switches.py b/daemon/examples/python/distributed_switches.py new file mode 100644 index 00000000..c6366d5d --- /dev/null +++ b/daemon/examples/python/distributed_switches.py @@ -0,0 +1,42 @@ +import logging +import pdb +import sys + +from core.emulator.coreemu import CoreEmu +from core.emulator.enumerations import EventTypes, NodeTypes + + +def main(): + # create emulator instance for creating sessions and utility methods + coreemu = CoreEmu() + session = coreemu.create_session() + + # initialize distributed + address = sys.argv[1] + remote = sys.argv[2] + session.address = address + session.add_distributed(remote) + + # must be in configuration state for nodes to start, when using "node_add" below + session.set_state(EventTypes.CONFIGURATION_STATE) + + # create local node, switch, and remote nodes + switch_one = session.add_node(_type=NodeTypes.SWITCH) + switch_two = session.add_node(_type=NodeTypes.SWITCH) + + # create not interfaces and link + session.add_link(switch_one.id, switch_two.id) + + # instantiate session + session.instantiate() + + # pause script for verification + pdb.set_trace() + + # shutdown session + coreemu.shutdown() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + main()