updates to grpc StartSession, added utility threadpool function to help improve speed when running certain tasks, made use of utility threadpool function where needed

This commit is contained in:
bharnden 2019-10-29 10:25:39 -07:00
parent 236ac7919a
commit 4e03dc6888
5 changed files with 83 additions and 77 deletions

View file

@ -1,7 +1,7 @@
import concurrent.futures
import logging import logging
import time import time
from core import utils
from core.api.grpc import core_pb2 from core.api.grpc import core_pb2
from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
from core.emulator.enumerations import LinkTypes, NodeTypes from core.emulator.enumerations import LinkTypes, NodeTypes
@ -112,23 +112,15 @@ def create_nodes(session, node_protos):
:return: results and exceptions for created nodes :return: results and exceptions for created nodes
:rtype: tuple :rtype: tuple
""" """
funcs = []
for node_proto in node_protos:
_type, _id, options = add_node_data(node_proto)
args = (_type, _id, options)
funcs.append((session.add_node, args, {}))
start = time.monotonic() start = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor: results, exceptions = utils.threadpool(funcs)
futures = []
for node_proto in node_protos:
_type, _id, options = add_node_data(node_proto)
future = executor.submit(session.add_node, _type, _id, options)
futures.append(future)
results = []
exceptions = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as executor:
exceptions.append(executor)
total = time.monotonic() - start total = time.monotonic() - start
logging.info("created nodes time: %s", total) logging.debug("grpc created nodes time: %s", total)
return results, exceptions return results, exceptions
@ -141,32 +133,17 @@ def create_links(session, link_protos):
:return: results and exceptions for created links :return: results and exceptions for created links
:rtype: tuple :rtype: tuple
""" """
funcs = []
for link_proto in link_protos:
node_one_id = link_proto.node_one_id
node_two_id = link_proto.node_two_id
interface_one, interface_two, options = add_link_data(link_proto)
args = (node_one_id, node_two_id, interface_one, interface_two, options)
funcs.append((session.add_link, args, {}))
start = time.monotonic() start = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor: results, exceptions = utils.threadpool(funcs)
futures = []
for link_proto in link_protos:
node_one_id = link_proto.node_one_id
node_two_id = link_proto.node_two_id
interface_one, interface_two, options = add_link_data(link_proto)
future = executor.submit(
session.add_link,
node_one_id,
node_two_id,
interface_one,
interface_two,
options,
)
futures.append(future)
results = []
exceptions = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as executor:
exceptions.append(executor)
total = time.monotonic() - start total = time.monotonic() - start
logging.info("created links time: %s", total) logging.debug("grpc created links time: %s", total)
return results, exceptions return results, exceptions

View file

@ -126,7 +126,6 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
grpcutils.create_nodes(session, request.nodes) grpcutils.create_nodes(session, request.nodes)
# create links # create links
logging.info("links: %s", request.links)
grpcutils.create_links(session, request.links) grpcutils.create_links(session, request.links)
# set to instantiation and start # set to instantiation and start
@ -146,8 +145,10 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
""" """
logging.debug("stop session: %s", request) logging.debug("stop session: %s", request)
session = self.get_session(request.session_id, context) session = self.get_session(request.session_id, context)
session.data_collect()
session.set_state(EventTypes.DATACOLLECT_STATE) session.set_state(EventTypes.DATACOLLECT_STATE)
session.clear() session.clear()
session.set_state(EventTypes.SHUTDOWN_STATE)
return core_pb2.StopSessionResponse(result=True) return core_pb2.StopSessionResponse(result=True)
def CreateSession(self, request, context): def CreateSession(self, request, context):

View file

@ -12,7 +12,6 @@ import subprocess
import tempfile import tempfile
import threading import threading
import time import time
from multiprocessing.pool import ThreadPool
from core import constants, utils from core import constants, utils
from core.emane.emanemanager import EmaneManager from core.emane.emanemanager import EmaneManager
@ -1366,9 +1365,11 @@ class Session:
Clear the nodes dictionary, and call shutdown for each node. Clear the nodes dictionary, and call shutdown for each node.
""" """
with self._nodes_lock: with self._nodes_lock:
funcs = []
while self.nodes: while self.nodes:
_, node = self.nodes.popitem() _, node = self.nodes.popitem()
node.shutdown() funcs.append((node.shutdown, [], {}))
utils.threadpool(funcs)
self.node_id_gen.id = 0 self.node_id_gen.id = 0
def write_nodes(self): def write_nodes(self):
@ -1508,11 +1509,14 @@ class Session:
# stop node services # stop node services
with self._nodes_lock: with self._nodes_lock:
funcs = []
for node_id in self.nodes: for node_id in self.nodes:
node = self.nodes[node_id] node = self.nodes[node_id]
# TODO: determine if checking for CoreNode alone is ok
if isinstance(node, CoreNodeBase): if isinstance(node, CoreNodeBase):
self.services.stop_services(node) self.services.stop_services(node)
args = (node,)
funcs.append((self.services.stop_services, args, {}))
utils.threadpool(funcs)
# shutdown emane # shutdown emane
self.emane.shutdown() self.emane.shutdown()
@ -1520,7 +1524,8 @@ class Session:
# update control interface hosts # update control interface hosts
self.update_control_interface_hosts(remove=True) self.update_control_interface_hosts(remove=True)
# remove all four possible control networks. Does nothing if ctrlnet is not installed. # remove all four possible control networks. Does nothing if ctrlnet is not
# installed.
self.add_remove_control_interface(node=None, net_index=0, remove=True) self.add_remove_control_interface(node=None, net_index=0, remove=True)
self.add_remove_control_interface(node=None, net_index=1, remove=True) self.add_remove_control_interface(node=None, net_index=1, remove=True)
self.add_remove_control_interface(node=None, net_index=2, remove=True) self.add_remove_control_interface(node=None, net_index=2, remove=True)
@ -1551,6 +1556,18 @@ class Session:
ssid = (self.id >> 8) ^ (self.id & ((1 << 8) - 1)) ssid = (self.id >> 8) ^ (self.id & ((1 << 8) - 1))
return f"{ssid:x}" return f"{ssid:x}"
def boot_node(self, node):
"""
Boot node by adding a control interface when necessary and starting
node services.
:param core.nodes.base.CoreNodeBase node: node to boot
:return: nothing
"""
logging.info("booting node(%s): %s", node.name, [x.name for x in node.services])
self.add_remove_control_interface(node=node, remove=False)
self.services.boot_services(node)
def boot_nodes(self): def boot_nodes(self):
""" """
Invoke the boot() procedure for all nodes and send back node Invoke the boot() procedure for all nodes and send back node
@ -1558,29 +1575,18 @@ class Session:
request flag. request flag.
""" """
with self._nodes_lock: with self._nodes_lock:
pool = ThreadPool() funcs = []
results = [] start = time.monotonic()
start = time.time()
for _id in self.nodes: for _id in self.nodes:
node = self.nodes[_id] node = self.nodes[_id]
if isinstance(node, CoreNodeBase) and not isinstance(node, Rj45Node): if isinstance(node, CoreNodeBase) and not isinstance(node, Rj45Node):
# add a control interface if configured args = (node,)
logging.info( funcs.append((self.boot_node, args, {}))
"booting node(%s): %s", results, exceptions = utils.threadpool(funcs)
node.name, total = time.monotonic() - start
[x.name for x in node.services], logging.debug("boot run time: %s", total)
) if exceptions:
self.add_remove_control_interface(node=node, remove=False) raise CoreError(exceptions)
result = pool.apply_async(self.services.boot_services, (node,))
results.append(result)
pool.close()
pool.join()
for result in results:
result.get()
logging.debug("boot run time: %s", time.time() - start)
self.update_control_interface_hosts() self.update_control_interface_hosts()
def get_control_net_prefixes(self): def get_control_net_prefixes(self):
@ -1730,7 +1736,7 @@ class Session:
If conf_reqd is False, the control network may be built even If conf_reqd is False, the control network may be built even
when the user has not configured one (e.g. for EMANE.) when the user has not configured one (e.g. for EMANE.)
:param core.nodes.base.CoreNode node: node to add or remove control interface :param core.nodes.base.CoreNodeBase node: node to add or remove control interface
:param int net_index: network index :param int net_index: network index
:param bool remove: flag to check if it should be removed :param bool remove: flag to check if it should be removed
:param bool conf_required: flag to check if conf is required :param bool conf_required: flag to check if conf is required

View file

@ -10,7 +10,6 @@ services.
import enum import enum
import logging import logging
import time import time
from multiprocessing.pool import ThreadPool
from core import utils from core import utils
from core.constants import which from core.constants import which
@ -462,18 +461,14 @@ class CoreServices:
:param core.netns.vnode.LxcNode node: node to start services on :param core.netns.vnode.LxcNode node: node to start services on
:return: nothing :return: nothing
""" """
pool = ThreadPool() funcs = []
results = []
boot_paths = ServiceDependencies(node.services).boot_paths() boot_paths = ServiceDependencies(node.services).boot_paths()
for boot_path in boot_paths: for boot_path in boot_paths:
result = pool.apply_async(self._start_boot_paths, (node, boot_path)) args = (node, boot_path)
results.append(result) funcs.append((self._start_boot_paths, args, {}))
result, exceptions = utils.threadpool(funcs)
pool.close() if exceptions:
pool.join() raise ServiceBootError(exceptions)
for result in results:
result.get()
def _start_boot_paths(self, node, boot_path): def _start_boot_paths(self, node, boot_path):
""" """

View file

@ -2,6 +2,7 @@
Miscellaneous utility functions, wrappers around some subprocess procedures. Miscellaneous utility functions, wrappers around some subprocess procedures.
""" """
import concurrent.futures
import fcntl import fcntl
import hashlib import hashlib
import importlib import importlib
@ -381,3 +382,29 @@ def load_logging_config(config_path):
with open(config_path, "r") as log_config_file: with open(config_path, "r") as log_config_file:
log_config = json.load(log_config_file) log_config = json.load(log_config_file)
logging.config.dictConfig(log_config) logging.config.dictConfig(log_config)
def threadpool(funcs, workers=10):
"""
Run provided functions, arguments, and keywords within a threadpool
collecting results and exceptions.
:param iter funcs: iterable that provides a func, args, kwargs
:param int workers: number of workers for the threadpool
:return: results and exceptions from running functions with args and kwargs
:rtype: tuple
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = []
for func, args, kwargs in funcs:
future = executor.submit(func, *args, **kwargs)
futures.append(future)
results = []
exceptions = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
exceptions.append(e)
return results, exceptions