Separate distributed session logic into its own class, to help reduce the size of session.py, which is already too large.

This commit is contained in:
Blake Harnden 2019-10-17 11:10:59 -07:00
parent 0ef06a0167
commit e94a6d1afa
14 changed files with 196 additions and 172 deletions

View file

@ -1192,9 +1192,9 @@ class CoreHandler(socketserver.BaseRequestHandler):
for server in server_list: for server in server_list:
server_items = server.split(":") server_items = server.split(":")
name, host, _ = server_items[:3] name, host, _ = server_items[:3]
self.session.add_distributed(name, host) self.session.distributed.add_server(name, host)
elif message_type == ConfigFlags.RESET: elif message_type == ConfigFlags.RESET:
self.session.shutdown_distributed() self.session.distributed.shutdown()
def handle_config_services(self, message_type, config_data): def handle_config_services(self, message_type, config_data):
replies = [] replies = []

View file

@ -142,9 +142,7 @@ class EmaneManager(ModelManager):
args = "emane --version" args = "emane --version"
emane_version = utils.check_cmd(args) emane_version = utils.check_cmd(args)
logging.info("using EMANE: %s", emane_version) logging.info("using EMANE: %s", emane_version)
for name in self.session.servers: self.session.distributed.execute(lambda x: x.remote_cmd(args))
server = self.session.servers[name]
server.remote_cmd(args)
# load default emane models # load default emane models
self.load_models(EMANE_MODELS) self.load_models(EMANE_MODELS)
@ -518,11 +516,11 @@ class EmaneManager(ModelManager):
dev = self.get_config("eventservicedevice") dev = self.get_config("eventservicedevice")
emanexml.create_event_service_xml(group, port, dev, self.session.session_dir) emanexml.create_event_service_xml(group, port, dev, self.session.session_dir)
for name in self.session.servers: self.session.distributed.execute(
conn = self.session.servers[name] lambda x: emanexml.create_event_service_xml(
emanexml.create_event_service_xml( group, port, dev, self.session.session_dir, x
group, port, dev, self.session.session_dir, conn
) )
)
def startdaemons(self): def startdaemons(self):
""" """
@ -598,9 +596,7 @@ class EmaneManager(ModelManager):
emanecmd += " -f %s" % os.path.join(path, "emane.log") emanecmd += " -f %s" % os.path.join(path, "emane.log")
emanecmd += " %s" % os.path.join(path, "platform.xml") emanecmd += " %s" % os.path.join(path, "platform.xml")
utils.check_cmd(emanecmd, cwd=path) utils.check_cmd(emanecmd, cwd=path)
for name in self.session.servers: self.session.distributed.execute(lambda x: x.remote_cmd(emanecmd, cwd=path))
server = self.session.servers[name]
server.remote_cmd(emanecmd, cwd=path)
logging.info("host emane daemon running: %s", emanecmd) logging.info("host emane daemon running: %s", emanecmd)
def stopdaemons(self): def stopdaemons(self):
@ -625,10 +621,8 @@ class EmaneManager(ModelManager):
try: try:
utils.check_cmd(kill_emaned) utils.check_cmd(kill_emaned)
utils.check_cmd(kill_transortd) utils.check_cmd(kill_transortd)
for name in self.session.servers: self.session.distributed.execute(lambda x: x.remote_cmd(kill_emaned))
server = self.session[name] self.session.distributed.execute(lambda x: x.remote_cmd(kill_transortd))
server.remote_cmd(kill_emaned)
server.remote_cmd(kill_transortd)
except CoreCommandError: except CoreCommandError:
logging.exception("error shutting down emane daemons") logging.exception("error shutting down emane daemons")

View file

@ -5,12 +5,17 @@ Defines distributed server functionality.
import logging import logging
import os import os
import threading import threading
from collections import OrderedDict
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from fabric import Connection from fabric import Connection
from invoke import UnexpectedExit from invoke import UnexpectedExit
from core import utils
from core.errors import CoreCommandError from core.errors import CoreCommandError
from core.nodes.interface import GreTap
from core.nodes.ipaddress import IpAddress
from core.nodes.network import CoreNetwork, CtrlNet
LOCK = threading.Lock() LOCK = threading.Lock()
@ -93,3 +98,150 @@ class DistributedServer(object):
temp.close() temp.close()
self.conn.put(temp.name, destination) self.conn.put(temp.name, destination)
os.unlink(temp.name) os.unlink(temp.name)
class DistributedController(object):
    """
    Controller for distributed sessions: tracks remote servers and manages
    the GRE tunnels that bridge local network nodes to those servers.
    """

    def __init__(self, session):
        """
        Create a DistributedController instance.

        :param core.emulator.session.Session session: session this controller is for
        """
        self.session = session
        # name -> DistributedServer; ordered so iteration over servers is stable
        self.servers = OrderedDict()
        # tunnel key -> (local GreTap, remote GreTap) pair, saved for shutdown
        self.tunnels = {}
        # local address remote servers tunnel back to; None when not configured
        self.address = self.session.options.get_config(
            "distributed_address", default=None
        )

    def add_server(self, name, host):
        """
        Add distributed server configuration.

        :param str name: distributed server name
        :param str host: distributed server host address
        :return: nothing
        """
        server = DistributedServer(name, host)
        self.servers[name] = server
        # mirror this session's working directory on the remote host
        cmd = "mkdir -p %s" % self.session.session_dir
        server.remote_cmd(cmd)

    def execute(self, func):
        """
        Convenience for executing logic against all distributed servers.

        :param func: function to run, that takes a DistributedServer as a parameter
        :return: nothing
        """
        for server in self.servers.values():
            func(server)

    def shutdown(self):
        """
        Shutdown logic for dealing with distributed tunnels and server session
        directories.

        :return: nothing
        """
        # shutdown all tunnels; each entry is a pair of gre taps
        for tunnels in self.tunnels.values():
            for tunnel in tunnels:
                tunnel.shutdown()

        # remove all remote session directories
        for server in self.servers.values():
            cmd = "rm -rf %s" % self.session.session_dir
            server.remote_cmd(cmd)

        # clear tunnels
        self.tunnels.clear()

    def start(self):
        """
        Start distributed network tunnels.

        :return: nothing
        """
        for node_id in self.session.nodes:
            node = self.session.nodes[node_id]
            # only network nodes are bridged across hosts
            if not isinstance(node, CoreNetwork):
                continue
            # control nets with a dedicated server interface need no tunnel
            if isinstance(node, CtrlNet) and node.serverintf is not None:
                continue
            for server in self.servers.values():
                self.create_gre_tunnel(node, server)

    def create_gre_tunnel(self, node, server):
        """
        Create gre tunnel using a pair of gre taps between the local and remote server.

        :param core.nodes.network.CoreNetwork node: node to create gre tunnel for
        :param core.emulator.distributed.DistributedServer server: server to create
            tunnel for
        :return: local and remote gre taps created for tunnel
        :rtype: tuple
        """
        host = server.host
        key = self.tunnel_key(node.id, IpAddress.to_int(host))
        # reuse an existing tunnel for this node/server pair, if any
        tunnel = self.tunnels.get(key)
        if tunnel is not None:
            return tunnel

        # local to server
        logging.info(
            "local tunnel node(%s) to remote(%s) key(%s)", node.name, host, key
        )
        local_tap = GreTap(session=self.session, remoteip=host, key=key)
        local_tap.net_client.create_interface(node.brname, local_tap.localname)

        # server to local
        logging.info(
            "remote tunnel node(%s) to local(%s) key(%s)", node.name, self.address, key
        )
        remote_tap = GreTap(
            session=self.session, remoteip=self.address, key=key, server=server
        )
        remote_tap.net_client.create_interface(node.brname, remote_tap.localname)

        # save tunnels for shutdown
        tunnel = (local_tap, remote_tap)
        self.tunnels[key] = tunnel
        return tunnel

    def tunnel_key(self, n1_id, n2_id):
        """
        Compute a 32-bit key used to uniquely identify a GRE tunnel.
        The hash(n1num), hash(n2num) values are used, so node numbers may be
        None or string values (used for e.g. "ctrlnet").

        :param int n1_id: node one id
        :param int n2_id: node two id
        :return: tunnel key for the node pair
        :rtype: int
        """
        logging.debug("creating tunnel key for: %s, %s", n1_id, n2_id)
        # mix the session id in so keys do not collide across sessions
        key = (
            (self.session.id << 16) ^ utils.hashkey(n1_id) ^ (utils.hashkey(n2_id) << 8)
        )
        return key & 0xFFFFFFFF

    def get_tunnel(self, n1_id, n2_id):
        """
        Return the GreTap between two nodes if it exists.

        :param int n1_id: node one id
        :param int n2_id: node two id
        :return: gre tap between nodes or None
        """
        key = self.tunnel_key(n1_id, n2_id)
        logging.debug("checking for tunnel key(%s) in: %s", key, self.tunnels)
        return self.tunnels.get(key)

View file

@ -18,7 +18,7 @@ from core import constants, utils
from core.emane.emanemanager import EmaneManager from core.emane.emanemanager import EmaneManager
from core.emane.nodes import EmaneNet from core.emane.nodes import EmaneNet
from core.emulator.data import EventData, ExceptionData, NodeData from core.emulator.data import EventData, ExceptionData, NodeData
from core.emulator.distributed import DistributedServer from core.emulator.distributed import DistributedController
from core.emulator.emudata import ( from core.emulator.emudata import (
IdGen, IdGen,
LinkOptions, LinkOptions,
@ -34,11 +34,9 @@ from core.location.event import EventLoop
from core.location.mobility import MobilityManager from core.location.mobility import MobilityManager
from core.nodes.base import CoreNetworkBase, CoreNode, CoreNodeBase from core.nodes.base import CoreNetworkBase, CoreNode, CoreNodeBase
from core.nodes.docker import DockerNode from core.nodes.docker import DockerNode
from core.nodes.interface import GreTap from core.nodes.ipaddress import MacAddress
from core.nodes.ipaddress import IpAddress, MacAddress
from core.nodes.lxd import LxcNode from core.nodes.lxd import LxcNode
from core.nodes.network import ( from core.nodes.network import (
CoreNetwork,
CtrlNet, CtrlNet,
GreTapBridge, GreTapBridge,
HubNode, HubNode,
@ -137,10 +135,8 @@ class Session(object):
self.options.set_config(key, value) self.options.set_config(key, value)
self.metadata = SessionMetaData() self.metadata = SessionMetaData()
# distributed servers # distributed support and logic
self.servers = {} self.distributed = DistributedController(self)
self.tunnels = {}
self.address = self.options.get_config("distributed_address", default=None)
# initialize session feature helpers # initialize session feature helpers
self.location = CoreLocation() self.location = CoreLocation()
@ -158,123 +154,6 @@ class Session(object):
"host": ("DefaultRoute", "SSH"), "host": ("DefaultRoute", "SSH"),
} }
def add_distributed(self, name, host):
"""
Add distributed server configuration.
:param str name: distributed server name
:param str host: distributed server host address
:return: nothing
"""
server = DistributedServer(name, host)
self.servers[name] = server
cmd = "mkdir -p %s" % self.session_dir
server.remote_cmd(cmd)
def shutdown_distributed(self):
"""
Shutdown logic for dealing with distributed tunnels and server session
directories.
:return: nothing
"""
# shutdown all tunnels
for key in self.tunnels:
tunnels = self.tunnels[key]
for tunnel in tunnels:
tunnel.shutdown()
# remove all remote session directories
for name in self.servers:
server = self.servers[name]
cmd = "rm -rf %s" % self.session_dir
server.remote_cmd(cmd)
# clear tunnels
self.tunnels.clear()
def start_distributed(self):
"""
Start distributed network tunnels.
:return: nothing
"""
for node_id in self.nodes:
node = self.nodes[node_id]
if not isinstance(node, CoreNetwork):
continue
if isinstance(node, CtrlNet) and node.serverintf is not None:
continue
for name in self.servers:
server = self.servers[name]
self.create_gre_tunnel(node, server)
def create_gre_tunnel(self, node, server):
"""
Create gre tunnel using a pair of gre taps between the local and remote server.
:param core.nodes.network.CoreNetwork node: node to create gre tunnel for
:param core.emulator.distributed.DistributedServer server: server to create
tunnel for
:return: local and remote gre taps created for tunnel
:rtype: tuple
"""
host = server.host
key = self.tunnelkey(node.id, IpAddress.to_int(host))
tunnel = self.tunnels.get(key)
if tunnel is not None:
return tunnel
# local to server
logging.info(
"local tunnel node(%s) to remote(%s) key(%s)", node.name, host, key
)
local_tap = GreTap(session=self, remoteip=host, key=key)
local_tap.net_client.create_interface(node.brname, local_tap.localname)
# server to local
logging.info(
"remote tunnel node(%s) to local(%s) key(%s)", node.name, self.address, key
)
remote_tap = GreTap(session=self, remoteip=self.address, key=key, server=server)
remote_tap.net_client.create_interface(node.brname, remote_tap.localname)
# save tunnels for shutdown
tunnel = (local_tap, remote_tap)
self.tunnels[key] = tunnel
return tunnel
def tunnelkey(self, n1_id, n2_id):
"""
Compute a 32-bit key used to uniquely identify a GRE tunnel.
The hash(n1num), hash(n2num) values are used, so node numbers may be
None or string values (used for e.g. "ctrlnet").
:param int n1_id: node one id
:param int n2_id: node two id
:return: tunnel key for the node pair
:rtype: int
"""
logging.debug("creating tunnel key for: %s, %s", n1_id, n2_id)
key = (self.id << 16) ^ utils.hashkey(n1_id) ^ (utils.hashkey(n2_id) << 8)
return key & 0xFFFFFFFF
def gettunnel(self, n1_id, n2_id):
"""
Return the GreTap between two nodes if it exists.
:param int n1_id: node one id
:param int n2_id: node two id
:return: gre tap between nodes or None
"""
key = self.tunnelkey(n1_id, n2_id)
logging.debug("checking for tunnel key(%s) in: %s", key, self.tunnels)
return self.tunnels.get(key)
@classmethod @classmethod
def get_node_class(cls, _type): def get_node_class(cls, _type):
""" """
@ -324,7 +203,7 @@ class Session(object):
node_two = self.get_node(node_two_id) node_two = self.get_node(node_two_id)
# both node ids are provided # both node ids are provided
tunnel = self.gettunnel(node_one_id, node_two_id) tunnel = self.distributed.get_tunnel(node_one_id, node_two_id)
logging.debug("tunnel between nodes: %s", tunnel) logging.debug("tunnel between nodes: %s", tunnel)
if isinstance(tunnel, GreTapBridge): if isinstance(tunnel, GreTapBridge):
net_one = tunnel net_one = tunnel
@ -789,7 +668,7 @@ class Session(object):
name = "%s%s" % (node_class.__name__, _id) name = "%s%s" % (node_class.__name__, _id)
# verify distributed server # verify distributed server
server = self.servers.get(node_options.emulation_server) server = self.distributed.servers.get(node_options.emulation_server)
if node_options.emulation_server is not None and server is None: if node_options.emulation_server is not None and server is None:
raise CoreError( raise CoreError(
"invalid distributed server: %s" % node_options.emulation_server "invalid distributed server: %s" % node_options.emulation_server
@ -1003,7 +882,7 @@ class Session(object):
:return: nothing :return: nothing
""" """
self.delete_nodes() self.delete_nodes()
self.shutdown_distributed() self.distributed.shutdown()
self.del_hooks() self.del_hooks()
self.emane.reset() self.emane.reset()
@ -1082,7 +961,7 @@ class Session(object):
# remove and shutdown all nodes and tunnels # remove and shutdown all nodes and tunnels
self.delete_nodes() self.delete_nodes()
self.shutdown_distributed() self.distributed.shutdown()
# remove this sessions working directory # remove this sessions working directory
preserve = self.options.get_config("preservedir") == "1" preserve = self.options.get_config("preservedir") == "1"
@ -1594,7 +1473,7 @@ class Session(object):
self.add_remove_control_interface(node=None, remove=False) self.add_remove_control_interface(node=None, remove=False)
# initialize distributed tunnels # initialize distributed tunnels
self.start_distributed() self.distributed.start()
# instantiate will be invoked again upon Emane configure # instantiate will be invoked again upon Emane configure
if self.emane.startup() == self.emane.NOT_READY: if self.emane.startup() == self.emane.NOT_READY:

View file

@ -289,9 +289,7 @@ class CoreNetwork(CoreNetworkBase):
""" """
logging.info("network node(%s) cmd", self.name) logging.info("network node(%s) cmd", self.name)
output = utils.check_cmd(args, env, cwd, wait) output = utils.check_cmd(args, env, cwd, wait)
for name in self.session.servers: self.session.distributed.execute(lambda x: x.remote_cmd(args, env, cwd, wait))
server = self.session.servers[name]
server.remote_cmd(args, env, cwd, wait)
return output return output
def startup(self): def startup(self):
@ -778,8 +776,9 @@ class CtrlNet(CoreNetwork):
current = "%s/%s" % (address, self.prefix.prefixlen) current = "%s/%s" % (address, self.prefix.prefixlen)
net_client = get_net_client(use_ovs, utils.check_cmd) net_client = get_net_client(use_ovs, utils.check_cmd)
net_client.create_address(self.brname, current) net_client.create_address(self.brname, current)
for name in self.session.servers: servers = self.session.distributed.servers
server = self.session.servers[name] for name in servers:
server = servers[name]
address -= 1 address -= 1
current = "%s/%s" % (address, self.prefix.prefixlen) current = "%s/%s" % (address, self.prefix.prefixlen)
net_client = get_net_client(use_ovs, server.remote_cmd) net_client = get_net_client(use_ovs, server.remote_cmd)

View file

@ -166,7 +166,7 @@ class PhysicalNode(CoreNodeBase):
if self.up: if self.up:
# this is reached when this node is linked to a network node # this is reached when this node is linked to a network node
# tunnel to net not built yet, so build it now and adopt it # tunnel to net not built yet, so build it now and adopt it
_, remote_tap = self.session.create_gre_tunnel(net, self.server) _, remote_tap = self.session.distributed.create_gre_tunnel(net, self.server)
self.adoptnetif(remote_tap, ifindex, hwaddr, addrlist) self.adoptnetif(remote_tap, ifindex, hwaddr, addrlist)
return ifindex return ifindex
else: else:

View file

@ -314,9 +314,9 @@ def build_transport_xml(emane_manager, node, transport_type):
file_name = transport_file_name(node.id, transport_type) file_name = transport_file_name(node.id, transport_type)
file_path = os.path.join(emane_manager.session.session_dir, file_name) file_path = os.path.join(emane_manager.session.session_dir, file_name)
create_file(transport_element, doc_name, file_path) create_file(transport_element, doc_name, file_path)
for name in emane_manager.session.servers: emane_manager.session.distributed.execute(
server = emane_manager.session.servers[name] lambda x: create_file(transport_element, doc_name, file_path, x)
create_file(transport_element, doc_name, file_path, server) )
def create_phy_xml(emane_model, config, file_path, server): def create_phy_xml(emane_model, config, file_path, server):
@ -342,9 +342,9 @@ def create_phy_xml(emane_model, config, file_path, server):
create_file(phy_element, "phy", file_path, server) create_file(phy_element, "phy", file_path, server)
else: else:
create_file(phy_element, "phy", file_path) create_file(phy_element, "phy", file_path)
for name in emane_model.session.servers: emane_model.session.distributed.execute(
server = emane_model.session.servers[name] lambda x: create_file(phy_element, "phy", file_path, x)
create_file(phy_element, "phy", file_path, server) )
def create_mac_xml(emane_model, config, file_path, server): def create_mac_xml(emane_model, config, file_path, server):
@ -372,9 +372,9 @@ def create_mac_xml(emane_model, config, file_path, server):
create_file(mac_element, "mac", file_path, server) create_file(mac_element, "mac", file_path, server)
else: else:
create_file(mac_element, "mac", file_path) create_file(mac_element, "mac", file_path)
for name in emane_model.session.servers: emane_model.session.distributed.execute(
server = emane_model.session.servers[name] lambda x: create_file(mac_element, "mac", file_path, x)
create_file(mac_element, "mac", file_path, server) )
def create_nem_xml( def create_nem_xml(
@ -410,9 +410,9 @@ def create_nem_xml(
create_file(nem_element, "nem", nem_file, server) create_file(nem_element, "nem", nem_file, server)
else: else:
create_file(nem_element, "nem", nem_file) create_file(nem_element, "nem", nem_file)
for name in emane_model.session.servers: emane_model.session.distributed.execute(
server = emane_model.session.servers[name] lambda x: create_file(nem_element, "nem", nem_file, x)
create_file(nem_element, "nem", nem_file, server) )
def create_event_service_xml(group, port, device, file_directory, server=None): def create_event_service_xml(group, port, device, file_directory, server=None):

View file

@ -20,7 +20,7 @@ def main():
# initialize distributed # initialize distributed
server_name = "core2" server_name = "core2"
session.add_distributed(server_name, remote) session.distributed.add_server(server_name, remote)
# must be in configuration state for nodes to start, when using "node_add" below # must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE) session.set_state(EventTypes.CONFIGURATION_STATE)

View file

@ -27,7 +27,7 @@ def main():
# initialize distributed # initialize distributed
server_name = "core2" server_name = "core2"
session.add_distributed(server_name, remote) session.distributed.add_server(server_name, remote)
# must be in configuration state for nodes to start, when using "node_add" below # must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE) session.set_state(EventTypes.CONFIGURATION_STATE)

View file

@ -20,7 +20,7 @@ def main():
# initialize distributed # initialize distributed
server_name = "core2" server_name = "core2"
session.add_distributed(server_name, remote) session.distributed.add_server(server_name, remote)
# must be in configuration state for nodes to start, when using "node_add" below # must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE) session.set_state(EventTypes.CONFIGURATION_STATE)

View file

@ -20,7 +20,7 @@ def main():
# initialize distributed # initialize distributed
server_name = "core2" server_name = "core2"
session.add_distributed(server_name, remote) session.distributed.add_server(server_name, remote)
# must be in configuration state for nodes to start, when using "node_add" below # must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE) session.set_state(EventTypes.CONFIGURATION_STATE)

View file

@ -16,7 +16,7 @@ def main():
# initialize distributed # initialize distributed
server_name = "core2" server_name = "core2"
session.add_distributed(server_name, remote) session.distributed.add_server(server_name, remote)
# must be in configuration state for nodes to start, when using "node_add" below # must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE) session.set_state(EventTypes.CONFIGURATION_STATE)

View file

@ -21,7 +21,7 @@ def main():
# initialize distributed # initialize distributed
server_name = "core2" server_name = "core2"
session.add_distributed(server_name, remote) session.distributed.add_server(server_name, remote)
# must be in configuration state for nodes to start, when using "node_add" below # must be in configuration state for nodes to start, when using "node_add" below
session.set_state(EventTypes.CONFIGURATION_STATE) session.set_state(EventTypes.CONFIGURATION_STATE)

View file

@ -763,11 +763,11 @@ class TestGui:
(ConfigTlvs.VALUES, "%s:%s:%s" % (server, host, port)), (ConfigTlvs.VALUES, "%s:%s:%s" % (server, host, port)),
], ],
) )
coreserver.session.add_distributed = mock.MagicMock() coreserver.session.distributed.add_server = mock.MagicMock()
coreserver.request_handler.handle_message(message) coreserver.request_handler.handle_message(message)
coreserver.session.add_distributed.assert_called_once_with(server, host) coreserver.session.distributed.add_server.assert_called_once_with(server, host)
def test_config_services_request_all(self, coreserver): def test_config_services_request_all(self, coreserver):
message = coreapi.CoreConfMessage.create( message = coreapi.CoreConfMessage.create(