bytewax.dataflow#

Data model for dataflows and custom operators.

See bytewax.operators for the built-in operators.

Data#

P: ParamSpec#

Signature of an operator function.

R: TypeVar#

Return type of an operator function.

N: TypeVar#

Type of name of each stream.

Usually either int if derived from *args or str if derived from **kwargs.

X_co: TypeVar#

Type contained within a Stream.

F: TypeVar#

Type of operator builder function.

Classes#

class Port#
Bases:

Generic interface to a port.

Either SinglePort or MultiPort.

port_id: str#
stream_ids: Dict[str, str]#
class SinglePort#

A input or output location on an Operator.

You won’t be instantiating this manually. The operator decorator will create these for you whenever an operator function takes or returns a Stream.

port_id: str#
stream_id: str#
property stream_ids: Dict[str, str]#

Allow this to conform to the Port protocol.

class MultiPort#
Bases:

A multi-stream input or output location on an Operator.

You won’t be instantiating this manually. The operator decorator will create these for you whenever an operator function takes or returns a *args of Stream or **kwargs of Streams.

port_id: str#
stream_ids: Dict[N, str]#
class Operator#

Base class for an operator type.

Subclasses of this must be generated via the operator builder function decorator. See Custom Operators for a tutorial.

Subclasses will contain the specific configuration fields each operator needs.

step_name: str#
step_id: str#
substeps: List[Self]#
ups_names: ClassVar[List[str]]#
dwn_names: ClassVar[List[str]]#
class DataflowId#

Unique ID of a dataflow.

flow_id: str#
class Dataflow#

Dataflow definition.

Once you instantiate this, Use the bytewax.operators (e.g. input) to create Streams.

flow_id: str#
substeps: List[Operator]#
class Stream#
Bases:

Handle to a specific stream of items you can add steps to.

You won’t be instantiating this manually. Use the bytewax.operators (e.g. map, filter, key_on) to create streams.

You can reference this stream multiple times to duplicate the data within.

Operator functions take or return this if they want to create an input or output port.

stream_id: str#
flow() Dataflow#

The containing dataflow.

You might want access to this to add “top level” operators like bytewax.operators.merge.

then(
op_fn: Callable[Concatenate[str, Self, P], R],
step_id: str,
*args: P,
**kwargs: P,
) R#

Chain a new step onto this stream.

This allows you to add intermediate steps to a dataflow without needing to nest operator function calls or make intermediate variables.

The following two dataflow definitions are equivalent:

import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow

flow = Dataflow("map_eg")
s = op.input("inp", flow, TestingSource(range(3)))

def add_one(item):
    return item + 1

s = op.map("add_one", s, add_one)

and

flow = Dataflow("map_eg")
s = op.input("inp", flow, TestingSource(range(3))).then(
    op.map, "add_one", add_one
)

This kind of method chaining is called a “fluent style API”.

Because this style requires a single upstream before the method calling ., this transformation only works for operators that could be called like op_fn(step_id, upstream, ...), like bytewax.operators.map.

Parameters:
  • step_id – Unique ID.

  • op_fn – Operator function. This fluent transformation only works on operators that take a single stream as the second argument.

  • *args – Remaining arguments to pass to op_fn.

  • **kwargs – Remaining arguments to pass to op_fn.

Returns:

op_fun’s return value as if called normally.

Functions#

f_repr(f: Callable) str#

Nicer function repr showing module and line number.

Use this to help with writing easier to debug exceptions in your operators.

The built in repr just shows a memory address.

>>> from bytewax.dataflow import f_repr
>>> def my_f(x):
...     pass
>>> f_repr(my_f)
"<function '....my_f' line 1>"
operator(builder=None, *, _core: bool = False) Callable#

Function decorator to define a new operator.

See Custom Operators for how to use this.

Join our community Slack channel

Need some help? Join our community!

If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #questions-answered Slack channel!

Join now