bytewax.testing#

Helper tools for testing dataflows.

Classes#

class TestingSink(ls: List[X])#
Bases: bytewax.outputs.DynamicSink[X]

Append each output item to a list.

You only want to use this for unit testing.

Can support at-least-once processing. The list is not cleared between executions.

Initialization

Init.

Parameters:

ls – List to append to.

build(
step_id: str,
worker_index: int,
worker_count: int,
) → bytewax.testing._ListSinkPartition[X]#
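
For example, a minimal sketch of collecting a dataflow's output into a list during a test (run_main is documented below):

>>> from bytewax.dataflow import Dataflow
>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSink, TestingSource, run_main
>>> out = []
>>> flow = Dataflow("test_df")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> op.output("out", s, TestingSink(out))
>>> run_main(flow)
>>> out
[0, 1, 2]
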
class TestingSource(
ib: Iterable[Union[X, EOF, ABORT]],
batch_size: int = 1,
)#
Bases: bytewax.inputs.FixedPartitionedSource[X, int]

Produce input from a Python iterable.

You only want to use this for unit testing.

The iterable must be identical on all workers.

There is no parallelism; only one worker will actually consume the iterable.

Be careful using a generator as the iterable; if you fail and attempt to resume the dataflow without rebuilding it, the half-consumed generator will be re-used on recovery and early input will be lost, so the resume will not see the correct data.

Initialization

Init.

Parameters:
  • ib – Iterable for input.

  • batch_size – Number of items from the iterable to emit in each batch. Defaults to 1.
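
For example, a sketch showing batch_size; batching changes how many items each internal batch carries, not the values or their order:

>>> from bytewax.dataflow import Dataflow
>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSink, TestingSource, run_main
>>> out = []
>>> flow = Dataflow("batch_df")
>>> s = op.input("inp", flow, TestingSource([0, 1, 2, 3], batch_size=2))
>>> op.output("out", s, TestingSink(out))
>>> run_main(flow)
>>> out
[0, 1, 2, 3]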

class EOF#

Signal the input to EOF.

The next execution will continue from the item after this.

class ABORT#

Abort the execution when the input processes this item.

The next execution will resume from some item before this one.

Each abort will only trigger once. They’ll be skipped on resume executions.

You cannot use this in multi-worker executions because the other workers don’t know when to stop.
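
A sketch of embedding these sentinels in the input iterable; accessing them as TestingSource.EOF / TestingSource.ABORT inner classes is an assumption here, as is having a recovery config set so a second execution can resume:

>>> from bytewax.testing import TestingSource
>>> # Hypothetical: with recovery configured, the first execution
>>> # emits 1 and 2, then EOFs; a resumed execution continues at 3.
>>> inp = TestingSource([1, 2, TestingSource.EOF(), 3, 4])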

list_parts()#

build_part(
step_id: str,
for_part: str,
resume_state: Optional[int],
) → bytewax.testing._IterSourcePartition[X]#

Functions#

cluster_main(
flow,
addresses,
proc_id,
*,
epoch_interval=None,
recovery_config=None,
worker_count_per_proc=1,
)#

Execute a dataflow in the current process as part of a cluster.

This is only used for unit testing. See bytewax.run.

Blocks until execution is complete.

>>> from bytewax.dataflow import Dataflow
>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, cluster_main
>>> from bytewax.connectors.stdio import StdOutSink
>>> flow = Dataflow("my_df")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> op.output("out", s, StdOutSink())
>>> # In a real example, use "host:port" of all other workers.
>>> addresses = []
>>> proc_id = 0
>>> cluster_main(flow, addresses, proc_id)
0
1
2
Parameters:
  • flow (Dataflow) – Dataflow to run.

  • addresses (List[str]) – List of host/port addresses for all processes in this cluster (including this one).

  • proc_id (int) – Index of this process in cluster; starts from 0.

  • epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.

  • recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.

  • worker_count_per_proc (int) – Number of worker threads to start on each process. Defaults to 1.

ffwd_iter(it: Iterator[Any], n: int) → None#

Skip an iterator forward some number of items.

Parameters:
  • it – A stateful iterator to advance.

  • n – Number of items to skip from the current position.
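
For example:

>>> from bytewax.testing import ffwd_iter
>>> it = iter(range(5))
>>> ffwd_iter(it, 3)
>>> list(it)
[3, 4]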

poll_next_batch(part, timeout=timedelta(seconds=5))#

Repeatedly poll a partition until it returns a batch.

You’ll want to use this in unit tests of partitions when there’s some non-determinism in how items are read.

This is a busy-loop.

Parameters:
  • part – To call next_batch on.

  • timeout – How long to continuously poll for.

Returns:

The next batch found.

Raises:

TimeoutError – If no batch was returned within the timeout.
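
A sketch of a partition unit test; _FlakyPartition is a hypothetical stand-in for a partition under test, and this assumes next_batch is called with no arguments:

>>> from datetime import timedelta
>>> from bytewax.testing import poll_next_batch
>>> class _FlakyPartition:
...     def __init__(self):
...         self._polls = 0
...     def next_batch(self):
...         self._polls += 1
...         # Simulate non-determinism: nothing ready at first.
...         return [] if self._polls < 3 else ["item"]
...
>>> poll_next_batch(_FlakyPartition(), timeout=timedelta(seconds=1))
['item']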

run_main(flow, *, epoch_interval=None, recovery_config=None)#

Execute a dataflow in the current thread.

Blocks until execution is complete.

This is only used for unit testing. See bytewax.run.

>>> from bytewax.dataflow import Dataflow
>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, run_main
>>> from bytewax.connectors.stdio import StdOutSink
>>> flow = Dataflow("my_df")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> op.output("out", s, StdOutSink())
>>> run_main(flow)
0
1
2
Parameters:
  • flow (Dataflow) – Dataflow to run.

  • epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.

  • recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.
