daemon: updates to expose node.create_cmd and not be private, added utility functions for running multiple commands on multiple nodes more efficiently
This commit is contained in:
parent
60a48c7084
commit
9c69881aad
5 changed files with 121 additions and 8 deletions
|
@ -694,7 +694,7 @@ class CoreNode(CoreNodeBase):
|
||||||
finally:
|
finally:
|
||||||
self.rmnodedir()
|
self.rmnodedir()
|
||||||
|
|
||||||
def _create_cmd(self, args: str, shell: bool = False) -> str:
|
def create_cmd(self, args: str, shell: bool = False) -> str:
|
||||||
"""
|
"""
|
||||||
Create command used to run commands within the context of a node.
|
Create command used to run commands within the context of a node.
|
||||||
|
|
||||||
|
@ -717,7 +717,7 @@ class CoreNode(CoreNodeBase):
|
||||||
:return: combined stdout and stderr
|
:return: combined stdout and stderr
|
||||||
:raises CoreCommandError: when a non-zero exit status occurs
|
:raises CoreCommandError: when a non-zero exit status occurs
|
||||||
"""
|
"""
|
||||||
args = self._create_cmd(args, shell)
|
args = self.create_cmd(args, shell)
|
||||||
if self.server is None:
|
if self.server is None:
|
||||||
return utils.cmd(args, wait=wait, shell=shell)
|
return utils.cmd(args, wait=wait, shell=shell)
|
||||||
else:
|
else:
|
||||||
|
@ -743,7 +743,7 @@ class CoreNode(CoreNodeBase):
|
||||||
:param sh: shell to execute command in
|
:param sh: shell to execute command in
|
||||||
:return: str
|
:return: str
|
||||||
"""
|
"""
|
||||||
terminal = self._create_cmd(sh)
|
terminal = self.create_cmd(sh)
|
||||||
if self.server is None:
|
if self.server is None:
|
||||||
return terminal
|
return terminal
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -89,7 +89,7 @@ class DockerNode(CoreNode):
|
||||||
"""
|
"""
|
||||||
return DockerOptions()
|
return DockerOptions()
|
||||||
|
|
||||||
def _create_cmd(self, args: str, shell: bool = False) -> str:
|
def create_cmd(self, args: str, shell: bool = False) -> str:
|
||||||
"""
|
"""
|
||||||
Create command used to run commands within the context of a node.
|
Create command used to run commands within the context of a node.
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ class LxcNode(CoreNode):
|
||||||
def create_options(cls) -> LxcOptions:
|
def create_options(cls) -> LxcOptions:
|
||||||
return LxcOptions()
|
return LxcOptions()
|
||||||
|
|
||||||
def _create_cmd(self, args: str, shell: bool = False) -> str:
|
def create_cmd(self, args: str, shell: bool = False) -> str:
|
||||||
"""
|
"""
|
||||||
Create command used to run commands within the context of a node.
|
Create command used to run commands within the context of a node.
|
||||||
|
|
||||||
|
|
|
@ -249,7 +249,7 @@ class PhysicalNode(CoreNode):
|
||||||
iface.shutdown()
|
iface.shutdown()
|
||||||
self.rmnodedir()
|
self.rmnodedir()
|
||||||
|
|
||||||
def _create_cmd(self, args: str, shell: bool = False) -> str:
|
def create_cmd(self, args: str, shell: bool = False) -> str:
|
||||||
if shell:
|
if shell:
|
||||||
args = f'{BASH} -c "{args}"'
|
args = f'{BASH} -c "{args}"'
|
||||||
return args
|
return args
|
||||||
|
|
|
@ -216,8 +216,7 @@ def cmd(
|
||||||
shell: bool = False,
|
shell: bool = False,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Execute a command on the host and return a tuple containing the exit status and
|
Execute a command on the host and returns the combined stderr stdout output.
|
||||||
result string. stderr output is folded into the stdout result string.
|
|
||||||
|
|
||||||
:param args: command arguments
|
:param args: command arguments
|
||||||
:param env: environment to run command with
|
:param env: environment to run command with
|
||||||
|
@ -250,6 +249,25 @@ def cmd(
|
||||||
raise CoreCommandError(1, input_args, "", e.strerror)
|
raise CoreCommandError(1, input_args, "", e.strerror)
|
||||||
|
|
||||||
|
|
||||||
|
def run_cmds(args: List[str], wait: bool = True, shell: bool = False) -> List[str]:
|
||||||
|
"""
|
||||||
|
Execute a series of commands on the host and returns a list of the combined stderr
|
||||||
|
stdout output.
|
||||||
|
|
||||||
|
:param args: command arguments
|
||||||
|
:param wait: True to wait for status, False otherwise
|
||||||
|
:param shell: True to use shell, False otherwise
|
||||||
|
:return: combined stdout and stderr
|
||||||
|
:raises CoreCommandError: when there is a non-zero exit status or the file to
|
||||||
|
execute is not found
|
||||||
|
"""
|
||||||
|
outputs = []
|
||||||
|
for arg in args:
|
||||||
|
output = cmd(arg, wait=wait, shell=shell)
|
||||||
|
outputs.append(output)
|
||||||
|
return outputs
|
||||||
|
|
||||||
|
|
||||||
def file_munge(pathname: str, header: str, text: str) -> None:
|
def file_munge(pathname: str, header: str, text: str) -> None:
|
||||||
"""
|
"""
|
||||||
Insert text at the end of a file, surrounded by header comments.
|
Insert text at the end of a file, surrounded by header comments.
|
||||||
|
@ -407,6 +425,101 @@ def load_logging_config(config_path: Path) -> None:
|
||||||
logging.config.dictConfig(log_config)
|
logging.config.dictConfig(log_config)
|
||||||
|
|
||||||
|
|
||||||
|
def run_cmds_threaded(
|
||||||
|
nodes: List["CoreNode"],
|
||||||
|
cmds: List[str],
|
||||||
|
wait: bool = True,
|
||||||
|
shell: bool = False,
|
||||||
|
workers: int = None,
|
||||||
|
) -> Tuple[Dict[int, List[str]], List[Exception]]:
|
||||||
|
"""
|
||||||
|
Run a set of commands in order across a provided set of nodes. Each node will
|
||||||
|
run the commands within the context of a threadpool.
|
||||||
|
|
||||||
|
:param nodes: nodes to run commands in
|
||||||
|
:param cmds: commands to run in nodes
|
||||||
|
:param wait: True to wait for status, False otherwise
|
||||||
|
:param shell: True to run shell like, False otherwise
|
||||||
|
:param workers: number of workers for threadpool, uses library default otherwise
|
||||||
|
:return: tuple including dict of node id to list of command output and a list of
|
||||||
|
exceptions if any
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _node_cmds(
|
||||||
|
_target: "CoreNode", _cmds: List[str], _wait: bool, _shell: bool
|
||||||
|
) -> List[str]:
|
||||||
|
outputs = []
|
||||||
|
for _cmd in _cmds:
|
||||||
|
output = _target.cmd(_cmd, wait=_wait, shell=_shell)
|
||||||
|
outputs.append(output)
|
||||||
|
return outputs
|
||||||
|
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
|
||||||
|
futures = []
|
||||||
|
node_mappings = {}
|
||||||
|
for node in nodes:
|
||||||
|
future = executor.submit(_node_cmds, node, cmds, wait, shell)
|
||||||
|
node_mappings[future] = node
|
||||||
|
futures.append(future)
|
||||||
|
outputs = {}
|
||||||
|
exceptions = []
|
||||||
|
for future in concurrent.futures.as_completed(futures):
|
||||||
|
try:
|
||||||
|
result = future.result()
|
||||||
|
node = node_mappings[future]
|
||||||
|
outputs[node.id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("thread pool exception")
|
||||||
|
exceptions.append(e)
|
||||||
|
return outputs, exceptions
|
||||||
|
|
||||||
|
|
||||||
|
def run_cmds_mp(
|
||||||
|
nodes: List["CoreNode"],
|
||||||
|
cmds: List[str],
|
||||||
|
wait: bool = True,
|
||||||
|
shell: bool = False,
|
||||||
|
workers: int = None,
|
||||||
|
) -> Tuple[Dict[int, List[str]], List[Exception]]:
|
||||||
|
"""
|
||||||
|
Run a set of commands in order across a provided set of nodes. Each node will
|
||||||
|
run the commands within the context of a process pool. This will not work
|
||||||
|
for distributed nodes and throws an exception when encountered.
|
||||||
|
|
||||||
|
:param nodes: nodes to run commands in
|
||||||
|
:param cmds: commands to run in nodes
|
||||||
|
:param wait: True to wait for status, False otherwise
|
||||||
|
:param shell: True to run shell like, False otherwise
|
||||||
|
:param workers: number of workers for threadpool, uses library default otherwise
|
||||||
|
:return: tuple including dict of node id to list of command output and a list of
|
||||||
|
exceptions if any
|
||||||
|
:raises CoreError: when a distributed node is provided as input
|
||||||
|
"""
|
||||||
|
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
|
||||||
|
futures = []
|
||||||
|
node_mapping = {}
|
||||||
|
for node in nodes:
|
||||||
|
node_cmds = [node.create_cmd(x) for x in cmds]
|
||||||
|
if node.server:
|
||||||
|
raise CoreError(
|
||||||
|
f"{node.name} uses a distributed server and not supported"
|
||||||
|
)
|
||||||
|
future = executor.submit(run_cmds, node_cmds, wait=wait, shell=shell)
|
||||||
|
node_mapping[future] = node
|
||||||
|
futures.append(future)
|
||||||
|
exceptions = []
|
||||||
|
outputs = {}
|
||||||
|
for future in concurrent.futures.as_completed(futures):
|
||||||
|
try:
|
||||||
|
result = future.result()
|
||||||
|
node = node_mapping[future]
|
||||||
|
outputs[node.id] = result
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("thread pool exception")
|
||||||
|
exceptions.append(e)
|
||||||
|
return outputs, exceptions
|
||||||
|
|
||||||
|
|
||||||
def threadpool(
|
def threadpool(
|
||||||
funcs: List[Tuple[Callable, Iterable[Any], Dict[Any, Any]]], workers: int = 10
|
funcs: List[Tuple[Callable, Iterable[Any], Dict[Any, Any]]], workers: int = 10
|
||||||
) -> Tuple[List[Any], List[Exception]]:
|
) -> Tuple[List[Any], List[Exception]]:
|
||||||
|
|
Loading…
Reference in a new issue