Merge pull request #343 from coreemu/coregui-typehint

Coregui typehint
This commit is contained in:
bharnden 2020-01-15 13:39:03 -08:00 committed by GitHub
commit 7e50dbdc65
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
49 changed files with 1204 additions and 530 deletions

View file

@ -17,7 +17,7 @@ HEIGHT = 800
class Application(tk.Frame):
def __init__(self, proxy):
def __init__(self, proxy: bool):
super().__init__(master=None)
# load node icons
NodeUtils.setup()

View file

@ -5,6 +5,7 @@ import json
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List
import grpc
@ -14,11 +15,16 @@ from core.gui.dialogs.mobilityplayer import MobilityPlayer
from core.gui.dialogs.sessions import SessionsDialog
from core.gui.errors import show_grpc_error
from core.gui.graph import tags
from core.gui.graph.edges import CanvasEdge
from core.gui.graph.node import CanvasNode
from core.gui.graph.shape import AnnotationData, Shape
from core.gui.graph.shapeutils import ShapeType
from core.gui.interface import InterfaceManager
from core.gui.nodeutils import NodeDraw, NodeUtils
if TYPE_CHECKING:
from core.gui.app import Application
GUI_SOURCE = "gui"
OBSERVERS = {
"processes": "ps",
@ -34,20 +40,20 @@ OBSERVERS = {
class CoreServer:
def __init__(self, name, address, port):
def __init__(self, name: str, address: str, port: int):
self.name = name
self.address = address
self.port = port
class Observer:
def __init__(self, name, cmd):
def __init__(self, name: str, cmd: str):
self.name = name
self.cmd = cmd
class CoreClient:
def __init__(self, app, proxy):
def __init__(self, app: "Application", proxy: bool):
"""
Create a CoreGrpc instance
"""
@ -110,7 +116,7 @@ class CoreClient:
self.handling_events.cancel()
self.handling_events = None
def set_observer(self, value):
def set_observer(self, value: str):
self.observer = value
def read_config(self):
@ -132,7 +138,7 @@ class CoreClient:
observer = Observer(config["name"], config["cmd"])
self.custom_observers[observer.name] = observer
def handle_events(self, event):
def handle_events(self, event: core_pb2.Event):
if event.session_id != self.session_id:
logging.warning(
"ignoring event session(%s) current(%s)",
@ -170,7 +176,7 @@ class CoreClient:
else:
logging.info("unhandled event: %s", event)
def handle_link_event(self, event):
def handle_link_event(self, event: core_pb2.LinkEvent):
node_one_id = event.link.node_one_id
node_two_id = event.link.node_two_id
canvas_node_one = self.canvas_nodes[node_one_id]
@ -183,7 +189,7 @@ class CoreClient:
else:
logging.warning("unknown link event: %s", event.message_type)
def handle_node_event(self, event):
def handle_node_event(self, event: core_pb2.NodeEvent):
if event.source == GUI_SOURCE:
return
node_id = event.node.id
@ -201,7 +207,7 @@ class CoreClient:
self.handling_throughputs.cancel()
self.handling_throughputs = None
def handle_throughputs(self, event):
def handle_throughputs(self, event: core_pb2.ThroughputsEvent):
if event.session_id != self.session_id:
logging.warning(
"ignoring throughput event session(%s) current(%s)",
@ -212,11 +218,11 @@ class CoreClient:
logging.info("handling throughputs event: %s", event)
self.app.canvas.set_throughputs(event)
def handle_exception_event(self, event):
def handle_exception_event(self, event: core_pb2.ExceptionEvent):
logging.info("exception event: %s", event)
self.app.statusbar.core_alarms.append(event)
def join_session(self, session_id, query_location=True):
def join_session(self, session_id: int, query_location: bool = True):
# update session and title
self.session_id = session_id
self.master.title(f"CORE Session({self.session_id})")
@ -297,10 +303,10 @@ class CoreClient:
# update ui to represent current state
self.app.after(0, self.app.joined_session_update)
def is_runtime(self):
def is_runtime(self) -> bool:
return self.state == core_pb2.SessionState.RUNTIME
def parse_metadata(self, config):
def parse_metadata(self, config: Dict[str, str]):
# canvas setting
canvas_config = config.get("canvas")
logging.info("canvas metadata: %s", canvas_config)
@ -364,8 +370,6 @@ class CoreClient:
def create_new_session(self):
"""
Create a new session
:return: nothing
"""
try:
response = self.client.create_session()
@ -384,7 +388,7 @@ class CoreClient:
except grpc.RpcError as e:
self.app.after(0, show_grpc_error, e)
def delete_session(self, session_id=None):
def delete_session(self, session_id: int = None):
if session_id is None:
session_id = self.session_id
try:
@ -396,8 +400,6 @@ class CoreClient:
def set_up(self):
"""
Query sessions, if there exist any, prompt whether to join one
:return: existing sessions
"""
try:
self.client.connect()
@ -426,7 +428,7 @@ class CoreClient:
self.app.after(0, show_grpc_error, e)
self.app.close()
def edit_node(self, core_node):
def edit_node(self, core_node: core_pb2.Node):
try:
self.client.edit_node(
self.session_id, core_node.id, core_node.position, source=GUI_SOURCE
@ -434,7 +436,7 @@ class CoreClient:
except grpc.RpcError as e:
self.app.after(0, show_grpc_error, e)
def start_session(self):
def start_session(self) -> core_pb2.StartSessionResponse:
nodes = [x.core_node for x in self.canvas_nodes.values()]
links = [x.link for x in self.links.values()]
wlan_configs = self.get_wlan_configs_proto()
@ -477,7 +479,7 @@ class CoreClient:
self.app.after(0, show_grpc_error, e)
return response
def stop_session(self, session_id=None):
def stop_session(self, session_id: int = None) -> core_pb2.StartSessionResponse:
if not session_id:
session_id = self.session_id
response = core_pb2.StopSessionResponse(result=False)
@ -519,7 +521,7 @@ class CoreClient:
response = self.client.set_session_metadata(self.session_id, metadata)
logging.info("set session metadata: %s", response)
def launch_terminal(self, node_id):
def launch_terminal(self, node_id: int):
try:
terminal = self.app.guiconfig["preferences"]["terminal"]
response = self.client.get_node_terminal(self.session_id, node_id)
@ -528,12 +530,9 @@ class CoreClient:
except grpc.RpcError as e:
self.app.after(0, show_grpc_error, e)
def save_xml(self, file_path):
def save_xml(self, file_path: str):
"""
Save core session as to an xml file
:param str file_path: file path that user pick
:return: nothing
"""
try:
if self.state != core_pb2.SessionState.RUNTIME:
@ -546,12 +545,9 @@ class CoreClient:
except grpc.RpcError as e:
self.app.after(0, show_grpc_error, e)
def open_xml(self, file_path):
def open_xml(self, file_path: str):
"""
Open core xml
:param str file_path: file to open
:return: session id
"""
try:
response = self.client.open_xml(file_path)
@ -560,12 +556,21 @@ class CoreClient:
except grpc.RpcError as e:
self.app.after(0, show_grpc_error, e)
def get_node_service(self, node_id, service_name):
def get_node_service(
self, node_id: int, service_name: str
) -> core_pb2.NodeServiceData:
response = self.client.get_node_service(self.session_id, node_id, service_name)
logging.debug("get node service %s", response)
return response.service
def set_node_service(self, node_id, service_name, startups, validations, shutdowns):
def set_node_service(
self,
node_id: int,
service_name: str,
startups: List[str],
validations: List[str],
shutdowns: List[str],
) -> core_pb2.NodeServiceData:
response = self.client.set_node_service(
self.session_id, node_id, service_name, startups, validations, shutdowns
)
@ -574,14 +579,18 @@ class CoreClient:
logging.debug("get node service : %s", response)
return response.service
def get_node_service_file(self, node_id, service_name, file_name):
def get_node_service_file(
self, node_id: int, service_name: str, file_name: str
) -> str:
response = self.client.get_node_service_file(
self.session_id, node_id, service_name, file_name
)
logging.debug("get service file %s", response)
return response.data
def set_node_service_file(self, node_id, service_name, file_name, data):
def set_node_service_file(
self, node_id: int, service_name: str, file_name: str, data: bytes
):
response = self.client.set_node_service_file(
self.session_id, node_id, service_name, file_name, data
)
@ -590,8 +599,6 @@ class CoreClient:
def create_nodes_and_links(self):
"""
create nodes and links that have not been created yet
:return: nothing
"""
node_protos = [x.core_node for x in self.canvas_nodes.values()]
link_protos = [x.link for x in self.links.values()]
@ -618,8 +625,6 @@ class CoreClient:
def send_data(self):
"""
send to daemon all session info, but don't start the session
:return: nothing
"""
self.create_nodes_and_links()
for config_proto in self.get_wlan_configs_proto():
@ -664,18 +669,13 @@ class CoreClient:
def close(self):
"""
Clean ups when done using grpc
:return: nothing
"""
logging.debug("close grpc")
self.client.close()
def next_node_id(self):
def next_node_id(self) -> int:
"""
Get the next usable node id.
:return: the next id to be used
:rtype: int
"""
i = 1
while True:
@ -684,15 +684,11 @@ class CoreClient:
i += 1
return i
def create_node(self, x, y, node_type, model):
def create_node(
self, x: int, y: int, node_type: core_pb2.NodeType, model: str
) -> core_pb2.Node:
"""
Add node, with information filled in, to grpc manager
:param int x: x coord
:param int y: y coord
:param core_pb2.NodeType node_type: node type
:param str model: node model
:return: nothing
"""
node_id = self.next_node_id()
position = core_pb2.Position(x=x, y=y)
@ -727,13 +723,10 @@ class CoreClient:
)
return node
def delete_graph_nodes(self, canvas_nodes):
def delete_graph_nodes(self, canvas_nodes: List[core_pb2.Node]):
"""
remove the nodes selected by the user and anything related to that node
such as link, configurations, interfaces
:param list canvas_nodes: list of nodes to delete
:return: nothing
"""
edges = set()
for canvas_node in canvas_nodes:
@ -755,12 +748,9 @@ class CoreClient:
if edge in edges:
continue
edges.add(edge)
#
# if edge.token not in self.links:
# logging.error("unknown edge: %s", edge.token)
self.links.pop(edge.token, None)
def create_interface(self, canvas_node):
def create_interface(self, canvas_node: CanvasNode) -> core_pb2.Interface:
node = canvas_node.core_node
ip4, ip6, prefix = self.interfaces_manager.get_ips(node.id)
interface_id = len(canvas_node.interfaces)
@ -777,16 +767,12 @@ class CoreClient:
)
return interface
def create_link(self, edge, canvas_src_node, canvas_dst_node):
def create_link(
self, edge: CanvasEdge, canvas_src_node: CanvasNode, canvas_dst_node: CanvasNode
):
"""
Create core link for a pair of canvas nodes, with token referencing
the canvas edge.
:param edge: edge for link
:param canvas_src_node: canvas node one
:param canvas_dst_node: canvas node two
:return: nothing
"""
src_node = canvas_src_node.core_node
dst_node = canvas_dst_node.core_node
@ -816,7 +802,7 @@ class CoreClient:
edge.set_link(link)
self.links[edge.token] = edge
def get_wlan_configs_proto(self):
def get_wlan_configs_proto(self) -> List[core_pb2.WlanConfig]:
configs = []
for node_id, config in self.wlan_configs.items():
config = {x: config[x].value for x in config}
@ -824,7 +810,7 @@ class CoreClient:
configs.append(wlan_config)
return configs
def get_mobility_configs_proto(self):
def get_mobility_configs_proto(self) -> List[core_pb2.MobilityConfig]:
configs = []
for node_id, config in self.mobility_configs.items():
config = {x: config[x].value for x in config}
@ -832,7 +818,7 @@ class CoreClient:
configs.append(mobility_config)
return configs
def get_emane_model_configs_proto(self):
def get_emane_model_configs_proto(self) -> List[core_pb2.EmaneModelConfig]:
configs = []
for key, config in self.emane_model_configs.items():
node_id, model, interface = key
@ -845,7 +831,7 @@ class CoreClient:
configs.append(config_proto)
return configs
def get_service_configs_proto(self):
def get_service_configs_proto(self) -> List[core_pb2.ServiceConfig]:
configs = []
for node_id, services in self.service_configs.items():
for name, config in services.items():
@ -859,7 +845,7 @@ class CoreClient:
configs.append(config_proto)
return configs
def get_service_file_configs_proto(self):
def get_service_file_configs_proto(self) -> List[core_pb2.ServiceFileConfig]:
configs = []
for (node_id, file_configs) in self.file_configs.items():
for service, file_config in file_configs.items():
@ -870,25 +856,27 @@ class CoreClient:
configs.append(config_proto)
return configs
def run(self, node_id):
def run(self, node_id: int) -> str:
logging.info("running node(%s) cmd: %s", node_id, self.observer)
return self.client.node_command(self.session_id, node_id, self.observer).output
def get_wlan_config(self, node_id):
def get_wlan_config(self, node_id: int) -> Dict[str, core_pb2.ConfigOption]:
config = self.wlan_configs.get(node_id)
if not config:
response = self.client.get_wlan_config(self.session_id, node_id)
config = response.config
return config
def get_mobility_config(self, node_id):
def get_mobility_config(self, node_id: int) -> Dict[str, core_pb2.ConfigOption]:
config = self.mobility_configs.get(node_id)
if not config:
response = self.client.get_mobility_config(self.session_id, node_id)
config = response.config
return config
def get_emane_model_config(self, node_id, model, interface=None):
def get_emane_model_config(
self, node_id: int, model: str, interface: int = None
) -> Dict[str, core_pb2.ConfigOption]:
logging.info("getting emane model config: %s %s %s", node_id, model, interface)
config = self.emane_model_configs.get((node_id, model, interface))
if not config:
@ -900,15 +888,21 @@ class CoreClient:
config = response.config
return config
def set_emane_model_config(self, node_id, model, config, interface=None):
def set_emane_model_config(
self,
node_id: int,
model: str,
config: Dict[str, core_pb2.ConfigOption],
interface: int = None,
):
logging.info("setting emane model config: %s %s %s", node_id, model, interface)
self.emane_model_configs[(node_id, model, interface)] = config
def copy_node_service(self, _from, _to):
def copy_node_service(self, _from: int, _to: int):
services = self.canvas_nodes[_from].core_node.services
self.canvas_nodes[_to].core_node.services[:] = services
def copy_node_config(self, _from, _to):
def copy_node_config(self, _from: int, _to: int):
node_type = self.canvas_nodes[_from].core_node.type
if node_type == core_pb2.NodeType.DEFAULT:
services = self.canvas_nodes[_from].core_node.services

View file

@ -1,9 +1,13 @@
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.dialogs.dialog import Dialog
from core.gui.widgets import CodeText
if TYPE_CHECKING:
from core.gui.app import Application
LICENSE = """\
Copyright (c) 2005-2020, the Boeing Company.
@ -31,7 +35,7 @@ THE POSSIBILITY OF SUCH DAMAGE.\
class AboutDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "About CORE", modal=True)
self.draw()

View file

@ -3,15 +3,19 @@ check engine light
"""
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.api.grpc.core_pb2 import ExceptionLevel
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import PADX, PADY
from core.gui.widgets import CodeText
if TYPE_CHECKING:
from core.gui.app import Application
class AlertsDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "Alerts", modal=True)
self.app = app
self.tree = None
@ -110,7 +114,7 @@ class AlertsDialog(Dialog):
dialog = DaemonLog(self, self.app)
dialog.show()
def click_select(self, event):
def click_select(self, event: tk.Event):
current = self.tree.selection()[0]
alarm = self.alarm_map[current]
self.codetext.text.config(state=tk.NORMAL)
@ -120,7 +124,7 @@ class AlertsDialog(Dialog):
class DaemonLog(Dialog):
def __init__(self, master, app):
def __init__(self, master: tk.Widget, app: "Application"):
super().__init__(master, app, "core-daemon log", modal=True)
self.columnconfigure(0, weight=1)
self.path = tk.StringVar(value="/var/log/core-daemon.log")

View file

@ -3,19 +3,21 @@ size and scale
"""
import tkinter as tk
from tkinter import font, ttk
from typing import TYPE_CHECKING
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import FRAME_PAD, PADX, PADY
if TYPE_CHECKING:
from core.gui.app import Application
PIXEL_SCALE = 100
class SizeAndScaleDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
"""
create an instance for size and scale object
:param app: main application
"""
super().__init__(master, app, "Canvas Size and Scale", modal=True)
self.canvas = self.app.canvas

View file

@ -4,6 +4,7 @@ set wallpaper
import logging
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.appconfig import BACKGROUNDS_PATH
from core.gui.dialogs.dialog import Dialog
@ -11,13 +12,14 @@ from core.gui.images import Images
from core.gui.themes import PADX, PADY
from core.gui.widgets import image_chooser
if TYPE_CHECKING:
from core.gui.app import Application
class CanvasWallpaperDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
"""
create an instance of CanvasWallpaper object
:param coretk.app.Application app: root application
"""
super().__init__(master, app, "Canvas Background", modal=True)
self.canvas = self.app.canvas
@ -140,8 +142,6 @@ class CanvasWallpaperDialog(Dialog):
def click_clear(self):
"""
delete like shown in image link entry if there is any
:return: nothing
"""
# delete entry
self.filename.set("")

View file

@ -4,12 +4,16 @@ custom color picker
import logging
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Any
from core.gui.dialogs.dialog import Dialog
if TYPE_CHECKING:
from core.gui.app import Application
class ColorPickerDialog(Dialog):
def __init__(self, master, app, initcolor="#000000"):
def __init__(self, master: Any, app: "Application", initcolor: str = "#000000"):
super().__init__(master, app, "color picker", modal=True)
self.red_entry = None
self.blue_entry = None
@ -31,7 +35,7 @@ class ColorPickerDialog(Dialog):
self.draw()
self.set_bindings()
def askcolor(self):
def askcolor(self) -> str:
self.show()
return self.color
@ -175,19 +179,16 @@ class ColorPickerDialog(Dialog):
self.color = self.hex.get()
self.destroy()
def get_hex(self):
def get_hex(self) -> str:
"""
convert current RGB values into hex color
:rtype: str
:return: hex color
"""
red = self.red_entry.get()
blue = self.blue_entry.get()
green = self.green_entry.get()
return "#%02x%02x%02x" % (int(red), int(green), int(blue))
def current_focus(self, focus):
def current_focus(self, focus: str):
self.focus = focus
def update_color(self, arg1=None, arg2=None, arg3=None):
@ -210,35 +211,31 @@ class ColorPickerDialog(Dialog):
self.set_entry(red, green, blue)
self.set_scale(red, green, blue)
self.display.config(background=hex_code)
self.set_label(red, green, blue)
self.set_label(str(red), str(green), str(blue))
def scale_callback(self, var, color_var):
def scale_callback(self, var: tk.IntVar, color_var: tk.IntVar):
color_var.set(var.get())
self.focus = "rgb"
self.update_color()
def set_scale(self, red, green, blue):
def set_scale(self, red: int, green: int, blue: int):
self.red_scale.set(red)
self.green_scale.set(green)
self.blue_scale.set(blue)
def set_entry(self, red, green, blue):
def set_entry(self, red: int, green: int, blue: int):
self.red.set(red)
self.green.set(green)
self.blue.set(blue)
def set_label(self, red, green, blue):
def set_label(self, red: str, green: str, blue: str):
self.red_label.configure(background="#%02x%02x%02x" % (int(red), 0, 0))
self.green_label.configure(background="#%02x%02x%02x" % (0, int(green), 0))
self.blue_label.configure(background="#%02x%02x%02x" % (0, 0, int(blue)))
def get_rgb(self, hex_code):
def get_rgb(self, hex_code: str) -> [int, int, int]:
"""
convert a valid hex code to RGB values
:param string hex_code: color in hex
:rtype: tuple(int, int, int)
:return: the RGB values
"""
if len(hex_code) == 4:
red = hex_code[1]

View file

@ -5,14 +5,18 @@ copy service config dialog
import logging
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Any, Tuple
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import FRAME_PAD, PADX
from core.gui.widgets import CodeText
if TYPE_CHECKING:
from core.gui.app import Application
class CopyServiceConfigDialog(Dialog):
def __init__(self, master, app, node_id):
def __init__(self, master: Any, app: "Application", node_id: int):
super().__init__(master, app, f"Copy services to node {node_id}", modal=True)
self.parent = master
self.app = app
@ -128,6 +132,7 @@ class CopyServiceConfigDialog(Dialog):
def click_view(self):
selected = self.tree.selection()
data = ""
if selected:
item = self.tree.item(selected[0])
if "file" in item["tags"]:
@ -157,7 +162,7 @@ class CopyServiceConfigDialog(Dialog):
)
dialog.show()
def get_node_service(self, selected):
def get_node_service(self, selected: Tuple[str]) -> [int, str]:
service_tree_id = self.tree.parent(selected[0])
service_name = self.tree.item(service_tree_id)["text"]
node_tree_id = self.tree.parent(service_tree_id)
@ -166,7 +171,14 @@ class CopyServiceConfigDialog(Dialog):
class ViewConfigDialog(Dialog):
def __init__(self, master, app, node_id, data, filename=None):
def __init__(
self,
master: Any,
app: "Application",
node_id: int,
data: str,
filename: str = None,
):
super().__init__(master, app, f"n{node_id} config data", modal=True)
self.data = data
self.service_data = None

View file

@ -2,6 +2,7 @@ import logging
import tkinter as tk
from pathlib import Path
from tkinter import ttk
from typing import TYPE_CHECKING, Any, Set
from core.gui import nodeutils
from core.gui.appconfig import ICONS_PATH
@ -11,9 +12,12 @@ from core.gui.nodeutils import NodeDraw
from core.gui.themes import FRAME_PAD, PADX, PADY
from core.gui.widgets import CheckboxList, ListboxScroll, image_chooser
if TYPE_CHECKING:
from core.gui.app import Application
class ServicesSelectDialog(Dialog):
def __init__(self, master, app, current_services):
def __init__(self, master: Any, app: "Application", current_services: Set[str]):
super().__init__(master, app, "Node Services", modal=True)
self.groups = None
self.services = None
@ -71,7 +75,7 @@ class ServicesSelectDialog(Dialog):
# trigger group change
self.groups.listbox.event_generate("<<ListboxSelect>>")
def handle_group_change(self, event):
def handle_group_change(self, event: tk.Event):
selection = self.groups.listbox.curselection()
if selection:
index = selection[0]
@ -81,7 +85,7 @@ class ServicesSelectDialog(Dialog):
checked = name in self.current_services
self.services.add(name, checked)
def service_clicked(self, name, var):
def service_clicked(self, name: str, var: tk.BooleanVar):
if var.get() and name not in self.current_services:
self.current_services.add(name)
elif not var.get() and name in self.current_services:
@ -96,7 +100,7 @@ class ServicesSelectDialog(Dialog):
class CustomNodesDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "Custom Nodes", modal=True)
self.edit_button = None
self.delete_button = None
@ -241,7 +245,7 @@ class CustomNodesDialog(Dialog):
self.nodes_list.listbox.selection_clear(0, tk.END)
self.nodes_list.listbox.event_generate("<<ListboxSelect>>")
def handle_node_select(self, event):
def handle_node_select(self, event: tk.Event):
selection = self.nodes_list.listbox.curselection()
if selection:
self.selected_index = selection[0]

View file

@ -1,12 +1,18 @@
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.images import ImageEnum, Images
from core.gui.themes import DIALOG_PAD
if TYPE_CHECKING:
from core.gui.app import Application
class Dialog(tk.Toplevel):
def __init__(self, master, app, title, modal=False):
def __init__(
self, master: tk.Widget, app: "Application", title: str, modal: bool = False
):
super().__init__(master)
self.withdraw()
self.app = app
@ -30,7 +36,7 @@ class Dialog(tk.Toplevel):
self.grab_set()
self.wait_window()
def draw_spacer(self, row=None):
def draw_spacer(self, row: int = None):
frame = ttk.Frame(self.top)
frame.grid(row=row, sticky="nsew")
frame.rowconfigure(0, weight=1)

View file

@ -5,18 +5,24 @@ import logging
import tkinter as tk
import webbrowser
from tkinter import ttk
from typing import TYPE_CHECKING, Any
import grpc
from core.api.grpc import core_pb2
from core.gui.dialogs.dialog import Dialog
from core.gui.errors import show_grpc_error
from core.gui.images import ImageEnum, Images
from core.gui.themes import PADX, PADY
from core.gui.widgets import ConfigFrame
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.node import CanvasNode
class GlobalEmaneDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: Any, app: "Application"):
super().__init__(master, app, "EMANE Configuration", modal=True)
self.config_frame = None
self.draw()
@ -47,7 +53,14 @@ class GlobalEmaneDialog(Dialog):
class EmaneModelDialog(Dialog):
def __init__(self, master, app, node, model, interface=None):
def __init__(
self,
master: Any,
app: "Application",
node: core_pb2.Node,
model: str,
interface: int = None,
):
super().__init__(master, app, f"{node.name} {model} Configuration", modal=True)
self.node = node
self.model = f"emane_{model}"
@ -91,7 +104,9 @@ class EmaneModelDialog(Dialog):
class EmaneConfigDialog(Dialog):
def __init__(self, master, app, canvas_node):
def __init__(
self, master: "Application", app: "Application", canvas_node: "CanvasNode"
):
super().__init__(
master, app, f"{canvas_node.core_node.name} EMANE Configuration", modal=True
)
@ -116,8 +131,6 @@ class EmaneConfigDialog(Dialog):
def draw_emane_configuration(self):
"""
draw the main frame for emane configuration
:return: nothing
"""
label = ttk.Label(
self.top,
@ -143,8 +156,6 @@ class EmaneConfigDialog(Dialog):
def draw_emane_models(self):
"""
create a combobox that has all the known emane models
:return: nothing
"""
frame = ttk.Frame(self.top)
frame.grid(sticky="ew", pady=PADY)
@ -210,8 +221,6 @@ class EmaneConfigDialog(Dialog):
def click_model_config(self):
"""
draw emane model configuration
:return: nothing
"""
model_name = self.emane_model.get()
logging.info("configuring emane model: %s", model_name)
@ -220,12 +229,9 @@ class EmaneConfigDialog(Dialog):
)
dialog.show()
def emane_model_change(self, event):
def emane_model_change(self, event: tk.Event):
"""
update emane model options button
:param event:
:return: nothing
"""
model_name = self.emane_model.get()
self.emane_model_button.config(text=f"{model_name} options")

View file

@ -1,14 +1,18 @@
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Any
from core.api.grpc import core_pb2
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import PADX, PADY
from core.gui.widgets import CodeText, ListboxScroll
if TYPE_CHECKING:
from core.gui.app import Application
class HookDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: Any, app: "Application"):
super().__init__(master, app, "Hook", modal=True)
self.name = tk.StringVar()
self.codetext = None
@ -62,11 +66,11 @@ class HookDialog(Dialog):
button = ttk.Button(frame, text="Cancel", command=lambda: self.destroy())
button.grid(row=0, column=1, sticky="ew")
def state_change(self, event):
def state_change(self, event: tk.Event):
state_name = self.state.get()
self.name.set(f"{state_name.lower()}_hook.sh")
def set(self, hook):
def set(self, hook: core_pb2.Hook):
self.hook = hook
self.name.set(hook.file)
self.codetext.text.delete(1.0, tk.END)
@ -84,7 +88,7 @@ class HookDialog(Dialog):
class HooksDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "Hooks", modal=True)
self.listbox = None
self.edit_button = None
@ -140,7 +144,7 @@ class HooksDialog(Dialog):
self.edit_button.config(state=tk.DISABLED)
self.delete_button.config(state=tk.DISABLED)
def select(self, event):
def select(self, event: tk.Event):
if self.listbox.curselection():
index = self.listbox.curselection()[0]
self.selected = self.listbox.get(index)

View file

@ -4,14 +4,19 @@ link configuration
import logging
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Union
from core.api.grpc import core_pb2
from core.gui.dialogs.colorpicker import ColorPickerDialog
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import PADX, PADY
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.graph import CanvasGraph, CanvasEdge
def get_int(var):
def get_int(var: tk.StringVar) -> Union[int, None]:
value = var.get()
if value != "":
return int(value)
@ -19,7 +24,7 @@ def get_int(var):
return None
def get_float(var):
def get_float(var: tk.StringVar) -> Union[float, None]:
value = var.get()
if value != "":
return float(value)
@ -28,7 +33,7 @@ def get_float(var):
class LinkConfigurationDialog(Dialog):
def __init__(self, master, app, edge):
def __init__(self, master: "CanvasGraph", app: "Application", edge: "CanvasEdge"):
super().__init__(master, app, "Link Configuration", modal=True)
self.app = app
self.edge = edge
@ -103,7 +108,7 @@ class LinkConfigurationDialog(Dialog):
button = ttk.Button(frame, text="Cancel", command=self.destroy)
button.grid(row=0, column=1, sticky="ew")
def get_frame(self):
def get_frame(self) -> ttk.Frame:
frame = ttk.Frame(self.top)
frame.columnconfigure(1, weight=1)
if self.is_symmetric:
@ -339,8 +344,6 @@ class LinkConfigurationDialog(Dialog):
def load_link_config(self):
"""
populate link config to the table
:return: nothing
"""
width = self.app.canvas.itemcget(self.edge.id, "width")
self.width.set(width)

View file

@ -4,15 +4,21 @@ marker dialog
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.dialogs.colorpicker import ColorPickerDialog
from core.gui.dialogs.dialog import Dialog
if TYPE_CHECKING:
from core.gui.app import Application
MARKER_THICKNESS = [3, 5, 8, 10]
class MarkerDialog(Dialog):
def __init__(self, master, app, initcolor="#000000"):
def __init__(
self, master: "Application", app: "Application", initcolor: str = "#000000"
):
super().__init__(master, app, "marker tool", modal=False)
self.app = app
self.color = initcolor
@ -53,13 +59,13 @@ class MarkerDialog(Dialog):
for i in canvas.find_withtag("marker"):
canvas.delete(i)
def change_color(self, event):
def change_color(self, event: tk.Event):
color_picker = ColorPickerDialog(self, self.app, self.color)
color = color_picker.askcolor()
event.widget.configure(background=color)
self.color = color
def change_thickness(self, event):
def change_thickness(self, event: tk.Event):
self.radius = self.marker_thickness.get()
def show(self):

View file

@ -2,6 +2,7 @@
mobility configuration
"""
from tkinter import ttk
from typing import TYPE_CHECKING
import grpc
@ -10,9 +11,15 @@ from core.gui.errors import show_grpc_error
from core.gui.themes import PADX, PADY
from core.gui.widgets import ConfigFrame
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.node import CanvasNode
class MobilityConfigDialog(Dialog):
def __init__(self, master, app, canvas_node):
def __init__(
self, master: "Application", app: "Application", canvas_node: "CanvasNode"
):
super().__init__(
master,
app,

View file

@ -1,5 +1,6 @@
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Any
import grpc
@ -9,11 +10,21 @@ from core.gui.errors import show_grpc_error
from core.gui.images import ImageEnum, Images
from core.gui.themes import PADX, PADY
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.node import CanvasNode
ICON_SIZE = 16
class MobilityPlayer:
def __init__(self, master, app, canvas_node, config):
def __init__(
self,
master: "Application",
app: "Application",
canvas_node: "CanvasNode",
config,
):
self.master = master
self.app = app
self.canvas_node = canvas_node
@ -57,7 +68,9 @@ class MobilityPlayer:
class MobilityPlayerDialog(Dialog):
def __init__(self, master, app, canvas_node, config):
def __init__(
self, master: Any, app: "Application", canvas_node: "CanvasNode", config
):
super().__init__(
master, app, f"{canvas_node.core_node.name} Mobility Player", modal=False
)

View file

@ -2,6 +2,7 @@ import logging
import tkinter as tk
from functools import partial
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui import nodeutils
from core.gui.appconfig import ICONS_PATH
@ -12,20 +13,32 @@ from core.gui.nodeutils import NodeUtils
from core.gui.themes import FRAME_PAD, PADX, PADY
from core.gui.widgets import ListboxScroll, image_chooser
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.node import CanvasNode
def mac_auto(is_auto, entry):
def mac_auto(is_auto: tk.BooleanVar, entry: ttk.Entry):
logging.info("mac auto clicked")
if is_auto.get():
logging.info("disabling mac")
entry.var.set("")
entry.delete(0, tk.END)
entry.insert(tk.END, "")
entry.config(state=tk.DISABLED)
else:
entry.var.set("00:00:00:00:00:00")
entry.delete(0, tk.END)
entry.insert(tk.END, "00:00:00:00:00:00")
entry.config(state=tk.NORMAL)
class InterfaceData:
def __init__(self, is_auto, mac, ip4, ip6):
def __init__(
self,
is_auto: tk.BooleanVar,
mac: tk.StringVar,
ip4: tk.StringVar,
ip6: tk.StringVar,
):
self.is_auto = is_auto
self.mac = mac
self.ip4 = ip4
@ -33,13 +46,11 @@ class InterfaceData:
class NodeConfigDialog(Dialog):
def __init__(self, master, app, canvas_node):
def __init__(
self, master: "Application", app: "Application", canvas_node: "CanvasNode"
):
"""
create an instance of node configuration
:param master: dialog master
:param coretk.app.Application: main app
:param coretk.graph.CanvasNode canvas_node: canvas node object
"""
super().__init__(
master, app, f"{canvas_node.core_node.name} Configuration", modal=True
@ -217,7 +228,7 @@ class NodeConfigDialog(Dialog):
button = ttk.Button(frame, text="Cancel", command=self.destroy)
button.grid(row=0, column=1, sticky="ew")
def click_emane_config(self, emane_model, interface_id):
def click_emane_config(self, emane_model: str, interface_id: int):
dialog = EmaneModelDialog(self, self.app, self.node, emane_model, interface_id)
dialog.show()
@ -248,7 +259,7 @@ class NodeConfigDialog(Dialog):
self.canvas_node.redraw()
self.destroy()
def interface_select(self, event):
def interface_select(self, event: tk.Event):
listbox = event.widget
cur = listbox.curselection()
if cur:

View file

@ -3,15 +3,26 @@ core node services
"""
import tkinter as tk
from tkinter import messagebox, ttk
from typing import TYPE_CHECKING, Any, Set
from core.gui.dialogs.dialog import Dialog
from core.gui.dialogs.serviceconfig import ServiceConfigDialog
from core.gui.themes import FRAME_PAD, PADX, PADY
from core.gui.widgets import CheckboxList, ListboxScroll
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.node import CanvasNode
class NodeServiceDialog(Dialog):
def __init__(self, master, app, canvas_node, services=None):
def __init__(
self,
master: Any,
app: "Application",
canvas_node: "CanvasNode",
services: Set[str] = None,
):
title = f"{canvas_node.core_node.name} Services"
super().__init__(master, app, title, modal=True)
self.app = app
@ -87,7 +98,7 @@ class NodeServiceDialog(Dialog):
# trigger group change
self.groups.listbox.event_generate("<<ListboxSelect>>")
def handle_group_change(self, event=None):
def handle_group_change(self, event: tk.Event = None):
selection = self.groups.listbox.curselection()
if selection:
index = selection[0]
@ -97,7 +108,7 @@ class NodeServiceDialog(Dialog):
checked = name in self.current_services
self.services.add(name, checked)
def service_clicked(self, name, var):
def service_clicked(self, name: str, var: tk.IntVar):
if var.get() and name not in self.current_services:
self.current_services.add(name)
elif not var.get() and name in self.current_services:
@ -150,7 +161,7 @@ class NodeServiceDialog(Dialog):
checkbutton.invoke()
return
def is_custom_service(self, service):
def is_custom_service(self, service: str) -> bool:
service_configs = self.app.core.service_configs
file_configs = self.app.core.file_configs
if self.node_id in service_configs and service in service_configs[self.node_id]:

View file

@ -1,14 +1,18 @@
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.coreclient import Observer
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import PADX, PADY
from core.gui.widgets import ListboxScroll
if TYPE_CHECKING:
from core.gui.app import Application
class ObserverDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "Observer Widgets", modal=True)
self.observers = None
self.save_button = None
@ -126,7 +130,7 @@ class ObserverDialog(Dialog):
self.save_button.config(state=tk.DISABLED)
self.delete_button.config(state=tk.DISABLED)
def handle_observer_change(self, event):
def handle_observer_change(self, event: tk.Event):
selection = self.observers.curselection()
if selection:
self.selected_index = selection[0]

View file

@ -1,14 +1,18 @@
import logging
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui import appconfig
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import FRAME_PAD, PADX, PADY
if TYPE_CHECKING:
from core.gui.app import Application
class PreferencesDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "Preferences", modal=True)
preferences = self.app.guiconfig["preferences"]
self.editor = tk.StringVar(value=preferences["editor"])
@ -72,7 +76,7 @@ class PreferencesDialog(Dialog):
button = ttk.Button(frame, text="Cancel", command=self.destroy)
button.grid(row=0, column=1, sticky="ew")
def theme_change(self, event):
def theme_change(self, event: tk.Event):
theme = self.theme.get()
logging.info("changing theme: %s", theme)
self.app.style.theme_use(theme)

View file

@ -1,18 +1,22 @@
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.coreclient import CoreServer
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import FRAME_PAD, PADX, PADY
from core.gui.widgets import ListboxScroll
if TYPE_CHECKING:
from core.gui.app import Application
DEFAULT_NAME = "example"
DEFAULT_ADDRESS = "127.0.0.1"
DEFAULT_PORT = 50051
class ServersDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "CORE Servers", modal=True)
self.name = tk.StringVar(value=DEFAULT_NAME)
self.address = tk.StringVar(value=DEFAULT_ADDRESS)
@ -155,7 +159,7 @@ class ServersDialog(Dialog):
self.save_button.config(state=tk.DISABLED)
self.delete_button.config(state=tk.DISABLED)
def handle_server_change(self, event):
def handle_server_change(self, event: tk.Event):
selection = self.servers.curselection()
if selection:
self.selected_index = selection[0]

View file

@ -1,6 +1,9 @@
"Service configuration dialog"
"""
Service configuration dialog
"""
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Any, List
import grpc
@ -12,9 +15,14 @@ from core.gui.images import ImageEnum, Images
from core.gui.themes import FRAME_PAD, PADX, PADY
from core.gui.widgets import CodeText, ListboxScroll
if TYPE_CHECKING:
from core.gui.app import Application
class ServiceConfigDialog(Dialog):
def __init__(self, master, app, service_name, node_id):
def __init__(
self, master: Any, app: "Application", service_name: str, node_id: int
):
title = f"{service_name} Service"
super().__init__(master, app, title, modal=True)
self.master = master
@ -225,7 +233,7 @@ class ServiceConfigDialog(Dialog):
for i in range(3):
tab.rowconfigure(i, weight=1)
self.notebook.add(tab, text="Startup/Shutdown")
commands = []
# tab 3
for i in range(3):
label_frame = None
@ -345,7 +353,7 @@ class ServiceConfigDialog(Dialog):
button = ttk.Button(frame, text="Cancel", command=self.destroy)
button.grid(row=0, column=3, sticky="ew")
def add_filename(self, event):
def add_filename(self, event: tk.Event):
# not worry about it for now
return
frame_contains_button = event.widget.master
@ -354,7 +362,7 @@ class ServiceConfigDialog(Dialog):
if filename not in combobox["values"]:
combobox["values"] += (filename,)
def delete_filename(self, event):
def delete_filename(self, event: tk.Event):
# not worry about it for now
return
frame_comntains_button = event.widget.master
@ -364,7 +372,7 @@ class ServiceConfigDialog(Dialog):
combobox["values"] = tuple([x for x in combobox["values"] if x != filename])
combobox.set("")
def add_command(self, event):
def add_command(self, event: tk.Event):
frame_contains_button = event.widget.master
listbox = frame_contains_button.master.grid_slaves(row=1, column=0)[0].listbox
command_to_add = frame_contains_button.grid_slaves(row=0, column=0)[0].get()
@ -375,7 +383,7 @@ class ServiceConfigDialog(Dialog):
return
listbox.insert(tk.END, command_to_add)
def update_entry(self, event):
def update_entry(self, event: tk.Event):
listbox = event.widget
current_selection = listbox.curselection()
if len(current_selection) > 0:
@ -386,7 +394,7 @@ class ServiceConfigDialog(Dialog):
entry.delete(0, "end")
entry.insert(0, cmd)
def delete_command(self, event):
def delete_command(self, event: tk.Event):
button = event.widget
frame_contains_button = button.master
listbox = frame_contains_button.master.grid_slaves(row=1, column=0)[0].listbox
@ -439,13 +447,13 @@ class ServiceConfigDialog(Dialog):
show_grpc_error(e)
self.destroy()
def display_service_file_data(self, event):
def display_service_file_data(self, event: tk.Event):
combobox = event.widget
filename = combobox.get()
self.service_file_data.text.delete(1.0, "end")
self.service_file_data.text.insert("end", self.temp_service_files[filename])
def update_temp_service_file_data(self, event):
def update_temp_service_file_data(self, event: tk.Event):
scrolledtext = event.widget
filename = self.filename_combobox.get()
self.temp_service_files[filename] = scrolledtext.get(1.0, "end")
@ -490,7 +498,9 @@ class ServiceConfigDialog(Dialog):
dialog = CopyServiceConfigDialog(self, self.app, self.node_id)
dialog.show()
def append_commands(self, commands, listbox, to_add):
def append_commands(
self, commands: List[str], listbox: tk.Listbox, to_add: List[str]
):
for cmd in to_add:
commands.append(cmd)
listbox.insert(tk.END, cmd)

View file

@ -1,5 +1,6 @@
import logging
from tkinter import ttk
from typing import TYPE_CHECKING
import grpc
@ -8,9 +9,12 @@ from core.gui.errors import show_grpc_error
from core.gui.themes import PADX, PADY
from core.gui.widgets import ConfigFrame
if TYPE_CHECKING:
from core.gui.app import Application
class SessionOptionsDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "Session Options", modal=True)
self.config_frame = None
self.config = self.get_config()

View file

@ -1,6 +1,7 @@
import logging
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING, Iterable
import grpc
@ -11,9 +12,12 @@ from core.gui.images import ImageEnum, Images
from core.gui.task import BackgroundTask
from core.gui.themes import PADX, PADY
if TYPE_CHECKING:
from core.gui.app import Application
class SessionsDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "Sessions", modal=True)
self.selected = False
self.selected_id = None
@ -21,7 +25,7 @@ class SessionsDialog(Dialog):
self.sessions = self.get_sessions()
self.draw()
def get_sessions(self):
def get_sessions(self) -> Iterable[core_pb2.SessionSummary]:
try:
response = self.app.core.client.get_sessions()
logging.info("sessions: %s", response)
@ -40,7 +44,6 @@ class SessionsDialog(Dialog):
def draw_description(self):
"""
write a short description
:return: nothing
"""
label = ttk.Label(
self.top,
@ -129,7 +132,7 @@ class SessionsDialog(Dialog):
self.app.core.create_new_session()
self.destroy()
def click_select(self, event):
def click_select(self, event: tk.Event):
item = self.tree.selection()
session_id = int(self.tree.item(item, "text"))
self.selected = True
@ -138,8 +141,6 @@ class SessionsDialog(Dialog):
def click_connect(self):
"""
if no session is selected yet, create a new one else join that session
:return: nothing
"""
if self.selected and self.selected_id is not None:
self.join_session(self.selected_id)
@ -152,8 +153,6 @@ class SessionsDialog(Dialog):
"""
if no session is currently selected create a new session else shut the selected
session down.
:return: nothing
"""
if self.selected and self.selected_id is not None:
self.shutdown_session(self.selected_id)
@ -162,18 +161,18 @@ class SessionsDialog(Dialog):
else:
logging.error("querysessiondrawing.py invalid state")
def join_session(self, session_id):
def join_session(self, session_id: int):
self.app.statusbar.progress_bar.start(5)
task = BackgroundTask(self.app, self.app.core.join_session, args=(session_id,))
task.start()
self.destroy()
def on_selected(self, event):
def on_selected(self, event: tk.Event):
item = self.tree.selection()
sid = int(self.tree.item(item, "text"))
self.join_session(sid)
def shutdown_session(self, sid):
def shutdown_session(self, sid: int):
self.app.core.stop_session(sid)
self.click_new()
self.destroy()

View file

@ -3,6 +3,7 @@ shape input dialog
"""
import tkinter as tk
from tkinter import font, ttk
from typing import TYPE_CHECKING, List, Union
from core.gui.dialogs.colorpicker import ColorPickerDialog
from core.gui.dialogs.dialog import Dialog
@ -10,12 +11,16 @@ from core.gui.graph import tags
from core.gui.graph.shapeutils import is_draw_shape, is_shape_text
from core.gui.themes import FRAME_PAD, PADX, PADY
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.shape import Shape
FONT_SIZES = [8, 9, 10, 11, 12, 14, 16, 18, 20, 22, 24, 26, 28, 36, 48, 72]
BORDER_WIDTH = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
class ShapeDialog(Dialog):
def __init__(self, master, app, shape):
def __init__(self, master: "Application", app: "Application", shape: "Shape"):
if is_draw_shape(shape.shape_type):
title = "Add Shape"
else:
@ -162,10 +167,9 @@ class ShapeDialog(Dialog):
self.add_text()
self.destroy()
def make_font(self):
def make_font(self) -> List[Union[int, str]]:
"""
create font for text or shape label
:return: list(font specifications)
"""
size = int(self.font_size.get())
text_font = [self.font.get(), size]
@ -180,8 +184,6 @@ class ShapeDialog(Dialog):
def save_text(self):
"""
save info related to text or shape label
:return: nothing
"""
data = self.shape.shape_data
data.text = self.shape_text.get()
@ -195,8 +197,6 @@ class ShapeDialog(Dialog):
def save_shape(self):
"""
save info related to shape
:return: nothing
"""
data = self.shape.shape_data
data.fill_color = self.fill_color
@ -206,8 +206,6 @@ class ShapeDialog(Dialog):
def add_text(self):
"""
add text to canvas
:return: nothing
"""
text = self.shape_text.get()
text_font = self.make_font()

View file

@ -3,14 +3,18 @@ throughput dialog
"""
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.dialogs.colorpicker import ColorPickerDialog
from core.gui.dialogs.dialog import Dialog
from core.gui.themes import FRAME_PAD, PADX, PADY
if TYPE_CHECKING:
from core.gui.app import Application
class ThroughputDialog(Dialog):
def __init__(self, master, app):
def __init__(self, master: "Application", app: "Application"):
super().__init__(master, app, "Throughput Config", modal=False)
self.app = app
self.canvas = app.canvas

View file

@ -3,6 +3,7 @@ wlan configuration
"""
from tkinter import ttk
from typing import TYPE_CHECKING
import grpc
@ -11,9 +12,15 @@ from core.gui.errors import show_grpc_error
from core.gui.themes import PADX, PADY
from core.gui.widgets import ConfigFrame
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.node import CanvasNode
class WlanConfigDialog(Dialog):
def __init__(self, master, app, canvas_node):
def __init__(
self, master: "Application", app: "Application", canvas_node: "CanvasNode"
):
super().__init__(
master, app, f"{canvas_node.core_node.name} Wlan Configuration", modal=True
)
@ -38,8 +45,6 @@ class WlanConfigDialog(Dialog):
def draw_apply_buttons(self):
"""
create node configuration options
:return: nothing
"""
frame = ttk.Frame(self.top)
frame.grid(sticky="ew")
@ -55,8 +60,6 @@ class WlanConfigDialog(Dialog):
def click_apply(self):
"""
retrieve user's wlan configuration and store the new configuration values
:return: nothing
"""
config = self.config_frame.parse_config()
self.app.core.wlan_configs[self.node.id] = self.config

View file

@ -1,7 +1,11 @@
from tkinter import messagebox
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import grpc
def show_grpc_error(e):
def show_grpc_error(e: "grpc.RpcError"):
title = [x.capitalize() for x in e.code().name.lower().split("_")]
title = " ".join(title)
title = f"GRPC {title}"

View file

@ -1,19 +1,30 @@
import logging
import tkinter as tk
from tkinter.font import Font
from typing import TYPE_CHECKING, Any, Tuple
from core.gui import themes
from core.gui.dialogs.linkconfig import LinkConfigurationDialog
from core.gui.graph import tags
from core.gui.nodeutils import NodeUtils
if TYPE_CHECKING:
from core.gui.graph.graph import CanvasGraph
TEXT_DISTANCE = 0.30
EDGE_WIDTH = 3
EDGE_COLOR = "#ff0000"
class CanvasWirelessEdge:
def __init__(self, token, position, src, dst, canvas):
def __init__(
self,
token: Tuple[Any, ...],
position: Tuple[float, float, float, float],
src: int,
dst: int,
canvas: "CanvasGraph",
):
self.token = token
self.src = src
self.dst = dst
@ -31,15 +42,17 @@ class CanvasEdge:
Canvas edge class
"""
def __init__(self, x1, y1, x2, y2, src, canvas):
def __init__(
self,
x1: float,
y1: float,
x2: float,
y2: float,
src: int,
canvas: "CanvasGraph",
):
"""
Create an instance of canvas edge object
:param int x1: source x-coord
:param int y1: source y-coord
:param int x2: destination x-coord
:param int y2: destination y-coord
:param int src: source id
:param coretk.graph.graph.GraphCanvas canvas: canvas object
"""
self.src = src
self.dst = None
@ -66,7 +79,7 @@ class CanvasEdge:
self.link = link
self.draw_labels()
def get_coordinates(self):
def get_coordinates(self) -> [float, float, float, float]:
x1, y1, x2, y2 = self.canvas.coords(self.id)
v1 = x2 - x1
v2 = y2 - y1
@ -78,7 +91,7 @@ class CanvasEdge:
y2 = y2 - uy
return x1, y1, x2, y2
def get_midpoint(self):
def get_midpoint(self) -> [float, float]:
x1, y1, x2, y2 = self.canvas.coords(self.id)
x = (x1 + x2) / 2
y = (y1 + y2) / 2
@ -118,8 +131,6 @@ class CanvasEdge:
def update_labels(self):
"""
Move edge labels based on current position.
:return: nothing
"""
x1, y1, x2, y2 = self.get_coordinates()
self.canvas.coords(self.text_src, x1, y1)
@ -128,7 +139,7 @@ class CanvasEdge:
x, y = self.get_midpoint()
self.canvas.coords(self.text_middle, x, y)
def set_throughput(self, throughput):
def set_throughput(self, throughput: float):
throughput = 0.001 * throughput
value = f"{throughput:.3f} kbps"
if self.text_middle is None:
@ -147,7 +158,7 @@ class CanvasEdge:
width = EDGE_WIDTH
self.canvas.itemconfig(self.id, fill=color, width=width)
def complete(self, dst):
def complete(self, dst: int):
self.dst = dst
self.token = tuple(sorted((self.src, self.dst)))
x, y = self.canvas.coords(self.dst)
@ -157,7 +168,7 @@ class CanvasEdge:
self.canvas.tag_raise(self.src)
self.canvas.tag_raise(self.dst)
def is_wireless(self):
def is_wireless(self) -> [bool, bool]:
src_node = self.canvas.nodes[self.src]
dst_node = self.canvas.nodes[self.dst]
src_node_type = src_node.core_node.type
@ -183,7 +194,6 @@ class CanvasEdge:
dst_node.add_antenna()
elif not is_src_wireless and is_dst_wireless:
src_node.add_antenna()
# TODO: remove this? dont allow linking wireless nodes?
else:
src_node.add_antenna()
@ -199,7 +209,7 @@ class CanvasEdge:
self.text_middle = None
self.canvas.itemconfig(self.id, fill=EDGE_COLOR, width=EDGE_WIDTH)
def create_context(self, event):
def create_context(self, event: tk.Event):
logging.debug("create link context")
context = tk.Menu(self.canvas)
themes.style_menu(context)

View file

@ -1,5 +1,6 @@
import logging
import tkinter as tk
from typing import TYPE_CHECKING, List, Tuple
from PIL import Image, ImageTk
@ -15,12 +16,18 @@ from core.gui.graph.shapeutils import ShapeType, is_draw_shape, is_marker
from core.gui.images import Images
from core.gui.nodeutils import NodeUtils
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.coreclient import CoreClient
ZOOM_IN = 1.1
ZOOM_OUT = 0.9
class CanvasGraph(tk.Canvas):
def __init__(self, master, core, width, height):
def __init__(
self, master: "Application", core: "CoreClient", width: int, height: int
):
super().__init__(master, highlightthickness=0, background="#cccccc")
self.app = master
self.core = core
@ -67,7 +74,7 @@ class CanvasGraph(tk.Canvas):
self.draw_canvas()
self.draw_grid()
def draw_canvas(self, dimensions=None):
def draw_canvas(self, dimensions: Tuple[int, int] = None):
if self.grid is not None:
self.delete(self.grid)
if not dimensions:
@ -84,13 +91,11 @@ class CanvasGraph(tk.Canvas):
)
self.configure(scrollregion=self.bbox(tk.ALL))
def reset_and_redraw(self, session):
def reset_and_redraw(self, session: core_pb2.Session):
"""
Reset the private variables CanvasGraph object, redraw nodes given the new grpc
client.
:param core.api.grpc.core_pb2.Session session: session to draw
:return: nothing
"""
# hide context
self.hide_context()
@ -114,8 +119,6 @@ class CanvasGraph(tk.Canvas):
def setup_bindings(self):
"""
Bind any mouse events or hot keys to the matching action
:return: nothing
"""
self.bind("<ButtonPress-1>", self.click_press)
self.bind("<ButtonRelease-1>", self.click_release)
@ -135,28 +138,28 @@ class CanvasGraph(tk.Canvas):
self.context.unpost()
self.context = None
def get_actual_coords(self, x, y):
def get_actual_coords(self, x: float, y: float) -> [float, float]:
actual_x = (x - self.offset[0]) / self.ratio
actual_y = (y - self.offset[1]) / self.ratio
return actual_x, actual_y
def get_scaled_coords(self, x, y):
def get_scaled_coords(self, x: float, y: float) -> [float, float]:
scaled_x = (x * self.ratio) + self.offset[0]
scaled_y = (y * self.ratio) + self.offset[1]
return scaled_x, scaled_y
def inside_canvas(self, x, y):
def inside_canvas(self, x: float, y: float) -> [bool, bool]:
x1, y1, x2, y2 = self.bbox(self.grid)
valid_x = x1 <= x <= x2
valid_y = y1 <= y <= y2
return valid_x and valid_y
def valid_position(self, x1, y1, x2, y2):
def valid_position(self, x1: int, y1: int, x2: int, y2: int) -> [bool, bool]:
valid_topleft = self.inside_canvas(x1, y1)
valid_bottomright = self.inside_canvas(x2, y2)
return valid_topleft and valid_bottomright
def set_throughputs(self, throughputs_event):
def set_throughputs(self, throughputs_event: core_pb2.ThroughputsEvent):
for interface_throughput in throughputs_event.interface_throughputs:
node_id = interface_throughput.node_id
interface_id = interface_throughput.interface_id
@ -174,8 +177,6 @@ class CanvasGraph(tk.Canvas):
def draw_grid(self):
"""
Create grid.
:return: nothing
"""
width, height = self.width_and_height()
width = int(width)
@ -187,13 +188,9 @@ class CanvasGraph(tk.Canvas):
self.tag_lower(tags.GRIDLINE)
self.tag_lower(self.grid)
def add_wireless_edge(self, src, dst):
def add_wireless_edge(self, src: CanvasNode, dst: CanvasNode):
"""
add a wireless edge between 2 canvas nodes
:param CanvasNode src: source node
:param CanvasNode dst: destination node
:return: nothing
"""
token = tuple(sorted((src.id, dst.id)))
x1, y1 = self.coords(src.id)
@ -206,18 +203,16 @@ class CanvasGraph(tk.Canvas):
self.tag_raise(src.id)
self.tag_raise(dst.id)
def delete_wireless_edge(self, src, dst):
def delete_wireless_edge(self, src: CanvasNode, dst: CanvasNode):
token = tuple(sorted((src.id, dst.id)))
edge = self.wireless_edges.pop(token)
edge.delete()
src.wireless_edges.remove(edge)
dst.wireless_edges.remove(edge)
def draw_session(self, session):
def draw_session(self, session: core_pb2.Session):
"""
Draw existing session.
:return: nothing
"""
# draw existing nodes
for core_node in session.nodes:
@ -296,25 +291,17 @@ class CanvasGraph(tk.Canvas):
for edge in self.edges.values():
edge.reset()
def canvas_xy(self, event):
def canvas_xy(self, event: tk.Event) -> [float, float]:
"""
Convert window coordinate to canvas coordinate
:param event:
:rtype: (int, int)
:return: x, y canvas coordinate
"""
x = self.canvasx(event.x)
y = self.canvasy(event.y)
return x, y
def get_selected(self, event):
def get_selected(self, event: tk.Event) -> int:
"""
Retrieve the item id that is on the mouse position
:param event: mouse event
:rtype: int
:return: the item that the mouse point to
"""
x, y = self.canvas_xy(event)
overlapping = self.find_overlapping(x, y, x, y)
@ -332,12 +319,9 @@ class CanvasGraph(tk.Canvas):
return selected
def click_release(self, event):
def click_release(self, event: tk.Event):
"""
Draw a node or finish drawing an edge according to the current graph mode
:param event: mouse event
:return: nothing
"""
logging.debug("click release")
x, y = self.canvas_xy(event)
@ -380,7 +364,7 @@ class CanvasGraph(tk.Canvas):
self.mode = GraphMode.NODE
self.selected = None
def handle_edge_release(self, event):
def handle_edge_release(self, event: tk.Event):
edge = self.drawing_edge
self.drawing_edge = None
@ -417,7 +401,7 @@ class CanvasGraph(tk.Canvas):
node_dst.edges.add(edge)
self.core.create_link(edge, node_src, node_dst)
def select_object(self, object_id, choose_multiple=False):
def select_object(self, object_id: int, choose_multiple: bool = False):
"""
create a bounding box when a node is selected
"""
@ -441,19 +425,17 @@ class CanvasGraph(tk.Canvas):
def clear_selection(self):
"""
Clear current selection boxes.
:return: nothing
"""
for _id in self.selection.values():
self.delete(_id)
self.selection.clear()
def move_selection(self, object_id, x_offset, y_offset):
def move_selection(self, object_id: int, x_offset: float, y_offset: float):
select_id = self.selection.get(object_id)
if select_id is not None:
self.move(select_id, x_offset, y_offset)
def delete_selection_objects(self):
def delete_selection_objects(self) -> List[CanvasNode]:
edges = set()
nodes = []
for object_id in self.selection:
@ -499,7 +481,7 @@ class CanvasGraph(tk.Canvas):
self.selection.clear()
return nodes
def zoom(self, event, factor=None):
def zoom(self, event: tk.Event, factor: float = None):
if not factor:
factor = ZOOM_IN if event.delta > 0 else ZOOM_OUT
event.x, event.y = self.canvasx(event.x), self.canvasy(event.y)
@ -517,12 +499,9 @@ class CanvasGraph(tk.Canvas):
if self.wallpaper:
self.redraw_wallpaper()
def click_press(self, event):
def click_press(self, event: tk.Event):
"""
Start drawing an edge if mouse click is on a node
:param event: mouse event
:return: nothing
"""
x, y = self.canvas_xy(event)
if not self.inside_canvas(x, y):
@ -581,7 +560,7 @@ class CanvasGraph(tk.Canvas):
self.select_box = shape
self.clear_selection()
def ctrl_click(self, event):
def ctrl_click(self, event: tk.Event):
# update cursor location
x, y = self.canvas_xy(event)
if not self.inside_canvas(x, y):
@ -599,12 +578,9 @@ class CanvasGraph(tk.Canvas):
):
self.select_object(selected, choose_multiple=True)
def click_motion(self, event):
def click_motion(self, event: tk.Event):
"""
Redraw drawing edge according to the current position of the mouse
:param event: mouse event
:return: nothing
"""
x, y = self.canvas_xy(event)
if not self.inside_canvas(x, y):
@ -658,7 +634,7 @@ class CanvasGraph(tk.Canvas):
if self.select_box and self.mode == GraphMode.SELECT:
self.select_box.shape_motion(x, y)
def click_context(self, event):
def click_context(self, event: tk.Event):
logging.info("context event: %s", self.context)
if not self.context:
selected = self.get_selected(event)
@ -670,24 +646,22 @@ class CanvasGraph(tk.Canvas):
else:
self.hide_context()
def press_delete(self, event):
def press_delete(self, event: tk.Event):
"""
delete selected nodes and any data that relates to it
:param event:
:return:
"""
logging.debug("press delete key")
nodes = self.delete_selection_objects()
self.core.delete_graph_nodes(nodes)
def double_click(self, event):
def double_click(self, event: tk.Event):
selected = self.get_selected(event)
if selected is not None and selected in self.shapes:
shape = self.shapes[selected]
dialog = ShapeDialog(self.app, self.app, shape)
dialog.show()
def add_node(self, x, y):
def add_node(self, x: float, y: float) -> CanvasNode:
if self.selected is None or self.selected in self.shapes:
actual_x, actual_y = self.get_actual_coords(x, y)
core_node = self.core.create_node(
@ -701,26 +675,25 @@ class CanvasGraph(tk.Canvas):
def width_and_height(self):
"""
retrieve canvas width and height in pixels
:return: nothing
"""
x0, y0, x1, y1 = self.coords(self.grid)
canvas_w = abs(x0 - x1)
canvas_h = abs(y0 - y1)
return canvas_w, canvas_h
def get_wallpaper_image(self):
def get_wallpaper_image(self) -> Image.Image:
width = int(self.wallpaper.width * self.ratio)
height = int(self.wallpaper.height * self.ratio)
image = self.wallpaper.resize((width, height), Image.ANTIALIAS)
return image
def draw_wallpaper(self, image, x=None, y=None):
def draw_wallpaper(
self, image: ImageTk.PhotoImage, x: float = None, y: float = None
):
if x is None and y is None:
x1, y1, x2, y2 = self.bbox(self.grid)
x = (x1 + x2) / 2
y = (y1 + y2) / 2
self.wallpaper_id = self.create_image((x, y), image=image, tags=tags.WALLPAPER)
self.wallpaper_drawn = image
@ -748,8 +721,6 @@ class CanvasGraph(tk.Canvas):
def wallpaper_center(self):
"""
place the image at the center of canvas
:return: nothing
"""
self.delete(self.wallpaper_id)
@ -773,8 +744,6 @@ class CanvasGraph(tk.Canvas):
def wallpaper_scaled(self):
"""
scale image based on canvas dimension
:return: nothing
"""
self.delete(self.wallpaper_id)
canvas_w, canvas_h = self.width_and_height()
@ -788,7 +757,7 @@ class CanvasGraph(tk.Canvas):
self.redraw_canvas((image.width(), image.height()))
self.draw_wallpaper(image)
def redraw_canvas(self, dimensions=None):
def redraw_canvas(self, dimensions: Tuple[int, int] = None):
logging.info("redrawing canvas to dimensions: %s", dimensions)
# reset scale and move back to original position
@ -836,7 +805,7 @@ class CanvasGraph(tk.Canvas):
else:
self.itemconfig(tags.GRIDLINE, state=tk.HIDDEN)
def set_wallpaper(self, filename):
def set_wallpaper(self, filename: str):
logging.info("setting wallpaper: %s", filename)
if filename:
img = Image.open(filename)
@ -849,16 +818,12 @@ class CanvasGraph(tk.Canvas):
self.wallpaper = None
self.wallpaper_file = None
def is_selection_mode(self):
def is_selection_mode(self) -> bool:
return self.mode == GraphMode.SELECT
def create_edge(self, source, dest):
def create_edge(self, source: CanvasNode, dest: CanvasNode):
"""
create an edge between source node and destination node
:param CanvasNode source: source node
:param CanvasNode dest: destination node
:return: nothing
"""
if (source.id, dest.id) not in self.edges:
pos0 = source.core_node.position

View file

@ -1,5 +1,6 @@
import tkinter as tk
from tkinter import font
from typing import TYPE_CHECKING
import grpc
@ -16,11 +17,22 @@ from core.gui.graph import tags
from core.gui.graph.tooltip import CanvasTooltip
from core.gui.nodeutils import NodeUtils
if TYPE_CHECKING:
from core.gui.app import Application
from PIL.ImageTk import PhotoImage
NODE_TEXT_OFFSET = 5
class CanvasNode:
def __init__(self, app, x, y, core_node, image):
def __init__(
self,
app: "Application",
x: float,
y: float,
core_node: core_pb2.Node,
image: "PhotoImage",
):
self.app = app
self.canvas = app.canvas
self.image = image
@ -70,8 +82,6 @@ class CanvasNode:
def delete_antenna(self):
"""
delete one antenna
:return: nothing
"""
if self.antennae:
antenna_id = self.antennae.pop()
@ -80,8 +90,6 @@ class CanvasNode:
def delete_antennae(self):
"""
delete all antennas
:return: nothing
"""
for antenna_id in self.antennae:
self.canvas.delete(antenna_id)
@ -95,14 +103,14 @@ class CanvasNode:
image_box = self.canvas.bbox(self.id)
return image_box[3] + NODE_TEXT_OFFSET
def move(self, x, y):
def move(self, x: int, y: int):
x, y = self.canvas.get_scaled_coords(x, y)
current_x, current_y = self.canvas.coords(self.id)
x_offset = x - current_x
y_offset = y - current_y
self.motion(x_offset, y_offset, update=False)
def motion(self, x_offset, y_offset, update=True):
def motion(self, x_offset: int, y_offset: int, update: bool = True):
original_position = self.canvas.coords(self.id)
self.canvas.move(self.id, x_offset, y_offset)
x, y = self.canvas.coords(self.id)
@ -144,7 +152,7 @@ class CanvasNode:
if self.app.core.is_runtime() and update:
self.app.core.edit_node(self.core_node)
def on_enter(self, event):
def on_enter(self, event: tk.Event):
if self.app.core.is_runtime() and self.app.core.observer:
self.tooltip.text.set("waiting...")
self.tooltip.on_enter(event)
@ -154,16 +162,16 @@ class CanvasNode:
except grpc.RpcError as e:
show_grpc_error(e)
def on_leave(self, event):
def on_leave(self, event: tk.Event):
self.tooltip.on_leave(event)
def double_click(self, event):
def double_click(self, event: tk.Event):
if self.app.core.is_runtime():
self.canvas.core.launch_terminal(self.core_node.id)
else:
self.show_config()
def create_context(self):
def create_context(self) -> tk.Menu:
is_wlan = self.core_node.type == NodeType.WIRELESS_LAN
is_emane = self.core_node.type == NodeType.EMANE
context = tk.Menu(self.canvas)
@ -245,7 +253,7 @@ class CanvasNode:
dialog = NodeServiceDialog(self.app.master, self.app, self)
dialog.show()
def has_emane_link(self, interface_id):
def has_emane_link(self, interface_id: int) -> core_pb2.Node:
result = None
for edge in self.edges:
if self.id == edge.src:

View file

@ -1,23 +1,28 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Union
from core.gui.dialogs.shapemod import ShapeDialog
from core.gui.graph import tags
from core.gui.graph.shapeutils import ShapeType
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.graph.graph import CanvasGraph
class AnnotationData:
def __init__(
self,
text="",
font="Arial",
font_size=12,
text_color="#000000",
fill_color="",
border_color="#000000",
border_width=1,
bold=False,
italic=False,
underline=False,
text: str = "",
font: str = "Arial",
font_size: int = 12,
text_color: str = "#000000",
fill_color: str = "",
border_color: str = "#000000",
border_width: int = 1,
bold: bool = False,
italic: bool = False,
underline: bool = False,
):
self.text = text
self.font = font
@ -32,7 +37,17 @@ class AnnotationData:
class Shape:
def __init__(self, app, canvas, shape_type, x1, y1, x2=None, y2=None, data=None):
def __init__(
self,
app: "Application",
canvas: "CanvasGraph",
shape_type: ShapeType,
x1: float,
y1: float,
x2: float = None,
y2: float = None,
data: AnnotationData = None,
):
self.app = app
self.canvas = canvas
self.shape_type = shape_type
@ -99,7 +114,7 @@ class Shape:
logging.error("unknown shape type: %s", self.shape_type)
self.created = True
def get_font(self):
def get_font(self) -> List[Union[int, str]]:
font = [self.shape_data.font, self.shape_data.font_size]
if self.shape_data.bold:
font.append("bold")
@ -123,10 +138,10 @@ class Shape:
font=font,
)
def shape_motion(self, x1, y1):
def shape_motion(self, x1: float, y1: float):
self.canvas.coords(self.id, self.x1, self.y1, x1, y1)
def shape_complete(self, x, y):
def shape_complete(self, x: float, y: float):
for component in tags.ABOVE_SHAPE:
self.canvas.tag_raise(component)
s = ShapeDialog(self.app, self.app, self)
@ -135,7 +150,7 @@ class Shape:
def disappear(self):
self.canvas.delete(self.id)
def motion(self, x_offset, y_offset):
def motion(self, x_offset: float, y_offset: float):
original_position = self.canvas.coords(self.id)
self.canvas.move(self.id, x_offset, y_offset)
coords = self.canvas.coords(self.id)
@ -151,7 +166,7 @@ class Shape:
self.canvas.delete(self.id)
self.canvas.delete(self.text_id)
def metadata(self):
def metadata(self) -> Dict[str, Union[str, int, bool]]:
coords = self.canvas.coords(self.id)
# update coords to actual positions
if len(coords) == 4:

View file

@ -11,13 +11,13 @@ class ShapeType(enum.Enum):
SHAPES = {ShapeType.OVAL, ShapeType.RECTANGLE}
def is_draw_shape(shape_type):
def is_draw_shape(shape_type: ShapeType) -> bool:
return shape_type in SHAPES
def is_shape_text(shape_type):
def is_shape_text(shape_type: ShapeType) -> bool:
return shape_type == ShapeType.TEXT
def is_marker(shape_type):
def is_marker(shape_type: ShapeType) -> bool:
return shape_type == ShapeType.MARKER

View file

@ -1,8 +1,12 @@
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.themes import Styles
if TYPE_CHECKING:
from core.gui.graph.graph import CanvasGraph
class CanvasTooltip:
"""
@ -19,7 +23,14 @@ class CanvasTooltip:
Alberto Vassena on 2016.12.10.
"""
def __init__(self, canvas, *, pad=(5, 3, 5, 3), waittime=400, wraplength=600):
def __init__(
self,
canvas: "CanvasGraph",
*,
pad=(5, 3, 5, 3),
waittime: int = 400,
wraplength: int = 600
):
# in miliseconds, originally 500
self.waittime = waittime
# in pixels, originally 180
@ -30,10 +41,10 @@ class CanvasTooltip:
self.id = None
self.tw = None
def on_enter(self, event=None):
def on_enter(self, event: tk.Event = None):
self.schedule()
def on_leave(self, event=None):
def on_leave(self, event: tk.Event = None):
self.unschedule()
self.hide()
@ -47,7 +58,7 @@ class CanvasTooltip:
if id_:
self.canvas.after_cancel(id_)
def show(self, event=None):
def show(self, event: tk.Event = None):
def tip_pos_calculator(canvas, label, *, tip_delta=(10, 5), pad=(5, 3, 5, 3)):
c = canvas
s_width, s_height = c.winfo_screenwidth(), c.winfo_screenheight()

View file

@ -9,7 +9,7 @@ class Images:
images = {}
@classmethod
def create(cls, file_path, width, height=None):
def create(cls, file_path: str, width: int, height: int = None):
if height is None:
height = width
image = Image.open(file_path)
@ -22,12 +22,12 @@ class Images:
cls.images[image.stem] = str(image)
@classmethod
def get(cls, image_enum, width, height=None):
def get(cls, image_enum: Enum, width: int, height: int = None):
file_path = cls.images[image_enum.value]
return cls.create(file_path, width, height)
@classmethod
def get_custom(cls, name, width, height=None):
def get_custom(cls, name: str, width: int, height: int = None):
file_path = cls.images[name]
return cls.create(file_path, width, height)

View file

@ -1,24 +1,30 @@
import logging
import random
from typing import TYPE_CHECKING, Set, Union
from netaddr import IPNetwork
from core.gui.nodeutils import NodeUtils
if TYPE_CHECKING:
from core.gui.app import Application
from core.api.grpc import core_pb2
from core.gui.graph.node import CanvasNode
def random_mac():
return ("{:02x}" * 6).format(*[random.randrange(256) for _ in range(6)])
class InterfaceManager:
def __init__(self, app, address="10.0.0.0", mask=24):
def __init__(self, app: "Application", address: str = "10.0.0.0", mask: int = 24):
self.app = app
self.mask = mask
self.base_prefix = max(self.mask - 8, 0)
self.subnets = IPNetwork(f"{address}/{self.base_prefix}")
self.current_subnet = None
def next_subnet(self):
def next_subnet(self) -> IPNetwork:
# define currently used subnets
used_subnets = set()
for edge in self.app.core.links.values():
@ -38,17 +44,19 @@ class InterfaceManager:
def reset(self):
self.current_subnet = None
def get_ips(self, node_id):
def get_ips(self, node_id: int) -> [str, str, int]:
ip4 = self.current_subnet[node_id]
ip6 = ip4.ipv6()
prefix = self.current_subnet.prefixlen
return str(ip4), str(ip6), prefix
@classmethod
def get_subnet(cls, interface):
def get_subnet(cls, interface: "core_pb2.Interface") -> IPNetwork:
return IPNetwork(f"{interface.ip4}/{interface.ip4mask}").cidr
def determine_subnet(self, canvas_src_node, canvas_dst_node):
def determine_subnet(
self, canvas_src_node: "CanvasNode", canvas_dst_node: "CanvasNode"
):
src_node = canvas_src_node.core_node
dst_node = canvas_dst_node.core_node
is_src_container = NodeUtils.is_container_node(src_node.type)
@ -70,7 +78,9 @@ class InterfaceManager:
else:
logging.info("ignoring subnet change for link between network nodes")
def find_subnet(self, canvas_node, visited=None):
def find_subnet(
self, canvas_node: "CanvasNode", visited: Set[int] = None
) -> Union[IPNetwork, None]:
logging.info("finding subnet for node: %s", canvas_node.core_node.name)
canvas = self.app.canvas
cidr = None

View file

@ -3,8 +3,10 @@ The actions taken when each menubar option is clicked
"""
import logging
import tkinter as tk
import webbrowser
from tkinter import filedialog, messagebox
from typing import TYPE_CHECKING
from core.gui.appconfig import XMLS_PATH
from core.gui.dialogs.about import AboutDialog
@ -19,29 +21,24 @@ from core.gui.dialogs.sessions import SessionsDialog
from core.gui.dialogs.throughput import ThroughputDialog
from core.gui.task import BackgroundTask
if TYPE_CHECKING:
from core.gui.app import Application
class MenuAction:
"""
Actions performed when choosing menu items
"""
def __init__(self, app, master):
def __init__(self, app: "Application", master: tk.Tk):
self.master = master
self.app = app
self.canvas = app.canvas
def cleanup_old_session(self, quitapp=False):
def cleanup_old_session(self):
logging.info("cleaning up old session")
self.app.core.stop_session()
self.app.core.delete_session()
# if quitapp:
# self.app.quit()
def prompt_save_running_session(self, quitapp=False):
def prompt_save_running_session(self, quitapp: bool = False):
"""
Prompt use to stop running session before application is closed
:return: nothing
"""
result = True
if self.app.core.is_runtime():
@ -56,15 +53,13 @@ class MenuAction:
elif quitapp:
self.app.quit()
def on_quit(self, event=None):
def on_quit(self, event: tk.Event = None):
"""
Prompt user whether so save running session, and then close the application
:return: nothing
"""
self.prompt_save_running_session(quitapp=True)
def file_save_as_xml(self, event=None):
def file_save_as_xml(self, event: tk.Event = None):
logging.info("menuaction.py file_save_as_xml()")
file_path = filedialog.asksaveasfilename(
initialdir=str(XMLS_PATH),
@ -75,7 +70,7 @@ class MenuAction:
if file_path:
self.app.core.save_xml(file_path)
def file_open_xml(self, event=None):
def file_open_xml(self, event: tk.Event = None):
logging.info("menuaction.py file_open_xml()")
file_path = filedialog.askopenfilename(
initialdir=str(XMLS_PATH),
@ -141,11 +136,11 @@ class MenuAction:
else:
self.app.core.cancel_throughputs()
def copy(self, event=None):
def copy(self, event: tk.Event = None):
logging.debug("copy")
self.app.canvas.copy()
def paste(self, event=None):
def paste(self, event: tk.Event = None):
logging.debug("paste")
self.app.canvas.paste()

View file

@ -1,22 +1,22 @@
import tkinter as tk
from functools import partial
from typing import TYPE_CHECKING
import core.gui.menuaction as action
from core.gui.coreclient import OBSERVERS
if TYPE_CHECKING:
from core.gui.app import Application
class Menubar(tk.Menu):
"""
Core menubar
"""
def __init__(self, master, app, cnf={}, **kwargs):
def __init__(self, master: tk.Tk, app: "Application", cnf={}, **kwargs):
"""
Create a CoreMenubar instance
:param master:
:param tkinter.Menu menubar: menubar object
:param coretk.app.Application app: application object
"""
super().__init__(master, cnf, **kwargs)
self.master.config(menu=self)
@ -27,8 +27,6 @@ class Menubar(tk.Menu):
def draw(self):
"""
Create core menubar and bind the hot keys to their matching command
:return: nothing
"""
self.draw_file_menu()
self.draw_edit_menu()
@ -42,8 +40,6 @@ class Menubar(tk.Menu):
def draw_file_menu(self):
"""
Create file menu
:return: nothing
"""
menu = tk.Menu(self)
menu.add_command(
@ -81,8 +77,6 @@ class Menubar(tk.Menu):
def draw_edit_menu(self):
"""
Create edit menu
:return: nothing
"""
menu = tk.Menu(self)
menu.add_command(label="Preferences", command=self.menuaction.gui_preferences)
@ -112,8 +106,6 @@ class Menubar(tk.Menu):
def draw_canvas_menu(self):
"""
Create canvas menu
:return: nothing
"""
menu = tk.Menu(self)
menu.add_command(
@ -136,8 +128,6 @@ class Menubar(tk.Menu):
def draw_view_menu(self):
"""
Create view menu
:return: nothing
"""
view_menu = tk.Menu(self)
self.create_show_menu(view_menu)
@ -149,12 +139,9 @@ class Menubar(tk.Menu):
view_menu.add_command(label="Zoom out", accelerator="-", state=tk.DISABLED)
self.add_cascade(label="View", menu=view_menu)
def create_show_menu(self, view_menu):
def create_show_menu(self, view_menu: tk.Menu):
"""
Create the menu items in View/Show
:param tkinter.Menu view_menu: the view menu
:return: nothing
"""
menu = tk.Menu(view_menu)
menu.add_command(label="All", state=tk.DISABLED)
@ -169,12 +156,9 @@ class Menubar(tk.Menu):
menu.add_command(label="API Messages", state=tk.DISABLED)
view_menu.add_cascade(label="Show", menu=menu)
def create_experimental_menu(self, tools_menu):
def create_experimental_menu(self, tools_menu: tk.Menu):
"""
Create experimental menu item and the sub menu items inside
:param tkinter.Menu tools_menu: tools menu
:return: nothing
"""
menu = tk.Menu(tools_menu)
menu.add_command(label="Plugins...", state=tk.DISABLED)
@ -182,12 +166,9 @@ class Menubar(tk.Menu):
menu.add_command(label="Topology partitioning...", state=tk.DISABLED)
tools_menu.add_cascade(label="Experimental", menu=menu)
def create_random_menu(self, topology_generator_menu):
def create_random_menu(self, topology_generator_menu: tk.Menu):
"""
Create random menu item and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology generator menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
# list of number of random nodes to create
@ -197,12 +178,9 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Random", menu=menu)
def create_grid_menu(self, topology_generator_menu):
def create_grid_menu(self, topology_generator_menu: tk.Menu):
"""
Create grid menu item and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology_generator_menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
# list of number of nodes to create
@ -212,12 +190,9 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Grid", menu=menu)
def create_connected_grid_menu(self, topology_generator_menu):
def create_connected_grid_menu(self, topology_generator_menu: tk.Menu):
"""
Create connected grid menu items and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology generator menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
for i in range(1, 11, 1):
@ -229,12 +204,9 @@ class Menubar(tk.Menu):
menu.add_cascade(label=label, menu=submenu)
topology_generator_menu.add_cascade(label="Connected Grid", menu=menu)
def create_chain_menu(self, topology_generator_menu):
def create_chain_menu(self, topology_generator_menu: tk.Menu):
"""
Create chain menu item and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology generator menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
# number of nodes to create
@ -244,12 +216,9 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Chain", menu=menu)
def create_star_menu(self, topology_generator_menu):
def create_star_menu(self, topology_generator_menu: tk.Menu):
"""
Create star menu item and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology generator menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
for i in range(3, 26, 1):
@ -257,12 +226,9 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Star", menu=menu)
def create_cycle_menu(self, topology_generator_menu):
def create_cycle_menu(self, topology_generator_menu: tk.Menu):
"""
Create cycle menu item and the sub items inside
:param tkinter.Menu topology_generator_menu: topology generator menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
for i in range(3, 25, 1):
@ -270,12 +236,9 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Cycle", menu=menu)
def create_wheel_menu(self, topology_generator_menu):
def create_wheel_menu(self, topology_generator_menu: tk.Menu):
"""
Create wheel menu item and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology generator menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
for i in range(4, 26, 1):
@ -283,12 +246,9 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Wheel", menu=menu)
def create_cube_menu(self, topology_generator_menu):
def create_cube_menu(self, topology_generator_menu: tk.Menu):
"""
Create cube menu item and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology generator menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
for i in range(2, 7, 1):
@ -296,12 +256,9 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Cube", menu=menu)
def create_clique_menu(self, topology_generator_menu):
def create_clique_menu(self, topology_generator_menu: tk.Menu):
"""
Create clique menu item and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology generator menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
for i in range(3, 25, 1):
@ -309,12 +266,9 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Clique", menu=menu)
def create_bipartite_menu(self, topology_generator_menu):
def create_bipartite_menu(self, topology_generator_menu: tk.Menu):
"""
Create bipartite menu item and the sub menu items inside
:param tkinter.Menu topology_generator_menu: topology_generator_menu
:return: nothing
"""
menu = tk.Menu(topology_generator_menu)
temp = 24
@ -328,13 +282,9 @@ class Menubar(tk.Menu):
temp = temp - 1
topology_generator_menu.add_cascade(label="Bipartite", menu=menu)
def create_topology_generator_menu(self, tools_menu):
def create_topology_generator_menu(self, tools_menu: tk.Menu):
"""
Create topology menu item and its sub menu items
:param tkinter.Menu tools_menu: tools menu
:return: nothing
"""
menu = tk.Menu(tools_menu)
self.create_random_menu(menu)
@ -352,8 +302,6 @@ class Menubar(tk.Menu):
def draw_tools_menu(self):
"""
Create tools menu
:return: nothing
"""
menu = tk.Menu(self)
menu.add_command(label="Auto rearrange all", state=tk.DISABLED)
@ -371,12 +319,9 @@ class Menubar(tk.Menu):
menu.add_command(label="Debugger...", state=tk.DISABLED)
self.add_cascade(label="Tools", menu=menu)
def create_observer_widgets_menu(self, widget_menu):
def create_observer_widgets_menu(self, widget_menu: tk.Menu):
"""
Create observer widget menu item and create the sub menu items inside
:param tkinter.Menu widget_menu: widget_menu
:return: nothing
"""
var = tk.StringVar(value="none")
menu = tk.Menu(widget_menu)
@ -409,12 +354,9 @@ class Menubar(tk.Menu):
)
widget_menu.add_cascade(label="Observer Widgets", menu=menu)
def create_adjacency_menu(self, widget_menu):
def create_adjacency_menu(self, widget_menu: tk.Menu):
"""
Create adjacency menu item and the sub menu items inside
:param tkinter.Menu widget_menu: widget menu
:return: nothing
"""
menu = tk.Menu(widget_menu)
menu.add_command(label="OSPFv2", state=tk.DISABLED)
@ -426,8 +368,6 @@ class Menubar(tk.Menu):
def draw_widgets_menu(self):
"""
Create widget menu
:return: nothing
"""
menu = tk.Menu(self)
self.create_observer_widgets_menu(menu)
@ -443,8 +383,6 @@ class Menubar(tk.Menu):
def draw_session_menu(self):
"""
Create session menu
:return: nothing
"""
menu = tk.Menu(self)
menu.add_command(
@ -461,8 +399,6 @@ class Menubar(tk.Menu):
def draw_help_menu(self):
"""
Create help menu
:return: nothing
"""
menu = tk.Menu(self)
menu.add_command(

View file

@ -1,22 +1,34 @@
from typing import TYPE_CHECKING, Optional, Set
from core.api.grpc.core_pb2 import NodeType
from core.gui.images import ImageEnum, Images
if TYPE_CHECKING:
from core.api.grpc import core_pb2
ICON_SIZE = 48
ANTENNA_SIZE = 32
class NodeDraw:
def __init__(self):
self.custom = False
self.custom: bool = False
self.image = None
self.image_enum = None
self.image_enum: Optional[ImageEnum] = None
self.image_file = None
self.node_type = None
self.model = None
self.services = set()
self.node_type: core_pb2.NodeType = None
self.model: Optional[str] = None
self.services: Set[str] = set()
@classmethod
def from_setup(cls, image_enum, node_type, label, model=None, tooltip=None):
def from_setup(
cls,
image_enum: ImageEnum,
node_type: "core_pb2.NodeType",
label: str,
model: str = None,
tooltip=None,
):
node_draw = NodeDraw()
node_draw.image_enum = image_enum
node_draw.image = Images.get(image_enum, ICON_SIZE)
@ -27,7 +39,7 @@ class NodeDraw:
return node_draw
@classmethod
def from_custom(cls, name, image_file, services):
def from_custom(cls, name: str, image_file: str, services: Set[str]):
node_draw = NodeDraw()
node_draw.custom = True
node_draw.image_file = image_file
@ -53,31 +65,31 @@ class NodeUtils:
ANTENNA_ICON = None
@classmethod
def is_ignore_node(cls, node_type):
def is_ignore_node(cls, node_type: NodeType) -> bool:
return node_type in cls.IGNORE_NODES
@classmethod
def is_container_node(cls, node_type):
def is_container_node(cls, node_type: NodeType) -> bool:
return node_type in cls.CONTAINER_NODES
@classmethod
def is_model_node(cls, node_type):
def is_model_node(cls, node_type: NodeType) -> bool:
return node_type == NodeType.DEFAULT
@classmethod
def is_image_node(cls, node_type):
def is_image_node(cls, node_type: NodeType) -> bool:
return node_type in cls.IMAGE_NODES
@classmethod
def is_wireless_node(cls, node_type):
def is_wireless_node(cls, node_type: NodeType) -> bool:
return node_type in cls.WIRELESS_NODES
@classmethod
def is_rj45_node(cls, node_type):
def is_rj45_node(cls, node_type: NodeType) -> bool:
return node_type in cls.RJ45_NODES
@classmethod
def node_icon(cls, node_type, model):
def node_icon(cls, node_type: NodeType, model: str) -> bool:
if model == "":
model = None
return cls.NODE_ICONS[(node_type, model)]

View file

@ -1,13 +1,19 @@
"status bar"
"""
status bar
"""
import tkinter as tk
from tkinter import ttk
from typing import TYPE_CHECKING
from core.gui.dialogs.alerts import AlertsDialog
from core.gui.themes import Styles
if TYPE_CHECKING:
from core.gui.app import Application
class StatusBar(ttk.Frame):
def __init__(self, master, app, **kwargs):
def __init__(self, master: "Application", app: "Application", **kwargs):
super().__init__(master, **kwargs)
self.app = app
self.status = None
@ -68,9 +74,5 @@ class StatusBar(ttk.Frame):
dialog = AlertsDialog(self.app, self.app)
dialog.show()
def set_status(self, message):
def set_status(self, message: str):
self.statusvar.set(message)
def stop_session_callback(self, cleanup_time):
self.progress_bar.stop()
self.statusvar.set(f"Stopped in {cleanup_time:.3f} seconds")

View file

@ -1,9 +1,10 @@
import logging
import threading
from typing import Any, Callable
class BackgroundTask:
def __init__(self, master, task, callback=None, args=()):
def __init__(self, master: Any, task: Callable, callback: Callable = None, args=()):
self.master = master
self.args = args
self.task = task

View file

@ -33,7 +33,7 @@ class Colors:
listboxbg = "#f2f1f0"
def load(style):
def load(style: ttk.Style):
style.theme_create(
THEME_DARK,
"clam",
@ -141,13 +141,13 @@ def load(style):
)
def theme_change_menu(event):
def theme_change_menu(event: tk.Event):
if not isinstance(event.widget, tk.Menu):
return
style_menu(event.widget)
def style_menu(widget):
def style_menu(widget: tk.Widget):
style = ttk.Style()
bg = style.lookup(".", "background")
fg = style.lookup(".", "foreground")
@ -159,7 +159,7 @@ def style_menu(widget):
)
def style_listbox(widget):
def style_listbox(widget: tk.Widget):
style = ttk.Style()
bg = style.lookup(".", "background")
fg = style.lookup(".", "foreground")
@ -176,7 +176,7 @@ def style_listbox(widget):
)
def theme_change(event):
def theme_change(event: tk.Event):
style = ttk.Style()
style.configure(Styles.picker_button, font=("TkDefaultFont", 8, "normal"))
style.configure(

View file

@ -4,17 +4,23 @@ import tkinter as tk
from functools import partial
from tkinter import messagebox, ttk
from tkinter.font import Font
from typing import TYPE_CHECKING, Callable
from core.api.grpc import core_pb2
from core.gui.dialogs.customnodes import CustomNodesDialog
from core.gui.dialogs.marker import MarkerDialog
from core.gui.graph.enums import GraphMode
from core.gui.graph.shapeutils import ShapeType, is_marker
from core.gui.images import ImageEnum, Images
from core.gui.nodeutils import NodeUtils
from core.gui.nodeutils import NodeDraw, NodeUtils
from core.gui.task import BackgroundTask
from core.gui.themes import Styles
from core.gui.tooltip import Tooltip
if TYPE_CHECKING:
from core.gui.app import Application
from PIL import ImageTk
TOOLBAR_SIZE = 32
PICKER_SIZE = 24
@ -28,11 +34,9 @@ class Toolbar(ttk.Frame):
Core toolbar class
"""
def __init__(self, master, app, **kwargs):
def __init__(self, master: "Application", app: "Application", **kwargs):
"""
Create a CoreToolbar instance
:param tkinter.Frame edit_frame: edit frame
"""
super().__init__(master, **kwargs)
self.app = app
@ -100,7 +104,7 @@ class Toolbar(ttk.Frame):
self.create_network_button()
self.create_annotation_button()
def design_select(self, button):
def design_select(self, button: ttk.Button):
logging.info("selecting design button: %s", button)
self.select_button.state(["!pressed"])
self.link_button.state(["!pressed"])
@ -109,7 +113,7 @@ class Toolbar(ttk.Frame):
self.annotation_button.state(["!pressed"])
button.state(["pressed"])
def runtime_select(self, button):
def runtime_select(self, button: ttk.Button):
logging.info("selecting runtime button: %s", button)
self.runtime_select_button.state(["!pressed"])
self.stop_button.state(["!pressed"])
@ -185,7 +189,7 @@ class Toolbar(ttk.Frame):
0, lambda: self.show_picker(self.node_button, self.node_picker)
)
def show_picker(self, button, picker):
def show_picker(self, button: ttk.Button, picker: ttk.Frame):
x = self.winfo_width() + 1
y = button.winfo_rooty() - picker.master.winfo_rooty() - 1
picker.place(x=x, y=y)
@ -195,7 +199,9 @@ class Toolbar(ttk.Frame):
self.wait_window(picker)
self.app.unbind_all("<ButtonRelease-1>")
def create_picker_button(self, image, func, frame, label):
def create_picker_button(
self, image: "ImageTk.PhotoImage", func: Callable, frame: ttk.Frame, label: str
):
"""
Create button and put it on the frame
@ -203,7 +209,6 @@ class Toolbar(ttk.Frame):
:param func: the command that is executed when button is clicked
:param tkinter.Frame frame: frame that contains the button
:param str label: button label
:return: nothing
"""
button = ttk.Button(
frame, image=image, text=label, compound=tk.TOP, style=Styles.picker_button
@ -212,7 +217,13 @@ class Toolbar(ttk.Frame):
button.bind("<ButtonRelease-1>", lambda e: func())
button.grid(pady=1)
def create_button(self, frame, image, func, tooltip):
def create_button(
self,
frame: ttk.Frame,
image: "ImageTk.PhotoImage",
func: Callable,
tooltip: str,
):
button = ttk.Button(frame, image=image, command=func)
button.image = image
button.grid(sticky="ew")
@ -233,8 +244,6 @@ class Toolbar(ttk.Frame):
"""
Start session handler redraw buttons, send node and link messages to grpc
server.
:return: nothing
"""
self.app.canvas.hide_context()
self.app.statusbar.progress_bar.start(5)
@ -243,7 +252,7 @@ class Toolbar(ttk.Frame):
task = BackgroundTask(self, self.app.core.start_session, self.start_callback)
task.start()
def start_callback(self, response):
def start_callback(self, response: core_pb2.StartSessionResponse):
self.app.statusbar.progress_bar.stop()
total = time.perf_counter() - self.time
message = f"Start ran for {total:.3f} seconds"
@ -275,7 +284,7 @@ class Toolbar(ttk.Frame):
dialog = CustomNodesDialog(self.app, self.app)
dialog.show()
def update_button(self, button, image, node_draw):
def update_button(self, button: ttk.Button, image: "ImageTk", node_draw: NodeDraw):
logging.info("update button(%s): %s", button, node_draw)
self.hide_pickers()
button.configure(image=image)
@ -298,8 +307,6 @@ class Toolbar(ttk.Frame):
def create_node_button(self):
"""
Create network layer button
:return: nothing
"""
image = icon(ImageEnum.ROUTER)
self.node_button = ttk.Button(
@ -312,8 +319,6 @@ class Toolbar(ttk.Frame):
def draw_network_picker(self):
"""
Draw the options for link-layer button.
:return: nothing
"""
self.hide_pickers()
self.network_picker = ttk.Frame(self.master)
@ -337,8 +342,6 @@ class Toolbar(ttk.Frame):
"""
Create link-layer node button and the options that represent different
link-layer node types.
:return: nothing
"""
image = icon(ImageEnum.HUB)
self.network_button = ttk.Button(
@ -351,8 +354,6 @@ class Toolbar(ttk.Frame):
def draw_annotation_picker(self):
"""
Draw the options for marker button.
:return: nothing
"""
self.hide_pickers()
self.annotation_picker = ttk.Frame(self.master)
@ -379,8 +380,6 @@ class Toolbar(ttk.Frame):
def create_annotation_button(self):
"""
Create marker button and options that represent different marker types
:return: nothing
"""
image = icon(ImageEnum.MARKER)
self.annotation_button = ttk.Button(
@ -417,8 +416,6 @@ class Toolbar(ttk.Frame):
def click_stop(self):
"""
redraw buttons on the toolbar, send node and link messages to grpc server
:return: nothing
"""
self.app.canvas.hide_context()
self.app.statusbar.progress_bar.start(5)
@ -426,7 +423,7 @@ class Toolbar(ttk.Frame):
task = BackgroundTask(self, self.app.core.stop_session, self.stop_callback)
task.start()
def stop_callback(self, response):
def stop_callback(self, response: core_pb2.StopSessionResponse):
self.app.statusbar.progress_bar.stop()
self.set_design()
total = time.perf_counter() - self.time
@ -436,7 +433,7 @@ class Toolbar(ttk.Frame):
if not response.result:
messagebox.showerror("Stop Error", "Errors stopping session")
def update_annotation(self, image, shape_type):
def update_annotation(self, image: "ImageTk.PhotoImage", shape_type: ShapeType):
logging.info("clicked annotation: ")
self.hide_pickers()
self.annotation_button.configure(image=image)
@ -446,7 +443,7 @@ class Toolbar(ttk.Frame):
if is_marker(shape_type):
if self.marker_tool:
self.marker_tool.destroy()
self.marker_tool = MarkerDialog(self.master, self.app)
self.marker_tool = MarkerDialog(self.app, self.app)
self.marker_tool.show()
def click_run_button(self):
@ -462,7 +459,7 @@ class Toolbar(ttk.Frame):
self.app.canvas.annotation_type = ShapeType.MARKER
if self.marker_tool:
self.marker_tool.destroy()
self.marker_tool = MarkerDialog(self.master, self.app)
self.marker_tool = MarkerDialog(self.app, self.app)
self.marker_tool.show()
def click_two_node_button(self):

View file

@ -9,7 +9,7 @@ class Tooltip(object):
Create tool tip for a given widget
"""
def __init__(self, widget, text="widget info"):
def __init__(self, widget: tk.Widget, text: str = "widget info"):
self.widget = widget
self.text = text
self.widget.bind("<Enter>", self.on_enter)
@ -18,10 +18,10 @@ class Tooltip(object):
self.id = None
self.tw = None
def on_enter(self, event=None):
def on_enter(self, event: tk.Event = None):
self.schedule()
def on_leave(self, event=None):
def on_leave(self, event: tk.Event = None):
self.unschedule()
self.close(event)
@ -35,7 +35,7 @@ class Tooltip(object):
if id_:
self.widget.after_cancel(id_)
def enter(self, event=None):
def enter(self, event: tk.Event = None):
x, y, cx, cy = self.widget.bbox("insert")
x += self.widget.winfo_rootx()
y += self.widget.winfo_rooty() + 32
@ -50,6 +50,6 @@ class Tooltip(object):
label = ttk.Label(frame, text=self.text, style=Styles.tooltip)
label.grid()
def close(self, event=None):
def close(self, event: tk.Event = None):
if self.tw:
self.tw.destroy()

View file

@ -3,13 +3,17 @@ input validation
"""
import re
import tkinter as tk
from typing import TYPE_CHECKING
import netaddr
from netaddr import IPNetwork
if TYPE_CHECKING:
from core.gui.app import Application
class InputValidation:
def __init__(self, app):
def __init__(self, app: "Application"):
self.master = app.master
self.positive_int = None
self.positive_float = None
@ -27,7 +31,7 @@ class InputValidation:
self.rgb = self.master.register(self.check_rbg)
self.hex = self.master.register(self.check_hex)
def ip_focus_out(self, event):
def ip_focus_out(self, event: tk.Event):
value = event.widget.get()
try:
IPNetwork(value)
@ -35,12 +39,12 @@ class InputValidation:
event.widget.delete(0, tk.END)
event.widget.insert(tk.END, "invalid")
def focus_out(self, event, default):
def focus_out(self, event: tk.Event, default: str):
value = event.widget.get()
if value == "":
event.widget.insert(tk.END, default)
def check_positive_int(self, s):
def check_positive_int(self, s: str) -> bool:
if len(s) == 0:
return True
try:
@ -51,7 +55,7 @@ class InputValidation:
except ValueError:
return False
def check_positive_float(self, s):
def check_positive_float(self, s: str) -> bool:
if len(s) == 0:
return True
try:
@ -62,7 +66,7 @@ class InputValidation:
except ValueError:
return False
def check_node_name(self, s):
def check_node_name(self, s: str) -> bool:
if len(s) < 0:
return False
if len(s) == 0:
@ -72,7 +76,7 @@ class InputValidation:
return False
return True
def check_canvas_int(sefl, s):
def check_canvas_int(self, s: str) -> bool:
if len(s) == 0:
return True
try:
@ -83,7 +87,7 @@ class InputValidation:
except ValueError:
return False
def check_canvas_float(self, s):
def check_canvas_float(self, s: str) -> bool:
if not s:
return True
try:
@ -94,7 +98,7 @@ class InputValidation:
except ValueError:
return False
def check_ip4(self, s):
def check_ip4(self, s: str) -> bool:
if not s:
return True
pat = re.compile("^([0-9]+[.])*[0-9]*$")
@ -113,7 +117,7 @@ class InputValidation:
else:
return False
def check_rbg(self, s):
def check_rbg(self, s: str) -> bool:
if not s:
return True
if s.startswith("0") and len(s) >= 2:
@ -127,7 +131,7 @@ class InputValidation:
except ValueError:
return False
def check_hex(self, s):
def check_hex(self, s: str) -> bool:
if not s:
return True
pat = re.compile("^([#]([0-9]|[a-f])+)$|^[#]$")

View file

@ -1,12 +1,18 @@
import logging
import tkinter as tk
from functools import partial
from pathlib import PosixPath
from tkinter import filedialog, font, ttk
from typing import TYPE_CHECKING, Dict
from core.api.grpc import core_pb2
from core.gui import themes
from core.gui.themes import FRAME_PAD, PADX, PADY
if TYPE_CHECKING:
from core.gui.app import Application
from core.gui.dialogs.dialog import Dialog
INT_TYPES = {
core_pb2.ConfigOptionType.UINT8,
core_pb2.ConfigOptionType.UINT16,
@ -19,14 +25,14 @@ INT_TYPES = {
}
def file_button_click(value, parent):
def file_button_click(value: tk.StringVar, parent: tk.Widget):
file_path = filedialog.askopenfilename(title="Select File", parent=parent)
if file_path:
value.set(file_path)
class FrameScroll(ttk.Frame):
def __init__(self, master, app, _cls=ttk.Frame, **kw):
def __init__(self, master: tk.Widget, app: "Application", _cls=ttk.Frame, **kw):
super().__init__(master, **kw)
self.app = app
self.rowconfigure(0, weight=1)
@ -49,13 +55,13 @@ class FrameScroll(ttk.Frame):
self.frame.bind("<Configure>", self._configure_frame)
self.canvas.bind("<Configure>", self._configure_canvas)
def _configure_frame(self, event):
def _configure_frame(self, event: tk.Event):
req_width = self.frame.winfo_reqwidth()
if req_width != self.canvas.winfo_reqwidth():
self.canvas.configure(width=req_width)
self.canvas.configure(scrollregion=self.canvas.bbox("all"))
def _configure_canvas(self, event):
def _configure_canvas(self, event: tk.Event):
self.canvas.itemconfig(self.frame_id, width=event.width)
def clear(self):
@ -64,7 +70,13 @@ class FrameScroll(ttk.Frame):
class ConfigFrame(ttk.Notebook):
def __init__(self, master, app, config, **kw):
def __init__(
self,
master: tk.Widget,
app: "Application",
config: Dict[str, core_pb2.ConfigOption],
**kw
):
super().__init__(master, **kw)
self.app = app
self.config = config
@ -174,7 +186,7 @@ class ConfigFrame(ttk.Notebook):
class ListboxScroll(ttk.Frame):
def __init__(self, master=None, **kw):
def __init__(self, master: tk.Widget = None, **kw):
super().__init__(master, **kw)
self.columnconfigure(0, weight=1)
self.rowconfigure(0, weight=1)
@ -189,12 +201,12 @@ class ListboxScroll(ttk.Frame):
class CheckboxList(FrameScroll):
def __init__(self, master, app, clicked=None, **kw):
def __init__(self, master: ttk.Widget, app: "Application", clicked=None, **kw):
super().__init__(master, app, **kw)
self.clicked = clicked
self.frame.columnconfigure(0, weight=1)
def add(self, name, checked):
def add(self, name: str, checked: bool):
var = tk.BooleanVar(value=checked)
func = partial(self.clicked, name, var)
checkbox = ttk.Checkbutton(self.frame, text=name, variable=var, command=func)
@ -207,7 +219,7 @@ class CodeFont(font.Font):
class CodeText(ttk.Frame):
def __init__(self, master, **kwargs):
def __init__(self, master: tk.Widget, **kwargs):
super().__init__(master, **kwargs)
self.rowconfigure(0, weight=1)
self.columnconfigure(0, weight=1)
@ -231,14 +243,14 @@ class CodeText(ttk.Frame):
class Spinbox(ttk.Entry):
def __init__(self, master=None, **kwargs):
def __init__(self, master: tk.Widget = None, **kwargs):
super().__init__(master, "ttk::spinbox", **kwargs)
def set(self, value):
self.tk.call(self._w, "set", value)
def image_chooser(parent, path):
def image_chooser(parent: "Dialog", path: PosixPath):
return filedialog.askopenfilename(
parent=parent,
initialdir=str(path),

View file

@ -0,0 +1,9 @@
"""
corens3
Python package containing CORE components for use
with the ns-3 simulator.
See http://code.google.com/p/coreemu/
for more information on CORE.
"""

View file

@ -0,0 +1,550 @@
"""
ns3.py: defines classes for running emulations with ns-3 simulated networks.
"""
import logging
import subprocess
import threading
import time
import ns.core
import ns.internet
import ns.lte
import ns.mobility
import ns.network
import ns.tap_bridge
import ns.wifi
import ns.wimax
from core import constants
from core.emulator.enumerations import EventTypes
from core.emulator.enumerations import LinkTypes
from core.emulator.enumerations import NodeTypes
from core.utils import make_tuple
from core.location.mobility import WayPointMobility
from core.nodes.base import CoreNode, CoreNetworkBase
from core.emulator.session import Session
ns.core.GlobalValue.Bind(
"SimulatorImplementationType",
ns.core.StringValue("ns3::RealtimeSimulatorImpl")
)
ns.core.GlobalValue.Bind(
"ChecksumEnabled",
ns.core.BooleanValue("true")
)
class CoreNs3Node(CoreNode, ns.network.Node):
"""
The CoreNs3Node is both a CoreNode backed by a network namespace and
an ns-3 Node simulator object. When linked to simulated networks, the TunTap
device will be used.
"""
def __init__(self, *args, **kwds):
ns.network.Node.__init__(self)
# ns-3 ID starts at 0, CORE uses 1
_id = self.GetId() + 1
if '_id' not in kwds:
kwds['_id'] = _id
CoreNode.__init__(self, *args, **kwds)
def newnetif(self, net=None, addrlist=None, hwaddr=None, ifindex=None, ifname=None):
"""
Add a network interface. If we are attaching to a CoreNs3Net, this
will be a TunTap. Otherwise dispatch to CoreNode.newnetif().
"""
if not addrlist:
addrlist = []
if not isinstance(net, CoreNs3Net):
return CoreNode.newnetif(self, net, addrlist, hwaddr, ifindex, ifname)
ifindex = self.newtuntap(ifindex, ifname)
self.attachnet(ifindex, net)
netif = self.netif(ifindex)
netif.sethwaddr(hwaddr)
for addr in make_tuple(addrlist):
netif.addaddr(addr)
addrstr = netif.addrlist[0]
addr, mask = addrstr.split('/')
tap = net._tapdevs[netif]
tap.SetAttribute(
"IpAddress",
ns.network.Ipv4AddressValue(ns.network.Ipv4Address(addr))
)
tap.SetAttribute(
"Netmask",
ns.network.Ipv4MaskValue(ns.network.Ipv4Mask(f"/{mask}"))
)
ns.core.Simulator.Schedule(ns.core.Time("0"), netif.install)
return ifindex
def getns3position(self):
"""
Return the ns-3 (x, y, z) position of a node.
"""
try:
mm = self.GetObject(ns.mobility.MobilityModel.GetTypeId())
pos = mm.GetPosition()
return pos.x, pos.y, pos.z
except AttributeError:
self.warn("ns-3 mobility model not found")
return 0, 0, 0
def setns3position(self, x, y, z):
"""
Set the ns-3 (x, y, z) position of a node.
"""
try:
mm = self.GetObject(ns.mobility.MobilityModel.GetTypeId())
if z is None:
z = 0.0
mm.SetPosition(ns.core.Vector(x, y, z))
except AttributeError:
self.warn("ns-3 mobility model not found, not setting position")
class CoreNs3Net(CoreNetworkBase):
"""
The CoreNs3Net is a helper PyCoreNet object. Networks are represented
entirely in simulation with the TunTap device bridging the emulated and
simulated worlds.
"""
apitype = NodeTypes.WIRELESS_LAN.value
linktype = LinkTypes.WIRELESS.value
# icon used
type = "wlan"
def __init__(
self, session, _id=None, name=None, start=True, server=None
):
CoreNetworkBase.__init__(self, session, _id, name, start, server)
self.tapbridge = ns.tap_bridge.TapBridgeHelper()
self._ns3devs = {}
self._tapdevs = {}
def attach(self, netif):
"""
Invoked from netif.attach(). Create a TAP device using the TapBridge
object. Call getns3dev() to get model-specific device.
"""
self._netif[netif] = netif
self._linked[netif] = {}
ns3dev = self.getns3dev(netif.node)
tap = self.tapbridge.Install(netif.node, ns3dev)
tap.SetMode(ns.tap_bridge.TapBridge.CONFIGURE_LOCAL)
tap.SetAttribute(
"DeviceName",
ns.core.StringValue(netif.localname)
)
self._ns3devs[netif] = ns3dev
self._tapdevs[netif] = tap
def getns3dev(self, node):
"""
Implement depending on network helper. Install this network onto
the given node and return the device. Register the ns3 device into
self._ns3devs
"""
raise NotImplementedError
def findns3dev(self, node):
"""
Given a node, return the interface and ns3 device associated with
this network.
"""
for netif in node.netifs():
if netif in self._ns3devs:
return netif, self._ns3devs[netif]
return None, None
def shutdown(self):
"""
Session.shutdown() will invoke this.
"""
pass
def usecorepositions(self):
"""
Set position callbacks for interfaces on this net so the CORE GUI
can update the ns-3 node position when moved with the mouse.
"""
for netif in self.netifs():
netif.poshook = self.setns3position
def setns3position(self, netif, x, y, z):
logging.info("setns3position: %s (%s, %s, %s)", netif.node.name, x, y, z)
netif.node.setns3position(x, y, z)
class Ns3LteNet(CoreNs3Net):
def __init__(self, *args, **kwds):
"""
Uses a LteHelper to create an ns-3 based LTE network.
"""
CoreNs3Net.__init__(self, *args, **kwds)
self.lte = ns.lte.LteHelper()
# enhanced NodeB node list
self.enbnodes = []
self.dlsubchannels = None
self.ulsubchannels = None
def setsubchannels(self, downlink, uplink):
"""
Set the downlink/uplink subchannels, which are a list of ints.
These should be set prior to using CoreNs3Node.newnetif().
"""
self.dlsubchannels = downlink
self.ulsubchannels = uplink
def setnodeb(self, node):
"""
Mark the given node as a nodeb (base transceiver station)
"""
self.enbnodes.append(node)
def linknodeb(self, node, nodeb, mob, mobb):
"""
Register user equipment with a nodeb.
Optionally install mobility model while we have the ns-3 devs handy.
"""
_tmp, nodebdev = self.findns3dev(nodeb)
_tmp, dev = self.findns3dev(node)
if nodebdev is None or dev is None:
raise KeyError("ns-3 device for node not found")
self.lte.RegisterUeToTheEnb(dev, nodebdev)
if mob:
self.lte.AddMobility(dev.GetPhy(), mob)
if mobb:
self.lte.AddDownlinkChannelRealization(mobb, mob, dev.GetPhy())
def getns3dev(self, node):
"""
Get the ns3 NetDevice using the LteHelper.
"""
if node in self.enbnodes:
devtype = ns.lte.LteHelper.DEVICE_TYPE_ENODEB
else:
devtype = ns.lte.LteHelper.DEVICE_TYPE_USER_EQUIPMENT
nodes = ns.network.NodeContainer(node)
devs = self.lte.Install(nodes, devtype)
devs.Get(0).GetPhy().SetDownlinkSubChannels(self.dlsubchannels)
devs.Get(0).GetPhy().SetUplinkSubChannels(self.ulsubchannels)
return devs.Get(0)
def attach(self, netif):
"""
Invoked from netif.attach(). Create a TAP device using the TapBridge
object. Call getns3dev() to get model-specific device.
"""
self._netif[netif] = netif
self._linked[netif] = {}
ns3dev = self.getns3dev(netif.node)
self.tapbridge.SetAttribute("Mode", ns.core.StringValue("UseLocal"))
# self.tapbridge.SetAttribute("Mode",
# ns.core.IntegerValue(ns.tap_bridge.TapBridge.USE_LOCAL))
tap = self.tapbridge.Install(netif.node, ns3dev)
# tap.SetMode(ns.tap_bridge.TapBridge.USE_LOCAL)
logging.info("using TAP device %s for %s/%s", netif.localname, netif.node.name, netif.name)
subprocess.check_call(['tunctl', '-t', netif.localname, '-n'])
# check_call([IP_BIN, 'link', 'set', 'dev', netif.localname, \
# 'address', '%s' % netif.hwaddr])
subprocess.check_call([constants.IP_BIN, 'link', 'set', netif.localname, 'up'])
tap.SetAttribute("DeviceName", ns.core.StringValue(netif.localname))
self._ns3devs[netif] = ns3dev
self._tapdevs[netif] = tap
class Ns3WifiNet(CoreNs3Net):
def __init__(self, *args, **kwds):
"""
Uses a WifiHelper to create an ns-3 based Wifi network.
"""
rate = kwds.pop('rate', 'OfdmRate54Mbps')
CoreNs3Net.__init__(self, *args, **kwds)
self.wifi = ns.wifi.WifiHelper().Default()
self.wifi.SetStandard(ns.wifi.WIFI_PHY_STANDARD_80211a)
self.wifi.SetRemoteStationManager(
"ns3::ConstantRateWifiManager",
"DataMode",
ns.core.StringValue(rate),
"NonUnicastMode",
ns.core.StringValue(rate)
)
self.mac = ns.wifi.NqosWifiMacHelper.Default()
self.mac.SetType("ns3::AdhocWifiMac")
channel = ns.wifi.YansWifiChannelHelper.Default()
self.phy = ns.wifi.YansWifiPhyHelper.Default()
self.phy.SetChannel(channel.Create())
def getns3dev(self, node):
"""
Get the ns3 NetDevice using the WifiHelper.
"""
devs = self.wifi.Install(self.phy, self.mac, node)
return devs.Get(0)
class Ns3WimaxNet(CoreNs3Net):
def __init__(self, *args, **kwds):
CoreNs3Net.__init__(self, *args, **kwds)
self.wimax = ns.wimax.WimaxHelper()
self.scheduler = ns.wimax.WimaxHelper.SCHED_TYPE_SIMPLE
self.phy = ns.wimax.WimaxHelper.SIMPLE_PHY_TYPE_OFDM
# base station node list
self.bsnodes = []
def setbasestation(self, node):
self.bsnodes.append(node)
def getns3dev(self, node):
if node in self.bsnodes:
devtype = ns.wimax.WimaxHelper.DEVICE_TYPE_BASE_STATION
else:
devtype = ns.wimax.WimaxHelper.DEVICE_TYPE_SUBSCRIBER_STATION
nodes = ns.network.NodeContainer(node)
devs = self.wimax.Install(nodes, devtype, self.phy, self.scheduler)
if node not in self.bsnodes:
devs.Get(0).SetModulationType(ns.wimax.WimaxPhy.MODULATION_TYPE_QAM16_12)
# debug
self.wimax.EnableAscii("wimax-device-%s" % node.name, devs)
return devs.Get(0)
@staticmethod
def ipv4netifaddr(netif):
for addr in netif.addrlist:
if ':' in addr:
# skip ipv6
continue
ip = ns.network.Ipv4Address(addr.split('/')[0])
mask = ns.network.Ipv4Mask('/' + addr.split('/')[1])
return ip, mask
return None, None
def addflow(self, node1, node2, upclass, downclass):
"""
Add a Wimax service flow between two nodes.
"""
netif1, ns3dev1 = self.findns3dev(node1)
netif2, ns3dev2 = self.findns3dev(node2)
if not netif1 or not netif2:
raise ValueError("interface not found")
addr1, mask1 = self.ipv4netifaddr(netif1)
addr2, mask2 = self.ipv4netifaddr(netif2)
clargs1 = (addr1, mask1, addr2, mask2) + downclass
clargs2 = (addr2, mask2, addr1, mask1) + upclass
clrec1 = ns.wimax.IpcsClassifierRecord(*clargs1)
clrec2 = ns.wimax.IpcsClassifierRecord(*clargs2)
ns3dev1.AddServiceFlow(self.wimax.CreateServiceFlow(
ns.wimax.ServiceFlow.SF_DIRECTION_DOWN,
ns.wimax.ServiceFlow.SF_TYPE_RTPS, clrec1)
)
ns3dev1.AddServiceFlow(self.wimax.CreateServiceFlow(
ns.wimax.ServiceFlow.SF_DIRECTION_UP,
ns.wimax.ServiceFlow.SF_TYPE_RTPS, clrec2)
)
ns3dev2.AddServiceFlow(self.wimax.CreateServiceFlow(
ns.wimax.ServiceFlow.SF_DIRECTION_DOWN,
ns.wimax.ServiceFlow.SF_TYPE_RTPS, clrec2)
)
ns3dev2.AddServiceFlow(self.wimax.CreateServiceFlow(
ns.wimax.ServiceFlow.SF_DIRECTION_UP,
ns.wimax.ServiceFlow.SF_TYPE_RTPS, clrec1)
)
class Ns3Session(Session):
"""
A Session that starts an ns-3 simulation thread.
"""
def __init__(self, _id, persistent=False, duration=600):
self.duration = duration
self.nodes = ns.network.NodeContainer()
self.mobhelper = ns.mobility.MobilityHelper()
Session.__init__(self, _id)
def run(self, vis=False):
"""
Run the ns-3 simulation and return the simulator thread.
"""
def runthread():
ns.core.Simulator.Stop(ns.core.Seconds(self.duration))
logging.info("running ns-3 simulation for %d seconds", self.duration)
if vis:
try:
import visualizer
except ImportError:
logging.exception("visualizer is not available")
ns.core.Simulator.Run()
else:
visualizer.start()
else:
ns.core.Simulator.Run()
# self.evq.run() # event queue may have WayPointMobility events
self.set_state(EventTypes.RUNTIME_STATE, send_event=True)
t = threading.Thread(target=runthread)
t.daemon = True
t.start()
return t
def shutdown(self):
# TODO: the following line tends to segfault ns-3 (and therefore core-daemon)
ns.core.Simulator.Destroy()
Session.shutdown(self)
def addnode(self, name):
"""
A convenience helper for Session.addobj(), for adding CoreNs3Nodes
to this session. Keeps a NodeContainer for later use.
"""
n = self.create_node(cls=CoreNs3Node, name=name)
self.nodes.Add(n)
return n
def setupconstantmobility(self):
"""
Install a ConstantPositionMobilityModel.
"""
palloc = ns.mobility.ListPositionAllocator()
for i in xrange(self.nodes.GetN()):
(x, y, z) = ((100.0 * i) + 50, 200.0, 0.0)
palloc.Add(ns.core.Vector(x, y, z))
node = self.nodes.Get(i)
node.position.set(x, y, z)
self.mobhelper.SetPositionAllocator(palloc)
self.mobhelper.SetMobilityModel("ns3::ConstantPositionMobilityModel")
self.mobhelper.Install(self.nodes)
def setuprandomwalkmobility(self, bounds, time=10, speed=25.0):
"""
Set up the random walk mobility model within a bounding box.
- bounds is the max (x, y, z) boundary
- time is the number of seconds to maintain the current speed
and direction
- speed is the maximum speed, with node speed randomly chosen
from [0, speed]
"""
x, y, z = map(float, bounds)
self.mobhelper.SetPositionAllocator(
"ns3::RandomBoxPositionAllocator",
"X",
ns.core.StringValue("ns3::UniformRandomVariable[Min=0|Max=%s]" % x),
"Y",
ns.core.StringValue("ns3::UniformRandomVariable[Min=0|Max=%s]" % y),
"Z",
ns.core.StringValue("ns3::UniformRandomVariable[Min=0|Max=%s]" % z)
)
self.mobhelper.SetMobilityModel(
"ns3::RandomWalk2dMobilityModel",
"Mode", ns.core.StringValue("Time"),
"Time", ns.core.StringValue("%ss" % time),
"Speed",
ns.core.StringValue("ns3::UniformRandomVariable[Min=0|Max=%s]" % speed),
"Bounds", ns.core.StringValue("0|%s|0|%s" % (x, y))
)
self.mobhelper.Install(self.nodes)
def startns3mobility(self, refresh_ms=300):
"""
Start a thread that updates CORE nodes based on their ns-3
positions.
"""
self.set_state(EventTypes.INSTANTIATION_STATE)
self.mobilitythread = threading.Thread(
target=self.ns3mobilitythread,
args=(refresh_ms,))
self.mobilitythread.daemon = True
self.mobilitythread.start()
def ns3mobilitythread(self, refresh_ms):
"""
Thread target that updates CORE nodes every refresh_ms based on
their ns-3 positions.
"""
valid_states = (
EventTypes.RUNTIME_STATE.value,
EventTypes.INSTANTIATION_STATE.value
)
while self.state in valid_states:
for i in xrange(self.nodes.GetN()):
node = self.nodes.Get(i)
x, y, z = node.getns3position()
if (x, y, z) == node.position.get():
continue
# from WayPointMobility.setnodeposition(node, x, y, z)
node.position.set(x, y, z)
node_data = node.data(0)
self.broadcast_node(node_data)
self.sdt.updatenode(node.id, flags=0, x=x, y=y, z=z)
time.sleep(0.001 * refresh_ms)
def setupmobilitytracing(self, net, filename, nodes):
"""
Start a tracing thread using the ASCII output from the ns3
mobility helper.
"""
net.mobility = WayPointMobility(session=self, _id=net.id)
net.mobility.setendtime()
net.mobility.refresh_ms = 300
net.mobility.empty_queue_stop = False
of = ns.network.OutputStreamWrapper(filename, filemode=0o777)
self.mobhelper.EnableAsciiAll(of)
self.mobilitytracethread = threading.Thread(
target=self.mobilitytrace,
args=(net, filename, nodes)
)
self.mobilitytracethread.daemon = True
self.mobilitytracethread.start()
def mobilitytrace(self, net, filename, nodes, verbose):
nodemap = {}
# move nodes to initial positions
for node in nodes:
x, y, z = node.getns3position()
net.mobility.setnodeposition(node, x, y, z)
nodemap[node.GetId()] = node
logging.info("mobilitytrace opening '%s'", filename)
f = None
try:
f = open(filename)
f.seek(0, 2)
sleep = 0.001
kickstart = True
while True:
if self.state != EventTypes.RUNTIME_STATE.value:
break
line = f.readline()
if not line:
time.sleep(sleep)
if sleep < 1.0:
sleep += 0.001
continue
sleep = 0.001
items = dict(x.split("=") for x in line.split())
logging.info("trace: %s %s %s", items['node'], items['pos'], items['vel'])
x, y, z = map(float, items['pos'].split(':'))
vel = map(float, items['vel'].split(':'))
node = nodemap[int(items['node'])]
net.mobility.addwaypoint(time=0, nodenum=node.id, x=x, y=y, z=z, speed=vel)
if kickstart:
kickstart = False
self.event_loop.add_event(0, net.mobility.start)
self.event_loop.run()
else:
if net.mobility.state != net.mobility.STATE_RUNNING:
net.mobility.state = net.mobility.STATE_RUNNING
self.event_loop.add_event(0, net.mobility.runround)
except IOError:
logging.exception("mobilitytrace error opening: %s", filename)
finally:
if f:
f.close()

19
ns3/setup.py Normal file
View file

@ -0,0 +1,19 @@
import glob
from setuptools import setup
_EXAMPLES_DIR = "share/corens3/examples"
setup(
name="core-ns3",
version="5.5.2",
packages=[
"corens3",
],
data_files=[(_EXAMPLES_DIR, glob.glob("examples/*"))],
description="Python ns-3 components of CORE",
url="https://github.com/coreemu/core",
author="Boeing Research & Technology",
license="GPLv2",
long_description="Python scripts and modules for building virtual simulated networks."
)