bytewax.clickhouse#

ClickHouse Sink Implementation.

This module provides a dynamic sink for writing data to a ClickHouse database using Bytewax’s streaming data processing framework. The sink is capable of creating and managing a connection to ClickHouse, checking for the existence of the target table, and creating it if necessary using a ReplacingMergeTree engine.

Classes:

- ClickHouseSink: A dynamic sink that connects to a ClickHouse database, manages table creation, and writes data in batches.
- _ClickHousePartition: A stateless partition responsible for writing batches of data to the ClickHouse database.

Usage:

- The ClickHouseSink class is used to define a sink that can be connected to a Bytewax dataflow.
- The build method of ClickHouseSink creates a _ClickHousePartition that handles the actual data writing process.
- The sink supports creating a table with a specified schema if it does not exist, and verifies the existing table’s engine type for compatibility with ReplacingMergeTree.
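The engine compatibility check described above can be pictured with a short sketch. It assumes the engine name has already been read as a string, for example from the `engine` column of ClickHouse's `system.tables`. The helper name `is_replacing_engine` and the suffix-matching rule are illustrative assumptions, not the module's actual code.

```python
def is_replacing_engine(engine: str) -> bool:
    """Return True if an engine name belongs to the ReplacingMergeTree family.

    Hypothetical helper: suffix matching also accepts replicated variants
    such as ReplicatedReplacingMergeTree, which is a design assumption.
    """
    return engine.strip().endswith("ReplacingMergeTree")


# Report compatibility for a few engine names.
for name in ("ReplacingMergeTree", "ReplicatedReplacingMergeTree", "MergeTree"):
    status = "compatible" if is_replacing_engine(name) else "not compatible"
    print(f"{name}: {status}")
```

A plain `MergeTree` table is reported as not compatible here because it does not deduplicate rows that share a sorting key.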

Logging: The module uses Python’s logging library to log important events such as table existence checks, schema details, and warnings about potential performance issues.
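To see these log messages, logging can be configured before the dataflow starts. The sketch below assumes the module obtains its logger with `logging.getLogger(__name__)`, which would make the logger name `bytewax.clickhouse`; that name is an assumption and should be checked against the installed version.

```python
import logging

# Root configuration: only warnings and above from other libraries.
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Assumption: the sink's logger is named after the module, "bytewax.clickhouse".
ch_logger = logging.getLogger("bytewax.clickhouse")

# INFO should surface table existence checks and schema details,
# in addition to performance warnings.
ch_logger.setLevel(logging.INFO)
```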

Submodules#

Data#

K: TypeVar#

Type of key in a Kafka message.

V: TypeVar#

Type of value in a Kafka message.

logger#

Classes#

class ClickHouseSink(
table_name: str,
schema: str,
username: str,
password: str,
host: str = 'localhost',
port: int = 8123,
database: str = 'default',
order_by: str = '',
)#
Bases:

A dynamic sink for writing data to a ClickHouse database in a Bytewax dataflow.

The ClickHouseSink class provides functionality to connect to a ClickHouse database, check for the existence of a specified table, and create it if it doesn’t exist. The class ensures that the table uses the ReplacingMergeTree engine and writes data in batches using the PyArrow format.

Methods: build(step_id, worker_index, worker_count) -> _ClickHousePartition: Constructs a _ClickHousePartition instance that manages the actual data writing process.

Initialization

Initialize the ClickHouseSink.

Sets up the connection parameters for the ClickHouse database and verifies the existence of the target table. If the table does not exist, it will be created using the specified schema.

Args:

- table_name (str): Name of the table in ClickHouse.
- schema (str): Schema definition for the table, used if the table needs to be created.
- username (str): Username for authentication with the ClickHouse server.
- password (str): Password for authentication with the ClickHouse server.
- host (str, optional): Hostname of the ClickHouse server. Defaults to “localhost”.
- port (int, optional): Port number of the ClickHouse server. Defaults to 8123.
- database (str, optional): Name of the database in ClickHouse. Defaults to “default”.
- order_by (str, optional): Comma-separated list of columns to order by in the ReplacingMergeTree engine. Defaults to “”.

Raises: ValueError: If the schema is not provided and the table does not exist.
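As a rough illustration of how `schema` and `order_by` could combine when the table has to be created, the sketch below assembles a ClickHouse `CREATE TABLE` statement. The function name `build_create_table_sql` and the exact statement are assumptions made for illustration; the statement the sink actually issues may differ.

```python
def build_create_table_sql(
    table_name: str,
    schema: str,
    database: str = "default",
    order_by: str = "",
) -> str:
    """Assemble a CREATE TABLE statement for a ReplacingMergeTree table.

    Hypothetical helper, not part of bytewax.clickhouse.
    """
    if not schema:
        # Mirrors the documented behavior: no schema and no table is an error.
        raise ValueError("A schema is required to create a missing table.")
    # ClickHouse accepts ORDER BY tuple() when no sorting key is wanted.
    order_clause = f"({order_by})" if order_by else "tuple()"
    return (
        f"CREATE TABLE IF NOT EXISTS {database}.{table_name} "
        f"({schema}) "
        "ENGINE = ReplacingMergeTree() "
        f"ORDER BY {order_clause}"
    )


sql = build_create_table_sql(
    "metrics",
    "metric String, value Float64, ts DateTime",
    database="bytewax",
    order_by="metric, ts",
)
print(sql)
```

The sorting key matters for this engine because ReplacingMergeTree deduplicates rows that share the same `ORDER BY` values during background merges.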

build(
step_id: str,
worker_index: int,
worker_count: int,
) bytewax.clickhouse._ClickHousePartition#

Build a sink partition for writing to ClickHouse.

This method constructs an instance of _ClickHousePartition, which will handle the actual data writing to the ClickHouse table for the specified worker in a distributed Bytewax dataflow.

Args:

- step_id (str): The ID of the step in the Bytewax dataflow.
- worker_index (int): The index of the worker in the dataflow.
- worker_count (int): The total number of workers in the dataflow.

Returns: _ClickHousePartition: An instance of _ClickHousePartition that will manage the data writing for this worker.
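The relationship between a sink and its per-worker partitions can be sketched with stand-in classes. `FakeSink` and `FakePartition` below are hypothetical and only mimic the shape of the interface: `build` is called once per worker, and each returned partition receives batches through `write_batch`, following Bytewax's stateless sink partition interface. Nothing here talks to ClickHouse.

```python
from typing import Any, List


class FakePartition:
    """Stand-in for a stateless sink partition; collects batches in memory."""

    def __init__(self, worker_index: int) -> None:
        self.worker_index = worker_index
        self.written: List[List[Any]] = []

    def write_batch(self, items: List[Any]) -> None:
        # A real partition would send this batch to ClickHouse instead.
        self.written.append(list(items))

    def close(self) -> None:
        # Nothing to release in this in-memory stand-in.
        pass


class FakeSink:
    """Stand-in for a dynamic sink; builds one partition per worker."""

    def build(
        self, step_id: str, worker_index: int, worker_count: int
    ) -> FakePartition:
        return FakePartition(worker_index)


sink = FakeSink()
worker_count = 2
# The runtime would call build once on each worker; here it is simulated in a loop.
partitions = [sink.build("out", i, worker_count) for i in range(worker_count)]
partitions[0].write_batch([{"metric": "cpu", "value": 0.5}])
for partition in partitions:
    partition.close()
```

Because each partition is stateless, no partition needs to know what the others have written, which is what allows every worker to write independently.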
