grpc: implemented wrapper stream classes for using the wrapped client

This commit is contained in:
Blake Harnden 2020-09-05 10:19:44 -07:00
parent c4a724ee10
commit 98a51ce17d
2 changed files with 86 additions and 12 deletions

View file

@ -5,7 +5,8 @@ gRpc client for interfacing with CORE.
import logging
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple
from queue import Queue
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple
import grpc
@ -30,7 +31,6 @@ from core.api.grpc.configservices_pb2 import (
from core.api.grpc.core_pb2 import ExecuteScriptRequest
from core.api.grpc.emane_pb2 import (
EmaneLinkRequest,
EmanePathlossesRequest,
GetEmaneConfigRequest,
GetEmaneEventChannelRequest,
GetEmaneModelConfigRequest,
@ -69,6 +69,42 @@ from core.api.grpc.wlan_pb2 import (
from core.emulator.data import IpPrefixes
class MoveNodesStreamer:
def __init__(self) -> None:
self.queue: Queue = Queue()
def send(self, request: Optional[wrappers.MoveNodesRequest]) -> None:
self.queue.put(request)
def next(self) -> Optional[core_pb2.MoveNodesRequest]:
request: Optional[wrappers.MoveNodesRequest] = self.queue.get()
if request:
return request.to_proto()
else:
return request
def iter(self):
return iter(self.next, None)
class EmanePathlossesStreamer:
def __init__(self) -> None:
self.queue: Queue = Queue()
def send(self, request: Optional[wrappers.EmanePathlossesRequest]) -> None:
self.queue.put(request)
def next(self) -> Optional[emane_pb2.EmanePathlossesRequest]:
request: Optional[wrappers.EmanePathlossesRequest] = self.queue.get()
if request:
return request.to_proto()
else:
return request
def iter(self):
return iter(self.next, None)
class InterfaceHelper:
"""
Convenience class to help generate IP4 and IP6 addresses for gRPC clients.
@ -627,16 +663,15 @@ class CoreGrpcClient:
response = self.stub.EditNode(request)
return response.result
# TODO: determine path to stream non proto requests
def move_nodes(self, move_iterator: Iterable[core_pb2.MoveNodesRequest]) -> None:
def move_nodes(self, streamer: MoveNodesStreamer) -> None:
"""
Stream node movements using the provided iterator.
:param move_iterator: iterator for generating node movements
:param streamer: move nodes streamer
:return: nothing
:raises grpc.RpcError: when session or nodes do not exist
"""
self.stub.MoveNodes(move_iterator)
self.stub.MoveNodes(streamer.iter())
def delete_node(self, session_id: int, node_id: int, source: str = None) -> bool:
"""
@ -1396,19 +1431,16 @@ class CoreGrpcClient:
response = self.stub.WlanLink(request)
return response.result
# TODO: determine path to stream non proto requests
def emane_pathlosses(
self, pathloss_iterator: Iterable[EmanePathlossesRequest]
) -> None:
def emane_pathlosses(self, streamer: EmanePathlossesStreamer) -> None:
"""
Stream EMANE pathloss events.
:param pathloss_iterator: iterator for sending emane pathloss events
:param streamer: emane pathlosses streamer
:return: nothing
:raises grpc.RpcError: when a pathloss event session or one of the nodes do not
exist
"""
self.stub.EmanePathlosses(pathloss_iterator)
self.stub.EmanePathlosses(streamer.iter())
def connect(self) -> None:
"""

View file

@ -934,3 +934,45 @@ class EmaneEventChannel:
return EmaneEventChannel(
group=proto.group, port=proto.port, device=proto.device
)
@dataclass
class EmanePathlossesRequest:
session_id: int
node1_id: int
rx1: float
iface1_id: int
node2_id: int
rx2: float
iface2_id: int
def to_proto(self) -> emane_pb2.EmanePathlossesRequest:
return emane_pb2.EmanePathlossesRequest(
session_id=self.session_id,
node1_id=self.node1_id,
rx1=self.rx1,
iface1_id=self.iface1_id,
node2_id=self.node2_id,
rx2=self.rx2,
iface2_id=self.iface2_id,
)
@dataclass
class MoveNodesRequest:
session_id: int
node_id: int
source: str = None
position: Position = None
geo: Geo = None
def to_proto(self) -> core_pb2.MoveNodesRequest:
position = self.position.to_proto() if self.position else None
geo = self.geo.to_proto() if self.geo else None
return core_pb2.MoveNodesRequest(
session_id=self.session_id,
node_id=self.node_id,
source=self.source,
position=position,
geo=geo,
)