added grpc api to core-daemon, added get sessions and get session rpc calls

This commit is contained in:
bharnden 2019-02-18 22:54:14 -08:00
parent e72f133488
commit 7aed803aae
5 changed files with 132 additions and 11 deletions

View file

@ -16,6 +16,11 @@ class CoreApiClient(object):
def get_sessions(self): def get_sessions(self):
return self.stub.GetSessions(core_pb2.SessionsRequest()) return self.stub.GetSessions(core_pb2.SessionsRequest())
def get_session(self, _id):
request = core_pb2.SessionRequest()
request.id = _id
return self.stub.GetSession(request)
@contextmanager @contextmanager
def connect(self): def connect(self):
channel = grpc.insecure_channel(self.address) channel = grpc.insecure_channel(self.address)
@ -32,6 +37,11 @@ def main():
response = client.get_sessions() response = client.get_sessions()
print("core client received: %s" % response) print("core client received: %s" % response)
if len(response.sessions) > 0:
session_data = response.sessions[0]
session = client.get_session(session_data.id)
print(session)
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig() logging.basicConfig()

View file

@ -1,3 +1,5 @@
from core.enumerations import NodeTypes
from concurrent import futures from concurrent import futures
import time import time
import logging import logging
@ -6,23 +8,81 @@ import grpc
import core_pb2 import core_pb2
import core_pb2_grpc import core_pb2_grpc
from core.misc import nodeutils
_ONE_DAY_IN_SECONDS = 60 * 60 * 24 _ONE_DAY_IN_SECONDS = 60 * 60 * 24
class CoreApiServer(core_pb2_grpc.CoreApiServicer): class CoreApiServer(core_pb2_grpc.CoreApiServicer):
def __init__(self, coreemu):
super(CoreApiServer, self).__init__()
self.coreemu = coreemu
def GetSessions(self, request, context): def GetSessions(self, request, context):
response = core_pb2.SessionsResponse() response = core_pb2.SessionsResponse()
session = response.sessions.add() for session_id in self.coreemu.sessions:
session.id = 1 session = self.coreemu.sessions[session_id]
session_data = response.sessions.add()
session_data.id = session_id
session_data.state = session.state
session_data.nodes = session.get_node_count()
return response
def GetSession(self, request, context):
session = self.coreemu.sessions.get(request.id)
if not request:
pass
response = core_pb2.SessionResponse()
for node_id in session.objects:
node = session.objects[node_id]
if not isinstance(node.objid, int):
continue
node_data = response.nodes.add()
node_data.id = node.objid
node_data.name = node.name
node_data.type = nodeutils.get_node_type(node.__class__).value
model = getattr(node, "type", None)
if model:
node_data.model = model
x = node.position.x
if x is not None:
node_data.position.x = x
y = node.position.y
if y is not None:
node_data.position.y = y
z = node.position.z
if z is not None:
node_data.position.z = z
services = getattr(node, "services", [])
if services is None:
services = []
services = [x.name for x in services]
node_data.services.extend(services)
emane_model = None
if nodeutils.is_node(node, NodeTypes.EMANE):
emane_model = node.model.name
if emane_model:
node_data.emane = emane_model
links_data = node.all_link_data(0)
for link_data in links_data:
pass
# link = core_utils.convert_link(session, link_data)
# links.append(link)
return response return response
def main(): def listen(coreemu, address="[::]:50051"):
logging.info("starting grpc api: %s", address)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
core_pb2_grpc.add_CoreApiServicer_to_server(CoreApiServer(), server) core_pb2_grpc.add_CoreApiServicer_to_server(CoreApiServer(coreemu), server)
server.add_insecure_port("[::]:50051") server.add_insecure_port(address)
server.start() server.start()
try: try:
@ -30,8 +90,3 @@ def main():
time.sleep(_ONE_DAY_IN_SECONDS) time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt: except KeyboardInterrupt:
server.stop(0) server.stop(0)
if __name__ == "__main__":
logging.basicConfig()
main()

View file

@ -63,6 +63,19 @@ def get_node_class(node_type):
return _NODE_MAP[node_type] return _NODE_MAP[node_type]
def get_node_type(node_class):
"""
Retrieve the node type given a node class.
:param class node_class: node class to get type for
:return: node type
:rtype: core.enumerations.NodeTypes
"""
global _NODE_MAP
node_type_map = {v: k for k, v in _NODE_MAP.iteritems()}
return node_type_map.get(node_class)
def is_node(obj, node_types): def is_node(obj, node_types):
""" """
Validates if an object is one of the provided node types. Validates if an object is one of the provided node types.

View file

@ -4,6 +4,7 @@ package core;
service CoreApi { service CoreApi {
rpc GetSessions (SessionsRequest) returns (SessionsResponse) {} rpc GetSessions (SessionsRequest) returns (SessionsResponse) {}
rpc GetSession (SessionRequest) returns (SessionResponse) {}
} }
message SessionsRequest { message SessionsRequest {
@ -13,6 +14,41 @@ message SessionsResponse {
repeated Session sessions = 1; repeated Session sessions = 1;
} }
message Session { message SessionRequest {
int32 id = 1; int32 id = 1;
} }
message SessionResponse {
int32 state = 1;
repeated Node nodes = 2;
repeated Link links = 3;
}
message Session {
int32 id = 1;
int32 state = 2;
int32 nodes = 3;
}
message Node {
int32 id = 1;
string name = 2;
int32 type = 3;
string model = 4;
Position position = 5;
repeated string services = 6;
string emane = 7;
}
message Link {
}
message Position {
int32 x = 1;
int32 y = 2;
int32 z = 3;
float lat = 4;
float lon = 5;
float alt = 6;
}

View file

@ -9,6 +9,7 @@ import ConfigParser
import logging import logging
import optparse import optparse
import sys import sys
import threading
import time import time
from core import load_logging_config from core import load_logging_config
@ -16,6 +17,7 @@ from core import constants
from core import enumerations from core import enumerations
from core.corehandlers import CoreHandler from core.corehandlers import CoreHandler
from core.coreserver import CoreServer from core.coreserver import CoreServer
from core.grpc.server import listen
from core.misc.utils import close_onexec from core.misc.utils import close_onexec
load_logging_config() load_logging_config()
@ -52,6 +54,11 @@ def cored(cfg, use_ovs):
logging.exception("error starting main server on: %s:%s", host, port) logging.exception("error starting main server on: %s:%s", host, port)
sys.exit(1) sys.exit(1)
# initialize grpc api
grpc_thread = threading.Thread(target=listen, args=(server.coreemu,))
grpc_thread.daemon = True
grpc_thread.start()
close_onexec(server.fileno()) close_onexec(server.fileno())
logging.info("server started, listening on: %s:%s", host, port) logging.info("server started, listening on: %s:%s", host, port)
server.serve_forever() server.serve_forever()