Merge pull request #441 from coreemu/enhancement/pygui-address-creation

Enhancement/pygui address creation
This commit is contained in:
bharnden 2020-05-01 17:23:40 -07:00 committed by GitHub
commit 28f7d93abd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 34 deletions

View file

@ -283,6 +283,9 @@ class CoreClient:
response = self.client.get_emane_config(self.session_id)
self.emane_config = response.config
# update interface manager
self.interfaces_manager.joined(session.links)
# draw session
self.app.canvas.reset_and_redraw(session)
@ -833,6 +836,7 @@ class CoreClient:
such as link, configurations, interfaces
"""
edges = set()
removed_links = []
for canvas_node in canvas_nodes:
node_id = canvas_node.core_node.id
if node_id not in self.canvas_nodes:
@ -843,11 +847,14 @@ class CoreClient:
if edge in edges:
continue
edges.add(edge)
self.links.pop(edge.token, None)
edge = self.links.pop(edge.token, None)
if edge is not None:
removed_links.append(edge.link)
self.interfaces_manager.removed(removed_links)
def create_interface(self, canvas_node: CanvasNode) -> core_pb2.Interface:
node = canvas_node.core_node
ip4, ip6 = self.interfaces_manager.get_ips(node.id)
ip4, ip6 = self.interfaces_manager.get_ips(node)
ip4_mask = self.interfaces_manager.ip4_mask
ip6_mask = self.interfaces_manager.ip6_mask
interface_id = len(canvas_node.interfaces)

View file

@ -1,6 +1,5 @@
import logging
import random
from typing import TYPE_CHECKING, Set, Union
from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple
import netaddr
from netaddr import EUI, IPNetwork
@ -14,20 +13,29 @@ if TYPE_CHECKING:
from core.gui.graph.node import CanvasNode
def random_mac():
return ("{:02x}" * 6).format(*[random.randrange(256) for _ in range(6)])
def get_index(interface: "core_pb2.Interface") -> int:
net = netaddr.IPNetwork(f"{interface.ip4}/{interface.ip4mask}")
ip_value = net.value
cidr_value = net.cidr.value
return ip_value - cidr_value
class Subnets:
def __init__(self, ip4: IPNetwork, ip6: IPNetwork) -> None:
self.ip4 = ip4
self.ip6 = ip6
self.used_indexes = set()
def __eq__(self, other: "Subnets") -> bool:
return (self.ip4, self.ip6) == (other.ip4, other.ip6)
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Subnets):
return False
return self.key() == other.key()
def __hash__(self) -> int:
return hash((self.ip4, self.ip6))
return hash(self.key())
def key(self) -> Tuple[IPNetwork, IPNetwork]:
return self.ip4, self.ip6
def next(self) -> "Subnets":
return Subnets(self.ip4.next(), self.ip6.next())
@ -47,6 +55,7 @@ class InterfaceManager:
self.mac = EUI(mac)
self.current_mac = None
self.current_subnets = None
self.used_subnets = {}
def update_ips(self, ip4: str, ip6: str) -> None:
self.reset()
@ -65,41 +74,91 @@ class InterfaceManager:
return mac
def next_subnets(self) -> Subnets:
# define currently used subnets
used_subnets = set()
for edge in self.app.core.links.values():
link = edge.link
subnets = None
if link.HasField("interface_one"):
subnets = self.get_subnets(link.interface_one)
if link.HasField("interface_two"):
subnets = self.get_subnets(link.interface_two)
if subnets:
used_subnets.add(subnets)
# find next available subnets
subnets = Subnets(self.ip4_subnets, self.ip6_subnets)
while subnets in used_subnets:
subnets = self.current_subnets
if subnets is None:
subnets = Subnets(self.ip4_subnets, self.ip6_subnets)
while subnets.key() in self.used_subnets:
subnets = subnets.next()
self.used_subnets[subnets.key()] = subnets
return subnets
def reset(self):
def reset(self) -> None:
self.current_subnets = None
self.used_subnets.clear()
def get_ips(self, node_id: int) -> [str, str]:
ip4 = self.current_subnets.ip4[node_id]
ip6 = self.current_subnets.ip6[node_id]
def removed(self, links: List["core_pb2.Link"]) -> None:
# get remaining subnets
remaining_subnets = set()
for edge in self.app.core.links.values():
link = edge.link
if link.HasField("interface_one"):
subnets = self.get_subnets(link.interface_one)
remaining_subnets.add(subnets)
if link.HasField("interface_two"):
subnets = self.get_subnets(link.interface_two)
remaining_subnets.add(subnets)
# remove all subnets from used subnets when no longer present
# or remove used indexes from subnet
interfaces = []
for link in links:
if link.HasField("interface_one"):
interfaces.append(link.interface_one)
if link.HasField("interface_two"):
interfaces.append(link.interface_two)
for interface in interfaces:
subnets = self.get_subnets(interface)
if subnets not in remaining_subnets:
if self.current_subnets == subnets:
self.current_subnets = None
self.used_subnets.pop(subnets.key(), None)
else:
index = get_index(interface)
subnets.used_indexes.discard(index)
def joined(self, links: List["core_pb2.Link"]) -> None:
interfaces = []
for link in links:
if link.HasField("interface_one"):
interfaces.append(link.interface_one)
if link.HasField("interface_two"):
interfaces.append(link.interface_two)
# add to used subnets and mark used indexes
for interface in interfaces:
subnets = self.get_subnets(interface)
index = get_index(interface)
subnets.used_indexes.add(index)
if subnets.key() not in self.used_subnets:
self.used_subnets[subnets.key()] = subnets
def next_index(self, node: "core_pb2.Node") -> int:
if NodeUtils.is_router_node(node):
index = 1
else:
index = 20
while True:
if index not in self.current_subnets.used_indexes:
self.current_subnets.used_indexes.add(index)
break
index += 1
return index
def get_ips(self, node: "core_pb2.Node") -> [str, str]:
index = self.next_index(node)
ip4 = self.current_subnets.ip4[index]
ip6 = self.current_subnets.ip6[index]
return str(ip4), str(ip6)
@classmethod
def get_subnets(cls, interface: "core_pb2.Interface") -> Subnets:
def get_subnets(self, interface: "core_pb2.Interface") -> Subnets:
ip4_subnet = IPNetwork(f"{interface.ip4}/{interface.ip4mask}").cidr
ip6_subnet = IPNetwork(f"{interface.ip6}/{interface.ip6mask}").cidr
return Subnets(ip4_subnet, ip6_subnet)
subnets = Subnets(ip4_subnet, ip6_subnet)
return self.used_subnets.get(subnets.key(), subnets)
def determine_subnets(
self, canvas_src_node: "CanvasNode", canvas_dst_node: "CanvasNode"
):
) -> None:
src_node = canvas_src_node.core_node
dst_node = canvas_dst_node.core_node
is_src_container = NodeUtils.is_container_node(src_node.type)
@ -123,7 +182,7 @@ class InterfaceManager:
def find_subnets(
self, canvas_node: "CanvasNode", visited: Set[int] = None
) -> Union[IPNetwork, None]:
) -> Optional[IPNetwork]:
logging.info("finding subnet for node: %s", canvas_node.core_node.name)
canvas = self.app.canvas
subnets = None

View file

@ -1,7 +1,7 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Union
from core.api.grpc.core_pb2 import NodeType
from core.api.grpc.core_pb2 import Node, NodeType
from core.gui.images import ImageEnum, Images, TypeToImage
if TYPE_CHECKING:
@ -64,8 +64,13 @@ class NodeUtils:
RJ45_NODES = {NodeType.RJ45}
IGNORE_NODES = {NodeType.CONTROL_NET, NodeType.PEER_TO_PEER}
NODE_MODELS = {"router", "host", "PC", "mdr", "prouter"}
ROUTER_NODES = {"router", "mdr"}
ANTENNA_ICON = None
@classmethod
def is_router_node(cls, node: Node) -> bool:
return cls.is_model_node(node.type) and node.model in cls.ROUTER_NODES
@classmethod
def is_ignore_node(cls, node_type: NodeType) -> bool:
return node_type in cls.IGNORE_NODES