daemon: update deprecated typing for core.api

This commit is contained in:
Blake Harnden 2023-04-13 11:22:37 -07:00
parent 6ff2abf0b8
commit e770bcd47c
5 changed files with 113 additions and 109 deletions

View file

@ -1,7 +1,7 @@
import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Optional, Union
import grpc
from grpc import ServicerContext
@ -63,8 +63,8 @@ class CpuUsage:
def add_node_data(
_class: Type[NodeBase], node_proto: core_pb2.Node
) -> Tuple[Position, NodeOptions]:
_class: type[NodeBase], node_proto: core_pb2.Node
) -> tuple[Position, NodeOptions]:
"""
Convert node protobuf message to data for creating a node.
@ -118,7 +118,7 @@ def link_iface(iface_proto: core_pb2.Interface) -> InterfaceData:
def add_link_data(
link_proto: core_pb2.Link,
) -> Tuple[InterfaceData, InterfaceData, LinkOptions]:
) -> tuple[InterfaceData, InterfaceData, LinkOptions]:
"""
Convert link proto to link interfaces and options data.
@ -145,8 +145,8 @@ def add_link_data(
def create_nodes(
session: Session, node_protos: List[core_pb2.Node]
) -> Tuple[List[NodeBase], List[Exception]]:
session: Session, node_protos: list[core_pb2.Node]
) -> tuple[list[NodeBase], list[Exception]]:
"""
Create nodes using a thread pool and wait for completion.
@ -176,8 +176,8 @@ def create_nodes(
def create_links(
session: Session, link_protos: List[core_pb2.Link]
) -> Tuple[List[NodeBase], List[Exception]]:
session: Session, link_protos: list[core_pb2.Link]
) -> tuple[list[NodeBase], list[Exception]]:
"""
Create links using a thread pool and wait for completion.
@ -200,8 +200,8 @@ def create_links(
def edit_links(
session: Session, link_protos: List[core_pb2.Link]
) -> Tuple[List[None], List[Exception]]:
session: Session, link_protos: list[core_pb2.Link]
) -> tuple[list[None], list[Exception]]:
"""
Edit links using a thread pool and wait for completion.
@ -235,7 +235,7 @@ def convert_value(value: Any) -> str:
return value
def convert_session_options(session: Session) -> Dict[str, common_pb2.ConfigOption]:
def convert_session_options(session: Session) -> dict[str, common_pb2.ConfigOption]:
config_options = {}
for option in session.options.options:
value = session.options.get(option.id)
@ -252,9 +252,9 @@ def convert_session_options(session: Session) -> Dict[str, common_pb2.ConfigOpti
def get_config_options(
config: Dict[str, str],
configurable_options: Union[ConfigurableOptions, Type[ConfigurableOptions]],
) -> Dict[str, common_pb2.ConfigOption]:
config: dict[str, str],
configurable_options: Union[ConfigurableOptions, type[ConfigurableOptions]],
) -> dict[str, common_pb2.ConfigOption]:
"""
Retrieve configuration options in a form that is used by the grpc server.
@ -283,7 +283,7 @@ def get_config_options(
def get_node_proto(
session: Session, node: NodeBase, emane_configs: List[NodeEmaneConfig]
session: Session, node: NodeBase, emane_configs: list[NodeEmaneConfig]
) -> core_pb2.Node:
"""
Convert CORE node to protobuf representation.
@ -390,7 +390,7 @@ def get_node_proto(
)
def get_links(session: Session, node: NodeBase) -> List[core_pb2.Link]:
def get_links(session: Session, node: NodeBase) -> list[core_pb2.Link]:
"""
Retrieve a list of links for grpc to use.
@ -435,7 +435,7 @@ def convert_iface(iface: CoreInterface) -> core_pb2.Interface:
)
def convert_core_link(core_link: CoreLink) -> List[core_pb2.Link]:
def convert_core_link(core_link: CoreLink) -> list[core_pb2.Link]:
"""
Convert core link to protobuf data.
@ -581,7 +581,7 @@ def convert_link(
)
def parse_proc_net_dev(lines: List[str]) -> Dict[str, Any]:
def parse_proc_net_dev(lines: list[str]) -> dict[str, dict[str, float]]:
"""
Parse lines of output from /proc/net/dev.
@ -599,7 +599,7 @@ def parse_proc_net_dev(lines: List[str]) -> Dict[str, Any]:
return stats
def get_net_stats() -> Dict[str, Dict]:
def get_net_stats() -> dict[str, dict[str, float]]:
"""
Retrieve status about the current interfaces in the system
@ -728,7 +728,7 @@ def get_nem_id(
return nem_id
def get_emane_model_configs_dict(session: Session) -> Dict[int, List[NodeEmaneConfig]]:
def get_emane_model_configs_dict(session: Session) -> dict[int, list[NodeEmaneConfig]]:
"""
Get emane model configuration protobuf data.
@ -751,7 +751,7 @@ def get_emane_model_configs_dict(session: Session) -> Dict[int, List[NodeEmaneCo
return configs
def get_hooks(session: Session) -> List[core_pb2.Hook]:
def get_hooks(session: Session) -> list[core_pb2.Hook]:
"""
Retrieve hook protobuf data for a session.
@ -767,7 +767,7 @@ def get_hooks(session: Session) -> List[core_pb2.Hook]:
return hooks
def get_default_services(session: Session) -> List[ServiceDefaults]:
def get_default_services(session: Session) -> list[ServiceDefaults]:
"""
Retrieve the default service sets for a given session.