core-extra/daemon/core/api/coreapi.py

1005 lines
29 KiB
Python

"""
Uses coreapi_data for message and TLV types, and defines TLV data
types and objects used for parsing and building CORE API messages.
CORE API messaging is leveraged for communication with the GUI.
"""
import socket
import struct
from enum import Enum
from core import logger
from core.enumerations import ConfigTlvs
from core.enumerations import EventTlvs
from core.enumerations import EventTypes
from core.enumerations import ExceptionTlvs
from core.enumerations import ExecuteTlvs
from core.enumerations import FileTlvs
from core.enumerations import InterfaceTlvs
from core.enumerations import LinkTlvs
from core.enumerations import MessageFlags
from core.enumerations import MessageTypes
from core.enumerations import NodeTlvs
from core.enumerations import RegisterTlvs
from core.enumerations import SessionTlvs
from core.misc import structutils
from core.misc.ipaddress import IpAddress
from core.misc.ipaddress import MacAddress
class CoreTlvData(object):
"""
Helper base class used for packing and unpacking values using struct.
"""
# format string for packing data
data_format = None
# python data type for the data
data_type = None
# pad length for data after packing
pad_len = None
@classmethod
def pack(cls, value):
"""
Convenience method for packing data using the struct module.
:param value: value to pack
:return: length of data and the packed data itself
:rtype: tuple
"""
data = struct.pack(cls.data_format, value)
length = len(data) - cls.pad_len
return length, data
@classmethod
def unpack(cls, data):
"""
Convenience method for unpacking data using the struct module.
:param data: data to unpack
:return: the value of the unpacked data
"""
return struct.unpack(cls.data_format, data)[0]
@classmethod
def pack_string(cls, value):
"""
Convenience method for packing data from a string representation.
:param str value: value to pack
:return: length of data and the packed data itself
:rtype: tuple
"""
return cls.pack(cls.from_string(value))
@classmethod
def from_string(cls, value):
"""
Retrieve the value type from a string representation.
:param str value: value to get a data type from
:return: value parse from string representation
"""
return cls.data_type(value)
class CoreTlvDataObj(CoreTlvData):
"""
Helper class for packing custom object data.
"""
@classmethod
def pack(cls, obj):
"""
Convenience method for packing custom object data.
:param obj: custom object to pack
:return: length of data and the packed data itself
:rtype: tuple
"""
value = cls.get_value(obj)
return super(CoreTlvDataObj, cls).pack(value)
@classmethod
def unpack(cls, data):
"""
Convenience method for unpacking custom object data.
:param data: data to unpack custom object from
:return: unpacked custom object
"""
data = super(CoreTlvDataObj, cls).unpack(data)
return cls.new_obj(data)
@staticmethod
def get_value(obj):
"""
Method that will be used to retrieve the data to pack from a custom object.
:param obj: custom object to get data to pack
:return: data value to pack
"""
raise NotImplementedError
@staticmethod
def new_obj(obj):
"""
Method for retrieving data to unpack from an object.
:param obj: object to get unpack data from
:return: value of unpacked data
"""
raise NotImplementedError
class CoreTlvDataUint16(CoreTlvData):
"""
Helper class for packing uint16 data.
"""
data_format = "!H"
data_type = int
pad_len = 0
class CoreTlvDataUint32(CoreTlvData):
"""
Helper class for packing uint32 data.
"""
data_format = "!2xI"
data_type = int
pad_len = 2
class CoreTlvDataUint64(CoreTlvData):
"""
Helper class for packing uint64 data.
"""
data_format = "!2xQ"
data_type = long
pad_len = 2
class CoreTlvDataString(CoreTlvData):
"""
Helper class for packing string data.
"""
data_type = str
@classmethod
def pack(cls, value):
"""
Convenience method for packing string data.
:param str value: string to pack
:return: length of data packed and the packed data
:rtype: tuple
"""
if not isinstance(value, str):
raise ValueError("value not a string: %s" % value)
if len(value) < 256:
header_len = CoreTlv.header_len
else:
header_len = CoreTlv.long_header_len
pad_len = -(header_len + len(value)) % 4
return len(value), value + "\0" * pad_len
@classmethod
def unpack(cls, data):
"""
Convenience method for unpacking string data.
:param str data: unpack string data
:return: unpacked string data
"""
return data.rstrip("\0")
class CoreTlvDataUint16List(CoreTlvData):
"""
List of unsigned 16-bit values.
"""
data_type = tuple
data_format = "!H"
@classmethod
def pack(cls, values):
"""
Convenience method for packing a uint 16 list.
:param list values: unint 16 list to pack
:return: length of data packed and the packed data
:rtype: tuple
"""
if not isinstance(values, tuple):
raise ValueError("value not a tuple: %s" % values)
data = ""
for value in values:
data += struct.pack(cls.data_format, value)
pad_len = -(CoreTlv.header_len + len(data)) % 4
return len(data), data + "\0" * pad_len
@classmethod
def unpack(cls, data):
"""
Convenience method for unpacking a uint 16 list.
:param data: data to unpack
:return: unpacked data
"""
data_format = "!%dH" % (len(data) / 2)
return struct.unpack(data_format, data)
@classmethod
def from_string(cls, value):
"""
Retrieves a unint 16 list from a string
:param str value: string representation of a uint 16 list
:return: unint 16 list
:rtype: list
"""
return tuple(map(lambda (x): int(x), value.split()))
class CoreTlvDataIpv4Addr(CoreTlvDataObj):
"""
Utility class for packing/unpacking Ipv4 addresses.
"""
data_type = IpAddress.from_string
data_format = "!2x4s"
pad_len = 2
@staticmethod
def get_value(obj):
"""
Retrieve Ipv4 address value from object.
:param core.misc.ipaddress.IpAddress obj: ip address to get value from
:return:
"""
return obj.addr
@staticmethod
def new_obj(value):
"""
Retrieve Ipv4 address from a string representation.
:param str value: value to get Ipv4 address from
:return: Ipv4 address
:rtype: core.misc.ipaddress.IpAddress
"""
logger.info("getting new ipv4 address for: %s", value)
return IpAddress(af=socket.AF_INET, address=value)
class CoreTlvDataIPv6Addr(CoreTlvDataObj):
"""
Utility class for packing/unpacking Ipv6 addresses.
"""
data_format = "!16s2x"
data_type = IpAddress.from_string
pad_len = 2
@staticmethod
def get_value(obj):
"""
Retrieve Ipv6 address value from object.
:param core.misc.ipaddress.IpAddress obj: ip address to get value from
:return:
"""
return obj.addr
@staticmethod
def new_obj(value):
"""
Retrieve Ipv6 address from a string representation.
:param str value: value to get Ipv4 address from
:return: Ipv4 address
:rtype: core.misc.ipaddress.IpAddress
"""
return IpAddress(af=socket.AF_INET6, address=value)
class CoreTlvDataMacAddr(CoreTlvDataObj):
"""
Utility class for packing/unpacking mac addresses.
"""
data_format = "!2x8s"
data_type = MacAddress.from_string
pad_len = 2
@staticmethod
def get_value(obj):
"""
Retrieve Ipv6 address value from object.
:param core.misc.ipaddress.MacAddress obj: mac address to get value from
:return:
"""
# extend to 64 bits
return "\0\0" + obj.addr
@staticmethod
def new_obj(value):
"""
Retrieve mac address from a string representation.
:param str value: value to get Ipv4 address from
:return: Ipv4 address
:rtype: core.misc.ipaddress.MacAddress
"""
# only use 48 bits
return MacAddress(address=value[2:])
class CoreTlv(object):
"""
Base class for representing CORE TLVs.
"""
header_format = "!BB"
header_len = struct.calcsize(header_format)
long_header_format = "!BBH"
long_header_len = struct.calcsize(long_header_format)
tlv_type_map = Enum
tlv_data_class_map = {}
def __init__(self, tlv_type, tlv_data):
"""
Create a CoreTlv instance.
:param int tlv_type: tlv type
:param tlv_data: data to unpack
:return: unpacked data
"""
self.tlv_type = tlv_type
if tlv_data:
try:
self.value = self.tlv_data_class_map[self.tlv_type].unpack(tlv_data)
except KeyError:
self.value = tlv_data
else:
self.value = None
@classmethod
def unpack(cls, data):
"""
Parse data and return unpacked class.
:param data: data to unpack
:return: unpacked data class
"""
tlv_type, tlv_len = struct.unpack(cls.header_format, data[:cls.header_len])
header_len = cls.header_len
if tlv_len == 0:
tlv_type, zero, tlv_len = struct.unpack(cls.long_header_format, data[:cls.long_header_len])
header_len = cls.long_header_len
tlv_size = header_len + tlv_len
# for 32-bit alignment
tlv_size += -tlv_size % 4
return cls(tlv_type, data[header_len:tlv_size]), data[tlv_size:]
@classmethod
def pack(cls, tlv_type, value):
"""
Pack a TLV value, based on type.
:param int tlv_type: type of data to pack
:param value: data to pack
:return: header and packed data
"""
tlv_len, tlv_data = cls.tlv_data_class_map[tlv_type].pack(value)
if tlv_len < 256:
hdr = struct.pack(cls.header_format, tlv_type, tlv_len)
else:
hdr = struct.pack(cls.long_header_format, tlv_type, 0, tlv_len)
return hdr + tlv_data
@classmethod
def pack_string(cls, tlv_type, value):
"""
Pack data type from a string representation
:param int tlv_type: type of data to pack
:param str value: string representation of data
:return: header and packed data
"""
return cls.pack(tlv_type, cls.tlv_data_class_map[tlv_type].from_string(value))
def type_str(self):
"""
Retrieve type string for this data type.
:return: data type name
:rtype: str
"""
try:
return self.tlv_type_map(self.tlv_type).name
except ValueError:
return "unknown tlv type: %s" % str(self.tlv_type)
def __str__(self):
"""
String representation of this data type.
:return: string representation
:rtype: str
"""
return "%s <tlvtype = %s, value = %s>" % (self.__class__.__name__, self.type_str(), self.value)
class CoreNodeTlv(CoreTlv):
"""
Class for representing CORE Node TLVs.
"""
tlv_type_map = NodeTlvs
tlv_data_class_map = {
NodeTlvs.NUMBER.value: CoreTlvDataUint32,
NodeTlvs.TYPE.value: CoreTlvDataUint32,
NodeTlvs.NAME.value: CoreTlvDataString,
NodeTlvs.IP_ADDRESS.value: CoreTlvDataIpv4Addr,
NodeTlvs.MAC_ADDRESS.value: CoreTlvDataMacAddr,
NodeTlvs.IP6_ADDRESS.value: CoreTlvDataIPv6Addr,
NodeTlvs.MODEL.value: CoreTlvDataString,
NodeTlvs.EMULATION_SERVER.value: CoreTlvDataString,
NodeTlvs.SESSION.value: CoreTlvDataString,
NodeTlvs.X_POSITION.value: CoreTlvDataUint16,
NodeTlvs.Y_POSITION.value: CoreTlvDataUint16,
NodeTlvs.CANVAS.value: CoreTlvDataUint16,
NodeTlvs.EMULATION_ID.value: CoreTlvDataUint32,
NodeTlvs.NETWORK_ID.value: CoreTlvDataUint32,
NodeTlvs.SERVICES.value: CoreTlvDataString,
NodeTlvs.LATITUDE.value: CoreTlvDataString,
NodeTlvs.LONGITUDE.value: CoreTlvDataString,
NodeTlvs.ALTITUDE.value: CoreTlvDataString,
NodeTlvs.ICON.value: CoreTlvDataString,
NodeTlvs.OPAQUE.value: CoreTlvDataString,
}
class CoreLinkTlv(CoreTlv):
"""
Class for representing CORE link TLVs.
"""
tlv_type_map = LinkTlvs
tlv_data_class_map = {
LinkTlvs.N1_NUMBER.value: CoreTlvDataUint32,
LinkTlvs.N2_NUMBER.value: CoreTlvDataUint32,
LinkTlvs.DELAY.value: CoreTlvDataUint64,
LinkTlvs.BANDWIDTH.value: CoreTlvDataUint64,
LinkTlvs.PER.value: CoreTlvDataString,
LinkTlvs.DUP.value: CoreTlvDataString,
LinkTlvs.JITTER.value: CoreTlvDataUint64,
LinkTlvs.MER.value: CoreTlvDataUint16,
LinkTlvs.BURST.value: CoreTlvDataUint16,
LinkTlvs.SESSION.value: CoreTlvDataString,
LinkTlvs.MBURST.value: CoreTlvDataUint16,
LinkTlvs.TYPE.value: CoreTlvDataUint32,
LinkTlvs.GUI_ATTRIBUTES.value: CoreTlvDataString,
LinkTlvs.UNIDIRECTIONAL.value: CoreTlvDataUint16,
LinkTlvs.EMULATION_ID.value: CoreTlvDataUint32,
LinkTlvs.NETWORK_ID.value: CoreTlvDataUint32,
LinkTlvs.KEY.value: CoreTlvDataUint32,
LinkTlvs.INTERFACE1_NUMBER.value: CoreTlvDataUint16,
LinkTlvs.INTERFACE1_IP4.value: CoreTlvDataIpv4Addr,
LinkTlvs.INTERFACE1_IP4_MASK.value: CoreTlvDataUint16,
LinkTlvs.INTERFACE1_MAC.value: CoreTlvDataMacAddr,
LinkTlvs.INTERFACE1_IP6.value: CoreTlvDataIPv6Addr,
LinkTlvs.INTERFACE1_IP6_MASK.value: CoreTlvDataUint16,
LinkTlvs.INTERFACE2_NUMBER.value: CoreTlvDataUint16,
LinkTlvs.INTERFACE2_IP4.value: CoreTlvDataIpv4Addr,
LinkTlvs.INTERFACE2_IP4_MASK.value: CoreTlvDataUint16,
LinkTlvs.INTERFACE2_MAC.value: CoreTlvDataMacAddr,
LinkTlvs.INTERFACE2_IP6.value: CoreTlvDataIPv6Addr,
LinkTlvs.INTERFACE2_IP6_MASK.value: CoreTlvDataUint16,
LinkTlvs.INTERFACE1_NAME.value: CoreTlvDataString,
LinkTlvs.INTERFACE2_NAME.value: CoreTlvDataString,
LinkTlvs.OPAQUE.value: CoreTlvDataString,
}
class CoreExecuteTlv(CoreTlv):
"""
Class for representing CORE execute TLVs.
"""
tlv_type_map = ExecuteTlvs
tlv_data_class_map = {
ExecuteTlvs.NODE.value: CoreTlvDataUint32,
ExecuteTlvs.NUMBER.value: CoreTlvDataUint32,
ExecuteTlvs.TIME.value: CoreTlvDataUint32,
ExecuteTlvs.COMMAND.value: CoreTlvDataString,
ExecuteTlvs.RESULT.value: CoreTlvDataString,
ExecuteTlvs.STATUS.value: CoreTlvDataUint32,
ExecuteTlvs.SESSION.value: CoreTlvDataString,
}
class CoreRegisterTlv(CoreTlv):
"""
Class for representing CORE register TLVs.
"""
tlv_type_map = RegisterTlvs
tlv_data_class_map = {
RegisterTlvs.WIRELESS.value: CoreTlvDataString,
RegisterTlvs.MOBILITY.value: CoreTlvDataString,
RegisterTlvs.UTILITY.value: CoreTlvDataString,
RegisterTlvs.EXECUTE_SERVER.value: CoreTlvDataString,
RegisterTlvs.GUI.value: CoreTlvDataString,
RegisterTlvs.EMULATION_SERVER.value: CoreTlvDataString,
RegisterTlvs.SESSION.value: CoreTlvDataString,
}
class CoreConfigTlv(CoreTlv):
"""
Class for representing CORE configuration TLVs.
"""
tlv_type_map = ConfigTlvs
tlv_data_class_map = {
ConfigTlvs.NODE.value: CoreTlvDataUint32,
ConfigTlvs.OBJECT.value: CoreTlvDataString,
ConfigTlvs.TYPE.value: CoreTlvDataUint16,
ConfigTlvs.DATA_TYPES.value: CoreTlvDataUint16List,
ConfigTlvs.VALUES.value: CoreTlvDataString,
ConfigTlvs.CAPTIONS.value: CoreTlvDataString,
ConfigTlvs.BITMAP.value: CoreTlvDataString,
ConfigTlvs.POSSIBLE_VALUES.value: CoreTlvDataString,
ConfigTlvs.GROUPS.value: CoreTlvDataString,
ConfigTlvs.SESSION.value: CoreTlvDataString,
ConfigTlvs.INTERFACE_NUMBER.value: CoreTlvDataUint16,
ConfigTlvs.NETWORK_ID.value: CoreTlvDataUint32,
ConfigTlvs.OPAQUE.value: CoreTlvDataString,
}
class CoreFileTlv(CoreTlv):
"""
Class for representing CORE file TLVs.
"""
tlv_type_map = FileTlvs
tlv_data_class_map = {
FileTlvs.NODE.value: CoreTlvDataUint32,
FileTlvs.NAME.value: CoreTlvDataString,
FileTlvs.MODE.value: CoreTlvDataString,
FileTlvs.NUMBER.value: CoreTlvDataUint16,
FileTlvs.TYPE.value: CoreTlvDataString,
FileTlvs.SOURCE_NAME.value: CoreTlvDataString,
FileTlvs.SESSION.value: CoreTlvDataString,
FileTlvs.DATA.value: CoreTlvDataString,
FileTlvs.COMPRESSED_DATA.value: CoreTlvDataString,
}
class CoreInterfaceTlv(CoreTlv):
"""
Class for representing CORE interface TLVs.
"""
tlv_type_map = InterfaceTlvs
tlv_data_class_map = {
InterfaceTlvs.NODE.value: CoreTlvDataUint32,
InterfaceTlvs.NUMBER.value: CoreTlvDataUint16,
InterfaceTlvs.NAME.value: CoreTlvDataString,
InterfaceTlvs.IP_ADDRESS.value: CoreTlvDataIpv4Addr,
InterfaceTlvs.MASK.value: CoreTlvDataUint16,
InterfaceTlvs.MAC_ADDRESS.value: CoreTlvDataMacAddr,
InterfaceTlvs.IP6_ADDRESS.value: CoreTlvDataIPv6Addr,
InterfaceTlvs.IP6_MASK.value: CoreTlvDataUint16,
InterfaceTlvs.TYPE.value: CoreTlvDataUint16,
InterfaceTlvs.SESSION.value: CoreTlvDataString,
InterfaceTlvs.STATE.value: CoreTlvDataUint16,
InterfaceTlvs.EMULATION_ID.value: CoreTlvDataUint32,
InterfaceTlvs.NETWORK_ID.value: CoreTlvDataUint32,
}
class CoreEventTlv(CoreTlv):
"""
Class for representing CORE event TLVs.
"""
tlv_type_map = EventTlvs
tlv_data_class_map = {
EventTlvs.NODE.value: CoreTlvDataUint32,
EventTlvs.TYPE.value: CoreTlvDataUint32,
EventTlvs.NAME.value: CoreTlvDataString,
EventTlvs.DATA.value: CoreTlvDataString,
EventTlvs.TIME.value: CoreTlvDataString,
EventTlvs.SESSION.value: CoreTlvDataString,
}
class CoreSessionTlv(CoreTlv):
"""
Class for representing CORE session TLVs.
"""
tlv_type_map = SessionTlvs
tlv_data_class_map = {
SessionTlvs.NUMBER.value: CoreTlvDataString,
SessionTlvs.NAME.value: CoreTlvDataString,
SessionTlvs.FILE.value: CoreTlvDataString,
SessionTlvs.NODE_COUNT.value: CoreTlvDataString,
SessionTlvs.DATE.value: CoreTlvDataString,
SessionTlvs.THUMB.value: CoreTlvDataString,
SessionTlvs.USER.value: CoreTlvDataString,
SessionTlvs.OPAQUE.value: CoreTlvDataString,
}
class CoreExceptionTlv(CoreTlv):
"""
Class for representing CORE exception TLVs.
"""
tlv_type_map = ExceptionTlvs
tlv_data_class_map = {
ExceptionTlvs.NODE.value: CoreTlvDataUint32,
ExceptionTlvs.SESSION.value: CoreTlvDataString,
ExceptionTlvs.LEVEL.value: CoreTlvDataUint16,
ExceptionTlvs.SOURCE.value: CoreTlvDataString,
ExceptionTlvs.DATE.value: CoreTlvDataString,
ExceptionTlvs.TEXT.value: CoreTlvDataString,
ExceptionTlvs.OPAQUE.value: CoreTlvDataString,
}
class CoreMessage(object):
"""
Base class for representing CORE messages.
"""
header_format = "!BBH"
header_len = struct.calcsize(header_format)
message_type = None
flag_map = MessageFlags
tlv_class = CoreTlv
def __init__(self, flags, hdr, data):
self.raw_message = hdr + data
self.flags = flags
self.tlv_data = {}
self.parse_data(data)
@classmethod
def unpack_header(cls, data):
"""
parse data and return (message_type, message_flags, message_len).
:param str data: data to parse
:return: unpacked tuple
:rtype: tuple
"""
message_type, message_flags, message_len = struct.unpack(cls.header_format, data[:cls.header_len])
return message_type, message_flags, message_len
@classmethod
def create(cls, flags, values):
tlv_data = structutils.pack_values(cls.tlv_class, values)
packed = cls.pack(flags, tlv_data)
header_data = packed[:cls.header_len]
return cls(flags, header_data, tlv_data)
@classmethod
def pack(cls, message_flags, tlv_data):
"""
Pack CORE message data.
:param message_flags: message flags to pack with data
:param tlv_data: data to get length from for packing
:return: combined header and tlv data
"""
header = struct.pack(cls.header_format, cls.message_type, message_flags, len(tlv_data))
return header + tlv_data
def add_tlv_data(self, key, value):
"""
Add TLV data into the data map.
:param int key: key to store TLV data
:param value: data to associate with key
:return: nothing
"""
if key in self.tlv_data:
raise KeyError("key already exists: %s (val=%s)" % (key, value))
self.tlv_data[key] = value
def get_tlv(self, tlv_type):
"""
Retrieve TLV data from data map.
:param int tlv_type: type of data to retrieve
:return: TLV type data
"""
return self.tlv_data.get(tlv_type)
def parse_data(self, data):
"""
Parse data while possible and adding TLV data to the data map.
:param data: data to parse for TLV data
:return: nothing
"""
while data:
tlv, data = self.tlv_class.unpack(data)
self.add_tlv_data(tlv.tlv_type, tlv.value)
def pack_tlv_data(self):
"""
Opposite of parse_data(). Return packed TLV data using self.tlv_data dict. Used by repack().
:return: packed data
:rtype: str
"""
tlv_data = ""
keys = sorted(self.tlv_data.keys())
for key in keys:
value = self.tlv_data[key]
tlv_data += self.tlv_class.pack(key, value)
return tlv_data
def repack(self):
"""
Invoke after updating self.tlv_data[] to rebuild self.raw_message.
Useful for modifying a message that has been parsed, before
sending the raw data again.
:return: nothing
"""
tlv_data = self.pack_tlv_data()
self.raw_message = self.pack(self.flags, tlv_data)
def type_str(self):
"""
Retrieve data of the message type.
:return: name of message type
:rtype: str
"""
try:
return MessageTypes(self.message_type).name
except ValueError:
return "unknown message type: %s" % str(self.message_type)
def flag_str(self):
"""
Retrieve message flag string.
:return: message flag string
:rtype: str
"""
message_flags = []
flag = 1L
while True:
if self.flags & flag:
try:
message_flags.append(self.flag_map(flag).name)
except ValueError:
message_flags.append("0x%x" % flag)
flag <<= 1
if not (self.flags & ~(flag - 1)):
break
return "0x%x <%s>" % (self.flags, " | ".join(message_flags))
def __str__(self):
"""
Retrieve string representation of the message.
:return: string representation
:rtype: str
"""
result = "%s <msgtype = %s, flags = %s>" % (self.__class__.__name__, self.type_str(), self.flag_str())
for key, value in self.tlv_data.iteritems():
try:
tlv_type = self.tlv_class.tlv_type_map(key).name
except ValueError:
tlv_type = "tlv type %s" % key
result += "\n %s: %s" % (tlv_type, value)
return result
def node_numbers(self):
"""
Return a list of node numbers included in this message.
"""
number1 = None
number2 = None
# not all messages have node numbers
if self.message_type == MessageTypes.NODE.value:
number1 = self.get_tlv(NodeTlvs.NUMBER.value)
elif self.message_type == MessageTypes.LINK.value:
number1 = self.get_tlv(LinkTlvs.N1_NUMBER.value)
number2 = self.get_tlv(LinkTlvs.N2_NUMBER.value)
elif self.message_type == MessageTypes.EXECUTE.value:
number1 = self.get_tlv(ExecuteTlvs.NODE.value)
elif self.message_type == MessageTypes.CONFIG.value:
number1 = self.get_tlv(ConfigTlvs.NODE.value)
elif self.message_type == MessageTypes.FILE.value:
number1 = self.get_tlv(FileTlvs.NODE.value)
elif self.message_type == MessageTypes.INTERFACE.value:
number1 = self.get_tlv(InterfaceTlvs.NODE.value)
elif self.message_type == MessageTypes.EVENT.value:
number1 = self.get_tlv(EventTlvs.NODE)
result = []
if number1:
result.append(number1)
if number2:
result.append(number2)
return result
def session_numbers(self):
"""
Return a list of session numbers included in this message.
"""
result = []
if self.message_type == MessageTypes.SESSION.value:
sessions = self.get_tlv(SessionTlvs.NUMBER.value)
elif self.message_type == MessageTypes.EXCEPTION.value:
sessions = self.get_tlv(ExceptionTlvs.SESSION.value)
else:
# All other messages share TLV number 0xA for the session number(s).
sessions = self.get_tlv(NodeTlvs.SESSION.value)
if sessions:
for session_id in sessions.split("|"):
result.append(int(session_id))
return result
class CoreNodeMessage(CoreMessage):
"""
CORE node message class.
"""
message_type = MessageTypes.NODE.value
tlv_class = CoreNodeTlv
class CoreLinkMessage(CoreMessage):
"""
CORE link message class.
"""
message_type = MessageTypes.LINK.value
tlv_class = CoreLinkTlv
class CoreExecMessage(CoreMessage):
"""
CORE execute message class.
"""
message_type = MessageTypes.EXECUTE.value
tlv_class = CoreExecuteTlv
class CoreRegMessage(CoreMessage):
"""
CORE register message class.
"""
message_type = MessageTypes.REGISTER.value
tlv_class = CoreRegisterTlv
class CoreConfMessage(CoreMessage):
"""
CORE configuration message class.
"""
message_type = MessageTypes.CONFIG.value
tlv_class = CoreConfigTlv
class CoreFileMessage(CoreMessage):
"""
CORE file message class.
"""
message_type = MessageTypes.FILE.value
tlv_class = CoreFileTlv
class CoreIfaceMessage(CoreMessage):
"""
CORE interface message class.
"""
message_type = MessageTypes.INTERFACE.value
tlv_class = CoreInterfaceTlv
class CoreEventMessage(CoreMessage):
"""
CORE event message class.
"""
message_type = MessageTypes.EVENT.value
tlv_class = CoreEventTlv
class CoreSessionMessage(CoreMessage):
"""
CORE session message class.
"""
message_type = MessageTypes.SESSION.value
tlv_class = CoreSessionTlv
class CoreExceptionMessage(CoreMessage):
"""
CORE exception message class.
"""
message_type = MessageTypes.EXCEPTION.value
tlv_class = CoreExceptionTlv
# map used to translate enumerated message type values to message class objects
CLASS_MAP = {
MessageTypes.NODE.value: CoreNodeMessage,
MessageTypes.LINK.value: CoreLinkMessage,
MessageTypes.EXECUTE.value: CoreExecMessage,
MessageTypes.REGISTER.value: CoreRegMessage,
MessageTypes.CONFIG.value: CoreConfMessage,
MessageTypes.FILE.value: CoreFileMessage,
MessageTypes.INTERFACE.value: CoreIfaceMessage,
MessageTypes.EVENT.value: CoreEventMessage,
MessageTypes.SESSION.value: CoreSessionMessage,
MessageTypes.EXCEPTION.value: CoreExceptionMessage,
}
def str_to_list(value):
"""
Helper to convert pipe-delimited string ("a|b|c") into a list (a, b, c).
:param str value: string to convert
:return: converted list
:rtype: list
"""
if value is None:
return None
return value.split("|")
def state_name(value):
"""
Helper to convert state number into state name using event types.
:param int value: state value to derive name from
:return: state name
:rtype: str
"""
try:
value = EventTypes(value).name
except ValueError:
value = "unknown"
return value