bytewax.testing

Helper tools for testing dataflows.

Classes

class TestingSink(ls: List[X])
Bases:

Append each output item to a list.

You only want to use this for unit testing.

Can support at-least-once processing. The list is not cleared between executions.

Initialization

Init.

Parameters:

ls – List to append to.

build(step_id: str, worker_index: int, worker_count: int) -> bytewax.testing._ListSinkPartition[X]

class TestingSource(ib: Iterable[Union[X, EOF, ABORT]], batch_size: int = 1)
Bases:

Produce input from a Python iterable.

You only want to use this for unit testing.

The iterable must be identical on all workers.

There is no parallelism; only one worker will actually consume the iterable.

Be careful when using a generator as the iterable. If the dataflow fails and you attempt to resume it without rebuilding it, the half-consumed generator will be reused on recovery; the early input will already be gone, so the resumed execution will not see the correct data.
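The generator pitfall can be reproduced in plain Python, without Bytewax. The sketch below is only an illustration under an assumption: `resume_from` is a hypothetical helper that models a resume as "skip the items already processed, then keep reading", which is not necessarily how the library implements recovery.

```python
from itertools import islice


def numbers():
    """Yield 0..4. The returned generator object can only be consumed once."""
    yield from range(5)


def resume_from(iterable, resume_index):
    """Hypothetical model of a resume: skip already-processed items, read the rest."""
    return list(islice(iter(iterable), resume_index, None))


# A list can be re-iterated from the start, so skipping 2 items lands on item 2.
data = list(numbers())
first_run_list = list(islice(iter(data), 2))  # the run reads 2 items, then "fails"
resumed_list = resume_from(data, 2)

# A generator keeps its position: the same skip now discards items 2 and 3.
gen = numbers()
first_run_gen = list(islice(gen, 2))  # the same 2 items are read before the "failure"
resumed_gen = resume_from(gen, 2)

print(resumed_list)  # [2, 3, 4]
print(resumed_gen)   # [4]
```

Passing a list, a range, or a freshly built generator for each execution avoids the problem.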

Initialization

Init.

Parameters:
  • ib – Iterable for input.

  • batch_size – Number of items from the iterable to emit in each batch. Defaults to 1.
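To make the effect of batch_size concrete, here is a minimal sketch of grouping an iterable into batches. The `batches` function is a hypothetical stand-in written for this illustration; it ignores EOF and ABORT items and is not the library's code.

```python
from itertools import islice


def batches(iterable, batch_size=1):
    """Yield lists of up to `batch_size` items until the iterable is exhausted."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


print(list(batches(range(5))))                # [[0], [1], [2], [3], [4]]
print(list(batches(range(5), batch_size=2)))  # [[0, 1], [2, 3], [4]]
```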

class EOF

Signal the end of input (EOF) for the current execution.

The next execution will continue from the item after this.
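The "continue from the item after this" behavior can be modeled with a sentinel and a saved index. This is a simplified sketch, not Bytewax code: `EndOfInput` and `run_once` are hypothetical names, and how the position is carried between real executions is outside its scope.

```python
class EndOfInput:
    """Hypothetical sentinel standing in for TestingSource.EOF."""


def run_once(items, resume_index=0):
    """Read from `resume_index` until the sentinel or the end of `items`.

    Returns the items emitted and the index at which the next run should start.
    """
    emitted = []
    index = resume_index
    while index < len(items):
        item = items[index]
        index += 1  # The sentinel itself counts as consumed.
        if isinstance(item, EndOfInput):
            break
        emitted.append(item)
    return emitted, index


items = [0, 1, EndOfInput(), 2, 3]
first, next_index = run_once(items)
second, _ = run_once(items, next_index)
print(first)   # [0, 1]
print(second)  # [2, 3]
```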

class ABORT

Abort the execution when the input processes this item.

The next execution will resume from some item before this one.

Each abort will only trigger once. They’ll be skipped on resumed executions.

You cannot use this in multi-worker executions because the other workers don’t know when to stop.

list_parts()

build_part(step_id: str, for_part: str, resume_state: Optional[int]) -> bytewax.testing._IterSourcePartition[X]

class TimeTestingGetter

Wrapper to provide a modifiable system clock for unit tests.

now: datetime

advance(td: timedelta) -> None

Advance the current time.

Parameters:

td – By this amount.

get() -> datetime

Return the “current time”.

Use this if you need a getter.

Returns:

The “current time”.
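The pattern this class supports is passing a clock getter into time-dependent code so that a test can move time forward deterministically. The sketch below uses a hypothetical `FakeClock` that mirrors the documented interface (`now`, `advance`, `get`), plus a hypothetical `is_expired` function as the code under test. It is a stand-in for illustration, not the library class.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class FakeClock:
    """Hypothetical stand-in with the documented interface: now, advance, get."""

    now: datetime

    def advance(self, td: timedelta) -> None:
        """Move the clock forward by `td`."""
        self.now += td

    def get(self) -> datetime:
        """Return the clock's current time."""
        return self.now


def is_expired(started_at, ttl, now_getter):
    """Hypothetical code under test: it asks the getter instead of the system clock."""
    return now_getter() - started_at >= ttl


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
clock = FakeClock(now=start)
ttl = timedelta(seconds=10)

before = is_expired(start, ttl, clock.get)
clock.advance(timedelta(seconds=10))
after = is_expired(start, ttl, clock.get)
print(before, after)  # False True
```

Passing the bound method `clock.get` is what lets the test control time without patching the system clock.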

Functions

ffwd_iter(it: Iterator[Any], n: int) -> None

Skip an iterator forward some number of items.

Parameters:
  • it – A stateful iterator to advance.

  • n – Number of items to skip from the current position.
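Skipping an iterator forward can be sketched with the `consume` recipe from the itertools documentation. `skip_forward` is a hypothetical name used for this illustration; whether the library function is implemented the same way is an assumption.

```python
from collections import deque
from itertools import islice


def skip_forward(it, n):
    """Hypothetical helper: advance the iterator `it` by up to `n` items."""
    # A zero-length deque drains the slice without storing anything.
    deque(islice(it, n), maxlen=0)


it = iter(range(10))
skip_forward(it, 3)
print(next(it))  # 3

# Passing a list has no lasting effect: islice iterates a fresh iterator over it.
data = [0, 1, 2, 3]
skip_forward(data, 2)
print(data)  # [0, 1, 2, 3]
```

The second case shows why the argument needs to be a stateful iterator rather than a re-iterable container.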

poll_next_batch(part, timeout=timedelta(seconds=5))

Repeatedly poll a partition until it returns a batch.

You’ll want to use this in unit tests of partitions when there’s some non-determinism in how items are read.

This is a busy-loop.

Parameters:
  • part – To call next_batch on.

  • timeout – How long to continuously poll for.

Returns:

The next batch found.

Raises:

TimeoutError – If no batch was returned within the timeout.
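The busy-loop described above can be sketched as follows. All names here are hypothetical: `poll_until_batch` is a stand-in for the documented function, and `SlowPartition` and `EmptyPartition` are toy partitions whose `next_batch` returns an empty list when nothing is ready, which is an assumption about the partition interface.

```python
import time
from datetime import timedelta


class SlowPartition:
    """Hypothetical partition: returns empty batches for the first few polls."""

    def __init__(self, empty_polls, batch):
        self._empty_polls = empty_polls
        self._batch = batch

    def next_batch(self):
        if self._empty_polls > 0:
            self._empty_polls -= 1
            return []
        return self._batch


class EmptyPartition:
    """Hypothetical partition that never has data."""

    def next_batch(self):
        return []


def poll_until_batch(part, timeout=timedelta(seconds=5)):
    """Call next_batch in a tight loop until it yields items or time runs out."""
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        batch = list(part.next_batch())
        if batch:
            return batch
        if time.monotonic() > deadline:
            raise TimeoutError("no batch was returned within the timeout")


found = poll_until_batch(SlowPartition(3, ["a", "b"]))
print(found)  # ['a', 'b']

try:
    poll_until_batch(EmptyPartition(), timeout=timedelta(milliseconds=10))
    timed_out = False
except TimeoutError:
    timed_out = True
print(timed_out)  # True
```

Because the loop never sleeps, it occupies a CPU core while waiting, so a short timeout keeps a failing test from hanging for long.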

run_main(flow, *, epoch_interval=None, recovery_config=None)

Execute a dataflow in the current thread.

Blocks until execution is complete.

This is only used for unit testing. See bytewax.run.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink
flow = Dataflow("my_df")
s = op.input("inp", flow, TestingSource(range(3)))
op.output("out", s, StdOutSink())

run_main(flow)

Output:

0
1
2

Parameters:
  • flow (Dataflow) – Dataflow to run.

  • epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.

  • recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.

cluster_main(flow, addresses, proc_id, *, epoch_interval=None, recovery_config=None, worker_count_per_proc=1)

Execute a dataflow in the current process as part of a cluster.

This is only used for unit testing. See bytewax.run.

Blocks until execution is complete.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, cluster_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("my_df")
s = op.input("inp", flow, TestingSource(range(3)))
op.output("out", s, StdOutSink())

# In a real cluster, list the "host:port" address of every process, including this one.
addresses = []
proc_id = 0
cluster_main(flow, addresses, proc_id)

Output:

0
1
2

Parameters:
  • flow (Dataflow) – Dataflow to run.

  • addresses (List[str]) – List of host/port addresses for all processes in this cluster (including this one).

  • proc_id (int) – Index of this process in cluster; starts from 0.

  • epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.

  • recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.

  • worker_count_per_proc (int) – Number of worker threads to start on each process. Defaults to 1.
