1169 lines
43 KiB
Python
1169 lines
43 KiB
Python
"""
|
|
Broker class that is part of the session object. Handles distributing parts of the emulation out to
|
|
other emulation servers. The broker is consulted when handling messages to determine if messages
|
|
should be handled locally or forwarded on to another emulation server.
|
|
"""
|
|
|
|
import os
|
|
import select
|
|
import socket
|
|
import threading
|
|
|
|
from core import logger
|
|
from core.api import coreapi
|
|
from core.conf import ConfigurableManager
|
|
from core.coreobj import PyCoreNet
|
|
from core.coreobj import PyCoreNode
|
|
from core.enumerations import ConfigDataTypes
|
|
from core.enumerations import ConfigFlags
|
|
from core.enumerations import ConfigTlvs
|
|
from core.enumerations import EventTlvs
|
|
from core.enumerations import EventTypes
|
|
from core.enumerations import ExecuteTlvs
|
|
from core.enumerations import FileTlvs
|
|
from core.enumerations import LinkTlvs
|
|
from core.enumerations import MessageFlags
|
|
from core.enumerations import MessageTypes
|
|
from core.enumerations import NodeTlvs
|
|
from core.enumerations import NodeTypes
|
|
from core.enumerations import RegisterTlvs
|
|
from core.misc import nodeutils
|
|
from core.misc.ipaddress import IpAddress
|
|
from core.netns.vif import GreTap
|
|
from core.netns.vnet import GreTapBridge
|
|
from core.phys.pnodes import PhysicalNode
|
|
|
|
|
|
class CoreDistributedServer(object):
|
|
"""
|
|
Represents CORE daemon servers for communication.
|
|
"""
|
|
|
|
def __init__(self, name, host, port):
|
|
"""
|
|
Creates a CoreServer instance.
|
|
|
|
:param str name: name of the CORE server
|
|
:param str host: server address
|
|
:param int port: server port
|
|
"""
|
|
self.name = name
|
|
self.host = host
|
|
self.port = port
|
|
self.sock = None
|
|
self.instantiation_complete = False
|
|
|
|
def connect(self):
|
|
"""
|
|
Connect to CORE server and save connection.
|
|
|
|
:return: nothing
|
|
"""
|
|
assert self.sock is None
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
try:
|
|
sock.connect((self.host, self.port))
|
|
except IOError as e:
|
|
sock.close()
|
|
raise e
|
|
|
|
self.sock = sock
|
|
|
|
def close(self):
|
|
"""
|
|
Close connection with CORE server.
|
|
|
|
:return: nothing
|
|
"""
|
|
if self.sock is not None:
|
|
self.sock.close()
|
|
self.sock = None
|
|
|
|
|
|
class CoreBroker(ConfigurableManager):
|
|
"""
|
|
Helps with brokering messages between CORE daemon servers.
|
|
"""
|
|
|
|
# configurable manager name
|
|
name = "broker"
|
|
|
|
# configurable manager type
|
|
config_type = RegisterTlvs.UTILITY.value
|
|
|
|
def __init__(self, session):
|
|
"""
|
|
Creates a CoreBroker instance.
|
|
|
|
:param core.session.Session session: session this manager is tied to
|
|
:return: nothing
|
|
"""
|
|
|
|
ConfigurableManager.__init__(self)
|
|
self.session = session
|
|
self.session_clients = []
|
|
self.session_id_master = None
|
|
self.myip = None
|
|
# dict containing tuples of (host, port, sock)
|
|
self.servers = {}
|
|
self.servers_lock = threading.Lock()
|
|
self.addserver("localhost", None, None)
|
|
# dict containing node number to server name mapping
|
|
self.nodemap = {}
|
|
# this lock also protects self.nodecounts
|
|
self.nodemap_lock = threading.Lock()
|
|
# reference counts of nodes on servers
|
|
self.nodecounts = {}
|
|
# set of node numbers that are link-layer nodes (networks)
|
|
self.network_nodes = set()
|
|
# set of node numbers that are PhysicalNode nodes
|
|
self.physical_nodes = set()
|
|
# allows for other message handlers to process API messages (e.g. EMANE)
|
|
self.handlers = set()
|
|
self.handlers.add(self.handle_distributed)
|
|
# dict with tunnel key to tunnel device mapping
|
|
self.tunnels = {}
|
|
self.dorecvloop = False
|
|
self.recvthread = None
|
|
self.bootcount = 0
|
|
|
|
def startup(self):
|
|
"""
|
|
Build tunnels between network-layer nodes now that all node
|
|
and link information has been received; called when session
|
|
enters the instantation state.
|
|
"""
|
|
self.addnettunnels()
|
|
self.writeservers()
|
|
|
|
def shutdown(self):
|
|
"""
|
|
Close all active sockets; called when the session enters the
|
|
data collect state
|
|
"""
|
|
with self.servers_lock:
|
|
while len(self.servers) > 0:
|
|
name, server = self.servers.popitem()
|
|
if server.sock is not None:
|
|
logger.info("closing connection with %s @ %s:%s" %
|
|
(name, server.host, server.port))
|
|
server.close()
|
|
self.reset()
|
|
self.dorecvloop = False
|
|
if self.recvthread is not None:
|
|
self.recvthread.join()
|
|
|
|
def reset(self):
|
|
"""
|
|
Reset to initial state.
|
|
"""
|
|
logger.info("broker reset")
|
|
self.nodemap_lock.acquire()
|
|
self.nodemap.clear()
|
|
for server, count in self.nodecounts.iteritems():
|
|
if count < 1:
|
|
self.delserver(server)
|
|
self.nodecounts.clear()
|
|
self.bootcount = 0
|
|
self.nodemap_lock.release()
|
|
self.network_nodes.clear()
|
|
self.physical_nodes.clear()
|
|
while len(self.tunnels) > 0:
|
|
key, gt = self.tunnels.popitem()
|
|
gt.shutdown()
|
|
|
|
def startrecvloop(self):
|
|
"""
|
|
Spawn the receive loop for receiving messages.
|
|
"""
|
|
if self.recvthread is not None:
|
|
logger.info("server receive loop already started")
|
|
if self.recvthread.isAlive():
|
|
return
|
|
else:
|
|
self.recvthread.join()
|
|
# start reading data from connected sockets
|
|
logger.info("starting server receive loop")
|
|
self.dorecvloop = True
|
|
self.recvthread = threading.Thread(target=self.recvloop)
|
|
self.recvthread.daemon = True
|
|
self.recvthread.start()
|
|
|
|
def recvloop(self):
|
|
"""
|
|
Receive loop for receiving messages from server sockets.
|
|
"""
|
|
self.dorecvloop = True
|
|
# note: this loop continues after emulation is stopped,
|
|
# even with 0 servers
|
|
while self.dorecvloop:
|
|
rlist = []
|
|
with self.servers_lock:
|
|
# build a socket list for select call
|
|
for server in self.servers.itervalues():
|
|
if server.sock is not None:
|
|
rlist.append(server.sock)
|
|
r, w, x = select.select(rlist, [], [], 1.0)
|
|
for sock in r:
|
|
server = self.getserverbysock(sock)
|
|
logger.info("attempting to receive from server: peer:%s remote:%s",
|
|
server.sock.getpeername(), server.sock.getsockname())
|
|
if server is None:
|
|
# servers may have changed; loop again
|
|
continue
|
|
rcvlen = self.recv(server)
|
|
if rcvlen == 0:
|
|
logger.info("connection with %s @ %s:%s has closed" % (
|
|
server.name, server.host, server.port))
|
|
|
|
def recv(self, server):
|
|
"""
|
|
Receive data on an emulation server socket and broadcast it to
|
|
all connected session handlers. Returns the length of data recevied
|
|
and forwarded. Return value of zero indicates the socket has closed
|
|
and should be removed from the self.servers dict.
|
|
|
|
:param CoreDistributedServer server: server to receive from
|
|
:return: message length
|
|
:rtype: int
|
|
"""
|
|
msghdr = server.sock.recv(coreapi.CoreMessage.header_len)
|
|
if len(msghdr) == 0:
|
|
# server disconnected
|
|
logger.info("server disconnected, closing server")
|
|
server.close()
|
|
return 0
|
|
|
|
if len(msghdr) != coreapi.CoreMessage.header_len:
|
|
logger.info("warning: broker received not enough data len=%s" % len(msghdr))
|
|
return len(msghdr)
|
|
|
|
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
|
|
msgdata = server.sock.recv(msglen)
|
|
data = msghdr + msgdata
|
|
count = None
|
|
logger.info("received message type: %s", MessageTypes(msgtype))
|
|
# snoop exec response for remote interactive TTYs
|
|
if msgtype == MessageTypes.EXECUTE.value and msgflags & MessageFlags.TTY.value:
|
|
data = self.fixupremotetty(msghdr, msgdata, server.host)
|
|
logger.info("created remote tty message: %s", data)
|
|
elif msgtype == MessageTypes.NODE.value:
|
|
# snoop node delete response to decrement node counts
|
|
if msgflags & MessageFlags.DELETE.value:
|
|
msg = coreapi.CoreNodeMessage(msgflags, msghdr, msgdata)
|
|
nodenum = msg.get_tlv(NodeTlvs.NUMBER.value)
|
|
if nodenum is not None:
|
|
count = self.delnodemap(server, nodenum)
|
|
elif msgtype == MessageTypes.LINK.value:
|
|
# this allows green link lines for remote WLANs
|
|
msg = coreapi.CoreLinkMessage(msgflags, msghdr, msgdata)
|
|
self.session.sdt.handle_distributed(msg)
|
|
elif msgtype == MessageTypes.EVENT.value:
|
|
msg = coreapi.CoreEventMessage(msgflags, msghdr, msgdata)
|
|
eventtype = msg.get_tlv(EventTlvs.TYPE.value)
|
|
if eventtype == EventTypes.INSTANTIATION_COMPLETE.value:
|
|
server.instantiation_complete = True
|
|
if self.instantiation_complete():
|
|
self.session.check_runtime()
|
|
else:
|
|
logger.error("unknown message type received: %s", msgtype)
|
|
|
|
try:
|
|
for session_client in self.session_clients:
|
|
session_client.sendall(data)
|
|
except IOError:
|
|
logger.exception("error sending message")
|
|
|
|
if count is not None and count < 1:
|
|
return 0
|
|
else:
|
|
return len(data)
|
|
|
|
def addserver(self, name, host, port):
|
|
"""
|
|
Add a new server, and try to connect to it. If we"re already connected to this
|
|
(host, port), then leave it alone. When host,port is None, do not try to connect.
|
|
|
|
:param str name: name of server
|
|
:param str host: server address
|
|
:param int port: server port
|
|
:return: nothing
|
|
"""
|
|
with self.servers_lock:
|
|
server = self.servers.get(name)
|
|
if server is not None:
|
|
if host == server.host and port == server.port and \
|
|
server.sock is not None:
|
|
# leave this socket connected
|
|
return
|
|
|
|
logger.info("closing connection with %s @ %s:%s" % (name, server.host, server.port))
|
|
server.close()
|
|
del self.servers[name]
|
|
|
|
logger.info("adding server: %s @ %s:%s" % (name, host, port))
|
|
server = CoreDistributedServer(name, host, port)
|
|
if host is not None and port is not None:
|
|
try:
|
|
server.connect()
|
|
except IOError:
|
|
logger.exception("error connecting to server %s:%s" % (host, port))
|
|
if server.sock is not None:
|
|
self.startrecvloop()
|
|
self.servers[name] = server
|
|
|
|
def delserver(self, server):
|
|
"""
|
|
Remove a server and hang up any connection.
|
|
|
|
:param CoreDistributedServer server: server to delete
|
|
:return: nothing
|
|
"""
|
|
with self.servers_lock:
|
|
try:
|
|
s = self.servers.pop(server.name)
|
|
assert s == server
|
|
except KeyError:
|
|
logger.exception("error deleting server")
|
|
|
|
if server.sock is not None:
|
|
logger.info("closing connection with %s @ %s:%s" % (server.name, server.host, server.port))
|
|
server.close()
|
|
|
|
def getserverbyname(self, name):
|
|
"""
|
|
Return the server object having the given name, or None.
|
|
|
|
:param str name: name of server to retrieve
|
|
:return: server for given name
|
|
:rtype: CoreDistributedServer
|
|
"""
|
|
with self.servers_lock:
|
|
return self.servers.get(name)
|
|
|
|
def getserverbysock(self, sock):
|
|
"""
|
|
Return the server object corresponding to the given socket, or None.
|
|
|
|
:param sock: socket associated with a server
|
|
:return: core server associated wit the socket
|
|
:rtype: CoreDistributedServer
|
|
"""
|
|
with self.servers_lock:
|
|
for server in self.servers.itervalues():
|
|
if server.sock == sock:
|
|
return server
|
|
return None
|
|
|
|
def getservers(self):
|
|
"""
|
|
Return a list of servers sorted by name.
|
|
|
|
:return: sorted server list
|
|
:rtype: list
|
|
"""
|
|
with self.servers_lock:
|
|
return sorted(self.servers.values(), key=lambda x: x.name)
|
|
|
|
def getservernames(self):
|
|
"""
|
|
Return a sorted list of server names (keys from self.servers).
|
|
|
|
:return: sorted server names
|
|
:rtype: list
|
|
"""
|
|
with self.servers_lock:
|
|
return sorted(self.servers.keys())
|
|
|
|
def tunnelkey(self, n1num, n2num):
|
|
"""
|
|
Compute a 32-bit key used to uniquely identify a GRE tunnel.
|
|
The hash(n1num), hash(n2num) values are used, so node numbers may be
|
|
None or string values (used for e.g. "ctrlnet").
|
|
|
|
:param int n1num: node one id
|
|
:param int n2num: node two id
|
|
:return: tunnel key for the node pair
|
|
:rtype: int
|
|
"""
|
|
sid = self.session_id_master
|
|
if sid is None:
|
|
# this is the master session
|
|
sid = self.session.session_id
|
|
|
|
key = (sid << 16) ^ hash(n1num) ^ (hash(n2num) << 8)
|
|
return key & 0xFFFFFFFF
|
|
|
|
def addtunnel(self, remoteip, n1num, n2num, localnum):
|
|
"""
|
|
Adds a new GreTapBridge between nodes on two different machines.
|
|
|
|
:param str remoteip: remote address for tunnel
|
|
:param int n1num: node one id
|
|
:param int n2num: node two id
|
|
:param int localnum: local id
|
|
:return: nothing
|
|
"""
|
|
key = self.tunnelkey(n1num, n2num)
|
|
if localnum == n2num:
|
|
remotenum = n1num
|
|
else:
|
|
remotenum = n2num
|
|
|
|
if key in self.tunnels.keys():
|
|
logger.warn("tunnel with key %s (%s-%s) already exists!" % (key, n1num, n2num))
|
|
else:
|
|
objid = key & ((1 << 16) - 1)
|
|
logger.info("adding tunnel for %s-%s to %s with key %s", n1num, n2num, remoteip, key)
|
|
if localnum in self.physical_nodes:
|
|
# no bridge is needed on physical nodes; use the GreTap directly
|
|
gt = GreTap(node=None, name=None, session=self.session,
|
|
remoteip=remoteip, key=key)
|
|
else:
|
|
gt = self.session.add_object(cls=GreTapBridge, objid=objid,
|
|
policy="ACCEPT", remoteip=remoteip, key=key)
|
|
gt.localnum = localnum
|
|
gt.remotenum = remotenum
|
|
self.tunnels[key] = gt
|
|
|
|
def addnettunnels(self):
|
|
"""
|
|
Add GreTaps between network devices on different machines.
|
|
The GreTapBridge is not used since that would add an extra bridge.
|
|
"""
|
|
logger.info("adding network tunnels for nodes: %s", self.network_nodes)
|
|
for n in self.network_nodes:
|
|
self.addnettunnel(n)
|
|
|
|
def addnettunnel(self, node_id):
|
|
"""
|
|
Add network tunnel between node and broker.
|
|
|
|
:param int node_id: node id of network to add tunnel to
|
|
:return: list of gre taps
|
|
:rtype: list
|
|
"""
|
|
try:
|
|
net = self.session.get_object(node_id)
|
|
logger.info("adding net tunnel for: id(%s) %s", node_id, net)
|
|
except KeyError:
|
|
raise KeyError("network node %s not found" % node_id)
|
|
|
|
# add other nets here that do not require tunnels
|
|
if nodeutils.is_node(net, NodeTypes.EMANE_NET):
|
|
logger.warn("emane network does not require a tunnel")
|
|
return None
|
|
|
|
server_interface = getattr(net, "serverintf", None)
|
|
if nodeutils.is_node(net, NodeTypes.CONTROL_NET) and server_interface is not None:
|
|
logger.warn("control networks with server interfaces do not need a tunnel")
|
|
return None
|
|
|
|
servers = self.getserversbynode(node_id)
|
|
if len(servers) < 2:
|
|
logger.warn("not enough servers to create a tunnel: %s", servers)
|
|
return None
|
|
|
|
hosts = []
|
|
for server in servers:
|
|
if server.host is None:
|
|
continue
|
|
logger.info("adding server host for net tunnel: %s", server.host)
|
|
hosts.append(server.host)
|
|
|
|
if len(hosts) == 0:
|
|
for session_client in self.session_clients:
|
|
# get IP address from API message sender (master)
|
|
if session_client.client_address != "":
|
|
address = session_client.client_address[0]
|
|
logger.info("adding session_client host: %s", address)
|
|
hosts.append(address)
|
|
|
|
r = []
|
|
for host in hosts:
|
|
if self.myip:
|
|
# we are the remote emulation server
|
|
myip = self.myip
|
|
else:
|
|
# we are the session master
|
|
myip = host
|
|
key = self.tunnelkey(node_id, IpAddress.to_int(myip))
|
|
if key in self.tunnels.keys():
|
|
logger.info("tunnel already exists, returning existing tunnel: %s", key)
|
|
gt = self.tunnels[key]
|
|
r.append(gt)
|
|
continue
|
|
logger.info("adding tunnel for net %s to %s with key %s" % (node_id, host, key))
|
|
gt = GreTap(node=None, name=None, session=self.session, remoteip=host, key=key)
|
|
self.tunnels[key] = gt
|
|
r.append(gt)
|
|
# attaching to net will later allow gt to be destroyed
|
|
# during net.shutdown()
|
|
net.attach(gt)
|
|
|
|
return r
|
|
|
|
def deltunnel(self, n1num, n2num):
|
|
"""
|
|
Delete tunnel between nodes.
|
|
|
|
:param int n1num: node one id
|
|
:param int n2num: node two id
|
|
:return: nothing
|
|
"""
|
|
key = self.tunnelkey(n1num, n2num)
|
|
try:
|
|
logger.info("deleting tunnel between %s - %s with key: %s", n1num, n2num, key)
|
|
gt = self.tunnels.pop(key)
|
|
except KeyError:
|
|
gt = None
|
|
if gt:
|
|
self.session.delete_object(gt.objid)
|
|
del gt
|
|
|
|
def gettunnel(self, n1num, n2num):
|
|
"""
|
|
Return the GreTap between two nodes if it exists.
|
|
|
|
:param int n1num: node one id
|
|
:param int n2num: node two id
|
|
:return: gre tap between nodes or none
|
|
"""
|
|
key = self.tunnelkey(n1num, n2num)
|
|
logger.info("checking for tunnel(%s) in: %s", key, self.tunnels.keys())
|
|
if key in self.tunnels.keys():
|
|
return self.tunnels[key]
|
|
else:
|
|
return None
|
|
|
|
def addnodemap(self, server, nodenum):
|
|
"""
|
|
Record a node number to emulation server mapping.
|
|
|
|
:param CoreDistributedServer server: core server to associate node with
|
|
:param int nodenum: node id
|
|
:return: nothing
|
|
"""
|
|
with self.nodemap_lock:
|
|
if nodenum in self.nodemap:
|
|
if server in self.nodemap[nodenum]:
|
|
return
|
|
self.nodemap[nodenum].add(server)
|
|
else:
|
|
self.nodemap[nodenum] = {server}
|
|
|
|
if server in self.nodecounts:
|
|
self.nodecounts[server] += 1
|
|
else:
|
|
self.nodecounts[server] = 1
|
|
|
|
def delnodemap(self, server, nodenum):
|
|
"""
|
|
Remove a node number to emulation server mapping.
|
|
Return the number of nodes left on this server.
|
|
|
|
:param CoreDistributedServer server: server to remove from node map
|
|
:param int nodenum: node id
|
|
:return: number of nodes left on server
|
|
:rtype: int
|
|
"""
|
|
count = None
|
|
with self.nodemap_lock:
|
|
if nodenum not in self.nodemap:
|
|
return count
|
|
|
|
self.nodemap[nodenum].remove(server)
|
|
if server in self.nodecounts:
|
|
count = self.nodecounts[server]
|
|
count -= 1
|
|
self.nodecounts[server] = count
|
|
|
|
return count
|
|
|
|
def getserversbynode(self, nodenum):
|
|
"""
|
|
Retrieve a set of emulation servers given a node number.
|
|
|
|
:param int nodenum: node id
|
|
:return: core server associated with node
|
|
:rtype: set
|
|
"""
|
|
with self.nodemap_lock:
|
|
if nodenum not in self.nodemap:
|
|
return set()
|
|
return self.nodemap[nodenum]
|
|
|
|
def addnet(self, nodenum):
|
|
"""
|
|
Add a node number to the list of link-layer nodes.
|
|
|
|
:param int nodenum: node id to add
|
|
:return: nothing
|
|
"""
|
|
logger.info("adding net to broker: %s", nodenum)
|
|
self.network_nodes.add(nodenum)
|
|
logger.info("broker network nodes: %s", self.network_nodes)
|
|
|
|
def addphys(self, nodenum):
|
|
"""
|
|
Add a node number to the list of physical nodes.
|
|
|
|
:param int nodenum: node id to add
|
|
:return: nothing
|
|
"""
|
|
self.physical_nodes.add(nodenum)
|
|
|
|
def configure_reset(self, config_data):
|
|
"""
|
|
Ignore reset messages, because node delete responses may still
|
|
arrive and require the use of nodecounts.
|
|
|
|
:param core.conf.ConfigData config_data: configuration data for carrying out a configuration
|
|
:return: nothing
|
|
"""
|
|
return None
|
|
|
|
def configure_values(self, config_data):
|
|
"""
|
|
Receive configuration message with a list of server:host:port
|
|
combinations that we"ll need to connect with.
|
|
|
|
:param core.conf.ConfigData config_data: configuration data for carrying out a configuration
|
|
:return: nothing
|
|
"""
|
|
values = config_data.data_values
|
|
session_id = config_data.session
|
|
|
|
if values is None:
|
|
logger.info("emulation server data missing")
|
|
return None
|
|
values = values.split("|")
|
|
|
|
# string of "server:ip:port,server:ip:port,..."
|
|
server_strings = values[0]
|
|
server_list = server_strings.split(",")
|
|
|
|
for server in server_list:
|
|
server_items = server.split(":")
|
|
(name, host, port) = server_items[:3]
|
|
|
|
if host == "":
|
|
host = None
|
|
|
|
if port == "":
|
|
port = None
|
|
else:
|
|
port = int(port)
|
|
|
|
if session_id is not None:
|
|
# receive session ID and my IP from master
|
|
self.session_id_master = int(session_id.split("|")[0])
|
|
self.myip = host
|
|
host = None
|
|
port = None
|
|
|
|
# this connects to the server immediately; maybe we should wait
|
|
# or spin off a new "client" thread here
|
|
self.addserver(name, host, port)
|
|
self.setupserver(name)
|
|
|
|
return None
|
|
|
|
def handle_message(self, message):
|
|
"""
|
|
Handle an API message. Determine whether this needs to be handled
|
|
by the local server or forwarded on to another one.
|
|
Returns True when message does not need to be handled locally,
|
|
and performs forwarding if required.
|
|
Returning False indicates this message should be handled locally.
|
|
|
|
:param core.api.coreapi.CoreMessage message: message to handle
|
|
:return: true or false for handling locally
|
|
:rtype: bool
|
|
"""
|
|
servers = set()
|
|
handle_locally = False
|
|
# Do not forward messages when in definition state
|
|
# (for e.g. configuring services)
|
|
if self.session.state == EventTypes.DEFINITION_STATE.value:
|
|
return False
|
|
|
|
# Decide whether message should be handled locally or forwarded, or both
|
|
if message.message_type == MessageTypes.NODE.value:
|
|
handle_locally, servers = self.handlenodemsg(message)
|
|
elif message.message_type == MessageTypes.EVENT.value:
|
|
# broadcast events everywhere
|
|
servers = self.getservers()
|
|
elif message.message_type == MessageTypes.CONFIG.value:
|
|
# broadcast location and services configuration everywhere
|
|
confobj = message.get_tlv(ConfigTlvs.OBJECT.value)
|
|
if confobj == "location" or confobj == "services" or \
|
|
confobj == "session" or confobj == "all":
|
|
servers = self.getservers()
|
|
elif message.message_type == MessageTypes.FILE.value:
|
|
# broadcast hook scripts and custom service files everywhere
|
|
filetype = message.get_tlv(FileTlvs.TYPE.value)
|
|
if filetype is not None and (filetype[:5] == "hook:" or filetype[:8] == "service:"):
|
|
servers = self.getservers()
|
|
if message.message_type == MessageTypes.LINK.value:
|
|
# prepare a server list from two node numbers in link message
|
|
handle_locally, servers, message = self.handlelinkmsg(message)
|
|
logger.info("broker handle link message: %s - %s", handle_locally,
|
|
map(lambda x: "%s:%s" % (x.host, x.port), servers))
|
|
elif len(servers) == 0:
|
|
# check for servers based on node numbers in all messages but link
|
|
nn = message.node_numbers()
|
|
if len(nn) == 0:
|
|
return False
|
|
servers = self.getserversbynode(nn[0])
|
|
|
|
# allow other handlers to process this message (this is used
|
|
# by e.g. EMANE to use the link add message to keep counts of
|
|
# interfaces on other servers)
|
|
for handler in self.handlers:
|
|
handler(message)
|
|
|
|
# perform any message forwarding
|
|
handle_locally |= self.forwardmsg(message, servers)
|
|
return not handle_locally
|
|
|
|
def setupserver(self, servername):
|
|
"""
|
|
Send the appropriate API messages for configuring the specified emulation server.
|
|
|
|
:param str servername: name of server to configure
|
|
:return: nothing
|
|
"""
|
|
server = self.getserverbyname(servername)
|
|
if server is None:
|
|
logger.warn("ignoring unknown server: %s" % servername)
|
|
return
|
|
if server.sock is None or server.host is None or server.port is None:
|
|
logger.info("ignoring disconnected server: %s" % servername)
|
|
return
|
|
|
|
# communicate this session"s current state to the server
|
|
tlvdata = coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, self.session.state)
|
|
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
|
|
server.sock.send(msg)
|
|
|
|
# send a Configuration message for the broker object and inform the
|
|
# server of its local name
|
|
tlvdata = ""
|
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.OBJECT.value, "broker")
|
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.TYPE.value, ConfigFlags.UPDATE.value)
|
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.DATA_TYPES.value, (ConfigDataTypes.STRING.value,))
|
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.VALUES.value,
|
|
"%s:%s:%s" % (server.name, server.host, server.port))
|
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.SESSION.value, "%s" % self.session.session_id)
|
|
msg = coreapi.CoreConfMessage.pack(0, tlvdata)
|
|
server.sock.send(msg)
|
|
|
|
@staticmethod
|
|
def fixupremotetty(msghdr, msgdata, host):
|
|
"""
|
|
When an interactive TTY request comes from the GUI, snoop the reply
|
|
and add an SSH command to the appropriate remote server.
|
|
|
|
:param msghdr: message header
|
|
:param msgdata: message data
|
|
:param str host: host address
|
|
:return: packed core execute tlv data
|
|
"""
|
|
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
|
|
msgcls = coreapi.CLASS_MAP[msgtype]
|
|
msg = msgcls(msgflags, msghdr, msgdata)
|
|
|
|
nodenum = msg.get_tlv(ExecuteTlvs.NODE.value)
|
|
execnum = msg.get_tlv(ExecuteTlvs.NUMBER.value)
|
|
cmd = msg.get_tlv(ExecuteTlvs.COMMAND.value)
|
|
res = msg.get_tlv(ExecuteTlvs.RESULT.value)
|
|
|
|
tlvdata = ""
|
|
tlvdata += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.NODE.value, nodenum)
|
|
tlvdata += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.NUMBER.value, execnum)
|
|
tlvdata += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.COMMAND.value, cmd)
|
|
title = "\\\"CORE: n%s @ %s\\\"" % (nodenum, host)
|
|
res = "ssh -X -f " + host + " xterm -e " + res
|
|
tlvdata += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.RESULT.value, res)
|
|
|
|
return coreapi.CoreExecMessage.pack(msgflags, tlvdata)
|
|
|
|
def handlenodemsg(self, message):
|
|
"""
|
|
Determine and return the servers to which this node message should
|
|
be forwarded. Also keep track of link-layer nodes and the mapping of
|
|
nodes to servers.
|
|
|
|
:param core.api.coreapi.CoreMessage message: message to handle
|
|
:return: boolean for handling locally and set of servers
|
|
:rtype: tuple
|
|
"""
|
|
servers = set()
|
|
handle_locally = False
|
|
serverfiletxt = None
|
|
|
|
# snoop Node Message for emulation server TLV and record mapping
|
|
n = message.tlv_data[NodeTlvs.NUMBER.value]
|
|
|
|
# replicate link-layer nodes on all servers
|
|
nodetype = message.get_tlv(NodeTlvs.TYPE.value)
|
|
if nodetype is not None:
|
|
try:
|
|
nodecls = nodeutils.get_node_class(NodeTypes(nodetype))
|
|
except KeyError:
|
|
logger.warn("broker invalid node type %s" % nodetype)
|
|
return handle_locally, servers
|
|
if nodecls is None:
|
|
logger.warn("broker unimplemented node type %s" % nodetype)
|
|
return handle_locally, servers
|
|
if issubclass(nodecls, PyCoreNet) and nodetype != NodeTypes.WIRELESS_LAN.value:
|
|
# network node replicated on all servers; could be optimized
|
|
# don"t replicate WLANs, because ebtables rules won"t work
|
|
servers = self.getservers()
|
|
handle_locally = True
|
|
self.addnet(n)
|
|
for server in servers:
|
|
self.addnodemap(server, n)
|
|
# do not record server name for networks since network
|
|
# nodes are replicated across all server
|
|
return handle_locally, servers
|
|
elif issubclass(nodecls, PyCoreNode):
|
|
name = message.get_tlv(NodeTlvs.NAME.value)
|
|
if name:
|
|
serverfiletxt = "%s %s %s" % (n, name, nodecls)
|
|
if issubclass(nodecls, PhysicalNode):
|
|
# remember physical nodes
|
|
self.addphys(n)
|
|
|
|
# emulation server TLV specifies server
|
|
servername = message.get_tlv(NodeTlvs.EMULATION_SERVER.value)
|
|
server = self.getserverbyname(servername)
|
|
if server is not None:
|
|
self.addnodemap(server, n)
|
|
if server not in servers:
|
|
servers.add(server)
|
|
if serverfiletxt and self.session.master:
|
|
self.writenodeserver(serverfiletxt, server)
|
|
|
|
# hook to update coordinates of physical nodes
|
|
if n in self.physical_nodes:
|
|
self.session.mobility.physnodeupdateposition(message)
|
|
|
|
return handle_locally, servers
|
|
|
|
def handlelinkmsg(self, message):
|
|
"""
|
|
Determine and return the servers to which this link message should
|
|
be forwarded. Also build tunnels between different servers or add
|
|
opaque data to the link message before forwarding.
|
|
|
|
:param core.api.coreapi.CoreMessage message: message to handle
|
|
:return: boolean to handle locally, a set of server, and message
|
|
:rtype: tuple
|
|
"""
|
|
servers = set()
|
|
handle_locally = False
|
|
|
|
# determine link message destination using non-network nodes
|
|
nn = message.node_numbers()
|
|
logger.info("checking link nodes (%s) with network nodes (%s)", nn, self.network_nodes)
|
|
if nn[0] in self.network_nodes:
|
|
if nn[1] in self.network_nodes:
|
|
# two network nodes linked together - prevent loops caused by
|
|
# the automatic tunnelling
|
|
handle_locally = True
|
|
else:
|
|
servers = self.getserversbynode(nn[1])
|
|
elif nn[1] in self.network_nodes:
|
|
servers = self.getserversbynode(nn[0])
|
|
else:
|
|
logger.info("link nodes are not network nodes")
|
|
servers1 = self.getserversbynode(nn[0])
|
|
logger.info("servers for node(%s): %s", nn[0], servers1)
|
|
servers2 = self.getserversbynode(nn[1])
|
|
logger.info("servers for node(%s): %s", nn[1], servers2)
|
|
# nodes are on two different servers, build tunnels as needed
|
|
if servers1 != servers2:
|
|
localn = None
|
|
if len(servers1) == 0 or len(servers2) == 0:
|
|
handle_locally = True
|
|
servers = servers1.union(servers2)
|
|
host = None
|
|
# get the IP of remote server and decide which node number
|
|
# is for a local node
|
|
for server in servers:
|
|
host = server.host
|
|
if host is None:
|
|
# server is local
|
|
handle_locally = True
|
|
if server in servers1:
|
|
localn = nn[0]
|
|
else:
|
|
localn = nn[1]
|
|
if handle_locally and localn is None:
|
|
# having no local node at this point indicates local node is
|
|
# the one with the empty server set
|
|
if len(servers1) == 0:
|
|
localn = nn[0]
|
|
elif len(servers2) == 0:
|
|
localn = nn[1]
|
|
if host is None:
|
|
host = self.getlinkendpoint(message, localn == nn[0])
|
|
|
|
logger.info("handle locally(%s) and local node(%s)", handle_locally, localn)
|
|
if localn is None:
|
|
message = self.addlinkendpoints(message, servers1, servers2)
|
|
elif message.flags & MessageFlags.ADD.value:
|
|
self.addtunnel(host, nn[0], nn[1], localn)
|
|
elif message.flags & MessageFlags.DELETE.value:
|
|
self.deltunnel(nn[0], nn[1])
|
|
handle_locally = False
|
|
else:
|
|
servers = servers1.union(servers2)
|
|
|
|
return handle_locally, servers, message
|
|
|
|
def addlinkendpoints(self, message, servers1, servers2):
|
|
"""
|
|
For a link message that is not handled locally, inform the remote
|
|
servers of the IP addresses used as tunnel endpoints by adding
|
|
opaque data to the link message.
|
|
|
|
:param core.api.coreapi.CoreMessage message: message to link end points
|
|
:param servers1:
|
|
:param servers2:
|
|
:return: core link message
|
|
:rtype: coreapi.CoreLinkMessage
|
|
"""
|
|
ip1 = ""
|
|
for server in servers1:
|
|
if server.host is not None:
|
|
ip1 = server.host
|
|
break
|
|
ip2 = ""
|
|
for server in servers2:
|
|
if server.host is not None:
|
|
ip2 = server.host
|
|
break
|
|
tlvdata = message.raw_message[coreapi.CoreMessage.header_len:]
|
|
tlvdata += coreapi.CoreLinkTlv.pack(LinkTlvs.OPAQUE.value, "%s:%s" % (ip1, ip2))
|
|
newraw = coreapi.CoreLinkMessage.pack(message.flags, tlvdata)
|
|
msghdr = newraw[:coreapi.CoreMessage.header_len]
|
|
return coreapi.CoreLinkMessage(message.flags, msghdr, tlvdata)
|
|
|
|
def getlinkendpoint(self, msg, first_is_local):
|
|
"""
|
|
A link message between two different servers has been received,
|
|
and we need to determine the tunnel endpoint. First look for
|
|
opaque data in the link message, otherwise use the IP of the message
|
|
sender (the master server).
|
|
|
|
:param coreapi.CoreLinkMessage msg:
|
|
:param bool first_is_local: is first local
|
|
:return: host address
|
|
:rtype: str
|
|
"""
|
|
host = None
|
|
opaque = msg.get_tlv(LinkTlvs.OPAQUE.value)
|
|
if opaque is not None:
|
|
if first_is_local:
|
|
host = opaque.split(":")[1]
|
|
else:
|
|
host = opaque.split(":")[0]
|
|
if host == "":
|
|
host = None
|
|
|
|
if host is None:
|
|
for session_client in self.session_clients:
|
|
# get IP address from API message sender (master)
|
|
if session_client.client_address != "":
|
|
host = session_client.client_address[0]
|
|
break
|
|
|
|
return host
|
|
|
|
def handlerawmsg(self, msg):
|
|
"""
|
|
Helper to invoke message handler, using raw (packed) message bytes.
|
|
|
|
:param msg: raw message butes
|
|
:return: should handle locally or not
|
|
:rtype: bool
|
|
"""
|
|
hdr = msg[:coreapi.CoreMessage.header_len]
|
|
msgtype, flags, msglen = coreapi.CoreMessage.unpack_header(hdr)
|
|
msgcls = coreapi.CLASS_MAP[msgtype]
|
|
return self.handle_message(msgcls(flags, hdr, msg[coreapi.CoreMessage.header_len:]))
|
|
|
|
def forwardmsg(self, message, servers):
|
|
"""
|
|
Forward API message to all given servers.
|
|
|
|
Return True if an empty host/port is encountered, indicating
|
|
the message should be handled locally.
|
|
|
|
:param core.api.coreapi.CoreMessage message: message to forward
|
|
:param list servers: server to forward message to
|
|
:return: handle locally value
|
|
:rtype: bool
|
|
"""
|
|
handle_locally = len(servers) == 0
|
|
for server in servers:
|
|
if server.host is None and server.port is None:
|
|
# local emulation server, handle this locally
|
|
handle_locally = True
|
|
elif server.sock is None:
|
|
logger.info("server %s @ %s:%s is disconnected" % (
|
|
server.name, server.host, server.port))
|
|
else:
|
|
logger.info("forwarding message to server: %s - %s:\n%s",
|
|
server.host, server.port, message)
|
|
server.sock.send(message.raw_message)
|
|
return handle_locally
|
|
|
|
def writeservers(self):
|
|
"""
|
|
Write the server list to a text file in the session directory upon
|
|
startup: /tmp/pycore.nnnnn/servers
|
|
|
|
:return: nothing
|
|
"""
|
|
servers = self.getservers()
|
|
filename = os.path.join(self.session.session_dir, "servers")
|
|
master = self.session_id_master
|
|
if master is None:
|
|
master = self.session.session_id
|
|
try:
|
|
with open(filename, "w") as f:
|
|
f.write("master=%s\n" % master)
|
|
for server in servers:
|
|
if server.name == "localhost":
|
|
continue
|
|
|
|
lhost, lport = None, None
|
|
if server.sock:
|
|
lhost, lport = server.sock.getsockname()
|
|
f.write("%s %s %s %s %s\n" % (server.name, server.host, server.port, lhost, lport))
|
|
except IOError:
|
|
logger.exception("error writing server list to the file: %s" % filename)
|
|
|
|
def writenodeserver(self, nodestr, server):
|
|
"""
|
|
Creates a /tmp/pycore.nnnnn/nX.conf/server file having the node
|
|
and server info. This may be used by scripts for accessing nodes on
|
|
other machines, much like local nodes may be accessed via the
|
|
VnodeClient class.
|
|
|
|
:param str nodestr: node string
|
|
:param CoreDistributedServer server: core server
|
|
:return: nothing
|
|
"""
|
|
serverstr = "%s %s %s" % (server.name, server.host, server.port)
|
|
name = nodestr.split()[1]
|
|
dirname = os.path.join(self.session.session_dir, name + ".conf")
|
|
filename = os.path.join(dirname, "server")
|
|
try:
|
|
os.makedirs(dirname)
|
|
except OSError:
|
|
# directory may already exist from previous distributed run
|
|
logger.exception("error creating directory: %s", dirname)
|
|
|
|
try:
|
|
with open(filename, "w") as f:
|
|
f.write("%s\n%s\n" % (serverstr, nodestr))
|
|
except IOError:
|
|
logger.exception("error writing server file %s for node %s" % (filename, name))
|
|
|
|
def local_instantiation_complete(self):
|
|
"""
|
|
Set the local server"s instantiation-complete status to True.
|
|
|
|
:return: nothing
|
|
"""
|
|
# TODO: do we really want to allow a localhost to not exist?
|
|
with self.servers_lock:
|
|
server = self.servers.get("localhost")
|
|
if server is not None:
|
|
server.instantiation_complete = True
|
|
|
|
# broadcast out instantiate complete
|
|
tlvdata = ""
|
|
tlvdata += coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, EventTypes.INSTANTIATION_COMPLETE.value)
|
|
message = coreapi.CoreEventMessage.pack(0, tlvdata)
|
|
for session_client in self.session_clients:
|
|
session_client.sendall(message)
|
|
|
|
def instantiation_complete(self):
|
|
"""
|
|
Return True if all servers have completed instantiation, False
|
|
otherwise.
|
|
|
|
:return: have all server completed instantiation
|
|
:rtype: bool
|
|
"""
|
|
with self.servers_lock:
|
|
for server in self.servers.itervalues():
|
|
if not server.instantiation_complete:
|
|
return False
|
|
return True
|
|
|
|
def handle_distributed(self, message):
|
|
"""
|
|
Handle the session options config message as it has reached the
|
|
broker. Options requiring modification for distributed operation should
|
|
be handled here.
|
|
|
|
:param message: message to handle
|
|
:return: nothing
|
|
"""
|
|
if not self.session.master:
|
|
return
|
|
|
|
if message.message_type != MessageTypes.CONFIG.value or message.get_tlv(ConfigTlvs.OBJECT.value) != "session":
|
|
return
|
|
|
|
values_str = message.get_tlv(ConfigTlvs.VALUES.value)
|
|
if values_str is None:
|
|
return
|
|
|
|
value_strings = values_str.split('|')
|
|
for value_string in value_strings:
|
|
key, value = value_string.split('=', 1)
|
|
if key == "controlnet":
|
|
self.handle_distributed_control_net(message, value_strings, value_strings.index(value_string))
|
|
|
|
def handle_distributed_control_net(self, message, values, index):
|
|
"""
|
|
Modify Config Message if multiple control network prefixes are
|
|
defined. Map server names to prefixes and repack the message before
|
|
it is forwarded to slave servers.
|
|
|
|
:param message: message to handle
|
|
:param list values: values to handle
|
|
:param int index: index ti get key value from
|
|
:return: nothing
|
|
"""
|
|
key_value = values[index]
|
|
key, value = key_value.split('=', 1)
|
|
control_nets = value.split()
|
|
|
|
if len(control_nets) < 2:
|
|
logger.warn("multiple controlnet prefixes do not exist")
|
|
return
|
|
|
|
servers = self.session.broker.getservernames()
|
|
if len(servers) < 2:
|
|
logger.warn("not distributed")
|
|
return
|
|
|
|
servers.remove("localhost")
|
|
# master always gets first prefix
|
|
servers.insert(0, "localhost")
|
|
# create list of "server1:ctrlnet1 server2:ctrlnet2 ..."
|
|
control_nets = map(lambda x: "%s:%s" % (x[0], x[1]), zip(servers, control_nets))
|
|
values[index] = "controlnet=%s" % (" ".join(control_nets))
|
|
values_str = "|".join(values)
|
|
message.tlvdata[ConfigTlvs.VALUES.value] = values_str
|
|
message.repack()
|