grpc added service/events rpc tests

This commit is contained in:
bharnden 2019-03-25 14:03:04 -07:00
parent b15e525cc1
commit 7299abd64d
3 changed files with 246 additions and 10 deletions

View file

@ -859,15 +859,15 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer):
context.abort(grpc.StatusCode.NOT_FOUND, "service not found")
status = -1
if request.action == core_pb2.START:
if request.action == core_pb2.SERVICE_START:
status = session.services.startup_service(node, service, wait=True)
elif request.action == core_pb2.STOP:
elif request.action == core_pb2.SERVICE_STOP:
status = session.services.stop_service(node, service)
elif request.action == core_pb2.RESTART:
elif request.action == core_pb2.SERVICE_RESTART:
status = session.services.stop_service(node, service)
if not status:
status = session.services.startup_service(node, service, wait=True)
elif request.action == core_pb2.VALIDATE:
elif request.action == core_pb2.SERVICE_VALIDATE:
status = session.services.validate_service(node, service)
result = False

View file

@ -646,10 +646,10 @@ enum ServiceValidationMode {
}
enum ServiceAction {
START = 0;
STOP = 1;
RESTART = 2;
VALIDATE = 3;
SERVICE_START = 0;
SERVICE_STOP = 1;
SERVICE_RESTART = 2;
SERVICE_VALIDATE = 3;
}
enum MobilityAction {

View file

@ -1,12 +1,15 @@
import os
import time
from Queue import Queue
import pytest
from core.conf import ConfigShim
from core.data import EventData
from core.emane.ieee80211abg import EmaneIeee80211abgModel
from core.emulator.emudata import NodeOptions, LinkOptions
from core.enumerations import NodeTypes, EventTypes, ConfigFlags, ExceptionLevels
from core.grpc import core_pb2
from core.enumerations import NodeTypes, EventTypes
from core.grpc.client import CoreGrpcClient
from core.mobility import BasicRangeModel, Ns2ScriptedMobility
@ -564,3 +567,236 @@ class TestGrpc:
# then
assert response.result is True
def test_get_services(self, grpc_server):
# given
client = CoreGrpcClient()
# then
with client.context_connect():
response = client.get_services()
# then
assert len(response.services) > 0
def test_get_service_defaults(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
# then
with client.context_connect():
response = client.get_service_defaults(session.session_id)
# then
assert len(response.defaults) > 0
def test_set_service_defaults(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node_type = "test"
services = ["SSH"]
# then
with client.context_connect():
response = client.set_service_defaults(session.session_id, {node_type: services})
# then
assert response.result is True
assert session.services.default_services[node_type] == services
def test_get_node_service(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node()
# then
with client.context_connect():
response = client.get_node_service(session.session_id, node.objid, "IPForward")
# then
assert len(response.service.configs) > 0
def test_get_node_service_file(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node()
# then
with client.context_connect():
response = client.get_node_service_file(session.session_id, node.objid, "IPForward", "ipforward.sh")
# then
assert response.data is not None
def test_set_node_service(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node()
service_name = "IPForward"
validate = ("echo hello",)
# then
with client.context_connect():
response = client.set_node_service(session.session_id, node.objid, service_name, (), validate, ())
# then
assert response.result is True
service = session.services.get_service(node.objid, service_name, default_service=True)
assert service.validate == validate
def test_set_node_service_file(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node()
service_name = "IPForward"
file_name = "ipforward.sh"
file_data = "echo hello"
# then
with client.context_connect():
response = client.set_node_service_file(session.session_id, node.objid, service_name, file_name, file_data)
# then
assert response.result is True
service_file = session.services.get_service_file(node, service_name, file_name)
assert service_file.data == file_data
def test_service_action(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node()
service_name = "IPForward"
# then
with client.context_connect():
response = client.service_action(session.session_id, node.objid, service_name, core_pb2.SERVICE_STOP)
# then
assert response.result is True
def test_node_events(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node()
node_data = node.data(message_type=0)
queue = Queue()
def handle_event(event_data):
queue.put(event_data)
# then
with client.context_connect():
client.node_events(session.session_id, handle_event)
time.sleep(0.1)
session.broadcast_node(node_data)
# then
queue.get(timeout=5)
def test_link_events(self, grpc_server, ip_prefixes):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN)
node = session.add_node()
interface = ip_prefixes.create_interface(node)
session.add_link(node.objid, wlan.objid, interface)
link_data = wlan.all_link_data(0)[0]
queue = Queue()
def handle_event(event_data):
queue.put(event_data)
# then
with client.context_connect():
client.link_events(session.session_id, handle_event)
time.sleep(0.1)
session.broadcast_link(link_data)
# then
queue.get(timeout=5)
def test_session_events(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
queue = Queue()
def handle_event(event_data):
queue.put(event_data)
# then
with client.context_connect():
client.session_events(session.session_id, handle_event)
time.sleep(0.1)
event = EventData(event_type=EventTypes.RUNTIME_STATE.value, time="%s" % time.time())
session.broadcast_event(event)
# then
queue.get(timeout=5)
def test_config_events(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
queue = Queue()
def handle_event(event_data):
queue.put(event_data)
# then
with client.context_connect():
client.config_events(session.session_id, handle_event)
time.sleep(0.1)
session_config = session.options.get_configs()
config_data = ConfigShim.config_data(0, None, ConfigFlags.UPDATE.value, session.options, session_config)
session.broadcast_config(config_data)
# then
queue.get(timeout=5)
def test_exception_events(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
queue = Queue()
def handle_event(event_data):
queue.put(event_data)
# then
with client.context_connect():
client.exception_events(session.session_id, handle_event)
time.sleep(0.1)
session.exception(ExceptionLevels.FATAL, "test", None, "exception message")
# then
queue.get(timeout=5)
def test_file_events(self, grpc_server):
# given
client = CoreGrpcClient()
session = grpc_server.coreemu.create_session()
node = session.add_node()
queue = Queue()
def handle_event(event_data):
queue.put(event_data)
# then
with client.context_connect():
client.file_events(session.session_id, handle_event)
time.sleep(0.1)
file_data = session.services.get_service_file(node, "IPForward", "ipforward.sh")
session.broadcast_file(file_data)
# then
queue.get(timeout=5)