updates to some log statements and fix to avoid an exception for finding enpoints in xml
This commit is contained in:
parent
f5bff494c7
commit
181a47b8ca
3 changed files with 32 additions and 33 deletions
|
@ -142,14 +142,13 @@ class CoreBroker(ConfigurableManager):
|
|||
Close all active sockets; called when the session enters the
|
||||
data collect state
|
||||
"""
|
||||
self.reset()
|
||||
with self.servers_lock:
|
||||
while len(self.servers) > 0:
|
||||
name, server = self.servers.popitem()
|
||||
if server.sock is not None:
|
||||
logger.info("closing connection with %s @ %s:%s" %
|
||||
(name, server.host, server.port))
|
||||
logger.info("closing connection with %s: %s:%s", name, server.host, server.port)
|
||||
server.close()
|
||||
self.reset()
|
||||
self.dorecvloop = False
|
||||
if self.recvthread is not None:
|
||||
self.recvthread.join()
|
||||
|
@ -214,8 +213,7 @@ class CoreBroker(ConfigurableManager):
|
|||
continue
|
||||
rcvlen = self.recv(server)
|
||||
if rcvlen == 0:
|
||||
logger.info("connection with %s @ %s:%s has closed" % (
|
||||
server.name, server.host, server.port))
|
||||
logger.info("connection with server(%s) closed: %s:%s", server.name, server.host, server.port)
|
||||
|
||||
def recv(self, server):
|
||||
"""
|
||||
|
@ -236,18 +234,18 @@ class CoreBroker(ConfigurableManager):
|
|||
return 0
|
||||
|
||||
if len(msghdr) != coreapi.CoreMessage.header_len:
|
||||
logger.info("warning: broker received not enough data len=%s" % len(msghdr))
|
||||
logger.warn("warning: broker received not enough data len=%s", len(msghdr))
|
||||
return len(msghdr)
|
||||
|
||||
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
|
||||
msgdata = server.sock.recv(msglen)
|
||||
data = msghdr + msgdata
|
||||
count = None
|
||||
logger.info("received message type: %s", MessageTypes(msgtype))
|
||||
logger.debug("received message type: %s", MessageTypes(msgtype))
|
||||
# snoop exec response for remote interactive TTYs
|
||||
if msgtype == MessageTypes.EXECUTE.value and msgflags & MessageFlags.TTY.value:
|
||||
data = self.fixupremotetty(msghdr, msgdata, server.host)
|
||||
logger.info("created remote tty message: %s", data)
|
||||
logger.debug("created remote tty message: %s", data)
|
||||
elif msgtype == MessageTypes.NODE.value:
|
||||
# snoop node delete response to decrement node counts
|
||||
if msgflags & MessageFlags.DELETE.value:
|
||||
|
@ -293,22 +291,21 @@ class CoreBroker(ConfigurableManager):
|
|||
with self.servers_lock:
|
||||
server = self.servers.get(name)
|
||||
if server is not None:
|
||||
if host == server.host and port == server.port and \
|
||||
server.sock is not None:
|
||||
if host == server.host and port == server.port and server.sock is not None:
|
||||
# leave this socket connected
|
||||
return
|
||||
|
||||
logger.info("closing connection with %s @ %s:%s" % (name, server.host, server.port))
|
||||
logger.info("closing connection with %s @ %s:%s", name, server.host, server.port)
|
||||
server.close()
|
||||
del self.servers[name]
|
||||
|
||||
logger.info("adding broker server(%s): %s:%s" % (name, host, port))
|
||||
logger.info("adding broker server(%s): %s:%s", name, host, port)
|
||||
server = CoreDistributedServer(name, host, port)
|
||||
if host is not None and port is not None:
|
||||
try:
|
||||
server.connect()
|
||||
except IOError:
|
||||
logger.exception("error connecting to server %s:%s" % (host, port))
|
||||
logger.exception("error connecting to server(%s): %s:%s", name, host, port)
|
||||
if server.sock is not None:
|
||||
self.startrecvloop()
|
||||
self.servers[name] = server
|
||||
|
@ -328,7 +325,7 @@ class CoreBroker(ConfigurableManager):
|
|||
logger.exception("error deleting server")
|
||||
|
||||
if server.sock is not None:
|
||||
logger.info("closing connection with %s @ %s:%s" % (server.name, server.host, server.port))
|
||||
logger.info("closing connection with %s @ %s:%s", server.name, server.host, server.port)
|
||||
server.close()
|
||||
|
||||
def getserverbyname(self, name):
|
||||
|
@ -412,7 +409,7 @@ class CoreBroker(ConfigurableManager):
|
|||
remotenum = n2num
|
||||
|
||||
if key in self.tunnels.keys():
|
||||
logger.warn("tunnel with key %s (%s-%s) already exists!" % (key, n1num, n2num))
|
||||
logger.warn("tunnel with key %s (%s-%s) already exists!", key, n1num, n2num)
|
||||
else:
|
||||
objid = key & ((1 << 16) - 1)
|
||||
logger.info("adding tunnel for %s-%s to %s with key %s", n1num, n2num, remoteip, key)
|
||||
|
@ -494,7 +491,7 @@ class CoreBroker(ConfigurableManager):
|
|||
gt = self.tunnels[key]
|
||||
r.append(gt)
|
||||
continue
|
||||
logger.info("adding tunnel for net %s to %s with key %s" % (node_id, host, key))
|
||||
logger.info("adding tunnel for net %s to %s with key %s", node_id, host, key)
|
||||
gt = GreTap(node=None, name=None, session=self.session, remoteip=host, key=key)
|
||||
self.tunnels[key] = gt
|
||||
r.append(gt)
|
||||
|
@ -698,8 +695,7 @@ class CoreBroker(ConfigurableManager):
|
|||
elif message.message_type == MessageTypes.CONFIG.value:
|
||||
# broadcast location and services configuration everywhere
|
||||
confobj = message.get_tlv(ConfigTlvs.OBJECT.value)
|
||||
if confobj == "location" or confobj == "services" or \
|
||||
confobj == "session" or confobj == "all":
|
||||
if confobj == "location" or confobj == "services" or confobj == "session" or confobj == "all":
|
||||
servers = self.getservers()
|
||||
elif message.message_type == MessageTypes.FILE.value:
|
||||
# broadcast hook scripts and custom service files everywhere
|
||||
|
@ -735,10 +731,10 @@ class CoreBroker(ConfigurableManager):
|
|||
"""
|
||||
server = self.getserverbyname(servername)
|
||||
if server is None:
|
||||
logger.warn("ignoring unknown server: %s" % servername)
|
||||
logger.warn("ignoring unknown server: %s", servername)
|
||||
return
|
||||
if server.sock is None or server.host is None or server.port is None:
|
||||
logger.info("ignoring disconnected server: %s" % servername)
|
||||
logger.info("ignoring disconnected server: %s", servername)
|
||||
return
|
||||
|
||||
# communicate this session"s current state to the server
|
||||
|
@ -811,10 +807,10 @@ class CoreBroker(ConfigurableManager):
|
|||
try:
|
||||
nodecls = nodeutils.get_node_class(NodeTypes(nodetype))
|
||||
except KeyError:
|
||||
logger.warn("broker invalid node type %s" % nodetype)
|
||||
logger.warn("broker invalid node type %s", nodetype)
|
||||
return handle_locally, servers
|
||||
if nodecls is None:
|
||||
logger.warn("broker unimplemented node type %s" % nodetype)
|
||||
logger.warn("broker unimplemented node type %s", nodetype)
|
||||
return handle_locally, servers
|
||||
if issubclass(nodecls, PyCoreNet) and nodetype != NodeTypes.WIRELESS_LAN.value:
|
||||
# network node replicated on all servers; could be optimized
|
||||
|
@ -1013,11 +1009,10 @@ class CoreBroker(ConfigurableManager):
|
|||
# local emulation server, handle this locally
|
||||
handle_locally = True
|
||||
elif server.sock is None:
|
||||
logger.info("server %s @ %s:%s is disconnected" % (
|
||||
server.name, server.host, server.port))
|
||||
logger.info("server %s @ %s:%s is disconnected", server.name, server.host, server.port)
|
||||
else:
|
||||
logger.info("forwarding message to server: %s - %s:\n%s",
|
||||
server.host, server.port, message)
|
||||
logger.info("forwarding message to server(%s): %s:%s", server.name, server.host, server.port)
|
||||
logger.debug("message being forwarded:\n%s", message)
|
||||
server.sock.send(message.raw_message)
|
||||
return handle_locally
|
||||
|
||||
|
@ -1045,7 +1040,7 @@ class CoreBroker(ConfigurableManager):
|
|||
lhost, lport = server.sock.getsockname()
|
||||
f.write("%s %s %s %s %s\n" % (server.name, server.host, server.port, lhost, lport))
|
||||
except IOError:
|
||||
logger.exception("error writing server list to the file: %s" % filename)
|
||||
logger.exception("error writing server list to the file: %s", filename)
|
||||
|
||||
def writenodeserver(self, nodestr, server):
|
||||
"""
|
||||
|
@ -1072,7 +1067,7 @@ class CoreBroker(ConfigurableManager):
|
|||
with open(filename, "w") as f:
|
||||
f.write("%s\n%s\n" % (serverstr, nodestr))
|
||||
except IOError:
|
||||
logger.exception("error writing server file %s for node %s" % (filename, name))
|
||||
logger.exception("error writing server file %s for node %s", filename, name)
|
||||
|
||||
def local_instantiation_complete(self):
|
||||
"""
|
||||
|
@ -1126,9 +1121,9 @@ class CoreBroker(ConfigurableManager):
|
|||
if values_str is None:
|
||||
return
|
||||
|
||||
value_strings = values_str.split('|')
|
||||
value_strings = values_str.split("|")
|
||||
for value_string in value_strings:
|
||||
key, value = value_string.split('=', 1)
|
||||
key, value = value_string.split("=", 1)
|
||||
if key == "controlnet":
|
||||
self.handle_distributed_control_net(message, value_strings, value_strings.index(value_string))
|
||||
|
||||
|
@ -1144,7 +1139,7 @@ class CoreBroker(ConfigurableManager):
|
|||
:return: nothing
|
||||
"""
|
||||
key_value = values[index]
|
||||
key, value = key_value.split('=', 1)
|
||||
key, value = key_value.split("=", 1)
|
||||
control_nets = value.split()
|
||||
|
||||
if len(control_nets) < 2:
|
||||
|
|
|
@ -891,6 +891,10 @@ def get_endpoint(network_object, interface_object):
|
|||
ep = None
|
||||
l2devport = None
|
||||
|
||||
# skip if either are none
|
||||
if not network_object or not interface_object:
|
||||
return ep
|
||||
|
||||
# if ifcObj references an interface of a node and is part of this network
|
||||
if interface_object.net.objid == network_object.objid and hasattr(interface_object,
|
||||
'node') and interface_object.node:
|
||||
|
@ -957,7 +961,7 @@ def get_endpoints(network_object):
|
|||
if ep is not None:
|
||||
endpoints.append(ep)
|
||||
except:
|
||||
logger.exception("error geting endpoints, was skipped before")
|
||||
logger.debug("error geting endpoints, was skipped before")
|
||||
|
||||
return endpoints
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ def cored(cfg, use_ovs):
|
|||
sys.exit(1)
|
||||
|
||||
close_onexec(server.fileno())
|
||||
logger.debug("main server started, listening on: %s:%s", host, port)
|
||||
logger.info("server started, listening on: %s:%s", host, port)
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue