Package bytewax

Bytewax is an open source Python framework for building highly scalable dataflows in a streaming or batch context.

See our readme for more documentation.

Expand source code
"""Bytewax is an open source Python framework for building highly
scalable dataflows in a streaming or batch context.

[See our readme for more
documentation.](https://github.com/bytewax/bytewax)

"""
from .bytewax import cluster_main, Dataflow, run_main
from .execution import run, run_cluster, spawn_cluster

__all__ = [
    "Dataflow",
    "run_main",
    "run",
    "run_cluster",
    "spawn_cluster",
    "cluster_main",
]

__pdoc__ = {
    # This is the PyO3 module that has to be named "bytewax". Hide it
    # since we import all its members here.
    "bytewax": False,
    # Hide execution because we import all its members here.
    "execution": False,
}

Sub-modules

bytewax.inputs

Helpers to let you quickly define epoch / batching semantics …

bytewax.parse

Helpers to read execution arguments from the environment or command line.

bytewax.recovery

Bytewax's state recovery machinery …

bytewax.testing

Helper tools for testing dataflows.

Functions

def cluster_main(flow, input_config, output_builder, addresses, proc_id, *, recovery_config, worker_count_per_proc)

Execute a dataflow in the current process as part of a cluster.

You have to coordinate starting up all the processes in the cluster and ensure they each are assigned a unique ID and know the addresses of the other processes. You'd commonly use this for starting processes as part of a Kubernetes cluster.

Blocks until execution is complete.

>>> flow = Dataflow()
>>> flow.capture()
>>> def input_builder(worker_index, worker_count, resume_epoch):
...     for epoch, item in enumerate(range(resume_epoch, 3)):
...         yield AdvanceTo(epoch)
...         yield Emit(item)
>>> def output_builder(worker_index, worker_count):
...     return print
>>> cluster_main(
...     flow,
...     ManualInputConfig(input_builder),
...     output_builder,
...     addresses=["localhost:2101"],  # Illustrative; list every process in the cluster.
...     proc_id=0,
... )  # doctest: +ELLIPSIS
(...)

See run_main() for a way to test input and output builders without the complexity of starting a cluster.

See run_cluster() for a convenience method to pass data through a dataflow for notebook development.

See spawn_cluster() for starting a simple cluster locally on one machine.

Args

flow
Dataflow to run.
input_config
Input config of type Manual or Kafka. See bytewax.inputs.
output_builder
Returns a callback function for each worker thread, called with an (epoch, item) tuple whenever an item passes by a capture operator on this process (see the sketch after this list).
addresses
List of host/port addresses for all processes in this cluster (including this one).
proc_id
Index of this process in cluster; starts from 0.
recovery_config
State recovery config. See bytewax.recovery. If None, state will not be persisted.
worker_count_per_proc
Number of worker threads to start on each process.
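As a further illustration of the output_builder contract, here is a hedged sketch in which each worker writes its captured output to its own file; the file naming is an illustrative assumption, not part of Bytewax. The handler returned by the builder receives a single (epoch, item) tuple per captured item.

# Hedged sketch of an output builder: one output file per worker.
# The file name scheme is an illustrative assumption.
def file_output_builder(worker_index, worker_count):
    out_file = open(f"out_worker_{worker_index}.txt", "a")

    def handler(epoch_item):
        # Called once per item that passes a capture operator on this worker.
        epoch, item = epoch_item
        out_file.write(f"{epoch}\t{item}\n")
        out_file.flush()

    return handler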
def run(flow: Dataflow, inp: Iterable[Tuple[int, Any]]) -> List[Tuple[int, Any]]

Pass data through a dataflow running in the current thread.

Blocks until execution is complete.

Output is collected into a list before returning, thus output must be finite.

Handles distributing input and collecting output. You'd commonly use this for tests or prototyping in notebooks.

>>> flow = Dataflow()
>>> flow.map(str.upper)
>>> flow.capture()
>>> out = run(flow, [(0, "a"), (1, "b"), (2, "c")])
>>> sorted(out)
[(0, 'A'), (1, 'B'), (2, 'C')]

Args

flow
Dataflow to run.
inp
Input data. If you are recovering a stateful dataflow, your input should resume from the last finalized epoch.

Returns

List of (epoch, item) tuples seen by capture operators.

Expand source code
def run(
    flow: Dataflow,
    inp: Iterable[Tuple[int, Any]],
) -> List[Tuple[int, Any]]:
    """Pass data through a dataflow running in the current thread.

    Blocks until execution is complete.

    Output is collected into a list before returning, thus output must
    be finite.

    Handles distributing input and collecting output. You'd commonly
    use this for tests or prototyping in notebooks.

    >>> flow = Dataflow()
    >>> flow.map(str.upper)
    >>> flow.capture()
    >>> out = run(flow, [(0, "a"), (1, "b"), (2, "c")])
    >>> sorted(out)
    [(0, 'A'), (1, 'B'), (2, 'C')]

    Args:

        flow: Dataflow to run.

        inp: Input data. If you are recovering a stateful dataflow,
            your input should resume from the last finalized epoch.

    Returns:

        List of `(epoch, item)` tuples seen by capture operators.

    """

    def input_builder(worker_index, worker_count, resume_epoch):
        assert resume_epoch == 0, "Recovery doesn't work with iterator based input"
        assert worker_index == 0
        for epoch, item in inp:
            yield AdvanceTo(epoch)
            yield Emit(item)

    out = []

    def output_builder(worker_index, worker_count):
        assert worker_index == 0
        return out.append

    "Only manual configuration works with iterator based input"
    run_main(
        flow,
        ManualInputConfig(input_builder),
        output_builder,
    )

    return out
def run_cluster(flow: Dataflow, inp: Iterable[Tuple[int, Any]], *, proc_count: int = 1, worker_count_per_proc: int = 1, mp_ctx=<multiprocess.context.SpawnContext object>) -> List[Tuple[int, Any]]

Pass data through a dataflow running as a cluster of processes on this machine. Blocks until execution is complete.

Both input and output are collected into lists, thus both must be finite.

Starts up cluster processes for you, handles connecting them together, distributing input, and collecting output. You'd commonly use this for notebook analysis that needs parallelism and higher throughput, or simple stand-alone demo programs.

>>> from bytewax.testing import doctest_ctx
>>> flow = Dataflow()
>>> flow.map(str.upper)
>>> flow.capture()
>>> out = run_cluster(
...     flow,
...     [(0, "a"), (1, "b"), (2, "c")],
...     proc_count=2,
...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
... )
>>> sorted(out)
[(0, 'A'), (1, 'B'), (2, 'C')]

See spawn_cluster() for starting a cluster on this machine with full control over inputs and outputs.

See cluster_main() for starting one process in a cluster in a distributed situation.

Args

flow
Dataflow to run.
inp
Input data. Will be reified to a list before sending to processes. Will be partitioned between workers for you. If you are recovering a stateful dataflow, you must ensure your input resumes from the last finalized epoch.
proc_count
Number of processes to start.
worker_count_per_proc
Number of worker threads to start on each process.
mp_ctx
multiprocessing context to use. Use this to configure starting up subprocesses via spawn or fork; a brief sketch follows the Returns section below. Defaults to spawn.

Returns

List of (epoch, item) tuples seen by capture operators.
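As a brief sketch of the mp_ctx parameter: the default context comes from the third-party multiprocess package, whose get_context() mirrors the standard library API (an assumption worth verifying in your environment). Passing a fork context looks like this:

# Hedged sketch: run the cluster with a fork context instead of the default spawn.
from multiprocess import get_context

from bytewax import Dataflow, run_cluster

flow = Dataflow()
flow.map(str.upper)
flow.capture()

out = run_cluster(
    flow,
    [(0, "a"), (1, "b")],
    proc_count=2,
    mp_ctx=get_context("fork"),  # Fork is not available on Windows.
)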

Expand source code
def run_cluster(
    flow: Dataflow,
    inp: Iterable[Tuple[int, Any]],
    *,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
    mp_ctx=get_context("spawn"),
) -> List[Tuple[int, Any]]:
    """Pass data through a dataflow running as a cluster of processes on
    this machine.
    Blocks until execution is complete.

    Both input and output are collected into lists, thus both must be
    finite.

    Starts up cluster processes for you, handles connecting them
    together, distributing input, and collecting output. You'd
    commonly use this for notebook analysis that needs parallelism and
    higher throughput, or simple stand-alone demo programs.

    >>> from bytewax.testing import doctest_ctx
    >>> flow = Dataflow()
    >>> flow.map(str.upper)
    >>> flow.capture()
    >>> out = run_cluster(
    ...     flow,
    ...     [(0, "a"), (1, "b"), (2, "c")],
    ...     proc_count=2,
    ...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
    ... )
    >>> sorted(out)
    [(0, 'A'), (1, 'B'), (2, 'C')]

    See `bytewax.spawn_cluster()` for starting a cluster on this
    machine with full control over inputs and outputs.

    See `bytewax.cluster_main()` for starting one process in a cluster
    in a distributed situation.

    Args:

        flow: Dataflow to run.

        inp: Input data. Will be reified to a list before sending to
            processes. Will be partitioned between workers for you. If
            you are recovering a stateful dataflow, you must ensure
            your input resumes from the last finalized epoch.

        proc_count: Number of processes to start.

        worker_count_per_proc: Number of worker threads to start on
            each process.

        mp_ctx: `multiprocessing` context to use. Use this to
            configure starting up subprocesses via spawn or
            fork. Defaults to spawn.

    Returns:

        List of `(epoch, item)` tuples seen by capture operators.
    """
    # A Manager starts up a background process to manage shared state.
    with mp_ctx.Manager() as man:
        inp = man.list(list(inp))

        def input_builder(worker_index, worker_count, resume_epoch):
            assert resume_epoch == 0, "Recovery doesn't work with iterator based input"
            for i, epoch_item in enumerate(inp):
                if i % worker_count == worker_index:
                    (epoch, item) = epoch_item
                    yield AdvanceTo(epoch)
                    yield Emit(item)

        out = man.list()

        def output_builder(worker_index, worker_count):
            return out.append

        spawn_cluster(
            flow,
            ManualInputConfig(input_builder),
            output_builder,
            proc_count=proc_count,
            worker_count_per_proc=worker_count_per_proc,
            mp_ctx=mp_ctx,
        )

        # We have to copy out the shared state before process
        # shutdown.
        return list(out)
def run_main(flow, input_config, output_builder, *, recovery_config)

Execute a dataflow in the current thread.

Blocks until execution is complete.

You'd commonly use this for prototyping custom input and output builders with a single worker before using them in a cluster setting.

>>> flow = Dataflow()
>>> flow.capture()
>>> def input_builder(worker_index, worker_count, resume_epoch):
...     for epoch, item in enumerate(range(resume_epoch, 3)):
...         yield AdvanceTo(epoch)
...         yield Emit(item)
>>> def output_builder(worker_index, worker_count):
...     return print
>>> run_main(flow, ManualInputConfig(input_builder), output_builder)  # doctest: +ELLIPSIS
(...)

See run() for a convenience method to not need to worry about input or output builders.

See spawn_cluster() for starting a cluster on this machine with full control over inputs and outputs.
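Building on the toy input_builder above, here is a hedged sketch of a slightly more realistic builder that reads one epoch per line from a file and honors resume_epoch. The file name and the one-epoch-per-line policy are illustrative assumptions, and AdvanceTo, Emit, and ManualInputConfig are assumed to be importable from bytewax.inputs.

# Hedged sketch of a custom input builder for run_main().
from bytewax import Dataflow, run_main
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig

def file_input_builder(worker_index, worker_count, resume_epoch):
    with open("events.txt") as lines:  # Illustrative file name.
        for epoch, line in enumerate(lines):
            if epoch < resume_epoch:
                continue  # Skip epochs already finalized before recovery.
            yield AdvanceTo(epoch)  # Advance to this line's epoch...
            yield Emit(line.strip())  # ...then emit the item in that epoch.

def print_output_builder(worker_index, worker_count):
    return print  # Print each captured (epoch, item) tuple.

flow = Dataflow()
flow.capture()
run_main(flow, ManualInputConfig(file_input_builder), print_output_builder)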

Args

flow
Dataflow to run.
input_config
Input config of type Manual or Kafka. See bytewax.inputs.
output_builder
Returns a callback function for each worker thread, called with an (epoch, item) tuple whenever an item passes by a capture operator on this process.
recovery_config
State recovery config. See bytewax.recovery. If None, state will not be persisted.
def spawn_cluster(flow: Dataflow, input_config: InputConfig, output_builder: Callable[[int, int, int], Callable[[Tuple[int, Any]], None]], *, recovery_config: Optional[RecoveryConfig] = None, proc_count: int = 1, worker_count_per_proc: int = 1, mp_ctx=<multiprocess.context.SpawnContext object>) -> List[Tuple[int, Any]]

Execute a dataflow as a cluster of processes on this machine.

Blocks until execution is complete.

Starts up cluster processes for you and handles connecting them together. You'd commonly use this for notebook analysis that needs parallelism and higher throughput, or simple stand-alone demo programs.

>>> from bytewax.testing import doctest_ctx
>>> flow = Dataflow()
>>> flow.capture()
>>> def input_builder(worker_index, worker_count, resume_epoch):
...   for epoch, item in enumerate(range(resume_epoch, 3)):
...     yield AdvanceTo(epoch)
...     yield Emit(item)
>>> def output_builder(worker_index, worker_count):
...     return print
>>> spawn_cluster(
...     flow,
...     ManualInputConfig(input_builder),
...     output_builder,
...     proc_count=2,
...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
... )  # doctest: +ELLIPSIS
(...)

See run_main() for a way to test input and output builders without the complexity of starting a cluster.

See run_cluster() for a convenience method to pass data through a dataflow for notebook development.

See cluster_main() for starting one process in a cluster in a distributed situation.

Args

flow
Dataflow to run.
input_config
Input config of type Manual or Kafka. See bytewax.inputs.
output_builder
Returns a callback function for each worker thread, called with an (epoch, item) tuple whenever an item passes by a capture operator on this process.
recovery_config
State recovery config. See bytewax.recovery. If None, state will not be persisted.
proc_count
Number of processes to start.
worker_count_per_proc
Number of worker threads to start on each process.
mp_ctx
multiprocessing context to use. Use this to configure starting up subprocesses via spawn or fork. Defaults to spawn.
Expand source code
def spawn_cluster(
    flow: Dataflow,
    input_config: InputConfig,
    output_builder: Callable[[int, int, int], Callable[[Tuple[int, Any]], None]],
    *,
    recovery_config: Optional[RecoveryConfig] = None,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
    mp_ctx=get_context("spawn"),
) -> List[Tuple[int, Any]]:
    """Execute a dataflow as a cluster of processes on this machine.

    Blocks until execution is complete.

    Starts up cluster processes for you and handles connecting them
    together. You'd commonly use this for notebook analysis that needs
    parallelism and higher throughput, or simple stand-alone demo
    programs.

    >>> from bytewax.testing import doctest_ctx
    >>> flow = Dataflow()
    >>> flow.capture()
    >>> def input_builder(worker_index, worker_count, resume_epoch):
    ...   for epoch, item in enumerate(range(resume_epoch, 3)):
    ...     yield AdvanceTo(epoch)
    ...     yield Emit(item)
    >>> def output_builder(worker_index, worker_count):
    ...     return print
    >>> spawn_cluster(
    ...     flow,
    ...     ManualInputConfig(input_builder),
    ...     output_builder,
    ...     proc_count=2,
    ...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
    ... )  # doctest: +ELLIPSIS
    (...)

    See `bytewax.run_main()` for a way to test input and output
    builders without the complexity of starting a cluster.

    See `bytewax.run_cluster()` for a convenience method to pass data
    through a dataflow for notebook development.

    See `bytewax.cluster_main()` for starting one process in a cluster
    in a distributed situation.

    Args:

        flow: Dataflow to run.

        input_config: Input config of type Manual or Kafka. See `bytewax.inputs`.

        output_builder: Returns a callback function for each worker
            thread, called with `(epoch, item)` whenever an item
            passes by a capture operator on this process.

        recovery_config: State recovery config. See
            `bytewax.recovery`. If `None`, state will not be
            persisted.

        proc_count: Number of processes to start.

        worker_count_per_proc: Number of worker threads to start on
            each process.

        mp_ctx: `multiprocessing` context to use. Use this to
            configure starting up subprocesses via spawn or
            fork. Defaults to spawn.

    """
    addresses = _gen_addresses(proc_count)
    with mp_ctx.Pool(processes=proc_count) as pool:
        futures = [
            pool.apply_async(
                cluster_main,
                (
                    flow,
                    input_config,
                    output_builder,
                ),
                {
                    "recovery_config": recovery_config,
                    "addresses": addresses,
                    "proc_id": proc_id,
                    "worker_count_per_proc": worker_count_per_proc,
                },
            )
            for proc_id in range(proc_count)
        ]
        pool.close()

        for future in futures:
            # Will re-raise exceptions from subprocesses.
            future.get()

        pool.join()

Classes

class Dataflow

A definition of a Bytewax dataflow graph.

Use the methods defined on this class to add steps with operators of the same name.

See the execution functions in bytewax for how to run a dataflow.

TODO: Right now only linear dataflows are supported.
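For example, a minimal linear dataflow built from the operators documented below might look like this sketch, executed with run() from this package:

# Minimal sketch of a linear dataflow: filter, then map, then capture.
from bytewax import Dataflow, run

flow = Dataflow()
flow.filter(lambda item: item % 2 == 0)  # Keep only even items.
flow.map(lambda item: item * 10)         # Transform what remains.
flow.capture()                           # At least one capture is required.

# Each input item is paired with an epoch, here via enumerate().
print(sorted(run(flow, enumerate(range(5)))))
# [(0, 0), (2, 20), (4, 40)]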

Methods

def capture(self)

Capture is how you specify output of a dataflow.

At least one capture is required on every dataflow.

It emits items downstream unmodified; you can capture midway through a dataflow.

Whenever an item flows into a capture operator, the output handler of the worker is called with that item and its epoch. For run() and run_cluster(), output handlers are set up for you that collect the output and return it as the function's return value.

There are no guarantees on the order that output is passed to the output handler. Read the attached epoch to discern order.

>>> flow = Dataflow()
>>> flow.capture()
>>> inp = enumerate(range(3))
>>> sorted(run(flow, inp))
[(0, 0), (1, 1), (2, 2)]
def filter(self, predicate)

Filter selectively keeps only some items.

It calls a predicate function on each item.

It emits the item downstream unmodified if the predicate returns True.

It is commonly used for:

  • Selecting relevant events
  • Removing empty events
  • Removing sentinels
  • Removing stop words
>>> def is_odd(item):
...     return item % 2 != 0
>>> flow = Dataflow()
>>> flow.filter(is_odd)
>>> flow.capture()
>>> inp = enumerate(range(4))
>>> for epoch, item in run(flow, inp):
...    print(item)
1
3

Args

predicate
predicate(item: Any) => should_emit: bool
def flat_map(self, mapper)

Flat map is a one-to-many transformation of items.

It calls a mapper function on each item.

It emits each element in the returned iterator individually downstream in the epoch of the input item.

It is commonly used for:

  • Tokenizing
  • Flattening hierarchical objects
  • Breaking up aggregations for further processing
>>> def split_into_words(sentence):
...     return sentence.split()
>>> flow = Dataflow()
>>> flow.flat_map(split_into_words)
>>> flow.capture()
>>> inp = enumerate(["hello world"])
>>> for epoch, item in run(flow, inp):
...     print(epoch, item)
0 hello
0 world

Args

mapper
mapper(item: Any) => emit: Iterable[Any]
def inspect(self, inspector)

Inspect allows you to observe, but not modify, items.

It calls an inspector callback on each item.

It emits items downstream unmodified.

It is commonly used for debugging.

>>> def log(item):
...     print("Saw", item)
>>> flow = Dataflow()
>>> flow.inspect(log)
>>> flow.capture()
>>> inp = enumerate(range(1, 4))
>>> for epoch, item in run(flow, inp):
...     pass  # Don't print captured output.
Saw 1
Saw 2
Saw 3

Args

inspector
inspector(item: Any) => None
def inspect_epoch(self, inspector)

Inspect epoch allows you to observe, but not modify, items and their epochs.

It calls an inspector function on each item with its epoch.

It emits items downstream unmodified.

It is commonly used for debugging.

>>> def log(epoch, item):
...    print(f"Saw {item} @ {epoch}")
>>> flow = Dataflow()
>>> flow.inspect_epoch(log)
>>> flow.capture()
>>> inp = enumerate(range(3))
>>> for epoch, item in run(flow, inp):
...    pass  # Don't print captured output.
Saw 0 @ 0
Saw 1 @ 1
Saw 2 @ 2

Args

inspector
inspector(epoch: int, item: Any) => None
def map(self, mapper)

Map is a one-to-one transformation of items.

It calls a mapper function on each item.

It emits each updated item downstream.

It is commonly used for:

  • Extracting keys
  • Turning JSON into objects
  • So many things
>>> def add_ten(item):
...     return item + 10
>>> flow = Dataflow()
>>> flow.map(add_ten)
>>> flow.capture()
>>> inp = enumerate(range(3))
>>> for epoch, item in run(flow, inp):
...     print(item)
10
11
12

Args

mapper
mapper(item: Any) => updated_item: Any
def reduce(self, step_id, reducer, is_complete)

Reduce lets you combine items for a key into an accumulator in epoch order.

It is a stateful operator. It requires that the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant accumulator.

It is a recoverable operator. It requires a step ID to recover the correct state.

It calls two functions:

  • A reducer which combines a new value with an accumulator. The accumulator is initially the first value seen for a key. Values will be passed in epoch order, but no order is defined within an epoch. If there is only a single value for a key since the last completion, this function will not be called.

  • An is complete function which returns True if the most recent (key, accumulator) should be emitted downstream and the accumulator for that key forgotten. If there was only a single value for a key, it is passed in as the accumulator here.

It emits (key, accumulator) tuples downstream when the is complete function returns True in the epoch of the most recent value for that key.

It is commonly used for:

  • Sessionization
  • Emitting a summary of data that spans epochs
>>> def user_as_key(event):
...     return event["user"], [event]
>>> def extend_session(session, events):
...     session.extend(events)
...     return session
>>> def session_complete(session):
...     return any(event["type"] == "logout" for event in session)
>>> flow = Dataflow()
>>> flow.map(user_as_key)
>>> flow.inspect_epoch(lambda epoch, item: print("Saw", item, "@", epoch))
>>> flow.reduce("sessionizer", extend_session, session_complete)
>>> flow.capture()
>>> inp = [
...     (0, {"user": "a", "type": "login"}),
...     (1, {"user": "a", "type": "post"}),
...     (1, {"user": "b", "type": "login"}),
...     (2, {"user": "a", "type": "logout"}),
...     (3, {"user": "b", "type": "logout"}),
... ]
>>> for epoch, item in run(flow, inp):
...     print(epoch, item)

Args

step_id
Uniquely identifies this step for recovery.
reducer
reducer(accumulator: Any, value: Any) => updated_accumulator: Any
is_complete
is_complete(updated_accumulator: Any) => should_emit: bool
def reduce_epoch(self, reducer)

Reduce epoch lets you combine all items for a key within an epoch into an accumulator.

It is like Dataflow.reduce() but marks the accumulator as complete automatically at the end of each epoch.

It is a stateful operator. It requires that the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant accumulator.

It calls a reducer function which combines two values. The accumulator is initially the first value seen for a key. Values will be passed in arbitrary order. If there is only a single value for a key in this epoch, this function will not be called.

It emits (key, accumulator) tuples downstream at the end of each epoch.

It is commonly used for:

  • Counting within epochs
  • Accumulation within epochs
>>> def add_initial_count(event):
...     return event["user"], 1
>>> def count(count, event_count):
...     return count + event_count
>>> flow = Dataflow()
>>> flow.map(add_initial_count)
>>> flow.inspect_epoch(lambda epoch, item: print("Saw", item, "@", epoch))
>>> flow.reduce_epoch(count)
>>> flow.capture()
>>> inp = [
...     (0, {"user": "a", "type": "login"}),
...     (0, {"user": "a", "type": "post"}),
...     (0, {"user": "b", "type": "login"}),
...     (1, {"user": "b", "type": "post"}),
... ]
>>> for epoch, item in run(flow, inp):
...     print(epoch, item)
Saw ('a', 1) @ 0
Saw ('b', 1) @ 0
Saw ('a', 1) @ 0
Saw ('b', 1) @ 1
0 ('b', 1)
0 ('a', 2)
1 ('b', 1)

Args

reducer
reducer(accumulator: Any, value: Any) => updated_accumulator: Any
def reduce_epoch_local(self, reducer)

Reduce epoch local lets you combine all items for a key within an epoch on a single worker.

It is exactly like Dataflow.reduce_epoch() but does not ensure that all values for a key are routed to the same worker, so you may get one output accumulator per key per worker instead of a single one per key.

You should use Dataflow.reduce_epoch() unless you need a network-overhead optimization and some later step does full accumulation, as in the sketch below.

It is only used for performance optimization.

Args

reducer
reducer(accumulator: Any, value: Any) => updated_accumulator: Any
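Here is a hedged sketch of that two-stage pattern: count words per key locally within each epoch, then combine the per-worker partial counts with Dataflow.reduce_epoch().

# Hedged sketch of the local-then-global aggregation pattern described above.
from bytewax import Dataflow, run_cluster

def to_pair(word):
    return word, 1  # Stateful steps need (key: str, value) tuples.

def add(count, more):
    return count + more

flow = Dataflow()
flow.map(to_pair)
flow.reduce_epoch_local(add)  # Partial counts: one per key per worker.
flow.reduce_epoch(add)        # Combine partials into one count per key.
flow.capture()

out = run_cluster(flow, [(0, "a"), (0, "b"), (0, "a")], proc_count=2)
print(sorted(out))
# [(0, ('a', 2)), (0, ('b', 1))]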
def stateful_map(self, step_id, builder, mapper)

Stateful map is a one-to-one transformation of values, but allows you to reference a persistent state for each key when doing the transformation.

It is a stateful operator. It requires that the input stream has items that are (key: str, value) tuples so we can ensure that all relevant values are routed to the relevant state.

It is a recoverable operator. It requires a step ID to recover the correct state.

It calls two functions:

  • A builder which returns a new state; it is called with the key as its argument whenever a new key is encountered.

  • A mapper which transforms values. Values will be passed in epoch order, but no order is defined within an epoch. If the updated state is None, the state will be forgotten.

It emits a (key, updated_value) tuple downstream for each input item.

It is commonly used for:

  • Anomaly detection
  • State machines
>>> def self_as_key(item):
...     return item, item
>>> def build_count(key):
...     return 0
>>> def check(running_count, item):
...     running_count += 1
...     if running_count == 1:
...         return running_count, item
...     else:
...         return running_count, None
>>> def remove_none_and_key(key_item):
...     key, item = key_item
...     if item is None:
...         return []
...     else:
...         return [item]
>>> flow = Dataflow()
>>> flow.map(self_as_key)
>>> flow.stateful_map("remove_duplicates", build_count, check)
>>> flow.flat_map(remove_none_and_key)
>>> flow.capture()
>>> inp = [
...     (0, "a"),
...     (0, "a"),
...     (0, "a"),
...     (1, "a"),
...     (1, "b"),
... ]
>>> for epoch, item in run(flow, inp):
...     print(epoch, item)
0 a
1 b

Args

step_id
Uniquely identifies this step for recovery.
builder
builder(key: Any) => new_state: Any
mapper
mapper(state: Any, value: Any) => (updated_state: Any, updated_value: Any)