core-extra/daemon/scripts/core-player

276 lines
8.3 KiB
Text
Raw Normal View History

#!/usr/bin/env python3
import argparse
import csv
import enum
import logging
import sched
import sys
from pathlib import Path
from threading import Thread
from typing import Callable, Dict, Optional
import grpc
from core.api.grpc.client import CoreGrpcClient, MoveNodesStreamer
logger = logging.getLogger(__name__)
@enum.unique
class Events(enum.Enum):
"""
Provides event types for processing file events.
"""
XY = enum.auto()
GEO = enum.auto()
CMD = enum.auto()
WLINK = enum.auto()
@classmethod
def get(cls, value: str) -> Optional["Events"]:
"""
Retrieves a valid event type from read input.
:param value: value to get event type for
:return: valid event type, None otherwise
"""
event = None
try:
event = cls[value]
except KeyError:
pass
return event
def path_type(value: str) -> Path:
file_path = Path(value)
if not file_path.is_file():
raise argparse.ArgumentTypeError(f"file does not exist: {value}")
return file_path
def parse_args() -> argparse.Namespace:
"""
Setup and parse command line arguments.
:return: parsed arguments
"""
parser = argparse.ArgumentParser(
description="core player runs files that can move nodes and send commands",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-f", "--file", required=True, type=path_type, help="core file to play"
)
parser.add_argument(
"-s", "--session", type=int, help="session to play to, first found session otherwise"
)
return parser.parse_args()
class CorePlayer:
"""
Provides core player functionality for reading a file with timed events
and playing them out.
"""
def __init__(self, file_path: Path):
"""
Creates a CorePlayer instance.
:param file_path: file to play path
"""
self.file_path: Path = file_path
self.core: CoreGrpcClient = CoreGrpcClient()
self.session_id: Optional[int] = None
self.node_streamer: Optional[MoveNodesStreamer] = None
self.node_streamer_thread: Optional[Thread] = None
self.scheduler: sched.scheduler = sched.scheduler()
self.handlers: Dict[Events, Callable] = {
Events.XY: self.handle_xy,
Events.GEO: self.handle_geo,
Events.CMD: self.handle_cmd,
Events.WLINK: self.handle_wlink,
}
def init(self, session_id: Optional[int]) -> None:
"""
Initialize core connections, settings to or retrieving session to use.
Also setup node streamer for xy/geo movements.
:param session_id: session id to use, None for default session
:return: nothing
"""
self.core.connect()
try:
if session_id is None:
sessions = self.core.get_sessions()
if len(sessions):
session_id = sessions[0].id
if session_id is None:
logger.error("no core sessions found")
sys.exit(1)
self.session_id = session_id
logger.info("playing to session(%s)", self.session_id)
self.node_streamer = MoveNodesStreamer(self.session_id)
self.node_streamer_thread = Thread(
target=self.core.move_nodes, args=(self.node_streamer,), daemon=True
)
self.node_streamer_thread.start()
except grpc.RpcError as e:
logger.error("core is not running: %s", e.details())
sys.exit(1)
def start(self) -> None:
"""
Starts playing file, reading the csv data line by line, then handling
each line event type. Delay is tracked and calculated, while processing,
to ensure we wait for the event time to be active.
:return: nothing
"""
current_time = 0.0
with self.file_path.open("r", newline="") as f:
for row in csv.reader(f):
# determine delay
input_time = float(row[0])
delay = input_time - current_time
current_time = input_time
# determine event
event_value = row[1]
event = Events.get(event_value)
if not event:
logger.error("unknown event type: %s", ",".join(row))
continue
# get args and event functions
args = row[2:]
event_func = self.handlers.get(event)
if not event_func:
logger.error("unknown event type handler: %s", ",".join(row))
continue
logger.info(
"processing line time(%s) event(%s) args(%s)",
input_time,
event.name,
args,
)
# schedule and run event
self.scheduler.enter(delay, 1, event_func, argument=args)
self.scheduler.run()
self.stop()
def stop(self) -> None:
"""
Stop and cleanup playback.
:return: nothing
"""
logger.info("stopping playback, cleaning up")
self.node_streamer.stop()
self.node_streamer_thread.join()
self.node_streamer_thread = None
def handle_xy(self, node_id: str, x: str, y: str) -> None:
"""
Handle node xy movement event.
:param node_id: id of node to move
:param x: x position
:param y: y position
:return: nothing
"""
logger.debug("handling xy node(%s) x(%s) y(%s)", node_id, x, y)
try:
node_id = int(node_id)
x, y = float(x), float(y)
self.node_streamer.send_position(node_id, x, y)
except ValueError:
logger.error("invalid xy inputs")
def handle_geo(self, node_id: str, lon: str, lat: str, alt: str) -> None:
"""
Handle node geo movement event.
:param node_id: id of node to move
:param lon: longitude position
:param lat: latitude position
:param alt: altitude position
:return: nothing
"""
logger.debug(
"handling geo node(%s) lon(%s) lat(%s) alt(%s)",
node_id,
lon,
lat,
alt,
)
try:
node_id = int(node_id)
lon, lat, alt = float(lon), float(lat), float(alt)
self.node_streamer.send_geo(node_id, lon, lat, alt)
except ValueError:
logger.error("invalid geo inputs")
def handle_cmd(self, node_id: str, wait: str, shell: str, cmd: str) -> None:
"""
Handle node command event.
:param node_id: id of node to run command
:param wait: True to wait for successful command, False otherwise
:param shell: True to run command in shell context, False otherwise
:param cmd: command to run
:return: nothing
"""
logger.debug(
"handling cmd node(%s) wait(%s) shell(%s) cmd(%s)",
node_id,
wait,
shell,
cmd,
)
try:
node_id = int(node_id)
wait, shell = bool(int(wait)), bool(int(shell))
status, output = self.core.node_command(
self.session_id, node_id, cmd, wait, shell
)
logger.info("cmd result(%s): %s", status, output)
except ValueError:
logger.error("invalid cmd inputs")
def handle_wlink(
self,
node1_id: str,
node2_id: str,
net_id: str,
linked: str,
) -> None:
logger.debug(
"handling wlink node1(%s) node2(%s) net(%s) linked(%s)",
node1_id,
node2_id,
net_id,
linked,
)
try:
node1_id = int(node1_id)
node2_id = int(node2_id)
net_id = int(net_id)
linked = bool(int(linked))
self.core.wlan_link(self.session_id, net_id, node1_id, node2_id, linked)
except ValueError:
logger.error("invalid cmd inputs")
def main() -> None:
logging.basicConfig(level=logging.INFO)
args = parse_args()
player = CorePlayer(args.file)
player.init(args.session)
player.start()
if __name__ == "__main__":
main()