Simple Example
Let’s write our first Bytewax dataflow. Before starting, make sure you’ve installed Bytewax by following the instructions in Installing.
Imports
Our dataflow starts with a few imports, so let’s create those now.
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
Dataflow
To begin, create a Dataflow instance in a variable named flow. This defines the empty dataflow we’ll add steps to.
flow = Dataflow("a_simple_example")
Input
Every dataflow requires input. For this example, we’ll use the input operator together with a TestingSource to produce a stream of ints.
stream = op.input("input", flow, TestingSource(range(10)))
The input operator returns a Stream of values. If your editor has a language server that shows type hints, you can see that the return type here is Stream[int], a stream of integers.
Operators
Each operator function returns a new Stream containing the results of that step, and you can call more operators on that Stream. Let’s use the map operator to double each number from our input stream.
def times_two(inp: int) -> int:
    return inp * 2
double = op.map("double", stream, times_two)
Output
Finally, let’s add an output step. Every dataflow requires at least one input step and one output step. We’ll direct our output to standard output using StdOutSink.
op.output("out", double, StdOutSink())
Running
When writing Bytewax dataflows for production use, you should run your dataflow using the bytewax.run module. Let’s see an example that does just that.
To begin, save the following code in a file called basic.py.
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
# Create the empty dataflow; bytewax.run looks for a variable named flow.
flow = Dataflow("a_simple_example")
# Emit the integers 0 through 9 as input.
stream = op.input("input", flow, TestingSource(range(10)))
def times_two(inp: int) -> int:
    return inp * 2
# Double every item in the stream.
double = op.map("double", stream, times_two)
# Print each result to standard output.
op.output("out", double, StdOutSink())
To run the dataflow, use the following command:
$ python -m bytewax.run basic
0
2
4
6
8
10
12
14
16
18
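The printed numbers are simply times_two applied to each item from range(10), written one per line by StdOutSink. As a quick sanity check, the same computation in plain Python, with no Bytewax involved, yields the same sequence:

```python
def times_two(inp: int) -> int:
    return inp * 2


# Apply the same mapping eagerly to the same input the dataflow uses.
expected = [times_two(x) for x in range(10)]
print(expected)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```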
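The first argument passed to the bytewax.run module is a dataflow getter string, which tells it where to find the Dataflow to run. Because we saved our Dataflow definition in a variable named flow, which is the name bytewax.run looks for by default, we only need to supply the module name (basic) when running our dataflow from the command line; writing basic:flow would be equivalent.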
Note that simply executing the Python file (for example, python basic.py) will not run the dataflow, because the file only defines it. You must use the bytewax.run module and its options so it can set up the runtime correctly.