core-extra/daemon/core/session.py
ahrenholz dba5f31b8d (Boeing r1766)
fix bug where TunnelNodes not included in boot count, session doesn't enter
 the RUNTIME state
2013-09-05 17:32:50 +00:00

1117 lines
43 KiB
Python

#
# CORE
# Copyright (c)2010-2013 the Boeing Company.
# See the LICENSE file included in this distribution.
#
# authors: Tom Goff <thomas.goff@boeing.com>
# Jeff Ahrenholz <jeffrey.m.ahrenholz@boeing.com>
#
'''
session.py: defines the Session class used by the core-daemon daemon program
that manages a CORE session.
'''
import os, sys, tempfile, shutil, shlex, atexit, gc, pwd
import threading, time, random
from core.api import coreapi
if os.uname()[0] == "Linux":
from core.netns import nodes
from core.netns.vnet import GreTapBridge
elif os.uname()[0] == "FreeBSD":
from core.bsd import nodes
from core.emane import emane
from core.misc.utils import check_call, mutedetach, readfileintodict, \
filemunge, filedemunge
from core.conf import ConfigurableManager, Configurable
from core.location import CoreLocation
from core.service import CoreServices
from core.broker import CoreBroker
from core.mobility import MobilityManager
from core.sdt import Sdt
from core.misc.ipaddr import MacAddr
from core.misc.event import EventLoop
from core.constants import *
from core.xen import xenconfig
class Session(object):
# sessions that get automatically shutdown when the process
# terminates normally
__sessions = set()
''' CORE session manager.
'''
def __init__(self, sessionid = None, cfg = {}, server = None,
persistent = False):
if sessionid is None:
# try to keep this short since it's used to construct
# network interface names
pid = os.getpid()
sessionid = ((pid >> 16) ^
(pid & ((1 << 16) - 1)))
sessionid ^= ((id(self) >> 16) ^ (id(self) & ((1 << 16) - 1)))
sessionid &= 0xffff
self.sessionid = sessionid
self.sessiondir = os.path.join(tempfile.gettempdir(),
"pycore.%s" % self.sessionid)
os.mkdir(self.sessiondir)
self.name = None
self.filename = None
self.thumbnail = None
self.user = None
self.node_count = None
self._time = time.time()
self.evq = EventLoop()
# dict of objects: all nodes and nets
self._objs = {}
self._objslock = threading.Lock()
# dict of configurable objects
self._confobjs = {}
self._confobjslock = threading.Lock()
self._handlers = set()
self._handlerslock = threading.Lock()
self._hooks = {}
self.setstate(state=coreapi.CORE_EVENT_DEFINITION_STATE,
info=False, sendevent=False)
# dict of configuration items from /etc/core/core.conf config file
self.cfg = cfg
self.server = server
if not persistent:
self.addsession(self)
self.master = False
self.broker = CoreBroker(session=self, verbose=True)
self.location = CoreLocation(self)
self.mobility = MobilityManager(self)
self.services = CoreServices(self)
self.emane = emane.Emane(self)
self.xen = xenconfig.XenConfigManager(self)
self.sdt = Sdt(self)
# future parameters set by the GUI may go here
self.options = SessionConfig(self)
self.metadata = SessionMetaData(self)
@classmethod
def addsession(cls, session):
cls.__sessions.add(session)
@classmethod
def delsession(cls, session):
try:
cls.__sessions.remove(session)
except KeyError:
pass
@classmethod
def atexit(cls):
while cls.__sessions:
s = cls.__sessions.pop()
print >> sys.stderr, "WARNING: automatically shutting down " \
"non-persistent session %s" % s.sessionid
s.shutdown()
def __del__(self):
# note: there is no guarantee this will ever run
self.shutdown()
def shutdown(self):
''' Shut down all emulation objects and remove the session directory.
'''
self.emane.shutdown()
self.broker.shutdown()
self.sdt.shutdown()
self.delobjs()
preserve = False
if hasattr(self.options, 'preservedir'):
if self.options.preservedir == '1':
preserve = True
if not preserve:
shutil.rmtree(self.sessiondir, ignore_errors = True)
if self.server:
self.server.delsession(self)
self.delsession(self)
def isconnected(self):
''' Returns true if this session has a request handler.
'''
with self._handlerslock:
if len(self._handlers) == 0:
return False
else:
return True
def connect(self, handler):
''' Set the request handler for this session, making it connected.
'''
# the master flag will only be set after a GUI has connected with the
# handler, e.g. not during normal startup
if handler.master is True:
self.master = True
with self._handlerslock:
self._handlers.add(handler)
def disconnect(self, handler):
''' Disconnect a request handler from this session. Shutdown this
session if there is no running emulation.
'''
with self._handlerslock:
try:
self._handlers.remove(handler)
except KeyError:
raise ValueError, \
"Handler %s not associated with this session" % handler
num_handlers = len(self._handlers)
if num_handlers == 0:
# shut down this session unless we are instantiating, running,
# or collecting final data
if self.getstate() < coreapi.CORE_EVENT_INSTANTIATION_STATE or \
self.getstate() > coreapi.CORE_EVENT_DATACOLLECT_STATE:
self.shutdown()
def broadcast(self, src, msg):
''' Send Node and Link CORE API messages to all handlers connected to this session.
'''
self._handlerslock.acquire()
for handler in self._handlers:
if handler == src:
continue
if isinstance(msg, coreapi.CoreNodeMessage) or \
isinstance(msg, coreapi.CoreLinkMessage):
try:
handler.sendall(msg.rawmsg)
except Exception, e:
self.warn("sendall() error: %s" % e)
self._handlerslock.release()
def broadcastraw(self, src, data):
''' Broadcast raw data to all handlers except src.
'''
self._handlerslock.acquire()
for handler in self._handlers:
if handler == src:
continue
try:
handler.sendall(data)
except Exception, e:
self.warn("sendall() error: %s" % e)
self._handlerslock.release()
def gethandler(self):
''' Get one of the connected handlers, preferrably the master.
'''
with self._handlerslock:
if len(self._handlers) == 0:
return None
for handler in self._handlers:
if handler.master:
return handler
for handler in self._handlers:
return handler
def setstate(self, state, info = False, sendevent = False,
returnevent = False):
''' Set the session state. When info is true, log the state change
event using the session handler's info method. When sendevent is
true, generate a CORE API Event Message and send to the connected
entity.
'''
self._time = time.time()
self._state = state
replies = []
if not self.isconnected():
return replies
if info:
statename = coreapi.state_name(state)
self._handlerslock.acquire()
for handler in self._handlers:
handler.info("SESSION %s STATE %d: %s at %s" % \
(self.sessionid, state, statename, time.ctime()))
self._handlerslock.release()
self.writestate(state)
self.runhook(state)
if sendevent:
tlvdata = ""
tlvdata += coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE,
state)
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
# send Event Message to connected handlers (e.g. GUI)
try:
if returnevent:
replies.append(msg)
else:
self.broadcastraw(None, msg)
except Exception, e:
self.warn("Error sending Event Message: %s" % e)
# also inform slave servers
tmp = self.broker.handlerawmsg(msg)
return replies
def getstate(self):
''' Retrieve the current state of the session.
'''
return self._state
def writestate(self, state):
''' Write the current state to a state file in the session dir.
'''
try:
f = open(os.path.join(self.sessiondir, "state"), "w")
f.write("%d %s\n" % (state, coreapi.state_name(state)))
f.close()
except Exception, e:
self.warn("Error writing state file: %s" % e)
def runhook(self, state, hooks=None):
''' Run hook scripts upon changing states.
If hooks is not specified, run all hooks in the given state.
'''
if state not in self._hooks:
return
if hooks is None:
hooks = self._hooks[state]
for (filename, data) in hooks:
try:
f = open(os.path.join(self.sessiondir, filename), "w")
f.write(data)
f.close()
except Exception, e:
self.warn("Error writing hook '%s': %s" % (filename, e))
self.info("Running hook %s for state %s" % (filename, state))
try:
check_call(["/bin/sh", filename], cwd=self.sessiondir,
env=self.getenviron())
except Exception, e:
self.warn("Error running hook '%s' for state %s: %s" %
(filename, state, e))
def sethook(self, type, filename, srcname, data):
''' Store a hook from a received File Message.
'''
if srcname is not None:
raise NotImplementedError
(hookid, state) = type.split(':')[:2]
if not state.isdigit():
self.warn("Error setting hook having state '%s'" % state)
return
state = int(state)
hook = (filename, data)
if state not in self._hooks:
self._hooks[state] = [hook,]
else:
self._hooks[state] += hook
# immediately run a hook if it is in the current state
# (this allows hooks in the definition and configuration states)
if self.getstate() == state:
self.runhook(state, hooks = [hook,])
def delhooks(self):
''' Clear the hook scripts dict.
'''
self._hooks = {}
def getenviron(self, state=True):
''' Get an environment suitable for a subprocess.Popen call.
This is the current process environment with some session-specific
variables.
'''
env = os.environ.copy()
env['SESSION'] = "%s" % self.sessionid
env['SESSION_DIR'] = "%s" % self.sessiondir
env['SESSION_NAME'] = "%s" % self.name
env['SESSION_FILENAME'] = "%s" % self.filename
env['SESSION_USER'] = "%s" % self.user
env['SESSION_NODE_COUNT'] = "%s" % self.node_count
if state:
env['SESSION_STATE'] = "%s" % self.getstate()
try:
readfileintodict(os.path.join(CORE_CONF_DIR, "environment"), env)
except IOError:
pass
if self.user:
try:
readfileintodict(os.path.join('/home', self.user, ".core",
"environment"), env)
except IOError:
pass
return env
def setthumbnail(self, thumbfile):
''' Set the thumbnail filename. Move files from /tmp to session dir.
'''
if not os.path.exists(thumbfile):
self.thumbnail = None
return
dstfile = os.path.join(self.sessiondir, os.path.basename(thumbfile))
shutil.move(thumbfile, dstfile)
#print "thumbnail: %s -> %s" % (thumbfile, dstfile)
self.thumbnail = dstfile
def setuser(self, user):
''' Set the username for this session. Update the permissions of the
session dir to allow the user write access.
'''
if user is not None:
try:
uid = pwd.getpwnam(user).pw_uid
gid = os.stat(self.sessiondir).st_gid
os.chown(self.sessiondir, uid, gid)
except Exception, e:
self.warn("Failed to set permission on %s: %s" % (self.sessiondir, e))
self.user = user
def objs(self):
''' Return iterator over the emulation object dictionary.
'''
return self._objs.itervalues()
def getobjid(self):
''' Return a unique, random object id.
'''
self._objslock.acquire()
while True:
id = random.randint(1, 0xFFFF)
if id not in self._objs:
break
self._objslock.release()
return id
def addobj(self, cls, *clsargs, **clskwds):
''' Add an emulation object.
'''
obj = cls(self, *clsargs, **clskwds)
self._objslock.acquire()
if obj.objid in self._objs:
self._objslock.release()
obj.shutdown()
raise KeyError, "non-unique object id %s for %s" % (obj.objid, obj)
self._objs[obj.objid] = obj
self._objslock.release()
return obj
def obj(self, objid):
''' Get an emulation object.
'''
if objid not in self._objs:
raise KeyError, "unknown object id %s" % (objid)
return self._objs[objid]
def objbyname(self, name):
''' Get an emulation object using its name attribute.
'''
with self._objslock:
for obj in self.objs():
if hasattr(obj, "name") and obj.name == name:
return obj
raise KeyError, "unknown object with name %s" % (name)
def delobj(self, objid):
''' Remove an emulation object.
'''
self._objslock.acquire()
try:
o = self._objs.pop(objid)
except KeyError:
o = None
self._objslock.release()
if o:
o.shutdown()
del o
gc.collect()
# print "gc count:", gc.get_count()
# for o in gc.get_objects():
# if isinstance(o, PyCoreObj):
# print "XXX XXX XXX PyCoreObj:", o
# for r in gc.get_referrers(o):
# print "XXX XXX XXX referrer:", gc.get_referrers(o)
def delobjs(self):
''' Clear the _objs dictionary, and call each obj.shutdown() routine.
'''
self._objslock.acquire()
while self._objs:
k, o = self._objs.popitem()
o.shutdown()
self._objslock.release()
def writeobjs(self):
''' Write objects to a 'nodes' file in the session dir.
The 'nodes' file lists:
number, name, api-type, class-type
'''
try:
f = open(os.path.join(self.sessiondir, "nodes"), "w")
with self._objslock:
for objid in sorted(self._objs.keys()):
o = self._objs[objid]
f.write("%s %s %s %s\n" % (objid, o.name, o.apitype, type(o)))
f.close()
except Exception, e:
self.warn("Error writing nodes file: %s" % e)
def addconfobj(self, objname, type, callback):
''' Objects can register configuration objects that are included in
the Register Message and may be configured via the Configure
Message. The callback is invoked when receiving a Configure Message.
'''
if type not in coreapi.reg_tlvs:
raise Exception, "invalid configuration object type"
self._confobjslock.acquire()
self._confobjs[objname] = (type, callback)
self._confobjslock.release()
def confobj(self, objname, session, msg):
''' Invoke the callback for an object upon receipt of a Configure
Message for that object. A no-op if the object doesn't exist.
'''
replies = []
self._confobjslock.acquire()
if objname == "all":
for objname in self._confobjs:
(type, callback) = self._confobjs[objname]
reply = callback(session, msg)
if reply is not None:
replies.append(reply)
self._confobjslock.release()
return replies
if objname in self._confobjs:
(type, callback) = self._confobjs[objname]
self._confobjslock.release()
reply = callback(session, msg)
if reply is not None:
replies.append(reply)
return replies
else:
self.info("session object doesn't own model '%s', ignoring" % \
objname)
self._confobjslock.release()
return replies
def confobjs_to_tlvs(self):
''' Turn the configuration objects into a list of Register Message TLVs.
'''
tlvdata = ""
self._confobjslock.acquire()
for objname in self._confobjs:
(type, callback) = self._confobjs[objname]
# type must be in coreapi.reg_tlvs
tlvdata += coreapi.CoreRegTlv.pack(type, objname)
self._confobjslock.release()
return tlvdata
def info(self, msg):
''' Utility method for writing output to stdout.
'''
print msg
sys.stdout.flush()
def warn(self, msg):
''' Utility method for writing output to stderr.
'''
print >> sys.stderr, msg
sys.stderr.flush()
def dumpsession(self):
''' Debug print this session.
'''
self.info("session id=%s name=%s state=%s connected=%s" % \
(self.sessionid, self.name, self._state, self.isconnected()))
num = len(self._objs)
self.info(" file=%s thumb=%s nc=%s/%s" % \
(self.filename, self.thumbnail, self.node_count, num))
def exception(self, level, source, objid, text):
''' Generate an Exception Message
'''
vals = (objid, str(self.sessionid), level, source, time.ctime(), text)
types = ("NODE", "SESSION", "LEVEL", "SOURCE", "DATE", "TEXT")
tlvdata = ""
for (t,v) in zip(types, vals):
if v is not None:
tlvdata += coreapi.CoreExceptionTlv.pack(
eval("coreapi.CORE_TLV_EXCP_%s" % t), v)
msg = coreapi.CoreExceptionMessage.pack(0, tlvdata)
# send Exception Message to connected handlers (e.g. GUI)
self.broadcastraw(None, msg)
def getcfgitem(self, cfgname):
''' Return an entry from the configuration dictionary that comes from
command-line arguments and/or the core.conf config file.
'''
if cfgname not in self.cfg:
return None
else:
return self.cfg[cfgname]
def getcfgitembool(self, cfgname, defaultifnone = None):
''' Return a boolean entry from the configuration dictionary, may
return None if undefined.
'''
item = self.getcfgitem(cfgname)
if item is None:
return defaultifnone
return bool(item.lower() == "true")
def getcfgitemint(self, cfgname, defaultifnone = None):
''' Return an integer entry from the configuration dictionary, may
return None if undefined.
'''
item = self.getcfgitem(cfgname)
if item is None:
return defaultifnone
return int(item)
def instantiate(self, handler=None):
''' We have entered the instantiation state, invoke startup methods
of various managers and boot the nodes. Validate nodes and check
for transition to the runtime state.
'''
self.writeobjs()
# controlnet may be needed by some EMANE models
self.addremovectrlif(node=None, remove=False)
if self.emane.startup() == self.emane.NOT_READY:
return # instantiate() will be invoked again upon Emane.configure()
self.broker.startup()
self.mobility.startup()
# boot the services on each node
self.bootnodes(handler)
# allow time for processes to start
time.sleep(0.125)
self.validatenodes()
self.emane.poststartup()
# assume either all nodes have booted already, or there are some
# nodes on slave servers that will be booted and those servers will
# send a node status response message
self.checkruntime()
def checkruntime(self):
''' Check if we have entered the runtime state, that all nodes have been
started and the emulation is running. Start the event loop once we
have entered runtime (time=0).
'''
# this is called from instantiate() after receiving an event message
# for the instantiation state, and from the broker when distributed
# nodes have been started
if self.node_count is None:
return
if self.getstate() == coreapi.CORE_EVENT_RUNTIME_STATE:
return
session_node_count = int(self.node_count)
nc = 0
with self._objslock:
for obj in self.objs():
# these networks may be added by the daemon and are not
# considered in the GUI's node count
if isinstance(obj, (nodes.PtpNet, nodes.CtrlNet)):
continue
# on Linux, GreTapBridges are auto-created, not part of GUI's
# node count
if 'GreTapBridge' in globals():
if isinstance(obj, GreTapBridge) and \
not isinstance(obj, nodes.TunnelNode):
continue
nc += 1
# count booted nodes not emulated on this server
# TODO: let slave server determine RUNTIME and wait for Event Message
# broker.getbootocunt() counts all CoreNodes from status reponse
# messages, plus any remote WLANs; remote EMANE, hub, switch, etc.
# are already counted in self._objs
nc += self.broker.getbootcount()
self.info("Checking for runtime with %d of %d session nodes" % \
(nc, session_node_count))
if nc < session_node_count:
return # do not have information on all nodes yet
# information on all nodes has been received and they have been started
# enter the runtime state
# TODO: more sophisticated checks to verify that all nodes and networks
# are running
state = coreapi.CORE_EVENT_RUNTIME_STATE
self.evq.run()
self.setstate(state, info=True, sendevent=True)
def datacollect(self):
''' Tear down a running session. Stop the event loop and any running
nodes, and perform clean-up.
'''
self.evq.stop()
with self._objslock:
for obj in self.objs():
if isinstance(obj, nodes.PyCoreNode):
self.services.stopnodeservices(obj)
self.emane.shutdown()
self.updatectrlifhosts(remove=True)
self.addremovectrlif(node=None, remove=True)
# self.checkshutdown() is currently invoked from node delete handler
def checkshutdown(self):
''' Check if we have entered the shutdown state, when no running nodes
and links remain.
'''
with self._objslock:
nc = len(self._objs)
# TODO: this doesn't consider slave server node counts
# wait for slave servers to enter SHUTDOWN state, then master session
# can enter SHUTDOWN
replies = ()
if nc == 0:
replies = self.setstate(state=coreapi.CORE_EVENT_SHUTDOWN_STATE,
info=True, sendevent=True, returnevent=True)
self.sdt.shutdown()
return replies
def setmaster(self, handler):
''' Look for the specified handler and set our master flag
appropriately. Returns True if we are connected to the given
handler.
'''
with self._handlerslock:
for h in self._handlers:
if h != handler:
continue
self.master = h.master
return True
return False
def shortsessionid(self):
''' Return a shorter version of the session ID, appropriate for
interface names, where length may be limited.
'''
return (self.sessionid >> 8) ^ (self.sessionid & ((1 << 8) - 1))
def bootnodes(self, handler):
''' Invoke the boot() procedure for all nodes and send back node
messages to the GUI for node messages that had the status
request flag.
'''
#self.addremovectrlif(node=None, remove=False)
with self._objslock:
for n in self.objs():
if not isinstance(n, nodes.PyCoreNode):
continue
if isinstance(n, nodes.RJ45Node):
continue
# add a control interface if configured
self.addremovectrlif(node=n, remove=False)
n.boot()
nodenum = n.objid
if handler is None:
continue
if nodenum in handler.nodestatusreq:
tlvdata = ""
tlvdata += coreapi.CoreNodeTlv.pack(coreapi.CORE_TLV_NODE_NUMBER,
nodenum)
tlvdata += coreapi.CoreNodeTlv.pack(coreapi.CORE_TLV_NODE_EMUID,
n.objid)
reply = coreapi.CoreNodeMessage.pack(coreapi.CORE_API_ADD_FLAG \
| coreapi.CORE_API_LOC_FLAG,
tlvdata)
try:
handler.request.sendall(reply)
except Exception, e:
self.warn("sendall() error: %s" % e)
del handler.nodestatusreq[nodenum]
self.updatectrlifhosts()
def validatenodes(self):
with self._objslock:
for n in self.objs():
# TODO: this can be extended to validate everything
# such as vnoded process, bridges, etc.
if not isinstance(n, nodes.PyCoreNode):
continue
if isinstance(n, nodes.RJ45Node):
continue
n.validate()
def addremovectrlnet(self, remove=False):
''' Create a control network bridge as necessary.
When the remove flag is True, remove the bridge that connects control
interfaces.
'''
prefix = None
try:
if self.cfg['controlnet']:
prefix = self.cfg['controlnet']
except KeyError:
pass
if hasattr(self.options, 'controlnet'):
prefix = self.options.controlnet
if not prefix:
return None # no controlnet needed
# return any existing controlnet bridge
id = "ctrlnet"
try:
ctrlnet = self.obj(id)
if remove:
self.delobj(ctrlnet.objid)
return None
return ctrlnet
except KeyError:
if remove:
return None
# build a new controlnet bridge
updown_script = None
try:
if self.cfg['controlnet_updown_script']:
updown_script = self.cfg['controlnet_updown_script']
except KeyError:
pass
prefixes = prefix.split()
if len(prefixes) > 1:
assign_address = True
if self.master:
try:
prefix = prefixes[0].split(':', 1)[1]
except IndexError:
prefix = prefixes[0] # possibly only one server
else:
# slave servers have their name and localhost in the serverlist
servers = self.broker.getserverlist()
servers.remove('localhost')
prefix = None
for server_prefix in prefixes:
server, p = server_prefix.split(':')
if server == servers[0]:
prefix = p
break
if not prefix:
msg = "Control network prefix not found for server '%s'" % \
servers[0]
self.exception(coreapi.CORE_EXCP_LEVEL_ERROR,
"Session.addremovectrlnet()", None, msg)
prefix = prefixes[0].split(':', 1)[1]
assign_address = False
else:
# with one prefix, only master gets a ctrlnet address
assign_address = self.master
ctrlnet = self.addobj(cls=nodes.CtrlNet, objid=id, prefix=prefix,
assign_address=assign_address,
updown_script=updown_script)
# tunnels between controlnets will be built with Broker.addnettunnels()
self.broker.addnet(id)
for server in self.broker.getserverlist():
self.broker.addnodemap(server, id)
return ctrlnet
def addremovectrlif(self, node, remove=False):
''' Add a control interface to a node when a 'controlnet' prefix is
listed in the config file or session options. Uses
addremovectrlnet() to build or remove the control bridge.
'''
ctrlnet = self.addremovectrlnet(remove)
if ctrlnet is None:
return
if node is None:
return
ctrlip = node.objid
try:
addrlist = ["%s/%s" % (ctrlnet.prefix.addr(ctrlip),
ctrlnet.prefix.prefixlen)]
except ValueError:
msg = "Control interface not added to node %s. " % node.objid
msg += "Invalid control network prefix (%s). " % ctrlnet.prefix
msg += "A longer prefix length may be required for this many nodes."
node.exception(coreapi.CORE_EXCP_LEVEL_ERROR,
"Session.addremovectrlif()", msg)
return
ifi = node.newnetif(net = ctrlnet, ifindex = ctrlnet.CTRLIF_IDX_BASE,
ifname = "ctrl0", hwaddr = MacAddr.random(),
addrlist = addrlist)
node.netif(ifi).control = True
def updatectrlifhosts(self, remove=False):
''' Add the IP addresses of control interfaces to the /etc/hosts file.
'''
if not self.getcfgitembool('update_etc_hosts', False):
return
id = "ctrlnet"
try:
ctrlnet = self.obj(id)
except KeyError:
return
header = "CORE session %s host entries" % self.sessionid
if remove:
if self.getcfgitembool('verbose', False):
self.info("Removing /etc/hosts file entries.")
filedemunge('/etc/hosts', header)
return
entries = []
for ifc in ctrlnet.netifs():
name = ifc.node.name
for addr in ifc.addrlist:
entries.append("%s %s" % (addr.split('/')[0], ifc.node.name))
if self.getcfgitembool('verbose', False):
self.info("Adding %d /etc/hosts file entries." % len(entries))
filemunge('/etc/hosts', header, '\n'.join(entries) + '\n')
def runtime(self):
''' Return the current time we have been in the runtime state, or zero
if not in runtime.
'''
if self.getstate() == coreapi.CORE_EVENT_RUNTIME_STATE:
return time.time() - self._time
else:
return 0.0
def addevent(self, etime, node=None, name=None, data=None):
''' Add an event to the event queue, with a start time relative to the
start of the runtime state.
'''
etime = float(etime)
runtime = self.runtime()
if runtime > 0.0:
if time <= runtime:
self.warn("Could not schedule past event for time %s " \
"(run time is now %s)" % (time, runtime))
return
etime = etime - runtime
func = self.runevent
self.evq.add_event(etime, func, node=node, name=name, data=data)
if name is None:
name = ""
self.info("scheduled event %s at time %s data=%s" % \
(name, etime + runtime, data))
def runevent(self, node=None, name=None, data=None):
''' Run a scheduled event, executing commands in the data string.
'''
now = self.runtime()
if name is None:
name = ""
self.info("running event %s at time %s cmd=%s" % (name, now, data))
if node is None:
mutedetach(shlex.split(data))
else:
n = self.obj(node)
n.cmd(shlex.split(data), wait=False)
def sendobjs(self):
''' Return API messages that describe the current session.
'''
replies = []
nn = 0
# send node messages for node and network objects
with self._objslock:
for obj in self.objs():
msg = obj.tonodemsg(flags = coreapi.CORE_API_ADD_FLAG)
if msg is not None:
replies.append(msg)
nn += 1
nl = 0
# send link messages from net objects
with self._objslock:
for obj in self.objs():
linkmsgs = obj.tolinkmsgs(flags = coreapi.CORE_API_ADD_FLAG)
for msg in linkmsgs:
replies.append(msg)
nl += 1
# send model info
configs = self.mobility.getallconfigs()
configs += self.emane.getallconfigs()
for (nodenum, cls, values) in configs:
#cls = self.mobility._modelclsmap[conftype]
msg = cls.toconfmsg(flags=0, nodenum=nodenum,
typeflags=coreapi.CONF_TYPE_FLAGS_UPDATE,
values=values)
replies.append(msg)
# service customizations
svc_configs = self.services.getallconfigs()
for (nodenum, svc) in svc_configs:
opaque = "service:%s" % svc._name
tlvdata = ""
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_NODE,
nodenum)
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_OPAQUE,
opaque)
tmp = coreapi.CoreConfMessage(flags=0, hdr="", data=tlvdata)
replies.append(self.services.configure_request(tmp))
for (filename, data) in self.services.getallfiles(svc):
flags = coreapi.CORE_API_ADD_FLAG
tlvdata = coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_NODE,
nodenum)
tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_NAME,
str(filename))
tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_TYPE,
opaque)
tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_DATA,
str(data))
replies.append(coreapi.CoreFileMessage.pack(flags, tlvdata))
# TODO: send location info
# replies.append(self.location.toconfmsg())
# send hook scripts
for state in sorted(self._hooks.keys()):
for (filename, data) in self._hooks[state]:
flags = coreapi.CORE_API_ADD_FLAG
tlvdata = coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_NAME,
str(filename))
tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_TYPE,
"hook:%s" % state)
tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_DATA,
str(data))
replies.append(coreapi.CoreFileMessage.pack(flags, tlvdata))
# send meta data
tmp = coreapi.CoreConfMessage(flags=0, hdr="", data="")
opts = self.options.configure_request(tmp,
typeflags = coreapi.CONF_TYPE_FLAGS_UPDATE)
if opts:
replies.append(opts)
meta = self.metadata.configure_request(tmp,
typeflags = coreapi.CONF_TYPE_FLAGS_UPDATE)
if meta:
replies.append(meta)
self.info("informing GUI about %d nodes and %d links" % (nn, nl))
return replies
class SessionConfig(ConfigurableManager, Configurable):
_name = 'session'
_type = coreapi.CORE_TLV_REG_UTILITY
_confmatrix = [
("controlnet", coreapi.CONF_DATA_TYPE_STRING, '', '',
'Control network'),
("enablerj45", coreapi.CONF_DATA_TYPE_BOOL, '1', 'On,Off',
'Enable RJ45s'),
("preservedir", coreapi.CONF_DATA_TYPE_BOOL, '0', 'On,Off',
'Preserve session dir'),
("enablesdt", coreapi.CONF_DATA_TYPE_BOOL, '0', 'On,Off',
'Enable SDT3D output'),
]
_confgroups = "Options:1-%d" % len(_confmatrix)
def __init__(self, session):
ConfigurableManager.__init__(self, session)
session.broker.handlers += (self.handledistributed, )
self.reset()
def reset(self):
defaults = self.getdefaultvalues()
for k in self.getnames():
# value may come from config file
v = self.session.getcfgitem(k)
if v is None:
v = self.valueof(k, defaults)
v = self.offontobool(v)
setattr(self, k, v)
def configure_values(self, msg, values):
return self.configure_values_keyvalues(msg, values, self,
self.getnames())
def configure_request(self, msg, typeflags = coreapi.CONF_TYPE_FLAGS_NONE):
nodenum = msg.gettlv(coreapi.CORE_TLV_CONF_NODE)
values = []
for k in self.getnames():
v = getattr(self, k)
if v is None:
v = ""
values.append("%s" % v)
return self.toconfmsg(0, nodenum, typeflags, values)
def handledistributed(self, msg):
''' Handle the session options config message as it has reached the
broker. Options requiring modification for distributed operation should
be handled here.
'''
if not self.session.master:
return
if msg.msgtype != coreapi.CORE_API_CONF_MSG or \
msg.gettlv(coreapi.CORE_TLV_CONF_OBJ) != "session":
return
values_str = msg.gettlv(coreapi.CORE_TLV_CONF_VALUES)
if values_str is None:
return
values = values_str.split('|')
if not self.haskeyvalues(values):
return
for v in values:
key, value = v.split('=', 1)
if key == "controlnet":
self.handledistributedcontrolnet(msg, values, values.index(v))
def handledistributedcontrolnet(self, msg, values, idx):
''' Modify Config Message if multiple control network prefixes are
defined. Map server names to prefixes and repack the message before
it is forwarded to slave servers.
'''
kv = values[idx]
key, value = kv.split('=', 1)
controlnets = value.split()
if len(controlnets) < 2:
return # multiple controlnet prefixes do not exist
servers = self.session.broker.getserverlist()
if len(servers) < 2:
return # not distributed
servers.remove("localhost")
servers.insert(0, "localhost") # master always gets first prefix
# create list of "server1:ctrlnet1 server2:ctrlnet2 ..."
controlnets = map(lambda(x): "%s:%s" % (x[0],x[1]),
zip(servers, controlnets))
values[idx] = "controlnet=%s" % (' '.join(controlnets))
values_str = '|'.join(values)
msg.tlvdata[coreapi.CORE_TLV_CONF_VALUES] = values_str
msg.repack()
class SessionMetaData(ConfigurableManager):
''' Metadata is simply stored in a configs[] dict. Key=value pairs are
passed in from configure messages destined to the "metadata" object.
The data is not otherwise interpreted or processed.
'''
_name = "metadata"
_type = coreapi.CORE_TLV_REG_UTILITY
def configure_values(self, msg, values):
if values is None:
return None
kvs = values.split('|')
for kv in kvs:
try:
(key, value) = kv.split('=', 1)
except ValueError:
raise ValueError, "invalid key in metdata: %s" % kv
self.additem(key, value)
return None
def configure_request(self, msg, typeflags = coreapi.CONF_TYPE_FLAGS_NONE):
nodenum = msg.gettlv(coreapi.CORE_TLV_CONF_NODE)
values_str = "|".join(map(lambda(k,v): "%s=%s" % (k,v), self.items()))
return self.toconfmsg(0, nodenum, typeflags, values_str)
def toconfmsg(self, flags, nodenum, typeflags, values_str):
tlvdata = ""
if nodenum is not None:
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_NODE,
nodenum)
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_OBJ,
self._name)
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_TYPE,
typeflags)
datatypes = tuple( map(lambda(k,v): coreapi.CONF_DATA_TYPE_STRING,
self.items()) )
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_DATA_TYPES,
datatypes)
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_VALUES,
values_str)
msg = coreapi.CoreConfMessage.pack(flags, tlvdata)
return msg
def additem(self, key, value):
self.configs[key] = value
def items(self):
return self.configs.iteritems()
atexit.register(Session.atexit)