decent_bench.utils.checkpoint_manager

class decent_bench.utils.checkpoint_manager.CheckpointManager(checkpoint_dir: str | Path, checkpoint_step: int | None = None, keep_n_checkpoints: int = 3, benchmark_metadata: dict[str, Any] | None = None, compression_level: int = 1)

Bases: object

Manages checkpoint directory structure and file operations for benchmark execution.

The CheckpointManager creates and maintains a hierarchical directory structure for storing checkpoint data during benchmark execution. This allows benchmarks to be resumed if interrupted, and provides incremental saving of results as trials complete.

Directory Structure:

The checkpoint directory is organized as follows:

checkpoint_dir/
├── metadata.json                   # Run configuration and algorithm metadata
├── benchmark_problem.pkl.zst       # Initial benchmark problem state (before any trials), zstd-compressed
├── initial_algorithms.pkl.zst      # Initial algorithm states (before any trials), zstd-compressed
├── metric_computation.pkl.zst      # Computed metrics results (after all trials complete), zstd-compressed
├── algorithm_0/                    # Directory for first algorithm
│   ├── trial_0/                    # Directory for trial 0
│   │   ├── checkpoint_0000100.pkl.zst  # Combined algorithm+network state at iteration 100, zstd-compressed
│   │   ├── checkpoint_0000200.pkl.zst  # Combined algorithm+network state at iteration 200, zstd-compressed
│   │   ├── progress.json           # {"last_completed_iteration": N}
│   │   └── complete.json           # Marker file, contains path to final checkpoint
│   ├── trial_1/
│   │   └── ...
│   └── trial_N/
│       └── ...
└── results/                        # Results directory for storing final tables and plots after completion
    ├── plots_fig1.png              # Final plot for figure 1 with plot results
    ├── plots_fig2.png              # Final plot for figure 2 with plot results
    ├── table.tex                   # Final LaTeX file with table results
    └── table.txt                   # Final text file with table results
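
The naming scheme in the tree above can be reproduced with a few path helpers. This is a minimal sketch, not the library's code: the helper names `trial_dir` and `checkpoint_file` are hypothetical, and the seven-digit zero padding is inferred from the example file names.

```python
from pathlib import Path


def trial_dir(checkpoint_dir: Path, alg_idx: int, trial: int) -> Path:
    """Directory for one trial, e.g. checkpoint_dir/algorithm_0/trial_1."""
    return checkpoint_dir / f"algorithm_{alg_idx}" / f"trial_{trial}"


def checkpoint_file(checkpoint_dir: Path, alg_idx: int, trial: int, iteration: int) -> Path:
    """Checkpoint path with the iteration zero-padded to seven digits (assumed)."""
    return trial_dir(checkpoint_dir, alg_idx, trial) / f"checkpoint_{iteration:07d}.pkl.zst"


path = checkpoint_file(Path("checkpoint_dir"), alg_idx=0, trial=0, iteration=100)
print(path.as_posix())
```

Zero padding keeps the files in iteration order when they are sorted by name.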
File Descriptions:
  • metadata.json: Benchmark configuration and any user-provided metadata (e.g., hyperparameters, system info). User-provided metadata can be added through the benchmark() function or appended later using append_metadata().

  • benchmark_problem.pkl.zst: Initial benchmark problem state before any trials run, stored as a zstd-compressed pickle payload.

  • initial_algorithms.pkl.zst: Initial algorithm states before any trials run, stored as a zstd-compressed pickle payload.

  • metric_computation.pkl.zst: Computed metrics results after compute_metrics() completes, stored as a zstd-compressed pickle payload.

  • checkpoint_NNNNNNN.pkl.zst: Combined checkpoint containing both algorithm and network state, stored as a zstd-compressed pickle payload. Saving both in a single payload preserves shared object references and keeps the algorithm and network states consistent at each checkpoint. The checkpoint data is a dictionary in which “algorithm” is the Algorithm object with its internal state at the checkpoint, “network” is the Network object with agent states at the checkpoint, and “iteration” is the iteration number of the checkpoint.

  • progress.json: Tracks the last completed iteration within a trial.

  • complete.json: Marker file, contains path to final checkpoint.

  • plots_figX.png: Final plots for figures after benchmark completion.

  • table.tex: Final LaTeX file with table results after benchmark completion.

  • table.txt: Final text file with table results after benchmark completion.
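
The point about shared object references in the combined checkpoint can be illustrated with plain pickle. This is a sketch under stated assumptions: plain dicts and lists stand in for the real Algorithm and Network objects, and the zstd compression step is omitted.

```python
import pickle

# Stand-ins for real objects: one agent state referenced by both
# the "algorithm" and the "network" (hypothetical structure).
agent_state = {"x": [1.0, 2.0]}
algorithm = {"agents": [agent_state]}
network = {"agents": [agent_state]}

# Combined payload: a single pickle call sees both objects, so the shared
# agent_state is stored once and restored as one object.
payload = {"algorithm": algorithm, "network": network, "iteration": 100}
combined = pickle.loads(pickle.dumps(payload))
shared_after_combined = combined["algorithm"]["agents"][0] is combined["network"]["agents"][0]

# Separate payloads: each pickle call makes its own copy, so the link is lost.
alg_alone = pickle.loads(pickle.dumps(algorithm))
net_alone = pickle.loads(pickle.dumps(network))
shared_after_separate = alg_alone["agents"][0] is net_alone["agents"][0]

print(shared_after_combined, shared_after_separate)
```

Pickling the two objects separately would restore two independent copies of the agent state, which could then drift apart after a resume.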

Thread Safety:
  • Each trial writes to its own directory, avoiding write conflicts.

  • Completed trial results are loaded read-only.

  • Metadata is written once at initialization.

Parameters:
  • checkpoint_dir – Directory in which checkpoints are saved during execution. Progress is saved at regular intervals, allowing an interrupted run to be resumed. When starting a new benchmark, the directory must be empty or must not exist.

  • checkpoint_step – Number of iterations between checkpoints within each trial. If None, only save at the end of each trial. For long-running algorithms, set this to checkpoint during trial execution (e.g., every 1000 iterations).

  • keep_n_checkpoints – Maximum number of iteration checkpoints to keep per trial. Older checkpoints are automatically deleted to save disk space.

  • benchmark_metadata – Optional dictionary of additional metadata to save in the checkpoint directory, such as hyperparameters or system information. This can be useful for keeping track of the benchmark configuration and context when analyzing results later.

  • compression_level – Level of compression to use for checkpoint files. Higher levels result in smaller file sizes but take more time to compress and decompress. See zstandard documentation (ZstdCompressor) for details on compression levels. Default is 1, which provides a good balance between compression ratio and speed for typical checkpoint payloads. Adjust as needed based on the size of the checkpoint data and performance requirements.

Raises:
  • ValueError – If checkpoint_step is not a positive integer or None.

  • ValueError – If keep_n_checkpoints is not a positive integer.
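
The retention rule behind keep_n_checkpoints might look roughly like the following. This is a minimal sketch, not the library's implementation: `prune_checkpoints` is a hypothetical helper, and it assumes checkpoint files follow the zero-padded `checkpoint_NNNNNNN.pkl.zst` naming so that sorting by name matches iteration order.

```python
import tempfile
from pathlib import Path
from typing import List


def prune_checkpoints(trial_dir: Path, keep_n_checkpoints: int) -> List[Path]:
    """Delete all but the newest keep_n_checkpoints files; return the deleted paths."""
    if keep_n_checkpoints < 1:
        raise ValueError("keep_n_checkpoints must be a positive integer")
    # Zero-padded iteration numbers make lexicographic order match numeric order.
    checkpoints = sorted(trial_dir.glob("checkpoint_*.pkl.zst"))
    stale = checkpoints[:-keep_n_checkpoints]
    for path in stale:
        path.unlink()
    return stale


with tempfile.TemporaryDirectory() as tmp:
    trial = Path(tmp)
    for iteration in (100, 200, 300, 400, 500):
        (trial / f"checkpoint_{iteration:07d}.pkl.zst").touch()
    removed = [p.name for p in prune_checkpoints(trial, keep_n_checkpoints=3)]
    remaining = sorted(p.name for p in trial.glob("checkpoint_*.pkl.zst"))

print(removed, remaining)
```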

is_empty() -> bool

Check if checkpoint directory is empty or doesn’t exist.

initialize(algorithms: list[Algorithm[Network]], problem: BenchmarkProblem, n_trials: int) -> None

Initialize checkpoint directory structure for a new benchmark run.

Parameters:
  • algorithms – List of Algorithm objects to be benchmarked.

  • problem – BenchmarkProblem configuration for the benchmark.

  • n_trials – Total number of trials to run for each algorithm, used for resuming.

create_backup() -> Path

Create a backup of the existing checkpoint directory.

Returns:

Path to the created backup zip file.

Raises:

FileExistsError – If the backup file already exists.

append_metadata(additional_metadata: dict[str, Any]) -> dict[str, Any]

Append additional metadata to existing checkpoint metadata.

This can be used to add information after initialization, such as system resource usage, hyperparameters, or other contextual information that may be relevant for analyzing results later.

Parameters:

additional_metadata – Dictionary of additional metadata to append to the existing metadata.

Returns:

Updated metadata dictionary after appending the additional metadata.

load_initial_algorithms(network: Network | None = None) -> list[Algorithm[Network]]

Load initial algorithm states from checkpoint.

Parameters:

network – If provided, restore any agent-hash-keyed dicts back to Agent-keyed dicts using the agents from this network.

Returns:

List of Algorithm objects representing the initial algorithm states.

load_benchmark_problem() -> BenchmarkProblem

Load benchmark problem configuration from checkpoint.

Returns:

BenchmarkProblem object representing the benchmark problem configuration.

should_checkpoint(iteration: int) -> bool

Determine if a checkpoint should be saved at the current iteration.

Checkpointing occurs if checkpoint_step is set and iteration is a multiple of checkpoint_step.

Parameters:

iteration – Current iteration number.

Returns:

True if a checkpoint should be saved, False otherwise.

Raises:

ValueError – If iteration number is negative.
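
The documented rule can be written out as a small standalone function. This is a sketch of the rule as described, not the library's code: `should_checkpoint_sketch` is a hypothetical name, and treating iteration 0 as a multiple of checkpoint_step is an assumption that the text above does not settle.

```python
from typing import Optional


def should_checkpoint_sketch(iteration: int, checkpoint_step: Optional[int]) -> bool:
    """Return True when a checkpoint is due under the documented rule."""
    if iteration < 0:
        raise ValueError("iteration must be non-negative")
    if checkpoint_step is None:
        # No intermediate checkpoints; only the end-of-trial save applies.
        return False
    # Assumption: iteration 0 counts as a multiple of checkpoint_step here.
    return iteration % checkpoint_step == 0


print(should_checkpoint_sketch(1000, 1000), should_checkpoint_sketch(1500, 1000))
```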

save_checkpoint(*, alg_idx: int, trial: int, iteration: int, algorithm: Algorithm[Network], network: Network, rng_state: dict[str, Any]) -> Path

Save checkpoint for a specific algorithm trial at a given iteration.

Parameters:
  • alg_idx – Algorithm index (0-based).

  • trial – Trial number (0-based).

  • iteration – Current iteration number.

  • algorithm – Algorithm object with current internal state.

  • network – Network object with current agent states and metrics.

  • rng_state – RNG snapshot for deterministic resume.

Returns:

Path to the saved checkpoint file.

load_checkpoint(alg_idx: int, trial: int) -> tuple[Algorithm[Network], Network, int, dict[str, Any]] | None

Load the latest checkpoint for a specific algorithm trial.

Parameters:
  • alg_idx – Algorithm index (0-based).

  • trial – Trial number (0-based).

Returns:

Tuple of (algorithm, network, last_iteration, rng_state) or None if no checkpoint exists. Execution should resume from iteration (last_iteration + 1).
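
A resume loop built on this return value could be structured as follows. This is a sketch under stated assumptions: `InMemoryManager` and `run_trial` are hypothetical, the stand-in mirrors only the method names of CheckpointManager (its save_checkpoint returns nothing, unlike the real one), and plain dicts replace the Algorithm and Network objects.

```python
from typing import Any, Dict, Optional, Tuple

Snapshot = Tuple[Dict[str, Any], Dict[str, Any], int, Dict[str, Any]]


class InMemoryManager:
    """Hypothetical stand-in; only the method names mirror CheckpointManager."""

    def __init__(self, checkpoint_step: int) -> None:
        self.checkpoint_step = checkpoint_step
        self.saved: Optional[Snapshot] = None

    def load_checkpoint(self, alg_idx: int, trial: int) -> Optional[Snapshot]:
        if self.saved is None:
            return None
        algorithm, network, iteration, rng_state = self.saved
        return dict(algorithm), dict(network), iteration, dict(rng_state)

    def should_checkpoint(self, iteration: int) -> bool:
        return iteration % self.checkpoint_step == 0

    def save_checkpoint(self, *, alg_idx, trial, iteration, algorithm, network, rng_state) -> None:
        self.saved = (dict(algorithm), dict(network), iteration, dict(rng_state))


def run_trial(manager, n_iterations: int, stop_after: Optional[int] = None) -> Dict[str, Any]:
    loaded = manager.load_checkpoint(0, 0)
    if loaded is None:
        algorithm, network, rng_state, start = {"steps": 0}, {}, {}, 0
    else:
        algorithm, network, last_iteration, rng_state = loaded
        start = last_iteration + 1  # resume right after the saved iteration
    for iteration in range(start, n_iterations):
        algorithm["steps"] += 1  # placeholder for one real algorithm step
        if manager.should_checkpoint(iteration):
            manager.save_checkpoint(alg_idx=0, trial=0, iteration=iteration,
                                    algorithm=algorithm, network=network, rng_state=rng_state)
        if iteration == stop_after:
            break  # simulate an interruption
    return algorithm


manager = InMemoryManager(checkpoint_step=10)
interrupted = run_trial(manager, n_iterations=30, stop_after=14)
resumed = run_trial(manager, n_iterations=30)
print(interrupted["steps"], resumed["steps"])
```

Work done after the last checkpoint (iterations 11 to 14 in this run) is repeated on resume, so the final step count matches an uninterrupted run.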

mark_trial_complete(*, alg_idx: int, trial: int, iteration: int, algorithm: Algorithm[Network], network: Network, rng_state: dict[str, Any]) -> Path

Mark a trial as complete and save its final result.

Parameters:
  • alg_idx – Algorithm index (0-based).

  • trial – Trial number (0-based).

  • iteration – The final iteration number.

  • algorithm – Final Algorithm state after all iterations complete.

  • network – Final Network state after all iterations complete.

  • rng_state – RNG snapshot for deterministic resume.

Returns:

Path to the saved final checkpoint file.

unmark_trial_complete(alg_idx: int, trial: int) -> None

Remove the completion marker for a trial, allowing it to be rerun.

Parameters:
  • alg_idx – Algorithm index (0-based).

  • trial – Trial number (0-based).

is_trial_complete(alg_idx: int, trial: int) -> bool

Check if a trial has been completed.

Parameters:
  • alg_idx – Algorithm index (0-based).

  • trial – Trial number (0-based).

Returns:

True if the trial has completed, False otherwise.

is_benchmark_started() -> bool

Check if the benchmark has been started by looking for any existing checkpoints.

Returns:

True if any trial has at least one checkpoint saved, False otherwise.

is_benchmark_completed() -> bool

Check if all trials for all algorithms have been completed.

Returns:

True if all trials for all algorithms are marked as complete, False otherwise.

are_metrics_computed() -> bool

Check if the metrics have been computed and saved in the checkpoint.

Returns:

True if the metrics result file exists, False otherwise.

load_trial_result(alg_idx: int, trial: int) -> tuple[Algorithm[Network], Network]

Load final result of a completed trial.

Parameters:
  • alg_idx – Algorithm index (0-based).

  • trial – Trial number (0-based).

Returns:

Tuple of (Algorithm object, Network object) with final state after all iterations.

Raises:

ValueError – If the trial is not marked as complete or if the checkpoint data is invalid.

get_completed_trials(alg_idx: int, n_trials: int) -> list[int]

Get list of completed trial numbers for an algorithm.

Parameters:
  • alg_idx – Algorithm index (0-based).

  • n_trials – Total number of trials in the benchmark.

Returns:

List of completed trial numbers (0-based).
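
Given the directory layout described earlier, completed trials could be found by looking for the complete.json marker in each trial directory. This is a minimal sketch, assuming the marker's presence is what defines completion; `completed_trials` is a hypothetical helper, not the library's method.

```python
import tempfile
from pathlib import Path
from typing import List


def completed_trials(checkpoint_dir: Path, alg_idx: int, n_trials: int) -> List[int]:
    """Return the 0-based trial numbers whose directory holds a complete.json marker."""
    alg_dir = checkpoint_dir / f"algorithm_{alg_idx}"
    return [t for t in range(n_trials) if (alg_dir / f"trial_{t}" / "complete.json").exists()]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for t in range(3):
        (root / "algorithm_0" / f"trial_{t}").mkdir(parents=True)
    # Mark trials 0 and 2 as complete; trial 1 is still in progress.
    for t in (0, 2):
        (root / "algorithm_0" / f"trial_{t}" / "complete.json").write_text("{}")
    done = completed_trials(root, alg_idx=0, n_trials=3)

print(done)
```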

load_metadata() -> dict[str, Any]

Load checkpoint metadata.

If no metadata file exists, returns an empty dictionary.

Returns:

Dictionary containing benchmark_metadata and algorithms list.

load_benchmark_result() -> BenchmarkResult

Load benchmark problem configuration and states from checkpoint.

If an algorithm does not have all trials completed, its results will be skipped and not included in the loaded benchmark result. This is to ensure that the metrics are not skewed by incomplete data and only include algorithms with full results. A warning will be logged for any incomplete algorithms.

Returns:

BenchmarkResult object containing the loaded benchmark problem, initial algorithms, and initial network.

save_metrics_result(metrics_result: MetricResult) -> None

Save the computed metrics result to the checkpoint directory.

Parameters:

metrics_result – MetricResult object containing the computed metrics to save.

load_metrics_result(skip_network_views: bool = False) -> MetricResult

Load the computed metrics result from the checkpoint directory.

Parameters:

skip_network_views – If True, do not attempt to load network views from the benchmark result when they are not present in the checkpoint. Skipping them saves time when the intended analysis does not need network views, for example in automated analysis. Network views are needed for ComputationalCost and may be used if EmpiricalRiskCost is used.

Returns:

MetricResult object containing the computed metrics.

get_results_path() -> Path

Get the path to the results directory within the checkpoint directory.

Returns:

Path to the results directory within the checkpoint directory.

clear() -> None

Remove entire checkpoint directory and all its contents.

Warning

This permanently deletes all checkpoint data.

checkpoint_size() -> int

Calculate the total size of all checkpoint files in MB.

Returns:

Total size of checkpoint files in MB.