From d33147154cbee5c5f39a4bb1df4f824cdb36c282 Mon Sep 17 00:00:00 2001 From: Tom Goff Date: Wed, 12 Oct 2016 17:36:59 +0000 Subject: [PATCH] daemon: Add an instantiation-complete CORE API event type. Have CORE servers send an instantiation-complete event after a session has completed instantiating. Only enter the runtime state after instantiation-complete events have been received form all servers. --- daemon/core/api/data.py | 1 + daemon/core/broker.py | 28 ++++++++++++++++++++++++++++ daemon/core/session.py | 11 +++++++++-- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/daemon/core/api/data.py b/daemon/core/api/data.py index 8d889481..45f838da 100644 --- a/daemon/core/api/data.py +++ b/daemon/core/api/data.py @@ -286,6 +286,7 @@ event_types = dict(enumerate([ "CORE_EVENT_FILE_SAVE", "CORE_EVENT_SCHEDULED", "CORE_EVENT_RECONFIGURE", + "CORE_EVENT_INSTANTIATION_COMPLETE", ])) enumdict(event_types) diff --git a/daemon/core/broker.py b/daemon/core/broker.py index a927c19b..2582c3a6 100644 --- a/daemon/core/broker.py +++ b/daemon/core/broker.py @@ -31,6 +31,7 @@ class CoreServer(object): self.host = host self.port = port self.sock = None + self.instantiation_complete = False def connect(self): assert self.sock is None @@ -206,6 +207,13 @@ class CoreBroker(ConfigurableManager): # this allows green link lines for remote WLANs msg = coreapi.CoreLinkMessage(msgflags, msghdr, msgdata) self.session.sdt.handledistributed(msg) + elif msgtype == coreapi.CORE_API_EVENT_MSG: + msg = coreapi.CoreEventMessage(msgflags, msghdr, msgdata) + eventtype = msg.gettlv(coreapi.CORE_TLV_EVENT_TYPE) + if eventtype == coreapi.CORE_EVENT_INSTANTIATION_COMPLETE: + server.instantiation_complete = True + if self.instantiation_complete(): + self.session.checkruntime() self.session.broadcastraw(None, data) if count is not None and count < 1: @@ -213,6 +221,26 @@ class CoreBroker(ConfigurableManager): else: return len(data) + def local_instantiation_complete(self): + '''\ + Set the local server's instantiation-complete status to True. + ''' + with self.servers_lock: + server = self.servers.get('localhost') + if server is not None: + server.instantiation_complete = True + + def instantiation_complete(self): + '''\ + Return True if all servers have completed instantiation, False + otherwise. + ''' + with self.servers_lock: + for server in self.servers.itervalues(): + if not server.instantiation_complete: + return False + return True + def addserver(self, name, host, port): ''' Add a new server, and try to connect to it. If we're already connected to this (host, port), then leave it alone. When host,port diff --git a/daemon/core/session.py b/daemon/core/session.py index 2e40ab8c..ed2c288b 100644 --- a/daemon/core/session.py +++ b/daemon/core/session.py @@ -638,6 +638,13 @@ class Session(object): # allow time for processes to start time.sleep(0.125) self.validatenodes() + self.broker.local_instantiation_complete() + if self.isconnected(): + tlvdata = '' + tlvdata += coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE, + coreapi.CORE_EVENT_INSTANTIATION_COMPLETE) + msg = coreapi.CoreEventMessage.pack(0, tlvdata) + self.broadcastraw(None, msg) # assume either all nodes have booted already, or there are some # nodes on slave servers that will be booted and those servers will # send a node status response message @@ -687,8 +694,8 @@ class Session(object): return # do not have information on all nodes yet # information on all nodes has been received and they have been started # enter the runtime state - # TODO: more sophisticated checks to verify that all nodes and networks - # are running + if not self.broker.instantiation_complete(): + return state = coreapi.CORE_EVENT_RUNTIME_STATE self.evq.run() self.setstate(state, info=True, sendevent=True)