bytewax.operators.window#

Time-based windowing operators.

Data#

C: TypeVar#
K: TypeVar#
S: TypeVar#
V: TypeVar#
W: TypeVar#
X: TypeVar#
Y: TypeVar#

Classes#

class ClockConfig#

Base class for a clock config.

This describes how a windowing operator should determine the current time and the time for each element.

Use a specific subclass of this that matches the time definition you’d like to use.

Initialization

class EventClockConfig(dt_getter, wait_for_system_duration)#
Bases:

Use a getter function to lookup the timestamp for each item.

The watermark is the largest item timestamp seen thus far, minus the waiting duration, plus the system time duration that has elapsed since that item was seen. This effectively means items will be correctly processed as long as they are not out of order more than the waiting duration in system time.

If the dataflow has no more input, all windows are closed.

Parameters:
  • dt_getter (Callable[[Any], datetime]) – Returns the timestamp for an item. The datetime returned must have tzinfo set to timezone.utc. E.g. datetime(1970, 1, 1, tzinfo=timezone.utc)

  • wait_for_system_duration (timedelta) – How much time to wait before considering an event late.

Returns:

Config object. Pass this as the clock_config parameter to your windowing operator.
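The watermark rule described above can be traced by hand. The following is a minimal sketch in plain Python of that arithmetic only, not bytewax's implementation; the `watermark` helper and its argument names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def watermark(max_item_ts, seen_at, now, wait_for_system_duration):
    """Hypothetical helper mirroring the rule in the text.

    Largest item timestamp seen so far, minus the waiting duration,
    plus the system time that has elapsed since that item was seen.
    """
    return max_item_ts - wait_for_system_duration + (now - seen_at)


wait = timedelta(seconds=10)
# The newest item carries an event timestamp of 12:00:00 UTC ...
max_item_ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# ... and was observed at this system time.
seen_at = datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
# Four seconds of system time pass with no newer item.
now = seen_at + timedelta(seconds=4)

# 12:00:00 - 10s + 4s
wm = watermark(max_item_ts, seen_at, now, wait)
```

With no newer items arriving, the watermark keeps advancing at the pace of the system clock, which is why a quiet input does not stall window closing.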

Initialization

property wait_for_system_duration#
property dt_getter#
class SessionWindow(gap)#
Bases:

Session windowing with a fixed inactivity gap.

Each time a new item is received, it is added to the latest window if the time since the latest event is less than gap. Otherwise, a new window is created that starts at the current clock’s time.

Parameters:

gap (timedelta) – Gap of inactivity before considering a session closed. The gap should not be negative.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.
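To make the gap rule concrete, here is a small plain-Python sketch of how items would be grouped into sessions. It is an illustration only: the `sessionize` helper is hypothetical, and it assumes items arrive in timestamp order, which the real operator does not require.

```python
from datetime import datetime, timedelta, timezone


def sessionize(timestamps, gap):
    """Group in-order timestamps into sessions (hypothetical helper)."""
    sessions = []
    for ts in timestamps:
        # Join the latest session only if the time since its last
        # event is strictly less than the gap.
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
# Events at 0s, 3s, 5s, then a 15s pause, then 20s and 22s.
events = [base + timedelta(seconds=s) for s in (0, 3, 5, 20, 22)]
sessions = sessionize(events, gap=timedelta(seconds=10))
session_sizes = [len(s) for s in sessions]
```

The 15-second pause exceeds the 10-second gap, so the events split into two sessions.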

Initialization

property gap#
class SlidingWindow(length, offset, align_to)#
Bases:

Sliding windows of fixed duration.

If offset == length, windows cover all time but do not overlap. Each item will fall in exactly one window. This would be equivalent to a TumblingWindow.

If offset < length, windows overlap. Each item will fall in multiple windows.

If offset > length, there will be gaps between windows. Each item can fall in up to one window, but might fall into none.

Window start times are inclusive, but end times are exclusive.

Parameters:
  • length (timedelta) – Length of windows.

  • offset (timedelta) – Duration between start times of adjacent windows.

  • align_to (datetime) – Align windows so this instant starts a window. This must be a constant. You can use this to align all windows to, for example, hour boundaries.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.
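The three cases above (offset equal to, less than, or greater than length) can be checked with a short calculation. This is a sketch in plain Python under the assumption that window `i` covers `[align_to + i * offset, align_to + i * offset + length)` with positive `length` and `offset`; `windows_containing` is a hypothetical helper, not part of bytewax.

```python
from datetime import datetime, timedelta, timezone


def windows_containing(ts, length, offset, align_to):
    """Return the (start, end) pairs of every window that contains ts."""
    # Index of the newest window whose start is <= ts.
    i = (ts - align_to) // offset
    found = []
    while True:
        start = align_to + i * offset
        end = start + length
        if ts >= end:  # end times are exclusive
            break
        found.append((start, end))
        i -= 1  # step back to the previous, earlier window
    return found[::-1]


align = datetime(2024, 1, 1, tzinfo=timezone.utc)
ts = align + timedelta(seconds=12)
ten, five, fifteen = (timedelta(seconds=s) for s in (10, 5, 15))

overlapping = windows_containing(ts, length=ten, offset=five, align_to=align)
tumbling = windows_containing(ts, length=ten, offset=ten, align_to=align)
gapped = windows_containing(ts, length=ten, offset=fifteen, align_to=align)
```

Here the item at 12 seconds falls in two windows when `offset < length`, exactly one when `offset == length`, and none when `offset > length`, because it lands in the gap between 10 and 15 seconds.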

Initialization

property align_to#
property offset#
property length#
class SystemClockConfig#
Bases:

Use the current system time as the timestamp for each item.

The watermark is also the current system time.

If the dataflow has no more input, all windows are closed.

Returns: Config object. Pass this as the clock_config parameter to your windowing operator.

Initialization

class TumblingWindow(length, align_to)#
Bases:

Tumbling windows of fixed duration.

Each item will fall in exactly one window.

Window start times are inclusive, but end times are exclusive.

Parameters:
  • length (timedelta) – Length of windows.

  • align_to (datetime) – Align windows so this instant starts a window. This must be a constant. You can use this to align all windows to, for example, hour boundaries.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.
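As an illustration of how `length` and `align_to` determine the single window an item falls into, the following plain-Python sketch computes the window bounds for a timestamp. `tumbling_window_for` is a hypothetical helper and only models the arithmetic implied by the description above.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window_for(ts, length, align_to):
    """Return (start, end) of the one window that contains ts."""
    # Number of whole windows between align_to and ts (floors for
    # timestamps before align_to as well).
    index = (ts - align_to) // length
    start = align_to + index * length
    return start, start + length


# Aligning to midnight UTC with one-hour windows puts every window
# on an hour boundary.
align_to = datetime(2024, 1, 1, tzinfo=timezone.utc)
hour = timedelta(hours=1)

start, end = tumbling_window_for(
    datetime(2024, 1, 1, 13, 42, 7, tzinfo=timezone.utc), hour, align_to
)

# An item exactly on a boundary belongs to the window it starts,
# because start times are inclusive and end times are exclusive.
boundary_start, boundary_end = tumbling_window_for(
    datetime(2024, 1, 1, 14, 0, 0, tzinfo=timezone.utc), hour, align_to
)
```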

Initialization

property length#
property align_to#
class WindowConfig#

Base class for a windower config.

This describes the type of windows you would like.

Use a specific subclass of this that matches the window definition you’d like to use.

Initialization

class WindowMetadata(open_time, close_time)#

Contains information about a window.

Initialization

property open_time#

The time that the window starts.

property close_time#

The time that the window closes.

For some window types like SessionWindow, this value can change as new data is received.

Functions#

collect_window(
step_id: str,
up: KeyedStream,
clock: ClockConfig,
windower: WindowConfig,
into: Type[C] = list,
) KeyedStream[Tuple[WindowMetadata, C]]#

Collect items in a window into a container.

See bytewax.operators.collect for the ability to set a max size.

Args: step_id: Unique ID.

up: Keyed stream of items to collect.

clock: Clock.

windower: Windower.

into: Type to collect into. Defaults to `list`.

Returns: A keyed stream of the collected containers at the end of each window.

count_window(
step_id: str,
up: Stream[X],
clock: ClockConfig,
windower: WindowConfig,
key: Callable[[X], str],
) KeyedStream[Tuple[WindowMetadata, int]]#

Count the number of occurrences of items in a window.

Args: step_id: Unique ID.

up: Stream of items to count.

clock: Clock.

windower: Windower.

key: Function to convert each item into a string key. The
    counting machinery does not compare the items directly,
    instead it groups by this string key.

Returns: A keyed stream of (window metadata, count) pairs, one per key, emitted at the end of each window.
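The role of `key` is easiest to see on the items of a single window. The sketch below uses only the standard library and ignores windowing entirely; it is meant to show that counting groups by the string key rather than by comparing the items themselves.

```python
from collections import Counter

# Items that landed in one window (illustrative data).
items = ["Apple", "apple", "pear"]


def key(item):
    """Example key function: treat items case-insensitively."""
    return item.lower()


# "Apple" and "apple" are different items but share a key.
counts = Counter(key(item) for item in items)
```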

fold_window(
step_id: str,
up: KeyedStream[V],
clock: ClockConfig,
windower: WindowConfig,
builder: Callable[[], S],
folder: Callable[[S, V], S],
) KeyedStream[Tuple[WindowMetadata, S]]#

Build an empty accumulator, then combine values into it.

It is like reduce_window but uses a function to build the initial value.

Args: step_id: Unique ID.

up: Keyed stream.

clock: Clock.

windower: Windower.

builder: Called the first time a key appears and is expected
    to return the empty accumulator for that key.

folder: Combines a new value into an existing accumulator and
    returns the updated accumulator. The accumulator is
    initially the empty accumulator.

Returns: A keyed stream of the accumulators once each window has closed.
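The interplay of `builder` and `folder` can be sketched for the values of one key in one window. This is plain Python and not the operator itself; the `fold` helper is hypothetical. Note that the accumulator type (a tuple here) may differ from the value type.

```python
def fold(values, builder, folder):
    """Hypothetical helper: build an empty accumulator, fold values in."""
    acc = builder()
    for value in values:
        acc = folder(acc, value)
    return acc


# Values for a single key within a single window (illustrative data).
window_values = [3, 1, 2]

# Track a running (sum, count) pair so a mean can be derived later.
total, count = fold(
    window_values,
    builder=lambda: (0, 0),
    folder=lambda acc, v: (acc[0] + v, acc[1] + 1),
)
mean = total / count
```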

join_window(
step_id: str,
clock: ClockConfig,
windower: WindowConfig,
*sides: KeyedStream[Any],
product: bool = False,
) KeyedStream[Tuple[WindowMetadata, Tuple]]#

Gather together the value for a key on multiple streams.

Args: step_id: Unique ID.

clock: Clock.

windower: Windower.

*sides: Keyed streams.

product: When `True`, emit all combinations of all values seen
    on all sides. E.g. if side 1 saw `"A"` and `"B"`, and side
    2 saw `"C"`: emit `("A", "C")`, `("B", "C")` downstream.
    Defaults to `False`.

Returns: Emits tuples with the value from each stream in the order of the argument list once each window has closed.
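The `product=True` example in the argument description corresponds to a Cartesian product over the values seen on each side. A standard-library sketch of just that combination step, for one key in one window:

```python
from itertools import product

# Values seen for one key on each side during the window.
side_1 = ["A", "B"]
side_2 = ["C"]

# One tuple per combination, in the order of the sides.
combos = list(product(side_1, side_2))
```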

join_window_named(
step_id: str,
clock: ClockConfig,
windower: WindowConfig,
product: bool = False,
**sides: KeyedStream[Any],
) KeyedStream[Tuple[WindowMetadata, Dict[str, Any]]]#

Gather together the value for a key on multiple named streams.

Args: step_id: Unique ID.

clock: Clock.

windower: Windower.

product: When `True`, emit all combinations of all values seen
    on all sides. E.g. if side `right` saw `"A"` and `"B"`,
    and side `left` saw `"C"`: emit `{"right": "A", "left":
    "C"}`, `{"right": "B", "left": "C"}` downstream. Defaults
    to `False`.

**sides: Named keyed streams. The name of each stream will be
    used in the emitted `dict`s.

Returns: Emits a dict mapping the name to the value from each stream once each window has closed.
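For the named variant, the same combination step yields dicts keyed by side name. The sketch below reproduces the `product=True` example from the argument description using only the standard library; it models one key in one window and is not the operator's implementation.

```python
from itertools import product

# Values seen for one key on each named side during the window.
sides = {"right": ["A", "B"], "left": ["C"]}

names = list(sides)
# Pair each combination of values back up with the side names.
rows = [dict(zip(names, combo)) for combo in product(*sides.values())]
```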

max_window(
step_id: str,
up: KeyedStream[V],
clock: ClockConfig,
windower: WindowConfig,
by=_identity,
) KeyedStream[Tuple[WindowMetadata, V]]#

Find the maximum value for each key.

Args: step_id: Unique ID.

up: Keyed stream.

clock: Clock.

windower: Windower.

by: A function called on each value that is used to extract
    what to compare.

Returns: A keyed stream of the max values once each window has closed.

min_window(
step_id: str,
up: KeyedStream[V],
clock: ClockConfig,
windower: WindowConfig,
by=_identity,
) KeyedStream[Tuple[WindowMetadata, V]]#

Find the minimum value for each key.

Args: step_id: Unique ID.

up: Keyed stream.

clock: Clock.

windower: Windower.

by: A function called on each value that is used to extract
    what to compare.

Returns: A keyed stream of the min values once each window has closed.
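The `by` argument of `min_window` and `max_window` plays the same role as `key` in Python's built-in `min` and `max`: it extracts what to compare, while the whole value is what gets returned. A plain-Python sketch for the values of one key in one window, with illustrative data:

```python
# Values for a single key within a single window.
readings = [
    {"sensor": "a", "temp": 21.5},
    {"sensor": "b", "temp": 23.0},
    {"sensor": "c", "temp": 19.0},
]


def by(reading):
    """Extract the field to compare on."""
    return reading["temp"]


coldest = min(readings, key=by)
hottest = max(readings, key=by)
```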

reduce_window(
step_id: str,
up: KeyedStream[V],
clock: ClockConfig,
windower: WindowConfig,
reducer: Callable[[V, V], V],
) KeyedStream[Tuple[WindowMetadata, V]]#

Distill all values for a key down into a single value.

It is like fold_window but the first value is the initial accumulator.

Args: step_id: Unique ID.

up: Keyed stream.

clock: Clock.

windower: Windower.

reducer: Combines a new value into an old value and returns
    the combined value.

Returns: A keyed stream of the reduced values once each window has closed.
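The contract of `reducer` matches the function passed to `functools.reduce` without an initial value: the first value seen becomes the accumulator, so the result has the same type as the values. A standard-library sketch for one key in one window:

```python
from functools import reduce

# Values for a single key within a single window (illustrative data).
window_values = [3, 1, 2]


def reducer(old, new):
    """Combine a new value into the old one."""
    return old + new


# No builder is needed: 3 is the initial accumulator.
total = reduce(reducer, window_values)
```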
