daemon: updates to provide new logic for emane event services, creating one per unique control channel, added mapping for nems to associated service for generated events

This commit is contained in:
Blake Harnden 2021-05-24 21:41:05 -07:00
parent ef0fa8c1a7
commit bcd9cc7ac2
5 changed files with 146 additions and 142 deletions

View file

@ -7,7 +7,7 @@ import os
import threading
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type, Union
from core import utils
from core.emane.emanemodel import EmaneModel
@ -27,15 +27,19 @@ if TYPE_CHECKING:
from core.emulator.session import Session
try:
from emane.events import EventService, PathlossEvent
from emane.events import LocationEvent
from emane.events import EventService, PathlossEvent, CommEffectEvent, LocationEvent
from emane.events.eventserviceexception import EventServiceException
except ImportError:
try:
from emanesh.events import EventService
from emanesh.events import LocationEvent
from emanesh.events import (
EventService,
PathlossEvent,
CommEffectEvent,
LocationEvent,
)
from emanesh.events.eventserviceexception import EventServiceException
except ImportError:
CommEffectEvent = None
EventService = None
LocationEvent = None
PathlossEvent = None
@ -59,6 +63,59 @@ class StartData:
ifaces: List[CoreInterface] = field(default_factory=list)
class EmaneEventService:
def __init__(
self, manager: "EmaneManager", device: str, group: str, port: int
) -> None:
self.manager: "EmaneManager" = manager
self.device: str = device
self.group: str = group
self.port: int = port
self.running: bool = False
self.thread: Optional[threading.Thread] = None
logger.info("starting emane event service %s %s:%s", device, group, port)
self.events: EventService = EventService(
eventchannel=(group, port, device), otachannel=None
)
def start(self) -> None:
self.running = True
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
def run(self) -> None:
"""
Run and monitor events.
"""
logger.info("subscribing to emane location events")
while self.running:
_uuid, _seq, events = self.events.nextEvent()
# this occurs with 0.9.1 event service
if not self.running:
break
for event in events:
nem, eid, data = event
if eid == LocationEvent.IDENTIFIER:
self.manager.handlelocationevent(nem, eid, data)
logger.info("unsubscribing from emane location events")
def stop(self) -> None:
"""
Stop service and monitoring events.
"""
self.events.breakloop()
self.running = False
if self.thread:
self.thread.join()
self.thread = None
for fd in self.events._readFd, self.events._writeFd:
if fd >= 0:
os.close(fd)
for f in self.events._socket, self.events._socketOTA:
if f:
f.close()
class EmaneManager:
"""
EMANE controller object. Lives in a Session instance and is used for
@ -98,10 +155,10 @@ class EmaneManager:
# link monitor
self.link_monitor: EmaneLinkMonitor = EmaneLinkMonitor(self)
self.service: Optional[EventService] = None
# emane event monitoring
self.services: Dict[str, EmaneEventService] = {}
self.nem_service: Dict[int, EmaneEventService] = {}
self.eventchannel: Optional[Tuple[str, int, str]] = None
self.event_device: Optional[str] = None
def next_nem_id(self, iface: CoreInterface) -> int:
nem_id = self.session.options.get_config_int("nem_id_start")
@ -201,47 +258,6 @@ class EmaneManager:
self.node_configs.get(node_id, {}).clear()
self.node_models.pop(node_id, None)
def deleteeventservice(self) -> None:
if self.service:
for fd in self.service._readFd, self.service._writeFd:
if fd >= 0:
os.close(fd)
for f in self.service._socket, self.service._socketOTA:
if f:
f.close()
self.service = None
self.event_device = None
def initeventservice(self) -> None:
"""
Re-initialize the EMANE Event service.
The multicast group and/or port may be configured.
"""
# Get the control network to be used for events
group, port = "224.1.2.8:45703".split(":")
self.event_device = DEFAULT_DEV
eventnetidx = self.session.get_control_net_index(self.event_device)
if eventnetidx < 0:
logger.error(
"invalid emane event service device provided: %s", self.event_device
)
return
# make sure the event control network is in place
eventnet = self.session.add_remove_control_net(
net_index=eventnetidx, remove=False, conf_required=False
)
if eventnet is not None:
# direct EMANE events towards control net bridge
self.event_device = eventnet.brname
self.eventchannel = (group, int(port), self.event_device)
# disabled otachannel for event service
# only needed for e.g. antennaprofile events xmit by models
logger.info("using %s for emane event service", self.event_device)
try:
self.service = EventService(eventchannel=self.eventchannel, otachannel=None)
except EventServiceException:
logger.exception("error starting emane event service")
def add_node(self, emane_net: EmaneNet) -> None:
"""
Add EMANE network object to this manager.
@ -304,9 +320,6 @@ class EmaneManager:
status = self.setup()
if status != EmaneState.SUCCESS:
return status
self.initeventservice()
if self.service and self.doeventmonitor():
self.starteventmonitor()
self.startup_nodes()
if self.links_enabled():
self.link_monitor.start()
@ -319,9 +332,15 @@ class EmaneManager:
for data in start_data:
node = data.node
for iface in data.ifaces:
if isinstance(node, CoreNode):
self.setup_ota(node, iface)
emanexml.build_platform_xml(self, node, iface)
nem_id = self.next_nem_id(iface)
logger.info(
"starting emane for node(%s) iface(%s) nem(%s)",
node.name,
iface.name,
nem_id,
)
self.setup_control_channels(nem_id, node, iface)
emanexml.build_platform_xml(self, nem_id, node, iface)
self.start_daemon(node, iface)
self.install_iface(iface)
@ -347,7 +366,9 @@ class EmaneManager:
start_node.ifaces = sorted(start_node.ifaces, key=lambda x: x.node_id)
return start_nodes
def setup_ota(self, node: CoreNode, iface: CoreInterface) -> None:
def setup_control_channels(
self, nem_id: int, node: CoreNodeBase, iface: CoreInterface
) -> None:
if not isinstance(iface.net, EmaneNet):
raise CoreError(
f"emane interface not connected to emane net: {iface.net.name}"
@ -358,13 +379,35 @@ class EmaneManager:
otadev = config["otamanagerdevice"]
ota_index = self.session.get_control_net_index(otadev)
self.session.add_remove_control_net(ota_index, conf_required=False)
self.session.add_remove_control_iface(node, ota_index, conf_required=False)
if isinstance(node, CoreNode):
self.session.add_remove_control_iface(node, ota_index, conf_required=False)
# setup event device
eventgroup, _eventport = config["eventservicegroup"].split(":")
eventgroup, eventport = config["eventservicegroup"].split(":")
eventdev = config["eventservicedevice"]
event_index = self.session.get_control_net_index(eventdev)
self.session.add_remove_control_net(event_index, conf_required=False)
self.session.add_remove_control_iface(node, event_index, conf_required=False)
event_net = self.session.add_remove_control_net(
event_index, conf_required=False
)
if isinstance(node, CoreNode):
self.session.add_remove_control_iface(
node, event_index, conf_required=False
)
# initialize emane event services
service = self.services.get(event_net.brname)
if not service:
try:
service = EmaneEventService(
self, event_net.brname, eventgroup, int(eventport)
)
self.services[event_net.brname] = service
self.nem_service[nem_id] = service
except EventServiceException:
raise CoreError(
"failed to start emane event services "
f"{event_net.brname} {eventgroup}:{eventport}"
)
else:
self.nem_service[nem_id] = service
# setup multicast routes as needed
logger.info(
"node(%s) interface(%s) ota(%s:%s) event(%s:%s)",
@ -451,7 +494,11 @@ class EmaneManager:
node.cmd(kill_emaned, wait=False)
else:
node.host_cmd(kill_emaned, wait=False)
self.stopeventmonitor()
# stop emane event services
while self.services:
_, service = self.services.popitem()
service.stop()
self.nem_service.clear()
def check_node_models(self) -> None:
"""
@ -509,13 +556,6 @@ class EmaneManager:
Start one EMANE daemon per node having a radio.
Add a control network even if the user has not configured one.
"""
nem = self.get_nem_id(iface)
logger.info(
"starting emane daemon node(%s) iface(%s) nem(%s)",
node.name,
iface.name,
nem,
)
loglevel = str(DEFAULT_LOG_LEVEL)
cfgloglevel = self.session.options.get_config_int("emane_log_level")
realtime = self.session.options.get_config_bool("emane_realtime", default=True)
@ -572,48 +612,6 @@ class EmaneManager:
tmp = not self.doeventmonitor()
return tmp
def starteventmonitor(self) -> None:
"""
Start monitoring EMANE location events if configured to do so.
"""
logger.info("starting emane event monitor")
self.doeventloop = True
self.eventmonthread = threading.Thread(
target=self.eventmonitorloop, daemon=True
)
self.eventmonthread.start()
def stopeventmonitor(self) -> None:
"""
Stop monitoring EMANE location events.
"""
self.doeventloop = False
if self.service is not None:
self.service.breakloop()
if self.eventmonthread is not None:
self.eventmonthread.join()
self.eventmonthread = None
# reset the service, otherwise nextEvent won"t work
self.deleteeventservice()
def eventmonitorloop(self) -> None:
"""
Thread target that monitors EMANE location events.
"""
if self.service is None:
return
logger.info("subscribing to EMANE location events")
while self.doeventloop:
_uuid, _seq, events = self.service.nextEvent()
# this occurs with 0.9.1 event service
if not self.doeventloop:
break
for event in events:
nem, eid, data = event
if eid == LocationEvent.IDENTIFIER:
self.handlelocationevent(nem, eid, data)
logger.info("unsubscribing from EMANE location events")
def handlelocationevent(self, rxnemid: int, eid: int, data: str) -> None:
"""
Handle an EMANE location event.
@ -629,7 +627,6 @@ class EmaneManager:
):
logger.warning("dropped invalid location event")
continue
# yaw,pitch,roll,azimuth,elevation,velocity are unhandled
lat = attrs["latitude"]
lon = attrs["longitude"]
@ -724,5 +721,19 @@ class EmaneManager:
event = PathlossEvent()
event.append(nem1, forward=rx1)
event.append(nem2, forward=rx2)
self.service.publish(nem1, event)
self.service.publish(nem2, event)
self.publish_event(nem1, event)
self.publish_event(nem2, event)
def publish_event(
self,
nem_id: int,
event: Union[PathlossEvent, CommEffectEvent, LocationEvent],
send_all: bool = False,
) -> None:
service = self.nem_service.get(nem_id)
if not service:
logger.error("no service to publish event nem(%s)", nem_id)
return
if send_all:
nem_id = 0
service.events.publish(nem_id, event)

View file

@ -122,15 +122,9 @@ class EmaneCommEffectModel(emanemodel.EmaneModel):
Generate CommEffect events when a Link Message is received having
link parameters.
"""
service = self.session.emane.service
if service is None:
logger.warning("%s: EMANE event service unavailable", self.name)
return
if iface is None or iface2 is None:
logger.warning("%s: missing NEM information", self.name)
return
# TODO: batch these into multiple events per transmission
# TODO: may want to split out seconds portion of delay and jitter
event = CommEffectEvent()
@ -146,4 +140,4 @@ class EmaneCommEffectModel(emanemodel.EmaneModel):
unicast=int(convert_none(options.bandwidth)),
broadcast=int(convert_none(options.bandwidth)),
)
service.publish(nem2, event)
self.session.emane.publish_event(nem2, event)

View file

@ -59,8 +59,9 @@ class EmaneTdmaModel(emanemodel.EmaneModel):
logger.warning("ignoring invalid tdma schedule: %s", schedule)
return
# initiate tdma schedule
event_device = self.session.emane.event_device
logger.info(
"setting up tdma schedule: schedule(%s) device(%s)", schedule, event_device
)
utils.cmd(f"emaneevent-tdmaschedule -i {event_device} {schedule}")
for service in self.session.emane.services.values():
device = service.device
logger.info(
"setting up tdma schedule: schedule(%s) device(%s)", schedule, device
)
utils.cmd(f"emaneevent-tdmaschedule -i {device} {schedule}")

View file

@ -140,15 +140,12 @@ class EmaneNet(CoreNetworkBase):
:param iface: interface to set nem position for
"""
if self.session.emane.service is None:
logger.info("position service not available")
return
position = self._nem_position(iface)
if position:
nemid, lon, lat, alt = position
event = LocationEvent()
event.append(nemid, latitude=lat, longitude=lon, altitude=alt)
self.session.emane.service.publish(0, event)
self.session.emane.publish_event(nemid, event, send_all=True)
def setnempositions(self, moved_ifaces: List[CoreInterface]) -> None:
"""
@ -156,20 +153,21 @@ class EmaneNet(CoreNetworkBase):
calculation. Generate an EMANE Location Event having several
entries for each interface that has moved.
"""
if len(moved_ifaces) == 0:
if not moved_ifaces:
return
if self.session.emane.service is None:
logger.info("position service not available")
return
event = LocationEvent()
services = {}
for iface in moved_ifaces:
position = self._nem_position(iface)
if position:
nemid, lon, lat, alt = position
event.append(nemid, latitude=lat, longitude=lon, altitude=alt)
self.session.emane.service.publish(0, event)
if not position:
continue
nem_id, lon, lat, alt = position
service = self.session.emane.nem_service.get(nem_id)
if not service:
continue
event = services.setdefault(service, LocationEvent())
event.append(nem_id, latitude=lat, longitude=lon, altitude=alt)
for service, event in services.items():
service.events.publish(0, event)
def links(self, flags: MessageFlags = MessageFlags.NONE) -> List[LinkData]:
links = super().links(flags)

View file

@ -145,13 +145,14 @@ def add_configurations(
def build_platform_xml(
emane_manager: "EmaneManager", node: CoreNodeBase, iface: CoreInterface
emane_manager: "EmaneManager", nem_id: int, node: CoreNodeBase, iface: CoreInterface
) -> None:
"""
Create platform xml for a specific node.
:param emane_manager: emane manager with emane
configurations
:param nem_id: nem id for current node/interface
:param node: node to create a platform xml for
:param iface: node interface to create platform xml for
:return: the next nem id that can be used for creating platform xml files
@ -160,7 +161,6 @@ def build_platform_xml(
emane_net = iface.net
if not isinstance(emane_net, EmaneNet):
raise CoreError(f"emane interface not connected to emane net: {emane_net.name}")
nem_id = emane_manager.next_nem_id(iface)
config = emane_manager.get_iface_config(emane_net, iface)
emane_net.model.build_xml_files(config, iface)