bytewax.connectors.kafka.operators
Operators for the kafka source and sink.
Classes
- class KafkaSourceOut
Split output for `KafkaSource`.
Returns an object with two attributes:
- .oks: A stream of `KafkaSourceMessage`.
- .errs: A stream of `KafkaError`.
- oks: Stream[KafkaSourceMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V]]
- errs: Stream[KafkaError[bytewax.connectors.kafka._types.K2, bytewax.connectors.kafka._types.V2]]
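Downstream steps usually treat the two attributes as separate streams: good messages on one side, errors on the other. The plain-Python sketch below models that split with lists so it runs without a broker or a bytewax install. `Msg`, `Err`, `SplitOut` and `split` are hypothetical stand-ins for illustration, not bytewax classes.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class Msg:
    # Hypothetical stand-in for KafkaSourceMessage.
    key: bytes
    value: bytes


@dataclass
class Err:
    # Hypothetical stand-in for KafkaError.
    reason: str


@dataclass
class SplitOut:
    # Mirrors the shape of KafkaSourceOut: two parallel collections.
    oks: List[Msg] = field(default_factory=list)
    errs: List[Err] = field(default_factory=list)


def split(items: List[Union[Msg, Err]]) -> SplitOut:
    """Route each item to .oks or .errs based on its type."""
    out = SplitOut()
    for item in items:
        if isinstance(item, Err):
            out.errs.append(item)
        else:
            out.oks.append(item)
    return out


out = split([Msg(b"k1", b"v1"), Err("broker down"), Msg(b"k2", b"v2")])
print(len(out.oks), len(out.errs))  # 2 1
```

In a real dataflow you would attach different operators to `.oks` and `.errs` (for example, logging the errors) rather than handling both in one step.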
Functions
- input(
- step_id: str,
- flow: Dataflow,
- *,
- brokers: List[str],
- topics: List[str],
- tail: bool = True,
- starting_offset: int = OFFSET_BEGINNING,
- add_config: Optional[Dict[str, str]] = None,
- batch_size: int = 1000,
Use a set of Kafka topics as an input source.
Partitions are the unit of parallelism. This source can support exactly-once processing.
Messages are emitted into the dataflow as `bytewax.connectors.kafka.KafkaSourceMessage` objects.
Args:
- step_id: Unique name for this step.
- flow: A Dataflow.
- brokers: List of `host:port` strings of Kafka brokers.
- topics: List of topics to consume from.
- tail: Whether to wait for new data on these topics when the end is initially reached.
- starting_offset: Can be either `confluent_kafka.OFFSET_BEGINNING` or `confluent_kafka.OFFSET_END`. Defaults to the beginning of the topic.
- add_config: Any additional configuration properties. See the `rdkafka` [docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) for options.
- batch_size: Maximum number of messages to consume at each poll. Defaults to 1000.
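How `starting_offset`, `tail` and `batch_size` interact can be sketched with a toy model of one partition. This is a plain-Python illustration, not the connector's implementation: `poll_partition` is hypothetical, it only models `tail=False`, and the two offset constants are redefined locally with the values `confluent_kafka` uses (-2 and -1) so no install is needed.

```python
from typing import List

# Logical offsets; values mirror confluent_kafka.OFFSET_BEGINNING (-2)
# and confluent_kafka.OFFSET_END (-1), redefined so the sketch is standalone.
OFFSET_BEGINNING = -2
OFFSET_END = -1


def poll_partition(log: List[str], starting_offset: int, batch_size: int) -> List[List[str]]:
    """Hypothetical model of reading one partition with tail=False.

    Each poll returns at most `batch_size` messages, and reading stops at the
    end of the partition. With tail=True the real source would keep waiting.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if starting_offset == OFFSET_BEGINNING:
        pos = 0
    elif starting_offset == OFFSET_END:
        pos = len(log)
    else:
        raise ValueError("expected OFFSET_BEGINNING or OFFSET_END")
    batches = []
    while pos < len(log):
        batch = log[pos : pos + batch_size]
        batches.append(batch)
        pos += len(batch)
    return batches


log = ["m0", "m1", "m2", "m3", "m4"]
print(poll_partition(log, OFFSET_BEGINNING, batch_size=2))
# [['m0', 'm1'], ['m2', 'm3'], ['m4']]
print(poll_partition(log, OFFSET_END, batch_size=2))
# []
```

The second call shows why `OFFSET_END` is normally paired with `tail=True`: starting at the end without waiting yields nothing.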
- output(
- step_id: str,
- up: Stream[Union[KafkaSourceMessage[bytewax.connectors.kafka._types.MaybeStrBytes, bytewax.connectors.kafka._types.MaybeStrBytes], KafkaSinkMessage[bytewax.connectors.kafka._types.MaybeStrBytes, bytewax.connectors.kafka._types.MaybeStrBytes]]],
- *,
- brokers: List[str],
- topic: str,
- add_config: Optional[Dict[str, str]] = None,
Use a single Kafka topic as an output sink.
Items consumed from the dataflow must be either a `KafkaSourceMessage` or a `KafkaSinkMessage`, with both key and value expressed as `str | bytes | None`. Default partition routing is used.
Workers are the unit of parallelism.
This sink can support at-least-once processing: messages from the resume epoch will be duplicated right after resume.
Args:
- step_id: Unique name for this step.
- up: A stream of `KafkaSourceMessage | KafkaSinkMessage`.
- brokers: List of `host:port` strings of Kafka brokers.
- topic: Topic to produce to.
- add_config: Any additional configuration properties. See the `rdkafka` [docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) for options.
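The duplication note can be reasoned about with a toy model: a snapshot commits the completed epochs, the process crashes partway through writing an epoch, and on resume that epoch is replayed in full. `run_with_crash` is hypothetical and deliberately simplifies bytewax recovery to a single worker whose resume epoch is the one that crashed.

```python
from typing import Dict, List


def run_with_crash(epochs: Dict[int, List[str]], crash_epoch: int, written_before_crash: int) -> List[str]:
    """Hypothetical model of an at-least-once sink across a crash and resume.

    Epochs before `crash_epoch` are assumed committed by a snapshot. The run
    crashes after writing `written_before_crash` items of `crash_epoch`; on
    resume, that whole epoch is replayed, so those items are written twice.
    """
    topic: List[str] = []
    # First run: completed epochs, then a partial write of the crash epoch.
    for epoch in sorted(epochs):
        if epoch < crash_epoch:
            topic.extend(epochs[epoch])
    topic.extend(epochs[crash_epoch][:written_before_crash])
    # Resume: restart from the resume epoch (the crashed one) onward.
    for epoch in sorted(epochs):
        if epoch >= crash_epoch:
            topic.extend(epochs[epoch])
    return topic


epochs = {1: ["a", "b"], 2: ["c", "d"], 3: ["e"]}
topic = run_with_crash(epochs, crash_epoch=2, written_before_crash=1)
print(topic)  # ['a', 'b', 'c', 'c', 'd', 'e']
```

Only items written during the resume epoch before the crash appear twice; earlier epochs are not repeated.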
- deserialize_key(
- step_id: str,
- up: Stream[KafkaSourceMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V]],
- deserializer: SchemaDeserializer[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.K2],
Deserialize the key of a KafkaSourceMessage using the provided deserializer.
Returns an object with two attributes:
- .oks: A stream of `KafkaSourceMessage`.
- .errs: A stream of `KafkaError`.
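The routing of `deserialize_key` can be sketched in plain Python. This assumes a deserializer is an object with a single `de(data)` method, which is our assumption about the shape of `SchemaDeserializer`; `SourceMsg`, `DeError`, `JsonKeyDeserializer` and this `deserialize_key` are hypothetical stand-ins, not the bytewax operator.

```python
import json
from dataclasses import dataclass, replace
from typing import Any, List, Tuple


@dataclass(frozen=True)
class SourceMsg:
    # Hypothetical stand-in for KafkaSourceMessage; metadata trimmed to `offset`.
    key: Any
    value: Any
    offset: int


@dataclass(frozen=True)
class DeError:
    # Hypothetical stand-in for KafkaError: failure text plus the original message.
    error: str
    msg: SourceMsg


class JsonKeyDeserializer:
    """Hypothetical deserializer with a single `de` method (bytes -> object)."""

    def de(self, data: bytes) -> Any:
        return json.loads(data)


def deserialize_key(msgs: List[SourceMsg], deserializer) -> Tuple[List[SourceMsg], List[DeError]]:
    """Split messages into (oks, errs) after deserializing only the key."""
    oks: List[SourceMsg] = []
    errs: List[DeError] = []
    for msg in msgs:
        try:
            # Only the key changes; value and metadata pass through untouched.
            oks.append(replace(msg, key=deserializer.de(msg.key)))
        except Exception as ex:  # in this model any failure is routed to errs
            errs.append(DeError(str(ex), msg))
    return oks, errs


msgs = [SourceMsg(b'{"id": 1}', b"v1", 0), SourceMsg(b"not json", b"v2", 1)]
oks, errs = deserialize_key(msgs, JsonKeyDeserializer())
print(oks[0].key)          # {'id': 1}
print(errs[0].msg.offset)  # 1
```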
- deserialize_value(
- step_id: str,
- up: Stream[KafkaSourceMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V]],
- deserializer: SchemaDeserializer[bytewax.connectors.kafka._types.V, bytewax.connectors.kafka._types.V2],
Deserialize the value of a KafkaSourceMessage using the provided deserializer.
Returns an object with two attributes:
- .oks: A stream of `KafkaSourceMessage`.
- .errs: A stream of `KafkaError`.
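For values, a common failure is bytes that are not valid in the expected encoding. The sketch below is a plain-Python model, assuming a deserializer exposes one `de(data)` method (our assumption about `SchemaDeserializer`); `SourceMsg`, `Utf8Deserializer` and this `deserialize_value` are hypothetical. It keeps the untouched original message next to the exception on the error side.

```python
from dataclasses import dataclass, replace
from typing import Any, List, Tuple


@dataclass(frozen=True)
class SourceMsg:
    # Hypothetical stand-in for KafkaSourceMessage.
    key: Any
    value: Any
    offset: int


class Utf8Deserializer:
    """Hypothetical deserializer: decodes UTF-8 bytes into str."""

    def de(self, data: bytes) -> str:
        return data.decode("utf-8")


def deserialize_value(msgs: List[SourceMsg], deserializer) -> Tuple[list, list]:
    """Split messages into (oks, errs) after deserializing only the value."""
    oks, errs = [], []
    for msg in msgs:
        try:
            oks.append(replace(msg, value=deserializer.de(msg.value)))
        except Exception as ex:
            # Keep the untouched original so it can be inspected or re-routed.
            errs.append((msg, ex))
    return oks, errs


msgs = [SourceMsg(b"k", b"caf\xc3\xa9", 0), SourceMsg(b"k", b"\xff\xfe", 1)]
oks, errs = deserialize_value(msgs, Utf8Deserializer())
print(oks[0].value)               # café
print(type(errs[0][1]).__name__)  # UnicodeDecodeError
```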
- deserialize(
- step_id: str,
- up: Stream[KafkaSourceMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V]],
- *,
- key_deserializer: SchemaDeserializer[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.K2],
- val_deserializer: SchemaDeserializer[bytewax.connectors.kafka._types.V, bytewax.connectors.kafka._types.V2],
Deserialize both keys and values with the given deserializers.
Returns an object with two attributes:
- .oks: A stream of `KafkaSourceMessage`.
- .errs: A stream of `KafkaError`.
A message is put in .errs even if only one of the deserializers fails.
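The "either fails" rule can be modeled in plain Python. As before, this assumes each deserializer has one `de(data)` method (an assumption about `SchemaDeserializer`), and all names here are hypothetical stand-ins. The second message has a valid key but a broken value, so the whole original message goes to the error side.

```python
import json
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class SourceMsg:
    # Hypothetical stand-in for KafkaSourceMessage.
    key: Any
    value: Any
    offset: int


class Utf8Deserializer:
    """Hypothetical key deserializer: UTF-8 bytes -> str."""

    def de(self, data: bytes) -> str:
        return data.decode("utf-8")


class JsonDeserializer:
    """Hypothetical value deserializer: JSON bytes -> object."""

    def de(self, data: bytes) -> Any:
        return json.loads(data)


def deserialize(msgs, *, key_deserializer, val_deserializer):
    oks, errs = [], []
    for msg in msgs:
        try:
            # Both must succeed; a failure in either sends the original to errs.
            key = key_deserializer.de(msg.key)
            value = val_deserializer.de(msg.value)
        except Exception as ex:
            errs.append((msg, ex))
        else:
            oks.append(replace(msg, key=key, value=value))
    return oks, errs


msgs = [
    SourceMsg(b"user-1", b'{"clicks": 3}', 0),
    SourceMsg(b"user-2", b"{broken", 1),  # key is fine, value is not
]
oks, errs = deserialize(msgs, key_deserializer=Utf8Deserializer(), val_deserializer=JsonDeserializer())
```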
- serialize_key(
- step_id: str,
- up: Stream[Union[KafkaSourceMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V], KafkaSinkMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V]]],
- serializer: SchemaSerializer[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.K2],
Serialize the key of a Kafka message using the provided `SchemaSerializer`.
It accepts both `KafkaSourceMessage` and `KafkaSinkMessage`. A `KafkaSourceMessage` is automatically converted to a `KafkaSinkMessage`, discarding all metadata.
Crashes if any error occurs.
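The conversion step can be sketched in plain Python: a source message keeps only its key and value, and its metadata is dropped before the key is serialized. This assumes a serializer is an object with one `ser(obj)` method, our assumption about the shape of `SchemaSerializer`; `SourceMsg`, `SinkMsg`, `JsonSerializer`, `to_sink` and this `serialize_key` are hypothetical.

```python
import json
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class SourceMsg:
    # Hypothetical stand-in for KafkaSourceMessage; topic/offset are metadata.
    key: Any
    value: Any
    topic: str
    offset: int


@dataclass(frozen=True)
class SinkMsg:
    # Hypothetical stand-in for KafkaSinkMessage: just what a producer needs.
    key: Any
    value: Any


class JsonSerializer:
    """Hypothetical serializer with a single `ser` method (object -> bytes)."""

    def ser(self, obj: Any) -> bytes:
        return json.dumps(obj).encode("utf-8")


def to_sink(msg: Union[SourceMsg, SinkMsg]) -> SinkMsg:
    # Source messages keep key/value only; all metadata is discarded.
    if isinstance(msg, SourceMsg):
        return SinkMsg(msg.key, msg.value)
    return msg


def serialize_key(msgs, serializer):
    # No try/except: a serializer error propagates and stops the run ("crash").
    return [SinkMsg(serializer.ser(m.key), m.value) for m in map(to_sink, msgs)]


out = serialize_key([SourceMsg({"id": 7}, b"v", "clicks", 42), SinkMsg("k", b"w")], JsonSerializer())
print(out)
```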
- serialize_value(
- step_id: str,
- up: Stream[Union[KafkaSourceMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V], KafkaSinkMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V]]],
- serializer: SchemaSerializer[bytewax.connectors.kafka._types.V, bytewax.connectors.kafka._types.V2],
Serialize the value of a Kafka message using the provided `SchemaSerializer`.
It accepts both `KafkaSourceMessage` and `KafkaSinkMessage`. A `KafkaSourceMessage` is automatically converted to a `KafkaSinkMessage`, discarding all metadata.
Crashes if any error occurs.
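Unlike the deserialize operators, the serialize operators have no error stream. The plain-Python sketch below models that contrast: a value the serializer cannot handle raises instead of being routed aside. It assumes a serializer with one `ser(obj)` method (an assumption about `SchemaSerializer`); `SinkMsg`, `JsonSerializer` and this `serialize_value` are hypothetical.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SinkMsg:
    # Hypothetical stand-in for KafkaSinkMessage.
    key: Any
    value: Any


class JsonSerializer:
    """Hypothetical serializer with a single `ser` method (object -> bytes)."""

    def ser(self, obj: Any) -> bytes:
        return json.dumps(obj).encode("utf-8")


def serialize_value(msgs, serializer):
    # No errs stream in this model: any exception propagates to the caller.
    return [SinkMsg(m.key, serializer.ser(m.value)) for m in msgs]


ok = serialize_value([SinkMsg(b"k", {"n": 1})], JsonSerializer())

try:
    serialize_value([SinkMsg(b"k", {1, 2})], JsonSerializer())  # sets are not JSON
    crashed = False
except TypeError as ex:
    crashed = True
    print("would crash the dataflow:", ex)
```

If some values may legitimately fail, validating or converting them in an earlier step keeps the failure from stopping the whole dataflow.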
- serialize(
- step_id: str,
- up: Stream[Union[KafkaSourceMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V], KafkaSinkMessage[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.V]]],
- *,
- key_serializer: SchemaSerializer[bytewax.connectors.kafka._types.K, bytewax.connectors.kafka._types.K2],
- val_serializer: SchemaSerializer[bytewax.connectors.kafka._types.V, bytewax.connectors.kafka._types.V2],
Serialize both keys and values with the given serializers.
It accepts both `KafkaSourceMessage` and `KafkaSinkMessage`. A `KafkaSourceMessage` is automatically converted to a `KafkaSinkMessage`, discarding all metadata.
Crashes if any error occurs.
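A combined sketch, again in plain Python with hypothetical names, applies a different serializer to the key and to the value after converting a source message. It assumes each serializer exposes one `ser(obj)` method (an assumption about `SchemaSerializer`). The result is a list of sink messages with `bytes` keys and values, which is the kind of item the `output` operator is documented to accept.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SourceMsg:
    # Hypothetical stand-in for KafkaSourceMessage.
    key: Any
    value: Any
    offset: int


@dataclass(frozen=True)
class SinkMsg:
    # Hypothetical stand-in for KafkaSinkMessage.
    key: Any
    value: Any


class Utf8Serializer:
    """Hypothetical key serializer: str -> UTF-8 bytes."""

    def ser(self, obj: str) -> bytes:
        return obj.encode("utf-8")


class JsonSerializer:
    """Hypothetical value serializer: object -> JSON bytes (sorted keys)."""

    def ser(self, obj: Any) -> bytes:
        return json.dumps(obj, sort_keys=True).encode("utf-8")


def serialize(msgs, *, key_serializer, val_serializer):
    out = []
    for m in msgs:
        if isinstance(m, SourceMsg):
            m = SinkMsg(m.key, m.value)  # metadata such as offset is dropped
        # Errors from either serializer propagate (the "crash" behavior).
        out.append(SinkMsg(key_serializer.ser(m.key), val_serializer.ser(m.value)))
    return out


out = serialize(
    [SourceMsg("user-1", {"clicks": 3, "at": "09:00"}, 17)],
    key_serializer=Utf8Serializer(),
    val_serializer=JsonSerializer(),
)
```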