diff --git a/daemon/core/emulator/distributed.py b/daemon/core/emulator/distributed.py index 2b4830ad..6faa852a 100644 --- a/daemon/core/emulator/distributed.py +++ b/daemon/core/emulator/distributed.py @@ -15,6 +15,7 @@ from fabric import Connection from invoke import UnexpectedExit from core import utils +from core.emulator.links import CoreLink from core.errors import CoreCommandError, CoreError from core.executables import get_requirements from core.nodes.interface import GreTap @@ -183,21 +184,36 @@ class DistributedController: def start(self) -> None: """ - Start distributed network tunnels. + Start distributed network tunnels for control networks. :return: nothing """ mtu = self.session.options.get_config_int("mtu") for node_id in self.session.nodes: node = self.session.nodes[node_id] - if not isinstance(node, CoreNetwork): - continue - if isinstance(node, CtrlNet) and node.serverintf is not None: + if not isinstance(node, CtrlNet) or node.serverintf is not None: continue for name in self.servers: server = self.servers[name] self.create_gre_tunnel(node, server, mtu, True) + def create_gre_tunnels(self, core_link: CoreLink) -> None: + """ + Creates gre tunnels for a core link with a ptp network connection. + + :param core_link: core link to create gre tunnel for + :return: nothing + """ + if not self.servers: + return + if not core_link.ptp: + raise CoreError( + "attempted to create gre tunnel for core link without a ptp network" + ) + mtu = self.session.options.get_config_int("mtu") + for server in self.servers.values(): + self.create_gre_tunnel(core_link.ptp, server, mtu, True) + def create_gre_tunnel( self, node: CoreNetwork, server: DistributedServer, mtu: int, start: bool ) -> Tuple[GreTap, GreTap]: diff --git a/daemon/core/emulator/session.py b/daemon/core/emulator/session.py index 9525f7ca..929d5fac 100644 --- a/daemon/core/emulator/session.py +++ b/daemon/core/emulator/session.py @@ -321,6 +321,9 @@ class Session: # track link core_link = CoreLink(node1, iface1, node2, iface2, ptp) self.link_manager.add(core_link) + # setup link for gre tunnels if needed + if ptp.up: + self.distributed.create_gre_tunnels(core_link) return iface1, iface2 def delete_link( diff --git a/daemon/core/nodes/base.py b/daemon/core/nodes/base.py index d7c2c02f..6fbfb612 100644 --- a/daemon/core/nodes/base.py +++ b/daemon/core/nodes/base.py @@ -776,8 +776,9 @@ class CoreNode(CoreNodeBase): iface.flow_id = self.node_net_client.get_ifindex(iface.name) logger.debug("interface flow index: %s - %s", iface.name, iface.flow_id) # set mac address - self.node_net_client.device_mac(iface.name, str(iface.mac)) - logger.debug("interface mac: %s - %s", iface.name, iface.mac) + if iface.mac: + self.node_net_client.device_mac(iface.name, str(iface.mac)) + logger.debug("interface mac: %s - %s", iface.name, iface.mac) # set all addresses for ip in iface.ips(): # ipv4 check @@ -827,10 +828,10 @@ class CoreNetworkBase(NodeBase): :param iface: network interface to attach :return: nothing """ - i = self.next_iface_id() - self.ifaces[i] = iface + iface_id = self.next_iface_id() + self.ifaces[iface_id] = iface iface.net = self - iface.net_id = i + iface.net_id = iface_id with self.linked_lock: self.linked[iface] = {} diff --git a/daemon/core/nodes/physical.py b/daemon/core/nodes/physical.py index 908053d5..ae1b07af 100644 --- a/daemon/core/nodes/physical.py +++ b/daemon/core/nodes/physical.py @@ -3,17 +3,18 @@ PhysicalNode class for including real systems in the emulated network. """ import logging -import threading from pathlib import Path from typing import TYPE_CHECKING, List, Optional, Tuple +import netaddr + from core.emulator.data import InterfaceData, LinkOptions from core.emulator.distributed import DistributedServer from core.emulator.enumerations import NodeTypes, TransportType from core.errors import CoreCommandError, CoreError -from core.executables import MOUNT, TEST, UMOUNT -from core.nodes.base import CoreNetworkBase, CoreNodeBase -from core.nodes.interface import DEFAULT_MTU, CoreInterface +from core.executables import BASH, TEST, UMOUNT +from core.nodes.base import CoreNode, CoreNodeBase +from core.nodes.interface import CoreInterface logger = logging.getLogger(__name__) @@ -21,185 +22,6 @@ if TYPE_CHECKING: from core.emulator.session import Session -class PhysicalNode(CoreNodeBase): - def __init__( - self, - session: "Session", - _id: int = None, - name: str = None, - directory: Path = None, - server: DistributedServer = None, - ) -> None: - super().__init__(session, _id, name, server) - if not self.server: - raise CoreError("physical nodes must be assigned to a remote server") - self.directory: Optional[Path] = directory - self.lock: threading.RLock = threading.RLock() - self._mounts: List[Tuple[Path, Path]] = [] - - def startup(self) -> None: - with self.lock: - self.makenodedir() - self.up = True - - def shutdown(self) -> None: - if not self.up: - return - with self.lock: - while self._mounts: - _, target_path = self._mounts.pop(-1) - self.umount(target_path) - for iface in self.get_ifaces(): - iface.shutdown() - self.rmnodedir() - - def path_exists(self, path: str) -> bool: - """ - Determines if a file or directory path exists. - - :param path: path to file or directory - :return: True if path exists, False otherwise - """ - try: - self.host_cmd(f"{TEST} -e {path}") - return True - except CoreCommandError: - return False - - def termcmdstring(self, sh: str = "/bin/sh") -> str: - """ - Create a terminal command string. - - :param sh: shell to execute command in - :return: str - """ - return sh - - def set_mac(self, iface_id: int, mac: str) -> None: - """ - Set mac address for an interface. - - :param iface_id: index of interface to set hardware address for - :param mac: mac address to set - :return: nothing - :raises CoreCommandError: when a non-zero exit status occurs - """ - iface = self.get_iface(iface_id) - iface.set_mac(mac) - if self.up: - self.net_client.device_mac(iface.name, str(iface.mac)) - - def add_ip(self, iface_id: int, ip: str) -> None: - """ - Add an ip address to an interface in the format "10.0.0.1/24". - - :param iface_id: id of interface to add address to - :param ip: address to add to interface - :return: nothing - :raises CoreError: when ip address provided is invalid - :raises CoreCommandError: when a non-zero exit status occurs - """ - iface = self.get_iface(iface_id) - iface.add_ip(ip) - if self.up: - self.net_client.create_address(iface.name, ip) - - def remove_ip(self, iface_id: int, ip: str) -> None: - """ - Remove an ip address from an interface in the format "10.0.0.1/24". - - :param iface_id: id of interface to delete address from - :param ip: ip address to remove from interface - :return: nothing - :raises CoreError: when ip address provided is invalid - :raises CoreCommandError: when a non-zero exit status occurs - """ - iface = self.get_iface(iface_id) - iface.remove_ip(ip) - if self.up: - self.net_client.delete_address(iface.name, ip) - - def adopt_iface( - self, iface: CoreInterface, iface_id: int, mac: str, ips: List[str] - ) -> None: - """ - When a link message is received linking this node to another part of - the emulation, no new interface is created; instead, adopt the - GreTap interface as the node interface. - """ - iface.name = f"gt{iface_id}" - iface.node = self - self.add_iface(iface, iface_id) - # use a more reasonable name, e.g. "gt0" instead of "gt.56286.150" - if self.up: - self.net_client.device_down(iface.localname) - self.net_client.device_name(iface.localname, iface.name) - iface.localname = iface.name - if mac: - self.set_mac(iface_id, mac) - for ip in ips: - self.add_ip(iface_id, ip) - if self.up: - self.net_client.device_up(iface.localname) - - def next_iface_id(self) -> int: - with self.lock: - while self.iface_id in self.ifaces: - self.iface_id += 1 - iface_id = self.iface_id - self.iface_id += 1 - return iface_id - - def new_iface( - self, net: CoreNetworkBase, iface_data: InterfaceData - ) -> CoreInterface: - logger.info("creating interface") - ips = iface_data.get_ips() - iface_id = iface_data.id - if iface_id is None: - iface_id = self.next_iface_id() - name = iface_data.name - if name is None: - name = f"gt{iface_id}" - _, remote_tap = self.session.distributed.create_gre_tunnel( - net, self.server, iface_data.mtu, self.up - ) - self.adopt_iface(remote_tap, iface_id, iface_data.mac, ips) - return remote_tap - - def privatedir(self, dir_path: Path) -> None: - if not str(dir_path).startswith("/"): - raise CoreError(f"private directory path not fully qualified: {dir_path}") - host_path = self.host_path(dir_path, is_dir=True) - self.host_cmd(f"mkdir -p {host_path}") - self.mount(host_path, dir_path) - - def mount(self, src_path: Path, target_path: Path) -> None: - logger.debug("node(%s) mounting: %s at %s", self.name, src_path, target_path) - self.cmd(f"mkdir -p {target_path}") - self.host_cmd(f"{MOUNT} --bind {src_path} {target_path}", cwd=self.directory) - self._mounts.append((src_path, target_path)) - - def umount(self, target_path: Path) -> None: - logger.info("unmounting '%s'", target_path) - try: - self.host_cmd(f"{UMOUNT} -l {target_path}", cwd=self.directory) - except CoreCommandError: - logger.exception("unmounting failed for %s", target_path) - - def cmd(self, args: str, wait: bool = True, shell: bool = False) -> str: - return self.host_cmd(args, wait=wait) - - def create_dir(self, dir_path: Path) -> None: - raise CoreError("physical node does not support creating directories") - - def create_file(self, file_path: Path, contents: str, mode: int = 0o644) -> None: - raise CoreError("physical node does not support creating files") - - def copy_file(self, src_path: Path, dst_path: Path, mode: int = None) -> None: - raise CoreError("physical node does not support copying files") - - class Rj45Node(CoreNodeBase): """ RJ45Node is a physical interface on the host linked to the emulated @@ -214,7 +36,6 @@ class Rj45Node(CoreNodeBase): session: "Session", _id: int = None, name: str = None, - mtu: int = DEFAULT_MTU, server: DistributedServer = None, ) -> None: """ @@ -223,13 +44,12 @@ class Rj45Node(CoreNodeBase): :param session: core session instance :param _id: node id :param name: node name - :param mtu: rj45 mtu :param server: remote server node will run on, default is None for localhost """ super().__init__(session, _id, name, server) self.iface: CoreInterface = CoreInterface( - self.iface_id, name, name, session.use_ovs(), mtu, self, server + self.iface_id, name, name, session.use_ovs(), node=self, server=server ) self.iface.transport_type = TransportType.RAW self.old_up: bool = False @@ -287,9 +107,13 @@ class Rj45Node(CoreNodeBase): raise CoreError( f"rj45({self.name}) nodes support at most 1 network interface" ) - self.ifaces[self.iface.id] = self.iface + if iface_data and iface_data.mtu is not None: + self.iface.mtu = iface_data.mtu + self.iface.ip4s.clear() + self.iface.ip6s.clear() for ip in iface_data.get_ips(): self.iface.add_ip(ip) + self.ifaces[self.iface.id] = self.iface if self.up: for ip in self.iface.ips(): self.net_client.create_address(self.iface.name, str(ip)) @@ -395,3 +219,69 @@ class Rj45Node(CoreNodeBase): def copy_file(self, src_path: Path, dst_path: Path, mode: int = None) -> None: raise CoreError("rj45 does not support copying files") + + +class PhysicalNode(CoreNode): + def __init__( + self, + session: "Session", + _id: int = None, + name: str = None, + directory: Path = None, + server: DistributedServer = None, + ) -> None: + if not self.server: + raise CoreError("physical nodes must be assigned to a remote server") + super().__init__(session, _id, name, directory, server) + + def startup(self) -> None: + with self.lock: + self.makenodedir() + self.up = True + + def shutdown(self) -> None: + if not self.up: + return + with self.lock: + while self._mounts: + _, target_path = self._mounts.pop(-1) + self.umount(target_path) + for iface in self.get_ifaces(): + iface.shutdown() + self.rmnodedir() + + def _create_cmd(self, args: str, shell: bool = False) -> str: + if shell: + args = f'{BASH} -c "{args}"' + return args + + def adopt_iface(self, iface: CoreInterface, name: str) -> None: + # validate iface belongs to node and get id + iface_id = self.get_iface_id(iface) + if iface_id == -1: + raise CoreError(f"adopting unknown iface({iface.name})") + # turn checksums off + self.node_net_client.checksums_off(iface.name) + # retrieve flow id for container + iface.flow_id = self.node_net_client.get_ifindex(iface.name) + logger.debug("interface flow index: %s - %s", iface.name, iface.flow_id) + if iface.mac: + self.net_client.device_mac(iface.name, str(iface.mac)) + # set all addresses + for ip in iface.ips(): + # ipv4 check + broadcast = None + if netaddr.valid_ipv4(ip): + broadcast = "+" + self.node_net_client.create_address(iface.name, str(ip), broadcast) + # configure iface options + iface.set_config() + # set iface up + self.net_client.device_up(iface.name) + + def umount(self, target_path: Path) -> None: + logger.info("unmounting '%s'", target_path) + try: + self.host_cmd(f"{UMOUNT} -l {target_path}", cwd=self.directory) + except CoreCommandError: + logger.exception("unmounting failed for %s", target_path) diff --git a/daemon/tests/test_distributed.py b/daemon/tests/test_distributed.py index 01362cae..35b7af4e 100644 --- a/daemon/tests/test_distributed.py +++ b/daemon/tests/test_distributed.py @@ -29,12 +29,14 @@ class TestDistributed: # when session.distributed.add_server(server_name, host) + node1 = session.add_node(HubNode) options = NodeOptions(server=server_name) - node = session.add_node(HubNode, options=options) + node2 = session.add_node(HubNode, options=options) + session.add_link(node1.id, node2.id) session.instantiate() # then - assert node.server is not None - assert node.server.name == server_name - assert node.server.host == host - assert len(session.distributed.tunnels) > 0 + assert node2.server is not None + assert node2.server.name == server_name + assert node2.server.host == host + assert len(session.distributed.tunnels) == 1