daemon: updated core.emulator to avoid using deprecated type hinting
This commit is contained in:
parent
4c222d1a7a
commit
8abf2561bf
7 changed files with 63 additions and 63 deletions
|
@ -1,7 +1,6 @@
|
|||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Type
|
||||
|
||||
from core import utils
|
||||
from core.configservice.manager import ConfigServiceManager
|
||||
|
@ -20,7 +19,7 @@ class CoreEmu:
|
|||
Provides logic for creating and configuring CORE sessions and the nodes within them.
|
||||
"""
|
||||
|
||||
def __init__(self, config: Dict[str, str] = None) -> None:
|
||||
def __init__(self, config: dict[str, str] = None) -> None:
|
||||
"""
|
||||
Create a CoreEmu object.
|
||||
|
||||
|
@ -31,13 +30,13 @@ class CoreEmu:
|
|||
|
||||
# configuration
|
||||
config = config if config else {}
|
||||
self.config: Dict[str, str] = config
|
||||
self.config: dict[str, str] = config
|
||||
|
||||
# session management
|
||||
self.sessions: Dict[int, Session] = {}
|
||||
self.sessions: dict[int, Session] = {}
|
||||
|
||||
# load services
|
||||
self.service_errors: List[str] = []
|
||||
self.service_errors: list[str] = []
|
||||
self.service_manager: ConfigServiceManager = ConfigServiceManager()
|
||||
self._load_services()
|
||||
|
||||
|
@ -119,7 +118,7 @@ class CoreEmu:
|
|||
_, session = self.sessions.popitem()
|
||||
session.shutdown()
|
||||
|
||||
def create_session(self, _id: int = None, _cls: Type[Session] = Session) -> Session:
|
||||
def create_session(self, _id: int = None, _cls: type[Session] = Session) -> Session:
|
||||
"""
|
||||
Create a new CORE session.
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
CORE data objects.
|
||||
"""
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Any, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
import netaddr
|
||||
|
||||
|
@ -24,7 +24,7 @@ class ConfigData:
|
|||
node: int = None
|
||||
object: str = None
|
||||
type: int = None
|
||||
data_types: Tuple[int] = None
|
||||
data_types: tuple[int] = None
|
||||
data_values: str = None
|
||||
captions: str = None
|
||||
bitmap: str = None
|
||||
|
@ -81,8 +81,8 @@ class NodeOptions:
|
|||
model: Optional[str] = "PC"
|
||||
canvas: int = None
|
||||
icon: str = None
|
||||
services: List[str] = field(default_factory=list)
|
||||
config_services: List[str] = field(default_factory=list)
|
||||
services: list[str] = field(default_factory=list)
|
||||
config_services: list[str] = field(default_factory=list)
|
||||
x: float = None
|
||||
y: float = None
|
||||
lat: float = None
|
||||
|
@ -93,9 +93,9 @@ class NodeOptions:
|
|||
emane: str = None
|
||||
legacy: bool = False
|
||||
# src, dst
|
||||
binds: List[Tuple[str, str]] = field(default_factory=list)
|
||||
binds: list[tuple[str, str]] = field(default_factory=list)
|
||||
# src, dst, unique, delete
|
||||
volumes: List[Tuple[str, str, bool, bool]] = field(default_factory=list)
|
||||
volumes: list[tuple[str, str, bool, bool]] = field(default_factory=list)
|
||||
|
||||
def set_position(self, x: float, y: float) -> None:
|
||||
"""
|
||||
|
@ -148,7 +148,7 @@ class InterfaceData:
|
|||
ip6_mask: int = None
|
||||
mtu: int = None
|
||||
|
||||
def get_ips(self) -> List[str]:
|
||||
def get_ips(self) -> list[str]:
|
||||
"""
|
||||
Returns a list of ip4 and ip6 addresses when present.
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ import threading
|
|||
from collections import OrderedDict
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import TYPE_CHECKING, Callable, Dict, Tuple
|
||||
from typing import TYPE_CHECKING, Callable
|
||||
|
||||
import netaddr
|
||||
from fabric import Connection
|
||||
|
@ -48,7 +48,7 @@ class DistributedServer:
|
|||
self.lock: threading.Lock = threading.Lock()
|
||||
|
||||
def remote_cmd(
|
||||
self, cmd: str, env: Dict[str, str] = None, cwd: str = None, wait: bool = True
|
||||
self, cmd: str, env: dict[str, str] = None, cwd: str = None, wait: bool = True
|
||||
) -> str:
|
||||
"""
|
||||
Run command remotely using server connection.
|
||||
|
@ -123,8 +123,8 @@ class DistributedController:
|
|||
:param session: session
|
||||
"""
|
||||
self.session: "Session" = session
|
||||
self.servers: Dict[str, DistributedServer] = OrderedDict()
|
||||
self.tunnels: Dict[int, Tuple[GreTap, GreTap]] = {}
|
||||
self.servers: dict[str, DistributedServer] = OrderedDict()
|
||||
self.tunnels: dict[int, tuple[GreTap, GreTap]] = {}
|
||||
self.address: str = self.session.options.get("distributed_address")
|
||||
|
||||
def add_server(self, name: str, host: str) -> None:
|
||||
|
@ -213,7 +213,7 @@ class DistributedController:
|
|||
|
||||
def create_gre_tunnel(
|
||||
self, node: CoreNetwork, server: DistributedServer, mtu: int, start: bool
|
||||
) -> Tuple[GreTap, GreTap]:
|
||||
) -> tuple[GreTap, GreTap]:
|
||||
"""
|
||||
Create gre tunnel using a pair of gre taps between the local and remote server.
|
||||
|
||||
|
|
|
@ -129,7 +129,7 @@ class HookManager:
|
|||
cwd=directory,
|
||||
env=env,
|
||||
)
|
||||
except (IOError, subprocess.CalledProcessError) as e:
|
||||
except (OSError, subprocess.CalledProcessError) as e:
|
||||
raise CoreError(
|
||||
f"failure running state({state.name}) "
|
||||
f"hook script({file_name}): {e}",
|
||||
|
|
|
@ -4,8 +4,9 @@ for a session.
|
|||
"""
|
||||
|
||||
import logging
|
||||
from collections.abc import ValuesView
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Optional, Tuple, ValuesView
|
||||
from typing import Optional
|
||||
|
||||
from core.emulator.data import LinkData, LinkOptions
|
||||
from core.emulator.enumerations import LinkTypes, MessageFlags
|
||||
|
@ -15,7 +16,7 @@ from core.nodes.interface import CoreInterface
|
|||
from core.nodes.network import PtpNet
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
LinkKeyType = Tuple[int, Optional[int], int, Optional[int]]
|
||||
LinkKeyType = tuple[int, Optional[int], int, Optional[int]]
|
||||
|
||||
|
||||
def create_key(
|
||||
|
@ -145,8 +146,8 @@ class LinkManager:
|
|||
"""
|
||||
Create a LinkManager instance.
|
||||
"""
|
||||
self._links: Dict[LinkKeyType, CoreLink] = {}
|
||||
self._node_links: Dict[int, Dict[LinkKeyType, CoreLink]] = {}
|
||||
self._links: dict[LinkKeyType, CoreLink] = {}
|
||||
self._node_links: dict[int, dict[LinkKeyType, CoreLink]] = {}
|
||||
|
||||
def add(self, core_link: CoreLink) -> None:
|
||||
"""
|
||||
|
|
|
@ -14,7 +14,7 @@ import tempfile
|
|||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Callable, Dict, List, Optional, Set, Tuple, Type, TypeVar, Union
|
||||
from typing import Callable, Optional, TypeVar, Union
|
||||
|
||||
from core import constants, utils
|
||||
from core.configservice.manager import ConfigServiceManager
|
||||
|
@ -66,7 +66,7 @@ from core.xml.corexml import CoreXmlReader, CoreXmlWriter
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
# maps for converting from API call node type values to classes and vice versa
|
||||
NODES: Dict[NodeTypes, Type[NodeBase]] = {
|
||||
NODES: dict[NodeTypes, type[NodeBase]] = {
|
||||
NodeTypes.DEFAULT: CoreNode,
|
||||
NodeTypes.PHYSICAL: PhysicalNode,
|
||||
NodeTypes.SWITCH: SwitchNode,
|
||||
|
@ -82,12 +82,12 @@ NODES: Dict[NodeTypes, Type[NodeBase]] = {
|
|||
NodeTypes.LXC: LxcNode,
|
||||
NodeTypes.WIRELESS: WirelessNode,
|
||||
}
|
||||
NODES_TYPE: Dict[Type[NodeBase], NodeTypes] = {NODES[x]: x for x in NODES}
|
||||
CONTAINER_NODES: Set[Type[NodeBase]] = {DockerNode, LxcNode}
|
||||
NODES_TYPE: dict[type[NodeBase], NodeTypes] = {NODES[x]: x for x in NODES}
|
||||
CONTAINER_NODES: set[type[NodeBase]] = {DockerNode, LxcNode}
|
||||
CTRL_NET_ID: int = 9001
|
||||
LINK_COLORS: List[str] = ["green", "blue", "orange", "purple", "turquoise"]
|
||||
LINK_COLORS: list[str] = ["green", "blue", "orange", "purple", "turquoise"]
|
||||
NT: TypeVar = TypeVar("NT", bound=NodeBase)
|
||||
WIRELESS_TYPE: Tuple[Type[WlanNode], Type[EmaneNet], Type[WirelessNode]] = (
|
||||
WIRELESS_TYPE: tuple[type[WlanNode], type[EmaneNet], type[WirelessNode]] = (
|
||||
WlanNode,
|
||||
EmaneNet,
|
||||
WirelessNode,
|
||||
|
@ -100,7 +100,7 @@ class Session:
|
|||
"""
|
||||
|
||||
def __init__(
|
||||
self, _id: int, config: Dict[str, str] = None, mkdir: bool = True
|
||||
self, _id: int, config: dict[str, str] = None, mkdir: bool = True
|
||||
) -> None:
|
||||
"""
|
||||
Create a Session instance.
|
||||
|
@ -121,33 +121,33 @@ class Session:
|
|||
self.thumbnail: Optional[Path] = None
|
||||
self.user: Optional[str] = None
|
||||
self.event_loop: EventLoop = EventLoop()
|
||||
self.link_colors: Dict[int, str] = {}
|
||||
self.link_colors: dict[int, str] = {}
|
||||
|
||||
# dict of nodes: all nodes and nets
|
||||
self.nodes: Dict[int, NodeBase] = {}
|
||||
self.nodes: dict[int, NodeBase] = {}
|
||||
self.nodes_lock: threading.Lock = threading.Lock()
|
||||
self.link_manager: LinkManager = LinkManager()
|
||||
|
||||
# states and hooks handlers
|
||||
self.state: EventTypes = EventTypes.DEFINITION_STATE
|
||||
self.state_time: float = time.monotonic()
|
||||
self.hooks: Dict[EventTypes, List[Tuple[str, str]]] = {}
|
||||
self.state_hooks: Dict[EventTypes, List[Callable[[EventTypes], None]]] = {}
|
||||
self.hooks: dict[EventTypes, list[tuple[str, str]]] = {}
|
||||
self.state_hooks: dict[EventTypes, list[Callable[[EventTypes], None]]] = {}
|
||||
self.add_state_hook(
|
||||
state=EventTypes.RUNTIME_STATE, hook=self.runtime_state_hook
|
||||
)
|
||||
|
||||
# handlers for broadcasting information
|
||||
self.event_handlers: List[Callable[[EventData], None]] = []
|
||||
self.exception_handlers: List[Callable[[ExceptionData], None]] = []
|
||||
self.node_handlers: List[Callable[[NodeData], None]] = []
|
||||
self.link_handlers: List[Callable[[LinkData], None]] = []
|
||||
self.file_handlers: List[Callable[[FileData], None]] = []
|
||||
self.config_handlers: List[Callable[[ConfigData], None]] = []
|
||||
self.event_handlers: list[Callable[[EventData], None]] = []
|
||||
self.exception_handlers: list[Callable[[ExceptionData], None]] = []
|
||||
self.node_handlers: list[Callable[[NodeData], None]] = []
|
||||
self.link_handlers: list[Callable[[LinkData], None]] = []
|
||||
self.file_handlers: list[Callable[[FileData], None]] = []
|
||||
self.config_handlers: list[Callable[[ConfigData], None]] = []
|
||||
|
||||
# session options/metadata
|
||||
self.options: SessionConfig = SessionConfig(config)
|
||||
self.metadata: Dict[str, str] = {}
|
||||
self.metadata: dict[str, str] = {}
|
||||
|
||||
# distributed support and logic
|
||||
self.distributed: DistributedController = DistributedController(self)
|
||||
|
@ -163,7 +163,7 @@ class Session:
|
|||
self.service_manager: Optional[ConfigServiceManager] = None
|
||||
|
||||
@classmethod
|
||||
def get_node_class(cls, _type: NodeTypes) -> Type[NodeBase]:
|
||||
def get_node_class(cls, _type: NodeTypes) -> type[NodeBase]:
|
||||
"""
|
||||
Retrieve the class for a given node type.
|
||||
|
||||
|
@ -176,7 +176,7 @@ class Session:
|
|||
return node_class
|
||||
|
||||
@classmethod
|
||||
def get_node_type(cls, _class: Type[NodeBase]) -> NodeTypes:
|
||||
def get_node_type(cls, _class: type[NodeBase]) -> NodeTypes:
|
||||
"""
|
||||
Retrieve node type for a given node class.
|
||||
|
||||
|
@ -238,7 +238,7 @@ class Session:
|
|||
iface1_data: InterfaceData = None,
|
||||
iface2_data: InterfaceData = None,
|
||||
options: LinkOptions = None,
|
||||
) -> Tuple[Optional[CoreInterface], Optional[CoreInterface]]:
|
||||
) -> tuple[Optional[CoreInterface], Optional[CoreInterface]]:
|
||||
"""
|
||||
Add a link between nodes.
|
||||
|
||||
|
@ -345,7 +345,7 @@ class Session:
|
|||
iface1_data: InterfaceData = None,
|
||||
iface2_data: InterfaceData = None,
|
||||
options: LinkOptions = None,
|
||||
) -> Tuple[CoreInterface, CoreInterface]:
|
||||
) -> tuple[CoreInterface, CoreInterface]:
|
||||
"""
|
||||
Create a wired link between two nodes.
|
||||
|
||||
|
@ -476,7 +476,7 @@ class Session:
|
|||
|
||||
def add_node(
|
||||
self,
|
||||
_class: Type[NT],
|
||||
_class: type[NT],
|
||||
_id: int = None,
|
||||
name: str = None,
|
||||
server: str = None,
|
||||
|
@ -745,7 +745,7 @@ class Session:
|
|||
for hook in hooks:
|
||||
self.run_hook(hook)
|
||||
|
||||
def run_hook(self, hook: Tuple[str, str]) -> None:
|
||||
def run_hook(self, hook: tuple[str, str]) -> None:
|
||||
"""
|
||||
Run a hook.
|
||||
|
||||
|
@ -769,7 +769,7 @@ class Session:
|
|||
cwd=self.directory,
|
||||
env=self.get_environment(),
|
||||
)
|
||||
except (IOError, subprocess.CalledProcessError):
|
||||
except (OSError, subprocess.CalledProcessError):
|
||||
logger.exception("error running hook: %s", file_path)
|
||||
|
||||
def run_state_hooks(self, state: EventTypes) -> None:
|
||||
|
@ -835,7 +835,7 @@ class Session:
|
|||
xml_file_path = self.directory / "session-deployed.xml"
|
||||
xml_writer.write(xml_file_path)
|
||||
|
||||
def get_environment(self, state: bool = True) -> Dict[str, str]:
|
||||
def get_environment(self, state: bool = True) -> dict[str, str]:
|
||||
"""
|
||||
Get an environment suitable for a subprocess.Popen call.
|
||||
This is the current process environment with some session-specific
|
||||
|
@ -870,7 +870,7 @@ class Session:
|
|||
if path.is_file():
|
||||
try:
|
||||
utils.load_config(path, env)
|
||||
except IOError:
|
||||
except OSError:
|
||||
logger.exception("error reading environment file: %s", path)
|
||||
return env
|
||||
|
||||
|
@ -887,12 +887,12 @@ class Session:
|
|||
uid = pwd.getpwnam(user).pw_uid
|
||||
gid = self.directory.stat().st_gid
|
||||
os.chown(self.directory, uid, gid)
|
||||
except IOError:
|
||||
except OSError:
|
||||
logger.exception("failed to set permission on %s", self.directory)
|
||||
|
||||
def create_node(
|
||||
self,
|
||||
_class: Type[NT],
|
||||
_class: type[NT],
|
||||
start: bool,
|
||||
_id: int = None,
|
||||
name: str = None,
|
||||
|
@ -928,7 +928,7 @@ class Session:
|
|||
node.startup()
|
||||
return node
|
||||
|
||||
def get_node(self, _id: int, _class: Type[NT]) -> NT:
|
||||
def get_node(self, _id: int, _class: type[NT]) -> NT:
|
||||
"""
|
||||
Get a session node.
|
||||
|
||||
|
@ -1002,7 +1002,7 @@ class Session:
|
|||
)
|
||||
self.broadcast_exception(exception_data)
|
||||
|
||||
def instantiate(self) -> List[Exception]:
|
||||
def instantiate(self) -> list[Exception]:
|
||||
"""
|
||||
We have entered the instantiation state, invoke startup methods
|
||||
of various managers and boot the nodes. Validate nodes and check
|
||||
|
@ -1121,7 +1121,7 @@ class Session:
|
|||
self.services.boot_services(node)
|
||||
node.start_config_services()
|
||||
|
||||
def boot_nodes(self) -> List[Exception]:
|
||||
def boot_nodes(self) -> list[Exception]:
|
||||
"""
|
||||
Invoke the boot() procedure for all nodes and send back node
|
||||
messages to the GUI for node messages that had the status
|
||||
|
@ -1143,7 +1143,7 @@ class Session:
|
|||
self.update_control_iface_hosts()
|
||||
return exceptions
|
||||
|
||||
def get_control_net_prefixes(self) -> List[str]:
|
||||
def get_control_net_prefixes(self) -> list[str]:
|
||||
"""
|
||||
Retrieve control net prefixes.
|
||||
|
||||
|
@ -1158,7 +1158,7 @@ class Session:
|
|||
p0 = p
|
||||
return [p0, p1, p2, p3]
|
||||
|
||||
def get_control_net_server_ifaces(self) -> List[str]:
|
||||
def get_control_net_server_ifaces(self) -> list[str]:
|
||||
"""
|
||||
Retrieve control net server interfaces.
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Dict, List, Optional
|
||||
from typing import Optional
|
||||
|
||||
from core.config import ConfigBool, ConfigInt, ConfigString, Configuration
|
||||
from core.errors import CoreError
|
||||
|
@ -10,7 +10,7 @@ class SessionConfig:
|
|||
Provides session configuration.
|
||||
"""
|
||||
|
||||
options: List[Configuration] = [
|
||||
options: list[Configuration] = [
|
||||
ConfigString(id="controlnet", label="Control Network"),
|
||||
ConfigString(id="controlnet0", label="Control Network 0"),
|
||||
ConfigString(id="controlnet1", label="Control Network 1"),
|
||||
|
@ -35,16 +35,16 @@ class SessionConfig:
|
|||
ConfigInt(id="mtu", default="0", label="MTU for All Devices"),
|
||||
]
|
||||
|
||||
def __init__(self, config: Dict[str, str] = None) -> None:
|
||||
def __init__(self, config: dict[str, str] = None) -> None:
|
||||
"""
|
||||
Create a SessionConfig instance.
|
||||
|
||||
:param config: configuration to initialize with
|
||||
"""
|
||||
self._config: Dict[str, str] = {x.id: x.default for x in self.options}
|
||||
self._config: dict[str, str] = {x.id: x.default for x in self.options}
|
||||
self._config.update(config or {})
|
||||
|
||||
def update(self, config: Dict[str, str]) -> None:
|
||||
def update(self, config: dict[str, str]) -> None:
|
||||
"""
|
||||
Update current configuration with provided values.
|
||||
|
||||
|
@ -73,7 +73,7 @@ class SessionConfig:
|
|||
"""
|
||||
return self._config.get(name, default)
|
||||
|
||||
def all(self) -> Dict[str, str]:
|
||||
def all(self) -> dict[str, str]:
|
||||
"""
|
||||
Retrieve all configuration options.
|
||||
|
||||
|
|
Loading…
Reference in a new issue