239 lines
8.2 KiB
Python
239 lines
8.2 KiB
Python
import logging
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
from core import utils
|
|
from core.emulator.data import InterfaceData
|
|
from core.errors import CoreError
|
|
from core.nodes.base import CoreNode
|
|
from core.nodes.interface import DEFAULT_MTU
|
|
from core.nodes.network import CtrlNet
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
if TYPE_CHECKING:
|
|
from core.emulator.session import Session
|
|
|
|
CTRL_NET_ID: int = 9001
|
|
ETC_HOSTS_PATH: str = "/etc/hosts"
|
|
|
|
|
|
class ControlNetManager:
|
|
def __init__(self, session: "Session") -> None:
|
|
self.session: "Session" = session
|
|
self.etc_hosts_header: str = f"CORE session {self.session.id} host entries"
|
|
|
|
def _etc_hosts_enabled(self) -> bool:
|
|
"""
|
|
Determines if /etc/hosts should be configured.
|
|
|
|
:return: True if /etc/hosts should be configured, False otherwise
|
|
"""
|
|
return self.session.options.get_bool("update_etc_hosts", False)
|
|
|
|
def _get_server_ifaces(
|
|
self,
|
|
) -> tuple[None, Optional[str], Optional[str], Optional[str]]:
|
|
"""
|
|
Retrieve control net server interfaces.
|
|
|
|
:return: control net server interfaces
|
|
"""
|
|
d0 = self.session.options.get("controlnetif0")
|
|
if d0:
|
|
logger.error("controlnet0 cannot be assigned with a host interface")
|
|
d1 = self.session.options.get("controlnetif1")
|
|
d2 = self.session.options.get("controlnetif2")
|
|
d3 = self.session.options.get("controlnetif3")
|
|
return None, d1, d2, d3
|
|
|
|
def _get_prefixes(
|
|
self,
|
|
) -> tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
|
|
"""
|
|
Retrieve control net prefixes.
|
|
|
|
:return: control net prefixes
|
|
"""
|
|
p = self.session.options.get("controlnet")
|
|
p0 = self.session.options.get("controlnet0")
|
|
p1 = self.session.options.get("controlnet1")
|
|
p2 = self.session.options.get("controlnet2")
|
|
p3 = self.session.options.get("controlnet3")
|
|
if not p0 and p:
|
|
p0 = p
|
|
return p0, p1, p2, p3
|
|
|
|
def update_etc_hosts(self) -> None:
|
|
"""
|
|
Add the IP addresses of control interfaces to the /etc/hosts file.
|
|
|
|
:return: nothing
|
|
"""
|
|
if not self._etc_hosts_enabled():
|
|
return
|
|
control_net = self.get_control_net(0)
|
|
entries = ""
|
|
for iface in control_net.get_ifaces():
|
|
name = iface.node.name
|
|
for ip in iface.ips():
|
|
entries += f"{ip.ip} {name}\n"
|
|
logger.info("adding entries to /etc/hosts")
|
|
utils.file_munge(ETC_HOSTS_PATH, self.etc_hosts_header, entries)
|
|
|
|
def clear_etc_hosts(self) -> None:
|
|
"""
|
|
Clear IP addresses of control interfaces from the /etc/hosts file.
|
|
|
|
:return: nothing
|
|
"""
|
|
if not self._etc_hosts_enabled():
|
|
return
|
|
logger.info("removing /etc/hosts file entries")
|
|
utils.file_demunge(ETC_HOSTS_PATH, self.etc_hosts_header)
|
|
|
|
def get_control_net_index(self, dev: str) -> int:
|
|
"""
|
|
Retrieve control net index.
|
|
|
|
:param dev: device to get control net index for
|
|
:return: control net index, -1 otherwise
|
|
"""
|
|
if dev[0:4] == "ctrl" and int(dev[4]) in (0, 1, 2, 3):
|
|
index = int(dev[4])
|
|
if index == 0:
|
|
return index
|
|
if index < 4 and self._get_prefixes()[index] is not None:
|
|
return index
|
|
return -1
|
|
|
|
def get_control_net(self, index: int) -> Optional[CtrlNet]:
|
|
"""
|
|
Retrieve a control net based on index.
|
|
|
|
:param index: control net index
|
|
:return: control net when available, None otherwise
|
|
"""
|
|
try:
|
|
return self.session.get_node(CTRL_NET_ID + index, CtrlNet)
|
|
except CoreError:
|
|
return None
|
|
|
|
def add_control_net(
|
|
self, index: int, conf_required: bool = True
|
|
) -> Optional[CtrlNet]:
|
|
"""
|
|
Create a control network bridge as necessary. The conf_reqd flag,
|
|
when False, causes a control network bridge to be added even if
|
|
one has not been configured.
|
|
|
|
:param index: network index to add
|
|
:param conf_required: flag to check if conf is required
|
|
:return: control net node
|
|
"""
|
|
logger.info(
|
|
"checking to add control net index(%s) conf_required(%s)",
|
|
index,
|
|
conf_required,
|
|
)
|
|
# check for valid index
|
|
if not (0 <= index <= 3):
|
|
raise CoreError(f"invalid control net index({index})")
|
|
# return any existing control net bridge
|
|
control_net = self.get_control_net(index)
|
|
if control_net:
|
|
logger.info("control net index(%s) already exists", index)
|
|
return control_net
|
|
# retrieve prefix for current index
|
|
index_prefix = self._get_prefixes()[index]
|
|
if not index_prefix:
|
|
if conf_required:
|
|
return None
|
|
else:
|
|
index_prefix = CtrlNet.DEFAULT_PREFIX_LIST[index]
|
|
# retrieve valid prefix from old style values
|
|
prefixes = index_prefix.split()
|
|
if len(prefixes) > 1:
|
|
# a list of per-host prefixes is provided
|
|
try:
|
|
prefix = prefixes[0].split(":", 1)[1]
|
|
except IndexError:
|
|
prefix = prefixes[0]
|
|
else:
|
|
prefix = prefixes[0]
|
|
# use the updown script for control net 0 only
|
|
updown_script = None
|
|
if index == 0:
|
|
updown_script = self.session.options.get("controlnet_updown_script")
|
|
# build a new controlnet bridge
|
|
_id = CTRL_NET_ID + index
|
|
server_iface = self._get_server_ifaces()[index]
|
|
logger.info(
|
|
"adding controlnet(%s) prefix(%s) updown(%s) server interface(%s)",
|
|
_id,
|
|
prefix,
|
|
updown_script,
|
|
server_iface,
|
|
)
|
|
options = CtrlNet.create_options()
|
|
options.prefix = prefix
|
|
options.updown_script = updown_script
|
|
options.serverintf = server_iface
|
|
control_net = self.session.create_node(CtrlNet, False, _id, options=options)
|
|
control_net.brname = f"ctrl{index}.{self.session.short_session_id()}"
|
|
control_net.startup()
|
|
return control_net
|
|
|
|
def remove_control_net(self, index: int) -> None:
|
|
"""
|
|
Removes control net.
|
|
|
|
:param index: index of control net to remove
|
|
:return: nothing
|
|
"""
|
|
control_net = self.get_control_net(index)
|
|
if control_net:
|
|
logger.info("removing control net index(%s)", index)
|
|
self.session.delete_node(control_net.id)
|
|
|
|
def add_control_iface(self, node: CoreNode, index: int) -> None:
|
|
"""
|
|
Adds a control net interface to a node.
|
|
|
|
:param node: node to add control net interface to
|
|
:param index: index of control net to add interface to
|
|
:return: nothing
|
|
:raises CoreError: if control net doesn't exist, interface already exists,
|
|
or there is an error creating the interface
|
|
"""
|
|
control_net = self.get_control_net(index)
|
|
if not control_net:
|
|
raise CoreError(f"control net index({index}) does not exist")
|
|
iface_id = control_net.CTRLIF_IDX_BASE + index
|
|
if node.ifaces.get(iface_id):
|
|
raise CoreError(f"control iface({iface_id}) already exists")
|
|
try:
|
|
logger.info(
|
|
"node(%s) adding control net index(%s) interface(%s)",
|
|
node.name,
|
|
index,
|
|
iface_id,
|
|
)
|
|
ip4 = control_net.prefix[node.id]
|
|
ip4_mask = control_net.prefix.prefixlen
|
|
iface_data = InterfaceData(
|
|
id=iface_id,
|
|
name=f"ctrl{index}",
|
|
mac=utils.random_mac(),
|
|
ip4=ip4,
|
|
ip4_mask=ip4_mask,
|
|
mtu=DEFAULT_MTU,
|
|
)
|
|
iface = node.create_iface(iface_data)
|
|
control_net.attach(iface)
|
|
iface.control = True
|
|
except ValueError:
|
|
raise CoreError(
|
|
f"error adding control net interface to node({node.id}), "
|
|
f"invalid control net prefix({control_net.prefix}), "
|
|
"a longer prefix length may be required"
|
|
)
|