# # CORE # Copyright (c)2010-2013 the Boeing Company. # See the LICENSE file included in this distribution. # # authors: Tom Goff # Jeff Ahrenholz # ''' session.py: defines the Session class used by the core-daemon daemon program that manages a CORE session. ''' import os, sys, tempfile, shutil, shlex, atexit, gc, pwd import threading, time, random from core.api import coreapi if os.uname()[0] == "Linux": from core.netns import nodes from core.netns.vnet import GreTapBridge elif os.uname()[0] == "FreeBSD": from core.bsd import nodes from core.emane import emane from core.misc.utils import check_call, mutedetach, readfileintodict, \ filemunge, filedemunge from core.conf import ConfigurableManager, Configurable from core.location import CoreLocation from core.service import CoreServices from core.broker import CoreBroker from core.mobility import MobilityManager from core.sdt import Sdt from core.misc.ipaddr import MacAddr from core.misc.event import EventLoop from core.constants import * from core.xen import xenconfig class Session(object): # sessions that get automatically shutdown when the process # terminates normally __sessions = set() ''' CORE session manager. ''' def __init__(self, sessionid = None, cfg = {}, server = None, persistent = False): if sessionid is None: # try to keep this short since it's used to construct # network interface names pid = os.getpid() sessionid = ((pid >> 16) ^ (pid & ((1 << 16) - 1))) sessionid ^= ((id(self) >> 16) ^ (id(self) & ((1 << 16) - 1))) sessionid &= 0xffff self.sessionid = sessionid self.sessiondir = os.path.join(tempfile.gettempdir(), "pycore.%s" % self.sessionid) os.mkdir(self.sessiondir) self.name = None self.filename = None self.thumbnail = None self.user = None self.node_count = None self._time = time.time() self.evq = EventLoop() # dict of objects: all nodes and nets self._objs = {} self._objslock = threading.Lock() # dict of configurable objects self._confobjs = {} self._confobjslock = threading.Lock() self._handlers = set() self._handlerslock = threading.Lock() self._hooks = {} self.setstate(state=coreapi.CORE_EVENT_DEFINITION_STATE, info=False, sendevent=False) # dict of configuration items from /etc/core/core.conf config file self.cfg = cfg self.server = server if not persistent: self.addsession(self) self.master = False self.broker = CoreBroker(session=self, verbose=True) self.location = CoreLocation(self) self.mobility = MobilityManager(self) self.services = CoreServices(self) self.emane = emane.Emane(self) self.xen = xenconfig.XenConfigManager(self) self.sdt = Sdt(self) # future parameters set by the GUI may go here self.options = SessionConfig(self) self.metadata = SessionMetaData(self) @classmethod def addsession(cls, session): cls.__sessions.add(session) @classmethod def delsession(cls, session): try: cls.__sessions.remove(session) except KeyError: pass @classmethod def atexit(cls): while cls.__sessions: s = cls.__sessions.pop() print >> sys.stderr, "WARNING: automatically shutting down " \ "non-persistent session %s" % s.sessionid s.shutdown() def __del__(self): # note: there is no guarantee this will ever run self.shutdown() def shutdown(self): ''' Shut down all emulation objects and remove the session directory. ''' self.emane.shutdown() self.broker.shutdown() self.sdt.shutdown() self.delobjs() preserve = False if hasattr(self.options, 'preservedir'): if self.options.preservedir == '1': preserve = True if not preserve: shutil.rmtree(self.sessiondir, ignore_errors = True) if self.server: self.server.delsession(self) self.delsession(self) def isconnected(self): ''' Returns true if this session has a request handler. ''' with self._handlerslock: if len(self._handlers) == 0: return False else: return True def connect(self, handler): ''' Set the request handler for this session, making it connected. ''' # the master flag will only be set after a GUI has connected with the # handler, e.g. not during normal startup if handler.master is True: self.master = True with self._handlerslock: self._handlers.add(handler) def disconnect(self, handler): ''' Disconnect a request handler from this session. Shutdown this session if there is no running emulation. ''' with self._handlerslock: try: self._handlers.remove(handler) except KeyError: raise ValueError, \ "Handler %s not associated with this session" % handler num_handlers = len(self._handlers) if num_handlers == 0: # shut down this session unless we are instantiating, running, # or collecting final data if self.getstate() < coreapi.CORE_EVENT_INSTANTIATION_STATE or \ self.getstate() > coreapi.CORE_EVENT_DATACOLLECT_STATE: self.shutdown() def broadcast(self, src, msg): ''' Send Node and Link CORE API messages to all handlers connected to this session. ''' self._handlerslock.acquire() for handler in self._handlers: if handler == src: continue if isinstance(msg, coreapi.CoreNodeMessage) or \ isinstance(msg, coreapi.CoreLinkMessage): try: handler.sendall(msg.rawmsg) except Exception, e: self.warn("sendall() error: %s" % e) self._handlerslock.release() def broadcastraw(self, src, data): ''' Broadcast raw data to all handlers except src. ''' self._handlerslock.acquire() for handler in self._handlers: if handler == src: continue try: handler.sendall(data) except Exception, e: self.warn("sendall() error: %s" % e) self._handlerslock.release() def gethandler(self): ''' Get one of the connected handlers, preferrably the master. ''' with self._handlerslock: if len(self._handlers) == 0: return None for handler in self._handlers: if handler.master: return handler for handler in self._handlers: return handler def setstate(self, state, info = False, sendevent = False, returnevent = False): ''' Set the session state. When info is true, log the state change event using the session handler's info method. When sendevent is true, generate a CORE API Event Message and send to the connected entity. ''' self._time = time.time() self._state = state replies = [] if not self.isconnected(): return replies if info: statename = coreapi.state_name(state) self._handlerslock.acquire() for handler in self._handlers: handler.info("SESSION %s STATE %d: %s at %s" % \ (self.sessionid, state, statename, time.ctime())) self._handlerslock.release() self.writestate(state) self.runhook(state) if sendevent: tlvdata = "" tlvdata += coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE, state) msg = coreapi.CoreEventMessage.pack(0, tlvdata) # send Event Message to connected handlers (e.g. GUI) try: if returnevent: replies.append(msg) else: self.broadcastraw(None, msg) except Exception, e: self.warn("Error sending Event Message: %s" % e) # also inform slave servers tmp = self.broker.handlerawmsg(msg) return replies def getstate(self): ''' Retrieve the current state of the session. ''' return self._state def writestate(self, state): ''' Write the current state to a state file in the session dir. ''' try: f = open(os.path.join(self.sessiondir, "state"), "w") f.write("%d %s\n" % (state, coreapi.state_name(state))) f.close() except Exception, e: self.warn("Error writing state file: %s" % e) def runhook(self, state, hooks=None): ''' Run hook scripts upon changing states. If hooks is not specified, run all hooks in the given state. ''' if state not in self._hooks: return if hooks is None: hooks = self._hooks[state] for (filename, data) in hooks: try: f = open(os.path.join(self.sessiondir, filename), "w") f.write(data) f.close() except Exception, e: self.warn("Error writing hook '%s': %s" % (filename, e)) self.info("Running hook %s for state %s" % (filename, state)) try: check_call(["/bin/sh", filename], cwd=self.sessiondir, env=self.getenviron()) except Exception, e: self.warn("Error running hook '%s' for state %s: %s" % (filename, state, e)) def sethook(self, type, filename, srcname, data): ''' Store a hook from a received File Message. ''' if srcname is not None: raise NotImplementedError (hookid, state) = type.split(':')[:2] if not state.isdigit(): self.warn("Error setting hook having state '%s'" % state) return state = int(state) hook = (filename, data) if state not in self._hooks: self._hooks[state] = [hook,] else: self._hooks[state] += hook # immediately run a hook if it is in the current state # (this allows hooks in the definition and configuration states) if self.getstate() == state: self.runhook(state, hooks = [hook,]) def delhooks(self): ''' Clear the hook scripts dict. ''' self._hooks = {} def getenviron(self, state=True): ''' Get an environment suitable for a subprocess.Popen call. This is the current process environment with some session-specific variables. ''' env = os.environ.copy() env['SESSION'] = "%s" % self.sessionid env['SESSION_DIR'] = "%s" % self.sessiondir env['SESSION_NAME'] = "%s" % self.name env['SESSION_FILENAME'] = "%s" % self.filename env['SESSION_USER'] = "%s" % self.user env['SESSION_NODE_COUNT'] = "%s" % self.node_count if state: env['SESSION_STATE'] = "%s" % self.getstate() try: readfileintodict(os.path.join(CORE_CONF_DIR, "environment"), env) except IOError: pass if self.user: try: readfileintodict(os.path.join('/home', self.user, ".core", "environment"), env) except IOError: pass return env def setthumbnail(self, thumbfile): ''' Set the thumbnail filename. Move files from /tmp to session dir. ''' if not os.path.exists(thumbfile): self.thumbnail = None return dstfile = os.path.join(self.sessiondir, os.path.basename(thumbfile)) shutil.move(thumbfile, dstfile) #print "thumbnail: %s -> %s" % (thumbfile, dstfile) self.thumbnail = dstfile def setuser(self, user): ''' Set the username for this session. Update the permissions of the session dir to allow the user write access. ''' if user is not None: try: uid = pwd.getpwnam(user).pw_uid gid = os.stat(self.sessiondir).st_gid os.chown(self.sessiondir, uid, gid) except Exception, e: self.warn("Failed to set permission on %s: %s" % (self.sessiondir, e)) self.user = user def objs(self): ''' Return iterator over the emulation object dictionary. ''' return self._objs.itervalues() def getobjid(self): ''' Return a unique, random object id. ''' self._objslock.acquire() while True: id = random.randint(1, 0xFFFF) if id not in self._objs: break self._objslock.release() return id def addobj(self, cls, *clsargs, **clskwds): ''' Add an emulation object. ''' obj = cls(self, *clsargs, **clskwds) self._objslock.acquire() if obj.objid in self._objs: self._objslock.release() obj.shutdown() raise KeyError, "non-unique object id %s for %s" % (obj.objid, obj) self._objs[obj.objid] = obj self._objslock.release() return obj def obj(self, objid): ''' Get an emulation object. ''' if objid not in self._objs: raise KeyError, "unknown object id %s" % (objid) return self._objs[objid] def objbyname(self, name): ''' Get an emulation object using its name attribute. ''' with self._objslock: for obj in self.objs(): if hasattr(obj, "name") and obj.name == name: return obj raise KeyError, "unknown object with name %s" % (name) def delobj(self, objid): ''' Remove an emulation object. ''' self._objslock.acquire() try: o = self._objs.pop(objid) except KeyError: o = None self._objslock.release() if o: o.shutdown() del o gc.collect() # print "gc count:", gc.get_count() # for o in gc.get_objects(): # if isinstance(o, PyCoreObj): # print "XXX XXX XXX PyCoreObj:", o # for r in gc.get_referrers(o): # print "XXX XXX XXX referrer:", gc.get_referrers(o) def delobjs(self): ''' Clear the _objs dictionary, and call each obj.shutdown() routine. ''' self._objslock.acquire() while self._objs: k, o = self._objs.popitem() o.shutdown() self._objslock.release() def writeobjs(self): ''' Write objects to a 'nodes' file in the session dir. The 'nodes' file lists: number, name, api-type, class-type ''' try: f = open(os.path.join(self.sessiondir, "nodes"), "w") with self._objslock: for objid in sorted(self._objs.keys()): o = self._objs[objid] f.write("%s %s %s %s\n" % (objid, o.name, o.apitype, type(o))) f.close() except Exception, e: self.warn("Error writing nodes file: %s" % e) def addconfobj(self, objname, type, callback): ''' Objects can register configuration objects that are included in the Register Message and may be configured via the Configure Message. The callback is invoked when receiving a Configure Message. ''' if type not in coreapi.reg_tlvs: raise Exception, "invalid configuration object type" self._confobjslock.acquire() self._confobjs[objname] = (type, callback) self._confobjslock.release() def confobj(self, objname, session, msg): ''' Invoke the callback for an object upon receipt of a Configure Message for that object. A no-op if the object doesn't exist. ''' replies = [] self._confobjslock.acquire() if objname == "all": for objname in self._confobjs: (type, callback) = self._confobjs[objname] reply = callback(session, msg) if reply is not None: replies.append(reply) self._confobjslock.release() return replies if objname in self._confobjs: (type, callback) = self._confobjs[objname] self._confobjslock.release() reply = callback(session, msg) if reply is not None: replies.append(reply) return replies else: self.info("session object doesn't own model '%s', ignoring" % \ objname) self._confobjslock.release() return replies def confobjs_to_tlvs(self): ''' Turn the configuration objects into a list of Register Message TLVs. ''' tlvdata = "" self._confobjslock.acquire() for objname in self._confobjs: (type, callback) = self._confobjs[objname] # type must be in coreapi.reg_tlvs tlvdata += coreapi.CoreRegTlv.pack(type, objname) self._confobjslock.release() return tlvdata def info(self, msg): ''' Utility method for writing output to stdout. ''' print msg sys.stdout.flush() def warn(self, msg): ''' Utility method for writing output to stderr. ''' print >> sys.stderr, msg sys.stderr.flush() def dumpsession(self): ''' Debug print this session. ''' self.info("session id=%s name=%s state=%s connected=%s" % \ (self.sessionid, self.name, self._state, self.isconnected())) num = len(self._objs) self.info(" file=%s thumb=%s nc=%s/%s" % \ (self.filename, self.thumbnail, self.node_count, num)) def exception(self, level, source, objid, text): ''' Generate an Exception Message ''' vals = (objid, str(self.sessionid), level, source, time.ctime(), text) types = ("NODE", "SESSION", "LEVEL", "SOURCE", "DATE", "TEXT") tlvdata = "" for (t,v) in zip(types, vals): if v is not None: tlvdata += coreapi.CoreExceptionTlv.pack( eval("coreapi.CORE_TLV_EXCP_%s" % t), v) msg = coreapi.CoreExceptionMessage.pack(0, tlvdata) # send Exception Message to connected handlers (e.g. GUI) self.broadcastraw(None, msg) def getcfgitem(self, cfgname): ''' Return an entry from the configuration dictionary that comes from command-line arguments and/or the core.conf config file. ''' if cfgname not in self.cfg: return None else: return self.cfg[cfgname] def getcfgitembool(self, cfgname, defaultifnone = None): ''' Return a boolean entry from the configuration dictionary, may return None if undefined. ''' item = self.getcfgitem(cfgname) if item is None: return defaultifnone return bool(item.lower() == "true") def getcfgitemint(self, cfgname, defaultifnone = None): ''' Return an integer entry from the configuration dictionary, may return None if undefined. ''' item = self.getcfgitem(cfgname) if item is None: return defaultifnone return int(item) def instantiate(self, handler=None): ''' We have entered the instantiation state, invoke startup methods of various managers and boot the nodes. Validate nodes and check for transition to the runtime state. ''' self.writeobjs() # controlnet may be needed by some EMANE models self.addremovectrlif(node=None, remove=False) if self.emane.startup() == self.emane.NOT_READY: return # instantiate() will be invoked again upon Emane.configure() self.broker.startup() self.mobility.startup() # boot the services on each node self.bootnodes(handler) # allow time for processes to start time.sleep(0.125) self.validatenodes() self.emane.poststartup() # assume either all nodes have booted already, or there are some # nodes on slave servers that will be booted and those servers will # send a node status response message self.checkruntime() def checkruntime(self): ''' Check if we have entered the runtime state, that all nodes have been started and the emulation is running. Start the event loop once we have entered runtime (time=0). ''' # this is called from instantiate() after receiving an event message # for the instantiation state, and from the broker when distributed # nodes have been started if self.node_count is None: return if self.getstate() == coreapi.CORE_EVENT_RUNTIME_STATE: return session_node_count = int(self.node_count) nc = 0 with self._objslock: for obj in self.objs(): # these networks may be added by the daemon and are not # considered in the GUI's node count if isinstance(obj, (nodes.PtpNet, nodes.CtrlNet)): continue # on Linux, GreTapBridges are auto-created, not part of GUI's # node count if 'GreTapBridge' in globals(): if isinstance(obj, GreTapBridge) and \ not isinstance(obj, nodes.TunnelNode): continue nc += 1 # count booted nodes not emulated on this server # TODO: let slave server determine RUNTIME and wait for Event Message # broker.getbootocunt() counts all CoreNodes from status reponse # messages, plus any remote WLANs; remote EMANE, hub, switch, etc. # are already counted in self._objs nc += self.broker.getbootcount() self.info("Checking for runtime with %d of %d session nodes" % \ (nc, session_node_count)) if nc < session_node_count: return # do not have information on all nodes yet # information on all nodes has been received and they have been started # enter the runtime state # TODO: more sophisticated checks to verify that all nodes and networks # are running state = coreapi.CORE_EVENT_RUNTIME_STATE self.evq.run() self.setstate(state, info=True, sendevent=True) def datacollect(self): ''' Tear down a running session. Stop the event loop and any running nodes, and perform clean-up. ''' self.evq.stop() with self._objslock: for obj in self.objs(): if isinstance(obj, nodes.PyCoreNode): self.services.stopnodeservices(obj) self.emane.shutdown() self.updatectrlifhosts(remove=True) self.addremovectrlif(node=None, remove=True) # self.checkshutdown() is currently invoked from node delete handler def checkshutdown(self): ''' Check if we have entered the shutdown state, when no running nodes and links remain. ''' with self._objslock: nc = len(self._objs) # TODO: this doesn't consider slave server node counts # wait for slave servers to enter SHUTDOWN state, then master session # can enter SHUTDOWN replies = () if nc == 0: replies = self.setstate(state=coreapi.CORE_EVENT_SHUTDOWN_STATE, info=True, sendevent=True, returnevent=True) self.sdt.shutdown() return replies def setmaster(self, handler): ''' Look for the specified handler and set our master flag appropriately. Returns True if we are connected to the given handler. ''' with self._handlerslock: for h in self._handlers: if h != handler: continue self.master = h.master return True return False def shortsessionid(self): ''' Return a shorter version of the session ID, appropriate for interface names, where length may be limited. ''' return (self.sessionid >> 8) ^ (self.sessionid & ((1 << 8) - 1)) def bootnodes(self, handler): ''' Invoke the boot() procedure for all nodes and send back node messages to the GUI for node messages that had the status request flag. ''' #self.addremovectrlif(node=None, remove=False) with self._objslock: for n in self.objs(): if not isinstance(n, nodes.PyCoreNode): continue if isinstance(n, nodes.RJ45Node): continue # add a control interface if configured self.addremovectrlif(node=n, remove=False) n.boot() nodenum = n.objid if handler is None: continue if nodenum in handler.nodestatusreq: tlvdata = "" tlvdata += coreapi.CoreNodeTlv.pack(coreapi.CORE_TLV_NODE_NUMBER, nodenum) tlvdata += coreapi.CoreNodeTlv.pack(coreapi.CORE_TLV_NODE_EMUID, n.objid) reply = coreapi.CoreNodeMessage.pack(coreapi.CORE_API_ADD_FLAG \ | coreapi.CORE_API_LOC_FLAG, tlvdata) try: handler.request.sendall(reply) except Exception, e: self.warn("sendall() error: %s" % e) del handler.nodestatusreq[nodenum] self.updatectrlifhosts() def validatenodes(self): with self._objslock: for n in self.objs(): # TODO: this can be extended to validate everything # such as vnoded process, bridges, etc. if not isinstance(n, nodes.PyCoreNode): continue if isinstance(n, nodes.RJ45Node): continue n.validate() def addremovectrlnet(self, remove=False): ''' Create a control network bridge as necessary. When the remove flag is True, remove the bridge that connects control interfaces. ''' prefix = None try: if self.cfg['controlnet']: prefix = self.cfg['controlnet'] except KeyError: pass if hasattr(self.options, 'controlnet'): prefix = self.options.controlnet if not prefix: return None # no controlnet needed # return any existing controlnet bridge id = "ctrlnet" try: ctrlnet = self.obj(id) if remove: self.delobj(ctrlnet.objid) return None return ctrlnet except KeyError: if remove: return None # build a new controlnet bridge updown_script = None try: if self.cfg['controlnet_updown_script']: updown_script = self.cfg['controlnet_updown_script'] except KeyError: pass prefixes = prefix.split() if len(prefixes) > 1: assign_address = True if self.master: try: prefix = prefixes[0].split(':', 1)[1] except IndexError: prefix = prefixes[0] # possibly only one server else: # slave servers have their name and localhost in the serverlist servers = self.broker.getserverlist() servers.remove('localhost') prefix = None for server_prefix in prefixes: server, p = server_prefix.split(':') if server == servers[0]: prefix = p break if not prefix: msg = "Control network prefix not found for server '%s'" % \ servers[0] self.exception(coreapi.CORE_EXCP_LEVEL_ERROR, "Session.addremovectrlnet()", None, msg) prefix = prefixes[0].split(':', 1)[1] assign_address = False else: # with one prefix, only master gets a ctrlnet address assign_address = self.master ctrlnet = self.addobj(cls=nodes.CtrlNet, objid=id, prefix=prefix, assign_address=assign_address, updown_script=updown_script) # tunnels between controlnets will be built with Broker.addnettunnels() self.broker.addnet(id) for server in self.broker.getserverlist(): self.broker.addnodemap(server, id) return ctrlnet def addremovectrlif(self, node, remove=False): ''' Add a control interface to a node when a 'controlnet' prefix is listed in the config file or session options. Uses addremovectrlnet() to build or remove the control bridge. ''' ctrlnet = self.addremovectrlnet(remove) if ctrlnet is None: return if node is None: return ctrlip = node.objid try: addrlist = ["%s/%s" % (ctrlnet.prefix.addr(ctrlip), ctrlnet.prefix.prefixlen)] except ValueError: msg = "Control interface not added to node %s. " % node.objid msg += "Invalid control network prefix (%s). " % ctrlnet.prefix msg += "A longer prefix length may be required for this many nodes." node.exception(coreapi.CORE_EXCP_LEVEL_ERROR, "Session.addremovectrlif()", msg) return ifi = node.newnetif(net = ctrlnet, ifindex = ctrlnet.CTRLIF_IDX_BASE, ifname = "ctrl0", hwaddr = MacAddr.random(), addrlist = addrlist) node.netif(ifi).control = True def updatectrlifhosts(self, remove=False): ''' Add the IP addresses of control interfaces to the /etc/hosts file. ''' if not self.getcfgitembool('update_etc_hosts', False): return id = "ctrlnet" try: ctrlnet = self.obj(id) except KeyError: return header = "CORE session %s host entries" % self.sessionid if remove: if self.getcfgitembool('verbose', False): self.info("Removing /etc/hosts file entries.") filedemunge('/etc/hosts', header) return entries = [] for ifc in ctrlnet.netifs(): name = ifc.node.name for addr in ifc.addrlist: entries.append("%s %s" % (addr.split('/')[0], ifc.node.name)) if self.getcfgitembool('verbose', False): self.info("Adding %d /etc/hosts file entries." % len(entries)) filemunge('/etc/hosts', header, '\n'.join(entries) + '\n') def runtime(self): ''' Return the current time we have been in the runtime state, or zero if not in runtime. ''' if self.getstate() == coreapi.CORE_EVENT_RUNTIME_STATE: return time.time() - self._time else: return 0.0 def addevent(self, etime, node=None, name=None, data=None): ''' Add an event to the event queue, with a start time relative to the start of the runtime state. ''' etime = float(etime) runtime = self.runtime() if runtime > 0.0: if time <= runtime: self.warn("Could not schedule past event for time %s " \ "(run time is now %s)" % (time, runtime)) return etime = etime - runtime func = self.runevent self.evq.add_event(etime, func, node=node, name=name, data=data) if name is None: name = "" self.info("scheduled event %s at time %s data=%s" % \ (name, etime + runtime, data)) def runevent(self, node=None, name=None, data=None): ''' Run a scheduled event, executing commands in the data string. ''' now = self.runtime() if name is None: name = "" self.info("running event %s at time %s cmd=%s" % (name, now, data)) if node is None: mutedetach(shlex.split(data)) else: n = self.obj(node) n.cmd(shlex.split(data), wait=False) def sendobjs(self): ''' Return API messages that describe the current session. ''' replies = [] nn = 0 # send node messages for node and network objects with self._objslock: for obj in self.objs(): msg = obj.tonodemsg(flags = coreapi.CORE_API_ADD_FLAG) if msg is not None: replies.append(msg) nn += 1 nl = 0 # send link messages from net objects with self._objslock: for obj in self.objs(): linkmsgs = obj.tolinkmsgs(flags = coreapi.CORE_API_ADD_FLAG) for msg in linkmsgs: replies.append(msg) nl += 1 # send model info configs = self.mobility.getallconfigs() configs += self.emane.getallconfigs() for (nodenum, cls, values) in configs: #cls = self.mobility._modelclsmap[conftype] msg = cls.toconfmsg(flags=0, nodenum=nodenum, typeflags=coreapi.CONF_TYPE_FLAGS_UPDATE, values=values) replies.append(msg) # service customizations svc_configs = self.services.getallconfigs() for (nodenum, svc) in svc_configs: opaque = "service:%s" % svc._name tlvdata = "" tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_NODE, nodenum) tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_OPAQUE, opaque) tmp = coreapi.CoreConfMessage(flags=0, hdr="", data=tlvdata) replies.append(self.services.configure_request(tmp)) for (filename, data) in self.services.getallfiles(svc): flags = coreapi.CORE_API_ADD_FLAG tlvdata = coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_NODE, nodenum) tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_NAME, str(filename)) tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_TYPE, opaque) tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_DATA, str(data)) replies.append(coreapi.CoreFileMessage.pack(flags, tlvdata)) # TODO: send location info # replies.append(self.location.toconfmsg()) # send hook scripts for state in sorted(self._hooks.keys()): for (filename, data) in self._hooks[state]: flags = coreapi.CORE_API_ADD_FLAG tlvdata = coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_NAME, str(filename)) tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_TYPE, "hook:%s" % state) tlvdata += coreapi.CoreFileTlv.pack(coreapi.CORE_TLV_FILE_DATA, str(data)) replies.append(coreapi.CoreFileMessage.pack(flags, tlvdata)) # send meta data tmp = coreapi.CoreConfMessage(flags=0, hdr="", data="") opts = self.options.configure_request(tmp, typeflags = coreapi.CONF_TYPE_FLAGS_UPDATE) if opts: replies.append(opts) meta = self.metadata.configure_request(tmp, typeflags = coreapi.CONF_TYPE_FLAGS_UPDATE) if meta: replies.append(meta) self.info("informing GUI about %d nodes and %d links" % (nn, nl)) return replies class SessionConfig(ConfigurableManager, Configurable): _name = 'session' _type = coreapi.CORE_TLV_REG_UTILITY _confmatrix = [ ("controlnet", coreapi.CONF_DATA_TYPE_STRING, '', '', 'Control network'), ("enablerj45", coreapi.CONF_DATA_TYPE_BOOL, '1', 'On,Off', 'Enable RJ45s'), ("preservedir", coreapi.CONF_DATA_TYPE_BOOL, '0', 'On,Off', 'Preserve session dir'), ("enablesdt", coreapi.CONF_DATA_TYPE_BOOL, '0', 'On,Off', 'Enable SDT3D output'), ] _confgroups = "Options:1-%d" % len(_confmatrix) def __init__(self, session): ConfigurableManager.__init__(self, session) session.broker.handlers += (self.handledistributed, ) self.reset() def reset(self): defaults = self.getdefaultvalues() for k in self.getnames(): # value may come from config file v = self.session.getcfgitem(k) if v is None: v = self.valueof(k, defaults) v = self.offontobool(v) setattr(self, k, v) def configure_values(self, msg, values): return self.configure_values_keyvalues(msg, values, self, self.getnames()) def configure_request(self, msg, typeflags = coreapi.CONF_TYPE_FLAGS_NONE): nodenum = msg.gettlv(coreapi.CORE_TLV_CONF_NODE) values = [] for k in self.getnames(): v = getattr(self, k) if v is None: v = "" values.append("%s" % v) return self.toconfmsg(0, nodenum, typeflags, values) def handledistributed(self, msg): ''' Handle the session options config message as it has reached the broker. Options requiring modification for distributed operation should be handled here. ''' if not self.session.master: return if msg.msgtype != coreapi.CORE_API_CONF_MSG or \ msg.gettlv(coreapi.CORE_TLV_CONF_OBJ) != "session": return values_str = msg.gettlv(coreapi.CORE_TLV_CONF_VALUES) if values_str is None: return values = values_str.split('|') if not self.haskeyvalues(values): return for v in values: key, value = v.split('=', 1) if key == "controlnet": self.handledistributedcontrolnet(msg, values, values.index(v)) def handledistributedcontrolnet(self, msg, values, idx): ''' Modify Config Message if multiple control network prefixes are defined. Map server names to prefixes and repack the message before it is forwarded to slave servers. ''' kv = values[idx] key, value = kv.split('=', 1) controlnets = value.split() if len(controlnets) < 2: return # multiple controlnet prefixes do not exist servers = self.session.broker.getserverlist() if len(servers) < 2: return # not distributed servers.remove("localhost") servers.insert(0, "localhost") # master always gets first prefix # create list of "server1:ctrlnet1 server2:ctrlnet2 ..." controlnets = map(lambda(x): "%s:%s" % (x[0],x[1]), zip(servers, controlnets)) values[idx] = "controlnet=%s" % (' '.join(controlnets)) values_str = '|'.join(values) msg.tlvdata[coreapi.CORE_TLV_CONF_VALUES] = values_str msg.repack() class SessionMetaData(ConfigurableManager): ''' Metadata is simply stored in a configs[] dict. Key=value pairs are passed in from configure messages destined to the "metadata" object. The data is not otherwise interpreted or processed. ''' _name = "metadata" _type = coreapi.CORE_TLV_REG_UTILITY def configure_values(self, msg, values): if values is None: return None kvs = values.split('|') for kv in kvs: try: (key, value) = kv.split('=', 1) except ValueError: raise ValueError, "invalid key in metdata: %s" % kv self.additem(key, value) return None def configure_request(self, msg, typeflags = coreapi.CONF_TYPE_FLAGS_NONE): nodenum = msg.gettlv(coreapi.CORE_TLV_CONF_NODE) values_str = "|".join(map(lambda(k,v): "%s=%s" % (k,v), self.items())) return self.toconfmsg(0, nodenum, typeflags, values_str) def toconfmsg(self, flags, nodenum, typeflags, values_str): tlvdata = "" if nodenum is not None: tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_NODE, nodenum) tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_OBJ, self._name) tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_TYPE, typeflags) datatypes = tuple( map(lambda(k,v): coreapi.CONF_DATA_TYPE_STRING, self.items()) ) tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_DATA_TYPES, datatypes) tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_VALUES, values_str) msg = coreapi.CoreConfMessage.pack(flags, tlvdata) return msg def additem(self, key, value): self.configs[key] = value def items(self): return self.configs.iteritems() atexit.register(Session.atexit)