Migration Guide

This guide can help you upgrade code through breaking changes from one Bytewax version to the next. For a detailed list of all changes, see the CHANGELOG.

From v0.19 to v0.20

Windowing Components Renamed

Windowing operators have been moved from bytewax.operators.window to bytewax.operators.windowing.

In addition, ClockConfig classes have had the Config suffix dropped from them, and WindowConfigs have been renamed to Windowers. Other than the name changes, the functionality is unchanged.

Before:

import bytewax.operators.window as win
from bytewax.operators.window import EventClockConfig, TumblingWindow

After:

import bytewax.operators.windowing as win
from bytewax.operators.windowing import EventClock, TumblingWindower

Windowing Operator Output

Windowing operators now return a set of three streams bundled in a WindowOut object:

  1. down stream - window IDs and the resulting output from that operator.

  2. late stream - items which were late and not assigned or processed in a window, but labeled with the window ID they would have been included in.

  3. meta stream - window IDs and the final WindowMetadata describing the open and close times of that window.

Items in all three window output streams are now labeled with the unique int window ID they were assigned to facilitate joining the data later to derive more complex context about the resulting windows.

To recreate the exact downstream items that window operators emitted in v0.19, you’ll now need to join the down stream with the meta stream on key and window ID, then remove the window ID. This transformation only works with windowing operators that emit a single item downstream.

Before:

from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
import bytewax.operators as op
import bytewax.operators.window as win
from bytewax.operators.window import SystemClockConfig, TumblingWindow
from bytewax.testing import TestingSource

events = [
    {"user": "a", "val": 1},
    {"user": "a", "val": 1},
    {"user": "b", "val": 1},
]

flow = Dataflow("count")
inp = op.input("inp", flow, TestingSource(events))
counts = win.count_window(
    "count",
    inp,
    SystemClockConfig(),
    TumblingWindow(
        length=timedelta(seconds=2), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
    ),
    lambda x: x["user"],
)
op.inspect("inspect", counts)
# ("user", (WindowMetadata(...), count_per_window))

After:

from datetime import datetime, timedelta, timezone
from typing import Tuple, TypeVar

import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.dataflow import Dataflow
from bytewax.operators.windowing import SystemClock, TumblingWindower, WindowMetadata
from bytewax.testing import TestingSource

events = [
    {"user": "a", "val": 1},
    {"user": "a", "val": 1},
    {"user": "b", "val": 1},
]

flow = Dataflow("count")
inp = op.input("inp", flow, TestingSource(events))
count_out = win.count_window(
    "count",
    inp,
    SystemClock(),
    TumblingWindower(
        length=timedelta(seconds=2), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
    ),
    lambda x: x["user"],
)
# Returning just the counts per window: ('user', (window_id, count_per_window))
op.inspect("check_down", count_out.down)
X = TypeVar("X")


def rekey_by_window(key_id_obj: Tuple[str, Tuple[int, X]]) -> Tuple[str, X]:
    key, id_obj = key_id_obj
    win_id, obj = id_obj
    return (f"{key}-{win_id}", obj)


keyed_metadata = op.map("rekey_meta", count_out.meta, rekey_by_window)
keyed_counts = op.map("rekey_counts", count_out.down, rekey_by_window)
joined_counts = op.join("check_joined", keyed_metadata, keyed_counts)


def unkey_join_rows(
    rekey_row: Tuple[str, Tuple[WindowMetadata, int]],
) -> Tuple[str, Tuple[WindowMetadata, int]]:
    rekey, row = rekey_row
    key, _win_id = rekey.rsplit("-", maxsplit=1)
    return (key, row)


# Returning the old output ('user', (WindowMetadata(..), count_per_window))
cleaned_joined_counts = op.map("unkey_joined", joined_counts, unkey_join_rows)
op.inspect("check_joined_counts", cleaned_joined_counts)

If your original dataflow ignored the WindowMetadata, you can skip doing the above step and instead use down directly without joining and drop the window ID.
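As a minimal sketch of that simpler path, the helper below turns a `(key, (window_id, value))` item from `down` into `(key, value)`. The name `drop_window_id` is ours and is not part of Bytewax; the sample items are made up to match the shape described above.

```python
from typing import Tuple, TypeVar

V = TypeVar("V")


def drop_window_id(key_id_value: Tuple[str, Tuple[int, V]]) -> Tuple[str, V]:
    """Turn a (key, (window_id, value)) item into (key, value)."""
    key, (_window_id, value) = key_id_value
    return (key, value)


# Hypothetical items shaped like the `down` stream of the example above.
down_items = [("a", (0, 2)), ("b", (0, 1))]
print([drop_window_id(item) for item in down_items])  # [('a', 2), ('b', 1)]
```

In a dataflow, this function would be passed to a map step over the down stream, for example `op.map("drop_win_id", count_out.down, drop_window_id)`.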

Fold Window Merges

fold_window now requires a merger callback that takes two fully formed accumulators and combines them into one. The merger function will be called when the windower determines that two windows must be merged. This most commonly happens when using the SessionWindower and a new, out-of-order item bridges a gap.
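To make the two roles concrete, here is a plain-Python sketch of a folder and a merger working on list accumulators, without running a dataflow. The function names and sample values are illustrative, and the order in which a windower passes the two accumulators to the merger is an assumption here.

```python
from typing import List


def add(acc: List[int], value: int) -> List[int]:
    """Folder: add one value to an accumulator."""
    acc.append(value)
    return acc


def merge(left: List[int], right: List[int]) -> List[int]:
    """Merger: combine two fully formed accumulators into one."""
    return left + right


# Two sessions are built independently by the folder...
session_1 = add(add([], 1), 2)
session_2 = add([], 7)
# ...and combined by the merger once a late item bridges the gap.
merged = merge(session_1, session_2)
print(merged)  # [1, 2, 7]
```

For list accumulators, `list.__add__` does the same job as `merge` above.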

Before:

from datetime import datetime, timedelta, timezone
from typing import List

import bytewax.operators as op
import bytewax.operators.window as win
from bytewax.dataflow import Dataflow
from bytewax.operators.window import EventClockConfig, SessionWindow
from bytewax.testing import TestingSource

align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
events = [
    {"user": "a", "val": 1, "time": align_to + timedelta(seconds=1)},
    {"user": "a", "val": 2, "time": align_to + timedelta(seconds=3)},
    {"user": "a", "val": 3, "time": align_to + timedelta(seconds=7)},
]

flow = Dataflow("merge_session")
inp = op.input("inp", flow, TestingSource(events)).then(
    op.key_on, "key_all", lambda _: "ALL"
)


def ts_getter(event: dict) -> datetime:
    return event["time"]


def add(acc: List[str], event: dict[str, str]) -> List[str]:
    acc.append(event["val"])
    return acc


counts = win.fold_window(
    "count",
    inp,
    EventClockConfig(ts_getter, wait_for_system_duration=timedelta(seconds=0)),
    SessionWindow(gap=timedelta(seconds=3)),
    list,
    add,
)
# In v0.19 the operator returns a single stream, not a WindowOut.
op.inspect("inspect", counts)

After:

from datetime import datetime, timedelta, timezone
from typing import List

import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.dataflow import Dataflow
from bytewax.operators.windowing import EventClock, SessionWindower
from bytewax.testing import TestingSource

align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)
events = [
    {"user": "a", "val": 1, "time": align_to + timedelta(seconds=1)},
    {"user": "a", "val": 2, "time": align_to + timedelta(seconds=3)},
    {"user": "a", "val": 3, "time": align_to + timedelta(seconds=7)},
]

flow = Dataflow("merge_session")
inp = op.input("inp", flow, TestingSource(events)).then(
    op.key_on, "key_all", lambda _: "ALL"
)


def ts_getter(event: dict) -> datetime:
    return event["time"]


def add(acc: List[str], event: dict[str, str]) -> List[str]:
    acc.append(event["val"])
    return acc


counts = win.fold_window(
    "count",
    inp,
    EventClock(ts_getter, wait_for_system_duration=timedelta(seconds=0)),
    SessionWindower(gap=timedelta(seconds=3)),
    list,
    add,
    list.__add__,
)
op.inspect("inspect", counts.down)

Join Modes

To specify a running join, now use mode="running" instead of running=True. To specify a product join_window, use mode="product" instead of product=True. Both these operators now have more modes to choose from; see bytewax.operators.JoinInsertMode and bytewax.operators.JoinEmitMode.

Before:

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("running_join")

names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]

names = op.input("names", flow, TestingSource(names_l))


emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
    {"user_id": 123, "email": "queen@bytewax.io"},
]

emails = op.input("emails", flow, TestingSource(emails_l))


keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))
joined = op.join("join", keyed_names, keyed_emails, running=True)
op.inspect("insp", joined)

After:

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("running_join")

names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]

names = op.input("names", flow, TestingSource(names_l))


emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
    {"user_id": 123, "email": "queen@bytewax.io"},
]

emails = op.input("emails", flow, TestingSource(emails_l))


keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))
# `running=True` is now written as `mode="running"`.
joined = op.join("join", keyed_names, keyed_emails, mode="running")
op.inspect("insp", joined)

Removal of join_named and join_window_named

The join_named and join_window_named operators have been removed as they can’t be made to support fully type checked dataflows.

The same functionality is still available but with a slightly differently shaped API via join or join_window:

Before:

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("join_eg")
names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]
names = op.input("names", flow, TestingSource(names_l))
emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
    {"user_id": 123, "email": "queen@bytewax.io"},
]
emails = op.input("emails", flow, TestingSource(emails_l))
keyed_names = op.map("key_names", names, lambda x: (str(x["user_id"]), x["name"]))
keyed_emails = op.map("key_emails", emails, lambda x: (str(x["user_id"]), x["email"]))
joined = op.join_named("join", name=keyed_names, email=keyed_emails)
op.inspect("check_join", joined)

After:

from dataclasses import dataclass
from typing import Dict, Tuple

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("join_eg")
names_l = [
    {"user_id": 123, "name": "Bee"},
    {"user_id": 456, "name": "Hive"},
]
emails_l = [
    {"user_id": 123, "email": "bee@bytewax.io"},
    {"user_id": 456, "email": "hive@bytewax.io"},
    {"user_id": 123, "email": "queen@bytewax.io"},
]


def id_field(field_name: str, x: Dict) -> Tuple[str, str]:
    return (str(x["user_id"]), x[field_name])


names = op.input("names", flow, TestingSource(names_l))
emails = op.input("emails", flow, TestingSource(emails_l))
keyed_names = op.map("key_names", names, lambda x: id_field("name", x))
keyed_emails = op.map("key_emails", emails, lambda x: id_field("email", x))
joined = op.join("join", keyed_names, keyed_emails)
op.inspect("join_returns_tuples", joined)


@dataclass
class User:
    user_id: int
    name: str
    email: str


def as_user(id_row: Tuple[str, Tuple[str, str]]) -> User:
    user_id, row = id_row
    # Unpack the tuple values in the id_row, and create the completed User
    name, email = row
    return User(int(user_id), name, email)


users = op.map("as_dict", joined, as_user)
op.inspect("inspect", users)

From v0.18 to v0.19

Removal of the builder argument from stateful_map

The builder argument has been removed from stateful_map. The initial state value is always None and you can call your previous builder by hand in the mapper function.
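If you have many mappers to migrate, one option is a small adapter that wraps an old builder and mapper pair into a single mapper. This is a sketch under the assumption stated above (the state is None the first time a key is seen); `with_builder` and `running_sum` are hypothetical names and are not part of Bytewax.

```python
from typing import Callable, Optional, Tuple, TypeVar

S = TypeVar("S")
V = TypeVar("V")
W = TypeVar("W")


def with_builder(
    builder: Callable[[], S],
    mapper: Callable[[S, V], Tuple[S, W]],
) -> Callable[[Optional[S], V], Tuple[S, W]]:
    """Adapt an old-style (builder, mapper) pair into a single mapper."""

    def wrapped(state: Optional[S], value: V) -> Tuple[S, W]:
        # The state is None on the first item for a key, so build it here.
        if state is None:
            state = builder()
        return mapper(state, value)

    return wrapped


def running_sum(total: float, value: float) -> Tuple[float, float]:
    """Old-style mapper that expects an already-built state."""
    total += value
    return (total, total)


mapper = with_builder(lambda: 0.0, running_sum)
state, out = mapper(None, 1.0)
state, out = mapper(state, 6.0)
print(state, out)  # 7.0 7.0
```

The wrapped function can then be passed wherever the new two-argument mapper is expected. It will not work for a builder whose legitimate initial state is None.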

Before:

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource


flow = Dataflow("builder_eg")
keyed_amounts = op.input(
    "inp",
    flow,
    TestingSource(
        [
            ("KEY", 1.0),
            ("KEY", 6.0),
            ("KEY", 2.0),
        ]
    ),
)


def running_builder():
    return []


def calc_running_mean(values, new_value):
    values.append(new_value)
    while len(values) > 3:
        values.pop(0)

    running_mean = sum(values) / len(values)
    return (values, running_mean)


running_means = op.stateful_map(
    "running_mean", keyed_amounts, running_builder, calc_running_mean
)

After:

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource


flow = Dataflow("builder_eg")
keyed_amounts = op.input("inp", flow, TestingSource([
    ("KEY", 1.0),
    ("KEY", 6.0),
    ("KEY", 2.0),
]))


def calc_running_mean(values, new_value):
    # On the initial value for this key, instead of the operator calling the
    # builder for you, you can call it yourself when the state is uninitialized.

    if values is None:
        values = []

    values.append(new_value)
    while len(values) > 3:
        values.pop(0)

    running_mean = sum(values) / len(values)
    return (values, running_mean)


running_means = op.stateful_map("running_mean", keyed_amounts, calc_running_mean)

Connector API Now Contains Step ID

bytewax.inputs.FixedPartitionedSource.build_part, bytewax.inputs.DynamicSource.build, bytewax.outputs.FixedPartitionedSink.build_part and bytewax.outputs.DynamicSink.build now take an additional step_id argument. This argument can be used as a label when creating custom Python metrics.

Before:

from datetime import datetime

from bytewax.inputs import DynamicSource


class PeriodicSource(DynamicSource):
    def build(self, now: datetime, worker_index: int, worker_count: int):
        pass

After:

from bytewax.inputs import DynamicSource


class PeriodicSource(DynamicSource):
    def build(self, step_id: str, worker_index: int, worker_count: int):
        pass

datetime Arguments Removed for Performance

bytewax.inputs.FixedPartitionedSource.build_part, bytewax.inputs.DynamicSource.build and bytewax.operators.UnaryLogic.on_item no longer take a now: datetime argument. bytewax.inputs.StatefulSourcePartition.next_batch, bytewax.inputs.StatelessSourcePartition.next_batch, and bytewax.operators.UnaryLogic.on_notify no longer take a sched: datetime argument. Generating these values resulted in significant overhead, even for the majority of sources and stateful operators that never used them.

If you need the current time, you can still get it manually:

from datetime import datetime, timezone

now = datetime.now(timezone.utc)

If you need the previously scheduled awake time, store it in an instance variable before returning it from bytewax.operators.UnaryLogic.notify_at. Your design probably already stores it in an instance variable.
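The pattern can be sketched in plain Python as follows. `TimeoutLogic` is a hypothetical stand-in that only mimics the method names mentioned above; it does not subclass bytewax.operators.UnaryLogic, and the `(items, discard)` return shape is an assumption made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


class TimeoutLogic:
    """Stand-in shaped like a stateful logic class; not a Bytewax subclass."""

    def __init__(self, timeout: timedelta) -> None:
        self._timeout = timeout
        # The scheduled awake time is kept here instead of being passed in.
        self._sched: Optional[datetime] = None

    def on_item(self, value: str) -> Tuple[List[str], bool]:
        # Get the current time by hand now that `now` is not an argument.
        now = datetime.now(timezone.utc)
        self._sched = now + self._timeout
        return ([], False)

    def notify_at(self) -> Optional[datetime]:
        # Return the stored value so the schedule has a single source.
        return self._sched

    def on_notify(self) -> Tuple[List[timedelta], bool]:
        # Read the previously scheduled time back from the instance.
        assert self._sched is not None
        late_by = datetime.now(timezone.utc) - self._sched
        self._sched = None
        return ([late_by], True)


logic = TimeoutLogic(timedelta(seconds=0))
logic.on_item("hello")
print(logic.notify_at() is not None)  # True
```

Here `on_notify` uses the stored time to work out how late the wake-up was, which is the kind of calculation the removed `sched` argument used to support.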

Standardization on Confluent’s Kafka Serialization Interface

Bytewax’s bespoke Kafka schema registry and serialization interface has been removed in favor of using confluent_kafka.schema_registry.SchemaRegistryClient and confluent_kafka.serialization.Deserializers and confluent_kafka.serialization.Serializers directly. This now gives you direct control over all of the configuration options for serialization and supports the full range of use cases. Bytewax’s Kafka serialization operators in bytewax.connectors.kafka.operators now take these Confluent types.

With Confluent Schema Registry

If you are using Confluent’s schema registry (with its magic byte prefix), you can pass (de)serializers like confluent_kafka.schema_registry.avro.AvroDeserializer directly to our operators. See Confluent’s documentation for all the options here.

Before:

from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.registry import ConfluentSchemaRegistry

flow = Dataflow("confluent_sr_eg")
kinp = kop.input("inp", flow, brokers=["broker.test"], topics=["eg_topic"])

sr_conf = {"url": "http://registry.test/", "basic.auth.user.info": "user:pass"}
registry = ConfluentSchemaRegistry(SchemaRegistryClient(sr_conf))

key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)

After:

from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

flow = Dataflow("confluent_sr_eg")
kinp = kop.input("inp", flow, brokers=["broker.test"], topics=["eg_topic"])

# Confluent's SchemaRegistryClient
client = SchemaRegistryClient(
    {"url": "http://registry.test/", "basic.auth.user.info": "user:pass"}
)
key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)

With Redpanda Schema Registry

If you are using Redpanda’s schema registry or another setup for which the serialized form does not use Confluent’s wire format, we provide compatible (de)serializer classes in bytewax.connectors.kafka.serde.

Before:

from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.registry import RedpandaSchemaRegistry, SchemaRef

REDPANDA_REGISTRY_URL = "http://localhost:8080/schema-registry"

registry = RedpandaSchemaRegistry(REDPANDA_REGISTRY_URL)
key_de = registry.deserializer(SchemaRef("sensor-key"))
val_de = registry.deserializer(SchemaRef("sensor-value"))

msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)

After:

from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.serde import PlainAvroDeserializer

from confluent_kafka.schema_registry import SchemaRegistryClient

REDPANDA_REGISTRY_URL = "http://localhost:8080/schema-registry"

client = SchemaRegistryClient({"url": REDPANDA_REGISTRY_URL})

# Use plain Avro instead of Confluent's prefixed-Avro wire format.
# We need to specify the schema in the deserializer too here.
key_schema = client.get_latest_version("sensor-key").schema
key_de = PlainAvroDeserializer(schema=key_schema)

val_schema = client.get_latest_version("sensor-value").schema
val_de = PlainAvroDeserializer(schema=val_schema)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)

From v0.17 to v0.18

Non-Linear Dataflows

With the addition of non-linear dataflows, the API for constructing dataflows has changed. Operators are now stand-alone functions that can take and return streams.

All operators, not just stateful ones, now require a step_id; it should be a "snake_case" description of the semantic purpose of that dataflow step.

Instantiating the dataflow itself also now takes a “dataflow ID” so you can disambiguate different dataflows in the metrics.

Before:

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.input("inp", TestingInput(range(10)))
flow.map(lambda x: x + 1)
flow.output("out", StdOutput())

After:

import bytewax.operators as op

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("basic")
# `op.input` takes the place of `flow.input` and takes a `Dataflow`
# object as its second argument.
# `op.input` returns a `Stream[int]` in this example:
stream = op.input("inp", flow, TestingSource(range(10)))
# `op.map` takes a `Stream` as its second argument, and
# returns a new `Stream[int]` as its return value.
add_one_stream = op.map("add_one", stream, lambda x: x + 1)
# `op.output` takes a stream as its second argument, but
# does not return a new `Stream`.
op.output("out", add_one_stream, StdOutSink())

Kafka/Redpanda Input

KafkaSource has been updated. KafkaSource now returns a stream of KafkaSourceMessage dataclasses, and a stream of errors rather than a stream of (key, value) tuples.

You can still use KafkaSource and KafkaSink directly, or use the custom operators in bytewax.connectors.kafka to construct an input source.

Before:

from bytewax.connectors.kafka import KafkaInput, KafkaOutput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow

flow = Dataflow()
flow.input("inp", KafkaInput(["localhost:9092"], ["input_topic"]))
flow.output(
    "out",
    KafkaOutput(
        brokers=["localhost:9092"],
        topic="output_topic",
    ),
)

After:

from typing import Tuple, Optional

from bytewax import operators as op
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka import KafkaSinkMessage
from bytewax.dataflow import Dataflow

flow = Dataflow("kafka_in_out")
kinp = kop.input("inp", flow, brokers=["localhost:19092"], topics=["in_topic"])
in_msgs = op.map("get_k_v", kinp.oks, lambda msg: (msg.key, msg.value))


def wrap_msg(k_v):
    k, v = k_v
    return KafkaSinkMessage(k, v)


out_msgs = op.map("wrap_k_v", in_msgs, wrap_msg)
kop.output("out1", out_msgs, brokers=["localhost:19092"], topic="out_topic")

Renaming

We have renamed the IO classes to better match their semantics and the way they are talked about in the documentation. Their functionality is unchanged. You should be able to search and replace for the old names and have them work correctly.

Old                 New
------------------  ------------------------
Input               Source
PartitionedInput    FixedPartitionedSource
DynamicInput        DynamicSource
StatefulSource      StatefulSourcePartition
StatelessSource     StatelessSourcePartition
SimplePollingInput  SimplePollingSource
Output              Sink
PartitionedOutput   FixedPartitionedSink
DynamicOutput       DynamicSink
StatefulSink        StatefulSinkPartition
StatelessSink       StatelessSinkPartition

We have also updated the names of the built-in connectors to match this new naming scheme.

Old            New
-------------  -------------
FileInput      FileSource
FileOutput     FileSink
DirInput       DirSource
DirOutput      DirSink
CSVInput       CSVSource
StdOutput      StdOutSink
KafkaInput     KafkaSource
KafkaOutput    KafkaSink
TestingInput   TestingSource
TestingOutput  TestingSink

Current Time Convenience

In addition to the name changes, we have also added a datetime argument to build{_part,} with the current time to allow you to easily set up a next awake time. You can ignore this parameter if you are not scheduling awake times.

We have also added a datetime argument to next_batch, which contains the scheduled awake time for that source. Since awake times are scheduled, but not guaranteed to fire at the precise time specified, you can use this parameter to account for any difference.

SimplePollingSource moved

SimplePollingSource has been moved from bytewax.connectors.periodic to bytewax.inputs. You’ll need to change your imports if you are using that class.

Before:

from bytewax.connectors.periodic import SimplePollingSource

After:

from bytewax.inputs import SimplePollingSource

Window Metadata

Window operators now emit WindowMetadata objects downstream. These objects can be used to introspect the open_time and close_time of windows. This changes the output type of windowing operators from a stream of (key, values) to a stream of (key, (metadata, values)).
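If downstream steps still expect the old `(key, values)` shape, a small mapping function can strip the metadata again. The sketch below is plain Python: `FakeWindowMetadata` is a hypothetical stand-in for WindowMetadata with only the two fields named above, and `drop_metadata` is our own helper name.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Tuple, TypeVar

V = TypeVar("V")


@dataclass
class FakeWindowMetadata:
    """Hypothetical stand-in for WindowMetadata, for illustration only."""

    open_time: datetime
    close_time: datetime


def drop_metadata(item: Tuple[str, Tuple[FakeWindowMetadata, V]]) -> Tuple[str, V]:
    """Turn (key, (metadata, values)) back into (key, values)."""
    key, (_metadata, values) = item
    return (key, values)


meta = FakeWindowMetadata(
    open_time=datetime(2023, 1, 1, tzinfo=timezone.utc),
    close_time=datetime(2023, 1, 1, 0, 0, 2, tzinfo=timezone.utc),
)
print(drop_metadata(("a", (meta, 2))))  # ('a', 2)
```

A function like this would be applied with a map step directly after the windowing operator.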

Recovery flags

The default values for snapshot-interval and backup-interval have been removed when running a dataflow with recovery enabled.

Previously, the default values created a snapshot every 10 seconds and kept a day’s worth of old snapshots. This meant your recovery DB could theoretically max out at a size on disk thousands of times bigger than your in-memory state.
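The "thousands of times" figure follows from simple arithmetic on those two old defaults. The sketch below computes a rough upper bound on the number of retained snapshots, assuming (as a worst case) that every snapshot is as large as the full in-memory state; the function name is ours.

```python
from datetime import timedelta


def snapshots_retained(snapshot_interval: timedelta, backup_interval: timedelta) -> int:
    """Rough upper bound on snapshots kept before garbage collection."""
    return backup_interval // snapshot_interval


old_defaults = snapshots_retained(timedelta(seconds=10), timedelta(days=1))
print(old_defaults)  # 8640
```

Running the same calculation with your own candidate intervals gives a quick feel for the worst-case disk footprint.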

See Recovery for how to appropriately pick these values for your deployment.

Batch -> Collect

In v0.18, we’ve renamed the batch operator to collect so as to not be confused with runtime batching. Behavior is unchanged.

From v0.16 to v0.17

Bytewax v0.17 introduces major changes to the way that recovery works in order to support rescaling. In v0.17, the number of workers in a cluster can now be changed by stopping the dataflow execution and specifying a different number of workers on resume.

In addition to rescaling, we’ve changed the Bytewax inputs and outputs API to support batching which has yielded significant performance improvements.

In this article, we’ll go over the updates we’ve made to our API and explain the changes you’ll need to make to your dataflows to upgrade to v0.17.

Recovery changes

In v0.17, recovery has been changed to support rescaling the number of workers in a dataflow. You now pre-create a fixed number of SQLite recovery DB files before running the dataflow.

SQLite recovery DBs created with versions of Bytewax prior to v0.17 are not compatible with this release.

Creating recovery partitions

Creating recovery stores has been moved to a separate step from running your dataflow.

Recovery partitions must be pre-initialized before running the dataflow initially. This is done by executing this module:

$ python -m bytewax.recovery db_dir/ 4

This will create a set of partitions:

$ ls db_dir/
part-0.sqlite3
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3

Once the recovery partition files have been created, they must be placed in locations that are accessible to the workers. The cluster as a whole must have access to all partitions, but any given worker need not have access to any partition in particular (or any at all). It is OK if a given partition is accessible by multiple workers; only one worker will use it.

If you are not running in a cluster environment but on a single machine, placing all the partitions in a single local filesystem directory is fine.

Although the partition init script will not create these, partitions after execution may consist of multiple files:

$ ls db_dir/
part-0.sqlite3
part-0.sqlite3-shm
part-0.sqlite3-wal
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3
part-3.sqlite3-shm
part-3.sqlite3-wal

You must remember to move all the files that share a part-N prefix (for example part-0.sqlite3, part-0.sqlite3-shm and part-0.sqlite3-wal) together.
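One way to avoid leaving a companion file behind is to move a partition by glob rather than by exact name. This is a minimal sketch using only the standard library; `move_partition` is a hypothetical helper, the file naming follows the listings above, and it assumes the dataflow is stopped while files are moved.

```python
import shutil
from pathlib import Path
from typing import List


def move_partition(src_dir: Path, dst_dir: Path, index: int) -> List[Path]:
    """Move every file belonging to one recovery partition together."""
    dst_dir.mkdir(parents=True, exist_ok=True)
    moved = []
    # Matches part-N.sqlite3 plus any -shm and -wal companion files.
    for path in sorted(src_dir.glob(f"part-{index}.sqlite3*")):
        target = dst_dir / path.name
        shutil.move(str(path), str(target))
        moved.append(target)
    return moved
```

For example, `move_partition(Path("db_dir"), Path("/mnt/worker0"), 0)` would move partition 0 and its companions; the destination path here is made up.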

Choosing the number of recovery partitions

An important consideration when choosing the initial number of recovery partitions: the number of recovery partitions is currently fixed, even when you rescale the number of workers.

If you are scaling up the number of workers in your cluster to increase the total throughput of your dataflow, the work of writing recovery data to the recovery partitions will still be limited to the initial number of recovery partitions. If you will likely scale up your dataflow to accommodate increased demand, we recommend that you consider creating more recovery partitions than you will initially need. Having multiple recovery partitions handled by a single worker is fine.

Epoch interval -> Snapshot interval

We’ve renamed the CLI option epoch-interval to snapshot-interval to better describe its effect on dataflow execution. The snapshot interval is the system time duration (in seconds) between state snapshots for recovery.

Recovering a dataflow can only happen on the boundaries of the most recently completed snapshot across all workers, but be aware that making the snapshot-interval more frequent increases the amount of recovery work and may impact performance.

Backup interval and backing up recovery partitions

We’ve also introduced an additional parameter to running a dataflow: backup-interval.

When running a Dataflow with recovery enabled, it is recommended to back up your recovery partitions on a regular basis for disaster recovery.

The backup-interval parameter is the length of time to wait before “garbage collecting” older snapshots. This enables a dataflow to successfully resume when backups of recovery partitions happen at different times, which will be true in most distributed deployments.

This value should be set longer than the interval within which you can guarantee that all recovery partitions will be backed up, to account for transient failures. It defaults to one day.

If you attempt to resume from a set of recovery partitions for which the oldest and youngest backups are more than the backup interval apart, the resumed dataflow could have corrupted state.
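Before resuming from restored backups, this condition can be checked with a few lines of Python. The sketch assumes you record a timestamp for each partition's backup yourself; `backups_are_consistent` and the sample times are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable


def backups_are_consistent(
    backup_times: Iterable[datetime], backup_interval: timedelta
) -> bool:
    """Return True if the oldest and youngest backups are within the interval."""
    times = list(backup_times)
    if not times:
        return True
    return max(times) - min(times) <= backup_interval


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
times = [base, base + timedelta(hours=3), base + timedelta(hours=20)]
print(backups_are_consistent(times, timedelta(days=1)))  # True
print(backups_are_consistent(times, timedelta(hours=12)))  # False
```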

Input and Output changes

In v0.17, we have restructured input and output to support batching for increased throughput.

If you have created custom input connectors, you’ll need to update them to use the new API.

Input changes

The list_parts method has been updated to return a List[str] instead of a Set[str], and now should only reflect the available input partitions that a given worker has access to. You no longer need to return the complete set of partitions for all workers.

The next method of StatefulSource and StatelessSource has been renamed to next_batch and should return a List of elements each time it is called. If there are no elements to return, this method should return the empty list.
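The batching contract can be illustrated with a plain-Python stand-in. `QueuePartition` below is hypothetical and does not subclass any Bytewax class; it only shows a `next_batch` that returns up to a fixed number of items and an empty list when nothing is waiting.

```python
from collections import deque
from typing import Deque, List


class QueuePartition:
    """Stand-in for a source partition; not a Bytewax subclass."""

    def __init__(self, batch_size: int) -> None:
        self._pending: Deque[str] = deque()
        self._batch_size = batch_size

    def push(self, item: str) -> None:
        """Simulate new input arriving from outside."""
        self._pending.append(item)

    def next_batch(self) -> List[str]:
        # Drain up to batch_size items; an empty list means "nothing yet".
        batch: List[str] = []
        while self._pending and len(batch) < self._batch_size:
            batch.append(self._pending.popleft())
        return batch


part = QueuePartition(batch_size=2)
for item in ["a", "b", "c"]:
    part.push(item)
print(part.next_batch())  # ['a', 'b']
print(part.next_batch())  # ['c']
print(part.next_batch())  # []
```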

Next awake

Input sources now have an optional next_awake method which you can use to schedule when the next next_batch call should occur. You can use this to “sleep” the input operator for a fixed amount of time while you are waiting for more input.

The default behavior uses a simple heuristic to prevent a spin loop when there is no input. Always use next_awake rather than using a time.sleep in an input source.
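As a rough sketch of the idea, the stand-in below schedules its own next poll instead of sleeping. `PollingPartition` is hypothetical and does not subclass any Bytewax class; returning None before the first poll to mean "no preference" is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


class PollingPartition:
    """Stand-in for a polling source partition; not a Bytewax subclass."""

    def __init__(self, interval: timedelta) -> None:
        self._interval = interval
        self._next_awake: Optional[datetime] = None
        self._polls = 0

    def next_batch(self) -> List[int]:
        self._polls += 1
        # Schedule the following call rather than calling time.sleep here.
        self._next_awake = datetime.now(timezone.utc) + self._interval
        return [self._polls]

    def next_awake(self) -> Optional[datetime]:
        # The runtime would use this to decide when to call next_batch again.
        return self._next_awake


part = PollingPartition(timedelta(seconds=5))
print(part.next_batch())  # [1]
print(part.next_awake() > datetime.now(timezone.utc))  # True
```

Because the method returns immediately, the worker stays free to run other steps until the scheduled time.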

See the periodic_input.py example in the examples directory for an implementation that uses this functionality.

Async input adapter

We’ve included a new bytewax.inputs.batcher_async to help you use async Python libraries in Bytewax input sources. It lets you wrap an async iterator and specify a maximum time and size to collect items into a batch.

Using Kafka for recovery is now removed

v0.17 removes the deprecated KafkaRecoveryConfig as a recovery store to support the ability to rescale the number of workers.

Support for additional platforms

Bytewax is now available for linux/aarch64 and linux/armv7.

From v0.15 to v0.16

Bytewax v0.16 introduces major changes to the way that custom Bytewax inputs are created, as well as improvements to windowing and execution. In this article, we’ll cover some of the updates we’ve made to our API to make upgrading to v0.16 easier.

Input changes

In v0.16, we have restructured input to be based around partitions in order to support rescaling stateful dataflows in the future. We have also dropped the Config suffix to our input classes. As an example, KafkaInputConfig has been renamed to KafkaInput and has been moved to bytewax.connectors.kafka.KafkaInput.

ManualInputConfig is now replaced by two base superclasses that can be subclassed to create custom input sources. They are DynamicInput and PartitionedInput.

DynamicInput

DynamicInput sources are input sources that support reading distinct items from any number of workers concurrently.

DynamicInput sources do not support resume state and thus generally do not provide delivery guarantees better than at-most-once.

PartitionedInput

PartitionedInput sources can keep internal state on the current position of each partition. If a partition can “rewind” and resume reading from a previous position, they can provide delivery guarantees of at-least-once or better.

PartitionedInput sources maintain the state of each source and re-build that state during recovery.
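
The following plain-Python sketch shows why a rewindable partition gives at-least-once delivery. It does not subclass any Bytewax class; ListPartition and its method names are hypothetical stand-ins for one partition of a source that remembers its position.

```python
from typing import List, Optional


class ListPartition:
    """Hypothetical stand-in for one partition of a rewindable source."""

    def __init__(self, items: List[str], resume_position: Optional[int] = None):
        self._items = items
        # Start from the snapshotted position if there is one, else from 0.
        self._position = resume_position or 0

    def next_item(self) -> Optional[str]:
        if self._position >= len(self._items):
            return None
        item = self._items[self._position]
        self._position += 1
        return item

    def snapshot(self) -> int:
        # The position is all the state needed to resume this partition.
        return self._position


data = ["a", "b", "c", "d"]

# First run: read two items, snapshot, read one more, then "crash".
part = ListPartition(data)
first_run = [part.next_item(), part.next_item()]
saved = part.snapshot()
first_run.append(part.next_item())  # read after the snapshot

# Recovery: rebuild the partition from the last snapshot.
part = ListPartition(data, resume_position=saved)
second_run = []
while (item := part.next_item()) is not None:
    second_run.append(item)
```

The item read after the snapshot is read again after recovery, so no item is skipped but some may be seen twice.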

Deprecating Kafka Recovery#

In order to better support rescaling Dataflows, the Kafka recovery store option is being deprecated and will be removed from a future release in favor of the SQLite recovery store.

Capture -> Output#

The capture operator has been renamed to output in v0.16 and is now stateful, so requires a step ID:

In v0.15 and before:

flow.capture(StdOutputConfig())

In v0.16+:

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.output("out", StdOutput())

ManualOutputConfig is now replaced by two base superclasses that can be subclassed to create custom output sinks. They are DynamicOutput and PartitionedOutput.

New entrypoint#

In Bytewax v0.16, the way that Dataflows are run has been simplified, and most execution functions have been removed.

Similar to other Python frameworks like Flask and FastAPI, Dataflows can now be run using a Dataflow import string in the <module_name>:<dataflow_variable_name_or_factory_function> format. As an example, given a file named dataflow.py with the following contents:

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.input("in", TestingInput(range(3)))
flow.output("out", StdOutput())

Since a Python file dataflow.py defines a module dataflow, the Dataflow can be run with the following invocation:

> python -m bytewax.run dataflow
0
1
2

By default, Bytewax looks for a variable in the given module named flow, so we can eliminate the <dataflow_variable_name_or_factory_function> part of our import string.
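
As a rough illustration of how such an import string can be resolved, here is a standard-library sketch. The helper names split_import_string and load_object are hypothetical, this is not how bytewax.run is implemented, and the sketch does not handle the factory-call form such as get_flow('string_argument').

```python
import importlib
from typing import Any, Tuple


def split_import_string(import_str: str, default_attr: str = "flow") -> Tuple[str, str]:
    """Hypothetical helper: split '<module>:<attr>', defaulting the attribute name."""
    module_name, _, attr = import_str.partition(":")
    return module_name, attr or default_attr


def load_object(import_str: str) -> Any:
    """Import the module and return the named attribute from it."""
    module_name, attr = split_import_string(import_str)
    return getattr(importlib.import_module(module_name), attr)
```

With these helpers, "dataflow" and "dataflow:flow" resolve to the same variable.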

Processes, workers, recovery stores and other options can be configured with command line flags or environment variables. For the full list of options see the --help command line flag:

> python -m bytewax.run --help
usage: python -m bytewax.run [-h] [-p PROCESSES] [-w WORKERS_PER_PROCESS] [-i PROCESS_ID] [-a ADDRESSES] [--sqlite-directory SQLITE_DIRECTORY] [--epoch-interval EPOCH_INTERVAL] import_str

Run a bytewax dataflow

positional arguments:
  import_str            Dataflow import string in the format <module_name>:<dataflow_variable_or_factory> Example: src.dataflow:flow or src.dataflow:get_flow('string_argument')

options:
  -h, --help            show this help message and exit

Scaling:
  You should use either '-p' to spawn multiple processes on this same machine, or '-i/-a' to spawn a single process on different machines

  -p PROCESSES, --processes PROCESSES
                        Number of separate processes to run [env: BYTEWAX_PROCESSES]
  -w WORKERS_PER_PROCESS, --workers-per-process WORKERS_PER_PROCESS
                        Number of workers for each process [env: BYTEWAX_WORKERS_PER_PROCESS]
  -i PROCESS_ID, --process-id PROCESS_ID
                        Process id [env: BYTEWAX_PROCESS_ID]
  -a ADDRESSES, --addresses ADDRESSES
                        Addresses of other processes, separated by semicolumn: -a "localhost:2021;localhost:2022;localhost:2023" [env: BYTEWAX_ADDRESSES]

Recovery:
  --sqlite-directory SQLITE_DIRECTORY
                        Passing this argument enables sqlite recovery in the specified folder [env: BYTEWAX_SQLITE_DIRECTORY]
  --epoch-interval EPOCH_INTERVAL
                        Number of seconds between state snapshots [env: BYTEWAX_EPOCH_INTERVAL]

Porting a simple example from 0.15 to 0.16#

This is what a simple example looked like in 0.15:

import operator
import re

from datetime import timedelta, datetime

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())

run_main(flow)

To port the example to the 0.16 version we need to make a few changes.

Imports#

Let’s start with the existing imports:

from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main

Becomes:

from bytewax.connectors.files import FileInput
from bytewax.connectors.stdio import StdOutput

We removed run_main as it is now only used for unit testing dataflows. Bytewax now has a built-in FileInput connector, which uses the PartitionedInput connector superclass.

Since we are using a built-in connector to read from this file, we can delete our input_builder function above.

Windowing#

Most of the operators from v0.15 are the same, but the start_at parameter of windowing functions has been changed to align_to. The start_at parameter incorrectly implied that there were no potential windows before that time. What determines if an item is late for a window is not the windowing definition, but the watermark of the clock.

SlidingWindow and TumblingWindow now require the align_to parameter. Previously, this was filled with the timestamp of the start of the Dataflow, but must now be specified. Specifying this parameter ensures that windows are consistent across Dataflow restarts, so make sure that you don’t change this parameter between executions.
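
The effect of align_to can be shown with plain datetime arithmetic. This is a sketch of the alignment idea described above, not Bytewax's internal code; the function name tumbling_window_bounds and the window index numbering are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def tumbling_window_bounds(
    timestamp: datetime, length: timedelta, align_to: datetime
) -> Tuple[int, datetime, datetime]:
    """Return (window index, open time, close time) for a timestamp."""
    # Floor division also handles timestamps before align_to (negative indexes).
    index = (timestamp - align_to) // length
    open_time = align_to + index * length
    return index, open_time, open_time + length


align_to = datetime(2023, 1, 1, tzinfo=timezone.utc)
length = timedelta(seconds=5)

after = tumbling_window_bounds(align_to + timedelta(seconds=12), length, align_to)
before = tumbling_window_bounds(align_to - timedelta(seconds=2), length, align_to)
```

Because the bounds depend only on align_to and length, the same timestamp lands in the same window after a restart, and timestamps earlier than align_to still fall into a well-defined window.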

The old tumbling window definition:

clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

becomes:

clock_config = SystemClockConfig()
window_config = TumblingWindow(
    length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
)

Output and execution#

Similarly to the input, the output configuration is now stateful. capture has been renamed to output, and now takes a name, as all stateful operators do.

So we move from this:

flow.capture(StdOutputConfig())

To this:

flow.output("out", StdOutput())

Complete code#

The complete code for the new simple example now looks like this:

import operator
import re

from datetime import timedelta, datetime, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.files import FileInput
from bytewax.connectors.stdio import StdOutput
from bytewax.window import SystemClockConfig, TumblingWindow


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindow(
    length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
)

flow = Dataflow()
flow.input("inp", FileInput("examples/sample_data/wordcount.txt"))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.output("out", StdOutput())

Running our completed Dataflow looks like this, as we’ve named our Dataflow variable flow in a file named dataflow.py:

> python -m bytewax.run dataflow:flow
('whether', 1)
("'tis", 1)
('of', 2)
('opposing', 1)
...

We can even run our sample Dataflow on multiple workers to process the file in parallel:

> python -m bytewax.run dataflow:flow -p2
('whether', 1)
("'tis", 1)
('of', 2)
('opposing', 1)
...

In the background, Bytewax has spawned two processes, each of which is processing a part of the file. To see this more clearly, you can start each worker by hand:

In one terminal, run:

> python -m bytewax.run dataflow:flow -i0 -a "localhost:2101;localhost:2102"
('not', 1)
('end', 1)
('opposing', 1)
('take', 1)
...

And in a second terminal, run:

> python -m bytewax.run dataflow:flow -i1 -a "localhost:2101;localhost:2102"
('question', 1)
('the', 3)
('troubles', 1)
('fortune', 1)
...

From 0.10 to 0.11#

Bytewax 0.11 introduces major changes to the way that Bytewax dataflows are structured, as well as improvements to recovery and windowing. This document outlines the major changes between Bytewax 0.10 and 0.11.

Input and epochs#

Bytewax is built on top of the Timely Dataflow framework. The idea of timestamps (which we refer to in Bytewax as epochs) is central to Timely Dataflow’s progress tracking mechanism.

Bytewax initially adopted an input model that included managing the epochs at which input was introduced. The 0.11 version of Bytewax removes the need to manage epochs directly.

Epochs continue to exist in Bytewax, but are now managed internally to represent a unit of recovery. Bytewax dataflows that are configured with recovery will snapshot their state after processing all items in an epoch. In the event of recovery, Bytewax will resume a dataflow at the last snapshotted state. The frequency of snapshotting can be configured with an EpochConfig.

Recoverable input#

Bytewax 0.11 will now allow you to recover the state of the input to your dataflow.

Manually constructed input functions, like those used with ManualInputConfig, now take a third argument. If your dataflow is interrupted, the third argument passed to your input function can be used to reconstruct the state of your input at the last recovery snapshot, provided you write your input logic accordingly. The input_builder function must return a tuple of (resume_state, datum).
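
A minimal sketch of an input function written with recovery in mind follows. It runs without Bytewax, reading from the in-memory list LINES, a hypothetical stand-in for a file. Using "the index of the next line to read" as the resume state is an assumption chosen for this illustration; any value that lets you rebuild your position would do.

```python
LINES = ["to be", "or not", "to be"]  # hypothetical stand-in for a file's lines


def input_builder(worker_index, worker_count, resume_state):
    # resume_state is None on a fresh start; otherwise it is the state
    # this function yielded alongside the last snapshotted item.
    start = resume_state or 0
    for line_number, line in enumerate(LINES):
        if line_number < start:
            continue  # already processed before the last snapshot
        # Yield the position to resume from if recovery happens after this item.
        yield line_number + 1, line
```

Called with a resume state of 2, the generator skips the first two lines and continues from the third.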

Bytewax’s built-in input handlers, like KafkaInputConfig, are also recoverable. KafkaInputConfig will store information about consumer offsets in the configured Bytewax recovery store. In the event of recovery, KafkaInputConfig will start reading from the offsets that were last committed to the recovery store.

Stateful windowing#

Version 0.11 also introduces stateful windowing operators, including a new fold_window operator.

Previously, Bytewax included helper functions to manage windows in terms of epochs. Now that Bytewax manages epochs internally, windowing functions are now operators that appear as a processing step in a dataflow. Dataflows can now have more than one windowing step.

Bytewax’s stateful windowing operators are now built on top of its recovery system, and their operations can be recovered in the event of a failure. See the documentation on Recovery for more information.

Output configurations#

The 0.11 release of Bytewax adds some prepackaged output configuration options for common use-cases:

  • ManualOutputConfig, which calls a Python callback function for each output item.

  • StdOutputConfig, which prints each output item to stdout.

Import path changes and removed entrypoints#

In Bytewax 0.11, the overall Python module structure has changed, and some execution entrypoints have been removed.

  • cluster_main, spawn_cluster, and run_main have moved to bytewax.execution

  • Dataflow has moved to bytewax.dataflow

  • run and run_cluster have been removed

Porting the Simple example from 0.10 to 0.11#

This is what the Simple example looked like in 0.10:

import re

from bytewax import Dataflow, run


def file_input():
    for line in open("wordcount.txt"):
        yield 1, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


flow = Dataflow()
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_epoch(add)
flow.capture()


for epoch, item in run(flow, file_input()):
    print(item)

To port the example to the 0.11 version we need to make a few changes.

Imports#

Let’s start with the existing imports:

from bytewax import Dataflow, run

Becomes:

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main

We moved from run to run_main as the execution API has been simplified, and we can now just use the run_main function to execute our dataflow.

Input#

The way bytewax handles input changed with 0.11. input is now a proper operator on the Dataflow, and the function now takes 3 parameters: worker_index, worker_count, resume_state. This allows us to distribute the input across workers, and to handle recovery if we want to. We are not going to do that in this example, so the change is minimal.

The input function goes from:

def file_input():
    for line in open("wordcount.txt"):
        yield 1, line

to:

def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line

So instead of manually yielding the epoch in the input function, we can either ignore it (passing None as state), or handle the value to implement recovery (see our docs on Recovery).

Then we need to wrap the input_builder with ManualInputConfig, give it a name (“file_input” here) and pass it to the input operator (rather than the run function):

from bytewax.inputs import ManualInputConfig


flow.input("file_input", ManualInputConfig(input_builder))
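
The worker_index and worker_count parameters are not used in this example, but they are what makes it possible to split input across workers. One common approach, sketched here in plain Python, is round-robin assignment by line number. LINES is a hypothetical stand-in for a file, and this scheme is only one possible choice.

```python
LINES = ["l0", "l1", "l2", "l3", "l4"]  # hypothetical stand-in for a file's lines


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery, as in the example above
    for line_number, line in enumerate(LINES):
        # Round-robin: each worker keeps only the lines assigned to it.
        if line_number % worker_count == worker_index:
            yield state, line
```

With two workers, worker 0 reads lines 0, 2 and 4 while worker 1 reads lines 1 and 3, so every line is read exactly once across the cluster.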

Operators#

Most of the operators are the same, but there is a notable change in the flow: where we used reduce_epoch we are now using reduce_window. Since the epochs concept is now considered an internal detail in Bytewax, we need to define a way to let the reduce operator know when to close a specific window. Previously this was done every time the epoch changed, while now it can be configured with a time window. We need two config objects to do this:

  • clock_config

  • window_config

The clock_config tells the window-based operators which reference clock to use. Here we use SystemClockConfig, which uses the system’s clock. The window_config defines the time window we want to use. Here we’ll use TumblingWindowConfig, which defines tumbling windows by a length (timedelta), and we configure it to have windows of 5 seconds each.

So the old reduce_epoch:

flow.reduce_epoch(add)

becomes reduce_window:

from datetime import timedelta

from bytewax.window import SystemClockConfig, TumblingWindowConfig


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))
flow.reduce_window("sum", clock_config, window_config, add)
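
Conceptually, a windowed reduce groups items by key and by window, then combines the values in each group. The plain-Python sketch below shows that idea only. It uses explicit timestamps instead of the system clock so that the result is deterministic, and the function name reduce_by_window and its start parameter are hypothetical, not part of Bytewax.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from functools import reduce


def reduce_by_window(items, length, start, reducer):
    """items: iterable of (key, value, timestamp).

    Returns a dict mapping (key, window_index) to the reduced value.
    """
    grouped = defaultdict(list)
    for key, value, timestamp in items:
        # Tumbling windows: each timestamp belongs to exactly one window.
        window_index = (timestamp - start) // length
        grouped[(key, window_index)].append(value)
    return {group: reduce(reducer, values) for group, values in grouped.items()}


start = datetime(2023, 1, 1, tzinfo=timezone.utc)
events = [
    ("a", 1, start + timedelta(seconds=1)),
    ("a", 1, start + timedelta(seconds=4)),
    ("b", 1, start + timedelta(seconds=2)),
    ("a", 1, start + timedelta(seconds=6)),
]
counts = reduce_by_window(events, timedelta(seconds=5), start, lambda x, y: x + y)
```

The two "a" items in the first five seconds are combined, while the third "a" item falls into the next window and is counted separately.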

Output and execution#

Similarly to the input, the output configuration is now part of an operator, capture. Rather than collecting the output in a Python iterator and then manually printing it, we can now configure the capture operator to print to standard output.

Since all the input and output handling is now defined inside the Dataflow, we don’t need to pass this information to the execution method.

So we move from this:

flow.capture()

for epoch, item in run(flow, file_input()):
    print(item)

To this:

from bytewax.outputs import StdOutputConfig


flow.capture(StdOutputConfig())

run_main(flow)

Complete code#

The complete code for the new simple example now looks like this:

import operator
import re

from datetime import timedelta, datetime

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())

run_main(flow)