diff --git a/daemon/core/nodes/network.py b/daemon/core/nodes/network.py index 4b8b6e90..f9511924 100644 --- a/daemon/core/nodes/network.py +++ b/daemon/core/nodes/network.py @@ -6,8 +6,10 @@ import logging import math import threading import time +from collections import OrderedDict from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Type +from queue import Queue +from typing import TYPE_CHECKING, Dict, List, Optional, Type import netaddr @@ -38,6 +40,22 @@ if TYPE_CHECKING: LEARNING_DISABLED: int = 0 +class SetQueue(Queue): + """ + Set backed queue to avoid duplicate submissions. + """ + + def _init(self, maxsize): + self.queue: OrderedDict = OrderedDict() + + def _put(self, item): + self.queue[item] = None + + def _get(self): + key, _ = self.queue.popitem(last=False) + return key + + class NftablesQueue: """ Helper class for queuing up nftables commands into rate-limited @@ -62,7 +80,7 @@ class NftablesQueue: # list of pending nftables commands self.cmds: List[str] = [] # list of WLANs requiring update - self.updates: Set["CoreNetwork"] = set() + self.updates: SetQueue = SetQueue() # timestamps of last WLAN update; this keeps track of WLANs that are # using this queue self.last_update_time: Dict["CoreNetwork", float] = {} @@ -93,6 +111,7 @@ class NftablesQueue: return self.running = False if self.run_thread: + self.updates.put(None) self.run_thread.join() self.run_thread = None @@ -115,20 +134,16 @@ class NftablesQueue: :return: nothing """ while self.running: - with self.lock: - discard = set() - for net in self.updates: - if not net.up: - self.last_update_time[net] = time.monotonic() - discard.add(net) - continue - if self.last_update(net) > self.rate: - self.build_cmds(net) - self.commit(net) - self.last_update_time[net] = time.monotonic() - discard.add(net) - self.updates -= discard - time.sleep(self.rate) + net = self.updates.get() + if net is None: + break + if not net.up: + self.last_update_time[net] = time.monotonic() + elif self.last_update(net) > self.rate: + with self.lock: + self.build_cmds(net) + self.commit(net) + self.last_update_time[net] = time.monotonic() def commit(self, net: "CoreNetwork") -> None: """ @@ -153,15 +168,13 @@ class NftablesQueue: :param net: wlan network :return: nothing """ - with self.lock: - self.updates.add(net) + self.updates.put(net) def delete_table(self, net: "CoreNetwork") -> None: """ Delete nftable bridge rule table. :param net: network to delete table for - :param name: name of bridge table to delete :return: nothing """ with self.lock: