""" Provides an EMANE network node class, which has several attached NEMs that share the same MAC+PHY model. """ import logging from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type from core.emulator.data import InterfaceData, LinkData, LinkOptions from core.emulator.distributed import DistributedServer from core.emulator.enumerations import ( EventTypes, LinkTypes, MessageFlags, NodeTypes, RegisterTlvs, ) from core.errors import CoreError from core.nodes.base import CoreNetworkBase, CoreNode from core.nodes.interface import CoreInterface if TYPE_CHECKING: from core.emane.emanemodel import EmaneModel from core.emulator.session import Session from core.location.mobility import WirelessModel, WayPointMobility OptionalEmaneModel = Optional[EmaneModel] WirelessModelType = Type[WirelessModel] try: from emane.events import LocationEvent except ImportError: try: from emanesh.events import LocationEvent except ImportError: LocationEvent = None logging.debug("compatible emane python bindings not installed") class EmaneNet(CoreNetworkBase): """ EMANE node contains NEM configuration and causes connected nodes to have TAP interfaces (instead of VEth). These are managed by the Emane controller object that exists in a session. """ apitype: NodeTypes = NodeTypes.EMANE linktype: LinkTypes = LinkTypes.WIRED type: str = "wlan" has_custom_iface: bool = True def __init__( self, session: "Session", _id: int = None, name: str = None, server: DistributedServer = None, ) -> None: super().__init__(session, _id, name, server) self.conf: str = "" self.model: "OptionalEmaneModel" = None self.mobility: Optional[WayPointMobility] = None def linkconfig( self, iface: CoreInterface, options: LinkOptions, iface2: CoreInterface = None ) -> None: """ The CommEffect model supports link configuration. """ if not self.model: return self.model.linkconfig(iface, options, iface2) def config(self, conf: str) -> None: self.conf = conf def startup(self) -> None: pass def shutdown(self) -> None: pass def link(self, iface1: CoreInterface, iface2: CoreInterface) -> None: pass def unlink(self, iface1: CoreInterface, iface2: CoreInterface) -> None: pass def linknet(self, net: "CoreNetworkBase") -> CoreInterface: raise CoreError("emane networks cannot be linked to other networks") def updatemodel(self, config: Dict[str, str]) -> None: if not self.model: raise CoreError(f"no model set to update for node({self.name})") logging.info( "node(%s) updating model(%s): %s", self.id, self.model.name, config ) self.model.update_config(config) def setmodel(self, model: "WirelessModelType", config: Dict[str, str]) -> None: """ set the EmaneModel associated with this node """ if model.config_type == RegisterTlvs.WIRELESS: # EmaneModel really uses values from ConfigurableManager # when buildnemxml() is called, not during init() self.model = model(session=self.session, _id=self.id) self.model.update_config(config) elif model.config_type == RegisterTlvs.MOBILITY: self.mobility = model(session=self.session, _id=self.id) self.mobility.update_config(config) def _nem_position( self, iface: CoreInterface ) -> Optional[Tuple[int, float, float, float]]: """ Creates nem position for emane event for a given interface. :param iface: interface to get nem emane position for :return: nem position tuple, None otherwise """ nem_id = self.session.emane.get_nem_id(iface) ifname = iface.localname if nem_id is None: logging.info("nemid for %s is unknown", ifname) return node = iface.node x, y, z = node.getposition() lat, lon, alt = self.session.location.getgeo(x, y, z) if node.position.alt is not None: alt = node.position.alt node.position.set_geo(lon, lat, alt) # altitude must be an integer or warning is printed alt = int(round(alt)) return nem_id, lon, lat, alt def setnemposition(self, iface: CoreInterface) -> None: """ Publish a NEM location change event using the EMANE event service. :param iface: interface to set nem position for """ if self.session.emane.service is None: logging.info("position service not available") return position = self._nem_position(iface) if position: nemid, lon, lat, alt = position event = LocationEvent() event.append(nemid, latitude=lat, longitude=lon, altitude=alt) self.session.emane.service.publish(0, event) def setnempositions(self, moved_ifaces: List[CoreInterface]) -> None: """ Several NEMs have moved, from e.g. a WaypointMobilityModel calculation. Generate an EMANE Location Event having several entries for each interface that has moved. """ if len(moved_ifaces) == 0: return if self.session.emane.service is None: logging.info("position service not available") return event = LocationEvent() for iface in moved_ifaces: position = self._nem_position(iface) if position: nemid, lon, lat, alt = position event.append(nemid, latitude=lat, longitude=lon, altitude=alt) self.session.emane.service.publish(0, event) def links(self, flags: MessageFlags = MessageFlags.NONE) -> List[LinkData]: links = super().links(flags) emane_manager = self.session.emane # gather current emane links nem_ids = set() for iface in self.get_ifaces(): nem_id = emane_manager.get_nem_id(iface) nem_ids.add(nem_id) emane_links = emane_manager.link_monitor.links considered = set() for link_key in emane_links: considered_key = tuple(sorted(link_key)) if considered_key in considered: continue considered.add(considered_key) nem1, nem2 = considered_key # ignore links not related to this node if nem1 not in nem_ids and nem2 not in nem_ids: continue # ignore incomplete links if (nem2, nem1) not in emane_links: continue link = emane_manager.get_nem_link(nem1, nem2) if link: links.append(link) return links def custom_iface(self, node: CoreNode, iface_data: InterfaceData) -> CoreInterface: # TUN/TAP is not ready for addressing yet; the device may # take some time to appear, and installing it into a # namespace after it has been bound removes addressing; # save addresses with the interface now iface_id = node.newtuntap(iface_data.id, iface_data.name) node.attachnet(iface_id, self) iface = node.get_iface(iface_id) iface.set_mac(iface_data.mac) for ip in iface_data.get_ips(): iface.add_ip(ip) if self.session.state == EventTypes.RUNTIME_STATE: self.session.emane.start_iface(self, iface) return iface