Building Sessions from Search Logs


Here is an introductory example of using Bytewax to turn an incoming stream of event logs from a hypothetical search engine into metrics over search sessions. In this example, we’re going to focus on the dataflow itself and aggregating state.

Skill Level: Basic, no prior knowledge required

Time to Complete: Approx. 25 min

Level: Beginner

Your Takeaway

This tutorial teaches you how to use Bytewax to group streaming search events into custom session windows and then calculate the click-through rate (CTR) for each session downstream.

Resources

bytewax/bytewax/docs/tutorials/search-logs/dataflow.py

Introduction and Problem Statement

A simple but useful metric for evaluating the effectiveness of online platforms, particularly search engines, is click-through rate (CTR). CTR is the number of clicks on a search result or advertisement divided by the total number of searches or views. It gives you a rough sense of how effective your search results are.
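As a quick worked example of the formula, here is a minimal sketch in plain Python. The helper name click_through_rate and the choice to return 0.0 when there were no views are illustrative assumptions, not part of Bytewax.

```python
def click_through_rate(clicks: int, views: int) -> float:
    """Return clicks / views, or 0.0 when there were no views (hypothetical helper)."""
    # Guard against division by zero when nothing was shown.
    if views == 0:
        return 0.0
    return clicks / views


# 30 clicks out of 120 searches gives a CTR of 0.25 (25%).
print(click_through_rate(30, 120))  # 0.25
print(click_through_rate(0, 0))     # 0.0
```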

The relevance of CTR extends to any business that wants to understand user behavior, improve content relevance, and ultimately increase the profitability of its online activities. Efficiently calculating and analyzing CTR is therefore essential not only for improving the user experience but also for informing strategic business decisions. The challenge lies in accurately aggregating and processing streaming data quickly enough to produce timely, actionable insights.

Building this dataflow with Bytewax, an open-source Python framework for streaming data processing, addresses this challenge directly. Bytewax processes large volumes of event data in real time, which is particularly useful for organizations that handle continuous streams of user interactions. This tutorial is especially relevant for:

  • Digital marketers, who need to analyze user interactions to optimize ad placements and content strategy.

  • Data analysts and scientists, who need robust tools to process and interpret user data and derive business intelligence.

  • Web developers focused on improving site architecture and user interfaces to increase user engagement and satisfaction.

  • Product managers who oversee digital platforms and are responsible for increasing user engagement and retention through data-driven methods.

Strategy and Assumptions

The key steps involved in this process include:

  • Defining a data model/schema for incoming events.

  • Generating input data to simulate user interactions.

  • Implementing logic functions to calculate CTR for each search session.

  • Creating a dataflow that incorporates windowing to process the incoming event stream.

  • Executing the dataflow to generate actionable insights.

Assumptions

  • Searches are per-user, so we need to divvy up events by user.

  • Searches don’t span user sessions, so we should calculate user sessions first.

  • Sessions without a search shouldn’t contribute.

  • We calculate one metric, CTR, based on whether a user clicked on any result in a search.
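The first assumption is easiest to picture as partitioning an interleaved event stream into one list per user. The sketch below does this in plain Python with a hypothetical, simplified Event class; in the dataflow itself, this partitioning comes from keying each event by user ID and letting the windowing operator keep separate state per key.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List


@dataclass
class Event:
    """Hypothetical minimal event: a user ID, an event kind, and a timestamp."""

    user: int
    kind: str
    time: datetime


def group_by_user(events: List[Event]) -> Dict[int, List[Event]]:
    """Partition an interleaved event list into per-user lists, preserving order."""
    by_user: Dict[int, List[Event]] = defaultdict(list)
    for event in events:
        by_user[event.user].append(event)
    return dict(by_user)


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
events = [
    Event(1, "search", t0 + timedelta(seconds=5)),
    Event(2, "search", t0 + timedelta(seconds=5)),
    Event(1, "click", t0 + timedelta(seconds=7)),
    Event(2, "click", t0 + timedelta(seconds=7)),
    Event(2, "click", t0 + timedelta(seconds=8)),
]
grouped = group_by_user(events)
print({user: [e.kind for e in evs] for user, evs in grouped.items()})
# {1: ['search', 'click'], 2: ['search', 'click', 'click']}
```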

Imports and Setup

Before we begin, let’s set up the environment for building the dataflow.

First, complete the installation. We recommend using a virtual environment to manage your Python dependencies. Create and activate one, then install Bytewax using pip:

$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install bytewax==0.21.0

Now, let’s import the required modules and set up the environment for building the dataflow.

dataflow.py
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List

import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.operators import windowing as win
from bytewax.operators.windowing import EventClock, SessionWindower
from bytewax.testing import TestingSource

Creating our Dataflow

A dataflow is the deployable unit in Bytewax. Dataflows are data-parallel directed acyclic graphs that are made up of processing steps. Each step in the dataflow is an operator that processes data in some way.

We can initialize the dataflow as follows:

# Create and configure the Dataflow
flow = Dataflow("search_ctr")

Data Model

Let’s start by defining a data model/schema for our incoming events. We’ll make model classes for all the relevant events we’d want to monitor.

@dataclass
class AppOpen:
    """Represents an app opening event.

    This class encapsulates the data for an app
    opening event, including the user ID and the
    timestamp when the app was opened.
    """

    user: int
    time: datetime


@dataclass
class Search:
    """Represents a search event.

    This class encapsulates the data for a search
    event, including the user ID, the query text,
    and the timestamp of the search.
    """

    user: int
    query: str
    time: datetime


@dataclass
class Results:
    """Represents a search results event.

    This class encapsulates the data for a search
    results event, including the user ID, the
    returned items, and the timestamp when the
    results were shown.
    """

    user: int
    items: List[str]
    time: datetime


@dataclass
class ClickResult:
    """Represents a click result event.

    This class encapsulates the data for a click
    event, including the user ID, the clicked item,
    and the timestamp of the click.
    """

    user: int
    item: str
    time: datetime

In a production system, these classes might come from an external schema or be auto-generated.
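If events arrive as JSON log lines, one way to map them onto these classes is a small dispatch table keyed on an event-type field. This is a sketch under assumptions: the "type" field, its values, and the ISO 8601 "time" strings describe a hypothetical log format, the parse_event and EVENT_TYPES names are illustrative, and only two of the four classes are repeated to keep it short.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Search:
    user: int
    query: str
    time: datetime


@dataclass
class ClickResult:
    user: int
    item: str
    time: datetime


# Hypothetical mapping from a log record's "type" field to a model class.
EVENT_TYPES = {"search": Search, "click": ClickResult}


def parse_event(line: str):
    """Parse one JSON log line into the matching dataclass (assumed log format)."""
    record = json.loads(line)
    cls = EVENT_TYPES[record.pop("type")]
    # Convert the ISO 8601 timestamp (with UTC offset) into an aware datetime.
    record["time"] = datetime.fromisoformat(record["time"])
    return cls(**record)


event = parse_event(
    '{"type": "search", "user": 1, "query": "dogs", "time": "2024-01-01T00:00:05+00:00"}'
)
print(event)
```

Keeping the type-to-class mapping in one dictionary means a new event kind only needs a new dataclass and one new entry.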

Once the data model is defined, we can generate input data that simulates user interactions. This lets us test our dataflow and logic functions before deploying them in a live environment. Let’s create two users and simulate their search and click activity as follows:

# Simulated events to emit into our Dataflow
align_to = datetime(2024, 1, 1, tzinfo=timezone.utc)
client_events = [
    Search(user=1, query="dogs", time=align_to + timedelta(seconds=5)),
    Results(
        user=1, items=["fido", "rover", "buddy"], time=align_to + timedelta(seconds=6)
    ),
    ClickResult(user=1, item="rover", time=align_to + timedelta(seconds=7)),
    Search(user=2, query="cats", time=align_to + timedelta(seconds=5)),
    Results(
        user=2,
        items=["fluffy", "burrito", "kathy"],
        time=align_to + timedelta(seconds=6),
    ),
    ClickResult(user=2, item="fluffy", time=align_to + timedelta(seconds=7)),
    ClickResult(user=2, item="kathy", time=align_to + timedelta(seconds=8)),
]

These client events are the input to our dataflow. They simulate user interactions with the search engine, including user IDs, search queries, search results, and clicks, and we will use them to calculate the CTR for each search session.

Bytewax’s TestingSource class takes an iterable of items and emits them one at a time into the dataflow. We initialize it with the client_events list created above and use it as the input source for the dataflow.

# Feed input data
inp = op.input("inp", flow, TestingSource(client_events))

Next, we define the logic that processes the incoming events. We need two helper functions, user_event and calc_ctr: the first keys each event by user, and the second calculates the CTR for each search session.

  1. The user_event function will extract the user ID from the incoming event and use it as the key for grouping the events by user.

def user_event(event):
    """Extract user ID as a key and pass the event itself."""
    return str(event.user), event


# Map the user event function to the input
user_event_map = op.map("user_event", inp, user_event)

The core Bytewax operators live in the bytewax.operators module, which we’ve imported under the shorter name op; the windowing operators live in bytewax.operators.windowing, imported as win. Here we use the map operator: it applies user_event to each event from the input and turns it into a (key, value) pair whose key is the user ID, which is the format needed to group events by user.
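To see what the map step produces, here is a plain-Python check of user_event on a single event, without running a dataflow. The user ID is converted to a string because Bytewax’s keyed (stateful) operators, including the windowing operators, expect string keys. The trimmed-down Search class is a stand-in for the one defined earlier.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Search:
    user: int
    query: str
    time: datetime


def user_event(event):
    """Extract user ID as a key and pass the event itself."""
    return str(event.user), event


align_to = datetime(2024, 1, 1, tzinfo=timezone.utc)
key, value = user_event(Search(user=1, query="dogs", time=align_to + timedelta(seconds=5)))
print(repr(key))    # '1'
print(value.query)  # dogs
```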

  2. The calc_ctr function calculates the click-through rate (CTR) for each search session from the searches and clicks in that session.

def calc_ctr(window_out):
    """Calculate the click-through rate (CTR) for one session window."""
    # Each item from `window.down` is (user_key, (window_id, events)).
    user, (_window_id, events) = window_out

    searches = [event for event in events if isinstance(event, Search)]
    clicks = [event for event in events if isinstance(event, ClickResult)]

    print(f"User {user}: {len(searches)} searches, {len(clicks)} clicks")

    return (user, len(clicks) / len(searches) if searches else 0)
139

Next, we window the data. Windowing segments the continuous stream of events into discrete chunks based on time, so we can aggregate and analyze the events in each time frame or session. This is essential for understanding patterns, behaviors, and trends over time.

After the map step, each item is a (user_id, event) tuple. The next step is to group each user’s events into windows. In this example, we use a SessionWindower to group events into user sessions, and an EventClock that reads each event’s own timestamp to decide which session it belongs to and when a session can be closed.

# Configure the event clock and session windower
clock = EventClock(
    ts_getter=lambda e: e.time, wait_for_system_duration=timedelta(seconds=1)
)
windower = SessionWindower(gap=timedelta(seconds=10))

# Collect the windowed data
window = win.collect_window(
    "windowed_data", user_event_map, clock=clock, windower=windower
)
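  • A clock defines the sense of time for the windowing operator. An EventClock uses the timestamps embedded in the events themselves (returned by ts_getter) to determine ordering, so events that arrive out of order can still be assigned to the correct window. The wait_for_system_duration setting controls, roughly, how much system time to allow for out-of-order events before windows are closed.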

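  • The SessionWindower specifies how to group these timestamped events into sessions. A session window collects all events for a key that occur within the specified gap of each other; once no event arrives for longer than the gap (10 seconds here), the session ends and the next event starts a new one. This produces windows of dynamic size that follow the flow of incoming data.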
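The session idea can be illustrated without Bytewax: sort one user’s timestamps and start a new session whenever the time since the previous event exceeds the gap. This is a simplified sketch for intuition only, not how SessionWindower is implemented. The split_sessions name is hypothetical, and treating a gap of exactly `gap` as the same session is an assumption of this sketch.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def split_sessions(times: List[datetime], gap: timedelta) -> List[List[datetime]]:
    """Group timestamps into sessions separated by more than `gap` (hypothetical helper)."""
    sessions: List[List[datetime]] = []
    for ts in sorted(times):
        # Start a new session for the first event or when the gap is exceeded.
        if not sessions or ts - sessions[-1][-1] > gap:
            sessions.append([ts])
        else:
            sessions[-1].append(ts)
    return sessions


align_to = datetime(2024, 1, 1, tzinfo=timezone.utc)
# User 1's events from the tutorial (5s, 6s, 7s), plus a hypothetical event at 30s.
times = [align_to + timedelta(seconds=s) for s in [5, 6, 7, 30]]
sessions = split_sessions(times, gap=timedelta(seconds=10))
print([len(s) for s in sessions])  # [3, 1]
```

The event at 30 seconds arrives 23 seconds after the previous one, which exceeds the 10-second gap, so it opens a second session.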

Together, these settings let the dataflow capture each user’s behavior in sessions based on when interactions actually happened, so metrics like CTR reflect real user activity. This setup suits scenarios where engagement over time is critical, such as digital marketing analysis, website optimization, or interactive application monitoring.

Once events are grouped into session windows, we can compute metrics on each session. We apply the calc_ctr function to every window emitted on window.down, counting the searches and clicks in each session to compute its CTR. This lets analysts, marketers, and developers evaluate search algorithms, content relevance, and user experience.

# Calculate the click-through rate using the
# calc_ctr function and the windowed data
calc = op.map("calc_ctr", window.down, calc_ctr)
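You can exercise the per-session calculation outside a dataflow by hand-building one item in the shape that, as we understand the Bytewax 0.21 API, collect_window emits on window.down: (user_key, (window_id, events)). The window ID 0 is a made-up value, the session_ctr name is hypothetical, and the classes are trimmed-down stand-ins for the ones defined earlier.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass
class Search:
    user: int
    query: str
    time: datetime


@dataclass
class Results:
    user: int
    items: List[str]
    time: datetime


@dataclass
class ClickResult:
    user: int
    item: str
    time: datetime


def session_ctr(window_out):
    """Return (user_key, clicks / searches) for one windowed session."""
    user, (_window_id, events) = window_out
    searches = [e for e in events if isinstance(e, Search)]
    clicks = [e for e in events if isinstance(e, ClickResult)]
    return (user, len(clicks) / len(searches) if searches else 0)


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
# User 2's session from the simulated events: one search, two clicks.
item = (
    "2",
    (
        0,
        [
            Search(user=2, query="cats", time=t0 + timedelta(seconds=5)),
            Results(user=2, items=["fluffy", "burrito", "kathy"], time=t0 + timedelta(seconds=6)),
            ClickResult(user=2, item="fluffy", time=t0 + timedelta(seconds=7)),
            ClickResult(user=2, item="kathy", time=t0 + timedelta(seconds=8)),
        ],
    ),
)
print(session_ctr(item))  # ('2', 2.0)
```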

Returning results

Finally, we add an output step that writes the CTR for each search session to standard output using StdOutSink.

# Output the results to the standard output
op.output("out", calc, StdOutSink())

Execution

With the dataflow defined, let’s run it. The output shows the number of searches and clicks counted in each user’s session, followed by the CTR calculated from the simulated interactions.

$ python -m bytewax.run dataflow:flow
User 1: 1 searches, 1 clicks
User 2: 1 searches, 2 clicks
('1', 1.0)
('2', 2.0)
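User 2’s CTR is 2.0 because calc_ctr divides the total number of clicks by the number of searches, so several clicks on one search push the value above 1. If you want the metric described in the assumptions instead (a search counts as clicked if the user clicked any of its results, and sessions without a search are dropped), one possible sketch follows. It assumes a click belongs to the most recent preceding search in timestamp order; the session_search_ctr name is hypothetical. Because it returns None for sessions without a search, it could be used with op.filter_map, which drops None values, in place of the op.map step.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


@dataclass
class Search:
    user: int
    query: str
    time: datetime


@dataclass
class ClickResult:
    user: int
    item: str
    time: datetime


def session_search_ctr(window_out) -> Optional[Tuple[str, float]]:
    """Fraction of searches with at least one click; None if the session has no search."""
    user, (_window_id, events) = window_out
    clicked = []  # one flag per search, in timestamp order
    for event in sorted(events, key=lambda e: e.time):
        if isinstance(event, Search):
            clicked.append(False)
        elif isinstance(event, ClickResult) and clicked:
            # Attribute the click to the most recent search (an assumption).
            clicked[-1] = True
    if not clicked:
        return None  # sessions without a search don't contribute
    return (user, sum(clicked) / len(clicked))


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
session = [
    Search(user=2, query="cats", time=t0 + timedelta(seconds=5)),
    ClickResult(user=2, item="fluffy", time=t0 + timedelta(seconds=7)),
    ClickResult(user=2, item="kathy", time=t0 + timedelta(seconds=8)),
]
print(session_search_ctr(("2", (0, session))))  # ('2', 1.0)
print(session_search_ctr(("3", (0, []))))  # None
```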

Summary

That’s it! You now know how to build custom session windows, define data classes for use in Bytewax, and calculate the click-through rate on a stream of logs.


Need some help? Join our community!

If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #questions-answered Slack channel!
