Collecting and Windowing Example#

This section covers the collect and window operators that Bytewax provides.

Collecting#

The collect operator operates over a stream of data and collects items until either a maximum size or a given timeout is reached.

Let’s construct a simple dataflow to demonstrate. In the following dataflow, we’re using the TestingSource to generate sample data from a list. In a production dataflow, your input could come from Redpanda, or any other input source.

The TestingSource emits one integer at a time into the dataflow. We’ll configure our collect operator to wait for either 3 values or 10 seconds, whichever comes first, before emitting the list downstream. Since TestingSource never makes us wait for data, we should see lists of 3 items until the input runs out, with any leftover items emitted in a final, shorter list.

It is important to remember that the collect operator collects data based on a key. We’ll use the key_on operator to give all of our items the same fixed key so they are processed together.

Copy the following code into a file named collect_example.py:

from datetime import timedelta

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.testing import TestingSource

flow = Dataflow("collect")
stream = op.input("input", flow, TestingSource(list(range(10))))
# We want to consider all the items together, so we assign the same fixed key to each of them.
keyed_stream = op.key_on("key", stream, lambda _x: "ALL")
# Emit a list once 3 items have been collected or 10 seconds have passed.
collected_stream = op.collect(
    "collect", keyed_stream, timeout=timedelta(seconds=10), max_size=3
)
op.output("out", collected_stream, StdOutSink())

Now that we have our dataflow, we can run our example:

> python -m bytewax.run collect_example
('ALL', [0, 1, 2])
('ALL', [3, 4, 5])
('ALL', [6, 7, 8])
('ALL', [9])
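The batching seen in this output can be reproduced with a few lines of plain Python. The sketch below is only an illustration of the size-based half of the behavior, not Bytewax’s implementation: the function name collect_by_size is hypothetical, the timeout is ignored entirely, and leftover items are flushed at the end of input to mirror the final ('ALL', [9]) line.

```python
from collections import defaultdict


def collect_by_size(keyed_items, max_size):
    """Group (key, value) pairs into per-key lists of at most max_size values.

    Hypothetical helper for illustration; it ignores timeouts.
    """
    buffers = defaultdict(list)
    out = []
    for key, value in keyed_items:
        buffers[key].append(value)
        # Emit the buffered list as soon as it reaches max_size.
        if len(buffers[key]) >= max_size:
            out.append((key, buffers.pop(key)))
    # At the end of input, flush whatever is still buffered.
    for key, values in buffers.items():
        out.append((key, values))
    return out


# Same shape as the dataflow: ten integers, all under the fixed key "ALL".
keyed = [("ALL", x) for x in range(10)]
batches = collect_by_size(keyed, max_size=3)
for batch in batches:
    print(batch)
```

Because every item shares the key "ALL", there is a single buffer. With several keys, each key would fill and emit its own lists independently.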

Windowing#

Windowing operators (which live in bytewax.operators.window) perform computation over a time-based window of data. Time can be defined in two ways: as the system time at which the data is processed, known as processing time, or as a property of the data itself, known as event time. For this example, we’re going to use event time. Window operators use time to decide which windows a given item belongs to, and to determine when an item is late.

Let’s start by importing the relevant classes, creating a dataflow, and configuring some test input.

from datetime import datetime, timedelta, timezone

import bytewax.operators as op
import bytewax.operators.window as win

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.operators.window import EventClockConfig, TumblingWindow, WindowMetadata
from bytewax.testing import TestingSource

flow = Dataflow("windowing")

align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
inp = [
    {"time": align_to, "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=4), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=5), "user": "b", "val": 1},
    {"time": align_to + timedelta(seconds=8), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=12), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=13), "user": "a", "val": 1},
    {"time": align_to + timedelta(seconds=14), "user": "b", "val": 1},
]
stream = op.input("input", flow, TestingSource(inp))
# Key each event by its user so that windows are computed per user.
keyed_stream = op.key_on("key_on_user", stream, lambda e: e["user"])

Window assignment, and late arriving data#

In addition to a sense of time, windowing operators also require a configuration that determines how items are assigned to windows. Items can be assigned to one or more windows, depending on the desired behavior. In this example, we’ll be using the TumblingWindow assigner, which will assign each item to a single, fixed duration window for each key in the stream.

In the following snippet, we configure the EventClockConfig to determine the time of items flowing through the dataflow with a lambda that reads the “time” key of each dictionary we created above.

We also configure our EventClockConfig with a value for the wait_for_system_duration parameter.

wait_for_system_duration is the amount of system time we’re willing to wait for any late arriving items before closing the window and emitting it downstream. After a window is closed, late arriving items for that window will be discarded.
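To make the idea of a late item concrete, here is a deliberately simplified model in plain Python. It is a sketch under stated assumptions, not Bytewax’s implementation: the function split_late is hypothetical, and the watermark is taken to be the largest timestamp seen so far minus the wait duration. The passage of system time, which would also move the watermark forward in a real dataflow, is ignored.

```python
from datetime import datetime, timedelta, timezone


def split_late(timestamps, wait):
    """Split timestamps into (on_time, late) using a simplified watermark.

    Hypothetical helper: the watermark is the largest timestamp seen so far
    minus `wait`. Real system-time effects are not modeled.
    """
    watermark = None
    on_time, late = [], []
    for ts in timestamps:
        # Anything older than the current watermark counts as late.
        if watermark is not None and ts < watermark:
            late.append(ts)
            continue
        on_time.append(ts)
        candidate = ts - wait
        if watermark is None or candidate > watermark:
            watermark = candidate
    return on_time, late


base = datetime(2022, 1, 1, tzinfo=timezone.utc)
# The item at 8 seconds arrives after the item at 12 seconds.
arrivals = [base + timedelta(seconds=s) for s in (0, 12, 8, 13)]

_, late_zero = split_late(arrivals, wait=timedelta(seconds=0))
_, late_five = split_late(arrivals, wait=timedelta(seconds=5))
print([(ts - base).seconds for ts in late_zero])  # [8]
print([(ts - base).seconds for ts in late_five])  # []
```

With no waiting, the out-of-order item is treated as late. Allowing five seconds of slack is enough for it to be accepted in this model.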

Finally, so that windows are generated consistently, we supply the align_to parameter to TumblingWindow, which aligns the boundaries of every window to the given datetime. We’ll use the value that we created above for our event data.

ZERO_TD = timedelta(seconds=0)
clock = EventClockConfig(lambda e: e["time"], wait_for_system_duration=ZERO_TD)
windower = TumblingWindow(length=timedelta(seconds=10), align_to=align_to)
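The arithmetic behind fixed-length, aligned windows can be sketched without Bytewax. The helper tumbling_window_bounds below is hypothetical and only illustrates the idea: count how many whole window lengths separate a timestamp from align_to, then derive the window’s open and close times from that count.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window_bounds(ts, align_to, length):
    """Return (open_time, close_time) of the fixed-length window containing ts.

    Hypothetical helper for illustration only.
    """
    # timedelta // timedelta gives a floored integer window index.
    index = (ts - align_to) // length
    open_time = align_to + index * length
    return open_time, open_time + length


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
length = timedelta(seconds=10)

for offset in (0, 4, 8, 12, 14):
    ts = align_to + timedelta(seconds=offset)
    open_time, close_time = tumbling_window_bounds(ts, align_to, length)
    print(offset, (open_time - align_to).seconds, (close_time - align_to).seconds)
```

Items at offsets 0, 4, and 8 seconds share the window spanning 0 to 10 seconds, while offsets 12 and 14 fall in the window spanning 10 to 20 seconds.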

Window processing#

Now that we have our window assignment, and our clock configuration, we need to define the processing step that we would like to perform on each window. We’ll use the collect_window operator to collect all of the events for a given user in each window.

windowed_stream = win.collect_window("add", keyed_stream, clock, windower)

Window Metadata#

The output of window operators in Bytewax is a tuple in the format (key, (metadata, window)), where metadata is a WindowMetadata object with information about the open_time and close_time of the window.
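As a rough illustration of that shape, the plain-Python sketch below unpacks one such tuple. FakeWindowMetadata is a hypothetical stand-in for WindowMetadata that carries only the open_time and close_time attributes mentioned above, and summarize is likewise a made-up helper, so the example runs without Bytewax installed.

```python
from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for bytewax's WindowMetadata, used only for this sketch.
FakeWindowMetadata = namedtuple("FakeWindowMetadata", ["open_time", "close_time"])


def summarize(item):
    """Unpack (key, (metadata, window)) and report the window's size."""
    key, (metadata, window) = item
    return key, metadata.open_time, metadata.close_time, len(window)


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
sample = (
    "a",
    (
        FakeWindowMetadata(align_to, align_to + timedelta(seconds=10)),
        [{"val": 1}, {"val": 1}, {"val": 1}],
    ),
)

key, open_time, close_time, count = summarize(sample)
print(f"user={key} open={open_time} close={close_time} items={count}")
```

A function shaped like summarize could plausibly be applied to the windowed stream in a later step, though that wiring is left out here.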

We’ll write our window output to STDOUT using our output operator:

op.output("out", windowed_stream, StdOutSink())

Running the dataflow#

Assuming the snippets above are saved together in a file named window_example.py, running our dataflow should produce the following output:

> python -m bytewax.run window_example
('a', (WindowMetadata(open_time: 2022-01-01 00:00:00 UTC, close_time: 2022-01-01 00:00:10 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 4, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 8, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}]))
('b', (WindowMetadata(open_time: 2022-01-01 00:00:00 UTC, close_time: 2022-01-01 00:00:10 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 5, tzinfo=datetime.timezone.utc), 'user': 'b', 'val': 1}]))
('a', (WindowMetadata(open_time: 2022-01-01 00:00:10 UTC, close_time: 2022-01-01 00:00:20 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 12, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}, {'time': datetime.datetime(2022, 1, 1, 0, 0, 13, tzinfo=datetime.timezone.utc), 'user': 'a', 'val': 1}]))
('b', (WindowMetadata(open_time: 2022-01-01 00:00:10 UTC, close_time: 2022-01-01 00:00:20 UTC), [{'time': datetime.datetime(2022, 1, 1, 0, 0, 14, tzinfo=datetime.timezone.utc), 'user': 'b', 'val': 1}]))

Bytewax has created a window for each key (in our case, the user value) and collected all of the items that were encountered in that window. Along with the key for each value, we receive a WindowMetadata object with information about the open time and close time of each window that Bytewax created.

Wrapping up#

Bytewax offers multiple processing shapes, window assignment types and other configuration options. For more information, please see the Windowing section, and the windowing API documentation in bytewax.operators.window.
