influxdb

Sink and source connectors for InfluxDB.

Data

logger

Classes

class InfluxDBSource(
interval: timedelta,
host: str,
database: str,
token: str,
measurement: str,
org: Optional[str] = None,
start_time: Union[datetime, None] = None,
)
Bases:

A source for reading data from an InfluxDB instance in fixed partitions.

Initialization

InfluxDB source connector.

Manages the connection to an InfluxDB instance and reads data at fixed intervals. All data flows through a single partition (singleton). The source supports resuming from saved state based on the last read time.

Args:
interval (timedelta): The time interval between polls.
host (str): The InfluxDB host URL.
database (str): The name of the InfluxDB database.
token (str): The authentication token for InfluxDB.
measurement (str): The InfluxDB measurement to query.
org (Optional[str]): The InfluxDB organization name.
start_time (Union[datetime, None], optional): The starting point for the data query. If None, it defaults to the current time minus the interval.

Methods:
list_parts(): Lists the partitions managed by this source.
build_part(step_id, for_part, resume_state): Builds and returns a specific partition.
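The start_time default described in Args can be sketched with the standard library alone. This is an illustration, not the connector's code: the helper name resolve_start_time and its now parameter are hypothetical, and taking the current time in UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def resolve_start_time(
    interval: timedelta,
    start_time: Optional[datetime] = None,
    now: Optional[datetime] = None,
) -> datetime:
    """Return start_time, or the current time minus interval when it is None.

    Hypothetical helper for illustration; not part of the connector.
    """
    if start_time is not None:
        return start_time
    # Assumption: "current time" is taken in UTC; the connector may differ.
    current = now if now is not None else datetime.now(timezone.utc)
    return current - interval
```

With a 5-minute interval and no start_time, the first query window would begin 5 minutes before the moment the source starts.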

list_parts() -> List[str]

List the partitions managed by this source.

This source is limited to a single client connection and query, so it manages one partition.

Returns: List[str]: A list containing the partition identifier(s).

build_part(
step_id: str,
for_part: str,
resume_state: Union[Any, None],
) -> influxdb._InfluxDBSourcePartition

Build and return a specific partition for the data source.

This method creates an _InfluxDBSourcePartition instance that handles the actual data retrieval. It resumes from the last known state if one is available. Otherwise it starts from the specified start_time, or from the current time minus the interval if start_time is None.

Args:
step_id (str): The ID of the current step in the dataflow.
for_part (str): The partition identifier, should always be “singleton”.
resume_state (Union[Any, None]): The state to resume from, typically containing the last read time.

Returns: _InfluxDBSourcePartition: An instance of the partition that handles data retrieval.
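The order in which build_part chooses a starting point (resume state first, then start_time, then the interval) might look like the sketch below. The function name pick_query_start, the dict shape of the resume state, and the key "last_time" are all assumptions made for illustration; the connector's actual state format is not documented here.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def pick_query_start(
    resume_state: Optional[Any],
    start_time: Optional[datetime],
    interval: timedelta,
    now: datetime,
) -> datetime:
    """Choose where the next query should start (illustrative only)."""
    # 1. A saved resume state takes priority.
    #    Assumption: the state is a dict with a hypothetical "last_time" key.
    if isinstance(resume_state, dict) and "last_time" in resume_state:
        return resume_state["last_time"]
    # 2. Otherwise use the explicitly configured start_time, if any.
    if start_time is not None:
        return start_time
    # 3. Fall back to one interval before the current time.
    return now - interval
```

Keeping the resume state ahead of start_time means a restarted dataflow would continue from where it left off rather than re-reading from the configured start.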

class InfluxDBSink(
host: str,
database: str,
token: str,
org: str,
write_precision: str = DEFAULT_WRITE_PRECISION,
**kwargs: Optional[Any],
)
Bases:

A dynamic sink for writing data to InfluxDB.

Initialization

InfluxDB sink connector.

This class creates the sink partitions that perform the actual data writing to InfluxDB. It is used within a Bytewax dataflow to send data to InfluxDB and can be used with multiple workers.

Args:
host (str): The InfluxDB host URL.
database (str): The name of the InfluxDB database.
token (str): The authentication token for InfluxDB.
org (str): The InfluxDB organization name.
write_precision (str, optional): The precision for writing points. Defaults to the client's default precision.
**kwargs: Additional parameters passed to the InfluxDB client.

Methods: build(step_id, worker_index, worker_count): Builds and returns an _InfluxDBSinkPartition.

build(
step_id: str,
worker_index: int,
worker_count: int,
) -> influxdb._InfluxDBSinkPartition

Build the individual sink partitions.

Creates an _InfluxDBSinkPartition on each worker.
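The one-partition-per-worker pattern can be illustrated with stand-in classes. SketchSink and SketchSinkPartition are hypothetical and use only the standard library; they mimic the shape of build(step_id, worker_index, worker_count) but do not connect to InfluxDB, and the step name "influx_out" is made up.

```python
from typing import Any, List


class SketchSinkPartition:
    """Hypothetical stand-in for a per-worker sink partition."""

    def __init__(self, worker_index: int) -> None:
        self.worker_index = worker_index
        self.written: List[Any] = []

    def write_batch(self, items: List[Any]) -> None:
        # A real partition would send these points to InfluxDB;
        # here they are only recorded in memory.
        self.written.extend(items)


class SketchSink:
    """Hypothetical stand-in for a sink that builds one partition per worker."""

    def build(
        self, step_id: str, worker_index: int, worker_count: int
    ) -> SketchSinkPartition:
        return SketchSinkPartition(worker_index)


# Simulate two workers, each building and using its own partition.
sink = SketchSink()
partitions = [sink.build("influx_out", i, 2) for i in range(2)]
partitions[0].write_batch(["point-a"])
partitions[1].write_batch(["point-b", "point-c"])
```

Because each worker holds its own partition, writes from different workers do not share state in this sketch.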
