diff --git a/.gitignore b/.gitignore index ba78096f..dbf9e4c3 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,10 @@ configure debian stamp-h1 +# generated protobuf files +daemon/core/grpc/core_pb2.py +daemon/core/grpc/core_pb2_grpc.py + # python build directory dist diff --git a/Makefile.am b/Makefile.am index 7626ced9..42dd4af6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -56,7 +56,7 @@ fpm -s dir -t $1 -n core-gui \ -m "$(PACKAGE_MAINTAINERS)" \ --license "BSD" \ --description "Common Open Research Emulator GUI front-end" \ - --url http://www.nrl.navy.mil/itd/ncs/products/core \ + --url https://github.com/coreemu/core \ --vendor "$(PACKAGE_VENDOR)" \ -p core-gui_VERSION_ARCH.$1 \ -v $(PACKAGE_VERSION) \ diff --git a/README.md b/README.md index 9db7e9ab..c5f7e182 100644 --- a/README.md +++ b/README.md @@ -32,12 +32,6 @@ GitHub integration. This allows for more dynamic conversations and the capability to respond faster. Feel free to join us at the link below. -You can also get help with questions, comments, or trouble, by using -the CORE mailing lists: - -* [core-users](https://pf.itd.nrl.navy.mil/mailman/listinfo/core-users) for general comments and questions -* [core-dev](https://pf.itd.nrl.navy.mil/mailman/listinfo/core-dev) for bugs, compile errors, and other development issues - ## Building CORE See [CORE Installation](http://coreemu.github.io/core/install.html) for detailed build instructions. diff --git a/configure.ac b/configure.ac index 10c1c245..79c5ecc0 100644 --- a/configure.ac +++ b/configure.ac @@ -240,6 +240,7 @@ AC_CONFIG_FILES([Makefile daemon/Makefile daemon/doc/Makefile daemon/doc/conf.py + daemon/proto/Makefile netns/Makefile netns/version.h ns3/Makefile],) diff --git a/daemon/Makefile.am b/daemon/Makefile.am index a5fe9cb9..769098bb 100644 --- a/daemon/Makefile.am +++ b/daemon/Makefile.am @@ -11,9 +11,11 @@ SETUPPY = setup.py SETUPPYFLAGS = -v if WANT_DOCS - SUBDIRS = doc + DOCS = doc endif +SUBDIRS = proto $(DOCS) + SCRIPT_FILES := $(notdir $(wildcard scripts/*)) MAN_FILES := $(notdir $(wildcard ../man/*.1)) diff --git a/daemon/core/broker.py b/daemon/core/broker.py index a1c45bd6..19d9713b 100644 --- a/daemon/core/broker.py +++ b/daemon/core/broker.py @@ -389,7 +389,7 @@ class CoreBroker(object): sid = self.session_id_master if sid is None: # this is the master session - sid = self.session.session_id + sid = self.session.id key = (sid << 16) ^ hash(n1num) ^ (hash(n2num) << 8) return key & 0xFFFFFFFF @@ -697,7 +697,7 @@ class CoreBroker(object): tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.DATA_TYPES.value, (ConfigDataTypes.STRING.value,)) tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.VALUES.value, "%s:%s:%s" % (server.name, server.host, server.port)) - tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.SESSION.value, "%s" % self.session.session_id) + tlvdata += coreapi.CoreConfigTlv.pack(ConfigTlvs.SESSION.value, "%s" % self.session.id) msg = coreapi.CoreConfMessage.pack(0, tlvdata) server.sock.send(msg) @@ -973,7 +973,7 @@ class CoreBroker(object): filename = os.path.join(self.session.session_dir, "servers") master = self.session_id_master if master is None: - master = self.session.session_id + master = self.session.id try: with open(filename, "w") as f: f.write("master=%s\n" % master) diff --git a/daemon/core/constants.py.in b/daemon/core/constants.py.in index f4083518..0cb3750e 100644 --- a/daemon/core/constants.py.in +++ b/daemon/core/constants.py.in @@ -10,9 +10,9 @@ FRR_STATE_DIR = "@CORE_STATE_DIR@/run/frr" def which(command): for path in os.environ["PATH"].split(os.pathsep): - command_path = os.path.join(path, command) - if os.path.isfile(command_path) and os.access(command_path, os.X_OK): - return command_path + command_path = os.path.join(path, command) + if os.path.isfile(command_path) and os.access(command_path, os.X_OK): + return command_path VNODED_BIN = which("vnoded") diff --git a/daemon/core/corehandlers.py b/daemon/core/corehandlers.py index 4a38bd3c..7c4a377c 100644 --- a/daemon/core/corehandlers.py +++ b/daemon/core/corehandlers.py @@ -57,7 +57,6 @@ class CoreHandler(SocketServer.BaseRequestHandler): :param request: request object :param str client_address: client address :param CoreServer server: core server instance - :return: """ self.done = False self.message_handlers = { @@ -140,7 +139,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): self.session.broker.session_clients.remove(self) if not self.session.broker.session_clients and not self.session.is_active(): logging.info("no session clients left and not active, initiating shutdown") - self.coreemu.delete_session(self.session.session_id) + self.coreemu.delete_session(self.session.id) return SocketServer.BaseRequestHandler.finish(self) @@ -160,9 +159,9 @@ class CoreHandler(SocketServer.BaseRequestHandler): num_sessions = 0 with self._sessions_lock: - for session_id, session in self.coreemu.sessions.iteritems(): + for _id, session in self.coreemu.sessions.iteritems(): num_sessions += 1 - id_list.append(str(session_id)) + id_list.append(str(_id)) name = session.name if not name: @@ -320,13 +319,16 @@ class CoreHandler(SocketServer.BaseRequestHandler): :return: nothing """ logging.debug("handling broadcast link: %s", link_data) + per = "" + if link_data.per is not None: + per = str(link_data.per) tlv_data = structutils.pack_values(coreapi.CoreLinkTlv, [ (LinkTlvs.N1_NUMBER, link_data.node1_id), (LinkTlvs.N2_NUMBER, link_data.node2_id), (LinkTlvs.DELAY, link_data.delay), (LinkTlvs.BANDWIDTH, link_data.bandwidth), - (LinkTlvs.PER, link_data.per), + (LinkTlvs.PER, per), (LinkTlvs.DUP, link_data.dup), (LinkTlvs.JITTER, link_data.jitter), (LinkTlvs.MER, link_data.mer), @@ -369,7 +371,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): :return: register message data """ - logging.info("GUI has connected to session %d at %s", self.session.session_id, time.ctime()) + logging.info("GUI has connected to session %d at %s", self.session.id, time.ctime()) tlv_data = "" tlv_data += coreapi.CoreRegisterTlv.pack(RegisterTlvs.EXECUTE_SERVER.value, "core-daemon") @@ -535,7 +537,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): # TODO: add shutdown handler for session self.session = self.coreemu.create_session(port, master=False) # self.session.shutdown_handlers.append(self.session_shutdown) - logging.debug("created new session for client: %s", self.session.session_id) + logging.debug("created new session for client: %s", self.session.id) # TODO: hack to associate this handler with this sessions broker for broadcasting # TODO: broker needs to be pulled out of session to the server/handler level @@ -589,7 +591,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): :return: """ exception_data = ExceptionData( - session=str(self.session.session_id), + session=str(self.session.id), node=node, date=time.ctime(), level=level.value, @@ -847,7 +849,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): try: session.open_xml(file_name, start=True) except: - self.coreemu.delete_session(session.session_id) + self.coreemu.delete_session(session.id) raise else: thread = threading.Thread( @@ -905,7 +907,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): # find the session containing this client and set the session to master for session in self.coreemu.sessions.itervalues(): if self in session.broker.session_clients: - logging.debug("setting session to master: %s", session.session_id) + logging.debug("setting session to master: %s", session.id) session.master = True break @@ -1588,7 +1590,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): logging.warn("session %s not found", session_id) continue - logging.info("request to modify to session: %s", session.session_id) + logging.info("request to modify to session: %s", session.id) if names is not None: session.name = names[index] @@ -1621,7 +1623,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): self.remove_session_handlers() self.session.broker.session_clients.remove(self) if not self.session.broker.session_clients and not self.session.is_active(): - self.coreemu.delete_session(self.session.session_id) + self.coreemu.delete_session(self.session.id) # set session to join self.session = session @@ -1724,7 +1726,7 @@ class CoreHandler(SocketServer.BaseRequestHandler): type=ConfigFlags.UPDATE.value, data_types=data_types, data_values=values, - session=str(self.session.session_id), + session=str(self.session.id), opaque=opaque ) self.session.broadcast_config(config_data) @@ -1758,15 +1760,17 @@ class CoreHandler(SocketServer.BaseRequestHandler): self.session.broadcast_config(config_data) # send session metadata - data_values = "|".join(["%s=%s" % item for item in self.session.metadata.get_configs().iteritems()]) - data_types = tuple(ConfigDataTypes.STRING.value for _ in self.session.metadata.get_configs()) - config_data = ConfigData( - message_type=0, - object=self.session.metadata.name, - type=ConfigFlags.NONE.value, - data_types=data_types, - data_values=data_values - ) - self.session.broadcast_config(config_data) + metadata_configs = self.session.metadata.get_configs() + if metadata_configs: + data_values = "|".join(["%s=%s" % item for item in metadata_configs.iteritems()]) + data_types = tuple(ConfigDataTypes.STRING.value for _ in self.session.metadata.get_configs()) + config_data = ConfigData( + message_type=0, + object=self.session.metadata.name, + type=ConfigFlags.NONE.value, + data_types=data_types, + data_values=data_values + ) + self.session.broadcast_config(config_data) logging.info("informed GUI about %d nodes and %d links", len(nodes_data), len(links_data)) diff --git a/daemon/core/coreobj.py b/daemon/core/coreobj.py index 91492cd2..daf5d9da 100644 --- a/daemon/core/coreobj.py +++ b/daemon/core/coreobj.py @@ -570,7 +570,8 @@ class PyCoreNet(PyCoreObj): delay=netif.getparam("delay"), bandwidth=netif.getparam("bw"), dup=netif.getparam("duplicate"), - jitter=netif.getparam("jitter") + jitter=netif.getparam("jitter"), + per=netif.getparam("loss") ) all_links.append(link_data) @@ -587,7 +588,8 @@ class PyCoreNet(PyCoreObj): delay=netif.getparam("delay"), bandwidth=netif.getparam("bw"), dup=netif.getparam("duplicate"), - jitter=netif.getparam("jitter") + jitter=netif.getparam("jitter"), + per=netif.getparam("loss") ) netif.swapparams('_params_up') diff --git a/daemon/core/emane/bypass.py b/daemon/core/emane/bypass.py index 42340f2b..91a01b37 100644 --- a/daemon/core/emane/bypass.py +++ b/daemon/core/emane/bypass.py @@ -29,6 +29,11 @@ class EmaneBypassModel(emanemodel.EmaneModel): phy_library = "bypassphylayer" phy_config = [] + @classmethod + def load(cls, emane_prefix): + # ignore default logic + pass + # override config groups @classmethod def config_groups(cls): diff --git a/daemon/core/emane/commeffect.py b/daemon/core/emane/commeffect.py index 37fe0f67..62676c16 100644 --- a/daemon/core/emane/commeffect.py +++ b/daemon/core/emane/commeffect.py @@ -37,13 +37,18 @@ class EmaneCommEffectModel(emanemodel.EmaneModel): name = "emane_commeffect" shim_library = "commeffectshim" - shim_xml = "/usr/share/emane/manifest/commeffectshim.xml" + shim_xml = "commeffectshim.xml" shim_defaults = {} - config_shim = emanemanifest.parse(shim_xml, shim_defaults) + config_shim = [] # comm effect does not need the default phy and external configurations - phy_config = () - external_config = () + phy_config = [] + external_config = [] + + @classmethod + def load(cls, emane_prefix): + shim_xml_path = os.path.join(emane_prefix, "share/emane/manifest", cls.shim_xml) + cls.config_shim = emanemanifest.parse(shim_xml_path, cls.shim_defaults) @classmethod def configurations(cls): diff --git a/daemon/core/emane/emanemanager.py b/daemon/core/emane/emanemanager.py index 4255b7de..a7c9b121 100644 --- a/daemon/core/emane/emanemanager.py +++ b/daemon/core/emane/emanemanager.py @@ -51,6 +51,7 @@ EMANE_MODELS = [ EmaneBypassModel, EmaneTdmaModel ] +DEFAULT_EMANE_PREFIX = "/usr" class EmaneManager(ModelManager): @@ -212,6 +213,8 @@ class EmaneManager(ModelManager): """ for emane_model in emane_models: logging.info("loading emane model: %s", emane_model.__name__) + emane_prefix = self.session.options.get_config("emane_prefix", default=DEFAULT_EMANE_PREFIX) + emane_model.load(emane_prefix) self.models[emane_model.name] = emane_model def add_node(self, emane_node): diff --git a/daemon/core/emane/emanemodel.py b/daemon/core/emane/emanemodel.py index 284ef84a..01bf1835 100644 --- a/daemon/core/emane/emanemodel.py +++ b/daemon/core/emane/emanemodel.py @@ -26,13 +26,13 @@ class EmaneModel(WirelessModel): # default phy configuration settings, using the universal model phy_library = None - phy_xml = "/usr/share/emane/manifest/emanephy.xml" + phy_xml = "emanephy.xml" phy_defaults = { "subid": "1", "propagationmodel": "2ray", "noisemode": "none" } - phy_config = emanemanifest.parse(phy_xml, phy_defaults) + phy_config = [] # support for external configurations external_config = [ @@ -43,12 +43,42 @@ class EmaneModel(WirelessModel): config_ignore = set() + @classmethod + def load(cls, emane_prefix): + """ + Called after being loaded within the EmaneManager. Provides configured emane_prefix for + parsing xml files. + + :param str emane_prefix: configured emane prefix path + :return: nothing + """ + manifest_path = "share/emane/manifest" + # load mac configuration + mac_xml_path = os.path.join(emane_prefix, manifest_path, cls.mac_xml) + cls.mac_config = emanemanifest.parse(mac_xml_path, cls.mac_defaults) + + # load phy configuration + phy_xml_path = os.path.join(emane_prefix, manifest_path, cls.phy_xml) + cls.phy_config = emanemanifest.parse(phy_xml_path, cls.phy_defaults) + @classmethod def configurations(cls): + """ + Returns the combination all all configurations (mac, phy, and external). + + :return: all configurations + :rtype: list[Configuration] + """ return cls.mac_config + cls.phy_config + cls.external_config @classmethod def config_groups(cls): + """ + Returns the defined configuration groups. + + :return: list of configuration groups. + :rtype: list[ConfigGroup] + """ mac_len = len(cls.mac_config) phy_len = len(cls.phy_config) + mac_len config_len = len(cls.configurations()) diff --git a/daemon/core/emane/ieee80211abg.py b/daemon/core/emane/ieee80211abg.py index 9b4e6ea6..e99ebe3b 100644 --- a/daemon/core/emane/ieee80211abg.py +++ b/daemon/core/emane/ieee80211abg.py @@ -1,8 +1,8 @@ """ ieee80211abg.py: EMANE IEEE 802.11abg model for CORE """ +import os -from core.emane import emanemanifest from core.emane import emanemodel @@ -12,8 +12,12 @@ class EmaneIeee80211abgModel(emanemodel.EmaneModel): # mac configuration mac_library = "ieee80211abgmaclayer" - mac_xml = "/usr/share/emane/manifest/ieee80211abgmaclayer.xml" - mac_defaults = { - "pcrcurveuri": "/usr/share/emane/xml/models/mac/ieee80211abg/ieee80211pcr.xml", - } - mac_config = emanemanifest.parse(mac_xml, mac_defaults) + mac_xml = "ieee80211abgmaclayer.xml" + + @classmethod + def load(cls, emane_prefix): + cls.mac_defaults["pcrcurveuri"] = os.path.join( + emane_prefix, + "share/emane/xml/models/mac/ieee80211abg/ieee80211pcr.xml" + ) + super(EmaneIeee80211abgModel, cls).load(emane_prefix) diff --git a/daemon/core/emane/rfpipe.py b/daemon/core/emane/rfpipe.py index 3c600ddd..4942d89e 100644 --- a/daemon/core/emane/rfpipe.py +++ b/daemon/core/emane/rfpipe.py @@ -1,8 +1,8 @@ """ rfpipe.py: EMANE RF-PIPE model for CORE """ +import os -from core.emane import emanemanifest from core.emane import emanemodel @@ -12,8 +12,12 @@ class EmaneRfPipeModel(emanemodel.EmaneModel): # mac configuration mac_library = "rfpipemaclayer" - mac_xml = "/usr/share/emane/manifest/rfpipemaclayer.xml" - mac_defaults = { - "pcrcurveuri": "/usr/share/emane/xml/models/mac/rfpipe/rfpipepcr.xml", - } - mac_config = emanemanifest.parse(mac_xml, mac_defaults) + mac_xml = "rfpipemaclayer.xml" + + @classmethod + def load(cls, emane_prefix): + cls.mac_defaults["pcrcurveuri"] = os.path.join( + emane_prefix, + "share/emane/xml/models/mac/rfpipe/rfpipepcr.xml" + ) + super(EmaneRfPipeModel, cls).load(emane_prefix) diff --git a/daemon/core/emane/tdma.py b/daemon/core/emane/tdma.py index 832f5d13..b599638d 100644 --- a/daemon/core/emane/tdma.py +++ b/daemon/core/emane/tdma.py @@ -7,7 +7,6 @@ import os from core import constants from core.conf import Configuration -from core.emane import emanemanifest from core.emane import emanemodel from core.enumerations import ConfigDataTypes from core.misc import utils @@ -19,26 +18,30 @@ class EmaneTdmaModel(emanemodel.EmaneModel): # mac configuration mac_library = "tdmaeventschedulerradiomodel" - mac_xml = "/usr/share/emane/manifest/tdmaeventschedulerradiomodel.xml" - mac_defaults = { - "pcrcurveuri": "/usr/share/emane/xml/models/mac/tdmaeventscheduler/tdmabasemodelpcr.xml", - } - mac_config = emanemanifest.parse(mac_xml, mac_defaults) + mac_xml = "tdmaeventschedulerradiomodel.xml" # add custom schedule options and ignore it when writing emane xml schedule_name = "schedule" default_schedule = os.path.join(constants.CORE_DATA_DIR, "examples", "tdma", "schedule.xml") - mac_config.insert( - 0, - Configuration( - _id=schedule_name, - _type=ConfigDataTypes.STRING, - default=default_schedule, - label="TDMA schedule file (core)" - ) - ) config_ignore = {schedule_name} + @classmethod + def load(cls, emane_prefix): + cls.mac_defaults["pcrcurveuri"] = os.path.join( + emane_prefix, + "share/emane/xml/models/mac/tdmaeventscheduler/tdmabasemodelpcr.xml" + ) + super(EmaneTdmaModel, cls).load(emane_prefix) + cls.mac_config.insert( + 0, + Configuration( + _id=cls.schedule_name, + _type=ConfigDataTypes.STRING, + default=cls.default_schedule, + label="TDMA schedule file (core)" + ) + ) + def post_startup(self): """ Logic to execute after the emane manager is finished with startup. diff --git a/daemon/core/emulator/coreemu.py b/daemon/core/emulator/coreemu.py index d0b562cb..9f7e128a 100644 --- a/daemon/core/emulator/coreemu.py +++ b/daemon/core/emulator/coreemu.py @@ -118,8 +118,8 @@ class IdGen(object): class EmuSession(Session): - def __init__(self, session_id, config=None, mkdir=True): - super(EmuSession, self).__init__(session_id, config, mkdir) + def __init__(self, _id, config=None, mkdir=True): + super(EmuSession, self).__init__(_id, config, mkdir) # object management self.node_id_gen = IdGen() @@ -620,7 +620,7 @@ class EmuSession(Session): :return: nothing """ - logging.info("session(%s) shutting down", self.session_id) + logging.info("session(%s) shutting down", self.id) self.set_state(EventTypes.DATACOLLECT_STATE, send_event=True) self.set_state(EventTypes.SHUTDOWN_STATE, send_event=True) super(EmuSession, self).shutdown() @@ -647,7 +647,7 @@ class EmuSession(Session): :return: True if active, False otherwise """ result = self.state in {EventTypes.RUNTIME_STATE.value, EventTypes.DATACOLLECT_STATE.value} - logging.info("session(%s) checking if active: %s", self.session_id, result) + logging.info("session(%s) checking if active: %s", self.id, result) return result def open_xml(self, file_name, start=False): diff --git a/daemon/core/emulator/emudata.py b/daemon/core/emulator/emudata.py index e224209d..2d70d367 100644 --- a/daemon/core/emulator/emudata.py +++ b/daemon/core/emulator/emudata.py @@ -183,13 +183,13 @@ class InterfaceData(object): """ Creates an InterfaceData object. - :param int _id: - :param str name: - :param str mac: - :param str ip4: - :param int ip4_mask: - :param str ip6: - :param int ip6_mask: + :param int _id: interface id + :param str name: name for interface + :param core.misc.ipaddress.MacAddress mac: mac address + :param str ip4: ipv4 address + :param int ip4_mask: ipv4 bit mask + :param str ip6: ipv6 address + :param int ip6_mask: ipv6 bit mask """ self.id = _id self.name = name diff --git a/daemon/core/grpc/__init__.py b/daemon/core/grpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/daemon/core/grpc/client.py b/daemon/core/grpc/client.py new file mode 100644 index 00000000..a6863e37 --- /dev/null +++ b/daemon/core/grpc/client.py @@ -0,0 +1,831 @@ +""" +gRpc client for interfacing with CORE, when gRPC mode is enabled. +""" + +from __future__ import print_function + +import logging +import threading +from contextlib import contextmanager + +import grpc + +from core.grpc import core_pb2 +from core.grpc import core_pb2_grpc +from core.misc.ipaddress import Ipv4Prefix, Ipv6Prefix, MacAddress + + +class InterfaceHelper(object): + """ + Convenience class to help generate IP4 and IP6 addresses for gRPC clients. + """ + + def __init__(self, ip4_prefix=None, ip6_prefix=None): + """ + Creates an InterfaceHelper object. + + :param str ip4_prefix: ip4 prefix to use for generation + :param str ip6_prefix: ip6 prefix to use for generation + :raises ValueError: when both ip4 and ip6 prefixes have not been provided + """ + if not ip4_prefix and not ip6_prefix: + raise ValueError("ip4 or ip6 must be provided") + + self.ip4 = None + if ip4_prefix: + self.ip4 = Ipv4Prefix(ip4_prefix) + self.ip6 = None + if ip6_prefix: + self.ip6 = Ipv6Prefix(ip6_prefix) + + def ip4_address(self, node_id): + """ + Convenience method to return the IP4 address for a node. + + :param int node_id: node id to get IP4 address for + :return: IP4 address or None + :rtype: str + """ + if not self.ip4: + raise ValueError("ip4 prefixes have not been set") + return str(self.ip4.addr(node_id)) + + def ip6_address(self, node_id): + """ + Convenience method to return the IP6 address for a node. + + :param int node_id: node id to get IP6 address for + :return: IP4 address or None + :rtype: str + """ + if not self.ip6: + raise ValueError("ip6 prefixes have not been set") + return str(self.ip6.addr(node_id)) + + def create_interface(self, node_id, interface_id, name=None, mac=None): + """ + Creates interface data for linking nodes, using the nodes unique id for generation, along with a random + mac address, unless provided. + + :param int node_id: node id to create interface for + :param int interface_id: interface id for interface + :param str name: name to set for interface, default is eth{id} + :param str mac: mac address to use for this interface, default is random generation + :return: new interface data for the provided node + :rtype: core_pb2.Interface + """ + # generate ip4 data + ip4 = None + ip4_mask = None + if self.ip4: + ip4 = str(self.ip4.addr(node_id)) + ip4_mask = self.ip4.prefixlen + + # generate ip6 data + ip6 = None + ip6_mask = None + if self.ip6: + ip6 = str(self.ip6.addr(node_id)) + ip6_mask = self.ip6.prefixlen + + # random mac + if not mac: + mac = MacAddress.random() + + return core_pb2.Interface( + id=interface_id, + name=name, + ip4=ip4, + ip4mask=ip4_mask, + ip6=ip6, + ip6mask=ip6_mask, + mac=str(mac) + ) + + +def stream_listener(stream, handler): + """ + Listen for stream events and provide them to the handler. + + :param stream: grpc stream that will provide events + :param handler: function that handles an event + :return: nothing + """ + try: + for event in stream: + handler(event) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.CANCELLED: + logging.debug("stream closed") + else: + logging.exception("stream error") + + +def start_streamer(stream, handler): + """ + Convenience method for starting a grpc stream thread for handling streamed events. + + :param stream: grpc stream that will provide events + :param handler: function that handles an event + :return: nothing + """ + thread = threading.Thread(target=stream_listener, args=(stream, handler)) + thread.daemon = True + thread.start() + + +class CoreGrpcClient(object): + """ + Provides convenience methods for interfacing with the CORE grpc server. + """ + + def __init__(self, address="localhost:50051"): + """ + Creates a CoreGrpcClient instance. + + :param str address: grpc server address to connect to + """ + self.address = address + self.stub = None + self.channel = None + + def create_session(self, _id=None): + """ + Create a session. + + :param int _id: id for session, default is None and one will be created for you + :return: response with created session id + :rtype: core_pb2.CreateSessionResponse + """ + request = core_pb2.CreateSessionRequest(id=_id) + return self.stub.CreateSession(request) + + def delete_session(self, _id): + """ + Delete a session. + + :param int _id: id of session + :return: response with result of deletion success or failure + :rtype: core_pb2.DeleteSessionResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.DeleteSessionRequest(id=_id) + return self.stub.DeleteSession(request) + + def get_sessions(self): + """ + Retrieves all currently known sessions. + + :return: response with a list of currently known session, their state and number of nodes + :rtype: core_pb2.GetSessionsResponse + """ + return self.stub.GetSessions(core_pb2.GetSessionsRequest()) + + def get_session(self, _id): + """ + Retrieve a session. + + :param int _id: id of session + :return: response with sessions state, nodes, and links + :rtype: core_pb2.GetSessionResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetSessionRequest(id=_id) + return self.stub.GetSession(request) + + def get_session_options(self, _id): + """ + Retrieve session options. + + :param int _id: id of session + :return: response with a list of configuration groups + :rtype: core_pb2.GetSessionOptionsResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetSessionOptionsRequest(id=_id) + return self.stub.GetSessionOptions(request) + + def set_session_options(self, _id, config): + """ + Set options for a session. + + :param int _id: id of session + :param dict[str, str] config: configuration values to set + :return: response with result of success or failure + :rtype: core_pb2.SetSessionOptionsResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.SetSessionOptionsRequest(id=_id, config=config) + return self.stub.SetSessionOptions(request) + + def get_session_location(self, _id): + """ + Get session location. + + :param int _id: id of session + :return: response with session position reference and scale + :rtype: core_pb2.GetSessionLocationResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetSessionLocationRequest(id=_id) + return self.stub.GetSessionLocation(request) + + def set_session_location(self, _id, x=None, y=None, z=None, lat=None, lon=None, alt=None, scale=None): + """ + Set session location. + + :param int _id: id of session + :param float x: x position + :param float y: y position + :param float z: z position + :param float lat: latitude position + :param float lon: longitude position + :param float alt: altitude position + :param float scale: geo scale + :return: response with result of success or failure + :rtype: core_pb2.SetSessionLocationResponse + :raises grpc.RpcError: when session doesn't exist + """ + position = core_pb2.Position(x=x, y=y, z=z, lat=lat, lon=lon, alt=alt) + request = core_pb2.SetSessionLocationRequest(id=_id, position=position, scale=scale) + return self.stub.SetSessionLocation(request) + + def set_session_state(self, _id, state): + """ + Set session state. + + :param int _id: id of session + :param core_pb2.SessionState state: session state to transition to + :return: response with result of success or failure + :rtype: core_pb2.SetSessionStateResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.SetSessionStateRequest(id=_id, state=state) + return self.stub.SetSessionState(request) + + def node_events(self, _id, handler): + """ + Listen for session node events. + + :param int _id: id of session + :param handler: handler for every event + :return: nothing + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.NodeEventsRequest(id=_id) + stream = self.stub.NodeEvents(request) + start_streamer(stream, handler) + + def link_events(self, _id, handler): + """ + Listen for session link events. + + :param int _id: id of session + :param handler: handler for every event + :return: nothing + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.LinkEventsRequest(id=_id) + stream = self.stub.LinkEvents(request) + start_streamer(stream, handler) + + def session_events(self, _id, handler): + """ + Listen for session events. + + :param int _id: id of session + :param handler: handler for every event + :return: nothing + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.SessionEventsRequest(id=_id) + stream = self.stub.SessionEvents(request) + start_streamer(stream, handler) + + def config_events(self, _id, handler): + """ + Listen for session config events. + + :param int _id: id of session + :param handler: handler for every event + :return: nothing + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.ConfigEventsRequest(id=_id) + stream = self.stub.ConfigEvents(request) + start_streamer(stream, handler) + + def exception_events(self, _id, handler): + """ + Listen for session exception events. + + :param int _id: id of session + :param handler: handler for every event + :return: nothing + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.ExceptionEventsRequest(id=_id) + stream = self.stub.ExceptionEvents(request) + start_streamer(stream, handler) + + def file_events(self, _id, handler): + """ + Listen for session file events. + + :param int _id: id of session + :param handler: handler for every event + :return: nothing + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.FileEventsRequest(id=_id) + stream = self.stub.FileEvents(request) + start_streamer(stream, handler) + + def add_node(self, session, node): + """ + Add node to session. + + :param int session: session id + :param core_pb2.Node node: node to add + :return: response with node id + :rtype: core_pb2.AddNodeResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.AddNodeRequest(session=session, node=node) + return self.stub.AddNode(request) + + def get_node(self, session, _id): + """ + Get node details. + + :param int session: session id + :param int _id: node id + :return: response with node details + :rtype: core_pb2.GetNodeResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.GetNodeRequest(session=session, id=_id) + return self.stub.GetNode(request) + + def edit_node(self, session, _id, position): + """ + Edit a node, currently only changes position. + + :param int session: session id + :param int _id: node id + :param core_pb2.Position position: position to set node to + :return: response with result of success or failure + :rtype: core_pb2.EditNodeResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.EditNodeRequest(session=session, id=_id, position=position) + return self.stub.EditNode(request) + + def delete_node(self, session, _id): + """ + Delete node from session. + + :param int session: session id + :param int _id: node id + :return: response with result of success or failure + :rtype: core_pb2.DeleteNodeResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.DeleteNodeRequest(session=session, id=_id) + return self.stub.DeleteNode(request) + + def get_node_links(self, session, _id): + """ + Get current links for a node. + + :param int session: session id + :param int _id: node id + :return: response with a list of links + :rtype: core_pb2.GetNodeLinksResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.GetNodeLinksRequest(session=session, id=_id) + return self.stub.GetNodeLinks(request) + + def add_link(self, session, node_one, node_two, interface_one=None, interface_two=None, options=None): + """ + Add a link between nodes. + + :param int session: session id + :param int node_one: node one id + :param int node_two: node two id + :param core_pb2.Interface interface_one: node one interface data + :param core_pb2.Interface interface_two: node two interface data + :param core_pb2.LinkOptions options: options for link (jitter, bandwidth, etc) + :return: response with result of success or failure + :rtype: core_pb2.AddLinkResponse + :raises grpc.RpcError: when session or one of the nodes don't exist + """ + link = core_pb2.Link( + node_one=node_one, node_two=node_two, type=core_pb2.LINK_WIRED, + interface_one=interface_one, interface_two=interface_two, options=options) + request = core_pb2.AddLinkRequest(session=session, link=link) + return self.stub.AddLink(request) + + def edit_link(self, session, node_one, node_two, options, interface_one=None, interface_two=None): + """ + Edit a link between nodes. + + :param int session: session id + :param int node_one: node one id + :param int node_two: node two id + :param core_pb2.LinkOptions options: options for link (jitter, bandwidth, etc) + :param int interface_one: node one interface id + :param int interface_two: node two interface id + :return: response with result of success or failure + :rtype: core_pb2.EditLinkResponse + :raises grpc.RpcError: when session or one of the nodes don't exist + """ + request = core_pb2.EditLinkRequest( + session=session, node_one=node_one, node_two=node_two, options=options, + interface_one=interface_one, interface_two=interface_two) + return self.stub.EditLink(request) + + def delete_link(self, session, node_one, node_two, interface_one=None, interface_two=None): + """ + Delete a link between nodes. + + :param int session: session id + :param int node_one: node one id + :param int node_two: node two id + :param int interface_one: node one interface id + :param int interface_two: node two interface id + :return: response with result of success or failure + :rtype: core_pb2.DeleteLinkResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.DeleteLinkRequest( + session=session, node_one=node_one, node_two=node_two, + interface_one=interface_one, interface_two=interface_two) + return self.stub.DeleteLink(request) + + def get_hooks(self, session): + """ + Get all hook scripts. + + :param int session: session id + :return: response with a list of hooks + :rtype: core_pb2.GetHooksResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetHooksRequest(session=session) + return self.stub.GetHooks(request) + + def add_hook(self, session, state, file_name, file_data): + """ + Add hook scripts. + + :param int session: session id + :param core_pb2.SessionState state: state to trigger hook + :param str file_name: name of file for hook script + :param bytes file_data: hook script contents + :return: response with result of success or failure + :rtype: core_pb2.AddHookResponse + :raises grpc.RpcError: when session doesn't exist + """ + hook = core_pb2.Hook(state=state, file=file_name, data=file_data) + request = core_pb2.AddHookRequest(session=session, hook=hook) + return self.stub.AddHook(request) + + def get_mobility_configs(self, session): + """ + Get all mobility configurations. + + :param int session: session id + :return: response with a dict of node ids to mobility configurations + :rtype: core_pb2.GetMobilityConfigsResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetMobilityConfigsRequest(session=session) + return self.stub.GetMobilityConfigs(request) + + def get_mobility_config(self, session, _id): + """ + Get mobility configuration for a node. + + :param int session: session id + :param int _id: node id + :return: response with a list of configuration groups + :rtype: core_pb2.GetMobilityConfigResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.GetMobilityConfigRequest(session=session, id=_id) + return self.stub.GetMobilityConfig(request) + + def set_mobility_config(self, session, _id, config): + """ + Set mobility configuration for a node. + + :param int session: session id + :param int _id: node id + :param dict[str, str] config: mobility configuration + :return: response with result of success or failure + :rtype: core_pb2.SetMobilityConfigResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.SetMobilityConfigRequest(session=session, id=_id, config=config) + return self.stub.SetMobilityConfig(request) + + def mobility_action(self, session, _id, action): + """ + Send a mobility action for a node. + + :param int session: session id + :param int _id: node id + :param core_pb2.ServiceAction action: action to take + :return: response with result of success or failure + :rtype: core_pb2.MobilityActionResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.MobilityActionRequest(session=session, id=_id, action=action) + return self.stub.MobilityAction(request) + + def get_services(self): + """ + Get all currently loaded services. + + :return: response with a list of services + :rtype: core_pb2.GetServicesResponse + """ + request = core_pb2.GetServicesRequest() + return self.stub.GetServices(request) + + def get_service_defaults(self, session): + """ + Get default services for different default node models. + + :param int session: session id + :return: response with a dict of node model to a list of services + :rtype: core_pb2.GetServiceDefaultsResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetServiceDefaultsRequest(session=session) + return self.stub.GetServiceDefaults(request) + + def set_service_defaults(self, session, service_defaults): + """ + Set default services for node models. + + :param int session: session id + :param dict service_defaults: node models to lists of services + :return: response with result of success or failure + :rtype: core_pb2.SetServiceDefaultsResponse + :raises grpc.RpcError: when session doesn't exist + """ + defaults = [] + for node_type in service_defaults: + services = service_defaults[node_type] + default = core_pb2.ServiceDefaults(node_type=node_type, services=services) + defaults.append(default) + request = core_pb2.SetServiceDefaultsRequest(session=session, defaults=defaults) + return self.stub.SetServiceDefaults(request) + + def get_node_service(self, session, _id, service): + """ + Get service data for a node. + + :param int session: session id + :param int _id: node id + :param str service: service name + :return: response with node service data + :rtype: core_pb2.GetNodeServiceResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.GetNodeServiceRequest(session=session, id=_id, service=service) + return self.stub.GetNodeService(request) + + def get_node_service_file(self, session, _id, service, file_name): + """ + Get a service file for a node. + + :param int session: session id + :param int _id: node id + :param str service: service name + :param str file_name: file name to get data for + :return: response with file data + :rtype: core_pb2.GetNodeServiceFileResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.GetNodeServiceFileRequest(session=session, id=_id, service=service, file=file_name) + return self.stub.GetNodeServiceFile(request) + + def set_node_service(self, session, _id, service, startup, validate, shutdown): + """ + Set service data for a node. + + :param int session: session id + :param int _id: node id + :param str service: service name + :param list startup: startup commands + :param list validate: validation commands + :param list shutdown: shutdown commands + :return: response with result of success or failure + :rtype: core_pb2.SetNodeServiceResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.SetNodeServiceRequest( + session=session, id=_id, service=service, startup=startup, validate=validate, shutdown=shutdown) + return self.stub.SetNodeService(request) + + def set_node_service_file(self, session, _id, service, file_name, data): + """ + Set a service file for a node. + + :param int session: session id + :param int _id: node id + :param str service: service name + :param str file_name: file name to save + :param bytes data: data to save for file + :return: response with result of success or failure + :rtype: core_pb2.SetNodeServiceFileResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.SetNodeServiceFileRequest( + session=session, id=_id, service=service, file=file_name, data=data) + return self.stub.SetNodeServiceFile(request) + + def service_action(self, session, _id, service, action): + """ + Send an action to a service for a node. + + :param int session: session id + :param int _id: node id + :param str service: service name + :param core_pb2.ServiceAction action: action for service (start, stop, restart, validate) + :return: response with result of success or failure + :rtype: core_pb2.ServiceActionResponse + :raises grpc.RpcError: when session or node doesn't exist + """ + request = core_pb2.ServiceActionRequest(session=session, id=_id, service=service, action=action) + return self.stub.ServiceAction(request) + + def get_wlan_config(self, session, _id): + """ + Get wlan configuration for a node. + + :param int session: session id + :param int _id: node id + :return: response with a list of configuration groups + :rtype: core_pb2.GetWlanConfigResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetWlanConfigRequest(session=session, id=_id) + return self.stub.GetWlanConfig(request) + + def set_wlan_config(self, session, _id, config): + """ + Set wlan configuration for a node. + + :param int session: session id + :param int _id: node id + :param dict[str, str] config: wlan configuration + :return: response with result of success or failure + :rtype: core_pb2.SetWlanConfigResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.SetWlanConfigRequest(session=session, id=_id, config=config) + return self.stub.SetWlanConfig(request) + + def get_emane_config(self, session): + """ + Get session emane configuration. + + :param int session: session id + :return: response with a list of configuration groups + :rtype: core_pb2.GetEmaneConfigResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetEmaneConfigRequest(session=session) + return self.stub.GetEmaneConfig(request) + + def set_emane_config(self, session, config): + """ + Set session emane configuration. + + :param int session: session id + :param dict[str, str] config: emane configuration + :return: response with result of success or failure + :rtype: core_pb2.SetEmaneConfigResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.SetEmaneConfigRequest(session=session, config=config) + return self.stub.SetEmaneConfig(request) + + def get_emane_models(self, session): + """ + Get session emane models. + + :param int session: session id + :return: response with a list of emane models + :rtype: core_pb2.GetEmaneModelsResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetEmaneModelsRequest(session=session) + return self.stub.GetEmaneModels(request) + + def get_emane_model_config(self, session, _id, model, interface_id=-1): + """ + Get emane model configuration for a node or a node's interface. + + :param int session: session id + :param int _id: node id + :param str model: emane model name + :param int interface_id: node interface id + :return: response with a list of configuration groups + :rtype: core_pb2.GetEmaneModelConfigResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetEmaneModelConfigRequest(session=session, id=_id, model=model, interface=interface_id) + return self.stub.GetEmaneModelConfig(request) + + def set_emane_model_config(self, session, _id, model, config, interface_id=-1): + """ + Set emane model configuration for a node or a node's interface. + + :param int session: session id + :param int _id: node id + :param str model: emane model name + :param dict[str, str] config: emane model configuration + :param int interface_id: node interface id + :return: response with result of success or failure + :rtype: core_pb2.SetEmaneModelConfigResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.SetEmaneModelConfigRequest( + session=session, id=_id, model=model, config=config, interface=interface_id) + return self.stub.SetEmaneModelConfig(request) + + def get_emane_model_configs(self, session): + """ + Get all emane model configurations for a session. + + :param int session: session id + :return: response with a dictionary of node/interface ids to configurations + :rtype: core_pb2.GetEmaneModelConfigsResponse + :raises grpc.RpcError: when session doesn't exist + """ + request = core_pb2.GetEmaneModelConfigsRequest(session=session) + return self.stub.GetEmaneModelConfigs(request) + + def save_xml(self, session, file_path): + """ + Save the current scenario to an XML file. + + :param int session: session id + :param str file_path: local path to save scenario XML file to + :return: nothing + """ + request = core_pb2.SaveXmlRequest(session=session) + response = self.stub.SaveXml(request) + with open(file_path, "wb") as xml_file: + xml_file.write(response.data) + + def open_xml(self, file_path): + """ + Load a local scenario XML file to open as a new session. + + :param str file_path: path of scenario XML file + :return: response with opened session id + :rtype: core_pb2.OpenXmlResponse + """ + with open(file_path, "rb") as xml_file: + data = xml_file.read() + request = core_pb2.OpenXmlRequest(data=data) + return self.stub.OpenXml(request) + + def connect(self): + """ + Open connection to server, must be closed manually. + + :return: nothing + """ + self.channel = grpc.insecure_channel(self.address) + self.stub = core_pb2_grpc.CoreApiStub(self.channel) + + def close(self): + """ + Close currently opened server channel connection. + + :return: nothing + """ + if self.channel: + self.channel.close() + self.channel = None + + @contextmanager + def context_connect(self): + """ + Makes a context manager based connection to the server, will close after context ends. + + :return: nothing + """ + try: + self.connect() + yield + finally: + self.close() diff --git a/daemon/core/grpc/server.py b/daemon/core/grpc/server.py new file mode 100644 index 00000000..45c35fc8 --- /dev/null +++ b/daemon/core/grpc/server.py @@ -0,0 +1,888 @@ +import atexit +import logging +import os +import tempfile +import time +from Queue import Queue, Empty + +import grpc +from concurrent import futures + +from core.emulator.emudata import NodeOptions, InterfaceData, LinkOptions +from core.enumerations import NodeTypes, EventTypes, LinkTypes +from core.grpc import core_pb2 +from core.grpc import core_pb2_grpc +from core.misc import nodeutils +from core.misc.ipaddress import MacAddress +from core.mobility import BasicRangeModel, Ns2ScriptedMobility +from core.service import ServiceManager + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + + +def convert_value(value): + if value is not None: + value = str(value) + return value + + +def get_config_groups(config, configurable_options): + groups = [] + config_options = [] + + for configuration in configurable_options.configurations(): + value = config[configuration.id] + config_option = core_pb2.ConfigOption() + config_option.label = configuration.label + config_option.name = configuration.id + config_option.value = value + config_option.type = configuration.type.value + config_option.select.extend(configuration.options) + config_options.append(config_option) + + for config_group in configurable_options.config_groups(): + start = config_group.start - 1 + stop = config_group.stop + options = config_options[start: stop] + config_group_proto = core_pb2.ConfigGroup(name=config_group.name, options=options) + groups.append(config_group_proto) + + return groups + + +def get_links(session, node): + links = [] + for link_data in node.all_link_data(0): + link = convert_link(session, link_data) + links.append(link) + return links + + +def get_emane_model_id(_id, interface): + if interface >= 0: + return _id * 1000 + interface + else: + return _id + + +def convert_link(session, link_data): + interface_one = None + if link_data.interface1_id is not None: + node = session.get_object(link_data.node1_id) + interface = node.netif(link_data.interface1_id) + interface_one = core_pb2.Interface( + id=link_data.interface1_id, name=interface.name, mac=convert_value(link_data.interface1_mac), + ip4=convert_value(link_data.interface1_ip4), ip4mask=link_data.interface1_ip4_mask, + ip6=convert_value(link_data.interface1_ip6), ip6mask=link_data.interface1_ip6_mask) + + interface_two = None + if link_data.interface2_id is not None: + node = session.get_object(link_data.node2_id) + interface = node.netif(link_data.interface2_id) + interface_two = core_pb2.Interface( + id=link_data.interface2_id, name=interface.name, mac=convert_value(link_data.interface2_mac), + ip4=convert_value(link_data.interface2_ip4), ip4mask=link_data.interface2_ip4_mask, + ip6=convert_value(link_data.interface2_ip6), ip6mask=link_data.interface2_ip6_mask) + + options = core_pb2.LinkOptions( + opaque=link_data.opaque, + jitter=link_data.jitter, + key=link_data.key, + mburst=link_data.mburst, + mer=link_data.mer, + per=link_data.per, + bandwidth=link_data.bandwidth, + burst=link_data.burst, + delay=link_data.delay, + dup=link_data.dup, + unidirectional=link_data.unidirectional + ) + + return core_pb2.Link( + type=link_data.link_type, node_one=link_data.node1_id, node_two=link_data.node2_id, + interface_one=interface_one, interface_two=interface_two, options=options + ) + + +class CoreGrpcServer(core_pb2_grpc.CoreApiServicer): + def __init__(self, coreemu): + super(CoreGrpcServer, self).__init__() + self.coreemu = coreemu + self.running = True + self.server = None + atexit.register(self._exit_handler) + + def _exit_handler(self): + logging.debug("catching exit, stop running") + self.running = False + + def _is_running(self, context): + return self.running and context.is_active() + + def _cancel_stream(self, context): + context.abort(grpc.StatusCode.CANCELLED, "server stopping") + + def listen(self, address="[::]:50051"): + logging.info("starting grpc api: %s", address) + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + core_pb2_grpc.add_CoreApiServicer_to_server(self, self.server) + self.server.add_insecure_port(address) + self.server.start() + + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + self.server.stop(None) + + def get_session(self, _id, context): + session = self.coreemu.sessions.get(_id) + if not session: + context.abort(grpc.StatusCode.NOT_FOUND, "session {} not found".format(_id)) + return session + + def get_node(self, session, _id, context): + try: + return session.get_object(_id) + except KeyError: + context.abort(grpc.StatusCode.NOT_FOUND, "node {} not found".format(_id)) + + def CreateSession(self, request, context): + logging.debug("create session: %s", request) + session = self.coreemu.create_session(request.id) + session.set_state(EventTypes.DEFINITION_STATE) + session.location.setrefgeo(47.57917, -122.13232, 2.0) + session.location.refscale = 150000.0 + return core_pb2.CreateSessionResponse(id=session.id, state=session.state) + + def DeleteSession(self, request, context): + logging.debug("delete session: %s", request) + result = self.coreemu.delete_session(request.id) + return core_pb2.DeleteSessionResponse(result=result) + + def GetSessions(self, request, context): + logging.debug("get sessions: %s", request) + sessions = [] + for session_id in self.coreemu.sessions: + session = self.coreemu.sessions[session_id] + session_summary = core_pb2.SessionSummary( + id=session_id, state=session.state, nodes=session.get_node_count()) + sessions.append(session_summary) + return core_pb2.GetSessionsResponse(sessions=sessions) + + def GetSessionLocation(self, request, context): + logging.debug("get session location: %s", request) + session = self.get_session(request.id, context) + x, y, z = session.location.refxyz + lat, lon, alt = session.location.refgeo + position = core_pb2.Position(x=x, y=y, z=z, lat=lat, lon=lon, alt=alt) + return core_pb2.GetSessionLocationResponse(position=position, scale=session.location.refscale) + + def SetSessionLocation(self, request, context): + logging.debug("set session location: %s", request) + session = self.get_session(request.id, context) + session.location.refxyz = (request.position.x, request.position.y, request.position.z) + session.location.setrefgeo(request.position.lat, request.position.lon, request.position.alt) + session.location.refscale = request.scale + return core_pb2.SetSessionLocationResponse(result=True) + + def SetSessionState(self, request, context): + logging.debug("set session state: %s", request) + session = self.get_session(request.id, context) + + try: + state = EventTypes(request.state) + session.set_state(state) + + if state == EventTypes.INSTANTIATION_STATE: + if not os.path.exists(session.session_dir): + os.mkdir(session.session_dir) + session.instantiate() + elif state == EventTypes.SHUTDOWN_STATE: + session.shutdown() + elif state == EventTypes.DATACOLLECT_STATE: + session.data_collect() + elif state == EventTypes.DEFINITION_STATE: + session.clear() + + result = True + except KeyError: + result = False + + return core_pb2.SetSessionStateResponse(result=result) + + def GetSessionOptions(self, request, context): + logging.debug("get session options: %s", request) + session = self.get_session(request.id, context) + config = session.options.get_configs() + defaults = session.options.default_values() + defaults.update(config) + groups = get_config_groups(defaults, session.options) + return core_pb2.GetSessionOptionsResponse(groups=groups) + + def SetSessionOptions(self, request, context): + logging.debug("set session options: %s", request) + session = self.get_session(request.id, context) + config = session.options.get_configs() + config.update(request.config) + return core_pb2.SetSessionOptionsResponse(result=True) + + def GetSession(self, request, context): + logging.debug("get session: %s", request) + session = self.get_session(request.id, context) + + links = [] + nodes = [] + for node_id in session.objects: + node = session.objects[node_id] + if not isinstance(node.objid, int): + continue + + node_type = nodeutils.get_node_type(node.__class__).value + model = getattr(node, "type", None) + position = core_pb2.Position(x=node.position.x, y=node.position.y, z=node.position.z) + + services = getattr(node, "services", []) + if services is None: + services = [] + services = [x.name for x in services] + + emane_model = None + if nodeutils.is_node(node, NodeTypes.EMANE): + emane_model = node.model.name + + node_proto = core_pb2.Node( + id=node.objid, name=node.name, emane=emane_model, model=model, + type=node_type, position=position, services=services) + nodes.append(node_proto) + + node_links = get_links(session, node) + links.extend(node_links) + + session_proto = core_pb2.Session(state=session.state, nodes=nodes, links=links) + return core_pb2.GetSessionResponse(session=session_proto) + + def NodeEvents(self, request, context): + session = self.get_session(request.id, context) + queue = Queue() + session.node_handlers.append(queue.put) + + while self._is_running(context): + try: + node = queue.get(timeout=1) + position = core_pb2.Position(x=node.x_position, y=node.y_position) + services = node.services or "" + services = services.split("|") + node_proto = core_pb2.Node( + id=node.id, name=node.name, model=node.model, position=position, services=services) + node_event = core_pb2.NodeEvent(node=node_proto) + yield node_event + except Empty: + continue + + self._cancel_stream(context) + + def LinkEvents(self, request, context): + session = self.get_session(request.id, context) + queue = Queue() + session.link_handlers.append(queue.put) + + while self._is_running(context): + try: + event = queue.get(timeout=1) + interface_one = None + if event.interface1_id is not None: + interface_one = core_pb2.Interface( + id=event.interface1_id, name=event.interface1_name, mac=convert_value(event.interface1_mac), + ip4=convert_value(event.interface1_ip4), ip4mask=event.interface1_ip4_mask, + ip6=convert_value(event.interface1_ip6), ip6mask=event.interface1_ip6_mask) + + interface_two = None + if event.interface2_id is not None: + interface_two = core_pb2.Interface( + id=event.interface2_id, name=event.interface2_name, mac=convert_value(event.interface2_mac), + ip4=convert_value(event.interface2_ip4), ip4mask=event.interface2_ip4_mask, + ip6=convert_value(event.interface2_ip6), ip6mask=event.interface2_ip6_mask) + + options = core_pb2.LinkOptions( + opaque=event.opaque, + jitter=event.jitter, + key=event.key, + mburst=event.mburst, + mer=event.mer, + per=event.per, + bandwidth=event.bandwidth, + burst=event.burst, + delay=event.delay, + dup=event.dup, + unidirectional=event.unidirectional + ) + link = core_pb2.Link( + type=event.link_type, node_one=event.node1_id, node_two=event.node2_id, + interface_one=interface_one, interface_two=interface_two, options=options) + link_event = core_pb2.LinkEvent(message_type=event.message_type, link=link) + yield link_event + except Empty: + continue + + self._cancel_stream(context) + + def SessionEvents(self, request, context): + session = self.get_session(request.id, context) + queue = Queue() + session.event_handlers.append(queue.put) + + while self._is_running(context): + try: + event = queue.get(timeout=1) + event_time = event.time + if event_time is not None: + event_time = float(event_time) + session_event = core_pb2.SessionEvent( + node=event.node, + event=event.event_type, + name=event.name, + data=event.data, + time=event_time, + session=session.id + ) + yield session_event + except Empty: + continue + + self._cancel_stream(context) + + def ConfigEvents(self, request, context): + session = self.get_session(request.id, context) + queue = Queue() + session.config_handlers.append(queue.put) + + while self._is_running(context): + try: + event = queue.get(timeout=1) + config_event = core_pb2.ConfigEvent( + message_type=event.message_type, + node=event.node, + object=event.object, + type=event.type, + captions=event.captions, + bitmap=event.bitmap, + data_values=event.data_values, + possible_values=event.possible_values, + groups=event.groups, + session=event.session, + interface=event.interface_number, + network_id=event.network_id, + opaque=event.opaque, + data_types=event.data_types + ) + yield config_event + except Empty: + continue + + self._cancel_stream(context) + + def ExceptionEvents(self, request, context): + session = self.get_session(request.id, context) + queue = Queue() + session.exception_handlers.append(queue.put) + + while self._is_running(context): + try: + event = queue.get(timeout=1) + exception_event = core_pb2.ExceptionEvent( + node=event.node, + session=int(event.session), + level=event.level.value, + source=event.source, + date=event.date, + text=event.text, + opaque=event.opaque + ) + yield exception_event + except Empty: + continue + + self._cancel_stream(context) + + def FileEvents(self, request, context): + session = self.get_session(request.id, context) + queue = Queue() + session.file_handlers.append(queue.put) + + while self._is_running(context): + try: + event = queue.get(timeout=1) + file_event = core_pb2.FileEvent( + message_type=event.message_type, + node=event.node, + name=event.name, + mode=event.mode, + number=event.number, + type=event.type, + source=event.source, + session=event.session, + data=event.data, + compressed_data=event.compressed_data + ) + yield file_event + except Empty: + continue + + self._cancel_stream(context) + + def AddNode(self, request, context): + logging.debug("add node: %s", request) + session = self.get_session(request.session, context) + + node_proto = request.node + node_id = node_proto.id + node_type = node_proto.type + if node_type is None: + node_type = NodeTypes.DEFAULT.value + node_type = NodeTypes(node_type) + + node_options = NodeOptions(name=node_proto.name, model=node_proto.model) + node_options.icon = node_proto.icon + node_options.opaque = node_proto.opaque + node_options.services = node_proto.services + + position = node_proto.position + node_options.set_position(position.x, position.y) + node_options.set_location(position.lat, position.lon, position.alt) + node = session.add_node(_type=node_type, _id=node_id, node_options=node_options) + + # configure emane if provided + emane_model = node_proto.emane + if emane_model: + session.emane.set_model_config(node_id, emane_model) + + return core_pb2.AddNodeResponse(id=node.objid) + + def GetNode(self, request, context): + logging.debug("get node: %s", request) + session = self.get_session(request.session, context) + node = self.get_node(session, request.id, context) + + interfaces = [] + for interface_id, interface in node._netif.iteritems(): + net_id = None + if interface.net: + net_id = interface.net.objid + interface_proto = core_pb2.Interface( + id=interface_id, netid=net_id, name=interface.name, mac=str(interface.hwaddr), + mtu=interface.mtu, flowid=interface.flow_id) + interfaces.append(interface_proto) + + emane_model = None + if nodeutils.is_node(node, NodeTypes.EMANE): + emane_model = node.model.name + + services = [x.name for x in getattr(node, "services", [])] + position = core_pb2.Position(x=node.position.x, y=node.position.y, z=node.position.z) + node_type = nodeutils.get_node_type(node.__class__).value + node = core_pb2.Node( + id=node.objid, name=node.name, type=node_type, emane=emane_model, model=node.type, position=position, + services=services) + + return core_pb2.GetNodeResponse(node=node, interfaces=interfaces) + + def EditNode(self, request, context): + logging.debug("edit node: %s", request) + session = self.get_session(request.session, context) + node_id = request.id + node_options = NodeOptions() + x = request.position.x + y = request.position.y + node_options.set_position(x, y) + lat = request.position.lat + lon = request.position.lon + alt = request.position.alt + node_options.set_location(lat, lon, alt) + result = session.update_node(node_id, node_options) + return core_pb2.EditNodeResponse(result=result) + + def DeleteNode(self, request, context): + logging.debug("delete node: %s", request) + session = self.get_session(request.session, context) + result = session.delete_node(request.id) + return core_pb2.DeleteNodeResponse(result=result) + + def GetNodeLinks(self, request, context): + logging.debug("get node links: %s", request) + session = self.get_session(request.session, context) + node = self.get_node(session, request.id, context) + links = get_links(session, node) + return core_pb2.GetNodeLinksResponse(links=links) + + def AddLink(self, request, context): + logging.debug("add link: %s", request) + session = self.get_session(request.session, context) + + # validate node exist + self.get_node(session, request.link.node_one, context) + self.get_node(session, request.link.node_two, context) + node_one = request.link.node_one + node_two = request.link.node_two + + interface_one = None + interface_one_data = request.link.interface_one + if interface_one_data: + name = interface_one_data.name + if name == "": + name = None + mac = interface_one_data.mac + if mac == "": + mac = None + else: + mac = MacAddress.from_string(mac) + interface_one = InterfaceData( + _id=interface_one_data.id, + name=name, + mac=mac, + ip4=interface_one_data.ip4, + ip4_mask=interface_one_data.ip4mask, + ip6=interface_one_data.ip6, + ip6_mask=interface_one_data.ip6mask, + ) + + interface_two = None + interface_two_data = request.link.interface_two + if interface_two_data: + name = interface_two_data.name + if name == "": + name = None + mac = interface_two_data.mac + if mac == "": + mac = None + else: + mac = MacAddress.from_string(mac) + interface_two = InterfaceData( + _id=interface_two_data.id, + name=name, + mac=mac, + ip4=interface_two_data.ip4, + ip4_mask=interface_two_data.ip4mask, + ip6=interface_two_data.ip6, + ip6_mask=interface_two_data.ip6mask, + ) + + link_type = None + link_type_value = request.link.type + if link_type_value is not None: + link_type = LinkTypes(link_type_value) + + options_data = request.link.options + link_options = LinkOptions(_type=link_type) + if options_data: + link_options.delay = options_data.delay + link_options.bandwidth = options_data.bandwidth + link_options.per = options_data.per + link_options.dup = options_data.dup + link_options.jitter = options_data.jitter + link_options.mer = options_data.mer + link_options.burst = options_data.burst + link_options.mburst = options_data.mburst + link_options.unidirectional = options_data.unidirectional + link_options.key = options_data.key + link_options.opaque = options_data.opaque + + session.add_link(node_one, node_two, interface_one, interface_two, link_options=link_options) + return core_pb2.AddLinkResponse(result=True) + + def EditLink(self, request, context): + logging.debug("edit link: %s", request) + session = self.get_session(request.session, context) + node_one = request.node_one + node_two = request.node_two + interface_one_id = request.interface_one + interface_two_id = request.interface_two + options_data = request.options + link_options = LinkOptions() + link_options.delay = options_data.delay + link_options.bandwidth = options_data.bandwidth + link_options.per = options_data.per + link_options.dup = options_data.dup + link_options.jitter = options_data.jitter + link_options.mer = options_data.mer + link_options.burst = options_data.burst + link_options.mburst = options_data.mburst + link_options.unidirectional = options_data.unidirectional + link_options.key = options_data.key + link_options.opaque = options_data.opaque + session.update_link(node_one, node_two, interface_one_id, interface_two_id, link_options) + return core_pb2.EditLinkResponse(result=True) + + def DeleteLink(self, request, context): + logging.debug("delete link: %s", request) + session = self.get_session(request.session, context) + node_one = request.node_one + node_two = request.node_two + interface_one = request.interface_one + interface_two = request.interface_two + session.delete_link(node_one, node_two, interface_one, interface_two) + return core_pb2.DeleteLinkResponse(result=True) + + def GetHooks(self, request, context): + logging.debug("get hooks: %s", request) + session = self.get_session(request.session, context) + hooks = [] + for state, state_hooks in session._hooks.iteritems(): + for file_name, file_data in state_hooks: + hook = core_pb2.Hook(state=state, file=file_name, data=file_data) + hooks.append(hook) + return core_pb2.GetHooksResponse(hooks=hooks) + + def AddHook(self, request, context): + logging.debug("add hook: %s", request) + session = self.get_session(request.session, context) + hook = request.hook + session.add_hook(hook.state, hook.file, None, hook.data) + return core_pb2.AddHookResponse(result=True) + + def GetMobilityConfigs(self, request, context): + logging.debug("get mobility configs: %s", request) + session = self.get_session(request.session, context) + response = core_pb2.GetMobilityConfigsResponse() + for node_id, model_config in session.mobility.node_configurations.iteritems(): + if node_id == -1: + continue + for model_name in model_config.iterkeys(): + if model_name != Ns2ScriptedMobility.name: + continue + config = session.mobility.get_model_config(node_id, model_name) + groups = get_config_groups(config, Ns2ScriptedMobility) + response.configs[node_id].groups.extend(groups) + return response + + def GetMobilityConfig(self, request, context): + logging.debug("get mobility config: %s", request) + session = self.get_session(request.session, context) + config = session.mobility.get_model_config(request.id, Ns2ScriptedMobility.name) + groups = get_config_groups(config, Ns2ScriptedMobility) + return core_pb2.GetMobilityConfigResponse(groups=groups) + + def SetMobilityConfig(self, request, context): + logging.debug("set mobility config: %s", request) + session = self.get_session(request.session, context) + session.mobility.set_model_config(request.id, Ns2ScriptedMobility.name, request.config) + return core_pb2.SetMobilityConfigResponse(result=True) + + def MobilityAction(self, request, context): + logging.debug("mobility action: %s", request) + session = self.get_session(request.session, context) + node = self.get_node(session, request.id, context) + result = True + if request.action == core_pb2.MOBILITY_START: + node.mobility.start() + elif request.action == core_pb2.MOBILITY_PAUSE: + node.mobility.pause() + elif request.action == core_pb2.MOBILITY_STOP: + node.mobility.stop(move_initial=True) + else: + result = False + return core_pb2.MobilityActionResponse(result=result) + + def GetServices(self, request, context): + logging.debug("get services: %s", request) + services = [] + for service in ServiceManager.services.itervalues(): + service_proto = core_pb2.Service(group=service.group, name=service.name) + services.append(service_proto) + return core_pb2.GetServicesResponse(services=services) + + def GetServiceDefaults(self, request, context): + logging.debug("get service defaults: %s", request) + session = self.get_session(request.session, context) + all_service_defaults = [] + for node_type in session.services.default_services: + services = session.services.default_services[node_type] + service_defaults = core_pb2.ServiceDefaults(node_type=node_type, services=services) + all_service_defaults.append(service_defaults) + return core_pb2.GetServiceDefaultsResponse(defaults=all_service_defaults) + + def SetServiceDefaults(self, request, context): + logging.debug("set service defaults: %s", request) + session = self.get_session(request.session, context) + session.services.default_services.clear() + for service_defaults in request.defaults: + session.services.default_services[service_defaults.node_type] = service_defaults.services + return core_pb2.SetServiceDefaultsResponse(result=True) + + def GetNodeService(self, request, context): + logging.debug("get node service: %s", request) + session = self.get_session(request.session, context) + service = session.services.get_service(request.id, request.service, default_service=True) + service_proto = core_pb2.NodeServiceData( + executables=service.executables, + dependencies=service.dependencies, + dirs=service.dirs, + configs=service.configs, + startup=service.startup, + validate=service.validate, + validation_mode=service.validation_mode.value, + validation_timer=service.validation_timer, + shutdown=service.shutdown, + meta=service.meta + ) + return core_pb2.GetNodeServiceResponse(service=service_proto) + + def GetNodeServiceFile(self, request, context): + logging.debug("get node service file: %s", request) + session = self.get_session(request.session, context) + node = self.get_node(session, request.id, context) + service = None + for current_service in node.services: + if current_service.name == request.service: + service = current_service + break + if not service: + context.abort(grpc.StatusCode.NOT_FOUND, "service not found") + file_data = session.services.get_service_file(node, request.service, request.file) + return core_pb2.GetNodeServiceFileResponse(data=file_data.data) + + def SetNodeService(self, request, context): + logging.debug("set node service: %s", request) + session = self.get_session(request.session, context) + session.services.set_service(request.id, request.service) + service = session.services.get_service(request.id, request.service) + service.startup = tuple(request.startup) + service.validate = tuple(request.validate) + service.shutdown = tuple(request.shutdown) + return core_pb2.SetNodeServiceResponse(result=True) + + def SetNodeServiceFile(self, request, context): + logging.debug("set node service file: %s", request) + session = self.get_session(request.session, context) + session.services.set_service_file(request.id, request.service, request.file, request.data) + return core_pb2.SetNodeServiceFileResponse(result=True) + + def ServiceAction(self, request, context): + logging.debug("service action: %s", request) + session = self.get_session(request.session, context) + node = self.get_node(session, request.id, context) + service = None + for current_service in node.services: + if current_service.name == request.service: + service = current_service + break + + if not service: + context.abort(grpc.StatusCode.NOT_FOUND, "service not found") + + status = -1 + if request.action == core_pb2.SERVICE_START: + status = session.services.startup_service(node, service, wait=True) + elif request.action == core_pb2.SERVICE_STOP: + status = session.services.stop_service(node, service) + elif request.action == core_pb2.SERVICE_RESTART: + status = session.services.stop_service(node, service) + if not status: + status = session.services.startup_service(node, service, wait=True) + elif request.action == core_pb2.SERVICE_VALIDATE: + status = session.services.validate_service(node, service) + + result = False + if not status: + result = True + + return core_pb2.ServiceActionResponse(result=result) + + def GetWlanConfig(self, request, context): + logging.debug("get wlan config: %s", request) + session = self.get_session(request.session, context) + config = session.mobility.get_model_config(request.id, BasicRangeModel.name) + groups = get_config_groups(config, BasicRangeModel) + return core_pb2.GetWlanConfigResponse(groups=groups) + + def SetWlanConfig(self, request, context): + logging.debug("set wlan config: %s", request) + session = self.get_session(request.session, context) + session.mobility.set_model_config(request.id, BasicRangeModel.name, request.config) + return core_pb2.SetWlanConfigResponse(result=True) + + def GetEmaneConfig(self, request, context): + logging.debug("get emane config: %s", request) + session = self.get_session(request.session, context) + config = session.emane.get_configs() + groups = get_config_groups(config, session.emane.emane_config) + return core_pb2.GetEmaneConfigResponse(groups=groups) + + def SetEmaneConfig(self, request, context): + logging.debug("set emane config: %s", request) + session = self.get_session(request.session, context) + config = session.emane.get_configs() + config.update(request.config) + return core_pb2.SetEmaneConfigResponse(result=True) + + def GetEmaneModels(self, request, context): + logging.debug("get emane models: %s", request) + session = self.get_session(request.session, context) + models = [] + for model in session.emane.models.keys(): + if len(model.split("_")) != 2: + continue + models.append(model) + return core_pb2.GetEmaneModelsResponse(models=models) + + def GetEmaneModelConfig(self, request, context): + logging.debug("get emane model config: %s", request) + session = self.get_session(request.session, context) + model = session.emane.models[request.model] + _id = get_emane_model_id(request.id, request.interface) + config = session.emane.get_model_config(_id, request.model) + groups = get_config_groups(config, model) + return core_pb2.GetEmaneModelConfigResponse(groups=groups) + + def SetEmaneModelConfig(self, request, context): + logging.debug("set emane model config: %s", request) + session = self.get_session(request.session, context) + _id = get_emane_model_id(request.id, request.interface) + session.emane.set_model_config(_id, request.model, request.config) + return core_pb2.SetEmaneModelConfigResponse(result=True) + + def GetEmaneModelConfigs(self, request, context): + logging.debug("get emane model configs: %s", request) + session = self.get_session(request.session, context) + response = core_pb2.GetEmaneModelConfigsResponse() + for node_id, model_config in session.emane.node_configurations.iteritems(): + if node_id == -1: + continue + + for model_name in model_config.iterkeys(): + model = session.emane.models[model_name] + config = session.emane.get_model_config(node_id, model_name) + config_groups = get_config_groups(config, model) + node_configurations = response.configs[node_id] + node_configurations.model = model_name + node_configurations.groups.extend(config_groups) + return response + + def SaveXml(self, request, context): + logging.debug("save xml: %s", request) + session = self.get_session(request.session, context) + + _, temp_path = tempfile.mkstemp() + session.save_xml(temp_path) + + with open(temp_path, "rb") as xml_file: + data = xml_file.read() + + return core_pb2.SaveXmlResponse(data=data) + + def OpenXml(self, request, context): + logging.debug("open xml: %s", request) + session = self.coreemu.create_session() + session.set_state(EventTypes.CONFIGURATION_STATE) + + _, temp_path = tempfile.mkstemp() + with open(temp_path, "wb") as xml_file: + xml_file.write(request.data) + + try: + session.open_xml(temp_path, start=True) + return core_pb2.OpenXmlResponse(session=session.id, result=True) + except IOError: + logging.exception("error opening session file") + self.coreemu.delete_session(session.id) + context.abort(grpc.StatusCode.INVALID_ARGUMENT, "invalid xml file") diff --git a/daemon/core/misc/nodeutils.py b/daemon/core/misc/nodeutils.py index 645c0543..ef4dacb6 100644 --- a/daemon/core/misc/nodeutils.py +++ b/daemon/core/misc/nodeutils.py @@ -63,6 +63,19 @@ def get_node_class(node_type): return _NODE_MAP[node_type] +def get_node_type(node_class): + """ + Retrieve the node type given a node class. + + :param class node_class: node class to get type for + :return: node type + :rtype: core.enumerations.NodeTypes + """ + global _NODE_MAP + node_type_map = {v: k for k, v in _NODE_MAP.iteritems()} + return node_type_map.get(node_class) + + def is_node(obj, node_types): """ Validates if an object is one of the provided node types. diff --git a/daemon/core/misc/utils.py b/daemon/core/misc/utils.py index 0aadd5fe..f2e578bf 100644 --- a/daemon/core/misc/utils.py +++ b/daemon/core/misc/utils.py @@ -319,7 +319,7 @@ def expand_corepath(pathname, session=None, node=None): """ if session is not None: pathname = pathname.replace("~", "/home/%s" % session.user) - pathname = pathname.replace("%SESSION%", str(session.session_id)) + pathname = pathname.replace("%SESSION%", str(session.id)) pathname = pathname.replace("%SESSION_DIR%", session.session_dir) pathname = pathname.replace("%SESSION_USER%", session.user) diff --git a/daemon/core/mobility.py b/daemon/core/mobility.py index e8bb747d..4a9eb746 100644 --- a/daemon/core/mobility.py +++ b/daemon/core/mobility.py @@ -271,7 +271,6 @@ class WirelessModel(ConfigurableOptions): :param core.session.Session session: core session we are tied to :param int object_id: object id - :param dict config: values """ self.session = session self.object_id = object_id diff --git a/daemon/core/netns/openvswitch.py b/daemon/core/netns/openvswitch.py index c0b86d0a..d18471ea 100644 --- a/daemon/core/netns/openvswitch.py +++ b/daemon/core/netns/openvswitch.py @@ -648,7 +648,7 @@ class OvsGreTapBridge(OvsNet): OvsNet.__init__(self, session=session, objid=objid, name=name, policy=policy, start=False) self.grekey = key if self.grekey is None: - self.grekey = self.session.session_id ^ self.objid + self.grekey = self.session.id ^ self.objid self.localnum = None self.remotenum = None diff --git a/daemon/core/netns/vnet.py b/daemon/core/netns/vnet.py index 55dc855d..17bcada0 100644 --- a/daemon/core/netns/vnet.py +++ b/daemon/core/netns/vnet.py @@ -589,7 +589,7 @@ class GreTapBridge(LxBrNet): LxBrNet.__init__(self, session=session, objid=objid, name=name, policy=policy, start=False) self.grekey = key if self.grekey is None: - self.grekey = self.session.session_id ^ self.objid + self.grekey = self.session.id ^ self.objid self.localnum = None self.remotenum = None self.remoteip = remoteip diff --git a/daemon/core/session.py b/daemon/core/session.py index 5d955c2a..84f1d822 100644 --- a/daemon/core/session.py +++ b/daemon/core/session.py @@ -46,18 +46,18 @@ class Session(object): CORE session manager. """ - def __init__(self, session_id, config=None, mkdir=True): + def __init__(self, _id, config=None, mkdir=True): """ Create a Session instance. - :param int session_id: session id + :param int _id: session id :param dict config: session configuration :param bool mkdir: flag to determine if a directory should be made """ - self.session_id = session_id + self.id = _id # define and create session directory when desired - self.session_dir = os.path.join(tempfile.gettempdir(), "pycore.%s" % self.session_id) + self.session_dir = os.path.join(tempfile.gettempdir(), "pycore.%s" % self.id) if mkdir: os.mkdir(self.session_dir) @@ -207,12 +207,12 @@ class Session(object): state_name = state.name if self.state == state_value: - logging.info("session(%s) is already in state: %s, skipping change", self.session_id, state_name) + logging.info("session(%s) is already in state: %s, skipping change", self.id, state_name) return self.state = state_value self._state_time = time.time() - logging.info("changing session(%s) to state %s", self.session_id, state_name) + logging.info("changing session(%s) to state %s", self.id, state_name) self.write_state(state_value) self.run_hooks(state_value) @@ -397,7 +397,7 @@ class Session(object): :return: """ env = os.environ.copy() - env["SESSION"] = "%s" % self.session_id + env["SESSION"] = "%s" % self.id env["SESSION_SHORT"] = "%s" % self.short_session_id() env["SESSION_DIR"] = "%s" % self.session_dir env["SESSION_NAME"] = "%s" % self.name @@ -562,9 +562,9 @@ class Session(object): """ Log information about the session in its current state. """ - logging.info("session id=%s name=%s state=%s", self.session_id, self.name, self.state) + logging.info("session id=%s name=%s state=%s", self.id, self.name, self.state) logging.info("file=%s thumbnail=%s node_count=%s/%s", - self.file_name, self.thumbnail, self.get_node_count(), len(self.objects)) + self.file_name, self.thumbnail, self.get_node_count(), len(self.objects)) def exception(self, level, source, object_id, text): """ @@ -579,7 +579,7 @@ class Session(object): exception_data = ExceptionData( node=object_id, - session=str(self.session_id), + session=str(self.id), level=level, source=source, date=time.ctime(), @@ -631,10 +631,12 @@ class Session(object): """ with self._objects_lock: - count = len([x for x in self.objects if not nodeutils.is_node(x, (NodeTypes.PEER_TO_PEER, NodeTypes.CONTROL_NET))]) + count = len([x for x in self.objects.itervalues() + if not nodeutils.is_node(x, (NodeTypes.PEER_TO_PEER, NodeTypes.CONTROL_NET))]) # on Linux, GreTapBridges are auto-created, not part of GUI's node count - count -= len([x for x in self.objects if nodeutils.is_node(x, NodeTypes.TAP_BRIDGE) and not nodeutils.is_node(x, NodeTypes.TUNNEL)]) + count -= len([x for x in self.objects.itervalues() + if nodeutils.is_node(x, NodeTypes.TAP_BRIDGE) and not nodeutils.is_node(x, NodeTypes.TUNNEL)]) return count @@ -647,8 +649,8 @@ class Session(object): # this is called from instantiate() after receiving an event message # for the instantiation state, and from the broker when distributed # nodes have been started - logging.info("session(%s) checking if not in runtime state, current state: %s", self.session_id, - coreapi.state_name(self.state)) + logging.info("session(%s) checking if not in runtime state, current state: %s", self.id, + coreapi.state_name(self.state)) if self.state == EventTypes.RUNTIME_STATE.value: logging.info("valid runtime state found, returning") return @@ -694,7 +696,7 @@ class Session(object): and links remain. """ node_count = self.get_node_count() - logging.info("session(%s) checking shutdown: %s nodes remaining", self.session_id, node_count) + logging.info("session(%s) checking shutdown: %s nodes remaining", self.id, node_count) shutdown = False if node_count == 0: @@ -708,7 +710,7 @@ class Session(object): Return a shorter version of the session ID, appropriate for interface names, where length may be limited. """ - ssid = (self.session_id >> 8) ^ (self.session_id & ((1 << 8) - 1)) + ssid = (self.id >> 8) ^ (self.id & ((1 << 8) - 1)) return "%x" % ssid def boot_nodes(self): @@ -961,7 +963,7 @@ class Session(object): logging.exception("error retrieving control net object") return - header = "CORE session %s host entries" % self.session_id + header = "CORE session %s host entries" % self.id if remove: logging.info("Removing /etc/hosts file entries.") utils.file_demunge("/etc/hosts", header) diff --git a/daemon/data/core.conf b/daemon/data/core.conf index d6f650b9..27fa698e 100644 --- a/daemon/data/core.conf +++ b/daemon/data/core.conf @@ -58,3 +58,5 @@ emane_event_monitor = False # EMANE log level range [0,4] default: 2 #emane_log_level = 2 emane_realtime = True +# prefix used for emane installation +# emane_prefix = /usr diff --git a/daemon/examples/grpc/__init__.py b/daemon/examples/grpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/daemon/examples/grpc/switch.py b/daemon/examples/grpc/switch.py new file mode 100644 index 00000000..354e7b66 --- /dev/null +++ b/daemon/examples/grpc/switch.py @@ -0,0 +1,60 @@ +import logging + +from core.grpc import client +from core.grpc import core_pb2 + + +def log_event(event): + logging.info("event: %s", event) + + +def main(): + core = client.CoreGrpcClient() + + with core.context_connect(): + # create session + session = core.create_session() + logging.info("created session: %s", session) + + # handle events session may broadcast + core.exception_events(session.id, log_event) + core.node_events(session.id, log_event) + core.session_events(session.id, log_event) + core.link_events(session.id, log_event) + core.file_events(session.id, log_event) + core.config_events(session.id, log_event) + + # change session state + response = core.set_session_state(session.id, core_pb2.STATE_CONFIGURATION) + logging.info("set session state: %s", response) + + # create switch node + switch = core_pb2.Node(type=core_pb2.NODE_SWITCH) + response = core.add_node(session.id, switch) + logging.info("created switch: %s", response) + switch_id = response.id + + # helper to create interfaces + interface_helper = client.InterfaceHelper(ip4_prefix="10.83.0.0/16") + + for i in xrange(2): + # create node + position = core_pb2.Position(x=50 + 50 * i, y=50) + node = core_pb2.Node(position=position) + response = core.add_node(session.id, node) + logging.info("created node: %s", response) + node_id = response.id + + # create link + interface_one = interface_helper.create_interface(node_id, 0) + response = core.add_link(session.id, node_id, switch_id, interface_one) + logging.info("created link: %s", response) + + # change session state + response = core.set_session_state(session.id, core_pb2.STATE_INSTANTIATION) + logging.info("set session state: %s", response) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() diff --git a/daemon/proto/Makefile.am b/daemon/proto/Makefile.am new file mode 100644 index 00000000..bcf62a2c --- /dev/null +++ b/daemon/proto/Makefile.am @@ -0,0 +1,5 @@ +all: + $(PYTHON) -m grpc_tools.protoc -I . --python_out=../core/grpc --grpc_python_out=../core/grpc core.proto + +clean: + -rm -f ../core/grpc/core_pb2* diff --git a/daemon/proto/core.proto b/daemon/proto/core.proto new file mode 100644 index 00000000..45fd597e --- /dev/null +++ b/daemon/proto/core.proto @@ -0,0 +1,778 @@ +syntax = "proto3"; + +package core; + +service CoreApi { + // session rpc + rpc CreateSession (CreateSessionRequest) returns (CreateSessionResponse) { + } + rpc DeleteSession (DeleteSessionRequest) returns (DeleteSessionResponse) { + } + rpc GetSessions (GetSessionsRequest) returns (GetSessionsResponse) { + } + rpc GetSession (GetSessionRequest) returns (GetSessionResponse) { + } + rpc GetSessionOptions (GetSessionOptionsRequest) returns (GetSessionOptionsResponse) { + } + rpc SetSessionOptions (SetSessionOptionsRequest) returns (SetSessionOptionsResponse) { + } + rpc GetSessionLocation (GetSessionLocationRequest) returns (GetSessionLocationResponse) { + } + rpc SetSessionLocation (SetSessionLocationRequest) returns (SetSessionLocationResponse) { + } + rpc SetSessionState (SetSessionStateRequest) returns (SetSessionStateResponse) { + } + + // event streams + rpc NodeEvents (NodeEventsRequest) returns (stream NodeEvent) { + } + rpc LinkEvents (LinkEventsRequest) returns (stream LinkEvent) { + } + rpc SessionEvents (SessionEventsRequest) returns (stream SessionEvent) { + } + rpc ConfigEvents (ConfigEventsRequest) returns (stream ConfigEvent) { + } + rpc ExceptionEvents (ExceptionEventsRequest) returns (stream ExceptionEvent) { + } + rpc FileEvents (FileEventsRequest) returns (stream FileEvent) { + } + + // node rpc + rpc AddNode (AddNodeRequest) returns (AddNodeResponse) { + } + rpc GetNode (GetNodeRequest) returns (GetNodeResponse) { + } + rpc EditNode (EditNodeRequest) returns (EditNodeResponse) { + } + rpc DeleteNode (DeleteNodeRequest) returns (DeleteNodeResponse) { + } + + // link rpc + rpc GetNodeLinks (GetNodeLinksRequest) returns (GetNodeLinksResponse) { + } + rpc AddLink (AddLinkRequest) returns (AddLinkResponse) { + } + rpc EditLink (EditLinkRequest) returns (EditLinkResponse) { + } + rpc DeleteLink (DeleteLinkRequest) returns (DeleteLinkResponse) { + } + + // hook rpc + rpc GetHooks (GetHooksRequest) returns (GetHooksResponse) { + } + rpc AddHook (AddHookRequest) returns (AddHookResponse) { + } + + // mobility rpc + rpc GetMobilityConfigs (GetMobilityConfigsRequest) returns (GetMobilityConfigsResponse) { + } + rpc GetMobilityConfig (GetMobilityConfigRequest) returns (GetMobilityConfigResponse) { + } + rpc SetMobilityConfig (SetMobilityConfigRequest) returns (SetMobilityConfigResponse) { + } + rpc MobilityAction (MobilityActionRequest) returns (MobilityActionResponse) { + } + + // service rpc + rpc GetServices (GetServicesRequest) returns (GetServicesResponse) { + } + rpc GetServiceDefaults (GetServiceDefaultsRequest) returns (GetServiceDefaultsResponse) { + } + rpc SetServiceDefaults (SetServiceDefaultsRequest) returns (SetServiceDefaultsResponse) { + } + rpc GetNodeService (GetNodeServiceRequest) returns (GetNodeServiceResponse) { + } + rpc GetNodeServiceFile (GetNodeServiceFileRequest) returns (GetNodeServiceFileResponse) { + } + rpc SetNodeService (SetNodeServiceRequest) returns (SetNodeServiceResponse) { + } + rpc SetNodeServiceFile (SetNodeServiceFileRequest) returns (SetNodeServiceFileResponse) { + } + rpc ServiceAction (ServiceActionRequest) returns (ServiceActionResponse) { + } + + // wlan rpc + rpc GetWlanConfig (GetWlanConfigRequest) returns (GetWlanConfigResponse) { + } + rpc SetWlanConfig (SetWlanConfigRequest) returns (SetWlanConfigResponse) { + } + + // emane rpc + rpc GetEmaneConfig (GetEmaneConfigRequest) returns (GetEmaneConfigResponse) { + } + rpc SetEmaneConfig (SetEmaneConfigRequest) returns (SetEmaneConfigResponse) { + } + rpc GetEmaneModels (GetEmaneModelsRequest) returns (GetEmaneModelsResponse) { + } + rpc GetEmaneModelConfig (GetEmaneModelConfigRequest) returns (GetEmaneModelConfigResponse) { + } + rpc SetEmaneModelConfig (SetEmaneModelConfigRequest) returns (SetEmaneModelConfigResponse) { + } + rpc GetEmaneModelConfigs (GetEmaneModelConfigsRequest) returns (GetEmaneModelConfigsResponse) { + } + + // xml rpc + rpc SaveXml (SaveXmlRequest) returns (SaveXmlResponse) { + } + rpc OpenXml (OpenXmlRequest) returns (OpenXmlResponse) { + } +} + +// rpc request/response messages +message CreateSessionRequest { + int32 id = 1; +} + +message CreateSessionResponse { + int32 id = 1; + SessionState state = 2; +} + +message DeleteSessionRequest { + int32 id = 1; +} + +message DeleteSessionResponse { + bool result = 1; +} + +message GetSessionsRequest { +} + +message GetSessionsResponse { + repeated SessionSummary sessions = 1; +} + +message GetSessionRequest { + int32 id = 1; +} + +message GetSessionResponse { + Session session = 1; +} + +message GetSessionOptionsRequest { + int32 id = 1; +} + +message GetSessionOptionsResponse { + repeated ConfigGroup groups = 1; +} + +message SetSessionOptionsRequest { + int32 id = 1; + map config = 2; +} + +message SetSessionOptionsResponse { + bool result = 1; +} + +message GetSessionLocationRequest { + int32 id = 1; +} + +message GetSessionLocationResponse { + Position position = 1; + float scale = 2; +} + +message SetSessionLocationRequest { + int32 id = 1; + Position position = 2; + float scale = 3; +} + +message SetSessionLocationResponse { + bool result = 1; +} + +message SetSessionStateRequest { + int32 id = 1; + SessionState state = 2; +} + +message SetSessionStateResponse { + bool result = 1; +} + +message NodeEventsRequest { + int32 id = 1; +} + +message NodeEvent { + Node node = 1; +} + +message LinkEventsRequest { + int32 id = 1; +} + +message LinkEvent { + MessageType message_type = 1; + Link link = 2; +} + +message SessionEventsRequest { + int32 id = 1; +} + +message SessionEvent { + int32 node = 1; + int32 event = 2; + string name = 3; + bytes data = 4; + float time = 5; + int32 session = 6; +} + +message ConfigEventsRequest { + int32 id = 1; +} + +message ConfigEvent { + MessageType message_type = 1; + int32 node = 2; + string object = 3; + int32 type = 4; + repeated int32 data_types = 5; + string data_values = 6; + string captions = 7; + string bitmap = 8; + string possible_values = 9; + string groups = 10; + string session = 11; + int32 interface = 12; + int32 network_id = 13; + string opaque = 14; +} + +message ExceptionEventsRequest { + int32 id = 1; +} + +message ExceptionEvent { + int32 node = 1; + int32 session = 2; + ExceptionLevel level = 3; + string source = 4; + string date = 5; + string text = 6; + string opaque = 7; +} + +message FileEventsRequest { + int32 id = 1; +} + +message FileEvent { + MessageType message_type = 1; + int32 node = 2; + string name = 3; + string mode = 4; + int32 number = 5; + string type = 6; + string source = 7; + int32 session = 8; + bytes data = 9; + bytes compressed_data = 10; +} + +message AddNodeRequest { + int32 session = 1; + Node node = 2; +} + +message AddNodeResponse { + int32 id = 1; +} + +message GetNodeRequest { + int32 session = 1; + int32 id = 2; +} + +message GetNodeResponse { + Node node = 1; + repeated Interface interfaces = 2; +} + +message EditNodeRequest { + int32 session = 1; + int32 id = 2; + Position position = 3; +} + +message EditNodeResponse { + bool result = 1; +} + +message DeleteNodeRequest { + int32 session = 1; + int32 id = 2; +} + +message DeleteNodeResponse { + bool result = 1; +} + +message GetNodeLinksRequest { + int32 session = 1; + int32 id = 2; +} + +message GetNodeLinksResponse { + repeated Link links = 1; +} + +message AddLinkRequest { + int32 session = 1; + Link link = 2; +} + +message AddLinkResponse { + bool result = 1; +} + +message EditLinkRequest { + int32 session = 1; + int32 node_one = 2; + int32 node_two = 3; + int32 interface_one = 4; + int32 interface_two = 5; + LinkOptions options = 6; +} + +message EditLinkResponse { + bool result = 1; +} + +message DeleteLinkRequest { + int32 session = 1; + int32 node_one = 2; + int32 node_two = 3; + int32 interface_one = 4; + int32 interface_two = 5; +} + +message DeleteLinkResponse { + bool result = 1; +} + +message GetHooksRequest { + int32 session = 1; +} + +message GetHooksResponse { + repeated Hook hooks = 1; +} + +message AddHookRequest { + int32 session = 1; + Hook hook = 2; +} + +message AddHookResponse { + bool result = 1; +} + +message GetMobilityConfigsRequest { + int32 session = 1; +} + +message GetMobilityConfigsResponse { + message MobilityConfig { + repeated ConfigGroup groups = 1; + } + map configs = 1; +} + +message GetMobilityConfigRequest { + int32 session = 1; + int32 id = 2; +} + +message GetMobilityConfigResponse { + repeated ConfigGroup groups = 1; +} + +message SetMobilityConfigRequest { + int32 session = 1; + int32 id = 2; + map config = 3; +} + +message SetMobilityConfigResponse { + bool result = 1; +} + +message MobilityActionRequest { + int32 session = 1; + int32 id = 2; + MobilityAction action = 3; +} + +message MobilityActionResponse { + bool result = 1; +} + +message GetServicesRequest { + +} + +message GetServicesResponse { + repeated Service services = 1; +} + +message GetServiceDefaultsRequest { + int32 session = 1; +} + +message GetServiceDefaultsResponse { + repeated ServiceDefaults defaults = 1; +} + +message SetServiceDefaultsRequest { + int32 session = 1; + repeated ServiceDefaults defaults = 2; +} + +message SetServiceDefaultsResponse { + bool result = 1; +} + +message GetNodeServiceRequest { + int32 session = 1; + int32 id = 2; + string service = 3; +} + +message GetNodeServiceResponse { + NodeServiceData service = 1; +} + +message GetNodeServiceFileRequest { + int32 session = 1; + int32 id = 2; + string service = 3; + string file = 4; +} + +message GetNodeServiceFileResponse { + bytes data = 1; +} + +message SetNodeServiceRequest { + int32 session = 1; + int32 id = 2; + string service = 3; + repeated string startup = 4; + repeated string validate = 5; + repeated string shutdown = 6; +} + +message SetNodeServiceResponse { + bool result = 1; +} + +message SetNodeServiceFileRequest { + int32 session = 1; + int32 id = 2; + string service = 3; + string file = 4; + bytes data = 5; +} + +message SetNodeServiceFileResponse { + bool result = 1; +} + +message ServiceActionRequest { + int32 session = 1; + int32 id = 2; + string service = 3; + ServiceAction action = 4; +} + +message ServiceActionResponse { + bool result = 1; +} + +message GetWlanConfigRequest { + int32 session = 1; + int32 id = 2; +} + +message GetWlanConfigResponse { + repeated ConfigGroup groups = 1; +} + +message SetWlanConfigRequest { + int32 session = 1; + int32 id = 2; + map config = 3; +} + +message SetWlanConfigResponse { + bool result = 1; +} + +message GetEmaneConfigRequest { + int32 session = 1; +} + +message GetEmaneConfigResponse { + repeated ConfigGroup groups = 1; +} + +message SetEmaneConfigRequest { + int32 session = 1; + map config = 2; +} + +message SetEmaneConfigResponse { + bool result = 1; +} + +message GetEmaneModelsRequest { + int32 session = 1; +} + +message GetEmaneModelsResponse { + repeated string models = 1; +} + +message GetEmaneModelConfigRequest { + int32 session = 1; + int32 id = 2; + int32 interface = 3; + string model = 4; +} + +message GetEmaneModelConfigResponse { + repeated ConfigGroup groups = 1; +} + +message SetEmaneModelConfigRequest { + int32 session = 1; + int32 id = 2; + int32 interface = 3; + string model = 4; + map config = 5; +} + +message SetEmaneModelConfigResponse { + bool result = 1; +} + +message GetEmaneModelConfigsRequest { + int32 session = 1; +} + +message GetEmaneModelConfigsResponse { + message ModelConfig { + string model = 1; + repeated ConfigGroup groups = 2; + } + map configs = 1; +} + +message SaveXmlRequest { + int32 session = 1; +} + +message SaveXmlResponse { + bytes data = 1; +} + +message OpenXmlRequest { + bytes data = 1; +} + +message OpenXmlResponse { + bool result = 1; + int32 session = 2; +} + +// data structures for messages below +enum MessageType { + MESSAGE_NONE = 0; + MESSAGE_ADD = 1; + MESSAGE_DELETE = 2; + MESSAGE_CRI = 4; + MESSAGE_LOCAL = 8; + MESSAGE_STRING = 16; + MESSAGE_TEXT = 32; + MESSAGE_TTY = 64; +} + +enum LinkType { + LINK_WIRELESS = 0; + LINK_WIRED = 1; +} + +enum SessionState { + STATE_NONE = 0; + STATE_DEFINITION = 1; + STATE_CONFIGURATION = 2; + STATE_INSTANTIATION = 3; + STATE_RUNTIME = 4; + STATE_DATACOLLECT = 5; + STATE_SHUTDOWN = 6; +} + +enum NodeType { + NODE_DEFAULT = 0; + NODE_PHYSICAL = 1; + NODE_TBD = 3; + NODE_SWITCH = 4; + NODE_HUB = 5; + NODE_WIRELESS_LAN = 6; + NODE_RJ45 = 7; + NODE_TUNNEL = 8; + NODE_KTUNNEL = 9; + NODE_EMANE = 10; + NODE_TAP_BRIDGE = 11; + NODE_PEER_TO_PEER = 12; + NODE_CONTROL_NET = 13; + NODE_EMANE_NET = 14; +} + +enum ServiceValidationMode { + VALIDATION_BLOCKING = 0; + VALIDATION_NON_BLOCKING = 1; + VALIDATION_TIMER = 2; +} + +enum ServiceAction { + SERVICE_START = 0; + SERVICE_STOP = 1; + SERVICE_RESTART = 2; + SERVICE_VALIDATE = 3; +} + +enum MobilityAction { + MOBILITY_START = 0; + MOBILITY_PAUSE = 1; + MOBILITY_STOP = 2; +} + +enum ExceptionLevel { + EXCEPTION_DEFAULT = 0; + EXCEPTION_FATAL = 1; + EXCEPTION_ERROR = 2; + EXCEPTION_WARNING = 3; + EXCEPTION_NOTICE = 4; +} + +message Hook { + SessionState state = 1; + string file = 2; + bytes data = 3; +} + +message ServiceDefaults { + string node_type = 1; + repeated string services = 2; +} + +message Service { + string group = 1; + string name = 2; +} + +message NodeServiceData { + repeated string executables = 1; + repeated string dependencies = 2; + repeated string dirs = 3; + repeated string configs = 4; + repeated string startup = 5; + repeated string validate = 6; + ServiceValidationMode validation_mode = 7; + int32 validation_timer = 8; + repeated string shutdown = 9; + string meta = 10; +} + +message ConfigGroup { + string name = 1; + repeated ConfigOption options = 2; +} + +message ConfigOption { + string label = 1; + string name = 2; + string value = 3; + int32 type = 4; + repeated string select = 5; +} + +message Session { + int32 id = 1; + SessionState state = 2; + repeated Node nodes = 3; + repeated Link links = 4; +} + +message SessionSummary { + int32 id = 1; + SessionState state = 2; + int32 nodes = 3; +} + +message Node { + int32 id = 1; + string name = 2; + NodeType type = 3; + string model = 4; + Position position = 5; + repeated string services = 6; + string emane = 7; + string icon = 8; + string opaque = 9; +} + +message Link { + int32 node_one = 1; + int32 node_two = 2; + LinkType type = 3; + Interface interface_one = 4; + Interface interface_two = 5; + LinkOptions options = 6; +} + +message LinkOptions { + string opaque = 1; + float jitter = 2; + string key = 3; + float mburst = 4; + float mer = 5; + float per = 6; + float bandwidth = 7; + float burst = 8; + float delay = 9; + float dup = 10; + bool unidirectional = 11; +} + +message Interface { + int32 id = 1; + string name = 2; + string mac = 3; + string ip4 = 4; + int32 ip4mask = 5; + string ip6 = 6; + int32 ip6mask = 7; + int32 netid = 8; + int32 flowid = 9; + int32 mtu = 10; +} + +message Position { + float x = 1; + float y = 2; + float z = 3; + float lat = 4; + float lon = 5; + float alt = 6; +} diff --git a/daemon/requirements.txt b/daemon/requirements.txt index ba137563..ae84c527 100644 --- a/daemon/requirements.txt +++ b/daemon/requirements.txt @@ -1,7 +1,5 @@ enum34==1.1.6 +futures==3.2.0 +grpcio==1.18.0 lxml==3.5.0 -mock==1.3.0 -pycco==0.5.1 -pytest==3.0.7 -pytest-cov==2.5.1 -pytest-runner==2.11.1 +six==1.12.0 diff --git a/daemon/scripts/core-daemon b/daemon/scripts/core-daemon old mode 100644 new mode 100755 index 34c516d3..a1c14cec --- a/daemon/scripts/core-daemon +++ b/daemon/scripts/core-daemon @@ -5,10 +5,11 @@ messages and instantiates emulated nodes and networks within the kernel. Various message handlers are defined and some support for sending messages. """ +import argparse import ConfigParser import logging -import optparse import sys +import threading import time from core import load_logging_config @@ -16,6 +17,7 @@ from core import constants from core import enumerations from core.corehandlers import CoreHandler from core.coreserver import CoreServer +from core.grpc.server import CoreGrpcServer from core.misc.utils import close_onexec load_logging_config() @@ -30,7 +32,7 @@ def banner(): logging.info("CORE daemon v.%s started %s", constants.COREDPY_VERSION, time.ctime()) -def cored(cfg, use_ovs): +def cored(cfg): """ Start the CoreServer object and enter the server loop. @@ -45,13 +47,20 @@ def cored(cfg, use_ovs): try: server = CoreServer((host, port), CoreHandler, cfg) - if use_ovs: + if cfg["ovs"] == "True": from core.netns.openvswitch import OVS_NODES server.coreemu.update_nodes(OVS_NODES) except: logging.exception("error starting main server on: %s:%s", host, port) sys.exit(1) + # initialize grpc api + if cfg["grpc"] == "True": + grpc_server = CoreGrpcServer(server.coreemu) + grpc_thread = threading.Thread(target=grpc_server.listen) + grpc_thread.daemon = True + grpc_thread.start() + close_onexec(server.fileno()) logging.info("server started, listening on: %s:%s", host, port) server.serve_forever() @@ -73,24 +82,24 @@ def get_merged_config(filename): "numthreads": "1", } - usagestr = "usage: %prog [-h] [options] [args]\n\n" + \ - "CORE daemon v.%s instantiates Linux network namespace " \ - "nodes." % constants.COREDPY_VERSION - parser = optparse.OptionParser(usage=usagestr) - parser.add_option("-f", "--configfile", dest="configfile", type="string", - help="read config from specified file; default = %s" % filename) - parser.add_option("-p", "--port", dest="port", type=int, - help="port number to listen on; default = %s" % defaults["port"]) - parser.add_option("-t", "--numthreads", dest="numthreads", type=int, - help="number of server threads; default = %s" % defaults["numthreads"]) + parser = argparse.ArgumentParser( + description="CORE daemon v.%s instantiates Linux network namespace nodes." % constants.COREDPY_VERSION) + parser.add_argument("-f", "--configfile", dest="configfile", + help="read config from specified file; default = %s" % filename) + parser.add_argument("-p", "--port", dest="port", type=int, + help="port number to listen on; default = %s" % defaults["port"]) + parser.add_argument("-n", "--numthreads", dest="numthreads", type=int, + help="number of server threads; default = %s" % defaults["numthreads"]) + parser.add_argument("--ovs", action="store_true", help="enable experimental ovs mode, default is false") + parser.add_argument("--grpc", action="store_true", help="enable grpc api, default is false") # parse command line options - options, args = parser.parse_args() + args = parser.parse_args() # read the config file - if options.configfile is not None: - filename = options.configfile - del options.configfile + if args.configfile is not None: + filename = args.configfile + del args.configfile cfg = ConfigParser.SafeConfigParser(defaults) cfg.read(filename) @@ -99,12 +108,12 @@ def get_merged_config(filename): cfg.add_section(section) # merge command line with config file - for opt in options.__dict__: - val = options.__dict__[opt] + for opt in args.__dict__: + val = args.__dict__[opt] if val is not None: - cfg.set(section, opt, val.__str__()) + cfg.set(section, opt, str(val)) - return dict(cfg.items(section)), args + return dict(cfg.items(section)) def main(): @@ -114,22 +123,14 @@ def main(): :return: nothing """ # get a configuration merged from config file and command-line arguments - cfg, args = get_merged_config("%s/core.conf" % constants.CORE_CONF_DIR) - for a in args: - logging.error("ignoring command line argument: %s", a) - + cfg = get_merged_config("%s/core.conf" % constants.CORE_CONF_DIR) banner() - # check if ovs flag was provided - use_ovs = len(sys.argv) == 2 and sys.argv[1] == "ovs" - try: - cored(cfg, use_ovs) + cored(cfg) except KeyboardInterrupt: logging.info("keyboard interrupt, stopping core daemon") - sys.exit(0) - if __name__ == "__main__": main() diff --git a/daemon/setup.py.in b/daemon/setup.py.in index efbb980d..89be649e 100644 --- a/daemon/setup.py.in +++ b/daemon/setup.py.in @@ -54,7 +54,7 @@ setup( data_files=data_files, scripts=glob.glob("scripts/*"), description="Python components of CORE", - url="http://www.nrl.navy.mil/itd/ncs/products/core", + url="https://github.com/coreemu/core", author="Boeing Research & Technology", author_email="core-dev@nrl.navy.mil", license="BSD", diff --git a/daemon/tests/conftest.py b/daemon/tests/conftest.py index d087e2e6..3bd32121 100644 --- a/daemon/tests/conftest.py +++ b/daemon/tests/conftest.py @@ -3,6 +3,8 @@ Unit test fixture module. """ import os +import threading +import time import pytest from mock.mock import MagicMock @@ -26,6 +28,8 @@ from core.enumerations import LinkTypes from core.enumerations import MessageFlags from core.enumerations import NodeTlvs from core.enumerations import NodeTypes +from core.grpc.client import InterfaceHelper +from core.grpc.server import CoreGrpcServer from core.misc import ipaddress from core.misc.ipaddress import MacAddress from core.service import ServiceManager @@ -190,7 +194,7 @@ class CoreServerTest(object): # set services for host nodes message = CoreConfMessage.create(0, [ - (ConfigTlvs.SESSION, str(self.session.session_id)), + (ConfigTlvs.SESSION, str(self.session.id)), (ConfigTlvs.OBJECT, "services"), (ConfigTlvs.TYPE, 0), (ConfigTlvs.DATA_TYPES, (10, 10, 10)), @@ -204,10 +208,23 @@ class CoreServerTest(object): self.server.server_close() +@pytest.fixture +def grpc_server(): + coremu = CoreEmu() + grpc_server = CoreGrpcServer(coremu) + thread = threading.Thread(target=grpc_server.listen) + thread.daemon = True + thread.start() + time.sleep(0.1) + yield grpc_server + coremu.shutdown() + grpc_server.server.stop(None) + + @pytest.fixture def session(): # use coreemu and create a session - coreemu = CoreEmu() + coreemu = CoreEmu(config={"emane_prefix": "/usr"}) session_fixture = coreemu.create_session() session_fixture.set_state(EventTypes.CONFIGURATION_STATE) assert os.path.exists(session_fixture.session_dir) @@ -233,6 +250,11 @@ def ip_prefixes(): return IpPrefixes(ip4_prefix="10.83.0.0/16") +@pytest.fixture(scope="module") +def interface_helper(): + return InterfaceHelper(ip4_prefix="10.83.0.0/16") + + @pytest.fixture() def cored(): # create and return server @@ -242,8 +264,6 @@ def cored(): # cleanup server.shutdown() - # - # cleanup services ServiceManager.services.clear() diff --git a/daemon/tests/test_emane.py b/daemon/tests/test_emane.py index 7720e285..1c77d8ea 100644 --- a/daemon/tests/test_emane.py +++ b/daemon/tests/test_emane.py @@ -1,6 +1,7 @@ """ Unit tests for testing CORE EMANE networks. """ +import os import pytest @@ -19,6 +20,7 @@ _EMANE_MODELS = [ EmaneCommEffectModel, EmaneTdmaModel, ] +_DIR = os.path.dirname(os.path.abspath(__file__)) class TestEmane: @@ -39,6 +41,12 @@ class TestEmane: ) emane_network.setposition(x=80, y=50) + # configure tdma + if model == EmaneTdmaModel: + session.emane.set_model_config(emane_network.objid, EmaneTdmaModel.name, { + "schedule": os.path.join(_DIR, "../examples/tdma/schedule.xml") + }) + # create nodes node_options = NodeOptions() node_options.set_position(150, 150) diff --git a/daemon/tests/test_grpc.py b/daemon/tests/test_grpc.py new file mode 100644 index 00000000..85a7910d --- /dev/null +++ b/daemon/tests/test_grpc.py @@ -0,0 +1,830 @@ +import time + +from Queue import Queue + +import grpc +import pytest + +from core.conf import ConfigShim +from core.data import EventData +from core.emane.ieee80211abg import EmaneIeee80211abgModel +from core.enumerations import NodeTypes, EventTypes, ConfigFlags, ExceptionLevels +from core.grpc import core_pb2 +from core.grpc.client import CoreGrpcClient +from core.mobility import BasicRangeModel, Ns2ScriptedMobility + + +class TestGrpc: + @pytest.mark.parametrize("session_id", [None, 6013]) + def test_create_session(self, grpc_server, session_id): + # given + client = CoreGrpcClient() + + # when + with client.context_connect(): + response = client.create_session(session_id) + + # then + assert isinstance(response.id, int) + assert isinstance(response.state, int) + session = grpc_server.coreemu.sessions.get(response.id) + assert session is not None + assert session.state == response.state + if session_id is not None: + assert response.id == session_id + assert session.id == session_id + + @pytest.mark.parametrize("session_id, expected", [ + (None, True), + (6013, False) + ]) + def test_delete_session(self, grpc_server, session_id, expected): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + if session_id is None: + session_id = session.id + + # then + with client.context_connect(): + response = client.delete_session(session_id) + + # then + assert response.result is expected + assert grpc_server.coreemu.sessions.get(session_id) is None + + def test_get_session(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + session.add_node() + session.set_state(EventTypes.DEFINITION_STATE) + + # then + with client.context_connect(): + response = client.get_session(session.id) + + # then + assert response.session.state == core_pb2.STATE_DEFINITION + assert len(response.session.nodes) == 1 + assert len(response.session.links) == 0 + + def test_get_sessions(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + with client.context_connect(): + response = client.get_sessions() + + # then + found_session = None + for current_session in response.sessions: + if current_session.id == session.id: + found_session = current_session + break + assert len(response.sessions) == 1 + assert found_session is not None + + def test_get_session_options(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + with client.context_connect(): + response = client.get_session_options(session.id) + + # then + assert len(response.groups) > 0 + + def test_get_session_location(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + with client.context_connect(): + response = client.get_session_location(session.id) + + # then + assert response.scale == 1.0 + assert response.position.x == 0 + assert response.position.y == 0 + assert response.position.z == 0 + assert response.position.lat == 0 + assert response.position.lon == 0 + assert response.position.alt == 0 + + def test_set_session_location(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + scale = 2 + xyz = (1, 1, 1) + lat_lon_alt = (1, 1, 1) + with client.context_connect(): + response = client.set_session_location( + session.id, + x=xyz[0], y=xyz[1], z=xyz[2], + lat=lat_lon_alt[0], lon=lat_lon_alt[1], alt=lat_lon_alt[2], + scale=scale + ) + + # then + assert response.result is True + assert session.location.refxyz == xyz + assert session.location.refscale == scale + assert session.location.refgeo == lat_lon_alt + + def test_set_session_options(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + option = "enablerj45" + value = "1" + with client.context_connect(): + response = client.set_session_options(session.id, {option: value}) + + # then + assert response.result is True + assert session.options.get_config(option) == value + config = session.options.get_configs() + assert len(config) > 0 + + def test_set_session_state(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + with client.context_connect(): + response = client.set_session_state(session.id, core_pb2.STATE_DEFINITION) + + # then + assert response.result is True + assert session.state == core_pb2.STATE_DEFINITION + + def test_add_node(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + with client.context_connect(): + node = core_pb2.Node() + response = client.add_node(session.id, node) + + # then + assert response.id is not None + assert session.get_object(response.id) is not None + + def test_get_node(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + + # then + with client.context_connect(): + response = client.get_node(session.id, node.objid) + + # then + assert response.node.id == node.objid + + @pytest.mark.parametrize("node_id, expected", [ + (1, True), + (2, False) + ]) + def test_edit_node(self, grpc_server, node_id, expected): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + + # then + x, y = 10, 10 + with client.context_connect(): + position = core_pb2.Position(x=x, y=y) + response = client.edit_node(session.id, node_id, position) + + # then + assert response.result is expected + if expected is True: + assert node.position.x == x + assert node.position.y == y + + @pytest.mark.parametrize("node_id, expected", [ + (1, True), + (2, False) + ]) + def test_delete_node(self, grpc_server, node_id, expected): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + + # then + with client.context_connect(): + response = client.delete_node(session.id, node_id) + + # then + assert response.result is expected + if expected is True: + with pytest.raises(KeyError): + assert session.get_object(node.objid) + + def test_get_hooks(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + file_name = "test" + file_data = "echo hello" + session.add_hook(EventTypes.RUNTIME_STATE.value, file_name, None, file_data) + + # then + with client.context_connect(): + response = client.get_hooks(session.id) + + # then + assert len(response.hooks) == 1 + hook = response.hooks[0] + assert hook.state == EventTypes.RUNTIME_STATE.value + assert hook.file == file_name + assert hook.data == file_data + + def test_add_hook(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + file_name = "test" + file_data = "echo hello" + with client.context_connect(): + response = client.add_hook(session.id, core_pb2.STATE_RUNTIME, file_name, file_data) + + # then + assert response.result is True + + def test_save_xml(self, grpc_server, tmpdir): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + tmp = tmpdir.join("text.xml") + + # then + with client.context_connect(): + client.save_xml(session.id, str(tmp)) + + # then + assert tmp.exists() + + def test_open_xml_hook(self, grpc_server, tmpdir): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + tmp = tmpdir.join("text.xml") + session.save_xml(str(tmp)) + + # then + with client.context_connect(): + response = client.open_xml(str(tmp)) + + # then + assert response.result is True + assert response.session is not None + + def test_get_node_links(self, grpc_server, ip_prefixes): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + switch = session.add_node(_type=NodeTypes.SWITCH) + node = session.add_node() + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, switch.objid, interface) + + # then + with client.context_connect(): + response = client.get_node_links(session.id, switch.objid) + + # then + assert len(response.links) == 1 + + def test_get_node_links_exception(self, grpc_server, ip_prefixes): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + switch = session.add_node(_type=NodeTypes.SWITCH) + node = session.add_node() + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, switch.objid, interface) + + # then + with pytest.raises(grpc.RpcError): + with client.context_connect(): + client.get_node_links(session.id, 3) + + def test_add_link(self, grpc_server, interface_helper): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + switch = session.add_node(_type=NodeTypes.SWITCH) + node = session.add_node() + assert len(switch.all_link_data(0)) == 0 + + # then + interface = interface_helper.create_interface(node.objid, 0) + with client.context_connect(): + response = client.add_link(session.id, node.objid, switch.objid, interface) + + # then + assert response.result is True + assert len(switch.all_link_data(0)) == 1 + + def test_add_link_exception(self, grpc_server, interface_helper): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + + # then + interface = interface_helper.create_interface(node.objid, 0) + with pytest.raises(grpc.RpcError): + with client.context_connect(): + client.add_link(session.id, 1, 3, interface) + + def test_edit_link(self, grpc_server, ip_prefixes): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + switch = session.add_node(_type=NodeTypes.SWITCH) + node = session.add_node() + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, switch.objid, interface) + options = core_pb2.LinkOptions(bandwidth=30000) + link = switch.all_link_data(0)[0] + assert options.bandwidth != link.bandwidth + + # then + with client.context_connect(): + response = client.edit_link(session.id, node.objid, switch.objid, options) + + # then + assert response.result is True + link = switch.all_link_data(0)[0] + assert options.bandwidth == link.bandwidth + + def test_delete_link(self, grpc_server, ip_prefixes): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node_one = session.add_node() + interface_one = ip_prefixes.create_interface(node_one) + node_two = session.add_node() + interface_two = ip_prefixes.create_interface(node_two) + session.add_link(node_one.objid, node_two.objid, interface_one, interface_two) + link_node = None + for node_id in session.objects: + node = session.objects[node_id] + if node.objid not in {node_one.objid, node_two.objid}: + link_node = node + break + assert len(link_node.all_link_data(0)) == 1 + + # then + with client.context_connect(): + response = client.delete_link( + session.id, node_one.objid, node_two.objid, interface_one.id, interface_two.id) + + # then + assert response.result is True + assert len(link_node.all_link_data(0)) == 0 + + def test_get_wlan_config(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) + + # then + with client.context_connect(): + response = client.get_wlan_config(session.id, wlan.objid) + + # then + assert len(response.groups) > 0 + + def test_set_wlan_config(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) + range_key = "range" + range_value = "300" + + # then + with client.context_connect(): + response = client.set_wlan_config(session.id, wlan.objid, {range_key: range_value}) + + # then + assert response.result is True + config = session.mobility.get_model_config(wlan.objid, BasicRangeModel.name) + assert config[range_key] == range_value + + def test_get_emane_config(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + with client.context_connect(): + response = client.get_emane_config(session.id) + + # then + assert len(response.groups) > 0 + + def test_set_emane_config(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + config_key = "platform_id_start" + config_value = "2" + + # then + with client.context_connect(): + response = client.set_emane_config(session.id, {config_key: config_value}) + + # then + assert response.result is True + config = session.emane.get_configs() + assert len(config) > 1 + assert config[config_key] == config_value + + def test_get_emane_model_configs(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + emane_network = session.create_emane_network( + model=EmaneIeee80211abgModel, + geo_reference=(47.57917, -122.13232, 2.00000) + ) + config_key = "platform_id_start" + config_value = "2" + session.emane.set_model_config(emane_network.objid, EmaneIeee80211abgModel.name, {config_key: config_value}) + + # then + with client.context_connect(): + response = client.get_emane_model_configs(session.id) + + # then + assert len(response.configs) == 1 + assert emane_network.objid in response.configs + + def test_set_emane_model_config(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + emane_network = session.create_emane_network( + model=EmaneIeee80211abgModel, + geo_reference=(47.57917, -122.13232, 2.00000) + ) + config_key = "bandwidth" + config_value = "900000" + + # then + with client.context_connect(): + response = client.set_emane_model_config( + session.id, emane_network.objid, EmaneIeee80211abgModel.name, {config_key: config_value}) + + # then + assert response.result is True + config = session.emane.get_model_config(emane_network.objid, EmaneIeee80211abgModel.name) + assert config[config_key] == config_value + + def test_get_emane_model_config(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + emane_network = session.create_emane_network( + model=EmaneIeee80211abgModel, + geo_reference=(47.57917, -122.13232, 2.00000) + ) + + # then + with client.context_connect(): + response = client.get_emane_model_config( + session.id, emane_network.objid, EmaneIeee80211abgModel.name) + + # then + assert len(response.groups) > 0 + + def test_get_emane_models(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + with client.context_connect(): + response = client.get_emane_models(session.id) + + # then + assert len(response.models) > 0 + + def test_get_mobility_configs(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) + session.mobility.set_model_config(wlan.objid, Ns2ScriptedMobility.name, {}) + + # then + with client.context_connect(): + response = client.get_mobility_configs(session.id) + + # then + assert len(response.configs) > 0 + assert wlan.objid in response.configs + + def test_get_mobility_config(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) + session.mobility.set_model_config(wlan.objid, Ns2ScriptedMobility.name, {}) + + # then + with client.context_connect(): + response = client.get_mobility_config(session.id, wlan.objid) + + # then + assert len(response.groups) > 0 + + def test_set_mobility_config(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) + config_key = "refresh_ms" + config_value = "60" + + # then + with client.context_connect(): + response = client.set_mobility_config(session.id, wlan.objid, {config_key: config_value}) + + # then + assert response.result is True + config = session.mobility.get_model_config(wlan.objid, Ns2ScriptedMobility.name) + assert config[config_key] == config_value + + def test_mobility_action(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) + session.mobility.set_model_config(wlan.objid, Ns2ScriptedMobility.name, {}) + session.instantiate() + + # then + with client.context_connect(): + response = client.mobility_action(session.id, wlan.objid, core_pb2.MOBILITY_STOP) + + # then + assert response.result is True + + def test_get_services(self, grpc_server): + # given + client = CoreGrpcClient() + + # then + with client.context_connect(): + response = client.get_services() + + # then + assert len(response.services) > 0 + + def test_get_service_defaults(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + + # then + with client.context_connect(): + response = client.get_service_defaults(session.id) + + # then + assert len(response.defaults) > 0 + + def test_set_service_defaults(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node_type = "test" + services = ["SSH"] + + # then + with client.context_connect(): + response = client.set_service_defaults(session.id, {node_type: services}) + + # then + assert response.result is True + assert session.services.default_services[node_type] == services + + def test_get_node_service(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + + # then + with client.context_connect(): + response = client.get_node_service(session.id, node.objid, "IPForward") + + # then + assert len(response.service.configs) > 0 + + def test_get_node_service_file(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + + # then + with client.context_connect(): + response = client.get_node_service_file(session.id, node.objid, "IPForward", "ipforward.sh") + + # then + assert response.data is not None + + def test_set_node_service(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + service_name = "IPForward" + validate = ("echo hello",) + + # then + with client.context_connect(): + response = client.set_node_service(session.id, node.objid, service_name, (), validate, ()) + + # then + assert response.result is True + service = session.services.get_service(node.objid, service_name, default_service=True) + assert service.validate == validate + + def test_set_node_service_file(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + service_name = "IPForward" + file_name = "ipforward.sh" + file_data = "echo hello" + + # then + with client.context_connect(): + response = client.set_node_service_file(session.id, node.objid, service_name, file_name, file_data) + + # then + assert response.result is True + service_file = session.services.get_service_file(node, service_name, file_name) + assert service_file.data == file_data + + def test_service_action(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + service_name = "IPForward" + + # then + with client.context_connect(): + response = client.service_action(session.id, node.objid, service_name, core_pb2.SERVICE_STOP) + + # then + assert response.result is True + + def test_node_events(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + node_data = node.data(message_type=0) + queue = Queue() + + def handle_event(event_data): + queue.put(event_data) + + # then + with client.context_connect(): + client.node_events(session.id, handle_event) + time.sleep(0.1) + session.broadcast_node(node_data) + + # then + queue.get(timeout=5) + + def test_link_events(self, grpc_server, ip_prefixes): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + wlan = session.add_node(_type=NodeTypes.WIRELESS_LAN) + node = session.add_node() + interface = ip_prefixes.create_interface(node) + session.add_link(node.objid, wlan.objid, interface) + link_data = wlan.all_link_data(0)[0] + queue = Queue() + + def handle_event(event_data): + queue.put(event_data) + + # then + with client.context_connect(): + client.link_events(session.id, handle_event) + time.sleep(0.1) + session.broadcast_link(link_data) + + # then + queue.get(timeout=5) + + def test_session_events(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + queue = Queue() + + def handle_event(event_data): + queue.put(event_data) + + # then + with client.context_connect(): + client.session_events(session.id, handle_event) + time.sleep(0.1) + event = EventData(event_type=EventTypes.RUNTIME_STATE.value, time="%s" % time.time()) + session.broadcast_event(event) + + # then + queue.get(timeout=5) + + def test_config_events(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + queue = Queue() + + def handle_event(event_data): + queue.put(event_data) + + # then + with client.context_connect(): + client.config_events(session.id, handle_event) + time.sleep(0.1) + session_config = session.options.get_configs() + config_data = ConfigShim.config_data(0, None, ConfigFlags.UPDATE.value, session.options, session_config) + session.broadcast_config(config_data) + + # then + queue.get(timeout=5) + + def test_exception_events(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + queue = Queue() + + def handle_event(event_data): + queue.put(event_data) + + # then + with client.context_connect(): + client.exception_events(session.id, handle_event) + time.sleep(0.1) + session.exception(ExceptionLevels.FATAL, "test", None, "exception message") + + # then + queue.get(timeout=5) + + def test_file_events(self, grpc_server): + # given + client = CoreGrpcClient() + session = grpc_server.coreemu.create_session() + node = session.add_node() + queue = Queue() + + def handle_event(event_data): + queue.put(event_data) + + # then + with client.context_connect(): + client.file_events(session.id, handle_event) + time.sleep(0.1) + file_data = session.services.get_service_file(node, "IPForward", "ipforward.sh") + session.broadcast_file(file_data) + + # then + queue.get(timeout=5) diff --git a/daemon/tests/test_links.py b/daemon/tests/test_links.py index 75475388..27289014 100644 --- a/daemon/tests/test_links.py +++ b/daemon/tests/test_links.py @@ -124,7 +124,6 @@ class TestLinks: session.add_link(node_one.objid, node_two.objid, interface_one, interface_two) assert node_one.netif(interface_one.id) assert node_two.netif(interface_two.id) - assert session.get_node_count() == 3 # when session.delete_link(node_one.objid, node_two.objid, interface_one.id, interface_two.id) @@ -132,7 +131,6 @@ class TestLinks: # then assert not node_one.netif(interface_one.id) assert not node_two.netif(interface_two.id) - assert session.get_node_count() == 2 def test_link_bandwidth(self, session, ip_prefixes): """ diff --git a/netns/setup.py.in b/netns/setup.py.in index ad53e41a..0866fe5c 100644 --- a/netns/setup.py.in +++ b/netns/setup.py.in @@ -36,7 +36,7 @@ setup( netns, vcmd ], - url="http://www.nrl.navy.mil/itd/ncs/products/core", + url="https://github.com/coreemu/core", author="Boeing Research & Technology", author_email="core-dev@nrl.navy.mil", license="BSD", diff --git a/ns3/corens3/obj.py b/ns3/corens3/obj.py index acb56e1c..10e7e614 100644 --- a/ns3/corens3/obj.py +++ b/ns3/corens3/obj.py @@ -360,11 +360,11 @@ class Ns3Session(Session): A Session that starts an ns-3 simulation thread. """ - def __init__(self, session_id, persistent=False, duration=600): + def __init__(self, _id, persistent=False, duration=600): self.duration = duration self.nodes = ns.network.NodeContainer() self.mobhelper = ns.mobility.MobilityHelper() - Session.__init__(self, session_id) + Session.__init__(self, _id) def run(self, vis=False): """ diff --git a/ns3/setup.py.in b/ns3/setup.py.in index 31f154f2..8335d43f 100644 --- a/ns3/setup.py.in +++ b/ns3/setup.py.in @@ -12,7 +12,7 @@ setup( ], data_files=[(_EXAMPLES_DIR, glob.glob("examples/*"))], description="Python ns-3 components of CORE", - url="http://www.nrl.navy.mil/itd/ncs/products/core", + url="https://github.com/coreemu/core", author="Boeing Research & Technology", author_email="core-dev@nrl.navy.mil", license="GPLv2",