bytewax.dataflow#
Data model for dataflows and custom operators.
See bytewax.operators for the built-in operators.
Data#
Classes#
- class Port#
- Bases:
Generic interface to a port.
Either SinglePort or MultiPort.
- class SinglePort#
An input or output location on an Operator.
You won’t be instantiating this manually. The operator decorator will create these for you whenever an operator function takes or returns a Stream.
- class MultiPort#
A multi-stream input or output location on an Operator.
You won’t be instantiating this manually. The operator decorator will create these for you whenever an operator function takes or returns a *args of Stream or **kwargs of Streams or a MultiStream.
- class Operator#
Base class for an operator type.
Subclasses of this must be generated via the operator builder function decorator. See the bytewax.dataflow module docstring for a tutorial.
Subclasses will contain the specific configuration fields each operator needs.
- class Dataflow#
Dataflow definition.
Once you instantiate this, use the bytewax.operators (e.g. bytewax.operators.input) to create Streams.
- class Stream#
Handle to a specific stream of items you can add steps to.
You won’t be instantiating this manually. Use the bytewax.operators (e.g. bytewax.operators.map, bytewax.operators.filter, bytewax.operators.key_on) to create Streams.
You can reference this stream multiple times to duplicate the data within.
Operator functions take or return this if they want to create an input or output port.
- flow() → Dataflow#
The containing Dataflow.
You might want access to this to add “top level” operators like bytewax.operators.merge_all.merge_all.
- then( ) → R#
Chain a new step onto this stream.
This allows you to add intermediate steps to a dataflow without needing to nest operator function calls or make intermediate variables.
The following two dataflow definitions are equivalent:
    import bytewax.operators as op
    from bytewax.testing import run_main, TestingSource
    from bytewax.dataflow import Dataflow

    def add_one(item):
        return item + 1

    flow = Dataflow("map_eg")
    s = op.input("inp", flow, TestingSource(range(3)))
    s = op.map("add_one", s, add_one)
and
    flow = Dataflow("map_eg")
    s = op.input("inp", flow, TestingSource(range(3))).then(
        op.map, "add_one", add_one
    )
This kind of method chaining is called a “fluent style API”.
Because this style requires a single upstream before the `.`, this transformation only works for operators that could be called like op_fn(step_id, upstream, ...), like bytewax.operators.map. It will not work for operators like bytewax.operators.join_named, since they do not have that shape of function signature.
Args:
    step_id: Unique ID.
    op_fn: Operator function. This fluent transformation only works on operators that take a single stream as the second argument.
    *args: Remaining arguments to pass to `op_fn`.
    **kwargs: Remaining arguments to pass to `op_fn`.
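The forwarding rule described above can be sketched without Bytewax installed. This is a toy model only, not Bytewax's implementation: ToyStream, toy_map and toy_filter are hypothetical stand-ins invented for illustration, and the argument order of then (operator function first, then step ID) is assumed from the chained example above. It shows why the operator must accept the upstream as its second positional argument.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class ToyStream:
    """Hypothetical stand-in for a stream; holds its items eagerly."""

    items: List[Any]

    def then(self, op_fn: Callable[..., Any], step_id: str, *args: Any, **kwargs: Any) -> Any:
        # Forward to the operator with this stream as the second positional
        # argument, i.e. op_fn(step_id, upstream, ...).
        return op_fn(step_id, self, *args, **kwargs)


def toy_map(step_id: str, up: ToyStream, mapper: Callable[[Any], Any]) -> ToyStream:
    """Hypothetical operator with the op_fn(step_id, upstream, ...) shape."""
    return ToyStream([mapper(x) for x in up.items])


def toy_filter(step_id: str, up: ToyStream, predicate: Callable[[Any], bool]) -> ToyStream:
    """Second hypothetical operator with the same signature shape."""
    return ToyStream([x for x in up.items if predicate(x)])


def add_one(item: int) -> int:
    return item + 1


def is_even(item: int) -> bool:
    return item % 2 == 0


source = ToyStream([0, 1, 2])

# Nested style: each call wraps the previous one.
nested = toy_filter("keep_even", toy_map("add_one", source, add_one), is_even)

# Fluent style: the same steps, read top to bottom.
chained = source.then(toy_map, "add_one", add_one).then(toy_filter, "keep_even", is_even)
```

In this sketch both styles build the same result. An operator that takes several named upstreams has no single stream to place before the `.`, which is the reason the text gives for join_named not fitting this pattern.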