daemon: adjusted cmd utilities to support running different sets of commands for each node

This commit is contained in:
Blake Harnden 2023-07-31 11:08:05 -07:00
parent 2cb8ec2fb2
commit e3715e188c

View file

@ -414,18 +414,16 @@ def load_logging_config(config_path: Path) -> None:
def run_cmds_threaded( def run_cmds_threaded(
nodes: list["CoreNode"], node_cmds: list[tuple["CoreNode", list[str]]],
cmds: list[str],
wait: bool = True, wait: bool = True,
shell: bool = False, shell: bool = False,
workers: int = None, workers: int = None,
) -> tuple[dict[int, list[str]], list[Exception]]: ) -> tuple[dict[int, list[str]], list[Exception]]:
""" """
Run a set of commands in order across a provided set of nodes. Each node will Run the set of commands for the node provided. Each node will
run the commands within the context of a threadpool. run the commands within the context of a threadpool.
:param nodes: nodes to run commands in :param node_cmds: list of tuples of nodes and commands to run within them
:param cmds: commands to run in nodes
:param wait: True to wait for status, False otherwise :param wait: True to wait for status, False otherwise
:param shell: True to run shell like, False otherwise :param shell: True to run shell like, False otherwise
:param workers: number of workers for threadpool, uses library default otherwise :param workers: number of workers for threadpool, uses library default otherwise
@ -436,16 +434,16 @@ def run_cmds_threaded(
def _node_cmds( def _node_cmds(
_target: "CoreNode", _cmds: list[str], _wait: bool, _shell: bool _target: "CoreNode", _cmds: list[str], _wait: bool, _shell: bool
) -> list[str]: ) -> list[str]:
outputs = [] cmd_outputs = []
for _cmd in _cmds: for _cmd in _cmds:
output = _target.cmd(_cmd, wait=_wait, shell=_shell) output = _target.cmd(_cmd, wait=_wait, shell=_shell)
outputs.append(output) cmd_outputs.append(output)
return outputs return cmd_outputs
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [] futures = []
node_mappings = {} node_mappings = {}
for node in nodes: for node, cmds in node_cmds:
future = executor.submit(_node_cmds, node, cmds, wait, shell) future = executor.submit(_node_cmds, node, cmds, wait, shell)
node_mappings[future] = node node_mappings[future] = node
futures.append(future) futures.append(future)
@ -463,19 +461,17 @@ def run_cmds_threaded(
def run_cmds_mp( def run_cmds_mp(
nodes: list["CoreNode"], node_cmds: list[tuple["CoreNode", list[str]]],
cmds: list[str],
wait: bool = True, wait: bool = True,
shell: bool = False, shell: bool = False,
workers: int = None, workers: int = None,
) -> tuple[dict[int, list[str]], list[Exception]]: ) -> tuple[dict[int, list[str]], list[Exception]]:
""" """
Run a set of commands in order across a provided set of nodes. Each node will Run the set of commands for the node provided. Each node will
run the commands within the context of a process pool. This will not work run the commands within the context of a process pool. This will not work
for distributed nodes and throws an exception when encountered. for distributed nodes and throws an exception when encountered.
:param nodes: nodes to run commands in :param node_cmds: list of tuples of nodes and commands to run within them
:param cmds: commands to run in nodes
:param wait: True to wait for status, False otherwise :param wait: True to wait for status, False otherwise
:param shell: True to run shell like, False otherwise :param shell: True to run shell like, False otherwise
:param workers: number of workers for threadpool, uses library default otherwise :param workers: number of workers for threadpool, uses library default otherwise
@ -486,7 +482,7 @@ def run_cmds_mp(
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = [] futures = []
node_mapping = {} node_mapping = {}
for node in nodes: for node, cmds in node_cmds:
node_cmds = [node.create_cmd(x) for x in cmds] node_cmds = [node.create_cmd(x) for x in cmds]
if node.server: if node.server:
raise CoreError( raise CoreError(