diff --git a/daemon/core/api/grpc/client.py b/daemon/core/api/grpc/client.py index 5e1fe6a1..23a66f1f 100644 --- a/daemon/core/api/grpc/client.py +++ b/daemon/core/api/grpc/client.py @@ -5,98 +5,131 @@ gRpc client for interfacing with CORE. import logging import threading from contextlib import contextmanager -from typing import Any, Callable, Dict, Generator, Iterable, List, Optional +from pathlib import Path +from queue import Queue +from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple import grpc -from core.api.grpc import configservices_pb2, core_pb2, core_pb2_grpc +from core.api.grpc import ( + configservices_pb2, + core_pb2, + core_pb2_grpc, + emane_pb2, + mobility_pb2, + services_pb2, + wlan_pb2, + wrappers, +) from core.api.grpc.configservices_pb2 import ( GetConfigServiceDefaultsRequest, - GetConfigServiceDefaultsResponse, GetConfigServicesRequest, - GetConfigServicesResponse, GetNodeConfigServiceConfigsRequest, - GetNodeConfigServiceConfigsResponse, GetNodeConfigServiceRequest, - GetNodeConfigServiceResponse, GetNodeConfigServicesRequest, - GetNodeConfigServicesResponse, SetNodeConfigServiceRequest, - SetNodeConfigServiceResponse, ) -from core.api.grpc.core_pb2 import ExecuteScriptRequest, ExecuteScriptResponse +from core.api.grpc.core_pb2 import ExecuteScriptRequest from core.api.grpc.emane_pb2 import ( EmaneLinkRequest, - EmaneLinkResponse, - EmaneModelConfig, - EmanePathlossesRequest, - EmanePathlossesResponse, GetEmaneConfigRequest, - GetEmaneConfigResponse, GetEmaneEventChannelRequest, - GetEmaneEventChannelResponse, GetEmaneModelConfigRequest, - GetEmaneModelConfigResponse, GetEmaneModelConfigsRequest, - GetEmaneModelConfigsResponse, GetEmaneModelsRequest, - GetEmaneModelsResponse, SetEmaneConfigRequest, - SetEmaneConfigResponse, SetEmaneModelConfigRequest, - SetEmaneModelConfigResponse, ) from core.api.grpc.mobility_pb2 import ( GetMobilityConfigRequest, - GetMobilityConfigResponse, GetMobilityConfigsRequest, - GetMobilityConfigsResponse, MobilityActionRequest, - MobilityActionResponse, MobilityConfig, SetMobilityConfigRequest, - SetMobilityConfigResponse, ) from core.api.grpc.services_pb2 import ( GetNodeServiceConfigsRequest, - GetNodeServiceConfigsResponse, GetNodeServiceFileRequest, - GetNodeServiceFileResponse, GetNodeServiceRequest, - GetNodeServiceResponse, GetServiceDefaultsRequest, - GetServiceDefaultsResponse, GetServicesRequest, - GetServicesResponse, - ServiceAction, ServiceActionRequest, - ServiceActionResponse, - ServiceConfig, ServiceDefaults, - ServiceFileConfig, SetNodeServiceFileRequest, - SetNodeServiceFileResponse, SetNodeServiceRequest, - SetNodeServiceResponse, SetServiceDefaultsRequest, - SetServiceDefaultsResponse, ) from core.api.grpc.wlan_pb2 import ( GetWlanConfigRequest, - GetWlanConfigResponse, GetWlanConfigsRequest, - GetWlanConfigsResponse, SetWlanConfigRequest, - SetWlanConfigResponse, WlanConfig, WlanLinkRequest, - WlanLinkResponse, ) from core.emulator.data import IpPrefixes +from core.errors import CoreError logger = logging.getLogger(__name__) +class MoveNodesStreamer: + def __init__(self, session_id: int = None, source: str = None) -> None: + self.session_id = session_id + self.source = source + self.queue: Queue = Queue() + + def send_position(self, node_id: int, x: float, y: float) -> None: + position = wrappers.Position(x=x, y=y) + request = wrappers.MoveNodesRequest( + session_id=self.session_id, + node_id=node_id, + source=self.source, + position=position, + ) + self.send(request) + + def send_geo(self, node_id: int, lon: float, lat: float, alt: float) -> None: + geo = wrappers.Geo(lon=lon, lat=lat, alt=alt) + request = wrappers.MoveNodesRequest( + session_id=self.session_id, node_id=node_id, source=self.source, geo=geo + ) + self.send(request) + + def send(self, request: wrappers.MoveNodesRequest) -> None: + self.queue.put(request) + + def stop(self) -> None: + self.queue.put(None) + + def next(self) -> Optional[core_pb2.MoveNodesRequest]: + request: Optional[wrappers.MoveNodesRequest] = self.queue.get() + if request: + return request.to_proto() + else: + return request + + def iter(self) -> Iterable: + return iter(self.next, None) + + +class EmanePathlossesStreamer: + def __init__(self) -> None: + self.queue: Queue = Queue() + + def send(self, request: Optional[wrappers.EmanePathlossesRequest]) -> None: + self.queue.put(request) + + def next(self) -> Optional[emane_pb2.EmanePathlossesRequest]: + request: Optional[wrappers.EmanePathlossesRequest] = self.queue.get() + if request: + return request.to_proto() + else: + return request + + def iter(self): + return iter(self.next, None) + + class InterfaceHelper: """ Convenience class to help generate IP4 and IP6 addresses for gRPC clients. @@ -114,7 +147,7 @@ class InterfaceHelper: def create_iface( self, node_id: int, iface_id: int, name: str = None, mac: str = None - ) -> core_pb2.Interface: + ) -> wrappers.Interface: """ Create an interface protobuf object. @@ -125,7 +158,7 @@ class InterfaceHelper: :return: interface protobuf """ iface_data = self.prefixes.gen_iface(node_id, name, mac) - return core_pb2.Interface( + return wrappers.Interface( id=iface_id, name=iface_data.name, ip4=iface_data.ip4, @@ -136,36 +169,65 @@ class InterfaceHelper: ) -def stream_listener(stream: Any, handler: Callable[[core_pb2.Event], None]) -> None: +def throughput_listener( + stream: Any, handler: Callable[[wrappers.ThroughputsEvent], None] +) -> None: """ - Listen for stream events and provide them to the handler. + Listen for throughput events and provide them to the handler. + + :param stream: grpc stream that will provide events + :param handler: function that handles an event + :return: nothing + """ + try: + for event_proto in stream: + event = wrappers.ThroughputsEvent.from_proto(event_proto) + handler(event) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.CANCELLED: + logger.debug("throughput stream closed") + else: + logger.exception("throughput stream error") + + +def cpu_listener( + stream: Any, handler: Callable[[wrappers.CpuUsageEvent], None] +) -> None: + """ + Listen for cpu events and provide them to the handler. :param stream: grpc stream that will provide events :param handler: function that handles an event :return: nothing """ try: - for event in stream: + for event_proto in stream: + event = wrappers.CpuUsageEvent.from_proto(event_proto) handler(event) except grpc.RpcError as e: if e.code() == grpc.StatusCode.CANCELLED: - logger.debug("stream closed") + logger.debug("cpu stream closed") else: - logger.exception("stream error") + logger.exception("cpu stream error") -def start_streamer(stream: Any, handler: Callable[[core_pb2.Event], None]) -> None: +def event_listener(stream: Any, handler: Callable[[wrappers.Event], None]) -> None: """ - Convenience method for starting a grpc stream thread for handling streamed events. + Listen for session events and provide them to the handler. :param stream: grpc stream that will provide events :param handler: function that handles an event :return: nothing """ - thread = threading.Thread( - target=stream_listener, args=(stream, handler), daemon=True - ) - thread.start() + try: + for event_proto in stream: + event = wrappers.Event.from_proto(event_proto) + handler(event) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.CANCELLED: + logger.debug("session stream closed") + else: + logger.exception("session stream error") class CoreGrpcClient: @@ -186,43 +248,81 @@ class CoreGrpcClient: def start_session( self, - session_id: int, - nodes: List[core_pb2.Node], - links: List[core_pb2.Link], - location: core_pb2.SessionLocation = None, - hooks: List[core_pb2.Hook] = None, - emane_config: Dict[str, str] = None, - emane_model_configs: List[EmaneModelConfig] = None, - wlan_configs: List[WlanConfig] = None, - mobility_configs: List[MobilityConfig] = None, - service_configs: List[ServiceConfig] = None, - service_file_configs: List[ServiceFileConfig] = None, - asymmetric_links: List[core_pb2.Link] = None, - config_service_configs: List[configservices_pb2.ConfigServiceConfig] = None, - ) -> core_pb2.StartSessionResponse: + session: wrappers.Session, + asymmetric_links: List[wrappers.Link] = None, + definition: bool = False, + ) -> Tuple[bool, List[str]]: """ Start a session. - :param session_id: id of session - :param nodes: list of nodes to create - :param links: list of links to create - :param location: location to set - :param hooks: session hooks to set - :param emane_config: emane configuration to set - :param emane_model_configs: node emane model configurations - :param wlan_configs: node wlan configurations - :param mobility_configs: node mobility configurations - :param service_configs: node service configurations - :param service_file_configs: node service file configurations - :param asymmetric_links: asymmetric links to edit - :param config_service_configs: config service configurations - :return: start session response + :param session: session to start + :param asymmetric_links: link configuration for asymmetric links + :param definition: True to only define session data, False to start session + :return: tuple of result and exception strings """ + nodes = [x.to_proto() for x in session.nodes.values()] + links = [x.to_proto() for x in session.links] + if asymmetric_links: + asymmetric_links = [x.to_proto() for x in asymmetric_links] + hooks = [x.to_proto() for x in session.hooks.values()] + emane_config = {k: v.value for k, v in session.emane_config.items()} + emane_model_configs = [] + mobility_configs = [] + wlan_configs = [] + service_configs = [] + service_file_configs = [] + config_service_configs = [] + for node in session.nodes.values(): + for key, config in node.emane_model_configs.items(): + model, iface_id = key + config = wrappers.ConfigOption.to_dict(config) + if iface_id is None: + iface_id = -1 + emane_model_config = emane_pb2.EmaneModelConfig( + node_id=node.id, iface_id=iface_id, model=model, config=config + ) + emane_model_configs.append(emane_model_config) + if node.wlan_config: + config = wrappers.ConfigOption.to_dict(node.wlan_config) + wlan_config = wlan_pb2.WlanConfig(node_id=node.id, config=config) + wlan_configs.append(wlan_config) + if node.mobility_config: + config = wrappers.ConfigOption.to_dict(node.mobility_config) + mobility_config = mobility_pb2.MobilityConfig( + node_id=node.id, config=config + ) + mobility_configs.append(mobility_config) + for name, config in node.service_configs.items(): + service_config = services_pb2.ServiceConfig( + node_id=node.id, + service=name, + directories=config.dirs, + files=config.configs, + startup=config.startup, + validate=config.validate, + shutdown=config.shutdown, + ) + service_configs.append(service_config) + for service, file_configs in node.service_file_configs.items(): + for file, data in file_configs.items(): + service_file_config = services_pb2.ServiceFileConfig( + node_id=node.id, service=service, file=file, data=data + ) + service_file_configs.append(service_file_config) + for name, service_config in node.config_service_configs.items(): + config_service_config = configservices_pb2.ConfigServiceConfig( + node_id=node.id, + name=name, + templates=service_config.templates, + config=service_config.config, + ) + config_service_configs.append(config_service_config) + options = {k: v.value for k, v in session.options.items()} request = core_pb2.StartSessionRequest( - session_id=session_id, + session_id=session.id, nodes=nodes, links=links, - location=location, + location=session.location.to_proto(), hooks=hooks, emane_config=emane_config, emane_model_configs=emane_model_configs, @@ -232,125 +332,155 @@ class CoreGrpcClient: service_file_configs=service_file_configs, asymmetric_links=asymmetric_links, config_service_configs=config_service_configs, + options=options, + user=session.user, + definition=definition, + metadata=session.metadata, ) - return self.stub.StartSession(request) + response = self.stub.StartSession(request) + return response.result, list(response.exceptions) - def stop_session(self, session_id: int) -> core_pb2.StopSessionResponse: + def stop_session(self, session_id: int) -> bool: """ Stop a running session. :param session_id: id of session - :return: stop session response + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ request = core_pb2.StopSessionRequest(session_id=session_id) - return self.stub.StopSession(request) + response = self.stub.StopSession(request) + return response.result - def create_session(self, session_id: int = None) -> core_pb2.CreateSessionResponse: + def add_session(self, session_id: int = None) -> wrappers.Session: + session_id = self.create_session(session_id) + return self.get_session(session_id) + + def create_session(self, session_id: int = None) -> int: """ Create a session. :param session_id: id for session, default is None and one will be created for you - :return: response with created session id + :return: session id """ request = core_pb2.CreateSessionRequest(session_id=session_id) - return self.stub.CreateSession(request) + response = self.stub.CreateSession(request) + return response.session_id - def delete_session(self, session_id: int) -> core_pb2.DeleteSessionResponse: + def delete_session(self, session_id: int) -> bool: """ Delete a session. :param session_id: id of session - :return: response with result of deletion success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ request = core_pb2.DeleteSessionRequest(session_id=session_id) - return self.stub.DeleteSession(request) + response = self.stub.DeleteSession(request) + return response.result - def get_sessions(self) -> core_pb2.GetSessionsResponse: + def get_sessions(self) -> List[wrappers.SessionSummary]: """ Retrieves all currently known sessions. :return: response with a list of currently known session, their state and number of nodes """ - return self.stub.GetSessions(core_pb2.GetSessionsRequest()) + response = self.stub.GetSessions(core_pb2.GetSessionsRequest()) + sessions = [] + for session_proto in response.sessions: + session = wrappers.SessionSummary.from_proto(session_proto) + sessions.append(session) + return sessions - def check_session(self, session_id: int) -> core_pb2.CheckSessionResponse: + def check_session(self, session_id: int) -> bool: """ Check if a session exists. :param session_id: id of session to check for - :return: response with result if session was found + :return: True if exists, False otherwise """ request = core_pb2.CheckSessionRequest(session_id=session_id) - return self.stub.CheckSession(request) + response = self.stub.CheckSession(request) + return response.result - def get_session(self, session_id: int) -> core_pb2.GetSessionResponse: + def get_session(self, session_id: int) -> wrappers.Session: """ Retrieve a session. :param session_id: id of session - :return: response with sessions state, nodes, and links + :return: session :raises grpc.RpcError: when session doesn't exist """ request = core_pb2.GetSessionRequest(session_id=session_id) - return self.stub.GetSession(request) + response = self.stub.GetSession(request) + return wrappers.Session.from_proto(response.session) - def set_session_state( - self, session_id: int, state: core_pb2.SessionState - ) -> core_pb2.SetSessionStateResponse: + def set_session_state(self, session_id: int, state: wrappers.SessionState) -> bool: """ Set session state. :param session_id: id of session :param state: session state to transition to - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ - request = core_pb2.SetSessionStateRequest(session_id=session_id, state=state) - return self.stub.SetSessionState(request) + request = core_pb2.SetSessionStateRequest( + session_id=session_id, state=state.value + ) + response = self.stub.SetSessionState(request) + return response.result - def add_session_server( - self, session_id: int, name: str, host: str - ) -> core_pb2.AddSessionServerResponse: + def add_session_server(self, session_id: int, name: str, host: str) -> bool: """ Add distributed session server. :param session_id: id of session :param name: name of server to add :param host: host address to connect to - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ request = core_pb2.AddSessionServerRequest( session_id=session_id, name=name, host=host ) - return self.stub.AddSessionServer(request) + response = self.stub.AddSessionServer(request) + return response.result def alert( self, session_id: int, - level: core_pb2.ExceptionLevel, + level: wrappers.ExceptionLevel, source: str, text: str, node_id: int = None, - ) -> core_pb2.SessionAlertResponse: + ) -> bool: + """ + Initiate an alert to be broadcast out to all listeners. + + :param session_id: id of session + :param level: alert level + :param source: source of alert + :param text: alert text + :param node_id: node associated with alert + :return: True for success, False otherwise + """ request = core_pb2.SessionAlertRequest( session_id=session_id, - level=level, + level=level.value, source=source, text=text, node_id=node_id, ) - return self.stub.SessionAlert(request) + response = self.stub.SessionAlert(request) + return response.result def events( self, session_id: int, - handler: Callable[[core_pb2.Event], None], - events: List[core_pb2.Event] = None, + handler: Callable[[wrappers.Event], None], + events: List[wrappers.EventType] = None, ) -> grpc.Future: """ Listen for session events. @@ -363,11 +493,14 @@ class CoreGrpcClient: """ request = core_pb2.EventsRequest(session_id=session_id, events=events) stream = self.stub.Events(request) - start_streamer(stream, handler) + thread = threading.Thread( + target=event_listener, args=(stream, handler), daemon=True + ) + thread.start() return stream def throughputs( - self, session_id: int, handler: Callable[[core_pb2.ThroughputsEvent], None] + self, session_id: int, handler: Callable[[wrappers.ThroughputsEvent], None] ) -> grpc.Future: """ Listen for throughput events with information for interfaces and bridges. @@ -379,11 +512,14 @@ class CoreGrpcClient: """ request = core_pb2.ThroughputsRequest(session_id=session_id) stream = self.stub.Throughputs(request) - start_streamer(stream, handler) + thread = threading.Thread( + target=throughput_listener, args=(stream, handler), daemon=True + ) + thread.start() return stream def cpu_usage( - self, delay: int, handler: Callable[[core_pb2.CpuUsageEvent], None] + self, delay: int, handler: Callable[[wrappers.CpuUsageEvent], None] ) -> grpc.Future: """ Listen for cpu usage events with the given repeat delay. @@ -394,47 +530,61 @@ class CoreGrpcClient: """ request = core_pb2.CpuUsageRequest(delay=delay) stream = self.stub.CpuUsage(request) - start_streamer(stream, handler) + thread = threading.Thread( + target=cpu_listener, args=(stream, handler), daemon=True + ) + thread.start() return stream - def add_node( - self, session_id: int, node: core_pb2.Node, source: str = None - ) -> core_pb2.AddNodeResponse: + def add_node(self, session_id: int, node: wrappers.Node, source: str = None) -> int: """ Add node to session. :param session_id: session id :param node: node to add :param source: source application - :return: response with node id + :return: id of added node :raises grpc.RpcError: when session doesn't exist """ request = core_pb2.AddNodeRequest( - session_id=session_id, node=node, source=source + session_id=session_id, node=node.to_proto(), source=source ) - return self.stub.AddNode(request) + response = self.stub.AddNode(request) + return response.node_id - def get_node(self, session_id: int, node_id: int) -> core_pb2.GetNodeResponse: + def get_node( + self, session_id: int, node_id: int + ) -> Tuple[wrappers.Node, List[wrappers.Interface], List[wrappers.Link]]: """ Get node details. :param session_id: session id :param node_id: node id - :return: response with node details + :return: tuple of node and its interfaces :raises grpc.RpcError: when session or node doesn't exist """ request = core_pb2.GetNodeRequest(session_id=session_id, node_id=node_id) - return self.stub.GetNode(request) + response = self.stub.GetNode(request) + node = wrappers.Node.from_proto(response.node) + ifaces = [] + for iface_proto in response.ifaces: + iface = wrappers.Interface.from_proto(iface_proto) + ifaces.append(iface) + links = [] + for link_proto in response.links: + link = wrappers.Link.from_proto(link_proto) + links.append(link) + return node, ifaces, links def edit_node( self, session_id: int, node_id: int, - position: core_pb2.Position = None, + position: wrappers.Position = None, icon: str = None, - geo: core_pb2.Geo = None, + geo: wrappers.Geo = None, source: str = None, - ) -> core_pb2.EditNodeResponse: + ) -> bool: """ Edit a node's icon and/or location, can only use position(x,y) or geo(lon, lat, alt), not both. @@ -445,47 +595,49 @@ class CoreGrpcClient: :param icon: path to icon for gui to use for node :param geo: lon,lat,alt location for node :param source: application source - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session or node doesn't exist """ + if position and geo: + raise CoreError("cannot edit position and geo at same time") + position_proto = position.to_proto() if position else None + geo_proto = geo.to_proto() if geo else None request = core_pb2.EditNodeRequest( session_id=session_id, node_id=node_id, - position=position, + position=position_proto, icon=icon, source=source, - geo=geo, + geo=geo_proto, ) - return self.stub.EditNode(request) + response = self.stub.EditNode(request) + return response.result - def move_nodes( - self, move_iterator: Iterable[core_pb2.MoveNodesRequest] - ) -> core_pb2.MoveNodesResponse: + def move_nodes(self, streamer: MoveNodesStreamer) -> None: """ Stream node movements using the provided iterator. - :param move_iterator: iterator for generating node movements - :return: move nodes response + :param streamer: move nodes streamer + :return: nothing :raises grpc.RpcError: when session or nodes do not exist """ - return self.stub.MoveNodes(move_iterator) + self.stub.MoveNodes(streamer.iter()) - def delete_node( - self, session_id: int, node_id: int, source: str = None - ) -> core_pb2.DeleteNodeResponse: + def delete_node(self, session_id: int, node_id: int, source: str = None) -> bool: """ Delete node from session. :param session_id: session id :param node_id: node id :param source: application source - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ request = core_pb2.DeleteNodeRequest( session_id=session_id, node_id=node_id, source=source ) - return self.stub.DeleteNode(request) + response = self.stub.DeleteNode(request) + return response.result def node_command( self, @@ -494,7 +646,7 @@ class CoreGrpcClient: command: str, wait: bool = True, shell: bool = False, - ) -> core_pb2.NodeCommandResponse: + ) -> Tuple[int, str]: """ Send command to a node and get the output. @@ -503,7 +655,7 @@ class CoreGrpcClient: :param command: command to run on node :param wait: wait for command to complete :param shell: send shell command - :return: response with command combined stdout/stderr + :return: returns tuple of return code and output :raises grpc.RpcError: when session or node doesn't exist """ request = core_pb2.NodeCommandRequest( @@ -513,214 +665,220 @@ class CoreGrpcClient: wait=wait, shell=shell, ) - return self.stub.NodeCommand(request) + response = self.stub.NodeCommand(request) + return response.return_code, response.output - def get_node_terminal( - self, session_id: int, node_id: int - ) -> core_pb2.GetNodeTerminalResponse: + def get_node_terminal(self, session_id: int, node_id: int) -> str: """ Retrieve terminal command string for launching a local terminal. :param session_id: session id :param node_id: node id - :return: response with a node terminal command + :return: node terminal :raises grpc.RpcError: when session or node doesn't exist """ request = core_pb2.GetNodeTerminalRequest( session_id=session_id, node_id=node_id ) - return self.stub.GetNodeTerminal(request) + response = self.stub.GetNodeTerminal(request) + return response.terminal + + def get_node_links(self, session_id: int, node_id: int) -> List[wrappers.Link]: + """ + Get current links for a node. + + :param session_id: session id + :param node_id: node id + :return: list of links + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.GetNodeLinksRequest(session_id=session_id, node_id=node_id) + response = self.stub.GetNodeLinks(request) + links = [] + for link_proto in response.links: + link = wrappers.Link.from_proto(link_proto) + links.append(link) + return links def add_link( - self, - session_id: int, - node1_id: int, - node2_id: int, - iface1: core_pb2.Interface = None, - iface2: core_pb2.Interface = None, - options: core_pb2.LinkOptions = None, - source: str = None, - ) -> core_pb2.AddLinkResponse: + self, session_id: int, link: wrappers.Link, source: str = None + ) -> Tuple[bool, wrappers.Interface, wrappers.Interface]: """ Add a link between nodes. :param session_id: session id - :param node1_id: node one id - :param node2_id: node two id - :param iface1: node one interface data - :param iface2: node two interface data - :param options: options for link (jitter, bandwidth, etc) + :param link: link to add :param source: application source - :return: response with result of success or failure + :return: tuple of result and finalized interface values :raises grpc.RpcError: when session or one of the nodes don't exist """ - link = core_pb2.Link( - node1_id=node1_id, - node2_id=node2_id, - type=core_pb2.LinkType.WIRED, - iface1=iface1, - iface2=iface2, - options=options, - ) request = core_pb2.AddLinkRequest( - session_id=session_id, link=link, source=source + session_id=session_id, link=link.to_proto(), source=source ) - return self.stub.AddLink(request) + response = self.stub.AddLink(request) + iface1 = wrappers.Interface.from_proto(response.iface1) + iface2 = wrappers.Interface.from_proto(response.iface2) + return response.result, iface1, iface2 def edit_link( - self, - session_id: int, - node1_id: int, - node2_id: int, - options: core_pb2.LinkOptions, - iface1_id: int = None, - iface2_id: int = None, - source: str = None, - ) -> core_pb2.EditLinkResponse: + self, session_id: int, link: wrappers.Link, source: str = None + ) -> bool: """ Edit a link between nodes. :param session_id: session id - :param node1_id: node one id - :param node2_id: node two id - :param options: options for link (jitter, bandwidth, etc) - :param iface1_id: node one interface id - :param iface2_id: node two interface id + :param link: link to edit :param source: application source :return: response with result of success or failure :raises grpc.RpcError: when session or one of the nodes don't exist """ + iface1_id = link.iface1.id if link.iface1 else None + iface2_id = link.iface2.id if link.iface2 else None request = core_pb2.EditLinkRequest( session_id=session_id, - node1_id=node1_id, - node2_id=node2_id, - options=options, + node1_id=link.node1_id, + node2_id=link.node2_id, + options=link.options.to_proto(), iface1_id=iface1_id, iface2_id=iface2_id, source=source, ) - return self.stub.EditLink(request) + response = self.stub.EditLink(request) + return response.result def delete_link( - self, - session_id: int, - node1_id: int, - node2_id: int, - iface1_id: int = None, - iface2_id: int = None, - source: str = None, - ) -> core_pb2.DeleteLinkResponse: + self, session_id: int, link: wrappers.Link, source: str = None + ) -> bool: """ Delete a link between nodes. :param session_id: session id - :param node1_id: node one id - :param node2_id: node two id - :param iface1_id: node one interface id - :param iface2_id: node two interface id + :param link: link to delete :param source: application source :return: response with result of success or failure :raises grpc.RpcError: when session doesn't exist """ + iface1_id = link.iface1.id if link.iface1 else None + iface2_id = link.iface2.id if link.iface2 else None request = core_pb2.DeleteLinkRequest( session_id=session_id, - node1_id=node1_id, - node2_id=node2_id, + node1_id=link.node1_id, + node2_id=link.node2_id, iface1_id=iface1_id, iface2_id=iface2_id, source=source, ) - return self.stub.DeleteLink(request) + response = self.stub.DeleteLink(request) + return response.result - def get_mobility_configs(self, session_id: int) -> GetMobilityConfigsResponse: + def get_mobility_configs( + self, session_id: int + ) -> Dict[int, Dict[str, wrappers.ConfigOption]]: """ Get all mobility configurations. :param session_id: session id - :return: response with a dict of node ids to mobility configurations + :return: dict of node id to mobility configuration dict :raises grpc.RpcError: when session doesn't exist """ request = GetMobilityConfigsRequest(session_id=session_id) - return self.stub.GetMobilityConfigs(request) + response = self.stub.GetMobilityConfigs(request) + configs = {} + for node_id, mapped_config in response.configs.items(): + configs[node_id] = wrappers.ConfigOption.from_dict(mapped_config.config) + return configs def get_mobility_config( self, session_id: int, node_id: int - ) -> GetMobilityConfigResponse: + ) -> Dict[str, wrappers.ConfigOption]: """ Get mobility configuration for a node. :param session_id: session id :param node_id: node id - :return: response with a list of configuration groups + :return: dict of config name to options :raises grpc.RpcError: when session or node doesn't exist """ request = GetMobilityConfigRequest(session_id=session_id, node_id=node_id) - return self.stub.GetMobilityConfig(request) + response = self.stub.GetMobilityConfig(request) + return wrappers.ConfigOption.from_dict(response.config) def set_mobility_config( self, session_id: int, node_id: int, config: Dict[str, str] - ) -> SetMobilityConfigResponse: + ) -> bool: """ Set mobility configuration for a node. :param session_id: session id :param node_id: node id :param config: mobility configuration - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session or node doesn't exist """ mobility_config = MobilityConfig(node_id=node_id, config=config) request = SetMobilityConfigRequest( session_id=session_id, mobility_config=mobility_config ) - return self.stub.SetMobilityConfig(request) + response = self.stub.SetMobilityConfig(request) + return response.result def mobility_action( - self, session_id: int, node_id: int, action: ServiceAction - ) -> MobilityActionResponse: + self, session_id: int, node_id: int, action: wrappers.MobilityAction + ) -> bool: """ Send a mobility action for a node. :param session_id: session id :param node_id: node id :param action: action to take - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session or node doesn't exist """ request = MobilityActionRequest( - session_id=session_id, node_id=node_id, action=action + session_id=session_id, node_id=node_id, action=action.value ) - return self.stub.MobilityAction(request) + response = self.stub.MobilityAction(request) + return response.result - def get_services(self) -> GetServicesResponse: + def get_services(self) -> List[wrappers.Service]: """ Get all currently loaded services. - :return: response with a list of services + :return: list of services, name and groups only """ request = GetServicesRequest() - return self.stub.GetServices(request) + response = self.stub.GetServices(request) + services = [] + for service_proto in response.services: + service = wrappers.Service.from_proto(service_proto) + services.append(service) + return services - def get_service_defaults(self, session_id: int) -> GetServiceDefaultsResponse: + def get_service_defaults(self, session_id: int) -> List[wrappers.ServiceDefault]: """ Get default services for different default node models. :param session_id: session id - :return: response with a dict of node model to a list of services + :return: list of service defaults :raises grpc.RpcError: when session doesn't exist """ request = GetServiceDefaultsRequest(session_id=session_id) - return self.stub.GetServiceDefaults(request) + response = self.stub.GetServiceDefaults(request) + defaults = [] + for default_proto in response.defaults: + default = wrappers.ServiceDefault.from_proto(default_proto) + defaults.append(default) + return defaults def set_service_defaults( self, session_id: int, service_defaults: Dict[str, List[str]] - ) -> SetServiceDefaultsResponse: + ) -> bool: """ Set default services for node models. :param session_id: session id :param service_defaults: node models to lists of services - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ defaults = [] @@ -729,41 +887,48 @@ class CoreGrpcClient: default = ServiceDefaults(node_type=node_type, services=services) defaults.append(default) request = SetServiceDefaultsRequest(session_id=session_id, defaults=defaults) - return self.stub.SetServiceDefaults(request) + response = self.stub.SetServiceDefaults(request) + return response.result def get_node_service_configs( self, session_id: int - ) -> GetNodeServiceConfigsResponse: + ) -> List[wrappers.NodeServiceConfig]: """ Get service data for a node. :param session_id: session id - :return: response with all node service configs + :return: list of node service data :raises grpc.RpcError: when session doesn't exist """ request = GetNodeServiceConfigsRequest(session_id=session_id) - return self.stub.GetNodeServiceConfigs(request) + response = self.stub.GetNodeServiceConfigs(request) + node_services = [] + for config in response.configs: + node_service = wrappers.NodeServiceConfig.from_proto(config) + node_services.append(node_service) + return node_services def get_node_service( self, session_id: int, node_id: int, service: str - ) -> GetNodeServiceResponse: + ) -> wrappers.NodeServiceData: """ Get service data for a node. :param session_id: session id :param node_id: node id :param service: service name - :return: response with node service data + :return: node service data :raises grpc.RpcError: when session or node doesn't exist """ request = GetNodeServiceRequest( session_id=session_id, node_id=node_id, service=service ) - return self.stub.GetNodeService(request) + response = self.stub.GetNodeService(request) + return wrappers.NodeServiceData.from_proto(response.service) def get_node_service_file( self, session_id: int, node_id: int, service: str, file_name: str - ) -> GetNodeServiceFileResponse: + ) -> str: """ Get a service file for a node. @@ -771,74 +936,55 @@ class CoreGrpcClient: :param node_id: node id :param service: service name :param file_name: file name to get data for - :return: response with file data + :return: file data :raises grpc.RpcError: when session or node doesn't exist """ request = GetNodeServiceFileRequest( session_id=session_id, node_id=node_id, service=service, file=file_name ) - return self.stub.GetNodeServiceFile(request) + response = self.stub.GetNodeServiceFile(request) + return response.data def set_node_service( - self, - session_id: int, - node_id: int, - service: str, - files: List[str] = None, - directories: List[str] = None, - startup: List[str] = None, - validate: List[str] = None, - shutdown: List[str] = None, - ) -> SetNodeServiceResponse: + self, session_id: int, service_config: wrappers.ServiceConfig + ) -> bool: """ Set service data for a node. :param session_id: session id - :param node_id: node id - :param service: service name - :param files: service files - :param directories: service directories - :param startup: startup commands - :param validate: validation commands - :param shutdown: shutdown commands - :return: response with result of success or failure + :param service_config: service configuration for a node + :return: True for success, False otherwise :raises grpc.RpcError: when session or node doesn't exist """ - config = ServiceConfig( - node_id=node_id, - service=service, - files=files, - directories=directories, - startup=startup, - validate=validate, - shutdown=shutdown, + request = SetNodeServiceRequest( + session_id=session_id, config=service_config.to_proto() ) - request = SetNodeServiceRequest(session_id=session_id, config=config) - return self.stub.SetNodeService(request) + response = self.stub.SetNodeService(request) + return response.result def set_node_service_file( - self, session_id: int, node_id: int, service: str, file_name: str, data: str - ) -> SetNodeServiceFileResponse: + self, session_id: int, service_file_config: wrappers.ServiceFileConfig + ) -> bool: """ Set a service file for a node. :param session_id: session id - :param node_id: node id - :param service: service name - :param file_name: file name to save - :param data: data to save for file - :return: response with result of success or failure + :param service_file_config: configuration to set + :return: True for success, False otherwise :raises grpc.RpcError: when session or node doesn't exist """ - config = ServiceFileConfig( - node_id=node_id, service=service, file=file_name, data=data - ) + config = service_file_config.to_proto() request = SetNodeServiceFileRequest(session_id=session_id, config=config) - return self.stub.SetNodeServiceFile(request) + response = self.stub.SetNodeServiceFile(request) + return response.result def service_action( - self, session_id: int, node_id: int, service: str, action: ServiceAction - ) -> ServiceActionResponse: + self, + session_id: int, + node_id: int, + service: str, + action: wrappers.ServiceAction, + ) -> bool: """ Send an action to a service for a node. @@ -847,54 +993,65 @@ class CoreGrpcClient: :param service: service name :param action: action for service (start, stop, restart, validate) - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session or node doesn't exist """ request = ServiceActionRequest( - session_id=session_id, node_id=node_id, service=service, action=action + session_id=session_id, node_id=node_id, service=service, action=action.value ) - return self.stub.ServiceAction(request) + response = self.stub.ServiceAction(request) + return response.result - def get_wlan_configs(self, session_id: int) -> GetWlanConfigsResponse: + def get_wlan_configs( + self, session_id: int + ) -> Dict[int, Dict[str, wrappers.ConfigOption]]: """ Get all wlan configurations. :param session_id: session id - :return: response with a dict of node ids to wlan configurations + :return: dict of node ids to dict of names to options :raises grpc.RpcError: when session doesn't exist """ request = GetWlanConfigsRequest(session_id=session_id) - return self.stub.GetWlanConfigs(request) + response = self.stub.GetWlanConfigs(request) + configs = {} + for node_id, mapped_config in response.configs.items(): + configs[node_id] = wrappers.ConfigOption.from_dict(mapped_config.config) + return configs - def get_wlan_config(self, session_id: int, node_id: int) -> GetWlanConfigResponse: + def get_wlan_config( + self, session_id: int, node_id: int + ) -> Dict[str, wrappers.ConfigOption]: """ Get wlan configuration for a node. :param session_id: session id :param node_id: node id - :return: response with a list of configuration groups + :return: dict of names to options :raises grpc.RpcError: when session doesn't exist """ request = GetWlanConfigRequest(session_id=session_id, node_id=node_id) - return self.stub.GetWlanConfig(request) + response = self.stub.GetWlanConfig(request) + return wrappers.ConfigOption.from_dict(response.config) def set_wlan_config( self, session_id: int, node_id: int, config: Dict[str, str] - ) -> SetWlanConfigResponse: + ) -> bool: """ Set wlan configuration for a node. :param session_id: session id :param node_id: node id :param config: wlan configuration - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ wlan_config = WlanConfig(node_id=node_id, config=config) request = SetWlanConfigRequest(session_id=session_id, wlan_config=wlan_config) - return self.stub.SetWlanConfig(request) + response = self.stub.SetWlanConfig(request) + return response.result - def get_emane_config(self, session_id: int) -> GetEmaneConfigResponse: + def get_emane_config(self, session_id: int) -> Dict[str, wrappers.ConfigOption]: """ Get session emane configuration. @@ -903,36 +1060,37 @@ class CoreGrpcClient: :raises grpc.RpcError: when session doesn't exist """ request = GetEmaneConfigRequest(session_id=session_id) - return self.stub.GetEmaneConfig(request) + response = self.stub.GetEmaneConfig(request) + return wrappers.ConfigOption.from_dict(response.config) - def set_emane_config( - self, session_id: int, config: Dict[str, str] - ) -> SetEmaneConfigResponse: + def set_emane_config(self, session_id: int, config: Dict[str, str]) -> bool: """ Set session emane configuration. :param session_id: session id :param config: emane configuration - :return: response with result of success or failure + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ request = SetEmaneConfigRequest(session_id=session_id, config=config) - return self.stub.SetEmaneConfig(request) + response = self.stub.SetEmaneConfig(request) + return response.result - def get_emane_models(self, session_id: int) -> GetEmaneModelsResponse: + def get_emane_models(self, session_id: int) -> List[str]: """ Get session emane models. :param session_id: session id - :return: response with a list of emane models + :return: list of emane models :raises grpc.RpcError: when session doesn't exist """ request = GetEmaneModelsRequest(session_id=session_id) - return self.stub.GetEmaneModels(request) + response = self.stub.GetEmaneModels(request) + return list(response.models) def get_emane_model_config( self, session_id: int, node_id: int, model: str, iface_id: int = -1 - ) -> GetEmaneModelConfigResponse: + ) -> Dict[str, wrappers.ConfigOption]: """ Get emane model configuration for a node or a node's interface. @@ -940,53 +1098,51 @@ class CoreGrpcClient: :param node_id: node id :param model: emane model name :param iface_id: node interface id - :return: response with a list of configuration groups + :return: dict of names to options :raises grpc.RpcError: when session doesn't exist """ request = GetEmaneModelConfigRequest( session_id=session_id, node_id=node_id, model=model, iface_id=iface_id ) - return self.stub.GetEmaneModelConfig(request) + response = self.stub.GetEmaneModelConfig(request) + return wrappers.ConfigOption.from_dict(response.config) def set_emane_model_config( - self, - session_id: int, - node_id: int, - model: str, - config: Dict[str, str] = None, - iface_id: int = -1, - ) -> SetEmaneModelConfigResponse: + self, session_id: int, emane_model_config: wrappers.EmaneModelConfig + ) -> bool: """ Set emane model configuration for a node or a node's interface. :param session_id: session id - :param node_id: node id - :param model: emane model name - :param config: emane model configuration - :param iface_id: node interface id - :return: response with result of success or failure + :param emane_model_config: emane model config to set + :return: True for success, False otherwise :raises grpc.RpcError: when session doesn't exist """ - model_config = EmaneModelConfig( - node_id=node_id, model=model, config=config, iface_id=iface_id - ) request = SetEmaneModelConfigRequest( - session_id=session_id, emane_model_config=model_config + session_id=session_id, emane_model_config=emane_model_config.to_proto() ) - return self.stub.SetEmaneModelConfig(request) + response = self.stub.SetEmaneModelConfig(request) + return response.result - def get_emane_model_configs(self, session_id: int) -> GetEmaneModelConfigsResponse: + def get_emane_model_configs( + self, session_id: int + ) -> List[wrappers.EmaneModelConfig]: """ Get all EMANE model configurations for a session. :param session_id: session to get emane model configs - :return: response with a dictionary of node/interface ids to configurations + :return: list of emane model configs :raises grpc.RpcError: when session doesn't exist """ request = GetEmaneModelConfigsRequest(session_id=session_id) - return self.stub.GetEmaneModelConfigs(request) + response = self.stub.GetEmaneModelConfigs(request) + configs = [] + for config_proto in response.configs: + config = wrappers.EmaneModelConfig.from_proto(config_proto) + configs.append(config) + return configs - def save_xml(self, session_id: int, file_path: str) -> core_pb2.SaveXmlResponse: + def save_xml(self, session_id: int, file_path: str) -> None: """ Save the current scenario to an XML file. @@ -1000,22 +1156,21 @@ class CoreGrpcClient: with open(file_path, "w") as xml_file: xml_file.write(response.data) - def open_xml(self, file_path: str, start: bool = False) -> core_pb2.OpenXmlResponse: + def open_xml(self, file_path: Path, start: bool = False) -> Tuple[bool, int]: """ Load a local scenario XML file to open as a new session. :param file_path: path of scenario XML file - :param start: True to start session, False otherwise - :return: response with opened session id + :param start: tuple of result and session id when successful + :return: tuple of result and session id """ - with open(file_path, "r") as xml_file: - data = xml_file.read() - request = core_pb2.OpenXmlRequest(data=data, start=start, file=file_path) - return self.stub.OpenXml(request) + with file_path.open("r") as f: + data = f.read() + request = core_pb2.OpenXmlRequest(data=data, start=start, file=str(file_path)) + response = self.stub.OpenXml(request) + return response.result, response.session_id - def emane_link( - self, session_id: int, nem1: int, nem2: int, linked: bool - ) -> EmaneLinkResponse: + def emane_link(self, session_id: int, nem1: int, nem2: int, linked: bool) -> bool: """ Helps broadcast wireless link/unlink between EMANE nodes. @@ -1023,92 +1178,103 @@ class CoreGrpcClient: :param nem1: first nem for emane link :param nem2: second nem for emane link :param linked: True to link, False to unlink - :return: get emane link response + :return: True for success, False otherwise :raises grpc.RpcError: when session or nodes related to nems do not exist """ request = EmaneLinkRequest( session_id=session_id, nem1=nem1, nem2=nem2, linked=linked ) - return self.stub.EmaneLink(request) + response = self.stub.EmaneLink(request) + return response.result - def get_ifaces(self) -> core_pb2.GetInterfacesResponse: + def get_ifaces(self) -> List[str]: """ Retrieves a list of interfaces available on the host machine that are not a part of a CORE session. - :return: get interfaces response + :return: list of interfaces """ request = core_pb2.GetInterfacesRequest() - return self.stub.GetInterfaces(request) + response = self.stub.GetInterfaces(request) + return list(response.ifaces) - def get_config_services(self) -> GetConfigServicesResponse: + def get_config_services(self) -> List[wrappers.ConfigService]: """ Retrieve all known config services. - :return: get config services response + :return: list of config services """ request = GetConfigServicesRequest() - return self.stub.GetConfigServices(request) + response = self.stub.GetConfigServices(request) + services = [] + for service_proto in response.services: + service = wrappers.ConfigService.from_proto(service_proto) + services.append(service) + return services - def get_config_service_defaults( - self, name: str - ) -> GetConfigServiceDefaultsResponse: + def get_config_service_defaults(self, name: str) -> wrappers.ConfigServiceDefaults: """ Retrieves config service default values. :param name: name of service to get defaults for - :return: get config service defaults + :return: config service defaults """ request = GetConfigServiceDefaultsRequest(name=name) - return self.stub.GetConfigServiceDefaults(request) + response = self.stub.GetConfigServiceDefaults(request) + return wrappers.ConfigServiceDefaults.from_proto(response) def get_node_config_service_configs( self, session_id: int - ) -> GetNodeConfigServiceConfigsResponse: + ) -> List[wrappers.ConfigServiceConfig]: """ Retrieves all node config service configurations for a session. :param session_id: session to get config service configurations for - :return: get node config service configs response + :return: list of node config service configs :raises grpc.RpcError: when session doesn't exist """ request = GetNodeConfigServiceConfigsRequest(session_id=session_id) - return self.stub.GetNodeConfigServiceConfigs(request) + response = self.stub.GetNodeConfigServiceConfigs(request) + configs = [] + for config_proto in response.configs: + config = wrappers.ConfigServiceConfig.from_proto(config_proto) + configs.append(config) + return configs def get_node_config_service( self, session_id: int, node_id: int, name: str - ) -> GetNodeConfigServiceResponse: + ) -> Dict[str, str]: """ Retrieves information for a specific config service on a node. :param session_id: session node belongs to :param node_id: id of node to get service information from :param name: name of service - :return: get node config service response + :return: config dict of names to values :raises grpc.RpcError: when session or node doesn't exist """ request = GetNodeConfigServiceRequest( session_id=session_id, node_id=node_id, name=name ) - return self.stub.GetNodeConfigService(request) + response = self.stub.GetNodeConfigService(request) + return dict(response.config) - def get_node_config_services( - self, session_id: int, node_id: int - ) -> GetNodeConfigServicesResponse: + def get_node_config_services(self, session_id: int, node_id: int) -> List[str]: """ Retrieves the config services currently assigned to a node. :param session_id: session node belongs to :param node_id: id of node to get config services for - :return: get node config services response + :return: list of config services :raises grpc.RpcError: when session or node doesn't exist """ request = GetNodeConfigServicesRequest(session_id=session_id, node_id=node_id) - return self.stub.GetNodeConfigServices(request) + response = self.stub.GetNodeConfigServices(request) + return list(response.services) def set_node_config_service( self, session_id: int, node_id: int, name: str, config: Dict[str, str] - ) -> SetNodeConfigServiceResponse: + ) -> bool: """ Assigns a config service to a node with the provided configuration. @@ -1116,38 +1282,41 @@ class CoreGrpcClient: :param node_id: id of node to assign config service to :param name: name of service :param config: service configuration - :return: set node config service response + :return: True for success, False otherwise :raises grpc.RpcError: when session or node doesn't exist """ request = SetNodeConfigServiceRequest( session_id=session_id, node_id=node_id, name=name, config=config ) - return self.stub.SetNodeConfigService(request) + response = self.stub.SetNodeConfigService(request) + return response.result - def get_emane_event_channel(self, session_id: int) -> GetEmaneEventChannelResponse: + def get_emane_event_channel(self, session_id: int) -> wrappers.EmaneEventChannel: """ Retrieves the current emane event channel being used for a session. :param session_id: session to get emane event channel for - :return: emane event channel response + :return: emane event channel :raises grpc.RpcError: when session doesn't exist """ request = GetEmaneEventChannelRequest(session_id=session_id) - return self.stub.GetEmaneEventChannel(request) + response = self.stub.GetEmaneEventChannel(request) + return wrappers.EmaneEventChannel.from_proto(response) - def execute_script(self, script: str) -> ExecuteScriptResponse: + def execute_script(self, script: str) -> Optional[int]: """ Executes a python script given context of the current CoreEmu object. :param script: script to execute - :return: execute script response + :return: create session id for script executed """ request = ExecuteScriptRequest(script=script) - return self.stub.ExecuteScript(request) + response = self.stub.ExecuteScript(request) + return response.session_id if response.session_id else None def wlan_link( self, session_id: int, wlan_id: int, node1_id: int, node2_id: int, linked: bool - ) -> WlanLinkResponse: + ) -> bool: """ Links/unlinks nodes on the same WLAN. @@ -1156,7 +1325,7 @@ class CoreGrpcClient: :param node1_id: first node of pair to link/unlink :param node2_id: second node of pair to link/unlin :param linked: True to link, False to unlink - :return: wlan link response + :return: True for success, False otherwise :raises grpc.RpcError: when session or one of the nodes do not exist """ request = WlanLinkRequest( @@ -1166,20 +1335,19 @@ class CoreGrpcClient: node2_id=node2_id, linked=linked, ) - return self.stub.WlanLink(request) + response = self.stub.WlanLink(request) + return response.result - def emane_pathlosses( - self, pathloss_iterator: Iterable[EmanePathlossesRequest] - ) -> EmanePathlossesResponse: + def emane_pathlosses(self, streamer: EmanePathlossesStreamer) -> None: """ Stream EMANE pathloss events. - :param pathloss_iterator: iterator for sending emane pathloss events - :return: emane pathloss response + :param streamer: emane pathlosses streamer + :return: nothing :raises grpc.RpcError: when a pathloss event session or one of the nodes do not exist """ - return self.stub.EmanePathlosses(pathloss_iterator) + self.stub.EmanePathlosses(streamer.iter()) def connect(self) -> None: """ diff --git a/daemon/core/api/grpc/clientw.py b/daemon/core/api/grpc/clientw.py deleted file mode 100644 index 23a66f1f..00000000 --- a/daemon/core/api/grpc/clientw.py +++ /dev/null @@ -1,1385 +0,0 @@ -""" -gRpc client for interfacing with CORE. -""" - -import logging -import threading -from contextlib import contextmanager -from pathlib import Path -from queue import Queue -from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple - -import grpc - -from core.api.grpc import ( - configservices_pb2, - core_pb2, - core_pb2_grpc, - emane_pb2, - mobility_pb2, - services_pb2, - wlan_pb2, - wrappers, -) -from core.api.grpc.configservices_pb2 import ( - GetConfigServiceDefaultsRequest, - GetConfigServicesRequest, - GetNodeConfigServiceConfigsRequest, - GetNodeConfigServiceRequest, - GetNodeConfigServicesRequest, - SetNodeConfigServiceRequest, -) -from core.api.grpc.core_pb2 import ExecuteScriptRequest -from core.api.grpc.emane_pb2 import ( - EmaneLinkRequest, - GetEmaneConfigRequest, - GetEmaneEventChannelRequest, - GetEmaneModelConfigRequest, - GetEmaneModelConfigsRequest, - GetEmaneModelsRequest, - SetEmaneConfigRequest, - SetEmaneModelConfigRequest, -) -from core.api.grpc.mobility_pb2 import ( - GetMobilityConfigRequest, - GetMobilityConfigsRequest, - MobilityActionRequest, - MobilityConfig, - SetMobilityConfigRequest, -) -from core.api.grpc.services_pb2 import ( - GetNodeServiceConfigsRequest, - GetNodeServiceFileRequest, - GetNodeServiceRequest, - GetServiceDefaultsRequest, - GetServicesRequest, - ServiceActionRequest, - ServiceDefaults, - SetNodeServiceFileRequest, - SetNodeServiceRequest, - SetServiceDefaultsRequest, -) -from core.api.grpc.wlan_pb2 import ( - GetWlanConfigRequest, - GetWlanConfigsRequest, - SetWlanConfigRequest, - WlanConfig, - WlanLinkRequest, -) -from core.emulator.data import IpPrefixes -from core.errors import CoreError - -logger = logging.getLogger(__name__) - - -class MoveNodesStreamer: - def __init__(self, session_id: int = None, source: str = None) -> None: - self.session_id = session_id - self.source = source - self.queue: Queue = Queue() - - def send_position(self, node_id: int, x: float, y: float) -> None: - position = wrappers.Position(x=x, y=y) - request = wrappers.MoveNodesRequest( - session_id=self.session_id, - node_id=node_id, - source=self.source, - position=position, - ) - self.send(request) - - def send_geo(self, node_id: int, lon: float, lat: float, alt: float) -> None: - geo = wrappers.Geo(lon=lon, lat=lat, alt=alt) - request = wrappers.MoveNodesRequest( - session_id=self.session_id, node_id=node_id, source=self.source, geo=geo - ) - self.send(request) - - def send(self, request: wrappers.MoveNodesRequest) -> None: - self.queue.put(request) - - def stop(self) -> None: - self.queue.put(None) - - def next(self) -> Optional[core_pb2.MoveNodesRequest]: - request: Optional[wrappers.MoveNodesRequest] = self.queue.get() - if request: - return request.to_proto() - else: - return request - - def iter(self) -> Iterable: - return iter(self.next, None) - - -class EmanePathlossesStreamer: - def __init__(self) -> None: - self.queue: Queue = Queue() - - def send(self, request: Optional[wrappers.EmanePathlossesRequest]) -> None: - self.queue.put(request) - - def next(self) -> Optional[emane_pb2.EmanePathlossesRequest]: - request: Optional[wrappers.EmanePathlossesRequest] = self.queue.get() - if request: - return request.to_proto() - else: - return request - - def iter(self): - return iter(self.next, None) - - -class InterfaceHelper: - """ - Convenience class to help generate IP4 and IP6 addresses for gRPC clients. - """ - - def __init__(self, ip4_prefix: str = None, ip6_prefix: str = None) -> None: - """ - Creates an InterfaceHelper object. - - :param ip4_prefix: ip4 prefix to use for generation - :param ip6_prefix: ip6 prefix to use for generation - :raises ValueError: when both ip4 and ip6 prefixes have not been provided - """ - self.prefixes: IpPrefixes = IpPrefixes(ip4_prefix, ip6_prefix) - - def create_iface( - self, node_id: int, iface_id: int, name: str = None, mac: str = None - ) -> wrappers.Interface: - """ - Create an interface protobuf object. - - :param node_id: node id to create interface for - :param iface_id: interface id - :param name: name of interface - :param mac: mac address for interface - :return: interface protobuf - """ - iface_data = self.prefixes.gen_iface(node_id, name, mac) - return wrappers.Interface( - id=iface_id, - name=iface_data.name, - ip4=iface_data.ip4, - ip4_mask=iface_data.ip4_mask, - ip6=iface_data.ip6, - ip6_mask=iface_data.ip6_mask, - mac=iface_data.mac, - ) - - -def throughput_listener( - stream: Any, handler: Callable[[wrappers.ThroughputsEvent], None] -) -> None: - """ - Listen for throughput events and provide them to the handler. - - :param stream: grpc stream that will provide events - :param handler: function that handles an event - :return: nothing - """ - try: - for event_proto in stream: - event = wrappers.ThroughputsEvent.from_proto(event_proto) - handler(event) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.CANCELLED: - logger.debug("throughput stream closed") - else: - logger.exception("throughput stream error") - - -def cpu_listener( - stream: Any, handler: Callable[[wrappers.CpuUsageEvent], None] -) -> None: - """ - Listen for cpu events and provide them to the handler. - - :param stream: grpc stream that will provide events - :param handler: function that handles an event - :return: nothing - """ - try: - for event_proto in stream: - event = wrappers.CpuUsageEvent.from_proto(event_proto) - handler(event) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.CANCELLED: - logger.debug("cpu stream closed") - else: - logger.exception("cpu stream error") - - -def event_listener(stream: Any, handler: Callable[[wrappers.Event], None]) -> None: - """ - Listen for session events and provide them to the handler. - - :param stream: grpc stream that will provide events - :param handler: function that handles an event - :return: nothing - """ - try: - for event_proto in stream: - event = wrappers.Event.from_proto(event_proto) - handler(event) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.CANCELLED: - logger.debug("session stream closed") - else: - logger.exception("session stream error") - - -class CoreGrpcClient: - """ - Provides convenience methods for interfacing with the CORE grpc server. - """ - - def __init__(self, address: str = "localhost:50051", proxy: bool = False) -> None: - """ - Creates a CoreGrpcClient instance. - - :param address: grpc server address to connect to - """ - self.address: str = address - self.stub: Optional[core_pb2_grpc.CoreApiStub] = None - self.channel: Optional[grpc.Channel] = None - self.proxy: bool = proxy - - def start_session( - self, - session: wrappers.Session, - asymmetric_links: List[wrappers.Link] = None, - definition: bool = False, - ) -> Tuple[bool, List[str]]: - """ - Start a session. - - :param session: session to start - :param asymmetric_links: link configuration for asymmetric links - :param definition: True to only define session data, False to start session - :return: tuple of result and exception strings - """ - nodes = [x.to_proto() for x in session.nodes.values()] - links = [x.to_proto() for x in session.links] - if asymmetric_links: - asymmetric_links = [x.to_proto() for x in asymmetric_links] - hooks = [x.to_proto() for x in session.hooks.values()] - emane_config = {k: v.value for k, v in session.emane_config.items()} - emane_model_configs = [] - mobility_configs = [] - wlan_configs = [] - service_configs = [] - service_file_configs = [] - config_service_configs = [] - for node in session.nodes.values(): - for key, config in node.emane_model_configs.items(): - model, iface_id = key - config = wrappers.ConfigOption.to_dict(config) - if iface_id is None: - iface_id = -1 - emane_model_config = emane_pb2.EmaneModelConfig( - node_id=node.id, iface_id=iface_id, model=model, config=config - ) - emane_model_configs.append(emane_model_config) - if node.wlan_config: - config = wrappers.ConfigOption.to_dict(node.wlan_config) - wlan_config = wlan_pb2.WlanConfig(node_id=node.id, config=config) - wlan_configs.append(wlan_config) - if node.mobility_config: - config = wrappers.ConfigOption.to_dict(node.mobility_config) - mobility_config = mobility_pb2.MobilityConfig( - node_id=node.id, config=config - ) - mobility_configs.append(mobility_config) - for name, config in node.service_configs.items(): - service_config = services_pb2.ServiceConfig( - node_id=node.id, - service=name, - directories=config.dirs, - files=config.configs, - startup=config.startup, - validate=config.validate, - shutdown=config.shutdown, - ) - service_configs.append(service_config) - for service, file_configs in node.service_file_configs.items(): - for file, data in file_configs.items(): - service_file_config = services_pb2.ServiceFileConfig( - node_id=node.id, service=service, file=file, data=data - ) - service_file_configs.append(service_file_config) - for name, service_config in node.config_service_configs.items(): - config_service_config = configservices_pb2.ConfigServiceConfig( - node_id=node.id, - name=name, - templates=service_config.templates, - config=service_config.config, - ) - config_service_configs.append(config_service_config) - options = {k: v.value for k, v in session.options.items()} - request = core_pb2.StartSessionRequest( - session_id=session.id, - nodes=nodes, - links=links, - location=session.location.to_proto(), - hooks=hooks, - emane_config=emane_config, - emane_model_configs=emane_model_configs, - wlan_configs=wlan_configs, - mobility_configs=mobility_configs, - service_configs=service_configs, - service_file_configs=service_file_configs, - asymmetric_links=asymmetric_links, - config_service_configs=config_service_configs, - options=options, - user=session.user, - definition=definition, - metadata=session.metadata, - ) - response = self.stub.StartSession(request) - return response.result, list(response.exceptions) - - def stop_session(self, session_id: int) -> bool: - """ - Stop a running session. - - :param session_id: id of session - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.StopSessionRequest(session_id=session_id) - response = self.stub.StopSession(request) - return response.result - - def add_session(self, session_id: int = None) -> wrappers.Session: - session_id = self.create_session(session_id) - return self.get_session(session_id) - - def create_session(self, session_id: int = None) -> int: - """ - Create a session. - - :param session_id: id for session, default is None and one will be created - for you - :return: session id - """ - request = core_pb2.CreateSessionRequest(session_id=session_id) - response = self.stub.CreateSession(request) - return response.session_id - - def delete_session(self, session_id: int) -> bool: - """ - Delete a session. - - :param session_id: id of session - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.DeleteSessionRequest(session_id=session_id) - response = self.stub.DeleteSession(request) - return response.result - - def get_sessions(self) -> List[wrappers.SessionSummary]: - """ - Retrieves all currently known sessions. - - :return: response with a list of currently known session, their state and - number of nodes - """ - response = self.stub.GetSessions(core_pb2.GetSessionsRequest()) - sessions = [] - for session_proto in response.sessions: - session = wrappers.SessionSummary.from_proto(session_proto) - sessions.append(session) - return sessions - - def check_session(self, session_id: int) -> bool: - """ - Check if a session exists. - - :param session_id: id of session to check for - :return: True if exists, False otherwise - """ - request = core_pb2.CheckSessionRequest(session_id=session_id) - response = self.stub.CheckSession(request) - return response.result - - def get_session(self, session_id: int) -> wrappers.Session: - """ - Retrieve a session. - - :param session_id: id of session - :return: session - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.GetSessionRequest(session_id=session_id) - response = self.stub.GetSession(request) - return wrappers.Session.from_proto(response.session) - - def set_session_state(self, session_id: int, state: wrappers.SessionState) -> bool: - """ - Set session state. - - :param session_id: id of session - :param state: session state to transition to - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.SetSessionStateRequest( - session_id=session_id, state=state.value - ) - response = self.stub.SetSessionState(request) - return response.result - - def add_session_server(self, session_id: int, name: str, host: str) -> bool: - """ - Add distributed session server. - - :param session_id: id of session - :param name: name of server to add - :param host: host address to connect to - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.AddSessionServerRequest( - session_id=session_id, name=name, host=host - ) - response = self.stub.AddSessionServer(request) - return response.result - - def alert( - self, - session_id: int, - level: wrappers.ExceptionLevel, - source: str, - text: str, - node_id: int = None, - ) -> bool: - """ - Initiate an alert to be broadcast out to all listeners. - - :param session_id: id of session - :param level: alert level - :param source: source of alert - :param text: alert text - :param node_id: node associated with alert - :return: True for success, False otherwise - """ - request = core_pb2.SessionAlertRequest( - session_id=session_id, - level=level.value, - source=source, - text=text, - node_id=node_id, - ) - response = self.stub.SessionAlert(request) - return response.result - - def events( - self, - session_id: int, - handler: Callable[[wrappers.Event], None], - events: List[wrappers.EventType] = None, - ) -> grpc.Future: - """ - Listen for session events. - - :param session_id: id of session - :param handler: handler for received events - :param events: events to listen to, defaults to all - :return: stream processing events, can be used to cancel stream - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.EventsRequest(session_id=session_id, events=events) - stream = self.stub.Events(request) - thread = threading.Thread( - target=event_listener, args=(stream, handler), daemon=True - ) - thread.start() - return stream - - def throughputs( - self, session_id: int, handler: Callable[[wrappers.ThroughputsEvent], None] - ) -> grpc.Future: - """ - Listen for throughput events with information for interfaces and bridges. - - :param session_id: session id - :param handler: handler for every event - :return: stream processing events, can be used to cancel stream - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.ThroughputsRequest(session_id=session_id) - stream = self.stub.Throughputs(request) - thread = threading.Thread( - target=throughput_listener, args=(stream, handler), daemon=True - ) - thread.start() - return stream - - def cpu_usage( - self, delay: int, handler: Callable[[wrappers.CpuUsageEvent], None] - ) -> grpc.Future: - """ - Listen for cpu usage events with the given repeat delay. - - :param delay: delay between receiving events - :param handler: handler for every event - :return: stream processing events, can be used to cancel stream - """ - request = core_pb2.CpuUsageRequest(delay=delay) - stream = self.stub.CpuUsage(request) - thread = threading.Thread( - target=cpu_listener, args=(stream, handler), daemon=True - ) - thread.start() - return stream - - def add_node(self, session_id: int, node: wrappers.Node, source: str = None) -> int: - """ - Add node to session. - - :param session_id: session id - :param node: node to add - :param source: source application - :return: id of added node - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.AddNodeRequest( - session_id=session_id, node=node.to_proto(), source=source - ) - response = self.stub.AddNode(request) - return response.node_id - - def get_node( - self, session_id: int, node_id: int - ) -> Tuple[wrappers.Node, List[wrappers.Interface], List[wrappers.Link]]: - """ - Get node details. - - :param session_id: session id - :param node_id: node id - :return: tuple of node and its interfaces - :raises grpc.RpcError: when session or node doesn't exist - """ - request = core_pb2.GetNodeRequest(session_id=session_id, node_id=node_id) - response = self.stub.GetNode(request) - node = wrappers.Node.from_proto(response.node) - ifaces = [] - for iface_proto in response.ifaces: - iface = wrappers.Interface.from_proto(iface_proto) - ifaces.append(iface) - links = [] - for link_proto in response.links: - link = wrappers.Link.from_proto(link_proto) - links.append(link) - return node, ifaces, links - - def edit_node( - self, - session_id: int, - node_id: int, - position: wrappers.Position = None, - icon: str = None, - geo: wrappers.Geo = None, - source: str = None, - ) -> bool: - """ - Edit a node's icon and/or location, can only use position(x,y) or - geo(lon, lat, alt), not both. - - :param session_id: session id - :param node_id: node id - :param position: x,y location for node - :param icon: path to icon for gui to use for node - :param geo: lon,lat,alt location for node - :param source: application source - :return: True for success, False otherwise - :raises grpc.RpcError: when session or node doesn't exist - """ - if position and geo: - raise CoreError("cannot edit position and geo at same time") - position_proto = position.to_proto() if position else None - geo_proto = geo.to_proto() if geo else None - request = core_pb2.EditNodeRequest( - session_id=session_id, - node_id=node_id, - position=position_proto, - icon=icon, - source=source, - geo=geo_proto, - ) - response = self.stub.EditNode(request) - return response.result - - def move_nodes(self, streamer: MoveNodesStreamer) -> None: - """ - Stream node movements using the provided iterator. - - :param streamer: move nodes streamer - :return: nothing - :raises grpc.RpcError: when session or nodes do not exist - """ - self.stub.MoveNodes(streamer.iter()) - - def delete_node(self, session_id: int, node_id: int, source: str = None) -> bool: - """ - Delete node from session. - - :param session_id: session id - :param node_id: node id - :param source: application source - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.DeleteNodeRequest( - session_id=session_id, node_id=node_id, source=source - ) - response = self.stub.DeleteNode(request) - return response.result - - def node_command( - self, - session_id: int, - node_id: int, - command: str, - wait: bool = True, - shell: bool = False, - ) -> Tuple[int, str]: - """ - Send command to a node and get the output. - - :param session_id: session id - :param node_id: node id - :param command: command to run on node - :param wait: wait for command to complete - :param shell: send shell command - :return: returns tuple of return code and output - :raises grpc.RpcError: when session or node doesn't exist - """ - request = core_pb2.NodeCommandRequest( - session_id=session_id, - node_id=node_id, - command=command, - wait=wait, - shell=shell, - ) - response = self.stub.NodeCommand(request) - return response.return_code, response.output - - def get_node_terminal(self, session_id: int, node_id: int) -> str: - """ - Retrieve terminal command string for launching a local terminal. - - :param session_id: session id - :param node_id: node id - :return: node terminal - :raises grpc.RpcError: when session or node doesn't exist - """ - request = core_pb2.GetNodeTerminalRequest( - session_id=session_id, node_id=node_id - ) - response = self.stub.GetNodeTerminal(request) - return response.terminal - - def get_node_links(self, session_id: int, node_id: int) -> List[wrappers.Link]: - """ - Get current links for a node. - - :param session_id: session id - :param node_id: node id - :return: list of links - :raises grpc.RpcError: when session or node doesn't exist - """ - request = core_pb2.GetNodeLinksRequest(session_id=session_id, node_id=node_id) - response = self.stub.GetNodeLinks(request) - links = [] - for link_proto in response.links: - link = wrappers.Link.from_proto(link_proto) - links.append(link) - return links - - def add_link( - self, session_id: int, link: wrappers.Link, source: str = None - ) -> Tuple[bool, wrappers.Interface, wrappers.Interface]: - """ - Add a link between nodes. - - :param session_id: session id - :param link: link to add - :param source: application source - :return: tuple of result and finalized interface values - :raises grpc.RpcError: when session or one of the nodes don't exist - """ - request = core_pb2.AddLinkRequest( - session_id=session_id, link=link.to_proto(), source=source - ) - response = self.stub.AddLink(request) - iface1 = wrappers.Interface.from_proto(response.iface1) - iface2 = wrappers.Interface.from_proto(response.iface2) - return response.result, iface1, iface2 - - def edit_link( - self, session_id: int, link: wrappers.Link, source: str = None - ) -> bool: - """ - Edit a link between nodes. - - :param session_id: session id - :param link: link to edit - :param source: application source - :return: response with result of success or failure - :raises grpc.RpcError: when session or one of the nodes don't exist - """ - iface1_id = link.iface1.id if link.iface1 else None - iface2_id = link.iface2.id if link.iface2 else None - request = core_pb2.EditLinkRequest( - session_id=session_id, - node1_id=link.node1_id, - node2_id=link.node2_id, - options=link.options.to_proto(), - iface1_id=iface1_id, - iface2_id=iface2_id, - source=source, - ) - response = self.stub.EditLink(request) - return response.result - - def delete_link( - self, session_id: int, link: wrappers.Link, source: str = None - ) -> bool: - """ - Delete a link between nodes. - - :param session_id: session id - :param link: link to delete - :param source: application source - :return: response with result of success or failure - :raises grpc.RpcError: when session doesn't exist - """ - iface1_id = link.iface1.id if link.iface1 else None - iface2_id = link.iface2.id if link.iface2 else None - request = core_pb2.DeleteLinkRequest( - session_id=session_id, - node1_id=link.node1_id, - node2_id=link.node2_id, - iface1_id=iface1_id, - iface2_id=iface2_id, - source=source, - ) - response = self.stub.DeleteLink(request) - return response.result - - def get_mobility_configs( - self, session_id: int - ) -> Dict[int, Dict[str, wrappers.ConfigOption]]: - """ - Get all mobility configurations. - - :param session_id: session id - :return: dict of node id to mobility configuration dict - :raises grpc.RpcError: when session doesn't exist - """ - request = GetMobilityConfigsRequest(session_id=session_id) - response = self.stub.GetMobilityConfigs(request) - configs = {} - for node_id, mapped_config in response.configs.items(): - configs[node_id] = wrappers.ConfigOption.from_dict(mapped_config.config) - return configs - - def get_mobility_config( - self, session_id: int, node_id: int - ) -> Dict[str, wrappers.ConfigOption]: - """ - Get mobility configuration for a node. - - :param session_id: session id - :param node_id: node id - :return: dict of config name to options - :raises grpc.RpcError: when session or node doesn't exist - """ - request = GetMobilityConfigRequest(session_id=session_id, node_id=node_id) - response = self.stub.GetMobilityConfig(request) - return wrappers.ConfigOption.from_dict(response.config) - - def set_mobility_config( - self, session_id: int, node_id: int, config: Dict[str, str] - ) -> bool: - """ - Set mobility configuration for a node. - - :param session_id: session id - :param node_id: node id - :param config: mobility configuration - :return: True for success, False otherwise - :raises grpc.RpcError: when session or node doesn't exist - """ - mobility_config = MobilityConfig(node_id=node_id, config=config) - request = SetMobilityConfigRequest( - session_id=session_id, mobility_config=mobility_config - ) - response = self.stub.SetMobilityConfig(request) - return response.result - - def mobility_action( - self, session_id: int, node_id: int, action: wrappers.MobilityAction - ) -> bool: - """ - Send a mobility action for a node. - - :param session_id: session id - :param node_id: node id - :param action: action to take - :return: True for success, False otherwise - :raises grpc.RpcError: when session or node doesn't exist - """ - request = MobilityActionRequest( - session_id=session_id, node_id=node_id, action=action.value - ) - response = self.stub.MobilityAction(request) - return response.result - - def get_services(self) -> List[wrappers.Service]: - """ - Get all currently loaded services. - - :return: list of services, name and groups only - """ - request = GetServicesRequest() - response = self.stub.GetServices(request) - services = [] - for service_proto in response.services: - service = wrappers.Service.from_proto(service_proto) - services.append(service) - return services - - def get_service_defaults(self, session_id: int) -> List[wrappers.ServiceDefault]: - """ - Get default services for different default node models. - - :param session_id: session id - :return: list of service defaults - :raises grpc.RpcError: when session doesn't exist - """ - request = GetServiceDefaultsRequest(session_id=session_id) - response = self.stub.GetServiceDefaults(request) - defaults = [] - for default_proto in response.defaults: - default = wrappers.ServiceDefault.from_proto(default_proto) - defaults.append(default) - return defaults - - def set_service_defaults( - self, session_id: int, service_defaults: Dict[str, List[str]] - ) -> bool: - """ - Set default services for node models. - - :param session_id: session id - :param service_defaults: node models to lists of services - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - defaults = [] - for node_type in service_defaults: - services = service_defaults[node_type] - default = ServiceDefaults(node_type=node_type, services=services) - defaults.append(default) - request = SetServiceDefaultsRequest(session_id=session_id, defaults=defaults) - response = self.stub.SetServiceDefaults(request) - return response.result - - def get_node_service_configs( - self, session_id: int - ) -> List[wrappers.NodeServiceConfig]: - """ - Get service data for a node. - - :param session_id: session id - :return: list of node service data - :raises grpc.RpcError: when session doesn't exist - """ - request = GetNodeServiceConfigsRequest(session_id=session_id) - response = self.stub.GetNodeServiceConfigs(request) - node_services = [] - for config in response.configs: - node_service = wrappers.NodeServiceConfig.from_proto(config) - node_services.append(node_service) - return node_services - - def get_node_service( - self, session_id: int, node_id: int, service: str - ) -> wrappers.NodeServiceData: - """ - Get service data for a node. - - :param session_id: session id - :param node_id: node id - :param service: service name - :return: node service data - :raises grpc.RpcError: when session or node doesn't exist - """ - request = GetNodeServiceRequest( - session_id=session_id, node_id=node_id, service=service - ) - response = self.stub.GetNodeService(request) - return wrappers.NodeServiceData.from_proto(response.service) - - def get_node_service_file( - self, session_id: int, node_id: int, service: str, file_name: str - ) -> str: - """ - Get a service file for a node. - - :param session_id: session id - :param node_id: node id - :param service: service name - :param file_name: file name to get data for - :return: file data - :raises grpc.RpcError: when session or node doesn't exist - """ - request = GetNodeServiceFileRequest( - session_id=session_id, node_id=node_id, service=service, file=file_name - ) - response = self.stub.GetNodeServiceFile(request) - return response.data - - def set_node_service( - self, session_id: int, service_config: wrappers.ServiceConfig - ) -> bool: - """ - Set service data for a node. - - :param session_id: session id - :param service_config: service configuration for a node - :return: True for success, False otherwise - :raises grpc.RpcError: when session or node doesn't exist - """ - request = SetNodeServiceRequest( - session_id=session_id, config=service_config.to_proto() - ) - response = self.stub.SetNodeService(request) - return response.result - - def set_node_service_file( - self, session_id: int, service_file_config: wrappers.ServiceFileConfig - ) -> bool: - """ - Set a service file for a node. - - :param session_id: session id - :param service_file_config: configuration to set - :return: True for success, False otherwise - :raises grpc.RpcError: when session or node doesn't exist - """ - config = service_file_config.to_proto() - request = SetNodeServiceFileRequest(session_id=session_id, config=config) - response = self.stub.SetNodeServiceFile(request) - return response.result - - def service_action( - self, - session_id: int, - node_id: int, - service: str, - action: wrappers.ServiceAction, - ) -> bool: - """ - Send an action to a service for a node. - - :param session_id: session id - :param node_id: node id - :param service: service name - :param action: action for service (start, stop, restart, - validate) - :return: True for success, False otherwise - :raises grpc.RpcError: when session or node doesn't exist - """ - request = ServiceActionRequest( - session_id=session_id, node_id=node_id, service=service, action=action.value - ) - response = self.stub.ServiceAction(request) - return response.result - - def get_wlan_configs( - self, session_id: int - ) -> Dict[int, Dict[str, wrappers.ConfigOption]]: - """ - Get all wlan configurations. - - :param session_id: session id - :return: dict of node ids to dict of names to options - :raises grpc.RpcError: when session doesn't exist - """ - request = GetWlanConfigsRequest(session_id=session_id) - response = self.stub.GetWlanConfigs(request) - configs = {} - for node_id, mapped_config in response.configs.items(): - configs[node_id] = wrappers.ConfigOption.from_dict(mapped_config.config) - return configs - - def get_wlan_config( - self, session_id: int, node_id: int - ) -> Dict[str, wrappers.ConfigOption]: - """ - Get wlan configuration for a node. - - :param session_id: session id - :param node_id: node id - :return: dict of names to options - :raises grpc.RpcError: when session doesn't exist - """ - request = GetWlanConfigRequest(session_id=session_id, node_id=node_id) - response = self.stub.GetWlanConfig(request) - return wrappers.ConfigOption.from_dict(response.config) - - def set_wlan_config( - self, session_id: int, node_id: int, config: Dict[str, str] - ) -> bool: - """ - Set wlan configuration for a node. - - :param session_id: session id - :param node_id: node id - :param config: wlan configuration - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - wlan_config = WlanConfig(node_id=node_id, config=config) - request = SetWlanConfigRequest(session_id=session_id, wlan_config=wlan_config) - response = self.stub.SetWlanConfig(request) - return response.result - - def get_emane_config(self, session_id: int) -> Dict[str, wrappers.ConfigOption]: - """ - Get session emane configuration. - - :param session_id: session id - :return: response with a list of configuration groups - :raises grpc.RpcError: when session doesn't exist - """ - request = GetEmaneConfigRequest(session_id=session_id) - response = self.stub.GetEmaneConfig(request) - return wrappers.ConfigOption.from_dict(response.config) - - def set_emane_config(self, session_id: int, config: Dict[str, str]) -> bool: - """ - Set session emane configuration. - - :param session_id: session id - :param config: emane configuration - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - request = SetEmaneConfigRequest(session_id=session_id, config=config) - response = self.stub.SetEmaneConfig(request) - return response.result - - def get_emane_models(self, session_id: int) -> List[str]: - """ - Get session emane models. - - :param session_id: session id - :return: list of emane models - :raises grpc.RpcError: when session doesn't exist - """ - request = GetEmaneModelsRequest(session_id=session_id) - response = self.stub.GetEmaneModels(request) - return list(response.models) - - def get_emane_model_config( - self, session_id: int, node_id: int, model: str, iface_id: int = -1 - ) -> Dict[str, wrappers.ConfigOption]: - """ - Get emane model configuration for a node or a node's interface. - - :param session_id: session id - :param node_id: node id - :param model: emane model name - :param iface_id: node interface id - :return: dict of names to options - :raises grpc.RpcError: when session doesn't exist - """ - request = GetEmaneModelConfigRequest( - session_id=session_id, node_id=node_id, model=model, iface_id=iface_id - ) - response = self.stub.GetEmaneModelConfig(request) - return wrappers.ConfigOption.from_dict(response.config) - - def set_emane_model_config( - self, session_id: int, emane_model_config: wrappers.EmaneModelConfig - ) -> bool: - """ - Set emane model configuration for a node or a node's interface. - - :param session_id: session id - :param emane_model_config: emane model config to set - :return: True for success, False otherwise - :raises grpc.RpcError: when session doesn't exist - """ - request = SetEmaneModelConfigRequest( - session_id=session_id, emane_model_config=emane_model_config.to_proto() - ) - response = self.stub.SetEmaneModelConfig(request) - return response.result - - def get_emane_model_configs( - self, session_id: int - ) -> List[wrappers.EmaneModelConfig]: - """ - Get all EMANE model configurations for a session. - - :param session_id: session to get emane model configs - :return: list of emane model configs - :raises grpc.RpcError: when session doesn't exist - """ - request = GetEmaneModelConfigsRequest(session_id=session_id) - response = self.stub.GetEmaneModelConfigs(request) - configs = [] - for config_proto in response.configs: - config = wrappers.EmaneModelConfig.from_proto(config_proto) - configs.append(config) - return configs - - def save_xml(self, session_id: int, file_path: str) -> None: - """ - Save the current scenario to an XML file. - - :param session_id: session to save xml file for - :param file_path: local path to save scenario XML file to - :return: nothing - :raises grpc.RpcError: when session doesn't exist - """ - request = core_pb2.SaveXmlRequest(session_id=session_id) - response = self.stub.SaveXml(request) - with open(file_path, "w") as xml_file: - xml_file.write(response.data) - - def open_xml(self, file_path: Path, start: bool = False) -> Tuple[bool, int]: - """ - Load a local scenario XML file to open as a new session. - - :param file_path: path of scenario XML file - :param start: tuple of result and session id when successful - :return: tuple of result and session id - """ - with file_path.open("r") as f: - data = f.read() - request = core_pb2.OpenXmlRequest(data=data, start=start, file=str(file_path)) - response = self.stub.OpenXml(request) - return response.result, response.session_id - - def emane_link(self, session_id: int, nem1: int, nem2: int, linked: bool) -> bool: - """ - Helps broadcast wireless link/unlink between EMANE nodes. - - :param session_id: session to emane link - :param nem1: first nem for emane link - :param nem2: second nem for emane link - :param linked: True to link, False to unlink - :return: True for success, False otherwise - :raises grpc.RpcError: when session or nodes related to nems do not exist - """ - request = EmaneLinkRequest( - session_id=session_id, nem1=nem1, nem2=nem2, linked=linked - ) - response = self.stub.EmaneLink(request) - return response.result - - def get_ifaces(self) -> List[str]: - """ - Retrieves a list of interfaces available on the host machine that are not - a part of a CORE session. - - :return: list of interfaces - """ - request = core_pb2.GetInterfacesRequest() - response = self.stub.GetInterfaces(request) - return list(response.ifaces) - - def get_config_services(self) -> List[wrappers.ConfigService]: - """ - Retrieve all known config services. - - :return: list of config services - """ - request = GetConfigServicesRequest() - response = self.stub.GetConfigServices(request) - services = [] - for service_proto in response.services: - service = wrappers.ConfigService.from_proto(service_proto) - services.append(service) - return services - - def get_config_service_defaults(self, name: str) -> wrappers.ConfigServiceDefaults: - """ - Retrieves config service default values. - - :param name: name of service to get defaults for - :return: config service defaults - """ - request = GetConfigServiceDefaultsRequest(name=name) - response = self.stub.GetConfigServiceDefaults(request) - return wrappers.ConfigServiceDefaults.from_proto(response) - - def get_node_config_service_configs( - self, session_id: int - ) -> List[wrappers.ConfigServiceConfig]: - """ - Retrieves all node config service configurations for a session. - - :param session_id: session to get config service configurations for - :return: list of node config service configs - :raises grpc.RpcError: when session doesn't exist - """ - request = GetNodeConfigServiceConfigsRequest(session_id=session_id) - response = self.stub.GetNodeConfigServiceConfigs(request) - configs = [] - for config_proto in response.configs: - config = wrappers.ConfigServiceConfig.from_proto(config_proto) - configs.append(config) - return configs - - def get_node_config_service( - self, session_id: int, node_id: int, name: str - ) -> Dict[str, str]: - """ - Retrieves information for a specific config service on a node. - - :param session_id: session node belongs to - :param node_id: id of node to get service information from - :param name: name of service - :return: config dict of names to values - :raises grpc.RpcError: when session or node doesn't exist - """ - request = GetNodeConfigServiceRequest( - session_id=session_id, node_id=node_id, name=name - ) - response = self.stub.GetNodeConfigService(request) - return dict(response.config) - - def get_node_config_services(self, session_id: int, node_id: int) -> List[str]: - """ - Retrieves the config services currently assigned to a node. - - :param session_id: session node belongs to - :param node_id: id of node to get config services for - :return: list of config services - :raises grpc.RpcError: when session or node doesn't exist - """ - request = GetNodeConfigServicesRequest(session_id=session_id, node_id=node_id) - response = self.stub.GetNodeConfigServices(request) - return list(response.services) - - def set_node_config_service( - self, session_id: int, node_id: int, name: str, config: Dict[str, str] - ) -> bool: - """ - Assigns a config service to a node with the provided configuration. - - :param session_id: session node belongs to - :param node_id: id of node to assign config service to - :param name: name of service - :param config: service configuration - :return: True for success, False otherwise - :raises grpc.RpcError: when session or node doesn't exist - """ - request = SetNodeConfigServiceRequest( - session_id=session_id, node_id=node_id, name=name, config=config - ) - response = self.stub.SetNodeConfigService(request) - return response.result - - def get_emane_event_channel(self, session_id: int) -> wrappers.EmaneEventChannel: - """ - Retrieves the current emane event channel being used for a session. - - :param session_id: session to get emane event channel for - :return: emane event channel - :raises grpc.RpcError: when session doesn't exist - """ - request = GetEmaneEventChannelRequest(session_id=session_id) - response = self.stub.GetEmaneEventChannel(request) - return wrappers.EmaneEventChannel.from_proto(response) - - def execute_script(self, script: str) -> Optional[int]: - """ - Executes a python script given context of the current CoreEmu object. - - :param script: script to execute - :return: create session id for script executed - """ - request = ExecuteScriptRequest(script=script) - response = self.stub.ExecuteScript(request) - return response.session_id if response.session_id else None - - def wlan_link( - self, session_id: int, wlan_id: int, node1_id: int, node2_id: int, linked: bool - ) -> bool: - """ - Links/unlinks nodes on the same WLAN. - - :param session_id: session id containing wlan and nodes - :param wlan_id: wlan nodes must belong to - :param node1_id: first node of pair to link/unlink - :param node2_id: second node of pair to link/unlin - :param linked: True to link, False to unlink - :return: True for success, False otherwise - :raises grpc.RpcError: when session or one of the nodes do not exist - """ - request = WlanLinkRequest( - session_id=session_id, - wlan=wlan_id, - node1_id=node1_id, - node2_id=node2_id, - linked=linked, - ) - response = self.stub.WlanLink(request) - return response.result - - def emane_pathlosses(self, streamer: EmanePathlossesStreamer) -> None: - """ - Stream EMANE pathloss events. - - :param streamer: emane pathlosses streamer - :return: nothing - :raises grpc.RpcError: when a pathloss event session or one of the nodes do not - exist - """ - self.stub.EmanePathlosses(streamer.iter()) - - def connect(self) -> None: - """ - Open connection to server, must be closed manually. - - :return: nothing - """ - self.channel = grpc.insecure_channel( - self.address, options=[("grpc.enable_http_proxy", self.proxy)] - ) - self.stub = core_pb2_grpc.CoreApiStub(self.channel) - - def close(self) -> None: - """ - Close currently opened server channel connection. - - :return: nothing - """ - if self.channel: - self.channel.close() - self.channel = None - - @contextmanager - def context_connect(self) -> Generator: - """ - Makes a context manager based connection to the server, will close after - context ends. - - :return: nothing - """ - try: - self.connect() - yield - finally: - self.close() diff --git a/daemon/core/gui/coreclient.py b/daemon/core/gui/coreclient.py index a9396517..2cdf31d0 100644 --- a/daemon/core/gui/coreclient.py +++ b/daemon/core/gui/coreclient.py @@ -12,7 +12,7 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple import grpc -from core.api.grpc import clientw, configservices_pb2, core_pb2 +from core.api.grpc import client, configservices_pb2, core_pb2 from core.api.grpc.wrappers import ( ConfigOption, ConfigService, @@ -66,7 +66,7 @@ class CoreClient: """ self.app: "Application" = app self.master: tk.Tk = app.master - self._client: clientw.CoreGrpcClient = clientw.CoreGrpcClient(proxy=proxy) + self._client: client.CoreGrpcClient = client.CoreGrpcClient(proxy=proxy) self.session: Optional[Session] = None self.user = getpass.getuser() @@ -95,7 +95,7 @@ class CoreClient: self.handling_events: Optional[grpc.Future] = None @property - def client(self) -> clientw.CoreGrpcClient: + def client(self) -> client.CoreGrpcClient: if self.session: if not self._client.check_session(self.session.id): throughputs_enabled = self.handling_throughputs is not None diff --git a/daemon/examples/grpc/distributed_switch.py b/daemon/examples/grpc/distributed_switch.py index f9534b41..6503abbf 100644 --- a/daemon/examples/grpc/distributed_switch.py +++ b/daemon/examples/grpc/distributed_switch.py @@ -1,7 +1,7 @@ import argparse import logging -from core.api.grpc import clientw +from core.api.grpc import client from core.api.grpc.wrappers import NodeType, Position @@ -11,10 +11,10 @@ def log_event(event): def main(args): # helper to create interfaces - interface_helper = clientw.InterfaceHelper(ip4_prefix="10.83.0.0/16") + interface_helper = client.InterfaceHelper(ip4_prefix="10.83.0.0/16") # create grpc client and connect - core = clientw.CoreGrpcClient() + core = client.CoreGrpcClient() core.connect() # create session diff --git a/daemon/examples/grpc/emane80211.py b/daemon/examples/grpc/emane80211.py index c6fba6c8..edb1f529 100644 --- a/daemon/examples/grpc/emane80211.py +++ b/daemon/examples/grpc/emane80211.py @@ -1,13 +1,13 @@ # required imports -from core.api.grpc import clientw +from core.api.grpc import client from core.api.grpc.wrappers import NodeType, Position from core.emane.ieee80211abg import EmaneIeee80211abgModel # interface helper -iface_helper = clientw.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") +iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") # create grpc client and connect -core = clientw.CoreGrpcClient() +core = client.CoreGrpcClient() core.connect() # add session diff --git a/daemon/examples/grpc/peertopeer.py b/daemon/examples/grpc/peertopeer.py index 8c1b47a1..730128eb 100644 --- a/daemon/examples/grpc/peertopeer.py +++ b/daemon/examples/grpc/peertopeer.py @@ -1,11 +1,11 @@ -from core.api.grpc import clientw +from core.api.grpc import client from core.api.grpc.wrappers import Position # interface helper -iface_helper = clientw.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") +iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") # create grpc client and connect -core = clientw.CoreGrpcClient() +core = client.CoreGrpcClient() core.connect() # add session diff --git a/daemon/examples/grpc/switch.py b/daemon/examples/grpc/switch.py index 0a5be8a1..ce8e2622 100644 --- a/daemon/examples/grpc/switch.py +++ b/daemon/examples/grpc/switch.py @@ -1,11 +1,11 @@ -from core.api.grpc import clientw +from core.api.grpc import client from core.api.grpc.wrappers import NodeType, Position # interface helper -iface_helper = clientw.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") +iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") # create grpc client and connect -core = clientw.CoreGrpcClient() +core = client.CoreGrpcClient() core.connect() # add session diff --git a/daemon/examples/grpc/wlan.py b/daemon/examples/grpc/wlan.py index 86a3856b..561a772b 100644 --- a/daemon/examples/grpc/wlan.py +++ b/daemon/examples/grpc/wlan.py @@ -1,11 +1,11 @@ -from core.api.grpc import clientw +from core.api.grpc import client from core.api.grpc.wrappers import NodeType, Position # interface helper -iface_helper = clientw.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") +iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") # create grpc client and connect -core = clientw.CoreGrpcClient() +core = client.CoreGrpcClient() core.connect() # add session diff --git a/daemon/scripts/core-cli b/daemon/scripts/core-cli index 41083f94..30041188 100755 --- a/daemon/scripts/core-cli +++ b/daemon/scripts/core-cli @@ -15,7 +15,7 @@ import grpc import netaddr from netaddr import EUI, AddrFormatError, IPNetwork -from core.api.grpc.clientw import CoreGrpcClient +from core.api.grpc.client import CoreGrpcClient from core.api.grpc.wrappers import ( Geo, Interface, diff --git a/daemon/scripts/core-imn-to-xml b/daemon/scripts/core-imn-to-xml index 9575fd75..c11533a4 100755 --- a/daemon/scripts/core-imn-to-xml +++ b/daemon/scripts/core-imn-to-xml @@ -5,7 +5,7 @@ import sys from pathlib import Path from core import utils -from core.api.grpc.clientw import CoreGrpcClient +from core.api.grpc.client import CoreGrpcClient from core.errors import CoreCommandError if __name__ == "__main__": diff --git a/daemon/scripts/core-route-monitor b/daemon/scripts/core-route-monitor index 2a18ea9f..bc61f6fa 100755 --- a/daemon/scripts/core-route-monitor +++ b/daemon/scripts/core-route-monitor @@ -15,7 +15,7 @@ from typing import Dict, Tuple import grpc from core import utils -from core.api.grpc.clientw import CoreGrpcClient +from core.api.grpc.client import CoreGrpcClient from core.api.grpc.wrappers import NodeType SDT_HOST = "127.0.0.1" diff --git a/daemon/tests/test_grpc.py b/daemon/tests/test_grpc.py deleted file mode 100644 index 14e971d3..00000000 --- a/daemon/tests/test_grpc.py +++ /dev/null @@ -1,1054 +0,0 @@ -import time -from pathlib import Path -from queue import Queue -from tempfile import TemporaryFile -from typing import Optional - -import grpc -import pytest -from mock import patch - -from core.api.grpc import core_pb2 -from core.api.grpc.client import CoreGrpcClient, InterfaceHelper -from core.api.grpc.emane_pb2 import EmaneModelConfig -from core.api.grpc.mobility_pb2 import MobilityAction, MobilityConfig -from core.api.grpc.server import CoreGrpcServer -from core.api.grpc.services_pb2 import ServiceAction, ServiceConfig, ServiceFileConfig -from core.api.grpc.wlan_pb2 import WlanConfig -from core.api.tlv.dataconversion import ConfigShim -from core.api.tlv.enumerations import ConfigFlags -from core.emane.ieee80211abg import EmaneIeee80211abgModel -from core.emane.nodes import EmaneNet -from core.emulator.data import EventData, IpPrefixes, NodeData, NodeOptions -from core.emulator.enumerations import EventTypes, ExceptionLevels, NodeTypes -from core.errors import CoreError -from core.location.mobility import BasicRangeModel, Ns2ScriptedMobility -from core.nodes.base import CoreNode -from core.nodes.network import SwitchNode, WlanNode -from core.xml.corexml import CoreXmlWriter - - -class TestGrpc: - def test_start_session(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - position = core_pb2.Position(x=50, y=100) - node1 = core_pb2.Node(id=1, position=position, model="PC") - position = core_pb2.Position(x=100, y=100) - node2 = core_pb2.Node(id=2, position=position, model="PC") - position = core_pb2.Position(x=200, y=200) - wlan_node = core_pb2.Node( - id=3, type=NodeTypes.WIRELESS_LAN.value, position=position - ) - nodes = [node1, node2, wlan_node] - iface_helper = InterfaceHelper(ip4_prefix="10.83.0.0/16") - iface1_id = 0 - iface1 = iface_helper.create_iface(node1.id, iface1_id) - iface2_id = 0 - iface2 = iface_helper.create_iface(node2.id, iface2_id) - link = core_pb2.Link( - type=core_pb2.LinkType.WIRED, - node1_id=node1.id, - node2_id=node2.id, - iface1=iface1, - iface2=iface2, - ) - links = [link] - hook = core_pb2.Hook( - state=core_pb2.SessionState.RUNTIME, file="echo.sh", data="echo hello" - ) - hooks = [hook] - location_x = 5 - location_y = 10 - location_z = 15 - location_lat = 20 - location_lon = 30 - location_alt = 40 - location_scale = 5 - location = core_pb2.SessionLocation( - x=location_x, - y=location_y, - z=location_z, - lat=location_lat, - lon=location_lon, - alt=location_alt, - scale=location_scale, - ) - emane_config_key = "platform_id_start" - emane_config_value = "2" - emane_config = {emane_config_key: emane_config_value} - model_node_id = 20 - model_config_key = "bandwidth" - model_config_value = "500000" - model_config = EmaneModelConfig( - node_id=model_node_id, - iface_id=-1, - model=EmaneIeee80211abgModel.name, - config={model_config_key: model_config_value}, - ) - model_configs = [model_config] - wlan_config_key = "range" - wlan_config_value = "333" - wlan_config = WlanConfig( - node_id=wlan_node.id, config={wlan_config_key: wlan_config_value} - ) - wlan_configs = [wlan_config] - mobility_config_key = "refresh_ms" - mobility_config_value = "60" - mobility_config = MobilityConfig( - node_id=wlan_node.id, config={mobility_config_key: mobility_config_value} - ) - mobility_configs = [mobility_config] - service_config = ServiceConfig( - node_id=node1.id, service="DefaultRoute", validate=["echo hello"] - ) - service_configs = [service_config] - service_file_config = ServiceFileConfig( - node_id=node1.id, - service="DefaultRoute", - file="defaultroute.sh", - data="echo hello", - ) - service_file_configs = [service_file_config] - - # when - with patch.object(CoreXmlWriter, "write"): - with client.context_connect(): - client.start_session( - session.id, - nodes, - links, - location, - hooks, - emane_config, - model_configs, - wlan_configs, - mobility_configs, - service_configs, - service_file_configs, - ) - - # then - assert node1.id in session.nodes - assert node2.id in session.nodes - assert wlan_node.id in session.nodes - assert iface1_id in session.nodes[node1.id].ifaces - assert iface2_id in session.nodes[node2.id].ifaces - hook_file, hook_data = session.hooks[EventTypes.RUNTIME_STATE][0] - assert hook_file == hook.file - assert hook_data == hook.data - assert session.location.refxyz == (location_x, location_y, location_z) - assert session.location.refgeo == (location_lat, location_lon, location_alt) - assert session.location.refscale == location_scale - assert session.emane.get_config(emane_config_key) == emane_config_value - set_wlan_config = session.mobility.get_model_config( - wlan_node.id, BasicRangeModel.name - ) - assert set_wlan_config[wlan_config_key] == wlan_config_value - set_mobility_config = session.mobility.get_model_config( - wlan_node.id, Ns2ScriptedMobility.name - ) - assert set_mobility_config[mobility_config_key] == mobility_config_value - set_model_config = session.emane.get_model_config( - model_node_id, EmaneIeee80211abgModel.name - ) - assert set_model_config[model_config_key] == model_config_value - service = session.services.get_service( - node1.id, service_config.service, default_service=True - ) - assert service.validate == tuple(service_config.validate) - service_file = session.services.get_service_file( - node1, service_file_config.service, service_file_config.file - ) - assert service_file.data == service_file_config.data - - @pytest.mark.parametrize("session_id", [None, 6013]) - def test_create_session( - self, grpc_server: CoreGrpcServer, session_id: Optional[int] - ): - # given - client = CoreGrpcClient() - - # when - with client.context_connect(): - response = client.create_session(session_id) - - # then - assert isinstance(response.session_id, int) - assert isinstance(response.state, int) - session = grpc_server.coreemu.sessions.get(response.session_id) - assert session is not None - assert session.state == EventTypes(response.state) - if session_id is not None: - assert response.session_id == session_id - assert session.id == session_id - - @pytest.mark.parametrize("session_id, expected", [(None, True), (6013, False)]) - def test_delete_session( - self, grpc_server: CoreGrpcServer, session_id: Optional[int], expected: bool - ): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - if session_id is None: - session_id = session.id - - # then - with client.context_connect(): - response = client.delete_session(session_id) - - # then - assert response.result is expected - assert grpc_server.coreemu.sessions.get(session_id) is None - - def test_get_session(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - session.add_node(CoreNode) - session.set_state(EventTypes.DEFINITION_STATE) - - # then - with client.context_connect(): - response = client.get_session(session.id) - - # then - assert response.session.state == core_pb2.SessionState.DEFINITION - assert len(response.session.nodes) == 1 - assert len(response.session.links) == 0 - - def test_get_sessions(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - - # then - with client.context_connect(): - response = client.get_sessions() - - # then - found_session = None - for current_session in response.sessions: - if current_session.id == session.id: - found_session = current_session - break - assert len(response.sessions) == 1 - assert found_session is not None - - def test_set_session_state(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - - # then - with client.context_connect(): - response = client.set_session_state( - session.id, core_pb2.SessionState.DEFINITION - ) - - # then - assert response.result is True - assert session.state == EventTypes.DEFINITION_STATE - - def test_add_node(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - - # then - with client.context_connect(): - node = core_pb2.Node() - response = client.add_node(session.id, node) - - # then - assert response.node_id is not None - assert session.get_node(response.node_id, CoreNode) is not None - - def test_edit_node(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - - # then - x, y = 10, 10 - with client.context_connect(): - position = core_pb2.Position(x=x, y=y) - response = client.edit_node(session.id, node.id, position) - - # then - assert response.result is True - assert node.position.x == x - assert node.position.y == y - - @pytest.mark.parametrize("node_id, expected", [(1, True), (2, False)]) - def test_delete_node( - self, grpc_server: CoreGrpcServer, node_id: int, expected: bool - ): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - - # then - with client.context_connect(): - response = client.delete_node(session.id, node_id) - - # then - assert response.result is expected - if expected is True: - with pytest.raises(CoreError): - assert session.get_node(node.id, CoreNode) - - def test_node_command(self, request, grpc_server: CoreGrpcServer): - if request.config.getoption("mock"): - pytest.skip("mocking calls") - - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - session.set_state(EventTypes.CONFIGURATION_STATE) - options = NodeOptions(model="Host") - node = session.add_node(CoreNode, options=options) - session.instantiate() - output = "hello world" - - # then - command = f"echo {output}" - with client.context_connect(): - response = client.node_command(session.id, node.id, command) - - # then - assert response.output == output - - def test_get_node_terminal(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - session.set_state(EventTypes.CONFIGURATION_STATE) - options = NodeOptions(model="Host") - node = session.add_node(CoreNode, options=options) - session.instantiate() - - # then - with client.context_connect(): - response = client.get_node_terminal(session.id, node.id) - - # then - assert response.terminal is not None - - def test_save_xml(self, grpc_server: CoreGrpcServer, tmpdir: TemporaryFile): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - tmp = tmpdir.join("text.xml") - - # then - with client.context_connect(): - client.save_xml(session.id, str(tmp)) - - # then - assert tmp.exists() - - def test_open_xml_hook(self, grpc_server: CoreGrpcServer, tmpdir: TemporaryFile): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - tmp = tmpdir.join("text.xml") - session.save_xml(Path(str(tmp))) - - # then - with client.context_connect(): - response = client.open_xml(str(tmp)) - - # then - assert response.result is True - assert response.session_id is not None - - def test_add_link(self, grpc_server: CoreGrpcServer, iface_helper: InterfaceHelper): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - switch = session.add_node(SwitchNode) - node = session.add_node(CoreNode) - assert len(switch.links()) == 0 - - # then - iface = iface_helper.create_iface(node.id, 0) - with client.context_connect(): - response = client.add_link(session.id, node.id, switch.id, iface) - - # then - assert response.result is True - assert len(switch.links()) == 1 - - def test_add_link_exception( - self, grpc_server: CoreGrpcServer, iface_helper: InterfaceHelper - ): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - - # then - iface = iface_helper.create_iface(node.id, 0) - with pytest.raises(grpc.RpcError): - with client.context_connect(): - client.add_link(session.id, 1, 3, iface) - - def test_edit_link(self, grpc_server: CoreGrpcServer, ip_prefixes: IpPrefixes): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - switch = session.add_node(SwitchNode) - node = session.add_node(CoreNode) - iface = ip_prefixes.create_iface(node) - session.add_link(node.id, switch.id, iface) - options = core_pb2.LinkOptions(bandwidth=30000) - link = switch.links()[0] - assert options.bandwidth != link.options.bandwidth - - # then - with client.context_connect(): - response = client.edit_link( - session.id, node.id, switch.id, options, iface1_id=iface.id - ) - - # then - assert response.result is True - link = switch.links()[0] - assert options.bandwidth == link.options.bandwidth - - def test_delete_link(self, grpc_server: CoreGrpcServer, ip_prefixes: IpPrefixes): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node1 = session.add_node(CoreNode) - iface1 = ip_prefixes.create_iface(node1) - node2 = session.add_node(CoreNode) - iface2 = ip_prefixes.create_iface(node2) - session.add_link(node1.id, node2.id, iface1, iface2) - link_node = None - for node_id in session.nodes: - node = session.nodes[node_id] - if node.id not in {node1.id, node2.id}: - link_node = node - break - assert len(link_node.links()) == 1 - - # then - with client.context_connect(): - response = client.delete_link( - session.id, node1.id, node2.id, iface1.id, iface2.id - ) - - # then - assert response.result is True - assert len(link_node.links()) == 0 - - def test_get_wlan_config(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - wlan = session.add_node(WlanNode) - - # then - with client.context_connect(): - response = client.get_wlan_config(session.id, wlan.id) - - # then - assert len(response.config) > 0 - - def test_set_wlan_config(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - session.set_state(EventTypes.CONFIGURATION_STATE) - wlan = session.add_node(WlanNode) - wlan.setmodel(BasicRangeModel, BasicRangeModel.default_values()) - session.instantiate() - range_key = "range" - range_value = "50" - - # then - with client.context_connect(): - response = client.set_wlan_config( - session.id, - wlan.id, - { - range_key: range_value, - "delay": "0", - "loss": "0", - "bandwidth": "50000", - "error": "0", - "jitter": "0", - }, - ) - - # then - assert response.result is True - config = session.mobility.get_model_config(wlan.id, BasicRangeModel.name) - assert config[range_key] == range_value - assert wlan.model.range == int(range_value) - - def test_get_emane_config(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - - # then - with client.context_connect(): - response = client.get_emane_config(session.id) - - # then - assert len(response.config) > 0 - - def test_set_emane_config(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - config_key = "platform_id_start" - config_value = "2" - - # then - with client.context_connect(): - response = client.set_emane_config(session.id, {config_key: config_value}) - - # then - assert response.result is True - config = session.emane.get_configs() - assert len(config) > 1 - assert config[config_key] == config_value - - def test_get_emane_model_configs(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - session.set_location(47.57917, -122.13232, 2.00000, 1.0) - options = NodeOptions(emane=EmaneIeee80211abgModel.name) - emane_network = session.add_node(EmaneNet, options=options) - session.emane.set_model(emane_network, EmaneIeee80211abgModel) - config_key = "platform_id_start" - config_value = "2" - session.emane.set_model_config( - emane_network.id, EmaneIeee80211abgModel.name, {config_key: config_value} - ) - - # then - with client.context_connect(): - response = client.get_emane_model_configs(session.id) - - # then - assert len(response.configs) == 1 - model_config = response.configs[0] - assert emane_network.id == model_config.node_id - assert model_config.model == EmaneIeee80211abgModel.name - assert len(model_config.config) > 0 - assert model_config.iface_id == -1 - - def test_set_emane_model_config(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - session.set_location(47.57917, -122.13232, 2.00000, 1.0) - options = NodeOptions(emane=EmaneIeee80211abgModel.name) - emane_network = session.add_node(EmaneNet, options=options) - session.emane.set_model(emane_network, EmaneIeee80211abgModel) - config_key = "bandwidth" - config_value = "900000" - - # then - with client.context_connect(): - response = client.set_emane_model_config( - session.id, - emane_network.id, - EmaneIeee80211abgModel.name, - {config_key: config_value}, - ) - - # then - assert response.result is True - config = session.emane.get_model_config( - emane_network.id, EmaneIeee80211abgModel.name - ) - assert config[config_key] == config_value - - def test_get_emane_model_config(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - session.set_location(47.57917, -122.13232, 2.00000, 1.0) - options = NodeOptions(emane=EmaneIeee80211abgModel.name) - emane_network = session.add_node(EmaneNet, options=options) - session.emane.set_model(emane_network, EmaneIeee80211abgModel) - - # then - with client.context_connect(): - response = client.get_emane_model_config( - session.id, emane_network.id, EmaneIeee80211abgModel.name - ) - - # then - assert len(response.config) > 0 - - def test_get_emane_models(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - - # then - with client.context_connect(): - response = client.get_emane_models(session.id) - - # then - assert len(response.models) > 0 - - def test_get_mobility_configs(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - wlan = session.add_node(WlanNode) - session.mobility.set_model_config(wlan.id, Ns2ScriptedMobility.name, {}) - - # then - with client.context_connect(): - response = client.get_mobility_configs(session.id) - - # then - assert len(response.configs) > 0 - assert wlan.id in response.configs - mapped_config = response.configs[wlan.id] - assert len(mapped_config.config) > 0 - - def test_get_mobility_config(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - wlan = session.add_node(WlanNode) - session.mobility.set_model_config(wlan.id, Ns2ScriptedMobility.name, {}) - - # then - with client.context_connect(): - response = client.get_mobility_config(session.id, wlan.id) - - # then - assert len(response.config) > 0 - - def test_set_mobility_config(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - wlan = session.add_node(WlanNode) - config_key = "refresh_ms" - config_value = "60" - - # then - with client.context_connect(): - response = client.set_mobility_config( - session.id, wlan.id, {config_key: config_value} - ) - - # then - assert response.result is True - config = session.mobility.get_model_config(wlan.id, Ns2ScriptedMobility.name) - assert config[config_key] == config_value - - def test_mobility_action(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - wlan = session.add_node(WlanNode) - session.mobility.set_model_config(wlan.id, Ns2ScriptedMobility.name, {}) - session.instantiate() - - # then - with client.context_connect(): - response = client.mobility_action(session.id, wlan.id, MobilityAction.STOP) - - # then - assert response.result is True - - def test_get_services(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - - # then - with client.context_connect(): - response = client.get_services() - - # then - assert len(response.services) > 0 - - def test_get_service_defaults(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - - # then - with client.context_connect(): - response = client.get_service_defaults(session.id) - - # then - assert len(response.defaults) > 0 - - def test_set_service_defaults(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node_type = "test" - services = ["SSH"] - - # then - with client.context_connect(): - response = client.set_service_defaults(session.id, {node_type: services}) - - # then - assert response.result is True - assert session.services.default_services[node_type] == services - - def test_get_node_service_configs(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - service_name = "DefaultRoute" - session.services.set_service(node.id, service_name) - - # then - with client.context_connect(): - response = client.get_node_service_configs(session.id) - - # then - assert len(response.configs) == 1 - service_config = response.configs[0] - assert service_config.node_id == node.id - assert service_config.service == service_name - - def test_get_node_service(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - - # then - with client.context_connect(): - response = client.get_node_service(session.id, node.id, "DefaultRoute") - - # then - assert len(response.service.configs) > 0 - - def test_get_node_service_file(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - - # then - with client.context_connect(): - response = client.get_node_service_file( - session.id, node.id, "DefaultRoute", "defaultroute.sh" - ) - - # then - assert response.data is not None - - def test_set_node_service(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - service_name = "DefaultRoute" - validate = ["echo hello"] - - # then - with client.context_connect(): - response = client.set_node_service( - session.id, node.id, service_name, validate=validate - ) - - # then - assert response.result is True - service = session.services.get_service( - node.id, service_name, default_service=True - ) - assert service.validate == tuple(validate) - - def test_set_node_service_file(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - service_name = "DefaultRoute" - file_name = "defaultroute.sh" - file_data = "echo hello" - - # then - with client.context_connect(): - response = client.set_node_service_file( - session.id, node.id, service_name, file_name, file_data - ) - - # then - assert response.result is True - service_file = session.services.get_service_file(node, service_name, file_name) - assert service_file.data == file_data - - def test_service_action(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - service_name = "DefaultRoute" - - # then - with client.context_connect(): - response = client.service_action( - session.id, node.id, service_name, ServiceAction.STOP - ) - - # then - assert response.result is True - - def test_node_events(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - node.position.lat = 10.0 - node.position.lon = 20.0 - node.position.alt = 5.0 - queue = Queue() - - def handle_event(event_data): - assert event_data.session_id == session.id - assert event_data.HasField("node_event") - event_node = event_data.node_event.node - assert event_node.geo.lat == node.position.lat - assert event_node.geo.lon == node.position.lon - assert event_node.geo.alt == node.position.alt - queue.put(event_data) - - # then - with client.context_connect(): - client.events(session.id, handle_event) - time.sleep(0.1) - session.broadcast_node(node) - - # then - queue.get(timeout=5) - - def test_link_events(self, grpc_server: CoreGrpcServer, ip_prefixes: IpPrefixes): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - wlan = session.add_node(WlanNode) - node = session.add_node(CoreNode) - iface = ip_prefixes.create_iface(node) - session.add_link(node.id, wlan.id, iface) - link_data = wlan.links()[0] - queue = Queue() - - def handle_event(event_data): - assert event_data.session_id == session.id - assert event_data.HasField("link_event") - queue.put(event_data) - - # then - with client.context_connect(): - client.events(session.id, handle_event) - time.sleep(0.1) - session.broadcast_link(link_data) - - # then - queue.get(timeout=5) - - def test_throughputs(self, request, grpc_server: CoreGrpcServer): - if request.config.getoption("mock"): - pytest.skip("mocking calls") - - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - queue = Queue() - - def handle_event(event_data): - assert event_data.session_id == session.id - queue.put(event_data) - - # then - with client.context_connect(): - client.throughputs(session.id, handle_event) - time.sleep(0.1) - - # then - queue.get(timeout=5) - - def test_session_events(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - queue = Queue() - - def handle_event(event_data): - assert event_data.session_id == session.id - assert event_data.HasField("session_event") - queue.put(event_data) - - # then - with client.context_connect(): - client.events(session.id, handle_event) - time.sleep(0.1) - event = EventData( - event_type=EventTypes.RUNTIME_STATE, time=str(time.monotonic()) - ) - session.broadcast_event(event) - - # then - queue.get(timeout=5) - - def test_config_events(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - queue = Queue() - - def handle_event(event_data): - assert event_data.session_id == session.id - assert event_data.HasField("config_event") - queue.put(event_data) - - # then - with client.context_connect(): - client.events(session.id, handle_event) - time.sleep(0.1) - session_config = session.options.get_configs() - config_data = ConfigShim.config_data( - 0, None, ConfigFlags.UPDATE.value, session.options, session_config - ) - session.broadcast_config(config_data) - - # then - queue.get(timeout=5) - - def test_exception_events(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - queue = Queue() - exception_level = ExceptionLevels.FATAL - source = "test" - node_id = None - text = "exception message" - - def handle_event(event_data): - assert event_data.session_id == session.id - assert event_data.HasField("exception_event") - exception_event = event_data.exception_event - assert exception_event.level == exception_level.value - assert exception_event.node_id == 0 - assert exception_event.source == source - assert exception_event.text == text - queue.put(event_data) - - # then - with client.context_connect(): - client.events(session.id, handle_event) - time.sleep(0.1) - session.exception(exception_level, source, text, node_id) - - # then - queue.get(timeout=5) - - def test_file_events(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - queue = Queue() - - def handle_event(event_data): - assert event_data.session_id == session.id - assert event_data.HasField("file_event") - queue.put(event_data) - - # then - with client.context_connect(): - client.events(session.id, handle_event) - time.sleep(0.1) - file_data = session.services.get_service_file( - node, "DefaultRoute", "defaultroute.sh" - ) - session.broadcast_file(file_data) - - # then - queue.get(timeout=5) - - def test_move_nodes(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - x, y = 10.0, 15.0 - - def move_iter(): - yield core_pb2.MoveNodesRequest( - session_id=session.id, - node_id=node.id, - position=core_pb2.Position(x=x, y=y), - ) - - # then - with client.context_connect(): - client.move_nodes(move_iter()) - - # assert - assert node.position.x == x - assert node.position.y == y - - def test_move_nodes_geo(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - session = grpc_server.coreemu.create_session() - node = session.add_node(CoreNode) - lon, lat, alt = 10.0, 15.0, 5.0 - queue = Queue() - - def node_handler(node_data: NodeData): - n = node_data.node - assert n.position.lon == lon - assert n.position.lat == lat - assert n.position.alt == alt - queue.put(node_data) - - session.node_handlers.append(node_handler) - - def move_iter(): - yield core_pb2.MoveNodesRequest( - session_id=session.id, - node_id=node.id, - geo=core_pb2.Geo(lon=lon, lat=lat, alt=alt), - ) - - # then - with client.context_connect(): - client.move_nodes(move_iter()) - - # assert - assert node.position.lon == lon - assert node.position.lat == lat - assert node.position.alt == alt - assert queue.get(timeout=5) - - def test_move_nodes_exception(self, grpc_server: CoreGrpcServer): - # given - client = CoreGrpcClient() - grpc_server.coreemu.create_session() - - def move_iter(): - yield core_pb2.MoveNodesRequest() - - # then - with pytest.raises(grpc.RpcError): - with client.context_connect(): - client.move_nodes(move_iter()) diff --git a/daemon/tests/test_grpcw.py b/daemon/tests/test_grpcw.py index f512dba1..5160128c 100644 --- a/daemon/tests/test_grpcw.py +++ b/daemon/tests/test_grpcw.py @@ -9,7 +9,7 @@ import pytest from mock import patch from core.api.grpc import core_pb2 -from core.api.grpc.clientw import CoreGrpcClient, InterfaceHelper, MoveNodesStreamer +from core.api.grpc.client import CoreGrpcClient, InterfaceHelper, MoveNodesStreamer from core.api.grpc.server import CoreGrpcServer from core.api.grpc.wrappers import ( ConfigOption, diff --git a/docs/grpc.md b/docs/grpc.md index 998970c5..ca3ebe00 100644 --- a/docs/grpc.md +++ b/docs/grpc.md @@ -53,15 +53,15 @@ and the services they map to. There is an interface helper class that can be leveraged for convenience when creating interface data for nodes. Alternatively one can manually create -a `core.api.grpc.core_pb2.Interface` class instead with appropriate information. +a `core.api.grpc.wrappers.Interface` class instead with appropriate information. -Manually creating gRPC interface data: +Manually creating gRPC client interface: ```python -from core.api.grpc import core_pb2 +from core.api.grpc.wrappers import Interface # id is optional and will set to the next available id # name is optional and will default to eth # mac is optional and will result in a randomly generated mac -iface_data = core_pb2.Interface( +iface = Interface( id=0, name="eth0", ip4="10.0.0.1", @@ -98,16 +98,24 @@ Event types: * file - file events when the legacy gui joins a session ```python -from core.api.grpc import core_pb2 +from core.api.grpc import client +from core.api.grpc.wrappers import EventType def event_listener(event): print(event) +# create grpc client and connect +core = client.CoreGrpcClient() +core.connect() + +# add session +session = core.add_session() + # provide no events to listen to all events -core.events(session_id, event_listener) +core.events(session.id, event_listener) # provide events to listen to specific events -core.events(session_id, event_listener, [core_pb2.EventType.NODE]) +core.events(session.id, event_listener, [EventType.NODE]) ``` ### Configuring Links @@ -122,27 +130,47 @@ Currently supported configuration options: * loss (%) ```python -from core.api.grpc import core_pb2 +from core.api.grpc import client +from core.api.grpc.wrappers import LinkOptions, Position + +# interface helper +iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") + +# create grpc client and connect +core = client.CoreGrpcClient() +core.connect() + +# add session +session = core.add_session() + +# create nodes +position = Position(x=100, y=100) +node1 = session.add_node(1, position=position) +position = Position(x=300, y=100) +node2 = session.add_node(2, position=position) # configuring when creating a link -options = core_pb2.LinkOptions( +options = LinkOptions( bandwidth=54_000_000, delay=5000, dup=5, loss=5.5, jitter=0, ) -core.add_link(session_id, n1_id, n2_id, iface1_data, iface2_data, options) +iface1 = iface_helper.create_iface(node1.id, 0) +iface2 = iface_helper.create_iface(node2.id, 0) +link = session.add_link(node1=node1, node2=node2, iface1=iface1, iface2=iface2) # configuring during runtime -core.edit_link(session_id, n1_id, n2_id, iface1_id, iface2_id, options) +link.options.loss = 10.0 +core.edit_link(session.id, link) ``` ### Peer to Peer Example ```python # required imports from core.api.grpc import client -from core.api.grpc.core_pb2 import Node, NodeType, Position, SessionState +from core.api.grpc.core_pb2 import Position # interface helper iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") @@ -151,39 +179,29 @@ iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001 core = client.CoreGrpcClient() core.connect() -# create session and get id -response = core.create_session() -session_id = response.session_id +# add session +session = core.add_session() -# change session state to configuration so that nodes get started when added -core.set_session_state(session_id, SessionState.CONFIGURATION) - -# create node one +# create nodes position = Position(x=100, y=100) -n1 = Node(type=NodeType.DEFAULT, position=position, model="PC") -response = core.add_node(session_id, n1) -n1_id = response.node_id - -# create node two +node1 = session.add_node(1, position=position) position = Position(x=300, y=100) -n2 = Node(type=NodeType.DEFAULT, position=position, model="PC") -response = core.add_node(session_id, n2) -n2_id = response.node_id +node2 = session.add_node(2, position=position) -# links nodes together -iface1 = iface_helper.create_iface(n1_id, 0) -iface2 = iface_helper.create_iface(n2_id, 0) -core.add_link(session_id, n1_id, n2_id, iface1, iface2) +# create link +iface1 = iface_helper.create_iface(node1.id, 0) +iface2 = iface_helper.create_iface(node2.id, 0) +session.add_link(node1=node1, node2=node2, iface1=iface1, iface2=iface2) -# change session state -core.set_session_state(session_id, SessionState.INSTANTIATION) +# start session +core.start_session(session) ``` ### Switch/Hub Example ```python # required imports from core.api.grpc import client -from core.api.grpc.core_pb2 import Node, NodeType, Position, SessionState +from core.api.grpc.core_pb2 import NodeType, Position # interface helper iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") @@ -192,46 +210,32 @@ iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001 core = client.CoreGrpcClient() core.connect() -# create session and get id -response = core.create_session() -session_id = response.session_id +# add session +session = core.add_session() -# change session state to configuration so that nodes get started when added -core.set_session_state(session_id, SessionState.CONFIGURATION) - -# create switch node +# create nodes position = Position(x=200, y=200) -switch = Node(type=NodeType.SWITCH, position=position) -response = core.add_node(session_id, switch) -switch_id = response.node_id - -# create node one +switch = session.add_node(1, _type=NodeType.SWITCH, position=position) position = Position(x=100, y=100) -n1 = Node(type=NodeType.DEFAULT, position=position, model="PC") -response = core.add_node(session_id, n1) -n1_id = response.node_id - -# create node two +node1 = session.add_node(2, position=position) position = Position(x=300, y=100) -n2 = Node(type=NodeType.DEFAULT, position=position, model="PC") -response = core.add_node(session_id, n2) -n2_id = response.node_id +node2 = session.add_node(3, position=position) -# links nodes to switch -iface1 = iface_helper.create_iface(n1_id, 0) -core.add_link(session_id, n1_id, switch_id, iface1) -iface1 = iface_helper.create_iface(n2_id, 0) -core.add_link(session_id, n2_id, switch_id, iface1) +# create links +iface1 = iface_helper.create_iface(node1.id, 0) +session.add_link(node1=node1, node2=switch, iface1=iface1) +iface1 = iface_helper.create_iface(node2.id, 0) +session.add_link(node1=node2, node2=switch, iface1=iface1) -# change session state -core.set_session_state(session_id, SessionState.INSTANTIATION) +# start session +core.start_session(session) ``` ### WLAN Example ```python # required imports from core.api.grpc import client -from core.api.grpc.core_pb2 import Node, NodeType, Position, SessionState +from core.api.grpc.core_pb2 import NodeType, Position # interface helper iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001::/64") @@ -240,49 +244,37 @@ iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001 core = client.CoreGrpcClient() core.connect() -# create session and get id -response = core.create_session() -session_id = response.session_id +# add session +session = core.add_session() -# change session state to configuration so that nodes get started when added -core.set_session_state(session_id, SessionState.CONFIGURATION) - -# create wlan node +# create nodes position = Position(x=200, y=200) -wlan = Node(type=NodeType.WIRELESS_LAN, position=position) -response = core.add_node(session_id, wlan) -wlan_id = response.node_id - -# create node one +wlan = session.add_node(1, _type=NodeType.WIRELESS_LAN, position=position) position = Position(x=100, y=100) -n1 = Node(type=NodeType.DEFAULT, position=position, model="mdr") -response = core.add_node(session_id, n1) -n1_id = response.node_id - -# create node two +node1 = session.add_node(2, model="mdr", position=position) position = Position(x=300, y=100) -n2 = Node(type=NodeType.DEFAULT, position=position, model="mdr") -response = core.add_node(session_id, n2) -n2_id = response.node_id +node2 = session.add_node(3, model="mdr", position=position) -# configure wlan using a dict mapping currently +# create links +iface1 = iface_helper.create_iface(node1.id, 0) +session.add_link(node1=node1, node2=wlan, iface1=iface1) +iface1 = iface_helper.create_iface(node2.id, 0) +session.add_link(node1=node2, node2=wlan, iface1=iface1) + +# set wlan config using a dict mapping currently # support values as strings -core.set_wlan_config(session_id, wlan_id, { - "range": "280", - "bandwidth": "55000000", - "delay": "6000", - "jitter": "5", - "error": "5", -}) +wlan.set_wlan( + { + "range": "280", + "bandwidth": "55000000", + "delay": "6000", + "jitter": "5", + "error": "5", + } +) -# links nodes to wlan -iface1 = iface_helper.create_iface(n1_id, 0) -core.add_link(session_id, n1_id, wlan_id, iface1) -iface1 = iface_helper.create_iface(n2_id, 0) -core.add_link(session_id, n2_id, wlan_id, iface1) - -# change session state -core.set_session_state(session_id, SessionState.INSTANTIATION) +# start session +core.start_session(session) ``` ### EMANE Example @@ -307,7 +299,7 @@ will use the defaults. When no configuration is used, the defaults are used. ```python # required imports from core.api.grpc import client -from core.api.grpc.core_pb2 import Node, NodeType, Position, SessionState +from core.api.grpc.core_pb2 import NodeType, Position from core.emane.ieee80211abg import EmaneIeee80211abgModel # interface helper @@ -317,68 +309,45 @@ iface_helper = client.InterfaceHelper(ip4_prefix="10.0.0.0/24", ip6_prefix="2001 core = client.CoreGrpcClient() core.connect() -# create session and get id -response = core.create_session() -session_id = response.session_id +# add session +session = core.add_session() -# change session state to configuration so that nodes get started when added -core.set_session_state(session_id, SessionState.CONFIGURATION) - -# create emane node +# create nodes position = Position(x=200, y=200) -emane = Node(type=NodeType.EMANE, position=position, emane=EmaneIeee80211abgModel.name) -response = core.add_node(session_id, emane) -emane_id = response.node_id - -# create node one +emane = session.add_node( + 1, _type=NodeType.EMANE, position=position, emane=EmaneIeee80211abgModel.name +) position = Position(x=100, y=100) -n1 = Node(type=NodeType.DEFAULT, position=position, model="mdr") -response = core.add_node(session_id, n1) -n1_id = response.node_id - -# create node two +node1 = session.add_node(2, model="mdr", position=position) position = Position(x=300, y=100) -n2 = Node(type=NodeType.DEFAULT, position=position, model="mdr") -response = core.add_node(session_id, n2) -n2_id = response.node_id +node2 = session.add_node(3, model="mdr", position=position) -# configure general emane settings -core.set_emane_config(session_id, { - "eventservicettl": "2" -}) +# create links +iface1 = iface_helper.create_iface(node1.id, 0) +session.add_link(node1=node1, node2=emane, iface1=iface1) +iface1 = iface_helper.create_iface(node2.id, 0) +session.add_link(node1=node2, node2=emane, iface1=iface1) -# configure emane model settings -# using a dict mapping currently support values as strings -core.set_emane_model_config(session_id, emane_id, EmaneIeee80211abgModel.name, { - "unicastrate": "3", -}) +# setting global emane configuration +session.set_emane({"eventservicettl": "2"}) +# setting emane specific emane model configuration +emane.set_emane_model(EmaneIeee80211abgModel.name, {"unicastrate": "3"}) -# links nodes to emane -iface1 = iface_helper.create_iface(n1_id, 0) -core.add_link(session_id, n1_id, emane_id, iface1) -iface1 = iface_helper.create_iface(n2_id, 0) -core.add_link(session_id, n2_id, emane_id, iface1) - -# change session state -core.set_session_state(session_id, SessionState.INSTANTIATION) +# start session +core.start_session(session) ``` EMANE Model Configuration: ```python -# emane network specific config -core.set_emane_model_config(session_id, emane_id, EmaneIeee80211abgModel.name, { - "unicastrate": "3", -}) +# emane network specific config, set on an emane node +# this setting applies to all nodes connected +emane.set_emane_model(EmaneIeee80211abgModel.name, {"unicastrate": "3"}) -# node specific config -core.set_emane_model_config(session_id, node_id, EmaneIeee80211abgModel.name, { - "unicastrate": "3", -}) +# node specific config for an individual node connected to an emane network +node.set_emane_model(EmaneIeee80211abgModel.name, {"unicastrate": "3"}) -# node interface specific config -core.set_emane_model_config(session_id, node_id, EmaneIeee80211abgModel.name, { - "unicastrate": "3", -}, iface_id) +# node interface specific config for an individual node connected to an emane network +node.set_emane_model(EmaneIeee80211abgModel.name, {"unicastrate": "3"}, iface_id=0) ``` ## Configuring a Service @@ -398,11 +367,8 @@ The following features can be configured for a service: Editing service properties: ```python # configure a service, for a node, for a given session -core.set_node_service( - session_id, - node_id, - service_name, - files=["file1.sh", "file2.sh"], +node.service_configs[service_name] = NodeServiceData( + configs=["file1.sh", "file2.sh"], directories=["/etc/node"], startup=["bash file1.sh"], validate=[], @@ -417,13 +383,8 @@ Editing a service file: ```python # to edit the contents of a generated file you can specify # the service, the file name, and its contents -core.set_node_service_file( - session_id, - node_id, - service_name, - file_name, - "echo hello", -) +file_configs = node.service_file_configs.setdefault(service_name, {}) +file_configs[file_name] = "echo hello world" ``` ## File Examples