initial changes to mimic prior address creation

This commit is contained in:
Blake Harnden 2020-05-01 13:39:27 -07:00
parent 7e0ead0766
commit 9a42368221
3 changed files with 70 additions and 32 deletions

View file

@ -1,6 +1,5 @@
import logging
import random
from typing import TYPE_CHECKING, Set, Union
from typing import TYPE_CHECKING, List, Set, Tuple, Union
import netaddr
from netaddr import EUI, IPNetwork
@ -14,20 +13,20 @@ if TYPE_CHECKING:
from core.gui.graph.node import CanvasNode
def random_mac():
return ("{:02x}" * 6).format(*[random.randrange(256) for _ in range(6)])
class Subnets:
def __init__(self, ip4: IPNetwork, ip6: IPNetwork) -> None:
self.ip4 = ip4
self.ip6 = ip6
self.used_indexes = set()
def __eq__(self, other: "Subnets") -> bool:
return (self.ip4, self.ip6) == (other.ip4, other.ip6)
return self.key() == other.key()
def __hash__(self) -> int:
return hash((self.ip4, self.ip6))
return hash(self.key())
def key(self) -> Tuple[IPNetwork, IPNetwork]:
return self.ip4, self.ip6
def next(self) -> "Subnets":
return Subnets(self.ip4.next(), self.ip6.next())
@ -47,6 +46,7 @@ class InterfaceManager:
self.mac = EUI(mac)
self.current_mac = None
self.current_subnets = None
self.used_subnets = {}
def update_ips(self, ip4: str, ip6: str) -> None:
self.reset()
@ -65,37 +65,66 @@ class InterfaceManager:
return mac
def next_subnets(self) -> Subnets:
# define currently used subnets
used_subnets = set()
for edge in self.app.core.links.values():
link = edge.link
subnets = None
if link.HasField("interface_one"):
subnets = self.get_subnets(link.interface_one)
if link.HasField("interface_two"):
subnets = self.get_subnets(link.interface_two)
if subnets:
used_subnets.add(subnets)
# find next available subnets
subnets = Subnets(self.ip4_subnets, self.ip6_subnets)
while subnets in used_subnets:
subnets = self.current_subnets
if subnets is None:
subnets = Subnets(self.ip4_subnets, self.ip6_subnets)
while subnets.key() in self.used_subnets:
subnets = subnets.next()
self.used_subnets[subnets.key()] = subnets
return subnets
def reset(self):
self.current_subnets = None
self.used_subnets.clear()
def get_ips(self, node_id: int) -> [str, str]:
ip4 = self.current_subnets.ip4[node_id]
ip6 = self.current_subnets.ip6[node_id]
def removed(self, links: List["core_pb2.Link"]):
# get remaining subnets
remaining_subnets = set()
for link in links:
if link.HasField("interface_one"):
subnets = self.get_subnets(link.interface_one)
if subnets not in remaining_subnets:
self.used_subnets.pop(subnets.key(), None)
if link.HasField("interface_two"):
subnets = self.get_subnets(link.interface_two)
if subnets not in remaining_subnets:
self.used_subnets.pop(subnets.key(), None)
def initialize_links(self, links: List["core_pb2.Link"]):
for link in links:
if link.HasField("interface_one"):
subnets = self.get_subnets(link.interface_one)
if subnets.key() not in self.used_subnets:
self.used_subnets[subnets.key()] = subnets
if link.HasField("interface_two"):
subnets = self.get_subnets(link.interface_two)
if subnets.key() not in self.used_subnets:
self.used_subnets[subnets.key()] = subnets
def next_index(self, node: "core_pb2.Node") -> int:
if NodeUtils.is_router_node(node):
index = 1
else:
index = 20
while True:
if index not in self.current_subnets.used_indexes:
self.current_subnets.used_indexes.add(index)
break
index += 1
return index
def get_ips(self, node: "core_pb2.Node") -> [str, str]:
index = self.next_index(node)
ip4 = self.current_subnets.ip4[index]
ip6 = self.current_subnets.ip6[index]
return str(ip4), str(ip6)
@classmethod
def get_subnets(cls, interface: "core_pb2.Interface") -> Subnets:
def get_subnets(self, interface: "core_pb2.Interface") -> Subnets:
ip4_subnet = IPNetwork(f"{interface.ip4}/{interface.ip4mask}").cidr
ip6_subnet = IPNetwork(f"{interface.ip6}/{interface.ip6mask}").cidr
return Subnets(ip4_subnet, ip6_subnet)
subnets = Subnets(ip4_subnet, ip6_subnet)
return self.used_subnets.get(subnets.key(), subnets)
def determine_subnets(
self, canvas_src_node: "CanvasNode", canvas_dst_node: "CanvasNode"