bytewax.duckdb.operators
Operators for the DuckDB sink.
This module provides operators for writing data to DuckDB or MotherDuck using the Bytewax DuckDB sink.
Usage:
```python
import random
from typing import Dict, Tuple, Union

import bytewax.duckdb.operators as duck_op
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, run_main

# Example database file path (an assumed value; adjust as needed)
db_path = "names_cities.duckdb"

# Initialize the dataflow
flow = Dataflow("duckdb-names-cities")

# Define sample data for names and locations
names = ["Alice", "Bob", "Charlie", "Diana", "Eve"]
locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]


def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
    """Build a (key, record) pair with a random name, age, and location."""
    name = random.choice(names)
    age = random.randint(20, 60)  # Random age between 20 and 60, inclusive
    location = random.choice(locations)
    # The key must be a str; the dict holds the column values to write
    return (str(value), {"id": value, "name": name, "age": age, "location": location})


# Generate input data
inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)

# Output the data to DuckDB, creating the table if it does not exist
duck_op.output(
    "out",
    dict_stream,
    db_path,
    "names_cities",
    "CREATE TABLE IF NOT EXISTS names_cities (id INTEGER, name TEXT, age INTEGER, location TEXT)",
)

# Run the dataflow
run_main(flow)
```
We can verify it was written to the database by querying the table:
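```python
import duckdb

# Reopen the database file the dataflow wrote to; fetchdf() needs pandas
con = duckdb.connect(db_path)
print(con.execute("SELECT * FROM names_cities").fetchdf())
con.close()
```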
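Because the sink expects every item to be a (key, value) pair with a str key and a dict value, it can be convenient to check records in a separate step before they reach the sink. The sketch below assumes a hypothetical `check_record` helper and an `EXPECTED_COLUMNS` set copied from the CREATE TABLE statement in the example; neither is part of bytewax or the DuckDB sink.

```python
from typing import Any, Dict, Tuple

# Hypothetical: column names assumed to mirror the example's CREATE TABLE statement
EXPECTED_COLUMNS = {"id", "name", "age", "location"}


def check_record(item: Tuple[Any, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return item unchanged, or raise ValueError if it has the wrong shape."""
    if not (isinstance(item, tuple) and len(item) == 2):
        raise ValueError(f"expected a (key, value) tuple, got {item!r}")
    key, value = item
    if not isinstance(key, str):
        raise ValueError(f"key must be a str, got {type(key).__name__}")
    if not isinstance(value, dict):
        raise ValueError(f"value must be a dict, got {type(value).__name__}")
    if set(value) != EXPECTED_COLUMNS:
        raise ValueError(f"unexpected columns: {sorted(value)}")
    return key, value
```

In the dataflow, this could run as `checked = op.map("check", dict_stream, check_record)`, with `checked` passed to `duck_op.output` instead of `dict_stream`, so a malformed record is reported at a named step.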
Functions
output(
    step_id: str,
    up: KeyedStream[V],
    db_path: str,
    table_name: str,
    create_table_sql: Optional[str],
    timeout: timedelta = timedelta(seconds=1),
    batch_size: int = 122880,
)
Write a keyed stream of records to a DuckDB (or MotherDuck) table as an output sink.
- Parameters:
step_id – Unique ID.
up – Stream of records. The key must be a str and the value must be a Python dict that can be converted into a PyArrow table.
db_path – Path to the DuckDB (or MotherDuck) database to write to.
table_name – Name of the table to write records into.
create_table_sql – Optional SQL statement that creates the DuckDB table if it does not already exist.
timeout – A timedelta giving how long to wait for new data before writing. Defaults to 1 second.
batch_size – The number of items to wait for before writing. Defaults to 122_880, which matches DuckDB's default row group size of 122,880 rows.
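The timeout and batch_size parameters together bound how long records wait before a write. A common way for two such parameters to interact is size-or-timeout batching: flush when the buffer reaches batch_size or when timeout has passed since the first buffered item, whichever comes first. The sketch below is a simplified, stdlib-only model of that policy, assuming the sink batches this way; `SizeOrTimeoutBatcher` and its injectable clock are hypothetical and not the sink's actual implementation.

```python
import time
from datetime import timedelta
from typing import Any, Callable, List, Optional


class SizeOrTimeoutBatcher:
    """Hypothetical model: buffer items, flush on size or on timeout."""

    def __init__(
        self,
        flush: Callable[[List[Any]], None],
        timeout: timedelta = timedelta(seconds=1),
        batch_size: int = 122_880,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush = flush
        self._timeout = timeout.total_seconds()
        self._batch_size = batch_size
        self._clock = clock
        self._buffer: List[Any] = []
        self._first_at: Optional[float] = None

    def add(self, item: Any) -> None:
        # Start the timeout window when the first item enters an empty buffer
        if not self._buffer:
            self._first_at = self._clock()
        self._buffer.append(item)
        # Size trigger: write as soon as batch_size items have accumulated
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def tick(self) -> None:
        # Timeout trigger: write if the oldest buffered item has waited long enough
        if self._buffer and self._clock() - self._first_at >= self._timeout:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._flush(self._buffer)
            self._buffer = []
            self._first_at = None


# Usage with a fake clock so the timeout can be stepped deterministically
batches: List[List[int]] = []
now = [0.0]
batcher = SizeOrTimeoutBatcher(
    batches.append, timeout=timedelta(seconds=1), batch_size=3, clock=lambda: now[0]
)
for i in range(4):
    batcher.add(i)
# batches == [[0, 1, 2]]  (size trigger); item 3 is still buffered
now[0] = 1.5
batcher.tick()
# batches == [[0, 1, 2], [3]]  (timeout trigger)
```

A large batch_size favors fewer, bigger writes, while timeout caps the latency of a partially filled batch when input is slow.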