Building Sessions from Search Logs
Here is a basic example of using Bytewax to turn an incoming stream of event logs from a hypothetical search engine into metrics over search sessions. In this example, we're going to focus on the dataflow itself and aggregating state, and gloss over details of building this processing into a larger system.
Let's start by defining a data model / schema for our incoming events. We'll make a little model class for all the relevant events we'd want to monitor.
from dataclasses import dataclass from typing import List @dataclass class AppOpen: user: int @dataclass class Search: user: int query: str @dataclass class Results: user: int items: List[str] @dataclass class ClickResult: user: int item: str @dataclass class AppClose: user: int @dataclass class Timeout: user: int
In a more mature system, these might come from external schema or be auto generated.
Now that we've got those, here's a small dump of some example data you could imagine coming from your app's events infrastructure.
Let's write a function that will yield one of these events at a time into our dataflow.
IMAGINE_THESE_EVENTS_STREAM_FROM_CLIENTS = [ AppOpen(user=1), Search(user=1, query="dogs"), # Eliding named args... Results(1, ["fido", "rover", "buddy"]), ClickResult(1, "rover"), Search(1, "cats"), Results(1, ["fluffy", "burrito", "kathy"]), ClickResult(1, "fluffy"), AppOpen(2), ClickResult(1, "kathy"), Search(2, "fruit"), AppClose(1), AppClose(2), ] def input_builder(worker_index, worker_count, resume_state): state = resume_state or None for line in IMAGINE_THESE_EVENTS_STREAM_FROM_CLIENTS: yield(state, line)
For the moment, we aren't going to be using our resume state to manage failures, but returning the empty state is a requirement for our input builder.
Let's talk about the tasks high-level plan for how to sessionize:
Searches are per-user, so we need to divvy up events by user.
Searches don't span user sessions, so we should calculate user sessions first.
Sessions without a search shouldn't contribute.
Calculate one metric: click through rate (or CTR), if a user clicked on any result in a search.
Now that we have some input data, let's start defining the computational steps of our dataflow based on our plan.
To start, create an empty
In this case, we'll use a
ManualInputConfig, which takes the
input_builder function that we defined above.
from bytewax.inputs import ManualInputConfig from bytewax.dataflow import Dataflow flow = Dataflow() flow.input("input", ManualInputConfig(input_builder))
Now that we have a Dataflow, and some input, we can add a series of steps to the dataflow. Steps are made up of operators, that provide a "shape" of transformation, and logic functions, that you supply to do your specific transformation. You can read more about all the operators in our documentation.
Our first task is to make sure to group incoming events by user since no session deals with multiple users.
All Bytewax operators that perform grouping require that their input
be in the form of a
(key, value) tuple, where
key is the string
the dataflow will group by before passing to the operator logic.
The operator which modifies all data flowing through it is map. Let's use that and pull each event's user ID as a string into that key position.
def initial_session(event): return str(event.user), [event] flow.map(initial_session)
For the value, we're planning ahead a little bit to our next task:
sessionization. The operator best shaped for this is the reduce
operator which groups items by key, then combines
them together into an aggregator in order. We can think about our
reduce step as "combine together sessions if they should be
joined". We'll be modeling a session as a list of events, so have the
values be a list of a single event
[event] that we will combine with
our reducer function.
Reduce requires two bits of logic:
How do I combine sessions? Since session are just Python lists, we can use the
+operator to add them (via the built-in
When is a session complete? In this case, a session is complete when the last item in the session is the app closing. We'll write a
session_has_closedfunction to answer that.
Reduce also takes a unique step ID to help organize the state saved internally.
import operator def session_has_closed(session): # isinstance does not work on objects sent through pickling, which # Bytewax does when there are multiple workers. return type(session[-1]).__name__ == "AppClose" flow.reduce("sessionizer", operator.add, session_has_closed)
We had to group by user because sessions were per-user, but now that we have sessions, the grouping key is no longer necessary for metrics. Let's remove it with another map.
def remove_key(user_event): user, event = user_event return event flow.map(remove_key)
Our next task is to split user sessions into search sessions. To do that, we'll use the flat map operator, that allows you to emit multiple items downstream (search sessions) for each input item (user session).
We walk through each user session's events, then whenever we encounter
a search, emit downstream the previous events. This works just like
str.split but with objects.
def is_search(event): return type(event).__name__ == "Search" def split_into_searches(user_session): search_session =  for event in user_session: if is_search(event): yield search_session search_session =  search_session.append(event) yield search_session flow.flat_map(split_into_searches)
We can use the filter operator to get rid of all search sessions that don't contain searches and shouldn't contribute to metrics.
def has_search(search_session): return any(is_search(event) for event in search_session) flow.filter(has_search)
We can now move on to our final task: generating metric observations per search session. If there's a click during a search, the CTR is 1.0 for that search, 0.0 otherwise. Given those two extreme values, we can do further statistics to get things like CTR per day, etc
def has_click(search_session): return any(type(event).__name__ == "ClickResult" for event in search_session) def calc_ctr(search_session): if has_click(search_session): return 1.0 else: return 0.0 flow.map(calc_ctr)
Now that our dataflow is done, we can define a function to be called for each output item. In this example, we're just printing out what we've received.
from bytewax.outputs import ManualOutputConfig def output_builder(worker_index, worker_count): return print flow.capture(ManualOutputConfig(output_builder))
Now we're done with defining the dataflow. Let's run it!
Bytewax provides a few different entry points for executing your
dataflow, but because we're focusing on the dataflow in
this example, we're going to use
bytewax.execution.run_main which is the most basic
Let's take our example list of events and pipe it in:
from bytewax.execution import run_main run_main(flow)
Let's inspect the output and see if it makes sense.
1.0 1.0 0.0
Since the capture step is immediately after
calculating CTR, we should see one output item for each search
session. That checks out! There were three searches in the input:
"dogs", "cats", and "fruit". Only the first two resulted in a click,
so they contributed
1.0 to the CTR, while the no-click search