Join Example
This section will introduce some of the basic concepts of merging and joining streams of data.
Multiple input sources
Bytewax dataflows can receive input from multiple sources. In the
following example, we create two TestingSource sources and
add them to our Dataflow as input.
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("join")

src_1 = [
    {"user_id": "123", "name": "Bumble"},
]
inp1 = op.input("inp1", flow, TestingSource(src_1))

src_2 = [
    {"user_id": "123", "email": "bee@bytewax.com"},
    {"user_id": "456", "email": "hive@bytewax.com"},
]
inp2 = op.input("inp2", flow, TestingSource(src_2))
For our dataflow to process input from either of these sources,
we’ll need to create a Stream that combines input from both of them.
We can use the merge operator to do so:
merged_stream = op.merge("merge", inp1, inp2)
Now that we have our merged stream, we can use the inspect operator to print each of its items to standard out:
op.inspect("debug", merged_stream)
> python -m bytewax.run merge_example
join.debug: {'user_id': '123', 'name': 'Bumble'}
join.debug: {'user_id': '123', 'email': 'bee@bytewax.com'}
join.debug: {'user_id': '456', 'email': 'hive@bytewax.com'}
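Each line that inspect prints starts with the step’s fully qualified ID (the dataflow name join, a dot, then the step name debug), followed by the item’s repr. The function below is a hypothetical pure-Python stand-in that only reproduces that line shape for illustration; it is not Bytewax’s own implementation.

```python
def format_inspect_line(flow_id: str, step_name: str, item: object) -> str:
    """Hypothetical helper that mimics the shape of inspect's printed lines."""
    # Step IDs are scoped by the dataflow name: "join" + "debug" -> "join.debug".
    step_id = f"{flow_id}.{step_name}"
    # repr() renders the dict with single quotes, as in the printed output.
    return f"{step_id}: {item!r}"


line = format_inspect_line("join", "debug", {"user_id": "123", "name": "Bumble"})
print(line)
```

This prints join.debug: {'user_id': '123', 'name': 'Bumble'}.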
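The dataflow stops once all of its input sources are exhausted. Even though the two sources contain different numbers of items, every item from both of them appears in the merged stream. The order in which items from different sources are interleaved is not guaranteed, so your output may be ordered differently.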
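To make the stopping behavior concrete, here is a plain-Python model of merging two finite sources. It is an illustration only: merge_model is a hypothetical name, and its round-robin interleaving is an assumption of the model, not how Bytewax schedules items. The point it shows is that the merged stream keeps going after the shorter source runs out and ends only when every source is exhausted.

```python
from itertools import zip_longest

# Same sample data as the dataflow above.
src_1 = [{"user_id": "123", "name": "Bumble"}]
src_2 = [
    {"user_id": "123", "email": "bee@bytewax.com"},
    {"user_id": "456", "email": "hive@bytewax.com"},
]

_DONE = object()  # sentinel marking a slot where a source has run out


def merge_model(*sources):
    """Hypothetical model: round-robin over sources until all are exhausted."""
    # zip_longest keeps producing rows until the *longest* source is done.
    for row in zip_longest(*sources, fillvalue=_DONE):
        for item in row:
            if item is not _DONE:
                yield item


merged = list(merge_model(src_1, src_2))
for item in merged:
    print(item)
```

All three items are printed, including the second item of src_2, which arrives after src_1 is already exhausted.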
Joining streams
To create a streaming join of data from both of our input sources,
we first need to choose a key to join our streams on, then use the
key_on operator to turn each input into a keyed stream. In our example
data, we’ll use the user_id field as the key.
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("join")

src_1 = [
    {"user_id": "123", "name": "Bumble"},
]
inp1 = op.input("inp1", flow, TestingSource(src_1))
# Key each item by its user_id so the join can match items across streams.
keyed_inp_1 = op.key_on("key_stream_1", inp1, lambda x: x["user_id"])

src_2 = [
    {"user_id": "123", "email": "bee@bytewax.com"},
    {"user_id": "456", "email": "hive@bytewax.com"},
]
inp2 = op.input("inp2", flow, TestingSource(src_2))
keyed_inp_2 = op.key_on("key_stream_2", inp2, lambda x: x["user_id"])
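Conceptually, key_on turns each item into a (key, item) 2-tuple using the function you pass it, and Bytewax expects those keys to be strings, which is one reason the string-valued user_id is a convenient choice. The sketch below models that transformation in plain Python; key_on_model is a hypothetical name and not part of Bytewax.

```python
from typing import Any, Callable, Iterable, List, Tuple


def key_on_model(
    items: Iterable[Any], key_fn: Callable[[Any], str]
) -> List[Tuple[str, Any]]:
    """Hypothetical model of key_on: pair each item with its string key."""
    keyed = []
    for item in items:
        key = key_fn(item)
        # Mirror the requirement that keys be strings.
        if not isinstance(key, str):
            raise TypeError(f"key must be a str, got {type(key).__name__}")
        keyed.append((key, item))
    return keyed


src_2 = [
    {"user_id": "123", "email": "bee@bytewax.com"},
    {"user_id": "456", "email": "hive@bytewax.com"},
]
keyed_2 = key_on_model(src_2, lambda x: x["user_id"])
print(keyed_2)
```

Each element of keyed_2 is a pair such as ('123', {'user_id': '123', 'email': 'bee@bytewax.com'}).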
Now that we have our two keyed streams of data, we can join them
together with the join operator.
When creating a dataflow, you can use the inspect operator
to view the data in a stream. The inspect operator can be used
multiple times and counts as an output (recall that every dataflow
requires an output).
joined_stream = op.join("join", keyed_inp_1, keyed_inp_2)
op.inspect("debug", joined_stream)
Running this example, we should see the following output for our
stream, which includes the step_id for our inspect operator.
> python -m bytewax.run join_example
join.debug: ('123', ({'user_id': '123', 'name': 'Bumble'}, {'user_id': '123', 'email': 'bee@bytewax.com'}))
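The shape of that output, (key, (value_from_inp1, value_from_inp2)), can be reproduced with a small batch model of an inner join. join_model is hypothetical, and it assumes that only the latest value per key on each side matters; that is a simplification for illustration, not a description of Bytewax’s internals. A key is emitted only when every side has a value for it, which is why only '123' appears.

```python
from typing import Any, Dict, List, Tuple


def join_model(*sides: List[Tuple[str, Any]]) -> List[Tuple[str, Tuple[Any, ...]]]:
    """Hypothetical batch model of an inner join across keyed sides."""
    # dict() keeps the latest value seen for each key (a modeling assumption).
    latest: List[Dict[str, Any]] = [dict(side) for side in sides]
    # Only keys present on every side can produce a joined row.
    common = set(latest[0]).intersection(*latest[1:])
    # Preserve the first side's key order so the output is deterministic.
    ordered = [k for k in latest[0] if k in common]
    return [(k, tuple(side[k] for side in latest)) for k in ordered]


keyed_1 = [("123", {"user_id": "123", "name": "Bumble"})]
keyed_2 = [
    ("123", {"user_id": "123", "email": "bee@bytewax.com"}),
    ("456", {"user_id": "456", "email": "hive@bytewax.com"}),
]
joined = join_model(keyed_1, keyed_2)
print(joined)
```

The result holds a single row for '123'; the '456' item from the second side has no partner and is left out.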
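Notice that we don’t see any output for user_id 456. Since inp1 never
produced an item with that key, the join has nothing to pair with the
456 item from inp2, and we won’t see any output for that user until
inp1 does. Because inp1 here is a finite TestingSource, that never
happens in this run.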
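The waiting behavior described above is easiest to see with an incremental model that handles items one at a time as they arrive. StreamingJoinModel is a hypothetical class for illustration only; it assumes one stored value per side per key and, as a choice of this model, forgets a key after emitting it. The late {"user_id": "456", "name": "Hive"} arrival is invented for the illustration and is not part of the example’s data.

```python
from typing import Any, Dict, List, Optional, Tuple


class StreamingJoinModel:
    """Hypothetical incremental inner join over a fixed number of sides."""

    def __init__(self, n_sides: int) -> None:
        self.n_sides = n_sides
        # key -> one slot per side; None means nothing seen yet on that side,
        # so this model assumes values are never None.
        self.state: Dict[str, List[Optional[Any]]] = {}

    def push(
        self, side: int, key: str, value: Any
    ) -> Optional[Tuple[str, Tuple[Any, ...]]]:
        slots = self.state.setdefault(key, [None] * self.n_sides)
        slots[side] = value
        if all(slot is not None for slot in slots):
            # Every side has a value: emit the joined row and forget the key.
            del self.state[key]
            return key, tuple(slots)
        return None  # still waiting on at least one side


joiner = StreamingJoinModel(n_sides=2)
arrivals = [
    (0, "123", {"user_id": "123", "name": "Bumble"}),
    (1, "123", {"user_id": "123", "email": "bee@bytewax.com"}),
    (1, "456", {"user_id": "456", "email": "hive@bytewax.com"}),
    # Hypothetical late arrival on the first side, not in the example data.
    (0, "456", {"user_id": "456", "name": "Hive"}),
]
emitted = []
for side, key, value in arrivals:
    row = joiner.push(side, key, value)
    if row is not None:
        emitted.append(row)
        print(row)
```

The row for '456' is emitted only when its first-side value finally arrives; without that last arrival it would stay pending in joiner.state.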
For more details about the behavior of the join operator, see the Joins section of the documentation.