bytewax.connectors.kafka
Connectors for Kafka.
Importing this module requires the `confluent-kafka` package to be installed.
The input source returns a stream of `KafkaSourceMessage` objects. See that class's documentation for how to use them.
You can use KafkaSource and KafkaSink directly:
```python
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("example")
# Consume from "in-topic"; each item is a KafkaSourceMessage.
kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
# Re-wrap each message's key and value so the sink can produce it.
processed = op.map("map", kinp, lambda x: KafkaSinkMessage(x.key, x.value))
op.output("kafka-out", processed, KafkaSink(brokers, "out-topic"))
```
Or use the custom operators in `bytewax.connectors.kafka.operators`:
```python
from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("example")
# kop.input gives two streams: .oks for messages and .errs for consume errors.
kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
# Inspect each error, then crash the dataflow on it.
errs = op.inspect("errors", kinp.errs).then(op.raises, "crash-on-err")
processed = op.map("map", kinp.oks, lambda x: KafkaSinkMessage(x.key, x.value))
kop.output("kafka-out", processed, brokers=brokers, topic="out-topic")
```
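The custom-operator example separates good messages (`oks`) from consume errors (`errs`) so each can be handled on its own path. The following is a minimal, stdlib-only model of that split, not bytewax code: `FakeMessage`, `FakeError`, and `split_oks_errs` are hypothetical stand-ins that only illustrate the routing idea.

```python
from dataclasses import dataclass
from typing import List, Tuple, Union


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a successfully consumed message."""
    key: bytes
    value: bytes


@dataclass
class FakeError:
    """Hypothetical stand-in for a consume error."""
    reason: str


Item = Union[FakeMessage, FakeError]


def split_oks_errs(items: List[Item]) -> Tuple[List[FakeMessage], List[FakeError]]:
    """Route each item to the 'oks' or 'errs' list, preserving order."""
    oks: List[FakeMessage] = []
    errs: List[FakeError] = []
    for item in items:
        if isinstance(item, FakeError):
            errs.append(item)
        else:
            oks.append(item)
    return oks, errs


items = [FakeMessage(b"k1", b"v1"), FakeError("broker down"), FakeMessage(b"k2", b"v2")]
oks, errs = split_oks_errs(items)
```

In the real dataflow the `errs` branch is where you decide whether an error is fatal (as with `op.raises` above) or merely logged.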
Classes
class KafkaSource(
    brokers: Iterable[str],
    topics: Iterable[str],
    tail: bool = True,
    starting_offset: int = OFFSET_BEGINNING,
    add_config: Optional[Dict[str, str]] = None,
    batch_size: int = 1000,
    raise_on_errors: bool = True,
)
Bases: FixedPartitionedSource[_KafkaItem, Optional[int]]
Use a set of Kafka topics as an input source.
Partitions are the unit of parallelism. Can support exactly-once processing.
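To make the partition-level behavior concrete, here is a stdlib-only model of reading partitions independently and resuming each from a snapshotted offset. It assumes the per-partition state is the next offset to read, which the `Optional[int]` state type in the base class suggests but this page does not spell out; `TOPIC` and `read_partition` are hypothetical. It only models the source side; end-to-end exactly-once also depends on the sink.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical in-memory topic with two partitions: partition -> messages.
TOPIC: Dict[int, List[str]] = {
    0: ["a0", "a1", "a2"],
    1: ["b0", "b1"],
}


def read_partition(part: int, resume: Optional[int], limit: int) -> Tuple[List[str], int]:
    """Read up to `limit` messages from one partition.

    `resume` models the per-partition snapshot state (assumed to be an
    offset): None means "no snapshot yet", so reading starts at offset 0.
    Returns the messages read and the next offset to snapshot.
    """
    start = 0 if resume is None else resume
    msgs = TOPIC[part][start:start + limit]
    return msgs, start + len(msgs)


snapshot: Dict[int, Optional[int]] = {part: None for part in TOPIC}
seen: List[str] = []

# First run: each partition is read independently, then its next
# offset is snapshotted before a simulated crash.
for part in TOPIC:
    msgs, snapshot[part] = read_partition(part, snapshot[part], limit=2)
    seen.extend(msgs)

# Resume: every partition continues from its snapshotted offset,
# so no message is read twice and none is skipped.
for part in TOPIC:
    msgs, snapshot[part] = read_partition(part, snapshot[part], limit=10)
    seen.extend(msgs)
```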
Messages are emitted into the dataflow as `bytewax.connectors.kafka.KafkaSourceMessage` objects.
Initialization
Args:
- brokers: List of `host:port` strings of Kafka brokers.
- topics: List of topics to consume from.
- tail: Whether to wait for new data on these topics when the end is initially reached.
- starting_offset: Either `confluent_kafka.OFFSET_BEGINNING` or `confluent_kafka.OFFSET_END`. Defaults to the beginning of the topic.
- add_config: Any additional configuration properties. See [the `rdkafka` documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) for options.
- batch_size: The maximum number of messages to consume per poll. Defaults to 1000, which is a suitable starting point for higher-throughput dataflows; lowering it can reduce the processing latency of individual messages.
- raise_on_errors: If `False`, errors won't stop the dataflow and the `KafkaMessage.error` field will be set instead. It's up to you to handle the error later.
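The interaction of `batch_size` and `tail` can be sketched with a single in-memory partition. This is a hypothetical model, not the connector's polling code: `drain` and its `late` parameter are illustrative names, and the real tailing source keeps waiting indefinitely, whereas this sketch stops once there is nothing left to simulate.

```python
from typing import List


def drain(log: List[str], batch_size: int, tail: bool, late: List[str]) -> List[List[str]]:
    """Collect the batches a hypothetical single-partition consumer emits.

    Each poll returns at most `batch_size` messages. `late` holds messages
    that arrive only after the consumer first reaches the end of `log`:
    with tail=True they are picked up; with tail=False the consumer has
    already stopped.
    """
    log = list(log)  # copy so the caller's list is not mutated
    pending = list(late)
    batches: List[List[str]] = []
    offset = 0
    while True:
        batch = log[offset:offset + batch_size]
        if batch:
            batches.append(batch)
            offset += len(batch)
            continue
        # Reached the current end of the log.
        if not tail or not pending:
            break
        log.extend(pending)  # new data arrives while a tailing consumer waits
        pending = []
    return batches


log = ["m0", "m1", "m2", "m3", "m4"]
no_tail = drain(log, batch_size=2, tail=False, late=["m5"])
with_tail = drain(log, batch_size=2, tail=True, late=["m5"])
```

A smaller `batch_size` yields more, smaller batches, which is the trade-off the parameter description refers to.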
class KafkaSink
Use a single Kafka topic as an output sink.
Items consumed from the dataflow must be `KafkaSinkMessage` objects whose key and value are each `str | bytes | None`.
Workers are the unit of parallelism.
Can support at-least-once processing: messages from the resume epoch will be duplicated right after resume.
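Why at-least-once produces duplicates can be shown with a small simulation. This is a hypothetical, stdlib-only model that assumes the sink writes each item immediately and a snapshot is taken at the end of each epoch; `EPOCHS` and `run` are illustrative names, not bytewax internals.

```python
from typing import Dict, List

# Hypothetical input split into epochs; a snapshot is taken at the end of each.
EPOCHS: Dict[int, List[str]] = {1: ["a", "b"], 2: ["c", "d"], 3: ["e"]}


def run(from_epoch: int, out: List[str], crash_after: int = -1) -> int:
    """Replay epochs >= from_epoch, writing each item to `out` immediately.

    If crash_after >= 0, stop once that many items were written in this run
    (a simulated crash). Returns the last epoch whose snapshot completed.
    """
    written = 0
    last_snapshot = from_epoch - 1
    for epoch in sorted(e for e in EPOCHS if e >= from_epoch):
        for item in EPOCHS[epoch]:
            if written == crash_after:
                return last_snapshot
            out.append(item)  # the sink writes before the epoch is snapshotted
            written += 1
        last_snapshot = epoch
    return last_snapshot


topic: List[str] = []  # stands in for the output topic
resume_from = run(1, topic, crash_after=3) + 1  # crash partway through epoch 2
run(resume_from, topic)  # resume from the start of epoch 2
```

Item `"c"` was written before the crash and again when epoch 2 was replayed, so downstream consumers should tolerate duplicates (for example by deduplicating on key).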
Initialization
Args:
- brokers: List of `host:port` strings of Kafka brokers.
- topic: Topic to produce to. If it's `None`, the topic to produce to is read from each `KafkaSinkMessage`'s `topic` field.
- add_config: Any additional configuration properties. See [the `rdkafka` documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) for options.
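The `topic` argument can be pictured as a routing rule. The sketch below is a hypothetical model: `SinkMsg` and `route` are illustrative names, and it assumes that a non-`None` sink topic applies to every message and that a message with no topic anywhere is an error. Neither assumption is confirmed by this page.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class SinkMsg:
    """Hypothetical stand-in with the fields relevant to routing."""
    key: Optional[bytes]
    value: Optional[bytes]
    topic: Optional[str] = None


def route(msgs: List[SinkMsg], sink_topic: Optional[str]) -> Dict[str, List[SinkMsg]]:
    """Group messages by the topic they would be produced to.

    Assumption: if `sink_topic` is set, every message goes there; if it
    is None, each message must carry its own topic.
    """
    out: Dict[str, List[SinkMsg]] = defaultdict(list)
    for m in msgs:
        topic = sink_topic if sink_topic is not None else m.topic
        if topic is None:
            raise ValueError("no topic: set it on the sink or on the message")
        out[topic].append(m)
    return dict(out)


msgs = [SinkMsg(b"k1", b"v1", "a"), SinkMsg(None, b"v2", "b"), SinkMsg(b"k3", b"v3", "a")]
per_message = route(msgs, None)
fixed = route(msgs, "out-topic")
```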
class KafkaSinkMessage
Class that holds a message to be written to Kafka, along with its metadata.
Use `msg.key` to get the key and `msg.value` to get the value. Other fields: `topic`, `headers`, `partition`, `timestamp`.
- key: bytewax.connectors.kafka._types.K_co
- value: bytewax.connectors.kafka._types.V_co
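Keys and values of a sink message must be `str | bytes | None`. As a hedged illustration of that constraint, here is a hypothetical helper, `to_wire`, that validates a key or value and encodes `str` as UTF-8. It is not part of bytewax, and whether the underlying producer encodes `str` exactly this way is an assumption.

```python
from typing import Optional, Union

KeyOrValue = Union[str, bytes, None]


def to_wire(x: KeyOrValue) -> Optional[bytes]:
    """Hypothetical helper: pass bytes/None through, encode str as UTF-8.

    Anything else violates the `str | bytes | None` constraint.
    """
    if x is None or isinstance(x, bytes):
        return x
    if isinstance(x, str):
        return x.encode("utf-8")
    raise TypeError(f"key/value must be str, bytes or None, got {type(x).__name__}")
```

Serializing structured data (for example with `json.dumps`) before building the message keeps values within the allowed types.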
class KafkaSourceMessage
Class that holds a message from Kafka with metadata.
Use `msg.key` to get the key and `msg.value` to get the value. Other fields: `topic`, `headers`, `latency`, `offset`, `partition`, `timestamp`.
- key: bytewax.connectors.kafka._types.K
- value: bytewax.connectors.kafka._types.V
- to_sink() -> KafkaSinkMessage[_types.K, _types.V]
Safely convert a `KafkaSourceMessage` to a `KafkaSinkMessage`. Only `key`, `value` and `timestamp` are used.
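What "only `key`, `value` and `timestamp` are used" means can be shown with stand-in dataclasses. `SourceMsg`, `SinkMsg`, and `to_sink` below are hypothetical mirrors of the fields listed above, not the real classes, and the `Optional[int]` timestamp type is an assumption made for simplicity.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class SourceMsg:
    """Hypothetical stand-in mirroring KafkaSourceMessage's listed fields."""
    key: Optional[bytes]
    value: Optional[bytes]
    topic: Optional[str] = None
    headers: List[Tuple[str, bytes]] = field(default_factory=list)
    latency: Optional[float] = None
    offset: Optional[int] = None
    partition: Optional[int] = None
    timestamp: Optional[int] = None  # assumed type, for illustration only


@dataclass
class SinkMsg:
    """Hypothetical stand-in mirroring KafkaSinkMessage's listed fields."""
    key: Optional[bytes]
    value: Optional[bytes]
    topic: Optional[str] = None
    headers: List[Tuple[str, bytes]] = field(default_factory=list)
    partition: Optional[int] = None
    timestamp: Optional[int] = None


def to_sink(msg: SourceMsg) -> SinkMsg:
    """Copy key, value and timestamp; leave every other field at its default."""
    return SinkMsg(key=msg.key, value=msg.value, timestamp=msg.timestamp)


src = SourceMsg(key=b"k", value=b"v", topic="in-topic", headers=[("h", b"1")],
                offset=5, partition=2, timestamp=123)
sink = to_sink(src)
```

Source-only fields such as `offset` and `latency`, along with `topic`, `headers` and `partition`, are not carried over.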