#!/usr/bin/env python3 import argparse import csv import enum import logging import sched import sys from pathlib import Path from threading import Thread from typing import Callable, Dict, Optional import grpc from core.api.grpc.client import CoreGrpcClient, MoveNodesStreamer logger = logging.getLogger(__name__) @enum.unique class Events(enum.Enum): """ Provides event types for processing file events. """ XY = enum.auto() GEO = enum.auto() CMD = enum.auto() WLINK = enum.auto() @classmethod def get(cls, value: str) -> Optional["Events"]: """ Retrieves a valid event type from read input. :param value: value to get event type for :return: valid event type, None otherwise """ event = None try: event = cls[value] except KeyError: pass return event def path_type(value: str) -> Path: file_path = Path(value) if not file_path.is_file(): raise argparse.ArgumentTypeError(f"file does not exist: {value}") return file_path def parse_args() -> argparse.Namespace: """ Setup and parse command line arguments. :return: parsed arguments """ parser = argparse.ArgumentParser( description="core player runs files that can move nodes and send commands", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( "-f", "--file", required=True, type=path_type, help="core file to play" ) parser.add_argument( "-s", "--session", type=int, help="session to play to, first found session otherwise" ) return parser.parse_args() class CorePlayer: """ Provides core player functionality for reading a file with timed events and playing them out. """ def __init__(self, file_path: Path): """ Creates a CorePlayer instance. :param file_path: file to play path """ self.file_path: Path = file_path self.core: CoreGrpcClient = CoreGrpcClient() self.session_id: Optional[int] = None self.node_streamer: Optional[MoveNodesStreamer] = None self.node_streamer_thread: Optional[Thread] = None self.scheduler: sched.scheduler = sched.scheduler() self.handlers: Dict[Events, Callable] = { Events.XY: self.handle_xy, Events.GEO: self.handle_geo, Events.CMD: self.handle_cmd, Events.WLINK: self.handle_wlink, } def init(self, session_id: Optional[int]) -> None: """ Initialize core connections, settings to or retrieving session to use. Also setup node streamer for xy/geo movements. :param session_id: session id to use, None for default session :return: nothing """ self.core.connect() try: if session_id is None: sessions = self.core.get_sessions() if len(sessions): session_id = sessions[0].id if session_id is None: logger.error("no core sessions found") sys.exit(1) self.session_id = session_id logger.info("playing to session(%s)", self.session_id) self.node_streamer = MoveNodesStreamer(self.session_id) self.node_streamer_thread = Thread( target=self.core.move_nodes, args=(self.node_streamer,), daemon=True ) self.node_streamer_thread.start() except grpc.RpcError as e: logger.error("core is not running: %s", e.details()) sys.exit(1) def start(self) -> None: """ Starts playing file, reading the csv data line by line, then handling each line event type. Delay is tracked and calculated, while processing, to ensure we wait for the event time to be active. :return: nothing """ current_time = 0.0 with self.file_path.open("r", newline="") as f: for row in csv.reader(f): # determine delay input_time = float(row[0]) delay = input_time - current_time current_time = input_time # determine event event_value = row[1] event = Events.get(event_value) if not event: logger.error("unknown event type: %s", ",".join(row)) continue # get args and event functions args = row[2:] event_func = self.handlers.get(event) if not event_func: logger.error("unknown event type handler: %s", ",".join(row)) continue logger.info( "processing line time(%s) event(%s) args(%s)", input_time, event.name, args, ) # schedule and run event self.scheduler.enter(delay, 1, event_func, argument=args) self.scheduler.run() self.stop() def stop(self) -> None: """ Stop and cleanup playback. :return: nothing """ logger.info("stopping playback, cleaning up") self.node_streamer.stop() self.node_streamer_thread.join() self.node_streamer_thread = None def handle_xy(self, node_id: str, x: str, y: str) -> None: """ Handle node xy movement event. :param node_id: id of node to move :param x: x position :param y: y position :return: nothing """ logger.debug("handling xy node(%s) x(%s) y(%s)", node_id, x, y) try: node_id = int(node_id) x, y = float(x), float(y) self.node_streamer.send_position(node_id, x, y) except ValueError: logger.error("invalid xy inputs") def handle_geo(self, node_id: str, lon: str, lat: str, alt: str) -> None: """ Handle node geo movement event. :param node_id: id of node to move :param lon: longitude position :param lat: latitude position :param alt: altitude position :return: nothing """ logger.debug( "handling geo node(%s) lon(%s) lat(%s) alt(%s)", node_id, lon, lat, alt, ) try: node_id = int(node_id) lon, lat, alt = float(lon), float(lat), float(alt) self.node_streamer.send_geo(node_id, lon, lat, alt) except ValueError: logger.error("invalid geo inputs") def handle_cmd(self, node_id: str, wait: str, shell: str, cmd: str) -> None: """ Handle node command event. :param node_id: id of node to run command :param wait: True to wait for successful command, False otherwise :param shell: True to run command in shell context, False otherwise :param cmd: command to run :return: nothing """ logger.debug( "handling cmd node(%s) wait(%s) shell(%s) cmd(%s)", node_id, wait, shell, cmd, ) try: node_id = int(node_id) wait, shell = bool(int(wait)), bool(int(shell)) status, output = self.core.node_command( self.session_id, node_id, cmd, wait, shell ) logger.info("cmd result(%s): %s", status, output) except ValueError: logger.error("invalid cmd inputs") def handle_wlink( self, node1_id: str, node2_id: str, net_id: str, linked: str, ) -> None: logger.debug( "handling wlink node1(%s) node2(%s) net(%s) linked(%s)", node1_id, node2_id, net_id, linked, ) try: node1_id = int(node1_id) node2_id = int(node2_id) net_id = int(net_id) linked = bool(int(linked)) self.core.wlan_link(self.session_id, net_id, node1_id, node2_id, linked) except ValueError: logger.error("invalid cmd inputs") def main() -> None: logging.basicConfig(level=logging.INFO) args = parse_args() player = CorePlayer(args.file) player.init(args.session) player.start() if __name__ == "__main__": main()