bytewax_rerun.sinks

Sink to log data to Rerun from Bytewax.

See the top-level documentation for usage.

Data

RerunEntity

The type Rerun accepts for the entity parameter of rr.log.

It’s defined as rr.AsComponents | Iterable[rr.ComponentBatchLike].

Classes

class RerunMessage

Helper class that contains everything needed to log to Rerun.

See Rerun’s docs for details.

Parameters:
  • entity_path (str | list[str]) – The path (or list of paths) of the entity to be logged. The connector prepends dataflow/worker{worker_index}/ to the entity path, so that you can distinguish between workers when looking at the data.

  • entity (RerunEntity) – The entity, or entities, you want to log to rerun.

  • timeline (None | str) – Timeline name to log the entity to. Optional. If you set this parameter, you also have to set time, as the message needs a time in the specified timeline to be logged.

  • time (None | float) – The time in seconds in the timeline specified above.

  • timeless (bool) – Set the item as timeless when logging. See the Rerun docs.

  • static (bool) – Set the item as static when logging. See the Rerun docs.

entity_path: str | list[str]
entity: RerunEntity
timeline: Optional[str]
time: Optional[float]
timeless: bool
static: bool
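As an illustration of how these fields fit together, the following hypothetical stand-in mirrors the RerunMessage attributes and enforces the documented rule that time must accompany timeline. It is not the library’s implementation: the name MessageSketch, the default values for timeline, time, timeless, and static, and the check in __post_init__ are assumptions, and entity is typed as Any so the sketch runs without Rerun installed.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Union


@dataclass
class MessageSketch:
    """Hypothetical stand-in mirroring the RerunMessage fields (not the real class)."""

    entity_path: Union[str, List[str]]
    entity: Any  # a RerunEntity in the real class; Any so this runs without rerun
    timeline: Optional[str] = None  # assumed default
    time: Optional[float] = None  # assumed default, in seconds on `timeline`
    timeless: bool = False  # assumed default
    static: bool = False  # assumed default

    def __post_init__(self) -> None:
        # Documented rule: a message logged to a timeline needs a time on it.
        if self.timeline is not None and self.time is None:
            raise ValueError("`time` must be set when `timeline` is set")


# Logged on the hypothetical "sensor" timeline at t = 1.5 s.
msg = MessageSketch("sensor/temperature", entity=21.5, timeline="sensor", time=1.5)
```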
class RerunSink(
application_id: str,
recording_id: str,
operating_mode: Literal['spawn', 'connect', 'save', 'serve'] = 'spawn',
address: Optional[str] = None,
save_dir: Optional[Path] = None,
)

Sink to handle Rerun instance and log data to it.

Handles the initialization of Rerun in Bytewax’s context and logs items to it.

Instances of this sink also offer a decorator that can be used to log execution info of any function in your user code to Rerun. See RerunSink.rerun_log.

Parameters:
  • application_id (str) – The application_id to use in Rerun. You can use the same string as the flow_id. You must use the same application_id for all the sinks in your dataflow.

  • recording_id (str) – The recording_id. This needs to be specified here so that all the workers can log to the same recording. You must use the same recording_id for all the sinks in your dataflow.

  • operating_mode (str) –

    Rerun’s operating mode. Possible values are "spawn", "connect", "save", and "serve".

    The default mode is "spawn", where the first worker that runs spawns the viewer on the machine you run the dataflow on, and all the other workers connect to it. Use this during local development.

    "connect" is similar to "spawn", but no worker spawns the viewer: it must be run outside the dataflow, and all the workers connect to it. If you use "connect" mode, you must also specify the address parameter.

    "save" mode saves the recording of each worker in a separate file. The files can be opened together in the Rerun viewer to replay the session. This is suitable for production use if your workers have access to a filesystem where the files can be written. If you use "save" mode, you also need to specify the save_dir parameter.

    "serve" mode runs a web version of the viewer and opens the browser at its page. Workers send data to the browser viewer over a WebSocket. This mode only works with a single worker: there seems to be no way to use it without each worker spawning its own web server, so each worker would need a different viewer.

  • address (None | str) – The address of the viewer to connect to. For "connect" mode only.

  • save_dir (None | Path) – The directory in which to save recording files. For "save" mode only.
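The per-mode requirements above can be summarized as a pre-flight check. This is a hypothetical helper, not part of bytewax_rerun: check_mode_args and its worker_count argument are invented for illustration (RerunSink itself does not take a worker count), and the sink may validate its arguments differently or not at all.

```python
from pathlib import Path
from typing import Optional

VALID_MODES = ("spawn", "connect", "save", "serve")


def check_mode_args(
    operating_mode: str,
    address: Optional[str] = None,
    save_dir: Optional[Path] = None,
    worker_count: int = 1,  # hypothetical: not a RerunSink parameter
) -> None:
    """Hypothetical check encoding the documented per-mode requirements."""
    if operating_mode not in VALID_MODES:
        raise ValueError(f"unknown operating_mode {operating_mode!r}")
    # "connect" needs the address of an externally started viewer.
    if operating_mode == "connect" and address is None:
        raise ValueError('"connect" mode needs an address')
    # "save" writes one file per worker into save_dir.
    if operating_mode == "save" and save_dir is None:
        raise ValueError('"save" mode needs a save_dir')
    # "serve" is documented as single-worker only.
    if operating_mode == "serve" and worker_count > 1:
        raise ValueError('"serve" mode only works with a single worker')
```

If the check passes, the same values would then be passed on to RerunSink(application_id, recording_id, operating_mode=..., address=..., save_dir=...).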

Initialization

Initialize Rerun.

build(
step_id: str,
worker_index: int,
worker_count: int,
) -> bytewax_rerun.sinks._RerunPartition

Builds a sink partition for the Rerun sink.
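Bytewax calls build once per worker, and the resulting partition is a natural place to apply the dataflow/worker{worker_index}/ prefix described for RerunMessage.entity_path. The sketch below is a hypothetical, dependency-free stand-in: PartitionSketch records (path, entity) tuples instead of calling rr.log, accepts plain tuples instead of RerunMessage objects, and assumes the prefix is formatted literally as dataflow/worker{worker_index}/. Its write_batch/close shape follows Bytewax’s StatelessSinkPartition interface, but the real _RerunPartition may differ.

```python
from typing import Any, List, Tuple


class PartitionSketch:
    """Hypothetical per-worker partition; records calls instead of calling rr.log."""

    def __init__(self, worker_index: int) -> None:
        self.worker_index = worker_index
        self.logged: List[Tuple[str, Any]] = []

    def prefixed(self, entity_path: str) -> str:
        # Documented behavior: dataflow/worker{worker_index}/ is prepended.
        return f"dataflow/worker{self.worker_index}/{entity_path}"

    def write_batch(self, items: List[Tuple[str, Any]]) -> None:
        for entity_path, entity in items:
            # The real partition would log to Rerun here.
            self.logged.append((self.prefixed(entity_path), entity))

    def close(self) -> None:
        pass  # nothing to flush in this sketch


def build(step_id: str, worker_index: int, worker_count: int) -> PartitionSketch:
    # One partition per worker; only worker_index matters for this sketch.
    return PartitionSketch(worker_index)


part = build("rerun_out", worker_index=1, worker_count=2)
part.write_batch([("metrics/latency", 0.25)])
print(part.logged)  # [('dataflow/worker1/metrics/latency', 0.25)]
```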

rerun_log(log_args: bool = False, log_return: bool = False) -> Callable

Decorator to log any function’s execution info to Rerun.

Instantiate the sink outside the output operator, and use @sink.rerun_log() to decorate any function you want to see logged in Rerun.

This decorator creates a separate timeline named bytewax, on which each worker logs, for every decorated function, when each call started and how long it ran.

Parameters:
  • log_args (bool) – If this is True, the string representation of the arguments passed to the function will be logged.

  • log_return (bool) – If this is True, the string representation of the returned value will be logged.
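The timing behavior described above can be sketched as an ordinary Python decorator factory. This hypothetical rerun_log_sketch appends one dictionary per call to a module-level LOG list where the real method would log to the bytewax timeline in Rerun; it omits the per-worker entity paths, and the record keys are invented for illustration.

```python
import functools
import time
from typing import Any, Callable, Dict, List

LOG: List[Dict[str, Any]] = []  # stands in for the "bytewax" timeline in Rerun


def rerun_log_sketch(log_args: bool = False, log_return: bool = False) -> Callable:
    """Hypothetical timing decorator; appends records to LOG instead of Rerun."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            started_at = time.time()  # wall-clock start, for placing on a timeline
            t0 = time.perf_counter()  # monotonic clock, for the duration
            result = func(*args, **kwargs)
            record: Dict[str, Any] = {
                "function": func.__name__,
                "start": started_at,
                "duration": time.perf_counter() - t0,
            }
            if log_args:
                record["args"] = str((args, kwargs))
            if log_return:
                record["return"] = str(result)
            LOG.append(record)
            return result

        return wrapper

    return decorator


@rerun_log_sketch(log_args=True, log_return=True)
def my_function(item: int) -> int:
    return item * 2


my_function(21)
print(LOG[0]["function"], LOG[0]["return"])  # my_function 42
```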

Note: This needs to be a method of RerunSink because it must use the same application_id and recording_id as the sink, and it also needs to know which worker the function is running on, so that it can log the correct info to Rerun.

Example usage:

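```python
from bytewax_rerun.sinks import RerunSink

# ... build your dataflow as usual ...

# Create the sink once, outside the output operator, so the same
# instance can both decorate functions and be used in the output step.
sink = RerunSink("app_id", "rec_id")


# Each call logs its start time and duration to the "bytewax" timeline,
# plus the string form of the arguments and the return value.
@sink.rerun_log(log_args=True, log_return=True)
def my_function(item: int) -> int:
    return item * 2


# ... use my_function in your dataflow and send output to `sink` ...
```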