"""Module to describe projection integrals of radial potential parametrizations."""
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING
import numpy as np
from numba import jit
from scipy import integrate
from scipy.interpolate import interp1d
from scipy.optimize import brentq, fsolve
from scipy.special import erf
from abtem.core.backend import cp, get_array_module
from abtem.core.backend import cupyx
from abtem.core.backend import device_name_from_array_module
from abtem.core.fft import fft2, ifft2
from abtem.core.grid import disc_meshgrid
from abtem.core.grid import polar_spatial_frequencies
from abtem.core.grid import spatial_frequencies
from abtem.core.utils import EqualityMixin, CopyMixin
from abtem.parametrizations import validate_parametrization
if cp is not None:
from abtem.core._cuda import (
interpolate_radial_functions as interpolate_radial_functions_cuda,
)
else:
interpolate_radial_functions_cuda = None
if TYPE_CHECKING:
from abtem.parametrizations import Parametrization
[docs]class ProjectionIntegrator:
"""Base class for projection integrator object used for calculating projection integrals of radial potentials."""
[docs] def integrate_on_grid(
self,
positions: np.ndarray,
a: np.ndarray,
b: np.ndarray,
gpts: tuple[int, int],
sampling: tuple[float, float],
device: str = "cpu",
) -> np.ndarray:
"""
Integrate radial potential between two limits at the given 2D positions on a grid. The integration limits are
only used when the integration method is finite.
Parameters
----------
positions : np.ndarray
2D array of xy-positions of the centers of each radial function [Å].
a : np.ndarray
Lower integration limit of the projection integrals along z for each position [Å]. The limit is given
relative to the center of the radial function.
b : np.ndarray
Upper integration limit of the projection integrals along z for each position [Å]. The limit is given
relative to the center of the radial function.
gpts : two int
Number of grid points in `x` and `y` describing each slice of the potential.
sampling : two float
Sampling of the potential in `x` and `y` [1 / Å].
device : str, optional
The device used for calculating the potential, 'cpu' or 'gpu'. The default is determined by the user
configuration file.
"""
pass
[docs]class ProjectionIntegratorPlan(EqualityMixin, CopyMixin, metaclass=ABCMeta):
"""
The ProjectionIntegratorPlan facilitates the creation of :class:`.ProjectionIntegrator` objects using the ``.build``
method given a grid and a chemical symbol.
Parameters
----------
periodic : bool
True indicates that the projection integrals are periodic perpendicular to the projection direction.
finite : bool
True indicates that the projection integrals are finite along the projection direction.
"""
[docs] def __init__(self, periodic: bool, finite: bool):
self._periodic = periodic
self._finite = finite
@property
def periodic(self) -> bool:
"""True indicates that the created projection integrators are implemented only for periodic potentials."""
return self._periodic
@property
def finite(self) -> bool:
"""True indicates that the created projection integrators are implemented only for infinite potential
projections."""
return self._finite
[docs] @abstractmethod
def build(
self,
symbol: str,
gpts: tuple[int, int],
sampling: tuple[float, float],
device: str,
) -> ProjectionIntegrator:
"""
Build projection integrator for given chemical symbol, grid and device.
Parameters
----------
symbol : str
Chemical symbol to build the projection integrator for.
gpts : two int
Number of grid points in `x` and `y` describing each slice of the potential.
sampling : two float
Sampling of the potential in `x` and `y` [1 / Å].
device : str, optional
The device used for calculating the potential, 'cpu' or 'gpu'. The default is determined by the user
configuration file.
Returns
-------
projection_integrator : ProjectionIntegrator
The projection integrator for the specified chemical symbol.
"""
pass
[docs] @abstractmethod
def cutoff(self, symbol: str) -> float:
"""Radial cutoff of the potential for the given chemical symbol."""
pass
[docs]class GaussianScatteringFactors(ProjectionIntegrator):
[docs] def __init__(
self,
gaussian_scattering_factors,
error_function_scales,
correction_scattering_factors,
):
self._gaussian_scattering_factors = gaussian_scattering_factors
self._error_function_scales = error_function_scales
self._correction_scattering_factors = correction_scattering_factors
def _integrate_gaussian_scattering_factors(self, positions, a, b, sampling, device):
xp = get_array_module(device)
a = a - positions[:, 2]
b = b - positions[:, 2]
positions = (positions[:, :2] / sampling).astype(xp.float32)
weights = (
np.abs(
erf(self._error_function_scales[:, None] * b[None])
- erf(self._error_function_scales[:, None] * a[None])
)
/ 2
)
array = xp.zeros(
self._gaussian_scattering_factors.shape[-2:], dtype=xp.complex64
)
for i in range(5):
temp = xp.zeros_like(array, dtype=xp.complex64)
superpose_deltas(positions, temp, weights=weights[i])
array += fft2(temp, overwrite_x=False) * self._gaussian_scattering_factors[
i
].astype(xp.complex64)
return array
def _integrate_correction_factors(self, positions, a, b, sampling, device):
xp = get_array_module(device)
temp = xp.zeros(
self._gaussian_scattering_factors.shape[-2:], dtype=xp.complex64
)
positions = positions[(positions[:, 2] >= a) * (positions[:, 2] < b)]
positions = (positions[:, :2] / sampling).astype(xp.float32)
superpose_deltas(positions, temp)
return fft2(
temp, overwrite_x=False
) * self._correction_scattering_factors.astype(xp.complex64)
[docs] def integrate_on_grid(
self,
positions: np.ndarray,
a: np.ndarray,
b: np.ndarray,
gpts: tuple[int, int],
sampling: tuple[float, float],
device: str = "cpu",
fourier_space: bool = False,
) -> np.ndarray:
shape = self._gaussian_scattering_factors.shape[-2:]
assert gpts == shape
array = self._integrate_gaussian_scattering_factors(
positions, a, b, sampling, device
)
if self._correction_scattering_factors is not None:
array += self._integrate_correction_factors(
positions, a, b, sampling, device
)
if fourier_space:
return array
else:
return ifft2(array / sinc(shape, sampling, device)).real
[docs]class GaussianProjectionIntegrals(ProjectionIntegratorPlan):
"""
Parameters
----------
gaussian_parametrization : str or Parametrization, optional
The Gaussian radial potential parametrization to integrate. Must be parametrization described by a superposition
of Gaussians. Default is the Peng parametrization.
correction_parametrization : str or Parametrization, optional
The correction radial potential parametrization to integrate. Used for correcting the dependence of the
potential close to the nuclear core. Default is the Lobato parametrization.
cutoff_tolerance : float, optional
The error tolerance used for deciding the radial cutoff distance of the potential [eV / e]. Default is 1e-3.
"""
[docs] def __init__(
self,
gaussian_parametrization: str | Parametrization = "peng",
correction_parametrization: str | Parametrization = "lobato",
cutoff_tolerance: float = 1e-3,
):
self._gaussian_parametrization = validate_parametrization(
gaussian_parametrization
)
if correction_parametrization is not None:
self._correction_parametrization = validate_parametrization(
correction_parametrization
)
else:
self._correction_parametrization = correction_parametrization
self._cutoff_tolerance = cutoff_tolerance
super().__init__(periodic=True, finite=True)
@property
def cutoff_tolerance(self):
"""The error tolerance used for deciding the radial cutoff distance of the potential [eV / e]."""
return self._cutoff_tolerance
@property
def gaussian_parametrization(self):
"""The error tolerance used for deciding the radial cutoff distance of the potential [eV / e]."""
return self._gaussian_parametrization
@property
def correction_parametrization(self):
return self._correction_parametrization
[docs] def cutoff(self, symbol: str) -> float:
return cutoff(
self.gaussian_parametrization.potential(symbol),
self.cutoff_tolerance,
a=1e-3,
b=1e3,
) # noqa
[docs] def build(
self,
symbol: str,
gpts: tuple[int, int],
sampling: tuple[float, float],
device: str = "cpu",
):
xp = get_array_module(device)
k, _ = polar_spatial_frequencies(gpts, sampling, xp=xp)
parameters = xp.array(
self.gaussian_parametrization.scaled_parameters(
symbol, "projected_scattering_factor"
)
)
gaussian_scattering_factors = parameters[0, :, None, None] * np.exp(
-parameters[1, :, None, None] * k[None] ** 2.0
)
if self.correction_parametrization:
infinite_gaussian = (
self.gaussian_parametrization.projected_scattering_factor(symbol)
)
infinite_correction = (
self.correction_parametrization.projected_scattering_factor(symbol)
)
correction_scattering_factors = infinite_correction(
k**2
) - infinite_gaussian(k**2)
else:
correction_scattering_factors = None
error_function_scales = np.pi / np.sqrt(parameters[1])
return GaussianScatteringFactors(
gaussian_scattering_factors,
error_function_scales,
correction_scattering_factors,
)
[docs]def sinc(
gpts: tuple[int, int], sampling: tuple[float, float], device: str = "cpu"
) -> np.ndarray:
"""
Returns an array representing a 2D sinc function centered at [0, 0]. The result is used to
compensate for the finite size of single pixels used for representing delta functions.
Parameters
----------
gpts : two int
Number of grid points in the first and second dimension to evaluate the sinc over.
sampling : two float
Size of the pixels of the grid determining the scale of the sinc.
device : str
The array is created on this device ('cpu' or 'gpu').
"""
xp = get_array_module(device)
kx, ky = spatial_frequencies(gpts, sampling, return_grid=False, xp=xp)
k = xp.sqrt((kx[:, None] * sampling[0]) ** 2 + (ky[None] * sampling[1]) ** 2)
dk2 = sampling[0] * sampling[1]
k[0, 0] = 1
sinc = xp.sin(k) / k * dk2
sinc[0, 0] = dk2
return sinc
[docs]def superpose_deltas(
positions: np.ndarray,
array: np.ndarray,
weights: np.ndarray = None,
round_positions: bool = False,
) -> np.ndarray:
"""
Add superposition of delta functions at given positions to a 2D array.
Parameters
----------
positions : np.ndarray
Array of 2D positions as an nx2 array. The positions are given in units of pixels.
array : np.ndarray
The delta functions are added to this 2D array.
weights : np.ndarray, optional
If given each delta function is weighted by the given factor. Must match the length of `positions`.
round_positions : bool, optional
If True, the delta function positions are rounded to the center of the nearest pixel, otherwise subpixel
precision is used.
"""
xp = get_array_module(array)
shape = array.shape
positions = xp.array(positions)
if round_positions:
rounded = xp.round(positions).astype(xp.int32)
i, j = rounded[:, 0][None] % shape[0], rounded[:, 1][None] % shape[1]
v = xp.array([1.0], dtype=xp.float32)[:, None]
else:
rounded = xp.floor(positions).astype(xp.int32)
rows, cols = rounded[:, 0], rounded[:, 1]
x = positions[:, 0] - rows
y = positions[:, 1] - cols
xy = x * y
i = xp.array([rows % shape[0], (rows + 1) % shape[0]] * 2)
j = xp.array([cols % shape[1]] * 2 + [(cols + 1) % shape[1]] * 2)
v = xp.array([1 + xy - y - x, x - xy, y - xy, xy], dtype=xp.float32)
if weights is not None:
v = v * weights[None]
if device_name_from_array_module(xp) == "cpu":
xp.add.at(array, (i, j), v)
elif device_name_from_array_module(xp) == "gpu":
cupyx.scatter_add(array, (i, j), v)
else:
raise RuntimeError()
return array
[docs]class ProjectedScatteringFactorIntegrator(ProjectionIntegrator):
"""
A ProjectionIntegrator calculating infinite projections of radial potential parametrizations. The hybrid real and
reciprocal space method by Wouter Van den Broek et al. is used.
Parameters
----------
scattering_factor : 2d array
Array representing a projected scattering factor. The zero-frequency is assumed to be at [0,0].
References
----------
W. Van den Broek et al. Ultramicroscopy, 158:89–97, 2015. doi:10.1016/j.ultramic.2015.07.005.
"""
[docs] def __init__(self, scattering_factor: np.ndarray):
self._scattering_factor = scattering_factor
@property
def scattering_factor(self) -> np.ndarray:
"""Projected scattering factor array on a 2D grid."""
return self._scattering_factor
[docs] def integrate_on_grid(
self,
positions: np.ndarray,
a: np.ndarray,
b: np.ndarray,
gpts: tuple[int, int],
sampling: tuple[float, float],
device: str = "cpu",
fourier_space: bool = False,
):
xp = get_array_module(device)
if len(positions) == 0:
return xp.zeros(gpts, dtype=xp.float32)
positions = (positions[:, :2] / sampling).astype(xp.float32)
array = xp.zeros(gpts, dtype=xp.float32)
array = superpose_deltas(positions, array).astype(xp.complex64)
array = fft2(array, overwrite_x=True)
f = self.scattering_factor / sinc(
self._scattering_factor.shape[-2:], sampling, device
)
array *= f
if fourier_space:
return array.real
else:
array = ifft2(array, overwrite_x=True).real
return array
[docs]class ProjectedScatteringFactors(ProjectionIntegratorPlan):
"""
The :class:`.ProjectedScatteringFactors` facilitates the creation of :class:`.ProjectedScatteringFactorIntegrator`
objects using the ``.build`` method given a grid and a chemical symbol.
Parameters
----------
parametrization : str or Parametrization, optional
The radial potential parametrization to integrate. Default is the Lobato parametrization.
"""
[docs] def __init__(self, parametrization: str | Parametrization = "lobato"):
self._parametrization = validate_parametrization(parametrization)
super().__init__(periodic=True, finite=False)
@property
def parametrization(self) -> Parametrization:
return self._parametrization
[docs] def cutoff(self, symbol: str) -> float:
return np.inf
[docs] def build(
self,
symbol: str,
gpts: tuple[int, int],
sampling: tuple[float, float],
device: str = "cpu",
) -> ProjectedScatteringFactorIntegrator:
xp = get_array_module(device)
kx, ky = spatial_frequencies(gpts, sampling, xp=np)
k2 = kx[:, None] ** 2 + ky[None] ** 2
f = self.parametrization.projected_scattering_factor(symbol)(k2)
f = xp.asarray(f, dtype=xp.float32)
if symbol in self.parametrization.sigmas.keys():
sigma = self.parametrization.sigmas[symbol]
f = f * xp.exp(
-xp.asarray(k2, dtype=xp.float32)
* (xp.pi * sigma / xp.sqrt(3 / 2)) ** 2
)
return ProjectedScatteringFactorIntegrator(f)
[docs]@jit(nopython=True, nogil=True)
def interpolate_radial_functions(
array: np.ndarray,
positions: np.ndarray,
disk_indices: np.ndarray,
sampling: tuple[float, float],
radial_gpts: np.ndarray,
radial_functions: np.ndarray,
radial_derivative: np.ndarray,
):
n = radial_gpts.shape[0]
dt = np.log(radial_gpts[-1] / radial_gpts[0]) / (n - 1)
for i in range(positions.shape[0]):
px = int(round(positions[i, 0] / sampling[0]))
py = int(round(positions[i, 1] / sampling[1]))
for j in range(disk_indices.shape[0]):
k = px + disk_indices[j, 0]
m = py + disk_indices[j, 1]
if (k < array.shape[0]) & (m < array.shape[1]) & (k >= 0) & (m >= 0):
r_interp = np.sqrt(
(k * sampling[0] - positions[i, 0]) ** 2
+ (m * sampling[1] - positions[i, 1]) ** 2
)
idx = int(np.floor(np.log(r_interp / radial_gpts[0] + 1e-12) / dt))
if idx < 0:
array[k, m] += radial_functions[i, 0]
elif idx < n - 1:
slope = radial_derivative[i, idx]
array[k, m] += (
radial_functions[i, idx] + (r_interp - radial_gpts[idx]) * slope
)
[docs]class ProjectionIntegralTable(ProjectionIntegrator):
"""
A ProjectionIntegrator calculating finite projections of radial potential parametrizations. An integral table
for each used to evaluate the projection integrals for each atom in a slice given p integral limits.
The projected potential evaluated along the
Parameters
----------
radial_gpts : array
The points along a radial in the `xy`-plane where the projection integrals of the integral table are evaluated.
limits : array
The points along the projection direction where the projection integrals are evaluated.
"""
[docs] def __init__(self, radial_gpts: np.ndarray, limits: np.ndarray, values: np.ndarray):
assert values.shape[0] == len(limits)
assert values.shape[1] == len(radial_gpts)
self._radial_gpts = radial_gpts
self._limits = limits
self._values = values
@property
def radial_gpts(self) -> np.ndarray:
return self._radial_gpts
@property
def limits(self) -> np.ndarray:
return self._limits
@property
def values(self) -> np.ndarray:
return self._values
def integrate(self, a: float | np.ndarray, b: float | np.ndarray) -> np.ndarray:
f = interp1d(
self.limits, self.values, axis=0, kind="linear", fill_value="extrapolate"
)
return f(b) - f(a)
[docs] def integrate_on_grid(
self,
positions: np.ndarray,
a: float,
b: float,
gpts: tuple[int, int],
sampling: tuple[float, float],
device: str = "cpu",
) -> np.ndarray:
# assert len(a) == len(b) == len(positions)
# assert len(a.shape) == 1
# assert len(b.shape) == 1
if np.isscalar(a):
a = np.array([a] * len(positions))
if np.isscalar(b):
b = np.array([b] * len(positions))
assert len(sampling) == 2
xp = get_array_module(device)
array = xp.zeros(gpts, dtype=xp.float32)
a = a - positions[:, 2]
b = b - positions[:, 2]
disk_indices = xp.asarray(
disc_meshgrid(int(np.ceil(self._radial_gpts[-1] / np.min(sampling))))
)
radial_potential = xp.asarray(self.integrate(a, b))
positions = xp.asarray(positions, dtype=np.float32)
# sampling = xp.array(sampling, dtype=xp.float32)
# positions[:, :2] = xp.round(positions[:, :2] / sampling) * sampling
radial_potential_derivative = xp.zeros_like(radial_potential)
radial_potential_derivative[:, :-1] = (
xp.diff(radial_potential, axis=1) / xp.diff(self.radial_gpts)[None]
)
if xp is cp:
interpolate_radial_functions_cuda(
array=array,
positions=positions,
disk_indices=disk_indices,
sampling=sampling,
radial_gpts=xp.asarray(self.radial_gpts),
radial_functions=radial_potential,
radial_derivative=radial_potential_derivative,
)
else:
interpolate_radial_functions(
array=array,
positions=positions,
disk_indices=disk_indices,
sampling=sampling,
radial_gpts=self.radial_gpts,
radial_functions=radial_potential,
radial_derivative=radial_potential_derivative,
)
return array
[docs]def cutoff(func: callable, tolerance: float, a: float, b: float) -> float:
"""
Calculate the point where a function becomes lower than a given tolerance within a given bracketing interval.
Parameters
----------
func : callable
The function to calculate the cutoff for.
tolerance : float
The tolerance to calculate the cutoff for.
a : float
One end of the bracketing interval.
b : float
The other end of the bracketing interval.
Returns
-------
cutoff : float
"""
f = brentq(f=lambda r: np.abs(func(r)) - tolerance, a=a, b=b) # noqa
return f
[docs]def cutoff_taper(radial_gpts, cutoff, taper):
taper_start = taper * cutoff
taper_mask = radial_gpts > taper_start
taper_values = np.ones_like(radial_gpts)
taper_values[taper_mask] = (
np.cos(np.pi * (radial_gpts[taper_mask] - taper_start) / (cutoff - taper_start))
+ 1.0
) / 2
return taper_values
[docs]class ProjectionQuadratureRule(ProjectionIntegratorPlan):
"""
Projection integration plan for calculating finite projection integrals based on Gaussian quadrature rule.
Parameters
----------
parametrization : str or Parametrization, optional
The potential parametrization describing the radial dependence of the potential. Default is 'lobato'.
cutoff_tolerance : float, optional
The error tolerance used for deciding the radial cutoff distance of the potential [eV / e]. Default is 1e-3.
taper : float, optional
The fraction from the cutoff of the radial distance from the core where the atomic potential starts tapering
to zero. Default is 0.85.
integration_step : float, optional
The step size between integration limits used for calculating the integral table. Default is 0.02.
quad_order : int, optional
Order of quadrature integration passed to scipy.integrate.fixed_quad. Default is 8.
"""
[docs] def __init__(
self,
parametrization: str | Parametrization = "lobato",
cutoff_tolerance: float = 1e-3,
taper: float = 0.85,
integration_step: float = 0.02,
quad_order: int = 8,
):
self._parametrization = validate_parametrization(parametrization)
self._taper = taper
self._quad_order = quad_order
self._cutoff_tolerance = cutoff_tolerance
self._integration_step = integration_step
super().__init__(periodic=False, finite=True)
@property
def parametrization(self):
"""The potential parametrization describing the radial dependence of the potential."""
return self._parametrization
@property
def quad_order(self):
"""Order of quadrature integration."""
return self._quad_order
@property
def cutoff_tolerance(self) -> float:
"""The error tolerance used for deciding the radial cutoff distance of the potential [eV / e]."""
return self._cutoff_tolerance
@property
def integration_step(self) -> float:
"""The step size between integration limits used for calculating the integral table."""
return self._integration_step
[docs] def cutoff(self, symbol: str) -> float:
return cutoff(
self.parametrization.potential(symbol), self.cutoff_tolerance, a=1e-3, b=1e3
)
@staticmethod
def _radial_gpts(inner_cutoff: float, cutoff: float) -> np.ndarray:
num_points = int(np.ceil(cutoff / inner_cutoff))
return np.geomspace(inner_cutoff, cutoff, num_points)
@staticmethod
def _taper_values(radial_gpts: np.ndarray, cutoff: float, taper: float):
taper_start = taper * cutoff
taper_mask = radial_gpts > taper_start
taper_values = np.ones_like(radial_gpts)
taper_values[taper_mask] = (
np.cos(
np.pi * (radial_gpts[taper_mask] - taper_start) / (cutoff - taper_start)
)
+ 1.0
) / 2
return taper_values
def _integral_limits(self, cutoff: float):
limits = np.linspace(-cutoff, 0, int(np.ceil(cutoff / self._integration_step)))
return np.concatenate((limits, -limits[::-1][1:]))
[docs] def build_integral_table(
self, symbol: str, inner_limit: float
) -> ProjectionIntegralTable:
"""
Build table of projection integrals of the radial atomic potential.
Parameters
----------
symbol : str
Chemical symbol to build the integral table.
inner_limit : float, optional
Smallest radius from the core at which to calculate the projection integral [Å].
Returns
-------
projection_integral_table :
ProjectionIntegralTable
"""
potential = self.parametrization.potential(symbol)
cutoff = self.cutoff(symbol)
radial_gpts = self._radial_gpts(inner_limit, cutoff)
limits = self._integral_limits(cutoff)
projection = lambda z: potential(
np.sqrt(radial_gpts[:, None] ** 2 + z[None] ** 2)
)
table = np.zeros((len(limits) - 1, len(radial_gpts)))
table[0, :] = integrate.fixed_quad(
projection, -limits[0] * 2, limits[0], n=self._quad_order
)[0]
for j, (a, b) in enumerate(zip(limits[1:-1], limits[2:])):
table[j + 1] = (
table[j] + integrate.fixed_quad(projection, a, b, n=self._quad_order)[0]
)
table = table * self._taper_values(radial_gpts, cutoff, self._taper)[None]
return ProjectionIntegralTable(radial_gpts, limits[1:], table)
[docs] def build(
self,
symbol: str,
gpts: tuple[int, int],
sampling: tuple[float, float],
device: str = "cpu",
) -> ProjectionIntegralTable:
inner_limit = min(sampling) / 2
return self.build_integral_table(symbol, inner_limit)