beginning to add some tests to help verify handling different tlv messages, added delete link logic for net to node and node to net

This commit is contained in:
bharnden 2019-06-21 23:12:18 -07:00
parent 05c6233908
commit 588a0932d3
3 changed files with 232 additions and 61 deletions

View file

@ -361,6 +361,18 @@ class Session(object):
self.delete_node(net_one.id)
node_one.delnetif(interface_one.netindex)
node_two.delnetif(interface_two.netindex)
elif node_one and net_one:
interface = node_one.netif(interface_one_id)
logging.info("deleting link node(%s):interface(%s) node(%s)",
node_one.name, interface.name, net_one.name)
interface.detachnet()
node_one.delnetif(interface.netindex)
elif node_two and net_one:
interface = node_two.netif(interface_two_id)
logging.info("deleting link node(%s):interface(%s) node(%s)",
node_two.name, interface.name, net_one.name)
interface.detachnet()
node_two.delnetif(interface.netindex)
finally:
if node_one:
node_one.lock.release()

View file

@ -40,6 +40,14 @@ class CoreServerTest(object):
self.session = None
self.request_handler = None
def setup_handler(self):
self.session = self.server.coreemu.create_session(1)
request_mock = MagicMock()
request_mock.fileno = MagicMock(return_value=1)
self.request_handler = CoreHandler(request_mock, "", self.server)
self.request_handler.session = self.session
self.request_handler.add_session_handlers()
def setup(self, distributed_address):
# validate address
assert distributed_address, "distributed server address was not provided"
@ -154,6 +162,20 @@ def cored():
ServiceManager.services.clear()
@pytest.fixture()
def coreserver():
# create and return server
server = CoreServerTest()
server.setup_handler()
yield server
# cleanup
server.shutdown()
# cleanup services
ServiceManager.services.clear()
def ping(from_node, to_node, ip_prefixes, count=3):
address = ip_prefixes.ip4_address(to_node)
return from_node.cmd(["ping", "-c", str(count), address])

View file

@ -2,19 +2,18 @@
Unit tests for testing with a CORE switch.
"""
import threading
import pytest
from core.api.tlv import coreapi, dataconversion
from core.api.tlv import coreapi
from core.api.tlv.coreapi import CoreExecuteTlv
from core.emulator.enumerations import CORE_API_PORT, NodeTypes
from core.emulator.enumerations import EventTlvs
from core.emulator.enumerations import EventTypes
from core.emulator.enumerations import ExecuteTlvs
from core.emulator.enumerations import LinkTlvs
from core.emulator.enumerations import LinkTypes
from core.emulator.enumerations import MessageFlags
from core.emulator.enumerations import MessageTypes
from core.nodes import ipaddress
from core.emulator.enumerations import NodeTypes, NodeTlvs
from core.nodes.ipaddress import Ipv4Prefix
def command_message(node, command):
@ -101,74 +100,212 @@ def run_cmd(node, exec_cmd):
class TestGui:
def test_broker(self, cored):
"""
Test session broker creation.
@pytest.mark.parametrize("node_type, model", [
(NodeTypes.DEFAULT, "PC"),
(NodeTypes.EMANE, None),
(NodeTypes.HUB, None),
(NodeTypes.SWITCH, None),
(NodeTypes.WIRELESS_LAN, None),
(NodeTypes.TUNNEL, None),
(NodeTypes.RJ45, None),
])
def test_node_add(self, coreserver, node_type, model):
node_id = 1
message = coreapi.CoreNodeMessage.create(MessageFlags.ADD.value, [
(NodeTlvs.NUMBER, node_id),
(NodeTlvs.TYPE, node_type.value),
(NodeTlvs.NAME, "n1"),
(NodeTlvs.X_POSITION, 0),
(NodeTlvs.Y_POSITION, 0),
(NodeTlvs.MODEL, model),
])
:param core.emulator.coreemu.EmuSession session: session for test
:param cored: cored daemon server to test with
"""
coreserver.request_handler.handle_message(message)
# set core daemon to run in the background
thread = threading.Thread(target=cored.server.serve_forever)
thread.daemon = True
thread.start()
assert coreserver.session.get_node(node_id) is not None
# ip prefix for nodes
prefix = ipaddress.Ipv4Prefix("10.83.0.0/16")
daemon = "localhost"
def test_node_update(self, coreserver):
node_id = 1
coreserver.session.add_node(_id=node_id)
x = 50
y = 100
message = coreapi.CoreNodeMessage.create(0, [
(NodeTlvs.NUMBER, node_id),
(NodeTlvs.X_POSITION, x),
(NodeTlvs.Y_POSITION, y),
])
# add server
session = cored.server.coreemu.create_session()
session.broker.addserver(daemon, "127.0.0.1", CORE_API_PORT)
coreserver.request_handler.handle_message(message)
# setup server
session.broker.setupserver(daemon)
node = coreserver.session.get_node(node_id)
assert node is not None
assert node.position.x == x
assert node.position.y == y
# do not want the recvloop running as we will deal ourselves
session.broker.dorecvloop = False
def test_node_delete(self, coreserver):
node_id = 1
coreserver.session.add_node(_id=node_id)
message = coreapi.CoreNodeMessage.create(MessageFlags.DELETE.value, [
(NodeTlvs.NUMBER, node_id),
])
# have broker handle a configuration state change
session.set_state(EventTypes.CONFIGURATION_STATE)
event_message = state_message(EventTypes.CONFIGURATION_STATE)
session.broker.handlerawmsg(event_message)
coreserver.request_handler.handle_message(message)
# create a switch node
switch = session.add_node(_type=NodeTypes.SWITCH)
switch.setposition(x=80, y=50)
switch.server = daemon
with pytest.raises(KeyError):
coreserver.session.get_node(node_id)
# retrieve switch data representation, create a switch message for broker to handle
switch_data = switch.data(MessageFlags.ADD.value)
switch_message = dataconversion.convert_node(switch_data)
session.broker.handlerawmsg(switch_message)
def test_link_add(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
coreserver.session.add_link(node_one, switch, interface_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
])
# create node one
node_one = session.add_node()
node_one.server = daemon
coreserver.request_handler.handle_message(message)
# create node two
node_two = session.add_node()
node_two.server = daemon
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
# create node messages for the broker to handle
for node in [node_one, node_two]:
node_data = node.data(MessageFlags.ADD.value)
node_message = dataconversion.convert_node(node_data)
session.broker.handlerawmsg(node_message)
def test_link_update(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
link = all_links[0]
assert link.bandwidth is None
# create links to switch from nodes for broker to handle
for index, node in enumerate([node_one, node_two], start=1):
ip4_address = prefix.addr(index)
link_message = switch_link_message(switch, node, ip4_address, prefix.prefixlen)
session.broker.handlerawmsg(link_message)
bandwidth = 50000
message = coreapi.CoreLinkMessage.create(0, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.BANDWIDTH, bandwidth),
])
coreserver.request_handler.handle_message(message)
# change session to instantiation state
event_message = state_message(EventTypes.INSTANTIATION_STATE)
session.broker.handlerawmsg(event_message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
link = all_links[0]
assert link.bandwidth == bandwidth
# Get the ip or last node and ping it from the first
output, status = run_cmd(node_one, "ip -4 -o addr show dev eth0")
pingip = output.split()[3].split("/")[0]
output, status = run_cmd(node_two, "ping -c 5 " + pingip)
assert not status
def test_link_delete_node_to_node(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
node_two = 2
coreserver.session.add_node(_id=node_two)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
interface_two = ip_prefix.addr(node_two)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, node_two),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
(LinkTlvs.INTERFACE2_IP4, interface_two),
(LinkTlvs.INTERFACE2_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
all_links = []
for node_id in coreserver.session.nodes:
node = coreserver.session.nodes[node_id]
all_links += node.all_link_data(0)
assert len(all_links) == 1
message = coreapi.CoreLinkMessage.create(MessageFlags.DELETE.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, node_two),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE2_NUMBER, 0),
])
coreserver.request_handler.handle_message(message)
all_links = []
for node_id in coreserver.session.nodes:
node = coreserver.session.nodes[node_id]
all_links += node.all_link_data(0)
assert len(all_links) == 0
def test_link_delete_node_to_net(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
message = coreapi.CoreLinkMessage.create(MessageFlags.DELETE.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 0
def test_link_delete_net_to_node(self, coreserver):
node_one = 1
coreserver.session.add_node(_id=node_one)
switch = 2
coreserver.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
message = coreapi.CoreLinkMessage.create(MessageFlags.ADD.value, [
(LinkTlvs.N1_NUMBER, node_one),
(LinkTlvs.N2_NUMBER, switch),
(LinkTlvs.INTERFACE1_NUMBER, 0),
(LinkTlvs.INTERFACE1_IP4, interface_one),
(LinkTlvs.INTERFACE1_IP4_MASK, 24),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 1
message = coreapi.CoreLinkMessage.create(MessageFlags.DELETE.value, [
(LinkTlvs.N1_NUMBER, switch),
(LinkTlvs.N2_NUMBER, node_one),
(LinkTlvs.INTERFACE2_NUMBER, 0),
])
coreserver.request_handler.handle_message(message)
switch_node = coreserver.session.get_node(switch)
all_links = switch_node.all_link_data(0)
assert len(all_links) == 0