From c4a724ee10c2c3807a79385f64141baef0497056 Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Wed, 2 Sep 2020 12:08:21 -0700 Subject: [PATCH] daemon: added more wrapping classes, updated grpc.clientw to leverage wrapped classes for listened events --- daemon/core/api/grpc/clientw.py | 77 +++++++++++----- daemon/core/api/grpc/wrappers.py | 147 ++++++++++++++++++++++++++++++- 2 files changed, 202 insertions(+), 22 deletions(-) diff --git a/daemon/core/api/grpc/clientw.py b/daemon/core/api/grpc/clientw.py index 10de850b..25ee8b2f 100644 --- a/daemon/core/api/grpc/clientw.py +++ b/daemon/core/api/grpc/clientw.py @@ -108,36 +108,65 @@ class InterfaceHelper: ) -def stream_listener(stream: Any, handler: Callable[[core_pb2.Event], None]) -> None: +def throughput_listener( + stream: Any, handler: Callable[[wrappers.ThroughputsEvent], None] +) -> None: """ - Listen for stream events and provide them to the handler. + Listen for throughput events and provide them to the handler. + + :param stream: grpc stream that will provide events + :param handler: function that handles an event + :return: nothing + """ + try: + for event_proto in stream: + event = wrappers.ThroughputsEvent.from_proto(event_proto) + handler(event) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.CANCELLED: + logging.debug("throughput stream closed") + else: + logging.exception("throughput stream error") + + +def cpu_listener( + stream: Any, handler: Callable[[wrappers.CpuUsageEvent], None] +) -> None: + """ + Listen for cpu events and provide them to the handler. :param stream: grpc stream that will provide events :param handler: function that handles an event :return: nothing """ try: - for event in stream: + for event_proto in stream: + event = wrappers.CpuUsageEvent.from_proto(event_proto) handler(event) except grpc.RpcError as e: if e.code() == grpc.StatusCode.CANCELLED: - logging.debug("stream closed") + logging.debug("cpu stream closed") else: - logging.exception("stream error") + logging.exception("cpu stream error") -def start_streamer(stream: Any, handler: Callable[[core_pb2.Event], None]) -> None: +def event_listener(stream: Any, handler: Callable[[wrappers.Event], None]) -> None: """ - Convenience method for starting a grpc stream thread for handling streamed events. + Listen for session events and provide them to the handler. :param stream: grpc stream that will provide events :param handler: function that handles an event :return: nothing """ - thread = threading.Thread( - target=stream_listener, args=(stream, handler), daemon=True - ) - thread.start() + try: + for event_proto in stream: + event = wrappers.Event.from_proto(event_proto) + handler(event) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.CANCELLED: + logging.debug("session stream closed") + else: + logging.exception("session stream error") class CoreGrpcClient: @@ -469,12 +498,11 @@ class CoreGrpcClient: response = self.stub.SessionAlert(request) return response.result - # TODO: determine best path for handling non proto events def events( self, session_id: int, - handler: Callable[[core_pb2.Event], None], - events: List[core_pb2.Event] = None, + handler: Callable[[wrappers.Event], None], + events: List[wrappers.EventType] = None, ) -> grpc.Future: """ Listen for session events. @@ -487,12 +515,14 @@ class CoreGrpcClient: """ request = core_pb2.EventsRequest(session_id=session_id, events=events) stream = self.stub.Events(request) - start_streamer(stream, handler) + thread = threading.Thread( + target=event_listener, args=(stream, handler), daemon=True + ) + thread.start() return stream - # TODO: determine best path for handling non proto events def throughputs( - self, session_id: int, handler: Callable[[core_pb2.ThroughputsEvent], None] + self, session_id: int, handler: Callable[[wrappers.ThroughputsEvent], None] ) -> grpc.Future: """ Listen for throughput events with information for interfaces and bridges. @@ -504,12 +534,14 @@ class CoreGrpcClient: """ request = core_pb2.ThroughputsRequest(session_id=session_id) stream = self.stub.Throughputs(request) - start_streamer(stream, handler) + thread = threading.Thread( + target=throughput_listener, args=(stream, handler), daemon=True + ) + thread.start() return stream - # TODO: determine best path for handling non proto events def cpu_usage( - self, delay: int, handler: Callable[[core_pb2.CpuUsageEvent], None] + self, delay: int, handler: Callable[[wrappers.CpuUsageEvent], None] ) -> grpc.Future: """ Listen for cpu usage events with the given repeat delay. @@ -520,7 +552,10 @@ class CoreGrpcClient: """ request = core_pb2.CpuUsageRequest(delay=delay) stream = self.stub.CpuUsage(request) - start_streamer(stream, handler) + thread = threading.Thread( + target=cpu_listener, args=(stream, handler), daemon=True + ) + thread.start() return stream def add_node(self, session_id: int, node: wrappers.Node, source: str = None) -> int: diff --git a/daemon/core/api/grpc/wrappers.py b/daemon/core/api/grpc/wrappers.py index 285bdfce..3fc087fa 100644 --- a/daemon/core/api/grpc/wrappers.py +++ b/daemon/core/api/grpc/wrappers.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field from enum import Enum from pathlib import Path -from typing import Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple from core.api.grpc import ( common_pb2, @@ -100,6 +100,15 @@ class ServiceAction(Enum): VALIDATE = 3 +class EventType: + SESSION = 0 + NODE = 1 + LINK = 2 + CONFIG = 3 + EXCEPTION = 4 + FILE = 5 + + @dataclass class ConfigService: group: str @@ -285,6 +294,15 @@ class ThroughputsEvent: ) +@dataclass +class CpuUsageEvent: + usage: float + + @classmethod + def from_proto(cls, proto: core_pb2.CpuUsageEvent) -> "CpuUsageEvent": + return CpuUsageEvent(usage=proto.usage) + + @dataclass class SessionLocation: x: float @@ -776,6 +794,133 @@ class NodeEvent: ) +@dataclass +class SessionEvent: + node_id: int + event: int + name: str + data: str + time: float + + @classmethod + def from_proto(cls, proto: core_pb2.SessionEvent) -> "SessionEvent": + return SessionEvent( + node_id=proto.node_id, + event=proto.event, + name=proto.name, + data=proto.data, + time=proto.time, + ) + + +@dataclass +class FileEvent: + message_type: MessageType + node_id: int + name: str + mode: str + number: int + type: str + source: str + data: str + compressed_data: str + + @classmethod + def from_proto(cls, proto: core_pb2.FileEvent) -> "FileEvent": + return FileEvent( + message_type=MessageType(proto.message_type), + node_id=proto.node_id, + name=proto.name, + mode=proto.mode, + number=proto.number, + type=proto.type, + source=proto.source, + data=proto.data, + compressed_data=proto.compressed_data, + ) + + +@dataclass +class ConfigEvent: + message_type: MessageType + node_id: int + object: str + type: int + data_types: List[int] + data_values: str + captions: str + bitmap: str + possible_values: str + groups: str + iface_id: int + network_id: int + opaque: str + + @classmethod + def from_proto(cls, proto: core_pb2.ConfigEvent) -> "ConfigEvent": + return ConfigEvent( + message_type=MessageType(proto.message_type), + node_id=proto.node_id, + object=proto.object, + type=proto.type, + data_types=list(proto.data_types), + data_values=proto.data_values, + captions=proto.captions, + bitmap=proto.bitmap, + possible_values=proto.possible_values, + groups=proto.groups, + iface_id=proto.iface_id, + network_id=proto.network_id, + opaque=proto.opaque, + ) + + +@dataclass +class Event: + session_id: int + source: str = None + session_event: SessionEvent = None + node_event: NodeEvent = None + link_event: LinkEvent = None + config_event: Any = None + exception_event: ExceptionEvent = None + file_event: FileEvent = None + + @classmethod + def from_proto(cls, proto: core_pb2.Event) -> "Event": + source = proto.source if proto.source else None + node_event = None + link_event = None + exception_event = None + session_event = None + file_event = None + config_event = None + if proto.HasField("node_event"): + node_event = NodeEvent.from_proto(proto.node_event) + elif proto.HasField("link_event"): + link_event = LinkEvent.from_proto(proto.link_event) + elif proto.HasField("exception_event"): + exception_event = ExceptionEvent.from_proto( + proto.session_id, proto.exception_event + ) + elif proto.HasField("session_event"): + session_event = SessionEvent.from_proto(proto.session_event) + elif proto.HasField("file_event"): + file_event = FileEvent.from_proto(proto.file_event) + elif proto.HasField("config_event"): + config_event = ConfigEvent.from_proto(proto.config_event) + return Event( + session_id=proto.session_id, + source=source, + node_event=node_event, + link_event=link_event, + exception_event=exception_event, + session_event=session_event, + file_event=file_event, + config_event=config_event, + ) + + @dataclass class EmaneEventChannel: group: str