diff --git a/daemon/core/broker.py b/daemon/core/broker.py index cdbbb124..c1b8570d 100644 --- a/daemon/core/broker.py +++ b/daemon/core/broker.py @@ -159,6 +159,7 @@ class CoreBroker(ConfigurableManager): """ Reset to initial state. """ + logger.info("broker reset") self.nodemap_lock.acquire() self.nodemap.clear() for server, count in self.nodecounts.iteritems(): @@ -178,11 +179,13 @@ class CoreBroker(ConfigurableManager): Spawn the receive loop for receiving messages. """ if self.recvthread is not None: + logger.info("server receive loop already started") if self.recvthread.isAlive(): return else: self.recvthread.join() # start reading data from connected sockets + logger.info("starting server receive loop") self.dorecvloop = True self.recvthread = threading.Thread(target=self.recvloop) self.recvthread.daemon = True @@ -205,6 +208,8 @@ class CoreBroker(ConfigurableManager): r, w, x = select.select(rlist, [], [], 1.0) for sock in r: server = self.getserverbysock(sock) + logger.info("attempting to receive from server: peer:%s remote:%s", + server.sock.getpeername(), server.sock.getsockname()) if server is None: # servers may have changed; loop again continue @@ -227,6 +232,7 @@ class CoreBroker(ConfigurableManager): msghdr = server.sock.recv(coreapi.CoreMessage.header_len) if len(msghdr) == 0: # server disconnected + logger.info("server disconnected, closing server") server.close() return 0 @@ -238,9 +244,11 @@ class CoreBroker(ConfigurableManager): msgdata = server.sock.recv(msglen) data = msghdr + msgdata count = None + logger.info("received message type: %s", MessageTypes(msgtype)) # snoop exec response for remote interactive TTYs if msgtype == MessageTypes.EXECUTE.value and msgflags & MessageFlags.TTY.value: data = self.fixupremotetty(msghdr, msgdata, server.host) + logger.info("created remote tty message: %s", data) elif msgtype == MessageTypes.NODE.value: # snoop node delete response to decrement node counts if msgflags & MessageFlags.DELETE.value: @@ -295,7 +303,7 @@ class CoreBroker(ConfigurableManager): server.close() del self.servers[name] - logger.info("adding server %s @ %s:%s" % (name, host, port)) + logger.info("adding server: %s @ %s:%s" % (name, host, port)) server = CoreServer(name, host, port) if host is not None and port is not None: try: @@ -403,11 +411,12 @@ class CoreBroker(ConfigurableManager): remotenum = n1num else: remotenum = n2num + if key in self.tunnels.keys(): logger.warn("tunnel with key %s (%s-%s) already exists!" % (key, n1num, n2num)) else: objid = key & ((1 << 16) - 1) - logger.info("Adding tunnel for %s-%s to %s with key %s", n1num, n2num, remoteip, key) + logger.info("adding tunnel for %s-%s to %s with key %s", n1num, n2num, remoteip, key) if localnum in self.physical_nodes: # no bridge is needed on physical nodes; use the GreTap directly gt = GreTap(node=None, name=None, session=self.session, @@ -424,6 +433,7 @@ class CoreBroker(ConfigurableManager): Add GreTaps between network devices on different machines. The GreTapBridge is not used since that would add an extra bridge. """ + logger.info("adding network tunnels for nodes: %s", self.network_nodes) for n in self.network_nodes: self.addnettunnel(n) @@ -460,13 +470,16 @@ class CoreBroker(ConfigurableManager): for server in servers: if server.host is None: continue + logger.info("adding server host for net tunnel: %s", server.host) hosts.append(server.host) if len(hosts) == 0: for session_client in self.session_clients: # get IP address from API message sender (master) if session_client.client_address != "": - hosts.append(session_client.client_address[0]) + address = session_client.client_address[0] + logger.info("adding session_client host: %s", address) + hosts.append(address) r = [] for host in hosts: @@ -478,8 +491,11 @@ class CoreBroker(ConfigurableManager): myip = host key = self.tunnelkey(node_id, IpAddress.to_int(myip)) if key in self.tunnels.keys(): + logger.info("tunnel already exists, returning existing tunnel: %s", key) + gt = self.tunnels[key] + r.append(gt) continue - logger.info("Adding tunnel for net %s to %s with key %s" % (node_id, host, key)) + logger.info("adding tunnel for net %s to %s with key %s" % (node_id, host, key)) gt = GreTap(node=None, name=None, session=self.session, remoteip=host, key=key) self.tunnels[key] = gt r.append(gt) @@ -499,6 +515,7 @@ class CoreBroker(ConfigurableManager): """ key = self.tunnelkey(n1num, n2num) try: + logger.info("deleting tunnel between %s - %s with key: %s", n1num, n2num, key) gt = self.tunnels.pop(key) except KeyError: gt = None @@ -515,6 +532,7 @@ class CoreBroker(ConfigurableManager): :return: gre tap between nodes or none """ key = self.tunnelkey(n1num, n2num) + logger.info("checking for tunnel(%s) in: %s", key, self.tunnels.keys()) if key in self.tunnels.keys(): return self.tunnels[key] else: @@ -671,6 +689,7 @@ class CoreBroker(ConfigurableManager): # (for e.g. configuring services) if self.session.state == EventTypes.DEFINITION_STATE.value: return False + # Decide whether message should be handled locally or forwarded, or both if message.message_type == MessageTypes.NODE.value: handle_locally, servers = self.handlenodemsg(message) @@ -691,6 +710,8 @@ class CoreBroker(ConfigurableManager): if message.message_type == MessageTypes.LINK.value: # prepare a server list from two node numbers in link message handle_locally, servers, message = self.handlelinkmsg(message) + logger.info("broker handle link message: %s - %s", handle_locally, + map(lambda x: "%s:%s" % (x.host, x.port), servers)) elif len(servers) == 0: # check for servers based on node numbers in all messages but link nn = message.node_numbers() @@ -704,7 +725,7 @@ class CoreBroker(ConfigurableManager): for handler in self.handlers: handler(message) - # Perform any message forwarding + # perform any message forwarding handle_locally |= self.forwardmsg(message, servers) return not handle_locally @@ -848,6 +869,7 @@ class CoreBroker(ConfigurableManager): # determine link message destination using non-network nodes nn = message.node_numbers() + logger.info("checking link nodes (%s) with network nodes (%s)", nn, self.network_nodes) if nn[0] in self.network_nodes: if nn[1] in self.network_nodes: # two network nodes linked together - prevent loops caused by @@ -858,8 +880,11 @@ class CoreBroker(ConfigurableManager): elif nn[1] in self.network_nodes: servers = self.getserversbynode(nn[0]) else: + logger.info("link nodes are not network nodes") servers1 = self.getserversbynode(nn[0]) + logger.info("servers for node(%s): %s", nn[0], servers1) servers2 = self.getserversbynode(nn[1]) + logger.info("servers for node(%s): %s", nn[1], servers2) # nodes are on two different servers, build tunnels as needed if servers1 != servers2: localn = None @@ -887,6 +912,8 @@ class CoreBroker(ConfigurableManager): localn = nn[1] if host is None: host = self.getlinkendpoint(message, localn == nn[0]) + + logger.info("handle locally(%s) and local node(%s)", handle_locally, localn) if localn is None: message = self.addlinkendpoints(message, servers1, servers2) elif message.flags & MessageFlags.ADD.value: @@ -992,6 +1019,8 @@ class CoreBroker(ConfigurableManager): logger.info("server %s @ %s:%s is disconnected" % ( server.name, server.host, server.port)) else: + logger.info("forwarding message to server: %s - %s:\n%s", + server.host, server.port, message) server.sock.send(message.raw_message) return handle_locally diff --git a/daemon/core/corehandlers.py b/daemon/core/corehandlers.py index 15c187c1..4d10dc5a 100644 --- a/daemon/core/corehandlers.py +++ b/daemon/core/corehandlers.py @@ -441,7 +441,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler): :return: nothing """ if self.session and self.session.broker.handle_message(message): - logger.info("%s forwarding message:\n%s", threading.currentThread().getName(), message) + logger.info("message not being handled locally") return logger.info("%s handling message:\n%s", threading.currentThread().getName(), message) @@ -484,7 +484,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler): reply_message = "CoreMessage (type %d flags %d length %d)" % ( message_type, message_flags, message_length) - logger.info("%s: reply msg: \n%s", threading.currentThread().getName(), reply_message) + logger.info("reply to %s: \n%s", self.request.getpeername(), reply_message) try: self.sendall(reply) @@ -735,8 +735,11 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler): unidirectional = False # one of the nodes may exist on a remote server + logger.info("link message between node1(%s:%s) and node2(%s:%s)", + node_num1, interface_index1, node_num2, interface_index2) if node_num1 is not None and node_num2 is not None: tunnel = self.session.broker.gettunnel(node_num1, node_num2) + logger.info("tunnel between nodes: %s", tunnel) if isinstance(tunnel, coreobj.PyCoreNet): net = tunnel if tunnel.remotenum == node_num1: @@ -840,7 +843,6 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler): key = message.get_tlv(LinkTlvs.KEY.value) netaddrlist = [] - # print " n1=%s n2=%s net=%s net2=%s" % (node1, node2, net, net2) if node1 and net: addrlist = [] if ipv41 is not None and ipv4_mask1 is not None: diff --git a/daemon/core/netns/vnet.py b/daemon/core/netns/vnet.py index c1a17a19..e2899c2b 100644 --- a/daemon/core/netns/vnet.py +++ b/daemon/core/netns/vnet.py @@ -81,7 +81,7 @@ class EbtablesQueue(object): try: del self.last_update_time[wlan] except KeyError: - logger.exception("error deleting last update time for wlan: %s", wlan) + logger.exception("error deleting last update time for wlan, ignored before: %s", wlan) self.updatelock.release() if len(self.last_update_time) > 0: diff --git a/daemon/core/phys/pnodes.py b/daemon/core/phys/pnodes.py index 81ac4f56..e73672a0 100644 --- a/daemon/core/phys/pnodes.py +++ b/daemon/core/phys/pnodes.py @@ -169,7 +169,11 @@ class PhysicalNode(PyCoreNode): self.ifindex += 1 return ifindex - def newnetif(self, net=None, addrlist=[], hwaddr=None, ifindex=None, ifname=None): + def newnetif(self, net=None, addrlist=None, hwaddr=None, ifindex=None, ifname=None): + logger.info("creating interface") + if not addrlist: + addrlist = [] + if self.up and net is None: raise NotImplementedError @@ -229,7 +233,7 @@ class PhysicalNode(PyCoreNode): def opennodefile(self, filename, mode="w"): dirname, basename = os.path.split(filename) if not basename: - raise ValueError, "no basename for filename: " + filename + raise ValueError("no basename for filename: " + filename) if dirname and dirname[0] == "/": dirname = dirname[1:] dirname = dirname.replace("/", ".") diff --git a/daemon/core/session.py b/daemon/core/session.py index d28e6574..69fff8a6 100644 --- a/daemon/core/session.py +++ b/daemon/core/session.py @@ -26,7 +26,7 @@ from core.data import EventData from core.data import ExceptionData from core.data import FileData from core.emane.emanemanager import EmaneManager -from core.enumerations import ConfigDataTypes +from core.enumerations import ConfigDataTypes, EventTlvs from core.enumerations import ConfigFlags from core.enumerations import ConfigTlvs from core.enumerations import EventTypes @@ -830,6 +830,10 @@ class Session(object): # set broker local instantiation to complete self.broker.local_instantiation_complete() + # notify listeners that instantiation is complete + event = EventData(event_type=EventTypes.INSTANTIATION_COMPLETE.value) + self.broadcast_event(event) + # assume either all nodes have booted already, or there are some # nodes on slave servers that will be booted and those servers will # send a node status response message @@ -935,9 +939,13 @@ class Session(object): # TODO: determine instance type we need to check, due to method issue below if isinstance(obj, nodes.PyCoreNode) and not nodeutils.is_node(obj, NodeTypes.RJ45): # add a control interface if configured + logger.info("booting node: %s - %s", obj.objid, obj.name) self.add_remove_control_interface(node=obj, remove=False) obj.boot() + # TODO(blake): send node emu ids back + # self.sendnodeemuid(obj.objid) + self.update_control_interface_hosts() def validate_nodes(self): diff --git a/daemon/core/xml/xmlwriter1.py b/daemon/core/xml/xmlwriter1.py index 5dc062f0..19b882f2 100644 --- a/daemon/core/xml/xmlwriter1.py +++ b/daemon/core/xml/xmlwriter1.py @@ -962,7 +962,7 @@ def get_endpoints(network_object): if ep is not None: endpoints.append(ep) except: - logger.exception("error geting enpoints") + logger.exception("error geting endpoints, was skipped before") return endpoints