diff --git a/coretk/coretk/coreclient.py b/coretk/coretk/coreclient.py index b1d4d22f..8b01a468 100644 --- a/coretk/coretk/coreclient.py +++ b/coretk/coretk/coreclient.py @@ -20,6 +20,7 @@ from coretk.graph.shapeutils import ShapeType from coretk.interface import InterfaceManager from coretk.nodeutils import NodeDraw, NodeUtils +GUI_SOURCE = "gui" OBSERVERS = { "processes": "ps", "ifconfig": "ifconfig", @@ -133,6 +134,14 @@ class CoreClient: self.custom_observers[observer.name] = observer def handle_events(self, event): + if event.session_id != self.session_id: + logging.warn( + "ignoring event session(%s) current(%s)", + event.session_id, + self.session_id, + ) + return + if event.HasField("link_event"): logging.info("link event: %s", event) self.handle_link_event(event.link_event) @@ -176,7 +185,7 @@ class CoreClient: logging.warning("unknown link event: %s", event.message_type) def handle_node_event(self, event): - if event.source == "gui": + if event.source == GUI_SOURCE: return node_id = event.node.id x = event.node.position.x @@ -195,6 +204,11 @@ class CoreClient: def handle_throughputs(self, event): if event.session_id != self.session_id: + logging.warn( + "ignoring throughput event session(%s) current(%s)", + event.session_id, + self.session_id, + ) return logging.info("handling throughputs event: %s", event) self.app.canvas.throughput_draw.process_grpc_throughput_event( @@ -424,7 +438,7 @@ class CoreClient: def edit_node(self, core_node): try: self.client.edit_node( - self.session_id, core_node.id, core_node.position, source="gui" + self.session_id, core_node.id, core_node.position, source=GUI_SOURCE ) except grpc.RpcError as e: show_grpc_error(e) diff --git a/daemon/core/api/grpc/events.py b/daemon/core/api/grpc/events.py index d7a3094e..7b4756b1 100644 --- a/daemon/core/api/grpc/events.py +++ b/daemon/core/api/grpc/events.py @@ -107,7 +107,6 @@ def handle_session_event(event): name=event.name, data=event.data, time=event_time, - session_id=event.session, ) @@ -119,9 +118,6 @@ def handle_config_event(event): :return: configuration event :rtype: core.api.grpc.core_pb2.ConfigEvent """ - session_id = None - if event.session is not None: - session_id = int(event.session) return core_pb2.ConfigEvent( message_type=event.message_type, node_id=event.node, @@ -132,7 +128,6 @@ def handle_config_event(event): data_values=event.data_values, possible_values=event.possible_values, groups=event.groups, - session_id=session_id, interface=event.interface_number, network_id=event.network_id, opaque=event.opaque, @@ -150,7 +145,6 @@ def handle_exception_event(event): """ return core_pb2.ExceptionEvent( node_id=event.node, - session_id=int(event.session), level=event.level, source=event.source, date=event.date, @@ -175,7 +169,6 @@ def handle_file_event(event): number=event.number, type=event.type, source=event.source, - session_id=event.session, data=event.data, compressed_data=event.compressed_data, ) @@ -224,7 +217,7 @@ class EventStreamer: :return: grpc event, or None when invalid event or queue timeout :rtype: core.api.grpc.core_pb2.Event """ - event = core_pb2.Event() + event = core_pb2.Event(session_id=self.session.id) try: data = self.queue.get(timeout=1) if isinstance(data, NodeData): @@ -235,8 +228,6 @@ class EventStreamer: event.session_event.CopyFrom(handle_session_event(data)) elif isinstance(data, ConfigData): event.config_event.CopyFrom(handle_config_event(data)) - # TODO: remove when config events are fixed - event.config_event.session_id = self.session.id elif isinstance(data, ExceptionData): event.exception_event.CopyFrom(handle_exception_event(data)) elif isinstance(data, FileData): diff --git a/daemon/proto/core/api/grpc/core.proto b/daemon/proto/core/api/grpc/core.proto index ee099b31..ec7bf49c 100644 --- a/daemon/proto/core/api/grpc/core.proto +++ b/daemon/proto/core/api/grpc/core.proto @@ -301,6 +301,7 @@ message Event { ExceptionEvent exception_event = 5; FileEvent file_event = 6; } + int32 session_id = 7; } message NodeEvent { @@ -319,7 +320,6 @@ message SessionEvent { string name = 3; string data = 4; float time = 5; - int32 session_id = 6; } message ConfigEvent { @@ -333,20 +333,18 @@ message ConfigEvent { string bitmap = 8; string possible_values = 9; string groups = 10; - int32 session_id = 11; - int32 interface = 12; - int32 network_id = 13; - string opaque = 14; + int32 interface = 11; + int32 network_id = 12; + string opaque = 13; } message ExceptionEvent { int32 node_id = 1; - int32 session_id = 2; - ExceptionLevel.Enum level = 3; - string source = 4; - string date = 5; - string text = 6; - string opaque = 7; + ExceptionLevel.Enum level = 2; + string source = 3; + string date = 4; + string text = 5; + string opaque = 6; } message FileEvent { @@ -357,9 +355,8 @@ message FileEvent { int32 number = 5; string type = 6; string source = 7; - int32 session_id = 8; - string data = 9; - string compressed_data = 10; + string data = 8; + string compressed_data = 9; } message AddNodeRequest { diff --git a/daemon/tests/test_grpc.py b/daemon/tests/test_grpc.py index 72e469f3..89e46f9b 100644 --- a/daemon/tests/test_grpc.py +++ b/daemon/tests/test_grpc.py @@ -990,6 +990,7 @@ class TestGrpc: queue = Queue() def handle_event(event_data): + assert event_data.session_id == session.id assert event_data.HasField("node_event") queue.put(event_data) @@ -1014,6 +1015,7 @@ class TestGrpc: queue = Queue() def handle_event(event_data): + assert event_data.session_id == session.id assert event_data.HasField("link_event") queue.put(event_data) @@ -1036,6 +1038,7 @@ class TestGrpc: queue = Queue() def handle_event(event_data): + assert event_data.session_id == session.id queue.put(event_data) # then @@ -1053,6 +1056,7 @@ class TestGrpc: queue = Queue() def handle_event(event_data): + assert event_data.session_id == session.id assert event_data.HasField("session_event") queue.put(event_data) @@ -1075,6 +1079,7 @@ class TestGrpc: queue = Queue() def handle_event(event_data): + assert event_data.session_id == session.id assert event_data.HasField("config_event") queue.put(event_data) @@ -1098,6 +1103,7 @@ class TestGrpc: queue = Queue() def handle_event(event_data): + assert event_data.session_id == session.id assert event_data.HasField("exception_event") queue.put(event_data) @@ -1120,6 +1126,7 @@ class TestGrpc: queue = Queue() def handle_event(event_data): + assert event_data.session_id == session.id assert event_data.HasField("file_event") queue.put(event_data)