daemon: update to nftables update tracker to use a queue for more reactive changes

This commit is contained in:
Blake Harnden 2021-05-10 23:31:11 -07:00
parent 5286938e44
commit 5e843a7674

View file

@ -6,8 +6,10 @@ import logging
import math import math
import threading import threading
import time import time
from collections import OrderedDict
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Type from queue import Queue
from typing import TYPE_CHECKING, Dict, List, Optional, Type
import netaddr import netaddr
@ -38,6 +40,22 @@ if TYPE_CHECKING:
LEARNING_DISABLED: int = 0 LEARNING_DISABLED: int = 0
class SetQueue(Queue):
"""
Set backed queue to avoid duplicate submissions.
"""
def _init(self, maxsize):
self.queue: OrderedDict = OrderedDict()
def _put(self, item):
self.queue[item] = None
def _get(self):
key, _ = self.queue.popitem(last=False)
return key
class NftablesQueue: class NftablesQueue:
""" """
Helper class for queuing up nftables commands into rate-limited Helper class for queuing up nftables commands into rate-limited
@ -62,7 +80,7 @@ class NftablesQueue:
# list of pending nftables commands # list of pending nftables commands
self.cmds: List[str] = [] self.cmds: List[str] = []
# list of WLANs requiring update # list of WLANs requiring update
self.updates: Set["CoreNetwork"] = set() self.updates: SetQueue = SetQueue()
# timestamps of last WLAN update; this keeps track of WLANs that are # timestamps of last WLAN update; this keeps track of WLANs that are
# using this queue # using this queue
self.last_update_time: Dict["CoreNetwork", float] = {} self.last_update_time: Dict["CoreNetwork", float] = {}
@ -93,6 +111,7 @@ class NftablesQueue:
return return
self.running = False self.running = False
if self.run_thread: if self.run_thread:
self.updates.put(None)
self.run_thread.join() self.run_thread.join()
self.run_thread = None self.run_thread = None
@ -115,20 +134,16 @@ class NftablesQueue:
:return: nothing :return: nothing
""" """
while self.running: while self.running:
with self.lock: net = self.updates.get()
discard = set() if net is None:
for net in self.updates: break
if not net.up: if not net.up:
self.last_update_time[net] = time.monotonic() self.last_update_time[net] = time.monotonic()
discard.add(net) elif self.last_update(net) > self.rate:
continue with self.lock:
if self.last_update(net) > self.rate: self.build_cmds(net)
self.build_cmds(net) self.commit(net)
self.commit(net) self.last_update_time[net] = time.monotonic()
self.last_update_time[net] = time.monotonic()
discard.add(net)
self.updates -= discard
time.sleep(self.rate)
def commit(self, net: "CoreNetwork") -> None: def commit(self, net: "CoreNetwork") -> None:
""" """
@ -153,15 +168,13 @@ class NftablesQueue:
:param net: wlan network :param net: wlan network
:return: nothing :return: nothing
""" """
with self.lock: self.updates.put(net)
self.updates.add(net)
def delete_table(self, net: "CoreNetwork") -> None: def delete_table(self, net: "CoreNetwork") -> None:
""" """
Delete nftable bridge rule table. Delete nftable bridge rule table.
:param net: network to delete table for :param net: network to delete table for
:param name: name of bridge table to delete
:return: nothing :return: nothing
""" """
with self.lock: with self.lock: