bytewax.testing#
Helper tools for testing dataflows.
Classes#
- class TestingSink(ls: List[X])#
- Bases:
Append each output item to a list.
You only want to use this for unit testing.
Can support at-least-once processing. The list is not cleared between executions.
Initialization
Init.
- Parameters:
ls – List to append to.
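As a concrete starting point, here is a minimal sketch of the usual unit-test pattern: feed items in with a TestingSource, capture results with a TestingSink, and execute with run_main. The step names ("inp", "double", "out") and the doubling step are illustrative only, not part of the API.

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSink, TestingSource, run_main

out = []

flow = Dataflow("test_df")
s = op.input("inp", flow, TestingSource([1, 2, 3]))
doubled = op.map("double", s, lambda x: x * 2)
# TestingSink appends every item it receives to `out`.
op.output("out", doubled, TestingSink(out))

run_main(flow)
assert out == [2, 4, 6]
```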
- class TestingSource(ib: Iterable, batch_size: int = 1)#
- Bases:
Produce input from a Python iterable.
You only want to use this for unit testing.
The iterable must be identical on all workers.
There is no parallelism; only one worker will actually consume the iterable.
Be careful using a generator as the iterable: if execution fails and you attempt to resume the dataflow without rebuilding it, the half-consumed generator will be re-used on recovery, the input consumed before the failure is gone, and the resumed execution will not see the correct data.
Initialization
Init.
- Parameters:
ib – Iterable for input.
batch_size – Number of items from the iterable to emit in each batch. Defaults to 1.
- class EOF#
Signal the input to EOF.
The next execution will continue from the item after this. (A usage sketch follows the class entries below.)
- class ABORT#
Abort the execution when the input processes this item.
The next execution will resume from some item before this one.
Each abort will only trigger once. They’ll be skipped on resume executions.
You cannot use this in multi-worker executions because the other workers don’t know when to stop.
- list_parts()#
List the partitions; the iterable is read as a single partition on one worker.
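To make the EOF marker concrete, here is a hedged sketch of a resume-style unit test: the first execution stops at the marker and a second execution, sharing the same recovery store, continues from the item after it. EOF and ABORT are accessed via TestingSource (they are nested there), and the RecoveryConfig / init_db_dir setup is an assumption about bytewax.recovery; check your version's recovery docs for the exact calls, and note that replay behavior around the marker can depend on epoch commits.

```python
from pathlib import Path
from tempfile import TemporaryDirectory

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSink, TestingSource, run_main

out = []
inp = [1, 2, TestingSource.EOF(), 3, 4]

flow = Dataflow("resume_df")
s = op.input("inp", flow, TestingSource(inp))
op.output("out", s, TestingSink(out))

with TemporaryDirectory() as tmp:
    # Assumed recovery helpers; the exact setup API may differ by version.
    from bytewax.recovery import RecoveryConfig, init_db_dir

    db_dir = Path(tmp)
    init_db_dir(db_dir, 1)  # one recovery partition (assumed signature)
    rc = RecoveryConfig(db_dir)

    run_main(flow, recovery_config=rc)  # stops at the EOF marker
    assert out == [1, 2]

    out.clear()
    run_main(flow, recovery_config=rc)  # continues after the marker
    assert out == [3, 4]
```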
Functions#
- ffwd_iter(it: Iterator[Any], n: int) → None#
Skip an iterator forward some number of items.
- Parameters:
it – A stateful iterator to advance.
n – Number of items to skip from the current position.
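A quick illustration of the fast-forward on a stateful iterator:

```python
from bytewax.testing import ffwd_iter

it = iter(range(10))
ffwd_iter(it, 3)       # consumes 0, 1, 2
assert next(it) == 3   # the iterator now sits at the 4th item
```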
- poll_next_batch(part, timeout=timedelta(seconds=5))#
Repeatedly poll a partition until it returns a batch.
You’ll want to use this in unit tests of partitions when there’s some non-determinism in how items are read.
This is a busy-loop.
- Parameters:
part – Partition to call next_batch on.
timeout – How long to continuously poll for.
- Returns:
The next batch found.
- Raises:
TimeoutError – If no batch was returned within the timeout.
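For illustration, a sketch using a hand-rolled stand-in partition; FlakyPartition is not a bytewax class, it simply exposes the next_batch method that poll_next_batch keeps calling until a non-empty batch appears.

```python
from datetime import timedelta

from bytewax.testing import poll_next_batch

class FlakyPartition:
    """Stand-in partition that returns empty batches a few times
    before any data shows up, mimicking a non-deterministic source."""

    def __init__(self):
        self._polls = 0

    def next_batch(self):
        self._polls += 1
        if self._polls < 3:
            return []  # nothing ready yet
        return ["a", "b"]

part = FlakyPartition()
assert poll_next_batch(part, timeout=timedelta(seconds=1)) == ["a", "b"]
```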
- run_main(flow, *, epoch_interval=None, recovery_config=None)#
Execute a dataflow in the current thread.
Blocks until execution is complete.
This is only used for unit testing. See bytewax.run.

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("my_df")
s = op.input("inp", flow, TestingSource(range(3)))
op.output("out", s, StdOutSink())
run_main(flow)
```

Output:

```
0
1
2
```
- Parameters:
flow (Dataflow) – Dataflow to run.
epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.
recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.
- cluster_main(flow, addresses, proc_id, *, epoch_interval=None, recovery_config=None, worker_count_per_proc=1)#
Execute a dataflow in the current process as part of a cluster.
This is only used for unit testing. See bytewax.run.

Blocks until execution is complete.

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, cluster_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("my_df")
s = op.input("inp", flow, TestingSource(range(3)))
op.output("out", s, StdOutSink())

# In a real example, use "host:port" of all other workers.
addresses = []
proc_id = 0
cluster_main(flow, addresses, proc_id)
```

Output:

```
0
1
2
```
- Parameters:
flow (Dataflow) – Dataflow to run.
addresses (List[str]) – List of host/port addresses for all processes in this cluster (including this one).
proc_id (int) – Index of this process in cluster; starts from 0.
epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.
recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.
worker_count_per_proc (int) – Number of worker threads to start on each process. Defaults to 1.
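If a test needs to exercise the cluster networking itself, you can launch one cluster_main call per process. This is a sketch under assumptions: the port numbers are arbitrary placeholders (pick free ones), and output goes to stdout because a TestingSink list would not be shared across processes.

```python
from multiprocessing import Process

import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, cluster_main

# "host:port" of every process in the cluster, including this one.
ADDRESSES = ["localhost:2101", "localhost:2102"]

def build_flow():
    flow = Dataflow("cluster_df")
    s = op.input("inp", flow, TestingSource(range(6)))
    op.output("out", s, StdOutSink())
    return flow

def run_proc(proc_id):
    # Each process builds its own copy of the dataflow.
    cluster_main(build_flow(), ADDRESSES, proc_id, worker_count_per_proc=1)

if __name__ == "__main__":
    procs = [Process(target=run_proc, args=(i,)) for i in range(len(ADDRESSES))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```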