daemon: added type hinting to Session

This commit is contained in:
Blake Harnden 2020-06-09 20:03:32 -07:00
parent a79ba1b8d3
commit 32ad8a9b68

View file

@ -15,9 +15,17 @@ import time
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, TypeVar
from core import constants, utils
from core.configservice.manager import ConfigServiceManager
from core.emane.emanemanager import EmaneManager
from core.emane.nodes import EmaneNet
from core.emulator.data import ConfigData, EventData, ExceptionData, FileData, LinkData
from core.emulator.data import (
ConfigData,
EventData,
ExceptionData,
FileData,
LinkData,
NodeData,
)
from core.emulator.distributed import DistributedController
from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
from core.emulator.enumerations import (
@ -89,62 +97,62 @@ class Session:
:param config: session configuration
:param mkdir: flag to determine if a directory should be made
"""
self.id = _id
self.id: int = _id
# define and create session directory when desired
self.session_dir = os.path.join(tempfile.gettempdir(), f"pycore.{self.id}")
self.session_dir: str = os.path.join(tempfile.gettempdir(), f"pycore.{self.id}")
if mkdir:
os.mkdir(self.session_dir)
self.name = None
self.file_name = None
self.thumbnail = None
self.user = None
self.event_loop = EventLoop()
self.link_colors = {}
self.name: Optional[str] = None
self.file_name: Optional[str] = None
self.thumbnail: Optional[str] = None
self.user: Optional[str] = None
self.event_loop: EventLoop = EventLoop()
self.link_colors: Dict[int, str] = {}
# dict of nodes: all nodes and nets
self.nodes = {}
self.nodes: Dict[int, NodeBase] = {}
self._nodes_lock = threading.Lock()
self.state = EventTypes.DEFINITION_STATE
self._state_time = time.monotonic()
self._state_file = os.path.join(self.session_dir, "state")
self.state: EventTypes = EventTypes.DEFINITION_STATE
self._state_time: float = time.monotonic()
self._state_file: str = os.path.join(self.session_dir, "state")
# hooks handlers
self._hooks = {}
self._state_hooks = {}
self._hooks: Dict[EventTypes, Tuple[str, str]] = {}
self._state_hooks: Dict[EventTypes, Callable[[int], None]] = {}
self.add_state_hook(
state=EventTypes.RUNTIME_STATE, hook=self.runtime_state_hook
)
# handlers for broadcasting information
self.event_handlers = []
self.exception_handlers = []
self.node_handlers = []
self.link_handlers = []
self.file_handlers = []
self.config_handlers = []
self.shutdown_handlers = []
self.event_handlers: List[Callable[[EventData], None]] = []
self.exception_handlers: List[Callable[[ExceptionData], None]] = []
self.node_handlers: List[Callable[[NodeData], None]] = []
self.link_handlers: List[Callable[[LinkData], None]] = []
self.file_handlers: List[Callable[[FileData], None]] = []
self.config_handlers: List[Callable[[ConfigData], None]] = []
self.shutdown_handlers: List[Callable[[Session], None]] = []
# session options/metadata
self.options = SessionConfig()
self.options: SessionConfig = SessionConfig()
if not config:
config = {}
for key in config:
value = config[key]
self.options.set_config(key, value)
self.metadata = {}
self.metadata: Dict[str, str] = {}
# distributed support and logic
self.distributed = DistributedController(self)
self.distributed: DistributedController = DistributedController(self)
# initialize session feature helpers
self.location = GeoLocation()
self.mobility = MobilityManager(session=self)
self.services = CoreServices(session=self)
self.emane = EmaneManager(session=self)
self.sdt = Sdt(session=self)
self.location: GeoLocation = GeoLocation()
self.mobility: MobilityManager = MobilityManager(self)
self.services: CoreServices = CoreServices(self)
self.emane: EmaneManager = EmaneManager(self)
self.sdt: Sdt = Sdt(self)
# initialize default node services
self.services.default_services = {
@ -156,7 +164,7 @@ class Session:
}
# config services
self.service_manager = None
self.service_manager: Optional[ConfigServiceManager] = None
@classmethod
def get_node_class(cls, _type: NodeTypes) -> Type[NodeBase]: