bytewax_rerun.sinks#
Sink to log data to Rerun from Bytewax.
See the top-level documentation for usage.
Data#
- RerunEntity#
The type used by Rerun to identify the argument for rr.log’s entity parameter.
It’s defined as rr.AsComponents | Iterable[rr.ComponentBatchLike].
Classes#
- class RerunMessage#
Helper class that contains everything needed to log to Rerun.
See Rerun’s docs for details.
- Parameters:
entity_path (str | list[str]) – The path (or list of paths) of the entity to be logged. The connector prepends dataflow/worker{worker_index}/ to the entity path, so that you can distinguish between workers when looking at the data.
entity (RerunEntity) – The entity, or entities, you want to log to Rerun.
timeline (None | str) – Timeline name to log the entity to. Optional. If you set this parameter, you must also set time, as the message needs a time in the specified timeline to be logged.
time (None | float) – The time in seconds in the timeline specified above.
timeless (bool) – Set the item as timeless when logging. See Rerun docs.
static (bool) – Set the item as static when logging. See Rerun docs.
- entity: RerunEntity#
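The path prefixing and the timeline/time rule described above can be sketched with the standard library alone. `MessageSketch` and `prefixed_paths` are hypothetical stand-ins written for illustration, not the connector's code. Two things here are assumptions: that the prefix is applied to every element when a list of paths is given, and that a missing time is reported with a `ValueError`.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Union


@dataclass
class MessageSketch:
    """Hypothetical stand-in mirroring some of the documented RerunMessage fields."""

    entity_path: Union[str, List[str]]
    entity: Any
    timeline: Optional[str] = None
    time: Optional[float] = None

    def __post_init__(self) -> None:
        # Documented rule: a timeline is only usable together with a time.
        # Raising ValueError is an assumption about how to enforce it.
        if self.timeline is not None and self.time is None:
            raise ValueError("`time` is required when `timeline` is set")


def prefixed_paths(msg: MessageSketch, worker_index: int) -> List[str]:
    """Prepend the documented per-worker prefix to each entity path."""
    prefix = f"dataflow/worker{worker_index}/"
    if isinstance(msg.entity_path, str):
        paths = [msg.entity_path]
    else:
        paths = list(msg.entity_path)
    return [prefix + path for path in paths]


msg = MessageSketch("sensors/temperature", entity=None, timeline="event_time", time=1.5)
print(prefixed_paths(msg, worker_index=0))
```

With this layout, the same logical entity logged from two workers ends up under two distinct paths, which is what lets you tell workers apart in the viewer.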
- class RerunSink(application_id: str, recording_id: str, operating_mode: Literal["spawn", "connect", "save", "serve"] = 'spawn', address: Optional[str] = None, save_dir: Optional[Path] = None)#
- Bases:
Sink to handle a Rerun instance and log data to it.
Handles the initialization of Rerun in Bytewax’s context, and logs items to it.
Instances of this sink also offer a decorator that can be used to log execution info of any function in your user code to Rerun. See RerunSink.rerun_log.
- Parameters:
application_id (str) – The application_id to use in Rerun. You can use the same string as the flow_id. You MUST use the same application_id for all the sinks in your dataflow.
recording_id (str) – The recording_id. This needs to be specified here so that all the workers can log to the same recording. You must use the same recording_id for all the sinks in your dataflow.
operating_mode (str) – Rerun’s operating mode. Possible values are: "spawn", "connect", "save", "serve".
The default mode is "spawn", where the first worker that runs spawns the viewer on the machine you run the dataflow on, and all the other workers connect to it. Use this during local development.
"connect" is similar to "spawn", but no worker spawns the viewer: it has to be run outside the dataflow, and all the workers connect to it. If you use "connect" mode, you have to specify the address parameter too.
"save" mode saves the recording of each worker in a separate file. The files can be opened together with the Rerun viewer to replay the session. This is suitable for production use if your workers have access to a filesystem where the files can be written. If you use "save" mode, you also need to specify the save_dir parameter.
"serve" mode runs a web version of the viewer and opens the browser at the page. Workers send data to the browser viewer through a websocket. This only works with a single worker, as there seems to be no way of using this mode without spawning the web server, so each worker would have to use a different viewer.
address (None | str) – The address of the viewer to connect to. For "connect" mode only.
save_dir (None | Path) – The directory where to save recording files. For "save" mode only.
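The pairing between operating modes and their companion arguments can be summarized as a small pre-flight check. This is only a sketch under stated assumptions: `check_sink_options` is a hypothetical helper, not part of bytewax_rerun, and whether the sink itself raises on a missing argument is not stated here. The address and directory values are placeholders.

```python
from pathlib import Path
from typing import Optional

VALID_MODES = ("spawn", "connect", "save", "serve")


def check_sink_options(
    operating_mode: str = "spawn",
    address: Optional[str] = None,
    save_dir: Optional[Path] = None,
) -> None:
    """Hypothetical check for the documented mode/argument pairs."""
    if operating_mode not in VALID_MODES:
        raise ValueError(f"unknown operating_mode: {operating_mode!r}")
    # "connect" mode needs the address of an already running viewer.
    if operating_mode == "connect" and address is None:
        raise ValueError('"connect" mode needs `address`')
    # "save" mode needs a directory for the per-worker recording files.
    if operating_mode == "save" and save_dir is None:
        raise ValueError('"save" mode needs `save_dir`')


check_sink_options("spawn")
check_sink_options("connect", address="127.0.0.1:9876")  # placeholder address
check_sink_options("save", save_dir=Path("recordings"))  # placeholder directory
```

Running a check like this before building the dataflow surfaces a misconfigured mode early, instead of when the first worker starts logging.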
Initialization
Initialize Rerun.
- build( ) → bytewax_rerun.sinks._RerunPartition#
Builds a sink partition for the Rerun sink.
- rerun_log(log_args: bool = False, log_return: bool = False) → Callable#
Decorator to log any function’s execution info to Rerun.
Instantiate the sink outside the output operator, and use @sink.rerun_log() to decorate any function you want to see logged in Rerun.
This decorator will create a separate timeline, named bytewax, where, for each function, each worker logs the moment the function started and how long it ran.
- Parameters:
Note: This needs to be a method of the RerunSink because we need to use the same application_id and recording_id, and we also need to know which worker the function is running on, so that we can log the correct info into Rerun.
Example usage:
    # ...
    sink = RerunSink("app_id", "rec_id")


    @sink.rerun_log(log_args=True, log_return=True)
    def my_function(item: int) -> int:
        return item * 2


    # ...
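To make the decorator's behavior concrete, here is a standard-library sketch of the same idea: record when a call started and how long it ran, and optionally its arguments and return value. `log_execution` and `records` are hypothetical names. The sketch appends to an in-memory list instead of logging to Rerun, and it does not track which worker is running, which the real decorator needs.

```python
import functools
import time
from typing import Any, Callable, Dict, List

# Stand-in for the "bytewax" timeline: one dict per decorated call.
records: List[Dict[str, Any]] = []


def log_execution(log_args: bool = False, log_return: bool = False) -> Callable:
    """Hypothetical decorator factory taking the same flags as rerun_log."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            started = time.time()  # wall-clock start of the call
            tick = time.perf_counter()  # monotonic clock for the duration
            result = func(*args, **kwargs)
            record: Dict[str, Any] = {
                "function": func.__name__,
                "started": started,
                "duration": time.perf_counter() - tick,
            }
            if log_args:
                record["args"] = (args, kwargs)
            if log_return:
                record["return"] = result
            records.append(record)
            return result

        return wrapper

    return decorator


@log_execution(log_args=True, log_return=True)
def my_function(item: int) -> int:
    return item * 2


my_function(21)
print(records[0]["function"], records[0]["return"])
```

Because the wrapper returns the original result unchanged, a decorated function can still be passed to dataflow operators exactly as before.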