Merge branch 'rel/5.1' of git-ssh.web.boeing.com:Boeing-CORE/CORE into rel/5.1

This commit is contained in:
Blake J. Harnden 2018-03-16 13:01:23 -07:00
commit d46aaa1005
3 changed files with 14 additions and 209 deletions

View file

@ -14,7 +14,7 @@ import time
from core import coreobj from core import coreobj
from core import logger from core import logger
from core.api import coreapi from core.api import coreapi
from core.coreserver import CoreServer, CoreUdpServer from core.coreserver import CoreServer
from core.data import ConfigData from core.data import ConfigData
from core.data import EventData from core.data import EventData
from core.data import NodeData from core.data import NodeData
@ -87,17 +87,6 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
utils.close_onexec(request.fileno()) utils.close_onexec(request.fileno())
SocketServer.BaseRequestHandler.__init__(self, request, client_address, server) SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
def _get_server(self):
"""
Retrieve server to interface with, in cases where the server is a UDP instance.
:return: core.coreserver.CoreServer
"""
server = self.server
if isinstance(server, CoreUdpServer):
server = self.server.mainserver
return server
def setup(self): def setup(self):
""" """
Client has connected, set up a new connection. Client has connected, set up a new connection.
@ -1125,36 +1114,34 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
""" """
replies = [] replies = []
server = self._get_server()
# execute a Python script or XML file # execute a Python script or XML file
execute_server = message.get_tlv(RegisterTlvs.EXECUTE_SERVER.value) execute_server = message.get_tlv(RegisterTlvs.EXECUTE_SERVER.value)
if execute_server: if execute_server:
try: try:
logger.info("executing: %s", execute_server) logger.info("executing: %s", execute_server)
if message.flags & MessageFlags.STRING.value: if message.flags & MessageFlags.STRING.value:
old_session_ids = set(server.get_session_ids()) old_session_ids = set(self.server.get_session_ids())
sys.argv = shlex.split(execute_server) sys.argv = shlex.split(execute_server)
file_name = sys.argv[0] file_name = sys.argv[0]
if os.path.splitext(file_name)[1].lower() == ".xml": if os.path.splitext(file_name)[1].lower() == ".xml":
session = server.create_session() session = self.server.create_session()
try: try:
open_session_xml(session, file_name, start=True) open_session_xml(session, file_name, start=True)
except: except:
session.shutdown() session.shutdown()
server.remove_session(session) self.server.remove_session(session)
raise raise
else: else:
thread = threading.Thread( thread = threading.Thread(
target=execfile, target=execfile,
args=(file_name, {"__file__": file_name, "server": server}) args=(file_name, {"__file__": file_name, "server": self.server})
) )
thread.daemon = True thread.daemon = True
thread.start() thread.start()
# allow time for session creation # allow time for session creation
time.sleep(0.25) time.sleep(0.25)
if message.flags & MessageFlags.STRING.value: if message.flags & MessageFlags.STRING.value:
new_session_ids = set(server.get_session_ids()) new_session_ids = set(self.server.get_session_ids())
new_sid = new_session_ids.difference(old_session_ids) new_sid = new_session_ids.difference(old_session_ids)
try: try:
sid = new_sid.pop() sid = new_sid.pop()
@ -1163,7 +1150,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
logger.info("executed %s with unknown session ID", execute_server) logger.info("executed %s with unknown session ID", execute_server)
return replies return replies
logger.info("checking session %d for RUNTIME state" % sid) logger.info("checking session %d for RUNTIME state" % sid)
session = server.get_session(session_id=sid) session = self.server.get_session(session_id=sid)
retries = 10 retries = 10
# wait for session to enter RUNTIME state, to prevent GUI from # wait for session to enter RUNTIME state, to prevent GUI from
# connecting while nodes are still being instantiated # connecting while nodes are still being instantiated
@ -1197,14 +1184,14 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
# TODO: need to replicate functionality? # TODO: need to replicate functionality?
# self.server.set_session_master(self) # self.server.set_session_master(self)
# find the session containing this client and set the session to master # find the session containing this client and set the session to master
for session in server.sessions.itervalues(): for session in self.server.sessions.itervalues():
if self in session.broker.session_clients: if self in session.broker.session_clients:
logger.info("setting session to master: %s", session.session_id) logger.info("setting session to master: %s", session.session_id)
session.master = True session.master = True
break break
replies.append(self.register()) replies.append(self.register())
replies.append(server.to_session_message()) replies.append(self.server.to_session_message())
return replies return replies
@ -1443,8 +1430,6 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
node_counts = coreapi.str_to_list(node_count_str) node_counts = coreapi.str_to_list(node_count_str)
logger.info("SESSION message flags=0x%x sessions=%s" % (message.flags, session_id_str)) logger.info("SESSION message flags=0x%x sessions=%s" % (message.flags, session_id_str))
server = self._get_server()
if message.flags == 0: if message.flags == 0:
# modify a session # modify a session
i = 0 i = 0
@ -1453,7 +1438,7 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
if session_id == 0: if session_id == 0:
session = self.session session = self.session
else: else:
session = server.get_session(session_id=session_id) session = self.server.get_session(session_id=session_id)
if session is None: if session is None:
logger.info("session %s not found", session_id) logger.info("session %s not found", session_id)
@ -1467,7 +1452,6 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
session.file_name = files[i] session.file_name = files[i]
if node_counts is not None: if node_counts is not None:
pass pass
# session.node_count = ncs[i]
if thumb is not None: if thumb is not None:
session.set_thumbnail(thumb) session.set_thumbnail(thumb)
if user is not None: if user is not None:
@ -1476,12 +1460,12 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
else: else:
if message.flags & MessageFlags.STRING.value and not message.flags & MessageFlags.ADD.value: if message.flags & MessageFlags.STRING.value and not message.flags & MessageFlags.ADD.value:
# status request flag: send list of sessions # status request flag: send list of sessions
return server.to_session_message(), return self.server.to_session_message(),
# handle ADD or DEL flags # handle ADD or DEL flags
for session_id in session_ids: for session_id in session_ids:
session_id = int(session_id) session_id = int(session_id)
session = server.get_session(session_id=session_id) session = self.server.get_session(session_id=session_id)
if session is None: if session is None:
logger.info("session %s not found (flags=0x%x)", session_id, message.flags) logger.info("session %s not found (flags=0x%x)", session_id, message.flags)
@ -1548,131 +1532,3 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
logger.exception("error sending node emulation id message: %s", node_id) logger.exception("error sending node emulation id message: %s", node_id)
del self.node_status_request[node_id] del self.node_status_request[node_id]
class CoreDatagramRequestHandler(CoreRequestHandler):
"""
A child of the CoreRequestHandler class for handling connectionless
UDP messages. No new session is created; messages are handled immediately or
sometimes queued on existing session handlers.
"""
def __init__(self, request, client_address, server):
"""
Create a CoreDatagramRequestHandler instance.
:param request: request object
:param str client_address: client address
:param CoreServer server: core server instance
"""
# TODO: decide which messages cannot be handled with connectionless UDP
self.message_handlers = {
MessageTypes.NODE.value: self.handle_node_message,
MessageTypes.LINK.value: self.handle_link_message,
MessageTypes.EXECUTE.value: self.handle_execute_message,
MessageTypes.REGISTER.value: self.handle_register_message,
MessageTypes.CONFIG.value: self.handle_config_message,
MessageTypes.FILE.value: self.handle_file_message,
MessageTypes.INTERFACE.value: self.handle_interface_message,
MessageTypes.EVENT.value: self.handle_event_message,
MessageTypes.SESSION.value: self.handle_session_message,
}
self.node_status_request = {}
self.master = False
self.session = None
SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
def setup(self):
"""
Client has connected, set up a new connection.
:return: nothing
"""
logger.info("new UDP connection: %s:%s" % self.client_address)
def handle(self):
"""
Receive a message.
:return: nothing
"""
self.receive_message()
def finish(self):
"""
Handle the finish state of a client.
:return: nothing
"""
return SocketServer.BaseRequestHandler.finish(self)
def receive_message(self):
"""
Receive data, parse a CoreMessage and queue it onto an existing
session handler"s queue, if available.
:return: nothing
"""
data = self.request[0]
sock = self.request[1]
header = data[:coreapi.CoreMessage.header_len]
if len(header) < coreapi.CoreMessage.header_len:
raise IOError("error receiving header (received %d bytes)" % len(header))
message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header)
if message_len == 0:
logger.warn("received message with no data")
return
if len(data) != coreapi.CoreMessage.header_len + message_len:
logger.warn("received message length does not match received data (%s != %s)",
len(data), coreapi.CoreMessage.header_len + message_len)
raise IOError
else:
logger.info("UDP socket received message type=%d len=%d", message_type, message_len)
try:
message_class = coreapi.CLASS_MAP[message_type]
message = message_class(message_flags, header, data[coreapi.CoreMessage.header_len:])
except KeyError:
message = coreapi.CoreMessage(message_flags, header, data[coreapi.CoreMessage.header_len:])
message.message_type = message_type
logger.warn("unimplemented core message type: %s" % message.type_str())
return
session_ids = message.session_numbers()
message.queuedtimes = 0
if len(session_ids) > 0:
for session_id in session_ids:
session = self.server.mainserver.get_session(session_id=session_id)
if session:
self.session = session
self.handle_message(message)
else:
logger.warn("Session %d in %s message not found." % (session_id, message.type_str()))
else:
# no session specified, find an existing one
session = self.server.mainserver.get_session()
if session or message.message_type == MessageTypes.REGISTER.value:
self.session = session
self.handle_message(message)
else:
logger.warn("No active session, dropping %s message.", message.type_str())
def queue_message(self, message):
"""
UDP handlers are short-lived and do not have message queues.
:return: nothing
"""
raise Exception("Unable to queue %s message for later processing using UDP!" % message.type_str())
def sendall(self, data):
"""
Use sendto() on the connectionless UDP socket.
:return: nothing
"""
self.request[1].sendto(data, self.client_address)

View file

@ -270,32 +270,3 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
with self._sessions_lock: with self._sessions_lock:
for session_id in self.sessions: for session_id in self.sessions:
logger.info(session_id) logger.info(session_id)
class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
"""
UDP server class, manages sessions and spawns request handlers for
incoming connections.
"""
daemon_threads = True
allow_reuse_address = True
def __init__(self, server_address, handler_class, main_server):
"""
Server class initialization takes configuration data and calls
the SocketServer constructor
:param tuple[str, int] server_address: server address
:param class handler_class: class for handling requests
:param main_server: main server to associate with
"""
self.mainserver = main_server
SocketServer.UDPServer.__init__(self, server_address, handler_class)
def start(self):
"""
Thread target to run concurrently with the TCP server.
:return: nothing
"""
self.serve_forever()

View file

@ -12,7 +12,6 @@ import os
import signal import signal
import socket import socket
import sys import sys
import threading
import time import time
from core import constants from core import constants
@ -22,7 +21,6 @@ from core import enumerations
from core import logger from core import logger
from core import services from core import services
from core.api import coreapi from core.api import coreapi
from core.corehandlers import CoreDatagramRequestHandler
from core.enumerations import MessageFlags from core.enumerations import MessageFlags
from core.enumerations import RegisterTlvs from core.enumerations import RegisterTlvs
from core.misc import nodeutils from core.misc import nodeutils
@ -33,29 +31,13 @@ from core.service import ServiceManager
DEFAULT_MAXFD = 1024 DEFAULT_MAXFD = 1024
def startudp(core_server, server_address):
"""
Start a thread running a UDP server on the same host,port for connectionless requests.
:param core.coreserver.CoreServer core_server: core server instance
:param tuple[str, int] server_address: server address
:return: created core udp server
:rtype: core.coreserver.CoreUdpServer
"""
core_server.udpserver = coreserver.CoreUdpServer(server_address, CoreDatagramRequestHandler, core_server)
core_server.udpthread = threading.Thread(target=core_server.udpserver.start)
core_server.udpthread.daemon = True
core_server.udpthread.start()
return core_server.udpserver
def banner(): def banner():
""" """
Output the program banner printed to the terminal or log file. Output the program banner printed to the terminal or log file.
:return: nothing :return: nothing
""" """
logger.info("CORE daemon v.%s started %s\n" % (constants.COREDPY_VERSION, time.ctime())) logger.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime())
def cored(cfg=None): def cored(cfg=None):
@ -76,11 +58,7 @@ def cored(cfg=None):
sys.exit(1) sys.exit(1)
close_onexec(server.fileno()) close_onexec(server.fileno())
logger.info("main server started, listening on: %s:%s\n" % (host, port)) logger.info("main server started, listening on: %s:%s", host, port)
udpserver = startudp(server, (host, port))
close_onexec(udpserver.fileno())
server.serve_forever() server.serve_forever()