bytewax.connectors.kafka
Connectors for Kafka.
Importing this module requires the confluent-kafka package to be installed.
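You can install it together with Bytewax, for example with pip install 'bytewax[kafka]' (assuming your Bytewax version ships the kafka extra), or directly with pip install confluent-kafka.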
The input source returns a stream of KafkaSourceMessage objects; see the class docstring for details on use.
You can use KafkaSource and KafkaSink directly:
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("example")
# Consume messages from the "in-topic" topic.
kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
# Re-wrap each message for output, keeping its key and value.
processed = op.map("map", kinp, lambda x: KafkaSinkMessage(x.key, x.value))
# Produce the results to the "out-topic" topic.
op.output("kafka-out", processed, KafkaSink(brokers, "out-topic"))
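If you save this snippet as example.py (the file name and flow variable here are just for illustration), you can run it with python -m bytewax.run example:flow.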
Or you can use the custom Kafka operators:
from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("example")
# kop.input returns two streams: .oks with messages and .errs with errors.
kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
# Log any consumer errors, then crash the dataflow on them.
errs = op.inspect("errors", kinp.errs).then(op.raises, "crash-on-err")
processed = op.map("map", kinp.oks, lambda x: KafkaSinkMessage(x.key, x.value))
kop.output("kafka-out", processed, brokers=brokers, topic="out-topic")
Data
- BYTEWAX_CONSUMER_LAG_GAUGE
Classes
- class KafkaSourceMessage
  Message read from Kafka.
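As a sketch of working with these messages (the topic and offset metadata fields used below are assumed here; check the class docstring for the exact field names):

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSource
from bytewax.dataflow import Dataflow

flow = Dataflow("msg-fields")
kinp = op.input("kafka-in", flow, KafkaSource(["localhost:19092"], ["in-topic"]))

def summarize(msg):
    # topic and offset are assumed metadata fields on KafkaSourceMessage.
    return f"{msg.topic}@{msg.offset}: key={msg.key!r}"

op.inspect("print", op.map("summarize", kinp, summarize))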
- class KafkaError
  Error from a KafkaSource.
  - err: KafkaError
    Underlying error from the consumer.
  - msg: KafkaSourceMessage[K, V]
    Message attached to that error.
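A minimal sketch of tolerating such errors instead of crashing, assuming raise_on_errors=False (described below) makes KafkaSource emit KafkaError items alongside normal messages:

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaError, KafkaSource
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("tolerate-errors")
source = KafkaSource(brokers, ["in-topic"], raise_on_errors=False)
kinp = op.input("kafka-in", flow, source)
# Route KafkaError items one way and healthy messages the other.
branched = op.branch("is-err", kinp, lambda x: isinstance(x, KafkaError))
op.inspect("log-errors", branched.trues)
oks = branched.falses  # continue processing the good messages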
- class KafkaSource(
    brokers: Iterable[str],
    topics: Iterable[str],
    tail: bool = True,
    starting_offset: int = OFFSET_BEGINNING,
    add_config: Optional[Dict[str, str]] = None,
    batch_size: int = 1000,
    raise_on_errors: bool = True,
  )
- Bases:
Use a set of Kafka topics as an input source.
Partitions are the unit of parallelism. Can support exactly-once processing.
Messages are emitted into the dataflow as KafkaSourceMessage objects with both keys and values as optional bytes.
Initialization
- Parameters:
  - brokers – List of host:port strings of Kafka brokers.
  - topics – List of topics to consume from.
  - tail – Whether to wait for new data on this topic when the end is initially reached.
  - starting_offset – Can be either confluent_kafka.OFFSET_BEGINNING or confluent_kafka.OFFSET_END. Defaults to the beginning of the topic.
  - add_config – Any additional configuration properties; see the rdkafka documentation for options and the sketch after this list for an example.
  - batch_size – Maximum number of messages to consume at each poll. The default of 1000 is a suitable starting point for higher-throughput dataflows; tune it lower to potentially decrease individual message processing latency.
  - raise_on_errors – If set to False, errors won't stop the dataflow and will instead be emitted into the dataflow.
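For example, a hypothetical add_config passing standard rdkafka consumer properties (the values are placeholders):

from bytewax.connectors.kafka import KafkaSource

# Placeholder rdkafka consumer properties; swap in your own values.
add_config = {
    "group.id": "my-consumer-group",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "user",
    "sasl.password": "password",
}
source = KafkaSource(["localhost:19092"], ["in-topic"], add_config=add_config)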
- class KafkaSinkMessage
  Message to be written to Kafka.
- class KafkaSink(brokers: Iterable[str], topic: Optional[str], add_config: Optional[Dict[str, str]] = None)
- Bases:
Use a single Kafka topic as an output sink.
Items consumed from the dataflow must be KafkaSinkMessage objects with both keys and values as optional bytes.
Workers are the unit of parallelism.
Can support at-least-once processing. Messages from the resume epoch will be duplicated right after resume.
Initialization
- Parameters:
  - brokers – List of host:port strings of Kafka brokers.
  - topic – Topic to produce to. If it's None, the topic to produce to will be read from each KafkaSinkMessage; see the routing sketch after this list.
  - add_config – Any additional configuration properties; see the rdkafka documentation for options.
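A minimal sketch of that per-message routing, assuming KafkaSinkMessage accepts a topic argument as the topic parameter above implies (the topic names are placeholders):

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSink, KafkaSinkMessage, KafkaSource
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("routing")
kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))

def route(msg):
    # Pick a destination topic per message; the names are placeholders.
    topic = "big-events" if len(msg.value or b"") > 1024 else "small-events"
    return KafkaSinkMessage(msg.key, msg.value, topic=topic)

routed = op.map("route", kinp, route)
# topic=None: each KafkaSinkMessage's own topic decides where it is produced.
op.output("kafka-out", routed, KafkaSink(brokers, None))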