timers#

Tools for managing multiple timers.

Data#

T: TypeVar#

Type of alarm token.

Classes#

class MultiNotifier#
Bases: typing.Generic[T]

Allow multiple notifications in a stateful operator.

This helps you when you’re writing a bytewax.operators.StatefulBatchLogic or bytewax.operators.StatefulLogic to keep track of multiple pending notifications.

A typical use looks something like this:

import copy

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
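from typing import Iterable, List, Optional, Tuple, TypeVar

from bytewax.operators import StatefulBatchLogic
from bytewax.timers import MultiNotifier
from typing_extensions import override

# Type of the values flowing through this operator.
V = TypeVar("V")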


@dataclass(frozen=True)
class MyState:
    notifier: MultiNotifier


class MyOperatorLogic(StatefulBatchLogic[V, V, MyState]):
    def __init__(self, notifier: MultiNotifier[str]) -> None:
        self.notifier = notifier

    @override
    def on_batch(self, values: List[V]) -> Tuple[Iterable[V], bool]:
        now = datetime.now(tz=timezone.utc)
        notify_in_ten_sec = now + timedelta(seconds=10)
        self.notifier.notify_at(notify_in_ten_sec, "token")

        # Do whatever else this logic should do.
        ...

    @override
    def on_notify(self) -> Tuple[Iterable[V], bool]:
        now = datetime.now(tz=timezone.utc)
        for token in self.notifier.due(now):
            # Do some work for this notification.
            ...

        # Do whatever else this logic should do.
        ...

    @override
    def notify_at(self) -> Optional[datetime]:
        return self.notifier.next_at()

    @override
    def snapshot(self) -> MyState:
        return MyState(copy.deepcopy(self.notifier))

You must round-trip the MultiNotifier through the snapshotting system. To do this, include it in whatever state you snapshot, and make sure the builder callback you pass to bytewax.operators.stateful_batch puts it back into your bytewax.operators.StatefulBatchLogic subclass when resuming.
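The round trip described above can be sketched without bytewax installed. Everything in this block is a hypothetical stand-in: `StandInNotifier` only mimics the shape of MultiNotifier, `MyLogic` shows only the snapshot-related part of a logic class, and `builder` assumes the callback receives the previously snapshotted state, or `None` on a fresh start.

```python
import copy
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


class StandInNotifier:
    """Hypothetical stand-in for MultiNotifier so this sketch runs alone."""

    def __init__(self) -> None:
        self.pending: List[Tuple[datetime, str]] = []

    def notify_at(self, at: datetime, token: str) -> None:
        self.pending.append((at, token))

    def next_at(self) -> Optional[datetime]:
        return min((at for at, _ in self.pending), default=None)


@dataclass(frozen=True)
class MyState:
    notifier: StandInNotifier


class MyLogic:
    """Only the snapshot-related part of a stateful logic class."""

    def __init__(self, notifier: StandInNotifier) -> None:
        self.notifier = notifier

    def snapshot(self) -> MyState:
        # Deep copy so later mutation cannot alter the saved snapshot.
        return MyState(copy.deepcopy(self.notifier))


def builder(resume_state: Optional[MyState]) -> MyLogic:
    # Fresh start: nothing to resume, so begin with an empty notifier.
    if resume_state is None:
        return MyLogic(StandInNotifier())
    # Resume: hand the restored notifier back to the logic.
    return MyLogic(resume_state.notifier)


wake = datetime(2024, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=10)
logic = builder(None)
logic.notifier.notify_at(wake, "token")
saved = logic.snapshot()

# Simulate a restart: rebuild the logic from the saved state.
resumed = builder(saved)
```

If the builder ignored `resume_state` and always created a fresh notifier, every notification that was pending at snapshot time would be lost on resume.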

Initialization

Init.

notify_at(at: datetime, token: T) → None#

Mark that you want a notification at a given time.

Parameters:
  • at – Time to notify at.

  • token – Value that will be handed back to you in the list returned by due.

next_at() → Optional[datetime]#

The soonest notification time, if any.

This should be called from bytewax.operators.StatefulBatchLogic.notify_at or any other relevant hook that specifies when an operator should next wake up.

Returns:

The soonest pending notification time, or None if no notifications are pending.

due(now: datetime) → Iterable[T]#

Given the current time, find all due notifications.

This should be called from bytewax.operators.StatefulBatchLogic.on_notify so that you can determine the exact notifications that have been triggered.

Parameters:

now – The current time.

Returns:

A list of all tokens given to notify_at for which the notification time has passed.
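To make the interplay of notify_at, next_at and due concrete, here is a minimal sketch that runs without bytewax. `SketchNotifier` is a hypothetical stand-in built on a heap, not the library's implementation. It assumes that a notification counts as due once its time is at or before `now`, and that tokens returned by due are removed from the pending set.

```python
import heapq
from datetime import datetime, timedelta, timezone
from typing import Generic, List, Optional, Tuple, TypeVar

T = TypeVar("T")


class SketchNotifier(Generic[T]):
    """Hypothetical stand-in, not bytewax's implementation."""

    def __init__(self) -> None:
        # Min-heap of (time, insertion order, token). The counter breaks
        # ties so the tokens themselves never need to be comparable.
        self._heap: List[Tuple[datetime, int, T]] = []
        self._count = 0

    def notify_at(self, at: datetime, token: T) -> None:
        heapq.heappush(self._heap, (at, self._count, token))
        self._count += 1

    def next_at(self) -> Optional[datetime]:
        return self._heap[0][0] if self._heap else None

    def due(self, now: datetime) -> List[T]:
        # Assumption: due means time <= now, and returning a token
        # also removes it from the pending set.
        tokens: List[T] = []
        while self._heap and self._heap[0][0] <= now:
            tokens.append(heapq.heappop(self._heap)[2])
        return tokens


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
notifier: SketchNotifier[str] = SketchNotifier()
notifier.notify_at(start + timedelta(seconds=10), "flush")
notifier.notify_at(start + timedelta(seconds=5), "heartbeat")
notifier.notify_at(start + timedelta(seconds=60), "expire")

# The operator should wake at the earliest of the three times.
first_wake = notifier.next_at()
# Ten seconds in, two of the three notifications are due.
due_tokens = notifier.due(start + timedelta(seconds=10))
# Only the 60-second notification is still pending.
next_wake = notifier.next_at()
```

Using distinct tokens per notification is what lets on_notify tell apart which of several pending timers actually fired.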
