grpc added convenience methods for starting streams, updated logic for bailing out on streams and allow handling them better

This commit is contained in:
bharnden 2019-03-20 22:11:09 -07:00
parent 8009a18a1c
commit 16d9009c3f
2 changed files with 79 additions and 66 deletions

View file

@ -21,6 +21,23 @@ def update_proto(obj, **kwargs):
setattr(obj, key, value) setattr(obj, key, value)
def stream_listener(stream, handler):
try:
for event in stream:
handler(event)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.CANCELLED:
logging.debug("stream closed")
else:
logging.exception("stream error")
def start_streamer(stream, handler):
thread = threading.Thread(target=stream_listener, args=(stream, handler))
thread.daemon = True
thread.start()
class CoreApiClient(object): class CoreApiClient(object):
def __init__(self, address="localhost:50051"): def __init__(self, address="localhost:50051"):
self.address = address self.address = address
@ -75,74 +92,38 @@ class CoreApiClient(object):
def node_events(self, _id, handler): def node_events(self, _id, handler):
request = core_pb2.NodeEventsRequest() request = core_pb2.NodeEventsRequest()
request.id = _id request.id = _id
stream = self.stub.NodeEvents(request)
def listen(): start_streamer(stream, handler)
for event in self.stub.NodeEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def link_events(self, _id, handler): def link_events(self, _id, handler):
request = core_pb2.LinkEventsRequest() request = core_pb2.LinkEventsRequest()
request.id = _id request.id = _id
stream = self.stub.LinkEvents(request)
def listen(): start_streamer(stream, handler)
for event in self.stub.LinkEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def session_events(self, _id, handler): def session_events(self, _id, handler):
request = core_pb2.SessionEventsRequest() request = core_pb2.SessionEventsRequest()
request.id = _id request.id = _id
stream = self.stub.SessionEvents(request)
def listen(): start_streamer(stream, handler)
for event in self.stub.SessionEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def config_events(self, _id, handler): def config_events(self, _id, handler):
request = core_pb2.ConfigEventsRequest() request = core_pb2.ConfigEventsRequest()
request.id = _id request.id = _id
stream = self.stub.ConfigEvents(request)
def listen(): start_streamer(stream, handler)
for event in self.stub.ConfigEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def exception_events(self, _id, handler): def exception_events(self, _id, handler):
request = core_pb2.ExceptionEventsRequest() request = core_pb2.ExceptionEventsRequest()
request.id = _id request.id = _id
stream = self.stub.ExceptionEvents(request)
def listen(): start_streamer(stream, handler)
for event in self.stub.ExceptionEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def file_events(self, _id, handler): def file_events(self, _id, handler):
request = core_pb2.FileEventsRequest() request = core_pb2.FileEventsRequest()
request.id = _id request.id = _id
stream = self.stub.FileEvents(request)
def listen(): start_streamer(stream, handler)
for event in self.stub.FileEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def create_node(self, session, _type=NodeTypes.DEFAULT, _id=None, node_options=None, emane=None): def create_node(self, session, _type=NodeTypes.DEFAULT, _id=None, node_options=None, emane=None):
if not node_options: if not node_options:
@ -492,7 +473,12 @@ def main():
# create session # create session
session_data = client.create_session() session_data = client.create_session()
client.exception_events(session_data.id, lambda x: print(x)) client.exception_events(session_data.id, lambda x: print(type(x)))
client.node_events(session_data.id, lambda x: print(type(x)))
client.session_events(session_data.id, lambda x: print(type(x)))
client.link_events(session_data.id, lambda x: print(type(x)))
client.file_events(session_data.id, lambda x: print(type(x)))
client.config_events(session_data.id, lambda x: print(type(x)))
print("created session: {}".format(session_data)) print("created session: {}".format(session_data))
print("default services: {}".format(client.get_service_defaults(session_data.id))) print("default services: {}".format(client.get_service_defaults(session_data.id)))
print("emane models: {}".format(client.get_emane_models(session_data.id))) print("emane models: {}".format(client.get_emane_models(session_data.id)))
@ -574,5 +560,5 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig() logging.basicConfig(level=logging.DEBUG)
main() main()

View file

@ -1,3 +1,4 @@
import atexit
import logging import logging
import os import os
import tempfile import tempfile
@ -194,16 +195,18 @@ def send_objects(session):
session.broadcast_config(config_data) session.broadcast_config(config_data)
# send session metadata # send session metadata
data_values = "|".join(["%s=%s" % item for item in session.metadata.get_configs().iteritems()]) configs = session.metadata.get_configs()
data_types = tuple(ConfigDataTypes.STRING.value for _ in session.metadata.get_configs()) if configs:
config_data = ConfigData( data_values = "|".join(["%s=%s" % item for item in configs.iteritems()])
message_type=0, data_types = tuple(ConfigDataTypes.STRING.value for _ in session.metadata.get_configs())
object=session.metadata.name, config_data = ConfigData(
type=ConfigFlags.NONE.value, message_type=0,
data_types=data_types, object=session.metadata.name,
data_values=data_values type=ConfigFlags.NONE.value,
) data_types=data_types,
session.broadcast_config(config_data) data_values=data_values
)
session.broadcast_config(config_data)
logging.debug("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data)) logging.debug("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data))
@ -212,6 +215,18 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
def __init__(self, coreemu): def __init__(self, coreemu):
super(CoreApiServer, self).__init__() super(CoreApiServer, self).__init__()
self.coreemu = coreemu self.coreemu = coreemu
self.running = True
atexit.register(self._exit_handler)
def _exit_handler(self):
logging.debug("catching exit, stop running")
self.running = False
def _is_running(self, context):
return self.running and context.is_active()
def _cancel_stream(self, context):
context.abort(grpc.StatusCode.CANCELLED, "server stopping")
def get_session(self, _id, context): def get_session(self, _id, context):
session = self.coreemu.sessions.get(_id) session = self.coreemu.sessions.get(_id)
@ -386,7 +401,7 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
queue = Queue() queue = Queue()
session.node_handlers.append(lambda x: queue.put(x)) session.node_handlers.append(lambda x: queue.put(x))
while context.is_active(): while self._is_running(context):
try: try:
node = queue.get(timeout=1) node = queue.get(timeout=1)
node_event = core_pb2.NodeEvent() node_event = core_pb2.NodeEvent()
@ -407,12 +422,14 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
except Empty: except Empty:
continue continue
self._cancel_stream(context)
def LinkEvents(self, request, context): def LinkEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.link_handlers.append(lambda x: queue.put(x)) session.link_handlers.append(lambda x: queue.put(x))
while context.is_active(): while self._is_running(context):
try: try:
event = queue.get(timeout=1) event = queue.get(timeout=1)
link_event = core_pb2.LinkEvent() link_event = core_pb2.LinkEvent()
@ -467,12 +484,14 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
except Empty: except Empty:
continue continue
self._cancel_stream(context)
def SessionEvents(self, request, context): def SessionEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.event_handlers.append(lambda x: queue.put(x)) session.event_handlers.append(lambda x: queue.put(x))
while context.is_active(): while self._is_running(context):
try: try:
event = queue.get(timeout=1) event = queue.get(timeout=1)
session_event = core_pb2.SessionEvent() session_event = core_pb2.SessionEvent()
@ -492,12 +511,14 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
except Empty: except Empty:
continue continue
self._cancel_stream(context)
def ConfigEvents(self, request, context): def ConfigEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.config_handlers.append(lambda x: queue.put(x)) session.config_handlers.append(lambda x: queue.put(x))
while context.is_active(): while self._is_running(context):
try: try:
event = queue.get(timeout=1) event = queue.get(timeout=1)
config_event = core_pb2.ConfigEvent() config_event = core_pb2.ConfigEvent()
@ -522,12 +543,14 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
except Empty: except Empty:
continue continue
self._cancel_stream(context)
def ExceptionEvents(self, request, context): def ExceptionEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.exception_handlers.append(lambda x: queue.put(x)) session.exception_handlers.append(lambda x: queue.put(x))
while context.is_active(): while self._is_running(context):
try: try:
event = queue.get(timeout=1) event = queue.get(timeout=1)
exception_event = core_pb2.ExceptionEvent() exception_event = core_pb2.ExceptionEvent()
@ -545,12 +568,14 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
except Empty: except Empty:
continue continue
self._cancel_stream(context)
def FileEvents(self, request, context): def FileEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.file_handlers.append(lambda x: queue.put(x)) session.file_handlers.append(lambda x: queue.put(x))
while context.is_active(): while self._is_running(context):
try: try:
event = queue.get(timeout=1) event = queue.get(timeout=1)
file_event = core_pb2.FileEvent() file_event = core_pb2.FileEvent()
@ -571,6 +596,8 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
except Empty: except Empty:
continue continue
self._cancel_stream(context)
def CreateNode(self, request, context): def CreateNode(self, request, context):
logging.debug("create node: %s", request) logging.debug("create node: %s", request)
session = self.get_session(request.session, context) session = self.get_session(request.session, context)