daemon: updates to switch from using ebtables to nftables for wlan linking/unlinking

This commit is contained in:
Blake Harnden 2021-05-10 15:07:42 -07:00
parent 208c746b67
commit 30291a8438
14 changed files with 153 additions and 206 deletions

View file

@ -7,15 +7,15 @@ SYSCTL: str = "sysctl"
IP: str = "ip"
ETHTOOL: str = "ethtool"
TC: str = "tc"
EBTABLES: str = "ebtables"
MOUNT: str = "mount"
UMOUNT: str = "umount"
OVS_VSCTL: str = "ovs-vsctl"
TEST: str = "test"
NFTABLES: str = "nft"
COMMON_REQUIREMENTS: List[str] = [
BASH,
EBTABLES,
NFTABLES,
ETHTOOL,
IP,
MOUNT,

View file

@ -669,7 +669,7 @@ class CoreClient:
else:
services = self.session.default_services.get(model)
if services:
node.config_services = services.copy()
node.config_services = set(services)
logger.info(
"add node(%s) to session(%s), coordinates(%s, %s)",
node.name,

View file

@ -947,9 +947,9 @@ class CoreNetworkBase(NodeBase):
will run on, default is None for localhost
"""
super().__init__(session, _id, name, server)
self.brname = None
self._linked = {}
self._linked_lock = threading.Lock()
self.brname: Optional[str] = None
self._linked: Dict[CoreInterface, Dict[CoreInterface, bool]] = {}
self._linked_lock: threading.Lock = threading.Lock()
@abc.abstractmethod
def startup(self) -> None:

View file

@ -21,7 +21,7 @@ from core.emulator.enumerations import (
RegisterTlvs,
)
from core.errors import CoreCommandError, CoreError
from core.executables import EBTABLES, TC
from core.executables import NFTABLES, TC
from core.nodes.base import CoreNetworkBase
from core.nodes.interface import CoreInterface, GreTap, Veth
from core.nodes.netclient import get_net_client
@ -36,31 +36,31 @@ if TYPE_CHECKING:
WirelessModelType = Type[WirelessModel]
LEARNING_DISABLED: int = 0
ebtables_lock: threading.Lock = threading.Lock()
NFTABLES_LOCK: threading.Lock = threading.Lock()
class EbtablesQueue:
class NftablesQueue:
"""
Helper class for queuing up ebtables commands into rate-limited
Helper class for queuing up nftables commands into rate-limited
atomic commits. This improves performance and reliability when there are
many WLAN link updates.
"""
# update rate is every 300ms
rate: float = 0.3
# ebtables
atomic_file: str = "/tmp/pycore.ebtables.atomic"
atomic_file: str = "/tmp/pycore.nftables.atomic"
chain: str = "forward"
def __init__(self) -> None:
"""
Initialize the helper class, but don't start the update thread
until a WLAN is instantiated.
"""
self.doupdateloop: bool = False
self.updatethread: Optional[threading.Thread] = None
self.running: bool = False
self.run_thread: Optional[threading.Thread] = None
# this lock protects cmds and updates lists
self.updatelock: threading.Lock = threading.Lock()
# list of pending ebtables commands
self.lock: threading.Lock = threading.Lock()
# list of pending nftables commands
self.cmds: List[str] = []
# list of WLANs requiring update
self.updates: List["CoreNetwork"] = []
@ -68,192 +68,164 @@ class EbtablesQueue:
# using this queue
self.last_update_time: Dict["CoreNetwork", float] = {}
def startupdateloop(self, wlan: "CoreNetwork") -> None:
def start(self, net: "CoreNetwork") -> None:
"""
Kick off the update loop; only needs to be invoked once.
Start thread to listen for updates for the provided network.
:param net: network to start checking updates
:return: nothing
"""
with self.updatelock:
self.last_update_time[wlan] = time.monotonic()
if self.doupdateloop:
with self.lock:
self.last_update_time[net] = time.monotonic()
if self.running:
return
self.doupdateloop = True
self.updatethread = threading.Thread(target=self.updateloop, daemon=True)
self.updatethread.start()
self.running = True
self.run_thread = threading.Thread(target=self.run, daemon=True)
self.run_thread.start()
def stopupdateloop(self, wlan: "CoreNetwork") -> None:
def stop(self, net: "CoreNetwork") -> None:
"""
Kill the update loop thread if there are no more WLANs using it.
Stop updates for network, when no networks remain, stop update thread.
:param net: network to stop watching updates
:return: nothing
"""
with self.updatelock:
try:
del self.last_update_time[wlan]
except KeyError:
logger.exception(
"error deleting last update time for wlan, ignored before: %s", wlan
)
if len(self.last_update_time) > 0:
return
self.doupdateloop = False
if self.updatethread:
self.updatethread.join()
self.updatethread = None
with self.lock:
self.last_update_time.pop(net, None)
if self.last_update_time:
return
self.running = False
if self.run_thread:
self.run_thread.join()
self.run_thread = None
def ebatomiccmd(self, cmd: str) -> str:
def last_update(self, net: "CoreNetwork") -> float:
"""
Helper for building ebtables atomic file command list.
:param cmd: ebtable command
:return: ebtable atomic command
"""
return f"{EBTABLES} --atomic-file {self.atomic_file} {cmd}"
def lastupdate(self, wlan: "CoreNetwork") -> float:
"""
Return the time elapsed since this WLAN was last updated.
:param wlan: wlan entity
Return the time elapsed since this network was last updated.
:param net: network node
:return: elpased time
"""
try:
elapsed = time.monotonic() - self.last_update_time[wlan]
except KeyError:
self.last_update_time[wlan] = time.monotonic()
if net in self.last_update_time:
elapsed = time.monotonic() - self.last_update_time[net]
else:
self.last_update_time[net] = time.monotonic()
elapsed = 0.0
return elapsed
def updated(self, wlan: "CoreNetwork") -> None:
def updated(self, net: "CoreNetwork") -> None:
"""
Keep track of when this WLAN was last updated.
Keep track of when this network was last updated.
:param wlan: wlan entity
:param net: network node
:return: nothing
"""
self.last_update_time[wlan] = time.monotonic()
self.updates.remove(wlan)
self.last_update_time[net] = time.monotonic()
self.updates.remove(net)
def updateloop(self) -> None:
def run(self) -> None:
"""
Thread target that looks for WLANs needing update, and
rate limits the amount of ebtables activity. Only one userspace program
should use ebtables at any given time, or results can be unpredictable.
Thread target that looks for networks needing update, and
rate limits the amount of nftables activity. Only one userspace program
should use nftables at any given time, or results can be unpredictable.
:return: nothing
"""
while self.doupdateloop:
with self.updatelock:
for wlan in self.updates:
# Check if wlan is from a previously closed session. Because of the
# rate limiting scheme employed here, this may happen if a new session
# is started soon after closing a previous session.
# TODO: if these are WlanNodes, this will never throw an exception
try:
wlan.session
except Exception:
# Just mark as updated to remove from self.updates.
self.updated(wlan)
while self.running:
with self.lock:
for net in self.updates:
if not net.up:
self.updated(net)
continue
if self.lastupdate(wlan) > self.rate:
self.buildcmds(wlan)
self.ebcommit(wlan)
self.updated(wlan)
if self.last_update(net) > self.rate:
self.build_cmds(net)
self.commit(net)
self.updated(net)
time.sleep(self.rate)
def ebcommit(self, wlan: "CoreNetwork") -> None:
def commit(self, net: "CoreNetwork") -> None:
"""
Perform ebtables atomic commit using commands built in the self.cmds list.
Commit changes to nftables for the provided network.
:param net: network to commit nftables changes
:return: nothing
"""
# save kernel ebtables snapshot to a file
args = self.ebatomiccmd("--atomic-save")
wlan.host_cmd(args)
if not self.cmds:
return
# write out nft commands to file
for cmd in self.cmds:
net.host_cmd(f"echo {cmd} >> {self.atomic_file}", shell=True)
# read file as atomic change
net.host_cmd(f"{NFTABLES} -f {self.atomic_file}")
# remove file
net.host_cmd(f"rm -f {self.atomic_file}")
self.cmds.clear()
# modify the table file using queued ebtables commands
for c in self.cmds:
args = self.ebatomiccmd(c)
wlan.host_cmd(args)
self.cmds = []
# commit the table file to the kernel
args = self.ebatomiccmd("--atomic-commit")
wlan.host_cmd(args)
try:
wlan.host_cmd(f"rm -f {self.atomic_file}")
except CoreCommandError:
logger.exception("error removing atomic file: %s", self.atomic_file)
def ebchange(self, wlan: "CoreNetwork") -> None:
def update(self, net: "CoreNetwork") -> None:
"""
Flag a change to the given WLAN's _linked dict, so the ebtables
chain will be rebuilt at the next interval.
Flag this network has an update, so the nftables chain will be rebuilt.
:param net: wlan network
:return: nothing
"""
with self.updatelock:
if wlan not in self.updates:
self.updates.append(wlan)
with self.lock:
if net not in self.updates:
self.updates.append(net)
def buildcmds(self, wlan: "CoreNetwork") -> None:
def build_cmds(self, net: "CoreNetwork") -> None:
"""
Inspect a _linked dict from a wlan, and rebuild the ebtables chain for that WLAN.
Inspect linked nodes for a network, and rebuild the nftables chain commands.
:param net: network to build commands for
:return: nothing
"""
with wlan._linked_lock:
if wlan.has_ebtables_chain:
# flush the chain
self.cmds.append(f"-F {wlan.brname}")
with net._linked_lock:
if net.has_nftables_chain:
self.cmds.append(f"flush table bridge {net.brname}")
else:
wlan.has_ebtables_chain = True
self.cmds.extend(
[
f"-N {wlan.brname} -P {wlan.policy.value}",
f"-A FORWARD --logical-in {wlan.brname} -j {wlan.brname}",
]
net.has_nftables_chain = True
policy = net.policy.value.lower()
self.cmds.append(f"add table bridge {net.brname}")
self.cmds.append(
f"add chain bridge {net.brname} {self.chain} {{type filter hook "
f"forward priority 0\\; policy {policy}\\;}}"
)
# add default rule to accept all traffic not for this bridge
self.cmds.append(
f"add rule bridge {net.brname} {self.chain} "
f"ibriport != {net.brname} accept"
)
# rebuild the chain
for iface1, v in wlan._linked.items():
for oface2, linked in v.items():
if wlan.policy == NetworkPolicy.DROP and linked:
self.cmds.extend(
[
f"-A {wlan.brname} -i {iface1.localname} -o {oface2.localname} -j ACCEPT",
f"-A {wlan.brname} -o {iface1.localname} -i {oface2.localname} -j ACCEPT",
]
for iface1, v in net._linked.items():
for iface2, linked in v.items():
policy = None
if net.policy == NetworkPolicy.DROP and linked:
policy = "accept"
elif net.policy == NetworkPolicy.ACCEPT and not linked:
policy = "drop"
if policy:
self.cmds.append(
f"add rule bridge {net.brname} {self.chain} "
f"iif {iface1.localname} oif {iface2.localname} "
f"{policy}"
)
elif wlan.policy == NetworkPolicy.ACCEPT and not linked:
self.cmds.extend(
[
f"-A {wlan.brname} -i {iface1.localname} -o {oface2.localname} -j DROP",
f"-A {wlan.brname} -o {iface1.localname} -i {oface2.localname} -j DROP",
]
self.cmds.append(
f"add rule bridge {net.brname} {self.chain} "
f"oif {iface1.localname} iif {iface2.localname} "
f"{policy}"
)
# a global object because all WLANs share the same queue
# cannot have multiple threads invoking the ebtables commnd
ebq: EbtablesQueue = EbtablesQueue()
# a global object because all networks share the same queue
# cannot have multiple threads invoking the nftables commnd
nft_queue: NftablesQueue = NftablesQueue()
def ebtablescmds(call: Callable[..., str], cmds: List[str]) -> None:
def nftables_cmds(call: Callable[..., str], cmds: List[str]) -> None:
"""
Run ebtable commands.
Run nftable commands.
:param call: function to call commands
:param cmds: commands to call
:return: nothing
"""
with ebtables_lock:
for args in cmds:
call(args)
with NFTABLES_LOCK:
for cmd in cmds:
call(cmd)
class CoreNetwork(CoreNetworkBase):
@ -285,11 +257,11 @@ class CoreNetwork(CoreNetworkBase):
if name is None:
name = str(self.id)
if policy is not None:
self.policy = policy
self.policy: NetworkPolicy = policy
self.name: Optional[str] = name
sessionid = self.session.short_session_id()
self.brname: str = f"b.{self.id}.{sessionid}"
self.has_ebtables_chain: bool = False
self.has_nftables_chain: bool = False
def host_cmd(
self,
@ -324,9 +296,9 @@ class CoreNetwork(CoreNetworkBase):
:raises CoreCommandError: when there is a command exception
"""
self.net_client.create_bridge(self.brname)
self.has_ebtables_chain = False
self.has_nftables_chain = False
self.up = True
ebq.startupdateloop(self)
nft_queue.start(self)
def shutdown(self) -> None:
"""
@ -336,23 +308,19 @@ class CoreNetwork(CoreNetworkBase):
"""
if not self.up:
return
ebq.stopupdateloop(self)
nft_queue.stop(self)
try:
self.net_client.delete_bridge(self.brname)
if self.has_ebtables_chain:
cmds = [
f"{EBTABLES} -D FORWARD --logical-in {self.brname} -j {self.brname}",
f"{EBTABLES} -X {self.brname}",
]
ebtablescmds(self.host_cmd, cmds)
if self.has_nftables_chain:
cmds = [f"{NFTABLES} delete table bridge {self.brname}"]
nftables_cmds(self.host_cmd, cmds)
except CoreCommandError:
logger.exception("error during shutdown")
logging.exception("error during shutdown")
# removes veth pairs used for bridge-to-bridge connections
for iface in self.get_ifaces():
iface.shutdown()
self.ifaces.clear()
self._linked.clear()
del self.session
self.up = False
def attach(self, iface: CoreInterface) -> None:
@ -404,8 +372,7 @@ class CoreNetwork(CoreNetworkBase):
def unlink(self, iface1: CoreInterface, iface2: CoreInterface) -> None:
"""
Unlink two interfaces, resulting in adding or removing ebtables
filtering rules.
Unlink two interfaces, resulting in adding or removing filtering rules.
:param iface1: interface one
:param iface2: interface two
@ -415,13 +382,12 @@ class CoreNetwork(CoreNetworkBase):
if not self.linked(iface1, iface2):
return
self._linked[iface1][iface2] = False
ebq.ebchange(self)
nft_queue.update(self)
def link(self, iface1: CoreInterface, iface2: CoreInterface) -> None:
"""
Link two interfaces together, resulting in adding or removing
ebtables filtering rules.
filtering rules.
:param iface1: interface one
:param iface2: interface two
@ -431,8 +397,7 @@ class CoreNetwork(CoreNetworkBase):
if self.linked(iface1, iface2):
return
self._linked[iface1][iface2] = True
ebq.ebchange(self)
nft_queue.update(self)
def linkconfig(
self, iface: CoreInterface, options: LinkOptions, iface2: CoreInterface = None
@ -986,7 +951,7 @@ class WlanNode(CoreNetwork):
:return: nothing
"""
super().startup()
ebq.ebchange(self)
nft_queue.update(self)
def attach(self, iface: CoreInterface) -> None:
"""