stateful_timeout

Stateful operators that have timeout parameters.

Data#

ZERO_TD: timedelta#

A zero length of time.

Functions#

stateful_flat_map_timeout(
step_id: str,
up: KeyedStream[V],
mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]],
timeout: timedelta,
on_timeout: Callable[[S], Iterable[W]] = lambda s: ...,
_now_getter: Callable[[], datetime] = _get_system_utc,
) -> KeyedStream[W]

One-to-many transform using an expiring persistent state.

Works similarly to stateful_flat_map, but discards the state for a key if no value is seen for that key within the timeout. Just before a state is discarded due to timeout, on_timeout is called with that state, letting you emit items downstream based on the state alone.

All keys’ state times out on EOF and on_timeout is called.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • mapper – Called for each upstream value with two arguments: the last state for the key (or None if the key has no state) and the upstream value. Should return a 2-tuple of (updated_state, emit_values). If the updated state is None, the state for that key is discarded.

  • timeout – Discard state after this duration if a value isn’t seen again for a key.

  • on_timeout – Called with the state for a key just before that state is discarded due to timeout. Should return any additional values to emit downstream. Defaults to emitting nothing downstream on expiration.

Returns:

A keyed stream.
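
As a rough illustration of the mapper and on_timeout contracts described above, here is a minimal sketch of a per-key batching callback pair. The names batch_mapper, flush_on_timeout, and BATCH_SIZE are hypothetical and not part of the library. The functions are plain Python, so they can be exercised without a dataflow.

```python
from typing import Iterable, List, Optional, Tuple

BATCH_SIZE = 3  # hypothetical threshold chosen for this sketch


def batch_mapper(
    state: Optional[List[int]], value: int
) -> Tuple[Optional[List[int]], Iterable[List[int]]]:
    """Collect values per key; emit a full batch and reset the state."""
    # state is None the first time a key is seen, or after a discard.
    batch = (state or []) + [value]
    if len(batch) >= BATCH_SIZE:
        # Returning None as the updated state asks for it to be discarded.
        return None, [batch]
    # Keep accumulating and emit nothing yet.
    return batch, []


def flush_on_timeout(state: List[int]) -> Iterable[List[int]]:
    """Emit the partial batch just before the state is discarded."""
    return [state]


# Drive the mapper by hand to see the (updated_state, emit_values) pairs.
state, emitted = batch_mapper(None, 1)
state, emitted = batch_mapper(state, 2)
state, emitted = batch_mapper(state, 3)
```

Under these assumptions, batch_mapper would be passed as mapper and flush_on_timeout as on_timeout, so a key that goes quiet with an incomplete batch still has its remaining values emitted when the timeout fires.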

stateful_map_timeout(
step_id: str,
up: KeyedStream[V],
mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]],
timeout: timedelta,
_now_getter: Callable[[], datetime] = _get_system_utc,
) -> KeyedStream[W]

One-to-one transform using an expiring persistent state.

Works exactly like stateful_map, but will discard the state if no value is seen for a key after a timeout. Nothing is emitted downstream on expiration.

All keys’ state times out on EOF. This operator takes no on_timeout callback, so nothing is emitted when that happens.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • mapper – Called for each upstream value with two arguments: the last state for the key (or None if the key has no state) and the upstream value. Should return a 2-tuple of (updated_state, emit_value). If the updated state is None, the state for that key is discarded.

  • timeout – Discard state after this duration if a value isn’t seen again for a key.

Returns:

A keyed stream.
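
To make the expiry behavior concrete, the sketch below pairs a running-count mapper with a small hand-written driver. The simulate function is a hypothetical stand-in written for this example, not the operator's implementation. It assumes state is dropped once the gap since the last value for a key reaches the timeout, and how the real operator treats that exact boundary is not specified here.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple


def count_mapper(state: Optional[int], value: str) -> Tuple[Optional[int], Tuple[str, int]]:
    """Emit each value with how many values this key has seen so far."""
    count = (state or 0) + 1
    return count, (value, count)


def simulate(
    events: List[Tuple[str, datetime, str]], timeout: timedelta
) -> List[Tuple[str, Tuple[str, int]]]:
    """Hypothetical driver: replay (key, timestamp, value) events in order."""
    states: Dict[str, Tuple[int, datetime]] = {}
    out = []
    for key, now, value in events:
        state = None
        entry = states.get(key)
        if entry is not None:
            prev_state, last_seen = entry
            # Assumption: the state survives only if the gap is under the timeout.
            if now - last_seen < timeout:
                state = prev_state
        state, emit = count_mapper(state, value)
        if state is None:
            states.pop(key, None)
        else:
            states[key] = (state, now)
        out.append((key, emit))
    return out


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
events = [
    ("a", t0, "x"),
    ("a", t0 + timedelta(seconds=2), "y"),
    ("a", t0 + timedelta(seconds=60), "z"),  # arrives long after the timeout
]
result = simulate(events, timeout=timedelta(seconds=10))
# result == [("a", ("x", 1)), ("a", ("y", 2)), ("a", ("z", 1))]
```

The count for key "a" restarts at 1 for the third event because its state expired in the meantime, and nothing was emitted at the moment of expiry.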
