Introducing Windowing: Processing smoothie orders using windowing operators#

In this example, we walk through how to incorporate windowing into your Bytewax dataflow using Bytewax windowing operators. This is part II of our introductory operator tutorial series; for the first part, see Processing smoothie orders with Bytewax operators.

| Skill Level | Time to Complete | Level |
| --- | --- | --- |
| Intermediate Python programming and Bytewax operators | Approx. 25 Min | Intermediate |

Your Takeaway#

This tutorial will teach you how to chain Bytewax windowing operators to wrangle streams of data. By the end of this tutorial you will have an understanding of how to incorporate windowing techniques into a Bytewax dataflow.

Resources#

bytewax/bytewax/docs/tutorials/introducing-windowing/tumbling_dataflow.py
bytewax/bytewax/docs/tutorials/introducing-windowing/sliding_dataflow.py
bytewax/bytewax/docs/tutorials/introducing-windowing/session_dataflow.py
bytewax/bytewax/docs/tutorials/introducing-windowing/smoothie_order_l.csv

In our blog on windowing, we cover in depth key concepts and assumptions we make to ensure a smooth experience incorporating windowing techniques. If you’re new to windowing, we recommend you take a look at the blog and the cheatsheet our team prepared.

Set up and imports#

Before we begin, let’s set up the environment for building the dataflow.

We recommend using a virtual environment to manage your Python dependencies. You can create one, activate it, and install Bytewax using pip:

$ python -m venv venv
$ source ./venv/bin/activate
(venv) $ pip install bytewax==0.21.1

Now, let’s import the modules required for the tumbling window dataflow.

tumbling_dataflow.py#
from datetime import datetime, timedelta, timezone
from pathlib import Path

import bytewax.operators as op
from bytewax.connectors.files import CSVSource
from bytewax.dataflow import Dataflow
from bytewax.operators.windowing import (
    EventClock,
    TumblingWindower,
    count_window,
)

We’re now ready! Let’s jump in.

Processing smoothie orders with windowing#

Here is a mock dataset containing smoothie order data. Let’s apply windowing techniques on this data to answer specific questions.

bytewax/bytewax/docs/tutorials/introducing-windowing/smoothie_order_l.csv

Objectives:

  1. Count the number of smoothie orders every 30 minutes starting from 08:00 AM until 12:30 PM.

  2. Track smoothie orders using overlapping windows to smooth fluctuations in order rates and calculate a moving count of orders.

  3. Group smoothie orders that occur closely together in time into dynamic session windows, which close after a period of inactivity.

Let’s first initialize our dataflow:

tumbling_dataflow.py#
# Create a new dataflow
flow = Dataflow("windowing_operators_examples")

We can then read the contents of the CSV file using the input operator together with CSVSource:

tumbling_dataflow.py#
# Input stream
csv_file_path = Path("smoothie_order_l.csv")
up = op.input("orders", flow, CSVSource(csv_file_path))
# op.inspect("see_data", up)

With the op.inspect line uncommented, by executing

python -m bytewax.run tumbling_dataflow:flow

we can see the stream of rows read from the file (showing the first few entries):

windowing_operators_examples.see_data: {'order_id': '1', 'time': '2024-08-29 08:00:00', 'order_requested': 'Green Machine', 'ingredients': 'spinach;banana;almond milk;chia seeds'}
windowing_operators_examples.see_data: {'order_id': '2', 'time': '2024-08-29 08:05:00', 'order_requested': 'Berry Blast', 'ingredients': 'strawberry;blueberry;greek yogurt;honey'}
windowing_operators_examples.see_data: {'order_id': '3', 'time': '2024-08-29 08:10:00', 'order_requested': 'Tropical Twist', 'ingredients': 'mango;pineapple;coconut water;flax seeds'}
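
The rows arrive as dictionaries of strings, while the later snippets read the timestamp as `x[1]` from a stream named `parsed`. The parsing step itself is not shown here, so the following is only a minimal sketch of what it might look like. It assumes each row becomes an `(order_id, time, order_requested, ingredients)` tuple and that the CSV timestamps are in UTC; the names `parse_time` and `parse_row` are illustrative.

```python
from datetime import datetime, timezone


def parse_time(s):
    """Parse 'YYYY-MM-DD HH:MM:SS' into a timezone-aware datetime.

    Assumption: the CSV timestamps are UTC, matching the UTC `align_to`
    used by the windowers later on.
    """
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)


def parse_row(row):
    """Turn one CSV row (a dict of strings) into a tuple.

    The timestamp lands at index 1, which is what `x[1]` reads later.
    """
    return (
        int(row["order_id"]),
        parse_time(row["time"]),
        row["order_requested"],
        row["ingredients"],
    )


# Sample row copied from the inspected output above.
sample = {
    "order_id": "1",
    "time": "2024-08-29 08:00:00",
    "order_requested": "Green Machine",
    "ingredients": "spinach;banana;almond milk;chia seeds",
}
parsed_sample = parse_row(sample)
print(parsed_sample[0], parsed_sample[1].isoformat())
```

In the dataflow, a function like this could be applied with `op.map`, for example `parsed = op.map("parse_row", up, parse_row)`, where the step name is a placeholder. Making the timestamps timezone-aware matters because Python cannot subtract a naive datetime from an aware one.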

Let’s define our clock and window type next.

Choosing the appropriate windowing technique according to the objective#

For each of our three objectives, we will choose to work with a specific type of window as outlined in the table below:

| Objective | Type of Window | Specifications | Aggregation or computation |
| --- | --- | --- | --- |
| Count number of orders every 30 minutes from 8:00am to 12:30pm | Tumbling Window | Length of window is 30 minutes, aligned to the date of the events | Counting |
| Track smoothie orders using overlapping windows for moving number of orders | Sliding Window | Length of window can be chosen depending on the desired window size | Counting |
| Group smoothie orders that occur closely together in time into dynamic session windows, windows close after inactivity | Session Window | Period of inactivity can be defined by the user | Collecting |

Let’s now define the appropriate code for each objective.

Objective 1: Count the number of smoothie orders every 30 minutes from 08:00 AM to 12:30 PM#

We can incorporate our clock definition and window definition as follows:

tumbling_dataflow.py#
# Define the clock using event timestamps
def extract_timestamp(x):
    """Extract the timestamp from the data."""
    return x[1]


clock = EventClock(
    ts_getter=extract_timestamp,
    wait_for_system_duration=timedelta(seconds=0),
)

# Define the tumbling window of 30 minutes starting at 08:00
windower = TumblingWindower(
    length=timedelta(minutes=30),
    align_to=datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc),
)

# Count the number of orders in each window
windowed = count_window(
    step_id="count_orders",
    up=parsed,
    clock=clock,
    windower=windower,
    key=lambda x: "total_orders",  # Use a constant key to aggregate all orders
)

op.inspect("windowed_count_orders", windowed.down)

Executing our dataflow then returns the following:

python -m bytewax.run tumbling_dataflow:flow

windowing_operators_examples.windowed_count_orders: ('total_orders', (0, 5))
windowing_operators_examples.windowed_count_orders: ('total_orders', (1, 7))
windowing_operators_examples.windowed_count_orders: ('total_orders', (2, 6))
windowing_operators_examples.windowed_count_orders: ('total_orders', (3, 5))
windowing_operators_examples.windowed_count_orders: ('total_orders', (4, 5))
windowing_operators_examples.windowed_count_orders: ('total_orders', (5, 4))
windowing_operators_examples.windowed_count_orders: ('total_orders', (6, 6))
windowing_operators_examples.windowed_count_orders: ('total_orders', (7, 4))
windowing_operators_examples.windowed_count_orders: ('total_orders', (8, 8))

Each item pairs the key with a (window ID, count) tuple: the first element of the inner tuple is the window ID, and the second is the number of orders in that window. Let’s format our output:

tumbling_dataflow.py#
# Format and output the results
def format_output(item):
    """Format the output for each window."""
    key, (window_id, count) = item
    window_start = windower.align_to + timedelta(minutes=30 * window_id)
    window_end = window_start + timedelta(minutes=30)
    return f"Window {window_id}\
    ({window_start.strftime('%H:%M')}\
    - {window_end.strftime('%H:%M')}): {count} orders"


formatted = op.map("format_output", windowed.down, format_output)

# Inspect the output
op.inspect("formatted_objective_1", formatted)

Executing our dataflow then returns the following:

python -m bytewax.run tumbling_dataflow:flow

windowing_operators_examples.formatted_objective_1: 'Window 0    (08:00    - 08:30): 5 orders'
windowing_operators_examples.formatted_objective_1: 'Window 1    (08:30    - 09:00): 7 orders'
windowing_operators_examples.formatted_objective_1: 'Window 2    (09:00    - 09:30): 6 orders'
windowing_operators_examples.formatted_objective_1: 'Window 3    (09:30    - 10:00): 5 orders'
windowing_operators_examples.formatted_objective_1: 'Window 4    (10:00    - 10:30): 5 orders'
windowing_operators_examples.formatted_objective_1: 'Window 5    (10:30    - 11:00): 4 orders'
windowing_operators_examples.formatted_objective_1: 'Window 6    (11:00    - 11:30): 6 orders'
windowing_operators_examples.formatted_objective_1: 'Window 7    (11:30    - 12:00): 4 orders'
windowing_operators_examples.formatted_objective_1: 'Window 8    (12:00    - 12:30): 8 orders'
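
To see where these window IDs come from, here is a plain-Python sketch of the arithmetic, independent of Bytewax. It assumes windows are half-open intervals `[start, end)` and that the ID is the number of whole window lengths between `align_to` and the timestamp, which is consistent with the output above. `tumbling_window_id` and `window_bounds` are hypothetical helper names, not part of the Bytewax API.

```python
from datetime import datetime, timedelta, timezone

# Same alignment and length as the TumblingWindower in this tutorial.
ALIGN_TO = datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc)
LENGTH = timedelta(minutes=30)


def tumbling_window_id(ts, align_to=ALIGN_TO, length=LENGTH):
    """Return how many whole window lengths separate ts from align_to."""
    # timedelta // timedelta floors to an int, so timestamps before
    # align_to get negative IDs.
    return (ts - align_to) // length


def window_bounds(window_id, align_to=ALIGN_TO, length=LENGTH):
    """Return the (start, end) datetimes of the window with this ID."""
    start = align_to + window_id * length
    return start, start + length


ts = datetime(2024, 8, 29, 8, 31, tzinfo=timezone.utc)
wid = tumbling_window_id(ts)
start, end = window_bounds(wid)
print(wid, start.strftime("%H:%M"), end.strftime("%H:%M"))  # 1 08:30 09:00
```

Because every timestamp maps to exactly one ID, each order is counted once, which is what distinguishes tumbling windows from the overlapping windows used for the next objective.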

With TumblingWindower and the count_window operator, we were able to split the stream into 30-minute chunks and count the orders within each chunk. Let’s now take a look at our second objective.

Objective 2: Track smoothie orders using overlapping windows for moving number of orders#

We will use SlidingWindower and the count_window operator to accomplish our goal. In the code below, we define a sliding window with a length of 1 hour and an offset of 15 minutes, and count the total orders in each window. We’ve also added a formatting function.

sliding_dataflow.py#
# Define the clock using event timestamps
def extract_timestamp(x):
    """Extract the timestamp from the data."""
    return x[1]


# Define clock based on event timestamps
clock = EventClock(
    ts_getter=extract_timestamp, wait_for_system_duration=timedelta(seconds=0)
)


# Create a sliding window with an offset of 15 minutes and length of 1 hour
windower = SlidingWindower(
    length=timedelta(hours=1),
    offset=timedelta(minutes=15),
    align_to=datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc),
)


# Count the number of orders in each window
windowed = count_window(
    step_id="count_orders_sliding",
    up=parsed,
    clock=clock,
    windower=windower,
    key=lambda x: "total_orders",
)


# Format and output the results
def format_output_objective_2(item):
    """Format the output for each window."""
    key, (window_id, count) = item
    window_start = windower.align_to + timedelta(minutes=15 * window_id)
    window_end = window_start + windower.length
    return f"Window {window_id}\
    ({window_start.strftime('%H:%M')} -\
    {window_end.strftime('%H:%M')}): {count} orders"


formatted_2 = op.map("format_output_sliding", windowed.down, format_output_objective_2)
op.inspect("formatted_objective_2", formatted_2)

Executing our dataflow then returns the following:

python -m bytewax.run sliding_dataflow:flow

windowing_operators_examples.formatted_objective_2: 'Window -3    (07:15 -    08:15): 3 orders'
windowing_operators_examples.formatted_objective_2: 'Window -2    (07:30 -    08:30): 5 orders'
windowing_operators_examples.formatted_objective_2: 'Window -1    (07:45 -    08:45): 9 orders'
windowing_operators_examples.formatted_objective_2: 'Window 0    (08:00 -    09:00): 12 orders'
windowing_operators_examples.formatted_objective_2: 'Window 1    (08:15 -    09:15): 12 orders'
windowing_operators_examples.formatted_objective_2: 'Window 2    (08:30 -    09:30): 13 orders'
windowing_operators_examples.formatted_objective_2: 'Window 3    (08:45 -    09:45): 11 orders'
windowing_operators_examples.formatted_objective_2: 'Window 4    (09:00 -    10:00): 11 orders'
windowing_operators_examples.formatted_objective_2: 'Window 5    (09:15 -    10:15): 11 orders'
windowing_operators_examples.formatted_objective_2: 'Window 6    (09:30 -    10:30): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 7    (09:45 -    10:45): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 8    (10:00 -    11:00): 9 orders'
windowing_operators_examples.formatted_objective_2: 'Window 9    (10:15 -    11:15): 8 orders'
windowing_operators_examples.formatted_objective_2: 'Window 10    (10:30 -    11:30): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 11    (10:45 -    11:45): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 12    (11:00 -    12:00): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 13    (11:15 -    12:15): 12 orders'
windowing_operators_examples.formatted_objective_2: 'Window 14    (11:30 -    12:30): 12 orders'
windowing_operators_examples.formatted_objective_2: 'Window 15    (11:45 -    12:45): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 16    (12:00 -    13:00): 8 orders'
windowing_operators_examples.formatted_objective_2: 'Window 17    (12:15 -    13:15): 4 orders'

The sliding-window counts suggest two peak times: early morning and around lunchtime. Let’s complete our final objective.

Objective 3: Group smoothie orders that occur closely together in time into dynamic session windows, windows close after inactivity#

We will use SessionWindower and the collect_window operator to accomplish our goal.

session_dataflow.py#
# Define the clock using event timestamps
def extract_timestamp(x):
    """Extract the timestamp from the data."""
    return x[1]


clock = EventClock(
    ts_getter=extract_timestamp,
    wait_for_system_duration=timedelta(seconds=0),
)

# A session closes after 5 minutes without a new order
windower = SessionWindower(gap=timedelta(minutes=5))

# Collect the orders that fall into each session window
windowed = collect_window(
    step_id="collect_sessions",
    up=keyed,
    clock=clock,
    windower=windower,
)

# op.inspect("windowed_count_orders", windowed.down)


def format_output(item):
    """Format the output."""
    key, (window_id, orders) = item
    order_ids = [order[0] for order in orders]
    timestamps = [order[1] for order in orders]
    session_start = min(timestamps).strftime("%H:%M")
    session_end = max(timestamps).strftime("%H:%M")
    return f"Session {window_id}\
    ({session_start} - {session_end}): Order ID {order_ids}"


formatted = op.map("format_output", windowed.down, format_output)

# Inspect the output
op.inspect("output", formatted)

Executing our dataflow then returns the following:

python -m bytewax.run session_dataflow:flow

windowing_operators_examples.output: 'Session 0    (08:00 - 08:15): Order ID [1, 2, 3, 4]'
windowing_operators_examples.output: 'Session 1    (08:23 - 08:23): Order ID [5]'
windowing_operators_examples.output: 'Session 2    (08:31 - 08:38): Order ID [6, 7, 8]'
windowing_operators_examples.output: 'Session 3    (08:44 - 08:56): Order ID [9, 10, 11, 12]'
windowing_operators_examples.output: 'Session 4    (09:02 - 09:22): Order ID [13, 14, 15, 16, 17]'
windowing_operators_examples.output: 'Session 5    (09:28 - 09:28): Order ID [18]'
windowing_operators_examples.output: 'Session 6    (09:36 - 09:39): Order ID [19, 20]'
windowing_operators_examples.output: 'Session 7    (09:45 - 09:45): Order ID [21]'
windowing_operators_examples.output: 'Session 8    (09:51 - 09:51): Order ID [22]'
windowing_operators_examples.output: 'Session 9    (09:58 - 09:58): Order ID [23]'
windowing_operators_examples.output: 'Session 10    (10:05 - 10:14): Order ID [24, 25, 26]'
windowing_operators_examples.output: 'Session 11    (10:20 - 10:20): Order ID [27]'
windowing_operators_examples.output: 'Session 12    (10:28 - 10:33): Order ID [28, 29]'
windowing_operators_examples.output: 'Session 13    (10:41 - 10:41): Order ID [30]'
windowing_operators_examples.output: 'Session 14    (10:48 - 10:48): Order ID [31]'
windowing_operators_examples.output: 'Session 15    (10:56 - 10:56): Order ID [32]'
windowing_operators_examples.output: 'Session 16    (11:02 - 11:02): Order ID [33]'
windowing_operators_examples.output: 'Session 17    (11:09 - 11:09): Order ID [34]'
windowing_operators_examples.output: 'Session 18    (11:16 - 11:19): Order ID [35, 36]'
windowing_operators_examples.output: 'Session 19    (11:25 - 11:28): Order ID [37, 38]'
windowing_operators_examples.output: 'Session 20    (11:36 - 11:41): Order ID [39, 40]'
windowing_operators_examples.output: 'Session 21    (11:48 - 11:48): Order ID [41]'
windowing_operators_examples.output: 'Session 22    (11:56 - 12:29): Order ID [42, 43, 44, 45, 46, 47, 48, 49, 50]'

We can see that the busiest session ran from 11:56 to 12:29, around lunchtime, and collected nine orders!
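
The grouping behind the session output can be illustrated with a small batch sketch in plain Python. It is not how Bytewax implements sessions, since Bytewax works incrementally on a stream. It assumes a new session starts only when the gap to the previous order is strictly greater than `gap`, which matches the output above, where orders exactly 5 minutes apart share Session 0. `group_into_sessions` and `at` are hypothetical helpers, and the sample times are read off the first three sessions of the output.

```python
from datetime import datetime, timedelta, timezone

GAP = timedelta(minutes=5)


def group_into_sessions(timestamps, gap=GAP):
    """Split timestamps into sessions separated by more than `gap`."""
    sessions = []
    for ts in sorted(timestamps):
        # Extend the current session while the inactivity is within the gap.
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


def at(hour, minute):
    """Build a UTC timestamp on the day of the sample data."""
    return datetime(2024, 8, 29, hour, minute, tzinfo=timezone.utc)


times = [at(8, 0), at(8, 5), at(8, 10), at(8, 15), at(8, 23), at(8, 31)]
for i, session in enumerate(group_into_sessions(times)):
    print(i, [t.strftime("%H:%M") for t in session])
```

Unlike tumbling and sliding windows, the session boundaries here depend entirely on the data, so the number and length of sessions change whenever the gap is adjusted.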

Summary#

That’s it! You now know how to chain Bytewax operators with windowing operators to transform and aggregate data and to gain insights from time-series data.

Need some help? Join our community!

If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #questions-answered Slack channel!
