bytewax.connectors.kafka.operators
Operators for the Kafka source and sink.
It’s suggested to import operators like this:

```python
from bytewax.connectors.kafka import operators as kop
```

And then you can use the operators like this:

```python
from bytewax.dataflow import Dataflow

flow = Dataflow("kafka-in-out")
kafka_input = kop.input("kafka_inp", flow, brokers=[...], topics=[...])
kop.output("kafka-out", kafka_input.oks, brokers=[...], topic="...")
```
Functions
```python
input(
    step_id: str,
    flow: Dataflow,
    *,
    brokers: List[str],
    topics: List[str],
    tail: bool = True,
    starting_offset: int = OFFSET_BEGINNING,
    add_config: Optional[Dict[str, str]] = None,
    batch_size: int = 1000,
)
```
Consume from Kafka as an input source.
Partitions are the unit of parallelism. Can support exactly-once processing.
- Parameters:
  - step_id – Unique ID.
  - flow – Dataflow.
  - brokers – List of host:port strings of Kafka brokers.
  - topics – List of topics to consume from.
  - tail – Whether to wait for new data on this topic when the end is initially reached.
  - starting_offset – Can be either confluent_kafka.OFFSET_BEGINNING or confluent_kafka.OFFSET_END.
  - add_config – Any additional configuration properties. See the rdkafka docs for options.
  - batch_size – How many messages to consume at most at each poll.
- Returns:
A stream of consumed items and a stream of consumer errors.
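A minimal sketch of a consuming dataflow (broker address and topic name are placeholders; the error stream is assumed to be exposed as `.errs`, matching the `.oks` naming in the example above):

```python
import bytewax.operators as op
from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

flow = Dataflow("kafka-consumer")
kinp = kop.input(
    "kafka_inp",
    flow,
    brokers=["localhost:9092"],  # placeholder broker address
    topics=["in_topic"],         # placeholder topic name
)
# Consumed messages arrive on `.oks`; consumer errors on `.errs`.
op.inspect("oks", kinp.oks)
op.inspect("errs", kinp.errs)
```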
```python
output(
    step_id: str,
    up: Stream[
        Union[
            KafkaSourceMessage[Optional[bytes], Optional[bytes]],
            KafkaSinkMessage[Optional[bytes], Optional[bytes]],
        ]
    ],
    *,
    brokers: List[str],
    topic: str,
    add_config: Optional[Dict[str, str]] = None,
)
```
Produce to Kafka as an output sink.
Default partition routing is used.
Workers are the unit of parallelism.
Can support at-least-once processing. Messages from the resume epoch will be duplicated right after resume.
- Parameters:
  - step_id – Unique ID.
  - up – Stream of fully serialized messages. Key and value must be Optional[bytes].
  - brokers – List of host:port strings of Kafka brokers.
  - topic – Topic to produce to. If individual items have topic set, it will override this per-message.
  - add_config – Any additional configuration properties. See the rdkafka docs for options.
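As a sketch (broker and topic are placeholders, and `KafkaSinkMessage` is assumed importable from `bytewax.connectors.kafka`), producing pre-serialized bytes from an in-memory test source:

```python
import bytewax.operators as op
from bytewax.connectors.kafka import KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("kafka-producer")
nums = op.input("inp", flow, TestingSource([b"1", b"2", b"3"]))
# Key and value must already be Optional[bytes] at this point.
msgs = op.map("wrap", nums, lambda v: KafkaSinkMessage(key=None, value=v))
kop.output("kafka_out", msgs, brokers=["localhost:9092"], topic="out_topic")
```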
```python
deserialize_key(
    step_id: str,
    up: Stream[KafkaSourceMessage[Optional[bytes], V]],
    deserializer: Deserializer,
)
```
Deserialize Kafka message keys.
- Parameters:
  - step_id – Unique ID.
  - up – Stream.
  - deserializer – To use.
- Returns:
Stream of deserialized messages and a stream of deserialization errors.
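The `Deserializer` interface isn't spelled out on this page; the sketch below assumes it is compatible with `confluent_kafka.serialization.Deserializer` and continues the `input` example above (`kinp`):

```python
from confluent_kafka.serialization import StringDeserializer

# Decode UTF-8 keys; failures land on the returned error stream
# (assumed to be `.errs`, as with `input` above).
keyed = kop.deserialize_key("de_key", kinp.oks, deserializer=StringDeserializer())
op.inspect("key_errs", keyed.errs)
```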
```python
deserialize_value(
    step_id: str,
    up: Stream[KafkaSourceMessage[K, Optional[bytes]]],
    deserializer: Deserializer,
)
```
Deserialize Kafka message values.
- Parameters:
  - step_id – Unique ID.
  - up – Stream.
  - deserializer – To use.
- Returns:
Stream of deserialized messages and a stream of deserialization errors.
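Values often need a custom codec; a minimal JSON deserializer, again assuming the `confluent_kafka.serialization.Deserializer` protocol:

```python
import json
from confluent_kafka.serialization import Deserializer

class JsonDeserializer(Deserializer):
    """Decode message bytes as JSON; pass None through untouched."""

    def __call__(self, value, ctx=None):
        return None if value is None else json.loads(value)

decoded = kop.deserialize_value("de_val", kinp.oks, deserializer=JsonDeserializer())
```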
```python
deserialize(
    step_id: str,
    up: Stream[KafkaSourceMessage[Optional[bytes], Optional[bytes]]],
    *,
    key_deserializer: Deserializer,
    val_deserializer: Deserializer,
)
```
Deserialize Kafka messages.
If there is an error deserializing either the key or the value, the original message will be attached to the error.
- Parameters:
  - step_id – Unique ID.
  - up – Stream.
  - key_deserializer – To use.
  - val_deserializer – To use.
- Returns:
Stream of deserialized messages and a stream of deserialization errors.
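Deserializing both sides at once, reusing the deserializers sketched above:

```python
both = kop.deserialize(
    "de_both",
    kinp.oks,
    key_deserializer=StringDeserializer(),
    val_deserializer=JsonDeserializer(),
)
# A failure on either side sends the original message to the
# error stream (assumed `.errs`).
op.inspect("de_errs", both.errs)
```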
```python
serialize_key(
    step_id: str,
    up: Stream[Union[KafkaSourceMessage[Any, V], KafkaSinkMessage[Any, V]]],
    serializer: Serializer,
)
```
Serialize Kafka message keys.
If there is an error serializing, this operator will raise an exception.
- Parameters:
  - step_id – Unique ID.
  - up – Stream. Will automatically convert source messages to sink messages via to_sink.
  - serializer – To use.
- Returns:
Stream of serialized messages.
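Mirroring the deserialization sketches, and assuming the `Serializer` type is compatible with `confluent_kafka.serialization.Serializer`; `both` is the result of the `deserialize` sketch above, so its keys are already Python strings:

```python
from confluent_kafka.serialization import StringSerializer

# Turn string keys back into bytes; a serialization failure
# raises rather than producing an error stream.
with_keys = kop.serialize_key("ser_key", both.oks, serializer=StringSerializer())
```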
```python
serialize_value(
    step_id: str,
    up: Stream[Union[KafkaSourceMessage[K, Any], KafkaSinkMessage[K, Any]]],
    serializer: Serializer,
)
```
Serialize Kafka message values.
If there is an error serializing, this operator will raise an exception.
- Parameters:
  - step_id – Unique ID.
  - up – Stream. Will automatically convert source messages to sink messages via to_sink.
  - serializer – To use.
- Returns:
Stream of serialized messages.
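A matching JSON value serializer under the same protocol assumption:

```python
import json
from confluent_kafka.serialization import Serializer

class JsonSerializer(Serializer):
    """Encode a Python object as JSON bytes; pass None through untouched."""

    def __call__(self, obj, ctx=None):
        return None if obj is None else json.dumps(obj).encode("utf-8")

with_vals = kop.serialize_value("ser_val", both.oks, serializer=JsonSerializer())
```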
```python
serialize(
    step_id: str,
    up: Stream[Union[KafkaSourceMessage[Any, Any], KafkaSinkMessage[Any, Any]]],
    *,
    key_serializer: Serializer,
    val_serializer: Serializer,
)
```
Serialize Kafka messages.
If there is an error serializing, this operator will raise an exception.
- Parameters:
  - step_id – Unique ID.
  - up – Stream. Will automatically convert source messages to sink messages via to_sink.
  - key_serializer – To use.
  - val_serializer – To use.
- Returns:
Stream of serialized messages.
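Putting the pieces together, a full round trip under the same assumptions as the sketches above (placeholder brokers and topics; `confluent_kafka.serialization`-style serde classes, reusing `JsonDeserializer` and `JsonSerializer` from earlier):

```python
from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow
from confluent_kafka.serialization import StringDeserializer, StringSerializer

flow = Dataflow("kafka-roundtrip")
kinp = kop.input("inp", flow, brokers=["localhost:9092"], topics=["in_topic"])
# Decode both key and value, leaving failures on the error stream.
decoded = kop.deserialize(
    "decode",
    kinp.oks,
    key_deserializer=StringDeserializer(),
    val_deserializer=JsonDeserializer(),
)
# Re-encode; serialize returns a plain stream of sink messages,
# ready to hand straight to output.
encoded = kop.serialize(
    "encode",
    decoded.oks,
    key_serializer=StringSerializer(),
    val_serializer=JsonSerializer(),
)
kop.output("out", encoded, brokers=["localhost:9092"], topic="out_topic")
```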