bytewax._bytewax#

Internal Bytewax symbols from Rust.

These are re-imported elsewhere in the public bytewax module for use.

Classes#

class BytewaxTracer#

Utility class used to handle tracing.

It holds a Tokio runtime that stays alive as long as the struct itself.

This should only be built via setup_tracing.

class RecoveryConfig(db_dir, backup_interval=None)#

Configuration settings for recovery.

Parameters:
  • db_dir (Path) – Local filesystem directory to search for recovery database partitions.

  • backup_interval (Optional[timedelta]) – Amount of system time to wait to permanently delete a state snapshot after it is no longer needed. You should set this to the interval at which you are backing up the recovery partitions off of the workers into archival storage (e.g. S3). Defaults to zero duration.

Initialization

property backup_interval#
property db_dir#
class TracingConfig#

Base class for tracing/logging configuration.

This defines what to do with traces and logs emitted by Bytewax.

Use a specific subclass of this to configure where you want the traces to go.

Initialization

class JaegerConfig(service_name, endpoint=None, sampling_ratio=1.0)#
Bases: TracingConfig

Configure tracing to send traces to a Jaeger instance.

The endpoint can be configured with the parameter passed to this config, or with two environment variables:

OTEL_EXPORTER_JAEGER_AGENT_HOST="127.0.0.1"
OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"
Parameters:
  • service_name (str) – Identifies this dataflow in Jaeger.

  • endpoint (str) – Connection info. Takes precedence over env vars. Defaults to "127.0.0.1:6831".

  • sampling_ratio (float) – Fraction of traces to send between 0.0 and 1.0.

Initialization

property endpoint#
property sampling_ratio#
property service_name#
class OtlpTracingConfig(service_name, url=None, sampling_ratio=1.0)#
Bases: TracingConfig

Send traces to the OpenTelemetry collector.

See OpenTelemetry collector docs for more info.

Only the gRPC protocol is supported, so make sure it is enabled in your OpenTelemetry Collector configuration.

This is the recommended approach since it allows the most flexibility in what to do with all the data Bytewax can generate.

Parameters:
  • service_name (str) – Identifies this dataflow in OTLP.

  • url (str) – Connection info. Defaults to "grpc://127.0.0.1:4317".

  • sampling_ratio (float) – Fraction of traces to send between 0.0 and 1.0.

Initialization

property sampling_ratio#
property service_name#
property url#

Functions#

cli_main(
flow,
*,
workers_per_process=1,
process_id=None,
addresses=None,
epoch_interval=None,
recovery_config=None,
)#
cluster_main(
flow,
addresses,
proc_id,
*,
epoch_interval=None,
recovery_config=None,
worker_count_per_proc=1,
)#

Execute a dataflow in the current process as part of a cluster.

This is only used for unit testing. See bytewax.run.

Blocks until execution is complete.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, cluster_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("my_df")
s = op.input("inp", flow, TestingSource(range(3)))
op.output("out", s, StdOutSink())

# In a real example, use "host:port" of all other workers.
addresses = []
proc_id = 0
cluster_main(flow, addresses, proc_id)
0
1
2
Parameters:
  • flow (Dataflow) – Dataflow to run.

  • addresses (List[str]) – List of host/port addresses for all processes in this cluster (including this one).

  • proc_id (int) – Index of this process in cluster; starts from 0.

  • epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.

  • recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.

  • worker_count_per_proc (int) – Number of worker threads to start on each process. Defaults to 1.

init_db_dir(db_dir, count)#

Create and init a set of empty recovery partitions.

Parameters:
  • db_dir (Path) – Local directory to create partitions in.

  • count (int) – Number of partitions to create.

run_main(flow, *, epoch_interval=None, recovery_config=None)#

Execute a dataflow in the current thread.

Blocks until execution is complete.

This is only used for unit testing. See bytewax.run.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink
flow = Dataflow("my_df")
s = op.input("inp", flow, TestingSource(range(3)))
op.output("out", s, StdOutSink())

run_main(flow)
0
1
2
Parameters:
  • flow (Dataflow) – Dataflow to run.

  • epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.

  • recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.

setup_tracing(tracing_config=None, log_level=None)#

Set up Bytewax’s internal tracing and logging.

By default it starts a tracer that logs all ERROR-level messages to stdout.

Note: for this to work, you must keep a reference to the returned object.

from bytewax.tracing import setup_tracing

tracer = setup_tracing()
Parameters:
  • tracing_config (TracingConfig) – The specific backend you want to use.

  • log_level (str) – String of the log level. One of "ERROR", "WARN", "INFO", "DEBUG", "TRACE". Defaults to "ERROR".
