"""Kernel-based operations for candidate selection."""
import logging
import numba as nb
import numpy as np
from alphadia.raw_data import DiaDataJIT
from alphadia.utils import USE_NUMBA_CACHING
logger = logging.getLogger()
[docs]
@nb.njit(cache=USE_NUMBA_CACHING)
def multivariate_normal(x: np.ndarray, mu: np.ndarray, sigma: np.ndarray):
"""Multivariate normal distribution, probability density function
Most likely an absolutely inefficient implementation of the multivariate normal distribution.
Is only used for creating the gaussian kernel and will only be used a few times for small kernels.
Parameters
----------
x : np.ndarray
`(N, D,)`
mu : np.ndarray
`(1, D,)`
sigma : np.ndarray
`(D, D,)`
Returns
-------
np.ndarray, float32
array of shape `(N,)` with the density at each point
"""
k = mu.shape[0]
dx = x - mu
# implementation is not very efficient for large N as the N x N matrix will created only for storing the diagonal
a = np.exp(-1 / 2 * np.diag(dx @ np.linalg.inv(sigma) @ dx.T))
b = (np.pi * 2) ** (-k / 2) * np.linalg.det(sigma) ** (-1 / 2)
return a * b
[docs]
class GaussianKernel:
[docs]
def __init__(
self,
dia_data: DiaDataJIT,
fwhm_rt: float = 10.0,
sigma_scale_rt: float = 1.0,
fwhm_mobility: float = 0.03,
sigma_scale_mobility: float = 1.0,
kernel_height: int = 30,
kernel_width: int = 30,
):
"""Create a two-dimensional gaussian filter kernel for the RT and mobility dimensions of a DIA dataset.
First, the observed standard deviation is scaled by a linear factor. Second, the standard deviation is scaled by the resolution of the respective dimension.
This results in sigma_scale to be independent of the resolution of the data and FWHM of the peaks.
Parameters
----------
dia_data : DiaDataJIT
dia_data jit object.
fwhm_rt : float
Full width at half maximum in RT dimension of the peaks in the spectrum.
sigma_scale_rt : float
Scaling factor for the standard deviation in RT dimension.
fwhm_mobility : float
Full width at half maximum in mobility dimension of the peaks in the spectrum.
sigma_scale_mobility : float
Scaling factor for the standard deviation in mobility dimension.
kernel_size : int
Kernel shape in pixel. The kernel will be a square of size (kernel_size, kernel_size).
Should be even and will be rounded up to the next even number if necessary.
"""
self.dia_data = dia_data
self.fwhm_rt = fwhm_rt
self.sigma_scale_rt = sigma_scale_rt
self.fwhm_mobility = fwhm_mobility
self.sigma_scale_mobility = sigma_scale_mobility
self.kernel_height = int(
np.ceil(kernel_height / 2) * 2
) # make sure kernel size is even
self.kernel_width = int(
np.ceil(kernel_width / 2) * 2
) # make sure kernel size is even
[docs]
def determine_rt_sigma(self, cycle_length_seconds: float):
"""Determine the standard deviation of the gaussian kernel in RT dimension.
The standard deviation will be sclaed to the resolution of the raw data.
Parameters
----------
cycle_length_seconds : float
Cycle length of the duty cycle in seconds.
Returns
-------
float
Standard deviation of the gaussian kernel in RT dimension scaled to the resolution of the raw data.
"""
# a normal distribution has a FWHM of 2.3548 sigma
sigma = self.fwhm_rt / 2.3548
sigma_scaled = sigma * self.sigma_scale_rt / cycle_length_seconds
return sigma_scaled
[docs]
def determine_mobility_sigma(self, mobility_resolution: float):
"""Determine the standard deviation of the gaussian kernel in mobility dimension.
The standard deviation will be sclaed to the resolution of the raw data.
Parameters
----------
mobility_resolution : float
Resolution of the mobility dimension in 1/K_0.
Returns
-------
float
Standard deviation of the gaussian kernel in mobility dimension scaled to the resolution of the raw data.
"""
if not self.dia_data.has_mobility:
return 1.0
# a normal distribution has a FWHM of 2.3548 sigma
sigma = self.fwhm_mobility / 2.3548
sigma_scaled = sigma * self.sigma_scale_mobility / mobility_resolution
return sigma_scaled
[docs]
def get_dense_matrix(self, verbose: bool = True) -> np.ndarray:
"""Calculate the gaussian kernel for the given data set and parameters.
Parameters
----------
verbose : bool
If True, log information about the data set and the kernel.
Returns
-------
np.ndarray
Two-dimensional gaussian kernel.
"""
rt_datapoints = self.dia_data.cycle.shape[1]
rt_resolution = np.mean(np.diff(self.dia_data.rt_values[::rt_datapoints]))
mobility_datapoints = self.dia_data.cycle.shape[2]
mobility_resolution = np.mean(np.diff(self.dia_data.mobility_values[::-1]))
if verbose:
logger.info(
f"Duty cycle consists of {rt_datapoints} frames, {rt_resolution:.2f} seconds cycle time"
)
logger.info(
f"Duty cycle consists of {mobility_datapoints} scans, {mobility_resolution:.5f} 1/K_0 resolution"
)
rt_sigma = self.determine_rt_sigma(rt_resolution)
mobility_sigma = self.determine_mobility_sigma(mobility_resolution)
if verbose:
logger.info(
f"FWHM in RT is {self.fwhm_rt:.2f} seconds, sigma is {rt_sigma:.2f}"
)
logger.info(
f"FWHM in mobility is {self.fwhm_mobility:.3f} 1/K_0, sigma is {mobility_sigma:.2f}"
)
return self.gaussian_kernel_2d(
self.kernel_width, self.kernel_height, rt_sigma, mobility_sigma
).astype(np.float32)
[docs]
@staticmethod
def gaussian_kernel_2d(size_x: int, size_y: int, sigma_x: float, sigma_y: float):
"""Create a 2D gaussian kernel with a given size and standard deviation.
Parameters
----------
size : int
Width and height of the kernel matrix.
sigma_x : float
Standard deviation of the gaussian kernel in x direction. This will correspond to the RT dimension.
sigma_y : float
Standard deviation of the gaussian kernel in y direction. This will correspond to the mobility dimension.
Returns
-------
weights : np.ndarray, dtype=np.float32
2D gaussian kernel matrix of shape (size, size).
"""
# create indicies [-2, -1, 0, 1 ...]
x, y = np.meshgrid(
np.arange(-size_x // 2, size_x // 2), np.arange(-size_y // 2, size_y // 2)
)
xy = np.column_stack((x.flatten(), y.flatten())).astype("float32")
# mean is always zero
mu = np.array([[0.0, 0.0]])
# sigma is set with no covariance
sigma_mat = np.array([[sigma_x, 0.0], [0.0, sigma_y]])
weights = multivariate_normal(xy, mu, sigma_mat)
return weights.reshape(size_y, size_x).astype(np.float32)