Migration Guide#
This guide can help you upgrade code through breaking changes from one Bytewax version to the next. For a detailed list of all changes, see the CHANGELOG.
From v0.18 to v0.19#
Removal of the `builder` argument from `stateful_map`#
The `builder` argument has been removed from `stateful_map`. The initial state value is always `None`, and you can call your previous builder by hand in the `mapper` function.
Before:
```python
import bytewax.operators as op


def running_builder():
    return []


def calc_running_mean(values, new_value):
    values.append(new_value)
    while len(values) > 3:
        values.pop(0)

    running_mean = sum(values) / len(values)
    return (values, running_mean)


running_means = op.stateful_map(
    "running_mean", keyed_amounts, running_builder, calc_running_mean
)
```
After:
```python
import bytewax.operators as op


def calc_running_mean(values, new_value):
    # On the first value for a key the state is `None`. Instead of the
    # operator calling a builder for you, initialize the state yourself.
    if values is None:
        values = []

    values.append(new_value)
    while len(values) > 3:
        values.pop(0)

    running_mean = sum(values) / len(values)
    return (values, running_mean)


running_means = op.stateful_map("running_mean", keyed_amounts, calc_running_mean)
```
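If you have many existing builders, one way to migrate without rewriting each mapper is a small wrapper that calls the old builder whenever the state is `None`. The `with_builder` helper below is hypothetical, not part of Bytewax. The sketch assumes, as in the examples above, that the mapper receives `(state, value)` and returns `(new_state, emit_value)`. The `simulate_stateful_map` loop only imitates per-key state threading so the sketch runs without Bytewax installed.

```python
from typing import Any, Callable, Dict, List, Tuple


def with_builder(
    builder: Callable[[], Any],
    mapper: Callable[[Any, Any], Tuple[Any, Any]],
) -> Callable[[Any, Any], Tuple[Any, Any]]:
    """Hypothetical helper: adapt an old (builder, mapper) pair to one mapper."""

    def wrapped(state: Any, value: Any) -> Tuple[Any, Any]:
        if state is None:
            # Uninitialized state for this key: call the old builder by hand.
            state = builder()
        return mapper(state, value)

    return wrapped


def running_builder():
    return []


def old_calc_running_mean(values, new_value):
    values.append(new_value)
    while len(values) > 3:
        values.pop(0)
    return (values, sum(values) / len(values))


calc_running_mean = with_builder(running_builder, old_calc_running_mean)


def simulate_stateful_map(mapper, keyed_items: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]:
    """Plain-Python stand-in for stateful_map: one state per key, starting at None."""
    states: Dict[str, Any] = {}
    out = []
    for key, value in keyed_items:
        new_state, emit = mapper(states.get(key), value)
        states[key] = new_state
        out.append((key, emit))
    return out


print(simulate_stateful_map(calc_running_mean, [("a", 1.0), ("a", 3.0), ("b", 10.0)]))
# [('a', 1.0), ('a', 2.0), ('b', 10.0)]
```

In a real dataflow you would pass `calc_running_mean` to `op.stateful_map` as in the "After" example. The simulation loop is only there to make the behavior visible.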
Connector API Now Contains Step ID#
`bytewax.inputs.FixedPartitionedSource.build_part`, `bytewax.inputs.DynamicSource.build`, `bytewax.outputs.FixedPartitionedSink.build_part` and `bytewax.outputs.DynamicSink.build` now take an additional `step_id` argument. This argument can be used as a label when creating custom Python metrics.
Before:
```python
from datetime import datetime

from bytewax.inputs import DynamicSource


class PeriodicSource(DynamicSource):
    def build(self, now: datetime, worker_index: int, worker_count: int):
        pass
```
After:
```python
from bytewax.inputs import DynamicSource


class PeriodicSource(DynamicSource):
    def build(self, step_id: str, worker_index: int, worker_count: int):
        pass
```
`datetime` Arguments Removed for Performance#
`bytewax.inputs.FixedPartitionedSource.build_part`, `bytewax.inputs.DynamicSource.build` and `bytewax.operators.UnaryLogic.on_item` no longer take a `now: datetime` argument.

`bytewax.inputs.StatefulSourcePartition.next_batch`, `bytewax.inputs.StatelessSourcePartition.next_batch`, and `bytewax.operators.UnaryLogic.on_notify` no longer take a `sched: datetime` argument. Generating these values resulted in significant overhead, even for the majority of sources and stateful operators that never used them.
If you need the current time, you can still get it manually:
```python
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
```
If you need the previously scheduled awake time, store it in an instance variable before returning it from `notify_at`. Your design probably already has that stored in an instance variable.
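Here is a minimal sketch of that pattern. It assumes a logic object whose `notify_at` returns the next awake time and whose `on_notify` is called once that time arrives. `PeriodicLogic` is a hypothetical plain class that only mirrors those method names. It does not subclass `bytewax.operators.UnaryLogic`, and its `on_notify` returns the lateness purely for illustration, not the real return type.

```python
from datetime import datetime, timedelta, timezone


class PeriodicLogic:
    """Plain stand-in that mirrors the notify_at/on_notify method names."""

    def __init__(self, interval: timedelta) -> None:
        self.interval = interval
        # Store the scheduled awake time on the instance, because on_notify
        # no longer receives it as a `sched` argument.
        self._next_awake = datetime.now(timezone.utc) + interval

    def notify_at(self) -> datetime:
        return self._next_awake

    def on_notify(self) -> timedelta:
        # Compare the actual wake-up time with the stored schedule.
        lateness = datetime.now(timezone.utc) - self._next_awake
        # Schedule the next awake time from the previous one.
        self._next_awake = self._next_awake + self.interval
        return lateness
```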
Standardization on Confluent’s Kafka Serialization Interface#
Bytewax’s bespoke Kafka schema registry and serialization interface has been removed in favor of using `confluent_kafka.schema_registry.SchemaRegistryClient`, `confluent_kafka.serialization.Deserializer`s, and `confluent_kafka.serialization.Serializer`s directly. This gives you direct control over all of the configuration options for serialization and supports the full range of use cases. Bytewax’s Kafka serialization operators in `bytewax.connectors.kafka.operators` now take these Confluent types.
With Confluent Schema Registry#
If you are using Confluent’s schema registry (with its magic byte prefix), you can pass (de)serializers like `confluent_kafka.schema_registry.avro.AvroDeserializer` directly to our operators. See Confluent’s documentation for all the options here.
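For context, Confluent’s wire format prefixes each serialized payload with a 5-byte header: a zero "magic byte" followed by the schema ID as a 4-byte big-endian integer. The encoded data comes after the header. The stdlib-only sketch below shows how that framing can be added and recognized. The helper names are hypothetical and are not a Bytewax or Confluent API.

```python
import struct
from typing import Optional, Tuple

MAGIC_BYTE = 0


def add_confluent_frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix a payload with the magic byte and the big-endian schema ID."""
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + payload


def split_confluent_frame(data: bytes) -> Optional[Tuple[int, bytes]]:
    """Return (schema_id, payload) if data looks Confluent-framed, else None."""
    if len(data) < 5 or data[0] != MAGIC_BYTE:
        return None
    (schema_id,) = struct.unpack(">I", data[1:5])
    return schema_id, data[5:]
```

This check is only a heuristic, because plain (unframed) Avro data can also start with a zero byte. That ambiguity is why you pick the deserializer matching how the data was written, rather than guessing per message.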
Before:
```python
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.registry import ConfluentSchemaRegistry

sr_conf = {"url": CONFLUENT_URL, "basic.auth.user.info": CONFLUENT_USERINFO}
registry = ConfluentSchemaRegistry(SchemaRegistryClient(sr_conf))

key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
```
After:
```python
from bytewax.connectors.kafka import operators as kop
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Confluent's SchemaRegistryClient
client = SchemaRegistryClient(
    {"url": CONFLUENT_URL, "basic.auth.user.info": CONFLUENT_USERINFO}
)
key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
```
With Redpanda Schema Registry#
If you are using Redpanda’s schema registry or another setup for which the serialized form does not use Confluent’s wire format, we provide compatible (de)serializer classes in `bytewax.connectors.kafka.serde`.
Before:
```python
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.registry import RedpandaSchemaRegistry, SchemaRef

REDPANDA_REGISTRY_URL = "http://localhost:8080/schema-registry"

registry = RedpandaSchemaRegistry(REDPANDA_REGISTRY_URL)
key_de = registry.deserializer(SchemaRef("sensor-key"))
val_de = registry.deserializer(SchemaRef("sensor-value"))

msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
```
After:
```python
import os

from bytewax.connectors.kafka import operators as kop
from confluent_kafka.schema_registry import SchemaRegistryClient
from bytewax.connectors.kafka.serde import PlainAvroDeserializer

REDPANDA_REGISTRY_URL = os.environ["REDPANDA_REGISTRY_URL"]
# Redpanda's schema registry configuration
client = SchemaRegistryClient({"url": REDPANDA_REGISTRY_URL})

# Use plain Avro instead of Confluent's wire format.
# We need to specify the schema in the deserializer too here.
key_schema = client.get_latest_version("sensor-key").schema
key_de = PlainAvroDeserializer(schema=key_schema)

val_schema = client.get_latest_version("sensor-value").schema
val_de = PlainAvroDeserializer(schema=val_schema)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
```
From v0.17 to v0.18#
Non-Linear Dataflows#
With the addition of non-linear dataflows, the API for constructing dataflows has changed. Operators are now stand-alone functions that can take and return streams.
All operators, not just stateful ones, now require a `step_id`. It should be a `"snake_case"` description of the semantic purpose of that dataflow step.

Instantiating the dataflow itself also now takes a “dataflow ID”, so you can disambiguate different dataflows in the metrics.
Before:
```python
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.input("inp", TestingInput(range(10)))
flow.map(lambda x: x + 1)
flow.output("out", StdOutput())
```
After:
```python
import bytewax.operators as op

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("basic")
# `op.input` takes the place of `flow.input` and takes a `Dataflow`
# object as the first parameter.
# `op.input` returns a `Stream[int]` in this example:
stream = op.input("inp", flow, TestingSource(range(10)))
# `op.map` takes a `Stream` as its second argument, and
# returns a new `Stream[int]` as its return value.
add_one_stream = op.map("add_one", stream, lambda x: x + 1)
# `op.output` takes a stream as its second argument, but
# does not return a new `Stream`.
op.output("out", add_one_stream, StdOutSink())
```
Kafka/RedPanda Input#
`KafkaSource` has been updated. It now produces a stream of `KafkaSourceMessage` dataclasses and a stream of errors, rather than a stream of (key, value) tuples. You can still use `KafkaSource` and `KafkaSink` directly, or use the custom operators in `bytewax.connectors.kafka.operators` to construct an input source.
Before:
```python
from bytewax.connectors.kafka import KafkaInput, KafkaOutput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow

flow = Dataflow()
flow.input("inp", KafkaInput(["localhost:9092"], ["input_topic"]))
flow.output(
    "out",
    KafkaOutput(
        brokers=["localhost:9092"],
        topic="output_topic",
    ),
)
```
After:
```python
from bytewax import operators as op
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka import KafkaSinkMessage
from bytewax.dataflow import Dataflow

flow = Dataflow("kafka_in_out")
kinp = kop.input("inp", flow, brokers=["localhost:19092"], topics=["in_topic"])
in_msgs = op.map("get_k_v", kinp.oks, lambda msg: (msg.key, msg.value))


def wrap_msg(k_v):
    k, v = k_v
    return KafkaSinkMessage(k, v)


out_msgs = op.map("wrap_k_v", in_msgs, wrap_msg)
kop.output("out1", out_msgs, brokers=["localhost:19092"], topic="out_topic")
```
Renaming#
We have renamed the IO classes to better match their semantics and the way they are talked about in the documentation. Their functionality is unchanged. You should be able to search and replace the old names with the new ones and have them work correctly.
[Table: old IO class names | new IO class names]
We have also updated the names of the built-in connectors to match this new naming scheme.
[Table: old built-in connector names | new built-in connector names]
Current Time Convenience#
In addition to the name changes, we have also added a `datetime` argument with the current time to `build{_part,}`, so you can easily set up a next awake time. You can ignore this parameter if you are not scheduling awake times.

We have also added a `datetime` argument to `next_batch`, which contains the scheduled awake time for that source. Awake times are scheduled but not guaranteed to fire at the precise time specified, so you can use this parameter to account for any difference.
`SimplePollingSource` moved#
`SimplePollingSource` has been moved from `bytewax.connectors.periodic` to `bytewax.inputs`. You’ll need to change your imports if you are using that class.
Before:
```python
from bytewax.connectors.periodic import SimplePollingSource
```
After:
```python
from bytewax.inputs import SimplePollingSource
```
Window Metadata#
Window operators now emit `WindowMetadata` objects downstream. These objects can be used to introspect the `open_time` and `close_time` of windows. This changes the output type of windowing operators from a stream of `(key, values)` to a stream of `(key, (metadata, values))`.
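Downstream steps that used to receive `(key, values)` now need to unpack one more level. The sketch below uses a hypothetical `FakeWindowMetadata` dataclass with only the `open_time` and `close_time` fields named above, standing in for `WindowMetadata`, so it runs without Bytewax. A function like `format_window` could be passed as the mapper to `op.map` after a windowing step.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Tuple


@dataclass(frozen=True)
class FakeWindowMetadata:
    """Stand-in for WindowMetadata with only the fields named in this guide."""

    open_time: datetime
    close_time: datetime


def format_window(item: Tuple[str, Tuple[FakeWindowMetadata, List[int]]]) -> str:
    # Before: key, values = item
    # Now the values are paired with the window's metadata.
    key, (meta, values) = item
    return f"{key} [{meta.open_time:%H:%M:%S}, {meta.close_time:%H:%M:%S}): {sum(values)}"
```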
Recovery flags#
The default values for `snapshot-interval` and `backup-interval` have been removed, so you need to choose both values yourself when running a dataflow with recovery enabled.

Previously, the default values were to create a snapshot every 10 seconds and keep a day’s worth of old snapshots. This meant your recovery DB could max out at a size on disk theoretically thousands of times bigger than your in-memory state.
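The old defaults make the arithmetic easy to check. With a snapshot every 10 seconds and one day of snapshots retained, up to 86,400 / 10 = 8,640 snapshots could be on disk at once. A tiny sketch of that bound follows. The function name is illustrative only, and it treats each retained snapshot as a full copy of state, which is the worst case the text describes.

```python
from datetime import timedelta


def max_retained_snapshots(snapshot_interval: timedelta, backup_interval: timedelta) -> int:
    """Upper bound on how many snapshots fit inside the backup interval."""
    return int(backup_interval / snapshot_interval)


print(max_retained_snapshots(timedelta(seconds=10), timedelta(days=1)))  # 8640
```

A longer snapshot interval or a shorter backup interval shrinks this bound proportionally.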
See Recovery for how to appropriately pick these values for your deployment.
Batch -> Collect#
In v0.18, we’ve renamed the `batch` operator to `collect` so as not to be confused with runtime batching. Behavior is unchanged.
From v0.16 to v0.17#
Bytewax v0.17 introduces major changes to the way that recovery works in order to support rescaling. In v0.17, the number of workers in a cluster can now be changed by stopping the dataflow execution and specifying a different number of workers on resume.
In addition to rescaling, we’ve changed the Bytewax inputs and outputs API to support batching which has yielded significant performance improvements.
In this article, we’ll go over the updates we’ve made to our API and explain the changes you’ll need to make to your dataflows to upgrade to v0.17.
Recovery changes#
In v0.17, recovery has been changed to support rescaling the number of workers in a dataflow. You now pre-create a fixed number of SQLite recovery DB files before running the dataflow.
SQLite recovery DBs created with versions of Bytewax prior to v0.17 are not compatible with this release.
Creating recovery partitions#
Creating recovery stores has been moved to a separate step from running your dataflow.
Recovery partitions must be pre-initialized before running the dataflow initially. This is done by executing this module:
```console
$ python -m bytewax.recovery db_dir/ 4
```
This will create a set of partitions:
```console
$ ls db_dir/
part-0.sqlite3
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3
```
Once the recovery partition files have been created, they must be placed in locations that are accessible to the workers. The cluster as a whole must have access to all partitions, but any given worker need not have access to any partition in particular (or any at all). It is OK if a given partition is accessible by multiple workers; only one worker will use it.
If you are not running in a cluster environment but on a single machine, placing all the partitions in a single local filesystem directory is fine.
Although the partition init script will not create these, partitions after execution may consist of multiple files:
```console
$ ls db_dir/
part-0.sqlite3
part-0.sqlite3-shm
part-0.sqlite3-wal
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3
part-3.sqlite3-shm
part-3.sqlite3-wal
```
Remember to move all files that share a `part-*` prefix together.
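Here is one way to do this with only the standard library, assuming the `part-N.sqlite3` naming shown above. Globbing on the `part-*` prefix makes the `-shm` and `-wal` companions travel with each main file. The `move_partitions` helper and the directory names are hypothetical, and the sketch assumes the dataflow is not running while the files are moved.

```python
import shutil
from pathlib import Path
from typing import List


def move_partitions(src_dir: Path, dst_dir: Path) -> List[str]:
    """Move every part-* file (including -shm/-wal companions) to dst_dir."""
    dst_dir.mkdir(parents=True, exist_ok=True)
    moved = []
    for path in sorted(src_dir.glob("part-*")):
        shutil.move(str(path), str(dst_dir / path.name))
        moved.append(path.name)
    return moved
```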
Choosing the number of recovery partitions#
An important consideration when choosing the initial number of recovery partitions: when you rescale the number of workers, the number of recovery partitions currently stays fixed.

If you are scaling up the number of workers in your cluster to increase the total throughput of your dataflow, the work of writing recovery data will still be limited to the initial number of recovery partitions. If you will likely scale up your dataflow to accommodate increased demand, we recommend that you create more recovery partitions than you initially need. Having multiple recovery partitions handled by a single worker is fine.
Epoch interval -> Snapshot interval#
We’ve renamed the CLI option `epoch-interval` to `snapshot-interval` to better describe its effect on dataflow execution. The snapshot interval is the system time duration (in seconds) between state snapshots for recovery.

A dataflow can only resume from the most recent snapshot that has completed across all workers, so a shorter interval means less work is repeated on resume. Be aware, however, that making the `snapshot-interval` more frequent increases the amount of recovery work done during normal execution and may impact performance.
Backup interval and backing up recovery partitions#
We’ve also introduced an additional parameter to running a dataflow: `backup-interval`.
When running a Dataflow with recovery enabled, it is recommended to back up your recovery partitions on a regular basis for disaster recovery.
The `backup-interval` parameter is the length of time to wait before “garbage collecting” older snapshots. This enables a dataflow to successfully resume when backups of recovery partitions happen at different times, which will be true in most distributed deployments.

Set this value longer than the interval within which you can guarantee that all recovery partitions will be backed up, with enough margin to account for transient failures. It defaults to one day.
If you attempt to resume from a set of recovery partitions for which the oldest and youngest backups are more than the backup interval apart, the resumed dataflow could have corrupted state.
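Before resuming from restored backups, you can check that condition yourself. Here is a minimal sketch, assuming you know when each partition’s backup was taken. How you obtain those timestamps, and the `backups_are_consistent` name, are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Iterable


def backups_are_consistent(backup_times: Iterable[datetime], backup_interval: timedelta) -> bool:
    """True if the oldest and youngest partition backups are within backup_interval."""
    times = list(backup_times)
    if not times:
        raise ValueError("need at least one backup time")
    return max(times) - min(times) <= backup_interval
```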
Input and Output changes#
In v0.17, we have restructured input and output to support batching for increased throughput.
If you have created custom input connectors, you’ll need to update them to use the new API.
Input changes#
The `list_parts` method has been updated to return a `List[str]` instead of a `Set[str]`, and now should only reflect the available input partitions that a given worker has access to. You no longer need to return the complete set of partitions for all workers.

The `next` method of `StatefulSource` and `StatelessSource` has been changed to `next_batch` and should return a `List` of elements each time it is called. If there are no elements to return, this method should return the empty list.
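Here is a minimal sketch of the batched shape, written as a plain class so it runs without Bytewax. `QueuePartition` only mirrors the `next_batch` method described above and does not subclass `StatelessSource`. Its `push` and `close` methods are hypothetical, and raising `StopIteration` to signal that the partition is finished is an assumption of this sketch.

```python
from collections import deque
from typing import Deque, List


class QueuePartition:
    """Plain stand-in for a source partition that hands out items in batches."""

    def __init__(self, max_batch: int = 100) -> None:
        self._pending: Deque[str] = deque()
        self._closed = False
        self._max_batch = max_batch

    def push(self, item: str) -> None:
        # Hypothetical producer side, e.g. fed by a client library callback.
        self._pending.append(item)

    def close(self) -> None:
        self._closed = True

    def next_batch(self) -> List[str]:
        if self._closed and not self._pending:
            # Signal that this partition is finished.
            raise StopIteration()
        batch: List[str] = []
        while self._pending and len(batch) < self._max_batch:
            batch.append(self._pending.popleft())
        # An empty list means "nothing available right now", not "finished".
        return batch
```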
Next awake#
Input sources now have an optional `next_awake` method, which you can use to schedule when the next `next_batch` call should occur. You can use this to “sleep” the input operator for a fixed amount of time while you are waiting for more input.

The default behavior uses a simple heuristic to prevent a spin loop when there is no input. Always use `next_awake` rather than calling `time.sleep` in an input source.

See the `periodic_input.py` example in the examples directory for an implementation that uses this functionality.
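The idea can be sketched with a plain polling class that mirrors the `next_batch`/`next_awake` method names. It is not a Bytewax subclass, and the `poll` callable and interval are hypothetical. Instead of sleeping inside `next_batch`, the class records when it next wants to be called and reports that time from `next_awake`.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, List


class PollingPartition:
    """Plain stand-in that mirrors the next_batch/next_awake method names."""

    def __init__(self, poll: Callable[[], Any], interval: timedelta) -> None:
        self._poll = poll
        self._interval = interval
        # Ready to poll immediately on the first call.
        self._next_awake = datetime.now(timezone.utc)

    def next_batch(self) -> List[Any]:
        item = self._poll()
        # Ask to be woken again after one interval instead of sleeping here.
        self._next_awake = datetime.now(timezone.utc) + self._interval
        return [item]

    def next_awake(self) -> datetime:
        return self._next_awake
```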
Async input adapter#
We’ve included a new `bytewax.inputs.batcher_async` to help you use async Python libraries in Bytewax input sources. It lets you wrap an async iterator and specify a maximum time and size to collect items into a batch.
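To illustrate the batching behavior (collect up to a maximum number of items or until a timeout, whichever comes first), here is a stdlib-only asyncio sketch. It is not the `batcher_async` implementation, and its name and signature are assumptions. It drains the async iterator in a background task so that a timeout cancels only the wait on an internal queue, never the iterator itself.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def batch_async(source: AsyncIterator[T], timeout: float, batch_size: int) -> AsyncIterator[List[T]]:
    """Yield lists of up to batch_size items, waiting at most `timeout` seconds per batch."""
    queue: "asyncio.Queue[object]" = asyncio.Queue()
    done = object()  # sentinel marking the end of the source

    async def pump() -> None:
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(done)

    task = asyncio.create_task(pump())
    loop = asyncio.get_running_loop()
    finished = False
    try:
        while not finished:
            batch: List[T] = []
            deadline = loop.time() + timeout
            while len(batch) < batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    item = await asyncio.wait_for(queue.get(), remaining)
                except asyncio.TimeoutError:
                    break  # time is up: emit whatever we have
                if item is done:
                    finished = True
                    break
                batch.append(item)  # type: ignore[arg-type]
            if batch:
                yield batch
    finally:
        task.cancel()
```

For brevity, this sketch does not re-raise errors from the source iterator; the batches simply end.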
Using Kafka for recovery is now removed#
v0.17 removes the deprecated `KafkaRecoveryConfig` as a recovery store to support the ability to rescale the number of workers.
Support for additional platforms#
Bytewax is now available for linux/aarch64 and linux/armv7.
From 0.15 to 0.16#
Bytewax v0.16 introduces major changes to the way that custom Bytewax inputs are created, as well as improvements to windowing and execution. In this article, we’ll cover some of the updates we’ve made to our API to make upgrading to v0.16 easier.
Input changes#
In v0.16, we have restructured input to be based around partitions in order to support rescaling stateful dataflows in the future. We have also dropped the `Config` suffix from our input classes. As an example, `KafkaInputConfig` has been renamed to `KafkaInput` and has been moved to `bytewax.connectors.kafka.KafkaInput`.

`ManualInputConfig` is now replaced by two base superclasses that can be subclassed to create custom input sources. They are `DynamicInput` and `PartitionedInput`.
`DynamicInput`#
`DynamicInput` sources are input sources that support reading distinct items from any number of workers concurrently.

`DynamicInput` sources do not support resume state and thus generally do not provide delivery guarantees better than at-most-once.
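A common way to make each worker read distinct items is to stride over the data using the `worker_index` and `worker_count` values that input builders in this guide receive. Here is a plain-Python sketch with a hypothetical `items_for_worker` helper:

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def items_for_worker(items: Sequence[T], worker_index: int, worker_count: int) -> List[T]:
    """Every worker_count-th item, starting at worker_index, so workers never overlap."""
    return list(items[worker_index::worker_count])
```

For example, with 10 items and 3 workers, worker 0 reads items 0, 3, 6 and 9, worker 1 reads 1, 4 and 7, and worker 2 reads 2, 5 and 8.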
`PartitionedInput`#
`PartitionedInput` sources can keep internal state on the current position of each partition. If a partition can “rewind” and resume reading from a previous position, the source can provide delivery guarantees of at-least-once or better.

`PartitionedInput` sources maintain the state of each partition and re-build that state during recovery.
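The sketch below shows the idea behind a resumable partition. The partition reports its position as a snapshot and can be rebuilt from that snapshot. `LinePartition` is a hypothetical plain class that loosely mirrors `next` and `snapshot` method names. It is not the Bytewax `PartitionedInput` API, and raising `StopIteration` at the end is an assumption of the sketch.

```python
from typing import List, Optional


class LinePartition:
    """Reads lines in order and can be rebuilt from a saved line offset."""

    def __init__(self, lines: List[str], resume_state: Optional[int]) -> None:
        self._lines = lines
        # None means there is no saved state, so start from the beginning.
        self._pos = 0 if resume_state is None else resume_state

    def next(self) -> str:
        if self._pos >= len(self._lines):
            raise StopIteration()  # no more lines in this partition
        line = self._lines[self._pos]
        self._pos += 1
        return line

    def snapshot(self) -> int:
        # The offset of the next unread line is all that is needed to resume.
        return self._pos
```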
Deprecating Kafka Recovery#
In order to better support rescaling Dataflows, the Kafka recovery store option is being deprecated and will be removed from a future release in favor of the SQLite recovery store.
Capture -> Output#
The `capture` operator has been renamed to `output` in v0.16 and is now stateful, so it requires a step ID:
In v0.15 and before:
```python
flow.capture(StdOutputConfig())
```
In v0.16+:
```python
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.output("out", StdOutput())
```
`ManualOutputConfig` is now replaced by two base superclasses that can be subclassed to create custom output sinks. They are `DynamicOutput` and `PartitionedOutput`.
New entrypoint#
In Bytewax v0.16, the way that Dataflows are run has been simplified, and most execution functions have been removed.
Similar to other Python frameworks like Flask and FastAPI, Dataflows can now be run using a Dataflow import string in the `<module_name>:<dataflow_variable_name_or_factory_function>` format.
As an example, given a file named `dataflow.py` with the following contents:
```python
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput

flow = Dataflow()
flow.input("in", TestingInput(range(3)))
flow.output("out", StdOutput())
```
Since the Python file `dataflow.py` defines a module `dataflow`, the Dataflow can be run with the following invocation:
```console
> python -m bytewax.run dataflow
0
1
2
```
By default, Bytewax looks for a variable named `flow` in the given module, so we can omit the `<dataflow_variable_name_or_factory_function>` part of our import string.
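To make the import-string format concrete, here is a stdlib-only sketch of how such a string can be resolved. The part before the colon is a module, and the part after it is either a variable name or a factory call with literal arguments. If no target is given, the sketch defaults to `flow`. This illustrates the format only and is not Bytewax’s own loader. `resolve_import_str` is a hypothetical name.

```python
import ast
import importlib
from typing import Any


def resolve_import_str(import_str: str, default_attr: str = "flow") -> Any:
    """Resolve '<module>[:<variable_or_factory_call>]' to a Python object."""
    module_name, _, target = import_str.partition(":")
    module = importlib.import_module(module_name)
    if not target:
        # No target given: fall back to the default variable name.
        return getattr(module, default_attr)

    expr = ast.parse(target, mode="eval").body
    if isinstance(expr, ast.Name):
        # A plain variable name, e.g. "src.dataflow:flow".
        return getattr(module, expr.id)
    if isinstance(expr, ast.Call) and isinstance(expr.func, ast.Name):
        # A factory call with literal arguments, e.g. "src.dataflow:get_flow('x')".
        factory = getattr(module, expr.func.id)
        args = [ast.literal_eval(arg) for arg in expr.args]
        kwargs = {}
        for kw in expr.keywords:
            if kw.arg is None:
                raise ValueError("**kwargs are not supported in this sketch")
            kwargs[kw.arg] = ast.literal_eval(kw.value)
        return factory(*args, **kwargs)
    raise ValueError(f"unsupported import string target: {target!r}")
```

For example, `resolve_import_str("math:pi")` returns `math.pi`, and `resolve_import_str("os.path:join('a', 'b')")` calls `os.path.join` with two string arguments.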
Processes, workers, recovery stores and other options can be configured with command line flags or environment variables. For the full list of options, see the `--help` command line flag:
```console
> python -m bytewax.run --help
usage: python -m bytewax.run [-h] [-p PROCESSES] [-w WORKERS_PER_PROCESS] [-i PROCESS_ID] [-a ADDRESSES] [--sqlite-directory SQLITE_DIRECTORY] [--epoch-interval EPOCH_INTERVAL] import_str

Run a bytewax dataflow

positional arguments:
  import_str            Dataflow import string in the format <module_name>:<dataflow_variable_or_factory> Example: src.dataflow:flow or src.dataflow:get_flow('string_argument')

options:
  -h, --help            show this help message and exit

Scaling:
  You should use either '-p' to spawn multiple processes on this same machine, or '-i/-a' to spawn a single process on different machines

  -p PROCESSES, --processes PROCESSES
                        Number of separate processes to run [env: BYTEWAX_PROCESSES]
  -w WORKERS_PER_PROCESS, --workers-per-process WORKERS_PER_PROCESS
                        Number of workers for each process [env: BYTEWAX_WORKERS_PER_PROCESS]
  -i PROCESS_ID, --process-id PROCESS_ID
                        Process id [env: BYTEWAX_PROCESS_ID]
  -a ADDRESSES, --addresses ADDRESSES
                        Addresses of other processes, separated by semicolumn: -a "localhost:2021;localhost:2022;localhost:2023" [env: BYTEWAX_ADDRESSES]

Recovery:
  --sqlite-directory SQLITE_DIRECTORY
                        Passing this argument enables sqlite recovery in the specified folder [env: BYTEWAX_SQLITE_DIRECTORY]
  --epoch-interval EPOCH_INTERVAL
                        Number of seconds between state snapshots [env: BYTEWAX_EPOCH_INTERVAL]
```
Porting a simple example from 0.15 to 0.16#
This is what a simple example looked like in 0.15:
```python
import operator
import re

from datetime import timedelta, datetime

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())

run_main(flow)
```
To port the example to the 0.16 version, we need to make a few changes.
Imports#
Let’s start with the existing imports:
```python
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
```
Becomes:
```python
from bytewax.connectors.files import FileInput
from bytewax.connectors.stdio import StdOutput
```
We no longer use `run_main`, as it is now intended only for unit testing dataflows. Bytewax now has a built-in `FileInput` connector, which uses the `PartitionedInput` connector superclass.

Since we are using a built-in connector to read from this file, we can delete our `input_builder` function above.
Windowing#
Most of the operators from v0.15 are the same, but the `start_at` parameter of windowing functions has been changed to `align_to`. The `start_at` parameter incorrectly implied that there were no potential windows before that time. What determines whether an item is late for a window is not the windowing definition, but the watermark of the clock.

`SlidingWindow` and `TumblingWindow` now require the `align_to` parameter. Previously, this was filled with the timestamp of the start of the Dataflow, but it must now be specified. Specifying this parameter ensures that windows are consistent across Dataflow restarts, so make sure that you don’t change this parameter between executions.
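A fixed `align_to` keeps windows stable because of how window boundaries are computed. This sketch assumes tumbling windows are laid out at `align_to + k * length` for integer `k`. Under that assumption, the window containing a timestamp depends only on `align_to`, `length`, and the timestamp, not on when the dataflow started. It also shows that timestamps before `align_to` still fall into a window. The `tumbling_window_bounds` helper is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Tuple


def tumbling_window_bounds(ts: datetime, align_to: datetime, length: timedelta) -> Tuple[datetime, datetime]:
    """Return the [open, close) bounds of the tumbling window containing ts."""
    # Floor division of timedeltas gives the (possibly negative) window index,
    # so timestamps before align_to still map to a window.
    index = (ts - align_to) // length
    open_time = align_to + index * length
    return open_time, open_time + length
```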
The old `TumblingWindowConfig` definition:
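```python
clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))
```
becomes:
```python
from datetime import datetime, timedelta, timezone

from bytewax.window import SystemClockConfig, TumblingWindow

clock_config = SystemClockConfig()
window_config = TumblingWindow(
    length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
)
```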
Output and execution#
Similarly to the input, the output configuration is now stateful. `capture` has been renamed to `output`, and now takes a name, as all stateful operators do.
So we move from this:
```python
flow.capture(StdOutputConfig())
```
To this:
```python
flow.output("out", StdOutput())
```
Complete code#
The complete code for the new simple example now looks like this:
```python
import operator
import re

from datetime import timedelta, datetime, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.files import FileInput
from bytewax.connectors.stdio import StdOutput
from bytewax.window import SystemClockConfig, TumblingWindow


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindow(
    length=timedelta(seconds=5), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
)

flow = Dataflow()
flow.input("inp", FileInput("examples/sample_data/wordcount.txt"))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.output("out", StdOutput())
```
Since we’ve named our Dataflow variable `flow` in a file named `dataflow.py`, running our completed Dataflow looks like this:
```console
> python -m bytewax.run dataflow:flow
('whether', 1)
("'tis", 1)
('of', 2)
('opposing', 1)
...
```
We can even run our sample Dataflow on multiple workers to process the file in parallel:
```console
> python -m bytewax.run dataflow:flow -p2
('whether', 1)
("'tis", 1)
('of', 2)
('opposing', 1)
...
```
In the background, Bytewax has spawned two processes, each of which is processing a part of the file. To see this more clearly, you can start each worker by hand:
In one terminal, run:
```console
> python -m bytewax.run dataflow:flow -i0 -a "localhost:2101;localhost:2102"
('not', 1)
('end', 1)
('opposing', 1)
('take', 1)
...
```
And in a second terminal, run:
```console
> python -m bytewax.run dataflow:flow -i1 -a "localhost:2101;localhost:2102"
('question', 1)
('the', 3)
('troubles', 1)
('fortune', 1)
...
```
From 0.10 to 0.11#
Bytewax 0.11 introduces major changes to the way that Bytewax dataflows are structured, as well as improvements to recovery and windowing. This document outlines the major changes between Bytewax 0.10 and 0.11.
Input and epochs#
Bytewax is built on top of the Timely Dataflow framework. The idea of timestamps (which we refer to in Bytewax as epochs) is central to Timely Dataflow’s progress tracking mechanism.
Bytewax initially adopted an input model that included managing the epochs at which input was introduced. The 0.11 version of Bytewax removes the need to manage epochs directly.
Epochs continue to exist in Bytewax, but are now managed internally to represent a unit of recovery. Bytewax dataflows that are configured with recovery will snapshot their state after processing all items in an epoch. In the event of recovery, Bytewax will resume a dataflow at the last snapshotted state. The frequency of snapshotting can be configured with an `EpochConfig`.
Recoverable input#
Bytewax 0.11 will now allow you to recover the state of the input to your dataflow.
Manually constructed input functions, like those used with `ManualInputConfig`, now take a third argument. If your dataflow is interrupted, the third argument passed to your input function can be used to reconstruct the state of your input at the last recovery snapshot, provided you write your input logic accordingly. The `input_builder` function must yield tuples of `(resume_state, datum)`.
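Here is a sketch of an input builder that honors `resume_state`. It assumes the state you yield is the number of lines already emitted, which is a choice made for this sketch, not something Bytewax prescribes. A small in-memory list stands in for `wordcount.txt` so the sketch runs on its own. It also ignores `worker_index` and `worker_count`, so it assumes a single worker.

```python
from typing import Iterator, List, Optional, Tuple

LINES: List[str] = ["to be", "or not", "to be"]  # stand-in for wordcount.txt


def input_builder(worker_index: int, worker_count: int, resume_state: Optional[int]) -> Iterator[Tuple[int, str]]:
    # resume_state is None on a fresh start; otherwise it is the last state we yielded.
    start = 0 if resume_state is None else resume_state
    for offset in range(start, len(LINES)):
        # Yield (resume_state, datum): the state is the offset to resume from next time.
        yield offset + 1, LINES[offset]
```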
Bytewax’s built-in input handlers, like `KafkaInputConfig`, are also recoverable. `KafkaInputConfig` will store information about consumer offsets in the configured Bytewax recovery store. In the event of recovery, `KafkaInputConfig` will start reading from the offsets that were last committed to the recovery store.
Stateful windowing#
Version 0.11 also introduces stateful windowing operators, including a new `fold_window` operator.
Previously, Bytewax included helper functions to manage windows in terms of epochs. Now that Bytewax manages epochs internally, windowing functions are now operators that appear as a processing step in a dataflow. Dataflows can now have more than one windowing step.
Bytewax’s stateful windowing operators are now built on top of its recovery system, and their operations can be recovered in the event of a failure. See the documentation on Recovery for more information.
Output configurations#
The 0.11 release of Bytewax adds some prepackaged output configuration options for common use-cases:
- `ManualOutputConfig`, which calls a Python callback function for each output item.
- `StdOutputConfig`, which prints each output item to stdout.
Import path changes and removed entrypoints#
In Bytewax 0.11, the overall Python module structure has changed, and some execution entrypoints have been removed.
- `cluster_main`, `spawn_cluster`, and `run_main` have moved to `bytewax.execution`
- `Dataflow` has moved to `bytewax.dataflow`
- `run` and `run_cluster` have been removed
Porting the Simple example from 0.10 to 0.11#
This is what the Simple example looked like in 0.10:
```python
import re

from bytewax import Dataflow, run


def file_input():
    for line in open("wordcount.txt"):
        yield 1, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


flow = Dataflow()
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_epoch(add)
flow.capture()


for epoch, item in run(flow, file_input()):
    print(item)
```
To port the example to the 0.11 version, we need to make a few changes.
Imports#
Let’s start with the existing imports:
```python
from bytewax import Dataflow, run
```
Becomes:
```python
from bytewax.dataflow import Dataflow
from bytewax.execution import run_main
```
We moved from `run` to `run_main` because the execution API has been simplified, and we can now just use the `run_main` function to execute our dataflow.
Input#
The way Bytewax handles input changed with 0.11. `input` is now a proper operator on the Dataflow, and the function now takes 3 parameters: `worker_index`, `worker_count`, and `resume_state`. This allows us to distribute the input across workers and to handle recovery if we want to. We are not going to do that in this example, so the change is minimal.
The input function goes from:
```python
def file_input():
    for line in open("wordcount.txt"):
        yield 1, line
```
to:
```python
def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line
```
So instead of manually yielding the epoch in the input function, we yield a resume state, which we can either ignore (passing `None`) or use to implement recovery (see our docs on Recovery).

Then we need to wrap the `input_builder` with `ManualInputConfig`, give it a name (“file_input” here), and pass it to the `input` operator (rather than the `run` function):
```python
from bytewax.inputs import ManualInputConfig


flow.input("file_input", ManualInputConfig(input_builder))
```
Operators#
Most of the operators are the same, but there is a notable change in the flow: where we used `reduce_epoch`, we are now using `reduce_window`. Since epochs are now considered an internal detail in Bytewax, we need another way to let the `reduce_window` operator know when to close a specific window. Previously this was done every time the epoch changed, while now it can be configured with a time window. We need two config objects to do this:

- `clock_config`
- `window_config`

The `clock_config` is used to tell the window-based operators what reference clock to use; here we use the `SystemClockConfig`, which just uses the system’s clock. The `window_config` is used to define the time window we want to use. Here we’ll use `TumblingWindowConfig`, which defines tumbling windows by a length (`timedelta`), and we configure it to have windows of 5 seconds each.
So the old `reduce_epoch`:
```python
flow.reduce_epoch(add)
```
becomes `reduce_window`:
```python
from datetime import timedelta

from bytewax.window import SystemClockConfig, TumblingWindowConfig


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))
flow.reduce_window("sum", clock_config, window_config, add)
```
Output and execution#
Similarly to the input, the output configuration is now part of an operator, `capture`. Rather than collecting the output in a Python iterator and then manually printing it, we can now configure the `capture` operator to print to standard output.
Since all the input and output handling is now defined inside the Dataflow, we don’t need to pass this information to the execution method.
So we move from this:
```python
flow.capture()

for epoch, item in run(flow, file_input()):
    print(item)
```
To this:
```python
from bytewax.outputs import StdOutputConfig


flow.capture(StdOutputConfig())

run_main(flow)
```
Complete code#
The complete code for the new simple example now looks like this:
```python
import operator
import re

from datetime import timedelta, datetime

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
from bytewax.window import SystemClockConfig, TumblingWindowConfig


def input_builder(worker_index, worker_count, resume_state):
    state = None  # ignore recovery
    for line in open("wordcount.txt"):
        yield state, line


def lower(line):
    return line.lower()


def tokenize(line):
    return re.findall(r'[^\s!,.?":;0-9]+', line)


def initial_count(word):
    return word, 1


def add(count1, count2):
    return count1 + count2


clock_config = SystemClockConfig()
window_config = TumblingWindowConfig(length=timedelta(seconds=5))

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.capture(StdOutputConfig())

run_main(flow)
```