grpc added client disconnect detection for streams

This commit is contained in:
bharnden 2019-03-18 22:32:01 -07:00
parent e282b3b8f8
commit 84ff1f4275

View file

@ -2,7 +2,7 @@ import logging
import os import os
import tempfile import tempfile
import time import time
from Queue import Queue from Queue import Queue, Empty
from itertools import repeat from itertools import repeat
import grpc import grpc
@ -377,8 +377,9 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
queue = Queue() queue = Queue()
session.node_handlers.append(lambda x: queue.put(x)) session.node_handlers.append(lambda x: queue.put(x))
while True: while context.is_active():
node = queue.get() try:
node = queue.get(timeout=1)
node_event = core_pb2.NodeEvent() node_event = core_pb2.NodeEvent()
update_proto( update_proto(
node_event.node, node_event.node,
@ -394,14 +395,17 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
services = node.services or "" services = node.services or ""
node_event.node.services.extend(services.split("|")) node_event.node.services.extend(services.split("|"))
yield node_event yield node_event
except Empty:
continue
def LinkEvents(self, request, context): def LinkEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.link_handlers.append(lambda x: queue.put(x)) session.link_handlers.append(lambda x: queue.put(x))
while True: while context.is_active():
event = queue.get() try:
event = queue.get(timeout=1)
link_event = core_pb2.LinkEvent() link_event = core_pb2.LinkEvent()
if event.interface1_id is not None: if event.interface1_id is not None:
interface_one = link_event.link.interface_one interface_one = link_event.link.interface_one
@ -451,14 +455,17 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
unidirectional=event.unidirectional unidirectional=event.unidirectional
) )
yield link_event yield link_event
except Empty:
continue
def SessionEvents(self, request, context): def SessionEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.event_handlers.append(lambda x: queue.put(x)) session.event_handlers.append(lambda x: queue.put(x))
while True: while context.is_active():
event = queue.get() try:
event = queue.get(timeout=1)
session_event = core_pb2.SessionEvent() session_event = core_pb2.SessionEvent()
event_time = event.time event_time = event.time
if event_time is not None: if event_time is not None:
@ -473,14 +480,17 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
session=session.session_id session=session.session_id
) )
yield session_event yield session_event
except Empty:
continue
def ConfigEvents(self, request, context): def ConfigEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.config_handlers.append(lambda x: queue.put(x)) session.config_handlers.append(lambda x: queue.put(x))
while True: while context.is_active():
event = queue.get() try:
event = queue.get(timeout=1)
config_event = core_pb2.ConfigEvent() config_event = core_pb2.ConfigEvent()
update_proto( update_proto(
config_event, config_event,
@ -500,14 +510,17 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
) )
config_event.data_types.extend(event.data_types) config_event.data_types.extend(event.data_types)
yield config_event yield config_event
except Empty:
continue
def ExceptionEvents(self, request, context): def ExceptionEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.exception_handlers.append(lambda x: queue.put(x)) session.exception_handlers.append(lambda x: queue.put(x))
while True: while context.is_active():
event = queue.get() try:
event = queue.get(timeout=1)
exception_event = core_pb2.ExceptionEvent() exception_event = core_pb2.ExceptionEvent()
event_time = event.date event_time = event.date
if event_time is not None: if event_time is not None:
@ -523,14 +536,17 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
opaque=event.opaque opaque=event.opaque
) )
yield exception_event yield exception_event
except Empty:
continue
def FileEvents(self, request, context): def FileEvents(self, request, context):
session = self.get_session(request.id, context) session = self.get_session(request.id, context)
queue = Queue() queue = Queue()
session.file_handlers.append(lambda x: queue.put(x)) session.file_handlers.append(lambda x: queue.put(x))
while True: while context.is_active():
event = queue.get() try:
event = queue.get(timeout=1)
file_event = core_pb2.FileEvent() file_event = core_pb2.FileEvent()
update_proto( update_proto(
file_event, file_event,
@ -546,6 +562,8 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
compressed_data=event.compressed_data compressed_data=event.compressed_data
) )
yield file_event yield file_event
except Empty:
continue
def CreateNode(self, request, context): def CreateNode(self, request, context):
session = self.get_session(request.session, context) session = self.get_session(request.session, context)