initial effort to try and fix multiple clients to behave as before, includes fixes for guaranteed message ordering of nodes before links when joining a session

This commit is contained in:
Blake J. Harnden 2017-07-11 08:55:15 -07:00
parent 673fdc94b5
commit 777e19de93
4 changed files with 84 additions and 59 deletions

View file

@ -39,7 +39,7 @@ logger = log.get_logger(__name__)
# TODO: name conflict with main core server, probably should rename
class CoreServer(object):
"""
Reptesents CORE daemon servers for communication.
Represents CORE daemon servers for communication.
"""
def __init__(self, name, host, port):
"""
@ -104,7 +104,7 @@ class CoreBroker(ConfigurableManager):
ConfigurableManager.__init__(self)
self.session = session
self.session_handler = None
self.session_clients = []
self.session_id_master = None
self.myip = None
# dict containing tuples of (host, port, sock)
@ -263,7 +263,8 @@ class CoreBroker(ConfigurableManager):
logger.error("unknown message type received: %s", msgtype)
try:
self.session_handler.sendall(data)
for session_client in self.session_clients:
session_client.sendall(data)
except IOError:
logger.exception("error sending message")
@ -460,9 +461,11 @@ class CoreBroker(ConfigurableManager):
continue
hosts.append(server.host)
if len(hosts) == 0 and self.session_handler.client_address != "":
# get IP address from API message sender (master)
hosts.append(self.session_handler.client_address[0])
if len(hosts) == 0:
for session_client in self.session_clients:
# get IP address from API message sender (master)
if session_client.client_address != "":
hosts.append(session_client.client_address[0])
r = []
for host in hosts:
@ -942,9 +945,14 @@ class CoreBroker(ConfigurableManager):
host = opaque.split(":")[0]
if host == "":
host = None
if host is None and self.session_handler.client_address != "":
# get IP address from API message sender (master)
host = self.session_handler.client_address[0]
if host is None:
for session_client in self.session_clients:
# get IP address from API message sender (master)
if session_client.client_address != "":
host = session_client.client_address[0]
break
return host
def handlerawmsg(self, msg):
@ -1049,11 +1057,12 @@ class CoreBroker(ConfigurableManager):
if server is not None:
server.instantiation_complete = True
if self.session_handler:
tlvdata = ""
tlvdata += coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, EventTypes.INSTANTIATION_COMPLETE.value)
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
self.session_handler.sendall(msg)
# broadcast out instantiate complete
tlvdata = ""
tlvdata += coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, EventTypes.INSTANTIATION_COMPLETE.value)
message = coreapi.CoreEventMessage.pack(0, tlvdata)
for session_client in self.session_clients:
session_client.sendall(message)
def instantiation_complete(self):
"""

View file

@ -133,7 +133,12 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
self.session.exception_handlers.remove(self.handle_broadcast_exception)
self.session.node_handlers.remove(self.handle_broadcast_node)
self.session.link_handlers.remove(self.handle_broadcast_link)
self.session.shutdown()
self.session.file_handlers.remove(self.handle_broadcast_file)
# remove client from session broker and shutdown if there are no clients
self.session.broker.session_clients.remove(self)
if not self.session.broker.session_clients:
self.session.shutdown()
return SocketServer.BaseRequestHandler.finish(self)
@ -505,8 +510,10 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
# TODO: hack to associate this handler with this sessions broker for broadcasting
# TODO: broker needs to be pulled out of session to the server/handler level
self.session.broker.session_handler = self
# self.session.connect(self)
if self.master:
logger.info("SESSION SET TO MASTER!")
self.session.master = True
self.session.broker.session_clients.append(self)
# add handlers for various data
logger.info("adding session broadcast handlers")
@ -1178,6 +1185,13 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
# TODO: need to replicate functionality?
# self.server.set_session_master(self)
# find the session containing this client and set the session to master
for session in self.server.sessions.itervalues():
if self in session.broker.session_clients:
logger.info("SESSION SET TO MASTER!: %s", session.session_id)
session.master = True
break
replies.append(self.register())
replies.append(self.server.to_session_message())
@ -1460,18 +1474,35 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
logger.info("session %s not found (flags=0x%x)", session_id, message.flags)
continue
if session.server is None:
# this needs to be set when executing a Python script
session.server = self.server
if message.flags & MessageFlags.ADD.value:
# connect to the first session that exists
logger.info("request to connect to session %s" % session_id)
# this may shutdown the session if no handlers exist
# TODO: determine what we want to do here
self.session.disconnect(self)
# remove client from session broker and shutdown if needed
self.session.broker.session_clients.remove(self)
active_states = [
EventTypes.RUNTIME_STATE.value,
EventTypes.RUNTIME_STATE.value,
EventTypes.DATACOLLECT_STATE.value
]
if not self.session.broker.session_clients and self.session.state not in active_states:
self.session.shutdown()
# set session to join
self.session = session
self.session.connect(self)
# add client to session broker and set master if needed
if self.master:
self.session.master = True
self.session.broker.session_clients.append(self)
# add broadcast handlers
logger.info("adding session broadcast handlers")
self.session.event_handlers.append(self.handle_broadcast_event)
self.session.exception_handlers.append(self.handle_broadcast_exception)
self.session.node_handlers.append(self.handle_broadcast_node)
self.session.link_handlers.append(self.handle_broadcast_link)
self.session.file_handlers.append(self.handle_broadcast_file)
if user is not None:
self.session.set_user(user)
@ -1716,6 +1747,7 @@ class BaseAuxRequestHandler(CoreRequestHandler):
self.session.exception_handlers.remove(self.handle_broadcast_exception)
self.session.node_handlers.remove(self.handle_broadcast_node)
self.session.link_handlers.remove(self.handle_broadcast_link)
self.session.file_handlers.remove(self.handle_broadcast_file)
self.session.shutdown()
return SocketServer.BaseRequestHandler.finish(self)

View file

@ -281,12 +281,7 @@ class MobilityManager(ConfigurableManager):
:param net: network to install
:return: nothing
"""
try:
nodenums = self.physnets[net.objid]
except KeyError:
logger.exception("error retriving physical net object")
return
nodenums = self.physnets.get(net.objid, [])
for nodenum in nodenums:
node = self.phys[nodenum]
# TODO: fix this bad logic, relating to depending on a break to get a valid server

View file

@ -1256,24 +1256,27 @@ class Session(object):
"""
Return API messages that describe the current session.
"""
# send node messages for node and network objects
# send link messages from net objects
number_nodes = 0
number_links = 0
# find all nodes and links
nodes_data = []
links_data = []
with self._objects_lock:
for obj in self.objects.itervalues():
node_data = obj.data(message_type=MessageFlags.ADD.value)
if node_data:
self.broadcast_node(node_data)
# replies.append(message)
number_nodes += 1
nodes_data.append(node_data)
links_data = obj.all_link_data(flags=MessageFlags.ADD.value)
for link_data in links_data:
self.broadcast_link(link_data)
# replies.append(link_data)
number_links += 1
node_links = obj.all_link_data(flags=MessageFlags.ADD.value)
for link_data in node_links:
links_data.append(link_data)
# send all nodes first, so that they will exist for any links
logger.info("nodes: %s", nodes_data)
logger.info("links: %s", links_data)
for node_data in nodes_data:
self.broadcast_node(node_data)
for link_data in links_data:
self.broadcast_link(link_data)
# send model info
configs = self.mobility.getallconfigs()
@ -1295,18 +1298,10 @@ class Session(object):
node=node_number,
opaque=opaque
)
# replies.append(self.services.configure_request(config_data))
config_response = self.services.configure_request(config_data)
self.broadcast_config(config_response)
for file_name, config_data in self.services.getallfiles(service):
# flags = MessageFlags.ADD.value
# tlv_data = coreapi.CoreFileTlv.pack(FileTlvs.NODE.value, node_number)
# tlv_data += coreapi.CoreFileTlv.pack(FileTlvs.NAME.value, str(file_name))
# tlv_data += coreapi.CoreFileTlv.pack(FileTlvs.TYPE.value, opaque)
# tlv_data += coreapi.CoreFileTlv.pack(FileTlvs.DATA.value, str(config_data))
# replies.append(coreapi.CoreFileMessage.pack(flags, tlv_data))
file_data = FileData(
message_type=MessageFlags.ADD.value,
node=node_number,
@ -1321,12 +1316,6 @@ class Session(object):
# send hook scripts
for state in sorted(self._hooks.keys()):
for file_name, config_data in self._hooks[state]:
# flags = MessageFlags.ADD.value
# tlv_data = coreapi.CoreFileTlv.pack(FileTlvs.NAME.value, str(file_name))
# tlv_data += coreapi.CoreFileTlv.pack(FileTlvs.TYPE.value, "hook:%s" % state)
# tlv_data += coreapi.CoreFileTlv.pack(FileTlvs.DATA.value, str(config_data))
# replies.append(coreapi.CoreFileMessage.pack(flags, tlv_data))
file_data = FileData(
message_type=MessageFlags.ADD.value,
name=str(file_name),
@ -1345,7 +1334,7 @@ class Session(object):
metadata_config = self.metadata.configure_request(config_data, type_flags=ConfigFlags.UPDATE.value)
self.broadcast_config(metadata_config)
logger.info("informing GUI about %d nodes and %d links", number_nodes, number_links)
logger.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data))
class SessionConfig(ConfigurableManager, Configurable):