daemon: moved SetQeueue into utils to be leveraged by others, updated MoveNodesStreamer to leverage SetQueue, this will allow a means to stream node movements, but if position changes happen faster than processing, the latest position will override prior pushes and the latest position will be pulled off the queue

This commit is contained in:
Blake Harnden 2022-04-28 16:12:31 -07:00
parent fe0bc2b405
commit aa8ea40ce6
4 changed files with 36 additions and 23 deletions

View file

@ -54,15 +54,16 @@ from core.api.grpc.wlan_pb2 import (
from core.api.grpc.wrappers import LinkOptions
from core.emulator.data import IpPrefixes
from core.errors import CoreError
from core.utils import SetQueue
logger = logging.getLogger(__name__)
class MoveNodesStreamer:
def __init__(self, session_id: int = None, source: str = None) -> None:
self.session_id = session_id
self.source = source
self.queue: Queue = Queue()
def __init__(self, session_id: int, source: str = None) -> None:
self.session_id: int = session_id
self.source: Optional[str] = source
self.queue: SetQueue = SetQueue()
def send_position(self, node_id: int, x: float, y: float) -> None:
position = wrappers.Position(x=x, y=y)

View file

@ -1213,3 +1213,15 @@ class MoveNodesRequest:
position=position,
geo=geo,
)
def __members(self) -> Tuple[int, int]:
return self.session_id, self.node_id
def __eq__(self, other: "MoveNodesRequest") -> bool:
if type(other) is type(self):
return self.__members() == other.__members()
else:
return False
def __hash__(self):
return hash(self.__members())

View file

@ -4,9 +4,7 @@ Defines network nodes used within core.
import logging
import threading
from collections import OrderedDict
from pathlib import Path
from queue import Queue
from typing import TYPE_CHECKING, Dict, List, Optional, Type
import netaddr
@ -32,22 +30,6 @@ if TYPE_CHECKING:
LEARNING_DISABLED: int = 0
class SetQueue(Queue):
"""
Set backed queue to avoid duplicate submissions.
"""
def _init(self, maxsize):
self.queue: OrderedDict = OrderedDict()
def _put(self, item):
self.queue[item] = None
def _get(self):
key, _ = self.queue.popitem(last=False)
return key
class NftablesQueue:
"""
Helper class for queuing up nftables commands into rate-limited
@ -72,7 +54,7 @@ class NftablesQueue:
# list of pending nftables commands
self.cmds: List[str] = []
# list of WLANs requiring update
self.updates: SetQueue = SetQueue()
self.updates: utils.SetQueue = utils.SetQueue()
def start(self) -> None:
"""

View file

@ -16,7 +16,9 @@ import shlex
import shutil
import sys
import threading
from collections import OrderedDict
from pathlib import Path
from queue import Queue
from subprocess import PIPE, STDOUT, Popen
from typing import (
TYPE_CHECKING,
@ -474,3 +476,19 @@ def parse_iface_config_id(config_id: int) -> Tuple[int, Optional[int]]:
iface_id = config_id % IFACE_CONFIG_FACTOR
node_id = config_id // IFACE_CONFIG_FACTOR
return node_id, iface_id
class SetQueue(Queue):
"""
Set backed queue to avoid duplicate submissions.
"""
def _init(self, maxsize):
self.queue: OrderedDict = OrderedDict()
def _put(self, item):
self.queue[item] = None
def _get(self):
key, _ = self.queue.popitem(last=False)
return key