core-extra/daemon/core/configservices/nrlservices/services.py

202 lines
6.8 KiB
Python
Raw Normal View History

from typing import Any, Dict, List
import netaddr
from core import utils
from core.config import Configuration
from core.configservice.base import ConfigService, ConfigServiceMode
GROUP: str = "ProtoSvc"
class MgenSinkService(ConfigService):
name: str = "MGEN_Sink"
group: str = GROUP
directories: List[str] = []
files: List[str] = ["mgensink.sh", "sink.mgen"]
executables: List[str] = ["mgen"]
dependencies: List[str] = []
startup: List[str] = ["sh mgensink.sh"]
validate: List[str] = ["pidof mgen"]
shutdown: List[str] = ["killall mgen"]
validation_mode: ConfigServiceMode = ConfigServiceMode.BLOCKING
default_configs: List[Configuration] = []
modes: Dict[str, Dict[str, str]] = {}
def data(self) -> Dict[str, Any]:
ifnames = []
for iface in self.node.get_ifaces():
name = utils.sysctl_devname(iface.name)
ifnames.append(name)
return dict(ifnames=ifnames)
class NrlNhdp(ConfigService):
name: str = "NHDP"
group: str = GROUP
directories: List[str] = []
files: List[str] = ["nrlnhdp.sh"]
executables: List[str] = ["nrlnhdp"]
dependencies: List[str] = []
startup: List[str] = ["sh nrlnhdp.sh"]
validate: List[str] = ["pidof nrlnhdp"]
shutdown: List[str] = ["killall nrlnhdp"]
validation_mode: ConfigServiceMode = ConfigServiceMode.BLOCKING
default_configs: List[Configuration] = []
modes: Dict[str, Dict[str, str]] = {}
def data(self) -> Dict[str, Any]:
has_smf = "SMF" in self.node.config_services
ifnames = []
for iface in self.node.get_ifaces(control=False):
ifnames.append(iface.name)
return dict(has_smf=has_smf, ifnames=ifnames)
class NrlSmf(ConfigService):
name: str = "SMF"
group: str = GROUP
directories: List[str] = []
files: List[str] = ["startsmf.sh"]
executables: List[str] = ["nrlsmf", "killall"]
dependencies: List[str] = []
startup: List[str] = ["sh startsmf.sh"]
validate: List[str] = ["pidof nrlsmf"]
shutdown: List[str] = ["killall nrlsmf"]
validation_mode: ConfigServiceMode = ConfigServiceMode.BLOCKING
default_configs: List[Configuration] = []
modes: Dict[str, Dict[str, str]] = {}
def data(self) -> Dict[str, Any]:
has_arouted = "arouted" in self.node.config_services
has_nhdp = "NHDP" in self.node.config_services
has_olsr = "OLSR" in self.node.config_services
ifnames = []
ip4_prefix = None
for iface in self.node.get_ifaces(control=False):
ifnames.append(iface.name)
if ip4_prefix:
continue
for a in iface.addrlist:
a = a.split("/")[0]
if netaddr.valid_ipv4(a):
ip4_prefix = f"{a}/{24}"
break
return dict(
has_arouted=has_arouted,
has_nhdp=has_nhdp,
has_olsr=has_olsr,
ifnames=ifnames,
ip4_prefix=ip4_prefix,
)
class NrlOlsr(ConfigService):
name: str = "OLSR"
group: str = GROUP
directories: List[str] = []
files: List[str] = ["nrlolsrd.sh"]
executables: List[str] = ["nrlolsrd"]
dependencies: List[str] = []
startup: List[str] = ["sh nrlolsrd.sh"]
validate: List[str] = ["pidof nrlolsrd"]
shutdown: List[str] = ["killall nrlolsrd"]
validation_mode: ConfigServiceMode = ConfigServiceMode.BLOCKING
default_configs: List[Configuration] = []
modes: Dict[str, Dict[str, str]] = {}
def data(self) -> Dict[str, Any]:
has_smf = "SMF" in self.node.config_services
has_zebra = "zebra" in self.node.config_services
ifname = None
for iface in self.node.get_ifaces(control=False):
ifname = iface.name
break
return dict(has_smf=has_smf, has_zebra=has_zebra, ifname=ifname)
class NrlOlsrv2(ConfigService):
name: str = "OLSRv2"
group: str = GROUP
directories: List[str] = []
files: List[str] = ["nrlolsrv2.sh"]
executables: List[str] = ["nrlolsrv2"]
dependencies: List[str] = []
startup: List[str] = ["sh nrlolsrv2.sh"]
validate: List[str] = ["pidof nrlolsrv2"]
shutdown: List[str] = ["killall nrlolsrv2"]
validation_mode: ConfigServiceMode = ConfigServiceMode.BLOCKING
default_configs: List[Configuration] = []
modes: Dict[str, Dict[str, str]] = {}
def data(self) -> Dict[str, Any]:
has_smf = "SMF" in self.node.config_services
ifnames = []
for iface in self.node.get_ifaces(control=False):
ifnames.append(iface.name)
return dict(has_smf=has_smf, ifnames=ifnames)
class OlsrOrg(ConfigService):
name: str = "OLSRORG"
group: str = GROUP
directories: List[str] = ["/etc/olsrd"]
files: List[str] = ["olsrd.sh", "/etc/olsrd/olsrd.conf"]
executables: List[str] = ["olsrd"]
dependencies: List[str] = []
startup: List[str] = ["sh olsrd.sh"]
validate: List[str] = ["pidof olsrd"]
shutdown: List[str] = ["killall olsrd"]
validation_mode: ConfigServiceMode = ConfigServiceMode.BLOCKING
default_configs: List[Configuration] = []
modes: Dict[str, Dict[str, str]] = {}
def data(self) -> Dict[str, Any]:
has_smf = "SMF" in self.node.config_services
ifnames = []
for iface in self.node.get_ifaces(control=False):
ifnames.append(iface.name)
return dict(has_smf=has_smf, ifnames=ifnames)
class MgenActor(ConfigService):
name: str = "MgenActor"
group: str = GROUP
directories: List[str] = []
files: List[str] = ["start_mgen_actor.sh"]
executables: List[str] = ["mgen"]
dependencies: List[str] = []
startup: List[str] = ["sh start_mgen_actor.sh"]
validate: List[str] = ["pidof mgen"]
shutdown: List[str] = ["killall mgen"]
validation_mode: ConfigServiceMode = ConfigServiceMode.BLOCKING
default_configs: List[Configuration] = []
modes: Dict[str, Dict[str, str]] = {}
class Arouted(ConfigService):
name: str = "arouted"
group: str = GROUP
directories: List[str] = []
files: List[str] = ["startarouted.sh"]
executables: List[str] = ["arouted"]
dependencies: List[str] = []
startup: List[str] = ["sh startarouted.sh"]
validate: List[str] = ["pidof arouted"]
shutdown: List[str] = ["pkill arouted"]
validation_mode: ConfigServiceMode = ConfigServiceMode.BLOCKING
default_configs: List[Configuration] = []
modes: Dict[str, Dict[str, str]] = {}
def data(self) -> Dict[str, Any]:
ip4_prefix = None
for iface in self.node.get_ifaces(control=False):
if ip4_prefix:
continue
for a in iface.addrlist:
a = a.split("/")[0]
if netaddr.valid_ipv4(a):
ip4_prefix = f"{a}/{24}"
break
return dict(ip4_prefix=ip4_prefix)