diff --git a/daemon/core/api/grpc/events.py b/daemon/core/api/grpc/events.py index 75f9eb2e..5c873a43 100644 --- a/daemon/core/api/grpc/events.py +++ b/daemon/core/api/grpc/events.py @@ -1,6 +1,6 @@ import logging from queue import Empty, Queue -from typing import Iterable +from typing import Iterable, Optional from core.api.grpc import core_pb2 from core.api.grpc.grpcutils import convert_link @@ -15,7 +15,7 @@ from core.emulator.data import ( from core.emulator.session import Session -def handle_node_event(node_data: NodeData) -> core_pb2.NodeEvent: +def handle_node_event(node_data: NodeData) -> core_pb2.Event: """ Handle node event when there is a node event @@ -36,98 +36,105 @@ def handle_node_event(node_data: NodeData) -> core_pb2.NodeEvent: geo=geo, services=services, ) - return core_pb2.NodeEvent(node=node_proto, source=node_data.source) + node_event = core_pb2.NodeEvent(node=node_proto) + return core_pb2.Event(node_event=node_event, source=node_data.source) -def handle_link_event(event: LinkData) -> core_pb2.LinkEvent: +def handle_link_event(link_data: LinkData) -> core_pb2.Event: """ Handle link event when there is a link event - :param event: link data + :param link_data: link data :return: link event that has message type and link information """ - link = convert_link(event) - return core_pb2.LinkEvent(message_type=event.message_type.value, link=link) + link = convert_link(link_data) + message_type = link_data.message_type.value + link_event = core_pb2.LinkEvent(message_type=message_type, link=link) + return core_pb2.Event(link_event=link_event, source=link_data.source) -def handle_session_event(event: EventData) -> core_pb2.SessionEvent: +def handle_session_event(event_data: EventData) -> core_pb2.Event: """ Handle session event when there is a session event - :param event: event data + :param event_data: event data :return: session event """ - event_time = event.time + event_time = event_data.time if event_time is not None: event_time = float(event_time) - return core_pb2.SessionEvent( - node_id=event.node, - event=event.event_type.value, - name=event.name, - data=event.data, + session_event = core_pb2.SessionEvent( + node_id=event_data.node, + event=event_data.event_type.value, + name=event_data.name, + data=event_data.data, time=event_time, ) + return core_pb2.Event(session_event=session_event) -def handle_config_event(event: ConfigData) -> core_pb2.ConfigEvent: +def handle_config_event(config_data: ConfigData) -> core_pb2.Event: """ Handle configuration event when there is configuration event - :param event: configuration data + :param config_data: configuration data :return: configuration event """ - return core_pb2.ConfigEvent( - message_type=event.message_type, - node_id=event.node, - object=event.object, - type=event.type, - captions=event.captions, - bitmap=event.bitmap, - data_values=event.data_values, - possible_values=event.possible_values, - groups=event.groups, - iface_id=event.iface_id, - network_id=event.network_id, - opaque=event.opaque, - data_types=event.data_types, + config_event = core_pb2.ConfigEvent( + message_type=config_data.message_type, + node_id=config_data.node, + object=config_data.object, + type=config_data.type, + captions=config_data.captions, + bitmap=config_data.bitmap, + data_values=config_data.data_values, + possible_values=config_data.possible_values, + groups=config_data.groups, + iface_id=config_data.iface_id, + network_id=config_data.network_id, + opaque=config_data.opaque, + data_types=config_data.data_types, ) + return core_pb2.Event(config_event=config_event) -def handle_exception_event(event: ExceptionData) -> core_pb2.ExceptionEvent: +def handle_exception_event(exception_data: ExceptionData) -> core_pb2.Event: """ Handle exception event when there is exception event - :param event: exception data + :param exception_data: exception data :return: exception event """ - return core_pb2.ExceptionEvent( - node_id=event.node, - level=event.level.value, - source=event.source, - date=event.date, - text=event.text, - opaque=event.opaque, + exception_event = core_pb2.ExceptionEvent( + node_id=exception_data.node, + level=exception_data.level.value, + source=exception_data.source, + date=exception_data.date, + text=exception_data.text, + opaque=exception_data.opaque, ) + return core_pb2.Event(exception_event=exception_event) -def handle_file_event(event: FileData) -> core_pb2.FileEvent: +def handle_file_event(file_data: FileData) -> core_pb2.Event: """ Handle file event - :param event: file data + :param file_data: file data :return: file event """ - return core_pb2.FileEvent( - message_type=event.message_type.value, - node_id=event.node, - name=event.name, - mode=event.mode, - number=event.number, - type=event.type, - source=event.source, - data=event.data, - compressed_data=event.compressed_data, + file_event = core_pb2.FileEvent( + message_type=file_data.message_type.value, + node_id=file_data.node, + name=file_data.name, + mode=file_data.mode, + number=file_data.number, + type=file_data.type, + source=file_data.source, + data=file_data.data, + compressed_data=file_data.compressed_data, ) + return core_pb2.Event(file_event=file_event) class EventStreamer: @@ -168,32 +175,33 @@ class EventStreamer: if core_pb2.EventType.SESSION in self.event_types: self.session.event_handlers.append(self.queue.put) - def process(self) -> core_pb2.Event: + def process(self) -> Optional[core_pb2.Event]: """ Process the next event in the queue. :return: grpc event, or None when invalid event or queue timeout """ - event = core_pb2.Event(session_id=self.session.id) + event = None try: data = self.queue.get(timeout=1) if isinstance(data, NodeData): - event.node_event.CopyFrom(handle_node_event(data)) + event = handle_node_event(data) elif isinstance(data, LinkData): - event.link_event.CopyFrom(handle_link_event(data)) + event = handle_link_event(data) elif isinstance(data, EventData): - event.session_event.CopyFrom(handle_session_event(data)) + event = handle_session_event(data) elif isinstance(data, ConfigData): - event.config_event.CopyFrom(handle_config_event(data)) + event = handle_config_event(data) elif isinstance(data, ExceptionData): - event.exception_event.CopyFrom(handle_exception_event(data)) + event = handle_exception_event(data) elif isinstance(data, FileData): - event.file_event.CopyFrom(handle_file_event(data)) + event = handle_file_event(data) else: logging.error("unknown event: %s", data) - event = None except Empty: - event = None + pass + if event: + event.session_id = self.session.id return event def remove_handlers(self) -> None: diff --git a/daemon/core/api/grpc/grpcutils.py b/daemon/core/api/grpc/grpcutils.py index 8df545cd..ed40a75b 100644 --- a/daemon/core/api/grpc/grpcutils.py +++ b/daemon/core/api/grpc/grpcutils.py @@ -435,6 +435,24 @@ def get_service_configuration(service: CoreService) -> NodeServiceData: ) +def iface_to_data(iface: CoreInterface) -> InterfaceData: + ip4 = iface.get_ip4() + ip4_addr = str(ip4.ip) if ip4 else None + ip4_mask = ip4.prefixlen if ip4 else None + ip6 = iface.get_ip6() + ip6_addr = str(ip6.ip) if ip6 else None + ip6_mask = ip6.prefixlen if ip6 else None + return InterfaceData( + id=iface.node_id, + name=iface.name, + mac=str(iface.mac), + ip4=ip4_addr, + ip4_mask=ip4_mask, + ip6=ip6_addr, + ip6_mask=ip6_mask, + ) + + def iface_to_proto(iface: CoreInterface) -> core_pb2.Interface: """ Convenience for converting a core interface to the protobuf representation. diff --git a/daemon/core/api/grpc/server.py b/daemon/core/api/grpc/server.py index 1964b6e8..2883103d 100644 --- a/daemon/core/api/grpc/server.py +++ b/daemon/core/api/grpc/server.py @@ -108,7 +108,7 @@ from core.api.grpc.wlan_pb2 import ( WlanLinkResponse, ) from core.emulator.coreemu import CoreEmu -from core.emulator.data import LinkData, LinkOptions, NodeOptions +from core.emulator.data import InterfaceData, LinkData, LinkOptions, NodeOptions from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags from core.emulator.session import NT, Session from core.errors import CoreCommandError, CoreError @@ -853,6 +853,22 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): node1_iface, node2_iface = session.add_link( node1_id, node2_id, iface1_data, iface2_data, options, link_type ) + iface1_data = None + if node1_iface: + iface1_data = grpcutils.iface_to_data(node1_iface) + iface2_data = None + if node2_iface: + iface2_data = grpcutils.iface_to_data(node2_iface) + source = request.source if request.source else None + link_data = LinkData( + message_type=MessageFlags.ADD, + node1_id=node1_id, + node2_id=node2_id, + iface1=iface1_data, + iface2=iface2_data, + source=source, + ) + session.broadcast_link(link_data) iface1_proto = None iface2_proto = None if node1_iface: @@ -912,6 +928,18 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): iface1_id = request.iface1_id iface2_id = request.iface2_id session.delete_link(node1_id, node2_id, iface1_id, iface2_id) + iface1 = InterfaceData(id=iface1_id) + iface2 = InterfaceData(id=iface2_id) + source = request.source if request.source else None + link_data = LinkData( + message_type=MessageFlags.DELETE, + node1_id=node1_id, + node2_id=node2_id, + iface1=iface1, + iface2=iface2, + source=source, + ) + session.broadcast_link(link_data) return core_pb2.DeleteLinkResponse(result=True) def GetHooks( diff --git a/daemon/core/emulator/data.py b/daemon/core/emulator/data.py index 22d10d2d..15d922a9 100644 --- a/daemon/core/emulator/data.py +++ b/daemon/core/emulator/data.py @@ -190,6 +190,7 @@ class LinkData: iface2: InterfaceData = None options: LinkOptions = LinkOptions() color: str = None + source: str = None class IpPrefixes: diff --git a/daemon/core/emulator/session.py b/daemon/core/emulator/session.py index c2573578..d2f64dde 100644 --- a/daemon/core/emulator/session.py +++ b/daemon/core/emulator/session.py @@ -833,11 +833,12 @@ class Session: for handler in self.config_handlers: handler(config_data) - def broadcast_link(self, link_data: LinkData) -> None: + def broadcast_link(self, link_data: LinkData, source: str = None) -> None: """ Handle link data that should be provided to link handlers. :param link_data: link data to send out + :param source: source of broadcast, None by default :return: nothing """ for handler in self.link_handlers: diff --git a/daemon/core/gui/coreclient.py b/daemon/core/gui/coreclient.py index d35f62e5..a5b96e17 100644 --- a/daemon/core/gui/coreclient.py +++ b/daemon/core/gui/coreclient.py @@ -148,6 +148,8 @@ class CoreClient: self.custom_observers[observer.name] = observer def handle_events(self, event: Event) -> None: + if event.source == GUI_SOURCE: + return if event.session_id != self.session_id: logging.warning( "ignoring event session(%s) current(%s)", @@ -193,19 +195,32 @@ class CoreClient: return canvas_node1 = self.canvas_nodes[node1_id] canvas_node2 = self.canvas_nodes[node2_id] - if event.message_type == MessageType.ADD: - self.app.canvas.add_wireless_edge(canvas_node1, canvas_node2, event.link) - elif event.message_type == MessageType.DELETE: - self.app.canvas.delete_wireless_edge(canvas_node1, canvas_node2, event.link) - elif event.message_type == MessageType.NONE: - self.app.canvas.update_wireless_edge(canvas_node1, canvas_node2, event.link) + if event.link.type == LinkType.WIRELESS: + if event.message_type == MessageType.ADD: + self.app.canvas.add_wireless_edge( + canvas_node1, canvas_node2, event.link + ) + elif event.message_type == MessageType.DELETE: + self.app.canvas.delete_wireless_edge( + canvas_node1, canvas_node2, event.link + ) + elif event.message_type == MessageType.NONE: + self.app.canvas.update_wireless_edge( + canvas_node1, canvas_node2, event.link + ) + else: + logging.warning("unknown link event: %s", event) else: - logging.warning("unknown link event: %s", event) + if event.message_type == MessageType.ADD: + self.app.canvas.add_wired_edge(canvas_node1, canvas_node2, event.link) + self.app.canvas.organize() + elif event.message_type == MessageType.DELETE: + self.app.canvas.delete_wired_edge(canvas_node1, canvas_node2) + else: + logging.warning("unknown link event: %s", event) def handle_node_event(self, event: NodeEvent) -> None: logging.debug("node event: %s", event) - if event.source == GUI_SOURCE: - return node_id = event.node.id x = event.node.position.x y = event.node.position.y diff --git a/daemon/core/gui/graph/graph.py b/daemon/core/gui/graph/graph.py index a3520d22..4e0358a5 100644 --- a/daemon/core/gui/graph/graph.py +++ b/daemon/core/gui/graph/graph.py @@ -225,6 +225,43 @@ class CanvasGraph(tk.Canvas): self.tag_lower(tags.GRIDLINE) self.tag_lower(self.rect) + def add_wired_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None: + token = create_edge_token(src.id, dst.id) + if token in self.edges and link.options.unidirectional: + edge = self.edges[token] + edge.asymmetric_link = link + elif token not in self.edges: + node1 = src.core_node + node2 = dst.core_node + src_pos = (node1.position.x, node1.position.y) + dst_pos = (node2.position.x, node2.position.y) + edge = CanvasEdge(self, src.id, src_pos, dst_pos) + edge.token = token + edge.dst = dst.id + edge.set_link(link) + edge.check_wireless() + src.edges.add(edge) + dst.edges.add(edge) + self.edges[edge.token] = edge + self.core.links[edge.token] = edge + if link.HasField("iface1"): + iface1 = link.iface1 + self.core.iface_to_edge[(node1.id, iface1.id)] = token + src.ifaces[iface1.id] = iface1 + edge.src_iface = iface1 + if link.HasField("iface2"): + iface2 = link.iface2 + self.core.iface_to_edge[(node2.id, iface2.id)] = edge.token + dst.ifaces[iface2.id] = iface2 + edge.dst_iface = iface2 + + def delete_wired_edge(self, src: CanvasNode, dst: CanvasNode) -> None: + token = create_edge_token(src.id, dst.id) + edge = self.edges.get(token) + if not edge: + return + self.delete_edge(edge) + def add_wireless_edge(self, src: CanvasNode, dst: CanvasNode, link: Link) -> None: network_id = link.network_id if link.network_id else None token = create_edge_token(src.id, dst.id, network_id) @@ -297,41 +334,11 @@ class CanvasGraph(tk.Canvas): for link in session.links: logging.debug("drawing link: %s", link) canvas_node1 = self.core.canvas_nodes[link.node1_id] - node1 = canvas_node1.core_node canvas_node2 = self.core.canvas_nodes[link.node2_id] - node2 = canvas_node2.core_node - token = create_edge_token(canvas_node1.id, canvas_node2.id) - if link.type == LinkType.WIRELESS: self.add_wireless_edge(canvas_node1, canvas_node2, link) else: - if token not in self.edges: - src_pos = (node1.position.x, node1.position.y) - dst_pos = (node2.position.x, node2.position.y) - edge = CanvasEdge(self, canvas_node1.id, src_pos, dst_pos) - edge.token = token - edge.dst = canvas_node2.id - edge.set_link(link) - edge.check_wireless() - canvas_node1.edges.add(edge) - canvas_node2.edges.add(edge) - self.edges[edge.token] = edge - self.core.links[edge.token] = edge - if link.HasField("iface1"): - iface1 = link.iface1 - self.core.iface_to_edge[(node1.id, iface1.id)] = token - canvas_node1.ifaces[iface1.id] = iface1 - edge.src_iface = iface1 - if link.HasField("iface2"): - iface2 = link.iface2 - self.core.iface_to_edge[(node2.id, iface2.id)] = edge.token - canvas_node2.ifaces[iface2.id] = iface2 - edge.dst_iface = iface2 - elif link.options.unidirectional: - edge = self.edges[token] - edge.asymmetric_link = link - else: - logging.error("duplicate link received: %s", link) + self.add_wired_edge(canvas_node1, canvas_node2, link) def stopped_session(self) -> None: # clear wireless edges diff --git a/daemon/proto/core/api/grpc/core.proto b/daemon/proto/core/api/grpc/core.proto index 46e1da91..6b1b304c 100644 --- a/daemon/proto/core/api/grpc/core.proto +++ b/daemon/proto/core/api/grpc/core.proto @@ -343,11 +343,11 @@ message Event { FileEvent file_event = 6; } int32 session_id = 7; + string source = 8; } message NodeEvent { Node node = 1; - string source = 2; } message LinkEvent { @@ -488,6 +488,7 @@ message GetNodeLinksResponse { message AddLinkRequest { int32 session_id = 1; Link link = 2; + string source = 3; } message AddLinkResponse { @@ -515,6 +516,7 @@ message DeleteLinkRequest { int32 node2_id = 3; int32 iface1_id = 4; int32 iface2_id = 5; + string source = 6; } message DeleteLinkResponse {