refactored service boot path discovery to be more robust, still based on services provided alone
This commit is contained in:
parent
23f5d7fb8c
commit
4a9c751935
3 changed files with 125 additions and 113 deletions
|
@ -31,6 +31,92 @@ class ServiceMode(enum.Enum):
|
||||||
TIMER = 2
|
TIMER = 2
|
||||||
|
|
||||||
|
|
||||||
|
class ServiceDependencies(object):
|
||||||
|
"""
|
||||||
|
Can generate boot paths for services, based on their dependencies. Will validate
|
||||||
|
that all services will be booted and that all dependencies exist within the services provided.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, services):
|
||||||
|
# helpers to check validity
|
||||||
|
self.dependents = {}
|
||||||
|
self.booted = set()
|
||||||
|
self.node_services = {}
|
||||||
|
for service in services:
|
||||||
|
self.node_services[service.name] = service
|
||||||
|
for dependency in service.dependencies:
|
||||||
|
dependents = self.dependents.setdefault(dependency, set())
|
||||||
|
dependents.add(service.name)
|
||||||
|
|
||||||
|
# used to find paths
|
||||||
|
self.path = []
|
||||||
|
self.visited = set()
|
||||||
|
self.visiting = set()
|
||||||
|
|
||||||
|
def boot_paths(self):
|
||||||
|
"""
|
||||||
|
Generates the boot paths for the services provided to the class.
|
||||||
|
|
||||||
|
:return: list of services to boot, in order
|
||||||
|
:rtype: list[core.service.CoreService]
|
||||||
|
"""
|
||||||
|
paths = []
|
||||||
|
for service in self.node_services.itervalues():
|
||||||
|
if service.name in self.booted:
|
||||||
|
logger.debug("skipping service that will already be booted: %s", service.name)
|
||||||
|
continue
|
||||||
|
|
||||||
|
path = self._start(service)
|
||||||
|
if path:
|
||||||
|
paths.append(path)
|
||||||
|
|
||||||
|
if self.booted != self.node_services.viewkeys():
|
||||||
|
raise ValueError("failure to boot all services: %s != %s", self.booted, self.node_services.keys())
|
||||||
|
|
||||||
|
return paths
|
||||||
|
|
||||||
|
def _reset(self):
|
||||||
|
self.path = []
|
||||||
|
self.visited.clear()
|
||||||
|
self.visiting.clear()
|
||||||
|
|
||||||
|
def _start(self, service):
|
||||||
|
logger.debug("starting service dependency check: %s", service.name)
|
||||||
|
self._reset()
|
||||||
|
return self._visit(service)
|
||||||
|
|
||||||
|
def _visit(self, current_service):
|
||||||
|
logger.debug("visiting service(%s): %s", current_service.name, self.path)
|
||||||
|
self.visited.add(current_service.name)
|
||||||
|
self.visiting.add(current_service.name)
|
||||||
|
|
||||||
|
# dive down
|
||||||
|
for service_name in current_service.dependencies:
|
||||||
|
if service_name not in self.node_services:
|
||||||
|
raise ValueError("required dependency was not included in node services: %s" % service_name)
|
||||||
|
|
||||||
|
if service_name in self.visiting:
|
||||||
|
raise ValueError("cyclic dependency at service(%s): %s" % (current_service.name, service_name))
|
||||||
|
|
||||||
|
if service_name not in self.visited:
|
||||||
|
service = self.node_services[service_name]
|
||||||
|
self._visit(service)
|
||||||
|
|
||||||
|
# add service when bottom is found
|
||||||
|
logger.debug("adding service to boot path: %s", current_service.name)
|
||||||
|
self.booted.add(current_service.name)
|
||||||
|
self.path.append(current_service)
|
||||||
|
self.visiting.remove(current_service.name)
|
||||||
|
|
||||||
|
# rise back up
|
||||||
|
for service_name in self.dependents.get(current_service.name, []):
|
||||||
|
if service_name not in self.visited:
|
||||||
|
service = self.node_services[service_name]
|
||||||
|
self._visit(service)
|
||||||
|
|
||||||
|
return self.path
|
||||||
|
|
||||||
|
|
||||||
class ServiceShim(object):
|
class ServiceShim(object):
|
||||||
keys = ["dirs", "files", "startidx", "cmdup", "cmddown", "cmdval", "meta", "starttime"]
|
keys = ["dirs", "files", "startidx", "cmdup", "cmddown", "cmdval", "meta", "starttime"]
|
||||||
|
|
||||||
|
@ -217,87 +303,6 @@ class CoreServices(object):
|
||||||
self.default_services.clear()
|
self.default_services.clear()
|
||||||
self.custom_services.clear()
|
self.custom_services.clear()
|
||||||
|
|
||||||
def create_boot_paths(self, services):
|
|
||||||
"""
|
|
||||||
Create boot paths for starting up services based on dependencies. All services provided and their dependencies
|
|
||||||
must exist within this set of services, to be valid.
|
|
||||||
|
|
||||||
:param list[CoreService] services: service to create boot paths for
|
|
||||||
:return: list of boot paths for services
|
|
||||||
:rtype: list[list[CoreService]]
|
|
||||||
"""
|
|
||||||
# generate service map and find starting points
|
|
||||||
node_services = {service.name: service for service in services}
|
|
||||||
all_services = set()
|
|
||||||
has_dependency = set()
|
|
||||||
dependency_map = {}
|
|
||||||
for service in services:
|
|
||||||
all_services.add(service.name)
|
|
||||||
if service.dependencies:
|
|
||||||
has_dependency.add(service.name)
|
|
||||||
|
|
||||||
for dependency in service.dependencies:
|
|
||||||
dependents = dependency_map.setdefault(dependency, set())
|
|
||||||
dependents.add(service.name)
|
|
||||||
|
|
||||||
starting_points = all_services - has_dependency
|
|
||||||
|
|
||||||
# cycles means no starting points
|
|
||||||
if not starting_points:
|
|
||||||
raise ValueError("no valid service starting points")
|
|
||||||
|
|
||||||
stack = [iter(starting_points)]
|
|
||||||
|
|
||||||
# information used to traverse dependency graph
|
|
||||||
visited = set()
|
|
||||||
path = []
|
|
||||||
path_set = set()
|
|
||||||
|
|
||||||
# store startup orderings
|
|
||||||
startups = []
|
|
||||||
startup = []
|
|
||||||
|
|
||||||
logger.debug("starting points: %s", starting_points)
|
|
||||||
while stack:
|
|
||||||
for service_name in stack[-1]:
|
|
||||||
service = node_services[service_name]
|
|
||||||
logger.debug("evaluating: %s", service.name)
|
|
||||||
|
|
||||||
# check this is not a cycle
|
|
||||||
if service.name in path_set:
|
|
||||||
raise ValueError("service has a cyclic dependency: %s" % service.name)
|
|
||||||
# check that we have not already visited this node
|
|
||||||
elif service.name not in visited:
|
|
||||||
logger.debug("visiting: %s", service.name)
|
|
||||||
visited.add(service.name)
|
|
||||||
path.append(service.name)
|
|
||||||
path_set.add(service.name)
|
|
||||||
|
|
||||||
# retrieve and dependent services and add to stack
|
|
||||||
dependents = iter(dependency_map.get(service.name, []))
|
|
||||||
stack.append(dependents)
|
|
||||||
startup.append(service)
|
|
||||||
break
|
|
||||||
# for loop completed without a break
|
|
||||||
else:
|
|
||||||
logger.debug("finished a visit: path(%s)", path)
|
|
||||||
if path:
|
|
||||||
path_set.remove(path.pop())
|
|
||||||
|
|
||||||
if not path and startup:
|
|
||||||
# finalize startup path
|
|
||||||
startups.append(startup)
|
|
||||||
|
|
||||||
# reset new startup path
|
|
||||||
startup = []
|
|
||||||
|
|
||||||
stack.pop()
|
|
||||||
|
|
||||||
if visited != all_services:
|
|
||||||
raise ValueError("failure to visit all services for boot path")
|
|
||||||
|
|
||||||
return startups
|
|
||||||
|
|
||||||
def get_default_services(self, node_type):
|
def get_default_services(self, node_type):
|
||||||
"""
|
"""
|
||||||
Get the list of default services that should be enabled for a
|
Get the list of default services that should be enabled for a
|
||||||
|
@ -422,7 +427,7 @@ class CoreServices(object):
|
||||||
pool = ThreadPool()
|
pool = ThreadPool()
|
||||||
results = []
|
results = []
|
||||||
|
|
||||||
boot_paths = self.create_boot_paths(node.services)
|
boot_paths = ServiceDependencies(node.services).boot_paths()
|
||||||
for boot_path in boot_paths:
|
for boot_path in boot_paths:
|
||||||
result = pool.apply_async(self._start_boot_paths, (node, boot_path))
|
result = pool.apply_async(self._start_boot_paths, (node, boot_path))
|
||||||
results.append(result)
|
results.append(result)
|
||||||
|
@ -440,7 +445,7 @@ class CoreServices(object):
|
||||||
:param list[CoreService] boot_path: service to start in dependent order
|
:param list[CoreService] boot_path: service to start in dependent order
|
||||||
:return: nothing
|
:return: nothing
|
||||||
"""
|
"""
|
||||||
logger.debug("booting node service dependencies: %s", boot_path)
|
logger.info("booting node services: %s", boot_path)
|
||||||
for service in boot_path:
|
for service in boot_path:
|
||||||
self.boot_service(node, service)
|
self.boot_service(node, service)
|
||||||
|
|
||||||
|
|
|
@ -734,7 +734,7 @@ class Session(object):
|
||||||
pool.join()
|
pool.join()
|
||||||
for result in results:
|
for result in results:
|
||||||
result.get()
|
result.get()
|
||||||
logger.info("BOOT RUN TIME: %s", time.time() - start)
|
logger.debug("boot run time: %s", time.time() - start)
|
||||||
|
|
||||||
self.update_control_interface_hosts()
|
self.update_control_interface_hosts()
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ import os
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from core.service import CoreService
|
from core.service import CoreService
|
||||||
|
from core.service import ServiceDependencies
|
||||||
from core.service import ServiceManager
|
from core.service import ServiceManager
|
||||||
|
|
||||||
_PATH = os.path.abspath(os.path.dirname(__file__))
|
_PATH = os.path.abspath(os.path.dirname(__file__))
|
||||||
|
@ -19,20 +20,20 @@ class ServiceA(CoreService):
|
||||||
|
|
||||||
class ServiceB(CoreService):
|
class ServiceB(CoreService):
|
||||||
name = "B"
|
name = "B"
|
||||||
dependencies = ("C",)
|
dependencies = ()
|
||||||
|
|
||||||
|
|
||||||
class ServiceC(CoreService):
|
class ServiceC(CoreService):
|
||||||
name = "C"
|
name = "C"
|
||||||
dependencies = ()
|
dependencies = ("B", "D")
|
||||||
|
|
||||||
|
|
||||||
class ServiceD(CoreService):
|
class ServiceD(CoreService):
|
||||||
name = "D"
|
name = "D"
|
||||||
dependencies = ("A",)
|
dependencies = ()
|
||||||
|
|
||||||
|
|
||||||
class ServiceE(CoreService):
|
class ServiceBadDependency(CoreService):
|
||||||
name = "E"
|
name = "E"
|
||||||
dependencies = ("Z",)
|
dependencies = ("Z",)
|
||||||
|
|
||||||
|
@ -42,6 +43,10 @@ class ServiceF(CoreService):
|
||||||
dependencies = ()
|
dependencies = ()
|
||||||
|
|
||||||
|
|
||||||
|
class ServiceCycleDependency(CoreService):
|
||||||
|
name = "G"
|
||||||
|
|
||||||
|
|
||||||
class TestServices:
|
class TestServices:
|
||||||
def test_service_all_files(self, session):
|
def test_service_all_files(self, session):
|
||||||
# given
|
# given
|
||||||
|
@ -245,7 +250,23 @@ class TestServices:
|
||||||
assert default_service == my_service
|
assert default_service == my_service
|
||||||
assert custom_service and custom_service != my_service
|
assert custom_service and custom_service != my_service
|
||||||
|
|
||||||
def test_services_dependencies(self, session):
|
def test_services_dependencies(self):
|
||||||
|
# given
|
||||||
|
services = [
|
||||||
|
ServiceA,
|
||||||
|
ServiceB,
|
||||||
|
ServiceC,
|
||||||
|
ServiceD,
|
||||||
|
ServiceF
|
||||||
|
]
|
||||||
|
|
||||||
|
# when
|
||||||
|
boot_paths = ServiceDependencies(services).boot_paths()
|
||||||
|
|
||||||
|
# then
|
||||||
|
assert len(boot_paths) == 2
|
||||||
|
|
||||||
|
def test_services_dependencies_not_present(self):
|
||||||
# given
|
# given
|
||||||
services = [
|
services = [
|
||||||
ServiceA,
|
ServiceA,
|
||||||
|
@ -253,39 +274,25 @@ class TestServices:
|
||||||
ServiceC,
|
ServiceC,
|
||||||
ServiceD,
|
ServiceD,
|
||||||
ServiceF,
|
ServiceF,
|
||||||
|
ServiceBadDependency
|
||||||
]
|
]
|
||||||
|
|
||||||
# when
|
# when, then
|
||||||
startups = session.services.create_boot_paths(services)
|
with pytest.raises(ValueError):
|
||||||
|
ServiceDependencies(services).boot_paths()
|
||||||
|
|
||||||
# then
|
def test_services_dependencies_cycle(self):
|
||||||
assert len(startups) == 2
|
|
||||||
|
|
||||||
def test_services_dependencies_not_present(self, session):
|
|
||||||
# given
|
# given
|
||||||
|
service_d = ServiceD()
|
||||||
|
service_d.dependencies = ("C",)
|
||||||
services = [
|
services = [
|
||||||
ServiceA,
|
ServiceA,
|
||||||
ServiceB,
|
ServiceB,
|
||||||
ServiceC,
|
ServiceC,
|
||||||
ServiceE
|
service_d,
|
||||||
]
|
|
||||||
|
|
||||||
# when
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
session.services.create_boot_paths(services)
|
|
||||||
|
|
||||||
def test_services_dependencies_cycle(self, session):
|
|
||||||
# given
|
|
||||||
service_c = ServiceC()
|
|
||||||
service_c.dependencies = ("D",)
|
|
||||||
services = [
|
|
||||||
ServiceA,
|
|
||||||
ServiceB,
|
|
||||||
service_c,
|
|
||||||
ServiceD,
|
|
||||||
ServiceF
|
ServiceF
|
||||||
]
|
]
|
||||||
|
|
||||||
# when
|
# when, then
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(ValueError):
|
||||||
session.services.create_boot_paths(services)
|
ServiceDependencies(services).boot_paths()
|
||||||
|
|
Loading…
Add table
Reference in a new issue