daemon: revamp to align gre tunnels for distributed to align with changes, also moved gre tunnels for wired links to occur directly when linking, allowing runtime distributed functionality, also updates to phyical node to leverage a core node, but avoid using namespacing

This commit is contained in:
Blake Harnden 2022-03-18 12:31:04 -07:00
parent 8c24e9cfa6
commit b71272519d
5 changed files with 113 additions and 201 deletions

View file

@ -15,6 +15,7 @@ from fabric import Connection
from invoke import UnexpectedExit
from core import utils
from core.emulator.links import CoreLink
from core.errors import CoreCommandError, CoreError
from core.executables import get_requirements
from core.nodes.interface import GreTap
@ -183,21 +184,36 @@ class DistributedController:
def start(self) -> None:
"""
Start distributed network tunnels.
Start distributed network tunnels for control networks.
:return: nothing
"""
mtu = self.session.options.get_config_int("mtu")
for node_id in self.session.nodes:
node = self.session.nodes[node_id]
if not isinstance(node, CoreNetwork):
continue
if isinstance(node, CtrlNet) and node.serverintf is not None:
if not isinstance(node, CtrlNet) or node.serverintf is not None:
continue
for name in self.servers:
server = self.servers[name]
self.create_gre_tunnel(node, server, mtu, True)
def create_gre_tunnels(self, core_link: CoreLink) -> None:
"""
Creates gre tunnels for a core link with a ptp network connection.
:param core_link: core link to create gre tunnel for
:return: nothing
"""
if not self.servers:
return
if not core_link.ptp:
raise CoreError(
"attempted to create gre tunnel for core link without a ptp network"
)
mtu = self.session.options.get_config_int("mtu")
for server in self.servers.values():
self.create_gre_tunnel(core_link.ptp, server, mtu, True)
def create_gre_tunnel(
self, node: CoreNetwork, server: DistributedServer, mtu: int, start: bool
) -> Tuple[GreTap, GreTap]:

View file

@ -321,6 +321,9 @@ class Session:
# track link
core_link = CoreLink(node1, iface1, node2, iface2, ptp)
self.link_manager.add(core_link)
# setup link for gre tunnels if needed
if ptp.up:
self.distributed.create_gre_tunnels(core_link)
return iface1, iface2
def delete_link(

View file

@ -776,8 +776,9 @@ class CoreNode(CoreNodeBase):
iface.flow_id = self.node_net_client.get_ifindex(iface.name)
logger.debug("interface flow index: %s - %s", iface.name, iface.flow_id)
# set mac address
self.node_net_client.device_mac(iface.name, str(iface.mac))
logger.debug("interface mac: %s - %s", iface.name, iface.mac)
if iface.mac:
self.node_net_client.device_mac(iface.name, str(iface.mac))
logger.debug("interface mac: %s - %s", iface.name, iface.mac)
# set all addresses
for ip in iface.ips():
# ipv4 check
@ -827,10 +828,10 @@ class CoreNetworkBase(NodeBase):
:param iface: network interface to attach
:return: nothing
"""
i = self.next_iface_id()
self.ifaces[i] = iface
iface_id = self.next_iface_id()
self.ifaces[iface_id] = iface
iface.net = self
iface.net_id = i
iface.net_id = iface_id
with self.linked_lock:
self.linked[iface] = {}

View file

@ -3,17 +3,18 @@ PhysicalNode class for including real systems in the emulated network.
"""
import logging
import threading
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Tuple
import netaddr
from core.emulator.data import InterfaceData, LinkOptions
from core.emulator.distributed import DistributedServer
from core.emulator.enumerations import NodeTypes, TransportType
from core.errors import CoreCommandError, CoreError
from core.executables import MOUNT, TEST, UMOUNT
from core.nodes.base import CoreNetworkBase, CoreNodeBase
from core.nodes.interface import DEFAULT_MTU, CoreInterface
from core.executables import BASH, TEST, UMOUNT
from core.nodes.base import CoreNode, CoreNodeBase
from core.nodes.interface import CoreInterface
logger = logging.getLogger(__name__)
@ -21,185 +22,6 @@ if TYPE_CHECKING:
from core.emulator.session import Session
class PhysicalNode(CoreNodeBase):
def __init__(
self,
session: "Session",
_id: int = None,
name: str = None,
directory: Path = None,
server: DistributedServer = None,
) -> None:
super().__init__(session, _id, name, server)
if not self.server:
raise CoreError("physical nodes must be assigned to a remote server")
self.directory: Optional[Path] = directory
self.lock: threading.RLock = threading.RLock()
self._mounts: List[Tuple[Path, Path]] = []
def startup(self) -> None:
with self.lock:
self.makenodedir()
self.up = True
def shutdown(self) -> None:
if not self.up:
return
with self.lock:
while self._mounts:
_, target_path = self._mounts.pop(-1)
self.umount(target_path)
for iface in self.get_ifaces():
iface.shutdown()
self.rmnodedir()
def path_exists(self, path: str) -> bool:
"""
Determines if a file or directory path exists.
:param path: path to file or directory
:return: True if path exists, False otherwise
"""
try:
self.host_cmd(f"{TEST} -e {path}")
return True
except CoreCommandError:
return False
def termcmdstring(self, sh: str = "/bin/sh") -> str:
"""
Create a terminal command string.
:param sh: shell to execute command in
:return: str
"""
return sh
def set_mac(self, iface_id: int, mac: str) -> None:
"""
Set mac address for an interface.
:param iface_id: index of interface to set hardware address for
:param mac: mac address to set
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
iface = self.get_iface(iface_id)
iface.set_mac(mac)
if self.up:
self.net_client.device_mac(iface.name, str(iface.mac))
def add_ip(self, iface_id: int, ip: str) -> None:
"""
Add an ip address to an interface in the format "10.0.0.1/24".
:param iface_id: id of interface to add address to
:param ip: address to add to interface
:return: nothing
:raises CoreError: when ip address provided is invalid
:raises CoreCommandError: when a non-zero exit status occurs
"""
iface = self.get_iface(iface_id)
iface.add_ip(ip)
if self.up:
self.net_client.create_address(iface.name, ip)
def remove_ip(self, iface_id: int, ip: str) -> None:
"""
Remove an ip address from an interface in the format "10.0.0.1/24".
:param iface_id: id of interface to delete address from
:param ip: ip address to remove from interface
:return: nothing
:raises CoreError: when ip address provided is invalid
:raises CoreCommandError: when a non-zero exit status occurs
"""
iface = self.get_iface(iface_id)
iface.remove_ip(ip)
if self.up:
self.net_client.delete_address(iface.name, ip)
def adopt_iface(
self, iface: CoreInterface, iface_id: int, mac: str, ips: List[str]
) -> None:
"""
When a link message is received linking this node to another part of
the emulation, no new interface is created; instead, adopt the
GreTap interface as the node interface.
"""
iface.name = f"gt{iface_id}"
iface.node = self
self.add_iface(iface, iface_id)
# use a more reasonable name, e.g. "gt0" instead of "gt.56286.150"
if self.up:
self.net_client.device_down(iface.localname)
self.net_client.device_name(iface.localname, iface.name)
iface.localname = iface.name
if mac:
self.set_mac(iface_id, mac)
for ip in ips:
self.add_ip(iface_id, ip)
if self.up:
self.net_client.device_up(iface.localname)
def next_iface_id(self) -> int:
with self.lock:
while self.iface_id in self.ifaces:
self.iface_id += 1
iface_id = self.iface_id
self.iface_id += 1
return iface_id
def new_iface(
self, net: CoreNetworkBase, iface_data: InterfaceData
) -> CoreInterface:
logger.info("creating interface")
ips = iface_data.get_ips()
iface_id = iface_data.id
if iface_id is None:
iface_id = self.next_iface_id()
name = iface_data.name
if name is None:
name = f"gt{iface_id}"
_, remote_tap = self.session.distributed.create_gre_tunnel(
net, self.server, iface_data.mtu, self.up
)
self.adopt_iface(remote_tap, iface_id, iface_data.mac, ips)
return remote_tap
def privatedir(self, dir_path: Path) -> None:
if not str(dir_path).startswith("/"):
raise CoreError(f"private directory path not fully qualified: {dir_path}")
host_path = self.host_path(dir_path, is_dir=True)
self.host_cmd(f"mkdir -p {host_path}")
self.mount(host_path, dir_path)
def mount(self, src_path: Path, target_path: Path) -> None:
logger.debug("node(%s) mounting: %s at %s", self.name, src_path, target_path)
self.cmd(f"mkdir -p {target_path}")
self.host_cmd(f"{MOUNT} --bind {src_path} {target_path}", cwd=self.directory)
self._mounts.append((src_path, target_path))
def umount(self, target_path: Path) -> None:
logger.info("unmounting '%s'", target_path)
try:
self.host_cmd(f"{UMOUNT} -l {target_path}", cwd=self.directory)
except CoreCommandError:
logger.exception("unmounting failed for %s", target_path)
def cmd(self, args: str, wait: bool = True, shell: bool = False) -> str:
return self.host_cmd(args, wait=wait)
def create_dir(self, dir_path: Path) -> None:
raise CoreError("physical node does not support creating directories")
def create_file(self, file_path: Path, contents: str, mode: int = 0o644) -> None:
raise CoreError("physical node does not support creating files")
def copy_file(self, src_path: Path, dst_path: Path, mode: int = None) -> None:
raise CoreError("physical node does not support copying files")
class Rj45Node(CoreNodeBase):
"""
RJ45Node is a physical interface on the host linked to the emulated
@ -214,7 +36,6 @@ class Rj45Node(CoreNodeBase):
session: "Session",
_id: int = None,
name: str = None,
mtu: int = DEFAULT_MTU,
server: DistributedServer = None,
) -> None:
"""
@ -223,13 +44,12 @@ class Rj45Node(CoreNodeBase):
:param session: core session instance
:param _id: node id
:param name: node name
:param mtu: rj45 mtu
:param server: remote server node
will run on, default is None for localhost
"""
super().__init__(session, _id, name, server)
self.iface: CoreInterface = CoreInterface(
self.iface_id, name, name, session.use_ovs(), mtu, self, server
self.iface_id, name, name, session.use_ovs(), node=self, server=server
)
self.iface.transport_type = TransportType.RAW
self.old_up: bool = False
@ -287,9 +107,13 @@ class Rj45Node(CoreNodeBase):
raise CoreError(
f"rj45({self.name}) nodes support at most 1 network interface"
)
self.ifaces[self.iface.id] = self.iface
if iface_data and iface_data.mtu is not None:
self.iface.mtu = iface_data.mtu
self.iface.ip4s.clear()
self.iface.ip6s.clear()
for ip in iface_data.get_ips():
self.iface.add_ip(ip)
self.ifaces[self.iface.id] = self.iface
if self.up:
for ip in self.iface.ips():
self.net_client.create_address(self.iface.name, str(ip))
@ -395,3 +219,69 @@ class Rj45Node(CoreNodeBase):
def copy_file(self, src_path: Path, dst_path: Path, mode: int = None) -> None:
raise CoreError("rj45 does not support copying files")
class PhysicalNode(CoreNode):
def __init__(
self,
session: "Session",
_id: int = None,
name: str = None,
directory: Path = None,
server: DistributedServer = None,
) -> None:
if not self.server:
raise CoreError("physical nodes must be assigned to a remote server")
super().__init__(session, _id, name, directory, server)
def startup(self) -> None:
with self.lock:
self.makenodedir()
self.up = True
def shutdown(self) -> None:
if not self.up:
return
with self.lock:
while self._mounts:
_, target_path = self._mounts.pop(-1)
self.umount(target_path)
for iface in self.get_ifaces():
iface.shutdown()
self.rmnodedir()
def _create_cmd(self, args: str, shell: bool = False) -> str:
if shell:
args = f'{BASH} -c "{args}"'
return args
def adopt_iface(self, iface: CoreInterface, name: str) -> None:
# validate iface belongs to node and get id
iface_id = self.get_iface_id(iface)
if iface_id == -1:
raise CoreError(f"adopting unknown iface({iface.name})")
# turn checksums off
self.node_net_client.checksums_off(iface.name)
# retrieve flow id for container
iface.flow_id = self.node_net_client.get_ifindex(iface.name)
logger.debug("interface flow index: %s - %s", iface.name, iface.flow_id)
if iface.mac:
self.net_client.device_mac(iface.name, str(iface.mac))
# set all addresses
for ip in iface.ips():
# ipv4 check
broadcast = None
if netaddr.valid_ipv4(ip):
broadcast = "+"
self.node_net_client.create_address(iface.name, str(ip), broadcast)
# configure iface options
iface.set_config()
# set iface up
self.net_client.device_up(iface.name)
def umount(self, target_path: Path) -> None:
logger.info("unmounting '%s'", target_path)
try:
self.host_cmd(f"{UMOUNT} -l {target_path}", cwd=self.directory)
except CoreCommandError:
logger.exception("unmounting failed for %s", target_path)

View file

@ -29,12 +29,14 @@ class TestDistributed:
# when
session.distributed.add_server(server_name, host)
node1 = session.add_node(HubNode)
options = NodeOptions(server=server_name)
node = session.add_node(HubNode, options=options)
node2 = session.add_node(HubNode, options=options)
session.add_link(node1.id, node2.id)
session.instantiate()
# then
assert node.server is not None
assert node.server.name == server_name
assert node.server.host == host
assert len(session.distributed.tunnels) > 0
assert node2.server is not None
assert node2.server.name == server_name
assert node2.server.host == host
assert len(session.distributed.tunnels) == 1