""" Uses coreapi_data for message and TLV types, and defines TLV data types and objects used for parsing and building CORE API messages. CORE API messaging is leveraged for communication with the GUI. """ import socket import struct from enum import Enum from core.enumerations import ConfigTlvs from core.enumerations import EventTlvs from core.enumerations import EventTypes from core.enumerations import ExceptionTlvs from core.enumerations import ExecuteTlvs from core.enumerations import FileTlvs from core.enumerations import InterfaceTlvs from core.enumerations import LinkTlvs from core.enumerations import MessageFlags from core.enumerations import MessageTypes from core.enumerations import NodeTlvs from core.enumerations import RegisterTlvs from core.enumerations import SessionTlvs from core.misc import structutils from core.misc.ipaddress import IpAddress from core.misc.ipaddress import MacAddress class CoreTlvData(object): """ Helper base class used for packing and unpacking values using struct. """ # format string for packing data data_format = None # python data type for the data data_type = None # pad length for data after packing pad_len = None @classmethod def pack(cls, value): """ Convenience method for packing data using the struct module. :param value: value to pack :return: length of data and the packed data itself :rtype: tuple """ data = struct.pack(cls.data_format, value) length = len(data) - cls.pad_len return length, data @classmethod def unpack(cls, data): """ Convenience method for unpacking data using the struct module. :param data: data to unpack :return: the value of the unpacked data """ return struct.unpack(cls.data_format, data)[0] @classmethod def pack_string(cls, value): """ Convenience method for packing data from a string representation. :param str value: value to pack :return: length of data and the packed data itself :rtype: tuple """ return cls.pack(cls.from_string(value)) @classmethod def from_string(cls, value): """ Retrieve the value type from a string representation. :param str value: value to get a data type from :return: value parse from string representation """ return cls.data_type(value) class CoreTlvDataObj(CoreTlvData): """ Helper class for packing custom object data. """ @classmethod def pack(cls, obj): """ Convenience method for packing custom object data. :param obj: custom object to pack :return: length of data and the packed data itself :rtype: tuple """ value = cls.get_value(obj) return super(CoreTlvDataObj, cls).pack(value) @classmethod def unpack(cls, data): """ Convenience method for unpacking custom object data. :param data: data to unpack custom object from :return: unpacked custom object """ data = super(CoreTlvDataObj, cls).unpack(data) return cls.new_obj(data) @staticmethod def get_value(obj): """ Method that will be used to retrieve the data to pack from a custom object. :param obj: custom object to get data to pack :return: data value to pack """ raise NotImplementedError @staticmethod def new_obj(obj): """ Method for retrieving data to unpack from an object. :param obj: object to get unpack data from :return: value of unpacked data """ raise NotImplementedError class CoreTlvDataUint16(CoreTlvData): """ Helper class for packing uint16 data. """ data_format = "!H" data_type = int pad_len = 0 class CoreTlvDataUint32(CoreTlvData): """ Helper class for packing uint32 data. """ data_format = "!2xI" data_type = int pad_len = 2 class CoreTlvDataUint64(CoreTlvData): """ Helper class for packing uint64 data. """ data_format = "!2xQ" data_type = long pad_len = 2 class CoreTlvDataString(CoreTlvData): """ Helper class for packing string data. """ data_type = str @classmethod def pack(cls, value): """ Convenience method for packing string data. :param str value: string to pack :return: length of data packed and the packed data :rtype: tuple """ if not isinstance(value, str): raise ValueError("value not a string: %s" % value) if len(value) < 256: header_len = CoreTlv.header_len else: header_len = CoreTlv.long_header_len pad_len = -(header_len + len(value)) % 4 return len(value), value + "\0" * pad_len @classmethod def unpack(cls, data): """ Convenience method for unpacking string data. :param str data: unpack string data :return: unpacked string data """ return data.rstrip("\0") class CoreTlvDataUint16List(CoreTlvData): """ List of unsigned 16-bit values. """ data_type = tuple data_format = "!H" @classmethod def pack(cls, values): """ Convenience method for packing a uint 16 list. :param list values: unint 16 list to pack :return: length of data packed and the packed data :rtype: tuple """ if not isinstance(values, tuple): raise ValueError("value not a tuple: %s" % values) data = "" for value in values: data += struct.pack(cls.data_format, value) pad_len = -(CoreTlv.header_len + len(data)) % 4 return len(data), data + "\0" * pad_len @classmethod def unpack(cls, data): """ Convenience method for unpacking a uint 16 list. :param data: data to unpack :return: unpacked data """ data_format = "!%dH" % (len(data) / 2) return struct.unpack(data_format, data) @classmethod def from_string(cls, value): """ Retrieves a unint 16 list from a string :param str value: string representation of a uint 16 list :return: unint 16 list :rtype: list """ return tuple(map(lambda (x): int(x), value.split())) class CoreTlvDataIpv4Addr(CoreTlvDataObj): """ Utility class for packing/unpacking Ipv4 addresses. """ data_type = IpAddress.from_string data_format = "!2x4s" pad_len = 2 @staticmethod def get_value(obj): """ Retrieve Ipv4 address value from object. :param core.misc.ipaddress.IpAddress obj: ip address to get value from :return: """ return obj.addr @staticmethod def new_obj(value): """ Retrieve Ipv4 address from a string representation. :param str value: value to get Ipv4 address from :return: Ipv4 address :rtype: core.misc.ipaddress.IpAddress """ return IpAddress(af=socket.AF_INET, address=value) class CoreTlvDataIPv6Addr(CoreTlvDataObj): """ Utility class for packing/unpacking Ipv6 addresses. """ data_format = "!16s2x" data_type = IpAddress.from_string pad_len = 2 @staticmethod def get_value(obj): """ Retrieve Ipv6 address value from object. :param core.misc.ipaddress.IpAddress obj: ip address to get value from :return: """ return obj.addr @staticmethod def new_obj(value): """ Retrieve Ipv6 address from a string representation. :param str value: value to get Ipv4 address from :return: Ipv4 address :rtype: core.misc.ipaddress.IpAddress """ return IpAddress(af=socket.AF_INET6, address=value) class CoreTlvDataMacAddr(CoreTlvDataObj): """ Utility class for packing/unpacking mac addresses. """ data_format = "!2x8s" data_type = MacAddress.from_string pad_len = 2 @staticmethod def get_value(obj): """ Retrieve Ipv6 address value from object. :param core.misc.ipaddress.MacAddress obj: mac address to get value from :return: """ # extend to 64 bits return "\0\0" + obj.addr @staticmethod def new_obj(value): """ Retrieve mac address from a string representation. :param str value: value to get Ipv4 address from :return: Ipv4 address :rtype: core.misc.ipaddress.MacAddress """ # only use 48 bits return MacAddress(address=value[2:]) class CoreTlv(object): """ Base class for representing CORE TLVs. """ header_format = "!BB" header_len = struct.calcsize(header_format) long_header_format = "!BBH" long_header_len = struct.calcsize(long_header_format) tlv_type_map = Enum tlv_data_class_map = {} def __init__(self, tlv_type, tlv_data): """ Create a CoreTlv instance. :param int tlv_type: tlv type :param tlv_data: data to unpack :return: unpacked data """ self.tlv_type = tlv_type if tlv_data: try: self.value = self.tlv_data_class_map[self.tlv_type].unpack(tlv_data) except KeyError: self.value = tlv_data else: self.value = None @classmethod def unpack(cls, data): """ Parse data and return unpacked class. :param data: data to unpack :return: unpacked data class """ tlv_type, tlv_len = struct.unpack(cls.header_format, data[:cls.header_len]) header_len = cls.header_len if tlv_len == 0: tlv_type, zero, tlv_len = struct.unpack(cls.long_header_format, data[:cls.long_header_len]) header_len = cls.long_header_len tlv_size = header_len + tlv_len # for 32-bit alignment tlv_size += -tlv_size % 4 return cls(tlv_type, data[header_len:tlv_size]), data[tlv_size:] @classmethod def pack(cls, tlv_type, value): """ Pack a TLV value, based on type. :param int tlv_type: type of data to pack :param value: data to pack :return: header and packed data """ tlv_len, tlv_data = cls.tlv_data_class_map[tlv_type].pack(value) if tlv_len < 256: hdr = struct.pack(cls.header_format, tlv_type, tlv_len) else: hdr = struct.pack(cls.long_header_format, tlv_type, 0, tlv_len) return hdr + tlv_data @classmethod def pack_string(cls, tlv_type, value): """ Pack data type from a string representation :param int tlv_type: type of data to pack :param str value: string representation of data :return: header and packed data """ return cls.pack(tlv_type, cls.tlv_data_class_map[tlv_type].from_string(value)) def type_str(self): """ Retrieve type string for this data type. :return: data type name :rtype: str """ try: return self.tlv_type_map(self.tlv_type).name except ValueError: return "unknown tlv type: %s" % str(self.tlv_type) def __str__(self): """ String representation of this data type. :return: string representation :rtype: str """ return "%s " % (self.__class__.__name__, self.type_str(), self.value) class CoreNodeTlv(CoreTlv): """ Class for representing CORE Node TLVs. """ tlv_type_map = NodeTlvs tlv_data_class_map = { NodeTlvs.NUMBER.value: CoreTlvDataUint32, NodeTlvs.TYPE.value: CoreTlvDataUint32, NodeTlvs.NAME.value: CoreTlvDataString, NodeTlvs.IP_ADDRESS.value: CoreTlvDataIpv4Addr, NodeTlvs.MAC_ADDRESS.value: CoreTlvDataMacAddr, NodeTlvs.IP6_ADDRESS.value: CoreTlvDataIPv6Addr, NodeTlvs.MODEL.value: CoreTlvDataString, NodeTlvs.EMULATION_SERVER.value: CoreTlvDataString, NodeTlvs.SESSION.value: CoreTlvDataString, NodeTlvs.X_POSITION.value: CoreTlvDataUint16, NodeTlvs.Y_POSITION.value: CoreTlvDataUint16, NodeTlvs.CANVAS.value: CoreTlvDataUint16, NodeTlvs.EMULATION_ID.value: CoreTlvDataUint32, NodeTlvs.NETWORK_ID.value: CoreTlvDataUint32, NodeTlvs.SERVICES.value: CoreTlvDataString, NodeTlvs.LATITUDE.value: CoreTlvDataString, NodeTlvs.LONGITUDE.value: CoreTlvDataString, NodeTlvs.ALTITUDE.value: CoreTlvDataString, NodeTlvs.ICON.value: CoreTlvDataString, NodeTlvs.OPAQUE.value: CoreTlvDataString, } class CoreLinkTlv(CoreTlv): """ Class for representing CORE link TLVs. """ tlv_type_map = LinkTlvs tlv_data_class_map = { LinkTlvs.N1_NUMBER.value: CoreTlvDataUint32, LinkTlvs.N2_NUMBER.value: CoreTlvDataUint32, LinkTlvs.DELAY.value: CoreTlvDataUint64, LinkTlvs.BANDWIDTH.value: CoreTlvDataUint64, LinkTlvs.PER.value: CoreTlvDataString, LinkTlvs.DUP.value: CoreTlvDataString, LinkTlvs.JITTER.value: CoreTlvDataUint64, LinkTlvs.MER.value: CoreTlvDataUint16, LinkTlvs.BURST.value: CoreTlvDataUint16, LinkTlvs.SESSION.value: CoreTlvDataString, LinkTlvs.MBURST.value: CoreTlvDataUint16, LinkTlvs.TYPE.value: CoreTlvDataUint32, LinkTlvs.GUI_ATTRIBUTES.value: CoreTlvDataString, LinkTlvs.UNIDIRECTIONAL.value: CoreTlvDataUint16, LinkTlvs.EMULATION_ID.value: CoreTlvDataUint32, LinkTlvs.NETWORK_ID.value: CoreTlvDataUint32, LinkTlvs.KEY.value: CoreTlvDataUint32, LinkTlvs.INTERFACE1_NUMBER.value: CoreTlvDataUint16, LinkTlvs.INTERFACE1_IP4.value: CoreTlvDataIpv4Addr, LinkTlvs.INTERFACE1_IP4_MASK.value: CoreTlvDataUint16, LinkTlvs.INTERFACE1_MAC.value: CoreTlvDataMacAddr, LinkTlvs.INTERFACE1_IP6.value: CoreTlvDataIPv6Addr, LinkTlvs.INTERFACE1_IP6_MASK.value: CoreTlvDataUint16, LinkTlvs.INTERFACE2_NUMBER.value: CoreTlvDataUint16, LinkTlvs.INTERFACE2_IP4.value: CoreTlvDataIpv4Addr, LinkTlvs.INTERFACE2_IP4_MASK.value: CoreTlvDataUint16, LinkTlvs.INTERFACE2_MAC.value: CoreTlvDataMacAddr, LinkTlvs.INTERFACE2_IP6.value: CoreTlvDataIPv6Addr, LinkTlvs.INTERFACE2_IP6_MASK.value: CoreTlvDataUint16, LinkTlvs.INTERFACE1_NAME.value: CoreTlvDataString, LinkTlvs.INTERFACE2_NAME.value: CoreTlvDataString, LinkTlvs.OPAQUE.value: CoreTlvDataString, } class CoreExecuteTlv(CoreTlv): """ Class for representing CORE execute TLVs. """ tlv_type_map = ExecuteTlvs tlv_data_class_map = { ExecuteTlvs.NODE.value: CoreTlvDataUint32, ExecuteTlvs.NUMBER.value: CoreTlvDataUint32, ExecuteTlvs.TIME.value: CoreTlvDataUint32, ExecuteTlvs.COMMAND.value: CoreTlvDataString, ExecuteTlvs.RESULT.value: CoreTlvDataString, ExecuteTlvs.STATUS.value: CoreTlvDataUint32, ExecuteTlvs.SESSION.value: CoreTlvDataString, } class CoreRegisterTlv(CoreTlv): """ Class for representing CORE register TLVs. """ tlv_type_map = RegisterTlvs tlv_data_class_map = { RegisterTlvs.WIRELESS.value: CoreTlvDataString, RegisterTlvs.MOBILITY.value: CoreTlvDataString, RegisterTlvs.UTILITY.value: CoreTlvDataString, RegisterTlvs.EXECUTE_SERVER.value: CoreTlvDataString, RegisterTlvs.GUI.value: CoreTlvDataString, RegisterTlvs.EMULATION_SERVER.value: CoreTlvDataString, RegisterTlvs.SESSION.value: CoreTlvDataString, } class CoreConfigTlv(CoreTlv): """ Class for representing CORE configuration TLVs. """ tlv_type_map = ConfigTlvs tlv_data_class_map = { ConfigTlvs.NODE.value: CoreTlvDataUint32, ConfigTlvs.OBJECT.value: CoreTlvDataString, ConfigTlvs.TYPE.value: CoreTlvDataUint16, ConfigTlvs.DATA_TYPES.value: CoreTlvDataUint16List, ConfigTlvs.VALUES.value: CoreTlvDataString, ConfigTlvs.CAPTIONS.value: CoreTlvDataString, ConfigTlvs.BITMAP.value: CoreTlvDataString, ConfigTlvs.POSSIBLE_VALUES.value: CoreTlvDataString, ConfigTlvs.GROUPS.value: CoreTlvDataString, ConfigTlvs.SESSION.value: CoreTlvDataString, ConfigTlvs.INTERFACE_NUMBER.value: CoreTlvDataUint16, ConfigTlvs.NETWORK_ID.value: CoreTlvDataUint32, ConfigTlvs.OPAQUE.value: CoreTlvDataString, } class CoreFileTlv(CoreTlv): """ Class for representing CORE file TLVs. """ tlv_type_map = FileTlvs tlv_data_class_map = { FileTlvs.NODE.value: CoreTlvDataUint32, FileTlvs.NAME.value: CoreTlvDataString, FileTlvs.MODE.value: CoreTlvDataString, FileTlvs.NUMBER.value: CoreTlvDataUint16, FileTlvs.TYPE.value: CoreTlvDataString, FileTlvs.SOURCE_NAME.value: CoreTlvDataString, FileTlvs.SESSION.value: CoreTlvDataString, FileTlvs.DATA.value: CoreTlvDataString, FileTlvs.COMPRESSED_DATA.value: CoreTlvDataString, } class CoreInterfaceTlv(CoreTlv): """ Class for representing CORE interface TLVs. """ tlv_type_map = InterfaceTlvs tlv_data_class_map = { InterfaceTlvs.NODE.value: CoreTlvDataUint32, InterfaceTlvs.NUMBER.value: CoreTlvDataUint16, InterfaceTlvs.NAME.value: CoreTlvDataString, InterfaceTlvs.IP_ADDRESS.value: CoreTlvDataIpv4Addr, InterfaceTlvs.MASK.value: CoreTlvDataUint16, InterfaceTlvs.MAC_ADDRESS.value: CoreTlvDataMacAddr, InterfaceTlvs.IP6_ADDRESS.value: CoreTlvDataIPv6Addr, InterfaceTlvs.IP6_MASK.value: CoreTlvDataUint16, InterfaceTlvs.TYPE.value: CoreTlvDataUint16, InterfaceTlvs.SESSION.value: CoreTlvDataString, InterfaceTlvs.STATE.value: CoreTlvDataUint16, InterfaceTlvs.EMULATION_ID.value: CoreTlvDataUint32, InterfaceTlvs.NETWORK_ID.value: CoreTlvDataUint32, } class CoreEventTlv(CoreTlv): """ Class for representing CORE event TLVs. """ tlv_type_map = EventTlvs tlv_data_class_map = { EventTlvs.NODE.value: CoreTlvDataUint32, EventTlvs.TYPE.value: CoreTlvDataUint32, EventTlvs.NAME.value: CoreTlvDataString, EventTlvs.DATA.value: CoreTlvDataString, EventTlvs.TIME.value: CoreTlvDataString, EventTlvs.SESSION.value: CoreTlvDataString, } class CoreSessionTlv(CoreTlv): """ Class for representing CORE session TLVs. """ tlv_type_map = SessionTlvs tlv_data_class_map = { SessionTlvs.NUMBER.value: CoreTlvDataString, SessionTlvs.NAME.value: CoreTlvDataString, SessionTlvs.FILE.value: CoreTlvDataString, SessionTlvs.NODE_COUNT.value: CoreTlvDataString, SessionTlvs.DATE.value: CoreTlvDataString, SessionTlvs.THUMB.value: CoreTlvDataString, SessionTlvs.USER.value: CoreTlvDataString, SessionTlvs.OPAQUE.value: CoreTlvDataString, } class CoreExceptionTlv(CoreTlv): """ Class for representing CORE exception TLVs. """ tlv_type_map = ExceptionTlvs tlv_data_class_map = { ExceptionTlvs.NODE.value: CoreTlvDataUint32, ExceptionTlvs.SESSION.value: CoreTlvDataString, ExceptionTlvs.LEVEL.value: CoreTlvDataUint16, ExceptionTlvs.SOURCE.value: CoreTlvDataString, ExceptionTlvs.DATE.value: CoreTlvDataString, ExceptionTlvs.TEXT.value: CoreTlvDataString, ExceptionTlvs.OPAQUE.value: CoreTlvDataString, } class CoreMessage(object): """ Base class for representing CORE messages. """ header_format = "!BBH" header_len = struct.calcsize(header_format) message_type = None flag_map = MessageFlags tlv_class = CoreTlv def __init__(self, flags, hdr, data): self.raw_message = hdr + data self.flags = flags self.tlv_data = {} self.parse_data(data) @classmethod def unpack_header(cls, data): """ parse data and return (message_type, message_flags, message_len). :param str data: data to parse :return: unpacked tuple :rtype: tuple """ message_type, message_flags, message_len = struct.unpack(cls.header_format, data[:cls.header_len]) return message_type, message_flags, message_len @classmethod def create(cls, flags, values): tlv_data = structutils.pack_values(cls.tlv_class, values) packed = cls.pack(flags, tlv_data) header_data = packed[:cls.header_len] return cls(flags, header_data, tlv_data) @classmethod def pack(cls, message_flags, tlv_data): """ Pack CORE message data. :param message_flags: message flags to pack with data :param tlv_data: data to get length from for packing :return: combined header and tlv data """ header = struct.pack(cls.header_format, cls.message_type, message_flags, len(tlv_data)) return header + tlv_data def add_tlv_data(self, key, value): """ Add TLV data into the data map. :param int key: key to store TLV data :param value: data to associate with key :return: nothing """ if key in self.tlv_data: raise KeyError("key already exists: %s (val=%s)" % (key, value)) self.tlv_data[key] = value def get_tlv(self, tlv_type): """ Retrieve TLV data from data map. :param int tlv_type: type of data to retrieve :return: TLV type data """ return self.tlv_data.get(tlv_type) def parse_data(self, data): """ Parse data while possible and adding TLV data to the data map. :param data: data to parse for TLV data :return: nothing """ while data: tlv, data = self.tlv_class.unpack(data) self.add_tlv_data(tlv.tlv_type, tlv.value) def pack_tlv_data(self): """ Opposite of parse_data(). Return packed TLV data using self.tlv_data dict. Used by repack(). :return: packed data :rtype: str """ tlv_data = "" keys = sorted(self.tlv_data.keys()) for key in keys: value = self.tlv_data[key] tlv_data += self.tlv_class.pack(key, value) return tlv_data def repack(self): """ Invoke after updating self.tlv_data[] to rebuild self.raw_message. Useful for modifying a message that has been parsed, before sending the raw data again. :return: nothing """ tlv_data = self.pack_tlv_data() self.raw_message = self.pack(self.flags, tlv_data) def type_str(self): """ Retrieve data of the message type. :return: name of message type :rtype: str """ try: return MessageTypes(self.message_type).name except ValueError: return "unknown message type: %s" % str(self.message_type) def flag_str(self): """ Retrieve message flag string. :return: message flag string :rtype: str """ message_flags = [] flag = 1L while True: if self.flags & flag: try: message_flags.append(self.flag_map(flag).name) except ValueError: message_flags.append("0x%x" % flag) flag <<= 1 if not (self.flags & ~(flag - 1)): break return "0x%x <%s>" % (self.flags, " | ".join(message_flags)) def __str__(self): """ Retrieve string representation of the message. :return: string representation :rtype: str """ result = "%s " % (self.__class__.__name__, self.type_str(), self.flag_str()) for key, value in self.tlv_data.iteritems(): try: tlv_type = self.tlv_class.tlv_type_map(key).name except ValueError: tlv_type = "tlv type %s" % key result += "\n %s: %s" % (tlv_type, value) return result def node_numbers(self): """ Return a list of node numbers included in this message. """ number1 = None number2 = None # not all messages have node numbers if self.message_type == MessageTypes.NODE.value: number1 = self.get_tlv(NodeTlvs.NUMBER.value) elif self.message_type == MessageTypes.LINK.value: number1 = self.get_tlv(LinkTlvs.N1_NUMBER.value) number2 = self.get_tlv(LinkTlvs.N2_NUMBER.value) elif self.message_type == MessageTypes.EXECUTE.value: number1 = self.get_tlv(ExecuteTlvs.NODE.value) elif self.message_type == MessageTypes.CONFIG.value: number1 = self.get_tlv(ConfigTlvs.NODE.value) elif self.message_type == MessageTypes.FILE.value: number1 = self.get_tlv(FileTlvs.NODE.value) elif self.message_type == MessageTypes.INTERFACE.value: number1 = self.get_tlv(InterfaceTlvs.NODE.value) elif self.message_type == MessageTypes.EVENT.value: number1 = self.get_tlv(EventTlvs.NODE) result = [] if number1: result.append(number1) if number2: result.append(number2) return result def session_numbers(self): """ Return a list of session numbers included in this message. """ result = [] if self.message_type == MessageTypes.SESSION.value: sessions = self.get_tlv(SessionTlvs.NUMBER.value) elif self.message_type == MessageTypes.EXCEPTION.value: sessions = self.get_tlv(ExceptionTlvs.SESSION.value) else: # All other messages share TLV number 0xA for the session number(s). sessions = self.get_tlv(NodeTlvs.SESSION.value) if sessions: for session_id in sessions.split("|"): result.append(int(session_id)) return result class CoreNodeMessage(CoreMessage): """ CORE node message class. """ message_type = MessageTypes.NODE.value tlv_class = CoreNodeTlv class CoreLinkMessage(CoreMessage): """ CORE link message class. """ message_type = MessageTypes.LINK.value tlv_class = CoreLinkTlv class CoreExecMessage(CoreMessage): """ CORE execute message class. """ message_type = MessageTypes.EXECUTE.value tlv_class = CoreExecuteTlv class CoreRegMessage(CoreMessage): """ CORE register message class. """ message_type = MessageTypes.REGISTER.value tlv_class = CoreRegisterTlv class CoreConfMessage(CoreMessage): """ CORE configuration message class. """ message_type = MessageTypes.CONFIG.value tlv_class = CoreConfigTlv class CoreFileMessage(CoreMessage): """ CORE file message class. """ message_type = MessageTypes.FILE.value tlv_class = CoreFileTlv class CoreIfaceMessage(CoreMessage): """ CORE interface message class. """ message_type = MessageTypes.INTERFACE.value tlv_class = CoreInterfaceTlv class CoreEventMessage(CoreMessage): """ CORE event message class. """ message_type = MessageTypes.EVENT.value tlv_class = CoreEventTlv class CoreSessionMessage(CoreMessage): """ CORE session message class. """ message_type = MessageTypes.SESSION.value tlv_class = CoreSessionTlv class CoreExceptionMessage(CoreMessage): """ CORE exception message class. """ message_type = MessageTypes.EXCEPTION.value tlv_class = CoreExceptionTlv # map used to translate enumerated message type values to message class objects CLASS_MAP = { MessageTypes.NODE.value: CoreNodeMessage, MessageTypes.LINK.value: CoreLinkMessage, MessageTypes.EXECUTE.value: CoreExecMessage, MessageTypes.REGISTER.value: CoreRegMessage, MessageTypes.CONFIG.value: CoreConfMessage, MessageTypes.FILE.value: CoreFileMessage, MessageTypes.INTERFACE.value: CoreIfaceMessage, MessageTypes.EVENT.value: CoreEventMessage, MessageTypes.SESSION.value: CoreSessionMessage, MessageTypes.EXCEPTION.value: CoreExceptionMessage, } def str_to_list(value): """ Helper to convert pipe-delimited string ("a|b|c") into a list (a, b, c). :param str value: string to convert :return: converted list :rtype: list """ if value is None: return None return value.split("|") def state_name(value): """ Helper to convert state number into state name using event types. :param int value: state value to derive name from :return: state name :rtype: str """ try: value = EventTypes(value).name except ValueError: value = "unknown" return value