Recovery
Bytewax allows you to recover a stateful dataflow; it lets you resume processing and output after a failure without re-processing all of the initial data to re-calculate all internal state. It does this by periodically snapshotting all internal state and providing a way to resume from a recent snapshot.
See python -m bytewax.recovery --help for an overview of initializing recovery partitions.
Overview
Bytewax implements recovery by periodically snapshotting state and progress information for a single dataflow instance into a partitioned set of recovery partitions: SQLite databases in the recovery directory. Recovery data for multiple dataflows must not be mixed together.
When you run your dataflow, it will start backing up recovery data automatically. Each run of a dataflow cluster is called an execution.
If the dataflow fails, first you must fix whatever underlying fault caused the issue. That might mean deploying new code which fixes a bug, re-creating destroyed VMs, or resolving an issue with a connected system.
Once that is done, re-run the dataflow using the same recovery directory. Bytewax will automatically read the progress of the previous dataflow execution and determine the most recent coordinated snapshot to resume processing from.
Caveats
Because snapshotting only happens periodically, it is possible that, with some input and output connectors, your output systems will see duplicate data around the resume point. See the documentation for each connector for how it is designed and what kinds of guarantees it can enable. In general, design your systems to be idempotent or to support at-least-once processing.
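As a minimal, hypothetical sketch (not a Bytewax API) of what idempotent output can look like: rows are keyed by a deterministic ID, so records re-delivered after a resume overwrite the earlier rows instead of duplicating them.

import sqlite3

con = sqlite3.connect("output.db")
con.execute(
    "CREATE TABLE IF NOT EXISTS metrics (id TEXT PRIMARY KEY, value REAL)"
)

def write(record_id: str, value: float) -> None:
    # INSERT OR REPLACE on a deterministic primary key means that
    # re-delivering the same record rewrites the same row rather than
    # appending a duplicate.
    con.execute(
        "INSERT OR REPLACE INTO metrics VALUES (?, ?)", (record_id, value)
    )
    con.commit()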
Setup
Recovery partitions must be pre-initialized before running the dataflow for the first time. First, create the directory to hold the partitions.
$ mkdir db_dir
Then create the partitions by executing this module:
$ python -m bytewax.recovery db_dir/ 4
The second parameter (e.g. 4) is the number of recovery partitions to create. This number is fixed and cannot be changed later without manual SQLite interventions. In general, we recommend picking a number of partitions equal to the maximum number of workers you think you will ever rescale to.
This will create a set of partitions:
$ ls db_dir/
part-0.sqlite3
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3
Once the recovery partition files have been created, they must be placed in locations that are accessible to the workers. The cluster as a whole must have access to all partitions, but any given worker need not have access to any particular partition (or any at all). It is OK if a given partition is accessible by multiple workers; only one worker will use it.
Although the partition init script will not create these, after execution the partitions may consist of multiple files:
$ ls db_dir/
part-0.sqlite3
part-0.sqlite3-shm
part-0.sqlite3-wal
part-1.sqlite3
part-2.sqlite3
part-3.sqlite3
part-3.sqlite3-shm
part-3.sqlite3-wal
When relocating partitions, you must move all of the files sharing a given part-* prefix together.
In general, you'll want to set up your environment to distribute the partitions as evenly as possible among your workers.
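For example, with four partitions and two worker machines, you might place two partitions on each host; the glob also carries along any -shm and -wal files, keeping each prefix's files together. The hostnames and destination path here are hypothetical:

$ scp db_dir/part-0.sqlite3* db_dir/part-1.sqlite3* worker-1:/var/bytewax/db_dir/
$ scp db_dir/part-2.sqlite3* db_dir/part-3.sqlite3* worker-2:/var/bytewax/db_dir/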
If you are not running in a cluster environment but on a single machine, placing all the partitions in a single local filesystem directory is fine.
Execution
To enable recovery when you execute a dataflow, pass the -r flag to bytewax.run and specify the recovery directory.
$ python -m bytewax.run ... -r db_dir/
As the dataflow executes, it will now automatically back up state snapshots and progress data.
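For example, if your dataflow is defined in a variable named flow in a file named flow.py (both names hypothetical), a recovery-enabled invocation looks like:

$ python -m bytewax.run flow:flow -r db_dir/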
See the bytewax.run documentation for more information.
Resume
If a dataflow aborts, abruptly shuts down, or gracefully exits due to EOF, you can resume the dataflow by running it again, pointing at the same recovery directory. Bytewax will automatically find the most recent consistent snapshot to resume from.
This requires that the workers in the next execution collectively have access to all of the recovery partitions.
This next execution does not need to have the same number of workers as the previous one; you are allowed to rescale the cluster and state will find its way to the proper workers.
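For example, assuming bytewax.run's -w (workers per process) flag, an execution that previously ran with a single worker could resume with two:

$ python -m bytewax.run ... -r db_dir/ -w 2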
If you want to fully restart a dataflow at the beginning of input and ignore all previous state, delete the partitions in the recovery directory.
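For example, on a single machine, remembering to re-initialize empty partitions before the next run as described in the Setup section:

$ rm db_dir/part-*
$ python -m bytewax.recovery db_dir/ 4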
Continuation
Another use of the recovery system is to allow a dataflow to be continued in a follow-up execution with new data. For example, you might run a dataflow over a large log file to calculate some metrics. You could then append to that log file and resume the dataflow, and the new data would be processed without needing to re-read and re-process the initial part of the file.
Bytewax snapshots the dataflow at the end of all input to support this use case. You’ll need to read the specific documentation for each connector to see what kind of resume semantics it has.
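As a sketch of the log file example, assuming the built-in FileSource and StdOutSink connectors and the operators API from recent Bytewax versions (check your version's documentation for the exact names and resume semantics):

import bytewax.operators as op
from bytewax.connectors.files import FileSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow

flow = Dataflow("log_metrics")
lines = op.input("inp", flow, FileSource("app.log"))
# Count lines per log level; assumes each line starts with "LEVEL ...".
counts = op.count_final("count", lines, lambda line: line.split(" ", 1)[0])
op.output("out", flow, counts, StdOutSink())

Run it with -r pointing at the recovery directory; if the file connector snapshots its read position (see its documentation), appending to app.log and re-running the same command continues from where the previous execution left off.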
Snapshotting
The snapshot interval is the system time interval at which an execution cluster synchronizes and snapshots its progress and state. You can adjust this duration via the -s parameter to bytewax.run.
The dataflow can only resume on snapshot interval boundaries.
In general, the longer this duration is, the less overhead there will be while the dataflow is executing, but the further back the dataflow might have to resume from in case of failure.
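For example, to snapshot roughly every 30 seconds, assuming -s takes a duration in seconds (verify with python -m bytewax.run --help for your version):

$ python -m bytewax.run ... -r db_dir/ -s 30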
Backup and Disaster Recovery
Usually in a production environment, you'll want to durably back up your recovery partitions from the machines that are executing the dataflow in case they are destroyed. This can be done through a variety of mechanisms, like a cron job that runs a backup command, or a tool like litestream.
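For example, a hypothetical crontab entry that syncs a local recovery directory to a backup volume every five minutes (both paths are placeholders):

*/5 * * * * rsync -a /var/bytewax/db_dir/ /mnt/backup/db_dir/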
The recovery system also tries to be efficient and does not let recovery data grow without bound: it will garbage collect snapshot data that is no longer necessary to support resuming from an epoch far in the past.
Bytewax can only resume a dataflow when it is able to find a consistent snapshot which exists across all recovery partitions. If you need to re-constitute a cluster from these backups, there is then the problem that unless the backups are all from an identical point in time, there might not be an overlapping snapshot among them.
The backup interval is the system time interval for which snapshot data should be retained beyond when it would otherwise be garbage collected. This generally should be set slightly longer than your backup latency; this gives a larger window for the independent backup processes for each partition to complete and still enable a successful recovery. You can adjust this duration via the -b parameter to bytewax.run.
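For example, if each partition's backup completes within a few minutes, retaining snapshot data for an extra ten minutes leaves comfortable slack, again assuming the value is given in seconds (verify with python -m bytewax.run --help):

$ python -m bytewax.run ... -r db_dir/ -b 600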