interval#
Time-based interval operators.
Data#
Classes#
- class IntervalLogic#
Abstract class to define an interval operator. That operator will call these methods for you.
A unique instance of this logic will be instantiated for each item seen on the left side.
- abstract on_value(side, value) → Iterable[W]#
Called on each new upstream item that falls within this interval.
Will be called only once with the left side item that created the interval.
- Parameters:
side – Either "left" or "right".
value – The value of the upstream (key, value).
- Returns:
Any values to emit downstream. Values will automatically be wrapped with key.
- abstract on_close() → Iterable[W]#
Called when this interval closes.
- Returns:
Any values to emit downstream. Values will automatically be wrapped with key.
- abstract snapshot() → S#
Return an immutable copy of the state for recovery.
This will be called periodically by the runtime.
The value returned here will be passed back to the builder function of interval when resuming.
The state must be pickle-able.
Danger
The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.
- Returns:
The immutable state to be pickled.
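The three callbacks above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: CollectLogic and build_logic are hypothetical names, and the class is a stand-in that does not subclass IntervalLogic so that the sketch runs without the library installed. Real code would presumably subclass IntervalLogic and pass the builder to interval.

```python
import copy
from typing import Any, Dict, Iterable, List, Optional

State = Dict[str, List[Any]]


class CollectLogic:
    """Hypothetical stand-in exposing the same three methods as IntervalLogic.

    Assumption: a real implementation would subclass IntervalLogic; a plain
    class is used here only to keep the sketch self-contained.
    """

    def __init__(self, resume_state: Optional[State] = None) -> None:
        if resume_state is None:
            # Fresh interval: nothing seen yet on either side.
            self.state: State = {"left": [], "right": []}
        else:
            # Resuming: copy so later mutation cannot touch the snapshot.
            self.state = copy.deepcopy(resume_state)

    def on_value(self, side: str, value: Any) -> Iterable[Any]:
        # Remember the value; emit nothing until the interval closes.
        self.state[side].append(value)
        return []

    def on_close(self) -> Iterable[Any]:
        # Emit everything gathered for this interval as one item.
        return [(self.state["left"], self.state["right"])]

    def snapshot(self) -> State:
        # on_value mutates the lists, so hand back a deep copy.
        return copy.deepcopy(self.state)


def build_logic(resume_state: Optional[State]) -> CollectLogic:
    # Same shape as the builder argument: None for a new interval,
    # or a previous snapshot when resuming.
    return CollectLogic(resume_state)


logic = build_logic(None)
logic.on_value("left", "a")
logic.on_value("right", "b")
snap = logic.snapshot()
logic.on_value("right", "c")  # must not change the earlier snapshot
closed = list(logic.on_close())
resumed_closed = list(build_logic(snap).on_close())
```

Because snapshot returns a deep copy, the value taken before the third on_value call still holds only the first two items, which is the property the Danger note asks for.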
- class IntervalOut#
Streams returned from an interval operator.
- down: KeyedStream[W_co]#
Items emitted from this operator.
- late: KeyedStream[V]#
Upstream items whose timestamps were before the current watermark.
- unpaired: KeyedStream[V]#
Items from the right side which did not fall into any interval.
Functions#
- interval(
- step_id: str,
- left: KeyedStream[V],
- clock: Clock[V, SC],
- gap_before: timedelta,
- gap_after: timedelta,
- builder: Callable[[Optional[S]], IntervalLogic[V, W, S]],
- right: KeyedStream[V],
Low level interval operator.
This opens an interval (a “window”) whenever an item occurs on the left side, and then pairs that item with items on the right side whose timestamps fall within the specified gaps.
Values are always applied to the logic in timestamp (not arrival) order.
- Parameters:
step_id – Unique ID.
left – Triggering stream.
clock – Time definition. Must be able to determine timestamps for both left and right streams.
gap_before – Pair right side items if they are within this duration before a left side item.
gap_after – Pair right side items if they are within this duration after a left side item.
builder – Builds an IntervalLogic whenever a new left side value is seen, or state is being reconstituted during a resume.
right – Joined stream.
- Returns:
Interval result streams.
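The pairing rule described by gap_before and gap_after can be illustrated without the library. The sketch below is a simplified model for a single key: pair_by_interval is a hypothetical helper, the inclusive bounds on both ends are an assumption, and watermarks, late items and state recovery are ignored.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, List, Tuple

Timestamped = Tuple[datetime, Any]


def pair_by_interval(
    lefts: List[Timestamped],
    rights: List[Timestamped],
    gap_before: timedelta,
    gap_after: timedelta,
) -> Tuple[List[Tuple[Any, Any]], List[Any]]:
    """Return (pairs, unpaired) for one key.

    Assumption: a right item matches a left item when
    left_ts - gap_before <= right_ts <= left_ts + gap_after.
    """
    pairs: List[Tuple[Any, Any]] = []
    unpaired: List[Any] = []
    for right_ts, right_value in rights:
        matched = False
        for left_ts, left_value in lefts:
            if left_ts - gap_before <= right_ts <= left_ts + gap_after:
                pairs.append((left_value, right_value))
                matched = True
        if not matched:
            # Mirrors the idea of the unpaired stream: no interval took it.
            unpaired.append(right_value)
    return pairs, unpaired


t0 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
lefts = [(t0, "L1")]
rights = [
    (t0 - timedelta(seconds=3), "R-early"),
    (t0 + timedelta(seconds=2), "R-after"),
    (t0 + timedelta(seconds=30), "R-far"),
]
pairs, unpaired = pair_by_interval(
    lefts, rights, timedelta(seconds=5), timedelta(seconds=5)
)
```

With a five second gap on each side, the two right items near t0 pair with L1, while the item thirty seconds later falls into no interval.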
- join_interval(
- step_id: str,
- left: KeyedStream[Any],
- clock: Clock[Any, Any],
- gap_before: timedelta,
- gap_after: timedelta,
- *rights: KeyedStream[Any],
- mode: Literal['complete', 'final', 'running', 'product'] = 'final',
Gather values for a key which lie near each other in time.
The joining process is triggered only by left side values.
- Parameters:
step_id – Unique ID.
left – Triggering stream.
clock – Time definition. Must be able to determine timestamps for both left and rights streams.
gap_before – Join right side items if they are within this duration before a left side item.
gap_after – Join right side items if they are within this duration after a left side item.
rights – Joined streams.
mode – What kind of join to do. Determines the output if there is not just a single value on each side within the interval. One of:
- "final": Emit a single tuple with the last value (by timestamp) on each side. A side will contain None if no value seen.
- "running": Emit a tuple of the most recent values for all sides each time there is a new value on any side. The most recent value for a side starts as None.
- "product": Emit a tuple for each combination of values on each side. A side will contain None if no value seen.
Defaults to "final".
- Returns:
Interval result streams. Downstream contains tuples with the value from each stream in the order of the argument list once each interval has closed.
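The difference between the "final" and "product" modes can be shown on the values gathered for one closed interval. This is a rough sketch under stated assumptions: final_join and product_join are hypothetical helpers, integer timestamps stand in for real ones, and the order in which "product" tuples are emitted is a guess rather than documented behavior.

```python
from itertools import product
from typing import Any, List, Optional, Sequence, Tuple

Timestamped = Tuple[int, Any]  # (timestamp, value); ints stand in for datetimes


def final_join(sides: Sequence[List[Timestamped]]) -> Tuple[Optional[Any], ...]:
    """One tuple holding the last value by timestamp per side, or None."""
    out: List[Optional[Any]] = []
    for side in sides:
        if side:
            out.append(max(side, key=lambda tv: tv[0])[1])
        else:
            out.append(None)
    return tuple(out)


def product_join(sides: Sequence[List[Timestamped]]) -> List[Tuple[Any, ...]]:
    """One tuple per combination of values; an empty side contributes None."""
    per_side = []
    for side in sides:
        values = [value for _, value in sorted(side, key=lambda tv: tv[0])]
        per_side.append(values or [None])
    return list(product(*per_side))


# Values seen inside one interval: the left side first, then each right side.
left_side = [(10, "order-1")]
right_a = [(8, "click-1"), (12, "click-2")]
right_b: List[Timestamped] = []

final_result = final_join([left_side, right_a, right_b])
product_result = product_join([left_side, right_a, right_b])
```

Here "final" yields a single tuple with the latest click, while "product" yields one tuple per click; in both cases the empty third side shows up as None.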