updates to grpc event streaming, client can now listen to a subset of desired events

This commit is contained in:
Blake Harnden 2019-12-13 16:11:58 -08:00
parent 47e087b365
commit 47cc20b567
4 changed files with 293 additions and 211 deletions

View file

@ -378,16 +378,17 @@ class CoreGrpcClient:
)
return self.stub.AddSessionServer(request)
def events(self, session_id, handler):
def events(self, session_id, handler, events=None):
"""
Listen for session events.
:param int session_id: id of session
:param handler: handler for every event
:param handler: handler for received events
:param list events: events to listen to, defaults to all
:return: nothing
:raises grpc.RpcError: when session doesn't exist
"""
request = core_pb2.EventsRequest(session_id=session_id)
request = core_pb2.EventsRequest(session_id=session_id, events=events)
stream = self.stub.Events(request)
start_streamer(stream, handler)

View file

@ -0,0 +1,268 @@
import logging
from queue import Empty, Queue
from core.api.grpc import core_pb2
from core.api.grpc.grpcutils import convert_value
from core.emulator.data import (
ConfigData,
EventData,
ExceptionData,
FileData,
LinkData,
NodeData,
)
def handle_node_event(event):
"""
Handle node event when there is a node event
:param core.emulator.data.NodeData event: node data
:return: node event that contains node id, name, model, position, and services
:rtype: core.api.grpc.core_pb2.NodeEvent
"""
position = core_pb2.Position(x=event.x_position, y=event.y_position)
services = event.services or ""
services = services.split("|")
node_proto = core_pb2.Node(
id=event.id,
name=event.name,
model=event.model,
position=position,
services=services,
)
return core_pb2.NodeEvent(node=node_proto, source=event.source)
def handle_link_event(event):
"""
Handle link event when there is a link event
:param core.emulator.data.LinkData event: link data
:return: link event that has message type and link information
:rtype: core.api.grpc.core_pb2.LinkEvent
"""
interface_one = None
if event.interface1_id is not None:
interface_one = core_pb2.Interface(
id=event.interface1_id,
name=event.interface1_name,
mac=convert_value(event.interface1_mac),
ip4=convert_value(event.interface1_ip4),
ip4mask=event.interface1_ip4_mask,
ip6=convert_value(event.interface1_ip6),
ip6mask=event.interface1_ip6_mask,
)
interface_two = None
if event.interface2_id is not None:
interface_two = core_pb2.Interface(
id=event.interface2_id,
name=event.interface2_name,
mac=convert_value(event.interface2_mac),
ip4=convert_value(event.interface2_ip4),
ip4mask=event.interface2_ip4_mask,
ip6=convert_value(event.interface2_ip6),
ip6mask=event.interface2_ip6_mask,
)
options = core_pb2.LinkOptions(
opaque=event.opaque,
jitter=event.jitter,
key=event.key,
mburst=event.mburst,
mer=event.mer,
per=event.per,
bandwidth=event.bandwidth,
burst=event.burst,
delay=event.delay,
dup=event.dup,
unidirectional=event.unidirectional,
)
link = core_pb2.Link(
type=event.link_type,
node_one_id=event.node1_id,
node_two_id=event.node2_id,
interface_one=interface_one,
interface_two=interface_two,
options=options,
)
return core_pb2.LinkEvent(message_type=event.message_type, link=link)
def handle_session_event(event):
"""
Handle session event when there is a session event
:param core.emulator.data.EventData event: event data
:return: session event
:rtype: core.api.grpc.core_pb2.SessionEvent
"""
event_time = event.time
if event_time is not None:
event_time = float(event_time)
return core_pb2.SessionEvent(
node_id=event.node,
event=event.event_type,
name=event.name,
data=event.data,
time=event_time,
session_id=event.session,
)
def handle_config_event(event):
"""
Handle configuration event when there is configuration event
:param core.emulator.data.ConfigData event: configuration data
:return: configuration event
:rtype: core.api.grpc.core_pb2.ConfigEvent
"""
session_id = None
if event.session is not None:
session_id = int(event.session)
return core_pb2.ConfigEvent(
message_type=event.message_type,
node_id=event.node,
object=event.object,
type=event.type,
captions=event.captions,
bitmap=event.bitmap,
data_values=event.data_values,
possible_values=event.possible_values,
groups=event.groups,
session_id=session_id,
interface=event.interface_number,
network_id=event.network_id,
opaque=event.opaque,
data_types=event.data_types,
)
def handle_exception_event(event):
"""
Handle exception event when there is exception event
:param core.emulator.data.ExceptionData event: exception data
:return: exception event
:rtype: core.api.grpc.core_pb2.ExceptionEvent
"""
return core_pb2.ExceptionEvent(
node_id=event.node,
session_id=int(event.session),
level=event.level,
source=event.source,
date=event.date,
text=event.text,
opaque=event.opaque,
)
def handle_file_event(event):
"""
Handle file event
:param core.emulator.data.FileData event: file data
:return: file event
:rtype: core.api.grpc.core_pb2.FileEvent
"""
return core_pb2.FileEvent(
message_type=event.message_type,
node_id=event.node,
name=event.name,
mode=event.mode,
number=event.number,
type=event.type,
source=event.source,
session_id=event.session,
data=event.data,
compressed_data=event.compressed_data,
)
class EventStreamer:
"""
Processes session events to generate grpc events.
"""
def __init__(self, session, event_types):
"""
Create a EventStreamer instance.
:param core.emulator.session.Session session: session to process events for
:param set event_types: types of events to process
"""
self.session = session
self.event_types = event_types
self.queue = Queue()
self.add_handlers()
def add_handlers(self):
"""
Add a session event handler for desired event types.
:return: nothing
"""
if core_pb2.EventType.NODE in self.event_types:
self.session.node_handlers.append(self.queue.put)
if core_pb2.EventType.LINK in self.event_types:
self.session.link_handlers.append(self.queue.put)
if core_pb2.EventType.CONFIG in self.event_types:
self.session.config_handlers.append(self.queue.put)
if core_pb2.EventType.FILE in self.event_types:
self.session.file_handlers.append(self.queue.put)
if core_pb2.EventType.EXCEPTION in self.event_types:
self.session.exception_handlers.append(self.queue.put)
if core_pb2.EventType.SESSION in self.event_types:
self.session.event_handlers.append(self.queue.put)
def process(self):
"""
Process the next event in the queue.
:return: grpc event, or None when invalid event or queue timeout
:rtype: core.api.grpc.core_pb2.Event
"""
event = core_pb2.Event()
try:
data = self.queue.get(timeout=1)
if isinstance(data, NodeData):
event.node_event.CopyFrom(handle_node_event(data))
elif isinstance(data, LinkData):
event.link_event.CopyFrom(handle_link_event(data))
elif isinstance(data, EventData):
event.session_event.CopyFrom(handle_session_event(data))
elif isinstance(data, ConfigData):
event.config_event.CopyFrom(handle_config_event(data))
# TODO: remove when config events are fixed
event.config_event.session_id = self.session.id
elif isinstance(data, ExceptionData):
event.exception_event.CopyFrom(handle_exception_event(data))
elif isinstance(data, FileData):
event.file_event.CopyFrom(handle_file_event(data))
else:
logging.error("unknown event: %s", data)
event = None
except Empty:
event = None
return event
def remove_handlers(self):
"""
Remove session event handlers for events being watched.
:return: nothing
"""
if core_pb2.EventType.NODE in self.event_types:
self.session.node_handlers.remove(self.queue.put)
if core_pb2.EventType.LINK in self.event_types:
self.session.link_handlers.remove(self.queue.put)
if core_pb2.EventType.CONFIG in self.event_types:
self.session.config_handlers.remove(self.queue.put)
if core_pb2.EventType.FILE in self.event_types:
self.session.file_handlers.remove(self.queue.put)
if core_pb2.EventType.EXCEPTION in self.event_types:
self.session.exception_handlers.remove(self.queue.put)
if core_pb2.EventType.SESSION in self.event_types:
self.session.event_handlers.remove(self.queue.put)

View file

@ -5,27 +5,19 @@ import re
import tempfile
import time
from concurrent import futures
from queue import Empty, Queue
import grpc
from core.api.grpc import core_pb2, core_pb2_grpc, grpcutils
from core.api.grpc.events import EventStreamer
from core.api.grpc.grpcutils import (
convert_value,
get_config_options,
get_emane_model_id,
get_links,
get_net_stats,
)
from core.emane.nodes import EmaneNet
from core.emulator.data import (
ConfigData,
EventData,
ExceptionData,
FileData,
LinkData,
NodeData,
)
from core.emulator.data import LinkData
from core.emulator.emudata import LinkOptions, NodeOptions
from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags
from core.errors import CoreCommandError, CoreError
@ -439,210 +431,19 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
def Events(self, request, context):
session = self.get_session(request.session_id, context)
queue = Queue()
session.node_handlers.append(queue.put)
session.link_handlers.append(queue.put)
session.config_handlers.append(queue.put)
session.file_handlers.append(queue.put)
session.exception_handlers.append(queue.put)
session.event_handlers.append(queue.put)
event_types = set(request.events)
if not event_types:
event_types = set(core_pb2.EventType.Enum.values())
streamer = EventStreamer(session, event_types)
while self._is_running(context):
event = core_pb2.Event()
try:
data = queue.get(timeout=1)
if isinstance(data, NodeData):
event.node_event.CopyFrom(self._handle_node_event(data))
elif isinstance(data, LinkData):
event.link_event.CopyFrom(self._handle_link_event(data))
elif isinstance(data, EventData):
event.session_event.CopyFrom(self._handle_session_event(data))
elif isinstance(data, ConfigData):
event.config_event.CopyFrom(self._handle_config_event(data))
# TODO: remove when config events are fixed
event.config_event.session_id = session.id
elif isinstance(data, ExceptionData):
event.exception_event.CopyFrom(self._handle_exception_event(data))
elif isinstance(data, FileData):
event.file_event.CopyFrom(self._handle_file_event(data))
else:
logging.error("unknown event: %s", data)
continue
event = streamer.process()
if event:
yield event
except Empty:
continue
session.node_handlers.remove(queue.put)
session.link_handlers.remove(queue.put)
session.config_handlers.remove(queue.put)
session.file_handlers.remove(queue.put)
session.exception_handlers.remove(queue.put)
session.event_handlers.remove(queue.put)
streamer.remove_handlers()
self._cancel_stream(context)
def _handle_node_event(self, event):
"""
Handle node event when there is a node event
:param core.emulator.data.NodeData event: node data
:return: node event that contains node id, name, model, position, and services
:rtype: core.api.grpc.core_pb2.NodeEvent
"""
position = core_pb2.Position(x=event.x_position, y=event.y_position)
services = event.services or ""
services = services.split("|")
node_proto = core_pb2.Node(
id=event.id,
name=event.name,
model=event.model,
position=position,
services=services,
)
return core_pb2.NodeEvent(node=node_proto, source=event.source)
def _handle_link_event(self, event):
"""
Handle link event when there is a link event
:param core.emulator.data.LinkData event: link data
:return: link event that has message type and link information
:rtype: core.api.grpc.core_pb2.LinkEvent
"""
interface_one = None
if event.interface1_id is not None:
interface_one = core_pb2.Interface(
id=event.interface1_id,
name=event.interface1_name,
mac=convert_value(event.interface1_mac),
ip4=convert_value(event.interface1_ip4),
ip4mask=event.interface1_ip4_mask,
ip6=convert_value(event.interface1_ip6),
ip6mask=event.interface1_ip6_mask,
)
interface_two = None
if event.interface2_id is not None:
interface_two = core_pb2.Interface(
id=event.interface2_id,
name=event.interface2_name,
mac=convert_value(event.interface2_mac),
ip4=convert_value(event.interface2_ip4),
ip4mask=event.interface2_ip4_mask,
ip6=convert_value(event.interface2_ip6),
ip6mask=event.interface2_ip6_mask,
)
options = core_pb2.LinkOptions(
opaque=event.opaque,
jitter=event.jitter,
key=event.key,
mburst=event.mburst,
mer=event.mer,
per=event.per,
bandwidth=event.bandwidth,
burst=event.burst,
delay=event.delay,
dup=event.dup,
unidirectional=event.unidirectional,
)
link = core_pb2.Link(
type=event.link_type,
node_one_id=event.node1_id,
node_two_id=event.node2_id,
interface_one=interface_one,
interface_two=interface_two,
options=options,
)
return core_pb2.LinkEvent(message_type=event.message_type, link=link)
def _handle_session_event(self, event):
"""
Handle session event when there is a session event
:param core.emulator.data.EventData event: event data
:return: session event
:rtype: core.api.grpc.core_pb2.SessionEvent
"""
event_time = event.time
if event_time is not None:
event_time = float(event_time)
return core_pb2.SessionEvent(
node_id=event.node,
event=event.event_type,
name=event.name,
data=event.data,
time=event_time,
session_id=event.session,
)
def _handle_config_event(self, event):
"""
Handle configuration event when there is configuration event
:param core.emulator.data.ConfigData event: configuration data
:return: configuration event
:rtype: core.api.grpc.core_pb2.ConfigEvent
"""
session_id = None
if event.session is not None:
session_id = int(event.session)
return core_pb2.ConfigEvent(
message_type=event.message_type,
node_id=event.node,
object=event.object,
type=event.type,
captions=event.captions,
bitmap=event.bitmap,
data_values=event.data_values,
possible_values=event.possible_values,
groups=event.groups,
session_id=session_id,
interface=event.interface_number,
network_id=event.network_id,
opaque=event.opaque,
data_types=event.data_types,
)
def _handle_exception_event(self, event):
"""
Handle exception event when there is exception event
:param core.emulator.data.ExceptionData event: exception data
:return: exception event
:rtype: core.api.grpc.core_pb2.ExceptionEvent
"""
return core_pb2.ExceptionEvent(
node_id=event.node,
session_id=int(event.session),
level=event.level,
source=event.source,
date=event.date,
text=event.text,
opaque=event.opaque,
)
def _handle_file_event(self, event):
"""
Handle file event
:param core.emulator.data.FileData event: file data
:return: file event
:rtype: core.api.grpc.core_pb2.FileEvent
"""
return core_pb2.FileEvent(
message_type=event.message_type,
node_id=event.node,
name=event.name,
mode=event.mode,
number=event.number,
type=event.type,
source=event.source,
session_id=event.session,
data=event.data,
compressed_data=event.compressed_data,
)
def Throughputs(self, request, context):
"""
Calculate average throughput after every certain amount of delay time

View file

@ -266,6 +266,7 @@ message AddSessionServerResponse {
message EventsRequest {
int32 session_id = 1;
repeated EventType.Enum events = 2;
}
message ThroughputsRequest {
@ -742,6 +743,17 @@ message ServiceFileConfig {
string data = 4;
}
message EventType {
enum Enum {
SESSION = 0;
NODE = 1;
LINK = 2;
CONFIG = 3;
EXCEPTION = 4;
FILE = 5;
}
}
message MessageType {
enum Enum {
NONE = 0;