corefx updates to support handling streamed events from the grpc server

This commit is contained in:
Blake J. Harnden 2019-05-29 12:25:33 -07:00
parent 6b93f60056
commit ec672d209f
4 changed files with 225 additions and 50 deletions

View file

@ -154,6 +154,9 @@ public class Controller implements Initializable {
coreClient.updateSession(sessionId);
coreClient.updateState(sessionState);
// setup event handlers
coreClient.setupEventHandlers(this);
// display all nodes
logger.info("joining core session({}) state({}): {}", sessionId, sessionState, session);
for (CoreNode node : session.getNodes()) {

View file

@ -1,5 +1,6 @@
package com.core.client;
import com.core.Controller;
import com.core.client.rest.ServiceFile;
import com.core.client.rest.WlanConfig;
import com.core.data.*;
@ -115,4 +116,6 @@ public interface ICoreClient {
LocationConfig getLocationConfig() throws IOException;
boolean setLocationConfig(LocationConfig config) throws IOException;
void setupEventHandlers(Controller controller) throws IOException;
}

View file

@ -1,9 +1,11 @@
package com.core.client.grpc;
import com.core.Controller;
import com.core.client.ICoreClient;
import com.core.client.rest.ServiceFile;
import com.core.client.rest.WlanConfig;
import com.core.data.*;
import com.core.ui.dialogs.MobilityPlayerDialog;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@ -16,6 +18,8 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CoreGrpcClient implements ICoreClient {
private static final Logger logger = LogManager.getLogger();
@ -25,6 +29,8 @@ public class CoreGrpcClient implements ICoreClient {
private SessionState sessionState;
private CoreApiGrpc.CoreApiBlockingStub blockingStub;
private ManagedChannel channel;
private final ExecutorService executorService = Executors.newFixedThreadPool(6);
private boolean handlingEvents = false;
private CoreProto.Node nodeToProto(CoreNode node) {
CoreProto.Position position = CoreProto.Position.newBuilder()
@ -144,6 +150,60 @@ public class CoreGrpcClient implements ICoreClient {
return config;
}
private CoreNode protoToNode(CoreProto.Node protoNode) {
CoreNode node = new CoreNode(protoNode.getId());
node.setName(protoNode.getName());
node.setEmane(protoNode.getEmane());
node.setIcon(protoNode.getIcon());
node.setModel(protoNode.getModel());
node.setServices(new HashSet<>(protoNode.getServicesList()));
node.getPosition().setX((double) protoNode.getPosition().getX());
node.getPosition().setY((double) protoNode.getPosition().getY());
node.setType(protoNode.getTypeValue());
return node;
}
private CoreInterface protoToInterface(CoreProto.Interface protoInterface) {
CoreInterface coreInterface = new CoreInterface();
coreInterface.setId(protoInterface.getId());
coreInterface.setName(protoInterface.getName());
coreInterface.setMac(protoInterface.getMac());
coreInterface.setIp4(protoInterface.getIp4());
coreInterface.setIp4Mask(protoInterface.getIp4Mask());
coreInterface.setIp6(protoInterface.getIp6());
coreInterface.setIp6Mask(Integer.toString(protoInterface.getIp6Mask()));
return coreInterface;
}
private CoreLink protoToLink(CoreProto.Link linkProto) {
CoreLink link = new CoreLink();
link.setNodeOne(linkProto.getNodeOneId());
link.setNodeTwo(linkProto.getNodeTwoId());
CoreInterface interfaceOne = protoToInterface(linkProto.getInterfaceOne());
link.setInterfaceOne(interfaceOne);
CoreInterface interfaceTwo = protoToInterface(linkProto.getInterfaceTwo());
link.setInterfaceTwo(interfaceTwo);
CoreLinkOptions options = new CoreLinkOptions();
CoreProto.LinkOptions protoOptions = linkProto.getOptions();
options.setBandwidth((double) protoOptions.getBandwidth());
options.setDelay((double) protoOptions.getDelay());
options.setDup((double) protoOptions.getDup());
options.setJitter((double) protoOptions.getJitter());
options.setPer((double) protoOptions.getPer());
options.setBurst((double) protoOptions.getBurst());
if (!protoOptions.getKey().isEmpty()) {
options.setKey(Integer.parseInt(protoOptions.getKey()));
}
options.setMburst((double) protoOptions.getMburst());
options.setMer((double) protoOptions.getMer());
options.setOpaque(protoOptions.getOpaque());
options.setUnidirectional(protoOptions.getUnidirectional() ? 1 : 0);
link.setOptions(options);
return link;
}
@Override
public void setConnection(String address, int port) {
this.address = address;
@ -245,60 +305,12 @@ public class CoreGrpcClient implements ICoreClient {
}
logger.info("adding node: {}", protoNode);
CoreNode node = new CoreNode(protoNode.getId());
node.setName(protoNode.getName());
node.setEmane(protoNode.getEmane());
node.setIcon(protoNode.getIcon());
node.setModel(protoNode.getModel());
node.setServices(new HashSet<>(protoNode.getServicesList()));
node.getPosition().setX((double) protoNode.getPosition().getX());
node.getPosition().setY((double) protoNode.getPosition().getY());
node.setType(protoNode.getTypeValue());
CoreNode node = protoToNode(protoNode);
session.getNodes().add(node);
}
for (CoreProto.Link linkProto : response.getSession().getLinksList()) {
logger.info("adding link: {} - {}", linkProto.getNodeOneId(), linkProto.getNodeTwoId());
CoreLink link = new CoreLink();
link.setNodeOne(linkProto.getNodeOneId());
link.setNodeTwo(linkProto.getNodeTwoId());
CoreProto.Interface interfaceOneProto = linkProto.getInterfaceOne();
CoreInterface interfaceOne = new CoreInterface();
interfaceOne.setId(interfaceOneProto.getId());
interfaceOne.setName(interfaceOneProto.getName());
interfaceOne.setMac(interfaceOneProto.getMac());
interfaceOne.setIp4(interfaceOneProto.getIp4());
interfaceOne.setIp4Mask(interfaceOneProto.getIp4Mask());
interfaceOne.setIp6(interfaceOneProto.getIp6());
interfaceOne.setIp6Mask(Integer.toString(interfaceOneProto.getIp6Mask()));
link.setInterfaceOne(interfaceOne);
CoreProto.Interface interfaceTwoProto = linkProto.getInterfaceTwo();
CoreInterface interfaceTwo = new CoreInterface();
interfaceTwo.setId(interfaceTwoProto.getId());
interfaceTwo.setName(interfaceTwoProto.getName());
interfaceTwo.setMac(interfaceTwoProto.getMac());
interfaceTwo.setIp4(interfaceTwoProto.getIp4());
interfaceTwo.setIp4Mask(interfaceTwoProto.getIp4Mask());
interfaceTwo.setIp6(interfaceTwoProto.getIp6());
interfaceTwo.setIp6Mask(Integer.toString(interfaceTwoProto.getIp6Mask()));
link.setInterfaceTwo(interfaceTwo);
CoreLinkOptions options = new CoreLinkOptions();
CoreProto.LinkOptions protoOptions = linkProto.getOptions();
options.setBandwidth((double) protoOptions.getBandwidth());
options.setDelay((double) protoOptions.getDelay());
options.setDup((double) protoOptions.getDup());
options.setJitter((double) protoOptions.getJitter());
options.setPer((double) protoOptions.getPer());
options.setBurst((double) protoOptions.getBurst());
if (!protoOptions.getKey().isEmpty()) {
options.setKey(Integer.parseInt(protoOptions.getKey()));
}
options.setMburst((double) protoOptions.getMburst());
options.setMer((double) protoOptions.getMer());
options.setOpaque(protoOptions.getOpaque());
options.setUnidirectional(protoOptions.getUnidirectional() ? 1 : 0);
link.setOptions(options);
CoreLink link = protoToLink(linkProto);
session.getLinks().add(link);
}
session.setState(response.getSession().getStateValue());
@ -349,6 +361,7 @@ public class CoreGrpcClient implements ICoreClient {
@Override
public boolean stop() throws IOException {
handlingEvents = false;
return setState(SessionState.SHUTDOWN);
}
@ -1054,4 +1067,155 @@ public class CoreGrpcClient implements ICoreClient {
throw new IOException(ex);
}
}
@Override
public void setupEventHandlers(Controller controller) throws IOException {
logger.info("setting up event handlers");
handlingEvents = true;
try {
handleSessionEvents(controller);
handleNodeEvents(controller);
handleExceptionEvents(controller);
handleConfigEvents(controller);
handleLinkEvents(controller);
handleFileEvents(controller);
} catch (StatusRuntimeException ex) {
throw new IOException("setup event handlers error", ex);
}
}
private void handleSessionEvents(Controller controller) {
CoreProto.SessionEventsRequest request = CoreProto.SessionEventsRequest.newBuilder()
.setSessionId(sessionId)
.build();
Iterator<CoreProto.SessionEvent> events = blockingStub.sessionEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.SessionEvent event = events.next();
logger.info("session event: {}", event);
SessionState state = SessionState.get(event.getEvent());
if (state == null) {
logger.warn("unknown event type: {}", event.getEvent());
continue;
}
// session state event
if (state.getValue() <= 6) {
logger.info("event updating session state: {}", state);
updateState(state);
// mobility script event
} else if (state.getValue() <= 9) {
Integer nodeId = event.getNodeId();
String[] values = event.getData().toStringUtf8().split("\\s+");
Integer start = Integer.parseInt(values[0].split("=")[1]);
Integer end = Integer.parseInt(values[1].split("=")[1]);
logger.info(String.format("node(%s) mobility event (%s) - start(%s) stop(%s)",
nodeId, state, start, end));
logger.info("all dialogs: {}", controller.getMobilityPlayerDialogs().keySet());
MobilityPlayerDialog mobilityPlayerDialog = controller.getMobilityPlayerDialogs().get(nodeId);
mobilityPlayerDialog.event(state, start, end);
}
}
} catch (StatusRuntimeException ex) {
logger.error("error handling session events", ex);
}
});
}
private void handleNodeEvents(Controller controller) {
CoreProto.NodeEventsRequest request = CoreProto.NodeEventsRequest.newBuilder().setSessionId(sessionId)
.build();
Iterator<CoreProto.NodeEvent> events = blockingStub.nodeEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.NodeEvent event = events.next();
logger.info("node event: {}", event);
CoreNode node = protoToNode(event.getNode());
controller.getNetworkGraph().setNodeLocation(node);
}
} catch (StatusRuntimeException ex) {
logger.error("error handling node events", ex);
}
});
}
private void handleExceptionEvents(Controller controller) {
CoreProto.ExceptionEventsRequest request = CoreProto.ExceptionEventsRequest.newBuilder()
.setSessionId(sessionId)
.build();
Iterator<CoreProto.ExceptionEvent> events = blockingStub.exceptionEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.ExceptionEvent event = events.next();
logger.info("exception event: {}", event);
}
} catch (StatusRuntimeException ex) {
logger.error("error handling exception events", ex);
}
});
}
private void handleConfigEvents(Controller controller) {
CoreProto.ConfigEventsRequest request = CoreProto.ConfigEventsRequest.newBuilder()
.setSessionId(sessionId)
.build();
Iterator<CoreProto.ConfigEvent> events = blockingStub.configEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.ConfigEvent event = events.next();
logger.info("config event: {}", event);
}
} catch (StatusRuntimeException ex) {
logger.error("error handling config events", ex);
}
});
}
private void handleLinkEvents(Controller controller) {
CoreProto.LinkEventsRequest request = CoreProto.LinkEventsRequest.newBuilder()
.setSessionId(sessionId)
.build();
Iterator<CoreProto.LinkEvent> events = blockingStub.linkEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.LinkEvent event = events.next();
logger.info("link event: {}", event);
CoreLink link = protoToLink(event.getLink());
MessageFlags flag = MessageFlags.get(event.getMessageTypeValue());
if (MessageFlags.DELETE == flag) {
logger.info("delete");
controller.getNetworkGraph().removeWirelessLink(link);
} else if (MessageFlags.ADD == flag) {
link.setLoaded(true);
controller.getNetworkGraph().addLink(link);
}
controller.getNetworkGraph().getGraphViewer().repaint();
}
} catch (StatusRuntimeException ex) {
logger.error("error handling link events", ex);
}
});
}
private void handleFileEvents(Controller controller) {
CoreProto.FileEventsRequest request = CoreProto.FileEventsRequest.newBuilder()
.setSessionId(sessionId)
.build();
Iterator<CoreProto.FileEvent> events = blockingStub.fileEvents(request);
executorService.submit(() -> {
try {
while (handlingEvents) {
CoreProto.FileEvent event = events.next();
logger.info("file event: {}", event);
}
} catch (StatusRuntimeException ex) {
logger.error("error handling file events", ex);
}
});
}
}

View file

@ -1,5 +1,6 @@
package com.core.client.rest;
import com.core.Controller;
import com.core.client.ICoreClient;
import com.core.data.*;
import com.core.utils.WebUtils;
@ -412,4 +413,8 @@ public class CoreRestClient implements ICoreClient {
String url = getUrl(String.format("sessions/%s/nodes/%s/mobility/%s", sessionId, node.getId(), action));
return WebUtils.putJson(url);
}
@Override
public void setupEventHandlers(Controller controller) throws IOException {
}
}