From 424f69bb1519f35852d49780b3f73ed02a0145cd Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Mon, 16 Dec 2019 11:14:05 -0800 Subject: [PATCH] updated grpc throughputs to only check a specific session and verify the data being collected and sent is for that session, fixed data from throughputs being in hex getting converted to int, updated coretk to only run throughputs when enabled, updated grpc streams to return the stream to allow it being canceled --- coretk/coretk/coreclient.py | 22 ++++++++++++++++------ coretk/coretk/menuaction.py | 7 +++---- daemon/core/api/grpc/client.py | 12 ++++++++---- daemon/core/api/grpc/server.py | 19 ++++++++++++++----- daemon/proto/core/api/grpc/core.proto | 6 ++++-- daemon/tests/test_grpc.py | 4 ++-- 6 files changed, 47 insertions(+), 23 deletions(-) diff --git a/coretk/coretk/coreclient.py b/coretk/coretk/coreclient.py index 776e1750..9f6900cc 100644 --- a/coretk/coretk/coreclient.py +++ b/coretk/coretk/coreclient.py @@ -84,7 +84,7 @@ class CoreClient: self.service_configs = {} self.file_configs = {} self.mobility_players = {} - self.throughput = False + self.handling_throughputs = None def reset(self): # helpers @@ -176,11 +176,22 @@ class CoreClient: canvas_node = self.canvas_nodes[node_id] canvas_node.move(x, y) + def enable_throughputs(self): + self.handling_throughputs = self.client.throughputs( + self.session_id, self.handle_throughputs + ) + + def cancel_throughputs(self): + self.handling_throughputs.cancel() + self.handling_throughputs = None + def handle_throughputs(self, event): - if self.throughput: - self.app.canvas.throughput_draw.process_grpc_throughput_event( - event.interface_throughputs - ) + if event.session_id != self.session_id: + return + logging.info("handling throughputs event: %s", event) + self.app.canvas.throughput_draw.process_grpc_throughput_event( + event.interface_throughputs + ) def handle_exception_event(self, event): logging.info("exception event: %s", event) @@ -200,7 +211,6 @@ class CoreClient: session = response.session self.state = session.state self.client.events(self.session_id, self.handle_events) - self.client.throughputs(self.handle_throughputs) # get location if query_location: diff --git a/coretk/coretk/menuaction.py b/coretk/coretk/menuaction.py index 616e9bc1..ec7e7cb7 100644 --- a/coretk/coretk/menuaction.py +++ b/coretk/coretk/menuaction.py @@ -151,8 +151,7 @@ class MenuAction: dialog.show() def throughput(self): - throughput = self.app.core.throughput - if throughput: - self.app.core.throughput = False + if not self.app.core.handling_throughputs: + self.app.core.enable_throughputs() else: - self.app.core.throughput = True + self.app.core.cancel_throughputs() diff --git a/daemon/core/api/grpc/client.py b/daemon/core/api/grpc/client.py index fbafbb44..bc48c9ab 100644 --- a/daemon/core/api/grpc/client.py +++ b/daemon/core/api/grpc/client.py @@ -385,23 +385,27 @@ class CoreGrpcClient: :param int session_id: id of session :param handler: handler for received events :param list events: events to listen to, defaults to all - :return: nothing + :return: stream processing events, can be used to cancel stream :raises grpc.RpcError: when session doesn't exist """ request = core_pb2.EventsRequest(session_id=session_id, events=events) stream = self.stub.Events(request) start_streamer(stream, handler) + return stream - def throughputs(self, handler): + def throughputs(self, session_id, handler): """ Listen for throughput events with information for interfaces and bridges. + :param int session_id: session id :param handler: handler for every event - :return: nothing + :return: stream processing events, can be used to cancel stream + :raises grpc.RpcError: when session doesn't exist """ - request = core_pb2.ThroughputsRequest() + request = core_pb2.ThroughputsRequest(session_id=session_id) stream = self.stub.Throughputs(request) start_streamer(stream, handler) + return stream def add_node(self, session_id, node): """ diff --git a/daemon/core/api/grpc/server.py b/daemon/core/api/grpc/server.py index 1ada5267..53c1d6d6 100644 --- a/daemon/core/api/grpc/server.py +++ b/daemon/core/api/grpc/server.py @@ -27,7 +27,7 @@ from core.nodes.lxd import LxcNode from core.services.coreservices import ServiceManager _ONE_DAY_IN_SECONDS = 60 * 60 * 24 -_INTERFACE_REGEX = re.compile(r"\d+") +_INTERFACE_REGEX = re.compile(r"[0-9a-fA-F]+") class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): @@ -452,9 +452,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): :param grpc.SrevicerContext context: context object :return: nothing """ + session = self.get_session(request.session_id, context) delay = 3 last_check = None last_stats = None + while self._is_running(context): now = time.monotonic() stats = get_net_stats() @@ -462,7 +464,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): # calculate average if last_check is not None: interval = now - last_check - throughputs_event = core_pb2.ThroughputsEvent() + throughputs_event = core_pb2.ThroughputsEvent(session_id=session.id) for key in stats: current_rxtx = stats[key] previous_rxtx = last_stats.get(key) @@ -477,8 +479,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): throughput = rx_kbps + tx_kbps if key.startswith("veth"): key = key.split(".") - node_id = int(_INTERFACE_REGEX.search(key[0]).group()) - interface_id = int(key[1]) + node_id = int(_INTERFACE_REGEX.search(key[0]).group(), base=16) + interface_id = int(key[1], base=16) + session_id = int(key[2], base=16) + if session.id != session_id: + continue interface_throughput = ( throughputs_event.interface_throughputs.add() ) @@ -487,7 +492,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): interface_throughput.throughput = throughput elif key.startswith("b."): try: - node_id = int(key.split(".")[1]) + key = key.split(".") + node_id = int(key[1], base=16) + session_id = int(key[2], base=16) + if session.id != session_id: + continue bridge_throughput = ( throughputs_event.bridge_throughputs.add() ) diff --git a/daemon/proto/core/api/grpc/core.proto b/daemon/proto/core/api/grpc/core.proto index b0a6c0d6..ee099b31 100644 --- a/daemon/proto/core/api/grpc/core.proto +++ b/daemon/proto/core/api/grpc/core.proto @@ -272,11 +272,13 @@ message EventsRequest { } message ThroughputsRequest { + int32 session_id = 1; } message ThroughputsEvent { - repeated BridgeThroughput bridge_throughputs = 1; - repeated InterfaceThroughput interface_throughputs = 2; + int32 session_id = 1; + repeated BridgeThroughput bridge_throughputs = 2; + repeated InterfaceThroughput interface_throughputs = 3; } message InterfaceThroughput { diff --git a/daemon/tests/test_grpc.py b/daemon/tests/test_grpc.py index 796febf7..72e469f3 100644 --- a/daemon/tests/test_grpc.py +++ b/daemon/tests/test_grpc.py @@ -1032,7 +1032,7 @@ class TestGrpc: # given client = CoreGrpcClient() - grpc_server.coreemu.create_session() + session = grpc_server.coreemu.create_session() queue = Queue() def handle_event(event_data): @@ -1040,7 +1040,7 @@ class TestGrpc: # then with client.context_connect(): - client.throughputs(handle_event) + client.throughputs(session.id, handle_event) time.sleep(0.1) # then