grpc added config, exception, and file event streams

This commit is contained in:
bharnden 2019-03-17 23:29:38 -07:00
parent f60a6720f0
commit 8ee1db5dc8
3 changed files with 182 additions and 5 deletions

View file

@ -95,6 +95,42 @@ class CoreApiClient(object):
thread.daemon = True thread.daemon = True
thread.start() thread.start()
def config_events(self, _id, handler):
request = core_pb2.ConfigEventsRequest()
request.id = _id
def listen():
for event in self.stub.ConfigEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def exception_events(self, _id, handler):
request = core_pb2.ExceptionEventsRequest()
request.id = _id
def listen():
for event in self.stub.ExceptionEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def file_events(self, _id, handler):
request = core_pb2.FileEventsRequest()
request.id = _id
def listen():
for event in self.stub.FileEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def create_node(self, session, _type=NodeTypes.DEFAULT, _id=None, node_options=None, emane=None): def create_node(self, session, _type=NodeTypes.DEFAULT, _id=None, node_options=None, emane=None):
if not node_options: if not node_options:
node_options = NodeOptions() node_options = NodeOptions()
@ -436,7 +472,8 @@ def main():
client = CoreApiClient() client = CoreApiClient()
with client.context_connect(): with client.context_connect():
if os.path.exists(xml_file_name): if os.path.exists(xml_file_name):
print("open xml: {}".format(client.open_xml(xml_file_name))) response = client.open_xml(xml_file_name)
print("open xml: {}".format(response))
print("services: {}".format(client.get_services())) print("services: {}".format(client.get_services()))

View file

@ -2,10 +2,10 @@ import logging
import os import os
import tempfile import tempfile
import time import time
from Queue import Queue
import grpc import grpc
from concurrent import futures from concurrent import futures
from Queue import Queue
import core_pb2 import core_pb2
import core_pb2_grpc import core_pb2_grpc
@ -311,9 +311,82 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
data=event.data, data=event.data,
time=event_time, time=event_time,
session=session.session_id session=session.session_id
) )
yield session_event yield session_event
def ConfigEvents(self, request, context):
session = self.get_session(request.id, context)
queue = Queue()
session.config_handlers.append(lambda x: queue.put(x))
while True:
event = queue.get()
config_event = core_pb2.ConfigEvent()
update_proto(
config_event,
message_type=event.message_type,
node=event.node,
object=event.object,
type=event.type,
captions=event.captions,
bitmap=event.bitmap,
data_values=event.data_values,
possible_values=event.possible_values,
groups=event.groups,
session=event.session,
interface=event.interface_number,
network_id=event.network_id,
opaque=event.opaque
)
config_event.data_types.extend(event.data_types)
yield config_event
def ExceptionEvents(self, request, context):
session = self.get_session(request.id, context)
queue = Queue()
session.exception_handlers.append(lambda x: queue.put(x))
while True:
event = queue.get()
exception_event = core_pb2.ExceptionEvent()
event_time = event.date
if event_time is not None:
event_time = float(event_time)
update_proto(
exception_event,
node=event.node,
session=event.session,
level=event.level,
source=event.source,
date=event_time,
text=event.text,
opaque=event.opaque
)
yield exception_event
def FileEvents(self, request, context):
session = self.get_session(request.id, context)
queue = Queue()
session.file_handlers.append(lambda x: queue.put(x))
while True:
event = queue.get()
file_event = core_pb2.FileEvent()
update_proto(
file_event,
message_type=event.message_type,
node=event.node,
name=event.name,
mode=event.mode,
number=event.number,
type=event.type,
source=event.source,
session=event.session,
data=event.data,
compressed_data=event.compressed_data
)
yield file_event
def CreateNode(self, request, context): def CreateNode(self, request, context):
session = self.get_session(request.session, context) session = self.get_session(request.session, context)
@ -799,11 +872,9 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
model = session.emane.models[model_name] model = session.emane.models[model_name]
config = session.emane.get_model_config(node_id, model_name) config = session.emane.get_model_config(node_id, model_name)
config_groups = get_config_groups(config, model) config_groups = get_config_groups(config, model)
# node_configurations = response.setdefault(node_id, {})
node_configurations = response.configs[node_id] node_configurations = response.configs[node_id]
node_configurations.model = model_name node_configurations.model = model_name
node_configurations.groups.extend(config_groups) node_configurations.groups.extend(config_groups)
# node_configurations[model_name] = config_groups
return response return response
def SaveXml(self, request, context): def SaveXml(self, request, context):

View file

@ -28,6 +28,12 @@ service CoreApi {
} }
rpc SessionEvents (SessionEventsRequest) returns (stream SessionEvent) { rpc SessionEvents (SessionEventsRequest) returns (stream SessionEvent) {
} }
rpc ConfigEvents (ConfigEventsRequest) returns (stream ConfigEvent) {
}
rpc ExceptionEvents (ExceptionEventsRequest) returns (stream ExceptionEvent) {
}
rpc FileEvents (FileEventsRequest) returns (stream FileEvent) {
}
// node rpc // node rpc
rpc CreateNode (CreateNodeRequest) returns (CreateNodeResponse) { rpc CreateNode (CreateNodeRequest) returns (CreateNodeResponse) {
@ -212,6 +218,58 @@ message SessionEvent {
int32 session = 6; int32 session = 6;
} }
message ConfigEventsRequest {
int32 id = 1;
}
message ConfigEvent {
MessageType message_type = 1;
int32 node = 2;
string object = 3;
int32 type = 4;
repeated int32 data_types = 5;
string data_values = 6;
string captions = 7;
string bitmap = 8;
string possible_values = 9;
string groups = 10;
string session = 11;
int32 interface = 12;
int32 network_id = 13;
string opaque = 14;
}
message ExceptionEventsRequest {
int32 id = 1;
}
message ExceptionEvent {
int32 node = 1;
int32 session = 2;
string level = 3;
string source = 4;
float date = 5;
string text = 6;
string opaque = 7;
}
message FileEventsRequest {
int32 id = 1;
}
message FileEvent {
MessageType message_type = 1;
int32 node = 2;
string name = 3;
string mode = 4;
int32 number = 5;
string type = 6;
string source = 7;
int32 session = 8;
bytes data = 9;
bytes compressed_data = 10;
}
message CreateNodeRequest { message CreateNodeRequest {
int32 session = 1; int32 session = 1;
int32 id = 2; int32 id = 2;
@ -535,6 +593,17 @@ message OpenXmlResponse {
} }
// data structures for messages below // data structures for messages below
enum MessageType {
NOTHING = 0;
ADD = 1;
DELETE = 2;
CRI = 4;
LOCAL = 8;
STRING = 16;
TEXT = 32;
TTY = 64;
}
enum SessionState { enum SessionState {
NONE = 0; NONE = 0;
DEFINITION = 1; DEFINITION = 1;