From aa8ea40ce63aa94a005605d783f16ce825f5f440 Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Thu, 28 Apr 2022 16:12:31 -0700 Subject: [PATCH] daemon: moved SetQeueue into utils to be leveraged by others, updated MoveNodesStreamer to leverage SetQueue, this will allow a means to stream node movements, but if position changes happen faster than processing, the latest position will override prior pushes and the latest position will be pulled off the queue --- daemon/core/api/grpc/client.py | 9 +++++---- daemon/core/api/grpc/wrappers.py | 12 ++++++++++++ daemon/core/nodes/network.py | 20 +------------------- daemon/core/utils.py | 18 ++++++++++++++++++ 4 files changed, 36 insertions(+), 23 deletions(-) diff --git a/daemon/core/api/grpc/client.py b/daemon/core/api/grpc/client.py index 74f47c57..9da2cb14 100644 --- a/daemon/core/api/grpc/client.py +++ b/daemon/core/api/grpc/client.py @@ -54,15 +54,16 @@ from core.api.grpc.wlan_pb2 import ( from core.api.grpc.wrappers import LinkOptions from core.emulator.data import IpPrefixes from core.errors import CoreError +from core.utils import SetQueue logger = logging.getLogger(__name__) class MoveNodesStreamer: - def __init__(self, session_id: int = None, source: str = None) -> None: - self.session_id = session_id - self.source = source - self.queue: Queue = Queue() + def __init__(self, session_id: int, source: str = None) -> None: + self.session_id: int = session_id + self.source: Optional[str] = source + self.queue: SetQueue = SetQueue() def send_position(self, node_id: int, x: float, y: float) -> None: position = wrappers.Position(x=x, y=y) diff --git a/daemon/core/api/grpc/wrappers.py b/daemon/core/api/grpc/wrappers.py index f3bb344c..0ab9cdd5 100644 --- a/daemon/core/api/grpc/wrappers.py +++ b/daemon/core/api/grpc/wrappers.py @@ -1213,3 +1213,15 @@ class MoveNodesRequest: position=position, geo=geo, ) + + def __members(self) -> Tuple[int, int]: + return self.session_id, self.node_id + + def __eq__(self, other: "MoveNodesRequest") -> bool: + if type(other) is type(self): + return self.__members() == other.__members() + else: + return False + + def __hash__(self): + return hash(self.__members()) diff --git a/daemon/core/nodes/network.py b/daemon/core/nodes/network.py index a3fe1e87..ccac56c4 100644 --- a/daemon/core/nodes/network.py +++ b/daemon/core/nodes/network.py @@ -4,9 +4,7 @@ Defines network nodes used within core. import logging import threading -from collections import OrderedDict from pathlib import Path -from queue import Queue from typing import TYPE_CHECKING, Dict, List, Optional, Type import netaddr @@ -32,22 +30,6 @@ if TYPE_CHECKING: LEARNING_DISABLED: int = 0 -class SetQueue(Queue): - """ - Set backed queue to avoid duplicate submissions. - """ - - def _init(self, maxsize): - self.queue: OrderedDict = OrderedDict() - - def _put(self, item): - self.queue[item] = None - - def _get(self): - key, _ = self.queue.popitem(last=False) - return key - - class NftablesQueue: """ Helper class for queuing up nftables commands into rate-limited @@ -72,7 +54,7 @@ class NftablesQueue: # list of pending nftables commands self.cmds: List[str] = [] # list of WLANs requiring update - self.updates: SetQueue = SetQueue() + self.updates: utils.SetQueue = utils.SetQueue() def start(self) -> None: """ diff --git a/daemon/core/utils.py b/daemon/core/utils.py index c9604f08..d2308f30 100644 --- a/daemon/core/utils.py +++ b/daemon/core/utils.py @@ -16,7 +16,9 @@ import shlex import shutil import sys import threading +from collections import OrderedDict from pathlib import Path +from queue import Queue from subprocess import PIPE, STDOUT, Popen from typing import ( TYPE_CHECKING, @@ -474,3 +476,19 @@ def parse_iface_config_id(config_id: int) -> Tuple[int, Optional[int]]: iface_id = config_id % IFACE_CONFIG_FACTOR node_id = config_id // IFACE_CONFIG_FACTOR return node_id, iface_id + + +class SetQueue(Queue): + """ + Set backed queue to avoid duplicate submissions. + """ + + def _init(self, maxsize): + self.queue: OrderedDict = OrderedDict() + + def _put(self, item): + self.queue[item] = None + + def _get(self): + key, _ = self.queue.popitem(last=False) + return key