bytewax.operators.helpers

Helper functions for using operators.

Data

K: TypeVar
V: TypeVar

Functions

map_dict_value(
    key: K,
    mapper: Callable[[V], V],
) → Callable[[Dict[K, V]], Dict[K, V]]

Build a function to map an item in a dict and return the dict.

Use this to build mapper functions for the map operator that transform one specific value in a dict and leave the other values untouched.

import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow
from bytewax.operators.helpers import map_dict_value

flow = Dataflow("lens_item_map_eg")
s = op.input(
    "inp",
    flow,
    TestingSource(
        [
            {"name": "Rachel White", "email": "rachel@white.com"},
            {"name": "John Smith", "email": "john@smith.com"},
        ]
    ),
)

def normalize(name):
    return name.upper()

# Apply normalize only to the "name" value; other keys pass through unchanged.
s = op.map("normalize", s, map_dict_value("name", normalize))

_ = op.inspect("out", s)

lens_item_map_eg.out: {'name': 'RACHEL WHITE', 'email': 'rachel@white.com'}
lens_item_map_eg.out: {'name': 'JOHN SMITH', 'email': 'john@smith.com'}

This kind of “do an operation on a known nested structure” helper is called a lens. To build more complex lenses, see the lenses package. It handles many more nuances of this problem, such as mutable vs. immutable data types, attributes vs. keys, and mutating methods vs. returning functions. You can use it to build mappers for Bytewax operators.
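The mutable vs. immutable distinction mentioned above can be illustrated without any extra dependency. The following is a minimal sketch using only the standard library; `map_dict_value_copy` is a hypothetical name invented for this illustration and is not part of Bytewax or the lenses package. It builds a mapper that returns an updated shallow copy instead of modifying the dict it receives.

```python
from typing import Callable, Dict, TypeVar

K = TypeVar("K")
V = TypeVar("V")


def map_dict_value_copy(
    key: K, mapper: Callable[[V], V]
) -> Callable[[Dict[K, V]], Dict[K, V]]:
    """Hypothetical non-mutating variant, for illustration only."""

    def apply(obj: Dict[K, V]) -> Dict[K, V]:
        # Shallow-copy the input so the caller's dict is left untouched.
        updated = dict(obj)
        updated[key] = mapper(obj[key])
        return updated

    return apply


original = {"name": "John Smith", "email": "john@smith.com"}
changed = map_dict_value_copy("name", str.upper)(original)

print(original["name"])  # the input dict keeps its original value
print(changed["name"])   # the returned copy holds the mapped value
```

Whether copying is worth the extra allocation depends on whether other parts of a dataflow still hold a reference to the original dict.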

Parameters:
  • key – Dictionary key.

  • mapper – Function to run on the value for that key.

Returns:

A function which will perform that mapping operation when called.
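To make the return value concrete, here is a minimal plain-Python sketch of a function with the same shape as `map_dict_value`. It is not the library's source: `map_dict_value_sketch` is a hypothetical name, and the choice to update the dict in place and return the same object is an assumption made for this illustration.

```python
from typing import Callable, Dict, TypeVar

K = TypeVar("K")
V = TypeVar("V")


def map_dict_value_sketch(
    key: K, mapper: Callable[[V], V]
) -> Callable[[Dict[K, V]], Dict[K, V]]:
    """Hypothetical stand-in for map_dict_value; not the library source."""

    def apply(obj: Dict[K, V]) -> Dict[K, V]:
        # Assumption: update the value in place and hand the same dict back.
        obj[key] = mapper(obj[key])
        return obj

    return apply


# The returned function does nothing until it is called with a dict.
normalize_name = map_dict_value_sketch("name", str.upper)
record = {"name": "Rachel White", "email": "rachel@white.com"}
result = normalize_name(record)
print(result)
```

Because the returned function takes one dict and returns one dict, it has the shape a mapper passed to the map operator needs, as in the `op.map("normalize", s, map_dict_value("name", normalize))` call shown earlier.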
