diff --git a/daemon/core/api/grpc/client.py b/daemon/core/api/grpc/client.py index 6b5343d8..ea32ffb4 100644 --- a/daemon/core/api/grpc/client.py +++ b/daemon/core/api/grpc/client.py @@ -148,6 +148,31 @@ class CoreGrpcClient: self.stub = None self.channel = None + def start_session(self, session_id, nodes, links): + """ + Start a session. + + :param int session_id: id of session + :param list nodes: list of nodes to create + :param list links: list of links to create + :return: + """ + request = core_pb2.StartSessionRequest( + session_id=session_id, nodes=nodes, links=links + ) + return self.stub.StartSession(request) + + def stop_session(self, session_id): + """ + Stop a running session. + + :param int session_id: id of session + :return: stop session response + :rtype: core_pb2.StopSessionResponse + """ + request = core_pb2.StopSessionRequest(session_id=session_id) + return self.stub.StopSession(request) + def create_session(self, session_id=None): """ Create a session. diff --git a/daemon/core/api/grpc/grpcutils.py b/daemon/core/api/grpc/grpcutils.py new file mode 100644 index 00000000..b729e584 --- /dev/null +++ b/daemon/core/api/grpc/grpcutils.py @@ -0,0 +1,55 @@ +import asyncio +import logging +import time + +from core.emulator.emudata import NodeOptions +from core.emulator.enumerations import NodeTypes + + +def add_node_data(node_proto): + _id = node_proto.id + _type = node_proto.type + if _type is None: + _type = NodeTypes.DEFAULT.value + _type = NodeTypes(_type) + + options = NodeOptions(name=node_proto.name, model=node_proto.model) + options.icon = node_proto.icon + options.opaque = node_proto.opaque + options.image = node_proto.image + options.services = node_proto.services + if node_proto.server: + options.server = node_proto.server + + position = node_proto.position + options.set_position(position.x, position.y) + options.set_location(position.lat, position.lon, position.alt) + return _type, _id, options + + +async def async_add_node(session, node_proto): + _type, _id, options = add_node_data(node_proto) + session.add_node(_type=_type, _id=_id, options=options) + + +async def create_nodes(loop, session, node_protos): + tasks = [] + for node_proto in node_protos: + task = loop.create_task(async_add_node(session, node_proto)) + tasks.append(task) + + start = time.monotonic() + results = await asyncio.gather(*tasks, return_exceptions=True) + total = time.monotonic() - start + + logging.info(f"created nodes time: {total}") + return results + + +def sync_create_nodes(session, node_protos): + start = time.monotonic() + for node_proto in node_protos: + _type, _id, options = add_node_data(node_proto) + session.add_node(_type=_type, _id=_id, options=options) + total = time.monotonic() - start + logging.info(f"created nodes time: {total}") diff --git a/daemon/core/api/grpc/server.py b/daemon/core/api/grpc/server.py index 4381184d..ff23b43e 100644 --- a/daemon/core/api/grpc/server.py +++ b/daemon/core/api/grpc/server.py @@ -9,7 +9,7 @@ from queue import Empty, Queue import grpc -from core.api.grpc import core_pb2, core_pb2_grpc +from core.api.grpc import core_pb2, core_pb2_grpc, grpcutils from core.emane.nodes import EmaneNet from core.emulator.data import ( ConfigData, @@ -260,6 +260,53 @@ class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): except CoreError: context.abort(grpc.StatusCode.NOT_FOUND, f"node {node_id} not found") + def StartSession(self, request, context): + """ + Start a session. + + :param core.api.grpc.core_pb2.StartSessionRequest request: start session request + :param context: grcp context + :return: start session response + :rtype: core.api.grpc.core_pb2.StartSessionResponse + """ + logging.debug("start session: %s", request) + session = self.get_session(request.session_id, context) + + # clear previous state and setup for creation + session.clear() + session.set_state(EventTypes.CONFIGURATION_STATE) + if not os.path.exists(session.session_dir): + os.mkdir(session.session_dir) + + # create nodes + # loop = asyncio.new_event_loop() + # asyncio.set_event_loop(loop) + # results = loop.run_until_complete( + # grpcutils.create_nodes(loop, session, request.nodes) + # ) + grpcutils.sync_create_nodes(session, request.nodes) + + # set to instantiation and start + session.set_state(EventTypes.INSTANTIATION_STATE) + session.instantiate() + + return core_pb2.StartSessionResponse(result=True) + + def StopSession(self, request, context): + """ + Stop a running session. + + :param core.api.grpc.core_pb2.StopSessionRequest request: stop session request + :param context: grcp context + :return: stop session response + :rtype: core.api.grpc.core_pb2.StopSessionResponse + """ + logging.debug("stop session: %s", request) + session = self.coreemu.create_session(request.session_id) + session.set_state(EventTypes.DATACOLLECT_STATE) + session.clear() + return core_pb2.StopSessionResponse(result=True) + def CreateSession(self, request, context): """ Create a session diff --git a/daemon/examples/grpc/large.py b/daemon/examples/grpc/large.py new file mode 100644 index 00000000..c2a4d0b6 --- /dev/null +++ b/daemon/examples/grpc/large.py @@ -0,0 +1,74 @@ +import logging + +from core.api.grpc import client, core_pb2 + + +def log_event(event): + logging.info("event: %s", event) + + +def main(): + core = client.CoreGrpcClient() + + with core.context_connect(): + # create session + response = core.create_session() + session_id = response.session_id + logging.info("created session: %s", response) + + # create nodes for session + nodes = [] + position = core_pb2.Position(x=50, y=100) + switch = core_pb2.Node(id=1, type=core_pb2.NodeType.SWITCH, position=position) + nodes.append(switch) + for i in range(2, 50): + position = core_pb2.Position(x=50 + 50 * i, y=50) + node = core_pb2.Node(id=i, position=position, model="PC") + nodes.append(node) + + # start session + links = [] + response = core.start_session(session_id, nodes, links) + logging.info("started session: %s", response) + + # handle events session may broadcast + # core.events(session_id, log_event) + + # change session state + # response = core.set_session_state( + # session_id, core_pb2.SessionState.CONFIGURATION + # ) + # logging.info("set session state: %s", response) + + # create switch node + # switch = core_pb2.Node(type=core_pb2.NodeType.SWITCH) + # response = core.add_node(session_id, switch) + # logging.info("created switch: %s", response) + # switch_id = response.node_id + + # helper to create interfaces + # interface_helper = client.InterfaceHelper(ip4_prefix="10.83.0.0/16") + # + # for i in range(2): + # # create node + # position = core_pb2.Position(x=50 + 50 * i, y=50) + # node = core_pb2.Node(position=position) + # response = core.add_node(session_id, node) + # logging.info("created node: %s", response) + # node_id = response.node_id + # + # # create link + # interface_one = interface_helper.create_interface(node_id, 0) + # response = core.add_link(session_id, node_id, switch_id, interface_one) + # logging.info("created link: %s", response) + + # change session state + # response = core.set_session_state( + # session_id, core_pb2.SessionState.INSTANTIATION + # ) + # logging.info("set session state: %s", response) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() diff --git a/daemon/proto/core/api/grpc/core.proto b/daemon/proto/core/api/grpc/core.proto index 325c436f..2a766fc3 100644 --- a/daemon/proto/core/api/grpc/core.proto +++ b/daemon/proto/core/api/grpc/core.proto @@ -7,6 +7,10 @@ option java_outer_classname = "CoreProto"; service CoreApi { // session rpc + rpc StartSession (StartSessionRequest) returns (StartSessionResponse) { + } + rpc StopSession (StopSessionRequest) returns (StopSessionResponse) { + } rpc CreateSession (CreateSessionRequest) returns (CreateSessionResponse) { } rpc DeleteSession (DeleteSessionRequest) returns (DeleteSessionResponse) { @@ -126,6 +130,24 @@ service CoreApi { } // rpc request/response messages +message StartSessionRequest { + int32 session_id = 1; + repeated Node nodes = 2; + repeated Link links = 3; +} + +message StartSessionResponse { + bool result = 1; +} + +message StopSessionRequest { + int32 session_id = 1; +} + +message StopSessionResponse { + bool result = 1; +} + message CreateSessionRequest { int32 session_id = 1; }