daemon: Add an instantiation-complete CORE API event type.
Have CORE servers send an instantiation-complete event after a session has completed instantiating. Only enter the runtime state after instantiation-complete events have been received form all servers.
This commit is contained in:
parent
42b1771dcb
commit
d33147154c
3 changed files with 38 additions and 2 deletions
|
@ -286,6 +286,7 @@ event_types = dict(enumerate([
|
|||
"CORE_EVENT_FILE_SAVE",
|
||||
"CORE_EVENT_SCHEDULED",
|
||||
"CORE_EVENT_RECONFIGURE",
|
||||
"CORE_EVENT_INSTANTIATION_COMPLETE",
|
||||
]))
|
||||
|
||||
enumdict(event_types)
|
||||
|
|
|
@ -31,6 +31,7 @@ class CoreServer(object):
|
|||
self.host = host
|
||||
self.port = port
|
||||
self.sock = None
|
||||
self.instantiation_complete = False
|
||||
|
||||
def connect(self):
|
||||
assert self.sock is None
|
||||
|
@ -206,6 +207,13 @@ class CoreBroker(ConfigurableManager):
|
|||
# this allows green link lines for remote WLANs
|
||||
msg = coreapi.CoreLinkMessage(msgflags, msghdr, msgdata)
|
||||
self.session.sdt.handledistributed(msg)
|
||||
elif msgtype == coreapi.CORE_API_EVENT_MSG:
|
||||
msg = coreapi.CoreEventMessage(msgflags, msghdr, msgdata)
|
||||
eventtype = msg.gettlv(coreapi.CORE_TLV_EVENT_TYPE)
|
||||
if eventtype == coreapi.CORE_EVENT_INSTANTIATION_COMPLETE:
|
||||
server.instantiation_complete = True
|
||||
if self.instantiation_complete():
|
||||
self.session.checkruntime()
|
||||
|
||||
self.session.broadcastraw(None, data)
|
||||
if count is not None and count < 1:
|
||||
|
@ -213,6 +221,26 @@ class CoreBroker(ConfigurableManager):
|
|||
else:
|
||||
return len(data)
|
||||
|
||||
def local_instantiation_complete(self):
|
||||
'''\
|
||||
Set the local server's instantiation-complete status to True.
|
||||
'''
|
||||
with self.servers_lock:
|
||||
server = self.servers.get('localhost')
|
||||
if server is not None:
|
||||
server.instantiation_complete = True
|
||||
|
||||
def instantiation_complete(self):
|
||||
'''\
|
||||
Return True if all servers have completed instantiation, False
|
||||
otherwise.
|
||||
'''
|
||||
with self.servers_lock:
|
||||
for server in self.servers.itervalues():
|
||||
if not server.instantiation_complete:
|
||||
return False
|
||||
return True
|
||||
|
||||
def addserver(self, name, host, port):
|
||||
''' Add a new server, and try to connect to it. If we're already
|
||||
connected to this (host, port), then leave it alone. When host,port
|
||||
|
|
|
@ -638,6 +638,13 @@ class Session(object):
|
|||
# allow time for processes to start
|
||||
time.sleep(0.125)
|
||||
self.validatenodes()
|
||||
self.broker.local_instantiation_complete()
|
||||
if self.isconnected():
|
||||
tlvdata = ''
|
||||
tlvdata += coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE,
|
||||
coreapi.CORE_EVENT_INSTANTIATION_COMPLETE)
|
||||
msg = coreapi.CoreEventMessage.pack(0, tlvdata)
|
||||
self.broadcastraw(None, msg)
|
||||
# assume either all nodes have booted already, or there are some
|
||||
# nodes on slave servers that will be booted and those servers will
|
||||
# send a node status response message
|
||||
|
@ -687,8 +694,8 @@ class Session(object):
|
|||
return # do not have information on all nodes yet
|
||||
# information on all nodes has been received and they have been started
|
||||
# enter the runtime state
|
||||
# TODO: more sophisticated checks to verify that all nodes and networks
|
||||
# are running
|
||||
if not self.broker.instantiation_complete():
|
||||
return
|
||||
state = coreapi.CORE_EVENT_RUNTIME_STATE
|
||||
self.evq.run()
|
||||
self.setstate(state, info=True, sendevent=True)
|
||||
|
|
Loading…
Reference in a new issue