more work on coretk

This commit is contained in:
Huy Pham 2019-10-18 16:42:00 -07:00
parent ec2e959bda
commit 38683cb0d0
18 changed files with 939 additions and 202 deletions

View file

@ -25,6 +25,7 @@ class Application(tk.Frame):
self.create_widgets()
self.draw_canvas()
self.start_grpc()
# self.try_make_table()
def load_images(self):
"""
@ -89,7 +90,7 @@ class Application(tk.Frame):
:return: nothing
"""
self.master.update()
self.core_grpc = CoreGrpc(self.master)
self.core_grpc = CoreGrpc(self)
self.core_grpc.set_up()
self.canvas.core_grpc = self.core_grpc
self.canvas.draw_existing_component()
@ -99,6 +100,13 @@ class Application(tk.Frame):
menu_action.on_quit()
# self.quit()
def try_make_table(self):
f = tk.Frame(self.master)
for i in range(3):
e = tk.Entry(f)
e.grid(row=0, column=1, stick="nsew")
f.pack(side=tk.TOP)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)

View file

@ -3,26 +3,37 @@ Incorporate grpc into python tkinter GUI
"""
import logging
import os
import tkinter as tk
from collections import OrderedDict
from core.api.grpc import client, core_pb2
from coretk.linkinfo import Throughput
from coretk.querysessiondrawing import SessionTable
from coretk.wirelessconnection import WirelessConnection
class CoreGrpc:
def __init__(self, master):
def __init__(self, app, sid=None):
"""
Create a CoreGrpc instance
"""
print("Create grpc instance")
self.core = client.CoreGrpcClient()
self.session_id = None
self.master = master
self.session_id = sid
self.master = app.master
# self.set_up()
self.interface_helper = None
self.throughput_draw = Throughput(app.canvas, self)
self.wireless_draw = WirelessConnection(app.canvas, self)
def log_event(self, event):
logging.info("event: %s", event)
if event.link_event is not None:
self.wireless_draw.hangle_link_event(event.link_event)
def log_throughput(self, event):
interface_throughputs = event.interface_throughputs
# bridge_throughputs = event.bridge_throughputs
self.throughput_draw.process_grpc_throughput_event(interface_throughputs)
def create_new_session(self):
"""
@ -36,29 +47,7 @@ class CoreGrpc:
# handle events session may broadcast
self.session_id = response.session_id
self.core.events(self.session_id, self.log_event)
def _enter_session(self, session_id, dialog):
"""
enter an existing session
:return:
"""
dialog.destroy()
response = self.core.get_session(session_id)
self.session_id = session_id
print("set session id: %s", session_id)
logging.info("Entering session_id %s.... Result: %s", session_id, response)
# self.master.canvas.draw_existing_component()
def _create_session(self, dialog):
"""
create a new session
:param tkinter.Toplevel dialog: save core session prompt dialog
:return: nothing
"""
dialog.destroy()
self.create_new_session()
self.core.throughputs(self.log_throughput)
def query_existing_sessions(self, sessions):
"""
@ -68,35 +57,29 @@ class CoreGrpc:
:return: nothing
"""
dialog = tk.Toplevel()
dialog.title("CORE sessions")
for session in sessions:
b = tk.Button(
dialog,
text="Session " + str(session.id),
command=lambda sid=session.id: self._enter_session(sid, dialog),
)
b.pack(side=tk.TOP)
b = tk.Button(
dialog, text="create new", command=lambda: self._create_session(dialog)
)
b.pack(side=tk.TOP)
dialog.update()
x = (
self.master.winfo_x()
+ (self.master.winfo_width() - dialog.winfo_width()) / 2
)
y = (
self.master.winfo_y()
+ (self.master.winfo_height() / 2 - dialog.winfo_height()) / 2
)
dialog.geometry(f"+{int(x)}+{int(y)}")
dialog.wait_window()
SessionTable(self, self.master)
def delete_session(self):
response = self.core.delete_session(self.session_id)
def delete_session(self, custom_sid=None):
if custom_sid is None:
sid = self.session_id
else:
sid = custom_sid
response = self.core.delete_session(sid)
logging.info("Deleted session result: %s", response)
def terminate_session(self, custom_sid=None):
if custom_sid is None:
sid = self.session_id
else:
sid = custom_sid
s = self.core.get_session(sid).session
# delete links and nodes from running session
if s.state == core_pb2.SessionState.RUNTIME:
self.set_session_state("datacollect", sid)
self.delete_links(sid)
self.delete_nodes(sid)
self.delete_session(sid)
def set_up(self):
"""
Query sessions, if there exist any, prompt whether to join one
@ -106,7 +89,7 @@ class CoreGrpc:
self.core.connect()
response = self.core.get_sessions()
logging.info("all sessions: %s", response)
# logging.info("coregrpc.py: all sessions: %s", response)
# if there are no sessions, create a new session, else join a session
sessions = response.sessions
@ -119,70 +102,87 @@ class CoreGrpc:
def get_session_state(self):
response = self.core.get_session(self.session_id)
logging.info("get session: %s", response)
# logging.info("get session: %s", response)
return response.session.state
def set_session_state(self, state):
def set_session_state(self, state, custom_session_id=None):
"""
Set session state
:param str state: session state to set
:return: nothing
"""
response = None
if custom_session_id is None:
sid = self.session_id
else:
sid = custom_session_id
if state == "configuration":
response = self.core.set_session_state(
self.session_id, core_pb2.SessionState.CONFIGURATION
sid, core_pb2.SessionState.CONFIGURATION
)
elif state == "instantiation":
response = self.core.set_session_state(
self.session_id, core_pb2.SessionState.INSTANTIATION
sid, core_pb2.SessionState.INSTANTIATION
)
elif state == "datacollect":
response = self.core.set_session_state(
self.session_id, core_pb2.SessionState.DATACOLLECT
sid, core_pb2.SessionState.DATACOLLECT
)
elif state == "shutdown":
response = self.core.set_session_state(
self.session_id, core_pb2.SessionState.SHUTDOWN
)
response = self.core.set_session_state(sid, core_pb2.SessionState.SHUTDOWN)
elif state == "runtime":
response = self.core.set_session_state(
self.session_id, core_pb2.SessionState.RUNTIME
)
response = self.core.set_session_state(sid, core_pb2.SessionState.RUNTIME)
elif state == "definition":
response = self.core.set_session_state(
self.session_id, core_pb2.SessionState.DEFINITION
sid, core_pb2.SessionState.DEFINITION
)
elif state == "none":
response = self.core.set_session_state(
self.session_id, core_pb2.SessionState.NONE
)
response = self.core.set_session_state(sid, core_pb2.SessionState.NONE)
else:
logging.error("coregrpc.py: set_session_state: INVALID STATE")
logging.info("set session state: %s", response)
def add_node(self, node_type, model, x, y, name, node_id):
logging.info("coregrpc.py ADD NODE %s", name)
position = core_pb2.Position(x=x, y=y)
node = core_pb2.Node(id=node_id, type=node_type, position=position, model=model)
response = self.core.add_node(self.session_id, node)
logging.info("created node: %s", response)
if node_type == core_pb2.NodeType.WIRELESS_LAN:
d = OrderedDict()
d["basic_range"] = "275"
d["bandwidth"] = "54000000"
d["jitter"] = "0"
d["delay"] = "20000"
d["error"] = "0"
r = self.core.set_wlan_config(self.session_id, node_id, d)
logging.debug("set wlan config %s", r)
return response.node_id
def edit_node(self, node_id, x, y):
position = core_pb2.Position(x=x, y=y)
response = self.core.edit_node(self.session_id, node_id, position)
logging.info("updated node id %s: %s", node_id, response)
# self.core.events(self.session_id, self.log_event)
def delete_nodes(self):
for node in self.core.get_session(self.session_id).session.nodes:
def delete_nodes(self, delete_session=None):
if delete_session is None:
sid = self.session_id
else:
sid = delete_session
for node in self.core.get_session(sid).session.nodes:
response = self.core.delete_node(self.session_id, node.id)
logging.info("delete node %s", response)
logging.info("delete nodes %s", response)
def delete_links(self):
for link in self.core.get_session(self.session_id).session.links:
def delete_links(self, delete_session=None):
sid = None
if delete_session is None:
sid = self.session_id
else:
sid = delete_session
for link in self.core.get_session(sid).session.links:
response = self.core.delete_link(
self.session_id,
link.node_one_id,
@ -190,7 +190,7 @@ class CoreGrpc:
link.interface_one.id,
link.interface_two.id,
)
logging.info("delete link %s", response)
logging.info("delete links %s", response)
def add_link(self, id1, id2, type1, type2, edge):
"""
@ -235,6 +235,13 @@ class CoreGrpc:
response = self.core.add_link(self.session_id, id1, id2, if1, if2)
logging.info("created link: %s", response)
# def get_session(self):
# response = self.core.get_session(self.session_id)
# nodes = response.session.nodes
# for node in nodes:
# r = self.core.get_node_links(self.session_id, node.id)
# logging.info(r)
def launch_terminal(self, node_id):
response = self.core.get_node_terminal(self.session_id, node_id)
logging.info("get terminal %s", response.terminal)
@ -249,6 +256,7 @@ class CoreGrpc:
"""
response = self.core.save_xml(self.session_id, file_path)
logging.info("coregrpc.py save xml %s", response)
self.core.events(self.session_id, self.log_event)
def open_xml(self, file_path):
"""
@ -258,7 +266,10 @@ class CoreGrpc:
:return: session id
"""
response = self.core.open_xml(file_path)
return response.session_id
self.session_id = response.session_id
# print("Sessionz")
# self.core.events(self.session_id, self.log_event)
# return response.session_id
# logging.info("coregrpc.py open_xml()", type(response))
def close(self):

View file

@ -0,0 +1,44 @@
"""
provide mapping from core to canvas
"""
import logging
class CoreToCanvasMapping:
def __init__(self):
self.core_id_to_canvas_id = {}
self.core_node_and_interface_to_canvas_edge = {}
def map_node_and_interface_to_canvas_edge(self, nid, iid, edge_token):
self.core_node_and_interface_to_canvas_edge[tuple([nid, iid])] = edge_token
def get_token_from_node_and_interface(self, nid, iid):
key = tuple([nid, iid])
if key in self.core_node_and_interface_to_canvas_edge:
return self.core_node_and_interface_to_canvas_edge[key]
else:
logging.error("invalid key")
return None
def map_core_id_to_canvas_id(self, core_nid, canvas_nid):
if core_nid not in self.core_id_to_canvas_id:
self.core_id_to_canvas_id[core_nid] = canvas_nid
else:
logging.debug("key already existed")
def get_canvas_id_from_core_id(self, core_id):
if core_id in self.core_id_to_canvas_id:
return self.core_id_to_canvas_id[core_id]
else:
logging.debug("invalid key")
return None
# def add_mapping(self, core_id, canvas_id):
# if core_id not in self.core_id_to_canvas_id:
# self.core_id_to_canvas_id[core_id] = canvas_id
# else:
# logging.error("key already mapped")
#
# def delete_mapping(self, core_id):
# result = self.core_id_to_canvas_id.pop(core_id, None)
# return result

View file

@ -1,11 +1,24 @@
import logging
import tkinter as tk
from enum import Enum
from core.api.grpc import core_pb2
from coretk.graph import GraphMode
from coretk.images import ImageEnum, Images
from coretk.tooltip import CreateToolTip
# from coretk.graph_helper import WlanConnection
class SessionStateEnum(Enum):
NONE = "none"
DEFINITION = "definition"
CONFIGURATION = "configuration"
RUNTIME = "runtime"
DATACOLLECT = "datacollect"
SHUTDOWN = "shutdown"
INSTANTIATION = "instantiation"
class CoreToolbar(object):
"""
@ -62,7 +75,7 @@ class CoreToolbar(object):
if self.marker_option_menu and self.marker_option_menu.winfo_exists():
self.marker_option_menu.destroy()
def destroy_children_widgets(self, parent):
def destroy_children_widgets(self):
"""
Destroy all children of a parent widget
@ -70,7 +83,7 @@ class CoreToolbar(object):
:return: nothing
"""
for i in parent.winfo_children():
for i in self.edit_frame.winfo_children():
if i.winfo_name() != "!frame":
i.destroy()
@ -150,14 +163,23 @@ class CoreToolbar(object):
self.canvas.mode = GraphMode.SELECT
def click_start_session_tool(self):
"""
Start session handler: redraw buttons, send node and link messages to grpc server
:return: nothing
"""
logging.debug("Click START STOP SESSION button")
self.destroy_children_widgets(self.edit_frame)
# self.destroy_children_widgets(self.edit_frame)
self.destroy_children_widgets()
self.canvas.mode = GraphMode.SELECT
# set configuration state
if self.canvas.core_grpc.get_session_state() == core_pb2.SessionState.SHUTDOWN:
self.canvas.core_grpc.set_session_state("definition")
self.canvas.core_grpc.set_session_state("configuration")
state = self.canvas.core_grpc.get_session_state()
if state == core_pb2.SessionState.SHUTDOWN:
self.canvas.core_grpc.set_session_state(SessionStateEnum.DEFINITION.value)
self.canvas.core_grpc.set_session_state(SessionStateEnum.CONFIGURATION.value)
for node in self.canvas.grpc_manager.nodes.values():
self.canvas.core_grpc.add_node(
@ -169,8 +191,9 @@ class CoreToolbar(object):
edge.id1, edge.id2, edge.type1, edge.type2, edge
)
self.canvas.core_grpc.set_session_state("instantiation")
self.canvas.core_grpc.set_session_state(SessionStateEnum.INSTANTIATION.value)
# self.canvas.core_grpc.get_session()
self.create_runtime_toolbar()
def click_link_tool(self):
@ -206,7 +229,7 @@ class CoreToolbar(object):
self.network_layer_option_menu.destroy()
main_button.configure(image=Images.get(ImageEnum.MDR.value))
self.canvas.mode = GraphMode.PICKNODE
self.canvas.draw_node_image = Images.get(ImageEnum.MD.value)
self.canvas.draw_node_image = Images.get(ImageEnum.MDR.value)
self.canvas.draw_node_name = "mdr"
def pick_prouter(self, main_button):
@ -493,6 +516,11 @@ class CoreToolbar(object):
CreateToolTip(marker_main_button, "background annotation tools")
def create_toolbar(self):
"""
Create buttons for toolbar in edit mode
:return: nothing
"""
self.create_regular_button(
self.edit_frame,
Images.get(ImageEnum.START.value),
@ -551,9 +579,16 @@ class CoreToolbar(object):
menu_button.menu.add_command(label="Edit...")
def click_stop_button(self):
"""
redraw buttons on the toolbar, send node and link messages to grpc server
:return: nothing
"""
logging.debug("Click on STOP button ")
self.destroy_children_widgets(self.edit_frame)
self.canvas.core_grpc.set_session_state("datacollect")
# self.destroy_children_widgets(self.edit_frame)
self.destroy_children_widgets()
self.canvas.core_grpc.set_session_state(SessionStateEnum.DATACOLLECT.value)
self.canvas.core_grpc.delete_links()
self.canvas.core_grpc.delete_nodes()
self.create_toolbar()

View file

@ -1,12 +1,13 @@
import enum
import logging
import math
import tkinter as tk
from core.api.grpc import core_pb2
from coretk.graph_helper import GraphHelper, WlanAntennaManager
from coretk.grpcmanagement import GrpcManager
from coretk.images import Images
from coretk.interface import Interface
from coretk.linkinfo import LinkInfo
class GraphMode(enum.Enum):
@ -31,11 +32,17 @@ class CanvasGraph(tk.Canvas):
self.nodes = {}
self.edges = {}
self.drawing_edge = None
self.setup_menus()
self.setup_bindings()
self.draw_grid()
self.core_grpc = grpc
self.grpc_manager = GrpcManager()
self.core_grpc = grpc
self.helper = GraphHelper(self)
# self.core_id_to_canvas_id = {}
# self.core_map = CoreToCanvasMapping()
# self.draw_existing_component()
def setup_menus(self):
@ -44,6 +51,31 @@ class CanvasGraph(tk.Canvas):
self.node_context.add_command(label="Two")
self.node_context.add_command(label="Three")
def canvas_reset_and_redraw(self, new_grpc):
"""
Reset the private variables CanvasGraph object, redraw nodes given the new grpc client
:param new_grpc:
:return:
"""
# delete any existing drawn items
self.delete_components()
# set the private variables to default value
self.mode = GraphMode.SELECT
self.draw_node_image = None
self.draw_node_name = None
self.selected = None
self.node_context = None
self.nodes = {}
self.edges = {}
self.drawing_edge = None
self.grpc_manager = GrpcManager()
# new grpc
self.core_grpc = new_grpc
self.draw_existing_component()
def setup_bindings(self):
"""
Bind any mouse events or hot keys to the matching action
@ -97,7 +129,9 @@ class CanvasGraph(tk.Canvas):
image, name = Images.convert_type_and_model_to_image(
node.type, node.model
)
n = CanvasNode(node.position.x, node.position.y, image, self, node.id)
n = CanvasNode(
node.position.x, node.position.y, image, name, self, node.id
)
self.nodes[n.id] = n
core_id_to_canvas_id[node.id] = n.id
self.grpc_manager.add_preexisting_node(n, session_id, node, name)
@ -107,12 +141,33 @@ class CanvasGraph(tk.Canvas):
for link in session.links:
n1 = self.nodes[core_id_to_canvas_id[link.node_one_id]]
n2 = self.nodes[core_id_to_canvas_id[link.node_two_id]]
e = CanvasEdge(n1.x_coord, n1.y_coord, n2.x_coord, n2.y_coord, n1.id, self)
if link.type == core_pb2.LinkType.WIRED:
e = CanvasEdge(
n1.x_coord,
n1.y_coord,
n2.x_coord,
n2.y_coord,
n1.id,
self,
is_wired=True,
)
elif link.type == core_pb2.LinkType.WIRELESS:
e = CanvasEdge(
n1.x_coord,
n1.y_coord,
n2.x_coord,
n2.y_coord,
n1.id,
self,
is_wired=False,
)
n1.edges.add(e)
n2.edges.add(e)
self.edges[e.token] = e
self.grpc_manager.add_edge(session_id, e.token, n1.id, n2.id)
self.helper.redraw_antenna(link, n1, n2)
# TODO add back the link info to grpc manager also redraw
grpc_if1 = link.interface_one
grpc_if2 = link.interface_two
@ -128,12 +183,11 @@ class CanvasGraph(tk.Canvas):
ip6_dst = grpc_if2.ip6
e.link_info = LinkInfo(
canvas=self,
edge_id=e.id,
edge=e,
ip4_src=ip4_src,
ip6_src=ip6_src,
ip4_dst=ip4_dst,
ip6_dst=ip6_dst,
throughput=None,
)
# TODO will include throughput and ipv6 in the future
@ -213,7 +267,6 @@ class CanvasGraph(tk.Canvas):
self.mode = GraphMode.NODE
def handle_edge_release(self, event):
print("Calling edge release")
edge = self.drawing_edge
self.drawing_edge = None
@ -253,14 +306,19 @@ class CanvasGraph(tk.Canvas):
# draw link info on the edge
if1 = self.grpc_manager.edges[edge.token].interface_1
if2 = self.grpc_manager.edges[edge.token].interface_2
ip4_and_prefix_1 = None
ip4_and_prefix_2 = None
if if1 is not None:
ip4_and_prefix_1 = if1.ip4_and_prefix
if if2 is not None:
ip4_and_prefix_2 = if2.ip4_and_prefix
edge.link_info = LinkInfo(
self,
edge.id,
ip4_src=if1.ip4_and_prefix,
edge,
ip4_src=ip4_and_prefix_1,
ip6_src=None,
ip4_dst=if2.ip4_and_prefix,
ip4_dst=ip4_and_prefix_2,
ip6_dst=None,
throughput=None,
)
logging.debug(f"edges: {self.find_withtag('edge')}")
@ -272,7 +330,6 @@ class CanvasGraph(tk.Canvas):
:param event: mouse event
:return: nothing
"""
print("click on the canvas")
logging.debug(f"click press: {event}")
selected = self.get_selected(event)
is_node = selected in self.find_withtag("node")
@ -302,7 +359,12 @@ class CanvasGraph(tk.Canvas):
def add_node(self, x, y, image, node_name):
if self.selected == 1:
node = CanvasNode(
x=x, y=y, image=image, canvas=self, core_id=self.grpc_manager.peek_id()
x=x,
y=y,
image=image,
node_type=node_name,
canvas=self,
core_id=self.grpc_manager.peek_id(),
)
self.nodes[node.id] = node
self.grpc_manager.add_node(
@ -316,9 +378,9 @@ class CanvasEdge:
Canvas edge class
"""
width = 1.3
width = 1.4
def __init__(self, x1, y1, x2, y2, src, canvas):
def __init__(self, x1, y1, x2, y2, src, canvas, is_wired=None):
"""
Create an instance of canvas edge object
:param int x1: source x-coord
@ -331,13 +393,28 @@ class CanvasEdge:
self.src = src
self.dst = None
self.canvas = canvas
self.id = self.canvas.create_line(
x1, y1, x2, y2, tags="edge", width=self.width, fill="#ff0000"
)
if is_wired is None or is_wired is True:
self.id = self.canvas.create_line(
x1, y1, x2, y2, tags="edge", width=self.width, fill="#ff0000"
)
else:
self.id = self.canvas.create_line(
x1,
y1,
x2,
y2,
tags="edge",
width=self.width,
fill="#ff0000",
state=tk.HIDDEN,
)
self.token = None
# link info object
self.link_info = None
self.throughput = None
self.wired = is_wired
# TODO resolve this
# self.canvas.tag_lower(self.id)
@ -346,6 +423,7 @@ class CanvasEdge:
self.token = tuple(sorted((self.src, self.dst)))
x1, y1, _, _ = self.canvas.coords(self.id)
self.canvas.coords(self.id, x1, y1, x, y)
self.canvas.helper.draw_wireless_case(self.src, self.dst, self)
self.canvas.lift(self.src)
self.canvas.lift(self.dst)
@ -354,8 +432,9 @@ class CanvasEdge:
class CanvasNode:
def __init__(self, x, y, image, canvas, core_id):
def __init__(self, x, y, image, node_type, canvas, core_id):
self.image = image
self.node_type = node_type
self.canvas = canvas
self.id = self.canvas.create_image(
x, y, anchor=tk.CENTER, image=self.image, tags="node"
@ -367,6 +446,8 @@ class CanvasNode:
self.text_id = self.canvas.create_text(
x, y + 20, text=self.name, tags="nodename"
)
self.antenna_draw = WlanAntennaManager(self.canvas, self.id)
self.canvas.tag_bind(self.id, "<ButtonPress-1>", self.click_press)
self.canvas.tag_bind(self.id, "<ButtonRelease-1>", self.click_release)
self.canvas.tag_bind(self.id, "<B1-Motion>", self.motion)
@ -374,6 +455,7 @@ class CanvasNode:
self.canvas.tag_bind(self.id, "<Double-Button-1>", self.double_click)
self.edges = set()
self.wlans = []
self.moving = None
def double_click(self, event):
@ -386,7 +468,6 @@ class CanvasNode:
self.x_coord, self.y_coord = self.canvas.coords(self.id)
def click_press(self, event):
print("click on the node")
logging.debug(f"click press {self.name}: {event}")
self.moving = self.canvas.canvas_xy(event)
# return "break"
@ -410,7 +491,13 @@ class CanvasNode:
old_x, old_y = self.canvas.coords(self.id)
self.canvas.move(self.id, offset_x, offset_y)
self.canvas.move(self.text_id, offset_x, offset_y)
self.antenna_draw.update_antennas_position(offset_x, offset_y)
new_x, new_y = self.canvas.coords(self.id)
if self.canvas.core_grpc.get_session_state() == core_pb2.SessionState.RUNTIME:
self.canvas.core_grpc.edit_node(self.core_id, int(new_x), int(new_y))
for edge in self.edges:
x1, y1, x2, y2 = self.canvas.coords(edge.id)
if x1 == old_x and y1 == old_y:
@ -418,63 +505,11 @@ class CanvasNode:
else:
self.canvas.coords(edge.id, x1, y1, new_x, new_y)
edge.link_info.recalculate_info()
self.canvas.core_grpc.throughput_draw.update_throughtput_location(edge)
self.canvas.helper.update_wlan_connection(
old_x, old_y, new_x, new_y, self.wlans
)
def context(self, event):
logging.debug(f"context click {self.name}: {event}")
class LinkInfo:
def __init__(self, canvas, edge_id, ip4_src, ip6_src, ip4_dst, ip6_dst, throughput):
self.canvas = canvas
self.edge_id = edge_id
self.radius = 37
self.ip4_address_1 = ip4_src
self.ip6_address_1 = ip6_src
self.ip4_address_2 = ip4_dst
self.ip6_address_2 = ip6_dst
self.throughput = throughput
self.id1 = self.create_edge_src_info()
self.id2 = self.create_edge_dst_info()
def slope_src_dst(self):
x1, y1, x2, y2 = self.canvas.coords(self.edge_id)
return (y2 - y1) / (x2 - x1)
def create_edge_src_info(self):
x1, y1, x2, _ = self.canvas.coords(self.edge_id)
m = self.slope_src_dst()
distance = math.cos(math.atan(m)) * self.radius
if x1 > x2:
distance = -distance
# id1 = self.canvas.create_text(x1, y1, text=self.ip4_address_1)
print(self.ip4_address_1)
id1 = self.canvas.create_text(
x1 + distance, y1 + distance * m, text=self.ip4_address_1, tags="linkinfo"
)
return id1
def create_edge_dst_info(self):
x1, _, x2, y2 = self.canvas.coords(self.edge_id)
m = self.slope_src_dst()
distance = math.cos(math.atan(m)) * self.radius
if x1 > x2:
distance = -distance
# id2 = self.canvas.create_text(x2, y2, text=self.ip4_address_2)
id2 = self.canvas.create_text(
x2 - distance, y2 - distance * m, text=self.ip4_address_2, tags="linkinfo"
)
return id2
def recalculate_info(self):
x1, y1, x2, y2 = self.canvas.coords(self.edge_id)
m = self.slope_src_dst()
distance = math.cos(math.atan(m)) * self.radius
if x1 > x2:
distance = -distance
new_x1 = x1 + distance
new_y1 = y1 + distance * m
new_x2 = x2 - distance
new_y2 = y2 - distance * m
self.canvas.coords(self.id1, new_x1, new_y1)
self.canvas.coords(self.id2, new_x2, new_y2)

View file

@ -0,0 +1,198 @@
"""
Some graph helper functions
"""
import logging
import tkinter as tk
from core.api.grpc import core_pb2
from coretk.images import ImageEnum, Images
class GraphHelper:
def __init__(self, canvas):
"""
create an instance of GraphHelper object
"""
self.canvas = canvas
def draw_wireless_case(self, src_id, dst_id, edge):
src_node_name = self.canvas.nodes[src_id].node_type
dst_node_name = self.canvas.nodes[dst_id].node_type
if src_node_name == "wlan" or dst_node_name == "wlan":
self.canvas.itemconfig(edge.id, state=tk.HIDDEN)
edge.wired = False
if edge.token not in self.canvas.edges:
if src_node_name == "wlan" and dst_node_name == "wlan":
self.canvas.nodes[src_id].antenna_draw.add_antenna()
elif src_node_name == "wlan":
self.canvas.nodes[dst_id].antenna_draw.add_antenna()
else:
self.canvas.nodes[src_id].antenna_draw.add_antenna()
edge.wired = True
def redraw_antenna(self, link, node_one, node_two):
if link.type == core_pb2.LinkType.WIRELESS:
if node_one.node_type == "wlan" and node_two.node_type == "wlan":
node_one.antenna_draw.add_antenna()
elif node_one.node_type == "wlan" and node_two.node_type != "wlan":
node_two.antenna_draw.add_antenna()
elif node_one.node_type != "wlan" and node_two.node_type == "wlan":
node_one.antenna_draw.add_antenna()
else:
logging.error(
"graph_helper.py WIRELESS link but both nodes are non-wireless node"
)
def update_wlan_connection(self, old_x, old_y, new_x, new_y, edge_ids):
for eid in edge_ids:
x1, y1, x2, y2 = self.canvas.coords(eid)
if x1 == old_x and y1 == old_y:
self.canvas.coords(eid, new_x, new_y, x2, y2)
else:
self.canvas.coords(eid, x1, y1, new_x, new_y)
class WlanAntennaManager:
def __init__(self, canvas, node_id):
"""
crate an instance for AntennaManager
"""
self.canvas = canvas
self.node_id = node_id
self.quantity = 0
self._max = 5
self.antennas = []
# distance between each antenna
self.offset = 0
def add_antenna(self):
"""
add an antenna to a node
:return: nothing
"""
if self.quantity < 5:
x, y = self.canvas.coords(self.node_id)
self.antennas.append(
self.canvas.create_image(
x - 16 + self.offset,
y - 16,
anchor=tk.CENTER,
image=Images.get(ImageEnum.ANTENNA.value),
tags="antenna",
)
)
self.quantity = self.quantity + 1
self.offset = self.offset + 8
def update_antennas_position(self, offset_x, offset_y):
"""
redraw antennas of a node according to the new node position
:return: nothing
"""
for i in self.antennas:
self.canvas.move(i, offset_x, offset_y)
def delete_antenna(self, canvas_id):
return
def delete_antennas(self):
"""
Delete all the antennas of a node
:return: nothing
"""
for i in self.antennas:
self.canvas.delete(i)
# class WlanConnection:
# def __init__(self, canvas, grpc):
# """
# create in
# :param canvas:
# """
# self.canvas = canvas
# self.core_grpc = grpc
# self.throughput_on = False
# self.map_node_link = {}
# self.links = []
#
# def wireless_nodes(self):
# """
# retrieve all the wireless clouds in the canvas
#
# :return: list(coretk.graph.CanvasNode)
# """
# wireless_nodes = []
# for n in self.canvas.nodes.values():
# if n.node_type == "wlan":
# wireless_nodes.append(n)
# return wireless_nodes
#
# def draw_wireless_link(self, src, dst):
# """
# draw a line between 2 nodes that are connected to the same wireless cloud
#
# :param coretk.graph.CanvasNode src: source node
# :param coretk.graph.CanvasNode dst: destination node
# :return: nothing
# """
# cid = self.canvas.create_line(src.x_coord, src.y_coord, dst.x_coord, dst.y_coord, tags="wlanconnection")
# if src.id not in self.map_node_link:
# self.map_node_link[src.id] = []
# if dst.id not in self.map_node_link:
# self.map_node_link[dst.id] = []
# self.map_node_link[src.id].append(cid)
# self.map_node_link[dst.id].append(cid)
# self.links.append(cid)
#
# def subnet_wireless_connection(self, wlan_node):
# """
# retrieve all the non-wireless nodes connected to wireless_node and create line (represent wireless connection) between each pair of nodes
# :param coretk.grpah.CanvasNode wlan_node: wireless node
#
# :return: nothing
# """
# non_wlan_nodes = []
# for e in wlan_node.edges:
# src = self.canvas.nodes[e.src]
# dst = self.canvas.nodes[e.dst]
# if src.node_type == "wlan" and dst.node_type != "wlan":
# non_wlan_nodes.append(dst)
# elif src.node_type != "wlan" and dst.node_type == "wlan":
# non_wlan_nodes.append(src)
#
# size = len(non_wlan_nodes)
# for i in range(size):
# for j in range(i+1, size):
# self.draw_wireless_link(non_wlan_nodes[i], non_wlan_nodes[j])
#
# def session_wireless_connection(self):
# """
# draw all the wireless connection in the canvas
#
# :return: nothing
# """
# wlan_nodes = self.wireless_nodes()
# for n in wlan_nodes:
# self.subnet_wireless_connection(n)
#
# def show_links(self):
# """
# show all the links
# """
# for l in self.links:
# self.canvas.itemconfig(l, state=tk.NORMAL)
#
# def hide_links(self):
# """
# hide all the links
# :return:
# """
# for l in self.links:
# self.canvas.itemconfig(l, state=tk.HIDDEN)

View file

@ -5,6 +5,7 @@ that can be useful for grpc, acts like a session class
import logging
from core.api.grpc import core_pb2
from coretk.coretocanvas import CoreToCanvasMapping
from coretk.interface import Interface, InterfaceManager
link_layer_nodes = ["switch", "hub", "wlan", "rj45", "tunnel"]
@ -61,8 +62,13 @@ class GrpcManager:
self.reusable = []
self.preexisting = []
# self.core_id_to_canvas_id = {}
self.interfaces_manager = InterfaceManager()
# map tuple(core_node_id, interface_id) to and edge
# self.node_id_and_interface_to_edge_token = {}
self.core_mapping = CoreToCanvasMapping()
def peek_id(self):
"""
Peek the next id to be used
@ -117,8 +123,11 @@ class GrpcManager:
node_model = name
else:
logging.error("grpcmanagemeny.py INVALID node name")
create_node = Node(session_id, self.get_id(), node_type, node_model, x, y, name)
nid = self.get_id()
create_node = Node(session_id, nid, node_type, node_model, x, y, name)
self.nodes[canvas_id] = create_node
self.core_mapping.map_core_id_to_canvas_id(nid, canvas_id)
# self.core_id_to_canvas_id[nid] = canvas_id
logging.debug(
"Adding node to GrpcManager.. Session id: %s, Coords: (%s, %s), Name: %s",
session_id,
@ -219,7 +228,7 @@ class GrpcManager:
dst_node = self.nodes[dst_canvas_id]
if dst_node.model in network_layer_nodes:
ifid = len(dst_node.interfaces)
name = "veth" + str(ifid)
name = "eth" + str(ifid)
dst_interface = Interface(
name=name, ifid=ifid, ipv4=str(self.interfaces_manager.get_address())
)
@ -232,6 +241,7 @@ class GrpcManager:
edge.interface_1 = src_interface
edge.interface_2 = dst_interface
return src_interface, dst_interface
def add_edge(self, session_id, token, canvas_id_1, canvas_id_2):
"""
@ -253,7 +263,37 @@ class GrpcManager:
self.nodes[canvas_id_2].type,
)
self.edges[token] = edge
self.create_interface(edge, canvas_id_1, canvas_id_2)
src_interface, dst_interface = self.create_interface(
edge, canvas_id_1, canvas_id_2
)
node_one_id = self.nodes[canvas_id_1].node_id
node_two_id = self.nodes[canvas_id_2].node_id
# provide a way to get an edge from a core node and an interface id
if src_interface is not None:
# self.node_id_and_interface_to_edge_token[tuple([node_one_id, src_interface.id])] = token
self.core_mapping.map_node_and_interface_to_canvas_edge(
node_one_id, src_interface.id, token
)
logging.debug(
"map node id %s, interface_id %s to edge token %s",
node_one_id,
src_interface.id,
token,
)
if dst_interface is not None:
# self.node_id_and_interface_to_edge_token[tuple([node_two_id, dst_interface.id])] = token
self.core_mapping.map_node_and_interface_to_canvas_edge(
node_two_id, dst_interface.id, token
)
logging.debug(
"map node id %s, interface_id %s to edge token %s",
node_two_id,
dst_interface.id,
token,
)
logging.debug("Adding edge to grpc manager...")
else:
logging.error("grpcmanagement.py INVALID CANVAS NODE ID")

Binary file not shown.

After

Width:  |  Height:  |  Size: 230 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1,006 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

View file

@ -87,6 +87,10 @@ class ImageEnum(Enum):
STOP = "stop"
OBSERVE = "observe"
RUN = "run"
DOCUMENTNEW = "document-new"
FILEOPEN = "fileopen"
EDITDELETE = "edit-delete"
ANTENNA = "antenna"
def load_core_images(images):

View file

@ -20,6 +20,11 @@ class Interface:
self.id = ifid
def random_mac_address(self):
"""
create a random MAC address for an interface
:return: nothing
"""
return "02:00:00:%02x:%02x:%02x" % (
random.randint(0, 255),
random.randint(0, 255),

134
coretk/coretk/linkinfo.py Normal file
View file

@ -0,0 +1,134 @@
"""
Link information, such as IPv4, IPv6 and throughput drawn in the canvas
"""
import math
class LinkInfo:
def __init__(self, canvas, edge, ip4_src, ip6_src, ip4_dst, ip6_dst):
self.canvas = canvas
self.edge = edge
# self.edge_id = edge.id
self.radius = 37
self.core_grpc = self.canvas.core_grpc
self.ip4_address_1 = ip4_src
self.ip6_address_1 = ip6_src
self.ip4_address_2 = ip4_dst
self.ip6_address_2 = ip6_dst
self.id1 = self.create_edge_src_info()
self.id2 = self.create_edge_dst_info()
def slope_src_dst(self):
x1, y1, x2, y2 = self.canvas.coords(self.edge.id)
if x2 - x1 == 0:
return 9999
else:
return (y2 - y1) / (x2 - x1)
def create_edge_src_info(self):
x1, y1, x2, _ = self.canvas.coords(self.edge.id)
m = self.slope_src_dst()
distance = math.cos(math.atan(m)) * self.radius
if x1 > x2:
distance = -distance
# id1 = self.canvas.create_text(x1, y1, text=self.ip4_address_1)
id1 = self.canvas.create_text(
x1 + distance, y1 + distance * m, text=self.ip4_address_1, tags="linkinfo"
)
return id1
def create_edge_dst_info(self):
x1, _, x2, y2 = self.canvas.coords(self.edge.id)
m = self.slope_src_dst()
distance = math.cos(math.atan(m)) * self.radius
if x1 > x2:
distance = -distance
# id2 = self.canvas.create_text(x2, y2, text=self.ip4_address_2)
id2 = self.canvas.create_text(
x2 - distance, y2 - distance * m, text=self.ip4_address_2, tags="linkinfo"
)
return id2
def recalculate_info(self):
x1, y1, x2, y2 = self.canvas.coords(self.edge.id)
m = self.slope_src_dst()
distance = math.cos(math.atan(m)) * self.radius
if x1 > x2:
distance = -distance
new_x1 = x1 + distance
new_y1 = y1 + distance * m
new_x2 = x2 - distance
new_y2 = y2 - distance * m
self.canvas.coords(self.id1, new_x1, new_y1)
self.canvas.coords(self.id2, new_x2, new_y2)
# def link_througput(self):
# x1, y1, x2, y2 = self.canvas.coords(self.edge.id)
# x = (x1 + x2) / 2
# y = (y1 + y2) / 2
# tid = self.canvas.create_text(x, y, text="place text here")
# return tid
class Throughput:
def __init__(self, canvas, grpc):
self.canvas = canvas
self.core_grpc = grpc
self.grpc_manager = canvas.grpc_manager
# edge canvas id mapped to throughput value
self.tracker = {}
# map an edge canvas id to a throughput canvas id
self.map = {}
def load_throughput_info(self, interface_throughputs):
"""
load all interface throughouts from an event
:param repeated core_bp2.InterfaceThroughputinterface_throughputs: interface throughputs
:return: nothing
"""
for t in interface_throughputs:
nid = t.node_id
iid = t.interface_id
tp = t.throughput
# token = self.grpc_manager.node_id_and_interface_to_edge_token[nid, iid]
token = self.grpc_manager.core_mapping.get_token_from_node_and_interface(
nid, iid
)
edge_id = self.canvas.edges[token].id
if edge_id not in self.tracker:
self.tracker[edge_id] = tp
else:
temp = self.tracker[edge_id]
self.tracker[edge_id] = (temp + tp) / 2
def draw_throughputs(self):
for edge_id in self.tracker:
x1, y1, x2, y2 = self.canvas.coords(edge_id)
x = (x1 + x2) / 2
y = (y1 + y2) / 2
if edge_id not in self.map:
tp_id = self.canvas.create_text(
x, y, text="{0:.3f} kbps".format(0.001 * self.tracker[edge_id])
)
self.map[edge_id] = tp_id
else:
self.canvas.itemconfig(
self.map[edge_id],
text="{0:.3f} kbps".format(0.001 * self.tracker[edge_id]),
)
def process_grpc_throughput_event(self, interface_throughputs):
self.load_throughput_info(interface_throughputs)
self.draw_throughputs()
def update_throughtput_location(self, edge):
tp_id = self.map[edge.id]
x1, y1 = self.canvas.coords(edge.src)
x2, y2 = self.canvas.coords(edge.dst)
x = (x1 + x2) / 2
y = (y1 + y2) / 2
self.canvas.coords(tp_id, x, y)

View file

@ -3,12 +3,10 @@ The actions taken when each menubar option is clicked
"""
import logging
import tkinter as tk
import webbrowser
from tkinter import messagebox
from tkinter import filedialog, messagebox
from core.api.grpc import core_pb2
from coretk.coregrpc import CoreGrpc
SAVEDIR = "/home/ncs/Desktop/"
@ -317,14 +315,6 @@ def session_options():
logging.debug("Click session options")
# def help_core_github():
# webbrowser.open_new("https://github.com/coreemu/core")
#
#
# def help_core_documentation():
# webbrowser.open_new("http://coreemu.github.io/core/")
def help_about():
logging.debug("Click help About")
@ -367,7 +357,8 @@ class MenuAction:
grpc.delete_links()
grpc.delete_nodes()
grpc.delete_session()
# else:
# grpc.set_session_state("definition")
grpc.core.close()
# self.application.quit()
@ -384,35 +375,43 @@ class MenuAction:
def file_save_as_xml(self):
logging.info("menuaction.py file_save_as_xml()")
grpc = self.application.core_grpc
file_path = tk.filedialog.asksaveasfilename(
file_path = filedialog.asksaveasfilename(
initialdir=SAVEDIR,
title="Save As",
filetypes=(("EmulationScript XML files", "*.xml"), ("All files", "*")),
defaultextension=".xml",
)
with open("prev_saved_xml.txt", "a") as file:
file.write(file_path + "\n")
# with open("prev_saved_xml.txt", "a") as file:
# file.write(file_path + "\n")
grpc.save_xml(file_path)
def file_open_xml(self):
logging.info("menuaction.py file_open_xml()")
# grpc = self.application.core_grpc
file_path = tk.filedialog.askopenfilename(
file_path = filedialog.askopenfilename(
initialdir=SAVEDIR,
title="Open",
filetypes=(("EmulationScript XML File", "*.xml"), ("All Files", "*")),
)
# clean up before openning a new session
# t0 = time.clock()
# clean up before opening a new session
self.clean_nodes_links_and_set_configuarations()
grpc = CoreGrpc(self.application.master)
grpc.core.connect()
session_id = grpc.open_xml(file_path)
grpc.session_id = session_id
self.application.core_grpc = grpc
self.application.canvas.core_grpc = grpc
self.application.canvas.delete_components()
self.application.canvas.draw_existing_component()
# grpc = CoreGrpc(self.application.master)
# grpc.core.connect()
core_grpc = self.application.core_grpc
core_grpc.core.connect()
# session_id = core_grpc.open_xml(file_path)
# core_grpc.session_id = session_id
core_grpc.open_xml(file_path)
# print("Print session state")
# print(grpc.get_session_state())
self.application.canvas.canvas_reset_and_redraw(core_grpc)
# Todo might not need
self.application.core_grpc = core_grpc
self.application.core_editbar.destroy_children_widgets()
self.application.core_editbar.create_runtime_toolbar()
# self.application.canvas.draw_existing_component()
# t1 = time.clock()
# print(t1 - t0)

View file

@ -1,5 +1,2 @@
/home/ncs/Desktop/random.xml/home/ncs/Desktop/untitled.xml/home/ncs/Desktop/test.xml
/home/ncs/Desktop/test1.xml
/home/ncs/Desktop/untitled.xml
/home/ncs/Desktop/test1.xml
/home/ncs/Desktop/runningtest.xml
/home/ncs/Desktop/notrunning.xml

View file

@ -0,0 +1,174 @@
import logging
import tkinter as tk
from tkinter.ttk import Scrollbar, Treeview
from coretk.images import ImageEnum, Images
class SessionTable:
def __init__(self, grpc, master):
self.grpc = grpc
self.selected = False
self.selected_sid = None
self.master = master
self.top = tk.Toplevel(self.master)
self.description_definition()
self.top.title("CORE sessions")
self.tree = Treeview(self.top)
# self.tree.pack(side=tk.TOP)
self.tree.grid(row=1, column=0, columnspan=2)
self.draw_scrollbar()
self.draw()
def description_definition(self):
lable = tk.Label(
self.top,
text="Below is a list of active CORE sessions. Double-click to "
"\nconnect to an existing session. Usually, only sessions in "
"\nthe RUNTIME state persist in the daemon, except for the "
"\none you might be concurrently editting.",
)
lable.grid(sticky=tk.W)
def column_definition(self):
# self.tree["columns"] = ("name", "nodecount", "filename", "date")
self.tree["columns"] = "nodecount"
self.tree.column("#0", width=300, minwidth=30)
# self.tree.column("name", width=72, miwidth=30)
self.tree.column("nodecount", width=300, minwidth=30)
# self.tree.column("filename", width=92, minwidth=30)
# self.tree.column("date", width=170, minwidth=30)
def draw_scrollbar(self):
yscrollbar = Scrollbar(self.top, orient="vertical", command=self.tree.yview)
yscrollbar.grid(row=1, column=3, sticky=tk.N + tk.S + tk.W)
self.tree.configure(yscrollcommand=yscrollbar.set)
xscrollbar = Scrollbar(self.top, orient="horizontal", command=self.tree.xview)
xscrollbar.grid(row=2, columnspan=2, sticky=tk.E + tk.W + tk.S)
self.tree.configure(xscrollcommand=xscrollbar.set)
def heading_definition(self):
self.tree.heading("#0", text="ID", anchor=tk.W)
# self.tree.heading("name", text="Name", anchor=tk.CENTER)
self.tree.heading("nodecount", text="Node Count", anchor=tk.W)
# self.tree.heading("filename", text="Filename", anchor=tk.CENTER)
# self.tree.heading("date", text="Date", anchor=tk.CENTER)
def enter_session(self, sid):
self.top.destroy()
response = self.grpc.core.get_session(sid)
self.grpc.session_id = sid
self.grpc.core.events(sid, self.grpc.log_event)
logging.info("Entering session_id %s.... Result: %s", sid, response)
def new_session(self):
self.top.destroy()
self.grpc.create_new_session()
def on_selected(self, event):
item = self.tree.selection()
sid = int(self.tree.item(item, "text"))
self.enter_session(sid)
def click_select(self, event):
# logging.debug("Click on %s ", event)
item = self.tree.selection()
sid = int(self.tree.item(item, "text"))
self.selected = True
self.selected_sid = sid
def session_definition(self):
response = self.grpc.core.get_sessions()
# logging.info("querysessiondrawing.py Get all sessions %s", response)
index = 1
for session in response.sessions:
self.tree.insert(
"", index, None, text=str(session.id), values=(str(session.nodes))
)
index = index + 1
self.tree.bind("<Double-1>", self.on_selected)
self.tree.bind("<<TreeviewSelect>>", self.click_select)
def click_connect(self):
"""
if no session is selected yet, create a new one else join that session
:return: nothing
"""
if self.selected and self.selected_sid is not None:
self.enter_session(self.selected_sid)
elif not self.selected and self.selected_sid is None:
self.new_session()
else:
logging.error("querysessiondrawing.py invalid state")
def shutdown_session(self, sid):
self.grpc.terminate_session(sid)
self.new_session()
self.top.destroy()
def click_shutdown(self):
"""
if no session is currently selected create a new session else shut the selected session down
:return: nothing
"""
if self.selected and self.selected_sid is not None:
self.shutdown_session(self.selected_sid)
elif not self.selected and self.selected_sid is None:
self.new_session()
else:
logging.error("querysessiondrawing.py invalid state")
# if self.selected and self.selected_sid is not None:
def draw_buttons(self):
f = tk.Frame(self.top)
f.grid(row=3, sticky=tk.W)
b = tk.Button(
f,
image=Images.get(ImageEnum.DOCUMENTNEW.value),
text="New",
compound=tk.LEFT,
command=self.new_session,
)
b.pack(side=tk.LEFT, padx=3, pady=4)
b = tk.Button(
f,
image=Images.get(ImageEnum.FILEOPEN.value),
text="Connect",
compound=tk.LEFT,
command=self.click_connect,
)
b.pack(side=tk.LEFT, padx=3, pady=4)
b = tk.Button(
f,
image=Images.get(ImageEnum.EDITDELETE.value),
text="Shutdown",
compound=tk.LEFT,
command=self.click_shutdown,
)
b.pack(side=tk.LEFT, padx=3, pady=4)
b = tk.Button(f, text="Cancel", command=self.new_session)
b.pack(side=tk.LEFT, padx=3, pady=4)
def center(self):
window_width = self.master.winfo_width()
window_height = self.master.winfo_height()
self.top.update()
top_level_width = self.top.winfo_width()
top_level_height = self.top.winfo_height()
x = window_width / 2 - top_level_width / 2
y = window_height / 2 - top_level_height / 2
self.top.geometry("+%d+%d" % (x, y))
def draw(self):
self.column_definition()
self.heading_definition()
self.session_definition()
self.draw_buttons()
self.center()
self.top.wait_window()

View file

@ -0,0 +1,53 @@
"""
Wireless connection handler
"""
from core.api.grpc import core_pb2
class WirelessConnection:
def __init__(self, canvas, grpc):
self.canvas = canvas
self.core_grpc = grpc
self.core_mapping = canvas.grpc_manager.core_mapping
# map a (node_one_id, node_two_id) to a wlan canvas id
self.map = {}
def add_wlan_connection(self, node_one_id, node_two_id):
canvas_id_one = self.core_mapping.get_canvas_id_from_core_id(node_one_id)
canvas_id_two = self.core_mapping.get_canvas_id_from_core_id(node_two_id)
key = tuple(sorted((node_one_id, node_two_id)))
if key not in self.map:
x1, y1 = self.canvas.coords(canvas_id_one)
x2, y2 = self.canvas.coords(canvas_id_two)
wlan_canvas_id = self.canvas.create_line(
x1, y1, x2, y2, fill="#009933", tags="wlan", width=1.5
)
self.map[key] = wlan_canvas_id
self.canvas.nodes[canvas_id_one].wlans.append(wlan_canvas_id)
self.canvas.nodes[canvas_id_two].wlans.append(wlan_canvas_id)
def delete_wlan_connection(self, node_one_id, node_two_id):
canvas_id_one = self.core_mapping.get_canvas_id_from_core_id(node_one_id)
canvas_id_two = self.core_mapping.get_canvas_id_from_core_id(node_two_id)
key = tuple(sorted((node_one_id, node_two_id)))
wlan_canvas_id = self.map[key]
self.canvas.nodes[canvas_id_one].wlans.remove(wlan_canvas_id)
self.canvas.nodes[canvas_id_two].wlans.remove(wlan_canvas_id)
self.canvas.delete(wlan_canvas_id)
self.map.pop(key, None)
def hangle_link_event(self, link_event):
if link_event.message_type == core_pb2.MessageType.ADD:
self.add_wlan_connection(
link_event.link.node_one_id, link_event.link.node_two_id
)
if link_event.message_type == core_pb2.MessageType.DELETE:
self.delete_wlan_connection(
link_event.link.node_one_id, link_event.link.node_two_id
)