diff --git a/coretk/coretk/coreclient.py b/coretk/coretk/coreclient.py index bff47501..776e1750 100644 --- a/coretk/coretk/coreclient.py +++ b/coretk/coretk/coreclient.py @@ -242,27 +242,17 @@ class CoreClient: mapped_config = response.configs[_id] self.wlan_configs[_id] = mapped_config.config - # save and retrieve data, needed for session nodes - for node in session.nodes: - # get node service config and file config - # retrieve service configurations data for default nodes - if node.type == core_pb2.NodeType.DEFAULT: - for service in node.services: - response = self.client.get_node_service( - self.session_id, node.id, service - ) - if node.id not in self.service_configs: - self.service_configs[node.id] = {} - self.service_configs[node.id][service] = response.service - for file in response.service.configs: - response = self.client.get_node_service_file( - self.session_id, node.id, service, file - ) - if node.id not in self.file_configs: - self.file_configs[node.id] = {} - if service not in self.file_configs[node.id]: - self.file_configs[node.id][service] = {} - self.file_configs[node.id][service][file] = response.data + # get service configurations + response = self.client.get_node_service_configs(self.session_id) + for config in response.configs: + service_configs = self.service_configs.setdefault(config.node_id, {}) + service_configs[config.service] = config.data + logging.info("service file configs: %s", config.files) + for file_name in config.files: + file_configs = self.file_configs.setdefault(config.node_id, {}) + files = file_configs.setdefault(config.service, {}) + data = config.files[file_name] + files[file_name] = data # draw session self.app.canvas.reset_and_redraw(session) diff --git a/coretk/coretk/dialogs/serviceconfiguration.py b/coretk/coretk/dialogs/serviceconfiguration.py index 03c74ab2..a53c2aa1 100644 --- a/coretk/coretk/dialogs/serviceconfiguration.py +++ b/coretk/coretk/dialogs/serviceconfiguration.py @@ -323,7 +323,7 @@ class ServiceConfiguration(Dialog): button = ttk.Button(frame, text="Apply", command=self.click_apply) button.grid(row=0, column=0, sticky="ew", padx=PADX) button = ttk.Button( - frame, text="Dafults", command=self.click_defaults, state="disabled" + frame, text="Defaults", command=self.click_defaults, state="disabled" ) button.grid(row=0, column=1, sticky="ew", padx=PADX) button = ttk.Button( diff --git a/daemon/core/api/grpc/client.py b/daemon/core/api/grpc/client.py index 7887d5e8..fbafbb44 100644 --- a/daemon/core/api/grpc/client.py +++ b/daemon/core/api/grpc/client.py @@ -378,16 +378,17 @@ class CoreGrpcClient: ) return self.stub.AddSessionServer(request) - def events(self, session_id, handler): + def events(self, session_id, handler, events=None): """ Listen for session events. :param int session_id: id of session - :param handler: handler for every event + :param handler: handler for received events + :param list events: events to listen to, defaults to all :return: nothing :raises grpc.RpcError: when session doesn't exist """ - request = core_pb2.EventsRequest(session_id=session_id) + request = core_pb2.EventsRequest(session_id=session_id, events=events) stream = self.stub.Events(request) start_streamer(stream, handler) @@ -730,6 +731,18 @@ class CoreGrpcClient: ) return self.stub.SetServiceDefaults(request) + def get_node_service_configs(self, session_id): + """ + Get service data for a node. + + :param int session_id: session id + :return: response with all node service configs + :rtype: core_pb2.GetNodeServiceConfigsResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetNodeServiceConfigsRequest(session_id=session_id) + return self.stub.GetNodeServiceConfigs(request) + def get_node_service(self, session_id, node_id, service): """ Get service data for a node. diff --git a/daemon/core/api/grpc/events.py b/daemon/core/api/grpc/events.py new file mode 100644 index 00000000..d7a3094e --- /dev/null +++ b/daemon/core/api/grpc/events.py @@ -0,0 +1,268 @@ +import logging +from queue import Empty, Queue + +from core.api.grpc import core_pb2 +from core.api.grpc.grpcutils import convert_value +from core.emulator.data import ( + ConfigData, + EventData, + ExceptionData, + FileData, + LinkData, + NodeData, +) + + +def handle_node_event(event): + """ + Handle node event when there is a node event + + :param core.emulator.data.NodeData event: node data + :return: node event that contains node id, name, model, position, and services + :rtype: core.api.grpc.core_pb2.NodeEvent + """ + position = core_pb2.Position(x=event.x_position, y=event.y_position) + services = event.services or "" + services = services.split("|") + node_proto = core_pb2.Node( + id=event.id, + name=event.name, + model=event.model, + position=position, + services=services, + ) + return core_pb2.NodeEvent(node=node_proto, source=event.source) + + +def handle_link_event(event): + """ + Handle link event when there is a link event + + :param core.emulator.data.LinkData event: link data + :return: link event that has message type and link information + :rtype: core.api.grpc.core_pb2.LinkEvent + """ + interface_one = None + if event.interface1_id is not None: + interface_one = core_pb2.Interface( + id=event.interface1_id, + name=event.interface1_name, + mac=convert_value(event.interface1_mac), + ip4=convert_value(event.interface1_ip4), + ip4mask=event.interface1_ip4_mask, + ip6=convert_value(event.interface1_ip6), + ip6mask=event.interface1_ip6_mask, + ) + + interface_two = None + if event.interface2_id is not None: + interface_two = core_pb2.Interface( + id=event.interface2_id, + name=event.interface2_name, + mac=convert_value(event.interface2_mac), + ip4=convert_value(event.interface2_ip4), + ip4mask=event.interface2_ip4_mask, + ip6=convert_value(event.interface2_ip6), + ip6mask=event.interface2_ip6_mask, + ) + + options = core_pb2.LinkOptions( + opaque=event.opaque, + jitter=event.jitter, + key=event.key, + mburst=event.mburst, + mer=event.mer, + per=event.per, + bandwidth=event.bandwidth, + burst=event.burst, + delay=event.delay, + dup=event.dup, + unidirectional=event.unidirectional, + ) + link = core_pb2.Link( + type=event.link_type, + node_one_id=event.node1_id, + node_two_id=event.node2_id, + interface_one=interface_one, + interface_two=interface_two, + options=options, + ) + return core_pb2.LinkEvent(message_type=event.message_type, link=link) + + +def handle_session_event(event): + """ + Handle session event when there is a session event + + :param core.emulator.data.EventData event: event data + :return: session event + :rtype: core.api.grpc.core_pb2.SessionEvent + """ + event_time = event.time + if event_time is not None: + event_time = float(event_time) + return core_pb2.SessionEvent( + node_id=event.node, + event=event.event_type, + name=event.name, + data=event.data, + time=event_time, + session_id=event.session, + ) + + +def handle_config_event(event): + """ + Handle configuration event when there is configuration event + + :param core.emulator.data.ConfigData event: configuration data + :return: configuration event + :rtype: core.api.grpc.core_pb2.ConfigEvent + """ + session_id = None + if event.session is not None: + session_id = int(event.session) + return core_pb2.ConfigEvent( + message_type=event.message_type, + node_id=event.node, + object=event.object, + type=event.type, + captions=event.captions, + bitmap=event.bitmap, + data_values=event.data_values, + possible_values=event.possible_values, + groups=event.groups, + session_id=session_id, + interface=event.interface_number, + network_id=event.network_id, + opaque=event.opaque, + data_types=event.data_types, + ) + + +def handle_exception_event(event): + """ + Handle exception event when there is exception event + + :param core.emulator.data.ExceptionData event: exception data + :return: exception event + :rtype: core.api.grpc.core_pb2.ExceptionEvent + """ + return core_pb2.ExceptionEvent( + node_id=event.node, + session_id=int(event.session), + level=event.level, + source=event.source, + date=event.date, + text=event.text, + opaque=event.opaque, + ) + + +def handle_file_event(event): + """ + Handle file event + + :param core.emulator.data.FileData event: file data + :return: file event + :rtype: core.api.grpc.core_pb2.FileEvent + """ + return core_pb2.FileEvent( + message_type=event.message_type, + node_id=event.node, + name=event.name, + mode=event.mode, + number=event.number, + type=event.type, + source=event.source, + session_id=event.session, + data=event.data, + compressed_data=event.compressed_data, + ) + + +class EventStreamer: + """ + Processes session events to generate grpc events. + """ + + def __init__(self, session, event_types): + """ + Create a EventStreamer instance. + + :param core.emulator.session.Session session: session to process events for + :param set event_types: types of events to process + """ + self.session = session + self.event_types = event_types + self.queue = Queue() + self.add_handlers() + + def add_handlers(self): + """ + Add a session event handler for desired event types. + + :return: nothing + """ + if core_pb2.EventType.NODE in self.event_types: + self.session.node_handlers.append(self.queue.put) + if core_pb2.EventType.LINK in self.event_types: + self.session.link_handlers.append(self.queue.put) + if core_pb2.EventType.CONFIG in self.event_types: + self.session.config_handlers.append(self.queue.put) + if core_pb2.EventType.FILE in self.event_types: + self.session.file_handlers.append(self.queue.put) + if core_pb2.EventType.EXCEPTION in self.event_types: + self.session.exception_handlers.append(self.queue.put) + if core_pb2.EventType.SESSION in self.event_types: + self.session.event_handlers.append(self.queue.put) + + def process(self): + """ + Process the next event in the queue. + + :return: grpc event, or None when invalid event or queue timeout + :rtype: core.api.grpc.core_pb2.Event + """ + event = core_pb2.Event() + try: + data = self.queue.get(timeout=1) + if isinstance(data, NodeData): + event.node_event.CopyFrom(handle_node_event(data)) + elif isinstance(data, LinkData): + event.link_event.CopyFrom(handle_link_event(data)) + elif isinstance(data, EventData): + event.session_event.CopyFrom(handle_session_event(data)) + elif isinstance(data, ConfigData): + event.config_event.CopyFrom(handle_config_event(data)) + # TODO: remove when config events are fixed + event.config_event.session_id = self.session.id + elif isinstance(data, ExceptionData): + event.exception_event.CopyFrom(handle_exception_event(data)) + elif isinstance(data, FileData): + event.file_event.CopyFrom(handle_file_event(data)) + else: + logging.error("unknown event: %s", data) + event = None + except Empty: + event = None + return event + + def remove_handlers(self): + """ + Remove session event handlers for events being watched. + + :return: nothing + """ + if core_pb2.EventType.NODE in self.event_types: + self.session.node_handlers.remove(self.queue.put) + if core_pb2.EventType.LINK in self.event_types: + self.session.link_handlers.remove(self.queue.put) + if core_pb2.EventType.CONFIG in self.event_types: + self.session.config_handlers.remove(self.queue.put) + if core_pb2.EventType.FILE in self.event_types: + self.session.file_handlers.remove(self.queue.put) + if core_pb2.EventType.EXCEPTION in self.event_types: + self.session.exception_handlers.remove(self.queue.put) + if core_pb2.EventType.SESSION in self.event_types: + self.session.event_handlers.remove(self.queue.put) diff --git a/daemon/core/api/grpc/grpcutils.py b/daemon/core/api/grpc/grpcutils.py index 7df86a18..4ea752fd 100644 --- a/daemon/core/api/grpc/grpcutils.py +++ b/daemon/core/api/grpc/grpcutils.py @@ -352,3 +352,25 @@ def service_configuration(session, config): service.startup = tuple(config.startup) service.validate = tuple(config.validate) service.shutdown = tuple(config.shutdown) + + +def get_service_configuration(service): + """ + Convenience for converting a service to service data proto. + + :param service: service to get proto data for + :return: service proto data + :rtype: core.api.grpc.core_pb2.NodeServiceData + """ + return core_pb2.NodeServiceData( + executables=service.executables, + dependencies=service.dependencies, + dirs=service.dirs, + configs=service.configs, + startup=service.startup, + validate=service.validate, + validation_mode=service.validation_mode.value, + validation_timer=service.validation_timer, + shutdown=service.shutdown, + meta=service.meta, + ) diff --git a/daemon/core/api/grpc/server.py b/daemon/core/api/grpc/server.py index e552e1a4..1ada5267 100644 --- a/daemon/core/api/grpc/server.py +++ b/daemon/core/api/grpc/server.py @@ -5,27 +5,19 @@ import re import tempfile import time from concurrent import futures -from queue import Empty, Queue import grpc from core.api.grpc import core_pb2, core_pb2_grpc, grpcutils +from core.api.grpc.events import EventStreamer from core.api.grpc.grpcutils import ( - convert_value, get_config_options, get_emane_model_id, get_links, get_net_stats, ) from core.emane.nodes import EmaneNet -from core.emulator.data import ( - ConfigData, - EventData, - ExceptionData, - FileData, - LinkData, - NodeData, -) +from core.emulator.data import LinkData from core.emulator.emudata import LinkOptions, NodeOptions from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags from core.errors import CoreCommandError, CoreError @@ -439,210 +431,19 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): def Events(self, request, context): session = self.get_session(request.session_id, context) - queue = Queue() - session.node_handlers.append(queue.put) - session.link_handlers.append(queue.put) - session.config_handlers.append(queue.put) - session.file_handlers.append(queue.put) - session.exception_handlers.append(queue.put) - session.event_handlers.append(queue.put) + event_types = set(request.events) + if not event_types: + event_types = set(core_pb2.EventType.Enum.values()) + streamer = EventStreamer(session, event_types) while self._is_running(context): - event = core_pb2.Event() - try: - data = queue.get(timeout=1) - if isinstance(data, NodeData): - event.node_event.CopyFrom(self._handle_node_event(data)) - elif isinstance(data, LinkData): - event.link_event.CopyFrom(self._handle_link_event(data)) - elif isinstance(data, EventData): - event.session_event.CopyFrom(self._handle_session_event(data)) - elif isinstance(data, ConfigData): - event.config_event.CopyFrom(self._handle_config_event(data)) - # TODO: remove when config events are fixed - event.config_event.session_id = session.id - elif isinstance(data, ExceptionData): - event.exception_event.CopyFrom(self._handle_exception_event(data)) - elif isinstance(data, FileData): - event.file_event.CopyFrom(self._handle_file_event(data)) - else: - logging.error("unknown event: %s", data) - continue - + event = streamer.process() + if event: yield event - except Empty: - continue - session.node_handlers.remove(queue.put) - session.link_handlers.remove(queue.put) - session.config_handlers.remove(queue.put) - session.file_handlers.remove(queue.put) - session.exception_handlers.remove(queue.put) - session.event_handlers.remove(queue.put) + streamer.remove_handlers() self._cancel_stream(context) - def _handle_node_event(self, event): - """ - Handle node event when there is a node event - - :param core.emulator.data.NodeData event: node data - :return: node event that contains node id, name, model, position, and services - :rtype: core.api.grpc.core_pb2.NodeEvent - """ - position = core_pb2.Position(x=event.x_position, y=event.y_position) - services = event.services or "" - services = services.split("|") - node_proto = core_pb2.Node( - id=event.id, - name=event.name, - model=event.model, - position=position, - services=services, - ) - return core_pb2.NodeEvent(node=node_proto, source=event.source) - - def _handle_link_event(self, event): - """ - Handle link event when there is a link event - - :param core.emulator.data.LinkData event: link data - :return: link event that has message type and link information - :rtype: core.api.grpc.core_pb2.LinkEvent - """ - interface_one = None - if event.interface1_id is not None: - interface_one = core_pb2.Interface( - id=event.interface1_id, - name=event.interface1_name, - mac=convert_value(event.interface1_mac), - ip4=convert_value(event.interface1_ip4), - ip4mask=event.interface1_ip4_mask, - ip6=convert_value(event.interface1_ip6), - ip6mask=event.interface1_ip6_mask, - ) - - interface_two = None - if event.interface2_id is not None: - interface_two = core_pb2.Interface( - id=event.interface2_id, - name=event.interface2_name, - mac=convert_value(event.interface2_mac), - ip4=convert_value(event.interface2_ip4), - ip4mask=event.interface2_ip4_mask, - ip6=convert_value(event.interface2_ip6), - ip6mask=event.interface2_ip6_mask, - ) - - options = core_pb2.LinkOptions( - opaque=event.opaque, - jitter=event.jitter, - key=event.key, - mburst=event.mburst, - mer=event.mer, - per=event.per, - bandwidth=event.bandwidth, - burst=event.burst, - delay=event.delay, - dup=event.dup, - unidirectional=event.unidirectional, - ) - link = core_pb2.Link( - type=event.link_type, - node_one_id=event.node1_id, - node_two_id=event.node2_id, - interface_one=interface_one, - interface_two=interface_two, - options=options, - ) - return core_pb2.LinkEvent(message_type=event.message_type, link=link) - - def _handle_session_event(self, event): - """ - Handle session event when there is a session event - - :param core.emulator.data.EventData event: event data - :return: session event - :rtype: core.api.grpc.core_pb2.SessionEvent - """ - event_time = event.time - if event_time is not None: - event_time = float(event_time) - return core_pb2.SessionEvent( - node_id=event.node, - event=event.event_type, - name=event.name, - data=event.data, - time=event_time, - session_id=event.session, - ) - - def _handle_config_event(self, event): - """ - Handle configuration event when there is configuration event - - :param core.emulator.data.ConfigData event: configuration data - :return: configuration event - :rtype: core.api.grpc.core_pb2.ConfigEvent - """ - session_id = None - if event.session is not None: - session_id = int(event.session) - return core_pb2.ConfigEvent( - message_type=event.message_type, - node_id=event.node, - object=event.object, - type=event.type, - captions=event.captions, - bitmap=event.bitmap, - data_values=event.data_values, - possible_values=event.possible_values, - groups=event.groups, - session_id=session_id, - interface=event.interface_number, - network_id=event.network_id, - opaque=event.opaque, - data_types=event.data_types, - ) - - def _handle_exception_event(self, event): - """ - Handle exception event when there is exception event - - :param core.emulator.data.ExceptionData event: exception data - :return: exception event - :rtype: core.api.grpc.core_pb2.ExceptionEvent - """ - return core_pb2.ExceptionEvent( - node_id=event.node, - session_id=int(event.session), - level=event.level, - source=event.source, - date=event.date, - text=event.text, - opaque=event.opaque, - ) - - def _handle_file_event(self, event): - """ - Handle file event - - :param core.emulator.data.FileData event: file data - :return: file event - :rtype: core.api.grpc.core_pb2.FileEvent - """ - return core_pb2.FileEvent( - message_type=event.message_type, - node_id=event.node, - name=event.name, - mode=event.mode, - number=event.number, - type=event.type, - source=event.source, - session_id=event.session, - data=event.data, - compressed_data=event.compressed_data, - ) - def Throughputs(self, request, context): """ Calculate average throughput after every certain amount of delay time @@ -1116,6 +917,32 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): ] = service_defaults.services return core_pb2.SetServiceDefaultsResponse(result=True) + def GetNodeServiceConfigs(self, request, context): + """ + Retrieve all node service configurations. + + :param core.api.grpc.core_pb2.GetNodeServiceConfigsRequest request: + get-node-service request + :param grpc.ServicerContext context: context object + :return: all node service configs response + :rtype: core.api.grpc.core_pb2.GetNodeServiceConfigsResponse + """ + logging.debug("get node service configs: %s", request) + session = self.get_session(request.session_id, context) + configs = [] + for node_id, service_configs in session.services.custom_services.items(): + for name in service_configs: + service = session.services.get_service(node_id, name) + service_proto = grpcutils.get_service_configuration(service) + config = core_pb2.GetNodeServiceConfigsResponse.ServiceConfig( + node_id=node_id, + service=name, + data=service_proto, + files=service.config_data, + ) + configs.append(config) + return core_pb2.GetNodeServiceConfigsResponse(configs=configs) + def GetNodeService(self, request, context): """ Retrieve a requested service from a node @@ -1131,18 +958,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): service = session.services.get_service( request.node_id, request.service, default_service=True ) - service_proto = core_pb2.NodeServiceData( - executables=service.executables, - dependencies=service.dependencies, - dirs=service.dirs, - configs=service.configs, - startup=service.startup, - validate=service.validate, - validation_mode=service.validation_mode.value, - validation_timer=service.validation_timer, - shutdown=service.shutdown, - meta=service.meta, - ) + service_proto = grpcutils.get_service_configuration(service) return core_pb2.GetNodeServiceResponse(service=service_proto) def GetNodeServiceFile(self, request, context): diff --git a/daemon/core/services/coreservices.py b/daemon/core/services/coreservices.py index b51eb715..684ccbb9 100644 --- a/daemon/core/services/coreservices.py +++ b/daemon/core/services/coreservices.py @@ -461,8 +461,8 @@ class CoreServices: :param core.netns.vnode.LxcNode node: node to start services on :return: nothing """ - funcs = [] boot_paths = ServiceDependencies(node.services).boot_paths() + funcs = [] for boot_path in boot_paths: args = (node, boot_path) funcs.append((self._start_boot_paths, args, {})) @@ -484,6 +484,7 @@ class CoreServices: " -> ".join([x.name for x in boot_path]), ) for service in boot_path: + service = self.get_service(node.id, service.name, default_service=True) try: self.boot_service(node, service) except Exception: @@ -744,7 +745,9 @@ class CoreServices: config_files = service.get_configs(node) for file_name in config_files: - logging.debug("generating service config: %s", file_name) + logging.debug( + "generating service config custom(%s): %s", service.custom, file_name + ) if service.custom: cfg = service.config_data.get(file_name) if cfg is None: diff --git a/daemon/proto/core/api/grpc/core.proto b/daemon/proto/core/api/grpc/core.proto index 5f6a964b..b0a6c0d6 100644 --- a/daemon/proto/core/api/grpc/core.proto +++ b/daemon/proto/core/api/grpc/core.proto @@ -89,6 +89,8 @@ service CoreApi { } rpc SetServiceDefaults (SetServiceDefaultsRequest) returns (SetServiceDefaultsResponse) { } + rpc GetNodeServiceConfigs (GetNodeServiceConfigsRequest) returns (GetNodeServiceConfigsResponse) { + } rpc GetNodeService (GetNodeServiceRequest) returns (GetNodeServiceResponse) { } rpc GetNodeServiceFile (GetNodeServiceFileRequest) returns (GetNodeServiceFileResponse) { @@ -266,6 +268,7 @@ message AddSessionServerResponse { message EventsRequest { int32 session_id = 1; + repeated EventType.Enum events = 2; } message ThroughputsRequest { @@ -537,6 +540,20 @@ message SetServiceDefaultsResponse { bool result = 1; } +message GetNodeServiceConfigsRequest { + int32 session_id = 1; +} + +message GetNodeServiceConfigsResponse { + message ServiceConfig { + int32 node_id = 1; + string service = 2; + NodeServiceData data = 3; + map files = 4; + } + repeated ServiceConfig configs = 1; +} + message GetNodeServiceRequest { int32 session_id = 1; int32 node_id = 2; @@ -742,6 +759,17 @@ message ServiceFileConfig { string data = 4; } +message EventType { + enum Enum { + SESSION = 0; + NODE = 1; + LINK = 2; + CONFIG = 3; + EXCEPTION = 4; + FILE = 5; + } +} + message MessageType { enum Enum { NONE = 0; diff --git a/daemon/tests/test_grpc.py b/daemon/tests/test_grpc.py index d4df63d7..796febf7 100644 --- a/daemon/tests/test_grpc.py +++ b/daemon/tests/test_grpc.py @@ -878,6 +878,24 @@ class TestGrpc: assert response.result is True assert session.services.default_services[node_type] == services + def test_get_node_service_configs(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + service_name = "DefaultRoute" + session.services.set_service(node.id, service_name) + + # then + with client.context_connect(): + response = client.get_node_service_configs(session.id) + + # then + assert len(response.configs) == 1 + service_config = response.configs[0] + assert service_config.node_id == node.id + assert service_config.service == service_name + def test_get_node_service(self, grpc_server): # given client = CoreGrpcClient()