updates to service dependency resolution to allow for multithreaded startup, also improved tests to validate service boot ordering for expected outcomes
This commit is contained in:
parent
f687115522
commit
05247524d7
2 changed files with 121 additions and 31 deletions
|
@ -53,18 +53,34 @@ class ServiceDependencies:
|
|||
|
||||
def __init__(self, services: List["CoreServiceType"]) -> None:
|
||||
self.visited: Set[str] = set()
|
||||
self.boot: List["CoreServiceType"] = []
|
||||
self.services: Dict[str, "CoreServiceType"] = {}
|
||||
self.paths: Dict[str, List["CoreServiceType"]] = {}
|
||||
self.boot_paths: List[List["CoreServiceType"]] = []
|
||||
roots = set([x.name for x in services])
|
||||
for service in services:
|
||||
self.services[service.name] = service
|
||||
roots -= set(service.dependencies)
|
||||
self.roots: List["CoreServiceType"] = [x for x in services if x.name in roots]
|
||||
if services and not self.roots:
|
||||
raise ValueError("circular dependency is present")
|
||||
|
||||
def _search(self, service: "CoreServiceType", visiting: Set[str] = None) -> None:
|
||||
def _search(
|
||||
self,
|
||||
service: "CoreServiceType",
|
||||
visiting: Set[str] = None,
|
||||
path: List[str] = None,
|
||||
) -> List["CoreServiceType"]:
|
||||
if service.name in self.visited:
|
||||
return
|
||||
return self.paths[service.name]
|
||||
self.visited.add(service.name)
|
||||
if visiting is None:
|
||||
visiting = set()
|
||||
visiting.add(service.name)
|
||||
if path is None:
|
||||
for dependency in service.dependencies:
|
||||
path = self.paths.get(dependency)
|
||||
if path is not None:
|
||||
break
|
||||
for dependency in service.dependencies:
|
||||
service_dependency = self.services.get(dependency)
|
||||
if not service_dependency:
|
||||
|
@ -72,14 +88,19 @@ class ServiceDependencies:
|
|||
if dependency in visiting:
|
||||
raise ValueError(f"circular dependency, already visited: {dependency}")
|
||||
else:
|
||||
self._search(service_dependency, visiting)
|
||||
path = self._search(service_dependency, visiting, path)
|
||||
visiting.remove(service.name)
|
||||
self.boot.append(service)
|
||||
if path is None:
|
||||
path = []
|
||||
self.boot_paths.append(path)
|
||||
path.append(service)
|
||||
self.paths[service.name] = path
|
||||
return path
|
||||
|
||||
def boot_order(self) -> List["CoreServiceType"]:
|
||||
for service in self.services.values():
|
||||
def boot_order(self) -> List[List["CoreServiceType"]]:
|
||||
for service in self.roots:
|
||||
self._search(service)
|
||||
return self.boot
|
||||
return self.boot_paths
|
||||
|
||||
|
||||
class ServiceShim:
|
||||
|
@ -422,13 +443,22 @@ class CoreServices:
|
|||
:param node: node to start services on
|
||||
:return: nothing
|
||||
"""
|
||||
services = ServiceDependencies(node.services).boot_order()
|
||||
boot_paths = ServiceDependencies(node.services).boot_order()
|
||||
funcs = []
|
||||
for boot_path in boot_paths:
|
||||
args = (node, boot_path)
|
||||
funcs.append((self._boot_service_path, args, {}))
|
||||
result, exceptions = utils.threadpool(funcs)
|
||||
if exceptions:
|
||||
raise ServiceBootError(*exceptions)
|
||||
|
||||
def _boot_service_path(self, node: CoreNode, boot_path: List["CoreServiceType"]):
|
||||
logging.info(
|
||||
"booting node(%s) services: %s",
|
||||
node.name,
|
||||
" -> ".join([x.name for x in services]),
|
||||
" -> ".join([x.name for x in boot_path]),
|
||||
)
|
||||
for service in services:
|
||||
for service in boot_path:
|
||||
service = self.get_service(node.id, service.name, default_service=True)
|
||||
try:
|
||||
self.boot_service(node, service)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue