core-extra/daemon/core/utils.py

469 lines
13 KiB
Python

"""
Miscellaneous utility functions, wrappers around some subprocess procedures.
"""
import concurrent.futures
import fcntl
import hashlib
import importlib
import inspect
import json
import logging
import logging.config
import os
import random
import shlex
import shutil
import sys
from subprocess import PIPE, STDOUT, Popen
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
Iterable,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
)
import netaddr
from core.errors import CoreCommandError, CoreError
if TYPE_CHECKING:
from core.emulator.session import Session
from core.nodes.base import CoreNode
# generic type variable referenced by the annotations of helpers in this module
T = TypeVar("T")
# handle on the null device, opened once at import time in binary write mode;
# mute_detach passes it as stdout to discard process output
DEVNULL = open(os.devnull, "wb")
def execute_file(
    path: str, exec_globals: Dict[str, str] = None, exec_locals: Dict[str, str] = None
) -> None:
    """
    Compile the python file at the given path and execute it, in the spirit of
    the python2 execfile builtin.

    The globals mapping (a new one when none is supplied) has "__file__" set to
    the path and "__name__" set to "__main__" before the code runs, so a mapping
    supplied by the caller is modified in place.

    :param path: path of file to execute
    :param exec_globals: globals mapping handed to the executed code
    :param exec_locals: locals mapping handed to the executed code
    :return: nothing
    """
    namespace = {} if exec_globals is None else exec_globals
    namespace["__file__"] = path
    namespace["__name__"] = "__main__"
    with open(path, "rb") as source:
        code = compile(source.read(), path, "exec")
    exec(code, namespace, exec_locals)
def hashkey(value: Union[str, int]) -> int:
    """
    Compute a hash that is stable across interpreter runs, for use where the
    builtin hash of python3 is not consistent.

    Integers are hashed through their decimal string form; the result is the
    SHA-256 digest of the UTF-8 encoded text read as a base 16 integer.

    :param value: value to hash
    :return: hash value
    """
    text = str(value) if isinstance(value, int) else value
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return int(digest, 16)
def _detach_init() -> None:
    """
    Fork the current process; the parent half exits immediately while the child
    half starts a new session.

    :return: nothing
    """
    child_pid = os.fork()
    if child_pid != 0:
        # parent exits
        os._exit(0)
    os.setsid()
def _valid_module(path: str, file_name: str) -> bool:
    """
    Decide whether a directory entry should be treated as a loadable python
    module: it must be a regular file, must not start with an underscore and
    must end with ".py".

    :param path: path of the directory containing the file
    :param file_name: file name to check
    :return: True if a valid python module file, False otherwise
    """
    candidate = os.path.join(path, file_name)
    return (
        os.path.isfile(candidate)
        and not file_name.startswith("_")
        and file_name.endswith(".py")
    )
def _is_class(module: Any, member: Type, clazz: Type) -> bool:
    """
    Check that a module member is a class, is a subclass of the expected type
    and was defined in the given module rather than imported into it.

    :param module: module the member was retrieved from
    :param member: member to validate
    :param clazz: class type the member must be a subclass of
    :return: True if the member passes all checks, False otherwise
    """
    return (
        inspect.isclass(member)
        and issubclass(member, clazz)
        and member.__module__ == module.__name__
    )
def close_onexec(fd: int) -> None:
    """
    Add the close-on-exec flag to a file descriptor, keeping any descriptor
    flags that are already set.

    :param fd: file descriptor to flag
    :return: nothing
    """
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    updated_flags = current_flags | fcntl.FD_CLOEXEC
    fcntl.fcntl(fd, fcntl.F_SETFD, updated_flags)
def which(command: str, required: bool) -> Optional[str]:
    """
    Find location of desired executable within current PATH.

    :param command: command to find location for
    :param required: True to raise an error when the command is not found,
        False to return None instead
    :return: command location, or None when not found and not required
    :raises ValueError: when not found and required
    """
    found_path = shutil.which(command)
    if found_path is None and required:
        raise ValueError(f"failed to find required executable({command}) in path")
    return found_path
def make_tuple(obj: Any) -> Tuple[Any, ...]:
    """
    Create a tuple from an object. An object exposing __iter__ is converted
    with its items as the tuple elements; any other object is wrapped in a
    single element tuple.

    Strings expose __iter__ as well, so a string is split into a tuple of its
    characters.

    :param obj: object to convert to a tuple
    :return: tuple of the items of obj, or a one element tuple holding obj
    """
    return tuple(obj) if hasattr(obj, "__iter__") else (obj,)
def make_tuple_fromstr(s: str, value_type: Callable[[str], T]) -> Tuple[T]:
    """
    Parse the string form of a tuple, such as "('a', 'b')", into a tuple of
    converted values.

    :param s: string to convert to a tuple
    :param value_type: callable used to convert each value found in the string
    :return: tuple from string
    """
    # drop the surrounding tuple braces, then trim single quotes and spacing from
    # each comma separated value, discarding values that end up empty
    trimmed = (item.strip("' ") for item in s.strip("(), ").split(","))
    return tuple(value_type(item) for item in trimmed if item)
def mute_detach(args: str, **kwargs: Dict[str, Any]) -> int:
    """
    Start a command whose output is discarded, using _detach_init as the
    pre-exec hook of the spawned process.

    The preexec_fn, stdout and stderr keyword arguments are always overridden:
    stdout goes to the null device and stderr is merged into stdout.

    :param args: command line, split into arguments with shell-like rules
    :param kwargs: additional keyword arguments passed to Popen
    :return: process id reported by Popen for the command
    """
    argv = shlex.split(args)
    kwargs.update(preexec_fn=_detach_init, stdout=DEVNULL, stderr=STDOUT)
    process = Popen(argv, **kwargs)
    return process.pid
def cmd(
    args: str,
    env: Optional[Dict[str, str]] = None,
    cwd: Optional[str] = None,
    wait: bool = True,
    shell: bool = False,
) -> str:
    """
    Execute a command on the host and return its stripped stdout output.

    stderr is captured separately from stdout; it is not part of the returned
    string and is only handed to the error raised on a non-zero exit status.
    When wait is False the command is started and left running, its output is
    discarded and an empty string is returned right away.

    :param args: command line; split into arguments with shell-like rules unless
        shell is True
    :param env: environment to run command with
    :param cwd: directory to run command in
    :param wait: True to wait for status, False otherwise
    :param shell: True to use shell, False otherwise
    :return: stdout of the command decoded as UTF-8 and stripped, or an empty
        string when not waiting
    :raises CoreCommandError: when there is a non-zero exit status or the file to
        execute is not found
    """
    # local import: this module imports selected names from subprocess and binds
    # its own DEVNULL file object, so the subprocess constant is fetched here
    import subprocess

    logging.debug("command cwd(%s) wait(%s): %s", cwd, wait, args)
    if shell is False:
        args = shlex.split(args)
    # nothing ever reads the output of a command that is not waited for; a pipe
    # would fill up and block the command, so its output is discarded instead
    output = PIPE if wait else subprocess.DEVNULL
    try:
        p = Popen(args, stdout=output, stderr=output, env=env, cwd=cwd, shell=shell)
        if not wait:
            return ""
        stdout, stderr = p.communicate()
        status = p.wait()
        if status != 0:
            raise CoreCommandError(status, args, stdout, stderr)
        return stdout.decode("utf-8").strip()
    except OSError as e:
        # keep the operating system error attached as the explicit cause
        raise CoreCommandError(-1, args) from e
def file_munge(pathname: str, header: str, text: str) -> None:
    """
    Append text to the end of an existing file, wrapped in "# BEGIN <header>"
    and "# END <header>" marker lines. A section previously added under the same
    header is removed first, so the file does not collect duplicates.

    :param pathname: path of an existing file to add text to
    :param header: header text used in the marker comments
    :param text: text to append to file
    :return: nothing
    """
    # prevent duplicates
    file_demunge(pathname, header)
    # the END marker is only recognized for removal when it sits on a line of its
    # own, so terminate the text when the caller did not
    if text and not text.endswith("\n"):
        text += "\n"
    with open(pathname, "a") as append_file:
        append_file.write(f"# BEGIN {header}\n{text}# END {header}\n")


def file_demunge(pathname: str, header: str) -> None:
    """
    Remove text that was inserted in a file surrounded by header comments.

    The section runs from the last "# BEGIN <header>" line through the last
    "# END <header>" line. The file is left untouched when either marker is
    missing or when the END marker comes before the BEGIN marker.

    :param pathname: file path to open for removing a header
    :param header: header text to target for removal
    :return: nothing
    """
    begin_marker = f"# BEGIN {header}\n"
    end_marker = f"# END {header}\n"
    with open(pathname, "r") as read_file:
        lines = read_file.readlines()
    start = None
    end = None
    for i, line in enumerate(lines):
        if line == begin_marker:
            start = i
        elif line == end_marker:
            # exclusive end index, so the END marker line itself is removed
            end = i + 1
    # markers in the wrong order do not delimit a section; slicing on them would
    # duplicate the lines between them instead of removing anything
    if start is None or end is None or end <= start:
        return
    with open(pathname, "w") as write_file:
        write_file.writelines(lines[:start] + lines[end:])
def expand_corepath(
    pathname: str, session: "Session" = None, node: "CoreNode" = None
) -> str:
    """
    Substitute placeholders in a file path with session and node values.

    With a session: "~" becomes /home/<session user>, and %SESSION%,
    %SESSION_DIR% and %SESSION_USER% become the session id, directory and user.
    With a node: %NODE% and %NODENAME% become the node id and name.
    Placeholders are left as they are when the matching object is not given.

    :param pathname: file path to expand
    :param session: core session object to expand path
    :param node: node to expand path with
    :return: expanded path
    """
    substitutions = []
    if session is not None:
        substitutions += [
            ("~", f"/home/{session.user}"),
            ("%SESSION%", str(session.id)),
            ("%SESSION_DIR%", session.session_dir),
            ("%SESSION_USER%", session.user),
        ]
    if node is not None:
        substitutions += [("%NODE%", str(node.id)), ("%NODENAME%", node.name)]
    # applied in order, each one on the result of the previous substitution
    for token, replacement in substitutions:
        pathname = pathname.replace(token, replacement)
    return pathname
def sysctl_devname(devname: str) -> Optional[str]:
    """
    Convert a device name into the form used with sysctl, where every "." is
    written as "/".

    :param devname: device name to translate
    :return: translated device name, or None when no name was given
    """
    return None if devname is None else devname.replace(".", "/")
def load_config(filename: str, d: Dict[str, str]) -> None:
    """
    Read key=value pairs from a file, into a dict. Blank lines and comment lines
    (first non-space character is "#") are skipped; spacing and newline
    characters around keys and values are stripped. A line is split on its first
    "=" only, so values may contain "=". Lines without "=" are logged and
    skipped.

    :param filename: file to read into a dictionary
    :param d: dictionary to read file into
    :return: nothing
    """
    with open(filename, "r") as f:
        lines = f.readlines()
    for line in lines:
        line = line.strip()
        # blank lines and comments carry no setting
        if not line or line.startswith("#"):
            continue
        try:
            key, value = line.split("=", 1)
        except ValueError:
            logging.exception("error reading file to dict: %s", filename)
            continue
        d[key.strip()] = value.strip()
def load_classes(path: str, clazz: Type[T]) -> List[Type[T]]:
    """
    Dynamically load classes for use within CORE.

    Every valid python module file directly inside the directory is imported as
    <directory name>.<module name>, and the classes defined in it that are
    subclasses of clazz are collected. The parent of the directory is appended
    to sys.path when missing so that the import can succeed. A module that fails
    to import is logged and skipped.

    :param path: path to load classes from
    :param clazz: class type expected to be inherited from for loading
    :return: list of classes loaded, empty when path is not a directory
    """
    # validate path exists
    logging.debug("attempting to load modules from path: %s", path)
    if not os.path.isdir(path):
        # nothing can be listed or imported from here; report it and leave
        # sys.path untouched instead of failing while listing the directory
        logging.warning("invalid custom module directory specified: %s", path)
        return []

    # check if path is in sys.path
    parent_path = os.path.dirname(path)
    if parent_path not in sys.path:
        logging.debug("adding parent path to allow imports: %s", parent_path)
        sys.path.append(parent_path)

    # retrieve potential service modules, and filter out invalid modules
    base_module = os.path.basename(path)
    module_names = [
        file_name[:-3]  # drop the ".py" extension
        for file_name in os.listdir(path)
        if _valid_module(path, file_name)
    ]

    # import and add all service modules in the path
    classes = []
    for module_name in module_names:
        import_statement = f"{base_module}.{module_name}"
        logging.debug("importing custom module: %s", import_statement)
        try:
            module = importlib.import_module(import_statement)
            members = inspect.getmembers(module, lambda x: _is_class(module, x, clazz))
            classes.extend(valid_class for _, valid_class in members)
        except Exception:
            # a broken module must not prevent the remaining ones from loading
            logging.exception(
                "unexpected error during import, skipping: %s", import_statement
            )
    return classes
def load_logging_config(config_path: str) -> None:
    """
    Read a JSON logging configuration file and apply it through
    logging.config.dictConfig.

    :param config_path: path to logging config file
    :return: nothing
    """
    with open(config_path, "r") as config_file:
        settings = json.loads(config_file.read())
    logging.config.dictConfig(settings)
def threadpool(
    funcs: List[Tuple[Callable, Iterable[Any], Dict[Any, Any]]], workers: int = 10
) -> Tuple[List[Any], List[Exception]]:
    """
    Submit every (func, args, kwargs) entry to a thread pool and gather the
    outcomes as the calls complete. Return values and raised exceptions are
    collected into separate lists, both in completion order.

    :param funcs: iterable that provides a func, args, kwargs
    :param workers: number of workers for the threadpool
    :return: results and exceptions from running functions with args and kwargs
    """
    results = []
    exceptions = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        pending = [
            executor.submit(func, *args, **kwargs) for func, args, kwargs in funcs
        ]
        for finished in concurrent.futures.as_completed(pending):
            try:
                results.append(finished.result())
            except Exception as e:
                logging.exception("thread pool exception")
                exceptions.append(e)
    return results, exceptions
def random_mac() -> str:
    """
    Create a mac address made of the fixed prefix 00:16:3E followed by three
    random bytes.

    :return: random mac address
    """
    prefix = 0x00163E << 24
    suffix = random.randint(0, 0xFFFFFF)
    mac = netaddr.EUI(prefix | suffix, dialect=netaddr.mac_unix_expanded)
    return str(mac)
def validate_mac(value: str) -> str:
    """
    Parse a mac address and return it in the netaddr mac_unix_expanded dialect.

    :param value: address to validate
    :return: unix formatted mac
    :raises CoreError: when netaddr rejects the address format
    """
    try:
        parsed = netaddr.EUI(value, dialect=netaddr.mac_unix_expanded)
    except netaddr.AddrFormatError as e:
        raise CoreError(f"invalid mac address {value}: {e}")
    return str(parsed)
def validate_ip(value: str) -> str:
    """
    Parse an ip address with prefix through netaddr.IPNetwork and return its
    string form.

    :param value: address to validate
    :return: formatted ip address
    :raises CoreError: when netaddr rejects the address
    """
    try:
        network = netaddr.IPNetwork(value)
    except (ValueError, netaddr.AddrFormatError) as e:
        raise CoreError(f"invalid ip address {value}: {e}")
    return str(network)