bytewax._bytewax
Internal Bytewax symbols from Rust.
These are re-imported elsewhere in the public bytewax module for use.
Classes
- class BytewaxTracer
Utility class used to handle tracing.
It keeps a tokio runtime that is alive as long as the struct itself.
This should only be built via setup_tracing.
- class ClockConfig
Base class for a clock config.
This describes how a windowing operator should determine the current time and the time for each element.
Use a specific subclass of this that matches the time definition you’d like to use.
- class RecoveryConfig(db_dir, backup_interval=None, snapshot_serde=None)
Configuration settings for recovery.
- Parameters:
db_dir (Path) – Local filesystem directory to search for recovery database partitions.
backup_interval (Optional[timedelta]) – Amount of system time to wait to permanently delete a state snapshot after it is no longer needed. You should set this to the interval at which you are backing up the recovery partitions off of the workers into archival storage (e.g. S3). Defaults to zero duration.
snapshot_serde (Optional[Serde]) – Format to use when encoding state snapshot objects in the recovery partitions. Defaults to JsonPickleSerde.
- property backup_interval
- property db_dir
- property snapshot_serde
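To make the backup_interval rule concrete, here is a minimal sketch of the deletion check it describes. The helper snapshot_deletable and its arguments are hypothetical and not part of the Bytewax API; it only restates the documented behavior: a snapshot that is no longer needed is kept for backup_interval of system time (zero by default) before it may be deleted.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def snapshot_deletable(
    no_longer_needed_at: datetime,
    now: datetime,
    backup_interval: Optional[timedelta] = None,
) -> bool:
    """Hypothetical helper restating the documented backup_interval rule.

    A snapshot that stopped being needed at `no_longer_needed_at` may only be
    deleted once `backup_interval` of system time has passed. `None` is
    treated as the documented default of zero duration.
    """
    if backup_interval is None:
        backup_interval = timedelta(0)
    return now - no_longer_needed_at >= backup_interval


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(snapshot_deletable(t0, t0))  # True: zero default, deletable right away
print(snapshot_deletable(t0, t0 + timedelta(minutes=30), timedelta(hours=1)))  # False
print(snapshot_deletable(t0, t0 + timedelta(hours=1), timedelta(hours=1)))  # True
```

Setting backup_interval to your backup cadence gives each backup run a chance to copy the partitions to archival storage before an outdated snapshot disappears.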
- class TracingConfig
Base class for tracing/logging configuration.
This defines what to do with traces and logs emitted by Bytewax.
Use a specific subclass of this to configure where you want the traces to go.
- class WindowConfig
Base class for a windower config.
This describes the type of windows you would like.
Use a specific subclass of this that matches the window definition you’d like to use.
- class WindowMetadata(open_time, close_time)
Contains information about a window.
- property close_time
The time that the window closes.
For some window types like SessionWindow, this value can change as new data is received.
- property open_time
The time that the window starts.
- class EventClockConfig(dt_getter, wait_for_system_duration)
- Bases:
Use a getter function to look up the timestamp for each item.
The watermark is the largest item timestamp seen thus far, minus the waiting duration, plus the system time duration that has elapsed since that item was seen. In effect, items are processed correctly as long as they are out of order by no more than the waiting duration in system time.
If the dataflow has no more input, all windows are closed.
- Parameters:
- Returns:
Config object. Pass this as the clock_config parameter to your windowing operator.
- property dt_getter
- property wait_for_system_duration
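The watermark rule for EventClockConfig can be written out as plain arithmetic. This is a conceptual sketch, not Bytewax's implementation: event_watermark is a hypothetical helper, and the dict items with a "time" key are an assumed data shape used only to show what a dt_getter might look like.

```python
from datetime import datetime, timedelta, timezone


def event_watermark(
    max_item_ts: datetime,
    wait_for_system_duration: timedelta,
    seen_at_system: datetime,
    now_system: datetime,
) -> datetime:
    """Hypothetical helper: watermark as described for EventClockConfig.

    max_item_ts: largest item timestamp seen so far.
    seen_at_system: system time at which that item was seen.
    now_system: current system time.
    """
    elapsed = now_system - seen_at_system
    return max_item_ts - wait_for_system_duration + elapsed


utc = timezone.utc
# Assumed item shape: each item carries its event timestamp under "time".
items = [
    {"time": datetime(2024, 1, 1, 12, 0, 0, tzinfo=utc)},
    {"time": datetime(2024, 1, 1, 12, 0, 3, tzinfo=utc)},
]


def dt_getter(item):
    # Plays the role of the dt_getter argument for this assumed item shape.
    return item["time"]


max_ts = max(dt_getter(item) for item in items)
seen_at = datetime(2024, 6, 1, 9, 0, 0, tzinfo=utc)  # system time that item arrived
now = seen_at + timedelta(seconds=5)

wm = event_watermark(max_ts, timedelta(seconds=10), seen_at, now)
print(wm.time())  # 11:59:58  (12:00:03 - 10 s + 5 s)
```

The elapsed-time term lets the watermark keep advancing with system time even when no new items arrive.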
- class SystemClockConfig
- Bases:
Use the current system time as the timestamp for each item.
The watermark is also the current system time.
If the dataflow has no more input, all windows are closed.
- Returns:
Config object. Pass this as the clock_config parameter to your windowing operator.
- class JaegerConfig(service_name, endpoint=None, sampling_ratio=1.0)
- Bases:
Configure tracing to send traces to a Jaeger instance.
The endpoint can be configured with the parameter passed to this config, or with two environment variables:
OTEL_EXPORTER_JAEGER_AGENT_HOST="127.0.0.1" OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"
- Parameters:
- property endpoint
- property sampling_ratio
- property service_name
- class OtlpTracingConfig(service_name, url=None, sampling_ratio=1.0)
- Bases:
Send traces to the OpenTelemetry collector.
See the OpenTelemetry collector docs for more info.
Only the gRPC protocol is supported, so make sure it is enabled in your OpenTelemetry collector configuration.
This is the recommended approach, since it gives the most flexibility in what to do with all the data Bytewax can generate.
- Parameters:
- property sampling_ratio
- property service_name
- property url
- class SessionWindow(gap)
- Bases:
Session windowing with a fixed inactivity gap.
Each time a new item is received, it is added to the latest window if the time since the latest event is < gap. Otherwise a new window is created that starts at the current clock’s time.
Warning
Currently, session windows do not support out-of-order data. Out-of-order data will be placed in its own session rather than merging adjacent sessions.
Ensure that your data source is always in order if using an EventClockConfig. Even if it is in order, you cannot use event time session windows with any windowing join operator.
SystemClockConfig is always in order, so it should be fine to use with any operator.
- Parameters:
gap (timedelta) – Gap of inactivity before considering a session closed. The gap should not be negative.
- Returns:
Config object. Pass this as the window_config parameter to your windowing operator.
- property gap
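The session grouping rule for in-order data can be sketched in a few lines. This illustrates the rule as stated, not Bytewax's implementation: assign_sessions is a hypothetical helper, and each returned pair is the first and last event time of a session, which is not necessarily the close_time Bytewax reports in WindowMetadata.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def assign_sessions(
    timestamps: List[datetime], gap: timedelta
) -> List[Tuple[datetime, datetime]]:
    """Hypothetical helper: group in-order timestamps into sessions.

    An item joins the latest session if the time since that session's latest
    event is < gap; otherwise it opens a new session at its own timestamp.
    Assumes `timestamps` is already sorted, since out-of-order data is not
    supported by session windows.
    """
    if gap < timedelta(0):
        raise ValueError("gap should not be negative")
    sessions: List[List[datetime]] = []  # each entry: [first_event, latest_event]
    for ts in timestamps:
        if sessions and ts - sessions[-1][1] < gap:
            sessions[-1][1] = ts  # extend the latest session
        else:
            sessions.append([ts, ts])  # start a new session
    return [(first, last) for first, last in sessions]


base = datetime(2024, 1, 1, 12, 0)
events = [base + timedelta(minutes=m) for m in (0, 2, 4, 20, 21)]
for first, last in assign_sessions(events, gap=timedelta(minutes=5)):
    print(first.time(), "-", last.time())
# 12:00:00 - 12:04:00
# 12:20:00 - 12:21:00
```

Because the comparison is a strict "<", two events exactly gap apart land in separate sessions.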
- class SlidingWindow(length, offset, align_to)
- Bases:
Sliding windows of fixed duration.
If offset == length, windows cover all time but do not overlap. Each item will fall in exactly one window. This is equivalent to a TumblingWindow.
If offset < length, windows overlap. Each item can fall in multiple windows.
If offset > length, there will be gaps between windows. Each item can fall in at most one window, but might fall into none.
Window start times are inclusive, but end times are exclusive.
- Parameters:
- Returns:
Config object. Pass this as the window_config parameter to your windowing operator.
- property align_to
- property length
- property offset
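The three offset/length cases for SlidingWindow can be checked with a small helper that lists every window containing a timestamp. This sketch assumes windows open at align_to + k * offset for every integer k (including windows before align_to) and each lasts length; sliding_windows_for is hypothetical and not part of the Bytewax API.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def sliding_windows_for(
    ts: datetime, length: timedelta, offset: timedelta, align_to: datetime
) -> List[Tuple[datetime, datetime]]:
    """Hypothetical helper: all [open, close) sliding windows containing ts.

    Assumes window k opens at align_to + k * offset and closes `length` later.
    """
    # Index of the last window that opens at or before ts (floor division).
    k_last = (ts - align_to) // offset
    # Index of the first window that still closes after ts (close is exclusive).
    k_first = (ts - align_to - length) // offset + 1
    return [
        (align_to + k * offset, align_to + k * offset + length)
        for k in range(k_first, k_last + 1)
    ]


def minutes(n: int) -> timedelta:
    return timedelta(minutes=n)


align = datetime(2024, 1, 1)
t = align + minutes(7)
print(len(sliding_windows_for(t, minutes(10), minutes(5), align)))   # offset < length: 2
print(len(sliding_windows_for(t, minutes(10), minutes(10), align)))  # offset == length: 1
print(len(sliding_windows_for(t, minutes(5), minutes(10), align)))   # offset > length: 0
```

With offset == length the helper always returns exactly one window, consistent with the TumblingWindow equivalence described for SlidingWindow.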
- class TumblingWindow(length, align_to)
- Bases:
Tumbling windows of fixed duration.
Each item will fall in exactly one window.
Window start times are inclusive, but end times are exclusive.
- Parameters:
- Returns:
Config object. Pass this as the window_config parameter to your windowing operator.
- property align_to
- property length
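Because each item falls in exactly one tumbling window, the window for a timestamp can be found with a single floor division. This sketch assumes windows are laid out back to back from align_to in both directions; tumbling_window_for is a hypothetical helper, not part of the Bytewax API.

```python
from datetime import datetime, timedelta
from typing import Tuple


def tumbling_window_for(
    ts: datetime, length: timedelta, align_to: datetime
) -> Tuple[datetime, datetime]:
    """Hypothetical helper: the single [open, close) window containing ts."""
    k = (ts - align_to) // length  # floor division also handles ts < align_to
    open_time = align_to + k * length
    return open_time, open_time + length


align = datetime(2024, 1, 1)
five = timedelta(minutes=5)

open_t, close_t = tumbling_window_for(align + timedelta(minutes=7), five, align)
print(open_t.time(), close_t.time())  # 00:05:00 00:10:00

# End times are exclusive, so a timestamp on a boundary opens the next window.
open_t, close_t = tumbling_window_for(align + timedelta(minutes=10), five, align)
print(open_t.time(), close_t.time())  # 00:10:00 00:15:00
```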
Functions
- cli_main(flow, *, workers_per_process=1, process_id=None, addresses=None, epoch_interval=None, recovery_config=None)
- cluster_main(flow, addresses, proc_id, *, epoch_interval=None, recovery_config=None, worker_count_per_proc=1)
Execute a dataflow in the current process as part of a cluster.
This is only used for unit testing. See bytewax.run.
Blocks until execution is complete.
>>> from bytewax.dataflow import Dataflow
>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, cluster_main
>>> from bytewax.connectors.stdio import StdOutSink
>>> flow = Dataflow("my_df")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> op.output("out", s, StdOutSink())
>>> # In a real example, use "host:port" of all other workers.
>>> addresses = []
>>> proc_id = 0
>>> cluster_main(flow, addresses, proc_id)
0
1
2
- Parameters:
flow (Dataflow) – Dataflow to run.
addresses (List[str]) – List of host/port addresses for all processes in this cluster (including this one).
proc_id (int) – Index of this process in cluster; starts from 0.
epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.
recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.
worker_count_per_proc (int) – Number of worker threads to start on each process. Defaults to 1.
- init_db_dir(db_dir, count)
Create and initialize a set of empty recovery partitions.
- run_main(flow, *, epoch_interval=None, recovery_config=None)
Execute a dataflow in the current thread.
Blocks until execution is complete.
This is only used for unit testing. See bytewax.run.
>>> from bytewax.dataflow import Dataflow
>>> import bytewax.operators as op
>>> from bytewax.testing import TestingSource, run_main
>>> from bytewax.connectors.stdio import StdOutSink
>>> flow = Dataflow("my_df")
>>> s = op.input("inp", flow, TestingSource(range(3)))
>>> op.output("out", s, StdOutSink())
>>> run_main(flow)
0
1
2
- Parameters:
flow (Dataflow) – Dataflow to run.
epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.
recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.
- setup_tracing(tracing_config=None, log_level=None)
Set up Bytewax’s internal tracing and logging.
By default it starts a tracer that logs all ERROR-level messages to stdout.
Note: To make this work, you have to keep a reference to the returned object for as long as tracing should stay active.
from bytewax.tracing import setup_tracing

tracer = setup_tracing()
- Parameters:
tracing_config (TracingConfig) – The specific backend you want to use.
log_level (str) – String of the log level. One of "ERROR", "WARN", "INFO", "DEBUG", "TRACE". Defaults to "ERROR".
- test_cluster(flow, *, epoch_interval=None, recovery_config=None, processes=1, workers_per_process=1)
Execute a Dataflow by spawning multiple Python processes.
Blocks until execution is complete.
This function should only be used for testing purposes.