Introducing Windowing: Processing smoothie orders using windowing operators
In this example, we walk through how to incorporate windowing into a Bytewax dataflow using Bytewax windowing operators. This is part II of our introductory tutorial series on operators; for the first part, see Processing smoothie orders with Bytewax operators.
| Skill Level | Time to Complete | Level |
|---|---|---|
| Intermediate Python programming and Bytewax operators | Approx. 25 Min | Intermediate |
Your Takeaway
This tutorial will teach you how to chain Bytewax windowing operators to wrangle streams of data. By the end of this tutorial you will have an understanding of how to incorporate windowing techniques into a Bytewax dataflow.
Resources
bytewax/bytewax/docs/tutorials/introducing-windowing/tumbling_dataflow.py
bytewax/bytewax/docs/tutorials/introducing-windowing/sliding_dataflow.py
bytewax/bytewax/docs/tutorials/introducing-windowing/session_dataflow.py
bytewax/bytewax/docs/tutorials/introducing-windowing/smoothie_order_l.csv
In our blog on windowing, we cover in depth the key concepts and assumptions we make to ensure a smooth experience when incorporating windowing techniques. If you’re new to windowing, we recommend reading the blog and the cheatsheet our team prepared.
Setup and imports
Before we begin, let’s import the necessary modules and set up the environment for building the dataflow.
Complete installation: we recommend using a virtual environment to manage your Python dependencies. You can install Bytewax using pip:
$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install bytewax==0.21.1
Now, let’s import the required modules.
from datetime import datetime, timedelta, timezone
from pathlib import Path

import bytewax.operators as op
from bytewax.connectors.files import CSVSource
from bytewax.dataflow import Dataflow
from bytewax.operators.windowing import (
    EventClock,
    TumblingWindower,
    count_window,
)
We’re now ready! Let’s jump in.
Processing smoothie orders with windowing
Here is a mock dataset containing smoothie order data. Let’s apply windowing techniques to this data to answer three specific questions.
bytewax/bytewax/docs/tutorials/introducing-windowing/smoothie_order_l.csv
Objectives:
Count the number of smoothie orders every 30 minutes starting from 08:00 AM until 12:30 PM.
Track smoothie orders using overlapping windows to smooth out fluctuations in order rates and calculate a moving count of orders.
Group smoothie orders that occur closely together in time into dynamic session windows, which close after a period of inactivity.
Let’s first initialize our dataflow:
# Create a new dataflow
flow = Dataflow("windowing_operators_examples")
We can then read the contents of the CSV file using the input operator together with CSVSource:
# Input stream
csv_file_path = Path("smoothie_order_l.csv")
up = op.input("orders", flow, CSVSource(csv_file_path))
# op.inspect("see_data", up)
After uncommenting the op.inspect line and executing
python -m bytewax.run tumbling_dataflow:flow
we can see the incoming stream of raw orders (first few entries shown):
windowing_operators_examples.see_data: {'order_id': '1', 'time': '2024-08-29 08:00:00', 'order_requested': 'Green Machine', 'ingredients': 'spinach;banana;almond milk;chia seeds'}
windowing_operators_examples.see_data: {'order_id': '2', 'time': '2024-08-29 08:05:00', 'order_requested': 'Berry Blast', 'ingredients': 'strawberry;blueberry;greek yogurt;honey'}
windowing_operators_examples.see_data: {'order_id': '3', 'time': '2024-08-29 08:10:00', 'order_requested': 'Tropical Twist', 'ingredients': 'mango;pineapple;coconut water;flax seeds'}
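Each row arrives as a dictionary of strings. The clock defined later reads the event time as x[1], so the stream passed to the windowing operators (named parsed in the excerpts) must hold tuples whose second element is a datetime; the step that builds parsed is not shown in this tutorial. Here is a minimal sketch of such a step, assuming a hypothetical parse_order function, a hypothetical tuple layout (order_id, timestamp, order_requested, ingredients), and UTC timestamps to match the align_to value used in the windowers:

```python
from datetime import datetime, timezone

# Format of the "time" column in smoothie_order_l.csv.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_order(row):
    """Hypothetical parser: CSV row dict -> (order_id, timestamp, order_requested, ingredients)."""
    order_id = int(row["order_id"])
    # The CSV times carry no time zone; we assume UTC to match align_to.
    timestamp = datetime.strptime(row["time"], TIME_FORMAT).replace(
        tzinfo=timezone.utc
    )
    # Ingredients are stored as a single ";"-separated string.
    ingredients = row["ingredients"].split(";")
    return (order_id, timestamp, row["order_requested"], ingredients)


row = {
    "order_id": "1",
    "time": "2024-08-29 08:00:00",
    "order_requested": "Green Machine",
    "ingredients": "spinach;banana;almond milk;chia seeds",
}
print(parse_order(row))
```

In the dataflow, a function like this could be applied with parsed = op.map("parse_orders", up, parse_order). Converting order_id to an int matches the integer order IDs printed in the session output later on.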
Let’s define our clock and window type next.
Choosing the appropriate windowing technique according to the objective
For each of our three objectives, we will choose to work with a specific type of window as outlined in the table below:
| Objective | Type of Window | Specifications | Aggregation or computation |
|---|---|---|---|
| Count the number of orders every 30 minutes from 08:00 AM to 12:30 PM | Tumbling Window | Window length of 30 minutes, aligned to 08:00 on the date of the events | Counting |
| Track smoothie orders using overlapping windows for a moving count of orders | Sliding Window | Window length and offset between window starts chosen by the user (1 hour and 15 minutes below) | Counting |
| Group smoothie orders that occur close together in time into dynamic session windows that close after inactivity | Session Window | Period of inactivity (gap) defined by the user | Collecting |
Let’s now define the appropriate code for each objective.
Objective 1: Count the number of smoothie orders every 30 minutes from 08:00 AM to 12:30 PM
We can incorporate our clock definition and window definition as follows:
# Define the clock using event timestamps
def extract_timestamp(x):
    """Extract the event timestamp (second element of each parsed order)."""
    return x[1]


clock = EventClock(
    ts_getter=extract_timestamp,
    wait_for_system_duration=timedelta(seconds=0),
)

# Define the tumbling window of 30 minutes starting at 08:00
windower = TumblingWindower(
    length=timedelta(minutes=30),
    align_to=datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc),
)

# Count the number of orders in each window.
# `parsed` is the stream of parsed orders, built in a step omitted from this excerpt.
windowed = count_window(
    step_id="count_orders",
    up=parsed,
    clock=clock,
    windower=windower,
    key=lambda x: "total_orders",  # Use a constant key to aggregate all orders
)

op.inspect("windowed_count_orders", windowed.down)
Executing our dataflow then returns the following:
python -m bytewax.run tumbling_dataflow:flow
windowing_operators_examples.windowed_count_orders: ('total_orders', (0, 5))
windowing_operators_examples.windowed_count_orders: ('total_orders', (1, 7))
windowing_operators_examples.windowed_count_orders: ('total_orders', (2, 6))
windowing_operators_examples.windowed_count_orders: ('total_orders', (3, 5))
windowing_operators_examples.windowed_count_orders: ('total_orders', (4, 5))
windowing_operators_examples.windowed_count_orders: ('total_orders', (5, 4))
windowing_operators_examples.windowed_count_orders: ('total_orders', (6, 6))
windowing_operators_examples.windowed_count_orders: ('total_orders', (7, 4))
windowing_operators_examples.windowed_count_orders: ('total_orders', (8, 8))
Each item is a (key, (window_id, count)) pair: the key is the constant "total_orders", window_id identifies the 30-minute window, and count is the number of orders in that window. Let’s format our output:
# Format and output the results
def format_output(item):
    """Format the output for each window."""
    key, (window_id, count) = item
    # Window i starts i window lengths after align_to
    window_start = windower.align_to + window_id * windower.length
    window_end = window_start + windower.length
    # Adjacent f-strings are concatenated without stray whitespace
    return (
        f"Window {window_id} "
        f"({window_start.strftime('%H:%M')} - {window_end.strftime('%H:%M')}): "
        f"{count} orders"
    )


formatted = op.map("format_output", windowed.down, format_output)

# Inspect the output
op.inspect("formatted_objective_1", formatted)
Executing our dataflow then returns the following:
python -m bytewax.run tumbling_dataflow:flow
windowing_operators_examples.formatted_objective_1: 'Window 0 (08:00 - 08:30): 5 orders'
windowing_operators_examples.formatted_objective_1: 'Window 1 (08:30 - 09:00): 7 orders'
windowing_operators_examples.formatted_objective_1: 'Window 2 (09:00 - 09:30): 6 orders'
windowing_operators_examples.formatted_objective_1: 'Window 3 (09:30 - 10:00): 5 orders'
windowing_operators_examples.formatted_objective_1: 'Window 4 (10:00 - 10:30): 5 orders'
windowing_operators_examples.formatted_objective_1: 'Window 5 (10:30 - 11:00): 4 orders'
windowing_operators_examples.formatted_objective_1: 'Window 6 (11:00 - 11:30): 6 orders'
windowing_operators_examples.formatted_objective_1: 'Window 7 (11:30 - 12:00): 4 orders'
windowing_operators_examples.formatted_objective_1: 'Window 8 (12:00 - 12:30): 8 orders'
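count_window identifies each window only by an integer ID. The arithmetic behind those IDs can be sketched in plain Python, assuming each tumbling window is the half-open interval [align_to + i * length, align_to + (i + 1) * length). The helper names and sample times below are hypothetical, and the Counter merely stands in for count_window with a constant key; this does not call Bytewax.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

# Same settings as the TumblingWindower above.
ALIGN_TO = datetime(2024, 8, 29, 8, 0, tzinfo=timezone.utc)
LENGTH = timedelta(minutes=30)


def tumbling_window_id(ts, align_to=ALIGN_TO, length=LENGTH):
    """Return the ID of the single tumbling window containing ts."""
    # timedelta // timedelta floors to an int, so times before
    # align_to get negative IDs.
    return (ts - align_to) // length


def window_bounds(window_id, align_to=ALIGN_TO, length=LENGTH):
    """Inverse mapping: the start and end of a window, as format_output computes them."""
    start = align_to + window_id * length
    return start, start + length


def at(hour, minute):
    """Hypothetical helper: a UTC timestamp on the day of the dataset."""
    return datetime(2024, 8, 29, hour, minute, tzinfo=timezone.utc)


# Illustrative order times (not taken from the CSV).
times = [at(8, 0), at(8, 29), at(8, 30), at(12, 29)]
counts = Counter(tumbling_window_id(ts) for ts in times)
print(sorted(counts.items()))

start, end = window_bounds(8)
print(start.strftime("%H:%M"), end.strftime("%H:%M"))
```

Because each timestamp maps to exactly one ID, tumbling windows never overlap and every order is counted once.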
With TumblingWindower and the count_window operator, we split the stream into consecutive, non-overlapping 30-minute windows and counted the orders in each one. Let’s now take a look at our second objective.
Objective 2: Track smoothie orders using overlapping windows for a moving count of orders
We will use SlidingWindower and the count_window operator to accomplish our goal. In the code below, we define sliding windows that are 1 hour long, with a new window starting every 15 minutes (the offset), so consecutive windows overlap by 45 minutes. count_window then counts the total orders in each window.
We’ve also added a formatting function.
from datetime import datetime, timedelta, timezone

import bytewax.operators as op
from bytewax.operators.windowing import EventClock, SlidingWindower, count_window


# Define the clock using event timestamps
def extract_timestamp(x):
    """Extract the event timestamp (second element of each parsed order)."""
    return x[1]


clock = EventClock(
    ts_getter=extract_timestamp,
    wait_for_system_duration=timedelta(seconds=0),
)

# Create 1-hour sliding windows, with a new window starting every 15 minutes
windower = SlidingWindower(
    length=timedelta(hours=1),
    offset=timedelta(minutes=15),
    align_to=datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc),
)

# Count the number of orders in each window.
# `parsed` is the stream of parsed orders, built in a step omitted from this excerpt.
windowed = count_window(
    step_id="count_orders_sliding",
    up=parsed,
    clock=clock,
    windower=windower,
    key=lambda x: "total_orders",  # Use a constant key to aggregate all orders
)


# Format and output the results
def format_output_objective_2(item):
    """Format the output for each window."""
    key, (window_id, count) = item
    # Window i starts i offsets (15 minutes each) after align_to
    window_start = windower.align_to + timedelta(minutes=15 * window_id)
    window_end = window_start + windower.length
    return (
        f"Window {window_id} "
        f"({window_start.strftime('%H:%M')} - {window_end.strftime('%H:%M')}): "
        f"{count} orders"
    )


formatted_2 = op.map("format_output_sliding", windowed.down, format_output_objective_2)
op.inspect("formatted_objective_2", formatted_2)
Executing our dataflow then returns the following:
python -m bytewax.run sliding_dataflow:flow
windowing_operators_examples.formatted_objective_2: 'Window -3 (07:15 - 08:15): 3 orders'
windowing_operators_examples.formatted_objective_2: 'Window -2 (07:30 - 08:30): 5 orders'
windowing_operators_examples.formatted_objective_2: 'Window -1 (07:45 - 08:45): 9 orders'
windowing_operators_examples.formatted_objective_2: 'Window 0 (08:00 - 09:00): 12 orders'
windowing_operators_examples.formatted_objective_2: 'Window 1 (08:15 - 09:15): 12 orders'
windowing_operators_examples.formatted_objective_2: 'Window 2 (08:30 - 09:30): 13 orders'
windowing_operators_examples.formatted_objective_2: 'Window 3 (08:45 - 09:45): 11 orders'
windowing_operators_examples.formatted_objective_2: 'Window 4 (09:00 - 10:00): 11 orders'
windowing_operators_examples.formatted_objective_2: 'Window 5 (09:15 - 10:15): 11 orders'
windowing_operators_examples.formatted_objective_2: 'Window 6 (09:30 - 10:30): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 7 (09:45 - 10:45): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 8 (10:00 - 11:00): 9 orders'
windowing_operators_examples.formatted_objective_2: 'Window 9 (10:15 - 11:15): 8 orders'
windowing_operators_examples.formatted_objective_2: 'Window 10 (10:30 - 11:30): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 11 (10:45 - 11:45): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 12 (11:00 - 12:00): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 13 (11:15 - 12:15): 12 orders'
windowing_operators_examples.formatted_objective_2: 'Window 14 (11:30 - 12:30): 12 orders'
windowing_operators_examples.formatted_objective_2: 'Window 15 (11:45 - 12:45): 10 orders'
windowing_operators_examples.formatted_objective_2: 'Window 16 (12:00 - 13:00): 8 orders'
windowing_operators_examples.formatted_objective_2: 'Window 17 (12:15 - 13:15): 4 orders'
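Window IDs are counted in steps of the offset from align_to, so windows that start before 08:00 get negative IDs. Because each window is 1 hour long and a new one starts every 15 minutes, every order falls into length / offset = 4 windows. The plain-Python sketch below, assuming half-open windows [align_to + i * offset, align_to + i * offset + length) and a hypothetical sliding_window_ids helper, shows which windows contain a given order time:

```python
from datetime import datetime, timedelta, timezone

# Same settings as the SlidingWindower above.
ALIGN_TO = datetime(2024, 8, 29, 8, 0, tzinfo=timezone.utc)
LENGTH = timedelta(hours=1)
OFFSET = timedelta(minutes=15)


def sliding_window_ids(ts, align_to=ALIGN_TO, length=LENGTH, offset=OFFSET):
    """Return the IDs of every sliding window that contains ts."""
    # Last window: the latest one that starts at or before ts.
    last = (ts - align_to) // offset
    # First window: the earliest one whose (exclusive) end is still after ts.
    first = (ts - align_to - length) // offset + 1
    return list(range(first, last + 1))


first_order = datetime(2024, 8, 29, 8, 0, tzinfo=timezone.utc)
print(sliding_window_ids(first_order))
```

For the 08:00 order this gives windows -3 through 0. Window -4 would span 07:00 to 08:00 and, under the half-open assumption, exclude the 08:00 order, which is consistent with the output starting at Window -3.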
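The counts peak in the morning, with 13 orders in the 08:30 to 09:30 window, and again around lunch time, with 12 orders in each of the 11:15 to 12:15 and 11:30 to 12:30 windows. Let’s complete our final objective.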
Objective 3: Group smoothie orders that occur close together in time into dynamic session windows that close after inactivity
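We will use SessionWindower and the collect_window operator to accomplish our goal. A session stays open while orders keep arriving no more than 5 minutes apart (the gap parameter in the code below); a longer pause closes it, and collect_window emits all orders collected in that session together.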
from datetime import timedelta

import bytewax.operators as op
from bytewax.operators.windowing import EventClock, SessionWindower, collect_window


# Define the clock using event timestamps
def extract_timestamp(x):
    """Extract the event timestamp (second element of each order value)."""
    return x[1]


clock = EventClock(
    ts_getter=extract_timestamp,
    wait_for_system_duration=timedelta(seconds=0),
)

# A session closes once more than 5 minutes pass without a new order
windower = SessionWindower(gap=timedelta(minutes=5))

# Collect the orders in each session.
# `keyed` is a keyed stream whose values hold the order ID first and the
# timestamp second; it is built in a step omitted from this excerpt.
windowed = collect_window(
    step_id="collect_sessions",
    up=keyed,
    clock=clock,
    windower=windower,
)

# op.inspect("windowed_count_orders", windowed.down)


def format_output(item):
    """Format each session as its time span and the IDs of its orders."""
    key, (window_id, orders) = item
    order_ids = [order[0] for order in orders]
    timestamps = [order[1] for order in orders]
    session_start = min(timestamps).strftime("%H:%M")
    session_end = max(timestamps).strftime("%H:%M")
    return f"Session {window_id} ({session_start} - {session_end}): Order ID {order_ids}"


formatted = op.map("format_output", windowed.down, format_output)

# Inspect the output
op.inspect("output", formatted)
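collect_window receives a stream named keyed whose construction is not shown in the excerpt. Since format_output reads order[0] as the order ID and order[1] as the timestamp, and extract_timestamp reads x[1], each value must be a sequence with the order ID first and the timestamp second. A minimal sketch under that assumption, using a hypothetical key_order helper and a single hypothetical constant key so that all orders share one sequence of sessions:

```python
from datetime import datetime, timezone

# Hypothetical constant key: every order belongs to the same session stream.
SESSION_KEY = "all_orders"


def key_order(order):
    """Turn a parsed (order_id, timestamp, ...) tuple into a (key, value) pair."""
    order_id, timestamp = order[0], order[1]
    # Value layout matches what format_output and extract_timestamp read.
    return (SESSION_KEY, (order_id, timestamp))


parsed_order = (1, datetime(2024, 8, 29, 8, 0, tzinfo=timezone.utc), "Green Machine")
print(key_order(parsed_order))
```

In the dataflow, this could be applied with keyed = op.map("key_orders", parsed, key_order), since a stream of (string key, value) 2-tuples is what Bytewax's keyed operators expect.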
Executing our dataflow then returns the following:
python -m bytewax.run session_dataflow:flow
windowing_operators_examples.output: 'Session 0 (08:00 - 08:15): Order ID [1, 2, 3, 4]'
windowing_operators_examples.output: 'Session 1 (08:23 - 08:23): Order ID [5]'
windowing_operators_examples.output: 'Session 2 (08:31 - 08:38): Order ID [6, 7, 8]'
windowing_operators_examples.output: 'Session 3 (08:44 - 08:56): Order ID [9, 10, 11, 12]'
windowing_operators_examples.output: 'Session 4 (09:02 - 09:22): Order ID [13, 14, 15, 16, 17]'
windowing_operators_examples.output: 'Session 5 (09:28 - 09:28): Order ID [18]'
windowing_operators_examples.output: 'Session 6 (09:36 - 09:39): Order ID [19, 20]'
windowing_operators_examples.output: 'Session 7 (09:45 - 09:45): Order ID [21]'
windowing_operators_examples.output: 'Session 8 (09:51 - 09:51): Order ID [22]'
windowing_operators_examples.output: 'Session 9 (09:58 - 09:58): Order ID [23]'
windowing_operators_examples.output: 'Session 10 (10:05 - 10:14): Order ID [24, 25, 26]'
windowing_operators_examples.output: 'Session 11 (10:20 - 10:20): Order ID [27]'
windowing_operators_examples.output: 'Session 12 (10:28 - 10:33): Order ID [28, 29]'
windowing_operators_examples.output: 'Session 13 (10:41 - 10:41): Order ID [30]'
windowing_operators_examples.output: 'Session 14 (10:48 - 10:48): Order ID [31]'
windowing_operators_examples.output: 'Session 15 (10:56 - 10:56): Order ID [32]'
windowing_operators_examples.output: 'Session 16 (11:02 - 11:02): Order ID [33]'
windowing_operators_examples.output: 'Session 17 (11:09 - 11:09): Order ID [34]'
windowing_operators_examples.output: 'Session 18 (11:16 - 11:19): Order ID [35, 36]'
windowing_operators_examples.output: 'Session 19 (11:25 - 11:28): Order ID [37, 38]'
windowing_operators_examples.output: 'Session 20 (11:36 - 11:41): Order ID [39, 40]'
windowing_operators_examples.output: 'Session 21 (11:48 - 11:48): Order ID [41]'
windowing_operators_examples.output: 'Session 22 (11:56 - 12:29): Order ID [42, 43, 44, 45, 46, 47, 48, 49, 50]'
The largest session ran from 11:56 to 12:29 and contained 9 orders (IDs 42 to 50), a lunch-time rush in which consecutive orders were never more than 5 minutes apart.
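The grouping rule behind these sessions can be illustrated with a simplified, batch sketch in plain Python. It assumes, as the output above suggests (orders 1 to 4 arrive exactly 5 minutes apart yet share Session 0), that an order exactly 5 minutes after the previous one still joins its session. The group_sessions helper is hypothetical: it works on a finished, sorted list, whereas SessionWindower assigns orders incrementally as the stream flows.

```python
from datetime import datetime, timedelta, timezone

GAP = timedelta(minutes=5)


def group_sessions(timestamps, gap=GAP):
    """Split timestamps into sessions separated by pauses longer than gap."""
    sessions = []
    for ts in sorted(timestamps):
        # Join the current session if this event is within gap of its last event;
        # otherwise a longer pause has occurred, so start a new session.
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


def at(hour, minute):
    """Hypothetical helper: a UTC timestamp on the day of the dataset."""
    return datetime(2024, 8, 29, hour, minute, tzinfo=timezone.utc)


# Times of orders 1 to 5, as shown in the raw and session output above.
times = [at(8, 0), at(8, 5), at(8, 10), at(8, 15), at(8, 23)]
for session in group_sessions(times):
    start, end = session[0].strftime("%H:%M"), session[-1].strftime("%H:%M")
    print(f"{start} - {end}: {len(session)} orders")
```

This reproduces the split between Session 0 (08:00 to 08:15, 4 orders) and Session 1 (08:23, 1 order): the 8-minute pause after 08:15 exceeds the 5-minute gap.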
Summary
That’s it! You now know how to chain Bytewax operators with windowing operators to transform and aggregate time-series data and extract insights from it.