From 8644e9d61e9c3992646cc55a92e4c482369e7b74 Mon Sep 17 00:00:00 2001 From: "Blake J. Harnden" Date: Wed, 25 Apr 2018 16:33:58 -0700 Subject: [PATCH] moved future core server and handler code to act as the default core-daemon, updated future examples and tests to leverage new api --- daemon/core/corehandlers.py | 1534 ----------------- daemon/core/coreserver.py | 263 --- daemon/core/future/coreemu.py | 67 +- daemon/core/legacy/__init__.py | 0 .../corehandler.py} | 52 +- .../futureserver.py => legacy/coreserver.py} | 14 +- daemon/core/session.py | 23 +- daemon/core/xml/xmlparser0.py | 2 +- daemon/examples/future/emane80211_api.py | 4 +- daemon/examples/future/switch_api.py | 2 +- daemon/examples/future/switch_api_inject.py | 2 +- daemon/examples/future/wlan_api.py | 6 +- daemon/examples/netns/daemonnodes.py | 2 +- daemon/examples/netns/distributed.py | 2 +- daemon/scripts/core-daemon | 40 +- daemon/scripts/core-future | 145 -- daemon/tests/conftest.py | 165 +- daemon/tests/test_core.py | 433 ++--- daemon/tests/test_emane.py | 44 +- daemon/tests/test_future.py | 201 --- daemon/tests/test_gui.py | 2 +- daemon/tests/test_links.py | 260 +++ daemon/tests/test_nodes.py | 79 + ns3/corens3/obj.py | 4 +- 24 files changed, 618 insertions(+), 2728 deletions(-) delete mode 100644 daemon/core/corehandlers.py delete mode 100644 daemon/core/coreserver.py create mode 100644 daemon/core/legacy/__init__.py rename daemon/core/{future/futurehandler.py => legacy/corehandler.py} (96%) rename daemon/core/{future/futureserver.py => legacy/coreserver.py} (64%) mode change 100755 => 100644 daemon/scripts/core-daemon delete mode 100644 daemon/scripts/core-future delete mode 100644 daemon/tests/test_future.py create mode 100644 daemon/tests/test_links.py create mode 100644 daemon/tests/test_nodes.py diff --git a/daemon/core/corehandlers.py b/daemon/core/corehandlers.py deleted file mode 100644 index 5e78a39f..00000000 --- a/daemon/core/corehandlers.py +++ /dev/null @@ -1,1534 +0,0 @@ -""" -socket server request handlers leveraged by core servers. -""" - -import Queue -import SocketServer -import os -import shlex -import shutil -import sys -import threading -import time - -from core import coreobj -from core import logger -from core.api import coreapi -from core.coreserver import CoreServer -from core.data import ConfigData -from core.data import EventData -from core.data import NodeData -from core.enumerations import ConfigTlvs -from core.enumerations import EventTlvs -from core.enumerations import EventTypes -from core.enumerations import ExceptionTlvs -from core.enumerations import ExecuteTlvs -from core.enumerations import FileTlvs -from core.enumerations import LinkTlvs -from core.enumerations import LinkTypes -from core.enumerations import MessageFlags -from core.enumerations import MessageTypes -from core.enumerations import NodeTlvs -from core.enumerations import NodeTypes -from core.enumerations import RegisterTlvs -from core.enumerations import SessionTlvs -from core.misc import nodeutils -from core.misc import structutils -from core.misc import utils -from core.netns import nodes -from core.xml.xmlsession import open_session_xml -from core.xml.xmlsession import save_session_xml - - -class CoreRequestHandler(SocketServer.BaseRequestHandler): - """ - The SocketServer class uses the RequestHandler class for servicing requests. - """ - - def __init__(self, request, client_address, server): - """ - Create a CoreRequestHandler instance. - - :param request: request object - :param str client_address: client address - :param CoreServer server: core server instance - :return: - """ - self.done = False - self.message_handlers = { - MessageTypes.NODE.value: self.handle_node_message, - MessageTypes.LINK.value: self.handle_link_message, - MessageTypes.EXECUTE.value: self.handle_execute_message, - MessageTypes.REGISTER.value: self.handle_register_message, - MessageTypes.CONFIG.value: self.handle_config_message, - MessageTypes.FILE.value: self.handle_file_message, - MessageTypes.INTERFACE.value: self.handle_interface_message, - MessageTypes.EVENT.value: self.handle_event_message, - MessageTypes.SESSION.value: self.handle_session_message, - } - self.message_queue = Queue.Queue() - self.node_status_request = {} - self._shutdown_lock = threading.Lock() - - self.handler_threads = [] - num_threads = int(server.config["numthreads"]) - if num_threads < 1: - raise ValueError("invalid number of threads: %s" % num_threads) - - logger.info("launching core server handler threads: %s", num_threads) - for _ in xrange(num_threads): - thread = threading.Thread(target=self.handler_thread) - self.handler_threads.append(thread) - thread.start() - - self.master = False - self.session = None - - utils.close_onexec(request.fileno()) - SocketServer.BaseRequestHandler.__init__(self, request, client_address, server) - - def setup(self): - """ - Client has connected, set up a new connection. - - :return: nothing - """ - logger.info("new TCP connection: %s", self.client_address) - - def finish(self): - """ - Client has disconnected, end this request handler and disconnect - from the session. Shutdown sessions that are not running. - - :return: nothing - """ - logger.info("finishing request handler") - self.done = True - - logger.info("remaining message queue size: %s", self.message_queue.qsize()) - # seconds - timeout = 10.0 - logger.info("client disconnected: notifying threads") - for thread in self.handler_threads: - logger.info("waiting for thread: %s", thread.getName()) - thread.join(timeout) - if thread.isAlive(): - logger.warn("joining %s failed: still alive after %s sec", thread.getName(), timeout) - - logger.info("connection closed: %s", self.client_address) - if self.session: - self.remove_session_handlers() - - # remove client from session broker and shutdown if there are no clients - self.session.broker.session_clients.remove(self) - if not self.session.broker.session_clients: - logger.info("no session clients left, initiating shutdown") - self.session.shutdown() - - return SocketServer.BaseRequestHandler.finish(self) - - def handle_broadcast_event(self, event_data): - """ - Callback to handle an event broadcast out from a session. - - :param core.data.EventData event_data: event data to handle - :return: nothing - """ - logger.info("handling broadcast event: %s", event_data) - - tlv_data = structutils.pack_values(coreapi.CoreEventTlv, [ - (EventTlvs.NODE, event_data.node), - (EventTlvs.TYPE, event_data.event_type), - (EventTlvs.NAME, event_data.name), - (EventTlvs.DATA, event_data.data), - (EventTlvs.TIME, event_data.time), - (EventTlvs.TIME, event_data.session) - ]) - message = coreapi.CoreEventMessage.pack(0, tlv_data) - - try: - self.sendall(message) - except IOError: - logger.exception("error sending event message") - - def handle_broadcast_file(self, file_data): - """ - Callback to handle a file broadcast out from a session. - - :param core.data.FileData file_data: file data to handle - :return: nothing - """ - logger.info("handling broadcast file: %s", file_data) - - tlv_data = structutils.pack_values(coreapi.CoreFileTlv, [ - (FileTlvs.NODE, file_data.node), - (FileTlvs.NAME, file_data.name), - (FileTlvs.MODE, file_data.mode), - (FileTlvs.NUMBER, file_data.number), - (FileTlvs.TYPE, file_data.type), - (FileTlvs.SOURCE_NAME, file_data.source), - (FileTlvs.SESSION, file_data.session), - (FileTlvs.DATA, file_data.data), - (FileTlvs.COMPRESSED_DATA, file_data.compressed_data), - ]) - message = coreapi.CoreFileMessage.pack(file_data.message_type, tlv_data) - - try: - self.sendall(message) - except IOError: - logger.exception("error sending file message") - - def handle_broadcast_config(self, config_data): - """ - Callback to handle a config broadcast out from a session. - - :param core.data.ConfigData config_data: config data to handle - :return: nothing - """ - logger.info("handling broadcast config: %s", config_data) - - tlv_data = structutils.pack_values(coreapi.CoreConfigTlv, [ - (ConfigTlvs.NODE, config_data.node), - (ConfigTlvs.OBJECT, config_data.object), - (ConfigTlvs.TYPE, config_data.type), - (ConfigTlvs.DATA_TYPES, config_data.data_types), - (ConfigTlvs.VALUES, config_data.data_values), - (ConfigTlvs.CAPTIONS, config_data.captions), - (ConfigTlvs.BITMAP, config_data.bitmap), - (ConfigTlvs.POSSIBLE_VALUES, config_data.possible_values), - (ConfigTlvs.GROUPS, config_data.groups), - (ConfigTlvs.SESSION, config_data.session), - (ConfigTlvs.INTERFACE_NUMBER, config_data.interface_number), - (ConfigTlvs.NETWORK_ID, config_data.network_id), - (ConfigTlvs.OPAQUE, config_data.opaque), - ]) - message = coreapi.CoreConfMessage.pack(config_data.message_type, tlv_data) - - try: - self.sendall(message) - except IOError: - logger.exception("error sending config message") - - def handle_broadcast_exception(self, exception_data): - """ - Callback to handle an exception broadcast out from a session. - - :param core.data.ExceptionData exception_data: exception data to handle - :return: nothing - """ - logger.info("handling broadcast exception: %s", exception_data) - tlv_data = structutils.pack_values(coreapi.CoreExceptionTlv, [ - (ExceptionTlvs.NODE, exception_data.node), - (ExceptionTlvs.SESSION, exception_data.session), - (ExceptionTlvs.LEVEL, exception_data.level), - (ExceptionTlvs.SOURCE, exception_data.source), - (ExceptionTlvs.DATE, exception_data.date), - (ExceptionTlvs.TEXT, exception_data.text) - ]) - message = coreapi.CoreExceptionMessage.pack(0, tlv_data) - - try: - self.sendall(message) - except IOError: - logger.exception("error sending exception message") - - def handle_broadcast_node(self, node_data): - """ - Callback to handle an node broadcast out from a session. - - :param core.data.NodeData node_data: node data to handle - :return: nothing - """ - logger.info("handling broadcast node: %s", node_data) - - tlv_data = structutils.pack_values(coreapi.CoreNodeTlv, [ - (NodeTlvs.NUMBER, node_data.id), - (NodeTlvs.TYPE, node_data.node_type), - (NodeTlvs.NAME, node_data.name), - (NodeTlvs.IP_ADDRESS, node_data.ip_address), - (NodeTlvs.MAC_ADDRESS, node_data.mac_address), - (NodeTlvs.IP6_ADDRESS, node_data.ip6_address), - (NodeTlvs.MODEL, node_data.model), - (NodeTlvs.EMULATION_ID, node_data.emulation_id), - (NodeTlvs.EMULATION_SERVER, node_data.emulation_server), - (NodeTlvs.SESSION, node_data.session), - (NodeTlvs.X_POSITION, node_data.x_position), - (NodeTlvs.Y_POSITION, node_data.y_position), - (NodeTlvs.CANVAS, node_data.canvas), - (NodeTlvs.NETWORK_ID, node_data.network_id), - (NodeTlvs.SERVICES, node_data.services), - (NodeTlvs.LATITUDE, node_data.latitude), - (NodeTlvs.LONGITUDE, node_data.longitude), - (NodeTlvs.ALTITUDE, node_data.altitude), - (NodeTlvs.ICON, node_data.icon), - (NodeTlvs.OPAQUE, node_data.opaque) - ]) - message = coreapi.CoreNodeMessage.pack(node_data.message_type, tlv_data) - - try: - self.sendall(message) - except IOError: - logger.exception("error sending node message") - - def handle_broadcast_link(self, link_data): - """ - Callback to handle an link broadcast out from a session. - - :param core.data.LinkData link_data: link data to handle - :return: nothing - """ - logger.info("handling broadcast link: %s", link_data) - - tlv_data = structutils.pack_values(coreapi.CoreLinkTlv, [ - (LinkTlvs.N1_NUMBER, link_data.node1_id), - (LinkTlvs.N2_NUMBER, link_data.node2_id), - (LinkTlvs.DELAY, link_data.delay), - (LinkTlvs.BANDWIDTH, link_data.bandwidth), - (LinkTlvs.PER, link_data.per), - (LinkTlvs.DUP, link_data.dup), - (LinkTlvs.JITTER, link_data.jitter), - (LinkTlvs.MER, link_data.mer), - (LinkTlvs.BURST, link_data.burst), - (LinkTlvs.SESSION, link_data.session), - (LinkTlvs.MBURST, link_data.mburst), - (LinkTlvs.TYPE, link_data.link_type), - (LinkTlvs.GUI_ATTRIBUTES, link_data.gui_attributes), - (LinkTlvs.UNIDIRECTIONAL, link_data.unidirectional), - (LinkTlvs.EMULATION_ID, link_data.emulation_id), - (LinkTlvs.NETWORK_ID, link_data.network_id), - (LinkTlvs.KEY, link_data.key), - (LinkTlvs.INTERFACE1_NUMBER, link_data.interface1_id), - (LinkTlvs.INTERFACE1_NAME, link_data.interface1_name), - (LinkTlvs.INTERFACE1_IP4, link_data.interface1_ip4), - (LinkTlvs.INTERFACE1_IP4_MASK, link_data.interface1_ip4_mask), - (LinkTlvs.INTERFACE1_MAC, link_data.interface1_mac), - (LinkTlvs.INTERFACE1_IP6, link_data.interface1_ip6), - (LinkTlvs.INTERFACE1_IP6_MASK, link_data.interface1_ip6_mask), - (LinkTlvs.INTERFACE2_NUMBER, link_data.interface2_id), - (LinkTlvs.INTERFACE2_NAME, link_data.interface2_name), - (LinkTlvs.INTERFACE2_IP4, link_data.interface2_ip4), - (LinkTlvs.INTERFACE2_IP4_MASK, link_data.interface2_ip4_mask), - (LinkTlvs.INTERFACE2_MAC, link_data.interface2_mac), - (LinkTlvs.INTERFACE2_IP6, link_data.interface2_ip6), - (LinkTlvs.INTERFACE2_IP6_MASK, link_data.interface2_ip6_mask), - (LinkTlvs.OPAQUE, link_data.opaque) - ]) - - message = coreapi.CoreLinkMessage.pack(link_data.message_type, tlv_data) - - try: - self.sendall(message) - except IOError: - logger.exception("error sending Event Message") - - def register(self): - """ - Return a Register Message - - :return: register message data - """ - logger.info("GUI has connected to session %d at %s", self.session.session_id, time.ctime()) - - tlv_data = "" - tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, "core-daemon") - tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.EMULATION_SERVER.value, "core-daemon") - - # get config objects for session - for name in self.session.config_objects: - config_type, callback = self.session.config_objects[name] - # type must be in coreapi.reg_tlvs - tlv_data += coreapi.CoreRegisterTlv.pack(config_type, name) - - return coreapi.CoreRegMessage.pack(MessageFlags.ADD.value, tlv_data) - - def sendall(self, data): - """ - Send raw data to the other end of this TCP connection - using socket"s sendall(). - - :param data: data to send over request socket - :return: data sent - """ - return self.request.sendall(data) - - def receive_message(self): - """ - Receive data and return a CORE API message object. - - :return: received message - :rtype: coreapi.CoreMessage - """ - try: - header = self.request.recv(coreapi.CoreMessage.header_len) - except IOError as e: - raise IOError("error receiving header (%s)" % e) - - if len(header) != coreapi.CoreMessage.header_len: - if len(header) == 0: - raise EOFError("client disconnected") - else: - raise IOError("invalid message header size") - - message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header) - if message_len == 0: - logger.warn("received message with no data") - - data = "" - while len(data) < message_len: - data += self.request.recv(message_len - len(data)) - if len(data) > message_len: - error_message = "received message length does not match received data (%s != %s)" % ( - len(data), message_len) - logger.error(error_message) - raise IOError(error_message) - - try: - message_class = coreapi.CLASS_MAP[message_type] - message = message_class(message_flags, header, data) - except KeyError: - message = coreapi.CoreMessage(message_flags, header, data) - message.message_type = message_type - logger.exception("unimplemented core message type: %s", message.type_str()) - - return message - - def queue_message(self, message): - """ - Queue an API message for later processing. - - :param message: message to queue - :return: nothing - """ - logger.info("queueing msg (queuedtimes = %s): type %s", - message.queuedtimes, MessageTypes(message.message_type)) - self.message_queue.put(message) - - def handler_thread(self): - """ - CORE API message handling loop that is spawned for each server - thread; get CORE API messages from the incoming message queue, - and call handlemsg() for processing. - - :return: nothing - """ - while not self.done: - message = self.message_queue.get() - self.handle_message(message) - - def handle_message(self, message): - """ - Handle an incoming message; dispatch based on message type, - optionally sending replies. - - :param message: message to handle - :return: nothing - """ - if self.session and self.session.broker.handle_message(message): - logger.info("message not being handled locally") - return - - logger.info("%s handling message:\n%s", threading.currentThread().getName(), message) - - if message.message_type not in self.message_handlers: - logger.warn("no handler for message type: %s", message.type_str()) - return - - message_handler = self.message_handlers[message.message_type] - - try: - # TODO: this needs to be removed, make use of the broadcast message methods - replies = message_handler(message) - self.dispatch_replies(replies, message) - except: - logger.exception("%s: exception while handling message: %s", - threading.currentThread().getName(), message) - - def dispatch_replies(self, replies, message): - """ - Dispatch replies by CORE to message msg previously received from the client. - - :param list replies: reply messages to dispatch - :param message: message for replies - :return: nothing - """ - logger.info("dispatching replies") - for reply in replies: - message_type, message_flags, message_length = coreapi.CoreMessage.unpack_header(reply) - try: - reply_message = coreapi.CLASS_MAP[message_type]( - message_flags, - reply[:coreapi.CoreMessage.header_len], - reply[coreapi.CoreMessage.header_len:] - ) - except KeyError: - # multiple TLVs of same type cause KeyError exception - reply_message = "CoreMessage (type %d flags %d length %d)" % ( - message_type, message_flags, message_length) - - logger.info("dispatch reply:\n%s", reply_message) - - try: - self.sendall(reply) - except IOError: - logger.exception("error dispatching reply") - - def handle(self): - """ - Handle a new connection request from a client. Dispatch to the - recvmsg() method for receiving data into CORE API messages, and - add them to an incoming message queue. - - :return: nothing - """ - # use port as session id - port = self.request.getpeername()[1] - - logger.info("creating new session for client: %s", port) - self.session = self.server.create_session(session_id=port) - - # TODO: hack to associate this handler with this sessions broker for broadcasting - # TODO: broker needs to be pulled out of session to the server/handler level - if self.master: - logger.info("session set to master") - self.session.master = True - self.session.broker.session_clients.append(self) - - # add handlers for various data - logger.info("adding session broadcast handlers") - self.add_session_handlers() - - # set initial session state - self.session.set_state(state=EventTypes.DEFINITION_STATE.value) - - while True: - try: - message = self.receive_message() - except EOFError: - logger.info("client disconnected") - break - except IOError: - logger.exception("error receiving message") - break - - message.queuedtimes = 0 - self.queue_message(message) - - # delay is required for brief connections, allow session joining - if message.message_type == MessageTypes.SESSION.value: - time.sleep(0.125) - - # broadcast node/link messages to other connected clients - if message.message_type not in [MessageTypes.NODE.value, MessageTypes.LINK.value]: - continue - - for client in self.session.broker.session_clients: - if client == self: - continue - - logger.info("BROADCAST TO OTHER CLIENT: %s", client) - client.sendall(message.raw_message) - - def add_session_handlers(self): - logger.info("adding session broadcast handlers") - self.session.event_handlers.append(self.handle_broadcast_event) - self.session.exception_handlers.append(self.handle_broadcast_exception) - self.session.node_handlers.append(self.handle_broadcast_node) - self.session.link_handlers.append(self.handle_broadcast_link) - self.session.file_handlers.append(self.handle_broadcast_file) - self.session.config_handlers.append(self.handle_broadcast_config) - - def remove_session_handlers(self): - logger.info("removing session broadcast handlers") - self.session.event_handlers.remove(self.handle_broadcast_event) - self.session.exception_handlers.remove(self.handle_broadcast_exception) - self.session.node_handlers.remove(self.handle_broadcast_node) - self.session.link_handlers.remove(self.handle_broadcast_link) - self.session.file_handlers.remove(self.handle_broadcast_file) - self.session.config_handlers.remove(self.handle_broadcast_config) - - def handle_node_message(self, message): - """ - Node Message handler - - :param coreapi.CoreNodeMessage message: node message - :return: replies to node message - """ - replies = [] - if message.flags & MessageFlags.ADD.value and message.flags & MessageFlags.DELETE.value: - logger.warn("ignoring invalid message: add and delete flag both set") - return () - - node_id = message.tlv_data[NodeTlvs.NUMBER.value] - x_position = message.get_tlv(NodeTlvs.X_POSITION.value) - y_position = message.get_tlv(NodeTlvs.Y_POSITION.value) - canvas = message.get_tlv(NodeTlvs.CANVAS.value) - icon = message.get_tlv(NodeTlvs.ICON.value) - lat = message.get_tlv(NodeTlvs.LATITUDE.value) - lng = message.get_tlv(NodeTlvs.LONGITUDE.value) - alt = message.get_tlv(NodeTlvs.ALTITUDE.value) - - if x_position is None and y_position is None and \ - lat is not None and lng is not None and alt is not None: - x, y, z = self.session.location.getxyz(float(lat), float(lng), float(alt)) - x_position = int(x) - y_position = int(y) - - # GUI can"t handle lat/long, so generate another X/Y position message - node_data = NodeData( - id=node_id, - x_position=x_position, - y_position=y_position - ) - - self.session.broadcast_node(node_data) - - if message.flags & MessageFlags.ADD.value: - node_type = message.tlv_data[NodeTlvs.TYPE.value] - try: - node_class = nodeutils.get_node_class(NodeTypes(node_type)) - except KeyError: - try: - node_type_str = " (%s)" % NodeTypes(node_type).name - except KeyError: - node_type_str = "" - - logger.warn("warning: unimplemented node type: %s%s" % (node_type, node_type_str)) - return () - - start = False - if self.session.state > EventTypes.DEFINITION_STATE.value: - start = True - - node_name = message.tlv_data[NodeTlvs.NAME.value] - model = message.get_tlv(NodeTlvs.MODEL.value) - class_args = {"start": start} - - if node_type == NodeTypes.RJ45.value and hasattr( - self.session.options, "enablerj45") and self.session.options.enablerj45 == "0": - class_args["start"] = False - - # this instantiates an object of class nodecls, creating the node or network - node = self.session.add_object(cls=node_class, objid=node_id, name=node_name, **class_args) - if x_position is not None and y_position is not None: - node.setposition(x_position, y_position, None) - if canvas is not None: - node.canvas = canvas - if icon is not None: - node.icon = icon - opaque = message.get_tlv(NodeTlvs.OPAQUE.value) - if opaque is not None: - node.opaque = opaque - - # add services to a node, either from its services TLV or - # through the configured defaults for this node type - if node_type in [NodeTypes.DEFAULT.value, NodeTypes.PHYSICAL.value]: - if model is None: - # TODO: default model from conf file? - model = "router" - node.type = model - services_str = message.get_tlv(NodeTlvs.SERVICES.value) - logger.info("setting model (%s) with services (%s)", model, services_str) - self.session.services.addservicestonode(node, model, services_str) - - # boot nodes if they are added after runtime (like - # session.bootnodes()) - if self.session.state == EventTypes.RUNTIME_STATE.value: - if isinstance(node, nodes.PyCoreNode) and not nodeutils.is_node(node, NodeTypes.RJ45): - self.session.write_objects() - self.session.add_remove_control_interface(node=node, remove=False) - node.boot() - - if message.flags & MessageFlags.STRING.value: - self.node_status_request[node_id] = True - self.send_node_emulation_id(node_id) - elif message.flags & MessageFlags.STRING.value: - self.node_status_request[node_id] = True - - elif message.flags & MessageFlags.DELETE.value: - with self._shutdown_lock: - self.session.delete_object(node_id) - - if message.flags & MessageFlags.STRING.value: - tlvdata = "" - tlvdata += coreapi.CoreNodeTlv.pack(NodeTlvs.NUMBER.value, node_id) - flags = MessageFlags.DELETE.value | MessageFlags.LOCAL.value - replies.append(coreapi.CoreNodeMessage.pack(flags, tlvdata)) - - if self.session.check_shutdown(): - tlvdata = "" - tlvdata += coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, self.session.state) - replies.append(coreapi.CoreEventMessage.pack(0, tlvdata)) - # Node modify message (no add/del flag) - else: - try: - node = self.session.get_object(node_id) - - if x_position is not None and y_position is not None: - node.setposition(x_position, y_position, None) - - if canvas is not None: - node.canvas = canvas - - if icon is not None: - node.icon = icon - except KeyError: - logger.exception("ignoring node message: unknown node number %s", node_id) - - return replies - - def handle_link_message(self, message): - """ - Link Message handler - - :param coreapi.CoreLinkMessage message: link message to handle - :return: link message replies - """ - # get node classes - ptp_class = nodeutils.get_node_class(NodeTypes.PEER_TO_PEER) - - node_num1 = message.get_tlv(LinkTlvs.N1_NUMBER.value) - interface_index1 = message.get_tlv(LinkTlvs.INTERFACE1_NUMBER.value) - ipv41 = message.get_tlv(LinkTlvs.INTERFACE1_IP4.value) - ipv4_mask1 = message.get_tlv(LinkTlvs.INTERFACE1_IP4_MASK.value) - mac1 = message.get_tlv(LinkTlvs.INTERFACE1_MAC.value) - ipv61 = message.get_tlv(LinkTlvs.INTERFACE1_IP6.value) - ipv6_mask1 = message.get_tlv(LinkTlvs.INTERFACE1_IP6_MASK.value) - interface_name1 = message.get_tlv(LinkTlvs.INTERFACE1_NAME.value) - - node_num2 = message.get_tlv(LinkTlvs.N2_NUMBER.value) - interface_index2 = message.get_tlv(LinkTlvs.INTERFACE2_NUMBER.value) - ipv42 = message.get_tlv(LinkTlvs.INTERFACE2_IP4.value) - ipv4_mask2 = message.get_tlv(LinkTlvs.INTERFACE2_IP4_MASK.value) - mac2 = message.get_tlv(LinkTlvs.INTERFACE2_MAC.value) - ipv62 = message.get_tlv(LinkTlvs.INTERFACE2_IP6.value) - ipv6_mask2 = message.get_tlv(LinkTlvs.INTERFACE2_IP6_MASK.value) - interface_name2 = message.get_tlv(LinkTlvs.INTERFACE2_NAME.value) - - node1 = None - node2 = None - net = None - net2 = None - - unidirectional_value = message.get_tlv(LinkTlvs.UNIDIRECTIONAL.value) - if unidirectional_value == 1: - unidirectional = True - else: - unidirectional = False - - # one of the nodes may exist on a remote server - logger.info("link message between node1(%s:%s) and node2(%s:%s)", - node_num1, interface_index1, node_num2, interface_index2) - if node_num1 is not None and node_num2 is not None: - tunnel = self.session.broker.gettunnel(node_num1, node_num2) - logger.info("tunnel between nodes: %s", tunnel) - if isinstance(tunnel, coreobj.PyCoreNet): - net = tunnel - if tunnel.remotenum == node_num1: - node_num1 = None - else: - node_num2 = None - # PhysicalNode connected via GreTap tunnel; uses adoptnetif() below - elif tunnel is not None: - if tunnel.remotenum == node_num1: - node_num1 = None - else: - node_num2 = None - - if node_num1 is not None: - try: - n = self.session.get_object(node_num1) - except KeyError: - # XXX wait and queue this message to try again later - # XXX maybe this should be done differently - time.sleep(0.125) - self.queue_message(message) - return () - if isinstance(n, nodes.PyCoreNode): - node1 = n - elif isinstance(n, coreobj.PyCoreNet): - if net is None: - net = n - else: - net2 = n - else: - raise ValueError("unexpected object class: %s" % n) - - if node_num2 is not None: - try: - n = self.session.get_object(node_num2) - except KeyError: - # XXX wait and queue this message to try again later - # XXX maybe this should be done differently - time.sleep(0.125) - self.queue_message(message) - return () - if isinstance(n, nodes.PyCoreNode): - node2 = n - elif isinstance(n, coreobj.PyCoreNet): - if net is None: - net = n - else: - net2 = n - else: - raise ValueError("unexpected object class: %s" % n) - - link_msg_type = message.get_tlv(LinkTlvs.TYPE.value) - - if node1: - node1.lock.acquire() - if node2: - node2.lock.acquire() - - try: - if link_msg_type == LinkTypes.WIRELESS.value: - """ - Wireless link/unlink event - """ - numwlan = 0 - objs = [node1, node2, net, net2] - objs = filter(lambda (x): x is not None, objs) - if len(objs) < 2: - raise ValueError("wireless link/unlink message between unknown objects") - - nets = objs[0].commonnets(objs[1]) - for netcommon, netif1, netif2 in nets: - if not nodeutils.is_node(netcommon, (NodeTypes.WIRELESS_LAN, NodeTypes.EMANE)): - continue - if message.flags & MessageFlags.ADD.value: - netcommon.link(netif1, netif2) - elif message.flags & MessageFlags.DELETE.value: - netcommon.unlink(netif1, netif2) - else: - raise ValueError("invalid flags for wireless link/unlink message") - numwlan += 1 - if numwlan == 0: - raise ValueError("no common network found for wireless link/unlink") - - elif message.flags & MessageFlags.ADD.value: - """ - Add a new link. - """ - start = False - if self.session.state > EventTypes.DEFINITION_STATE.value: - start = True - - if node1 and node2 and not net: - # a new wired link - net = self.session.add_object(cls=ptp_class, start=start) - - bw = message.get_tlv(LinkTlvs.BANDWIDTH.value) - delay = message.get_tlv(LinkTlvs.DELAY.value) - loss = message.get_tlv(LinkTlvs.PER.value) - duplicate = message.get_tlv(LinkTlvs.DUP.value) - jitter = message.get_tlv(LinkTlvs.JITTER.value) - key = message.get_tlv(LinkTlvs.KEY.value) - - netaddrlist = [] - if node1 and net: - addrlist = [] - if ipv41 is not None and ipv4_mask1 is not None: - addrlist.append("%s/%s" % (ipv41, ipv4_mask1)) - if ipv61 is not None and ipv6_mask1 is not None: - addrlist.append("%s/%s" % (ipv61, ipv6_mask1)) - if ipv42 is not None and ipv4_mask2 is not None: - netaddrlist.append("%s/%s" % (ipv42, ipv4_mask2)) - if ipv62 is not None and ipv6_mask2 is not None: - netaddrlist.append("%s/%s" % (ipv62, ipv6_mask2)) - interface_index1 = node1.newnetif( - net, addrlist=addrlist, - hwaddr=mac1, ifindex=interface_index1, ifname=interface_name1 - ) - net.linkconfig( - node1.netif(interface_index1, net), bw=bw, - delay=delay, loss=loss, - duplicate=duplicate, jitter=jitter - ) - if node1 is None and net: - if ipv41 is not None and ipv4_mask1 is not None: - netaddrlist.append("%s/%s" % (ipv41, ipv4_mask1)) - # don"t add this address again if node2 and net - ipv41 = None - if ipv61 is not None and ipv6_mask1 is not None: - netaddrlist.append("%s/%s" % (ipv61, ipv6_mask1)) - # don"t add this address again if node2 and net - ipv61 = None - if node2 and net: - addrlist = [] - if ipv42 is not None and ipv4_mask2 is not None: - addrlist.append("%s/%s" % (ipv42, ipv4_mask2)) - if ipv62 is not None and ipv6_mask2 is not None: - addrlist.append("%s/%s" % (ipv62, ipv6_mask2)) - if ipv41 is not None and ipv4_mask1 is not None: - netaddrlist.append("%s/%s" % (ipv41, ipv4_mask1)) - if ipv61 is not None and ipv6_mask1 is not None: - netaddrlist.append("%s/%s" % (ipv61, ipv6_mask1)) - interface_index2 = node2.newnetif( - net, addrlist=addrlist, - hwaddr=mac2, ifindex=interface_index2, ifname=interface_name2 - ) - if not unidirectional: - net.linkconfig( - node2.netif(interface_index2, net), bw=bw, - delay=delay, loss=loss, - duplicate=duplicate, jitter=jitter - ) - if node2 is None and net2: - if ipv42 is not None and ipv4_mask2 is not None: - netaddrlist.append("%s/%s" % (ipv42, ipv4_mask2)) - if ipv62 is not None and ipv6_mask2 is not None: - netaddrlist.append("%s/%s" % (ipv62, ipv6_mask2)) - - # tunnel node finalized with this link message - if key and nodeutils.is_node(net, NodeTypes.TUNNEL): - net.setkey(key) - if len(netaddrlist) > 0: - net.addrconfig(netaddrlist) - if key and nodeutils.is_node(net2, NodeTypes.TUNNEL): - net2.setkey(key) - if len(netaddrlist) > 0: - net2.addrconfig(netaddrlist) - - if net and net2: - # two layer-2 networks linked together - if nodeutils.is_node(net2, NodeTypes.RJ45): - # RJ45 nodes have different linknet() - netif = net2.linknet(net) - else: - netif = net.linknet(net2) - net.linkconfig(netif, bw=bw, delay=delay, loss=loss, - duplicate=duplicate, jitter=jitter) - if not unidirectional: - netif.swapparams("_params_up") - net2.linkconfig(netif, bw=bw, delay=delay, loss=loss, - duplicate=duplicate, jitter=jitter, - devname=netif.name) - netif.swapparams("_params_up") - elif net is None and net2 is None and (node1 is None or node2 is None): - # apply address/parameters to PhysicalNodes - fx = (bw, delay, loss, duplicate, jitter) - addrlist = [] - if node1 and nodeutils.is_node(node1, NodeTypes.PHYSICAL): - if ipv41 is not None and ipv4_mask1 is not None: - addrlist.append("%s/%s" % (ipv41, ipv4_mask1)) - if ipv61 is not None and ipv6_mask1 is not None: - addrlist.append("%s/%s" % (ipv61, ipv6_mask1)) - node1.adoptnetif(tunnel, interface_index1, mac1, addrlist) - node1.linkconfig(tunnel, bw, delay, loss, duplicate, jitter) - elif node2 and nodeutils.is_node(node2, NodeTypes.PHYSICAL): - if ipv42 is not None and ipv4_mask2 is not None: - addrlist.append("%s/%s" % (ipv42, ipv4_mask2)) - if ipv62 is not None and ipv6_mask2 is not None: - addrlist.append("%s/%s" % (ipv62, ipv6_mask2)) - node2.adoptnetif(tunnel, interface_index2, mac2, addrlist) - node2.linkconfig(tunnel, bw, delay, loss, duplicate, jitter) - # delete a link - elif message.flags & MessageFlags.DELETE.value: - """ - Remove a link. - """ - if node1 and node2: - # TODO: fix this for the case where ifindex[1,2] are not specified - # a wired unlink event, delete the connecting bridge - netif1 = node1.netif(interface_index1) - netif2 = node2.netif(interface_index2) - if netif1 is None and netif2 is None: - nets = node1.commonnets(node2) - for netcommon, tmp1, tmp2 in nets: - if (net and netcommon == net) or net is None: - netif1 = tmp1 - netif2 = tmp2 - break - - if all([netif1, netif2]) and any([netif1.net, netif2.net]): - if netif1.net != netif2.net and all([netif1.up, netif2.up]): - raise ValueError("no common network found") - net = netif1.net - netif1.detachnet() - netif2.detachnet() - if net.numnetif() == 0: - self.session.delete_object(net.objid) - node1.delnetif(interface_index1) - node2.delnetif(interface_index2) - else: - """ - Modify a link. - """ - bw = message.get_tlv(LinkTlvs.BANDWIDTH.value) - delay = message.get_tlv(LinkTlvs.DELAY.value) - loss = message.get_tlv(LinkTlvs.PER.value) - duplicate = message.get_tlv(LinkTlvs.DUP.value) - jitter = message.get_tlv(LinkTlvs.JITTER.value) - numnet = 0 - # TODO: clean up all this logic. Having the add flag or not - # should use the same code block. - if node1 is None and node2 is None: - if net and net2: - # modify link between nets - netif = net.getlinknetif(net2) - upstream = False - if netif is None: - upstream = True - netif = net2.getlinknetif(net) - if netif is None: - raise ValueError("modify unknown link between nets") - if upstream: - netif.swapparams("_params_up") - net.linkconfig(netif, bw=bw, delay=delay, - loss=loss, duplicate=duplicate, - jitter=jitter, devname=netif.name) - netif.swapparams("_params_up") - else: - net.linkconfig(netif, bw=bw, delay=delay, - loss=loss, duplicate=duplicate, - jitter=jitter) - if not unidirectional: - if upstream: - net2.linkconfig(netif, bw=bw, delay=delay, - loss=loss, - duplicate=duplicate, - jitter=jitter) - else: - netif.swapparams("_params_up") - net2.linkconfig(netif, bw=bw, delay=delay, - loss=loss, - duplicate=duplicate, - jitter=jitter, - devname=netif.name) - netif.swapparams("_params_up") - else: - raise ValueError("modify link for unknown nodes") - elif node1 is None: - # node1 = layer 2node, node2 = layer3 node - net.linkconfig(node2.netif(interface_index2, net), bw=bw, - delay=delay, loss=loss, - duplicate=duplicate, jitter=jitter) - elif node2 is None: - # node2 = layer 2node, node1 = layer3 node - net.linkconfig(node1.netif(interface_index1, net), bw=bw, - delay=delay, loss=loss, - duplicate=duplicate, jitter=jitter) - else: - nets = node1.commonnets(node2) - for net, netif1, netif2 in nets: - if interface_index1 is not None and interface_index1 != node1.getifindex(netif1): - continue - net.linkconfig(netif1, bw=bw, delay=delay, - loss=loss, duplicate=duplicate, - jitter=jitter, netif2=netif2) - if not unidirectional: - net.linkconfig(netif2, bw=bw, delay=delay, - loss=loss, duplicate=duplicate, - jitter=jitter, netif2=netif1) - numnet += 1 - if numnet == 0: - raise ValueError("no common network found") - finally: - if node1: - node1.lock.release() - if node2: - node2.lock.release() - - return () - - def handle_execute_message(self, message): - """ - Execute Message handler - - :param coreapi.CoreExecMessage message: execute message to handle - :return: reply messages - """ - node_num = message.get_tlv(ExecuteTlvs.NODE.value) - execute_num = message.get_tlv(ExecuteTlvs.NUMBER.value) - execute_time = message.get_tlv(ExecuteTlvs.TIME.value) - command = message.get_tlv(ExecuteTlvs.COMMAND.value) - - # local flag indicates command executed locally, not on a node - if node_num is None and not message.flags & MessageFlags.LOCAL.value: - raise ValueError("Execute Message is missing node number.") - - if execute_num is None: - raise ValueError("Execute Message is missing execution number.") - - if execute_time is not None: - self.session.add_event(execute_time, node=node_num, name=None, data=command) - return () - - try: - node = self.session.get_object(node_num) - - # build common TLV items for reply - tlv_data = "" - if node_num is not None: - tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.NODE.value, node_num) - tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.NUMBER.value, execute_num) - tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.COMMAND.value, command) - - if message.flags & MessageFlags.TTY.value: - if node_num is None: - raise NotImplementedError - # echo back exec message with cmd for spawning interactive terminal - if command == "bash": - command = "/bin/bash" - res = node.termcmdstring(command) - tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.RESULT.value, res) - reply = coreapi.CoreExecMessage.pack(MessageFlags.TTY.value, tlv_data) - return reply, - else: - logger.info("execute message with cmd=%s", command) - # execute command and send a response - if message.flags & MessageFlags.STRING.value or message.flags & MessageFlags.TEXT.value: - # shlex.split() handles quotes within the string - if message.flags & MessageFlags.LOCAL.value: - status, res = utils.cmd_output(command) - else: - status, res = node.cmd_output(command) - logger.info("done exec cmd=%s with status=%d res=(%d bytes)", command, status, len(res)) - if message.flags & MessageFlags.TEXT.value: - tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.RESULT.value, res) - if message.flags & MessageFlags.STRING.value: - tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.STATUS.value, status) - reply = coreapi.CoreExecMessage.pack(0, tlv_data) - return reply, - # execute the command with no response - else: - if message.flags & MessageFlags.LOCAL.value: - utils.mute_detach(command) - else: - node.cmd(command, wait=False) - except KeyError: - logger.exception("error getting object: %s", node_num) - # XXX wait and queue this message to try again later - # XXX maybe this should be done differently - if not message.flags & MessageFlags.LOCAL.value: - time.sleep(0.125) - self.queue_message(message) - - return () - - def handle_register_message(self, message): - """ - Register Message Handler - - :param coreapi.CoreRegMessage message: register message to handle - :return: reply messages - """ - replies = [] - - # execute a Python script or XML file - execute_server = message.get_tlv(RegisterTlvs.EXECUTE_SERVER.value) - if execute_server: - try: - logger.info("executing: %s", execute_server) - if message.flags & MessageFlags.STRING.value: - old_session_ids = set(self.server.get_session_ids()) - sys.argv = shlex.split(execute_server) - file_name = sys.argv[0] - if os.path.splitext(file_name)[1].lower() == ".xml": - session = self.server.create_session() - try: - open_session_xml(session, file_name, start=True) - except: - session.shutdown() - self.server.remove_session(session) - raise - else: - thread = threading.Thread( - target=execfile, - args=(file_name, {"__file__": file_name, "server": self.server}) - ) - thread.daemon = True - thread.start() - # allow time for session creation - time.sleep(0.25) - if message.flags & MessageFlags.STRING.value: - new_session_ids = set(self.server.get_session_ids()) - new_sid = new_session_ids.difference(old_session_ids) - try: - sid = new_sid.pop() - logger.info("executed: %s as session %d", execute_server, sid) - except KeyError: - logger.info("executed %s with unknown session ID", execute_server) - return replies - logger.info("checking session %d for RUNTIME state" % sid) - session = self.server.get_session(session_id=sid) - retries = 10 - # wait for session to enter RUNTIME state, to prevent GUI from - # connecting while nodes are still being instantiated - while session.state != EventTypes.RUNTIME_STATE.value: - logger.info("waiting for session %d to enter RUNTIME state" % sid) - time.sleep(1) - retries -= 1 - if retries <= 0: - logger.info("session %d did not enter RUNTIME state" % sid) - return replies - tlv_data = coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, execute_server) - tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.SESSION.value, "%s" % sid) - message = coreapi.CoreRegMessage.pack(0, tlv_data) - replies.append(message) - except Exception as e: - logger.exception("error executing: %s", execute_server) - tlv_data = coreapi.CoreExceptionTlv.pack(ExceptionTlvs.LEVEL.value, 2) - tlv_data += coreapi.CoreExceptionTlv.pack(ExceptionTlvs.TEXT.value, str(e)) - message = coreapi.CoreExceptionMessage.pack(0, tlv_data) - replies.append(message) - - return replies - - gui = message.get_tlv(RegisterTlvs.GUI.value) - if gui is None: - logger.info("ignoring Register message") - else: - # register capabilities with the GUI - self.master = True - - # TODO: need to replicate functionality? - # self.server.set_session_master(self) - # find the session containing this client and set the session to master - for session in self.server.sessions.itervalues(): - if self in session.broker.session_clients: - logger.info("setting session to master: %s", session.session_id) - session.master = True - break - - replies.append(self.register()) - replies.append(self.server.to_session_message()) - - return replies - - def handle_config_message(self, message): - """ - Configuration Message handler - - :param coreapi.CoreConfMessage message: configuration message to handle - :return: reply messages - """ - # convert config message to standard config data object - config_data = ConfigData( - node=message.get_tlv(ConfigTlvs.NODE.value), - object=message.get_tlv(ConfigTlvs.OBJECT.value), - type=message.get_tlv(ConfigTlvs.TYPE.value), - data_types=message.get_tlv(ConfigTlvs.DATA_TYPES.value), - data_values=message.get_tlv(ConfigTlvs.VALUES.value), - captions=message.get_tlv(ConfigTlvs.CAPTIONS.value), - bitmap=message.get_tlv(ConfigTlvs.BITMAP.value), - possible_values=message.get_tlv(ConfigTlvs.POSSIBLE_VALUES.value), - groups=message.get_tlv(ConfigTlvs.GROUPS.value), - session=message.get_tlv(ConfigTlvs.SESSION.value), - interface_number=message.get_tlv(ConfigTlvs.INTERFACE_NUMBER.value), - network_id=message.get_tlv(ConfigTlvs.NETWORK_ID.value), - opaque=message.get_tlv(ConfigTlvs.OPAQUE.value) - ) - logger.info("Configuration message for %s node %s", config_data.object, config_data.node) - - # dispatch to any registered callback for this object type - replies = self.session.config_object(config_data) - - for reply in replies: - self.handle_broadcast_config(reply) - - return [] - - def handle_file_message(self, message): - """ - File Message handler - - :param coreapi.CoreFileMessage message: file message to handle - :return: reply messages - """ - if message.flags & MessageFlags.ADD.value: - node_num = message.get_tlv(NodeTlvs.NUMBER.value) - file_name = message.get_tlv(FileTlvs.NAME.value) - file_type = message.get_tlv(FileTlvs.TYPE.value) - source_name = message.get_tlv(FileTlvs.SOURCE_NAME.value) - data = message.get_tlv(FileTlvs.DATA.value) - compressed_data = message.get_tlv(FileTlvs.COMPRESSED_DATA.value) - - if compressed_data: - logger.warn("Compressed file data not implemented for File message.") - return () - - if source_name and data: - logger.warn("ignoring invalid File message: source and data TLVs are both present") - return () - - # some File Messages store custom files in services, - # prior to node creation - if file_type is not None: - if file_type[:8] == "service:": - self.session.services.setservicefile(node_num, file_type, file_name, source_name, data) - return () - elif file_type[:5] == "hook:": - self.session.set_hook(file_type, file_name, source_name, data) - return () - - # writing a file to the host - if node_num is None: - if source_name is not None: - shutil.copy2(source_name, file_name) - else: - with open(file_name, "w") as open_file: - open_file.write(data) - return () - - try: - node = self.session.get_object(node_num) - if source_name is not None: - node.addfile(source_name, file_name) - elif data is not None: - node.nodefile(file_name, data) - except KeyError: - # XXX wait and queue this message to try again later - # XXX maybe this should be done differently - logger.warn("File message for %s for node number %s queued." % (file_name, node_num)) - time.sleep(0.125) - self.queue_message(message) - return () - else: - raise NotImplementedError - - return () - - def handle_interface_message(self, message): - """ - Interface Message handler. - - :param message: interface message to handle - :return: reply messages - """ - logger.info("ignoring Interface message") - return () - - def handle_event_message(self, message): - """ - Event Message handler - - :param coreapi.CoreEventMessage message: event message to handle - :return: reply messages - """ - event_data = EventData( - node=message.get_tlv(EventTlvs.NODE.value), - event_type=message.get_tlv(EventTlvs.TYPE.value), - name=message.get_tlv(EventTlvs.NAME.value), - data=message.get_tlv(EventTlvs.DATA.value), - time=message.get_tlv(EventTlvs.TIME.value), - session=message.get_tlv(EventTlvs.SESSION.value) - ) - - event_type = event_data.event_type - if event_type is None: - raise NotImplementedError("Event message missing event type") - node_id = event_data.node - - logger.info("EVENT %d: %s at %s", event_type, EventTypes(event_type).name, time.ctime()) - if event_type <= EventTypes.SHUTDOWN_STATE.value: - if node_id is not None: - try: - node = self.session.get_object(node_id) - except KeyError: - raise KeyError("Event message for unknown node %d" % node_id) - - # configure mobility models for WLAN added during runtime - if event_type == EventTypes.INSTANTIATION_STATE.value and nodeutils.is_node(node, - NodeTypes.WIRELESS_LAN): - self.session.mobility.startup(node_ids=(node.objid,)) - return () - - logger.warn("dropping unhandled Event message with node number") - return () - self.session.set_state(state=event_type) - - if event_type == EventTypes.DEFINITION_STATE.value: - # clear all session objects in order to receive new definitions - self.session.delete_objects() - self.session.del_hooks() - self.session.broker.reset() - elif event_type == EventTypes.INSTANTIATION_STATE.value: - if len(self.handler_threads) > 1: - # TODO: sync handler threads here before continuing - time.sleep(2.0) # XXX - # done receiving node/link configuration, ready to instantiate - self.session.instantiate() - - # after booting nodes attempt to send emulation id for nodes waiting on status - for obj in self.session.objects.itervalues(): - self.send_node_emulation_id(obj.objid) - elif event_type == EventTypes.RUNTIME_STATE.value: - if self.session.master: - logger.warn("Unexpected event message: RUNTIME state received at session master") - else: - # master event queue is started in session.checkruntime() - self.session.event_loop.run() - elif event_type == EventTypes.DATACOLLECT_STATE.value: - self.session.data_collect() - elif event_type == EventTypes.SHUTDOWN_STATE.value: - if self.session.master: - logger.warn("Unexpected event message: SHUTDOWN state received at session master") - elif event_type in (EventTypes.START.value, EventTypes.STOP.value, - EventTypes.RESTART.value, - EventTypes.PAUSE.value, - EventTypes.RECONFIGURE.value): - handled = False - name = event_data.name - if name: - # TODO: register system for event message handlers, - # like confobjs - if name.startswith("service:"): - self.session.services.handleevent(event_data) - handled = True - elif name.startswith("mobility:"): - self.session.mobility.handleevent(event_data) - handled = True - if not handled: - logger.warn("Unhandled event message: event type %s (%s)", - event_type, coreapi.state_name(event_type)) - elif event_type == EventTypes.FILE_OPEN.value: - self.session.delete_objects() - self.session.del_hooks() - self.session.broker.reset() - filename = event_data.name - open_session_xml(self.session, filename) - - # trigger session to send out messages out itself - self.session.send_objects() - return () - elif event_type == EventTypes.FILE_SAVE.value: - filename = event_data.name - save_session_xml(self.session, filename, self.session.config["xmlfilever"]) - elif event_type == EventTypes.SCHEDULED.value: - etime = event_data.time - node = event_data.node - name = event_data.name - data = event_data.data - if etime is None: - logger.warn("Event message scheduled event missing start time") - return () - if message.flags & MessageFlags.ADD.value: - self.session.add_event(float(etime), node=node, name=name, data=data) - else: - raise NotImplementedError - else: - logger.warn("Unhandled event message: event type %d", event_type) - - return () - - def handle_session_message(self, message): - """ - Session Message handler - - :param coreapi.CoreSessionMessage message: session message to handle - :return: reply messages - """ - session_id_str = message.get_tlv(SessionTlvs.NUMBER.value) - name_str = message.get_tlv(SessionTlvs.NAME.value) - file_str = message.get_tlv(SessionTlvs.FILE.value) - node_count_str = message.get_tlv(SessionTlvs.NODE_COUNT.value) - thumb = message.get_tlv(SessionTlvs.THUMB.value) - user = message.get_tlv(SessionTlvs.USER.value) - session_ids = coreapi.str_to_list(session_id_str) - names = coreapi.str_to_list(name_str) - files = coreapi.str_to_list(file_str) - node_counts = coreapi.str_to_list(node_count_str) - logger.info("SESSION message flags=0x%x sessions=%s" % (message.flags, session_id_str)) - - if message.flags == 0: - # modify a session - i = 0 - for session_id in session_ids: - session_id = int(session_id) - if session_id == 0: - session = self.session - else: - session = self.server.get_session(session_id=session_id) - - if session is None: - logger.info("session %s not found", session_id) - i += 1 - continue - - logger.info("request to modify to session %s", session.session_id) - if names is not None: - session.name = names[i] - if files is not None: - session.file_name = files[i] - if node_counts is not None: - pass - if thumb is not None: - session.set_thumbnail(thumb) - if user is not None: - session.set_user(user) - i += 1 - else: - if message.flags & MessageFlags.STRING.value and not message.flags & MessageFlags.ADD.value: - # status request flag: send list of sessions - return self.server.to_session_message(), - - # handle ADD or DEL flags - for session_id in session_ids: - session_id = int(session_id) - session = self.server.get_session(session_id=session_id) - - if session is None: - logger.info("session %s not found (flags=0x%x)", session_id, message.flags) - continue - - if message.flags & MessageFlags.ADD.value: - # connect to the first session that exists - logger.info("request to connect to session %s" % session_id) - - # remove client from session broker and shutdown if needed - self.session.broker.session_clients.remove(self) - active_states = [ - EventTypes.RUNTIME_STATE.value, - EventTypes.RUNTIME_STATE.value, - EventTypes.DATACOLLECT_STATE.value - ] - if not self.session.broker.session_clients and self.session.state not in active_states: - self.session.shutdown() - - # set session to join - self.session = session - - # add client to session broker and set master if needed - if self.master: - self.session.master = True - self.session.broker.session_clients.append(self) - - # add broadcast handlers - logger.info("adding session broadcast handlers") - self.add_session_handlers() - - if user is not None: - self.session.set_user(user) - - if message.flags & MessageFlags.STRING.value: - self.session.send_objects() - elif message.flags & MessageFlags.DELETE.value: - # shut down the specified session(s) - logger.info("request to terminate session %s" % session_id) - session.set_state(state=EventTypes.DATACOLLECT_STATE.value, send_event=True) - session.set_state(state=EventTypes.SHUTDOWN_STATE.value, send_event=True) - session.shutdown() - else: - logger.warn("unhandled session flags for session %s", session_id) - - return () - - def send_node_emulation_id(self, node_id): - """ - Node emulation id to send. - - :param int node_id: node id to send - :return: nothing - """ - if node_id in self.node_status_request: - tlv_data = "" - tlv_data += coreapi.CoreNodeTlv.pack(NodeTlvs.NUMBER.value, node_id) - tlv_data += coreapi.CoreNodeTlv.pack(NodeTlvs.EMULATION_ID.value, node_id) - reply = coreapi.CoreNodeMessage.pack(MessageFlags.ADD.value | MessageFlags.LOCAL.value, tlv_data) - - try: - self.sendall(reply) - except IOError: - logger.exception("error sending node emulation id message: %s", node_id) - - del self.node_status_request[node_id] diff --git a/daemon/core/coreserver.py b/daemon/core/coreserver.py deleted file mode 100644 index 1687aec3..00000000 --- a/daemon/core/coreserver.py +++ /dev/null @@ -1,263 +0,0 @@ -""" -Defines server classes and request handlers for TCP and UDP. -""" - -import SocketServer -import threading -import time - -from core import logger -from core.api import coreapi -from core.enumerations import EventTypes -from core.enumerations import SessionTlvs -from core.session import Session - - -class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - """ - TCP server class, manages sessions and spawns request handlers for - incoming connections. - """ - daemon_threads = True - allow_reuse_address = True - servers = set() - - def __init__(self, server_address, handler_class, config=None): - """ - Server class initialization takes configuration data and calls - the SocketServer constructor - - :param tuple[str, int] server_address: server host and port to use - :param class handler_class: request handler - :param dict config: configuration setting - :return: - """ - self.config = config - self.sessions = {} - self.udpserver = None - self.udpthread = None - self._sessions_lock = threading.Lock() - CoreServer.add_server(self) - SocketServer.TCPServer.__init__(self, server_address, handler_class) - - @classmethod - def add_server(cls, server): - """ - Add a core server to the known servers set. - - :param CoreServer server: server to add - :return: nothing - """ - cls.servers.add(server) - - @classmethod - def remove_server(cls, server): - """ - Remove a core server from the known servers set. - - :param CoreServer server: server to remove - :return: nothing - """ - if server in cls.servers: - cls.servers.remove(server) - - def shutdown(self): - """ - Shutdown the server, all known sessions, and remove server from known servers set. - - :return: nothing - """ - # shutdown all known sessions - for session in self.sessions.values(): - session.shutdown() - - # remove server from server list - CoreServer.remove_server(self) - - def add_session(self, session): - """ - Add a session to our dictionary of sessions, ensuring a unique session number. - - :param core.session.Session session: session to add - :return: added session - :raise KeyError: when a session with the same id already exists - """ - with self._sessions_lock: - if session.session_id in self.sessions: - raise KeyError("non-unique session id %s for %s" % (session.session_id, session)) - self.sessions[session.session_id] = session - - return session - - def remove_session(self, session): - """ - Remove a session from our dictionary of sessions. - - :param core.session.Session session: session to remove - :return: removed session - :rtype: core.session.Session - """ - with self._sessions_lock: - if session.session_id not in self.sessions: - logger.info("session id %s not found (sessions=%s)", session.session_id, self.sessions.keys()) - else: - del self.sessions[session.session_id] - - return session - - def get_session_ids(self): - """ - Return a list of active session numbers. - - :return: known session ids - :rtype: list - """ - with self._sessions_lock: - session_ids = self.sessions.keys() - - return session_ids - - def create_session(self, session_id=None): - """ - Convenience method for creating sessions with the servers config. - - :param int session_id: session id for new session - :return: create session - :rtype: core.session.Session - """ - - # create random id when necessary, seems to be 1 case wanted, based on legacy code - # creating a value so high, typical client side generation schemes hopefully wont collide - if not session_id: - session_id = next( - session_id for session_id in xrange(60000, 65000) - if session_id not in self.sessions - ) - - # create and add session to local manager - session = Session(session_id, config=self.config) - self.add_session(session) - - # add shutdown handler to remove session from manager - session.shutdown_handlers.append(self.session_shutdown) - - return session - - def get_session(self, session_id=None): - """ - Create a new session or retrieve an existing one from our - dictionary of sessions. When the session_id=0 and the use_existing - flag is set, return on of the existing sessions. - - :param int session_id: session id of session to retrieve, defaults to returning random session - :return: session - :rtype: core.session.Session - """ - - with self._sessions_lock: - # return specified session or none - if session_id: - return self.sessions.get(session_id) - - # retrieving known session - session = None - - # find runtime session with highest node count - for known_session in filter(lambda x: x.state == EventTypes.RUNTIME_STATE.value, - self.sessions.itervalues()): - if not session or known_session.get_node_count() > session.get_node_count(): - session = known_session - - # return first known session otherwise - if not session: - for known_session in self.sessions.itervalues(): - session = known_session - break - - return session - - def session_shutdown(self, session): - """ - Handler method to be used as a callback when a session has shutdown. - - :param core.session.Session session: session shutting down - :return: nothing - """ - self.remove_session(session) - - def to_session_message(self, flags=0): - """ - Build CORE API Sessions message based on current session info. - - :param int flags: message flags - :return: session message - """ - id_list = [] - name_list = [] - file_list = [] - node_count_list = [] - date_list = [] - thumb_list = [] - num_sessions = 0 - - with self._sessions_lock: - for session_id in self.sessions: - session = self.sessions[session_id] - # debug: session.dumpsession() - num_sessions += 1 - id_list.append(str(session_id)) - - name = session.name - if not name: - name = "" - name_list.append(name) - - file = session.file_name - if not file: - file = "" - file_list.append(file) - - node_count_list.append(str(session.get_node_count())) - - date_list.append(time.ctime(session._state_time)) - - thumb = session.thumbnail - if not thumb: - thumb = "" - thumb_list.append(thumb) - - session_ids = "|".join(id_list) - names = "|".join(name_list) - files = "|".join(file_list) - node_counts = "|".join(node_count_list) - dates = "|".join(date_list) - thumbs = "|".join(thumb_list) - - if num_sessions > 0: - tlv_data = "" - if len(session_ids) > 0: - tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NUMBER.value, session_ids) - if len(names) > 0: - tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NAME.value, names) - if len(files) > 0: - tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.FILE.value, files) - if len(node_counts) > 0: - tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NODE_COUNT.value, node_counts) - if len(dates) > 0: - tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.DATE.value, dates) - if len(thumbs) > 0: - tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.THUMB.value, thumbs) - message = coreapi.CoreSessionMessage.pack(flags, tlv_data) - else: - message = None - - return message - - def dump_sessions(self): - """ - Log currently known session information. - """ - logger.info("sessions:") - with self._sessions_lock: - for session_id in self.sessions: - logger.info(session_id) diff --git a/daemon/core/future/coreemu.py b/daemon/core/future/coreemu.py index 5bb2859e..4b906fcf 100644 --- a/daemon/core/future/coreemu.py +++ b/daemon/core/future/coreemu.py @@ -195,6 +195,9 @@ class FutureSession(Session): raise ValueError("wireless link failure: %s", objects) logger.info("handling wireless linking objects(%) connect(%s)", objects, connect) common_networks = objects[0].commonnets(objects[1]) + if not common_networks: + raise ValueError("no common network found for wireless link/unlink") + for common_network, interface_one, interface_two in common_networks: if not nodeutils.is_node(common_network, [NodeTypes.WIRELESS_LAN, NodeTypes.EMANE]): logger.info("skipping common network that is not wireless/emane: %s", common_network) @@ -205,8 +208,6 @@ class FutureSession(Session): common_network.link(interface_one, interface_two) else: common_network.unlink(interface_one, interface_two) - else: - raise ValueError("no common network found for wireless link/unlink") def add_link(self, node_one_id, node_two_id, interface_one=None, interface_two=None, link_options=LinkOptions()): """ @@ -429,15 +430,17 @@ class FutureSession(Session): link_config(net_one, interface, link_options) else: common_networks = node_one.commonnets(node_two) + if not common_networks: + raise ValueError("no common network found") + for net_one, interface_one, interface_two in common_networks: - if interface_one_id and interface_one_id != node_one.getifindex(interface_one): + if interface_one_id is not None and interface_one_id != node_one.getifindex(interface_one): continue link_config(net_one, interface_one, link_options, interface_two=interface_two) if not link_options.unidirectional: link_config(net_one, interface_two, link_options, interface_two=interface_one) - else: - raise ValueError("no common network found") + finally: if node_one: node_one.lock.release() @@ -610,8 +613,8 @@ class FutureSession(Session): :return: nothing """ - self.set_state(state=EventTypes.DATACOLLECT_STATE.value, send_event=True) - self.set_state(state=EventTypes.SHUTDOWN_STATE.value, send_event=True) + self.set_state(EventTypes.DATACOLLECT_STATE, send_event=True) + self.set_state(EventTypes.SHUTDOWN_STATE, send_event=True) super(FutureSession, self).shutdown() def custom_delete_object(self, object_id): @@ -753,9 +756,9 @@ class FutureSession(Session): """ self.mobility.handleevent(event_data) - def create_emane_node(self, _id=None, node_options=NodeOptions()): + def create_wireless_node(self, _id=None, node_options=NodeOptions()): """ - Create an EMANE node for use within an EMANE network. + Create a wireless node for use within an wireless/EMANE networks. :param int _id: int for node, defaults to None and will be generated :param core.future.futuredata.NodeOptions node_options: options for emane node, model will always be "mdr" @@ -796,6 +799,29 @@ class FutureSession(Session): values = list(emane_model.getdefaultvalues()) self.emane.setconfig(emane_node.objid, emane_model.name, values) + def set_wireless_model(self, node, model): + """ + Convenience method for setting a wireless model. + + :param node: node to set wireless model for + :param core.mobility.WirelessModel model: wireless model to set node to + :return: nothing + """ + values = list(model.getdefaultvalues()) + node.setmodel(model, values) + + def wireless_link_all(self, network, nodes): + """ + Link all nodes to the provided wireless network. + + :param network: wireless network to link nodes to + :param nodes: nodes to link to wireless network + :return: nothing + """ + for node in nodes: + for common_network, interface_one, interface_two in node.commonnets(network): + common_network.link(interface_one, interface_two) + class CoreEmu(object): """ @@ -876,26 +902,3 @@ class CoreEmu(object): logger.error("session to delete did not exist: %s", _id) return result - - def set_wireless_model(self, node, model): - """ - Convenience method for setting a wireless model. - - :param node: node to set wireless model for - :param core.mobility.WirelessModel model: wireless model to set node to - :return: nothing - """ - values = list(model.getdefaultvalues()) - node.setmodel(model, values) - - def wireless_link_all(self, network, nodes): - """ - Link all nodes to the provided wireless network. - - :param network: wireless network to link nodes to - :param nodes: nodes to link to wireless network - :return: nothing - """ - for node in nodes: - for common_network, interface_one, interface_two in node.commonnets(network): - common_network.link(interface_one, interface_two) diff --git a/daemon/core/legacy/__init__.py b/daemon/core/legacy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/daemon/core/future/futurehandler.py b/daemon/core/legacy/corehandler.py similarity index 96% rename from daemon/core/future/futurehandler.py rename to daemon/core/legacy/corehandler.py index 7fb90a5a..2b358f1b 100644 --- a/daemon/core/future/futurehandler.py +++ b/daemon/core/legacy/corehandler.py @@ -13,29 +13,31 @@ import time from core import logger from core.api import coreapi -from core.coreserver import CoreServer from core.data import ConfigData from core.data import EventData -from core.enumerations import ConfigTlvs, LinkTypes +from core.enumerations import ConfigTlvs from core.enumerations import EventTlvs from core.enumerations import EventTypes from core.enumerations import ExceptionTlvs from core.enumerations import ExecuteTlvs from core.enumerations import FileTlvs from core.enumerations import LinkTlvs +from core.enumerations import LinkTypes from core.enumerations import MessageFlags from core.enumerations import MessageTypes from core.enumerations import NodeTlvs from core.enumerations import NodeTypes from core.enumerations import RegisterTlvs from core.enumerations import SessionTlvs -from core.future.futuredata import NodeOptions, LinkOptions, InterfaceData +from core.future.futuredata import InterfaceData +from core.future.futuredata import LinkOptions +from core.future.futuredata import NodeOptions from core.misc import nodeutils from core.misc import structutils from core.misc import utils -class FutureHandler(SocketServer.BaseRequestHandler): +class CoreHandler(SocketServer.BaseRequestHandler): """ The SocketServer class uses the RequestHandler class for servicing requests. """ @@ -578,7 +580,7 @@ class FutureHandler(SocketServer.BaseRequestHandler): self.add_session_handlers() # set initial session state - self.session.set_state(state=EventTypes.DEFINITION_STATE.value) + self.session.set_state(EventTypes.DEFINITION_STATE) while True: try: @@ -1032,13 +1034,13 @@ class FutureHandler(SocketServer.BaseRequestHandler): session=message.get_tlv(EventTlvs.SESSION.value) ) - event_type = event_data.event_type - if event_type is None: + if event_data.event_type is None: raise NotImplementedError("Event message missing event type") + event_type = EventTypes(event_data.event_type) node_id = event_data.node - logger.info("EVENT %d: %s at %s", event_type, EventTypes(event_type).name, time.ctime()) - if event_type <= EventTypes.SHUTDOWN_STATE.value: + logger.info("EVENT %s at %s", event_type.name, time.ctime()) + if event_type.value <= EventTypes.SHUTDOWN_STATE.value: if node_id is not None: try: node = self.session.get_object(node_id) @@ -1046,19 +1048,18 @@ class FutureHandler(SocketServer.BaseRequestHandler): raise KeyError("Event message for unknown node %d" % node_id) # configure mobility models for WLAN added during runtime - if event_type == EventTypes.INSTANTIATION_STATE.value and nodeutils.is_node(node, - NodeTypes.WIRELESS_LAN): + if event_type == EventTypes.INSTANTIATION_STATE and nodeutils.is_node(node, NodeTypes.WIRELESS_LAN): self.session.start_mobility(node_ids=(node.objid,)) return () logger.warn("dropping unhandled Event message with node number") return () - self.session.set_state(state=event_type) + self.session.set_state(event_type) - if event_type == EventTypes.DEFINITION_STATE.value: + if event_type == EventTypes.DEFINITION_STATE: # clear all session objects in order to receive new definitions self.session.clear() - elif event_type == EventTypes.INSTANTIATION_STATE.value: + elif event_type == EventTypes.INSTANTIATION_STATE: if len(self.handler_threads) > 1: # TODO: sync handler threads here before continuing time.sleep(2.0) # XXX @@ -1068,21 +1069,19 @@ class FutureHandler(SocketServer.BaseRequestHandler): # after booting nodes attempt to send emulation id for nodes waiting on status for obj in self.session.objects.itervalues(): self.send_node_emulation_id(obj.objid) - elif event_type == EventTypes.RUNTIME_STATE.value: + elif event_type == EventTypes.RUNTIME_STATE: if self.session.master: logger.warn("Unexpected event message: RUNTIME state received at session master") else: # master event queue is started in session.checkruntime() self.session.start_events() - elif event_type == EventTypes.DATACOLLECT_STATE.value: + elif event_type == EventTypes.DATACOLLECT_STATE: self.session.data_collect() - elif event_type == EventTypes.SHUTDOWN_STATE.value: + elif event_type == EventTypes.SHUTDOWN_STATE: if self.session.master: logger.warn("Unexpected event message: SHUTDOWN state received at session master") - elif event_type in (EventTypes.START.value, EventTypes.STOP.value, - EventTypes.RESTART.value, - EventTypes.PAUSE.value, - EventTypes.RECONFIGURE.value): + elif event_type in {EventTypes.START, EventTypes.STOP, EventTypes.RESTART, EventTypes.PAUSE, + EventTypes.RECONFIGURE}: handled = False name = event_data.name if name: @@ -1095,17 +1094,16 @@ class FutureHandler(SocketServer.BaseRequestHandler): self.session.mobility_event(event_data) handled = True if not handled: - logger.warn("Unhandled event message: event type %s (%s)", - event_type, coreapi.state_name(event_type)) - elif event_type == EventTypes.FILE_OPEN.value: + logger.warn("Unhandled event message: event type %s ", event_type.name) + elif event_type == EventTypes.FILE_OPEN: filename = event_data.name self.session.open_xml(filename, start=False) self.session.send_objects() return () - elif event_type == EventTypes.FILE_SAVE.value: + elif event_type == EventTypes.FILE_SAVE: filename = event_data.name self.session.save_xml(filename, self.session.config["xmlfilever"]) - elif event_type == EventTypes.SCHEDULED.value: + elif event_type == EventTypes.SCHEDULED: etime = event_data.time node = event_data.node name = event_data.name @@ -1118,7 +1116,7 @@ class FutureHandler(SocketServer.BaseRequestHandler): else: raise NotImplementedError else: - logger.warn("Unhandled event message: event type %d", event_type) + logger.warn("Unhandled event message: event type %s", event_type) return () diff --git a/daemon/core/future/futureserver.py b/daemon/core/legacy/coreserver.py similarity index 64% rename from daemon/core/future/futureserver.py rename to daemon/core/legacy/coreserver.py index d37845b4..5b57ca51 100644 --- a/daemon/core/future/futureserver.py +++ b/daemon/core/legacy/coreserver.py @@ -1,5 +1,5 @@ """ -Defines server classes and request handlers for TCP and UDP. +Defines core server for handling TCP connections. """ import SocketServer @@ -7,7 +7,7 @@ import SocketServer from core.future.coreemu import CoreEmu -class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): +class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): """ TCP server class, manages sessions and spawns request handlers for incoming connections. @@ -28,13 +28,3 @@ class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): self.coreemu = CoreEmu(config) self.config = config SocketServer.TCPServer.__init__(self, server_address, handler_class) - - def shutdown(self): - """ - Shutdown the server, all known sessions, and remove server from known servers set. - - :return: nothing - """ - # shutdown all known sessions - for session in self.coreemu.sessions.itervalues(): - session.shutdown() diff --git a/daemon/core/session.py b/daemon/core/session.py index 81ee8b04..a7149c34 100644 --- a/daemon/core/session.py +++ b/daemon/core/session.py @@ -300,27 +300,28 @@ class Session(object): """ Set the session's current state. - :param int state: state to set to + :param core.enumerations.EventTypes state: state to set to :param send_event: if true, generate core API event messages :return: nothing """ - state_name = coreapi.state_name(state) + state_value = state.value + state_name = state.name - if self.state == state: + if self.state == state_value: logger.info("session is already in state: %s, skipping change", state_name) return - self.state = state + self.state = state_value self._state_time = time.time() logger.info("changing session %s to state %s(%s) at %s", - self.session_id, state, state_name, self._state_time) + self.session_id, state_value, state_name, self._state_time) - self.write_state(state) - self.run_hooks(state) - self.run_state_hooks(state) + self.write_state(state_value) + self.run_hooks(state_value) + self.run_state_hooks(state_value) if send_event: - event_data = EventData(event_type=state, time="%s" % time.time()) + event_data = EventData(event_type=state_value, time="%s" % time.time()) self.broadcast_event(event_data) def write_state(self, state): @@ -868,7 +869,7 @@ class Session(object): # start event loop and set to runtime self.event_loop.run() - self.set_state(EventTypes.RUNTIME_STATE.value, send_event=True) + self.set_state(EventTypes.RUNTIME_STATE, send_event=True) def data_collect(self): """ @@ -908,7 +909,7 @@ class Session(object): shutdown = False if node_count == 0: shutdown = True - self.set_state(state=EventTypes.SHUTDOWN_STATE.value) + self.set_state(EventTypes.SHUTDOWN_STATE) return shutdown diff --git a/daemon/core/xml/xmlparser0.py b/daemon/core/xml/xmlparser0.py index 936c7b6f..e13e61c5 100644 --- a/daemon/core/xml/xmlparser0.py +++ b/daemon/core/xml/xmlparser0.py @@ -250,7 +250,7 @@ class CoreDocumentParser0(object): geo.append(a) location.setrefgeo(geo[0], geo[1], geo[2]) scale = origin.getAttribute("scale100") - if scale is not None: + if scale is not None and scale: location.refscale = float(scale) point = xmlutils.get_one_element(origin, "point") if point is not None and point.firstChild is not None: diff --git a/daemon/examples/future/emane80211_api.py b/daemon/examples/future/emane80211_api.py index 1b16bc96..e60c2871 100644 --- a/daemon/examples/future/emane80211_api.py +++ b/daemon/examples/future/emane80211_api.py @@ -20,7 +20,7 @@ def example(options): session = coreemu.create_session() # must be in configuration state for nodes to start, when using "node_add" below - session.set_state(EventTypes.CONFIGURATION_STATE.value) + session.set_state(EventTypes.CONFIGURATION_STATE) # create emane network node emane_network = session.create_emane_network( @@ -31,7 +31,7 @@ def example(options): # create nodes for i in xrange(options.nodes): - node = session.create_emane_node() + node = session.create_wireless_node() node.setposition(x=150 * (i + 1), y=150) interface = prefixes.create_interface(node) session.add_link(node.objid, emane_network.objid, interface_one=interface) diff --git a/daemon/examples/future/switch_api.py b/daemon/examples/future/switch_api.py index 1dc914ec..9e50085f 100644 --- a/daemon/examples/future/switch_api.py +++ b/daemon/examples/future/switch_api.py @@ -22,7 +22,7 @@ def example(options): session = coreemu.create_session() # must be in configuration state for nodes to start, when using "node_add" below - session.set_state(EventTypes.CONFIGURATION_STATE.value) + session.set_state(EventTypes.CONFIGURATION_STATE) # create switch network node switch = session.add_node(_type=NodeTypes.SWITCH) diff --git a/daemon/examples/future/switch_api_inject.py b/daemon/examples/future/switch_api_inject.py index 215bddb8..647f3423 100644 --- a/daemon/examples/future/switch_api_inject.py +++ b/daemon/examples/future/switch_api_inject.py @@ -18,7 +18,7 @@ def example(nodes): session = coreemu.create_session() # must be in configuration state for nodes to start, when using "node_add" below - session.set_state(EventTypes.CONFIGURATION_STATE.value) + session.set_state(EventTypes.CONFIGURATION_STATE) # create switch network node switch = session.add_node(_type=NodeTypes.SWITCH) diff --git a/daemon/examples/future/wlan_api.py b/daemon/examples/future/wlan_api.py index ed34ca55..5fa16cc0 100644 --- a/daemon/examples/future/wlan_api.py +++ b/daemon/examples/future/wlan_api.py @@ -23,11 +23,11 @@ def example(options): session = coreemu.create_session() # must be in configuration state for nodes to start, when using "node_add" below - session.set_state(EventTypes.CONFIGURATION_STATE.value) + session.set_state(EventTypes.CONFIGURATION_STATE) # create wlan network node wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) - coreemu.set_wireless_model(wlan, BasicRangeModel) + session.set_wireless_model(wlan, BasicRangeModel) # create nodes wireless_nodes = [] @@ -38,7 +38,7 @@ def example(options): wireless_nodes.append(node) # link all created nodes with the wireless network - coreemu.wireless_link_all(wlan, wireless_nodes) + session.wireless_link_all(wlan, wireless_nodes) # instantiate session session.instantiate() diff --git a/daemon/examples/netns/daemonnodes.py b/daemon/examples/netns/daemonnodes.py index 2f8b7f81..76e3f9ce 100755 --- a/daemon/examples/netns/daemonnodes.py +++ b/daemon/examples/netns/daemonnodes.py @@ -132,7 +132,7 @@ def main(): session.broker.dorecvloop = False # Change to configuration state on both machines - session.set_state(EventTypes.CONFIGURATION_STATE.value) + session.set_state(EventTypes.CONFIGURATION_STATE) tlvdata = coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, EventTypes.CONFIGURATION_STATE.value) session.broker.handlerawmsg(coreapi.CoreEventMessage.pack(0, tlvdata)) diff --git a/daemon/examples/netns/distributed.py b/daemon/examples/netns/distributed.py index a76388b1..6ffd49f1 100755 --- a/daemon/examples/netns/distributed.py +++ b/daemon/examples/netns/distributed.py @@ -69,7 +69,7 @@ def main(): print "connecting to slave at %s:%d" % (slave, port) session.broker.addserver(slave, slave, port) session.broker.setupserver(slave) - session.set_state(EventTypes.CONFIGURATION_STATE.value) + session.set_state(EventTypes.CONFIGURATION_STATE) tlvdata = coreapi.CoreEventTlv.pack(EventTlvs.TYPE.value, EventTypes.CONFIGURATION_STATE.value) session.broker.handlerawmsg(coreapi.CoreEventMessage.pack(0, tlvdata)) diff --git a/daemon/scripts/core-daemon b/daemon/scripts/core-daemon old mode 100755 new mode 100644 index a9d92376..744d202b --- a/daemon/scripts/core-daemon +++ b/daemon/scripts/core-daemon @@ -6,18 +6,16 @@ message handlers are defined and some support for sending messages. """ import ConfigParser -import atexit import optparse -import signal import sys import time from core import constants -from core import corehandlers -from core import coreserver from core import enumerations from core import logger from core import services +from core.legacy.corehandler import CoreHandler +from core.legacy.coreserver import CoreServer from core.misc import nodeutils from core.misc.utils import close_onexec from core.service import ServiceManager @@ -45,7 +43,7 @@ def cored(cfg=None): host = "localhost" try: - server = coreserver.CoreServer((host, port), corehandlers.CoreRequestHandler, cfg) + server = CoreServer((host, port), CoreHandler, cfg) except: logger.exception("error starting main server on: %s:%s", host, port) sys.exit(1) @@ -55,38 +53,6 @@ def cored(cfg=None): server.serve_forever() -# TODO: should sessions and the main core daemon both catch exit to shutdown independently? -def cleanup(): - """ - Runs server shutdown and cleanup when catching an exit signal. - - :return: nothing - """ - while coreserver.CoreServer.servers: - server = coreserver.CoreServer.servers.pop() - server.shutdown() - - -def sighandler(signum, stackframe): - """ - Signal handler when different signals are sent. - - :param int signum: singal number sent - :param stackframe: stack frame sent - :return: nothing - """ - logger.error("terminated by signal: %s", signum) - sys.exit(signum) - - -signal.signal(signal.SIGHUP, sighandler) -signal.signal(signal.SIGINT, sighandler) -signal.signal(signal.SIGTERM, sighandler) -signal.signal(signal.SIGUSR1, sighandler) -signal.signal(signal.SIGUSR2, sighandler) -atexit.register(cleanup) - - def get_merged_config(filename): """ Return a configuration after merging config file and command-line arguments. diff --git a/daemon/scripts/core-future b/daemon/scripts/core-future deleted file mode 100644 index 815aa5bb..00000000 --- a/daemon/scripts/core-future +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/env python -""" -core-daemon: the CORE daemon is a server process that receives CORE API -messages and instantiates emulated nodes and networks within the kernel. Various -message handlers are defined and some support for sending messages. -""" - -import ConfigParser -import optparse -import sys -import time - -from core import constants -from core import enumerations -from core import logger -from core import services -from core.future.futurehandler import FutureHandler -from core.future.futureserver import FutureServer -from core.misc import nodeutils -from core.misc.utils import close_onexec -from core.service import ServiceManager - - -def banner(): - """ - Output the program banner printed to the terminal or log file. - - :return: nothing - """ - logger.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime()) - - -def cored(cfg=None): - """ - Start the CoreServer object and enter the server loop. - - :param dict cfg: core configuration - :return: nothing - """ - host = cfg["listenaddr"] - port = int(cfg["port"]) - if host == "" or host is None: - host = "localhost" - - try: - server = FutureServer((host, port), FutureHandler, cfg) - except: - logger.exception("error starting main server on: %s:%s", host, port) - sys.exit(1) - - close_onexec(server.fileno()) - logger.info("main server started, listening on: %s:%s", host, port) - server.serve_forever() - - -def get_merged_config(filename): - """ - Return a configuration after merging config file and command-line arguments. - - :param str filename: file name to merge configuration settings with - :return: merged configuration - :rtype: dict - """ - # these are the defaults used in the config file - defaults = { - "port": "%d" % enumerations.CORE_API_PORT, - "listenaddr": "localhost", - "xmlfilever": "1.0", - "numthreads": "1", - } - - usagestr = "usage: %prog [-h] [options] [args]\n\n" + \ - "CORE daemon v.%s instantiates Linux network namespace " \ - "nodes." % constants.COREDPY_VERSION - parser = optparse.OptionParser(usage=usagestr) - parser.add_option("-f", "--configfile", dest="configfile", type="string", - help="read config from specified file; default = %s" % filename) - parser.add_option("-p", "--port", dest="port", type=int, - help="port number to listen on; default = %s" % defaults["port"]) - parser.add_option("-t", "--numthreads", dest="numthreads", type=int, - help="number of server threads; default = %s" % defaults["numthreads"]) - - # parse command line options - options, args = parser.parse_args() - - # read the config file - if options.configfile is not None: - filename = options.configfile - del options.configfile - cfg = ConfigParser.SafeConfigParser(defaults) - cfg.read(filename) - - section = "core-daemon" - if not cfg.has_section(section): - cfg.add_section(section) - - # merge command line with config file - for opt in options.__dict__: - val = options.__dict__[opt] - if val is not None: - cfg.set(section, opt, val.__str__()) - - return dict(cfg.items(section)), args - - -def main(): - """ - Main program startup. - - :return: nothing - """ - # get a configuration merged from config file and command-line arguments - cfg, args = get_merged_config("%s/core.conf" % constants.CORE_CONF_DIR) - for a in args: - logger.error("ignoring command line argument: %s", a) - - # attempt load custom services - service_paths = cfg.get("custom_services_dir") - logger.debug("custom service paths: %s", service_paths) - if service_paths: - for service_path in service_paths.split(','): - service_path = service_path.strip() - ServiceManager.add_services(service_path) - - banner() - - try: - cored(cfg) - except KeyboardInterrupt: - logger.info("keyboard interrupt, stopping core daemon") - - sys.exit(0) - - -if __name__ == "__main__": - # configure nodes to use - if len(sys.argv) == 2 and sys.argv[1] == "ovs": - from core.netns.openvswitch import OVS_NODES - - nodeutils.update_node_map(OVS_NODES) - - # load default services - services.load() - - main() diff --git a/daemon/tests/conftest.py b/daemon/tests/conftest.py index 50b088d7..bf478d4a 100644 --- a/daemon/tests/conftest.py +++ b/daemon/tests/conftest.py @@ -13,8 +13,6 @@ from core.api.coreapi import CoreEventMessage from core.api.coreapi import CoreExecMessage from core.api.coreapi import CoreLinkMessage from core.api.coreapi import CoreNodeMessage -from core.corehandlers import CoreRequestHandler -from core.coreserver import CoreServer from core.enumerations import CORE_API_PORT from core.enumerations import ConfigTlvs from core.enumerations import EventTlvs @@ -25,10 +23,12 @@ from core.enumerations import LinkTypes from core.enumerations import MessageFlags from core.enumerations import NodeTlvs from core.enumerations import NodeTypes +from core.future.coreemu import CoreEmu +from core.future.futuredata import IpPrefixes +from core.legacy.corehandler import CoreHandler +from core.legacy.coreserver import CoreServer from core.misc import ipaddress from core.misc.ipaddress import MacAddress -from core.netns import nodes -from core.session import Session EMANE_SERVICES = "zebra|OSPFv3MDR|IPForward" @@ -133,110 +133,12 @@ def state_message(state): ]) -class Core(object): - def __init__(self, session, ip_prefix): - self.session = session - self.ip_prefix = ip_prefix - self.current_ip = 1 - self.nodes = {} - self.node_ips = {} - - def create_node(self, name, cls=nodes.CoreNode, objid=None, position=None, services=None, model="host"): - node = self.session.add_object(cls=cls, name=name, objid=objid) - node.type = model - if position: - node.setposition(*position) - if services: - self.session.services.addservicestonode(node, model, services) - self.nodes[name] = node - - def add_interface(self, network, name): - node_ip = self.ip_prefix.addr(self.current_ip) - self.current_ip += 1 - self.node_ips[name] = node_ip - node = self.nodes[name] - interface_id = node.newnetif(network, ["%s/%s" % (node_ip, self.ip_prefix.prefixlen)]) - return node.netif(interface_id) - - def get_node(self, name): - """ - Retrieve node from current session. - - :param str name: name of node to retrieve - :return: core node - :rtype: core.netns.nodes.CoreNode - """ - return self.nodes[name] - - def get_ip(self, name): - return self.node_ips[name] - - def link(self, network, from_interface, to_interface): - network.link(from_interface, to_interface) - - def configure_link(self, network, interface_one, interface_two, values, unidirectional=False): - network.linkconfig(netif=interface_one, netif2=interface_two, **values) - - if not unidirectional: - network.linkconfig(netif=interface_two, netif2=interface_one, **values) - - def ping(self, from_name, to_name): - from_node = self.nodes[from_name] - to_ip = str(self.get_ip(to_name)) - return from_node.cmd(["ping", "-c", "3", to_ip]) - - def ping_output(self, from_name, to_name): - from_node = self.nodes[from_name] - to_ip = str(self.get_ip(to_name)) - output = from_node.check_cmd(["ping", "-i", "0.05", "-c", "3", to_ip]) - return output - - def iperf(self, from_name, to_name): - from_node = self.nodes[from_name] - to_node = self.nodes[to_name] - to_ip = str(self.get_ip(to_name)) - - # run iperf server, run client, kill iperf server - vcmd, stdin, stdout, stderr = to_node.client.popen(["iperf", "-s", "-u", "-y", "C"]) - from_node.cmd(["iperf", "-u", "-t", "5", "-c", to_ip]) - to_node.cmd(["killall", "-9", "iperf"]) - - return stdout.read().strip() - - def assert_nodes(self): - for node in self.nodes.itervalues(): - assert os.path.exists(node.nodedir) - - def create_link_network(self): - # create switch - ptp_node = self.session.add_object(cls=nodes.PtpNet) - - # create nodes - self.create_node("n1") - self.create_node("n2") - - # add interfaces - interface_one = self.add_interface(ptp_node, "n1") - interface_two = self.add_interface(ptp_node, "n2") - - # instantiate session - self.session.instantiate() - - # assert node directories created - self.assert_nodes() - - return ptp_node, interface_one, interface_two - - def set_emane_model(self, emane_node, emane_model): - # set the emane model - values = emane_model.getdefaultvalues() - self.session.emane.setconfig(emane_node.objid, emane_model.name, values) - - class CoreServerTest(object): - def __init__(self): - address = ("localhost", CORE_API_PORT) - self.server = CoreServer(address, CoreRequestHandler, { + def __init__(self, port=CORE_API_PORT): + self.host = "localhost" + self.port = port + address = (self.host, self.port) + self.server = CoreServer(address, CoreHandler, { "numthreads": 1, "daemonize": False, }) @@ -246,29 +148,29 @@ class CoreServerTest(object): self.session = None self.request_handler = None - def setup(self, distributed_address): + def setup(self, distributed_address, port): # validate address assert distributed_address, "distributed server address was not provided" # create session - self.session = self.server.create_session(1) + self.session = self.server.coreemu.create_session(1) self.session.master = True # create request handler request_mock = MagicMock() request_mock.fileno = MagicMock(return_value=1) - self.request_handler = CoreRequestHandler(request_mock, "", self.server) + self.request_handler = CoreHandler(request_mock, "", self.server) self.request_handler.session = self.session self.request_handler.add_session_handlers() self.session.broker.session_clients.append(self.request_handler) # have broker handle a configuration state change - self.session.set_state(state=EventTypes.DEFINITION_STATE.value) + self.session.set_state(EventTypes.DEFINITION_STATE) message = state_message(EventTypes.CONFIGURATION_STATE) self.request_handler.handle_message(message) # add broker server for distributed core - distributed = "%s:%s:%s" % (self.distributed_server, distributed_address, CORE_API_PORT) + distributed = "%s:%s:%s" % (self.distributed_server, distributed_address, port) message = CoreConfMessage.create(0, [ (ConfigTlvs.OBJECT, "broker"), (ConfigTlvs.TYPE, 0), @@ -301,38 +203,24 @@ class CoreServerTest(object): self.server.server_close() -@pytest.fixture() +@pytest.fixture def session(): - # load default services - services.load() - - # create and return session - session_fixture = Session(1, persistent=True) - session_fixture.master = True + # use coreemu and create a session + coreemu = CoreEmu() + session_fixture = coreemu.create_session() + session_fixture.set_state(EventTypes.CONFIGURATION_STATE) assert os.path.exists(session_fixture.session_dir) - # set location - # session_fixture.master = True - session_fixture.location.setrefgeo(47.57917, -122.13232, 2.00000) - session_fixture.location.refscale = 150.0 - - # return session fixture + # return created session yield session_fixture - # cleanup - print "shutting down session" - session_fixture.shutdown() - assert not os.path.exists(session_fixture.session_dir) + # shutdown coreemu + coreemu.shutdown() @pytest.fixture(scope="module") -def ip_prefix(): - return ipaddress.Ipv4Prefix("10.83.0.0/16") - - -@pytest.fixture() -def core(session, ip_prefix): - return Core(session, ip_prefix) +def ip_prefixes(): + return IpPrefixes(ip4_prefix="10.83.0.0/16") @pytest.fixture() @@ -348,6 +236,11 @@ def cored(): server.shutdown() +def ping(from_node, to_node, ip_prefixes, count=3): + address = ip_prefixes.ip4_address(to_node) + return from_node.cmd(["ping", "-c", str(count), address]) + + def pytest_addoption(parser): parser.addoption("--distributed", help="distributed server address") diff --git a/daemon/tests/test_core.py b/daemon/tests/test_core.py index 2340f2fb..6a0f50d5 100644 --- a/daemon/tests/test_core.py +++ b/daemon/tests/test_core.py @@ -11,22 +11,22 @@ from xml.etree import ElementTree import pytest from mock import MagicMock -from conftest import EMANE_SERVICES from core.data import ConfigData -from core.enumerations import MessageFlags +from core.enumerations import MessageFlags, NodeTypes +from core.future.futuredata import NodeOptions from core.mobility import BasicRangeModel -from core.netns import nodes -from core.netns import vnodeclient from core.netns.vnodeclient import VnodeClient -from core.phys.pnodes import PhysicalNode from core.service import ServiceManager -from core.xml import xmlsession _PATH = os.path.abspath(os.path.dirname(__file__)) _SERVICES_PATH = os.path.join(_PATH, "myservices") _MOBILITY_FILE = os.path.join(_PATH, "mobility.scen") _XML_VERSIONS = ["0.0", "1.0"] -_NODE_CLASSES = [nodes.PtpNet, nodes.HubNode, nodes.SwitchNode] +_WIRED = [ + NodeTypes.PEER_TO_PEER, + NodeTypes.HUB, + NodeTypes.SWITCH +] def createclients(sessiondir, clientcls=VnodeClient, cmdchnlfilterfunc=None): @@ -47,8 +47,13 @@ def createclients(sessiondir, clientcls=VnodeClient, cmdchnlfilterfunc=None): return map(lambda x: clientcls(os.path.basename(x), x), cmdchnls) +def ping(from_node, to_node, ip_prefixes): + address = ip_prefixes.ip4_address(to_node) + return from_node.cmd(["ping", "-c", "3", address]) + + class TestCore: - def test_import_service(self, core): + def test_import_service(self): """ Test importing a custom service. @@ -58,118 +63,115 @@ class TestCore: assert ServiceManager.get("MyService") assert ServiceManager.get("MyService2") - @pytest.mark.parametrize("cls", _NODE_CLASSES) - def test_nodes(self, core, cls): + @pytest.mark.parametrize("net_type", _WIRED) + def test_wired_ping(self, session, net_type, ip_prefixes): """ Test ptp node network. - :param conftest.Core core: core fixture to test with - :param cls: node classes that work within a simple network + :param session: session for test + :param core.enumerations.NodeTypes net_type: type of net node to create + :param ip_prefixes: generates ip addresses for nodes """ - # create ptp - network_node = core.session.add_object(cls=cls) + # create net node + net_node = session.add_node(_type=net_type) # create nodes - core.create_node("n1") - core.create_node("n2") + node_one = session.add_node() + node_two = session.add_node() - # add interfaces - core.add_interface(network_node, "n1") - core.add_interface(network_node, "n2") + # link nodes to net node + for node in [node_one, node_two]: + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, net_node.objid, interface_one=interface) # instantiate session - core.session.instantiate() - - # assert node directories created - core.assert_nodes() + session.instantiate() # ping n2 from n1 and assert success - status = core.ping("n1", "n2") + status = ping(node_one, node_two, ip_prefixes) assert not status @pytest.mark.parametrize("version", _XML_VERSIONS) - def test_xml(self, core, tmpdir, version): + def test_xml(self, session, tmpdir, version, ip_prefixes): """ Test xml client methods. - :param conftest.Core core: core fixture to test with + :param session: session for test + :param tmpdir: tmpdir to create data in :param str version: xml version to write and parse + :param ip_prefixes: generates ip addresses for nodes """ # create ptp - ptp_node = core.session.add_object(cls=nodes.PtpNet) + ptp_node = session.add_node(_type=NodeTypes.PEER_TO_PEER) # create nodes - core.create_node("n1") - core.create_node("n2") + node_one = session.add_node() + node_two = session.add_node() - # add interfaces - core.add_interface(ptp_node, "n1") - core.add_interface(ptp_node, "n2") + # link nodes to ptp net + for node in [node_one, node_two]: + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, ptp_node.objid, interface_one=interface) # instantiate session - core.session.instantiate() - - # assert node directories created - core.assert_nodes() + session.instantiate() # get ids for nodes - n1_id = core.get_node("n1").objid - n2_id = core.get_node("n2").objid + n1_id = node_one.objid + n2_id = node_two.objid # save xml xml_file = tmpdir.join("session.xml") file_path = xml_file.strpath - xmlsession.save_session_xml(core.session, file_path, version) + session.save_xml(file_path, version) # verify xml file was created and can be parsed assert xml_file.isfile() assert ElementTree.parse(file_path) # stop current session, clearing data - core.session.shutdown() + session.shutdown() # verify nodes have been removed from session with pytest.raises(KeyError): - assert not core.session.get_object_by_name(n1_id) + assert not session.get_object(n1_id) with pytest.raises(KeyError): - assert not core.session.get_object(n2_id) + assert not session.get_object(n2_id) # load saved xml - xmlsession.open_session_xml(core.session, file_path, start=True) + session.open_xml(file_path, start=True) # verify nodes have been recreated - assert core.session.get_object(n1_id) - assert core.session.get_object(n2_id) + assert session.get_object(n1_id) + assert session.get_object(n2_id) - def test_vnode_client(self, core): + def test_vnode_client(self, session, ip_prefixes): """ Test vnode client methods. - :param conftest.Core core: core fixture to test with + :param session: session for test + :param ip_prefixes: generates ip addresses for nodes """ # create ptp - ptp_node = core.session.add_object(cls=nodes.PtpNet) + ptp_node = session.add_node(_type=NodeTypes.PEER_TO_PEER) # create nodes - core.create_node("n1") - core.create_node("n2") + node_one = session.add_node() + node_two = session.add_node() - # add interfaces - core.add_interface(ptp_node, "n1") - core.add_interface(ptp_node, "n2") + # link nodes to ptp net + for node in [node_one, node_two]: + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, ptp_node.objid, interface_one=interface) # get node client for testing - n1 = core.get_node("n1") - client = n1.client + client = node_one.client # instantiate session - core.session.instantiate() - - # assert node directories created - core.assert_nodes() + session.instantiate() # check we are connected assert client.connected() @@ -195,183 +197,154 @@ class TestCore: assert not client.shcmd(command[0]) # check module methods - assert createclients(core.session.session_dir) + assert createclients(session.session_dir) # check convenience methods for interface information assert client.getaddr("eth0") assert client.netifstats() - def test_netif(self, core): + def test_netif(self, session, ip_prefixes): """ Test netif methods. - :param conftest.Core core: core fixture to test with + :param session: session for test + :param ip_prefixes: generates ip addresses for nodes """ # create ptp - ptp_node = core.session.add_object(cls=nodes.PtpNet) + ptp_node = session.add_node(_type=NodeTypes.PEER_TO_PEER) # create nodes - core.create_node("n1") - core.create_node("n2") + node_one = session.add_node() + node_two = session.add_node() - # add interfaces - n1_interface = core.add_interface(ptp_node, "n1") - n2_interface = core.add_interface(ptp_node, "n2") - - # get nodes - n1 = core.get_node("n1") - n2 = core.get_node("n2") + # link nodes to ptp net + for node in [node_one, node_two]: + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, ptp_node.objid, interface_one=interface) # instantiate session - core.session.instantiate() - - # assert node directories created - core.assert_nodes() + session.instantiate() # check link data gets generated assert ptp_node.all_link_data(MessageFlags.ADD.value) # check common nets exist between linked nodes - assert n1.commonnets(n2) - assert n2.commonnets(n1) + assert node_one.commonnets(node_two) + assert node_two.commonnets(node_one) # check we can retrieve netif index - assert n1.getifindex(n1_interface) == 0 - assert n2.getifindex(n2_interface) == 0 + assert node_one.getifindex(0) + assert node_two.getifindex(0) # check interface parameters - n1_interface.setparam("test", 1) - assert n1_interface.getparam("test") == 1 - assert n1_interface.getparams() + interface = node_one.netif(0) + interface.setparam("test", 1) + assert interface.getparam("test") == 1 + assert interface.getparams() # delete netif and test that if no longer exists - n1.delnetif(0) - assert not n1.netif(0) + node_one.delnetif(0) + assert not node_one.netif(0) - def test_physical(self, core): - """ - Test physical node network. - - :param conftest.Core core: core fixture to test with - """ - - # create switch node - switch_node = core.session.add_object(cls=nodes.SwitchNode) - - # create a physical node - core.create_node(cls=PhysicalNode, name="p1") - - # mock method that will not work - physical_node = core.get_node("p1") - physical_node.newnetif = MagicMock(return_value=0) - - # create regular node - core.create_node("n1") - - # add interface - core.add_interface(switch_node, "n1") - core.add_interface(switch_node, "p1") - - # instantiate session - core.session.instantiate() - - # assert node directories created - core.assert_nodes() - - def test_wlan_basic_range_good(self, core): + def test_wlan_good(self, session, ip_prefixes): """ Test basic wlan network. - :param conftest.Core core: core fixture to test with + :param core.future.coreemu.FutureSession session: session for test + :param ip_prefixes: generates ip addresses for nodes """ # create wlan - wlan_node = core.session.add_object(cls=nodes.WlanNode) - values = BasicRangeModel.getdefaultvalues() - wlan_node.setmodel(BasicRangeModel, values) + wlan_node = session.add_node(_type=NodeTypes.WIRELESS_LAN) + session.set_wireless_model(wlan_node, BasicRangeModel) # create nodes - core.create_node("n1", position=(0, 0), services=EMANE_SERVICES, model="mdr") - core.create_node("n2", position=(0, 0), services=EMANE_SERVICES, model="mdr") + node_options = NodeOptions() + node_options.set_position(0, 0) + node_one = session.create_wireless_node(node_options=node_options) + node_two = session.create_wireless_node(node_options=node_options) - # add interfaces - interface_one = core.add_interface(wlan_node, "n1") - interface_two = core.add_interface(wlan_node, "n2") + # link nodes + for node in [node_one, node_two]: + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, wlan_node.objid, interface_one=interface) # link nodes in wlan - core.link(wlan_node, interface_one, interface_two) + session.wireless_link_all(wlan_node, [node_one, node_two]) # instantiate session - core.session.instantiate() - - # assert node directories created - core.assert_nodes() + session.instantiate() # ping n2 from n1 and assert success - status = core.ping("n1", "n2") + status = ping(node_one, node_two, ip_prefixes) assert not status - def test_wlan_basic_range_bad(self, core): + def test_wlan_bad(self, session, ip_prefixes): """ Test basic wlan network with leveraging basic range model. - :param conftest.Core core: core fixture to test with + :param core.future.coreemu.FutureSession session: session for test + :param ip_prefixes: generates ip addresses for nodes """ # create wlan - wlan_node = core.session.add_object(cls=nodes.WlanNode) - values = BasicRangeModel.getdefaultvalues() - wlan_node.setmodel(BasicRangeModel, values) + wlan_node = session.add_node(_type=NodeTypes.WIRELESS_LAN) + session.set_wireless_model(wlan_node, BasicRangeModel) # create nodes - core.create_node("n1", position=(0, 0), services=EMANE_SERVICES, model="mdr") - core.create_node("n2", position=(0, 0), services=EMANE_SERVICES, model="mdr") + node_options = NodeOptions() + node_options.set_position(0, 0) + node_one = session.create_wireless_node(node_options=node_options) + node_two = session.create_wireless_node(node_options=node_options) - # add interfaces - interface_one = core.add_interface(wlan_node, "n1") - interface_two = core.add_interface(wlan_node, "n2") + # link nodes + for node in [node_one, node_two]: + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, wlan_node.objid, interface_one=interface) # link nodes in wlan - core.link(wlan_node, interface_one, interface_two) - - # move nodes out of range, default range check is 275 - core.get_node("n1").setposition(0, 0) - core.get_node("n2").setposition(500, 500) + session.wireless_link_all(wlan_node, [node_one, node_two]) # instantiate session - core.session.instantiate() + session.instantiate() - # assert node directories created - core.assert_nodes() + # move node two out of range, default range check is 275 + time.sleep(5) + update_options = NodeOptions() + update_options.set_position(500, 500) + session.update_node(node_two.objid, update_options) - # ping n2 from n1 and assert failure ) - time.sleep(3) - status = core.ping("n1", "n2") + # ping n2 from n1 and assert failure + time.sleep(5) + status = ping(node_one, node_two, ip_prefixes) assert status - def test_mobility(self, core): + def test_mobility(self, session, ip_prefixes): """ Test basic wlan network. - :param conftest.Core core: core fixture to test with + :param core.future.coreemu.FutureSession session: session for test + :param ip_prefixes: generates ip addresses for nodes """ # create wlan - wlan_node = core.session.add_object(cls=nodes.WlanNode) - values = BasicRangeModel.getdefaultvalues() - wlan_node.setmodel(BasicRangeModel, values) + wlan_node = session.add_node(_type=NodeTypes.WIRELESS_LAN) + session.set_wireless_model(wlan_node, BasicRangeModel) # create nodes - core.create_node("n1", objid=1, position=(0, 0), services=EMANE_SERVICES, model="mdr") - core.create_node("n2", objid=2, position=(0, 0), services=EMANE_SERVICES, model="mdr") + node_options = NodeOptions() + node_options.set_position(0, 0) + node_one = session.create_wireless_node(node_options=node_options) + node_two = session.create_wireless_node(node_options=node_options) - # add interfaces - interface_one = core.add_interface(wlan_node, "n1") - interface_two = core.add_interface(wlan_node, "n2") + # link nodes + for node in [node_one, node_two]: + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, wlan_node.objid, interface_one=interface) # link nodes in wlan - core.link(wlan_node, interface_one, interface_two) + session.wireless_link_all(wlan_node, [node_one, node_two]) # configure mobility script for session config = ConfigData( @@ -382,7 +355,7 @@ class TestCore: data_values="file=%s|refresh_ms=50|loop=1|autostart=0.0|" "map=|script_start=|script_pause=|script_stop=" % _MOBILITY_FILE ) - core.session.config_object(config) + session.config_object(config) # add handler for receiving node updates event = threading.Event() @@ -390,138 +363,10 @@ class TestCore: def node_update(_): event.set() - core.session.node_handlers.append(node_update) + session.node_handlers.append(node_update) # instantiate session - core.session.instantiate() - - # assert node directories created - core.assert_nodes() + session.instantiate() # validate we receive a node message for updating its location assert event.wait(5) - - def test_link_bandwidth(self, core): - """ - Test ptp node network with modifying link bandwidth. - - :param conftest.Core core: core fixture to test with - """ - - # create link network - ptp_node, interface_one, interface_two = core.create_link_network() - - # output csv index - bandwidth_index = 8 - - # run iperf, validate normal bandwidth - stdout = core.iperf("n1", "n2") - assert stdout - value = int(stdout.split(',')[bandwidth_index]) - assert 900000 <= value <= 1100000 - - # change bandwidth in bits per second - bandwidth = 500000 - core.configure_link(ptp_node, interface_one, interface_two, { - "bw": bandwidth - }) - - # run iperf again - stdout = core.iperf("n1", "n2") - assert stdout - value = int(stdout.split(',')[bandwidth_index]) - assert 400000 <= value <= 600000 - - def test_link_loss(self, core): - """ - Test ptp node network with modifying link packet loss. - - :param conftest.Core core: core fixture to test with - """ - - # create link network - ptp_node, interface_one, interface_two = core.create_link_network() - - # output csv index - loss_index = -2 - - # run iperf, validate normal bandwidth - stdout = core.iperf("n1", "n2") - assert stdout - value = float(stdout.split(',')[loss_index]) - assert 0 <= value <= 0.5 - - # change bandwidth in bits per second - loss = 50 - core.configure_link(ptp_node, interface_one, interface_two, { - "loss": loss - }) - - # run iperf again - stdout = core.iperf("n1", "n2") - assert stdout - value = float(stdout.split(',')[loss_index]) - assert 40 <= value <= 60 - - def test_link_delay(self, core): - """ - Test ptp node network with modifying link packet delay. - - :param conftest.Core core: core fixture to test with - """ - - # create link network - ptp_node, interface_one, interface_two = core.create_link_network() - - # run ping for delay information - stdout = core.ping_output("n1", "n2") - assert stdout - rtt_line = stdout.split("\n")[-1] - rtt_values = rtt_line.split("=")[1].split("ms")[0].strip() - rtt_avg = float(rtt_values.split("/")[2]) - assert 0 <= rtt_avg <= 0.2 - - # change delay in microseconds - delay = 1000000 - core.configure_link(ptp_node, interface_one, interface_two, { - "delay": delay - }) - - # run ping for delay information again - stdout = core.ping_output("n1", "n2") - assert stdout - rtt_line = stdout.split("\n")[-1] - rtt_values = rtt_line.split("=")[1].split("ms")[0].strip() - rtt_avg = float(rtt_values.split("/")[2]) - assert 1800 <= rtt_avg <= 2200 - - def test_link_jitter(self, core): - """ - Test ptp node network with modifying link packet jitter. - - :param conftest.Core core: core fixture to test with - """ - - # create link network - ptp_node, interface_one, interface_two = core.create_link_network() - - # output csv index - jitter_index = 9 - - # run iperf - stdout = core.iperf("n1", "n2") - assert stdout - value = float(stdout.split(",")[jitter_index]) - assert -0.5 <= value <= 0.05 - - # change jitter in microseconds - jitter = 1000000 - core.configure_link(ptp_node, interface_one, interface_two, { - "jitter": jitter - }) - - # run iperf again - stdout = core.iperf("n1", "n2") - assert stdout - value = float(stdout.split(",")[jitter_index]) - assert 200 <= value <= 500 diff --git a/daemon/tests/test_emane.py b/daemon/tests/test_emane.py index 6c7ade45..c6644cf9 100644 --- a/daemon/tests/test_emane.py +++ b/daemon/tests/test_emane.py @@ -4,16 +4,13 @@ Unit tests for testing CORE EMANE networks. import pytest -from conftest import EMANE_SERVICES - -from core.data import ConfigData +from conftest import ping from core.emane.bypass import EmaneBypassModel from core.emane.commeffect import EmaneCommEffectModel from core.emane.ieee80211abg import EmaneIeee80211abgModel -from core.emane.nodes import EmaneNode from core.emane.rfpipe import EmaneRfPipeModel from core.emane.tdma import EmaneTdmaModel - +from core.future.futuredata import NodeOptions _EMANE_MODELS = [ EmaneIeee80211abgModel, @@ -26,36 +23,37 @@ _EMANE_MODELS = [ class TestEmane: @pytest.mark.parametrize("model", _EMANE_MODELS) - def test_models(self, core, model): + def test_models(self, session, model, ip_prefixes): """ Test emane models within a basic network. - :param conftest.Core core: core fixture to test with + :param core.future.coreemu.FutureSession session: session for test :param model: emane model to test - :param func setup: setup function to configure emane node + :param ip_prefixes: generates ip addresses for nodes """ # create emane node for networking the core nodes - emane_node = core.session.add_object(name="emane", cls=EmaneNode) - emane_node.setposition(x=80, y=50) - - # set the emane model - core.set_emane_model(emane_node, model) + emane_network = session.create_emane_network( + model, + geo_reference=(47.57917, -122.13232, 2.00000) + ) + emane_network.setposition(x=80, y=50) # create nodes - core.create_node("n1", objid=1, position=(150, 150), services=EMANE_SERVICES, model="mdr") - core.create_node("n2", objid=2, position=(300, 150), services=EMANE_SERVICES, model="mdr") + node_options = NodeOptions() + node_options.set_position(150, 150) + node_one = session.create_wireless_node(node_options=node_options) + node_options.set_position(300, 150) + node_two = session.create_wireless_node(node_options=node_options) - # add interfaces to nodes - core.add_interface(emane_node, "n1") - core.add_interface(emane_node, "n2") + for i, node in enumerate([node_one, node_two]): + node.setposition(x=150 * (i + 1), y=150) + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, emane_network.objid, interface_one=interface) # instantiate session - core.session.instantiate() - - # assert node directories created - core.assert_nodes() + session.instantiate() # ping n2 from n1 and assert success - status = core.ping("n1", "n2") + status = ping(node_one, node_two, ip_prefixes, count=5) assert not status diff --git a/daemon/tests/test_future.py b/daemon/tests/test_future.py deleted file mode 100644 index eae42799..00000000 --- a/daemon/tests/test_future.py +++ /dev/null @@ -1,201 +0,0 @@ -import os -import time - -import pytest - -from core.enumerations import NodeTypes, EventTypes -from core.future.coreemu import CoreEmu -from core.future.futuredata import IpPrefixes, NodeOptions, LinkOptions -from core.misc import utils - - -@pytest.fixture -def future_session(): - # use coreemu and create a session - coreemu = CoreEmu() - session = coreemu.create_session() - session.set_state(EventTypes.CONFIGURATION_STATE.value) - - # return created session - yield session - - # shutdown coreemu - coreemu.shutdown() - - -IP4_PREFIX = "10.83.0.0/16" - -MODELS = [ - "router", - "host", - "PC", - "mdr", -] - -NET_TYPES = [ - NodeTypes.SWITCH, - NodeTypes.HUB, - NodeTypes.WIRELESS_LAN -] - - -class TestFuture: - @pytest.mark.parametrize("model", MODELS) - def test_node_add(self, future_session, model): - # given - node_options = NodeOptions(model=model) - - # when - node = future_session.add_node(node_options=node_options) - - # give time for node services to boot - time.sleep(1) - - # then - assert node - assert os.path.exists(node.nodedir) - assert node.alive() - assert node.up - assert node.check_cmd(["ip", "addr", "show", "lo"]) - node.validate() - - def test_node_update(self, future_session): - # given - node = future_session.add_node() - position_value = 100 - update_options = NodeOptions() - update_options.set_position(x=position_value, y=position_value) - - # when - future_session.update_node(node.objid, update_options) - - # then - assert node.position.x == position_value - assert node.position.y == position_value - - def test_node_delete(self, future_session): - # given - node = future_session.add_node() - - # when - future_session.delete_node(node.objid) - - # then - with pytest.raises(KeyError): - future_session.get_object(node.objid) - - @pytest.mark.parametrize("net_type", NET_TYPES) - def test_net(self, future_session, net_type): - # given - - # when - node = future_session.add_node(_type=net_type) - - # then - assert node - assert node.up - assert utils.check_cmd(["brctl", "show", node.brname]) - - def test_ptp(self, future_session): - # given - prefixes = IpPrefixes(ip4_prefix=IP4_PREFIX) - node_one = future_session.add_node() - node_two = future_session.add_node() - interface_one = prefixes.create_interface(node_one) - inteface_two = prefixes.create_interface(node_two) - - # when - future_session.add_link(node_one.objid, node_two.objid, interface_one, inteface_two) - - # then - assert node_one.netif(interface_one.id) - assert node_two.netif(inteface_two.id) - - def test_node_to_net(self, future_session): - # given - prefixes = IpPrefixes(ip4_prefix=IP4_PREFIX) - node_one = future_session.add_node() - node_two = future_session.add_node(_type=NodeTypes.SWITCH) - interface_one = prefixes.create_interface(node_one) - - # when - future_session.add_link(node_one.objid, node_two.objid, interface_one) - - # then - assert node_two.all_link_data(0) - assert node_one.netif(interface_one.id) - - def test_net_to_node(self, future_session): - # given - prefixes = IpPrefixes(ip4_prefix=IP4_PREFIX) - node_one = future_session.add_node(_type=NodeTypes.SWITCH) - node_two = future_session.add_node() - interface_two = prefixes.create_interface(node_two) - - # when - future_session.add_link(node_one.objid, node_two.objid, interface_two=interface_two) - - # then - assert node_one.all_link_data(0) - assert node_two.netif(interface_two.id) - - def test_net_to_net(self, future_session): - # given - node_one = future_session.add_node(_type=NodeTypes.SWITCH) - node_two = future_session.add_node(_type=NodeTypes.SWITCH) - - # when - future_session.add_link(node_one.objid, node_two.objid) - - # then - assert node_one.all_link_data(0) - - def test_link_update(self, future_session): - # given - prefixes = IpPrefixes(ip4_prefix=IP4_PREFIX) - node_one = future_session.add_node() - node_two = future_session.add_node(_type=NodeTypes.SWITCH) - interface_one = prefixes.create_interface(node_one) - future_session.add_link(node_one.objid, node_two.objid, interface_one) - interface = node_one.netif(interface_one.id) - output = utils.check_cmd(["tc", "qdisc", "show", "dev", interface.localname]) - assert "delay" not in output - assert "rate" not in output - assert "loss" not in output - assert "duplicate" not in output - - # when - link_options = LinkOptions() - link_options.delay = 50 - link_options.bandwidth = 5000000 - link_options.per = 25 - link_options.dup = 25 - future_session.update_link(node_one.objid, node_two.objid, - interface_one_id=interface_one.id, link_options=link_options) - - # then - output = utils.check_cmd(["tc", "qdisc", "show", "dev", interface.localname]) - assert "delay" in output - assert "rate" in output - assert "loss" in output - assert "duplicate" in output - - def test_link_delete(self, future_session): - # given - prefixes = IpPrefixes(ip4_prefix=IP4_PREFIX) - node_one = future_session.add_node() - node_two = future_session.add_node() - interface_one = prefixes.create_interface(node_one) - interface_two = prefixes.create_interface(node_two) - future_session.add_link(node_one.objid, node_two.objid, interface_one, interface_two) - assert node_one.netif(interface_one.id) - assert node_two.netif(interface_two.id) - assert future_session.get_node_count() == 3 - - # when - future_session.delete_link(node_one.objid, node_two.objid, interface_one.id, interface_two.id) - - # then - assert not node_one.netif(interface_one.id) - assert not node_two.netif(interface_two.id) - assert future_session.get_node_count() == 2 diff --git a/daemon/tests/test_gui.py b/daemon/tests/test_gui.py index 5fa0f7ed..77c8b9fe 100644 --- a/daemon/tests/test_gui.py +++ b/daemon/tests/test_gui.py @@ -128,7 +128,7 @@ class TestGui: core.session.broker.dorecvloop = False # have broker handle a configuration state change - core.session.set_state(EventTypes.CONFIGURATION_STATE.value) + core.session.set_state(EventTypes.CONFIGURATION_STATE) event_message = state_message(EventTypes.CONFIGURATION_STATE) core.session.broker.handlerawmsg(event_message) diff --git a/daemon/tests/test_links.py b/daemon/tests/test_links.py new file mode 100644 index 00000000..4b5085e4 --- /dev/null +++ b/daemon/tests/test_links.py @@ -0,0 +1,260 @@ +from core.enumerations import NodeTypes +from core.future.futuredata import LinkOptions +from core.misc import utils + + +def create_ptp_network(session, ip_prefixes): + # create nodes + node_one = session.add_node() + node_two = session.add_node() + + # link nodes to net node + interface_one = ip_prefixes.create_interface(node_one) + interface_two = ip_prefixes.create_interface(node_two) + session.add_link(node_one.objid, node_two.objid, interface_one, interface_two) + + # instantiate session + session.instantiate() + + return node_one, node_two + + +def ping_output(from_node, to_node, ip_prefixes): + address = ip_prefixes.ip4_address(to_node) + output = from_node.check_cmd(["ping", "-i", "0.05", "-c", "3", address]) + return output + + +def iperf(from_node, to_node, ip_prefixes): + # run iperf server, run client, kill iperf server + address = ip_prefixes.ip4_address(to_node) + vcmd, stdin, stdout, stderr = to_node.client.popen(["iperf", "-s", "-u", "-y", "C"]) + from_node.cmd(["iperf", "-u", "-t", "5", "-c", address]) + to_node.cmd(["killall", "-9", "iperf"]) + return stdout.read().strip() + + +class TestLinks: + def test_ptp(self, session, ip_prefixes): + # given + node_one = session.add_node() + node_two = session.add_node() + interface_one = ip_prefixes.create_interface(node_one) + inteface_two = ip_prefixes.create_interface(node_two) + + # when + session.add_link(node_one.objid, node_two.objid, interface_one, inteface_two) + + # then + assert node_one.netif(interface_one.id) + assert node_two.netif(inteface_two.id) + + def test_node_to_net(self, session, ip_prefixes): + # given + node_one = session.add_node() + node_two = session.add_node(_type=NodeTypes.SWITCH) + interface_one = ip_prefixes.create_interface(node_one) + + # when + session.add_link(node_one.objid, node_two.objid, interface_one) + + # then + assert node_two.all_link_data(0) + assert node_one.netif(interface_one.id) + + def test_net_to_node(self, session, ip_prefixes): + # given + node_one = session.add_node(_type=NodeTypes.SWITCH) + node_two = session.add_node() + interface_two = ip_prefixes.create_interface(node_two) + + # when + session.add_link(node_one.objid, node_two.objid, interface_two=interface_two) + + # then + assert node_one.all_link_data(0) + assert node_two.netif(interface_two.id) + + def test_net_to_net(self, session): + # given + node_one = session.add_node(_type=NodeTypes.SWITCH) + node_two = session.add_node(_type=NodeTypes.SWITCH) + + # when + session.add_link(node_one.objid, node_two.objid) + + # then + assert node_one.all_link_data(0) + + def test_link_update(self, session, ip_prefixes): + # given + node_one = session.add_node() + node_two = session.add_node(_type=NodeTypes.SWITCH) + interface_one = ip_prefixes.create_interface(node_one) + session.add_link(node_one.objid, node_two.objid, interface_one) + interface = node_one.netif(interface_one.id) + output = utils.check_cmd(["tc", "qdisc", "show", "dev", interface.localname]) + assert "delay" not in output + assert "rate" not in output + assert "loss" not in output + assert "duplicate" not in output + + # when + link_options = LinkOptions() + link_options.delay = 50 + link_options.bandwidth = 5000000 + link_options.per = 25 + link_options.dup = 25 + session.update_link(node_one.objid, node_two.objid, + interface_one_id=interface_one.id, link_options=link_options) + + # then + output = utils.check_cmd(["tc", "qdisc", "show", "dev", interface.localname]) + assert "delay" in output + assert "rate" in output + assert "loss" in output + assert "duplicate" in output + + def test_link_delete(self, session, ip_prefixes): + # given + node_one = session.add_node() + node_two = session.add_node() + interface_one = ip_prefixes.create_interface(node_one) + interface_two = ip_prefixes.create_interface(node_two) + session.add_link(node_one.objid, node_two.objid, interface_one, interface_two) + assert node_one.netif(interface_one.id) + assert node_two.netif(interface_two.id) + assert session.get_node_count() == 3 + + # when + session.delete_link(node_one.objid, node_two.objid, interface_one.id, interface_two.id) + + # then + assert not node_one.netif(interface_one.id) + assert not node_two.netif(interface_two.id) + assert session.get_node_count() == 2 + + def test_link_bandwidth(self, session, ip_prefixes): + """ + Test ptp node network with modifying link bandwidth. + + :param core.future.coreemu.FutureSession session: session for test + :param ip_prefixes: generates ip addresses for nodes + """ + + # create link network + node_one, node_two = create_ptp_network(session, ip_prefixes) + + # output csv index + bandwidth_index = 8 + + # run iperf, validate normal bandwidth + stdout = iperf(node_one, node_two, ip_prefixes) + assert stdout + value = int(stdout.split(',')[bandwidth_index]) + assert 900000 <= value <= 1100000 + + # change bandwidth in bits per second + link_options = LinkOptions() + link_options.bandwidth = 500000 + session.update_link(node_one.objid, node_two.objid, link_options=link_options) + + # run iperf again + stdout = iperf(node_one, node_two, ip_prefixes) + assert stdout + value = int(stdout.split(',')[bandwidth_index]) + assert 400000 <= value <= 600000 + + def test_link_loss(self, session, ip_prefixes): + """ + Test ptp node network with modifying link packet loss. + + :param core.future.coreemu.FutureSession session: session for test + :param ip_prefixes: generates ip addresses for nodes + """ + + # create link network + node_one, node_two = create_ptp_network(session, ip_prefixes) + + # output csv index + loss_index = -2 + + # run iperf, validate normal bandwidth + stdout = iperf(node_one, node_two, ip_prefixes) + assert stdout + value = float(stdout.split(',')[loss_index]) + assert 0 <= value <= 0.5 + + # change bandwidth in bits per second + link_options = LinkOptions() + link_options.per = 50 + session.update_link(node_one.objid, node_two.objid, link_options=link_options) + + # run iperf again + stdout = iperf(node_one, node_two, ip_prefixes) + assert stdout + value = float(stdout.split(',')[loss_index]) + assert 40 <= value <= 60 + + def test_link_delay(self, session, ip_prefixes): + """ + Test ptp node network with modifying link packet delay. + + :param core.future.coreemu.FutureSession session: session for test + :param ip_prefixes: generates ip addresses for nodes + """ + + # create link network + node_one, node_two = create_ptp_network(session, ip_prefixes) + + # run ping for delay information + stdout = ping_output(node_one, node_two, ip_prefixes) + assert stdout + rtt_line = stdout.split("\n")[-1] + rtt_values = rtt_line.split("=")[1].split("ms")[0].strip() + rtt_avg = float(rtt_values.split("/")[2]) + assert 0 <= rtt_avg <= 0.2 + + # change delay in microseconds + link_options = LinkOptions() + link_options.delay = 1000000 + session.update_link(node_one.objid, node_two.objid, link_options=link_options) + + # run ping for delay information again + stdout = ping_output(node_one, node_two, ip_prefixes) + assert stdout + rtt_line = stdout.split("\n")[-1] + rtt_values = rtt_line.split("=")[1].split("ms")[0].strip() + rtt_avg = float(rtt_values.split("/")[2]) + assert 1800 <= rtt_avg <= 2200 + + def test_link_jitter(self, session, ip_prefixes): + """ + Test ptp node network with modifying link packet jitter. + + :param core.future.coreemu.FutureSession session: session for test + :param ip_prefixes: generates ip addresses for nodes + """ + + # create link network + node_one, node_two = create_ptp_network(session, ip_prefixes) + + # output csv index + jitter_index = 9 + + # run iperf + stdout = iperf(node_one, node_two, ip_prefixes) + assert stdout + value = float(stdout.split(",")[jitter_index]) + assert -0.5 <= value <= 0.05 + + # change jitter in microseconds + link_options = LinkOptions() + link_options.jitter = 1000000 + session.update_link(node_one.objid, node_two.objid, link_options=link_options) + + # run iperf again + stdout = iperf(node_one, node_two, ip_prefixes) + assert stdout + value = float(stdout.split(",")[jitter_index]) + assert 200 <= value <= 500 diff --git a/daemon/tests/test_nodes.py b/daemon/tests/test_nodes.py new file mode 100644 index 00000000..94a757a5 --- /dev/null +++ b/daemon/tests/test_nodes.py @@ -0,0 +1,79 @@ +import os +import time + +import pytest + +from core.enumerations import NodeTypes +from core.future.futuredata import NodeOptions +from core.misc import utils + +MODELS = [ + "router", + "host", + "PC", + "mdr", +] + +NET_TYPES = [ + NodeTypes.SWITCH, + NodeTypes.HUB, + NodeTypes.WIRELESS_LAN +] + + +class TestNodes: + @pytest.mark.parametrize("model", MODELS) + def test_node_add(self, session, model): + # given + node_options = NodeOptions(model=model) + + # when + node = session.add_node(node_options=node_options) + + # give time for node services to boot + time.sleep(1) + + # then + assert node + assert os.path.exists(node.nodedir) + assert node.alive() + assert node.up + assert node.check_cmd(["ip", "addr", "show", "lo"]) + node.validate() + + def test_node_update(self, session): + # given + node = session.add_node() + position_value = 100 + update_options = NodeOptions() + update_options.set_position(x=position_value, y=position_value) + + # when + session.update_node(node.objid, update_options) + + # then + assert node.position.x == position_value + assert node.position.y == position_value + + def test_node_delete(self, session): + # given + node = session.add_node() + + # when + session.delete_node(node.objid) + + # then + with pytest.raises(KeyError): + session.get_object(node.objid) + + @pytest.mark.parametrize("net_type", NET_TYPES) + def test_net(self, session, net_type): + # given + + # when + node = session.add_node(_type=net_type) + + # then + assert node + assert node.up + assert utils.check_cmd(["brctl", "show", node.brname]) diff --git a/ns3/corens3/obj.py b/ns3/corens3/obj.py index e076e8f4..229288d2 100644 --- a/ns3/corens3/obj.py +++ b/ns3/corens3/obj.py @@ -386,7 +386,7 @@ class Ns3Session(Session): ns.core.Simulator.Run() # self.evq.run() # event queue may have WayPointMobility events - self.set_state(EventTypes.RUNTIME_STATE.value, send_event=True) + self.set_state(EventTypes.RUNTIME_STATE, send_event=True) t = threading.Thread(target=runthread) t.daemon = True t.start() @@ -454,7 +454,7 @@ class Ns3Session(Session): Start a thread that updates CORE nodes based on their ns-3 positions. """ - self.set_state(EventTypes.INSTANTIATION_STATE.value) + self.set_state(EventTypes.INSTANTIATION_STATE) self.mobilitythread = threading.Thread( target=self.ns3mobilitythread, args=(refresh_ms,))