added type hinting to location module funcs

This commit is contained in:
Blake Harnden 2020-01-13 22:15:44 -08:00
parent 03c4d8768d
commit 02156867e2
4 changed files with 119 additions and 94 deletions

View file

@ -9,6 +9,7 @@ import os
import threading
import time
from functools import total_ordering
from typing import TYPE_CHECKING, Dict, List, Tuple
from core import utils
from core.config import ConfigGroup, ConfigurableOptions, Configuration, ModelManager
@ -21,6 +22,11 @@ from core.emulator.enumerations import (
RegisterTlvs,
)
from core.errors import CoreError
from core.nodes.base import CoreNode, NodeBase
from core.nodes.interface import CoreInterface
if TYPE_CHECKING:
from core.emulator.session import Session
class MobilityManager(ModelManager):
@ -32,7 +38,7 @@ class MobilityManager(ModelManager):
name = "MobilityManager"
config_type = RegisterTlvs.WIRELESS.value
def __init__(self, session):
def __init__(self, session: "Session") -> None:
"""
Creates a MobilityManager instance.
@ -43,7 +49,7 @@ class MobilityManager(ModelManager):
self.models[BasicRangeModel.name] = BasicRangeModel
self.models[Ns2ScriptedMobility.name] = Ns2ScriptedMobility
def reset(self):
def reset(self) -> None:
"""
Clear out all current configurations.
@ -51,7 +57,7 @@ class MobilityManager(ModelManager):
"""
self.config_reset()
def startup(self, node_ids=None):
def startup(self, node_ids: List[int] = None) -> None:
"""
Session is transitioning from instantiation to runtime state.
Instantiate any mobility models that have been configured for a WLAN.
@ -86,7 +92,7 @@ class MobilityManager(ModelManager):
if node.mobility:
self.session.event_loop.add_event(0.0, node.mobility.startup)
def handleevent(self, event_data):
def handleevent(self, event_data: EventData) -> None:
"""
Handle an Event Message used to start, stop, or pause
mobility scripts for a given WlanNode.
@ -149,7 +155,7 @@ class MobilityManager(ModelManager):
if event_type == EventTypes.PAUSE.value:
model.pause()
def sendevent(self, model):
def sendevent(self, model: "WayPointMobility") -> None:
"""
Send an event message on behalf of a mobility model.
This communicates the current and end (max) times to the GUI.
@ -179,7 +185,9 @@ class MobilityManager(ModelManager):
self.session.broadcast_event(event_data)
def updatewlans(self, moved, moved_netifs):
def updatewlans(
self, moved: List[NodeBase], moved_netifs: List[CoreInterface]
) -> None:
"""
A mobility script has caused nodes in the 'moved' list to move.
Update every WlanNode. This saves range calculations if the model
@ -208,7 +216,7 @@ class WirelessModel(ConfigurableOptions):
bitmap = None
position_callback = None
def __init__(self, session, _id):
def __init__(self, session: "Session", _id: int):
"""
Create a WirelessModel instance.
@ -218,7 +226,7 @@ class WirelessModel(ConfigurableOptions):
self.session = session
self.id = _id
def all_link_data(self, flags):
def all_link_data(self, flags: int) -> List:
"""
May be used if the model can populate the GUI with wireless (green)
link lines.
@ -229,7 +237,7 @@ class WirelessModel(ConfigurableOptions):
"""
return []
def update(self, moved, moved_netifs):
def update(self, moved: bool, moved_netifs: List[CoreInterface]) -> None:
"""
Update this wireless model.
@ -239,10 +247,10 @@ class WirelessModel(ConfigurableOptions):
"""
raise NotImplementedError
def update_config(self, config):
def update_config(self, config: Dict[str, str]) -> None:
"""
For run-time updates of model config. Returns True when position callback and set link
parameters should be invoked.
For run-time updates of model config. Returns True when position callback and
set link parameters should be invoked.
:param dict config: configuration values to update
:return: nothing
@ -295,7 +303,7 @@ class BasicRangeModel(WirelessModel):
def config_groups(cls):
return [ConfigGroup("Basic Range Parameters", 1, len(cls.configurations()))]
def __init__(self, session, _id):
def __init__(self, session: "Session", _id: int) -> None:
"""
Create a BasicRangeModel instance.
@ -314,7 +322,7 @@ class BasicRangeModel(WirelessModel):
self.loss = None
self.jitter = None
def values_from_config(self, config):
def values_from_config(self, config: Dict[str, str]) -> None:
"""
Values to convert to link parameters.
@ -340,7 +348,7 @@ class BasicRangeModel(WirelessModel):
if self.jitter == 0:
self.jitter = None
def setlinkparams(self):
def setlinkparams(self) -> None:
"""
Apply link parameters to all interfaces. This is invoked from
WlanNode.setmodel() after the position callback has been set.
@ -356,7 +364,7 @@ class BasicRangeModel(WirelessModel):
jitter=self.jitter,
)
def get_position(self, netif):
def get_position(self, netif: CoreInterface) -> Tuple[float, float, float]:
"""
Retrieve network interface position.
@ -366,7 +374,9 @@ class BasicRangeModel(WirelessModel):
with self._netifslock:
return self._netifs[netif]
def set_position(self, netif, x=None, y=None, z=None):
def set_position(
self, netif: CoreInterface, x: float = None, y: float = None, z: float = None
) -> None:
"""
A node has moved; given an interface, a new (x,y,z) position has
been set; calculate the new distance between other nodes and link or
@ -389,7 +399,7 @@ class BasicRangeModel(WirelessModel):
position_callback = set_position
def update(self, moved, moved_netifs):
def update(self, moved: bool, moved_netifs: List[CoreInterface]) -> None:
"""
Node positions have changed without recalc. Update positions from
node.position, then re-calculate links for those that have moved.
@ -411,7 +421,7 @@ class BasicRangeModel(WirelessModel):
continue
self.calclink(netif, netif2)
def calclink(self, netif, netif2):
def calclink(self, netif: CoreInterface, netif2: CoreInterface) -> None:
"""
Helper used by set_position() and update() to
calculate distance between two interfaces and perform
@ -455,7 +465,9 @@ class BasicRangeModel(WirelessModel):
logging.exception("error getting interfaces during calclinkS")
@staticmethod
def calcdistance(p1, p2):
def calcdistance(
p1: Tuple[float, float, float], p2: Tuple[float, float, float]
) -> float:
"""
Calculate the distance between two three-dimensional points.
@ -471,7 +483,7 @@ class BasicRangeModel(WirelessModel):
c = p1[2] - p2[2]
return math.hypot(math.hypot(a, b), c)
def update_config(self, config):
def update_config(self, config: Dict[str, str]) -> None:
"""
Configuration has changed during runtime.
@ -482,12 +494,14 @@ class BasicRangeModel(WirelessModel):
self.setlinkparams()
return True
def create_link_data(self, interface1, interface2, message_type):
def create_link_data(
self, interface1: CoreInterface, interface2: CoreInterface, message_type: int
) -> LinkData:
"""
Create a wireless link/unlink data message.
:param core.coreobj.PyCoreNetIf interface1: interface one
:param core.coreobj.PyCoreNetIf interface2: interface two
:param core.nodes.interface.CoreInterface interface1: interface one
:param core.nodes.interface.CoreInterface interface2: interface two
:param message_type: link message type
:return: link data
:rtype: LinkData
@ -500,7 +514,9 @@ class BasicRangeModel(WirelessModel):
link_type=LinkTypes.WIRELESS.value,
)
def sendlinkmsg(self, netif, netif2, unlink=False):
def sendlinkmsg(
self, netif: CoreInterface, netif2: CoreInterface, unlink: bool = False
) -> None:
"""
Send a wireless link/unlink API message to the GUI.
@ -517,7 +533,7 @@ class BasicRangeModel(WirelessModel):
link_data = self.create_link_data(netif, netif2, message_type)
self.session.broadcast_link(link_data)
def all_link_data(self, flags):
def all_link_data(self, flags: int) -> List[LinkData]:
"""
Return a list of wireless link messages for when the GUI reconnects.
@ -540,7 +556,7 @@ class WayPoint:
Maintains information regarding waypoints.
"""
def __init__(self, time, nodenum, coords, speed):
def __init__(self, time: float, nodenum: int, coords, speed: float):
"""
Creates a WayPoint instance.
@ -554,13 +570,13 @@ class WayPoint:
self.coords = coords
self.speed = speed
def __eq__(self, other):
return (self.time, self.nodenum) == (other.time, other.nodedum)
def __eq__(self, other: "WayPoint") -> bool:
return (self.time, self.nodenum) == (other.time, other.nodenum)
def __ne__(self, other):
def __ne__(self, other: "WayPoint") -> bool:
return not self == other
def __lt__(self, other):
def __lt__(self, other: "WayPoint") -> bool:
result = self.time < other.time
if result:
result = self.nodenum < other.nodenum
@ -579,7 +595,7 @@ class WayPointMobility(WirelessModel):
STATE_RUNNING = 1
STATE_PAUSED = 2
def __init__(self, session, _id):
def __init__(self, session: "Session", _id: int) -> None:
"""
Create a WayPointMobility instance.
@ -603,7 +619,7 @@ class WayPointMobility(WirelessModel):
# (ns-3 sets this to False as new waypoints may be added from trace)
self.empty_queue_stop = True
def runround(self):
def runround(self) -> None:
"""
Advance script time and move nodes.
@ -657,7 +673,7 @@ class WayPointMobility(WirelessModel):
# TODO: check session state
self.session.event_loop.add_event(0.001 * self.refresh_ms, self.runround)
def run(self):
def run(self) -> None:
"""
Run the waypoint mobility scenario.
@ -670,7 +686,7 @@ class WayPointMobility(WirelessModel):
self.runround()
self.session.mobility.sendevent(self)
def movenode(self, node, dt):
def movenode(self, node: CoreNode, dt: float) -> bool:
"""
Calculate next node location and update its coordinates.
Returns True if the node's position has changed.
@ -723,7 +739,7 @@ class WayPointMobility(WirelessModel):
self.setnodeposition(node, x1 + dx, y1 + dy, z1)
return True
def movenodesinitial(self):
def movenodesinitial(self) -> None:
"""
Move nodes to their initial positions. Then calculate the ranges.
@ -741,11 +757,13 @@ class WayPointMobility(WirelessModel):
moved_netifs.append(netif)
self.session.mobility.updatewlans(moved, moved_netifs)
def addwaypoint(self, time, nodenum, x, y, z, speed):
def addwaypoint(
self, _time: float, nodenum: int, x: float, y: float, z: float, speed: float
) -> None:
"""
Waypoints are pushed to a heapq, sorted by time.
:param time: waypoint time
:param _time: waypoint time
:param int nodenum: node id
:param x: x position
:param y: y position
@ -753,10 +771,10 @@ class WayPointMobility(WirelessModel):
:param speed: speed
:return: nothing
"""
wp = WayPoint(time, nodenum, coords=(x, y, z), speed=speed)
wp = WayPoint(_time, nodenum, coords=(x, y, z), speed=speed)
heapq.heappush(self.queue, wp)
def addinitial(self, nodenum, x, y, z):
def addinitial(self, nodenum: int, x: float, y: float, z: float) -> None:
"""
Record initial position in a dict.
@ -769,11 +787,11 @@ class WayPointMobility(WirelessModel):
wp = WayPoint(0, nodenum, coords=(x, y, z), speed=0)
self.initial[nodenum] = wp
def updatepoints(self, now):
def updatepoints(self, now: float) -> None:
"""
Move items from self.queue to self.points when their time has come.
:param int now: current timestamp
:param float now: current timestamp
:return: nothing
"""
while len(self.queue):
@ -782,7 +800,7 @@ class WayPointMobility(WirelessModel):
wp = heapq.heappop(self.queue)
self.points[wp.nodenum] = wp
def copywaypoints(self):
def copywaypoints(self) -> None:
"""
Store backup copy of waypoints for looping and stopping.
@ -790,7 +808,7 @@ class WayPointMobility(WirelessModel):
"""
self.queue_copy = list(self.queue)
def loopwaypoints(self):
def loopwaypoints(self) -> None:
"""
Restore backup copy of waypoints when looping.
@ -799,13 +817,13 @@ class WayPointMobility(WirelessModel):
self.queue = list(self.queue_copy)
return self.loop
def setnodeposition(self, node, x, y, z):
def setnodeposition(self, node: CoreNode, x: float, y: float, z: float) -> None:
"""
Helper to move a node, notify any GUI (connected session handlers),
without invoking the interface poshook callback that may perform
range calculation.
:param core.netns.vnode.CoreNode node: node to set position for
:param core.nodes.base.CoreNode node: node to set position for
:param x: x position
:param y: y position
:param z: z position
@ -815,7 +833,7 @@ class WayPointMobility(WirelessModel):
node_data = node.data(message_type=0)
self.session.broadcast_node(node_data)
def setendtime(self):
def setendtime(self) -> None:
"""
Set self.endtime to the time of the last waypoint in the queue of
waypoints. This is just an estimate. The endtime will later be
@ -829,7 +847,7 @@ class WayPointMobility(WirelessModel):
except IndexError:
self.endtime = 0
def start(self):
def start(self) -> None:
"""
Run the script from the beginning or unpause from where it
was before.
@ -849,11 +867,12 @@ class WayPointMobility(WirelessModel):
self.lasttime = now - (0.001 * self.refresh_ms)
self.runround()
def stop(self, move_initial=True):
def stop(self, move_initial: bool = True) -> None:
"""
Stop the script and move nodes to initial positions.
:param bool move_initial: flag to check if we should move nodes to initial position
:param bool move_initial: flag to check if we should move nodes to initial
position
:return: nothing
"""
self.state = self.STATE_STOPPED
@ -864,7 +883,7 @@ class WayPointMobility(WirelessModel):
self.movenodesinitial()
self.session.mobility.sendevent(self)
def pause(self):
def pause(self) -> None:
"""
Pause the script; pause time is stored to self.lasttime.
@ -926,12 +945,12 @@ class Ns2ScriptedMobility(WayPointMobility):
]
@classmethod
def config_groups(cls):
def config_groups(cls) -> List[ConfigGroup]:
return [
ConfigGroup("ns-2 Mobility Script Parameters", 1, len(cls.configurations()))
]
def __init__(self, session, _id):
def __init__(self, session: "Session", _id: int):
"""
Creates a Ns2ScriptedMobility instance.
@ -951,7 +970,7 @@ class Ns2ScriptedMobility(WayPointMobility):
self.script_pause = None
self.script_stop = None
def update_config(self, config):
def update_config(self, config: Dict[str, str]) -> None:
self.file = config["file"]
logging.info(
"ns-2 scripted mobility configured for WLAN %d using file: %s",
@ -969,7 +988,7 @@ class Ns2ScriptedMobility(WayPointMobility):
self.copywaypoints()
self.setendtime()
def readscriptfile(self):
def readscriptfile(self) -> None:
"""
Read in mobility script from a file. This adds waypoints to a
priority queue, sorted by waypoint time. Initial waypoints are
@ -1012,7 +1031,6 @@ class Ns2ScriptedMobility(WayPointMobility):
# initial position (time=0, speed=0):
# $node_(6) set X_ 780.0
parts = line.split()
time = 0.0
nodenum = parts[0][1 + parts[0].index("(") : parts[0].index(")")]
if parts[2] == "X_":
if ix is not None and iy is not None:
@ -1036,7 +1054,7 @@ class Ns2ScriptedMobility(WayPointMobility):
if ix is not None and iy is not None:
self.addinitial(self.map(inodenum), ix, iy, iz)
def findfile(self, file_name):
def findfile(self, file_name: str) -> str:
"""
Locate a script file. If the specified file doesn't exist, look in the
same directory as the scenario file, or in the default
@ -1065,7 +1083,7 @@ class Ns2ScriptedMobility(WayPointMobility):
return file_name
def parsemap(self, mapstr):
def parsemap(self, mapstr: str) -> None:
"""
Parse a node mapping string, given as a configuration parameter.
@ -1085,18 +1103,18 @@ class Ns2ScriptedMobility(WayPointMobility):
except ValueError:
logging.exception("ns-2 mobility node map error")
def map(self, nodenum):
def map(self, nodenum: str) -> int:
"""
Map one node number (from a script file) to another.
:param str nodenum: node id to map
:param int nodenum: node id to map
:return: mapped value or the node id itself
:rtype: int
"""
nodenum = int(nodenum)
return self.nodemap.get(nodenum, nodenum)
def startup(self):
def startup(self) -> None:
"""
Start running the script if autostart is enabled.
Move node to initial positions when any autostart time is specified.
@ -1122,7 +1140,7 @@ class Ns2ScriptedMobility(WayPointMobility):
self.state = self.STATE_RUNNING
self.session.event_loop.add_event(t, self.run)
def start(self):
def start(self) -> None:
"""
Handle the case when un-paused.
@ -1134,7 +1152,7 @@ class Ns2ScriptedMobility(WayPointMobility):
if laststate == self.STATE_PAUSED:
self.statescript("unpause")
def run(self):
def run(self) -> None:
"""
Start is pressed or autostart is triggered.
@ -1143,7 +1161,7 @@ class Ns2ScriptedMobility(WayPointMobility):
super().run()
self.statescript("run")
def pause(self):
def pause(self) -> None:
"""
Pause the mobility script.
@ -1152,17 +1170,18 @@ class Ns2ScriptedMobility(WayPointMobility):
super().pause()
self.statescript("pause")
def stop(self, move_initial=True):
def stop(self, move_initial: bool = True) -> None:
"""
Stop the mobility script.
:param bool move_initial: flag to check if we should move node to initial position
:param bool move_initial: flag to check if we should move node to initial
position
:return: nothing
"""
super().stop(move_initial=move_initial)
self.statescript("stop")
def statescript(self, typestr):
def statescript(self, typestr: str) -> None:
"""
State of the mobility script.