Source code for decent_bench.datasets._kaggle_handler
from __future__ import annotations
from collections.abc import Sequence
import numpy as np
from numpy.typing import DTypeLike
try:
import kagglehub # type: ignore[import-untyped]
import pandas as pd
KAGGLE_AVAILABLE = True
except ImportError:
KAGGLE_AVAILABLE = False
import decent_bench.utils.interoperability as iop
from decent_bench.utils.types import Dataset, SupportedDevices, SupportedFrameworks
from ._dataset_handler import DatasetHandler
[docs]
class KaggleDatasetHandler(DatasetHandler):
    def __init__(
        self,
        kaggle_handle: str,
        path: str,
        feature_columns: list[str],
        target_columns: list[str],
        n_partitions: int = 1,
        *,
        framework: SupportedFrameworks = SupportedFrameworks.NUMPY,
        device: SupportedDevices = SupportedDevices.CPU,
        dtype: DTypeLike = np.float64,
        samples_per_partition: int | None = None,
    ) -> None:
        """
        Dataset wrapper for Kaggle datasets.

        The dataset file is loaded eagerly into a pandas DataFrame when the handler is
        constructed. Partitions are created lazily on the first call to
        :meth:`get_partitions` and cached afterwards.

        Args:
            kaggle_handle: Kaggle dataset handle, e.g. "user_name/dataset_name"
            path: Path to the dataset file within the Kaggle dataset
            feature_columns: List of feature column names
            target_columns: List of target column names
            n_partitions: Number of partitions to split the dataset into, must be at least 1
            framework: Framework to use for data representation
            device: Device to use for data representation
            dtype: Data type of the returned arrays
            samples_per_partition: Number of samples per partition, must be at least 1 when given.
                Defaults to ``n_samples // n_partitions``.

        Raises:
            ImportError: If kagglehub or pandas is not installed
            RuntimeError: If the dataset fails to load from Kaggle
            ValueError: If ``n_partitions`` or ``samples_per_partition`` is smaller than 1, or if
                there are not enough samples to create the requested partitions

        Note:
            If you need to authenticate with Kaggle, ensure that your Kaggle API credentials
            are set up correctly. Easiest solution is to set your api token in the environment variable
            ``KAGGLE_API_TOKEN``. Refer to https://www.kaggle.com/docs/api for more information.

        """
        if not KAGGLE_AVAILABLE:
            raise ImportError(
                "kagglehub and pandas are required to use KaggleDatasetHandler. "
                "Install them with: pip install kagglehub pandas"
            )

        # Reject impossible partitioning requests before downloading anything. Without these
        # checks n_partitions == 0 surfaces as a ZeroDivisionError and negative values silently
        # produce empty or nonsensical partitions.
        if n_partitions < 1:
            raise ValueError(f"n_partitions must be at least 1, got {n_partitions}.")
        if samples_per_partition is not None and samples_per_partition < 1:
            raise ValueError(f"samples_per_partition must be at least 1, got {samples_per_partition}.")

        self.kaggle_handle = kaggle_handle
        self.path = path
        self.feature_columns = feature_columns
        self.target_columns = target_columns
        self._n_partitions = n_partitions
        self.framework = framework
        self.device = device
        self.dtype = dtype
        # Populated lazily by get_partitions() so the random split happens at most once.
        self._partitions: Sequence[Dataset] | None = None

        self._df: pd.DataFrame = kagglehub.dataset_load(  # pyright: ignore[reportPossiblyUnboundVariable]
            kagglehub.KaggleDatasetAdapter.PANDAS,  # pyright: ignore[reportPossiblyUnboundVariable]
            self.kaggle_handle,
            self.path,
        )
        if self._df is None:
            raise RuntimeError(f"Failed to load dataset from Kaggle handle: {self.kaggle_handle}, path: {self.path}")

        total_samples = len(self._df)
        if samples_per_partition is None:
            samples_per_partition = total_samples // self.n_partitions
            # Floor division yields 0 when the dataset has fewer rows than partitions; that would
            # pass the size check below and hand every agent an empty partition.
            if samples_per_partition < 1:
                raise ValueError(
                    f"Not enough samples in dataset ({total_samples}) to create "
                    f"{self.n_partitions} non-empty partitions."
                )
        self.samples_per_partition = samples_per_partition

        if self.samples_per_partition * self.n_partitions > total_samples:
            raise ValueError(
                f"Not enough samples in dataset ({total_samples}) to create "
                f"{self.n_partitions} partitions with {self.samples_per_partition} "
                f"samples each ({self.samples_per_partition * self.n_partitions} total)."
            )

    @property
    def n_samples(self) -> int:
        """Total number of rows in the loaded dataset."""
        return len(self._df)

    @property
    def n_partitions(self) -> int:
        """Number of partitions the dataset is split into."""
        return self._n_partitions

    @property
    def n_features(self) -> int:
        """Number of feature columns."""
        return len(self.feature_columns)

    @property
    def n_targets(self) -> int:
        """Number of target columns."""
        return len(self.target_columns)

    def get_datapoints(self) -> Dataset:
        """
        Return every row of the dataset as (features, targets) tuples, in the original row order.

        The result is rebuilt from the underlying DataFrame on every call.

        Returns:
            Dataset: List of (features, targets) tuples covering the whole dataset.

        """
        return self._create_partition(self._df)

    def get_partitions(self) -> Sequence[Dataset]:
        """
        Return the dataset divided into partitions for distribution among agents.

        This method provides the core partitioning functionality for decentralized
        optimization. Each partition represents the local dataset of an agent in
        the network.

        Each partition is sampled uniformly at random from the dataset without replacement.
        The split is computed on the first call and cached, so repeated calls return the
        same partitions.

        Returns:
            Sequence[Dataset]: Sequence of Dataset objects, where each partition is a list of
            (features, targets) tuples.

        """
        if self._partitions is None:
            self._partitions = self._random_split(self._df)
        return self._partitions

    def _random_split(self, df: pd.DataFrame) -> Sequence[Dataset]:
        """Shuffle ``df`` and cut it into ``n_partitions`` consecutive, non-overlapping chunks."""
        # Shuffle the dataframe; rows beyond n_partitions * samples_per_partition are left unused.
        shuffled = df.sample(frac=1, random_state=iop.rng_numpy(), replace=False).reset_index(drop=True)
        size = self.samples_per_partition
        return [
            self._create_partition(shuffled.iloc[index * size : (index + 1) * size])
            for index in range(self.n_partitions)
        ]

    def _columns_as_array(self, df: pd.DataFrame, columns: list[str]) -> np.ndarray:
        """Return the selected columns of ``df`` as a C-ordered 2-D array of ``self.dtype``."""
        # C order keeps every row contiguous, so the per-sample row views below stay cheap to wrap.
        return df[columns].to_numpy().astype(self.dtype, order="C")

    def _create_partition(self, df_partition: pd.DataFrame) -> Dataset:
        """
        Convert the rows of ``df_partition`` into a list of (features, targets) tuples.

        The feature and target columns are converted to arrays once for the whole partition
        instead of row by row: ``DataFrame.iterrows`` builds a Series per row and upcasts each
        row across all columns of the frame, which is slow and does not preserve dtypes.
        """
        features = self._columns_as_array(df_partition, self.feature_columns)
        targets = self._columns_as_array(df_partition, self.target_columns)
        return [
            (
                iop.to_array(x, framework=self.framework, device=self.device),
                iop.to_array(y, framework=self.framework, device=self.device),
            )
            for x, y in zip(features, targets)
        ]