added support for distributed EMANE 0.9.2

(Boeing r1884)
This commit is contained in:
ahrenholz 2014-09-29 19:07:25 +00:00
parent f081985433
commit 5359ff4fd4
3 changed files with 73 additions and 17 deletions

View file

@ -107,16 +107,21 @@ class Emane(ConfigurableManager):
v = self.EMANE092 v = self.EMANE092
return v, result.strip() return v, result.strip()
def initeventservice(self, filename=None): def initeventservice(self, filename=None, shutdown=False):
''' (Re-)initialize the EMANE Event service. The multicast group and/or ''' (Re-)initialize the EMANE Event service.
port may be configured, and can be changed via XML config file and an The multicast group and/or port may be configured.
environment variable pointing to that file. - For versions < 0.9.1 this can be changed via XML config file
and an environment variable pointing to that file.
- For version >= 0.9.1 this is passed into the EventService
constructor.
''' '''
if hasattr(self, 'service'): if hasattr(self, 'service'):
del self.service del self.service
self.service = None self.service = None
# EMANE 0.9.1+ does not require event service XML config # EMANE 0.9.1+ does not require event service XML config
if self.version >= self.EMANE091: if self.version >= self.EMANE091:
if shutdown:
return
values = self.getconfig(None, "emane", values = self.getconfig(None, "emane",
self.emane_config.getdefaultvalues())[1] self.emane_config.getdefaultvalues())[1]
group, port = self.emane_config.valueof('eventservicegroup', group, port = self.emane_config.valueof('eventservicegroup',
@ -129,8 +134,13 @@ class Emane(ConfigurableManager):
dev = self.emane_config.valueof('eventservicedevice', values) dev = self.emane_config.valueof('eventservicedevice', values)
# disabled otachannel for event service # disabled otachannel for event service
# only needed for e.g. antennaprofile events xmit by models # only needed for e.g. antennaprofile events xmit by models
try:
self.service = EventService(eventchannel=(group, int(port), dev), self.service = EventService(eventchannel=(group, int(port), dev),
otachannel=None) otachannel=None)
except Exception, e:
msg = "Error instantiating EMANE event service: %s" % e
self.session.exception(coreapi.CORE_EXCP_LEVEL_ERROR,
"Emane.initeventservice()", None, msg)
return True return True
if filename is not None: if filename is not None:
tmp = os.getenv(self.EVENTCFGVAR) tmp = os.getenv(self.EVENTCFGVAR)
@ -246,6 +256,13 @@ class Emane(ConfigurableManager):
self.addobj(obj) self.addobj(obj)
if len(self._objs) == 0: if len(self._objs) == 0:
return Emane.NOT_NEEDED return Emane.NOT_NEEDED
# control network bridge required for EMANE 0.9.2
# - needs to be configured before checkdistributed() for distributed
# - needs to exist when eventservice binds to it (initeventservice)
if self.version > self.EMANE091 and self.session.master:
ctrlnet = self.session.addremovectrlnet(remove=False,
conf_reqd=False)
self.distributedctrlnet(ctrlnet)
if self.checkdistributed(): if self.checkdistributed():
# we are slave, but haven't received a platformid yet # we are slave, but haven't received a platformid yet
cfgval = self.getconfig(None, self.emane_config._name, cfgval = self.getconfig(None, self.emane_config._name,
@ -427,12 +444,44 @@ class Emane(ConfigurableManager):
# assume self._objslock is already held here # assume self._objslock is already held here
if self.verbose: if self.verbose:
self.info("Emane.buildxml2()") self.info("Emane.buildxml2()")
# control network bridge needs to exist when eventservice binds to it # on master, control network bridge added earlier in startup()
ctrlnet = self.session.addremovectrlnet(remove=False, conf_reqd=False) ctrlnet = self.session.addremovectrlnet(remove=False, conf_reqd=False)
self.buildplatformxml2(ctrlnet) self.buildplatformxml2(ctrlnet)
self.buildnemxml() self.buildnemxml()
self.buildeventservicexml() self.buildeventservicexml()
def distributedctrlnet(self, ctrlnet):
''' Distributed EMANE requires multiple control network prefixes to
be configured. This generates configuration for slave control nets
using the default list of prefixes.
'''
session = self.session
if not session.master:
return # slave server
servers = session.broker.getserverlist()
if len(servers) < 2:
return # not distributed
prefix = session.cfg.get('controlnet')
prefix = getattr(session.options, 'controlnet', prefix)
prefixes = prefix.split()
if len(prefixes) >= len(servers):
return # normal Config messaging will distribute controlnets
# this generates a config message having controlnet prefix assignments
self.info("Setting up default controlnet prefixes for distributed " \
"(%d configured)" % len(prefixes))
prefixes = ctrlnet.DEFAULT_PREFIX
vals = "controlnet='%s'" % prefixes
tlvdata = ""
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_OBJ,
"session")
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_TYPE, 0)
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_VALUES, vals)
rawmsg = coreapi.CoreConfMessage.pack(0, tlvdata)
msghdr = rawmsg[:coreapi.CoreMessage.hdrsiz]
msg = coreapi.CoreConfMessage(flags=0, hdr=msghdr,
data=rawmsg[coreapi.CoreMessage.hdrsiz:])
self.session.broker.handlemsg(msg)
def xmldoc(self, doctype): def xmldoc(self, doctype):
''' Returns an XML xml.minidom.Document with a DOCTYPE tag set to the ''' Returns an XML xml.minidom.Document with a DOCTYPE tag set to the
provided doctype string, and an initial element having the same provided doctype string, and an initial element having the same
@ -876,7 +925,7 @@ class Emane(ConfigurableManager):
# instantiation was previously delayed by self.setup() # instantiation was previously delayed by self.setup()
# returning Emane.NOT_READY # returning Emane.NOT_READY
h = None h = None
with session._handlerslock: with self.session._handlerslock:
for h in self.session._handlers: for h in self.session._handlers:
break break
self.session.instantiate(handler=h) self.session.instantiate(handler=h)
@ -920,7 +969,7 @@ class Emane(ConfigurableManager):
if self.service is not None: if self.service is not None:
self.service.breakloop() self.service.breakloop()
# reset the service, otherwise nextEvent won't work # reset the service, otherwise nextEvent won't work
self.initeventservice() self.initeventservice(shutdown=True)
if self.eventmonthread is not None: if self.eventmonthread is not None:
if self.version >= self.EMANE091: if self.version >= self.EMANE091:
self.eventmonthread._Thread__stop() self.eventmonthread._Thread__stop()

View file

@ -20,7 +20,7 @@ from core.coreobj import PyCoreNode
class CtrlNet(LxBrNet): class CtrlNet(LxBrNet):
policy = "ACCEPT" policy = "ACCEPT"
CTRLIF_IDX_BASE = 99 # base control interface index CTRLIF_IDX_BASE = 99 # base control interface index
DEFAULT_PREFIX = "172.16.0.0/24" DEFAULT_PREFIX = "172.16.0.0/24 172.16.1.0/24 172.16.2.0/24 172.16.3.0/24 172.16.4.0/24"
def __init__(self, session, objid = "ctrlnet", name = None, def __init__(self, session, objid = "ctrlnet", name = None,
verbose = False, netid = 1, prefix = None, verbose = False, netid = 1, prefix = None,

View file

@ -658,12 +658,14 @@ class Session(object):
''' Check if we have entered the shutdown state, when no running nodes ''' Check if we have entered the shutdown state, when no running nodes
and links remain. and links remain.
''' '''
with self._objslock: nc = self.getnodecount()
nc = len(self._objs)
# TODO: this doesn't consider slave server node counts # TODO: this doesn't consider slave server node counts
# wait for slave servers to enter SHUTDOWN state, then master session # wait for slave servers to enter SHUTDOWN state, then master session
# can enter SHUTDOWN # can enter SHUTDOWN
replies = () replies = ()
if self.getcfgitembool('verbose', False):
self.info("Session %d shutdown: %d nodes remaining" % \
(self.sessionid, nc))
if nc == 0: if nc == 0:
replies = self.setstate(state=coreapi.CORE_EVENT_SHUTDOWN_STATE, replies = self.setstate(state=coreapi.CORE_EVENT_SHUTDOWN_STATE,
info=True, sendevent=True, returnevent=True) info=True, sendevent=True, returnevent=True)
@ -743,8 +745,7 @@ class Session(object):
bridge to be added even if one has not been configured. bridge to be added even if one has not been configured.
''' '''
prefix = self.cfg.get('controlnet') prefix = self.cfg.get('controlnet')
if hasattr(self.options, 'controlnet'): prefix = getattr(self.options, 'controlnet', prefix)
prefix = self.options.controlnet
if not prefix: if not prefix:
if conf_reqd: if conf_reqd:
return None # no controlnet needed return None # no controlnet needed
@ -790,7 +791,11 @@ class Session(object):
servers.remove('localhost') servers.remove('localhost')
prefix = None prefix = None
for server_prefix in prefixes: for server_prefix in prefixes:
try:
server, p = server_prefix.split(':') server, p = server_prefix.split(':')
except ValueError:
server = ""
p = None
if server == servers[0]: if server == servers[0]:
prefix = p prefix = p
break break
@ -799,8 +804,11 @@ class Session(object):
servers[0] servers[0]
self.exception(coreapi.CORE_EXCP_LEVEL_ERROR, self.exception(coreapi.CORE_EXCP_LEVEL_ERROR,
"Session.addremovectrlnet()", None, msg) "Session.addremovectrlnet()", None, msg)
prefix = prefixes[0].split(':', 1)[1]
assign_address = False assign_address = False
try:
prefix = prefixes[0].split(':', 1)[1]
except IndexError:
prefix = prefixes[0]
else: else:
# with one prefix, only master gets a ctrlnet address # with one prefix, only master gets a ctrlnet address
assign_address = self.master assign_address = self.master
@ -1071,7 +1079,6 @@ class SessionConfig(ConfigurableManager, Configurable):
controlnets = value.split() controlnets = value.split()
if len(controlnets) < 2: if len(controlnets) < 2:
return # multiple controlnet prefixes do not exist return # multiple controlnet prefixes do not exist
servers = self.session.broker.getserverlist() servers = self.session.broker.getserverlist()
if len(servers) < 2: if len(servers) < 2:
return # not distributed return # not distributed