bytewax._bytewax
Internal Bytewax symbols from Rust.
These are re-imported elsewhere in the public bytewax module for general use.
Classes
- class BytewaxTracer
Utility class used to handle tracing.
It keeps a Tokio runtime alive for as long as the object itself exists.
This should only be built via setup_tracing.
- class RecoveryConfig(db_dir, backup_interval=None)
Configuration settings for recovery.
- Parameters:
db_dir (Path) – Local filesystem directory to search for recovery database partitions.
backup_interval (Optional[timedelta]) – Amount of system time to wait to permanently delete a state snapshot after it is no longer needed. You should set this to the interval at which you are backing up the recovery partitions off of the workers into archival storage (e.g. S3). Defaults to zero duration.
- property backup_interval
- property db_dir
- class TracingConfig
Base class for tracing/logging configuration.
It defines what to do with traces and logs emitted by Bytewax.
Use a specific subclass of this to configure where you want the traces to go.
- class JaegerConfig(service_name, endpoint=None, sampling_ratio=1.0)
- Bases: TracingConfig
Configure tracing to send traces to a Jaeger instance.
The endpoint can be configured with the endpoint parameter of this config, or with two environment variables:
OTEL_EXPORTER_JAEGER_AGENT_HOST="127.0.0.1"
OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"
- Parameters:
- property endpoint
- property sampling_ratio
- property service_name
- class OtlpTracingConfig(service_name, url=None, sampling_ratio=1.0)
- Bases: TracingConfig
Send traces to the OpenTelemetry collector.
See the OpenTelemetry collector docs for more info.
Only the gRPC protocol is supported, so make sure to enable it in your OTEL collector configuration.
This is the recommended approach, since it gives the most flexibility in what to do with all the data Bytewax can generate.
- Parameters:
- property sampling_ratio
- property service_name
- property url
Functions
- cli_main(flow, *, workers_per_process=1, process_id=None, addresses=None, epoch_interval=None, recovery_config=None)
- cluster_main(flow, addresses, proc_id, *, epoch_interval=None, recovery_config=None, worker_count_per_proc=1)
Execute a dataflow in the current process as part of a cluster.
This is only used for unit testing. See bytewax.run.
Blocks until execution is complete.
```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, cluster_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("my_df")
s = op.input("inp", flow, TestingSource(range(3)))
op.output("out", s, StdOutSink())
# In a real example, use "host:port" of all other workers.
addresses = []
proc_id = 0
cluster_main(flow, addresses, proc_id)
```

```
0
1
2
```
- Parameters:
flow (Dataflow) – Dataflow to run.
addresses (List[str]) – List of host/port addresses for all processes in this cluster (including this one).
proc_id (int) – Index of this process in cluster; starts from 0.
epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.
recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.
worker_count_per_proc (int) – Number of worker threads to start on each process. Defaults to 1.
- init_db_dir(db_dir, count)
Create and initialize a set of empty recovery partitions.
- run_main(flow, *, epoch_interval=None, recovery_config=None)
Execute a dataflow in the current thread.
Blocks until execution is complete.
This is only used for unit testing. See bytewax.run.

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("my_df")
s = op.input("inp", flow, TestingSource(range(3)))
op.output("out", s, StdOutSink())
run_main(flow)
```

```
0
1
2
```
- Parameters:
flow (Dataflow) – Dataflow to run.
epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.
recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.
- setup_tracing(tracing_config=None, log_level=None)
Set up Bytewax’s internal tracing and logging.
By default, it starts a tracer that logs all ERROR-level messages to stdout.
Note: To make this work, you have to keep a reference to the returned object.
```python
from bytewax.tracing import setup_tracing

tracer = setup_tracing()
```
- Parameters:
tracing_config (TracingConfig) – The specific backend you want to use.
log_level (str) – String of the log level. One of "ERROR", "WARN", "INFO", "DEBUG", "TRACE". Defaults to "ERROR".