moved grpc utility functions into grpcutils, updated StartSession to threadpool node and link creation

This commit is contained in:
bharnden 2019-10-28 23:11:15 -07:00
parent 934ea96558
commit 236ac7919a
3 changed files with 335 additions and 322 deletions

View file

@ -1,12 +1,24 @@
import asyncio import concurrent.futures
import logging import logging
import time import time
from core.emulator.emudata import NodeOptions from core.api.grpc import core_pb2
from core.emulator.enumerations import NodeTypes from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
from core.emulator.enumerations import LinkTypes, NodeTypes
from core.nodes.base import CoreNetworkBase
from core.nodes.ipaddress import MacAddress
WORKERS = 10
def add_node_data(node_proto): def add_node_data(node_proto):
"""
Convert node protobuf message to data for creating a node.
:param core_pb2.Node node_proto: node proto message
:return: node type, id, and options
:rtype: tuple
"""
_id = node_proto.id _id = node_proto.id
_type = node_proto.type _type = node_proto.type
if _type is None: if _type is None:
@ -27,29 +39,293 @@ def add_node_data(node_proto):
return _type, _id, options return _type, _id, options
async def async_add_node(session, node_proto): def link_interface(interface_proto):
_type, _id, options = add_node_data(node_proto) """
session.add_node(_type=_type, _id=_id, options=options) Create interface data from interface proto.
:param core_pb2.Interface interface_proto: interface proto
:return: interface data
:rtype: InterfaceData
"""
interface = None
if interface_proto:
name = interface_proto.name
if name == "":
name = None
mac = interface_proto.mac
if mac == "":
mac = None
else:
mac = MacAddress.from_string(mac)
interface = InterfaceData(
_id=interface_proto.id,
name=name,
mac=mac,
ip4=interface_proto.ip4,
ip4_mask=interface_proto.ip4mask,
ip6=interface_proto.ip6,
ip6_mask=interface_proto.ip6mask,
)
return interface
async def create_nodes(loop, session, node_protos): def add_link_data(link_proto):
tasks = [] """
for node_proto in node_protos: Convert link proto to link interfaces and options data.
task = loop.create_task(async_add_node(session, node_proto))
tasks.append(task)
:param core_pb2.Link link_proto: link proto
:return: link interfaces and options
:rtype: tuple
"""
interface_one = link_interface(link_proto.interface_one)
interface_two = link_interface(link_proto.interface_two)
link_type = None
link_type_value = link_proto.type
if link_type_value is not None:
link_type = LinkTypes(link_type_value)
options = LinkOptions(_type=link_type)
options_data = link_proto.options
if options_data:
options.delay = options_data.delay
options.bandwidth = options_data.bandwidth
options.per = options_data.per
options.dup = options_data.dup
options.jitter = options_data.jitter
options.mer = options_data.mer
options.burst = options_data.burst
options.mburst = options_data.mburst
options.unidirectional = options_data.unidirectional
options.key = options_data.key
options.opaque = options_data.opaque
return interface_one, interface_two, options
def create_nodes(session, node_protos):
"""
Create nodes using a thread pool and wait for completion.
:param core.emulator.session.Session session: session to create nodes in
:param list[core_pb2.Node] node_protos: node proto messages
:return: results and exceptions for created nodes
:rtype: tuple
"""
start = time.monotonic() start = time.monotonic()
results = await asyncio.gather(*tasks, return_exceptions=True) with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
futures = []
for node_proto in node_protos:
_type, _id, options = add_node_data(node_proto)
future = executor.submit(session.add_node, _type, _id, options)
futures.append(future)
results = []
exceptions = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as executor:
exceptions.append(executor)
total = time.monotonic() - start total = time.monotonic() - start
logging.info("created nodes time: %s", total)
return results, exceptions
logging.info(f"created nodes time: {total}")
def create_links(session, link_protos):
"""
Create nodes using a thread pool and wait for completion.
:param core.emulator.session.Session session: session to create nodes in
:param list[core_pb2.Link] link_protos: link proto messages
:return: results and exceptions for created links
:rtype: tuple
"""
start = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
futures = []
for link_proto in link_protos:
node_one_id = link_proto.node_one_id
node_two_id = link_proto.node_two_id
interface_one, interface_two, options = add_link_data(link_proto)
future = executor.submit(
session.add_link,
node_one_id,
node_two_id,
interface_one,
interface_two,
options,
)
futures.append(future)
results = []
exceptions = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as executor:
exceptions.append(executor)
total = time.monotonic() - start
logging.info("created links time: %s", total)
return results, exceptions
def convert_value(value):
"""
Convert value into string.
:param value: value
:return: string conversion of the value
:rtype: str
"""
if value is not None:
value = str(value)
return value
def get_config_options(config, configurable_options):
"""
Retrieve configuration options in a form that is used by the grpc server.
:param dict config: configuration
:param core.config.ConfigurableOptions configurable_options: configurable options
:return: mapping of configuration ids to configuration options
:rtype: dict[str,core.api.grpc.core_pb2.ConfigOption]
"""
results = {}
for configuration in configurable_options.configurations():
value = config[configuration.id]
config_option = core_pb2.ConfigOption(
label=configuration.label,
name=configuration.id,
value=value,
type=configuration.type.value,
select=configuration.options,
)
results[configuration.id] = config_option
for config_group in configurable_options.config_groups():
start = config_group.start - 1
stop = config_group.stop
options = list(results.values())[start:stop]
for option in options:
option.group = config_group.name
return results return results
def sync_create_nodes(session, node_protos): def get_links(session, node):
start = time.monotonic() """
for node_proto in node_protos: Retrieve a list of links for grpc to use
_type, _id, options = add_node_data(node_proto)
session.add_node(_type=_type, _id=_id, options=options) :param core.emulator.Session session: node's section
total = time.monotonic() - start :param core.nodes.base.CoreNode node: node to get links from
logging.info(f"created nodes time: {total}") :return: [core.api.grpc.core_pb2.Link]
"""
links = []
for link_data in node.all_link_data(0):
link = convert_link(session, link_data)
links.append(link)
return links
def get_emane_model_id(node_id, interface_id):
"""
Get EMANE model id
:param int node_id: node id
:param int interface_id: interface id
:return: EMANE model id
:rtype: int
"""
if interface_id >= 0:
return node_id * 1000 + interface_id
else:
return node_id
def convert_link(session, link_data):
"""
Convert link_data into core protobuf Link
:param core.emulator.session.Session session:
:param core.emulator.data.LinkData link_data:
:return: core protobuf Link
:rtype: core.api.grpc.core_pb2.Link
"""
interface_one = None
if link_data.interface1_id is not None:
node = session.get_node(link_data.node1_id)
interface_name = None
if not isinstance(node, CoreNetworkBase):
interface = node.netif(link_data.interface1_id)
interface_name = interface.name
interface_one = core_pb2.Interface(
id=link_data.interface1_id,
name=interface_name,
mac=convert_value(link_data.interface1_mac),
ip4=convert_value(link_data.interface1_ip4),
ip4mask=link_data.interface1_ip4_mask,
ip6=convert_value(link_data.interface1_ip6),
ip6mask=link_data.interface1_ip6_mask,
)
interface_two = None
if link_data.interface2_id is not None:
node = session.get_node(link_data.node2_id)
interface_name = None
if not isinstance(node, CoreNetworkBase):
interface = node.netif(link_data.interface2_id)
interface_name = interface.name
interface_two = core_pb2.Interface(
id=link_data.interface2_id,
name=interface_name,
mac=convert_value(link_data.interface2_mac),
ip4=convert_value(link_data.interface2_ip4),
ip4mask=link_data.interface2_ip4_mask,
ip6=convert_value(link_data.interface2_ip6),
ip6mask=link_data.interface2_ip6_mask,
)
options = core_pb2.LinkOptions(
opaque=link_data.opaque,
jitter=link_data.jitter,
key=link_data.key,
mburst=link_data.mburst,
mer=link_data.mer,
per=link_data.per,
bandwidth=link_data.bandwidth,
burst=link_data.burst,
delay=link_data.delay,
dup=link_data.dup,
unidirectional=link_data.unidirectional,
)
return core_pb2.Link(
type=link_data.link_type,
node_one_id=link_data.node1_id,
node_two_id=link_data.node2_id,
interface_one=interface_one,
interface_two=interface_two,
options=options,
)
def get_net_stats():
"""
Retrieve status about the current interfaces in the system
:return: send and receive status of the interfaces in the system
:rtype: dict
"""
with open("/proc/net/dev", "r") as f:
data = f.readlines()[2:]
stats = {}
for line in data:
line = line.strip()
if not line:
continue
line = line.split()
line[0] = line[0].strip(":")
stats[line[0]] = {"rx": float(line[1]), "tx": float(line[9])}
return stats

View file

@ -10,6 +10,13 @@ from queue import Empty, Queue
import grpc import grpc
from core.api.grpc import core_pb2, core_pb2_grpc, grpcutils from core.api.grpc import core_pb2, core_pb2_grpc, grpcutils
from core.api.grpc.grpcutils import (
convert_value,
get_config_options,
get_emane_model_id,
get_links,
get_net_stats,
)
from core.emane.nodes import EmaneNet from core.emane.nodes import EmaneNet
from core.emulator.data import ( from core.emulator.data import (
ConfigData, ConfigData,
@ -19,13 +26,11 @@ from core.emulator.data import (
LinkData, LinkData,
NodeData, NodeData,
) )
from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions from core.emulator.emudata import LinkOptions, NodeOptions
from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags, NodeTypes from core.emulator.enumerations import EventTypes, LinkTypes, MessageFlags
from core.errors import CoreCommandError, CoreError from core.errors import CoreCommandError, CoreError
from core.location.mobility import BasicRangeModel, Ns2ScriptedMobility from core.location.mobility import BasicRangeModel, Ns2ScriptedMobility
from core.nodes.base import CoreNetworkBase
from core.nodes.docker import DockerNode from core.nodes.docker import DockerNode
from core.nodes.ipaddress import MacAddress
from core.nodes.lxd import LxcNode from core.nodes.lxd import LxcNode
from core.services.coreservices import ServiceManager from core.services.coreservices import ServiceManager
@ -33,167 +38,6 @@ _ONE_DAY_IN_SECONDS = 60 * 60 * 24
_INTERFACE_REGEX = re.compile(r"\d+") _INTERFACE_REGEX = re.compile(r"\d+")
def convert_value(value):
"""
Convert value into string.
:param value: value
:return: string conversion of the value
:rtype: str
"""
if value is not None:
value = str(value)
return value
def get_config_options(config, configurable_options):
"""
Retrieve configuration options in a form that is used by the grpc server.
:param dict config: configuration
:param core.config.ConfigurableOptions configurable_options: configurable options
:return: mapping of configuration ids to configuration options
:rtype: dict[str,core.api.grpc.core_pb2.ConfigOption]
"""
results = {}
for configuration in configurable_options.configurations():
value = config[configuration.id]
config_option = core_pb2.ConfigOption(
label=configuration.label,
name=configuration.id,
value=value,
type=configuration.type.value,
select=configuration.options,
)
results[configuration.id] = config_option
for config_group in configurable_options.config_groups():
start = config_group.start - 1
stop = config_group.stop
options = list(results.values())[start:stop]
for option in options:
option.group = config_group.name
return results
def get_links(session, node):
"""
Retrieve a list of links for grpc to use
:param core.emulator.Session session: node's section
:param core.nodes.base.CoreNode node: node to get links from
:return: [core.api.grpc.core_pb2.Link]
"""
links = []
for link_data in node.all_link_data(0):
link = convert_link(session, link_data)
links.append(link)
return links
def get_emane_model_id(node_id, interface_id):
"""
Get EMANE model id
:param int node_id: node id
:param int interface_id: interface id
:return: EMANE model id
:rtype: int
"""
if interface_id >= 0:
return node_id * 1000 + interface_id
else:
return node_id
def convert_link(session, link_data):
"""
Convert link_data into core protobuf Link
:param core.emulator.session.Session session:
:param core.emulator.data.LinkData link_data:
:return: core protobuf Link
:rtype: core.api.grpc.core_pb2.Link
"""
interface_one = None
if link_data.interface1_id is not None:
node = session.get_node(link_data.node1_id)
interface_name = None
if not isinstance(node, CoreNetworkBase):
interface = node.netif(link_data.interface1_id)
interface_name = interface.name
interface_one = core_pb2.Interface(
id=link_data.interface1_id,
name=interface_name,
mac=convert_value(link_data.interface1_mac),
ip4=convert_value(link_data.interface1_ip4),
ip4mask=link_data.interface1_ip4_mask,
ip6=convert_value(link_data.interface1_ip6),
ip6mask=link_data.interface1_ip6_mask,
)
interface_two = None
if link_data.interface2_id is not None:
node = session.get_node(link_data.node2_id)
interface_name = None
if not isinstance(node, CoreNetworkBase):
interface = node.netif(link_data.interface2_id)
interface_name = interface.name
interface_two = core_pb2.Interface(
id=link_data.interface2_id,
name=interface_name,
mac=convert_value(link_data.interface2_mac),
ip4=convert_value(link_data.interface2_ip4),
ip4mask=link_data.interface2_ip4_mask,
ip6=convert_value(link_data.interface2_ip6),
ip6mask=link_data.interface2_ip6_mask,
)
options = core_pb2.LinkOptions(
opaque=link_data.opaque,
jitter=link_data.jitter,
key=link_data.key,
mburst=link_data.mburst,
mer=link_data.mer,
per=link_data.per,
bandwidth=link_data.bandwidth,
burst=link_data.burst,
delay=link_data.delay,
dup=link_data.dup,
unidirectional=link_data.unidirectional,
)
return core_pb2.Link(
type=link_data.link_type,
node_one_id=link_data.node1_id,
node_two_id=link_data.node2_id,
interface_one=interface_one,
interface_two=interface_two,
options=options,
)
def get_net_stats():
"""
Retrieve status about the current interfaces in the system
:return: send and receive status of the interfaces in the system
:rtype: dict
"""
with open("/proc/net/dev", "r") as f:
data = f.readlines()[2:]
stats = {}
for line in data:
line = line.strip()
if not line:
continue
line = line.split()
line[0] = line[0].strip(":")
stats[line[0]] = {"rx": float(line[1]), "tx": float(line[9])}
return stats
class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
""" """
Create CoreGrpcServer instance Create CoreGrpcServer instance
@ -279,12 +123,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
os.mkdir(session.session_dir) os.mkdir(session.session_dir)
# create nodes # create nodes
# loop = asyncio.new_event_loop() grpcutils.create_nodes(session, request.nodes)
# asyncio.set_event_loop(loop)
# results = loop.run_until_complete( # create links
# grpcutils.create_nodes(loop, session, request.nodes) logging.info("links: %s", request.links)
# ) grpcutils.create_links(session, request.links)
grpcutils.sync_create_nodes(session, request.nodes)
# set to instantiation and start # set to instantiation and start
session.set_state(EventTypes.INSTANTIATION_STATE) session.set_state(EventTypes.INSTANTIATION_STATE)
@ -302,7 +145,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
:rtype: core.api.grpc.core_pb2.StopSessionResponse :rtype: core.api.grpc.core_pb2.StopSessionResponse
""" """
logging.debug("stop session: %s", request) logging.debug("stop session: %s", request)
session = self.coreemu.create_session(request.session_id) session = self.get_session(request.session_id, context)
session.set_state(EventTypes.DATACOLLECT_STATE) session.set_state(EventTypes.DATACOLLECT_STATE)
session.clear() session.clear()
return core_pb2.StopSessionResponse(result=True) return core_pb2.StopSessionResponse(result=True)
@ -808,32 +651,12 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
""" """
logging.debug("add node: %s", request) logging.debug("add node: %s", request)
session = self.get_session(request.session_id, context) session = self.get_session(request.session_id, context)
_type, _id, options = grpcutils.add_node_data(request.node)
node_proto = request.node node = session.add_node(_type=_type, _id=_id, options=options)
node_id = node_proto.id
node_type = node_proto.type
if node_type is None:
node_type = NodeTypes.DEFAULT.value
node_type = NodeTypes(node_type)
options = NodeOptions(name=node_proto.name, model=node_proto.model)
options.icon = node_proto.icon
options.opaque = node_proto.opaque
options.image = node_proto.image
options.services = node_proto.services
if node_proto.server:
options.server = node_proto.server
position = node_proto.position
options.set_position(position.x, position.y)
options.set_location(position.lat, position.lon, position.alt)
node = session.add_node(_type=node_type, _id=node_id, options=options)
# configure emane if provided # configure emane if provided
emane_model = node_proto.emane emane_model = request.node.emane
if emane_model: if emane_model:
session.emane.set_model_config(node_id, emane_model) session.emane.set_model_config(id, emane_model)
return core_pb2.AddNodeResponse(node_id=node.id) return core_pb2.AddNodeResponse(node_id=node.id)
def GetNode(self, request, context): def GetNode(self, request, context):
@ -991,82 +814,16 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
:rtype: core.api.grpc.AddLinkResponse :rtype: core.api.grpc.AddLinkResponse
""" """
logging.debug("add link: %s", request) logging.debug("add link: %s", request)
# validate session and nodes
session = self.get_session(request.session_id, context) session = self.get_session(request.session_id, context)
# validate node exist
self.get_node(session, request.link.node_one_id, context) self.get_node(session, request.link.node_one_id, context)
self.get_node(session, request.link.node_two_id, context) self.get_node(session, request.link.node_two_id, context)
node_one_id = request.link.node_one_id node_one_id = request.link.node_one_id
node_two_id = request.link.node_two_id node_two_id = request.link.node_two_id
interface_one, interface_two, options = grpcutils.add_link_data(request.link)
interface_one = None
interface_one_data = request.link.interface_one
if interface_one_data:
name = interface_one_data.name
if name == "":
name = None
mac = interface_one_data.mac
if mac == "":
mac = None
else:
mac = MacAddress.from_string(mac)
interface_one = InterfaceData(
_id=interface_one_data.id,
name=name,
mac=mac,
ip4=interface_one_data.ip4,
ip4_mask=interface_one_data.ip4mask,
ip6=interface_one_data.ip6,
ip6_mask=interface_one_data.ip6mask,
)
interface_two = None
interface_two_data = request.link.interface_two
if interface_two_data:
name = interface_two_data.name
if name == "":
name = None
mac = interface_two_data.mac
if mac == "":
mac = None
else:
mac = MacAddress.from_string(mac)
interface_two = InterfaceData(
_id=interface_two_data.id,
name=name,
mac=mac,
ip4=interface_two_data.ip4,
ip4_mask=interface_two_data.ip4mask,
ip6=interface_two_data.ip6,
ip6_mask=interface_two_data.ip6mask,
)
link_type = None
link_type_value = request.link.type
if link_type_value is not None:
link_type = LinkTypes(link_type_value)
options_data = request.link.options
link_options = LinkOptions(_type=link_type)
if options_data:
link_options.delay = options_data.delay
link_options.bandwidth = options_data.bandwidth
link_options.per = options_data.per
link_options.dup = options_data.dup
link_options.jitter = options_data.jitter
link_options.mer = options_data.mer
link_options.burst = options_data.burst
link_options.mburst = options_data.mburst
link_options.unidirectional = options_data.unidirectional
link_options.key = options_data.key
link_options.opaque = options_data.opaque
session.add_link( session.add_link(
node_one_id, node_one_id, node_two_id, interface_one, interface_two, link_options=options
node_two_id,
interface_one,
interface_two,
link_options=link_options,
) )
return core_pb2.AddLinkResponse(result=True) return core_pb2.AddLinkResponse(result=True)

View file

@ -26,47 +26,27 @@ def main():
node = core_pb2.Node(id=i, position=position, model="PC") node = core_pb2.Node(id=i, position=position, model="PC")
nodes.append(node) nodes.append(node)
# start session # create links
interface_helper = client.InterfaceHelper(ip4_prefix="10.83.0.0/16")
links = [] links = []
for node in nodes:
interface_one = interface_helper.create_interface(node.id, 0)
link = core_pb2.Link(
type=core_pb2.LinkType.WIRED,
node_one_id=node.id,
node_two_id=switch.id,
interface_one=interface_one,
)
links.append(link)
# start session
response = core.start_session(session_id, nodes, links) response = core.start_session(session_id, nodes, links)
logging.info("started session: %s", response) logging.info("started session: %s", response)
# handle events session may broadcast input("press enter to shutdown session")
# core.events(session_id, log_event)
# change session state response = core.stop_session(session_id)
# response = core.set_session_state( logging.info("stop sessionL %s", response)
# session_id, core_pb2.SessionState.CONFIGURATION
# )
# logging.info("set session state: %s", response)
# create switch node
# switch = core_pb2.Node(type=core_pb2.NodeType.SWITCH)
# response = core.add_node(session_id, switch)
# logging.info("created switch: %s", response)
# switch_id = response.node_id
# helper to create interfaces
# interface_helper = client.InterfaceHelper(ip4_prefix="10.83.0.0/16")
#
# for i in range(2):
# # create node
# position = core_pb2.Position(x=50 + 50 * i, y=50)
# node = core_pb2.Node(position=position)
# response = core.add_node(session_id, node)
# logging.info("created node: %s", response)
# node_id = response.node_id
#
# # create link
# interface_one = interface_helper.create_interface(node_id, 0)
# response = core.add_link(session_id, node_id, switch_id, interface_one)
# logging.info("created link: %s", response)
# change session state
# response = core.set_session_state(
# session_id, core_pb2.SessionState.INSTANTIATION
# )
# logging.info("set session state: %s", response)
if __name__ == "__main__": if __name__ == "__main__":