Custom Operators
Have you ever wanted to implement particularly weird semantics with your operators? Or maybe support something tricky? Or do you have a fever, and the only prescription is more decorators? Well, with this guide you can!
Operator Definition
You can define new custom operators in terms of already existing operators. To do this, you define an operator function and decorate it with the operator decorator from bytewax.dataflow.
from bytewax.dataflow import Stream, operator
import bytewax.operators as op


@operator
def add_to(step_id: str, up: Stream[int], y: int) -> Stream[int]:
    # Build the custom operator from the existing map operator, passing the upstream through.
    return op.map("shim_map", up, lambda x: x + y)
Each input or output Stream turns into a Port in the resulting data model.
In order to generate the operator data model, and proper nesting of operators, you must follow a few rules when writing your function:
There must be a step_id: str argument, even if not used.
You must create a custom dataclass to return multiple values or down streams. You can return a single Stream or None as well if you need a single or no down streams.
All arguments, the return value, and return dataclass fields that are Streams must have type annotations. We recommend annotating all the arguments, the return value, and all fields in a return dataclass.
Argument and return dataclass field names must not overlap with the names defined on the Operator base class.
Streams and Dataflows must not appear in nested objects: they either can be arguments, the return type directly, or the top-level fields of a dataclass that is the return type; nowhere else.
Docstrings
A good docstring for a custom operator has a few things:
A one line summary of the operator.
A doctest example using the operator.
For any arguments that are streams, a description of the required shape of that upstream.
For the return streams, a description of the shape of the data that is sent downstream.