diff --git a/daemon/core/broker.py b/daemon/core/broker.py index de06e49a..a927c19b 100644 --- a/daemon/core/broker.py +++ b/daemon/core/broker.py @@ -24,7 +24,29 @@ from core.conf import ConfigurableManager if os.uname()[0] == "Linux": from core.netns.vif import GreTap from core.netns.vnet import GreTapBridge - + +class CoreServer(object): + def __init__(self, name, host, port): + self.name = name + self.host = host + self.port = port + self.sock = None + + def connect(self): + assert self.sock is None + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + #sock.setblocking(0) + try: + sock.connect((self.host, self.port)) + except: + sock.close() + raise + self.sock = sock + + def close(self): + if self.sock is not None: + self.sock.close() + self.sock = None class CoreBroker(ConfigurableManager): ''' Member of pycore session class for handling global emulation server @@ -47,7 +69,7 @@ class CoreBroker(ConfigurableManager): # this lock also protects self.nodecounts self.nodemap_lock = threading.Lock() # reference counts of nodes on servers - self.nodecounts = { } + self.nodecounts = {} self.bootcount = 0 # set of node numbers that are link-layer nodes (networks) self.nets = set() @@ -67,21 +89,19 @@ class CoreBroker(ConfigurableManager): ''' self.addnettunnels() self.writeservers() - + def shutdown(self): - ''' Close all active sockets; called when the session enters the + ''' Close all active sockets; called when the session enters the data collect state ''' with self.servers_lock: while len(self.servers) > 0: - (server, v) = self.servers.popitem() - (host, port, sock) = v - if sock is None: - continue - if self.verbose: - self.session.info("closing connection with %s @ %s:%s" % \ - (server, host, port)) - sock.close() + name, server = self.servers.popitem() + if server.sock is not None: + if self.verbose: + self.session.info("closing connection with %s @ %s:%s" % \ + (name, server.host, server.port)) + server.close() self.reset() self.dorecvloop = False if self.recvthread is not None: @@ -92,8 +112,8 @@ class CoreBroker(ConfigurableManager): ''' self.nodemap_lock.acquire() self.nodemap.clear() - for server in self.nodecounts: - if self.nodecounts[server] < 1: + for server, count in self.nodecounts.iteritems(): + if count < 1: self.delserver(server) self.nodecounts.clear() self.bootcount = 0 @@ -128,35 +148,32 @@ class CoreBroker(ConfigurableManager): rlist = [] with self.servers_lock: # build a socket list for select call - for name in self.servers: - (h, p, sock) = self.servers[name] - if sock is not None: - rlist.append(sock.fileno()) + for server in self.servers.itervalues(): + if server.sock is not None: + rlist.append(server.sock) r, w, x = select.select(rlist, [], [], 1.0) - for sockfd in r: - try: - (h, p, sock, name) = self.getserverbysock(sockfd) - except KeyError: + for sock in r: + server = self.getserverbysock(sock) + if server is None: # servers may have changed; loop again - break - rcvlen = self.recv(sock, h) + continue + rcvlen = self.recv(server) if rcvlen == 0: if self.verbose: - self.session.info("connection with %s @ %s:%s" \ - " has closed" % (name, h, p)) - self.servers[name] = (h, p, None) + msg = 'connection with %s @ %s:%s has closed' % \ + (server.name, server.host, server.port) + self.session.info(msg) - - def recv(self, sock, host): + def recv(self, server): ''' Receive data on an emulation server socket and broadcast it to all connected session handlers. Returns the length of data recevied and forwarded. Return value of zero indicates the socket has closed and should be removed from the self.servers dict. ''' - msghdr = sock.recv(coreapi.CoreMessage.hdrsiz) + msghdr = server.sock.recv(coreapi.CoreMessage.hdrsiz) if len(msghdr) == 0: # server disconnected - sock.close() + server.close() return 0 if len(msghdr) != coreapi.CoreMessage.hdrsiz: if self.verbose: @@ -165,20 +182,20 @@ class CoreBroker(ConfigurableManager): return len(msghdr) msgtype, msgflags, msglen = coreapi.CoreMessage.unpackhdr(msghdr) - msgdata = sock.recv(msglen) + msgdata = server.sock.recv(msglen) data = msghdr + msgdata count = None # snoop exec response for remote interactive TTYs if msgtype == coreapi.CORE_API_EXEC_MSG and \ msgflags & coreapi.CORE_API_TTY_FLAG: - data = self.fixupremotetty(msghdr, msgdata, host) + data = self.fixupremotetty(msghdr, msgdata, server.host) elif msgtype == coreapi.CORE_API_NODE_MSG: # snoop node delete response to decrement node counts if msgflags & coreapi.CORE_API_DEL_FLAG: msg = coreapi.CoreNodeMessage(msgflags, msghdr, msgdata) nodenum = msg.gettlv(coreapi.CORE_TLV_NODE_NUMBER) if nodenum is not None: - count = self.delnodemap(sock, nodenum) + count = self.delnodemap(server, nodenum) # snoop node add response to increment booted node count # (only CoreNodes send these response messages) elif msgflags & \ @@ -201,82 +218,74 @@ class CoreBroker(ConfigurableManager): connected to this (host, port), then leave it alone. When host,port is None, do not try to connect. ''' - self.servers_lock.acquire() - if name in self.servers: - (oldhost, oldport, sock) = self.servers[name] - if host == oldhost or port == oldport: - # leave this socket connected - if sock is not None: - self.servers_lock.release() + with self.servers_lock: + server = self.servers.get(name) + if server is not None: + if host == server.host and port == server.port and \ + server.sock is not None: + # leave this socket connected return - if self.verbose and host is not None and sock is not None: - self.session.info("closing connection with %s @ %s:%s" % \ + if self.verbose: + self.session.info('closing connection with %s @ %s:%s' % \ + (name, server.host, server.port)) + server.close() + del self.servers[name] + if self.verbose: + self.session.info('adding server %s @ %s:%s' % \ (name, host, port)) - if sock is not None: - sock.close() - self.servers_lock.release() - if self.verbose and host is not None: - self.session.info("adding server %s @ %s:%s" % (name, host, port)) - if host is None: - sock = None - else: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - #sock.setblocking(0) - #error = sock.connect_ex((host, port)) - try: - sock.connect((host, port)) - self.startrecvloop() - except Exception, e: - self.session.warn("error connecting to server %s:%s:\n\t%s" % \ - (host, port, e)) - sock.close() - sock = None - self.servers_lock.acquire() - self.servers[name] = (host, port, sock) - self.servers_lock.release() - - def delserver(self, name): + server = CoreServer(name, host, port) + if host is not None and port is not None: + try: + server.connect() + except Exception as e: + self.session.warn('error connecting to server %s:%s:\n\t%s' % \ + (host, port, e)) + if server.sock is not None: + self.startrecvloop() + self.servers[name] = server + + def delserver(self, server): ''' Remove a server and hang up any connection. ''' - self.servers_lock.acquire() - if name not in self.servers: - self.servers_lock.release() - return - (host, port, sock) = self.servers.pop(name) - if sock is not None: + with self.servers_lock: + try: + s = self.servers.pop(server.name) + assert s == server + except KeyError: + pass + if server.sock is not None: if self.verbose: self.session.info("closing connection with %s @ %s:%s" % \ - (name, host, port)) - sock.close() - self.servers_lock.release() + (server.name, server.host, server.port)) + server.close() - def getserver(self, name): - ''' Return the (host, port, sock) tuple, or raise a KeyError exception. - ''' - if name not in self.servers: - raise KeyError, "emulation server %s not found" % name - return self.servers[name] - - def getserverbysock(self, sockfd): - ''' Return a (host, port, sock, name) tuple based on socket file - descriptor, or raise a KeyError exception. + def getserverbyname(self, name): + ''' Return the server object having the given name, or None. ''' with self.servers_lock: - for name in self.servers: - (host, port, sock) = self.servers[name] - if sock is None: - continue - if sock.fileno() == sockfd: - return (host, port, sock, name) - raise KeyError, "socket fd %s not found" % sockfd - - def getserverlist(self): - ''' Return the list of server names (keys from self.servers). + return self.servers.get(name) + + def getserverbysock(self, sock): + ''' Return the server object corresponding to the given socket, + or None. ''' with self.servers_lock: - serverlist = sorted(self.servers.keys()) - return serverlist - + for server in self.servers.itervalues(): + if server.sock == sock: + return server + return None + + def getservers(self): + '''Return a list of servers sorted by name.''' + with self.servers_lock: + return sorted(self.servers.values(), key = lambda x: x.name) + + def getservernames(self): + ''' Return a sorted list of server names (keys from self.servers). + ''' + with self.servers_lock: + return sorted(self.servers.keys()) + def tunnelkey(self, n1num, n2num): ''' Compute a 32-bit key used to uniquely identify a GRE tunnel. The hash(n1num), hash(n2num) values are used, so node numbers may be @@ -322,7 +331,7 @@ class CoreBroker(ConfigurableManager): ''' for n in self.nets: self.addnettunnel(n) - + def addnettunnel(self, n): try: net = self.session.obj(n) @@ -335,16 +344,15 @@ class CoreBroker(ConfigurableManager): if hasattr(net, 'serverintf'): if net.serverintf is not None: return None - + servers = self.getserversbynode(n) if len(servers) < 2: return None hosts = [] for server in servers: - (host, port, sock) = self.getserver(server) - if host is None: + if server.host is None: continue - hosts.append(host) + hosts.append(server.host) if len(hosts) == 0: # get IP address from API message sender (master) self.session._handlerslock.acquire() @@ -352,7 +360,7 @@ class CoreBroker(ConfigurableManager): if h.client_address != "": hosts.append(h.client_address[0]) self.session._handlerslock.release() - + r = [] for host in hosts: if self.myip: @@ -370,11 +378,11 @@ class CoreBroker(ConfigurableManager): remoteip=host, key=key) self.tunnels[key] = gt r.append(gt) - # attaching to net will later allow gt to be destroyed + # attaching to net will later allow gt to be destroyed # during net.shutdown() net.attach(gt) return r - + def deltunnel(self, n1num, n2num): ''' Cleanup of the GreTapBridge. ''' @@ -411,7 +419,7 @@ class CoreBroker(ConfigurableManager): else: self.nodecounts[server] = 1 - def delnodemap(self, sock, nodenum): + def delnodemap(self, server, nodenum): ''' Remove a node number to emulation server mapping. Return the number of nodes left on this server. ''' @@ -419,19 +427,11 @@ class CoreBroker(ConfigurableManager): with self.nodemap_lock: if nodenum not in self.nodemap: return count - found = False - for server in self.nodemap[nodenum]: - (host, port, srvsock) = self.getserver(server) - if srvsock == sock: - found = True - break + self.nodemap[nodenum].remove(server) if server in self.nodecounts: count = self.nodecounts[server] - if found: - self.nodemap[nodenum].remove(server) - if server in self.nodecounts: - count -= 1 - self.nodecounts[server] = count + count -= 1 + self.nodecounts[server] = count return count def incrbootcount(self): @@ -507,72 +507,74 @@ class CoreBroker(ConfigurableManager): def handlemsg(self, msg): ''' Handle an API message. Determine whether this needs to be handled - by the local server or forwarded on to another one. + by the local server or forwarded on to another one. Returns True when message does not need to be handled locally, and performs forwarding if required. Returning False indicates this message should be handled locally. ''' - serverlist = [] - handle_locally = False - # Do not forward messages when in definition state + servers = set() + # Do not forward messages when in definition state # (for e.g. configuring services) if self.session.getstate() == coreapi.CORE_EVENT_DEFINITION_STATE: - handle_locally = True - return not handle_locally + return False # Decide whether message should be handled locally or forwarded, or both if msg.msgtype == coreapi.CORE_API_NODE_MSG: - (handle_locally, serverlist) = self.handlenodemsg(msg) + servers = self.handlenodemsg(msg) elif msg.msgtype == coreapi.CORE_API_EVENT_MSG: # broadcast events everywhere - serverlist = self.getserverlist() + servers = self.getservers() elif msg.msgtype == coreapi.CORE_API_CONF_MSG: # broadcast location and services configuration everywhere confobj = msg.gettlv(coreapi.CORE_TLV_CONF_OBJ) if confobj == "location" or confobj == "services" or \ confobj == "session" or confobj == "all": - serverlist = self.getserverlist() + servers = self.getservers() elif msg.msgtype == coreapi.CORE_API_FILE_MSG: # broadcast hook scripts and custom service files everywhere filetype = msg.gettlv(coreapi.CORE_TLV_FILE_TYPE) if filetype is not None and \ (filetype[:5] == "hook:" or filetype[:8] == "service:"): - serverlist = self.getserverlist() + servers = self.getservers() if msg.msgtype == coreapi.CORE_API_LINK_MSG: - # prepare a serverlist from two node numbers in link message - (handle_locally, serverlist, msg) = self.handlelinkmsg(msg) - elif len(serverlist) == 0: + # prepare a server list from two node numbers in link message + servers, msg = self.handlelinkmsg(msg) + elif len(servers) == 0: # check for servers based on node numbers in all messages but link nn = msg.nodenumbers() if len(nn) == 0: return False - serverlist = self.getserversbynode(nn[0]) + servers = self.getserversbynode(nn[0]) - if len(serverlist) == 0: - handle_locally = True - - # allow other handlers to process this message - # (this is used by e.g. EMANE to use the link add message to keep counts - # of interfaces on other servers) + # allow other handlers to process this message (this is used + # by e.g. EMANE to use the link add message to keep counts of + # interfaces on other servers) for handler in self.handlers: handler(msg) # Perform any message forwarding - handle_locally = self.forwardmsg(msg, serverlist, handle_locally) + handle_locally = self.forwardmsg(msg, servers) return not handle_locally - def setupserver(self, server): + def setupserver(self, servername): ''' Send the appropriate API messages for configuring the specified emulation server. ''' - (host, port, sock) = self.getserver(server) - if host is None or sock is None: + server = self.getserverbyname(servername) + if server is None: + msg = 'ignoring unknown server: \'%s\'' % servername + self.session.warn(msg) + return + if server.sock is None or server.host is None or server.port is None: + if self.verbose: + msg = 'ignoring disconnected server: \'%s\'' % servername + self.session.info(msg) return # communicate this session's current state to the server tlvdata = coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE, self.session.getstate()) msg = coreapi.CoreEventMessage.pack(0, tlvdata) - sock.send(msg) + server.sock.send(msg) # send a Configuration message for the broker object and inform the # server of its local name tlvdata = "" @@ -582,11 +584,11 @@ class CoreBroker(ConfigurableManager): tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_DATA_TYPES, (coreapi.CONF_DATA_TYPE_STRING,)) tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_VALUES, - "%s:%s:%s" % (server, host, port)) + "%s:%s:%s" % (server.name, server.host, server.port)) tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_SESSION, "%s" % self.session.sessionid) msg = coreapi.CoreConfMessage.pack(0, tlvdata) - sock.send(msg) + server.sock.send(msg) @staticmethod def fixupremotetty(msghdr, msgdata, host): @@ -617,8 +619,7 @@ class CoreBroker(ConfigurableManager): be forwarded. Also keep track of link-layer nodes and the mapping of nodes to servers. ''' - serverlist = [] - handle_locally = False + servers = set() serverfiletxt = None # snoop Node Message for emulation server TLV and record mapping n = msg.tlvdata[coreapi.CORE_TLV_NODE_NUMBER] @@ -629,22 +630,21 @@ class CoreBroker(ConfigurableManager): nodecls = coreapi.node_class(nodetype) except KeyError: self.session.warn("broker invalid node type %s" % nodetype) - return (False, serverlist) + return servers if nodecls is None: self.session.warn("broker unimplemented node type %s" % nodetype) - return (False, serverlist) + return servers if issubclass(nodecls, PyCoreNet) and \ nodetype != coreapi.CORE_NODE_WLAN: # network node replicated on all servers; could be optimized # don't replicate WLANs, because ebtables rules won't work - serverlist = self.getserverlist() - handle_locally = True + servers = self.getservers() self.addnet(n) - for server in serverlist: + for server in servers: self.addnodemap(server, n) # do not record server name for networks since network # nodes are replicated across all server - return (handle_locally, serverlist) + return servers if issubclass(nodecls, PyCoreNet) and \ nodetype == coreapi.CORE_NODE_WLAN: # special case where remote WLANs not in session._objs, and no @@ -658,26 +658,27 @@ class CoreBroker(ConfigurableManager): if issubclass(nodecls, PhysicalNode): # remember physical nodes self.addphys(n) - + # emulation server TLV specifies server - server = msg.gettlv(coreapi.CORE_TLV_NODE_EMUSRV) + servername = msg.gettlv(coreapi.CORE_TLV_NODE_EMUSRV) + server = self.getserverbyname(servername) if server is not None: self.addnodemap(server, n) - if server not in serverlist: - serverlist.append(server) + if server not in servers: + servers.add(server) if serverfiletxt and self.session.master: self.writenodeserver(serverfiletxt, server) # hook to update coordinates of physical nodes if n in self.phys: self.session.mobility.physnodeupdateposition(msg) - return (handle_locally, serverlist) - + return servers + def handlelinkmsg(self, msg): ''' Determine and return the servers to which this link message should be forwarded. Also build tunnels between different servers or add opaque data to the link message before forwarding. ''' - serverlist = [] + servers = set() handle_locally = False # determine link message destination using non-network nodes @@ -688,71 +689,70 @@ class CoreBroker(ConfigurableManager): # the automatic tunnelling handle_locally = True else: - serverlist = self.getserversbynode(nn[1]) + servers = self.getserversbynode(nn[1]) elif nn[1] in self.nets: - serverlist = self.getserversbynode(nn[0]) + servers = self.getserversbynode(nn[0]) else: - serverset1 = set(self.getserversbynode(nn[0])) - serverset2 = set(self.getserversbynode(nn[1])) + servers1 = self.getserversbynode(nn[0]) + servers2 = self.getserversbynode(nn[1]) # nodes are on two different servers, build tunnels as needed - if serverset1 != serverset2: + if servers1 != servers2: localn = None - if len(serverset1) == 0 or len(serverset2) == 0: + if len(servers1) == 0 or len(servers2) == 0: handle_locally = True - serverlist = list(serverset1 | serverset2) + servers = servers1.union(servers2) host = None # get the IP of remote server and decide which node number # is for a local node - for server in serverlist: - (host, port, sock) = self.getserver(server) + for server in servers: + host = server.host if host is None: - # named server is local + # server is local handle_locally = True - if server in serverset1: + if server in servers1: localn = nn[0] else: localn = nn[1] if handle_locally and localn is None: # having no local node at this point indicates local node is - # the one with the empty serverset - if len(serverset1) == 0: + # the one with the empty server set + if len(servers1) == 0: localn = nn[0] - elif len(serverset2) == 0: + elif len(servers2) == 0: localn = nn[1] if host is None: host = self.getlinkendpoint(msg, localn == nn[0]) if localn is None: - msg = self.addlinkendpoints(msg, serverset1, serverset2) + msg = self.addlinkendpoints(msg, servers1, servers2) elif msg.flags & coreapi.CORE_API_ADD_FLAG: self.addtunnel(host, nn[0], nn[1], localn) elif msg.flags & coreapi.CORE_API_DEL_FLAG: self.deltunnel(nn[0], nn[1]) - handle_locally = False else: - serverlist = list(serverset1 | serverset2) + servers = servers1.union(servers2) - return (handle_locally, serverlist, msg) + return servers, msg - def addlinkendpoints(self, msg, serverset1, serverset2): + def addlinkendpoints(self, msg, servers1, servers2): ''' For a link message that is not handled locally, inform the remote servers of the IP addresses used as tunnel endpoints by adding opaque data to the link message. ''' - ip1 = "" - for server in serverset1: - (host, port, sock) = self.getserver(server) - if host is not None: - ip1 = host - ip2 = "" - for server in serverset2: - (host, port, sock) = self.getserver(server) - if host is not None: - ip2 = host - tlvdata = msg.rawmsg[coreapi.CoreMessage.hdrsiz:] + ip1 = '' + for server in servers1: + if server.host is not None: + ip1 = server.host + break + ip2 = '' + for server in servers2: + if server.host is not None: + ip2 = server.host + break + tlvdata = msg.rawmsg[coreapi.CoreMessage.hdrsiz:] tlvdata += coreapi.CoreLinkTlv.pack(coreapi.CORE_TLV_LINK_OPAQUE, "%s:%s" % (ip1, ip2)) newraw = coreapi.CoreLinkMessage.pack(msg.flags, tlvdata) - msghdr = newraw[:coreapi.CoreMessage.hdrsiz] + msghdr = newraw[:coreapi.CoreMessage.hdrsiz] return coreapi.CoreLinkMessage(msg.flags, msghdr, tlvdata) def getlinkendpoint(self, msg, first_is_local): @@ -786,59 +786,50 @@ class CoreBroker(ConfigurableManager): msgtype, flags, msglen = coreapi.CoreMessage.unpackhdr(hdr) msgcls = coreapi.msg_class(msgtype) return self.handlemsg(msgcls(flags, hdr, msg[coreapi.CoreMessage.hdrsiz:])) - - def forwardmsg(self, msg, serverlist, handle_locally): - ''' Forward API message to all servers in serverlist; if an empty - host/port is encountered, set the handle_locally flag. Returns the - value of the handle_locally flag, which may be unchanged. + + def forwardmsg(self, msg, servers): + ''' Forward API message to all given servers. + + Return True if an empty host/port is encountered, indicating + the message should be handled locally. ''' - for server in serverlist: - try: - (host, port, sock) = self.getserver(server) - except KeyError: - # server not found, don't handle this message locally - self.session.info("broker could not find server %s, message " \ - "with type %s dropped" % \ - (server, msg.msgtype)) - continue - if host is None and port is None: + handle_locally = len(servers) == 0 + for server in servers: + if server.host is None and server.port is None: # local emulation server, handle this locally handle_locally = True + elif server.sock is None: + self.session.info("server %s @ %s:%s is disconnected" % \ + (server.name, server.host, server.port)) else: - if sock is None: - self.session.info("server %s @ %s:%s is disconnected" % \ - (server, host, port)) - else: - sock.send(msg.rawmsg) + server.sock.send(msg.rawmsg) return handle_locally def writeservers(self): ''' Write the server list to a text file in the session directory upon startup: /tmp/pycore.nnnnn/servers ''' + servers = self.getservers() filename = os.path.join(self.session.sessiondir, "servers") + master = self.session_id_master + if master is None: + master = self.session.sessionid try: - f = open(filename, "w") - master = self.session_id_master - if master is None: - master = self.session.sessionid - f.write("master=%s\n" % master) - self.servers_lock.acquire() - for name in sorted(self.servers.keys()): - if name == "localhost": - continue - (host, port, sock) = self.servers[name] - try: - (lhost, lport) = sock.getsockname() - except: - lhost, lport = None, None - f.write("%s %s %s %s %s\n" % (name, host, port, lhost, lport)) - f.close() - except Exception, e: - self.session.warn("Error writing server list to the file: %s\n%s" \ - % (filename, e)) - finally: - self.servers_lock.release() + with open(filename, 'w') as f: + f.write("master=%s\n" % master) + for server in servers: + if server.name == "localhost": + continue + try: + (lhost, lport) = server.sock.getsockname() + except: + lhost, lport = None, None + f.write('%s %s %s %s %s\n' % (server.name, server.host, + server.port, lhost, lport)) + except Exception as e: + msg = 'Error writing server list to the file: \'%s\'\n%s' % \ + (filename, e) + self.session.warn(msg) def writenodeserver(self, nodestr, server): ''' Creates a /tmp/pycore.nnnnn/nX.conf/server file having the node @@ -846,8 +837,7 @@ class CoreBroker(ConfigurableManager): other machines, much like local nodes may be accessed via the VnodeClient class. ''' - (host, port, sock) = self.getserver(server) - serverstr = "%s %s %s" % (server, host, port) + serverstr = "%s %s %s" % (server.name, server.host, server.port) name = nodestr.split()[1] dirname = os.path.join(self.session.sessiondir, name + ".conf") filename = os.path.join(dirname, "server") @@ -857,14 +847,9 @@ class CoreBroker(ConfigurableManager): # directory may already exist from previous distributed run pass try: - f = open(filename, "w") - f.write("%s\n%s\n" % (serverstr, nodestr)) - f.close() - return True - except Exception, e: - msg = "Error writing server file '%s'" % filename - msg += "for node %s:\n%s" % (name, e) + with open(filename, 'w') as f: + f.write('%s\n%s\n' % (serverstr, nodestr)) + except Exception as e: + msg = 'Error writing server file \'%s\' for node %s:\n%s' % \ + (filename, name, e) self.session.warn(msg) - return False - - diff --git a/daemon/core/emane/emane.py b/daemon/core/emane/emane.py index fcc299aa..dce55fcc 100644 --- a/daemon/core/emane/emane.py +++ b/daemon/core/emane/emane.py @@ -456,10 +456,9 @@ class Emane(ConfigurableManager): self._objslock.release() for server in servers: - if server == "localhost": + if server.name == "localhost": continue - (host, port, sock) = self.session.broker.getserver(server) - if sock is None: + if server.sock is None: continue platformid += 1 typeflags = coreapi.CONF_TYPE_FLAGS_UPDATE @@ -467,12 +466,11 @@ class Emane(ConfigurableManager): values[names.index("nem_id_start")] = str(nemid) msg = EmaneGlobalModel.toconfmsg(flags=0, nodenum=None, typeflags=typeflags, values=values) - sock.send(msg) + server.sock.send(msg) # increment nemid for next server by number of interfaces - self._ifccountslock.acquire() - if server in self._ifccounts: - nemid += self._ifccounts[server] - self._ifccountslock.release() + with self._ifccountslock: + if server in self._ifccounts: + nemid += self._ifccounts[server] return False @@ -511,7 +509,7 @@ class Emane(ConfigurableManager): session = self.session if not session.master: return # slave server - servers = session.broker.getserverlist() + servers = session.broker.getservernames() if len(servers) < 2: return # not distributed prefix = session.cfg.get('controlnet') diff --git a/daemon/core/misc/xmldeployment.py b/daemon/core/misc/xmldeployment.py index d34266ff..0544f9f6 100644 --- a/daemon/core/misc/xmldeployment.py +++ b/daemon/core/misc/xmldeployment.py @@ -87,7 +87,7 @@ class CoreDeploymentWriter(object): for n in nodelist: self.add_virtual_host(testhost, n) # TODO: handle other servers - # servers = self.session.broker.getserverlist() + # servers = self.session.broker.getservernames() # servers.remove('localhost') def add_child_element(self, parent, tagName): diff --git a/daemon/core/mobility.py b/daemon/core/mobility.py index b5a8eb45..00a60564 100644 --- a/daemon/core/mobility.py +++ b/daemon/core/mobility.py @@ -239,8 +239,8 @@ class MobilityManager(ConfigurableManager): node = self.phys[nodenum] for server in self.session.broker.getserversbynode(nodenum): break - (host, port, sock) = self.session.broker.getserver(server) - netif = self.session.broker.gettunnel(net.objid, IPAddr.toint(host)) + netif = self.session.broker.gettunnel(net.objid, + IPAddr.toint(server.host)) node.addnetif(netif, 0) netif.node = node (x,y,z) = netif.node.position.get() diff --git a/daemon/core/session.py b/daemon/core/session.py index 8d99f8e7..2e40ab8c 100644 --- a/daemon/core/session.py +++ b/daemon/core/session.py @@ -890,7 +890,7 @@ class Session(object): prefix = prefixes[0] else: # slave servers have their name and localhost in the serverlist - servers = self.broker.getserverlist() + servers = self.broker.getservernames() servers.remove('localhost') prefix = None for server_prefix in prefixes: @@ -927,7 +927,7 @@ class Session(object): # tunnels between controlnets will be built with Broker.addnettunnels() self.broker.addnet(oid) - for server in self.broker.getserverlist(): + for server in self.broker.getservers(): self.broker.addnodemap(server, oid) return ctrlnet @@ -1190,7 +1190,7 @@ class SessionConfig(ConfigurableManager, Configurable): controlnets = value.split() if len(controlnets) < 2: return # multiple controlnet prefixes do not exist - servers = self.session.broker.getserverlist() + servers = self.session.broker.getservernames() if len(servers) < 2: return # not distributed servers.remove("localhost")