bytewax.operators.window
Time-based windowing operators.
Classes
- class ClockConfig
Base class for a clock config.
This describes how a windowing operator should determine the current time and the time for each element.
Use a specific subclass of this that matches the time definition you’d like to use.
- class EventClockConfig(dt_getter, wait_for_system_duration)
- Bases: bytewax.operators.window.ClockConfig
Use a getter function to look up the timestamp for each item.
The watermark is the largest item timestamp seen thus far, minus the waiting duration, plus the system time duration that has elapsed since that item was seen. This effectively means items will be correctly processed as long as they are not out of order by more than the waiting duration in system time.
If the dataflow has no more input, all windows are closed.
- Parameters:
- Returns:
Config object. Pass this as the clock parameter to your windowing operator.
- property dt_getter
- property wait_for_system_duration
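The watermark rule above can be modelled in a few lines of plain Python. This is only a sketch of the sentence as written, not Bytewax's implementation; the class `WatermarkModel` and its method names are hypothetical and exist purely for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class WatermarkModel:
    """Toy model (hypothetical) of the event-clock watermark rule."""

    def __init__(self, wait_for_system_duration: timedelta) -> None:
        self.wait = wait_for_system_duration
        self.max_event_time: Optional[datetime] = None  # largest timestamp seen
        self.seen_at: Optional[datetime] = None  # system time it was seen at

    def observe(self, event_time: datetime, system_now: datetime) -> None:
        # Only a new largest timestamp moves the reference point.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
            self.seen_at = system_now

    def watermark(self, system_now: datetime) -> datetime:
        # largest timestamp - waiting duration + system time elapsed since seen
        elapsed = system_now - self.seen_at
        return self.max_event_time - self.wait + elapsed


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
model = WatermarkModel(wait_for_system_duration=timedelta(seconds=5))
model.observe(event_time=t0, system_now=t0)
# Two seconds of system time later, the watermark is t0 - 5s + 2s.
wm = model.watermark(system_now=t0 + timedelta(seconds=2))
```

In this model an item whose timestamp is older than the current watermark would be the kind of item that arrived too far out of order.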
- class SessionWindow(gap)
- Bases: bytewax.operators.window.WindowConfig
Session windowing with a fixed inactivity gap.
Each time a new item is received, it is added to the latest window if the time since the latest event is < gap. Otherwise a new window is created that starts at the current clock’s time.
Warning
Currently, session windows do not support out-of-order data. Out-of-order data will be placed in their own sessions rather than merging adjacent sessions.
Ensure that your data source is always in order if using an EventClockConfig. Even if it is in order, you cannot use event-time session windows with any windowing join operator. SystemClockConfig is always in order, so it should be fine to use with any operator.
- Parameters:
gap (timedelta) – Gap of inactivity before considering a session closed. The gap should not be negative.
- Returns:
Config object. Pass this as the windower parameter to your windowing operator.
- property gap
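The session rule for in-order data can be sketched in plain Python. This is a minimal model of the description above, not Bytewax's code; the function name `assign_sessions` is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def assign_sessions(
    timestamps: List[datetime], gap: timedelta
) -> List[List[datetime]]:
    """Group in-order timestamps into sessions (hypothetical helper)."""
    sessions: List[List[datetime]] = []
    for ts in timestamps:
        # Join the latest session only if the time since its latest
        # event is strictly less than the gap.
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
stamps = [t0, t0 + timedelta(seconds=3), t0 + timedelta(seconds=20)]
sessions = assign_sessions(stamps, gap=timedelta(seconds=10))
# The first two timestamps share a session; the third starts a new one.
```

Because the comparison only looks at the latest session, feeding this model out-of-order timestamps reproduces the limitation in the warning: a late item starts its own session instead of merging into an earlier one.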
- class SlidingWindow(length, offset, align_to)
- Bases: bytewax.operators.window.WindowConfig
Sliding windows of fixed duration.
If offset == length, windows cover all time but do not overlap. Each item will fall in exactly one window. This would be equivalent to a TumblingWindow.
If offset < length, windows overlap. Each item will fall in multiple windows.
If offset > length, there will be gaps between windows. Each item can fall in at most one window, but might fall into none.
Window start times are inclusive, but end times are exclusive.
- Parameters:
- Returns:
Config object. Pass this as the windower parameter to your windowing operator.
- property align_to
- property length
- property offset
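The three cases above follow from simple interval arithmetic. The sketch below is a pure-Python model that assumes windows start at align_to plus whole multiples of offset and that offset is positive; `windows_for` is a hypothetical helper, not part of Bytewax.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def windows_for(
    ts: datetime, length: timedelta, offset: timedelta, align_to: datetime
) -> List[Tuple[datetime, datetime]]:
    """Return the [start, end) intervals that contain ts (hypothetical)."""
    # Index of the latest window whose start is <= ts (floor division).
    i = (ts - align_to) // offset
    found = []
    # Walk backwards while the window's exclusive end is still after ts.
    while align_to + i * offset + length > ts:
        start = align_to + i * offset
        found.append((start, start + length))
        i -= 1
    return list(reversed(found))


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
s = timedelta(seconds=1)
overlapping = windows_for(t0 + 12 * s, length=10 * s, offset=5 * s, align_to=t0)
tumbling = windows_for(t0 + 12 * s, length=10 * s, offset=10 * s, align_to=t0)
gapped = windows_for(t0 + 7 * s, length=4 * s, offset=10 * s, align_to=t0)
```

Here `overlapping` holds two windows, `tumbling` holds one, and `gapped` is empty because the timestamp falls between windows.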
- class SystemClockConfig
- Bases: bytewax.operators.window.ClockConfig
Use the current system time as the timestamp for each item.
The watermark is also the current system time.
If the dataflow has no more input, all windows are closed.
- Returns:
Config object. Pass this as the clock parameter to your windowing operator.
- class TumblingWindow(length, align_to)
- Bases: bytewax.operators.window.WindowConfig
Tumbling windows of fixed duration.
Each item will fall in exactly one window.
Window start times are inclusive, but end times are exclusive.
- Parameters:
- Returns:
Config object. Pass this as the windower parameter to your windowing operator.
- property align_to
- property length
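To make the role of align_to concrete, here is a small pure-Python model of how a timestamp could map to its single tumbling window. It assumes window boundaries sit at align_to plus whole multiples of length; `tumbling_window_for` is a hypothetical name, not a Bytewax function.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def tumbling_window_for(
    ts: datetime, length: timedelta, align_to: datetime
) -> Tuple[datetime, datetime]:
    """Return the one [start, end) window containing ts (hypothetical)."""
    # Floor division also handles timestamps before align_to.
    index = (ts - align_to) // length
    start = align_to + index * length
    return start, start + length


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
minute = timedelta(minutes=1)
inside = tumbling_window_for(t0 + timedelta(seconds=90), minute, align_to=t0)
before = tumbling_window_for(t0 - timedelta(seconds=1), minute, align_to=t0)
```

In this model `inside` is the second minute after `t0`, and `before` is the minute that ends exactly at `t0`.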
- class WindowConfig
Base class for a windower config.
This describes the type of windows you would like.
Use a specific subclass of this that matches the window definition you’d like to use.
Functions
- collect_window(step_id: str, up: KeyedStream, clock: ClockConfig, windower: WindowConfig, into: Type[C] = list)
Collect items in a window into a container.
See bytewax.operators.collect for the ability to set a max size.
- Parameters:
step_id – Unique ID.
up – Keyed stream of items to collect.
clock – Clock.
windower – Windower.
into – Type to collect into. Defaults to list.
- Returns:
A keyed stream of the collected containers at the end of each window.
- count_window(step_id: str, up: Stream[X], clock: ClockConfig, windower: WindowConfig, key: Callable[[X], str])
Count the number of occurrences of items in a window.
- Parameters:
step_id – Unique ID.
up – Stream of items to count.
clock – Clock.
windower – Windower.
key – Function to convert each item into a string key. The counting machinery does not compare the items directly; instead, it groups by this string key.
- Returns:
A stream of (key, count) pairs, emitted at the end of each window.
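The effect of the key function can be seen in a plain-Python model of counting the items of a single window. This is only an illustration of grouping by string key; `count_one_window` is a hypothetical helper and not how Bytewax is implemented.

```python
from collections import Counter
from typing import Callable, Dict, Iterable, TypeVar

X = TypeVar("X")


def count_one_window(items: Iterable[X], key: Callable[[X], str]) -> Dict[str, int]:
    """Count the items of one window by string key (hypothetical)."""
    # Items are never compared with each other, only their keys are.
    return dict(Counter(key(item) for item in items))


counts = count_one_window(["Apple", "apple", "pear"], key=str.lower)
# "Apple" and "apple" share the key "apple", so they are counted together.
```

Two distinct items that map to the same key are counted as the same thing, so the key function decides what "an occurrence" means.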
- fold_window(step_id: str, up: KeyedStream[V], clock: ClockConfig, windower: WindowConfig, builder: Callable[[], S], folder: Callable[[S, V], S])
Build an empty accumulator, then combine values into it.
It is like reduce_window but uses a function to build the initial value.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
clock – Clock.
windower – Windower.
builder – Called the first time a key appears and is expected to return the empty accumulator for that key.
folder – Combines a new value into an existing accumulator and returns the updated accumulator. The accumulator is initially the empty accumulator.
- Returns:
A keyed stream of the accumulators once each window has closed.
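The interplay of builder and folder can be modelled for the items of a single window in plain Python. This is a sketch of the contract described above, not Bytewax's implementation; `fold_one_window` is a hypothetical name.

```python
from typing import Callable, Dict, Iterable, Tuple, TypeVar

S = TypeVar("S")
V = TypeVar("V")


def fold_one_window(
    items: Iterable[Tuple[str, V]],
    builder: Callable[[], S],
    folder: Callable[[S, V], S],
) -> Dict[str, S]:
    """Fold the (key, value) items of one window per key (hypothetical)."""
    accumulators: Dict[str, S] = {}
    for key, value in items:
        if key not in accumulators:
            # First time this key appears: start from the empty accumulator.
            accumulators[key] = builder()
        accumulators[key] = folder(accumulators[key], value)
    return accumulators


items = [("a", 1), ("b", 10), ("a", 2)]
folded = fold_one_window(items, builder=list, folder=lambda acc, v: acc + [v])
```

Because builder supplies the starting value, the accumulator type (a list here) can differ from the value type (an int here).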
- join_window(step_id: str, clock: ClockConfig, windower: WindowConfig, *sides: KeyedStream[Any], product: bool = False)
Gather together the value for a key on multiple streams.
Warning
Currently this operator does not work correctly with the combination of EventClockConfig and SessionWindow.
- Parameters:
step_id – Unique ID.
clock – Clock.
windower – Windower.
*sides – Keyed streams.
product – When True, emit all combinations of all values seen on all sides. E.g. if side 1 saw "A" and "B", and side 2 saw "C": emit ("A", "C"), ("B", "C") downstream. Defaults to False.
- Returns:
Emits tuples with the value from each stream in the order of the argument list once each window has closed.
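The product=True example above is a Cartesian product of the values seen on each side for one key within one window. A minimal pure-Python model, assuming every side saw at least one value, is shown below; `join_product` is a hypothetical helper and says nothing about how Bytewax handles product=False or missing sides.

```python
from itertools import product
from typing import Any, List, Sequence, Tuple


def join_product(*sides: Sequence[Any]) -> List[Tuple[Any, ...]]:
    """All combinations of one value per side, in argument order (hypothetical)."""
    return list(product(*sides))


side_1 = ["A", "B"]
side_2 = ["C"]
joined = join_product(side_1, side_2)
# joined == [("A", "C"), ("B", "C")]
```

Each tuple keeps the order of the sides as passed, which mirrors the documented ordering of the emitted tuples.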
- join_window_named(step_id: str, clock: ClockConfig, windower: WindowConfig, product: bool = False, **sides: KeyedStream[Any])
Gather together the value for a key on multiple named streams.
Warning
Currently this operator does not work correctly with the combination of EventClockConfig and SessionWindow.
- Parameters:
step_id – Unique ID.
clock – Clock.
windower – Windower.
product – When True, emit all combinations of all values seen on all sides. E.g. if side right saw "A" and "B", and side left saw "C": emit {"right": "A", "left": "C"}, {"right": "B", "left": "C"} downstream. Defaults to False.
**sides – Named keyed streams. The name of each stream will be used in the emitted dicts.
- Returns:
Emits a dict mapping the name to the value from each stream once each window has closed.
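The named variant of the product example can be modelled the same way, with each combination turned into a dict keyed by side name. This is a pure-Python sketch that assumes every side saw at least one value; `join_product_named` is a hypothetical helper, not Bytewax code.

```python
from itertools import product
from typing import Any, Dict, List, Sequence


def join_product_named(**sides: Sequence[Any]) -> List[Dict[str, Any]]:
    """All combinations of one value per named side (hypothetical)."""
    names = list(sides)
    return [dict(zip(names, combo)) for combo in product(*sides.values())]


joined_named = join_product_named(right=["A", "B"], left=["C"])
# joined_named == [{"right": "A", "left": "C"}, {"right": "B", "left": "C"}]
```

The keyword names passed in become the dict keys, so downstream code can look values up by side name rather than by position.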
- max_window(step_id: str, up: KeyedStream[V], clock: ClockConfig, windower: WindowConfig, by=_identity)
Find the maximum value for each key.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
clock – Clock.
windower – Windower.
by – A function called on each value that is used to extract what to compare.
- Returns:
A keyed stream of the max values once each window has closed.
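The by parameter selects what is compared, while the whole value is what gets emitted. The plain-Python model below illustrates this for the items of one window; `max_one_window` is a hypothetical name, and its tie-breaking (first value wins) is an assumption of the sketch rather than documented behavior.

```python
from typing import Any, Callable, Dict, Iterable, Tuple, TypeVar

V = TypeVar("V")


def max_one_window(
    items: Iterable[Tuple[str, V]], by: Callable[[V], Any] = lambda v: v
) -> Dict[str, V]:
    """Keep the value with the largest by(value) per key (hypothetical)."""
    best: Dict[str, V] = {}
    for key, value in items:
        # Compare the extracted field, but store the whole value.
        if key not in best or by(value) > by(best[key]):
            best[key] = value
    return best


readings = [
    ("sensor-1", {"temp": 20, "at": "09:00"}),
    ("sensor-1", {"temp": 25, "at": "09:01"}),
    ("sensor-2", {"temp": 18, "at": "09:00"}),
]
hottest = max_one_window(readings, by=lambda r: r["temp"])
```

Here the dicts themselves are not orderable, so by is what makes the comparison possible at all.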
- min_window(step_id: str, up: KeyedStream[V], clock: ClockConfig, windower: WindowConfig, by=_identity)
Find the minimum value for each key.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
clock – Clock.
windower – Windower.
by – A function called on each value that is used to extract what to compare.
- Returns:
A keyed stream of the min values once each window has closed.
- reduce_window(step_id: str, up: KeyedStream[V], clock: ClockConfig, windower: WindowConfig, reducer: Callable[[V, V], V])
Distill all values for a key down into a single value.
It is like fold_window but the first value is the initial accumulator.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
clock – Clock.
windower – Windower.
reducer – Combines a new value into an old value and returns the combined value.
- Returns:
A keyed stream of the reduced values once each window has closed.
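The difference from folding is that no builder is needed: the first value seen for a key becomes the accumulator. A minimal pure-Python model for the items of one window follows; `reduce_one_window` is a hypothetical helper and not Bytewax's implementation.

```python
import operator
from typing import Callable, Dict, Iterable, Tuple, TypeVar

V = TypeVar("V")


def reduce_one_window(
    items: Iterable[Tuple[str, V]], reducer: Callable[[V, V], V]
) -> Dict[str, V]:
    """Reduce the (key, value) items of one window per key (hypothetical)."""
    reduced: Dict[str, V] = {}
    for key, value in items:
        if key in reduced:
            reduced[key] = reducer(reduced[key], value)
        else:
            # The first value for a key is the initial accumulator.
            reduced[key] = value
    return reduced


totals = reduce_one_window([("a", 1), ("a", 2), ("b", 5)], reducer=operator.add)
```

A key with a single value is emitted unchanged, since the reducer is never called for it.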