""" socket server request handlers leveraged by core servers. """ import Queue import SocketServer import os import shlex import shutil import sys import threading import time from core import logger from core.api import coreapi from core.data import ConfigData from core.data import EventData from core.emulator.emudata import InterfaceData from core.emulator.emudata import LinkOptions from core.emulator.emudata import NodeOptions from core.enumerations import ConfigTlvs from core.enumerations import EventTlvs from core.enumerations import EventTypes from core.enumerations import ExceptionTlvs from core.enumerations import ExecuteTlvs from core.enumerations import FileTlvs from core.enumerations import LinkTlvs from core.enumerations import LinkTypes from core.enumerations import MessageFlags from core.enumerations import MessageTypes from core.enumerations import NodeTlvs from core.enumerations import NodeTypes from core.enumerations import RegisterTlvs from core.enumerations import SessionTlvs from core.misc import nodeutils from core.misc import structutils from core.misc import utils class CoreHandler(SocketServer.BaseRequestHandler): """ The SocketServer class uses the RequestHandler class for servicing requests. """ def __init__(self, request, client_address, server): """ Create a CoreRequestHandler instance. :param request: request object :param str client_address: client address :param CoreServer server: core server instance :return: """ self.done = False self.message_handlers = { MessageTypes.NODE.value: self.handle_node_message, MessageTypes.LINK.value: self.handle_link_message, MessageTypes.EXECUTE.value: self.handle_execute_message, MessageTypes.REGISTER.value: self.handle_register_message, MessageTypes.CONFIG.value: self.handle_config_message, MessageTypes.FILE.value: self.handle_file_message, MessageTypes.INTERFACE.value: self.handle_interface_message, MessageTypes.EVENT.value: self.handle_event_message, MessageTypes.SESSION.value: self.handle_session_message, } self.message_queue = Queue.Queue() self.node_status_request = {} self._shutdown_lock = threading.Lock() self._sessions_lock = threading.Lock() self.handler_threads = [] num_threads = int(server.config["numthreads"]) if num_threads < 1: raise ValueError("invalid number of threads: %s" % num_threads) logger.debug("launching core server handler threads: %s", num_threads) for _ in xrange(num_threads): thread = threading.Thread(target=self.handler_thread) self.handler_threads.append(thread) thread.start() self.master = False self.session = None # core emulator self.coreemu = server.coreemu utils.close_onexec(request.fileno()) SocketServer.BaseRequestHandler.__init__(self, request, client_address, server) def setup(self): """ Client has connected, set up a new connection. :return: nothing """ logger.debug("new TCP connection: %s", self.client_address) def finish(self): """ Client has disconnected, end this request handler and disconnect from the session. Shutdown sessions that are not running. :return: nothing """ logger.debug("finishing request handler") logger.debug("remaining message queue size: %s", self.message_queue.qsize()) # give some time for message queue to deplete timeout = 10 wait = 0 while not self.message_queue.empty(): logger.debug("waiting for message queue to empty: %s seconds", wait) time.sleep(1) wait += 1 if wait == timeout: logger.warn("queue failed to be empty, finishing request handler") break logger.info("client disconnected: notifying threads") self.done = True for thread in self.handler_threads: logger.info("waiting for thread: %s", thread.getName()) thread.join(timeout) if thread.isAlive(): logger.warn("joining %s failed: still alive after %s sec", thread.getName(), timeout) logger.info("connection closed: %s", self.client_address) if self.session: # remove client from session broker and shutdown if there are no clients self.remove_session_handlers() self.session.broker.session_clients.remove(self) if not self.session.broker.session_clients and not self.session.is_active(): logger.info("no session clients left and not active, initiating shutdown") self.coreemu.delete_session(self.session.session_id) return SocketServer.BaseRequestHandler.finish(self) def session_message(self, flags=0): """ Build CORE API Sessions message based on current session info. :param int flags: message flags :return: session message """ id_list = [] name_list = [] file_list = [] node_count_list = [] date_list = [] thumb_list = [] num_sessions = 0 with self._sessions_lock: for session_id, session in self.coreemu.sessions.iteritems(): num_sessions += 1 id_list.append(str(session_id)) name = session.name if not name: name = "" name_list.append(name) file = session.file_name if not file: file = "" file_list.append(file) node_count_list.append(str(session.get_node_count())) date_list.append(time.ctime(session._state_time)) thumb = session.thumbnail if not thumb: thumb = "" thumb_list.append(thumb) session_ids = "|".join(id_list) names = "|".join(name_list) files = "|".join(file_list) node_counts = "|".join(node_count_list) dates = "|".join(date_list) thumbs = "|".join(thumb_list) if num_sessions > 0: tlv_data = "" if len(session_ids) > 0: tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NUMBER.value, session_ids) if len(names) > 0: tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NAME.value, names) if len(files) > 0: tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.FILE.value, files) if len(node_counts) > 0: tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.NODE_COUNT.value, node_counts) if len(dates) > 0: tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.DATE.value, dates) if len(thumbs) > 0: tlv_data += coreapi.CoreSessionTlv.pack(SessionTlvs.THUMB.value, thumbs) message = coreapi.CoreSessionMessage.pack(flags, tlv_data) else: message = None return message def handle_broadcast_event(self, event_data): """ Callback to handle an event broadcast out from a session. :param core.data.EventData event_data: event data to handle :return: nothing """ logger.debug("handling broadcast event: %s", event_data) tlv_data = structutils.pack_values(coreapi.CoreEventTlv, [ (EventTlvs.NODE, event_data.node), (EventTlvs.TYPE, event_data.event_type), (EventTlvs.NAME, event_data.name), (EventTlvs.DATA, event_data.data), (EventTlvs.TIME, event_data.time), (EventTlvs.TIME, event_data.session) ]) message = coreapi.CoreEventMessage.pack(0, tlv_data) try: self.sendall(message) except IOError: logger.exception("error sending event message") def handle_broadcast_file(self, file_data): """ Callback to handle a file broadcast out from a session. :param core.data.FileData file_data: file data to handle :return: nothing """ logger.debug("handling broadcast file: %s", file_data) tlv_data = structutils.pack_values(coreapi.CoreFileTlv, [ (FileTlvs.NODE, file_data.node), (FileTlvs.NAME, file_data.name), (FileTlvs.MODE, file_data.mode), (FileTlvs.NUMBER, file_data.number), (FileTlvs.TYPE, file_data.type), (FileTlvs.SOURCE_NAME, file_data.source), (FileTlvs.SESSION, file_data.session), (FileTlvs.DATA, file_data.data), (FileTlvs.COMPRESSED_DATA, file_data.compressed_data), ]) message = coreapi.CoreFileMessage.pack(file_data.message_type, tlv_data) try: self.sendall(message) except IOError: logger.exception("error sending file message") def handle_broadcast_config(self, config_data): """ Callback to handle a config broadcast out from a session. :param core.data.ConfigData config_data: config data to handle :return: nothing """ logger.debug("handling broadcast config: %s", config_data) tlv_data = structutils.pack_values(coreapi.CoreConfigTlv, [ (ConfigTlvs.NODE, config_data.node), (ConfigTlvs.OBJECT, config_data.object), (ConfigTlvs.TYPE, config_data.type), (ConfigTlvs.DATA_TYPES, config_data.data_types), (ConfigTlvs.VALUES, config_data.data_values), (ConfigTlvs.CAPTIONS, config_data.captions), (ConfigTlvs.BITMAP, config_data.bitmap), (ConfigTlvs.POSSIBLE_VALUES, config_data.possible_values), (ConfigTlvs.GROUPS, config_data.groups), (ConfigTlvs.SESSION, config_data.session), (ConfigTlvs.INTERFACE_NUMBER, config_data.interface_number), (ConfigTlvs.NETWORK_ID, config_data.network_id), (ConfigTlvs.OPAQUE, config_data.opaque), ]) message = coreapi.CoreConfMessage.pack(config_data.message_type, tlv_data) try: self.sendall(message) except IOError: logger.exception("error sending config message") def handle_broadcast_exception(self, exception_data): """ Callback to handle an exception broadcast out from a session. :param core.data.ExceptionData exception_data: exception data to handle :return: nothing """ logger.debug("handling broadcast exception: %s", exception_data) tlv_data = structutils.pack_values(coreapi.CoreExceptionTlv, [ (ExceptionTlvs.NODE, exception_data.node), (ExceptionTlvs.SESSION, exception_data.session), (ExceptionTlvs.LEVEL, exception_data.level), (ExceptionTlvs.SOURCE, exception_data.source), (ExceptionTlvs.DATE, exception_data.date), (ExceptionTlvs.TEXT, exception_data.text) ]) message = coreapi.CoreExceptionMessage.pack(0, tlv_data) try: self.sendall(message) except IOError: logger.exception("error sending exception message") def handle_broadcast_node(self, node_data): """ Callback to handle an node broadcast out from a session. :param core.data.NodeData node_data: node data to handle :return: nothing """ logger.debug("handling broadcast node: %s", node_data) tlv_data = structutils.pack_values(coreapi.CoreNodeTlv, [ (NodeTlvs.NUMBER, node_data.id), (NodeTlvs.TYPE, node_data.node_type), (NodeTlvs.NAME, node_data.name), (NodeTlvs.IP_ADDRESS, node_data.ip_address), (NodeTlvs.MAC_ADDRESS, node_data.mac_address), (NodeTlvs.IP6_ADDRESS, node_data.ip6_address), (NodeTlvs.MODEL, node_data.model), (NodeTlvs.EMULATION_ID, node_data.emulation_id), (NodeTlvs.EMULATION_SERVER, node_data.emulation_server), (NodeTlvs.SESSION, node_data.session), (NodeTlvs.X_POSITION, node_data.x_position), (NodeTlvs.Y_POSITION, node_data.y_position), (NodeTlvs.CANVAS, node_data.canvas), (NodeTlvs.NETWORK_ID, node_data.network_id), (NodeTlvs.SERVICES, node_data.services), (NodeTlvs.LATITUDE, node_data.latitude), (NodeTlvs.LONGITUDE, node_data.longitude), (NodeTlvs.ALTITUDE, node_data.altitude), (NodeTlvs.ICON, node_data.icon), (NodeTlvs.OPAQUE, node_data.opaque) ]) message = coreapi.CoreNodeMessage.pack(node_data.message_type, tlv_data) try: self.sendall(message) except IOError: logger.exception("error sending node message") def handle_broadcast_link(self, link_data): """ Callback to handle an link broadcast out from a session. :param core.data.LinkData link_data: link data to handle :return: nothing """ logger.debug("handling broadcast link: %s", link_data) tlv_data = structutils.pack_values(coreapi.CoreLinkTlv, [ (LinkTlvs.N1_NUMBER, link_data.node1_id), (LinkTlvs.N2_NUMBER, link_data.node2_id), (LinkTlvs.DELAY, link_data.delay), (LinkTlvs.BANDWIDTH, link_data.bandwidth), (LinkTlvs.PER, link_data.per), (LinkTlvs.DUP, link_data.dup), (LinkTlvs.JITTER, link_data.jitter), (LinkTlvs.MER, link_data.mer), (LinkTlvs.BURST, link_data.burst), (LinkTlvs.SESSION, link_data.session), (LinkTlvs.MBURST, link_data.mburst), (LinkTlvs.TYPE, link_data.link_type), (LinkTlvs.GUI_ATTRIBUTES, link_data.gui_attributes), (LinkTlvs.UNIDIRECTIONAL, link_data.unidirectional), (LinkTlvs.EMULATION_ID, link_data.emulation_id), (LinkTlvs.NETWORK_ID, link_data.network_id), (LinkTlvs.KEY, link_data.key), (LinkTlvs.INTERFACE1_NUMBER, link_data.interface1_id), (LinkTlvs.INTERFACE1_NAME, link_data.interface1_name), (LinkTlvs.INTERFACE1_IP4, link_data.interface1_ip4), (LinkTlvs.INTERFACE1_IP4_MASK, link_data.interface1_ip4_mask), (LinkTlvs.INTERFACE1_MAC, link_data.interface1_mac), (LinkTlvs.INTERFACE1_IP6, link_data.interface1_ip6), (LinkTlvs.INTERFACE1_IP6_MASK, link_data.interface1_ip6_mask), (LinkTlvs.INTERFACE2_NUMBER, link_data.interface2_id), (LinkTlvs.INTERFACE2_NAME, link_data.interface2_name), (LinkTlvs.INTERFACE2_IP4, link_data.interface2_ip4), (LinkTlvs.INTERFACE2_IP4_MASK, link_data.interface2_ip4_mask), (LinkTlvs.INTERFACE2_MAC, link_data.interface2_mac), (LinkTlvs.INTERFACE2_IP6, link_data.interface2_ip6), (LinkTlvs.INTERFACE2_IP6_MASK, link_data.interface2_ip6_mask), (LinkTlvs.OPAQUE, link_data.opaque) ]) message = coreapi.CoreLinkMessage.pack(link_data.message_type, tlv_data) try: self.sendall(message) except IOError: logger.exception("error sending Event Message") def register(self): """ Return a Register Message :return: register message data """ logger.info("GUI has connected to session %d at %s", self.session.session_id, time.ctime()) tlv_data = "" tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, "core-daemon") tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.EMULATION_SERVER.value, "core-daemon") # get config objects for session for name in self.session.config_objects: config_type, callback = self.session.config_objects[name] # type must be in coreapi.reg_tlvs tlv_data += coreapi.CoreRegisterTlv.pack(config_type, name) return coreapi.CoreRegMessage.pack(MessageFlags.ADD.value, tlv_data) def sendall(self, data): """ Send raw data to the other end of this TCP connection using socket"s sendall(). :param data: data to send over request socket :return: data sent """ return self.request.sendall(data) def receive_message(self): """ Receive data and return a CORE API message object. :return: received message :rtype: coreapi.CoreMessage """ try: header = self.request.recv(coreapi.CoreMessage.header_len) except IOError as e: raise IOError("error receiving header (%s)" % e) if len(header) != coreapi.CoreMessage.header_len: if len(header) == 0: raise EOFError("client disconnected") else: raise IOError("invalid message header size") message_type, message_flags, message_len = coreapi.CoreMessage.unpack_header(header) if message_len == 0: logger.warn("received message with no data") data = "" while len(data) < message_len: data += self.request.recv(message_len - len(data)) if len(data) > message_len: error_message = "received message length does not match received data (%s != %s)" % ( len(data), message_len) logger.error(error_message) raise IOError(error_message) try: message_class = coreapi.CLASS_MAP[message_type] message = message_class(message_flags, header, data) except KeyError: message = coreapi.CoreMessage(message_flags, header, data) message.message_type = message_type logger.exception("unimplemented core message type: %s", message.type_str()) return message def queue_message(self, message): """ Queue an API message for later processing. :param message: message to queue :return: nothing """ logger.debug("queueing msg (queuedtimes = %s): type %s", message.queuedtimes, MessageTypes( message.message_type)) self.message_queue.put(message) def handler_thread(self): """ CORE API message handling loop that is spawned for each server thread; get CORE API messages from the incoming message queue, and call handlemsg() for processing. :return: nothing """ while not self.done: try: message = self.message_queue.get(timeout=1) self.handle_message(message) except Queue.Empty: pass def handle_message(self, message): """ Handle an incoming message; dispatch based on message type, optionally sending replies. :param message: message to handle :return: nothing """ if self.session and self.session.broker.handle_message(message): logger.debug("message not being handled locally") return logger.debug("%s handling message:\n%s", threading.currentThread().getName(), message) if message.message_type not in self.message_handlers: logger.error("no handler for message type: %s", message.type_str()) return message_handler = self.message_handlers[message.message_type] try: # TODO: this needs to be removed, make use of the broadcast message methods replies = message_handler(message) self.dispatch_replies(replies, message) except: logger.exception("%s: exception while handling message: %s", threading.currentThread().getName(), message) def dispatch_replies(self, replies, message): """ Dispatch replies by CORE to message msg previously received from the client. :param list replies: reply messages to dispatch :param message: message for replies :return: nothing """ logger.debug("dispatching replies") for reply in replies: message_type, message_flags, message_length = coreapi.CoreMessage.unpack_header(reply) try: reply_message = coreapi.CLASS_MAP[message_type]( message_flags, reply[:coreapi.CoreMessage.header_len], reply[coreapi.CoreMessage.header_len:] ) except KeyError: # multiple TLVs of same type cause KeyError exception reply_message = "CoreMessage (type %d flags %d length %d)" % ( message_type, message_flags, message_length) logger.debug("dispatch reply:\n%s", reply_message) try: self.sendall(reply) except IOError: logger.exception("error dispatching reply") def handle(self): """ Handle a new connection request from a client. Dispatch to the recvmsg() method for receiving data into CORE API messages, and add them to an incoming message queue. :return: nothing """ # use port as session id port = self.request.getpeername()[1] # TODO: add shutdown handler for session self.session = self.coreemu.create_session(port, master=False) # self.session.shutdown_handlers.append(self.session_shutdown) logger.debug("created new session for client: %s", self.session.session_id) # TODO: hack to associate this handler with this sessions broker for broadcasting # TODO: broker needs to be pulled out of session to the server/handler level if self.master: logger.debug("session set to master") self.session.master = True self.session.broker.session_clients.append(self) # add handlers for various data self.add_session_handlers() # set initial session state self.session.set_state(EventTypes.DEFINITION_STATE) while True: try: message = self.receive_message() except EOFError: logger.info("client disconnected") break except IOError: logger.exception("error receiving message") break message.queuedtimes = 0 self.queue_message(message) # delay is required for brief connections, allow session joining if message.message_type == MessageTypes.SESSION.value: time.sleep(0.125) # broadcast node/link messages to other connected clients if message.message_type not in [MessageTypes.NODE.value, MessageTypes.LINK.value]: continue for client in self.session.broker.session_clients: if client == self: continue logger.debug("BROADCAST TO OTHER CLIENT: %s", client) client.sendall(message.raw_message) def add_session_handlers(self): logger.debug("adding session broadcast handlers") self.session.event_handlers.append(self.handle_broadcast_event) self.session.exception_handlers.append(self.handle_broadcast_exception) self.session.node_handlers.append(self.handle_broadcast_node) self.session.link_handlers.append(self.handle_broadcast_link) self.session.file_handlers.append(self.handle_broadcast_file) self.session.config_handlers.append(self.handle_broadcast_config) def remove_session_handlers(self): logger.debug("removing session broadcast handlers") self.session.event_handlers.remove(self.handle_broadcast_event) self.session.exception_handlers.remove(self.handle_broadcast_exception) self.session.node_handlers.remove(self.handle_broadcast_node) self.session.link_handlers.remove(self.handle_broadcast_link) self.session.file_handlers.remove(self.handle_broadcast_file) self.session.config_handlers.remove(self.handle_broadcast_config) def handle_node_message(self, message): """ Node Message handler :param coreapi.CoreNodeMessage message: node message :return: replies to node message """ replies = [] if message.flags & MessageFlags.ADD.value and message.flags & MessageFlags.DELETE.value: logger.warn("ignoring invalid message: add and delete flag both set") return () node_type = None node_type_value = message.get_tlv(NodeTlvs.TYPE.value) if node_type_value is not None: node_type = NodeTypes(node_type_value) node_id = message.get_tlv(NodeTlvs.NUMBER.value) node_options = NodeOptions( name=message.get_tlv(NodeTlvs.NAME.value), model=message.get_tlv(NodeTlvs.MODEL.value) ) node_options.set_position( x=message.get_tlv(NodeTlvs.X_POSITION.value), y=message.get_tlv(NodeTlvs.Y_POSITION.value) ) node_options.set_location( lat=message.get_tlv(NodeTlvs.LATITUDE.value), lon=message.get_tlv(NodeTlvs.LONGITUDE.value), alt=message.get_tlv(NodeTlvs.ALTITUDE.value) ) node_options.icon = message.get_tlv(NodeTlvs.ICON.value) node_options.canvas = message.get_tlv(NodeTlvs.CANVAS.value) node_options.opaque = message.get_tlv(NodeTlvs.OPAQUE.value) services = message.get_tlv(NodeTlvs.SERVICES.value) if services: node_options.services = services.split("|") if message.flags & MessageFlags.ADD.value: node = self.session.add_node(node_type, node_id, node_options) if node: if message.flags & MessageFlags.STRING.value: self.node_status_request[node.objid] = True if self.session.state == EventTypes.RUNTIME_STATE.value: self.send_node_emulation_id(node.objid) elif message.flags & MessageFlags.DELETE.value: with self._shutdown_lock: result = self.session.delete_node(node_id) # if we deleted a node broadcast out its removal if result and message.flags & MessageFlags.STRING.value: tlvdata = "" tlvdata += coreapi.CoreNodeTlv.pack(NodeTlvs.NUMBER.value, node_id) flags = MessageFlags.DELETE.value | MessageFlags.LOCAL.value replies.append(coreapi.CoreNodeMessage.pack(flags, tlvdata)) # node update else: self.session.update_node(node_id, node_options) return replies def handle_link_message(self, message): """ Link Message handler :param coreapi.CoreLinkMessage message: link message to handle :return: link message replies """ node_one_id = message.get_tlv(LinkTlvs.N1_NUMBER.value) node_two_id = message.get_tlv(LinkTlvs.N2_NUMBER.value) interface_one = InterfaceData( _id=message.get_tlv(LinkTlvs.INTERFACE1_NUMBER.value), name=message.get_tlv(LinkTlvs.INTERFACE1_NAME.value), mac=message.get_tlv(LinkTlvs.INTERFACE1_MAC.value), ip4=message.get_tlv(LinkTlvs.INTERFACE1_IP4.value), ip4_mask=message.get_tlv(LinkTlvs.INTERFACE1_IP4_MASK.value), ip6=message.get_tlv(LinkTlvs.INTERFACE1_IP6.value), ip6_mask=message.get_tlv(LinkTlvs.INTERFACE1_IP6_MASK.value), ) interface_two = InterfaceData( _id=message.get_tlv(LinkTlvs.INTERFACE2_NUMBER.value), name=message.get_tlv(LinkTlvs.INTERFACE2_NAME.value), mac=message.get_tlv(LinkTlvs.INTERFACE2_MAC.value), ip4=message.get_tlv(LinkTlvs.INTERFACE2_IP4.value), ip4_mask=message.get_tlv(LinkTlvs.INTERFACE2_IP4_MASK.value), ip6=message.get_tlv(LinkTlvs.INTERFACE2_IP6.value), ip6_mask=message.get_tlv(LinkTlvs.INTERFACE1_IP6_MASK.value), ) link_type = None link_type_value = message.get_tlv(LinkTlvs.TYPE.value) if link_type_value is not None: link_type = LinkTypes(link_type_value) link_options = LinkOptions(_type=link_type) link_options.delay = message.get_tlv(LinkTlvs.DELAY.value) link_options.bandwidth = message.get_tlv(LinkTlvs.BANDWIDTH.value) link_options.session = message.get_tlv(LinkTlvs.SESSION.value) link_options.per = message.get_tlv(LinkTlvs.PER.value) link_options.dup = message.get_tlv(LinkTlvs.DUP.value) link_options.jitter = message.get_tlv(LinkTlvs.JITTER.value) link_options.mer = message.get_tlv(LinkTlvs.MER.value) link_options.burst = message.get_tlv(LinkTlvs.BURST.value) link_options.mburst = message.get_tlv(LinkTlvs.MBURST.value) link_options.gui_attributes = message.get_tlv(LinkTlvs.GUI_ATTRIBUTES.value) link_options.unidirectional = message.get_tlv(LinkTlvs.UNIDIRECTIONAL.value) link_options.emulation_id = message.get_tlv(LinkTlvs.EMULATION_ID.value) link_options.network_id = message.get_tlv(LinkTlvs.NETWORK_ID.value) link_options.key = message.get_tlv(LinkTlvs.KEY.value) link_options.opaque = message.get_tlv(LinkTlvs.OPAQUE.value) if message.flags & MessageFlags.ADD.value: self.session.add_link(node_one_id, node_two_id, interface_one, interface_two, link_options) elif message.flags & MessageFlags.DELETE.value: self.session.delete_link(node_one_id, node_two_id, interface_one.id, interface_two.id) else: self.session.update_link(node_one_id, node_two_id, interface_one.id, interface_two.id, link_options) return () def handle_execute_message(self, message): """ Execute Message handler :param coreapi.CoreExecMessage message: execute message to handle :return: reply messages """ node_num = message.get_tlv(ExecuteTlvs.NODE.value) execute_num = message.get_tlv(ExecuteTlvs.NUMBER.value) execute_time = message.get_tlv(ExecuteTlvs.TIME.value) command = message.get_tlv(ExecuteTlvs.COMMAND.value) # local flag indicates command executed locally, not on a node if node_num is None and not message.flags & MessageFlags.LOCAL.value: raise ValueError("Execute Message is missing node number.") if execute_num is None: raise ValueError("Execute Message is missing execution number.") if execute_time is not None: self.session.add_event(execute_time, node=node_num, name=None, data=command) return () try: node = self.session.get_object(node_num) # build common TLV items for reply tlv_data = "" if node_num is not None: tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.NODE.value, node_num) tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.NUMBER.value, execute_num) tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.COMMAND.value, command) if message.flags & MessageFlags.TTY.value: if node_num is None: raise NotImplementedError # echo back exec message with cmd for spawning interactive terminal if command == "bash": command = "/bin/bash" res = node.termcmdstring(command) tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.RESULT.value, res) reply = coreapi.CoreExecMessage.pack(MessageFlags.TTY.value, tlv_data) return reply, else: logger.info("execute message with cmd=%s", command) # execute command and send a response if message.flags & MessageFlags.STRING.value or message.flags & MessageFlags.TEXT.value: # shlex.split() handles quotes within the string if message.flags & MessageFlags.LOCAL.value: status, res = utils.cmd_output(command) else: status, res = node.cmd_output(command) logger.info("done exec cmd=%s with status=%d res=(%d bytes)", command, status, len(res)) if message.flags & MessageFlags.TEXT.value: tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.RESULT.value, res) if message.flags & MessageFlags.STRING.value: tlv_data += coreapi.CoreExecuteTlv.pack(ExecuteTlvs.STATUS.value, status) reply = coreapi.CoreExecMessage.pack(0, tlv_data) return reply, # execute the command with no response else: if message.flags & MessageFlags.LOCAL.value: utils.mute_detach(command) else: node.cmd(command, wait=False) except KeyError: logger.exception("error getting object: %s", node_num) # XXX wait and queue this message to try again later # XXX maybe this should be done differently if not message.flags & MessageFlags.LOCAL.value: time.sleep(0.125) self.queue_message(message) return () def handle_register_message(self, message): """ Register Message Handler :param coreapi.CoreRegMessage message: register message to handle :return: reply messages """ replies = [] # execute a Python script or XML file execute_server = message.get_tlv(RegisterTlvs.EXECUTE_SERVER.value) if execute_server: try: logger.info("executing: %s", execute_server) if message.flags & MessageFlags.STRING.value: old_session_ids = set(self.coreemu.sessions.keys()) sys.argv = shlex.split(execute_server) file_name = sys.argv[0] if os.path.splitext(file_name)[1].lower() == ".xml": session = self.coreemu.create_session(master=False) try: session.open_xml(file_name, start=True) except: self.coreemu.delete_session(session.session_id) raise else: thread = threading.Thread( target=execfile, args=(file_name, {"__file__": file_name, "coreemu": self.coreemu}) ) thread.daemon = True thread.start() # allow time for session creation time.sleep(0.25) if message.flags & MessageFlags.STRING.value: new_session_ids = set(self.coreemu.sessions.keys()) new_sid = new_session_ids.difference(old_session_ids) try: sid = new_sid.pop() logger.info("executed: %s as session %d", execute_server, sid) except KeyError: logger.info("executed %s with unknown session ID", execute_server) return replies logger.debug("checking session %d for RUNTIME state", sid) session = self.coreemu.sessions.get(sid) retries = 10 # wait for session to enter RUNTIME state, to prevent GUI from # connecting while nodes are still being instantiated while session.state != EventTypes.RUNTIME_STATE.value: logger.debug("waiting for session %d to enter RUNTIME state", sid) time.sleep(1) retries -= 1 if retries <= 0: logger.debug("session %d did not enter RUNTIME state", sid) return replies tlv_data = coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, execute_server) tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.SESSION.value, "%s" % sid) message = coreapi.CoreRegMessage.pack(0, tlv_data) replies.append(message) except Exception as e: logger.exception("error executing: %s", execute_server) tlv_data = coreapi.CoreExceptionTlv.pack(ExceptionTlvs.LEVEL.value, 2) tlv_data += coreapi.CoreExceptionTlv.pack(ExceptionTlvs.TEXT.value, str(e)) message = coreapi.CoreExceptionMessage.pack(0, tlv_data) replies.append(message) return replies gui = message.get_tlv(RegisterTlvs.GUI.value) if gui is None: logger.debug("ignoring Register message") else: # register capabilities with the GUI self.master = True # find the session containing this client and set the session to master for session in self.coreemu.sessions.itervalues(): if self in session.broker.session_clients: logger.debug("setting session to master: %s", session.session_id) session.master = True break replies.append(self.register()) replies.append(self.session_message()) return replies def handle_config_message(self, message): """ Configuration Message handler :param coreapi.CoreConfMessage message: configuration message to handle :return: reply messages """ # convert config message to standard config data object config_data = ConfigData( node=message.get_tlv(ConfigTlvs.NODE.value), object=message.get_tlv(ConfigTlvs.OBJECT.value), type=message.get_tlv(ConfigTlvs.TYPE.value), data_types=message.get_tlv(ConfigTlvs.DATA_TYPES.value), data_values=message.get_tlv(ConfigTlvs.VALUES.value), captions=message.get_tlv(ConfigTlvs.CAPTIONS.value), bitmap=message.get_tlv(ConfigTlvs.BITMAP.value), possible_values=message.get_tlv(ConfigTlvs.POSSIBLE_VALUES.value), groups=message.get_tlv(ConfigTlvs.GROUPS.value), session=message.get_tlv(ConfigTlvs.SESSION.value), interface_number=message.get_tlv(ConfigTlvs.INTERFACE_NUMBER.value), network_id=message.get_tlv(ConfigTlvs.NETWORK_ID.value), opaque=message.get_tlv(ConfigTlvs.OPAQUE.value) ) logger.debug("configuration message for %s node %s", config_data.object, config_data.node) # dispatch to any registered callback for this object type replies = self.session.config_object(config_data) for reply in replies: self.handle_broadcast_config(reply) return [] def handle_file_message(self, message): """ File Message handler :param coreapi.CoreFileMessage message: file message to handle :return: reply messages """ if message.flags & MessageFlags.ADD.value: node_num = message.get_tlv(FileTlvs.NODE.value) file_name = message.get_tlv(FileTlvs.NAME.value) file_type = message.get_tlv(FileTlvs.TYPE.value) source_name = message.get_tlv(FileTlvs.SOURCE_NAME.value) data = message.get_tlv(FileTlvs.DATA.value) compressed_data = message.get_tlv(FileTlvs.COMPRESSED_DATA.value) if compressed_data: logger.warn("Compressed file data not implemented for File message.") return () if source_name and data: logger.warn("ignoring invalid File message: source and data TLVs are both present") return () # some File Messages store custom files in services, # prior to node creation if file_type is not None: if file_type.startswith("service:"): _, service_name = file_type.split(':')[:2] self.session.add_node_service_file(node_num, service_name, file_name, source_name, data) return () elif file_type.startswith("hook:"): _, state = file_type.split(':')[:2] if not state.isdigit(): logger.error("error setting hook having state '%s'", state) return () state = int(state) self.session.add_hook(state, file_name, source_name, data) return () # writing a file to the host if node_num is None: if source_name is not None: shutil.copy2(source_name, file_name) else: with open(file_name, "w") as open_file: open_file.write(data) return () self.session.node_add_file(node_num, source_name, file_name, data) else: raise NotImplementedError return () def handle_interface_message(self, message): """ Interface Message handler. :param message: interface message to handle :return: reply messages """ logger.info("ignoring Interface message") return () def handle_event_message(self, message): """ Event Message handler :param coreapi.CoreEventMessage message: event message to handle :return: reply messages """ event_data = EventData( node=message.get_tlv(EventTlvs.NODE.value), event_type=message.get_tlv(EventTlvs.TYPE.value), name=message.get_tlv(EventTlvs.NAME.value), data=message.get_tlv(EventTlvs.DATA.value), time=message.get_tlv(EventTlvs.TIME.value), session=message.get_tlv(EventTlvs.SESSION.value) ) if event_data.event_type is None: raise NotImplementedError("Event message missing event type") event_type = EventTypes(event_data.event_type) node_id = event_data.node logger.debug("handling event %s at %s", event_type.name, time.ctime()) if event_type.value <= EventTypes.SHUTDOWN_STATE.value: if node_id is not None: try: node = self.session.get_object(node_id) except KeyError: raise KeyError("Event message for unknown node %d" % node_id) # configure mobility models for WLAN added during runtime if event_type == EventTypes.INSTANTIATION_STATE and nodeutils.is_node(node, NodeTypes.WIRELESS_LAN): self.session.start_mobility(node_ids=(node.objid,)) return () logger.warn("dropping unhandled Event message with node number") return () self.session.set_state(event_type) if event_type == EventTypes.DEFINITION_STATE: # clear all session objects in order to receive new definitions self.session.clear() elif event_type == EventTypes.INSTANTIATION_STATE: if len(self.handler_threads) > 1: # TODO: sync handler threads here before continuing time.sleep(2.0) # XXX # done receiving node/link configuration, ready to instantiate self.session.instantiate() # after booting nodes attempt to send emulation id for nodes waiting on status for obj in self.session.objects.itervalues(): self.send_node_emulation_id(obj.objid) elif event_type == EventTypes.RUNTIME_STATE: if self.session.master: logger.warn("Unexpected event message: RUNTIME state received at session master") else: # master event queue is started in session.checkruntime() self.session.start_events() elif event_type == EventTypes.DATACOLLECT_STATE: self.session.data_collect() elif event_type == EventTypes.SHUTDOWN_STATE: if self.session.master: logger.warn("Unexpected event message: SHUTDOWN state received at session master") elif event_type in {EventTypes.START, EventTypes.STOP, EventTypes.RESTART, EventTypes.PAUSE, EventTypes.RECONFIGURE}: handled = False name = event_data.name if name: # TODO: register system for event message handlers, # like confobjs if name.startswith("service:"): self.session.services_event(event_data) handled = True elif name.startswith("mobility:"): self.session.mobility_event(event_data) handled = True if not handled: logger.warn("Unhandled event message: event type %s ", event_type.name) elif event_type == EventTypes.FILE_OPEN: filename = event_data.name self.session.open_xml(filename, start=False) self.session.send_objects() return () elif event_type == EventTypes.FILE_SAVE: filename = event_data.name self.session.save_xml(filename, self.session.config["xmlfilever"]) elif event_type == EventTypes.SCHEDULED: etime = event_data.time node = event_data.node name = event_data.name data = event_data.data if etime is None: logger.warn("Event message scheduled event missing start time") return () if message.flags & MessageFlags.ADD.value: self.session.add_event(float(etime), node=node, name=name, data=data) else: raise NotImplementedError else: logger.warn("unhandled event message: event type %s", event_type) return () def handle_session_message(self, message): """ Session Message handler :param coreapi.CoreSessionMessage message: session message to handle :return: reply messages """ session_id_str = message.get_tlv(SessionTlvs.NUMBER.value) session_ids = coreapi.str_to_list(session_id_str) name_str = message.get_tlv(SessionTlvs.NAME.value) names = coreapi.str_to_list(name_str) file_str = message.get_tlv(SessionTlvs.FILE.value) files = coreapi.str_to_list(file_str) thumb = message.get_tlv(SessionTlvs.THUMB.value) user = message.get_tlv(SessionTlvs.USER.value) logger.debug("SESSION message flags=0x%x sessions=%s" % (message.flags, session_id_str)) if message.flags == 0: for index, session_id in enumerate(session_ids): session_id = int(session_id) if session_id == 0: session = self.session else: session = self.coreemu.sessions.get(session_id) if session is None: logger.warn("session %s not found", session_id) continue logger.info("request to modify to session: %s", session.session_id) if names is not None: session.name = names[index] if files is not None: session.file_name = files[index] if thumb: session.set_thumbnail(thumb) if user: session.set_user(user) elif message.flags & MessageFlags.STRING.value and not message.flags & MessageFlags.ADD.value: # status request flag: send list of sessions return self.session_message(), else: # handle ADD or DEL flags for session_id in session_ids: session_id = int(session_id) session = self.coreemu.sessions.get(session_id) if session is None: logger.info("session %s not found (flags=0x%x)", session_id, message.flags) continue if message.flags & MessageFlags.ADD.value: # connect to the first session that exists logger.info("request to connect to session %s", session_id) # remove client from session broker and shutdown if needed self.remove_session_handlers() self.session.broker.session_clients.remove(self) if not self.session.broker.session_clients and not self.session.is_active(): self.coreemu.delete_session(self.session.session_id) # set session to join self.session = session # add client to session broker and set master if needed if self.master: self.session.master = True self.session.broker.session_clients.append(self) # add broadcast handlers logger.info("adding session broadcast handlers") self.add_session_handlers() if user: self.session.set_user(user) if message.flags & MessageFlags.STRING.value: self.session.send_objects() elif message.flags & MessageFlags.DELETE.value: # shut down the specified session(s) logger.info("request to terminate session %s" % session_id) session.shutdown() else: logger.warn("unhandled session flags for session %s", session_id) return () def send_node_emulation_id(self, node_id): """ Node emulation id to send. :param int node_id: node id to send :return: nothing """ if node_id in self.node_status_request: tlv_data = "" tlv_data += coreapi.CoreNodeTlv.pack(NodeTlvs.NUMBER.value, node_id) tlv_data += coreapi.CoreNodeTlv.pack(NodeTlvs.EMULATION_ID.value, node_id) reply = coreapi.CoreNodeMessage.pack(MessageFlags.ADD.value | MessageFlags.LOCAL.value, tlv_data) try: self.sendall(reply) except IOError: logger.exception("error sending node emulation id message: %s", node_id) del self.node_status_request[node_id]