quick base to try and help flesh out documentation under core.*

This commit is contained in:
Blake J. Harnden 2017-05-03 09:30:49 -07:00
parent 8f45e5c4da
commit 4ae7958a63
15 changed files with 1956 additions and 292 deletions

View file

@ -1,9 +1,7 @@
"""
broker.py: definition of CoreBroker class that is part of the
pycore session object. Handles distributing parts of the emulation out to
other emulation servers. The broker is consulted during the
CoreRequestHandler.handlemsg() loop to determine if messages should be handled
locally or forwarded on to another emulation server.
Broker class that is part of the session object. Handles distributing parts of the emulation out to
other emulation servers. The broker is consulted when handling messages to determine if messages
should be handled locally or forwarded on to another emulation server.
"""
import os
@ -40,7 +38,17 @@ logger = log.get_logger(__name__)
# TODO: name conflict with main core server, probably should rename
class CoreServer(object):
"""
Reptesents CORE daemon servers for communication.
"""
def __init__(self, name, host, port):
"""
Creates a CoreServer instance.
:param str name: name of the CORE server
:param str host: server address
:param int port: server port
"""
self.name = name
self.host = host
self.port = port
@ -48,17 +56,28 @@ class CoreServer(object):
self.instantiation_complete = False
def connect(self):
"""
Connect to CORE server and save connection.
:return: nothing
"""
assert self.sock is None
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((self.host, self.port))
except:
except IOError as e:
sock.close()
raise
raise e
self.sock = sock
def close(self):
"""
Close connection with CORE server.
:return: nothing
"""
if self.sock is not None:
self.sock.close()
self.sock = None
@ -66,9 +85,13 @@ class CoreServer(object):
class CoreBroker(ConfigurableManager):
"""
Member of pycore session class for handling global emulation server data.
Helps with brokering messages between CORE daemon servers.
"""
# configurable manager name
name = "broker"
# configurable manager type
config_type = RegisterTlvs.UTILITY.value
def __init__(self, session):
@ -104,6 +127,7 @@ class CoreBroker(ConfigurableManager):
self.tunnels = {}
self.dorecvloop = False
self.recvthread = None
self.bootcount = 0
def startup(self):
"""
@ -151,7 +175,7 @@ class CoreBroker(ConfigurableManager):
def startrecvloop(self):
"""
Spawn the recvloop() thread if it hasn"t been already started.
Spawn the receive loop for receiving messages.
"""
if self.recvthread is not None:
if self.recvthread.isAlive():
@ -166,7 +190,7 @@ class CoreBroker(ConfigurableManager):
def recvloop(self):
"""
Thread target that receives messages from server sockets.
Receive loop for receiving messages from server sockets.
"""
self.dorecvloop = True
# note: this loop continues after emulation is stopped,
@ -250,9 +274,13 @@ class CoreBroker(ConfigurableManager):
def addserver(self, name, host, port):
"""
Add a new server, and try to connect to it. If we"re already
connected to this (host, port), then leave it alone. When host,port
is None, do not try to connect.
Add a new server, and try to connect to it. If we"re already connected to this
(host, port), then leave it alone. When host,port is None, do not try to connect.
:param str name: name of server
:param str host: server address
:param int port: server port
:return: nothing
"""
with self.servers_lock:
server = self.servers.get(name)
@ -282,14 +310,15 @@ class CoreBroker(ConfigurableManager):
Remove a server and hang up any connection.
:param CoreServer server: server to delete
:return:
:return: nothing
"""
with self.servers_lock:
try:
s = self.servers.pop(server.name)
assert s == server
except KeyError:
pass
logger.exception("error deleting server")
if server.sock is not None:
logger.info("closing connection with %s @ %s:%s" % (server.name, server.host, server.port))
server.close()
@ -322,6 +351,9 @@ class CoreBroker(ConfigurableManager):
def getservers(self):
"""
Return a list of servers sorted by name.
:return: sorted server list
:rtype: list
"""
with self.servers_lock:
return sorted(self.servers.values(), key=lambda x: x.name)
@ -329,6 +361,9 @@ class CoreBroker(ConfigurableManager):
def getservernames(self):
"""
Return a sorted list of server names (keys from self.servers).
:return: sorted server names
:rtype: list
"""
with self.servers_lock:
return sorted(self.servers.keys())
@ -338,6 +373,11 @@ class CoreBroker(ConfigurableManager):
Compute a 32-bit key used to uniquely identify a GRE tunnel.
The hash(n1num), hash(n2num) values are used, so node numbers may be
None or string values (used for e.g. "ctrlnet").
:param int n1num: node one id
:param int n2num: node two id
:return: tunnel key for the node pair
:rtype: int
"""
sid = self.session_id_master
if sid is None:
@ -349,7 +389,13 @@ class CoreBroker(ConfigurableManager):
def addtunnel(self, remoteip, n1num, n2num, localnum):
"""
Add a new GreTapBridge between nodes on two different machines.
Adds a new GreTapBridge between nodes on two different machines.
:param str remoteip: remote address for tunnel
:param int n1num: node one id
:param int n2num: node two id
:param int localnum: local id
:return: nothing
"""
key = self.tunnelkey(n1num, n2num)
if localnum == n2num:
@ -380,27 +426,37 @@ class CoreBroker(ConfigurableManager):
for n in self.network_nodes:
self.addnettunnel(n)
def addnettunnel(self, n):
def addnettunnel(self, node):
"""
Add network tunnel between node and broker.
:param node: node to add network tunnel to
:return: list of grep taps
:rtype: list
"""
try:
net = self.session.get_object(n)
net = self.session.get_object(node)
except KeyError:
raise KeyError("network node %s not found" % n)
raise KeyError("network node %s not found" % node)
# add other nets here that do not require tunnels
if nodeutils.is_node(net, NodeTypes.EMANE_NET):
return None
if nodeutils.is_node(net, NodeTypes.CONTROL_NET):
if hasattr(net, "serverintf"):
if net.serverintf is not None:
return None
servers = self.getserversbynode(n)
server_interface = getattr(net, "serverintf", None)
if nodeutils.is_node(net, NodeTypes.CONTROL_NET) and server_interface is not None:
return None
servers = self.getserversbynode(node)
if len(servers) < 2:
return None
hosts = []
for server in servers:
if server.host is None:
continue
hosts.append(server.host)
if len(hosts) == 0 and self.session_handler.client_address != "":
# get IP address from API message sender (master)
hosts.append(self.session_handler.client_address[0])
@ -413,21 +469,26 @@ class CoreBroker(ConfigurableManager):
else:
# we are the session master
myip = host
key = self.tunnelkey(n, IpAddress.to_int(myip))
key = self.tunnelkey(node, IpAddress.to_int(myip))
if key in self.tunnels.keys():
continue
logger.info("Adding tunnel for net %s to %s with key %s" % (n, host, key))
logger.info("Adding tunnel for net %s to %s with key %s" % (node, host, key))
gt = GreTap(node=None, name=None, session=self.session, remoteip=host, key=key)
self.tunnels[key] = gt
r.append(gt)
# attaching to net will later allow gt to be destroyed
# during net.shutdown()
net.attach(gt)
return r
def deltunnel(self, n1num, n2num):
"""
Cleanup of the GreTapBridge.
Delete tunnel between nodes.
:param int n1num: node one id
:param int n2num: node two id
:return: nothing
"""
key = self.tunnelkey(n1num, n2num)
try:
@ -441,6 +502,10 @@ class CoreBroker(ConfigurableManager):
def gettunnel(self, n1num, n2num):
"""
Return the GreTap between two nodes if it exists.
:param int n1num: node one id
:param int n2num: node two id
:return: gre tap between nodes or none
"""
key = self.tunnelkey(n1num, n2num)
if key in self.tunnels.keys():
@ -451,6 +516,10 @@ class CoreBroker(ConfigurableManager):
def addnodemap(self, server, nodenum):
"""
Record a node number to emulation server mapping.
:param CoreServer server: core server to associate node with
:param int nodenum: node id
:return: nothing
"""
with self.nodemap_lock:
if nodenum in self.nodemap:
@ -459,6 +528,7 @@ class CoreBroker(ConfigurableManager):
self.nodemap[nodenum].add(server)
else:
self.nodemap[nodenum] = {server}
if server in self.nodecounts:
self.nodecounts[server] += 1
else:
@ -468,21 +538,32 @@ class CoreBroker(ConfigurableManager):
"""
Remove a node number to emulation server mapping.
Return the number of nodes left on this server.
:param CoreServer server: server to remove from node map
:param int nodenum: node id
:return: number of nodes left on server
:rtype: int
"""
count = None
with self.nodemap_lock:
if nodenum not in self.nodemap:
return count
self.nodemap[nodenum].remove(server)
if server in self.nodecounts:
count = self.nodecounts[server]
count -= 1
self.nodecounts[server] = count
return count
def getserversbynode(self, nodenum):
"""
Retrieve a set of emulation servers given a node number.
:param int nodenum: node id
:return: core server associated with node
:rtype: set
"""
with self.nodemap_lock:
if nodenum not in self.nodemap:
@ -492,12 +573,18 @@ class CoreBroker(ConfigurableManager):
def addnet(self, nodenum):
"""
Add a node number to the list of link-layer nodes.
:param int nodenum: node id to add
:return: nothing
"""
self.network_nodes.add(nodenum)
def addphys(self, nodenum):
"""
Add a node number to the list of physical nodes.
:param int nodenum: node id to add
:return: nothing
"""
self.physical_nodes.add(nodenum)
@ -507,7 +594,7 @@ class CoreBroker(ConfigurableManager):
arrive and require the use of nodecounts.
:param core.conf.ConfigData config_data: configuration data for carrying out a configuration
:return: None
:return: nothing
"""
return None
@ -517,7 +604,7 @@ class CoreBroker(ConfigurableManager):
combinations that we"ll need to connect with.
:param core.conf.ConfigData config_data: configuration data for carrying out a configuration
:return: None
:return: nothing
"""
values = config_data.data_values
session_id = config_data.session
@ -614,8 +701,10 @@ class CoreBroker(ConfigurableManager):
def setupserver(self, servername):
"""
Send the appropriate API messages for configuring the specified
emulation server.
Send the appropriate API messages for configuring the specified emulation server.
:param str servername: name of server to configure
:return: nothing
"""
server = self.getserverbyname(servername)
if server is None:
@ -647,6 +736,11 @@ class CoreBroker(ConfigurableManager):
"""
When an interactive TTY request comes from the GUI, snoop the reply
and add an SSH command to the appropriate remote server.
:param msghdr: message header
:param msgdata: message data
:param str host: host address
:return: packed core execute tlv data
"""
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
msgcls = coreapi.CLASS_MAP[msgtype]
@ -674,13 +768,16 @@ class CoreBroker(ConfigurableManager):
nodes to servers.
:param core.api.coreapi.CoreMessage message: message to handle
:return:
:return: boolean for handling locally and set of servers
:rtype: tuple
"""
servers = set()
handle_locally = False
serverfiletxt = None
# snoop Node Message for emulation server TLV and record mapping
n = message.tlv_data[NodeTlvs.NUMBER.value]
# replicate link-layer nodes on all servers
nodetype = message.get_tlv(NodeTlvs.TYPE.value)
if nodetype is not None:
@ -720,9 +817,11 @@ class CoreBroker(ConfigurableManager):
servers.add(server)
if serverfiletxt and self.session.master:
self.writenodeserver(serverfiletxt, server)
# hook to update coordinates of physical nodes
if n in self.physical_nodes:
self.session.mobility.physnodeupdateposition(message)
return handle_locally, servers
def handlelinkmsg(self, message):
@ -732,7 +831,8 @@ class CoreBroker(ConfigurableManager):
opaque data to the link message before forwarding.
:param core.api.coreapi.CoreMessage message: message to handle
:return:
:return: boolean to handle locally, a set of server, and message
:rtype: tuple
"""
servers = set()
handle_locally = False
@ -799,7 +899,8 @@ class CoreBroker(ConfigurableManager):
:param core.api.coreapi.CoreMessage message: message to link end points
:param servers1:
:param servers2:
:return:
:return: core link message
:rtype: coreapi.CoreLinkMessage
"""
ip1 = ""
for server in servers1:
@ -823,6 +924,11 @@ class CoreBroker(ConfigurableManager):
and we need to determine the tunnel endpoint. First look for
opaque data in the link message, otherwise use the IP of the message
sender (the master server).
:param coreapi.CoreLinkMessage msg:
:param bool first_is_local: is first local
:return: host address
:rtype: str
"""
host = None
opaque = msg.get_tlv(LinkTlvs.OPAQUE.value)
@ -840,7 +946,11 @@ class CoreBroker(ConfigurableManager):
def handlerawmsg(self, msg):
"""
Helper to invoke handlemsg() using raw (packed) message bytes.
Helper to invoke message handler, using raw (packed) message bytes.
:param msg: raw message butes
:return: should handle locally or not
:rtype: bool
"""
hdr = msg[:coreapi.CoreMessage.header_len]
msgtype, flags, msglen = coreapi.CoreMessage.unpack_header(hdr)
@ -856,7 +966,8 @@ class CoreBroker(ConfigurableManager):
:param core.api.coreapi.CoreMessage message: message to forward
:param list servers: server to forward message to
:return:
:return: handle locally value
:rtype: bool
"""
handle_locally = len(servers) == 0
for server in servers:
@ -874,6 +985,8 @@ class CoreBroker(ConfigurableManager):
"""
Write the server list to a text file in the session directory upon
startup: /tmp/pycore.nnnnn/servers
:return: nothing
"""
servers = self.getservers()
filename = os.path.join(self.session.session_dir, "servers")
@ -900,6 +1013,10 @@ class CoreBroker(ConfigurableManager):
and server info. This may be used by scripts for accessing nodes on
other machines, much like local nodes may be accessed via the
VnodeClient class.
:param str nodestr: node string
:param CoreServer server: core server
:return: nothing
"""
serverstr = "%s %s %s" % (server.name, server.host, server.port)
name = nodestr.split()[1]
@ -920,6 +1037,8 @@ class CoreBroker(ConfigurableManager):
def local_instantiation_complete(self):
"""
Set the local server"s instantiation-complete status to True.
:return: nothing
"""
# TODO: do we really want to allow a localhost to not exist?
with self.servers_lock:
@ -937,6 +1056,9 @@ class CoreBroker(ConfigurableManager):
"""
Return True if all servers have completed instantiation, False
otherwise.
:return: have all server completed instantiation
:rtype: bool
"""
with self.servers_lock:
for server in self.servers.itervalues():