grpc added node/session event streaming requests

This commit is contained in:
bharnden 2019-03-16 13:48:25 -07:00
parent d9ae7d5c34
commit f24376d66c
3 changed files with 108 additions and 14 deletions

View file

@ -1,6 +1,7 @@
from __future__ import print_function from __future__ import print_function
import logging import logging
import os import os
import threading
from contextlib import contextmanager from contextlib import contextmanager
import grpc import grpc
@ -64,6 +65,30 @@ class CoreApiClient(object):
request.state = state.value request.state = state.value
return self.stub.SetSessionState(request) return self.stub.SetSessionState(request)
def node_events(self, _id, handler):
request = core_pb2.NodeEventsRequest()
request.id = _id
def listen():
for event in self.stub.NodeEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def session_events(self, _id, handler):
request = core_pb2.SessionEventsRequest()
request.id = _id
def listen():
for event in self.stub.SessionEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def create_node(self, session, _type=NodeTypes.DEFAULT, _id=None, node_options=None, emane=None): def create_node(self, session, _type=NodeTypes.DEFAULT, _id=None, node_options=None, emane=None):
if not node_options: if not node_options:
node_options = NodeOptions() node_options = NodeOptions()

View file

@ -1,17 +1,16 @@
import logging
import os import os
import tempfile import tempfile
from core.emulator.emudata import NodeOptions, InterfaceData, LinkOptions
from core.enumerations import NodeTypes, EventTypes, LinkTypes
from concurrent import futures
import time import time
import logging
import grpc import grpc
from concurrent import futures
from Queue import Queue
import core_pb2 import core_pb2
import core_pb2_grpc import core_pb2_grpc
from core.emulator.emudata import NodeOptions, InterfaceData, LinkOptions
from core.enumerations import NodeTypes, EventTypes, LinkTypes
from core.misc import nodeutils from core.misc import nodeutils
from core.mobility import BasicRangeModel from core.mobility import BasicRangeModel
from core.service import ServiceManager from core.service import ServiceManager
@ -119,14 +118,6 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
session.location.setrefgeo(47.57917, -122.13232, 2.0) session.location.setrefgeo(47.57917, -122.13232, 2.0)
session.location.refscale = 150000.0 session.location.refscale = 150000.0
# grpc stream handlers
# session.event_handlers.append(websocket_routes.broadcast_event)
# session.node_handlers.append(websocket_routes.broadcast_node)
# session.config_handlers.append(websocket_routes.broadcast_config)
# session.link_handlers.append(websocket_routes.broadcast_link)
# session.exception_handlers.append(websocket_routes.broadcast_exception)
# session.file_handlers.append(websocket_routes.broadcast_file)
response = core_pb2.CreateSessionResponse() response = core_pb2.CreateSessionResponse()
response.id = session.session_id response.id = session.session_id
response.state = session.state response.state = session.state
@ -262,6 +253,57 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
return response return response
def NodeEvents(self, request, context):
session = self.coreemu.sessions.get(request.id)
if not session:
raise Exception("no session found")
queue = Queue()
session.node_handlers.append(lambda x: queue.put(x))
while True:
node = queue.get()
node_event = core_pb2.NodeEvent()
update_proto(
node_event.node,
id=node.id,
name=node.name,
model=node.model
)
update_proto(
node_event.node.position,
x=node.x_position,
y=node.y_position
)
services = node.services or ""
node_event.node.services.extend(services.split("|"))
yield node_event
def SessionEvents(self, request, context):
session = self.coreemu.sessions.get(request.id)
if not session:
raise Exception("no session found")
queue = Queue()
session.event_handlers.append(lambda x: queue.put(x))
while True:
event = queue.get()
session_event = core_pb2.SessionEvent()
event_time = event.time
if event_time is not None:
event_time = float(event_time)
update_proto(
session_event,
node=event.node,
event=event.event_type,
name=event.name,
data=event.data,
time=event_time,
session=session.session_id
)
yield session_event
def CreateNode(self, request, context): def CreateNode(self, request, context):
session = self.coreemu.sessions.get(request.session) session = self.coreemu.sessions.get(request.session)
if not session: if not session:

View file

@ -21,6 +21,12 @@ service CoreApi {
rpc SetSessionState (SetSessionStateRequest) returns (SetSessionStateResponse) { rpc SetSessionState (SetSessionStateRequest) returns (SetSessionStateResponse) {
} }
// event streams
rpc NodeEvents (NodeEventsRequest) returns (stream NodeEvent) {
}
rpc SessionEvents (SessionEventsRequest) returns (stream SessionEvent) {
}
// node rpc // node rpc
rpc CreateNode (CreateNodeRequest) returns (CreateNodeResponse) { rpc CreateNode (CreateNodeRequest) returns (CreateNodeResponse) {
} }
@ -155,6 +161,27 @@ message SetSessionStateResponse {
bool result = 1; bool result = 1;
} }
message NodeEventsRequest {
int32 id = 1;
}
message NodeEvent {
Node node = 1;
}
message SessionEventsRequest {
int32 id = 1;
}
message SessionEvent {
int32 node = 1;
int32 event = 2;
string name = 3;
bytes data = 4;
float time = 5;
int32 session = 6;
}
message CreateNodeRequest { message CreateNodeRequest {
int32 session = 1; int32 session = 1;
int32 id = 2; int32 id = 2;