bytewax.operators.windowing

Time-based windowing operators.

Data

ZERO_TD: timedelta

A zero length of time.

UTC_MAX: datetime

Maximum possible UTC date time.

UTC_MIN: datetime

Minimum possible UTC date time.

SC: TypeVar

Type of ClockLogic’s state snapshots.

SW: TypeVar

Type of WindowerLogic’s state snapshots.

C: TypeVar

Type of downstream containers.

LATE_SESSION_ID: int

Sentinel window ID assigned to late items.

Classes

class ClockLogic

Abstract class to define a sense of time for windowing.

If you’re just getting started with windowing operators, see the concrete subclasses of Clock for built-in options.

This type is instantiated within Bytewax’s windowing operators and has methods called by the runtime. A new subclass of this will need to be paired with a new subclass of Clock, which holds the configuration data for this type.

This is instantiated for each key which is encountered.

abstract before_batch() → None

Prepare to process items incoming simultaneously.

Called once before a series of on_item calls.

You can use this to cache a “current time” or prepare other state.

abstract on_item(
value: V,
) → Tuple[datetime, datetime]

Get the timestamp and watermark after this item.

Called on each new upstream item.

If new timestamps will permanently affect the watermark, the relevant state must be retained within this instance.

Parameters:

value – The value of the upstream (key, value).

Returns:

A 2-tuple of timestamp for the item and the current watermark.

abstract on_notify() → datetime

Get the current watermark when there is no item.

Called when a window might need to close.

Returns:

The current watermark.

abstract on_eof() → datetime

Get the current watermark when there are no more items upstream.

Generally you’d return UTC_MAX if you want to close all windows just before stopping because the window semantics don’t make sense across a resume.

To support continuation resumes, this should not permanently affect the state that is snapshotted.

Returns:

The current watermark.

abstract to_system_utc(
timestamp: datetime,
) → Optional[datetime]

Convert a timestamp to a UTC system time.

This is used to determine when the runtime should wake up to check for closed windows.

Parameters:

timestamp – Previously returned by on_item.

Returns:

A system time in UTC. If None, signal that there is no relevant system time to wake at.

abstract snapshot() → S

Return an immutable copy of the state for recovery.

This will be called periodically by the runtime.

The value returned here will be passed back to Clock.build when resuming.

The state must be pickle-able.

Danger

The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.

Returns:

The immutable state to be pickled.
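The Danger note above is easiest to see with a toy example. The class below is a standalone sketch with hypothetical names (`MaxSeenLogic`, `observe`); it only mimics the snapshot() contract and does not subclass bytewax's ClockLogic. Because it mutates a dict in place, its snapshot() returns a copy.deepcopy so that a snapshot already handed to the runtime cannot change afterwards.

```python
import copy
import pickle
from datetime import datetime, timezone
from typing import Dict, Optional


class MaxSeenLogic:
    """Hypothetical per-key logic tracking the largest timestamp per source."""

    def __init__(self, resume_state: Optional[Dict[str, datetime]] = None) -> None:
        # Start from the resume snapshot if one was provided.
        self.state: Dict[str, datetime] = resume_state if resume_state is not None else {}

    def observe(self, source: str, ts: datetime) -> None:
        # Mutates self.state in place, which is why snapshot() must copy.
        prev = self.state.get(source)
        if prev is None or ts > prev:
            self.state[source] = ts

    def snapshot(self) -> Dict[str, datetime]:
        # Deep copy so later observe() calls cannot alter a returned snapshot.
        return copy.deepcopy(self.state)


logic = MaxSeenLogic()
logic.observe("a", datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc))
snap = logic.snapshot()
logic.observe("a", datetime(2024, 1, 1, 13, 0, tzinfo=timezone.utc))

print(snap["a"].hour)  # 12: the snapshot did not see the later update
print(pickle.loads(pickle.dumps(snap)) == snap)  # True: it is pickle-able
```

Returning `self.state` directly instead would let the second observe() call silently rewrite the snapshot.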

class Clock

Abstract class defining a type of clock.

If you’re just getting started with windowing operators, see the concrete subclasses of Clock for built-in options.

Every subclass of this class must also have a matching implementation of ClockLogic that actually implements the clock.

abstract build(
resume_state: Optional[S],
) → ClockLogic[V, S]

Construct a new clock logic instance.

This should close over any non-state configuration and combine it with the resume state to return a prepared ClockLogic instance.

Parameters:

resume_state – Snapshot from the previous call to ClockLogic.snapshot, if any.

Returns:

A prepared clock logic.
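The configuration/logic split can be sketched with two standalone classes that follow the same method names. This is an illustration under stated assumptions, not a built-in clock: `ItemTimestampClock` and `ItemTimestampClockLogic` are hypothetical, neither subclasses bytewax's Clock or ClockLogic, the watermark rule (largest timestamp seen so far, with no waiting duration) is chosen for simplicity, and `UTC_MIN`/`UTC_MAX` are local stand-ins assumed to be the minimum and maximum aware UTC datetimes.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Optional, Tuple

# Local stand-ins for this module's UTC_MIN / UTC_MAX constants.
UTC_MIN = datetime.min.replace(tzinfo=timezone.utc)
UTC_MAX = datetime.max.replace(tzinfo=timezone.utc)


class ItemTimestampClockLogic:
    """Hypothetical clock logic: the watermark is the largest timestamp seen."""

    def __init__(self, ts_getter: Callable[[Any], datetime], max_seen: datetime) -> None:
        self.ts_getter = ts_getter
        self.max_seen = max_seen  # The only state that needs snapshotting.

    def before_batch(self) -> None:
        pass  # Nothing to cache for this simple clock.

    def on_item(self, value: Any) -> Tuple[datetime, datetime]:
        ts = self.ts_getter(value)
        # New timestamps permanently advance the watermark, so keep them as state.
        self.max_seen = max(self.max_seen, ts)
        return ts, self.max_seen

    def on_notify(self) -> datetime:
        return self.max_seen

    def on_eof(self) -> datetime:
        # Close all windows at EOF without touching the snapshotted state.
        return UTC_MAX

    def to_system_utc(self, timestamp: datetime) -> Optional[datetime]:
        # Event time has no fixed relation to system time in this sketch.
        return None

    def snapshot(self) -> datetime:
        return self.max_seen  # datetime is immutable, so no copy is needed.


@dataclass(frozen=True)
class ItemTimestampClock:
    """Hypothetical configuration object paired with ItemTimestampClockLogic."""

    ts_getter: Callable[[Any], datetime]

    def build(self, resume_state: Optional[datetime]) -> ItemTimestampClockLogic:
        # Combine the configuration with the resume state, if any.
        start = resume_state if resume_state is not None else UTC_MIN
        return ItemTimestampClockLogic(self.ts_getter, start)


noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
eleven = datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc)

clock = ItemTimestampClock(ts_getter=lambda v: v["ts"])
logic = clock.build(None)
logic.before_batch()
first = logic.on_item({"ts": noon})     # timestamp noon, watermark noon
second = logic.on_item({"ts": eleven})  # out of order: watermark stays at noon

# Resuming from a snapshot restores the watermark.
resumed = clock.build(logic.snapshot())
print(resumed.on_notify() == noon)  # True
```

Keeping the configuration (here only `ts_getter`) out of the snapshot means the snapshot stays small and pickle-able, while the configuration object supplies the callable again on resume.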

class SystemClock

Uses the current system time as the timestamp for each item.

The watermark is the current system time.

All timestamps and watermarks are generated in UTC.

When the dataflow has no more input, all windows are closed.

build(resume_state: None) → bytewax.operators.windowing._SystemClockLogic
class EventClock

Use a timestamp embedded within each item.

The watermark is the largest timestamp seen thus far, minus the waiting duration, plus the system time duration that has elapsed since that item was seen. This effectively means that items will be processed correctly as long as they are not out of order or delayed by more than the waiting duration.

Note

By default, all timestamps returned by ts_getter also need to be aware datetimes in UTC.

This is because now_getter defaults to returning the current system time as aware datetimes in UTC, and comparisons are made to that.

It’s easiest to convert your timestamps to UTC in ts_getter, but you can also specify a custom now_getter and to_system_utc to support other senses of time.

Parameters:
  • ts_getter – Called once on each item to get its timestamp.

  • wait_for_system_duration – How much system time to wait after seeing a timestamp to have the watermark catch up to that time.

  • now_getter – Return the current “system” timestamp used to advance the watermark. Defaults to the current system time in UTC.

  • to_system_utc – For a given window close timestamp, return the corresponding UTC system time that the runtime should use to know when to wake up to run window close logic. If None is returned, the runtime will not wake up to close windows and will depend on new items or EOF to trigger any closes. Defaults to passing through the timestamp.

ts_getter: Callable[[V], datetime]
wait_for_system_duration: timedelta
now_getter: Callable[[], datetime]
to_system_utc: Callable[[datetime], Optional[datetime]]
build(
resume_state: Optional[bytewax.operators.windowing._EventClockState],
) → bytewax.operators.windowing._EventClockLogic[V]
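The EventClock watermark rule described above can be written out as plain arithmetic. The function below is a hypothetical standalone restatement of that sentence, not bytewax's `_EventClockLogic`: it takes the largest event timestamp seen, the system time at which that item was seen, the current system time (what now_getter would return), and wait_for_system_duration.

```python
from datetime import datetime, timedelta, timezone


def event_watermark(
    max_event_ts: datetime,
    system_time_at_max: datetime,
    now: datetime,
    wait_for_system_duration: timedelta,
) -> datetime:
    """Largest timestamp seen, minus the wait, plus system time elapsed since then."""
    elapsed = now - system_time_at_max
    return max_event_ts - wait_for_system_duration + elapsed


utc = timezone.utc
max_ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=utc)   # largest event timestamp seen
seen_at = datetime(2024, 1, 1, 12, 0, 5, tzinfo=utc)  # system time when it arrived
wait = timedelta(seconds=10)

# Right when the item arrives, the watermark trails it by the full wait: 11:59:50.
print(event_watermark(max_ts, seen_at, seen_at, wait))
# Four system seconds later, the watermark has advanced four seconds: 11:59:54.
print(event_watermark(max_ts, seen_at, seen_at + timedelta(seconds=4), wait))
```

All datetimes here are aware and in UTC, matching the note that ts_getter should return aware UTC datetimes when the default now_getter is used.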
class WindowMetadata

Metadata about a window.

The exact semantics of the fields will depend on the windower used. For example, close_time is inclusive for SessionWindower but exclusive for SlidingWindower.

open_time: datetime

The timestamp this window opened.

close_time: datetime

The timestamp this window closed.

merged_ids: Set[int]

Any original window IDs merged into this window before close.

class WindowerLogic

Abstract class which defines a type of window.

If you’re just getting started with windowing operators, see the concrete subclasses of Windower for built-in options.

This type is instantiated within Bytewax’s windowing operators and has methods called by the runtime. A new subclass of this will need to be paired with a new subclass of Windower, which holds the configuration data for this type.

This is instantiated for each key which is encountered.

abstract open_for(timestamp: datetime) → Iterable[int]

Find which windows an item is in and mark them as open.

Called when a new, non-late item arrives.

You’ll need to do whatever bookkeeping is necessary internally to be able to satisfy the other abstract methods, like being able to return the metadata for a window.

Parameters:

timestamp – Of the incoming item.

Returns:

A list of window IDs that this item is in.

abstract late_for(timestamp: datetime) → Iterable[int]

Find which windows an item would have been in, if on-time.

Called when a late item arrives.

If there isn’t a way to know for sure which specific windows this item would have been in, you can return a sentinel value.

This should not persist any state internally for the returned window IDs; the watermark has already passed these windows and so they will not be processed by close_for.

Parameters:

timestamp – Of the incoming item.

Returns:

A list of window IDs that this item is late for.

abstract merged() → Iterable[Tuple[int, int]]

Report any windows that have been merged.

A merge is reported by saying an original window should be merged into a target window.

This will be called after a batch of items has been processed. The system assumes merges can only occur because of incoming items.

You only need to report a merge once. After a window ID has been returned as an original window, that ID does not need to be reported as closed.

Returns:

An iterable of 2-tuples of (original_window_id, target_window_id).

abstract close_for(
watermark: datetime,
) → Iterable[Tuple[int, WindowMetadata]]

Report what windows are now closed.

Called periodically once the watermark has advanced.

Once a window ID is returned from this method, it should never be opened again (although items may be reported as late for it). Internally, you can discard whatever state was being maintained about this window.

Parameters:

watermark – Current watermark. Guaranteed to never regress across calls.

Returns:

A list of now-closed window IDs and their final metadata.

abstract notify_at() → Optional[datetime]

Next system time at which a window might close.

This will be called periodically to know when to wake up this operator to close windows, even if no items were received.

Returns:

System time of when to next wake up.

abstract is_empty() → bool

Whether no state needs to be maintained anymore.

This is, in general, the case when there are no windows currently open. But you might have other state that needs to persist across resumes, and so might return False here to maintain that.

If True is returned here, the entire windower will be discarded so as not to “leak” memory. A new windower will be built if this state key is encountered again.

Returns:

True if no state needs to be maintained.

abstract snapshot() → S

Return an immutable copy of the state for recovery.

This will be called periodically by the runtime.

The value returned here will be passed back to Windower.build when resuming.

The state must be pickle-able.

Danger

The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.

Returns:

The immutable state to be pickled.
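To make the WindowerLogic contract concrete, here is a standalone tumbling-window logic that follows the method names above. It is a sketch under assumptions, not bytewax's implementation: it does not subclass WindowerLogic, `TumblingLogicSketch` is a hypothetical name, window IDs are taken to be the number of whole `length` periods since `align_to`, an `(open_time, close_time)` tuple stands in for WindowMetadata, and notify_at() assumes clock timestamps are already system UTC.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Optional, Tuple

# Stand-in for WindowMetadata: (open_time, close_time).
Meta = Tuple[datetime, datetime]


class TumblingLogicSketch:
    """Hypothetical tumbling-window logic following WindowerLogic's method names."""

    def __init__(self, length: timedelta, align_to: datetime,
                 resume_state: Optional[Dict[int, Meta]] = None) -> None:
        self.length = length
        self.align_to = align_to
        # Open windows by ID; this dict is the only state to snapshot.
        self.opened: Dict[int, Meta] = dict(resume_state or {})

    def _window_id(self, timestamp: datetime) -> int:
        # Whole window lengths elapsed since align_to (floor division).
        return (timestamp - self.align_to) // self.length

    def _meta(self, window_id: int) -> Meta:
        open_time = self.align_to + self.length * window_id
        return open_time, open_time + self.length

    def open_for(self, timestamp: datetime) -> Iterable[int]:
        window_id = self._window_id(timestamp)
        self.opened.setdefault(window_id, self._meta(window_id))
        return [window_id]

    def late_for(self, timestamp: datetime) -> Iterable[int]:
        # Report the window it would have been in, but keep no state for it.
        return [self._window_id(timestamp)]

    def merged(self) -> Iterable[Tuple[int, int]]:
        return []  # Tumbling windows never merge.

    def close_for(self, watermark: datetime) -> Iterable[Tuple[int, Meta]]:
        # Close times are exclusive, so a window is complete once the
        # watermark reaches its close time.
        closed = [(wid, meta) for wid, meta in self.opened.items() if meta[1] <= watermark]
        for wid, _ in closed:
            del self.opened[wid]
        return closed

    def notify_at(self) -> Optional[datetime]:
        # Assumes clock timestamps are already system UTC (as with SystemClock).
        return min((meta[1] for meta in self.opened.values()), default=None)

    def is_empty(self) -> bool:
        return not self.opened

    def snapshot(self) -> Dict[int, Meta]:
        # A shallow copy suffices: int keys and tuples of datetimes are immutable.
        return dict(self.opened)


utc = timezone.utc
logic = TumblingLogicSketch(timedelta(minutes=5), datetime(2024, 1, 1, tzinfo=utc))
logic.open_for(datetime(2024, 1, 1, 0, 2, tzinfo=utc))  # window 0: [00:00, 00:05)
logic.open_for(datetime(2024, 1, 1, 0, 7, tzinfo=utc))  # window 1: [00:05, 00:10)
closed = logic.close_for(datetime(2024, 1, 1, 0, 5, tzinfo=utc))
print([wid for wid, _ in closed])  # [0]
print(logic.is_empty())  # False: window 1 is still open
```

Because is_empty() is True only once every window has closed, the runtime could discard this logic at that point and rebuild it if the key reappears.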

class Windower

A type of window.

If you’re just getting started with windowing operators, see the concrete subclasses of Windower for built-in options.

Every subclass of this class must also have a matching implementation of WindowerLogic that actually implements the window.

abstract build(
resume_state: Optional[S],
) → WindowerLogic[S]

Construct a new windower logic instance.

This should close over any non-state configuration and combine it with the resume state to return a prepared WindowerLogic instance.

Parameters:

resume_state – Snapshot from the previous call to WindowerLogic.snapshot, if any.

Returns:

A prepared windower logic.

class SlidingWindower

Sliding windows of fixed duration.

If offset == length, windows cover all time but do not overlap. Each item will fall in exactly one window. This is equivalent to using TumblingWindower.

If offset < length, windows overlap. Items could fall in multiple windows.

You are not allowed to set offset > length; otherwise there would be gaps between windows in which items would be lost.

Window open times are inclusive, but close times are exclusive.

Warning

Changing any of the windowing parameters across executions will result in incorrect output for any windows that were open at the time of the resume snapshot.

Parameters:
  • length – Length of windows.

  • offset – Duration between start times of adjacent windows.

  • align_to – Align windows so this instant starts a window. You can use this to align all windows to an hour boundary.

length: timedelta
offset: timedelta
align_to: datetime
build(
resume_state: Optional[bytewax.operators.windowing._SlidingWindowerState],
) → bytewax.operators.windowing._SlidingWindowerLogic
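The overlap rules for SlidingWindower follow from arithmetic on window start times. The function below is a hypothetical sketch, not bytewax's `_SlidingWindowerLogic`: it assumes window `k` covers `[align_to + k * offset, align_to + k * offset + length)`, with the open time inclusive and the close time exclusive as described above, and lists every `k` whose window contains a timestamp.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def sliding_window_ids(ts: datetime, length: timedelta, offset: timedelta,
                       align_to: datetime) -> List[int]:
    """Return k for every window [align_to + k*offset, ... + length) containing ts."""
    if offset > length:
        raise ValueError("offset must not exceed length")
    since = ts - align_to
    # Latest window whose inclusive open time is at or before ts.
    last = since // offset
    # Earliest window whose exclusive close time is still after ts.
    first = (since - length) // offset + 1
    return list(range(first, last + 1))


utc = timezone.utc
start = datetime(2024, 1, 1, tzinfo=utc)
ts = datetime(2024, 1, 1, 0, 7, tzinfo=utc)

# Overlapping: 10-minute windows starting every 5 minutes.
print(sliding_window_ids(ts, timedelta(minutes=10), timedelta(minutes=5), start))   # [0, 1]
# offset == length: exactly one window, as with TumblingWindower.
print(sliding_window_ids(ts, timedelta(minutes=10), timedelta(minutes=10), start))  # [0]
```

The `offset > length` check mirrors the restriction above: with larger offsets some timestamps would fall in no window at all.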
class TumblingWindower

Tumbling windows of fixed duration.

Each item will fall in exactly one window.

Window open times are inclusive, but close times are exclusive.

Warning

Changing any of the windowing parameters across executions will result in incorrect output for any windows that were open at the time of the resume snapshot.

Parameters:
  • length – Length of windows.

  • align_to – Align windows so this instant starts a window. You can use this to align all windows to an hour boundary.

length: timedelta
align_to: datetime
build(
resume_state: Optional[bytewax.operators.windowing._SlidingWindowerState],
) → bytewax.operators.windowing._SlidingWindowerLogic
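As a sketch of how align_to and the inclusive/exclusive bounds interact for tumbling windows, the hypothetical helper below (not part of bytewax) computes the `[open, close)` bounds of the window containing a timestamp. Aligning to midnight makes one-hour windows start on the hour, and a timestamp exactly on a boundary belongs to the later window.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def tumbling_bounds(ts: datetime, length: timedelta,
                    align_to: datetime) -> Tuple[datetime, datetime]:
    """Return the [open, close) bounds of the tumbling window containing ts."""
    k = (ts - align_to) // length  # whole windows since align_to (may be negative)
    open_time = align_to + k * length
    return open_time, open_time + length


utc = timezone.utc
hour = timedelta(hours=1)
align = datetime(2024, 1, 1, tzinfo=utc)  # midnight, so windows start on the hour

# 09:30 falls in [09:00, 10:00).
print(tumbling_bounds(datetime(2024, 1, 1, 9, 30, tzinfo=utc), hour, align))
# 10:00 exactly opens the next window, [10:00, 11:00), since close times are exclusive.
print(tumbling_bounds(datetime(2024, 1, 1, 10, 0, tzinfo=utc), hour, align))
```

Timestamps before align_to also work, because floor division rounds toward earlier windows.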
class SessionWindower

Session windows with a fixed inactivity gap.

Parameters:

gap – Gap of inactivity before considering a session closed. The gap must not be negative.

gap: timedelta
build(
resume_state: Optional[bytewax.operators.windowing._SessionWindowerState],
) → bytewax.operators.windowing._SessionWindowerLogic
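Session windows are why merges exist: an item can arrive that bridges two sessions that previously looked separate. The batch-style sketch below shows only the grouping rule; it is not bytewax's `_SessionWindowerLogic`, which works incrementally and reports merges through merged(). Treating an item exactly `gap` after the previous one as part of the same session is an assumption of this sketch.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def sessionize(timestamps: List[datetime],
               gap: timedelta) -> List[Tuple[datetime, datetime]]:
    """Group timestamps into (open_time, close_time) sessions; both ends inclusive."""
    if gap < timedelta(0):
        raise ValueError("gap must not be negative")
    sessions: List[Tuple[datetime, datetime]] = []
    for ts in sorted(timestamps):
        # Assumption: an item at most `gap` after the session's last item extends it.
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1] = (sessions[-1][0], ts)
        else:
            sessions.append((ts, ts))
    return sessions


utc = timezone.utc
gap = timedelta(minutes=5)
t0 = datetime(2024, 1, 1, 12, 0, tzinfo=utc)
t5 = datetime(2024, 1, 1, 12, 5, tzinfo=utc)
t10 = datetime(2024, 1, 1, 12, 10, tzinfo=utc)

print(len(sessionize([t0, t10], gap)))      # 2: the items are 10 minutes apart
print(len(sessionize([t0, t10, t5], gap)))  # 1: the 12:05 item bridges both sessions
```

In the incremental operator, the second case is where one session would be reported as merged into the other.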
class WindowLogic

Abstract class to define a window operator.

That operator will call these methods for you.

A unique instance of this logic will be instantiated for each unique window within each unique key.

abstract on_value(value: V) → Iterable[W]

Called on each new upstream item within this window.

If the operator is ordered, this will be called with values in timestamp order; otherwise, in upstream order.

Parameters:

value – The value of the upstream (key, value).

Returns:

Any values to emit downstream. Values will automatically be wrapped with key and window ID.

abstract on_merge(
original: Self,
) → Iterable[W]

Called when two windows should merge.

Not all windowers result in merges.

The original window state will be deleted after this.

Parameters:

original – Other logic to consume the state within.

Returns:

Any values to emit downstream. Values will automatically be wrapped with key and window ID.

abstract on_close() → Iterable[W]

Called when this window closes.

Returns:

Any values to emit downstream. Values will automatically be wrapped with key and window ID.

abstract snapshot() → S

Return an immutable copy of the state for recovery.

This will be called periodically by the runtime.

The value returned here will be passed back to the builder function given to window when resuming.

The state must be pickle-able.

Danger

The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.

Returns:

The immutable state to be pickled.
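As an illustration of the WindowLogic contract, here is a standalone per-window running-sum logic plus a builder function of the shape window expects for its builder parameter (it receives the resume state, or None). `SumLogic` and `build_sum` are hypothetical names, and the class does not subclass bytewax's WindowLogic.

```python
from typing import Iterable, Optional


class SumLogic:
    """Hypothetical per-window logic that sums numeric values."""

    def __init__(self, total: float = 0.0) -> None:
        self.total = total

    def on_value(self, value: float) -> Iterable[float]:
        self.total += value
        return []  # Emit nothing until the window closes.

    def on_merge(self, original: "SumLogic") -> Iterable[float]:
        # Absorb the other window's state; the runtime deletes it afterwards.
        self.total += original.total
        return []

    def on_close(self) -> Iterable[float]:
        return [self.total]  # Emit the final sum once.

    def snapshot(self) -> float:
        return self.total  # A float is immutable and pickle-able.


def build_sum(resume_state: Optional[float]) -> SumLogic:
    # No configuration to close over here; only the resume state.
    return SumLogic(resume_state if resume_state is not None else 0.0)


left = build_sum(None)
left.on_value(2.0)
right = build_sum(None)
right.on_value(3.0)
left.on_merge(right)
print(list(left.on_close()))  # [5.0]
```

Returning values from on_close() rather than on_value() is what makes this a "final" style aggregation; emitting from on_value() instead would stream partial results.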

class WindowOut

Streams returned from a windowing operator.

down: KeyedStream[Tuple[int, W_co]]

Items emitted from this operator.

Sub-keyed by window ID.

late: KeyedStream[Tuple[int, V]]

Upstream items that were deemed late for a window.

It’s possible you will see window IDs here that were never in the down or meta streams, depending on the specifics of the ordering of the data.

Sub-keyed by window ID.

meta: KeyedStream[Tuple[int, WindowMetadata]]

Metadata about closed windows.

Emitted once when that window closes. Not emitted for original windows that are merged into another window. The target window’s WindowMetadata will have the original window IDs in merged_ids.

Sub-keyed by window ID.

Functions

window(
step_id: str,
up: KeyedStream[V],
clock: Clock[V, Any],
windower: Windower[Any],
builder: Callable[[Optional[S]], WindowLogic[V, W, S]],
ordered: bool = True,
) → WindowOut[V, W]

Advanced generic windowing operator.

This is a lower-level operator in Bytewax that gives you control over when a windowing operator emits items. WindowLogic works in tandem with ClockLogic and WindowerLogic to implement its behavior. See the documentation of those interfaces.

Parameters:
  • step_id – Unique ID.

  • up – Keyed upstream.

  • clock – Time definition.

  • windower – Window definition.

  • builder – Called whenever a new window is opened, with the resume state returned from WindowLogic.snapshot for that window, if any. This should close over any non-state configuration and combine it with the resume state to return a prepared WindowLogic for this window.

  • ordered – Whether to apply values to the logic in timestamp order. If not, they’ll be in upstream order. There is a performance and latency penalty to ordering by timestamp. Defaults to True.

Returns:

Window result streams.

collect_window(
step_id: str,
up: KeyedStream[V],
clock: Clock[V, Any],
windower: Windower[Any],
into=list,
ordered: bool = True,
) → WindowOut[V, Any]

Collect items in a window into a container.

See bytewax.operators.collect for the ability to set a max size.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream of items to collect.

  • clock – Time definition.

  • windower – Window definition.

  • into – Type to collect into. Defaults to list.

  • ordered – Whether values in the resulting containers are in timestamp order. There is a performance and latency penalty to ordering by timestamp. If not, they’ll be in upstream order. Defaults to True.

Returns:

Window result streams. Downstream contains the collected container for each window, emitted at the end of that window. Values are in timestamp order if ordered is True.
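The effect of ordered on collect_window can be pictured in plain Python. The sketch below is a hypothetical model of one emitted container, not the operator's implementation; it assumes each item is a `(timestamp, value)` pair already assigned to a single key and window, listed in arrival order.

```python
from datetime import datetime, timezone
from typing import List, Tuple

Item = Tuple[datetime, str]


def collect(items: List[Item], ordered: bool = True) -> List[str]:
    """Model the list collect_window would emit for one key and window."""
    if ordered:
        # sorted() is stable, so equal timestamps keep their arrival order.
        items = sorted(items, key=lambda item: item[0])
    return [value for _, value in items]


utc = timezone.utc
arrivals = [
    (datetime(2024, 1, 1, 0, 2, tzinfo=utc), "b"),
    (datetime(2024, 1, 1, 0, 1, tzinfo=utc), "a"),  # arrives after "b"
]
print(collect(arrivals, ordered=True))   # ['a', 'b']
print(collect(arrivals, ordered=False))  # ['b', 'a']
```

The sort is the source of the performance and latency penalty mentioned for ordered: items must be buffered until their position in timestamp order is known.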

count_window(
step_id: str,
up: Stream[X],
clock: Clock[X, SC],
windower: Windower[Any],
key: Callable[[X], str],
) → WindowOut[X, int]

Count the number of occurrences of items in a window.

Parameters:
  • step_id – Unique ID.

  • up – Stream of items to count.

  • clock – Time definition.

  • windower – Window definition.

  • key – Function to convert each item into a string key. The counting machinery does not compare the items directly, instead it groups by this string key.

Returns:

Window result streams. Downstream contains a (key, count) pair for each window, emitted at the end of that window.
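count_window groups by the string returned from key rather than by comparing items directly. The sketch below models that counting for items already assigned to window IDs, using `collections.Counter`; the `(window_id, item)` input shape and the `count_per_window` name are assumptions for illustration.

```python
from collections import Counter, defaultdict
from typing import Callable, Dict, Iterable, Tuple


def count_per_window(items: Iterable[Tuple[int, dict]],
                     key: Callable[[dict], str]) -> Dict[int, Counter]:
    """Count items per window ID, grouped by each item's string key."""
    counts: Dict[int, Counter] = defaultdict(Counter)
    for window_id, item in items:
        counts[window_id][key(item)] += 1
    return dict(counts)


events = [
    (0, {"user": "ann"}),
    (0, {"user": "bob"}),
    (0, {"user": "ann"}),
    (1, {"user": "ann"}),
]
result = count_per_window(events, key=lambda e: e["user"])
print(result[0]["ann"], result[0]["bob"], result[1]["ann"])  # 2 1 1
```

Because dicts are not hashable, a string key like this is what makes such items countable at all.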

fold_window(
step_id: str,
up: KeyedStream[V],
clock: Clock[V, Any],
windower: Windower[Any],
builder: Callable[[], S],
folder: Callable[[S, V], S],
merger: Callable[[S, S], S],
ordered: bool = True,
) → WindowOut[V, S]

Build an empty accumulator, then combine values into it.

It is like reduce_window but uses a function to build the initial value.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • clock – Time definition.

  • windower – Window definition.

  • builder – Called the first time a key appears in each window and is expected to return the empty accumulator for that window.

  • folder – Combines a new value into an existing accumulator and returns the updated accumulator. The accumulator is initially the empty accumulator. Values will be passed in timestamp order if ordered is True.

  • merger – Combines two states whenever two windows merge. Not all window definitions result in merges.

  • ordered – Whether to fold values in timestamp order. If not, they’ll be in upstream order. There is a performance and latency penalty to ordering by timestamp. Defaults to True.

Returns:

Window result streams. Downstream contains the accumulator for each window, once that window has closed.
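The roles of builder, folder, and merger can be traced by hand. The sketch below folds two windows separately with `functools.reduce` and then merges their accumulators, as could happen with session windows; using a set of unique values as the accumulator is only an example, not something fold_window requires.

```python
from functools import reduce
from typing import Set


def builder() -> Set[str]:
    return set()  # Empty accumulator for a new window.


def folder(acc: Set[str], value: str) -> Set[str]:
    acc.add(value)  # Combine one value into the accumulator.
    return acc


def merger(a: Set[str], b: Set[str]) -> Set[str]:
    return a | b  # Combine two windows' accumulators.


first = reduce(folder, ["x", "y"], builder())
second = reduce(folder, ["y", "z"], builder())
merged = merger(first, second)
print(sorted(merged))  # ['x', 'y', 'z']
```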

join_window(
step_id: str,
clock: Clock[Any, Any],
windower: Windower[Any],
*sides: KeyedStream[Any],
insert_mode: JoinInsertMode = 'last',
emit_mode: JoinEmitMode = 'final',
) → WindowOut[Any, Tuple]

Gather together the value for a key on multiple streams.

See Joins for more information.

Parameters:
  • step_id – Unique ID.

  • clock – Time definition.

  • windower – Window definition.

  • *sides – Keyed streams.

  • insert_mode – Mode of this join. See JoinInsertMode for more info. Defaults to "last".

  • emit_mode – Mode of this join. See JoinEmitMode for more info. Defaults to "final".

Returns:

Window result streams. Downstream contains tuples with the value from each stream in the order of the argument list. See JoinEmitMode for when tuples are emitted.

max_window(
step_id: str,
up: KeyedStream[V],
clock: Clock[V, Any],
windower: Windower[Any],
by=_identity,
) → WindowOut[V, V]

Find the maximum value for each key in each window.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • clock – Time definition.

  • windower – Window definition.

  • by – A function called on each value that is used to extract what to compare.

Returns:

Window result streams. Downstream contains the max value for each window, once that window has closed.
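The by argument of max_window and min_window plays the same role as the key argument of Python's built-in max and min: it extracts what to compare, while the original value is what gets emitted (the downstream type is V). A plain-Python illustration for one window's values, not the operator itself:

```python
window_values = [
    {"sensor": "a", "temp": 21.5},
    {"sensor": "b", "temp": 23.0},
    {"sensor": "c", "temp": 19.0},
]

# Compare on temperature, but keep the whole record as the result.
hottest = max(window_values, key=lambda v: v["temp"])
coldest = min(window_values, key=lambda v: v["temp"])
print(hottest["sensor"], coldest["sensor"])  # b c
```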

min_window(
step_id: str,
up: KeyedStream[V],
clock: Clock[V, Any],
windower: Windower[Any],
by=_identity,
) → WindowOut[V, V]

Find the minimum value for each key in each window.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • clock – Time definition.

  • windower – Window definition.

  • by – A function called on each value that is used to extract what to compare.

Returns:

Window result streams. Downstream contains the min value for each window, once that window has closed.

reduce_window(
step_id: str,
up: KeyedStream[V],
clock: Clock[V, Any],
windower: Windower[Any],
reducer: Callable[[V, V], V],
) → WindowOut[V, V]

Distill all values for a key down into a single value.

It is like fold_window but the first value is the initial accumulator.

Parameters:
  • step_id – Unique ID.

  • up – Keyed stream.

  • clock – Time definition.

  • windower – Window definition.

  • reducer – Combines a new value into an old value and returns the combined value.

Returns:

Window result streams. Downstream contains the reduced value for each window, once that window has closed.
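The relationship between reduce_window and fold_window mirrors `functools.reduce` without and with an initial value. In plain Python, for one window's values:

```python
from functools import reduce

values = [4, 1, 7]

# reduce_window style: the first value is the initial accumulator.
reduced = reduce(lambda old, new: old + new, values)
# fold_window style: a builder supplies the empty accumulator (here 0).
folded = reduce(lambda acc, v: acc + v, values, 0)
print(reduced, folded)  # 12 12
```

The fold form is the one to reach for when the accumulator type differs from the value type, since reduce_window's reducer must return the same type it receives.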
