updated corefx to use consolidated event streaming

This commit is contained in:
Blake J. Harnden 2019-05-30 13:27:54 -07:00
parent df3a8980ed
commit 4dd6dc1837

View file

@ -1073,149 +1073,103 @@ public class CoreGrpcClient implements ICoreClient {
logger.info("setting up event handlers"); logger.info("setting up event handlers");
handlingEvents = true; handlingEvents = true;
try { try {
handleSessionEvents(controller); CoreProto.EventsRequest request = CoreProto.EventsRequest.newBuilder()
handleNodeEvents(controller); .setSessionId(sessionId)
handleExceptionEvents(controller); .build();
handleConfigEvents(controller);
handleLinkEvents(controller); Iterator<CoreProto.Event> events = blockingStub.events(request);
handleFileEvents(controller); executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.Event event = events.next();
logger.info("handling event: {}", event);
switch (event.getEventTypeCase()) {
case SESSION_EVENT:
handleSessionEvents(controller, event.getSessionEvent());
break;
case NODE_EVENT:
handleNodeEvents(controller, event.getNodeEvent());
break;
case LINK_EVENT:
handleLinkEvents(controller, event.getLinkEvent());
break;
case CONFIG_EVENT:
handleConfigEvents(controller, event.getConfigEvent());
break;
case EXCEPTION_EVENT:
handleExceptionEvents(controller, event.getExceptionEvent());
break;
case FILE_EVENT:
handleFileEvents(controller, event.getFileEvent());
break;
default:
logger.error("unknown event type: {}", event.getEventTypeCase());
}
}
} catch (StatusRuntimeException ex) {
logger.error("error handling session events", ex);
}
});
} catch (StatusRuntimeException ex) { } catch (StatusRuntimeException ex) {
throw new IOException("setup event handlers error", ex); throw new IOException("setup event handlers error", ex);
} }
} }
private void handleSessionEvents(Controller controller) { private void handleSessionEvents(Controller controller, CoreProto.SessionEvent event) {
CoreProto.SessionEventsRequest request = CoreProto.SessionEventsRequest.newBuilder() logger.info("session event: {}", event);
.setSessionId(sessionId) SessionState state = SessionState.get(event.getEvent());
.build(); if (state == null) {
Iterator<CoreProto.SessionEvent> events = blockingStub.sessionEvents(request); logger.warn("unknown session event: {}", event.getEvent());
executorService.submit(() -> { return;
try { }
while (handlingEvents) {
CoreProto.SessionEvent event = events.next();
logger.info("session event: {}", event);
SessionState state = SessionState.get(event.getEvent());
if (state == null) {
logger.warn("unknown event type: {}", event.getEvent());
continue;
}
// session state event // session state event
if (state.getValue() <= 6) { if (state.getValue() <= 6) {
logger.info("event updating session state: {}", state); logger.info("event updating session state: {}", state);
updateState(state); updateState(state);
// mobility script event // mobility script event
} else if (state.getValue() <= 9) { } else if (state.getValue() <= 9) {
Integer nodeId = event.getNodeId(); Integer nodeId = event.getNodeId();
String[] values = event.getData().toStringUtf8().split("\\s+"); String[] values = event.getData().toStringUtf8().split("\\s+");
Integer start = Integer.parseInt(values[0].split("=")[1]); Integer start = Integer.parseInt(values[0].split("=")[1]);
Integer end = Integer.parseInt(values[1].split("=")[1]); Integer end = Integer.parseInt(values[1].split("=")[1]);
logger.info(String.format("node(%s) mobility event (%s) - start(%s) stop(%s)", logger.info(String.format("node(%s) mobility event (%s) - start(%s) stop(%s)",
nodeId, state, start, end)); nodeId, state, start, end));
logger.info("all dialogs: {}", controller.getMobilityPlayerDialogs().keySet()); logger.info("all dialogs: {}", controller.getMobilityPlayerDialogs().keySet());
MobilityPlayerDialog mobilityPlayerDialog = controller.getMobilityPlayerDialogs().get(nodeId); MobilityPlayerDialog mobilityPlayerDialog = controller.getMobilityPlayerDialogs().get(nodeId);
mobilityPlayerDialog.event(state, start, end); mobilityPlayerDialog.event(state, start, end);
} }
}
} catch (StatusRuntimeException ex) {
logger.error("error handling session events", ex);
}
});
} }
private void handleNodeEvents(Controller controller) { private void handleNodeEvents(Controller controller, CoreProto.NodeEvent event) {
CoreProto.NodeEventsRequest request = CoreProto.NodeEventsRequest.newBuilder().setSessionId(sessionId) logger.info("node event: {}", event);
.build(); CoreNode node = protoToNode(event.getNode());
Iterator<CoreProto.NodeEvent> events = blockingStub.nodeEvents(request); controller.getNetworkGraph().setNodeLocation(node);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.NodeEvent event = events.next();
logger.info("node event: {}", event);
CoreNode node = protoToNode(event.getNode());
controller.getNetworkGraph().setNodeLocation(node);
}
} catch (StatusRuntimeException ex) {
logger.error("error handling node events", ex);
}
});
} }
private void handleExceptionEvents(Controller controller) { private void handleExceptionEvents(Controller controller, CoreProto.ExceptionEvent event) {
CoreProto.ExceptionEventsRequest request = CoreProto.ExceptionEventsRequest.newBuilder() logger.info("exception event: {}", event);
.setSessionId(sessionId)
.build();
Iterator<CoreProto.ExceptionEvent> events = blockingStub.exceptionEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.ExceptionEvent event = events.next();
logger.info("exception event: {}", event);
}
} catch (StatusRuntimeException ex) {
logger.error("error handling exception events", ex);
}
});
} }
private void handleConfigEvents(Controller controller) { private void handleConfigEvents(Controller controller, CoreProto.ConfigEvent event) {
CoreProto.ConfigEventsRequest request = CoreProto.ConfigEventsRequest.newBuilder() logger.info("config event: {}", event);
.setSessionId(sessionId)
.build();
Iterator<CoreProto.ConfigEvent> events = blockingStub.configEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.ConfigEvent event = events.next();
logger.info("config event: {}", event);
}
} catch (StatusRuntimeException ex) {
logger.error("error handling config events", ex);
}
});
} }
private void handleLinkEvents(Controller controller) { private void handleLinkEvents(Controller controller, CoreProto.LinkEvent event) {
CoreProto.LinkEventsRequest request = CoreProto.LinkEventsRequest.newBuilder() logger.info("link event: {}", event);
.setSessionId(sessionId) CoreLink link = protoToLink(event.getLink());
.build(); MessageFlags flag = MessageFlags.get(event.getMessageTypeValue());
Iterator<CoreProto.LinkEvent> events = blockingStub.linkEvents(request); if (MessageFlags.DELETE == flag) {
executorService.submit(() -> { logger.info("delete");
try { controller.getNetworkGraph().removeWirelessLink(link);
while (handlingEvents) { } else if (MessageFlags.ADD == flag) {
CoreProto.LinkEvent event = events.next(); link.setLoaded(true);
logger.info("link event: {}", event); controller.getNetworkGraph().addLink(link);
CoreLink link = protoToLink(event.getLink()); }
MessageFlags flag = MessageFlags.get(event.getMessageTypeValue()); controller.getNetworkGraph().getGraphViewer().repaint();
if (MessageFlags.DELETE == flag) {
logger.info("delete");
controller.getNetworkGraph().removeWirelessLink(link);
} else if (MessageFlags.ADD == flag) {
link.setLoaded(true);
controller.getNetworkGraph().addLink(link);
}
controller.getNetworkGraph().getGraphViewer().repaint();
}
} catch (StatusRuntimeException ex) {
logger.error("error handling link events", ex);
}
});
} }
private void handleFileEvents(Controller controller) { private void handleFileEvents(Controller controller, CoreProto.FileEvent event) {
CoreProto.FileEventsRequest request = CoreProto.FileEventsRequest.newBuilder() logger.info("file event: {}", event);
.setSessionId(sessionId)
.build();
Iterator<CoreProto.FileEvent> events = blockingStub.fileEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.FileEvent event = events.next();
logger.info("file event: {}", event);
}
} catch (StatusRuntimeException ex) {
logger.error("error handling file events", ex);
}
});
} }
} }