bytewax.operators.window

Time-based windowing operators.

Data

C: TypeVar

Type of downstream containers.

DK: TypeVar

Type of dict keys.

DV: TypeVar

Type of dict values.

Classes

class ClockConfig

Base class for a clock config.

This describes how a windowing operator should determine the current time and the time for each element.

Use a specific subclass of this that matches the time definition you’d like to use.

class EventClockConfig(dt_getter, wait_for_system_duration)
Bases: ClockConfig

Use a getter function to lookup the timestamp for each item.

The watermark is the largest item timestamp seen thus far, minus the waiting duration, plus the system time that has elapsed since that item was seen. In effect, items are processed correctly as long as they arrive no more than the waiting duration out of order, measured in system time.

If the dataflow has no more input, all windows are closed.
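The watermark rule above is plain datetime arithmetic. The following is a minimal pure-Python sketch of that formula, not bytewax's implementation; the names event_watermark, is_late, system_time_when_seen, and system_now are hypothetical, and treating an item as late when its timestamp is before the watermark is an assumption.

```python
from datetime import datetime, timedelta, timezone


def event_watermark(
    max_item_ts: datetime,
    wait_for_system_duration: timedelta,
    system_time_when_seen: datetime,
    system_now: datetime,
) -> datetime:
    """Hypothetical model of the EventClockConfig watermark rule."""
    # System time that has passed since the largest item timestamp was seen.
    elapsed = system_now - system_time_when_seen
    return max_item_ts - wait_for_system_duration + elapsed


def is_late(item_ts: datetime, watermark: datetime) -> bool:
    # Assumption: an item whose timestamp is before the watermark is late.
    return item_ts < watermark


utc = timezone.utc
wm = event_watermark(
    max_item_ts=datetime(2024, 1, 1, 12, 0, 0, tzinfo=utc),
    wait_for_system_duration=timedelta(seconds=10),
    system_time_when_seen=datetime(2024, 1, 1, 9, 0, 0, tzinfo=utc),
    system_now=datetime(2024, 1, 1, 9, 0, 3, tzinfo=utc),
)
print(wm)  # 2024-01-01 11:59:53+00:00
```

Three seconds of system time after the 12:00:00 item arrives, the watermark sits 7 seconds behind it, so under this model an item stamped 11:59:50 would be late while one stamped 11:59:55 would not.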

Parameters:
  • dt_getter (Callable[[Any], datetime]) – Returns the timestamp for an item. The datetime returned must have tzinfo set to timezone.utc. E.g. datetime(1970, 1, 1, tzinfo=timezone.utc)

  • wait_for_system_duration (timedelta) – How much time to wait before considering an event late.
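A dt_getter must return timezone-aware datetimes in UTC. As a sketch, assuming hypothetical items are dicts carrying an ISO 8601 string under a "ts" key, a getter could normalize every timestamp to timezone.utc; the item shape and the decision to treat naive timestamps as UTC are assumptions.

```python
from datetime import datetime, timezone


def dt_getter(item: dict) -> datetime:
    # Hypothetical item shape: {"ts": "2024-01-01T12:00:00+02:00", ...}
    dt = datetime.fromisoformat(item["ts"])
    if dt.tzinfo is None:
        # Assumption: naive timestamps in this feed are already UTC.
        return dt.replace(tzinfo=timezone.utc)
    # Convert any other UTC offset so that tzinfo is timezone.utc.
    return dt.astimezone(timezone.utc)
```

Such a getter would then be passed as EventClockConfig(dt_getter, wait_for_system_duration=timedelta(seconds=10)), where the 10-second wait is an arbitrary example value.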

Returns:

Config object. Pass this as the clock_config parameter to your windowing operator.

property dt_getter
property wait_for_system_duration

class SessionWindow(gap)
Bases: WindowConfig

Session windowing with a fixed inactivity gap.

Each time a new item is received, it is added to the latest window if the time since the latest event is less than gap. Otherwise, a new window is created that starts at the current clock's time.
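The session rule can be modeled in a few lines. This is a minimal sketch that assumes timestamps arrive already sorted (matching the in-order requirement in the warning that follows) and uses each item's own timestamp as the clock time; sessionize is a hypothetical helper, not part of bytewax.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def sessionize(timestamps: List[datetime], gap: timedelta) -> List[List[datetime]]:
    """Group in-order timestamps into sessions using the gap rule."""
    sessions: List[List[datetime]] = []
    for ts in timestamps:
        # Extend the latest session if its last event is less than `gap` ago.
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            # Otherwise open a new session starting at this item's time.
            sessions.append([ts])
    return sessions


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
offsets = [0, 2, 4, 10, 11]
result = sessionize([t0 + timedelta(seconds=s) for s in offsets], timedelta(seconds=5))
print([[int((ts - t0).total_seconds()) for ts in s] for s in result])
# [[0, 2, 4], [10, 11]]
```

Because the comparison is strict, two events exactly gap apart land in separate sessions.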

Warning

Currently, session windows do not support out-of-order data. Out-of-order items are placed in their own sessions rather than merging adjacent sessions.

Ensure that your data source is always in order if using an EventClockConfig. Even if it is in order, you cannot use event-time session windows with any windowing join operator.

SystemClockConfig is always in order, so it should be safe to use with any operator.

Parameters:

gap (timedelta) – Gap of inactivity before considering a session closed. The gap should not be negative.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.

property gap

class SlidingWindow(length, offset, align_to)
Bases: WindowConfig

Sliding windows of fixed duration.

If offset == length, windows cover all time but do not overlap. Each item will fall in exactly one window. This is equivalent to a TumblingWindow.

If offset < length, windows overlap. Each item can fall in more than one window.

If offset > length, there will be gaps between windows. Each item falls in at most one window and might fall into none.

Window start times are inclusive, but end times are exclusive.
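To make the three cases concrete, here is a minimal pure-Python sketch that lists the windows a timestamp falls into. It assumes windows start at align_to + k * offset for every integer k and that offset is positive; sliding_windows_for is a hypothetical helper written for illustration, not the library's implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

Window = Tuple[datetime, datetime]


def sliding_windows_for(
    ts: datetime, length: timedelta, offset: timedelta, align_to: datetime
) -> List[Window]:
    """Return every [start, end) sliding window that contains ts.

    Assumes window starts are align_to + k * offset for every integer k,
    and that offset is positive.
    """
    # Index of the latest window start at or before ts (floor division).
    k = (ts - align_to) // offset
    windows: List[Window] = []
    while True:
        start = align_to + k * offset
        end = start + length
        if end <= ts:  # End times are exclusive, so ts is not covered.
            break
        windows.append((start, end))
        k -= 1  # Earlier windows may still cover ts when they overlap.
    windows.reverse()
    return windows


align = datetime(2024, 1, 1, tzinfo=timezone.utc)
m = timedelta(minutes=1)
ts = align + 17 * m
print(len(sliding_windows_for(ts, 10 * m, 10 * m, align)))  # 1 (offset == length)
print(len(sliding_windows_for(ts, 10 * m, 5 * m, align)))  # 2 (offset < length)
print(len(sliding_windows_for(ts, 5 * m, 10 * m, align)))  # 0 (offset > length, gap)
```

Walking backwards from the latest start is enough because earlier windows also end earlier: once one window ends at or before ts, no earlier window can contain it.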

Parameters:
  • length (timedelta) – Length of windows.

  • offset (timedelta) – Duration between start times of adjacent windows.

  • align_to (datetime) – Align windows so this instant starts a window. This must be a constant. You can use this to align all windows to hour boundaries, e.g. by passing an instant that falls exactly on the hour.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.

property align_to
property length
property offset

class SystemClockConfig
Bases: ClockConfig

Use the current system time as the timestamp for each item.

The watermark is also the current system time.

If the dataflow has no more input, all windows are closed.

Returns:

Config object. Pass this as the clock_config parameter to your windowing operator.

class TumblingWindow(length, align_to)
Bases: WindowConfig

Tumbling windows of fixed duration.

Each item will fall in exactly one window.

Window start times are inclusive, but end times are exclusive.
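Because tumbling windows sit back to back with no overlap, the window for a timestamp can be found with floor division. A minimal sketch, assuming windows start at align_to + k * length for every integer k; tumbling_window_for is a hypothetical helper, not part of bytewax.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def tumbling_window_for(
    ts: datetime, length: timedelta, align_to: datetime
) -> Tuple[datetime, datetime]:
    """Return the single [start, end) tumbling window containing ts."""
    # Floor division also handles timestamps earlier than align_to.
    k = (ts - align_to) // length
    start = align_to + k * length
    return start, start + length


utc = timezone.utc
align = datetime(2024, 1, 1, tzinfo=utc)  # Falls exactly on an hour boundary.
hour = timedelta(hours=1)
window = tumbling_window_for(datetime(2024, 1, 1, 13, 45, tzinfo=utc), hour, align)
```

Here window spans 13:00 up to, but not including, 14:00 UTC; an item stamped exactly 14:00 would instead land in the 14:00 to 15:00 window, reflecting the exclusive end time.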

Parameters:
  • length (timedelta) – Length of windows.

  • align_to (datetime) – Align windows so this instant starts a window. This must be a constant. You can use this to align all windows to hour boundaries, e.g. by passing an instant that falls exactly on the hour.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.

property align_to
property length

class WindowConfig

Base class for a windower config.

This describes the type of windows you would like.

Use a specific subclass of this that matches the window definition you’d like to use.

class WindowMetadata(open_time, close_time)

Contains information about a window.

property close_time

The time that the window closes.

For some window types like SessionWindow, this value can change as new data is received.

property open_time

The time that the window starts.

Functions

collect_window(
step_id: str,
up: KeyedStream,
clock: ClockConfig,
windower: WindowConfig,
into: Type[C] = list,
) -> KeyedStream[Tuple[WindowMetadata, C]]

Collect items in a window into a container.

See bytewax.operators.collect for the ability to set a max size.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream of items to collect.

  • clock – Clock.

  • windower – Windower.

  • into – Type to collect into. Defaults to list.

Returns:

A keyed stream of the collected containers at the end of each window.
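As an illustration of what ends up in each container, the sketch below models collecting over tumbling windows in plain Python. It assumes items arrive as hypothetical (key, timestamp, value) triples and only loosely mirrors the into parameter; collect_by_window is not a bytewax function.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Iterable, List, Tuple


def collect_by_window(
    items: Iterable[Tuple[str, datetime, Any]],
    length: timedelta,
    align_to: datetime,
    into: Callable[[List[Any]], Any] = list,
) -> Dict[Tuple[str, datetime], Any]:
    """Group (key, timestamp, value) items per key and tumbling window start."""
    buckets: Dict[Tuple[str, datetime], List[Any]] = defaultdict(list)
    for key, ts, value in items:
        # Start of the tumbling window containing ts.
        start = align_to + ((ts - align_to) // length) * length
        buckets[(key, start)].append(value)
    # Convert each bucket into the requested container type.
    return {bucket: into(values) for bucket, values in buckets.items()}


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
m = timedelta(minutes=1)
items = [("a", t0, 1), ("a", t0 + 3 * m, 1), ("a", t0 + 12 * m, 2), ("b", t0 + m, 5)]
as_lists = collect_by_window(items, 10 * m, t0)
as_sets = collect_by_window(items, 10 * m, t0, into=set)
```

With the default list, duplicates are kept in arrival order; passing set instead keeps only the distinct values seen in each window.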

count_window(
step_id: str,
up: Stream[X],
clock: ClockConfig,
windower: WindowConfig,
key: Callable[[X], str],
) -> KeyedStream[Tuple[WindowMetadata, int]]

Count the number of occurrences of items in a window.

Parameters:
  • step_id – Unique ID.

  • up – Stream of items to count.

  • clock – Clock.

  • windower – Windower.

  • key – Function to convert each item into a string key. The counting machinery does not compare the items directly; instead, it groups by this string key.

Returns:

A keyed stream of the count for each key, paired with its WindowMetadata, emitted at the end of each window.
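The role of the key function can be seen in a small model of windowed counting. This sketch assumes tumbling windows and a hypothetical ts_of function standing in for the clock's timestamp lookup; count_by_window and the click records are illustrative only.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Iterable, Tuple


def count_by_window(
    items: Iterable[Any],
    key: Callable[[Any], str],
    ts_of: Callable[[Any], datetime],
    length: timedelta,
    align_to: datetime,
) -> Dict[Tuple[str, datetime], int]:
    """Count items per string key and tumbling window start."""
    counts: Counter = Counter()
    for item in items:
        ts = ts_of(item)
        start = align_to + ((ts - align_to) // length) * length
        # Items are never compared directly; only key(item) is used.
        counts[(key(item), start)] += 1
    return dict(counts)


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
m = timedelta(minutes=1)
clicks = [
    {"user": "ann", "at": t0},
    {"user": "ann", "at": t0 + 2 * m},
    {"user": "bob", "at": t0 + 3 * m},
    {"user": "ann", "at": t0 + 11 * m},
]
per_user = count_by_window(
    clicks, key=lambda c: c["user"], ts_of=lambda c: c["at"], length=10 * m, align_to=t0
)
```

The click records themselves are unhashable dicts, which is fine here because only the string returned by key is used for grouping.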

fold_window(
step_id: str,
up: KeyedStream[V],
clock: ClockConfig,
windower: WindowConfig,
builder: Callable[[], S],
folder: Callable[[S, V], S],
) -> KeyedStream[Tuple[WindowMetadata, S]]

Build an empty accumulator, then combine values into it.

It is like reduce_window but uses a function to build the initial value.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • clock – Clock.

  • windower – Windower.

  • builder – Called the first time a key appears and is expected to return the empty accumulator for that key.

  • folder – Combines a new value into an existing accumulator and returns the updated accumulator. The accumulator is initially the empty accumulator.

Returns:

A keyed stream of the accumulators once each window has closed.
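Within a single window, the builder and folder contract reduces to an ordinary loop. The sketch below models one key's values in one closed window; fold_values is a hypothetical helper, and the (count, total) accumulator is just an example of an accumulator whose type differs from the values being folded.

```python
from typing import Callable, Iterable, TypeVar

S = TypeVar("S")
V = TypeVar("V")


def fold_values(
    values: Iterable[V], builder: Callable[[], S], folder: Callable[[S, V], S]
) -> S:
    """Fold one key's values within one window."""
    acc = builder()  # Fresh empty accumulator.
    for value in values:
        acc = folder(acc, value)
    return acc


# Track (count, total) so a mean can be computed when the window closes.
count, total = fold_values(
    [3, 5, 10],
    builder=lambda: (0, 0),
    folder=lambda acc, v: (acc[0] + 1, acc[1] + v),
)
print(total / count)  # 6.0
```

Because builder supplies the starting point, the accumulator can be any type, such as a tuple, set, or dict, regardless of the value type.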

join_window(
step_id: str,
clock: ClockConfig,
windower: WindowConfig,
*sides: KeyedStream[Any],
product: bool = False,
) -> KeyedStream[Tuple[WindowMetadata, Tuple]]

Gather together the value for a key on multiple streams.

Warning

Currently this operator does not work correctly with the combination of EventClockConfig and SessionWindow.

Parameters:
  • step_id – Unique ID.

  • clock – Clock.

  • windower – Windower.

  • *sides – Keyed streams.

  • product – When True, emit all combinations of all values seen on all sides. E.g. if side 1 saw "A" and "B", and side 2 saw "C": emit ("A", "C"), ("B", "C") downstream. Defaults to False.

Returns:

Emits tuples with the value from each stream in the order of the argument list once each window has closed.
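For a single key in a single window, product=True corresponds to a Cartesian product of the values seen on each side, in argument order. A minimal sketch of only that mode using itertools.product, reproducing the example in the product parameter description; join_product is hypothetical, and the product=False behavior is not modeled here.

```python
from itertools import product
from typing import Any, List, Sequence, Tuple


def join_product(*sides: Sequence[Any]) -> List[Tuple[Any, ...]]:
    """All combinations of the values each side saw for one key in one window."""
    return list(product(*sides))


print(join_product(["A", "B"], ["C"]))  # [('A', 'C'), ('B', 'C')]
```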

join_window_named(
step_id: str,
clock: ClockConfig,
windower: WindowConfig,
product: bool = False,
**sides: KeyedStream[Any],
) -> KeyedStream[Tuple[WindowMetadata, Dict[str, Any]]]

Gather together the value for a key on multiple named streams.

Warning

Currently this operator does not work correctly with the combination of EventClockConfig and SessionWindow.

Parameters:
  • step_id – Unique ID.

  • clock – Clock.

  • windower – Windower.

  • product – When True, emit all combinations of all values seen on all sides. E.g. if side right saw "A" and "B", and side left saw "C": emit {"right": "A", "left": "C"}, {"right": "B", "left": "C"} downstream. Defaults to False.

  • **sides – Named keyed streams. The name of each stream will be used in the emitted dicts.

Returns:

Emits a dict mapping the name to the value from each stream once each window has closed.
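The named variant can be modeled the same way, zipping each combination with the side names to build the emitted dicts. This sketch covers only product=True for one key in one window and reproduces the example in the product parameter description; join_named_product is a hypothetical helper.

```python
from itertools import product
from typing import Any, Dict, List, Sequence


def join_named_product(**sides: Sequence[Any]) -> List[Dict[str, Any]]:
    """Model of product=True for named sides, for one key in one window."""
    names = list(sides)  # Keyword argument order is preserved in Python 3.7+.
    return [dict(zip(names, combo)) for combo in product(*sides.values())]


print(join_named_product(right=["A", "B"], left=["C"]))
# [{'right': 'A', 'left': 'C'}, {'right': 'B', 'left': 'C'}]
```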

max_window(
step_id: str,
up: KeyedStream[V],
clock: ClockConfig,
windower: WindowConfig,
by=_identity,
) -> KeyedStream[Tuple[WindowMetadata, V]]

Find the maximum value for each key.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • clock – Clock.

  • windower – Windower.

  • by – A function called on each value that is used to extract what to compare.

Returns:

A keyed stream of the max values once each window has closed.

min_window(
step_id: str,
up: KeyedStream[V],
clock: ClockConfig,
windower: WindowConfig,
by=_identity,
) -> KeyedStream[Tuple[WindowMetadata, V]]

Find the minimum value for each key.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • clock – Clock.

  • windower – Windower.

  • by – A function called on each value that is used to extract what to compare.

Returns:

A keyed stream of the min values once each window has closed.
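Per key and window, max_window and min_window with a by function can be read like Python's built-in max and min with key=: the comparison uses the extracted value, while the emitted value is the original item (the return type is V). A sketch under that reading; window_max, window_min, the local _identity stand-in, and the sample readings are hypothetical.

```python
from typing import Any, Callable, Iterable


def _identity(x: Any) -> Any:
    # Local stand-in for the default `by` shown in the signatures.
    return x


def window_max(values: Iterable[Any], by: Callable[[Any], Any] = _identity) -> Any:
    # Compare what `by` extracts, but return the original value.
    return max(values, key=by)


def window_min(values: Iterable[Any], by: Callable[[Any], Any] = _identity) -> Any:
    return min(values, key=by)


readings = [
    {"sensor": "a", "temp": 21.5},
    {"sensor": "a", "temp": 19.0},
    {"sensor": "a", "temp": 23.1},
]
print(window_max(readings, by=lambda r: r["temp"]))  # {'sensor': 'a', 'temp': 23.1}
print(window_min(readings, by=lambda r: r["temp"]))  # {'sensor': 'a', 'temp': 19.0}
```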

reduce_window(
step_id: str,
up: KeyedStream[V],
clock: ClockConfig,
windower: WindowConfig,
reducer: Callable[[V, V], V],
) -> KeyedStream[Tuple[WindowMetadata, V]]

Distill all values for a key down into a single value.

It is like fold_window but the first value is the initial accumulator.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • clock – Clock.

  • windower – Windower.

  • reducer – Combines a new value into an old value and returns the combined value.

Returns:

A keyed stream of the reduced values once each window has closed.
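For one key in one closed window, the reducer contract matches functools.reduce without an initial value: the first value seeds the accumulator. A minimal sketch assuming at least one value per key and window; reduce_values is a hypothetical helper.

```python
from functools import reduce
from typing import Callable, Iterable, TypeVar

V = TypeVar("V")


def reduce_values(values: Iterable[V], reducer: Callable[[V, V], V]) -> V:
    """Model of one key's values in one window; the first value seeds the result."""
    return reduce(reducer, values)


print(reduce_values([3, 9, 4], lambda old, new: old + new))  # 16
```

Because the first value is the starting accumulator, the reducer must accept and return the same type as the values themselves.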
