bytewax.bytewax#

Internal Bytewax symbols from Rust.

These are re-imported elsewhere in the public bytewax module for use.

Classes#

class RecoveryConfig(db_dir, backup_interval=None, snapshot_serde=None)#

Configuration settings for recovery.

Parameters:
  • db_dir (Path) – Local filesystem directory to search for recovery database partitions.

  • backup_interval (Optional[timedelta]) – Amount of system time to wait to permanently delete a state snapshot after it is no longer needed. You should set this to the interval at which you are backing up the recovery partitions off of the workers into archival storage (e.g. S3). Defaults to zero duration.

  • snapshot_serde (Optional[Serde]) – Format to use when encoding state snapshot objects in the recovery partitions. Defaults to JsonPickleSerde.

Initialization

property db_dir#
property backup_interval#
property snapshot_serde#
class TracingConfig#

Base class for tracing/logging configuration.

There defines what to do with traces and logs emitted by Bytewax.

Use a specific subclass of this to configure where you want the traces to go.

Initialization

class BytewaxTracer#

Utility class used to handle tracing.

It keeps a tokio runtime that is alive as long as the struct itself.

This should only be built via setup_tracing.

class ClockConfig#

Base class for a clock config.

This describes how a windowing operator should determine the current time and the time for each element.

Use a specific subclass of this that matches the time definition you’d like to use.

Initialization

class WindowConfig#

Base class for a windower config.

This describes the type of windows you would like.

Use a specific subclass of this that matches the window definition you’d like to use.

Initialization

class WindowMetadata(open_time, close_time)#

Contains information about a window.

Initialization

property open_time#

The time that the window starts.

property close_time#

The time that the window closes.

For some window types like SessionWindow, this value can change as new data is received.

class JaegerConfig(service_name, endpoint=None, sampling_ratio=1.0)#
Bases:

Configure tracing to send traces to a Jaeger instance.

The endpoint can be configured with the parameter passed to this config, or with two environment variables:

OTEL_EXPORTER_JAEGER_AGENT_HOST="127.0.0.1"
OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"
Parameters:
  • service_name (str) – Identifies this dataflow in Jaeger.

  • endpoint (str) – Connection info. Takes precidence over env vars. Defaults to "127.0.0.1:6831".

  • sampling_ratio (float) – Fraction of traces to send between 0.0 and 1.0.

Initialization

property service_name#
property endpoint#
property sampling_ratio#
class OtlpTracingConfig(service_name, url=None, sampling_ratio=1.0)#
Bases:

Send traces to the OpenTelemetry collector.

See OpenTelemetry collector docs for more info.

Only supports GRPC protocol, so make sure to enable it on your OTEL configuration.

This is the recommended approach since it allows the maximum flexibility in what to do with all the data bytewax can generate.

Parameters:
  • service_name (str) – Identifies this dataflow in OTLP.

  • url (str) – Connection info. Defaults to "grpc:://127.0.0.1:4317".

  • sampling_ratio (float) – Fraction of traces to send between 0.0 and 1.0.

Initialization

property service_name#
property url#
property sampling_ratio#
class EventClockConfig(dt_getter, wait_for_system_duration)#
Bases:

Use a getter function to lookup the timestamp for each item.

The watermark is the largest item timestamp seen thus far, minus the waiting duration, plus the system time duration that has elapsed since that item was seen. This effectively means items will be correctly processed as long as they are not out of order more than the waiting duration in system time.

If the dataflow has no more input, all windows are closed.

Parameters:
  • dt_getter (Callable[[Any], datetime]) – Returns the timestamp for an item. The datetime returned must have tzinfo set to timezone.utc. E.g. datetime(1970, 1, 1, tzinfo=timezone.utc)

  • wait_for_system_duration (timedelta) – How much time to wait before considering an event late.

Returns:

Config object. Pass this as the clock_config parameter to your windowing operator.

Initialization

property wait_for_system_duration#
property dt_getter#
class SystemClockConfig#
Bases:

Use the current system time as the timestamp for each item.

The watermark is also the current system time.

If the dataflow has no more input, all windows are closed.

Returns: Config object. Pass this as the clock_config parameter to your windowing operator.

Initialization

class TumblingWindow(length, align_to)#
Bases:

Tumbling windows of fixed duration.

Each item will fall in exactly one window.

Window start times are inclusive, but end times are exclusive.

Parameters:
  • length (timedelta) – Length of windows.

  • align_to (timedelta) – Align windows so this instant starts a window. This must be a constant. You can use this to align all windows to hour boundaries, e.g.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.

Initialization

property length#
property align_to#
class SlidingWindow(length, offset, align_to)#
Bases:

Sliding windows of fixed duration.

If offset == length, windows cover all time but do not overlap. Each item will fall in exactly one window. This would be equivalent to a TumblingWindow.

If offset < length, windows overlap. Each item will fall in multiple windows.

If offset > length, there will be gaps between windows. Each item can fall in up to one window, but might fall into none.

Window start times are inclusive, but end times are exclusive.

Parameters:
  • length (timedelta) – Length of windows.

  • offset (timedelta) – Duration between start times of adjacent windows.

  • align_to (datetime) – Align windows so this instant starts a window. This must be a constant. You can use this to align all windows to hour boundaries, e.g.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.

Initialization

property align_to#
property offset#
property length#
class SessionWindow(gap)#
Bases:

Session windowing with a fixed inactivity gap.

Each time a new item is received, it is added to the latest window if the time since the latest event is < gap. Otherwise a new window is created that starts at current clock’s time.

Parameters:

gap (timedelta) – Gap of inactivity before considering a session closed. The gap should not be negative.

Returns:

Config object. Pass this as the window_config parameter to your windowing operator.

Initialization

property gap#

Functions#

init_db_dir(db_dir, count)#

Create and init a set of empty recovery partitions.

Parameters:
  • db_dir (Path) – Local directory to create partitions in.

  • count (int) – Number of partitions to create.

run_main(flow, *, epoch_interval=None, recovery_config=None)#

Execute a dataflow in the current thread.

Blocks until execution is complete.

This is only used for unit testing. See bytewax.run.

 1>>> from bytewax.dataflow import Dataflow
 2>>> import bytewax.operators as op
 3>>> from bytewax.testing import TestingSource, run_main
 4>>> from bytewax.connectors.stdio import StdOutSink
 5>>> flow = Dataflow("my_df")
 6>>> s = op.input("inp", flow, TestingSource(range(3)))
 7>>> op.output("out", s, StdOutSink())
 8>>> run_main(flow)
 90
101
112
Parameters:
  • flow (Dataflow) – Dataflow to run.

  • epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.

  • recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.

cluster_main(
flow,
addresses,
proc_id,
*,
epoch_interval=None,
recovery_config=None,
worker_count_per_proc=1,
)#

Execute a dataflow in the current process as part of a cluster.

This is only used for unit testing. See bytewax.run.

Blocks until execution is complete.

 1>>> from bytewax.dataflow import Dataflow
 2>>> import bytewax.operators as op
 3>>> from bytewax.testing import TestingSource, cluster_main
 4>>> from bytewax.connectors.stdio import StdOutSink
 5>>> flow = Dataflow("my_df")
 6>>> s = op.input("inp", flow, TestingSource(range(3)))
 7>>> op.output("out", s, StdOutSink())
 8>>> # In a real example, use "host:port" of all other workers.
 9>>> addresses = []
10>>> proc_id = 0
11>>> cluster_main(flow, addresses, proc_id)
120
131
142
Parameters:
  • flow (Dataflow) – Dataflow to run.

  • addresses (List[str]) – List of host/port addresses for all processes in this cluster (including this one).

  • proc_id (int) – Index of this process in cluster; starts from 0.

  • epoch_interval (Optional[timedelta]) – System time length of each epoch. Defaults to 10 seconds.

  • recovery_config (Optional[RecoveryConfig]) – State recovery config. If None, state will not be persisted.

  • worker_count_per_proc (int) – Number of worker threads to start on each process. Defaults to 1.

cli_main(
flow,
*,
workers_per_process=1,
process_id=None,
addresses=None,
epoch_interval=None,
recovery_config=None,
)#
test_cluster(
flow,
*,
epoch_interval=None,
recovery_config=None,
processes=1,
workers_per_process=1,
)#

Execute a Dataflow by spawning multiple Python processes.

Blocks until execution is complete.

This function should only be used for testing purposes.

setup_tracing(tracing_config=None, log_level=None)#

Setup Bytewax’s internal tracing and logging.

By default it starts a tracer that logs all ERROR-level messages to stdout.

Note: To make this work, you have to keep a reference of the returned object.

1from bytewax.tracing import setup_tracing
2
3tracer = setup_tracing()
Parameters:
  • tracing_config (TracingConfig) – The specific backend you want to use.

  • log_level (str) – String of the log level. One of "ERROR", "WARN", "INFO", "DEBUG", "TRACE". Defaults to "ERROR".

Join our community Slack channel

Need some help? Join our community!

If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #questions-answered Slack channel!

Join now