diff --git a/daemon/core/emane/emanemanager.py b/daemon/core/emane/emanemanager.py index f61bf452..47279592 100644 --- a/daemon/core/emane/emanemanager.py +++ b/daemon/core/emane/emanemanager.py @@ -7,7 +7,7 @@ import os import threading from dataclasses import dataclass, field from enum import Enum -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type, Union from core import utils from core.emane.emanemodel import EmaneModel @@ -27,15 +27,19 @@ if TYPE_CHECKING: from core.emulator.session import Session try: - from emane.events import EventService, PathlossEvent - from emane.events import LocationEvent + from emane.events import EventService, PathlossEvent, CommEffectEvent, LocationEvent from emane.events.eventserviceexception import EventServiceException except ImportError: try: - from emanesh.events import EventService - from emanesh.events import LocationEvent + from emanesh.events import ( + EventService, + PathlossEvent, + CommEffectEvent, + LocationEvent, + ) from emanesh.events.eventserviceexception import EventServiceException except ImportError: + CommEffectEvent = None EventService = None LocationEvent = None PathlossEvent = None @@ -59,6 +63,59 @@ class StartData: ifaces: List[CoreInterface] = field(default_factory=list) +class EmaneEventService: + def __init__( + self, manager: "EmaneManager", device: str, group: str, port: int + ) -> None: + self.manager: "EmaneManager" = manager + self.device: str = device + self.group: str = group + self.port: int = port + self.running: bool = False + self.thread: Optional[threading.Thread] = None + logger.info("starting emane event service %s %s:%s", device, group, port) + self.events: EventService = EventService( + eventchannel=(group, port, device), otachannel=None + ) + + def start(self) -> None: + self.running = True + self.thread = threading.Thread(target=self.run, daemon=True) + self.thread.start() + + def run(self) -> None: + """ + Run and monitor events. + """ + logger.info("subscribing to emane location events") + while self.running: + _uuid, _seq, events = self.events.nextEvent() + # this occurs with 0.9.1 event service + if not self.running: + break + for event in events: + nem, eid, data = event + if eid == LocationEvent.IDENTIFIER: + self.manager.handlelocationevent(nem, eid, data) + logger.info("unsubscribing from emane location events") + + def stop(self) -> None: + """ + Stop service and monitoring events. + """ + self.events.breakloop() + self.running = False + if self.thread: + self.thread.join() + self.thread = None + for fd in self.events._readFd, self.events._writeFd: + if fd >= 0: + os.close(fd) + for f in self.events._socket, self.events._socketOTA: + if f: + f.close() + + class EmaneManager: """ EMANE controller object. Lives in a Session instance and is used for @@ -98,10 +155,10 @@ class EmaneManager: # link monitor self.link_monitor: EmaneLinkMonitor = EmaneLinkMonitor(self) - - self.service: Optional[EventService] = None + # emane event monitoring + self.services: Dict[str, EmaneEventService] = {} + self.nem_service: Dict[int, EmaneEventService] = {} self.eventchannel: Optional[Tuple[str, int, str]] = None - self.event_device: Optional[str] = None def next_nem_id(self, iface: CoreInterface) -> int: nem_id = self.session.options.get_config_int("nem_id_start") @@ -201,47 +258,6 @@ class EmaneManager: self.node_configs.get(node_id, {}).clear() self.node_models.pop(node_id, None) - def deleteeventservice(self) -> None: - if self.service: - for fd in self.service._readFd, self.service._writeFd: - if fd >= 0: - os.close(fd) - for f in self.service._socket, self.service._socketOTA: - if f: - f.close() - self.service = None - self.event_device = None - - def initeventservice(self) -> None: - """ - Re-initialize the EMANE Event service. - The multicast group and/or port may be configured. - """ - # Get the control network to be used for events - group, port = "224.1.2.8:45703".split(":") - self.event_device = DEFAULT_DEV - eventnetidx = self.session.get_control_net_index(self.event_device) - if eventnetidx < 0: - logger.error( - "invalid emane event service device provided: %s", self.event_device - ) - return - # make sure the event control network is in place - eventnet = self.session.add_remove_control_net( - net_index=eventnetidx, remove=False, conf_required=False - ) - if eventnet is not None: - # direct EMANE events towards control net bridge - self.event_device = eventnet.brname - self.eventchannel = (group, int(port), self.event_device) - # disabled otachannel for event service - # only needed for e.g. antennaprofile events xmit by models - logger.info("using %s for emane event service", self.event_device) - try: - self.service = EventService(eventchannel=self.eventchannel, otachannel=None) - except EventServiceException: - logger.exception("error starting emane event service") - def add_node(self, emane_net: EmaneNet) -> None: """ Add EMANE network object to this manager. @@ -304,9 +320,6 @@ class EmaneManager: status = self.setup() if status != EmaneState.SUCCESS: return status - self.initeventservice() - if self.service and self.doeventmonitor(): - self.starteventmonitor() self.startup_nodes() if self.links_enabled(): self.link_monitor.start() @@ -319,9 +332,15 @@ class EmaneManager: for data in start_data: node = data.node for iface in data.ifaces: - if isinstance(node, CoreNode): - self.setup_ota(node, iface) - emanexml.build_platform_xml(self, node, iface) + nem_id = self.next_nem_id(iface) + logger.info( + "starting emane for node(%s) iface(%s) nem(%s)", + node.name, + iface.name, + nem_id, + ) + self.setup_control_channels(nem_id, node, iface) + emanexml.build_platform_xml(self, nem_id, node, iface) self.start_daemon(node, iface) self.install_iface(iface) @@ -347,7 +366,9 @@ class EmaneManager: start_node.ifaces = sorted(start_node.ifaces, key=lambda x: x.node_id) return start_nodes - def setup_ota(self, node: CoreNode, iface: CoreInterface) -> None: + def setup_control_channels( + self, nem_id: int, node: CoreNodeBase, iface: CoreInterface + ) -> None: if not isinstance(iface.net, EmaneNet): raise CoreError( f"emane interface not connected to emane net: {iface.net.name}" @@ -358,13 +379,35 @@ class EmaneManager: otadev = config["otamanagerdevice"] ota_index = self.session.get_control_net_index(otadev) self.session.add_remove_control_net(ota_index, conf_required=False) - self.session.add_remove_control_iface(node, ota_index, conf_required=False) + if isinstance(node, CoreNode): + self.session.add_remove_control_iface(node, ota_index, conf_required=False) # setup event device - eventgroup, _eventport = config["eventservicegroup"].split(":") + eventgroup, eventport = config["eventservicegroup"].split(":") eventdev = config["eventservicedevice"] event_index = self.session.get_control_net_index(eventdev) - self.session.add_remove_control_net(event_index, conf_required=False) - self.session.add_remove_control_iface(node, event_index, conf_required=False) + event_net = self.session.add_remove_control_net( + event_index, conf_required=False + ) + if isinstance(node, CoreNode): + self.session.add_remove_control_iface( + node, event_index, conf_required=False + ) + # initialize emane event services + service = self.services.get(event_net.brname) + if not service: + try: + service = EmaneEventService( + self, event_net.brname, eventgroup, int(eventport) + ) + self.services[event_net.brname] = service + self.nem_service[nem_id] = service + except EventServiceException: + raise CoreError( + "failed to start emane event services " + f"{event_net.brname} {eventgroup}:{eventport}" + ) + else: + self.nem_service[nem_id] = service # setup multicast routes as needed logger.info( "node(%s) interface(%s) ota(%s:%s) event(%s:%s)", @@ -451,7 +494,11 @@ class EmaneManager: node.cmd(kill_emaned, wait=False) else: node.host_cmd(kill_emaned, wait=False) - self.stopeventmonitor() + # stop emane event services + while self.services: + _, service = self.services.popitem() + service.stop() + self.nem_service.clear() def check_node_models(self) -> None: """ @@ -509,13 +556,6 @@ class EmaneManager: Start one EMANE daemon per node having a radio. Add a control network even if the user has not configured one. """ - nem = self.get_nem_id(iface) - logger.info( - "starting emane daemon node(%s) iface(%s) nem(%s)", - node.name, - iface.name, - nem, - ) loglevel = str(DEFAULT_LOG_LEVEL) cfgloglevel = self.session.options.get_config_int("emane_log_level") realtime = self.session.options.get_config_bool("emane_realtime", default=True) @@ -572,48 +612,6 @@ class EmaneManager: tmp = not self.doeventmonitor() return tmp - def starteventmonitor(self) -> None: - """ - Start monitoring EMANE location events if configured to do so. - """ - logger.info("starting emane event monitor") - self.doeventloop = True - self.eventmonthread = threading.Thread( - target=self.eventmonitorloop, daemon=True - ) - self.eventmonthread.start() - - def stopeventmonitor(self) -> None: - """ - Stop monitoring EMANE location events. - """ - self.doeventloop = False - if self.service is not None: - self.service.breakloop() - if self.eventmonthread is not None: - self.eventmonthread.join() - self.eventmonthread = None - # reset the service, otherwise nextEvent won"t work - self.deleteeventservice() - - def eventmonitorloop(self) -> None: - """ - Thread target that monitors EMANE location events. - """ - if self.service is None: - return - logger.info("subscribing to EMANE location events") - while self.doeventloop: - _uuid, _seq, events = self.service.nextEvent() - # this occurs with 0.9.1 event service - if not self.doeventloop: - break - for event in events: - nem, eid, data = event - if eid == LocationEvent.IDENTIFIER: - self.handlelocationevent(nem, eid, data) - logger.info("unsubscribing from EMANE location events") - def handlelocationevent(self, rxnemid: int, eid: int, data: str) -> None: """ Handle an EMANE location event. @@ -629,7 +627,6 @@ class EmaneManager: ): logger.warning("dropped invalid location event") continue - # yaw,pitch,roll,azimuth,elevation,velocity are unhandled lat = attrs["latitude"] lon = attrs["longitude"] @@ -724,5 +721,19 @@ class EmaneManager: event = PathlossEvent() event.append(nem1, forward=rx1) event.append(nem2, forward=rx2) - self.service.publish(nem1, event) - self.service.publish(nem2, event) + self.publish_event(nem1, event) + self.publish_event(nem2, event) + + def publish_event( + self, + nem_id: int, + event: Union[PathlossEvent, CommEffectEvent, LocationEvent], + send_all: bool = False, + ) -> None: + service = self.nem_service.get(nem_id) + if not service: + logger.error("no service to publish event nem(%s)", nem_id) + return + if send_all: + nem_id = 0 + service.events.publish(nem_id, event) diff --git a/daemon/core/emane/models/commeffect.py b/daemon/core/emane/models/commeffect.py index b73dc837..c3f0b07b 100644 --- a/daemon/core/emane/models/commeffect.py +++ b/daemon/core/emane/models/commeffect.py @@ -122,15 +122,9 @@ class EmaneCommEffectModel(emanemodel.EmaneModel): Generate CommEffect events when a Link Message is received having link parameters. """ - service = self.session.emane.service - if service is None: - logger.warning("%s: EMANE event service unavailable", self.name) - return - if iface is None or iface2 is None: logger.warning("%s: missing NEM information", self.name) return - # TODO: batch these into multiple events per transmission # TODO: may want to split out seconds portion of delay and jitter event = CommEffectEvent() @@ -146,4 +140,4 @@ class EmaneCommEffectModel(emanemodel.EmaneModel): unicast=int(convert_none(options.bandwidth)), broadcast=int(convert_none(options.bandwidth)), ) - service.publish(nem2, event) + self.session.emane.publish_event(nem2, event) diff --git a/daemon/core/emane/models/tdma.py b/daemon/core/emane/models/tdma.py index 0ba756e4..62843ec1 100644 --- a/daemon/core/emane/models/tdma.py +++ b/daemon/core/emane/models/tdma.py @@ -59,8 +59,9 @@ class EmaneTdmaModel(emanemodel.EmaneModel): logger.warning("ignoring invalid tdma schedule: %s", schedule) return # initiate tdma schedule - event_device = self.session.emane.event_device - logger.info( - "setting up tdma schedule: schedule(%s) device(%s)", schedule, event_device - ) - utils.cmd(f"emaneevent-tdmaschedule -i {event_device} {schedule}") + for service in self.session.emane.services.values(): + device = service.device + logger.info( + "setting up tdma schedule: schedule(%s) device(%s)", schedule, device + ) + utils.cmd(f"emaneevent-tdmaschedule -i {device} {schedule}") diff --git a/daemon/core/emane/nodes.py b/daemon/core/emane/nodes.py index 1e43723b..12caf408 100644 --- a/daemon/core/emane/nodes.py +++ b/daemon/core/emane/nodes.py @@ -140,15 +140,12 @@ class EmaneNet(CoreNetworkBase): :param iface: interface to set nem position for """ - if self.session.emane.service is None: - logger.info("position service not available") - return position = self._nem_position(iface) if position: nemid, lon, lat, alt = position event = LocationEvent() event.append(nemid, latitude=lat, longitude=lon, altitude=alt) - self.session.emane.service.publish(0, event) + self.session.emane.publish_event(nemid, event, send_all=True) def setnempositions(self, moved_ifaces: List[CoreInterface]) -> None: """ @@ -156,20 +153,21 @@ class EmaneNet(CoreNetworkBase): calculation. Generate an EMANE Location Event having several entries for each interface that has moved. """ - if len(moved_ifaces) == 0: + if not moved_ifaces: return - - if self.session.emane.service is None: - logger.info("position service not available") - return - - event = LocationEvent() + services = {} for iface in moved_ifaces: position = self._nem_position(iface) - if position: - nemid, lon, lat, alt = position - event.append(nemid, latitude=lat, longitude=lon, altitude=alt) - self.session.emane.service.publish(0, event) + if not position: + continue + nem_id, lon, lat, alt = position + service = self.session.emane.nem_service.get(nem_id) + if not service: + continue + event = services.setdefault(service, LocationEvent()) + event.append(nem_id, latitude=lat, longitude=lon, altitude=alt) + for service, event in services.items(): + service.events.publish(0, event) def links(self, flags: MessageFlags = MessageFlags.NONE) -> List[LinkData]: links = super().links(flags) diff --git a/daemon/core/xml/emanexml.py b/daemon/core/xml/emanexml.py index e26d0488..9d0753cb 100644 --- a/daemon/core/xml/emanexml.py +++ b/daemon/core/xml/emanexml.py @@ -145,13 +145,14 @@ def add_configurations( def build_platform_xml( - emane_manager: "EmaneManager", node: CoreNodeBase, iface: CoreInterface + emane_manager: "EmaneManager", nem_id: int, node: CoreNodeBase, iface: CoreInterface ) -> None: """ Create platform xml for a specific node. :param emane_manager: emane manager with emane configurations + :param nem_id: nem id for current node/interface :param node: node to create a platform xml for :param iface: node interface to create platform xml for :return: the next nem id that can be used for creating platform xml files @@ -160,7 +161,6 @@ def build_platform_xml( emane_net = iface.net if not isinstance(emane_net, EmaneNet): raise CoreError(f"emane interface not connected to emane net: {emane_net.name}") - nem_id = emane_manager.next_nem_id(iface) config = emane_manager.get_iface_config(emane_net, iface) emane_net.model.build_xml_files(config, iface)