core-extra/daemon/scripts/core-route-monitor

212 lines
6.7 KiB
Python
Executable file

#!/usr/bin/env python
import argparse
import enum
import select
import socket
import subprocess
import time
from argparse import ArgumentDefaultsHelpFormatter
from functools import cmp_to_key
from queue import Queue
from threading import Thread
from typing import Dict, Optional, Tuple

from core import utils
from core.api.grpc.client import CoreGrpcClient
from core.api.grpc.core_pb2 import NodeType
# Default SDT server address; used as the defaults for --sdt-host / --sdt-port.
SDT_HOST = "127.0.0.1"
SDT_PORT = 50000
# Name of the SDT layer that every route link is attached to.
ROUTE_LAYER = "CORE Route"
# Seconds a node listener may go without a matching packet before it reports
# the node as no longer part of the route.
DEAD_TIME = 3
# Minimum number of seconds between two redraws of the route.
ROUTE_TIME = 3
# Values accepted by --pkt; the chosen value is placed in the tcpdump filter.
PACKET_CHOICES = ["udp", "tcp", "icmp"]
@enum.unique
class RouteEnum(enum.Enum):
    """Kind of update a node listener reports to the managing loop."""

    # the node observed a matching packet and belongs to the route
    ADD = 0
    # the node has not observed a matching packet recently
    DEL = 1
class SdtClient:
    """
    Small client that sends newline-terminated text commands to an SDT
    server and remembers the route links it created so they can be removed.
    """

    def __init__(self, address: Tuple[str, int]) -> None:
        """
        Connect to the SDT server and declare the route layer.

        :param address: (host, port) pair to connect to
        """
        self.links = []
        self.sock = socket.create_connection(address)
        self.send(f'layer "{ROUTE_LAYER}"')

    def close(self) -> None:
        """
        Close the connection to the SDT server.

        :return: nothing
        """
        self.sock.close()

    def send(self, cmd: str) -> None:
        """
        Send one command, terminated by a newline.

        :param cmd: command text, without the trailing newline
        :return: nothing
        """
        self.sock.sendall(f"{cmd}\n".encode())

    def add_link(self, node1, node2) -> None:
        """
        Draw a route link between two nodes and remember it for deletion.

        :param node1: id of the first node of the link
        :param node2: id of the second node of the link
        :return: nothing
        """
        route_id = f"{node1}-{node2}-r"
        link_id = f"{node1},{node2},{route_id}"
        self.send(f'link {link_id} linkLayer "{ROUTE_LAYER}" line yellow,2')
        self.links.append(link_id)

    def delete_links(self) -> None:
        """
        Delete every link previously created through add_link.

        :return: nothing
        """
        pending = [f"delete link,{link_id}" for link_id in self.links]
        for cmd in pending:
            self.send(cmd)
        # forget the links only after all delete commands were sent
        self.links.clear()
class RouterMonitor:
    """
    Monitor packets travelling from a source to a destination address and
    draw the route they take in SDT.

    One listener thread per node runs tcpdump inside that node and reports
    the TTL of matching packets through a queue. The managing loop keeps the
    latest TTL per node and periodically links the nodes in route order:
    the source node first, then the remaining nodes by descending TTL.
    """

    def __init__(self, src_id: str, src: str, dst: str, pkt: str,
                 sdt_host: str, sdt_port: int) -> None:
        """
        Create the monitor, connect to SDT and look up the nodes to watch.

        :param src_id: id of the node that is the start of the route
        :param src: source address used in the tcpdump filter
        :param dst: destination address used in the tcpdump filter
        :param pkt: packet type keyword used in the tcpdump filter
        :param sdt_host: SDT host address
        :param sdt_port: SDT port
        """
        self.queue = Queue()
        self.core = CoreGrpcClient()
        self.src_id = src_id
        self.src = src
        self.dst = dst
        self.pkt = pkt
        # latest TTL reported per node id
        self.seen = {}
        self.running = False
        # monotonic timestamp of the last route redraw
        self.route_time = None
        self.listeners = []
        self.sdt = SdtClient((sdt_host, sdt_port))
        self.nodes = self.get_nodes()

    def get_nodes(self) -> Dict[str, str]:
        """
        Retrieve the nodes to listen on from the first reported session.

        :return: mapping of node id to node channel for every node whose
            type is NodeType.DEFAULT
        :raises Exception: when no session is available
        """
        nodes = {}
        with self.core.context_connect():
            response = self.core.get_sessions()
            sessions = response.sessions
            session = None
            if sessions:
                session = sessions[0]
            if not session:
                raise Exception("no current core sessions")
            print("session: ", session.dir)
            response = self.core.get_session(session.id)
            for node in response.session.nodes:
                if node.type != NodeType.DEFAULT:
                    continue
                nodes[node.id] = node.channel
        return nodes

    def start(self) -> None:
        """
        Start one listener thread per node, then run the managing loop in
        the calling thread.

        :return: nothing
        """
        self.running = True
        for node_id, node in self.nodes.items():
            print("listening on node: ", node)
            thread = Thread(target=self.listen, args=(node_id, node), daemon=True)
            thread.start()
            self.listeners.append(thread)
        self.manage()

    def manage(self) -> None:
        """
        Consume listener updates and redraw the route at most once every
        ROUTE_TIME seconds.

        :return: nothing
        """
        self.route_time = time.monotonic()
        while self.running:
            route_enum, node, seen = self.queue.get()
            if route_enum == RouteEnum.ADD:
                self.seen[node] = seen
            else:
                self.seen.pop(node, None)

            # redraws are driven by incoming updates, rate limited by time
            if (time.monotonic() - self.route_time) >= ROUTE_TIME:
                self.manage_routes()
                self.route_time = time.monotonic()

    def route_sort(self, x: Tuple[str, int], y: Tuple[str, int]) -> int:
        """
        Comparison function ordering (node id, ttl) pairs: the source node
        compares greater than any other node, everything else by TTL.

        :param x: first (node id, ttl) pair
        :param y: second (node id, ttl) pair
        :return: positive when x sorts after y, negative when before, zero
            when the TTLs are equal
        """
        # NOTE(review): src_id comes from the command line as a str, while
        # node ids read from the session may be ints; compare as strings so
        # the source node is recognised either way -- confirm id type.
        src_id = str(self.src_id)
        if str(x[0]) == src_id:
            return 1
        if str(y[0]) == src_id:
            return -1
        return x[1] - y[1]

    def _route_order(self) -> list:
        """
        Order the currently seen nodes along the route.

        :return: node ids, source node first, then by descending TTL
        """
        ordered = sorted(
            self.seen.items(), key=cmp_to_key(self.route_sort), reverse=True
        )
        return [node_id for node_id, _ in ordered]

    def manage_routes(self) -> None:
        """
        Replace the links drawn in SDT with links between consecutive nodes
        of the current route.

        :return: nothing
        """
        self.sdt.delete_links()
        if not self.seen:
            return
        route = self._route_order()
        print("current route:")
        for node_id, next_node_id in zip(route, route[1:]):
            print(f"{node_id} -> {next_node_id}")
            self.sdt.add_link(node_id, next_node_id)

    def stop(self) -> None:
        """
        Stop monitoring: remove drawn links, close the SDT connection and
        wait for the listener threads to finish.

        :return: nothing
        """
        self.running = False
        self.sdt.delete_links()
        self.sdt.close()
        for thread in self.listeners:
            thread.join()
        self.listeners.clear()

    @staticmethod
    def _parse_ttl(line: str) -> Optional[int]:
        """
        Extract the TTL value from a line of tcpdump output.

        :param line: decoded output line
        :return: the integer between "ttl" and the next comma, or None when
            the line has no "ttl" marker or the value is not an integer
        """
        _, marker, rest = line.partition("ttl")
        if not marker:
            return None
        try:
            return int(rest.split(",", 1)[0])
        except ValueError:
            return None

    def listen(self, node_id, node) -> None:
        """
        Run tcpdump inside a node and report what it observes.

        Queues an ADD update with the TTL for every matching packet and a
        DEL update whenever no packet was seen for DEAD_TIME seconds. Runs
        until the monitor is stopped or the capture process ends.

        :param node_id: id of the node, used to tag queued updates
        :param node: control channel of the node, passed to vcmd
        :return: nothing
        """
        # argument list instead of a shell string: values are never
        # interpreted by a shell and the process we hold is the command
        # itself rather than an intermediate shell
        argv = [
            "vcmd", "-c", str(node), "--",
            "tcpdump", "-lnv",
            "src", "host", str(self.src),
            "and", "dst", "host", str(self.dst),
            "and", str(self.pkt),
        ]
        try:
            p = subprocess.Popen(
                argv, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
            )
        except OSError as e:
            print(f"listener error: {e}")
            return

        current = time.monotonic()
        try:
            # poll() is None only while the process is alive; an exit
            # status of 0 must end the loop as well
            while p.poll() is None and self.running:
                ready, _, _ = select.select([p.stdout], [], [], 1)
                if ready:
                    raw = p.stdout.readline()
                    if not raw:
                        # end of output, the capture process is finishing
                        break
                    ttl = self._parse_ttl(raw.strip().decode())
                    if ttl is not None:
                        # the line following a TTL line is discarded;
                        # presumably the packet detail line of verbose output
                        p.stdout.readline()
                        self.queue.put((RouteEnum.ADD, node_id, ttl))
                        current = time.monotonic()
                elif (time.monotonic() - current) >= DEAD_TIME:
                    self.queue.put((RouteEnum.DEL, node_id, None))
        except Exception as e:
            print(f"listener error: {e}")
        finally:
            # do not leave the spawned process running or unreaped
            if p.poll() is None:
                p.terminate()
                try:
                    p.wait(timeout=1)
                except subprocess.TimeoutExpired:
                    p.kill()
                    p.wait()
            p.stdout.close()
def main() -> None:
    """
    Command line entry point: parse the arguments, build a RouterMonitor
    and run it until interrupted from the keyboard.

    :return: nothing
    """
    # utils.which is called with required=False; a falsy result is treated
    # as tcpdump being unavailable
    tcpdump_found = utils.which("tcpdump", required=False)
    if not tcpdump_found:
        print("core-route-monitor requires tcpdump to be installed")
        return

    arg_parser = argparse.ArgumentParser(
        formatter_class=ArgumentDefaultsHelpFormatter,
        description="core route monitor",
    )
    arg_parser.add_argument(
        "--id", help="source node id for determining path", required=True
    )
    arg_parser.add_argument(
        "--src", help="source address for route monitoring", default="10.0.0.20"
    )
    arg_parser.add_argument(
        "--dst",
        help="destination address for route monitoring",
        default="10.0.2.20",
    )
    arg_parser.add_argument(
        "--pkt", help="packet type", choices=PACKET_CHOICES, default="icmp"
    )
    arg_parser.add_argument("--sdt-host", help="sdt host address", default=SDT_HOST)
    arg_parser.add_argument("--sdt-port", help="sdt port", default=SDT_PORT, type=int)
    options = arg_parser.parse_args()

    monitor = RouterMonitor(
        src_id=options.id,
        src=options.src,
        dst=options.dst,
        pkt=options.pkt,
        sdt_host=options.sdt_host,
        sdt_port=options.sdt_port,
    )
    try:
        monitor.start()
    except KeyboardInterrupt:
        # ctrl-c is the way to end monitoring; clean up before exiting
        monitor.stop()
        print("ending route monitor")
# Run the monitor only when this file is executed as a script.
if __name__ == "__main__":
    main()