diff --git a/daemon/core/grpc/server.py b/daemon/core/grpc/server.py index bb3f8d27..0bb51704 100644 --- a/daemon/core/grpc/server.py +++ b/daemon/core/grpc/server.py @@ -2,7 +2,7 @@ import logging import os import tempfile import time -from Queue import Queue +from Queue import Queue, Empty from itertools import repeat import grpc @@ -377,175 +377,193 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer): queue = Queue() session.node_handlers.append(lambda x: queue.put(x)) - while True: - node = queue.get() - node_event = core_pb2.NodeEvent() - update_proto( - node_event.node, - id=node.id, - name=node.name, - model=node.model - ) - update_proto( - node_event.node.position, - x=node.x_position, - y=node.y_position - ) - services = node.services or "" - node_event.node.services.extend(services.split("|")) - yield node_event + while context.is_active(): + try: + node = queue.get(timeout=1) + node_event = core_pb2.NodeEvent() + update_proto( + node_event.node, + id=node.id, + name=node.name, + model=node.model + ) + update_proto( + node_event.node.position, + x=node.x_position, + y=node.y_position + ) + services = node.services or "" + node_event.node.services.extend(services.split("|")) + yield node_event + except Empty: + continue def LinkEvents(self, request, context): session = self.get_session(request.id, context) queue = Queue() session.link_handlers.append(lambda x: queue.put(x)) - while True: - event = queue.get() - link_event = core_pb2.LinkEvent() - if event.interface1_id is not None: - interface_one = link_event.link.interface_one - update_proto( - interface_one, - id=event.interface1_id, - name=event.interface1_name, - mac=convert_value(event.interface1_mac), - ip4=convert_value(event.interface1_ip4), - ip4mask=event.interface1_ip4_mask, - ip6=convert_value(event.interface1_ip6), - ip6mask=event.interface1_ip6_mask, - ) + while context.is_active(): + try: + event = queue.get(timeout=1) + link_event = core_pb2.LinkEvent() + if event.interface1_id is not None: + interface_one = link_event.link.interface_one + update_proto( + interface_one, + id=event.interface1_id, + name=event.interface1_name, + mac=convert_value(event.interface1_mac), + ip4=convert_value(event.interface1_ip4), + ip4mask=event.interface1_ip4_mask, + ip6=convert_value(event.interface1_ip6), + ip6mask=event.interface1_ip6_mask, + ) - if event.interface2_id is not None: - interface_two = link_event.link.interface_two - update_proto( - interface_two, - id=event.interface2_id, - name=event.interface2_name, - mac=convert_value(event.interface2_mac), - ip4=convert_value(event.interface2_ip4), - ip4mask=event.interface2_ip4_mask, - ip6=convert_value(event.interface2_ip6), - ip6mask=event.interface2_ip6_mask, - ) + if event.interface2_id is not None: + interface_two = link_event.link.interface_two + update_proto( + interface_two, + id=event.interface2_id, + name=event.interface2_name, + mac=convert_value(event.interface2_mac), + ip4=convert_value(event.interface2_ip4), + ip4mask=event.interface2_ip4_mask, + ip6=convert_value(event.interface2_ip6), + ip6mask=event.interface2_ip6_mask, + ) - link_event.message_type = event.message_type - update_proto( - link_event.link, - type=event.link_type, - node_one=event.node1_id, - node_two=event.node2_id - ) - update_proto( - link_event.link.options, - opaque=event.opaque, - jitter=event.jitter, - key=event.key, - mburst=event.mburst, - mer=event.mer, - per=event.per, - bandwidth=event.bandwidth, - burst=event.burst, - delay=event.delay, - dup=event.dup, - unidirectional=event.unidirectional - ) - yield link_event + link_event.message_type = event.message_type + update_proto( + link_event.link, + type=event.link_type, + node_one=event.node1_id, + node_two=event.node2_id + ) + update_proto( + link_event.link.options, + opaque=event.opaque, + jitter=event.jitter, + key=event.key, + mburst=event.mburst, + mer=event.mer, + per=event.per, + bandwidth=event.bandwidth, + burst=event.burst, + delay=event.delay, + dup=event.dup, + unidirectional=event.unidirectional + ) + yield link_event + except Empty: + continue def SessionEvents(self, request, context): session = self.get_session(request.id, context) queue = Queue() session.event_handlers.append(lambda x: queue.put(x)) - while True: - event = queue.get() - session_event = core_pb2.SessionEvent() - event_time = event.time - if event_time is not None: - event_time = float(event_time) - update_proto( - session_event, - node=event.node, - event=event.event_type, - name=event.name, - data=event.data, - time=event_time, - session=session.session_id - ) - yield session_event + while context.is_active(): + try: + event = queue.get(timeout=1) + session_event = core_pb2.SessionEvent() + event_time = event.time + if event_time is not None: + event_time = float(event_time) + update_proto( + session_event, + node=event.node, + event=event.event_type, + name=event.name, + data=event.data, + time=event_time, + session=session.session_id + ) + yield session_event + except Empty: + continue def ConfigEvents(self, request, context): session = self.get_session(request.id, context) queue = Queue() session.config_handlers.append(lambda x: queue.put(x)) - while True: - event = queue.get() - config_event = core_pb2.ConfigEvent() - update_proto( - config_event, - message_type=event.message_type, - node=event.node, - object=event.object, - type=event.type, - captions=event.captions, - bitmap=event.bitmap, - data_values=event.data_values, - possible_values=event.possible_values, - groups=event.groups, - session=event.session, - interface=event.interface_number, - network_id=event.network_id, - opaque=event.opaque - ) - config_event.data_types.extend(event.data_types) - yield config_event + while context.is_active(): + try: + event = queue.get(timeout=1) + config_event = core_pb2.ConfigEvent() + update_proto( + config_event, + message_type=event.message_type, + node=event.node, + object=event.object, + type=event.type, + captions=event.captions, + bitmap=event.bitmap, + data_values=event.data_values, + possible_values=event.possible_values, + groups=event.groups, + session=event.session, + interface=event.interface_number, + network_id=event.network_id, + opaque=event.opaque + ) + config_event.data_types.extend(event.data_types) + yield config_event + except Empty: + continue def ExceptionEvents(self, request, context): session = self.get_session(request.id, context) queue = Queue() session.exception_handlers.append(lambda x: queue.put(x)) - while True: - event = queue.get() - exception_event = core_pb2.ExceptionEvent() - event_time = event.date - if event_time is not None: - event_time = float(event_time) - update_proto( - exception_event, - node=event.node, - session=event.session, - level=event.level, - source=event.source, - date=event_time, - text=event.text, - opaque=event.opaque - ) - yield exception_event + while context.is_active(): + try: + event = queue.get(timeout=1) + exception_event = core_pb2.ExceptionEvent() + event_time = event.date + if event_time is not None: + event_time = float(event_time) + update_proto( + exception_event, + node=event.node, + session=event.session, + level=event.level, + source=event.source, + date=event_time, + text=event.text, + opaque=event.opaque + ) + yield exception_event + except Empty: + continue def FileEvents(self, request, context): session = self.get_session(request.id, context) queue = Queue() session.file_handlers.append(lambda x: queue.put(x)) - while True: - event = queue.get() - file_event = core_pb2.FileEvent() - update_proto( - file_event, - message_type=event.message_type, - node=event.node, - name=event.name, - mode=event.mode, - number=event.number, - type=event.type, - source=event.source, - session=event.session, - data=event.data, - compressed_data=event.compressed_data - ) - yield file_event + while context.is_active(): + try: + event = queue.get(timeout=1) + file_event = core_pb2.FileEvent() + update_proto( + file_event, + message_type=event.message_type, + node=event.node, + name=event.name, + mode=event.mode, + number=event.number, + type=event.type, + source=event.source, + session=event.session, + data=event.data, + compressed_data=event.compressed_data + ) + yield file_event + except Empty: + continue def CreateNode(self, request, context): session = self.get_session(request.session, context)