Merge pull request #462 from coreemu/enhancement/grpc-node-stream

grpc: added call to stream node movements using geo/xy and tests to v…
This commit is contained in:
bharnden 2020-06-03 08:53:46 -07:00 committed by GitHub
commit 7048aa7867
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 132 additions and 4 deletions

View file

@ -5,7 +5,7 @@ gRpc client for interfacing with CORE, when gRPC mode is enabled.
import logging
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, Generator, List
from typing import Any, Callable, Dict, Generator, Iterable, List
import grpc
import netaddr
@ -571,6 +571,17 @@ class CoreGrpcClient:
)
return self.stub.EditNode(request)
def move_nodes(
self, move_iterator: Iterable[core_pb2.MoveNodesRequest]
) -> core_pb2.MoveNodesResponse:
"""
Stream node movements using the provided iterator.
:param move_iterator: iterator for generating node movements
:return: move nodes response
"""
return self.stub.MoveNodes(move_iterator)
def delete_node(self, session_id: int, node_id: int) -> core_pb2.DeleteNodeResponse:
"""
Delete node from session.

View file

@ -692,6 +692,40 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
node_proto = grpcutils.get_node_proto(session, node)
return core_pb2.GetNodeResponse(node=node_proto, interfaces=interfaces)
def MoveNodes(
self, request_iterator, context: ServicerContext
) -> core_pb2.MoveNodesResponse:
"""
Stream node movements
:param request_iterator: move nodes request iterator
:param context: context object
:return: move nodes response
"""
for request in request_iterator:
if not request.WhichOneof("move_type"):
raise CoreError("move nodes must provide a move type")
session = self.get_session(request.session_id, context)
node = self.get_node(session, request.node_id, context, NodeBase)
options = NodeOptions()
has_geo = request.HasField("geo")
if has_geo:
logging.info("has geo")
lat = request.geo.lat
lon = request.geo.lon
alt = request.geo.alt
options.set_location(lat, lon, alt)
else:
x = request.position.x
y = request.position.y
logging.info("has pos: %s,%s", x, y)
options.set_position(x, y)
session.edit_node(node.id, options)
source = request.source if request.source else None
if not has_geo:
session.broadcast_node(node, source=source)
return core_pb2.MoveNodesResponse()
def EditNode(
self, request: core_pb2.EditNodeRequest, context: ServicerContext
) -> core_pb2.EditNodeResponse:

View file

@ -31,7 +31,7 @@ class GeoLocation:
CRS_WGS84, CRS_PROJ, always_xy=True
)
self.to_geo = pyproj.Transformer.from_crs(CRS_PROJ, CRS_WGS84, always_xy=True)
self.refproj = (0.0, 0.0)
self.refproj = (0.0, 0.0, 0.0)
self.refgeo = (0.0, 0.0, 0.0)
self.refxyz = (0.0, 0.0, 0.0)
self.refscale = 1.0
@ -58,7 +58,7 @@ class GeoLocation:
self.refxyz = (0.0, 0.0, 0.0)
self.refgeo = (0.0, 0.0, 0.0)
self.refscale = 1.0
self.refproj = self.to_pixels.transform(self.refgeo[0], self.refgeo[1])
self.refproj = self.to_pixels.transform(*self.refgeo)
def pixels2meters(self, value: float) -> float:
"""

View file

@ -61,6 +61,8 @@ service CoreApi {
}
rpc GetNodeTerminal (GetNodeTerminalRequest) returns (GetNodeTerminalResponse) {
}
rpc MoveNodes (stream MoveNodesRequest) returns (MoveNodesResponse) {
}
// link rpc
rpc GetNodeLinks (GetNodeLinksRequest) returns (GetNodeLinksResponse) {
@ -446,6 +448,19 @@ message GetNodeTerminalResponse {
string terminal = 1;
}
message MoveNodesRequest {
int32 session_id = 1;
int32 node_id = 2;
string source = 3;
oneof move_type {
Position position = 4;
Geo geo = 5;
}
}
message MoveNodesResponse {
}
message NodeCommandRequest {
int32 session_id = 1;
int32 node_id = 2;

View file

@ -18,7 +18,7 @@ from core.api.tlv.dataconversion import ConfigShim
from core.api.tlv.enumerations import ConfigFlags
from core.emane.ieee80211abg import EmaneIeee80211abgModel
from core.emane.nodes import EmaneNet
from core.emulator.data import EventData
from core.emulator.data import EventData, NodeData
from core.emulator.emudata import IpPrefixes, NodeOptions
from core.emulator.enumerations import EventTypes, ExceptionLevels, NodeTypes
from core.errors import CoreError
@ -1170,3 +1170,71 @@ class TestGrpc:
# then
queue.get(timeout=5)
def test_move_nodes(self, grpc_server: CoreGrpcServer):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node(CoreNode)
x, y = 10.0, 15.0
def move_iter():
yield core_pb2.MoveNodesRequest(
session_id=session.id,
node_id=node.id,
position=core_pb2.Position(x=x, y=y),
)
# then
with client.context_connect():
client.move_nodes(move_iter())
# assert
assert node.position.x == x
assert node.position.y == y
def test_move_nodes_geo(self, grpc_server: CoreGrpcServer):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node(CoreNode)
lon, lat, alt = 10.0, 15.0, 5.0
queue = Queue()
def node_handler(node_data: NodeData):
assert node_data.longitude == lon
assert node_data.latitude == lat
assert node_data.altitude == alt
queue.put(node_data)
session.node_handlers.append(node_handler)
def move_iter():
yield core_pb2.MoveNodesRequest(
session_id=session.id,
node_id=node.id,
geo=core_pb2.Geo(lon=lon, lat=lat, alt=alt),
)
# then
with client.context_connect():
client.move_nodes(move_iter())
# assert
assert node.position.lon == lon
assert node.position.lat == lat
assert node.position.alt == alt
assert queue.get(timeout=5)
def test_move_nodes_exception(self, grpc_server: CoreGrpcServer):
# given
client = CoreGrpcClient()
grpc_server.coreemu.create_session()
def move_iter():
yield core_pb2.MoveNodesRequest()
# then
with pytest.raises(grpc.RpcError):
with client.context_connect():
client.move_nodes(move_iter())