removed broker from session, updated most places using broker to use alternative logic to compensate where needed

This commit is contained in:
Blake Harnden 2019-10-15 14:13:42 -07:00
parent 6570f22ccf
commit b2d2705849
19 changed files with 151 additions and 471 deletions

View file

@ -20,12 +20,14 @@ class DistributedServer(object):
Provides distributed server interactions.
"""
def __init__(self, host):
def __init__(self, name, host):
"""
Create a DistributedServer instance.
:param str name: convenience name to associate with host
:param str host: host to connect to
"""
self.name = name
self.host = host
self.conn = Connection(host, user="root")
self.lock = threading.Lock()
@ -36,8 +38,8 @@ class DistributedServer(object):
:param str cmd: command to run
:param dict env: environment for remote command, default is None
:param str cwd: directory to run command in, defaults to None, which is the user's
home directory
:param str cwd: directory to run command in, defaults to None, which is the
user's home directory
:param bool wait: True to wait for status, False to background process
:return: stdout when success
:rtype: str

View file

@ -15,8 +15,6 @@ import time
from multiprocessing.pool import ThreadPool
from core import constants, utils
from core.api.tlv import coreapi
from core.api.tlv.broker import CoreBroker
from core.emane.emanemanager import EmaneManager
from core.emane.nodes import EmaneNet
from core.emulator.data import EventData, ExceptionData, NodeData
@ -142,10 +140,9 @@ class Session(object):
# distributed servers
self.servers = {}
self.tunnels = {}
self.address = None
self.address = self.options.get_config("distributed_address", default=None)
# initialize session feature helpers
self.broker = CoreBroker(session=self)
self.location = CoreLocation()
self.mobility = MobilityManager(session=self)
self.services = CoreServices(session=self)
@ -161,9 +158,9 @@ class Session(object):
"host": ("DefaultRoute", "SSH"),
}
def add_distributed(self, host):
server = DistributedServer(host)
self.servers[host] = server
def add_distributed(self, name, host):
server = DistributedServer(name, host)
self.servers[name] = server
cmd = "mkdir -p %s" % self.session_dir
server.remote_cmd(cmd)
@ -175,8 +172,8 @@ class Session(object):
tunnel.shutdown()
# remove all remote session directories
for host in self.servers:
server = self.servers[host]
for name in self.servers:
server = self.servers[name]
cmd = "rm -rf %s" % self.session_dir
server.remote_cmd(cmd)
@ -193,8 +190,9 @@ class Session(object):
if isinstance(node, CtrlNet) and node.serverintf is not None:
continue
for host in self.servers:
server = self.servers[host]
for name in self.servers:
server = self.servers[name]
host = server.host
key = self.tunnelkey(node_id, IpAddress.to_int(host))
# local to server
@ -219,23 +217,35 @@ class Session(object):
)
# save tunnels for shutdown
self.tunnels[key] = [local_tap, remote_tap]
self.tunnels[key] = (local_tap, remote_tap)
def tunnelkey(self, n1num, n2num):
def tunnelkey(self, n1_id, n2_id):
"""
Compute a 32-bit key used to uniquely identify a GRE tunnel.
The hash(n1num), hash(n2num) values are used, so node numbers may be
None or string values (used for e.g. "ctrlnet").
:param int n1num: node one id
:param int n2num: node two id
:param int n1_id: node one id
:param int n2_id: node two id
:return: tunnel key for the node pair
:rtype: int
"""
logging.debug("creating tunnel key for: %s, %s", n1num, n2num)
key = (self.id << 16) ^ utils.hashkey(n1num) ^ (utils.hashkey(n2num) << 8)
logging.debug("creating tunnel key for: %s, %s", n1_id, n2_id)
key = (self.id << 16) ^ utils.hashkey(n1_id) ^ (utils.hashkey(n2_id) << 8)
return key & 0xFFFFFFFF
def gettunnel(self, n1_id, n2_id):
"""
Return the GreTap between two nodes if it exists.
:param int n1_id: node one id
:param int n2_id: node two id
:return: gre tap between nodes or None
"""
key = self.tunnelkey(n1_id, n2_id)
logging.debug("checking for tunnel key(%s) in: %s", key, self.tunnels)
return self.tunnels.get(key)
@classmethod
def get_node_class(cls, _type):
"""
@ -285,7 +295,7 @@ class Session(object):
node_two = self.get_node(node_two_id)
# both node ids are provided
tunnel = self.broker.gettunnel(node_one_id, node_two_id)
tunnel = self.gettunnel(node_one_id, node_two_id)
logging.debug("tunnel between nodes: %s", tunnel)
if isinstance(tunnel, GreTapBridge):
net_one = tunnel
@ -958,13 +968,13 @@ class Session(object):
def clear(self):
"""
Clear all CORE session data. (objects, hooks, broker)
Clear all CORE session data. (nodes, hooks, etc)
:return: nothing
"""
self.delete_nodes()
self.shutdown_distributed()
self.del_hooks()
self.broker.reset()
self.emane.reset()
def start_events(self):
@ -1038,17 +1048,16 @@ class Session(object):
# shutdown/cleanup feature helpers
self.emane.shutdown()
self.broker.shutdown()
self.sdt.shutdown()
# delete all current nodes
# remove and shutdown all nodes and tunnels
self.delete_nodes()
self.shutdown_distributed()
# remove this sessions working directory
preserve = self.options.get_config("preservedir") == "1"
if not preserve:
shutil.rmtree(self.session_dir, ignore_errors=True)
self.shutdown_distributed()
# call session shutdown handlers
for handler in self.shutdown_handlers:
@ -1160,7 +1169,7 @@ class Session(object):
"""
try:
state_file = open(self._state_file, "w")
state_file.write("%d %s\n" % (state, coreapi.state_name(state)))
state_file.write("%d %s\n" % (state, EventTypes(self.state).name))
state_file.close()
except IOError:
logging.exception("error writing state file: %s", state)
@ -1278,7 +1287,7 @@ class Session(object):
hook(state)
except Exception:
message = "exception occured when running %s state hook: %s" % (
coreapi.state_name(state),
EventTypes(self.state).name,
hook,
)
logging.exception(message)
@ -1549,11 +1558,10 @@ class Session(object):
# write current nodes out to session directory file
self.write_nodes()
# create control net interfaces and broker network tunnels
# create control net interfaces and network tunnels
# which need to exist for emane to sync on location events
# in distributed scenarios
self.add_remove_control_interface(node=None, remove=False)
self.broker.startup()
# initialize distributed tunnels
self.initialize_distributed()
@ -1566,9 +1574,6 @@ class Session(object):
self.boot_nodes()
self.mobility.startup()
# set broker local instantiation to complete
self.broker.local_instantiation_complete()
# notify listeners that instantiation is complete
event = EventData(event_type=EventTypes.INSTANTIATION_COMPLETE.value)
self.broadcast_event(event)
@ -1606,21 +1611,16 @@ class Session(object):
have entered runtime (time=0).
"""
# this is called from instantiate() after receiving an event message
# for the instantiation state, and from the broker when distributed
# nodes have been started
# for the instantiation state
logging.debug(
"session(%s) checking if not in runtime state, current state: %s",
self.id,
coreapi.state_name(self.state),
EventTypes(self.state).name,
)
if self.state == EventTypes.RUNTIME_STATE.value:
logging.info("valid runtime state found, returning")
return
# check to verify that all nodes and networks are running
if not self.broker.instantiation_complete():
return
# start event loop and set to runtime
self.event_loop.run()
self.set_state(EventTypes.RUNTIME_STATE, send_event=True)
@ -1830,37 +1830,11 @@ class Session(object):
except IndexError:
# no server name. possibly only one server
prefix = prefixes[0]
else:
# slave servers have their name and localhost in the serverlist
servers = self.broker.getservernames()
servers.remove("localhost")
prefix = None
for server_prefix in prefixes:
try:
# split each entry into server and prefix
server, p = server_prefix.split(":")
except ValueError:
server = ""
p = None
if server == servers[0]:
# the server name in the list matches this server
prefix = p
break
if not prefix:
logging.error(
"control network prefix not found for server: %s", servers[0]
)
assign_address = False
try:
prefix = prefixes[0].split(":", 1)[1]
except IndexError:
prefix = prefixes[0]
# len(prefixes) == 1
else:
# TODO: can we get the server name from the servers.conf or from the node assignments?
# TODO: can we get the server name from the servers.conf or from the node
# assignments?o
# with one prefix, only master gets a ctrlnet address
assign_address = self.master
prefix = prefixes[0]
@ -1882,13 +1856,6 @@ class Session(object):
serverintf=server_interface,
)
# tunnels between controlnets will be built with Broker.addnettunnels()
# TODO: potentially remove documentation saying node ids are ints
# TODO: need to move broker code out of the session object
self.broker.addnet(_id)
for server in self.broker.getservers():
self.broker.addnodemap(server, _id)
return control_net
def add_remove_control_interface(