Kafka#

Bytewax ships with connectors for Kafka and Kafka compatible systems, like Redpanda in the bytewax.connectors.kafka module.

In this section, we’ll discuss these connectors in detail, as well as provide some important operational information about using them as sources and sinks.

Connecting to Kafka#

Bytewax provides two basic ways to connect to Kafka.

You can use KafkaSource and KafkaSink directly:

1from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage
2from bytewax import operators as op
3from bytewax.dataflow import Dataflow
4
5brokers = ["localhost:19092"]
6flow = Dataflow("example")
7kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
8processed = op.map("map", kinp, lambda x: KafkaSinkMessage(x.key, x.value))
9op.output("kafka-out", processed, KafkaSink(brokers, "out-topic"))

Or use the bytewax.connectors.kafka.operators.input operator:

1from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage
2from bytewax import operators as op
3from bytewax.dataflow import Dataflow
4
5brokers = ["localhost:19092"]
6flow = Dataflow("example")
7kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
8processed = op.map("map", kinp.oks, lambda x: KafkaSinkMessage(x.key, x.value))
9kop.output("kafka-out", processed, brokers=brokers, topic="out-topic")

Typical use cases should prefer the Kafka operators, while the KafkaSource connector can be used for other customizations.

Error handling#

By default, KafkaSource will raise an exception whenever an error is encountered when consuming from Kafka. This behavior can be configured with the raise_on_errors parameter, which will yield KafkaError items. Those errors can be handled downstream individually.

The bytewax.connectors.kafka.operators.input operator returns a dataclass containing two output streams. The .oks field is a stream of KafkaSourceMessage that were successfully processed. The .errs field is a stream of KafkaError messages where an error was encountered. Items that encountered an error have their .err field set with more details about the error.

1flow = Dataflow("example")
2kinp = kop.input("kafka-in-2", flow, brokers=brokers, topics=["in-topic"])
3# Print out errors that are encountered, and then raise an exception
4op.inspect("inspect_err", kinp.errs).then(op.raises, "raise_errors")

Note that if no processing is attached to the .errs stream of messages, they will be silently dropped, and processing will continue.

Alternatively, error messages in the .errs stream can be published to a “dead letter queue”, a separate Kafka topic where they can be inspected and reprocessed later, while allowing the dataflow to continue processing data.

Batch sizes#

By default, Bytewax will consume a batch of up to 1000 messages at a time from Kafka. The default setting is often sufficient, but some dataflows may benefit from a smaller batch size to decrease overall latency.

If your dataflow would benefit from lower latency, you can set the batch_size parameter to a lower value. The batch_size parameter configures the maximum number of messages that will be fetched at a time from each worker.

Message Types#

Messages received from KafkaSource are emitted into the dataflow as items of type KafkaSourceMessage. This dataclass includes basic Kafka fields like .key and .value, as well as extra information fields like .headers.

Messages that are published to a KafkaSink must be of type KafkaSinkMessage

You can create a KafkaSinkMessage with the data you want:

1msg = KafkaSinkMessage(key=None, value="some_value")

And you can optionally set topic, headers, partition and timestamp.

The output operator also accepts messages of type KafkaSourceMessage. They are automatically converted to KafkaSinkMessage keeping only .key and .value from KafkaSourceMessage.

Dynamic topic writes#

Setting the topic field of a KafkaSinkMessage will cause that message to be written to that topic.

Additionally, the KafkaSink class can be constructed without specifying a topic:

1op.output("kafka-dynamic-out", processed, KafkaSink(brokers, topic=None))

Writes to this output will be written to the topic that is specified when creating a KafkaSinkMessage.

1KafkaSinkMessage(msg.key, msg.value, topic="out-topic-1")

Note that not setting a topic for a KafkaSinkMessage when KafkaSink is not configured with a default topic will result in a runtime error.

Kafka and Recovery#

Typical deployments of Kafka utilize consumer groups in order to manage partition assignment and the storing of Kafka offsets.

Bytewax does not use consumer groups to store offsets or asssign Kafka topic partitions to consumers. In order to correctly support recovery, Bytewax must manage and store the consumer offsets in Bytewax recovery partitions.

When recovery is not enabled, Bytewax will start consuming from each partition using the earliest available offset. This setting can be changed when creating a new KafkaSource with the starting_offset parameter, which accepts the types defined in the confluent_kafka library.

If you are not using recovery and would prefer to track offsets on the broker side, you can pass additional options to the Kafka input sources to create a consumer group:

 1from confluent_kafka import OFFSET_STORED
 2
 3add_config = {"group.id": "consumer_group", "enable.auto.commit": "true"}
 4BROKERS = ["localhost:19092"]
 5IN_TOPICS = ["in_topic"]
 6
 7flow = Dataflow("kafka_in_out")
 8kinp = kop.input(
 9    "inp",
10    flow,
11    starting_offset=OFFSET_STORED,
12    add_config=add_config,
13    brokers=BROKERS,
14    topics=IN_TOPICS,
15)

The code above creates a Kafka consumer that uses a group_id of consumer_group that periodically commits it’s consumed offsets according to the auto.commit.interval.ms parameter. By default, this interval is set to 5000ms.

It is important to note that this interval is not coordinated with any processing steps within Bytewax. As a result, some messages may not be processed if the dataflow crashes.

Partitions#

Partitions are a fundamental concept for Kafka and Redpanda, and are the unit of parallelism for producing and consuming messages.

When multiple workers are started, Bytewax will assign individual Kafka topic partitions to the available number of workers. If there are fewer partitions than workers, some workers will not be assigned a partition to read from. If there are more partitions than workers, some workers will handle more than one partition.

If the number of partitions changes, Dataflows will need to be restarted in order to rebalance new partition assignments to workers.

Schema registry#

Bytewax supports integrating with the Redpanda Schema Registry as well as the Confluent Schema Registry.

The following is an example of integrating with the Redpanda Schema Registry:

 1import bytewax.operators as op
 2
 3from bytewax.dataflow import Dataflow
 4from bytewax.connectors.kafka import KafkaSinkMessage, KafkaSourceMessage
 5from bytewax.connectors.kafka import operators as kop
 6from bytewax.connectors.kafka.registry import RedpandaSchemaRegistry, SchemaRef
 7
 8BROKERS = ["localhost:19092"]
 9IN_TOPICS = ["in_topic"]
10REDPANDA_REGISTRY_URL = "http://localhost:8080/schema-registry"
11
12registry = RedpandaSchemaRegistry(REDPANDA_REGISTRY_URL)
13
14flow = Dataflow("schema_registry")
15kinp = kop.input("kafka-in", flow, brokers=BROKERS, topics=IN_TOPICS)
16op.inspect("inspect-kafka-errors", kinp.errs).then(op.raises, "kafka-error")
17key_de = registry.deserializer(SchemaRef("sensor-key"))
18val_de = registry.deserializer(SchemaRef("sensor-value"))
19msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
20op.inspect("inspect-deser", msgs.errs).then(op.raises, "deser-error")

The bytewax.connectors.kafka.operators.deserialize operator accepts a SchemaDeserializer for both the key and value of the Kafka message.

If the deserialization step encounters an error, a separate stream of .errs is returned that can be used for error handling.

When integrating with the Confluent schema registry, new schema versions will attempt to be fetched when a message with a new schema id is encountered. When using the Redpanda schema registry, dataflows will need to be restarted in order to fetch new versions of a schema.

Implementing custom ser/de classes#

Bytewax includes support for creating your own schema registry implementation, or custom (de)serializers.

As a trivial example, we can implement a class that uses the orjson library to deserialize a JSON payload from bytes.

 1import orjson
 2
 3from typing import Dict, Any
 4
 5from bytewax import operators as op
 6from bytewax.connectors.kafka.serde import SchemaDeserializer
 7
 8from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage, KafkaSink
 9from bytewax.dataflow import Dataflow
10
11BROKERS = ["localhost:19092"]
12IN_TOPICS = ["in_topic"]
13
14
15class KeyDeserializer(SchemaDeserializer[bytes, str]):
16    def de(self, obj: bytes) -> str:
17        return str(obj)
18
19
20class JSONDeserializer(SchemaDeserializer[bytes, Dict]):
21    def de(self, obj: bytes) -> Dict[Any, Any]:
22        return orjson.loads(obj)
23
24
25brokers = ["localhost:19092"]
26val_de = JSONDeserializer()
27key_de = KeyDeserializer()
28
29flow = Dataflow("example")
30kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
31json_stream = kop.deserialize(
32    "load_json", kinp.oks, key_deserializer=key_de, val_deserializer=val_de
33)
34op.inspect("inspect", json_stream.oks)
Join our community Slack channel

Need some help? Join our community!

If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #questions-answered Slack channel!

Join now