removed the core server reference from sessions, added a shutdown handler to initiate callbacks for when a session shutsdown, this is how the core server can run the same functionality going forward, small core-daemon documentation cleanup

This commit is contained in:
Blake J. Harnden 2017-05-04 13:49:14 -07:00
parent 3f82c980de
commit 7ad57bfb53
3 changed files with 115 additions and 68 deletions

View file

@ -31,7 +31,7 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
Server class initialization takes configuration data and calls Server class initialization takes configuration data and calls
the SocketServer constructor the SocketServer constructor
:param tuple server_address: server host and port to use :param tuple[str, int] server_address: server host and port to use
:param class handler_class: request handler :param class handler_class: request handler
:param dict config: configuration setting :param dict config: configuration setting
:return: :return:
@ -64,10 +64,8 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
:param CoreServer server: server to remove :param CoreServer server: server to remove
:return: nothing :return: nothing
""" """
try: if server in cls.servers:
cls.servers.remove(server) cls.servers.remove(server)
except KeyError:
logger.exception("error removing server: %s", server)
def shutdown(self): def shutdown(self):
""" """
@ -150,8 +148,13 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
if session_id not in self.sessions if session_id not in self.sessions
) )
session = Session(session_id, config=self.config, server=self) # create and add session to local manager
session = Session(session_id, config=self.config)
self.add_session(session) self.add_session(session)
# add shutdown handler to remove session from manager
session.shutdown_handlers.append(self.session_shutdown)
return session return session
def get_session(self, session_id=None): def get_session(self, session_id=None):
@ -187,6 +190,15 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
return session return session
def session_shutdown(self, session):
"""
Handler method to be used as a callback when a session has shutdown.
:param core.session.Session session: session shutting down
:return: nothing
"""
self.remove_session(session)
def to_session_message(self, flags=0): def to_session_message(self, flags=0):
""" """
Build CORE API Sessions message based on current session info. Build CORE API Sessions message based on current session info.
@ -293,7 +305,7 @@ class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
Server class initialization takes configuration data and calls Server class initialization takes configuration data and calls
the SocketServer constructor the SocketServer constructor
:param str server_address: server address :param tuple[str, int] server_address: server address
:param class handler_class: class for handling requests :param class handler_class: class for handling requests
:param main_server: main server to associate with :param main_server: main server to associate with
""" """
@ -320,7 +332,7 @@ class CoreAuxServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
""" """
Create a CoreAuxServer instance. Create a CoreAuxServer instance.
:param str server_address: server address :param tuple[str, int] server_address: server address
:param class handler_class: class for handling requests :param class handler_class: class for handling requests
:param main_server: main server to associate with :param main_server: main server to associate with
""" """

View file

@ -210,13 +210,12 @@ class Session(object):
CORE session manager. CORE session manager.
""" """
def __init__(self, session_id, config=None, server=None, persistent=False, mkdir=True): def __init__(self, session_id, config=None, persistent=False, mkdir=True):
""" """
Create a Session instance. Create a Session instance.
:param int session_id: session id :param int session_id: session id
:param dict config: session configuration :param dict config: session configuration
:param core.coreserver.CoreServer server: core server object
:param bool persistent: flag is session is considered persistent :param bool persistent: flag is session is considered persistent
:param bool mkdir: flag to determine if a directory should be made :param bool mkdir: flag to determine if a directory should be made
""" """
@ -256,9 +255,6 @@ class Session(object):
self.add_state_hook(state=EventTypes.RUNTIME_STATE.value, hook=self.runtime_state_hook) self.add_state_hook(state=EventTypes.RUNTIME_STATE.value, hook=self.runtime_state_hook)
# TODO: remove this server reference
self.server = server
if not persistent: if not persistent:
SessionManager.add(self) SessionManager.add(self)
@ -307,6 +303,7 @@ class Session(object):
self.link_handlers = [] self.link_handlers = []
self.file_handlers = [] self.file_handlers = []
self.config_handlers = [] self.config_handlers = []
self.shutdown_handlers = []
def shutdown(self): def shutdown(self):
""" """
@ -333,13 +330,13 @@ class Session(object):
if not preserve: if not preserve:
shutil.rmtree(self.session_dir, ignore_errors=True) shutil.rmtree(self.session_dir, ignore_errors=True)
# remove session from server if one was provided
if self.server:
self.server.remove_session(self)
# remove this session from the manager # remove this session from the manager
SessionManager.remove(self) SessionManager.remove(self)
# call session shutdown handlers
for handler in self.shutdown_handlers:
handler(self)
def broadcast_event(self, event_data): def broadcast_event(self, event_data):
""" """
Handle event data that should be provided to event handler. Handle event data that should be provided to event handler.

View file

@ -31,6 +31,7 @@ from core import corehandlers
from core import coreserver from core import coreserver
from core import enumerations from core import enumerations
from core.api import coreapi from core.api import coreapi
from core.corehandlers import CoreDatagramRequestHandler
from core.enumerations import MessageFlags from core.enumerations import MessageFlags
from core.enumerations import RegisterTlvs from core.enumerations import RegisterTlvs
from core.misc import log from core.misc import log
@ -48,54 +49,64 @@ from core.services import ucarp
from core.services import utility from core.services import utility
from core.services import xorp from core.services import xorp
logger = log.get_logger(__name__)
DEFAULT_MAXFD = 1024 DEFAULT_MAXFD = 1024
# def startudp(core_server, server_address):
# UDP server startup
#
def startudp(mainserver, server_address):
""" Start a thread running a UDP server on the same host,port for
connectionless requests.
""" """
mainserver.udpserver = coreserver.CoreUdpServer( Start a thread running a UDP server on the same host,port for connectionless requests.
server_address,
corehandlers.CoreDatagramRequestHandler, :param core.coreserver.CoreServer core_server: core server instance
mainserver) :param tuple[str, int] server_address: server address
mainserver.udpthread = threading.Thread(target=mainserver.udpserver.start) :return: created core udp server
mainserver.udpthread.daemon = True :rtype: core.coreserver.CoreUdpServer
mainserver.udpthread.start() """
return mainserver.udpserver core_server.udpserver = coreserver.CoreUdpServer(server_address, CoreDatagramRequestHandler, core_server)
core_server.udpthread = threading.Thread(target=core_server.udpserver.start)
core_server.udpthread.daemon = True
core_server.udpthread.start()
return core_server.udpserver
# def startaux(core_server, aux_address, aux_handler):
# Auxiliary server startup """
# Start a thread running an auxiliary TCP server on the given address.
def startaux(mainserver, aux_address, aux_handler):
""" Start a thread running an auxiliary TCP server on the given address.
This server will communicate with client requests using a handler This server will communicate with client requests using a handler
using the aux_handler class. The aux_handler can provide an alternative using the aux_handler class. The aux_handler can provide an alternative
API to CORE. API to CORE.
:param core.coreserver.CoreServer core_server: core server instance
:param tuple[str, int] aux_address: auxiliary server address
:param str aux_handler: auxiliary handler string to import
:return: auxiliary server
""" """
handlermodname, dot, handlerclassname = aux_handler.rpartition(".") handlermodname, dot, handlerclassname = aux_handler.rpartition(".")
handlermod = importlib.import_module(handlermodname) handlermod = importlib.import_module(handlermodname)
handlerclass = getattr(handlermod, handlerclassname) handlerclass = getattr(handlermod, handlerclassname)
mainserver.auxserver = coreserver.CoreAuxServer(aux_address, handlerclass, mainserver) core_server.auxserver = coreserver.CoreAuxServer(aux_address, handlerclass, core_server)
mainserver.auxthread = threading.Thread(target=mainserver.auxserver.start) core_server.auxthread = threading.Thread(target=core_server.auxserver.start)
mainserver.auxthread.daemon = True core_server.auxthread.daemon = True
mainserver.auxthread.start() core_server.auxthread.start()
return mainserver.auxserver return core_server.auxserver
def banner(): def banner():
""" Output the program banner printed to the terminal or log file.
""" """
sys.stdout.write("CORE daemon v.%s started %s\n" % (constants.COREDPY_VERSION, time.ctime())) Output the program banner printed to the terminal or log file.
sys.stdout.flush()
:return: nothing
"""
logger.info("CORE daemon v.%s started %s\n" % (constants.COREDPY_VERSION, time.ctime()))
def cored(cfg=None): def cored(cfg=None):
""" Start the CoreServer object and enter the server loop. """
Start the CoreServer object and enter the server loop.
:param dict cfg: core configuration
:return: nothing
""" """
host = cfg["listenaddr"] host = cfg["listenaddr"]
port = int(cfg["port"]) port = int(cfg["port"])
@ -103,30 +114,32 @@ def cored(cfg=None):
host = "localhost" host = "localhost"
try: try:
server = coreserver.CoreServer((host, port), corehandlers.CoreRequestHandler, cfg) server = coreserver.CoreServer((host, port), corehandlers.CoreRequestHandler, cfg)
except Exception, e: except:
sys.stderr.write("error starting main server on: %s:%s\n\t%s\n" % (host, port, e)) logger.exception("error starting main server on: %s:%s", host, port)
sys.stderr.flush()
sys.exit(1) sys.exit(1)
closeonexec(server.fileno()) closeonexec(server.fileno())
sys.stdout.write("main server started, listening on: %s:%s\n" % (host, port)) logger.info("main server started, listening on: %s:%s\n" % (host, port))
sys.stdout.flush()
udpserver = startudp(server, (host, port)) udpserver = startudp(server, (host, port))
closeonexec(udpserver.fileno()) closeonexec(udpserver.fileno())
auxreqhandler = cfg["aux_request_handler"] auxreqhandler = cfg["aux_request_handler"]
if auxreqhandler: if auxreqhandler:
try: handler, auxport = auxreqhandler.rsplit(":")
handler, auxport = auxreqhandler.rsplit(":") auxserver = startaux(server, (host, int(auxport)), handler)
auxserver = startaux(server, (host, int(auxport)), handler) closeonexec(auxserver.fileno())
closeonexec(auxserver.fileno())
except Exception as e:
raise ValueError, "invalid auxreqhandler:(%s)\nError: %s" % (auxreqhandler, e)
server.serve_forever() server.serve_forever()
# TODO: should sessions and the main core daemon both catch at exist to shutdown independently?
def cleanup(): def cleanup():
"""
Runs server shutdown and cleanup when catching an exit signal.
:return: nothing
"""
while coreserver.CoreServer.servers: while coreserver.CoreServer.servers:
server = coreserver.CoreServer.servers.pop() server = coreserver.CoreServer.servers.pop()
server.shutdown() server.shutdown()
@ -136,7 +149,14 @@ atexit.register(cleanup)
def sighandler(signum, stackframe): def sighandler(signum, stackframe):
print >> sys.stderr, "terminated by signal:", signum """
Signal handler when different signals are sent.
:param int signum: singal number sent
:param stackframe: stack frame sent
:return: nothing
"""
logger.error("terminated by signal: %s", signum)
sys.exit(signum) sys.exit(signum)
@ -149,6 +169,16 @@ signal.signal(signal.SIGUSR2, sighandler)
def logrotate(stdout, stderr, stdoutmode=0644, stderrmode=0644): def logrotate(stdout, stderr, stdoutmode=0644, stderrmode=0644):
"""
Log rotation method.
:param stdout: stdout
:param stderr: stderr
:param int stdoutmode: stdout mode
:param int stderrmode: stderr mode
:return:
"""
def reopen(fileno, filename, mode): def reopen(fileno, filename, mode):
err = 0 err = 0
fd = -1 fd = -1
@ -176,8 +206,12 @@ def logrotate(stdout, stderr, stdoutmode=0644, stderrmode=0644):
def get_merged_config(filename): def get_merged_config(filename):
""" Return a configuration after merging config file and command-line """
arguments. Return a configuration after merging config file and command-line arguments.
:param str filename: file name to merge configuration settings with
:return: merged configuration
:rtype: dict
""" """
# these are the defaults used in the config file # these are the defaults used in the config file
defaults = {"port": "%d" % enumerations.CORE_API_PORT, defaults = {"port": "%d" % enumerations.CORE_API_PORT,
@ -227,7 +261,7 @@ def get_merged_config(filename):
defaults["debug"]) defaults["debug"])
# parse command line options # parse command line options
(options, args) = parser.parse_args() options, args = parser.parse_args()
# read the config file # read the config file
if options.configfile is not None: if options.configfile is not None:
@ -263,12 +297,14 @@ def get_merged_config(filename):
def exec_file(cfg): def exec_file(cfg):
""" Send a Register Message to execute a new session based on XML or Python """
script file. Send a Register Message to execute a new session based on XML or Python script file.
:param dict cfg: configuration settings
:return: 0
""" """
filename = cfg["execfile"] filename = cfg["execfile"]
sys.stdout.write("Telling daemon to execute file: %s...\n" % filename) logger.info("Telling daemon to execute file: %s...", filename)
sys.stdout.flush()
tlvdata = coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, filename) tlvdata = coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, filename)
msg = coreapi.CoreRegMessage.pack(MessageFlags.ADD.value, tlvdata) msg = coreapi.CoreRegMessage.pack(MessageFlags.ADD.value, tlvdata)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -278,12 +314,15 @@ def exec_file(cfg):
def main(): def main():
""" Main program startup. """
Main program startup.
:return: nothing
""" """
# get a configuration merged from config file and command-line arguments # get a configuration merged from config file and command-line arguments
cfg, args = get_merged_config("%s/core.conf" % constants.CORE_CONF_DIR) cfg, args = get_merged_config("%s/core.conf" % constants.CORE_CONF_DIR)
for a in args: for a in args:
sys.stderr.write("ignoring command line argument: %s\n" % a) logger.error("ignoring command line argument: %s", a)
if cfg["daemonize"] == "True": if cfg["daemonize"] == "True":
daemonize(rootdir=None, umask=0, close_fds=False, daemonize(rootdir=None, umask=0, close_fds=False,
@ -292,8 +331,7 @@ def main():
pidfilename=cfg["pidfile"], pidfilename=cfg["pidfile"],
defaultmaxfd=DEFAULT_MAXFD) defaultmaxfd=DEFAULT_MAXFD)
signal.signal(signal.SIGUSR1, lambda signum, stackframe: signal.signal(signal.SIGUSR1, lambda signum, stackframe:
logrotate(stdout=cfg["logfile"], logrotate(stdout=cfg["logfile"], stderr=cfg["logfile"]))
stderr=cfg["logfile"]))
banner() banner()
if cfg["execfile"]: if cfg["execfile"]:
@ -302,7 +340,7 @@ def main():
try: try:
cored(cfg) cored(cfg)
except KeyboardInterrupt: except KeyboardInterrupt:
pass logger.info("keyboard interrupt, stopping core daemon")
sys.exit(0) sys.exit(0)