daemon: added refactored cleaned up logic for future handling of broadcasting data

This commit is contained in:
Blake Harnden 2023-04-12 16:14:58 -07:00
parent b6b300207b
commit da3cebe1cd

View file

@ -0,0 +1,70 @@
from collections.abc import Callable
from typing import TypeVar, Union
from core.emulator.data import (
ConfigData,
EventData,
ExceptionData,
FileData,
LinkData,
NodeData,
)
from core.errors import CoreError
T = TypeVar(
"T",
bound=Union[
EventData,
ExceptionData,
NodeData,
LinkData,
FileData,
ConfigData,
],
)
class BroadcastManager:
def __init__(self) -> None:
"""
Creates a BroadcastManager instance.
"""
self.handlers: dict[type[T], set[Callable[[T], None]]] = {}
def send(self, data: T) -> None:
"""
Retrieve handlers for data, and run all current handlers.
:param data: data to provide to handlers
:return: nothing
"""
handlers = self.handlers.get(type(data), set())
for handler in handlers:
handler(data)
def add_handler(self, data_type: type[T], handler: Callable[[T], None]) -> None:
"""
Add a handler for a given data type.
:param data_type: type of data to add handler for
:param handler: handler to add
:return: nothing
"""
handlers = self.handlers.setdefault(data_type, set())
handlers.add(handler)
def remove_handler(self, data_type: type[T], handler: Callable[[T], None]) -> None:
"""
Remove a handler for a given data type.
:param data_type: type of data to remove handler for
:param handler: handler to remove
:return: nothing
"""
handlers = self.handlers.get(data_type, set())
if handler not in handlers:
raise CoreError(
f"cannot remove data({data_type}) handler({repr(handler)}), "
f"does not exist "
)
handlers.remove(handler)