bytewax.operators#
Built-in operators.
See the getting-started guide for the basics of building and running dataflows.
Data#
- JoinInsertMode: TypeAlias#
How to handle multiple values from a side during a join.
First: Emit a row containing only the first value from a side, if any.
Last: Emit a row containing only the last value from a side, if any.
Product: Emit a row with every combination of values for all sides. This is similar to a SQL cross join.
- JoinEmitMode: TypeAlias#
When a join should emit rows downstream. A short usage sketch follows this list.
Complete: Emit once a value has been seen from each side. Then discard the state.
Final: Emit when the upstream is EOF or the window closes. Then discard the state.
This mode only works on finite data streams and only returns a result once the upstream is EOF. You’ll need to use a different mode on infinite data.
Running: Emit every time a new value is seen on any side. Retain the state forever.
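To make these modes concrete, here is a minimal sketch (not part of the original reference; the flow and step names are hypothetical) showing how they are passed to the join operator documented later in this module:

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("join_modes_eg")
lefts = op.input("lefts", flow, TestingSource([("k", 1), ("k", 2)]))
rights = op.input("rights", flow, TestingSource([("k", "a")]))
# "running" emits a tuple every time a new value arrives on any side,
# with None for sides that have not been seen yet; the default
# "complete" would instead emit once per key, after all sides have a value.
joined = op.join("join", lefts, rights, emit_mode="running")
op.inspect("out", joined)

The exact tuples emitted depend on the interleaving of the two inputs.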
Classes#
- class StatefulBatchLogic#
-
Abstract class to define a stateful operator. See the stateful_batch operator below for a full worked example using this class.
The operator will call these methods in order: on_batch once with all items queued, then on_notify if the notification time has passed, then on_eof if the upstream is EOF and no new items will be received this execution. If the logic is retained after all the above calls, then notify_at will be called. snapshot is periodically called.
- RETAIN: bool#
This logic should be retained after this returns.
If you always return this, this state will never be deleted and if your key-space grows without bound, your memory usage will also grow without bound.
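As a sketch, this constant is meant to be returned as the second element of the 2-tuple from the on_* callbacks; the fragment below is hypothetical and assumes a self.sum attribute like the example later in this section:

def on_batch(self, values):
    for v in values:
        self.sum += v
    # RETAIN as the second element keeps this logic (and its state) alive.
    return [self.sum], StatefulBatchLogic.RETAIN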
- abstract on_batch(values: List[V]) Tuple[Iterable[W], bool] #
Called on each new batch of upstream values.
This will be called multiple times in a row if there are multiple batches from upstream.
- Parameters:
values – The values of the upstream (key, value) pairs in this batch.
- Returns:
A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in (key, value) automatically.
- on_notify() Tuple[Iterable[W], bool] #
Called when the scheduled notification time has passed.
Defaults to emitting nothing and retaining the logic.
- Returns:
A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in (key, value) automatically.
- on_eof() Tuple[Iterable[W], bool] #
The upstream has no more items on this execution.
This will only be called once per awakening, after on_batch is called.
Defaults to emitting nothing and retaining the logic.
- Returns:
A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in (key, value) automatically.
- notify_at() Optional[datetime] #
Return the next notification time.
This will be called once right after the logic is built, and again after any of the on_* methods are called, if the logic was retained.
This must always return the next notification time. The operator only stores a single next time, so if there are a series of times you would like to notify at, store all of them but only return the soonest.
Defaults to returning None.
- Returns:
Scheduled time. If None, no on_notify callback will occur.
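A minimal sketch (not from the original reference) of the store-all, return-soonest pattern described above, assuming the logic keeps a self.deadlines list of datetimes in its state:

from datetime import datetime
from typing import Optional

def notify_at(self) -> Optional[datetime]:
    # Keep every pending deadline in state, but only ever hand the
    # operator the soonest one; it stores just a single next time.
    if not self.deadlines:
        return None
    return min(self.deadlines)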
- abstract snapshot() S #
Return an immutable copy of the state for recovery.
This will be called periodically by the runtime.
The value returned here will be passed back to the builder function of stateful_batch when resuming.
The state must be pickle-able.
Danger
The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.
- Returns:
The immutable state to be pickled.
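For instance, a hedged sketch of a snapshot guarding a mutable dict state that the other callbacks mutate in place:

import copy

def snapshot(self):
    # self.counts is assumed to be a dict mutated by on_batch, so return
    # a deep copy to keep the saved snapshot effectively immutable.
    return copy.deepcopy(self.counts)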
- class StatefulLogic#
-
Abstract class to define a stateful operator.
The operator will call these methods in order: on_item once for any items queued, then on_notify if the notification time has passed, then on_eof if the upstream is EOF and no new items will be received this execution. If the logic is retained after all the above calls, then notify_at will be called. snapshot is periodically called.
- RETAIN: bool#
This logic should be retained after this returns.
If you always return this, this state will never be deleted and if your key-space grows without bound, your memory usage will also grow without bound.
- abstract on_item(value: V) Tuple[Iterable[W], bool] #
Called on each new upstream item.
This will be called multiple times in a row if there are multiple items from upstream.
- Parameters:
value – The value of the upstream (key, value).
- Returns:
A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in (key, value) automatically.
- on_notify() Tuple[Iterable[W], bool] #
Called when the scheduled notification time has passed.
- Returns:
A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in (key, value) automatically.
- on_eof() Tuple[Iterable[W], bool] #
The upstream has no more items on this execution.
This will only be called once per execution, after on_item is done being called.
- Returns:
A 2-tuple of: any values to emit downstream and whether to discard this logic. Values will be wrapped in (key, value) automatically.
- notify_at() Optional[datetime] #
Return the next notification time.
This will be called once right after the logic is built, and again after any of the on_* methods are called, if the logic was retained.
This must always return the next notification time. The operator only stores a single next time, so if there are a series of times you would like to notify at, store all of them but only return the soonest.
Defaults to returning None.
- Returns:
Scheduled time. If None, no on_notify callback will occur.
- abstract snapshot() S #
Return an immutable copy of the state for recovery.
This will be called periodically by the runtime.
The value returned here will be passed back to the builder function of stateful when resuming.
The state must be pickle-able.
Danger
The state must be effectively immutable! If any of the other functions in this class might be able to mutate the state, you must copy.deepcopy or something equivalent before returning it here.
- Returns:
The immutable state to be pickled.
Functions#
- branch( ) BranchOut #
Divide items into two streams with a predicate.
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main, TestingSource

flow = Dataflow("branch_eg")
nums = op.input("nums", flow, TestingSource([1, 2, 3, 4, 5]))
b_out = op.branch("even_odd", nums, lambda x: x % 2 == 0)
evens = b_out.trues
odds = b_out.falses
_ = op.inspect("evens", evens)
_ = op.inspect("odds", odds)

branch_eg.odds: 1
branch_eg.evens: 2
branch_eg.odds: 3
branch_eg.evens: 4
branch_eg.odds: 5
- Parameters:
step_id – Unique ID.
up – Stream to divide.
predicate – Function to call on each upstream item. Items for which this returns True will be put into one branch stream; False, the other branch stream. If this function is a typing.TypeGuard, the downstreams will be properly typed.
- Returns:
A stream of items for which the predicate returns True, and a stream of items for which the predicate returns False.
- flat_map_batch( ) Stream[Y] #
Transform an entire batch of items 1-to-many.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("flat_map_batch_eg")
numbers = op.input("nums", flow, TestingSource([[1, 2], [3]]))

def batch_mapper(batch):
    # Each upstream item is itself a list, so `x * 10` repeats the list
    # ten times rather than multiplying its elements.
    return [x * 10 for x in batch]

flat_mapped = op.flat_map_batch("batch_flat_map", numbers, batch_mapper)
op.inspect("out", flat_mapped)

flat_map_batch_eg.out: [1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2]
flat_map_batch_eg.out: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
The batch size received here depends on the exact behavior of the upstream input sources and operators. It should be used as a performance optimization when processing multiple items at once has much reduced overhead.
See also the batch_size parameter on various input sources.
See also the collect operator, which collects multiple items next to each other in a stream into a single list flowing through the stream.
- Parameters:
step_id – Unique ID.
up – Stream.
mapper – Called once with each batch of items the runtime receives. Returns the items to emit downstream.
- Returns:
A stream of each item returned by the mapper.
- input( ) Stream[X] #
Introduce items into a dataflow.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("input_eg")
nums = op.input("nums", flow, TestingSource([10, 20, 30]))
op.inspect("out", nums)

input_eg.out: 10
input_eg.out: 20
input_eg.out: 30
See bytewax.inputs for more information on how input works. See bytewax.connectors for a buffet of our built-in connector types.
- inspect_debug(step_id: str, up: Stream[X], inspector: Callable[[str, X, int, int], None] = _default_debug_inspector) Stream[X] #
Observe items, their worker, and their epoch for debugging.
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow

flow = Dataflow("inspect_debug_eg")
s = op.input("inp", flow, TestingSource(range(3)))
_ = op.inspect_debug("help", s)

inspect_debug_eg.help W0 @1: 0
inspect_debug_eg.help W0 @1: 1
inspect_debug_eg.help W0 @1: 2
- Parameters:
step_id – Unique ID.
up – Stream.
inspector – Called with the step ID, each item in the stream, the epoch of that item, and the worker processing the item. Defaults to printing out all the arguments.
- Returns:
The upstream unmodified.
- merge( ) Stream[Any] #
Combine multiple streams together.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("merge_eg")
nums1 = op.input("nums1", flow, TestingSource([1, 2]))
nums2 = op.input("nums2", flow, TestingSource([3, 4]))
merged = op.merge("merged", nums1, nums2)
op.inspect("out", merged)

merge_eg.out: 1
merge_eg.out: 3
merge_eg.out: 2
merge_eg.out: 4
- Parameters:
step_id – Unique ID.
*ups – Streams.
- Returns:
A single stream of the same type as all the upstreams with items from all upstreams merged into it unmodified.
- output( ) None #
Write items out of a dataflow.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("output_eg")
nums = op.input("nums", flow, TestingSource([1, 2, 3]))
# This will print the items to stdout.
# You can replace this with any other sink.
op.output("sink", nums, StdOutSink())

1
2
3
See bytewax.outputs for more information on how output works. See bytewax.connectors for a buffet of our built-in connector types.
- Parameters:
step_id – Unique ID.
up – Stream of items to write. See your specific Sink documentation for the required type of those items.
sink – Sink to write items to.
- redistribute( ) Stream[X] #
Redistribute items across workers in a dataflow.
The redistribute operator is useful for redistributing work to achieve better parallelization in a distributed dataflow. It moves each incoming item to a random worker to balance the workload, especially in cases where a prior step concentrates data on only a few workers, leading to poor utilization of CPU resources across the cluster.
Bytewax will exchange an item between workers before stateful steps to ensure correctness, but stateless operators like filter are run on all workers without any data exchange before or after they are executed. This means that without redistribution, certain CPU-intensive steps may run only on a subset of workers if previous steps concentrated items on just a few workers.
Use cases for redistribute:
Good Use: When you have an IO-bound or CPU-heavy workload that needs to be distributed across multiple workers in a cluster, such as a network request or CPU-bound tasks on a machine with multiple workers and CPU cores.
Bad Use: If the operation you want to parallelize is already fast or the workload already spawns enough threads to use all available cores, redistributing can introduce unnecessary overhead and regress performance.
IMPORTANT
redistribute only helps increase utilization when placed immediately before stateless operators, e.g. map, filter, flat_map, etc. It has no effect or a detrimental effect when placed immediately before stateful operators, e.g. stateful_map, collect_window, etc.
- Parameters:
step_id – Unique ID.
up – Stream.
- Returns:
Stream unmodified.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("redistribute_eg")
nums = op.input("nums", flow, TestingSource([1, 2, 3, 4, 5]))
redistributed = op.redistribute("redistribute", nums)
op.inspect_debug("out", redistributed)

In this example, if you run it with one worker, all the items will be processed by that single worker. Assuming your file is named redistribute_eg.py:
python -m bytewax.run redistribute_eg:flow
Output:
redistribute_eg.out W0 @1: 1
redistribute_eg.out W0 @1: 2
redistribute_eg.out W0 @1: 3
redistribute_eg.out W0 @1: 4
redistribute_eg.out W0 @1: 5
However, if you run it with two workers, the items will be distributed randomly across the two workers:
python -m bytewax.run -w2 redistribute_eg:flow
redistribute_eg.out W0 @1: 1
redistribute_eg.out W1 @1: 2
redistribute_eg.out W1 @1: 4
redistribute_eg.out W0 @1: 3
redistribute_eg.out W0 @1: 5
- stateful_batch(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulBatchLogic[V, W, S]]) KeyedStream[W] #
Advanced generic stateful operator.
# This dataflow will create the cumulative
# sum of values for each key.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.operators import StatefulBatchLogic


class SumLogic(StatefulBatchLogic):
    def __init__(self):
        self.sum = 0

    def on_batch(self, batch):
        for event in batch:
            key = event["key"]
            self.sum += event["val"]
            print(f"Current sum for key {key}: {self.sum}")
        print("End of batch\n")
        # Return the cumulative sum wrapped in an iterable
        # and False to retain the logic.
        return [(key, self.sum)], False

    def snapshot(self):
        # Return the current state to be saved.
        return self.sum

    def restore(self, snapshot):
        # Restore the state from the snapshot.
        self.sum = snapshot


source = [
    {"key": "a", "val": 1},
    {"key": "b", "val": 2},
    {"key": "a", "val": 3},
    {"key": "b", "val": 4},
]

flow = Dataflow("stateful_batch_eg")
nums = op.input("nums", flow, TestingSource(source))
# Note: this step creates a tuple of the form
# ("a", {"key": "a", "val": 1}).
# This is a prerequisite for stateful operators
# such as stateful_batch.
key_on = op.key_on("keys", nums, lambda x: x["key"])
op.inspect("mapping_items", key_on)
stateful = op.stateful_batch("stateful_batch", key_on, lambda _: SumLogic())
op.inspect("out", stateful)

stateful_batch_eg.mapping_items: ('a', {'key': 'a', 'val': 1})
Current sum for key a: 1
End of batch
stateful_batch_eg.out: ('a', ('a', 1))
stateful_batch_eg.mapping_items: ('b', {'key': 'b', 'val': 2})
Current sum for key b: 2
End of batch
stateful_batch_eg.out: ('b', ('b', 2))
stateful_batch_eg.mapping_items: ('a', {'key': 'a', 'val': 3})
Current sum for key a: 4
End of batch
stateful_batch_eg.out: ('a', ('a', 4))
stateful_batch_eg.mapping_items: ('b', {'key': 'b', 'val': 4})
Current sum for key b: 6
End of batch
stateful_batch_eg.out: ('b', ('b', 6))
This is the lowest-level operator Bytewax provides and gives you full control over all aspects of the operator processing and lifecycle. Usually you will want to use a higher-level operator than this.
Subclass StatefulBatchLogic to define its behavior. See documentation there.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
builder – Called whenever a new key is encountered with the resume state returned from StatefulBatchLogic.snapshot for this key, if any. This should close over any non-state configuration and combine it with the resume state to return the prepared StatefulBatchLogic for the new key.
- Returns:
Keyed stream of all items returned from StatefulBatchLogic.on_batch, StatefulBatchLogic.on_notify, and StatefulBatchLogic.on_eof.
- stateful(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulLogic[V, W, S]]) KeyedStream[W] #
Advanced generic stateful operator.
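A minimal sketch (analogous to the stateful_batch example above, not from the original reference) of a per-key running sum using StatefulLogic:

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.operators import StatefulLogic

class SumLogic(StatefulLogic):
    def __init__(self):
        self.sum = 0

    def on_item(self, value):
        # Accumulate and emit the running sum; False retains the logic.
        self.sum += value
        return [self.sum], False

    def snapshot(self):
        return self.sum

flow = Dataflow("stateful_eg")
nums = op.input("nums", flow, TestingSource([("a", 1), ("a", 3), ("b", 2)]))
summed = op.stateful("sum", nums, lambda _resume: SumLogic())
op.inspect("out", summed)

Running this should emit ('a', 1), ('a', 4), and ('b', 2), since each key gets its own SumLogic instance.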
This is a low-level operator Bytewax provides and gives you control over most aspects of the operator processing and lifecycle. Usually you will want to use a higher-level operator than this. Also see stateful_batch for even more control.
Subclass StatefulLogic to define its behavior. See documentation there.
- Parameters:
step_id – Unique ID.
up – Keyed stream.
builder – Called whenever a new key is encountered with the resume state returned from StatefulLogic.snapshot for this key, if any. This should close over any non-state configuration and combine it with the resume state to return the prepared StatefulLogic for the new key.
- Returns:
Keyed stream of all items returned from StatefulLogic.on_item, StatefulLogic.on_notify, and StatefulLogic.on_eof.
- collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) KeyedStream[List[V]] #
Collect items into a list up to a size or a timeout.
# This dataflow will collect items into lists of size 2 or
# every second, whichever comes first.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource
from datetime import timedelta

flow = Dataflow("collect_eg")
source = [
    {"key": "a", "val": 1},
    {"key": "b", "val": 2},
    {"key": "a", "val": 3},
    {"key": "b", "val": 4},
    {"key": "a", "val": 5},
    {"key": "b", "val": 6},
    {"key": "a", "val": 7},
    {"key": "b", "val": 8},
]
nums = op.input("nums", flow, TestingSource(source))
keyed = op.key_on("key", nums, lambda x: x["key"])
collected = op.collect("collect", keyed, timedelta(seconds=1), max_size=2)
op.inspect("out", collected)

collect_eg.out: ('a', [{'key': 'a', 'val': 1}, {'key': 'a', 'val': 3}])
collect_eg.out: ('b', [{'key': 'b', 'val': 2}, {'key': 'b', 'val': 4}])
collect_eg.out: ('a', [{'key': 'a', 'val': 5}, {'key': 'a', 'val': 7}])
collect_eg.out: ('b', [{'key': 'b', 'val': 6}, {'key': 'b', 'val': 8}])
See bytewax.operators.windowing.collect_window for more control over time.
- Parameters:
step_id – Unique ID.
up – Stream of individual items.
timeout – Timeout before emitting the list, even if max_size was not reached.
max_size – Emit the list once it reaches this size, even if timeout was not reached.
- Returns:
A stream of upstream items gathered into lists.
- count_final( ) KeyedStream[int] #
Count the number of occurrences of items in the entire stream.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("count_final_eg")
source = ["apple", "banana", "apple", "banana", "banana"]
words = op.input("words", flow, TestingSource(source))
counted = op.count_final("count", words, key=lambda x: x)
op.inspect("out", counted)

count_final_eg.out: ('apple', 2)
count_final_eg.out: ('banana', 3)
This only works on finite data streams and only returns counts once the upstream is EOF. You’ll need to use bytewax.operators.windowing.count_window on infinite data.
- Parameters:
step_id – Unique ID.
up – Stream of items to count.
key – Function to convert each item into a string key. The counting machinery does not compare the items directly, instead it groups by this string key.
- Returns:
A stream of (key, count). Only once the upstream is EOF.
- enrich_cached(step_id: str, up: Stream[X], getter: Callable[[DK], DV], mapper: Callable[[TTLCache[DK, DV], X], Y], ttl: timedelta = timedelta.max, _now_getter: Callable[[], datetime] = _get_system_utc) Stream[Y] #
Enrich / join items using a cached lookup.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

def mock_service(key):
    return {"a": 10, "b": 20, "c": 30}.get(key)

flow = Dataflow("enrich_cached_eg")
keys = op.input("keys", flow, TestingSource(["a", "b", "a", "c", "a", "a"]))

def enrich_with_service(cache, item):
    value = cache.get(item)
    return {"key": item, "value": value}

enriched = op.enrich_cached("enrich", keys, mock_service, enrich_with_service)
op.inspect("out", enriched)

enrich_cached_eg.out: {'key': 'a', 'value': 10}
enrich_cached_eg.out: {'key': 'b', 'value': 20}
enrich_cached_eg.out: {'key': 'a', 'value': 10}
enrich_cached_eg.out: {'key': 'c', 'value': 30}
enrich_cached_eg.out: {'key': 'a', 'value': 10}
enrich_cached_eg.out: {'key': 'a', 'value': 10}
Use this if you’d like to join items in the dataflow with unsynchronized pulled / polled data from an external service. This assumes that the joined data is static and will not emit updates. Since there is no integration with the recovery system, it’s possible that results will change across resumes if the joined data source changes.
The joined data is cached by a key you specify.
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, run_main

def query_icon_url_service(code):
    if code == "dog_ico":
        return "http://domain.invalid/static/dog_v1.png"
    elif code == "cat_ico":
        return "http://domain.invalid/static/cat_v2.png"
    elif code == "rabbit_ico":
        return "http://domain.invalid/static/rabbit_v1.png"

flow = Dataflow("param_eg")
inp = op.input(
    "inp",
    flow,
    TestingSource(
        [
            {"user_id": "1", "avatar_icon_code": "dog_ico"},
            {"user_id": "3", "avatar_icon_code": "rabbit_ico"},
            {"user_id": "2", "avatar_icon_code": "dog_ico"},
        ]
    ),
)
op.inspect("check_inp", inp)

def icon_code_to_url(cache, msg):
    code = msg.pop("avatar_icon_code")
    msg["avatar_icon_url"] = cache.get(code)
    return msg

with_urls = op.enrich_cached(
    "with_url",
    inp,
    query_icon_url_service,
    icon_code_to_url,
)
op.inspect("check_with_url", with_urls)
If you have a join source which is push-based or need to emit updates when either side of the join changes, instead consider having that be a second input to the dataflow and using a running join. This reduces cache misses and startup overhead.
Each worker will keep a local cache of values. There is no max size.
You can also use a map step in the same way to manage the cache yourself manually.
- Parameters:
step_id – Unique ID.
up – Stream.
getter – On cache miss, get the new updated value for a key.
mapper – Called on each item with access to the cache. Each return value is emitted downstream.
ttl – Re-get values in the cache that are older than this.
- Returns:
A stream of items returned by the mapper.
- flat_map( ) Stream[Y] #
Transform items one-to-many.
This is like a combination of map and flatten.
It is commonly used for:
Tokenizing
Flattening hierarchical objects
Breaking up aggregations for further processing
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow

flow = Dataflow("flat_map_eg")
inp = ["hello world"]
s = op.input("inp", flow, TestingSource(inp))

def split_into_words(sentence):
    return sentence.split()

s = op.flat_map("split_words", s, split_into_words)
_ = op.inspect("out", s)

flat_map_eg.out: 'hello'
flat_map_eg.out: 'world'
- Parameters:
step_id – Unique ID.
up – Stream.
mapper – Called once on each upstream item. Returns the items to emit downstream.
- Returns:
A stream of each item returned by the mapper.
- flat_map_value( ) KeyedStream[W] #
Transform values one-to-many.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("flat_map_value_eg")
source = [("key1", "hello world"), ("key2", "hi")]
inp = op.input("inp", flow, TestingSource(source))

def split_words(value):
    return value.split()

flat_mapped = op.flat_map_value("split_words", inp, split_words)
op.inspect("out", flat_mapped)

flat_map_value_eg.out: ('key1', 'hello')
flat_map_value_eg.out: ('key1', 'world')
flat_map_value_eg.out: ('key2', 'hi')
- Parameters:
step_id – Unique ID.
up – Keyed stream.
mapper – Called once on each upstream value. Returns the values to emit downstream.
- Returns:
A keyed stream of each value returned by the mapper.
- flatten( ) Stream[X] #
Move all sub-items up a level.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("flatten_eg")
inp = op.input("inp", flow, TestingSource([[1, 2], [3, 4, 5]]))
flattened = op.flatten("flatten", inp)
op.inspect("out", flattened)

flatten_eg.out: 1
flatten_eg.out: 2
flatten_eg.out: 3
flatten_eg.out: 4
flatten_eg.out: 5
- Parameters:
step_id – Unique ID.
up – Stream of iterables.
- Returns:
A stream of the items within each iterable in the upstream.
- filter( ) Stream[X] #
Keep only some items.
It is commonly used for:
Selecting relevant events
Removing empty events
Removing sentinels
Removing stop words
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow

flow = Dataflow("filter_eg")
s = op.input("inp", flow, TestingSource(range(4)))

def is_odd(item):
    return item % 2 != 0

s = op.filter("filter_odd", s, is_odd)
_ = op.inspect("out", s)

filter_eg.out: 1
filter_eg.out: 3
- Parameters:
step_id – Unique ID.
up – Stream.
predicate – Called with each upstream item. Only items for which this returns True will be emitted downstream.
- Returns:
A stream with only the upstream items for which the predicate returns True.
- filter_value( ) KeyedStream[V] #
Selectively keep only some items from a keyed stream.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("filter_value_eg")
source = [("key1", 1), ("key2", 2), ("key3", 3), ("key4", 4), ("key5", 5)]
inp = op.input("inp", flow, TestingSource(source))
filtered = op.filter_value("filter_odd", inp, lambda x: x % 2 != 0)
op.inspect("out", filtered)

filter_value_eg.out: ('key1', 1)
filter_value_eg.out: ('key3', 3)
filter_value_eg.out: ('key5', 5)
- Parameters:
step_id – Unique ID.
up – Keyed stream.
predicate – Will be called with each upstream value. Only values for which this returns True will be emitted downstream.
- Returns:
A keyed stream with only the upstream pairs for which the predicate returns True.
- filter_map( ) Stream[Y] #
A one-to-maybe-one transformation of items.
This is like a combination of map and then filter with a predicate removing None values.
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow

flow = Dataflow("filter_map_eg")
s = op.input(
    "inp",
    flow,
    TestingSource(
        [
            {"key": "a", "val": 1},
            {"bad": "obj"},
        ]
    ),
)

def validate(data):
    # Keep only dicts that have a "key" field; drop everything else.
    if not isinstance(data, dict) or "key" not in data:
        return None
    else:
        return data["key"], data

s = op.filter_map("validate", s, validate)
_ = op.inspect("out", s)

filter_map_eg.out: ('a', {'key': 'a', 'val': 1})
- Parameters:
step_id – Unique ID.
up – Stream.
mapper – Called on each item. Each return value is emitted downstream, unless it is None.
- Returns:
A stream of items returned from mapper, unless it is None.
- filter_map_value( ) KeyedStream[W] #
Transform values one-to-maybe-one.
This is like a combination of map_value and then filter_value with a predicate removing None values.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("filter_map_value_eg")
source = [("key1", 1), ("key2", "2"), ("key3", 2)]
inp = op.input("inp", flow, TestingSource(source))

def to_even_str(val):
    if isinstance(val, int) and val % 2 == 0:
        return str(val)
    return None

filtered_mapped = op.filter_map_value("filter_map_value", inp, to_even_str)
op.inspect("out", filtered_mapped)

filter_map_value_eg.out: ('key3', '2')
- Parameters:
step_id – Unique ID.
up – Stream.
mapper – Called on each value. Each return value is emitted downstream, unless it is None.
- Returns:
A keyed stream of values returned from the mapper, unless the value is None. The key is unchanged.
- fold_final( ) KeyedStream[S] #
Build an empty accumulator, then combine values into it.
This only works on finite data streams and only returns a result once the upstream is EOF. You’ll need to use bytewax.operators.windowing.fold_window or bytewax.operators.stateful_flat_map on infinite data.
It is like reduce_final but uses a function to build the initial value.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("fold_final_eg")
source = [("key1", 1), ("key1", 2), ("key2", 3), ("key2", 5)]
inp = op.input("inp", flow, TestingSource(source))

def build_accumulator():
    return 0

def folder(acc, val):
    return acc + val

folded = op.fold_final("fold", inp, build_accumulator, folder)
op.inspect("out", folded)

fold_final_eg.out: ('key1', 3)
fold_final_eg.out: ('key2', 8)
- Parameters:
step_id – Unique ID.
up – Keyed stream.
builder – Called the first time a key appears and is expected to return the empty accumulator for that key.
folder – Combines a new value into an existing accumulator and returns the updated accumulator. The accumulator is initially the empty accumulator.
- Returns:
A keyed stream of the accumulators. Only once the upstream is EOF.
- inspect( ) Stream[X] #
Observe items for debugging.
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow

flow = Dataflow("my_flow")
s = op.input("inp", flow, TestingSource(range(3)))
_ = op.inspect("help", s)

my_flow.help: 0
my_flow.help: 1
my_flow.help: 2
- Parameters:
step_id – Unique ID.
up – Stream.
inspector – Called with the step ID and each item in the stream. Defaults to printing the step ID and each item.
- Returns:
The upstream unmodified.
- join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = 'last', emit_mode: JoinEmitMode = 'complete') KeyedStream[Tuple] #
Gather together the value for a key on multiple streams.
See Joins for more information.
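As an illustrative sketch (the flow and step names are hypothetical), joining two keyed streams with the default modes:

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("join_eg")
# Two keyed streams sharing user IDs as keys.
names = op.input("names", flow, TestingSource([("1", "Alice"), ("2", "Bob")]))
emails = op.input("emails", flow, TestingSource([("1", "a@example.com"), ("2", "b@example.com")]))
joined = op.join("join", names, emails)
op.inspect("out", joined)

With the default "complete" emit mode, each key should emit a single tuple such as ('1', ('Alice', 'a@example.com')) once a value has been seen from every side.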
- Parameters:
step_id – Unique ID.
*sides – Keyed streams.
insert_mode – Insert mode of this join. See JoinInsertMode for more info. Defaults to "last".
emit_mode – Emit mode of this join. See JoinEmitMode for more info. Defaults to "complete".
- Returns:
Emits a tuple with the value from each stream in the order of the argument list. See JoinEmitMode for when tuples are emitted.
- key_on( ) KeyedStream[X] #
Add a key for each item.
This allows you to use all the keyed operators that require the upstream to be a KeyedStream.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("key_on_eg")
source = [
    {"key": "a", "val": 1},
    {"key": "b", "val": 2},
]
nums = op.input("nums", flow, TestingSource(source))
keyed = op.key_on("key", nums, lambda x: x["key"])
op.inspect("out", keyed)

key_on_eg.out: ('a', {'key': 'a', 'val': 1})
key_on_eg.out: ('b', {'key': 'b', 'val': 2})
- Parameters:
step_id – Unique ID.
up – Stream.
key – Called on each item and should return the key for that item.
- Returns:
A stream of 2-tuples of (key, item), AKA a keyed stream. The keys come from the return value of the key function; upstream items will automatically be attached as values.
- key_rm(step_id: str, up: KeyedStream[X]) Stream[X] #
Discard keys.
KeyedStreams are 2-tuples of (key, value). This will discard the key so you just have the values, if you don’t need the keys anymore.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("key_rm_eg")
source = [
    {"key": "a", "val": 1},
    {"key": "b", "val": 2},
]
nums = op.input("nums", flow, TestingSource(source))
keyed = op.key_on("key", nums, lambda x: x["key"])
op.inspect("with_key", keyed)
remove_key = op.key_rm("no_key", keyed)
op.inspect("without_key", remove_key)

key_rm_eg.with_key: ('a', {'key': 'a', 'val': 1})
key_rm_eg.without_key: {'key': 'a', 'val': 1}
key_rm_eg.with_key: ('b', {'key': 'b', 'val': 2})
key_rm_eg.without_key: {'key': 'b', 'val': 2}
- Parameters:
step_id – Unique ID.
up – Keyed stream.
- Returns:
A stream of just values.
- map( ) Stream[Y] #
Transform items one-by-one.
It is commonly used for:
Serialization and deserialization.
Selection of fields.
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow

flow = Dataflow("map_eg")
s = op.input("inp", flow, TestingSource(range(3)))

def add_ten(item):
    return item + 10

s = op.map("add_ten", s, add_ten)
_ = op.inspect("out", s)

map_eg.out: 10
map_eg.out: 11
map_eg.out: 12
- Parameters:
step_id – Unique ID.
up – Stream.
mapper – Called on each item. Each return value is emitted downstream.
- Returns:
A stream of items returned from the mapper.
- map_value( ) KeyedStream[W] #
Transform values one-by-one.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("map_value_eg")
nums = op.input("nums", flow, TestingSource([("a", 1), ("b", 2), ("c", 3)]))
mapped = op.map_value("double", nums, lambda x: x * 2)
op.inspect("out", mapped)

map_value_eg.out: ('a', 2)
map_value_eg.out: ('b', 4)
map_value_eg.out: ('c', 6)
- Parameters:
step_id – Unique ID.
up – Keyed stream.
mapper – Called on each value. Each return value is emitted downstream.
- Returns:
A keyed stream of values returned from the mapper. The key is unchanged.
- max_final(step_id: str, up: KeyedStream[V], by=_identity) KeyedStream[V] #
Find the maximum value for each key.
This only works on finite data streams and only returns a result once the upstream is EOF. You’ll need to use bytewax.operators.windowing.max_window on infinite data.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("max_final_eg")
source = [("key1", 1), ("key1", 3), ("key2", 2), ("key2", 19)]
inp = op.input("inp", flow, TestingSource(source))
max_val = op.max_final("max", inp)
op.inspect("out", max_val)

max_final_eg.out: ('key1', 3)
max_final_eg.out: ('key2', 19)
- Parameters:
step_id – Unique ID.
up – Keyed stream.
by – A function called on each value that is used to extract what to compare.
- Returns:
A keyed stream of the max values. Only once the upstream is EOF.
- min_final(step_id: str, up: KeyedStream[V], by=_identity) KeyedStream[V] #
Find the minimum value for each key.
This only works on finite data streams and only returns a result once the upstream is EOF. You’ll need to use bytewax.operators.windowing.min_window on infinite data.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("min_final_eg")
source = [("key1", 1), ("key1", 3), ("key2", 2), ("key2", 19)]
inp = op.input("inp", flow, TestingSource(source))
min_val = op.min_final("min", inp)
op.inspect("out", min_val)

min_final_eg.out: ('key1', 1)
min_final_eg.out: ('key2', 2)
- Parameters:
step_id – Unique ID.
up – Keyed stream.
by – A function called on each value that is used to extract what to compare.
- Returns:
A keyed stream of the min values. Only once the upstream is EOF.
- raises(step_id: str, up: Stream[Any]) None #
Raise an exception and crash the dataflow on any item.
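For illustration, a common pattern (a sketch with hypothetical step names) is to route invalid items into raises via branch so the dataflow crashes loudly instead of silently dropping them:

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("raises_eg")
nums = op.input("nums", flow, TestingSource([1, 2, -1]))
b_out = op.branch("validate", nums, lambda x: x >= 0)
# Crash the dataflow if any negative value sneaks through.
op.raises("crash_on_invalid", b_out.falses)
op.inspect("out", b_out.trues)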
- Parameters:
step_id – Unique ID.
up – Any item on this stream will throw a RuntimeError.
- reduce_final( ) KeyedStream[V] #
Distill all values for a key down into a single value.
This only works on finite data streams and only returns a result once the upstream is EOF. You’ll need to use bytewax.operators.windowing.reduce_window or bytewax.operators.stateful_flat_map on infinite data.
It is like fold_final but the first value is the initial accumulator.
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("reduce_final_eg")
inp = op.input("inp", flow, TestingSource([("key1", 1), ("key1", 2), ("key2", 3)]))
reduced = op.reduce_final("sum", inp, lambda acc, x: acc + x)
op.inspect("out", reduced)

reduce_final_eg.out: ('key1', 3)
reduce_final_eg.out: ('key2', 3)
- Parameters:
step_id – Unique ID.
up – Keyed stream.
reducer – Combines a new value into an old value and returns the combined value.
- Returns:
A keyed stream of the accumulators. Only once the upstream is EOF.
- stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) KeyedStream[W] #
Transform values one-to-many, referencing a persistent state.
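A minimal sketch (not from the original reference) that deduplicates consecutive values per key, emitting only when the value changes:

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource

flow = Dataflow("stateful_flat_map_eg")
inp = op.input("inp", flow, TestingSource([("a", 1), ("a", 1), ("a", 2)]))

def dedupe(last, val):
    # Emit the value only if it differs from the previous one; the first
    # element of the returned tuple becomes the stored state.
    if val == last:
        return last, []
    return val, [val]

deduped = op.stateful_flat_map("dedupe", inp, dedupe)
op.inspect("out", deduped)

This should emit ('a', 1) and then ('a', 2), skipping the repeated 1.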
- Parameters:
step_id – Unique ID.
up – Keyed stream.
mapper – Called whenever a value is encountered from upstream with the last state or None, and then the upstream value. Should return a 2-tuple of (updated_state, emit_values). If the updated state is None, discard it.
- Returns:
A keyed stream.
- stateful_map( ) KeyedStream[W] #
Transform values one-to-one, referencing a persistent state.
It is commonly used for:
Anomaly detection
State machines
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.dataflow import Dataflow

flow = Dataflow("stateful_map_eg")
inp = [
    "a",
    "a",
    "a",
    "b",
    "a",
]
s = op.input("inp", flow, TestingSource(inp))
s = op.key_on("self_as_key", s, lambda x: x)

def check(running_count, _item):
    if running_count is None:
        running_count = 0
    running_count += 1
    return (running_count, running_count)

s = op.stateful_map("running_count", s, check)
_ = op.inspect("out", s)

stateful_map_eg.out: ('a', 1)
stateful_map_eg.out: ('a', 2)
stateful_map_eg.out: ('a', 3)
stateful_map_eg.out: ('b', 1)
stateful_map_eg.out: ('a', 4)
- Parameters:
step_id – Unique ID.
up – Keyed stream.
mapper – Called whenever a value is encountered from upstream with the last state or None, and then the upstream value. Should return a 2-tuple of (updated_state, emit_value). If the updated state is None, discard it.
- Returns:
A keyed stream.