bytewax.clickhouse.operators#

Operators for the ClickHouse sink.

It’s suggested to import operators like this:

from bytewax.clickhouse import operators as chop

And then you can use the operators like this:

from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop

flow = Dataflow("clickhouse-out")
input = kop.input("kafka_inp", flow, brokers=[...], topics=[...])
chop.output(
    "ch-out",
    input,
    pa_schema=...,
    table_name=...,
    ch_schema=...,
    username=...,
    password=...,
)

Data#

KeyedStream: TypeAlias#

A Stream of (key, value) 2-tuples.
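As a minimal sketch (field names here are hypothetical), the records flowing through a KeyedStream into this sink are plain 2-tuples with a string key:

```python
# Hypothetical records for a KeyedStream: (key, value) 2-tuples where the
# key is a string and the value is a dict serializable into an Arrow table.
records = [
    ("user_1", {"user_id": 1, "event": "click"}),
    ("user_2", {"user_id": 2, "event": "view"}),
]

# Every element is a 2-tuple, and every key is a string.
assert all(len(r) == 2 for r in records)
assert all(isinstance(k, str) for k, _v in records)
```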

Functions#

output(
step_id: str,
up: KeyedStream[V],
pa_schema: Schema,
table_name: str,
ch_schema: str,
username: str,
password: str,
host: str = 'localhost',
port: int = 8123,
database: str = 'default',
order_by: str = '',
timeout: timedelta = timedelta(seconds=1),
max_size: int = 50,
) → None#

Produce to ClickHouse as an output sink.

Data is written in Arrow format; values must be Arrow-serializable.

Default partition routing is used.

Workers are the unit of parallelism.

At-least-once processing can be supported, depending on the MergeTree engine used for downstream queries.

Parameters:
  • step_id – Unique ID.

  • up – Stream of records. Key must be a String and value must be serializable into an arrow table.

  • pa_schema – Arrow schema.

  • table_name – Table name for the writes.

  • ch_schema – schema string of the format: column1 UInt32,\n column2 String,\n column3 Date

  • username – database username, user must have correct permissions.

  • password – database password.

  • host – host name, defaults to “localhost”.

  • port – port number, defaults to 8123.

  • database – optional database name. If omitted this will use the default database.

  • order_by – order by string that determines the sort of the table for deduplication. Should be of format: column1, column2

  • timeout – a timedelta of the amount of time to wait for new data before writing. Defaults to 1 second.

  • max_size – the maximum number of items to batch before writing. Defaults to 50.
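As a sketch of how the ch_schema and order_by strings relate, here is one way to build both from a shared list of column definitions (the table and column names are hypothetical, not from this API):

```python
# Hypothetical column definitions shared by the Arrow and ClickHouse schemas.
columns = [
    ("metric", "String"),
    ("value", "UInt32"),
    ("ts", "DateTime"),
]

# ch_schema is a "name Type" list, comma- and newline-separated,
# matching the documented format: column1 UInt32,\n column2 String, ...
ch_schema = ",\n ".join(f"{name} {ch_type}" for name, ch_type in columns)

# order_by lists the sort-key columns used for deduplication,
# in the documented "column1, column2" format.
order_by = ", ".join(name for name, _ch_type in columns[:2])

print(ch_schema)
# metric String,
#  value UInt32,
#  ts DateTime
print(order_by)
# metric, value
```

These strings could then be passed as the ch_schema and order_by arguments of output, alongside a matching pyarrow Schema for pa_schema built from the same column list.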
