diff --git a/daemon/scripts/core-cli b/daemon/scripts/core-cli index ca9011b5..a8354f65 100755 --- a/daemon/scripts/core-cli +++ b/daemon/scripts/core-cli @@ -11,6 +11,7 @@ from typing import Tuple import netaddr from google.protobuf.json_format import MessageToJson +from netaddr import EUI, AddrFormatError, IPNetwork from core.api.grpc.client import CoreGrpcClient from core.api.grpc.core_pb2 import ( @@ -27,21 +28,31 @@ NODE_TYPES = [k for k, v in NodeType.Enum.items() if v != NodeType.PEER_TO_PEER] def mac_type(value: str) -> str: - if not netaddr.valid_mac(value): - raise ArgumentTypeError("invalid mac address") - return value + try: + mac = EUI(value, dialect=netaddr.mac_unix_expanded) + return str(mac) + except AddrFormatError: + raise ArgumentTypeError(f"invalid mac address: {value}") -def ip4_type(value: str) -> str: - if not netaddr.valid_ipv4(value): - raise ArgumentTypeError("invalid ip4 address") - return value +def ip4_type(value: str) -> IPNetwork: + try: + ip = IPNetwork(value) + if not netaddr.valid_ipv4(str(ip.ip)): + raise ArgumentTypeError(f"invalid ip4 address: {value}") + return ip + except AddrFormatError: + raise ArgumentTypeError(f"invalid ip4 address: {value}") -def ip6_type(value: str) -> str: - if not netaddr.valid_ipv6(value): - raise ArgumentTypeError("invalid ip6 address") - return value +def ip6_type(value: str) -> IPNetwork: + try: + ip = IPNetwork(value) + if not netaddr.valid_ipv6(str(ip.ip)): + raise ArgumentTypeError(f"invalid ip6 address: {value}") + return ip + except AddrFormatError: + raise ArgumentTypeError(f"invalid ip6 address: {value}") def position_type(value: str) -> Tuple[float, float]: @@ -78,11 +89,26 @@ def get_current_session() -> int: return response.sessions[0].id -def print_interface_header() -> None: +def create_iface(iface_id: int, mac: str, ip4_net: IPNetwork, ip6_net: IPNetwork) -> Interface: + ip4 = str(ip4_net.ip) if ip4_net else None + ip4_mask = ip4_net.prefixlen if ip4_net else None + ip6 = str(ip6_net.ip) if ip6_net else None + ip6_mask = ip6_net.prefixlen if ip6_net else None + return Interface( + id=iface_id, + mac=mac, + ip4=ip4, + ip4_mask=ip4_mask, + ip6=ip6, + ip6_mask=ip6_mask, + ) + + +def print_iface_header() -> None: print("ID | MAC Address | IP4 Address | IP6 Address") -def print_interface(iface: Interface) -> None: +def print_iface(iface: Interface) -> None: iface_ip4 = f"{iface.ip4}/{iface.ip4_mask}" if iface.ip4 else None iface_ip6 = f"{iface.ip6}/{iface.ip6_mask}" if iface.ip6 else None print(f"{iface.id:<3} | {iface.mac:<11} | {iface_ip4:<18} | {iface_ip6}") @@ -123,13 +149,13 @@ def query_session(args: Namespace) -> None: n1 = names[link.node1_id] n2 = names[link.node2_id] print(f"Node | ", end="") - print_interface_header() + print_iface_header() if link.HasField("iface1"): print(f"{n1:<6} | ", end="") - print_interface(link.iface1) + print_iface(link.iface1) if link.HasField("iface2"): print(f"{n2:<6} | ", end="") - print_interface(link.iface2) + print_iface(link.iface2) print() @@ -146,9 +172,9 @@ def query_node(args: Namespace) -> None: print("ID | Name | Type") print(f"{node.id:<4} | {node.name:<7} | {node_type}") print("Interfaces") - print_interface_header() + print_iface_header() for iface in response.ifaces: - print_interface(iface) + print_iface(iface) def add_node(args: Namespace) -> None: @@ -219,24 +245,10 @@ def add_link(args: Namespace) -> None: session_id = get_current_session() iface1 = None if args.iface1_id is not None: - iface1 = Interface( - id=args.iface1_id, - mac=args.iface1_mac, - ip4=args.iface1_ip4, - ip4_mask=args.iface1_ip4_mask, - ip6=args.iface1_ip4, - ip6_mask=args.iface1_ip6_mask, - ) + iface1 = create_iface(args.iface1_id, args.iface1_mac, args.iface1_ip4, args.iface1_ip6) iface2 = None if args.iface2_id is not None: - iface2 = Interface( - id=args.iface2_id, - mac=args.iface2_mac, - ip4=args.iface2_ip4, - ip4_mask=args.iface2_ip4_mask, - ip6=args.iface2_ip4, - ip6_mask=args.iface2_ip6_mask, - ) + iface2 = create_iface(args.iface2_id, args.iface2_mac, args.iface2_ip4, args.iface2_ip6) options = LinkOptions( bandwidth=args.bandwidth, loss=args.loss, @@ -349,23 +361,11 @@ def setup_link_parser(parent: _SubParsersAction) -> None: add_parser.add_argument("--iface1-id", type=int, help="node1 interface id for link") add_parser.add_argument("--iface1-mac", type=mac_type, help="node1 interface mac") add_parser.add_argument("--iface1-ip4", type=ip4_type, help="node1 interface ip4") - add_parser.add_argument( - "--iface1-ip4-mask", type=int, help="node1 interface ip4 mask" - ) add_parser.add_argument("--iface1-ip6", type=ip6_type, help="node1 interface ip6") - add_parser.add_argument( - "--iface1-ip6-mask", type=int, help="node1 interface ip6 mask" - ) add_parser.add_argument("--iface2-id", type=int, help="node1 interface id for link") add_parser.add_argument("--iface2-mac", type=mac_type, help="node1 interface mac") add_parser.add_argument("--iface2-ip4", type=ip4_type, help="node1 interface ip4") - add_parser.add_argument( - "--iface2-ip4-mask", type=int, help="node1 interface ip4 mask" - ) add_parser.add_argument("--iface2-ip6", type=ip6_type, help="node1 interface ip6") - add_parser.add_argument( - "--iface2-ip6-mask", type=int, help="node1 interface ip6 mask" - ) add_parser.add_argument("--bandwidth", type=int, help="bandwidth (bps) for link") add_parser.add_argument("--loss", type=float, help="loss (%) for link") add_parser.add_argument("--jitter", type=int, help="jitter (us) for link")