From 0aa7c6f1f2779c052f82231b50f552d2892b94cd Mon Sep 17 00:00:00 2001 From: Blake Harnden <32446120+bharnden@users.noreply.github.com> Date: Mon, 6 Apr 2020 16:09:01 -0700 Subject: [PATCH] cleaned up how grpc creates node protobuf data for grpc interfaces, cleaned up route monitor script slighly --- daemon/core/api/grpc/grpcutils.py | 42 ++++++++++++++++++ daemon/core/api/grpc/server.py | 71 ++----------------------------- daemon/scripts/core-route-monitor | 32 +++++++++----- 3 files changed, 66 insertions(+), 79 deletions(-) diff --git a/daemon/core/api/grpc/grpcutils.py b/daemon/core/api/grpc/grpcutils.py index e23073a0..0aa5a553 100644 --- a/daemon/core/api/grpc/grpcutils.py +++ b/daemon/core/api/grpc/grpcutils.py @@ -8,6 +8,7 @@ from core import utils from core.api.grpc import common_pb2, core_pb2 from core.api.grpc.services_pb2 import NodeServiceData, ServiceConfig from core.config import ConfigurableOptions +from core.emane.nodes import EmaneNet from core.emulator.data import LinkData from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions from core.emulator.enumerations import LinkTypes, NodeTypes @@ -221,6 +222,47 @@ def get_config_options( return results +def get_node_proto(session: Session, node: NodeBase) -> core_pb2.Node: + """ + Convert CORE node to protobuf representation. + + :param session: session containing node + :param node: node to convert + :return: node proto + """ + node_type = session.get_node_type(node.__class__) + position = core_pb2.Position( + x=node.position.x, y=node.position.y, z=node.position.z + ) + services = getattr(node, "services", []) + if services is None: + services = [] + services = [x.name for x in services] + config_services = getattr(node, "config_services", {}) + config_services = [x for x in config_services] + emane_model = None + if isinstance(node, EmaneNet): + emane_model = node.model.name + model = getattr(node, "type", None) + node_dir = getattr(node, "nodedir", None) + channel = getattr(node, "ctrlchnlname", None) + image = getattr(node, "image", None) + return core_pb2.Node( + id=node.id, + name=node.name, + emane=emane_model, + model=model, + type=node_type.value, + position=position, + services=services, + icon=node.icon, + image=image, + config_services=config_services, + dir=node_dir, + channel=channel, + ) + + def get_links(session: Session, node: NodeBase): """ Retrieve a list of links for grpc to use diff --git a/daemon/core/api/grpc/server.py b/daemon/core/api/grpc/server.py index 73f24176..c559f8f2 100644 --- a/daemon/core/api/grpc/server.py +++ b/daemon/core/api/grpc/server.py @@ -103,7 +103,6 @@ from core.api.grpc.wlan_pb2 import ( SetWlanConfigRequest, SetWlanConfigResponse, ) -from core.emane.nodes import EmaneNet from core.emulator.coreemu import CoreEmu from core.emulator.data import LinkData from core.emulator.emudata import LinkOptions, NodeOptions @@ -111,9 +110,7 @@ from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags from core.emulator.session import Session from core.errors import CoreCommandError, CoreError from core.location.mobility import BasicRangeModel, Ns2ScriptedMobility -from core.nodes.base import CoreNode, CoreNodeBase, NodeBase -from core.nodes.docker import DockerNode -from core.nodes.lxd import LxcNode +from core.nodes.base import CoreNodeBase, NodeBase from core.services.coreservices import ServiceManager _ONE_DAY_IN_SECONDS = 60 * 60 * 24 @@ -544,42 +541,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): node = session.nodes[_id] if not isinstance(node.id, int): continue - node_type = session.get_node_type(node.__class__) - model = getattr(node, "type", None) - position = core_pb2.Position( - x=node.position.x, y=node.position.y, z=node.position.z - ) - services = getattr(node, "services", []) - if services is None: - services = [] - services = [x.name for x in services] - config_services = getattr(node, "config_services", {}) - config_services = [x for x in config_services] - emane_model = None - if isinstance(node, EmaneNet): - emane_model = node.model.name - node_dir = None - channel = None - if isinstance(node, CoreNode): - node_dir = node.nodedir - channel = node.ctrlchnlname - image = getattr(node, "image", None) - node_proto = core_pb2.Node( - id=node.id, - name=node.name, - emane=emane_model, - model=model, - type=node_type.value, - position=position, - services=services, - icon=node.icon, - image=image, - config_services=config_services, - dir=node_dir, - channel=channel, - ) - if isinstance(node, (DockerNode, LxcNode)): - node_proto.image = node.image + node_proto = grpcutils.get_node_proto(session, node) nodes.append(node_proto) node_links = get_links(session, node) links.extend(node_links) @@ -723,34 +685,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): interface = node._netif[interface_id] interface_proto = grpcutils.interface_to_proto(interface) interfaces.append(interface_proto) - emane_model = None - if isinstance(node, EmaneNet): - emane_model = node.model.name - node_dir = None - channel = None - if isinstance(node, CoreNode): - node_dir = node.nodedir - channel = node.ctrlchnlname - services = [] - if node.services: - services = [x.name for x in node.services] - position = core_pb2.Position( - x=node.position.x, y=node.position.y, z=node.position.z - ) - node_type = session.get_node_type(node.__class__) - node_proto = core_pb2.Node( - id=node.id, - name=node.name, - type=node_type.value, - emane=emane_model, - model=node.type, - position=position, - services=services, - dir=node_dir, - channel=channel, - ) - if isinstance(node, (DockerNode, LxcNode)): - node_proto.image = node.image + node_proto = grpcutils.get_node_proto(session, node) return core_pb2.GetNodeResponse(node=node_proto, interfaces=interfaces) def EditNode( diff --git a/daemon/scripts/core-route-monitor b/daemon/scripts/core-route-monitor index c040b369..668b3c81 100755 --- a/daemon/scripts/core-route-monitor +++ b/daemon/scripts/core-route-monitor @@ -33,9 +33,11 @@ class SdtClient: self.links = [] self.send(f'layer "{ROUTE_LAYER}"') + def close(self) -> None: + self.sock.close() + def send(self, cmd: str) -> None: sdt_cmd = f"{cmd}\n".encode() - print("sdt cmd: ", cmd) self.sock.sendall(sdt_cmd) def add_link(self, node1, node2) -> None: @@ -64,6 +66,7 @@ class RouterMonitor: self.seen = {} self.running = False self.route_time = None + self.listeners = [] self.sdt = SdtClient((sdt_host, sdt_port)) self.nodes = self.get_nodes() @@ -77,7 +80,7 @@ class RouterMonitor: session = sessions[0] if not session: raise Exception("no current core sessions") - print(session.dir) + print("session: ", session.dir) response = self.core.get_session(session.id) for node in response.session.nodes: if node.type != NodeType.DEFAULT: @@ -88,8 +91,10 @@ class RouterMonitor: def start(self) -> None: self.running = True for node_id, node in self.nodes.items(): + print("listening on node: ", node) thread = Thread(target=self.listen, args=(node_id, node), daemon=True) thread.start() + self.listeners.append(thread) self.manage() def manage(self) -> None: @@ -123,7 +128,6 @@ class RouterMonitor: key=cmp_to_key(self.route_sort), reverse=True) print("current route:") - print(values) for index, node_data in enumerate(values): next_index = index + 1 if next_index == len(values): @@ -133,8 +137,13 @@ class RouterMonitor: print(f"{node_id} -> {next_node_id}") self.sdt.add_link(node_id, next_node_id) - def cleanup(self) -> None: + def stop(self) -> None: + self.running = False self.sdt.delete_links() + self.sdt.close() + for thread in self.listeners: + thread.join() + self.listeners.clear() def listen(self, node_id, node) -> None: cmd = ( @@ -149,16 +158,17 @@ class RouterMonitor: ready, _, _ = select.select([p.stdout], [], [], 1) if ready: line = p.stdout.readline().strip().decode() - line = line.split("ttl", 1)[1] - ttl = int(line.split(",", 1)[0]) - p.stdout.readline() - self.queue.put((RouteEnum.ADD, node_id, ttl)) - current = time.monotonic() + if line: + line = line.split("ttl", 1)[1] + ttl = int(line.split(",", 1)[0]) + p.stdout.readline() + self.queue.put((RouteEnum.ADD, node_id, ttl)) + current = time.monotonic() else: if (time.monotonic() - current) >= DEAD_TIME: self.queue.put((RouteEnum.DEL, node_id, None)) except Exception as e: - print(f"listen error: {e}") + print(f"listener error: {e}") def main() -> None: @@ -189,7 +199,7 @@ def main() -> None: try: monitor.start() except KeyboardInterrupt: - monitor.cleanup() + monitor.stop() print("ending packet monitor")