from collections.abc import Sequence
from functools import reduce
from operator import add
import numpy as np
import decent_bench.utils.interoperability as iop
from decent_bench import centralized_algorithms as ca
from decent_bench.costs import Cost, LinearRegressionCost, LogisticRegressionCost, PyTorchCost, QuadraticCost
from decent_bench.datasets import SyntheticClassificationDatasetHandler, SyntheticRegressionDatasetHandler
from decent_bench.utils import logger
from decent_bench.utils.array import Array
from decent_bench.utils.logger import LOGGER
from decent_bench.utils.types import Dataset, EmpiricalRiskBatchSize, SupportedDevices, SupportedFrameworks
SOLVE_MAX_ITER = 10000
SOLVE_STOP_TOL = 1e-20
SOLVE_MAX_TOL = 1e-16
[docs]
def create_classification_problem(
cost_cls: type[LogisticRegressionCost | PyTorchCost] = LogisticRegressionCost,
*,
device: SupportedDevices = SupportedDevices.CPU,
n_agents: int = 100,
batch_size: EmpiricalRiskBatchSize = "all",
compute_x_optimal: bool = True,
show_progress: bool = True,
) -> tuple[Sequence[Cost], Array | None, Dataset]:
"""
Create out-of-the-box classification problems.
Args:
cost_cls: type of cost function
device: device to create the problem on (only relevant for PyTorchCost)
n_agents: number of agents
batch_size: size of mini-batches for stochastic methods, or "all" for full-batch
compute_x_optimal: if the optimal solution should be computed
(using :func:`~decent_bench.centralized_algorithms.solve`). It is ignored when PyTorchCost is selected.
show_progress: whether to display a progress bar while computing ``x_optimal``. Defaults to ``True``.
Note:
If cost_cls is :class:`~decent_bench.costs.PyTorchCost`, x_optimal is not computed and set to None.
Be aware that metrics that rely on x_optimal (e.g. :class:`~decent_bench.metrics.metric_library.Regret`)
will not be available when using PyTorchCost.
Raises:
ValueError: if an unsupported cost class is provided
ImportError: if PyTorchCost is selected but PyTorch is not installed
"""
if not LOGGER.handlers:
logger.start_logger()
LOGGER.info("Creating cost functions ...")
dataset = SyntheticClassificationDatasetHandler(
n_targets=2,
n_partitions=n_agents,
n_samples_per_partition=10,
n_features=3,
framework=SupportedFrameworks.PYTORCH if cost_cls is PyTorchCost else SupportedFrameworks.NUMPY,
device=device,
feature_dtype=np.float32 if cost_cls is PyTorchCost else np.float64,
squeeze_targets=cost_cls is PyTorchCost, # PyTorchCost expects squeezed targets for CrossEntropyLoss
)
test_data = SyntheticClassificationDatasetHandler(
n_targets=2,
n_partitions=1,
n_samples_per_partition=100, # 1 partition so this is number of samples in test set
n_features=3,
framework=SupportedFrameworks.PYTORCH if cost_cls is PyTorchCost else SupportedFrameworks.NUMPY,
device=device,
feature_dtype=np.float32 if cost_cls is PyTorchCost else np.float64,
squeeze_targets=cost_cls is PyTorchCost,
)
x_optimal = None
if cost_cls is PyTorchCost:
try:
import torch # noqa: PLC0415
except ImportError as e:
raise ImportError("PyTorch must be installed to use PyTorchCost") from e
from decent_bench.utils.pytorch_utils import ArgmaxActivation, SimpleLinearModel # noqa: PLC0415
def model_gen() -> torch.nn.Module:
return SimpleLinearModel(
input_size=3,
hidden_sizes=[],
activation=None,
output_size=2,
)
# Mypy cannot infer that cost_cls is PyTorchCost here
pytorch_costs: list[PyTorchCost] = [
PyTorchCost(
dataset=p,
model=model_gen(),
loss_fn=torch.nn.CrossEntropyLoss(),
final_activation=ArgmaxActivation(),
batch_size=batch_size,
device=device,
)
for p in dataset.get_partitions()
]
LOGGER.info("... done!")
costs: Sequence[Cost] = pytorch_costs
elif cost_cls is LogisticRegressionCost:
classification_costs: list[LogisticRegressionCost] = [
LogisticRegressionCost(dataset=p, batch_size=batch_size) for p in dataset.get_partitions()
]
LOGGER.info("... done!")
if compute_x_optimal:
# agents have the same n_samples, so minimizing a single logistic cost with all data is equivalent
sum_cost = LogisticRegressionCost(dataset=dataset.get_datapoints(), batch_size="all")
x_optimal = ca.solve(
sum_cost,
max_iter=SOLVE_MAX_ITER,
stop_tol=SOLVE_STOP_TOL,
max_tol=SOLVE_MAX_TOL,
show_progress=show_progress,
)
costs = classification_costs
else:
raise ValueError(f"Unsupported cost class: {cost_cls}")
return costs, x_optimal, test_data.get_datapoints()
[docs]
def create_regression_problem(
cost_cls: type[LinearRegressionCost | PyTorchCost] = LinearRegressionCost,
*,
device: SupportedDevices = SupportedDevices.CPU,
n_agents: int = 100,
batch_size: EmpiricalRiskBatchSize = "all",
compute_x_optimal: bool = True,
) -> tuple[Sequence[Cost], Array | None, Dataset]:
"""
Create out-of-the-box regression problems.
Args:
cost_cls: type of cost function
device: device to create the problem on (only relevant for PyTorchCost)
n_agents: number of agents
batch_size: size of mini-batches for stochastic methods, or "all" for full-batch
compute_x_optimal: if the optimal solution should be computed
(by solving the linear system of equations). It is ignored when PyTorchCost is selected.
Note:
If cost_cls is :class:`~decent_bench.costs.PyTorchCost`, x_optimal is not computed and set to None.
Be aware that metrics that rely on x_optimal (e.g. :class:`~decent_bench.metrics.metric_library.Regret`)
will not be available when using PyTorchCost.
Raises:
ValueError: if an unsupported cost class is provided
ImportError: if PyTorchCost is selected but PyTorch is not installed
"""
if not LOGGER.handlers:
logger.start_logger()
LOGGER.info("Creating cost functions ...")
dataset = SyntheticRegressionDatasetHandler(
n_targets=1,
n_partitions=n_agents,
n_samples_per_partition=10,
n_features=1,
framework=SupportedFrameworks.PYTORCH if cost_cls is PyTorchCost else SupportedFrameworks.NUMPY,
device=device,
feature_dtype=np.float32 if cost_cls is PyTorchCost else np.float64,
target_dtype=np.float32 if cost_cls is PyTorchCost else np.float64,
)
test_data = SyntheticRegressionDatasetHandler(
n_targets=1,
n_partitions=1,
n_samples_per_partition=100, # 1 partition so this is number of samples in test set
n_features=1,
framework=SupportedFrameworks.PYTORCH if cost_cls is PyTorchCost else SupportedFrameworks.NUMPY,
device=device,
feature_dtype=np.float32 if cost_cls is PyTorchCost else np.float64,
target_dtype=np.float32 if cost_cls is PyTorchCost else np.float64,
)
x_optimal = None
if cost_cls is PyTorchCost:
try:
import torch # noqa: PLC0415
except ImportError as e:
raise ImportError("PyTorch must be installed to use PyTorchCost") from e
from decent_bench.utils.pytorch_utils import SimpleLinearModel # noqa: PLC0415
def model_gen() -> torch.nn.Module:
return SimpleLinearModel(
input_size=1,
hidden_sizes=[],
activation=None,
output_size=1,
)
pytorch_costs: list[PyTorchCost] = [
PyTorchCost(dataset=p, model=model_gen(), loss_fn=torch.nn.MSELoss(), batch_size=batch_size, device=device)
for p in dataset.get_partitions()
]
LOGGER.info("... done!")
costs: Sequence[Cost] = pytorch_costs
elif cost_cls is LinearRegressionCost:
regression_costs: list[LinearRegressionCost] = [
LinearRegressionCost(dataset=p, batch_size=batch_size) for p in dataset.get_partitions()
]
LOGGER.info("... done!")
if compute_x_optimal:
x_optimal = ca.solve(
reduce(add, regression_costs),
max_iter=SOLVE_MAX_ITER,
stop_tol=SOLVE_STOP_TOL,
max_tol=SOLVE_MAX_TOL,
show_progress=False,
)
costs = regression_costs
else:
raise ValueError(f"Unsupported cost class: {cost_cls}")
return costs, x_optimal, test_data.get_datapoints()
[docs]
def create_quadratic_problem(
size: int = 10,
n_agents: int = 100,
) -> tuple[Sequence[Cost], Array]:
"""
Create out-of-the-box quadratic problems.
Args:
size: number of dimensions
n_agents: number of agents
"""
if not LOGGER.handlers:
logger.start_logger()
LOGGER.info("Creating cost functions ...")
A, b = [], [] # noqa: N806
for _ in range(n_agents):
A_i = iop.uniform(shape=(size, size), framework=SupportedFrameworks.NUMPY, device=SupportedDevices.CPU) # noqa: N806
A.append((A_i + iop.transpose(A_i)) / 2 + size * iop.eye_like(A_i))
b.append(iop.normal(shape=(size,), std=10, framework=SupportedFrameworks.NUMPY, device=SupportedDevices.CPU))
costs = [QuadraticCost(A[i], b[i]) for i in range(n_agents)]
LOGGER.info("... done!")
x_optimal = ca.solve(
reduce(add, costs),
max_iter=SOLVE_MAX_ITER,
stop_tol=SOLVE_STOP_TOL,
max_tol=SOLVE_MAX_TOL,
show_progress=False,
)
return costs, x_optimal