Source code for corrct.projectors

"""
Tomographic projectors.

@author: Nicola VIGANĂ’, Computational Imaging group, CWI, The Netherlands,
and ESRF - The European Synchrotron, Grenoble, France
"""

import concurrent.futures as cf
import multiprocessing as mp
from collections.abc import Sequence
from typing import Optional
from typing import Union
import numpy as np
from numpy.typing import ArrayLike
from numpy.typing import DTypeLike
from numpy.typing import NDArray
from scipy.sparse import spmatrix
from . import _projector_backends as prj_backends
from . import models
from . import operators
from .attenuation import AttenuationVolume


num_threads = round(np.log2(mp.cpu_count() + 1))
astra_available = prj_backends.has_astra and prj_backends.has_cuda


[docs] class ProjectorMatrix(operators.ProjectorOperator): """Projector that uses an explicit projection matrix.""" A: Union[NDArray, spmatrix] def __init__(self, A: Union[NDArray, spmatrix], vol_shape: ArrayLike, prj_shape: ArrayLike) -> None: """ Initialize the matrix projector. Parameters ---------- A : NDArray | spmatrix The projection matrix. vol_shape : ArrayLike Volume shape. prj_shape : ArrayLike Projection shape. """ self.vol_shape = np.array(vol_shape, ndmin=1, dtype=int) self.prj_shape = np.array(prj_shape, ndmin=1, dtype=int) self.A = A super().__init__() def _transpose(self) -> operators.ProjectorOperator: """ Create the transpose operator. Returns ------- operators.ProjectorOperator The transpose operator. """ return ProjectorMatrix(self.A.transpose(), self.prj_shape, self.vol_shape)
[docs] def absolute(self) -> operators.ProjectorOperator: """ Return the projection operator using the absolute value of the projection coefficients. Returns ------- operators.ProjectorOperator The absolute value operator. """ return ProjectorMatrix(np.abs(self.A), self.vol_shape, self.prj_shape) # type: ignore
[docs] def fp(self, x: NDArray) -> NDArray: """ Define the interface for the forward-projection. Parameters ---------- x : NDArray Input volume. Returns ------- NDArray The projection data. """ return self.A.dot(x.flatten()).reshape(self.prj_shape)
[docs] def bp(self, x: NDArray) -> NDArray: """ Define the interface for the back-projection. Parameters ---------- x : NDArray Input projection data. Returns ------- NDArray The back-projected volume. """ return self.A.transpose().dot(x.flatten()).reshape(self.vol_shape)
[docs] class ProjectorUncorrected(operators.ProjectorOperator): """Base projection class.""" vol_geom: models.VolumeGeometry projector_backend: prj_backends.ProjectorBackend prj_intensities: Union[NDArray[np.floating], None] psf: Union[NDArray[np.floating], float, None] def __init__( self, vol_geom: Union[Sequence[int], NDArray[np.integer], models.VolumeGeometry], angles_rot_rad: Union[Sequence[float], NDArray], rot_axis_shift_pix: Union[float, ArrayLike, NDArray, None] = None, *, prj_geom: Optional[models.ProjectionGeometry] = None, prj_intensities: Optional[ArrayLike] = None, psf: Optional[ArrayLike] = None, backend: Union[str, prj_backends.ProjectorBackend] = "astra" if astra_available else "skimage", create_single_projs: bool = False, ): """Initialize the base projection class. It implements the forward and back projection of the single lines of a sinogram. It takes care of initializing and disposing the ASTRA projectors when used in a *with* statement. It supports both 2D and 3D geometries. Parameters ---------- vol_geom : Sequence[int] | NDArray[np.integer] | models.VolumeGeometry The volume shape in Y X and optionally Z. angles_rot_rad : Sequence[float] | NDArray The rotation angles. rot_axis_shift_pix : float | ArrayLike | NDArray, optional The rotation axis shift(s) in pixels, by default None. prj_geom : models.ProjectionGeometry | None, optional The fully specified projection geometry. When active, the rotation axis shift is ignored, by default None. prj_intensities : ArrayLike | NDArray | None, optional Projection scaling factor, by default None. psf : ArrayLike | NDArray | None, optional The "point spread function" of the detector, by default None. backend : bool, optional Whether to use ASTRA or fall back to scikit-image. The default is True if CUDA and ASTRA are available, otherwise False. create_single_projs : bool, optional Whether to create projectors for single projections. Used for corrections and SART, by default False. Raises ------ ValueError When the geometry is not correct. """ if not astra_available: astra_status = f"astra: {prj_backends.has_astra}, cuda: {prj_backends.has_cuda}" if isinstance(backend, str) and backend == "astra": backend = "skimage" print(f"WARNING: ASTRA backend requested but not available ({astra_status}). Falling back to scikit-image.") elif isinstance(backend, prj_backends.ProjectorBackendASTRA): raise ValueError(f"Passed ASTRA projector, but astra not available ({astra_status}).") if not isinstance(vol_geom, models.VolumeGeometry): vol_geom = models.VolumeGeometry(_vol_shape_xyz=np.array(vol_geom)) self.vol_geom = vol_geom self.prj_geom = prj_geom if not len(self.vol_geom.shape_xyz) in (2, 3): raise ValueError("Only 2D or 3D volumes are valid") if not self.vol_geom.is_square(): raise ValueError("Only square volumes are valid") if isinstance(backend, str): if backend == "astra": if prj_backends.has_astra_direct: self.projector_backend = prj_backends.ProjectorBackendDirectASTRA() else: self.projector_backend = prj_backends.ProjectorBackendASTRA() elif backend == "skimage": self.projector_backend = prj_backends.ProjectorBackendSKimage() else: raise ValueError(f"Unknown backend: {backend}. Available options are: 'astra', 'skimage'.") else: self.projector_backend = backend if not (prj_geom is None or isinstance(self.projector_backend, prj_backends.ProjectorBackendASTRA)): raise ValueError("Using class `ProjectionGeometry` requires using astra-toolbox.") self.projector_backend.initialize_geometry( vol_geom=vol_geom, angles_rot_rad=angles_rot_rad, rot_axis_shift_pix=rot_axis_shift_pix, prj_geom=prj_geom, create_single_projs=create_single_projs, ) if prj_intensities is not None: prj_intensities = np.array(prj_intensities, dtype=np.floating) self.prj_intensities = prj_intensities self._set_psf(psf) self.vol_shape = np.array(self.projector_backend.get_vol_shape(), ndmin=1) self.prj_shape = np.array(self.projector_backend.get_prj_shape(), ndmin=1) super().__init__() @property def angles_rot_rad(self) -> NDArray: """Simplify access to the rotation angles (in radians). Returns ------- NDArray The rotation angles (in radians). """ return self.projector_backend.angles_w_rad def __enter__(self): """Initialize the with statement block.""" self.projector_backend.make_ready() return self def __exit__(self, *args): """De-initialize the with statement block.""" self.projector_backend.dispose() def _set_psf(self, psf: Optional[ArrayLike], is_conv_symm: bool = False) -> None: if psf is not None: psf = np.squeeze(np.array(psf)) if len(psf.shape) >= len(self.vol_geom.shape_xyz): raise ValueError( "PSF should either be 1D (for 2D and 3D reconstructions) or 2D (for 3D reconstructions)." + f" Passed PSF has shape: {psf.shape}, and the reconstruction is {len(self.vol_geom.shape_xyz)}D." ) # This redundancy is required, due to the different results between the single-angle and multi-angle projections prj_shape_vu = [*self.projector_backend.prj_shape_vu[:-2], self.projector_backend.prj_shape_vu[-1]] prj_shape_vu = np.array(prj_shape_vu, ndmin=1) self.psf_vu = operators.TransformConvolution(prj_shape_vu, kernel=psf, is_symm=is_conv_symm) prj_shape_vwu = np.array(self.projector_backend.prj_shape_vwu, ndmin=1) self.psf_vwu = operators.TransformConvolution(prj_shape_vwu, kernel=psf[..., None, :], is_symm=is_conv_symm) else: self.psf_vu = self.psf_vwu = None
[docs] def get_pre_weights(self) -> Union[NDArray, None]: """Compute the pre-weights of the projector geometry (notably for cone-beam geometries). Returns ------- Union[NDArray, None] The computed detector weights """ if self.prj_geom is None: return None else: return self.prj_geom.get_pre_weights([*self.prj_shape[-3:-2], self.prj_shape[-1]])
[docs] def fp_angle(self, vol: NDArray, angle_ind: int) -> NDArray: """Forward-project a volume to a single sinogram line. Parameters ---------- vol : NDArray The volume to forward-project. angle_ind : int The angle index to forward project. Returns ------- x : NDArray The forward-projected sinogram line. """ prj_vu = self.projector_backend.fp(vol, angle_ind) if self.prj_intensities is not None: prj_vu *= self.prj_intensities[angle_ind] if self.psf_vu is not None: prj_vu = self.psf_vu(prj_vu) return prj_vu
[docs] def bp_angle(self, prj_vu: NDArray, angle_ind: int) -> NDArray: """Back-project a single sinogram line to the volume. Parameters ---------- prj_vu : NDArray The sinogram to back-project or a single line. angle_ind : int The angle index to forward project. Returns ------- NDArray The back-projected volume. """ if self.prj_intensities is not None: prj_vu = prj_vu * self.prj_intensities[angle_ind] # It will make a copy if self.psf_vu is not None: prj_vu = self.psf_vu.T(prj_vu) return self.projector_backend.bp(prj_vu, angle_ind)
[docs] def fp(self, vol: NDArray) -> NDArray: """ Forward-projection of the volume to the projection data. Parameters ---------- vol : NDArray The volume to forward-project. Returns ------- NDArray The forward-projected projection data. """ prj_vwu = self.projector_backend.fp(vol) if self.prj_intensities is not None: prj_vwu *= self.prj_intensities[:, np.newaxis] if self.psf_vwu is not None: prj_vwu = self.psf_vwu(prj_vwu) return prj_vwu
[docs] def bp(self, prj_vwu: NDArray) -> NDArray: """ Back-projection of the projection data to the volume. Parameters ---------- prj_vwu : NDArray The projection data to back-project. Returns ------- NDArray The back-projected volume. """ if self.prj_intensities is not None: prj_vwu = prj_vwu * self.prj_intensities[:, np.newaxis] # Needs copy if self.psf_vwu is not None: prj_vwu = self.psf_vwu.T(prj_vwu) return self.projector_backend.bp(prj_vwu)
[docs] class ProjectorAttenuationXRF(ProjectorUncorrected): """ Attenuation corrected projection class for XRF, with multi-detector support. It includes the computation of the attenuation volumes. """ att_vol_angles: NDArray[np.floating] executor: Union[cf.ThreadPoolExecutor, None] def __init__( self, vol_geom: Union[Sequence[int], NDArray[np.integer], models.VolumeGeometry], angles_rot_rad: Union[Sequence[float], NDArray], rot_axis_shift_pix: Union[float, ArrayLike, NDArray, None] = None, *, prj_geom: Optional[models.ProjectionGeometry] = None, prj_intensities: Optional[ArrayLike] = None, backend: Union[str, prj_backends.ProjectorBackend] = "astra" if astra_available else "skimage", att_maps: Optional[NDArray[np.floating]] = None, att_in: Optional[NDArray[np.floating]] = None, att_out: Optional[NDArray[np.floating]] = None, angles_detectors_rad: Union[float, ArrayLike] = (np.pi / 2), weights_detectors: Optional[ArrayLike] = None, psf: Optional[ArrayLike] = None, is_symmetric: bool = False, weights_angles: Optional[ArrayLike] = None, use_multithreading: bool = True, data_type: DTypeLike = np.float32, verbose: bool = True, ): """ Initialize the (attenuation corrected) XRF dedicated projector. Parameters ---------- vol_geom : Sequence[int] | NDArray[np.integer] | models.VolumeGeometry The volume shape in X Y and optionally Z. angles_rot_rad : Sequence[float] | NDArray The rotation angles. rot_axis_shift_pix : float | ArrayLike | NDArray | None, optional The rotation axis shift(s) in pixels. The default is None. prj_geom : Optional[models.ProjectionGeometry], optional The fully specified projection geometry. When active, the rotation axis shift is ignored. The default is None. prj_intensities : Optional[ArrayLike], optional Projection scaling factor. The default is None. backend : str | prj_backends.ProjectorBackend, optional Projector backend to use, by default "astra" if astra is available, otherwise "skimage". att_maps : Optional[NDArray[np.floating]], optional Precomputed attenuation maps for each angle, by default None att_in : Optional[ArrayLike], optional Attenuation volume of the incoming beam. The default is None. att_out : Optional[ArrayLike], optional Attenuation volume of the outgoing beam. The default is None. angles_detectors_rad : float | ArrayLike, optional Angles of the detector elements with respect to incident beam. The default is (np.pi / 2). weights_detectors : Optional[ArrayLike], optional Weights (e.g. solid angle, efficiency, etc) of the detector elements. The default is None. psf : Optional[ArrayLike], optional Optical system's point spread function (PSF). The default is None. is_symmetric : bool, optional Whether the projector is symmetric or not. The default is False. weights_angles : Optional[ArrayLike], optional Projection weight for a given element at a given angle. The default is None. use_multithreading : bool, optional Whether to use multiple threads or not. The default is True. data_type : DTypeLike, optional Output data type. The default is np.float32. verbose : bool, optional Whether to produce verbose output. The default is True. Raises ------ ValueError When given inconsistent numbers of detector weights and detector angles. """ ProjectorUncorrected.__init__( self, vol_geom=vol_geom, angles_rot_rad=angles_rot_rad, rot_axis_shift_pix=rot_axis_shift_pix, prj_geom=prj_geom, psf=psf, prj_intensities=prj_intensities, backend=backend, create_single_projs=True, ) self.data_type = data_type self.use_multithreading = use_multithreading self.verbose = verbose self.is_symmetric = is_symmetric self.angles_det_rad = np.array(angles_detectors_rad, ndmin=1) num_det_angles = len(self.angles_det_rad) if weights_detectors is None: weights_detectors = np.ones_like(self.angles_det_rad) self.weights_det = np.array(weights_detectors, ndmin=1) num_det_weights = len(self.weights_det) if num_det_angles > 1: if num_det_weights == 1: self.weights_det = np.tile(self.weights_det, [num_det_angles]) elif num_det_weights > 1 and not num_det_weights == num_det_angles: raise ValueError( "Number of detector weights differs from number of" f" detector angles: {num_det_weights} vs {num_det_angles}" ) if weights_angles is None: weights_angles = np.ones((len(self.angles_rot_rad), num_det_angles)) else: weights_angles = np.array(weights_angles) self.weights_angles = weights_angles if att_maps is None: m = AttenuationVolume(att_in, att_out, self.angles_rot_rad, self.angles_det_rad) m.compute_maps(use_multithreading=self.use_multithreading, verbose=self.verbose) self.att_vol_angles = m.get_maps() else: if att_maps.shape[1] != num_det_angles: raise ValueError(f"Number of maps ({att_maps.shape[1]}) differs from number of detectors ({num_det_angles})") if not np.all(np.equal(att_maps.shape[-2:], self.vol_shape[-2:])): raise ValueError( f"Mismatching volume shape of input volume ({att_maps.shape[-2:]})" + f" with vol_shape ({self.vol_shape} in 2D -> {self.vol_shape[-2:]})" ) self.att_vol_angles = att_maps self.executor = None def __enter__(self): """Initialize the with statement block.""" if self.use_multithreading and isinstance(self.projector_backend, prj_backends.ProjectorBackendSKimage): if self.executor is None: self.executor = cf.ThreadPoolExecutor(max_workers=num_threads) return super().__enter__() def __exit__(self, *args): """De-initialize the with statement block.""" super().__exit__() if self.use_multithreading and isinstance(self.projector_backend, prj_backends.ProjectorBackendSKimage): if self.executor is not None: self.executor.shutdown() self.executor = None
[docs] def collapse_detectors(self) -> None: """Convert multi-detector configurations into single-detector.""" weights = np.reshape(self.weights_det, [1, -1, 1, 1]) / np.sum(self.weights_det) self.att_vol_angles = np.sum(self.att_vol_angles * weights, axis=1) weights = np.squeeze(weights) self.angles_det_rad = np.sum(self.angles_det_rad * weights, keepdims=True) self.weights_angles = np.sum(self.weights_angles * weights, axis=1, keepdims=True) self.weights_det = np.sum(self.weights_det, keepdims=True)
[docs] def fp_angle(self, vol: NDArray, angle_ind: int) -> NDArray: """Forward-project the volume to a single sinogram line. It applies the attenuation corrections. Parameters ---------- vol : NDArray The volume to forward-project. angle_ind : int The angle index to forward project. Returns ------- sino_line : NDArray The forward-projected sinogram line. """ temp_vol = vol * self.att_vol_angles[angle_ind, ...] weights = self.weights_det * self.weights_angles[angle_ind, :] sino_line = [ weights[ii] * ProjectorUncorrected.fp_angle(self, temp_vol[ii], angle_ind) for ii in range(len(self.angles_det_rad)) ] sino_line = np.ascontiguousarray(np.stack(sino_line, axis=0)) if sino_line.shape[0] == 1: sino_line = np.squeeze(sino_line, axis=0) return sino_line
[docs] def bp_angle(self, sino: NDArray, angle_ind: int, single_line: bool = False) -> NDArray: """Back-project a single sinogram line to the volume. It only applies the attenuation corrections if the projector is symmetric. Parameters ---------- sino : NDArray The sinogram to back-project or a single line. angle_ind : int The angle index to forward project. single_line : bool, optional Whether the input is a single sinogram line. The default is False. Returns ------- NDArray The back-projected volume. """ if single_line: sino_line = sino else: sino_line = sino[..., angle_ind, :] sino_line = np.reshape(sino_line, [len(self.weights_det), *sino_line.shape[-(len(self.vol_shape) - 1) :]]) weights = self.weights_det * self.weights_angles[angle_ind, :] vol = [ weights[ii] * ProjectorUncorrected.bp_angle(self, sino_line[ii, ...], angle_ind) for ii in range(len(self.angles_det_rad)) ] vol = np.stack(vol, axis=0) if self.is_symmetric: vol *= self.att_vol_angles[angle_ind, ...] return np.sum(vol, axis=0)
[docs] def fp(self, vol: NDArray) -> NDArray: """Forward-project the volume to the sinogram. It applies the attenuation corrections. Parameters ---------- vol : NDArray The volume to forward-project. Returns ------- NDArray The forward-projected sinogram. """ if self.use_multithreading and isinstance(self.projector_backend, prj_backends.ProjectorBackendSKimage): sino_lines = self.executor.map(lambda x: self.fp_angle(vol, x), range(len(self.angles_rot_rad))) sino_lines = [*sino_lines] else: sino_lines = [self.fp_angle(vol, ii) for ii in range(len(self.angles_rot_rad))] sino_lines = np.ascontiguousarray(np.stack(sino_lines, axis=-2)) if sino_lines.shape[0] == 1: sino_lines = np.squeeze(sino_lines, axis=0) return sino_lines
[docs] def bp(self, sino: NDArray) -> NDArray: """Back-projection of the sinogram to the volume. Parameters ---------- sino : NDArray The sinogram to back-project. Returns ------- NDArray The back-projected volume. """ if self.is_symmetric: if self.use_multithreading and isinstance(self.projector_backend, prj_backends.ProjectorBackendSKimage): vols = self.executor.map(lambda x: self.bp_angle(sino, x), range(len(self.angles_rot_rad))) vols = [*vols] else: vols = [self.bp_angle(sino, ii) for ii in range(len(self.angles_rot_rad))] return np.sum(vols, axis=0) else: sino = np.reshape(sino, [len(self.weights_det), *self.prj_shape]) vol = [ProjectorUncorrected.bp(self, sino[ii, ...]) for ii in range(len(self.angles_det_rad))] return np.sum(np.stack(vol, axis=0), axis=0)