bytewax.operators.windowing#
Time-based windowing operators.
Data#
- SC: TypeVar#
Type of ClockLogic’s state snapshots.
- SW: TypeVar#
Type of WindowerLogic’s state snapshots.
Classes#
- class ClockLogic#
Abstract class to define a sense of time for windowing.
If you’re just getting started with windowing operators, see the concrete subclasses of Clock for built-in options.
This type is instantiated within Bytewax’s windowing operators and has methods called by the runtime. A new subclass of this will need to be paired with a new subclass of Clock, which holds the configuration data for this type.
This is instantiated for each key which is encountered.
- abstract before_batch() None #
Prepare to process items incoming simultaneously.
Called once before a series of on_item calls.
You can use this to cache a “current time” or prepare other state.
- abstract on_item(
- value: V,
Get the timestamp and watermark after this item.
Called on each new upstream item.
If new timestamps will permanently affect the watermark, relevant state must be retained within.
- Parameters:
value – The value of the upstream (key, value).
- Returns:
A 2-tuple of timestamp for the item and the current watermark.
- abstract on_notify() datetime #
Get the current watermark when there is no item.
Called when a window might need to close.
- Returns:
The current watermark.
- abstract on_eof() datetime #
Get the current watermark when no more items upstream.
Generally you’d return UTC_MAX if you want to close all windows just before stopping because the window semantics don’t make sense across a resume.
To support continuation resumes, this should not permanently affect the state that is snapshotted.
- Returns:
The current watermark.
- abstract to_system_utc(
- timestamp: datetime,
Convert a timestamp to a UTC system time.
This is used to determine when the runtime should wake up to check for closed windows.
- Parameters:
timestamp – Previously returned by on_item.
- Returns:
A system time in UTC. If None, signal that there is no relevant system time to wake at.
- abstract snapshot() S #
Return an immutable copy of the state for recovery.
This will be called periodically by the runtime.
The value returned here will be passed back to Clock.build when resuming.
The state must be pickle-able.
Danger
The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.
- Returns:
The immutable state to be pickled.
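The immutability requirement on snapshot can be illustrated with a small standalone sketch. MaxSeenClockState is a hypothetical stand-in, not a Bytewax class, and it does not subclass ClockLogic; it only shows why a deep copy keeps a snapshot stable while the live state keeps changing.

```python
import copy
import pickle
from datetime import datetime, timezone


class MaxSeenClockState:
    """Hypothetical clock-like object; not part of Bytewax."""

    def __init__(self, resume_state=None):
        # Mutable state: timestamps seen so far, restored from a snapshot if given.
        self._seen = list(resume_state) if resume_state is not None else []

    def on_item(self, timestamp):
        # Mutates the internal state on every item.
        self._seen.append(timestamp)

    def snapshot(self):
        # Deep-copy so later on_item calls cannot change the returned value.
        return copy.deepcopy(self._seen)


logic = MaxSeenClockState()
logic.on_item(datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc))
snap = logic.snapshot()
logic.on_item(datetime(2023, 1, 1, 12, 5, tzinfo=timezone.utc))

# The snapshot is unaffected by the second item and survives a pickle round trip.
restored = pickle.loads(pickle.dumps(snap))
resumed = MaxSeenClockState(restored)
```

Returning self._seen directly would hand the runtime a list that the next on_item call mutates, which is the situation the Danger note warns about.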
- class Clock#
Abstract class defining a type of clock.
If you’re just getting started with windowing operators, see the concrete subclasses of Clock for built-in options.
Every subclass of this class must also have a matching implementation of ClockLogic that actually implements the clock.
- abstract build( ) ClockLogic[V, S] #
Construct a new clock logic instance.
This should close over any non-state configuration and combine it with the resume state to return a prepared ClockLogic instance.
- Parameters:
resume_state – Snapshot from the previous call to ClockLogic.snapshot, if any.
- Returns:
A prepared clock logic.
- class SystemClock#
Uses the current system time as the timestamp for each item.
The watermark is the current system time.
All timestamps and watermarks are generated in UTC.
When the dataflow has no more input, all windows are closed.
- class EventClock#
Use a timestamp embedded within each item.
The watermark is the largest timestamp seen thus far, minus the waiting duration, plus the system time duration that has elapsed since that item was seen. This effectively means that items will be correctly processed as long as they do not have timestamps out-of-order or arrive later than the waiting duration.
Note
By default, all timestamps returned by ts_getter must be aware datetimes in UTC.
This is because now_getter defaults to returning the current system time as aware datetimes in UTC, and comparisons are made to that.
It’s easiest to convert your timestamps to UTC in ts_getter, but you can also specify a custom now_getter and to_system_utc to support other senses of time.
- Parameters:
ts_getter – Called once on each item to get its timestamp.
wait_for_system_duration – How much system time to wait after seeing a timestamp to have the watermark catch up to that time.
now_getter – Return the current “system” timestamp used to advance the watermark. Defaults to the current system time in UTC.
to_system_utc – For a given window close timestamp, return the corresponding UTC system time that the runtime should use to know when to wake up to run window close logic. If None is returned, the runtime will not wake up to close windows and will depend on new items or EOF to trigger any closes. Defaults to passing through the timestamp.
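The watermark rule described for EventClock can be written out as plain arithmetic. This is a standalone sketch of that sentence only, not Bytewax's implementation; event_watermark and its argument names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def event_watermark(max_ts, seen_at, now, wait):
    """Largest timestamp seen, minus the wait, plus system time elapsed since it was seen."""
    return max_ts - wait + (now - seen_at)


# Largest event timestamp seen so far.
max_ts = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# System time at which that item was seen, and the current system time.
seen_at = datetime(2023, 6, 1, 9, 0, 0, tzinfo=timezone.utc)
now = datetime(2023, 6, 1, 9, 0, 3, tzinfo=timezone.utc)

# With a 10 second wait and 3 seconds elapsed, the watermark trails max_ts by 7 seconds.
wm = event_watermark(max_ts, seen_at, now, wait=timedelta(seconds=10))
```

With a wait of zero, as in the examples on this page, the watermark equals the largest timestamp at the moment it is seen and then advances with system time.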
- class WindowMetadata#
Metadata about a window.
The exact semantics of the fields will depend on the windower used. E.g. for SessionWindower, close_time is inclusive; but for SlidingWindower it is not.
- class WindowerLogic#
Abstract class which defines a type of window.
If you’re just getting started with windowing operators, see the concrete subclasses of Windower for built-in options.
This type is instantiated within Bytewax’s windowing operators and has methods called by the runtime. A new subclass of this will need to be paired with a new subclass of Windower, which holds the configuration data for this type.
This is instantiated for each key which is encountered.
- abstract open_for(timestamp: datetime) Iterable[int] #
Find which windows an item is in and mark them as open.
Called when a new, non-late item arrives.
You’ll need to do whatever bookkeeping is necessary internally to be able to satisfy the other abstract methods, like being able to return the metadata for a window.
- Parameters:
timestamp – Of the incoming item.
- Returns:
A list of window IDs that this item is in.
- abstract late_for(timestamp: datetime) Iterable[int] #
Find which windows an item would have been in, if on-time.
Called when a late item arrives.
If there isn’t a way to know for sure which specific windows this item would have been in, you can return a sentinel value.
This should not persist any state internally for the returned window IDs; the watermark has already passed these windows and so they will not be processed by close_for.
- Parameters:
timestamp – Of the incoming item.
- Returns:
A list of window IDs that this item is late for.
- abstract merged() Iterable[Tuple[int, int]] #
Report any windows that have been merged.
A merge is reported by saying an original window should be merged into a target window.
This will be called after a batch of items has been processed. The system assumes merges can only occur because of incoming items.
You only need to report a merge once. After a window ID has been returned as an original window, that ID does not need to be reported as closed.
- Returns:
An iterable of 2-tuples of (original_window_id, target_window_id).
- abstract close_for(
- watermark: datetime,
Report what windows are now closed.
Called periodically once the watermark has advanced.
Once a window ID is returned from this method, it should never be opened again (although items may be reported as late for it). Internally, you can discard whatever state was being maintained about this window.
- Parameters:
watermark – Current watermark. Guaranteed to never regress across calls.
- Returns:
A list of now closed window IDs and their final metadata.
- abstract notify_at() Optional[datetime] #
Next system time timestamp for which a window might close.
This will be called periodically to know when to wake up this operator to close windows, even if no items were received.
- Returns:
System time of when to next wake up.
- abstract is_empty() bool #
If no state needs to be maintained anymore.
This, in general, is if there are no windows currently open. But you might have other state that needs to persist across resumes, and so might return False here to maintain that.
If True is returned here, the entire windower will be discarded so as not to “leak” memory. A new windower will be built if this state key is encountered again.
- Returns:
True if no state needs to be maintained.
- abstract snapshot() S #
Return an immutable copy of the state for recovery.
This will be called periodically by the runtime.
The value returned here will be passed back to Windower.build when resuming.
The state must be pickle-able.
Danger
The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.
- Returns:
The immutable state to be pickled.
- class Windower#
A type of window.
If you’re just getting started with windowing operators, see the concrete subclasses of Windower for built-in options.
Every subclass of this class must also have a matching implementation of WindowerLogic that actually implements the window.
- abstract build( ) WindowerLogic[S] #
Construct a new windower logic instance.
This should close over any non-state configuration and combine it with the resume state to return a prepared WindowerLogic instance.
- Parameters:
resume_state – Snapshot from the previous call to WindowerLogic.snapshot, if any.
- Returns:
A prepared windower logic.
- class SlidingWindower#
- Bases: Windower[_SlidingWindowerState]
Sliding windows of fixed duration.
If offset == length, windows cover all time but do not overlap. Each item will fall in exactly one window. This is equivalent to using TumblingWindower.
If offset < length, windows overlap. Items could fall in multiple windows.
You are not allowed to set offset > length, otherwise there would be undefined gaps between windows which would lose items.
Window open times are inclusive, but close times are exclusive.
Warning
Changing any of the windowing parameters across executions will result in incorrect output for any windows that were open at the time of the resume snapshot.
- Parameters:
length – Length of windows.
offset – Duration between start times of adjacent windows.
align_to – Align windows so this instant starts a window. You can use this to align all windows to an hour boundary, e.g.
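The relationship between length, offset and align_to can be checked with a small standalone calculation. This sketch is not Bytewax's implementation: sliding_window_ids is a hypothetical helper, and it assumes window IDs count up from 0 for the window that opens at align_to, which matches the Window 0, 1, 2 labels in the examples on this page but is not stated as a guarantee.

```python
from datetime import datetime, timedelta, timezone


def sliding_window_ids(timestamp, length, offset, align_to):
    """Return the IDs of all windows covering timestamp (open inclusive, close exclusive)."""
    if offset > length:
        raise ValueError("offset must not exceed length")
    # Newest window whose open time is at or before the timestamp.
    window_id = (timestamp - align_to) // offset
    ids = []
    # Walk back through older windows while their close time is still after the timestamp.
    while align_to + window_id * offset + length > timestamp:
        ids.append(window_id)
        window_id -= 1
    return sorted(ids)


noon = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)

# offset < length: 12:07 falls in the windows opening at 12:00 and 12:05.
overlapping = sliding_window_ids(
    noon + timedelta(minutes=7), timedelta(minutes=10), timedelta(minutes=5), noon
)

# offset == length: 12:10 sits exactly on a boundary and belongs only to the later window.
tumbling = sliding_window_ids(
    noon + timedelta(minutes=10), timedelta(minutes=10), timedelta(minutes=10), noon
)
```

The second call shows the effect of exclusive close times: an item stamped exactly at 12:10 is not in the 12:00 to 12:10 window.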
- class TumblingWindower#
- Bases: Windower[_SlidingWindowerState]
Tumbling windows of fixed duration.
Each item will fall in exactly one window.
Window open times are inclusive, but close times are exclusive.
Warning
Changing any of the windowing parameters across executions will result in incorrect output for any windows that were open at the time of the resume snapshot.
- Parameters:
length – Length of windows.
align_to – Align windows so this instant starts a window. You can use this to align all windows to an hour boundary, e.g.
- class SessionWindower#
- Bases: Windower[_SessionWindowerState]
Session windows with a fixed inactivity gap.
- Parameters:
gap – Gap of inactivity before considering a session closed. The gap must not be negative.
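As a rough illustration of what a fixed inactivity gap means, the standalone sketch below groups timestamps into sessions. It is not Bytewax's implementation; sessions is a hypothetical helper, and treating a distance exactly equal to the gap as still belonging to the same session is an assumption made here.

```python
from datetime import datetime, timedelta, timezone


def sessions(timestamps, gap):
    """Group timestamps into (open, close) pairs separated by more than gap."""
    if gap < timedelta(0):
        raise ValueError("gap must not be negative")
    result = []
    for ts in sorted(timestamps):
        if result and ts - result[-1][1] <= gap:
            # Within the gap of the latest session: extend its close time.
            result[-1] = (result[-1][0], ts)
        else:
            # Too far from the previous item: start a new session.
            result.append((ts, ts))
    return result


noon = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
stamps = [noon, noon + timedelta(minutes=3), noon + timedelta(minutes=20)]
grouped = sessions(stamps, gap=timedelta(minutes=5))
```

Here the first two items form one session from 12:00 to 12:03, and the item at 12:20 opens a second session of its own.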
- class WindowLogic#
Abstract class to define a window operator.
That operator will call these methods for you.
A unique instance of this logic will be instantiated for each unique window within each unique key.
- abstract on_value(value: V) Iterable[W] #
Called on each new upstream item within this window.
This will be called with values in timestamp order, unless the operator’s ordered argument is False, in which case values arrive in upstream order.
- Parameters:
value – The value of the upstream (key, value).
- Returns:
Any values to emit downstream. Values will automatically be wrapped with key and window ID.
- abstract on_merge(
- original: Self,
Called when two windows should merge.
Not all windowers result in merges.
The original window state will be deleted after this.
- Parameters:
original – Other logic to consume the state within.
- Returns:
Any values to emit downstream. Values will automatically be wrapped with key and window ID.
- abstract on_close() Iterable[W] #
Called when this window closes.
- Returns:
Any values to emit downstream. Values will automatically be wrapped with key and window ID.
- abstract snapshot() S #
Return an immutable copy of the state for recovery.
This will be called periodically by the runtime.
The value returned here will be passed back to the builder function of window when resuming.
The state must be pickle-able.
Danger
The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.
- Returns:
The immutable state to be pickled.
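The four methods above can be pictured with a minimal standalone stand-in. CollectLogic below is hypothetical and deliberately does not subclass WindowLogic, so it runs without Bytewax installed; a real implementation would subclass WindowLogic and be returned from the builder passed to window.

```python
import copy


class CollectLogic:
    """Stand-in with the same four methods as WindowLogic; not a Bytewax class."""

    def __init__(self, resume_state=None):
        # Start from the resume state if one was snapshotted, else from empty.
        self._items = list(resume_state) if resume_state is not None else []

    def on_value(self, value):
        self._items.append(value)
        return []  # Emit nothing until the window closes.

    def on_merge(self, original):
        # Absorb the state of the window being merged into this one.
        self._items.extend(original._items)
        return []

    def on_close(self):
        # Emit the collected items once, when the window closes.
        return [list(self._items)]

    def snapshot(self):
        # Deep copy so later on_value calls cannot alter the snapshot.
        return copy.deepcopy(self._items)


target = CollectLogic()
target.on_value(1)
original = CollectLogic()
original.on_value(2)

target.on_merge(original)
snap = target.snapshot()
target.on_value(3)
emitted = target.on_close()
```

The snapshot taken before the third value still holds only the first two, while on_close emits all three.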
- class WindowOut#
Streams returned from a windowing operator.
- down: KeyedStream[Tuple[int, W_co]]#
Items emitted from this operator.
Sub-keyed by window ID.
- late: KeyedStream[Tuple[int, V]]#
Upstream items that were deemed late for a window.
It’s possible you will see window IDs here that were never in the down or meta streams, depending on the specifics of the ordering of the data.
Sub-keyed by window ID.
- meta: KeyedStream[Tuple[int, WindowMetadata]]#
Metadata about closed windows.
Emitted once when that window closes. Not emitted for original windows that are merged into another window. The target window’s WindowMetadata will have the original window IDs in merged_ids.
Sub-keyed by window ID.
Functions#
- window(
- step_id: str,
- up: KeyedStream[V],
- clock: Clock[V, Any],
- windower: Windower[Any],
- builder: Callable[[Optional[S]], WindowLogic[V, W, S]],
- ordered: bool = True,
Advanced generic windowing operator.
This is a lower-level Bytewax operator that gives you control over when a windowing operator emits items.
WindowLogic works in tandem with ClockLogic and WindowerLogic to implement its behavior. See documentation of those interfaces.
- Parameters:
step_id – Unique ID.
up – Keyed upstream.
clock – Time definition.
windower – Window definition.
builder – Called whenever a new window is opened with the resume state returned from WindowLogic.snapshot for that window, if any. This should close over any non-state configuration and combine it with the resume state to return a prepared WindowLogic for this window.
ordered – Whether to apply values to the logic in timestamp order. If not, they’ll be in upstream order. There is a performance and latency penalty to ordering by timestamp. Defaults to True.
- Returns:
Window result streams.
- collect_window(
- step_id: str,
- up: KeyedStream[V],
- clock: Clock[V, Any],
- windower: Windower[Any],
- into=list,
- ordered: bool = True,
Collect items in a window into a container.
In the example below, we will create tumbling windows of 10 minutes, aligned to start at 12:00. We will then apply a flat_map function to uppercase the strings and flatten the windowed data. The output windows will be as follows:
    Time Axis (UTC):
    |-----------------|-----------------|-----------------|
    12:00             12:10             12:20             12:30
         Window 0          Window 1          Window 2

    from datetime import datetime, timedelta, timezone

    from bytewax.dataflow import Dataflow
    import bytewax.operators as op
    from bytewax.operators.windowing import (TumblingWindower, EventClock, collect_window)
    from bytewax.testing import TestingSource

    flow = Dataflow("collect_window_example")

    # Sample data with timestamps
    data = [
        ("key1", "apple", datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)),
        ("key1", "banana", datetime(2023, 1, 1, 12, 5, tzinfo=timezone.utc)),
        ("key2", "cherry", datetime(2023, 1, 1, 12, 10, tzinfo=timezone.utc)),
        ("key1", "date", datetime(2023, 1, 1, 12, 15, tzinfo=timezone.utc)),
        ("key2", "elderberry", datetime(2023, 1, 1, 12, 20, tzinfo=timezone.utc)),
    ]

    up = op.input("input", flow, TestingSource(data))

    # Key the stream
    keyed = op.key_on("key_on", up, lambda x: x[0])

    # Define the clock and windower
    def extract_timestamp(x):
        return x[2]  # Extract the timestamp from the data

    clock = EventClock(
        ts_getter=extract_timestamp,
        wait_for_system_duration=timedelta(seconds=0),
    )

    windower = TumblingWindower(
        length=timedelta(minutes=10),
        align_to=datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
    )

    # Collect items into a list per window
    windowed = collect_window("collect_window", keyed, clock, windower)

    # Transform to uppercase and flatten the windowed data
    def to_uppercase_and_flatten(item):
        key, (window_id, values) = item
        for k, s, ts in values:
            s_upper = s.upper()
            ts_str = ts.strftime("%H:%M")
            yield ((k, s_upper, ts_str), window_id)

    uppercased_flattened = op.flat_map("to_uppercase_and_flatten", windowed.down, to_uppercase_and_flatten)

    # Format the output
    def format_output(item):
        (k, s_upper, ts_str), window_id = item
        return f"({k!r}, {s_upper!r}, {ts_str}) => Window {window_id}"

    formatted = op.map("format_output", uppercased_flattened, format_output)

    # Inspect the output
    op.inspect("output", formatted)

Output:

    collect_window_example.output: "('key1', 'APPLE', 12:00) => Window 0"
    collect_window_example.output: "('key1', 'BANANA', 12:05) => Window 0"
    collect_window_example.output: "('key2', 'CHERRY', 12:10) => Window 1"
    collect_window_example.output: "('key1', 'DATE', 12:15) => Window 1"
    collect_window_example.output: "('key2', 'ELDERBERRY', 12:20) => Window 2"
See bytewax.operators.collect for the ability to set a max size.
- Parameters:
step_id – Unique ID.
up – Keyed stream of items to collect.
clock – Time definition.
windower – Window definition.
into – Type to collect into. Defaults to list.
ordered – Whether values in the resulting containers are in timestamp order. If not, they’ll be in upstream order. There is a performance and latency penalty to ordering by timestamp. Defaults to True.
- Returns:
Window result streams. Downstream contains the collected containers with values in timestamp order at the end of each window.
- count_window( ) WindowOut[X, int] #
Count the number of occurrences of items in a window.
    from datetime import datetime, timedelta, timezone

    from bytewax.dataflow import Dataflow
    import bytewax.operators as op
    from bytewax.operators.windowing import EventClock, TumblingWindower, count_window
    from bytewax.testing import TestingSource, run_main

    # Create a new dataflow
    flow = Dataflow("count_window_example")

    # Sample data with timestamps
    data = [
        ("apple", datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)),
        ("banana", datetime(2023, 1, 1, 12, 5, tzinfo=timezone.utc)),
        ("apple", datetime(2023, 1, 1, 12, 10, tzinfo=timezone.utc)),
        ("banana", datetime(2023, 1, 1, 12, 15, tzinfo=timezone.utc)),
        ("apple", datetime(2023, 1, 1, 12, 20, tzinfo=timezone.utc)),
        ("cherry", datetime(2023, 1, 1, 12, 25, tzinfo=timezone.utc)),
    ]

    # Input stream
    up = op.input("input", flow, TestingSource(data))

    # Define the clock using event timestamps
    def extract_timestamp(x):
        return x[1]

    clock = EventClock(
        ts_getter=extract_timestamp,
        wait_for_system_duration=timedelta(seconds=0),
    )

    # Define the windowing strategy
    windower = TumblingWindower(
        length=timedelta(minutes=15),
        align_to=datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc),
    )

    # Use count_window to count occurrences of each item in each window
    windowed = count_window(
        step_id="count_window",
        up=up,
        clock=clock,
        windower=windower,
        key=lambda x: x[0],  # Use the item name as the key
    )

    # Format and output the results
    def format_output(item):
        key, (window_id, count) = item
        return f"Item '{key}' occurred {count} times in Window {window_id}"

    formatted = op.map("format_output", windowed.down, format_output)

    # Inspect the output
    op.inspect("output", formatted)

Output:

    count_window_example.output: "Item 'banana' occurred 1 times in Window 0"
    count_window_example.output: "Item 'apple' occurred 2 times in Window 0"
    count_window_example.output: "Item 'apple' occurred 1 times in Window 1"
    count_window_example.output: "Item 'banana' occurred 1 times in Window 1"
    count_window_example.output: "Item 'cherry' occurred 1 times in Window 1"
- Parameters:
step_id – Unique ID.
up – Stream of items to count.
clock – Time definition.
windower – Window definition.
key – Function to convert each item into a string key. The counting machinery does not compare the items directly, instead it groups by this string key.
- Returns:
Window result streams. Downstream contains a (key, count) pair per window, emitted at the end of each window.
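The grouping that the key parameter controls can be reproduced without Bytewax as a sanity check on the example output. This is a standalone model, not the operator itself; the variable names are hypothetical and the window ID formula assumes tumbling windows numbered from 0 at align_to, as the example output suggests.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

align_to = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
length = timedelta(minutes=15)

# Same (name, timestamp) items as in the count_window example.
items = [
    ("apple", align_to),
    ("banana", align_to + timedelta(minutes=5)),
    ("apple", align_to + timedelta(minutes=10)),
    ("banana", align_to + timedelta(minutes=15)),
    ("apple", align_to + timedelta(minutes=20)),
    ("cherry", align_to + timedelta(minutes=25)),
]

key = lambda x: x[0]  # Group by item name, never by comparing items directly.

counts = Counter()
for item in items:
    # Assumed ID scheme: integer number of whole window lengths since align_to.
    window_id = (item[1] - align_to) // length
    counts[(key(item), window_id)] += 1
```

The resulting counts agree with the five output lines of the example: two apples and one banana in window 0, and one each of apple, banana and cherry in window 1.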
- fold_window(
- step_id: str,
- up: KeyedStream[V],
- clock: Clock[V, Any],
- windower: Windower[Any],
- builder: Callable[[], S],
- folder: Callable[[S, V], S],
- merger: Callable[[S, S], S],
- ordered: bool = True,
Build an empty accumulator, then combine values into it.
It is like reduce_window but uses a function to build the initial value.
In the example below, we will create a tumbling window of 10 minutes that starts at 12:00. We will sum the values in the window.
    Time Axis (UTC):
    |-----------------|-----------------|-----------------|
    12:00             12:10             12:20             12:30
         Window 0          Window 1          Window 2

    from datetime import datetime, timedelta, timezone

    from bytewax.dataflow import Dataflow
    import bytewax.operators as op
    from bytewax.operators.windowing import (TumblingWindower, EventClock, fold_window)
    from bytewax.testing import TestingSource

    flow = Dataflow("fold_window_example")

    # Sample data with timestamps
    data = [
        ("key1", 1, datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)),
        ("key1", 2, datetime(2023, 1, 1, 12, 5, tzinfo=timezone.utc)),
        ("key2", 3, datetime(2023, 1, 1, 12, 10, tzinfo=timezone.utc)),
        ("key2", 4, datetime(2023, 1, 1, 12, 15, tzinfo=timezone.utc)),
        ("key1", 5, datetime(2023, 1, 1, 12, 20, tzinfo=timezone.utc)),
    ]

    up = op.input("input", flow, TestingSource(data))

    # Key the stream
    keyed = op.key_on("key_on", up, lambda x: x[0])

    # Define the clock and windower
    def extract_timestamp(x):
        return x[2]

    clock = EventClock(
        ts_getter=extract_timestamp,
        wait_for_system_duration=timedelta(seconds=0),
    )

    windower = TumblingWindower(
        length=timedelta(minutes=10),
        align_to=datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
    )

    # Fold the values by summing them
    windowed = fold_window(
        "fold_window",
        keyed,
        clock,
        windower,
        builder=lambda: 0,
        folder=lambda acc, x: acc + x[1],
        merger=lambda a, b: a + b,
    )

    # Inspect the output
    op.inspect("output", windowed.down)

Output:

    fold_window_example.output: ('key1', (0, 3))
    fold_window_example.output: ('key1', (2, 5))
    fold_window_example.output: ('key2', (1, 7))
- Parameters:
step_id – Unique ID.
up – Keyed stream.
clock – Time definition.
windower – Window definition.
builder – Called the first time a key appears in a window and is expected to return the empty accumulator for that window.
folder – Combines a new value into an existing accumulator and returns the updated accumulator. The accumulator is initially the empty accumulator. Values will be passed in timestamp order.
merger – Combines two states whenever two windows merge. Not all window definitions result in merges.
ordered – Whether to fold values in timestamp order. If not, they’ll be in upstream order. There is a performance and latency penalty to ordering by timestamp. Defaults to True.
- Returns:
Window result streams. Downstream contains the accumulator for each window, once that window has closed.
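The builder and folder contract can be modelled in a few lines of plain Python. This is a standalone sketch, not how Bytewax executes the operator: fold_windows is a hypothetical helper, it assumes tumbling windows numbered from 0 at align_to, and it leaves out merger because windows in this toy model never merge.

```python
from datetime import datetime, timedelta, timezone


def fold_windows(items, key_of, ts_of, length, align_to, builder, folder):
    """Fold items into one accumulator per (key, window_id), in timestamp order."""
    accs = {}
    for item in sorted(items, key=ts_of):
        slot = (key_of(item), (ts_of(item) - align_to) // length)
        if slot not in accs:
            # First value for this key and window: start from the empty accumulator.
            accs[slot] = builder()
        accs[slot] = folder(accs[slot], item)
    return accs


noon = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
# Same data as the fold_window example: (key, value, timestamp).
data = [
    ("key1", 1, noon),
    ("key1", 2, noon + timedelta(minutes=5)),
    ("key2", 3, noon + timedelta(minutes=10)),
    ("key2", 4, noon + timedelta(minutes=15)),
    ("key1", 5, noon + timedelta(minutes=20)),
]

result = fold_windows(
    data,
    key_of=lambda x: x[0],
    ts_of=lambda x: x[2],
    length=timedelta(minutes=10),
    align_to=noon,
    builder=lambda: 0,
    folder=lambda acc, x: acc + x[1],
)
```

The per-window sums (3, 7 and 5) match the three output lines of the example.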
- join_window(
- step_id: str,
- clock: Clock[Any, Any],
- windower: Windower[Any],
- *sides: KeyedStream[Any],
- insert_mode: JoinInsertMode = 'last',
- emit_mode: JoinEmitMode = 'final',
- ordered: bool = True,
Gather together the value for a key on multiple streams.
See Joins for more information.
- Parameters:
step_id – Unique ID.
clock – Time definition.
windower – Window definition.
*sides – Keyed streams.
insert_mode – Mode of this join. See JoinInsertMode for more info. Defaults to "last".
emit_mode – Mode of this join. See JoinEmitMode for more info. Defaults to "final".
ordered – Whether to apply values to the logic in timestamp order. If not, they’ll be in upstream order. There is a performance and latency penalty to ordering by timestamp. Defaults to True.
- Returns:
Window result streams. Downstream contains tuples with the value from each stream in the order of the argument list. See JoinEmitMode for when tuples are emitted.
- max_window( ) WindowOut[V, V] #
Find the maximum value for each key.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
clock – Time definition.
windower – Window definition.
by – A function called on each value that is used to extract what to compare.
- Returns:
Window result streams. Downstream contains the max value for each window, once that window has closed.
- min_window( ) WindowOut[V, V] #
Find the minimum value for each key.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
clock – Time definition.
windower – Window definition.
by – A function called on each value that is used to extract what to compare.
- Returns:
Window result streams. Downstream contains the min value for each window, once that window has closed.
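The role of by in max_window and min_window is analogous to the key argument of Python's built-in max and min: it selects what to compare, while the whole value is what comes back. The standalone sketch below uses only the built-ins on the values of one hypothetical window; the readings data is made up for illustration.

```python
# Values that landed in a single window for a single key (hypothetical data).
readings = [
    {"sensor": "a", "temp": 21.5},
    {"sensor": "b", "temp": 19.0},
    {"sensor": "c", "temp": 23.2},
]

# Extract the field to compare; the dicts themselves are not orderable.
by = lambda reading: reading["temp"]

largest = max(readings, key=by)
smallest = min(readings, key=by)
```

Both results are complete reading dicts, not bare temperatures, which is consistent with the WindowOut[V, V] return type shown in the signatures.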
- reduce_window(
- step_id: str,
- up: KeyedStream[V],
- clock: Clock[V, Any],
- windower: Windower[Any],
- reducer: Callable[[V, V], V],
Distill all values for a key down into a single value.
It is like fold_window but the first value is the initial accumulator.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
clock – Time definition.
windower – Window definition.
reducer – Combines a new value into an old value and returns the combined value.
- Returns:
Window result streams. Downstream contains the reduced value for each window, once that window has closed.
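The difference between reducing and folding can be seen with functools.reduce from the standard library, which behaves like a reducer when no initial value is given and like a folder when one is. This is an analogy on the values of a single hypothetical window, not a use of the Bytewax operators.

```python
from functools import reduce

# Values in one window for one key (hypothetical data).
values = [3, 1, 4]

# reduce_window style: the first value seeds the accumulator,
# so the result has the same type as the values.
reduced = reduce(lambda old, new: old + new, values)

# fold_window style: an explicitly built empty accumulator seeds the fold,
# so the result may have a different type from the values.
folded = reduce(lambda acc, value: acc + [value], values, [])
```

Because a reducer has type Callable[[V, V], V], it cannot change the type of the values; use fold_window when the accumulator needs a different shape.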