diff --git a/.github/workflows/test-gpu.yaml b/.github/workflows/test-gpu.yaml index 68319e7..4f783b9 100644 --- a/.github/workflows/test-gpu.yaml +++ b/.github/workflows/test-gpu.yaml @@ -40,7 +40,7 @@ jobs: run: nvidia-smi - name: Install Python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: "3.12" @@ -50,7 +50,7 @@ jobs: cache-dependency-glob: pyproject.toml - name: Install fknni - run: uv pip install --system -e ".[test,faissgpu]" + run: uv pip install --system -e ".[test,faissgpu,rapids12]" - name: Pip list run: pip list diff --git a/ci/environment.yml b/ci/environment.yml deleted file mode 100644 index 38e8c1d..0000000 --- a/ci/environment.yml +++ /dev/null @@ -1,10 +0,0 @@ -name: fknni -channels: - - rapidsai - - nvidia - - conda-forge -dependencies: - - rapids=25.10 - - python=3.13 - - cuda-version=12.9 - - cudnn diff --git a/docs/notebooks/faiss.ipynb b/docs/notebooks/faiss.ipynb index 6afcf3a..e96738d 100644 --- a/docs/notebooks/faiss.ipynb +++ b/docs/notebooks/faiss.ipynb @@ -28,7 +28,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2024-04-25T13:28:24.303873023Z", @@ -40,7 +40,7 @@ "source": [ "import numpy as np\n", "import pandas as pd\n", - "from fknni import FaissImputer\n", + "from fknni import FastKNNImputer\n", "from sklearn.impute import KNNImputer" ] }, @@ -795,7 +795,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2024-04-25T13:28:24.316976982Z", @@ -822,7 +822,7 @@ } ], "source": [ - "faiss_imputer = FaissImputer(n_neighbors=5, strategy=\"weighted\")\n", + "faiss_imputer = FastKNNImputer(n_neighbors=5, strategy=\"weighted\")\n", "\n", "df_imputed_faiss = faiss_imputer.fit_transform(df_missing)\n", "df_imputed_faiss" @@ -965,7 +965,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2024-04-25T13:29:06.633118482Z", @@ -998,7 +998,7 @@ "X.values[np.unravel_index(missing_indices, X.shape)] = np.nan\n", "\n", "knn_imputer = KNNImputer(n_neighbors=5)\n", - "faiss_imputer = FaissImputer(n_neighbors=5)\n", + "faiss_imputer = FastKNNImputer(n_neighbors=5)\n", "\n", "start_time = time.time()\n", "knn_imputed = knn_imputer.fit_transform(X)\n", @@ -1009,7 +1009,7 @@ "faiss_time = time.time() - start_time\n", "\n", "times = [knn_time, faiss_time]\n", - "labels = [\"scikit-learn KNNImputer\", \"FaissImputer\"]\n", + "labels = [\"scikit-learn KNNImputer\", \"FastKNNImputer\"]\n", "plt.bar(labels, times, color=[\"blue\", \"green\"])\n", "plt.ylabel(\"Time in seconds\")\n", "plt.title(\"Imputation Time Comparison for 10000 samples and 50 features with 10% missing rate\")\n", diff --git a/pyproject.toml b/pyproject.toml index 6f11b7e..a3ab0b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ classifiers = [ "Programming Language :: Python :: 3.13", ] dependencies = [ - "lamin-utils", + "array-api-compat", "pandas", "scikit-learn", ] diff --git a/src/fknni/__init__.py b/src/fknni/__init__.py index 1f99739..246bdd6 100644 --- a/src/fknni/__init__.py +++ b/src/fknni/__init__.py @@ -1,7 +1,7 @@ from importlib.metadata import version -__all__ = ["faiss"] +__version__ = version("fknni") -from .faiss import FaissImputer +from .knn import FastKNNImputer -__version__ = version("fknni") +__all__ = ["FastKNNImputer", "FaissImputer"] diff --git a/src/fknni/faiss/__init__.py b/src/fknni/faiss/__init__.py deleted file mode 100644 index e339e1c..0000000 --- a/src/fknni/faiss/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .faiss import FaissImputer diff --git a/src/fknni/faiss/faiss.py b/src/fknni/faiss/faiss.py deleted file mode 100644 index 1df044e..0000000 --- a/src/fknni/faiss/faiss.py +++ /dev/null @@ -1,266 +0,0 @@ -from __future__ import annotations - -from collections.abc import Sequence -from typing import Any, Literal - -import faiss -import numpy as np -from lamin_utils import logger -from numpy import dtype -from sklearn.base import BaseEstimator, TransformerMixin - -try: - import faiss - - HAS_FAISS_GPU = hasattr(faiss, "StandardGpuResources") -except ImportError: - raise ImportError("faiss-cpu or faiss-gpu required") from None - - -class FaissImputer(BaseEstimator, TransformerMixin): - """Imputer for completing missing values using Faiss, incorporating weighted averages based on distance.""" - - def __init__( - self, - missing_values: int | float | str | None = np.nan, - n_neighbors: int = 5, - *, - metric: Literal["l2", "ip"] = "l2", - strategy: Literal["mean", "median", "weighted"] = "mean", - index_factory: str = "Flat", - min_data_ratio: float = 0.25, - temporal_mode: Literal["flatten", "per_variable"] = "flatten", - use_gpu: bool = False, - ): - """Initializes FaissImputer with specified parameters that are used for the imputation. - - Args: - missing_values: The missing value to impute. - n_neighbors: Number of neighbors to use for imputation. - metric: Distance metric to use for neighbor search. - strategy: Method to compute imputed values among neighbors. - The weighted strategy is similar to scikt-learn's implementation, - where closer neighbors have a higher influence on the imputation. - index_factory: Description of the Faiss index type to build. - min_data_ratio: The minimum (dimension 0) size of the FAISS index relative to the (dimension 0) size of the - dataset that will be used to train FAISS. Defaults to 0.25. See also `fit_transform`. - temporal_mode: How to handle 3D temporal data. 'flatten' treats all (variable, timestep) pairs as - independent features (fast but allows temporal leakage). - 'per_variable' imputes each variable independently across time (slower but respects temporal causality). - use_gpu: Whether to train using GPU. - """ - if n_neighbors < 1: - raise ValueError("n_neighbors must be at least 1.") - if strategy not in {"mean", "median", "weighted"}: - raise ValueError("Unknown strategy. Choose one of 'mean', 'median', 'weighted'") - if temporal_mode not in {"flatten", "per_variable"}: - raise ValueError("Unknown temporal_mode. Choose one of 'flatten', 'per_variable'") - - self.use_gpu = use_gpu - if use_gpu and not HAS_FAISS_GPU: - raise ValueError("use_gpu=True requires faiss-gpu package, install with: pip install faiss-gpu") from None - - self.missing_values = missing_values - self.n_neighbors = n_neighbors - self.metric = metric - self.strategy = strategy - self.index_factory = index_factory - self.temporal_mode = temporal_mode - self.X_full = None - self.features_nan = None - self.min_data_ratio = min_data_ratio - self.warned_fallback = False - self.warned_unsufficient_neighbors = False - super().__init__() - - def fit_transform( # noqa: D417 - self, X: np.ndarray, y: np.ndarray | None = None, **fit_params - ) -> np.ndarray[Any, dtype[Any]] | None: - """Imputes missing values in the data using the fitted Faiss index. This imputation will be performed in place. - - This imputation will use `min_data_ratio` to check if the index is of sufficient (dimension 0) size to perform a qualitative KNN lookup. - If not, it will temporarily exclude enough features to reach this threshold and try again. - If an index still can't be built, it will use fallbacks values as defined by self.strategy. - - Args: - X: Input data with potential missing values. Can be 2D (samples × features) or 3D (samples × features × timesteps). - y: Ignored, present for compatibility with sklearn's TransformerMixin. - - Returns: - Data with imputed values as a NumPy array of the original data type. - """ - original_shape = X.shape - - if X.ndim == 3 and self.temporal_mode == "per_variable": - n_obs, n_vars, n_t = X.shape - result = np.empty_like(X, dtype=np.float64) - for var_idx in range(n_vars): - X_slice = X[:, var_idx, :] - result[:, var_idx, :] = self._impute_2d(X_slice) - return result - - if X.ndim == 3: - n_obs, n_vars, n_t = X.shape - X = X.reshape(n_obs, n_vars * n_t) - - result = self._impute_2d(X) - - if len(original_shape) == 3: - result = result.reshape(original_shape) - - return result - - def _impute_2d(self, X: np.ndarray) -> np.ndarray: - self.X_full = np.asarray(X, dtype=np.float64) if not np.issubdtype(X.dtype, np.floating) else X - if np.isnan(self.X_full).all(axis=0).any(): - raise ValueError("Features with only missing values cannot be handled.") - - # Prepare fallback values, used to prefill the query vectors nan´s - # or as an imputation fallback if we can't build an index - global_fallbacks_ = ( - np.nanmean(self.X_full, axis=0) - if self.strategy in ["mean", "weighted"] - else np.nanmedian(self.X_full, axis=0) - ) - - # We will need to impute all features having nan´s - feature_indices_to_impute = [i for i in range(self.X_full.shape[1]) if np.isnan(self.X_full[:, i]).any()] - - # Now impute iteratively - while feature_indices_to_impute: - feature_indices_being_imputed, training_indices, training_data, index = self._fit_train_imputer( - feature_indices_to_impute - ) - - # Use fallback data if we can't build an index and iterate again - if index is None: - self._warn_fallback() - self.X_full[:, feature_indices_being_imputed] = global_fallbacks_[feature_indices_being_imputed] - continue - - # Extract the features from X that was used to train FAISS, and compute the sparseness matrix - x_imputed = self.X_full[:, training_indices] - x_imputed_missing_mask = np.isnan(x_imputed) - - # Iterate through the data to impute, ignoring already imputed rows - sample_missing_mask = [] - for idx in np.where(x_imputed_missing_mask.any(axis=1))[0]: - # Extract the sample - sample_missing_mask = x_imputed_missing_mask[idx] - sample = x_imputed[idx] - - # We need to prefill the query vector as FAISS doesn't accept nan´s - sample[sample_missing_mask] = global_fallbacks_[training_indices][sample_missing_mask] - missing_cols = np.where(sample_missing_mask)[0] - - # Call FAISS and retrieve data - distances, indices = index.search(sample.reshape(1, -1), self.n_neighbors) - assert len(indices[0]) == self.n_neighbors - valid_indices = indices[0][ - indices[0] >= 0 - ] # Filter out negative indices because they are FAISS error codes - - # FAISS couldn't find any neighbor, use fallback values, and go to next row - if len(valid_indices) == 0: - self._warn_fallback() - x_imputed[idx, missing_cols] = sample[missing_cols] - continue - - # FAISS couldn't find the amount of requested neighbors, warn user and proceed - if len(valid_indices) < self.n_neighbors: - if not self.warned_unsufficient_neighbors: - logger.warning( - "FAISS couldn't find all the requested neighbors. This warning will be displayed only once." - ) - self.warned_unsufficient_neighbors = True - - # Apply strategy on neighbors data - neighbors = training_data[valid_indices] - if self.strategy == "weighted": - weights = 1 / (distances[0] + 1e-10)[:, np.newaxis] - neighbor_vals = neighbors[:, missing_cols] - weighted_sum = np.nansum(neighbor_vals * weights, axis=0) - weight_sum = np.nansum(weights, axis=0) - x_imputed[idx, missing_cols] = weighted_sum / weight_sum - else: - func = np.nanmean if self.strategy == "mean" else np.nanmedian - x_imputed[idx, missing_cols] = func(neighbors[:, missing_cols], axis=0) - - # Transfer back to X - # Features added by _fit_train_imputer for training purpose only are placed on the right - # so we can just select the features on the left - self.X_full[:, feature_indices_being_imputed] = x_imputed[:, np.arange(len(feature_indices_being_imputed))] - - # Remove the imputed features from the to-do list - feature_indices_to_impute = [ - feature_indice - for feature_indice in feature_indices_to_impute - if feature_indice not in feature_indices_being_imputed - ] - - assert not np.isnan(self.X_full).any() - return self.X_full - - def _fit_train_imputer( - self, features_indices: Sequence[int] - ) -> tuple[list[int], list[int] | None, np.ndarray | None, faiss.Index | None]: - features_indices_to_impute = features_indices.copy() - - # See what features are already imputed - already_imputed_features_indices = [ - i for i in range(self.X_full.shape[1]) if not np.isnan(self.X_full[:, i]).any() - ] - - while True: - # Train data features are those indexed by features_indices AND those already fully imputed in - # the full X. Those indices are placed after the features to impute (ie on the right), - # so it will be easy to filter them out later. - train_indices = features_indices_to_impute + already_imputed_features_indices - - # Filter X with the features indices provided in column and not containing nan´s for rows - x_subset = self.X_full[:, train_indices] - x_non_missing = x_subset[~np.isnan(x_subset).any(axis=1)] - - # Check if we have enough data - if x_non_missing.shape[0] >= self.X_full.shape[0] * self.min_data_ratio: - # We have our list of features to impute! - return features_indices_to_impute, train_indices, x_non_missing, self._train(x_non_missing) - else: - # One feature left, meaning we can't build an index - if len(features_indices_to_impute) <= 1: - return features_indices, None, None, None - - # Remove the feature containing the largest amount of nan´s and iterate again - for value in self._features_indices_sorted_descending_on_nan(): - if value in features_indices_to_impute: - features_indices_to_impute.remove(value) - break - - def _features_indices_sorted_descending_on_nan(self) -> list[int]: - if self.features_nan is None: - self.features_nan = sorted( - (i for i in range(self.X_full.shape[1]) if np.isnan(self.X_full[:, i]).sum() > 0), - key=lambda i: np.isnan(self.X_full[:, i]).sum(), - reverse=True, - ) - - return self.features_nan - - def _train(self, x_train: np.ndarray) -> faiss.Index: - index = faiss.index_factory(x_train.shape[1], self.index_factory) - index.metric_type = faiss.METRIC_L2 if self.metric == "l2" else faiss.METRIC_INNER_PRODUCT - - if self.use_gpu: - res = faiss.StandardGpuResources() - index = faiss.index_cpu_to_gpu(res, 0, index) - - index.train(x_train) - index.add(x_train) - return index - - def _warn_fallback(self): - if not self.warned_fallback: - logger.warning( - "Fallback data (as defined by passed strategy) were used. This warning will only be displayed once." - ) - self.warned_fallback = True diff --git a/src/fknni/knn/__init__.py b/src/fknni/knn/__init__.py new file mode 100644 index 0000000..2ec98df --- /dev/null +++ b/src/fknni/knn/__init__.py @@ -0,0 +1 @@ +from .knn import FastKNNImputer diff --git a/src/fknni/knn/knn.py b/src/fknni/knn/knn.py new file mode 100644 index 0000000..42b7a62 --- /dev/null +++ b/src/fknni/knn/knn.py @@ -0,0 +1,438 @@ +from __future__ import annotations + +import warnings +from collections.abc import Sequence +from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar + +import array_api_compat +import numpy as np +from numpy import dtype +from sklearn.base import BaseEstimator, TransformerMixin + +try: + from importlib.metadata import distributions + + import faiss + + HAS_FAISS_GPU = any(d.metadata["Name"].startswith("faiss-gpu") for d in distributions()) +except ImportError: + raise ImportError("faiss-cpu or faiss-gpu required") from None + +try: + import cupy as cp + from cuml.neighbors import NearestNeighbors as cuMLNearestNeighbors + + HAS_CUML = True +except ImportError: + HAS_CUML = False + cp = None + +if TYPE_CHECKING: + import cupy as cp + +ArrayT = TypeVar("ArrayT", np.ndarray, "cp.ndarray") + + +class NNIndex(Protocol): + """Protocol for nearest neighbor index implementations.""" + + def search(self, query: ArrayT, k: int) -> tuple[ArrayT, ArrayT]: + """Search for k nearest neighbors. + + Args: + query: Query vectors to search for. + k: Number of neighbors to return. + + Returns: + Tuple of (distances, indices) arrays. + """ + ... + + +class CuMLIndexWrapper: + """Wrapper around cuML NearestNeighbors to match FAISS search interface.""" + + def __init__(self, model: cuMLNearestNeighbors, X_train: cp.ndarray): + """Initialize wrapper with fitted cuML model. + + Args: + model: Fitted cuML NearestNeighbors model. + X_train: Training data used to fit the model. + """ + self._model = model + self._X_train = X_train + + def search(self, query: cp.ndarray, k: int) -> tuple[cp.ndarray, cp.ndarray]: + """Search for k nearest neighbors using cuML. + + Args: + query: Query vectors to search for. + k: Number of neighbors to return. + + Returns: + Tuple of (distances, indices) arrays. + """ + distances, indices = self._model.kneighbors(query, n_neighbors=k) + return distances, indices + + +class FastKNNImputer(BaseEstimator, TransformerMixin): + """Imputer for completing missing values using Faiss or cuML, incorporating weighted averages based on distance. + + Supports both numpy arrays (using FAISS) and cupy arrays (using cuML) for GPU-accelerated imputation. + When cupy arrays are passed, all computations stay on GPU. + """ + + def __init__( + self, + missing_values: int | float | str | None = np.nan, + n_neighbors: int = 5, + *, + metric: Literal["l2", "ip"] = "l2", + strategy: Literal["mean", "median", "weighted"] = "mean", + index_factory: str = "Flat", + min_data_ratio: float = 0.25, + temporal_mode: Literal["flatten", "per_variable"] = "flatten", + use_gpu: bool = False, + ): + """Initializes FaissImputer with specified parameters that are used for the imputation. + + Args: + missing_values: The missing value to impute. + n_neighbors: Number of neighbors to use for imputation. + metric: Distance metric to use for neighbor search. 'l2' for Euclidean, 'ip' for inner product. + strategy: Method to compute imputed values among neighbors. + The weighted strategy is similar to scikit-learn's implementation, + where closer neighbors have a higher influence on the imputation. + index_factory: Description of the Faiss index type to build (ignored for cupy arrays). + min_data_ratio: The minimum (dimension 0) size of the FAISS index relative to the (dimension 0) size of the + dataset that will be used to train FAISS. Defaults to 0.25. See also `fit_transform`. + temporal_mode: How to handle 3D temporal data. 'flatten' treats all (variable, timestep) pairs as + independent features (fast but allows temporal leakage). + 'per_variable' imputes each variable independently across time (slower but respects temporal causality). + use_gpu: Whether to train FAISS using GPU (only applies to numpy arrays, cupy arrays always use GPU via cuML). + """ + if n_neighbors < 1: + raise ValueError("n_neighbors must be at least 1.") + if strategy not in {"mean", "median", "weighted"}: + raise ValueError("Unknown strategy. Choose one of 'mean', 'median', 'weighted'") + if temporal_mode not in {"flatten", "per_variable"}: + raise ValueError("Unknown temporal_mode. Choose one of 'flatten', 'per_variable'") + + self.use_gpu = use_gpu + if use_gpu and not HAS_FAISS_GPU: + raise ValueError("use_gpu=True requires faiss-gpu package, install with: pip install faiss-gpu") + + self.missing_values = missing_values + self.n_neighbors = n_neighbors + self.metric = metric + self.strategy = strategy + self.index_factory = index_factory + self.temporal_mode = temporal_mode + self.X_full: np.ndarray | cp.ndarray | None = None + self.features_nan: list[int] | None = None + self.min_data_ratio = min_data_ratio + self.warned_fallback = False + self.warned_unsufficient_neighbors = False + super().__init__() + + def fit_transform( # noqa: D417 + self, X: np.ndarray | cp.ndarray, y: np.ndarray | None = None, **fit_params + ) -> np.ndarray[Any, dtype[Any]] | cp.ndarray: + """Imputes missing values in the data using the fitted Faiss/cuML index. This imputation will be performed in place. + + This imputation will use `min_data_ratio` to check if the index is of sufficient (dimension 0) size to perform a qualitative KNN lookup. + If not, it will temporarily exclude enough features to reach this threshold and try again. + If an index still can't be built, it will use fallbacks values as defined by self.strategy. + + For cupy arrays, computation stays entirely on GPU using cuML's NearestNeighbors. + + Args: + X: Input data with potential missing values. Can be 2D (samples × features) or 3D (samples × features × timesteps). + Accepts numpy arrays (uses FAISS) or cupy arrays (uses cuML). + y: Ignored, present for compatibility with sklearn's TransformerMixin. + + Returns: + Data with imputed values. Returns same array type as input (numpy or cupy). + """ + original_shape = X.shape + xp = array_api_compat.array_namespace(X) + + if X.ndim == 3 and self.temporal_mode == "per_variable": + n_obs, n_vars, n_t = X.shape + result = xp.empty_like(X, dtype=xp.float64) + for var_idx in range(n_vars): + X_slice = X[:, var_idx, :] + result[:, var_idx, :] = self._impute_2d(X_slice) + return result + + if X.ndim == 3: + n_obs, n_vars, n_t = X.shape + X = X.reshape(n_obs, n_vars * n_t) + + result = self._impute_2d(X) + + if len(original_shape) == 3: + result = result.reshape(original_shape) + + return result + + def _impute_2d(self, X: np.ndarray | cp.ndarray) -> np.ndarray | cp.ndarray: + """Impute missing values in a 2D array. + + Args: + X: 2D input array with potential missing values. + + Returns: + Array with imputed values, same type as input. + """ + xp = array_api_compat.array_namespace(X) + self.X_full = xp.asarray(X, dtype=xp.float64) if not xp.issubdtype(X.dtype, xp.floating) else X + + if xp.isnan(self.X_full).all(axis=0).any(): + raise ValueError("Features with only missing values cannot be handled.") + + # Prepare fallback values, used to prefill the query vectors nan´s + # or as an imputation fallback if we can't build an index + global_fallbacks_ = ( + xp.nanmean(self.X_full, axis=0) + if self.strategy in ["mean", "weighted"] + else xp.nanmedian(self.X_full, axis=0) + ) + + # We will need to impute all features having nan´s + feature_indices_to_impute = [i for i in range(self.X_full.shape[1]) if xp.isnan(self.X_full[:, i]).any()] + + # Now impute iteratively + while feature_indices_to_impute: + feature_indices_being_imputed, training_indices, training_data, index = self._fit_train_imputer( + feature_indices_to_impute + ) + + # Use fallback data if we can't build an index and iterate again + if index is None: + self._warn_fallback() + self.X_full[:, feature_indices_being_imputed] = global_fallbacks_[feature_indices_being_imputed] + continue + + # Extract the features from X that was used to train the index, and compute the sparseness matrix + x_imputed = self.X_full[:, training_indices] + x_imputed_missing_mask = xp.isnan(x_imputed) + + row_indices = xp.where(x_imputed_missing_mask.any(axis=1))[0] + n_rows = row_indices.shape[0] + + if n_rows == 0: + # No rows to impute, transfer back and continue + n_imputed = len(feature_indices_being_imputed) + self.X_full[:, feature_indices_being_imputed] = x_imputed[:, xp.arange(n_imputed)] + feature_indices_to_impute = [ + fi for fi in feature_indices_to_impute if fi not in feature_indices_being_imputed + ] + continue + + # Batch prefill: replace NaNs with fallback values for all query rows + queries = x_imputed[row_indices].copy() + query_missing_mask = x_imputed_missing_mask[row_indices] + fallbacks_for_training = global_fallbacks_[training_indices] + queries = xp.where(query_missing_mask, fallbacks_for_training, queries) + + # Batch search: single call for all queries + distances, indices = index.search(queries, self.n_neighbors) + + # Check for invalid indices (FAISS returns -1 for not found) + valid_mask = indices >= 0 + any_invalid = ~valid_mask.all() + + if any_invalid: + if not valid_mask.any(): + # No valid neighbors found at all, use fallback + self._warn_fallback() + n_imputed = len(feature_indices_being_imputed) + self.X_full[:, feature_indices_being_imputed] = x_imputed[:, xp.arange(n_imputed)] + feature_indices_to_impute = [ + fi for fi in feature_indices_to_impute if fi not in feature_indices_being_imputed + ] + continue + + if not self.warned_unsufficient_neighbors: + warnings.warn( + "Couldn't find all requested neighbors for some samples. This warning will be displayed only once.", + stacklevel=2, + ) + self.warned_unsufficient_neighbors = True + + # Retrieve all neighbors: shape (n_rows, n_neighbors, n_features) + # Clamp negative indices to 0 for gathering, then mask later + safe_indices = xp.where(valid_mask, indices, 0) + all_neighbors = training_data[safe_indices] + + # Compute imputed values based on strategy + if self.strategy == "weighted": + # Weights: (n_rows, n_neighbors, 1) + weights = 1.0 / (distances + 1e-10) + weights = xp.where(valid_mask, weights, 0.0) + weights = weights[:, :, xp.newaxis] + + # Weighted sum across neighbors + weighted_vals = all_neighbors * weights + weighted_sum = weighted_vals.sum(axis=1) + weight_sum = weights.sum(axis=1) + imputed_rows = weighted_sum / (weight_sum + 1e-10) + elif self.strategy == "mean": + # Mask invalid neighbors with NaN, then nanmean + all_neighbors = xp.where(valid_mask[:, :, xp.newaxis], all_neighbors, xp.nan) + imputed_rows = xp.nanmean(all_neighbors, axis=1) + else: # median + all_neighbors = xp.where(valid_mask[:, :, xp.newaxis], all_neighbors, xp.nan) + imputed_rows = xp.nanmedian(all_neighbors, axis=1) + + # Only update the missing positions + x_imputed[row_indices] = xp.where(query_missing_mask, imputed_rows, x_imputed[row_indices]) + + # Transfer back to X + # Features added by _fit_train_imputer for training purpose only are placed on the right + # so we can just select the features on the left + n_imputed = len(feature_indices_being_imputed) + self.X_full[:, feature_indices_being_imputed] = x_imputed[:, xp.arange(n_imputed)] + + # Remove the imputed features from the to-do list + feature_indices_to_impute = [ + fi for fi in feature_indices_to_impute if fi not in feature_indices_being_imputed + ] + + assert not xp.isnan(self.X_full).any() + + return self.X_full + + def _fit_train_imputer( + self, features_indices: Sequence[int] + ) -> tuple[list[int], list[int] | None, np.ndarray | cp.ndarray | None, NNIndex | None]: + """Build and train the nearest neighbor index for imputation. + + Args: + features_indices: Indices of features that need imputation. + + Returns: + Tuple of (features_to_impute, training_indices, training_data, index). + If index cannot be built, returns (features_indices, None, None, None). + """ + xp = array_api_compat.array_namespace(self.X_full) + features_indices_to_impute = list(features_indices) + + already_imputed_features_indices = [ + i for i in range(self.X_full.shape[1]) if not xp.isnan(self.X_full[:, i]).any() + ] + + while True: + train_indices = features_indices_to_impute + already_imputed_features_indices + x_subset = self.X_full[:, train_indices] + non_missing_rows = ~xp.isnan(x_subset).any(axis=1) + x_non_missing = x_subset[non_missing_rows] + + # Check if we have enough data + if x_non_missing.shape[0] >= self.X_full.shape[0] * self.min_data_ratio: + # We have our list of features to impute! + return ( + features_indices_to_impute, + train_indices, + x_non_missing, + self._train(x_non_missing), + ) + else: + # One feature left, meaning we can't build an index + if len(features_indices_to_impute) <= 1: + return list(features_indices), None, None, None + + # Remove the feature containing the largest amount of nan´s and iterate again + for value in self._features_indices_sorted_descending_on_nan(): + if value in features_indices_to_impute: + features_indices_to_impute.remove(value) + break + + def _features_indices_sorted_descending_on_nan(self) -> list[int]: + """Get feature indices sorted by number of NaN values in descending order. + + Returns: + List of feature indices with NaN values, sorted by NaN count (highest first). + """ + if self.features_nan is None: + xp = array_api_compat.array_namespace(self.X_full) + nan_counts = [(i, int(xp.isnan(self.X_full[:, i]).sum())) for i in range(self.X_full.shape[1])] + self.features_nan = [i for i, c in sorted(nan_counts, key=lambda x: x[1], reverse=True) if c > 0] + return self.features_nan + + def _train(self, x_train: np.ndarray | cp.ndarray) -> NNIndex: + """Train the nearest neighbor index. + + Automatically selects FAISS for numpy arrays or cuML for cupy arrays. + + Args: + x_train: Training data for building the index. + + Returns: + Trained nearest neighbor index (FAISS Index or CuMLIndexWrapper). + """ + if array_api_compat.is_cupy_array(x_train): + return self._train_cuml(x_train) + return self._train_faiss(x_train) + + def _train_faiss(self, x_train: np.ndarray) -> faiss.Index: + """Train a FAISS index for nearest neighbor search. + + Args: + x_train: Training data as numpy array. + + Returns: + Trained FAISS index. + """ + index = faiss.index_factory(x_train.shape[1], self.index_factory) + index.metric_type = faiss.METRIC_L2 if self.metric == "l2" else faiss.METRIC_INNER_PRODUCT + + if self.use_gpu: + res = faiss.StandardGpuResources() + index = faiss.index_cpu_to_gpu(res, 0, index) + + index.train(x_train) + index.add(x_train) + return index + + def _train_cuml(self, x_train: cp.ndarray) -> CuMLIndexWrapper: + """Train a cuML NearestNeighbors model for GPU-native nearest neighbor search. + + Args: + x_train: Training data as cupy array. + + Returns: + CuMLIndexWrapper with trained model. + + Raises: + ImportError: If cuML is not installed. + """ + if not HAS_CUML: + raise ImportError("cuML is required for GPU array imputation. Install with: pip install cuml-cu12") + metric = "euclidean" if self.metric == "l2" else "cosine" + model = cuMLNearestNeighbors(n_neighbors=self.n_neighbors, metric=metric) + model.fit(x_train) + return CuMLIndexWrapper(model, x_train) + + def _warn_fallback(self): + """Emit a warning when fallback values are used for imputation.""" + if not self.warned_fallback: + warnings.warn( + "Fallback data (as defined by passed strategy) were used. This warning will only be displayed once.", + stacklevel=2, + ) + self.warned_fallback = True + + +class FaissImputer(FastKNNImputer): + """Deprecated: Use FastKNNImputer instead.""" + + def __init__(self, *args, **kwargs): + warnings.warn( + "FaissImputer is deprecated, use FastKNNImputer instead.", + DeprecationWarning, + stacklevel=2, + ) + super().__init__(*args, **kwargs) diff --git a/tests/cpu/test_faiss_imputation.py b/tests/cpu/test_faiss_imputation.py index 2c5c668..8922f1a 100644 --- a/tests/cpu/test_faiss_imputation.py +++ b/tests/cpu/test_faiss_imputation.py @@ -3,7 +3,7 @@ from sklearn.datasets import make_regression from tests.compare_predictions import _base_check_imputation -from fknni.faiss.faiss import FaissImputer +from fknni.knn.knn import FastKNNImputer @pytest.fixture @@ -21,7 +21,7 @@ def test_median_imputation(simple_test_df): """Tests if median imputation successfully fills all NaN values""" data, data_missing = simple_test_df data_original = data_missing.copy() - FaissImputer(n_neighbors=5, strategy="median").fit_transform(data_missing) + FastKNNImputer(n_neighbors=5, strategy="median").fit_transform(data_missing) _base_check_imputation(data_original, data_missing) @@ -29,7 +29,7 @@ def test_mean_imputation(simple_test_df): """Tests if mean imputation successfully fills all NaN values""" data, data_missing = simple_test_df data_original = data_missing.copy() - FaissImputer(n_neighbors=5, strategy="mean").fit_transform(data_missing) + FastKNNImputer(n_neighbors=5, strategy="mean").fit_transform(data_missing) _base_check_imputation(data_original, data_missing) @@ -37,7 +37,7 @@ def test_imputer_with_no_missing_values(simple_test_df): """Tests if imputer preserves data when no values are missing""" data, _ = simple_test_df data_original = data.copy() - FaissImputer(n_neighbors=5, strategy="median").fit_transform(data) + FastKNNImputer(n_neighbors=5, strategy="median").fit_transform(data) _base_check_imputation(data_original, data) @@ -47,7 +47,7 @@ def test_imputer_with_all_nan_column(rng): data_missing = data.copy() data_missing[:, 2] = np.nan with pytest.raises(ValueError): - FaissImputer(n_neighbors=5).fit_transform(data_missing) + FastKNNImputer(n_neighbors=5).fit_transform(data_missing) def test_imputer_with_all_nan_row(rng): @@ -57,7 +57,7 @@ def test_imputer_with_all_nan_row(rng): data_missing = data.copy() data_original = data.copy() - FaissImputer(n_neighbors=5).fit_transform(data_missing) + FastKNNImputer(n_neighbors=5).fit_transform(data_missing) _base_check_imputation(data_original, data_missing) @@ -68,8 +68,8 @@ def test_imputer_different_n_neighbors(simple_test_df): data_original = data_missing.copy() imputer_3 = data_missing.copy() imputer_7 = data_missing.copy() - FaissImputer(n_neighbors=3).fit_transform(imputer_3) - FaissImputer(n_neighbors=7).fit_transform(imputer_7) + FastKNNImputer(n_neighbors=3).fit_transform(imputer_3) + FastKNNImputer(n_neighbors=7).fit_transform(imputer_7) _base_check_imputation(data_original, imputer_3) _base_check_imputation(data_original, imputer_7) assert not np.array_equal(imputer_3, imputer_7) @@ -79,7 +79,7 @@ def test_regression_imputation(regression_dataset): """Tests if imputed data maintains predictive power in regression task""" X, X_missing, y = regression_dataset X_original = X_missing.copy() - FaissImputer(n_neighbors=5).fit_transform(X_missing) + FastKNNImputer(n_neighbors=5).fit_transform(X_missing) _base_check_imputation(X_original, X_missing) from sklearn.linear_model import LinearRegression @@ -95,15 +95,15 @@ def test_regression_imputation(regression_dataset): def test_invalid_strategy(): """Tests if imputer raises error for invalid strategy""" with pytest.raises(ValueError): - FaissImputer(strategy="invalid") + FastKNNImputer(strategy="invalid") def test_invalid_n_neighbors(): """Tests if imputer raises error for invalid n_neighbors values""" with pytest.raises(ValueError): - FaissImputer(n_neighbors=0) + FastKNNImputer(n_neighbors=0) with pytest.raises(ValueError): - FaissImputer(n_neighbors=-1) + FastKNNImputer(n_neighbors=-1) def test_no_full_rows(): @@ -119,7 +119,7 @@ def test_no_full_rows(): ] ) arr_original = arr.copy() - FaissImputer(n_neighbors=1).fit_transform(arr) + FastKNNImputer(n_neighbors=1).fit_transform(arr) _base_check_imputation(arr_original, arr) @@ -135,7 +135,7 @@ def test_3d_flatten_imputation(rng): data_missing[i, j, k] = np.nan data_original = data_missing.copy() - FaissImputer(n_neighbors=5, temporal_mode="flatten").fit_transform(data_missing) + FastKNNImputer(n_neighbors=5, temporal_mode="flatten").fit_transform(data_missing) _base_check_imputation(data_original, data_missing) assert data_missing.shape == (10, 5, 3) @@ -152,7 +152,7 @@ def test_3d_per_variable_imputation(rng): data_missing[i, j, k] = np.nan data_original = data_missing.copy() - FaissImputer(n_neighbors=5, temporal_mode="per_variable").fit_transform(data_missing) + FastKNNImputer(n_neighbors=5, temporal_mode="per_variable").fit_transform(data_missing) _base_check_imputation(data_original, data_missing) assert data_missing.shape == (10, 5, 3) @@ -171,8 +171,8 @@ def test_3d_modes_produce_different_results(rng): data_flatten = data_missing.copy() data_per_var = data_missing.copy() - FaissImputer(n_neighbors=5, temporal_mode="flatten").fit_transform(data_flatten) - FaissImputer(n_neighbors=5, temporal_mode="per_variable").fit_transform(data_per_var) + FastKNNImputer(n_neighbors=5, temporal_mode="flatten").fit_transform(data_flatten) + FastKNNImputer(n_neighbors=5, temporal_mode="per_variable").fit_transform(data_per_var) assert not np.array_equal(data_flatten, data_per_var) @@ -180,4 +180,4 @@ def test_3d_modes_produce_different_results(rng): def test_invalid_temporal_mode(): """Tests if imputer raises error for invalid temporal_mode""" with pytest.raises(ValueError): - FaissImputer(temporal_mode="invalid") + FastKNNImputer(temporal_mode="invalid") diff --git a/tests/gpu/test_gpu.py b/tests/gpu/test_gpu.py index 5ea13b4..8c41911 100644 --- a/tests/gpu/test_gpu.py +++ b/tests/gpu/test_gpu.py @@ -1,13 +1,55 @@ import pytest from tests.compare_predictions import _base_check_imputation -from fknni.faiss.faiss import FaissImputer +from fknni.knn.knn import FastKNNImputer + +cupy = pytest.importorskip("cupy") @pytest.mark.gpu -def test_median_imputation(simple_test_df): +def test_median_imputation_faiss(simple_test_df): """Tests if median imputation successfully fills all NaN values""" - data, data_missing = simple_test_df + _, data_missing = simple_test_df data_original = data_missing.copy() - FaissImputer(n_neighbors=5, strategy="median", use_gpu=True).fit_transform(data_missing) + FastKNNImputer(n_neighbors=5, strategy="median", use_gpu=True).fit_transform(data_missing) _base_check_imputation(data_original, data_missing) + + +@pytest.mark.gpu +def test_median_imputation_cupy(simple_test_df): + """Tests if median imputation with cupy arrays fills all NaN values and stays on GPU.""" + _, data_missing = simple_test_df + data_missing_cp = cupy.asarray(data_missing) + data_original_cp = data_missing_cp.copy() + + result = FastKNNImputer(n_neighbors=5, strategy="median").fit_transform(data_missing_cp) + + assert isinstance(result, cupy.ndarray), f"Expected cupy array, got {type(result)}" + assert not cupy.isnan(result).any(), "NaNs remain after imputation" + _base_check_imputation(data_original_cp.get(), result.get()) + + +@pytest.mark.gpu +@pytest.mark.parametrize("strategy", ["mean", "median", "weighted"]) +def test_cupy_strategies(simple_test_df, strategy): + """Tests all imputation strategies with cupy arrays.""" + _, data_missing = simple_test_df + data_missing_cp = cupy.asarray(data_missing) + + result = FastKNNImputer(n_neighbors=5, strategy=strategy).fit_transform(data_missing_cp) + + assert isinstance(result, cupy.ndarray) + assert not cupy.isnan(result).any() + + +@pytest.mark.gpu +def test_cupy_numpy_produce_same_results(simple_test_df): + """Tests that cupy and numpy paths produce equivalent results.""" + _, data_missing = simple_test_df + data_missing_np = data_missing.copy() + data_missing_cp = cupy.asarray(data_missing.copy()) + + result_np = FastKNNImputer(n_neighbors=5, strategy="mean").fit_transform(data_missing_np) + result_cp = FastKNNImputer(n_neighbors=5, strategy="mean").fit_transform(data_missing_cp) + + cupy.testing.assert_allclose(result_cp, cupy.asarray(result_np), rtol=1e-5)