Module bytewax.inputs

Dataflow input sources.

Bytewax provides pre-packaged input configuration options for common sources you might want to read dataflow input from.

Use

Create an InputConfig subclass for the source you'd like to read from. Then pass that config object to the Dataflow constructor.

Expand source code
"""Dataflow input sources.

Bytewax provides pre-packaged input configuration options for common
sources you might want to read dataflow input from.

Use
---

Create an `InputConfig` subclass for the source you'd like to read
from. Then pass that config object to the `bytewax.dataflow.Dataflow`
constructor.

"""

from typing import Any, Iterable

from .bytewax import InputConfig, KafkaInputConfig, ManualInputConfig  # noqa: F401


class TestingInputConfig(ManualInputConfig):
    """Produce input from a Python iterable.
    You only want to use this for unit testing.

    The iterable must be identical on all workers; this will
    automatically distribute the items across workers and handle
    recovery.

    Args:

        it: Iterable for input.

    Returns:

        Config object. Pass this to the `bytewax.dataflow.Dataflow`
        constructor.

    """

    # This is needed to avoid pytest trying to load this class as
    # a test since its name starts with "Test"
    __test__ = False

    def __new__(cls, it):
        def gen(worker_index, worker_count, resume_state):
            resume_i = resume_state or 0
            for i, x in enumerate(distribute(it, worker_index, worker_count)):
                # FFWD to the resume item.
                if i < resume_i:
                    continue
                # Store the index in this worker's partition as the resume
                # state.
                yield (i + 1, x)
        return super().__new__(cls, gen)


def distribute(elements: Iterable[Any], index: int, count: int) -> Iterable[Any]:
    """Distribute elements equally between a number of buckets and return
    the items for the given bucket index.

    No two buckets will get the same element.

    >>> list(distribute(["blue", "green", "red"], 0, 2))
    ['blue', 'red']
    >>> list(distribute(["blue", "green", "red"], 1, 2))
    ['green']

    Note that if you have more buckets than elements, some buckets
    will get nothing.

    >>> list(distribute(["blue", "green", "red"], 3, 5))
    []

    This is very useful when writing input builders and you want each
    of your workers to handle reading a disjoint partition of your
    input.

    For example this code:

    ```python
    from bytewax.dataflow import Dataflow
    from bytewax.execution import spawn_cluster
    from bytewax.inputs import ManualInputConfig, distribute
    from bytewax.outputs import StdOutputConfig

    def read_topics(topics):
       for topic in topics:
            for i in range(3):
                yield f"topic:{topic} item:{i}"

    def input_builder(worker_index, workers_count, resume_state):
        state = None
        all_topics = ["red", "green", "blue"]
        this_workers_topics = distribute(all_topics, worker_index, workers_count)
        for item in read_topics(this_workers_topics):
            yield (state, f"worker_index:{worker_index} {item}")

    flow = Dataflow()
    flow.input("input", ManualInputConfig(input_builder))
    flow.capture(StdOutputConfig())
    spawn_cluster(flow)
    ```

    Outputs (not in this order):

    ```
    worker_index:0 topic:red item:0
    worker_index:0 topic:red item:1
    worker_index:0 topic:red item:2
    worker_index:0 topic:blue item:0
    worker_index:0 topic:blue item:1
    worker_index:0 topic:blue item:2
    worker_index:1 topic:green item:0
    worker_index:1 topic:green item:1
    worker_index:1 topic:green item:2
    ```


    Args:

        elements: To distribute.

        index: Index of this bucket / worker starting at 0.

        count: Total number of buckets / workers.

    Returns:

        An iterator of the elements only in this bucket.

    """
    assert index < count, f"Highest index should only be {count - 1}; got {index}"
    for i, x in enumerate(elements):
        if i % count == index:
            yield x

Functions

def distribute(elements: Iterable[Any], index: int, count: int) ‑> Iterable[Any]

Distribute elements equally between a number of buckets and return the items for the given bucket index.

No two buckets will get the same element.

>>> list(distribute(["blue", "green", "red"], 0, 2))
['blue', 'red']
>>> list(distribute(["blue", "green", "red"], 1, 2))
['green']

Note that if you have more buckets than elements, some buckets will get nothing.

>>> list(distribute(["blue", "green", "red"], 3, 5))
[]

This is very useful when writing input builders and you want each of your workers to handle reading a disjoint partition of your input.

For example this code:

from bytewax.dataflow import Dataflow
from bytewax.execution import spawn_cluster
from bytewax.inputs import ManualInputConfig, distribute
from bytewax.outputs import StdOutputConfig

def read_topics(topics):
   for topic in topics:
        for i in range(3):
            yield f"topic:{topic} item:{i}"

def input_builder(worker_index, workers_count, resume_state):
    state = None
    all_topics = ["red", "green", "blue"]
    this_workers_topics = distribute(all_topics, worker_index, workers_count)
    for item in read_topics(this_workers_topics):
        yield (state, f"worker_index:{worker_index} {item}")

flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
flow.capture(StdOutputConfig())
spawn_cluster(flow)

Outputs (not in this order):

worker_index:0 topic:red item:0
worker_index:0 topic:red item:1
worker_index:0 topic:red item:2
worker_index:0 topic:blue item:0
worker_index:0 topic:blue item:1
worker_index:0 topic:blue item:2
worker_index:1 topic:green item:0
worker_index:1 topic:green item:1
worker_index:1 topic:green item:2

Args

elements
To distribute.
index
Index of this bucket / worker starting at 0.
count
Total number of buckets / workers.

Returns

An iterator of the elements only in this bucket.

Expand source code
def distribute(elements: Iterable[Any], index: int, count: int) -> Iterable[Any]:
    """Distribute elements equally between a number of buckets and return
    the items for the given bucket index.

    No two buckets will get the same element.

    >>> list(distribute(["blue", "green", "red"], 0, 2))
    ['blue', 'red']
    >>> list(distribute(["blue", "green", "red"], 1, 2))
    ['green']

    Note that if you have more buckets than elements, some buckets
    will get nothing.

    >>> list(distribute(["blue", "green", "red"], 3, 5))
    []

    This is very useful when writing input builders and you want each
    of your workers to handle reading a disjoint partition of your
    input.

    For example this code:

    ```python
    from bytewax.dataflow import Dataflow
    from bytewax.execution import spawn_cluster
    from bytewax.inputs import ManualInputConfig, distribute
    from bytewax.outputs import StdOutputConfig

    def read_topics(topics):
       for topic in topics:
            for i in range(3):
                yield f"topic:{topic} item:{i}"

    def input_builder(worker_index, workers_count, resume_state):
        state = None
        all_topics = ["red", "green", "blue"]
        this_workers_topics = distribute(all_topics, worker_index, workers_count)
        for item in read_topics(this_workers_topics):
            yield (state, f"worker_index:{worker_index} {item}")

    flow = Dataflow()
    flow.input("input", ManualInputConfig(input_builder))
    flow.capture(StdOutputConfig())
    spawn_cluster(flow)
    ```

    Outputs (not in this order):

    ```
    worker_index:0 topic:red item:0
    worker_index:0 topic:red item:1
    worker_index:0 topic:red item:2
    worker_index:0 topic:blue item:0
    worker_index:0 topic:blue item:1
    worker_index:0 topic:blue item:2
    worker_index:1 topic:green item:0
    worker_index:1 topic:green item:1
    worker_index:1 topic:green item:2
    ```


    Args:

        elements: To distribute.

        index: Index of this bucket / worker starting at 0.

        count: Total number of buckets / workers.

    Returns:

        An iterator of the elements only in this bucket.

    """
    assert index < count, f"Highest index should only be {count - 1}; got {index}"
    for i, x in enumerate(elements):
        if i % count == index:
            yield x

Classes

class InputConfig

Base class for an input config.

These define how you will input data to your dataflow.

Use a specific subclass of InputConfig for the kind of input source you are plan to use. See the subclasses in this module.

Subclasses

class KafkaInputConfig (brokers, topic, tail, starting_offset, additional_properties)

Use Kafka as the input source.

Kafka messages will be passed through the dataflow as two-tuples of (key_bytes, payload_bytes).

Args

brokers : List[str]
List of host:port strings of Kafka brokers.
topic : str
Topic to which consumer will subscribe.
tail : bool
Wait for new data on this topic when the end is initially reached.
starting_offset : str
Can be "beginning" or "end". Delegates where to resume if auto_commit is not enabled. Defaults to "beginning".
additional_properties : dict
Any additional configuration properties. Note that consumer group settings will be ignored. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for more options.

Returns

Config object. Pass this as the input_config argument to the Dataflow.input().

Ancestors

Instance variables

var additional_properties

Return an attribute of instance, which is of type owner.

var brokers

Return an attribute of instance, which is of type owner.

var starting_offset

Return an attribute of instance, which is of type owner.

var tail

Return an attribute of instance, which is of type owner.

var topic

Return an attribute of instance, which is of type owner.

class ManualInputConfig (input_builder)

Use a user-defined function that returns an iterable as the input source.

Because Bytewax's execution is cooperative, the resulting iterators must not block waiting for new data, otherwise pending execution of other steps in the dataflow will be delayed an throughput will be reduced. If you are using a generator and no data is ready yet, have it yield None or just yield to signal this.

Args

input_builder
input_builder(worker_index: int, worker_count: int, resume_state: Option[Any]) => Iterator[Tuple[Any, Any]] Builder function which returns an iterator of 2-tuples of (state, item). item is the input that worker should introduce into the dataflow. state is a snapshot of any internal state it will take to resume this input from its current position after the current item. Note that e.g. returning the same list from each worker will result in duplicate data in the dataflow.

Returns

Config object. Pass this as the input_config argument to the Dataflow.input().

Ancestors

Subclasses

Instance variables

var input_builder

Return an attribute of instance, which is of type owner.

class TestingInputConfig (it)

Produce input from a Python iterable. You only want to use this for unit testing.

The iterable must be identical on all workers; this will automatically distribute the items across workers and handle recovery.

Args

it
Iterable for input.

Returns

Config object. Pass this to the Dataflow constructor.

Expand source code
class TestingInputConfig(ManualInputConfig):
    """Produce input from a Python iterable.
    You only want to use this for unit testing.

    The iterable must be identical on all workers; this will
    automatically distribute the items across workers and handle
    recovery.

    Args:

        it: Iterable for input.

    Returns:

        Config object. Pass this to the `bytewax.dataflow.Dataflow`
        constructor.

    """

    # This is needed to avoid pytest trying to load this class as
    # a test since its name starts with "Test"
    __test__ = False

    def __new__(cls, it):
        def gen(worker_index, worker_count, resume_state):
            resume_i = resume_state or 0
            for i, x in enumerate(distribute(it, worker_index, worker_count)):
                # FFWD to the resume item.
                if i < resume_i:
                    continue
                # Store the index in this worker's partition as the resume
                # state.
                yield (i + 1, x)
        return super().__new__(cls, gen)

Ancestors

Inherited members