bytewax.inputs#
Low-level input interfaces and input helpers.
If you want pre-built connectors for various external systems, see
bytewax.connectors.
Data#
Classes#
- class DynamicSource#
An input source where all workers can read distinct items.
Does not support storing any resume state. Thus these kinds of sources can naively support only at-most-once processing.
The source must somehow support supplying disjoint data for each worker. If you re-read the same items on multiple workers, the dataflow will process these as duplicate items.
- abstract build( ) StatelessSourcePartition[X]#
Build an input source for a worker.
Will be called once on each worker.
Args:
    now: The current time.
    worker_index: Index of this worker. Workers are zero-indexed.
    worker_count: Total number of workers.
Returns: The built partition.
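One way to meet the disjointness requirement is to stride over a shared sequence using `worker_index` and `worker_count`. The sketch below is plain Python and does not import bytewax. `items_for_worker` is a hypothetical helper showing how a `build` implementation might pick each worker's share, assuming every worker sees the same ordered input.

```python
from typing import List, Sequence, TypeVar

X = TypeVar("X")


def items_for_worker(
    items: Sequence[X], worker_index: int, worker_count: int
) -> List[X]:
    """Return the share of `items` owned by one worker (hypothetical helper)."""
    # Worker i takes items i, i + worker_count, i + 2 * worker_count, ...
    return list(items[worker_index::worker_count])


all_items = list(range(10))
# Each of three workers gets a disjoint share; together they cover everything.
shares = [items_for_worker(all_items, i, 3) for i in range(3)]
```

Because the shares never overlap, no item is processed twice, and because they cover the whole input, no item is skipped.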
- class FixedPartitionedSource#
An input source with a fixed number of independent partitions.
Will maintain the state of each source and re-build using it during resume. If the source supports seeking, this input can support exactly-once processing.
Each partition must contain unique data. If you re-read the same data in multiple partitions, the dataflow will process these duplicate items.
- abstract list_parts() List[str]#
List all local partitions this worker has access to.
You do not need to list all partitions globally.
Returns: Local partition keys.
- abstract build_part( ) StatefulSourcePartition[X, S]#
Build anew or resume an input partition.
Will be called once per execution for each partition key on a worker that reported that partition was local in `list_parts`.
Do not pre-build state about a partition in the constructor. All state must be derived from `resume_state` for recovery to work properly.
Args:
    now: The current time.
    for_part: Which partition to build. Will always be one of the keys returned by `list_parts` on this worker.
    resume_state: State data containing where in the input stream this partition should begin reading during this execution.
Returns: The built partition.
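To illustrate why all state should come from `resume_state`, here is a plain-Python sketch that does not import bytewax. `PARTS` and the free functions `list_parts` and `build_part` are hypothetical stand-ins for a source's data and methods (the `now` argument is omitted). `resume_state` is assumed to be the index of the next unread item, or `None` on a fresh start.

```python
from typing import Dict, Iterator, List, Optional

# Hypothetical partitioned data: partition key -> items in that partition.
PARTS: Dict[str, List[str]] = {
    "a": ["a0", "a1", "a2"],
    "b": ["b0", "b1"],
}


def list_parts() -> List[str]:
    """Partition keys this (single) worker can read."""
    return sorted(PARTS)


def build_part(for_part: str, resume_state: Optional[int]) -> Iterator[str]:
    """Start reading `for_part` at the position recorded in `resume_state`."""
    # None means there is no saved position, so start from the beginning.
    start = 0 if resume_state is None else resume_state
    return iter(PARTS[for_part][start:])


fresh = list(build_part("a", None))  # first execution
resumed = list(build_part("a", 2))   # later execution, two items already read
```

Nothing about the read position is stored outside `resume_state`, so a fresh start and a resume go through the same code path.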
- class SimplePollingSource( )#
Calls a user defined function at a regular interval.
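    from datetime import timedelta

    import requests
    from bytewax.inputs import SimplePollingSource

    class URLSource(SimplePollingSource):
        def __init__(self):
            # Poll every 10 seconds.
            super().__init__(interval=timedelta(seconds=10))

        def next_item(self):
            res = requests.get("https://example.com")
            if not res.ok:
                # Ask to be called again sooner than the usual interval.
                raise SimplePollingSource.Retry(timedelta(seconds=1))
            return res.text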
There is no parallelism; only one worker will poll this source.
Does not support storing any resume state. Thus these kinds of sources can naively support only at-most-once processing.
This is best for low-throughput polling on the order of seconds to hours.
If you need a high-throughput source, or custom retry or timing, avoid this. Instead create a source using one of the other `Source` subclasses where you can have increased parallelism, batching, and finer control over timing.
Initialization
Init.
Args:
    interval: The interval between calling `next_item`.
    align_to: Align awake times to the given datetime. Defaults to now.
- exception Retry#
Raise this to try to get items before the usual interval.
Args:
    timeout: How long to wait before calling `SimplePollingSource.next_item` again.
- class Source#
A location to read input items from.
Base class for all input sources. Do not subclass this.
If you want to implement a custom connector, instead subclass one of the specific source sub-types below in this module.
- class StatefulSourcePartition#
Input partition that maintains state of its position.
- abstract next_batch( ) Iterable[X]#
Attempt to get the next batch of input items.
This must participate in a kind of cooperative multi-tasking, never blocking but returning an empty list if there are no items to emit yet.
Args:
    sched: The scheduled awake time, if one was returned by `next_awake`.
Returns: Items immediately ready. May be empty if no new items.
Raises: StopIteration: When the source is complete.
- next_awake() Optional[datetime]#
When to next attempt to get input items.
`next_batch()` will not be called until the most recently returned time has passed.
This will be called upon initialization of the source and after `next_batch()`, but also possibly at other times. Multiple times are not stored; you must return the next awake time on every call, if any.
If this returns `None`, `next_batch()` will be called immediately unless the previous batch had no items, in which case there is a 1 millisecond delay.
Use this instead of `time.sleep` in `next_batch()`.
Returns: Next awake time or `None` to indicate automatic behavior.
- abstract snapshot() S#
Snapshot the position of the next read of this partition.
This will be returned to you via the `resume_state` parameter of your input builder.
Be careful of "off by one" errors in resume state. This should return a state that, when built into a partition, resumes reading after the last read item, not at the same item that `next_batch()` last returned.
This is guaranteed to never be called after `close()`.
Returns: Resume state.
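The "off by one" warning can be made concrete with a toy partition. This is a plain-Python sketch that does not subclass anything from bytewax. `ListPartition` is a hypothetical class that mirrors `next_batch` and `snapshot` by name only (the `sched` argument is omitted), and the resume state is assumed to be an integer index.

```python
from typing import List, Optional


class ListPartition:
    """Toy partition over a list; not a real bytewax partition."""

    def __init__(self, items: List[str], resume_state: Optional[int]) -> None:
        self._items = items
        # Index of the next item to emit; None means start from the beginning.
        self._next_idx = 0 if resume_state is None else resume_state

    def next_batch(self) -> List[str]:
        if self._next_idx >= len(self._items):
            raise StopIteration()
        item = self._items[self._next_idx]
        self._next_idx += 1
        return [item]

    def snapshot(self) -> int:
        # Position of the NEXT read, not of the item just emitted.
        return self._next_idx


items = ["x", "y", "z"]
part = ListPartition(items, None)
first = part.next_batch()
state = part.snapshot()

# Simulate a resume: rebuild the partition from the snapshot alone.
resumed = ListPartition(items, state)
second = resumed.next_batch()
```

If `snapshot` returned the index of the item just emitted instead, the resumed partition would emit `"x"` a second time.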
- class StatelessSourcePartition#
Input partition that is stateless.
- abstract next_batch(sched: datetime) Iterable[X]#
Attempt to get the next batch of input items.
This must participate in a kind of cooperative multi-tasking, never blocking but yielding an empty list if there are no new items yet.
Args: sched: The scheduled awake time.
Returns: Items immediately ready. May be empty if no new items.
Raises: StopIteration: When the source is complete.
- next_awake() Optional[datetime]#
When to next attempt to get input items.
`next_batch()` will not be called until the most recently returned time has passed.
This will be called upon initialization of the source and after `next_batch()`, but also possibly at other times. Multiple times are not stored; you must return the next awake time on every call, if any.
If this returns `None`, `next_batch()` will be called immediately unless the previous batch had no items, in which case there is a 1 millisecond delay.
Use this instead of `time.sleep` in `next_batch()`.
Returns: Next awake time or `None` to indicate automatic behavior.
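As a rough illustration of scheduling with `next_awake` rather than sleeping, the following plain-Python sketch keeps the next wake-up time as state and advances it on every batch. `TickPartition` is a hypothetical class that does not subclass anything from bytewax; it only mirrors the two method names described above.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


class TickPartition:
    """Toy partition that emits an incrementing counter once per interval."""

    def __init__(self, start: datetime, interval: timedelta) -> None:
        self._interval = interval
        self._next_awake = start
        self._count = 0

    def next_batch(self, sched: datetime) -> List[int]:
        # Never sleep here; record the following wake-up time and return.
        self._next_awake = sched + self._interval
        self._count += 1
        return [self._count]

    def next_awake(self) -> Optional[datetime]:
        # Returned on every call, since earlier answers are not remembered.
        return self._next_awake


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
part = TickPartition(start, timedelta(seconds=5))
wake = part.next_awake()
batch = part.next_batch(wake)
following = part.next_awake()
```

Scheduling relative to `sched` rather than the wall clock keeps the wake-ups evenly spaced even if a call runs late.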
Functions#
- batch( ) Iterator[List[X]]#
Batch an iterable.
Use this to easily generate batches of items for a source’s `next_batch` method.
Args:
    ib: The underlying source iterable of items.
    batch_size: Maximum number of items to yield in a batch.
Yields: The next gathered batch of items.
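The batching behavior described above can be approximated in a few lines of standard-library Python. `batch_sketch` is a hypothetical stand-in written for illustration. It is not the library's implementation, and it assumes the final batch may be shorter than `batch_size`.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

X = TypeVar("X")


def batch_sketch(ib: Iterable[X], batch_size: int) -> Iterator[List[X]]:
    """Yield lists of up to `batch_size` items until `ib` is exhausted."""
    it = iter(ib)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            # The underlying iterable is exhausted.
            return
        yield chunk


batches = list(batch_sketch(range(5), 2))
```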
- batch_async(aib: AsyncIterable[X], timeout: timedelta, batch_size: int, loop=None) Iterator[List[X]]#
Batch an async iterable synchronously up to a timeout.
This allows using an async iterator as an input source. The `next_batch` method on an input source must never block; this allows running an async iterator up to a timeout so that you correctly cooperatively multitask with the rest of the dataflow.
Args:
    aib: The underlying source async iterable of items.
    timeout: Duration of time to repeatedly poll the source async iterator for items.
    batch_size: Maximum number of items to yield in a batch, even if the timeout has not been hit.
    loop: Custom `asyncio` run loop to use, if any.
Yields: The next gathered batch of items.
This function will take up to `timeout` to yield, or will return a list with length up to `batch_size`.
- batch_getter( ) Iterator[List[X]]#
Batch from a getter function that might not return an item.
Use this to easily generate batches of items for a source’s `next_batch` method.
Args:
    getter: Function to call to get the next item. Should raise `StopIteration` on EOF.
    batch_size: Maximum number of items to yield in a batch.
    yield_on: Sentinel value that indicates that there are no more items yet, and to return the current batch. Defaults to `None`.
Yields: The next gathered batch of items.
- batch_getter_ex( ) Iterator[List[X]]#
Batch from a getter function that raises on no items yet.
Use this to easily generate batches of items for a source’s `next_batch` method.
Args:
    getter: Function to call to get the next item. Should raise `StopIteration` on EOF.
    batch_size: Maximum number of items to return in a batch.
    yield_ex: Exception raised by `getter` that indicates that there are no more items yet, and to return the current batch. Defaults to `queue.Empty`.
Yields: The next gathered batch of items.
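To show how a getter that raises on "no items yet" pairs with batching, here is a standard-library sketch built around `queue.Queue.get_nowait`. `batch_getter_ex_sketch` is a hypothetical stand-in, not the library's implementation. EOF handling via `StopIteration` is deliberately left out, and an empty batch is assumed to mean "nothing available right now".

```python
import queue
from typing import Callable, Iterator, List, Type, TypeVar

X = TypeVar("X")


def batch_getter_ex_sketch(
    getter: Callable[[], X],
    batch_size: int,
    yield_ex: Type[Exception] = queue.Empty,
) -> Iterator[List[X]]:
    """Yield batches, cutting a batch short when `getter` raises `yield_ex`."""
    while True:
        current: List[X] = []
        while len(current) < batch_size:
            try:
                current.append(getter())
            except yield_ex:
                # No more items for now; hand back what has been gathered.
                break
        yield current


q = queue.Queue()
for n in range(3):
    q.put(n)

batches = batch_getter_ex_sketch(q.get_nowait, 2)
first = next(batches)   # a full batch
second = next(batches)  # a short batch: the queue ran dry
third = next(batches)   # nothing available right now
```

Since `get_nowait` never blocks, each `next(batches)` call returns promptly, which fits the non-blocking contract of `next_batch`.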