diff --git a/daemon/core/future/coreemu.py b/daemon/core/future/coreemu.py index 971399fb..9d5f122a 100644 --- a/daemon/core/future/coreemu.py +++ b/daemon/core/future/coreemu.py @@ -1,10 +1,133 @@ -# import itertools +import os -from core import services +import core.services +from core import logger +from core.coreobj import PyCoreNode, PyCoreNet +from core.data import NodeData from core.emane.nodes import EmaneNode +from core.enumerations import NodeTypes, EventTypes, LinkTypes +from core.misc import nodeutils from core.misc.ipaddress import Ipv4Prefix from core.netns.nodes import CoreNode from core.session import Session +from core.xml.xmlparser import core_document_parser +from core.xml.xmlwriter import core_document_writer + + +class InterfaceData(object): + def __init__(self, _id, name, mac, ip4, ip4_mask, ip6, ip6_mask): + self.id = _id + self.name = name + self.mac = mac + self.ip4 = ip4 + self.ip4_mask = ip4_mask + self.ip6 = ip6 + self.ip6_mask = ip6_mask + + def has_ip4(self): + return all([self.ip4, self.ip4_mask]) + + def has_ip6(self): + return all([self.ip6, self.ip6_mask]) + + def ip4_address(self): + if self.has_ip4(): + return "%s/%s" % (self.ip4, self.ip4_mask) + else: + return None + + def ip6_address(self): + if self.has_ip6(): + return "%s/%s" % (self.ip6, self.ip6_mask) + else: + return None + + def get_addresses(self): + ip4 = self.ip4_address() + ip6 = self.ip6_address() + return [i for i in [ip4, ip6] if i] + + +def get_interfaces(link_data): + interface_one = InterfaceData( + _id=link_data.interface1_id, + name=link_data.interface1_name, + mac=link_data.interface1_mac, + ip4=link_data.interface1_ip4, + ip4_mask=link_data.interface1_ip4_mask, + ip6=link_data.interface1_ip6, + ip6_mask=link_data.interface1_ip6_mask, + ) + interface_two = InterfaceData( + _id=link_data.interface2_id, + name=link_data.interface2_name, + mac=link_data.interface2_mac, + ip4=link_data.interface2_ip4, + ip4_mask=link_data.interface2_ip4_mask, + ip6=link_data.interface2_ip6, + ip6_mask=link_data.interface2_ip6_mask, + ) + return interface_one, interface_two + + +def create_interface(node, network, addresses, interface_data): + """ + Create an interface for a node on a network using provided interface data. + + :param node: node to create interface for + :param network: network to associate interface with + :param list[str] addresses: + :param InterfaceData interface_data: interface data + :return: + """ + node.newnetif( + network, + addrlist=addresses, + hwaddr=interface_data.mac, + ifindex=interface_data.id, + ifname=interface_data.name + ) + return node.netif(interface_data.id, network) + + +def link_config(network, interface, link_data, devname=None, interface_two=None): + config = { + "netif": interface, + "bw": link_data.bandwidth, + "delay": link_data.delay, + "loss": link_data.per, + "duplicate": link_data.dup, + "jitter": link_data.jitter, + "netif2": interface_two + } + + # hacky check here, because physical and emane nodes do not conform to the same linkconfig interface + if not nodeutils.is_node(network, [NodeTypes.EMANE, NodeTypes.PHYSICAL]): + config["devname"] = devname + + network.linkconfig(**config) + + +def is_net_node(node): + """ + Convenience method for testing if a legacy core node is considered a network node. + + :param object node: object to test against + :return: True if object is an instance of a network node, False otherwise + :rtype: bool + """ + return isinstance(node, PyCoreNet) + + +def is_core_node(node): + """ + Convenience method for testing if a legacy core node is considered a core node. + + :param object node: object to test against + :return: True if object is an instance of a core node, False otherwise + :rtype: bool + """ + return isinstance(node, PyCoreNode) class IdGen(object): @@ -30,7 +153,7 @@ class FutureSession(Session): self.master = True # object management - self.object_id_gen = IdGen() + self.node_id_gen = IdGen() # set default services self.services.defaultservices = { @@ -41,8 +164,511 @@ class FutureSession(Session): "host": ("DefaultRoute", "SSH"), } + def link_nodes(self, link_data): + logger.info("link message between node1(%s:%s) and node2(%s:%s)", + link_data.node1_id, link_data.interface1_id, link_data.node2_id, link_data.interface2_id) + + # values to fill + net_one = None + net_two = None + + # retrieve node one + n1_id = link_data.node1_id + n2_id = link_data.node2_id + node_one = self.get_object(n1_id) + node_two = self.get_object(n2_id) + + # both node ids are provided + tunnel = self.broker.gettunnel(n1_id, n2_id) + logger.info("tunnel between nodes: %s", tunnel) + if nodeutils.is_node(tunnel, NodeTypes.TAP_BRIDGE): + net_one = tunnel + if tunnel.remotenum == n1_id: + node_one = None + else: + node_two = None + # PhysicalNode connected via GreTap tunnel; uses adoptnetif() below + elif tunnel: + if tunnel.remotenum == n1_id: + node_one = None + else: + node_two = None + + if is_net_node(node_one): + if not net_one: + net_one = node_one + else: + net_two = node_one + node_one = None + + if is_net_node(node_two): + if not net_one: + net_one = node_two + else: + net_two = node_two + node_two = None + + logger.info("link node types n1(%s) n2(%s) net1(%s) net2(%s) tunnel(%s)", + node_one, node_two, net_one, net_two, tunnel) + return node_one, node_two, net_one, net_two, tunnel + + # TODO: this doesn't appear to ever be used, EMANE or basic wireless range + def _link_wireless(self, objects, connect): + """ + Objects to deal with when connecting/disconnecting wireless links. + + :param list objects: possible objects to deal with + :param bool connect: link interfaces if True, unlink otherwise + :return: nothing + """ + objects = [x for x in objects if x] + if len(objects) < 2: + raise ValueError("wireless link failure: %s", objects) + logger.info("handling wireless linking objects(%) connect(%s)", objects, connect) + common_networks = objects[0].commonnets(objects[1]) + for common_network, interface_one, interface_two in common_networks: + if not nodeutils.is_node(common_network, [NodeTypes.WIRELESS_LAN, NodeTypes.EMANE]): + logger.info("skipping common network that is not wireless/emane: %s", common_network) + continue + + logger.info("wireless linking connect(%s): %s - %s", connect, interface_one, interface_two) + if connect: + common_network.link(interface_one, interface_two) + else: + common_network.unlink(interface_one, interface_two) + else: + raise ValueError("no common network found for wireless link/unlink") + + def link_add(self, link_data): + # interface data + interface_one_data, interface_two_data = get_interfaces(link_data) + + # get node objects identified by link data + node_one, node_two, net_one, net_two, tunnel = self.link_nodes(link_data) + + if node_one: + node_one.lock.acquire() + if node_two: + node_two.lock.acquire() + + try: + # wireless link + if link_data.link_type == LinkTypes.WIRELESS.value: + objects = [node_one, node_two, net_one, net_two] + self._link_wireless(objects, connect=True) + # wired link + else: + # 2 nodes being linked, ptp network + if all([node_one, node_two]) and not net_one: + ptp_class = nodeutils.get_node_class(NodeTypes.PEER_TO_PEER) + start = self.state > EventTypes.DEFINITION_STATE.value + net_one = self.add_object(cls=ptp_class, start=start) + + # node to network + if node_one and net_one: + addresses = [] + addresses.extend(interface_one_data.get_addresses()) + addresses.extend(interface_two_data.get_addresses()) + interface = create_interface(node_one, net_one, addresses, interface_one_data) + link_config(net_one, interface, link_data) + + # network to node + if node_two and net_one: + addresses = [] + addresses.extend(interface_one_data.get_addresses()) + addresses.extend(interface_two_data.get_addresses()) + interface = create_interface(node_two, net_one, addresses, interface_two_data) + if not link_data.unidirectional: + link_config(net_one, interface, link_data) + + # network to network + if net_one and net_two: + if nodeutils.is_node(net_two, NodeTypes.RJ45): + interface = net_two.linknet(net_one) + else: + interface = net_one.linknet(net_two) + + link_config(net_one, interface, link_data) + + if not link_data.unidirectional: + interface.swapparams("_params_up") + link_config(net_two, interface, link_data, devname=interface.name) + interface.swapparams("_params_up") + + # a tunnel was found for the nodes + addresses = [] + if not node_one and net_one: + addresses.extend(interface_one_data.get_addresses()) + + if not node_two and net_two: + addresses.extend(interface_two_data.get_addresses()) + + # tunnel node logic + key = link_data.key + if key and nodeutils.is_node(net_one, NodeTypes.TUNNEL): + net_one.setkey(key) + if addresses: + net_one.addrconfig(addresses) + if key and nodeutils.is_node(net_two, NodeTypes.TUNNEL): + net_two.setkey(key) + if addresses: + net_two.addrconfig(addresses) + + if not net_one and not net_two and (not node_one or not node_two): + addresses = [] + if node_one and nodeutils.is_node(node_one, NodeTypes.PHYSICAL): + addresses.extend(interface_one_data.get_addresses()) + node_one.adoptnetif(tunnel, link_data.interface1_id, link_data.interface1_mac, addresses) + link_config(node_one, tunnel, link_data) + elif node_two and nodeutils.is_node(node_two, NodeTypes.PHYSICAL): + addresses.extend(interface_two_data.get_addresses()) + node_two.adoptnetif(tunnel, link_data.interface2_id, link_data.interface2_mac, addresses) + link_config(node_two, tunnel, link_data) + finally: + if node_one: + node_one.lock.release() + if node_two: + node_two.lock.release() + + def link_delete(self, link_data): + # interface data + interface_one_data, interface_two_data = get_interfaces(link_data) + + # get node objects identified by link data + node_one, node_two, net_one, net_two, tunnel = self.link_nodes(link_data) + + if node_one: + node_one.lock.acquire() + if node_two: + node_two.lock.acquire() + + try: + # wireless link + if link_data.link_type == LinkTypes.WIRELESS.value: + objects = [node_one, node_two, net_one, net_two] + self._link_wireless(objects, connect=False) + # wired link + else: + if all([node_one, node_two]): + # TODO: fix this for the case where ifindex[1,2] are not specified + # a wired unlink event, delete the connecting bridge + interface_one = node_one.netif(interface_one_data.id) + interface_two = node_two.netif(interface_two_data.id) + + # get interfaces from common network, if no network node + # otherwise get interfaces between a node and network + if not interface_one and not interface_two: + common_networks = node_one.commonnets(node_two) + for network, common_interface_one, common_interface_two in common_networks: + if (net_one and network == net_one) or not net_one: + interface_one = common_interface_one + interface_two = common_interface_two + break + + if all([interface_one, interface_two]) and any([interface_one.net, interface_two.net]): + if interface_one.net != interface_two.net and all([interface_one.up, interface_two.up]): + raise ValueError("no common network found") + net_one = interface_one.net + interface_one.detachnet() + interface_two.detachnet() + if net_one.numnetif() == 0: + self.delete_object(net_one.objid) + node_one.delnetif(interface_one_data.id) + node_two.delnetif(interface_two_data.id) + finally: + if node_one: + node_one.lock.release() + if node_two: + node_two.lock.release() + + def link_update(self, link_data): + # interface data + interface_one_data, interface_two_data = get_interfaces(link_data) + + # get node objects identified by link data + node_one, node_two, net_one, net_two, tunnel = self.link_nodes(link_data) + + if node_one: + node_one.lock.acquire() + if node_two: + node_two.lock.acquire() + + try: + # wireless link + if link_data.link_type == LinkTypes.WIRELESS.value: + raise ValueError("cannot update wireless link") + else: + if not node_one and not node_two: + if net_one and net_two: + # modify link between nets + interface = net_one.getlinknetif(net_two) + upstream = False + + if not interface: + upstream = True + interface = net_two.getlinknetif(net_one) + + if not interface: + raise ValueError("modify unknown link between nets") + + if upstream: + interface.swapparams("_params_up") + link_config(net_one, interface, link_data, devname=interface.name) + interface.swapparams("_params_up") + else: + link_config(net_one, interface, link_data) + + if not link_data.unidirectional: + if upstream: + link_config(net_two, interface, link_data) + else: + interface.swapparams("_params_up") + link_config(net_two, interface, link_data, devname=interface.name) + interface.swapparams("_params_up") + else: + raise ValueError("modify link for unknown nodes") + elif not node_one: + # node1 = layer 2node, node2 = layer3 node + interface = node_two.netif(interface_two_data.id, net_one) + link_config(net_one, interface, link_data) + elif not node_two: + # node2 = layer 2node, node1 = layer3 node + interface = node_one.netif(interface_one_data.id, net_one) + link_config(net_one, interface, link_data) + else: + common_networks = node_one.commonnets(node_two) + for net_one, interface_one, interface_two in common_networks: + if interface_one_data.id and interface_one_data.id != node_one.getifindex(interface_one): + continue + + link_config(net_one, interface_one, link_data, interface_two=interface_two) + if not link_data.unidirectional: + link_config(net_one, interface_two, link_data, interface_two=interface_one) + else: + raise ValueError("no common network found") + finally: + if node_one: + node_one.lock.release() + if node_two: + node_two.lock.release() + + def node_add(self, node_data): + """ + Add a node to the session, based on the provided node data. + + :param core.data.NodeData node_data: data to create node with + :return: nothing + """ + + # retrieve node class for given node type + try: + node_type = NodeTypes(node_data.node_type) + node_class = nodeutils.get_node_class(node_type) + except KeyError: + logger.error("invalid node type to create: %s", node_data.node_type) + return None + + # set node start based on current session state, override and check when rj45 + start = self.state > EventTypes.DEFINITION_STATE.value + enable_rj45 = getattr(self.options, "enablerj45", "0") == "1" + if node_type == NodeTypes.RJ45 and not enable_rj45: + start = False + + # determine node id + node_id = node_data.id + if not node_id: + node_id = self.node_id_gen.next() + + # generate name if not provided + name = node_data.name + if not name: + name = "%s%s" % (node_class.__name__, node_id) + + # create node + node = self.add_object(cls=node_class, objid=node_id, name=name, start=start) + + # set node attributes + node.type = node_data.model or "router" + node.icon = node_data.icon + node.canvas = node_data.canvas + node.opaque = node_data.opaque + + # set node position and broadcast it + self.node_set_position(node, node_data) + + # add services to default and physical nodes only + services = node_data.services + if node_type in [NodeTypes.DEFAULT, NodeTypes.PHYSICAL]: + logger.info("setting model (%s) with services (%s)", node.type, services) + self.services.addservicestonode(node, node.type, services) + + # boot nodes if created after runtime, LcxNodes, Physical, and RJ45 are all PyCoreNodes + is_boot_node = isinstance(node, PyCoreNode) and not nodeutils.is_node(node, NodeTypes.RJ45) + if self.state == EventTypes.RUNTIME_STATE.value and is_boot_node: + self.write_objects() + self.add_remove_control_interface(node=node, remove=False) + + # TODO: common method to both Physical and LxcNodes, but not the common PyCoreNode + node.boot() + + # return node id, in case it was generated + return node_id + + def node_update(self, node_data): + try: + # get node to update + node = self.get_object(node_data.id) + + # set node position and broadcast it + self.node_set_position(node, node_data) + + # update attributes + node.canvas = node_data.canvas + node.icon = node_data.icon + except KeyError: + logger.error("failure to update node that does not exist: %s", node_data.id) + + def node_delete(self, node_id): + # delete node and check for session shutdown if a node was removed + result = self.custom_delete_object(node_id) + if result: + self.check_shutdown() + return result + + def node_set_position(self, node, node_data): + # extract location values + x = node_data.x_position + y = node_data.y_position + lat = node_data.latitude + lon = node_data.longitude + alt = node_data.altitude + + # check if we need to generate position from lat/lon/alt + has_empty_position = all(i is None for i in [x, y]) + has_lat_lon_alt = all(i is not None for i in [lat, lon, alt]) + using_lat_lon_alt = has_empty_position and has_lat_lon_alt + if using_lat_lon_alt: + x, y, _ = self.location.getxyz(lat, lon, alt) + + # set position and broadcast + node.setposition(x, y, None) + + # broadcast updated location when using lat/lon/alt + if using_lat_lon_alt: + self.broadcast_node_location(node) + + def broadcast_node_location(self, node): + """ + Broadcast node location to all listeners. + + :param core.netns.nodes.PyCoreObj node: node to broadcast location for + :return: nothing + """ + node_data = NodeData( + message_type=0, + id=node.objid, + x_position=node.position.x, + y_position=node.position.y + ) + self.broadcast_node(node_data) + + def shutdown(self): + self.set_state(state=EventTypes.DATACOLLECT_STATE.value, send_event=True) + self.set_state(state=EventTypes.SHUTDOWN_STATE.value, send_event=True) + super(FutureSession, self).shutdown() + + def custom_delete_object(self, object_id): + """ + Remove an emulation object. + + :param int object_id: object id to remove + :return: True if object deleted, False otherwise + """ + result = False + with self._objects_lock: + if object_id in self.objects: + obj = self.objects.pop(object_id) + obj.shutdown() + result = True + return result + + def is_active(self): + return self.state in {EventTypes.RUNTIME_STATE.value, EventTypes.DATACOLLECT_STATE.value} + + def open_xml(self, file_name, start=False): + """ + Import a session from the EmulationScript XML format. + + :param str file_name: xml file to load session from + :param bool start: instantiate session if true, false otherwise + :return: nothing + """ + # clear out existing session + self.clear() + + # set default node class when one is not provided + node_class = nodeutils.get_node_class(NodeTypes.DEFAULT) + options = {"start": start, "nodecls": node_class} + core_document_parser(self, file_name, options) + if start: + self.name = os.path.basename(file_name) + self.file_name = file_name + self.instantiate() + + def save_xml(self, file_name, version): + """ + Export a session to the EmulationScript XML format. + + :param str file_name: file name to write session xml to + :param str version: xml version type + :return: nothing + """ + doc = core_document_writer(self, version) + doc.writexml(file_name) + + def hook_add(self, state, file_name, source_name, data): + """ + Store a hook from a received file message. + + :param int state: when to run hook + :param str file_name: file name for hook + :param str source_name: source name + :param data: hook data + :return: nothing + """ + # hack to conform with old logic until updated + state = ":%s" % state + self.set_hook(state, file_name, source_name, data) + + def node_service_file(self, node_id, service_name, file_name, source_name, data): + # hack to conform with old logic until updated + service_name = ":%s" % service_name + self.services.setservicefile(node_id, service_name, file_name, source_name, data) + + def node_file(self, node_id, source_name, file_name, data): + node = self.get_object(node_id) + + if source_name is not None: + node.addfile(source_name, file_name) + elif data is not None: + node.nodefile(file_name, data) + + def clear(self): + self.delete_objects() + self.del_hooks() + self.broker.reset() + + def start_events(self): + self.event_loop.run() + + def services_event(self, event_data): + self.services.handleevent(event_data) + + def mobility_event(self, event_data): + self.mobility.handleevent(event_data) + def create_node(self, cls, name=None, model=None): - object_id = self.object_id_gen.next() + object_id = self.node_id_gen.next() if not name: name = "%s%s" % (cls.__name__, object_id) @@ -103,7 +729,7 @@ class CoreEmu(object): self.sessions = {} # load default services - services.load() + core.services.load() def create_session(self): """ diff --git a/daemon/core/future/futurehandler.py b/daemon/core/future/futurehandler.py new file mode 100644 index 00000000..f5b0c088 --- /dev/null +++ b/daemon/core/future/futurehandler.py @@ -0,0 +1,1127 @@ +""" +socket server request handlers leveraged by core servers. +""" + +import Queue +import SocketServer +import os +import shlex +import shutil +import sys +import threading +import time + +from core import logger +from core.api import coreapi +from core.coreserver import CoreServer +from core.data import ConfigData, LinkData +from core.data import EventData +from core.data import NodeData +from core.enumerations import ConfigTlvs +from core.enumerations import EventTlvs +from core.enumerations import EventTypes +from core.enumerations import ExceptionTlvs +from core.enumerations import ExecuteTlvs +from core.enumerations import FileTlvs +from core.enumerations import LinkTlvs +from core.enumerations import MessageFlags +from core.enumerations import MessageTypes +from core.enumerations import NodeTlvs +from core.enumerations import NodeTypes +from core.enumerations import RegisterTlvs +from core.enumerations import SessionTlvs +from core.misc import nodeutils +from core.misc import structutils +from core.misc import utils + + +class FutureHandler(SocketServer.BaseRequestHandler): + """ + The SocketServer class uses the RequestHandler class for servicing requests. + """ + + def __init__(self, request, client_address, server): + """ + Create a CoreRequestHandler instance. + + :param request: request object + :param str client_address: client address + :param CoreServer server: core server instance + :return: + """ + self.done = False + self.message_handlers = { + MessageTypes.NODE.value: self.handle_node_message, + MessageTypes.LINK.value: self.handle_link_message, + MessageTypes.EXECUTE.value: self.handle_execute_message, + MessageTypes.REGISTER.value: self.handle_register_message, + MessageTypes.CONFIG.value: self.handle_config_message, + MessageTypes.FILE.value: self.handle_file_message, + MessageTypes.INTERFACE.value: self.handle_interface_message, + MessageTypes.EVENT.value: self.handle_event_message, + MessageTypes.SESSION.value: self.handle_session_message, + } + self.message_queue = Queue.Queue() + self.node_status_request = {} + self._shutdown_lock = threading.Lock() + + self.handler_threads = [] + num_threads = int(server.config["numthreads"]) + if num_threads < 1: + raise ValueError("invalid number of threads: %s" % num_threads) + + logger.info("launching core server handler threads: %s", num_threads) + for _ in xrange(num_threads): + thread = threading.Thread(target=self.handler_thread) + self.handler_threads.append(thread) + thread.start() + + self.master = False + self.session = None + + utils.close_onexec(request.fileno()) + SocketServer.BaseRequestHandler.__init__(self, request, client_address, server) + + def setup(self): + """ + Client has connected, set up a new connection. + + :return: nothing + """ + logger.info("new TCP connection: %s", self.client_address) + + def finish(self): + """ + Client has disconnected, end this request handler and disconnect + from the session. Shutdown sessions that are not running. + + :return: nothing + """ + logger.info("finishing request handler") + self.done = True + + logger.info("remaining message queue size: %s", self.message_queue.qsize()) + # seconds + timeout = 10.0 + logger.info("client disconnected: notifying threads") + for thread in self.handler_threads: + logger.info("waiting for thread: %s", thread.getName()) + thread.join(timeout) + if thread.isAlive(): + logger.warn("joining %s failed: still alive after %s sec", thread.getName(), timeout) + + logger.info("connection closed: %s", self.client_address) + if self.session: + self.remove_session_handlers() + + # remove client from session broker and shutdown if there are no clients + self.session.broker.session_clients.remove(self) + if not self.session.broker.session_clients: + logger.info("no session clients left, initiating shutdown") + self.session.shutdown() + + return SocketServer.BaseRequestHandler.finish(self) + + def handle_broadcast_event(self, event_data): + """ + Callback to handle an event broadcast out from a session. + + :param core.data.EventData event_data: event data to handle + :return: nothing + """ + logger.info("handling broadcast event: %s", event_data) + + tlv_data = structutils.pack_values(coreapi.CoreEventTlv, [ + (EventTlvs.NODE, event_data.node), + (EventTlvs.TYPE, event_data.event_type), + (EventTlvs.NAME, event_data.name), + (EventTlvs.DATA, event_data.data), + (EventTlvs.TIME, event_data.time), + (EventTlvs.TIME, event_data.session) + ]) + message = coreapi.CoreEventMessage.pack(0, tlv_data) + + try: + self.sendall(message) + except IOError: + logger.exception("error sending event message") + + def handle_broadcast_file(self, file_data): + """ + Callback to handle a file broadcast out from a session. + + :param core.data.FileData file_data: file data to handle + :return: nothing + """ + logger.info("handling broadcast file: %s", file_data) + + tlv_data = structutils.pack_values(coreapi.CoreFileTlv, [ + (FileTlvs.NODE, file_data.node), + (FileTlvs.NAME, file_data.name), + (FileTlvs.MODE, file_data.mode), + (FileTlvs.NUMBER, file_data.number), + (FileTlvs.TYPE, file_data.type), + (FileTlvs.SOURCE_NAME, file_data.source), + (FileTlvs.SESSION, file_data.session), + (FileTlvs.DATA, file_data.data), + (FileTlvs.COMPRESSED_DATA, file_data.compressed_data), + ]) + message = coreapi.CoreFileMessage.pack(file_data.message_type, tlv_data) + + try: + self.sendall(message) + except IOError: + logger.exception("error sending file message") + + def handle_broadcast_config(self, config_data): + """ + Callback to handle a config broadcast out from a session. + + :param core.data.ConfigData config_data: config data to handle + :return: nothing + """ + logger.info("handling broadcast config: %s", config_data) + + tlv_data = structutils.pack_values(coreapi.CoreConfigTlv, [ + (ConfigTlvs.NODE, config_data.node), + (ConfigTlvs.OBJECT, config_data.object), + (ConfigTlvs.TYPE, config_data.type), + (ConfigTlvs.DATA_TYPES, config_data.data_types), + (ConfigTlvs.VALUES, config_data.data_values), + (ConfigTlvs.CAPTIONS, config_data.captions), + (ConfigTlvs.BITMAP, config_data.bitmap), + (ConfigTlvs.POSSIBLE_VALUES, config_data.possible_values), + (ConfigTlvs.GROUPS, config_data.groups), + (ConfigTlvs.SESSION, config_data.session), + (ConfigTlvs.INTERFACE_NUMBER, config_data.interface_number), + (ConfigTlvs.NETWORK_ID, config_data.network_id), + (ConfigTlvs.OPAQUE, config_data.opaque), + ]) + message = coreapi.CoreConfMessage.pack(config_data.message_type, tlv_data) + + try: + self.sendall(message) + except IOError: + logger.exception("error sending config message") + + def handle_broadcast_exception(self, exception_data): + """ + Callback to handle an exception broadcast out from a session. + + :param core.data.ExceptionData exception_data: exception data to handle + :return: nothing + """ + logger.info("handling broadcast exception: %s", exception_data) + tlv_data = structutils.pack_values(coreapi.CoreExceptionTlv, [ + (ExceptionTlvs.NODE, exception_data.node), + (ExceptionTlvs.SESSION, exception_data.session), + (ExceptionTlvs.LEVEL, exception_data.level), + (ExceptionTlvs.SOURCE, exception_data.source), + (ExceptionTlvs.DATE, exception_data.date), + (ExceptionTlvs.TEXT, exception_data.text) + ]) + message = coreapi.CoreExceptionMessage.pack(0, tlv_data) + + try: + self.sendall(message) + except IOError: + logger.exception("error sending exception message") + + def handle_broadcast_node(self, node_data): + """ + Callback to handle an node broadcast out from a session. + + :param core.data.NodeData node_data: node data to handle + :return: nothing + """ + logger.info("handling broadcast node: %s", node_data) + + tlv_data = structutils.pack_values(coreapi.CoreNodeTlv, [ + (NodeTlvs.NUMBER, node_data.id), + (NodeTlvs.TYPE, node_data.node_type), + (NodeTlvs.NAME, node_data.name), + (NodeTlvs.IP_ADDRESS, node_data.ip_address), + (NodeTlvs.MAC_ADDRESS, node_data.mac_address), + (NodeTlvs.IP6_ADDRESS, node_data.ip6_address), + (NodeTlvs.MODEL, node_data.model), + (NodeTlvs.EMULATION_ID, node_data.emulation_id), + (NodeTlvs.EMULATION_SERVER, node_data.emulation_server), + (NodeTlvs.SESSION, node_data.session), + (NodeTlvs.X_POSITION, node_data.x_position), + (NodeTlvs.Y_POSITION, node_data.y_position), + (NodeTlvs.CANVAS, node_data.canvas), + (NodeTlvs.NETWORK_ID, node_data.network_id), + (NodeTlvs.SERVICES, node_data.services), + (NodeTlvs.LATITUDE, node_data.latitude), + (NodeTlvs.LONGITUDE, node_data.longitude), + (NodeTlvs.ALTITUDE, node_data.altitude), + (NodeTlvs.ICON, node_data.icon), + (NodeTlvs.OPAQUE, node_data.opaque) + ]) + message = coreapi.CoreNodeMessage.pack(node_data.message_type, tlv_data) + + try: + self.sendall(message) + except IOError: + logger.exception("error sending node message") + + def handle_broadcast_link(self, link_data): + """ + Callback to handle an link broadcast out from a session. + + :param core.data.LinkData link_data: link data to handle + :return: nothing + """ + logger.info("handling broadcast link: %s", link_data) + + tlv_data = structutils.pack_values(coreapi.CoreLinkTlv, [ + (LinkTlvs.N1_NUMBER, link_data.node1_id), + (LinkTlvs.N2_NUMBER, link_data.node2_id), + (LinkTlvs.DELAY, link_data.delay), + (LinkTlvs.BANDWIDTH, link_data.bandwidth), + (LinkTlvs.PER, link_data.per), + (LinkTlvs.DUP, link_data.dup), + (LinkTlvs.JITTER, link_data.jitter), + (LinkTlvs.MER, link_data.mer), + (LinkTlvs.BURST, link_data.burst), + (LinkTlvs.SESSION, link_data.session), + (LinkTlvs.MBURST, link_data.mburst), + (LinkTlvs.TYPE, link_data.link_type), + (LinkTlvs.GUI_ATTRIBUTES, link_data.gui_attributes), + (LinkTlvs.UNIDIRECTIONAL, link_data.unidirectional), + (LinkTlvs.EMULATION_ID, link_data.emulation_id), + (LinkTlvs.NETWORK_ID, link_data.network_id), + (LinkTlvs.KEY, link_data.key), + (LinkTlvs.INTERFACE1_NUMBER, link_data.interface1_id), + (LinkTlvs.INTERFACE1_NAME, link_data.interface1_name), + (LinkTlvs.INTERFACE1_IP4, link_data.interface1_ip4), + (LinkTlvs.INTERFACE1_IP4_MASK, link_data.interface1_ip4_mask), + (LinkTlvs.INTERFACE1_MAC, link_data.interface1_mac), + (LinkTlvs.INTERFACE1_IP6, link_data.interface1_ip6), + (LinkTlvs.INTERFACE1_IP6_MASK, link_data.interface1_ip6_mask), + (LinkTlvs.INTERFACE2_NUMBER, link_data.interface2_id), + (LinkTlvs.INTERFACE2_NAME, link_data.interface2_name), + (LinkTlvs.INTERFACE2_IP4, link_data.interface2_ip4), + (LinkTlvs.INTERFACE2_IP4_MASK, link_data.interface2_ip4_mask), + (LinkTlvs.INTERFACE2_MAC, link_data.interface2_mac), + (LinkTlvs.INTERFACE2_IP6, link_data.interface2_ip6), + (LinkTlvs.INTERFACE2_IP6_MASK, link_data.interface2_ip6_mask), + (LinkTlvs.OPAQUE, link_data.opaque) + ]) + + message = coreapi.CoreLinkMessage.pack(link_data.message_type, tlv_data) + + try: + self.sendall(message) + except IOError: + logger.exception("error sending Event Message") + + def register(self): + """ + Return a Register Message + + :return: register message data + """ + logger.info("GUI has connected to session %d at %s", self.session.session_id, time.ctime()) + + tlv_data = "" + tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, "core-daemon") + tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.EMULATION_SERVER.value, "core-daemon") + + # get config objects for session + for name in self.session.config_objects: + config_type, callback = self.session.config_objects[name] + # type must be in coreapi.reg_tlvs + tlv_data += coreapi.CoreRegisterTlv.pack(config_type, name) + + return coreapi.CoreRegMessage.pack(MessageFlags.ADD.value, tlv_data) + + def sendall(self, data): + """ + Send raw data to the other end of this TCP connection + using socket"s sendall(). + + :param data: data to send over request socket + :return: data sent + """ + return self.request.sendall(data) + + def receive_message(self): + """ + Receive data and return a CORE API message object. + + :return: received message + :rtype: coreapi.CoreMessage + """ + try: + header = self.request.recv(coreapi.CoreMessage.header_len) + except IOError as e: + raise IOError("error receiving header (%s)" % e) + + if len(header) != coreapi.CoreMessage.header_len: + if len(header) == 0: + raise EOFError("client disconnected") + else: + raise IOError("invalid message header size") + + message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header) + if message_len == 0: + logger.warn("received message with no data") + + data = "" + while len(data) < message_len: + data += self.request.recv(message_len - len(data)) + if len(data) > message_len: + error_message = "received message length does not match received data (%s != %s)" % ( + len(data), message_len) + logger.error(error_message) + raise IOError(error_message) + + try: + message_class = coreapi.CLASS_MAP[message_type] + message = message_class(message_flags, header, data) + except KeyError: + message = coreapi.CoreMessage(message_flags, header, data) + message.message_type = message_type + logger.exception("unimplemented core message type: %s", message.type_str()) + + return message + + def queue_message(self, message): + """ + Queue an API message for later processing. + + :param message: message to queue + :return: nothing + """ + logger.info("queueing msg (queuedtimes = %s): type %s", + message.queuedtimes, MessageTypes(message.message_type)) + self.message_queue.put(message) + + def handler_thread(self): + """ + CORE API message handling loop that is spawned for each server + thread; get CORE API messages from the incoming message queue, + and call handlemsg() for processing. + + :return: nothing + """ + while not self.done: + message = self.message_queue.get() + self.handle_message(message) + + def handle_message(self, message): + """ + Handle an incoming message; dispatch based on message type, + optionally sending replies. + + :param message: message to handle + :return: nothing + """ + if self.session and self.session.broker.handle_message(message): + logger.info("message not being handled locally") + return + + logger.info("%s handling message:\n%s", threading.currentThread().getName(), message) + + if message.message_type not in self.message_handlers: + logger.warn("no handler for message type: %s", message.type_str()) + return + + message_handler = self.message_handlers[message.message_type] + + try: + # TODO: this needs to be removed, make use of the broadcast message methods + replies = message_handler(message) + self.dispatch_replies(replies, message) + except: + logger.exception("%s: exception while handling message: %s", + threading.currentThread().getName(), message) + + def dispatch_replies(self, replies, message): + """ + Dispatch replies by CORE to message msg previously received from the client. + + :param list replies: reply messages to dispatch + :param message: message for replies + :return: nothing + """ + logger.info("dispatching replies") + for reply in replies: + message_type, message_flags, message_length = coreapi.CoreMessage.unpack_header(reply) + try: + reply_message = coreapi.CLASS_MAP[message_type]( + message_flags, + reply[:coreapi.CoreMessage.header_len], + reply[coreapi.CoreMessage.header_len:] + ) + except KeyError: + # multiple TLVs of same type cause KeyError exception + reply_message = "CoreMessage (type %d flags %d length %d)" % ( + message_type, message_flags, message_length) + + logger.info("dispatch reply:\n%s", reply_message) + + try: + self.sendall(reply) + except IOError: + logger.exception("error dispatching reply") + + def handle(self): + """ + Handle a new connection request from a client. Dispatch to the + recvmsg() method for receiving data into CORE API messages, and + add them to an incoming message queue. + + :return: nothing + """ + # use port as session id + port = self.request.getpeername()[1] + + logger.info("creating new session for client: %s", port) + self.session = self.server.create_session(session_id=port) + + # TODO: hack to associate this handler with this sessions broker for broadcasting + # TODO: broker needs to be pulled out of session to the server/handler level + if self.master: + logger.info("session set to master") + self.session.master = True + self.session.broker.session_clients.append(self) + + # add handlers for various data + logger.info("adding session broadcast handlers") + self.add_session_handlers() + + # set initial session state + self.session.set_state(state=EventTypes.DEFINITION_STATE.value) + + while True: + try: + message = self.receive_message() + except EOFError: + logger.info("client disconnected") + break + except IOError: + logger.exception("error receiving message") + break + + message.queuedtimes = 0 + self.queue_message(message) + + # delay is required for brief connections, allow session joining + if message.message_type == MessageTypes.SESSION.value: + time.sleep(0.125) + + # broadcast node/link messages to other connected clients + if message.message_type not in [MessageTypes.NODE.value, MessageTypes.LINK.value]: + continue + + for client in self.session.broker.session_clients: + if client == self: + continue + + logger.info("BROADCAST TO OTHER CLIENT: %s", client) + client.sendall(message.raw_message) + + def add_session_handlers(self): + logger.info("adding session broadcast handlers") + self.session.event_handlers.append(self.handle_broadcast_event) + self.session.exception_handlers.append(self.handle_broadcast_exception) + self.session.node_handlers.append(self.handle_broadcast_node) + self.session.link_handlers.append(self.handle_broadcast_link) + self.session.file_handlers.append(self.handle_broadcast_file) + self.session.config_handlers.append(self.handle_broadcast_config) + + def remove_session_handlers(self): + logger.info("removing session broadcast handlers") + self.session.event_handlers.remove(self.handle_broadcast_event) + self.session.exception_handlers.remove(self.handle_broadcast_exception) + self.session.node_handlers.remove(self.handle_broadcast_node) + self.session.link_handlers.remove(self.handle_broadcast_link) + self.session.file_handlers.remove(self.handle_broadcast_file) + self.session.config_handlers.remove(self.handle_broadcast_config) + + def handle_node_message(self, message): + """ + Node Message handler + + :param coreapi.CoreNodeMessage message: node message + :return: replies to node message + """ + replies = [] + if message.flags & MessageFlags.ADD.value and message.flags & MessageFlags.DELETE.value: + logger.warn("ignoring invalid message: add and delete flag both set") + return () + + # create node data from message data + node_data = NodeData( + id=message.get_tlv(NodeTlvs.NUMBER.value), + x_position=message.get_tlv(NodeTlvs.X_POSITION.value), + y_position=message.get_tlv(NodeTlvs.Y_POSITION.value), + canvas=message.get_tlv(NodeTlvs.CANVAS.value), + icon=message.get_tlv(NodeTlvs.ICON.value), + latitude=message.get_tlv(NodeTlvs.LATITUDE.value), + longitude=message.get_tlv(NodeTlvs.LONGITUDE.value), + altitude=message.get_tlv(NodeTlvs.ALTITUDE.value), + node_type=message.get_tlv(NodeTlvs.TYPE.value), + name=message.get_tlv(NodeTlvs.NAME.value), + model=message.get_tlv(NodeTlvs.MODEL.value), + opaque=message.get_tlv(NodeTlvs.OPAQUE.value), + services=message.get_tlv(NodeTlvs.SERVICES.value), + ) + + if message.flags & MessageFlags.ADD.value: + node_id = self.session.node_add(node_data) + if node_id: + if message.flags & MessageFlags.STRING.value: + self.node_status_request[node_id] = True + + if self.session.state == EventTypes.RUNTIME_STATE.value: + self.send_node_emulation_id(node_id) + elif message.flags & MessageFlags.DELETE.value: + with self._shutdown_lock: + result = self.session.node_delete(node_data.id) + + # if we deleted a node broadcast out its removal + if result and message.flags & MessageFlags.STRING.value: + tlvdata = "" + tlvdata += coreapi.CoreNodeTlv.pack(NodeTlvs.NUMBER.value, node_data.id) + flags = MessageFlags.DELETE.value | MessageFlags.LOCAL.value + replies.append(coreapi.CoreNodeMessage.pack(flags, tlvdata)) + # node update + else: + self.session.node_update(node_data) + + return replies + + def handle_link_message(self, message): + """ + Link Message handler + + :param coreapi.CoreLinkMessage message: link message to handle + :return: link message replies + """ + link_data = LinkData( + session=message.get_tlv(LinkTlvs.SESSION.value), + link_type=message.get_tlv(LinkTlvs.TYPE.value), + node1_id=message.get_tlv(LinkTlvs.N1_NUMBER.value), + node2_id=message.get_tlv(LinkTlvs.N2_NUMBER.value), + delay=message.get_tlv(LinkTlvs.DELAY.value), + bandwidth=message.get_tlv(LinkTlvs.BANDWIDTH.value), + per=message.get_tlv(LinkTlvs.PER.value), + dup=message.get_tlv(LinkTlvs.DUP.value), + jitter=message.get_tlv(LinkTlvs.JITTER.value), + mer=message.get_tlv(LinkTlvs.MER.value), + burst=message.get_tlv(LinkTlvs.BURST.value), + mburst=message.get_tlv(LinkTlvs.MBURST.value), + gui_attributes=message.get_tlv(LinkTlvs.GUI_ATTRIBUTES.value), + unidirectional=message.get_tlv(LinkTlvs.UNIDIRECTIONAL.value), + emulation_id=message.get_tlv(LinkTlvs.EMULATION_ID.value), + network_id=message.get_tlv(LinkTlvs.NETWORK_ID.value), + key=message.get_tlv(LinkTlvs.KEY.value), + opaque=message.get_tlv(LinkTlvs.OPAQUE.value), + interface1_id=message.get_tlv(LinkTlvs.INTERFACE1_NUMBER.value), + interface1_name=message.get_tlv(LinkTlvs.INTERFACE1_NAME.value), + interface1_ip4=message.get_tlv(LinkTlvs.INTERFACE1_IP4.value), + interface1_ip4_mask=message.get_tlv(LinkTlvs.INTERFACE1_IP4_MASK.value), + interface1_mac=message.get_tlv(LinkTlvs.INTERFACE1_MAC.value), + interface1_ip6=message.get_tlv(LinkTlvs.INTERFACE1_IP6.value), + interface1_ip6_mask=message.get_tlv(LinkTlvs.INTERFACE1_IP6_MASK.value), + interface2_id=message.get_tlv(LinkTlvs.INTERFACE2_NUMBER.value), + interface2_name=message.get_tlv(LinkTlvs.INTERFACE2_NAME.value), + interface2_ip4=message.get_tlv(LinkTlvs.INTERFACE2_IP4.value), + interface2_ip4_mask=message.get_tlv(LinkTlvs.INTERFACE2_IP4_MASK.value), + interface2_mac=message.get_tlv(LinkTlvs.INTERFACE2_MAC.value), + interface2_ip6=message.get_tlv(LinkTlvs.INTERFACE2_IP6.value), + interface2_ip6_mask=message.get_tlv(LinkTlvs.INTERFACE2_IP6_MASK.value), + ) + + if message.flags & MessageFlags.ADD.value: + self.session.link_add(link_data) + elif message.flags & MessageFlags.DELETE.value: + self.session.link_delete(link_data) + else: + self.session.link_update(link_data) + + return () + + def handle_execute_message(self, message): + """ + Execute Message handler + + :param coreapi.CoreExecMessage message: execute message to handle + :return: reply messages + """ + node_num = message.get_tlv(ExecuteTlvs.NODE.value) + execute_num = message.get_tlv(ExecuteTlvs.NUMBER.value) + execute_time = message.get_tlv(ExecuteTlvs.TIME.value) + command = message.get_tlv(ExecuteTlvs.COMMAND.value) + + # local flag indicates command executed locally, not on a node + if node_num is None and not message.flags & MessageFlags.LOCAL.value: + raise ValueError("Execute Message is missing node number.") + + if execute_num is None: + raise ValueError("Execute Message is missing execution number.") + + if execute_time is not None: + self.session.add_event(execute_time, node=node_num, name=None, data=command) + return () + + try: + node = self.session.get_object(node_num) + + # build common TLV items for reply + tlv_data = "" + if node_num is not None: + tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.NODE.value, node_num) + tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.NUMBER.value, execute_num) + tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.COMMAND.value, command) + + if message.flags & MessageFlags.TTY.value: + if node_num is None: + raise NotImplementedError + # echo back exec message with cmd for spawning interactive terminal + if command == "bash": + command = "/bin/bash" + res = node.termcmdstring(command) + tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.RESULT.value, res) + reply = coreapi.CoreExecMessage.pack(MessageFlags.TTY.value, tlv_data) + return reply, + else: + logger.info("execute message with cmd=%s", command) + # execute command and send a response + if message.flags & MessageFlags.STRING.value or message.flags & MessageFlags.TEXT.value: + # shlex.split() handles quotes within the string + if message.flags & MessageFlags.LOCAL.value: + status, res = utils.cmd_output(command) + else: + status, res = node.cmd_output(command) + logger.info("done exec cmd=%s with status=%d res=(%d bytes)", command, status, len(res)) + if message.flags & MessageFlags.TEXT.value: + tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.RESULT.value, res) + if message.flags & MessageFlags.STRING.value: + tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.STATUS.value, status) + reply = coreapi.CoreExecMessage.pack(0, tlv_data) + return reply, + # execute the command with no response + else: + if message.flags & MessageFlags.LOCAL.value: + utils.mute_detach(command) + else: + node.cmd(command, wait=False) + except KeyError: + logger.exception("error getting object: %s", node_num) + # XXX wait and queue this message to try again later + # XXX maybe this should be done differently + if not message.flags & MessageFlags.LOCAL.value: + time.sleep(0.125) + self.queue_message(message) + + return () + + def handle_register_message(self, message): + """ + Register Message Handler + + :param coreapi.CoreRegMessage message: register message to handle + :return: reply messages + """ + replies = [] + + # execute a Python script or XML file + execute_server = message.get_tlv(RegisterTlvs.EXECUTE_SERVER.value) + if execute_server: + try: + logger.info("executing: %s", execute_server) + if message.flags & MessageFlags.STRING.value: + old_session_ids = set(self.server.get_session_ids()) + sys.argv = shlex.split(execute_server) + file_name = sys.argv[0] + + if os.path.splitext(file_name)[1].lower() == ".xml": + session = self.server.create_session() + try: + session.open_xml(file_name, start=True) + except: + session.shutdown() + self.server.remove_session(session) + raise + else: + thread = threading.Thread( + target=execfile, + args=(file_name, {"__file__": file_name, "server": self.server}) + ) + thread.daemon = True + thread.start() + # allow time for session creation + time.sleep(0.25) + + if message.flags & MessageFlags.STRING.value: + new_session_ids = set(self.server.get_session_ids()) + new_sid = new_session_ids.difference(old_session_ids) + try: + sid = new_sid.pop() + logger.info("executed: %s as session %d", execute_server, sid) + except KeyError: + logger.info("executed %s with unknown session ID", execute_server) + return replies + + logger.info("checking session %d for RUNTIME state" % sid) + session = self.server.get_session(session_id=sid) + retries = 10 + # wait for session to enter RUNTIME state, to prevent GUI from + # connecting while nodes are still being instantiated + while session.state != EventTypes.RUNTIME_STATE.value: + logger.info("waiting for session %d to enter RUNTIME state" % sid) + time.sleep(1) + retries -= 1 + if retries <= 0: + logger.info("session %d did not enter RUNTIME state" % sid) + return replies + + tlv_data = coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, execute_server) + tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.SESSION.value, "%s" % sid) + message = coreapi.CoreRegMessage.pack(0, tlv_data) + replies.append(message) + except Exception as e: + logger.exception("error executing: %s", execute_server) + tlv_data = coreapi.CoreExceptionTlv.pack(ExceptionTlvs.LEVEL.value, 2) + tlv_data += coreapi.CoreExceptionTlv.pack(ExceptionTlvs.TEXT.value, str(e)) + message = coreapi.CoreExceptionMessage.pack(0, tlv_data) + replies.append(message) + + return replies + + gui = message.get_tlv(RegisterTlvs.GUI.value) + if gui is None: + logger.info("ignoring Register message") + else: + # register capabilities with the GUI + self.master = True + + # TODO: need to replicate functionality? + # self.server.set_session_master(self) + # find the session containing this client and set the session to master + for session in self.server.sessions.itervalues(): + if self in session.broker.session_clients: + logger.info("setting session to master: %s", session.session_id) + session.master = True + break + + replies.append(self.register()) + replies.append(self.server.to_session_message()) + + return replies + + def handle_config_message(self, message): + """ + Configuration Message handler + + :param coreapi.CoreConfMessage message: configuration message to handle + :return: reply messages + """ + # convert config message to standard config data object + config_data = ConfigData( + node=message.get_tlv(ConfigTlvs.NODE.value), + object=message.get_tlv(ConfigTlvs.OBJECT.value), + type=message.get_tlv(ConfigTlvs.TYPE.value), + data_types=message.get_tlv(ConfigTlvs.DATA_TYPES.value), + data_values=message.get_tlv(ConfigTlvs.VALUES.value), + captions=message.get_tlv(ConfigTlvs.CAPTIONS.value), + bitmap=message.get_tlv(ConfigTlvs.BITMAP.value), + possible_values=message.get_tlv(ConfigTlvs.POSSIBLE_VALUES.value), + groups=message.get_tlv(ConfigTlvs.GROUPS.value), + session=message.get_tlv(ConfigTlvs.SESSION.value), + interface_number=message.get_tlv(ConfigTlvs.INTERFACE_NUMBER.value), + network_id=message.get_tlv(ConfigTlvs.NETWORK_ID.value), + opaque=message.get_tlv(ConfigTlvs.OPAQUE.value) + ) + logger.info("Configuration message for %s node %s", config_data.object, config_data.node) + + # dispatch to any registered callback for this object type + replies = self.session.config_object(config_data) + + for reply in replies: + self.handle_broadcast_config(reply) + + return [] + + def handle_file_message(self, message): + """ + File Message handler + + :param coreapi.CoreFileMessage message: file message to handle + :return: reply messages + """ + if message.flags & MessageFlags.ADD.value: + node_num = message.get_tlv(FileTlvs.NUMBER.value) + file_name = message.get_tlv(FileTlvs.NAME.value) + file_type = message.get_tlv(FileTlvs.TYPE.value) + source_name = message.get_tlv(FileTlvs.SOURCE_NAME.value) + data = message.get_tlv(FileTlvs.DATA.value) + compressed_data = message.get_tlv(FileTlvs.COMPRESSED_DATA.value) + + if compressed_data: + logger.warn("Compressed file data not implemented for File message.") + return () + + if source_name and data: + logger.warn("ignoring invalid File message: source and data TLVs are both present") + return () + + # some File Messages store custom files in services, + # prior to node creation + if file_type is not None: + if file_type.startswith("service:"): + _, service_name = file_type.split(':')[:2] + self.session.node_service_file(node_num, service_name, file_name, source_name, data) + return () + elif file_type.startswith("hook:"): + _, state = file_type.split(':')[:2] + if not state.isdigit(): + logger.error("error setting hook having state '%s'", state) + return () + state = int(state) + self.session.hook_add(state, file_name, source_name, data) + return () + + # writing a file to the host + if node_num is None: + if source_name is not None: + shutil.copy2(source_name, file_name) + else: + with open(file_name, "w") as open_file: + open_file.write(data) + return () + + self.session.node_add_file(node_num, source_name, file_name, data) + else: + raise NotImplementedError + + return () + + def handle_interface_message(self, message): + """ + Interface Message handler. + + :param message: interface message to handle + :return: reply messages + """ + logger.info("ignoring Interface message") + return () + + def handle_event_message(self, message): + """ + Event Message handler + + :param coreapi.CoreEventMessage message: event message to handle + :return: reply messages + """ + event_data = EventData( + node=message.get_tlv(EventTlvs.NODE.value), + event_type=message.get_tlv(EventTlvs.TYPE.value), + name=message.get_tlv(EventTlvs.NAME.value), + data=message.get_tlv(EventTlvs.DATA.value), + time=message.get_tlv(EventTlvs.TIME.value), + session=message.get_tlv(EventTlvs.SESSION.value) + ) + + event_type = event_data.event_type + if event_type is None: + raise NotImplementedError("Event message missing event type") + node_id = event_data.node + + logger.info("EVENT %d: %s at %s", event_type, EventTypes(event_type).name, time.ctime()) + if event_type <= EventTypes.SHUTDOWN_STATE.value: + if node_id is not None: + try: + node = self.session.get_object(node_id) + except KeyError: + raise KeyError("Event message for unknown node %d" % node_id) + + # configure mobility models for WLAN added during runtime + if event_type == EventTypes.INSTANTIATION_STATE.value and nodeutils.is_node(node, + NodeTypes.WIRELESS_LAN): + self.session.mobility.startup(node_ids=(node.objid,)) + return () + + logger.warn("dropping unhandled Event message with node number") + return () + self.session.set_state(state=event_type) + + if event_type == EventTypes.DEFINITION_STATE.value: + # clear all session objects in order to receive new definitions + self.session.clear() + elif event_type == EventTypes.INSTANTIATION_STATE.value: + if len(self.handler_threads) > 1: + # TODO: sync handler threads here before continuing + time.sleep(2.0) # XXX + # done receiving node/link configuration, ready to instantiate + self.session.instantiate() + + # after booting nodes attempt to send emulation id for nodes waiting on status + for obj in self.session.objects.itervalues(): + self.send_node_emulation_id(obj.objid) + elif event_type == EventTypes.RUNTIME_STATE.value: + if self.session.master: + logger.warn("Unexpected event message: RUNTIME state received at session master") + else: + # master event queue is started in session.checkruntime() + self.session.start_events() + elif event_type == EventTypes.DATACOLLECT_STATE.value: + self.session.data_collect() + elif event_type == EventTypes.SHUTDOWN_STATE.value: + if self.session.master: + logger.warn("Unexpected event message: SHUTDOWN state received at session master") + elif event_type in (EventTypes.START.value, EventTypes.STOP.value, + EventTypes.RESTART.value, + EventTypes.PAUSE.value, + EventTypes.RECONFIGURE.value): + handled = False + name = event_data.name + if name: + # TODO: register system for event message handlers, + # like confobjs + if name.startswith("service:"): + self.session.services_event(event_data) + handled = True + elif name.startswith("mobility:"): + self.session.mobility_event(event_data) + handled = True + if not handled: + logger.warn("Unhandled event message: event type %s (%s)", + event_type, coreapi.state_name(event_type)) + elif event_type == EventTypes.FILE_OPEN.value: + filename = event_data.name + self.session.open_xml(filename, start=False) + self.session.send_objects() + return () + elif event_type == EventTypes.FILE_SAVE.value: + filename = event_data.name + self.session.save_xml(filename, self.session.config["xmlfilever"]) + elif event_type == EventTypes.SCHEDULED.value: + etime = event_data.time + node = event_data.node + name = event_data.name + data = event_data.data + if etime is None: + logger.warn("Event message scheduled event missing start time") + return () + if message.flags & MessageFlags.ADD.value: + self.session.add_event(float(etime), node=node, name=name, data=data) + else: + raise NotImplementedError + else: + logger.warn("Unhandled event message: event type %d", event_type) + + return () + + def handle_session_message(self, message): + """ + Session Message handler + + :param coreapi.CoreSessionMessage message: session message to handle + :return: reply messages + """ + session_id_str = message.get_tlv(SessionTlvs.NUMBER.value) + session_ids = coreapi.str_to_list(session_id_str) + name_str = message.get_tlv(SessionTlvs.NAME.value) + names = coreapi.str_to_list(name_str) + file_str = message.get_tlv(SessionTlvs.FILE.value) + files = coreapi.str_to_list(file_str) + thumb = message.get_tlv(SessionTlvs.THUMB.value) + user = message.get_tlv(SessionTlvs.USER.value) + logger.info("SESSION message flags=0x%x sessions=%s" % (message.flags, session_id_str)) + + if message.flags == 0: + for index, session_id in enumerate(session_ids): + session_id = int(session_id) + if session_id == 0: + session = self.session + else: + session = self.server.get_session(session_id=session_id) + + if session is None: + logger.info("session %s not found", session_id) + continue + + logger.info("request to modify to session %s", session.session_id) + if names is not None: + session.name = names[index] + + if files is not None: + session.file_name = files[index] + + if thumb: + session.set_thumbnail(thumb) + + if user: + session.set_user(user) + elif message.flags & MessageFlags.STRING.value and not message.flags & MessageFlags.ADD.value: + # status request flag: send list of sessions + return self.server.to_session_message(), + else: + # handle ADD or DEL flags + for session_id in session_ids: + session_id = int(session_id) + session = self.server.get_session(session_id=session_id) + + if session is None: + logger.info("session %s not found (flags=0x%x)", session_id, message.flags) + continue + + if message.flags & MessageFlags.ADD.value: + # connect to the first session that exists + logger.info("request to connect to session %s" % session_id) + + # remove client from session broker and shutdown if needed + self.session.broker.session_clients.remove(self) + if not self.session.broker.session_clients and not self.session.is_active(): + self.session.shutdown() + + # set session to join + self.session = session + + # add client to session broker and set master if needed + if self.master: + self.session.master = True + self.session.broker.session_clients.append(self) + + # add broadcast handlers + logger.info("adding session broadcast handlers") + self.add_session_handlers() + + if user: + self.session.set_user(user) + + if message.flags & MessageFlags.STRING.value: + self.session.send_objects() + elif message.flags & MessageFlags.DELETE.value: + # shut down the specified session(s) + logger.info("request to terminate session %s" % session_id) + session.shutdown() + else: + logger.warn("unhandled session flags for session %s", session_id) + + return () + + def send_node_emulation_id(self, node_id): + """ + Node emulation id to send. + + :param int node_id: node id to send + :return: nothing + """ + if node_id in self.node_status_request: + tlv_data = "" + tlv_data += coreapi.CoreNodeTlv.pack(NodeTlvs.NUMBER.value, node_id) + tlv_data += coreapi.CoreNodeTlv.pack(NodeTlvs.EMULATION_ID.value, node_id) + reply = coreapi.CoreNodeMessage.pack(MessageFlags.ADD.value | MessageFlags.LOCAL.value, tlv_data) + + try: + self.sendall(reply) + except IOError: + logger.exception("error sending node emulation id message: %s", node_id) + + del self.node_status_request[node_id] diff --git a/daemon/core/future/futureserver.py b/daemon/core/future/futureserver.py new file mode 100644 index 00000000..64db9002 --- /dev/null +++ b/daemon/core/future/futureserver.py @@ -0,0 +1,263 @@ +""" +Defines server classes and request handlers for TCP and UDP. +""" + +import SocketServer +import threading +import time + +from core import logger +from core.api import coreapi +from core.enumerations import EventTypes +from core.enumerations import SessionTlvs +from core.future.coreemu import FutureSession + + +class FutureServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): + """ + TCP server class, manages sessions and spawns request handlers for + incoming connections. + """ + daemon_threads = True + allow_reuse_address = True + servers = set() + + def __init__(self, server_address, handler_class, config=None): + """ + Server class initialization takes configuration data and calls + the SocketServer constructor + + :param tuple[str, int] server_address: server host and port to use + :param class handler_class: request handler + :param dict config: configuration setting + :return: + """ + self.config = config + self.sessions = {} + self.udpserver = None + self.udpthread = None + self._sessions_lock = threading.Lock() + FutureServer.add_server(self) + SocketServer.TCPServer.__init__(self, server_address, handler_class) + + @classmethod + def add_server(cls, server): + """ + Add a core server to the known servers set. + + :param CoreServer server: server to add + :return: nothing + """ + cls.servers.add(server) + + @classmethod + def remove_server(cls, server): + """ + Remove a core server from the known servers set. + + :param CoreServer server: server to remove + :return: nothing + """ + if server in cls.servers: + cls.servers.remove(server) + + def shutdown(self): + """ + Shutdown the server, all known sessions, and remove server from known servers set. + + :return: nothing + """ + # shutdown all known sessions + for session in self.sessions.values(): + session.shutdown() + + # remove server from server list + FutureServer.remove_server(self) + + def add_session(self, session): + """ + Add a session to our dictionary of sessions, ensuring a unique session number. + + :param core.session.Session session: session to add + :return: added session + :raise KeyError: when a session with the same id already exists + """ + with self._sessions_lock: + if session.session_id in self.sessions: + raise KeyError("non-unique session id %s for %s" % (session.session_id, session)) + self.sessions[session.session_id] = session + + return session + + def remove_session(self, session): + """ + Remove a session from our dictionary of sessions. + + :param core.session.Session session: session to remove + :return: removed session + :rtype: core.session.Session + """ + with self._sessions_lock: + if session.session_id not in self.sessions: + logger.info("session id %s not found (sessions=%s)", session.session_id, self.sessions.keys()) + else: + del self.sessions[session.session_id] + + return session + + def get_session_ids(self): + """ + Return a list of active session numbers. + + :return: known session ids + :rtype: list + """ + with self._sessions_lock: + session_ids = self.sessions.keys() + + return session_ids + + def create_session(self, session_id=None): + """ + Convenience method for creating sessions with the servers config. + + :param int session_id: session id for new session + :return: create session + :rtype: FutureSession + """ + + # create random id when necessary, seems to be 1 case wanted, based on legacy code + # creating a value so high, typical client side generation schemes hopefully wont collide + if not session_id: + session_id = next( + session_id for session_id in xrange(60000, 65000) + if session_id not in self.sessions + ) + + # create and add session to local manager + session = FutureSession(session_id, config=self.config) + self.add_session(session) + + # add shutdown handler to remove session from manager + session.shutdown_handlers.append(self.session_shutdown) + + return session + + def get_session(self, session_id=None): + """ + Create a new session or retrieve an existing one from our + dictionary of sessions. When the session_id=0 and the use_existing + flag is set, return on of the existing sessions. + + :param int session_id: session id of session to retrieve, defaults to returning random session + :return: session + :rtype: core.session.Session + """ + + with self._sessions_lock: + # return specified session or none + if session_id: + return self.sessions.get(session_id) + + # retrieving known session + session = None + + # find runtime session with highest node count + for known_session in filter(lambda x: x.state == EventTypes.RUNTIME_STATE.value, + self.sessions.itervalues()): + if not session or known_session.get_node_count() > session.get_node_count(): + session = known_session + + # return first known session otherwise + if not session: + for known_session in self.sessions.itervalues(): + session = known_session + break + + return session + + def session_shutdown(self, session): + """ + Handler method to be used as a callback when a session has shutdown. + + :param core.session.Session session: session shutting down + :return: nothing + """ + self.remove_session(session) + + def to_session_message(self, flags=0): + """ + Build CORE API Sessions message based on current session info. + + :param int flags: message flags + :return: session message + """ + id_list = [] + name_list = [] + file_list = [] + node_count_list = [] + date_list = [] + thumb_list = [] + num_sessions = 0 + + with self._sessions_lock: + for session_id in self.sessions: + session = self.sessions[session_id] + # debug: session.dumpsession() + num_sessions += 1 + id_list.append(str(session_id)) + + name = session.name + if not name: + name = "" + name_list.append(name) + + file = session.file_name + if not file: + file = "" + file_list.append(file) + + node_count_list.append(str(session.get_node_count())) + + date_list.append(time.ctime(session._state_time)) + + thumb = session.thumbnail + if not thumb: + thumb = "" + thumb_list.append(thumb) + + session_ids = "|".join(id_list) + names = "|".join(name_list) + files = "|".join(file_list) + node_counts = "|".join(node_count_list) + dates = "|".join(date_list) + thumbs = "|".join(thumb_list) + + if num_sessions > 0: + tlv_data = "" + if len(session_ids) > 0: + tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NUMBER.value, session_ids) + if len(names) > 0: + tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NAME.value, names) + if len(files) > 0: + tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.FILE.value, files) + if len(node_counts) > 0: + tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NODE_COUNT.value, node_counts) + if len(dates) > 0: + tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.DATE.value, dates) + if len(thumbs) > 0: + tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.THUMB.value, thumbs) + message = coreapi.CoreSessionMessage.pack(flags, tlv_data) + else: + message = None + + return message + + def dump_sessions(self): + """ + Log currently known session information. + """ + logger.info("sessions:") + with self._sessions_lock: + for session_id in self.sessions: + logger.info(session_id) diff --git a/daemon/scripts/core-future b/daemon/scripts/core-future new file mode 100644 index 00000000..6afbaa1e --- /dev/null +++ b/daemon/scripts/core-future @@ -0,0 +1,180 @@ +#!/usr/bin/env python +""" +core-daemon: the CORE daemon is a server process that receives CORE API +messages and instantiates emulated nodes and networks within the kernel. Various +message handlers are defined and some support for sending messages. +""" + +import ConfigParser +import atexit +import optparse +import signal +import sys +import time + +from core import constants +from core import coreserver +from core import enumerations +from core import logger +from core import services +from core.future.futurehandler import FutureHandler +from core.future.futureserver import FutureServer +from core.misc import nodeutils +from core.misc.utils import close_onexec +from core.service import ServiceManager + + +def banner(): + """ + Output the program banner printed to the terminal or log file. + + :return: nothing + """ + logger.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime()) + + +def cored(cfg=None): + """ + Start the CoreServer object and enter the server loop. + + :param dict cfg: core configuration + :return: nothing + """ + host = cfg["listenaddr"] + port = int(cfg["port"]) + if host == "" or host is None: + host = "localhost" + + try: + server = FutureServer((host, port), FutureHandler, cfg) + except: + logger.exception("error starting main server on: %s:%s", host, port) + sys.exit(1) + + close_onexec(server.fileno()) + logger.info("main server started, listening on: %s:%s", host, port) + server.serve_forever() + + +# TODO: should sessions and the main core daemon both catch exit to shutdown independently? +def cleanup(): + """ + Runs server shutdown and cleanup when catching an exit signal. + + :return: nothing + """ + while coreserver.CoreServer.servers: + server = coreserver.CoreServer.servers.pop() + server.shutdown() + + +def sighandler(signum, stackframe): + """ + Signal handler when different signals are sent. + + :param int signum: singal number sent + :param stackframe: stack frame sent + :return: nothing + """ + logger.error("terminated by signal: %s", signum) + sys.exit(signum) + + +signal.signal(signal.SIGHUP, sighandler) +signal.signal(signal.SIGINT, sighandler) +signal.signal(signal.SIGTERM, sighandler) +signal.signal(signal.SIGUSR1, sighandler) +signal.signal(signal.SIGUSR2, sighandler) +atexit.register(cleanup) + + +def get_merged_config(filename): + """ + Return a configuration after merging config file and command-line arguments. + + :param str filename: file name to merge configuration settings with + :return: merged configuration + :rtype: dict + """ + # these are the defaults used in the config file + defaults = { + "port": "%d" % enumerations.CORE_API_PORT, + "listenaddr": "localhost", + "xmlfilever": "1.0", + "numthreads": "1", + } + + usagestr = "usage: %prog [-h] [options] [args]\n\n" + \ + "CORE daemon v.%s instantiates Linux network namespace " \ + "nodes." % constants.COREDPY_VERSION + parser = optparse.OptionParser(usage=usagestr) + parser.add_option("-f", "--configfile", dest="configfile", type="string", + help="read config from specified file; default = %s" % filename) + parser.add_option("-p", "--port", dest="port", type=int, + help="port number to listen on; default = %s" % defaults["port"]) + parser.add_option("-t", "--numthreads", dest="numthreads", type=int, + help="number of server threads; default = %s" % defaults["numthreads"]) + + # parse command line options + options, args = parser.parse_args() + + # read the config file + if options.configfile is not None: + filename = options.configfile + del options.configfile + cfg = ConfigParser.SafeConfigParser(defaults) + cfg.read(filename) + + section = "core-daemon" + if not cfg.has_section(section): + cfg.add_section(section) + + # merge command line with config file + for opt in options.__dict__: + val = options.__dict__[opt] + if val is not None: + cfg.set(section, opt, val.__str__()) + + return dict(cfg.items(section)), args + + +def main(): + """ + Main program startup. + + :return: nothing + """ + # get a configuration merged from config file and command-line arguments + cfg, args = get_merged_config("%s/core.conf" % constants.CORE_CONF_DIR) + for a in args: + logger.error("ignoring command line argument: %s", a) + + # attempt load custom services + service_paths = cfg.get("custom_services_dir") + logger.debug("custom service paths: %s", service_paths) + if service_paths: + for service_path in service_paths.split(','): + service_path = service_path.strip() + ServiceManager.add_services(service_path) + + banner() + + try: + cored(cfg) + except KeyboardInterrupt: + logger.info("keyboard interrupt, stopping core daemon") + + sys.exit(0) + + +if __name__ == "__main__": + # configure nodes to use + if len(sys.argv) == 2 and sys.argv[1] == "ovs": + from core.netns.openvswitch import OVS_NODES + + nodeutils.update_node_map(OVS_NODES) + + # load default services + services.load() + + main()