Merge pull request #425 from coreemu/enhancement/route-monitor

Enhancement/route monitor
This commit is contained in:
bharnden 2020-04-06 16:54:33 -07:00 committed by GitHub
commit 0db119a9ae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 262 additions and 62 deletions

View file

@ -8,6 +8,7 @@ from core import utils
from core.api.grpc import common_pb2, core_pb2
from core.api.grpc.services_pb2 import NodeServiceData, ServiceConfig
from core.config import ConfigurableOptions
from core.emane.nodes import EmaneNet
from core.emulator.data import LinkData
from core.emulator.emudata import InterfaceData, LinkOptions, NodeOptions
from core.emulator.enumerations import LinkTypes, NodeTypes
@ -221,6 +222,47 @@ def get_config_options(
return results
def get_node_proto(session: Session, node: NodeBase) -> core_pb2.Node:
"""
Convert CORE node to protobuf representation.
:param session: session containing node
:param node: node to convert
:return: node proto
"""
node_type = session.get_node_type(node.__class__)
position = core_pb2.Position(
x=node.position.x, y=node.position.y, z=node.position.z
)
services = getattr(node, "services", [])
if services is None:
services = []
services = [x.name for x in services]
config_services = getattr(node, "config_services", {})
config_services = [x for x in config_services]
emane_model = None
if isinstance(node, EmaneNet):
emane_model = node.model.name
model = getattr(node, "type", None)
node_dir = getattr(node, "nodedir", None)
channel = getattr(node, "ctrlchnlname", None)
image = getattr(node, "image", None)
return core_pb2.Node(
id=node.id,
name=node.name,
emane=emane_model,
model=model,
type=node_type.value,
position=position,
services=services,
icon=node.icon,
image=image,
config_services=config_services,
dir=node_dir,
channel=channel,
)
def get_links(session: Session, node: NodeBase):
"""
Retrieve a list of links for grpc to use

View file

@ -103,7 +103,6 @@ from core.api.grpc.wlan_pb2 import (
SetWlanConfigRequest,
SetWlanConfigResponse,
)
from core.emane.nodes import EmaneNet
from core.emulator.coreemu import CoreEmu
from core.emulator.data import LinkData
from core.emulator.emudata import LinkOptions, NodeOptions
@ -112,8 +111,6 @@ from core.emulator.session import Session
from core.errors import CoreCommandError, CoreError
from core.location.mobility import BasicRangeModel, Ns2ScriptedMobility
from core.nodes.base import CoreNodeBase, NodeBase
from core.nodes.docker import DockerNode
from core.nodes.lxd import LxcNode
from core.services.coreservices import ServiceManager
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
@ -373,6 +370,7 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
state=session.state.value,
nodes=session.get_node_count(),
file=session.file_name,
dir=session.session_dir,
)
sessions.append(session_summary)
return core_pb2.GetSessionsResponse(sessions=sessions)
@ -543,44 +541,13 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
node = session.nodes[_id]
if not isinstance(node.id, int):
continue
node_type = session.get_node_type(node.__class__)
model = getattr(node, "type", None)
position = core_pb2.Position(
x=node.position.x, y=node.position.y, z=node.position.z
)
services = getattr(node, "services", [])
if services is None:
services = []
services = [x.name for x in services]
config_services = getattr(node, "config_services", {})
config_services = [x for x in config_services]
emane_model = None
if isinstance(node, EmaneNet):
emane_model = node.model.name
image = getattr(node, "image", None)
node_proto = core_pb2.Node(
id=node.id,
name=node.name,
emane=emane_model,
model=model,
type=node_type.value,
position=position,
services=services,
icon=node.icon,
image=image,
config_services=config_services,
)
if isinstance(node, (DockerNode, LxcNode)):
node_proto.image = node.image
node_proto = grpcutils.get_node_proto(session, node)
nodes.append(node_proto)
node_links = get_links(session, node)
links.extend(node_links)
session_proto = core_pb2.Session(
state=session.state.value, nodes=nodes, links=links
state=session.state.value, nodes=nodes, links=links, dir=session.session_dir
)
return core_pb2.GetSessionResponse(session=session_proto)
@ -713,37 +680,12 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
logging.debug("get node: %s", request)
session = self.get_session(request.session_id, context)
node = self.get_node(session, request.node_id, context)
interfaces = []
for interface_id in node._netif:
interface = node._netif[interface_id]
interface_proto = grpcutils.interface_to_proto(interface)
interfaces.append(interface_proto)
emane_model = None
if isinstance(node, EmaneNet):
emane_model = node.model.name
services = []
if node.services:
services = [x.name for x in node.services]
position = core_pb2.Position(
x=node.position.x, y=node.position.y, z=node.position.z
)
node_type = session.get_node_type(node.__class__)
node_proto = core_pb2.Node(
id=node.id,
name=node.name,
type=node_type.value,
emane=emane_model,
model=node.type,
position=position,
services=services,
)
if isinstance(node, (DockerNode, LxcNode)):
node_proto.image = node.image
node_proto = grpcutils.get_node_proto(session, node)
return core_pb2.GetNodeResponse(node=node_proto, interfaces=interfaces)
def EditNode(

View file

@ -650,6 +650,7 @@ message Session {
SessionState.Enum state = 2;
repeated Node nodes = 3;
repeated Link links = 4;
string dir = 5;
}
message SessionSummary {
@ -657,6 +658,7 @@ message SessionSummary {
SessionState.Enum state = 2;
int32 nodes = 3;
string file = 4;
string dir = 5;
}
message Node {
@ -673,6 +675,8 @@ message Node {
string server = 11;
repeated string config_services = 12;
Geo geo = 13;
string dir = 14;
string channel = 15;
}
message Link {

212
daemon/scripts/core-route-monitor Executable file
View file

@ -0,0 +1,212 @@
#!/usr/bin/env python
import argparse
import enum
import select
import socket
import subprocess
import time
from argparse import ArgumentDefaultsHelpFormatter
from functools import cmp_to_key
from queue import Queue
from threading import Thread
from typing import Dict, Tuple
from core import utils
from core.api.grpc.client import CoreGrpcClient
from core.api.grpc.core_pb2 import NodeType
SDT_HOST = "127.0.0.1"
SDT_PORT = 50000
ROUTE_LAYER = "CORE Route"
DEAD_TIME = 3
ROUTE_TIME = 3
PACKET_CHOICES = ["udp", "tcp", "icmp"]
class RouteEnum(enum.Enum):
ADD = 0
DEL = 1
class SdtClient:
def __init__(self, address: Tuple[str, int]) -> None:
self.sock = socket.create_connection(address)
self.links = []
self.send(f'layer "{ROUTE_LAYER}"')
def close(self) -> None:
self.sock.close()
def send(self, cmd: str) -> None:
sdt_cmd = f"{cmd}\n".encode()
self.sock.sendall(sdt_cmd)
def add_link(self, node1, node2) -> None:
route_id = f"{node1}-{node2}-r"
link_id = f"{node1},{node2},{route_id}"
cmd = f'link {link_id} linkLayer "{ROUTE_LAYER}" line yellow,2'
self.send(cmd)
self.links.append(link_id)
def delete_links(self) -> None:
for link_id in self.links:
cmd = f"delete link,{link_id}"
self.send(cmd)
self.links.clear()
class RouterMonitor:
def __init__(self, src_id: str, src: str, dst: str, pkt: str,
sdt_host: str, sdt_port: int) -> None:
self.queue = Queue()
self.core = CoreGrpcClient()
self.src_id = src_id
self.src = src
self.dst = dst
self.pkt = pkt
self.seen = {}
self.running = False
self.route_time = None
self.listeners = []
self.sdt = SdtClient((sdt_host, sdt_port))
self.nodes = self.get_nodes()
def get_nodes(self) -> Dict[str, str]:
nodes = {}
with self.core.context_connect():
response = self.core.get_sessions()
sessions = response.sessions
session = None
if sessions:
session = sessions[0]
if not session:
raise Exception("no current core sessions")
print("session: ", session.dir)
response = self.core.get_session(session.id)
for node in response.session.nodes:
if node.type != NodeType.DEFAULT:
continue
nodes[node.id] = node.channel
return nodes
def start(self) -> None:
self.running = True
for node_id, node in self.nodes.items():
print("listening on node: ", node)
thread = Thread(target=self.listen, args=(node_id, node), daemon=True)
thread.start()
self.listeners.append(thread)
self.manage()
def manage(self) -> None:
self.route_time = time.monotonic()
while self.running:
route_enum, node, seen = self.queue.get()
if route_enum == RouteEnum.ADD:
self.seen[node] = seen
elif node in self.seen:
del self.seen[node]
if (time.monotonic() - self.route_time) >= ROUTE_TIME:
self.manage_routes()
self.route_time = time.monotonic()
def route_sort(self, x: Tuple[str, int], y: Tuple[str, int]) -> int:
x_node = x[0]
y_node = y[0]
if x_node == self.src_id:
return 1
if y_node == self.src_id:
return -1
x_ttl, y_ttl = x[1], y[1]
return x_ttl - y_ttl
def manage_routes(self) -> None:
self.sdt.delete_links()
if not self.seen:
return
values = sorted(self.seen.items(),
key=cmp_to_key(self.route_sort),
reverse=True)
print("current route:")
for index, node_data in enumerate(values):
next_index = index + 1
if next_index == len(values):
break
next_node_id = values[next_index][0]
node_id, ttl = node_data
print(f"{node_id} -> {next_node_id}")
self.sdt.add_link(node_id, next_node_id)
def stop(self) -> None:
self.running = False
self.sdt.delete_links()
self.sdt.close()
for thread in self.listeners:
thread.join()
self.listeners.clear()
def listen(self, node_id, node) -> None:
cmd = (
f"tcpdump -lnv src host {self.src} and dst host {self.dst} and {self.pkt}"
)
node_cmd = f"vcmd -c {node} -- {cmd}"
p = subprocess.Popen(node_cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
current = time.monotonic()
try:
while not p.poll() and self.running:
ready, _, _ = select.select([p.stdout], [], [], 1)
if ready:
line = p.stdout.readline().strip().decode()
if line:
line = line.split("ttl", 1)[1]
ttl = int(line.split(",", 1)[0])
p.stdout.readline()
self.queue.put((RouteEnum.ADD, node_id, ttl))
current = time.monotonic()
else:
if (time.monotonic() - current) >= DEAD_TIME:
self.queue.put((RouteEnum.DEL, node_id, None))
except Exception as e:
print(f"listener error: {e}")
def main() -> None:
if not utils.which("tcpdump", required=False):
print("core-route-monitor requires tcpdump to be installed")
return
parser = argparse.ArgumentParser(
description="core route monitor",
formatter_class=ArgumentDefaultsHelpFormatter,
)
parser.add_argument("--id", required=True,
help="source node id for determining path")
parser.add_argument("--src", default="10.0.0.20",
help="source address for route monitoring")
parser.add_argument("--dst", default="10.0.2.20",
help="destination address for route monitoring")
parser.add_argument("--pkt", default="icmp", choices=PACKET_CHOICES,
help="packet type")
parser.add_argument("--sdt-host", default=SDT_HOST, help="sdt host address")
parser.add_argument("--sdt-port", type=int, default=SDT_PORT, help="sdt port")
args = parser.parse_args()
monitor = RouterMonitor(
args.id,
args.src,
args.dst,
args.pkt,
args.sdt_host,
args.sdt_port,
)
try:
monitor.start()
except KeyboardInterrupt:
monitor.stop()
print("ending route monitor")
if __name__ == "__main__":
main()