Merge pull request #254 from coreemu/sendmsg-udp

Merging back in core-daemon / coresendmsg udp support
This commit is contained in:
bharnden 2019-06-07 09:21:02 -07:00 committed by GitHub
commit a74f2dae1f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 181 additions and 12 deletions

View file

@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script. # Process this file with autoconf to produce a configure script.
# this defines the CORE version number, must be static for AC_INIT # this defines the CORE version number, must be static for AC_INIT
AC_INIT(core, 5.2.1, core-dev@nrl.navy.mil) AC_INIT(core, 5.2.2, core-dev@nrl.navy.mil)
# autoconf and automake initialization # autoconf and automake initialization
AC_CONFIG_SRCDIR([netns/version.h.in]) AC_CONFIG_SRCDIR([netns/version.h.in])

View file

@ -307,6 +307,7 @@ class CoreHandler(SocketServer.BaseRequestHandler):
""" """
logging.debug("handling broadcast node: %s", node_data) logging.debug("handling broadcast node: %s", node_data)
message = dataconversion.convert_node(node_data) message = dataconversion.convert_node(node_data)
try: try:
self.sendall(message) self.sendall(message)
except IOError: except IOError:
@ -534,7 +535,6 @@ class CoreHandler(SocketServer.BaseRequestHandler):
# TODO: add shutdown handler for session # TODO: add shutdown handler for session
self.session = self.coreemu.create_session(port, master=False) self.session = self.coreemu.create_session(port, master=False)
# self.session.shutdown_handlers.append(self.session_shutdown)
logging.debug("created new session for client: %s", self.session.session_id) logging.debug("created new session for client: %s", self.session.session_id)
# TODO: hack to associate this handler with this sessions broker for broadcasting # TODO: hack to associate this handler with this sessions broker for broadcasting
@ -1770,3 +1770,116 @@ class CoreHandler(SocketServer.BaseRequestHandler):
self.session.broadcast_config(config_data) self.session.broadcast_config(config_data)
logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data)) logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data))
class CoreUdpHandler(CoreHandler):
def __init__(self, request, client_address, server):
self.message_handlers = {
MessageTypes.NODE.value: self.handle_node_message,
MessageTypes.LINK.value: self.handle_link_message,
MessageTypes.EXECUTE.value: self.handle_execute_message,
MessageTypes.REGISTER.value: self.handle_register_message,
MessageTypes.CONFIG.value: self.handle_config_message,
MessageTypes.FILE.value: self.handle_file_message,
MessageTypes.INTERFACE.value: self.handle_interface_message,
MessageTypes.EVENT.value: self.handle_event_message,
MessageTypes.SESSION.value: self.handle_session_message,
}
self.master = False
self.session = None
SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
def setup(self):
"""
Client has connected, set up a new connection.
:return: nothing
"""
pass
def receive_message(self):
data = self.request[0]
header = data[:coreapi.CoreMessage.header_len]
if len(header) < coreapi.CoreMessage.header_len:
raise IOError("error receiving header (received %d bytes)" % len(header))
message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header)
if message_len == 0:
logging.warning("received message with no data")
return
if len(data) != coreapi.CoreMessage.header_len + message_len:
logging.error("received message length does not match received data (%s != %s)",
len(data), coreapi.CoreMessage.header_len + message_len)
raise IOError
try:
message_class = coreapi.CLASS_MAP[message_type]
message = message_class(message_flags, header, data[coreapi.CoreMessage.header_len:])
return message
except KeyError:
message = coreapi.CoreMessage(message_flags, header, data[coreapi.CoreMessage.header_len:])
message.msgtype = message_type
logging.exception("unimplemented core message type: %s", message.type_str())
def handle(self):
message = self.receive_message()
sessions = message.session_numbers()
message.queuedtimes = 0
if sessions:
for session_id in sessions:
session = self.server.mainserver.coreemu.sessions.get(session_id)
if session:
logging.debug("session handling message: %s", session.session_id)
self.session = session
self.handle_message(message)
self.broadcast(message)
else:
logging.error("session %d in %s message not found.", session_id, message.type_str())
else:
# no session specified, find an existing one
session = None
node_count = 0
for session_id in self.server.mainserver.coreemu.sessions:
current_session = self.server.mainserver.coreemu.sessions[session_id]
current_node_count = current_session.get_node_count()
if current_session.state == EventTypes.RUNTIME_STATE.value and current_node_count > node_count:
node_count = current_node_count
session = current_session
if session or message.message_type == MessageTypes.REGISTER.value:
self.session = session
self.handle_message(message)
self.broadcast(message)
else:
logging.error("no active session, dropping %s message.", message.type_str())
def broadcast(self, message):
if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)):
return
for client in self.session.broker.session_clients:
try:
client.sendall(message.raw_message)
except IOError:
logging.error("error broadcasting")
def finish(self):
return SocketServer.BaseRequestHandler.finish(self)
def queuemsg(self, msg):
"""
UDP handlers are short-lived and do not have message queues.
:param bytes msg: message to queue
:return:
"""
raise Exception("Unable to queue %s message for later processing using UDP!" % msg)
def sendall(self, data):
"""
Use sendto() on the connectionless UDP socket.
:param data:
:return:
"""
self.request[1].sendto(data, self.client_address)

View file

@ -23,8 +23,36 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
:param tuple[str, int] server_address: server host and port to use :param tuple[str, int] server_address: server host and port to use
:param class handler_class: request handler :param class handler_class: request handler
:param dict config: configuration setting :param dict config: configuration setting
:return:
""" """
self.coreemu = CoreEmu(config) self.coreemu = CoreEmu(config)
self.config = config self.config = config
SocketServer.TCPServer.__init__(self, server_address, handler_class) SocketServer.TCPServer.__init__(self, server_address, handler_class)
class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
"""
UDP server class, manages sessions and spawns request handlers for
incoming connections.
"""
daemon_threads = True
allow_reuse_address = True
def __init__(self, server_address, handler_class, mainserver):
"""
Server class initialization takes configuration data and calls
the SocketServer constructor
:param server_address:
:param class handler_class: request handler
:param mainserver:
"""
self.mainserver = mainserver
SocketServer.UDPServer.__init__(self, server_address, handler_class)
def start(self):
"""
Thread target to run concurrently with the TCP server.
:return: nothing
"""
self.serve_forever()

View file

@ -9,13 +9,14 @@ import ConfigParser
import logging import logging
import optparse import optparse
import sys import sys
import threading
import time import time
from core import load_logging_config
from core import constants from core import constants
from core import enumerations from core import enumerations
from core.corehandlers import CoreHandler from core import load_logging_config
from core.coreserver import CoreServer from core.corehandlers import CoreHandler, CoreUdpHandler
from core.coreserver import CoreServer, CoreUdpServer
from core.misc.utils import close_onexec from core.misc.utils import close_onexec
load_logging_config() load_logging_config()
@ -30,6 +31,21 @@ def banner():
logging.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime()) logging.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime())
def start_udp(mainserver, server_address):
"""
Start a thread running a UDP server on the same host,port for
connectionless requests.
:param CoreServer mainserver: main core tcp server to piggy back off of
:param server_address:
:return: CoreUdpServer
"""
mainserver.udpserver = CoreUdpServer(server_address, CoreUdpHandler, mainserver)
mainserver.udpthread = threading.Thread(target=mainserver.udpserver.start)
mainserver.udpthread.daemon = True
mainserver.udpthread.start()
def cored(cfg, use_ovs): def cored(cfg, use_ovs):
""" """
Start the CoreServer object and enter the server loop. Start the CoreServer object and enter the server loop.
@ -44,7 +60,8 @@ def cored(cfg, use_ovs):
host = "localhost" host = "localhost"
try: try:
server = CoreServer((host, port), CoreHandler, cfg) address = (host, port)
server = CoreServer(address, CoreHandler, cfg)
if use_ovs: if use_ovs:
from core.netns.openvswitch import OVS_NODES from core.netns.openvswitch import OVS_NODES
server.coreemu.update_nodes(OVS_NODES) server.coreemu.update_nodes(OVS_NODES)
@ -52,8 +69,13 @@ def cored(cfg, use_ovs):
logging.exception("error starting main server on: %s:%s", host, port) logging.exception("error starting main server on: %s:%s", host, port)
sys.exit(1) sys.exit(1)
# start udp server
start_udp(server, address)
# close handlers
close_onexec(server.fileno()) close_onexec(server.fileno())
logging.info("server started, listening on: %s:%s", host, port)
logging.info("tcp/udp servers started, listening on: %s:%s", host, port)
server.serve_forever() server.serve_forever()

View file

@ -186,6 +186,8 @@ def main():
help="Listen for a response message and print it.") help="Listen for a response message and print it.")
parser.add_option("-t", "--list-tlvs", dest="tlvs", action="store_true", parser.add_option("-t", "--list-tlvs", dest="tlvs", action="store_true",
help="List TLVs for the specified message type.") help="List TLVs for the specified message type.")
parser.add_option("--tcp", dest="tcp", action="store_true",
help="Use TCP instead of UDP and connect to a session default: %s" % parser.defaults["tcp"])
def usage(msg=None, err=0): def usage(msg=None, err=0):
sys.stdout.write("\n") sys.stdout.write("\n")
@ -249,7 +251,12 @@ def main():
msg = msg_cls.pack(flags, tlvdata) msg = msg_cls.pack(flags, tlvdata)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if opt.tcp:
protocol = socket.SOCK_STREAM
else:
protocol = socket.SOCK_DGRAM
sock = socket.socket(socket.AF_INET, protocol)
sock.setblocking(True) sock.setblocking(True)
try: try:
@ -258,7 +265,7 @@ def main():
print "Error connecting to %s:%s:\n\t%s" % (opt.address, opt.port, e) print "Error connecting to %s:%s:\n\t%s" % (opt.address, opt.port, e)
sys.exit(1) sys.exit(1)
if not connect_to_session(sock, opt.session): if opt.tcp and not connect_to_session(sock, opt.session):
print "warning: continuing without joining a session!" print "warning: continuing without joining a session!"
sock.sendall(msg) sock.sendall(msg)

View file

@ -243,7 +243,6 @@ proc moveNode { c node img xpos ypos dx dy } {
"wlanlink && need_redraw"] { "wlanlink && need_redraw"] {
redrawWlanLink $wlanlink redrawWlanLink $wlanlink
} }
$c dtag node selected
$c delete -withtags selectmark $c delete -withtags selectmark
$c dtag link need_redraw $c dtag link need_redraw
$c dtag wlanlink need_redraw $c dtag wlanlink need_redraw