diff --git a/daemon/core/api/grpc/grpcutils.py b/daemon/core/api/grpc/grpcutils.py
index aec094d3..ea166328 100644
--- a/daemon/core/api/grpc/grpcutils.py
+++ b/daemon/core/api/grpc/grpcutils.py
@@ -1,7 +1,7 @@
-import concurrent.futures
 import logging
 import time
 
+from core import utils
 from core.api.grpc import core_pb2
 from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
 from core.emulator.enumerations import LinkTypes, NodeTypes
@@ -112,23 +112,15 @@ def create_nodes(session, node_protos):
     :return: results and exceptions for created nodes
     :rtype: tuple
     """
+    funcs = []
+    for node_proto in node_protos:
+        _type, _id, options = add_node_data(node_proto)
+        args = (_type, _id, options)
+        funcs.append((session.add_node, args, {}))
     start = time.monotonic()
-    with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
-        futures = []
-        for node_proto in node_protos:
-            _type, _id, options = add_node_data(node_proto)
-            future = executor.submit(session.add_node, _type, _id, options)
-            futures.append(future)
-        results = []
-        exceptions = []
-        for future in concurrent.futures.as_completed(futures):
-            try:
-                result = future.result()
-                results.append(result)
-            except Exception as executor:
-                exceptions.append(executor)
+    results, exceptions = utils.threadpool(funcs)
     total = time.monotonic() - start
-    logging.info("created nodes time: %s", total)
+    logging.debug("grpc created nodes time: %s", total)
     return results, exceptions
 
 
@@ -141,32 +133,17 @@ def create_links(session, link_protos):
     :return: results and exceptions for created links
     :rtype: tuple
     """
+    funcs = []
+    for link_proto in link_protos:
+        node_one_id = link_proto.node_one_id
+        node_two_id = link_proto.node_two_id
+        interface_one, interface_two, options = add_link_data(link_proto)
+        args = (node_one_id, node_two_id, interface_one, interface_two, options)
+        funcs.append((session.add_link, args, {}))
     start = time.monotonic()
-    with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
-        futures = []
-        for link_proto in link_protos:
-            node_one_id = link_proto.node_one_id
-            node_two_id = link_proto.node_two_id
-            interface_one, interface_two, options = add_link_data(link_proto)
-            future = executor.submit(
-                session.add_link,
-                node_one_id,
-                node_two_id,
-                interface_one,
-                interface_two,
-                options,
-            )
-            futures.append(future)
-        results = []
-        exceptions = []
-        for future in concurrent.futures.as_completed(futures):
-            try:
-                result = future.result()
-                results.append(result)
-            except Exception as executor:
-                exceptions.append(executor)
+    results, exceptions = utils.threadpool(funcs)
     total = time.monotonic() - start
-    logging.info("created links time: %s", total)
+    logging.debug("grpc created links time: %s", total)
     return results, exceptions
 
 
diff --git a/daemon/core/api/grpc/server.py b/daemon/core/api/grpc/server.py
index c516ea45..d58d9b9d 100644
--- a/daemon/core/api/grpc/server.py
+++ b/daemon/core/api/grpc/server.py
@@ -126,7 +126,6 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
         grpcutils.create_nodes(session, request.nodes)
 
         # create links
-        logging.info("links: %s", request.links)
         grpcutils.create_links(session, request.links)
 
         # set to instantiation and start
@@ -146,8 +145,10 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
         """
         logging.debug("stop session: %s", request)
         session = self.get_session(request.session_id, context)
+        session.data_collect()
         session.set_state(EventTypes.DATACOLLECT_STATE)
         session.clear()
+        session.set_state(EventTypes.SHUTDOWN_STATE)
         return core_pb2.StopSessionResponse(result=True)
 
     def CreateSession(self, request, context):
diff --git a/daemon/core/emulator/session.py b/daemon/core/emulator/session.py
index fcebcd9e..768ea524 100644
--- a/daemon/core/emulator/session.py
+++ b/daemon/core/emulator/session.py
@@ -12,7 +12,6 @@ import subprocess
 import tempfile
 import threading
 import time
-from multiprocessing.pool import ThreadPool
 
 from core import constants, utils
 from core.emane.emanemanager import EmaneManager
@@ -1366,9 +1365,11 @@ class Session:
         Clear the nodes dictionary, and call shutdown for each node.
         """
         with self._nodes_lock:
+            funcs = []
             while self.nodes:
                 _, node = self.nodes.popitem()
-                node.shutdown()
+                funcs.append((node.shutdown, [], {}))
+            utils.threadpool(funcs)
         self.node_id_gen.id = 0
 
     def write_nodes(self):
@@ -1508,11 +1509,13 @@ class Session:
 
         # stop node services
         with self._nodes_lock:
+            funcs = []
             for node_id in self.nodes:
                 node = self.nodes[node_id]
-                # TODO: determine if checking for CoreNode alone is ok
                 if isinstance(node, CoreNodeBase):
-                    self.services.stop_services(node)
+                    args = (node,)
+                    funcs.append((self.services.stop_services, args, {}))
+            utils.threadpool(funcs)
 
         # shutdown emane
         self.emane.shutdown()
@@ -1520,7 +1523,8 @@ class Session:
         # update control interface hosts
         self.update_control_interface_hosts(remove=True)
 
-        # remove all four possible control networks. Does nothing if ctrlnet is not installed.
+        # remove all four possible control networks. Does nothing if ctrlnet is not
+        # installed.
         self.add_remove_control_interface(node=None, net_index=0, remove=True)
         self.add_remove_control_interface(node=None, net_index=1, remove=True)
         self.add_remove_control_interface(node=None, net_index=2, remove=True)
@@ -1551,6 +1555,18 @@ class Session:
         ssid = (self.id >> 8) ^ (self.id & ((1 << 8) - 1))
         return f"{ssid:x}"
 
+    def boot_node(self, node):
+        """
+        Boot node by adding a control interface when necessary and starting
+        node services.
+
+        :param core.nodes.base.CoreNodeBase node: node to boot
+        :return: nothing
+        """
+        logging.info("booting node(%s): %s", node.name, [x.name for x in node.services])
+        self.add_remove_control_interface(node=node, remove=False)
+        self.services.boot_services(node)
+
     def boot_nodes(self):
         """
         Invoke the boot() procedure for all nodes and send back node
@@ -1558,29 +1574,18 @@ class Session:
         request flag.
         """
         with self._nodes_lock:
-            pool = ThreadPool()
-            results = []
-
-            start = time.time()
+            funcs = []
+            start = time.monotonic()
             for _id in self.nodes:
                 node = self.nodes[_id]
                 if isinstance(node, CoreNodeBase) and not isinstance(node, Rj45Node):
-                    # add a control interface if configured
-                    logging.info(
-                        "booting node(%s): %s",
-                        node.name,
-                        [x.name for x in node.services],
-                    )
-                    self.add_remove_control_interface(node=node, remove=False)
-                    result = pool.apply_async(self.services.boot_services, (node,))
-                    results.append(result)
-
-            pool.close()
-            pool.join()
-            for result in results:
-                result.get()
-            logging.debug("boot run time: %s", time.time() - start)
-
+                    args = (node,)
+                    funcs.append((self.boot_node, args, {}))
+            _, exceptions = utils.threadpool(funcs)
+            total = time.monotonic() - start
+            logging.debug("boot run time: %s", total)
+            if exceptions:
+                raise CoreError(exceptions)
         self.update_control_interface_hosts()
 
     def get_control_net_prefixes(self):
@@ -1730,7 +1735,7 @@ class Session:
         If conf_reqd is False, the control network may be built even
         when the user has not configured one (e.g. for EMANE.)
 
-        :param core.nodes.base.CoreNode node: node to add or remove control interface
+        :param core.nodes.base.CoreNodeBase node: node to add or remove control interface
         :param int net_index: network index
         :param bool remove: flag to check if it should be removed
         :param bool conf_required: flag to check if conf is required
diff --git a/daemon/core/services/coreservices.py b/daemon/core/services/coreservices.py
index 2a4cb178..80168425 100644
--- a/daemon/core/services/coreservices.py
+++ b/daemon/core/services/coreservices.py
@@ -10,7 +10,6 @@ services.
 import enum
 import logging
 import time
-from multiprocessing.pool import ThreadPool
 
 from core import utils
 from core.constants import which
@@ -462,18 +461,14 @@ class CoreServices:
         :param core.netns.vnode.LxcNode node: node to start services on
         :return: nothing
         """
-        pool = ThreadPool()
-        results = []
-
+        funcs = []
         boot_paths = ServiceDependencies(node.services).boot_paths()
         for boot_path in boot_paths:
-            result = pool.apply_async(self._start_boot_paths, (node, boot_path))
-            results.append(result)
-
-        pool.close()
-        pool.join()
-        for result in results:
-            result.get()
+            args = (node, boot_path)
+            funcs.append((self._start_boot_paths, args, {}))
+        _, exceptions = utils.threadpool(funcs)
+        if exceptions:
+            raise ServiceBootError(exceptions)
 
     def _start_boot_paths(self, node, boot_path):
         """
diff --git a/daemon/core/utils.py b/daemon/core/utils.py
index e16efd1f..413df156 100644
--- a/daemon/core/utils.py
+++ b/daemon/core/utils.py
@@ -2,6 +2,7 @@
 Miscellaneous utility functions, wrappers around some subprocess procedures.
 """
 
+import concurrent.futures
 import fcntl
 import hashlib
 import importlib
@@ -381,3 +382,35 @@ def load_logging_config(config_path):
     with open(config_path, "r") as log_config_file:
         log_config = json.load(log_config_file)
         logging.config.dictConfig(log_config)
+
+
+def threadpool(funcs, workers=10):
+    """
+    Run provided functions, arguments, and keywords within a threadpool
+    collecting results and exceptions.
+
+    Results are gathered in completion order, not submission order. Exceptions
+    raised by the functions are logged and returned instead of being raised.
+
+    :param iter funcs: iterable that provides a func, args, kwargs
+    :param int workers: number of workers for the threadpool
+    :return: results and exceptions from running functions with args and kwargs
+    :rtype: tuple
+    """
+    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
+        futures = []
+        for func, args, kwargs in funcs:
+            future = executor.submit(func, *args, **kwargs)
+            futures.append(future)
+        results = []
+        exceptions = []
+        for future in concurrent.futures.as_completed(futures):
+            try:
+                result = future.result()
+                results.append(result)
+            except Exception as e:
+                # log here so callers that ignore the return value do not
+                # silently lose failures
+                logging.exception("thread pool exception")
+                exceptions.append(e)
+    return results, exceptions