merged latest from master

This commit is contained in:
Blake Harnden 2019-06-07 10:05:40 -07:00
commit e62ae42bdc
6 changed files with 195 additions and 20 deletions

View file

@ -4,16 +4,17 @@ socket server request handlers leveraged by core servers.
import logging
import os
from queue import Queue, Empty
import shlex
import shutil
import socketserver
import sys
import threading
import time
from builtins import range
from itertools import repeat
import socketserver
from builtins import range
from queue import Queue, Empty
from core import utils
from core.api.tlv import coreapi, dataconversion, structutils
from core.config import ConfigShim
@ -306,6 +307,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
"""
logging.debug("handling broadcast node: %s", node_data)
message = dataconversion.convert_node(node_data)
try:
self.sendall(message)
except IOError:
@ -538,7 +540,6 @@ class CoreHandler(socketserver.BaseRequestHandler):
# TODO: add shutdown handler for session
self.session = self.coreemu.create_session(port, master=False)
# self.session.shutdown_handlers.append(self.session_shutdown)
logging.debug("created new session for client: %s", self.session.id)
# TODO: hack to associate this handler with this sessions broker for broadcasting
@ -1023,7 +1024,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
self.session.location.setrefgeo(lat, lon, alt)
self.session.location.refscale = values[5]
logging.info("location configured: %s = %s scale=%s", self.session.location.refxyz,
self.session.location.refgeo, self.session.location.refscale)
self.session.location.refgeo, self.session.location.refscale)
logging.info("location configured: UTM%s", self.session.location.refutm)
def handle_config_metadata(self, message_type, config_data):
@ -1787,3 +1788,116 @@ class CoreHandler(socketserver.BaseRequestHandler):
self.session.broadcast_config(config_data)
logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data))
class CoreUdpHandler(CoreHandler):
def __init__(self, request, client_address, server):
self.message_handlers = {
MessageTypes.NODE.value: self.handle_node_message,
MessageTypes.LINK.value: self.handle_link_message,
MessageTypes.EXECUTE.value: self.handle_execute_message,
MessageTypes.REGISTER.value: self.handle_register_message,
MessageTypes.CONFIG.value: self.handle_config_message,
MessageTypes.FILE.value: self.handle_file_message,
MessageTypes.INTERFACE.value: self.handle_interface_message,
MessageTypes.EVENT.value: self.handle_event_message,
MessageTypes.SESSION.value: self.handle_session_message,
}
self.master = False
self.session = None
socketserver.BaseRequestHandler.__init__(self, request, client_address, server)
def setup(self):
"""
Client has connected, set up a new connection.
:return: nothing
"""
pass
def receive_message(self):
data = self.request[0]
header = data[:coreapi.CoreMessage.header_len]
if len(header) < coreapi.CoreMessage.header_len:
raise IOError("error receiving header (received %d bytes)" % len(header))
message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header)
if message_len == 0:
logging.warning("received message with no data")
return
if len(data) != coreapi.CoreMessage.header_len + message_len:
logging.error("received message length does not match received data (%s != %s)",
len(data), coreapi.CoreMessage.header_len + message_len)
raise IOError
try:
message_class = coreapi.CLASS_MAP[message_type]
message = message_class(message_flags, header, data[coreapi.CoreMessage.header_len:])
return message
except KeyError:
message = coreapi.CoreMessage(message_flags, header, data[coreapi.CoreMessage.header_len:])
message.msgtype = message_type
logging.exception("unimplemented core message type: %s", message.type_str())
def handle(self):
message = self.receive_message()
sessions = message.session_numbers()
message.queuedtimes = 0
if sessions:
for session_id in sessions:
session = self.server.mainserver.coreemu.sessions.get(session_id)
if session:
logging.debug("session handling message: %s", session.session_id)
self.session = session
self.handle_message(message)
self.broadcast(message)
else:
logging.error("session %d in %s message not found.", session_id, message.type_str())
else:
# no session specified, find an existing one
session = None
node_count = 0
for session_id in self.server.mainserver.coreemu.sessions:
current_session = self.server.mainserver.coreemu.sessions[session_id]
current_node_count = current_session.get_node_count()
if current_session.state == EventTypes.RUNTIME_STATE.value and current_node_count > node_count:
node_count = current_node_count
session = current_session
if session or message.message_type == MessageTypes.REGISTER.value:
self.session = session
self.handle_message(message)
self.broadcast(message)
else:
logging.error("no active session, dropping %s message.", message.type_str())
def broadcast(self, message):
if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)):
return
for client in self.session.broker.session_clients:
try:
client.sendall(message.raw_message)
except IOError:
logging.error("error broadcasting")
def finish(self):
return socketserver.BaseRequestHandler.finish(self)
def queuemsg(self, msg):
"""
UDP handlers are short-lived and do not have message queues.
:param bytes msg: message to queue
:return:
"""
raise Exception("Unable to queue %s message for later processing using UDP!" % msg)
def sendall(self, data):
"""
Use sendto() on the connectionless UDP socket.
:param data:
:return:
"""
self.request[1].sendto(data, self.client_address)

View file

@ -23,8 +23,36 @@ class CoreServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
:param tuple[str, int] server_address: server host and port to use
:param class handler_class: request handler
:param dict config: configuration setting
:return:
"""
self.coreemu = CoreEmu(config)
self.config = config
socketserver.TCPServer.__init__(self, server_address, handler_class)
class CoreUdpServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
"""
UDP server class, manages sessions and spawns request handlers for
incoming connections.
"""
daemon_threads = True
allow_reuse_address = True
def __init__(self, server_address, handler_class, mainserver):
"""
Server class initialization takes configuration data and calls
the SocketServer constructor
:param server_address:
:param class handler_class: request handler
:param mainserver:
"""
self.mainserver = mainserver
socketserver.UDPServer.__init__(self, server_address, handler_class)
def start(self):
"""
Thread target to run concurrently with the TCP server.
:return: nothing
"""
self.serve_forever()