diff --git a/daemon/core/emulator/session.py b/daemon/core/emulator/session.py index 7b2a03b9..f5625232 100644 --- a/daemon/core/emulator/session.py +++ b/daemon/core/emulator/session.py @@ -14,6 +14,8 @@ import threading import time from multiprocessing.pool import ThreadPool +from fabric import Connection + from core import constants, utils from core.api.tlv import coreapi from core.api.tlv.broker import CoreBroker @@ -144,6 +146,9 @@ class Session(object): self.emane = EmaneManager(session=self) self.sdt = Sdt(session=self) + # distributed servers + self.servers = set() + # initialize default node services self.services.default_services = { "mdr": ("zebra", "OSPFv3MDR", "IPForward"), @@ -153,6 +158,11 @@ class Session(object): "host": ("DefaultRoute", "SSH"), } + def init_distributed(self): + for server in self.servers: + cmd = "mkdir -p %s" % self.session_dir + Connection(server, user="root").run(cmd, hide=False) + @classmethod def get_node_class(cls, _type): """ @@ -683,7 +693,13 @@ class Session(object): image=node_options.image, ) else: - node = self.create_node(cls=node_class, _id=_id, name=name, start=start) + node = self.create_node( + cls=node_class, + _id=_id, + name=name, + start=start, + server=node_options.emulation_server, + ) # set node attributes node.icon = node_options.icon diff --git a/daemon/core/nodes/base.py b/daemon/core/nodes/base.py index 7efad49e..b08c7cd4 100644 --- a/daemon/core/nodes/base.py +++ b/daemon/core/nodes/base.py @@ -14,13 +14,15 @@ import threading from builtins import range from socket import AF_INET, AF_INET6 +from fabric import Connection + from core import constants, utils from core.emulator.data import LinkData, NodeData from core.emulator.enumerations import LinkTypes, NodeTypes +from core.errors import CoreCommandError from core.nodes import client, ipaddress from core.nodes.interface import CoreInterface, TunTap, Veth from core.nodes.netclient import LinuxNetClient, OvsNetClient -from fabric import Connection _DEFAULT_MTU = 1500 @@ -33,7 +35,7 @@ class NodeBase(object): apitype = None # TODO: appears start has no usage, verify and remove - def __init__(self, session, _id=None, name=None, start=True): + def __init__(self, session, _id=None, name=None, start=True, server=None): """ Creates a PyCoreObj instance. @@ -41,7 +43,7 @@ class NodeBase(object): :param int _id: id :param str name: object name :param bool start: start value - :return: + :param str server: remote server node will run on, default is None for localhost """ self.session = session @@ -51,8 +53,11 @@ class NodeBase(object): if name is None: name = "o%s" % self.id self.name = name + self.server = server + if self.server is not None: + self.server_conn = Connection(self.server, user="root") + self.type = None - self.server = None self.services = None # ifindex is key, CoreInterface instance is value self._netif = {} @@ -94,12 +99,23 @@ class NodeBase(object): :rtype: str :raises CoreCommandError: when a non-zero exit status occurs """ + logging.info("net cmd server(%s): %s", self.server, args) if self.server is None: return utils.check_cmd(args, env=env) else: args = " ".join(args) - result = Connection(self.server, user="root").run(args, hide=True) - return result.stderr + result = self.server_conn.run(args, hide=False) + if result.exited: + raise CoreCommandError( + result.exited, result.command, result.stdout, result.stderr + ) + + logging.info( + "fabric result:\n\tstdout: %s\n\tstderr: %s", + result.stdout.strip(), + result.stderr.strip(), + ) + return result.stdout.strip() def setposition(self, x=None, y=None, z=None): """ @@ -243,7 +259,7 @@ class CoreNodeBase(NodeBase): Base class for CORE nodes. """ - def __init__(self, session, _id=None, name=None, start=True): + def __init__(self, session, _id=None, name=None, start=True, server=None): """ Create a CoreNodeBase instance. @@ -251,8 +267,9 @@ class CoreNodeBase(NodeBase): :param int _id: object id :param str name: object name :param bool start: boolean for starting + :param str server: remote server node will run on, default is None for localhost """ - super(CoreNodeBase, self).__init__(session, _id, name, start=start) + super(CoreNodeBase, self).__init__(session, _id, name, start, server) self.services = [] self.nodedir = None self.tmpnodedir = False @@ -265,7 +282,7 @@ class CoreNodeBase(NodeBase): """ if self.nodedir is None: self.nodedir = os.path.join(self.session.session_dir, self.name + ".conf") - os.makedirs(self.nodedir) + self.net_cmd(["mkdir", "-p", self.nodedir]) self.tmpnodedir = True else: self.tmpnodedir = False @@ -446,7 +463,14 @@ class CoreNode(CoreNodeBase): valid_address_types = {"inet", "inet6", "inet6link"} def __init__( - self, session, _id=None, name=None, nodedir=None, bootsh="boot.sh", start=True + self, + session, + _id=None, + name=None, + nodedir=None, + bootsh="boot.sh", + start=True, + server=None, ): """ Create a CoreNode instance. @@ -457,8 +481,9 @@ class CoreNode(CoreNodeBase): :param str nodedir: node directory :param str bootsh: boot shell to use :param bool start: start flag + :param str server: remote server node will run on, default is None for localhost """ - super(CoreNode, self).__init__(session, _id, name, start) + super(CoreNode, self).__init__(session, _id, name, start, server) self.nodedir = nodedir self.ctrlchnlname = os.path.abspath( os.path.join(self.session.session_dir, self.name) @@ -619,7 +644,24 @@ class CoreNode(CoreNodeBase): :rtype: str :raises CoreCommandError: when a non-zero exit status occurs """ - return self.check_cmd(args) + logging.info("net cmd server(%s): %s", self.server, args) + if self.server is None: + return self.check_cmd(args) + else: + args = self.client._cmd_args() + args + args = " ".join(args) + result = self.server_conn.run(args, hide=False) + if result.exited: + raise CoreCommandError( + result.exited, result.command, result.stdout, result.stderr + ) + + logging.info( + "fabric result:\n\tstdout: %s\n\tstderr: %s", + result.stdout.strip(), + result.stderr.strip(), + ) + return result.stdout.strip() def check_cmd(self, args): """ @@ -653,7 +695,7 @@ class CoreNode(CoreNodeBase): hostpath = os.path.join( self.nodedir, os.path.normpath(path).strip("/").replace("/", ".") ) - os.mkdir(hostpath) + self.net_cmd(["mkdir", "-p", hostpath]) self.mount(hostpath, path) def mount(self, source, target): diff --git a/daemon/examples/python/distributed.py b/daemon/examples/python/distributed.py new file mode 100644 index 00000000..bed75a47 --- /dev/null +++ b/daemon/examples/python/distributed.py @@ -0,0 +1,55 @@ +import logging + +from core.emulator.coreemu import CoreEmu +from core.emulator.emudata import NodeOptions +from core.emulator.enumerations import EventTypes + + +def main(): + # ip generator for example + # prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16") + + # create emulator instance for creating sessions and utility methods + coreemu = CoreEmu() + session = coreemu.create_session() + + # initialize distributed + session.servers.add("core2") + session.init_distributed() + + # must be in configuration state for nodes to start, when using "node_add" below + session.set_state(EventTypes.CONFIGURATION_STATE) + + # create switch network node + # switch = session.add_node(_type=NodeTypes.SWITCH) + + # create nodes + options = NodeOptions() + options.emulation_server = "10.10.4.38" + options.emulation_server = "core2" + session.add_node(node_options=options) + # interface = prefixes.create_interface(node_one) + # session.add_link(node_one.id, switch.id, interface_one=interface) + + # node_two = session.add_node() + # interface = prefixes.create_interface(node_two) + # session.add_link(node_two.id, switch.id, interface_one=interface) + + # instantiate session + session.instantiate() + + # print("starting iperf server on node: %s" % node_one.name) + # node_one.cmd(["iperf", "-s", "-D"]) + # node_one_address = prefixes.ip4_address(node_one) + # + # print("node %s connecting to %s" % (node_two.name, node_one_address)) + # node_two.client.icmd(["iperf", "-t", "10", "-c", node_one_address]) + # node_one.cmd(["killall", "-9", "iperf"]) + + # shutdown session + coreemu.shutdown() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + main()