bytewax.inputs#

Low-level input interfaces and input helpers.

If you want pre-built connectors for various external systems, see bytewax.connectors.

Data#

S: TypeVar#

Type of state snapshots.

Sn: TypeVar#

Type of state snapshots; defaults to None.

X: TypeVar#

Type emitted by a Source.

Classes#

class DynamicSource#
Bases: Source[X]

An input source where all workers can read distinct items.

Does not support storing any resume state, so these kinds of sources can naively support only at-most-once processing.

The source must somehow support supplying disjoint data for each worker. If you re-read the same items on multiple workers, the dataflow will process these as duplicate items.

abstract build(
step_id: str,
worker_index: int,
worker_count: int,
) StatelessSourcePartition[X]#

Build an input source for a worker.

Will be called once on each worker.

Parameters:
  • step_id – The step_id of the input operator.

  • worker_index – Index of this worker. Workers are zero-indexed.

  • worker_count – Total number of workers.

Returns:

The built partition.
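To make the worker-partitioning contract concrete, here is a minimal pure-Python sketch. The class names are hypothetical stand-ins that only mirror the method signatures; a real connector would subclass DynamicSource and StatelessSourcePartition from bytewax.inputs:

```python
from typing import List


class _EveryNthPartition:
    """Hypothetical stand-in for a StatelessSourcePartition[int]."""

    def __init__(self, worker_index: int, worker_count: int, limit: int):
        # Disjoint data per worker: worker i reads i, i + count, i + 2*count, ...
        self._it = iter(range(worker_index, limit, worker_count))

    def next_batch(self) -> List[int]:
        # Never blocks; raises StopIteration when this worker's slice is done.
        return [next(self._it)]


class EveryNthSource:
    """Hypothetical stand-in for a DynamicSource[int]."""

    def __init__(self, limit: int):
        self._limit = limit

    def build(
        self, step_id: str, worker_index: int, worker_count: int
    ) -> _EveryNthPartition:
        return _EveryNthPartition(worker_index, worker_count, self._limit)
```

Because each worker's slice is disjoint, no item is processed twice even though no resume state is kept.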

class FixedPartitionedSource#
Bases: Source[X]

An input source with a fixed number of independent partitions.

Will maintain the state of each partition and use it to re-build the partition during resume. If the source supports seeking, this input can support exactly-once processing.

Each partition must contain unique data. If you re-read the same data in multiple partitions, the dataflow will process these duplicate items.

abstract list_parts() List[str]#

List all local partitions this worker has access to.

You do not need to list all partitions globally.

Returns:

Local partition keys.

abstract build_part(
step_id: str,
for_part: str,
resume_state: Optional[S],
) StatefulSourcePartition[X, S]#

Build anew or resume an input partition.

Will be called once per execution for each partition key on a worker that reported that partition was local in list_parts.

Do not pre-build state about a partition in the constructor. All state must be derived from resume_state for recovery to work properly.

Parameters:
  • step_id – The step_id of the input operator.

  • for_part – Which partition to build. Will always be one of the keys returned by list_parts on this worker.

  • resume_state – State data containing where in the input stream this partition should begin reading during this execution.

Returns:

The built partition.
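The partition-and-resume contract can be sketched in pure Python. These classes are hypothetical stand-ins mirroring the method signatures; a real connector would subclass FixedPartitionedSource and StatefulSourcePartition from bytewax.inputs:

```python
from typing import Dict, List, Optional


class _ListPartition:
    """Hypothetical stand-in for a StatefulSourcePartition[int, int]."""

    def __init__(self, items: List[int], resume_state: Optional[int]):
        # All position state must come from resume_state, never from
        # anything pre-built in the source's constructor.
        self._items = items
        self._offset = resume_state if resume_state is not None else 0

    def next_batch(self) -> List[int]:
        if self._offset >= len(self._items):
            raise StopIteration()
        item = self._items[self._offset]
        self._offset += 1
        return [item]

    def snapshot(self) -> int:
        # The offset already points at the next unread item, so resuming
        # from it re-reads nothing (no "off by one" replay).
        return self._offset


class ListSource:
    """Hypothetical stand-in for a FixedPartitionedSource[int, int]."""

    def __init__(self, parts: Dict[str, List[int]]):
        self._parts = parts

    def list_parts(self) -> List[str]:
        return sorted(self._parts)

    def build_part(
        self, step_id: str, for_part: str, resume_state: Optional[int]
    ) -> _ListPartition:
        return _ListPartition(self._parts[for_part], resume_state)
```

Re-building a partition from a snapshot continues after the last read item rather than repeating it.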

class SimplePollingSource(
interval: timedelta,
align_to: Optional[datetime] = None,
)#
Bases: FixedPartitionedSource[X, Sn]

Calls a user-defined function at a regular interval.

from datetime import timedelta

import requests

from bytewax.inputs import SimplePollingSource

class URLSource(SimplePollingSource):
    def __init__(self):
        super().__init__(interval=timedelta(seconds=10))

    def next_item(self):
        res = requests.get("https://example.com")
        if not res.ok:
            raise SimplePollingSource.Retry(timedelta(seconds=1))
        return res.text

There is no parallelism; only one worker will poll this source.

Supports maintaining the state of the source and resuming. If the source supports seeking, this input can support exactly-once processing. Override snapshot and resume.

This is best for low-throughput polling on the order of seconds to hours.

If you need a high-throughput source, or custom retry or timing, avoid this. Instead create a source using one of the other Source subclasses where you can have increased parallelism, batching, and finer control over timing.

Initialization

Parameters:
  • interval – The interval between calling next_item.

  • align_to – Align awake times to the given datetime. Defaults to now.

exception Retry#
Bases: Exception

Raise this to try to get items before the usual interval.

Parameters:

timeout – How long to wait before calling SimplePollingSource.next_item again.


timeout: timedelta#
list_parts() List[str]#
build_part(
_step_id: str,
for_part: str,
resume_state: Optional[Sn],
) bytewax.inputs._SimplePollingPartition[X, Sn]#
abstract next_item() X#

Override with custom logic to poll your source.

Returns:

Next item to emit into the dataflow. If None, no item is emitted.

Raises:

Retry – Raise if you can’t fetch items and would like to call this function sooner than the usual interval.

snapshot() Sn#

Snapshot the position of the next read of this source.

This will be returned to you via the resume_state parameter of resume.

Be careful of “off by one” errors in resume state. This should return a state that, after being passed to resume, resumes reading after the last read item, not any of the same item that next_item last returned.

Returns:

Resume state.

resume(resume_state: Sn) None#

Reset the position of the next read of this source.

This will be called once before next_item if this execution is a resume.

Parameters:

resume_state – The state snapshot previously returned from snapshot, giving the position to resume reading from.

class Source#
Bases: abc.ABC, typing.Generic[X]

A location to read input items from.

Base class for all input sources. Do not subclass this.

If you want to implement a custom connector, instead subclass one of the specific source sub-types below in this module.

class StatefulSourcePartition#
Bases: abc.ABC, typing.Generic[X, S]

Input partition that maintains state of its position.

abstract next_batch() Iterable[X]#

Attempt to get the next batch of input items.

This must participate in a kind of cooperative multi-tasking, never blocking but returning an empty list if there are no items to emit yet.

Returns:

Items immediately ready. May be empty if no new items.

Raises:

StopIteration – When the source is complete.

next_awake() Optional[datetime]#

When to next attempt to get input items.

next_batch will not be called until the most recently returned time has passed.

This will be called upon initialization of the source and after next_batch, but also possibly at other times. Multiple times are not stored; you must return the next awake time on every call, if any.

If this returns None, next_batch will be called immediately unless the previous batch had no items, in which case there is a 1 millisecond delay.

Use this instead of time.sleep in next_batch.

Returns:

Next awake time or None to indicate automatic behavior.
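A common pattern is to throttle polling by returning a future awake time after each batch, instead of sleeping inside next_batch. This is a hypothetical, stand-alone sketch of that pattern, not a complete partition:

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


class ThrottledPartition:
    """Hypothetical sketch: poll at most once per interval, without sleeping."""

    def __init__(self, interval: timedelta):
        self._interval = interval
        # Poll immediately on startup.
        self._awake_at = datetime.now(timezone.utc)

    def next_batch(self) -> List[int]:
        # ... fetch whatever is ready, without blocking ...
        # Then push the next poll out by one interval.
        self._awake_at = datetime.now(timezone.utc) + self._interval
        return []

    def next_awake(self) -> Optional[datetime]:
        return self._awake_at
```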

abstract snapshot() S#

Snapshot the position of the next read of this partition.

This will be returned to you via the resume_state parameter of your input builder.

Be careful of “off by one” errors in resume state. This should return a state that, when built into a partition, resumes reading after the last read item, not any of the same items that next_batch last returned.

This is guaranteed to never be called after close.

Returns:

Resume state.

close() None#

Cleanup this partition when the dataflow completes.

This is not guaranteed to be called. It will only be called when the dataflow finishes on finite input; it will not be called during an abrupt shutdown or abort.

class StatelessSourcePartition#
Bases: abc.ABC, typing.Generic[X]

Input partition that is stateless.

abstract next_batch() Iterable[X]#

Attempt to get the next batch of input items.

This must participate in a kind of cooperative multi-tasking, never blocking but returning an empty list if there are no new items yet.

Returns:

Items immediately ready. May be empty if no new items.

Raises:

StopIteration – When the source is complete.

next_awake() Optional[datetime]#

When to next attempt to get input items.

next_batch will not be called until the most recently returned time has passed.

This will be called upon initialization of the source and after next_batch, but also possibly at other times. Multiple times are not stored; you must return the next awake time on every call, if any.

If this returns None, next_batch will be called immediately unless the previous batch had no items, in which case there is a 1 millisecond delay.

Use this instead of time.sleep in next_batch.

Returns:

Next awake time or None to indicate automatic behavior.

close() None#

Cleanup this partition when the dataflow completes.

This is not guaranteed to be called. It will only be called when the dataflow finishes on finite input; it will not be called during an abrupt shutdown or abort.

Functions#

batch(
ib: Iterable[X],
batch_size: int,
) Iterator[List[X]]#

Batch an iterable.

Use this to easily generate batches of items for a partition’s next_batch method.

Parameters:
  • ib – The underlying source iterable of items.

  • batch_size – Maximum number of items to yield in a batch.

Yields:

The next gathered batch of items.
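The batching behavior can be sketched in pure Python; this is an illustrative reimplementation, not the library's code:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

X = TypeVar("X")


def batch_sketch(ib: Iterable[X], batch_size: int) -> Iterator[List[X]]:
    """Yield lists of up to batch_size items until ib is exhausted."""
    it = iter(ib)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        yield chunk
```

For example, batching `range(5)` with `batch_size=2` yields `[0, 1]`, `[2, 3]`, then the final short batch `[4]`.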

batch_async(
aib: AsyncIterable[X],
timeout: timedelta,
batch_size: int,
loop=None,
) Iterator[List[X]]#

Batch an async iterable synchronously up to a timeout.

This allows using an async iterator as an input source. Since the next_batch method on a partition must never block, this helper runs the async iterator only up to a timeout, so that you correctly cooperatively multitask with the rest of the dataflow.

Parameters:
  • aib – The underlying source async iterable of items.

  • timeout – Duration of time to repeatedly poll the source async iterator for items.

  • batch_size – Maximum number of items to yield in a batch, even if the timeout has not been hit.

  • loop – Custom asyncio run loop to use, if any.

Yields:

The next gathered batch of items.

This function will take up to timeout to yield, and each yielded list will contain at most batch_size items.
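The poll-until-deadline behavior can be sketched in pure Python. This is an illustrative reimplementation under simplifying assumptions (notably, a timed-out fetch is cancelled, so a mid-fetch item could be dropped, which a production implementation must avoid), not the library's code:

```python
import asyncio
from datetime import timedelta
from typing import AsyncIterable, Iterator, List, TypeVar

X = TypeVar("X")


def batch_async_sketch(
    aib: AsyncIterable[X],
    timeout: timedelta,
    batch_size: int,
    loop=None,
) -> Iterator[List[X]]:
    """Drive an async iterable from sync code, yielding bounded batches."""
    own_loop = loop is None
    loop = loop if loop is not None else asyncio.new_event_loop()
    anext_ = aib.__aiter__().__anext__
    done = False
    while not done:
        batch: List[X] = []
        deadline = loop.time() + timeout.total_seconds()
        while len(batch) < batch_size:
            budget = deadline - loop.time()
            if budget <= 0:
                break
            try:
                # A timeout here cancels the in-flight __anext__; in this
                # simplified sketch that could drop a mid-fetch item.
                batch.append(
                    loop.run_until_complete(asyncio.wait_for(anext_(), budget))
                )
            except asyncio.TimeoutError:
                break
            except StopAsyncIteration:
                done = True
                break
        if batch or not done:
            # An empty batch means "no items yet" and keeps the caller
            # cooperative instead of blocking forever.
            yield batch
    if own_loop:
        loop.close()
```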

batch_getter(
getter: Callable[[], X],
batch_size: int,
yield_on: Optional[X] = None,
) Iterator[List[X]]#

Batch from a getter function that might not return an item.

Use this to easily generate batches of items for a partition’s next_batch method.

Parameters:
  • getter – Function to call to get the next item. Should raise StopIteration on EOF.

  • batch_size – Maximum number of items to yield in a batch.

  • yield_on – Sentinel value returned by getter that indicates there are no new items yet and that the current batch should be returned. Defaults to None.

Yields:

The next gathered batch of items.
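The sentinel-based batching can be sketched in pure Python; an illustrative reimplementation, not the library's code:

```python
from typing import Callable, Iterator, List, Optional, TypeVar

X = TypeVar("X")


def batch_getter_sketch(
    getter: Callable[[], X],
    batch_size: int,
    yield_on: Optional[X] = None,
) -> Iterator[List[X]]:
    """Batch items from getter; the sentinel flushes the current batch."""
    while True:
        batch: List[X] = []
        while len(batch) < batch_size:
            try:
                item = getter()
            except StopIteration:
                # EOF: flush any gathered items, then stop.
                if batch:
                    yield batch
                return
            if item == yield_on:
                # Nothing ready yet; flush what we have (possibly nothing).
                break
            batch.append(item)
        yield batch
```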

batch_getter_ex(
getter: Callable[[], X],
batch_size: int,
yield_ex: Type[Exception] = queue.Empty,
) Iterator[List[X]]#

Batch from a getter function that raises on no items yet.

Use this to easily generate batches of items for a partition's next_batch method.

Parameters:
  • getter – Function to call to get the next item. Should raise StopIteration on EOF.

  • batch_size – Maximum number of items to return in a batch.

  • yield_ex – Exception raised by getter that indicates there are no new items yet and that the current batch should be returned. Defaults to queue.Empty.

Yields:

The next gathered batch of items.
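The exception-based batching can be sketched in pure Python; an illustrative reimplementation, not the library's code:

```python
import queue
from typing import Callable, Iterator, List, Type, TypeVar

X = TypeVar("X")


def batch_getter_ex_sketch(
    getter: Callable[[], X],
    batch_size: int,
    yield_ex: Type[Exception] = queue.Empty,
) -> Iterator[List[X]]:
    """Batch items from getter; yield_ex flushes the current batch."""
    while True:
        batch: List[X] = []
        while len(batch) < batch_size:
            try:
                batch.append(getter())
            except yield_ex:
                # Nothing ready yet; flush what we have (possibly nothing).
                break
            except StopIteration:
                # EOF: flush any gathered items, then stop.
                if batch:
                    yield batch
                return
        yield batch
```

Pairing this with a queue.Queue fed by a background thread (using Queue.get_nowait as the getter) is a natural way to bridge push-based APIs into next_batch.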
