Collecting and Windowing Example
This section covers the collect and window operators that Bytewax provides.
Collecting
The collect operator operates over a
stream of data and collects items until either a maximum size or a
given timeout is reached, then emits them downstream as a list.
Let’s construct a simple dataflow to demonstrate. In the following
dataflow, we’re using the TestingSource to
generate sample data from a list. In a production dataflow, your input
could come from Redpanda, or any other input
source.
The TestingSource emits one integer at a
time into the dataflow. We'll configure our collect
operator to wait for either 3 values or for 10
seconds to expire before emitting the list downstream. Since the
TestingSource emits items without delay,
we should see lists of 3 items until we run out of input from our
TestingSource, followed by a final, shorter list with whatever remains.
It is important to remember that the
collect operator collects data based on a
key. We’ll use the key_on operator to
give all of our items the same fixed key so they are processed
together.
Copy the following code into a file named collect_example.py:
from datetime import timedelta

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.testing import TestingSource

flow = Dataflow("collect")
stream = op.input("input", flow, TestingSource(list(range(10))))
# We want to consider all the items together, so we assign the same fixed key to each of them.
keyed_stream = op.key_on("key", stream, lambda _x: "ALL")
collected_stream = op.collect(
    "collect", keyed_stream, timeout=timedelta(seconds=10), max_size=3
)
op.output("out", collected_stream, StdOutSink())
Now that we have our dataflow, we can run our example:
> python -m bytewax.run collect_example
('ALL', [0, 1, 2])
('ALL', [3, 4, 5])
('ALL', [6, 7, 8])
('ALL', [9])
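To see why the output has this shape, here is a minimal plain-Python sketch of size-based collecting per key. It does not use Bytewax and it ignores the timeout entirely; the function name collect_by_size is a hypothetical helper introduced only for illustration.

```python
from collections import defaultdict


def collect_by_size(keyed_items, max_size):
    """Yield (key, batch) each time a key's buffer reaches max_size.

    Illustrative only: there is no timeout handling here, and leftover
    items are flushed once the input is exhausted.
    """
    buffers = defaultdict(list)
    for key, value in keyed_items:
        buffer = buffers[key]
        buffer.append(value)
        if len(buffer) >= max_size:
            yield key, list(buffer)
            buffer.clear()
    # Input is finished: emit whatever is still buffered for each key.
    for key, buffer in buffers.items():
        if buffer:
            yield key, list(buffer)


# Same shape as the dataflow above: ten integers, all under the key "ALL".
keyed = [("ALL", x) for x in range(10)]
batches = list(collect_by_size(keyed, max_size=3))
for batch in batches:
    print(batch)
```

Because every item shares the key "ALL", all ten integers pass through a single buffer, which is why the final list holds only the one leftover item.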
Windowing
Windowing operators (which live in bytewax.operators.window)
perform computation over a time-based window of data. Time can be
defined as the system time at which the data is processed, known as
processing time, or as a property of the data itself, referred
to as event time. For this example, we're going to use event time.
Our window operators use time to decide which windows a
given item belongs to, and to determine when an item is late.
Let's start by importing the relevant classes, creating a dataflow, and configuring some test input.
from datetime import datetime, timedelta, timezone

import bytewax.operators as op
import bytewax.operators.window as win

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.operators.window import EventClockConfig, TumblingWindow, WindowMetadata
from bytewax.testing import TestingSource

flow = Dataflow("windowing")

align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
inp = [
    {"time": align_to, "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=4), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=5), "user": "b", "val": 1},
    {"time": align_to + timedelta(seconds=8), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=12), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=13), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=14), "user": "b", "val": 1},
]
stream = op.input("input", flow, TestingSource(inp))
keyed_stream = op.key_on("key_on_user", stream, lambda e: e["user"])
Window assignment and late-arriving data
In addition to a sense of time, windowing operators also require a
configuration that determines how items are assigned to windows. Items
can be assigned to one or more windows, depending on the desired
behavior. In this example, we’ll be using the
TumblingWindow assigner, which
will assign each item to a single, fixed duration window for each key
in the stream.
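The arithmetic behind a tumbling assignment can be sketched in plain Python. This is a minimal illustration that assumes windows are contiguous, fixed-length intervals counted from an alignment time; the helper tumbling_window_bounds is hypothetical and is not part of the Bytewax API.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window_bounds(timestamp, align_to, length):
    """Return (open_time, close_time) of the window containing timestamp."""
    # Dividing one timedelta by another with // gives a floored integer index.
    index = (timestamp - align_to) // length
    open_time = align_to + index * length
    return open_time, open_time + length


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
length = timedelta(seconds=10)

# An item stamped 12 seconds after align_to falls in the second window.
open_time, close_time = tumbling_window_bounds(
    align_to + timedelta(seconds=12), align_to, length
)
print(open_time, close_time)
```

Each timestamp maps to exactly one interval, which is what distinguishes a tumbling assignment from one that places an item in several overlapping windows.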
In the following snippet, we configure the
EventClockConfig to determine the
time of items flowing through the dataflow with a
lambda that reads the "time" key of each dictionary in the
input list we created above.
We also configure our
EventClockConfig with a value for
the
wait_for_system_duration
parameter.
wait_for_system_duration
is the amount of system time we’re willing to wait for any late
arriving items before closing the window and emitting it downstream.
After a window is closed, late arriving items for that window will be
discarded.
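To make the effect of wait_for_system_duration more concrete, here is a deliberately simplified model of lateness. It assumes the watermark is just the newest event time seen so far minus the waiting duration, and it ignores the system time that passes between items. The function is_late and this simplification are illustrative assumptions, not Bytewax's implementation.

```python
from datetime import datetime, timedelta, timezone


def is_late(item_time, newest_seen, wait):
    """Simplified lateness check under the assumptions stated above."""
    # Assumption: the watermark trails the newest event time by `wait`.
    watermark = newest_seen - wait
    return item_time < watermark


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
newest_seen = align_to + timedelta(seconds=12)
straggler = align_to + timedelta(seconds=8)

# With no waiting, an item older than the newest one seen counts as late.
late_with_zero_wait = is_late(straggler, newest_seen, timedelta(seconds=0))
# With a 5 second allowance, the same item is still accepted.
late_with_five_second_wait = is_late(straggler, newest_seen, timedelta(seconds=5))
print(late_with_zero_wait, late_with_five_second_wait)
```

Under this model, a zero waiting duration treats any out-of-order item as late. That is acceptable for this example because the test input is already sorted by time.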
In order for windows to be generated consistently, we finally supply
the align_to
parameter, which says that all windows that we collect will be aligned
to the given datetime. We’ll use the value that we created above for
our event data.
ZERO_TD = timedelta(seconds=0)
clock = EventClockConfig(lambda e: e["time"], wait_for_system_duration=ZERO_TD)
windower = TumblingWindow(length=timedelta(seconds=10), align_to=align_to)
Window processing
Now that we have our window assignment, and our clock configuration,
we need to define the processing step that we would like to perform on
each window. We’ll use the
collect_window operator to collect
all of the events for a given user in each window.
windowed_stream = win.collect_window("add", keyed_stream, clock, windower)
Window Metadata
The output of window operators in Bytewax is a tuple in the format
(key, (metadata, window)), where metadata is a
WindowMetadata object with
information about the open_time and close_time of the window, and
window is the result for that window (here, the list of collected items).
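As a rough sketch of how a downstream step might unpack this shape, the snippet below uses a stand-in dataclass in place of the real WindowMetadata so that it runs without Bytewax. StandInMetadata and summarize are hypothetical names; only the open_time and close_time fields are taken from the description above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class StandInMetadata:
    """Stand-in for WindowMetadata; carries only the two documented fields."""

    open_time: datetime
    close_time: datetime


def summarize(output_item):
    """Unpack (key, (metadata, items)) into a flat summary tuple."""
    key, (metadata, items) = output_item
    return key, metadata.open_time, metadata.close_time, len(items)


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
sample = (
    "a",
    (
        StandInMetadata(align_to, align_to + timedelta(seconds=10)),
        [{"val": 1}, {"val": 1}, {"val": 1}],
    ),
)
summary = summarize(sample)
print(summary)
```

A function of this shape could be applied to each item of the windowed stream, for example to report how many events each user produced per window.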
We’ll write our window output to STDOUT using our output operator:
op.output("out", windowed_stream, StdOutSink())
Running the dataflow
With the snippets above saved together in a file named window_example.py, running our dataflow should produce the following output:
> python -m bytewax.run window_example
('a', (WindowMetadata(open_time: 2022-01-01 00:00:00 UTC, close_time: 2022-01-01 00:00:10 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 4, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 8, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}]))
('b', (WindowMetadata(open_time: 2022-01-01 00:00:00 UTC, close_time: 2022-01-01 00:00:10 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 5, tzinfo=datetime.timezone.utc), 'user': 'b', 'val': 1}]))
('a', (WindowMetadata(open_time: 2022-01-01 00:00:10 UTC, close_time: 2022-01-01 00:00:20 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 12, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 13, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}]))
('b', (WindowMetadata(open_time: 2022-01-01 00:00:10 UTC, close_time: 2022-01-01 00:00:20 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 14, tzinfo=datetime.timezone.utc), 'user': 'b', 'val': 1}]))
Bytewax has created a window for each key (in our case, the user
value) and collected all of the items that were encountered in that
window. Along with the key for each value, we receive a
WindowMetadata object with
information about the open time and close time of each window that
Bytewax created.
Wrapping up
Bytewax offers multiple processing shapes, window assignment types and
other configuration options. For more information, please see the
Windowing section, and the windowing API documentation
in bytewax.operators.window.