catching up with commits: daemon: Add an instantiation-complete CORE API event type
This commit is contained in:
parent
00f4ebf5a9
commit
2fc6345138
8 changed files with 377 additions and 332 deletions
|
@ -38,6 +38,31 @@ from core.phys.pnodes import PhysicalNode
|
||||||
logger = log.get_logger(__name__)
|
logger = log.get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class CoreServer(object):
|
||||||
|
def __init__(self, name, host, port):
|
||||||
|
self.name = name
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.sock = None
|
||||||
|
self.instantiation_complete = False
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
assert self.sock is None
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
|
||||||
|
try:
|
||||||
|
sock.connect((self.host, self.port))
|
||||||
|
except:
|
||||||
|
sock.close()
|
||||||
|
raise
|
||||||
|
self.sock = sock
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self.sock is not None:
|
||||||
|
self.sock.close()
|
||||||
|
self.sock = None
|
||||||
|
|
||||||
|
|
||||||
class CoreBroker(ConfigurableManager):
|
class CoreBroker(ConfigurableManager):
|
||||||
"""
|
"""
|
||||||
Member of pycore session class for handling global emulation server data.
|
Member of pycore session class for handling global emulation server data.
|
||||||
|
@ -69,12 +94,12 @@ class CoreBroker(ConfigurableManager):
|
||||||
# reference counts of nodes on servers
|
# reference counts of nodes on servers
|
||||||
self.nodecounts = {}
|
self.nodecounts = {}
|
||||||
self.bootcount = 0
|
self.bootcount = 0
|
||||||
# list of node numbers that are link-layer nodes (networks)
|
# set of node numbers that are link-layer nodes (networks)
|
||||||
self.network_nodes = []
|
self.network_nodes = set()
|
||||||
# list of node numbers that are PhysicalNode nodes
|
# set of node numbers that are PhysicalNode nodes
|
||||||
self.physical_nodes = []
|
self.physical_nodes = set()
|
||||||
# allows for other message handlers to process API messages (e.g. EMANE)
|
# allows for other message handlers to process API messages (e.g. EMANE)
|
||||||
self.handlers = ()
|
self.handlers = set()
|
||||||
# dict with tunnel key to tunnel device mapping
|
# dict with tunnel key to tunnel device mapping
|
||||||
self.tunnels = {}
|
self.tunnels = {}
|
||||||
self.dorecvloop = False
|
self.dorecvloop = False
|
||||||
|
@ -96,12 +121,11 @@ class CoreBroker(ConfigurableManager):
|
||||||
"""
|
"""
|
||||||
with self.servers_lock:
|
with self.servers_lock:
|
||||||
while len(self.servers) > 0:
|
while len(self.servers) > 0:
|
||||||
server, v = self.servers.popitem()
|
name, server = self.servers.popitem()
|
||||||
host, port, sock = v
|
if server.sock is not None:
|
||||||
if sock is None:
|
logger.info("closing connection with %s @ %s:%s" %
|
||||||
continue
|
(name, server.host, server.port))
|
||||||
logger.info("closing connection with %s @ %s:%s", server, host, port)
|
server.close()
|
||||||
sock.close()
|
|
||||||
self.reset()
|
self.reset()
|
||||||
self.dorecvloop = False
|
self.dorecvloop = False
|
||||||
if self.recvthread is not None:
|
if self.recvthread is not None:
|
||||||
|
@ -113,21 +137,21 @@ class CoreBroker(ConfigurableManager):
|
||||||
"""
|
"""
|
||||||
self.nodemap_lock.acquire()
|
self.nodemap_lock.acquire()
|
||||||
self.nodemap.clear()
|
self.nodemap.clear()
|
||||||
for server in self.nodecounts:
|
for server, count in self.nodecounts.iteritems():
|
||||||
if self.nodecounts[server] < 1:
|
if count < 1:
|
||||||
self.delserver(server)
|
self.delserver(server)
|
||||||
self.nodecounts.clear()
|
self.nodecounts.clear()
|
||||||
self.bootcount = 0
|
self.bootcount = 0
|
||||||
self.nodemap_lock.release()
|
self.nodemap_lock.release()
|
||||||
del self.network_nodes[:]
|
self.network_nodes.clear()
|
||||||
del self.physical_nodes[:]
|
self.physical_nodes.clear()
|
||||||
while len(self.tunnels) > 0:
|
while len(self.tunnels) > 0:
|
||||||
key, gt = self.tunnels.popitem()
|
key, gt = self.tunnels.popitem()
|
||||||
gt.shutdown()
|
gt.shutdown()
|
||||||
|
|
||||||
def startrecvloop(self):
|
def startrecvloop(self):
|
||||||
"""
|
"""
|
||||||
Spawn the recvloop() thread if it hasn't been already started.
|
Spawn the recvloop() thread if it hasn"t been already started.
|
||||||
"""
|
"""
|
||||||
if self.recvthread is not None:
|
if self.recvthread is not None:
|
||||||
if self.recvthread.isAlive():
|
if self.recvthread.isAlive():
|
||||||
|
@ -151,53 +175,55 @@ class CoreBroker(ConfigurableManager):
|
||||||
rlist = []
|
rlist = []
|
||||||
with self.servers_lock:
|
with self.servers_lock:
|
||||||
# build a socket list for select call
|
# build a socket list for select call
|
||||||
for name in self.servers:
|
for server in self.servers.itervalues():
|
||||||
(h, p, sock) = self.servers[name]
|
if server.sock is not None:
|
||||||
if sock is not None:
|
rlist.append(server.sock)
|
||||||
rlist.append(sock.fileno())
|
|
||||||
r, w, x = select.select(rlist, [], [], 1.0)
|
r, w, x = select.select(rlist, [], [], 1.0)
|
||||||
for sockfd in r:
|
for sock in r:
|
||||||
try:
|
server = self.getserverbysock(sock)
|
||||||
(h, p, sock, name) = self.getserverbysock(sockfd)
|
if server is None:
|
||||||
except KeyError:
|
|
||||||
# servers may have changed; loop again
|
# servers may have changed; loop again
|
||||||
logger.exception("get server by sock error")
|
continue
|
||||||
break
|
rcvlen = self.recv(server)
|
||||||
rcvlen = self.recv(sock, h)
|
|
||||||
if rcvlen == 0:
|
if rcvlen == 0:
|
||||||
logger.info("connection with %s @ %s:%s has closed", name, h, p)
|
logger.info("connection with %s @ %s:%s has closed" % (
|
||||||
self.servers[name] = (h, p, None)
|
server.name, server.host, server.port))
|
||||||
|
|
||||||
def recv(self, sock, host):
|
def recv(self, server):
|
||||||
"""
|
"""
|
||||||
Receive data on an emulation server socket and broadcast it to
|
Receive data on an emulation server socket and broadcast it to
|
||||||
all connected session handlers. Returns the length of data recevied
|
all connected session handlers. Returns the length of data recevied
|
||||||
and forwarded. Return value of zero indicates the socket has closed
|
and forwarded. Return value of zero indicates the socket has closed
|
||||||
and should be removed from the self.servers dict.
|
and should be removed from the self.servers dict.
|
||||||
|
|
||||||
|
:param CoreServer server: server to receive from
|
||||||
|
:return: message length
|
||||||
|
:rtype: int
|
||||||
"""
|
"""
|
||||||
msghdr = sock.recv(coreapi.CoreMessage.header_len)
|
msghdr = server.sock.recv(coreapi.CoreMessage.header_len)
|
||||||
if len(msghdr) == 0:
|
if len(msghdr) == 0:
|
||||||
# server disconnected
|
# server disconnected
|
||||||
sock.close()
|
server.close()
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
if len(msghdr) != coreapi.CoreMessage.header_len:
|
if len(msghdr) != coreapi.CoreMessage.header_len:
|
||||||
logger.info("warning: broker received not enough data len=%s" % len(msghdr))
|
logger.info("warning: broker received not enough data len=%s" % len(msghdr))
|
||||||
return len(msghdr)
|
return len(msghdr)
|
||||||
|
|
||||||
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
|
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
|
||||||
msgdata = sock.recv(msglen)
|
msgdata = server.sock.recv(msglen)
|
||||||
data = msghdr + msgdata
|
data = msghdr + msgdata
|
||||||
count = None
|
count = None
|
||||||
# snoop exec response for remote interactive TTYs
|
# snoop exec response for remote interactive TTYs
|
||||||
if msgtype == MessageTypes.EXECUTE.value and msgflags & MessageFlags.TTY.value:
|
if msgtype == MessageTypes.EXECUTE.value and msgflags & MessageFlags.TTY.value:
|
||||||
data = self.fixupremotetty(msghdr, msgdata, host)
|
data = self.fixupremotetty(msghdr, msgdata, server.host)
|
||||||
elif msgtype == MessageTypes.NODE.value:
|
elif msgtype == MessageTypes.NODE.value:
|
||||||
# snoop node delete response to decrement node counts
|
# snoop node delete response to decrement node counts
|
||||||
if msgflags & MessageFlags.DELETE.value:
|
if msgflags & MessageFlags.DELETE.value:
|
||||||
msg = coreapi.CoreNodeMessage(msgflags, msghdr, msgdata)
|
msg = coreapi.CoreNodeMessage(msgflags, msghdr, msgdata)
|
||||||
nodenum = msg.get_tlv(NodeTlvs.NUMBER.value)
|
nodenum = msg.get_tlv(NodeTlvs.NUMBER.value)
|
||||||
if nodenum is not None:
|
if nodenum is not None:
|
||||||
count = self.delnodemap(sock, nodenum)
|
count = self.delnodemap(server, nodenum)
|
||||||
# snoop node add response to increment booted node count
|
# snoop node add response to increment booted node count
|
||||||
# (only CoreNodes send these response messages)
|
# (only CoreNodes send these response messages)
|
||||||
elif msgflags & (MessageFlags.ADD.value | MessageFlags.LOCAL.value):
|
elif msgflags & (MessageFlags.ADD.value | MessageFlags.LOCAL.value):
|
||||||
|
@ -207,6 +233,13 @@ class CoreBroker(ConfigurableManager):
|
||||||
# this allows green link lines for remote WLANs
|
# this allows green link lines for remote WLANs
|
||||||
msg = coreapi.CoreLinkMessage(msgflags, msghdr, msgdata)
|
msg = coreapi.CoreLinkMessage(msgflags, msghdr, msgdata)
|
||||||
self.session.sdt.handle_distributed(msg)
|
self.session.sdt.handle_distributed(msg)
|
||||||
|
elif msgtype == MessageTypes.EVENT.value:
|
||||||
|
msg = coreapi.CoreEventMessage(msgflags, msghdr, msgdata)
|
||||||
|
eventtype = msg.get_tlv(EventTlvs.TYPE.value)
|
||||||
|
if eventtype == EventTypes.INSTANTIATION_COMPLETE.value:
|
||||||
|
server.instantiation_complete = True
|
||||||
|
if self.instantiation_complete():
|
||||||
|
self.session.check_runtime()
|
||||||
else:
|
else:
|
||||||
logger.error("unknown message type received: %s", msgtype)
|
logger.error("unknown message type received: %s", msgtype)
|
||||||
|
|
||||||
|
@ -222,85 +255,88 @@ class CoreBroker(ConfigurableManager):
|
||||||
|
|
||||||
def addserver(self, name, host, port):
|
def addserver(self, name, host, port):
|
||||||
"""
|
"""
|
||||||
Add a new server, and try to connect to it. If we're already
|
Add a new server, and try to connect to it. If we"re already
|
||||||
connected to this (host, port), then leave it alone. When host,port
|
connected to this (host, port), then leave it alone. When host,port
|
||||||
is None, do not try to connect.
|
is None, do not try to connect.
|
||||||
"""
|
"""
|
||||||
self.servers_lock.acquire()
|
with self.servers_lock:
|
||||||
if name in self.servers:
|
server = self.servers.get(name)
|
||||||
oldhost, oldport, sock = self.servers[name]
|
if server is not None:
|
||||||
if host == oldhost or port == oldport:
|
if host == server.host and port == server.port and \
|
||||||
# leave this socket connected
|
server.sock is not None:
|
||||||
if sock is not None:
|
# leave this socket connected
|
||||||
self.servers_lock.release()
|
|
||||||
return
|
return
|
||||||
if host is not None and sock is not None:
|
|
||||||
logger.info("closing connection with %s @ %s:%s", name, host, port)
|
|
||||||
if sock is not None:
|
|
||||||
sock.close()
|
|
||||||
self.servers_lock.release()
|
|
||||||
if host is not None:
|
|
||||||
logger.info("adding server %s @ %s:%s", name, host, port)
|
|
||||||
if host is None:
|
|
||||||
sock = None
|
|
||||||
else:
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
# sock.setblocking(0)
|
|
||||||
# error = sock.connect_ex((host, port))
|
|
||||||
try:
|
|
||||||
sock.connect((host, port))
|
|
||||||
self.startrecvloop()
|
|
||||||
except IOError:
|
|
||||||
logger.exception("error connecting to server %s:%s", host, port)
|
|
||||||
sock.close()
|
|
||||||
sock = None
|
|
||||||
self.servers_lock.acquire()
|
|
||||||
self.servers[name] = (host, port, sock)
|
|
||||||
self.servers_lock.release()
|
|
||||||
|
|
||||||
def delserver(self, name):
|
logger.info("closing connection with %s @ %s:%s" % (name, server.host, server.port))
|
||||||
|
server.close()
|
||||||
|
del self.servers[name]
|
||||||
|
|
||||||
|
logger.info("adding server %s @ %s:%s" % (name, host, port))
|
||||||
|
server = CoreServer(name, host, port)
|
||||||
|
if host is not None and port is not None:
|
||||||
|
try:
|
||||||
|
server.connect()
|
||||||
|
except IOError:
|
||||||
|
logger.exception("error connecting to server %s:%s" % (host, port))
|
||||||
|
if server.sock is not None:
|
||||||
|
self.startrecvloop()
|
||||||
|
self.servers[name] = server
|
||||||
|
|
||||||
|
def delserver(self, server):
|
||||||
"""
|
"""
|
||||||
Remove a server and hang up any connection.
|
Remove a server and hang up any connection.
|
||||||
"""
|
|
||||||
self.servers_lock.acquire()
|
|
||||||
if name not in self.servers:
|
|
||||||
self.servers_lock.release()
|
|
||||||
return
|
|
||||||
(host, port, sock) = self.servers.pop(name)
|
|
||||||
if sock is not None:
|
|
||||||
logger.info("closing connection with %s @ %s:%s", name, host, port)
|
|
||||||
sock.close()
|
|
||||||
self.servers_lock.release()
|
|
||||||
|
|
||||||
def getserver(self, name):
|
:param CoreServer server: server to delete
|
||||||
"""
|
:return:
|
||||||
Return the (host, port, sock) tuple, or raise a KeyError exception.
|
|
||||||
"""
|
|
||||||
if name not in self.servers:
|
|
||||||
raise KeyError("emulation server %s not found" % name)
|
|
||||||
return self.servers[name]
|
|
||||||
|
|
||||||
def getserverbysock(self, sockfd):
|
|
||||||
"""
|
|
||||||
Return a (host, port, sock, name) tuple based on socket file
|
|
||||||
descriptor, or raise a KeyError exception.
|
|
||||||
"""
|
"""
|
||||||
with self.servers_lock:
|
with self.servers_lock:
|
||||||
for name in self.servers:
|
try:
|
||||||
(host, port, sock) = self.servers[name]
|
s = self.servers.pop(server.name)
|
||||||
if sock is None:
|
assert s == server
|
||||||
continue
|
except KeyError:
|
||||||
if sock.fileno() == sockfd:
|
pass
|
||||||
return host, port, sock, name
|
if server.sock is not None:
|
||||||
raise KeyError("socket fd %s not found" % sockfd)
|
logger.info("closing connection with %s @ %s:%s" % (server.name, server.host, server.port))
|
||||||
|
server.close()
|
||||||
|
|
||||||
def getserverlist(self):
|
def getserverbyname(self, name):
|
||||||
"""
|
"""
|
||||||
Return the list of server names (keys from self.servers).
|
Return the server object having the given name, or None.
|
||||||
|
|
||||||
|
:param str name: name of server to retrieve
|
||||||
|
:return: server for given name
|
||||||
|
:rtype: CoreServer
|
||||||
"""
|
"""
|
||||||
with self.servers_lock:
|
with self.servers_lock:
|
||||||
serverlist = sorted(self.servers.keys())
|
return self.servers.get(name)
|
||||||
return serverlist
|
|
||||||
|
def getserverbysock(self, sock):
|
||||||
|
"""
|
||||||
|
Return the server object corresponding to the given socket, or None.
|
||||||
|
|
||||||
|
:param sock: socket associated with a server
|
||||||
|
:return: core server associated wit the socket
|
||||||
|
:rtype: CoreServer
|
||||||
|
"""
|
||||||
|
with self.servers_lock:
|
||||||
|
for server in self.servers.itervalues():
|
||||||
|
if server.sock == sock:
|
||||||
|
return server
|
||||||
|
return None
|
||||||
|
|
||||||
|
def getservers(self):
|
||||||
|
"""
|
||||||
|
Return a list of servers sorted by name.
|
||||||
|
"""
|
||||||
|
with self.servers_lock:
|
||||||
|
return sorted(self.servers.values(), key=lambda x: x.name)
|
||||||
|
|
||||||
|
def getservernames(self):
|
||||||
|
"""
|
||||||
|
Return a sorted list of server names (keys from self.servers).
|
||||||
|
"""
|
||||||
|
with self.servers_lock:
|
||||||
|
return sorted(self.servers.keys())
|
||||||
|
|
||||||
def tunnelkey(self, n1num, n2num):
|
def tunnelkey(self, n1num, n2num):
|
||||||
"""
|
"""
|
||||||
|
@ -353,12 +389,12 @@ class CoreBroker(ConfigurableManager):
|
||||||
try:
|
try:
|
||||||
net = self.session.get_object(n)
|
net = self.session.get_object(n)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise KeyError, "network node %s not found" % n
|
raise KeyError("network node %s not found" % n)
|
||||||
# add other nets here that do not require tunnels
|
# add other nets here that do not require tunnels
|
||||||
if nodeutils.is_node(net, NodeTypes.EMANE_NET):
|
if nodeutils.is_node(net, NodeTypes.EMANE_NET):
|
||||||
return None
|
return None
|
||||||
if nodeutils.is_node(net, NodeTypes.CONTROL_NET):
|
if nodeutils.is_node(net, NodeTypes.CONTROL_NET):
|
||||||
if hasattr(net, 'serverintf'):
|
if hasattr(net, "serverintf"):
|
||||||
if net.serverintf is not None:
|
if net.serverintf is not None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -367,17 +403,12 @@ class CoreBroker(ConfigurableManager):
|
||||||
return None
|
return None
|
||||||
hosts = []
|
hosts = []
|
||||||
for server in servers:
|
for server in servers:
|
||||||
(host, port, sock) = self.getserver(server)
|
if server.host is None:
|
||||||
if host is None:
|
|
||||||
continue
|
continue
|
||||||
hosts.append(host)
|
hosts.append(server.host)
|
||||||
if len(hosts) == 0:
|
if len(hosts) == 0 and self.session_handler.client_address != "":
|
||||||
# get IP address from API message sender (master)
|
# get IP address from API message sender (master)
|
||||||
self.session._handlerslock.acquire()
|
hosts.append(self.session_handler.client_address[0])
|
||||||
for h in self.session._handlers:
|
|
||||||
if h.client_address != "":
|
|
||||||
hosts.append(h.client_address[0])
|
|
||||||
self.session._handlerslock.release()
|
|
||||||
|
|
||||||
r = []
|
r = []
|
||||||
for host in hosts:
|
for host in hosts:
|
||||||
|
@ -426,45 +457,33 @@ class CoreBroker(ConfigurableManager):
|
||||||
"""
|
"""
|
||||||
Record a node number to emulation server mapping.
|
Record a node number to emulation server mapping.
|
||||||
"""
|
"""
|
||||||
self.nodemap_lock.acquire()
|
with self.nodemap_lock:
|
||||||
if nodenum in self.nodemap:
|
if nodenum in self.nodemap:
|
||||||
if server in self.nodemap[nodenum]:
|
if server in self.nodemap[nodenum]:
|
||||||
self.nodemap_lock.release()
|
return
|
||||||
return
|
self.nodemap[nodenum].add(server)
|
||||||
self.nodemap[nodenum].append(server)
|
else:
|
||||||
else:
|
self.nodemap[nodenum] = {server}
|
||||||
self.nodemap[nodenum] = [server, ]
|
if server in self.nodecounts:
|
||||||
if server in self.nodecounts:
|
self.nodecounts[server] += 1
|
||||||
self.nodecounts[server] += 1
|
else:
|
||||||
else:
|
self.nodecounts[server] = 1
|
||||||
self.nodecounts[server] = 1
|
|
||||||
self.nodemap_lock.release()
|
|
||||||
|
|
||||||
def delnodemap(self, sock, nodenum):
|
def delnodemap(self, server, nodenum):
|
||||||
"""
|
"""
|
||||||
Remove a node number to emulation server mapping.
|
Remove a node number to emulation server mapping.
|
||||||
Return the number of nodes left on this server.
|
Return the number of nodes left on this server.
|
||||||
"""
|
"""
|
||||||
self.nodemap_lock.acquire()
|
|
||||||
count = None
|
count = None
|
||||||
if nodenum not in self.nodemap:
|
with self.nodemap_lock:
|
||||||
self.nodemap_lock.release()
|
if nodenum not in self.nodemap:
|
||||||
return count
|
return count
|
||||||
found = False
|
|
||||||
for server in self.nodemap[nodenum]:
|
|
||||||
(host, port, srvsock) = self.getserver(server)
|
|
||||||
if srvsock == sock:
|
|
||||||
found = True
|
|
||||||
break
|
|
||||||
if server in self.nodecounts:
|
|
||||||
count = self.nodecounts[server]
|
|
||||||
if found:
|
|
||||||
self.nodemap[nodenum].remove(server)
|
self.nodemap[nodenum].remove(server)
|
||||||
if server in self.nodecounts:
|
if server in self.nodecounts:
|
||||||
|
count = self.nodecounts[server]
|
||||||
count -= 1
|
count -= 1
|
||||||
self.nodecounts[server] = count
|
self.nodecounts[server] = count
|
||||||
self.nodemap_lock.release()
|
return count
|
||||||
return count
|
|
||||||
|
|
||||||
def incrbootcount(self):
|
def incrbootcount(self):
|
||||||
"""
|
"""
|
||||||
|
@ -481,29 +500,24 @@ class CoreBroker(ConfigurableManager):
|
||||||
|
|
||||||
def getserversbynode(self, nodenum):
|
def getserversbynode(self, nodenum):
|
||||||
"""
|
"""
|
||||||
Retrieve a list of emulation servers given a node number.
|
Retrieve a set of emulation servers given a node number.
|
||||||
"""
|
"""
|
||||||
self.nodemap_lock.acquire()
|
with self.nodemap_lock:
|
||||||
if nodenum not in self.nodemap:
|
if nodenum not in self.nodemap:
|
||||||
self.nodemap_lock.release()
|
return set()
|
||||||
return []
|
return self.nodemap[nodenum]
|
||||||
r = self.nodemap[nodenum]
|
|
||||||
self.nodemap_lock.release()
|
|
||||||
return r
|
|
||||||
|
|
||||||
def addnet(self, nodenum):
|
def addnet(self, nodenum):
|
||||||
"""
|
"""
|
||||||
Add a node number to the list of link-layer nodes.
|
Add a node number to the list of link-layer nodes.
|
||||||
"""
|
"""
|
||||||
if nodenum not in self.network_nodes:
|
self.network_nodes.add(nodenum)
|
||||||
self.network_nodes.append(nodenum)
|
|
||||||
|
|
||||||
def addphys(self, nodenum):
|
def addphys(self, nodenum):
|
||||||
"""
|
"""
|
||||||
Add a node number to the list of physical nodes.
|
Add a node number to the list of physical nodes.
|
||||||
"""
|
"""
|
||||||
if nodenum not in self.physical_nodes:
|
self.physical_nodes.add(nodenum)
|
||||||
self.physical_nodes.append(nodenum)
|
|
||||||
|
|
||||||
def configure_reset(self, config_data):
|
def configure_reset(self, config_data):
|
||||||
"""
|
"""
|
||||||
|
@ -518,7 +532,7 @@ class CoreBroker(ConfigurableManager):
|
||||||
def configure_values(self, config_data):
|
def configure_values(self, config_data):
|
||||||
"""
|
"""
|
||||||
Receive configuration message with a list of server:host:port
|
Receive configuration message with a list of server:host:port
|
||||||
combinations that we'll need to connect with.
|
combinations that we"ll need to connect with.
|
||||||
|
|
||||||
:param core.conf.ConfigData config_data: configuration data for carrying out a configuration
|
:param core.conf.ConfigData config_data: configuration data for carrying out a configuration
|
||||||
:return: None
|
:return: None
|
||||||
|
@ -529,27 +543,27 @@ class CoreBroker(ConfigurableManager):
|
||||||
if values is None:
|
if values is None:
|
||||||
logger.info("emulation server data missing")
|
logger.info("emulation server data missing")
|
||||||
return None
|
return None
|
||||||
values = values.split('|')
|
values = values.split("|")
|
||||||
|
|
||||||
# string of "server:ip:port,server:ip:port,..."
|
# string of "server:ip:port,server:ip:port,..."
|
||||||
server_strings = values[0]
|
server_strings = values[0]
|
||||||
server_list = server_strings.split(',')
|
server_list = server_strings.split(",")
|
||||||
|
|
||||||
for server in server_list:
|
for server in server_list:
|
||||||
server_items = server.split(':')
|
server_items = server.split(":")
|
||||||
(name, host, port) = server_items[:3]
|
(name, host, port) = server_items[:3]
|
||||||
|
|
||||||
if host == '':
|
if host == "":
|
||||||
host = None
|
host = None
|
||||||
|
|
||||||
if port == '':
|
if port == "":
|
||||||
port = None
|
port = None
|
||||||
else:
|
else:
|
||||||
port = int(port)
|
port = int(port)
|
||||||
|
|
||||||
if session_id is not None:
|
if session_id is not None:
|
||||||
# receive session ID and my IP from master
|
# receive session ID and my IP from master
|
||||||
self.session_id_master = int(session_id.split('|')[0])
|
self.session_id_master = int(session_id.split("|")[0])
|
||||||
self.myip = host
|
self.myip = host
|
||||||
host = None
|
host = None
|
||||||
port = None
|
port = None
|
||||||
|
@ -568,67 +582,70 @@ class CoreBroker(ConfigurableManager):
|
||||||
Returns True when message does not need to be handled locally,
|
Returns True when message does not need to be handled locally,
|
||||||
and performs forwarding if required.
|
and performs forwarding if required.
|
||||||
Returning False indicates this message should be handled locally.
|
Returning False indicates this message should be handled locally.
|
||||||
|
|
||||||
|
:param core.api.coreapi.CoreMessage message: message to handle
|
||||||
|
:return: true or false for handling locally
|
||||||
|
:rtype: bool
|
||||||
"""
|
"""
|
||||||
serverlist = []
|
servers = set()
|
||||||
handle_locally = False
|
|
||||||
# Do not forward messages when in definition state
|
# Do not forward messages when in definition state
|
||||||
# (for e.g. configuring services)
|
# (for e.g. configuring services)
|
||||||
if self.session.state == EventTypes.DEFINITION_STATE.value:
|
if self.session.state == EventTypes.DEFINITION_STATE.value:
|
||||||
handle_locally = True
|
return False
|
||||||
return not handle_locally
|
|
||||||
# Decide whether message should be handled locally or forwarded, or both
|
# Decide whether message should be handled locally or forwarded, or both
|
||||||
if message.message_type == MessageTypes.NODE.value:
|
if message.message_type == MessageTypes.NODE.value:
|
||||||
(handle_locally, serverlist) = self.handlenodemsg(message)
|
servers = self.handlenodemsg(message)
|
||||||
elif message.message_type == MessageTypes.EVENT.value:
|
elif message.message_type == MessageTypes.EVENT.value:
|
||||||
# broadcast events everywhere
|
# broadcast events everywhere
|
||||||
serverlist = self.getserverlist()
|
servers = self.getservers()
|
||||||
elif message.message_type == MessageTypes.CONFIG.value:
|
elif message.message_type == MessageTypes.CONFIG.value:
|
||||||
# broadcast location and services configuration everywhere
|
# broadcast location and services configuration everywhere
|
||||||
confobj = message.get_tlv(ConfigTlvs.OBJECT.value)
|
confobj = message.get_tlv(ConfigTlvs.OBJECT.value)
|
||||||
if confobj == "location" or confobj == "services" or \
|
if confobj == "location" or confobj == "services" or \
|
||||||
confobj == "session" or confobj == "all":
|
confobj == "session" or confobj == "all":
|
||||||
serverlist = self.getserverlist()
|
servers = self.getservers()
|
||||||
elif message.message_type == MessageTypes.FILE.value:
|
elif message.message_type == MessageTypes.FILE.value:
|
||||||
# broadcast hook scripts and custom service files everywhere
|
# broadcast hook scripts and custom service files everywhere
|
||||||
filetype = message.get_tlv(FileTlvs.TYPE.value)
|
filetype = message.get_tlv(FileTlvs.TYPE.value)
|
||||||
if filetype is not None and (filetype[:5] == "hook:" or filetype[:8] == "service:"):
|
if filetype is not None and (filetype[:5] == "hook:" or filetype[:8] == "service:"):
|
||||||
serverlist = self.getserverlist()
|
servers = self.getservers()
|
||||||
if message.message_type == MessageTypes.LINK.value:
|
if message.message_type == MessageTypes.LINK.value:
|
||||||
# prepare a serverlist from two node numbers in link message
|
# prepare a server list from two node numbers in link message
|
||||||
(handle_locally, serverlist, message) = self.handlelinkmsg(message)
|
servers, message = self.handlelinkmsg(message)
|
||||||
elif len(serverlist) == 0:
|
elif len(servers) == 0:
|
||||||
# check for servers based on node numbers in all messages but link
|
# check for servers based on node numbers in all messages but link
|
||||||
nn = message.node_numbers()
|
nn = message.node_numbers()
|
||||||
if len(nn) == 0:
|
if len(nn) == 0:
|
||||||
return False
|
return False
|
||||||
serverlist = self.getserversbynode(nn[0])
|
servers = self.getserversbynode(nn[0])
|
||||||
|
|
||||||
if len(serverlist) == 0:
|
# allow other handlers to process this message (this is used
|
||||||
handle_locally = True
|
# by e.g. EMANE to use the link add message to keep counts of
|
||||||
|
# interfaces on other servers)
|
||||||
# allow other handlers to process this message
|
|
||||||
# (this is used by e.g. EMANE to use the link add message to keep counts
|
|
||||||
# of interfaces on other servers)
|
|
||||||
for handler in self.handlers:
|
for handler in self.handlers:
|
||||||
handler(message)
|
handler(message)
|
||||||
|
|
||||||
# Perform any message forwarding
|
# Perform any message forwarding
|
||||||
handle_locally = self.forwardmsg(message, serverlist, handle_locally)
|
handle_locally = self.forwardmsg(message, servers)
|
||||||
return not handle_locally
|
return not handle_locally
|
||||||
|
|
||||||
def setupserver(self, server):
|
def setupserver(self, servername):
|
||||||
"""
|
"""
|
||||||
Send the appropriate API messages for configuring the specified
|
Send the appropriate API messages for configuring the specified
|
||||||
emulation server.
|
emulation server.
|
||||||
"""
|
"""
|
||||||
host, port, sock = self.getserver(server)
|
server = self.getserverbyname(servername)
|
||||||
if host is None or sock is None:
|
if server is None:
|
||||||
|
logger.warn("ignoring unknown server: %s" % servername)
|
||||||
|
return
|
||||||
|
if server.sock is None or server.host is None or server.port is None:
|
||||||
|
logger.info("ignoring disconnected server: %s" % servername)
|
||||||
return
|
return
|
||||||
|
|
||||||
# communicate this session's current state to the server
|
# communicate this session"s current state to the server
|
||||||
tlvdata = coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, self.session.state)
|
tlvdata = coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, self.session.state)
|
||||||
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
|
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
|
||||||
sock.send(msg)
|
server.sock.send(msg)
|
||||||
|
|
||||||
# send a Configuration message for the broker object and inform the
|
# send a Configuration message for the broker object and inform the
|
||||||
# server of its local name
|
# server of its local name
|
||||||
|
@ -636,10 +653,11 @@ class CoreBroker(ConfigurableManager):
|
||||||
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.OBJECT.value, "broker")
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.OBJECT.value, "broker")
|
||||||
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.TYPE.value, ConfigFlags.UPDATE.value)
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.TYPE.value, ConfigFlags.UPDATE.value)
|
||||||
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.DATA_TYPES.value, (ConfigDataTypes.STRING.value,))
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.DATA_TYPES.value, (ConfigDataTypes.STRING.value,))
|
||||||
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.VALUES.value, "%s:%s:%s" % (server, host, port))
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.VALUES.value,
|
||||||
|
"%s:%s:%s" % (server.name, server.host, server.port))
|
||||||
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.SESSION.value, "%s" % self.session.session_id)
|
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.SESSION.value, "%s" % self.session.session_id)
|
||||||
msg = coreapi.CoreConfMessage.pack(0, tlvdata)
|
msg = coreapi.CoreConfMessage.pack(0, tlvdata)
|
||||||
sock.send(msg)
|
server.sock.send(msg)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def fixupremotetty(msghdr, msgdata, host):
|
def fixupremotetty(msghdr, msgdata, host):
|
||||||
|
@ -666,46 +684,47 @@ class CoreBroker(ConfigurableManager):
|
||||||
|
|
||||||
return coreapi.CoreExecMessage.pack(msgflags, tlvdata)
|
return coreapi.CoreExecMessage.pack(msgflags, tlvdata)
|
||||||
|
|
||||||
def handlenodemsg(self, msg):
|
def handlenodemsg(self, message):
|
||||||
"""
|
"""
|
||||||
Determine and return the servers to which this node message should
|
Determine and return the servers to which this node message should
|
||||||
be forwarded. Also keep track of link-layer nodes and the mapping of
|
be forwarded. Also keep track of link-layer nodes and the mapping of
|
||||||
nodes to servers.
|
nodes to servers.
|
||||||
|
|
||||||
|
:param core.api.coreapi.CoreMessage message: message to handle
|
||||||
|
:return:
|
||||||
"""
|
"""
|
||||||
serverlist = []
|
servers = set()
|
||||||
handle_locally = False
|
|
||||||
serverfiletxt = None
|
serverfiletxt = None
|
||||||
# snoop Node Message for emulation server TLV and record mapping
|
# snoop Node Message for emulation server TLV and record mapping
|
||||||
n = msg.tlv_data[NodeTlvs.NUMBER.value]
|
n = message.tlv_data[NodeTlvs.NUMBER.value]
|
||||||
# replicate link-layer nodes on all servers
|
# replicate link-layer nodes on all servers
|
||||||
nodetype = msg.get_tlv(NodeTlvs.TYPE.value)
|
nodetype = message.get_tlv(NodeTlvs.TYPE.value)
|
||||||
if nodetype is not None:
|
if nodetype is not None:
|
||||||
try:
|
try:
|
||||||
nodecls = nodeutils.get_node_class(NodeTypes(nodetype))
|
nodecls = nodeutils.get_node_class(NodeTypes(nodetype))
|
||||||
except KeyError:
|
except KeyError:
|
||||||
logger.exception("broker invalid node type %s", nodetype)
|
logger.warn("broker invalid node type %s" % nodetype)
|
||||||
return False, serverlist
|
return servers
|
||||||
if nodecls is None:
|
if nodecls is None:
|
||||||
logger.warn("broker unimplemented node type %s" % nodetype)
|
logger.warn("broker unimplemented node type %s" % nodetype)
|
||||||
return False, serverlist
|
return servers
|
||||||
if issubclass(nodecls, PyCoreNet) and nodetype != NodeTypes.WIRELESS_LAN.value:
|
if issubclass(nodecls, PyCoreNet) and nodetype != NodeTypes.WIRELESS_LAN.value:
|
||||||
# network node replicated on all servers; could be optimized
|
# network node replicated on all servers; could be optimized
|
||||||
# don't replicate WLANs, because ebtables rules won't work
|
# don"t replicate WLANs, because ebtables rules won"t work
|
||||||
serverlist = self.getserverlist()
|
servers = self.getservers()
|
||||||
handle_locally = True
|
|
||||||
self.addnet(n)
|
self.addnet(n)
|
||||||
for server in serverlist:
|
for server in servers:
|
||||||
self.addnodemap(server, n)
|
self.addnodemap(server, n)
|
||||||
# do not record server name for networks since network
|
# do not record server name for networks since network
|
||||||
# nodes are replicated across all server
|
# nodes are replicated across all server
|
||||||
return handle_locally, serverlist
|
return servers
|
||||||
if issubclass(nodecls, PyCoreNet) and nodetype == NodeTypes.WIRELESS_LAN.value:
|
if issubclass(nodecls, PyCoreNet) and nodetype == NodeTypes.WIRELESS_LAN.value:
|
||||||
# special case where remote WLANs not in session._objs, and no
|
# special case where remote WLANs not in session._objs, and no
|
||||||
# node response message received, so they are counted here
|
# node response message received, so they are counted here
|
||||||
if msg.get_tlv(NodeTlvs.EMULATION_SERVER.value) is not None:
|
if message.get_tlv(NodeTlvs.EMULATION_SERVER.value) is not None:
|
||||||
self.incrbootcount()
|
self.incrbootcount()
|
||||||
elif issubclass(nodecls, PyCoreNode):
|
elif issubclass(nodecls, PyCoreNode):
|
||||||
name = msg.get_tlv(NodeTlvs.NAME.value)
|
name = message.get_tlv(NodeTlvs.NAME.value)
|
||||||
if name:
|
if name:
|
||||||
serverfiletxt = "%s %s %s" % (n, name, nodecls)
|
serverfiletxt = "%s %s %s" % (n, name, nodecls)
|
||||||
if issubclass(nodecls, PhysicalNode):
|
if issubclass(nodecls, PhysicalNode):
|
||||||
|
@ -713,101 +732,109 @@ class CoreBroker(ConfigurableManager):
|
||||||
self.addphys(n)
|
self.addphys(n)
|
||||||
|
|
||||||
# emulation server TLV specifies server
|
# emulation server TLV specifies server
|
||||||
server = msg.get_tlv(NodeTlvs.EMULATION_SERVER.value)
|
servername = message.get_tlv(NodeTlvs.EMULATION_SERVER.value)
|
||||||
|
server = self.getserverbyname(servername)
|
||||||
if server is not None:
|
if server is not None:
|
||||||
self.addnodemap(server, n)
|
self.addnodemap(server, n)
|
||||||
if server not in serverlist:
|
if server not in servers:
|
||||||
serverlist.append(server)
|
servers.add(server)
|
||||||
if serverfiletxt and self.session.master:
|
if serverfiletxt and self.session.master:
|
||||||
self.writenodeserver(serverfiletxt, server)
|
self.writenodeserver(serverfiletxt, server)
|
||||||
# hook to update coordinates of physical nodes
|
# hook to update coordinates of physical nodes
|
||||||
if n in self.physical_nodes:
|
if n in self.physical_nodes:
|
||||||
self.session.mobility.physnodeupdateposition(msg)
|
self.session.mobility.physnodeupdateposition(message)
|
||||||
return handle_locally, serverlist
|
return servers
|
||||||
|
|
||||||
def handlelinkmsg(self, msg):
|
def handlelinkmsg(self, message):
|
||||||
"""
|
"""
|
||||||
Determine and return the servers to which this link message should
|
Determine and return the servers to which this link message should
|
||||||
be forwarded. Also build tunnels between different servers or add
|
be forwarded. Also build tunnels between different servers or add
|
||||||
opaque data to the link message before forwarding.
|
opaque data to the link message before forwarding.
|
||||||
|
|
||||||
|
:param core.api.coreapi.CoreMessage message: message to handle
|
||||||
|
:return:
|
||||||
"""
|
"""
|
||||||
serverlist = []
|
servers = set()
|
||||||
handle_locally = False
|
handle_locally = False
|
||||||
|
|
||||||
# determine link message destination using non-network nodes
|
# determine link message destination using non-network nodes
|
||||||
nn = msg.node_numbers()
|
nn = message.node_numbers()
|
||||||
if nn[0] in self.network_nodes:
|
if nn[0] in self.network_nodes:
|
||||||
if nn[1] in self.network_nodes:
|
if nn[1] in self.network_nodes:
|
||||||
# two network nodes linked together - prevent loops caused by
|
# two network nodes linked together - prevent loops caused by
|
||||||
# the automatic tunnelling
|
# the automatic tunnelling
|
||||||
handle_locally = True
|
handle_locally = True
|
||||||
else:
|
else:
|
||||||
serverlist = self.getserversbynode(nn[1])
|
servers = self.getserversbynode(nn[1])
|
||||||
elif nn[1] in self.network_nodes:
|
elif nn[1] in self.network_nodes:
|
||||||
serverlist = self.getserversbynode(nn[0])
|
servers = self.getserversbynode(nn[0])
|
||||||
else:
|
else:
|
||||||
serverset1 = set(self.getserversbynode(nn[0]))
|
servers1 = self.getserversbynode(nn[0])
|
||||||
serverset2 = set(self.getserversbynode(nn[1]))
|
servers2 = self.getserversbynode(nn[1])
|
||||||
# nodes are on two different servers, build tunnels as needed
|
# nodes are on two different servers, build tunnels as needed
|
||||||
if serverset1 != serverset2:
|
if servers1 != servers2:
|
||||||
localn = None
|
localn = None
|
||||||
if len(serverset1) == 0 or len(serverset2) == 0:
|
if len(servers1) == 0 or len(servers2) == 0:
|
||||||
handle_locally = True
|
handle_locally = True
|
||||||
serverlist = list(serverset1 | serverset2)
|
servers = servers1.union(servers2)
|
||||||
host = None
|
host = None
|
||||||
# get the IP of remote server and decide which node number
|
# get the IP of remote server and decide which node number
|
||||||
# is for a local node
|
# is for a local node
|
||||||
for server in serverlist:
|
for server in servers:
|
||||||
(host, port, sock) = self.getserver(server)
|
host = server.host
|
||||||
if host is None:
|
if host is None:
|
||||||
# named server is local
|
# server is local
|
||||||
handle_locally = True
|
handle_locally = True
|
||||||
if server in serverset1:
|
if server in servers1:
|
||||||
localn = nn[0]
|
localn = nn[0]
|
||||||
else:
|
else:
|
||||||
localn = nn[1]
|
localn = nn[1]
|
||||||
if handle_locally and localn is None:
|
if handle_locally and localn is None:
|
||||||
# having no local node at this point indicates local node is
|
# having no local node at this point indicates local node is
|
||||||
# the one with the empty serverset
|
# the one with the empty server set
|
||||||
if len(serverset1) == 0:
|
if len(servers1) == 0:
|
||||||
localn = nn[0]
|
localn = nn[0]
|
||||||
elif len(serverset2) == 0:
|
elif len(servers2) == 0:
|
||||||
localn = nn[1]
|
localn = nn[1]
|
||||||
if host is None:
|
if host is None:
|
||||||
host = self.getlinkendpoint(msg, localn == nn[0])
|
host = self.getlinkendpoint(message, localn == nn[0])
|
||||||
if localn is None:
|
if localn is None:
|
||||||
msg = self.addlinkendpoints(msg, serverset1, serverset2)
|
message = self.addlinkendpoints(message, servers1, servers2)
|
||||||
elif msg.flags & MessageFlags.ADD.value:
|
elif message.flags & MessageFlags.ADD.value:
|
||||||
self.addtunnel(host, nn[0], nn[1], localn)
|
self.addtunnel(host, nn[0], nn[1], localn)
|
||||||
elif msg.flags & MessageFlags.DELETE.value:
|
elif message.flags & MessageFlags.DELETE.value:
|
||||||
self.deltunnel(nn[0], nn[1])
|
self.deltunnel(nn[0], nn[1])
|
||||||
handle_locally = False
|
|
||||||
else:
|
else:
|
||||||
serverlist = list(serverset1 | serverset2)
|
servers = servers1.union(servers2)
|
||||||
|
|
||||||
return handle_locally, serverlist, msg
|
return servers, message
|
||||||
|
|
||||||
def addlinkendpoints(self, msg, serverset1, serverset2):
|
def addlinkendpoints(self, message, servers1, servers2):
|
||||||
"""
|
"""
|
||||||
For a link message that is not handled locally, inform the remote
|
For a link message that is not handled locally, inform the remote
|
||||||
servers of the IP addresses used as tunnel endpoints by adding
|
servers of the IP addresses used as tunnel endpoints by adding
|
||||||
opaque data to the link message.
|
opaque data to the link message.
|
||||||
|
|
||||||
|
:param core.api.coreapi.CoreMessage message: message to link end points
|
||||||
|
:param servers1:
|
||||||
|
:param servers2:
|
||||||
|
:return:
|
||||||
"""
|
"""
|
||||||
ip1 = ""
|
ip1 = ""
|
||||||
for server in serverset1:
|
for server in servers1:
|
||||||
(host, port, sock) = self.getserver(server)
|
if server.host is not None:
|
||||||
if host is not None:
|
ip1 = server.host
|
||||||
ip1 = host
|
break
|
||||||
ip2 = ""
|
ip2 = ""
|
||||||
for server in serverset2:
|
for server in servers2:
|
||||||
(host, port, sock) = self.getserver(server)
|
if server.host is not None:
|
||||||
if host is not None:
|
ip2 = server.host
|
||||||
ip2 = host
|
break
|
||||||
tlvdata = msg.rawmsg[coreapi.CoreMessage.header_len:]
|
tlvdata = message.raw_message[coreapi.CoreMessage.header_len:]
|
||||||
tlvdata += coreapi.CoreLinkTlv.pack(LinkTlvs.OPAQUE.value, "%s:%s" % (ip1, ip2))
|
tlvdata += coreapi.CoreLinkTlv.pack(LinkTlvs.OPAQUE.value, "%s:%s" % (ip1, ip2))
|
||||||
newraw = coreapi.CoreLinkMessage.pack(msg.flags, tlvdata)
|
newraw = coreapi.CoreLinkMessage.pack(message.flags, tlvdata)
|
||||||
msghdr = newraw[:coreapi.CoreMessage.header_len]
|
msghdr = newraw[:coreapi.CoreMessage.header_len]
|
||||||
return coreapi.CoreLinkMessage(msg.flags, msghdr, tlvdata)
|
return coreapi.CoreLinkMessage(message.flags, msghdr, tlvdata)
|
||||||
|
|
||||||
def getlinkendpoint(self, msg, first_is_local):
|
def getlinkendpoint(self, msg, first_is_local):
|
||||||
"""
|
"""
|
||||||
|
@ -820,18 +847,14 @@ class CoreBroker(ConfigurableManager):
|
||||||
opaque = msg.get_tlv(LinkTlvs.OPAQUE.value)
|
opaque = msg.get_tlv(LinkTlvs.OPAQUE.value)
|
||||||
if opaque is not None:
|
if opaque is not None:
|
||||||
if first_is_local:
|
if first_is_local:
|
||||||
host = opaque.split(':')[1]
|
host = opaque.split(":")[1]
|
||||||
else:
|
else:
|
||||||
host = opaque.split(':')[0]
|
host = opaque.split(":")[0]
|
||||||
if host == "":
|
if host == "":
|
||||||
host = None
|
host = None
|
||||||
if host is None:
|
if host is None and self.session_handler.client_address != "":
|
||||||
# get IP address from API message sender (master)
|
# get IP address from API message sender (master)
|
||||||
self.session._handlerslock.acquire()
|
host = self.session_handler.client_address[0]
|
||||||
for h in self.session._handlers:
|
|
||||||
if h.client_address != "":
|
|
||||||
host = h.client_address[0]
|
|
||||||
self.session._handlerslock.release()
|
|
||||||
return host
|
return host
|
||||||
|
|
||||||
def handlerawmsg(self, msg):
|
def handlerawmsg(self, msg):
|
||||||
|
@ -843,35 +866,27 @@ class CoreBroker(ConfigurableManager):
|
||||||
msgcls = coreapi.CLASS_MAP[msgtype]
|
msgcls = coreapi.CLASS_MAP[msgtype]
|
||||||
return self.handle_message(msgcls(flags, hdr, msg[coreapi.CoreMessage.header_len:]))
|
return self.handle_message(msgcls(flags, hdr, msg[coreapi.CoreMessage.header_len:]))
|
||||||
|
|
||||||
def forwardmsg(self, message, serverlist, handle_locally):
|
def forwardmsg(self, message, servers):
|
||||||
"""
|
"""
|
||||||
Forward API message to all servers in serverlist; if an empty
|
Forward API message to all given servers.
|
||||||
host/port is encountered, set the handle_locally flag. Returns the
|
|
||||||
value of the handle_locally flag, which may be unchanged.
|
|
||||||
|
|
||||||
:param coreapi.CoreMessage message: core message to forward
|
Return True if an empty host/port is encountered, indicating
|
||||||
:param list serverlist: server list to forward to
|
the message should be handled locally.
|
||||||
:param bool handle_locally: used to determine if this should be handled locally
|
|
||||||
:return: should message be handled locally
|
:param core.api.coreapi.CoreMessage message: message to forward
|
||||||
:rtype: bool
|
:param list servers: server to forward message to
|
||||||
|
:return:
|
||||||
"""
|
"""
|
||||||
for server in serverlist:
|
handle_locally = len(servers) == 0
|
||||||
try:
|
for server in servers:
|
||||||
(host, port, sock) = self.getserver(server)
|
if server.host is None and server.port is None:
|
||||||
except KeyError:
|
|
||||||
# server not found, don't handle this message locally
|
|
||||||
logger.exception("broker could not find server %s, message with type %s dropped",
|
|
||||||
server, message.message_type)
|
|
||||||
continue
|
|
||||||
if host is None and port is None:
|
|
||||||
# local emulation server, handle this locally
|
# local emulation server, handle this locally
|
||||||
handle_locally = True
|
handle_locally = True
|
||||||
|
elif server.sock is None:
|
||||||
|
logger.info("server %s @ %s:%s is disconnected" % (
|
||||||
|
server.name, server.host, server.port))
|
||||||
else:
|
else:
|
||||||
if sock is None:
|
server.sock.send(message.raw_message)
|
||||||
logger.info("server %s @ %s:%s is disconnected", server, host, port)
|
|
||||||
else:
|
|
||||||
sock.send(message.raw_message)
|
|
||||||
|
|
||||||
return handle_locally
|
return handle_locally
|
||||||
|
|
||||||
def writeservers(self):
|
def writeservers(self):
|
||||||
|
@ -879,28 +894,24 @@ class CoreBroker(ConfigurableManager):
|
||||||
Write the server list to a text file in the session directory upon
|
Write the server list to a text file in the session directory upon
|
||||||
startup: /tmp/pycore.nnnnn/servers
|
startup: /tmp/pycore.nnnnn/servers
|
||||||
"""
|
"""
|
||||||
|
servers = self.getservers()
|
||||||
filename = os.path.join(self.session.session_dir, "servers")
|
filename = os.path.join(self.session.session_dir, "servers")
|
||||||
|
master = self.session_id_master
|
||||||
|
if master is None:
|
||||||
|
master = self.session.session_id
|
||||||
try:
|
try:
|
||||||
f = open(filename, "w")
|
with open(filename, "w") as f:
|
||||||
master = self.session_id_master
|
f.write("master=%s\n" % master)
|
||||||
if master is None:
|
for server in servers:
|
||||||
master = self.session.session_id
|
if server.name == "localhost":
|
||||||
f.write("master=%s\n" % master)
|
continue
|
||||||
self.servers_lock.acquire()
|
try:
|
||||||
for name in sorted(self.servers.keys()):
|
lhost, lport = server.sock.getsockname()
|
||||||
if name == "localhost":
|
except IOError:
|
||||||
continue
|
lhost, lport = None, None
|
||||||
(host, port, sock) = self.servers[name]
|
f.write("%s %s %s %s %s\n" % (server.name, server.host, server.port, lhost, lport))
|
||||||
try:
|
|
||||||
(lhost, lport) = sock.getsockname()
|
|
||||||
except:
|
|
||||||
lhost, lport = None, None
|
|
||||||
f.write("%s %s %s %s %s\n" % (name, host, port, lhost, lport))
|
|
||||||
f.close()
|
|
||||||
except IOError:
|
except IOError:
|
||||||
logger.exception("Error writing server list to the file: %s", filename)
|
logger.exception("error writing server list to the file: %s" % filename)
|
||||||
finally:
|
|
||||||
self.servers_lock.release()
|
|
||||||
|
|
||||||
def writenodeserver(self, nodestr, server):
|
def writenodeserver(self, nodestr, server):
|
||||||
"""
|
"""
|
||||||
|
@ -909,24 +920,45 @@ class CoreBroker(ConfigurableManager):
|
||||||
other machines, much like local nodes may be accessed via the
|
other machines, much like local nodes may be accessed via the
|
||||||
VnodeClient class.
|
VnodeClient class.
|
||||||
"""
|
"""
|
||||||
(host, port, sock) = self.getserver(server)
|
serverstr = "%s %s %s" % (server.name, server.host, server.port)
|
||||||
serverstr = "%s %s %s" % (server, host, port)
|
|
||||||
name = nodestr.split()[1]
|
name = nodestr.split()[1]
|
||||||
dirname = os.path.join(self.session.session_dir, name + ".conf")
|
dirname = os.path.join(self.session.session_dir, name + ".conf")
|
||||||
filename = os.path.join(dirname, "server")
|
filename = os.path.join(dirname, "server")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
os.makedirs(dirname)
|
os.makedirs(dirname)
|
||||||
except OSError:
|
except OSError:
|
||||||
|
# directory may already exist from previous distributed run
|
||||||
logger.exception("error creating directory: %s", dirname)
|
logger.exception("error creating directory: %s", dirname)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
f = open(filename, "w")
|
with open(filename, "w") as f:
|
||||||
f.write("%s\n%s\n" % (serverstr, nodestr))
|
f.write("%s\n%s\n" % (serverstr, nodestr))
|
||||||
f.close()
|
|
||||||
return True
|
|
||||||
except IOError:
|
except IOError:
|
||||||
msg = "Error writing server file '%s'" % filename
|
logger.exception("error writing server file %s for node %s" % (filename, name))
|
||||||
msg += "for node %s" % name
|
|
||||||
logger.exception(msg)
|
def local_instantiation_complete(self):
|
||||||
return False
|
"""
|
||||||
|
Set the local server"s instantiation-complete status to True.
|
||||||
|
"""
|
||||||
|
# TODO: do we really want to allow a localhost to not exist?
|
||||||
|
with self.servers_lock:
|
||||||
|
server = self.servers.get("localhost")
|
||||||
|
if server is not None:
|
||||||
|
server.instantiation_complete = True
|
||||||
|
|
||||||
|
if self.session.is_connected():
|
||||||
|
tlvdata = ""
|
||||||
|
tlvdata += coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, EventTypes.INSTANTIATION_COMPLETE.value)
|
||||||
|
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
|
||||||
|
self.session_handler.sendall(msg)
|
||||||
|
|
||||||
|
def instantiation_complete(self):
|
||||||
|
"""
|
||||||
|
Return True if all servers have completed instantiation, False
|
||||||
|
otherwise.
|
||||||
|
"""
|
||||||
|
with self.servers_lock:
|
||||||
|
for server in self.servers.itervalues():
|
||||||
|
if not server.instantiation_complete:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
|
@ -88,7 +88,7 @@ class EmaneManager(ConfigurableManager):
|
||||||
self.logversion()
|
self.logversion()
|
||||||
# model for global EMANE configuration options
|
# model for global EMANE configuration options
|
||||||
self.emane_config = EmaneGlobalModel(session, None)
|
self.emane_config = EmaneGlobalModel(session, None)
|
||||||
session.broker.handlers += (self.handledistributed,)
|
session.broker.handlers.add(self.handledistributed)
|
||||||
self.service = None
|
self.service = None
|
||||||
self._modelclsmap = {
|
self._modelclsmap = {
|
||||||
self.emane_config.name: self.emane_config
|
self.emane_config.name: self.emane_config
|
||||||
|
@ -458,22 +458,22 @@ class EmaneManager(ConfigurableManager):
|
||||||
self._objslock.release()
|
self._objslock.release()
|
||||||
|
|
||||||
for server in servers:
|
for server in servers:
|
||||||
if server == "localhost":
|
if server.name == "localhost":
|
||||||
continue
|
continue
|
||||||
(host, port, sock) = self.session.broker.getserver(server)
|
|
||||||
if sock is None:
|
if server.sock is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
platformid += 1
|
platformid += 1
|
||||||
typeflags = ConfigFlags.UPDATE.value
|
typeflags = ConfigFlags.UPDATE.value
|
||||||
values[names.index("platform_id_start")] = str(platformid)
|
values[names.index("platform_id_start")] = str(platformid)
|
||||||
values[names.index("nem_id_start")] = str(nemid)
|
values[names.index("nem_id_start")] = str(nemid)
|
||||||
msg = EmaneGlobalModel.config_data(flags=0, node_id=None, type_flags=typeflags, values=values)
|
msg = EmaneGlobalModel.config_data(flags=0, node_id=None, type_flags=typeflags, values=values)
|
||||||
sock.send(msg)
|
server.sock.send(msg)
|
||||||
# increment nemid for next server by number of interfaces
|
# increment nemid for next server by number of interfaces
|
||||||
self._ifccountslock.acquire()
|
with self._ifccountslock:
|
||||||
if server in self._ifccounts:
|
if server in self._ifccounts:
|
||||||
nemid += self._ifccounts[server]
|
nemid += self._ifccounts[server]
|
||||||
self._ifccountslock.release()
|
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -511,16 +511,19 @@ class EmaneManager(ConfigurableManager):
|
||||||
using the default list of prefixes.
|
using the default list of prefixes.
|
||||||
"""
|
"""
|
||||||
session = self.session
|
session = self.session
|
||||||
|
# slave server
|
||||||
if not session.master:
|
if not session.master:
|
||||||
return # slave server
|
return
|
||||||
servers = session.broker.getserverlist()
|
servers = session.broker.getservernames()
|
||||||
|
# not distributed
|
||||||
if len(servers) < 2:
|
if len(servers) < 2:
|
||||||
return # not distributed
|
return
|
||||||
prefix = session.config.get('controlnet')
|
prefix = session.config.get('controlnet')
|
||||||
prefix = getattr(session.options, 'controlnet', prefix)
|
prefix = getattr(session.options, 'controlnet', prefix)
|
||||||
prefixes = prefix.split()
|
prefixes = prefix.split()
|
||||||
|
# normal Config messaging will distribute controlnets
|
||||||
if len(prefixes) >= len(servers):
|
if len(prefixes) >= len(servers):
|
||||||
return # normal Config messaging will distribute controlnets
|
return
|
||||||
# this generates a config message having controlnet prefix assignments
|
# this generates a config message having controlnet prefix assignments
|
||||||
logger.info("Setting up default controlnet prefixes for distributed (%d configured)" % len(prefixes))
|
logger.info("Setting up default controlnet prefixes for distributed (%d configured)" % len(prefixes))
|
||||||
prefixes = ctrlnet.DEFAULT_PREFIX_LIST[0]
|
prefixes = ctrlnet.DEFAULT_PREFIX_LIST[0]
|
||||||
|
|
|
@ -277,6 +277,7 @@ class EventTypes(Enum):
|
||||||
FILE_SAVE = 12
|
FILE_SAVE = 12
|
||||||
SCHEDULED = 13
|
SCHEDULED = 13
|
||||||
RECONFIGURE = 14
|
RECONFIGURE = 14
|
||||||
|
INSTANTIATION_COMPLETE = 15
|
||||||
|
|
||||||
|
|
||||||
# Session Message TLV Types
|
# Session Message TLV Types
|
||||||
|
|
|
@ -53,7 +53,7 @@ class MobilityManager(ConfigurableManager):
|
||||||
# dummy node objects for tracking position of nodes on other servers
|
# dummy node objects for tracking position of nodes on other servers
|
||||||
self.phys = {}
|
self.phys = {}
|
||||||
self.physnets = {}
|
self.physnets = {}
|
||||||
self.session.broker.handlers += (self.physnodehandlelink,)
|
self.session.broker.handlers.add(self.physnodehandlelink)
|
||||||
|
|
||||||
def startup(self, node_ids=None):
|
def startup(self, node_ids=None):
|
||||||
"""
|
"""
|
||||||
|
@ -254,12 +254,13 @@ class MobilityManager(ConfigurableManager):
|
||||||
|
|
||||||
for nodenum in nodenums:
|
for nodenum in nodenums:
|
||||||
node = self.phys[nodenum]
|
node = self.phys[nodenum]
|
||||||
servers = self.session.broker.getserversbynode(nodenum)
|
# TODO: fix this bad logic, relating to depending on a break to get a valid server
|
||||||
(host, port, sock) = self.session.broker.getserver(servers[0])
|
for server in self.session.broker.getserversbynode(nodenum):
|
||||||
netif = self.session.broker.gettunnel(net.objid, IpAddress.to_int(host))
|
break
|
||||||
|
netif = self.session.broker.gettunnel(net.objid, IpAddress.to_int(server.host))
|
||||||
node.addnetif(netif, 0)
|
node.addnetif(netif, 0)
|
||||||
netif.node = node
|
netif.node = node
|
||||||
(x, y, z) = netif.node.position.get()
|
x, y, z = netif.node.position.get()
|
||||||
netif.poshook(netif, x, y, z)
|
netif.poshook(netif, x, y, z)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -66,7 +66,7 @@ class Sdt(object):
|
||||||
# node information for remote nodes not in session._objs
|
# node information for remote nodes not in session._objs
|
||||||
# local nodes also appear here since their obj may not exist yet
|
# local nodes also appear here since their obj may not exist yet
|
||||||
self.remotes = {}
|
self.remotes = {}
|
||||||
session.broker.handlers += (self.handledistributed,)
|
session.broker.handlers.add(self.handledistributed)
|
||||||
|
|
||||||
def is_enabled(self):
|
def is_enabled(self):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -225,6 +225,7 @@ class Session(object):
|
||||||
self.xen = XenConfigManager(session=self)
|
self.xen = XenConfigManager(session=self)
|
||||||
self.add_config_object(XenConfigManager.name, XenConfigManager.config_type, self.xen.configure)
|
self.add_config_object(XenConfigManager.name, XenConfigManager.config_type, self.xen.configure)
|
||||||
|
|
||||||
|
# setup sdt
|
||||||
self.sdt = Sdt(session=self)
|
self.sdt = Sdt(session=self)
|
||||||
|
|
||||||
# future parameters set by the GUI may go here
|
# future parameters set by the GUI may go here
|
||||||
|
@ -782,8 +783,9 @@ class Session(object):
|
||||||
# controlnet may be needed by some EMANE models
|
# controlnet may be needed by some EMANE models
|
||||||
self.add_remove_control_interface(node=None, remove=False)
|
self.add_remove_control_interface(node=None, remove=False)
|
||||||
|
|
||||||
|
# instantiate will be invoked again upon Emane configure
|
||||||
if self.emane.startup() == self.emane.NOT_READY:
|
if self.emane.startup() == self.emane.NOT_READY:
|
||||||
return # instantiate() will be invoked again upon Emane.configure()
|
return
|
||||||
|
|
||||||
# startup broker
|
# startup broker
|
||||||
self.broker.startup()
|
self.broker.startup()
|
||||||
|
@ -800,6 +802,9 @@ class Session(object):
|
||||||
# validate nodes
|
# validate nodes
|
||||||
self.validate_nodes()
|
self.validate_nodes()
|
||||||
|
|
||||||
|
# set broker local instantiation to complete
|
||||||
|
self.broker.local_instantiation_complete()
|
||||||
|
|
||||||
# assume either all nodes have booted already, or there are some
|
# assume either all nodes have booted already, or there are some
|
||||||
# nodes on slave servers that will be booted and those servers will
|
# nodes on slave servers that will be booted and those servers will
|
||||||
# send a node status response message
|
# send a node status response message
|
||||||
|
@ -851,7 +856,10 @@ class Session(object):
|
||||||
# return # do not have information on all nodes yet
|
# return # do not have information on all nodes yet
|
||||||
|
|
||||||
# information on all nodes has been received and they have been started enter the runtime state
|
# information on all nodes has been received and they have been started enter the runtime state
|
||||||
# TODO: more sophisticated checks to verify that all nodes and networks are running
|
|
||||||
|
# check to verify that all nodes and networks are running
|
||||||
|
if not self.broker.instantiation_complete():
|
||||||
|
return
|
||||||
|
|
||||||
# start event loop and set to runtime
|
# start event loop and set to runtime
|
||||||
self.event_loop.run()
|
self.event_loop.run()
|
||||||
|
@ -1029,7 +1037,7 @@ class Session(object):
|
||||||
prefix = prefixes[0]
|
prefix = prefixes[0]
|
||||||
else:
|
else:
|
||||||
# slave servers have their name and localhost in the serverlist
|
# slave servers have their name and localhost in the serverlist
|
||||||
servers = self.broker.getserverlist()
|
servers = self.broker.getservernames()
|
||||||
servers.remove("localhost")
|
servers.remove("localhost")
|
||||||
prefix = None
|
prefix = None
|
||||||
|
|
||||||
|
@ -1066,7 +1074,7 @@ class Session(object):
|
||||||
|
|
||||||
# tunnels between controlnets will be built with Broker.addnettunnels()
|
# tunnels between controlnets will be built with Broker.addnettunnels()
|
||||||
self.broker.addnet(object_id)
|
self.broker.addnet(object_id)
|
||||||
for server in self.broker.getserverlist():
|
for server in self.broker.getservers():
|
||||||
self.broker.addnodemap(server, object_id)
|
self.broker.addnodemap(server, object_id)
|
||||||
|
|
||||||
return control_net
|
return control_net
|
||||||
|
@ -1299,7 +1307,7 @@ class SessionConfig(ConfigurableManager, Configurable):
|
||||||
"""
|
"""
|
||||||
ConfigurableManager.__init__(self)
|
ConfigurableManager.__init__(self)
|
||||||
self.session = session
|
self.session = session
|
||||||
self.session.broker.handlers += (self.handle_distributed,)
|
self.session.broker.handlers.add(self.handle_distributed)
|
||||||
self.reset()
|
self.reset()
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
|
@ -1376,7 +1384,7 @@ class SessionConfig(ConfigurableManager, Configurable):
|
||||||
logger.warn("multiple controlnet prefixes do not exist")
|
logger.warn("multiple controlnet prefixes do not exist")
|
||||||
return
|
return
|
||||||
|
|
||||||
servers = self.session.broker.getserverlist()
|
servers = self.session.broker.getservernames()
|
||||||
if len(servers) < 2:
|
if len(servers) < 2:
|
||||||
logger.warn("not distributed")
|
logger.warn("not distributed")
|
||||||
return
|
return
|
||||||
|
|
|
@ -94,7 +94,7 @@ class CoreDeploymentWriter(object):
|
||||||
for n in nodelist:
|
for n in nodelist:
|
||||||
self.add_virtual_host(testhost, n)
|
self.add_virtual_host(testhost, n)
|
||||||
# TODO: handle other servers
|
# TODO: handle other servers
|
||||||
# servers = self.session.broker.getserverlist()
|
# servers = self.session.broker.getservernames()
|
||||||
# servers.remove('localhost')
|
# servers.remove('localhost')
|
||||||
|
|
||||||
def add_child_element(self, parent, tag_name):
|
def add_child_element(self, parent, tag_name):
|
||||||
|
|
|
@ -54,15 +54,15 @@ def cmd(node, exec_cmd):
|
||||||
exec_num += 1
|
exec_num += 1
|
||||||
|
|
||||||
# Now wait for the response
|
# Now wait for the response
|
||||||
h, p, sock = node.session.broker.servers["localhost"]
|
server = node.session.broker.servers["localhost"]
|
||||||
sock.settimeout(50.0)
|
server.sock.settimeout(50.0)
|
||||||
|
|
||||||
# receive messages until we get our execute response
|
# receive messages until we get our execute response
|
||||||
result = None
|
result = None
|
||||||
while True:
|
while True:
|
||||||
msghdr = sock.recv(coreapi.CoreMessage.header_len)
|
msghdr = server.sock.recv(coreapi.CoreMessage.header_len)
|
||||||
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
|
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
|
||||||
msgdata = sock.recv(msglen)
|
msgdata = server.sock.recv(msglen)
|
||||||
|
|
||||||
# If we get the right response return the results
|
# If we get the right response return the results
|
||||||
print "received response message: %s" % MessageTypes(msgtype)
|
print "received response message: %s" % MessageTypes(msgtype)
|
||||||
|
|
Loading…
Reference in a new issue