433 lines
16 KiB
Python
433 lines
16 KiB
Python
import json
|
|
import logging
|
|
import tkinter as tk
|
|
from copy import deepcopy
|
|
from tkinter import BooleanVar, messagebox, ttk
|
|
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, ValuesView
|
|
|
|
from core.api.grpc.wrappers import Link, LinkType, Node, Session, ThroughputsEvent
|
|
from core.gui import nodeutils as nutils
|
|
from core.gui.graph import tags
|
|
from core.gui.graph.edges import (
|
|
CanvasEdge,
|
|
CanvasWirelessEdge,
|
|
create_edge_token,
|
|
create_wireless_token,
|
|
)
|
|
from core.gui.graph.enums import GraphMode
|
|
from core.gui.graph.graph import CanvasGraph
|
|
from core.gui.graph.node import CanvasNode
|
|
from core.gui.graph.shape import Shape
|
|
from core.gui.graph.shapeutils import ShapeType
|
|
from core.gui.nodeutils import NodeDraw
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
if TYPE_CHECKING:
|
|
from core.gui.app import Application
|
|
from core.gui.coreclient import CoreClient
|
|
|
|
|
|
class ShowVar(BooleanVar):
    """
    Boolean toggle bound to a canvas item tag.

    When triggered, every canvas known to the manager has the items carrying
    the tag switched between the normal and hidden states.
    """

    def __init__(self, manager: "CanvasManager", tag: str, value: bool) -> None:
        """
        Create the toggle.

        :param manager: canvas manager whose canvases are updated
        :param tag: canvas item tag this toggle controls
        :param value: initial value of the toggle
        """
        super().__init__(value=value)
        self.manager: "CanvasManager" = manager
        self.tag: str = tag

    def state(self) -> str:
        """
        Translate the current boolean value into a tk item state.

        :return: tk.NORMAL when the toggle is set, tk.HIDDEN otherwise
        """
        if self.get():
            return tk.NORMAL
        return tk.HIDDEN

    def click_handler(self) -> None:
        """
        Apply the current state to the tagged items of every canvas.

        :return: nothing
        """
        for graph in self.manager.all():
            graph.itemconfigure(self.tag, state=self.state())
|
|
|
|
|
|
class ShowNodeLabels(ShowVar):
    """
    Toggle for node labels, applied node by node rather than by tag.
    """

    def click_handler(self) -> None:
        """
        Push the current state to the label of every non-hidden node.

        :return: nothing
        """
        label_state = self.state()
        for graph in self.manager.all():
            # hidden nodes are skipped so their labels are left untouched
            shown_nodes = (x for x in graph.nodes.values() if not x.hidden)
            for canvas_node in shown_nodes:
                canvas_node.set_label(label_state)
|
|
|
|
|
|
class ShowLinks(ShowVar):
    """
    Toggle for wired links, delegating the decision to each edge.
    """

    def click_handler(self) -> None:
        """
        Ask every non-hidden edge tracked by the manager to recheck visibility.

        :return: nothing
        """
        for link_edge in self.manager.edges.values():
            if link_edge.hidden:
                continue
            link_edge.check_visibility()
|
|
|
|
|
|
class ShowLinkLabels(ShowVar):
    """
    Toggle for link labels, applied edge by edge rather than by tag.
    """

    def click_handler(self) -> None:
        """
        Push the current state to the labels of every non-hidden edge.

        :return: nothing
        """
        label_state = self.state()
        shown_edges = (x for x in self.manager.edges.values() if not x.hidden)
        for link_edge in shown_edges:
            link_edge.set_labels(label_state)
|
|
|
|
|
|
class CanvasManager:
    """
    Manages the notebook of canvas tabs and the state shared between them.

    Besides the canvases themselves this holds the registries of wired and
    wireless edges, the display toggles and the throughput display settings.
    Two dictionaries translate between notebook tab identifiers and canvas ids:
    canvas_ids maps tab identifier to canvas id, unique_ids maps the reverse.
    """

    def __init__(
        self, master: tk.BaseWidget, app: "Application", core: "CoreClient"
    ) -> None:
        """
        Create the manager, build the notebook and open a first canvas tab.

        :param master: parent widget the notebook is placed into
        :param app: application, read here for the preferred canvas size
        :param core: core client handed to each canvas and used for lookups
        """
        self.master: tk.BaseWidget = master
        self.app: "Application" = app
        self.core: "CoreClient" = core

        # canvas interactions
        self.mode: GraphMode = GraphMode.SELECT
        self.annotation_type: Optional[ShapeType] = None
        self.node_draw: Optional[NodeDraw] = None
        self.canvases: Dict[int, CanvasGraph] = {}

        # global edge management
        self.edges: Dict[str, CanvasEdge] = {}
        self.wireless_edges: Dict[str, CanvasWirelessEdge] = {}

        # global canvas settings
        self.default_dimensions: Tuple[int, int] = (
            self.app.guiconfig.preferences.width,
            self.app.guiconfig.preferences.height,
        )
        self.show_node_labels: ShowVar = ShowNodeLabels(
            self, tags.NODE_LABEL, value=True
        )
        self.show_link_labels: ShowVar = ShowLinkLabels(
            self, tags.LINK_LABEL, value=True
        )
        self.show_links: ShowVar = ShowLinks(self, tags.EDGE, value=True)
        self.show_wireless: ShowVar = ShowVar(self, tags.WIRELESS_EDGE, value=True)
        self.show_grid: ShowVar = ShowVar(self, tags.GRIDLINE, value=True)
        self.show_annotations: ShowVar = ShowVar(self, tags.ANNOTATION, value=True)
        self.show_loss_links: ShowVar = ShowLinks(self, tags.LOSS_EDGES, value=True)
        self.show_iface_names: BooleanVar = BooleanVar(value=False)
        self.show_ip4s: BooleanVar = BooleanVar(value=True)
        self.show_ip6s: BooleanVar = BooleanVar(value=True)

        # throughput settings
        self.throughput_threshold: float = 250.0
        self.throughput_width: int = 10
        self.throughput_color: str = "#FF0000"

        # widget
        self.notebook: Optional[ttk.Notebook] = None
        # notebook tab identifier -> canvas id
        self.canvas_ids: Dict[str, int] = {}
        # canvas id -> notebook tab identifier
        self.unique_ids: Dict[int, str] = {}
        self.draw()

        self.setup_bindings()
        # start with a single tab by default
        self.add_canvas()

    def setup_bindings(self) -> None:
        """
        Bind notebook tab changes to tab_change.

        :return: nothing
        """
        self.notebook.bind("<<NotebookTabChanged>>", self.tab_change)

    def tab_change(self, _event: tk.Event) -> None:
        """
        Update the status bar zoom to match the newly selected canvas.

        :param _event: tab changed event, unused
        :return: nothing
        """
        # ignore tab change events before tab data has been setup
        unique_id = self.notebook.select()
        if not unique_id or unique_id not in self.canvas_ids:
            return
        canvas = self.current()
        self.app.statusbar.set_zoom(canvas.ratio)

    def select(self, tab_id: int) -> None:
        """
        Select the notebook tab belonging to a canvas id.

        :param tab_id: canvas id of the tab to select
        :return: nothing
        """
        unique_id = self.unique_ids.get(tab_id)
        if unique_id is None:
            # nothing to select, the current selection is left as it is
            logger.warning("cannot select unknown canvas(%s)", tab_id)
            return
        self.notebook.select(unique_id)

    def draw(self) -> None:
        """
        Create the notebook widget and place it into the master widget.

        :return: nothing
        """
        self.notebook = ttk.Notebook(self.master)
        self.notebook.grid(sticky=tk.NSEW, pady=1)

    def _next_id(self) -> int:
        """
        Find the lowest positive canvas id not currently in use.

        :return: unused canvas id, starting from 1
        """
        used_ids = set(self.canvas_ids.values())
        _id = 1
        while _id in used_ids:
            _id += 1
        return _id

    def current(self) -> CanvasGraph:
        """
        Retrieve the canvas of the currently selected tab.

        :return: canvas for the selected tab
        :raises KeyError: when the selected tab has no registered canvas id
        """
        unique_id = self.notebook.select()
        canvas_id = self.canvas_ids[unique_id]
        return self.get(canvas_id)

    def all(self) -> ValuesView[CanvasGraph]:
        """
        Retrieve a view of every canvas being managed.

        :return: view over the managed canvases
        """
        return self.canvases.values()

    def get(self, canvas_id: int) -> CanvasGraph:
        """
        Retrieve the canvas for an id, creating its tab when it does not exist.

        :param canvas_id: id of the canvas to retrieve
        :return: existing or newly created canvas
        """
        canvas = self.canvases.get(canvas_id)
        if canvas is None:
            canvas = self.add_canvas(canvas_id)
        return canvas

    def add_canvas(self, canvas_id: Optional[int] = None) -> CanvasGraph:
        """
        Create a new notebook tab holding a canvas and its scrollbars.

        :param canvas_id: id to give the canvas, the next free id when None
        :return: created canvas
        """
        # create tab frame
        tab = ttk.Frame(self.notebook, padding=0)
        tab.grid(sticky=tk.NSEW)
        tab.columnconfigure(0, weight=1)
        tab.rowconfigure(0, weight=1)
        if canvas_id is None:
            canvas_id = self._next_id()
        self.notebook.add(tab, text=f"Canvas {canvas_id}")
        # the tab was just added, so it is the last one reported
        unique_id = self.notebook.tabs()[-1]
        logger.info("creating canvas(%s)", canvas_id)
        self.canvas_ids[unique_id] = canvas_id
        self.unique_ids[canvas_id] = unique_id

        # create canvas
        canvas = CanvasGraph(
            tab, self.app, self, self.core, canvas_id, self.default_dimensions
        )
        canvas.grid(sticky=tk.NSEW)
        self.canvases[canvas_id] = canvas

        # add scrollbars
        scroll_y = ttk.Scrollbar(tab, command=canvas.yview)
        scroll_y.grid(row=0, column=1, sticky=tk.NS)
        scroll_x = ttk.Scrollbar(tab, orient=tk.HORIZONTAL, command=canvas.xview)
        scroll_x.grid(row=1, column=0, sticky=tk.EW)
        canvas.configure(xscrollcommand=scroll_x.set)
        canvas.configure(yscrollcommand=scroll_y.set)
        return canvas

    def delete_canvas(self) -> None:
        """
        Delete the currently selected canvas along with its nodes and edges.

        The last remaining canvas cannot be deleted, an info dialog is shown
        instead.

        :return: nothing
        """
        if len(self.notebook.tabs()) == 1:
            messagebox.showinfo("Canvas", "Cannot delete last canvas", parent=self.app)
            return
        unique_id = self.notebook.select()
        self.notebook.forget(unique_id)
        canvas_id = self.canvas_ids.pop(unique_id)
        # drop the reverse mapping too, otherwise select() could later be
        # handed a tab that has already been forgotten
        self.unique_ids.pop(canvas_id, None)
        canvas = self.canvases.pop(canvas_id)
        # an edge is attached to both of its nodes, delete it only once
        edges = set()
        for node in canvas.nodes.values():
            node.delete()
            while node.edges:
                edge = node.edges.pop()
                if edge in edges:
                    continue
                edges.add(edge)
                edge.delete()

    def join(self, session: Session) -> None:
        """
        Discard all current canvas state and draw the provided session.

        :param session: session to draw
        :return: nothing
        """
        # clear out all canvases
        for tab_id in self.notebook.tabs():
            self.notebook.forget(tab_id)
        self.canvases.clear()
        self.canvas_ids.clear()
        self.unique_ids.clear()
        self.edges.clear()
        self.wireless_edges.clear()
        logger.info("cleared canvases")

        # reset settings
        # NOTE(review): show_links and show_wireless are not reset here,
        # confirm whether that is intentional
        self.show_node_labels.set(True)
        self.show_link_labels.set(True)
        self.show_grid.set(True)
        self.show_annotations.set(True)
        self.show_iface_names.set(False)
        self.show_ip4s.set(True)
        self.show_ip6s.set(True)
        self.show_loss_links.set(True)
        self.mode = GraphMode.SELECT
        self.annotation_type = None
        self.node_draw = None

        # draw session
        self.draw_session(session)

    def draw_session(self, session: Session) -> None:
        """
        Draw canvases, shapes, nodes and links for a session.

        :param session: session to draw
        :return: nothing
        """
        # draw canvas configurations and shapes
        self.parse_metadata_canvas(session.metadata)
        self.parse_metadata_shapes(session.metadata)

        # create session nodes
        for core_node in session.nodes.values():
            # add node, avoiding ignored nodes
            if nutils.should_ignore(core_node):
                continue
            self.add_core_node(core_node)

        # organize canvas tabs, ordering them by ascending canvas id
        for index, canvas_id in enumerate(sorted(self.canvases)):
            canvas = self.canvases[canvas_id]
            self.notebook.insert(index, canvas.master)

        # draw existing links
        for link in session.links:
            node1 = self.core.get_canvas_node(link.node1_id)
            node2 = self.core.get_canvas_node(link.node2_id)
            if link.type == LinkType.WIRELESS:
                self.add_wireless_edge(node1, node2, link)
            else:
                self.add_wired_edge(node1, node2, link)

        # organize canvas order
        for canvas in self.canvases.values():
            canvas.organize()

        # parse metadata for edge configs and hidden nodes
        self.parse_metadata_edges(session.metadata)
        self.parse_metadata_hidden(session.metadata)

        # create a default canvas if none were created prior
        if not self.canvases:
            self.add_canvas()

    def redraw_canvas(self, dimensions: Tuple[int, int]) -> None:
        """
        Redraw the current canvas, and its wallpaper if it has one.

        :param dimensions: dimensions passed on to the canvas redraw
        :return: nothing
        """
        canvas = self.current()
        canvas.redraw_canvas(dimensions)
        if canvas.wallpaper:
            canvas.redraw_wallpaper()

    def get_metadata(self) -> Dict[str, Any]:
        """
        Collect the gridline setting and the metadata of every canvas.

        :return: dict with the keys "gridlines" and "canvases"
        """
        canvases = [x.get_metadata() for x in self.all()]
        return dict(gridlines=self.show_grid.get(), canvases=canvases)

    def parse_metadata_canvas(self, metadata: Dict[str, Any]) -> None:
        """
        Apply the JSON encoded "canvas" metadata entry, when present.

        Sets the gridline toggle and lets each configured canvas parse its own
        configuration, creating canvases that do not exist yet.

        :param metadata: session metadata
        :return: nothing
        """
        # canvas setting
        canvas_config = metadata.get("canvas")
        logger.debug("canvas metadata: %s", canvas_config)
        if not canvas_config:
            return
        canvas_config = json.loads(canvas_config)
        # get configured dimensions and gridlines option
        gridlines = canvas_config.get("gridlines", True)
        self.show_grid.set(gridlines)

        # get background configurations
        for config in canvas_config.get("canvases", []):
            canvas_id = config.get("id")
            if canvas_id is None:
                logger.error("canvas config id not provided")
                continue
            canvas = self.get(canvas_id)
            canvas.parse_metadata(config)

    def parse_metadata_shapes(self, metadata: Dict[str, Any]) -> None:
        """
        Load shapes from the JSON encoded "shapes" metadata entry, when present.

        :param metadata: session metadata
        :return: nothing
        """
        # load saved shapes
        shapes_config = metadata.get("shapes")
        if not shapes_config:
            return
        shapes_config = json.loads(shapes_config)
        for shape_config in shapes_config:
            logger.debug("loading shape: %s", shape_config)
            Shape.from_metadata(self.app, shape_config)

    def parse_metadata_edges(self, metadata: Dict[str, Any]) -> None:
        """
        Apply width and color from the JSON encoded "edges" metadata entry.

        Tokens that do not match a known link are logged and skipped.

        :param metadata: session metadata
        :return: nothing
        """
        # load edges config
        edges_config = metadata.get("edges")
        if not edges_config:
            return
        edges_config = json.loads(edges_config)
        logger.info("edges config: %s", edges_config)
        for edge_config in edges_config:
            edge_token = edge_config["token"]
            edge = self.core.links.get(edge_token)
            if edge:
                edge.width = edge_config["width"]
                edge.color = edge_config["color"]
                edge.redraw()
            else:
                logger.warning("invalid edge token to configure: %s", edge_token)

    def parse_metadata_hidden(self, metadata: Dict[str, Any]) -> None:
        """
        Hide the nodes listed in the JSON encoded "hidden" metadata entry.

        Node ids without a matching canvas node are logged and skipped.

        :param metadata: session metadata
        :return: nothing
        """
        # read hidden nodes
        hidden_config = metadata.get("hidden")
        if not hidden_config:
            return
        hidden_config = json.loads(hidden_config)
        for node_id in hidden_config:
            canvas_node = self.core.canvas_nodes.get(node_id)
            if canvas_node:
                canvas_node.hide()
            else:
                logger.warning("invalid node to hide: %s", node_id)

    def add_core_node(self, core_node: Node) -> None:
        """
        Create the canvas node for a core node and register it.

        :param core_node: core node to draw
        :return: nothing
        """
        # get canvas tab for node, non-positive canvas values fall back to 1
        canvas_id = core_node.canvas if core_node.canvas > 0 else 1
        # argument order follows the placeholders: canvas id first, then node
        logger.info("adding core node canvas(%s): %s", canvas_id, core_node.name)
        canvas = self.get(canvas_id)
        image = nutils.get_icon(core_node, self.app)
        x = core_node.position.x
        y = core_node.position.y
        node = CanvasNode(self.app, canvas, x, y, core_node, image)
        canvas.nodes[node.id] = node
        self.core.set_canvas_node(core_node, node)

    def set_throughputs(self, throughputs_event: ThroughputsEvent) -> None:
        """
        Forward interface throughputs to the edges owning those interfaces.

        Interfaces that do not map to an edge are ignored.

        :param throughputs_event: event carrying the interface throughputs
        :return: nothing
        """
        for iface_throughput in throughputs_event.iface_throughputs:
            node_id = iface_throughput.node_id
            iface_id = iface_throughput.iface_id
            throughput = iface_throughput.throughput
            iface_to_edge_id = (node_id, iface_id)
            edge = self.core.iface_to_edge.get(iface_to_edge_id)
            if edge:
                edge.set_throughput(throughput)

    def clear_throughputs(self) -> None:
        """
        Clear the throughput display of every wired edge.

        :return: nothing
        """
        for edge in self.edges.values():
            edge.clear_throughput()

    def stopped_session(self) -> None:
        """
        Remove all wireless edges and clear throughputs.

        :return: nothing
        """
        # clear wireless edges
        for edge in self.wireless_edges.values():
            edge.delete()
        self.wireless_edges.clear()
        self.clear_throughputs()

    def update_wired_edge(self, link: Link) -> None:
        """
        Copy the options of a link onto its existing edge and redraw them.

        :param link: link carrying the updated options
        :return: nothing
        """
        token = create_edge_token(link)
        edge = self.edges.get(token)
        if edge:
            # copied so the edge does not share the options object of the link
            edge.link.options = deepcopy(link.options)
            edge.draw_link_options()
            edge.check_visibility()

    def delete_wired_edge(self, link: Link) -> None:
        """
        Delete the edge matching a link, when one exists.

        :param link: link whose edge should be deleted
        :return: nothing
        """
        token = create_edge_token(link)
        edge = self.edges.get(token)
        if edge:
            edge.delete()

    def add_wired_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None:
        """
        Draw a wired link, or attach it to an existing edge as asymmetric data.

        A unidirectional link whose token already has an edge is stored as the
        asymmetric link of that edge. A link with an unknown token gets a new
        edge. Any other link for a known token is ignored.

        :param src: source canvas node
        :param dst: destination canvas node
        :param link: link to draw
        :return: nothing
        """
        token = create_edge_token(link)
        if token in self.edges and link.options.unidirectional:
            edge = self.edges[token]
            edge.asymmetric_link = link
            edge.redraw()
        elif token not in self.edges:
            edge = CanvasEdge(self.app, src, dst)
            # presumably complete() registers the edge in self.edges, it is
            # not added here -- TODO confirm
            edge.complete(dst, link)

    def _wireless_token(
        self, src: CanvasNode, dst: CanvasNode, link: Link
    ) -> Tuple[Optional[int], str]:
        """
        Derive the network id and edge token used to track a wireless link.

        :param src: source canvas node
        :param dst: destination canvas node
        :param link: wireless link
        :return: network id (None when falsy on the link) and edge token
        """
        network_id = link.network_id if link.network_id else None
        token = create_wireless_token(src.id, dst.id, network_id)
        return network_id, token

    def add_wireless_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None:
        """
        Draw and track a wireless edge, ignoring links that already have one.

        :param src: source canvas node
        :param dst: destination canvas node
        :param link: wireless link to draw
        :return: nothing
        """
        network_id, token = self._wireless_token(src, dst, link)
        if token in self.wireless_edges:
            logger.warning("ignoring link that already exists: %s", link)
            return
        edge = CanvasWirelessEdge(self.app, src, dst, network_id, token, link)
        self.wireless_edges[token] = edge

    def delete_wireless_edge(
        self, src: CanvasNode, dst: CanvasNode, link: Link
    ) -> None:
        """
        Stop tracking and delete the wireless edge of a link, when one exists.

        :param src: source canvas node
        :param dst: destination canvas node
        :param link: wireless link whose edge should be deleted
        :return: nothing
        """
        _, token = self._wireless_token(src, dst, link)
        if token not in self.wireless_edges:
            return
        edge = self.wireless_edges.pop(token)
        edge.delete()

    def update_wireless_edge(
        self, src: CanvasNode, dst: CanvasNode, link: Link
    ) -> None:
        """
        Update the middle label of a wireless edge, creating the edge if needed.

        Links without a label are ignored.

        :param src: source canvas node
        :param dst: destination canvas node
        :param link: wireless link carrying the label
        :return: nothing
        """
        if not link.label:
            return
        _, token = self._wireless_token(src, dst, link)
        if token not in self.wireless_edges:
            self.add_wireless_edge(src, dst, link)
        else:
            edge = self.wireless_edges[token]
            edge.middle_label_text(link.label)
|