Source code for decent_bench.algorithms.p2p._wang_elia
from dataclasses import dataclass
import decent_bench.utils.interoperability as iop
from decent_bench.algorithms.utils import initial_states
from decent_bench.networks import P2PNetwork
from decent_bench.utils._tags import tags
from decent_bench.utils.types import InitialStates
from ._p2p_algorithm import P2PAlgorithm
_STATE_CHANNEL = "state"
_PREVIOUS_STATE_CHANNEL = "previous_state"
[docs]
@tags("peer-to-peer", "gradient-tracking")
@dataclass(eq=False)
class WangElia(P2PAlgorithm):
r"""
Wang-Elia gradient tracking algorithm characterized by the updates below, see :footcite:p:`Alg_Wang_1, Alg_Wang_2`.
.. math::
\mathbf{x}_{i, k+1} = \mathbf{x}_{i, k} - \sum_j \mathbf{K}_{ij} (\mathbf{x}_{j, k} + \mathbf{z}_{j, k})
- \rho \nabla f_i(\mathbf{x}_{i,k})
.. math::
\mathbf{z}_{i, k+1} = \mathbf{z}_{i, k} + \sum_j \mathbf{K}_{ij} \mathbf{x}_{j, k}
where
:math:`\mathbf{x}_{i, k}` is agent i's local optimization variable at iteration k,
:math:`\rho` is the step size,
:math:`f_i` is agent i's local cost function,
j is a neighbor of i or i itself,
and :math:`\mathbf{K}_{ij}` is the weight between agent i and j.
The matrix :math:`\mathbf{K}` is chosen as :math:`0.5 (\mathbf{I} - \mathbf{W})`,
where :math:`\mathbf{W}` is the Metropolis weight matrix.
.. footbibliography::
"""
iterations: int = 100
step_size: float = 0.001
x0: InitialStates = None
name: str = "Wang-Elia"
def __post_init__(self) -> None:
"""
Validate hyperparameters.
Raises:
ValueError: if hyperparameters are invalid.
"""
if self.step_size <= 0:
raise ValueError("`step_size` must be positive")
def initialize(self, network: P2PNetwork) -> None:
self.x0 = initial_states(self.x0, network)
for i in network.agents():
i.initialize(x=self.x0[i], aux_vars={"z": iop.zeros_like(self.x0[i]), "x_old": self.x0[i]})
W = network.weights # noqa: N806
K = 0.5 * (iop.eye_like(W) - W) # noqa: N806
self.K = K
def step(self, network: P2PNetwork, _: int) -> None:
# 1st communication round
for i in network.active_agents():
msg = i.x + i.aux_vars["z"]
i.aux_vars["msg"] = msg
network.broadcast(i, msg, channel=_STATE_CHANNEL)
# do consensus and local gradient step
for i in network.active_agents():
neighborhood_avg = self.K[i, i] * i.aux_vars["msg"]
for j, msg_j in i.messages(_STATE_CHANNEL).items():
neighborhood_avg += self.K[i, j] * msg_j
i.aux_vars["x_old"] = i.x
i.x = i.x - neighborhood_avg - self.step_size * i.cost.gradient(i.x)
# 2nd communication round
for i in network.active_agents():
network.broadcast(i, i.aux_vars["x_old"], channel=_PREVIOUS_STATE_CHANNEL)
# update auxiliary variable
for i in network.active_agents():
neighborhood_avg = self.K[i, i] * i.aux_vars["x_old"]
for j, x_old_j in i.messages(_PREVIOUS_STATE_CHANNEL).items():
neighborhood_avg += self.K[i, j] * x_old_j
i.aux_vars["z"] += neighborhood_avg