Merge branch 'enhancement/distributed-flask' of https://github.com/coreemu/core into enhancement/distributed-flask

This commit is contained in:
bharnden 2019-10-17 12:13:26 -07:00
commit 8a3183c8b3
31 changed files with 692 additions and 1965 deletions

File diff suppressed because it is too large Load diff

View file

@ -15,7 +15,6 @@ from core.api.tlv import structutils
from core.emulator.enumerations import (
ConfigTlvs,
EventTlvs,
EventTypes,
ExceptionTlvs,
ExecuteTlvs,
FileTlvs,
@ -1017,20 +1016,3 @@ def str_to_list(value):
return None
return value.split("|")
def state_name(value):
"""
Helper to convert state number into state name using event types.
:param int value: state value to derive name from
:return: state name
:rtype: str
"""
try:
value = EventTypes(value).name
except ValueError:
value = "unknown"
return value

View file

@ -86,6 +86,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
self.master = False
self.session = None
self.session_clients = {}
# core emulator
self.coreemu = server.coreemu
@ -138,8 +139,9 @@ class CoreHandler(socketserver.BaseRequestHandler):
if self.session:
# remove client from session broker and shutdown if there are no clients
self.remove_session_handlers()
self.session.broker.session_clients.remove(self)
if not self.session.broker.session_clients and not self.session.is_active():
clients = self.session_clients[self.session.id]
clients.remove(self)
if not clients and not self.session.is_active():
logging.info(
"no session clients left and not active, initiating shutdown"
)
@ -407,9 +409,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
tlv_data += coreapi.CoreRegisterTlv.pack(
RegisterTlvs.EMULATION_SERVER.value, "core-daemon"
)
tlv_data += coreapi.CoreRegisterTlv.pack(
self.session.broker.config_type, self.session.broker.name
)
tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.UTILITY.value, "broker")
tlv_data += coreapi.CoreRegisterTlv.pack(
self.session.location.config_type, self.session.location.name
)
@ -533,10 +533,6 @@ class CoreHandler(socketserver.BaseRequestHandler):
:param message: message to handle
:return: nothing
"""
if self.session and self.session.broker.handle_message(message):
logging.debug("message not being handled locally")
return
logging.debug(
"%s handling message:\n%s", threading.currentThread().getName(), message
)
@ -606,12 +602,11 @@ class CoreHandler(socketserver.BaseRequestHandler):
self.session = self.coreemu.create_session(port, master=False)
logging.debug("created new session for client: %s", self.session.id)
# TODO: hack to associate this handler with this sessions broker for broadcasting
# TODO: broker needs to be pulled out of session to the server/handler level
if self.master:
logging.debug("session set to master")
self.session.master = True
self.session.broker.session_clients.append(self)
clients = self.session_clients.setdefault(self.session.id, [])
clients.append(self)
# add handlers for various data
self.add_session_handlers()
@ -643,7 +638,8 @@ class CoreHandler(socketserver.BaseRequestHandler):
]:
continue
for client in self.session.broker.session_clients:
clients = self.session_clients[self.session.id]
for client in clients:
if client == self:
continue
@ -734,6 +730,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
node_options.icon = message.get_tlv(NodeTlvs.ICON.value)
node_options.canvas = message.get_tlv(NodeTlvs.CANVAS.value)
node_options.opaque = message.get_tlv(NodeTlvs.OPAQUE.value)
node_options.emulation_server = message.get_tlv(NodeTlvs.EMULATION_SERVER.value)
services = message.get_tlv(NodeTlvs.SERVICES.value)
if services:
@ -1027,8 +1024,9 @@ class CoreHandler(socketserver.BaseRequestHandler):
# find the session containing this client and set the session to master
for _id in self.coreemu.sessions:
session = self.coreemu.sessions[_id]
if self in session.broker.session_clients:
clients = self.session_clients[_id]
if self in clients:
session = self.coreemu.sessions[_id]
logging.debug("setting session to master: %s", session.id)
session.master = True
break
@ -1077,7 +1075,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
self.handle_config_location(message_type, config_data)
elif config_data.object == self.session.metadata.name:
replies = self.handle_config_metadata(message_type, config_data)
elif config_data.object == self.session.broker.name:
elif config_data.object == "broker":
self.handle_config_broker(message_type, config_data)
elif config_data.object == self.session.services.name:
replies = self.handle_config_services(message_type, config_data)
@ -1182,7 +1180,6 @@ class CoreHandler(socketserver.BaseRequestHandler):
def handle_config_broker(self, message_type, config_data):
if message_type not in [ConfigFlags.REQUEST, ConfigFlags.RESET]:
session_id = config_data.session
if not config_data.data_values:
logging.info("emulation server data missing")
else:
@ -1194,29 +1191,10 @@ class CoreHandler(socketserver.BaseRequestHandler):
for server in server_list:
server_items = server.split(":")
name, host, port = server_items[:3]
if host == "":
host = None
if port == "":
port = None
else:
port = int(port)
if session_id is not None:
# receive session ID and my IP from master
self.session.broker.session_id_master = int(
session_id.split("|")[0]
)
self.session.broker.myip = host
host = None
port = None
# this connects to the server immediately; maybe we should wait
# or spin off a new "client" thread here
self.session.broker.addserver(name, host, port)
self.session.broker.setupserver(name)
name, host, _ = server_items[:3]
self.session.distributed.add_server(name, host)
elif message_type == ConfigFlags.RESET:
self.session.distributed.shutdown()
def handle_config_services(self, message_type, config_data):
replies = []
@ -1842,11 +1820,9 @@ class CoreHandler(socketserver.BaseRequestHandler):
# remove client from session broker and shutdown if needed
self.remove_session_handlers()
self.session.broker.session_clients.remove(self)
if (
not self.session.broker.session_clients
and not self.session.is_active()
):
clients = self.session_clients[self.session.id]
clients.remove(self)
if not clients and not self.session.is_active():
self.coreemu.delete_session(self.session.id)
# set session to join
@ -1855,7 +1831,8 @@ class CoreHandler(socketserver.BaseRequestHandler):
# add client to session broker and set master if needed
if self.master:
self.session.master = True
self.session.broker.session_clients.append(self)
clients = self.session_clients.setdefault(self.session.id, [])
clients.append(self)
# add broadcast handlers
logging.info("adding session broadcast handlers")
@ -2106,6 +2083,7 @@ class CoreUdpHandler(CoreHandler):
logging.debug("session handling message: %s", session.session_id)
self.session = session
self.handle_message(message)
self.session.sdt.handle_distributed(message)
self.broadcast(message)
else:
logging.error(
@ -2130,6 +2108,7 @@ class CoreUdpHandler(CoreHandler):
if session or message.message_type == MessageTypes.REGISTER.value:
self.session = session
self.handle_message(message)
self.session.sdt.handle_distributed(message)
self.broadcast(message)
else:
logging.error(
@ -2140,7 +2119,8 @@ class CoreUdpHandler(CoreHandler):
if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)):
return
for client in self.session.broker.session_clients:
clients = self.session_clients[self.session.id]
for client in clients:
try:
client.sendall(message.raw_message)
except IOError: