# core-extra/daemon/core/services/coreservices.py
#
# 773 lines
# 26 KiB
# Python

"""
Definition of CoreService class that is subclassed to define
startup services and routing for nodes. A service is typically a daemon
program launched when a node starts that provides some sort of service.
The CoreServices class handles configuration messages for sending
a list of available services to the GUI and for configuring individual
services.
"""
import enum
import logging
import pkgutil
import time
from collections.abc import Iterable
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union
from core import services as core_services
from core import utils
from core.emulator.data import FileData
from core.emulator.enumerations import ExceptionLevels, MessageFlags, RegisterTlvs
from core.errors import (
CoreCommandError,
CoreError,
CoreServiceBootError,
CoreServiceError,
)
from core.nodes.base import CoreNode
# module-level logger named after this module
logger = logging.getLogger(__name__)

# Session is imported for type checking only; it is referenced through string
# annotations within this module
if TYPE_CHECKING:
    from core.emulator.session import Session

# a service may be referenced either as an instance or as its class
CoreServiceType = Union["CoreService", type["CoreService"]]
class ServiceMode(enum.Enum):
    """
    Modes used to decide how the startup of a service is validated.
    """

    # startup commands are run with wait enabled, no validation commands are run
    BLOCKING = 0
    # validation commands are attempted periodically, up to the validation timer
    NON_BLOCKING = 1
    # sleep for the validation timer after startup, no validation commands are run
    TIMER = 2
class ServiceDependencies:
    """
    Can generate boot paths for services, based on their dependencies. Will validate
    that all services will be booted and that all dependencies exist within the services
    provided.
    """

    def __init__(self, services: list["CoreServiceType"]) -> None:
        """
        Index the provided services by name and determine the root services.

        A root is a service that no other provided service lists as a dependency.

        :param services: services to generate boot paths for
        :raises ValueError: when services were provided, but none of them is a
            root, meaning a circular dependency is present
        """
        # names of services that have already been searched
        self.visited: set[str] = set()
        # provided services by name
        self.services: dict[str, "CoreServiceType"] = {}
        # service name to the boot path the service was placed in
        self.paths: dict[str, list["CoreServiceType"]] = {}
        # every distinct boot path created so far
        self.boot_paths: list[list["CoreServiceType"]] = []
        roots = {x.name for x in services}
        for service in services:
            self.services[service.name] = service
            roots.difference_update(service.dependencies)
        self.roots: list["CoreServiceType"] = [x for x in services if x.name in roots]
        if services and not self.roots:
            raise ValueError("circular dependency is present")

    def _search(
        self,
        service: "CoreServiceType",
        visiting: Optional[set[str]] = None,
        path: Optional[list["CoreServiceType"]] = None,
    ) -> list["CoreServiceType"]:
        """
        Depth first search that places a service, after its dependencies, into a
        boot path.

        :param service: service to place into a boot path
        :param visiting: names of services on the current search stack, used to
            detect circular dependencies
        :param path: boot path being built by the caller, if any
        :return: boot path the service was placed in
        :raises ValueError: when a dependency was not provided or a circular
            dependency is found
        """
        # service was already placed by an earlier search, reuse its path
        if service.name in self.visited:
            return self.paths[service.name]
        self.visited.add(service.name)
        if visiting is None:
            visiting = set()
        visiting.add(service.name)
        if path is None:
            # join the path of the first dependency that already has one
            for dependency in service.dependencies:
                path = self.paths.get(dependency)
                if path is not None:
                    break
        for dependency in service.dependencies:
            service_dependency = self.services.get(dependency)
            if service_dependency is None:
                raise ValueError(f"required dependency was not provided: {dependency}")
            if dependency in visiting:
                raise ValueError(f"circular dependency, already visited: {dependency}")
            path = self._search(service_dependency, visiting, path)
        visiting.remove(service.name)
        if path is None:
            # no dependency provided a path, start a new boot path
            path = []
            self.boot_paths.append(path)
        path.append(service)
        self.paths[service.name] = path
        return path

    def boot_order(self) -> list[list["CoreServiceType"]]:
        """
        Generate the boot paths for the provided services.

        :return: list of boot paths, each one an ordered list of services
        :raises ValueError: when a dependency was not provided, a circular
            dependency is found, or some services can never be booted
        """
        for service in self.roots:
            self._search(service)
        # services never reached from a root would silently never be booted,
        # which only happens when they are part of (or depend on) a cycle
        unreachable = [x for x in self.services if x not in self.visited]
        if unreachable:
            names = ", ".join(unreachable)
            raise ValueError(
                f"circular dependency is present, services cannot be booted: {names}"
            )
        return self.boot_paths
class ServiceManager:
    """
    Manages services available for CORE nodes to use.
    """

    # loaded service classes by service name, shared by all users of this class
    services: dict[str, type["CoreService"]] = {}

    @classmethod
    def add(cls, service: type["CoreService"]) -> None:
        """
        Add a service to manager. Services without a name are not loaded.

        :param service: service to add
        :return: nothing
        :raises ValueError: when the service name is already loaded or the
            service on load hook fails
        :raises CoreError: when validating a required executable fails
        """
        name = service.name
        logger.debug("loading service: class(%s) name(%s)", service.__name__, name)

        # avoid services with no name
        if name is None:
            logger.debug("not loading class(%s) with no name", service.__name__)
            return

        # avoid duplicate services
        if name in cls.services:
            raise ValueError(f"duplicate service being added: {name}")

        # validate dependent executables are present
        for executable in service.executables:
            try:
                utils.which(executable, required=True)
            except CoreError as e:
                raise CoreError(f"service({name}): {e}") from e

        # validate service on load succeeds
        try:
            service.on_load()
        except Exception as e:
            logger.exception("error during service(%s) on load", service.name)
            raise ValueError(e) from e

        # make service available
        cls.services[name] = service

    @classmethod
    def get(cls, name: str) -> type["CoreService"]:
        """
        Retrieve a service from the manager.

        :param name: name of the service to retrieve
        :return: service class registered under the name
        :raises CoreServiceError: when the service does not exist
        """
        service = cls.services.get(name)
        if service is None:
            raise CoreServiceError(f"service({name}) does not exist")
        return service

    @classmethod
    def add_services(cls, path: Path) -> list[str]:
        """
        Method for retrieving all CoreServices from a given path.

        :param path: path to retrieve services from
        :return: list of core services that failed to load
        """
        service_errors = []
        services = utils.load_classes(path, CoreService)
        for service in services:
            if not service.name:
                continue
            try:
                cls.add(service)
            except (CoreError, ValueError) as e:
                service_errors.append(service.name)
                logger.debug("not loading service(%s): %s", service.name, e)
        return service_errors

    @classmethod
    def load_locals(cls) -> list[str]:
        """
        Load the services found in the modules of the core services package.

        :return: list of core services that failed to load
        """
        errors = []
        for module_info in pkgutil.walk_packages(
            core_services.__path__, f"{core_services.__name__}."
        ):
            services = utils.load_module(module_info.name, CoreService)
            for service in services:
                try:
                    cls.add(service)
                # add raises ValueError for duplicates and on load failures, a
                # single bad service should not stop the remaining ones loading
                except (CoreError, ValueError) as e:
                    errors.append(service.name)
                    logger.debug("not loading service(%s): %s", service.name, e)
        return errors
class CoreServices:
    """
    Class for interacting with a list of available startup services for
    nodes. Mostly used to convert a CoreService into a Config API
    message. This class lives in the Session object and remembers
    the default services configured for each node type, and any
    custom service configuration. A CoreService is not a Configurable.
    """

    name: str = "services"
    config_type: RegisterTlvs = RegisterTlvs.UTILITY

    def __init__(self, session: "Session") -> None:
        """
        Creates a CoreServices instance.

        :param session: session this manager is tied to
        """
        self.session: "Session" = session
        # dict of default service name lists, key is node type
        self.default_services: dict[str, list[str]] = {
            "mdr": ["zebra", "OSPFv3MDR", "IPForward"],
            "PC": ["DefaultRoute"],
            "prouter": [],
            "router": ["zebra", "OSPFv2", "OSPFv3", "IPForward"],
            "host": ["DefaultRoute", "SSH"],
        }
        # dict of node ids to dict of custom services by name
        self.custom_services: dict[int, dict[str, "CoreService"]] = {}

    def reset(self) -> None:
        """
        Called when config message with reset flag is received
        """
        self.custom_services.clear()

    def get_service(
        self, node_id: int, service_name: str, default_service: bool = False
    ) -> Optional["CoreService"]:
        """
        Get any custom service configured for the given node that matches the specified
        service name. If no custom service is found, return the specified service.

        :param node_id: object id to get service from
        :param service_name: name of service to retrieve
        :param default_service: True to return default service when custom does
            not exist, False returns None
        :return: custom service from the node, otherwise the default service or None
        :raises CoreServiceError: when a default service is requested for a
            service name that does not exist
        """
        node_services = self.custom_services.setdefault(node_id, {})
        default = None
        if default_service:
            default = ServiceManager.get(service_name)
        return node_services.get(service_name, default)

    def set_service(self, node_id: int, service_name: str) -> None:
        """
        Ensure a custom, instantiated, service exists for the given node, creating
        one when it has not been customized yet.

        :param node_id: object id to set custom service for
        :param service_name: name of service to set
        :return: nothing
        :raises CoreServiceError: when the service does not exist
        """
        logger.debug("setting custom service(%s) for node: %s", service_name, node_id)
        service = self.get_service(node_id, service_name)
        if not service:
            service_class = ServiceManager.get(service_name)
            service = service_class()

            # add the custom service to dict
            node_services = self.custom_services.setdefault(node_id, {})
            node_services[service.name] = service

    def add_services(
        self, node: CoreNode, model: str, services: Optional[list[str]] = None
    ) -> None:
        """
        Add services to a node.

        :param node: node to add services to
        :param model: node model type to add services for
        :param services: names of services to add to node, the defaults for the
            model are used when none are provided
        :return: nothing
        """
        if not services:
            logger.info(
                "using default services for node(%s) type(%s)", node.name, model
            )
            services = self.default_services.get(model, [])
        logger.info("setting services for node(%s): %s", node.name, services)
        for service_name in services:
            service = self.get_service(node.id, service_name, default_service=True)
            if not service:
                logger.warning(
                    "unknown service(%s) for node(%s)", service_name, node.name
                )
                continue
            node.services.append(service)

    def all_configs(self) -> list[tuple[int, "CoreService"]]:
        """
        Return (node_id, service) tuples for all stored configs. Used when reconnecting
        to a session or opening XML.

        :return: list of tuples of node ids and services
        """
        return [
            (node_id, service)
            for node_id, custom_services in self.custom_services.items()
            for service in custom_services.values()
        ]

    def all_files(self, service: "CoreService") -> list[tuple[str, str]]:
        """
        Return all customized files stored with a service.
        Used when reconnecting to a session or opening XML.

        :param service: service to get files for
        :return: list of all custom service files
        """
        files = []
        if not service.custom:
            return files
        for filename in service.configs:
            data = service.config_data.get(filename)
            if data is not None:
                files.append((filename, data))
        return files

    def boot_services(self, node: CoreNode) -> None:
        """
        Start all services on a node.

        :param node: node to start services on
        :return: nothing
        :raises CoreServiceBootError: when booting any service path fails
        """
        boot_paths = ServiceDependencies(node.services).boot_order()
        funcs = []
        for boot_path in boot_paths:
            args = (node, boot_path)
            funcs.append((self._boot_service_path, args, {}))
        _, exceptions = utils.threadpool(funcs)
        if exceptions:
            raise CoreServiceBootError(*exceptions)

    def _boot_service_path(
        self, node: CoreNode, boot_path: list["CoreServiceType"]
    ) -> None:
        """
        Boot the services of a boot path on a node, one after the other.

        :param node: node to boot services on
        :param boot_path: ordered services to boot
        :return: nothing
        :raises CoreServiceBootError: when booting a service fails
        """
        logger.info(
            "booting node(%s) services: %s",
            node.name,
            " -> ".join([x.name for x in boot_path]),
        )
        for service in boot_path:
            # prefer the custom service configured for this node, when present
            service = self.get_service(node.id, service.name, default_service=True)
            try:
                self.boot_service(node, service)
            except Exception as e:
                logger.exception("exception booting service: %s", service.name)
                raise CoreServiceBootError(e) from e

    def boot_service(self, node: CoreNode, service: "CoreServiceType") -> None:
        """
        Start a service on a node. Create private dirs, generate config
        files, and execute startup commands.

        :param node: node to boot services on
        :param service: service to start
        :return: nothing
        :raises CoreServiceBootError: when a startup command fails or the
            service fails validation
        """
        logger.info(
            "starting node(%s) service(%s) validation(%s)",
            node.name,
            service.name,
            service.validation_mode.name,
        )

        # create service directories
        for directory in service.dirs:
            dir_path = Path(directory)
            try:
                node.create_dir(dir_path)
            except (CoreCommandError, CoreError) as e:
                logger.warning(
                    "error mounting private dir '%s' for service '%s': %s",
                    directory,
                    service.name,
                    e,
                )

        # create service files
        self.create_service_files(node, service)

        # run startup
        wait = service.validation_mode == ServiceMode.BLOCKING
        status = self.startup_service(node, service, wait)
        if status:
            raise CoreServiceBootError(
                f"node({node.name}) service({service.name}) error during startup"
            )

        # blocking mode is finished
        if wait:
            return

        # timer mode, sleep and return
        if service.validation_mode == ServiceMode.TIMER:
            time.sleep(service.validation_timer)
        # non-blocking, attempt to validate periodically, up to validation_timer time
        elif service.validation_mode == ServiceMode.NON_BLOCKING:
            start = time.monotonic()
            while True:
                status = self.validate_service(node, service)
                if not status:
                    break
                if time.monotonic() - start > service.validation_timer:
                    break
                time.sleep(service.validation_period)

        if status:
            raise CoreServiceBootError(
                f"node({node.name}) service({service.name}) failed validation"
            )

    def copy_service_file(self, node: CoreNode, file_path: Path, cfg: str) -> bool:
        """
        Given a configured service filename and config, determine if the
        config references an existing file that should be copied.
        Returns True for local files, False for generated.

        :param node: node to copy service for
        :param file_path: file name for a configured service
        :param cfg: configuration string
        :return: True when the config referenced a file that was copied, False
            when the config is not a file reference
        """
        prefix = "file://"
        if not cfg.startswith(prefix):
            return False
        # only the first line of the config holds the source path
        src = cfg[len(prefix) :].split("\n")[0]
        src = utils.expand_corepath(src, node.session, node)
        # TODO: glob here
        node.copy_file(src, file_path, mode=0o644)
        return True

    def validate_service(self, node: CoreNode, service: "CoreServiceType") -> int:
        """
        Run the validation command(s) for a service.

        :param node: node to validate service for
        :param service: service to validate
        :return: service validation status, 0 on success and -1 on the first
            failing command
        """
        logger.debug("validating node(%s) service(%s)", node.name, service.name)
        cmds = service.validate
        if not service.custom:
            cmds = service.get_validate(node)

        status = 0
        for cmd in cmds:
            logger.debug("validating service(%s) using: %s", service.name, cmd)
            try:
                node.cmd(cmd)
            except CoreCommandError as e:
                logger.debug(
                    "node(%s) service(%s) validate failed", node.name, service.name
                )
                logger.debug("cmd(%s): %s", e.cmd, e.output)
                status = -1
                break
        return status

    def stop_services(self, node: CoreNode) -> None:
        """
        Stop all services on a node.

        :param node: node to stop services on
        :return: nothing
        """
        for service in node.services:
            self.stop_service(node, service)

    def stop_service(self, node: CoreNode, service: "CoreServiceType") -> int:
        """
        Stop a service on a node.

        :param node: node to stop a service on
        :param service: service to stop
        :return: status for stopping the services, 0 on success and -1 when any
            shutdown command failed
        """
        status = 0
        for args in service.shutdown:
            try:
                node.cmd(args)
            except CoreCommandError as e:
                self.session.exception(
                    ExceptionLevels.ERROR,
                    "services",
                    f"error stopping service {service.name}: {e.stderr}",
                    node.id,
                )
                logger.exception("error running stop command %s", args)
                status = -1
        return status

    def get_service_file(
        self, node: CoreNode, service_name: str, filename: str
    ) -> FileData:
        """
        Send a File Message when the GUI has requested a service file.
        The file data is either auto-generated or comes from an existing config.

        :param node: node to get service file from
        :param service_name: service to get file from
        :param filename: file name to retrieve
        :return: file data
        :raises ValueError: when the service is invalid or the file is not one
            of the service config files
        """
        # get service to get file from
        service = self.get_service(node.id, service_name, default_service=True)
        if not service:
            raise ValueError(f"invalid service: {service_name}")

        # retrieve config files for default/custom service
        if service.custom:
            config_files = service.configs
        else:
            config_files = service.get_configs(node)
        if filename not in config_files:
            raise ValueError(
                f"unknown service({service_name}) config file: {filename}"
            )

        # get the file data, generating it when no stored data exists
        data = service.config_data.get(filename)
        if data is None:
            data = service.generate_config(node, filename)

        filetypestr = f"service:{service.name}"
        return FileData(
            message_type=MessageFlags.ADD,
            node=node.id,
            name=filename,
            type=filetypestr,
            data=data,
        )

    def set_service_file(
        self, node_id: int, service_name: str, file_name: str, data: str
    ) -> None:
        """
        Receive a File Message from the GUI and store the customized file
        in the service config. The filename must match one from the list of
        config files in the service.

        :param node_id: node id to set service file
        :param service_name: service name to set file for
        :param file_name: file name to set
        :param data: data for file to set
        :return: nothing
        """
        # attempt to set custom service, if needed
        self.set_service(node_id, service_name)

        # retrieve custom service
        service = self.get_service(node_id, service_name)
        if service is None:
            logger.warning("received file name for unknown service: %s", service_name)
            return

        # validate file being set is valid
        config_files = service.configs
        if file_name not in config_files:
            logger.warning(
                "received unknown file(%s) for service(%s)", file_name, service_name
            )
            return

        # set custom service file data
        service.config_data[file_name] = data

    def startup_service(
        self, node: CoreNode, service: "CoreServiceType", wait: bool = False
    ) -> int:
        """
        Startup a node service.

        :param node: node to start service on
        :param service: service to start
        :param wait: determines if we should wait to validate startup
        :return: status of startup, 0 on success and -1 on the first failing
            command
        """
        cmds = service.startup
        if not service.custom:
            cmds = service.get_startup(node)

        status = 0
        for cmd in cmds:
            try:
                node.cmd(cmd, wait)
            except CoreCommandError:
                logger.exception("error starting command")
                status = -1
        return status

    def create_service_files(self, node: CoreNode, service: "CoreServiceType") -> None:
        """
        Creates node service files.

        :param node: node to create service files for
        :param service: service to create files for
        :return: nothing
        """
        # get values depending on if custom or not
        config_files = service.configs
        if not service.custom:
            config_files = service.get_configs(node)

        for file_name in config_files:
            file_path = Path(file_name)
            logger.debug(
                "generating service config custom(%s): %s", service.custom, file_name
            )
            if service.custom:
                cfg = service.config_data.get(file_name)
                if cfg is None:
                    cfg = service.generate_config(node, file_name)
                # cfg may have a file:/// url for copying from a file
                try:
                    if self.copy_service_file(node, file_path, cfg):
                        continue
                except OSError:
                    logger.exception("error copying service file: %s", file_name)
                    continue
            else:
                cfg = service.generate_config(node, file_name)
            node.create_file(file_path, cfg)

    def service_reconfigure(self, node: CoreNode, service: "CoreService") -> None:
        """
        Reconfigure a node service.

        :param node: node to reconfigure service for
        :param service: service to reconfigure
        :return: nothing
        :raises NotImplementedError: when a config file name is a file:/// url
        """
        config_files = service.configs
        if not service.custom:
            config_files = service.get_configs(node)

        for file_name in config_files:
            file_path = Path(file_name)
            # the previous 7 character slice could never equal the 8 character
            # prefix, leaving this guard unreachable
            if file_name.startswith("file:///"):
                # TODO: implement this
                raise NotImplementedError
            cfg = service.config_data.get(file_name)
            if cfg is None:
                cfg = service.generate_config(node, file_name)
            node.create_file(file_path, cfg)
class CoreService:
    """
    Base class that service definitions derive from.
    """

    # name of the service, should not include spaces
    name: Optional[str] = None

    # executables that have to be present for the service to be loaded
    executables: tuple[str, ...] = ()

    # names of services that must be started before this service starts
    dependencies: tuple[str, ...] = ()

    # group name, allows grouping services together
    group: Optional[str] = None

    # private, per-node directories the service requires
    dirs: tuple[str, ...] = ()

    # names of the config files written by the service
    configs: tuple[str, ...] = ()

    # config file data by file name, copied per instance on instantiation
    config_data: dict[str, str] = {}

    # commands used to start the service
    startup: tuple[str, ...] = ()

    # commands used to shut the service down
    shutdown: tuple[str, ...] = ()

    # commands used to validate the service
    validate: tuple[str, ...] = ()

    # validation mode, determines how startup success is decided
    validation_mode: ServiceMode = ServiceMode.NON_BLOCKING

    # seconds to wait when determining if the service started successfully
    validation_timer: int = 5

    # seconds between validation attempts
    validation_period: float = 0.5

    # metadata associated with the service
    meta: Optional[str] = None

    # False for the class itself, set to True on instantiated services
    custom: bool = False
    # NOTE(review): not read within this module, presumably flags services that
    # require customization -- confirm against callers
    custom_needed: bool = False

    def __init__(self) -> None:
        """
        Create a customized service instance.

        Services are not necessarily instantiated, classmethods may be used
        against their config. An instance is created when a custom
        configuration overrides the default parameters, so it is flagged as
        custom and gets its own copy of the config file data.
        """
        self.custom: bool = True
        self.config_data: dict[str, str] = type(self).config_data.copy()

    @classmethod
    def on_load(cls) -> None:
        """
        Hook invoked when the service is loaded, does nothing by default.

        :return: nothing
        """
        pass

    @classmethod
    def get_configs(cls, node: CoreNode) -> Iterable[str]:
        """
        Provide the names of the configuration files for a node. By default
        this is the cls.configs tuple, subclasses may override this method to
        provide node-specific file names, which may be based on other services.

        :param node: node to get config file names for
        :return: configuration file names
        """
        return cls.configs

    @classmethod
    def generate_config(cls, node: CoreNode, filename: str) -> str:
        """
        Generate the content of a configuration file for a node. The file name
        is provided to allow for multiple config files. The returned string is
        written to a file or sent to the GUI for customization.

        :param node: node to generate config for
        :param filename: file name to generate config for
        :return: generated config
        :raises NotImplementedError: always, subclasses provide the generation
        """
        raise NotImplementedError

    @classmethod
    def get_startup(cls, node: CoreNode) -> Iterable[str]:
        """
        Provide the startup commands for a node. By default this is the
        cls.startup tuple, subclasses may override this method to provide
        node-specific commands, which may be based on other services.

        :param node: node to get startup commands for
        :return: startup commands
        """
        return cls.startup

    @classmethod
    def get_validate(cls, node: CoreNode) -> Iterable[str]:
        """
        Provide the validate commands for a node. By default this is the
        cls.validate tuple, subclasses may override this method to provide
        node-specific commands, which may be based on other services.

        :param node: node to get validate commands for
        :return: validation commands
        """
        return cls.validate