bytewax.dataflow

Data model for dataflows and custom operators.

See bytewax.operators for the built-in operators.

Data

P: ParamSpec
R: TypeVar
K: TypeVar
X_co: TypeVar
F: TypeVar

Classes

class Port
Bases: typing.Protocol

Generic interface to a port.

Either SinglePort or MultiPort.

port_id: str
stream_ids: Dict[str, str]
class SinglePort

An input or output location on an Operator.

You won’t be instantiating this manually. The operator decorator will create these for you whenever an operator function takes or returns a Stream.

port_id: str
stream_id: str
property stream_ids: Dict[str, str]

Allow this to conform to the Port protocol.

class MultiPort
Bases: typing.Generic[K]

A multi-stream input or output location on an Operator.

You won’t be instantiating this manually. The operator decorator will create these for you whenever an operator function takes or returns *args of Streams, **kwargs of Streams, or a MultiStream.

port_id: str
stream_ids: Dict[K, str]
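The Port protocol lets code treat both port shapes uniformly through their stream_ids mapping. Below is a minimal stand-alone sketch of that idea, not bytewax's source: PortLike, ToySinglePort, ToyMultiPort, and connected_stream_ids are hypothetical names, and the "stream" key that ToySinglePort uses is an assumption made only for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Generic, List, Protocol, TypeVar, runtime_checkable

K = TypeVar("K")


@runtime_checkable
class PortLike(Protocol):
    """Hypothetical mirror of the Port protocol: any object with these members."""

    port_id: str
    stream_ids: Dict[str, str]


@dataclass(frozen=True)
class ToySinglePort:
    """One stream per port, exposed through a one-entry stream_ids mapping."""

    port_id: str
    stream_id: str

    @property
    def stream_ids(self) -> Dict[str, str]:
        # The "stream" key is an illustrative assumption, not bytewax's choice.
        return {"stream": self.stream_id}


@dataclass(frozen=True)
class ToyMultiPort(Generic[K]):
    """Many streams per port, keyed (for example) by keyword name."""

    port_id: str
    stream_ids: Dict[K, str]


def connected_stream_ids(port: PortLike) -> List[str]:
    """Works on either port shape because both satisfy the protocol."""
    return list(port.stream_ids.values())
```

For example, ToySinglePort("up", "flow.step.up") reports a one-entry mapping, while ToyMultiPort("ups", {"left": "flow.a.down", "right": "flow.b.down"}) reports one entry per keyword; connected_stream_ids handles both without checking which class it received.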
class Operator

Base class for an operator type.

Subclasses of this must be generated via the operator builder function decorator. See the bytewax.dataflow module docstring for a tutorial.

Subclasses will contain the specific configuration fields each operator needs.

step_name: str
step_id: str
substeps: List[Self]
ups_names: ClassVar[List[str]]
dwn_names: ClassVar[List[str]]
class DataflowId

Unique ID of a dataflow.

flow_id: str
class Dataflow

Dataflow definition.

Once you instantiate this, use the functions in bytewax.operators (e.g. bytewax.operators.input) to create Streams.

flow_id: str
substeps: List[Operator] = field(…)

class Stream
Bases: typing.Generic[X_co]

Handle to a specific stream of items you can add steps to.

You won’t be instantiating this manually. Use the bytewax.operators (e.g. bytewax.operators.map, bytewax.operators.filter, bytewax.operators.key_on) to create Streams.

You can reference this stream multiple times to duplicate the data within.

Operator functions take or return this if they want to create an input or output port.
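To picture what referencing a stream more than once means, here is a toy push-based model. It is an illustration only: ToyPushStream and attach are hypothetical names, and bytewax's runtime does not work this way. The point it shows is that each downstream consumer of a stream receives every item; in a real dataflow the equivalent is passing the same Stream to two operator calls.

```python
from typing import Callable, List


class ToyPushStream:
    """Hypothetical push-based stream; NOT how bytewax is implemented."""

    def __init__(self) -> None:
        self._downstreams: List[Callable[[int], None]] = []

    def attach(self, downstream: Callable[[int], None]) -> None:
        # Each use of this stream registers one more consumer.
        self._downstreams.append(downstream)

    def push(self, item: int) -> None:
        # Every registered consumer receives every item.
        for downstream in self._downstreams:
            downstream(item)


doubled: List[int] = []
labels: List[str] = []

numbers = ToyPushStream()
numbers.attach(lambda x: doubled.append(x * 2))  # first use of the stream
numbers.attach(lambda x: labels.append(f"item {x}"))  # second use of the stream
for n in range(3):
    numbers.push(n)
```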

stream_id: str
flow() → Dataflow

The containing Dataflow.

You might want access to this to add “top level” operators like bytewax.operators.merge_all.merge_all.

then(
    op_fn: Callable[Concatenate[str, Self, P], R],
    step_id: str,
    *args: P.args,
    **kwargs: P.kwargs,
) → R

Chain a new step onto this stream.

This allows you to add intermediate steps to a dataflow without needing to nest operator function calls or make intermediate variables.

The following two dataflow definitions are equivalent:

>>> import bytewax.operators as op
>>> from bytewax.testing import run_main, TestingSource
>>> from bytewax.dataflow import Dataflow
>>> def add_one(item):
...     return item + 1

>>> flow = Dataflow("map_eg")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> s = op.map("add_one", s, add_one)

and

>>> flow = Dataflow("map_eg")
>>> s = op.input("inp", flow, TestingSource(range(3))).then(
...     op.map, "add_one", add_one
... )

This kind of method chaining is called a “fluent style API”.

Because this style requires a single upstream stream to the left of .then(), this transformation only works for operators that can be called like op_fn(step_id, upstream, ...), such as bytewax.operators.map. It will not work for operators like bytewax.operators.join_named, because they do not have that shape of function signature.

Args:
    op_fn: Operator function. This fluent transformation only works on operators that take a single stream as the second argument.
    step_id: Unique ID for the new step.
    *args: Remaining positional arguments to pass to `op_fn`.
    **kwargs: Remaining keyword arguments to pass to `op_fn`.

Functions

f_repr(f: Callable) → str

Nicer repr for functions with the defining module and line.

Use this to help write easier-to-debug exceptions in your operators.

The built-in repr only shows the function name and a memory address.

>>> def my_f(x):
...     pass
>>> f_repr(my_f)
"<function 'bytewax.dataflow.my_f' line 1>"
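A rough approximation of how such a repr can be assembled, assuming only the standard function attributes __module__, __qualname__, and __code__.co_firstlineno. toy_f_repr is a hypothetical name and this is not bytewax's implementation; it falls back to the built-in repr for callables that have no __code__ attribute.

```python
from typing import Callable


def toy_f_repr(f: Callable) -> str:
    """Hypothetical approximation of f_repr: module, qualified name, first line."""
    code = getattr(f, "__code__", None)
    if code is None:
        # Builtins and callable objects have no __code__; use the built-in repr.
        return repr(f)
    return f"<function '{f.__module__}.{f.__qualname__}' line {code.co_firstlineno}>"


def my_f(x):
    pass
```

The line number comes from where my_f is defined in its own source file, so it will differ from the doctest output above.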

operator(builder=None, *, _core: bool = False) → Callable

Function decorator to define a new operator.

See the bytewax.dataflow module docstring for how to use this.
