made use of threadpool for starting services, refactored services to support 3 validation models (blocking, non-blocking, timer)

This commit is contained in:
Blake J. Harnden 2018-06-20 12:59:07 -07:00
parent 37ce407460
commit c6d2ca6b02
4 changed files with 121 additions and 111 deletions

View file

@ -1527,15 +1527,15 @@ class CoreHandler(SocketServer.BaseRequestHandler):
if event_type == EventTypes.STOP.value or event_type == EventTypes.RESTART.value: if event_type == EventTypes.STOP.value or event_type == EventTypes.RESTART.value:
status = self.session.services.stopnodeservice(node, service) status = self.session.services.stopnodeservice(node, service)
if status != "0": if status:
fail += "Stop %s," % service.name fail += "Stop %s," % service.name
if event_type == EventTypes.START.value or event_type == EventTypes.RESTART.value: if event_type == EventTypes.START.value or event_type == EventTypes.RESTART.value:
status = self.session.services.node_service_startup(node, service, services) status = self.session.services.node_service_startup(node, service, services)
if status != "0": if status:
fail += "Start %s(%s)," % service.name fail += "Start %s(%s)," % service.name
if event_type == EventTypes.PAUSE.value: if event_type == EventTypes.PAUSE.value:
status = self.session.services.validatenodeservice(node, service, services) status = self.session.services.validatenodeservice(node, service, services)
if status != 0: if status:
fail += "%s," % service.name fail += "%s," % service.name
if event_type == EventTypes.RECONFIGURE.value: if event_type == EventTypes.RECONFIGURE.value:
self.session.services.node_service_reconfigure(node, service, services) self.session.services.node_service_reconfigure(node, service, services)

View file

@ -7,6 +7,10 @@ a list of available services to the GUI and for configuring individual
services. services.
""" """
import time
from multiprocessing.pool import ThreadPool
import enum
from core.constants import which from core.constants import which
from core import CoreCommandError from core import CoreCommandError
@ -17,6 +21,16 @@ from core.enumerations import RegisterTlvs
from core.misc import utils from core.misc import utils
class ServiceBootError(Exception):
pass
class ServiceMode(enum.Enum):
BLOCKING = 0
NON_BLOCKING = 1
TIMER = 2
class ServiceShim(object): class ServiceShim(object):
keys = ["dirs", "files", "startidx", "cmdup", "cmddown", "cmdval", "meta", "starttime"] keys = ["dirs", "files", "startidx", "cmdup", "cmddown", "cmdval", "meta", "starttime"]
@ -211,7 +225,7 @@ class CoreServices(object):
self.defaultservices.clear() self.defaultservices.clear()
self.customservices.clear() self.customservices.clear()
def get_service_startups(self, services): def node_service_startups(self, services):
# generate service map and find starting points # generate service map and find starting points
node_services = {service.name: service for service in services} node_services = {service.name: service for service in services}
is_dependency = set() is_dependency = set()
@ -400,6 +414,9 @@ class CoreServices(object):
:param core.netns.vnode.LxcNode node: node to start services on :param core.netns.vnode.LxcNode node: node to start services on
:return: nothing :return: nothing
""" """
pool = ThreadPool()
results = []
services = sorted(node.services, key=lambda x: x.startindex) services = sorted(node.services, key=lambda x: x.startindex)
use_startup_service = any(map(self.is_startup_service, services)) use_startup_service = any(map(self.is_startup_service, services))
for service in services: for service in services:
@ -412,7 +429,13 @@ class CoreServices(object):
continue continue
except ValueError: except ValueError:
logger.exception("error converting start time to float") logger.exception("error converting start time to float")
self.bootnodeservice(node, service, services, use_startup_service) result = pool.apply_async(self.bootnodeservice, (node, service, services, use_startup_service))
results.append(result)
pool.close()
pool.join()
for result in results:
result.get()
def bootnodeservice(self, node, service, services, use_startup_service): def bootnodeservice(self, node, service, services, use_startup_service):
""" """
@ -425,62 +448,35 @@ class CoreServices(object):
:param bool use_startup_service: flag to use startup services or not :param bool use_startup_service: flag to use startup services or not
:return: nothing :return: nothing
""" """
if service.custom:
self.bootnodecustomservice(node, service, services, use_startup_service)
return
logger.info("starting node(%s) service: %s (%s)", node.name, service.name, service.startindex) logger.info("starting node(%s) service: %s (%s)", node.name, service.name, service.startindex)
# create service directories
for directory in service.dirs: for directory in service.dirs:
node.privatedir(directory) node.privatedir(directory)
for filename in service.getconfigfilenames(node.objid, services): # create service files
cfg = service.generateconfig(node, filename, services) self.node_service_files(node, service, services)
node.nodefile(filename, cfg)
# check for startup service
if use_startup_service and not self.is_startup_service(service): if use_startup_service and not self.is_startup_service(service):
return return
for args in service.getstartup(node, services): # run startup
# TODO: this wait=False can be problematic! wait = service.validation_mode == ServiceMode.BLOCKING
node.cmd(args, wait=False) status = self.node_service_startup(node, service, services, wait)
if status:
raise ServiceBootError("node(%s) service(%s) error during startup" % (node.name, service.name))
def bootnodecustomservice(self, node, service, services, use_startup_service): # wait for time if provided, default to a time previously used to provide a small buffer
""" time.sleep(0.125)
Start a custom service on a node. Create private dirs, use supplied if service.validation_timer:
config files, and execute supplied startup commands. time.sleep(service.validation_timer)
:param core.netns.vnode.LxcNode node: node to boot services on # run validation commands, if present and not timer mode
:param CoreService service: service to start if service.validation_mode != ServiceMode.TIMER:
:param list services: service list status = self.validatenodeservice(node, service, services)
:param bool use_startup_service: flag to use startup services or not if status:
:return: nothing raise ServiceBootError("node(%s) service(%s) failed validation" % (node.name, service.name))
"""
logger.info("starting node(%s) service(custom): %s (%s)", node.name, service.name, service.startindex)
for directory in service.dirs:
node.privatedir(directory)
logger.info("service configurations: %s", service.configs)
for filename in service.configs:
logger.info("generating service config: %s", filename)
cfg = service.configtxt.get(filename)
if cfg is None:
cfg = service.generateconfig(node, filename, services)
# cfg may have a file:/// url for copying from a file
try:
if self.copyservicefile(node, filename, cfg):
continue
except IOError:
logger.exception("error copying service file '%s'", filename)
continue
node.nodefile(filename, cfg)
if use_startup_service and not self.is_startup_service(service):
return
for args in service.startup:
# TODO: this wait=False can be problematic!
node.cmd(args, wait=False)
def copyservicefile(self, node, filename, cfg): def copyservicefile(self, node, filename, cfg):
""" """
@ -510,7 +506,7 @@ class CoreServices(object):
:param core.netns.vnode.LxcNode node: node to validate services for :param core.netns.vnode.LxcNode node: node to validate services for
:return: nothing :return: nothing
""" """
services = sorted(node.services, key=lambda service: service.startindex) services = sorted(node.services, key=lambda x: x.startindex)
for service in services: for service in services:
self.validatenodeservice(node, service, services) self.validatenodeservice(node, service, services)
@ -524,17 +520,16 @@ class CoreServices(object):
:return: service validation status :return: service validation status
:rtype: int :rtype: int
""" """
logger.info("validating service for node (%s): %s (%s)", node.name, service.name, service.startindex) logger.info("validating node(%s) service(%s): %s", node.name, service.name, service.startindex)
if service.custom: cmds = service.validate
validate_cmds = service.validate if not service.custom:
else: cmds = service.getvalidate(node, services)
validate_cmds = service.getvalidate(node, services)
status = 0 status = 0
for args in validate_cmds: for cmd in cmds:
logger.info("validating service %s using: %s", service.name, args) logger.info("validating service %s using: %s", service.name, cmd)
try: try:
node.check_cmd(args) node.check_cmd(cmd)
except CoreCommandError: except CoreCommandError:
logger.exception("validate command failed") logger.exception("validate command failed")
status = -1 status = -1
@ -561,14 +556,13 @@ class CoreServices(object):
:return: status for stopping the services :return: status for stopping the services
:rtype: str :rtype: str
""" """
status = "0" status = 0
for args in service.shutdown: for args in service.shutdown:
try: try:
node.check_cmd(args) node.check_cmd(args)
except CoreCommandError: except CoreCommandError:
logger.exception("error running stop command %s", args) logger.exception("error running stop command %s", args)
# TODO: determine if its ok to just return the bad exit status status = -1
status = "-1"
return status return status
def getservicefile(self, service_name, node, filename, services): def getservicefile(self, service_name, node, filename, services):
@ -608,7 +602,7 @@ class CoreServices(object):
# get the file data # get the file data
data = service.configtxt.get(filename) data = service.configtxt.get(filename)
if data is None: if data is None:
data = "%s" % service.generateconfig(node, filename, services) data = "%s" % service.generateconfig(node, filename, node_services)
else: else:
data = "%s" % data data = "%s" % data
@ -637,45 +631,81 @@ class CoreServices(object):
self.setcustomservice(node_id, service_name) self.setcustomservice(node_id, service_name)
# retrieve custom service # retrieve custom service
svc = self.getcustomservice(node_id, service_name) service = self.getcustomservice(node_id, service_name)
if svc is None: if service is None:
logger.warn("received filename for unknown service: %s", service_name) logger.warn("received filename for unknown service: %s", service_name)
return return
# validate file being set is valid # validate file being set is valid
cfgfiles = svc.configs cfgfiles = service.configs
if filename not in cfgfiles: if filename not in cfgfiles:
logger.warn("received unknown file '%s' for service '%s'", filename, service_name) logger.warn("received unknown file '%s' for service '%s'", filename, service_name)
return return
# set custom service file data # set custom service file data
svc.configtxt[filename] = data service.configtxt[filename] = data
def node_service_startup(self, node, service, services): def node_service_startup(self, node, service, services, wait=False):
""" """
Startup a node service. Startup a node service.
:param PyCoreNode node: node to reconfigure service for :param PyCoreNode node: node to reconfigure service for
:param CoreService service: service to reconfigure :param CoreService service: service to reconfigure
:param list[CoreService] services: node services :param list[CoreService] services: node services
:param bool wait: determines if we should wait to validate startup
:return: status of startup :return: status of startup
:rtype: str :rtype: int
""" """
if service.custom: cmds = service.startup
cmds = service.startup if not service.custom:
else:
cmds = service.getstartup(node, services) cmds = service.getstartup(node, services)
status = "0" status = 0
for args in cmds: for cmd in cmds:
try: try:
node.check_cmd(args) if wait:
node.check_cmd(cmd)
else:
node.cmd(cmd, wait=False)
except CoreCommandError: except CoreCommandError:
logger.exception("error starting command") logger.exception("error starting command")
status = "-1" status = -1
return status return status
def node_service_files(self, node, service, services):
"""
Creates node service files.
:param PyCoreNode node: node to reconfigure service for
:param CoreService service: service to reconfigure
:param list[CoreService] services: node services
:return: nothing
"""
# get values depending on if custom or not
file_names = service.configs
if not service.custom:
file_names = service.getconfigfilenames(node.objid, services)
for file_name in file_names:
logger.info("generating service config: %s", file_name)
if service.custom:
cfg = service.configtxt.get(file_name)
if cfg is None:
cfg = service.generateconfig(node, file_name, services)
# cfg may have a file:/// url for copying from a file
try:
if self.copyservicefile(node, file_name, cfg):
continue
except IOError:
logger.exception("error copying service file: %s", file_name)
continue
else:
cfg = service.generateconfig(node, file_name, services)
node.nodefile(file_name, cfg)
def node_service_reconfigure(self, node, service, services): def node_service_reconfigure(self, node, service, services):
""" """
Reconfigure a node service. Reconfigure a node service.
@ -685,21 +715,20 @@ class CoreServices(object):
:param list[CoreService] services: node services :param list[CoreService] services: node services
:return: nothing :return: nothing
""" """
if service.custom: file_names = service.configs
cfgfiles = service.configs if not service.custom:
else: file_names = service.getconfigfilenames(node.objid, services)
cfgfiles = service.getconfigfilenames(node.objid, services)
for filename in cfgfiles: for file_name in file_names:
if filename[:7] == "file:///": if file_name[:7] == "file:///":
# TODO: implement this # TODO: implement this
raise NotImplementedError raise NotImplementedError
cfg = service.configtxt.get(filename) cfg = service.configtxt.get(file_name)
if cfg is None: if cfg is None:
cfg = service.generateconfig(node, filename, services) cfg = service.generateconfig(node, file_name, services)
node.nodefile(filename, cfg) node.nodefile(file_name, cfg)
class CoreService(object): class CoreService(object):
@ -742,6 +771,12 @@ class CoreService(object):
# list of validate commands # list of validate commands
validate = () validate = ()
# validation mode, used to determine startup success
validation_mode = ServiceMode.NON_BLOCKING
# time to wait for determining if service started successfully
validation_timer = 0
# metadata associated with this service # metadata associated with this service
meta = None meta = None

View file

@ -607,12 +607,6 @@ class Session(object):
# boot the services on each node # boot the services on each node
self.boot_nodes() self.boot_nodes()
# allow time for processes to start
time.sleep(0.125)
# validate nodes
self.validate_nodes()
# set broker local instantiation to complete # set broker local instantiation to complete
self.broker.local_instantiation_complete() self.broker.local_instantiation_complete()
@ -732,24 +726,6 @@ class Session(object):
self.update_control_interface_hosts() self.update_control_interface_hosts()
def validate_nodes(self):
"""
Validate all nodes that are known by the session.
:return: nothing
"""
with self._objects_lock:
for obj in self.objects.itervalues():
# TODO: issues with checking PyCoreNode alone, validate is not a method
# such as vnoded process, bridges, etc.
if not isinstance(obj, nodes.PyCoreNode):
continue
if nodeutils.is_node(obj, NodeTypes.RJ45):
continue
obj.validate()
def get_control_net_prefixes(self): def get_control_net_prefixes(self):
""" """
Retrieve control net prefixes. Retrieve control net prefixes.

View file

@ -39,7 +39,6 @@ class TestNodes:
assert node.alive() assert node.alive()
assert node.up assert node.up
assert node.check_cmd(["ip", "addr", "show", "lo"]) assert node.check_cmd(["ip", "addr", "show", "lo"])
node.validate()
def test_node_update(self, session): def test_node_update(self, session):
# given # given