diff --git a/daemon/core/location/event.py b/daemon/core/location/event.py index 7f8a33a1..9b300241 100644 --- a/daemon/core/location/event.py +++ b/daemon/core/location/event.py @@ -6,7 +6,7 @@ import heapq import threading import time from functools import total_ordering -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable, Optional class Timer(threading.Thread): @@ -19,8 +19,8 @@ class Timer(threading.Thread): self, interval: float, func: Callable[..., None], - args: Tuple[Any] = None, - kwargs: Dict[Any, Any] = None, + args: tuple[Any] = None, + kwargs: dict[Any, Any] = None, ) -> None: """ Create a Timer instance. @@ -38,11 +38,11 @@ class Timer(threading.Thread): # validate arguments were provided if args is None: args = () - self.args: Tuple[Any] = args + self.args: tuple[Any] = args # validate keyword arguments were provided if kwargs is None: kwargs = {} - self.kwargs: Dict[Any, Any] = kwargs + self.kwargs: dict[Any, Any] = kwargs def cancel(self) -> bool: """ @@ -96,8 +96,8 @@ class Event: self.eventnum: int = eventnum self.time: float = event_time self.func: Callable[..., None] = func - self.args: Tuple[Any] = args - self.kwds: Dict[Any, Any] = kwds + self.args: tuple[Any] = args + self.kwds: dict[Any, Any] = kwds self.canceled: bool = False def __lt__(self, other: "Event") -> bool: @@ -135,7 +135,7 @@ class EventLoop: Creates a EventLoop instance. """ self.lock: threading.RLock = threading.RLock() - self.queue: List[Event] = [] + self.queue: list[Event] = [] self.eventnum: int = 0 self.timer: Optional[Timer] = None self.running: bool = False diff --git a/daemon/core/location/geo.py b/daemon/core/location/geo.py index 5896e074..78308728 100644 --- a/daemon/core/location/geo.py +++ b/daemon/core/location/geo.py @@ -3,7 +3,6 @@ Provides conversions from x,y,z to lon,lat,alt. """ import logging -from typing import Tuple import pyproj from pyproj import Transformer @@ -35,9 +34,9 @@ class GeoLocation: self.to_geo: Transformer = pyproj.Transformer.from_crs( CRS_PROJ, CRS_WGS84, always_xy=True ) - self.refproj: Tuple[float, float, float] = (0.0, 0.0, 0.0) - self.refgeo: Tuple[float, float, float] = (0.0, 0.0, 0.0) - self.refxyz: Tuple[float, float, float] = (0.0, 0.0, 0.0) + self.refproj: tuple[float, float, float] = (0.0, 0.0, 0.0) + self.refgeo: tuple[float, float, float] = (0.0, 0.0, 0.0) + self.refxyz: tuple[float, float, float] = (0.0, 0.0, 0.0) self.refscale: float = 1.0 def setrefgeo(self, lat: float, lon: float, alt: float) -> None: @@ -84,7 +83,7 @@ class GeoLocation: return 0.0 return SCALE_FACTOR * (value / self.refscale) - def getxyz(self, lat: float, lon: float, alt: float) -> Tuple[float, float, float]: + def getxyz(self, lat: float, lon: float, alt: float) -> tuple[float, float, float]: """ Convert provided lon,lat,alt to x,y,z. @@ -104,7 +103,7 @@ class GeoLocation: logger.debug("result x,y,z(%s, %s, %s)", x, y, z) return x, y, z - def getgeo(self, x: float, y: float, z: float) -> Tuple[float, float, float]: + def getgeo(self, x: float, y: float, z: float) -> tuple[float, float, float]: """ Convert provided x,y,z to lon,lat,alt. diff --git a/daemon/core/location/mobility.py b/daemon/core/location/mobility.py index 28040650..ebac9bc5 100644 --- a/daemon/core/location/mobility.py +++ b/daemon/core/location/mobility.py @@ -9,7 +9,7 @@ import threading import time from functools import total_ordering from pathlib import Path -from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Callable, Optional, Union from core import utils from core.config import ( @@ -47,7 +47,7 @@ def get_mobility_node(session: "Session", node_id: int) -> Union[WlanNode, Emane return session.get_node(node_id, EmaneNet) -def get_config_int(current: int, config: Dict[str, str], name: str) -> Optional[int]: +def get_config_int(current: int, config: dict[str, str], name: str) -> Optional[int]: """ Convenience function to get config values as int. @@ -63,7 +63,7 @@ def get_config_int(current: int, config: Dict[str, str], name: str) -> Optional[ def get_config_float( - current: Union[int, float], config: Dict[str, str], name: str + current: Union[int, float], config: dict[str, str], name: str ) -> Optional[float]: """ Convenience function to get config values as float. @@ -112,7 +112,7 @@ class MobilityManager(ModelManager): """ self.config_reset() - def startup(self, node_ids: List[int] = None) -> None: + def startup(self, node_ids: list[int] = None) -> None: """ Session is transitioning from instantiation to runtime state. Instantiate any mobility models that have been configured for a WLAN. @@ -237,7 +237,7 @@ class WirelessModel(ConfigurableOptions): self.session: "Session" = session self.id: int = _id - def links(self, flags: MessageFlags = MessageFlags.NONE) -> List[LinkData]: + def links(self, flags: MessageFlags = MessageFlags.NONE) -> list[LinkData]: """ May be used if the model can populate the GUI with wireless (green) link lines. @@ -247,7 +247,7 @@ class WirelessModel(ConfigurableOptions): """ return [] - def update(self, moved_ifaces: List[CoreInterface]) -> None: + def update(self, moved_ifaces: list[CoreInterface]) -> None: """ Update this wireless model. @@ -256,7 +256,7 @@ class WirelessModel(ConfigurableOptions): """ raise NotImplementedError - def update_config(self, config: Dict[str, str]) -> None: + def update_config(self, config: dict[str, str]) -> None: """ For run-time updates of model config. Returns True when position callback and set link parameters should be invoked. @@ -275,7 +275,7 @@ class BasicRangeModel(WirelessModel): """ name: str = "basic_range" - options: List[Configuration] = [ + options: list[Configuration] = [ ConfigInt(id="range", default="275", label="wireless range (pixels)"), ConfigInt(id="bandwidth", default="54000000", label="bandwidth (bps)"), ConfigInt(id="jitter", default="0", label="transmission jitter (usec)"), @@ -298,7 +298,7 @@ class BasicRangeModel(WirelessModel): super().__init__(session, _id) self.session: "Session" = session self.wlan: WlanNode = session.get_node(_id, WlanNode) - self.iface_to_pos: Dict[CoreInterface, Tuple[float, float, float]] = {} + self.iface_to_pos: dict[CoreInterface, tuple[float, float, float]] = {} self.iface_lock: threading.Lock = threading.Lock() self.range: int = 0 self.bw: Optional[int] = None @@ -323,7 +323,7 @@ class BasicRangeModel(WirelessModel): iface.options.update(options) iface.set_config() - def get_position(self, iface: CoreInterface) -> Tuple[float, float, float]: + def get_position(self, iface: CoreInterface) -> tuple[float, float, float]: """ Retrieve network interface position. @@ -352,7 +352,7 @@ class BasicRangeModel(WirelessModel): position_callback = set_position - def update(self, moved_ifaces: List[CoreInterface]) -> None: + def update(self, moved_ifaces: list[CoreInterface]) -> None: """ Node positions have changed without recalc. Update positions from node.position, then re-calculate links for those that have moved. @@ -412,7 +412,7 @@ class BasicRangeModel(WirelessModel): @staticmethod def calcdistance( - p1: Tuple[float, float, float], p2: Tuple[float, float, float] + p1: tuple[float, float, float], p2: tuple[float, float, float] ) -> float: """ Calculate the distance between two three-dimensional points. @@ -428,7 +428,7 @@ class BasicRangeModel(WirelessModel): c = p1[2] - p2[2] return math.hypot(math.hypot(a, b), c) - def update_config(self, config: Dict[str, str]) -> None: + def update_config(self, config: dict[str, str]) -> None: """ Configuration has changed during runtime. @@ -487,7 +487,7 @@ class BasicRangeModel(WirelessModel): link_data = self.create_link_data(iface, iface2, message_type) self.session.broadcast_link(link_data) - def links(self, flags: MessageFlags = MessageFlags.NONE) -> List[LinkData]: + def links(self, flags: MessageFlags = MessageFlags.NONE) -> list[LinkData]: """ Return a list of wireless link messages for when the GUI reconnects. @@ -513,7 +513,7 @@ class WayPoint: self, _time: float, node_id: int, - coords: Tuple[float, float, Optional[float]], + coords: tuple[float, float, Optional[float]], speed: float, ) -> None: """ @@ -526,7 +526,7 @@ class WayPoint: """ self.time: float = _time self.node_id: int = node_id - self.coords: Tuple[float, float, Optional[float]] = coords + self.coords: tuple[float, float, Optional[float]] = coords self.speed: float = speed def __eq__(self, other: "WayPoint") -> bool: @@ -563,10 +563,10 @@ class WayPointMobility(WirelessModel): """ super().__init__(session=session, _id=_id) self.state: int = self.STATE_STOPPED - self.queue: List[WayPoint] = [] - self.queue_copy: List[WayPoint] = [] - self.points: Dict[int, WayPoint] = {} - self.initial: Dict[int, WayPoint] = {} + self.queue: list[WayPoint] = [] + self.queue_copy: list[WayPoint] = [] + self.points: dict[int, WayPoint] = {} + self.initial: dict[int, WayPoint] = {} self.lasttime: Optional[float] = None self.endtime: Optional[int] = None self.timezero: float = 0.0 @@ -855,7 +855,7 @@ class Ns2ScriptedMobility(WayPointMobility): """ name: str = "ns2script" - options: List[Configuration] = [ + options: list[Configuration] = [ ConfigString(id="file", label="mobility script file"), ConfigInt(id="refresh_ms", default="50", label="refresh time (ms)"), ConfigBool(id="loop", default="1", label="loop"), @@ -867,7 +867,7 @@ class Ns2ScriptedMobility(WayPointMobility): ] @classmethod - def config_groups(cls) -> List[ConfigGroup]: + def config_groups(cls) -> list[ConfigGroup]: return [ ConfigGroup("ns-2 Mobility Script Parameters", 1, len(cls.configurations())) ] @@ -882,12 +882,12 @@ class Ns2ScriptedMobility(WayPointMobility): super().__init__(session, _id) self.file: Optional[Path] = None self.autostart: Optional[str] = None - self.nodemap: Dict[int, int] = {} + self.nodemap: dict[int, int] = {} self.script_start: Optional[str] = None self.script_pause: Optional[str] = None self.script_stop: Optional[str] = None - def update_config(self, config: Dict[str, str]) -> None: + def update_config(self, config: dict[str, str]) -> None: self.file = Path(config["file"]) logger.info( "ns-2 scripted mobility configured for WLAN %d using file: %s", @@ -916,7 +916,7 @@ class Ns2ScriptedMobility(WayPointMobility): file_path = self.findfile(self.file) try: f = file_path.open("r") - except IOError: + except OSError: logger.exception( "ns-2 scripted mobility failed to load file: %s", self.file ) diff --git a/daemon/core/plugins/sdt.py b/daemon/core/plugins/sdt.py index 48a6cdf0..f963c817 100644 --- a/daemon/core/plugins/sdt.py +++ b/daemon/core/plugins/sdt.py @@ -5,7 +5,7 @@ sdt.py: Scripted Display Tool (SDT3D) helper import logging import socket from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type +from typing import TYPE_CHECKING, Optional from urllib.parse import urlparse from core.constants import CORE_CONF_DIR @@ -28,9 +28,9 @@ CORE_LAYER: str = "CORE" NODE_LAYER: str = "CORE::Nodes" LINK_LAYER: str = "CORE::Links" WIRED_LINK_LAYER: str = f"{LINK_LAYER}::wired" -CORE_LAYERS: List[str] = [CORE_LAYER, LINK_LAYER, NODE_LAYER, WIRED_LINK_LAYER] +CORE_LAYERS: list[str] = [CORE_LAYER, LINK_LAYER, NODE_LAYER, WIRED_LINK_LAYER] DEFAULT_LINK_COLOR: str = "red" -NODE_TYPES: Dict[Type[NodeBase], str] = { +NODE_TYPES: dict[type[NodeBase], str] = { HubNode: "hub", SwitchNode: "lanswitch", TunnelNode: "tunnel", @@ -63,7 +63,7 @@ class Sdt: # default altitude (in meters) for flyto view DEFAULT_ALT: int = 2500 # TODO: read in user"s nodes.conf here; below are default node types from the GUI - DEFAULT_SPRITES: Dict[str, str] = [ + DEFAULT_SPRITES: dict[str, str] = [ ("router", "router.png"), ("host", "host.png"), ("PC", "pc.png"), @@ -88,9 +88,9 @@ class Sdt: self.sock: Optional[socket.socket] = None self.connected: bool = False self.url: str = self.DEFAULT_SDT_URL - self.address: Optional[Tuple[Optional[str], Optional[int]]] = None + self.address: Optional[tuple[Optional[str], Optional[int]]] = None self.protocol: Optional[str] = None - self.network_layers: Set[str] = set() + self.network_layers: set[str] = set() self.session.node_handlers.append(self.handle_node_update) self.session.link_handlers.append(self.handle_link_update) @@ -138,7 +138,7 @@ class Sdt: else: # Default to tcp self.sock = socket.create_connection(self.address, 5) - except IOError: + except OSError: logger.exception("SDT socket connect error") return False @@ -176,7 +176,7 @@ class Sdt: if self.sock: try: self.sock.close() - except IOError: + except OSError: logger.error("error closing socket") finally: self.sock = None @@ -212,7 +212,7 @@ class Sdt: logger.debug("sdt cmd: %s", cmd) self.sock.sendall(cmd) return True - except IOError: + except OSError: logger.exception("SDT connection error") self.sock = None self.connected = False