bytewax.clickhouse.operators
Operators for the ClickHouse sink.
It’s suggested to import operators like this:
from bytewax.clickhouse import operators as chop
And then you can use the operators like this:
from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop

flow = Dataflow("clickhouse-out")
kinp = kop.input("kafka_inp", flow, brokers=[...], topics=[...])
chop.output(
    "ch-out",
    kinp,
    # plus the required schema and connection arguments documented below
)
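For a complete call, the required schema and connection arguments must also be supplied. Below is a minimal, self-contained sketch that swaps Kafka for a testing source; the column names, table name, and credentials are hypothetical placeholders, and it assumes dict rows that pyarrow can convert into a table:

import pyarrow as pa

from bytewax import operators as op
from bytewax.clickhouse import operators as chop
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

# Arrow schema for the value payload (hypothetical columns).
PA_SCHEMA = pa.schema([("key", pa.string()), ("value", pa.int64())])

# Matching ClickHouse column definitions in the ch_schema string format.
CH_SCHEMA = "key String,\n value UInt64"

flow = Dataflow("clickhouse-out")
inp = op.input("inp", flow, TestingSource([{"key": "a", "value": 1}]))
# output requires a KeyedStream, so key each record by a string.
keyed = op.key_on("key_it", inp, lambda row: row["key"])
chop.output(
    "ch-out",
    keyed,
    pa_schema=PA_SCHEMA,
    table_name="example_table",  # hypothetical table
    ch_schema=CH_SCHEMA,
    username="default",  # placeholder credentials
    password="password",
    order_by="key",
)

The dataflow can then be executed with Bytewax's runner, e.g. python -m bytewax.run my_module:flow (module path hypothetical).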
Functions
output(
    step_id: str,
    up: KeyedStream[V],
    pa_schema: Schema,
    table_name: str,
    ch_schema: str,
    username: str,
    password: str,
    host: str = 'localhost',
    port: int = 8123,
    database: str = 'default',
    order_by: str = '',
    timeout: timedelta = timedelta(seconds=1),
    max_size: int = 50,
)
Produce to ClickHouse as an output sink.
Uses the Arrow format; values must be Arrow-serializable.
Default partition routing is used.
Workers are the unit of parallelism.
Can support at-least-once processing, depending on the MergeTree engine used for downstream queries.
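For example, if the sink feeds a ReplacingMergeTree table, duplicate rows produced by at-least-once delivery can be collapsed at read time with FINAL. A minimal sketch, assuming the clickhouse-connect client (not a dependency of this sink) and the hypothetical example_table from above:

import clickhouse_connect  # assumed client library, not required by this sink

client = clickhouse_connect.get_client(
    host="localhost", port=8123, username="default", password="password"
)
# FINAL collapses rows that share the same ORDER BY key in a
# ReplacingMergeTree, deduplicating at-least-once writes at query time.
result = client.query("SELECT * FROM example_table FINAL")
print(result.result_rows)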
- Parameters:
step_id – Unique ID.
up – Stream of records. Key must be a string and the value must be serializable into an Arrow table.
pa_schema – Arrow schema.
table_name – Table name for the writes.
ch_schema – schema string of the format: column1 UInt32,\n column2 String,\n column3 Date (see the sketch after this parameter list).
username – database username; the user must have the correct permissions.
password – database password.
host – host name, defaults to “localhost”.
port – port number, defaults to 8123.
database – optional database name. If omitted this will use the default database.
order_by – ORDER BY string that determines the sort order of the table for deduplication. Should be of the format: column1, column2
timeout – a timedelta of the amount of time to wait for new data before writing. Defaults to 1 second.
max_size – the maximum number of items to accumulate before writing. Defaults to 50.
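Putting the two string formats together, a ch_schema and a matching order_by for a deduplicated table might look like this (using the hypothetical columns from the format examples above):

# Column definitions, comma-and-newline separated as described above.
ch_schema = "column1 UInt32,\n column2 String,\n column3 Date"

# Comma-separated sort key that controls table ordering and deduplication.
order_by = "column1, column2"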