Merge branch 'develop' of https://github.com/coreemu/core into develop

This commit is contained in:
Blake Harnden 2022-07-21 15:41:48 -07:00
commit cd6bb319ad
6 changed files with 136 additions and 12 deletions

View file

@ -3,6 +3,7 @@ Defines the base logic for nodes used within core.
"""
import abc
import logging
import shlex
import shutil
import threading
from dataclasses import dataclass, field
@ -693,7 +694,7 @@ class CoreNode(CoreNodeBase):
finally:
self.rmnodedir()
def _create_cmd(self, args: str, shell: bool = False) -> str:
def create_cmd(self, args: str, shell: bool = False) -> str:
"""
Create command used to run commands within the context of a node.
@ -702,7 +703,7 @@ class CoreNode(CoreNodeBase):
:return: node command
"""
if shell:
args = f'{BASH} -c "{args}"'
args = f"{BASH} -c {shlex.quote(args)}"
return f"{VCMD} -c {self.ctrlchnlname} -- {args}"
def cmd(self, args: str, wait: bool = True, shell: bool = False) -> str:
@ -716,7 +717,7 @@ class CoreNode(CoreNodeBase):
:return: combined stdout and stderr
:raises CoreCommandError: when a non-zero exit status occurs
"""
args = self._create_cmd(args, shell)
args = self.create_cmd(args, shell)
if self.server is None:
return utils.cmd(args, wait=wait, shell=shell)
else:
@ -742,7 +743,7 @@ class CoreNode(CoreNodeBase):
:param sh: shell to execute command in
:return: str
"""
terminal = self._create_cmd(sh)
terminal = self.create_cmd(sh)
if self.server is None:
return terminal
else:

View file

@ -89,7 +89,7 @@ class DockerNode(CoreNode):
"""
return DockerOptions()
def _create_cmd(self, args: str, shell: bool = False) -> str:
def create_cmd(self, args: str, shell: bool = False) -> str:
"""
Create command used to run commands within the context of a node.
@ -142,8 +142,9 @@ class DockerNode(CoreNode):
volumes += (
f"--mount type=volume," f"source={volume.src},target={volume.dst} "
)
hostname = self.name.replace("_", "-")
self.host_cmd(
f"{DOCKER} run -td --init --net=none --hostname {self.name} "
f"{DOCKER} run -td --init --net=none --hostname {hostname} "
f"--name {self.name} --sysctl net.ipv6.conf.all.disable_ipv6=0 "
f"{binds} {volumes} "
f"--privileged {self.image} tail -f /dev/null"
@ -187,7 +188,11 @@ class DockerNode(CoreNode):
:param sh: shell to execute command in
:return: str
"""
return f"{DOCKER} exec -it {self.name} {sh}"
terminal = f"{DOCKER} exec -it {self.name} {sh}"
if self.server is None:
return terminal
else:
return f"ssh -X -f {self.server.host} xterm -e {terminal}"
def create_dir(self, dir_path: Path) -> None:
"""

View file

@ -62,7 +62,7 @@ class LxcNode(CoreNode):
def create_options(cls) -> LxcOptions:
return LxcOptions()
def _create_cmd(self, args: str, shell: bool = False) -> str:
def create_cmd(self, args: str, shell: bool = False) -> str:
"""
Create command used to run commands within the context of a node.
@ -130,7 +130,11 @@ class LxcNode(CoreNode):
:param sh: shell to execute command in
:return: str
"""
return f"lxc exec {self.name} -- {sh}"
terminal = f"lxc exec {self.name} -- {sh}"
if self.server is None:
return terminal
else:
return f"ssh -X -f {self.server.host} xterm -e {terminal}"
def create_dir(self, dir_path: Path) -> None:
"""

View file

@ -28,6 +28,7 @@ class LinuxNetClient:
:param name: name for hostname
:return: nothing
"""
name = name.replace("_", "-")
self.run(f"hostname {name}")
def create_route(self, route: str, device: str) -> None:

View file

@ -249,7 +249,7 @@ class PhysicalNode(CoreNode):
iface.shutdown()
self.rmnodedir()
def _create_cmd(self, args: str, shell: bool = False) -> str:
def create_cmd(self, args: str, shell: bool = False) -> str:
if shell:
args = f'{BASH} -c "{args}"'
return args

View file

@ -216,8 +216,7 @@ def cmd(
shell: bool = False,
) -> str:
"""
Execute a command on the host and return a tuple containing the exit status and
result string. stderr output is folded into the stdout result string.
Execute a command on the host and returns the combined stderr stdout output.
:param args: command arguments
:param env: environment to run command with
@ -250,6 +249,25 @@ def cmd(
raise CoreCommandError(1, input_args, "", e.strerror)
def run_cmds(args: List[str], wait: bool = True, shell: bool = False) -> List[str]:
"""
Execute a series of commands on the host and returns a list of the combined stderr
stdout output.
:param args: command arguments
:param wait: True to wait for status, False otherwise
:param shell: True to use shell, False otherwise
:return: combined stdout and stderr
:raises CoreCommandError: when there is a non-zero exit status or the file to
execute is not found
"""
outputs = []
for arg in args:
output = cmd(arg, wait=wait, shell=shell)
outputs.append(output)
return outputs
def file_munge(pathname: str, header: str, text: str) -> None:
"""
Insert text at the end of a file, surrounded by header comments.
@ -407,6 +425,101 @@ def load_logging_config(config_path: Path) -> None:
logging.config.dictConfig(log_config)
def run_cmds_threaded(
nodes: List["CoreNode"],
cmds: List[str],
wait: bool = True,
shell: bool = False,
workers: int = None,
) -> Tuple[Dict[int, List[str]], List[Exception]]:
"""
Run a set of commands in order across a provided set of nodes. Each node will
run the commands within the context of a threadpool.
:param nodes: nodes to run commands in
:param cmds: commands to run in nodes
:param wait: True to wait for status, False otherwise
:param shell: True to run shell like, False otherwise
:param workers: number of workers for threadpool, uses library default otherwise
:return: tuple including dict of node id to list of command output and a list of
exceptions if any
"""
def _node_cmds(
_target: "CoreNode", _cmds: List[str], _wait: bool, _shell: bool
) -> List[str]:
outputs = []
for _cmd in _cmds:
output = _target.cmd(_cmd, wait=_wait, shell=_shell)
outputs.append(output)
return outputs
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = []
node_mappings = {}
for node in nodes:
future = executor.submit(_node_cmds, node, cmds, wait, shell)
node_mappings[future] = node
futures.append(future)
outputs = {}
exceptions = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
node = node_mappings[future]
outputs[node.id] = result
except Exception as e:
logger.exception("thread pool exception")
exceptions.append(e)
return outputs, exceptions
def run_cmds_mp(
nodes: List["CoreNode"],
cmds: List[str],
wait: bool = True,
shell: bool = False,
workers: int = None,
) -> Tuple[Dict[int, List[str]], List[Exception]]:
"""
Run a set of commands in order across a provided set of nodes. Each node will
run the commands within the context of a process pool. This will not work
for distributed nodes and throws an exception when encountered.
:param nodes: nodes to run commands in
:param cmds: commands to run in nodes
:param wait: True to wait for status, False otherwise
:param shell: True to run shell like, False otherwise
:param workers: number of workers for threadpool, uses library default otherwise
:return: tuple including dict of node id to list of command output and a list of
exceptions if any
:raises CoreError: when a distributed node is provided as input
"""
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = []
node_mapping = {}
for node in nodes:
node_cmds = [node.create_cmd(x) for x in cmds]
if node.server:
raise CoreError(
f"{node.name} uses a distributed server and not supported"
)
future = executor.submit(run_cmds, node_cmds, wait=wait, shell=shell)
node_mapping[future] = node
futures.append(future)
exceptions = []
outputs = {}
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
node = node_mapping[future]
outputs[node.id] = result
except Exception as e:
logger.exception("thread pool exception")
exceptions.append(e)
return outputs, exceptions
def threadpool(
funcs: List[Tuple[Callable, Iterable[Any], Dict[Any, Any]]], workers: int = 10
) -> Tuple[List[Any], List[Exception]]: