daemon: added core.location class variable type hinting

This commit is contained in:
Blake Harnden 2020-06-10 10:24:44 -07:00
parent fd341bd69b
commit 784c4d2419
5 changed files with 137 additions and 130 deletions

View file

@ -9,7 +9,7 @@ import os
import threading
import time
from functools import total_ordering
from typing import TYPE_CHECKING, Dict, List, Tuple
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple
from core import utils
from core.config import ConfigGroup, ConfigurableOptions, Configuration, ModelManager
@ -23,7 +23,7 @@ from core.emulator.enumerations import (
RegisterTlvs,
)
from core.errors import CoreError
from core.nodes.base import CoreNode, NodeBase
from core.nodes.base import CoreNode
from core.nodes.interface import CoreInterface
from core.nodes.network import WlanNode
@ -47,7 +47,7 @@ class MobilityManager(ModelManager):
:param session: session this manager is tied to
"""
super().__init__()
self.session = session
self.session: "Session" = session
self.models[BasicRangeModel.name] = BasicRangeModel
self.models[Ns2ScriptedMobility.name] = Ns2ScriptedMobility
@ -178,7 +178,7 @@ class MobilityManager(ModelManager):
self.session.broadcast_event(event_data)
def updatewlans(
self, moved: List[NodeBase], moved_netifs: List[CoreInterface]
self, moved: List[CoreNode], moved_netifs: List[CoreInterface]
) -> None:
"""
A mobility script has caused nodes in the 'moved' list to move.
@ -204,21 +204,21 @@ class WirelessModel(ConfigurableOptions):
Used for managing arbitrary configuration parameters.
"""
config_type = RegisterTlvs.WIRELESS
bitmap = None
position_callback = None
config_type: RegisterTlvs = RegisterTlvs.WIRELESS
bitmap: str = None
position_callback: Callable[[CoreInterface], None] = None
def __init__(self, session: "Session", _id: int):
def __init__(self, session: "Session", _id: int) -> None:
"""
Create a WirelessModel instance.
:param session: core session we are tied to
:param _id: object id
"""
self.session = session
self.id = _id
self.session: "Session" = session
self.id: int = _id
def all_link_data(self, flags: MessageFlags = MessageFlags.NONE) -> List:
def all_link_data(self, flags: MessageFlags = MessageFlags.NONE) -> List[LinkData]:
"""
May be used if the model can populate the GUI with wireless (green)
link lines.
@ -228,11 +228,11 @@ class WirelessModel(ConfigurableOptions):
"""
return []
def update(self, moved: bool, moved_netifs: List[CoreInterface]) -> None:
def update(self, moved: List[CoreNode], moved_netifs: List[CoreInterface]) -> None:
"""
Update this wireless model.
:param moved: flag is it was moved
:param moved: moved nodes
:param moved_netifs: moved network interfaces
:return: nothing
"""
@ -256,8 +256,8 @@ class BasicRangeModel(WirelessModel):
the GUI.
"""
name = "basic_range"
options = [
name: str = "basic_range"
options: List[Configuration] = [
Configuration(
_id="range",
_type=ConfigDataTypes.UINT32,
@ -299,15 +299,15 @@ class BasicRangeModel(WirelessModel):
:param _id: object id
"""
super().__init__(session, _id)
self.session = session
self.wlan = session.get_node(_id, WlanNode)
self._netifs = {}
self._netifslock = threading.Lock()
self.range = 0
self.bw = None
self.delay = None
self.loss = None
self.jitter = None
self.session: "Session" = session
self.wlan: WlanNode = session.get_node(_id, WlanNode)
self._netifs: Dict[CoreInterface, Tuple[float, float, float]] = {}
self._netifslock: threading.Lock = threading.Lock()
self.range: int = 0
self.bw: Optional[int] = None
self.delay: Optional[int] = None
self.loss: Optional[float] = None
self.jitter: Optional[int] = None
def _get_config(self, current_value: int, config: Dict[str, str], name: str) -> int:
"""
@ -374,14 +374,14 @@ class BasicRangeModel(WirelessModel):
position_callback = set_position
def update(self, moved: bool, moved_netifs: List[CoreInterface]) -> None:
def update(self, moved: List[CoreNode], moved_netifs: List[CoreInterface]) -> None:
"""
Node positions have changed without recalc. Update positions from
node.position, then re-calculate links for those that have moved.
Assumes bidirectional links, with one calculation per node pair, where
one of the nodes has moved.
:param moved: flag is it was moved
:param moved: moved nodes
:param moved_netifs: moved network interfaces
:return: nothing
"""
@ -535,29 +535,35 @@ class WayPoint:
Maintains information regarding waypoints.
"""
def __init__(self, time: float, nodenum: int, coords, speed: float):
def __init__(
self,
_time: float,
node_id: int,
coords: Tuple[float, float, float],
speed: float,
) -> None:
"""
Creates a WayPoint instance.
:param time: waypoint time
:param nodenum: node id
:param _time: waypoint time
:param node_id: node id
:param coords: waypoint coordinates
:param speed: waypoint speed
"""
self.time = time
self.nodenum = nodenum
self.coords = coords
self.speed = speed
self.time: float = _time
self.node_id: int = node_id
self.coords: Tuple[float, float, float] = coords
self.speed: float = speed
def __eq__(self, other: "WayPoint") -> bool:
return (self.time, self.nodenum) == (other.time, other.nodenum)
return (self.time, self.node_id) == (other.time, other.node_id)
def __ne__(self, other: "WayPoint") -> bool:
return not self == other
def __lt__(self, other: "WayPoint") -> bool:
if self.time == other.time:
return self.nodenum < other.nodenum
return self.node_id < other.node_id
else:
return self.time < other.time
@ -567,12 +573,11 @@ class WayPointMobility(WirelessModel):
Abstract class for mobility models that set node waypoints.
"""
name = "waypoint"
config_type = RegisterTlvs.MOBILITY
STATE_STOPPED = 0
STATE_RUNNING = 1
STATE_PAUSED = 2
name: str = "waypoint"
config_type: RegisterTlvs = RegisterTlvs.MOBILITY
STATE_STOPPED: int = 0
STATE_RUNNING: int = 1
STATE_PAUSED: int = 2
def __init__(self, session: "Session", _id: int) -> None:
"""
@ -583,20 +588,21 @@ class WayPointMobility(WirelessModel):
:return:
"""
super().__init__(session=session, _id=_id)
self.state = self.STATE_STOPPED
self.queue = []
self.queue_copy = []
self.points = {}
self.initial = {}
self.lasttime = None
self.endtime = None
self.wlan = session.get_node(_id, WlanNode)
self.state: int = self.STATE_STOPPED
self.queue: List[WayPoint] = []
self.queue_copy: List[WayPoint] = []
self.points: Dict[int, WayPoint] = {}
self.initial: Dict[int, WayPoint] = {}
self.lasttime: Optional[float] = None
self.endtime: Optional[int] = None
self.timezero: float = 0.0
self.wlan: WlanNode = session.get_node(_id, WlanNode)
# these are really set in child class via confmatrix
self.loop = False
self.refresh_ms = 50
self.loop: bool = False
self.refresh_ms: int = 50
# flag whether to stop scheduling when queue is empty
# (ns-3 sets this to False as new waypoints may be added from trace)
self.empty_queue_stop = True
self.empty_queue_stop: bool = True
def runround(self) -> None:
"""
@ -684,16 +690,11 @@ class WayPointMobility(WirelessModel):
self.setnodeposition(node, x2, y2, z2)
del self.points[node.id]
return True
# speed can be a velocity vector or speed value
if isinstance(speed, (float, int)):
# linear speed value
alpha = math.atan2(y2 - y1, x2 - x1)
sx = speed * math.cos(alpha)
sy = speed * math.sin(alpha)
else:
# velocity vector
sx = speed[0]
sy = speed[1]
# linear speed value
alpha = math.atan2(y2 - y1, x2 - x1)
sx = speed * math.cos(alpha)
sy = speed * math.sin(alpha)
# calculate dt * speed = distance moved
dx = sx * dt
@ -776,7 +777,7 @@ class WayPointMobility(WirelessModel):
if self.queue[0].time > now:
break
wp = heapq.heappop(self.queue)
self.points[wp.nodenum] = wp
self.points[wp.node_id] = wp
def copywaypoints(self) -> None:
"""
@ -876,8 +877,8 @@ class Ns2ScriptedMobility(WayPointMobility):
BonnMotion.
"""
name = "ns2script"
options = [
name: str = "ns2script"
options: List[Configuration] = [
Configuration(
_id="file", _type=ConfigDataTypes.STRING, label="mobility script file"
),
@ -923,7 +924,7 @@ class Ns2ScriptedMobility(WayPointMobility):
ConfigGroup("ns-2 Mobility Script Parameters", 1, len(cls.configurations()))
]
def __init__(self, session: "Session", _id: int):
def __init__(self, session: "Session", _id: int) -> None:
"""
Creates a Ns2ScriptedMobility instance.
@ -931,17 +932,14 @@ class Ns2ScriptedMobility(WayPointMobility):
:param _id: object id
"""
super().__init__(session, _id)
self._netifs = {}
self._netifslock = threading.Lock()
self.file = None
self.refresh_ms = None
self.loop = None
self.autostart = None
self.nodemap = {}
self.script_start = None
self.script_pause = None
self.script_stop = None
self.file: Optional[str] = None
self.refresh_ms: Optional[int] = None
self.loop: Optional[bool] = None
self.autostart: Optional[str] = None
self.nodemap: Dict[int, int] = {}
self.script_start: Optional[str] = None
self.script_pause: Optional[str] = None
self.script_stop: Optional[str] = None
def update_config(self, config: Dict[str, str]) -> None:
self.file = config["file"]