added various log messages to help with correcting issues, added some for adding tunnels that fixes p2p with distributed core, but teardown for p2p is still broken

This commit is contained in:
Blake J. Harnden 2017-07-31 09:08:57 -07:00
parent e2a932698c
commit 350341cda7
6 changed files with 56 additions and 13 deletions

View file

@ -159,6 +159,7 @@ class CoreBroker(ConfigurableManager):
"""
Reset to initial state.
"""
logger.info("broker reset")
self.nodemap_lock.acquire()
self.nodemap.clear()
for server, count in self.nodecounts.iteritems():
@ -178,11 +179,13 @@ class CoreBroker(ConfigurableManager):
Spawn the receive loop for receiving messages.
"""
if self.recvthread is not None:
logger.info("server receive loop already started")
if self.recvthread.isAlive():
return
else:
self.recvthread.join()
# start reading data from connected sockets
logger.info("starting server receive loop")
self.dorecvloop = True
self.recvthread = threading.Thread(target=self.recvloop)
self.recvthread.daemon = True
@ -205,6 +208,8 @@ class CoreBroker(ConfigurableManager):
r, w, x = select.select(rlist, [], [], 1.0)
for sock in r:
server = self.getserverbysock(sock)
logger.info("attempting to receive from server: peer:%s remote:%s",
server.sock.getpeername(), server.sock.getsockname())
if server is None:
# servers may have changed; loop again
continue
@ -227,6 +232,7 @@ class CoreBroker(ConfigurableManager):
msghdr = server.sock.recv(coreapi.CoreMessage.header_len)
if len(msghdr) == 0:
# server disconnected
logger.info("server disconnected, closing server")
server.close()
return 0
@ -238,9 +244,11 @@ class CoreBroker(ConfigurableManager):
msgdata = server.sock.recv(msglen)
data = msghdr + msgdata
count = None
logger.info("received message type: %s", MessageTypes(msgtype))
# snoop exec response for remote interactive TTYs
if msgtype == MessageTypes.EXECUTE.value and msgflags & MessageFlags.TTY.value:
data = self.fixupremotetty(msghdr, msgdata, server.host)
logger.info("created remote tty message: %s", data)
elif msgtype == MessageTypes.NODE.value:
# snoop node delete response to decrement node counts
if msgflags & MessageFlags.DELETE.value:
@ -295,7 +303,7 @@ class CoreBroker(ConfigurableManager):
server.close()
del self.servers[name]
logger.info("adding server %s @ %s:%s" % (name, host, port))
logger.info("adding server: %s @ %s:%s" % (name, host, port))
server = CoreServer(name, host, port)
if host is not None and port is not None:
try:
@ -403,11 +411,12 @@ class CoreBroker(ConfigurableManager):
remotenum = n1num
else:
remotenum = n2num
if key in self.tunnels.keys():
logger.warn("tunnel with key %s (%s-%s) already exists!" % (key, n1num, n2num))
else:
objid = key & ((1 << 16) - 1)
logger.info("Adding tunnel for %s-%s to %s with key %s", n1num, n2num, remoteip, key)
logger.info("adding tunnel for %s-%s to %s with key %s", n1num, n2num, remoteip, key)
if localnum in self.physical_nodes:
# no bridge is needed on physical nodes; use the GreTap directly
gt = GreTap(node=None, name=None, session=self.session,
@ -424,6 +433,7 @@ class CoreBroker(ConfigurableManager):
Add GreTaps between network devices on different machines.
The GreTapBridge is not used since that would add an extra bridge.
"""
logger.info("adding network tunnels for nodes: %s", self.network_nodes)
for n in self.network_nodes:
self.addnettunnel(n)
@ -460,13 +470,16 @@ class CoreBroker(ConfigurableManager):
for server in servers:
if server.host is None:
continue
logger.info("adding server host for net tunnel: %s", server.host)
hosts.append(server.host)
if len(hosts) == 0:
for session_client in self.session_clients:
# get IP address from API message sender (master)
if session_client.client_address != "":
hosts.append(session_client.client_address[0])
address = session_client.client_address[0]
logger.info("adding session_client host: %s", address)
hosts.append(address)
r = []
for host in hosts:
@ -478,8 +491,11 @@ class CoreBroker(ConfigurableManager):
myip = host
key = self.tunnelkey(node_id, IpAddress.to_int(myip))
if key in self.tunnels.keys():
logger.info("tunnel already exists, returning existing tunnel: %s", key)
gt = self.tunnels[key]
r.append(gt)
continue
logger.info("Adding tunnel for net %s to %s with key %s" % (node_id, host, key))
logger.info("adding tunnel for net %s to %s with key %s" % (node_id, host, key))
gt = GreTap(node=None, name=None, session=self.session, remoteip=host, key=key)
self.tunnels[key] = gt
r.append(gt)
@ -499,6 +515,7 @@ class CoreBroker(ConfigurableManager):
"""
key = self.tunnelkey(n1num, n2num)
try:
logger.info("deleting tunnel between %s - %s with key: %s", n1num, n2num, key)
gt = self.tunnels.pop(key)
except KeyError:
gt = None
@ -515,6 +532,7 @@ class CoreBroker(ConfigurableManager):
:return: gre tap between nodes or none
"""
key = self.tunnelkey(n1num, n2num)
logger.info("checking for tunnel(%s) in: %s", key, self.tunnels.keys())
if key in self.tunnels.keys():
return self.tunnels[key]
else:
@ -671,6 +689,7 @@ class CoreBroker(ConfigurableManager):
# (for e.g. configuring services)
if self.session.state == EventTypes.DEFINITION_STATE.value:
return False
# Decide whether message should be handled locally or forwarded, or both
if message.message_type == MessageTypes.NODE.value:
handle_locally, servers = self.handlenodemsg(message)
@ -691,6 +710,8 @@ class CoreBroker(ConfigurableManager):
if message.message_type == MessageTypes.LINK.value:
# prepare a server list from two node numbers in link message
handle_locally, servers, message = self.handlelinkmsg(message)
logger.info("broker handle link message: %s - %s", handle_locally,
map(lambda x: "%s:%s" % (x.host, x.port), servers))
elif len(servers) == 0:
# check for servers based on node numbers in all messages but link
nn = message.node_numbers()
@ -704,7 +725,7 @@ class CoreBroker(ConfigurableManager):
for handler in self.handlers:
handler(message)
# Perform any message forwarding
# perform any message forwarding
handle_locally |= self.forwardmsg(message, servers)
return not handle_locally
@ -848,6 +869,7 @@ class CoreBroker(ConfigurableManager):
# determine link message destination using non-network nodes
nn = message.node_numbers()
logger.info("checking link nodes (%s) with network nodes (%s)", nn, self.network_nodes)
if nn[0] in self.network_nodes:
if nn[1] in self.network_nodes:
# two network nodes linked together - prevent loops caused by
@ -858,8 +880,11 @@ class CoreBroker(ConfigurableManager):
elif nn[1] in self.network_nodes:
servers = self.getserversbynode(nn[0])
else:
logger.info("link nodes are not network nodes")
servers1 = self.getserversbynode(nn[0])
logger.info("servers for node(%s): %s", nn[0], servers1)
servers2 = self.getserversbynode(nn[1])
logger.info("servers for node(%s): %s", nn[1], servers2)
# nodes are on two different servers, build tunnels as needed
if servers1 != servers2:
localn = None
@ -887,6 +912,8 @@ class CoreBroker(ConfigurableManager):
localn = nn[1]
if host is None:
host = self.getlinkendpoint(message, localn == nn[0])
logger.info("handle locally(%s) and local node(%s)", handle_locally, localn)
if localn is None:
message = self.addlinkendpoints(message, servers1, servers2)
elif message.flags & MessageFlags.ADD.value:
@ -992,6 +1019,8 @@ class CoreBroker(ConfigurableManager):
logger.info("server %s @ %s:%s is disconnected" % (
server.name, server.host, server.port))
else:
logger.info("forwarding message to server: %s - %s:\n%s",
server.host, server.port, message)
server.sock.send(message.raw_message)
return handle_locally

View file

@ -441,7 +441,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
:return: nothing
"""
if self.session and self.session.broker.handle_message(message):
logger.info("%s forwarding message:\n%s", threading.currentThread().getName(), message)
logger.info("message not being handled locally")
return
logger.info("%s handling message:\n%s", threading.currentThread().getName(), message)
@ -484,7 +484,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
reply_message = "CoreMessage (type %d flags %d length %d)" % (
message_type, message_flags, message_length)
logger.info("%s: reply msg: \n%s", threading.currentThread().getName(), reply_message)
logger.info("reply to %s: \n%s", self.request.getpeername(), reply_message)
try:
self.sendall(reply)
@ -735,8 +735,11 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
unidirectional = False
# one of the nodes may exist on a remote server
logger.info("link message between node1(%s:%s) and node2(%s:%s)",
node_num1, interface_index1, node_num2, interface_index2)
if node_num1 is not None and node_num2 is not None:
tunnel = self.session.broker.gettunnel(node_num1, node_num2)
logger.info("tunnel between nodes: %s", tunnel)
if isinstance(tunnel, coreobj.PyCoreNet):
net = tunnel
if tunnel.remotenum == node_num1:
@ -840,7 +843,6 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
key = message.get_tlv(LinkTlvs.KEY.value)
netaddrlist = []
# print " n1=%s n2=%s net=%s net2=%s" % (node1, node2, net, net2)
if node1 and net:
addrlist = []
if ipv41 is not None and ipv4_mask1 is not None:

View file

@ -81,7 +81,7 @@ class EbtablesQueue(object):
try:
del self.last_update_time[wlan]
except KeyError:
logger.exception("error deleting last update time for wlan: %s", wlan)
logger.exception("error deleting last update time for wlan, ignored before: %s", wlan)
self.updatelock.release()
if len(self.last_update_time) > 0:

View file

@ -169,7 +169,11 @@ class PhysicalNode(PyCoreNode):
self.ifindex += 1
return ifindex
def newnetif(self, net=None, addrlist=[], hwaddr=None, ifindex=None, ifname=None):
def newnetif(self, net=None, addrlist=None, hwaddr=None, ifindex=None, ifname=None):
logger.info("creating interface")
if not addrlist:
addrlist = []
if self.up and net is None:
raise NotImplementedError
@ -229,7 +233,7 @@ class PhysicalNode(PyCoreNode):
def opennodefile(self, filename, mode="w"):
dirname, basename = os.path.split(filename)
if not basename:
raise ValueError, "no basename for filename: " + filename
raise ValueError("no basename for filename: " + filename)
if dirname and dirname[0] == "/":
dirname = dirname[1:]
dirname = dirname.replace("/", ".")

View file

@ -26,7 +26,7 @@ from core.data import EventData
from core.data import ExceptionData
from core.data import FileData
from core.emane.emanemanager import EmaneManager
from core.enumerations import ConfigDataTypes
from core.enumerations import ConfigDataTypes, EventTlvs
from core.enumerations import ConfigFlags
from core.enumerations import ConfigTlvs
from core.enumerations import EventTypes
@ -830,6 +830,10 @@ class Session(object):
# set broker local instantiation to complete
self.broker.local_instantiation_complete()
# notify listeners that instantiation is complete
event = EventData(event_type=EventTypes.INSTANTIATION_COMPLETE.value)
self.broadcast_event(event)
# assume either all nodes have booted already, or there are some
# nodes on slave servers that will be booted and those servers will
# send a node status response message
@ -935,9 +939,13 @@ class Session(object):
# TODO: determine instance type we need to check, due to method issue below
if isinstance(obj, nodes.PyCoreNode) and not nodeutils.is_node(obj, NodeTypes.RJ45):
# add a control interface if configured
logger.info("booting node: %s - %s", obj.objid, obj.name)
self.add_remove_control_interface(node=obj, remove=False)
obj.boot()
# TODO(blake): send node emu ids back
# self.sendnodeemuid(obj.objid)
self.update_control_interface_hosts()
def validate_nodes(self):

View file

@ -962,7 +962,7 @@ def get_endpoints(network_object):
if ep is not None:
endpoints.append(ep)
except:
logger.exception("error geting enpoints")
logger.exception("error geting endpoints, was skipped before")
return endpoints