Recovery Example
Bytewax can recover a stateful dataflow: after a failure, it lets you resume processing and output without re-processing all of the initial data to recalculate internal state. It does this by periodically snapshotting all internal state and providing a way to resume from a recent snapshot.
Here, we’ll walk through a tutorial demonstrating recovery. For more advanced settings and important details about recovery, see the recovery article in the concepts section.
Create Recovery Partitions
Recovery partitions must be pre-initialized before running the dataflow. This is done by executing the bytewax.recovery module:
> python -m bytewax.recovery db_dir/ 1
This will create a recovery partition in the db_dir/ directory:
$ ls db_dir/
part-0.sqlite3
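Before launching the dataflow, it can be useful to confirm that the partitions exist. The following is a minimal sketch using only the Python standard library. `list_recovery_partitions` is a hypothetical helper, not part of Bytewax, and it assumes partition files follow the `part-<n>.sqlite3` naming seen in the listing above.

```python
from pathlib import Path


def list_recovery_partitions(db_dir):
    """Return the sorted file names of recovery partitions in db_dir.

    Hypothetical helper, not part of Bytewax. Assumes the
    part-<n>.sqlite3 naming shown in the directory listing above.
    """
    return sorted(p.name for p in Path(db_dir).glob("part-*.sqlite3"))
```

Calling `list_recovery_partitions("db_dir")` after the command above should return a list containing `part-0.sqlite3`. An empty list suggests the initialization step still needs to be run.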
Executing with Recovery
Let’s create an example dataflow that we can use to demonstrate recovery. We’re going to use the stateful_map operator to keep a running sum of the numbers we receive as input.
stateful_map is, as the name implies, a stateful operator. As called in the example below, stateful_map takes three parameters: a step_id, a keyed Stream of input data, and a mapper function. The mapper function receives the current state for the key, which is None the first time that key is seen, and the new item.
The mapper function should return a 2-tuple of (updated_state, emit_value). The updated_state that is returned from this function is the internal state of this operator, and will be used for recovery. The emit_value will be passed downstream.
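To make the (updated_state, emit_value) contract concrete, here is a minimal pure-Python sketch that mimics how a mapper could be applied per key. It is an illustration only, not Bytewax's implementation: `simulate_stateful_map` and `running_mean` are hypothetical names, and the sketch assumes that the state for a key not seen before is `None`. The state and the emitted value are deliberately different here so the two roles are easy to tell apart.

```python
def simulate_stateful_map(mapper, keyed_items):
    """Apply mapper to (key, value) pairs, tracking state per key.

    Illustration only, not Bytewax's implementation. Assumes the
    state for a key that has not been seen yet is None.
    """
    states = {}
    emitted = []
    for key, value in keyed_items:
        updated_state, emit_value = mapper(states.get(key), value)
        states[key] = updated_state  # kept internally (what recovery would snapshot)
        emitted.append((key, emit_value))  # passed downstream
    return states, emitted


def running_mean(state, new_item):
    # State is a (count, total) pair; the emitted value is the mean so far.
    count, total = (0, 0) if state is None else state
    count, total = count + 1, total + new_item
    return (count, total), total / count


states, emitted = simulate_stateful_map(
    running_mean, [("ALL", 2), ("ALL", 4), ("ALL", 6)]
)
print(emitted)  # [('ALL', 2.0), ('ALL', 3.0), ('ALL', 4.0)]
print(states)   # {'ALL': (3, 12)}
```

Only the per-key state would need to be saved to resume later. The emitted values have already left the operator.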
Let’s see a concrete example. Add the following code in a new file named recovery.py:
import bytewax.operators as op

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

inp = [0, 1, 2]

flow = Dataflow("recovery")
input_stream = op.input("inp", flow, TestingSource(inp))
# Stateful operators require their input to be keyed
# We'll use the static key "ALL" so that all input is
# processed together.
keyed_stream = op.key_on("key_stream", input_stream, lambda _: "ALL")


def update_sum(current_sum, new_item):
    if current_sum is None:
        current_sum = 0

    updated_sum = current_sum + new_item
    return updated_sum, updated_sum


total_sum_stream = op.stateful_map("running_count", keyed_stream, update_sum)
op.output("out", total_sum_stream, StdOutSink())
To enable recovery when you execute a dataflow, pass the -r flag to bytewax.run and specify the recovery directory. We will also need to set two values for recovery: the snapshot_interval via the -s flag and the backup_interval via the -b flag.
The snapshot_interval specifies the amount of time in seconds to wait before creating a snapshot. The backup_interval specifies the amount of time in seconds to keep older state snapshots around.
For production dataflows, you should set these values carefully for each dataflow to match your operational needs. For more information, please see the concepts section on recovery.
Running the example above, you should see the following output:
> python -m bytewax.run recovery -r db_dir/ -s 30 -b 0
('ALL', 0)
('ALL', 1)
('ALL', 3)
Our dataflow stopped when it reached the end of our testing input, but importantly, Bytewax has saved a snapshot of the state of the dataflow before exiting.
Resuming
If a dataflow aborts, abruptly shuts down, or gracefully exits due to reaching the end of input, you can resume the dataflow by running it again with the files stored in the recovery directory.
Before we re-run our dataflow, let’s change our input data to add some new values:
inp = [0, 1, 2, 3, 4]
Now we can re-run our dataflow:
> python -m bytewax.run recovery -r db_dir/ -s 30 -b 0
('ALL', 6)
('ALL', 10)
You can see that the dataflow has restored the state of the stateful_map operator from our previous run, started reading input from where it stopped, and then applied our new data to that restored state.
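The resume behavior can be sketched conceptually in plain Python. This is a stand-in under stated assumptions, not how Bytewax works internally: the hypothetical `run_with_snapshot` function saves the running sum and the input position to a JSON file, whereas Bytewax keeps its snapshots in the recovery partitions created earlier.

```python
import json
import tempfile
from pathlib import Path


def run_with_snapshot(inp, snapshot_path):
    """Compute a running sum over inp, resuming from a snapshot if present.

    Conceptual stand-in only. The JSON file here is an assumption for
    illustration; it is not Bytewax's snapshot format.
    """
    path = Path(snapshot_path)
    if path.exists():
        snapshot = json.loads(path.read_text())
    else:
        snapshot = {"offset": 0, "total": 0}

    total = snapshot["total"]
    emitted = []
    # Skip the items that were already processed before the snapshot.
    for item in inp[snapshot["offset"]:]:
        total += item
        emitted.append(("ALL", total))

    # Save the resume position and the state for the next run.
    path.write_text(json.dumps({"offset": len(inp), "total": total}))
    return emitted


with tempfile.TemporaryDirectory() as tmp:
    snapshot_file = Path(tmp) / "snapshot.json"
    first_run = run_with_snapshot([0, 1, 2], snapshot_file)
    second_run = run_with_snapshot([0, 1, 2, 3, 4], snapshot_file)

print(first_run)   # [('ALL', 0), ('ALL', 1), ('ALL', 3)]
print(second_run)  # [('ALL', 6), ('ALL', 10)]
```

The second run emits only the sums for the new items because both pieces of the snapshot matter: the saved total restores the operator state, and the saved offset prevents the first three items from being counted twice.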