From 236ac7919a7b408d0263043075308312ccc8c42a Mon Sep 17 00:00:00 2001 From: bharnden <32446120+bharnden@users.noreply.github.com> Date: Mon, 28 Oct 2019 23:11:15 -0700 Subject: [PATCH] moved grpc utility functions into grpcutils, updated StartSession to threadpool node and link creation --- daemon/core/api/grpc/grpcutils.py | 316 ++++++++++++++++++++++++++++-- daemon/core/api/grpc/server.py | 289 +++------------------------ daemon/examples/grpc/large.py | 52 ++--- 3 files changed, 335 insertions(+), 322 deletions(-) diff --git a/daemon/core/api/grpc/grpcutils.py b/daemon/core/api/grpc/grpcutils.py index b729e584..aec094d3 100644 --- a/daemon/core/api/grpc/grpcutils.py +++ b/daemon/core/api/grpc/grpcutils.py @@ -1,12 +1,24 @@ -import asyncio +import concurrent.futures import logging import time -from core.emulator.emudata import NodeOptions -from core.emulator.enumerations import NodeTypes +from core.api.grpc import core_pb2 +from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions +from core.emulator.enumerations import LinkTypes, NodeTypes +from core.nodes.base import CoreNetworkBase +from core.nodes.ipaddress import MacAddress + +WORKERS = 10 def add_node_data(node_proto): + """ + Convert node protobuf message to data for creating a node. + + :param core_pb2.Node node_proto: node proto message + :return: node type, id, and options + :rtype: tuple + """ _id = node_proto.id _type = node_proto.type if _type is None: @@ -27,29 +39,293 @@ def add_node_data(node_proto): return _type, _id, options -async def async_add_node(session, node_proto): - _type, _id, options = add_node_data(node_proto) - session.add_node(_type=_type, _id=_id, options=options) +def link_interface(interface_proto): + """ + Create interface data from interface proto. + + :param core_pb2.Interface interface_proto: interface proto + :return: interface data + :rtype: InterfaceData + """ + interface = None + if interface_proto: + name = interface_proto.name + if name == "": + name = None + mac = interface_proto.mac + if mac == "": + mac = None + else: + mac = MacAddress.from_string(mac) + interface = InterfaceData( + _id=interface_proto.id, + name=name, + mac=mac, + ip4=interface_proto.ip4, + ip4_mask=interface_proto.ip4mask, + ip6=interface_proto.ip6, + ip6_mask=interface_proto.ip6mask, + ) + return interface -async def create_nodes(loop, session, node_protos): - tasks = [] - for node_proto in node_protos: - task = loop.create_task(async_add_node(session, node_proto)) - tasks.append(task) +def add_link_data(link_proto): + """ + Convert link proto to link interfaces and options data. + :param core_pb2.Link link_proto: link proto + :return: link interfaces and options + :rtype: tuple + """ + interface_one = link_interface(link_proto.interface_one) + interface_two = link_interface(link_proto.interface_two) + + link_type = None + link_type_value = link_proto.type + if link_type_value is not None: + link_type = LinkTypes(link_type_value) + + options = LinkOptions(_type=link_type) + options_data = link_proto.options + if options_data: + options.delay = options_data.delay + options.bandwidth = options_data.bandwidth + options.per = options_data.per + options.dup = options_data.dup + options.jitter = options_data.jitter + options.mer = options_data.mer + options.burst = options_data.burst + options.mburst = options_data.mburst + options.unidirectional = options_data.unidirectional + options.key = options_data.key + options.opaque = options_data.opaque + + return interface_one, interface_two, options + + +def create_nodes(session, node_protos): + """ + Create nodes using a thread pool and wait for completion. + + :param core.emulator.session.Session session: session to create nodes in + :param list[core_pb2.Node] node_protos: node proto messages + :return: results and exceptions for created nodes + :rtype: tuple + """ start = time.monotonic() - results = await asyncio.gather(*tasks, return_exceptions=True) + with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor: + futures = [] + for node_proto in node_protos: + _type, _id, options = add_node_data(node_proto) + future = executor.submit(session.add_node, _type, _id, options) + futures.append(future) + results = [] + exceptions = [] + for future in concurrent.futures.as_completed(futures): + try: + result = future.result() + results.append(result) + except Exception as executor: + exceptions.append(executor) total = time.monotonic() - start + logging.info("created nodes time: %s", total) + return results, exceptions - logging.info(f"created nodes time: {total}") + +def create_links(session, link_protos): + """ + Create nodes using a thread pool and wait for completion. + + :param core.emulator.session.Session session: session to create nodes in + :param list[core_pb2.Link] link_protos: link proto messages + :return: results and exceptions for created links + :rtype: tuple + """ + start = time.monotonic() + with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor: + futures = [] + for link_proto in link_protos: + node_one_id = link_proto.node_one_id + node_two_id = link_proto.node_two_id + interface_one, interface_two, options = add_link_data(link_proto) + future = executor.submit( + session.add_link, + node_one_id, + node_two_id, + interface_one, + interface_two, + options, + ) + futures.append(future) + results = [] + exceptions = [] + for future in concurrent.futures.as_completed(futures): + try: + result = future.result() + results.append(result) + except Exception as executor: + exceptions.append(executor) + total = time.monotonic() - start + logging.info("created links time: %s", total) + return results, exceptions + + +def convert_value(value): + """ + Convert value into string. + + :param value: value + :return: string conversion of the value + :rtype: str + """ + if value is not None: + value = str(value) + return value + + +def get_config_options(config, configurable_options): + """ + Retrieve configuration options in a form that is used by the grpc server. + + :param dict config: configuration + :param core.config.ConfigurableOptions configurable_options: configurable options + :return: mapping of configuration ids to configuration options + :rtype: dict[str,core.api.grpc.core_pb2.ConfigOption] + """ + results = {} + for configuration in configurable_options.configurations(): + value = config[configuration.id] + config_option = core_pb2.ConfigOption( + label=configuration.label, + name=configuration.id, + value=value, + type=configuration.type.value, + select=configuration.options, + ) + results[configuration.id] = config_option + for config_group in configurable_options.config_groups(): + start = config_group.start - 1 + stop = config_group.stop + options = list(results.values())[start:stop] + for option in options: + option.group = config_group.name return results -def sync_create_nodes(session, node_protos): - start = time.monotonic() - for node_proto in node_protos: - _type, _id, options = add_node_data(node_proto) - session.add_node(_type=_type, _id=_id, options=options) - total = time.monotonic() - start - logging.info(f"created nodes time: {total}") +def get_links(session, node): + """ + Retrieve a list of links for grpc to use + + :param core.emulator.Session session: node's section + :param core.nodes.base.CoreNode node: node to get links from + :return: [core.api.grpc.core_pb2.Link] + """ + links = [] + for link_data in node.all_link_data(0): + link = convert_link(session, link_data) + links.append(link) + return links + + +def get_emane_model_id(node_id, interface_id): + """ + Get EMANE model id + + :param int node_id: node id + :param int interface_id: interface id + :return: EMANE model id + :rtype: int + """ + if interface_id >= 0: + return node_id * 1000 + interface_id + else: + return node_id + + +def convert_link(session, link_data): + """ + Convert link_data into core protobuf Link + + :param core.emulator.session.Session session: + :param core.emulator.data.LinkData link_data: + :return: core protobuf Link + :rtype: core.api.grpc.core_pb2.Link + """ + interface_one = None + if link_data.interface1_id is not None: + node = session.get_node(link_data.node1_id) + interface_name = None + if not isinstance(node, CoreNetworkBase): + interface = node.netif(link_data.interface1_id) + interface_name = interface.name + interface_one = core_pb2.Interface( + id=link_data.interface1_id, + name=interface_name, + mac=convert_value(link_data.interface1_mac), + ip4=convert_value(link_data.interface1_ip4), + ip4mask=link_data.interface1_ip4_mask, + ip6=convert_value(link_data.interface1_ip6), + ip6mask=link_data.interface1_ip6_mask, + ) + + interface_two = None + if link_data.interface2_id is not None: + node = session.get_node(link_data.node2_id) + interface_name = None + if not isinstance(node, CoreNetworkBase): + interface = node.netif(link_data.interface2_id) + interface_name = interface.name + interface_two = core_pb2.Interface( + id=link_data.interface2_id, + name=interface_name, + mac=convert_value(link_data.interface2_mac), + ip4=convert_value(link_data.interface2_ip4), + ip4mask=link_data.interface2_ip4_mask, + ip6=convert_value(link_data.interface2_ip6), + ip6mask=link_data.interface2_ip6_mask, + ) + + options = core_pb2.LinkOptions( + opaque=link_data.opaque, + jitter=link_data.jitter, + key=link_data.key, + mburst=link_data.mburst, + mer=link_data.mer, + per=link_data.per, + bandwidth=link_data.bandwidth, + burst=link_data.burst, + delay=link_data.delay, + dup=link_data.dup, + unidirectional=link_data.unidirectional, + ) + + return core_pb2.Link( + type=link_data.link_type, + node_one_id=link_data.node1_id, + node_two_id=link_data.node2_id, + interface_one=interface_one, + interface_two=interface_two, + options=options, + ) + + +def get_net_stats(): + """ + Retrieve status about the current interfaces in the system + + :return: send and receive status of the interfaces in the system + :rtype: dict + """ + with open("/proc/net/dev", "r") as f: + data = f.readlines()[2:] + + stats = {} + for line in data: + line = line.strip() + if not line: + continue + line = line.split() + line[0] = line[0].strip(":") + stats[line[0]] = {"rx": float(line[1]), "tx": float(line[9])} + + return stats diff --git a/daemon/core/api/grpc/server.py b/daemon/core/api/grpc/server.py index ff23b43e..c516ea45 100644 --- a/daemon/core/api/grpc/server.py +++ b/daemon/core/api/grpc/server.py @@ -10,6 +10,13 @@ from queue import Empty, Queue import grpc from core.api.grpc import core_pb2, core_pb2_grpc, grpcutils +from core.api.grpc.grpcutils import ( + convert_value, + get_config_options, + get_emane_model_id, + get_links, + get_net_stats, +) from core.emane.nodes import EmaneNet from core.emulator.data import ( ConfigData, @@ -19,13 +26,11 @@ from core.emulator.data import ( LinkData, NodeData, ) -from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions -from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags, NodeTypes +from core.emulator.emudata import LinkOptions, NodeOptions +from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags from core.errors import CoreCommandError, CoreError from core.location.mobility import BasicRangeModel, Ns2ScriptedMobility -from core.nodes.base import CoreNetworkBase from core.nodes.docker import DockerNode -from core.nodes.ipaddress import MacAddress from core.nodes.lxd import LxcNode from core.services.coreservices import ServiceManager @@ -33,167 +38,6 @@ _ONE_DAY_IN_SECONDS = 60 * 60 * 24 _INTERFACE_REGEX = re.compile(r"\d+") -def convert_value(value): - """ - Convert value into string. - - :param value: value - :return: string conversion of the value - :rtype: str - """ - if value is not None: - value = str(value) - return value - - -def get_config_options(config, configurable_options): - """ - Retrieve configuration options in a form that is used by the grpc server. - - :param dict config: configuration - :param core.config.ConfigurableOptions configurable_options: configurable options - :return: mapping of configuration ids to configuration options - :rtype: dict[str,core.api.grpc.core_pb2.ConfigOption] - """ - results = {} - for configuration in configurable_options.configurations(): - value = config[configuration.id] - config_option = core_pb2.ConfigOption( - label=configuration.label, - name=configuration.id, - value=value, - type=configuration.type.value, - select=configuration.options, - ) - results[configuration.id] = config_option - for config_group in configurable_options.config_groups(): - start = config_group.start - 1 - stop = config_group.stop - options = list(results.values())[start:stop] - for option in options: - option.group = config_group.name - return results - - -def get_links(session, node): - """ - Retrieve a list of links for grpc to use - - :param core.emulator.Session session: node's section - :param core.nodes.base.CoreNode node: node to get links from - :return: [core.api.grpc.core_pb2.Link] - """ - links = [] - for link_data in node.all_link_data(0): - link = convert_link(session, link_data) - links.append(link) - return links - - -def get_emane_model_id(node_id, interface_id): - """ - Get EMANE model id - - :param int node_id: node id - :param int interface_id: interface id - :return: EMANE model id - :rtype: int - """ - if interface_id >= 0: - return node_id * 1000 + interface_id - else: - return node_id - - -def convert_link(session, link_data): - """ - Convert link_data into core protobuf Link - - :param core.emulator.session.Session session: - :param core.emulator.data.LinkData link_data: - :return: core protobuf Link - :rtype: core.api.grpc.core_pb2.Link - """ - interface_one = None - if link_data.interface1_id is not None: - node = session.get_node(link_data.node1_id) - interface_name = None - if not isinstance(node, CoreNetworkBase): - interface = node.netif(link_data.interface1_id) - interface_name = interface.name - interface_one = core_pb2.Interface( - id=link_data.interface1_id, - name=interface_name, - mac=convert_value(link_data.interface1_mac), - ip4=convert_value(link_data.interface1_ip4), - ip4mask=link_data.interface1_ip4_mask, - ip6=convert_value(link_data.interface1_ip6), - ip6mask=link_data.interface1_ip6_mask, - ) - - interface_two = None - if link_data.interface2_id is not None: - node = session.get_node(link_data.node2_id) - interface_name = None - if not isinstance(node, CoreNetworkBase): - interface = node.netif(link_data.interface2_id) - interface_name = interface.name - interface_two = core_pb2.Interface( - id=link_data.interface2_id, - name=interface_name, - mac=convert_value(link_data.interface2_mac), - ip4=convert_value(link_data.interface2_ip4), - ip4mask=link_data.interface2_ip4_mask, - ip6=convert_value(link_data.interface2_ip6), - ip6mask=link_data.interface2_ip6_mask, - ) - - options = core_pb2.LinkOptions( - opaque=link_data.opaque, - jitter=link_data.jitter, - key=link_data.key, - mburst=link_data.mburst, - mer=link_data.mer, - per=link_data.per, - bandwidth=link_data.bandwidth, - burst=link_data.burst, - delay=link_data.delay, - dup=link_data.dup, - unidirectional=link_data.unidirectional, - ) - - return core_pb2.Link( - type=link_data.link_type, - node_one_id=link_data.node1_id, - node_two_id=link_data.node2_id, - interface_one=interface_one, - interface_two=interface_two, - options=options, - ) - - -def get_net_stats(): - """ - Retrieve status about the current interfaces in the system - - :return: send and receive status of the interfaces in the system - :rtype: dict - """ - with open("/proc/net/dev", "r") as f: - data = f.readlines()[2:] - - stats = {} - for line in data: - line = line.strip() - if not line: - continue - line = line.split() - line[0] = line[0].strip(":") - stats[line[0]] = {"rx": float(line[1]), "tx": float(line[9])} - - return stats - - class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): """ Create CoreGrpcServer instance @@ -279,12 +123,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): os.mkdir(session.session_dir) # create nodes - # loop = asyncio.new_event_loop() - # asyncio.set_event_loop(loop) - # results = loop.run_until_complete( - # grpcutils.create_nodes(loop, session, request.nodes) - # ) - grpcutils.sync_create_nodes(session, request.nodes) + grpcutils.create_nodes(session, request.nodes) + + # create links + logging.info("links: %s", request.links) + grpcutils.create_links(session, request.links) # set to instantiation and start session.set_state(EventTypes.INSTANTIATION_STATE) @@ -302,7 +145,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): :rtype: core.api.grpc.core_pb2.StopSessionResponse """ logging.debug("stop session: %s", request) - session = self.coreemu.create_session(request.session_id) + session = self.get_session(request.session_id, context) session.set_state(EventTypes.DATACOLLECT_STATE) session.clear() return core_pb2.StopSessionResponse(result=True) @@ -808,32 +651,12 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): """ logging.debug("add node: %s", request) session = self.get_session(request.session_id, context) - - node_proto = request.node - node_id = node_proto.id - node_type = node_proto.type - if node_type is None: - node_type = NodeTypes.DEFAULT.value - node_type = NodeTypes(node_type) - - options = NodeOptions(name=node_proto.name, model=node_proto.model) - options.icon = node_proto.icon - options.opaque = node_proto.opaque - options.image = node_proto.image - options.services = node_proto.services - if node_proto.server: - options.server = node_proto.server - - position = node_proto.position - options.set_position(position.x, position.y) - options.set_location(position.lat, position.lon, position.alt) - node = session.add_node(_type=node_type, _id=node_id, options=options) - + _type, _id, options = grpcutils.add_node_data(request.node) + node = session.add_node(_type=_type, _id=_id, options=options) # configure emane if provided - emane_model = node_proto.emane + emane_model = request.node.emane if emane_model: - session.emane.set_model_config(node_id, emane_model) - + session.emane.set_model_config(id, emane_model) return core_pb2.AddNodeResponse(node_id=node.id) def GetNode(self, request, context): @@ -991,82 +814,16 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): :rtype: core.api.grpc.AddLinkResponse """ logging.debug("add link: %s", request) + # validate session and nodes session = self.get_session(request.session_id, context) - - # validate node exist self.get_node(session, request.link.node_one_id, context) self.get_node(session, request.link.node_two_id, context) + node_one_id = request.link.node_one_id node_two_id = request.link.node_two_id - - interface_one = None - interface_one_data = request.link.interface_one - if interface_one_data: - name = interface_one_data.name - if name == "": - name = None - mac = interface_one_data.mac - if mac == "": - mac = None - else: - mac = MacAddress.from_string(mac) - interface_one = InterfaceData( - _id=interface_one_data.id, - name=name, - mac=mac, - ip4=interface_one_data.ip4, - ip4_mask=interface_one_data.ip4mask, - ip6=interface_one_data.ip6, - ip6_mask=interface_one_data.ip6mask, - ) - - interface_two = None - interface_two_data = request.link.interface_two - if interface_two_data: - name = interface_two_data.name - if name == "": - name = None - mac = interface_two_data.mac - if mac == "": - mac = None - else: - mac = MacAddress.from_string(mac) - interface_two = InterfaceData( - _id=interface_two_data.id, - name=name, - mac=mac, - ip4=interface_two_data.ip4, - ip4_mask=interface_two_data.ip4mask, - ip6=interface_two_data.ip6, - ip6_mask=interface_two_data.ip6mask, - ) - - link_type = None - link_type_value = request.link.type - if link_type_value is not None: - link_type = LinkTypes(link_type_value) - - options_data = request.link.options - link_options = LinkOptions(_type=link_type) - if options_data: - link_options.delay = options_data.delay - link_options.bandwidth = options_data.bandwidth - link_options.per = options_data.per - link_options.dup = options_data.dup - link_options.jitter = options_data.jitter - link_options.mer = options_data.mer - link_options.burst = options_data.burst - link_options.mburst = options_data.mburst - link_options.unidirectional = options_data.unidirectional - link_options.key = options_data.key - link_options.opaque = options_data.opaque - + interface_one, interface_two, options = grpcutils.add_link_data(request.link) session.add_link( - node_one_id, - node_two_id, - interface_one, - interface_two, - link_options=link_options, + node_one_id, node_two_id, interface_one, interface_two, link_options=options ) return core_pb2.AddLinkResponse(result=True) diff --git a/daemon/examples/grpc/large.py b/daemon/examples/grpc/large.py index c2a4d0b6..ef1e6cc4 100644 --- a/daemon/examples/grpc/large.py +++ b/daemon/examples/grpc/large.py @@ -26,47 +26,27 @@ def main(): node = core_pb2.Node(id=i, position=position, model="PC") nodes.append(node) - # start session + # create links + interface_helper = client.InterfaceHelper(ip4_prefix="10.83.0.0/16") links = [] + for node in nodes: + interface_one = interface_helper.create_interface(node.id, 0) + link = core_pb2.Link( + type=core_pb2.LinkType.WIRED, + node_one_id=node.id, + node_two_id=switch.id, + interface_one=interface_one, + ) + links.append(link) + + # start session response = core.start_session(session_id, nodes, links) logging.info("started session: %s", response) - # handle events session may broadcast - # core.events(session_id, log_event) + input("press enter to shutdown session") - # change session state - # response = core.set_session_state( - # session_id, core_pb2.SessionState.CONFIGURATION - # ) - # logging.info("set session state: %s", response) - - # create switch node - # switch = core_pb2.Node(type=core_pb2.NodeType.SWITCH) - # response = core.add_node(session_id, switch) - # logging.info("created switch: %s", response) - # switch_id = response.node_id - - # helper to create interfaces - # interface_helper = client.InterfaceHelper(ip4_prefix="10.83.0.0/16") - # - # for i in range(2): - # # create node - # position = core_pb2.Position(x=50 + 50 * i, y=50) - # node = core_pb2.Node(position=position) - # response = core.add_node(session_id, node) - # logging.info("created node: %s", response) - # node_id = response.node_id - # - # # create link - # interface_one = interface_helper.create_interface(node_id, 0) - # response = core.add_link(session_id, node_id, switch_id, interface_one) - # logging.info("created link: %s", response) - - # change session state - # response = core.set_session_state( - # session_id, core_pb2.SessionState.INSTANTIATION - # ) - # logging.info("set session state: %s", response) + response = core.stop_session(session_id) + logging.info("stop sessionL %s", response) if __name__ == "__main__":