grpc merged multiple event request/handlers into a singular event request for a session, which will return all events

This commit is contained in:
Blake Harnden 2019-05-30 13:07:45 -07:00
parent 2ba8669c5c
commit df3a8980ed
4 changed files with 152 additions and 257 deletions

View file

@ -263,33 +263,7 @@ class CoreGrpcClient(object):
request = core_pb2.SetSessionStateRequest(session_id=session_id, state=state) request = core_pb2.SetSessionStateRequest(session_id=session_id, state=state)
return self.stub.SetSessionState(request) return self.stub.SetSessionState(request)
def node_events(self, session_id, handler): def events(self, session_id, handler):
"""
Listen for session node events.
:param int session_id: id of session
:param handler: handler for every event
:return: nothing
:raises grpc.RpcError: when session doesn't exist
"""
request = core_pb2.NodeEventsRequest(session_id=session_id)
stream = self.stub.NodeEvents(request)
start_streamer(stream, handler)
def link_events(self, session_id, handler):
"""
Listen for session link events.
:param int session_id: id of session
:param handler: handler for every event
:return: nothing
:raises grpc.RpcError: when session doesn't exist
"""
request = core_pb2.LinkEventsRequest(session_id=session_id)
stream = self.stub.LinkEvents(request)
start_streamer(stream, handler)
def session_events(self, session_id, handler):
""" """
Listen for session events. Listen for session events.
@ -298,47 +272,8 @@ class CoreGrpcClient(object):
:return: nothing :return: nothing
:raises grpc.RpcError: when session doesn't exist :raises grpc.RpcError: when session doesn't exist
""" """
request = core_pb2.SessionEventsRequest(session_id=session_id) request = core_pb2.EventsRequest(session_id=session_id)
stream = self.stub.SessionEvents(request) stream = self.stub.Events(request)
start_streamer(stream, handler)
def config_events(self, session_id, handler):
"""
Listen for session config events.
:param int session_id: id of session
:param handler: handler for every event
:return: nothing
:raises grpc.RpcError: when session doesn't exist
"""
request = core_pb2.ConfigEventsRequest(session_id=session_id)
stream = self.stub.ConfigEvents(request)
start_streamer(stream, handler)
def exception_events(self, session_id, handler):
"""
Listen for session exception events.
:param int session_id: id of session
:param handler: handler for every event
:return: nothing
:raises grpc.RpcError: when session doesn't exist
"""
request = core_pb2.ExceptionEventsRequest(session_id=session_id)
stream = self.stub.ExceptionEvents(request)
start_streamer(stream, handler)
def file_events(self, session_id, handler):
"""
Listen for session file events.
:param int session_id: id of session
:param handler: handler for every event
:return: nothing
:raises grpc.RpcError: when session doesn't exist
"""
request = core_pb2.FileEventsRequest(session_id=session_id)
stream = self.stub.FileEvents(request)
start_streamer(stream, handler) start_streamer(stream, handler)
def add_node(self, session_id, node): def add_node(self, session_id, node):

View file

@ -4,6 +4,7 @@ import os
import tempfile import tempfile
import time import time
from Queue import Queue, Empty from Queue import Queue, Empty
from core.data import NodeData, LinkData, EventData, ConfigData, ExceptionData, FileData
import grpc import grpc
from concurrent import futures from concurrent import futures
@ -262,177 +263,149 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
session_proto = core_pb2.Session(state=session.state, nodes=nodes, links=links) session_proto = core_pb2.Session(state=session.state, nodes=nodes, links=links)
return core_pb2.GetSessionResponse(session=session_proto) return core_pb2.GetSessionResponse(session=session_proto)
def NodeEvents(self, request, context): def Events(self, request, context):
session = self.get_session(request.session_id, context) session = self.get_session(request.session_id, context)
queue = Queue() queue = Queue()
session.node_handlers.append(queue.put) session.node_handlers.append(queue.put)
while self._is_running(context):
try:
node = queue.get(timeout=1)
position = core_pb2.Position(x=node.x_position, y=node.y_position)
services = node.services or ""
services = services.split("|")
node_proto = core_pb2.Node(
id=node.id, name=node.name, model=node.model, position=position, services=services)
node_event = core_pb2.NodeEvent(node=node_proto)
yield node_event
except Empty:
continue
self._cancel_stream(context)
def LinkEvents(self, request, context):
session = self.get_session(request.session_id, context)
queue = Queue()
session.link_handlers.append(queue.put) session.link_handlers.append(queue.put)
session.config_handlers.append(queue.put)
while self._is_running(context): session.file_handlers.append(queue.put)
try: session.exception_handlers.append(queue.put)
event = queue.get(timeout=1)
interface_one = None
if event.interface1_id is not None:
interface_one = core_pb2.Interface(
id=event.interface1_id, name=event.interface1_name, mac=convert_value(event.interface1_mac),
ip4=convert_value(event.interface1_ip4), ip4mask=event.interface1_ip4_mask,
ip6=convert_value(event.interface1_ip6), ip6mask=event.interface1_ip6_mask)
interface_two = None
if event.interface2_id is not None:
interface_two = core_pb2.Interface(
id=event.interface2_id, name=event.interface2_name, mac=convert_value(event.interface2_mac),
ip4=convert_value(event.interface2_ip4), ip4mask=event.interface2_ip4_mask,
ip6=convert_value(event.interface2_ip6), ip6mask=event.interface2_ip6_mask)
options = core_pb2.LinkOptions(
opaque=event.opaque,
jitter=event.jitter,
key=event.key,
mburst=event.mburst,
mer=event.mer,
per=event.per,
bandwidth=event.bandwidth,
burst=event.burst,
delay=event.delay,
dup=event.dup,
unidirectional=event.unidirectional
)
link = core_pb2.Link(
type=event.link_type, node_one_id=event.node1_id, node_two_id=event.node2_id,
interface_one=interface_one, interface_two=interface_two, options=options)
link_event = core_pb2.LinkEvent(message_type=event.message_type, link=link)
yield link_event
except Empty:
continue
self._cancel_stream(context)
def SessionEvents(self, request, context):
session = self.get_session(request.session_id, context)
queue = Queue()
session.event_handlers.append(queue.put) session.event_handlers.append(queue.put)
while self._is_running(context): while self._is_running(context):
event = core_pb2.Event()
try: try:
event = queue.get(timeout=1) data = queue.get(timeout=1)
event_time = event.time if isinstance(data, NodeData):
if event_time is not None: event.node_event.CopyFrom(self._handle_node_event(data))
event_time = float(event_time) elif isinstance(data, LinkData):
session_event = core_pb2.SessionEvent( event.link_event.CopyFrom(self._handle_link_event(data))
node_id=event.node, elif isinstance(data, EventData):
event=event.event_type, event.session_event.CopyFrom(self._handle_session_event(data))
name=event.name, elif isinstance(data, ConfigData):
data=event.data, event.config_event.CopyFrom(self._handle_config_event(data))
time=event_time, # TODO: remove when config events are fixed
session_id=session.id event.config_event.session_id = session.id
) elif isinstance(data, ExceptionData):
yield session_event event.exception_event.CopyFrom(self._handle_exception_event(data))
elif isinstance(data, FileData):
event.file_event.CopyFrom(self._handle_file_event(data))
else:
logging.error("unknown event: %s", data)
continue
yield event
except Empty: except Empty:
continue continue
session.node_handlers.remove(queue.put)
session.link_handlers.remove(queue.put)
session.config_handlers.remove(queue.put)
session.file_handlers.remove(queue.put)
session.exception_handlers.remove(queue.put)
session.event_handlers.remove(queue.put)
self._cancel_stream(context) self._cancel_stream(context)
def ConfigEvents(self, request, context): def _handle_node_event(self, event):
session = self.get_session(request.session_id, context) position = core_pb2.Position(x=event.x_position, y=event.y_position)
queue = Queue() services = event.services or ""
session.config_handlers.append(queue.put) services = services.split("|")
node_proto = core_pb2.Node(
id=event.id, name=event.name, model=event.model, position=position, services=services)
return core_pb2.NodeEvent(node=node_proto)
while self._is_running(context): def _handle_link_event(self, event):
try: interface_one = None
event = queue.get(timeout=1) if event.interface1_id is not None:
session_id = None interface_one = core_pb2.Interface(
if event.session is not None: id=event.interface1_id, name=event.interface1_name, mac=convert_value(event.interface1_mac),
session_id = int(event.session) ip4=convert_value(event.interface1_ip4), ip4mask=event.interface1_ip4_mask,
config_event = core_pb2.ConfigEvent( ip6=convert_value(event.interface1_ip6), ip6mask=event.interface1_ip6_mask)
message_type=event.message_type,
node_id=event.node,
object=event.object,
type=event.type,
captions=event.captions,
bitmap=event.bitmap,
data_values=event.data_values,
possible_values=event.possible_values,
groups=event.groups,
session_id=session_id,
interface=event.interface_number,
network_id=event.network_id,
opaque=event.opaque,
data_types=event.data_types
)
yield config_event
except Empty:
continue
self._cancel_stream(context) interface_two = None
if event.interface2_id is not None:
interface_two = core_pb2.Interface(
id=event.interface2_id, name=event.interface2_name, mac=convert_value(event.interface2_mac),
ip4=convert_value(event.interface2_ip4), ip4mask=event.interface2_ip4_mask,
ip6=convert_value(event.interface2_ip6), ip6mask=event.interface2_ip6_mask)
def ExceptionEvents(self, request, context): options = core_pb2.LinkOptions(
session = self.get_session(request.session_id, context) opaque=event.opaque,
queue = Queue() jitter=event.jitter,
session.exception_handlers.append(queue.put) key=event.key,
mburst=event.mburst,
mer=event.mer,
per=event.per,
bandwidth=event.bandwidth,
burst=event.burst,
delay=event.delay,
dup=event.dup,
unidirectional=event.unidirectional
)
link = core_pb2.Link(
type=event.link_type, node_one_id=event.node1_id, node_two_id=event.node2_id,
interface_one=interface_one, interface_two=interface_two, options=options)
return core_pb2.LinkEvent(message_type=event.message_type, link=link)
while self._is_running(context): def _handle_session_event(self, event):
try: event_time = event.time
event = queue.get(timeout=1) if event_time is not None:
exception_event = core_pb2.ExceptionEvent( event_time = float(event_time)
node_id=event.node, return core_pb2.SessionEvent(
session_id=int(event.session), node_id=event.node,
level=event.level.value, event=event.event_type,
source=event.source, name=event.name,
date=event.date, data=event.data,
text=event.text, time=event_time,
opaque=event.opaque session_id=event.session
) )
yield exception_event
except Empty:
continue
self._cancel_stream(context) def _handle_config_event(self, event):
session_id = None
if event.session is not None:
session_id = int(event.session)
return core_pb2.ConfigEvent(
message_type=event.message_type,
node_id=event.node,
object=event.object,
type=event.type,
captions=event.captions,
bitmap=event.bitmap,
data_values=event.data_values,
possible_values=event.possible_values,
groups=event.groups,
session_id=session_id,
interface=event.interface_number,
network_id=event.network_id,
opaque=event.opaque,
data_types=event.data_types
)
def FileEvents(self, request, context): def _handle_exception_event(self, event):
session = self.get_session(request.session_id, context) return core_pb2.ExceptionEvent(
queue = Queue() node_id=event.node,
session.file_handlers.append(queue.put) session_id=int(event.session),
level=event.level.value,
source=event.source,
date=event.date,
text=event.text,
opaque=event.opaque
)
while self._is_running(context): def _handle_file_event(self, event):
try: return core_pb2.FileEvent(
event = queue.get(timeout=1) message_type=event.message_type,
file_event = core_pb2.FileEvent( node_id=event.node,
message_type=event.message_type, name=event.name,
node_id=event.node, mode=event.mode,
name=event.name, number=event.number,
mode=event.mode, type=event.type,
number=event.number, source=event.source,
type=event.type, session_id=event.session,
source=event.source, data=event.data,
session_id=event.session, compressed_data=event.compressed_data
data=event.data, )
compressed_data=event.compressed_data
)
yield file_event
except Empty:
continue
self._cancel_stream(context)
def AddNode(self, request, context): def AddNode(self, request, context):
logging.debug("add node: %s", request) logging.debug("add node: %s", request)

View file

@ -27,17 +27,7 @@ service CoreApi {
} }
// event streams // event streams
rpc NodeEvents (NodeEventsRequest) returns (stream NodeEvent) { rpc Events (EventsRequest) returns (stream Event) {
}
rpc LinkEvents (LinkEventsRequest) returns (stream LinkEvent) {
}
rpc SessionEvents (SessionEventsRequest) returns (stream SessionEvent) {
}
rpc ConfigEvents (ConfigEventsRequest) returns (stream ConfigEvent) {
}
rpc ExceptionEvents (ExceptionEventsRequest) returns (stream ExceptionEvent) {
}
rpc FileEvents (FileEventsRequest) returns (stream FileEvent) {
} }
// node rpc // node rpc
@ -199,27 +189,30 @@ message SetSessionStateResponse {
bool result = 1; bool result = 1;
} }
message NodeEventsRequest { message EventsRequest {
int32 session_id = 1; int32 session_id = 1;
} }
message Event {
oneof event_type {
SessionEvent session_event = 1;
NodeEvent node_event = 2;
LinkEvent link_event = 3;
ConfigEvent config_event = 4;
ExceptionEvent exception_event = 5;
FileEvent file_event = 6;
}
}
message NodeEvent { message NodeEvent {
Node node = 1; Node node = 1;
} }
message LinkEventsRequest {
int32 session_id = 1;
}
message LinkEvent { message LinkEvent {
MessageType.Enum message_type = 1; MessageType.Enum message_type = 1;
Link link = 2; Link link = 2;
} }
message SessionEventsRequest {
int32 session_id = 1;
}
message SessionEvent { message SessionEvent {
int32 node_id = 1; int32 node_id = 1;
int32 event = 2; int32 event = 2;
@ -229,10 +222,6 @@ message SessionEvent {
int32 session_id = 6; int32 session_id = 6;
} }
message ConfigEventsRequest {
int32 session_id = 1;
}
message ConfigEvent { message ConfigEvent {
MessageType.Enum message_type = 1; MessageType.Enum message_type = 1;
int32 node_id = 2; int32 node_id = 2;
@ -250,10 +239,6 @@ message ConfigEvent {
string opaque = 14; string opaque = 14;
} }
message ExceptionEventsRequest {
int32 session_id = 1;
}
message ExceptionEvent { message ExceptionEvent {
int32 node_id = 1; int32 node_id = 1;
int32 session_id = 2; int32 session_id = 2;
@ -264,10 +249,6 @@ message ExceptionEvent {
string opaque = 7; string opaque = 7;
} }
message FileEventsRequest {
int32 session_id = 1;
}
message FileEvent { message FileEvent {
MessageType.Enum message_type = 1; MessageType.Enum message_type = 1;
int32 node_id = 2; int32 node_id = 2;

View file

@ -718,11 +718,12 @@ class TestGrpc:
queue = Queue() queue = Queue()
def handle_event(event_data): def handle_event(event_data):
assert event_data.HasField("node_event")
queue.put(event_data) queue.put(event_data)
# then # then
with client.context_connect(): with client.context_connect():
client.node_events(session.id, handle_event) client.events(session.id, handle_event)
time.sleep(0.1) time.sleep(0.1)
session.broadcast_node(node_data) session.broadcast_node(node_data)
@ -741,11 +742,12 @@ class TestGrpc:
queue = Queue() queue = Queue()
def handle_event(event_data): def handle_event(event_data):
assert event_data.HasField("link_event")
queue.put(event_data) queue.put(event_data)
# then # then
with client.context_connect(): with client.context_connect():
client.link_events(session.id, handle_event) client.events(session.id, handle_event)
time.sleep(0.1) time.sleep(0.1)
session.broadcast_link(link_data) session.broadcast_link(link_data)
@ -759,11 +761,12 @@ class TestGrpc:
queue = Queue() queue = Queue()
def handle_event(event_data): def handle_event(event_data):
assert event_data.HasField("session_event")
queue.put(event_data) queue.put(event_data)
# then # then
with client.context_connect(): with client.context_connect():
client.session_events(session.id, handle_event) client.events(session.id, handle_event)
time.sleep(0.1) time.sleep(0.1)
event = EventData(event_type=EventTypes.RUNTIME_STATE.value, time="%s" % time.time()) event = EventData(event_type=EventTypes.RUNTIME_STATE.value, time="%s" % time.time())
session.broadcast_event(event) session.broadcast_event(event)
@ -778,11 +781,12 @@ class TestGrpc:
queue = Queue() queue = Queue()
def handle_event(event_data): def handle_event(event_data):
assert event_data.HasField("config_event")
queue.put(event_data) queue.put(event_data)
# then # then
with client.context_connect(): with client.context_connect():
client.config_events(session.id, handle_event) client.events(session.id, handle_event)
time.sleep(0.1) time.sleep(0.1)
session_config = session.options.get_configs() session_config = session.options.get_configs()
config_data = ConfigShim.config_data(0, None, ConfigFlags.UPDATE.value, session.options, session_config) config_data = ConfigShim.config_data(0, None, ConfigFlags.UPDATE.value, session.options, session_config)
@ -798,11 +802,12 @@ class TestGrpc:
queue = Queue() queue = Queue()
def handle_event(event_data): def handle_event(event_data):
assert event_data.HasField("exception_event")
queue.put(event_data) queue.put(event_data)
# then # then
with client.context_connect(): with client.context_connect():
client.exception_events(session.id, handle_event) client.events(session.id, handle_event)
time.sleep(0.1) time.sleep(0.1)
session.exception(ExceptionLevels.FATAL, "test", None, "exception message") session.exception(ExceptionLevels.FATAL, "test", None, "exception message")
@ -817,11 +822,12 @@ class TestGrpc:
queue = Queue() queue = Queue()
def handle_event(event_data): def handle_event(event_data):
assert event_data.HasField("file_event")
queue.put(event_data) queue.put(event_data)
# then # then
with client.context_connect(): with client.context_connect():
client.file_events(session.id, handle_event) client.events(session.id, handle_event)
time.sleep(0.1) time.sleep(0.1)
file_data = session.services.get_service_file(node, "IPForward", "ipforward.sh") file_data = session.services.get_service_file(node, "IPForward", "ipforward.sh")
session.broadcast_file(file_data) session.broadcast_file(file_data)