diff --git a/configure.ac b/configure.ac index 10c1c245..bd9127c3 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. # this defines the CORE version number, must be static for AC_INIT -AC_INIT(core, 5.2.1, core-dev@nrl.navy.mil) +AC_INIT(core, 5.2.2, core-dev@nrl.navy.mil) # autoconf and automake initialization AC_CONFIG_SRCDIR([netns/version.h.in]) diff --git a/daemon/core/corehandlers.py b/daemon/core/corehandlers.py index 4a38bd3c..c550e141 100644 --- a/daemon/core/corehandlers.py +++ b/daemon/core/corehandlers.py @@ -307,6 +307,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): """ logging.debug("handling broadcast node: %s", node_data) message = dataconversion.convert_node(node_data) + try: self.sendall(message) except IOError: @@ -534,7 +535,6 @@ class CoreHandler(SocketServer.BaseRequestHandler): # TODO: add shutdown handler for session self.session = self.coreemu.create_session(port, master=False) - # self.session.shutdown_handlers.append(self.session_shutdown) logging.debug("created new session for client: %s", self.session.session_id) # TODO: hack to associate this handler with this sessions broker for broadcasting @@ -1017,7 +1017,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): self.session.location.setrefgeo(lat, lon, alt) self.session.location.refscale = values[5] logging.info("location configured: %s = %s scale=%s", self.session.location.refxyz, - self.session.location.refgeo, self.session.location.refscale) + self.session.location.refgeo, self.session.location.refscale) logging.info("location configured: UTM%s", self.session.location.refutm) def handle_config_metadata(self, message_type, config_data): @@ -1770,3 +1770,116 @@ class CoreHandler(SocketServer.BaseRequestHandler): self.session.broadcast_config(config_data) logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data)) + + +class CoreUdpHandler(CoreHandler): + def __init__(self, request, client_address, server): + self.message_handlers = { + MessageTypes.NODE.value: self.handle_node_message, + MessageTypes.LINK.value: self.handle_link_message, + MessageTypes.EXECUTE.value: self.handle_execute_message, + MessageTypes.REGISTER.value: self.handle_register_message, + MessageTypes.CONFIG.value: self.handle_config_message, + MessageTypes.FILE.value: self.handle_file_message, + MessageTypes.INTERFACE.value: self.handle_interface_message, + MessageTypes.EVENT.value: self.handle_event_message, + MessageTypes.SESSION.value: self.handle_session_message, + } + self.master = False + self.session = None + SocketServer.BaseRequestHandler.__init__(self, request, client_address, server) + + def setup(self): + """ + Client has connected, set up a new connection. + :return: nothing + """ + pass + + def receive_message(self): + data = self.request[0] + header = data[:coreapi.CoreMessage.header_len] + if len(header) < coreapi.CoreMessage.header_len: + raise IOError("error receiving header (received %d bytes)" % len(header)) + + message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header) + if message_len == 0: + logging.warning("received message with no data") + return + + if len(data) != coreapi.CoreMessage.header_len + message_len: + logging.error("received message length does not match received data (%s != %s)", + len(data), coreapi.CoreMessage.header_len + message_len) + raise IOError + + try: + message_class = coreapi.CLASS_MAP[message_type] + message = message_class(message_flags, header, data[coreapi.CoreMessage.header_len:]) + return message + except KeyError: + message = coreapi.CoreMessage(message_flags, header, data[coreapi.CoreMessage.header_len:]) + message.msgtype = message_type + logging.exception("unimplemented core message type: %s", message.type_str()) + + def handle(self): + message = self.receive_message() + sessions = message.session_numbers() + message.queuedtimes = 0 + if sessions: + for session_id in sessions: + session = self.server.mainserver.coreemu.sessions.get(session_id) + if session: + logging.debug("session handling message: %s", session.session_id) + self.session = session + self.handle_message(message) + self.broadcast(message) + else: + logging.error("session %d in %s message not found.", session_id, message.type_str()) + else: + # no session specified, find an existing one + session = None + node_count = 0 + for session_id in self.server.mainserver.coreemu.sessions: + current_session = self.server.mainserver.coreemu.sessions[session_id] + current_node_count = current_session.get_node_count() + if current_session.state == EventTypes.RUNTIME_STATE.value and current_node_count > node_count: + node_count = current_node_count + session = current_session + + if session or message.message_type == MessageTypes.REGISTER.value: + self.session = session + self.handle_message(message) + self.broadcast(message) + else: + logging.error("no active session, dropping %s message.", message.type_str()) + + def broadcast(self, message): + if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)): + return + + for client in self.session.broker.session_clients: + try: + client.sendall(message.raw_message) + except IOError: + logging.error("error broadcasting") + + def finish(self): + return SocketServer.BaseRequestHandler.finish(self) + + def queuemsg(self, msg): + """ + UDP handlers are short-lived and do not have message queues. + + :param bytes msg: message to queue + :return: + """ + raise Exception("Unable to queue %s message for later processing using UDP!" % msg) + + def sendall(self, data): + """ + Use sendto() on the connectionless UDP socket. + + :param data: + :return: + """ + self.request[1].sendto(data, self.client_address) diff --git a/daemon/core/coreserver.py b/daemon/core/coreserver.py index 2c052b05..5ba232da 100644 --- a/daemon/core/coreserver.py +++ b/daemon/core/coreserver.py @@ -23,8 +23,36 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): :param tuple[str, int] server_address: server host and port to use :param class handler_class: request handler :param dict config: configuration setting - :return: """ self.coreemu = CoreEmu(config) self.config = config SocketServer.TCPServer.__init__(self, server_address, handler_class) + + +class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer): + """ + UDP server class, manages sessions and spawns request handlers for + incoming connections. + """ + daemon_threads = True + allow_reuse_address = True + + def __init__(self, server_address, handler_class, mainserver): + """ + Server class initialization takes configuration data and calls + the SocketServer constructor + + :param server_address: + :param class handler_class: request handler + :param mainserver: + """ + self.mainserver = mainserver + SocketServer.UDPServer.__init__(self, server_address, handler_class) + + def start(self): + """ + Thread target to run concurrently with the TCP server. + + :return: nothing + """ + self.serve_forever() diff --git a/daemon/scripts/core-daemon b/daemon/scripts/core-daemon index 34c516d3..b6b5b767 100644 --- a/daemon/scripts/core-daemon +++ b/daemon/scripts/core-daemon @@ -9,13 +9,14 @@ import ConfigParser import logging import optparse import sys +import threading import time -from core import load_logging_config from core import constants from core import enumerations -from core.corehandlers import CoreHandler -from core.coreserver import CoreServer +from core import load_logging_config +from core.corehandlers import CoreHandler, CoreUdpHandler +from core.coreserver import CoreServer, CoreUdpServer from core.misc.utils import close_onexec load_logging_config() @@ -30,6 +31,21 @@ def banner(): logging.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime()) +def start_udp(mainserver, server_address): + """ + Start a thread running a UDP server on the same host,port for + connectionless requests. + + :param CoreServer mainserver: main core tcp server to piggy back off of + :param server_address: + :return: CoreUdpServer + """ + mainserver.udpserver = CoreUdpServer(server_address, CoreUdpHandler, mainserver) + mainserver.udpthread = threading.Thread(target=mainserver.udpserver.start) + mainserver.udpthread.daemon = True + mainserver.udpthread.start() + + def cored(cfg, use_ovs): """ Start the CoreServer object and enter the server loop. @@ -44,7 +60,8 @@ def cored(cfg, use_ovs): host = "localhost" try: - server = CoreServer((host, port), CoreHandler, cfg) + address = (host, port) + server = CoreServer(address, CoreHandler, cfg) if use_ovs: from core.netns.openvswitch import OVS_NODES server.coreemu.update_nodes(OVS_NODES) @@ -52,8 +69,13 @@ def cored(cfg, use_ovs): logging.exception("error starting main server on: %s:%s", host, port) sys.exit(1) + # start udp server + start_udp(server, address) + + # close handlers close_onexec(server.fileno()) - logging.info("server started, listening on: %s:%s", host, port) + + logging.info("tcp/udp servers started, listening on: %s:%s", host, port) server.serve_forever() diff --git a/daemon/scripts/coresendmsg b/daemon/scripts/coresendmsg index be6e030b..d08360b8 100644 --- a/daemon/scripts/coresendmsg +++ b/daemon/scripts/coresendmsg @@ -186,6 +186,8 @@ def main(): help="Listen for a response message and print it.") parser.add_option("-t", "--list-tlvs", dest="tlvs", action="store_true", help="List TLVs for the specified message type.") + parser.add_option("--tcp", dest="tcp", action="store_true", + help="Use TCP instead of UDP and connect to a session default: %s" % parser.defaults["tcp"]) def usage(msg=None, err=0): sys.stdout.write("\n") @@ -249,7 +251,12 @@ def main(): msg = msg_cls.pack(flags, tlvdata) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if opt.tcp: + protocol = socket.SOCK_STREAM + else: + protocol = socket.SOCK_DGRAM + + sock = socket.socket(socket.AF_INET, protocol) sock.setblocking(True) try: @@ -258,7 +265,7 @@ def main(): print "Error connecting to %s:%s:\n\t%s" % (opt.address, opt.port, e) sys.exit(1) - if not connect_to_session(sock, opt.session): + if opt.tcp and not connect_to_session(sock, opt.session): print "warning: continuing without joining a session!" sock.sendall(msg) diff --git a/gui/wlan.tcl b/gui/wlan.tcl index b110a4aa..bea770a7 100644 --- a/gui/wlan.tcl +++ b/gui/wlan.tcl @@ -243,7 +243,6 @@ proc moveNode { c node img xpos ypos dx dy } { "wlanlink && need_redraw"] { redrawWlanLink $wlanlink } - $c dtag node selected $c delete -withtags selectmark $c dtag link need_redraw $c dtag wlanlink need_redraw