stateful_timeout
Stateful operators that have timeout parameters.
Data
Functions
stateful_flat_map_timeout(
    step_id: str,
    up: KeyedStream[V],
    mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]],
    timeout: timedelta,
    on_timeout: Callable[[S], Iterable[W]] = lambda s: ...,
    _now_getter: Callable[[], datetime] = _get_system_utc,
)
One-to-many transform using an expiring persistent state.
Works similarly to stateful_flat_map, but discards the state if no value is seen for a key within the timeout. Just before a discard due to timeout, on_timeout is called so that you can emit items downstream that reference just the state.
All keys’ state times out on EOF and on_timeout is called.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
mapper – Called whenever a value is encountered from upstream, with the last state or None and then the upstream value. Should return a 2-tuple of (updated_state, emit_values). If the updated state is None, the state is discarded.
timeout – Discard state after this duration if a value isn’t seen again for a key.
on_timeout – Called just before discarding the state for a key due to timeout. Should return any additional values to emit downstream just before the state is discarded. Defaults to emitting nothing downstream on expiration.
- Returns:
A keyed stream.
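The callback shapes described above can be sketched in plain Python. This is a minimal illustration, not the operator's implementation: the names buffer_mapper and flush_on_timeout and the batching rule are hypothetical, and the loop calls the callbacks by hand instead of running a dataflow.

```python
from typing import Iterable, List, Optional, Tuple

# Hypothetical state type: the values buffered so far for one key.
State = List[int]


def buffer_mapper(
    state: Optional[State], value: int
) -> Tuple[Optional[State], Iterable[int]]:
    """Buffer values per key and emit their sum once three have arrived."""
    batch = (state or []) + [value]
    if len(batch) >= 3:
        # Returning None as the updated state discards it.
        return None, [sum(batch)]
    # Keep the state and emit nothing yet.
    return batch, []


def flush_on_timeout(state: State) -> Iterable[int]:
    """Emit the partial batch just before the state is discarded."""
    return [sum(state)]


# Drive the callbacks by hand for a single key (assumption: the operator
# would make these calls itself, one state per key).
state: Optional[State] = None
emitted: List[int] = []
for value in [1, 2, 3, 4]:
    state, out = buffer_mapper(state, value)
    emitted.extend(out)

# Pretend the timeout elapsed with [4] still buffered.
assert state is not None
emitted.extend(flush_on_timeout(state))
print(emitted)
```

In a dataflow, these two functions would be passed as the mapper and on_timeout arguments, alongside a timeout such as timedelta(seconds=30).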
stateful_map_timeout(
    step_id: str,
    up: KeyedStream[V],
    mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]],
    timeout: timedelta,
    _now_getter: Callable[[], datetime] = _get_system_utc,
)
One-to-one transform using an expiring persistent state.
Works exactly like stateful_map, but discards the state if no value is seen for a key within the timeout. Nothing is emitted downstream on expiration.
All keys’ state times out on EOF.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
mapper – Called whenever a value is encountered from upstream, with the last state or None and then the upstream value. Should return a 2-tuple of (updated_state, emit_value). If the updated state is None, the state is discarded.
timeout – Discard state after this duration if a value isn’t seen again for a key.
- Returns:
A keyed stream.
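A one-to-one mapper has the same shape but returns a single value instead of an iterable. The sketch below is a plain-Python illustration under stated assumptions: running_total is a hypothetical mapper, and expiry is simulated by resetting the state to None by hand, which is what the mapper would see on the next value after a timeout.

```python
from typing import List, Optional, Tuple


def running_total(state: Optional[int], value: int) -> Tuple[Optional[int], int]:
    """Keep a per-key running total and emit it for every input value."""
    total = (state or 0) + value
    # The first element is the updated state, the second the emitted value.
    return total, total


state: Optional[int] = None
outputs: List[int] = []
for value in [5, 10]:
    state, out = running_total(state, value)
    outputs.append(out)

# Simulate expiry: the state is discarded and nothing is emitted.
state = None

# The next value for the key starts again from an empty state.
state, out = running_total(state, 7)
outputs.append(out)
print(outputs)
```

Because nothing is emitted on expiration, the only visible effect of a timeout here is that the total restarts from the next value.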