updated gui test to working order, moved distributed test to a sub directory to be ignored by default, added pytest.ini to ignore distributed directory by default

This commit is contained in:
Blake J. Harnden 2018-04-27 10:12:01 -07:00
parent fe4c21bad4
commit ba3669712a
4 changed files with 19 additions and 60 deletions

2
daemon/tests/pytest.ini Normal file
View file

@ -0,0 +1,2 @@
[pytest]
norecursedirs = distributed

View file

@ -5,7 +5,6 @@ Unit tests for testing basic CORE networks.
import os import os
import stat import stat
import threading import threading
import time
from xml.etree import ElementTree from xml.etree import ElementTree
import pytest import pytest
@ -247,7 +246,7 @@ class TestCore:
node_one.delnetif(0) node_one.delnetif(0)
assert not node_one.netif(0) assert not node_one.netif(0)
def test_wlan_good(self, session, ip_prefixes): def test_wlan_ping(self, session, ip_prefixes):
""" """
Test basic wlan network. Test basic wlan network.
@ -280,46 +279,6 @@ class TestCore:
status = ping(node_one, node_two, ip_prefixes) status = ping(node_one, node_two, ip_prefixes)
assert not status assert not status
def test_wlan_bad(self, session, ip_prefixes):
"""
Test basic wlan network with leveraging basic range model.
:param core.future.coreemu.FutureSession session: session for test
:param ip_prefixes: generates ip addresses for nodes
"""
# create wlan
wlan_node = session.add_node(_type=NodeTypes.WIRELESS_LAN)
session.set_wireless_model(wlan_node, BasicRangeModel)
# create nodes
node_options = NodeOptions()
node_options.set_position(0, 0)
node_one = session.create_wireless_node(node_options=node_options)
node_two = session.create_wireless_node(node_options=node_options)
# link nodes
for node in [node_one, node_two]:
interface = ip_prefixes.create_interface(node)
session.add_link(node.objid, wlan_node.objid, interface_one=interface)
# link nodes in wlan
session.wireless_link_all(wlan_node, [node_one, node_two])
# instantiate session
session.instantiate()
# move node two out of range, default range check is 275
time.sleep(5)
update_options = NodeOptions()
update_options.set_position(500, 500)
session.update_node(node_two.objid, update_options)
# ping n2 from n1 and assert failure
time.sleep(5)
status = ping(node_one, node_two, ip_prefixes)
assert status
def test_mobility(self, session, ip_prefixes): def test_mobility(self, session, ip_prefixes):
""" """
Test basic wlan network. Test basic wlan network.

View file

@ -6,7 +6,7 @@ import threading
from core.api import coreapi, dataconversion from core.api import coreapi, dataconversion
from core.api.coreapi import CoreExecuteTlv from core.api.coreapi import CoreExecuteTlv
from core.enumerations import CORE_API_PORT from core.enumerations import CORE_API_PORT, NodeTypes
from core.enumerations import EventTlvs from core.enumerations import EventTlvs
from core.enumerations import EventTypes from core.enumerations import EventTypes
from core.enumerations import ExecuteTlvs from core.enumerations import ExecuteTlvs
@ -15,7 +15,6 @@ from core.enumerations import LinkTypes
from core.enumerations import MessageFlags from core.enumerations import MessageFlags
from core.enumerations import MessageTypes from core.enumerations import MessageTypes
from core.misc import ipaddress from core.misc import ipaddress
from core.netns.nodes import SwitchNode
def command_message(node, command): def command_message(node, command):
@ -102,11 +101,12 @@ def run_cmd(node, exec_cmd):
class TestGui: class TestGui:
def test_broker(self, core, cored): def test_broker(self, session, cored):
""" """
Test session broker creation. Test session broker creation.
:param conftest.Core core: core fixture to test with :param core.future.coreemu.FutureSession session: session for test
:param cored: cored daemon server to test with
""" """
# set core daemon to run in the background # set core daemon to run in the background
@ -119,54 +119,52 @@ class TestGui:
daemon = "localhost" daemon = "localhost"
# add server # add server
core.session.broker.addserver(daemon, "127.0.0.1", CORE_API_PORT) session.broker.addserver(daemon, "127.0.0.1", CORE_API_PORT)
# setup server # setup server
core.session.broker.setupserver(daemon) session.broker.setupserver(daemon)
# do not want the recvloop running as we will deal ourselves # do not want the recvloop running as we will deal ourselves
core.session.broker.dorecvloop = False session.broker.dorecvloop = False
# have broker handle a configuration state change # have broker handle a configuration state change
core.session.set_state(EventTypes.CONFIGURATION_STATE) session.set_state(EventTypes.CONFIGURATION_STATE)
event_message = state_message(EventTypes.CONFIGURATION_STATE) event_message = state_message(EventTypes.CONFIGURATION_STATE)
core.session.broker.handlerawmsg(event_message) session.broker.handlerawmsg(event_message)
# create a switch node # create a switch node
switch = core.session.add_object(cls=SwitchNode, name="switch", start=False) switch = session.add_node(_type=NodeTypes.SWITCH)
switch.setposition(x=80, y=50) switch.setposition(x=80, y=50)
switch.server = daemon switch.server = daemon
# retrieve switch data representation, create a switch message for broker to handle # retrieve switch data representation, create a switch message for broker to handle
switch_data = switch.data(MessageFlags.ADD.value) switch_data = switch.data(MessageFlags.ADD.value)
switch_message = dataconversion.convert_node(switch_data) switch_message = dataconversion.convert_node(switch_data)
core.session.broker.handlerawmsg(switch_message) session.broker.handlerawmsg(switch_message)
# create node one # create node one
core.create_node("n1") node_one = session.add_node()
node_one = core.get_node("n1")
node_one.server = daemon node_one.server = daemon
# create node two # create node two
core.create_node("n2") node_two = session.add_node()
node_two = core.get_node("n2")
node_two.server = daemon node_two.server = daemon
# create node messages for the broker to handle # create node messages for the broker to handle
for node in [node_one, node_two]: for node in [node_one, node_two]:
node_data = node.data(MessageFlags.ADD.value) node_data = node.data(MessageFlags.ADD.value)
node_message = dataconversion.convert_node(node_data) node_message = dataconversion.convert_node(node_data)
core.session.broker.handlerawmsg(node_message) session.broker.handlerawmsg(node_message)
# create links to switch from nodes for broker to handle # create links to switch from nodes for broker to handle
for index, node in enumerate([node_one, node_two], start=1): for index, node in enumerate([node_one, node_two], start=1):
ip4_address = prefix.addr(index) ip4_address = prefix.addr(index)
link_message = switch_link_message(switch, node, ip4_address, prefix.prefixlen) link_message = switch_link_message(switch, node, ip4_address, prefix.prefixlen)
core.session.broker.handlerawmsg(link_message) session.broker.handlerawmsg(link_message)
# change session to instantiation state # change session to instantiation state
event_message = state_message(EventTypes.INSTANTIATION_STATE) event_message = state_message(EventTypes.INSTANTIATION_STATE)
core.session.broker.handlerawmsg(event_message) session.broker.handlerawmsg(event_message)
# Get the ip or last node and ping it from the first # Get the ip or last node and ping it from the first
output, status = run_cmd(node_one, "ip -4 -o addr show dev eth0") output, status = run_cmd(node_one, "ip -4 -o addr show dev eth0")