core-extra/daemon/core/emulator/distributed.py

253 lines
8.4 KiB
Python

"""
Defines distributed server functionality.
"""
import logging
import os
import threading
from collections import OrderedDict
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Callable, Dict, Tuple
import netaddr
from fabric import Connection
from invoke import UnexpectedExit
from core import utils
from core.errors import CoreCommandError, CoreError
from core.executables import get_requirements
from core.nodes.interface import GreTap
from core.nodes.network import CoreNetwork, CtrlNet
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from core.emulator.session import Session
LOCK = threading.Lock()
CMD_HIDE = True
class DistributedServer:
"""
Provides distributed server interactions.
"""
def __init__(self, name: str, host: str) -> None:
"""
Create a DistributedServer instance.
:param name: convenience name to associate with host
:param host: host to connect to
"""
self.name: str = name
self.host: str = host
self.conn: Connection = Connection(host, user="root")
self.lock: threading.Lock = threading.Lock()
def remote_cmd(
self, cmd: str, env: Dict[str, str] = None, cwd: str = None, wait: bool = True
) -> str:
"""
Run command remotely using server connection.
:param cmd: command to run
:param env: environment for remote command, default is None
:param cwd: directory to run command in, defaults to None, which is the
user's home directory
:param wait: True to wait for status, False to background process
:return: stdout when success
:raises CoreCommandError: when a non-zero exit status occurs
"""
replace_env = env is not None
if not wait:
cmd += " &"
logger.debug(
"remote cmd server(%s) cwd(%s) wait(%s): %s", self.host, cwd, wait, cmd
)
try:
if cwd is None:
result = self.conn.run(
cmd, hide=CMD_HIDE, env=env, replace_env=replace_env
)
else:
with self.conn.cd(cwd):
result = self.conn.run(
cmd, hide=CMD_HIDE, env=env, replace_env=replace_env
)
return result.stdout.strip()
except UnexpectedExit as e:
stdout, stderr = e.streams_for_display()
raise CoreCommandError(e.result.exited, cmd, stdout, stderr)
def remote_put(self, src_path: Path, dst_path: Path) -> None:
"""
Push file to remote server.
:param src_path: source file to push
:param dst_path: destination file location
:return: nothing
"""
with self.lock:
self.conn.put(str(src_path), str(dst_path))
def remote_put_temp(self, dst_path: Path, data: str) -> None:
"""
Remote push file contents to a remote server, using a temp file as an
intermediate step.
:param dst_path: file destination for data
:param data: data to store in remote file
:return: nothing
"""
with self.lock:
temp = NamedTemporaryFile(delete=False)
temp.write(data.encode("utf-8"))
temp.close()
self.conn.put(temp.name, str(dst_path))
os.unlink(temp.name)
class DistributedController:
"""
Provides logic for dealing with remote tunnels and distributed servers.
"""
def __init__(self, session: "Session") -> None:
"""
Create
:param session: session
"""
self.session: "Session" = session
self.servers: Dict[str, DistributedServer] = OrderedDict()
self.tunnels: Dict[int, Tuple[GreTap, GreTap]] = {}
self.address: str = self.session.options.get_config(
"distributed_address", default=None
)
def add_server(self, name: str, host: str) -> None:
"""
Add distributed server configuration.
:param name: distributed server name
:param host: distributed server host address
:return: nothing
:raises CoreError: when there is an error validating server
"""
server = DistributedServer(name, host)
for requirement in get_requirements(self.session.use_ovs()):
try:
server.remote_cmd(f"which {requirement}")
except CoreCommandError:
raise CoreError(
f"server({server.name}) failed validation for "
f"command({requirement})"
)
self.servers[name] = server
cmd = f"mkdir -p {self.session.directory}"
server.remote_cmd(cmd)
def execute(self, func: Callable[[DistributedServer], None]) -> None:
"""
Convenience for executing logic against all distributed servers.
:param func: function to run, that takes a DistributedServer as a parameter
:return: nothing
"""
for name in self.servers:
server = self.servers[name]
func(server)
def shutdown(self) -> None:
"""
Shutdown logic for dealing with distributed tunnels and server session
directories.
:return: nothing
"""
# shutdown all tunnels
for key in self.tunnels:
tunnels = self.tunnels[key]
for tunnel in tunnels:
tunnel.shutdown()
# remove all remote session directories
for name in self.servers:
server = self.servers[name]
cmd = f"rm -rf {self.session.directory}"
server.remote_cmd(cmd)
# clear tunnels
self.tunnels.clear()
def start(self) -> None:
"""
Start distributed network tunnels.
:return: nothing
"""
mtu = self.session.options.get_config_int("mtu")
for node_id in self.session.nodes:
node = self.session.nodes[node_id]
if not isinstance(node, CoreNetwork):
continue
if isinstance(node, CtrlNet) and node.serverintf is not None:
continue
for name in self.servers:
server = self.servers[name]
self.create_gre_tunnel(node, server, mtu, True)
def create_gre_tunnel(
self, node: CoreNetwork, server: DistributedServer, mtu: int, start: bool
) -> Tuple[GreTap, GreTap]:
"""
Create gre tunnel using a pair of gre taps between the local and remote server.
:param node: node to create gre tunnel for
:param server: server to create tunnel for
:param mtu: mtu for gre taps
:param start: True to start gre taps, False otherwise
:return: local and remote gre taps created for tunnel
"""
host = server.host
key = self.tunnel_key(node.id, netaddr.IPAddress(host).value)
tunnel = self.tunnels.get(key)
if tunnel is not None:
return tunnel
# local to server
logger.info("local tunnel node(%s) to remote(%s) key(%s)", node.name, host, key)
local_tap = GreTap(self.session, host, key=key, mtu=mtu)
if start:
local_tap.startup()
local_tap.net_client.set_iface_master(node.brname, local_tap.localname)
# server to local
logger.info(
"remote tunnel node(%s) to local(%s) key(%s)", node.name, self.address, key
)
remote_tap = GreTap(self.session, self.address, key=key, server=server, mtu=mtu)
if start:
remote_tap.startup()
remote_tap.net_client.set_iface_master(node.brname, remote_tap.localname)
# save tunnels for shutdown
tunnel = (local_tap, remote_tap)
self.tunnels[key] = tunnel
return tunnel
def tunnel_key(self, node1_id: int, node2_id: int) -> int:
"""
Compute a 32-bit key used to uniquely identify a GRE tunnel.
The hash(n1num), hash(n2num) values are used, so node numbers may be
None or string values (used for e.g. "ctrlnet").
:param node1_id: node one id
:param node2_id: node two id
:return: tunnel key for the node pair
"""
logger.debug("creating tunnel key for: %s, %s", node1_id, node2_id)
key = (
(self.session.id << 16)
^ utils.hashkey(node1_id)
^ (utils.hashkey(node2_id) << 8)
)
return key & 0xFFFFFFFF