core-extra/daemon/scripts/core-cli

541 lines
20 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
from argparse import (
ArgumentDefaultsHelpFormatter,
ArgumentParser,
ArgumentTypeError,
Namespace,
_SubParsersAction,
)
from functools import wraps
from pathlib import Path
from typing import Any, Optional, Tuple
import grpc
import netaddr
from google.protobuf.json_format import MessageToJson
from netaddr import EUI, AddrFormatError, IPNetwork
from core.api.grpc.client import CoreGrpcClient
from core.api.grpc.core_pb2 import (
Geo,
Interface,
LinkOptions,
Node,
NodeType,
Position,
SessionState,
)
# Names of every NodeType enum member except PEER_TO_PEER.
NODE_TYPES = [
    name for name, number in NodeType.Enum.items() if number != NodeType.PEER_TO_PEER
]
def coreclient(func):
    """
    Decorate a command handler so that it runs with a connected gRPC client.

    The wrapped function is called with a new CoreGrpcClient as its first
    argument, inside the client's connection context. A grpc.RpcError is
    reported with print() instead of propagating; in that case the wrapper
    returns None.
    """

    @wraps(func)
    def connected(*args, **kwargs):
        client = CoreGrpcClient()
        try:
            with client.context_connect():
                result = func(client, *args, **kwargs)
        except grpc.RpcError as err:
            print(f"grpc error: {err.details()}")
            return None
        return result

    return connected
def mac_type(value: str) -> str:
    """
    Argparse type converter for mac addresses.

    :param value: mac address text from the command line
    :return: the address rendered using netaddr's mac_unix_expanded dialect
    :raises ArgumentTypeError: when netaddr cannot parse the value
    """
    try:
        parsed = EUI(value, dialect=netaddr.mac_unix_expanded)
    except AddrFormatError:
        raise ArgumentTypeError(f"invalid mac address: {value}")
    return str(parsed)
def ip4_type(value: str) -> IPNetwork:
    """
    Argparse type converter for ip4 addresses with an optional prefix.

    :param value: address text from the command line
    :return: the parsed network
    :raises ArgumentTypeError: when the value cannot be parsed or its address
        part is not a valid ip4 address
    """
    try:
        network = IPNetwork(value)
        is_ip4 = netaddr.valid_ipv4(str(network.ip))
    except AddrFormatError:
        raise ArgumentTypeError(f"invalid ip4 address: {value}")
    if is_ip4:
        return network
    raise ArgumentTypeError(f"invalid ip4 address: {value}")
def ip6_type(value: str) -> IPNetwork:
    """
    Argparse type converter for ip6 addresses with an optional prefix.

    :param value: address text from the command line
    :return: the parsed network
    :raises ArgumentTypeError: when the value cannot be parsed or its address
        part is not a valid ip6 address
    """
    try:
        network = IPNetwork(value)
        is_ip6 = netaddr.valid_ipv6(str(network.ip))
    except AddrFormatError:
        raise ArgumentTypeError(f"invalid ip6 address: {value}")
    if is_ip6:
        return network
    raise ArgumentTypeError(f"invalid ip6 address: {value}")
def position_type(value: str) -> Tuple[float, float]:
    """
    Argparse type converter for an "x,y" position.

    :param value: comma separated text from the command line
    :return: tuple of (x, y) as floats
    :raises ArgumentTypeError: when a part is not a float or there are not
        exactly two parts
    """
    error = "invalid position, must be in the format: float,float"
    try:
        coords = tuple(map(float, value.split(",")))
    except ValueError:
        raise ArgumentTypeError(error)
    if len(coords) == 2:
        return coords
    raise ArgumentTypeError(error)
def geo_type(value: str) -> Tuple[float, float, float]:
    """
    Argparse type converter for a "lon,lat,alt" geo location.

    :param value: comma separated text from the command line
    :return: tuple of (lon, lat, alt) as floats
    :raises ArgumentTypeError: when a part is not a float or there are not
        exactly three parts
    """
    error = "invalid geo, must be in the format: float,float,float"
    try:
        coords = tuple(map(float, value.split(",")))
    except ValueError:
        raise ArgumentTypeError(error)
    if len(coords) == 3:
        return coords
    raise ArgumentTypeError(error)
def file_type(value: str) -> str:
    """
    Argparse type converter for a path that must be an existing file.

    :param value: path text from the command line
    :return: absolute path of the file as a string
    :raises ArgumentTypeError: when the path is not a file
    """
    candidate = Path(value)
    if candidate.is_file():
        return str(candidate.absolute())
    raise ArgumentTypeError(f"invalid file: {value}")
def get_current_session(core: CoreGrpcClient, session_id: Optional[int]) -> int:
    """
    Resolve the session id a command should operate on.

    :param core: client used to look up existing sessions
    :param session_id: id given on the command line, None when not provided
    :return: the provided id, otherwise the id of the first session returned
        by the client
    """
    # compare against None so an explicitly provided id is always honored and
    # never silently replaced by a different session
    if session_id is not None:
        return session_id
    sessions = core.get_sessions().sessions
    if not sessions:
        print("no current session to interact with")
        sys.exit(1)
    return sessions[0].id
def create_iface(iface_id: int, mac: str, ip4_net: IPNetwork, ip6_net: IPNetwork) -> Interface:
    """
    Build an Interface message from parsed command line values.

    :param iface_id: interface id
    :param mac: mac address
    :param ip4_net: ip4 network, address and mask are left as None when falsy
    :param ip6_net: ip6 network, address and mask are left as None when falsy
    :return: the populated Interface message
    """
    ip4 = ip4_mask = None
    if ip4_net:
        ip4 = str(ip4_net.ip)
        ip4_mask = ip4_net.prefixlen
    ip6 = ip6_mask = None
    if ip6_net:
        ip6 = str(ip6_net.ip)
        ip6_mask = ip6_net.prefixlen
    return Interface(
        id=iface_id, mac=mac, ip4=ip4, ip4_mask=ip4_mask, ip6=ip6, ip6_mask=ip6_mask
    )
def print_iface_header() -> None:
    """
    Print the column header for interface rows.

    The ID, MAC Address and IP4 Address columns are padded to widths of 3, 17
    and 18 so the separators line up with rows that use the same widths.
    """
    print(f"{'ID':<3} | {'MAC Address':<17} | {'IP4 Address':<18} | IP6 Address")
def print_iface(iface: Interface) -> None:
    """
    Print one interface as a table row of id, mac, ip4 and ip6.

    An address column is left empty when the interface has no address of
    that family, otherwise it is shown as address/mask.

    :param iface: interface to print
    """
    ip4 = ""
    if iface.ip4:
        ip4 = f"{iface.ip4}/{iface.ip4_mask}"
    ip6 = ""
    if iface.ip6:
        ip6 = f"{iface.ip6}/{iface.ip6_mask}"
    columns = (f"{iface.id:<3}", f"{iface.mac:<17}", f"{ip4:<18}", ip6)
    print(" | ".join(columns))
def print_json(message: Any) -> None:
    """
    Print a protobuf message as json, keeping the proto field names.

    :param message: message to convert and print
    """
    print(MessageToJson(message, preserving_proto_field_name=True))
@coreclient
def get_wlan_config(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Print the wlan configuration of a node as json or as a name/value table.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads session, node and json
    """
    session_id = get_current_session(core, args.session)
    response = core.get_wlan_config(session_id, args.node)
    if args.json:
        print_json(response)
        return
    options = list(response.config.values())
    # the width doubles as the precision in the format spec below, so it must
    # never drop below the header text or "Name" itself would be truncated
    size = max([len("Name")] + [len(option.name) for option in options])
    print(f"{'Name':<{size}.{size}} | Value")
    for option in options:
        print(f"{option.name:<{size}.{size}} | {option.value}")
@coreclient
def set_wlan_config(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Send the wlan settings given on the command line for a node.

    Only options that were actually provided are sent; values are converted
    to strings. The loss option is sent under the "error" key.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads session, node, json and the wlan
        options bandwidth, delay, loss, jitter and range
    """
    session_id = get_current_session(core, args.session)
    provided = {
        "bandwidth": args.bandwidth,
        "delay": args.delay,
        "error": args.loss,
        "jitter": args.jitter,
        "range": args.range,
    }
    # test against None rather than truthiness, otherwise an explicit 0
    # (e.g. "--loss 0") would be dropped and could never be applied
    config = {key: str(value) for key, value in provided.items() if value is not None}
    response = core.set_wlan_config(session_id, args.node, config)
    if args.json:
        print_json(response)
    else:
        print(f"set wlan config: {response.result}")
@coreclient
def open_xml(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Open a session xml file and print the response.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads file, start and json
    """
    response = core.open_xml(args.file, args.start)
    if not args.json:
        print(f"opened xml: {response.result}")
        return
    print_json(response)
@coreclient
def query_sessions(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Print the known sessions as json or as a table of id, state and nodes.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads json
    """
    response = core.get_sessions()
    if args.json:
        print_json(response)
        return
    print("Session ID | Session State | Nodes")
    for session in response.sessions:
        state_name = SessionState.Enum.Name(session.state)
        print(f"{session.id:<10} | {state_name:<13} | {session.nodes}")
@coreclient
def query_session(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Print the nodes and links of a session as json or as tables.

    Each link is shown as a small table with one row per end of the link; a
    row has no interface columns when the link has no interface on that end.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads id (the session id) and json
    """
    response = core.get_session(args.id)
    if args.json:
        print_json(response)
        return
    session = response.session
    print("Nodes")
    print("Node ID | Node Name | Node Type")
    # remember node names so links can be shown by name instead of id
    names = {}
    for node in session.nodes:
        names[node.id] = node.name
        node_type = NodeType.Enum.Name(node.type)
        print(f"{node.id:<7} | {node.name:<9} | {node_type}")
    print("\nLinks")
    for link in session.links:
        n1 = names[link.node1_id]
        n2 = names[link.node2_id]
        # pad the leading header cell to the same width as the node name
        # cells below so every column separator lines up
        print(f"{'Node':<6} | ", end="")
        print_iface_header()
        for name, field in ((n1, "iface1"), (n2, "iface2")):
            print(f"{name:<6} | ", end="")
            if link.HasField(field):
                print_iface(getattr(link, field))
            else:
                print()
        print()
@coreclient
def query_node(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Print a node and its interfaces as json or as tables.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads id (the session id), node and json
    """
    # node names of the session, used to label what each interface connects to
    session_response = core.get_session(args.id)
    names = {node.id: node.name for node in session_response.session.nodes}
    response = core.get_node(args.id, args.node)
    if args.json:
        print_json(response)
        return
    node = response.node
    node_type = NodeType.Enum.Name(node.type)
    # header cells are padded to the widths used by the row that follows
    print(f"{'ID':<4} | {'Name':<7} | Type")
    print(f"{node.id:<4} | {node.name:<7} | {node_type}")
    print("Interfaces")
    print("Connected To | ", end="")
    print_iface_header()
    for iface in response.ifaces:
        if iface.net_id == node.id:
            # the interface's net id is the queried node itself, so label it
            # with the node on the interface, or net2 when no node id is set
            if iface.node_id:
                name = names[iface.node_id]
            else:
                name = names[iface.net2_id]
        else:
            name = names.get(iface.net_id, "")
        print(f"{name:<12} | ", end="")
        print_iface(iface)
@coreclient
def add_node(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Create a node from the command line values and print the response.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads session, json, type, pos, geo, id,
        name, model, emane, icon and image
    """
    session_id = get_current_session(core, args.session)
    node_type = NodeType.Enum.Value(args.type)
    position = Position(x=args.pos[0], y=args.pos[1]) if args.pos else None
    geo = None
    if args.geo:
        geo = Geo(lon=args.geo[0], lat=args.geo[1], alt=args.geo[2])
    node = Node(
        id=args.id,
        name=args.name,
        type=node_type,
        model=args.model,
        emane=args.emane,
        icon=args.icon,
        image=args.image,
        position=position,
        geo=geo,
    )
    response = core.add_node(session_id, node)
    if not args.json:
        print(f"created node: {response.node_id}")
        return
    print_json(response)
@coreclient
def edit_node(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Edit the position, geo location or icon of a node and print the response.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads session, id, pos, geo, icon and json
    """
    session_id = get_current_session(core, args.session)
    position = Position(x=args.pos[0], y=args.pos[1]) if args.pos else None
    geo = None
    if args.geo:
        geo = Geo(lon=args.geo[0], lat=args.geo[1], alt=args.geo[2])
    response = core.edit_node(session_id, args.id, position, args.icon, geo)
    if not args.json:
        print(f"edit node: {response.result}")
        return
    print_json(response)
@coreclient
def delete_node(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Delete a node and print the response.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads session, id and json
    """
    session_id = get_current_session(core, args.session)
    response = core.delete_node(session_id, args.id)
    if not args.json:
        print(f"deleted node: {response.result}")
        return
    print_json(response)
@coreclient
def add_link(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Create a link between two nodes and print the response.

    An interface is only built for an end of the link when its interface id
    was provided.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads session, json, node1, node2, the
        iface1_*/iface2_* values and the link options
    """
    session_id = get_current_session(core, args.session)
    first = (
        create_iface(args.iface1_id, args.iface1_mac, args.iface1_ip4, args.iface1_ip6)
        if args.iface1_id is not None
        else None
    )
    second = (
        create_iface(args.iface2_id, args.iface2_mac, args.iface2_ip4, args.iface2_ip6)
        if args.iface2_id is not None
        else None
    )
    link_options = LinkOptions(
        bandwidth=args.bandwidth,
        loss=args.loss,
        jitter=args.jitter,
        delay=args.delay,
        dup=args.duplicate,
        unidirectional=args.uni,
    )
    response = core.add_link(
        session_id, args.node1, args.node2, first, second, link_options
    )
    if not args.json:
        print(f"add link: {response.result}")
        return
    print_json(response)
@coreclient
def edit_link(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Edit the options of a link and print the response.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads session, json, node1, node2, iface1,
        iface2 and the link options
    """
    session_id = get_current_session(core, args.session)
    link_options = LinkOptions(
        bandwidth=args.bandwidth,
        loss=args.loss,
        jitter=args.jitter,
        delay=args.delay,
        dup=args.duplicate,
        unidirectional=args.uni,
    )
    ends = (args.node1, args.node2)
    ifaces = (args.iface1, args.iface2)
    response = core.edit_link(session_id, *ends, link_options, *ifaces)
    if not args.json:
        print(f"edit link: {response.result}")
        return
    print_json(response)
@coreclient
def delete_link(core: CoreGrpcClient, args: Namespace) -> None:
    """
    Delete a link between two nodes and print the response.

    :param core: connected client, supplied by the coreclient decorator
    :param args: parsed arguments, reads session, json, node1, node2, iface1
        and iface2
    """
    session_id = get_current_session(core, args.session)
    ends = (args.node1, args.node2)
    ifaces = (args.iface1, args.iface2)
    response = core.delete_link(session_id, *ends, *ifaces)
    if not args.json:
        print(f"delete link: {response.result}")
        return
    print_json(response)
def setup_node_parser(parent: _SubParsersAction) -> None:
    """
    Register the "node" command with its add, edit and delete subcommands.

    :param parent: subparsers action of the top level parser
    """
    formatter = ArgumentDefaultsHelpFormatter
    parser = parent.add_parser(
        "node", help="node interactions", formatter_class=formatter
    )
    parser.add_argument("-s", "--session", type=int, help="session to interact with")
    subparsers = parser.add_subparsers(help="node commands")
    subparsers.required = True
    subparsers.dest = "command"

    add_parser = subparsers.add_parser(
        "add", help="add a node", formatter_class=formatter
    )
    add_parser.add_argument("-i", "--id", type=int, help="id to use, optional")
    add_parser.add_argument("-n", "--name", help="name to use, optional")
    add_parser.add_argument(
        "-t", "--type", choices=NODE_TYPES, default="DEFAULT", help="type of node"
    )
    add_parser.add_argument(
        "-m", "--model", help="used to determine services, optional"
    )
    group = add_parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-p", "--pos", type=position_type, help="x,y position")
    group.add_argument("-g", "--geo", type=geo_type, help="lon,lat,alt position")
    add_parser.add_argument("-ic", "--icon", help="icon to use, optional")
    add_parser.add_argument("-im", "--image", help="container image, optional")
    add_parser.add_argument(
        "-e", "--emane", help="emane model, only required for emane nodes"
    )
    add_parser.set_defaults(func=add_node)

    edit_parser = subparsers.add_parser(
        "edit", help="edit a node", formatter_class=formatter
    )
    # editing always targets an existing node, so the id is mandatory here;
    # without it the handler would be called with a node id of None
    edit_parser.add_argument("-i", "--id", type=int, help="node id", required=True)
    group = edit_parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-p", "--pos", type=position_type, help="x,y position")
    group.add_argument("-g", "--geo", type=geo_type, help="lon,lat,alt position")
    edit_parser.add_argument("-ic", "--icon", help="icon to use, optional")
    edit_parser.set_defaults(func=edit_node)

    delete_parser = subparsers.add_parser(
        "delete", help="delete a node", formatter_class=formatter
    )
    delete_parser.add_argument("-i", "--id", type=int, help="node id", required=True)
    delete_parser.set_defaults(func=delete_node)
def _add_link_options(parser: ArgumentParser) -> None:
    """Add the link option arguments shared by the link add and edit commands."""
    parser.add_argument("-b", "--bandwidth", type=int, help="bandwidth (bps)")
    parser.add_argument("-l", "--loss", type=float, help="loss (%%)")
    parser.add_argument("-j", "--jitter", type=int, help="jitter (us)")
    parser.add_argument("-de", "--delay", type=int, help="delay (us)")
    parser.add_argument("-du", "--duplicate", type=int, help="duplicate (%%)")
    parser.add_argument(
        "-u", "--uni", action="store_true", help="is link unidirectional?"
    )


def setup_link_parser(parent: _SubParsersAction) -> None:
    """
    Register the "link" command with its add, edit and delete subcommands.

    :param parent: subparsers action of the top level parser
    """
    formatter = ArgumentDefaultsHelpFormatter
    parser = parent.add_parser(
        "link", help="link interactions", formatter_class=formatter
    )
    parser.add_argument("-s", "--session", type=int, help="session to interact with")
    subparsers = parser.add_subparsers(help="link commands")
    subparsers.required = True
    subparsers.dest = "command"

    add_parser = subparsers.add_parser(
        "add", help="add a link", formatter_class=formatter
    )
    add_parser.add_argument("-n1", "--node1", type=int, help="node1 id", required=True)
    add_parser.add_argument("-n2", "--node2", type=int, help="node2 id", required=True)
    # each end of the link gets the same set of interface arguments
    for number in (1, 2):
        short = f"-i{number}"
        long = f"--iface{number}"
        label = f"node{number} interface"
        add_parser.add_argument(f"{short}-i", f"{long}-id", type=int, help=f"{label} id")
        add_parser.add_argument(
            f"{short}-m", f"{long}-mac", type=mac_type, help=f"{label} mac"
        )
        add_parser.add_argument(
            f"{short}-4", f"{long}-ip4", type=ip4_type, help=f"{label} ip4"
        )
        add_parser.add_argument(
            f"{short}-6", f"{long}-ip6", type=ip6_type, help=f"{label} ip6"
        )
    _add_link_options(add_parser)
    add_parser.set_defaults(func=add_link)

    edit_parser = subparsers.add_parser(
        "edit", help="edit a link", formatter_class=formatter
    )
    edit_parser.add_argument("-n1", "--node1", type=int, help="node1 id", required=True)
    edit_parser.add_argument("-n2", "--node2", type=int, help="node2 id", required=True)
    edit_parser.add_argument("-i1", "--iface1", type=int, help="node1 interface id")
    edit_parser.add_argument("-i2", "--iface2", type=int, help="node2 interface id")
    _add_link_options(edit_parser)
    edit_parser.set_defaults(func=edit_link)

    delete_parser = subparsers.add_parser(
        "delete", help="delete a link", formatter_class=formatter
    )
    delete_parser.add_argument(
        "-n1", "--node1", type=int, help="node1 id", required=True
    )
    delete_parser.add_argument(
        "-n2", "--node2", type=int, help="node2 id", required=True
    )
    delete_parser.add_argument("-i1", "--iface1", type=int, help="node1 interface id")
    delete_parser.add_argument("-i2", "--iface2", type=int, help="node2 interface id")
    delete_parser.set_defaults(func=delete_link)
def setup_query_parser(parent: _SubParsersAction) -> None:
    """
    Register the "query" command with its sessions, session and node
    subcommands.

    :param parent: subparsers action of the top level parser
    """
    formatter = ArgumentDefaultsHelpFormatter
    query = parent.add_parser("query", help="query interactions")
    commands = query.add_subparsers(help="query commands")
    commands.required = True
    commands.dest = "command"

    sessions = commands.add_parser(
        "sessions", help="query current sessions", formatter_class=formatter
    )
    sessions.set_defaults(func=query_sessions)

    session = commands.add_parser(
        "session", help="query session", formatter_class=formatter
    )
    session.add_argument(
        "-i", "--id", type=int, help="session to query", required=True
    )
    session.set_defaults(func=query_session)

    node = commands.add_parser("node", help="query node", formatter_class=formatter)
    node.add_argument("-i", "--id", type=int, help="session to query", required=True)
    node.add_argument("-n", "--node", type=int, help="node to query", required=True)
    node.set_defaults(func=query_node)
def setup_xml_parser(parent: _SubParsersAction) -> None:
    """
    Register the "xml" command used to open a session xml file.

    :param parent: subparsers action of the top level parser
    """
    xml = parent.add_parser(
        "xml", help="open session xml", formatter_class=ArgumentDefaultsHelpFormatter
    )
    xml.add_argument(
        "-f", "--file", type=file_type, help="xml file to open", required=True
    )
    xml.add_argument(
        "-s", "--start", action="store_true", help="start the session?"
    )
    xml.set_defaults(func=open_xml)
def setup_wlan_parser(parent: _SubParsersAction) -> None:
    """
    Register the "wlan" command with its get and set subcommands.

    :param parent: subparsers action of the top level parser
    """
    formatter = ArgumentDefaultsHelpFormatter
    parser = parent.add_parser(
        "wlan", help="wlan specific interactions", formatter_class=formatter
    )
    parser.add_argument("-s", "--session", type=int, help="session to interact with")
    # help text names this command group; it previously read "link commands"
    subparsers = parser.add_subparsers(help="wlan commands")
    subparsers.required = True
    subparsers.dest = "command"

    get_parser = subparsers.add_parser(
        "get", help="get wlan configuration", formatter_class=formatter
    )
    get_parser.add_argument("-n", "--node", type=int, help="wlan node", required=True)
    get_parser.set_defaults(func=get_wlan_config)

    set_parser = subparsers.add_parser(
        "set", help="set wlan configuration", formatter_class=formatter
    )
    set_parser.add_argument("-n", "--node", type=int, help="wlan node", required=True)
    set_parser.add_argument("-b", "--bandwidth", type=int, help="bandwidth (bps)")
    set_parser.add_argument("-d", "--delay", type=int, help="delay (us)")
    set_parser.add_argument("-l", "--loss", type=float, help="loss (%%)")
    set_parser.add_argument("-j", "--jitter", type=int, help="jitter (us)")
    set_parser.add_argument("-r", "--range", type=int, help="range (pixels)")
    set_parser.set_defaults(func=set_wlan_config)
def main() -> None:
    """
    Build the command line parser, parse the arguments and run the handler
    selected by the chosen subcommand.
    """
    cli = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
    cli.add_argument(
        "-js",
        "--json",
        action="store_true",
        help="print responses to terminal as json",
    )
    commands = cli.add_subparsers(help="supported commands")
    commands.required = True
    commands.dest = "command"
    # registration order is the order the commands were originally added in
    for setup in (
        setup_node_parser,
        setup_link_parser,
        setup_query_parser,
        setup_xml_parser,
        setup_wlan_parser,
    ):
        setup(commands)
    parsed = cli.parse_args()
    # every subcommand stores its handler as "func" through set_defaults
    parsed.func(parsed)
# run the command line interface only when this file is executed as a script
if __name__ == "__main__":
    main()