daemon: modified node service boot to properly order services and account for services with the same dependency without cycles, for now removed trying to simultaneously booting services
This commit is contained in:
parent
3e41d31c6c
commit
936d782e41
2 changed files with 129 additions and 166 deletions
|
@ -10,7 +10,17 @@ services.
|
|||
import enum
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple, Type
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
Type,
|
||||
Union,
|
||||
)
|
||||
|
||||
from core import utils
|
||||
from core.emulator.data import FileData
|
||||
|
@ -21,6 +31,8 @@ from core.nodes.base import CoreNode
|
|||
if TYPE_CHECKING:
|
||||
from core.emulator.session import Session
|
||||
|
||||
CoreServiceType = Union["CoreService", Type["CoreService"]]
|
||||
|
||||
|
||||
class ServiceBootError(Exception):
|
||||
pass
|
||||
|
@ -39,95 +51,35 @@ class ServiceDependencies:
|
|||
provided.
|
||||
"""
|
||||
|
||||
def __init__(self, services: List["CoreService"]) -> None:
|
||||
# helpers to check validity
|
||||
self.dependents: Dict[str, Set[str]] = {}
|
||||
self.booted: Set[str] = set()
|
||||
self.node_services: Dict[str, "CoreService"] = {}
|
||||
for service in services:
|
||||
self.node_services[service.name] = service
|
||||
for dependency in service.dependencies:
|
||||
dependents = self.dependents.setdefault(dependency, set())
|
||||
dependents.add(service.name)
|
||||
|
||||
# used to find paths
|
||||
self.path: List["CoreService"] = []
|
||||
def __init__(self, services: List["CoreServiceType"]) -> None:
|
||||
self.visited: Set[str] = set()
|
||||
self.visiting: Set[str] = set()
|
||||
self.boot: List["CoreServiceType"] = []
|
||||
self.services: Dict[str, "CoreServiceType"] = {}
|
||||
for service in services:
|
||||
self.services[service.name] = service
|
||||
|
||||
def boot_paths(self) -> List[List["CoreService"]]:
|
||||
"""
|
||||
Generates the boot paths for the services provided to the class.
|
||||
def _search(self, service: "CoreServiceType", visiting: Set[str] = None) -> None:
|
||||
if service.name in self.visited:
|
||||
return
|
||||
self.visited.add(service.name)
|
||||
if visiting is None:
|
||||
visiting = set()
|
||||
visiting.add(service.name)
|
||||
for dependency in service.dependencies:
|
||||
service_dependency = self.services.get(dependency)
|
||||
if not service_dependency:
|
||||
raise ValueError(f"required dependency was not provided: {dependency}")
|
||||
if dependency in visiting:
|
||||
raise ValueError(f"circular dependency, already visited: {dependency}")
|
||||
else:
|
||||
self._search(service_dependency, visiting)
|
||||
visiting.remove(service.name)
|
||||
self.boot.append(service)
|
||||
|
||||
:return: list of services to boot, in order
|
||||
"""
|
||||
paths = []
|
||||
for name in self.node_services:
|
||||
service = self.node_services[name]
|
||||
if service.name in self.booted:
|
||||
logging.debug(
|
||||
"skipping service that will already be booted: %s", service.name
|
||||
)
|
||||
continue
|
||||
|
||||
path = self._start(service)
|
||||
if path:
|
||||
paths.append(path)
|
||||
|
||||
if self.booted != set(self.node_services):
|
||||
raise ValueError(
|
||||
"failure to boot all services: %s != %s"
|
||||
% (self.booted, self.node_services.keys())
|
||||
)
|
||||
|
||||
return paths
|
||||
|
||||
def _reset(self) -> None:
|
||||
self.path = []
|
||||
self.visited.clear()
|
||||
self.visiting.clear()
|
||||
|
||||
def _start(self, service: "CoreService") -> List["CoreService"]:
|
||||
logging.debug("starting service dependency check: %s", service.name)
|
||||
self._reset()
|
||||
return self._visit(service)
|
||||
|
||||
def _visit(self, current_service: "CoreService") -> List["CoreService"]:
|
||||
logging.debug("visiting service(%s): %s", current_service.name, self.path)
|
||||
self.visited.add(current_service.name)
|
||||
self.visiting.add(current_service.name)
|
||||
|
||||
# dive down
|
||||
for service_name in current_service.dependencies:
|
||||
if service_name not in self.node_services:
|
||||
raise ValueError(
|
||||
"required dependency was not included in node services: %s"
|
||||
% service_name
|
||||
)
|
||||
|
||||
if service_name in self.visiting:
|
||||
raise ValueError(
|
||||
"cyclic dependency at service(%s): %s"
|
||||
% (current_service.name, service_name)
|
||||
)
|
||||
|
||||
if service_name not in self.visited:
|
||||
service = self.node_services[service_name]
|
||||
self._visit(service)
|
||||
|
||||
# add service when bottom is found
|
||||
logging.debug("adding service to boot path: %s", current_service.name)
|
||||
self.booted.add(current_service.name)
|
||||
self.path.append(current_service)
|
||||
self.visiting.remove(current_service.name)
|
||||
|
||||
# rise back up
|
||||
for service_name in self.dependents.get(current_service.name, []):
|
||||
if service_name not in self.visited:
|
||||
service = self.node_services[service_name]
|
||||
self._visit(service)
|
||||
|
||||
return self.path
|
||||
def boot_order(self) -> List["CoreServiceType"]:
|
||||
for service in self.services.values():
|
||||
self._search(service)
|
||||
return self.boot
|
||||
|
||||
|
||||
class ServiceShim:
|
||||
|
@ -470,37 +422,21 @@ class CoreServices:
|
|||
:param node: node to start services on
|
||||
:return: nothing
|
||||
"""
|
||||
boot_paths = ServiceDependencies(node.services).boot_paths()
|
||||
funcs = []
|
||||
for boot_path in boot_paths:
|
||||
args = (node, boot_path)
|
||||
funcs.append((self._start_boot_paths, args, {}))
|
||||
result, exceptions = utils.threadpool(funcs)
|
||||
if exceptions:
|
||||
raise ServiceBootError(*exceptions)
|
||||
|
||||
def _start_boot_paths(self, node: CoreNode, boot_path: List["CoreService"]) -> None:
|
||||
"""
|
||||
Start all service boot paths found, based on dependencies.
|
||||
|
||||
:param node: node to start services on
|
||||
:param boot_path: service to start in dependent order
|
||||
:return: nothing
|
||||
"""
|
||||
services = ServiceDependencies(node.services).boot_order()
|
||||
logging.info(
|
||||
"booting node(%s) services: %s",
|
||||
node.name,
|
||||
" -> ".join([x.name for x in boot_path]),
|
||||
" -> ".join([x.name for x in services]),
|
||||
)
|
||||
for service in boot_path:
|
||||
for service in services:
|
||||
service = self.get_service(node.id, service.name, default_service=True)
|
||||
try:
|
||||
self.boot_service(node, service)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logging.exception("exception booting service: %s", service.name)
|
||||
raise
|
||||
raise ServiceBootError(e)
|
||||
|
||||
def boot_service(self, node: CoreNode, service: "CoreService") -> None:
|
||||
def boot_service(self, node: CoreNode, service: "CoreServiceType") -> None:
|
||||
"""
|
||||
Start a service on a node. Create private dirs, generate config
|
||||
files, and execute startup commands.
|
||||
|
@ -584,7 +520,7 @@ class CoreServices:
|
|||
return True
|
||||
return False
|
||||
|
||||
def validate_service(self, node: CoreNode, service: "CoreService") -> int:
|
||||
def validate_service(self, node: CoreNode, service: "CoreServiceType") -> int:
|
||||
"""
|
||||
Run the validation command(s) for a service.
|
||||
|
||||
|
@ -622,7 +558,7 @@ class CoreServices:
|
|||
for service in node.services:
|
||||
self.stop_service(node, service)
|
||||
|
||||
def stop_service(self, node: CoreNode, service: "CoreService") -> int:
|
||||
def stop_service(self, node: CoreNode, service: "CoreServiceType") -> int:
|
||||
"""
|
||||
Stop a service on a node.
|
||||
|
||||
|
@ -724,7 +660,7 @@ class CoreServices:
|
|||
service.config_data[file_name] = data
|
||||
|
||||
def startup_service(
|
||||
self, node: CoreNode, service: "CoreService", wait: bool = False
|
||||
self, node: CoreNode, service: "CoreServiceType", wait: bool = False
|
||||
) -> int:
|
||||
"""
|
||||
Startup a node service.
|
||||
|
@ -747,7 +683,7 @@ class CoreServices:
|
|||
status = -1
|
||||
return status
|
||||
|
||||
def create_service_files(self, node: CoreNode, service: "CoreService") -> None:
|
||||
def create_service_files(self, node: CoreNode, service: "CoreServiceType") -> None:
|
||||
"""
|
||||
Creates node service files.
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import itertools
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
@ -15,40 +16,6 @@ SERVICE_ONE = "MyService"
|
|||
SERVICE_TWO = "MyService2"
|
||||
|
||||
|
||||
class ServiceA(CoreService):
|
||||
name = "A"
|
||||
dependencies = ("B",)
|
||||
|
||||
|
||||
class ServiceB(CoreService):
|
||||
name = "B"
|
||||
dependencies = ()
|
||||
|
||||
|
||||
class ServiceC(CoreService):
|
||||
name = "C"
|
||||
dependencies = ("B", "D")
|
||||
|
||||
|
||||
class ServiceD(CoreService):
|
||||
name = "D"
|
||||
dependencies = ()
|
||||
|
||||
|
||||
class ServiceBadDependency(CoreService):
|
||||
name = "E"
|
||||
dependencies = ("Z",)
|
||||
|
||||
|
||||
class ServiceF(CoreService):
|
||||
name = "F"
|
||||
dependencies = ()
|
||||
|
||||
|
||||
class ServiceCycleDependency(CoreService):
|
||||
name = "G"
|
||||
|
||||
|
||||
class TestServices:
|
||||
def test_service_all_files(self, session: Session):
|
||||
# given
|
||||
|
@ -255,35 +222,95 @@ class TestServices:
|
|||
|
||||
def test_services_dependencies(self):
|
||||
# given
|
||||
services = [ServiceA, ServiceB, ServiceC, ServiceD, ServiceF]
|
||||
service_a = CoreService()
|
||||
service_a.name = "a"
|
||||
service_b = CoreService()
|
||||
service_b.name = "b"
|
||||
service_c = CoreService()
|
||||
service_c.name = "c"
|
||||
service_d = CoreService()
|
||||
service_d.name = "d"
|
||||
service_e = CoreService()
|
||||
service_e.name = "e"
|
||||
service_a.dependencies = (service_b.name,)
|
||||
service_b.dependencies = ()
|
||||
service_c.dependencies = (service_b.name, service_d.name)
|
||||
service_d.dependencies = ()
|
||||
service_e.dependencies = ()
|
||||
services = [service_a, service_b, service_c, service_d, service_e]
|
||||
|
||||
# when
|
||||
boot_paths = ServiceDependencies(services).boot_paths()
|
||||
results = []
|
||||
permutations = itertools.permutations(services)
|
||||
for permutation in permutations:
|
||||
permutation = list(permutation)
|
||||
result = ServiceDependencies(permutation).boot_order()
|
||||
results.append(result)
|
||||
|
||||
# then
|
||||
assert len(boot_paths) == 2
|
||||
for result in results:
|
||||
assert len(result) == len(services)
|
||||
|
||||
def test_services_dependencies_not_present(self):
|
||||
def test_services_missing_dependency(self):
|
||||
# given
|
||||
services = [
|
||||
ServiceA,
|
||||
ServiceB,
|
||||
ServiceC,
|
||||
ServiceD,
|
||||
ServiceF,
|
||||
ServiceBadDependency,
|
||||
]
|
||||
service_a = CoreService()
|
||||
service_a.name = "a"
|
||||
service_b = CoreService()
|
||||
service_b.name = "b"
|
||||
service_c = CoreService()
|
||||
service_c.name = "c"
|
||||
service_a.dependencies = (service_b.name,)
|
||||
service_b.dependencies = (service_c.name,)
|
||||
service_c.dependencies = ("d",)
|
||||
services = [service_a, service_b, service_c]
|
||||
|
||||
# when, then
|
||||
with pytest.raises(ValueError):
|
||||
ServiceDependencies(services).boot_paths()
|
||||
permutations = itertools.permutations(services)
|
||||
for permutation in permutations:
|
||||
permutation = list(permutation)
|
||||
with pytest.raises(ValueError):
|
||||
ServiceDependencies(permutation).boot_order()
|
||||
|
||||
def test_services_dependencies_cycle(self):
|
||||
# given
|
||||
service_d = ServiceD()
|
||||
service_d.dependencies = ("C",)
|
||||
services = [ServiceA, ServiceB, ServiceC, service_d, ServiceF]
|
||||
service_a = CoreService()
|
||||
service_a.name = "a"
|
||||
service_b = CoreService()
|
||||
service_b.name = "b"
|
||||
service_c = CoreService()
|
||||
service_c.name = "c"
|
||||
service_a.dependencies = (service_b.name,)
|
||||
service_b.dependencies = (service_c.name,)
|
||||
service_c.dependencies = (service_a.name,)
|
||||
services = [service_a, service_b, service_c]
|
||||
|
||||
# when, then
|
||||
with pytest.raises(ValueError):
|
||||
ServiceDependencies(services).boot_paths()
|
||||
permutations = itertools.permutations(services)
|
||||
for permutation in permutations:
|
||||
permutation = list(permutation)
|
||||
with pytest.raises(ValueError):
|
||||
ServiceDependencies(permutation).boot_order()
|
||||
|
||||
def test_services_common_dependency(self):
|
||||
# given
|
||||
service_a = CoreService()
|
||||
service_a.name = "a"
|
||||
service_b = CoreService()
|
||||
service_b.name = "b"
|
||||
service_c = CoreService()
|
||||
service_c.name = "c"
|
||||
service_b.dependencies = (service_a.name,)
|
||||
service_c.dependencies = (service_a.name, service_b.name)
|
||||
services = [service_a, service_b, service_c]
|
||||
|
||||
# when
|
||||
results = []
|
||||
permutations = itertools.permutations(services)
|
||||
for permutation in permutations:
|
||||
permutation = list(permutation)
|
||||
result = ServiceDependencies(permutation).boot_order()
|
||||
results.append(result)
|
||||
|
||||
# then
|
||||
for result in results:
|
||||
assert result == [service_a, service_b, service_c]
|
||||
|
|
Loading…
Reference in a new issue