Joins

Here we’re going to give a deep dive on how joins work in Bytewax. If you’d like a quick demo of “show me how to do a simple join”, see Join Example.

A join is a way to combine items in two or more streams into a single stream, matching them by a join key.

This is a very similar concept to the joins available in SQL, but there are some new edge cases and behaviors you have to be aware of when handling unbounded streams of data.

For example, you might have a stream of user names:

{"user_id": 123, "name": "Bee"}
{"user_id": 456, "name": "Hive"}

And another stream of email addresses:

{"user_id": 123, "email": "bee@bytewax.io"}
{"user_id": 456, "email": "hive@bytewax.io"}

And you’d like to access both at once, joined into a single object:

{"user_id": 123, "name": "Bee", "email": "bee@bytewax.io"}
{"user_id": 456, "name": "Hive", "email": "hive@bytewax.io"}

Let’s set up a sample dataflow that gets that data into some streams:

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("join_eg")

# One stream of user names.
names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]
names = op.input("names", flow, TestingSource(names_l))

# Another stream of email addresses.
emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
]
emails = op.input("emails", flow, TestingSource(emails_l))

Bytewax provides the join, join_named, join_window, and join_window_named operators for this.

Join Keys

All the join operators above are stateful operators, so they require every upstream to contain 2-tuples whose first element is a string called a key. A string is required so that the Bytewax runtime has a standard type it can use to route data correctly. For more information about state keys, see State Keys.

To join the data in the streams above, let’s key it by user_id, since that is the field we want to bring the data together on.

Since user_id is an int in our input data, we need to pass it through str. And since it’d be redundant to keep the "user_id" key in the dict, we’ll map the value to just include the relevant field.

keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))

Let’s inspect our streams to double check we know what they look like:

import bytewax.testing

op.inspect("check_names", keyed_names)
op.inspect("check_emails", keyed_emails)

bytewax.testing.run_main(flow)

Looks like we see our 2-tuples!

join_eg.check_names: ('123', 'Bee')
join_eg.check_emails: ('123', 'bee@bytewax.io')
join_eg.check_names: ('456', 'Hive')
join_eg.check_emails: ('456', 'hive@bytewax.io')

We took the input items, removed the field names, and converted them to (key, value) 2-tuples with a distinct string key, which puts them in the format the join needs.
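
If the keying logic grows beyond a one-line lambda, it can be easier to read as a named function. Here is a minimal sketch in plain Python; the helper name `key_on_user_id` is made up for this illustration and is not part of Bytewax.

```python
from typing import Any, Dict, Tuple


def key_on_user_id(item: Dict[str, Any], field: str) -> Tuple[str, Any]:
    """Turn a raw dict into a (key, value) 2-tuple keyed by user ID.

    Hypothetical helper for illustration only.
    """
    # The key must be a string, so convert the integer ID explicitly.
    return (str(item["user_id"]), item[field])


keyed = key_on_user_id({"user_id": 123, "name": "Bee"}, "name")
print(keyed)
```

A function like this could be passed to the same map step shown above, for example wrapped in a lambda that fixes the field name.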

Complete Joins

When doing a join in a SQL database, you look at the keys contained in two tables and bring the other columns together. In a streaming context, we have the concept of “key”, we have the concept of “column” (in the values for each key), but we don’t have the exact concept of “table”. We call each stream a side, and it is somewhat like a table. A SQL table has an obvious end, though: when you run the join, it uses the state of the table at that moment.

In a streaming system, there is no guaranteed end to the stream, so our joins have to have slightly different semantics. By default, the join operator takes any number of upstream sides and waits until it has seen a value for a key on every side. Only then does it emit the gathered values downstream, as a tuple in the same order as the side arguments. When we wait for a value from all sides, Bytewax calls this a complete join. This is similar to an inner join in SQL.

Let’s see that in action. To recap our example:

flow = Dataflow("join_eg")

names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]
names = op.input("names", flow, TestingSource(names_l))

emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
]
emails = op.input("emails", flow, TestingSource(emails_l))

# Key both streams by the stringified user ID.
keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))

Then let’s add a join operator taking the keyed_names stream as one side and keyed_emails stream as the other and view the output:

joined = op.join("join", keyed_names, keyed_emails)

op.inspect("check_join", joined)

bytewax.testing.run_main(flow)

Alright! It looks like we gathered the names and emails for each key.

join_eg.check_join: ('123', ('Bee', 'bee@bytewax.io'))
join_eg.check_join: ('456', ('Hive', 'hive@bytewax.io'))

Missing Values

What happens if we don’t have a value for a key? Let’s update our names input to add a name that won’t have an email. Then run the dataflow again.

# Mutate the list in place so the existing TestingSource sees the new items.
names_l.clear()
names_l.extend(
    [
        {"user_id": 123, "name": "Bee"},
        {"user_id": 456, "name": "Hive"},
        {"user_id": 789, "name": "Pooh Bear"},
    ]
)

bytewax.testing.run_main(flow)

Hmm. It seems we didn’t get any output for Pooh.

join_eg.check_join: ('123', ('Bee', 'bee@bytewax.io'))
join_eg.check_join: ('456', ('Hive', 'hive@bytewax.io'))

This is because the inner join by default will wait forever until it gets a value for all sides for a key. It has no idea how long it should wait for Pooh’s email to arrive, so the name just stays in the join state.

Another ramification of this is that if you see a second value on one side for a key, you will also not see any output. For example, let’s update our input to have an email update for the Bee.

names_l.clear()
names_l.extend(
    [
        {"user_id": 123, "name": "Bee"},
        {"user_id": 456, "name": "Hive"},
    ]
)

emails_l.clear()
emails_l.extend(
    [
        {"user_id": 123, "email": "bee@bytewax.io"},
        {"user_id": 456, "email": "hive@bytewax.io"},
        # A second email for a key that has already been joined.
        {"user_id": 123, "email": "queen@bytewax.io"},
    ]
)

bytewax.testing.run_main(flow)

Notice we still don’t see any new output for that user.

join_eg.check_join: ('123', ('Bee', 'bee@bytewax.io'))
join_eg.check_join: ('456', ('Hive', 'hive@bytewax.io'))

For our streaming inner join, by default, as soon as we see all the values for a key, we send them downstream and discard the join state. Thus when the second “email” value comes in, there’s no “name” value for the key "123", and the join operator waits until another value comes in.

Let’s visualize, as a table, how the state of the join operator evolves as it sees new items. The table starts empty.

| Key | Name Value | Email Value |
|-----|------------|-------------|

Then it sees the first name value and emits nothing downstream.

| Key | Name Value | Email Value |
|-----|------------|-------------|
| 123 | "Bee"      |             |

Then it sees the second name value and emits nothing downstream.

| Key | Name Value | Email Value |
|-----|------------|-------------|
| 123 | "Bee"      |             |
| 456 | "Hive"     |             |

Then finally it sees the first email value.

| Key | Name Value | Email Value      |
|-----|------------|------------------|
| 123 | "Bee"      | "bee@bytewax.io" |
| 456 | "Hive"     |                  |

Right after this point, it gathers together each of the values, and then clears that “row” in the “table” and the tuple is emitted downstream.

| Key | Name Value | Email Value |
|-----|------------|-------------|
| 456 | "Hive"     |             |

('123', ('Bee', 'bee@bytewax.io'))

Let’s continue. We then see the second email come through and clear that row.

| Key | Name Value | Email Value |
|-----|------------|-------------|

('456', ('Hive', 'hive@bytewax.io'))

So note, now that the table is empty, when we see the email update for the Bee, there’s no name state. So nothing is emitted!

| Key | Name Value | Email Value        |
|-----|------------|--------------------|
| 123 |            | "queen@bytewax.io" |

Hopefully this helps clarify how basic streaming joins work. Realizing that the join operator only keeps the state around while it is waiting for all sides to complete is the trick to remember.
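
The table walk-through above can be sketched as a tiny state machine in plain Python. This is a toy model of the semantics described here, not Bytewax's actual implementation; the class name `CompleteJoinModel` and the use of `None` as the "not seen yet" marker are assumptions made for the illustration.

```python
from typing import Any, Dict, List, Optional, Tuple


class CompleteJoinModel:
    """Toy model of complete-join state (illustrative, not Bytewax code)."""

    def __init__(self, side_count: int) -> None:
        self.side_count = side_count
        # key -> one slot per side; None means "not seen yet".
        self.table: Dict[str, List[Optional[Any]]] = {}

    def on_item(
        self, side: int, key: str, value: Any
    ) -> Optional[Tuple[str, Tuple[Any, ...]]]:
        row = self.table.setdefault(key, [None] * self.side_count)
        row[side] = value
        if any(v is None for v in row):
            # Still waiting on at least one side: emit nothing.
            return None
        # Every side has a value: emit the tuple and discard the row.
        del self.table[key]
        return (key, tuple(row))


NAMES, EMAILS = 0, 1
join = CompleteJoinModel(side_count=2)
events = [
    (NAMES, "123", "Bee"),
    (NAMES, "456", "Hive"),
    (EMAILS, "123", "bee@bytewax.io"),
    (EMAILS, "456", "hive@bytewax.io"),
    (EMAILS, "123", "queen@bytewax.io"),
]

emitted = []
for side, key, value in events:
    out = join.on_item(side, key, value)
    if out is not None:
        emitted.append(out)

print(emitted)
print(join.table)
```

After the last event, the model has emitted only the two complete tuples, and the leftover row for "123" holds just the new email with no name, which matches the final table above.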

Running Joins

Complete join semantics are useful in some cases, but on infinite data you could imagine other join semantics. What Bytewax calls a running join emits downstream the values for all sides of the join whenever any value comes in, and keeps the state around. This is similar to a full outer join in SQL. Pass running=True to join to enable a running join. By default joins are complete, as described in the previous section, and not running.

Let’s review what the dataflow would look like then:

flow = Dataflow("join_eg")

names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]
names = op.input("names", flow, TestingSource(names_l))

emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
    {"user_id": 123, "email": "queen@bytewax.io"},
]
emails = op.input("emails", flow, TestingSource(emails_l))

keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))

# running=True keeps the state and emits on every update.
joined = op.join("join", keyed_names, keyed_emails, running=True)

Now let’s run the dataflow again and inspect the output.

op.inspect("check_join", joined)

bytewax.testing.run_main(flow)

Here’s what we get. Let’s visualize the progress and outputs of the join state table again but with running=True.

join_eg.check_join: ('123', ('Bee', None))
join_eg.check_join: ('123', ('Bee', 'bee@bytewax.io'))
join_eg.check_join: ('456', ('Hive', None))
join_eg.check_join: ('456', ('Hive', 'hive@bytewax.io'))
join_eg.check_join: ('123', ('Bee', 'queen@bytewax.io'))

First the table starts empty.

| Key | Name Value | Email Value |
|-----|------------|-------------|

Then we see the first name value. Since the table was updated, it emits the current values for that key and keeps them in the table! If it doesn’t know a value yet, it fills it in as None.

| Key | Name Value | Email Value |
|-----|------------|-------------|
| 123 | "Bee"      |             |

('123', ('Bee', None))

Then we see the second name value. The same thing happens.

| Key | Name Value | Email Value |
|-----|------------|-------------|
| 123 | "Bee"      |             |
| 456 | "Hive"     |             |

('456', ('Hive', None))

Now we see the first email value. The same rules apply, but now since there are values for all the sides, we see them all in the output. The state for that key is not cleared!

| Key | Name Value | Email Value      |
|-----|------------|------------------|
| 123 | "Bee"      | "bee@bytewax.io" |
| 456 | "Hive"     |                  |

('123', ('Bee', 'bee@bytewax.io'))

Then we see the second email value.

| Key | Name Value | Email Value       |
|-----|------------|-------------------|
| 123 | "Bee"      | "bee@bytewax.io"  |
| 456 | "Hive"     | "hive@bytewax.io" |

('456', ('Hive', 'hive@bytewax.io'))

Note that none of the state we have seen has been cleared. This now means when we see the updated Bee email, we’ll see some output!

| Key | Name Value | Email Value        |
|-----|------------|--------------------|
| 123 | "Bee"      | "queen@bytewax.io" |
| 456 | "Hive"     | "hive@bytewax.io"  |

('123', ('Bee', 'queen@bytewax.io'))

So the running join is useful because it lets you track changes in values over time. There is also no concept of waiting for missing values: all missing values are replaced with None. But this comes with a downside! Because we never throw away the state for a key, this state keeps growing in memory forever if you keep adding keys. This might be the behavior you want, but realize that it does not come for free. A complete join is better if you know you’ll only get one value for each side for each key, since it discards the state after sending the values downstream.
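
To make the contrast with the complete join concrete, here is the same kind of toy model for running semantics, again in plain Python. It is a sketch of the behavior described in this section, not Bytewax's implementation, and `RunningJoinModel` is a made-up name.

```python
from typing import Any, Dict, List, Optional, Tuple


class RunningJoinModel:
    """Toy model of running-join state (illustrative, not Bytewax code)."""

    def __init__(self, side_count: int) -> None:
        self.side_count = side_count
        # key -> latest value per side; None means "not seen yet".
        self.table: Dict[str, List[Optional[Any]]] = {}

    def on_item(self, side: int, key: str, value: Any) -> Tuple[str, Tuple[Any, ...]]:
        row = self.table.setdefault(key, [None] * self.side_count)
        row[side] = value
        # Always emit the current row, and never delete it.
        return (key, tuple(row))


NAMES, EMAILS = 0, 1
join = RunningJoinModel(side_count=2)
events = [
    (NAMES, "123", "Bee"),
    (NAMES, "456", "Hive"),
    (EMAILS, "123", "bee@bytewax.io"),
    (EMAILS, "456", "hive@bytewax.io"),
    (EMAILS, "123", "queen@bytewax.io"),
]
emitted = [join.on_item(side, key, value) for side, key, value in events]

for item in emitted:
    print(item)
print(len(join.table), "keys still held in state")
```

Every input produces an output here, and both keys are still in the table at the end. That retained state is exactly the memory cost discussed above.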

Windowed Joins

The streaming join assumes that a value for a key could arrive at any point in the entire lifetime of a stream. This means it could possibly wait forever for a value that will never arrive. Another option is to use a windowed join that always flushes out the values for a key whenever the time-based window closes. You can use this if you need to know the join values over an infinite stream when you aren’t sure that you’ll see values on all sides of the join.

Bytewax provides the operators join_window and join_window_named to implement this.

For the details of all the types of windows you can define and explanation of the parameters, see Windowing. We’re going to use a simple 1 hour tumbling window; the previous window closes and the next window starts at the top of each hour. We’ll be using event time.

from datetime import timedelta, datetime, timezone
from bytewax.operators.window import EventClockConfig, TumblingWindow

# Use the "at" field of each value as its event time.
clock = EventClockConfig(
    dt_getter=lambda x: x["at"], wait_for_system_duration=timedelta(0)
)
# One-hour windows aligned to the top of the hour.
windower = TumblingWindow(
    length=timedelta(hours=1),
    align_to=datetime(2023, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
)

Let’s assume we have input sources that are similar in shape to before, but now have timestamps.

flow = Dataflow("join_eg")

names_l = [
    {
        "user_id": 123,
        "at": datetime(2023, 12, 14, 0, 0, tzinfo=timezone.utc),
        "name": "Bee",
    },
    {
        "user_id": 456,
        "at": datetime(2023, 12, 14, 0, 0, tzinfo=timezone.utc),
        "name": "Hive",
    },
]
names = op.input("names", flow, TestingSource(names_l))

emails_l = [
    {
        "user_id": 123,
        "at": datetime(2023, 12, 14, 0, 15, tzinfo=timezone.utc),
        "email": "bee@bytewax.io",
    },
    {
        "user_id": 456,
        "at": datetime(2023, 12, 14, 1, 15, tzinfo=timezone.utc),
        "email": "hive@bytewax.io",
    },
]
emails = op.input("emails", flow, TestingSource(emails_l))

Since our objects are no longer just strings, let’s keep the values untouched so we have access to the timestamps.

keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x))

Let’s inspect this just to double check we understand the shape.

op.inspect("check_names", keyed_names)
op.inspect("check_emails", keyed_emails)

bytewax.testing.run_main(flow)

The values are entire dicts and we’ll still access the "at" key to use the event timestamp.

join_eg.check_names: ('123', {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Bee'})
join_eg.check_emails: ('123', {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 15, tzinfo=datetime.timezone.utc), 'email': 'bee@bytewax.io'})
join_eg.check_names: ('456', {'user_id': 456, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Hive'})
join_eg.check_emails: ('456', {'user_id': 456, 'at': datetime.datetime(2023, 12, 14, 1, 15, tzinfo=datetime.timezone.utc), 'email': 'hive@bytewax.io'})

Let’s visualize what these windows and events look like on a timeline. Window start times are inclusive, so something right on a window border is in the next window.

gantt
    dateFormat YYYY-MM-DDTHH:mm
    axisFormat %Y-%m-%d %H:%M

    section Windows
    Zero: w0, 2023-12-13T23:00, 1h
    One: w1, after w0, 1h
    Two: w2, after w1, 1h

    section Names
    ('123', {'name'= 'Bee'}): milestone, 2023-12-14T00:00, 0d
    ('456', {'name'= 'Hive'}): milestone, 2023-12-14T00:00, 0d

    section Emails
    ('123', {'email'= 'bee@bytewax.io'}): milestone, 2023-12-14T00:15, 0d
    ('456', {'email'= 'hive@bytewax.io'}): milestone, 2023-12-14T01:15, 0d

So it looks like for key "123", we should see the name and email be joined because they occur in the same window. But for key "456" we should see one downstream item of just the name (because no email came for "456" in that window), and another downstream item of just the email (because no name came in that window).
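
If you want to check by hand which window an event falls into, the arithmetic for a tumbling window can be sketched with the standard library alone. The function name `tumbling_window_bounds` is made up for this illustration, and the calculation only mirrors the description above (fixed-length windows counted from an alignment point, with inclusive start times). It is not taken from Bytewax's source.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def tumbling_window_bounds(
    at: datetime, align_to: datetime, length: timedelta
) -> Tuple[datetime, datetime]:
    """Return (open_time, close_time) of the window containing `at`.

    Hypothetical helper for illustration only.
    """
    # Floor division of timedeltas counts whole windows since align_to.
    index = (at - align_to) // length
    open_time = align_to + index * length
    return (open_time, open_time + length)


align_to = datetime(2023, 1, 1, tzinfo=timezone.utc)
hour = timedelta(hours=1)

bee_email = tumbling_window_bounds(
    datetime(2023, 12, 14, 0, 15, tzinfo=timezone.utc), align_to, hour
)
hive_email = tumbling_window_bounds(
    datetime(2023, 12, 14, 1, 15, tzinfo=timezone.utc), align_to, hour
)
on_border = tumbling_window_bounds(
    datetime(2023, 12, 14, 0, 0, tzinfo=timezone.utc), align_to, hour
)
print(bee_email)
print(hive_email)
print(on_border)
```

The two names at 00:00 sit exactly on a border and land in the window that opens at 00:00, the same window as the Bee's email at 00:15. The Hive's email at 01:15 lands in the following window.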

Now let’s set up the windowed join and inspect the results to see if it matches that. To review, the entire dataflow is as follows.

import bytewax.operators.window as op_w

flow = Dataflow("join_eg")

names_l = [
    {
        "user_id": 123,
        "at": datetime(2023, 12, 14, 0, 0, tzinfo=timezone.utc),
        "name": "Bee",
    },
    {
        "user_id": 456,
        "at": datetime(2023, 12, 14, 0, 0, tzinfo=timezone.utc),
        "name": "Hive",
    },
]
names = op.input("names", flow, TestingSource(names_l))

emails_l = [
    {
        "user_id": 123,
        "at": datetime(2023, 12, 14, 0, 15, tzinfo=timezone.utc),
        "email": "bee@bytewax.io",
    },
    {
        "user_id": 456,
        "at": datetime(2023, 12, 14, 1, 15, tzinfo=timezone.utc),
        "email": "hive@bytewax.io",
    },
]
emails = op.input("emails", flow, TestingSource(emails_l))

keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x))

# Join the two sides within each one-hour window.
joined = op_w.join_window("join", clock, windower, keyed_names, keyed_emails)

op.inspect("check_join", joined)

bytewax.testing.run_main(flow)

Looks like that’s what we see! Notice the None in the output for key "456". All window operators also emit the metadata about the window for analysis, but you can ignore that.

join_eg.check_join: ('456', (WindowMetadata(open_time: 2023-12-14 00:00:00 UTC, close_time: 2023-12-14 01:00:00 UTC), ({'user_id': 456, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Hive'}, None)))
join_eg.check_join: ('123', (WindowMetadata(open_time: 2023-12-14 00:00:00 UTC, close_time: 2023-12-14 01:00:00 UTC), ({'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Bee'}, {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 15, tzinfo=datetime.timezone.utc), 'email': 'bee@bytewax.io'})))
join_eg.check_join: ('456', (WindowMetadata(open_time: 2023-12-14 01:00:00 UTC, close_time: 2023-12-14 02:00:00 UTC), (None, {'user_id': 456, 'at': datetime.datetime(2023, 12, 14, 1, 15, tzinfo=datetime.timezone.utc), 'email': 'hive@bytewax.io'})))

You will have to decide in a downstream step how to handle these None values. You might filter out joined items that contain any None values so you can focus only on the running list of complete updates, or fill in default values, etc.
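
As a rough sketch of those two options, here are a predicate and a mapper written in plain Python against the item shape shown in the output above: `(key, (window_metadata, (name_value, email_value)))`. The function names are made up for this illustration, and the string `"window-0"` is only a stand-in for the real window metadata object.

```python
from typing import Any, Tuple

# Shape assumed from the output above:
# (key, (window_metadata, (side_0_value, side_1_value, ...)))
WindowedItem = Tuple[str, Tuple[Any, Tuple[Any, ...]]]


def is_complete(item: WindowedItem) -> bool:
    """True when no side of the join is None (hypothetical helper)."""
    _key, (_meta, values) = item
    return all(v is not None for v in values)


def fill_missing(item: WindowedItem, default: Any) -> WindowedItem:
    """Replace None sides with a default value (hypothetical helper)."""
    key, (meta, values) = item
    filled = tuple(default if v is None else v for v in values)
    return (key, (meta, filled))


partial = ("456", ("window-0", ({"name": "Hive"}, None)))
full = ("123", ("window-0", ({"name": "Bee"}, {"email": "bee@bytewax.io"})))

print(is_complete(partial), is_complete(full))
print(fill_missing(partial, {}))
```

In a dataflow, a predicate like `is_complete` could be used in a filter step and a function like `fill_missing` in a map step after the join. Which one you want depends on whether partial results are meaningful for your use case.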

Bytewax currently only supports complete windowed joins and does not support running windowed joins.

Product Joins

Window operators, because they have a defined close time, also support another join type. The product join emits every combination of the input values seen on each side.

For example, let’s keep the join parameters the same, but update the input in the above dataflow to include multiple values in a window for a key.

names_l.clear()
names_l.extend(
    [
        {
            "user_id": 123,
            "at": datetime(2023, 12, 14, 0, 0, tzinfo=timezone.utc),
            "name": "Bee",
        },
    ]
)

# Two emails for the same key inside the same window.
emails_l.clear()
emails_l.extend(
    [
        {
            "user_id": 123,
            "at": datetime(2023, 12, 14, 0, 15, tzinfo=timezone.utc),
            "email": "bee@bytewax.io",
        },
        {
            "user_id": 123,
            "at": datetime(2023, 12, 14, 0, 30, tzinfo=timezone.utc),
            "email": "queen@bytewax.io",
        },
    ]
)

bytewax.testing.run_main(flow)

Notice how now we only have the latest email for the bee:

join_eg.check_join: ('123', (WindowMetadata(open_time: 2023-12-14 00:00:00 UTC, close_time: 2023-12-14 01:00:00 UTC), ({'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Bee'}, {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 30, tzinfo=datetime.timezone.utc), 'email': 'queen@bytewax.io'})))

Now if we re-define the dataflow and use product=True, we can see all of the values for the Bee’s email in that window.

import bytewax.operators.window as op_w

flow = Dataflow("join_eg")

names = op.input("names", flow, TestingSource(names_l))
emails = op.input("emails", flow, TestingSource(emails_l))

keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x))

# product=True emits every combination of values seen in the window.
joined = op_w.join_window(
    "join", clock, windower, keyed_names, keyed_emails, product=True
)

op.inspect("check_join", joined)

bytewax.testing.run_main(flow)

Notice there are now two output values for that key.

join_eg.check_join: ('123', (WindowMetadata(open_time: 2023-12-14 00:00:00 UTC, close_time: 2023-12-14 01:00:00 UTC), ({'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Bee'}, {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 15, tzinfo=datetime.timezone.utc), 'email': 'bee@bytewax.io'})))
join_eg.check_join: ('123', (WindowMetadata(open_time: 2023-12-14 00:00:00 UTC, close_time: 2023-12-14 01:00:00 UTC), ({'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Bee'}, {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 30, tzinfo=datetime.timezone.utc), 'email': 'queen@bytewax.io'})))
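
The "all combinations" idea is a Cartesian product over the values collected per side, which can be sketched with `itertools.product`. This is a plain-Python illustration of the concept, not Bytewax's implementation. The function name `product_rows` is made up, and filling an empty side with a single `None` is an assumption carried over from how missing sides appear in the windowed join output earlier.

```python
from itertools import product
from typing import Any, List, Sequence, Tuple


def product_rows(sides: Sequence[Sequence[Any]]) -> List[Tuple[Any, ...]]:
    """All combinations of one value per side (hypothetical helper).

    Assumption: a side with no values contributes a single None.
    """
    padded = [list(values) if values else [None] for values in sides]
    return list(product(*padded))


names_seen = ["Bee"]
emails_seen = ["bee@bytewax.io", "queen@bytewax.io"]

rows = product_rows([names_seen, emails_seen])
for row in rows:
    print(row)
```

One name and two emails give two rows, as in the output above. Two names and two emails in the same window would give four, so the output can grow quickly when several sides each see many values.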

Named Joins

The two previously described join operators have named versions: join has join_named, and join_window has join_window_named.

The named versions have identical parameters and join semantics, but instead of emitting tuples downstream, they emit dicts in which the keys are the names of the keyword arguments you use to specify the upstream sides. This can help you keep track of the values of the joined data more easily in your code. Depending on the kinds of transformations you are doing downstream, named joins can make that downstream code easier to write.

For example, take our original streaming join. If you remember, the output was 2-tuples because there were two sides to the join.

flow = Dataflow("join_eg")

names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]
names = op.input("names", flow, TestingSource(names_l))

emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
    {"user_id": 123, "email": "queen@bytewax.io"},
]
emails = op.input("emails", flow, TestingSource(emails_l))

keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))

joined = op.join("join", keyed_names, keyed_emails)

op.inspect("check_join", joined)

bytewax.testing.run_main(flow)

join_eg.check_join: ('123', ('Bee', 'bee@bytewax.io'))
join_eg.check_join: ('456', ('Hive', 'hive@bytewax.io'))

If we change this to use an equivalent join_named instead:

flow = Dataflow("join_eg")

names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]
names = op.input("names", flow, TestingSource(names_l))

emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
    {"user_id": 123, "email": "queen@bytewax.io"},
]
emails = op.input("emails", flow, TestingSource(emails_l))

keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))

# Sides are passed as keyword arguments; their names become the dict keys.
joined = op.join_named("join", name=keyed_names, email=keyed_emails)

op.inspect("check_join", joined)

bytewax.testing.run_main(flow)

Notice how the output is now dicts and how the kwargs name and email are represented as keys in the output dictionary.

join_eg.check_join: ('123', {'name': 'Bee', 'email': 'bee@bytewax.io'})
join_eg.check_join: ('456', {'name': 'Hive', 'email': 'hive@bytewax.io'})
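
To see how the two output shapes relate, here is a small plain-Python sketch that converts a tuple-style join result into the dict-style one. The helper `to_named` is made up for this illustration; with join_named you get the dict shape directly and do not need it.

```python
from typing import Any, Dict, Sequence, Tuple


def to_named(
    item: Tuple[str, Tuple[Any, ...]], side_names: Sequence[str]
) -> Tuple[str, Dict[str, Any]]:
    """Pair each positional join value with a side name (hypothetical helper)."""
    key, values = item
    return (key, dict(zip(side_names, values)))


tuple_item = ("123", ("Bee", "bee@bytewax.io"))
named_item = to_named(tuple_item, ("name", "email"))
print(named_item)

# Downstream code can then use field names instead of positions.
_key, joined_values = named_item
print(joined_values["email"])
```

Accessing `joined_values["email"]` stays correct if the order of the sides changes later, whereas positional indexing into a tuple would silently pick up the wrong side.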