daemon: initial pass to revamp how node linking and link management is done, provides a consistent way to link all wired nodes and allows them to be configured for tc for the same behavior across the board

This commit is contained in:
Blake Harnden 2022-03-17 15:28:38 -07:00
parent d684b8eb5a
commit cd7f1a641e
19 changed files with 1393 additions and 1556 deletions

View file

@ -0,0 +1,256 @@
"""
Provides functionality for maintaining information about known links
for a session.
"""
import logging
from dataclasses import dataclass
from typing import Dict, Optional, Tuple, ValuesView
from core.emulator.data import LinkData, LinkOptions
from core.emulator.enumerations import LinkTypes, MessageFlags
from core.errors import CoreError
from core.nodes.base import NodeBase
from core.nodes.interface import CoreInterface
from core.nodes.network import PtpNet
logger = logging.getLogger(__name__)
LinkKeyType = Tuple[int, Optional[int], int, Optional[int]]
def create_key(
node1: NodeBase,
iface1: Optional[CoreInterface],
node2: NodeBase,
iface2: Optional[CoreInterface],
) -> LinkKeyType:
"""
Creates a unique key for tracking links.
:param node1: first node in link
:param iface1: node1 interface
:param node2: second node in link
:param iface2: node2 interface
:return: link key
"""
iface1_id = iface1.id if iface1 else None
iface2_id = iface2.id if iface2 else None
if node1.id < node2.id:
return node1.id, iface1_id, node2.id, iface2_id
else:
return node2.id, iface2_id, node1.id, iface1_id
@dataclass
class CoreLink:
"""
Provides a core link data structure.
"""
node1: NodeBase
iface1: Optional[CoreInterface]
node2: NodeBase
iface2: Optional[CoreInterface]
ptp: PtpNet = None
label: str = None
color: str = None
def key(self) -> LinkKeyType:
"""
Retrieve the key for this link.
:return: link key
"""
return create_key(self.node1, self.iface1, self.node2, self.iface2)
def is_unidirectional(self) -> bool:
"""
Checks if this link is considered unidirectional, due to current
iface configurations.
:return: True if unidirectional, False otherwise
"""
unidirectional = False
if self.iface1 and self.iface2:
unidirectional = self.iface1.options != self.iface2.options
return unidirectional
def options(self) -> LinkOptions:
"""
Retrieve the options for this link.
:return: options for this link
"""
if self.is_unidirectional():
options = self.iface1.options
else:
if self.iface1:
options = self.iface1.options
else:
options = self.iface2.options
return options
def get_data(self, message_type: MessageFlags, source: str = None) -> LinkData:
"""
Create link data for this link.
:param message_type: link data message type
:param source: source for this data
:return: link data
"""
iface1_data = self.iface1.get_data() if self.iface1 else None
iface2_data = self.iface2.get_data() if self.iface2 else None
return LinkData(
message_type=message_type,
type=LinkTypes.WIRED,
node1_id=self.node1.id,
node2_id=self.node2.id,
iface1=iface1_data,
iface2=iface2_data,
options=self.options(),
label=self.label,
color=self.color,
source=source,
)
def get_data_unidirectional(self, source: str = None) -> LinkData:
"""
Create other unidirectional link data.
:param source: source for this data
:return: unidirectional link data
"""
iface1_data = self.iface1.get_data() if self.iface1 else None
iface2_data = self.iface2.get_data() if self.iface2 else None
return LinkData(
message_type=MessageFlags.NONE,
type=LinkTypes.WIRED,
node1_id=self.node2.id,
node2_id=self.node1.id,
iface1=iface2_data,
iface2=iface1_data,
options=self.iface2.options,
label=self.label,
color=self.color,
source=source,
)
class LinkManager:
"""
Provides core link management.
"""
def __init__(self) -> None:
"""
Create a LinkManager instance.
"""
self._links: Dict[LinkKeyType, CoreLink] = {}
self._node_links: Dict[int, Dict[LinkKeyType, CoreLink]] = {}
def add(self, core_link: CoreLink) -> None:
"""
Add a core link to be tracked.
:param core_link: link to track
:return: nothing
"""
node1, iface1 = core_link.node1, core_link.iface1
node2, iface2 = core_link.node2, core_link.iface2
if core_link.key() in self._links:
raise CoreError(
f"node1({node1.name}) iface1({iface1.id}) "
f"node2({node2.name}) iface2({iface2.id}) link already exists"
)
logger.info(
"adding link from node(%s:%s) to node(%s:%s)",
node1.name,
iface1.name if iface1 else None,
node2.name,
iface2.name if iface2 else None,
)
self._links[core_link.key()] = core_link
node1_links = self._node_links.setdefault(node1.id, {})
node1_links[core_link.key()] = core_link
node2_links = self._node_links.setdefault(node2.id, {})
node2_links[core_link.key()] = core_link
def delete(
self,
node1: NodeBase,
iface1: Optional[CoreInterface],
node2: NodeBase,
iface2: Optional[CoreInterface],
) -> CoreLink:
"""
Remove a link from being tracked.
:param node1: first node in link
:param iface1: node1 interface
:param node2: second node in link
:param iface2: node2 interface
:return: removed core link
"""
key = create_key(node1, iface1, node2, iface2)
if key not in self._links:
raise CoreError(
f"node1({node1.name}) iface1({iface1.id}) "
f"node2({node2.name}) iface2({iface2.id}) is not linked"
)
logger.info(
"deleting link from node(%s:%s) to node(%s:%s)",
node1.name,
iface1.name if iface1 else None,
node2.name,
iface2.name if iface2 else None,
)
node1_links = self._node_links[node1.id]
node1_links.pop(key)
node2_links = self._node_links[node2.id]
node2_links.pop(key)
return self._links.pop(key)
def reset(self) -> None:
"""
Resets and clears all tracking information.
:return: nothing
"""
self._links.clear()
self._node_links.clear()
def get_link(
self,
node1: NodeBase,
iface1: Optional[CoreInterface],
node2: NodeBase,
iface2: Optional[CoreInterface],
) -> Optional[CoreLink]:
"""
Retrieve a link for provided values.
:param node1: first node in link
:param iface1: interface for node1
:param node2: second node in link
:param iface2: interface for node2
:return: core link if present, None otherwise
"""
key = create_key(node1, iface1, node2, iface2)
return self._links.get(key)
def links(self) -> ValuesView[CoreLink]:
"""
Retrieve all known links
:return: iterator for all known links
"""
return self._links.values()
def node_links(self, node: NodeBase) -> ValuesView[CoreLink]:
"""
Retrieve all links for a given node.
:param node: node to get links for
:return: node links
"""
return self._node_links.get(node.id, {}).values()

View file

@ -35,10 +35,10 @@ from core.emulator.distributed import DistributedController
from core.emulator.enumerations import (
EventTypes,
ExceptionLevels,
LinkTypes,
MessageFlags,
NodeTypes,
)
from core.emulator.links import CoreLink, LinkManager
from core.emulator.sessionconfig import SessionConfig
from core.errors import CoreError
from core.location.event import EventLoop
@ -86,6 +86,7 @@ CONTAINER_NODES: Set[Type[NodeBase]] = {DockerNode, LxcNode}
CTRL_NET_ID: int = 9001
LINK_COLORS: List[str] = ["green", "blue", "orange", "purple", "turquoise"]
NT: TypeVar = TypeVar("NT", bound=NodeBase)
WIRELESS_TYPE: Tuple[Type[WlanNode], Type[EmaneNet]] = (WlanNode, EmaneNet)
class Session:
@ -119,7 +120,8 @@ class Session:
# dict of nodes: all nodes and nets
self.nodes: Dict[int, NodeBase] = {}
self.nodes_lock = threading.Lock()
self.nodes_lock: threading.Lock = threading.Lock()
self.link_manager: LinkManager = LinkManager()
# states and hooks handlers
self.state: EventTypes = EventTypes.DEFINITION_STATE
@ -187,40 +189,6 @@ class Session:
raise CoreError(f"invalid node class: {_class}")
return node_type
def _link_wireless(
self, node1: CoreNodeBase, node2: CoreNodeBase, connect: bool
) -> None:
"""
Objects to deal with when connecting/disconnecting wireless links.
:param node1: node one for wireless link
:param node2: node two for wireless link
:param connect: link interfaces if True, unlink otherwise
:return: nothing
:raises core.CoreError: when objects to link is less than 2, or no common
networks are found
"""
logger.info(
"handling wireless linking node1(%s) node2(%s): %s",
node1.name,
node2.name,
connect,
)
common_networks = node1.commonnets(node1)
if not common_networks:
raise CoreError("no common network found for wireless link/unlink")
for common_network, iface1, iface2 in common_networks:
if not isinstance(common_network, (WlanNode, EmaneNet)):
logger.info(
"skipping common network that is not wireless/emane: %s",
common_network,
)
continue
if connect:
common_network.link(iface1, iface2)
else:
common_network.unlink(iface1, iface2)
def use_ovs(self) -> bool:
return self.options.get_config("ovs") == "1"
@ -231,8 +199,7 @@ class Session:
iface1_data: InterfaceData = None,
iface2_data: InterfaceData = None,
options: LinkOptions = None,
link_type: LinkTypes = LinkTypes.WIRED,
) -> Tuple[CoreInterface, CoreInterface]:
) -> Tuple[Optional[CoreInterface], Optional[CoreInterface]]:
"""
Add a link between nodes.
@ -244,89 +211,120 @@ class Session:
data, defaults to none
:param options: data for creating link,
defaults to no options
:param link_type: type of link to add
:return: tuple of created core interfaces, depending on link
"""
if not options:
options = LinkOptions()
node1 = self.get_node(node1_id, NodeBase)
node2 = self.get_node(node2_id, NodeBase)
iface1 = None
iface2 = None
options = options if options else LinkOptions()
# set mtu
mtu = self.options.get_config_int("mtu") or DEFAULT_MTU
if iface1_data:
iface1_data.mtu = mtu
if iface2_data:
iface2_data.mtu = mtu
# wireless link
if link_type == LinkTypes.WIRELESS:
if isinstance(node1, CoreNodeBase) and isinstance(node2, CoreNodeBase):
self._link_wireless(node1, node2, connect=True)
else:
raise CoreError(
f"cannot wireless link node1({type(node1)}) node2({type(node2)})"
)
# wired link
node1 = self.get_node(node1_id, NodeBase)
node2 = self.get_node(node2_id, NodeBase)
# check for invalid linking
if isinstance(node1, WIRELESS_TYPE) and isinstance(node2, WIRELESS_TYPE):
raise CoreError(f"cannot link node({type(node1)}) node({type(node2)})")
# custom links
iface1 = None
iface2 = None
if isinstance(node1, WlanNode):
iface2 = self._add_wlan_link(node2, iface2_data, node1)
elif isinstance(node2, WlanNode):
iface1 = self._add_wlan_link(node1, iface1_data, node2)
elif isinstance(node1, EmaneNet) and isinstance(node2, CoreNode):
iface2 = self._add_emane_link(node2, iface2_data, node1)
elif isinstance(node2, EmaneNet) and isinstance(node1, CoreNode):
iface1 = self._add_emane_link(node1, iface1_data, node2)
else:
# peer to peer link
if isinstance(node1, CoreNodeBase) and isinstance(node2, CoreNodeBase):
logger.info("linking ptp: %s - %s", node1.name, node2.name)
start = self.state.should_start()
ptp = self.create_node(PtpNet, start)
iface1 = node1.new_iface(ptp, iface1_data)
iface2 = node2.new_iface(ptp, iface2_data)
iface1.config(options)
if not options.unidirectional:
iface2.config(options)
# link node to net
elif isinstance(node1, CoreNodeBase) and isinstance(node2, CoreNetworkBase):
logger.info("linking node to net: %s - %s", node1.name, node2.name)
iface1 = node1.new_iface(node2, iface1_data)
if not isinstance(node2, (EmaneNet, WlanNode)):
iface1.config(options)
# link net to node
elif isinstance(node2, CoreNodeBase) and isinstance(node1, CoreNetworkBase):
logger.info("linking net to node: %s - %s", node1.name, node2.name)
iface2 = node2.new_iface(node1, iface2_data)
wireless_net = isinstance(node1, (EmaneNet, WlanNode))
if not options.unidirectional and not wireless_net:
iface2.config(options)
# network to network
elif isinstance(node1, CoreNetworkBase) and isinstance(
node2, CoreNetworkBase
):
logger.info(
"linking network to network: %s - %s", node1.name, node2.name
)
iface1 = node1.linknet(node2)
use_local = iface1.net == node1
iface1.config(options, use_local=use_local)
if not options.unidirectional:
iface1.config(options, use_local=not use_local)
else:
raise CoreError(
f"cannot link node1({type(node1)}) node2({type(node2)})"
)
# configure tunnel nodes
key = options.key
if isinstance(node1, TunnelNode):
logger.info("setting tunnel key for: %s", node1.name)
node1.setkey(key, iface1_data)
if isinstance(node2, TunnelNode):
logger.info("setting tunnel key for: %s", node2.name)
node2.setkey(key, iface2_data)
iface1, iface2 = self._add_wired_link(
node1, node2, iface1_data, iface2_data, options
)
# configure tunnel nodes
key = options.key
if isinstance(node1, TunnelNode):
logger.info("setting tunnel key for: %s", node1.name)
node1.setkey(key, iface1_data)
if isinstance(node2, TunnelNode):
logger.info("setting tunnel key for: %s", node2.name)
node2.setkey(key, iface2_data)
self.sdt.add_link(node1_id, node2_id)
return iface1, iface2
def delete_link(
def _add_wlan_link(
self, node: NodeBase, iface_data: InterfaceData, net: WlanNode
) -> CoreInterface:
"""
Create a wlan link.
:param node: node to link to wlan network
:param iface_data: data to create interface with
:param net: wlan network to link to
:return: interface created for node
"""
# create interface
iface = node.create_iface(iface_data)
# attach to wlan
net.attach(iface)
# track link
core_link = CoreLink(node, iface, net, None)
self.link_manager.add(core_link)
return iface
def _add_emane_link(
self, node: CoreNode, iface_data: InterfaceData, net: EmaneNet
) -> CoreInterface:
"""
Create am emane link.
:param node: node to link to emane network
:param iface_data: data to create interface with
:param net: emane network to link to
:return: interface created for node
"""
# create iface tuntap
iface = net.create_tuntap(node, iface_data)
# track link
core_link = CoreLink(node, iface, net, None)
self.link_manager.add(core_link)
return iface
def _add_wired_link(
self,
node1_id: int,
node2_id: int,
iface1_id: int = None,
iface2_id: int = None,
link_type: LinkTypes = LinkTypes.WIRED,
node1: NodeBase,
node2: NodeBase,
iface1_data: InterfaceData = None,
iface2_data: InterfaceData = None,
options: LinkOptions = None,
) -> Tuple[CoreInterface, CoreInterface]:
"""
Create a wired link between two nodes.
:param node1: first node to be linked
:param node2: second node to be linked
:param iface1_data: data to create interface for node1
:param iface2_data: data to create interface for node2
:param options: options to configure interfaces with
:return: interfaces created for both nodes
"""
# create interface
if iface1_data and isinstance(node1, CoreNetworkBase):
iface1_data.id = None
iface1 = node1.create_iface(iface1_data, options)
if iface2_data and isinstance(node2, CoreNetworkBase):
iface2_data.id = None
iface2 = node2.create_iface(iface2_data, options)
# join and attach to ptp bridge
ptp = self.create_node(PtpNet, self.state.should_start())
ptp.attach(iface1)
ptp.attach(iface2)
# track link
core_link = CoreLink(node1, iface1, node2, iface2, ptp)
self.link_manager.add(core_link)
return iface1, iface2
def delete_link(
self, node1_id: int, node2_id: int, iface1_id: int = None, iface2_id: int = None
) -> None:
"""
Delete a link between nodes.
@ -335,63 +333,38 @@ class Session:
:param node2_id: node two id
:param iface1_id: interface id for node one
:param iface2_id: interface id for node two
:param link_type: link type to delete
:return: nothing
:raises core.CoreError: when no common network is found for link being deleted
"""
node1 = self.get_node(node1_id, NodeBase)
node2 = self.get_node(node2_id, NodeBase)
logger.info(
"deleting link(%s) node(%s):interface(%s) node(%s):interface(%s)",
link_type.name,
"deleting link node(%s):interface(%s) node(%s):interface(%s)",
node1.name,
iface1_id,
node2.name,
iface2_id,
)
# wireless link
if link_type == LinkTypes.WIRELESS:
if isinstance(node1, CoreNodeBase) and isinstance(node2, CoreNodeBase):
self._link_wireless(node1, node2, connect=False)
else:
raise CoreError(
"cannot delete wireless link "
f"node1({type(node1)}) node2({type(node2)})"
)
# wired link
iface1 = None
iface2 = None
if isinstance(node1, WlanNode):
iface2 = node2.delete_iface(iface2_id)
node1.detach(iface2)
elif isinstance(node2, WlanNode):
iface1 = node1.delete_iface(iface1_id)
node2.detach(iface1)
elif isinstance(node1, EmaneNet):
iface2 = node2.delete_iface(iface2_id)
node1.detach(iface2)
elif isinstance(node2, EmaneNet):
iface1 = node1.delete_iface(iface1_id)
node2.detach(iface1)
else:
if isinstance(node1, CoreNodeBase) and isinstance(node2, CoreNodeBase):
iface1 = node1.get_iface(iface1_id)
iface2 = node2.get_iface(iface2_id)
if iface1.net != iface2.net:
raise CoreError(
f"node1({node1.name}) node2({node2.name}) "
"not connected to same net"
)
ptp = iface1.net
node1.delete_iface(iface1_id)
node2.delete_iface(iface2_id)
self.delete_node(ptp.id)
elif isinstance(node1, CoreNodeBase) and isinstance(node2, CoreNetworkBase):
node1.delete_iface(iface1_id)
elif isinstance(node2, CoreNodeBase) and isinstance(node1, CoreNetworkBase):
node2.delete_iface(iface2_id)
elif isinstance(node1, CoreNetworkBase) and isinstance(
node2, CoreNetworkBase
):
iface1 = node1.get_linked_iface(node2)
if iface1:
node1.detach(iface1)
iface1.shutdown()
iface2 = node2.get_linked_iface(node1)
if iface2:
node2.detach(iface2)
iface2.shutdown()
if not iface1 and not iface2:
raise CoreError(
f"node1({node1.name}) and node2({node2.name}) are not connected"
)
iface1 = node1.delete_iface(iface1_id)
iface2 = node2.delete_iface(iface2_id)
core_link = self.link_manager.delete(node1, iface1, node2, iface2)
if core_link.ptp:
self.delete_node(core_link.ptp.id)
self.sdt.delete_link(node1_id, node2_id)
def update_link(
@ -401,7 +374,6 @@ class Session:
iface1_id: int = None,
iface2_id: int = None,
options: LinkOptions = None,
link_type: LinkTypes = LinkTypes.WIRED,
) -> None:
"""
Update link information between nodes.
@ -411,7 +383,6 @@ class Session:
:param iface1_id: interface id for node one
:param iface2_id: interface id for node two
:param options: data to update link with
:param link_type: type of link to update
:return: nothing
:raises core.CoreError: when updating a wireless type link, when there is a
unknown link between networks
@ -421,72 +392,28 @@ class Session:
node1 = self.get_node(node1_id, NodeBase)
node2 = self.get_node(node2_id, NodeBase)
logger.info(
"update link(%s) node(%s):interface(%s) node(%s):interface(%s)",
link_type.name,
"update link node(%s):interface(%s) node(%s):interface(%s)",
node1.name,
iface1_id,
node2.name,
iface2_id,
)
# wireless link
if link_type == LinkTypes.WIRELESS:
raise CoreError("cannot update wireless link")
else:
if isinstance(node1, CoreNodeBase) and isinstance(node2, CoreNodeBase):
iface1 = node1.ifaces.get(iface1_id)
if not iface1:
raise CoreError(
f"node({node1.name}) missing interface({iface1_id})"
)
iface2 = node2.ifaces.get(iface2_id)
if not iface2:
raise CoreError(
f"node({node2.name}) missing interface({iface2_id})"
)
if iface1.net != iface2.net:
raise CoreError(
f"node1({node1.name}) node2({node2.name}) "
"not connected to same net"
)
iface1.config(options)
if not options.unidirectional:
iface2.config(options)
elif isinstance(node1, CoreNodeBase) and isinstance(node2, CoreNetworkBase):
iface = node1.get_iface(iface1_id)
if iface.net != node2:
raise CoreError(
f"node1({node1.name}) iface1({iface1_id})"
f" is not linked to node1({node2.name})"
)
iface.config(options)
elif isinstance(node2, CoreNodeBase) and isinstance(node1, CoreNetworkBase):
iface = node2.get_iface(iface2_id)
if iface.net != node1:
raise CoreError(
f"node2({node2.name}) iface2({iface2_id})"
f" is not linked to node1({node1.name})"
)
iface.config(options)
elif isinstance(node1, CoreNetworkBase) and isinstance(
node2, CoreNetworkBase
):
iface = node1.get_linked_iface(node2)
if not iface:
iface = node2.get_linked_iface(node1)
if iface:
use_local = iface.net == node1
iface.config(options, use_local=use_local)
if not options.unidirectional:
iface.config(options, use_local=not use_local)
else:
raise CoreError(
f"node1({node1.name}) and node2({node2.name}) are not linked"
)
else:
raise CoreError(
f"cannot update link node1({type(node1)}) node2({type(node2)})"
)
iface1 = node1.get_iface(iface1_id) if iface1_id is not None else None
iface2 = node2.get_iface(iface2_id) if iface2_id is not None else None
core_link = self.link_manager.get_link(node1, iface1, node2, iface2)
if not core_link:
raise CoreError(
f"there is no link for node({node1.name}):interface({iface1_id}) "
f"node({node2.name}):interface({iface2_id})"
)
if iface1:
iface1.options.update(options)
node1_input = node1 if isinstance(node1, CoreNode) else None
iface1.set_config(node1_input)
if iface2 and not options.unidirectional:
iface2.options.update(options)
node2_input = node2 if isinstance(node2, CoreNode) else None
iface2.set_config(node2_input)
def next_node_id(self) -> int:
"""
@ -723,6 +650,7 @@ class Session:
"""
self.emane.shutdown()
self.delete_nodes()
self.link_manager.reset()
self.distributed.shutdown()
self.hooks.clear()
self.emane.reset()
@ -1480,7 +1408,8 @@ class Session:
ip4_mask=ip4_mask,
mtu=DEFAULT_MTU,
)
iface = node.new_iface(control_net, iface_data)
iface = node.create_iface(iface_data)
control_net.attach(iface)
iface.control = True
except ValueError:
msg = f"Control interface not added to node {node.id}. "