daemon: updated core.location and core.plugins to avoid using deprecated type hinting

This commit is contained in:
Blake Harnden 2023-04-13 12:29:16 -07:00
parent 8abf2561bf
commit 7ea950f8ec
4 changed files with 47 additions and 48 deletions

View file

@ -6,7 +6,7 @@ import heapq
import threading
import time
from functools import total_ordering
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Callable, Optional
class Timer(threading.Thread):
@ -19,8 +19,8 @@ class Timer(threading.Thread):
self,
interval: float,
func: Callable[..., None],
args: Tuple[Any] = None,
kwargs: Dict[Any, Any] = None,
args: tuple[Any] = None,
kwargs: dict[Any, Any] = None,
) -> None:
"""
Create a Timer instance.
@ -38,11 +38,11 @@ class Timer(threading.Thread):
# validate arguments were provided
if args is None:
args = ()
self.args: Tuple[Any] = args
self.args: tuple[Any] = args
# validate keyword arguments were provided
if kwargs is None:
kwargs = {}
self.kwargs: Dict[Any, Any] = kwargs
self.kwargs: dict[Any, Any] = kwargs
def cancel(self) -> bool:
"""
@ -96,8 +96,8 @@ class Event:
self.eventnum: int = eventnum
self.time: float = event_time
self.func: Callable[..., None] = func
self.args: Tuple[Any] = args
self.kwds: Dict[Any, Any] = kwds
self.args: tuple[Any] = args
self.kwds: dict[Any, Any] = kwds
self.canceled: bool = False
def __lt__(self, other: "Event") -> bool:
@ -135,7 +135,7 @@ class EventLoop:
Creates a EventLoop instance.
"""
self.lock: threading.RLock = threading.RLock()
self.queue: List[Event] = []
self.queue: list[Event] = []
self.eventnum: int = 0
self.timer: Optional[Timer] = None
self.running: bool = False

View file

@ -3,7 +3,6 @@ Provides conversions from x,y,z to lon,lat,alt.
"""
import logging
from typing import Tuple
import pyproj
from pyproj import Transformer
@ -35,9 +34,9 @@ class GeoLocation:
self.to_geo: Transformer = pyproj.Transformer.from_crs(
CRS_PROJ, CRS_WGS84, always_xy=True
)
self.refproj: Tuple[float, float, float] = (0.0, 0.0, 0.0)
self.refgeo: Tuple[float, float, float] = (0.0, 0.0, 0.0)
self.refxyz: Tuple[float, float, float] = (0.0, 0.0, 0.0)
self.refproj: tuple[float, float, float] = (0.0, 0.0, 0.0)
self.refgeo: tuple[float, float, float] = (0.0, 0.0, 0.0)
self.refxyz: tuple[float, float, float] = (0.0, 0.0, 0.0)
self.refscale: float = 1.0
def setrefgeo(self, lat: float, lon: float, alt: float) -> None:
@ -84,7 +83,7 @@ class GeoLocation:
return 0.0
return SCALE_FACTOR * (value / self.refscale)
def getxyz(self, lat: float, lon: float, alt: float) -> Tuple[float, float, float]:
def getxyz(self, lat: float, lon: float, alt: float) -> tuple[float, float, float]:
"""
Convert provided lon,lat,alt to x,y,z.
@ -104,7 +103,7 @@ class GeoLocation:
logger.debug("result x,y,z(%s, %s, %s)", x, y, z)
return x, y, z
def getgeo(self, x: float, y: float, z: float) -> Tuple[float, float, float]:
def getgeo(self, x: float, y: float, z: float) -> tuple[float, float, float]:
"""
Convert provided x,y,z to lon,lat,alt.

View file

@ -9,7 +9,7 @@ import threading
import time
from functools import total_ordering
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Callable, Optional, Union
from core import utils
from core.config import (
@ -47,7 +47,7 @@ def get_mobility_node(session: "Session", node_id: int) -> Union[WlanNode, Emane
return session.get_node(node_id, EmaneNet)
def get_config_int(current: int, config: Dict[str, str], name: str) -> Optional[int]:
def get_config_int(current: int, config: dict[str, str], name: str) -> Optional[int]:
"""
Convenience function to get config values as int.
@ -63,7 +63,7 @@ def get_config_int(current: int, config: Dict[str, str], name: str) -> Optional[
def get_config_float(
current: Union[int, float], config: Dict[str, str], name: str
current: Union[int, float], config: dict[str, str], name: str
) -> Optional[float]:
"""
Convenience function to get config values as float.
@ -112,7 +112,7 @@ class MobilityManager(ModelManager):
"""
self.config_reset()
def startup(self, node_ids: List[int] = None) -> None:
def startup(self, node_ids: list[int] = None) -> None:
"""
Session is transitioning from instantiation to runtime state.
Instantiate any mobility models that have been configured for a WLAN.
@ -237,7 +237,7 @@ class WirelessModel(ConfigurableOptions):
self.session: "Session" = session
self.id: int = _id
def links(self, flags: MessageFlags = MessageFlags.NONE) -> List[LinkData]:
def links(self, flags: MessageFlags = MessageFlags.NONE) -> list[LinkData]:
"""
May be used if the model can populate the GUI with wireless (green)
link lines.
@ -247,7 +247,7 @@ class WirelessModel(ConfigurableOptions):
"""
return []
def update(self, moved_ifaces: List[CoreInterface]) -> None:
def update(self, moved_ifaces: list[CoreInterface]) -> None:
"""
Update this wireless model.
@ -256,7 +256,7 @@ class WirelessModel(ConfigurableOptions):
"""
raise NotImplementedError
def update_config(self, config: Dict[str, str]) -> None:
def update_config(self, config: dict[str, str]) -> None:
"""
For run-time updates of model config. Returns True when position callback and
set link parameters should be invoked.
@ -275,7 +275,7 @@ class BasicRangeModel(WirelessModel):
"""
name: str = "basic_range"
options: List[Configuration] = [
options: list[Configuration] = [
ConfigInt(id="range", default="275", label="wireless range (pixels)"),
ConfigInt(id="bandwidth", default="54000000", label="bandwidth (bps)"),
ConfigInt(id="jitter", default="0", label="transmission jitter (usec)"),
@ -298,7 +298,7 @@ class BasicRangeModel(WirelessModel):
super().__init__(session, _id)
self.session: "Session" = session
self.wlan: WlanNode = session.get_node(_id, WlanNode)
self.iface_to_pos: Dict[CoreInterface, Tuple[float, float, float]] = {}
self.iface_to_pos: dict[CoreInterface, tuple[float, float, float]] = {}
self.iface_lock: threading.Lock = threading.Lock()
self.range: int = 0
self.bw: Optional[int] = None
@ -323,7 +323,7 @@ class BasicRangeModel(WirelessModel):
iface.options.update(options)
iface.set_config()
def get_position(self, iface: CoreInterface) -> Tuple[float, float, float]:
def get_position(self, iface: CoreInterface) -> tuple[float, float, float]:
"""
Retrieve network interface position.
@ -352,7 +352,7 @@ class BasicRangeModel(WirelessModel):
position_callback = set_position
def update(self, moved_ifaces: List[CoreInterface]) -> None:
def update(self, moved_ifaces: list[CoreInterface]) -> None:
"""
Node positions have changed without recalc. Update positions from
node.position, then re-calculate links for those that have moved.
@ -412,7 +412,7 @@ class BasicRangeModel(WirelessModel):
@staticmethod
def calcdistance(
p1: Tuple[float, float, float], p2: Tuple[float, float, float]
p1: tuple[float, float, float], p2: tuple[float, float, float]
) -> float:
"""
Calculate the distance between two three-dimensional points.
@ -428,7 +428,7 @@ class BasicRangeModel(WirelessModel):
c = p1[2] - p2[2]
return math.hypot(math.hypot(a, b), c)
def update_config(self, config: Dict[str, str]) -> None:
def update_config(self, config: dict[str, str]) -> None:
"""
Configuration has changed during runtime.
@ -487,7 +487,7 @@ class BasicRangeModel(WirelessModel):
link_data = self.create_link_data(iface, iface2, message_type)
self.session.broadcast_link(link_data)
def links(self, flags: MessageFlags = MessageFlags.NONE) -> List[LinkData]:
def links(self, flags: MessageFlags = MessageFlags.NONE) -> list[LinkData]:
"""
Return a list of wireless link messages for when the GUI reconnects.
@ -513,7 +513,7 @@ class WayPoint:
self,
_time: float,
node_id: int,
coords: Tuple[float, float, Optional[float]],
coords: tuple[float, float, Optional[float]],
speed: float,
) -> None:
"""
@ -526,7 +526,7 @@ class WayPoint:
"""
self.time: float = _time
self.node_id: int = node_id
self.coords: Tuple[float, float, Optional[float]] = coords
self.coords: tuple[float, float, Optional[float]] = coords
self.speed: float = speed
def __eq__(self, other: "WayPoint") -> bool:
@ -563,10 +563,10 @@ class WayPointMobility(WirelessModel):
"""
super().__init__(session=session, _id=_id)
self.state: int = self.STATE_STOPPED
self.queue: List[WayPoint] = []
self.queue_copy: List[WayPoint] = []
self.points: Dict[int, WayPoint] = {}
self.initial: Dict[int, WayPoint] = {}
self.queue: list[WayPoint] = []
self.queue_copy: list[WayPoint] = []
self.points: dict[int, WayPoint] = {}
self.initial: dict[int, WayPoint] = {}
self.lasttime: Optional[float] = None
self.endtime: Optional[int] = None
self.timezero: float = 0.0
@ -855,7 +855,7 @@ class Ns2ScriptedMobility(WayPointMobility):
"""
name: str = "ns2script"
options: List[Configuration] = [
options: list[Configuration] = [
ConfigString(id="file", label="mobility script file"),
ConfigInt(id="refresh_ms", default="50", label="refresh time (ms)"),
ConfigBool(id="loop", default="1", label="loop"),
@ -867,7 +867,7 @@ class Ns2ScriptedMobility(WayPointMobility):
]
@classmethod
def config_groups(cls) -> List[ConfigGroup]:
def config_groups(cls) -> list[ConfigGroup]:
return [
ConfigGroup("ns-2 Mobility Script Parameters", 1, len(cls.configurations()))
]
@ -882,12 +882,12 @@ class Ns2ScriptedMobility(WayPointMobility):
super().__init__(session, _id)
self.file: Optional[Path] = None
self.autostart: Optional[str] = None
self.nodemap: Dict[int, int] = {}
self.nodemap: dict[int, int] = {}
self.script_start: Optional[str] = None
self.script_pause: Optional[str] = None
self.script_stop: Optional[str] = None
def update_config(self, config: Dict[str, str]) -> None:
def update_config(self, config: dict[str, str]) -> None:
self.file = Path(config["file"])
logger.info(
"ns-2 scripted mobility configured for WLAN %d using file: %s",
@ -916,7 +916,7 @@ class Ns2ScriptedMobility(WayPointMobility):
file_path = self.findfile(self.file)
try:
f = file_path.open("r")
except IOError:
except OSError:
logger.exception(
"ns-2 scripted mobility failed to load file: %s", self.file
)

View file

@ -5,7 +5,7 @@ sdt.py: Scripted Display Tool (SDT3D) helper
import logging
import socket
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type
from typing import TYPE_CHECKING, Optional
from urllib.parse import urlparse
from core.constants import CORE_CONF_DIR
@ -28,9 +28,9 @@ CORE_LAYER: str = "CORE"
NODE_LAYER: str = "CORE::Nodes"
LINK_LAYER: str = "CORE::Links"
WIRED_LINK_LAYER: str = f"{LINK_LAYER}::wired"
CORE_LAYERS: List[str] = [CORE_LAYER, LINK_LAYER, NODE_LAYER, WIRED_LINK_LAYER]
CORE_LAYERS: list[str] = [CORE_LAYER, LINK_LAYER, NODE_LAYER, WIRED_LINK_LAYER]
DEFAULT_LINK_COLOR: str = "red"
NODE_TYPES: Dict[Type[NodeBase], str] = {
NODE_TYPES: dict[type[NodeBase], str] = {
HubNode: "hub",
SwitchNode: "lanswitch",
TunnelNode: "tunnel",
@ -63,7 +63,7 @@ class Sdt:
# default altitude (in meters) for flyto view
DEFAULT_ALT: int = 2500
# TODO: read in user"s nodes.conf here; below are default node types from the GUI
DEFAULT_SPRITES: Dict[str, str] = [
DEFAULT_SPRITES: dict[str, str] = [
("router", "router.png"),
("host", "host.png"),
("PC", "pc.png"),
@ -88,9 +88,9 @@ class Sdt:
self.sock: Optional[socket.socket] = None
self.connected: bool = False
self.url: str = self.DEFAULT_SDT_URL
self.address: Optional[Tuple[Optional[str], Optional[int]]] = None
self.address: Optional[tuple[Optional[str], Optional[int]]] = None
self.protocol: Optional[str] = None
self.network_layers: Set[str] = set()
self.network_layers: set[str] = set()
self.session.node_handlers.append(self.handle_node_update)
self.session.link_handlers.append(self.handle_link_update)
@ -138,7 +138,7 @@ class Sdt:
else:
# Default to tcp
self.sock = socket.create_connection(self.address, 5)
except IOError:
except OSError:
logger.exception("SDT socket connect error")
return False
@ -176,7 +176,7 @@ class Sdt:
if self.sock:
try:
self.sock.close()
except IOError:
except OSError:
logger.error("error closing socket")
finally:
self.sock = None
@ -212,7 +212,7 @@ class Sdt:
logger.debug("sdt cmd: %s", cmd)
self.sock.sendall(cmd)
return True
except IOError:
except OSError:
logger.exception("SDT connection error")
self.sock = None
self.connected = False