bytewax.connectors.kafka.operators#

Operators for the Kafka source and sink.

It’s suggested to import operators like this:

from bytewax.connectors.kafka import operators as kop

And then you can use the operators like this:

from bytewax.dataflow import Dataflow

flow = Dataflow("kafka-in-out")
kafka_input = kop.input("kafka_inp", flow, brokers=[...], topics=[...])
kop.output("kafka-out", kafka_input.oks, brokers=[...], topic="...")

Data#

X: TypeVar#

Type of successfully processed items.

E: TypeVar#

Type of errors.

Classes#

class KafkaOpOut#
Bases: Generic[X, E]

Result streams from Kafka operators.

oks: Stream[X]#

Successfully processed items.

errs: Stream[E]#

Errors.
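
For example, after calling input you would typically continue the dataflow from oks and handle errs explicitly; a minimal sketch, reusing the kafka_input value from the example above and crashing on consumer errors:

import bytewax.operators as op

def raise_err(err):
    # Crash the dataflow on any consumer error instead of silently dropping it.
    raise RuntimeError(f"Kafka consumer error: {err}")

op.map("raise-errs", kafka_input.errs, raise_err)
# Continue the rest of the dataflow from kafka_input.oks.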

Functions#

input(
    step_id: str,
    flow: Dataflow,
    *,
    brokers: List[str],
    topics: List[str],
    tail: bool = True,
    starting_offset: int = OFFSET_BEGINNING,
    add_config: Optional[Dict[str, str]] = None,
    batch_size: int = 1000,
) → KafkaOpOut[SerializedKafkaSourceMessage, KafkaSourceError]#

Consume from Kafka as an input source.

Partitions are the unit of parallelism. Can support exactly-once processing.

Parameters:
  • step_id – Unique ID.

  • flow – Dataflow.

  • brokers – List of host:port strings of Kafka brokers.

  • topics – List of topics to consume from.

  • tail – Whether to wait for new data on the topics when the end is initially reached.

  • starting_offset – Can be either confluent_kafka.OFFSET_BEGINNING or confluent_kafka.OFFSET_END.

  • add_config – Any additional configuration properties. See the rdkafka docs for options.

  • batch_size – Maximum number of messages to consume in each poll.

Returns:

A stream of consumed items and a stream of consumer errors.
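
For example, a minimal dataflow that consumes from a local broker and inspects both result streams; the broker address and topic name are placeholders:

import bytewax.operators as op
from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

flow = Dataflow("kafka-consumer")
# Consume from a local broker; adjust brokers and topics for your cluster.
kinp = kop.input("kafka-in", flow, brokers=["localhost:9092"], topics=["in-topic"])
# Print successfully consumed messages and any consumer errors.
op.inspect("inspect-msgs", kinp.oks)
op.inspect("inspect-errs", kinp.errs)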

output(
    step_id: str,
    up: Stream[Union[SerializedKafkaSourceMessage, SerializedKafkaSinkMessage]],
    *,
    brokers: List[str],
    topic: str,
    add_config: Optional[Dict[str, str]] = None,
) → None#

Produce to Kafka as an output sink.

Default partition routing is used.

Workers are the unit of parallelism.

Can support at-least-once processing. Messages from the resume epoch will be duplicated right after resume.

Parameters:
  • step_id – Unique ID.

  • up – Stream of fully serialized messages. Key and value must be MaybeStrBytes.

  • brokers – List of host:port strings of Kafka brokers.

  • topic – Topic to produce to. If an individual message has its topic field set, that value overrides this setting for that message.

  • add_config – Any additional configuration properties. See the rdkafka docs for options.
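
A minimal sketch of producing messages; it assumes KafkaSinkMessage can be built from key and value, and uses TestingSource only to provide some upstream items. The broker address and topic name are placeholders:

import bytewax.operators as op
from bytewax.connectors.kafka import KafkaSinkMessage, operators as kop
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("kafka-producer")
nums = op.input("nums", flow, TestingSource([1, 2, 3]))
# Build fully serialized messages: key and value must already be str, bytes, or None.
msgs = op.map("to-msg", nums, lambda n: KafkaSinkMessage(key=None, value=str(n).encode()))
kop.output("kafka-out", msgs, brokers=["localhost:9092"], topic="out-topic")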

deserialize_key(
    step_id: str,
    up: Stream[KafkaSourceMessage[MaybeStrBytes, V]],
    deserializer: Deserializer,
) → KafkaOpOut[KafkaSourceMessage[dict, V], KafkaError[MaybeStrBytes, V]]#

Deserialize Kafka message keys.

Parameters:
  • step_id – Unique ID.

  • up – Stream of messages with serialized keys.

  • deserializer – Deserializer to use for the message keys.

Returns:

Stream of deserialized messages and a stream of deserialization errors.
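
A short sketch; kinp is assumed to be the KafkaOpOut returned by input, and key_de an instance of some Deserializer implementation (for example one backed by a schema registry):

# key_de: assumed Deserializer instance; kinp: result of kop.input.
keyed = kop.deserialize_key("de-key", kinp.oks, key_de)
# keyed.oks carries messages whose keys are dicts; keyed.errs carries failures.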

deserialize_value(
    step_id: str,
    up: Stream[KafkaSourceMessage[K, MaybeStrBytes]],
    deserializer: Deserializer,
) → KafkaOpOut[KafkaSourceMessage[K, dict], KafkaError[K, MaybeStrBytes]]#

Deserialize Kafka message values.

Parameters:
  • step_id – Unique ID.

  • up – Stream of messages with serialized values.

  • deserializer – Deserializer to use for the message values.

Returns:

Stream of deserialized messages and a stream of deserialization errors.
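
A short sketch, under the same assumptions as above but for values (val_de is an assumed Deserializer instance):

# val_de: assumed Deserializer instance; kinp: result of kop.input.
valued = kop.deserialize_value("de-val", kinp.oks, val_de)
# valued.oks carries messages whose values are dicts; valued.errs carries failures.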

deserialize(
    step_id: str,
    up: Stream[KafkaSourceMessage[MaybeStrBytes, MaybeStrBytes]],
    *,
    key_deserializer: Deserializer,
    val_deserializer: Deserializer,
) → KafkaOpOut[KafkaSourceMessage[dict, dict], KafkaError[MaybeStrBytes, MaybeStrBytes]]#

Deserialize Kafka messages.

If there is an error on deserializing either key or value, the original message will be attached to the error.

Parameters:
  • step_id – Unique ID.

  • up – Stream of messages with serialized keys and values.

  • key_deserializer – Deserializer to use for the message keys.

  • val_deserializer – Deserializer to use for the message values.

Returns:

Stream of deserialized messages and a stream of deserialization errors.
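
A short sketch; key_de and val_de are assumed Deserializer instances and kinp the KafkaOpOut returned by input:

des = kop.deserialize(
    "de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de
)
# des.oks carries messages with dict keys and values; des.errs carries the
# failed messages with their original serialized key and value attached.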

serialize_key(
    step_id: str,
    up: Stream[Union[KafkaSourceMessage[dict, V], KafkaSinkMessage[dict, V]]],
    serializer: Serializer,
) → Stream[KafkaSinkMessage[bytes, V]]#

Serialize Kafka message keys.

If there is an error on serializing, this operator will raise an exception.

Parameters:
  • step_id – Unique ID.

  • up – Stream of messages with deserialized keys. Source messages are automatically converted to sink messages via to_sink.

  • serializer – Serializer to use for the message keys.

Returns:

Stream of serialized messages.
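
A one-line sketch; msgs is assumed to be a stream of messages with dict keys and key_ser a Serializer instance:

# key_ser: assumed Serializer instance; msgs: upstream messages with dict keys.
keyed_bytes = kop.serialize_key("ser-key", msgs, key_ser)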

serialize_value(
    step_id: str,
    up: Stream[Union[KafkaSourceMessage[K, dict], KafkaSinkMessage[K, dict]]],
    serializer: Serializer,
) → Stream[KafkaSinkMessage[K, bytes]]#

Serialize Kafka message values.

If there is an error on serializing, this operator will raise an exception.

Parameters:
  • step_id – Unique ID.

  • up – Stream of messages with deserialized values. Source messages are automatically converted to sink messages via to_sink.

  • serializer – Serializer to use for the message values.

Returns:

Stream of serialized messages.
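
A one-line sketch; msgs is assumed to be a stream of messages with dict values and val_ser a Serializer instance:

# val_ser: assumed Serializer instance; msgs: upstream messages with dict values.
valued_bytes = kop.serialize_value("ser-val", msgs, val_ser)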

serialize(
    step_id: str,
    up: Stream[Union[KafkaSourceMessage[dict, dict], KafkaSinkMessage[dict, dict]]],
    *,
    key_serializer: Serializer,
    val_serializer: Serializer,
) → Stream[KafkaSinkMessage[bytes, bytes]]#

Serialize Kafka messages.

If there is an error on serializing, this operator will raise an exception.

Parameters:
  • step_id – Unique ID.

  • up – Stream of messages with deserialized keys and values. Source messages are automatically converted to sink messages via to_sink.

  • key_serializer – Serializer to use for the message keys.

  • val_serializer – Serializer to use for the message values.

Returns:

Stream of serialized messages.
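
Putting the pieces together, a hedged end-to-end sketch that consumes, deserializes, re-serializes, and produces; key_de, val_de, key_ser, and val_ser are assumed (de)serializer instances and the broker and topic names are placeholders:

from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

flow = Dataflow("kafka-roundtrip")
kinp = kop.input("kafka-in", flow, brokers=["localhost:9092"], topics=["in-topic"])
des = kop.deserialize(
    "de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de
)
ser = kop.serialize(
    "ser", des.oks, key_serializer=key_ser, val_serializer=val_ser
)
kop.output("kafka-out", ser, brokers=["localhost:9092"], topic="out-topic")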
