Merge branch 'develop' into coretk-enhance/scaling

This commit is contained in:
Huy Pham 2020-02-17 09:27:37 -08:00
commit b7116c52ff
13 changed files with 152 additions and 38 deletions

View file

@ -26,6 +26,10 @@ from core.api.grpc.configservices_pb2 import (
SetNodeConfigServiceRequest,
SetNodeConfigServiceResponse,
)
from core.api.grpc.core_pb2 import (
GetEmaneEventChannelRequest,
GetEmaneEventChannelResponse,
)
class InterfaceHelper:
@ -834,9 +838,11 @@ class CoreGrpcClient:
session_id: int,
node_id: int,
service: str,
startup: List[str],
validate: List[str],
shutdown: List[str],
files: List[str] = None,
directories: List[str] = None,
startup: List[str] = None,
validate: List[str] = None,
shutdown: List[str] = None,
) -> core_pb2.SetNodeServiceResponse:
"""
Set service data for a node.
@ -844,6 +850,8 @@ class CoreGrpcClient:
:param session_id: session id
:param node_id: node id
:param service: service name
:param files: service files
:param directories: service directories
:param startup: startup commands
:param validate: validation commands
:param shutdown: shutdown commands
@ -853,6 +861,8 @@ class CoreGrpcClient:
config = core_pb2.ServiceConfig(
node_id=node_id,
service=service,
files=files,
directories=directories,
startup=startup,
validate=validate,
shutdown=shutdown,
@ -1133,6 +1143,10 @@ class CoreGrpcClient:
)
return self.stub.SetNodeConfigService(request)
def get_emane_event_channel(self, session_id: int) -> GetEmaneEventChannelResponse:
request = GetEmaneEventChannelRequest(session_id=session_id)
return self.stub.GetEmaneEventChannel(request)
def connect(self) -> None:
"""
Open connection to server, must be closed manually.

View file

@ -2,6 +2,8 @@ import logging
import time
from typing import Any, Dict, List, Tuple, Type
import netaddr
from core import utils
from core.api.grpc import common_pb2, core_pb2
from core.config import ConfigurableOptions
@ -10,6 +12,7 @@ from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
from core.emulator.enumerations import LinkTypes, NodeTypes
from core.emulator.session import Session
from core.nodes.base import CoreNetworkBase, NodeBase
from core.nodes.interface import CoreInterface
from core.services.coreservices import CoreService
WORKERS = 10
@ -373,9 +376,16 @@ def service_configuration(session: Session, config: core_pb2.ServiceConfig) -> N
"""
session.services.set_service(config.node_id, config.service)
service = session.services.get_service(config.node_id, config.service)
service.startup = tuple(config.startup)
service.validate = tuple(config.validate)
service.shutdown = tuple(config.shutdown)
if config.files:
service.files = tuple(config.files)
if config.directories:
service.directories = tuple(config.directories)
if config.startup:
service.startup = tuple(config.startup)
if config.validate:
service.validate = tuple(config.validate)
if config.shutdown:
service.shutdown = tuple(config.shutdown)
def get_service_configuration(service: Type[CoreService]) -> core_pb2.NodeServiceData:
@ -397,3 +407,40 @@ def get_service_configuration(service: Type[CoreService]) -> core_pb2.NodeServic
shutdown=service.shutdown,
meta=service.meta,
)
def interface_to_proto(interface: CoreInterface) -> core_pb2.Interface:
"""
Convenience for converting a core interface to the protobuf representation.
:param interface: interface to convert
:return: interface proto
"""
net_id = None
if interface.net:
net_id = interface.net.id
ip4 = None
ip4mask = None
ip6 = None
ip6mask = None
for addr in interface.addrlist:
network = netaddr.IPNetwork(addr)
mask = network.prefixlen
ip = str(network.ip)
if netaddr.valid_ipv4(ip) and not ip4:
ip4 = ip
ip4mask = mask
elif netaddr.valid_ipv6(ip) and not ip6:
ip6 = ip
ip6mask = mask
return core_pb2.Interface(
id=interface.netindex,
netid=net_id,
name=interface.name,
mac=str(interface.hwaddr),
mtu=interface.mtu,
flowid=interface.flow_id,
ip4=ip4,
ip4mask=ip4mask,
ip6=ip6,
ip6mask=ip6mask,
)

View file

@ -32,6 +32,10 @@ from core.api.grpc.configservices_pb2 import (
SetNodeConfigServiceRequest,
SetNodeConfigServiceResponse,
)
from core.api.grpc.core_pb2 import (
GetEmaneEventChannelRequest,
GetEmaneEventChannelResponse,
)
from core.api.grpc.events import EventStreamer
from core.api.grpc.grpcutils import (
get_config_options,
@ -637,17 +641,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
interfaces = []
for interface_id in node._netif:
interface = node._netif[interface_id]
net_id = None
if interface.net:
net_id = interface.net.id
interface_proto = core_pb2.Interface(
id=interface_id,
netid=net_id,
name=interface.name,
mac=str(interface.hwaddr),
mtu=interface.mtu,
flowid=interface.flow_id,
)
interface_proto = grpcutils.interface_to_proto(interface)
interfaces.append(interface_proto)
emane_model = None
@ -795,10 +789,20 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
node_one_id = request.link.node_one_id
node_two_id = request.link.node_two_id
interface_one, interface_two, options = grpcutils.add_link_data(request.link)
session.add_link(
node_one_interface, node_two_interface = session.add_link(
node_one_id, node_two_id, interface_one, interface_two, link_options=options
)
return core_pb2.AddLinkResponse(result=True)
interface_one_proto = None
interface_two_proto = None
if node_one_interface:
interface_one_proto = grpcutils.interface_to_proto(node_one_interface)
if node_two_interface:
interface_two_proto = grpcutils.interface_to_proto(node_two_interface)
return core_pb2.AddLinkResponse(
result=True,
interface_one=interface_one_proto,
interface_two=interface_two_proto,
)
def EditLink(
self, request: core_pb2.EditLinkRequest, context: ServicerContext
@ -1630,3 +1634,14 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
grpc.StatusCode.NOT_FOUND,
f"node {node.name} missing service {request.name}",
)
def GetEmaneEventChannel(
self, request: GetEmaneEventChannelRequest, context: ServicerContext
) -> GetEmaneEventChannelResponse:
session = self.get_session(request.session_id, context)
group = None
port = None
device = None
if session.emane.eventchannel:
group, port, device = session.emane.eventchannel
return GetEmaneEventChannelResponse(group=group, port=port, device=device)

View file

@ -91,6 +91,7 @@ class EmaneManager(ModelManager):
self.set_configs(self.emane_config.default_values())
self.service = None
self.eventchannel = None
self.event_device = None
self.emane_check()
@ -204,13 +205,13 @@ class EmaneManager(ModelManager):
if eventnet is not None:
# direct EMANE events towards control net bridge
self.event_device = eventnet.brname
eventchannel = (group, int(port), self.event_device)
self.eventchannel = (group, int(port), self.event_device)
# disabled otachannel for event service
# only needed for e.g. antennaprofile events xmit by models
logging.info("using %s for event service traffic", self.event_device)
try:
self.service = EventService(eventchannel=eventchannel, otachannel=None)
self.service = EventService(eventchannel=self.eventchannel, otachannel=None)
except EventServiceException:
logging.exception("error instantiating emane EventService")

View file

@ -42,7 +42,7 @@ from core.location.event import EventLoop
from core.location.mobility import BasicRangeModel, MobilityManager
from core.nodes.base import CoreNetworkBase, CoreNode, CoreNodeBase, NodeBase
from core.nodes.docker import DockerNode
from core.nodes.interface import GreTap
from core.nodes.interface import CoreInterface, GreTap
from core.nodes.lxd import LxcNode
from core.nodes.network import (
CtrlNet,
@ -55,7 +55,7 @@ from core.nodes.network import (
)
from core.nodes.physical import PhysicalNode, Rj45Node
from core.plugins.sdt import Sdt
from core.services.coreservices import CoreServices, ServiceBootError
from core.services.coreservices import CoreServices
from core.xml import corexml, corexmldeployment
from core.xml.corexml import CoreXmlReader, CoreXmlWriter
@ -301,7 +301,7 @@ class Session:
interface_one: InterfaceData = None,
interface_two: InterfaceData = None,
link_options: LinkOptions = None,
) -> None:
) -> Tuple[CoreInterface, CoreInterface]:
"""
Add a link between nodes.
@ -313,7 +313,7 @@ class Session:
data, defaults to none
:param link_options: data for creating link,
defaults to no options
:return: nothing
:return: tuple of created core interfaces, depending on link
"""
if not link_options:
link_options = LinkOptions()
@ -328,6 +328,9 @@ class Session:
if node_two:
node_two.lock.acquire()
node_one_interface = None
node_two_interface = None
try:
# wireless link
if link_options.type == LinkTypes.WIRELESS:
@ -353,6 +356,7 @@ class Session:
net_one.name,
)
interface = create_interface(node_one, net_one, interface_one)
node_one_interface = interface
link_config(net_one, interface, link_options)
# network to node
@ -363,6 +367,7 @@ class Session:
net_one.name,
)
interface = create_interface(node_two, net_one, interface_two)
node_two_interface = interface
if not link_options.unidirectional:
link_config(net_one, interface, link_options)
@ -374,6 +379,7 @@ class Session:
net_two.name,
)
interface = net_one.linknet(net_two)
node_one_interface = interface
link_config(net_one, interface, link_options)
if not link_options.unidirectional:
@ -426,6 +432,8 @@ class Session:
if node_two:
node_two.lock.release()
return node_one_interface, node_two_interface
def delete_link(
self,
node_one_id: int,
@ -1459,7 +1467,7 @@ class Session:
)
self.broadcast_exception(exception_data)
def instantiate(self) -> List[ServiceBootError]:
def instantiate(self) -> List[Exception]:
"""
We have entered the instantiation state, invoke startup methods
of various managers and boot the nodes. Validate nodes and check

View file

@ -625,7 +625,12 @@ class CoreClient:
shutdowns: List[str],
) -> core_pb2.NodeServiceData:
response = self.client.set_node_service(
self.session_id, node_id, service_name, startups, validations, shutdowns
self.session_id,
node_id,
service_name,
startup=startups,
validate=validations,
shutdown=shutdowns,
)
logging.info(
"Set %s service for node(%s), Startup: %s, Validation: %s, Shutdown: %s, Result: %s",
@ -713,9 +718,9 @@ class CoreClient:
self.session_id,
config_proto.node_id,
config_proto.service,
config_proto.startup,
config_proto.validate,
config_proto.shutdown,
startup=config_proto.startup,
validate=config_proto.validate,
shutdown=config_proto.shutdown,
)
for config_proto in self.get_service_file_configs_proto():
self.client.set_node_service_file(

View file

@ -426,9 +426,9 @@ class ServiceConfigDialog(Dialog):
config = self.core.set_node_service(
self.node_id,
self.service_name,
startup_commands,
validate_commands,
shutdown_commands,
startups=startup_commands,
validations=validate_commands,
shutdowns=shutdown_commands,
)
if self.node_id not in self.service_configs:
self.service_configs[self.node_id] = {}

View file

@ -738,6 +738,9 @@ class CoreNode(CoreNodeBase):
flow_id = self.node_net_client.get_ifindex(veth.name)
veth.flow_id = int(flow_id)
logging.debug("interface flow index: %s - %s", veth.name, veth.flow_id)
hwaddr = self.node_net_client.get_mac(veth.name)
logging.debug("interface mac: %s - %s", veth.name, hwaddr)
veth.sethwaddr(hwaddr)
try:
# add network interface to the node. If unsuccessful, destroy the

View file

@ -664,7 +664,7 @@ class RadvdService(UtilService):
for ifc in node.netifs():
if hasattr(ifc, "control") and ifc.control is True:
continue
prefixes = map(cls.subnetentry, ifc.addrlist)
prefixes = list(map(cls.subnetentry, ifc.addrlist))
if len(prefixes) < 1:
continue
cfg += (

View file

@ -140,6 +140,8 @@ service CoreApi {
}
rpc GetEmaneModelConfigs (GetEmaneModelConfigsRequest) returns (GetEmaneModelConfigsResponse) {
}
rpc GetEmaneEventChannel (GetEmaneEventChannelRequest) returns (GetEmaneEventChannelResponse) {
}
// xml rpc
rpc SaveXml (SaveXmlRequest) returns (SaveXmlResponse) {
@ -454,6 +456,8 @@ message AddLinkRequest {
message AddLinkResponse {
bool result = 1;
Interface interface_one = 2;
Interface interface_two = 3;
}
message EditLinkRequest {
@ -708,6 +712,16 @@ message GetEmaneModelConfigsResponse {
repeated ModelConfig configs = 1;
}
message GetEmaneEventChannelRequest {
int32 session_id = 1;
}
message GetEmaneEventChannelResponse {
string group = 1;
int32 port = 2;
string device = 3;
}
message SaveXmlRequest {
int32 session_id = 1;
}
@ -769,6 +783,8 @@ message ServiceConfig {
repeated string startup = 3;
repeated string validate = 4;
repeated string shutdown = 5;
repeated string files = 6;
repeated string directories = 7;
}
message ServiceFileConfig {

View file

@ -19,6 +19,7 @@ from core.emulator.emudata import IpPrefixes
from core.emulator.enumerations import EventTypes
from core.emulator.session import Session
from core.nodes.base import CoreNode
from core.nodes.netclient import LinuxNetClient
EMANE_SERVICES = "zebra|OSPFv3MDR|IPForward"
@ -27,8 +28,8 @@ class PatchManager:
def __init__(self):
self.patches = []
def patch_obj(self, _cls, attribute):
p = mock.patch.object(_cls, attribute)
def patch_obj(self, _cls, attribute, return_value=None):
p = mock.patch.object(_cls, attribute, return_value=return_value)
p.start()
self.patches.append(p)
@ -51,11 +52,14 @@ class MockServer:
@pytest.fixture(scope="session")
def patcher(request):
patch_manager = PatchManager()
patch_manager.patch_obj(DistributedServer, "remote_cmd")
patch_manager.patch_obj(DistributedServer, "remote_cmd", return_value="1")
if request.config.getoption("mock"):
patch_manager.patch("os.mkdir")
patch_manager.patch("core.utils.cmd")
patch_manager.patch("core.nodes.netclient.get_net_client")
patch_manager.patch_obj(
LinuxNetClient, "get_mac", return_value="00:00:00:00:00:00"
)
patch_manager.patch_obj(CoreNode, "nodefile")
patch_manager.patch_obj(Session, "write_state")
patch_manager.patch_obj(Session, "write_nodes")

View file

@ -13,6 +13,7 @@ class TestDistributed:
options = NodeOptions()
options.server = server_name
node = session.add_node(options=options)
session.instantiate()
# then
assert node.server is not None

View file

@ -935,7 +935,7 @@ class TestGrpc:
# then
with client.context_connect():
response = client.set_node_service(
session.id, node.id, service_name, [], validate, []
session.id, node.id, service_name, validate=validate
)
# then