Merge pull request #416 from coreemu/enhancement/emane-links

Enhancement/emane links
This commit is contained in:
bharnden 2020-03-30 12:59:44 -07:00 committed by GitHub
commit 5d9b451b1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 397 additions and 30 deletions

View file

@ -19,11 +19,11 @@ jobs:
cp setup.py.in setup.py
cp core/constants.py.in core/constants.py
sed -i 's/True/False/g' core/constants.py
pipenv install --dev
pipenv sync --dev
- name: isort
run: |
cd daemon
pipenv run isort -c
pipenv run isort -c -df
- name: black
run: |
cd daemon

View file

@ -15,6 +15,7 @@ from core.emane.bypass import EmaneBypassModel
from core.emane.commeffect import EmaneCommEffectModel
from core.emane.emanemodel import EmaneModel
from core.emane.ieee80211abg import EmaneIeee80211abgModel
from core.emane.linkmonitor import EmaneLinkMonitor
from core.emane.nodes import EmaneNet
from core.emane.rfpipe import EmaneRfPipeModel
from core.emane.tdma import EmaneTdmaModel
@ -28,7 +29,6 @@ from core.xml import emanexml
if TYPE_CHECKING:
from core.emulator.session import Session
try:
from emane.events import EventService
from emane.events import LocationEvent
@ -90,6 +90,9 @@ class EmaneManager(ModelManager):
self.emane_config = EmaneGlobalModel(session)
self.set_configs(self.emane_config.default_values())
# link monitor
self.link_monitor = EmaneLinkMonitor(self)
self.service = None
self.eventchannel = None
self.event_device = None
@ -349,9 +352,13 @@ class EmaneManager(ModelManager):
f.write(f"{nodename} {ifname} {nemid}\n")
except IOError:
logging.exception("Error writing EMANE NEMs file: %s")
if self.links_enabled():
self.link_monitor.start()
return EmaneManager.SUCCESS
def links_enabled(self) -> bool:
return self.get_config("link_enabled") == "1"
def poststartup(self) -> None:
"""
Retransmit location events now that all NEMs are active.
@ -393,7 +400,9 @@ class EmaneManager(ModelManager):
with self._emane_node_lock:
if not self._emane_nets:
return
logging.info("stopping EMANE daemons.")
logging.info("stopping EMANE daemons")
if self.links_enabled():
self.link_monitor.stop()
self.deinstallnetifs()
self.stopdaemons()
self.stopeventmonitor()
@ -834,13 +843,43 @@ class EmaneGlobalModel:
def __init__(self, session: "Session") -> None:
self.session = session
self.nem_config = [
self.core_config = [
Configuration(
_id="platform_id_start",
_type=ConfigDataTypes.INT32,
default="1",
label="Starting Platform ID",
),
Configuration(
_id="nem_id_start",
_type=ConfigDataTypes.INT32,
default="1",
label="Starting NEM ID (core)",
)
label="Starting NEM ID",
),
Configuration(
_id="link_enabled",
_type=ConfigDataTypes.BOOL,
default="1",
label="Enable Links?",
),
Configuration(
_id="loss_threshold",
_type=ConfigDataTypes.INT32,
default="30",
label="Link Loss Threshold (%)",
),
Configuration(
_id="link_interval",
_type=ConfigDataTypes.INT32,
default="1",
label="Link Check Interval (sec)",
),
Configuration(
_id="link_timeout",
_type=ConfigDataTypes.INT32,
default="4",
label="Link Timeout (sec)",
),
]
self.emulator_config = None
self.parse_config()
@ -857,25 +896,16 @@ class EmaneGlobalModel:
"otamanagergroup": "224.1.2.8:45702",
}
self.emulator_config = emanemanifest.parse(emulator_xml, emulator_defaults)
self.emulator_config.insert(
0,
Configuration(
_id="platform_id_start",
_type=ConfigDataTypes.INT32,
default="1",
label="Starting Platform ID (core)",
),
)
def configurations(self) -> List[Configuration]:
return self.emulator_config + self.nem_config
return self.emulator_config + self.core_config
def config_groups(self) -> List[ConfigGroup]:
emulator_len = len(self.emulator_config)
config_len = len(self.configurations())
return [
ConfigGroup("Platform Attributes", 1, emulator_len),
ConfigGroup("NEM Parameters", emulator_len + 1, config_len),
ConfigGroup("CORE Configuration", emulator_len + 1, config_len),
]
def default_values(self) -> Dict[str, str]:

View file

@ -0,0 +1,312 @@
import logging
import sched
import threading
import time
from typing import TYPE_CHECKING, Dict, List, Tuple
import netaddr
from lxml import etree
from core.emulator.data import LinkData
from core.emulator.enumerations import LinkTypes, MessageFlags
from core.nodes.network import CtrlNet
try:
from emane import shell
except ImportError:
try:
from emanesh import shell
except ImportError:
logging.debug("compatible emane python bindings not installed")
if TYPE_CHECKING:
from core.emane.emanemanager import EmaneManager
DEFAULT_PORT = 47_000
MAC_COMPONENT_INDEX = 1
EMANE_RFPIPE = "rfpipemaclayer"
EMANE_80211 = "ieee80211abgmaclayer"
EMANE_TDMA = "tdmaeventschedulerradiomodel"
SINR_TABLE = "NeighborStatusTable"
NEM_SELF = 65535
class LossTable:
def __init__(self, losses: Dict[float, float]) -> None:
self.losses = losses
self.sinrs = sorted(self.losses.keys())
self.loss_lookup = {}
for index, value in enumerate(self.sinrs):
self.loss_lookup[index] = self.losses[value]
self.mac_id = None
def get_loss(self, sinr: float) -> float:
index = self._get_index(sinr)
loss = 100.0 - self.loss_lookup[index]
return loss
def _get_index(self, current_sinr: float) -> int:
for index, sinr in enumerate(self.sinrs):
if current_sinr <= sinr:
return index
return len(self.sinrs) - 1
class EmaneLink:
def __init__(self, from_nem: int, to_nem: int, sinr: float) -> None:
self.from_nem = from_nem
self.to_nem = to_nem
self.sinr = sinr
self.last_seen = None
self.touch()
def update(self, sinr: float) -> None:
self.sinr = sinr
self.touch()
def touch(self) -> None:
self.last_seen = time.monotonic()
def is_dead(self, timeout: int) -> bool:
return (time.monotonic() - self.last_seen) >= timeout
def __repr__(self) -> str:
return f"EmaneLink({self.from_nem}, {self.to_nem}, {self.sinr})"
class EmaneClient:
def __init__(self, address: str) -> None:
self.address = address
self.client = shell.ControlPortClient(self.address, DEFAULT_PORT)
self.nems = {}
self.setup()
def setup(self) -> None:
manifest = self.client.getManifest()
for nem_id, components in manifest.items():
# get mac config
mac_id, _, emane_model = components[MAC_COMPONENT_INDEX]
mac_config = self.client.getConfiguration(mac_id)
logging.debug(
"address(%s) nem(%s) emane(%s)", self.address, nem_id, emane_model
)
# create loss table based on current configuration
if emane_model == EMANE_80211:
loss_table = self.handle_80211(mac_config)
elif emane_model == EMANE_RFPIPE:
loss_table = self.handle_rfpipe(mac_config)
else:
logging.warning("unknown emane link model: %s", emane_model)
continue
logging.info("monitoring links nem(%s) model(%s)", nem_id, emane_model)
loss_table.mac_id = mac_id
self.nems[nem_id] = loss_table
def check_links(
self, links: Dict[Tuple[int, int], EmaneLink], loss_threshold: int
) -> None:
for from_nem, loss_table in self.nems.items():
tables = self.client.getStatisticTable(loss_table.mac_id, (SINR_TABLE,))
table = tables[SINR_TABLE][1:][0]
for row in table:
row = row
to_nem = row[0][0]
sinr = row[5][0]
age = row[-1][0]
# exclude invalid links
is_self = to_nem == NEM_SELF
has_valid_age = 0 <= age <= 1
if is_self or not has_valid_age:
continue
# check if valid link loss
link_key = (from_nem, to_nem)
loss = loss_table.get_loss(sinr)
if loss < loss_threshold:
link = links.get(link_key)
if link:
link.update(sinr)
else:
link = EmaneLink(from_nem, to_nem, sinr)
links[link_key] = link
def handle_tdma(self, config: Dict[str, Tuple]):
pcr = config["pcrcurveuri"][0][0]
logging.debug("tdma pcr: %s", pcr)
def handle_80211(self, config: Dict[str, Tuple]) -> LossTable:
unicastrate = config["unicastrate"][0][0]
pcr = config["pcrcurveuri"][0][0]
logging.debug("80211 pcr: %s", pcr)
tree = etree.parse(pcr)
root = tree.getroot()
table = root.find("table")
losses = {}
for rate in table.iter("datarate"):
index = int(rate.get("index"))
if index == unicastrate:
for row in rate.iter("row"):
sinr = float(row.get("sinr"))
por = float(row.get("por"))
losses[sinr] = por
return LossTable(losses)
def handle_rfpipe(self, config: Dict[str, Tuple]) -> LossTable:
pcr = config["pcrcurveuri"][0][0]
logging.debug("rfpipe pcr: %s", pcr)
tree = etree.parse(pcr)
root = tree.getroot()
table = root.find("table")
losses = {}
for row in table.iter("row"):
sinr = float(row.get("sinr"))
por = float(row.get("por"))
losses[sinr] = por
return LossTable(losses)
def stop(self) -> None:
self.client.stop()
class EmaneLinkMonitor:
def __init__(self, emane_manager: "EmaneManager") -> None:
self.emane_manager = emane_manager
self.clients = []
self.links = {}
self.complete_links = set()
self.loss_threshold = None
self.link_interval = None
self.link_timeout = None
self.scheduler = None
self.running = False
def start(self) -> None:
self.loss_threshold = int(self.emane_manager.get_config("loss_threshold"))
self.link_interval = int(self.emane_manager.get_config("link_interval"))
self.link_timeout = int(self.emane_manager.get_config("link_timeout"))
self.initialize()
if not self.clients:
logging.info("no valid emane models to monitor links")
return
self.scheduler = sched.scheduler()
self.scheduler.enter(0, 0, self.check_links)
self.running = True
thread = threading.Thread(target=self.scheduler.run, daemon=True)
thread.start()
def initialize(self) -> None:
addresses = self.get_addresses()
for address in addresses:
client = EmaneClient(address)
if client.nems:
self.clients.append(client)
def get_addresses(self) -> List[str]:
addresses = []
nodes = self.emane_manager.getnodes()
for node in nodes:
for netif in node.netifs():
if isinstance(netif.net, CtrlNet):
ip4 = None
for x in netif.addrlist:
address, prefix = x.split("/")
if netaddr.valid_ipv4(address):
ip4 = address
if ip4:
addresses.append(ip4)
break
return addresses
def check_links(self) -> None:
# check for new links
previous_links = set(self.links.keys())
for client in self.clients:
try:
client.check_links(self.links, self.loss_threshold)
except shell.ControlPortException:
if self.running:
logging.exception("link monitor error")
# find new links
current_links = set(self.links.keys())
new_links = current_links - previous_links
# find dead links
dead_links = []
for link_id, link in self.links.items():
if link.is_dead(self.link_timeout):
dead_links.append(link_id)
# announce dead links
for link_id in dead_links:
del self.links[link_id]
complete_id = self.get_complete_id(link_id)
if complete_id in self.complete_links:
self.complete_links.remove(complete_id)
self.send_link(MessageFlags.DELETE, complete_id)
# announce new links
for link_id in new_links:
complete_id = self.get_complete_id(link_id)
if complete_id in self.complete_links:
continue
if self.is_complete_link(link_id):
self.complete_links.add(complete_id)
self.send_link(MessageFlags.ADD, complete_id)
if self.running:
self.scheduler.enter(self.link_interval, 0, self.check_links)
def get_complete_id(self, link_id: Tuple[int, int]) -> Tuple[int, int]:
value_one, value_two = link_id
if value_one < value_two:
return value_one, value_two
else:
return value_two, value_one
def is_complete_link(self, link_id: Tuple[int, int]) -> bool:
reverse_id = link_id[1], link_id[0]
return link_id in self.links and reverse_id in self.links
def send_link(self, message_type: MessageFlags, link_id: Tuple[int, int]) -> None:
nem_one, nem_two = link_id
emane_one, netif = self.emane_manager.nemlookup(nem_one)
if not emane_one or not netif:
logging.error("invalid nem: %s", nem_one)
return
node_one = netif.node
emane_two, netif = self.emane_manager.nemlookup(nem_two)
if not emane_two or not netif:
logging.error("invalid nem: %s", nem_two)
return
node_two = netif.node
logging.debug(
"%s emane link from %s(%s) to %s(%s)",
message_type.name,
node_one.name,
nem_one,
node_two.name,
nem_two,
)
self.send_message(message_type, node_one.id, node_two.id, emane_one.id)
def send_message(self, message_type, node_one, node_two, emane_id) -> None:
link_data = LinkData(
message_type=message_type,
node1_id=node_one,
node2_id=node_two,
network_id=emane_id,
link_type=LinkTypes.WIRELESS,
)
self.emane_manager.session.broadcast_link(link_data)
def stop(self) -> None:
self.running = False
for client in self.clients:
client.stop()
self.clients.clear()
self.links.clear()
self.complete_links.clear()

View file

@ -21,11 +21,18 @@ if TYPE_CHECKING:
from core.emulator.session import Session
def link_data_params(link_data: LinkData) -> Tuple[int, int, bool]:
def link_data_params(link_data: LinkData) -> Tuple[int, int, bool, int]:
node_one = link_data.node1_id
node_two = link_data.node2_id
is_wireless = link_data.link_type == LinkTypes.WIRELESS
return node_one, node_two, is_wireless
network_id = link_data.network_id
return node_one, node_two, is_wireless, network_id
CORE_LAYER = "CORE"
NODE_LAYER = "CORE::Nodes"
LINK_LAYER = "CORE::Links"
CORE_LAYERS = [CORE_LAYER, LINK_LAYER, NODE_LAYER]
class Sdt:
@ -200,6 +207,10 @@ class Sdt:
:return: nothing
"""
nets = []
# create layers
for layer in CORE_LAYERS:
self.cmd(f"layer {layer}")
with self.session._nodes_lock:
for node_id in self.session.nodes:
node = self.session.nodes[node_id]
@ -253,7 +264,10 @@ class Sdt:
icon = icon.replace("$CORE_DATA_DIR", constants.CORE_DATA_DIR)
icon = icon.replace("$CORE_CONF_DIR", constants.CORE_CONF_DIR)
self.cmd(f"sprite {node_type} image {icon}")
self.cmd(f'node {node.id} type {node_type} label on,"{node.name}" {pos}')
self.cmd(
f'node {node.id} nodeLayer "{NODE_LAYER}" '
f'type {node_type} label on,"{node.name}" {pos}'
)
def edit_node(self, node: NodeBase, lon: float, lat: float, alt: float) -> None:
"""
@ -333,13 +347,16 @@ class Sdt:
pass
return result
def add_link(self, node_one: int, node_two: int, is_wireless: bool) -> None:
def add_link(
self, node_one: int, node_two: int, is_wireless: bool, network_id: int = None
) -> None:
"""
Handle adding a link in SDT.
:param node_one: node one id
:param node_two: node two id
:param is_wireless: True if link is wireless, False otherwise
:param network_id: network link is associated with
:return: nothing
"""
logging.debug("sdt add link: %s, %s, %s", node_one, node_two, is_wireless)
@ -351,7 +368,15 @@ class Sdt:
attr = "green,2"
else:
attr = "red,2"
self.cmd(f"link {node_one},{node_two} line {attr}")
link_id = f"{node_one}-{node_two}"
if network_id is not None:
link_id = f"{link_id}-{network_id}"
layer = LINK_LAYER
if network_id:
node = self.session.nodes[network_id]
network_name = node.name
layer = f"{layer}::{network_name}"
self.cmd(f"link {node_one},{node_two},{link_id} linkLayer {layer} line {attr}")
def delete_link(self, node_one: int, node_two: int) -> None:
"""

View file

@ -75,10 +75,10 @@ def create_emane_config(session: "Session") -> etree.Element:
for emulator_config in session.emane.emane_config.emulator_config:
value = config[emulator_config.id]
add_configuration(emulator_element, emulator_config.id, value)
nem_element = etree.SubElement(emane_configuration, "nem")
for nem_config in session.emane.emane_config.nem_config:
value = config[nem_config.id]
add_configuration(nem_element, nem_config.id, value)
core_element = etree.SubElement(emane_configuration, "core")
for core_config in session.emane.emane_config.core_config:
value = config[core_config.id]
add_configuration(core_element, core_config.id, value)
return emane_configuration
@ -737,8 +737,8 @@ class CoreXmlReader:
name = config.get("name")
value = config.get("value")
configs[name] = value
nem_configuration = emane_global_configuration.find("nem")
for config in nem_configuration.iterchildren():
core_configuration = emane_global_configuration.find("core")
for config in core_configuration.iterchildren():
name = config.get("name")
value = config.get("value")
configs[name] = value