Merge pull request #262 from coreemu/tlv-tests

TLV Message Handling Tests
This commit is contained in:
bharnden 2019-06-25 12:37:59 -07:00 committed by GitHub
commit f8960cc519
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 874 additions and 150 deletions

View file

@ -1035,8 +1035,10 @@ class CoreHandler(socketserver.BaseRequestHandler):
if message_type == ConfigFlags.REQUEST:
node_id = config_data.node
metadata_configs = self.session.metadata.get_configs()
if metadata_configs is None:
metadata_configs = {}
data_values = "|".join(["%s=%s" % (x, metadata_configs[x]) for x in metadata_configs])
data_types = tuple(ConfigDataTypes.STRING.value for _ in self.session.metadata.get_configs())
data_types = tuple(ConfigDataTypes.STRING.value for _ in metadata_configs)
config_response = ConfigData(
message_type=0,
node=node_id,
@ -1396,7 +1398,7 @@ class CoreHandler(socketserver.BaseRequestHandler):
open_file.write(data)
return ()
self.session.node_add_file(node_num, source_name, file_name, data)
self.session.add_node_file(node_num, source_name, file_name, data)
else:
raise NotImplementedError
@ -1639,10 +1641,10 @@ class CoreHandler(socketserver.BaseRequestHandler):
logging.info("request to connect to session %s", session_id)
# remove client from session broker and shutdown if needed
self.remove_session_handlers()
self.session.broker.session_clients.remove(self)
if not self.session.broker.session_clients and not self.session.is_active():
self.coreemu.delete_session(self.session.id)
self.remove_session_handlers()
# set session to join
self.session = session

View file

@ -361,6 +361,18 @@ class Session(object):
self.delete_node(net_one.id)
node_one.delnetif(interface_one.netindex)
node_two.delnetif(interface_two.netindex)
elif node_one and net_one:
interface = node_one.netif(interface_one_id)
logging.info("deleting link node(%s):interface(%s) node(%s)",
node_one.name, interface.name, net_one.name)
interface.detachnet()
node_one.delnetif(interface.netindex)
elif node_two and net_one:
interface = node_two.netif(interface_two_id)
logging.info("deleting link node(%s):interface(%s) node(%s)",
node_two.name, interface.name, net_one.name)
interface.detachnet()
node_two.delnetif(interface.netindex)
finally:
if node_one:
node_one.lock.release()
@ -616,6 +628,9 @@ class Session(object):
# clear out existing session
self.clear()
if start:
self.set_state(EventTypes.CONFIGURATION_STATE)
# write out xml file
CoreXmlReader(self).read(file_name)

View file

@ -53,6 +53,14 @@ class MobilityManager(ModelManager):
self.physnets = {}
self.session.broker.handlers.add(self.physnodehandlelink)
def reset(self):
"""
Clear out all current configurations.
:return: nothing
"""
self.config_reset()
def startup(self, node_ids=None):
"""
Session is transitioning from instantiation to runtime state.

View file

@ -40,6 +40,14 @@ class CoreServerTest(object):
self.session = None
self.request_handler = None
def setup_handler(self):
self.session = self.server.coreemu.create_session(1)
request_mock = MagicMock()
request_mock.fileno = MagicMock(return_value=1)
self.request_handler = CoreHandler(request_mock, "", self.server)
self.request_handler.session = self.session
self.request_handler.add_session_handlers()
def setup(self, distributed_address):
# validate address
assert distributed_address, "distributed server address was not provided"
@ -154,6 +162,20 @@ def cored():
ServiceManager.services.clear()
@pytest.fixture()
def coreserver():
# create and return server
server = CoreServerTest()
server.setup_handler()
yield server
# cleanup
server.shutdown()
# cleanup services
ServiceManager.services.clear()
def ping(from_node, to_node, ip_prefixes, count=3):
address = ip_prefixes.ip4_address(to_node)
return from_node.cmd(["ping", "-c", str(count), address])

View file

@ -1,174 +1,851 @@
"""
Unit tests for testing with a CORE switch.
Tests for testing tlv message handling.
"""
import os
import time
import threading
import mock
import pytest
from core.api.tlv import coreapi, dataconversion
from core.api.tlv.coreapi import CoreExecuteTlv
from core.emulator.enumerations import CORE_API_PORT, NodeTypes
from core.emulator.enumerations import EventTlvs
from core.emulator.enumerations import EventTypes
from core.api.tlv import coreapi
from core.emane.ieee80211abg import EmaneIeee80211abgModel
from core.emulator.enumerations import EventTlvs, SessionTlvs, EventTypes, FileTlvs, RegisterTlvs, ConfigTlvs, \
ConfigFlags
from core.emulator.enumerations import ExecuteTlvs
from core.emulator.enumerations import LinkTlvs
from core.emulator.enumerations import LinkTypes
from core.emulator.enumerations import MessageFlags
from core.emulator.enumerations import MessageTypes
from core.nodes import ipaddress
from core.emulator.enumerations import NodeTypes, NodeTlvs
from core.location.mobility import BasicRangeModel
from core.nodes.ipaddress import Ipv4Prefix
def command_message(node, command):
"""
Create an execute command TLV message.
:param node: node to execute command for
:param command: command to execute
:return: packed execute message
"""
tlv_data = CoreExecuteTlv.pack(ExecuteTlvs.NODE.value, node.id)
tlv_data += CoreExecuteTlv.pack(ExecuteTlvs.NUMBER.value, 1)
tlv_data += CoreExecuteTlv.pack(ExecuteTlvs.COMMAND.value, command)
return coreapi.CoreExecMessage.pack(MessageFlags.STRING.value | MessageFlags.TEXT.value, tlv_data)
def state_message(state):
"""
Create a event TLV message for a new state.
:param core.enumerations.EventTypes state: state to create message for
:return: packed event message
"""
tlv_data = coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, state.value)
return coreapi.CoreEventMessage.pack(0, tlv_data)
def switch_link_message(switch, node, address, prefix_len):
"""
Create a link TLV message for node to a switch, with the provided address and prefix length.
:param switch: switch for link
:param node: node for link
:param address: address node on link
:param prefix_len: prefix length of address
:return: packed link message
"""
tlv_data = coreapi.CoreLinkTlv.pack(LinkTlvs.N1_NUMBER.value, switch.id)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.N2_NUMBER.value, node.id)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.TYPE.value, LinkTypes.WIRED.value)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.INTERFACE2_NUMBER.value, 0)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.INTERFACE2_IP4.value, address)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.INTERFACE2_IP4_MASK.value, prefix_len)
return coreapi.CoreLinkMessage.pack(MessageFlags.ADD.value, tlv_data)
def run_cmd(node, exec_cmd):
"""
Convenience method for sending commands to a node using the legacy API.
:param node: The node the command should be issued too
:param exec_cmd: A string with the command to be run
:return: Returns the result of the command
"""
# Set up the command api message
# tlv_data = CoreExecuteTlv.pack(ExecuteTlvs.NODE.value, node.id)
# tlv_data += CoreExecuteTlv.pack(ExecuteTlvs.NUMBER.value, 1)
# tlv_data += CoreExecuteTlv.pack(ExecuteTlvs.COMMAND.value, exec_cmd)
# message = coreapi.CoreExecMessage.pack(MessageFlags.STRING.value | MessageFlags.TEXT.value, tlv_data)
message = command_message(node, exec_cmd)
node.session.broker.handlerawmsg(message)
# Now wait for the response
server = node.session.broker.servers["localhost"]
server.sock.settimeout(50.0)
# receive messages until we get our execute response
result = None
status = False
while True:
message_header = server.sock.recv(coreapi.CoreMessage.header_len)
message_type, message_flags, message_length = coreapi.CoreMessage.unpack_header(message_header)
message_data = server.sock.recv(message_length)
# If we get the right response return the results
print("received response message: %s" % message_type)
if message_type == MessageTypes.EXECUTE.value:
message = coreapi.CoreExecMessage(message_flags, message_header, message_data)
result = message.get_tlv(ExecuteTlvs.RESULT.value)
status = message.get_tlv(ExecuteTlvs.STATUS.value)
break
return result, status
def dict_to_str(values):
return "|".join("%s=%s" % (x, values[x]) for x in values)
class TestGui:
def test_broker(self, cored):
"""
Test session broker creation.
@pytest.mark.parametrize("node_type, model", [
(NodeTypes.DEFAULT, "PC"),
(NodeTypes.EMANE, None),
(NodeTypes.HUB, None),
(NodeTypes.SWITCH, None),
(NodeTypes.WIRELESS_LAN, None),
(NodeTypes.TUNNEL, None),
(NodeTypes.RJ45, None),
])
def test_node_add(self, coreserver, node_type, model):
node_id = 1
message = coreapi.CoreNodeMessage.create(MessageFlags.ADD.value, [
(NodeTlvs.NUMBER, node_id),
(NodeTlvs.TYPE, node_type.value),
(NodeTlvs.NAME, "n1"),
(NodeTlvs.X_POSITION, 0),
(NodeTlvs.Y_POSITION, 0),
(NodeTlvs.MODEL, model),
])
:param core.emulator.coreemu.EmuSession session: session for test
:param cored: cored daemon server to test with
"""
coreserver.request_handler.handle_message(message)
# set core daemon to run in the background
thread = threading.Thread(target=cored.server.serve_forever)
thread.daemon = True
thread.start()
assert coreserver.session.get_node(node_id) is not None
# ip prefix for nodes
prefix = ipaddress.Ipv4Prefix("10.83.0.0/16")
daemon = "localhost"
def test_node_update(self, coreserver):
node_id = 1
coreserver.session.add_node(_id=node_id)
x = 50
y = 100
message = coreapi.CoreNodeMessage.create(0, [
(NodeTlvs.NUMBER, node_id),
(NodeTlvs.X_POSITION, x),
(NodeTlvs.Y_POSITION, y),
])
# add server
session = cored.server.coreemu.create_session()
session.broker.addserver(daemon, "127.0.0.1", CORE_API_PORT)
coreserver.request_handler.handle_message(message)
# setup server
session.broker.setupserver(daemon)
node = coreserver.session.get_node(node_id)
assert node is not None
assert node.position.x == x
assert node.position.y == y
# do not want the recvloop running as we will deal ourselves
session.broker.dorecvloop = False
def test_node_delete(self, coreserver):
node_id = 1
coreserver.session.add_node(_id=node_id)
message = coreapi.CoreNodeMessage.create(MessageFlags.DELETE.value, [
(NodeTlvs.NUMBER, node_id),
])
# have broker handle a configuration state change
session.set_state(EventTypes.CONFIGURATION_STATE)
event_message = state_message(EventTypes.CONFIGURATION_STATE)
session.broker.handlerawmsg(event_message)
coreserver.request_handler.handle_message(message)
# create a switch node
switch = session.add_node(_type=NodeTypes.SWITCH)
switch.setposition(x=80, y=50)
switch.server = daemon
with pytest.raises(KeyError):
coreserver.session.get_node(node_id)
# retrieve switch data representation, create a switch message for broker to handle
switch_data = switch.data(MessageFlags.ADD.value)
switch_message = dataconversion.convert_node(switch_data)
session.broker.handlerawmsg(switch_message)
def test_link_add_node_to_net(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
])
# create node one
node_one = session.add_node()
node_one.server = daemon
coreserver.request_handler.handle_message(message)
# create node two
node_two = session.add_node()
node_two.server = daemon
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
# create node messages for the broker to handle
for node in [node_one, node_two]:
node_data = node.data(MessageFlags.ADD.value)
node_message = dataconversion.convert_node(node_data)
session.broker.handlerawmsg(node_message)
def test_link_add_net_to_node(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, switch),
(LinkTlvs.N2_NUMBER, node_one),
(LinkTlvs.INTERFACE2_NUMBER, 0),
(LinkTlvs.INTERFACE2_IP4, interface_one),
(LinkTlvs.INTERFACE2_IP4_MASK, 24),
])
# create links to switch from nodes for broker to handle
for index, node in enumerate([node_one, node_two], start=1):
ip4_address = prefix.addr(index)
link_message = switch_link_message(switch, node, ip4_address, prefix.prefixlen)
session.broker.handlerawmsg(link_message)
coreserver.request_handler.handle_message(message)
# change session to instantiation state
event_message = state_message(EventTypes.INSTANTIATION_STATE)
session.broker.handlerawmsg(event_message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
# Get the ip or last node and ping it from the first
output, status = run_cmd(node_one, "ip -4 -o addr show dev eth0")
pingip = output.split()[3].split("/")[0]
output, status = run_cmd(node_two, "ping -c 5 " + pingip)
assert not status
def test_link_add_node_to_node(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
node_two = 2
coreserver.session.add_node(_id=node_two)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
interface_two = ip_prefix.addr(node_two)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, node_two),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
(LinkTlvs.INTERFACE2_NUMBER, 0),
(LinkTlvs.INTERFACE2_IP4, interface_two),
(LinkTlvs.INTERFACE2_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
all_links = []
for node_id in coreserver.session.nodes:
node = coreserver.session.nodes[node_id]
all_links += node.all_link_data(0)
assert len(all_links) == 1
def test_link_update(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
link = all_links[0]
assert link.bandwidth is None
bandwidth = 50000
message = coreapi.CoreLinkMessage.create(0, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.BANDWIDTH, bandwidth),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
link = all_links[0]
assert link.bandwidth == bandwidth
def test_link_delete_node_to_node(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
node_two = 2
coreserver.session.add_node(_id=node_two)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
interface_two = ip_prefix.addr(node_two)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, node_two),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
(LinkTlvs.INTERFACE2_IP4, interface_two),
(LinkTlvs.INTERFACE2_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
all_links = []
for node_id in coreserver.session.nodes:
node = coreserver.session.nodes[node_id]
all_links += node.all_link_data(0)
assert len(all_links) == 1
message = coreapi.CoreLinkMessage.create(MessageFlags.DELETE.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, node_two),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE2_NUMBER, 0),
])
coreserver.request_handler.handle_message(message)
all_links = []
for node_id in coreserver.session.nodes:
node = coreserver.session.nodes[node_id]
all_links += node.all_link_data(0)
assert len(all_links) == 0
def test_link_delete_node_to_net(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
message = coreapi.CoreLinkMessage.create(MessageFlags.DELETE.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 0
def test_link_delete_net_to_node(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
message = coreapi.CoreLinkMessage.create(MessageFlags.DELETE.value, [
(LinkTlvs.N1_NUMBER, switch),
(LinkTlvs.N2_NUMBER, node_one),
(LinkTlvs.INTERFACE2_NUMBER, 0),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 0
def test_session_update(self, coreserver):
session_id = coreserver.session.id
name = "test"
message = coreapi.CoreSessionMessage.create(0, [
(SessionTlvs.NUMBER, str(session_id)),
(SessionTlvs.NAME, name),
])
coreserver.request_handler.handle_message(message)
assert coreserver.session.name == name
def test_session_query(self, coreserver):
coreserver.request_handler.dispatch_replies = mock.MagicMock()
message = coreapi.CoreSessionMessage.create(MessageFlags.STRING.value, [])
coreserver.request_handler.handle_message(message)
args, _ = coreserver.request_handler.dispatch_replies.call_args
replies = args[0]
assert len(replies) == 1
def test_session_join(self, coreserver):
coreserver.request_handler.dispatch_replies = mock.MagicMock()
session_id = coreserver.session.id
message = coreapi.CoreSessionMessage.create(MessageFlags.ADD.value, [
(SessionTlvs.NUMBER, str(session_id)),
])
coreserver.request_handler.handle_message(message)
assert coreserver.request_handler.session.id == session_id
def test_session_delete(self, coreserver):
assert len(coreserver.server.coreemu.sessions) == 1
session_id = coreserver.session.id
message = coreapi.CoreSessionMessage.create(MessageFlags.DELETE.value, [
(SessionTlvs.NUMBER, str(session_id)),
])
coreserver.request_handler.handle_message(message)
assert len(coreserver.server.coreemu.sessions) == 0
def test_file_hook_add(self, coreserver):
state = EventTypes.DATACOLLECT_STATE.value
assert coreserver.session._hooks.get(state) is None
file_name = "test.sh"
file_data = "echo hello"
message = coreapi.CoreFileMessage.create(MessageFlags.ADD.value, [
(FileTlvs.TYPE, "hook:%s" % state),
(FileTlvs.NAME, file_name),
(FileTlvs.DATA, file_data),
])
coreserver.request_handler.handle_message(message)
hooks = coreserver.session._hooks.get(state)
assert len(hooks) == 1
name, data = hooks[0]
assert file_name == name
assert file_data == data
def test_file_service_file_set(self, coreserver):
node = coreserver.session.add_node()
service = "DefaultRoute"
file_name = "defaultroute.sh"
file_data = "echo hello"
message = coreapi.CoreFileMessage.create(MessageFlags.ADD.value, [
(FileTlvs.NODE, node.id),
(FileTlvs.TYPE, "service:%s" % service),
(FileTlvs.NAME, file_name),
(FileTlvs.DATA, file_data),
])
coreserver.request_handler.handle_message(message)
service_file = coreserver.session.services.get_service_file(node, service, file_name)
assert file_data == service_file.data
def test_file_node_file_copy(self, coreserver):
file_name = "/var/log/test/node.log"
node = coreserver.session.add_node()
node.makenodedir()
file_data = "echo hello"
message = coreapi.CoreFileMessage.create(MessageFlags.ADD.value, [
(FileTlvs.NODE, node.id),
(FileTlvs.NAME, file_name),
(FileTlvs.DATA, file_data),
])
coreserver.request_handler.handle_message(message)
directory, basename = os.path.split(file_name)
created_directory = directory[1:].replace("/", ".")
create_path = os.path.join(node.nodedir, created_directory, basename)
assert os.path.exists(create_path)
def test_exec_node_tty(self, coreserver):
coreserver.request_handler.dispatch_replies = mock.MagicMock()
node = coreserver.session.add_node()
node.startup()
message = coreapi.CoreExecMessage.create(MessageFlags.TTY.value, [
(ExecuteTlvs.NODE, node.id),
(ExecuteTlvs.NUMBER, 1),
(ExecuteTlvs.COMMAND, "bash")
])
coreserver.request_handler.handle_message(message)
args, _ = coreserver.request_handler.dispatch_replies.call_args
replies = args[0]
assert len(replies) == 1
def test_exec_local_command(self, coreserver):
coreserver.request_handler.dispatch_replies = mock.MagicMock()
node = coreserver.session.add_node()
node.startup()
message = coreapi.CoreExecMessage.create(
MessageFlags.TEXT.value | MessageFlags.LOCAL.value, [
(ExecuteTlvs.NODE, node.id),
(ExecuteTlvs.NUMBER, 1),
(ExecuteTlvs.COMMAND, "echo hello")
])
coreserver.request_handler.handle_message(message)
args, _ = coreserver.request_handler.dispatch_replies.call_args
replies = args[0]
assert len(replies) == 1
def test_exec_node_command(self, coreserver):
coreserver.request_handler.dispatch_replies = mock.MagicMock()
node = coreserver.session.add_node()
node.startup()
message = coreapi.CoreExecMessage.create(
MessageFlags.TEXT.value, [
(ExecuteTlvs.NODE, node.id),
(ExecuteTlvs.NUMBER, 1),
(ExecuteTlvs.COMMAND, "echo hello")
])
coreserver.request_handler.handle_message(message)
args, _ = coreserver.request_handler.dispatch_replies.call_args
replies = args[0]
assert len(replies) == 1
@pytest.mark.parametrize("state", [
EventTypes.SHUTDOWN_STATE,
EventTypes.RUNTIME_STATE,
EventTypes.DATACOLLECT_STATE,
EventTypes.CONFIGURATION_STATE,
EventTypes.DEFINITION_STATE
])
def test_event_state(self, coreserver, state):
message = coreapi.CoreEventMessage.create(0, [
(EventTlvs.TYPE, state.value),
])
coreserver.request_handler.handle_message(message)
assert coreserver.session.state == state.value
def test_event_schedule(self, coreserver):
coreserver.session.add_event = mock.MagicMock()
node = coreserver.session.add_node()
message = coreapi.CoreEventMessage.create(MessageFlags.ADD.value, [
(EventTlvs.TYPE, EventTypes.SCHEDULED.value),
(EventTlvs.TIME, str(time.time() + 100)),
(EventTlvs.NODE, node.id),
(EventTlvs.NAME, "event"),
(EventTlvs.DATA, "data"),
])
coreserver.request_handler.handle_message(message)
coreserver.session.add_event.assert_called_once()
def test_event_save_xml(self, coreserver, tmpdir):
xml_file = tmpdir.join("session.xml")
file_path = xml_file.strpath
coreserver.session.add_node()
message = coreapi.CoreEventMessage.create(0, [
(EventTlvs.TYPE, EventTypes.FILE_SAVE.value),
(EventTlvs.NAME, file_path),
])
coreserver.request_handler.handle_message(message)
assert os.path.exists(file_path)
def test_event_open_xml(self, coreserver, tmpdir):
xml_file = tmpdir.join("session.xml")
file_path = xml_file.strpath
node = coreserver.session.add_node()
coreserver.session.save_xml(file_path)
coreserver.session.delete_node(node.id)
message = coreapi.CoreEventMessage.create(0, [
(EventTlvs.TYPE, EventTypes.FILE_OPEN.value),
(EventTlvs.NAME, file_path),
])
coreserver.request_handler.handle_message(message)
assert coreserver.session.get_node(node.id)
@pytest.mark.parametrize("state", [
EventTypes.START,
EventTypes.STOP,
EventTypes.RESTART,
EventTypes.PAUSE,
EventTypes.RECONFIGURE
])
def test_event_service(self, coreserver, state):
coreserver.session.broadcast_event = mock.MagicMock()
node = coreserver.session.add_node()
node.startup()
message = coreapi.CoreEventMessage.create(0, [
(EventTlvs.TYPE, state.value),
(EventTlvs.NODE, node.id),
(EventTlvs.NAME, "service:DefaultRoute"),
])
coreserver.request_handler.handle_message(message)
coreserver.session.broadcast_event.assert_called_once()
@pytest.mark.parametrize("state", [
EventTypes.START,
EventTypes.STOP,
EventTypes.RESTART,
EventTypes.PAUSE,
EventTypes.RECONFIGURE
])
def test_event_mobility(self, coreserver, state):
message = coreapi.CoreEventMessage.create(0, [
(EventTlvs.TYPE, state.value),
(EventTlvs.NAME, "mobility:ns2script"),
])
coreserver.request_handler.handle_message(message)
def test_register_gui(self, coreserver):
coreserver.request_handler.master = False
message = coreapi.CoreRegMessage.create(0, [
(RegisterTlvs.GUI, "gui"),
])
coreserver.request_handler.handle_message(message)
assert coreserver.request_handler.master is True
def test_register_xml(self, coreserver, tmpdir):
xml_file = tmpdir.join("session.xml")
file_path = xml_file.strpath
node = coreserver.session.add_node()
coreserver.session.save_xml(file_path)
coreserver.session.delete_node(node.id)
message = coreapi.CoreRegMessage.create(0, [
(RegisterTlvs.EXECUTE_SERVER, file_path),
])
coreserver.session.instantiate()
coreserver.request_handler.handle_message(message)
assert coreserver.server.coreemu.sessions[2].get_node(node.id)
def test_register_python(self, coreserver, tmpdir):
xml_file = tmpdir.join("test.py")
file_path = xml_file.strpath
with open(file_path, "w") as f:
f.write("coreemu = globals()['coreemu']\n")
f.write("session = coreemu.sessions[1]\n")
f.write("session.add_node()\n")
message = coreapi.CoreRegMessage.create(0, [
(RegisterTlvs.EXECUTE_SERVER, file_path),
])
coreserver.session.instantiate()
coreserver.request_handler.handle_message(message)
assert len(coreserver.session.nodes) == 1
def test_config_all(self, coreserver):
node = coreserver.session.add_node()
message = coreapi.CoreConfMessage.create(MessageFlags.ADD.value, [
(ConfigTlvs.OBJECT, "all"),
(ConfigTlvs.NODE, node.id),
(ConfigTlvs.TYPE, ConfigFlags.RESET.value),
])
coreserver.session.location.reset = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.session.location.reset.assert_called_once()
def test_config_options_request(self, coreserver):
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "session"),
(ConfigTlvs.TYPE, ConfigFlags.REQUEST.value),
])
coreserver.request_handler.handle_broadcast_config = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.request_handler.handle_broadcast_config.assert_called_once()
def test_config_options_update(self, coreserver):
test_key = "test"
test_value = "test"
values = {
test_key: test_value
}
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "session"),
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
(ConfigTlvs.VALUES, dict_to_str(values)),
])
coreserver.request_handler.handle_message(message)
assert coreserver.session.options.get_config(test_key) == test_value
def test_config_location_reset(self, coreserver):
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "location"),
(ConfigTlvs.TYPE, ConfigFlags.RESET.value),
])
coreserver.session.location.refxyz = (10, 10, 10)
coreserver.request_handler.handle_message(message)
assert coreserver.session.location.refxyz == (0, 0, 0)
def test_config_location_update(self, coreserver):
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "location"),
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
(ConfigTlvs.VALUES, "10|10|70|50|0|0.5"),
])
coreserver.request_handler.handle_message(message)
assert coreserver.session.location.refxyz == (10, 10, 0.0)
assert coreserver.session.location.refgeo == (70, 50, 0)
assert coreserver.session.location.refscale == 0.5
def test_config_metadata_request(self, coreserver):
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "metadata"),
(ConfigTlvs.TYPE, ConfigFlags.REQUEST.value),
])
coreserver.request_handler.handle_broadcast_config = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.request_handler.handle_broadcast_config.assert_called_once()
def test_config_metadata_update(self, coreserver):
test_key = "test"
test_value = "test"
values = {
test_key: test_value
}
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "metadata"),
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
(ConfigTlvs.VALUES, dict_to_str(values)),
])
coreserver.request_handler.handle_message(message)
assert coreserver.session.metadata.get_config(test_key) == test_value
def test_config_broker_request(self, coreserver):
server = "test"
host = "10.0.0.1"
port = 50000
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "broker"),
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
(ConfigTlvs.VALUES, "%s:%s:%s" % (server, host, port)),
])
coreserver.session.broker.addserver = mock.MagicMock()
coreserver.session.broker.setupserver = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.session.broker.addserver.assert_called_once_with(server, host, port)
coreserver.session.broker.setupserver.assert_called_once_with(server)
def test_config_services_request_all(self, coreserver):
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "services"),
(ConfigTlvs.TYPE, ConfigFlags.REQUEST.value),
])
coreserver.request_handler.handle_broadcast_config = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.request_handler.handle_broadcast_config.assert_called_once()
def test_config_services_request_specific(self, coreserver):
node = coreserver.session.add_node()
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.NODE, node.id),
(ConfigTlvs.OBJECT, "services"),
(ConfigTlvs.TYPE, ConfigFlags.REQUEST.value),
(ConfigTlvs.OPAQUE, "service:DefaultRoute"),
])
coreserver.request_handler.handle_broadcast_config = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.request_handler.handle_broadcast_config.assert_called_once()
def test_config_services_request_specific_file(self, coreserver):
node = coreserver.session.add_node()
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.NODE, node.id),
(ConfigTlvs.OBJECT, "services"),
(ConfigTlvs.TYPE, ConfigFlags.REQUEST.value),
(ConfigTlvs.OPAQUE, "service:DefaultRoute:defaultroute.sh"),
])
coreserver.session.broadcast_file = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.session.broadcast_file.assert_called_once()
def test_config_services_reset(self, coreserver):
node = coreserver.session.add_node()
service = "DefaultRoute"
coreserver.session.services.set_service(node.id, service)
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "services"),
(ConfigTlvs.TYPE, ConfigFlags.RESET.value),
])
assert coreserver.session.services.get_service(node.id, service) is not None
coreserver.request_handler.handle_message(message)
assert coreserver.session.services.get_service(node.id, service) is None
def test_config_services_set(self, coreserver):
node = coreserver.session.add_node()
service = "DefaultRoute"
values = {
"meta": "metadata"
}
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.NODE, node.id),
(ConfigTlvs.OBJECT, "services"),
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
(ConfigTlvs.OPAQUE, "service:%s" % service),
(ConfigTlvs.VALUES, dict_to_str(values)),
])
assert coreserver.session.services.get_service(node.id, service) is None
coreserver.request_handler.handle_message(message)
assert coreserver.session.services.get_service(node.id, service) is not None
def test_config_mobility_reset(self, coreserver):
wlan = coreserver.session.add_node(_type=NodeTypes.WIRELESS_LAN)
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "MobilityManager"),
(ConfigTlvs.TYPE, ConfigFlags.RESET.value),
])
coreserver.session.mobility.set_model_config(wlan.id, BasicRangeModel.name, {})
assert len(coreserver.session.mobility.node_configurations) == 1
coreserver.request_handler.handle_message(message)
assert len(coreserver.session.mobility.node_configurations) == 0
def test_config_mobility_model_request(self, coreserver):
wlan = coreserver.session.add_node(_type=NodeTypes.WIRELESS_LAN)
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.NODE, wlan.id),
(ConfigTlvs.OBJECT, BasicRangeModel.name),
(ConfigTlvs.TYPE, ConfigFlags.REQUEST.value),
])
coreserver.request_handler.handle_broadcast_config = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.request_handler.handle_broadcast_config.assert_called_once()
def test_config_mobility_model_update(self, coreserver):
wlan = coreserver.session.add_node(_type=NodeTypes.WIRELESS_LAN)
config_key = "range"
config_value = "1000"
values = {
config_key: config_value
}
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.NODE, wlan.id),
(ConfigTlvs.OBJECT, BasicRangeModel.name),
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
(ConfigTlvs.VALUES, dict_to_str(values)),
])
coreserver.request_handler.handle_message(message)
config = coreserver.session.mobility.get_model_config(wlan.id, BasicRangeModel.name)
assert config[config_key] == config_value
def test_config_emane_model_request(self, coreserver):
wlan = coreserver.session.add_node(_type=NodeTypes.WIRELESS_LAN)
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.NODE, wlan.id),
(ConfigTlvs.OBJECT, EmaneIeee80211abgModel.name),
(ConfigTlvs.TYPE, ConfigFlags.REQUEST.value),
])
coreserver.request_handler.handle_broadcast_config = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.request_handler.handle_broadcast_config.assert_called_once()
def test_config_emane_model_update(self, coreserver):
wlan = coreserver.session.add_node(_type=NodeTypes.WIRELESS_LAN)
config_key = "distance"
config_value = "50051"
values = {
config_key: config_value
}
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.NODE, wlan.id),
(ConfigTlvs.OBJECT, EmaneIeee80211abgModel.name),
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
(ConfigTlvs.VALUES, dict_to_str(values)),
])
coreserver.request_handler.handle_message(message)
config = coreserver.session.emane.get_model_config(wlan.id, EmaneIeee80211abgModel.name)
assert config[config_key] == config_value
def test_config_emane_request(self, coreserver):
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "emane"),
(ConfigTlvs.TYPE, ConfigFlags.REQUEST.value),
])
coreserver.request_handler.handle_broadcast_config = mock.MagicMock()
coreserver.request_handler.handle_message(message)
coreserver.request_handler.handle_broadcast_config.assert_called_once()
def test_config_emane_update(self, coreserver):
config_key = "eventservicedevice"
config_value = "eth4"
values = {
config_key: config_value
}
message = coreapi.CoreConfMessage.create(0, [
(ConfigTlvs.OBJECT, "emane"),
(ConfigTlvs.TYPE, ConfigFlags.UPDATE.value),
(ConfigTlvs.VALUES, dict_to_str(values)),
])
coreserver.request_handler.handle_message(message)
config = coreserver.session.emane.get_configs()
assert config[config_key] == config_value