import operator
from collections.abc import Sequence
from functools import lru_cache, reduce
from typing import TYPE_CHECKING
import numpy as np
from numpy import float64
from numpy import linalg as la
from numpy.typing import NDArray
from rich.progress import BarColumn, Progress, TaskProgressColumn, TextColumn, TimeRemainingColumn
from rich.table import Column
from sklearn import metrics as sk_metrics
import decent_bench.utils.interoperability as iop
from decent_bench.costs import Cost, EmpiricalRiskCost
from decent_bench.metrics._metrics_view import AgentMetricsView
from decent_bench.utils.array import Array
from decent_bench.utils.logger import LOGGER
from decent_bench.utils.types import Dataset
if TYPE_CHECKING:
from decent_bench.benchmark import BenchmarkProblem
CACHE_MAX_SIZE = 50_000
class MetricProgressBar(Progress):
"""
Progress bar for metric calculations.
Make sure to set the field *status* in the task to show custom status messages.
"""
def __init__(self) -> None:
super().__init__(
TextColumn(
"[progress.description]{task.description}",
table_column=Column(width=24, no_wrap=True),
),
BarColumn(),
TaskProgressColumn(),
TimeRemainingColumn(elapsed_when_finished=True),
TextColumn("{task.fields[status]}"),
)
def _clear_caches() -> None:
"""Clear module-level functools caches used by metric utilities."""
x_mean.cache_clear()
_predict_agent.cache_clear()
def _drifts(agents: Sequence[AgentMetricsView], server: AgentMetricsView, iteration: int) -> list[float]:
"""Calculate each agent's distance from the server state at *iteration*."""
server_x = server.x_history[iteration]
return [float(iop.norm(agent.x_history[iteration] - server_x)) for agent in agents]
[docs]
@lru_cache(maxsize=CACHE_MAX_SIZE)
def x_mean(agents: tuple[AgentMetricsView, ...], iteration: int = -1) -> Array:
"""
Calculate the mean x at *iteration* (or using the agents' final x if *iteration* is -1).
Agents that did not reach *iteration* are disregarded.
Raises:
ValueError: if no agent reached *iteration*
"""
all_x_at_iter = [a.x_history[iteration] for a in agents]
if len(all_x_at_iter) == 0:
raise ValueError(f"No agent reached iteration {iteration}")
return reduce(operator.add, all_x_at_iter) / len(all_x_at_iter)
def _regret(agents: Sequence[AgentMetricsView], problem: "BenchmarkProblem", iteration: int = -1) -> float:
r"""
Calculate the global regret at *iteration* (or using the agents' final x if *iteration* is -1).
Global regret is defined as:
.. include:: snippets/global_cost_error.rst
"""
x_opt = problem.x_optimal
mean_x = x_mean(tuple(agents), iteration)
optimal_cost, actual_cost = 0.0, 0.0
for a in agents:
kwargs = {"indices": "all"} if isinstance(a.cost, EmpiricalRiskCost) else {}
optimal_cost += a.cost.function(x_opt, **kwargs) # type: ignore[arg-type]
actual_cost += a.cost.function(mean_x, **kwargs)
return (actual_cost - optimal_cost) / len(agents)
def _gradient_norm(agents: Sequence[AgentMetricsView], iteration: int = -1) -> float:
r"""
Calculate the global gradient norm at *iteration* (or using the agents' final x if *iteration* is -1).
Global gradient norm is defined as:
.. include:: snippets/global_gradient_optimality.rst
"""
mean_x = x_mean(tuple(agents), iteration)
gradients = []
for a in agents:
kwargs = {"indices": "all"} if isinstance(a.cost, EmpiricalRiskCost) else {}
gradients.append(a.cost.gradient(mean_x, **kwargs))
grad_avg = reduce(operator.add, gradients) / len(gradients)
return float(iop.norm(grad_avg))
def _x_error(agents: Sequence[AgentMetricsView], problem: "BenchmarkProblem", iteration: int = -1) -> float:
r"""
Calculate x error at *iteration* (or at the final x if *iteration* is -1).
.. math::
\|\mathbf{\bar{x}}_k - \mathbf{x}^\star\|
where :math:`\mathbf{\bar{x}}_k` is the mean of the agents' local x at iteration k,
and :math:`\mathbf{x}^\star` is the optimal x defined in the *problem*.
"""
mean_x = x_mean(tuple(agents), iteration)
return float(iop.norm(mean_x - problem.x_optimal)) # type: ignore[operator]
def _observed_rounds(agents: Sequence[AgentMetricsView]) -> int:
"""
Get the number of completed rounds observed in the agents' histories.
The initial snapshot is stored at iteration 0, so the maximum recorded iteration corresponds to the number of
completed algorithm rounds when the final snapshot is present.
"""
if not agents:
return 0
return max(agent.x_history.max() for agent in agents)
def _losses(agents: Sequence[AgentMetricsView], iteration: int) -> list[float]:
"""Calculate each agent's loss at *iteration*."""
return [
agent.cost.function(agent.x_history[iteration], indices="all")
if isinstance(agent.cost, EmpiricalRiskCost)
else agent.cost.function(agent.x_history[iteration])
for agent in agents
]
def _accuracy(agents: Sequence[AgentMetricsView], problem: "BenchmarkProblem", iteration: int) -> list[float]:
"""
Calculate the accuracy per agent.
Accuracy is only applicable for problems using :class:`~decent_bench.costs.EmpiricalRiskCost`.
Args:
agents: sequence of agents to calculate accuracy for
problem: benchmark problem containing test data
iteration: iteration to calculate accuracy at, or -1 to use the agents' final x
Returns:
list of accuracies per agent at *iteration*
"""
_, test_y = _split_dataset(problem.test_data) # type: ignore[arg-type]
ret: list[float] = []
for agent in agents:
preds = _predict_agent(agent, iteration, problem)
if np.any(~np.isfinite(preds)):
LOGGER.warning(
"Predictions contain NaN or Inf values, which are not valid for Accuracy calculation, "
"returning NaN for Accuracy"
)
return [np.nan for _ in agents]
ret.append(float(sk_metrics.accuracy_score(test_y, preds)))
return ret
def _mse(agents: Sequence[AgentMetricsView], problem: "BenchmarkProblem", iteration: int) -> list[float]:
"""
Calculate the mean squared error (MSE) per agent.
MSE is only applicable for problems using :class:`~decent_bench.costs.EmpiricalRiskCost`.
Args:
agents: sequence of agents to calculate MSE for
problem: benchmark problem containing test data
iteration: iteration to calculate MSE at, or -1 to use the agents' final x
Returns:
list of MSE per agent
"""
ret: list[float] = []
_, test_y = _split_dataset(problem.test_data) # type: ignore[arg-type]
for agent in agents:
preds = _predict_agent(agent, iteration, problem)
if np.any(~np.isfinite(preds)):
LOGGER.warning(
"Predictions contain NaN or Inf values, which are not valid for MSE calculation, returning NaN for MSE"
)
return [np.nan for _ in agents]
ret.append(sk_metrics.mean_squared_error(test_y, preds))
return ret
def _predict_cost_at_x(cost: Cost, x: Array, problem: "BenchmarkProblem") -> NDArray[float64]:
"""Get predictions from *cost* at an arbitrary model state *x*."""
test_x, _ = _split_dataset(problem.test_data) # type: ignore[arg-type]
return iop.to_numpy(cost.predict(x, list(test_x))) # type: ignore[attr-defined]
def _accuracy_at_x(cost: Cost, x: Array, problem: "BenchmarkProblem") -> float:
"""Calculate accuracy from *cost* predictions at an arbitrary model state *x*."""
_, test_y = _split_dataset(problem.test_data) # type: ignore[arg-type]
preds = _predict_cost_at_x(cost, x, problem)
if np.any(~np.isfinite(preds)):
LOGGER.warning(
"Predictions contain NaN or Inf values, which are not valid for Accuracy calculation, "
"returning NaN for Accuracy"
)
return np.nan
return float(sk_metrics.accuracy_score(test_y, preds))
def _mse_at_x(cost: Cost, x: Array, problem: "BenchmarkProblem") -> float:
"""Calculate MSE from *cost* predictions at an arbitrary model state *x*."""
_, test_y = _split_dataset(problem.test_data) # type: ignore[arg-type]
preds = _predict_cost_at_x(cost, x, problem)
if np.any(~np.isfinite(preds)):
LOGGER.warning(
"Predictions contain NaN or Inf values, which are not valid for MSE calculation, returning NaN for MSE"
)
return np.nan
return float(sk_metrics.mean_squared_error(test_y, preds))
def _precision(agents: Sequence[AgentMetricsView], problem: "BenchmarkProblem", iteration: int) -> list[float]:
"""
Calculate the precision per agent.
Precision is only applicable for problems using :class:`~decent_bench.costs.EmpiricalRiskCost`.
Calculated using :func:`sklearn.metrics.precision_score` with micro averaging.
Args:
agents: sequence of agents to calculate precision for
problem: benchmark problem containing test data
iteration: iteration to calculate precision at, or -1 to use the agents' final x
Returns:
list of precision per agent at *iteration*
"""
_, test_y = _split_dataset(problem.test_data) # type: ignore[arg-type]
ret: list[float] = []
for agent in agents:
preds = _predict_agent(agent, iteration, problem)
if np.any(~np.isfinite(preds)):
LOGGER.warning(
"Predictions contain NaN or Inf values, which are not valid for Precision calculation, "
"returning NaN for Precision"
)
return [np.nan for _ in agents]
ret.append(float(sk_metrics.precision_score(test_y, preds, average="micro")))
return ret
def _recall(agents: Sequence[AgentMetricsView], problem: "BenchmarkProblem", iteration: int) -> list[float]:
"""
Calculate the recall per agent.
Recall is only applicable for problems using :class:`~decent_bench.costs.EmpiricalRiskCost`.
Calculated using :func:`sklearn.metrics.recall_score` with micro averaging.
Args:
agents: sequence of agents to calculate recall for
problem: benchmark problem containing test data
iteration: iteration to calculate recall at, or -1 to use the agents' final x
Returns:
list of recall per agent at *iteration*
"""
_, test_y = _split_dataset(problem.test_data) # type: ignore[arg-type]
ret: list[float] = []
for agent in agents:
preds = _predict_agent(agent, iteration, problem)
if np.any(~np.isfinite(preds)):
LOGGER.warning(
"Predictions contain NaN or Inf values, which are not valid for Recall calculation,"
" returning NaN for Recall"
)
return [np.nan for _ in agents]
ret.append(float(sk_metrics.recall_score(test_y, preds, average="micro")))
return ret
def _split_dataset(data: Dataset) -> tuple[tuple[Array, ...], NDArray[float64]]:
"""
Split dataset into features and labels.
Args:
data: dataset to split, as a tuple of (features, labels)
Returns:
tuple of (features, labels)
"""
x, y = zip(*data, strict=True)
test_x = tuple(x)
test_y = np.array(y)
return test_x, test_y
@lru_cache(maxsize=CACHE_MAX_SIZE)
def _predict_agent(agent: AgentMetricsView, iteration: int, problem: "BenchmarkProblem") -> NDArray[float64]:
"""Get the predictions of *agent* at *iteration*. Cached since predictions may be expensive."""
test_x, _ = _split_dataset(problem.test_data) # type: ignore[arg-type]
return iop.to_numpy(agent.cost.predict(agent.x_history[iteration], list(test_x))) # type: ignore[attr-defined]
[docs]
def all_sorted_iterations(agents: Sequence[AgentMetricsView]) -> list[int]:
"""
Get a sorted list of all iterations reached by any agent in *agents*.
Args:
agents: sequence of agents to get the iterations from
Returns:
sorted list of iterations reached by any agent
"""
all_iters = set.union(*(set(a.x_history.keys()) for a in agents)) if agents else set()
return sorted(all_iters)
[docs]
def linear_convergence_rate(y: Sequence[float]) -> float:
r"""
Compute the linear (a.k.a. exponential or geometric) convergence rate from a given trajectory.
Fits a piecewise linear model to the log10-scaled trajectory to identify
the transitory phase and extract its slope. The convergence rate is then computed
as :math:`10^{\text{slope}}`, giving the multiplicative factor by which the error
decreases per iteration during the transitory phase. A convergence rate below :math:`1`
indicates convergence, while above :math:`1` indicates divergence. The smaller the
convergence rate, the faster the convergence.
Args:
y: sequence of error values from optimization trajectory (assumed to be positive)
Returns:
the convergence rate (multiplicative factor per iteration)
Example:
>>> print("Convergence rate of:")
>>> for alg, results in metric_results.plot_results.items():
>>> for metric, stat_results in results.items():
>>> if type(metric) == metric_library.GradientNorm:
>>> print(f"\t- {alg.name}: {utils.linear_convergence_rate(stat_results[1])}")
"""
y_array: NDArray[float64] = np.asarray(y, dtype=float64)
log_y: NDArray[float64] = np.log10(y_array)
results = fit_elbow_curve(log_y)
return float(10 ** results[1])
[docs]
def fit_elbow_curve(
y: NDArray[float64], max_trials: int = 10, tol: float = 1e-5, num_grid_points: int = 10
) -> tuple[float, float, int]:
r"""
Fit a piecewise linear "elbow curve" to data.
Fits two connected line segments to the input data: one with a slope for
the transitory phase and one horizontal for the steady-state phase. Formally, the elbow curve is defined as
.. math::
f(x) = \begin{cases}
s x + y_0 & \text{if } x \leq b \\
s b + y_0 & \text{if } x > b
\end{cases}
where :math:`s` is the slope, :math:`y_0` is the intercept, :math:`b` the breakpoint.
The parameters :math:`s`, :math:`y_0`, and :math:`b` are fitted to the input data using
linear regression with an analytical solution (for efficiency), and grid search to find the optimal breakpoint.
Args:
y: 1D array of data points to fit
max_trials: maximum number of refinement iterations for grid search
tol: grid search stops when fit of residual is less than this value
num_grid_points: number of candidate breakpoints to evaluate in each grid search iteration
Returns:
the *intercept*, *slope*, and *breakpoint* fitted to the data
Note:
A large value of *max_trials*, a small value of *tol*, a large value of *num_grid_points* will increase
the accuracy of the fit, but will require longer computational time.
Note:
`numpy.nan`, `numpy.inf` or `-numpy.inf` values in *y* are disregarded during the fit. These values might
occur in case of divergence (there is only a transient phase, with positive slope), and discarding them
allows to still fit the slope.
Raises:
ValueError: if the input arguments are invalid
"""
# validate arguments
if len(y) < 2:
raise ValueError("At least 2 data points are required to fit an elbow curve")
if max_trials < 1:
raise ValueError("max_trials must be at least 1")
if tol < 0:
raise ValueError("tol must be non-negative")
if num_grid_points < 2:
raise ValueError("num_grid_points must be at least 2")
# discard inf and nan
mask: NDArray[np.bool_] = np.isfinite(y)
y = y[mask]
m: int = int(y.size) # num. datapoints
# define search space for breakpoint b
b_1: int = 1
b_2: int = m - 1
# initialize residual of current best fit
best_res: float = float("inf")
x_hat: NDArray[float64] = np.zeros((2, num_grid_points), dtype=float64)
grid: NDArray[np.int_] = np.zeros(num_grid_points, dtype=int)
for _ in range(max_trials):
# build search grid for b
grid = np.linspace(b_1, b_2, num=num_grid_points, dtype=int)
x_hat = np.zeros((2, num_grid_points), dtype=float64) # fitted parameters for each candidate breakpoint
res: NDArray[float64] = np.zeros(num_grid_points, dtype=float64) # residual for each candidate breakpoint
# solve linear regression for points in grid
for i, b in enumerate(grid):
x_hat[:, i], res[i] = _fit_elbow_curve_given_breakpoint(y, b)
best_idx: int = int(np.argmin(res)) # best candidate breakpoint
# zoom in on region containing the best candidate
b_1, b_2 = grid[max(0, best_idx - 1)], grid[min(num_grid_points - 1, best_idx + 1)]
# stop if best residual did not improve significantly
if best_res - res[best_idx] < tol:
break
best_res = res[best_idx]
return float(x_hat[:, best_idx][1]), float(x_hat[:, best_idx][0]), int(grid[best_idx] - 1)
def _fit_elbow_curve_given_breakpoint(y: NDArray[float64], b: int) -> tuple[NDArray[float64], float]:
"""
Perform least squares fit for piecewise linear model with fixed breakpoint.
Fits a piecewise linear model where the first segment (0 to b) has a slope
and intercept, and the second segment (b+1 to end) is horizontal at the
same intercept value. Uses analytical solution for efficiency.
Args:
y: 1D array of data points to fit, as column of shape (m, 1)
b: breakpoint index (0-based)
Returns:
the fitted parameters *x_hat* and *residual* of the fit
Note:
It is assumed that *y* does not contain `numpy.nan`, `numpy.inf` or `-numpy.inf`.
"""
m = len(y) # num. datapoints
# build the regression matrix, assuming the breakpoint is b
R: NDArray[float64] = np.hstack( # noqa: N806
(
np.vstack(
(
np.arange(0, b + 1, 1).reshape((b + 1, 1)),
b * np.ones((m - b - 1, 1)),
)
),
np.ones((m, 1)),
)
)
# analytical expression for inverse of R.T @ R
S_00 = b * (b + 1) * (2 * b + 1) / 6.0 + (m - b - 1) * b**2 # noqa: N806
S_01 = b * (b + 1) / 2.0 + (m - b - 1) * b # noqa: N806
S_11 = m # noqa: N806
S_inv: NDArray[float64] = np.array([[S_11, -S_01], [-S_01, S_00]]) / (S_00 * S_11 - S_01**2) # noqa: N806
# fit parameters and compute residual
x_hat: NDArray[float64] = S_inv @ (R.T @ y)
res = float(la.norm(R @ x_hat - y))
# return fitted parameters and residual of fit
return x_hat, res