from __future__ import annotations
from functools import cached_property
import numpy as np
import numpy.linalg as la
from numpy import float64
from numpy.typing import NDArray
from scipy import special
import decent_bench.centralized_algorithms as ca
import decent_bench.utils.interoperability as iop
from decent_bench.utils._tags import tags
from decent_bench.utils.array import Array
from decent_bench.utils.types import (
Dataset,
EmpiricalRiskBatchSize,
EmpiricalRiskIndices,
EmpiricalRiskReduction,
SupportedDevices,
SupportedFrameworks,
)
from ._empirical_risk_cost import EmpiricalRiskCost
[docs]
@tags("classification", "empirical-risk")
class LogisticRegressionCost(EmpiricalRiskCost):
r"""
Logistic regression cost function.
Given a data matrix :math:`\mathbf{A} \in \mathbb{R}^{m \times n}` and target vector
:math:`\mathbf{b} \in \mathbb{R}^{m}`, the logistic regression cost function is defined as:
.. math:: f(\mathbf{x}) =
-\frac{1}{m}\left[ \mathbf{b}^T \log( \sigma(\mathbf{Ax}) )
+ ( \mathbf{1} - \mathbf{b} )^T
\log( 1 - \sigma(\mathbf{Ax}) ) \right]
= -\frac{1}{m}\sum_{i = 1}^m \left[ b_i \log( \sigma(A_i x) )
+ (1 - b_i) \log( 1 - \sigma(A_i x) ) \right]
where :math:`\sigma(z) = \frac{1}{1 + e^{-z}}` is the sigmoid function, :math:`A_i` and :math:`b_i` are the i-th
row of :math:`\mathbf{A}` and the i-th element of :math:`\mathbf{b}` respectively.
In the stochastic setting, a mini-batch of size :math:`b < m` is used to compute the cost and its derivatives.
The cost function then becomes:
.. math:: f(\mathbf{x}) =
-\frac{1}{b} \left[ \mathbf{b}_{\mathcal{B}}^T \log( \sigma(\mathbf{A}_{\mathcal{B}}\mathbf{x}) )
+ ( \mathbf{1} - \mathbf{b}_{\mathcal{B}} )^T
\log( 1 - \sigma(\mathbf{A}_{\mathcal{B}}\mathbf{x}) ) \right]
= -\frac{1}{b} \sum_{i \in \mathcal{B}} \left[ b_i \log( \sigma(A_i x) )
+ (1 - b_i) \log( 1 - \sigma(A_i x) ) \right]
where :math:`\mathcal{B}` is a sampled batch of :math:`b` indices from :math:`\{1, \ldots, m\}`,
:math:`\mathbf{A}_B` and :math:`\mathbf{b}_B` are the rows corresponding to the batch :math:`\mathcal{B}`.
"""
[docs]
def __init__(self, dataset: Dataset, batch_size: EmpiricalRiskBatchSize = "all"):
"""
Initialize logistic regression cost function.
Args:
dataset (Dataset): Dataset containing features and targets. The expected shapes are:
- Features: (n_features,)
- Targets: single dimensional values
batch_size (EmpiricalRiskBatchSize): Size of mini-batch to use for stochastic methods.
If "all", full-batch methods are used.
Raises:
ValueError: If input dimensions are incorrect or batch_size is invalid.
TypeError: If dataset targets are not single dimensional values.
"""
if len(iop.shape(dataset[0][0])) != 1:
raise ValueError(f"Dataset features must be vectors, got: {dataset[0][0]}")
if iop.to_numpy(dataset[0][1]).shape != (1,):
raise TypeError(
f"Dataset targets must be single dimensional values, got: {dataset[0][1]} "
f"with shape {iop.to_numpy(dataset[0][1]).shape}, expected shape is (1,)."
)
if isinstance(batch_size, int) and (batch_size <= 0 or batch_size > len(dataset)):
raise ValueError(
f"Batch size must be positive and at most the number of samples, "
f"got: {batch_size} and number of samples is: {len(dataset)}."
)
if isinstance(batch_size, str) and batch_size != "all":
raise ValueError(f"Invalid batch size string. Supported value is 'all', got {batch_size}.")
class_labels = {iop.to_numpy(y).item() for _, y in dataset}
if len(class_labels) != 2:
raise ValueError("Dataset must contain exactly two classes")
self._dataset = dataset
self._label_mapping = dict(enumerate(class_labels))
self._batch_size = self.n_samples if batch_size == "all" else batch_size
# Cache data matrices for efficiency when using full dataset
self.A: NDArray[float64] | None = None
self.b: NDArray[float64] | None = None
@property
def shape(self) -> tuple[int, ...]:
return iop.shape(self._dataset[0][0])
@property
def framework(self) -> SupportedFrameworks:
return SupportedFrameworks.NUMPY
@property
def device(self) -> SupportedDevices:
return SupportedDevices.CPU
@property
def n_samples(self) -> int:
return len(self._dataset)
@property
def batch_size(self) -> int:
return self._batch_size
@property
def dataset(self) -> Dataset:
return self._dataset
[docs]
@cached_property
def m_smooth(self) -> float: # pyright: ignore[reportIncompatibleMethodOverride]
r"""
The cost function's smoothness constant.
.. math::
\frac{1}{m} \frac{m}{4} \max_i \|\mathbf{A}_i\|^2 = \frac{1}{4} \max_i \|\mathbf{A}_i\|^2
where m is the number of rows in :math:`\mathbf{A}`.
For the general definition, see
:attr:`Cost.m_smooth <decent_bench.costs.Cost.m_smooth>`.
"""
A, _ = self._get_batch_data("all") # noqa: N806
return float(max(pow(la.norm(row), 2) for row in A) / 4)
@property
def m_cvx(self) -> float:
"""
The cost function's convexity constant, 0.
For the general definition, see
:attr:`Cost.m_cvx <decent_bench.costs.Cost.m_cvx>`.
"""
return 0
[docs]
@iop.autodecorate_cost_method(EmpiricalRiskCost.predict)
def predict(self, x: NDArray[float64], data: list[NDArray[float64]]) -> NDArray[float64]:
r"""
Make predictions at x on the given data.
The predicted targets are computed as :math:`\sigma(\mathbf{Ax}) > 0.5`,
where :math:`\sigma` is the sigmoid function.
Args:
x: Point to make predictions at.
data: List of NDArray containing data to make predictions on.
Returns:
Predicted targets as an array.
"""
pred_data = np.stack(data) if isinstance(data, list) else data
logits = pred_data.dot(x)
sig = special.expit(logits)
return np.array([self._label_mapping[label] for label in (sig >= 0.5).astype(int)])
[docs]
@iop.autodecorate_cost_method(EmpiricalRiskCost.function)
def function(self, x: NDArray[float64], indices: EmpiricalRiskIndices = "batch") -> float:
r"""
Evaluate function at x using datapoints at the given indices.
Supported values for indices are:
- int: datapoint to use.
- list[int]: datapoints to use.
- "all": use the full dataset.
- "batch": draw a batch with :attr:`batch_size` samples.
If no batching is used, this is:
.. math::
-\frac{1}{m}\left[ \mathbf{b}^T \log( \sigma(\mathbf{Ax}) )
+ ( \mathbf{1} - \mathbf{b} )^T
\log( 1 - \sigma(\mathbf{Ax}) ) \right]
If indices is "batch", a random batch :math:`\mathcal{B}` is drawn with :attr:`batch_size` samples.
.. math::
-\frac{1}{b} \left[ \mathbf{b}_{\mathcal{B}}^T \log( \sigma(\mathbf{A}_{\mathcal{B}}\mathbf{x}) )
+ ( \mathbf{1} - \mathbf{b}_{\mathcal{B}} )^T
\log( 1 - \sigma(\mathbf{A}_{\mathcal{B}}\mathbf{x}) ) \right]
where :math:`\sigma` is the sigmoid function, :math:`\mathbf{A}_B` and :math:`\mathbf{b}_B` are
the rows corresponding to the batch :math:`\mathcal{B}`.
"""
A, b = self._get_batch_data(indices) # noqa: N806
Ax = A.dot(x) # noqa: N806
neg_log_sig = np.logaddexp(0.0, -Ax)
cost = b.dot(neg_log_sig) + (1 - b).dot(Ax + neg_log_sig)
return float(cost) / len(self.batch_used)
[docs]
@iop.autodecorate_cost_method(EmpiricalRiskCost.gradient)
def gradient(
self,
x: NDArray[float64],
indices: EmpiricalRiskIndices = "batch",
reduction: EmpiricalRiskReduction = "mean",
) -> NDArray[float64]:
r"""
Gradient at x using datapoints at the given indices.
Supported values for indices are:
- int: datapoint to use.
- list[int]: datapoints to use.
- "all": use the full dataset.
- "batch": draw a batch with :attr:`batch_size` samples.
Supported values for reduction are:
- "mean": average the gradients over the samples.
- None: return the gradients for each sample, index as the first dimension.
If no batching is used, this is:
.. math::
\frac{1}{m}\mathbf{A}^T (\sigma(\mathbf{Ax}) - \mathbf{b})
If indices is "batch", a random batch :math:`\mathcal{B}` is drawn with :attr:`batch_size` samples.
.. math::
\frac{1}{b} \mathbf{A}_{\mathcal{B}}^T (\sigma(\mathbf{A}_{\mathcal{B}}\mathbf{x})
- \mathbf{b}_{\mathcal{B}})
where :math:`\sigma` is the sigmoid function, :math:`\mathbf{A}_B` and :math:`\mathbf{b}_B`
are the rows corresponding to the batch :math:`\mathcal{B}`.
Note:
When reduction is None, the returned array will have an additional leading dimension
corresponding to the number of samples used. Indexing into this dimension will give the gradient
for the respective sample in :attr:`batch_used <decent_bench.costs.EmpiricalRiskCost.batch_used>`.
"""
if reduction is None:
return self._per_sample_gradients(x, indices)
A, b = self._get_batch_data(indices) # noqa: N806
sig = special.expit(A.dot(x))
res: NDArray[float64] = A.T.dot(sig - b) / len(self.batch_used)
return res
def _per_sample_gradients(
self,
x: NDArray[float64],
indices: EmpiricalRiskIndices = "batch",
) -> NDArray[float64]:
A, b = self._get_batch_data(indices) # noqa: N806
sig = special.expit(A.dot(x))
res = [A[i, :].reshape(-1) * (sig[i] - b[i]) for i in range(A.shape[0])]
return np.asarray(res)
[docs]
@iop.autodecorate_cost_method(EmpiricalRiskCost.hessian)
def hessian(self, x: NDArray[float64], indices: EmpiricalRiskIndices = "batch") -> NDArray[float64]:
r"""
Hessian at x using datapoints at the given indices.
Supported values for indices are:
- int: datapoint to use.
- list[int]: datapoints to use.
- "all": use the full dataset.
- "batch": draw a batch with :attr:`batch_size` samples.
If no batching is used, this is:
.. math::
\frac{1}{m}\mathbf{A}^T \mathbf{DA}
where :math:`\sigma` is the sigmoid function and :math:`\mathbf{D}` is a diagonal matrix such that
:math:`\mathbf{D}_i = \sigma(\mathbf{Ax}_i) (1-\sigma(\mathbf{Ax}_i))`
If indices is "batch", a random batch :math:`\mathcal{B}` is drawn with :attr:`batch_size` samples.
.. math::
\frac{1}{b} \mathbf{A}_{\mathcal{B}}^T \mathbf{D}_{\mathcal{B}} \mathbf{A}_{\mathcal{B}}
where :math:`\mathbf{A}_B` and :math:`\mathbf{D}_B` are the rows corresponding to the batch :math:`\mathcal{B}`.
"""
A, _ = self._get_batch_data(indices) # noqa: N806
sig = special.expit(A.dot(x))
D = np.diag(sig * (1 - sig)) # noqa: N806
res: NDArray[float64] = A.T.dot(D).dot(A) / len(self.batch_used)
return res
[docs]
@iop.autodecorate_cost_method(EmpiricalRiskCost.proximal)
def proximal(self, x: Array, penalty: float) -> Array:
"""
Proximal at x solved using an iterative method.
The proximal for logistic regression does not have closed form solution, will use
a gradient based approximation method over the entire dataset, over at most 100 iterations.
See
:meth:`Cost.proximal() <decent_bench.costs.Cost.proximal>`
for the general proximal definition.
"""
prev_batch_size = self.batch_size
self._batch_size = self.n_samples # Use full dataset for proximal
approx = ca.proximal_solver(self, x, penalty)
self._batch_size = prev_batch_size # Restore previous batch size
return approx
[docs]
def _get_batch_data(self, indices: EmpiricalRiskIndices = "batch") -> tuple[NDArray[float64], NDArray[float64]]:
"""Get data for a batch. Returns A and b for the batch."""
indices = self._sample_batch_indices(indices)
if len(indices) == self.n_samples:
# Use full dataset
if self.A is None or self.b is None:
self.A = np.stack([iop.to_numpy(x) for x, _ in self._dataset])
self.b = np.stack([iop.to_numpy(y) for _, y in self._dataset]).squeeze()
for k in self._label_mapping:
self.b[np.where(self.b == self._label_mapping[k])] = k
return self.A, self.b
A, b = np.empty((len(indices), *self.shape)), np.empty(len(indices)) # noqa: N806
for i, idx in enumerate(indices):
x_i, y_i = self._dataset[idx]
A[i, :] = iop.to_numpy(x_i)
b[i] = iop.to_numpy(y_i).item()
for k in self._label_mapping:
b[np.where(b == self._label_mapping[k])] = k
return A, b