From e3715e188c78dce34f74ab8dd09621dfd5027431 Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Mon, 31 Jul 2023 11:08:05 -0700 Subject: [PATCH] daemon: adjusted cmd utilities to support running different sets of commands for each node --- daemon/core/utils.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/daemon/core/utils.py b/daemon/core/utils.py index 2cfd3605..df00984c 100644 --- a/daemon/core/utils.py +++ b/daemon/core/utils.py @@ -414,18 +414,16 @@ def load_logging_config(config_path: Path) -> None: def run_cmds_threaded( - nodes: list["CoreNode"], - cmds: list[str], + node_cmds: list[tuple["CoreNode", list[str]]], wait: bool = True, shell: bool = False, workers: int = None, ) -> tuple[dict[int, list[str]], list[Exception]]: """ - Run a set of commands in order across a provided set of nodes. Each node will + Run the set of commands for the node provided. Each node will run the commands within the context of a threadpool. - :param nodes: nodes to run commands in - :param cmds: commands to run in nodes + :param node_cmds: list of tuples of nodes and commands to run within them :param wait: True to wait for status, False otherwise :param shell: True to run shell like, False otherwise :param workers: number of workers for threadpool, uses library default otherwise @@ -436,16 +434,16 @@ def run_cmds_threaded( def _node_cmds( _target: "CoreNode", _cmds: list[str], _wait: bool, _shell: bool ) -> list[str]: - outputs = [] + cmd_outputs = [] for _cmd in _cmds: output = _target.cmd(_cmd, wait=_wait, shell=_shell) - outputs.append(output) - return outputs + cmd_outputs.append(output) + return cmd_outputs with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: futures = [] node_mappings = {} - for node in nodes: + for node, cmds in node_cmds: future = executor.submit(_node_cmds, node, cmds, wait, shell) node_mappings[future] = node futures.append(future) @@ -463,19 +461,17 @@ def run_cmds_threaded( def run_cmds_mp( - nodes: list["CoreNode"], - cmds: list[str], + node_cmds: list[tuple["CoreNode", list[str]]], wait: bool = True, shell: bool = False, workers: int = None, ) -> tuple[dict[int, list[str]], list[Exception]]: """ - Run a set of commands in order across a provided set of nodes. Each node will + Run the set of commands for the node provided. Each node will run the commands within the context of a process pool. This will not work for distributed nodes and throws an exception when encountered. - :param nodes: nodes to run commands in - :param cmds: commands to run in nodes + :param node_cmds: list of tuples of nodes and commands to run within them :param wait: True to wait for status, False otherwise :param shell: True to run shell like, False otherwise :param workers: number of workers for threadpool, uses library default otherwise @@ -486,7 +482,7 @@ def run_cmds_mp( with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: futures = [] node_mapping = {} - for node in nodes: + for node, cmds in node_cmds: node_cmds = [node.create_cmd(x) for x in cmds] if node.server: raise CoreError(