Recovery Example

Bytewax lets you recover a stateful dataflow: after a failure, you can resume processing and output without re-processing all of the initial data to recalculate internal state. It does this by periodically snapshotting all internal state and resuming from a recent snapshot.

This tutorial walks through a recovery example. For more advanced settings and important details about recovery, see the recovery article in the concepts section.

Create Recovery Partitions

Recovery partitions must be pre-initialized before running the dataflow. This is done by executing the bytewax.recovery module:

$ python -m bytewax.recovery db_dir/ 1

This will create a recovery partition in the db_dir/ directory:

$ ls db_dir/
part-0.sqlite3
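
If you script your deployment, you may want to confirm that the partitions exist before starting the dataflow. The following is a minimal sketch using only the Python standard library. The helper name list_recovery_partitions is hypothetical, and the part-*.sqlite3 pattern is an assumption inferred from the directory listing above, not a documented guarantee.

```python
from pathlib import Path


def list_recovery_partitions(db_dir):
    """Return the sorted file names of recovery partitions found in db_dir.

    Hypothetical helper, not part of Bytewax. It assumes partition files
    are named part-<n>.sqlite3, as in the listing above.
    """
    directory = Path(db_dir)
    if not directory.is_dir():
        # A missing directory simply means no partitions were created yet.
        return []
    return sorted(p.name for p in directory.glob("part-*.sqlite3"))
```

Calling list_recovery_partitions("db_dir/") after the command above should return a list containing part-0.sqlite3. An empty list suggests the bytewax.recovery module has not been run for that directory.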

Executing with Recovery

Let’s create an example dataflow that we can use to demonstrate recovery. We’re going to use the stateful_map operator to keep a running sum of the numbers we receive as input.

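stateful_map is, as the name implies, a stateful operator. As used in the example below, it takes three arguments: a step_id, a keyed Stream of input data, and a mapper function. The mapper is called with the current state for the key and the new item; the example treats a state of None as "no state yet" for that key.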

The mapper function should return a 2-tuple of (updated_state, emit_value). The updated_state that is returned from this function is the internal state of this operator, and will be used for recovery. The emit_value will be passed downstream.
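
To make the difference between updated_state and emit_value concrete, here is a plain-Python sketch that does not use Bytewax at all. The mapper update_mean and the driver simulate_stateful_map are hypothetical names for illustration; the driver only imitates how a mapper is called once per item for a single key, and it assumes the state starts as None.

```python
def update_mean(state, new_item):
    """Mapper that keeps (count, total) as state and emits the running mean."""
    if state is None:
        # Assumption: no state exists yet for this key.
        state = (0, 0)
    count, total = state
    count += 1
    total += new_item
    # First element is kept as state, second element goes downstream.
    return (count, total), total / count


def simulate_stateful_map(mapper, items, state=None):
    """Call mapper once per item, threading the returned state through."""
    emitted = []
    for item in items:
        state, emit_value = mapper(state, item)
        emitted.append(emit_value)
    return state, emitted


final_state, emitted = simulate_stateful_map(update_mean, [0, 1, 2])
print(final_state)  # (3, 3)
print(emitted)      # [0.0, 0.5, 1.0]
```

Only the first element of each returned tuple would need to be snapshotted to continue the calculation later; the emitted means are derived values.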

Let’s see a concrete example. Add the following code in a new file named recovery.py:

import bytewax.operators as op

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

inp = [0, 1, 2]

flow = Dataflow("recovery")
input_stream = op.input("inp", flow, TestingSource(inp))
# Stateful operators require their input to be keyed
# We'll use the static key "ALL" so that all input is
# processed together.
keyed_stream = op.key_on("key_stream", input_stream, lambda _: "ALL")


def update_sum(current_sum, new_item):
    if current_sum is None:
        current_sum = 0

    updated_sum = current_sum + new_item
    return updated_sum, updated_sum


total_sum_stream = op.stateful_map("running_count", keyed_stream, update_sum)
op.output("out", total_sum_stream, StdOutSink())

To enable recovery when you execute a dataflow, pass the -r flag to bytewax.run and specify the recovery directory. You also need to set two recovery values: the snapshot_interval via the -s flag and the backup_interval via the -b flag.

The snapshot_interval specifies the amount of time in seconds to wait before creating a snapshot. The backup_interval specifies the amount of time in seconds to keep older state snapshots around.
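
If you launch dataflows from a script, the invocation can be assembled programmatically. The sketch below only builds the argument list from the -r, -s, and -b flags described above and does not start anything. The helper name build_run_command is hypothetical; passing its result to subprocess.run would be one way to execute it, assuming Bytewax is installed.

```python
import sys


def build_run_command(module, db_dir, snapshot_interval, backup_interval):
    """Assemble a bytewax.run invocation with recovery enabled.

    Hypothetical helper; both intervals are given in seconds.
    """
    return [
        sys.executable, "-m", "bytewax.run", module,
        "-r", str(db_dir),             # recovery directory
        "-s", str(snapshot_interval),  # snapshot_interval
        "-b", str(backup_interval),    # backup_interval
    ]


cmd = build_run_command("recovery", "db_dir/", snapshot_interval=30, backup_interval=0)
print(" ".join(cmd[1:]))  # -m bytewax.run recovery -r db_dir/ -s 30 -b 0
```

Keeping the intervals as named arguments makes it harder to swap the two values by accident when they are tuned per dataflow.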

For production dataflows, you should set these values carefully for each dataflow to match your operational needs. For more information, please see the concept section on recovery.

Running the example above, you should see the following output:

$ python -m bytewax.run recovery -r db_dir/ -s 30 -b 0
('ALL', 0)
('ALL', 1)
('ALL', 3)

Our dataflow stopped when it reached the end of our testing input, but importantly, Bytewax has saved a snapshot of the state of the dataflow before exiting.

Resuming

If a dataflow aborts, abruptly shuts down, or gracefully exits due to reaching the end of input, you can resume the dataflow by running it again with the files stored in the recovery directory.

Before we re-run our dataflow, let’s change our input data to add some new values:

inp.extend([3, 4])

Now we can re-run our dataflow:

$ python -m bytewax.run recovery -r db_dir/ -s 30 -b 0
('ALL', 6)
('ALL', 10)

You can see that the dataflow has restored the state of the stateful_map operator from our previous run, started reading input from where it stopped, and then applied our new data to that restored state.
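
To see why the second run prints 6 and 10 rather than starting over from 0, the two runs can be imitated in plain Python. This is a simplified model under stated assumptions, not how Bytewax stores snapshots: the dictionary with state and resume_offset and the function run_from_snapshot are hypothetical stand-ins for the recovery data and for one execution of the dataflow.

```python
def update_sum(current_sum, new_item):
    """Same running-sum mapper as in recovery.py."""
    if current_sum is None:
        current_sum = 0
    updated_sum = current_sum + new_item
    return updated_sum, updated_sum


def run_from_snapshot(items, snapshot):
    """Imitate one execution: restore state, skip input that was already read."""
    state = snapshot["state"]
    emitted = []
    for item in items[snapshot["resume_offset"]:]:
        state, emit_value = update_sum(state, item)
        emitted.append(emit_value)
    # The new snapshot records the state and how far into the input we got.
    return emitted, {"state": state, "resume_offset": len(items)}


inp = [0, 1, 2]
first_out, snapshot = run_from_snapshot(inp, {"state": None, "resume_offset": 0})

inp.extend([3, 4])
second_out, snapshot = run_from_snapshot(inp, snapshot)

print(first_out)   # [0, 1, 3]
print(second_out)  # [6, 10]
```

The second call starts from a sum of 3 and reads only the two new items, which mirrors the behavior shown in the tutorial output.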
