Merge pull request #336 from coreemu/refactoring/remove-ipaddress

Refactoring/remove ipaddress
This commit is contained in:
bharnden 2020-01-09 14:31:33 -08:00 committed by GitHub
commit 67c1dae357
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 337 additions and 687 deletions

View file

@ -7,9 +7,10 @@ import threading
from contextlib import contextmanager
import grpc
import netaddr
from core import utils
from core.api.grpc import core_pb2, core_pb2_grpc
from core.nodes.ipaddress import Ipv4Prefix, Ipv6Prefix, MacAddress
class InterfaceHelper:
@ -30,10 +31,10 @@ class InterfaceHelper:
self.ip4 = None
if ip4_prefix:
self.ip4 = Ipv4Prefix(ip4_prefix)
self.ip4 = netaddr.IPNetwork(ip4_prefix)
self.ip6 = None
if ip6_prefix:
self.ip6 = Ipv6Prefix(ip6_prefix)
self.ip6 = netaddr.IPNetwork(ip6_prefix)
def ip4_address(self, node_id):
"""
@ -45,7 +46,7 @@ class InterfaceHelper:
"""
if not self.ip4:
raise ValueError("ip4 prefixes have not been set")
return str(self.ip4.addr(node_id))
return str(self.ip4[node_id])
def ip6_address(self, node_id):
"""
@ -57,7 +58,7 @@ class InterfaceHelper:
"""
if not self.ip6:
raise ValueError("ip6 prefixes have not been set")
return str(self.ip6.addr(node_id))
return str(self.ip6[node_id])
def create_interface(self, node_id, interface_id, name=None, mac=None):
"""
@ -75,19 +76,19 @@ class InterfaceHelper:
ip4 = None
ip4_mask = None
if self.ip4:
ip4 = str(self.ip4.addr(node_id))
ip4 = self.ip4_address(node_id)
ip4_mask = self.ip4.prefixlen
# generate ip6 data
ip6 = None
ip6_mask = None
if self.ip6:
ip6 = str(self.ip6.addr(node_id))
ip6 = self.ip6_address(node_id)
ip6_mask = self.ip6.prefixlen
# random mac
if not mac:
mac = MacAddress.random()
mac = utils.random_mac()
return core_pb2.Interface(
id=interface_id,

View file

@ -6,7 +6,6 @@ from core.api.grpc import core_pb2
from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
from core.emulator.enumerations import LinkTypes, NodeTypes
from core.nodes.base import CoreNetworkBase
from core.nodes.ipaddress import MacAddress
WORKERS = 10
@ -57,8 +56,6 @@ def link_interface(interface_proto):
mac = interface_proto.mac
if mac == "":
mac = None
else:
mac = MacAddress.from_string(mac)
interface = InterfaceData(
_id=interface_proto.id,
name=name,

View file

@ -5,10 +5,13 @@ types and objects used for parsing and building CORE API messages.
CORE API messaging is leveraged for communication with the GUI.
"""
import binascii
import socket
import struct
from enum import Enum
import netaddr
from core.api.tlv import structutils
from core.emulator.enumerations import (
ConfigTlvs,
@ -24,7 +27,6 @@ from core.emulator.enumerations import (
RegisterTlvs,
SessionTlvs,
)
from core.nodes.ipaddress import IpAddress, MacAddress
class CoreTlvData:
@ -258,7 +260,7 @@ class CoreTlvDataIpv4Addr(CoreTlvDataObj):
Utility class for packing/unpacking Ipv4 addresses.
"""
data_type = IpAddress.from_string
data_type = str
data_format = "!2x4s"
pad_len = 2
@ -267,21 +269,22 @@ class CoreTlvDataIpv4Addr(CoreTlvDataObj):
"""
Retrieve Ipv4 address value from object.
:param core.misc.ipaddress.IpAddress obj: ip address to get value from
:return:
:param str obj: ip address to get value from
:return: packed address
:rtype: bytes
"""
return obj.addr
return socket.inet_pton(socket.AF_INET, obj)
@staticmethod
def new_obj(value):
"""
Retrieve Ipv4 address from a string representation.
:param str value: value to get Ipv4 address from
:param bytes value: value to get Ipv4 address from
:return: Ipv4 address
:rtype: core.nodes.ipaddress.IpAddress
:rtype: str
"""
return IpAddress(af=socket.AF_INET, address=value)
return socket.inet_ntop(socket.AF_INET, value)
class CoreTlvDataIPv6Addr(CoreTlvDataObj):
@ -290,7 +293,7 @@ class CoreTlvDataIPv6Addr(CoreTlvDataObj):
"""
data_format = "!16s2x"
data_type = IpAddress.from_string
data_type = str
pad_len = 2
@staticmethod
@ -298,21 +301,22 @@ class CoreTlvDataIPv6Addr(CoreTlvDataObj):
"""
Retrieve Ipv6 address value from object.
:param core.nodes.ipaddress.IpAddress obj: ip address to get value from
:return:
:param str obj: ip address to get value from
:return: packed address
:rtype: bytes
"""
return obj.addr
return socket.inet_pton(socket.AF_INET6, obj)
@staticmethod
def new_obj(value):
"""
Retrieve Ipv6 address from a string representation.
:param str value: value to get Ipv4 address from
:param bytes value: value to get Ipv4 address from
:return: Ipv4 address
:rtype: core.nodes.ipaddress.IpAddress
:rtype: str
"""
return IpAddress(af=socket.AF_INET6, address=value)
return socket.inet_ntop(socket.AF_INET6, value)
class CoreTlvDataMacAddr(CoreTlvDataObj):
@ -321,7 +325,7 @@ class CoreTlvDataMacAddr(CoreTlvDataObj):
"""
data_format = "!2x8s"
data_type = MacAddress.from_string
data_type = str
pad_len = 2
@staticmethod
@ -329,23 +333,27 @@ class CoreTlvDataMacAddr(CoreTlvDataObj):
"""
Retrieve Ipv6 address value from object.
:param core.nodes.ipaddress.MacAddress obj: mac address to get value from
:return:
:param str obj: mac address to get value from
:return: packed mac address
:rtype: bytes
"""
# extend to 64 bits
return b"\0\0" + obj.addr
return b"\0\0" + netaddr.EUI(obj).packed
@staticmethod
def new_obj(value):
"""
Retrieve mac address from a string representation.
:param str value: value to get Ipv4 address from
:return: Ipv4 address
:rtype: core.nodes.ipaddress.MacAddress
:param bytes value: value to get Ipv4 address from
:return: mac address
:rtype: str
"""
# only use 48 bits
return MacAddress(address=value[2:])
value = binascii.hexlify(value[2:]).decode()
mac = netaddr.EUI(value)
mac.dialect = netaddr.mac_unix
return str(mac)
class CoreTlv:

View file

@ -8,13 +8,13 @@ import threading
from collections import OrderedDict
from tempfile import NamedTemporaryFile
import netaddr
from fabric import Connection
from invoke import UnexpectedExit
from core import utils
from core.errors import CoreCommandError
from core.nodes.interface import GreTap
from core.nodes.ipaddress import IpAddress
from core.nodes.network import CoreNetwork, CtrlNet
LOCK = threading.Lock()
@ -196,7 +196,7 @@ class DistributedController:
:rtype: tuple
"""
host = server.host
key = self.tunnel_key(node.id, IpAddress.to_int(host))
key = self.tunnel_key(node.id, netaddr.IPAddress(host).value)
tunnel = self.tunnels.get(key)
if tunnel is not None:
return tunnel

View file

@ -1,6 +1,8 @@
import netaddr
from core import utils
from core.emane.nodes import EmaneNet
from core.emulator.enumerations import LinkTypes
from core.nodes.ipaddress import Ipv4Prefix, Ipv6Prefix, MacAddress
from core.nodes.physical import PhysicalNode
@ -164,10 +166,10 @@ class IpPrefixes:
self.ip4 = None
if ip4_prefix:
self.ip4 = Ipv4Prefix(ip4_prefix)
self.ip4 = netaddr.IPNetwork(ip4_prefix)
self.ip6 = None
if ip6_prefix:
self.ip6 = Ipv6Prefix(ip6_prefix)
self.ip6 = netaddr.IPNetwork(ip6_prefix)
def ip4_address(self, node):
"""
@ -179,7 +181,7 @@ class IpPrefixes:
"""
if not self.ip4:
raise ValueError("ip4 prefixes have not been set")
return str(self.ip4.addr(node.id))
return str(self.ip4[node.id])
def ip6_address(self, node):
"""
@ -191,7 +193,7 @@ class IpPrefixes:
"""
if not self.ip6:
raise ValueError("ip6 prefixes have not been set")
return str(self.ip6.addr(node.id))
return str(self.ip6[node.id])
def create_interface(self, node, name=None, mac=None):
"""
@ -212,19 +214,19 @@ class IpPrefixes:
ip4 = None
ip4_mask = None
if self.ip4:
ip4 = str(self.ip4.addr(node.id))
ip4 = self.ip4_address(node)
ip4_mask = self.ip4.prefixlen
# generate ip6 data
ip6 = None
ip6_mask = None
if self.ip6:
ip6 = str(self.ip6.addr(node.id))
ip6 = self.ip6_address(node)
ip6_mask = self.ip6.prefixlen
# random mac
if not mac:
mac = MacAddress.random()
mac = utils.random_mac()
return InterfaceData(
_id=inteface_id,
@ -248,7 +250,7 @@ class InterfaceData:
:param int _id: interface id
:param str name: name for interface
:param core.nodes.ipaddress.MacAddress mac: mac address
:param str mac: mac address
:param str ip4: ipv4 address
:param int ip4_mask: ipv4 bit mask
:param str ip6: ipv6 address

View file

@ -33,7 +33,6 @@ from core.location.event import EventLoop
from core.location.mobility import BasicRangeModel, MobilityManager
from core.nodes.base import CoreNetworkBase, CoreNode, CoreNodeBase
from core.nodes.docker import DockerNode
from core.nodes.ipaddress import MacAddress
from core.nodes.lxd import LxcNode
from core.nodes.network import (
CtrlNet,
@ -1764,7 +1763,7 @@ class Session:
control_ip = node.id
try:
address = control_net.prefix.addr(control_ip)
address = control_net.prefix[control_ip]
prefix = control_net.prefix.prefixlen
addrlist = [f"{address}/{prefix}"]
except ValueError:
@ -1778,7 +1777,7 @@ class Session:
net=control_net,
ifindex=control_net.CTRLIF_IDX_BASE + net_index,
ifname=f"ctrl{net_index}",
hwaddr=MacAddress.random(),
hwaddr=utils.random_mac(),
addrlist=addrlist,
)
node.netif(interface1).control = True

View file

@ -5,16 +5,16 @@ Defines the base logic for nodes used within core.
import logging
import os
import shutil
import socket
import threading
from socket import AF_INET, AF_INET6
import netaddr
from core import utils
from core.constants import MOUNT_BIN, VNODED_BIN
from core.emulator.data import LinkData, NodeData
from core.emulator.enumerations import LinkTypes, NodeTypes
from core.errors import CoreCommandError
from core.nodes import client, ipaddress
from core.nodes import client
from core.nodes.interface import TunTap, Veth
from core.nodes.netclient import get_net_client
@ -725,39 +725,40 @@ class CoreNode(CoreNodeBase):
Set hardware addres for an interface.
:param int ifindex: index of interface to set hardware address for
:param core.nodes.ipaddress.MacAddress addr: hardware address to set
:param str addr: hardware address to set
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
addr = utils.validate_mac(addr)
interface = self._netif[ifindex]
interface.sethwaddr(addr)
if self.up:
self.node_net_client.device_mac(interface.name, str(addr))
self.node_net_client.device_mac(interface.name, addr)
def addaddr(self, ifindex, addr):
"""
Add interface address.
:param int ifindex: index of interface to add address to
:param core.nodes.ipaddress.IpAddress addr: address to add to interface
:param str addr: address to add to interface
:return: nothing
"""
addr = utils.validate_ip(addr)
interface = self._netif[ifindex]
interface.addaddr(addr)
if self.up:
address = str(addr)
# ipv6 check
# ipv4 check
broadcast = None
if ":" not in address:
if netaddr.valid_ipv4(addr):
broadcast = "+"
self.node_net_client.create_address(interface.name, address, broadcast)
self.node_net_client.create_address(interface.name, addr, broadcast)
def deladdr(self, ifindex, addr):
"""
Delete address from an interface.
:param int ifindex: index of interface to delete address from
:param core.nodes.ipaddress.IpAddress addr: address to delete from interface
:param str addr: address to delete from interface
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
@ -769,7 +770,7 @@ class CoreNode(CoreNodeBase):
logging.exception("trying to delete unknown address: %s", addr)
if self.up:
self.node_net_client.delete_address(interface.name, str(addr))
self.node_net_client.delete_address(interface.name, addr)
def ifup(self, ifindex):
"""
@ -788,7 +789,7 @@ class CoreNode(CoreNodeBase):
:param core.nodes.base.CoreNetworkBase net: network to associate with
:param list addrlist: addresses to add on the interface
:param core.nodes.ipaddress.MacAddress hwaddr: hardware address to set for interface
:param str hwaddr: hardware address to set for interface
:param int ifindex: index of interface to create
:param str ifname: name for interface
:return: interface index
@ -799,7 +800,7 @@ class CoreNode(CoreNodeBase):
with self.lock:
# TODO: emane specific code
if net.is_emane is True:
if net is not None and net.is_emane is True:
ifindex = self.newtuntap(ifindex, ifname)
# TUN/TAP is not ready for addressing yet; the device may
# take some time to appear, and installing it into a
@ -1015,15 +1016,11 @@ class CoreNetworkBase(NodeBase):
for address in netif.addrlist:
ip, _sep, mask = address.partition("/")
mask = int(mask)
if ipaddress.is_ipv4_address(ip):
family = AF_INET
ipl = socket.inet_pton(family, ip)
interface2_ip4 = ipaddress.IpAddress(af=family, address=ipl)
if netaddr.valid_ipv4(ip):
interface2_ip4 = ip
interface2_ip4_mask = mask
else:
family = AF_INET6
ipl = socket.inet_pton(family, ip)
interface2_ip6 = ipaddress.IpAddress(af=family, address=ipl)
interface2_ip6 = ip
interface2_ip6_mask = mask
link_data = LinkData(

View file

@ -114,7 +114,7 @@ class CoreInterface:
:param str addr: address to add
:return: nothing
"""
addr = utils.validate_ip(addr)
self.addrlist.append(addr)
def deladdr(self, addr):
@ -130,9 +130,10 @@ class CoreInterface:
"""
Set hardware address.
:param core.nodes.ipaddress.MacAddress addr: hardware address to set to.
:param str addr: hardware address to set to.
:return: nothing
"""
addr = utils.validate_mac(addr)
self.hwaddr = addr
def getparam(self, key):

View file

@ -1,456 +0,0 @@
"""
Helper objects for dealing with IPv4/v6 addresses.
"""
import logging
import random
import socket
import struct
from socket import AF_INET, AF_INET6
class MacAddress:
"""
Provides mac address utilities for use within core.
"""
def __init__(self, address):
"""
Creates a MacAddress instance.
:param bytes address: mac address
"""
self.addr = address
def __str__(self):
"""
Create a string representation of a MacAddress.
:return: string representation
:rtype: str
"""
return ":".join(f"{x:02x}" for x in bytearray(self.addr))
def to_link_local(self):
"""
Convert the MAC address to a IPv6 link-local address, using EUI 48
to EUI 64 conversion process per RFC 5342.
:return: ip address object
:rtype: IpAddress
"""
if not self.addr:
return IpAddress.from_string("::")
tmp = struct.unpack("!Q", b"\x00\x00" + self.addr)[0]
nic = int(tmp) & 0x000000FFFFFF
oui = int(tmp) & 0xFFFFFF000000
# toggle U/L bit
oui ^= 0x020000000000
# append EUI-48 octets
oui = (oui << 16) | 0xFFFE000000
return IpAddress(AF_INET6, struct.pack("!QQ", 0xFE80 << 48, oui | nic))
@classmethod
def from_string(cls, s):
"""
Create a mac address object from a string.
:param s: string representation of a mac address
:return: mac address class
:rtype: MacAddress
"""
addr = b"".join(bytes([int(x, 16)]) for x in s.split(":"))
return cls(addr)
@classmethod
def random(cls):
"""
Create a random mac address.
:return: random mac address
:rtype: MacAddress
"""
tmp = random.randint(0, 0xFFFFFF)
# use the Xen OID 00:16:3E
tmp |= 0x00163E << 24
tmpbytes = struct.pack("!Q", tmp)
return cls(tmpbytes[2:])
class IpAddress:
"""
Provides ip utilities and functionality for use within core.
"""
def __init__(self, af, address):
"""
Create a IpAddress instance.
:param int af: address family
:param bytes address: ip address
:return:
"""
# check if (af, addr) is valid
if not socket.inet_ntop(af, address):
raise ValueError("invalid af/addr")
self.af = af
self.addr = address
def is_ipv4(self):
"""
Checks if this is an ipv4 address.
:return: True if ipv4 address, False otherwise
:rtype: bool
"""
return self.af == AF_INET
def is_ipv6(self):
"""
Checks if this is an ipv6 address.
:return: True if ipv6 address, False otherwise
:rtype: bool
"""
return self.af == AF_INET6
def __str__(self):
"""
Create a string representation of this address.
:return: string representation of address
:rtype: str
"""
return socket.inet_ntop(self.af, self.addr)
def __eq__(self, other):
"""
Checks for equality with another ip address.
:param IpAddress other: other ip address to check equality with
:return: True is the other IpAddress is equal, False otherwise
:rtype: bool
"""
if not isinstance(other, IpAddress):
return False
elif self is other:
return True
else:
return other.af == self.af and other.addr == self.addr
def __add__(self, other):
"""
Add value to ip addresses.
:param int other: value to add to ip address
:return: added together ip address instance
:rtype: IpAddress
"""
try:
carry = int(other)
except ValueError:
logging.exception("error during addition")
return NotImplemented
tmp = [x for x in bytearray(self.addr)]
for i in range(len(tmp) - 1, -1, -1):
x = tmp[i] + carry
tmp[i] = x & 0xFF
carry = x >> 8
if carry == 0:
break
addr = bytes(tmp)
return self.__class__(self.af, addr)
def __sub__(self, other):
"""
Subtract value from ip address.
:param int other: value to subtract from ip address
:return:
"""
try:
tmp = -int(other)
except ValueError:
logging.exception("error during subtraction")
return NotImplemented
return self.__add__(tmp)
@classmethod
def from_string(cls, s):
"""
Create a ip address from a string representation.
:param s: string representation to create ip address from
:return: ip address instance
:rtype: IpAddress
"""
for af in AF_INET, AF_INET6:
return cls(af, socket.inet_pton(af, s))
@staticmethod
def to_int(s):
"""
Convert IPv4 string to integer
:param s: string to convert to 32-bit integer
:return: integer value
:rtype: int
"""
value = socket.inet_pton(AF_INET, s)
return struct.unpack("!I", value)[0]
class IpPrefix:
"""
Provides ip address generation and prefix utilities.
"""
def __init__(self, af, prefixstr):
"""
Create a IpPrefix instance.
:param int af: address family for ip prefix
:param str prefixstr: ip prefix string
"""
# prefixstr format: address/prefixlen
tmp = prefixstr.split("/")
if len(tmp) > 2:
raise ValueError(f"invalid prefix: {prefixstr}")
self.af = af
if self.af == AF_INET:
self.addrlen = 32
elif self.af == AF_INET6:
self.addrlen = 128
else:
raise ValueError(f"invalid address family: {self.af}")
if len(tmp) == 2:
self.prefixlen = int(tmp[1])
else:
self.prefixlen = self.addrlen
self.prefix = socket.inet_pton(self.af, tmp[0])
self.prefix = bytes(self.prefix)
if self.addrlen > self.prefixlen:
addrbits = self.addrlen - self.prefixlen
netmask = ((1 << self.prefixlen) - 1) << addrbits
prefix = bytes(b"")
for i in range(-1, -(addrbits >> 3) - 2, -1):
prefix = bytes([self.prefix[i] & (netmask & 0xFF)]) + prefix
netmask >>= 8
self.prefix = self.prefix[:i] + prefix
def __str__(self):
"""
String representation of an ip prefix.
:return: string representation
:rtype: str
"""
address = socket.inet_ntop(self.af, self.prefix)
return f"{address}/{self.prefixlen}"
def __eq__(self, other):
"""
Compare equality with another ip prefix.
:param IpPrefix other: other ip prefix to compare with
:return: True is equal, False otherwise
:rtype: bool
"""
if not isinstance(other, IpPrefix):
return False
elif self is other:
return True
else:
return (
other.af == self.af
and other.prefixlen == self.prefixlen
and other.prefix == self.prefix
)
def __add__(self, other):
"""
Add a value to this ip prefix.
:param int other: value to add
:return: added ip prefix instance
:rtype: IpPrefix
"""
try:
tmp = int(other)
except ValueError:
logging.exception("error during addition")
return NotImplemented
a = IpAddress(self.af, self.prefix) + (tmp << (self.addrlen - self.prefixlen))
prefixstr = f"{a}/{self.prefixlen}"
if self.__class__ == IpPrefix:
return self.__class__(self.af, prefixstr)
else:
return self.__class__(prefixstr)
def __sub__(self, other):
"""
Subtract value from this ip prefix.
:param int other: value to subtract
:return: subtracted ip prefix instance
:rtype: IpPrefix
"""
try:
tmp = -int(other)
except ValueError:
logging.exception("error during subtraction")
return NotImplemented
return self.__add__(tmp)
def addr(self, hostid):
"""
Create an ip address for a given host id.
:param hostid: host id for an ip address
:return: ip address
:rtype: IpAddress
"""
tmp = int(hostid)
if tmp in [-1, 0, 1] and self.addrlen == self.prefixlen:
return IpAddress(self.af, self.prefix)
if (
tmp == 0
or tmp > (1 << (self.addrlen - self.prefixlen)) - 1
or (
self.af == AF_INET and tmp == (1 << (self.addrlen - self.prefixlen)) - 1
)
):
raise ValueError(f"invalid hostid for prefix {self}: {hostid}")
addr = bytes(b"")
prefix_endpoint = -1
for i in range(-1, -(self.addrlen >> 3) - 1, -1):
prefix_endpoint = i
addr = bytes([self.prefix[i] | (tmp & 0xFF)]) + addr
tmp >>= 8
if not tmp:
break
addr = self.prefix[:prefix_endpoint] + addr
return IpAddress(self.af, addr)
def min_addr(self):
"""
Return the minimum ip address for this prefix.
:return: minimum ip address
:rtype: IpAddress
"""
return self.addr(1)
def max_addr(self):
"""
Return the maximum ip address for this prefix.
:return: maximum ip address
:rtype: IpAddress
"""
if self.af == AF_INET:
return self.addr((1 << (self.addrlen - self.prefixlen)) - 2)
else:
return self.addr((1 << (self.addrlen - self.prefixlen)) - 1)
def num_addr(self):
"""
Retrieve the number of ip addresses for this prefix.
:return: maximum number of ip addresses
:rtype: int
"""
return max(0, (1 << (self.addrlen - self.prefixlen)) - 2)
def prefix_str(self):
"""
Retrieve the prefix string for this ip address.
:return: prefix string
:rtype: str
"""
return socket.inet_ntop(self.af, self.prefix)
def netmask_str(self):
"""
Retrieve the netmask string for this ip address.
:return: netmask string
:rtype: str
"""
addrbits = self.addrlen - self.prefixlen
netmask = ((1 << self.prefixlen) - 1) << addrbits
netmaskbytes = struct.pack("!L", netmask)
return IpAddress(af=AF_INET, address=netmaskbytes).__str__()
class Ipv4Prefix(IpPrefix):
"""
Provides an ipv4 specific class for ip prefixes.
"""
def __init__(self, prefixstr):
"""
Create a Ipv4Prefix instance.
:param str prefixstr: ip prefix
"""
super().__init__(AF_INET, prefixstr)
class Ipv6Prefix(IpPrefix):
"""
Provides an ipv6 specific class for ip prefixes.
"""
def __init__(self, prefixstr):
"""
Create a Ipv6Prefix instance.
:param str prefixstr: ip prefix
"""
super().__init__(AF_INET6, prefixstr)
def is_ip_address(af, addrstr):
"""
Check if ip address string is a valid ip address.
:param int af: address family
:param str addrstr: ip address string
:return: True if a valid ip address, False otherwise
:rtype: bool
"""
try:
socket.inet_pton(af, addrstr)
return True
except IOError:
return False
def is_ipv4_address(addrstr):
"""
Check if ipv4 address string is a valid ipv4 address.
:param str addrstr: ipv4 address string
:return: True if a valid ipv4 address, False otherwise
:rtype: bool
"""
return is_ip_address(AF_INET, addrstr)
def is_ipv6_address(addrstr):
"""
Check if ipv6 address string is a valid ipv6 address.
:param str addrstr: ipv6 address string
:return: True if a valid ipv6 address, False otherwise
:rtype: bool
"""
return is_ip_address(AF_INET6, addrstr)

View file

@ -3,17 +3,16 @@ Defines network nodes used within core.
"""
import logging
import socket
import threading
import time
from socket import AF_INET, AF_INET6
import netaddr
from core import utils
from core.constants import EBTABLES_BIN, TC_BIN
from core.emulator.data import LinkData
from core.emulator.enumerations import LinkTypes, NodeTypes, RegisterTlvs
from core.errors import CoreCommandError, CoreError
from core.nodes import ipaddress
from core.nodes.base import CoreNetworkBase
from core.nodes.interface import GreTap, Veth
from core.nodes.netclient import get_net_client
@ -750,28 +749,30 @@ class CtrlNet(CoreNetwork):
:param serverintf: server interface
:return:
"""
self.prefix = ipaddress.Ipv4Prefix(prefix)
self.prefix = netaddr.IPNetwork(prefix).cidr
self.hostid = hostid
self.assign_address = assign_address
self.updown_script = updown_script
self.serverintf = serverintf
super().__init__(session, _id, name, start, server)
def add_addresses(self, address):
def add_addresses(self, index):
"""
Add addresses used for created control networks,
:param core.nodes.interfaces.IpAddress address: starting address to use
:return:
:param int index: starting address index
:return: nothing
"""
use_ovs = self.session.options.get_config("ovs") == "True"
address = self.prefix[index]
current = f"{address}/{self.prefix.prefixlen}"
net_client = get_net_client(use_ovs, utils.cmd)
net_client.create_address(self.brname, current)
servers = self.session.distributed.servers
for name in servers:
server = servers[name]
address -= 1
index -= 1
address = self.prefix[index]
current = f"{address}/{self.prefix.prefixlen}"
net_client = get_net_client(use_ovs, server.remote_cmd)
net_client.create_address(self.brname, current)
@ -790,11 +791,9 @@ class CtrlNet(CoreNetwork):
logging.info("added control network bridge: %s %s", self.brname, self.prefix)
if self.hostid and self.assign_address:
address = self.prefix.addr(self.hostid)
self.add_addresses(address)
self.add_addresses(self.hostid)
elif self.assign_address:
address = self.prefix.max_addr()
self.add_addresses(address)
self.add_addresses(-2)
if self.updown_script:
logging.info(
@ -908,15 +907,11 @@ class PtpNet(CoreNetwork):
for address in if1.addrlist:
ip, _sep, mask = address.partition("/")
mask = int(mask)
if ipaddress.is_ipv4_address(ip):
family = AF_INET
ipl = socket.inet_pton(family, ip)
interface1_ip4 = ipaddress.IpAddress(af=family, address=ipl)
if netaddr.valid_ipv4(ip):
interface1_ip4 = ip
interface1_ip4_mask = mask
else:
family = AF_INET6
ipl = socket.inet_pton(family, ip)
interface1_ip6 = ipaddress.IpAddress(af=family, address=ipl)
interface1_ip6 = ip
interface1_ip6_mask = mask
interface2_ip4 = None
@ -926,15 +921,11 @@ class PtpNet(CoreNetwork):
for address in if2.addrlist:
ip, _sep, mask = address.partition("/")
mask = int(mask)
if ipaddress.is_ipv4_address(ip):
family = AF_INET
ipl = socket.inet_pton(family, ip)
interface2_ip4 = ipaddress.IpAddress(af=family, address=ipl)
if netaddr.valid_ipv4(ip):
interface2_ip4 = ip
interface2_ip4_mask = mask
else:
family = AF_INET6
ipl = socket.inet_pton(family, ip)
interface2_ip6 = ipaddress.IpAddress(af=family, address=ipl)
interface2_ip6 = ip
interface2_ip6_mask = mask
link_data = LinkData(

View file

@ -59,19 +59,30 @@ class PhysicalNode(CoreNodeBase):
def sethwaddr(self, ifindex, addr):
"""
Set hardware address for an interface.
:param int ifindex: index of interface to set hardware address for
:param str addr: hardware address to set
:return: nothing
:raises CoreCommandError: when a non-zero exit status occurs
"""
addr = utils.validate_mac(addr)
interface = self._netif[ifindex]
interface.sethwaddr(addr)
if self.up:
self.net_client.device_mac(interface.name, str(addr))
self.net_client.device_mac(interface.name, addr)
def addaddr(self, ifindex, addr):
"""
Add an address to an interface.
:param int ifindex: index of interface to add address to
:param str addr: address to add
:return: nothing
"""
addr = utils.validate_ip(addr)
interface = self._netif[ifindex]
if self.up:
self.net_client.create_address(interface.name, str(addr))
self.net_client.create_address(interface.name, addr)
interface.addaddr(addr)
def deladdr(self, ifindex, addr):
@ -408,9 +419,9 @@ class Rj45Node(CoreNodeBase, CoreInterface):
:return: nothing
:raises CoreCommandError: when there is a command exception
"""
addr = utils.validate_ip(addr)
if self.up:
self.net_client.create_address(self.name, str(addr))
self.net_client.create_address(self.name, addr)
CoreInterface.addaddr(self, addr)
def deladdr(self, addr):

View file

@ -1,7 +1,8 @@
"""
bird.py: defines routing services provided by the BIRD Internet Routing Daemon.
"""
from core.nodes import ipaddress
import netaddr
from core.services.coreservices import CoreService
@ -39,7 +40,7 @@ class Bird(CoreService):
continue
for a in ifc.addrlist:
a = a.split("/")[0]
if ipaddress.is_ipv4_address(a):
if netaddr.valid_ipv4(a):
return a
# raise ValueError, "no IPv4 address found for router ID"
return "0.0.0.0"

View file

@ -2,10 +2,10 @@
frr.py: defines routing services provided by FRRouting.
Assumes installation of FRR via https://deb.frrouting.org/
"""
import netaddr
from core import constants
from core.emulator.enumerations import LinkTypes
from core.nodes import ipaddress
from core.nodes.network import PtpNet
from core.nodes.physical import Rj45Node
from core.services.coreservices import CoreService
@ -85,7 +85,7 @@ class FRRZebra(CoreService):
if want_ipv4:
ipv4list = filter(
lambda x: ipaddress.is_ipv4_address(x.split("/")[0]), ifc.addrlist
lambda x: netaddr.valid_ipv4(x.split("/")[0]), ifc.addrlist
)
cfg += " "
cfg += "\n ".join(map(cls.addrstr, ipv4list))
@ -93,7 +93,7 @@ class FRRZebra(CoreService):
cfg += cfgv4
if want_ipv6:
ipv6list = filter(
lambda x: ipaddress.is_ipv6_address(x.split("/")[0]), ifc.addrlist
lambda x: netaddr.valid_ipv6(x.split("/")[0]), ifc.addrlist
)
cfg += " "
cfg += "\n ".join(map(cls.addrstr, ipv6list))
@ -113,9 +113,9 @@ class FRRZebra(CoreService):
helper for mapping IP addresses to zebra config statements
"""
addr = x.split("/")[0]
if ipaddress.is_ipv4_address(addr):
if netaddr.valid_ipv4(addr):
return "ip address %s" % x
elif ipaddress.is_ipv6_address(addr):
elif netaddr.valid_ipv6(addr):
return "ipv6 address %s" % x
else:
raise ValueError("invalid address: %s", x)
@ -330,7 +330,7 @@ class FrrService(CoreService):
continue
for a in ifc.addrlist:
a = a.split("/")[0]
if ipaddress.is_ipv4_address(a):
if netaddr.valid_ipv4(a):
return a
# raise ValueError, "no IPv4 address found for router ID"
return "0.0.0.0"
@ -414,29 +414,15 @@ class FRROspfv2(FrrService):
continue
for a in ifc.addrlist:
addr = a.split("/")[0]
if not ipaddress.is_ipv4_address(addr):
if not netaddr.valid_ipv4(addr):
continue
net = ipaddress.Ipv4Prefix(a)
cfg += " network %s area 0\n" % net
cfg += " network %s area 0\n" % a
cfg += "!\n"
return cfg
@classmethod
def generatefrrifcconfig(cls, node, ifc):
return cls.mtucheck(ifc)
# cfg = cls.mtucheck(ifc)
# external RJ45 connections will use default OSPF timers
# if cls.rj45check(ifc):
# return cfg
# cfg += cls.ptpcheck(ifc)
# return cfg + """\
# ip ospf hello-interval 2
# ip ospf dead-interval 6
# ip ospf retransmit-interval 5
# """
class FRROspfv3(FrrService):

View file

@ -2,10 +2,9 @@
nrl.py: defines services provided by NRL protolib tools hosted here:
http://www.nrl.navy.mil/itd/ncs/products
"""
import netaddr
from core import utils
from core.nodes import ipaddress
from core.nodes.ipaddress import Ipv4Prefix
from core.services.coreservices import CoreService
@ -38,9 +37,8 @@ class NrlService(CoreService):
continue
for a in ifc.addrlist:
a = a.split("/")[0]
if ipaddress.is_ipv4_address(a):
pre = Ipv4Prefix("%s/%s" % (a, prefixlen))
return str(pre)
if netaddr.valid_ipv4(a):
return f"{a}/{prefixlen}"
# raise ValueError, "no IPv4 address found"
return "0.0.0.0/%s" % prefixlen

View file

@ -1,11 +1,11 @@
"""
quagga.py: defines routing services provided by Quagga.
"""
import netaddr
from core import constants
from core.emane.nodes import EmaneNet
from core.emulator.enumerations import LinkTypes
from core.nodes import ipaddress
from core.nodes.network import PtpNet, WlanNode
from core.nodes.physical import Rj45Node
from core.services.coreservices import CoreService
@ -82,7 +82,7 @@ class Zebra(CoreService):
if want_ipv4:
ipv4list = filter(
lambda x: ipaddress.is_ipv4_address(x.split("/")[0]), ifc.addrlist
lambda x: netaddr.valid_ipv4(x.split("/")[0]), ifc.addrlist
)
cfg += " "
cfg += "\n ".join(map(cls.addrstr, ipv4list))
@ -90,7 +90,7 @@ class Zebra(CoreService):
cfg += cfgv4
if want_ipv6:
ipv6list = filter(
lambda x: ipaddress.is_ipv6_address(x.split("/")[0]), ifc.addrlist
lambda x: netaddr.valid_ipv6(x.split("/")[0]), ifc.addrlist
)
cfg += " "
cfg += "\n ".join(map(cls.addrstr, ipv6list))
@ -110,9 +110,9 @@ class Zebra(CoreService):
helper for mapping IP addresses to zebra config statements
"""
addr = x.split("/")[0]
if ipaddress.is_ipv4_address(addr):
if netaddr.valid_ipv4(addr):
return "ip address %s" % x
elif ipaddress.is_ipv6_address(addr):
elif netaddr.valid_ipv6(addr):
return "ipv6 address %s" % x
else:
raise ValueError("invalid address: %s", x)
@ -257,7 +257,7 @@ class QuaggaService(CoreService):
continue
for a in ifc.addrlist:
a = a.split("/")[0]
if ipaddress.is_ipv4_address(a):
if netaddr.valid_ipv4(a):
return a
# raise ValueError, "no IPv4 address found for router ID"
return "0.0.0.0"
@ -341,28 +341,14 @@ class Ospfv2(QuaggaService):
continue
for a in ifc.addrlist:
addr = a.split("/")[0]
if ipaddress.is_ipv4_address(addr):
net = ipaddress.Ipv4Prefix(a)
cfg += " network %s area 0\n" % net
if netaddr.valid_ipv4(addr):
cfg += " network %s area 0\n" % a
cfg += "!\n"
return cfg
@classmethod
def generatequaggaifcconfig(cls, node, ifc):
return cls.mtucheck(ifc)
# cfg = cls.mtucheck(ifc)
# external RJ45 connections will use default OSPF timers
# if cls.rj45check(ifc):
# return cfg
# cfg += cls.ptpcheck(ifc)
# return cfg + """\
# ip ospf hello-interval 2
# ip ospf dead-interval 6
# ip ospf retransmit-interval 5
# """
class Ospfv3(QuaggaService):

View file

@ -4,7 +4,8 @@ sdn.py defines services to start Open vSwitch and the Ryu SDN Controller.
import re
from core.nodes import ipaddress
import netaddr
from core.services.coreservices import CoreService
@ -41,7 +42,7 @@ class OvsService(SdnService):
cfg += "# auto-generated by OvsService (OvsService.py)\n"
cfg += "/etc/init.d/openvswitch-switch start < /dev/null\n"
cfg += "ovs-vsctl add-br ovsbr0 -- set Bridge ovsbr0 fail-mode=secure\n"
cfg += "ifconfig ovsbr0 up\n"
cfg += "ip link set dev ovsbr0 up\n"
for ifc in node.netifs():
if hasattr(ifc, "control") and ifc.control is True:
@ -51,18 +52,18 @@ class OvsService(SdnService):
# create virtual interfaces
cfg += "ip link add rtr%s type veth peer name sw%s\n" % (ifnum, ifnum)
cfg += "ifconfig rtr%s up\n" % ifnum
cfg += "ifconfig sw%s up\n" % ifnum
cfg += "ip link set dev rtr%s up\n" % ifnum
cfg += "ip link set dev sw%s up\n" % ifnum
# remove ip address of eths because quagga/zebra will assign same IPs to rtr interfaces
# or assign them manually to rtr interfaces if zebra is not running
for ifcaddr in ifc.addrlist:
addr = ifcaddr.split("/")[0]
if ipaddress.is_ipv4_address(addr):
if netaddr.valid_ipv4(addr):
cfg += "ip addr del %s dev %s\n" % (ifcaddr, ifc.name)
if has_zebra == 0:
cfg += "ip addr add %s dev rtr%s\n" % (ifcaddr, ifnum)
elif ipaddress.is_ipv6_address(addr):
elif netaddr.valid_ipv6(addr):
cfg += "ip -6 addr del %s dev %s\n" % (ifcaddr, ifc.name)
if has_zebra == 0:
cfg += "ip -6 addr add %s dev rtr%s\n" % (ifcaddr, ifnum)

View file

@ -4,10 +4,10 @@ utility.py: defines miscellaneous utility services.
import os
import netaddr
from core import constants, utils
from core.errors import CoreCommandError
from core.nodes import ipaddress
from core.nodes.ipaddress import Ipv4Prefix, Ipv6Prefix
from core.services.coreservices import CoreService, ServiceMode
@ -88,19 +88,15 @@ class DefaultRouteService(UtilService):
@staticmethod
def addrstr(x):
addr = x.split("/")[0]
if ipaddress.is_ipv6_address(addr):
net = Ipv6Prefix(x)
else:
net = Ipv4Prefix(x)
if net.max_addr() == net.min_addr():
net = netaddr.IPNetwork(x)
if net[1] == net[-2]:
return ""
else:
if os.uname()[0] == "Linux":
rtcmd = "ip route add default via"
else:
raise Exception("unknown platform")
return "%s %s" % (rtcmd, net.min_addr())
return "%s %s" % (rtcmd, net[1])
class DefaultMulticastRouteService(UtilService):
@ -150,20 +146,19 @@ class StaticRouteService(UtilService):
@staticmethod
def routestr(x):
addr = x.split("/")[0]
if ipaddress.is_ipv6_address(addr):
net = Ipv6Prefix(x)
if netaddr.valid_ipv6(addr):
dst = "3ffe:4::/64"
else:
net = Ipv4Prefix(x)
dst = "10.9.8.0/24"
if net.max_addr() == net.min_addr():
net = netaddr.IPNetwork(x)
if net[-2] == net[1]:
return ""
else:
if os.uname()[0] == "Linux":
rtcmd = "#/sbin/ip route add %s via" % dst
else:
raise Exception("unknown platform")
return "%s %s" % (rtcmd, net.min_addr())
return "%s %s" % (rtcmd, net[1])
class SshService(UtilService):
@ -285,14 +280,14 @@ ddns-update-style none;
for inclusion in the dhcpd3 config file.
"""
addr = x.split("/")[0]
if ipaddress.is_ipv6_address(addr):
if netaddr.valid_ipv6(addr):
return ""
else:
addr = x.split("/")[0]
net = Ipv4Prefix(x)
net = netaddr.IPNetwork(x)
# divide the address space in half
rangelow = net.addr(net.num_addr() / 2)
rangehigh = net.max_addr()
index = (net.size - 2) / 2
rangelow = net[index]
rangehigh = net[-2]
return """
subnet %s netmask %s {
pool {
@ -302,8 +297,8 @@ subnet %s netmask %s {
}
}
""" % (
net.prefix_str(),
net.netmask_str(),
net.ip,
net.netmask,
rangelow,
rangehigh,
addr,
@ -708,9 +703,8 @@ interface %s
for inclusion in the RADVD config file.
"""
addr = x.split("/")[0]
if ipaddress.is_ipv6_address(addr):
net = Ipv6Prefix(x)
return str(net)
if netaddr.valid_ipv6(addr):
return x
else:
return ""

View file

@ -4,7 +4,8 @@ xorp.py: defines routing services provided by the XORP routing suite.
import logging
from core.nodes import ipaddress
import netaddr
from core.services.coreservices import CoreService
@ -152,7 +153,7 @@ class XorpService(CoreService):
continue
for a in ifc.addrlist:
a = a.split("/")[0]
if ipaddress.is_ipv4_address(a):
if netaddr.valid_ipv4(a):
return a
# raise ValueError, "no IPv4 address found for router ID"
return "0.0.0.0"
@ -190,7 +191,7 @@ class XorpOspfv2(XorpService):
cfg += "\t\tvif %s {\n" % ifc.name
for a in ifc.addrlist:
addr = a.split("/")[0]
if not ipaddress.is_ipv4_address(addr):
if not netaddr.valid_ipv4(addr):
continue
cfg += "\t\t address %s {\n" % addr
cfg += "\t\t }\n"
@ -283,7 +284,7 @@ class XorpRip(XorpService):
cfg += "\t vif %s {\n" % ifc.name
for a in ifc.addrlist:
addr = a.split("/")[0]
if not ipaddress.is_ipv4_address(addr):
if not netaddr.valid_ipv4(addr):
continue
cfg += "\t\taddress %s {\n" % addr
cfg += "\t\t disable: false\n"
@ -465,7 +466,7 @@ class XorpOlsr(XorpService):
cfg += "\t vif %s {\n" % ifc.name
for a in ifc.addrlist:
addr = a.split("/")[0]
if not ipaddress.is_ipv4_address(addr):
if not netaddr.valid_ipv4(addr):
continue
cfg += "\t\taddress %s {\n" % addr
cfg += "\t\t}\n"

View file

@ -11,11 +11,14 @@ import json
import logging
import logging.config
import os
import random
import shlex
import sys
from subprocess import PIPE, STDOUT, Popen
from core.errors import CoreCommandError
import netaddr
from core.errors import CoreCommandError, CoreError
DEVNULL = open(os.devnull, "wb")
@ -408,3 +411,48 @@ def threadpool(funcs, workers=10):
except Exception as e:
exceptions.append(e)
return results, exceptions
def random_mac():
"""
Create a random mac address using Xen OID 00:16:3E.
:return: random mac address
:rtype: str
"""
value = random.randint(0, 0xFFFFFF)
value |= 0x00163E << 24
mac = netaddr.EUI(value)
mac.dialect = netaddr.mac_unix
return str(mac)
def validate_mac(value):
"""
Validate mac and return unix formatted version.
:param str value: address to validate
:return: unix formatted mac
:rtype: str
"""
try:
mac = netaddr.EUI(value)
mac.dialect = netaddr.mac_unix
return str(mac)
except netaddr.AddrFormatError as e:
raise CoreError(f"invalid mac address {value}: {e}")
def validate_ip(value):
"""
Validate ip address with prefix and return formatted version.
:param str value: address to validate
:return: formatted ip address
:rtype: str
"""
try:
ip = netaddr.IPNetwork(value)
return str(ip)
except (ValueError, netaddr.AddrFormatError) as e:
raise CoreError(f"invalid ip address {value}: {e}")

View file

@ -8,7 +8,6 @@ from core.emane.nodes import EmaneNet
from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
from core.emulator.enumerations import NodeTypes
from core.nodes.base import CoreNetworkBase
from core.nodes.ipaddress import MacAddress
from core.nodes.network import CtrlNet
@ -48,8 +47,6 @@ def create_interface_data(interface_element):
interface_id = int(interface_element.get("id"))
name = interface_element.get("name")
mac = interface_element.get("mac")
if mac:
mac = MacAddress.from_string(mac)
ip4 = interface_element.get("ip4")
ip4_mask = get_int(interface_element, "ip4_mask")
ip6 = interface_element.get("ip6")

View file

@ -1,12 +1,12 @@
import os
import socket
import netaddr
from lxml import etree
from core import utils
from core.constants import IP_BIN
from core.emane.nodes import EmaneNet
from core.nodes import ipaddress
from core.nodes.base import CoreNodeBase
@ -56,9 +56,9 @@ def add_emane_interface(host_element, netif, platform_name="p1", transport_name=
def get_address_type(address):
addr, _slash, _prefixlen = address.partition("/")
if ipaddress.is_ipv4_address(addr):
if netaddr.valid_ipv4(addr):
address_type = "IPv4"
elif ipaddress.is_ipv6_address(addr):
elif netaddr.valid_ipv6(addr):
address_type = "IPv6"
else:
raise NotImplementedError

View file

@ -5,7 +5,6 @@ from tempfile import NamedTemporaryFile
from lxml import etree
from core import utils
from core.nodes.ipaddress import MacAddress
from core.xml import corexml
_hwaddr_prefix = "02:02"
@ -208,7 +207,7 @@ def build_node_platform_xml(emane_manager, control_net, node, nem_id, platform_x
node.setnemid(netif, nem_id)
macstr = _hwaddr_prefix + ":00:00:"
macstr += f"{(nem_id >> 8) & 0xFF:02X}:{nem_id & 0xFF:02X}"
netif.sethwaddr(MacAddress.from_string(macstr))
netif.sethwaddr(macstr)
# increment nem id
nem_id += 1

View file

@ -5,6 +5,7 @@ import os
import time
import mock
import netaddr
import pytest
from mock import MagicMock
@ -26,7 +27,6 @@ from core.emulator.enumerations import (
)
from core.errors import CoreError
from core.location.mobility import BasicRangeModel
from core.nodes.ipaddress import Ipv4Prefix
def dict_to_str(values):
@ -101,8 +101,8 @@ class TestGui:
coretlv.session.add_node(_id=node_one)
switch = 2
coretlv.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
ip_prefix = netaddr.IPNetwork("10.0.0.0/24")
interface_one = str(ip_prefix[node_one])
message = coreapi.CoreLinkMessage.create(
MessageFlags.ADD.value,
[
@ -125,8 +125,8 @@ class TestGui:
coretlv.session.add_node(_id=node_one)
switch = 2
coretlv.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
ip_prefix = netaddr.IPNetwork("10.0.0.0/24")
interface_one = str(ip_prefix[node_one])
message = coreapi.CoreLinkMessage.create(
MessageFlags.ADD.value,
[
@ -149,9 +149,9 @@ class TestGui:
coretlv.session.add_node(_id=node_one)
node_two = 2
coretlv.session.add_node(_id=node_two)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
interface_two = ip_prefix.addr(node_two)
ip_prefix = netaddr.IPNetwork("10.0.0.0/24")
interface_one = str(ip_prefix[node_one])
interface_two = str(ip_prefix[node_two])
message = coreapi.CoreLinkMessage.create(
MessageFlags.ADD.value,
[
@ -179,8 +179,8 @@ class TestGui:
coretlv.session.add_node(_id=node_one)
switch = 2
coretlv.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
ip_prefix = netaddr.IPNetwork("10.0.0.0/24")
interface_one = str(ip_prefix[node_one])
message = coreapi.CoreLinkMessage.create(
MessageFlags.ADD.value,
[
@ -221,9 +221,9 @@ class TestGui:
coretlv.session.add_node(_id=node_one)
node_two = 2
coretlv.session.add_node(_id=node_two)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
interface_two = ip_prefix.addr(node_two)
ip_prefix = netaddr.IPNetwork("10.0.0.0/24")
interface_one = str(ip_prefix[node_one])
interface_two = str(ip_prefix[node_two])
message = coreapi.CoreLinkMessage.create(
MessageFlags.ADD.value,
[
@ -265,8 +265,8 @@ class TestGui:
coretlv.session.add_node(_id=node_one)
switch = 2
coretlv.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
ip_prefix = netaddr.IPNetwork("10.0.0.0/24")
interface_one = str(ip_prefix[node_one])
message = coreapi.CoreLinkMessage.create(
MessageFlags.ADD.value,
[
@ -301,8 +301,8 @@ class TestGui:
coretlv.session.add_node(_id=node_one)
switch = 2
coretlv.session.add_node(_id=switch, _type=NodeTypes.SWITCH)
ip_prefix = Ipv4Prefix("10.0.0.0/24")
interface_one = ip_prefix.addr(node_one)
ip_prefix = netaddr.IPNetwork("10.0.0.0/24")
interface_one = str(ip_prefix[node_one])
message = coreapi.CoreLinkMessage.create(
MessageFlags.ADD.value,
[

View file

@ -47,6 +47,54 @@ class TestNodes:
with pytest.raises(CoreError):
session.get_node(node.id)
def test_node_sethwaddr(self, session):
# given
node = session.add_node()
index = node.newnetif()
interface = node.netif(index)
mac = "aa:aa:aa:ff:ff:ff"
# when
node.sethwaddr(index, mac)
# then
assert interface.hwaddr == mac
def test_node_sethwaddr_exception(self, session):
# given
node = session.add_node()
index = node.newnetif()
node.netif(index)
mac = "aa:aa:aa:ff:ff:fff"
# when
with pytest.raises(CoreError):
node.sethwaddr(index, mac)
def test_node_addaddr(self, session):
# given
node = session.add_node()
index = node.newnetif()
interface = node.netif(index)
addr = "192.168.0.1/24"
# when
node.addaddr(index, addr)
# then
assert interface.addrlist[0] == addr
def test_node_addaddr_exception(self, session):
# given
node = session.add_node()
index = node.newnetif()
node.netif(index)
addr = "256.168.0.1/24"
# when
with pytest.raises(CoreError):
node.addaddr(index, addr)
@pytest.mark.parametrize("net_type", NET_TYPES)
def test_net(self, session, net_type):
# given

View file

@ -1,4 +1,8 @@
import netaddr
import pytest
from core import utils
from core.errors import CoreError
class TestUtils:
@ -20,3 +24,43 @@ class TestUtils:
assert len(one_arg) == 1
assert len(two_args) == 2
assert len(unicode_args) == 3
@pytest.mark.parametrize(
"data,expected",
[
("127", "127.0.0.0/32"),
("10.0.0.1/24", "10.0.0.1/24"),
("2001::", "2001::/128"),
("2001::/64", "2001::/64"),
],
)
def test_validate_ip(self, data, expected):
value = utils.validate_ip(data)
assert value == expected
@pytest.mark.parametrize("data", ["256", "1270.0.0.1", "127.0.0.0.1"])
def test_validate_ip_exception(self, data):
with pytest.raises(CoreError):
utils.validate_ip("")
@pytest.mark.parametrize(
"data,expected",
[
("AA-AA-AA-FF-FF-FF", "aa:aa:aa:ff:ff:ff"),
("AA:AA:AA:FF:FF:FF", "aa:aa:aa:ff:ff:ff"),
],
)
def test_validate_mac(self, data, expected):
value = utils.validate_mac(data)
assert value == expected
@pytest.mark.parametrize(
"data", ["AAA:AA:AA:FF:FF:FF", "AA:AA:AA:FF:FF", "AA/AA/AA/FF/FF/FF"]
)
def test_validate_mac_exception(self, data):
with pytest.raises(CoreError):
utils.validate_mac(data)
def test_random_mac(self):
value = utils.random_mac()
assert netaddr.EUI(value) is not None

View file

@ -8,13 +8,12 @@ import logging
import optparse
import sys
import netaddr
import ns.core
import ns.mobility
from corens3.obj import Ns3LteNet
from corens3.obj import Ns3Session
from core.nodes import ipaddress
def ltesession(opt):
"""
@ -28,10 +27,10 @@ def ltesession(opt):
stream = ascii_helper.CreateFileStream('/tmp/ns3lte.tr')
lte.lte.EnableAsciiAll(stream)
prefix = ipaddress.Ipv4Prefix("10.0.0.0/16")
prefix = netaddr.IPNetwork("10.0.0.0/16")
mobb = None
nodes = []
for i in xrange(1, opt.numnodes + 1):
for i in range(1, opt.numnodes + 1):
node = session.addnode(name="n%d" % i)
mob = ns.mobility.ConstantPositionMobilityModel()
mob.SetPosition(ns.core.Vector3D(10.0 * i, 0.0, 0.0))
@ -39,7 +38,7 @@ def ltesession(opt):
# first node is nodeb
lte.setnodeb(node)
mobb = mob
node.newnetif(lte, ["%s/%s" % (prefix.addr(i), prefix.prefixlen)])
node.newnetif(lte, ["%s/%s" % (prefix[i], prefix.prefixlen)])
nodes.append(node)
if i == 1:
_tmp, ns3dev = lte.findns3dev(node)

View file

@ -26,12 +26,11 @@ import logging
import optparse
import sys
import netaddr
import ns.core
from corens3.obj import Ns3Session
from corens3.obj import Ns3WifiNet
from core.nodes import ipaddress
def add_to_server(session):
"""
@ -60,11 +59,11 @@ def wifisession(opt):
wifi.setposition(30, 30, 0)
wifi.phy.Set("RxGain", ns.core.DoubleValue(18.0))
prefix = ipaddress.Ipv4Prefix("10.0.0.0/16")
prefix = netaddr.IPNetwork("10.0.0.0/16")
nodes = []
for i in xrange(1, opt.numnodes + 1):
for i in range(1, opt.numnodes + 1):
node = session.addnode(name="n%d" % i)
node.newnetif(wifi, ["%s/%s" % (prefix.addr(i), prefix.prefixlen)])
node.newnetif(wifi, ["%s/%s" % (prefix[i], prefix.prefixlen)])
nodes.append(node)
session.setupconstantmobility()
wifi.usecorepositions()

View file

@ -16,13 +16,12 @@ import optparse
import sys
from builtins import range
import netaddr
import ns.core
import ns.network
from corens3.obj import Ns3Session
from corens3.obj import Ns3WifiNet
from core.nodes import ipaddress
def add_to_server(session):
"""
@ -51,12 +50,12 @@ def wifisession(opt):
# for improved connectivity
wifi.phy.Set("RxGain", ns.core.DoubleValue(18.0))
prefix = ipaddress.Ipv4Prefix("10.0.0.0/16")
prefix = netaddr.IPNetwork("10.0.0.0/16")
services_str = "zebra|OSPFv3MDR|IPForward"
nodes = []
for i in range(1, opt.numnodes + 1):
node = session.addnode(name="n%d" % i)
node.newnetif(wifi, ["%s/%s" % (prefix.addr(i), prefix.prefixlen)])
node.newnetif(wifi, ["%s/%s" % (prefix[i], prefix.prefixlen)])
nodes.append(node)
session.services.add_services(node, "router", services_str.split("|"))
session.services.boot_services(node)

View file

@ -14,11 +14,10 @@ import optparse
import sys
from builtins import range
import netaddr
from corens3.obj import Ns3Session
from corens3.obj import Ns3WimaxNet
from core.nodes import ipaddress
def wimaxsession(opt):
"""
@ -28,7 +27,7 @@ def wimaxsession(opt):
wimax = session.create_node(cls=Ns3WimaxNet, name="wlan1")
# wimax.wimax.EnableLogComponents()
prefix = ipaddress.Ipv4Prefix("10.0.0.0/16")
prefix = netaddr.IPNetwork("10.0.0.0/16")
# create one classifier for ICMP (protocol 1) traffic
# src port low/high, dst port low/high, protocol, priority
# classifier = (0, 65000, 0, 65000, 1, 1)
@ -38,7 +37,7 @@ def wimaxsession(opt):
node = session.addnode(name="n%d" % i)
if i == 1:
wimax.setbasestation(node)
node.newnetif(wimax, ["%s/%s" % (prefix.addr(i), prefix.prefixlen)])
node.newnetif(wimax, ["%s/%s" % (prefix[i], prefix.prefixlen)])
if i > 2:
wimax.addflow(nodes[-1], node, classifier, classifier)
nodes.append(node)