Custom IO Connectors

More documentation here is forthcoming.

For now, see the abstract base classes you need to implement: bytewax.inputs.FixedPartitionedSource and bytewax.inputs.DynamicSource for inputs, and bytewax.outputs.FixedPartitionedSink and bytewax.outputs.DynamicSink for outputs.

Also check out the built-in connectors in bytewax.connectors and read their source to see how they are implemented.

Async Client Libraries

The lowest-level hooks that Bytewax’s source and sink APIs provide are currently all synchronous. For example, bytewax.inputs.StatefulSourcePartition.next_batch is declared as def next_batch, not async def next_batch. This means that you cannot directly await async functions provided by a client library from inside these hooks.
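
As a minimal illustration of the problem, the sketch below uses only the standard library. The recv function and next_batch_naive are hypothetical stand-ins, not part of Bytewax or any real client library.

```python
import asyncio
import inspect


async def recv() -> bytes:
    """Hypothetical async client call."""
    return b"hello"


def next_batch_naive() -> list:
    # Calling an async function from sync code does not run it;
    # it only creates a coroutine object.
    return [recv()]


batch = next_batch_naive()
got_coroutine = inspect.iscoroutine(batch[0])
batch[0].close()  # discard it cleanly to avoid a "never awaited" warning

# Something has to drive the coroutine on an event loop to get the value.
value = asyncio.run(recv())
```

The naive version hands a coroutine object downstream instead of bytes, which is why the patterns in the rest of this section are needed.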

Here we’ll provide some common patterns for bridging the gap between async client libraries and Bytewax’s source and sink APIs. It’s possible the client library you pick will use an API design that is not compatible with these patterns, but the ideas within can be reused to fit with other designs.

Async Sources

If your source’s client library only provides async functions, we recommend creating a helper async generator that wraps up all async operations, and using bytewax.inputs.batch_async.

In the following example, we’ll use a hypothetical WebSockets library that gives us the following WSClient class with async methods:

from typing_extensions import Self


class WSClient:
    @classmethod
    async def connect(cls, to_url: str) -> Self:
        """Open a new connection."""
        ...

    async def recv(self) -> bytes:
        """Get the next message, waiting until one is available."""
        ...

Given this API, first let’s encapsulate all of the async calls in a single asynchronous generator, so that there is only one async entry point that we need to shim.

from typing import AsyncIterator


async def _ws_helper(to_url: str) -> AsyncIterator[bytes]:
    client = await WSClient.connect(to_url)

    while True:
        msg = await client.recv()
        yield msg

This sets up a connection, then polls it for an unending stream of new data.

We now need a way to make this async generator play well with the source API, which relies on cooperative multitasking: each worker cycles through all of the operators and polls each one for work to be done. If a badly behaved operator blocks forever, the rest of the operators in the dataflow never execute! Thus next_batch, for example, must never block for long periods of time. Sources are especially easy to write as badly behaved logic that never yields control, because they aren’t responding to upstream items: they generate items from an external system, so you need to introduce some manual way of saying “that’s enough work, take a break”.
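
The polling model can be pictured with a toy loop like the one below. This is only a mental model under simplifying assumptions, not how the Bytewax runtime is implemented; run_worker, polite_source and downstream are made-up names.

```python
from typing import Callable, Dict, List


def run_worker(steps: Dict[str, Callable[[], List[str]]], rounds: int) -> List[str]:
    """Toy model of one worker: visit every step in turn, once per round."""
    log: List[str] = []
    for _ in range(rounds):
        for name, poll in steps.items():
            # Control only reaches the next step after `poll` returns.
            log.extend(f"{name}:{item}" for item in poll())
    return log


ticks = iter(range(100))


def polite_source() -> List[str]:
    # Hand back a small batch and return right away.
    return [str(next(ticks))]


def downstream() -> List[str]:
    return ["ran"]


log = run_worker({"source": polite_source, "downstream": downstream}, rounds=2)
```

Because polite_source returns quickly, downstream gets a turn in every round. If polite_source instead looped forever waiting on a socket, the inner for loop would never advance past it.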

bytewax.inputs.batch_async is a helper function which turns an async iterator into a normal non-async iterator of batches. It gives you a way to run an async iterator for a short period of time, collect the items yielded, then pause the iterator so you can come back to it later. This lets you play well with the cooperative multitasking requirement of sources.

To use it, you pick two parameters:

  • timeout - How long to block and poll for new items before pausing.

  • batch_size - The maximum number of items to accumulate before pausing.
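
To make the effect of these two parameters concrete, here is a simplified stand-in for such a helper, written with only the standard library. It is not Bytewax’s implementation; the names batch_async_sketch and countdown are invented for illustration, and edge cases such as cleaning up an in-flight task on early shutdown are left out.

```python
import asyncio
from datetime import timedelta
from typing import AsyncIterator, Iterator, List, Optional, TypeVar

X = TypeVar("X")


def batch_async_sketch(
    aiter: AsyncIterator[X], timeout: timedelta, batch_size: int
) -> Iterator[List[X]]:
    """Illustrative only: turn an async iterator into a sync iterator of batches."""
    loop = asyncio.new_event_loop()
    pending: Optional[asyncio.Task] = None  # in-flight fetch, kept across polls
    exhausted = False

    async def next_item() -> X:
        return await aiter.__anext__()

    async def gather_batch() -> List[X]:
        nonlocal pending, exhausted
        batch: List[X] = []
        deadline = loop.time() + timeout.total_seconds()
        while len(batch) < batch_size:
            if pending is None:
                pending = loop.create_task(next_item())
            remaining = max(deadline - loop.time(), 0)
            done, _ = await asyncio.wait({pending}, timeout=remaining)
            if not done:
                break  # timed out; the fetch stays pending for the next poll
            finished, pending = pending, None
            try:
                batch.append(finished.result())
            except StopAsyncIteration:
                exhausted = True
                break
        return batch

    try:
        while not exhausted:
            batch = loop.run_until_complete(gather_batch())
            if batch or not exhausted:
                yield batch  # may be empty if the timeout expired first
    finally:
        loop.close()


async def countdown() -> AsyncIterator[int]:
    for n in range(5):
        await asyncio.sleep(0)
        yield n


batches = list(batch_async_sketch(countdown(), timedelta(seconds=1), 2))
```

Each call to next on the returned iterator runs the event loop just long enough to fill one batch or hit the deadline, then hands control back to the caller. A fetch that has not finished is not cancelled; it resumes on the next poll, so no item is lost when the timeout expires.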

Now that we have our tools, we’ll build an input source that represents the WebSocket stream as a single partition. First we call our _ws_helper to instantiate the async generator that will yield the incoming messages. Then we wrap that in bytewax.inputs.batch_async to get a normal sync iterator of batches. Here we specify that whenever we poll self._batcher for more items, it will wait up to 500 milliseconds or gather up to 100 items before returning that batch, whichever comes first. The built-in next advances that sync iterator.

from datetime import timedelta
from typing import List

from bytewax.inputs import batch_async, StatefulSourcePartition


class EgWebSocketSourcePartition(StatefulSourcePartition[bytes, None]):
    def __init__(self):
        agen = _ws_helper("http://test.invalid/socket")
        self._batcher = batch_async(agen, timedelta(seconds=0.5), 100)

    def next_batch(self) -> List[bytes]:
        return next(self._batcher)

    def snapshot(self) -> None:
        return None

Finally, we wrap that up in the boilerplate needed to make a single-partition source.

from bytewax.inputs import FixedPartitionedSource


class EgWebSocketSource(FixedPartitionedSource[bytes, None]):
    def list_parts(self) -> List[str]:
        return ["singleton"]

    def build_part(
        self, step_id: str, for_key: str, _resume_state: None
    ) -> EgWebSocketSourcePartition:
        return EgWebSocketSourcePartition()

This EgWebSocketSource can then be used with bytewax.operators.input as usual.

Async Sinks

Writing sinks that use an async client library requires a slightly different pattern, because a sink must successfully write out all provided items before bytewax.outputs.StatefulSinkPartition.write_batch returns; it can’t delay writing items and still maintain delivery guarantees. Thankfully, the Python standard library provides the asyncio.run function, which blocks and runs a coroutine to completion.

This fits well when you are given a client library with the following shape:

class WSClient:
    async def send(self, msg: bytes) -> None:
        """Send the following message."""
        ...

You could then use asyncio.run to call the async def send method synchronously for each item.

import asyncio
from typing import List

from bytewax.outputs import StatefulSinkPartition


class EgWebSocketSinkPartition(StatefulSinkPartition[bytes, None]):
    def __init__(self):
        self._client = WSClient()

    def write_batch(self, batch: List[bytes]) -> None:
        for item in batch:
            asyncio.run(self._client.send(item))

    def snapshot(self) -> None:
        return None
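
Note that asyncio.run creates a fresh event loop on every call and closes it afterwards. If a client keeps a connection that is tied to one event loop, a variation that may fit better is to hold a single loop for the lifetime of the partition and send each batch in one run_until_complete call. The sketch below shows that idea with the standard library only; FakeWSClient and LoopReusingSinkPartition are hypothetical names, and in a real connector the partition class would subclass StatefulSinkPartition and also provide snapshot.

```python
import asyncio
from typing import List


class FakeWSClient:
    """Hypothetical stand-in for an async client; records what was sent."""

    def __init__(self) -> None:
        self.sent: List[bytes] = []

    async def send(self, msg: bytes) -> None:
        await asyncio.sleep(0)  # pretend to do network IO
        self.sent.append(msg)


class LoopReusingSinkPartition:
    """Illustrative only: same write_batch shape, one event loop reused."""

    def __init__(self, client: FakeWSClient) -> None:
        self._client = client
        self._loop = asyncio.new_event_loop()

    async def _send_all(self, batch: List[bytes]) -> None:
        # Send in order so items are written as they were received.
        for item in batch:
            await self._client.send(item)

    def write_batch(self, batch: List[bytes]) -> None:
        # Blocks until every item in the batch has been sent.
        self._loop.run_until_complete(self._send_all(batch))

    def close(self) -> None:
        self._loop.close()


client = FakeWSClient()
part = LoopReusingSinkPartition(client)
part.write_batch([b"a", b"b"])
part.write_batch([b"c"])
part.close()
```

As with the asyncio.run version, write_batch does not return until all items have been sent, so the delivery guarantee described above is preserved.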