Custom Operators

Have you ever wanted to implement some particularly weird semantics with your operators? Or maybe support something tricky? Or do you have a fever, and the only prescription is more decorators? Well, with this guide you can!

Operator Definition

You can define new custom operators in terms of existing operators. To do this, define an operator function and decorate it with operator.

from bytewax.dataflow import Stream, operator
import bytewax.operators as op

@operator
def add_to(step_id: str, up: Stream[int], y: int) -> Stream[int]:
    # Build the new operator out of the existing map operator.
    return op.map("shim_map", up, lambda x: x + y)

Each input or output Stream turns into a Port in the resulting data model.

To generate the operator data model and nest operators properly, you must follow a few rules when writing your function:

  • There must be a step_id: str argument, even if not used.

  • You must create a custom dataclass to return multiple values or downstreams. You can also return a single Stream, or None, if you need only one downstream or none.

  • All arguments, the return value, and return dataclass fields that are Streams must have type annotations. We recommend annotating all the arguments, the return value, and all fields in a return dataclass.

  • Argument and return dataclass field names must not overlap with the names defined on the Operator base class.

  • Streams and Dataflows must not appear in nested objects: they either can be arguments, the return type directly, or the top-level fields of a dataclass that is the return type; nowhere else.
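
To make these rules concrete, here is a rough sketch of a signature checker. It is not how Bytewax validates operators: the Stream class is a stand-in so the snippet runs without Bytewax installed, and ParityOut, check_operator_function, split_parity, and broken are hypothetical names made up for illustration. The checker is also stricter than the rules, since it flags every unannotated argument, and it only looks one level deep for nested streams.

```python
import dataclasses
import inspect
import typing
from dataclasses import dataclass
from typing import Callable, Generic, List, Tuple, TypeVar

X = TypeVar("X")


class Stream(Generic[X]):
    """Stand-in for bytewax.dataflow.Stream; used here only as an annotation."""


@dataclass(frozen=True)
class ParityOut:
    """Hypothetical return type: each Stream is a top-level dataclass field."""

    evens: Stream[int]
    odds: Stream[int]


def is_stream(annotation: object) -> bool:
    """True for `Stream` itself or a parameterized `Stream[...]`."""
    return annotation is Stream or typing.get_origin(annotation) is Stream


def hides_stream(annotation: object) -> bool:
    """True if a Stream sits one level inside another type, e.g. a Tuple."""
    if is_stream(annotation):
        return False
    return any(is_stream(arg) for arg in typing.get_args(annotation))


def check_operator_function(fn: Callable) -> List[str]:
    """Return rule violations found in `fn`'s signature (empty if none)."""
    problems = []
    sig = inspect.signature(fn)

    # A `step_id: str` argument must exist, even if the body never uses it.
    step_id = sig.parameters.get("step_id")
    if step_id is None or step_id.annotation is not str:
        problems.append("missing `step_id: str` argument")

    for name, param in sig.parameters.items():
        # Stricter than required: follows the advice to annotate everything.
        if param.annotation is inspect.Parameter.empty:
            problems.append(f"argument `{name}` has no annotation")
        elif hides_stream(param.annotation):
            problems.append(f"argument `{name}` nests a Stream")

    ret = sig.return_annotation
    if ret is inspect.Signature.empty:
        problems.append("return value has no annotation")
    elif hides_stream(ret):
        problems.append("return type nests a Stream; use a dataclass")
    elif dataclasses.is_dataclass(ret):
        # Streams may only be top-level fields of the return dataclass.
        for field in dataclasses.fields(ret):
            if hides_stream(field.type):
                problems.append(f"field `{field.name}` nests a Stream")
    return problems


def split_parity(step_id: str, up: Stream[int]) -> ParityOut:
    """Follows the rules; the body is omitted because only the shape matters."""
    raise NotImplementedError


def broken(up: Stream[int], y) -> Tuple[Stream[int], Stream[int]]:
    """Breaks three rules: no step_id, `y` unannotated, tuple of streams."""
    raise NotImplementedError


good_report = check_operator_function(split_parity)
bad_report = check_operator_function(broken)
```

Here good_report comes back empty, while bad_report lists one problem per broken rule. In a real dataflow, split_parity would be decorated with operator and would build its two downstreams from existing operators.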


A good docstring for a custom operator has a few things:

  • A one line summary of the operator.

  • A doctest example using the operator.

  • For each argument that is a stream, the required shape of that upstream.

  • For each returned stream, the shape of the data sent downstream.
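
As a sketch of what such a docstring might look like, here is add_to again with all four pieces filled in. Several things are assumptions made so the example runs with only the standard library: plain lists stand in for streams, the operator decorator is left off, the `:arg:` and `:returns:` field style is just one possible convention, and run_docstring is a hypothetical helper. In a real operator the doctest would build and run a small dataflow instead of passing a list.

```python
import doctest
from typing import Callable, List


def add_to(step_id: str, up: List[int], y: int) -> List[int]:
    """Add a constant to every item from upstream.

    >>> add_to("add_ten", [1, 2, 3], 10)
    [11, 12, 13]

    :arg step_id: Unique ID for this step.

    :arg up: Upstream items; each must be an `int`.

    :arg y: Constant added to each item.

    :returns: One `int` per upstream item, equal to the item plus `y`.

    """
    # Plain-list stand-in; a real operator would map over a Stream instead.
    return [x + y for x in up]


def run_docstring(fn: Callable) -> doctest.TestResults:
    """Run the doctest examples in `fn`'s docstring and return the tally."""
    parser = doctest.DocTestParser()
    # Expose only `fn` itself to the examples, under its own name.
    test = parser.get_doctest(
        fn.__doc__, {fn.__name__: fn}, fn.__name__, None, 0
    )
    return doctest.DocTestRunner(verbose=False).run(test)


results = run_docstring(add_to)
```

Keeping the example in the docstring as a doctest means it can be executed, so the documented behavior and the code are less likely to drift apart.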
