updates for basic working distrbuted network using fabric

This commit is contained in:
Blake Harnden 2019-10-08 15:09:26 -07:00
parent 212fec916b
commit b7b0e4222c
8 changed files with 261 additions and 96 deletions

View file

@ -0,0 +1,27 @@
import logging
from core.errors import CoreCommandError
def remote_cmd(server, cmd, env=None):
"""
Run command remotely using server connection.
:param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
:param str cmd: command to run
:param dict env: environment for remote command, default is None
:return: stdout when success
:rtype: str
:raises CoreCommandError: when a non-zero exit status occurs
"""
logging.info("remote cmd server(%s): %s", server, cmd)
if env is None:
result = server.run(cmd, hide=False)
else:
result = server.run(cmd, hide=False, env=env, replace_env=True)
if result.exited:
raise CoreCommandError(
result.exited, result.command, result.stdout, result.stderr
)
return result.stdout.strip()

View file

@ -37,9 +37,11 @@ from core.location.event import EventLoop
from core.location.mobility import MobilityManager from core.location.mobility import MobilityManager
from core.nodes.base import CoreNetworkBase, CoreNode, CoreNodeBase from core.nodes.base import CoreNetworkBase, CoreNode, CoreNodeBase
from core.nodes.docker import DockerNode from core.nodes.docker import DockerNode
from core.nodes.ipaddress import MacAddress from core.nodes.interface import GreTap
from core.nodes.ipaddress import IpAddress, MacAddress
from core.nodes.lxd import LxcNode from core.nodes.lxd import LxcNode
from core.nodes.network import ( from core.nodes.network import (
CoreNetwork,
CtrlNet, CtrlNet,
GreTapBridge, GreTapBridge,
HubNode, HubNode,
@ -148,6 +150,8 @@ class Session(object):
# distributed servers # distributed servers
self.servers = {} self.servers = {}
self.tunnels = {}
self.address = None
# initialize default node services # initialize default node services
self.services.default_services = { self.services.default_services = {
@ -161,19 +165,81 @@ class Session(object):
def add_distributed(self, server): def add_distributed(self, server):
conn = Connection(server, user="root") conn = Connection(server, user="root")
self.servers[server] = conn self.servers[server] = conn
cmd = "mkdir -p %s" % self.session_dir
def init_distributed(self): conn.run(cmd, hide=False)
for server in self.servers:
conn = self.servers[server]
cmd = "mkdir -p %s" % self.session_dir
conn.run(cmd, hide=False)
def shutdown_distributed(self): def shutdown_distributed(self):
# shutdown all tunnels
for key in self.tunnels:
tunnels = self.tunnels[key]
for tunnel in tunnels:
tunnel.shutdown()
# remove all remote session directories
for server in self.servers: for server in self.servers:
conn = self.servers[server] conn = self.servers[server]
cmd = "rm -rf %s" % self.session_dir cmd = "rm -rf %s" % self.session_dir
conn.run(cmd, hide=False) conn.run(cmd, hide=False)
# clear tunnels
self.tunnels.clear()
def initialize_distributed(self):
for node_id in self.nodes:
node = self.nodes[node_id]
if not isinstance(node, CoreNetwork):
continue
if isinstance(node, CtrlNet) and node.serverintf is not None:
continue
for server in self.servers:
conn = self.servers[server]
key = self.tunnelkey(node_id, IpAddress.to_int(server))
# local to server
logging.info(
"local tunnel node(%s) to remote(%s) key(%s)",
node.name,
server,
key,
)
local_tap = GreTap(session=self, remoteip=server, key=key)
local_tap.net_client.create_interface(node.brname, local_tap.localname)
# server to local
logging.info(
"remote tunnel node(%s) to local(%s) key(%s)",
node.name,
self.address,
key,
)
remote_tap = GreTap(
session=self, remoteip=self.address, key=key, server=conn
)
remote_tap.net_client.create_interface(
node.brname, remote_tap.localname
)
# save tunnels for shutdown
self.tunnels[key] = [local_tap, remote_tap]
def tunnelkey(self, n1num, n2num):
"""
Compute a 32-bit key used to uniquely identify a GRE tunnel.
The hash(n1num), hash(n2num) values are used, so node numbers may be
None or string values (used for e.g. "ctrlnet").
:param int n1num: node one id
:param int n2num: node two id
:return: tunnel key for the node pair
:rtype: int
"""
logging.debug("creating tunnel key for: %s, %s", n1num, n2num)
key = (self.id << 16) ^ utils.hashkey(n1num) ^ (utils.hashkey(n2num) << 8)
return key & 0xFFFFFFFF
@classmethod @classmethod
def get_node_class(cls, _type): def get_node_class(cls, _type):
""" """
@ -1493,6 +1559,9 @@ class Session(object):
self.add_remove_control_interface(node=None, remove=False) self.add_remove_control_interface(node=None, remove=False)
self.broker.startup() self.broker.startup()
# initialize distributed tunnels
self.initialize_distributed()
# instantiate will be invoked again upon Emane configure # instantiate will be invoked again upon Emane configure
if self.emane.startup() == self.emane.NOT_READY: if self.emane.startup() == self.emane.NOT_READY:
return return

View file

@ -14,6 +14,7 @@ from socket import AF_INET, AF_INET6
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from core import constants, utils from core import constants, utils
from core.emulator import distributed
from core.emulator.data import LinkData, NodeData from core.emulator.data import LinkData, NodeData
from core.emulator.enumerations import LinkTypes, NodeTypes from core.emulator.enumerations import LinkTypes, NodeTypes
from core.errors import CoreCommandError from core.errors import CoreCommandError
@ -95,39 +96,7 @@ class NodeBase(object):
:rtype: str :rtype: str
:raises CoreCommandError: when a non-zero exit status occurs :raises CoreCommandError: when a non-zero exit status occurs
""" """
logging.info("net cmd server(%s): %s", self.server, args) raise NotImplementedError
if self.server is None:
return utils.check_cmd(args, env=env)
else:
args = " ".join(args)
return self.remote_cmd(args, env=env)
def remote_cmd(self, cmd, env=None):
"""
Run command remotely using server connection.
:param str cmd: command to run
:param dict env: environment for remote command, default is None
:return: stdout when success
:rtype: str
:raises CoreCommandError: when a non-zero exit status occurs
"""
if env is None:
result = self.server.run(cmd, hide=False)
else:
logging.info("command env: %s", env)
result = self.server.run(cmd, hide=False, env=env, replace_env=True)
if result.exited:
raise CoreCommandError(
result.exited, result.command, result.stdout, result.stderr
)
logging.info(
"fabric result:\n\tstdout: %s\n\tstderr: %s",
result.stdout.strip(),
result.stderr.strip(),
)
return result.stdout.strip()
def setposition(self, x=None, y=None, z=None): def setposition(self, x=None, y=None, z=None):
""" """
@ -279,7 +248,8 @@ class CoreNodeBase(NodeBase):
:param int _id: object id :param int _id: object id
:param str name: object name :param str name: object name
:param bool start: boolean for starting :param bool start: boolean for starting
:param str server: remote server node will run on, default is None for localhost :param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
""" """
super(CoreNodeBase, self).__init__(session, _id, name, start, server) super(CoreNodeBase, self).__init__(session, _id, name, start, server)
self.services = [] self.services = []
@ -412,6 +382,23 @@ class CoreNodeBase(NodeBase):
return common return common
def net_cmd(self, args, env=None):
"""
Runs a command that is used to configure and setup the network on the host
system.
:param list[str]|str args: command to run
:param dict env: environment to run command with
:return: combined stdout and stderr
:rtype: str
:raises CoreCommandError: when a non-zero exit status occurs
"""
if self.server is None:
return utils.check_cmd(args, env=env)
else:
args = " ".join(args)
return distributed.remote_cmd(self.server, args, env=env)
def node_net_cmd(self, args): def node_net_cmd(self, args):
""" """
Runs a command that is used to configure and setup the network within a Runs a command that is used to configure and setup the network within a
@ -493,7 +480,8 @@ class CoreNode(CoreNodeBase):
:param str nodedir: node directory :param str nodedir: node directory
:param str bootsh: boot shell to use :param str bootsh: boot shell to use
:param bool start: start flag :param bool start: start flag
:param str server: remote server node will run on, default is None for localhost :param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
""" """
super(CoreNode, self).__init__(session, _id, name, start, server) super(CoreNode, self).__init__(session, _id, name, start, server)
self.nodedir = nodedir self.nodedir = nodedir
@ -653,13 +641,13 @@ class CoreNode(CoreNodeBase):
:rtype: str :rtype: str
:raises CoreCommandError: when a non-zero exit status occurs :raises CoreCommandError: when a non-zero exit status occurs
""" """
logging.info("net cmd server(%s): %s", self.server, args)
if self.server is None: if self.server is None:
logging.info("node(%s) cmd: %s", self.name, args)
return self.check_cmd(args) return self.check_cmd(args)
else: else:
args = self.client._cmd_args() + args args = self.client._cmd_args() + args
args = " ".join(args) args = " ".join(args)
return self.remote_cmd(args) return distributed.remote_cmd(self.server, args)
def check_cmd(self, args): def check_cmd(self, args):
""" """
@ -753,7 +741,11 @@ class CoreNode(CoreNodeBase):
raise ValueError("interface name (%s) too long" % name) raise ValueError("interface name (%s) too long" % name)
veth = Veth( veth = Veth(
node=self, name=name, localname=localname, net=net, start=self.up node=self,
name=name,
localname=localname,
start=self.up,
server=self.server,
) )
if self.up: if self.up:
@ -806,9 +798,7 @@ class CoreNode(CoreNodeBase):
sessionid = self.session.short_session_id() sessionid = self.session.short_session_id()
localname = "tap%s.%s.%s" % (self.id, ifindex, sessionid) localname = "tap%s.%s.%s" % (self.id, ifindex, sessionid)
name = ifname name = ifname
tuntap = TunTap( tuntap = TunTap(node=self, name=name, localname=localname, start=self.up)
node=self, name=name, localname=localname, net=net, start=self.up
)
try: try:
self.addnetif(tuntap, ifindex) self.addnetif(tuntap, ifindex)
@ -1057,7 +1047,8 @@ class CoreNetworkBase(NodeBase):
:param int _id: object id :param int _id: object id
:param str name: object name :param str name: object name
:param bool start: should object start :param bool start: should object start
:param str server: remote server node will run on, default is None for localhost :param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
""" """
super(CoreNetworkBase, self).__init__(session, _id, name, start, server) super(CoreNetworkBase, self).__init__(session, _id, name, start, server)
self._linked = {} self._linked = {}

View file

@ -7,6 +7,7 @@ import time
from builtins import int, range from builtins import int, range
from core import utils from core import utils
from core.emulator import distributed
from core.errors import CoreCommandError from core.errors import CoreCommandError
from core.nodes.netclient import LinuxNetClient from core.nodes.netclient import LinuxNetClient
@ -16,13 +17,15 @@ class CoreInterface(object):
Base class for network interfaces. Base class for network interfaces.
""" """
def __init__(self, node, name, mtu): def __init__(self, node, name, mtu, server=None):
""" """
Creates a PyCoreNetIf instance. Creates a PyCoreNetIf instance.
:param core.nodes.base.CoreNode node: node for interface :param core.nodes.base.CoreNode node: node for interface
:param str name: interface name :param str name: interface name
:param mtu: mtu value :param mtu: mtu value
:param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
""" """
self.node = node self.node = node
@ -42,7 +45,15 @@ class CoreInterface(object):
self.netindex = None self.netindex = None
# index used to find flow data # index used to find flow data
self.flow_id = None self.flow_id = None
self.net_client = LinuxNetClient(utils.check_cmd) self.server = server
self.net_client = LinuxNetClient(self.net_cmd)
def net_cmd(self, args):
if self.server is None:
return utils.check_cmd(args)
else:
args = " ".join(args)
return distributed.remote_cmd(self.server, args)
def startup(self): def startup(self):
""" """
@ -191,8 +202,7 @@ class Veth(CoreInterface):
Provides virtual ethernet functionality for core nodes. Provides virtual ethernet functionality for core nodes.
""" """
# TODO: network is not used, why was it needed? def __init__(self, node, name, localname, mtu=1500, server=None, start=True):
def __init__(self, node, name, localname, mtu=1500, net=None, start=True):
""" """
Creates a VEth instance. Creates a VEth instance.
@ -200,12 +210,13 @@ class Veth(CoreInterface):
:param str name: interface name :param str name: interface name
:param str localname: interface local name :param str localname: interface local name
:param mtu: interface mtu :param mtu: interface mtu
:param net: network :param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
:param bool start: start flag :param bool start: start flag
:raises CoreCommandError: when there is a command exception :raises CoreCommandError: when there is a command exception
""" """
# note that net arg is ignored # note that net arg is ignored
CoreInterface.__init__(self, node=node, name=name, mtu=mtu) CoreInterface.__init__(self, node, name, mtu, server)
self.localname = localname self.localname = localname
self.up = False self.up = False
if start: if start:
@ -251,8 +262,7 @@ class TunTap(CoreInterface):
TUN/TAP virtual device in TAP mode TUN/TAP virtual device in TAP mode
""" """
# TODO: network is not used, why was it needed? def __init__(self, node, name, localname, mtu=1500, server=None, start=True):
def __init__(self, node, name, localname, mtu=1500, net=None, start=True):
""" """
Create a TunTap instance. Create a TunTap instance.
@ -260,10 +270,11 @@ class TunTap(CoreInterface):
:param str name: interface name :param str name: interface name
:param str localname: local interface name :param str localname: local interface name
:param mtu: interface mtu :param mtu: interface mtu
:param core.nodes.base.CoreNetworkBase net: related network :param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
:param bool start: start flag :param bool start: start flag
""" """
CoreInterface.__init__(self, node=node, name=name, mtu=mtu) CoreInterface.__init__(self, node, name, mtu, server)
self.localname = localname self.localname = localname
self.up = False self.up = False
self.transport_type = "virtual" self.transport_type = "virtual"
@ -427,6 +438,7 @@ class GreTap(CoreInterface):
ttl=255, ttl=255,
key=None, key=None,
start=True, start=True,
server=None,
): ):
""" """
Creates a GreTap instance. Creates a GreTap instance.
@ -441,9 +453,11 @@ class GreTap(CoreInterface):
:param ttl: ttl value :param ttl: ttl value
:param key: gre tap key :param key: gre tap key
:param bool start: start flag :param bool start: start flag
:param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
:raises CoreCommandError: when there is a command exception :raises CoreCommandError: when there is a command exception
""" """
CoreInterface.__init__(self, node=node, name=name, mtu=mtu) CoreInterface.__init__(self, node, name, mtu, server)
self.session = session self.session = session
if _id is None: if _id is None:
# from PyCoreObj # from PyCoreObj
@ -460,9 +474,13 @@ class GreTap(CoreInterface):
if remoteip is None: if remoteip is None:
raise ValueError("missing remote IP required for GRE TAP device") raise ValueError("missing remote IP required for GRE TAP device")
self.net_client.create_gretap( if localip is not None:
self.localname, str(remoteip), str(localip), str(ttl), str(key) localip = str(localip)
) if ttl is not None:
ttl = str(ttl)
if key is not None:
key = str(key)
self.net_client.create_gretap(self.localname, str(remoteip), localip, ttl, key)
self.net_client.device_up(self.localname) self.net_client.device_up(self.localname)
self.up = True self.up = True

View file

@ -10,6 +10,7 @@ import time
from socket import AF_INET, AF_INET6 from socket import AF_INET, AF_INET6
from core import constants, utils from core import constants, utils
from core.emulator import distributed
from core.emulator.data import LinkData from core.emulator.data import LinkData
from core.emulator.enumerations import LinkTypes, NodeTypes, RegisterTlvs from core.emulator.enumerations import LinkTypes, NodeTypes, RegisterTlvs
from core.errors import CoreCommandError, CoreError from core.errors import CoreCommandError, CoreError
@ -291,7 +292,8 @@ class CoreNetwork(CoreNetworkBase):
:param int _id: object id :param int _id: object id
:param str name: object name :param str name: object name
:param bool start: start flag :param bool start: start flag
:param str server: remote server node will run on, default is None for localhost :param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
:param policy: network policy :param policy: network policy
""" """
CoreNetworkBase.__init__(self, session, _id, name, start, server) CoreNetworkBase.__init__(self, session, _id, name, start, server)
@ -307,6 +309,27 @@ class CoreNetwork(CoreNetworkBase):
self.startup() self.startup()
ebq.startupdateloop(self) ebq.startupdateloop(self)
def net_cmd(self, args, env=None):
"""
Runs a command that is used to configure and setup the network on the host
system.
:param list[str]|str args: command to run
:param dict env: environment to run command with
:return: combined stdout and stderr
:rtype: str
:raises CoreCommandError: when a non-zero exit status occurs
"""
logging.info("network node(%s) cmd", self.name)
output = utils.check_cmd(args, env=env)
args = " ".join(args)
for server in self.session.servers:
conn = self.session.servers[server]
distributed.remote_cmd(conn, args, env=env)
return output
def startup(self): def startup(self):
""" """
Linux bridge starup logic. Linux bridge starup logic.
@ -381,11 +404,11 @@ class CoreNetwork(CoreNetworkBase):
""" """
Attach a network interface. Attach a network interface.
:param core.netns.vnode.VEth netif: network interface to attach :param core.nodes.interface.Veth netif: network interface to attach
:return: nothing :return: nothing
""" """
if self.up: if self.up:
self.net_client.create_interface(self.brname, netif.localname) netif.net_client.create_interface(self.brname, netif.localname)
CoreNetworkBase.attach(self, netif) CoreNetworkBase.attach(self, netif)
@ -397,7 +420,7 @@ class CoreNetwork(CoreNetworkBase):
:return: nothing :return: nothing
""" """
if self.up: if self.up:
self.net_client.delete_interface(self.brname, netif.localname) netif.net_client.delete_interface(self.brname, netif.localname)
CoreNetworkBase.detach(self, netif) CoreNetworkBase.detach(self, netif)
@ -591,13 +614,11 @@ class CoreNetwork(CoreNetworkBase):
if len(name) >= 16: if len(name) >= 16:
raise ValueError("interface name %s too long" % name) raise ValueError("interface name %s too long" % name)
netif = Veth( netif = Veth(node=None, name=name, localname=localname, mtu=1500, start=self.up)
node=None, name=name, localname=localname, mtu=1500, net=self, start=self.up
)
self.attach(netif) self.attach(netif)
if net.up: if net.up:
# this is similar to net.attach() but uses netif.name instead of localname # this is similar to net.attach() but uses netif.name instead of localname
self.net_client.create_interface(net.brname, netif.name) netif.net_client.create_interface(net.brname, netif.name)
i = net.newifindex() i = net.newifindex()
net._netif[i] = netif net._netif[i] = netif
with net._linked_lock: with net._linked_lock:
@ -666,6 +687,8 @@ class GreTapBridge(CoreNetwork):
:param ttl: ttl value :param ttl: ttl value
:param key: gre tap key :param key: gre tap key
:param bool start: start flag :param bool start: start flag
:param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
""" """
CoreNetwork.__init__(self, session, _id, name, False, server, policy) CoreNetwork.__init__(self, session, _id, name, False, server, policy)
self.grekey = key self.grekey = key

View file

@ -263,7 +263,7 @@ def check_cmd(args, **kwargs):
kwargs["stdout"] = subprocess.PIPE kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.STDOUT kwargs["stderr"] = subprocess.STDOUT
args = split_args(args) args = split_args(args)
logging.debug("command: %s", args) logging.info("command: %s", args)
try: try:
p = subprocess.Popen(args, **kwargs) p = subprocess.Popen(args, **kwargs)
stdout, _ = p.communicate() stdout, _ = p.communicate()

View file

@ -1,51 +1,46 @@
import logging import logging
import pdb import pdb
import sys
from core.emulator.coreemu import CoreEmu from core.emulator.coreemu import CoreEmu
from core.emulator.emudata import NodeOptions from core.emulator.emudata import IpPrefixes, NodeOptions
from core.emulator.enumerations import EventTypes from core.emulator.enumerations import EventTypes, NodeTypes
def main(): def main():
# ip generator for example # ip generator for example
# prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16") prefixes = IpPrefixes(ip4_prefix="10.83.0.0/16")
# create emulator instance for creating sessions and utility methods # create emulator instance for creating sessions and utility methods
coreemu = CoreEmu() coreemu = CoreEmu()
session = coreemu.create_session() session = coreemu.create_session()
# initialize distributed # initialize distributed
session.add_distributed("core2") address = sys.argv[1]
session.init_distributed() remote = sys.argv[2]
session.address = address
session.add_distributed(remote)
# must be in configuration state for nodes to start, when using "node_add" below # must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE) session.set_state(EventTypes.CONFIGURATION_STATE)
# create switch network node # create local node, switch, and remote nodes
# switch = session.add_node(_type=NodeTypes.SWITCH) node_one = session.add_node()
switch = session.add_node(_type=NodeTypes.SWITCH)
# create nodes
options = NodeOptions() options = NodeOptions()
options.emulation_server = "core2" options.emulation_server = remote
session.add_node(node_options=options) node_two = session.add_node(node_options=options)
# interface = prefixes.create_interface(node_one)
# session.add_link(node_one.id, switch.id, interface_one=interface)
session.add_node() # create not interfaces and link
# interface = prefixes.create_interface(node_two) interface_one = prefixes.create_interface(node_one)
# session.add_link(node_two.id, switch.id, interface_one=interface) interface_two = prefixes.create_interface(node_two)
session.add_link(node_one.id, switch.id, interface_one=interface_one)
session.add_link(node_two.id, switch.id, interface_one=interface_two)
# instantiate session # instantiate session
session.instantiate() session.instantiate()
# print("starting iperf server on node: %s" % node_one.name) # pause script for verification
# node_one.cmd(["iperf", "-s", "-D"])
# node_one_address = prefixes.ip4_address(node_one)
#
# print("node %s connecting to %s" % (node_two.name, node_one_address))
# node_two.client.icmd(["iperf", "-t", "10", "-c", node_one_address])
# node_one.cmd(["killall", "-9", "iperf"])
pdb.set_trace() pdb.set_trace()
# shutdown session # shutdown session

View file

@ -0,0 +1,42 @@
import logging
import pdb
import sys
from core.emulator.coreemu import CoreEmu
from core.emulator.enumerations import EventTypes, NodeTypes
def main():
# create emulator instance for creating sessions and utility methods
coreemu = CoreEmu()
session = coreemu.create_session()
# initialize distributed
address = sys.argv[1]
remote = sys.argv[2]
session.address = address
session.add_distributed(remote)
# must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE)
# create local node, switch, and remote nodes
switch_one = session.add_node(_type=NodeTypes.SWITCH)
switch_two = session.add_node(_type=NodeTypes.SWITCH)
# create not interfaces and link
session.add_link(switch_one.id, switch_two.id)
# instantiate session
session.instantiate()
# pause script for verification
pdb.set_trace()
# shutdown session
coreemu.shutdown()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()