From e59a8bf66d23bf757f8b61e7202029bbef260268 Mon Sep 17 00:00:00 2001 From: Blake Harnden Date: Thu, 6 Jun 2019 11:33:22 -0700 Subject: [PATCH 1/4] changes to add back in coresendmsg udp support --- daemon/core/corehandlers.py | 122 +++++++++++++++++++++++++++++++++++- daemon/core/coreserver.py | 31 ++++++++- daemon/scripts/core-daemon | 38 ++++++++--- daemon/scripts/coresendmsg | 11 +++- 4 files changed, 189 insertions(+), 13 deletions(-) diff --git a/daemon/core/corehandlers.py b/daemon/core/corehandlers.py index 4a38bd3c..ee3dc2e9 100644 --- a/daemon/core/corehandlers.py +++ b/daemon/core/corehandlers.py @@ -136,6 +136,8 @@ class CoreHandler(SocketServer.BaseRequestHandler): logging.info("connection closed: %s", self.client_address) if self.session: # remove client from session broker and shutdown if there are no clients + clients = self.server.session_clients.setdefault(self.session.session_id, []) + clients.remove(self) self.remove_session_handlers() self.session.broker.session_clients.remove(self) if not self.session.broker.session_clients and not self.session.is_active(): @@ -307,6 +309,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): """ logging.debug("handling broadcast node: %s", node_data) message = dataconversion.convert_node(node_data) + try: self.sendall(message) except IOError: @@ -534,7 +537,8 @@ class CoreHandler(SocketServer.BaseRequestHandler): # TODO: add shutdown handler for session self.session = self.coreemu.create_session(port, master=False) - # self.session.shutdown_handlers.append(self.session_shutdown) + clients = self.server.session_clients.setdefault(self.session.session_id, []) + clients.append(self) logging.debug("created new session for client: %s", self.session.session_id) # TODO: hack to associate this handler with this sessions broker for broadcasting @@ -1017,7 +1021,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): self.session.location.setrefgeo(lat, lon, alt) self.session.location.refscale = values[5] logging.info("location configured: %s = %s scale=%s", self.session.location.refxyz, - self.session.location.refgeo, self.session.location.refscale) + self.session.location.refgeo, self.session.location.refscale) logging.info("location configured: UTM%s", self.session.location.refutm) def handle_config_metadata(self, message_type, config_data): @@ -1770,3 +1774,117 @@ class CoreHandler(SocketServer.BaseRequestHandler): self.session.broadcast_config(config_data) logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data)) + + +class CoreUdpHandler(CoreHandler): + def __init__(self, request, client_address, server): + self.message_handlers = { + MessageTypes.NODE.value: self.handle_node_message, + MessageTypes.LINK.value: self.handle_link_message, + MessageTypes.EXECUTE.value: self.handle_execute_message, + MessageTypes.REGISTER.value: self.handle_register_message, + MessageTypes.CONFIG.value: self.handle_config_message, + MessageTypes.FILE.value: self.handle_file_message, + MessageTypes.INTERFACE.value: self.handle_interface_message, + MessageTypes.EVENT.value: self.handle_event_message, + MessageTypes.SESSION.value: self.handle_session_message, + } + self.master = False + self.session = None + SocketServer.BaseRequestHandler.__init__(self, request, client_address, server) + + def setup(self): + """ + Client has connected, set up a new connection. + :return: nothing + """ + logging.info("new UDP connection: %s", self.client_address) + + def receive_message(self): + data = self.request[0] + header = data[:coreapi.CoreMessage.header_len] + if len(header) < coreapi.CoreMessage.header_len: + raise IOError("error receiving header (received %d bytes)" % len(header)) + + message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header) + if message_len == 0: + logging.warning("received message with no data") + return + + if len(data) != coreapi.CoreMessage.header_len + message_len: + logging.error("received message length does not match received data (%s != %s)", + len(data), coreapi.CoreMessage.header_len + message_len) + raise IOError + + try: + message_class = coreapi.CLASS_MAP[message_type] + message = message_class(message_flags, header, data[coreapi.CoreMessage.header_len:]) + return message + except KeyError: + message = coreapi.CoreMessage(message_flags, header, data[coreapi.CoreMessage.header_len:]) + message.msgtype = message_type + logging.exception("unimplemented core message type: %s", message.type_str()) + + def handle(self): + message = self.receive_message() + sessions = message.session_numbers() + message.queuedtimes = 0 + if sessions: + for session_id in sessions: + session = self.server.mainserver.coreemu.sessions.get(session_id) + if session: + logging.debug("session handling message: %s", session.session_id) + self.session = session + self.handle_message(message) + self.broadcast(message) + else: + logging.error("session %d in %s message not found.", session_id, message.type_str()) + else: + # no session specified, find an existing one + session = None + node_count = 0 + for session_id in self.server.mainserver.coreemu.sessions: + current_session = self.server.mainserver.coreemu.sessions[session_id] + current_node_count = current_session.get_node_count() + if current_session.state == EventTypes.RUNTIME_STATE.value and current_node_count > node_count: + node_count = current_node_count + session = current_session + + if session or message.message_type == MessageTypes.REGISTER.value: + self.session = session + self.handle_message(message) + self.broadcast(message) + else: + logging.error("no active session, dropping %s message.", message.type_str()) + + def broadcast(self, message): + if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)): + return + + clients = self.server.mainserver.session_clients.setdefault(self.session.session_id, []) + for client in clients: + try: + client.sendall(message.raw_message) + except IOError: + logging.error("error broadcasting") + + def finish(self): + return SocketServer.BaseRequestHandler.finish(self) + + def queuemsg(self, msg): + """ + UDP handlers are short-lived and do not have message queues. + + :param bytes msg: message to queue + :return: + """ + raise Exception("Unable to queue %s message for later processing using UDP!" % msg) + + def sendall(self, data): + """ + Use sendto() on the connectionless UDP socket. + + :param data: + :return: + """ + self.request[1].sendto(data, self.client_address) diff --git a/daemon/core/coreserver.py b/daemon/core/coreserver.py index 2c052b05..97f6b9d0 100644 --- a/daemon/core/coreserver.py +++ b/daemon/core/coreserver.py @@ -23,8 +23,37 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): :param tuple[str, int] server_address: server host and port to use :param class handler_class: request handler :param dict config: configuration setting - :return: """ self.coreemu = CoreEmu(config) self.config = config + self.session_clients = {} SocketServer.TCPServer.__init__(self, server_address, handler_class) + + +class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer): + """ + UDP server class, manages sessions and spawns request handlers for + incoming connections. + """ + daemon_threads = True + allow_reuse_address = True + + def __init__(self, server_address, handler_class, mainserver): + """ + Server class initialization takes configuration data and calls + the SocketServer constructor + + :param server_address: + :param class handler_class: request handler + :param mainserver: + """ + self.mainserver = mainserver + SocketServer.UDPServer.__init__(self, server_address, handler_class) + + def start(self): + """ + Thread target to run concurrently with the TCP server. + + :return: nothing + """ + self.serve_forever() diff --git a/daemon/scripts/core-daemon b/daemon/scripts/core-daemon index 34c516d3..d6142e94 100644 --- a/daemon/scripts/core-daemon +++ b/daemon/scripts/core-daemon @@ -9,13 +9,14 @@ import ConfigParser import logging import optparse import sys +import threading import time -from core import load_logging_config from core import constants from core import enumerations -from core.corehandlers import CoreHandler -from core.coreserver import CoreServer +from core import load_logging_config +from core.corehandlers import CoreHandler, CoreUdpHandler +from core.coreserver import CoreServer, CoreUdpServer from core.misc.utils import close_onexec load_logging_config() @@ -30,6 +31,21 @@ def banner(): logging.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime()) +def start_udp(mainserver, server_address): + """ + Start a thread running a UDP server on the same host,port for + connectionless requests. + + :param CoreServer mainserver: main core tcp server to piggy back off of + :param server_address: + :return: CoreUdpServer + """ + mainserver.udpserver = CoreUdpServer(server_address, CoreUdpHandler, mainserver) + mainserver.udpthread = threading.Thread(target=mainserver.udpserver.start) + mainserver.udpthread.daemon = True + mainserver.udpthread.start() + + def cored(cfg, use_ovs): """ Start the CoreServer object and enter the server loop. @@ -44,18 +60,24 @@ def cored(cfg, use_ovs): host = "localhost" try: - server = CoreServer((host, port), CoreHandler, cfg) + address = (host, port) + server = CoreServer(address, CoreHandler, cfg) if use_ovs: from core.netns.openvswitch import OVS_NODES server.coreemu.update_nodes(OVS_NODES) + + # start udp server + start_udp(server, address) + + # close handlers + close_onexec(server.fileno()) + + logging.info("tcp/udp servers started, listening on: %s:%s", host, port) + server.serve_forever() except: logging.exception("error starting main server on: %s:%s", host, port) sys.exit(1) - close_onexec(server.fileno()) - logging.info("server started, listening on: %s:%s", host, port) - server.serve_forever() - def get_merged_config(filename): """ diff --git a/daemon/scripts/coresendmsg b/daemon/scripts/coresendmsg index be6e030b..d08360b8 100644 --- a/daemon/scripts/coresendmsg +++ b/daemon/scripts/coresendmsg @@ -186,6 +186,8 @@ def main(): help="Listen for a response message and print it.") parser.add_option("-t", "--list-tlvs", dest="tlvs", action="store_true", help="List TLVs for the specified message type.") + parser.add_option("--tcp", dest="tcp", action="store_true", + help="Use TCP instead of UDP and connect to a session default: %s" % parser.defaults["tcp"]) def usage(msg=None, err=0): sys.stdout.write("\n") @@ -249,7 +251,12 @@ def main(): msg = msg_cls.pack(flags, tlvdata) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if opt.tcp: + protocol = socket.SOCK_STREAM + else: + protocol = socket.SOCK_DGRAM + + sock = socket.socket(socket.AF_INET, protocol) sock.setblocking(True) try: @@ -258,7 +265,7 @@ def main(): print "Error connecting to %s:%s:\n\t%s" % (opt.address, opt.port, e) sys.exit(1) - if not connect_to_session(sock, opt.session): + if opt.tcp and not connect_to_session(sock, opt.session): print "warning: continuing without joining a session!" sock.sendall(msg) From 0b770d8350f4db983c9b33ef360a4ab3caa6539f Mon Sep 17 00:00:00 2001 From: Blake Harnden Date: Thu, 6 Jun 2019 11:43:39 -0700 Subject: [PATCH 2/4] reverted enclosing system start in exception handler --- daemon/scripts/core-daemon | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/daemon/scripts/core-daemon b/daemon/scripts/core-daemon index d6142e94..b6b5b767 100644 --- a/daemon/scripts/core-daemon +++ b/daemon/scripts/core-daemon @@ -65,19 +65,19 @@ def cored(cfg, use_ovs): if use_ovs: from core.netns.openvswitch import OVS_NODES server.coreemu.update_nodes(OVS_NODES) - - # start udp server - start_udp(server, address) - - # close handlers - close_onexec(server.fileno()) - - logging.info("tcp/udp servers started, listening on: %s:%s", host, port) - server.serve_forever() except: logging.exception("error starting main server on: %s:%s", host, port) sys.exit(1) + # start udp server + start_udp(server, address) + + # close handlers + close_onexec(server.fileno()) + + logging.info("tcp/udp servers started, listening on: %s:%s", host, port) + server.serve_forever() + def get_merged_config(filename): """ From 994fe042e140770c3c4651dcd3ac7b189cf413c9 Mon Sep 17 00:00:00 2001 From: Blake Harnden Date: Thu, 6 Jun 2019 13:02:20 -0700 Subject: [PATCH 3/4] updates to just leverage broker clients instead of repeating logic for now, until broker is refactored --- daemon/core/corehandlers.py | 9 ++------- daemon/core/coreserver.py | 1 - 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/daemon/core/corehandlers.py b/daemon/core/corehandlers.py index ee3dc2e9..c550e141 100644 --- a/daemon/core/corehandlers.py +++ b/daemon/core/corehandlers.py @@ -136,8 +136,6 @@ class CoreHandler(SocketServer.BaseRequestHandler): logging.info("connection closed: %s", self.client_address) if self.session: # remove client from session broker and shutdown if there are no clients - clients = self.server.session_clients.setdefault(self.session.session_id, []) - clients.remove(self) self.remove_session_handlers() self.session.broker.session_clients.remove(self) if not self.session.broker.session_clients and not self.session.is_active(): @@ -537,8 +535,6 @@ class CoreHandler(SocketServer.BaseRequestHandler): # TODO: add shutdown handler for session self.session = self.coreemu.create_session(port, master=False) - clients = self.server.session_clients.setdefault(self.session.session_id, []) - clients.append(self) logging.debug("created new session for client: %s", self.session.session_id) # TODO: hack to associate this handler with this sessions broker for broadcasting @@ -1798,7 +1794,7 @@ class CoreUdpHandler(CoreHandler): Client has connected, set up a new connection. :return: nothing """ - logging.info("new UDP connection: %s", self.client_address) + pass def receive_message(self): data = self.request[0] @@ -1861,8 +1857,7 @@ class CoreUdpHandler(CoreHandler): if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)): return - clients = self.server.mainserver.session_clients.setdefault(self.session.session_id, []) - for client in clients: + for client in self.session.broker.session_clients: try: client.sendall(message.raw_message) except IOError: diff --git a/daemon/core/coreserver.py b/daemon/core/coreserver.py index 97f6b9d0..5ba232da 100644 --- a/daemon/core/coreserver.py +++ b/daemon/core/coreserver.py @@ -26,7 +26,6 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): """ self.coreemu = CoreEmu(config) self.config = config - self.session_clients = {} SocketServer.TCPServer.__init__(self, server_address, handler_class) From b448c6ebf08bee281cb2a1fba320ab9e74fd1f68 Mon Sep 17 00:00:00 2001 From: Blake Harnden Date: Fri, 7 Jun 2019 09:10:34 -0700 Subject: [PATCH 4/4] bumped version to 5.2.2 and fixed issue in gui that prevented moving nodes while mobility was occuring --- configure.ac | 2 +- gui/wlan.tcl | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/configure.ac b/configure.ac index 10c1c245..bd9127c3 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. # this defines the CORE version number, must be static for AC_INIT -AC_INIT(core, 5.2.1, core-dev@nrl.navy.mil) +AC_INIT(core, 5.2.2, core-dev@nrl.navy.mil) # autoconf and automake initialization AC_CONFIG_SRCDIR([netns/version.h.in]) diff --git a/gui/wlan.tcl b/gui/wlan.tcl index b110a4aa..bea770a7 100644 --- a/gui/wlan.tcl +++ b/gui/wlan.tcl @@ -243,7 +243,6 @@ proc moveNode { c node img xpos ypos dx dy } { "wlanlink && need_redraw"] { redrawWlanLink $wlanlink } - $c dtag node selected $c delete -withtags selectmark $c dtag link need_redraw $c dtag wlanlink need_redraw