daemon: added core player writer and player to core library and added wireless event handling, fixed issues with wireless nodes being written to xml

This commit is contained in:
Blake Harnden 2022-03-31 16:31:20 -07:00
parent ba0e4adb04
commit 5d4642006c
5 changed files with 461 additions and 230 deletions

View file

@ -1,48 +1,15 @@
#!/usr/bin/env python3
import argparse
import csv
import enum
import logging
import sched
import sys
from pathlib import Path
from threading import Thread
from typing import Callable, Dict, Optional
import grpc
from core.api.grpc.client import CoreGrpcClient, MoveNodesStreamer
from core.player import CorePlayer
logger = logging.getLogger(__name__)
@enum.unique
class Events(enum.Enum):
"""
Provides event types for processing file events.
"""
XY = enum.auto()
GEO = enum.auto()
CMD = enum.auto()
WLINK = enum.auto()
@classmethod
def get(cls, value: str) -> Optional["Events"]:
"""
Retrieves a valid event type from read input.
:param value: value to get event type for
:return: valid event type, None otherwise
"""
event = None
try:
event = cls[value]
except KeyError:
pass
return event
def path_type(value: str) -> Path:
file_path = Path(value)
if not file_path.is_file():
@ -69,205 +36,13 @@ def parse_args() -> argparse.Namespace:
return parser.parse_args()
class CorePlayer:
"""
Provides core player functionality for reading a file with timed events
and playing them out.
"""
def __init__(self, file_path: Path):
"""
Creates a CorePlayer instance.
:param file_path: file to play path
"""
self.file_path: Path = file_path
self.core: CoreGrpcClient = CoreGrpcClient()
self.session_id: Optional[int] = None
self.node_streamer: Optional[MoveNodesStreamer] = None
self.node_streamer_thread: Optional[Thread] = None
self.scheduler: sched.scheduler = sched.scheduler()
self.handlers: Dict[Events, Callable] = {
Events.XY: self.handle_xy,
Events.GEO: self.handle_geo,
Events.CMD: self.handle_cmd,
Events.WLINK: self.handle_wlink,
}
def init(self, session_id: Optional[int]) -> None:
"""
Initialize core connections, settings to or retrieving session to use.
Also setup node streamer for xy/geo movements.
:param session_id: session id to use, None for default session
:return: nothing
"""
self.core.connect()
try:
if session_id is None:
sessions = self.core.get_sessions()
if len(sessions):
session_id = sessions[0].id
if session_id is None:
logger.error("no core sessions found")
sys.exit(1)
self.session_id = session_id
logger.info("playing to session(%s)", self.session_id)
self.node_streamer = MoveNodesStreamer(self.session_id)
self.node_streamer_thread = Thread(
target=self.core.move_nodes, args=(self.node_streamer,), daemon=True
)
self.node_streamer_thread.start()
except grpc.RpcError as e:
logger.error("core is not running: %s", e.details())
sys.exit(1)
def start(self) -> None:
"""
Starts playing file, reading the csv data line by line, then handling
each line event type. Delay is tracked and calculated, while processing,
to ensure we wait for the event time to be active.
:return: nothing
"""
current_time = 0.0
with self.file_path.open("r", newline="") as f:
for row in csv.reader(f):
# determine delay
input_time = float(row[0])
delay = input_time - current_time
current_time = input_time
# determine event
event_value = row[1]
event = Events.get(event_value)
if not event:
logger.error("unknown event type: %s", ",".join(row))
continue
# get args and event functions
args = row[2:]
event_func = self.handlers.get(event)
if not event_func:
logger.error("unknown event type handler: %s", ",".join(row))
continue
logger.info(
"processing line time(%s) event(%s) args(%s)",
input_time,
event.name,
args,
)
# schedule and run event
self.scheduler.enter(delay, 1, event_func, argument=args)
self.scheduler.run()
self.stop()
def stop(self) -> None:
"""
Stop and cleanup playback.
:return: nothing
"""
logger.info("stopping playback, cleaning up")
self.node_streamer.stop()
self.node_streamer_thread.join()
self.node_streamer_thread = None
def handle_xy(self, node_id: str, x: str, y: str) -> None:
"""
Handle node xy movement event.
:param node_id: id of node to move
:param x: x position
:param y: y position
:return: nothing
"""
logger.debug("handling xy node(%s) x(%s) y(%s)", node_id, x, y)
try:
node_id = int(node_id)
x, y = float(x), float(y)
self.node_streamer.send_position(node_id, x, y)
except ValueError:
logger.error("invalid xy inputs")
def handle_geo(self, node_id: str, lon: str, lat: str, alt: str) -> None:
"""
Handle node geo movement event.
:param node_id: id of node to move
:param lon: longitude position
:param lat: latitude position
:param alt: altitude position
:return: nothing
"""
logger.debug(
"handling geo node(%s) lon(%s) lat(%s) alt(%s)",
node_id,
lon,
lat,
alt,
)
try:
node_id = int(node_id)
lon, lat, alt = float(lon), float(lat), float(alt)
self.node_streamer.send_geo(node_id, lon, lat, alt)
except ValueError:
logger.error("invalid geo inputs")
def handle_cmd(self, node_id: str, wait: str, shell: str, cmd: str) -> None:
"""
Handle node command event.
:param node_id: id of node to run command
:param wait: True to wait for successful command, False otherwise
:param shell: True to run command in shell context, False otherwise
:param cmd: command to run
:return: nothing
"""
logger.debug(
"handling cmd node(%s) wait(%s) shell(%s) cmd(%s)",
node_id,
wait,
shell,
cmd,
)
try:
node_id = int(node_id)
wait, shell = bool(int(wait)), bool(int(shell))
status, output = self.core.node_command(
self.session_id, node_id, cmd, wait, shell
)
logger.info("cmd result(%s): %s", status, output)
except ValueError:
logger.error("invalid cmd inputs")
def handle_wlink(
self,
node1_id: str,
node2_id: str,
net_id: str,
linked: str,
) -> None:
logger.debug(
"handling wlink node1(%s) node2(%s) net(%s) linked(%s)",
node1_id,
node2_id,
net_id,
linked,
)
try:
node1_id = int(node1_id)
node2_id = int(node2_id)
net_id = int(net_id)
linked = bool(int(linked))
self.core.wlan_link(self.session_id, net_id, node1_id, node2_id, linked)
except ValueError:
logger.error("invalid cmd inputs")
def main() -> None:
logging.basicConfig(level=logging.INFO)
args = parse_args()
player = CorePlayer(args.file)
player.init(args.session)
result = player.init(args.session)
if not result:
sys.exit(1)
player.start()