bytewax.operators#

Built-in operators.

See the bytewax module docstring for the basics of building and running dataflows.

See bytewax.operators.window for windowing operators.

Submodules#

  • bytewax.operators.window

Data#

X: TypeVar#
Y: TypeVar#
V: TypeVar#
W: TypeVar#
S: TypeVar#
KeyedStream: TypeAlias#
EMPTY#

()

Classes#

class BranchOut#
Bases: Generic[X, Y]

Streams returned from branch operator.

trues: Stream[X]#
falses: Stream[Y]#
class UnaryLogic#
Bases: ABC, Generic[V, W, S]

Abstract class to define the behavior of a unary operator.

The operator will call these methods in order: on_item once for any items queued, then on_notify if the notification time has passed, then on_eof if the upstream is EOF and no new items will be received this execution. If the logic is retained after all of these calls, notify_at will be called. snapshot is called periodically.

RETAIN: bool = False#
DISCARD: bool = True#
abstract on_item(
now: datetime,
value: V,
) → Tuple[Iterable[W], bool]#

Called on each new upstream item.

This will be called multiple times in a row if there are multiple items from upstream.

Args: now: The current datetime.

value: The value of the upstream `(key, value)`.

Returns: A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in `(key, value)` automatically.

abstract on_notify(
sched: datetime,
) → Tuple[Iterable[W], bool]#

Called when the scheduled notification time has passed.

Args: sched: The scheduled notification time.

Returns: A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in `(key, value)` automatically.

abstract on_eof() → Tuple[Iterable[W], bool]#

The upstream has no more items on this execution.

This will only be called once per execution after on_item is done being called.

Returns: A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in `(key, value)` automatically.

abstract notify_at() → Optional[datetime]#

Return the next notification time.

This will be called once right after the logic is built, and again after any of the on_* methods are called, if the logic was retained.

This must always return the next notification time. The operator only stores a single next time, so if you have multiple future times you would like to be notified at, return the soonest one each time this is called.

Returns: Scheduled time. If None, no on_notify callback will occur.

abstract snapshot() → S#

Return an immutable copy of the state for recovery.

This will be called periodically by the runtime.

The value returned here will be passed back to the builder function of unary when resuming.

The state must be pickle-able.

The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.

Functions#

branch(
step_id: str,
up: Stream[X],
predicate: Callable[[X], bool],
) → BranchOut#

Divide items into two streams with a predicate.

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("branch_eg")
>>> nums = op.input("nums", flow, TestingSource([1, 2, 3, 4, 5]))
>>> b_out = op.branch("even_odd", nums, lambda x: x % 2 == 0)
>>> evens = b_out.trues
>>> odds = b_out.falses
>>> _ = op.inspect("evens", evens)
>>> _ = op.inspect("odds", odds)
>>> run_main(flow)
branch_eg.odds: 1
branch_eg.evens: 2
branch_eg.odds: 3
branch_eg.evens: 4
branch_eg.odds: 5

Args: step_id: Unique ID.

up: Stream to divide.

predicate: Function to call on each upstream item. Items for
    which this returns `True` will be put into one branch
    `Stream`; `False` the other branch `Stream`. If this
    function is a `typing.TypeGuard`, the downstreams will be
    properly typed.

Returns: A stream of items for which the predicate returns True, and a stream of items for which the predicate returns False.

flat_map_batch(
step_id: str,
up: Stream[X],
mapper: Callable[[List[X]], Iterable[Y]],
) → Stream[Y]#

Transform an entire batch of items 1-to-many.

The batch size received here depends on the exact behavior of the upstream input sources and operators. It should be used as a performance optimization when processing multiple items at once has significantly reduced overhead.

See also the batch_size parameter on various input sources.

See also the batch operator, which collects multiple items next to each other in a stream into a single list of them flowing through the stream.
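
For example, a minimal sketch that doubles each item, two at a time (illustrative; assumes `TestingSource` accepts the `batch_size` parameter mentioned above, and all step names are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("flat_map_batch_eg")
>>> s = op.input("inp", flow, TestingSource([1, 2, 3], batch_size=2))
>>> def double_batch(batch):
...     # Called once per batch, not once per item.
...     return [x * 2 for x in batch]
>>> s = op.flat_map_batch("double", s, double_batch)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
flat_map_batch_eg.out: 2
flat_map_batch_eg.out: 4
flat_map_batch_eg.out: 6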

Args: step_id: Unique ID.

up: Stream.

mapper: Called once with each batch of items the runtime
    receives. Returns the items to emit downstream.

Returns: A stream of each item returned by the mapper.

input(
step_id: str,
flow: Dataflow,
source: Source[X],
) → Stream[X]#

Introduce items into a dataflow.

See bytewax.inputs for more information on how input works. See bytewax.connectors for a buffet of our built-in connector types.

Args: step_id: Unique ID.

flow: The dataflow.

source: Read items from.

Returns: A stream of items from the source. See your specific source documentation for what kind of item that is.

This stream might be keyed. See your specific
`Source.stream_typ`.

inspect_debug(
step_id: str,
up: Stream[X],
inspector: Callable[[str, X, int, int], None] = _default_debug_inspector,
) → Stream[X]#

Observe items, their worker, and their epoch for debugging.

>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("inspect_debug_eg")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> _ = op.inspect_debug("help", s)
>>> run_main(flow)
inspect_debug_eg.help W0 @1: 0
inspect_debug_eg.help W0 @1: 1
inspect_debug_eg.help W0 @1: 2

Args: step_id: Unique ID.

up: Stream.

inspector: Called with the step ID, each item in the stream,
    the epoch of that item, and the worker processing the
    item. Defaults to printing out all the arguments.

Returns: The upstream unmodified.

merge(
step_id: str,
*ups: Stream[X],
) → Stream[X]#

Combine multiple streams together.
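
For example, a minimal sketch merging two test inputs (illustrative; the interleaving of items between the two upstreams is not guaranteed, so no output is shown):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("merge_eg")
>>> evens = op.input("evens", flow, TestingSource([2, 4, 6]))
>>> odds = op.input("odds", flow, TestingSource([1, 3, 5]))
>>> s = op.merge("merge", evens, odds)
>>> _ = op.inspect("out", s)
>>> run_main(flow)  # Emits all six items; the interleaving may vary.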

Args: step_id: Unique ID.

*ups: Streams.

Returns: A single stream of the same type as all the upstreams with items from all upstreams merged into it unmodified.

output(
step_id: str,
up: Stream[X],
sink: Sink[X],
) → None#

Write items out of a dataflow.

See bytewax.outputs for more information on how output works. See bytewax.connectors for a buffet of our built-in connector types.
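
For example, a minimal sketch writing to stdout (illustrative; assumes the built-in `bytewax.connectors.stdout.StdOutSink` connector is available in your version):

>>> import bytewax.operators as op
>>> from bytewax.connectors.stdout import StdOutSink
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("output_eg")
>>> s = op.input("inp", flow, TestingSource([1, 2]))
>>> op.output("out", s, StdOutSink())
>>> run_main(flow)
1
2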

Args: step_id: Unique ID.

up: Stream of items to write. See your specific sink
    documentation for the required type of those items.

sink: Write items to.
redistribute(
step_id: str,
up: Stream[X],
) → Stream[X]#

Redistribute items randomly across all workers.

Bytewax’s execution model has workers executing all steps, but the state in each step is partitioned across workers by some key. Bytewax will only exchange an item between workers before stateful steps in order to ensure correctness: each item must interact with the correct state for its key. Stateless operators (like filter) are run on all workers and do not result in exchanging items before or after they are run.

Certain orderings of operators can thus result in poor parallelization across an entire execution cluster. If the previous step (like a reduce_window or an input with a PartitionedInput) concentrated items on a subset of workers in the cluster, but the next step is a CPU-intensive stateless step (like a map), it’s possible that not all workers will contribute to processing the CPU-intensive step.

This operation has an overhead, since it needs to serialize, send, and deserialize the items, so while it can significantly speed up execution in some cases, it can also make it slower.

A good use of this operator is to parallelize an IO-bound step, like a network request, or a heavy single-CPU workload, on a machine with multiple workers and multiple CPU cores that would otherwise remain unused.

A bad use of this operator is if the operation you want to parallelize is already fast, as the overhead can overshadow the advantages of distributing the work. Another case where you could see regressions in performance is if the heavy CPU workload already spawns enough threads to use all the available cores. In this case, multiple processes competing for the CPU can end up slower than doing the work serially. If the workers run on different machines, though, it might again be a valuable use of the operator.

Use this operator with caution, and measure whether you get an improvement out of it.

Once the work has been spread to other workers, it will stay on those workers unless other operators explicitly move the items again (usually on output).

Args: step_id: Unique ID.

up: Stream.

Returns: Stream unmodified.

unary(
step_id: str,
up: KeyedStream[V],
builder: Callable[[datetime, Optional[S]], UnaryLogic[V, W, S]],
) → KeyedStream[W]#

Advanced generic stateful operator.

This is the lowest-level operator Bytewax provides and gives you full control over all aspects of the operator processing and lifecycle. Usually you will want to use a higher-level operator than this.

Subclass UnaryLogic to define its behavior. See documentation there.
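
For example, a minimal sketch of a per-key running sum (illustrative; class and step names are arbitrary). Each new value emits the updated total, the logic is retained between items, and it is discarded at EOF:

>>> import bytewax.operators as op
>>> from bytewax.operators import UnaryLogic
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> class RunningSum(UnaryLogic):
...     def __init__(self, resume_state):
...         # Resume from a snapshot if one exists.
...         self.total = resume_state if resume_state is not None else 0
...     def on_item(self, now, value):
...         self.total += value
...         # Emit the new total and keep this logic alive.
...         return ([self.total], UnaryLogic.RETAIN)
...     def on_notify(self, sched):
...         return ([], UnaryLogic.RETAIN)
...     def on_eof(self):
...         return ([], UnaryLogic.DISCARD)
...     def notify_at(self):
...         # No scheduled notifications.
...         return None
...     def snapshot(self):
...         # int is immutable, so it is safe to return directly.
...         return self.total
>>> flow = Dataflow("unary_eg")
>>> s = op.input("inp", flow, TestingSource([("a", 1), ("a", 2)]))
>>> s = op.unary("sum", s, lambda now, resume_state: RunningSum(resume_state))
>>> _ = op.inspect("out", s)
>>> run_main(flow)
unary_eg.out: ('a', 1)
unary_eg.out: ('a', 3)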

Args: step_id: Unique ID.

up: Keyed stream.

builder: Called whenever a new key is encountered with the
    current `datetime` and the resume state returned from
    `UnaryLogic.snapshot` for this key, if any. This should
    close over any non-state configuration and combine it with
    the resume state to return the prepared `UnaryLogic` for
    the new key.

Returns: Keyed stream of all items returned from UnaryLogic.on_item, UnaryLogic.on_notify, and UnaryLogic.on_eof.

collect(
step_id: str,
up: KeyedStream[V],
timeout: timedelta,
max_size: int,
) → KeyedStream[List[V]]#

Collect items into a list up to a size or a timeout.

See bytewax.operators.window.collect_window for more control over time.
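
For example, a minimal sketch keying everything together and collecting up to 3 items at a time (illustrative; step names are arbitrary and it assumes any partial list remaining at EOF is flushed):

>>> from datetime import timedelta
>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("collect_eg")
>>> s = op.input("inp", flow, TestingSource([1, 2, 3, 4, 5]))
>>> s = op.key_on("key", s, lambda _x: "ALL")
>>> s = op.collect(
...     "collect", s, timeout=timedelta(seconds=10), max_size=3
... )
>>> _ = op.inspect("out", s)
>>> run_main(flow)
collect_eg.out: ('ALL', [1, 2, 3])
collect_eg.out: ('ALL', [4, 5])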

Args: step_id: Unique ID.

up: Stream of individual items.

timeout: Timeout before emitting the list, even if `max_size`
    was not reached.

max_size: Emit the list once it reaches this size, even if
    `timeout` was not reached.

Returns: A stream of upstream items gathered into lists.

count_final(
step_id: str,
up: Stream[X],
key: Callable[[X], str],
) → KeyedStream[int]#

Count the number of occurrences of items in the entire stream.

This will only return counts once the upstream is EOF. You’ll need to use count_window on infinite data.
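
For example, a minimal sketch counting string items (illustrative; the order in which keys are emitted at EOF may vary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("count_eg")
>>> s = op.input("inp", flow, TestingSource(["a", "b", "a"]))
>>> s = op.count_final("count", s, lambda x: x)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
count_eg.out: ('a', 2)
count_eg.out: ('b', 1)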

Args: step_id: Unique ID.

up: Stream of items to count.

key: Function to convert each item into a string key. The
    counting machinery does not compare the items directly,
    instead it groups by this string key.

Returns: A stream of (key, count) once the upstream is EOF.

flat_map(
step_id: str,
up: Stream[X],
mapper: Callable[[X], Iterable[Y]],
) → Stream[Y]#

Transform items one-to-many.

This is like a combination of map and flatten.

It is commonly used for:

  • Tokenizing

  • Flattening hierarchical objects

  • Breaking up aggregations for further processing

>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("flat_map_eg")
>>> inp = ["hello world"]
>>> s = op.input("inp", flow, TestingSource(inp))
>>> def split_into_words(sentence):
...     return sentence.split()
>>> s = op.flat_map("split_words", s, split_into_words)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
flat_map_eg.out: 'hello'
flat_map_eg.out: 'world'

Args: step_id: Unique ID.

up: Stream.

mapper: Called once on each upstream item. Returns the items
    to emit downstream.

Returns: A stream of each item returned by the mapper.

flat_map_value(
step_id: str,
up: KeyedStream[V],
mapper: Callable[[V], Iterable[W]],
) → KeyedStream[W]#

Transform values one-to-many.
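
For example, a minimal sketch splitting a sentence value while keeping its key (illustrative; step names are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("flat_map_value_eg")
>>> s = op.input("inp", flow, TestingSource([("a", "hello world")]))
>>> s = op.flat_map_value("split", s, lambda v: v.split())
>>> _ = op.inspect("out", s)
>>> run_main(flow)
flat_map_value_eg.out: ('a', 'hello')
flat_map_value_eg.out: ('a', 'world')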

Args: step_id: Unique ID.

up: Keyed stream.

mapper: Called once on each upstream value. Returns the values
    to emit downstream.

Returns: A keyed stream of each value returned by the mapper.

flatten(
step_id: str,
up: Stream[Iterable[X]],
) → Stream[X]#

Move all sub-items up a level.
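
For example, a minimal sketch (illustrative; step names are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("flatten_eg")
>>> s = op.input("inp", flow, TestingSource([[1, 2], [3]]))
>>> s = op.flatten("flatten", s)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
flatten_eg.out: 1
flatten_eg.out: 2
flatten_eg.out: 3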

Args: step_id: Unique ID.

up: Stream of iterables.

Returns: A stream of the items within each iterable in the upstream.

filter(
step_id: str,
up: Stream[X],
predicate: Callable[[X], bool],
) → Stream[X]#

Keep only some items.

It is commonly used for:

  • Selecting relevant events

  • Removing empty events

  • Removing sentinels

  • Removing stop words

>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("filter_eg")
>>> s = op.input("inp", flow, TestingSource(range(4)))
>>> def is_odd(item):
...     return item % 2 != 0
>>> s = op.filter("filter_odd", s, is_odd)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
filter_eg.out: 1
filter_eg.out: 3

Args: step_id: Unique ID.

up: Stream.

predicate: Called with each upstream item. Only items for
    which this returns `True` will be emitted downstream.

Returns: A stream with only the upstream items for which the predicate returns True.

filter_value(
step_id: str,
up: KeyedStream[V],
predicate: Callable[[V], bool],
) → KeyedStream[V]#

Selectively keep only some items from a keyed stream.
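
For example, a minimal sketch keeping only positive values (illustrative; step names are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("filter_value_eg")
>>> s = op.input("inp", flow, TestingSource([("a", 1), ("b", -2), ("c", 3)]))
>>> s = op.filter_value("keep_pos", s, lambda v: v > 0)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
filter_value_eg.out: ('a', 1)
filter_value_eg.out: ('c', 3)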

Args: step_id: Unique ID.

up: Keyed stream.

predicate: Will be called with each upstream value. Only
    values for which this returns `True` will be emitted
    downstream.

Returns: A keyed stream with only the upstream pairs for which the predicate returns True.

filter_map(
step_id: str,
up: Stream[X],
mapper: Callable[[X], Optional[Y]],
) → Stream[Y]#

A one-to-maybe-one transformation of items.

This is like a combination of map and then filter with a predicate removing None values.

>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("filter_map_eg")
>>> s = op.input(
...     "inp",
...     flow,
...     TestingSource(
...         [
...             {"key": "a", "val": 1},
...             {"bad": "obj"},
...         ]
...     ),
... )
>>> def validate(data):
...     if type(data) != dict or "key" not in data:
...         return None
...     else:
...         return data["key"], data
>>> s = op.filter_map("validate", s, validate)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
filter_map_eg.out: ('a', {'key': 'a', 'val': 1})

Args: step_id: Unique ID.

up: Stream.

mapper: Called on each item. Each return value is emitted
    downstream, unless it is `None`.

Returns: A stream of the non-None items returned from the mapper.

fold_final(
step_id: str,
up: KeyedStream[V],
builder: Callable[[], S],
folder: Callable[[S, V], S],
) → KeyedStream[S]#

Build an empty accumulator, then combine values into it.

It is like reduce_final but uses a function to build the initial value.
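
For example, a minimal sketch summing values per key from an empty accumulator of 0 (illustrative; the order in which keys are emitted at EOF may vary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("fold_final_eg")
>>> s = op.input("inp", flow, TestingSource([("a", 1), ("a", 2), ("b", 3)]))
>>> s = op.fold_final("sum", s, lambda: 0, lambda acc, v: acc + v)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
fold_final_eg.out: ('a', 3)
fold_final_eg.out: ('b', 3)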

Args: step_id: Unique ID.

up: Keyed stream.

builder: Called the first time a key appears and is expected
    to return the empty accumulator for that key.

folder: Combines a new value into an existing accumulator and
    returns the updated accumulator. The accumulator is
    initially the empty accumulator.

Returns: A keyed stream of the accumulators. Only once the upstream is EOF.

inspect(
step_id: str,
up: Stream[X],
inspector: Callable[[str, X], None] = _default_inspector,
) → Stream[X]#

Observe items for debugging.

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("my_flow")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> _ = op.inspect("help", s)
>>> run_main(flow)
my_flow.help: 0
my_flow.help: 1
my_flow.help: 2

Args: step_id: Unique ID.

up: Stream.

inspector: Called with the step ID and each item in the
    stream. Defaults to printing the step ID and each item.

Returns: The upstream unmodified.

join(
step_id: str,
*sides: KeyedStream[Any],
running: bool = False,
) → KeyedStream[Tuple]#

Gather together the value for a key on multiple streams.
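
For example, a minimal sketch of a complete join on a shared key (illustrative; names and values are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("join_eg")
>>> names = op.input("names", flow, TestingSource([("1", "Alice")]))
>>> emails = op.input("emails", flow, TestingSource([("1", "alice@example.com")]))
>>> joined = op.join("join", names, emails)
>>> _ = op.inspect("out", joined)
>>> run_main(flow)
join_eg.out: ('1', ('Alice', 'alice@example.com'))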

Args: step_id: Unique ID.

*sides: Keyed streams.

running: If `True`, perform a "running join" and emit the
    current set of values (if any) each time a new value
    arrives. The set of values will _never be discarded_ so
    this might result in unbounded memory use. If `False`,
    perform a "complete join" and only emit once there is a
    value on each stream, then discard the set. Defaults to
    `False`.

Returns: Emits a tuple with the value from each stream in the order of the argument list. If running is True, some values might be None.

join_named(
step_id: str,
running: bool = False,
**sides: KeyedStream[Any],
) → KeyedStream[Dict[str, Any]]#

Gather together the value for a key on multiple named streams.
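
For example, a minimal sketch (illustrative; names and values are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("join_named_eg")
>>> names = op.input("names", flow, TestingSource([("1", "Alice")]))
>>> emails = op.input("emails", flow, TestingSource([("1", "alice@example.com")]))
>>> joined = op.join_named("join", name=names, email=emails)
>>> _ = op.inspect("out", joined)
>>> run_main(flow)
join_named_eg.out: ('1', {'name': 'Alice', 'email': 'alice@example.com'})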

Args: step_id: Unique ID.

**sides: Named keyed streams. The name of each stream will be
    used in the emitted `dict`s.

running: If `True`, perform a "running join" and emit the
    current set of values (if any) each time a new value
    arrives. The set of values will _never be discarded_ so
    this might result in unbounded memory use. If `False`,
    perform a "complete join" and only emit once there is a
    value on each stream, then discard the set. Defaults to
    `False`.

Returns: Emits a dict mapping the name to the value from each stream. If running is True, some names might be missing from the dict.

key_on(
step_id: str,
up: Stream[X],
key: Callable[[X], str],
) → KeyedStream[X]#

Add a key for each item.

This allows you to use all the keyed operators, which require their upstream to be a KeyedStream.
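
For example, a minimal sketch keying events by a field (illustrative; names are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("key_on_eg")
>>> s = op.input("inp", flow, TestingSource([{"user": "a", "amt": 5}]))
>>> keyed = op.key_on("by_user", s, lambda e: e["user"])
>>> _ = op.inspect("out", keyed)
>>> run_main(flow)
key_on_eg.out: ('a', {'user': 'a', 'amt': 5})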

Args: step_id: Unique ID.

up: Stream.

key: Called on each item and should return the key for that
    item.

Returns: A stream of 2-tuples of (key, item) AKA a keyed stream. The keys come from the return value of the key function; upstream items will automatically be attached as values.

map(
step_id: str,
up: Stream[X],
mapper: Callable[[X], Y],
) → Stream[Y]#

Transform items one-by-one.

It is commonly used for:

  • Serialization and deserialization.

  • Selection of fields.

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("map_eg")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> def add_ten(item):
...     return item + 10
>>> s = op.map("add_ten", s, add_ten)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
map_eg.out: 10
map_eg.out: 11
map_eg.out: 12

Args: step_id: Unique ID.

up: Stream.

mapper: Called on each item. Each return value is emitted
    downstream.

Returns: A stream of items returned from the mapper.

map_value(
step_id: str,
up: KeyedStream[V],
mapper: Callable[[V], W],
) → KeyedStream[W]#

Transform values one-by-one.
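
For example, a minimal sketch doubling each value while keeping its key (illustrative; step names are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("map_value_eg")
>>> s = op.input("inp", flow, TestingSource([("a", 1), ("b", 2)]))
>>> s = op.map_value("double", s, lambda v: v * 2)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
map_value_eg.out: ('a', 2)
map_value_eg.out: ('b', 4)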

Args: step_id: Unique ID.

up: Keyed stream.

mapper: Called on each value. Each return value is emitted
    downstream.

Returns: A keyed stream of values returned from the mapper. The key is unchanged.

max_final(
step_id: str,
up: KeyedStream[V],
by=_identity,
) → KeyedStream#

Find the maximum value for each key.
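
For example, a minimal sketch using the default `by` (illustrative; step names are arbitrary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("max_final_eg")
>>> s = op.input("inp", flow, TestingSource([("a", 2), ("a", 5), ("a", 3)]))
>>> s = op.max_final("max", s)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
max_final_eg.out: ('a', 5)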

Args: step_id: Unique ID.

up: Keyed stream.

by: A function called on each value that is used to extract
    what to compare.

Returns: A keyed stream of the max values. Only once the upstream is EOF.

min_final(
step_id: str,
up: KeyedStream[V],
by=_identity,
) → KeyedStream#

Find the minimum value for each key.

Args: step_id: Unique ID.

up: Keyed stream.

by: A function called on each value that is used to extract
    what to compare.

Returns: A keyed stream of the min values. Only once the upstream is EOF.

raises(step_id: str, up: Stream[Any]) → None#

Raise an exception and crash the dataflow on any item.

Args: step_id: Unique ID.

up: Any item on this stream will raise a `RuntimeError`.

reduce_final(
step_id: str,
up: KeyedStream[V],
reducer: Callable[[V, V], V],
) → KeyedStream[V]#

Distill all values for a key down into a single value.

It is like fold_final but the first value is the initial accumulator.
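
For example, a minimal sketch summing values per key (illustrative; the order in which keys are emitted at EOF may vary):

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("reduce_final_eg")
>>> s = op.input("inp", flow, TestingSource([("a", 1), ("a", 2)]))
>>> s = op.reduce_final("sum", s, lambda acc, v: acc + v)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
reduce_final_eg.out: ('a', 3)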

Args: step_id: Unique ID.

up: Keyed stream.

reducer: Combines a new value into an old value and returns
    the combined value.

Returns: A keyed stream of the accumulators. Only once the upstream is EOF.

stateful_map(
step_id: str,
up: KeyedStream[V],
builder: Callable[[], S],
mapper: Callable[[S, V], Tuple[Optional[S], W]],
) → KeyedStream[W]#

Transform values one-to-one, referencing a persistent state.

It is commonly used for:

  • Anomaly detection

  • State machines

>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, run_main
>>> from bytewax.dataflow import Dataflow
>>> flow = Dataflow("stateful_map_eg")
>>> inp = [
...     "a",
...     "a",
...     "a",
...     "b",
...     "a",
... ]
>>> s = op.input("inp", flow, TestingSource(inp))
>>> s = op.key_on("self_as_key", s, lambda x: x)
>>> def build_count():
...     return 0
>>> def check(running_count, _item):
...     running_count += 1
...     return (running_count, running_count)
>>> s = op.stateful_map("running_count", s, build_count, check)
>>> _ = op.inspect("out", s)
>>> run_main(flow)
stateful_map_eg.out: ('a', 1)
stateful_map_eg.out: ('a', 2)
stateful_map_eg.out: ('a', 3)
stateful_map_eg.out: ('b', 1)
stateful_map_eg.out: ('a', 4)

Args: step_id: Unique ID.

up: Keyed stream.

builder: Called whenever a new key is encountered and should
    return the "empty state" for this key.

mapper: Called whenever a value is encountered from upstream
    with the last state, and then the upstream value. Should
    return a 2-tuple of `(updated_state, emit_value)`. If the
    updated state is `None`, discard it.

Returns: A keyed stream.
