updated logging usage, to use the library directly and avoid imposing a core config by default, allowing users of the core library to configure logging as desired
This commit is contained in:
parent
37f747c212
commit
7aee2b2ba7
44 changed files with 552 additions and 527 deletions
|
@ -4,12 +4,12 @@ other emulation servers. The broker is consulted when handling messages to deter
|
|||
should be handled locally or forwarded on to another emulation server.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import select
|
||||
import socket
|
||||
import threading
|
||||
|
||||
from core import logger
|
||||
from core.api import coreapi
|
||||
from core.coreobj import PyCoreNet
|
||||
from core.coreobj import PyCoreNode
|
||||
|
@ -148,7 +148,7 @@ class CoreBroker(object):
|
|||
while len(self.servers) > 0:
|
||||
name, server = self.servers.popitem()
|
||||
if server.sock is not None:
|
||||
logger.info("closing connection with %s: %s:%s", name, server.host, server.port)
|
||||
logging.info("closing connection with %s: %s:%s", name, server.host, server.port)
|
||||
server.close()
|
||||
self.dorecvloop = False
|
||||
if self.recvthread is not None:
|
||||
|
@ -158,7 +158,7 @@ class CoreBroker(object):
|
|||
"""
|
||||
Reset to initial state.
|
||||
"""
|
||||
logger.info("clearing state")
|
||||
logging.info("clearing state")
|
||||
self.nodemap_lock.acquire()
|
||||
self.nodemap.clear()
|
||||
for server, count in self.nodecounts.iteritems():
|
||||
|
@ -178,13 +178,13 @@ class CoreBroker(object):
|
|||
Spawn the receive loop for receiving messages.
|
||||
"""
|
||||
if self.recvthread is not None:
|
||||
logger.info("server receive loop already started")
|
||||
logging.info("server receive loop already started")
|
||||
if self.recvthread.isAlive():
|
||||
return
|
||||
else:
|
||||
self.recvthread.join()
|
||||
# start reading data from connected sockets
|
||||
logger.info("starting server receive loop")
|
||||
logging.info("starting server receive loop")
|
||||
self.dorecvloop = True
|
||||
self.recvthread = threading.Thread(target=self.recvloop)
|
||||
self.recvthread.daemon = True
|
||||
|
@ -207,14 +207,14 @@ class CoreBroker(object):
|
|||
r, _w, _x = select.select(rlist, [], [], 1.0)
|
||||
for sock in r:
|
||||
server = self.getserverbysock(sock)
|
||||
logger.info("attempting to receive from server: peer:%s remote:%s",
|
||||
logging.info("attempting to receive from server: peer:%s remote:%s",
|
||||
server.sock.getpeername(), server.sock.getsockname())
|
||||
if server is None:
|
||||
# servers may have changed; loop again
|
||||
continue
|
||||
rcvlen = self.recv(server)
|
||||
if rcvlen == 0:
|
||||
logger.info("connection with server(%s) closed: %s:%s", server.name, server.host, server.port)
|
||||
logging.info("connection with server(%s) closed: %s:%s", server.name, server.host, server.port)
|
||||
|
||||
def recv(self, server):
|
||||
"""
|
||||
|
@ -230,23 +230,23 @@ class CoreBroker(object):
|
|||
msghdr = server.sock.recv(coreapi.CoreMessage.header_len)
|
||||
if len(msghdr) == 0:
|
||||
# server disconnected
|
||||
logger.info("server disconnected, closing server")
|
||||
logging.info("server disconnected, closing server")
|
||||
server.close()
|
||||
return 0
|
||||
|
||||
if len(msghdr) != coreapi.CoreMessage.header_len:
|
||||
logger.warn("warning: broker received not enough data len=%s", len(msghdr))
|
||||
logging.warn("warning: broker received not enough data len=%s", len(msghdr))
|
||||
return len(msghdr)
|
||||
|
||||
msgtype, msgflags, msglen = coreapi.CoreMessage.unpack_header(msghdr)
|
||||
msgdata = server.sock.recv(msglen)
|
||||
data = msghdr + msgdata
|
||||
count = None
|
||||
logger.debug("received message type: %s", MessageTypes(msgtype))
|
||||
logging.debug("received message type: %s", MessageTypes(msgtype))
|
||||
# snoop exec response for remote interactive TTYs
|
||||
if msgtype == MessageTypes.EXECUTE.value and msgflags & MessageFlags.TTY.value:
|
||||
data = self.fixupremotetty(msghdr, msgdata, server.host)
|
||||
logger.debug("created remote tty message: %s", data)
|
||||
logging.debug("created remote tty message: %s", data)
|
||||
elif msgtype == MessageTypes.NODE.value:
|
||||
# snoop node delete response to decrement node counts
|
||||
if msgflags & MessageFlags.DELETE.value:
|
||||
|
@ -266,13 +266,13 @@ class CoreBroker(object):
|
|||
if self.instantiation_complete():
|
||||
self.session.check_runtime()
|
||||
else:
|
||||
logger.error("unknown message type received: %s", msgtype)
|
||||
logging.error("unknown message type received: %s", msgtype)
|
||||
|
||||
try:
|
||||
for session_client in self.session_clients:
|
||||
session_client.sendall(data)
|
||||
except IOError:
|
||||
logger.exception("error sending message")
|
||||
logging.exception("error sending message")
|
||||
|
||||
if count is not None and count < 1:
|
||||
return 0
|
||||
|
@ -296,17 +296,17 @@ class CoreBroker(object):
|
|||
# leave this socket connected
|
||||
return
|
||||
|
||||
logger.info("closing connection with %s @ %s:%s", name, server.host, server.port)
|
||||
logging.info("closing connection with %s @ %s:%s", name, server.host, server.port)
|
||||
server.close()
|
||||
del self.servers[name]
|
||||
|
||||
logger.info("adding broker server(%s): %s:%s", name, host, port)
|
||||
logging.info("adding broker server(%s): %s:%s", name, host, port)
|
||||
server = CoreDistributedServer(name, host, port)
|
||||
if host is not None and port is not None:
|
||||
try:
|
||||
server.connect()
|
||||
except IOError:
|
||||
logger.exception("error connecting to server(%s): %s:%s", name, host, port)
|
||||
logging.exception("error connecting to server(%s): %s:%s", name, host, port)
|
||||
if server.sock is not None:
|
||||
self.startrecvloop()
|
||||
self.servers[name] = server
|
||||
|
@ -324,10 +324,10 @@ class CoreBroker(object):
|
|||
if s != server:
|
||||
raise ValueError("server removed was not the server provided")
|
||||
except KeyError:
|
||||
logger.exception("error deleting server")
|
||||
logging.exception("error deleting server")
|
||||
|
||||
if server.sock is not None:
|
||||
logger.info("closing connection with %s @ %s:%s", server.name, server.host, server.port)
|
||||
logging.info("closing connection with %s @ %s:%s", server.name, server.host, server.port)
|
||||
server.close()
|
||||
|
||||
def getserverbyname(self, name):
|
||||
|
@ -411,10 +411,10 @@ class CoreBroker(object):
|
|||
remotenum = n2num
|
||||
|
||||
if key in self.tunnels.keys():
|
||||
logger.warn("tunnel with key %s (%s-%s) already exists!", key, n1num, n2num)
|
||||
logging.warn("tunnel with key %s (%s-%s) already exists!", key, n1num, n2num)
|
||||
else:
|
||||
objid = key & ((1 << 16) - 1)
|
||||
logger.info("adding tunnel for %s-%s to %s with key %s", n1num, n2num, remoteip, key)
|
||||
logging.info("adding tunnel for %s-%s to %s with key %s", n1num, n2num, remoteip, key)
|
||||
if localnum in self.physical_nodes:
|
||||
# no bridge is needed on physical nodes; use the GreTap directly
|
||||
gt = GreTap(node=None, name=None, session=self.session,
|
||||
|
@ -431,7 +431,7 @@ class CoreBroker(object):
|
|||
Add GreTaps between network devices on different machines.
|
||||
The GreTapBridge is not used since that would add an extra bridge.
|
||||
"""
|
||||
logger.debug("adding network tunnels for nodes: %s", self.network_nodes)
|
||||
logging.debug("adding network tunnels for nodes: %s", self.network_nodes)
|
||||
for n in self.network_nodes:
|
||||
self.addnettunnel(n)
|
||||
|
||||
|
@ -445,30 +445,30 @@ class CoreBroker(object):
|
|||
"""
|
||||
try:
|
||||
net = self.session.get_object(node_id)
|
||||
logger.info("adding net tunnel for: id(%s) %s", node_id, net)
|
||||
logging.info("adding net tunnel for: id(%s) %s", node_id, net)
|
||||
except KeyError:
|
||||
raise KeyError("network node %s not found" % node_id)
|
||||
|
||||
# add other nets here that do not require tunnels
|
||||
if nodeutils.is_node(net, NodeTypes.EMANE_NET):
|
||||
logger.warn("emane network does not require a tunnel")
|
||||
logging.warn("emane network does not require a tunnel")
|
||||
return None
|
||||
|
||||
server_interface = getattr(net, "serverintf", None)
|
||||
if nodeutils.is_node(net, NodeTypes.CONTROL_NET) and server_interface is not None:
|
||||
logger.warn("control networks with server interfaces do not need a tunnel")
|
||||
logging.warn("control networks with server interfaces do not need a tunnel")
|
||||
return None
|
||||
|
||||
servers = self.getserversbynode(node_id)
|
||||
if len(servers) < 2:
|
||||
logger.warn("not enough servers to create a tunnel: %s", servers)
|
||||
logging.warn("not enough servers to create a tunnel: %s", servers)
|
||||
return None
|
||||
|
||||
hosts = []
|
||||
for server in servers:
|
||||
if server.host is None:
|
||||
continue
|
||||
logger.info("adding server host for net tunnel: %s", server.host)
|
||||
logging.info("adding server host for net tunnel: %s", server.host)
|
||||
hosts.append(server.host)
|
||||
|
||||
if len(hosts) == 0:
|
||||
|
@ -476,7 +476,7 @@ class CoreBroker(object):
|
|||
# get IP address from API message sender (master)
|
||||
if session_client.client_address != "":
|
||||
address = session_client.client_address[0]
|
||||
logger.info("adding session_client host: %s", address)
|
||||
logging.info("adding session_client host: %s", address)
|
||||
hosts.append(address)
|
||||
|
||||
r = []
|
||||
|
@ -489,11 +489,11 @@ class CoreBroker(object):
|
|||
myip = host
|
||||
key = self.tunnelkey(node_id, IpAddress.to_int(myip))
|
||||
if key in self.tunnels.keys():
|
||||
logger.info("tunnel already exists, returning existing tunnel: %s", key)
|
||||
logging.info("tunnel already exists, returning existing tunnel: %s", key)
|
||||
gt = self.tunnels[key]
|
||||
r.append(gt)
|
||||
continue
|
||||
logger.info("adding tunnel for net %s to %s with key %s", node_id, host, key)
|
||||
logging.info("adding tunnel for net %s to %s with key %s", node_id, host, key)
|
||||
gt = GreTap(node=None, name=None, session=self.session, remoteip=host, key=key)
|
||||
self.tunnels[key] = gt
|
||||
r.append(gt)
|
||||
|
@ -513,7 +513,7 @@ class CoreBroker(object):
|
|||
"""
|
||||
key = self.tunnelkey(n1num, n2num)
|
||||
try:
|
||||
logger.info("deleting tunnel between %s - %s with key: %s", n1num, n2num, key)
|
||||
logging.info("deleting tunnel between %s - %s with key: %s", n1num, n2num, key)
|
||||
gt = self.tunnels.pop(key)
|
||||
except KeyError:
|
||||
gt = None
|
||||
|
@ -530,7 +530,7 @@ class CoreBroker(object):
|
|||
:return: gre tap between nodes or none
|
||||
"""
|
||||
key = self.tunnelkey(n1num, n2num)
|
||||
logger.debug("checking for tunnel(%s) in: %s", key, self.tunnels.keys())
|
||||
logging.debug("checking for tunnel(%s) in: %s", key, self.tunnels.keys())
|
||||
if key in self.tunnels.keys():
|
||||
return self.tunnels[key]
|
||||
else:
|
||||
|
@ -600,9 +600,9 @@ class CoreBroker(object):
|
|||
:param int nodenum: node id to add
|
||||
:return: nothing
|
||||
"""
|
||||
logger.info("adding net to broker: %s", nodenum)
|
||||
logging.info("adding net to broker: %s", nodenum)
|
||||
self.network_nodes.add(nodenum)
|
||||
logger.info("broker network nodes: %s", self.network_nodes)
|
||||
logging.info("broker network nodes: %s", self.network_nodes)
|
||||
|
||||
def addphys(self, nodenum):
|
||||
"""
|
||||
|
@ -677,11 +677,11 @@ class CoreBroker(object):
|
|||
"""
|
||||
server = self.getserverbyname(servername)
|
||||
if server is None:
|
||||
logger.warn("ignoring unknown server: %s", servername)
|
||||
logging.warn("ignoring unknown server: %s", servername)
|
||||
return
|
||||
|
||||
if server.sock is None or server.host is None or server.port is None:
|
||||
logger.info("ignoring disconnected server: %s", servername)
|
||||
logging.info("ignoring disconnected server: %s", servername)
|
||||
return
|
||||
|
||||
# communicate this session"s current state to the server
|
||||
|
@ -753,10 +753,10 @@ class CoreBroker(object):
|
|||
try:
|
||||
nodecls = nodeutils.get_node_class(NodeTypes(nodetype))
|
||||
except KeyError:
|
||||
logger.warn("broker invalid node type %s", nodetype)
|
||||
logging.warn("broker invalid node type %s", nodetype)
|
||||
return handle_locally, servers
|
||||
if nodecls is None:
|
||||
logger.warn("broker unimplemented node type %s", nodetype)
|
||||
logging.warn("broker unimplemented node type %s", nodetype)
|
||||
return handle_locally, servers
|
||||
if issubclass(nodecls, PyCoreNet) and nodetype != NodeTypes.WIRELESS_LAN.value:
|
||||
# network node replicated on all servers; could be optimized
|
||||
|
@ -808,7 +808,7 @@ class CoreBroker(object):
|
|||
|
||||
# determine link message destination using non-network nodes
|
||||
nn = message.node_numbers()
|
||||
logger.debug("checking link nodes (%s) with network nodes (%s)", nn, self.network_nodes)
|
||||
logging.debug("checking link nodes (%s) with network nodes (%s)", nn, self.network_nodes)
|
||||
if nn[0] in self.network_nodes:
|
||||
if nn[1] in self.network_nodes:
|
||||
# two network nodes linked together - prevent loops caused by
|
||||
|
@ -819,11 +819,11 @@ class CoreBroker(object):
|
|||
elif nn[1] in self.network_nodes:
|
||||
servers = self.getserversbynode(nn[0])
|
||||
else:
|
||||
logger.debug("link nodes are not network nodes")
|
||||
logging.debug("link nodes are not network nodes")
|
||||
servers1 = self.getserversbynode(nn[0])
|
||||
logger.debug("servers for node(%s): %s", nn[0], servers1)
|
||||
logging.debug("servers for node(%s): %s", nn[0], servers1)
|
||||
servers2 = self.getserversbynode(nn[1])
|
||||
logger.debug("servers for node(%s): %s", nn[1], servers2)
|
||||
logging.debug("servers for node(%s): %s", nn[1], servers2)
|
||||
# nodes are on two different servers, build tunnels as needed
|
||||
if servers1 != servers2:
|
||||
localn = None
|
||||
|
@ -852,7 +852,7 @@ class CoreBroker(object):
|
|||
if host is None:
|
||||
host = self.getlinkendpoint(message, localn == nn[0])
|
||||
|
||||
logger.debug("handle locally(%s) and local node(%s)", handle_locally, localn)
|
||||
logging.debug("handle locally(%s) and local node(%s)", handle_locally, localn)
|
||||
if localn is None:
|
||||
message = self.addlinkendpoints(message, servers1, servers2)
|
||||
elif message.flags & MessageFlags.ADD.value:
|
||||
|
@ -955,10 +955,10 @@ class CoreBroker(object):
|
|||
# local emulation server, handle this locally
|
||||
handle_locally = True
|
||||
elif server.sock is None:
|
||||
logger.info("server %s @ %s:%s is disconnected", server.name, server.host, server.port)
|
||||
logging.info("server %s @ %s:%s is disconnected", server.name, server.host, server.port)
|
||||
else:
|
||||
logger.info("forwarding message to server(%s): %s:%s", server.name, server.host, server.port)
|
||||
logger.debug("message being forwarded:\n%s", message)
|
||||
logging.info("forwarding message to server(%s): %s:%s", server.name, server.host, server.port)
|
||||
logging.debug("message being forwarded:\n%s", message)
|
||||
server.sock.send(message.raw_message)
|
||||
return handle_locally
|
||||
|
||||
|
@ -986,7 +986,7 @@ class CoreBroker(object):
|
|||
lhost, lport = server.sock.getsockname()
|
||||
f.write("%s %s %s %s %s\n" % (server.name, server.host, server.port, lhost, lport))
|
||||
except IOError:
|
||||
logger.exception("error writing server list to the file: %s", filename)
|
||||
logging.exception("error writing server list to the file: %s", filename)
|
||||
|
||||
def writenodeserver(self, nodestr, server):
|
||||
"""
|
||||
|
@ -1007,13 +1007,13 @@ class CoreBroker(object):
|
|||
os.makedirs(dirname)
|
||||
except OSError:
|
||||
# directory may already exist from previous distributed run
|
||||
logger.exception("error creating directory: %s", dirname)
|
||||
logging.exception("error creating directory: %s", dirname)
|
||||
|
||||
try:
|
||||
with open(filename, "w") as f:
|
||||
f.write("%s\n%s\n" % (serverstr, nodestr))
|
||||
except IOError:
|
||||
logger.exception("error writing server file %s for node %s", filename, name)
|
||||
logging.exception("error writing server file %s for node %s", filename, name)
|
||||
|
||||
def local_instantiation_complete(self):
|
||||
"""
|
||||
|
@ -1089,12 +1089,12 @@ class CoreBroker(object):
|
|||
control_nets = value.split()
|
||||
|
||||
if len(control_nets) < 2:
|
||||
logger.warn("multiple controlnet prefixes do not exist")
|
||||
logging.warn("multiple controlnet prefixes do not exist")
|
||||
return
|
||||
|
||||
servers = self.session.broker.getservernames()
|
||||
if len(servers) < 2:
|
||||
logger.warn("not distributed")
|
||||
logging.warn("not distributed")
|
||||
return
|
||||
|
||||
servers.remove("localhost")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue