changes to add back in coresendmsg udp support
This commit is contained in:
parent
70abb8cc14
commit
e59a8bf66d
4 changed files with 189 additions and 13 deletions
|
@ -136,6 +136,8 @@ class CoreHandler(SocketServer.BaseRequestHandler):
|
|||
logging.info("connection closed: %s", self.client_address)
|
||||
if self.session:
|
||||
# remove client from session broker and shutdown if there are no clients
|
||||
clients = self.server.session_clients.setdefault(self.session.session_id, [])
|
||||
clients.remove(self)
|
||||
self.remove_session_handlers()
|
||||
self.session.broker.session_clients.remove(self)
|
||||
if not self.session.broker.session_clients and not self.session.is_active():
|
||||
|
@ -307,6 +309,7 @@ class CoreHandler(SocketServer.BaseRequestHandler):
|
|||
"""
|
||||
logging.debug("handling broadcast node: %s", node_data)
|
||||
message = dataconversion.convert_node(node_data)
|
||||
|
||||
try:
|
||||
self.sendall(message)
|
||||
except IOError:
|
||||
|
@ -534,7 +537,8 @@ class CoreHandler(SocketServer.BaseRequestHandler):
|
|||
|
||||
# TODO: add shutdown handler for session
|
||||
self.session = self.coreemu.create_session(port, master=False)
|
||||
# self.session.shutdown_handlers.append(self.session_shutdown)
|
||||
clients = self.server.session_clients.setdefault(self.session.session_id, [])
|
||||
clients.append(self)
|
||||
logging.debug("created new session for client: %s", self.session.session_id)
|
||||
|
||||
# TODO: hack to associate this handler with this sessions broker for broadcasting
|
||||
|
@ -1017,7 +1021,7 @@ class CoreHandler(SocketServer.BaseRequestHandler):
|
|||
self.session.location.setrefgeo(lat, lon, alt)
|
||||
self.session.location.refscale = values[5]
|
||||
logging.info("location configured: %s = %s scale=%s", self.session.location.refxyz,
|
||||
self.session.location.refgeo, self.session.location.refscale)
|
||||
self.session.location.refgeo, self.session.location.refscale)
|
||||
logging.info("location configured: UTM%s", self.session.location.refutm)
|
||||
|
||||
def handle_config_metadata(self, message_type, config_data):
|
||||
|
@ -1770,3 +1774,117 @@ class CoreHandler(SocketServer.BaseRequestHandler):
|
|||
self.session.broadcast_config(config_data)
|
||||
|
||||
logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data))
|
||||
|
||||
|
||||
class CoreUdpHandler(CoreHandler):
|
||||
def __init__(self, request, client_address, server):
|
||||
self.message_handlers = {
|
||||
MessageTypes.NODE.value: self.handle_node_message,
|
||||
MessageTypes.LINK.value: self.handle_link_message,
|
||||
MessageTypes.EXECUTE.value: self.handle_execute_message,
|
||||
MessageTypes.REGISTER.value: self.handle_register_message,
|
||||
MessageTypes.CONFIG.value: self.handle_config_message,
|
||||
MessageTypes.FILE.value: self.handle_file_message,
|
||||
MessageTypes.INTERFACE.value: self.handle_interface_message,
|
||||
MessageTypes.EVENT.value: self.handle_event_message,
|
||||
MessageTypes.SESSION.value: self.handle_session_message,
|
||||
}
|
||||
self.master = False
|
||||
self.session = None
|
||||
SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
|
||||
|
||||
def setup(self):
|
||||
"""
|
||||
Client has connected, set up a new connection.
|
||||
:return: nothing
|
||||
"""
|
||||
logging.info("new UDP connection: %s", self.client_address)
|
||||
|
||||
def receive_message(self):
|
||||
data = self.request[0]
|
||||
header = data[:coreapi.CoreMessage.header_len]
|
||||
if len(header) < coreapi.CoreMessage.header_len:
|
||||
raise IOError("error receiving header (received %d bytes)" % len(header))
|
||||
|
||||
message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header)
|
||||
if message_len == 0:
|
||||
logging.warning("received message with no data")
|
||||
return
|
||||
|
||||
if len(data) != coreapi.CoreMessage.header_len + message_len:
|
||||
logging.error("received message length does not match received data (%s != %s)",
|
||||
len(data), coreapi.CoreMessage.header_len + message_len)
|
||||
raise IOError
|
||||
|
||||
try:
|
||||
message_class = coreapi.CLASS_MAP[message_type]
|
||||
message = message_class(message_flags, header, data[coreapi.CoreMessage.header_len:])
|
||||
return message
|
||||
except KeyError:
|
||||
message = coreapi.CoreMessage(message_flags, header, data[coreapi.CoreMessage.header_len:])
|
||||
message.msgtype = message_type
|
||||
logging.exception("unimplemented core message type: %s", message.type_str())
|
||||
|
||||
def handle(self):
|
||||
message = self.receive_message()
|
||||
sessions = message.session_numbers()
|
||||
message.queuedtimes = 0
|
||||
if sessions:
|
||||
for session_id in sessions:
|
||||
session = self.server.mainserver.coreemu.sessions.get(session_id)
|
||||
if session:
|
||||
logging.debug("session handling message: %s", session.session_id)
|
||||
self.session = session
|
||||
self.handle_message(message)
|
||||
self.broadcast(message)
|
||||
else:
|
||||
logging.error("session %d in %s message not found.", session_id, message.type_str())
|
||||
else:
|
||||
# no session specified, find an existing one
|
||||
session = None
|
||||
node_count = 0
|
||||
for session_id in self.server.mainserver.coreemu.sessions:
|
||||
current_session = self.server.mainserver.coreemu.sessions[session_id]
|
||||
current_node_count = current_session.get_node_count()
|
||||
if current_session.state == EventTypes.RUNTIME_STATE.value and current_node_count > node_count:
|
||||
node_count = current_node_count
|
||||
session = current_session
|
||||
|
||||
if session or message.message_type == MessageTypes.REGISTER.value:
|
||||
self.session = session
|
||||
self.handle_message(message)
|
||||
self.broadcast(message)
|
||||
else:
|
||||
logging.error("no active session, dropping %s message.", message.type_str())
|
||||
|
||||
def broadcast(self, message):
|
||||
if not isinstance(message, (coreapi.CoreNodeMessage, coreapi.CoreLinkMessage)):
|
||||
return
|
||||
|
||||
clients = self.server.mainserver.session_clients.setdefault(self.session.session_id, [])
|
||||
for client in clients:
|
||||
try:
|
||||
client.sendall(message.raw_message)
|
||||
except IOError:
|
||||
logging.error("error broadcasting")
|
||||
|
||||
def finish(self):
|
||||
return SocketServer.BaseRequestHandler.finish(self)
|
||||
|
||||
def queuemsg(self, msg):
|
||||
"""
|
||||
UDP handlers are short-lived and do not have message queues.
|
||||
|
||||
:param bytes msg: message to queue
|
||||
:return:
|
||||
"""
|
||||
raise Exception("Unable to queue %s message for later processing using UDP!" % msg)
|
||||
|
||||
def sendall(self, data):
|
||||
"""
|
||||
Use sendto() on the connectionless UDP socket.
|
||||
|
||||
:param data:
|
||||
:return:
|
||||
"""
|
||||
self.request[1].sendto(data, self.client_address)
|
||||
|
|
|
@ -23,8 +23,37 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
|
|||
:param tuple[str, int] server_address: server host and port to use
|
||||
:param class handler_class: request handler
|
||||
:param dict config: configuration setting
|
||||
:return:
|
||||
"""
|
||||
self.coreemu = CoreEmu(config)
|
||||
self.config = config
|
||||
self.session_clients = {}
|
||||
SocketServer.TCPServer.__init__(self, server_address, handler_class)
|
||||
|
||||
|
||||
class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
|
||||
"""
|
||||
UDP server class, manages sessions and spawns request handlers for
|
||||
incoming connections.
|
||||
"""
|
||||
daemon_threads = True
|
||||
allow_reuse_address = True
|
||||
|
||||
def __init__(self, server_address, handler_class, mainserver):
|
||||
"""
|
||||
Server class initialization takes configuration data and calls
|
||||
the SocketServer constructor
|
||||
|
||||
:param server_address:
|
||||
:param class handler_class: request handler
|
||||
:param mainserver:
|
||||
"""
|
||||
self.mainserver = mainserver
|
||||
SocketServer.UDPServer.__init__(self, server_address, handler_class)
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Thread target to run concurrently with the TCP server.
|
||||
|
||||
:return: nothing
|
||||
"""
|
||||
self.serve_forever()
|
||||
|
|
|
@ -9,13 +9,14 @@ import ConfigParser
|
|||
import logging
|
||||
import optparse
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from core import load_logging_config
|
||||
from core import constants
|
||||
from core import enumerations
|
||||
from core.corehandlers import CoreHandler
|
||||
from core.coreserver import CoreServer
|
||||
from core import load_logging_config
|
||||
from core.corehandlers import CoreHandler, CoreUdpHandler
|
||||
from core.coreserver import CoreServer, CoreUdpServer
|
||||
from core.misc.utils import close_onexec
|
||||
|
||||
load_logging_config()
|
||||
|
@ -30,6 +31,21 @@ def banner():
|
|||
logging.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime())
|
||||
|
||||
|
||||
def start_udp(mainserver, server_address):
|
||||
"""
|
||||
Start a thread running a UDP server on the same host,port for
|
||||
connectionless requests.
|
||||
|
||||
:param CoreServer mainserver: main core tcp server to piggy back off of
|
||||
:param server_address:
|
||||
:return: CoreUdpServer
|
||||
"""
|
||||
mainserver.udpserver = CoreUdpServer(server_address, CoreUdpHandler, mainserver)
|
||||
mainserver.udpthread = threading.Thread(target=mainserver.udpserver.start)
|
||||
mainserver.udpthread.daemon = True
|
||||
mainserver.udpthread.start()
|
||||
|
||||
|
||||
def cored(cfg, use_ovs):
|
||||
"""
|
||||
Start the CoreServer object and enter the server loop.
|
||||
|
@ -44,18 +60,24 @@ def cored(cfg, use_ovs):
|
|||
host = "localhost"
|
||||
|
||||
try:
|
||||
server = CoreServer((host, port), CoreHandler, cfg)
|
||||
address = (host, port)
|
||||
server = CoreServer(address, CoreHandler, cfg)
|
||||
if use_ovs:
|
||||
from core.netns.openvswitch import OVS_NODES
|
||||
server.coreemu.update_nodes(OVS_NODES)
|
||||
|
||||
# start udp server
|
||||
start_udp(server, address)
|
||||
|
||||
# close handlers
|
||||
close_onexec(server.fileno())
|
||||
|
||||
logging.info("tcp/udp servers started, listening on: %s:%s", host, port)
|
||||
server.serve_forever()
|
||||
except:
|
||||
logging.exception("error starting main server on: %s:%s", host, port)
|
||||
sys.exit(1)
|
||||
|
||||
close_onexec(server.fileno())
|
||||
logging.info("server started, listening on: %s:%s", host, port)
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
def get_merged_config(filename):
|
||||
"""
|
||||
|
|
|
@ -186,6 +186,8 @@ def main():
|
|||
help="Listen for a response message and print it.")
|
||||
parser.add_option("-t", "--list-tlvs", dest="tlvs", action="store_true",
|
||||
help="List TLVs for the specified message type.")
|
||||
parser.add_option("--tcp", dest="tcp", action="store_true",
|
||||
help="Use TCP instead of UDP and connect to a session default: %s" % parser.defaults["tcp"])
|
||||
|
||||
def usage(msg=None, err=0):
|
||||
sys.stdout.write("\n")
|
||||
|
@ -249,7 +251,12 @@ def main():
|
|||
|
||||
msg = msg_cls.pack(flags, tlvdata)
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
if opt.tcp:
|
||||
protocol = socket.SOCK_STREAM
|
||||
else:
|
||||
protocol = socket.SOCK_DGRAM
|
||||
|
||||
sock = socket.socket(socket.AF_INET, protocol)
|
||||
sock.setblocking(True)
|
||||
|
||||
try:
|
||||
|
@ -258,7 +265,7 @@ def main():
|
|||
print "Error connecting to %s:%s:\n\t%s" % (opt.address, opt.port, e)
|
||||
sys.exit(1)
|
||||
|
||||
if not connect_to_session(sock, opt.session):
|
||||
if opt.tcp and not connect_to_session(sock, opt.session):
|
||||
print "warning: continuing without joining a session!"
|
||||
|
||||
sock.sendall(msg)
|
||||
|
|
Loading…
Reference in a new issue