NotQuiteParadise2

Source code for nqp.core.scheduler

import collections
import dataclasses
import time
from heapq import heapify, heappop, heappush, heappushpop
from typing import Callable, List, Optional

__all__ = ("ScheduledItem", "Scheduler")


[docs]def remove(items, func): """ Remove scheduled items with func ``func`` from list ``remove_from`` and return True if it was removed """ removed = False remove_ = items.remove for i in list(i for i in items if i.func is func): remove_(i) removed = True return removed
# after py 3.10, add slots=True
[docs]@dataclasses.dataclass() class ScheduledItem: """ Describes a scheduled callback. """ func: Callable last_ts: float next_ts: float interval: float repeat: bool def __lt__(self, other): try: return self.next_ts < other.next_ts except AttributeError: return self.next_ts < other
[docs]class Scheduler: """ Class for scheduling functions. Probably not thread safe. The function should have a prototype that includes ``dt`` as the first argument, which gives the elapsed time, in time units, since the last clock tick. def callback(dt): pass Soft Scheduling =============== Items will be scheduled and rescheduled in such a way to prevent several items from running during the same tick. Enabling a "soft" reschedule means that item is tolerant to having the interval slightly modified. """
[docs] def __init__(self, time_function: Callable = time.perf_counter): """ Initialise a Scheduler, with optional custom time function. Parameters: time_function: Return the elapsed time """ self._time: Callable = time_function self._now: float = 0.0 self._last_ts: float = 0.0 self._times = collections.deque(maxlen=10) self._scheduled_items: List[ScheduledItem] = list() self._next_tick_items: List[ScheduledItem] = list() self._current_interval_item: Optional[ScheduledItem] = None self.cumulative_time: float = self._last_ts
[docs] def schedule_once(self, func: Callable, delay: float = 0.0, soft: bool = False) -> ScheduledItem: """ Schedule a function to be run once sometime in the future. If the delay is not specified, the function will be executed during the next tick. Parameters: func: Function to be called delay: Delay in time unit until it is called soft: See notes about Soft Scheduling Returns: Reference to scheduled item. """ return self.schedule(func, delay, 0.0, False, soft)
[docs] def schedule_interval( self, func: Callable, interval: float = 0.0, delay: float = 0.0, soft: bool = False, ): """ Schedule a function to run on an interval. NOTE! If delay==0.0, then the interval will start the next frame, meaning, the callback will happen the next tick, and then follow the interval. If you want to avoid this, then set delay to the same value as the interval. Items are rescheduled after they are executed. That means that by default, the interval of items may not be consistent with the initial time with was scheduled. If the interval is not specified, the function will be executed every time the scheduler is ticked. Parameters: func: Function to be called interval: Repeat on this interval delay: Delay in time unit until it is called for first time soft: See notes about Soft Scheduling Returns: Reference to scheduled item """ return self.schedule(func, delay, interval, True, soft)
[docs] def schedule(self, func: Callable, delay: float, interval: float, repeat: bool, soft: bool): assert delay >= 0.0 assert interval >= 0.0 if interval: assert repeat last_ts = self._get_nearest_ts() if soft: assert delay > 0.0 next_ts = self._get_soft_next_ts(last_ts, delay) last_ts = next_ts - delay next_ts = last_ts + delay item = ScheduledItem(func, last_ts, next_ts, interval, repeat) if repeat and (interval == next_ts == 0.0): self._next_tick_items.append(item) if len(self._next_tick_items) > 10: # TODO: warning only! raise RuntimeError else: heappush(self._scheduled_items, item) return item
[docs] def unschedule(self, func) -> None: """ Remove a function from the schedule. NOTE: do not unschedule own function during function call If the function appears in the schedule more than once, all occurrences are removed. If the function was not scheduled, no error is raised. Parameters: func: The function to remove from the schedule. """ remove(self._next_tick_items, func) if remove(self._scheduled_items, func): heapify(self._scheduled_items)
[docs] def tick(self) -> float: """ Cause clock to update and call scheduled functions. This updates the clock's internal measure of time and returns the difference since the last update (or since the clock was created). Will call any scheduled functions that have elapsed. Returns: The number of time units since the last "tick", or 0 if this was the first tick. """ delta_t = self.set_time(self._time()) self._times.append(delta_t) self.call_scheduled_functions(delta_t) return delta_t
[docs] def call_scheduled_functions(self, dt: float): """ Call scheduled functions that elapsed on the last `update_time`. Parameters: dt: The elapsed time since the last update to pass to each scheduled function. """ now = self._last_ts # handle items scheduled for each tick if self._next_tick_items: # make copy of list in case event removes itself for item in list(self._next_tick_items): item.func(dt) item = None get_soft_next_ts = self._get_soft_next_ts scheduled_items = self._scheduled_items while scheduled_items: # if next item is scheduled in the future then exit if scheduled_items[0].next_ts > now: break # the scheduler will hold onto a reference to an item in # case it needs to be rescheduled. it is more efficient # to push and pop the heap in one call than to make two # operations. if item is None: item = heappop(scheduled_items) else: item = heappushpop(scheduled_items, item) # call the function associated with the scheduled item item.func(now - item.last_ts) if item.repeat: item.next_ts = item.last_ts + item.interval item.last_ts = now # the execution time of this item has already passed, # so it must be rescheduled if item.next_ts <= now: if now - item.next_ts < 0.05: # reschedule item.next_ts = now + item.interval else: # missed by significant amount; soft reschedule # to avoid lumping everything together. in this # case, the next dt will not be accurate. item.next_ts = get_soft_next_ts(now, item.interval) item.last_ts = item.next_ts - item.interval else: # this item will not be rescheduled item = None if item: heappush(scheduled_items, item)
[docs] def get_running_time(self) -> float: """ Get time clock has been running """ return self.cumulative_time
[docs] def get_counter(self) -> float: """ Get internal counter value """ return self._last_ts
[docs] def get_interval(self) -> float: """ Get the average amount of time passed between each tick. Useful for calculating FPS if this clock is used with the display. Returned value is averaged from last 10 ticks. Value will be 0.0 if before 1st tick. Returns: Average amount of time passed between each tick """ try: return sum(self._times) / len(self._times) except ZeroDivisionError: return 0.0
[docs] def get_schedule(self) -> List[ScheduledItem]: """ Return copy of the schedule. """ return self._next_tick_items + sorted(self._scheduled_items)
[docs] def set_time(self, time_stamp: float) -> float: """ Set the clock manually and do not call scheduled functions. Parameters: time_stamp: This will become the new value of the clock. Returns: The number of time units since the last update, or 0.0 if this was the first update. """ time_stamp = float(time_stamp) # self._last_ts will be -1 before first time set if self._last_ts < 0: delta_t = 0.0 else: delta_t = time_stamp - self._last_ts self.cumulative_time += delta_t self._last_ts = time_stamp return delta_t
[docs] def get_idle_time(self) -> Optional[float]: """ Get the time until the next item is scheduled. Returns: Time until the next scheduled event in time units, or ``None`` if there is no events scheduled. """ if self._next_tick_items: return 0.0 try: next_ts = self._scheduled_items[0].next_ts return max(next_ts - self._time(), 0.0) except IndexError: return None
def _get_nearest_ts(self): """ Schedule from now, unless now is sufficiently close to last_ts, in which case use last_ts. This clusters together scheduled items that probably want to be scheduled together. """ last_ts = self._last_ts ts = self._time() if ts - last_ts > 0.2: last_ts = ts return last_ts def _get_soft_next_ts(self, last_ts, interval): def taken(ts, e): """Return True if the given time has already got an item scheduled nearby. """ for item in self._scheduled_items: if item.next_ts is None: continue elif abs(item.next_ts - ts) <= e: return True elif item.next_ts > ts + e: return False return False next_ts = last_ts + interval if not taken(next_ts, interval / 4): return next_ts dt = interval divs = 1 while True: next_ts = last_ts for i in range(divs - 1): next_ts += dt if not taken(next_ts, dt / 4.0): return next_ts dt /= 2 divs *= 2 # Avoid infinite loop in pathological case if divs > 16: return next_ts