core-extra/daemon/core/gui/interface.py

270 lines
9.4 KiB
Python

import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
import netaddr
from netaddr import EUI, IPNetwork
from core.api.grpc.wrappers import Interface, Link, LinkType, Node
from core.gui import nodeutils as nutils
from core.gui.graph.edges import CanvasEdge
from core.gui.graph.node import CanvasNode
if TYPE_CHECKING:
from core.gui.app import Application
IP4_MASK: int = 24
IP6_MASK: int = 64
WIRELESS_IP4_MASK: int = 32
WIRELESS_IP6_MASK: int = 128
def get_index(iface: Interface) -> Optional[int]:
if not iface.ip4:
return None
net = netaddr.IPNetwork(f"{iface.ip4}/{iface.ip4_mask}")
ip_value = net.value
cidr_value = net.cidr.value
return ip_value - cidr_value
class Subnets:
def __init__(self, ip4: IPNetwork, ip6: IPNetwork) -> None:
self.ip4 = ip4
self.ip6 = ip6
self.used_indexes = set()
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Subnets):
return False
return self.key() == other.key()
def __hash__(self) -> int:
return hash(self.key())
def key(self) -> Tuple[IPNetwork, IPNetwork]:
return self.ip4, self.ip6
def next(self) -> "Subnets":
return Subnets(self.ip4.next(), self.ip6.next())
class InterfaceManager:
def __init__(self, app: "Application") -> None:
self.app: "Application" = app
ip4 = self.app.guiconfig.ips.ip4
ip6 = self.app.guiconfig.ips.ip6
self.ip4_subnets: IPNetwork = IPNetwork(f"{ip4}/{IP4_MASK}")
self.ip6_subnets: IPNetwork = IPNetwork(f"{ip6}/{IP6_MASK}")
mac = self.app.guiconfig.mac
self.mac: EUI = EUI(mac, dialect=netaddr.mac_unix_expanded)
self.current_mac: Optional[EUI] = None
self.current_subnets: Optional[Subnets] = None
self.used_subnets: Dict[Tuple[IPNetwork, IPNetwork], Subnets] = {}
self.used_macs: Set[str] = set()
def update_ips(self, ip4: str, ip6: str) -> None:
self.reset()
self.ip4_subnets = IPNetwork(f"{ip4}/{IP4_MASK}")
self.ip6_subnets = IPNetwork(f"{ip6}/{IP6_MASK}")
def next_mac(self) -> str:
while str(self.current_mac) in self.used_macs:
value = self.current_mac.value + 1
self.current_mac = EUI(value, dialect=netaddr.mac_unix_expanded)
mac = str(self.current_mac)
value = self.current_mac.value + 1
self.current_mac = EUI(value, dialect=netaddr.mac_unix_expanded)
return mac
def next_subnets(self) -> Subnets:
subnets = self.current_subnets
if subnets is None:
subnets = Subnets(self.ip4_subnets, self.ip6_subnets)
while subnets.key() in self.used_subnets:
subnets = subnets.next()
self.used_subnets[subnets.key()] = subnets
return subnets
def reset(self) -> None:
self.current_subnets = None
self.used_subnets.clear()
def removed(self, links: List[Link]) -> None:
# get remaining subnets
remaining_subnets = set()
for edge in self.app.core.links.values():
link = edge.link
if link.iface1:
subnets = self.get_subnets(link.iface1)
remaining_subnets.add(subnets)
if link.iface2:
subnets = self.get_subnets(link.iface2)
remaining_subnets.add(subnets)
# remove all subnets from used subnets when no longer present
# or remove used indexes from subnet
ifaces = []
for link in links:
if link.iface1:
ifaces.append(link.iface1)
if link.iface2:
ifaces.append(link.iface2)
for iface in ifaces:
subnets = self.get_subnets(iface)
if subnets not in remaining_subnets:
self.used_subnets.pop(subnets.key(), None)
else:
index = get_index(iface)
if index is not None:
subnets.used_indexes.discard(index)
self.current_subnets = None
def set_macs(self, links: List[Link]) -> None:
self.current_mac = self.mac
self.used_macs.clear()
for link in links:
if link.iface1:
self.used_macs.add(link.iface1.mac)
if link.iface2:
self.used_macs.add(link.iface2.mac)
def joined(self, links: List[Link]) -> None:
ifaces = []
for link in links:
if link.iface1:
ifaces.append(link.iface1)
if link.iface2:
ifaces.append(link.iface2)
# add to used subnets and mark used indexes
for iface in ifaces:
subnets = self.get_subnets(iface)
index = get_index(iface)
if index is None:
continue
subnets.used_indexes.add(index)
if subnets.key() not in self.used_subnets:
self.used_subnets[subnets.key()] = subnets
def next_index(self, node: Node) -> int:
if nutils.is_router(node):
index = 1
else:
index = 20
while True:
if index not in self.current_subnets.used_indexes:
self.current_subnets.used_indexes.add(index)
break
index += 1
return index
def get_ips(self, node: Node) -> [str, str]:
index = self.next_index(node)
ip4 = self.current_subnets.ip4[index]
ip6 = self.current_subnets.ip6[index]
return str(ip4), str(ip6)
def get_subnets(self, iface: Interface) -> Subnets:
ip4_subnet = self.ip4_subnets
if iface.ip4:
ip4_subnet = IPNetwork(f"{iface.ip4}/{IP4_MASK}").cidr
ip6_subnet = self.ip6_subnets
if iface.ip6:
ip6_subnet = IPNetwork(f"{iface.ip6}/{IP6_MASK}").cidr
subnets = Subnets(ip4_subnet, ip6_subnet)
return self.used_subnets.get(subnets.key(), subnets)
def determine_subnets(
self, canvas_src_node: CanvasNode, canvas_dst_node: CanvasNode
) -> None:
src_node = canvas_src_node.core_node
dst_node = canvas_dst_node.core_node
is_src_container = nutils.is_container(src_node)
is_dst_container = nutils.is_container(dst_node)
if is_src_container and is_dst_container:
self.current_subnets = self.next_subnets()
elif is_src_container and not is_dst_container:
subnets = self.find_subnets(canvas_dst_node, visited={src_node.id})
if subnets:
self.current_subnets = subnets
else:
self.current_subnets = self.next_subnets()
elif not is_src_container and is_dst_container:
subnets = self.find_subnets(canvas_src_node, visited={dst_node.id})
if subnets:
self.current_subnets = subnets
else:
self.current_subnets = self.next_subnets()
else:
logging.info("ignoring subnet change for link between network nodes")
def find_subnets(
self, canvas_node: CanvasNode, visited: Set[int] = None
) -> Optional[IPNetwork]:
logging.info("finding subnet for node: %s", canvas_node.core_node.name)
subnets = None
if not visited:
visited = set()
visited.add(canvas_node.core_node.id)
for edge in canvas_node.edges:
iface = edge.link.iface1
check_node = edge.src
if edge.src == canvas_node:
iface = edge.link.iface2
check_node = edge.dst
if check_node.core_node.id in visited:
continue
visited.add(check_node.core_node.id)
if iface:
subnets = self.get_subnets(iface)
else:
subnets = self.find_subnets(check_node, visited)
if subnets:
logging.info("found subnets: %s", subnets)
break
return subnets
def create_link(self, edge: CanvasEdge) -> Link:
"""
Create core link for a given edge based on src/dst nodes.
"""
src_node = edge.src.core_node
dst_node = edge.dst.core_node
self.determine_subnets(edge.src, edge.dst)
src_iface = None
if nutils.is_container(src_node):
src_iface = self.create_iface(edge.src, edge.linked_wireless)
dst_iface = None
if nutils.is_container(dst_node):
dst_iface = self.create_iface(edge.dst, edge.linked_wireless)
link = Link(
type=LinkType.WIRED,
node1_id=src_node.id,
node2_id=dst_node.id,
iface1=src_iface,
iface2=dst_iface,
)
logging.info("added link between %s and %s", src_node.name, dst_node.name)
return link
def create_iface(self, canvas_node: CanvasNode, wireless_link: bool) -> Interface:
node = canvas_node.core_node
ip4, ip6 = self.get_ips(node)
if wireless_link:
ip4_mask = WIRELESS_IP4_MASK
ip6_mask = WIRELESS_IP6_MASK
else:
ip4_mask = IP4_MASK
ip6_mask = IP6_MASK
iface_id = canvas_node.next_iface_id()
name = f"eth{iface_id}"
iface = Interface(
id=iface_id,
name=name,
ip4=ip4,
ip4_mask=ip4_mask,
ip6=ip6,
ip6_mask=ip6_mask,
)
logging.info("create node(%s) interface(%s)", node.name, iface)
return iface