""" Incorporate grpc into python tkinter GUI """ import getpass import json import logging import os import tkinter as tk from pathlib import Path from tkinter import messagebox from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple import grpc from core.api.grpc import client, configservices_pb2, core_pb2 from core.api.grpc.wrappers import ( ConfigOption, ConfigService, EmaneModelConfig, Event, ExceptionEvent, Link, LinkEvent, LinkType, MessageType, Node, NodeEvent, NodeServiceData, NodeType, Position, Server, ServiceConfig, ServiceFileConfig, Session, SessionLocation, SessionState, ThroughputsEvent, ) from core.gui import nodeutils as nutils from core.gui.appconfig import XMLS_PATH, CoreServer, Observer from core.gui.dialogs.emaneinstall import EmaneInstallDialog from core.gui.dialogs.mobilityplayer import MobilityPlayer from core.gui.dialogs.sessions import SessionsDialog from core.gui.graph.edges import CanvasEdge from core.gui.graph.node import CanvasNode from core.gui.interface import InterfaceManager from core.gui.nodeutils import NodeDraw logger = logging.getLogger(__name__) if TYPE_CHECKING: from core.gui.app import Application GUI_SOURCE = "gui" CPU_USAGE_DELAY = 3 def to_dict(config: Dict[str, ConfigOption]) -> Dict[str, str]: return {x: y.value for x, y in config.items()} class CoreClient: def __init__(self, app: "Application", proxy: bool) -> None: """ Create a CoreGrpc instance """ self.app: "Application" = app self.master: tk.Tk = app.master self._client: client.CoreGrpcClient = client.CoreGrpcClient(proxy=proxy) self.session: Optional[Session] = None self.user = getpass.getuser() # menu options self.show_throughputs: tk.BooleanVar = tk.BooleanVar(value=False) # global service settings self.services: Dict[str, Set[str]] = {} self.config_services_groups: Dict[str, Set[str]] = {} self.config_services: Dict[str, ConfigService] = {} # loaded configuration data self.emane_models: List[str] = [] self.servers: Dict[str, CoreServer] = {} self.custom_nodes: Dict[str, NodeDraw] = {} self.custom_observers: Dict[str, Observer] = {} self.read_config() # helpers self.iface_to_edge: Dict[Tuple[int, ...], CanvasEdge] = {} self.ifaces_manager: InterfaceManager = InterfaceManager(self.app) self.observer: Optional[str] = None # session data self.mobility_players: Dict[int, MobilityPlayer] = {} self.canvas_nodes: Dict[int, CanvasNode] = {} self.links: Dict[str, CanvasEdge] = {} self.handling_throughputs: Optional[grpc.Future] = None self.handling_cpu_usage: Optional[grpc.Future] = None self.handling_events: Optional[grpc.Future] = None @property def client(self) -> client.CoreGrpcClient: if self.session: if not self._client.check_session(self.session.id): throughputs_enabled = self.handling_throughputs is not None self.cancel_throughputs() self.cancel_events() self._client.create_session(self.session.id) self.handling_events = self._client.events( self.session.id, self.handle_events ) if throughputs_enabled: self.enable_throughputs() self.setup_cpu_usage() return self._client def set_canvas_node(self, node: Node, canvas_node: CanvasNode) -> None: self.canvas_nodes[node.id] = canvas_node def get_canvas_node(self, node_id: int) -> CanvasNode: return self.canvas_nodes[node_id] def reset(self) -> None: # helpers self.ifaces_manager.reset() self.iface_to_edge.clear() # session data self.canvas_nodes.clear() self.links.clear() self.close_mobility_players() self.mobility_players.clear() # clear streams self.cancel_throughputs() self.cancel_events() def close_mobility_players(self) -> None: for mobility_player in self.mobility_players.values(): mobility_player.close() def set_observer(self, value: Optional[str]) -> None: self.observer = value def read_config(self) -> None: # read distributed servers for server in self.app.guiconfig.servers: self.servers[server.name] = server # read custom nodes for custom_node in self.app.guiconfig.nodes: node_draw = NodeDraw.from_custom(custom_node) self.custom_nodes[custom_node.name] = node_draw # read observers for observer in self.app.guiconfig.observers: self.custom_observers[observer.name] = observer def handle_events(self, event: Event) -> None: if not self.session or event.source == GUI_SOURCE: return if event.session_id != self.session.id: logger.warning( "ignoring event session(%s) current(%s)", event.session_id, self.session.id, ) return if event.link_event: self.app.after(0, self.handle_link_event, event.link_event) elif event.session_event: logger.info("session event: %s", event) session_event = event.session_event if session_event.event <= SessionState.SHUTDOWN.value: self.session.state = SessionState(session_event.event) elif session_event.event in {7, 8, 9}: node_id = session_event.node_id dialog = self.mobility_players.get(node_id) if dialog: if session_event.event == 7: dialog.set_play() elif session_event.event == 8: dialog.set_stop() else: dialog.set_pause() else: logger.warning("unknown session event: %s", session_event) elif event.node_event: self.app.after(0, self.handle_node_event, event.node_event) elif event.config_event: logger.info("config event: %s", event) elif event.exception_event: self.handle_exception_event(event.exception_event) else: logger.info("unhandled event: %s", event) def handle_link_event(self, event: LinkEvent) -> None: logger.debug("Link event: %s", event) node1_id = event.link.node1_id node2_id = event.link.node2_id if node1_id == node2_id: logger.warning("ignoring links with loops: %s", event) return canvas_node1 = self.canvas_nodes[node1_id] canvas_node2 = self.canvas_nodes[node2_id] if event.link.type == LinkType.WIRELESS: if event.message_type == MessageType.ADD: self.app.manager.add_wireless_edge( canvas_node1, canvas_node2, event.link ) elif event.message_type == MessageType.DELETE: self.app.manager.delete_wireless_edge( canvas_node1, canvas_node2, event.link ) elif event.message_type == MessageType.NONE: self.app.manager.update_wireless_edge( canvas_node1, canvas_node2, event.link ) else: logger.warning("unknown link event: %s", event) else: if event.message_type == MessageType.ADD: self.app.manager.add_wired_edge(canvas_node1, canvas_node2, event.link) elif event.message_type == MessageType.DELETE: self.app.manager.delete_wired_edge(event.link) elif event.message_type == MessageType.NONE: self.app.manager.update_wired_edge(event.link) else: logger.warning("unknown link event: %s", event) def handle_node_event(self, event: NodeEvent) -> None: logger.debug("node event: %s", event) node = event.node if event.message_type == MessageType.NONE: canvas_node = self.canvas_nodes[node.id] x = node.position.x y = node.position.y canvas_node.move(x, y) if node.icon and node.icon != canvas_node.core_node.icon: canvas_node.update_icon(node.icon) elif event.message_type == MessageType.DELETE: canvas_node = self.canvas_nodes[node.id] canvas_node.canvas_delete() elif event.message_type == MessageType.ADD: if node.id in self.session.nodes: logger.error("core node already exists: %s", node) self.app.manager.add_core_node(node) else: logger.warning("unknown node event: %s", event) def enable_throughputs(self) -> None: if not self.handling_throughputs: self.handling_throughputs = self.client.throughputs( self.session.id, self.handle_throughputs ) def cancel_throughputs(self) -> None: if self.handling_throughputs: self.handling_throughputs.cancel() self.handling_throughputs = None self.app.manager.clear_throughputs() def cancel_events(self) -> None: if self.handling_events: self.handling_events.cancel() self.handling_events = None def cancel_cpu_usage(self) -> None: if self.handling_cpu_usage: self.handling_cpu_usage.cancel() self.handling_cpu_usage = None def setup_cpu_usage(self) -> None: if self.handling_cpu_usage and self.handling_cpu_usage.running(): return if self.handling_cpu_usage: self.handling_cpu_usage.cancel() self.handling_cpu_usage = self._client.cpu_usage( CPU_USAGE_DELAY, self.handle_cpu_event ) def handle_throughputs(self, event: ThroughputsEvent) -> None: if event.session_id != self.session.id: logger.warning( "ignoring throughput event session(%s) current(%s)", event.session_id, self.session.id, ) return logger.debug("handling throughputs event: %s", event) self.app.after(0, self.app.manager.set_throughputs, event) def handle_cpu_event(self, event: core_pb2.CpuUsageEvent) -> None: self.app.after(0, self.app.statusbar.set_cpu, event.usage) def handle_exception_event(self, event: ExceptionEvent) -> None: logger.info("exception event: %s", event) self.app.statusbar.add_alert(event) def update_session_title(self) -> None: title_file = self.session.file.name if self.session.file else "" self.master.title(f"CORE Session({self.session.id}) {title_file}") def join_session(self, session_id: int) -> None: logger.info("joining session(%s)", session_id) self.reset() try: self.session = self.client.get_session(session_id) self.session.user = self.user self.update_session_title() self.handling_events = self.client.events( self.session.id, self.handle_events ) self.ifaces_manager.joined(self.session.links) self.app.manager.join(self.session) if self.is_runtime(): self.show_mobility_players() self.app.after(0, self.app.joined_session_update) except grpc.RpcError as e: self.app.show_grpc_exception("Join Session Error", e) def is_runtime(self) -> bool: return self.session and self.session.state == SessionState.RUNTIME def create_new_session(self) -> None: """ Create a new session """ try: session = self.client.create_session() logger.info("created session: %s", session.id) self.join_session(session.id) location_config = self.app.guiconfig.location self.session.location = SessionLocation( x=location_config.x, y=location_config.y, z=location_config.z, lat=location_config.lat, lon=location_config.lon, alt=location_config.alt, scale=location_config.scale, ) except grpc.RpcError as e: self.app.show_grpc_exception("New Session Error", e) def delete_session(self, session_id: int = None) -> None: if session_id is None and not self.session: return if session_id is None: session_id = self.session.id try: response = self.client.delete_session(session_id) logger.info("deleted session(%s), Result: %s", session_id, response) except grpc.RpcError as e: self.app.show_grpc_exception("Delete Session Error", e) def setup(self, session_id: int = None) -> None: """ Query sessions, if there exist any, prompt whether to join one """ try: self.client.connect() # get current core configurations services/config services core_config = self.client.get_config() self.emane_models = sorted(core_config.emane_models) for service in core_config.services: group_services = self.services.setdefault(service.group, set()) group_services.add(service.name) for service in core_config.config_services: self.config_services[service.name] = service group_services = self.config_services_groups.setdefault( service.group, set() ) group_services.add(service.name) # join provided session, create new session, or show dialog to select an # existing session sessions = self.client.get_sessions() if session_id: session_ids = set(x.id for x in sessions) if session_id not in session_ids: self.app.show_error( "Join Session Error", f"{session_id} does not exist", blocking=True, ) self.app.close() else: self.join_session(session_id) else: if not sessions: self.create_new_session() else: dialog = SessionsDialog(self.app, True) dialog.show() except grpc.RpcError as e: logger.exception("core setup error") self.app.show_grpc_exception("Setup Error", e, blocking=True) self.app.close() def edit_node(self, core_node: Node) -> None: try: self.client.move_node( self.session.id, core_node.id, core_node.position, source=GUI_SOURCE ) except grpc.RpcError as e: self.app.show_grpc_exception("Edit Node Error", e) def get_links(self, definition: bool = False) -> List[Link]: if not definition: self.ifaces_manager.set_macs([x.link for x in self.links.values()]) links = [] for edge in self.links.values(): link = edge.link if not definition: node1 = self.session.nodes[link.node1_id] node2 = self.session.nodes[link.node2_id] if nutils.is_container(node1) and link.iface1 and not link.iface1.mac: link.iface1.mac = self.ifaces_manager.next_mac() if nutils.is_container(node2) and link.iface2 and not link.iface2.mac: link.iface2.mac = self.ifaces_manager.next_mac() links.append(link) if edge.asymmetric_link: links.append(edge.asymmetric_link) return links def start_session(self, definition: bool = False) -> Tuple[bool, List[str]]: self.session.links = self.get_links(definition) self.session.metadata = self.get_metadata() self.session.servers.clear() for server in self.servers.values(): self.session.servers.append(Server(name=server.name, host=server.address)) result = False exceptions = [] try: result, exceptions = self.client.start_session(self.session, definition) logger.info( "start session(%s) definition(%s), result: %s", self.session.id, definition, result, ) if self.show_throughputs.get(): self.enable_throughputs() except grpc.RpcError as e: self.app.show_grpc_exception("Start Session Error", e) return result, exceptions def stop_session(self, session_id: int = None) -> bool: session_id = session_id or self.session.id self.cancel_throughputs() result = False try: result = self.client.stop_session(session_id) logger.info("stopped session(%s), result: %s", session_id, result) except grpc.RpcError as e: self.app.show_grpc_exception("Stop Session Error", e) return result def show_mobility_players(self) -> None: for node in self.session.nodes.values(): if not nutils.is_mobility(node): continue if node.mobility_config: mobility_player = MobilityPlayer(self.app, node) self.mobility_players[node.id] = mobility_player mobility_player.show() def get_metadata(self) -> Dict[str, str]: # create canvas data canvas_config = self.app.manager.get_metadata() canvas_config = json.dumps(canvas_config) # create shapes data shapes = [] for canvas in self.app.manager.all(): for shape in canvas.shapes.values(): shapes.append(shape.metadata()) shapes = json.dumps(shapes) # create edges config edges_config = [] for edge in self.links.values(): if not edge.is_customized(): continue edge_config = dict(token=edge.token, width=edge.width, color=edge.color) edges_config.append(edge_config) edges_config = json.dumps(edges_config) # create hidden metadata hidden = [x.core_node.id for x in self.canvas_nodes.values() if x.hidden] hidden = json.dumps(hidden) # save metadata return dict( canvas=canvas_config, shapes=shapes, edges=edges_config, hidden=hidden ) def launch_terminal(self, node_id: int) -> None: try: terminal = self.app.guiconfig.preferences.terminal if not terminal: messagebox.showerror( "Terminal Error", "No terminal set, please set within the preferences menu", parent=self.app, ) return node_term = self.client.get_node_terminal(self.session.id, node_id) cmd = f"{terminal} {node_term} &" logger.info("launching terminal %s", cmd) os.system(cmd) except grpc.RpcError as e: self.app.show_grpc_exception("Node Terminal Error", e) def get_xml_dir(self) -> str: return str(self.session.file.parent) if self.session.file else str(XMLS_PATH) def save_xml(self, file_path: Path = None) -> bool: """ Save core session as to an xml file """ if not file_path and not self.session.file: logger.error("trying to save xml for session with no file") return False if not file_path: file_path = self.session.file result = False try: if not self.is_runtime(): logger.debug("sending session data to the daemon") result, exceptions = self.start_session(definition=True) if not result: message = "\n".join(exceptions) self.app.show_exception_data( "Session Definition Exception", "Failed to define session", message, ) self.client.save_xml(self.session.id, str(file_path)) if self.session.file != file_path: self.session.file = file_path self.update_session_title() logger.info("saved xml file %s", file_path) result = True except grpc.RpcError as e: self.app.show_grpc_exception("Save XML Error", e) return result def open_xml(self, file_path: Path) -> None: """ Open core xml """ try: result, session_id = self._client.open_xml(file_path) logger.info( "open xml file %s, result(%s) session(%s)", file_path, result, session_id, ) self.join_session(session_id) except grpc.RpcError as e: self.app.show_grpc_exception("Open XML Error", e) def get_node_service(self, node_id: int, service_name: str) -> NodeServiceData: node_service = self.client.get_node_service( self.session.id, node_id, service_name ) logger.debug( "get node(%s) service(%s): %s", node_id, service_name, node_service ) return node_service def get_node_service_file( self, node_id: int, service_name: str, file_name: str ) -> str: data = self.client.get_node_service_file( self.session.id, node_id, service_name, file_name ) logger.debug( "get service file for node(%s), service: %s, file: %s, data: %s", node_id, service_name, file_name, data, ) return data def close(self) -> None: """ Clean ups when done using grpc """ logger.debug("close grpc") self.client.close() def next_node_id(self) -> int: """ Get the next usable node id. """ i = 1 while True: if i not in self.session.nodes: break i += 1 return i def create_node( self, x: float, y: float, node_type: NodeType, model: str ) -> Optional[Node]: """ Add node, with information filled in, to grpc manager """ node_id = self.next_node_id() position = Position(x=x, y=y) image = None if nutils.has_image(node_type): image = "ubuntu:latest" emane = None if node_type == NodeType.EMANE: if not self.emane_models: dialog = EmaneInstallDialog(self.app) dialog.show() return emane = self.emane_models[0] name = f"emane{node_id}" elif node_type == NodeType.WIRELESS_LAN: name = f"wlan{node_id}" elif node_type in [NodeType.RJ45, NodeType.TUNNEL]: name = "unassigned" else: name = f"n{node_id}" node = Node( id=node_id, type=node_type, name=name, model=model, position=position, image=image, emane=emane, ) if nutils.is_custom(node): services = nutils.get_custom_services(self.app.guiconfig, model) node.config_services = set(services) # assign default services to CORE node else: services = self.session.default_services.get(model) if services: node.config_services = set(services) logger.info( "add node(%s) to session(%s), coordinates(%s, %s)", node.name, self.session.id, x, y, ) self.session.nodes[node.id] = node return node def deleted_canvas_nodes(self, canvas_nodes: List[CanvasNode]) -> None: """ remove the nodes selected by the user and anything related to that node such as link, configurations, interfaces """ for canvas_node in canvas_nodes: node = canvas_node.core_node del self.canvas_nodes[node.id] del self.session.nodes[node.id] def deleted_canvas_edges(self, edges: Iterable[CanvasEdge]) -> None: links = [] for edge in edges: del self.links[edge.token] links.append(edge.link) self.ifaces_manager.removed(links) def save_edge(self, edge: CanvasEdge) -> None: self.links[edge.token] = edge src_node = edge.src.core_node dst_node = edge.dst.core_node if edge.link.iface1: src_iface_id = edge.link.iface1.id self.iface_to_edge[(src_node.id, src_iface_id)] = edge if edge.link.iface2: dst_iface_id = edge.link.iface2.id self.iface_to_edge[(dst_node.id, dst_iface_id)] = edge def get_wlan_configs(self) -> List[Tuple[int, Dict[str, str]]]: configs = [] for node in self.session.nodes.values(): if node.type != NodeType.WIRELESS_LAN: continue if not node.wlan_config: continue config = ConfigOption.to_dict(node.wlan_config) configs.append((node.id, config)) return configs def get_mobility_configs(self) -> List[Tuple[int, Dict[str, str]]]: configs = [] for node in self.session.nodes.values(): if not nutils.is_mobility(node): continue if not node.mobility_config: continue config = ConfigOption.to_dict(node.mobility_config) configs.append((node.id, config)) return configs def get_emane_model_configs(self) -> List[EmaneModelConfig]: configs = [] for node in self.session.nodes.values(): for key, config in node.emane_model_configs.items(): model, iface_id = key # config = ConfigOption.to_dict(config) if iface_id is None: iface_id = -1 config = EmaneModelConfig( node_id=node.id, model=model, iface_id=iface_id, config=config ) configs.append(config) return configs def get_service_configs(self) -> List[ServiceConfig]: configs = [] for node in self.session.nodes.values(): if not nutils.is_container(node): continue if not node.service_configs: continue for name, config in node.service_configs.items(): config = ServiceConfig( node_id=node.id, service=name, files=config.configs, directories=config.dirs, startup=config.startup, validate=config.validate, shutdown=config.shutdown, ) configs.append(config) return configs def get_service_file_configs(self) -> List[ServiceFileConfig]: configs = [] for node in self.session.nodes.values(): if not nutils.is_container(node): continue if not node.service_file_configs: continue for service, file_configs in node.service_file_configs.items(): for file, data in file_configs.items(): config = ServiceFileConfig(node.id, service, file, data) configs.append(config) return configs def get_config_service_rendered(self, node_id: int, name: str) -> Dict[str, str]: return self.client.get_config_service_rendered(self.session.id, node_id, name) def get_config_service_configs_proto( self ) -> List[configservices_pb2.ConfigServiceConfig]: config_service_protos = [] for node in self.session.nodes.values(): if not nutils.is_container(node): continue if not node.config_service_configs: continue for name, service_config in node.config_service_configs.items(): config_proto = configservices_pb2.ConfigServiceConfig( node_id=node.id, name=name, templates=service_config.templates, config=service_config.config, ) config_service_protos.append(config_proto) return config_service_protos def run(self, node_id: int) -> str: logger.info("running node(%s) cmd: %s", node_id, self.observer) _, output = self.client.node_command(self.session.id, node_id, self.observer) return output def get_wlan_config(self, node_id: int) -> Dict[str, ConfigOption]: config = self.client.get_wlan_config(self.session.id, node_id) logger.debug( "get wlan configuration from node %s, result configuration: %s", node_id, config, ) return config def get_wireless_config(self, node_id: int) -> Dict[str, ConfigOption]: return self.client.get_wireless_config(self.session.id, node_id) def get_mobility_config(self, node_id: int) -> Dict[str, ConfigOption]: config = self.client.get_mobility_config(self.session.id, node_id) logger.debug( "get mobility config from node %s, result configuration: %s", node_id, config, ) return config def get_emane_model_config( self, node_id: int, model: str, iface_id: int = None ) -> Dict[str, ConfigOption]: if iface_id is None: iface_id = -1 config = self.client.get_emane_model_config( self.session.id, node_id, model, iface_id ) logger.debug( "get emane model config: node id: %s, EMANE model: %s, " "interface: %s, config: %s", node_id, model, iface_id, config, ) return config def execute_script(self, script: str, options: str) -> None: session_id = self.client.execute_script(script, options) logger.info("execute python script %s", session_id) if session_id != -1: self.join_session(session_id) def add_link(self, link: Link) -> None: result, _, _ = self.client.add_link(self.session.id, link, source=GUI_SOURCE) logger.debug("added link: %s", result) if not result: logger.error("error adding link: %s", link) def edit_link(self, link: Link) -> None: result = self.client.edit_link(self.session.id, link, source=GUI_SOURCE) if not result: logger.error("error editing link: %s", link)