refactor fabric distributed to use a class and update sessions to create and provide these to nodes

This commit is contained in:
Blake Harnden 2019-10-14 15:43:57 -07:00
parent 5f282bb695
commit 6570f22ccf
10 changed files with 153 additions and 133 deletions

View file

@ -1,8 +1,13 @@
"""
Defines distributed server functionality.
"""
import logging
import os
import threading
from tempfile import NamedTemporaryFile
from fabric import Connection
from invoke import UnexpectedExit
from core.errors import CoreCommandError
@ -10,52 +15,80 @@ from core.errors import CoreCommandError
LOCK = threading.Lock()
def remote_cmd(server, cmd, env=None, cwd=None, wait=True):
class DistributedServer(object):
"""
Run command remotely using server connection.
:param fabric.connection.Connection server: remote server node will run on,
default is None for localhost
:param str cmd: command to run
:param dict env: environment for remote command, default is None
:param str cwd: directory to run command in, defaults to None, which is the user's
home directory
:param bool wait: True to wait for status, False to background process
:return: stdout when success
:rtype: str
:raises CoreCommandError: when a non-zero exit status occurs
Provides distributed server interactions.
"""
replace_env = env is not None
if not wait:
cmd += " &"
logging.info(
"remote cmd server(%s) cwd(%s) wait(%s): %s", server.host, cwd, wait, cmd
)
try:
with LOCK:
if cwd is None:
result = server.run(cmd, hide=False, env=env, replace_env=replace_env)
else:
with server.cd(cwd):
result = server.run(
def __init__(self, host):
"""
Create a DistributedServer instance.
:param str host: host to connect to
"""
self.host = host
self.conn = Connection(host, user="root")
self.lock = threading.Lock()
def remote_cmd(self, cmd, env=None, cwd=None, wait=True):
"""
Run command remotely using server connection.
:param str cmd: command to run
:param dict env: environment for remote command, default is None
:param str cwd: directory to run command in, defaults to None, which is the user's
home directory
:param bool wait: True to wait for status, False to background process
:return: stdout when success
:rtype: str
:raises CoreCommandError: when a non-zero exit status occurs
"""
replace_env = env is not None
if not wait:
cmd += " &"
logging.info(
"remote cmd server(%s) cwd(%s) wait(%s): %s", self.host, cwd, wait, cmd
)
try:
with self.lock:
if cwd is None:
result = self.conn.run(
cmd, hide=False, env=env, replace_env=replace_env
)
return result.stdout.strip()
except UnexpectedExit as e:
stdout, stderr = e.streams_for_display()
raise CoreCommandError(e.result.exited, cmd, stdout, stderr)
else:
with self.conn.cd(cwd):
result = self.conn.run(
cmd, hide=False, env=env, replace_env=replace_env
)
return result.stdout.strip()
except UnexpectedExit as e:
stdout, stderr = e.streams_for_display()
raise CoreCommandError(e.result.exited, cmd, stdout, stderr)
def remote_put(self, source, destination):
"""
Push file to remote server.
def remote_put(server, source, destination):
with LOCK:
server.put(source, destination)
:param str source: source file to push
:param str destination: destination file location
:return: nothing
"""
with self.lock:
self.conn.put(source, destination)
def remote_put_temp(self, destination, data):
"""
Remote push file contents to a remote server, using a temp file as an
intermediate step.
def remote_put_temp(server, destination, data):
with LOCK:
temp = NamedTemporaryFile(delete=False)
temp.write(data.encode("utf-8"))
temp.close()
server.put(temp.name, destination)
os.unlink(temp.name)
:param str destination: file destination for data
:param str data: data to store in remote file
:return: nothing
"""
with self.lock:
temp = NamedTemporaryFile(delete=False)
temp.write(data.encode("utf-8"))
temp.close()
self.conn.put(temp.name, destination)
os.unlink(temp.name)

View file

@ -14,14 +14,13 @@ import threading
import time
from multiprocessing.pool import ThreadPool
from fabric import Connection
from core import constants, utils
from core.api.tlv import coreapi
from core.api.tlv.broker import CoreBroker
from core.emane.emanemanager import EmaneManager
from core.emane.nodes import EmaneNet
from core.emulator.data import EventData, ExceptionData, NodeData
from core.emulator.distributed import DistributedServer
from core.emulator.emudata import (
IdGen,
LinkOptions,
@ -162,11 +161,11 @@ class Session(object):
"host": ("DefaultRoute", "SSH"),
}
def add_distributed(self, server):
conn = Connection(server, user="root")
self.servers[server] = conn
def add_distributed(self, host):
server = DistributedServer(host)
self.servers[host] = server
cmd = "mkdir -p %s" % self.session_dir
conn.run(cmd, hide=False)
server.remote_cmd(cmd)
def shutdown_distributed(self):
# shutdown all tunnels
@ -176,10 +175,10 @@ class Session(object):
tunnel.shutdown()
# remove all remote session directories
for server in self.servers:
conn = self.servers[server]
for host in self.servers:
server = self.servers[host]
cmd = "rm -rf %s" % self.session_dir
conn.run(cmd, hide=False)
server.remote_cmd(cmd)
# clear tunnels
self.tunnels.clear()
@ -194,18 +193,15 @@ class Session(object):
if isinstance(node, CtrlNet) and node.serverintf is not None:
continue
for server in self.servers:
conn = self.servers[server]
key = self.tunnelkey(node_id, IpAddress.to_int(server))
for host in self.servers:
server = self.servers[host]
key = self.tunnelkey(node_id, IpAddress.to_int(host))
# local to server
logging.info(
"local tunnel node(%s) to remote(%s) key(%s)",
node.name,
server,
key,
"local tunnel node(%s) to remote(%s) key(%s)", node.name, host, key
)
local_tap = GreTap(session=self, remoteip=server, key=key)
local_tap = GreTap(session=self, remoteip=host, key=key)
local_tap.net_client.create_interface(node.brname, local_tap.localname)
# server to local
@ -216,7 +212,7 @@ class Session(object):
key,
)
remote_tap = GreTap(
session=self, remoteip=self.address, key=key, server=conn
session=self, remoteip=self.address, key=key, server=server
)
remote_tap.net_client.create_interface(
node.brname, remote_tap.localname