bytewax.dataflow
Data model for dataflows and custom operators. See `bytewax.operators` for the built-in operators.
Data
Classes
- class Port
Generic interface to a port. Either `SinglePort` or `MultiPort`.
- class SinglePort
An input or output location on an `Operator`. You won't be instantiating this manually. The `operator` decorator will create these for you whenever an operator function takes or returns a `Stream`.
- class MultiPort
A multi-stream input or output location on an `Operator`. You won't be instantiating this manually. The `operator` decorator will create these for you whenever an operator function takes or returns `*args` of `Stream`s or `**kwargs` of `Stream`s.
- class Operator
Base class for an operator type.
Subclasses of this must be generated via the `operator` builder function decorator. See Custom Operators for a tutorial. Subclasses will contain the specific configuration fields each operator needs.
- class Dataflow
Dataflow definition.
Once you instantiate this, use the operators in `bytewax.operators` (e.g. `input`) to create `Stream`s.
- class Stream
Handle to a specific stream of items you can add steps to.
You won't be instantiating this manually. Use the operators in `bytewax.operators` (e.g. `map`, `filter`, `key_on`) to create streams. You can reference this stream multiple times to duplicate the data within.
Operator functions take or return this if they want to create an input or output port.
- flow() -> Dataflow
The containing dataflow.
You might want access to this to add "top level" operators like `bytewax.operators.merge`.
- then(op_fn, step_id, *args, **kwargs) -> R
Chain a new step onto this stream.
This allows you to add intermediate steps to a dataflow without needing to nest operator function calls or make intermediate variables.
The following two dataflow definitions are equivalent:
```python
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow

flow = Dataflow("map_eg")
s = op.input("inp", flow, TestingSource(range(3)))


def add_one(item):
    return item + 1


s = op.map("add_one", s, add_one)
```

and

```python
flow = Dataflow("map_eg")
s = op.input("inp", flow, TestingSource(range(3))).then(
    op.map, "add_one", add_one
)
```
This kind of method chaining is called a “fluent style API”.
Because this style requires a single upstream before the method-calling `.`, this transformation only works for operators that could be called like `op_fn(step_id, upstream, ...)`, such as `bytewax.operators.map`.
- Parameters:
  - step_id – Unique ID.
  - op_fn – Operator function. This fluent transformation only works on operators that take a single stream as the second argument.
  - *args – Remaining arguments to pass to `op_fn`.
  - **kwargs – Remaining arguments to pass to `op_fn`.
- Returns:
  `op_fn`'s return value, as if it had been called normally.
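To show what the fluent transformation does mechanically, here is a toy, dependency-free model. `ToyStream` and `toy_map` are hypothetical stand-ins, not bytewax classes; they only illustrate that `s.then(op_fn, step_id, *args, **kwargs)` amounts to calling `op_fn(step_id, s, *args, **kwargs)`.

```python
from typing import Any, Callable, Iterable, Tuple


class ToyStream:
    """Hypothetical stand-in for a stream: holds items and step IDs applied."""

    def __init__(self, items: Iterable[int], steps: Tuple[str, ...] = ()):
        self.items = list(items)
        self.steps = tuple(steps)

    def then(self, op_fn: Callable[..., Any], step_id: str, *args: Any, **kwargs: Any) -> Any:
        # Fluent call: forward to op_fn with this stream as the upstream argument.
        return op_fn(step_id, self, *args, **kwargs)


def toy_map(step_id: str, up: ToyStream, mapper: Callable[[int], int]) -> ToyStream:
    # Hypothetical operator with the op_fn(step_id, upstream, ...) shape.
    return ToyStream([mapper(x) for x in up.items], up.steps + (step_id,))


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


# Nested calls versus the equivalent fluent chain.
nested = toy_map("double", toy_map("add_one", ToyStream(range(3)), add_one), double)
chained = ToyStream(range(3)).then(toy_map, "add_one", add_one).then(toy_map, "double", double)
print(nested.items, chained.items)
```

Both forms apply the same steps in the same order; the fluent form just reads top to bottom instead of inside out.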
Functions
- f_repr(f: Callable) -> str
Nicer function `repr` showing module and line number. Use this to help write easier-to-debug exceptions in your operators.
The built-in `repr` just shows a memory address.
```python
>>> from bytewax.dataflow import f_repr
>>> def my_f(x):
...     pass
>>> f_repr(my_f)
"<function '....my_f' line 1>"
```
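For comparison, a function with a similar output format can be sketched from `__module__`, `__qualname__`, and the code object's `co_firstlineno`. `sketch_f_repr` is a hypothetical name and this is not bytewax's implementation; it falls back to the built-in `repr` for callables without a `__code__` attribute, such as builtins.

```python
from typing import Callable


def sketch_f_repr(f: Callable) -> str:
    """Hypothetical sketch: name a function by module, qualname, and line."""
    code = getattr(f, "__code__", None)
    if code is None:
        # Builtins and other callables without Python source: use plain repr.
        return repr(f)
    return f"<function '{f.__module__}.{f.__qualname__}' line {code.co_firstlineno}>"


def my_f(x):
    pass


print(sketch_f_repr(my_f))
print(sketch_f_repr(len))
```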
- operator(builder=None, *, _core: bool = False) -> Callable
Function decorator to define a new operator.
See Custom Operators for how to use this.