Refactor incremental spmd algos #2248

Draft
wants to merge 1 commit into main
50 changes: 3 additions & 47 deletions onedal/spmd/basic_statistics/incremental_basic_statistics.py
@@ -14,58 +14,14 @@
# limitations under the License.
# ==============================================================================

from daal4py.sklearn._utils import get_dtype

from ...basic_statistics import (
IncrementalBasicStatistics as base_IncrementalBasicStatistics,
)
from ...datatypes import to_table
from ..._device_offload import support_input_format
from .._base import BaseEstimatorSPMD


class IncrementalBasicStatistics(BaseEstimatorSPMD, base_IncrementalBasicStatistics):
def _reset(self):
self._need_to_finalize = False
self._partial_result = super(base_IncrementalBasicStatistics, self)._get_backend(
"basic_statistics", None, "partial_compute_result"
)

@support_input_format()
def partial_fit(self, X, weights=None, queue=None):
"""
Computes partial data for basic statistics
from data batch X and saves it to `_partial_result`.

Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data batch, where `n_samples` is the number of samples
in the batch, and `n_features` is the number of features.

queue : dpctl.SyclQueue
If not None, use this queue for computations.

Returns
-------
self : object
Returns the instance itself.
"""
self._queue = queue
policy = super(base_IncrementalBasicStatistics, self)._get_policy(queue, X)
X_table, weights_table = to_table(X, weights, queue=queue)

if not hasattr(self, "_onedal_params"):
self._onedal_params = self._get_onedal_params(False, dtype=X_table.dtype)

self._partial_result = super(base_IncrementalBasicStatistics, self)._get_backend(
"basic_statistics",
None,
"partial_compute",
policy,
self._onedal_params,
self._partial_result,
X_table,
weights_table,
)

self._need_to_finalize = True
return self
return super().partial_fit(X, weights=weights, queue=queue)
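The pattern here and in the files below is the same: the SPMD subclass drops its hand-rolled partial_fit and delegates to the host implementation, which looks up _get_policy and _get_backend on self, so the SPMD variants are picked up through the MRO. A minimal, self-contained sketch of those mechanics (the class names mirror the real ones, but the bodies are toy stand-ins, not the onedal API):

# Toy stand-ins illustrating why a one-line super().partial_fit()
# delegation still runs with the SPMD policy.
class HostIncremental:
    def _get_policy(self, queue, *data):
        return "host policy"

    def partial_fit(self, X, queue=None):
        # The host implementation asks *self* for the policy, so the
        # mixin that comes first in the MRO gets to answer.
        print("partial_fit using", self._get_policy(queue, X))
        return self

class BaseEstimatorSPMD:
    def _get_policy(self, queue, *data):
        return "SPMD policy"

class IncrementalBasicStatistics(BaseEstimatorSPMD, HostIncremental):
    def partial_fit(self, X, queue=None):
        return super().partial_fit(X, queue=queue)

IncrementalBasicStatistics().partial_fit([[1.0, 2.0]])  # partial_fit using SPMD policy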
60 changes: 3 additions & 57 deletions onedal/spmd/covariance/incremental_covariance.py
@@ -14,70 +14,16 @@
# limitations under the License.
# ==============================================================================

import numpy as np

from daal4py.sklearn._utils import get_dtype

from ...covariance import (
IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance,
)
from ...datatypes import to_table
from ...utils import _check_array
from ..._device_offload import support_input_format
from .._base import BaseEstimatorSPMD


class IncrementalEmpiricalCovariance(
BaseEstimatorSPMD, base_IncrementalEmpiricalCovariance
):
def _reset(self):
self._need_to_finalize = False
self._partial_result = super(
base_IncrementalEmpiricalCovariance, self
)._get_backend("covariance", None, "partial_compute_result")

@support_input_format()
def partial_fit(self, X, y=None, queue=None):
"""
Computes partial data for the covariance matrix
from data batch X and saves it to `_partial_result`.

Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data batch, where `n_samples` is the number of samples
in the batch, and `n_features` is the number of features.

y : Ignored
Not used, present for API consistency by convention.

queue : dpctl.SyclQueue
If not None, use this queue for computations.

Returns
-------
self : object
Returns the instance itself.
"""
X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True)

self._queue = queue

policy = super(base_IncrementalEmpiricalCovariance, self)._get_policy(queue, X)

X_table = to_table(X, queue=queue)

if not hasattr(self, "_dtype"):
self._dtype = X_table.dtype

params = self._get_onedal_params(self._dtype)
self._partial_result = super(
base_IncrementalEmpiricalCovariance, self
)._get_backend(
"covariance",
None,
"partial_compute",
policy,
params,
self._partial_result,
X_table,
)
self._need_to_finalize = True
return super().partial_fit(X, queue=queue)
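For reference, a hedged usage sketch of the refactored estimator. It assumes a working dpctl plus oneDAL SPMD (MPI) environment; finalize_fit and covariance_ are taken from the host estimator's API, not anything added in this PR.

import dpctl
import numpy as np

from onedal.spmd.covariance import IncrementalEmpiricalCovariance

queue = dpctl.SyclQueue("gpu")  # assumption: a SYCL GPU device is available
est = IncrementalEmpiricalCovariance()

# Each rank feeds its local batches; partial results are combined
# across ranks when the fit is finalized.
for batch in np.array_split(np.random.rand(1000, 5), 4):
    est.partial_fit(batch, queue=queue)

est.finalize_fit()      # assumption: finalization entry point from the host class
print(est.covariance_)  # assumption: attribute name from the host class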
105 changes: 3 additions & 102 deletions onedal/spmd/decomposition/incremental_pca.py
@@ -14,111 +14,12 @@
# limitations under the License.
# ==============================================================================

from daal4py.sklearn._utils import get_dtype

from ...datatypes import from_table, to_table
from ...decomposition import IncrementalPCA as base_IncrementalPCA
from ...utils import _check_array
from ..._device_offload import support_input_format
from .._base import BaseEstimatorSPMD


class IncrementalPCA(BaseEstimatorSPMD, base_IncrementalPCA):
"""
Distributed incremental estimator for PCA based on the oneDAL implementation.
Allows for distributed PCA computation if data is split into batches.

API is the same as for `onedal.decomposition.IncrementalPCA`.
"""

def _reset(self):
self._need_to_finalize = False
self._partial_result = super(base_IncrementalPCA, self)._get_backend(
"decomposition", "dim_reduction", "partial_train_result"
)
if hasattr(self, "components_"):
del self.components_

@support_input_format()
def partial_fit(self, X, y=None, queue=None):
"""Incremental fit with X. All of X is processed as a single batch.

Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data, where `n_samples` is the number of samples and
`n_features` is the number of features.

y : Ignored
Not used, present for API consistency by convention.

Returns
-------
self : object
Returns the instance itself.
"""
X = _check_array(X)
n_samples, n_features = X.shape

first_pass = not hasattr(self, "components_")
if first_pass:
self.components_ = None
self.n_samples_seen_ = n_samples
self.n_features_in_ = n_features
else:
self.n_samples_seen_ += n_samples

if self.n_components is None:
if self.components_ is None:
self.n_components_ = min(n_samples, n_features)
else:
self.n_components_ = self.components_.shape[0]
else:
self.n_components_ = self.n_components

self._queue = queue

policy = super(base_IncrementalPCA, self)._get_policy(queue, X)
X_table = to_table(X, queue=queue)

if not hasattr(self, "_dtype"):
self._dtype = X_table.dtype
self._params = self._get_onedal_params(X_table)

self._partial_result = super(base_IncrementalPCA, self)._get_backend(
"decomposition",
"dim_reduction",
"partial_train",
policy,
self._params,
self._partial_result,
X_table,
)
self._need_to_finalize = True
return self

def _create_model(self):
m = super(base_IncrementalPCA, self)._get_backend(
"decomposition", "dim_reduction", "model"
)
m.eigenvectors = to_table(self.components_)
m.means = to_table(self.mean_)
if self.whiten:
m.eigenvalues = to_table(self.explained_variance_)
self._onedal_model = m
return m

def predict(self, X, queue=None):
policy = super(base_IncrementalPCA, self)._get_policy(queue, X)
model = self._create_model()
X = to_table(X, queue=queue)
params = self._get_onedal_params(X, stage="predict")

result = super(base_IncrementalPCA, self)._get_backend(
"decomposition",
"dim_reduction",
"infer",
policy,
params,
model,
X,
)
return from_table(result.transformed_data)
return super().partial_fit(X, queue=queue)
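Note that this file also drops the SPMD copies of _create_model and predict, so inference goes through the host class as well. A hedged usage sketch under the same assumptions as above (configured SPMD/MPI environment; finalize_fit and the fit-then-predict flow follow the host estimator's API):

import dpctl
import numpy as np

from onedal.spmd.decomposition import IncrementalPCA

queue = dpctl.SyclQueue("gpu")  # assumption: a SYCL GPU device is available
pca = IncrementalPCA(n_components=2)

for batch in np.array_split(np.random.rand(600, 10), 3):
    pca.partial_fit(batch, queue=queue)

pca.finalize_fit()  # assumption: finalization entry point from the host class
reduced = pca.predict(np.random.rand(5, 10), queue=queue)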
76 changes: 3 additions & 73 deletions onedal/spmd/linear_model/incremental_linear_model.py
@@ -14,84 +14,14 @@
# limitations under the License.
# ==============================================================================

import numpy as np

from daal4py.sklearn._utils import get_dtype

from ...common.hyperparameters import get_hyperparameters
from ...datatypes import to_table
from ...linear_model import (
IncrementalLinearRegression as base_IncrementalLinearRegression,
)
from ...utils import _check_X_y, _num_features
from ..._device_offload import support_input_format
from .._base import BaseEstimatorSPMD


class IncrementalLinearRegression(BaseEstimatorSPMD, base_IncrementalLinearRegression):
"""
Distributed incremental Linear Regression based on the oneDAL implementation.

API is the same as for `onedal.linear_model.IncrementalLinearRegression`.
"""

def _reset(self):
self._partial_result = super(base_IncrementalLinearRegression, self)._get_backend(
"linear_model", "regression", "partial_train_result"
)

@support_input_format()
Comment from Contributor:
Zero-copy support will move away from `support_input_format`, as we will fully generate #2206, #2207 and #2189.

def partial_fit(self, X, y, queue=None):
"""
Computes partial data for linear regression
from data batch X and saves it to `_partial_result`.

Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data batch, where `n_samples` is the number of samples
in the batch, and `n_features` is the number of features.

y : array-like of shape (n_samples,) or (n_samples, n_targets) in
case of multiple targets
Responses for training data.

queue : dpctl.SyclQueue
If not None, use this queue for computations.

Returns
-------
self : object
Returns the instance itself.
"""
module = super(base_IncrementalLinearRegression, self)._get_backend(
"linear_model", "regression"
)

self._queue = queue
policy = super(base_IncrementalLinearRegression, self)._get_policy(queue, X)
Comment from Contributor:
I think this was done because of the wonkiness of `_get_backend`, plus the question of when the SPMD policy is used versus when the DPC policy is used.
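The wonkiness in question: super(base_cls, self) starts the attribute lookup after base_cls in the MRO, which can land on a different _get_policy than plain super() would. A self-contained illustration with toy classes (hypothetical names):

class Host:
    def _get_policy(self):
        return "host/DPC policy"

class SPMDMixin:
    def _get_policy(self):
        return "SPMD policy"

class Est(SPMDMixin, Host):
    def which(self):
        # Plain super() starts after Est in the MRO, so SPMDMixin answers;
        # super(SPMDMixin, self) starts after SPMDMixin, so Host answers.
        return super()._get_policy(), super(SPMDMixin, self)._get_policy()

print(Est().which())  # ('SPMD policy', 'host/DPC policy')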


X, y = _check_X_y(
X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False
)

X_table, y_table = to_table(X, y, queue=queue)

if not hasattr(self, "_dtype"):
self._dtype = X_table.dtype
self._params = self._get_onedal_params(self._dtype)

y = np.asarray(y, dtype=self._dtype)

self.n_features_in_ = _num_features(X, fallback_1d=True)

hparams = get_hyperparameters("linear_regression", "train")
if hparams is not None and not hparams.is_default:
self._partial_result = module.partial_train(
policy,
self._params,
hparams.backend,
self._partial_result,
X_table,
y_table,
)
else:
self._partial_result = module.partial_train(
policy, self._params, self._partial_result, X_table, y_table
)
return super().partial_fit(X, y, queue=queue)
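The hyperparameter branch (get_hyperparameters("linear_regression", "train")) presumably now lives in the host partial_fit rather than being duplicated here. A hedged usage sketch under the same environment assumptions as above:

import dpctl
import numpy as np

from onedal.spmd.linear_model import IncrementalLinearRegression

queue = dpctl.SyclQueue("gpu")  # assumption: a SYCL GPU device is available
lr = IncrementalLinearRegression()

X = np.random.rand(800, 6)
y = X @ np.random.rand(6) + 0.5
for X_batch, y_batch in zip(np.array_split(X, 4), np.array_split(y, 4)):
    lr.partial_fit(X_batch, y_batch, queue=queue)

lr.finalize_fit()  # assumption: finalization entry point from the host class
print(lr.coef_)    # assumption: attribute name from the host class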