core-extra/daemon/core/misc/event.py
2017-08-04 14:51:25 -07:00

286 lines
7 KiB
Python

"""
event.py: event loop implementation using a heap queue and threads.
"""
import heapq
import threading
import time
from core.misc import log
logger = log.get_logger(__name__)
class Timer(threading.Thread):
"""
Based on threading.Timer but cancel() returns if the timer was
already running.
"""
def __init__(self, interval, function, args=None, kwargs=None):
"""
Create a Timer instance.
:param interval: time interval
:param function: function to call when timer finishes
:param args: function arguments
:param kwargs: function keyword arguments
"""
super(Timer, self).__init__()
self.interval = interval
self.function = function
self.finished = threading.Event()
self._running = threading.Lock()
# validate arguments were provided
if args:
self.args = args
else:
self.args = []
# validate keyword arguments were provided
if kwargs:
self.kwargs = kwargs
else:
self.kwargs = {}
def cancel(self):
"""
Stop the timer if it hasn't finished yet. Return False if
the timer was already running.
:return: True if canceled, False otherwise
:rtype: bool
"""
locked = self._running.acquire(False)
if locked:
self.finished.set()
self._running.release()
return locked
def run(self):
"""
Run the timer.
:return: nothing
"""
self.finished.wait(self.interval)
with self._running:
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()
class Event(object):
"""
Provides event objects that can be used within the EventLoop class.
"""
def __init__(self, eventnum, event_time, func, *args, **kwds):
"""
Create an Event instance.
:param eventnum: event number
:param event_time: event time
:param func: event function
:param args: function arguments
:param kwds: function keyword arguments
"""
self.eventnum = eventnum
self.time = event_time
self.func = func
self.args = args
self.kwds = kwds
self.canceled = False
def __cmp__(self, other):
"""
Comparison function.
:param Event other: event to compare with
:return: comparison result
:rtype: int
"""
tmp = cmp(self.time, other.time)
if tmp == 0:
tmp = cmp(self.eventnum, other.eventnum)
return tmp
def run(self):
"""
Run an event.
:return: nothing
"""
if self.canceled:
return
self.func(*self.args, **self.kwds)
def cancel(self):
"""
Cancel event.
:return: nothing
"""
# XXX not thread-safe
self.canceled = True
class EventLoop(object):
"""
Provides an event loop for running events.
"""
def __init__(self):
"""
Creates a EventLoop instance.
"""
self.lock = threading.RLock()
self.queue = []
self.eventnum = 0
self.timer = None
self.running = False
self.start = None
def __run_events(self):
"""
Run events.
:return: nothing
"""
schedule = False
while True:
with self.lock:
if not self.running or not self.queue:
break
now = time.time()
if self.queue[0].time > now:
schedule = True
break
event = heapq.heappop(self.queue)
assert event.time <= now
event.run()
with self.lock:
self.timer = None
if schedule:
self.__schedule_event()
def __schedule_event(self):
"""
Schedule event.
:return: nothing
"""
with self.lock:
assert self.running
if not self.queue:
return
delay = self.queue[0].time - time.time()
assert self.timer is None
self.timer = Timer(delay, self.__run_events)
self.timer.daemon = True
self.timer.start()
def run(self):
"""
Start event loop.
:return: nothing
"""
with self.lock:
if self.running:
return
self.running = True
self.start = time.time()
for event in self.queue:
event.time += self.start
self.__schedule_event()
def stop(self):
"""
Stop event loop.
:return: nothing
"""
with self.lock:
if not self.running:
return
self.queue = []
self.eventnum = 0
if self.timer is not None:
self.timer.cancel()
self.timer = None
self.running = False
self.start = None
def add_event(self, delaysec, func, *args, **kwds):
"""
Add an event to the event loop.
:param int delaysec: delay in seconds for event
:param func: event function
:param args: event arguments
:param kwds: event keyword arguments
:return: created event
:rtype: Event
"""
with self.lock:
eventnum = self.eventnum
self.eventnum += 1
evtime = float(delaysec)
if self.running:
evtime += time.time()
event = Event(eventnum, evtime, func, *args, **kwds)
if self.queue:
prevhead = self.queue[0]
else:
prevhead = None
heapq.heappush(self.queue, event)
head = self.queue[0]
if prevhead is not None and prevhead != head:
if self.timer is not None and self.timer.cancel():
self.timer = None
if self.running and self.timer is None:
self.__schedule_event()
return event
# TODO: move example to documentation
def example():
loop = EventLoop()
def msg(arg):
delta = time.time() - loop.start
logger.debug("%s arg: %s", delta, arg)
def repeat(interval, count):
count -= 1
msg("repeat: interval: %s; remaining: %s" % (interval, count))
if count > 0:
loop.add_event(interval, repeat, interval, count)
def sleep(delay):
msg("sleep %s" % delay)
time.sleep(delay)
msg("sleep done")
def stop(arg):
msg(arg)
loop.stop()
loop.add_event(0, msg, "start")
loop.add_event(0, msg, "time zero")
for delay in 5, 4, 10, -1, 0, 9, 3, 7, 3.14:
loop.add_event(delay, msg, "time %s" % delay)
loop.run()
loop.add_event(0, repeat, 1, 5)
loop.add_event(12, sleep, 10)
loop.add_event(15.75, stop, "stop time: 15.75")