diff --git a/configure.ac b/configure.ac index 340f6139..a9d25d39 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. # this defines the CORE version number, must be static for AC_INIT -AC_INIT(core, 5.2.1) +AC_INIT(core, 5.2.2) # autoconf and automake initialization AC_CONFIG_SRCDIR([netns/version.h.in]) diff --git a/daemon/core/api/tlv/corehandlers.py b/daemon/core/api/tlv/corehandlers.py index e57ec54e..5a145704 100644 --- a/daemon/core/api/tlv/corehandlers.py +++ b/daemon/core/api/tlv/corehandlers.py @@ -4,16 +4,17 @@ socket server request handlers leveraged by core servers. import logging import os -from queue import Queue, Empty import shlex import shutil -import socketserver import sys import threading import time -from builtins import range from itertools import repeat +import socketserver +from builtins import range +from queue import Queue, Empty + from core import utils from core.api.tlv import coreapi, dataconversion, structutils from core.config import ConfigShim @@ -306,6 +307,7 @@ class CoreHandler(socketserver.BaseRequestHandler): """ logging.debug("handling broadcast node: %s", node_data) message = dataconversion.convert_node(node_data) + try: self.sendall(message) except IOError: @@ -538,7 +540,6 @@ class CoreHandler(socketserver.BaseRequestHandler): # TODO: add shutdown handler for session self.session = self.coreemu.create_session(port, master=False) - # self.session.shutdown_handlers.append(self.session_shutdown) logging.debug("created new session for client: %s", self.session.id) # TODO: hack to associate this handler with this sessions broker for broadcasting @@ -1023,7 +1024,7 @@ class CoreHandler(socketserver.BaseRequestHandler): self.session.location.setrefgeo(lat, lon, alt) self.session.location.refscale = values[5] logging.info("location configured: %s = %s scale=%s", self.session.location.refxyz, - self.session.location.refgeo, self.session.location.refscale) + self.session.location.refgeo, self.session.location.refscale) logging.info("location configured: UTM%s", self.session.location.refutm) def handle_config_metadata(self, message_type, config_data): @@ -1787,3 +1788,116 @@ class CoreHandler(socketserver.BaseRequestHandler): self.session.broadcast_config(config_data) logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data)) + + +class CoreUdpHandler(CoreHandler): + def __init__(self, request, client_address, server): + self.message_handlers = { + MessageTypes.NODE.value: self.handle_node_message, + MessageTypes.LINK.value: self.handle_link_message, + MessageTypes.EXECUTE.value: self.handle_execute_message, + MessageTypes.REGISTER.value: self.handle_register_message, + MessageTypes.CONFIG.value: self.handle_config_message, + MessageTypes.FILE.value: self.handle_file_message, + MessageTypes.INTERFACE.value: self.handle_interface_message, + MessageTypes.EVENT.value: self.handle_event_message, + MessageTypes.SESSION.value: self.handle_session_message, + } + self.master = False + self.session = None + socketserver.BaseRequestHandler.__init__(self, request, client_address, server) + + def setup(self): + """ + Client has connected, set up a new connection. + :return: nothing + """ + pass + + def receive_message(self): + data = self.request[0] + header = data[:coreapi.CoreMessage.header_len] + if len(header) < coreapi.CoreMessage.header_len: + raise IOError("error receiving header (received %d bytes)" % len(header)) + + message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header) + if message_len == 0: + logging.warning("received message with no data") + return + + if len(data) != coreapi.CoreMessage.header_len + message_len: + logging.error("received message length does not match received data (%s != %s)", + len(data), coreapi.CoreMessage.header_len + message_len) + raise IOError + + try: + message_class = coreapi.CLASS_MAP[message_type] + message = message_class(message_flags, header, data[coreapi.CoreMessage.header_len:]) + return message + except KeyError: + message = coreapi.CoreMessage(message_flags, header, data[coreapi.CoreMessage.header_len:]) + message.msgtype = message_type + logging.exception("unimplemented core message type: %s", message.type_str()) + + def handle(self): + message = self.receive_message() + sessions = message.session_numbers() + message.queuedtimes = 0 + if sessions: + for session_id in sessions: + session = self.server.mainserver.coreemu.sessions.get(session_id) + if session: + logging.debug("session handling message: %s", session.session_id) + self.session = session + self.handle_message(message) + self.broadcast(message) + else: + logging.error("session %d in %s message not found.", session_id, message.type_str()) + else: + # no session specified, find an existing one + session = None + node_count = 0 + for session_id in self.server.mainserver.coreemu.sessions: + current_session = self.server.mainserver.coreemu.sessions[session_id] + current_node_count = current_session.get_node_count() + if current_session.state == EventTypes.RUNTIME_STATE.value and current_node_count > node_count: + node_count = current_node_count + session = current_session + + if session or message.message_type == MessageTypes.REGISTER.value: + self.session = session + self.handle_message(message) + self.broadcast(message) + else: + logging.error("no active session, dropping %s message.", message.type_str()) + + def broadcast(self, message): + if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)): + return + + for client in self.session.broker.session_clients: + try: + client.sendall(message.raw_message) + except IOError: + logging.error("error broadcasting") + + def finish(self): + return socketserver.BaseRequestHandler.finish(self) + + def queuemsg(self, msg): + """ + UDP handlers are short-lived and do not have message queues. + + :param bytes msg: message to queue + :return: + """ + raise Exception("Unable to queue %s message for later processing using UDP!" % msg) + + def sendall(self, data): + """ + Use sendto() on the connectionless UDP socket. + + :param data: + :return: + """ + self.request[1].sendto(data, self.client_address) diff --git a/daemon/core/api/tlv/coreserver.py b/daemon/core/api/tlv/coreserver.py index 389ef78f..68e87619 100644 --- a/daemon/core/api/tlv/coreserver.py +++ b/daemon/core/api/tlv/coreserver.py @@ -23,8 +23,36 @@ class CoreServer(socketserver.ThreadingMixIn, socketserver.TCPServer): :param tuple[str, int] server_address: server host and port to use :param class handler_class: request handler :param dict config: configuration setting - :return: """ self.coreemu = CoreEmu(config) self.config = config socketserver.TCPServer.__init__(self, server_address, handler_class) + + +class CoreUdpServer(socketserver.ThreadingMixIn, socketserver.UDPServer): + """ + UDP server class, manages sessions and spawns request handlers for + incoming connections. + """ + daemon_threads = True + allow_reuse_address = True + + def __init__(self, server_address, handler_class, mainserver): + """ + Server class initialization takes configuration data and calls + the SocketServer constructor + + :param server_address: + :param class handler_class: request handler + :param mainserver: + """ + self.mainserver = mainserver + socketserver.UDPServer.__init__(self, server_address, handler_class) + + def start(self): + """ + Thread target to run concurrently with the TCP server. + + :return: nothing + """ + self.serve_forever() diff --git a/daemon/scripts/core-daemon b/daemon/scripts/core-daemon index 14170326..67bbbbcc 100755 --- a/daemon/scripts/core-daemon +++ b/daemon/scripts/core-daemon @@ -6,18 +6,21 @@ message handlers are defined and some support for sending messages. """ import argparse -from configparser import ConfigParser import logging import sys import threading import time -from core import load_logging_config +from configparser import ConfigParser + from core import constants -from core.emulator import enumerations -from core.api.tlv.corehandlers import CoreHandler -from core.api.tlv.coreserver import CoreServer +from core import load_logging_config from core.api.grpc.server import CoreGrpcServer +from core.api.tlv.corehandlers import CoreHandler +from core.api.tlv.corehandlers import CoreUdpHandler +from core.api.tlv.coreserver import CoreServer +from core.api.tlv.coreserver import CoreUdpServer +from core.emulator import enumerations from core.utils import close_onexec load_logging_config() @@ -32,7 +35,22 @@ def banner(): logging.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime()) -def cored(cfg): +def start_udp(mainserver, server_address): + """ + Start a thread running a UDP server on the same host,port for + connectionless requests. + + :param CoreServer mainserver: main core tcp server to piggy back off of + :param server_address: + :return: CoreUdpServer + """ + mainserver.udpserver = CoreUdpServer(server_address, CoreUdpHandler, mainserver) + mainserver.udpthread = threading.Thread(target=mainserver.udpserver.start) + mainserver.udpthread.daemon = True + mainserver.udpthread.start() + + +def cored(cfg, use_ovs): """ Start the CoreServer object and enter the server loop. @@ -46,8 +64,9 @@ def cored(cfg): host = "localhost" try: - server = CoreServer((host, port), CoreHandler, cfg) - if cfg["ovs"] == "True": + address = (host, port) + server = CoreServer(address, CoreHandler, cfg) + if use_ovs: from core.nodes.openvswitch import OVS_NODES server.coreemu.update_nodes(OVS_NODES) except: @@ -62,8 +81,13 @@ def cored(cfg): grpc_thread.daemon = True grpc_thread.start() + # start udp server + start_udp(server, address) + + # close handlers close_onexec(server.fileno()) - logging.info("server started, listening on: %s:%s", host, port) + + logging.info("tcp/udp servers started, listening on: %s:%s", host, port) server.serve_forever() @@ -133,8 +157,11 @@ def main(): cfg = get_merged_config("%s/core.conf" % constants.CORE_CONF_DIR) banner() + # check if ovs flag was provided + use_ovs = len(sys.argv) == 2 and sys.argv[1] == "ovs" + try: - cored(cfg) + cored(cfg, use_ovs) except KeyboardInterrupt: logging.info("keyboard interrupt, stopping core daemon") diff --git a/daemon/scripts/coresendmsg b/daemon/scripts/coresendmsg index 03551e8a..023fd950 100644 --- a/daemon/scripts/coresendmsg +++ b/daemon/scripts/coresendmsg @@ -186,6 +186,8 @@ def main(): help="Listen for a response message and print it.") parser.add_option("-t", "--list-tlvs", dest="tlvs", action="store_true", help="List TLVs for the specified message type.") + parser.add_option("--tcp", dest="tcp", action="store_true", + help="Use TCP instead of UDP and connect to a session default: %s" % parser.defaults["tcp"]) def usage(msg=None, err=0): sys.stdout.write("\n") @@ -249,7 +251,12 @@ def main(): msg = msg_cls.pack(flags, tlvdata) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if opt.tcp: + protocol = socket.SOCK_STREAM + else: + protocol = socket.SOCK_DGRAM + + sock = socket.socket(socket.AF_INET, protocol) sock.setblocking(True) try: @@ -258,7 +265,7 @@ def main(): print("Error connecting to %s:%s:\n\t%s" % (opt.address, opt.port, e)) sys.exit(1) - if not connect_to_session(sock, opt.session): + if opt.tcp and not connect_to_session(sock, opt.session): print("warning: continuing without joining a session!") sock.sendall(msg) diff --git a/gui/wlan.tcl b/gui/wlan.tcl index b110a4aa..bea770a7 100644 --- a/gui/wlan.tcl +++ b/gui/wlan.tcl @@ -243,7 +243,6 @@ proc moveNode { c node img xpos ypos dx dy } { "wlanlink && need_redraw"] { redrawWlanLink $wlanlink } - $c dtag node selected $c delete -withtags selectmark $c dtag link need_redraw $c dtag wlanlink need_redraw