bytewax.connectors.kafka.operators#

Operators for the Kafka source and sink.

Classes#

class KafkaSourceOut#

Split output for KafkaSource.

Returns an object with two attributes:

- .oks: A stream of `KafkaSourceMessage`.
- .errs: A stream of `KafkaError`.

oks: Stream[KafkaSourceMessage[K, V]]#
errs: Stream[KafkaError[K2, V2]]#

Functions#

input(
step_id: str,
flow: Dataflow,
*,
brokers: List[str],
topics: List[str],
tail: bool = True,
starting_offset: int = OFFSET_BEGINNING,
add_config: Optional[Dict[str, str]] = None,
batch_size: int = 1000,
) -> KafkaSourceOut[MaybeStrBytes, MaybeStrBytes, MaybeStrBytes, MaybeStrBytes]#

Use a set of Kafka topics as an input source.

Partitions are the unit of parallelism. Can support exactly-once processing.

Messages are emitted into the dataflow as bytewax.connectors.kafka.KafkaSourceMessage objects.

Args:

step_id: Unique name for this step.

flow: A Dataflow

brokers: List of `host:port` strings of Kafka brokers.

topics: List of topics to consume from.

tail: Whether to wait for new data on this topic when the
    end is initially reached.

starting_offset: Can be either `confluent_kafka.OFFSET_BEGINNING` or
    `confluent_kafka.OFFSET_END`. Defaults to beginning of
    topic.

add_config: Any additional configuration properties. See the `rdkafka`
    [docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
    for options.

batch_size: The maximum number of messages to consume at each poll. Defaults to 1000.
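
For example, a minimal sketch of a dataflow that reads from a single topic; the broker address and topic name are placeholders:

```python
from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop

flow = Dataflow("kafka_in_example")
# Hypothetical broker address and topic name.
kinp = kop.input(
    "kafka-in", flow, brokers=["localhost:9092"], topics=["in_topic"]
)
# kinp.oks is a Stream of KafkaSourceMessage;
# kinp.errs is a Stream of KafkaError.
```
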
output(
step_id: str,
up: Stream[Union[KafkaSourceMessage[MaybeStrBytes, MaybeStrBytes], KafkaSinkMessage[MaybeStrBytes, MaybeStrBytes]]],
*,
brokers: List[str],
topic: str,
add_config: Optional[Dict[str, str]] = None,
) -> None#

Use a single Kafka topic as an output sink.

Items consumed from the dataflow must be either KafkaSourceMessage or KafkaSinkMessage, with both keys and values expressed as str | bytes | None. Default partition routing is used.

Workers are the unit of parallelism.

Can support at-least-once processing. Messages from the resume epoch will be duplicated right after resume.

Args:

step_id: Unique name for this step.

up: A stream of `KafkaSourceMessage | KafkaSinkMessage`

brokers: List of `host:port` strings of Kafka brokers.

topic: Topic to produce to.

add_config: Any additional configuration properties. See the `rdkafka`
    [docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
    for options.
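
As a sketch (reusing the hypothetical broker and topic names from the input example above), consumed messages can be forwarded straight to another topic; a real flow should handle the .errs stream rather than drop it:

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop

flow = Dataflow("kafka_roundtrip_example")
kinp = kop.input(
    "kafka-in", flow, brokers=["localhost:9092"], topics=["in_topic"]
)
# Surface consume errors instead of silently dropping them.
op.inspect("inspect-errs", kinp.errs)
# Forward successfully consumed messages to another topic.
kop.output("kafka-out", kinp.oks, brokers=["localhost:9092"], topic="out_topic")
```
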
deserialize_key(
step_id: str,
up: Stream[KafkaSourceMessage[K, V]],
deserializer: SchemaDeserializer[K, K2],
) -> KafkaSourceOut[K2, V, K, V]#

Deserialize the key of a KafkaSourceMessage using the provided deserializer.

Returns an object with two attributes:

- .oks: A stream of `KafkaSourceMessage`
- .errs: A stream of `KafkaError`
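
A minimal sketch of a custom deserializer; it assumes SchemaDeserializer (with its abstract `de` method) can be imported from `bytewax.connectors.kafka.serde`, and the JSON format is only illustrative:

```python
import json
from typing import Any, Dict, Optional, Union

from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.serde import SchemaDeserializer

MaybeStrBytes = Optional[Union[str, bytes]]

class JsonDeserializer(SchemaDeserializer[MaybeStrBytes, Dict[str, Any]]):
    def de(self, obj: MaybeStrBytes) -> Dict[str, Any]:
        # Raising here routes the message to the .errs stream.
        if obj is None:
            raise ValueError("can't deserialize an empty field")
        if isinstance(obj, bytes):
            obj = obj.decode("utf-8")
        return json.loads(obj)

flow = Dataflow("de_key_example")
kinp = kop.input(
    "kafka-in", flow, brokers=["localhost:9092"], topics=["in_topic"]
)
keyed = kop.deserialize_key("de-key", kinp.oks, JsonDeserializer())
# keyed.oks carries messages whose key is now a dict;
# keyed.errs carries the messages that failed to deserialize.
```
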
deserialize_value(
step_id: str,
up: Stream[KafkaSourceMessage[K, V]],
deserializer: SchemaDeserializer[V, V2],
) -> KafkaSourceOut[K, V2, K, V]#

Deserialize the value of a KafkaSourceMessage using the provided deserializer.

Returns an object with two attributes:

- .oks: A stream of `KafkaSourceMessage`
- .errs: A stream of `KafkaError`
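
Usage is symmetric with deserialize_key; continuing the hypothetical sketch above (JsonDeserializer and kinp are from that example):

```python
parsed = kop.deserialize_value("de-val", kinp.oks, JsonDeserializer())
# parsed.oks carries messages whose value is now a dict.
```
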
deserialize(
step_id: str,
up: Stream[KafkaSourceMessage[K, V]],
*,
key_deserializer: SchemaDeserializer[K, K2],
val_deserializer: SchemaDeserializer[V, V2],
) -> KafkaSourceOut[K2, V2, K, V]#

Deserialize both keys and values with the given deserializers.

Returns an object with two attributes:

- .oks: A stream of `KafkaSourceMessage`
- .errs: A stream of `KafkaError`

A message will be put in .errs even if only one of the deserializers fails.
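
Continuing the same hypothetical sketch, both fields can be deserialized in one step:

```python
parsed = kop.deserialize(
    "de-both",
    kinp.oks,
    key_deserializer=JsonDeserializer(),
    val_deserializer=JsonDeserializer(),
)
# If either deserializer raises, the whole message lands in parsed.errs.
```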

serialize_key(
step_id: str,
up: Stream[Union[KafkaSourceMessage[K, V], KafkaSinkMessage[K, V]]],
serializer: SchemaSerializer[K, K2],
) -> Stream[KafkaSinkMessage[K2, V]]#

Serialize the key of a Kafka message using the provided SchemaSerializer.

It accepts both KafkaSourceMessage and KafkaSinkMessage. Each KafkaSourceMessage is automatically converted to a KafkaSinkMessage, discarding all metadata.

Crashes if any error occurs.
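
A minimal sketch of a custom serializer, again assuming SchemaSerializer (with its abstract `ser` method) is importable from `bytewax.connectors.kafka.serde`; it continues the deserialization sketch above:

```python
import json
from typing import Any, Dict

from bytewax.connectors.kafka.serde import SchemaSerializer

class JsonSerializer(SchemaSerializer[Dict[str, Any], bytes]):
    def ser(self, obj: Dict[str, Any]) -> bytes:
        # Any exception raised here crashes the dataflow.
        return json.dumps(obj).encode("utf-8")

# `parsed` is the KafkaSourceOut from the deserialize sketch above.
ser_keys = kop.serialize_key("ser-key", parsed.oks, JsonSerializer())
```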

serialize_value(
step_id: str,
up: Stream[Union[KafkaSourceMessage[K, V], KafkaSinkMessage[K, V]]],
serializer: SchemaSerializer[V, V2],
) -> Stream[KafkaSinkMessage[K, V2]]#

Serialize the value of a Kafka message using the provided SchemaSerializer.

It accepts both KafkaSourceMessage and KafkaSinkMessage. Each KafkaSourceMessage is automatically converted to a KafkaSinkMessage, discarding all metadata.

Crashes if any error occurs.
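
Usage mirrors serialize_key; continuing the same hypothetical sketch:

```python
ser_vals = kop.serialize_value("ser-val", parsed.oks, JsonSerializer())
```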

serialize(
step_id: str,
up: Stream[Union[KafkaSourceMessage[K, V], KafkaSinkMessage[K, V]]],
*,
key_serializer: SchemaSerializer[K, K2],
val_serializer: SchemaSerializer[V, V2],
) -> Stream[KafkaSinkMessage[K2, V2]]#

Serialize both keys and values with the given serializers.

It accepts both KafkaSourceMessage and KafkaSinkMessage. Each KafkaSourceMessage is automatically converted to a KafkaSinkMessage, discarding all metadata.

Crashes if any error occurs.
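
Continuing the sketches above, a final hypothetical round trip serializes both fields and produces the result (topic name is a placeholder):

```python
ser_msgs = kop.serialize(
    "ser-both",
    parsed.oks,
    key_serializer=JsonSerializer(),
    val_serializer=JsonSerializer(),
)
kop.output("kafka-out", ser_msgs, brokers=["localhost:9092"], topic="out_topic")
```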
