bytewax.connectors.kafka#

Connectors for Kafka.

Importing this module requires the confluent-kafka package to be installed.

The input source emits a stream of KafkaSourceMessage objects. See its docstring for usage.

You can use KafkaSource and KafkaSink directly:

from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("example")
kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
processed = op.map("map", kinp, lambda x: KafkaSinkMessage(x.key, x.value))
op.output("kafka-out", processed, KafkaSink(brokers, "out-topic"))

Or the custom operators:

from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("example")
kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
errs = op.inspect("errors", kinp.errs).then(op.raises, "crash-on-err")
processed = op.map("map", kinp.oks, lambda x: KafkaSinkMessage(x.key, x.value))
kop.output("kafka-out", processed, brokers=brokers, topic="out-topic")

Submodules#

Classes#

class KafkaSource(
brokers: Iterable[str],
topics: Iterable[str],
tail: bool = True,
starting_offset: int = OFFSET_BEGINNING,
add_config: Optional[Dict[str, str]] = None,
batch_size: int = 1000,
raise_on_errors: bool = True,
)#
Bases: bytewax.inputs.FixedPartitionedSource

Use a set of Kafka topics as an input source.

Partitions are the unit of parallelism. Can support exactly-once processing.

Messages are emitted into the dataflow as bytewax.connectors.kafka.KafkaSourceMessage objects.

Initialization

Args:

brokers: List of host:port strings of Kafka brokers.

topics: List of topics to consume from.

tail: Whether to wait for new data on this topic when the
    end is initially reached.

starting_offset: Can be either `confluent_kafka.OFFSET_BEGINNING` or
    `confluent_kafka.OFFSET_END`. Defaults to beginning of
    topic.

add_config: Any additional configuration properties. See [the
    `rdkafka`
    documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
    for options.

batch_size: How many messages to consume at most at each poll.
    This is 1000 by default. The default setting is a suitable
    starting point for higher throughput dataflows, but can be
    tuned lower to potentially decrease individual message
    processing latency.

raise_on_errors: If set to False, errors won't stop the dataflow, and the
    KafkaSourceMessage.error field will be set. It's up to you to
    handle the error properly downstream.
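For example, a minimal sketch of constructing the source with a smaller batch size and an extra rdkafka property (the broker address, topic name, and property value are placeholders, not recommendations):

from confluent_kafka import OFFSET_BEGINNING

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSource
from bytewax.dataflow import Dataflow

flow = Dataflow("kafka-source-example")
source = KafkaSource(
    brokers=["localhost:19092"],
    topics=["in-topic"],
    tail=True,  # keep waiting for new data after reaching the end of the topic
    starting_offset=OFFSET_BEGINNING,  # start from the beginning of each partition
    add_config={"session.timeout.ms": "60000"},  # extra rdkafka consumer property (illustrative)
    batch_size=100,  # smaller batches can lower per-message latency
)
kinp = op.input("kafka-in", flow, source)
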
list_parts() → List[str]#

Each Kafka partition is an input partition.

build_part(
now: datetime,
for_part: str,
resume_state: Optional[int],
) → bytewax.connectors.kafka.source._KafkaSourcePartition#

See ABC docstring.

class KafkaSink(
brokers: Iterable[str],
topic: Optional[str],
add_config: Optional[Dict[str, str]] = None,
)#
Bases: bytewax.outputs.DynamicSink

Use a single Kafka topic as an output sink.

Items consumed from the dataflow must be KafkaSinkMessage objects whose key and value are each str, bytes, or None.

Workers are the unit of parallelism.

Can support at-least-once processing. Messages from the resume epoch will be duplicated right after resume.

Initialization

Args:

brokers: List of host:port strings of Kafka brokers.

topic: Topic to produce to. If it's `None`, the topic
    to produce to will be read from each KafkaSinkMessage.

add_config: Any additional configuration properties. See [the
    `rdkafka`
    documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
    for options.
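For example, a minimal sketch of a sink built with topic=None, where each message carries its own destination topic (the broker address and topic name are placeholders, and this assumes KafkaSinkMessage accepts topic as a keyword field, per the field list documented below):

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSink, KafkaSinkMessage
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("kafka-sink-example")
inp = op.input("inp", flow, TestingSource([b"first", b"second"]))
# The sink itself has topic=None, so each message names its own destination topic.
msgs = op.map("to-msg", inp, lambda v: KafkaSinkMessage(None, v, topic="out-topic"))
op.output("kafka-out", msgs, KafkaSink(["localhost:19092"], None))
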
build(
worker_index: int,
worker_count: int,
) → bytewax.connectors.kafka.sink._KafkaSinkPartition#

See ABC docstring.

class KafkaSinkMessage#
Bases:

Class that holds a message to be sent to Kafka, with metadata.

Use KafkaSinkMessage.key to get the key and KafkaSinkMessage.value to get the value.

Other fields: topic, headers, partition, timestamp

key: bytewax.connectors.kafka._types.K_co#
value: bytewax.connectors.kafka._types.V_co#
topic: Optional[str]#
headers: List[Tuple[str, bytes]]#
partition: Optional[int]#
timestamp: int = 0#
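For example, a minimal sketch of building a sink message with headers (the key, value, and header contents are illustrative):

from bytewax.connectors.kafka import KafkaSinkMessage

msg = KafkaSinkMessage(
    key=b"user-1",
    value=b'{"clicks": 3}',
    headers=[("content-type", b"application/json")],  # headers are (str, bytes) pairs
)
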
class KafkaSourceMessage#
Bases:

Class that holds a message from Kafka with metadata.

Use msg.key to get the key and msg.value to get the value.

Other fields: topic, headers, latency, offset, partition, timestamp

key: bytewax.connectors.kafka._types.K#
value: bytewax.connectors.kafka._types.V#
topic: Optional[str]#
headers: List[Tuple[str, bytes]]#
latency: Optional[float]#
offset: Optional[int]#
partition: Optional[int]#
timestamp: Optional[Tuple[int, int]]#

to_sink() → KafkaSinkMessage[_types.K, _types.V]#

Safely convert KafkaSourceMessage to KafkaSinkMessage.

Only key, value and timestamp are used.
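For example, a minimal sketch of a pass-through dataflow that forwards consumed messages to another topic using to_sink (the broker address and topic names are placeholders):

from bytewax import operators as op
from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("passthrough")
kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
# Each consumed message is converted to a sink message; key, value, and timestamp carry over.
out = op.map("to-sink", kinp.oks, lambda msg: msg.to_sink())
kop.output("kafka-out", out, brokers=brokers, topic="out-topic")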
