grpc added link events

This commit is contained in:
bharnden 2019-03-18 21:46:27 -07:00
parent 8ee1db5dc8
commit e282b3b8f8
3 changed files with 186 additions and 2 deletions

View file

@ -1,4 +1,5 @@
from __future__ import print_function
import logging
import os
import threading
@ -83,6 +84,18 @@ class CoreApiClient(object):
thread.daemon = True
thread.start()
def link_events(self, _id, handler):
request = core_pb2.LinkEventsRequest()
request.id = _id
def listen():
for event in self.stub.LinkEvents(request):
handler(event)
thread = threading.Thread(target=listen)
thread.daemon = True
thread.start()
def session_events(self, _id, handler):
request = core_pb2.SessionEventsRequest()
request.id = _id

View file

@ -3,17 +3,20 @@ import os
import tempfile
import time
from Queue import Queue
from itertools import repeat
import grpc
from concurrent import futures
import core_pb2
import core_pb2_grpc
from core.conf import ConfigShim
from core.data import ConfigData, FileData
from core.emulator.emudata import NodeOptions, InterfaceData, LinkOptions
from core.enumerations import NodeTypes, EventTypes, LinkTypes
from core.enumerations import NodeTypes, EventTypes, LinkTypes, MessageFlags, ConfigFlags, ConfigDataTypes
from core.misc import nodeutils
from core.mobility import BasicRangeModel, Ns2ScriptedMobility
from core.service import ServiceManager
from core.service import ServiceManager, ServiceShim
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
@ -105,6 +108,106 @@ def convert_link(session, link_data, link):
)
def send_objects(session):
time.sleep(1)
# find all nodes and links
nodes_data = []
links_data = []
with session._objects_lock:
for obj in session.objects.itervalues():
node_data = obj.data(message_type=MessageFlags.ADD.value)
if node_data:
nodes_data.append(node_data)
node_links = obj.all_link_data(flags=MessageFlags.ADD.value)
for link_data in node_links:
links_data.append(link_data)
# send all nodes first, so that they will exist for any links
for node_data in nodes_data:
session.broadcast_node(node_data)
for link_data in links_data:
session.broadcast_link(link_data)
# send mobility model info
for node_id in session.mobility.nodes():
for model_name, config in session.mobility.get_all_configs(node_id).iteritems():
model_class = session.mobility.models[model_name]
logging.debug("mobility config: node(%s) class(%s) values(%s)", node_id, model_class, config)
config_data = ConfigShim.config_data(0, node_id, ConfigFlags.UPDATE.value, model_class, config)
session.broadcast_config(config_data)
# send emane model info
for node_id in session.emane.nodes():
for model_name, config in session.emane.get_all_configs(node_id).iteritems():
model_class = session.emane.models[model_name]
logging.debug("emane config: node(%s) class(%s) values(%s)", node_id, model_class, config)
config_data = ConfigShim.config_data(0, node_id, ConfigFlags.UPDATE.value, model_class, config)
session.broadcast_config(config_data)
# service customizations
service_configs = session.services.all_configs()
for node_id, service in service_configs:
opaque = "service:%s" % service.name
data_types = tuple(repeat(ConfigDataTypes.STRING.value, len(ServiceShim.keys)))
node = session.get_object(node_id)
values = ServiceShim.tovaluelist(node, service)
config_data = ConfigData(
message_type=0,
node=node_id,
object=session.services.name,
type=ConfigFlags.UPDATE.value,
data_types=data_types,
data_values=values,
session=str(session.session_id),
opaque=opaque
)
session.broadcast_config(config_data)
for file_name, config_data in session.services.all_files(service):
file_data = FileData(
message_type=MessageFlags.ADD.value,
node=node_id,
name=str(file_name),
type=opaque,
data=str(config_data)
)
session.broadcast_file(file_data)
# TODO: send location info
# send hook scripts
for state in sorted(session._hooks.keys()):
for file_name, config_data in session._hooks[state]:
file_data = FileData(
message_type=MessageFlags.ADD.value,
name=str(file_name),
type="hook:%s" % state,
data=str(config_data)
)
session.broadcast_file(file_data)
# send session configuration
session_config = session.options.get_configs()
config_data = ConfigShim.config_data(0, None, ConfigFlags.UPDATE.value, session.options, session_config)
session.broadcast_config(config_data)
# send session metadata
data_values = "|".join(["%s=%s" % item for item in session.metadata.get_configs().iteritems()])
data_types = tuple(ConfigDataTypes.STRING.value for _ in session.metadata.get_configs())
config_data = ConfigData(
message_type=0,
object=session.metadata.name,
type=ConfigFlags.NONE.value,
data_types=data_types,
data_values=data_values
)
session.broadcast_config(config_data)
logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data))
class CoreApiServer(core_pb2_grpc.CoreApiServicer):
def __init__(self, coreemu):
super(CoreApiServer, self).__init__()
@ -292,6 +395,63 @@ class CoreApiServer(core_pb2_grpc.CoreApiServicer):
node_event.node.services.extend(services.split("|"))
yield node_event
def LinkEvents(self, request, context):
session = self.get_session(request.id, context)
queue = Queue()
session.link_handlers.append(lambda x: queue.put(x))
while True:
event = queue.get()
link_event = core_pb2.LinkEvent()
if event.interface1_id is not None:
interface_one = link_event.link.interface_one
update_proto(
interface_one,
id=event.interface1_id,
name=event.interface1_name,
mac=convert_value(event.interface1_mac),
ip4=convert_value(event.interface1_ip4),
ip4mask=event.interface1_ip4_mask,
ip6=convert_value(event.interface1_ip6),
ip6mask=event.interface1_ip6_mask,
)
if event.interface2_id is not None:
interface_two = link_event.link.interface_two
update_proto(
interface_two,
id=event.interface2_id,
name=event.interface2_name,
mac=convert_value(event.interface2_mac),
ip4=convert_value(event.interface2_ip4),
ip4mask=event.interface2_ip4_mask,
ip6=convert_value(event.interface2_ip6),
ip6mask=event.interface2_ip6_mask,
)
link_event.message_type = event.message_type
update_proto(
link_event.link,
type=event.link_type,
node_one=event.node1_id,
node_two=event.node2_id
)
update_proto(
link_event.link.options,
opaque=event.opaque,
jitter=event.jitter,
key=event.key,
mburst=event.mburst,
mer=event.mer,
per=event.per,
bandwidth=event.bandwidth,
burst=event.burst,
delay=event.delay,
dup=event.dup,
unidirectional=event.unidirectional
)
yield link_event
def SessionEvents(self, request, context):
session = self.get_session(request.id, context)
queue = Queue()

View file

@ -26,6 +26,8 @@ service CoreApi {
// event streams
rpc NodeEvents (NodeEventsRequest) returns (stream NodeEvent) {
}
rpc LinkEvents (LinkEventsRequest) returns (stream LinkEvent) {
}
rpc SessionEvents (SessionEventsRequest) returns (stream SessionEvent) {
}
rpc ConfigEvents (ConfigEventsRequest) returns (stream ConfigEvent) {
@ -205,6 +207,15 @@ message NodeEvent {
Node node = 1;
}
message LinkEventsRequest {
int32 id = 1;
}
message LinkEvent {
MessageType message_type = 1;
Link link = 2;
}
message SessionEventsRequest {
int32 id = 1;
}