Reverted changes due to merge with github commits made on Oct 12. An issue with distributed deployment seem to have been introduced in those commits.
This commit is contained in:
parent
35356dc9c8
commit
5d0aa4ac1a
9 changed files with 345 additions and 348 deletions
|
@ -286,7 +286,6 @@ event_types = dict(enumerate([
|
|||
"CORE_EVENT_FILE_SAVE",
|
||||
"CORE_EVENT_SCHEDULED",
|
||||
"CORE_EVENT_RECONFIGURE",
|
||||
"CORE_EVENT_INSTANTIATION_COMPLETE",
|
||||
]))
|
||||
|
||||
enumdict(event_types)
|
||||
|
|
|
@ -25,29 +25,6 @@ if os.uname()[0] == "Linux":
|
|||
from core.netns.vif import GreTap
|
||||
from core.netns.vnet import GreTapBridge
|
||||
|
||||
class CoreServer(object):
|
||||
def __init__(self, name, host, port):
|
||||
self.name = name
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.sock = None
|
||||
self.instantiation_complete = False
|
||||
|
||||
def connect(self):
|
||||
assert self.sock is None
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
#sock.setblocking(0)
|
||||
try:
|
||||
sock.connect((self.host, self.port))
|
||||
except:
|
||||
sock.close()
|
||||
raise
|
||||
self.sock = sock
|
||||
|
||||
def close(self):
|
||||
if self.sock is not None:
|
||||
self.sock.close()
|
||||
self.sock = None
|
||||
|
||||
class CoreBroker(ConfigurableManager):
|
||||
''' Member of pycore session class for handling global emulation server
|
||||
|
@ -71,12 +48,13 @@ class CoreBroker(ConfigurableManager):
|
|||
self.nodemap_lock = threading.Lock()
|
||||
# reference counts of nodes on servers
|
||||
self.nodecounts = { }
|
||||
# set of node numbers that are link-layer nodes (networks)
|
||||
self.nets = set()
|
||||
# set of node numbers that are PhysicalNode nodes
|
||||
self.phys = set()
|
||||
self.bootcount = 0
|
||||
# list of node numbers that are link-layer nodes (networks)
|
||||
self.nets = []
|
||||
# list of node numbers that are PhysicalNode nodes
|
||||
self.phys = []
|
||||
# allows for other message handlers to process API messages (e.g. EMANE)
|
||||
self.handlers = set()
|
||||
self.handlers = ()
|
||||
# dict with tunnel key to tunnel device mapping
|
||||
self.tunnels = {}
|
||||
self.dorecvloop = False
|
||||
|
@ -96,12 +74,14 @@ class CoreBroker(ConfigurableManager):
|
|||
'''
|
||||
with self.servers_lock:
|
||||
while len(self.servers) > 0:
|
||||
name, server = self.servers.popitem()
|
||||
if server.sock is not None:
|
||||
(server, v) = self.servers.popitem()
|
||||
(host, port, sock) = v
|
||||
if sock is None:
|
||||
continue
|
||||
if self.verbose:
|
||||
self.session.info("closing connection with %s @ %s:%s" % \
|
||||
(name, server.host, server.port))
|
||||
server.close()
|
||||
(server, host, port))
|
||||
sock.close()
|
||||
self.reset()
|
||||
self.dorecvloop = False
|
||||
if self.recvthread is not None:
|
||||
|
@ -112,13 +92,14 @@ class CoreBroker(ConfigurableManager):
|
|||
'''
|
||||
self.nodemap_lock.acquire()
|
||||
self.nodemap.clear()
|
||||
for server, count in self.nodecounts.iteritems():
|
||||
if count < 1:
|
||||
for server in self.nodecounts:
|
||||
if self.nodecounts[server] < 1:
|
||||
self.delserver(server)
|
||||
self.nodecounts.clear()
|
||||
self.bootcount = 0
|
||||
self.nodemap_lock.release()
|
||||
self.nets.clear()
|
||||
self.phys.clear()
|
||||
del self.nets[:]
|
||||
del self.phys[:]
|
||||
while len(self.tunnels) > 0:
|
||||
(key, gt) = self.tunnels.popitem()
|
||||
gt.shutdown()
|
||||
|
@ -147,32 +128,35 @@ class CoreBroker(ConfigurableManager):
|
|||
rlist = []
|
||||
with self.servers_lock:
|
||||
# build a socket list for select call
|
||||
for server in self.servers.itervalues():
|
||||
if server.sock is not None:
|
||||
rlist.append(server.sock)
|
||||
for name in self.servers:
|
||||
(h, p, sock) = self.servers[name]
|
||||
if sock is not None:
|
||||
rlist.append(sock.fileno())
|
||||
r, w, x = select.select(rlist, [], [], 1.0)
|
||||
for sock in r:
|
||||
server = self.getserverbysock(sock)
|
||||
if server is None:
|
||||
for sockfd in r:
|
||||
try:
|
||||
(h, p, sock, name) = self.getserverbysock(sockfd)
|
||||
except KeyError:
|
||||
# servers may have changed; loop again
|
||||
continue
|
||||
rcvlen = self.recv(server)
|
||||
break
|
||||
rcvlen = self.recv(sock, h)
|
||||
if rcvlen == 0:
|
||||
if self.verbose:
|
||||
msg = 'connection with %s @ %s:%s has closed' % \
|
||||
(server.name, server.host, server.port)
|
||||
self.session.info(msg)
|
||||
self.session.info("connection with %s @ %s:%s" \
|
||||
" has closed" % (name, h, p))
|
||||
self.servers[name] = (h, p, None)
|
||||
|
||||
def recv(self, server):
|
||||
|
||||
def recv(self, sock, host):
|
||||
''' Receive data on an emulation server socket and broadcast it to
|
||||
all connected session handlers. Returns the length of data recevied
|
||||
and forwarded. Return value of zero indicates the socket has closed
|
||||
and should be removed from the self.servers dict.
|
||||
'''
|
||||
msghdr = server.sock.recv(coreapi.CoreMessage.hdrsiz)
|
||||
msghdr = sock.recv(coreapi.CoreMessage.hdrsiz)
|
||||
if len(msghdr) == 0:
|
||||
# server disconnected
|
||||
server.close()
|
||||
sock.close()
|
||||
return 0
|
||||
if len(msghdr) != coreapi.CoreMessage.hdrsiz:
|
||||
if self.verbose:
|
||||
|
@ -181,31 +165,30 @@ class CoreBroker(ConfigurableManager):
|
|||
return len(msghdr)
|
||||
|
||||
msgtype, msgflags, msglen = coreapi.CoreMessage.unpackhdr(msghdr)
|
||||
msgdata = server.sock.recv(msglen)
|
||||
msgdata = sock.recv(msglen)
|
||||
data = msghdr + msgdata
|
||||
count = None
|
||||
# snoop exec response for remote interactive TTYs
|
||||
if msgtype == coreapi.CORE_API_EXEC_MSG and \
|
||||
msgflags & coreapi.CORE_API_TTY_FLAG:
|
||||
data = self.fixupremotetty(msghdr, msgdata, server.host)
|
||||
data = self.fixupremotetty(msghdr, msgdata, host)
|
||||
elif msgtype == coreapi.CORE_API_NODE_MSG:
|
||||
# snoop node delete response to decrement node counts
|
||||
if msgflags & coreapi.CORE_API_DEL_FLAG:
|
||||
msg = coreapi.CoreNodeMessage(msgflags, msghdr, msgdata)
|
||||
nodenum = msg.gettlv(coreapi.CORE_TLV_NODE_NUMBER)
|
||||
if nodenum is not None:
|
||||
count = self.delnodemap(server, nodenum)
|
||||
count = self.delnodemap(sock, nodenum)
|
||||
# snoop node add response to increment booted node count
|
||||
# (only CoreNodes send these response messages)
|
||||
elif msgflags & \
|
||||
(coreapi.CORE_API_ADD_FLAG | coreapi.CORE_API_LOC_FLAG):
|
||||
self.incrbootcount()
|
||||
self.session.checkruntime()
|
||||
elif msgtype == coreapi.CORE_API_LINK_MSG:
|
||||
# this allows green link lines for remote WLANs
|
||||
msg = coreapi.CoreLinkMessage(msgflags, msghdr, msgdata)
|
||||
self.session.sdt.handledistributed(msg)
|
||||
elif msgtype == coreapi.CORE_API_EVENT_MSG:
|
||||
msg = coreapi.CoreEventMessage(msgflags, msghdr, msgdata)
|
||||
eventtype = msg.gettlv(coreapi.CORE_TLV_EVENT_TYPE)
|
||||
if eventtype == coreapi.CORE_EVENT_INSTANTIATION_COMPLETE:
|
||||
server.instantiation_complete = True
|
||||
if self.instantiation_complete():
|
||||
self.session.checkruntime()
|
||||
|
||||
self.session.broadcastraw(None, data)
|
||||
if count is not None and count < 1:
|
||||
|
@ -213,98 +196,86 @@ class CoreBroker(ConfigurableManager):
|
|||
else:
|
||||
return len(data)
|
||||
|
||||
def local_instantiation_complete(self):
|
||||
'''\
|
||||
Set the local server's instantiation-complete status to True.
|
||||
'''
|
||||
with self.servers_lock:
|
||||
server = self.servers.get('localhost')
|
||||
if server is not None:
|
||||
server.instantiation_complete = True
|
||||
|
||||
def instantiation_complete(self):
|
||||
'''\
|
||||
Return True if all servers have completed instantiation, False
|
||||
otherwise.
|
||||
'''
|
||||
with self.servers_lock:
|
||||
for server in self.servers.itervalues():
|
||||
if not server.instantiation_complete:
|
||||
return False
|
||||
return True
|
||||
|
||||
def addserver(self, name, host, port):
|
||||
''' Add a new server, and try to connect to it. If we're already
|
||||
connected to this (host, port), then leave it alone. When host,port
|
||||
is None, do not try to connect.
|
||||
'''
|
||||
with self.servers_lock:
|
||||
server = self.servers.get(name)
|
||||
if server is not None:
|
||||
if host == server.host and port == server.port and \
|
||||
server.sock is not None:
|
||||
self.servers_lock.acquire()
|
||||
if name in self.servers:
|
||||
(oldhost, oldport, sock) = self.servers[name]
|
||||
if host == oldhost or port == oldport:
|
||||
# leave this socket connected
|
||||
if sock is not None:
|
||||
self.servers_lock.release()
|
||||
return
|
||||
if self.verbose:
|
||||
self.session.info('closing connection with %s @ %s:%s' % \
|
||||
(name, server.host, server.port))
|
||||
server.close()
|
||||
del self.servers[name]
|
||||
if self.verbose:
|
||||
self.session.info('adding server %s @ %s:%s' % \
|
||||
if self.verbose and host is not None and sock is not None:
|
||||
self.session.info("closing connection with %s @ %s:%s" % \
|
||||
(name, host, port))
|
||||
server = CoreServer(name, host, port)
|
||||
if host is not None and port is not None:
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
self.servers_lock.release()
|
||||
if self.verbose and host is not None:
|
||||
self.session.info("adding server %s @ %s:%s" % (name, host, port))
|
||||
if host is None:
|
||||
sock = None
|
||||
else:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
#sock.setblocking(0)
|
||||
#error = sock.connect_ex((host, port))
|
||||
try:
|
||||
server.connect()
|
||||
except Exception as e:
|
||||
self.session.warn('error connecting to server %s:%s:\n\t%s' % \
|
||||
(host, port, e))
|
||||
if server.sock is not None:
|
||||
sock.connect((host, port))
|
||||
self.startrecvloop()
|
||||
self.servers[name] = server
|
||||
except Exception, e:
|
||||
self.session.warn("error connecting to server %s:%s:\n\t%s" % \
|
||||
(host, port, e))
|
||||
sock.close()
|
||||
sock = None
|
||||
self.servers_lock.acquire()
|
||||
self.servers[name] = (host, port, sock)
|
||||
self.servers_lock.release()
|
||||
|
||||
def delserver(self, server):
|
||||
def delserver(self, name):
|
||||
''' Remove a server and hang up any connection.
|
||||
'''
|
||||
with self.servers_lock:
|
||||
try:
|
||||
s = self.servers.pop(server.name)
|
||||
assert s == server
|
||||
except KeyError:
|
||||
pass
|
||||
if server.sock is not None:
|
||||
self.servers_lock.acquire()
|
||||
if name not in self.servers:
|
||||
self.servers_lock.release()
|
||||
return
|
||||
(host, port, sock) = self.servers.pop(name)
|
||||
if sock is not None:
|
||||
if self.verbose:
|
||||
self.session.info("closing connection with %s @ %s:%s" % \
|
||||
(server.name, server.host, server.port))
|
||||
server.close()
|
||||
(name, host, port))
|
||||
sock.close()
|
||||
self.servers_lock.release()
|
||||
|
||||
def getserverbyname(self, name):
|
||||
''' Return the server object having the given name, or None.
|
||||
def getserver(self, name):
|
||||
''' Return the (host, port, sock) tuple, or raise a KeyError exception.
|
||||
'''
|
||||
if name not in self.servers:
|
||||
raise KeyError, "emulation server %s not found" % name
|
||||
return self.servers[name]
|
||||
|
||||
def getserverbysock(self, sockfd):
|
||||
''' Return a (host, port, sock, name) tuple based on socket file
|
||||
descriptor, or raise a KeyError exception.
|
||||
'''
|
||||
with self.servers_lock:
|
||||
return self.servers.get(name)
|
||||
for name in self.servers:
|
||||
(host, port, sock) = self.servers[name]
|
||||
if sock is None:
|
||||
continue
|
||||
if sock.fileno() == sockfd:
|
||||
return (host, port, sock, name)
|
||||
raise KeyError, "socket fd %s not found" % sockfd
|
||||
|
||||
def getserverbysock(self, sock):
|
||||
''' Return the server object corresponding to the given socket,
|
||||
or None.
|
||||
def getserverlist(self):
|
||||
''' Return the list of server names (keys from self.servers).
|
||||
'''
|
||||
with self.servers_lock:
|
||||
for server in self.servers.itervalues():
|
||||
if server.sock == sock:
|
||||
return server
|
||||
return None
|
||||
|
||||
def getservers(self):
|
||||
'''Return a list of servers sorted by name.'''
|
||||
with self.servers_lock:
|
||||
return sorted(self.servers.values(), key = lambda x: x.name)
|
||||
|
||||
def getservernames(self):
|
||||
''' Return a sorted list of server names (keys from self.servers).
|
||||
'''
|
||||
with self.servers_lock:
|
||||
return sorted(self.servers.keys())
|
||||
serverlist = sorted(self.servers.keys())
|
||||
return serverlist
|
||||
|
||||
def tunnelkey(self, n1num, n2num):
|
||||
''' Compute a 32-bit key used to uniquely identify a GRE tunnel.
|
||||
|
@ -370,9 +341,10 @@ class CoreBroker(ConfigurableManager):
|
|||
return None
|
||||
hosts = []
|
||||
for server in servers:
|
||||
if server.host is None:
|
||||
(host, port, sock) = self.getserver(server)
|
||||
if host is None:
|
||||
continue
|
||||
hosts.append(server.host)
|
||||
hosts.append(host)
|
||||
if len(hosts) == 0:
|
||||
# get IP address from API message sender (master)
|
||||
self.session._handlerslock.acquire()
|
||||
|
@ -427,50 +399,78 @@ class CoreBroker(ConfigurableManager):
|
|||
def addnodemap(self, server, nodenum):
|
||||
''' Record a node number to emulation server mapping.
|
||||
'''
|
||||
with self.nodemap_lock:
|
||||
self.nodemap_lock.acquire()
|
||||
if nodenum in self.nodemap:
|
||||
if server in self.nodemap[nodenum]:
|
||||
self.nodemap_lock.release()
|
||||
return
|
||||
self.nodemap[nodenum].add(server)
|
||||
self.nodemap[nodenum].append(server)
|
||||
else:
|
||||
self.nodemap[nodenum] = {server}
|
||||
self.nodemap[nodenum] = [server,]
|
||||
if server in self.nodecounts:
|
||||
self.nodecounts[server] += 1
|
||||
else:
|
||||
self.nodecounts[server] = 1
|
||||
self.nodemap_lock.release()
|
||||
|
||||
def delnodemap(self, server, nodenum):
|
||||
def delnodemap(self, sock, nodenum):
|
||||
''' Remove a node number to emulation server mapping.
|
||||
Return the number of nodes left on this server.
|
||||
'''
|
||||
self.nodemap_lock.acquire()
|
||||
count = None
|
||||
with self.nodemap_lock:
|
||||
if nodenum not in self.nodemap:
|
||||
self.nodemap_lock.release()
|
||||
return count
|
||||
self.nodemap[nodenum].remove(server)
|
||||
found = False
|
||||
for server in self.nodemap[nodenum]:
|
||||
(host, port, srvsock) = self.getserver(server)
|
||||
if srvsock == sock:
|
||||
found = True
|
||||
break
|
||||
if server in self.nodecounts:
|
||||
count = self.nodecounts[server]
|
||||
if found:
|
||||
self.nodemap[nodenum].remove(server)
|
||||
if server in self.nodecounts:
|
||||
count -= 1
|
||||
self.nodecounts[server] = count
|
||||
self.nodemap_lock.release()
|
||||
return count
|
||||
|
||||
def getserversbynode(self, nodenum):
|
||||
''' Retrieve a set of emulation servers given a node number.
|
||||
def incrbootcount(self):
|
||||
''' Count a node that has booted.
|
||||
'''
|
||||
with self.nodemap_lock:
|
||||
self.bootcount += 1
|
||||
return self.bootcount
|
||||
|
||||
def getbootcount(self):
|
||||
''' Return the number of booted nodes.
|
||||
'''
|
||||
return self.bootcount
|
||||
|
||||
def getserversbynode(self, nodenum):
|
||||
''' Retrieve a list of emulation servers given a node number.
|
||||
'''
|
||||
self.nodemap_lock.acquire()
|
||||
if nodenum not in self.nodemap:
|
||||
return set()
|
||||
return self.nodemap[nodenum]
|
||||
self.nodemap_lock.release()
|
||||
return []
|
||||
r = self.nodemap[nodenum]
|
||||
self.nodemap_lock.release()
|
||||
return r
|
||||
|
||||
def addnet(self, nodenum):
|
||||
''' Add a node number to the list of link-layer nodes.
|
||||
'''
|
||||
self.nets.add(nodenum)
|
||||
if nodenum not in self.nets:
|
||||
self.nets.append(nodenum)
|
||||
|
||||
def addphys(self, nodenum):
|
||||
''' Add a node number to the list of physical nodes.
|
||||
'''
|
||||
self.phys.add(nodenum)
|
||||
if nodenum not in self.phys:
|
||||
self.phys.append(nodenum)
|
||||
|
||||
def configure_reset(self, msg):
|
||||
''' Ignore reset messages, because node delete responses may still
|
||||
|
@ -521,69 +521,67 @@ class CoreBroker(ConfigurableManager):
|
|||
and performs forwarding if required.
|
||||
Returning False indicates this message should be handled locally.
|
||||
'''
|
||||
servers = set()
|
||||
serverlist = []
|
||||
handle_locally = False
|
||||
# Do not forward messages when in definition state
|
||||
# (for e.g. configuring services)
|
||||
if self.session.getstate() == coreapi.CORE_EVENT_DEFINITION_STATE:
|
||||
return False
|
||||
handle_locally = True
|
||||
return not handle_locally
|
||||
# Decide whether message should be handled locally or forwarded, or both
|
||||
if msg.msgtype == coreapi.CORE_API_NODE_MSG:
|
||||
servers = self.handlenodemsg(msg)
|
||||
(handle_locally, serverlist) = self.handlenodemsg(msg)
|
||||
elif msg.msgtype == coreapi.CORE_API_EVENT_MSG:
|
||||
# broadcast events everywhere
|
||||
servers = self.getservers()
|
||||
serverlist = self.getserverlist()
|
||||
elif msg.msgtype == coreapi.CORE_API_CONF_MSG:
|
||||
# broadcast location and services configuration everywhere
|
||||
confobj = msg.gettlv(coreapi.CORE_TLV_CONF_OBJ)
|
||||
if confobj == "location" or confobj == "services" or \
|
||||
confobj == "session" or confobj == "all":
|
||||
servers = self.getservers()
|
||||
serverlist = self.getserverlist()
|
||||
elif msg.msgtype == coreapi.CORE_API_FILE_MSG:
|
||||
# broadcast hook scripts and custom service files everywhere
|
||||
filetype = msg.gettlv(coreapi.CORE_TLV_FILE_TYPE)
|
||||
if filetype is not None and \
|
||||
(filetype[:5] == "hook:" or filetype[:8] == "service:"):
|
||||
servers = self.getservers()
|
||||
serverlist = self.getserverlist()
|
||||
|
||||
if msg.msgtype == coreapi.CORE_API_LINK_MSG:
|
||||
# prepare a serverlist from two node numbers in link message
|
||||
servers, msg = self.handlelinkmsg(msg)
|
||||
elif len(servers) == 0:
|
||||
(handle_locally, serverlist, msg) = self.handlelinkmsg(msg)
|
||||
elif len(serverlist) == 0:
|
||||
# check for servers based on node numbers in all messages but link
|
||||
nn = msg.nodenumbers()
|
||||
if len(nn) == 0:
|
||||
return False
|
||||
servers = self.getserversbynode(nn[0])
|
||||
serverlist = self.getserversbynode(nn[0])
|
||||
|
||||
# allow other handlers to process this message (this is used
|
||||
# by e.g. EMANE to use the link add message to keep counts of
|
||||
# interfaces on other servers)
|
||||
if len(serverlist) == 0:
|
||||
handle_locally = True
|
||||
|
||||
# allow other handlers to process this message
|
||||
# (this is used by e.g. EMANE to use the link add message to keep counts
|
||||
# of interfaces on other servers)
|
||||
for handler in self.handlers:
|
||||
handler(msg)
|
||||
|
||||
# Perform any message forwarding
|
||||
handle_locally = self.forwardmsg(msg, servers)
|
||||
handle_locally = self.forwardmsg(msg, serverlist, handle_locally)
|
||||
return not handle_locally
|
||||
|
||||
def setupserver(self, servername):
|
||||
def setupserver(self, server):
|
||||
''' Send the appropriate API messages for configuring the specified
|
||||
emulation server.
|
||||
'''
|
||||
server = self.getserverbyname(servername)
|
||||
if server is None:
|
||||
msg = 'ignoring unknown server: \'%s\'' % servername
|
||||
self.session.warn(msg)
|
||||
return
|
||||
if server.sock is None or server.host is None or server.port is None:
|
||||
if self.verbose:
|
||||
msg = 'ignoring disconnected server: \'%s\'' % servername
|
||||
self.session.info(msg)
|
||||
(host, port, sock) = self.getserver(server)
|
||||
if host is None or sock is None:
|
||||
return
|
||||
# communicate this session's current state to the server
|
||||
tlvdata = coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE,
|
||||
self.session.getstate())
|
||||
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
|
||||
server.sock.send(msg)
|
||||
sock.send(msg)
|
||||
# send a Configuration message for the broker object and inform the
|
||||
# server of its local name
|
||||
tlvdata = ""
|
||||
|
@ -593,11 +591,11 @@ class CoreBroker(ConfigurableManager):
|
|||
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_DATA_TYPES,
|
||||
(coreapi.CONF_DATA_TYPE_STRING,))
|
||||
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_VALUES,
|
||||
"%s:%s:%s" % (server.name, server.host, server.port))
|
||||
"%s:%s:%s" % (server, host, port))
|
||||
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_SESSION,
|
||||
"%s" % self.session.sessionid)
|
||||
msg = coreapi.CoreConfMessage.pack(0, tlvdata)
|
||||
server.sock.send(msg)
|
||||
sock.send(msg)
|
||||
|
||||
@staticmethod
|
||||
def fixupremotetty(msghdr, msgdata, host):
|
||||
|
@ -628,7 +626,8 @@ class CoreBroker(ConfigurableManager):
|
|||
be forwarded. Also keep track of link-layer nodes and the mapping of
|
||||
nodes to servers.
|
||||
'''
|
||||
servers = set()
|
||||
serverlist = []
|
||||
handle_locally = False
|
||||
serverfiletxt = None
|
||||
# snoop Node Message for emulation server TLV and record mapping
|
||||
n = msg.tlvdata[coreapi.CORE_TLV_NODE_NUMBER]
|
||||
|
@ -639,21 +638,28 @@ class CoreBroker(ConfigurableManager):
|
|||
nodecls = coreapi.node_class(nodetype)
|
||||
except KeyError:
|
||||
self.session.warn("broker invalid node type %s" % nodetype)
|
||||
return servers
|
||||
return (False, serverlist)
|
||||
if nodecls is None:
|
||||
self.session.warn("broker unimplemented node type %s" % nodetype)
|
||||
return servers
|
||||
return (False, serverlist)
|
||||
if issubclass(nodecls, PyCoreNet) and \
|
||||
nodetype != coreapi.CORE_NODE_WLAN:
|
||||
# network node replicated on all servers; could be optimized
|
||||
# don't replicate WLANs, because ebtables rules won't work
|
||||
servers = self.getservers()
|
||||
serverlist = self.getserverlist()
|
||||
handle_locally = True
|
||||
self.addnet(n)
|
||||
for server in servers:
|
||||
for server in serverlist:
|
||||
self.addnodemap(server, n)
|
||||
# do not record server name for networks since network
|
||||
# nodes are replicated across all server
|
||||
return servers
|
||||
return (handle_locally, serverlist)
|
||||
if issubclass(nodecls, PyCoreNet) and \
|
||||
nodetype == coreapi.CORE_NODE_WLAN:
|
||||
# special case where remote WLANs not in session._objs, and no
|
||||
# node response message received, so they are counted here
|
||||
if msg.gettlv(coreapi.CORE_TLV_NODE_EMUSRV) is not None:
|
||||
self.incrbootcount()
|
||||
elif issubclass(nodecls, PyCoreNode):
|
||||
name = msg.gettlv(coreapi.CORE_TLV_NODE_NAME)
|
||||
if name:
|
||||
|
@ -663,25 +669,24 @@ class CoreBroker(ConfigurableManager):
|
|||
self.addphys(n)
|
||||
|
||||
# emulation server TLV specifies server
|
||||
servername = msg.gettlv(coreapi.CORE_TLV_NODE_EMUSRV)
|
||||
server = self.getserverbyname(servername)
|
||||
server = msg.gettlv(coreapi.CORE_TLV_NODE_EMUSRV)
|
||||
if server is not None:
|
||||
self.addnodemap(server, n)
|
||||
if server not in servers:
|
||||
servers.add(server)
|
||||
if server not in serverlist:
|
||||
serverlist.append(server)
|
||||
if serverfiletxt and self.session.master:
|
||||
self.writenodeserver(serverfiletxt, server)
|
||||
# hook to update coordinates of physical nodes
|
||||
if n in self.phys:
|
||||
self.session.mobility.physnodeupdateposition(msg)
|
||||
return servers
|
||||
return (handle_locally, serverlist)
|
||||
|
||||
def handlelinkmsg(self, msg):
|
||||
''' Determine and return the servers to which this link message should
|
||||
be forwarded. Also build tunnels between different servers or add
|
||||
opaque data to the link message before forwarding.
|
||||
'''
|
||||
servers = set()
|
||||
serverlist = []
|
||||
handle_locally = False
|
||||
|
||||
# determine link message destination using non-network nodes
|
||||
|
@ -692,65 +697,66 @@ class CoreBroker(ConfigurableManager):
|
|||
# the automatic tunnelling
|
||||
handle_locally = True
|
||||
else:
|
||||
servers = self.getserversbynode(nn[1])
|
||||
serverlist = self.getserversbynode(nn[1])
|
||||
elif nn[1] in self.nets:
|
||||
servers = self.getserversbynode(nn[0])
|
||||
serverlist = self.getserversbynode(nn[0])
|
||||
else:
|
||||
servers1 = self.getserversbynode(nn[0])
|
||||
servers2 = self.getserversbynode(nn[1])
|
||||
serverset1 = set(self.getserversbynode(nn[0]))
|
||||
serverset2 = set(self.getserversbynode(nn[1]))
|
||||
# nodes are on two different servers, build tunnels as needed
|
||||
if servers1 != servers2:
|
||||
if serverset1 != serverset2:
|
||||
localn = None
|
||||
if len(servers1) == 0 or len(servers2) == 0:
|
||||
if len(serverset1) == 0 or len(serverset2) == 0:
|
||||
handle_locally = True
|
||||
servers = servers1.union(servers2)
|
||||
serverlist = list(serverset1 | serverset2)
|
||||
host = None
|
||||
# get the IP of remote server and decide which node number
|
||||
# is for a local node
|
||||
for server in servers:
|
||||
host = server.host
|
||||
for server in serverlist:
|
||||
(host, port, sock) = self.getserver(server)
|
||||
if host is None:
|
||||
# server is local
|
||||
# named server is local
|
||||
handle_locally = True
|
||||
if server in servers1:
|
||||
if server in serverset1:
|
||||
localn = nn[0]
|
||||
else:
|
||||
localn = nn[1]
|
||||
if handle_locally and localn is None:
|
||||
# having no local node at this point indicates local node is
|
||||
# the one with the empty serverset
|
||||
if len(servers1) == 0:
|
||||
if len(serverset1) == 0:
|
||||
localn = nn[0]
|
||||
elif len(servers2) == 0:
|
||||
elif len(serverset2) == 0:
|
||||
localn = nn[1]
|
||||
if host is None:
|
||||
host = self.getlinkendpoint(msg, localn == nn[0])
|
||||
if localn is None:
|
||||
msg = self.addlinkendpoints(msg, servers1, servers2)
|
||||
msg = self.addlinkendpoints(msg, serverset1, serverset2)
|
||||
elif msg.flags & coreapi.CORE_API_ADD_FLAG:
|
||||
self.addtunnel(host, nn[0], nn[1], localn)
|
||||
elif msg.flags & coreapi.CORE_API_DEL_FLAG:
|
||||
self.deltunnel(nn[0], nn[1])
|
||||
handle_locally = False
|
||||
else:
|
||||
servers = servers1.union(servers2)
|
||||
serverlist = list(serverset1 | serverset2)
|
||||
|
||||
return servers, msg
|
||||
return (handle_locally, serverlist, msg)
|
||||
|
||||
def addlinkendpoints(self, msg, servers1, servers2):
|
||||
def addlinkendpoints(self, msg, serverset1, serverset2):
|
||||
''' For a link message that is not handled locally, inform the remote
|
||||
servers of the IP addresses used as tunnel endpoints by adding
|
||||
opaque data to the link message.
|
||||
'''
|
||||
ip1 = ''
|
||||
for server in servers1:
|
||||
if server.host is not None:
|
||||
ip1 = server.host
|
||||
break
|
||||
ip2 = ''
|
||||
for server in servers2:
|
||||
if server.host is not None:
|
||||
ip2 = server.host
|
||||
break
|
||||
ip1 = ""
|
||||
for server in serverset1:
|
||||
(host, port, sock) = self.getserver(server)
|
||||
if host is not None:
|
||||
ip1 = host
|
||||
ip2 = ""
|
||||
for server in serverset2:
|
||||
(host, port, sock) = self.getserver(server)
|
||||
if host is not None:
|
||||
ip2 = host
|
||||
tlvdata = msg.rawmsg[coreapi.CoreMessage.hdrsiz:]
|
||||
tlvdata += coreapi.CoreLinkTlv.pack(coreapi.CORE_TLV_LINK_OPAQUE,
|
||||
"%s:%s" % (ip1, ip2))
|
||||
|
@ -790,49 +796,58 @@ class CoreBroker(ConfigurableManager):
|
|||
msgcls = coreapi.msg_class(msgtype)
|
||||
return self.handlemsg(msgcls(flags, hdr, msg[coreapi.CoreMessage.hdrsiz:]))
|
||||
|
||||
def forwardmsg(self, msg, servers):
|
||||
''' Forward API message to all given servers.
|
||||
|
||||
Return True if an empty host/port is encountered, indicating
|
||||
the message should be handled locally.
|
||||
def forwardmsg(self, msg, serverlist, handle_locally):
|
||||
''' Forward API message to all servers in serverlist; if an empty
|
||||
host/port is encountered, set the handle_locally flag. Returns the
|
||||
value of the handle_locally flag, which may be unchanged.
|
||||
'''
|
||||
handle_locally = len(servers) == 0
|
||||
for server in servers:
|
||||
if server.host is None and server.port is None:
|
||||
for server in serverlist:
|
||||
try:
|
||||
(host, port, sock) = self.getserver(server)
|
||||
except KeyError:
|
||||
# server not found, don't handle this message locally
|
||||
self.session.info("broker could not find server %s, message " \
|
||||
"with type %s dropped" % \
|
||||
(server, msg.msgtype))
|
||||
continue
|
||||
if host is None and port is None:
|
||||
# local emulation server, handle this locally
|
||||
handle_locally = True
|
||||
elif server.sock is None:
|
||||
self.session.info("server %s @ %s:%s is disconnected" % \
|
||||
(server.name, server.host, server.port))
|
||||
else:
|
||||
server.sock.send(msg.rawmsg)
|
||||
if sock is None:
|
||||
self.session.info("server %s @ %s:%s is disconnected" % \
|
||||
(server, host, port))
|
||||
else:
|
||||
sock.send(msg.rawmsg)
|
||||
return handle_locally
|
||||
|
||||
def writeservers(self):
|
||||
''' Write the server list to a text file in the session directory upon
|
||||
startup: /tmp/pycore.nnnnn/servers
|
||||
'''
|
||||
servers = self.getservers()
|
||||
filename = os.path.join(self.session.sessiondir, "servers")
|
||||
try:
|
||||
f = open(filename, "w")
|
||||
master = self.session_id_master
|
||||
if master is None:
|
||||
master = self.session.sessionid
|
||||
try:
|
||||
with open(filename, 'w') as f:
|
||||
f.write("master=%s\n" % master)
|
||||
for server in servers:
|
||||
if server.name == "localhost":
|
||||
self.servers_lock.acquire()
|
||||
for name in sorted(self.servers.keys()):
|
||||
if name == "localhost":
|
||||
continue
|
||||
(host, port, sock) = self.servers[name]
|
||||
try:
|
||||
(lhost, lport) = server.sock.getsockname()
|
||||
(lhost, lport) = sock.getsockname()
|
||||
except:
|
||||
lhost, lport = None, None
|
||||
f.write('%s %s %s %s %s\n' % (server.name, server.host,
|
||||
server.port, lhost, lport))
|
||||
except Exception as e:
|
||||
msg = 'Error writing server list to the file: \'%s\'\n%s' % \
|
||||
(filename, e)
|
||||
self.session.warn(msg)
|
||||
f.write("%s %s %s %s %s\n" % (name, host, port, lhost, lport))
|
||||
f.close()
|
||||
except Exception, e:
|
||||
self.session.warn("Error writing server list to the file: %s\n%s" \
|
||||
% (filename, e))
|
||||
finally:
|
||||
self.servers_lock.release()
|
||||
|
||||
def writenodeserver(self, nodestr, server):
|
||||
''' Creates a /tmp/pycore.nnnnn/nX.conf/server file having the node
|
||||
|
@ -840,7 +855,8 @@ class CoreBroker(ConfigurableManager):
|
|||
other machines, much like local nodes may be accessed via the
|
||||
VnodeClient class.
|
||||
'''
|
||||
serverstr = "%s %s %s" % (server.name, server.host, server.port)
|
||||
(host, port, sock) = self.getserver(server)
|
||||
serverstr = "%s %s %s" % (server, host, port)
|
||||
name = nodestr.split()[1]
|
||||
dirname = os.path.join(self.session.sessiondir, name + ".conf")
|
||||
filename = os.path.join(dirname, "server")
|
||||
|
@ -850,9 +866,14 @@ class CoreBroker(ConfigurableManager):
|
|||
# directory may already exist from previous distributed run
|
||||
pass
|
||||
try:
|
||||
with open(filename, 'w') as f:
|
||||
f.write('%s\n%s\n' % (serverstr, nodestr))
|
||||
except Exception as e:
|
||||
msg = 'Error writing server file \'%s\' for node %s:\n%s' % \
|
||||
(filename, name, e)
|
||||
f = open(filename, "w")
|
||||
f.write("%s\n%s\n" % (serverstr, nodestr))
|
||||
f.close()
|
||||
return True
|
||||
except Exception, e:
|
||||
msg = "Error writing server file '%s'" % filename
|
||||
msg += "for node %s:\n%s" % (name, e)
|
||||
self.session.warn(msg)
|
||||
return False
|
||||
|
||||
|
||||
|
|
|
@ -67,7 +67,7 @@ class Emane(ConfigurableManager):
|
|||
self.logversion()
|
||||
# model for global EMANE configuration options
|
||||
self.emane_config = EmaneGlobalModel(session, None, self.verbose)
|
||||
session.broker.handlers.add(self.handledistributed)
|
||||
session.broker.handlers += (self.handledistributed, )
|
||||
self.loadmodels()
|
||||
self.service = None
|
||||
|
||||
|
@ -455,11 +455,11 @@ class Emane(ConfigurableManager):
|
|||
servers.append(s)
|
||||
self._objslock.release()
|
||||
|
||||
servers.sort(key = lambda x: x.name)
|
||||
for server in servers:
|
||||
if server.name == "localhost":
|
||||
if server == "localhost":
|
||||
continue
|
||||
if server.sock is None:
|
||||
(host, port, sock) = self.session.broker.getserver(server)
|
||||
if sock is None:
|
||||
continue
|
||||
platformid += 1
|
||||
typeflags = coreapi.CONF_TYPE_FLAGS_UPDATE
|
||||
|
@ -467,11 +467,12 @@ class Emane(ConfigurableManager):
|
|||
values[names.index("nem_id_start")] = str(nemid)
|
||||
msg = EmaneGlobalModel.toconfmsg(flags=0, nodenum=None,
|
||||
typeflags=typeflags, values=values)
|
||||
server.sock.send(msg)
|
||||
sock.send(msg)
|
||||
# increment nemid for next server by number of interfaces
|
||||
with self._ifccountslock:
|
||||
self._ifccountslock.acquire()
|
||||
if server in self._ifccounts:
|
||||
nemid += self._ifccounts[server]
|
||||
self._ifccountslock.release()
|
||||
|
||||
return False
|
||||
|
||||
|
@ -510,7 +511,7 @@ class Emane(ConfigurableManager):
|
|||
session = self.session
|
||||
if not session.master:
|
||||
return # slave server
|
||||
servers = session.broker.getservernames()
|
||||
servers = session.broker.getserverlist()
|
||||
if len(servers) < 2:
|
||||
return # not distributed
|
||||
prefix = session.cfg.get('controlnet')
|
||||
|
@ -1154,22 +1155,6 @@ class Emane(ConfigurableManager):
|
|||
self.session.sdt.updatenodegeo(node.objid, lat, long, alt)
|
||||
return True
|
||||
|
||||
def emanerunning(self, node):
|
||||
'''\
|
||||
Return True if an EMANE process associated with the given node
|
||||
is running, False otherwise.
|
||||
'''
|
||||
status = -1
|
||||
cmd = ['pkill', '-0', '-x', 'emane']
|
||||
try:
|
||||
if self.version < self.EMANE092:
|
||||
status = subprocess.call(cmd)
|
||||
else:
|
||||
status = node.cmd(cmd, wait=True)
|
||||
except:
|
||||
pass
|
||||
return status == 0
|
||||
|
||||
def emane_version():
|
||||
'Return the locally installed EMANE version identifier and string.'
|
||||
cmd = ('emane', '--version')
|
||||
|
|
|
@ -87,7 +87,7 @@ class CoreDeploymentWriter(object):
|
|||
for n in nodelist:
|
||||
self.add_virtual_host(testhost, n)
|
||||
# TODO: handle other servers
|
||||
# servers = self.session.broker.getservernames()
|
||||
# servers = self.session.broker.getserverlist()
|
||||
# servers.remove('localhost')
|
||||
|
||||
def add_child_element(self, parent, tagName):
|
||||
|
|
|
@ -33,7 +33,7 @@ class MobilityManager(ConfigurableManager):
|
|||
# dummy node objects for tracking position of nodes on other servers
|
||||
self.phys = {}
|
||||
self.physnets = {}
|
||||
self.session.broker.handlers.add(self.physnodehandlelink)
|
||||
self.session.broker.handlers += (self.physnodehandlelink, )
|
||||
self.register()
|
||||
|
||||
def startup(self, nodenums=None):
|
||||
|
@ -237,10 +237,9 @@ class MobilityManager(ConfigurableManager):
|
|||
return
|
||||
for nodenum in nodenums:
|
||||
node = self.phys[nodenum]
|
||||
for server in self.session.broker.getserversbynode(nodenum):
|
||||
break
|
||||
netif = self.session.broker.gettunnel(net.objid,
|
||||
IPAddr.toint(server.host))
|
||||
servers = self.session.broker.getserversbynode(nodenum)
|
||||
(host, port, sock) = self.session.broker.getserver(servers[0])
|
||||
netif = self.session.broker.gettunnel(net.objid, IPAddr.toint(host))
|
||||
node.addnetif(netif, 0)
|
||||
netif.node = node
|
||||
(x,y,z) = netif.node.position.get()
|
||||
|
|
|
@ -116,19 +116,7 @@ class TunTap(PyCoreNetIf):
|
|||
def nodedevexists():
|
||||
cmd = (IP_BIN, 'link', 'show', self.name)
|
||||
return self.node.cmd(cmd)
|
||||
count = 0
|
||||
while True:
|
||||
try:
|
||||
self.waitfor(nodedevexists)
|
||||
break
|
||||
except RuntimeError:
|
||||
# check if this is an EMANE interface; if so, continue
|
||||
# waiting if EMANE is still running
|
||||
if count < 5 and isinstance(self.net, EmaneNode) and \
|
||||
self.node.session.emane.emanerunning(self.node):
|
||||
count += 1
|
||||
else:
|
||||
raise
|
||||
|
||||
def install(self):
|
||||
''' Install this TAP into its namespace. This is not done from the
|
||||
|
|
|
@ -50,7 +50,7 @@ class Sdt(object):
|
|||
# node information for remote nodes not in session._objs
|
||||
# local nodes also appear here since their obj may not exist yet
|
||||
self.remotes = {}
|
||||
session.broker.handlers.add(self.handledistributed)
|
||||
session.broker.handlers += (self.handledistributed, )
|
||||
|
||||
def is_enabled(self):
|
||||
''' Check for 'enablesdt' session option. Return False by default if
|
||||
|
|
|
@ -638,16 +638,9 @@ class Session(object):
|
|||
# allow time for processes to start
|
||||
time.sleep(0.125)
|
||||
self.validatenodes()
|
||||
self.broker.local_instantiation_complete()
|
||||
if self.isconnected():
|
||||
tlvdata = ''
|
||||
tlvdata += coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE,
|
||||
coreapi.CORE_EVENT_INSTANTIATION_COMPLETE)
|
||||
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
|
||||
self.broadcastraw(None, msg)
|
||||
# assume either all nodes have booted already, or there are some
|
||||
# nodes on slave servers that will be booted and those servers will
|
||||
# send a status response message
|
||||
# send a node status response message
|
||||
self.checkruntime()
|
||||
|
||||
def getnodecount(self):
|
||||
|
@ -680,9 +673,22 @@ class Session(object):
|
|||
return
|
||||
if self.getstate() == coreapi.CORE_EVENT_RUNTIME_STATE:
|
||||
return
|
||||
# check if all servers have completed instantiation
|
||||
if not self.broker.instantiation_complete():
|
||||
return
|
||||
session_node_count = int(self.node_count)
|
||||
nc = self.getnodecount()
|
||||
# count booted nodes not emulated on this server
|
||||
# TODO: let slave server determine RUNTIME and wait for Event Message
|
||||
# broker.getbootocunt() counts all CoreNodes from status reponse
|
||||
# messages, plus any remote WLANs; remote EMANE, hub, switch, etc.
|
||||
# are already counted in self._objs
|
||||
nc += self.broker.getbootcount()
|
||||
self.info("Checking for runtime with %d of %d session nodes" % \
|
||||
(nc, session_node_count))
|
||||
if nc < session_node_count:
|
||||
return # do not have information on all nodes yet
|
||||
# information on all nodes has been received and they have been started
|
||||
# enter the runtime state
|
||||
# TODO: more sophisticated checks to verify that all nodes and networks
|
||||
# are running
|
||||
state = coreapi.CORE_EVENT_RUNTIME_STATE
|
||||
self.evq.run()
|
||||
self.setstate(state, info=True, sendevent=True)
|
||||
|
@ -884,7 +890,7 @@ class Session(object):
|
|||
prefix = prefixes[0]
|
||||
else:
|
||||
# slave servers have their name and localhost in the serverlist
|
||||
servers = self.broker.getservernames()
|
||||
servers = self.broker.getserverlist()
|
||||
servers.remove('localhost')
|
||||
prefix = None
|
||||
for server_prefix in prefixes:
|
||||
|
@ -921,7 +927,7 @@ class Session(object):
|
|||
|
||||
# tunnels between controlnets will be built with Broker.addnettunnels()
|
||||
self.broker.addnet(oid)
|
||||
for server in self.broker.getservers():
|
||||
for server in self.broker.getserverlist():
|
||||
self.broker.addnodemap(server, oid)
|
||||
|
||||
return ctrlnet
|
||||
|
@ -1126,7 +1132,7 @@ class SessionConfig(ConfigurableManager, Configurable):
|
|||
|
||||
def __init__(self, session):
|
||||
ConfigurableManager.__init__(self, session)
|
||||
session.broker.handlers.add(self.handledistributed)
|
||||
session.broker.handlers += (self.handledistributed, )
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
|
@ -1184,7 +1190,7 @@ class SessionConfig(ConfigurableManager, Configurable):
|
|||
controlnets = value.split()
|
||||
if len(controlnets) < 2:
|
||||
return # multiple controlnet prefixes do not exist
|
||||
servers = self.session.broker.getservernames()
|
||||
servers = self.session.broker.getserverlist()
|
||||
if len(servers) < 2:
|
||||
return # not distributed
|
||||
servers.remove("localhost")
|
||||
|
|
|
@ -62,4 +62,3 @@ emane_models = RfPipe, Ieee80211abg, CommEffect, Bypass
|
|||
#emane_log_level = 2
|
||||
emane_realtime = True
|
||||
|
||||
aux_request_handler = core.addons.api2handler.CoreApi2RequestHandler:12222
|
Loading…
Reference in a new issue