From 9c265ab28379f72147292311a7493bc761d8719b Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Fri, 10 Jun 2022 10:01:48 -0700 Subject: [PATCH 1/4] daemon: updates to change hostname settings to replace _ to - due to _ being an invalid character --- daemon/core/nodes/docker.py | 3 ++- daemon/core/nodes/netclient.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/daemon/core/nodes/docker.py b/daemon/core/nodes/docker.py index a97ba08c..d454ea42 100644 --- a/daemon/core/nodes/docker.py +++ b/daemon/core/nodes/docker.py @@ -142,8 +142,9 @@ class DockerNode(CoreNode): volumes += ( f"--mount type=volume," f"source={volume.src},target={volume.dst} " ) + hostname = self.name.replace("_", "-") self.host_cmd( - f"{DOCKER} run -td --init --net=none --hostname {self.name} " + f"{DOCKER} run -td --init --net=none --hostname {hostname} " f"--name {self.name} --sysctl net.ipv6.conf.all.disable_ipv6=0 " f"{binds} {volumes} " f"--privileged {self.image} tail -f /dev/null" diff --git a/daemon/core/nodes/netclient.py b/daemon/core/nodes/netclient.py index 09cf94ec..e0a409f4 100644 --- a/daemon/core/nodes/netclient.py +++ b/daemon/core/nodes/netclient.py @@ -28,6 +28,7 @@ class LinuxNetClient: :param name: name for hostname :return: nothing """ + name = name.replace("_", "-") self.run(f"hostname {name}") def create_route(self, route: str, device: str) -> None: From 60a48c7084d249cdb8061bdd13a3f7c9e50f677e Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Fri, 10 Jun 2022 12:12:25 -0700 Subject: [PATCH 2/4] daemon: update node commands to make use of shlex.quote for shell=True commands --- daemon/core/nodes/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/daemon/core/nodes/base.py b/daemon/core/nodes/base.py index 404e9ab2..5866b5e2 100644 --- a/daemon/core/nodes/base.py +++ b/daemon/core/nodes/base.py @@ -3,6 +3,7 @@ Defines the base logic for nodes used within core. """ import abc import logging +import shlex import shutil import threading from dataclasses import dataclass, field @@ -702,7 +703,7 @@ class CoreNode(CoreNodeBase): :return: node command """ if shell: - args = f'{BASH} -c "{args}"' + args = f"{BASH} -c {shlex.quote(args)}" return f"{VCMD} -c {self.ctrlchnlname} -- {args}" def cmd(self, args: str, wait: bool = True, shell: bool = False) -> str: From 9c69881aad57b0c2cf5c7fb2741d7b9ddc6ce298 Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Fri, 10 Jun 2022 14:23:06 -0700 Subject: [PATCH 3/4] daemon: updates to expose node.create_cmd and not be private, added utility functions for running multiple commands on multiple nodes more efficiently --- daemon/core/nodes/base.py | 6 +- daemon/core/nodes/docker.py | 2 +- daemon/core/nodes/lxd.py | 2 +- daemon/core/nodes/physical.py | 2 +- daemon/core/utils.py | 117 +++++++++++++++++++++++++++++++++- 5 files changed, 121 insertions(+), 8 deletions(-) diff --git a/daemon/core/nodes/base.py b/daemon/core/nodes/base.py index 5866b5e2..7edbc198 100644 --- a/daemon/core/nodes/base.py +++ b/daemon/core/nodes/base.py @@ -694,7 +694,7 @@ class CoreNode(CoreNodeBase): finally: self.rmnodedir() - def _create_cmd(self, args: str, shell: bool = False) -> str: + def create_cmd(self, args: str, shell: bool = False) -> str: """ Create command used to run commands within the context of a node. @@ -717,7 +717,7 @@ class CoreNode(CoreNodeBase): :return: combined stdout and stderr :raises CoreCommandError: when a non-zero exit status occurs """ - args = self._create_cmd(args, shell) + args = self.create_cmd(args, shell) if self.server is None: return utils.cmd(args, wait=wait, shell=shell) else: @@ -743,7 +743,7 @@ class CoreNode(CoreNodeBase): :param sh: shell to execute command in :return: str """ - terminal = self._create_cmd(sh) + terminal = self.create_cmd(sh) if self.server is None: return terminal else: diff --git a/daemon/core/nodes/docker.py b/daemon/core/nodes/docker.py index d454ea42..41625c78 100644 --- a/daemon/core/nodes/docker.py +++ b/daemon/core/nodes/docker.py @@ -89,7 +89,7 @@ class DockerNode(CoreNode): """ return DockerOptions() - def _create_cmd(self, args: str, shell: bool = False) -> str: + def create_cmd(self, args: str, shell: bool = False) -> str: """ Create command used to run commands within the context of a node. diff --git a/daemon/core/nodes/lxd.py b/daemon/core/nodes/lxd.py index 8bf02ae4..33b8d466 100644 --- a/daemon/core/nodes/lxd.py +++ b/daemon/core/nodes/lxd.py @@ -62,7 +62,7 @@ class LxcNode(CoreNode): def create_options(cls) -> LxcOptions: return LxcOptions() - def _create_cmd(self, args: str, shell: bool = False) -> str: + def create_cmd(self, args: str, shell: bool = False) -> str: """ Create command used to run commands within the context of a node. diff --git a/daemon/core/nodes/physical.py b/daemon/core/nodes/physical.py index 02c200db..8ab13f20 100644 --- a/daemon/core/nodes/physical.py +++ b/daemon/core/nodes/physical.py @@ -249,7 +249,7 @@ class PhysicalNode(CoreNode): iface.shutdown() self.rmnodedir() - def _create_cmd(self, args: str, shell: bool = False) -> str: + def create_cmd(self, args: str, shell: bool = False) -> str: if shell: args = f'{BASH} -c "{args}"' return args diff --git a/daemon/core/utils.py b/daemon/core/utils.py index d2308f30..244590f8 100644 --- a/daemon/core/utils.py +++ b/daemon/core/utils.py @@ -216,8 +216,7 @@ def cmd( shell: bool = False, ) -> str: """ - Execute a command on the host and return a tuple containing the exit status and - result string. stderr output is folded into the stdout result string. + Execute a command on the host and returns the combined stderr stdout output. :param args: command arguments :param env: environment to run command with @@ -250,6 +249,25 @@ def cmd( raise CoreCommandError(1, input_args, "", e.strerror) +def run_cmds(args: List[str], wait: bool = True, shell: bool = False) -> List[str]: + """ + Execute a series of commands on the host and returns a list of the combined stderr + stdout output. + + :param args: command arguments + :param wait: True to wait for status, False otherwise + :param shell: True to use shell, False otherwise + :return: combined stdout and stderr + :raises CoreCommandError: when there is a non-zero exit status or the file to + execute is not found + """ + outputs = [] + for arg in args: + output = cmd(arg, wait=wait, shell=shell) + outputs.append(output) + return outputs + + def file_munge(pathname: str, header: str, text: str) -> None: """ Insert text at the end of a file, surrounded by header comments. @@ -407,6 +425,101 @@ def load_logging_config(config_path: Path) -> None: logging.config.dictConfig(log_config) +def run_cmds_threaded( + nodes: List["CoreNode"], + cmds: List[str], + wait: bool = True, + shell: bool = False, + workers: int = None, +) -> Tuple[Dict[int, List[str]], List[Exception]]: + """ + Run a set of commands in order across a provided set of nodes. Each node will + run the commands within the context of a threadpool. + + :param nodes: nodes to run commands in + :param cmds: commands to run in nodes + :param wait: True to wait for status, False otherwise + :param shell: True to run shell like, False otherwise + :param workers: number of workers for threadpool, uses library default otherwise + :return: tuple including dict of node id to list of command output and a list of + exceptions if any + """ + + def _node_cmds( + _target: "CoreNode", _cmds: List[str], _wait: bool, _shell: bool + ) -> List[str]: + outputs = [] + for _cmd in _cmds: + output = _target.cmd(_cmd, wait=_wait, shell=_shell) + outputs.append(output) + return outputs + + with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: + futures = [] + node_mappings = {} + for node in nodes: + future = executor.submit(_node_cmds, node, cmds, wait, shell) + node_mappings[future] = node + futures.append(future) + outputs = {} + exceptions = [] + for future in concurrent.futures.as_completed(futures): + try: + result = future.result() + node = node_mappings[future] + outputs[node.id] = result + except Exception as e: + logger.exception("thread pool exception") + exceptions.append(e) + return outputs, exceptions + + +def run_cmds_mp( + nodes: List["CoreNode"], + cmds: List[str], + wait: bool = True, + shell: bool = False, + workers: int = None, +) -> Tuple[Dict[int, List[str]], List[Exception]]: + """ + Run a set of commands in order across a provided set of nodes. Each node will + run the commands within the context of a process pool. This will not work + for distributed nodes and throws an exception when encountered. + + :param nodes: nodes to run commands in + :param cmds: commands to run in nodes + :param wait: True to wait for status, False otherwise + :param shell: True to run shell like, False otherwise + :param workers: number of workers for threadpool, uses library default otherwise + :return: tuple including dict of node id to list of command output and a list of + exceptions if any + :raises CoreError: when a distributed node is provided as input + """ + with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: + futures = [] + node_mapping = {} + for node in nodes: + node_cmds = [node.create_cmd(x) for x in cmds] + if node.server: + raise CoreError( + f"{node.name} uses a distributed server and not supported" + ) + future = executor.submit(run_cmds, node_cmds, wait=wait, shell=shell) + node_mapping[future] = node + futures.append(future) + exceptions = [] + outputs = {} + for future in concurrent.futures.as_completed(futures): + try: + result = future.result() + node = node_mapping[future] + outputs[node.id] = result + except Exception as e: + logger.exception("thread pool exception") + exceptions.append(e) + return outputs, exceptions + + def threadpool( funcs: List[Tuple[Callable, Iterable[Any], Dict[Any, Any]]], workers: int = 10 ) -> Tuple[List[Any], List[Exception]]: From 3c28ea373a701e46d6de39644b872e88289c7922 Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Fri, 10 Jun 2022 14:53:49 -0700 Subject: [PATCH 4/4] daemon: adjustments to fix terminal command string generation for docker/lxd nodes to account for being on a distributed server --- daemon/core/nodes/docker.py | 6 +++++- daemon/core/nodes/lxd.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/daemon/core/nodes/docker.py b/daemon/core/nodes/docker.py index 41625c78..45d2b892 100644 --- a/daemon/core/nodes/docker.py +++ b/daemon/core/nodes/docker.py @@ -188,7 +188,11 @@ class DockerNode(CoreNode): :param sh: shell to execute command in :return: str """ - return f"{DOCKER} exec -it {self.name} {sh}" + terminal = f"{DOCKER} exec -it {self.name} {sh}" + if self.server is None: + return terminal + else: + return f"ssh -X -f {self.server.host} xterm -e {terminal}" def create_dir(self, dir_path: Path) -> None: """ diff --git a/daemon/core/nodes/lxd.py b/daemon/core/nodes/lxd.py index 33b8d466..01bd2db7 100644 --- a/daemon/core/nodes/lxd.py +++ b/daemon/core/nodes/lxd.py @@ -130,7 +130,11 @@ class LxcNode(CoreNode): :param sh: shell to execute command in :return: str """ - return f"lxc exec {self.name} -- {sh}" + terminal = f"lxc exec {self.name} -- {sh}" + if self.server is None: + return terminal + else: + return f"ssh -X -f {self.server.host} xterm -e {terminal}" def create_dir(self, dir_path: Path) -> None: """