Merge branch 'develop' into pydocupdates

This commit is contained in:
Huy Pham 2019-09-12 10:24:23 -07:00
commit 2bfcc9ef24
100 changed files with 5340 additions and 3488 deletions

View file

@ -4,17 +4,24 @@ import os
import re
import tempfile
import time
import grpc
from builtins import int
from concurrent import futures
from queue import Queue, Empty
from queue import Empty, Queue
from core.api.grpc import core_pb2
from core.api.grpc import core_pb2_grpc
from core.emulator.data import NodeData, LinkData, EventData, ConfigData, ExceptionData, FileData
from core.emulator.emudata import NodeOptions, InterfaceData, LinkOptions
from core.emulator.enumerations import NodeTypes, EventTypes, LinkTypes
import grpc
from core import CoreError
from core.api.grpc import core_pb2, core_pb2_grpc
from core.emulator.data import (
ConfigData,
EventData,
ExceptionData,
FileData,
LinkData,
NodeData,
)
from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
from core.emulator.enumerations import EventTypes, LinkTypes, NodeTypes
from core.location.mobility import BasicRangeModel, Ns2ScriptedMobility
from core.nodes import nodeutils
from core.nodes.base import CoreNetworkBase
@ -50,8 +57,10 @@ def get_config_groups(config, configurable_options):
for config_group in configurable_options.config_groups():
start = config_group.start - 1
stop = config_group.stop
options = config_options[start: stop]
config_group_proto = core_pb2.ConfigGroup(name=config_group.name, options=options)
options = config_options[start:stop]
config_group_proto = core_pb2.ConfigGroup(
name=config_group.name, options=options
)
groups.append(config_group_proto)
return groups
@ -87,9 +96,14 @@ def convert_link(session, link_data):
interface = node.netif(link_data.interface1_id)
interface_name = interface.name
interface_one = core_pb2.Interface(
id=link_data.interface1_id, name=interface_name, mac=convert_value(link_data.interface1_mac),
ip4=convert_value(link_data.interface1_ip4), ip4mask=link_data.interface1_ip4_mask,
ip6=convert_value(link_data.interface1_ip6), ip6mask=link_data.interface1_ip6_mask)
id=link_data.interface1_id,
name=interface_name,
mac=convert_value(link_data.interface1_mac),
ip4=convert_value(link_data.interface1_ip4),
ip4mask=link_data.interface1_ip4_mask,
ip6=convert_value(link_data.interface1_ip6),
ip6mask=link_data.interface1_ip6_mask,
)
interface_two = None
if link_data.interface2_id is not None:
@ -99,9 +113,14 @@ def convert_link(session, link_data):
interface = node.netif(link_data.interface2_id)
interface_name = interface.name
interface_two = core_pb2.Interface(
id=link_data.interface2_id, name=interface_name, mac=convert_value(link_data.interface2_mac),
ip4=convert_value(link_data.interface2_ip4), ip4mask=link_data.interface2_ip4_mask,
ip6=convert_value(link_data.interface2_ip6), ip6mask=link_data.interface2_ip6_mask)
id=link_data.interface2_id,
name=interface_name,
mac=convert_value(link_data.interface2_mac),
ip4=convert_value(link_data.interface2_ip4),
ip4mask=link_data.interface2_ip4_mask,
ip6=convert_value(link_data.interface2_ip6),
ip6mask=link_data.interface2_ip6_mask,
)
options = core_pb2.LinkOptions(
opaque=link_data.opaque,
@ -114,12 +133,16 @@ def convert_link(session, link_data):
burst=link_data.burst,
delay=link_data.delay,
dup=link_data.dup,
unidirectional=link_data.unidirectional
unidirectional=link_data.unidirectional,
)
return core_pb2.Link(
type=link_data.link_type, node_one_id=link_data.node1_id, node_two_id=link_data.node2_id,
interface_one=interface_one, interface_two=interface_two, options=options
type=link_data.link_type,
node_one_id=link_data.node1_id,
node_two_id=link_data.node2_id,
interface_one=interface_one,
interface_two=interface_two,
options=options,
)
@ -158,7 +181,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
context.abort(grpc.StatusCode.CANCELLED, "server stopping")
def listen(self, address):
logging.info("starting grpc api: %s", address)
logging.info("CORE gRPC API listening on: %s", address)
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
core_pb2_grpc.add_CoreApiServicer_to_server(self, self.server)
self.server.add_insecure_port(address)
@ -173,14 +196,18 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
def get_session(self, session_id, context):
session = self.coreemu.sessions.get(session_id)
if not session:
context.abort(grpc.StatusCode.NOT_FOUND, "session {} not found".format(session_id))
context.abort(
grpc.StatusCode.NOT_FOUND, "session {} not found".format(session_id)
)
return session
def get_node(self, session, node_id, context):
try:
return session.get_node(node_id)
except KeyError:
context.abort(grpc.StatusCode.NOT_FOUND, "node {} not found".format(node_id))
context.abort(
grpc.StatusCode.NOT_FOUND, "node {} not found".format(node_id)
)
def CreateSession(self, request, context):
logging.debug("create session: %s", request)
@ -188,7 +215,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
session.set_state(EventTypes.DEFINITION_STATE)
session.location.setrefgeo(47.57917, -122.13232, 2.0)
session.location.refscale = 150000.0
return core_pb2.CreateSessionResponse(session_id=session.id, state=session.state)
return core_pb2.CreateSessionResponse(
session_id=session.id, state=session.state
)
def DeleteSession(self, request, context):
logging.debug("delete session: %s", request)
@ -201,7 +230,8 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
for session_id in self.coreemu.sessions:
session = self.coreemu.sessions[session_id]
session_summary = core_pb2.SessionSummary(
id=session_id, state=session.state, nodes=session.get_node_count())
id=session_id, state=session.state, nodes=session.get_node_count()
)
sessions.append(session_summary)
return core_pb2.GetSessionsResponse(sessions=sessions)
@ -211,13 +241,21 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
x, y, z = session.location.refxyz
lat, lon, alt = session.location.refgeo
position = core_pb2.SessionPosition(x=x, y=y, z=z, lat=lat, lon=lon, alt=alt)
return core_pb2.GetSessionLocationResponse(position=position, scale=session.location.refscale)
return core_pb2.GetSessionLocationResponse(
position=position, scale=session.location.refscale
)
def SetSessionLocation(self, request, context):
logging.debug("set session location: %s", request)
session = self.get_session(request.session_id, context)
session.location.refxyz = (request.position.x, request.position.y, request.position.z)
session.location.setrefgeo(request.position.lat, request.position.lon, request.position.alt)
session.location.refxyz = (
request.position.x,
request.position.y,
request.position.z,
)
session.location.setrefgeo(
request.position.lat, request.position.lon, request.position.alt
)
session.location.refscale = request.scale
return core_pb2.SetSessionLocationResponse(result=True)
@ -275,7 +313,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
node_type = nodeutils.get_node_type(node.__class__).value
model = getattr(node, "type", None)
position = core_pb2.Position(x=node.position.x, y=node.position.y, z=node.position.z)
position = core_pb2.Position(
x=node.position.x, y=node.position.y, z=node.position.z
)
services = getattr(node, "services", [])
if services is None:
@ -287,8 +327,14 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
emane_model = node.model.name
node_proto = core_pb2.Node(
id=node.id, name=node.name, emane=emane_model, model=model,
type=node_type, position=position, services=services)
id=node.id,
name=node.name,
emane=emane_model,
model=model,
type=node_type,
position=position,
services=services,
)
if isinstance(node, (DockerNode, LxcNode)):
node_proto.image = node.image
nodes.append(node_proto)
@ -348,23 +394,38 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
services = event.services or ""
services = services.split("|")
node_proto = core_pb2.Node(
id=event.id, name=event.name, model=event.model, position=position, services=services)
id=event.id,
name=event.name,
model=event.model,
position=position,
services=services,
)
return core_pb2.NodeEvent(node=node_proto)
def _handle_link_event(self, event):
interface_one = None
if event.interface1_id is not None:
interface_one = core_pb2.Interface(
id=event.interface1_id, name=event.interface1_name, mac=convert_value(event.interface1_mac),
ip4=convert_value(event.interface1_ip4), ip4mask=event.interface1_ip4_mask,
ip6=convert_value(event.interface1_ip6), ip6mask=event.interface1_ip6_mask)
id=event.interface1_id,
name=event.interface1_name,
mac=convert_value(event.interface1_mac),
ip4=convert_value(event.interface1_ip4),
ip4mask=event.interface1_ip4_mask,
ip6=convert_value(event.interface1_ip6),
ip6mask=event.interface1_ip6_mask,
)
interface_two = None
if event.interface2_id is not None:
interface_two = core_pb2.Interface(
id=event.interface2_id, name=event.interface2_name, mac=convert_value(event.interface2_mac),
ip4=convert_value(event.interface2_ip4), ip4mask=event.interface2_ip4_mask,
ip6=convert_value(event.interface2_ip6), ip6mask=event.interface2_ip6_mask)
id=event.interface2_id,
name=event.interface2_name,
mac=convert_value(event.interface2_mac),
ip4=convert_value(event.interface2_ip4),
ip4mask=event.interface2_ip4_mask,
ip6=convert_value(event.interface2_ip6),
ip6mask=event.interface2_ip6_mask,
)
options = core_pb2.LinkOptions(
opaque=event.opaque,
@ -377,11 +438,16 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
burst=event.burst,
delay=event.delay,
dup=event.dup,
unidirectional=event.unidirectional
unidirectional=event.unidirectional,
)
link = core_pb2.Link(
type=event.link_type, node_one_id=event.node1_id, node_two_id=event.node2_id,
interface_one=interface_one, interface_two=interface_two, options=options)
type=event.link_type,
node_one_id=event.node1_id,
node_two_id=event.node2_id,
interface_one=interface_one,
interface_two=interface_two,
options=options,
)
return core_pb2.LinkEvent(message_type=event.message_type, link=link)
def _handle_session_event(self, event):
@ -394,7 +460,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
name=event.name,
data=event.data,
time=event_time,
session_id=event.session
session_id=event.session,
)
def _handle_config_event(self, event):
@ -415,7 +481,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
interface=event.interface_number,
network_id=event.network_id,
opaque=event.opaque,
data_types=event.data_types
data_types=event.data_types,
)
def _handle_exception_event(self, event):
@ -426,7 +492,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
source=event.source,
date=event.date,
text=event.text,
opaque=event.opaque
opaque=event.opaque,
)
def _handle_file_event(self, event):
@ -440,7 +506,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
source=event.source,
session_id=event.session,
data=event.data,
compressed_data=event.compressed_data
compressed_data=event.compressed_data,
)
def Throughputs(self, request, context):
@ -460,21 +526,29 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
previous_rxtx = last_stats.get(key)
if not previous_rxtx:
continue
rx_kbps = (current_rxtx["rx"] - previous_rxtx["rx"]) * 8.0 / interval
tx_kbps = (current_rxtx["tx"] - previous_rxtx["tx"]) * 8.0 / interval
rx_kbps = (
(current_rxtx["rx"] - previous_rxtx["rx"]) * 8.0 / interval
)
tx_kbps = (
(current_rxtx["tx"] - previous_rxtx["tx"]) * 8.0 / interval
)
throughput = rx_kbps + tx_kbps
if key.startswith("veth"):
key = key.split(".")
node_id = int(_INTERFACE_REGEX.search(key[0]).group())
interface_id = int(key[1])
interface_throughput = throughputs_event.interface_throughputs.add()
interface_throughput = (
throughputs_event.interface_throughputs.add()
)
interface_throughput.node_id = node_id
interface_throughput.interface_id = interface_id
interface_throughput.throughput = throughput
elif key.startswith("b."):
try:
node_id = int(key.split(".")[1])
bridge_throughput = throughputs_event.bridge_throughputs.add()
bridge_throughput = (
throughputs_event.bridge_throughputs.add()
)
bridge_throughput.node_id = node_id
bridge_throughput.throughput = throughput
except ValueError:
@ -527,8 +601,13 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
if interface.net:
net_id = interface.net.id
interface_proto = core_pb2.Interface(
id=interface_id, netid=net_id, name=interface.name, mac=str(interface.hwaddr),
mtu=interface.mtu, flowid=interface.flow_id)
id=interface_id,
netid=net_id,
name=interface.name,
mac=str(interface.hwaddr),
mtu=interface.mtu,
flowid=interface.flow_id,
)
interfaces.append(interface_proto)
emane_model = None
@ -536,11 +615,19 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
emane_model = node.model.name
services = [x.name for x in getattr(node, "services", [])]
position = core_pb2.Position(x=node.position.x, y=node.position.y, z=node.position.z)
position = core_pb2.Position(
x=node.position.x, y=node.position.y, z=node.position.z
)
node_type = nodeutils.get_node_type(node.__class__).value
node_proto = core_pb2.Node(
id=node.id, name=node.name, type=node_type, emane=emane_model, model=node.type, position=position,
services=services)
id=node.id,
name=node.name,
type=node_type,
emane=emane_model,
model=node.type,
position=position,
services=services,
)
if isinstance(node, (DockerNode, LxcNode)):
node_proto.image = node.image
@ -558,7 +645,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
lon = request.position.lon
alt = request.position.alt
node_options.set_location(lat, lon, alt)
result = session.update_node(node_id, node_options)
result = True
try:
session.update_node(node_id, node_options)
except CoreError:
result = False
return core_pb2.EditNodeResponse(result=result)
def DeleteNode(self, request, context):
@ -660,7 +751,13 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
link_options.key = options_data.key
link_options.opaque = options_data.opaque
session.add_link(node_one_id, node_two_id, interface_one, interface_two, link_options=link_options)
session.add_link(
node_one_id,
node_two_id,
interface_one,
interface_two,
link_options=link_options,
)
return core_pb2.AddLinkResponse(result=True)
def EditLink(self, request, context):
@ -683,7 +780,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
link_options.unidirectional = options_data.unidirectional
link_options.key = options_data.key
link_options.opaque = options_data.opaque
session.update_link(node_one_id, node_two_id, interface_one_id, interface_two_id, link_options)
session.update_link(
node_one_id, node_two_id, interface_one_id, interface_two_id, link_options
)
return core_pb2.EditLinkResponse(result=True)
def DeleteLink(self, request, context):
@ -693,7 +792,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
node_two_id = request.node_two_id
interface_one_id = request.interface_one_id
interface_two_id = request.interface_two_id
session.delete_link(node_one_id, node_two_id, interface_one_id, interface_two_id)
session.delete_link(
node_one_id, node_two_id, interface_one_id, interface_two_id
)
return core_pb2.DeleteLinkResponse(result=True)
def GetHooks(self, request, context):
@ -733,14 +834,18 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
def GetMobilityConfig(self, request, context):
logging.debug("get mobility config: %s", request)
session = self.get_session(request.session_id, context)
config = session.mobility.get_model_config(request.node_id, Ns2ScriptedMobility.name)
config = session.mobility.get_model_config(
request.node_id, Ns2ScriptedMobility.name
)
groups = get_config_groups(config, Ns2ScriptedMobility)
return core_pb2.GetMobilityConfigResponse(groups=groups)
def SetMobilityConfig(self, request, context):
logging.debug("set mobility config: %s", request)
session = self.get_session(request.session_id, context)
session.mobility.set_model_config(request.node_id, Ns2ScriptedMobility.name, request.config)
session.mobility.set_model_config(
request.node_id, Ns2ScriptedMobility.name, request.config
)
return core_pb2.SetMobilityConfigResponse(result=True)
def MobilityAction(self, request, context):
@ -773,7 +878,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
all_service_defaults = []
for node_type in session.services.default_services:
services = session.services.default_services[node_type]
service_defaults = core_pb2.ServiceDefaults(node_type=node_type, services=services)
service_defaults = core_pb2.ServiceDefaults(
node_type=node_type, services=services
)
all_service_defaults.append(service_defaults)
return core_pb2.GetServiceDefaultsResponse(defaults=all_service_defaults)
@ -782,13 +889,17 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
session = self.get_session(request.session_id, context)
session.services.default_services.clear()
for service_defaults in request.defaults:
session.services.default_services[service_defaults.node_type] = service_defaults.services
session.services.default_services[
service_defaults.node_type
] = service_defaults.services
return core_pb2.SetServiceDefaultsResponse(result=True)
def GetNodeService(self, request, context):
logging.debug("get node service: %s", request)
session = self.get_session(request.session_id, context)
service = session.services.get_service(request.node_id, request.service, default_service=True)
service = session.services.get_service(
request.node_id, request.service, default_service=True
)
service_proto = core_pb2.NodeServiceData(
executables=service.executables,
dependencies=service.dependencies,
@ -799,7 +910,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
validation_mode=service.validation_mode.value,
validation_timer=service.validation_timer,
shutdown=service.shutdown,
meta=service.meta
meta=service.meta,
)
return core_pb2.GetNodeServiceResponse(service=service_proto)
@ -814,7 +925,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
break
if not service:
context.abort(grpc.StatusCode.NOT_FOUND, "service not found")
file_data = session.services.get_service_file(node, request.service, request.file)
file_data = session.services.get_service_file(
node, request.service, request.file
)
return core_pb2.GetNodeServiceFileResponse(data=file_data.data)
def SetNodeService(self, request, context):
@ -830,7 +943,9 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
def SetNodeServiceFile(self, request, context):
logging.debug("set node service file: %s", request)
session = self.get_session(request.session_id, context)
session.services.set_service_file(request.node_id, request.service, request.file, request.data)
session.services.set_service_file(
request.node_id, request.service, request.file, request.data
)
return core_pb2.SetNodeServiceFileResponse(result=True)
def ServiceAction(self, request, context):
@ -867,14 +982,18 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
def GetWlanConfig(self, request, context):
logging.debug("get wlan config: %s", request)
session = self.get_session(request.session_id, context)
config = session.mobility.get_model_config(request.node_id, BasicRangeModel.name)
config = session.mobility.get_model_config(
request.node_id, BasicRangeModel.name
)
groups = get_config_groups(config, BasicRangeModel)
return core_pb2.GetWlanConfigResponse(groups=groups)
def SetWlanConfig(self, request, context):
logging.debug("set wlan config: %s", request)
session = self.get_session(request.session_id, context)
session.mobility.set_model_config(request.node_id, BasicRangeModel.name, request.config)
session.mobility.set_model_config(
request.node_id, BasicRangeModel.name, request.config
)
if session.state == EventTypes.RUNTIME_STATE.value:
node = self.get_node(session, request.node_id, context)
node.updatemodel(request.config)
@ -970,7 +1089,11 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
def GetInterfaces(self, request, context):
interfaces = []
for interface in os.listdir("/sys/class/net"):
if interface.startswith("b.") or interface.startswith("veth") or interface == "lo":
if (
interface.startswith("b.")
or interface.startswith("veth")
or interface == "lo"
):
continue
interfaces.append(interface)
return core_pb2.GetInterfacesResponse(interfaces=interfaces)