Kafka
Bytewax ships with connectors for Kafka and Kafka-compatible systems, like Redpanda, in the bytewax.connectors.kafka module.
In this section, we’ll discuss these connectors in detail, as well as provide some important operational information about using them as sources and sinks.
Connecting to Kafka
Bytewax provides two basic ways to connect to Kafka.
You can use KafkaSource and KafkaSink directly:
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("example")
kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
processed = op.map("map", kinp, lambda x: KafkaSinkMessage(x.key, x.value))
op.output("kafka-out", processed, KafkaSink(brokers, "out-topic"))
Or use the bytewax.connectors.kafka.operators.input operator:
from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow

brokers = ["localhost:19092"]
flow = Dataflow("example")
kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
processed = op.map("map", kinp.oks, lambda x: KafkaSinkMessage(x.key, x.value))
kop.output("kafka-out", processed, brokers=brokers, topic="out-topic")
Typical use cases should prefer the Kafka operators. Use the KafkaSource connector directly when you need customizations that the operators do not cover.
Error handling
By default, KafkaSource raises an exception whenever an error is encountered while consuming from Kafka. This behavior can be changed with the raise_on_errors parameter: when it is set to False, the source yields KafkaError items into the dataflow instead of raising. Those errors can then be handled individually downstream.
The bytewax.connectors.kafka.operators.input operator returns a dataclass containing two output streams. The .oks field is a stream of KafkaSourceMessage items that were successfully processed. The .errs field is a stream of KafkaError items for messages where an error was encountered. Items that encountered an error have their .err field set with more details about the error.
flow = Dataflow("example")
kinp = kop.input("kafka-in-2", flow, brokers=brokers, topics=["in-topic"])
# Print out errors that are encountered, and then raise an exception
op.inspect("inspect_err", kinp.errs).then(op.raises, "raise_errors")
Note that if no processing is attached to the .errs stream of messages, they will be silently dropped, and processing will continue.
Alternatively, error messages in the .errs stream can be published to a “dead letter queue”, a separate Kafka topic where they can be inspected and reprocessed later, while allowing the dataflow to continue processing data.
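A dead letter queue could be wired up roughly as follows. This is a minimal sketch, not a definitive implementation: the topic name in-topic-dlq, the x-error header name, and the helper dead_letter_fields are made up for illustration, and the code assumes each KafkaError item exposes the original message as .msg alongside the .err field described above.

```python
from typing import Any, Dict


def dead_letter_fields(item: Any) -> Dict[str, Any]:
    """Build keyword arguments for a KafkaSinkMessage from an error item.

    Assumes `item.msg` is the original source message and `item.err`
    describes the failure (hypothetical helper, for illustration only).
    """
    return {
        "key": item.msg.key,
        "value": item.msg.value,
        # Kafka headers are (name, bytes) pairs; "x-error" is a made-up name.
        "headers": [("x-error", str(item.err).encode("utf-8"))],
    }


def build_flow(brokers, in_topic="in-topic", dlq_topic="in-topic-dlq"):
    # Imported here so dead_letter_fields can be tested without Bytewax.
    from bytewax import operators as op
    from bytewax.connectors.kafka import KafkaSinkMessage
    from bytewax.connectors.kafka import operators as kop
    from bytewax.dataflow import Dataflow

    flow = Dataflow("dlq-example")
    kinp = kop.input("kafka-in", flow, brokers=brokers, topics=[in_topic])
    # Convert every error item into a sink message for the DLQ topic.
    dead = op.map(
        "to-dlq", kinp.errs, lambda e: KafkaSinkMessage(**dead_letter_fields(e))
    )
    kop.output("kafka-dlq", dead, brokers=brokers, topic=dlq_topic)
    # Regular processing of kinp.oks would be attached here.
    return flow
```

Keeping the conversion in a plain function makes it possible to check the dead letter format without a running broker.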
Batch sizes
By default, Bytewax will consume a batch of up to 1000 messages at a time from Kafka. The default setting is often sufficient, but some dataflows may benefit from a smaller batch size to decrease overall latency.
If your dataflow would benefit from lower latency, you can set the batch_size parameter to a lower value. The batch_size parameter configures the maximum number of messages that will be fetched at a time from each worker.
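To make the "up to batch_size messages at a time" behavior concrete, here is a toy model written with the standard library only. It is not Bytewax's implementation; poll_batches is a hypothetical helper that only illustrates how a backlog is handed downstream in chunks of at most batch_size.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def poll_batches(messages: Iterable[bytes], batch_size: int) -> Iterator[List[bytes]]:
    """Yield lists of at most `batch_size` messages, like repeated polls."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(messages)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


backlog = [f"msg-{i}".encode() for i in range(5)]
print([len(b) for b in poll_batches(backlog, batch_size=2)])  # [2, 2, 1]
```

With a smaller batch size, the first messages reach downstream steps after fewer messages have been collected, at the cost of more polls for the same backlog.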
Message Types
Messages received from KafkaSource are emitted into the dataflow as items of type KafkaSourceMessage. This dataclass includes basic Kafka fields like .key and .value, as well as extra information fields like .headers.
Messages that are published to a KafkaSink must be of type KafkaSinkMessage. You can create a KafkaSinkMessage with the data you want:
msg = KafkaSinkMessage(key=None, value="some_value")
You can also optionally set topic, headers, partition and timestamp.
The output operator also accepts messages of type KafkaSourceMessage. They are automatically converted to KafkaSinkMessage, keeping only .key and .value from the KafkaSourceMessage.
Dynamic topic writes
Setting the topic field of a KafkaSinkMessage will cause that message to be written to that topic.
Additionally, the KafkaSink class can be constructed without specifying a topic:
op.output("kafka-dynamic-out", processed, KafkaSink(brokers, topic=None))
Writes to this output will be written to the topic that is specified when creating a KafkaSinkMessage.
KafkaSinkMessage(msg.key, msg.value, topic="out-topic-1")
Note that if a KafkaSinkMessage has no topic set and the KafkaSink is not configured with a default topic, writing that message results in a runtime error.
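Putting the pieces of this section together, per-message routing might look like the sketch below. The routing rule, the topic names alerts and events, and the helper topic_for are hypothetical; the Bytewax calls are the same ones used in the snippets above.

```python
from typing import Optional


def topic_for(value: Optional[bytes]) -> str:
    """Pick an output topic from the payload (hypothetical routing rule)."""
    if value is not None and value.startswith(b"ALERT"):
        return "alerts"
    return "events"


def build_flow(brokers):
    # Imported here so topic_for can be tested without Bytewax installed.
    from bytewax import operators as op
    from bytewax.connectors.kafka import KafkaSink, KafkaSinkMessage, KafkaSource
    from bytewax.dataflow import Dataflow

    flow = Dataflow("dynamic-topics")
    kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
    # Every message gets an explicit topic, so the sink needs no default.
    routed = op.map(
        "route",
        kinp,
        lambda m: KafkaSinkMessage(m.key, m.value, topic=topic_for(m.value)),
    )
    op.output("kafka-dynamic-out", routed, KafkaSink(brokers, topic=None))
    return flow
```

Because topic_for always returns a topic, no message reaches the sink without one, which avoids the runtime error described above.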
Kafka and Recovery
Typical deployments of Kafka utilize consumer groups in order to manage partition assignment and the storing of Kafka offsets.
Bytewax does not use consumer groups to store offsets or assign Kafka topic partitions to consumers. In order to correctly support recovery, Bytewax must manage and store the consumer offsets in Bytewax recovery partitions.
When recovery is not enabled, Bytewax will start consuming from each partition using the earliest available offset. This setting can be changed when creating a new KafkaSource with the starting_offset parameter, which accepts the types defined in the confluent_kafka library.
If you are not using recovery and would prefer to track offsets on the broker side, you can pass additional options to the Kafka input sources to create a consumer group:
from confluent_kafka import OFFSET_STORED

from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

# Setting group.id and enabling auto-commit lets the broker track offsets
add_config = {"group.id": "consumer_group", "enable.auto.commit": "true"}
BROKERS = ["localhost:19092"]
IN_TOPICS = ["in_topic"]

flow = Dataflow("kafka_in_out")
kinp = kop.input(
    "inp",
    flow,
    starting_offset=OFFSET_STORED,
    add_config=add_config,
    brokers=BROKERS,
    topics=IN_TOPICS,
)
The code above creates a Kafka consumer that uses a group.id of consumer_group and periodically commits its consumed offsets according to the auto.commit.interval.ms parameter. By default, this interval is set to 5000 ms.
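It is important to note that this interval is not coordinated with any processing steps within Bytewax. Offsets can therefore be committed before the dataflow has finished processing the corresponding messages, so if the dataflow crashes, some messages may never be processed after a restart.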
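The gap between committed and processed offsets can be illustrated with a few lines of plain Python. This is a simplified model under stated assumptions, not a description of the consumer's internals: it treats a single partition, counts offsets from 0, and the helper name skipped_after_crash is invented for the example.

```python
from typing import List


def skipped_after_crash(committed_offset: int, processed_count: int) -> List[int]:
    """Offsets committed to the broker but never fully processed.

    `committed_offset` is the offset the consumer group would resume from.
    `processed_count` is how many messages, starting at offset 0, the
    dataflow had finished processing when it crashed.
    """
    return list(range(processed_count, committed_offset))


# The auto-commit timer fired after offsets 0..9 were fetched (committing
# offset 10), but the dataflow had only finished offsets 0..6 when it crashed.
print(skipped_after_crash(committed_offset=10, processed_count=7))  # [7, 8, 9]
```

After a restart the consumer resumes at the committed offset, so in this model offsets 7 through 9 are never seen by the dataflow again.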
Partitions
Partitions are a fundamental concept for Kafka and Redpanda, and are the unit of parallelism for producing and consuming messages.
When multiple workers are started, Bytewax will assign individual Kafka topic partitions to the available number of workers. If there are fewer partitions than workers, some workers will not be assigned a partition to read from. If there are more partitions than workers, some workers will handle more than one partition.
If the number of partitions changes, the dataflow needs to be restarted so that partitions are reassigned across the workers.
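The two cases (fewer partitions than workers, and more partitions than workers) can be pictured with a small sketch. The round-robin strategy here is an assumption chosen for illustration; Bytewax's actual assignment logic may distribute partitions differently, and assign_partitions is a hypothetical helper.

```python
from typing import Dict, List


def assign_partitions(partitions: List[str], worker_count: int) -> Dict[int, List[str]]:
    """Spread partitions over workers round-robin (illustrative only)."""
    if worker_count < 1:
        raise ValueError("worker_count must be at least 1")
    assignment: Dict[int, List[str]] = {w: [] for w in range(worker_count)}
    for i, part in enumerate(sorted(partitions)):
        assignment[i % worker_count].append(part)
    return assignment


parts = [f"in-topic-{p}" for p in range(3)]
# More partitions than workers: worker 0 reads two partitions.
print(assign_partitions(parts, 2))
# Fewer partitions than workers: worker 3 has nothing to read.
print(assign_partitions(parts, 4))
```

In this model, adding workers beyond the partition count adds no read parallelism, which is why the partition count is usually chosen with the expected number of workers in mind.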
Serialization and deserialization
Bytewax supports (de)serialization of messages using serializers and deserializers that conform to the confluent_kafka.serialization.Serializer and confluent_kafka.serialization.Deserializer interfaces.
The bytewax.connectors.kafka.operators module offers some custom operators to help with that.
If you are working with Confluent’s Python libraries, you can use Confluent’s schema registry client and (de)serializers directly:
from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer

client = SchemaRegistryClient(...)
# We don't need to specify the schema here. Messages serialized with
# Confluent's library use a custom wire format that includes the schema_id
# in each message, so the client can look the schema up on its own.
key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)

# Initialize the flow, read from kafka, and deserialize messages
flow = Dataflow("schema_registry")
kinp = kop.input("kafka-in", flow, ...)
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
This works with Redpanda’s schema registry too.
If you are serializing messages with other libraries that do not use Confluent’s wire format, you’ll need to use a different deserializer. The connector offers (de)serializers for the plain Avro format:
from bytewax.connectors.kafka.serde import PlainAvroDeserializer

client = SchemaRegistryClient(...)

# Here we do need to specify the schema we want to use, as the schema_id
# is not included in plain avro messages. We can use the client to retrieve
# the schema and pass it to the deserializers:
key_schema = client.get_latest_version("sensor_key").schema
key_de = PlainAvroDeserializer(schema=key_schema.schema_str)
val_schema = client.get_latest_version("sensor_value").schema
val_de = PlainAvroDeserializer(schema=val_schema.schema_str)

# Same as before...
flow = Dataflow("schema_registry")
kinp = kop.input("kafka-in", flow, ...)
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
Implementing custom ser/de classes
Bytewax includes support for creating your own schema registry implementation, or custom (de)serializers.
As a trivial example, we can implement a class that uses the orjson library to deserialize a JSON payload from bytes.
import orjson

from typing import Dict, Any

from bytewax import operators as op
from bytewax.connectors.kafka.serde import SchemaDeserializer

from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

BROKERS = ["localhost:19092"]
IN_TOPICS = ["in_topic"]


class KeyDeserializer(SchemaDeserializer[bytes, str]):
    def de(self, obj: bytes) -> str:
        # Decode the bytes; str(obj) would return the repr, e.g. "b'key'"
        return obj.decode("utf-8")


class JSONDeserializer(SchemaDeserializer[bytes, Dict[Any, Any]]):
    def de(self, obj: bytes) -> Dict[Any, Any]:
        # orjson.loads accepts bytes directly
        return orjson.loads(obj)


val_de = JSONDeserializer()
key_de = KeyDeserializer()

flow = Dataflow("example")
kinp = kop.input("kafka-in", flow, brokers=BROKERS, topics=IN_TOPICS)
json_stream = kop.deserialize(
    "load_json", kinp.oks, key_deserializer=key_de, val_deserializer=val_de
)
op.inspect("inspect", json_stream.oks)
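The decoding logic that goes inside such deserializer methods can be tried out on its own before wiring it into a dataflow. The sketch below uses the standard library's json module instead of orjson so that it runs without extra dependencies; decode_key and decode_json are hypothetical helper names. It also shows why key bytes are usually decoded rather than passed to str().

```python
import json
from typing import Any, Dict, Optional


def decode_key(raw: Optional[bytes]) -> Optional[str]:
    """Decode a UTF-8 key; Kafka keys may be absent, so pass None through."""
    return None if raw is None else raw.decode("utf-8")


def decode_json(raw: bytes) -> Dict[str, Any]:
    """Parse a JSON object from raw bytes."""
    return json.loads(raw)


print(str(b"sensor-1"))                # b'sensor-1'  (the repr, not the text)
print(decode_key(b"sensor-1"))         # sensor-1
print(decode_json(b'{"temp": 21.5}'))  # {'temp': 21.5}
```

Malformed payloads make json.loads raise a ValueError subclass, so it is worth deciding up front whether such messages should stop the dataflow or be routed elsewhere.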