"""Models for calibration."""
import logging
import numpy as np
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
[docs]
def construct_polynomial_regression(
degree: int = 2, *, include_bias: bool = False
) -> Pipeline:
"""Create a polynomial regression model."""
return Pipeline(
[
("poly", PolynomialFeatures(degree=degree, include_bias=include_bias)),
("linear", LinearRegression()),
]
)
[docs]
class LOESSRegression(BaseEstimator, RegressorMixin):
"""scikit-learn estimator which implements a LOESS style local polynomial regression. The number of basis functions or kernels can be explicitly defined which allows for faster and cheaper training and inference.
Parameters
----------
n_kernels : int
default = 6, The number of local polynomial functions used to approximate the data. The location and extend of the kernels will be distributed to contain an equal number of datapoints in the training set.
kernel_size : float
default = 2, A factor increasing the kernel size to overlap with the neighboring kernel.
polynomial_degree : int
default = 2, Degree of the polynomial functions used for the local approximation.
uniform : bool
default = False, If True, the kernels are distributed uniformly over the input space.
If False, the kernels are distributed to contain an equal number of datapoints.
For every kernel at least polynomial_degree + 1 datapoints are required.
"""
[docs]
def __init__(
self,
n_kernels: int = 6,
kernel_size: float = 2.0,
polynomial_degree: int = 2,
*,
uniform: bool = False,
):
"""Initialize the LOESS regression model."""
self.n_kernels = n_kernels
self.kernel_size = kernel_size
self.polynomial_degree = polynomial_degree
self.uniform = uniform
def _more_tags(self) -> dict[str, list[str]]:
return {"X_types": ["1darray"]}
def _intervals_uniform(self, x: np.ndarray) -> np.ndarray:
"""Determine the intervals of the kernels.
The kernels are distributed uniformly over the input space.
Parameters
----------
x : np.ndarray
float, of shape (n_datapoints)
Returns
-------
np.ndarray, float
of shape (n_kernels, 2)
"""
minval = x[0]
maxval = x[-1]
interval_size = (maxval - minval) / self.n_kernels
start = np.arange(minval, maxval, interval_size) - (interval_size / 2) * (
self.kernel_size - 1
)
stop = start + interval_size + (interval_size) * (self.kernel_size - 1)
return np.column_stack([start, stop])
def _kernel_indices_uniform(self, x: np.ndarray) -> np.ndarray:
"""Determine the indices of the datapoints belonging to each kernel.
The kernels are distributed uniformly over the input space.
Parameters
----------
x : np.ndarray
float, of shape (n_datapoints)
Returns
-------
np.ndarray, int
of shape (n_kernels, 2)
"""
indices = np.searchsorted(x, self._intervals_uniform(x))
return indices.astype(int)
def _kernel_indices_density(self, x: np.ndarray) -> np.ndarray:
"""Determine the indices of the datapoints belonging to each kernel.
The kernels are distributed to contain an equal number of datapoints.
Parameters
----------
x : np.ndarray
float, of shape (n_datapoints)
Returns
-------
np.ndarray, int
of shape (n_kernels, 2)
"""
num_datapoints = len(x)
interval_size = num_datapoints // self.n_kernels
start = np.arange(0, self.n_kernels) * interval_size
end = start + interval_size
interval_extension = (interval_size * self.kernel_size - interval_size) // 2
start = start - interval_extension
start = np.maximum(0, start)
end = end + interval_extension
end = np.minimum(num_datapoints, end)
return np.column_stack([start, end]).astype(int)
[docs]
def fit(self, x: np.ndarray, y: np.ndarray) -> "LOESSRegression": # noqa: C901, PLR0912 Too complex, too many branches
"""Fit the model passed on provided training data.
Parameters
----------
x : np.ndarray
float, of shape (n_samples,) or (n_samples, 1), Training data. Note that only a single feature is supported at the moment.
y : np.ndarray, float
of shape (n_samples,) or (n_samples, 1) Target values.
Returns
-------
self: object
Returns the fitted estimator.
"""
# As required by scikit-learn estimator guidelines
self.n_features_in_ = 1
# === start === sanity checks ===
# Does not yet work with more than one input dimension
# axis-wise scaling and improved distance function need to be implemented
if len(x.shape) > 1 and x.shape[1] > 1:
raise ValueError(
"Input arrays with more than one feature not yet supported. Please provide a matrix of shape (n_datapoints, 1) or (n_datapoints,)"
)
# at least two datapoints required
if len(x.flat) < 2: # noqa: PLR2004
raise ValueError("At least two datapoints required for fitting.")
# sanity check for number of datapoints, reduce n_kernels if needed
degrees_freedom = (1 + self.polynomial_degree) * self.n_kernels
if len(x.flat) < degrees_freedom:
logging.info(
f"Curve fitting with {self.n_kernels} kernels and polynomials of {self.polynomial_degree} degree requires at least {degrees_freedom} datapoints."
)
self.n_kernels = np.max([len(x.flat) // (1 + self.polynomial_degree), 1])
logging.info(
f"Number of kernels will be reduced to {self.n_kernels} kernels."
)
# sanity check for number of datapoints, reduce degree of polynomial if necessary
degrees_freedom = (1 + self.polynomial_degree) * self.n_kernels
if len(x.flat) < degrees_freedom:
self.polynomial_degree = len(x.flat) - 1
logging.info(
f"Polynomial degree will be reduced to {self.polynomial_degree}."
)
# reshape both arrays to column arrays
if len(x.shape) == 1:
x = x[..., np.newaxis]
if len(y.shape) == 1:
y = y[..., np.newaxis]
# remove outliers by using only the 0.5 to 99.5 percentile
percentiles: np.ndarray = np.percentile(x, [0.1, 99.9]) # type: ignore[assignment]
mask = (percentiles[0] < x[:, 0]) & (x[:, 0] < percentiles[1])
x = x[mask, ...]
y = y[mask, ...]
# === end === sanity checks ===
# create flat version of the array for
idx_sorted = np.argsort(x.flat)
x_sorted = x.flat[idx_sorted]
# stores if uniform training is still possible this round
uniform = self.uniform
# === start === kernel indices ===
# get kernel indices matrix of shape (n_kernels, 2)
if uniform:
kernel_indices = self._kernel_indices_uniform(x_sorted)
# check number of datapoints per kernel
if np.any(np.diff(kernel_indices) < (1 + self.polynomial_degree)):
logging.info(
"Too few datapoints per kernel. Uniform kernels will be replaced by density kernels."
)
uniform = False
# a second if statement is used instead of an if-else to account for failed uniform training
if not uniform:
kernel_indices = self._kernel_indices_density(x_sorted)
# === end === kernel indices ===
# === start === calculate kernel dimensions ===
if uniform:
start_stop = self._intervals_uniform(x_sorted)
self.scale_mean = np.mean(start_stop, axis=1)
self.scale_max = np.max(start_stop, axis=1) - self.scale_mean
else:
# scale max and scale mean will then be used for calculating the weighht matrix
self.scale_mean = np.zeros(self.n_kernels)
self.scale_max = np.zeros(self.n_kernels)
# scale mean and max are calculated and contain the scaling before applying the kernel
for i, area in enumerate(kernel_indices):
area_slice = slice(*area)
self.scale_mean[i] = x_sorted[area_slice].mean()
self.scale_max[i] = np.max(
np.abs(x_sorted[area_slice] - self.scale_mean[i])
)
# === end === calculate kernel dimensions ===
# from here on, the original column arrays are used
w = self._get_weight_matrix(x)
# build design matrix
polynomial_transform = PolynomialFeatures(self.polynomial_degree)
x_design = polynomial_transform.fit_transform(x)
number_of_dimensions = len(x_design[0])
self.beta = np.zeros((number_of_dimensions, self.n_kernels))
for i, weights in enumerate(w.T):
loadings = np.linalg.inv(x_design.T * weights @ x_design) @ x_design.T
self.beta[:, i] = np.ravel((loadings * weights) @ y)
return self
[docs]
def predict(self, x: np.ndarray) -> np.ndarray:
"""Predict using the LOESS model.
Parameters
----------
x : np.ndarray
float, of shape (n_samples,) or (n_samples, 1) Feature data. Note that only a single feature is supported at the moment.
Returns
-------
y : np.ndarray, float
of shape (n_samples,)
Target values.
"""
if len(x.shape) == 1:
x = x[..., np.newaxis]
w = self._get_weight_matrix(x)
polynomial_transform = PolynomialFeatures(self.polynomial_degree)
x_design = polynomial_transform.fit_transform(x)
return np.sum(x_design @ self.beta * w, axis=1)
def _get_weight_matrix(self, x: np.ndarray) -> np.ndarray:
"""Applies the fitted scaling parameter and the kernel to yield a weight matrix.
The weight matrix is calculated based on the self.scale_mean and self.scale_max parameters which need to be calculated before calling this function.
They define the center and extend of the tricubic kernels. The first and last column are one-padded at the start and beginning to allow for extrapolation.
Parameters
----------
x: np.ndarray
Numpy array of shape (n_datapoints, 1) which should be transformed to weights.
Returns
-------
np.ndarray
Weight matrix with the shape (n_datapoints, n_kernels).
"""
w = np.tile(x, (1, self.n_kernels))
w = w - self.scale_mean
w = w / self.scale_max
# apply weighting kernel
w = _apply_kernel(w)
return w / np.sum(w, axis=1, keepdims=True)
def _apply_kernel(w: np.ndarray) -> np.ndarray | None:
"""Applies the tricubic kernel."""
num_cols = w.shape[1]
if num_cols == 1:
return np.ones(w.shape)
if num_cols == 2: # noqa: PLR2004
w[:, 0] = _left_open_tricubic(w[:, 0])
w[:, 1] = _right_open_tricubic(w[:, 1])
return w
if num_cols > 2: # noqa: PLR2004
w[:, 0] = _left_open_tricubic(w[:, 0])
w[:, 1:-1] = _tricubic(w[:, 1:-1])
w[:, -1] = _right_open_tricubic(w[:, -1])
return w
return None
def _tricubic(x: np.ndarray, epsilon: float = 1e-6) -> np.ndarray:
"""Tricubic weight kernel."""
mask = np.abs(x) <= 1
return mask * (np.power(1 - np.power(np.abs(x), 3), 3) + epsilon)
def _left_open_tricubic(x: np.ndarray) -> np.ndarray:
"""Tricubic weight kernel which weights assigns 1 to values x < 0."""
y = _tricubic(x)
y[x < 0] = 1
return y
def _right_open_tricubic(x: np.ndarray) -> np.ndarray:
"""Tricubic weight kernel which weights assigns 1 to values x > 0."""
y = _tricubic(x)
y[x > 0] = 1
return y
CalibrationModel = LOESSRegression | LinearRegression | Pipeline