corefx properly cancel server stream from client for session events

This commit is contained in:
Blake J. Harnden 2019-05-31 16:53:16 -07:00
parent b075181796
commit 43e18d820f

View file

@ -1129,36 +1129,42 @@ public class CoreGrpcClient implements ICoreClient {
Iterator<CoreProto.Event> events = blockingStub.events(request); Iterator<CoreProto.Event> events = blockingStub.events(request);
executorService.submit(() -> { executorService.submit(() -> {
try { Context.CancellableContext context = Context.current().withCancellation();
while (handlingEvents) { context.run(() -> {
CoreProto.Event event = events.next(); try {
logger.info("handling event: {}", event); while (handlingEvents) {
switch (event.getEventTypeCase()) { CoreProto.Event event = events.next();
case SESSION_EVENT: logger.info("handling event: {}", event);
handleSessionEvents(controller, event.getSessionEvent()); switch (event.getEventTypeCase()) {
break; case SESSION_EVENT:
case NODE_EVENT: handleSessionEvents(controller, event.getSessionEvent());
handleNodeEvents(controller, event.getNodeEvent()); break;
break; case NODE_EVENT:
case LINK_EVENT: handleNodeEvents(controller, event.getNodeEvent());
handleLinkEvents(controller, event.getLinkEvent()); break;
break; case LINK_EVENT:
case CONFIG_EVENT: handleLinkEvents(controller, event.getLinkEvent());
handleConfigEvents(controller, event.getConfigEvent()); break;
break; case CONFIG_EVENT:
case EXCEPTION_EVENT: handleConfigEvents(controller, event.getConfigEvent());
handleExceptionEvents(controller, event.getExceptionEvent()); break;
break; case EXCEPTION_EVENT:
case FILE_EVENT: handleExceptionEvents(controller, event.getExceptionEvent());
handleFileEvents(controller, event.getFileEvent()); break;
break; case FILE_EVENT:
default: handleFileEvents(controller, event.getFileEvent());
logger.error("unknown event type: {}", event.getEventTypeCase()); break;
default:
logger.error("unknown event type: {}", event.getEventTypeCase());
}
} }
} catch (StatusRuntimeException ex) {
logger.error("error handling session events", ex);
} finally {
context.cancel(null);
context.close();
} }
} catch (StatusRuntimeException ex) { });
logger.error("error handling session events", ex);
}
}); });
} catch (StatusRuntimeException ex) { } catch (StatusRuntimeException ex) {
throw new IOException("setup event handlers error", ex); throw new IOException("setup event handlers error", ex);