merged cleanup branch with master

This commit is contained in:
Rod A Santiago 2017-06-19 18:03:39 -07:00
parent a4f47a17e3
commit 0a91fe7a3e
28 changed files with 9033 additions and 0 deletions

133
daemon/tests/conftest.py Normal file
View file

@ -0,0 +1,133 @@
"""
Unit test fixture module.
"""
import os
import pytest
from core.session import Session
from core.misc import ipaddress
from core.misc import nodemaps
from core.misc import nodeutils
from core.netns import nodes
class Core(object):
def __init__(self, session, ip_prefix):
self.session = session
self.ip_prefix = ip_prefix
self.current_ip = 1
self.nodes = {}
self.node_ips = {}
def create_node(self, name):
node = self.session.add_object(cls=nodes.CoreNode, name=name)
self.nodes[name] = node
def add_interface(self, network, name):
node_ip = self.ip_prefix.addr(self.current_ip)
self.current_ip += 1
self.node_ips[name] = node_ip
node = self.nodes[name]
interface_id = node.newnetif(network, ["%s/%s" % (node_ip, self.ip_prefix.prefixlen)])
return node.netif(interface_id)
def get_node(self, name):
"""
Retrieve node from current session.
:param str name: name of node to retrieve
:return: core node
:rtype: core.netns.nodes.CoreNode
"""
return self.nodes[name]
def get_ip(self, name):
return self.node_ips[name]
def link(self, network, from_interface, to_interface):
network.link(from_interface, to_interface)
def configure_link(self, network, interface_one, interface_two, values, unidirectional=False):
network.linkconfig(netif=interface_one, netif2=interface_two, **values)
if not unidirectional:
network.linkconfig(netif=interface_two, netif2=interface_one, **values)
def ping(self, from_name, to_name):
from_node = self.nodes[from_name]
to_ip = str(self.get_ip(to_name))
return from_node.cmd(["ping", "-c", "3", to_ip])
def ping_output(self, from_name, to_name):
from_node = self.nodes[from_name]
to_ip = str(self.get_ip(to_name))
vcmd, stdin, stdout, stderr = from_node.popen(["ping", "-i", "0.05", "-c", "3", to_ip])
return stdout.read().strip()
def iping(self, from_name, to_name):
from_node = self.nodes[from_name]
to_ip = str(self.get_ip(to_name))
from_node.icmd(["ping", "-i", "0.01", "-c", "10", to_ip])
def iperf(self, from_name, to_name):
from_node = self.nodes[from_name]
to_node = self.nodes[to_name]
to_ip = str(self.get_ip(to_name))
# run iperf server, run client, kill iperf server
vcmd, stdin, stdout, stderr = to_node.popen(["iperf", "-s", "-u", "-y", "C"])
from_node.cmd(["iperf", "-u", "-t", "5", "-c", to_ip])
to_node.cmd(["killall", "-9", "iperf"])
return stdout.read().strip()
def assert_nodes(self):
for node in self.nodes.itervalues():
assert os.path.exists(node.nodedir)
def create_link_network(self):
# create switch
ptp_node = self.session.add_object(cls=nodes.PtpNet)
# create nodes
self.create_node("n1")
self.create_node("n2")
# add interfaces
interface_one = self.add_interface(ptp_node, "n1")
interface_two = self.add_interface(ptp_node, "n2")
# instantiate session
self.session.instantiate()
# assert node directories created
self.assert_nodes()
return ptp_node, interface_one, interface_two
@pytest.fixture()
def session():
# configure default nodes
node_map = nodemaps.CLASSIC_NODES
nodeutils.set_node_map(node_map)
# create and return session
session_fixture = Session(1, persistent=True)
assert os.path.exists(session_fixture.session_dir)
yield session_fixture
# cleanup
print "shutting down session"
session_fixture.shutdown()
assert not os.path.exists(session_fixture.session_dir)
@pytest.fixture(scope="module")
def ip_prefix():
return ipaddress.Ipv4Prefix("10.83.0.0/16")
@pytest.fixture()
def core(session, ip_prefix):
return Core(session, ip_prefix)

326
daemon/tests/test_core.py Normal file
View file

@ -0,0 +1,326 @@
"""
Unit tests for testing with a CORE switch.
"""
import time
from core.mobility import BasicRangeModel
from core.netns import nodes
from core.phys.pnodes import PhysicalNode
class TestCore:
def test_physical(self, core):
"""
Test physical node network.
:param conftest.Core core: core fixture to test with
"""
# create switch node
switch_node = core.session.add_object(cls=nodes.SwitchNode)
# create a physical node
physical_node = core.session.add_object(cls=PhysicalNode, name="p1")
core.nodes[physical_node.name] = physical_node
# create regular node
core.create_node("n1")
# add interface
core.add_interface(switch_node, "n1")
core.add_interface(switch_node, "p1")
# instantiate session
core.session.instantiate()
# assert node directories created
core.assert_nodes()
# ping n2 from n1 and assert success
status = core.ping("n1", "p1")
assert not status
def test_ptp(self, core):
"""
Test ptp node network.
:param conftest.Core core: core fixture to test with
"""
# create ptp
ptp_node = core.session.add_object(cls=nodes.PtpNet)
# create nodes
core.create_node("n1")
core.create_node("n2")
# add interfaces
core.add_interface(ptp_node, "n1")
core.add_interface(ptp_node, "n2")
# instantiate session
core.session.instantiate()
# assert node directories created
core.assert_nodes()
# ping n2 from n1 and assert success
status = core.ping("n1", "n2")
assert not status
def test_hub(self, core):
"""
Test basic hub network.
:param conftest.Core core: core fixture to test with
"""
# create hub
hub_node = core.session.add_object(cls=nodes.HubNode)
# create nodes
core.create_node("n1")
core.create_node("n2")
# add interfaces
core.add_interface(hub_node, "n1")
core.add_interface(hub_node, "n2")
# instantiate session
core.session.instantiate()
# assert node directories created
core.assert_nodes()
# ping n2 from n1 and assert success
status = core.ping("n1", "n2")
assert not status
def test_switch(self, core):
"""
Test basic switch network.
:param conftest.Core core: core fixture to test with
"""
# create switch
switch_node = core.session.add_object(cls=nodes.SwitchNode)
# create nodes
core.create_node("n1")
core.create_node("n2")
# add interfaces
core.add_interface(switch_node, "n1")
core.add_interface(switch_node, "n2")
# instantiate session
core.session.instantiate()
# assert node directories created
core.assert_nodes()
# ping n2 from n1 and assert success
status = core.ping("n1", "n2")
assert not status
def test_wlan_basic_range_good(self, core):
"""
Test basic wlan network.
:param conftest.Core core: core fixture to test with
"""
# create wlan
wlan_node = core.session.add_object(cls=nodes.WlanNode)
values = BasicRangeModel.getdefaultvalues()
wlan_node.setmodel(BasicRangeModel, values)
# create nodes
core.create_node("n1")
core.create_node("n2")
# add interfaces
interface_one = core.add_interface(wlan_node, "n1")
interface_two = core.add_interface(wlan_node, "n2")
# link nodes in wlan
core.link(wlan_node, interface_one, interface_two)
# mark node position as together
core.get_node("n1").setposition(0, 0)
core.get_node("n2").setposition(0, 0)
# instantiate session
core.session.instantiate()
# assert node directories created
core.assert_nodes()
# ping n2 from n1 and assert success
status = core.ping("n1", "n2")
assert not status
def test_wlan_basic_range_bad(self, core):
"""
Test basic wlan network with leveraging basic range model.
:param conftest.Core core: core fixture to test with
"""
# create wlan
wlan_node = core.session.add_object(cls=nodes.WlanNode)
values = BasicRangeModel.getdefaultvalues()
wlan_node.setmodel(BasicRangeModel, values)
# create nodes
core.create_node("n1")
core.create_node("n2")
# add interfaces
interface_one = core.add_interface(wlan_node, "n1")
interface_two = core.add_interface(wlan_node, "n2")
# link nodes in wlan
core.link(wlan_node, interface_one, interface_two)
# move nodes out of range, default range check is 275
core.get_node("n1").setposition(0, 0)
core.get_node("n2").setposition(500, 500)
# instantiate session
core.session.instantiate()
# assert node directories created
core.assert_nodes()
# ping n2 from n1 and assert failure
time.sleep(1)
status = core.ping("n1", "n2")
assert status
def test_link_bandwidth(self, core):
"""
Test ptp node network with modifying link bandwidth.
:param conftest.Core core: core fixture to test with
"""
# create link network
ptp_node, interface_one, interface_two = core.create_link_network()
# output csv index
bandwidth_index = 8
# run iperf, validate normal bandwidth
stdout = core.iperf("n1", "n2")
assert stdout
value = int(stdout.split(',')[bandwidth_index])
assert 900000 <= value <= 1100000
# change bandwidth in bits per second
bandwidth = 500000
core.configure_link(ptp_node, interface_one, interface_two, {
"bw": bandwidth
})
# run iperf again
stdout = core.iperf("n1", "n2")
assert stdout
value = int(stdout.split(',')[bandwidth_index])
assert 400000 <= value <= 600000
def test_link_loss(self, core):
"""
Test ptp node network with modifying link packet loss.
:param conftest.Core core: core fixture to test with
"""
# create link network
ptp_node, interface_one, interface_two = core.create_link_network()
# output csv index
loss_index = -2
# run iperf, validate normal bandwidth
stdout = core.iperf("n1", "n2")
assert stdout
value = float(stdout.split(',')[loss_index])
assert 0 <= value <= 0.5
# change bandwidth in bits per second
loss = 50
core.configure_link(ptp_node, interface_one, interface_two, {
"loss": loss
})
# run iperf again
stdout = core.iperf("n1", "n2")
assert stdout
value = float(stdout.split(',')[loss_index])
assert 45 <= value <= 55
def test_link_delay(self, core):
"""
Test ptp node network with modifying link packet delay.
:param conftest.Core core: core fixture to test with
"""
# create link network
ptp_node, interface_one, interface_two = core.create_link_network()
# run ping for delay information
stdout = core.ping_output("n1", "n2")
assert stdout
rtt_line = stdout.split("\n")[-1]
rtt_values = rtt_line.split("=")[1].split("ms")[0].strip()
rtt_avg = float(rtt_values.split("/")[2])
assert 0 <= rtt_avg <= 0.1
# change delay in microseconds
delay = 1000000
core.configure_link(ptp_node, interface_one, interface_two, {
"delay": delay
})
# run ping for delay information again
stdout = core.ping_output("n1", "n2")
assert stdout
rtt_line = stdout.split("\n")[-1]
rtt_values = rtt_line.split("=")[1].split("ms")[0].strip()
rtt_avg = float(rtt_values.split("/")[2])
assert 1800 <= rtt_avg <= 2200
def test_link_jitter(self, core):
"""
Test ptp node network with modifying link packet jitter.
:param conftest.Core core: core fixture to test with
"""
# create link network
ptp_node, interface_one, interface_two = core.create_link_network()
# output csv index
jitter_index = 9
# run iperf
stdout = core.iperf("n1", "n2")
assert stdout
value = float(stdout.split(",")[jitter_index])
assert -0.5 <= value <= 0.05
# change jitter in microseconds
jitter = 1000000
core.configure_link(ptp_node, interface_one, interface_two, {
"jitter": jitter
})
# run iperf again
stdout = core.iperf("n1", "n2")
assert stdout
value = float(stdout.split(",")[jitter_index])
assert 200 <= value <= 500

120
daemon/tests/test_gui.py Normal file
View file

@ -0,0 +1,120 @@
"""
Unit tests for testing with a CORE switch.
"""
from core.api import coreapi, dataconversion
from core.api.coreapi import CoreExecuteTlv
from core.enumerations import CORE_API_PORT, EventTypes, EventTlvs, MessageFlags, LinkTlvs, LinkTypes, ExecuteTlvs, \
MessageTypes
from core.misc import ipaddress
from core.netns.nodes import SwitchNode, CoreNode
def cmd(node, exec_cmd):
"""
Convenience method for sending commands to a node using the legacy API.
:param node: The node the command should be issued too
:param exec_cmd: A string with the command to be run
:return: Returns the result of the command
"""
# Set up the command api message
tlv_data = CoreExecuteTlv.pack(ExecuteTlvs.NODE.value, node.objid)
tlv_data += CoreExecuteTlv.pack(ExecuteTlvs.NUMBER.value, 1)
tlv_data += CoreExecuteTlv.pack(ExecuteTlvs.COMMAND.value, exec_cmd)
message = coreapi.CoreExecMessage.pack(MessageFlags.STRING.value | MessageFlags.TEXT.value, tlv_data)
node.session.broker.handlerawmsg(message)
# Now wait for the response
server = node.session.broker.servers["localhost"]
server.sock.settimeout(50.0)
# receive messages until we get our execute response
result = None
while True:
message_header = server.sock.recv(coreapi.CoreMessage.header_len)
message_type, message_flags, message_length = coreapi.CoreMessage.unpack_header(message_header)
message_data = server.sock.recv(message_length)
# If we get the right response return the results
print "received response message: %s" % MessageTypes(message_type)
if message_type == MessageTypes.EXECUTE.value:
message = coreapi.CoreExecMessage(message_flags, message_header, message_data)
result = message.get_tlv(ExecuteTlvs.RESULT.value)
break
return result
class TestGui:
def test_broker(self, core):
"""
Test session broker creation.
:param conftest.Core core: core fixture to test with
"""
prefix = ipaddress.Ipv4Prefix("10.83.0.0/16")
daemon = "localhost"
# add server
core.session.broker.addserver(daemon, "127.0.0.1", CORE_API_PORT)
# setup server
core.session.broker.setupserver(daemon)
# do not want the recvloop running as we will deal ourselves
core.session.broker.dorecvloop = False
# have broker handle a configuration state change
core.session.set_state(EventTypes.CONFIGURATION_STATE.value)
tlv_data = coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, EventTypes.CONFIGURATION_STATE.value)
raw_event_message = coreapi.CoreEventMessage.pack(0, tlv_data)
core.session.broker.handlerawmsg(raw_event_message)
# create a switch node
switch = core.session.add_object(cls=SwitchNode, name="switch", start=False)
switch.setposition(x=80, y=50)
switch.server = daemon
# retrieve switch data representation, create a switch message for broker to handle
switch_data = switch.data(MessageFlags.ADD.value)
switch_message = dataconversion.convert_node(switch_data)
core.session.broker.handlerawmsg(switch_message)
# create node one
core.create_node("n1")
node_one = core.get_node("n1")
node_one.server = daemon
# create node two
core.create_node("n2")
node_two = core.get_node("n2")
node_two.server = daemon
# create node messages for the broker to handle
for node in [node_one, node_two]:
node_data = node.data(MessageFlags.ADD.value)
node_message = dataconversion.convert_node(node_data)
core.session.broker.handlerawmsg(node_message)
# create links to switch from nodes for broker to handle
for index, node in enumerate([node_one, node_two], start=1):
tlv_data = coreapi.CoreLinkTlv.pack(LinkTlvs.N1_NUMBER.value, switch.objid)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.N2_NUMBER.value, node.objid)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.TYPE.value, LinkTypes.WIRED.value)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.INTERFACE2_NUMBER.value, 0)
ip4_address = prefix.addr(index)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.INTERFACE2_IP4.value, ip4_address)
tlv_data += coreapi.CoreLinkTlv.pack(LinkTlvs.INTERFACE2_IP4_MASK.value, prefix.prefixlen)
raw_link_message = coreapi.CoreLinkMessage.pack(MessageFlags.ADD.value, tlv_data)
core.session.broker.handlerawmsg(raw_link_message)
# change session to instantiation state
tlv_data = coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, EventTypes.INSTANTIATION_STATE.value)
raw_event_message = coreapi.CoreEventMessage.pack(0, tlv_data)
core.session.broker.handlerawmsg(raw_event_message)
# Get the ip or last node and ping it from the first
print "pinging from the first to the last node"
pingip = cmd(node_one, "ip -4 -o addr show dev eth0").split()[3].split("/")[0]
print cmd(node_two, "ping -c 5 " + pingip)