Kafka#
Bytewax ships with connectors for Kafka and Kafka-compatible systems, like Redpanda, in the bytewax.connectors.kafka module.
In this section, we’ll discuss these connectors in detail, as well as provide some important operational information about using them as sources and sinks.
Connecting to Kafka#
Bytewax provides two basic ways to connect to Kafka. You can use KafkaSource and KafkaSink directly:
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow
brokers = ["localhost:19092"]
flow = Dataflow("example")
kinp = op.input("kafka-in", flow, KafkaSource(brokers, ["in-topic"]))
processed = op.map("map", kinp, lambda x: KafkaSinkMessage(x.key, x.value))
op.output("kafka-out", processed, KafkaSink(brokers, "out-topic"))
Or use the bytewax.connectors.kafka.operators.input operator:
from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage
from bytewax import operators as op
from bytewax.dataflow import Dataflow
brokers = ["localhost:19092"]
flow = Dataflow("example")
kinp = kop.input("kafka-in", flow, brokers=brokers, topics=["in-topic"])
processed = op.map("map", kinp.oks, lambda x: KafkaSinkMessage(x.key, x.value))
kop.output("kafka-out", processed, brokers=brokers, topic="out-topic")
Typical use cases should prefer the Kafka operators, while the KafkaSource connector can be used for other customizations.
Error Handling#
By default, KafkaSource will raise an exception whenever an error is encountered while consuming from Kafka. This behavior can be configured with the raise_on_errors parameter; when raising is disabled, errors are instead emitted into the dataflow as KafkaError items that can be handled downstream individually.
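For example, the source can be constructed so that errors flow downstream instead of stopping the dataflow. This is a sketch reusing the brokers, flow, and imports from the earlier examples; the step name is only illustrative:
# A sketch: with raise_on_errors=False, consumer errors are emitted as
# KafkaError items instead of raising an exception.
source = KafkaSource(brokers, ["in-topic"], raise_on_errors=False)
kinp = op.input("kafka-in-errs", flow, source)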
The bytewax.connectors.kafka.operators.input operator returns a dataclass containing two output streams. The .oks field is a stream of KafkaSourceMessage items that were successfully processed. The .errs field is a stream of KafkaError messages where an error was encountered; those items have their .err field set with more details about the error.
flow = Dataflow("example")
kinp = kop.input("kafka-in-2", flow, brokers=brokers, topics=["in-topic"])
# Print out errors that are encountered, and then raise an exception
op.inspect("inspect_err", kinp.errs).then(op.raises, "raise_errors")
Note that if no processing is attached to the .errs stream of messages, they will be silently dropped, and processing will continue.
Alternatively, error messages in the .errs stream can be published to a "dead letter queue", a separate Kafka topic where they can be inspected and reprocessed later, while allowing the dataflow to continue processing data.
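As a minimal sketch of that pattern, each error item could be converted back into a KafkaSinkMessage and published to a separate topic. This assumes that KafkaError items carry the original message in their .msg field; the step and topic names are only illustrative:
# A sketch of a dead letter queue, continuing from the snippet above.
dlq_msgs = op.map(
    "to_dlq",
    kinp.errs,
    # Assumption: err.msg holds the original KafkaSourceMessage.
    lambda err: KafkaSinkMessage(err.msg.key, err.msg.value),
)
kop.output("dlq-out", dlq_msgs, brokers=brokers, topic="dead-letter-topic")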
Batch Sizes#
By default, Bytewax will consume a batch of up to 1000 messages at a time from Kafka. The default setting is often sufficient, but some dataflows may benefit from a smaller batch size to decrease overall latency.
If your dataflow would benefit from lower latency, you can set the batch_size parameter to a lower value. The batch_size parameter configures the maximum number of messages that each worker will fetch from Kafka at a time.
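As a sketch, batch_size can be passed when constructing the source; the value and step name here are arbitrary, and the other names are reused from the earlier examples:
# A sketch: fetch at most 100 messages per batch to reduce latency.
source = KafkaSource(brokers, ["in-topic"], batch_size=100)
kinp = op.input("kafka-in-small-batches", flow, source)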
Message Types#
Messages received from KafkaSource are emitted into the dataflow as items of type KafkaSourceMessage. This dataclass includes basic Kafka fields like .key and .value, as well as extra information fields like .headers.
Messages that are published to a KafkaSink must be of type KafkaSinkMessage. You can create a KafkaSinkMessage with the data you want:
msg = KafkaSinkMessage(key=None, value="some_value")
And you can optionally set topic, headers, partition, and timestamp.
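For example, a sketch setting some of the optional fields; the topic name is illustrative, and the headers value assumes confluent_kafka's list of (name, bytes) pairs:
msg = KafkaSinkMessage(
    key=None,
    value="some_value",
    topic="out-topic",
    # Assumed header format: a list of (name, bytes) tuples, as in confluent_kafka.
    headers=[("source", b"example")],
)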
The output operator also accepts messages of type KafkaSourceMessage. They are automatically converted to KafkaSinkMessage, keeping only .key and .value from the KafkaSourceMessage.
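For example, the .oks stream of the input operator can be passed straight through to the output. This is a sketch reusing the names from the earlier operator example:
# KafkaSourceMessage items are converted automatically; only .key and .value are kept.
kop.output("kafka-passthrough", kinp.oks, brokers=brokers, topic="out-topic")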
Dynamically Writing to Different Topics#
Setting the topic field of a KafkaSinkMessage will cause that message to be written to that topic.
Additionally, the KafkaSink class can be constructed without specifying a topic:
op.output("kafka-dynamic-out", processed, KafkaSink(brokers, topic=None))
Writes to this output will be written to the topic that is specified when creating a KafkaSinkMessage:
KafkaSinkMessage(msg.key, msg.value, topic="out-topic-1")
Note that not setting a topic for a KafkaSinkMessage when KafkaSink is not configured with a default topic will result in a runtime error.
Kafka and Recovery#
Typical deployments of Kafka utilize consumer groups in order to manage partition assignment and the storing of Kafka offsets.
Bytewax does not use consumer groups to store offsets or assign Kafka topic partitions to consumers. In order to correctly support recovery, Bytewax must manage and store the consumer offsets in Bytewax recovery partitions.
When recovery is not enabled, Bytewax will start consuming from each partition using the earliest available offset. This setting can be changed when creating a new KafkaSource with the starting_offset parameter, which accepts the types defined in the confluent_kafka library.
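For example, to skip the existing backlog and start from the end of each partition, here is a sketch using confluent_kafka's OFFSET_END constant and the names from the earlier examples:
from confluent_kafka import OFFSET_END
# A sketch: begin consuming each partition from its end rather than the
# earliest available offset.
kinp = op.input(
    "kafka-in-latest",
    flow,
    KafkaSource(brokers, ["in-topic"], starting_offset=OFFSET_END),
)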
If you are not using recovery and would prefer to track offsets on the broker side, you can pass additional options to the Kafka input sources to create a consumer group:
from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop
from confluent_kafka import OFFSET_STORED

add_config = {"group.id": "consumer_group", "enable.auto.commit": "true"}
BROKERS = ["localhost:19092"]
IN_TOPICS = ["in_topic"]

flow = Dataflow("kafka_in_out")
kinp = kop.input(
    "inp",
    flow,
    starting_offset=OFFSET_STORED,
    add_config=add_config,
    brokers=BROKERS,
    topics=IN_TOPICS,
)
The code above creates a Kafka consumer with a group.id of consumer_group that periodically commits its consumed offsets according to the auto.commit.interval.ms parameter. By default, this interval is set to 5000 ms.
It is important to note that this interval is not coordinated with any processing steps within Bytewax. As a result, some messages may not be processed if the dataflow crashes.
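If you rely on broker-managed offsets, you can lower that interval through add_config. This is a sketch; auto.commit.interval.ms is a standard librdkafka setting and the value here is arbitrary:
add_config = {
    "group.id": "consumer_group",
    "enable.auto.commit": "true",
    # Commit consumed offsets every second instead of the 5000 ms default.
    "auto.commit.interval.ms": "1000",
}
Lowering the interval narrows, but does not eliminate, the gap between what Bytewax has processed and what has been committed to the broker.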
Partitions#
Partitions are a fundamental concept for Kafka and Redpanda, and are the unit of parallelism for producing and consuming messages.
When multiple workers are started, Bytewax will assign individual Kafka topic partitions to the available workers. If there are fewer partitions than workers, some workers will not be assigned a partition to read from. If there are more partitions than workers, some workers will handle more than one partition.
If the number of partitions changes, dataflows will need to be restarted in order to rebalance new partition assignments to workers.
Serialization and Deserialization#
Bytewax supports (de)serialization of messages using serializers that conform to the confluent_kafka.serialization.Serializer and confluent_kafka.serialization.Deserializer interfaces.
The bytewax.connectors.kafka.operators module offers some custom operators to help with that. If you are working with Confluent's Python libraries, you can use Confluent's schema registry client and (de)serializers directly:
from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer

conf = {
    "url": "http://registry.invalid/",
}
client = SchemaRegistryClient(conf)

# We don't need to specify the schema: messages serialized with Confluent's
# library use a wire format that includes a schema_id in each message, and the
# client uses it to fetch the schema on its own.
key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)

# Initialize the flow, read from Kafka, and deserialize messages.
# BROKERS, IN_TOPICS, add_config, and OFFSET_STORED are reused from the
# consumer-group example above.
flow = Dataflow("schema_registry")
kinp = kop.input(
    "inp",
    flow,
    starting_offset=OFFSET_STORED,
    add_config=add_config,
    brokers=BROKERS,
    topics=IN_TOPICS,
)
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
This works with Redpanda's schema registry too.
If you are serializing messages with other libraries that do not use confluent’s wire format, you’ll need to use a different deserializer. The connector offers (de)serializers for plain avro format:
# PlainAvroDeserializer is provided by the Bytewax Kafka connector (the exact
# import path may vary between Bytewax versions).
from bytewax.connectors.kafka.serde import PlainAvroDeserializer

client = SchemaRegistryClient(conf)

# Here we do need to specify the schema we want to use, as the schema_id
# is not included in plain avro messages. We can use the client to retrieve
# the schema and pass it to the deserializers:
key_schema = client.get_latest_version("sensor_key").schema
key_de = PlainAvroDeserializer(schema=key_schema.schema_str)
val_schema = client.get_latest_version("sensor_value").schema
val_de = PlainAvroDeserializer(schema=val_schema.schema_str)

# Same as before...
flow = Dataflow("schema_registry")
kinp = kop.input(
    "inp",
    flow,
    starting_offset=OFFSET_STORED,
    add_config=add_config,
    brokers=BROKERS,
    topics=IN_TOPICS,
)
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
Custom Serialization Format#
Bytewax includes support for creating your own schema registry implementation, or custom (de)serializers.
As a trivial example, we can implement a class that uses the orjson library to deserialize a JSON payload from bytes.
import orjson
from typing import Optional
from confluent_kafka.serialization import (
Serializer,
Deserializer,
StringDeserializer,
SerializationContext,
)
from bytewax import operators as op
from bytewax.connectors.kafka import operators as kop, KafkaSinkMessage, KafkaSink
from bytewax.dataflow import Dataflow
BROKERS = ["localhost:19092"]
IN_TOPICS = ["in_topic"]
class OrJSONSerializer(Serializer):
def __call__(
self, obj: Optional[object], ctx: Optional[SerializationContext] = None
) -> Optional[bytes]:
if obj is None:
return None
return orjson.dumps(obj)
class OrJSONDeserializer(Deserializer):
def __call__(
self, value: Optional[bytes], ctx: Optional[SerializationContext] = None
) -> Optional[object]:
if value is None:
return None
return orjson.loads(value)
val_de = OrJSONDeserializer()
key_de = StringDeserializer()

flow = Dataflow("example")
kinp = kop.input("kafka-in", flow, brokers=BROKERS, topics=IN_TOPICS)
json_stream = kop.deserialize(
"load_json",
kinp.oks,
key_deserializer=key_de,
val_deserializer=val_de,
)
op.inspect("inspect", json_stream.oks)
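To round out the example, the OrJSONSerializer defined above can be used on the way back out. This is a sketch that builds KafkaSinkMessage items with op.map; the step and topic names are only illustrative:
# A sketch: re-serialize the deserialized values with OrJSONSerializer and
# publish them to another topic.
val_ser = OrJSONSerializer()
serialized = op.map(
    "dump_json",
    json_stream.oks,
    lambda msg: KafkaSinkMessage(msg.key, val_ser(msg.value)),
)
op.output("kafka-out", serialized, KafkaSink(BROKERS, "out-topic"))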