core-extra/daemon/core/emulator/data.py

357 lines
9.7 KiB
Python

"""
CORE data objects.
"""
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Optional
import netaddr
from core import utils
from core.emulator.enumerations import (
EventTypes,
ExceptionLevels,
LinkTypes,
MessageFlags,
)
if TYPE_CHECKING:
from core.nodes.base import CoreNode, NodeBase
@dataclass
class ConfigData:
message_type: int = None
node: int = None
object: str = None
type: int = None
data_types: tuple[int] = None
data_values: str = None
captions: str = None
bitmap: str = None
possible_values: str = None
groups: str = None
session: int = None
iface_id: int = None
network_id: int = None
opaque: str = None
@dataclass
class EventData:
node: int = None
event_type: EventTypes = None
name: str = None
data: str = None
time: str = None
session: int = None
@dataclass
class ExceptionData:
node: int = None
session: int = None
level: ExceptionLevels = None
source: str = None
date: str = None
text: str = None
opaque: str = None
@dataclass
class FileData:
message_type: MessageFlags = None
node: int = None
name: str = None
mode: str = None
number: int = None
type: str = None
source: str = None
session: int = None
data: str = None
compressed_data: str = None
@dataclass
class NodeOptions:
"""
Options for creating and updating nodes within core.
"""
name: str = None
model: Optional[str] = "PC"
canvas: int = None
icon: str = None
services: list[str] = field(default_factory=list)
config_services: list[str] = field(default_factory=list)
x: float = None
y: float = None
lat: float = None
lon: float = None
alt: float = None
server: str = None
image: str = None
emane: str = None
legacy: bool = False
# src, dst
binds: list[tuple[str, str]] = field(default_factory=list)
# src, dst, unique, delete
volumes: list[tuple[str, str, bool, bool]] = field(default_factory=list)
def set_position(self, x: float, y: float) -> None:
"""
Convenience method for setting position.
:param x: x position
:param y: y position
:return: nothing
"""
self.x = x
self.y = y
def set_location(self, lat: float, lon: float, alt: float) -> None:
"""
Convenience method for setting location.
:param lat: latitude
:param lon: longitude
:param alt: altitude
:return: nothing
"""
self.lat = lat
self.lon = lon
self.alt = alt
@dataclass
class NodeData:
"""
Node to broadcast.
"""
node: "NodeBase"
message_type: MessageFlags = None
source: str = None
@dataclass
class InterfaceData:
"""
Convenience class for storing interface data.
"""
id: int = None
name: str = None
mac: str = None
ip4: str = None
ip4_mask: int = None
ip6: str = None
ip6_mask: int = None
mtu: int = None
def get_ips(self) -> list[str]:
"""
Returns a list of ip4 and ip6 addresses when present.
:return: list of ip addresses
"""
ips = []
if self.ip4 and self.ip4_mask:
ips.append(f"{self.ip4}/{self.ip4_mask}")
if self.ip6 and self.ip6_mask:
ips.append(f"{self.ip6}/{self.ip6_mask}")
return ips
@dataclass
class LinkOptions:
"""
Options for creating and updating links within core.
"""
delay: int = None
bandwidth: int = None
loss: float = None
dup: int = None
jitter: int = None
mer: int = None
burst: int = None
mburst: int = None
unidirectional: int = None
key: int = None
buffer: int = None
def update(self, options: "LinkOptions") -> bool:
"""
Updates current options with values from other options.
:param options: options to update with
:return: True if any value has changed, False otherwise
"""
changed = False
if options.delay is not None and 0 <= options.delay != self.delay:
self.delay = options.delay
changed = True
if options.bandwidth is not None and 0 <= options.bandwidth != self.bandwidth:
self.bandwidth = options.bandwidth
changed = True
if options.loss is not None and 0 <= options.loss != self.loss:
self.loss = options.loss
changed = True
if options.dup is not None and 0 <= options.dup != self.dup:
self.dup = options.dup
changed = True
if options.jitter is not None and 0 <= options.jitter != self.jitter:
self.jitter = options.jitter
changed = True
if options.buffer is not None and 0 <= options.buffer != self.buffer:
self.buffer = options.buffer
changed = True
return changed
def is_clear(self) -> bool:
"""
Checks if the current option values represent a clear state.
:return: True if the current values should clear, False otherwise
"""
clear = self.delay is None or self.delay <= 0
clear &= self.jitter is None or self.jitter <= 0
clear &= self.loss is None or self.loss <= 0
clear &= self.dup is None or self.dup <= 0
clear &= self.bandwidth is None or self.bandwidth <= 0
clear &= self.buffer is None or self.buffer <= 0
return clear
def __eq__(self, other: Any) -> bool:
"""
Custom logic to check if this link options is equivalent to another.
:param other: other object to check
:return: True if they are both link options with the same values,
False otherwise
"""
if not isinstance(other, LinkOptions):
return False
return (
self.delay == other.delay
and self.jitter == other.jitter
and self.loss == other.loss
and self.dup == other.dup
and self.bandwidth == other.bandwidth
and self.buffer == other.buffer
)
@dataclass
class LinkData:
"""
Represents all data associated with a link.
"""
message_type: MessageFlags = None
type: LinkTypes = LinkTypes.WIRED
label: str = None
node1_id: int = None
node2_id: int = None
network_id: int = None
iface1: InterfaceData = None
iface2: InterfaceData = None
options: LinkOptions = LinkOptions()
color: str = None
source: str = None
class IpPrefixes:
"""
Convenience class to help generate IP4 and IP6 addresses for nodes within CORE.
"""
def __init__(self, ip4_prefix: str = None, ip6_prefix: str = None) -> None:
"""
Creates an IpPrefixes object.
:param ip4_prefix: ip4 prefix to use for generation
:param ip6_prefix: ip6 prefix to use for generation
:raises ValueError: when both ip4 and ip6 prefixes have not been provided
"""
if not ip4_prefix and not ip6_prefix:
raise ValueError("ip4 or ip6 must be provided")
self.ip4 = None
if ip4_prefix:
self.ip4 = netaddr.IPNetwork(ip4_prefix)
self.ip6 = None
if ip6_prefix:
self.ip6 = netaddr.IPNetwork(ip6_prefix)
def ip4_address(self, node_id: int) -> str:
"""
Convenience method to return the IP4 address for a node.
:param node_id: node id to get IP4 address for
:return: IP4 address or None
"""
if not self.ip4:
raise ValueError("ip4 prefixes have not been set")
return str(self.ip4[node_id])
def ip6_address(self, node_id: int) -> str:
"""
Convenience method to return the IP6 address for a node.
:param node_id: node id to get IP6 address for
:return: IP4 address or None
"""
if not self.ip6:
raise ValueError("ip6 prefixes have not been set")
return str(self.ip6[node_id])
def gen_iface(self, node_id: int, name: str = None, mac: str = None):
"""
Creates interface data for linking nodes, using the nodes unique id for
generation, along with a random mac address, unless provided.
:param node_id: node id to create an interface for
:param name: name to set for interface, default is eth{id}
:param mac: mac address to use for this interface, default is random
generation
:return: new interface data for the provided node
"""
# generate ip4 data
ip4 = None
ip4_mask = None
if self.ip4:
ip4 = self.ip4_address(node_id)
ip4_mask = self.ip4.prefixlen
# generate ip6 data
ip6 = None
ip6_mask = None
if self.ip6:
ip6 = self.ip6_address(node_id)
ip6_mask = self.ip6.prefixlen
# random mac
if not mac:
mac = utils.random_mac()
return InterfaceData(
name=name, ip4=ip4, ip4_mask=ip4_mask, ip6=ip6, ip6_mask=ip6_mask, mac=mac
)
def create_iface(
self, node: "CoreNode", name: str = None, mac: str = None
) -> InterfaceData:
"""
Creates interface data for linking nodes, using the nodes unique id for
generation, along with a random mac address, unless provided.
:param node: node to create interface for
:param name: name to set for interface, default is eth{id}
:param mac: mac address to use for this interface, default is random
generation
:return: new interface data for the provided node
"""
iface_data = self.gen_iface(node.id, name, mac)
iface_data.id = node.next_iface_id()
return iface_data