Design patterns

Here are some tips, tricks, and Python features that solve common questions and help you write succinct, easy-to-read code.

Quick Logic Functions

All of the above examples define named custom logic functions and then pass them to an operator. Any callable value can be used as-is, though!

This means you can use the following existing callables to help you make code more concise:

You can also use lambdas to quickly define one-off anonymous functions for simple custom logic.

The following sets of examples are equivalent.

For flat map:

from bytewax import Dataflow, run


def split_sentence(sentence):
    return sentence.split()


flow = Dataflow()
flow.flat_map(split_sentence)
flow.capture()


inp = [(0, "hello world")]
print(run(flow, inp))
[(0, 'hello'), (0, 'world')]
flow = Dataflow()
flow.flat_map(lambda s: s.split())
flow.capture()


inp = [(0, "hello world")]
print(run(flow, inp))
[(0, 'hello'), (0, 'world')]
flow = Dataflow()
flow.flat_map(str.split)
flow.capture()


inp = [(0, "hello world")]
print(run(flow, inp))
[(0, 'hello'), (0, 'world')]

For inspect epoch:

def log(epoch, item):
    print(epoch, item)


flow = Dataflow()
flow.inspect_epoch(log)
flow.capture()


inp = [(0, "a"), (1, "b")]
run(flow, inp)  # Note no print here.
0 a
1 b
flow = Dataflow()
flow.inspect_epoch(lambda e, i: print(e, i))
flow.capture()


inp = [(0, "a"), (1, "b")]
run(flow, inp)
0 a
1 b
flow = Dataflow()
flow.inspect_epoch(print)
flow.capture()


inp = [(0, "a"), (1, "b")]
run(flow, inp)
0 a
1 b

For reduce epoch:

def add_to_list(l, items):
    l.extend(items)
    return l


flow = Dataflow()
flow.reduce_epoch(add_to_list)
flow.capture()


inp = [(0, ("a", ["x"])), (0, ("a", ["y"]))]
print(run(flow, inp))
[(0, ('a', ['x', 'y']))]
flow = Dataflow()
flow.reduce_epoch(lambda l1, l2: l1 + l2)
flow.capture()


inp = [(0, ("a", ["x"])), (0, ("a", ["y"]))]
print(run(flow, inp))
[(0, ('a', ['x', 'y']))]
import operator


flow = Dataflow()
flow.reduce_epoch(operator.add)
flow.capture()


inp = [(0, ("a", ["x"])), (0, ("a", ["y"]))]
print(run(flow, inp))
[(0, ('a', ['x', 'y']))]

For filter:

def is_odd(item):
    return item % 2 == 1


flow = Dataflow()
flow.filter(is_odd)
flow.capture()


inp = [(0, 5), (0, 9), (0, 2)]
print(run(flow, inp))
[(0, 5), (0, 9)]
flow = Dataflow()
flow.filter(lambda x: x % 2 == 1)
flow.capture()


inp = [(0, 5), (0, 9), (0, 2)]
print(run(flow, inp))
[(0, 5), (0, 9)]

Subflows

If you find yourself repeating a series of steps in your dataflows or want to give some steps a descriptive name, you can group those steps into a subflow function which adds a sequence of steps. You can then call that subflow function whenever you need that step sequence. This is just calling a function.

def user_reducer(all_events, new_events):
    return all_events + new_events


def collect_user_events(flow):
    # event
    flow.map(lambda e: (e["user_id"], [e]))
    # (user_id, event)
    flow.reduce_epoch(user_reducer)
    # (user_id, events_for_user)
    flow.map(lambda u_es: u_es[1])
    # events_for_user


flow = Dataflow()
collect_user_events(flow)
flow.capture()


inp = [
    (0, {"user_id": "1", "type": "login"}),
    (0, {"user_id": "1", "type": "logout"}),
]
print(run(flow, inp))
[(0, [{'user_id': '1', 'type': 'login'}, {'user_id': '1', 'type': 'logout'}])]