diff --git a/.github/workflows/daemon-checks.yml b/.github/workflows/daemon-checks.yml index 00537c8e..ca2de7d8 100644 --- a/.github/workflows/daemon-checks.yml +++ b/.github/workflows/daemon-checks.yml @@ -19,11 +19,11 @@ jobs: cp setup.py.in setup.py cp core/constants.py.in core/constants.py sed -i 's/True/False/g' core/constants.py - pipenv install --dev + pipenv sync --dev - name: isort run: | cd daemon - pipenv run isort -c + pipenv run isort -c -df - name: black run: | cd daemon diff --git a/daemon/core/emane/emanemanager.py b/daemon/core/emane/emanemanager.py index 82e37f43..756cad2a 100644 --- a/daemon/core/emane/emanemanager.py +++ b/daemon/core/emane/emanemanager.py @@ -15,6 +15,7 @@ from core.emane.bypass import EmaneBypassModel from core.emane.commeffect import EmaneCommEffectModel from core.emane.emanemodel import EmaneModel from core.emane.ieee80211abg import EmaneIeee80211abgModel +from core.emane.linkmonitor import EmaneLinkMonitor from core.emane.nodes import EmaneNet from core.emane.rfpipe import EmaneRfPipeModel from core.emane.tdma import EmaneTdmaModel @@ -28,7 +29,6 @@ from core.xml import emanexml if TYPE_CHECKING: from core.emulator.session import Session - try: from emane.events import EventService from emane.events import LocationEvent @@ -90,6 +90,9 @@ class EmaneManager(ModelManager): self.emane_config = EmaneGlobalModel(session) self.set_configs(self.emane_config.default_values()) + # link monitor + self.link_monitor = EmaneLinkMonitor(self) + self.service = None self.eventchannel = None self.event_device = None @@ -349,9 +352,13 @@ class EmaneManager(ModelManager): f.write(f"{nodename} {ifname} {nemid}\n") except IOError: logging.exception("Error writing EMANE NEMs file: %s") - + if self.links_enabled(): + self.link_monitor.start() return EmaneManager.SUCCESS + def links_enabled(self) -> bool: + return self.get_config("link_enabled") == "1" + def poststartup(self) -> None: """ Retransmit location events now that all NEMs are active. @@ -393,7 +400,9 @@ class EmaneManager(ModelManager): with self._emane_node_lock: if not self._emane_nets: return - logging.info("stopping EMANE daemons.") + logging.info("stopping EMANE daemons") + if self.links_enabled(): + self.link_monitor.stop() self.deinstallnetifs() self.stopdaemons() self.stopeventmonitor() @@ -834,13 +843,43 @@ class EmaneGlobalModel: def __init__(self, session: "Session") -> None: self.session = session - self.nem_config = [ + self.core_config = [ + Configuration( + _id="platform_id_start", + _type=ConfigDataTypes.INT32, + default="1", + label="Starting Platform ID", + ), Configuration( _id="nem_id_start", _type=ConfigDataTypes.INT32, default="1", - label="Starting NEM ID (core)", - ) + label="Starting NEM ID", + ), + Configuration( + _id="link_enabled", + _type=ConfigDataTypes.BOOL, + default="1", + label="Enable Links?", + ), + Configuration( + _id="loss_threshold", + _type=ConfigDataTypes.INT32, + default="30", + label="Link Loss Threshold (%)", + ), + Configuration( + _id="link_interval", + _type=ConfigDataTypes.INT32, + default="1", + label="Link Check Interval (sec)", + ), + Configuration( + _id="link_timeout", + _type=ConfigDataTypes.INT32, + default="4", + label="Link Timeout (sec)", + ), ] self.emulator_config = None self.parse_config() @@ -857,25 +896,16 @@ class EmaneGlobalModel: "otamanagergroup": "224.1.2.8:45702", } self.emulator_config = emanemanifest.parse(emulator_xml, emulator_defaults) - self.emulator_config.insert( - 0, - Configuration( - _id="platform_id_start", - _type=ConfigDataTypes.INT32, - default="1", - label="Starting Platform ID (core)", - ), - ) def configurations(self) -> List[Configuration]: - return self.emulator_config + self.nem_config + return self.emulator_config + self.core_config def config_groups(self) -> List[ConfigGroup]: emulator_len = len(self.emulator_config) config_len = len(self.configurations()) return [ ConfigGroup("Platform Attributes", 1, emulator_len), - ConfigGroup("NEM Parameters", emulator_len + 1, config_len), + ConfigGroup("CORE Configuration", emulator_len + 1, config_len), ] def default_values(self) -> Dict[str, str]: diff --git a/daemon/core/emane/linkmonitor.py b/daemon/core/emane/linkmonitor.py new file mode 100644 index 00000000..6d4daa8d --- /dev/null +++ b/daemon/core/emane/linkmonitor.py @@ -0,0 +1,312 @@ +import logging +import sched +import threading +import time +from typing import TYPE_CHECKING, Dict, List, Tuple + +import netaddr +from lxml import etree + +from core.emulator.data import LinkData +from core.emulator.enumerations import LinkTypes, MessageFlags +from core.nodes.network import CtrlNet + +try: + from emane import shell +except ImportError: + try: + from emanesh import shell + except ImportError: + logging.debug("compatible emane python bindings not installed") + + +if TYPE_CHECKING: + from core.emane.emanemanager import EmaneManager + +DEFAULT_PORT = 47_000 +MAC_COMPONENT_INDEX = 1 +EMANE_RFPIPE = "rfpipemaclayer" +EMANE_80211 = "ieee80211abgmaclayer" +EMANE_TDMA = "tdmaeventschedulerradiomodel" +SINR_TABLE = "NeighborStatusTable" +NEM_SELF = 65535 + + +class LossTable: + def __init__(self, losses: Dict[float, float]) -> None: + self.losses = losses + self.sinrs = sorted(self.losses.keys()) + self.loss_lookup = {} + for index, value in enumerate(self.sinrs): + self.loss_lookup[index] = self.losses[value] + self.mac_id = None + + def get_loss(self, sinr: float) -> float: + index = self._get_index(sinr) + loss = 100.0 - self.loss_lookup[index] + return loss + + def _get_index(self, current_sinr: float) -> int: + for index, sinr in enumerate(self.sinrs): + if current_sinr <= sinr: + return index + return len(self.sinrs) - 1 + + +class EmaneLink: + def __init__(self, from_nem: int, to_nem: int, sinr: float) -> None: + self.from_nem = from_nem + self.to_nem = to_nem + self.sinr = sinr + self.last_seen = None + self.touch() + + def update(self, sinr: float) -> None: + self.sinr = sinr + self.touch() + + def touch(self) -> None: + self.last_seen = time.monotonic() + + def is_dead(self, timeout: int) -> bool: + return (time.monotonic() - self.last_seen) >= timeout + + def __repr__(self) -> str: + return f"EmaneLink({self.from_nem}, {self.to_nem}, {self.sinr})" + + +class EmaneClient: + def __init__(self, address: str) -> None: + self.address = address + self.client = shell.ControlPortClient(self.address, DEFAULT_PORT) + self.nems = {} + self.setup() + + def setup(self) -> None: + manifest = self.client.getManifest() + for nem_id, components in manifest.items(): + # get mac config + mac_id, _, emane_model = components[MAC_COMPONENT_INDEX] + mac_config = self.client.getConfiguration(mac_id) + logging.debug( + "address(%s) nem(%s) emane(%s)", self.address, nem_id, emane_model + ) + + # create loss table based on current configuration + if emane_model == EMANE_80211: + loss_table = self.handle_80211(mac_config) + elif emane_model == EMANE_RFPIPE: + loss_table = self.handle_rfpipe(mac_config) + else: + logging.warning("unknown emane link model: %s", emane_model) + continue + logging.info("monitoring links nem(%s) model(%s)", nem_id, emane_model) + loss_table.mac_id = mac_id + self.nems[nem_id] = loss_table + + def check_links( + self, links: Dict[Tuple[int, int], EmaneLink], loss_threshold: int + ) -> None: + for from_nem, loss_table in self.nems.items(): + tables = self.client.getStatisticTable(loss_table.mac_id, (SINR_TABLE,)) + table = tables[SINR_TABLE][1:][0] + for row in table: + row = row + to_nem = row[0][0] + sinr = row[5][0] + age = row[-1][0] + + # exclude invalid links + is_self = to_nem == NEM_SELF + has_valid_age = 0 <= age <= 1 + if is_self or not has_valid_age: + continue + + # check if valid link loss + link_key = (from_nem, to_nem) + loss = loss_table.get_loss(sinr) + if loss < loss_threshold: + link = links.get(link_key) + if link: + link.update(sinr) + else: + link = EmaneLink(from_nem, to_nem, sinr) + links[link_key] = link + + def handle_tdma(self, config: Dict[str, Tuple]): + pcr = config["pcrcurveuri"][0][0] + logging.debug("tdma pcr: %s", pcr) + + def handle_80211(self, config: Dict[str, Tuple]) -> LossTable: + unicastrate = config["unicastrate"][0][0] + pcr = config["pcrcurveuri"][0][0] + logging.debug("80211 pcr: %s", pcr) + tree = etree.parse(pcr) + root = tree.getroot() + table = root.find("table") + losses = {} + for rate in table.iter("datarate"): + index = int(rate.get("index")) + if index == unicastrate: + for row in rate.iter("row"): + sinr = float(row.get("sinr")) + por = float(row.get("por")) + losses[sinr] = por + return LossTable(losses) + + def handle_rfpipe(self, config: Dict[str, Tuple]) -> LossTable: + pcr = config["pcrcurveuri"][0][0] + logging.debug("rfpipe pcr: %s", pcr) + tree = etree.parse(pcr) + root = tree.getroot() + table = root.find("table") + losses = {} + for row in table.iter("row"): + sinr = float(row.get("sinr")) + por = float(row.get("por")) + losses[sinr] = por + return LossTable(losses) + + def stop(self) -> None: + self.client.stop() + + +class EmaneLinkMonitor: + def __init__(self, emane_manager: "EmaneManager") -> None: + self.emane_manager = emane_manager + self.clients = [] + self.links = {} + self.complete_links = set() + self.loss_threshold = None + self.link_interval = None + self.link_timeout = None + self.scheduler = None + self.running = False + + def start(self) -> None: + self.loss_threshold = int(self.emane_manager.get_config("loss_threshold")) + self.link_interval = int(self.emane_manager.get_config("link_interval")) + self.link_timeout = int(self.emane_manager.get_config("link_timeout")) + self.initialize() + if not self.clients: + logging.info("no valid emane models to monitor links") + return + self.scheduler = sched.scheduler() + self.scheduler.enter(0, 0, self.check_links) + self.running = True + thread = threading.Thread(target=self.scheduler.run, daemon=True) + thread.start() + + def initialize(self) -> None: + addresses = self.get_addresses() + for address in addresses: + client = EmaneClient(address) + if client.nems: + self.clients.append(client) + + def get_addresses(self) -> List[str]: + addresses = [] + nodes = self.emane_manager.getnodes() + for node in nodes: + for netif in node.netifs(): + if isinstance(netif.net, CtrlNet): + ip4 = None + for x in netif.addrlist: + address, prefix = x.split("/") + if netaddr.valid_ipv4(address): + ip4 = address + if ip4: + addresses.append(ip4) + break + return addresses + + def check_links(self) -> None: + # check for new links + previous_links = set(self.links.keys()) + for client in self.clients: + try: + client.check_links(self.links, self.loss_threshold) + except shell.ControlPortException: + if self.running: + logging.exception("link monitor error") + + # find new links + current_links = set(self.links.keys()) + new_links = current_links - previous_links + + # find dead links + dead_links = [] + for link_id, link in self.links.items(): + if link.is_dead(self.link_timeout): + dead_links.append(link_id) + + # announce dead links + for link_id in dead_links: + del self.links[link_id] + complete_id = self.get_complete_id(link_id) + if complete_id in self.complete_links: + self.complete_links.remove(complete_id) + self.send_link(MessageFlags.DELETE, complete_id) + + # announce new links + for link_id in new_links: + complete_id = self.get_complete_id(link_id) + if complete_id in self.complete_links: + continue + if self.is_complete_link(link_id): + self.complete_links.add(complete_id) + self.send_link(MessageFlags.ADD, complete_id) + + if self.running: + self.scheduler.enter(self.link_interval, 0, self.check_links) + + def get_complete_id(self, link_id: Tuple[int, int]) -> Tuple[int, int]: + value_one, value_two = link_id + if value_one < value_two: + return value_one, value_two + else: + return value_two, value_one + + def is_complete_link(self, link_id: Tuple[int, int]) -> bool: + reverse_id = link_id[1], link_id[0] + return link_id in self.links and reverse_id in self.links + + def send_link(self, message_type: MessageFlags, link_id: Tuple[int, int]) -> None: + nem_one, nem_two = link_id + emane_one, netif = self.emane_manager.nemlookup(nem_one) + if not emane_one or not netif: + logging.error("invalid nem: %s", nem_one) + return + node_one = netif.node + emane_two, netif = self.emane_manager.nemlookup(nem_two) + if not emane_two or not netif: + logging.error("invalid nem: %s", nem_two) + return + node_two = netif.node + logging.debug( + "%s emane link from %s(%s) to %s(%s)", + message_type.name, + node_one.name, + nem_one, + node_two.name, + nem_two, + ) + self.send_message(message_type, node_one.id, node_two.id, emane_one.id) + + def send_message(self, message_type, node_one, node_two, emane_id) -> None: + link_data = LinkData( + message_type=message_type, + node1_id=node_one, + node2_id=node_two, + network_id=emane_id, + link_type=LinkTypes.WIRELESS, + ) + self.emane_manager.session.broadcast_link(link_data) + + def stop(self) -> None: + self.running = False + for client in self.clients: + client.stop() + self.clients.clear() + self.links.clear() + self.complete_links.clear() diff --git a/daemon/core/plugins/sdt.py b/daemon/core/plugins/sdt.py index a759228d..0c23f567 100644 --- a/daemon/core/plugins/sdt.py +++ b/daemon/core/plugins/sdt.py @@ -21,11 +21,18 @@ if TYPE_CHECKING: from core.emulator.session import Session -def link_data_params(link_data: LinkData) -> Tuple[int, int, bool]: +def link_data_params(link_data: LinkData) -> Tuple[int, int, bool, int]: node_one = link_data.node1_id node_two = link_data.node2_id is_wireless = link_data.link_type == LinkTypes.WIRELESS - return node_one, node_two, is_wireless + network_id = link_data.network_id + return node_one, node_two, is_wireless, network_id + + +CORE_LAYER = "CORE" +NODE_LAYER = "CORE::Nodes" +LINK_LAYER = "CORE::Links" +CORE_LAYERS = [CORE_LAYER, LINK_LAYER, NODE_LAYER] class Sdt: @@ -200,6 +207,10 @@ class Sdt: :return: nothing """ nets = [] + # create layers + for layer in CORE_LAYERS: + self.cmd(f"layer {layer}") + with self.session._nodes_lock: for node_id in self.session.nodes: node = self.session.nodes[node_id] @@ -253,7 +264,10 @@ class Sdt: icon = icon.replace("$CORE_DATA_DIR", constants.CORE_DATA_DIR) icon = icon.replace("$CORE_CONF_DIR", constants.CORE_CONF_DIR) self.cmd(f"sprite {node_type} image {icon}") - self.cmd(f'node {node.id} type {node_type} label on,"{node.name}" {pos}') + self.cmd( + f'node {node.id} nodeLayer "{NODE_LAYER}" ' + f'type {node_type} label on,"{node.name}" {pos}' + ) def edit_node(self, node: NodeBase, lon: float, lat: float, alt: float) -> None: """ @@ -333,13 +347,16 @@ class Sdt: pass return result - def add_link(self, node_one: int, node_two: int, is_wireless: bool) -> None: + def add_link( + self, node_one: int, node_two: int, is_wireless: bool, network_id: int = None + ) -> None: """ Handle adding a link in SDT. :param node_one: node one id :param node_two: node two id :param is_wireless: True if link is wireless, False otherwise + :param network_id: network link is associated with :return: nothing """ logging.debug("sdt add link: %s, %s, %s", node_one, node_two, is_wireless) @@ -351,7 +368,15 @@ class Sdt: attr = "green,2" else: attr = "red,2" - self.cmd(f"link {node_one},{node_two} line {attr}") + link_id = f"{node_one}-{node_two}" + if network_id is not None: + link_id = f"{link_id}-{network_id}" + layer = LINK_LAYER + if network_id: + node = self.session.nodes[network_id] + network_name = node.name + layer = f"{layer}::{network_name}" + self.cmd(f"link {node_one},{node_two},{link_id} linkLayer {layer} line {attr}") def delete_link(self, node_one: int, node_two: int) -> None: """ diff --git a/daemon/core/xml/corexml.py b/daemon/core/xml/corexml.py index cbc7f937..87bf2e76 100644 --- a/daemon/core/xml/corexml.py +++ b/daemon/core/xml/corexml.py @@ -75,10 +75,10 @@ def create_emane_config(session: "Session") -> etree.Element: for emulator_config in session.emane.emane_config.emulator_config: value = config[emulator_config.id] add_configuration(emulator_element, emulator_config.id, value) - nem_element = etree.SubElement(emane_configuration, "nem") - for nem_config in session.emane.emane_config.nem_config: - value = config[nem_config.id] - add_configuration(nem_element, nem_config.id, value) + core_element = etree.SubElement(emane_configuration, "core") + for core_config in session.emane.emane_config.core_config: + value = config[core_config.id] + add_configuration(core_element, core_config.id, value) return emane_configuration @@ -737,8 +737,8 @@ class CoreXmlReader: name = config.get("name") value = config.get("value") configs[name] = value - nem_configuration = emane_global_configuration.find("nem") - for config in nem_configuration.iterchildren(): + core_configuration = emane_global_configuration.find("core") + for config in core_configuration.iterchildren(): name = config.get("name") value = config.get("value") configs[name] = value