Migration Guide#
This guide can help you upgrade code through breaking changes from one Bytewax version to the next. For a detailed list of all changes, see the CHANGELOG.
From v0.19 to v0.20#
Windowing Components Renamed#
Windowing operators have been moved from bytewax.operators.window
to bytewax.operators.windowing
.
In addition, ClockConfig
classes have had the Config
suffix
dropped from them, and WindowConfig
s have been renamed to
Windower
s. Other than the name changes, the functionality
is unchanged.
Before:
1import bytewax.operators.window as win
2from bytewax.operators.window import EventClockConfig, TumblingWindow
After:
import bytewax.operators.windowing as win
from bytewax.operators.windowing import EventClock, TumblingWindower
Windowing Operator Output#
Windowing operators now return a set of three streams bundled in a
WindowOut
object:
down
stream - window IDs and the resulting output from that operator.late
stream - items which were late and not assigned or processed in a window, but labeled with the window ID they would have been included in.meta
stream - window IDs and the finalWindowMetadata
describing the open and close times of that window.
Items in all three window output streams are now labeled with the unique int
window ID they were assigned to facilitate joining the data later to derive more
complex context about the resulting windows.
To recreate the exact downstream items that window operators emitted in v0.19,
you’ll now need to join
the down
stream with the
meta
stream on key and window ID, then remove the window ID. This transformation
only works with windowing operators that emit a single item downstream.
Before:
1from datetime import datetime, timedelta, timezone
2
3from bytewax.dataflow import Dataflow
4import bytewax.operators as op
5import bytewax.operators.window as win
6from bytewax.operators.window import SystemClockConfig, TumblingWindow
7from bytewax.testing import TestingSource
8
9events = [
10 {"user": "a", "val": 1},
11 {"user": "a", "val": 1},
12 {"user": "b", "val": 1},
13]
14
15flow = Dataflow("count")
16inp = op.input("inp", flow, TestingSource(events))
17counts = win.count_window(
18 "count",
19 inp,
20 SystemClock(),
21 TumblingWindow(
22 length=timedelta(seconds=2), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
23 ),
24 lambda x: x["user"],
25)
26op.inspect("inspect", counts)
27# ("user", (WindowMetadata(...), count_per_window))
After:
from datetime import datetime, timedelta, timezone
from typing import Tuple, TypeVar
import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.dataflow import Dataflow
from bytewax.operators.windowing import SystemClock, TumblingWindower, WindowMetadata
from bytewax.testing import TestingSource
events = [
{"user": "a", "val": 1},
{"user": "a", "val": 1},
{"user": "b", "val": 1},
]
flow = Dataflow("count")
inp = op.input("inp", flow, TestingSource(events))
count_out = win.count_window(
"count",
inp,
SystemClock(),
TumblingWindower(
length=timedelta(seconds=2), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
),
lambda x: x["user"],
)
# Returning just the counts per window: ('user', (window_id, count_per_window))
op.inspect("check_down", count_out.down)
X = TypeVar("X")
def rekey_by_window(key_id_obj: Tuple[str, Tuple[int, X]]) -> Tuple[str, X]:
key, id_obj = key_id_obj
win_id, obj = id_obj
return (f"{key}-{win_id}", obj)
keyed_metadata = op.map("rekey_meta", count_out.meta, rekey_by_window)
keyed_counts = op.map("rekey_counts", count_out.down, rekey_by_window)
joined_counts = op.join("check_joined", keyed_metadata, keyed_counts)
def unkey_join_rows(
rekey_row: Tuple[str, Tuple[WindowMetadata, int]],
) -> Tuple[str, Tuple[WindowMetadata, int]]:
rekey, row = rekey_row
key, _win_id = rekey.rsplit("-", maxsplit=1)
return (key, row)
# Returning the old output ('user', (WindowMetadata(..), count_per_window))
cleaned_joined_counts = op.map("unkey_joined", joined_counts, unkey_join_rows)
op.inspect("check_joined_counts", cleaned_joined_counts)
If your original dataflow ignored the WindowMetadata
, you can skip doing the above step and instead use down
directly without joining and drop the window ID.
Fold Window Merges#
fold_window
now requires a merger
callback that takes two fully formed accumulators and combines them into one. The merger
function
will be called with when the windower determines that two
windows must be merged. This most commonly happens when using the SessionWindower
and a new, out-of-order item bridges a gap.
Before:
1from datetime import datetime, timedelta, timezone
2from typing import List
3
4import bytewax.operators as op
5import bytewax.operators.window as win
6from bytewax.dataflow import Dataflow
7from bytewax.operators.window import EventClockConfig, SessionWindow
8from bytewax.testing import TestingSource
9
10align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
11events = [
12 {"user": "a", "val": 1, "time": align_to + timedelta(seconds=1)},
13 {"user": "a", "val": 2, "time": align_to + timedelta(seconds=3)},
14 {"user": "a", "val": 3, "time": align_to + timedelta(seconds=7)},
15]
16
17flow = Dataflow("merge_session")
18inp = op.input("inp", flow, TestingSource(events)).then(
19 op.key_on, "key_all", lambda _: "ALL"
20)
21
22
23def ts_getter(event: dict) -> datetime:
24 return event["time"]
25
26
27def add(acc: List[str], event: dict[str, str]) -> List[str]:
28 acc.append(event["val"])
29 return acc
30
31
32counts = win.fold_window(
33 "count",
34 inp,
35 EventClockConfig(ts_getter, wait_for_system_duration=timedelta(seconds=0)),
36 SessionWindow(gap=timedelta(seconds=3)),
37 list,
38 add,
39)
40op.inspect("inspect", counts.down)
After:
from datetime import datetime, timedelta, timezone
from typing import List
import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.dataflow import Dataflow
from bytewax.operators.windowing import EventClock, SessionWindower
from bytewax.testing import TestingSource
align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
events = [
{"user": "a", "val": 1, "time": align_to + timedelta(seconds=1)},
{"user": "a", "val": 2, "time": align_to + timedelta(seconds=3)},
{"user": "a", "val": 3, "time": align_to + timedelta(seconds=7)},
]
flow = Dataflow("merge_session")
inp = op.input("inp", flow, TestingSource(events)).then(
op.key_on, "key_all", lambda _: "ALL"
)
def ts_getter(event: dict) -> datetime:
return event["time"]
def add(acc: List[str], event: dict[str, str]) -> List[str]:
acc.append(event["val"])
return acc
counts = win.fold_window(
"count",
inp,
EventClock(ts_getter, wait_for_system_duration=timedelta(seconds=0)),
SessionWindower(gap=timedelta(seconds=3)),
list,
add,
list.__add__,
)
op.inspect("inspect", counts.down)
Join Modes#
To specify a running join
, now use mode="running"
instead of running=True
. To specify a product join_window
, use mode="product"
instead of product=True
. Both these operators now have more modes to choose from; see bytewax.operators.JoinInsertMode
and bytewax.operators.JoinEmitMode
.
Before:
1flow = Dataflow("running_join")
2
3names_l = [
4 {"user_id": 123, "name": "Bee"},
5 {"user_id": 456, "name": "Hive"},
6]
7
8names = op.input("names", flow, TestingSource(names_l))
9
10
11emails_l = [
12 {"user_id": 123, "email": "bee@bytewax.io"},
13 {"user_id": 456, "email": "hive@bytewax.io"},
14 {"user_id": 123, "email": "queen@bytewax.io"},
15]
16
17emails = op.input("emails", flow, TestingSource(emails_l))
18
19
20keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
21keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))
22joined = op.join("join", keyed_names, keyed_emails, running=True)
23op.inspect("insp", joined)
After:
from datetime import datetime, timedelta, timezone
import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.dataflow import Dataflow
from bytewax.operators.windowing import EventClock, TumblingWindower
from bytewax.testing import TestingSource
from typing import Tuple, Any
align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
events = [
{"user": "a", "val": 1, "timestamp": align_to + timedelta(seconds=0)},
{"user": "a", "val": 1, "timestamp": align_to + timedelta(seconds=1)},
{"user": "b", "val": 1, "timestamp": align_to + timedelta(seconds=3)},
]
flow = Dataflow("count")
inp = op.input("inp", flow, TestingSource(events))
clock = EventClock(lambda x: x["timestamp"], wait_for_system_duration=timedelta(seconds=0))
counts = win.count_window(
"count",
inp,
clock,
TumblingWindower(
length=timedelta(seconds=2), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
),
lambda x: x["user"],
)
# Returning just the counts per window: ('user', (window_id, count_per_window))
op.inspect("new_output", counts.down)
def join_window_id(window_out: Tuple[str, Tuple[int, Any]]) -> Tuple[str, Any]:
(user_id, (window_id, count)) = window_out
return (f"{user_id}:{window_id}", count)
# Re-key each stream by user-id:window-id
keyed_metadata = op.map("k_md", counts.meta, join_window_id)
keyed_counts = op.map("k_count", counts.down, join_window_id)
# Join the output of the window and the metadata stream on user-id:window-id
joined_meta = op.join("joined_output", keyed_metadata, keyed_counts)
user_output = op.map("reformat-out", joined_meta, lambda x: (x[0].split(":")[0], x[1]))
op.inspect("old_output", user_output)
Removal of join_named
and join_window_named
#
The join_named
and join_window_named
operators have been removed as
they can’t be made to support fully type checked dataflows.
The same functionality is still available but with a slightly
differently shaped API via join
or join_window
:
Before:
1import bytewax.operators as op
2from bytewax.dataflow import Dataflow
3from bytewax.testing import TestingSource
4
5flow = Dataflow("join_eg")
6names_l = [
7 {"user_id": 123, "name": "Bee"},
8 {"user_id": 456, "name": "Hive"},
9]
10names = op.input("names", flow, TestingSource(names_l))
11emails_l = [
12 {"user_id": 123, "email": "bee@bytewax.io"},
13 {"user_id": 456, "email": "hive@bytewax.io"},
14 {"user_id": 123, "email": "queen@bytewax.io"},
15]
16emails = op.input("emails", flow, TestingSource(emails_l))
17keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
18keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))
19joined = op.join_named("join", name=keyed_names, email=keyed_emails)
20op.inspect("check_join", joined)
After:
from dataclasses import dataclass
from typing import Dict, Tuple
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
flow = Dataflow("join_eg")
names_l = [
{"user_id": 123, "name": "Bee"},
{"user_id": 456, "name": "Hive"},
]
emails_l = [
{"user_id": 123, "email": "bee@bytewax.io"},
{"user_id": 456, "email": "hive@bytewax.io"},
{"user_id": 123, "email": "queen@bytewax.io"},
]
def id_field(field_name: str, x: Dict) -> Tuple[str, str]:
return (str(x["user_id"]), x[field_name])
names = op.input("names", flow, TestingSource(names_l))
emails = op.input("emails", flow, TestingSource(emails_l))
keyed_names = op.map("key_names", names, lambda x: id_field("name", x))
keyed_emails = op.map("key_emails", emails, lambda x: id_field("email", x))
joined = op.join("join", keyed_names, keyed_emails)
op.inspect("join_returns_tuples", joined)
@dataclass
class User:
user_id: int
name: str
email: str
def as_user(id_row: Tuple[str, Tuple[str, str]]) -> User:
user_id, row = id_row
# Unpack the tuple values in the id_row, and create the completed User
name, email = row
return User(int(user_id), name, email)
users = op.map("as_dict", joined, as_user)
op.inspect("inspect", users)
From v0.18 to v0.19#
Removal of the builder
argument from stateful_map
#
The builder
argument has been removed from
stateful_map
. The initial state value is
always None
and you can call your previous builder by hand in the
mapper
function.
Before:
1import bytewax.operators as op
2from bytewax.dataflow import Dataflow
3from bytewax.testing import TestingSource
4
5
6flow = Dataflow("builder_eg")
7keyed_amounts = op.input(
8 "inp",
9 flow,
10 TestingSource(
11 [
12 ("KEY", 1.0),
13 ("KEY", 6.0),
14 ("KEY", 2.0),
15 ]
16 ),
17)
18
19
20def running_builder():
21 return []
22
23
24def calc_running_mean(values, new_value):
25 values.append(new_value)
26 while len(values) > 3:
27 values.pop(0)
28
29 running_mean = sum(values) / len(values)
30 return (values, running_mean)
31
32
33running_means = op.stateful_map(
34 "running_mean", keyed_amounts, running_builder, calc_running_mean
35)
After:
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
flow = Dataflow("builder_eg")
keyed_amounts = op.input("inp", flow, TestingSource([
("KEY", 1.0),
("KEY", 6.0),
("KEY", 2.0),
]))
def calc_running_mean(values, new_value):
# On the initial value for this key, instead of the operator calling the
# builder for you, you can call it yourself when the state is un-initalized.
if values is None:
values = []
values.append(new_value)
while len(values) > 3:
values.pop(0)
running_mean = sum(values) / len(values)
return (values, running_mean)
running_means = op.stateful_map("running_mean", keyed_amounts, calc_running_mean)
Connector API Now Contains Step ID#
bytewax.inputs.FixedPartitionedSource.build_part
,
bytewax.inputs.DynamicSource.build
,
bytewax.outputs.FixedPartitionedSink.build_part
and
bytewax.outputs.DynamicSink.build
now take an additional
step_id
argument. This argument can be used as a label when creating
custom Python metrics.
Before:
1from bytewax.inputs import DynamicSource
2
3
4class PeriodicSource(DynamicSource):
5 def build(self, now: datetime, worker_index: int, worker_count: int):
6 pass
After:
from bytewax.inputs import DynamicSource
class PeriodicSource(DynamicSource):
def build(self, step_id: str, worker_index: int, worker_count: int):
pass
datetime
Arguments Removed for Performance#
bytewax.inputs.FixedPartitionedSource.build_part
,
bytewax.inputs.DynamicSource.build
and
bytewax.operators.UnaryLogic.on_item
no longer take a now: datetime
argument.
bytewax.inputs.StatefulSourcePartition.next_batch
,
bytewax.inputs.StatelessSourcePartition.next_batch
, and
bytewax.operators.UnaryLogic.on_notify
no longer take a sched: datetime
argument. Generating these values resulted in significant
overhead, even for the majority of sources and stateful operators that
never used them.
If you need the current time, you still can manually get the current time:
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
If you need the previously scheduled awake time, store it in an
instance variable before returning it from
~bytewax.operators.UnaryLogic.notify_at
. Your design probably
already has that stored in an instance variable.
Standardization on Confluent’s Kafka Serialization Interface#
Bytewax’s bespoke Kafka schema registry and serialization interface
has been removed in favor of using
confluent_kafka.schema_registry.SchemaRegistryClient
and
confluent_kafka.serialization.Deserializer
s and
confluent_kafka.serialization.Serializer
s directly. This now
gives you direct control over all of the configuration options for
serialization and supports the full range of use cases. Bytewax’s
Kafka serialization operators in
bytewax.connectors.kafka.operators
now take these Confluent
types.
With Confluent Schema Registry#
If you are using Confluent’s schema registry (with it’s magic byte
prefix), you can pass serializers like
confluent_kafka.schema_registry.avro.AvroDeserializer
directly to our operators. See Confluent’s documentation for all the
options here.
Before:
1from bytewax.connectors.kafka import operators as kop
2from bytewax.connectors.kafka.registry import ConfluentSchemaRegistry
3
4flow = Dataflow("confluent_sr_eg")
5kinp = kop.input("inp", flow, brokers=["broker.test"], topics=["eg_topic"])
6
7sr_conf = {"url": "http://registry.test/", "basic.auth.user.info": "user:pass"}
8registry = ConfluentSchemaRegistry(SchemaRegistryClient(sr_conf))
9
10key_de = AvroDeserializer(client)
11val_de = AvroDeserializer(client)
12
13# Deserialize both key and value
14msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
After:
from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
flow = Dataflow("confluent_sr_eg")
kinp = kop.input("inp", flow, brokers=["broker.test"], topics=["eg_topic"])
# Confluent's SchemaRegistryClient
client = SchemaRegistryClient(
{"url": "http://registry.test/", "basic.auth.user.info": "user:pass"}
)
key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)
# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
With Redpanda Schema Registry#
If you are using Redpanda’s schema registry or another setup for which
the serialized form does not use Confluent’s wire format, we provide
compatible (de)serializer classes in
bytewax.connectors.kafka.serde
.
Before:
1from bytewax.connectors.kafka import operators as kop
2from bytewax.connectors.kafka.registry import RedpandaSchemaRegistry, SchemaRef
3
4REDPANDA_REGISTRY_URL = "http://localhost:8080/schema-registry"
5
6registry = RedpandaSchemaRegistry(REDPANDA_REGISTRY_URL)
7key_de = registry.deserializer(SchemaRef("sensor-key"))
8val_de = registry.deserializer(SchemaRef("sensor-value"))
9
10msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
After:
1from bytewax.connectors.kafka import operators as kop
2from bytewax.connectors.kafka.serde import PlainAvroDeserializer
3
4from confluent_kafka.schema_registry import SchemaRegistryClient
5
6REDPANDA_REGISTRY_URL = "http://localhost:8080/schema-registry"
7
8client = SchemaRegistryClient({"url": REDPANDA_REGISTRY_URL})
9
10# Use plain Avro instead of Confluent's prefixed-Avro wire format.
11# We need to specify the schema in the deserializer too here.
12key_schema = client.get_latest_version("sensor-key").schema
13key_de = PlainAvroDeserializer(schema=key_schema)
14
15val_schema = client.get_latest_version("sensor-value").schema
16val_de = PlainAvroDeserializer(schema=val_schema)
17
18# Deserialize both key and value
19msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
From v0.17 to v0.18#
Non-Linear Dataflows#
With the addition of non-linear dataflows, the API for constructing dataflows has changed. Operators are now stand-alone functions that can take and return streams.
All operators, not just stateful ones, now require a step_id
; it
should be a "snake_case"
description of the semantic purpose of
that dataflow step.
Also instantiating the dataflow itself now takes a “dataflow ID” so you can disambiguate different dataflows in the metrics.
Before:
1from bytewax.dataflow import Dataflow
2from bytewax.testing import TestingSource
3from bytewax.connectors.stdio import StdOutput
4
5flow = Dataflow()
6flow.input("inp", TestingInput(range(10)))
7flow.map(lambda x: x + 1)
8flow.output("out", StdOutput())
After:
1import bytewax.operators as op
2
3from bytewax.dataflow import Dataflow
4from bytewax.testing import TestingSource
5from bytewax.connectors.stdio import StdOutSink
6
7flow = Dataflow("basic")
8# `op.input` takes the place of `flow.input` and takes a `Dataflow`
9# object as the first parameter.
10# `op.input` returns a `Stream[int]` in this example:
11stream = op.input("inp", flow, TestingSource(range(10)))
12# `op.map` takes a `Stream` as it's second argument, and
13# returns a new `Stream[int]` as it's return value.
14add_one_stream = op.map("add_one", stream, lambda x: x + 1)
15# `op.output` takes a stream as it's second argument, but
16# does not return a new `Stream`.
17op.output("out", add_one_stream, StdOutSink())
Kafka/RedPanda Input#
KafkaSource
has been updated.
KafkaSource
now returns a stream
of KafkaSourceMessage
dataclasses,
and a stream of errors rather than a stream of (key, value) tuples.
You can still use KafkaSource
and
KafkaSink
directly, or use the
custom operators in bytewax.connectors.kafka
to construct an
input source.
Before:
1from bytewax.connectors.kafka import KafkaInput, KafkaOutput
2from bytewax.connectors.stdio import StdOutput
3from bytewax.dataflow import Dataflow
4
5flow = Dataflow()
6flow.input("inp", KafkaInput(["localhost:9092"], ["input_topic"]))
7flow.output(
8 "out",
9 KafkaOutput(
10 brokers=["localhost:9092"],
11 topic="output_topic",
12 ),
13)
After:
1from typing import Tuple, Optional
2
3from bytewax import operators as op
4from bytewax.connectors.kafka import operators as kop
5from bytewax.connectors.kafka import KafkaSinkMessage
6from bytewax.dataflow import Dataflow
7
8flow = Dataflow("kafka_in_out")
9kinp = kop.input("inp", flow, brokers=["localhost:19092"], topics=["in_topic"])
10in_msgs = op.map("get_k_v", kinp.oks, lambda msg: (msg.key, msg.value))
11
12
13def wrap_msg(k_v):
14 k, v = k_v
15 return KafkaSinkMessage(k, v)
16
17
18out_msgs = op.map("wrap_k_v", in_msgs, wrap_msg)
19kop.output("out1", out_msgs, brokers=["localhost:19092"], topic="out_topic")
Renaming#
We have renamed the IO classes to better match their semantics and the way they are talked about in the documentation. Their functionality are unchanged. You should be able to search and replace for the old names and have them work correctly.
Old |
New |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
We have also updated the names of the built-in connectors to match this new naming scheme.
Old |
New |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Current Time Convenience#
In addition to the name changes, we have also added a datetime
argument to
build{_part,}
with the current time to allow you to easily
set up a next awake time. You can ignore this parameter if you are not scheduling
awake times.
We have also added a datetime
argument to next_batch
, which contains the
scheduled awake time for that source. Since awake times are scheduled, but not
guaranteed to fire at the precise time specified, you can use this parameter
to account for any difference.
SimplePollingSource
moved#
SimplePollingSource
has been moved from
bytewax.connectors.periodic
to bytewax.inputs
. You’ll need
to change your imports if you are using that class.
Before:
1from bytewax.connectors.periodic import SimplePollingSource
After:
1from bytewax.inputs import SimplePollingSource
Window Metadata#
Window operators now emit
WindowMetadata
objects
downstream. These objects can be used to introspect the open_time and
close_time of windows. This changes the output type of windowing
operators from a stream of: (key, values)
to a stream of (key, (metadata, values))
.
Recovery flags#
The default values for snapshot-interval
and backup-interval
have been removed
when running a dataflow with recovery enabled.
Previously, the defaults values were to create a snapshot every 10 seconds and keep a day’s worth of old snapshots. This means your recovery DB would max out at a size on disk theoretically thousands of times bigger than your in-memory state.
See Recovery for how to appropriately pick these values for your deployment.
Batch -> Collect#
In v0.18, we’ve renamed the batch
operator to
collect
so as to not be confused with
runtime batching. Behavior is unchanged.
From v0.16 to v0.17#
Bytewax v0.17 introduces major changes to the way that recovery works in order to support rescaling. In v0.17, the number of workers in a cluster can now be changed by stopping the dataflow execution and specifying a different number of workers on resume.
In addition to rescaling, we’ve changed the Bytewax inputs and outputs API to support batching which has yielded significant performance improvements.
In this article, we’ll go over the updates we’ve made to our API and explain the changes you’ll need to make to your dataflows to upgrade to v0.17.
Recovery changes#
In v0.17, recovery has been changed to support rescaling the number of workers in a dataflow. You now pre-create a fixed number of SQLite recovery DB files before running the dataflow.
SQLite recovery DBs created with versions of Bytewax prior to v0.17 are not compatible with this release.
Creating recovery partitions#
Creating recovery stores has been moved to a separate step from running your dataflow.
Recovery partitions must be pre-initialized before running the dataflow initially. This is done by executing this module:
$ python -m bytewax.recovery db_dir/ 4
This will create a set of partitions:
$ ls db_dir/
part-0.sqlite3
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3
Once the recovery partition files have been created, they must be placed in locations that are accessible to the workers. The cluster has a whole must have access to all partitions, but any given worker need not have access to any partition in particular (or any at all). It is ok if a given partition is accesible by multiple workers; only one worker will use it.
If you are not running in a cluster environment but on a single machine, placing all the partitions in a single local filesystem directory is fine.
Although the partition init script will not create these, partitions after execution may consist of multiple files:
$ ls db_dir/
part-0.sqlite3
part-0.sqlite3-shm
part-0.sqlite3-wal
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3
part-3.sqlite3-shm
part-3.sqlite3-wal
You must remember to move the files with the prefix part-*.
all together.
Choosing the number of recovery partitions#
An important consideration when creating the initial number of recovery partitions; When rescaling the number of workers, the number of recovery partitions is currently fixed.
If you are scaling up the number of workers in your cluster to increase the total throughput of your dataflow, the work of writing recovery data to the recovery partitions will still be limited to the initial number of recovery partitions. If you will likely scale up your dataflow to accomodate increased demand, we recommend that that you consider creating more recovery partitions than you will initially need. Having multiple recovery partitions handled by a single worker is fine.
Epoch interval -> Snapshot interval#
We’ve renamed the cli option of epoch-interval
to
snapshot-interval
to better describe its affect on dataflow
execution. The snapshot interval is the system time duration (in
seconds) to snapshot state for recovery.
Recovering a dataflow can only happen on the boundaries of the most
recently completed snapshot across all workers, but be aware that
making the snapshot-interval
more frequent increases the amount of
recovery work and may impact performance.
Backup interval, and backing up recovery partitions.#
We’ve also introduced an additional parameter to running a dataflow:
backup-interval
.
When running a Dataflow with recovery enabled, it is recommended to back up your recovery partitions on a regular basis for disaster recovery.
The backup-interval
parameter is the length of time to wait before
“garbage collecting” older snapshots. This enables a dataflow to
successfully resume when backups of recovery partitions happen at
different times, which will be true in most distributed deployments.
This value should be set in excess of the interval you can guarantee that all recovery partitions will be backed up to account for transient failures. It defaults to one day.
If you attempt to resume from a set of recovery partitions for which the oldest and youngest backups are more than the backup interval apart, the resumed dataflow could have corrupted state.
Input and Output changes#
In v0.17, we have restructured input and output to support batching for increased throughput.
If you have created custom input connectors, you’ll need to update them to use the new API.
Input changes#
The list_parts
method has been updated to return a List[str]
instead of
a Set[str]
, and now should only reflect the available input partitions
that a given worker has access to. You no longer need to return the
complete set of partitions for all workers.
The next
method of StatefulSource
and StatelessSource
has been
changed to next_batch
and should to return a List
of elements each
time it is called. If there are no elements to return, this method
should return the empty list.
Next awake#
Input sources now have an optional next_awake
method which you can
use to schedule when the next next_batch
call should occur. You can
use this to “sleep” the input operator for a fixed amount of time
while you are waiting for more input.
The default behavior uses a simple heuristic to prevent a spin loop
when there is no input. Always use next_awake
rather than using a
time.sleep
in an input source.
See the periodic_input.py
example in the examples directory for an
implementation that uses this functionality.
Async input adapter#
We’ve included a new bytewax.inputs.batcher_async
to help you use
async Python libraries in Bytewax input sources. It lets you wrap an
async iterator and specify a maximum time and size to collect items
into a batch.
Using Kafka for recovery is now removed#
v0.17 removes the deprecated KafkaRecoveryConfig
as a recovery store
to support the ability to rescale the number of workers.
Support for additional platforms#
Bytewax is now available for linux/aarch64 and linux/armv7.
From 0.15 to 0.16#
Bytewax v0.16 introduces major changes to the way that custom Bytewax inputs are created, as well as improvements to windowing and execution. In this article, we’ll cover some of the updates we’ve made to our API to make upgrading to v0.16 easier.
Input changes#
In v0.16, we have restructured input to be based around partitions in
order to support rescaling stateful dataflows in the future. We have
also dropped the Config
suffix to our input classes. As an example,
KafkaInputConfig
has been renamed to KafkaInput
and has been moved
to bytewax.connectors.kafka.KafkaInput
.
ManualInputConfig
is now replaced by two base superclasses that can
be subclassed to create custom input sources. They are DynamicInput
and PartitionedInput
.
DynamicInput
#
DynamicInput
sources are input sources that support reading distinct
items from any number of workers concurrently.
DynamicInput
sources do not support resume state and thus generally
do not provide delivery guarantees better than at-most-once.
PartitionedInput
#
PartitionedInput
sources can keep internal state on the current
position of each partition. If a partition can “rewind” and resume
reading from a previous position, they can provide delivery guarantees
of at-least-once or better.
PartitionedInput
sources maintain the state of each source and
re-build that state during recovery.
Deprecating Kafka Recovery#
In order to better support rescaling Dataflows, the Kafka recovery store option is being deprecated and will be removed from a future release in favor of the SQLite recovery store.
Capture -> Output#
The capture
operator has been renamed to output
in v0.16 and is
now stateful, so requires a step ID:
In v0.15 and before:
1flow.capture(StdOutputConfig())
In v0.16+:
1from bytewax.dataflow import Dataflow
2from bytewax.connectors.stdio import StdOutput
3
4flow = Dataflow()
5flow.output("out", StdOutput())
ManualOutputConfig
is now replaced by two base superclasses that can
be subclassed to create custom output sinks. They are DynamicOutput
and PartitionedOutput
.
New entrypoint#
In Bytewax v0.16, the way that Dataflows are run has been simplified, and most execution functions have been removed.
Similar to other Python frameworks like Flask and FastAPI, Dataflows
can now be run using a Datfaflow import string in the
<module_name>:<dataflow_variable_name_or_factory_function>
format.
As an example, given a file named dataflow.py
with the following
contents:
1from bytewax.dataflow import Dataflow
2from bytewax.testing import TestingInput
3from bytewax.connectors.stdio import StdOutput
4
5flow = Dataflow()
6flow.input("in", TestingInput(range(3)))
7flow.output("out", StdOutput())
Since a Python file dataflow.py
defines a module dataflow
, the
Dataflow can be run with the following invocation:
> python -m bytewax.run dataflow
0
1
2
By default, Bytewax looks for a variable in the given module named
flow
, so we can eliminate the
<dataflow_variable_name_or_factory_function>
part of our import
string.
Processes, workers, recovery stores and other options can be
configured with command line flags or environment variables. For the
full list of options see the --help
command line flag:
> python -m bytewax.run --help
usage: python -m bytewax.run [-h] [-p PROCESSES] [-w WORKERS_PER_PROCESS] [-i PROCESS_ID] [-a ADDRESSES] [--sqlite-directory SQLITE_DIRECTORY] [--epoch-interval EPOCH_INTERVAL] import_str
Run a bytewax dataflow
positional arguments:
import_str Dataflow import string in the format <module_name>:<dataflow_variable_or_factory> Example: src.dataflow:flow or src.dataflow:get_flow('string_argument')
options:
-h, --help show this help message and exit
Scaling:
You should use either '-p' to spawn multiple processes on this same machine, or '-i/-a' to spawn a single process on different machines
-p PROCESSES, --processes PROCESSES
Number of separate processes to run [env: BYTEWAX_PROCESSES]
-w WORKERS_PER_PROCESS, --workers-per-process WORKERS_PER_PROCESS
Number of workers for each process [env: BYTEWAX_WORKERS_PER_PROCESS]
-i PROCESS_ID, --process-id PROCESS_ID
Process id [env: BYTEWAX_PROCESS_ID]
-a ADDRESSES, --addresses ADDRESSES
Addresses of other processes, separated by semicolumn: -a "localhost:2021;localhost:2022;localhost:2023" [env: BYTEWAX_ADDRESSES]
Recovery:
--sqlite-directory SQLITE_DIRECTORY
Passing this argument enables sqlite recovery in the specified folder [env: BYTEWAX_SQLITE_DIRECTORY]
--epoch-interval EPOCH_INTERVAL
Number of seconds between state snapshots [env: BYTEWAX_EPOCH_INTERVAL]
Porting a simple example from 0.15 to 0.16#
This is what a simple example looked like in 0.15
:
1import operator
2import re
3
4from datetime import timedelta, datetime
5
6from bytewax.dataflow import Dataflow
7from bytewax.inputs import ManualInputConfig
8from bytewax.outputs import StdOutputConfig
9from bytewax.execution import run_main
10from bytewax.window import SystemClockConfig, TumblingWindowConfig
11
12
13def input_builder(worker_index, worker_count, resume_state):
14 state = None # ignore recovery
15 for line in open("wordcount.txt"):
16 yield state, line
17
18
19def lower(line):
20 return line.lower()
21
22
23def tokenize(line):
24 return re.findall(r'[^\s!,.?":;0-9]+', line)
25
26
27def initial_count(word):
28 return word, 1
29
30
31def add(count1, count2):
32 return count1 + count2
33
34
35clock_config = SystemClockConfig()
36window_config = TumblingWindowConfig(length=timedelta(seconds=5))
37
38flow = Dataflow()
39flow.input("input", ManualInputConfig(input_builder))
40flow.map(lower)
41flow.flat_map(tokenize)
42flow.map(initial_count)
43flow.reduce_window("sum", clock_config, window_config, add)
44flow.capture(StdOutputConfig())
45
46run_main(flow)
To port the example to the 0.16
version we need to make a few
changes.
Imports#
Let’s start with the existing imports:
1from bytewax.inputs import ManualInputConfig
2from bytewax.outputs import StdOutputConfig
3from bytewax.execution import run_main
Becomes:
1from bytewax.connectors.files import FileInput
2from bytewax.connectors.stdio import StdOutput
We removed run_main
as it is now only used for unit testing
dataflows. Bytewax now has a built-in FileInput connector, which uses
the PartitionedInput
connector superclass.
Since we are using a built-in connector to read from this file, we can
delete our input_builder
function above.
Windowing#
Most of the operators from v0.15
are the same, but the start_at
parameter of windowing functions has been changed to align_to
. The
start_at
parameter incorrectly implied that there were no potential
windows before that time. What determines if an item is late for a
window is not the windowing definition, but the watermark of the
clock.
SlidingWindow
and TumblingWindow
now require the align_to
parameter. Previously, this was filled with the timestamp of the start
of the Dataflow, but must now be specified. Specifying this parameter
ensures that windows are consistent across Dataflow restarts, so make
sure that you don’t change this parameter between executions.
The old TumblingWindow
definition:
1clock_config = SystemClockConfig()
2window_config = TumblingWindowConfig(length=timedelta(seconds=5))
becomes:
1clock_config = SystemClockConfig()
2window_config = TumblingWindow(
3 length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
4)
Output and execution#
Similarly to the input
, the output configuration is now stateful.
capture
has been renamed to output
, and now takes a name, as all
stateful operators do.
So we move from this:
1flow.capture(StdOutputConfig())
To this:
1flow.output("out", StdOutput())
Complete code#
The complete code for the new simple example now looks like this:
1import operator
2import re
3
4from datetime import timedelta, datetime, timezone
5
6from bytewax.dataflow import Dataflow
7from bytewax.connectors.files import FileInput
8from bytewax.connectors.stdio import StdOutput
9from bytewax.window import SystemClockConfig, TumblingWindow
10
11
12def lower(line):
13 return line.lower()
14
15
16def tokenize(line):
17 return re.findall(r'[^\s!,.?":;0-9]+', line)
18
19
20def initial_count(word):
21 return word, 1
22
23
24def add(count1, count2):
25 return count1 + count2
26
27
28clock_config = SystemClockConfig()
29window_config = TumblingWindow(
30 length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
31)
32
33flow = Dataflow()
34flow.input("inp", FileInput("examples/sample_data/wordcount.txt"))
35flow.map(lower)
36flow.flat_map(tokenize)
37flow.map(initial_count)
38flow.reduce_window("sum", clock_config, window_config, add)
39flow.output("out", StdOutput())
Running our completed Dataflow looks like this, as we’ve named our
Dataflow variable flow
in a file named dataflow
:
> python -m bytewax.run dataflow:flow
('whether', 1)
("'tis", 1)
('of', 2)
('opposing', 1)
...
We can even run our sample Dataflow on multiple workers to process the file in parallel:
> python -m bytewax.run dataflow:flow -p2
('whether', 1)
("'tis", 1)
('of', 2)
('opposing', 1)
...
In the background, Bytewax has spawned two processes, each of which is processing a part of the file. To see this more clearly, you can start each worker by hand:
In one terminal, run:
> python -m bytewax.run dataflow:flow -i0 -a "localhost:2101;localhost:2102"
('not', 1)
('end', 1)
('opposing', 1)
('take', 1)
...
And in a second terminal, run:
> python -m bytewax.run dataflow:flow -i1 -a "localhost:2101;localhost:2102"
('question', 1)
('the', 3)
('troubles', 1)
('fortune', 1)
...
From 0.10 to 0.11#
Bytewax 0.11 introduces major changes to the way that Bytewax dataflows are structured, as well as improvements to recovery and windowing. This document outlines the major changes between Bytewax 0.10 and 0.11.
Input and epochs#
Bytewax is built on top of the Timely Dataflow framework. The idea of timestamps (which we refer to in Bytewax as epochs) is central to Timely Dataflow’s progress tracking mechanism.
Bytewax initially adopted an input model that included managing the epochs at which input was introduced. The 0.11 version of Bytewax removes the need to manage epochs directly.
Epochs continue to exist in Bytewax, but are now managed internally to
represent a unit of recovery. Bytewax dataflows that are configured
with recovery will shapshot their state after processing all items in
an epoch. In the event of recovery, Bytewax will resume a dataflow at
the last snapshotted state. The frequency of snapshotting can be
configured with an EpochConfig
Recoverable input#
Bytewax 0.11 will now allow you to recover the state of the input to your dataflow.
Manually constructed input functions, like those used with
ManualInputConfig
now take a third argument. If your dataflow is
interrupted, the third argument passed to your input function can be
used to reconstruct the state of your input at the last recovery
snapshot, provided you write your input logic accordingly. The
input_builder
function must return a tuple of (resume_state, datum).
Bytewax’s built-in input handlers, like KafkaInputConfig
are also
recoverable. KafkaInputConfig
will store information about consumer
offsets in the configured Bytewax recovery store. In the event of
recovery, KafkaInputConfig
will start reading from the offsets that
were last committed to the recovery store.
Stateful windowing#
Version 0.11 also introduces stateful windowing operators, including a
new fold_window
operator.
Previously, Bytewax included helper functions to manage windows in terms of epochs. Now that Bytewax manages epochs internally, windowing functions are now operators that appear as a processing step in a dataflow. Dataflows can now have more than one windowing step.
Bytewax’s stateful windowing operators are now built on top of its recovery system, and their operations can be recovered in the event of a failure. See the documentation on Recovery for more information.
Output configurations#
The 0.11 release of Bytewax adds some prepackaged output configuration options for common use-cases:
ManualOutputConfig
which calls a Python callback function for each
output item.
StdOutputConfig
which prints each output item to stdout.
Import path changes and removed entrypoints#
In Bytewax 0.11, the overall Python module structure has changed, and some execution entrypoints have been removed.
cluster_main
,spawn_cluster
, andrun_main
have moved tobytewax.execution
Dataflow
has moved tobytewax.dataflow
run
andrun_cluster
have been removed
Porting the Simple example from 0.10 to 0.11#
This is what the Simple example
looked like in 0.10
:
1import re
2
3from bytewax import Dataflow, run
4
5
6def file_input():
7 for line in open("wordcount.txt"):
8 yield 1, line
9
10
11def lower(line):
12 return line.lower()
13
14
15def tokenize(line):
16 return re.findall(r'[^\s!,.?":;0-9]+', line)
17
18
19def initial_count(word):
20 return word, 1
21
22
23def add(count1, count2):
24 return count1 + count2
25
26
27flow = Dataflow()
28flow.map(lower)
29flow.flat_map(tokenize)
30flow.map(initial_count)
31flow.reduce_epoch(add)
32flow.capture()
33
34
35for epoch, item in run(flow, file_input()):
36 print(item)
To port the example to the 0.11
version we need to make a few
changes.
Imports#
Let’s start with the existing imports:
1from bytewas import Dataflow, run
Becomes:
1from bytewax.dataflow import Dataflow
2from bytewax.execution import run_main
We moved from run
to run_main
as the execution API has been
simplified, and we can now just use the run_main
function to execute
our dataflow.
Input#
The way bytewax handles input changed with 0.11
. input
is now a
proper operator on the Dataflow, and the function now takes 3
parameters: worker_index
, worker_count
, resume_state
. This
allows us to distribute the input across workers, and to handle
recovery if we want to. We are not going to do that in this example,
so the change is minimal.
The input function goes from:
1def file_input():
2 for line in open("wordcount.txt"):
3 yield 1, line
to:
1def input_builder(worker_index, worker_count, resume_state):
2 state = None # ignore recovery
3 for line in open("wordcount.txt"):
4 yield state, line
So instead of manually yielding the epoch
in the input function, we
can either ignore it (passing None
as state), or handle the value to
implement recovery (see our docs on Recovery).
Then we need to wrap the input_builder
with ManualInputConfig
,
give it a name (“file_input” here) and pass it to the input
operator
(rather than the run
function):
1from bytewax.inputs import ManualInputConfig
2
3
4flow.input("file_input", ManualInputConfig(input_builder))
Operators#
Most of the operators are the same, but there is a notable change in
the flow: where we used reduce_epoch
we are now using
reduce_window
. Since the epochs concept is now considered an
internal detail in bytewax, we need to define a way to let the
reduce
operator know when to close a specific window. Previously
this was done everytime the epoch
changed, while now it can be
configured with a time window. We need two config objects to do this:
clock_config
window_config
The clock_config
is used to tell the window-based operators what
reference clock to use, here we use the SystemClockConfig
that just
uses the system’s clock. The window_config
is used to define the
time window we want to use. Here we’ll use the TumblingWindow
that
allows us to have tumbling windows defined by a length (timedelta
),
and we configure it to have windows of 5 seconds each.
So the old reduce_epoch
:
1flow.reduce_epoch(add)
becomes reduce_window
:
1from bytewax.window import SystemClockConfig, TumblingWindow
2
3
4clock_config = SystemClockConfig()
5window_config = TumblingWindow(length=timedelta(seconds=5))
6flow.reduce_window("sum", clock_config, window_config, add)
Output and execution#
Similarly to the input
, the output configuration is now part of an
operator, capture
. Rather than collecting the output in a python
iterator and then manually printing it, we can now configure the
capture
operator to print to standard output.
Since all the input and output handling is now defined inside the Dataflow, we don’t need to pass this information to the execution method.
So we move from this:
1flow.capture()
2
3for epoch, item in run(flow, file_input()):
4 print(item)
To this:
1from bytewax.outputs import StdOutputConfig
2
3
4flow.capture(StdOutputConfig())
5
6run_main(flow)
Complete code#
The complete code for the new simple example now looks like this:
1import operator
2import re
3
4from datetime import timedelta, datetime
5
6from bytewax.dataflow import Dataflow
7from bytewax.inputs import ManualInputConfig
8from bytewax.outputs import StdOutputConfig
9from bytewax.execution import run_main
10from bytewax.window import SystemClockConfig, TumblingWindow
11
12
13def input_builder(worker_index, worker_count, resume_state):
14 state = None # ignore recovery
15 for line in open("wordcount.txt"):
16 yield state, line
17
18
19def lower(line):
20 return line.lower()
21
22
23def tokenize(line):
24 return re.findall(r'[^\s!,.?":;0-9]+', line)
25
26
27def initial_count(word):
28 return word, 1
29
30
31def add(count1, count2):
32 return count1 + count2
33
34
35clock_config = SystemClockConfig()
36window_config = TumblingWindow(length=timedelta(seconds=5))
37
38flow = Dataflow()
39flow.input("input", ManualInputConfig(input_builder))
40flow.map(lower)
41flow.flat_map(tokenize)
42flow.map(initial_count)
43flow.reduce_window("sum", clock_config, window_config, add)
44flow.capture(StdOutputConfig())
45
46run_main(flow)