timers
Tools for managing multiple timers.
Data
Classes
- class MultiNotifier
Allow multiple notifications in a stateful operator.
This helps you when you’re writing a bytewax.operators.StatefulBatchLogic or bytewax.operators.StatefulLogic to keep track of multiple pending notifications.
Generally, it’ll be used something like this.
    import copy
    from dataclasses import dataclass
    from datetime import datetime, timedelta, timezone
    from typing import Iterable, List, Optional, Tuple, TypeVar

    from typing_extensions import override

    from bytewax.operators import StatefulBatchLogic
    from bytewax.timers import MultiNotifier

    # Type of the values flowing through this operator.
    V = TypeVar("V")


    @dataclass(frozen=True)
    class MyState:
        notifier: MultiNotifier


    class MyOperatorLogic(StatefulBatchLogic[V, V, MyState]):
        def __init__(self, notifier: MultiNotifier[str]) -> None:
            self.notifier = notifier

        @override
        def on_batch(self, values: List[V]) -> Tuple[Iterable[V], bool]:
            now = datetime.now(tz=timezone.utc)
            notify_in_ten_sec = now + timedelta(seconds=10)
            self.notifier.notify_at(notify_in_ten_sec, "token")
            # Do whatever else this logic should do.
            ...

        @override
        def on_notify(self) -> Tuple[Iterable[V], bool]:
            now = datetime.now(tz=timezone.utc)
            for token in self.notifier.due(now):
                # Do some work for this notification.
                ...
            # Do whatever else this logic should do.
            ...

        @override
        def notify_at(self) -> Optional[datetime]:
            # Wake the operator at the soonest pending notification.
            return self.notifier.next_at()

        @override
        def snapshot(self) -> MyState:
            # Copy so later mutations do not affect the saved state.
            return MyState(copy.deepcopy(self.notifier))
You must round-trip the MultiNotifier through the snapshotting system. This requires you to add it to whatever state you are snapshotting, and to ensure that it is built back into your bytewax.operators.StatefulBatchLogic subclass in the builder callback function of bytewax.operators.stateful_batch.
Initialization
Init.
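The round-trip requirement described above can be sketched without bytewax installed. This is a minimal illustration under stated assumptions: `SketchNotifier`, `SketchState`, `SketchLogic`, and `builder` are hypothetical stand-ins written for this example, not the library's classes, and the builder is assumed to receive either the last snapshot or `None` on a fresh start.

```python
import copy
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


@dataclass
class SketchNotifier:
    """Hypothetical stand-in for MultiNotifier; not the bytewax class."""

    pending: List[Tuple[datetime, str]] = field(default_factory=list)

    def notify_at(self, at: datetime, token: str) -> None:
        self.pending.append((at, token))

    def next_at(self) -> Optional[datetime]:
        return min((at for at, _ in self.pending), default=None)


@dataclass(frozen=True)
class SketchState:
    # The notifier is part of the snapshotted state.
    notifier: SketchNotifier


class SketchLogic:
    """Hypothetical logic class; only the snapshot-related parts are shown."""

    def __init__(self, notifier: SketchNotifier) -> None:
        self.notifier = notifier

    def snapshot(self) -> SketchState:
        # Deep-copy so later mutations do not leak into the saved state.
        return SketchState(copy.deepcopy(self.notifier))


def builder(resume_state: Optional[SketchState]) -> SketchLogic:
    # Fresh start (assumed to be signalled by None): use an empty notifier.
    if resume_state is None:
        return SketchLogic(SketchNotifier())
    # Resume: hand the snapshotted notifier back to the logic.
    return SketchLogic(resume_state.notifier)


logic = builder(None)
wake = datetime(2024, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=10)
logic.notifier.notify_at(wake, "token")

saved = logic.snapshot()   # what the snapshotting system would store
resumed = builder(saved)   # what a restart would rebuild
```

If the notifier were left out of the state, the rebuilt logic would start with no pending notifications and the scheduled wake-up would be lost.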
- notify_at(at: datetime, token: T) -> None
Mark that you want a notification at a given time.
- Parameters:
at – Time to notify at.
token – Value that will be returned to you in the list returned by due.
- next_at() -> Optional[datetime]
The soonest notification time, if any.
This should be called from bytewax.operators.StatefulBatchLogic.notify_at or other relevant hooks that specify when an operator should next awake.
- Returns:
The soonest pending notification time, or None if there are none.
- due(now: datetime) -> Iterable[T]
Given the current time, find all due notifications.
This should be called from bytewax.operators.StatefulBatchLogic.on_notify so that you can determine exactly which notifications have been triggered.
- Parameters:
now – The current time.
- Returns:
A list of all token values given to notify_at for which the notification time has passed.
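To make the interplay of the three methods concrete, here is a small heap-based sketch of the behavior described above. `SketchNotifier` is a hypothetical stand-in, not bytewax's implementation. It assumes that a notification counts as due once its time is less than or equal to `now`, and that `due` removes the notifications it returns; neither detail is stated in this reference.

```python
import heapq
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


class SketchNotifier:
    """Hypothetical stand-in; not bytewax's MultiNotifier."""

    def __init__(self) -> None:
        # Entries are (time, insertion order, token); the counter breaks ties.
        self._heap: List[Tuple[datetime, int, str]] = []
        self._seq = 0

    def notify_at(self, at: datetime, token: str) -> None:
        heapq.heappush(self._heap, (at, self._seq, token))
        self._seq += 1

    def next_at(self) -> Optional[datetime]:
        # The heap root is always the soonest pending notification.
        return self._heap[0][0] if self._heap else None

    def due(self, now: datetime) -> List[str]:
        # Assumption: due notifications are removed as they are returned.
        tokens = []
        while self._heap and self._heap[0][0] <= now:
            tokens.append(heapq.heappop(self._heap)[2])
        return tokens


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
notifier = SketchNotifier()
notifier.notify_at(start + timedelta(seconds=30), "late")
notifier.notify_at(start + timedelta(seconds=10), "early")

first_wake = notifier.next_at()                       # soonest of the two
fired = notifier.due(start + timedelta(seconds=15))   # only "early" has passed
remaining_wake = notifier.next_at()                   # "late" is still pending
```

In an operator, the value from `next_at` would be returned from the `notify_at` hook, and `due` would be called from `on_notify` with the current time.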