bytewax.duckdb.operators

Operators for the DuckDB sink.

This module provides operators for writing data to DuckDB or MotherDuck using the Bytewax DuckDB sink.

Usage:

import bytewax.duckdb.operators as duck_op
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main, TestingSource
import random
from typing import Dict, Tuple, Union

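# Initialize the dataflow
flow = Dataflow("duckdb-names-cities")

# Path of the DuckDB database file to write to (example file name; change as needed)
db_path = "names_cities.duckdb"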

# Define sample data for names and locations
names = ["Alice", "Bob", "Charlie", "Diana", "Eve"]
locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]

# Build a (key, record) pair with randomized fields for each input integer
def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
    name = random.choice(names)
    age = random.randint(20, 60)  # Random age between 20 and 60
    location = random.choice(locations)
    # This tuple represents the key and the dictionary to be written
    return (str(value), {"id": value, "name": name, "age": age, "location": location})

# Generate input data
inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)

# Output the data to DuckDB, creating a table with multiple columns
duck_op.output(
    "out",
    dict_stream,
    db_path,
    "names_cities",
    "CREATE TABLE IF NOT EXISTS names_cities (id INTEGER, name TEXT, age INTEGER, location TEXT)",
)

# Run the dataflow
run_main(flow)

To verify that the rows were written, query the table:

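import duckdb

# Open the same database file and read the table back (fetchdf() requires pandas)
con = duckdb.connect(db_path)
print(con.execute("SELECT * FROM names_cities").fetchdf())
con.close()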
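
The sink expects each item to be a `(key, value)` pair with a string key and a dictionary value whose fields line up with the target table. As an illustrative sketch only, a check like the one below could run before the output step to catch shape errors early. `validate_record` and `EXPECTED_COLUMNS` are hypothetical names, not part of `bytewax.duckdb`, and the column types are assumed from the `names_cities` table in the usage example.

```python
from typing import Any, Dict, Tuple

# Hypothetical: columns of the names_cities table from the usage example,
# mapped to the Python type expected for each field.
EXPECTED_COLUMNS: Dict[str, type] = {
    "id": int,
    "name": str,
    "age": int,
    "location": str,
}


def validate_record(item: Tuple[Any, Any]) -> Tuple[str, Dict[str, Any]]:
    """Check that item is a (str key, dict value) pair matching EXPECTED_COLUMNS.

    Returns the item unchanged so it can be used as an op.map mapper;
    raises TypeError or ValueError on the first problem found.
    """
    key, value = item
    if not isinstance(key, str):
        raise TypeError(f"key must be str, got {type(key).__name__}")
    if not isinstance(value, dict):
        raise TypeError(f"value must be dict, got {type(value).__name__}")
    # Every table column must be present, and no extra fields are allowed.
    if set(value) != set(EXPECTED_COLUMNS):
        raise ValueError(f"columns {sorted(value)} != {sorted(EXPECTED_COLUMNS)}")
    # Each field must have the Python type assumed for its column.
    for column, expected_type in EXPECTED_COLUMNS.items():
        if not isinstance(value[column], expected_type):
            raise TypeError(
                f"{column!r} must be {expected_type.__name__}, "
                f"got {type(value[column]).__name__}"
            )
    return key, value


# A record shaped like the output of create_dict passes through unchanged.
good = ("7", {"id": 7, "name": "Bob", "age": 33, "location": "Chicago"})
assert validate_record(good) == good
```

In the dataflow above, the check could be added as an extra step, for example `checked = op.map("validate", dict_stream, validate_record)`, with `checked` passed to `duck_op.output` in place of `dict_stream`.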

Functions

output(
    step_id: str,
    up: KeyedStream[V],
    db_path: str,
    table_name: str,
    create_table_sql: Optional[str],
    timeout: timedelta = timedelta(seconds=1),
    batch_size: int = 122880,
) -> None

Write a keyed stream to a DuckDB table as an output sink.

Parameters:
  • step_id – Unique ID for this step.

  • up – Stream of records. The key must be a string, and the value must be a Python dictionary that can be converted into a PyArrow table.

  • db_path – Path to the DuckDB database file (or the MotherDuck connection string) to write to.

  • table_name – Name of the table the records are written to.

  • create_table_sql – Optional SQL statement that creates the table if it does not already exist.

  • timeout – Maximum time, as a timedelta, to wait for new data before writing the current batch. Defaults to 1 second.

  • batch_size – Maximum number of items to collect before writing a batch. Defaults to 122_880, which matches DuckDB's default row group size.
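
The two batching parameters trade write latency against write size. The following is a simplified, stdlib-only model, not the sink's actual implementation: it assumes a batch is written as soon as it holds `batch_size` items, or once `timeout` has elapsed since the batch's first item arrived, whichever comes first. `simulate_batches` is a hypothetical helper that works on arrival timestamps instead of real time.

```python
from datetime import timedelta
from typing import List, Tuple


def simulate_batches(
    arrivals: List[float],
    batch_size: int = 122_880,
    timeout: timedelta = timedelta(seconds=1),
) -> List[Tuple[float, int]]:
    """Return (write_time, item_count) for each simulated write.

    Assumed model: a batch is written when it reaches batch_size items,
    or when `timeout` has elapsed since its first item arrived, whichever
    comes first. `arrivals` are item arrival times in seconds, ascending.
    """
    limit = timeout.total_seconds()
    writes: List[Tuple[float, int]] = []
    count = 0
    started = 0.0
    for t in arrivals:
        # A pending batch whose timeout expired before this item arrived
        # is written at its deadline.
        if count and t >= started + limit:
            writes.append((started + limit, count))
            count = 0
        # The first item of a new batch starts its timer.
        if count == 0:
            started = t
        count += 1
        # A full batch is written immediately.
        if count == batch_size:
            writes.append((t, count))
            count = 0
    # Any leftover items are written when their timeout expires.
    if count:
        writes.append((started + limit, count))
    return writes


# Two quick items fill a batch of 2, one item waits out the timeout,
# then two more items fill another batch.
print(simulate_batches([0.0, 0.25, 0.5, 2.0, 2.5], batch_size=2))
# → [(0.25, 2), (1.5, 1), (2.5, 2)]
```

Under this model, a low-volume stream rarely accumulates 122_880 items, so its writes are driven by `timeout`, while a high-volume stream fills batches before the timeout expires.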
