From 5359ff4fd43da5d0a9b8aca4c0fce95528168e4a Mon Sep 17 00:00:00 2001 From: ahrenholz Date: Mon, 29 Sep 2014 19:07:25 +0000 Subject: [PATCH] added support for distributed EMANE 0.9.2 (Boeing r1884) --- daemon/core/emane/emane.py | 67 +++++++++++++++++++++++++++++++++----- daemon/core/netns/nodes.py | 2 +- daemon/core/session.py | 21 ++++++++---- 3 files changed, 73 insertions(+), 17 deletions(-) diff --git a/daemon/core/emane/emane.py b/daemon/core/emane/emane.py index 44e1be78..ff0698fa 100644 --- a/daemon/core/emane/emane.py +++ b/daemon/core/emane/emane.py @@ -107,16 +107,21 @@ class Emane(ConfigurableManager): v = self.EMANE092 return v, result.strip() - def initeventservice(self, filename=None): - ''' (Re-)initialize the EMANE Event service. The multicast group and/or - port may be configured, and can be changed via XML config file and an - environment variable pointing to that file. + def initeventservice(self, filename=None, shutdown=False): + ''' (Re-)initialize the EMANE Event service. + The multicast group and/or port may be configured. + - For versions < 0.9.1 this can be changed via XML config file + and an environment variable pointing to that file. + - For version >= 0.9.1 this is passed into the EventService + constructor. ''' if hasattr(self, 'service'): del self.service self.service = None # EMANE 0.9.1+ does not require event service XML config if self.version >= self.EMANE091: + if shutdown: + return values = self.getconfig(None, "emane", self.emane_config.getdefaultvalues())[1] group, port = self.emane_config.valueof('eventservicegroup', @@ -129,8 +134,13 @@ class Emane(ConfigurableManager): dev = self.emane_config.valueof('eventservicedevice', values) # disabled otachannel for event service # only needed for e.g. antennaprofile events xmit by models - self.service = EventService(eventchannel=(group, int(port), dev), + try: + self.service = EventService(eventchannel=(group, int(port), dev), otachannel=None) + except Exception, e: + msg = "Error instantiating EMANE event service: %s" % e + self.session.exception(coreapi.CORE_EXCP_LEVEL_ERROR, + "Emane.initeventservice()", None, msg) return True if filename is not None: tmp = os.getenv(self.EVENTCFGVAR) @@ -246,6 +256,13 @@ class Emane(ConfigurableManager): self.addobj(obj) if len(self._objs) == 0: return Emane.NOT_NEEDED + # control network bridge required for EMANE 0.9.2 + # - needs to be configured before checkdistributed() for distributed + # - needs to exist when eventservice binds to it (initeventservice) + if self.version > self.EMANE091 and self.session.master: + ctrlnet = self.session.addremovectrlnet(remove=False, + conf_reqd=False) + self.distributedctrlnet(ctrlnet) if self.checkdistributed(): # we are slave, but haven't received a platformid yet cfgval = self.getconfig(None, self.emane_config._name, @@ -427,12 +444,44 @@ class Emane(ConfigurableManager): # assume self._objslock is already held here if self.verbose: self.info("Emane.buildxml2()") - # control network bridge needs to exist when eventservice binds to it + # on master, control network bridge added earlier in startup() ctrlnet = self.session.addremovectrlnet(remove=False, conf_reqd=False) self.buildplatformxml2(ctrlnet) self.buildnemxml() self.buildeventservicexml() + def distributedctrlnet(self, ctrlnet): + ''' Distributed EMANE requires multiple control network prefixes to + be configured. This generates configuration for slave control nets + using the default list of prefixes. + ''' + session = self.session + if not session.master: + return # slave server + servers = session.broker.getserverlist() + if len(servers) < 2: + return # not distributed + prefix = session.cfg.get('controlnet') + prefix = getattr(session.options, 'controlnet', prefix) + prefixes = prefix.split() + if len(prefixes) >= len(servers): + return # normal Config messaging will distribute controlnets + # this generates a config message having controlnet prefix assignments + self.info("Setting up default controlnet prefixes for distributed " \ + "(%d configured)" % len(prefixes)) + prefixes = ctrlnet.DEFAULT_PREFIX + vals = "controlnet='%s'" % prefixes + tlvdata = "" + tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_OBJ, + "session") + tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_TYPE, 0) + tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_VALUES, vals) + rawmsg = coreapi.CoreConfMessage.pack(0, tlvdata) + msghdr = rawmsg[:coreapi.CoreMessage.hdrsiz] + msg = coreapi.CoreConfMessage(flags=0, hdr=msghdr, + data=rawmsg[coreapi.CoreMessage.hdrsiz:]) + self.session.broker.handlemsg(msg) + def xmldoc(self, doctype): ''' Returns an XML xml.minidom.Document with a DOCTYPE tag set to the provided doctype string, and an initial element having the same @@ -876,7 +925,7 @@ class Emane(ConfigurableManager): # instantiation was previously delayed by self.setup() # returning Emane.NOT_READY h = None - with session._handlerslock: + with self.session._handlerslock: for h in self.session._handlers: break self.session.instantiate(handler=h) @@ -920,7 +969,7 @@ class Emane(ConfigurableManager): if self.service is not None: self.service.breakloop() # reset the service, otherwise nextEvent won't work - self.initeventservice() + self.initeventservice(shutdown=True) if self.eventmonthread is not None: if self.version >= self.EMANE091: self.eventmonthread._Thread__stop() @@ -1260,4 +1309,4 @@ class EmaneGlobalModel(EmaneModel): _confmatrix = _confmatrix_platform + _confmatrix_nem _confgroups = "Platform Attributes:1-%d|NEM Parameters:%d-%d" % \ (len(_confmatrix_platform), len(_confmatrix_platform) + 1, - len(_confmatrix)) \ No newline at end of file + len(_confmatrix)) diff --git a/daemon/core/netns/nodes.py b/daemon/core/netns/nodes.py index 50b453b3..73edddef 100644 --- a/daemon/core/netns/nodes.py +++ b/daemon/core/netns/nodes.py @@ -20,7 +20,7 @@ from core.coreobj import PyCoreNode class CtrlNet(LxBrNet): policy = "ACCEPT" CTRLIF_IDX_BASE = 99 # base control interface index - DEFAULT_PREFIX = "172.16.0.0/24" + DEFAULT_PREFIX = "172.16.0.0/24 172.16.1.0/24 172.16.2.0/24 172.16.3.0/24 172.16.4.0/24" def __init__(self, session, objid = "ctrlnet", name = None, verbose = False, netid = 1, prefix = None, diff --git a/daemon/core/session.py b/daemon/core/session.py index f1ea1c92..8a77ea42 100644 --- a/daemon/core/session.py +++ b/daemon/core/session.py @@ -658,12 +658,14 @@ class Session(object): ''' Check if we have entered the shutdown state, when no running nodes and links remain. ''' - with self._objslock: - nc = len(self._objs) + nc = self.getnodecount() # TODO: this doesn't consider slave server node counts # wait for slave servers to enter SHUTDOWN state, then master session # can enter SHUTDOWN replies = () + if self.getcfgitembool('verbose', False): + self.info("Session %d shutdown: %d nodes remaining" % \ + (self.sessionid, nc)) if nc == 0: replies = self.setstate(state=coreapi.CORE_EVENT_SHUTDOWN_STATE, info=True, sendevent=True, returnevent=True) @@ -743,8 +745,7 @@ class Session(object): bridge to be added even if one has not been configured. ''' prefix = self.cfg.get('controlnet') - if hasattr(self.options, 'controlnet'): - prefix = self.options.controlnet + prefix = getattr(self.options, 'controlnet', prefix) if not prefix: if conf_reqd: return None # no controlnet needed @@ -790,7 +791,11 @@ class Session(object): servers.remove('localhost') prefix = None for server_prefix in prefixes: - server, p = server_prefix.split(':') + try: + server, p = server_prefix.split(':') + except ValueError: + server = "" + p = None if server == servers[0]: prefix = p break @@ -799,8 +804,11 @@ class Session(object): servers[0] self.exception(coreapi.CORE_EXCP_LEVEL_ERROR, "Session.addremovectrlnet()", None, msg) - prefix = prefixes[0].split(':', 1)[1] assign_address = False + try: + prefix = prefixes[0].split(':', 1)[1] + except IndexError: + prefix = prefixes[0] else: # with one prefix, only master gets a ctrlnet address assign_address = self.master @@ -1071,7 +1079,6 @@ class SessionConfig(ConfigurableManager, Configurable): controlnets = value.split() if len(controlnets) < 2: return # multiple controlnet prefixes do not exist - servers = self.session.broker.getserverlist() if len(servers) < 2: return # not distributed