Collecting and Windowing Example
In this section, we’ll be talking about the collect and window operators that Bytewax provides.
Collecting
The collect operator operates over a stream of data and collects items until either a maximum size or a given timeout is reached.
Let’s construct a simple dataflow to demonstrate. In the following dataflow, we’re using the TestingSource to generate sample data from a list. In a production dataflow, your input could come from Redpanda or any other input source.
The TestingSource emits one integer at a time into the dataflow. We’ll configure our collect operator to wait for either 3 values or 10 seconds, whichever comes first, before emitting the list downstream. Since the TestingSource never makes us wait for data, we should see lists of 3 items until we run out of input, at which point any remaining items are emitted as a final, shorter list.
It is important to remember that the collect operator collects data based on a key. We’ll use the key_on operator to give all of our items the same fixed key so they are processed together.
Copy the following code into a file named collect_example.py:
from datetime import timedelta

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.testing import TestingSource

flow = Dataflow("collect")
stream = op.input("input", flow, TestingSource(list(range(10))))
# We want to consider all the items together, so we assign the same fixed key to each of them.
keyed_stream = op.key_on("key", stream, lambda _x: "ALL")
collected_stream = op.collect(
    "collect", keyed_stream, timeout=timedelta(seconds=10), max_size=3
)
op.output("out", collected_stream, StdOutSink())
Now that we have our dataflow, we can run our example:
> python -m bytewax.run collect_example
('ALL', [0, 1, 2])
('ALL', [3, 4, 5])
('ALL', [6, 7, 8])
('ALL', [9])
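To make the batching and keying behavior concrete, the following plain-Python sketch mimics the size-based half of collect without using Bytewax. The helper collect_by_key is hypothetical and written only for illustration; it ignores the timeout entirely and assumes leftover items are flushed once the input ends, as the output above suggests.

```python
from collections import defaultdict


def collect_by_key(keyed_items, max_size):
    """Group (key, value) pairs into per-key lists of at most max_size values.

    Hypothetical helper: a simplified model of size-based collecting,
    not Bytewax's implementation. The timeout is not modeled.
    """
    pending = defaultdict(list)  # key -> values collected so far
    emitted = []
    for key, value in keyed_items:
        pending[key].append(value)
        if len(pending[key]) >= max_size:
            # The batch for this key is full: emit it and start a new one.
            emitted.append((key, pending.pop(key)))
    # Input is exhausted: flush whatever is left for each key.
    for key, values in pending.items():
        if values:
            emitted.append((key, values))
    return emitted


# Same fixed key for every item, as in the dataflow above.
same_key = [("ALL", x) for x in range(10)]
for batch in collect_by_key(same_key, max_size=3):
    print(batch)

# Keying by parity instead collects even and odd values independently.
by_parity = [("even" if x % 2 == 0 else "odd", x) for x in range(10)]
for batch in collect_by_key(by_parity, max_size=3):
    print(batch)
```

With a single fixed key, the batches match the dataflow output above. Keying by parity yields separate batches for even and odd values, since each key is collected on its own.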
Windowing
Windowing operators (which live in bytewax.operators.window) perform computation over a time-based window of data. Time can be defined as the system time at which the data is processed, known as processing time, or as a property of the data itself, known as event time. For this example, we’re going to use event time. Window operators use time to decide which windows a given item belongs to and to determine when an item is late.
Let’s start by importing the relevant classes, creating a dataflow, and configuring some test input.
from datetime import datetime, timedelta, timezone

import bytewax.operators as op
import bytewax.operators.window as win

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.operators.window import EventClockConfig, TumblingWindow, WindowMetadata
from bytewax.testing import TestingSource

flow = Dataflow("windowing")

align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
inp = [
    {"time": align_to, "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=4), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=5), "user": "b", "val": 1},
    {"time": align_to + timedelta(seconds=8), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=12), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=13), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=14), "user": "b", "val": 1},
]
stream = op.input("input", flow, TestingSource(inp))
keyed_stream = op.key_on("key_on_user", stream, lambda e: e["user"])
Window assignment and late-arriving data
In addition to a sense of time, windowing operators also require a configuration that determines how items are assigned to windows. Items can be assigned to one or more windows, depending on the desired behavior. In this example, we’ll be using the TumblingWindow assigner, which will assign each item to a single, fixed-duration window for each key in the stream.
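The assignment rule for tumbling windows can be sketched in plain Python. This is an illustrative model, not Bytewax's implementation: the helper tumbling_window_for is hypothetical, it assumes windows are half-open intervals that include their open time and exclude their close time, and it uses the 10-second length and align_to value that appear in this example's code.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window_for(timestamp, align_to, length):
    """Return (open_time, close_time) of the fixed window containing timestamp.

    Hypothetical helper; assumes half-open windows [open_time, close_time).
    """
    # Number of whole windows between align_to and the timestamp.
    # Floor division also handles timestamps earlier than align_to.
    index = (timestamp - align_to) // length
    open_time = align_to + index * length
    return open_time, open_time + length


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
length = timedelta(seconds=10)

for offset in (0, 4, 8, 12, 14):
    ts = align_to + timedelta(seconds=offset)
    open_time, close_time = tumbling_window_for(ts, align_to, length)
    print(offset, open_time.time(), close_time.time())
```

Under these assumptions, the items at 0, 4, and 8 seconds fall into the window that opens at 00:00:00, while those at 12 and 14 seconds fall into the window that opens at 00:00:10. Each item lands in exactly one window.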
In the following snippet, we configure the EventClockConfig to determine the time of items flowing through the dataflow with a lambda that reads the “time” key of each dictionary in the input list we created above.
We also configure our EventClockConfig with a value for the wait_for_system_duration parameter. wait_for_system_duration is the amount of system time we’re willing to wait for any late-arriving items before closing the window and emitting it downstream. After a window is closed, late-arriving items for that window will be discarded.
Finally, so that windows are generated consistently, we supply the align_to parameter to TumblingWindow, which says that all windows that we collect will be aligned to the given datetime. We’ll use the value that we created above for our event data.
ZERO_TD = timedelta(seconds=0)
clock = EventClockConfig(lambda e: e["time"], wait_for_system_duration=ZERO_TD)
windower = TumblingWindow(length=timedelta(seconds=10), align_to=align_to)
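As a rough intuition for lateness, here is a deliberately simplified model in plain Python. It is an assumption-laden sketch rather than Bytewax's actual clock logic: times are integer seconds, the hypothetical allowance parameter stands in for wait_for_system_duration but is measured against event time, and the passage of system time is ignored. An item counts as late when the window it belongs to has already closed.

```python
def split_late(event_seconds, window_length, allowance):
    """Return (accepted, late) event times under a simplified watermark model.

    Hypothetical helper. The watermark is the largest event time seen so
    far minus the allowance; a window is treated as closed once the
    watermark reaches its close time.
    """
    watermark = None
    accepted, late = [], []
    for t in event_seconds:
        # Close time of the tumbling window (aligned to 0) that contains t.
        close_time = (t // window_length + 1) * window_length
        if watermark is not None and close_time <= watermark:
            late.append(t)  # its window has already closed
        else:
            accepted.append(t)
        # Advance the watermark; it never moves backwards.
        candidate = t - allowance
        watermark = candidate if watermark is None else max(watermark, candidate)
    return accepted, late


events = [0, 4, 12, 8, 13]  # the item at 8 seconds arrives after the one at 12
print(split_late(events, window_length=10, allowance=0))
print(split_late(events, window_length=10, allowance=5))
```

In this model, a zero allowance drops the out-of-order item at 8 seconds, because seeing the item at 12 seconds has already closed the first window. A 5-second allowance keeps that window open long enough to accept it.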
Window processing
Now that we have our window assignment and our clock configuration, we need to define the processing step that we would like to perform on each window. We’ll use the collect_window operator to collect all of the events for a given user in each window.
windowed_stream = win.collect_window("add", keyed_stream, clock, windower)
Window Metadata
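The output of window operators in Bytewax is a tuple in the format:
(key, (metadata, window))
Here, metadata is a WindowMetadata object with information about the open_time and close_time of the window, and window is the operator's result for that window (for collect_window, the list of collected items).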
We’ll write our window output to STDOUT using our output operator:
op.output("out", windowed_stream, StdOutSink())
Running the dataflow
Assuming the windowing snippets above are saved together in a file named window_example.py, running our dataflow should produce the following output:
> python -m bytewax.run window_example
('a', (WindowMetadata(open_time: 2022-01-01 00:00:00 UTC, close_time: 2022-01-01 00:00:10 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 4, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 8, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}]))
('b', (WindowMetadata(open_time: 2022-01-01 00:00:00 UTC, close_time: 2022-01-01 00:00:10 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 5, tzinfo=datetime.timezone.utc), 'user': 'b', 'val': 1}]))
('a', (WindowMetadata(open_time: 2022-01-01 00:00:10 UTC, close_time: 2022-01-01 00:00:20 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 12, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 13, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}]))
('b', (WindowMetadata(open_time: 2022-01-01 00:00:10 UTC, close_time: 2022-01-01 00:00:20 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 14, tzinfo=datetime.timezone.utc), 'user': 'b', 'val': 1}]))
Bytewax has created a window for each key (in our case, the user value) and collected all of the items that were encountered in that window. Along with the key for each value, we receive a WindowMetadata object with information about the open time and close time of each window that Bytewax created.
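The grouping shown above can be reproduced with a small plain-Python model, which also shows how to unpack the (key, (metadata, items)) shape. This sketch does not call Bytewax: Meta is a hypothetical stand-in for WindowMetadata, windows are assumed to be half-open, and the results are sorted only to make the printout easy to compare with the output above.

```python
from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for WindowMetadata so the sketch runs on its own.
Meta = namedtuple("Meta", ["open_time", "close_time"])

align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
length = timedelta(seconds=10)
offsets_and_users = [
    (0, "a"), (4, "a"), (5, "b"), (8, "a"), (12, "a"), (13, "a"), (14, "b"),
]
events = [
    {"time": align_to + timedelta(seconds=s), "user": u, "val": 1}
    for s, u in offsets_and_users
]

# (user, window index) -> events collected for that key and window.
windows = {}
for event in events:
    index = (event["time"] - align_to) // length
    windows.setdefault((event["user"], index), []).append(event)

# Build results in the (key, (metadata, items)) shape described above,
# ordered by window and then by key purely for readability.
results = []
for (user, index), items in sorted(
    windows.items(), key=lambda kv: (kv[0][1], kv[0][0])
):
    open_time = align_to + index * length
    results.append((user, (Meta(open_time, open_time + length), items)))

# Unpack each result and print the key, window bounds, and event offsets.
for key, (meta, items) in results:
    seconds = [(e["time"] - align_to).seconds for e in items]
    print(key, meta.open_time.time(), meta.close_time.time(), seconds)
```

Each (user, window) pair produces one result, so user a appears once per window with its three and then two events, and user b appears once per window with a single event each.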
Wrapping up
Bytewax offers multiple processing shapes, window assignment types, and other configuration options. For more information, please see the Windowing section and the windowing API documentation in bytewax.operators.window.