daemon: added class variable type hinting to core.api.grpc

This commit is contained in:
Blake Harnden 2020-06-12 08:34:02 -07:00
parent e72e332bab
commit cfaa9397ad
4 changed files with 17 additions and 16 deletions

View file

@ -5,7 +5,7 @@ gRpc client for interfacing with CORE.
import logging import logging
import threading import threading
from contextlib import contextmanager from contextlib import contextmanager
from typing import Any, Callable, Dict, Generator, Iterable, List from typing import Any, Callable, Dict, Generator, Iterable, List, Optional
import grpc import grpc
@ -108,7 +108,7 @@ class InterfaceHelper:
:param ip6_prefix: ip6 prefix to use for generation :param ip6_prefix: ip6 prefix to use for generation
:raises ValueError: when both ip4 and ip6 prefixes have not been provided :raises ValueError: when both ip4 and ip6 prefixes have not been provided
""" """
self.prefixes = IpPrefixes(ip4_prefix, ip6_prefix) self.prefixes: IpPrefixes = IpPrefixes(ip4_prefix, ip6_prefix)
def create_interface( def create_interface(
self, node_id: int, interface_id: int, name: str = None, mac: str = None self, node_id: int, interface_id: int, name: str = None, mac: str = None
@ -177,10 +177,10 @@ class CoreGrpcClient:
:param address: grpc server address to connect to :param address: grpc server address to connect to
""" """
self.address = address self.address: str = address
self.stub = None self.stub: Optional[core_pb2_grpc.CoreApiStub] = None
self.channel = None self.channel: Optional[grpc.Channel] = None
self.proxy = proxy self.proxy: bool = proxy
def start_session( def start_session(
self, self,

View file

@ -140,9 +140,9 @@ class EventStreamer:
:param session: session to process events for :param session: session to process events for
:param event_types: types of events to process :param event_types: types of events to process
""" """
self.session = session self.session: Session = session
self.event_types = event_types self.event_types: Iterable[core_pb2.EventType] = event_types
self.queue = Queue() self.queue: Queue = Queue()
self.add_handlers() self.add_handlers()
def add_handlers(self) -> None: def add_handlers(self) -> None:

View file

@ -1,6 +1,6 @@
import logging import logging
import time import time
from typing import Any, Dict, List, Tuple, Type from typing import Any, Dict, List, Tuple, Type, Union
import grpc import grpc
import netaddr import netaddr
@ -190,7 +190,8 @@ def convert_value(value: Any) -> str:
def get_config_options( def get_config_options(
config: Dict[str, str], configurable_options: Type[ConfigurableOptions] config: Dict[str, str],
configurable_options: Union[ConfigurableOptions, Type[ConfigurableOptions]],
) -> Dict[str, common_pb2.ConfigOption]: ) -> Dict[str, common_pb2.ConfigOption]:
""" """
Retrieve configuration options in a form that is used by the grpc server. Retrieve configuration options in a form that is used by the grpc server.
@ -418,7 +419,7 @@ def service_configuration(session: Session, config: ServiceConfig) -> None:
service.shutdown = tuple(config.shutdown) service.shutdown = tuple(config.shutdown)
def get_service_configuration(service: Type[CoreService]) -> NodeServiceData: def get_service_configuration(service: CoreService) -> NodeServiceData:
""" """
Convenience for converting a service to service data proto. Convenience for converting a service to service data proto.

View file

@ -6,7 +6,7 @@ import tempfile
import threading import threading
import time import time
from concurrent import futures from concurrent import futures
from typing import Iterable, Type from typing import Iterable, Optional, Type
import grpc import grpc
from grpc import ServicerContext from grpc import ServicerContext
@ -131,9 +131,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
def __init__(self, coreemu: CoreEmu) -> None: def __init__(self, coreemu: CoreEmu) -> None:
super().__init__() super().__init__()
self.coreemu = coreemu self.coreemu: CoreEmu = coreemu
self.running = True self.running: bool = True
self.server = None self.server: Optional[grpc.Server] = None
atexit.register(self._exit_handler) atexit.register(self._exit_handler)
def _exit_handler(self) -> None: def _exit_handler(self) -> None: