diff --git a/daemon/core/api/grpc/client.py b/daemon/core/api/grpc/client.py index 7887d5e8..ae193909 100644 --- a/daemon/core/api/grpc/client.py +++ b/daemon/core/api/grpc/client.py @@ -378,16 +378,17 @@ class CoreGrpcClient: ) return self.stub.AddSessionServer(request) - def events(self, session_id, handler): + def events(self, session_id, handler, events=None): """ Listen for session events. :param int session_id: id of session - :param handler: handler for every event + :param handler: handler for received events + :param list events: events to listen to, defaults to all :return: nothing :raises grpc.RpcError: when session doesn't exist """ - request = core_pb2.EventsRequest(session_id=session_id) + request = core_pb2.EventsRequest(session_id=session_id, events=events) stream = self.stub.Events(request) start_streamer(stream, handler) diff --git a/daemon/core/api/grpc/events.py b/daemon/core/api/grpc/events.py new file mode 100644 index 00000000..d7a3094e --- /dev/null +++ b/daemon/core/api/grpc/events.py @@ -0,0 +1,268 @@ +import logging +from queue import Empty, Queue + +from core.api.grpc import core_pb2 +from core.api.grpc.grpcutils import convert_value +from core.emulator.data import ( + ConfigData, + EventData, + ExceptionData, + FileData, + LinkData, + NodeData, +) + + +def handle_node_event(event): + """ + Handle node event when there is a node event + + :param core.emulator.data.NodeData event: node data + :return: node event that contains node id, name, model, position, and services + :rtype: core.api.grpc.core_pb2.NodeEvent + """ + position = core_pb2.Position(x=event.x_position, y=event.y_position) + services = event.services or "" + services = services.split("|") + node_proto = core_pb2.Node( + id=event.id, + name=event.name, + model=event.model, + position=position, + services=services, + ) + return core_pb2.NodeEvent(node=node_proto, source=event.source) + + +def handle_link_event(event): + """ + Handle link event when there is a link event + + :param core.emulator.data.LinkData event: link data + :return: link event that has message type and link information + :rtype: core.api.grpc.core_pb2.LinkEvent + """ + interface_one = None + if event.interface1_id is not None: + interface_one = core_pb2.Interface( + id=event.interface1_id, + name=event.interface1_name, + mac=convert_value(event.interface1_mac), + ip4=convert_value(event.interface1_ip4), + ip4mask=event.interface1_ip4_mask, + ip6=convert_value(event.interface1_ip6), + ip6mask=event.interface1_ip6_mask, + ) + + interface_two = None + if event.interface2_id is not None: + interface_two = core_pb2.Interface( + id=event.interface2_id, + name=event.interface2_name, + mac=convert_value(event.interface2_mac), + ip4=convert_value(event.interface2_ip4), + ip4mask=event.interface2_ip4_mask, + ip6=convert_value(event.interface2_ip6), + ip6mask=event.interface2_ip6_mask, + ) + + options = core_pb2.LinkOptions( + opaque=event.opaque, + jitter=event.jitter, + key=event.key, + mburst=event.mburst, + mer=event.mer, + per=event.per, + bandwidth=event.bandwidth, + burst=event.burst, + delay=event.delay, + dup=event.dup, + unidirectional=event.unidirectional, + ) + link = core_pb2.Link( + type=event.link_type, + node_one_id=event.node1_id, + node_two_id=event.node2_id, + interface_one=interface_one, + interface_two=interface_two, + options=options, + ) + return core_pb2.LinkEvent(message_type=event.message_type, link=link) + + +def handle_session_event(event): + """ + Handle session event when there is a session event + + :param core.emulator.data.EventData event: event data + :return: session event + :rtype: core.api.grpc.core_pb2.SessionEvent + """ + event_time = event.time + if event_time is not None: + event_time = float(event_time) + return core_pb2.SessionEvent( + node_id=event.node, + event=event.event_type, + name=event.name, + data=event.data, + time=event_time, + session_id=event.session, + ) + + +def handle_config_event(event): + """ + Handle configuration event when there is configuration event + + :param core.emulator.data.ConfigData event: configuration data + :return: configuration event + :rtype: core.api.grpc.core_pb2.ConfigEvent + """ + session_id = None + if event.session is not None: + session_id = int(event.session) + return core_pb2.ConfigEvent( + message_type=event.message_type, + node_id=event.node, + object=event.object, + type=event.type, + captions=event.captions, + bitmap=event.bitmap, + data_values=event.data_values, + possible_values=event.possible_values, + groups=event.groups, + session_id=session_id, + interface=event.interface_number, + network_id=event.network_id, + opaque=event.opaque, + data_types=event.data_types, + ) + + +def handle_exception_event(event): + """ + Handle exception event when there is exception event + + :param core.emulator.data.ExceptionData event: exception data + :return: exception event + :rtype: core.api.grpc.core_pb2.ExceptionEvent + """ + return core_pb2.ExceptionEvent( + node_id=event.node, + session_id=int(event.session), + level=event.level, + source=event.source, + date=event.date, + text=event.text, + opaque=event.opaque, + ) + + +def handle_file_event(event): + """ + Handle file event + + :param core.emulator.data.FileData event: file data + :return: file event + :rtype: core.api.grpc.core_pb2.FileEvent + """ + return core_pb2.FileEvent( + message_type=event.message_type, + node_id=event.node, + name=event.name, + mode=event.mode, + number=event.number, + type=event.type, + source=event.source, + session_id=event.session, + data=event.data, + compressed_data=event.compressed_data, + ) + + +class EventStreamer: + """ + Processes session events to generate grpc events. + """ + + def __init__(self, session, event_types): + """ + Create a EventStreamer instance. + + :param core.emulator.session.Session session: session to process events for + :param set event_types: types of events to process + """ + self.session = session + self.event_types = event_types + self.queue = Queue() + self.add_handlers() + + def add_handlers(self): + """ + Add a session event handler for desired event types. + + :return: nothing + """ + if core_pb2.EventType.NODE in self.event_types: + self.session.node_handlers.append(self.queue.put) + if core_pb2.EventType.LINK in self.event_types: + self.session.link_handlers.append(self.queue.put) + if core_pb2.EventType.CONFIG in self.event_types: + self.session.config_handlers.append(self.queue.put) + if core_pb2.EventType.FILE in self.event_types: + self.session.file_handlers.append(self.queue.put) + if core_pb2.EventType.EXCEPTION in self.event_types: + self.session.exception_handlers.append(self.queue.put) + if core_pb2.EventType.SESSION in self.event_types: + self.session.event_handlers.append(self.queue.put) + + def process(self): + """ + Process the next event in the queue. + + :return: grpc event, or None when invalid event or queue timeout + :rtype: core.api.grpc.core_pb2.Event + """ + event = core_pb2.Event() + try: + data = self.queue.get(timeout=1) + if isinstance(data, NodeData): + event.node_event.CopyFrom(handle_node_event(data)) + elif isinstance(data, LinkData): + event.link_event.CopyFrom(handle_link_event(data)) + elif isinstance(data, EventData): + event.session_event.CopyFrom(handle_session_event(data)) + elif isinstance(data, ConfigData): + event.config_event.CopyFrom(handle_config_event(data)) + # TODO: remove when config events are fixed + event.config_event.session_id = self.session.id + elif isinstance(data, ExceptionData): + event.exception_event.CopyFrom(handle_exception_event(data)) + elif isinstance(data, FileData): + event.file_event.CopyFrom(handle_file_event(data)) + else: + logging.error("unknown event: %s", data) + event = None + except Empty: + event = None + return event + + def remove_handlers(self): + """ + Remove session event handlers for events being watched. + + :return: nothing + """ + if core_pb2.EventType.NODE in self.event_types: + self.session.node_handlers.remove(self.queue.put) + if core_pb2.EventType.LINK in self.event_types: + self.session.link_handlers.remove(self.queue.put) + if core_pb2.EventType.CONFIG in self.event_types: + self.session.config_handlers.remove(self.queue.put) + if core_pb2.EventType.FILE in self.event_types: + self.session.file_handlers.remove(self.queue.put) + if core_pb2.EventType.EXCEPTION in self.event_types: + self.session.exception_handlers.remove(self.queue.put) + if core_pb2.EventType.SESSION in self.event_types: + self.session.event_handlers.remove(self.queue.put) diff --git a/daemon/core/api/grpc/server.py b/daemon/core/api/grpc/server.py index e552e1a4..75ce23ba 100644 --- a/daemon/core/api/grpc/server.py +++ b/daemon/core/api/grpc/server.py @@ -5,27 +5,19 @@ import re import tempfile import time from concurrent import futures -from queue import Empty, Queue import grpc from core.api.grpc import core_pb2, core_pb2_grpc, grpcutils +from core.api.grpc.events import EventStreamer from core.api.grpc.grpcutils import ( - convert_value, get_config_options, get_emane_model_id, get_links, get_net_stats, ) from core.emane.nodes import EmaneNet -from core.emulator.data import ( - ConfigData, - EventData, - ExceptionData, - FileData, - LinkData, - NodeData, -) +from core.emulator.data import LinkData from core.emulator.emudata import LinkOptions, NodeOptions from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags from core.errors import CoreCommandError, CoreError @@ -439,210 +431,19 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): def Events(self, request, context): session = self.get_session(request.session_id, context) - queue = Queue() - session.node_handlers.append(queue.put) - session.link_handlers.append(queue.put) - session.config_handlers.append(queue.put) - session.file_handlers.append(queue.put) - session.exception_handlers.append(queue.put) - session.event_handlers.append(queue.put) + event_types = set(request.events) + if not event_types: + event_types = set(core_pb2.EventType.Enum.values()) + streamer = EventStreamer(session, event_types) while self._is_running(context): - event = core_pb2.Event() - try: - data = queue.get(timeout=1) - if isinstance(data, NodeData): - event.node_event.CopyFrom(self._handle_node_event(data)) - elif isinstance(data, LinkData): - event.link_event.CopyFrom(self._handle_link_event(data)) - elif isinstance(data, EventData): - event.session_event.CopyFrom(self._handle_session_event(data)) - elif isinstance(data, ConfigData): - event.config_event.CopyFrom(self._handle_config_event(data)) - # TODO: remove when config events are fixed - event.config_event.session_id = session.id - elif isinstance(data, ExceptionData): - event.exception_event.CopyFrom(self._handle_exception_event(data)) - elif isinstance(data, FileData): - event.file_event.CopyFrom(self._handle_file_event(data)) - else: - logging.error("unknown event: %s", data) - continue - + event = streamer.process() + if event: yield event - except Empty: - continue - session.node_handlers.remove(queue.put) - session.link_handlers.remove(queue.put) - session.config_handlers.remove(queue.put) - session.file_handlers.remove(queue.put) - session.exception_handlers.remove(queue.put) - session.event_handlers.remove(queue.put) + streamer.remove_handlers() self._cancel_stream(context) - def _handle_node_event(self, event): - """ - Handle node event when there is a node event - - :param core.emulator.data.NodeData event: node data - :return: node event that contains node id, name, model, position, and services - :rtype: core.api.grpc.core_pb2.NodeEvent - """ - position = core_pb2.Position(x=event.x_position, y=event.y_position) - services = event.services or "" - services = services.split("|") - node_proto = core_pb2.Node( - id=event.id, - name=event.name, - model=event.model, - position=position, - services=services, - ) - return core_pb2.NodeEvent(node=node_proto, source=event.source) - - def _handle_link_event(self, event): - """ - Handle link event when there is a link event - - :param core.emulator.data.LinkData event: link data - :return: link event that has message type and link information - :rtype: core.api.grpc.core_pb2.LinkEvent - """ - interface_one = None - if event.interface1_id is not None: - interface_one = core_pb2.Interface( - id=event.interface1_id, - name=event.interface1_name, - mac=convert_value(event.interface1_mac), - ip4=convert_value(event.interface1_ip4), - ip4mask=event.interface1_ip4_mask, - ip6=convert_value(event.interface1_ip6), - ip6mask=event.interface1_ip6_mask, - ) - - interface_two = None - if event.interface2_id is not None: - interface_two = core_pb2.Interface( - id=event.interface2_id, - name=event.interface2_name, - mac=convert_value(event.interface2_mac), - ip4=convert_value(event.interface2_ip4), - ip4mask=event.interface2_ip4_mask, - ip6=convert_value(event.interface2_ip6), - ip6mask=event.interface2_ip6_mask, - ) - - options = core_pb2.LinkOptions( - opaque=event.opaque, - jitter=event.jitter, - key=event.key, - mburst=event.mburst, - mer=event.mer, - per=event.per, - bandwidth=event.bandwidth, - burst=event.burst, - delay=event.delay, - dup=event.dup, - unidirectional=event.unidirectional, - ) - link = core_pb2.Link( - type=event.link_type, - node_one_id=event.node1_id, - node_two_id=event.node2_id, - interface_one=interface_one, - interface_two=interface_two, - options=options, - ) - return core_pb2.LinkEvent(message_type=event.message_type, link=link) - - def _handle_session_event(self, event): - """ - Handle session event when there is a session event - - :param core.emulator.data.EventData event: event data - :return: session event - :rtype: core.api.grpc.core_pb2.SessionEvent - """ - event_time = event.time - if event_time is not None: - event_time = float(event_time) - return core_pb2.SessionEvent( - node_id=event.node, - event=event.event_type, - name=event.name, - data=event.data, - time=event_time, - session_id=event.session, - ) - - def _handle_config_event(self, event): - """ - Handle configuration event when there is configuration event - - :param core.emulator.data.ConfigData event: configuration data - :return: configuration event - :rtype: core.api.grpc.core_pb2.ConfigEvent - """ - session_id = None - if event.session is not None: - session_id = int(event.session) - return core_pb2.ConfigEvent( - message_type=event.message_type, - node_id=event.node, - object=event.object, - type=event.type, - captions=event.captions, - bitmap=event.bitmap, - data_values=event.data_values, - possible_values=event.possible_values, - groups=event.groups, - session_id=session_id, - interface=event.interface_number, - network_id=event.network_id, - opaque=event.opaque, - data_types=event.data_types, - ) - - def _handle_exception_event(self, event): - """ - Handle exception event when there is exception event - - :param core.emulator.data.ExceptionData event: exception data - :return: exception event - :rtype: core.api.grpc.core_pb2.ExceptionEvent - """ - return core_pb2.ExceptionEvent( - node_id=event.node, - session_id=int(event.session), - level=event.level, - source=event.source, - date=event.date, - text=event.text, - opaque=event.opaque, - ) - - def _handle_file_event(self, event): - """ - Handle file event - - :param core.emulator.data.FileData event: file data - :return: file event - :rtype: core.api.grpc.core_pb2.FileEvent - """ - return core_pb2.FileEvent( - message_type=event.message_type, - node_id=event.node, - name=event.name, - mode=event.mode, - number=event.number, - type=event.type, - source=event.source, - session_id=event.session, - data=event.data, - compressed_data=event.compressed_data, - ) - def Throughputs(self, request, context): """ Calculate average throughput after every certain amount of delay time diff --git a/daemon/proto/core/api/grpc/core.proto b/daemon/proto/core/api/grpc/core.proto index 5f6a964b..55ed272e 100644 --- a/daemon/proto/core/api/grpc/core.proto +++ b/daemon/proto/core/api/grpc/core.proto @@ -266,6 +266,7 @@ message AddSessionServerResponse { message EventsRequest { int32 session_id = 1; + repeated EventType.Enum events = 2; } message ThroughputsRequest { @@ -742,6 +743,17 @@ message ServiceFileConfig { string data = 4; } +message EventType { + enum Enum { + SESSION = 0; + NODE = 1; + LINK = 2; + CONFIG = 3; + EXCEPTION = 4; + FILE = 5; + } +} + message MessageType { enum Enum { NONE = 0;