daemon: added more wrapping classes, updated grpc.clientw to leverage wrapped classes for listened events

This commit is contained in:
Blake Harnden 2020-09-02 12:08:21 -07:00
parent a80fda11f5
commit c4a724ee10
2 changed files with 202 additions and 22 deletions

View file

@ -108,36 +108,65 @@ class InterfaceHelper:
)
def stream_listener(stream: Any, handler: Callable[[core_pb2.Event], None]) -> None:
def throughput_listener(
stream: Any, handler: Callable[[wrappers.ThroughputsEvent], None]
) -> None:
"""
Listen for stream events and provide them to the handler.
Listen for throughput events and provide them to the handler.
:param stream: grpc stream that will provide events
:param handler: function that handles an event
:return: nothing
"""
try:
for event_proto in stream:
event = wrappers.ThroughputsEvent.from_proto(event_proto)
handler(event)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.CANCELLED:
logging.debug("throughput stream closed")
else:
logging.exception("throughput stream error")
def cpu_listener(
stream: Any, handler: Callable[[wrappers.CpuUsageEvent], None]
) -> None:
"""
Listen for cpu events and provide them to the handler.
:param stream: grpc stream that will provide events
:param handler: function that handles an event
:return: nothing
"""
try:
for event in stream:
for event_proto in stream:
event = wrappers.CpuUsageEvent.from_proto(event_proto)
handler(event)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.CANCELLED:
logging.debug("stream closed")
logging.debug("cpu stream closed")
else:
logging.exception("stream error")
logging.exception("cpu stream error")
def start_streamer(stream: Any, handler: Callable[[core_pb2.Event], None]) -> None:
def event_listener(stream: Any, handler: Callable[[wrappers.Event], None]) -> None:
"""
Convenience method for starting a grpc stream thread for handling streamed events.
Listen for session events and provide them to the handler.
:param stream: grpc stream that will provide events
:param handler: function that handles an event
:return: nothing
"""
thread = threading.Thread(
target=stream_listener, args=(stream, handler), daemon=True
)
thread.start()
try:
for event_proto in stream:
event = wrappers.Event.from_proto(event_proto)
handler(event)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.CANCELLED:
logging.debug("session stream closed")
else:
logging.exception("session stream error")
class CoreGrpcClient:
@ -469,12 +498,11 @@ class CoreGrpcClient:
response = self.stub.SessionAlert(request)
return response.result
# TODO: determine best path for handling non proto events
def events(
self,
session_id: int,
handler: Callable[[core_pb2.Event], None],
events: List[core_pb2.Event] = None,
handler: Callable[[wrappers.Event], None],
events: List[wrappers.EventType] = None,
) -> grpc.Future:
"""
Listen for session events.
@ -487,12 +515,14 @@ class CoreGrpcClient:
"""
request = core_pb2.EventsRequest(session_id=session_id, events=events)
stream = self.stub.Events(request)
start_streamer(stream, handler)
thread = threading.Thread(
target=event_listener, args=(stream, handler), daemon=True
)
thread.start()
return stream
# TODO: determine best path for handling non proto events
def throughputs(
self, session_id: int, handler: Callable[[core_pb2.ThroughputsEvent], None]
self, session_id: int, handler: Callable[[wrappers.ThroughputsEvent], None]
) -> grpc.Future:
"""
Listen for throughput events with information for interfaces and bridges.
@ -504,12 +534,14 @@ class CoreGrpcClient:
"""
request = core_pb2.ThroughputsRequest(session_id=session_id)
stream = self.stub.Throughputs(request)
start_streamer(stream, handler)
thread = threading.Thread(
target=throughput_listener, args=(stream, handler), daemon=True
)
thread.start()
return stream
# TODO: determine best path for handling non proto events
def cpu_usage(
self, delay: int, handler: Callable[[core_pb2.CpuUsageEvent], None]
self, delay: int, handler: Callable[[wrappers.CpuUsageEvent], None]
) -> grpc.Future:
"""
Listen for cpu usage events with the given repeat delay.
@ -520,7 +552,10 @@ class CoreGrpcClient:
"""
request = core_pb2.CpuUsageRequest(delay=delay)
stream = self.stub.CpuUsage(request)
start_streamer(stream, handler)
thread = threading.Thread(
target=cpu_listener, args=(stream, handler), daemon=True
)
thread.start()
return stream
def add_node(self, session_id: int, node: wrappers.Node, source: str = None) -> int:

View file

@ -1,7 +1,7 @@
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from typing import Any, Dict, List, Optional, Set, Tuple
from core.api.grpc import (
common_pb2,
@ -100,6 +100,15 @@ class ServiceAction(Enum):
VALIDATE = 3
class EventType:
SESSION = 0
NODE = 1
LINK = 2
CONFIG = 3
EXCEPTION = 4
FILE = 5
@dataclass
class ConfigService:
group: str
@ -285,6 +294,15 @@ class ThroughputsEvent:
)
@dataclass
class CpuUsageEvent:
usage: float
@classmethod
def from_proto(cls, proto: core_pb2.CpuUsageEvent) -> "CpuUsageEvent":
return CpuUsageEvent(usage=proto.usage)
@dataclass
class SessionLocation:
x: float
@ -776,6 +794,133 @@ class NodeEvent:
)
@dataclass
class SessionEvent:
node_id: int
event: int
name: str
data: str
time: float
@classmethod
def from_proto(cls, proto: core_pb2.SessionEvent) -> "SessionEvent":
return SessionEvent(
node_id=proto.node_id,
event=proto.event,
name=proto.name,
data=proto.data,
time=proto.time,
)
@dataclass
class FileEvent:
message_type: MessageType
node_id: int
name: str
mode: str
number: int
type: str
source: str
data: str
compressed_data: str
@classmethod
def from_proto(cls, proto: core_pb2.FileEvent) -> "FileEvent":
return FileEvent(
message_type=MessageType(proto.message_type),
node_id=proto.node_id,
name=proto.name,
mode=proto.mode,
number=proto.number,
type=proto.type,
source=proto.source,
data=proto.data,
compressed_data=proto.compressed_data,
)
@dataclass
class ConfigEvent:
message_type: MessageType
node_id: int
object: str
type: int
data_types: List[int]
data_values: str
captions: str
bitmap: str
possible_values: str
groups: str
iface_id: int
network_id: int
opaque: str
@classmethod
def from_proto(cls, proto: core_pb2.ConfigEvent) -> "ConfigEvent":
return ConfigEvent(
message_type=MessageType(proto.message_type),
node_id=proto.node_id,
object=proto.object,
type=proto.type,
data_types=list(proto.data_types),
data_values=proto.data_values,
captions=proto.captions,
bitmap=proto.bitmap,
possible_values=proto.possible_values,
groups=proto.groups,
iface_id=proto.iface_id,
network_id=proto.network_id,
opaque=proto.opaque,
)
@dataclass
class Event:
session_id: int
source: str = None
session_event: SessionEvent = None
node_event: NodeEvent = None
link_event: LinkEvent = None
config_event: Any = None
exception_event: ExceptionEvent = None
file_event: FileEvent = None
@classmethod
def from_proto(cls, proto: core_pb2.Event) -> "Event":
source = proto.source if proto.source else None
node_event = None
link_event = None
exception_event = None
session_event = None
file_event = None
config_event = None
if proto.HasField("node_event"):
node_event = NodeEvent.from_proto(proto.node_event)
elif proto.HasField("link_event"):
link_event = LinkEvent.from_proto(proto.link_event)
elif proto.HasField("exception_event"):
exception_event = ExceptionEvent.from_proto(
proto.session_id, proto.exception_event
)
elif proto.HasField("session_event"):
session_event = SessionEvent.from_proto(proto.session_event)
elif proto.HasField("file_event"):
file_event = FileEvent.from_proto(proto.file_event)
elif proto.HasField("config_event"):
config_event = ConfigEvent.from_proto(proto.config_event)
return Event(
session_id=proto.session_id,
source=source,
node_event=node_event,
link_event=link_event,
exception_event=exception_event,
session_event=session_event,
file_event=file_event,
config_event=config_event,
)
@dataclass
class EmaneEventChannel:
group: str