Merge pull request #259 from coreemu/emane-distributed
Emane distributed fixes
This commit is contained in:
commit
ca28920e2d
9 changed files with 281 additions and 244 deletions
|
@ -10,6 +10,7 @@ import select
|
|||
import socket
|
||||
import threading
|
||||
|
||||
from core import utils
|
||||
from core.api.tlv import coreapi
|
||||
from core.nodes.base import CoreNodeBase, CoreNetworkBase
|
||||
from core.emulator.enumerations import ConfigDataTypes
|
||||
|
@ -121,7 +122,6 @@ class CoreBroker(object):
|
|||
self.physical_nodes = set()
|
||||
# allows for other message handlers to process API messages (e.g. EMANE)
|
||||
self.handlers = set()
|
||||
self.handlers.add(self.handle_distributed)
|
||||
# dict with tunnel key to tunnel device mapping
|
||||
self.tunnels = {}
|
||||
self.dorecvloop = False
|
||||
|
@ -388,12 +388,13 @@ class CoreBroker(object):
|
|||
:return: tunnel key for the node pair
|
||||
:rtype: int
|
||||
"""
|
||||
logging.debug("creating tunnel key for: %s, %s", n1num, n2num)
|
||||
sid = self.session_id_master
|
||||
if sid is None:
|
||||
# this is the master session
|
||||
sid = self.session.id
|
||||
|
||||
key = (sid << 16) ^ hash(n1num) ^ (hash(n2num) << 8)
|
||||
key = (sid << 16) ^ utils.hashkey(n1num) ^ (utils.hashkey(n2num) << 8)
|
||||
return key & 0xFFFFFFFF
|
||||
|
||||
def addtunnel(self, remoteip, n1num, n2num, localnum):
|
||||
|
@ -1049,62 +1050,3 @@ class CoreBroker(object):
|
|||
if not server.instantiation_complete:
|
||||
return False
|
||||
return True
|
||||
|
||||
def handle_distributed(self, message):
|
||||
"""
|
||||
Handle the session options config message as it has reached the
|
||||
broker. Options requiring modification for distributed operation should
|
||||
be handled here.
|
||||
|
||||
:param message: message to handle
|
||||
:return: nothing
|
||||
"""
|
||||
if not self.session.master:
|
||||
return
|
||||
|
||||
if message.message_type != MessageTypes.CONFIG.value or message.get_tlv(ConfigTlvs.OBJECT.value) != "session":
|
||||
return
|
||||
|
||||
values_str = message.get_tlv(ConfigTlvs.VALUES.value)
|
||||
if values_str is None:
|
||||
return
|
||||
|
||||
value_strings = values_str.split("|")
|
||||
for value_string in value_strings:
|
||||
key, _value = value_string.split("=", 1)
|
||||
if key == "controlnet":
|
||||
self.handle_distributed_control_net(message, value_strings, value_strings.index(value_string))
|
||||
|
||||
def handle_distributed_control_net(self, message, values, index):
|
||||
"""
|
||||
Modify Config Message if multiple control network prefixes are
|
||||
defined. Map server names to prefixes and repack the message before
|
||||
it is forwarded to slave servers.
|
||||
|
||||
:param message: message to handle
|
||||
:param list values: values to handle
|
||||
:param int index: index ti get key value from
|
||||
:return: nothing
|
||||
"""
|
||||
key_value = values[index]
|
||||
_key, value = key_value.split("=", 1)
|
||||
control_nets = value.split()
|
||||
|
||||
if len(control_nets) < 2:
|
||||
logging.warning("multiple controlnet prefixes do not exist")
|
||||
return
|
||||
|
||||
servers = self.session.broker.getservernames()
|
||||
if len(servers) < 2:
|
||||
logging.warning("not distributed")
|
||||
return
|
||||
|
||||
servers.remove("localhost")
|
||||
# master always gets first prefix
|
||||
servers.insert(0, "localhost")
|
||||
# create list of "server1:ctrlnet1 server2:ctrlnet2 ..."
|
||||
control_nets = map(lambda x: "%s:%s" % (x[0], x[1]), zip(servers, control_nets))
|
||||
values[index] = "controlnet=%s" % (" ".join(control_nets))
|
||||
values_str = "|".join(values)
|
||||
message.tlv_data[ConfigTlvs.VALUES.value] = values_str
|
||||
message.repack()
|
||||
|
|
|
@ -510,7 +510,6 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
:param message: message for replies
|
||||
:return: nothing
|
||||
"""
|
||||
logging.debug("dispatching replies: %s", replies)
|
||||
for reply in replies:
|
||||
message_type, message_flags, message_length = coreapi.CoreMessage.unpack_header(reply)
|
||||
try:
|
||||
|
@ -524,7 +523,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
reply_message = "CoreMessage (type %d flags %d length %d)" % (
|
||||
message_type, message_flags, message_length)
|
||||
|
||||
logging.debug("dispatch reply:\n%s", reply_message)
|
||||
logging.debug("sending reply:\n%s", reply_message)
|
||||
|
||||
try:
|
||||
self.sendall(reply)
|
||||
|
@ -629,7 +628,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
|
|||
"""
|
||||
Node Message handler
|
||||
|
||||
:param core.api.coreapi.CoreNodeMessage message: node message
|
||||
:param core.api.tlv.coreapi.CoreNodeMessage message: node message
|
||||
:return: replies to node message
|
||||
"""
|
||||
replies = []
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
emane.py: definition of an Emane class for implementing configuration control of an EMANE emulation.
|
||||
"""
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
|
@ -443,10 +444,13 @@ class EmaneManager(ModelManager):
|
|||
continue
|
||||
|
||||
platformid += 1
|
||||
|
||||
# create temporary config for updating distributed nodes
|
||||
typeflags = ConfigFlags.UPDATE.value
|
||||
self.set_config("platform_id_start", str(platformid))
|
||||
self.set_config("nem_id_start", str(nemid))
|
||||
config_data = ConfigShim.config_data(0, None, typeflags, self.emane_config, self.get_configs())
|
||||
config = copy.deepcopy(self.get_configs())
|
||||
config["platform_id_start"] = str(platformid)
|
||||
config["nem_id_start"] = str(nemid)
|
||||
config_data = ConfigShim.config_data(0, None, typeflags, self.emane_config, config)
|
||||
message = dataconversion.convert_config(config_data)
|
||||
server.sock.send(message)
|
||||
# increment nemid for next server by number of interfaces
|
||||
|
@ -477,26 +481,30 @@ class EmaneManager(ModelManager):
|
|||
be configured. This generates configuration for slave control nets
|
||||
using the default list of prefixes.
|
||||
"""
|
||||
session = self.session
|
||||
# slave server
|
||||
session = self.session
|
||||
if not session.master:
|
||||
return
|
||||
|
||||
servers = session.broker.getservernames()
|
||||
# not distributed
|
||||
servers = session.broker.getservernames()
|
||||
if len(servers) < 2:
|
||||
return
|
||||
|
||||
prefix = session.options.get_config("controlnet")
|
||||
prefixes = prefix.split()
|
||||
# normal Config messaging will distribute controlnets
|
||||
if len(prefixes) >= len(servers):
|
||||
return
|
||||
prefix = session.options.get_config("controlnet", default="")
|
||||
prefixes = prefix.split()
|
||||
if len(prefixes) < len(servers):
|
||||
logging.info("setting up default controlnet prefixes for distributed (%d configured)", len(prefixes))
|
||||
prefix = ctrlnet.DEFAULT_PREFIX_LIST[0]
|
||||
prefixes = prefix.split()
|
||||
servers.remove("localhost")
|
||||
servers.insert(0, "localhost")
|
||||
prefix = " ".join("%s:%s" % (s, prefixes[i]) for i, s in enumerate(servers))
|
||||
|
||||
# this generates a config message having controlnet prefix assignments
|
||||
logging.info("Setting up default controlnet prefixes for distributed (%d configured)" % len(prefixes))
|
||||
prefixes = ctrlnet.DEFAULT_PREFIX_LIST[0]
|
||||
vals = 'controlnet="%s"' % prefixes
|
||||
logging.info("setting up controlnet prefixes for distributed: %s", prefix)
|
||||
vals = "controlnet=%s" % prefix
|
||||
tlvdata = b""
|
||||
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.OBJECT.value, "session")
|
||||
tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.TYPE.value, 0)
|
||||
|
@ -504,6 +512,7 @@ class EmaneManager(ModelManager):
|
|||
rawmsg = coreapi.CoreConfMessage.pack(0, tlvdata)
|
||||
msghdr = rawmsg[:coreapi.CoreMessage.header_len]
|
||||
msg = coreapi.CoreConfMessage(flags=0, hdr=msghdr, data=rawmsg[coreapi.CoreMessage.header_len:])
|
||||
logging.debug("sending controlnet message:\n%s", msg)
|
||||
self.session.broker.handle_message(msg)
|
||||
|
||||
def check_node_models(self):
|
||||
|
|
|
@ -165,16 +165,16 @@ class EmaneNode(EmaneNet):
|
|||
nemid = self.getnemid(netif)
|
||||
ifname = netif.localname
|
||||
if nemid is None:
|
||||
logging.info("nemid for %s is unknown" % ifname)
|
||||
logging.info("nemid for %s is unknown", ifname)
|
||||
return
|
||||
lat, long, alt = self.session.location.getgeo(x, y, z)
|
||||
logging.info("setnemposition %s (%s) x,y,z=(%d,%d,%s)(%.6f,%.6f,%.6f)", ifname, nemid, x, y, z, lat, long, alt)
|
||||
lat, lon, alt = self.session.location.getgeo(x, y, z)
|
||||
logging.info("setnemposition %s (%s) x,y,z=(%d,%d,%s)(%.6f,%.6f,%.6f)", ifname, nemid, x, y, z, lat, lon, alt)
|
||||
event = LocationEvent()
|
||||
|
||||
# altitude must be an integer or warning is printed
|
||||
# unused: yaw, pitch, roll, azimuth, elevation, velocity
|
||||
alt = int(round(alt))
|
||||
event.append(nemid, latitude=lat, longitude=long, altitude=alt)
|
||||
event.append(nemid, latitude=lat, longitude=lon, altitude=alt)
|
||||
self.session.emane.service.publish(0, event)
|
||||
|
||||
def setnempositions(self, moved_netifs):
|
||||
|
@ -199,12 +199,12 @@ class EmaneNode(EmaneNet):
|
|||
logging.info("nemid for %s is unknown" % ifname)
|
||||
continue
|
||||
x, y, z = netif.node.getposition()
|
||||
lat, long, alt = self.session.location.getgeo(x, y, z)
|
||||
lat, lon, alt = self.session.location.getgeo(x, y, z)
|
||||
logging.info("setnempositions %d %s (%s) x,y,z=(%d,%d,%s)(%.6f,%.6f,%.6f)",
|
||||
i, ifname, nemid, x, y, z, lat, long, alt)
|
||||
i, ifname, nemid, x, y, z, lat, lon, alt)
|
||||
# altitude must be an integer or warning is printed
|
||||
alt = int(round(alt))
|
||||
event.append(nemid, latitude=lat, longitude=long, altitude=alt)
|
||||
event.append(nemid, latitude=lat, longitude=lon, altitude=alt)
|
||||
i += 1
|
||||
|
||||
self.session.emane.service.publish(0, event)
|
||||
|
|
|
@ -22,10 +22,10 @@ from core.api.tlv.broker import CoreBroker
|
|||
from core.emane.emanemanager import EmaneManager
|
||||
from core.emulator.data import EventData, NodeData
|
||||
from core.emulator.data import ExceptionData
|
||||
from core.emulator.emudata import LinkOptions, NodeOptions
|
||||
from core.emulator.emudata import IdGen
|
||||
from core.emulator.emudata import is_net_node
|
||||
from core.emulator.emudata import LinkOptions, NodeOptions
|
||||
from core.emulator.emudata import create_interface
|
||||
from core.emulator.emudata import is_net_node
|
||||
from core.emulator.emudata import link_config
|
||||
from core.emulator.enumerations import EventTypes, LinkTypes
|
||||
from core.emulator.enumerations import ExceptionLevels
|
||||
|
@ -1168,7 +1168,7 @@ class Session(object):
|
|||
with self._nodes_lock:
|
||||
file_path = os.path.join(self.session_dir, "nodes")
|
||||
with open(file_path, "w") as f:
|
||||
for _id in sorted(self.nodes.keys()):
|
||||
for _id in self.nodes.keys():
|
||||
node = self.nodes[_id]
|
||||
f.write("%s %s %s %s\n" % (_id, node.name, node.apitype, type(node)))
|
||||
except IOError:
|
||||
|
@ -1214,19 +1214,19 @@ class Session(object):
|
|||
# write current nodes out to session directory file
|
||||
self.write_nodes()
|
||||
|
||||
# controlnet may be needed by some EMANE models
|
||||
# create control net interfaces and broker network tunnels
|
||||
# which need to exist for emane to sync on location events
|
||||
# in distributed scenarios
|
||||
self.add_remove_control_interface(node=None, remove=False)
|
||||
self.broker.startup()
|
||||
|
||||
# instantiate will be invoked again upon Emane configure
|
||||
if self.emane.startup() == self.emane.NOT_READY:
|
||||
return
|
||||
|
||||
# start feature helpers
|
||||
self.broker.startup()
|
||||
self.mobility.startup()
|
||||
|
||||
# boot the services on each node
|
||||
# boot node services and then start mobility
|
||||
self.boot_nodes()
|
||||
self.mobility.startup()
|
||||
|
||||
# set broker local instantiation to complete
|
||||
self.broker.local_instantiation_complete()
|
||||
|
@ -1498,7 +1498,7 @@ class Session(object):
|
|||
break
|
||||
|
||||
if not prefix:
|
||||
logging.error("Control network prefix not found for server '%s'" % servers[0])
|
||||
logging.error("control network prefix not found for server: %s", servers[0])
|
||||
assign_address = False
|
||||
try:
|
||||
prefix = prefixes[0].split(':', 1)[1]
|
||||
|
|
|
@ -3,6 +3,7 @@ Miscellaneous utility functions, wrappers around some subprocess procedures.
|
|||
"""
|
||||
|
||||
import fcntl
|
||||
import hashlib
|
||||
import importlib
|
||||
import inspect
|
||||
import logging
|
||||
|
@ -17,6 +18,22 @@ from core import CoreCommandError
|
|||
DEVNULL = open(os.devnull, "wb")
|
||||
|
||||
|
||||
def hashkey(value):
|
||||
"""
|
||||
Provide a consistent hash that can be used in place
|
||||
of the builtin hash, that no longer behaves consistently
|
||||
in python3.
|
||||
|
||||
:param str/int value: value to hash
|
||||
:return: hash value
|
||||
:rtype: int
|
||||
"""
|
||||
if isinstance(value, int):
|
||||
value = str(value)
|
||||
value = value.encode("utf-8")
|
||||
return int(hashlib.sha256(value).hexdigest(), 16)
|
||||
|
||||
|
||||
def _detach_init():
|
||||
"""
|
||||
Fork a child process and exit.
|
||||
|
|
|
@ -108,7 +108,7 @@ def build_node_platform_xml(emane_manager, control_net, node, nem_id, platform_x
|
|||
:return: the next nem id that can be used for creating platform xml files
|
||||
:rtype: int
|
||||
"""
|
||||
logging.debug("building emane platform xml for node(%s): %s", node, node.name)
|
||||
logging.debug("building emane platform xml for node(%s) nem_id(%s): %s", node, nem_id, node.name)
|
||||
nem_entries = {}
|
||||
|
||||
if node.model is None:
|
||||
|
@ -116,6 +116,7 @@ def build_node_platform_xml(emane_manager, control_net, node, nem_id, platform_x
|
|||
return nem_entries
|
||||
|
||||
for netif in node.netifs():
|
||||
logging.debug("building platform xml for interface(%s) nem_id(%s)", netif.name, nem_id)
|
||||
# build nem xml
|
||||
nem_definition = nem_file_name(node.model, netif)
|
||||
nem_element = etree.Element("nem", id=str(nem_id), name=netif.localname, definition=nem_definition)
|
||||
|
|
|
@ -9,134 +9,22 @@ import time
|
|||
import pytest
|
||||
from mock.mock import MagicMock
|
||||
|
||||
from core.api.tlv.coreapi import CoreConfMessage
|
||||
from core.api.tlv.coreapi import CoreEventMessage
|
||||
from core.api.tlv.coreapi import CoreExecMessage
|
||||
from core.api.tlv.coreapi import CoreLinkMessage
|
||||
from core.api.tlv.coreapi import CoreNodeMessage
|
||||
from core.api.grpc.client import InterfaceHelper
|
||||
from core.api.grpc.server import CoreGrpcServer
|
||||
from core.api.tlv.coreapi import CoreConfMessage, CoreEventMessage
|
||||
from core.api.tlv.corehandlers import CoreHandler
|
||||
from core.api.tlv.coreserver import CoreServer
|
||||
from core.emulator.coreemu import CoreEmu
|
||||
from core.emulator.emudata import IpPrefixes
|
||||
from core.emulator.enumerations import CORE_API_PORT
|
||||
from core.emulator.enumerations import CORE_API_PORT, EventTlvs
|
||||
from core.emulator.enumerations import ConfigTlvs
|
||||
from core.emulator.enumerations import EventTlvs
|
||||
from core.emulator.enumerations import EventTypes
|
||||
from core.emulator.enumerations import ExecuteTlvs
|
||||
from core.emulator.enumerations import LinkTlvs
|
||||
from core.emulator.enumerations import LinkTypes
|
||||
from core.emulator.enumerations import MessageFlags
|
||||
from core.emulator.enumerations import NodeTlvs
|
||||
from core.emulator.enumerations import NodeTypes
|
||||
from core.api.grpc.client import InterfaceHelper
|
||||
from core.api.grpc.server import CoreGrpcServer
|
||||
from core.nodes import ipaddress
|
||||
from core.nodes.ipaddress import MacAddress
|
||||
from core.services.coreservices import ServiceManager
|
||||
|
||||
EMANE_SERVICES = "zebra|OSPFv3MDR|IPForward"
|
||||
|
||||
|
||||
def node_message(_id, name, emulation_server=None, node_type=NodeTypes.DEFAULT, model=None):
|
||||
"""
|
||||
Convenience method for creating a node TLV messages.
|
||||
|
||||
:param int _id: node id
|
||||
:param str name: node name
|
||||
:param str emulation_server: distributed server name, if desired
|
||||
:param core.enumerations.NodeTypes node_type: node type
|
||||
:param str model: model for node
|
||||
:return: tlv message
|
||||
:rtype: core.api.coreapi.CoreNodeMessage
|
||||
"""
|
||||
values = [
|
||||
(NodeTlvs.NUMBER, _id),
|
||||
(NodeTlvs.TYPE, node_type.value),
|
||||
(NodeTlvs.NAME, name),
|
||||
(NodeTlvs.EMULATION_SERVER, emulation_server),
|
||||
]
|
||||
|
||||
if model:
|
||||
values.append((NodeTlvs.MODEL, model))
|
||||
|
||||
return CoreNodeMessage.create(MessageFlags.ADD.value, values)
|
||||
|
||||
|
||||
def link_message(n1, n2, intf_one=None, address_one=None, intf_two=None, address_two=None, key=None):
|
||||
"""
|
||||
Convenience method for creating link TLV messages.
|
||||
|
||||
:param int n1: node one id
|
||||
:param int n2: node two id
|
||||
:param int intf_one: node one interface id
|
||||
:param core.misc.ipaddress.IpAddress address_one: node one ip4 address
|
||||
:param int intf_two: node two interface id
|
||||
:param core.misc.ipaddress.IpAddress address_two: node two ip4 address
|
||||
:param int key: tunnel key for link if needed
|
||||
:return: tlv mesage
|
||||
:rtype: core.api.coreapi.CoreLinkMessage
|
||||
"""
|
||||
mac_one, mac_two = None, None
|
||||
if address_one:
|
||||
mac_one = MacAddress.random()
|
||||
if address_two:
|
||||
mac_two = MacAddress.random()
|
||||
|
||||
values = [
|
||||
(LinkTlvs.N1_NUMBER, n1),
|
||||
(LinkTlvs.N2_NUMBER, n2),
|
||||
(LinkTlvs.DELAY, 0),
|
||||
(LinkTlvs.BANDWIDTH, 0),
|
||||
(LinkTlvs.PER, "0"),
|
||||
(LinkTlvs.DUP, "0"),
|
||||
(LinkTlvs.JITTER, 0),
|
||||
(LinkTlvs.TYPE, LinkTypes.WIRED.value),
|
||||
(LinkTlvs.INTERFACE1_NUMBER, intf_one),
|
||||
(LinkTlvs.INTERFACE1_IP4, address_one),
|
||||
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
|
||||
(LinkTlvs.INTERFACE1_MAC, mac_one),
|
||||
(LinkTlvs.INTERFACE2_NUMBER, intf_two),
|
||||
(LinkTlvs.INTERFACE2_IP4, address_two),
|
||||
(LinkTlvs.INTERFACE2_IP4_MASK, 24),
|
||||
(LinkTlvs.INTERFACE2_MAC, mac_two),
|
||||
]
|
||||
|
||||
if key:
|
||||
values.append((LinkTlvs.KEY, key))
|
||||
|
||||
return CoreLinkMessage.create(MessageFlags.ADD.value, values)
|
||||
|
||||
|
||||
def command_message(node, command):
|
||||
"""
|
||||
Create an execute command TLV message.
|
||||
|
||||
:param node: node to execute command for
|
||||
:param command: command to execute
|
||||
:return: tlv message
|
||||
:rtype: core.api.coreapi.CoreExecMessage
|
||||
"""
|
||||
flags = MessageFlags.STRING.value | MessageFlags.TEXT.value
|
||||
return CoreExecMessage.create(flags, [
|
||||
(ExecuteTlvs.NODE, node.id),
|
||||
(ExecuteTlvs.NUMBER, 1),
|
||||
(ExecuteTlvs.COMMAND, command)
|
||||
])
|
||||
|
||||
|
||||
def state_message(state):
|
||||
"""
|
||||
Create a event TLV message for a new state.
|
||||
|
||||
:param core.enumerations.EventTypes state: state to create message for
|
||||
:return: tlv message
|
||||
:rtype: core.api.coreapi.CoreEventMessage
|
||||
"""
|
||||
return CoreEventMessage.create(0, [
|
||||
(EventTlvs.TYPE, state.value)
|
||||
])
|
||||
|
||||
|
||||
class CoreServerTest(object):
|
||||
def __init__(self, port=CORE_API_PORT):
|
||||
self.host = "localhost"
|
||||
|
@ -152,13 +40,12 @@ class CoreServerTest(object):
|
|||
self.session = None
|
||||
self.request_handler = None
|
||||
|
||||
def setup(self, distributed_address, port):
|
||||
def setup(self, distributed_address):
|
||||
# validate address
|
||||
assert distributed_address, "distributed server address was not provided"
|
||||
|
||||
# create session
|
||||
self.session = self.server.coreemu.create_session(1)
|
||||
self.session.master = True
|
||||
|
||||
# create request handler
|
||||
request_mock = MagicMock()
|
||||
|
@ -170,11 +57,11 @@ class CoreServerTest(object):
|
|||
|
||||
# have broker handle a configuration state change
|
||||
self.session.set_state(EventTypes.DEFINITION_STATE)
|
||||
message = state_message(EventTypes.CONFIGURATION_STATE)
|
||||
message = CoreEventMessage.create(0, [(EventTlvs.TYPE, EventTypes.CONFIGURATION_STATE.value)])
|
||||
self.request_handler.handle_message(message)
|
||||
|
||||
# add broker server for distributed core
|
||||
distributed = "%s:%s:%s" % (self.distributed_server, distributed_address, port)
|
||||
distributed = "%s:%s:%s" % (self.distributed_server, distributed_address, self.port)
|
||||
message = CoreConfMessage.create(0, [
|
||||
(ConfigTlvs.OBJECT, "broker"),
|
||||
(ConfigTlvs.TYPE, 0),
|
||||
|
@ -204,7 +91,6 @@ class CoreServerTest(object):
|
|||
|
||||
def shutdown(self):
|
||||
self.server.coreemu.shutdown()
|
||||
self.server.shutdown()
|
||||
self.server.server_close()
|
||||
|
||||
|
||||
|
|
|
@ -1,15 +1,125 @@
|
|||
"""
|
||||
Unit tests for testing CORE with distributed networks.
|
||||
"""
|
||||
from core.emane.ieee80211abg import EmaneIeee80211abgModel
|
||||
|
||||
import conftest
|
||||
|
||||
from core.api.tlv.coreapi import CoreExecMessage
|
||||
from core.emulator.enumerations import EventTypes
|
||||
from core.api.tlv.coreapi import CoreExecMessage, CoreNodeMessage, CoreLinkMessage, CoreEventMessage, CoreConfMessage
|
||||
from core.emulator.enumerations import EventTypes, NodeTlvs, LinkTlvs, LinkTypes, EventTlvs, ConfigTlvs, ConfigFlags
|
||||
from core.emulator.enumerations import ExecuteTlvs
|
||||
from core.emulator.enumerations import MessageFlags
|
||||
from core.emulator.enumerations import NodeTypes
|
||||
from core.nodes.ipaddress import IpAddress
|
||||
from core.nodes.ipaddress import IpAddress, MacAddress, Ipv4Prefix
|
||||
|
||||
|
||||
def set_emane_model(node_id, model):
|
||||
return CoreConfMessage.create(0, [
|
||||
(ConfigTlvs.NODE, node_id),
|
||||
(ConfigTlvs.OBJECT, model),
|
||||
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
|
||||
])
|
||||
|
||||
|
||||
def node_message(_id, name, emulation_server=None, node_type=NodeTypes.DEFAULT, model=None):
|
||||
"""
|
||||
Convenience method for creating a node TLV messages.
|
||||
|
||||
:param int _id: node id
|
||||
:param str name: node name
|
||||
:param str emulation_server: distributed server name, if desired
|
||||
:param core.emulator.enumerations.NodeTypes node_type: node type
|
||||
:param str model: model for node
|
||||
:return: tlv message
|
||||
:rtype: core.api.tlv.coreapi.CoreNodeMessage
|
||||
"""
|
||||
values = [
|
||||
(NodeTlvs.NUMBER, _id),
|
||||
(NodeTlvs.TYPE, node_type.value),
|
||||
(NodeTlvs.NAME, name),
|
||||
(NodeTlvs.EMULATION_SERVER, emulation_server),
|
||||
(NodeTlvs.X_POSITION, 0),
|
||||
(NodeTlvs.Y_POSITION, 0),
|
||||
]
|
||||
|
||||
if model:
|
||||
values.append((NodeTlvs.MODEL, model))
|
||||
|
||||
return CoreNodeMessage.create(MessageFlags.ADD.value, values)
|
||||
|
||||
|
||||
def link_message(n1, n2, intf_one=None, address_one=None, intf_two=None, address_two=None, key=None, mask=24):
|
||||
"""
|
||||
Convenience method for creating link TLV messages.
|
||||
|
||||
:param int n1: node one id
|
||||
:param int n2: node two id
|
||||
:param int intf_one: node one interface id
|
||||
:param core.nodes.ipaddress.IpAddress address_one: node one ip4 address
|
||||
:param int intf_two: node two interface id
|
||||
:param core.nodes.ipaddress.IpAddress address_two: node two ip4 address
|
||||
:param int key: tunnel key for link if needed
|
||||
:param int mask: ip4 mask to use for link
|
||||
:return: tlv mesage
|
||||
:rtype: core.api.tlv.coreapi.CoreLinkMessage
|
||||
"""
|
||||
mac_one, mac_two = None, None
|
||||
if address_one:
|
||||
mac_one = MacAddress.random()
|
||||
if address_two:
|
||||
mac_two = MacAddress.random()
|
||||
|
||||
values = [
|
||||
(LinkTlvs.N1_NUMBER, n1),
|
||||
(LinkTlvs.N2_NUMBER, n2),
|
||||
(LinkTlvs.DELAY, 0),
|
||||
(LinkTlvs.BANDWIDTH, 0),
|
||||
(LinkTlvs.PER, "0"),
|
||||
(LinkTlvs.DUP, "0"),
|
||||
(LinkTlvs.JITTER, 0),
|
||||
(LinkTlvs.TYPE, LinkTypes.WIRED.value),
|
||||
(LinkTlvs.INTERFACE1_NUMBER, intf_one),
|
||||
(LinkTlvs.INTERFACE1_IP4, address_one),
|
||||
(LinkTlvs.INTERFACE1_IP4_MASK, mask),
|
||||
(LinkTlvs.INTERFACE1_MAC, mac_one),
|
||||
(LinkTlvs.INTERFACE2_NUMBER, intf_two),
|
||||
(LinkTlvs.INTERFACE2_IP4, address_two),
|
||||
(LinkTlvs.INTERFACE2_IP4_MASK, mask),
|
||||
(LinkTlvs.INTERFACE2_MAC, mac_two),
|
||||
]
|
||||
|
||||
if key:
|
||||
values.append((LinkTlvs.KEY, key))
|
||||
|
||||
return CoreLinkMessage.create(MessageFlags.ADD.value, values)
|
||||
|
||||
|
||||
def command_message(node, command):
|
||||
"""
|
||||
Create an execute command TLV message.
|
||||
|
||||
:param node: node to execute command for
|
||||
:param command: command to execute
|
||||
:return: tlv message
|
||||
:rtype: core.api.tlv.coreapi.CoreExecMessage
|
||||
"""
|
||||
flags = MessageFlags.STRING.value | MessageFlags.TEXT.value
|
||||
return CoreExecMessage.create(flags, [
|
||||
(ExecuteTlvs.NODE, node.id),
|
||||
(ExecuteTlvs.NUMBER, 1),
|
||||
(ExecuteTlvs.COMMAND, command)
|
||||
])
|
||||
|
||||
|
||||
def state_message(state):
|
||||
"""
|
||||
Create a event TLV message for a new state.
|
||||
|
||||
:param core.enumerations.EventTypes state: state to create message for
|
||||
:return: tlv message
|
||||
:rtype: core.api.tlv.coreapi.CoreEventMessage
|
||||
"""
|
||||
return CoreEventMessage.create(0, [
|
||||
(EventTlvs.TYPE, state.value)
|
||||
])
|
||||
|
||||
|
||||
def validate_response(replies, _):
|
||||
|
@ -28,18 +138,18 @@ def validate_response(replies, _):
|
|||
|
||||
|
||||
class TestDistributed:
|
||||
def test_distributed(self, cored, distributed_address):
|
||||
def test_switch(self, cored, distributed_address):
|
||||
"""
|
||||
Test creating a distributed network.
|
||||
Test creating a distributed switch network.
|
||||
|
||||
:param core.coreserver.CoreServer conftest.Core cored: core daemon server to test with
|
||||
:param core.api.tlv.coreserver.CoreServer conftest.Core cored: core daemon server to test with
|
||||
:param str distributed_address: distributed server to test against
|
||||
"""
|
||||
# initialize server for testing
|
||||
cored.setup(distributed_address)
|
||||
|
||||
# create local node
|
||||
message = conftest.node_message(
|
||||
message = node_message(
|
||||
_id=1,
|
||||
name="n1",
|
||||
model="host"
|
||||
|
@ -47,7 +157,7 @@ class TestDistributed:
|
|||
cored.request_handler.handle_message(message)
|
||||
|
||||
# create distributed node and assign to distributed server
|
||||
message = conftest.node_message(
|
||||
message = node_message(
|
||||
_id=2,
|
||||
name="n2",
|
||||
emulation_server=cored.distributed_server,
|
||||
|
@ -56,17 +166,16 @@ class TestDistributed:
|
|||
cored.request_handler.handle_message(message)
|
||||
|
||||
# create distributed switch and assign to distributed server
|
||||
message = conftest.node_message(
|
||||
message = node_message(
|
||||
_id=3,
|
||||
name="n3",
|
||||
emulation_server=cored.distributed_server,
|
||||
node_type=NodeTypes.SWITCH
|
||||
)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# link message one
|
||||
ip4_address = cored.prefix.addr(1)
|
||||
message = conftest.link_message(
|
||||
message = link_message(
|
||||
n1=1,
|
||||
n2=3,
|
||||
intf_one=0,
|
||||
|
@ -76,7 +185,7 @@ class TestDistributed:
|
|||
|
||||
# link message two
|
||||
ip4_address = cored.prefix.addr(2)
|
||||
message = conftest.link_message(
|
||||
message = link_message(
|
||||
n1=3,
|
||||
n2=2,
|
||||
intf_two=0,
|
||||
|
@ -85,12 +194,86 @@ class TestDistributed:
|
|||
cored.request_handler.handle_message(message)
|
||||
|
||||
# change session to instantiation state
|
||||
message = conftest.state_message(EventTypes.INSTANTIATION_STATE)
|
||||
message = state_message(EventTypes.INSTANTIATION_STATE)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# test a ping command
|
||||
node_one = cored.session.get_node(1)
|
||||
message = conftest.command_message(node_one, "ping -c 5 %s" % ip4_address)
|
||||
message = command_message(node_one, "ping -c 5 %s" % ip4_address)
|
||||
cored.request_handler.dispatch_replies = validate_response
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
def test_emane(self, cored, distributed_address):
|
||||
"""
|
||||
Test creating a distributed emane network.
|
||||
|
||||
:param core.api.tlv.coreserver.CoreServer conftest.Core cored: core daemon server to test with
|
||||
:param str distributed_address: distributed server to test against
|
||||
"""
|
||||
# initialize server for testing
|
||||
cored.setup(distributed_address)
|
||||
|
||||
# configure required controlnet
|
||||
cored.session.options.set_config("controlnet", "core1:172.16.1.0/24 core2:172.16.2.0/24")
|
||||
|
||||
# create local node
|
||||
message = node_message(
|
||||
_id=1,
|
||||
name="n1",
|
||||
model="mdr"
|
||||
)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# create distributed node and assign to distributed server
|
||||
message = node_message(
|
||||
_id=2,
|
||||
name="n2",
|
||||
emulation_server=cored.distributed_server,
|
||||
model="mdr"
|
||||
)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# create distributed switch and assign to distributed server
|
||||
message = node_message(
|
||||
_id=3,
|
||||
name="n3",
|
||||
node_type=NodeTypes.EMANE
|
||||
)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# set emane model
|
||||
message = set_emane_model(3, EmaneIeee80211abgModel.name)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# link message one
|
||||
ip4_address = cored.prefix.addr(1)
|
||||
message = link_message(
|
||||
n1=1,
|
||||
n2=3,
|
||||
intf_one=0,
|
||||
address_one=ip4_address,
|
||||
mask=32
|
||||
)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# link message two
|
||||
ip4_address = cored.prefix.addr(2)
|
||||
message = link_message(
|
||||
n1=2,
|
||||
n2=3,
|
||||
intf_one=0,
|
||||
address_one=ip4_address,
|
||||
mask=32
|
||||
)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# change session to instantiation state
|
||||
message = state_message(EventTypes.INSTANTIATION_STATE)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# test a ping command
|
||||
node_one = cored.session.get_node(1)
|
||||
message = command_message(node_one, "ping -c 5 %s" % ip4_address)
|
||||
cored.request_handler.dispatch_replies = validate_response
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
|
@ -98,14 +281,14 @@ class TestDistributed:
|
|||
"""
|
||||
Test creating a distributed prouter node.
|
||||
|
||||
:param core.coreserver.CoreServer conftest.Core cored: core daemon server to test with
|
||||
:param core.coreserver.CoreServer Core cored: core daemon server to test with
|
||||
:param str distributed_address: distributed server to test against
|
||||
"""
|
||||
# initialize server for testing
|
||||
cored.setup(distributed_address)
|
||||
|
||||
# create local node
|
||||
message = conftest.node_message(
|
||||
message = node_message(
|
||||
_id=1,
|
||||
name="n1",
|
||||
model="host"
|
||||
|
@ -113,7 +296,7 @@ class TestDistributed:
|
|||
cored.request_handler.handle_message(message)
|
||||
|
||||
# create distributed node and assign to distributed server
|
||||
message = conftest.node_message(
|
||||
message = node_message(
|
||||
_id=2,
|
||||
name="n2",
|
||||
emulation_server=cored.distributed_server,
|
||||
|
@ -123,7 +306,7 @@ class TestDistributed:
|
|||
cored.request_handler.handle_message(message)
|
||||
|
||||
# create distributed switch and assign to distributed server
|
||||
message = conftest.node_message(
|
||||
message = node_message(
|
||||
_id=3,
|
||||
name="n3",
|
||||
node_type=NodeTypes.SWITCH
|
||||
|
@ -132,7 +315,7 @@ class TestDistributed:
|
|||
|
||||
# link message one
|
||||
ip4_address = cored.prefix.addr(1)
|
||||
message = conftest.link_message(
|
||||
message = link_message(
|
||||
n1=1,
|
||||
n2=3,
|
||||
intf_one=0,
|
||||
|
@ -142,7 +325,7 @@ class TestDistributed:
|
|||
|
||||
# link message two
|
||||
ip4_address = cored.prefix.addr(2)
|
||||
message = conftest.link_message(
|
||||
message = link_message(
|
||||
n1=3,
|
||||
n2=2,
|
||||
intf_two=0,
|
||||
|
@ -151,12 +334,12 @@ class TestDistributed:
|
|||
cored.request_handler.handle_message(message)
|
||||
|
||||
# change session to instantiation state
|
||||
message = conftest.state_message(EventTypes.INSTANTIATION_STATE)
|
||||
message = state_message(EventTypes.INSTANTIATION_STATE)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
||||
# test a ping command
|
||||
node_one = cored.session.get_node(1)
|
||||
message = conftest.command_message(node_one, "ping -c 5 %s" % ip4_address)
|
||||
message = command_message(node_one, "ping -c 5 %s" % ip4_address)
|
||||
cored.request_handler.dispatch_replies = validate_response
|
||||
cored.request_handler.handle_message(message)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
@ -165,14 +348,14 @@ class TestDistributed:
|
|||
"""
|
||||
Test session broker creation.
|
||||
|
||||
:param core.coreserver.CoreServer conftest.Core cored: core daemon server to test with
|
||||
:param core.coreserver.CoreServer Core cored: core daemon server to test with
|
||||
:param str distributed_address: distributed server to test against
|
||||
"""
|
||||
# initialize server for testing
|
||||
cored.setup(distributed_address)
|
||||
|
||||
# create local node
|
||||
message = conftest.node_message(
|
||||
message = node_message(
|
||||
_id=1,
|
||||
name="n1",
|
||||
model="host"
|
||||
|
@ -180,7 +363,7 @@ class TestDistributed:
|
|||
cored.request_handler.handle_message(message)
|
||||
|
||||
# create distributed node and assign to distributed server
|
||||
message = conftest.node_message(
|
||||
message = node_message(
|
||||
_id=2,
|
||||
name=distributed_address,
|
||||
emulation_server=cored.distributed_server,
|
||||
|
@ -191,7 +374,7 @@ class TestDistributed:
|
|||
# link message one
|
||||
ip4_address = cored.prefix.addr(1)
|
||||
address_two = IpAddress.from_string(distributed_address)
|
||||
message = conftest.link_message(
|
||||
message = link_message(
|
||||
n1=1,
|
||||
n2=2,
|
||||
intf_one=0,
|
||||
|
@ -203,5 +386,5 @@ class TestDistributed:
|
|||
cored.request_handler.handle_message(message)
|
||||
|
||||
# change session to instantiation state
|
||||
message = conftest.state_message(EventTypes.INSTANTIATION_STATE)
|
||||
message = state_message(EventTypes.INSTANTIATION_STATE)
|
||||
cored.request_handler.handle_message(message)
|
||||
|
|
Loading…
Reference in a new issue