Source code for corrct.projectors

# -*- coding: utf-8 -*-
"""
Tomographic projectors.

@author: Nicola VIGANĂ’, Computational Imaging group, CWI, The Netherlands,
and ESRF - The European Synchrotron, Grenoble, France
"""

import numpy as np

from scipy.sparse import spmatrix

from . import operators
from . import _projector_backends as prj_backends
from . import models
from .attenuation import AttenuationVolume

import concurrent.futures as cf
import multiprocessing as mp

from typing import Union, Sequence, Optional, Callable
from numpy.typing import ArrayLike, DTypeLike, NDArray


num_threads = round(np.log2(mp.cpu_count() + 1))
astra_available = prj_backends.has_astra and prj_backends.has_cuda


[docs]class ProjectorMatrix(operators.ProjectorOperator): """Projector that uses an explicit projection matrix.""" A: Union[NDArray, spmatrix] def __init__(self, A: Union[NDArray, spmatrix], vol_shape: ArrayLike, prj_shape: ArrayLike) -> None: """ Initialize the matrix projector. Parameters ---------- A : NDArray | spmatrix The projection matrix. vol_shape : ArrayLike Volume shape. prj_shape : ArrayLike Projection shape. """ self.vol_shape = np.array(vol_shape, ndmin=1, dtype=int) self.prj_shape = np.array(prj_shape, ndmin=1, dtype=int) self.A = A super().__init__() def _transpose(self) -> operators.ProjectorOperator: """ Create the transpose operator. Returns ------- operators.ProjectorOperator The transpose operator. """ return ProjectorMatrix(self.A.transpose(), self.prj_shape, self.vol_shape)
[docs] def absolute(self) -> operators.ProjectorOperator: """ Return the projection operator using the absolute value of the projection coefficients. Returns ------- operators.ProjectorOperator The absolute value operator. """ return ProjectorMatrix(np.abs(self.A), self.vol_shape, self.prj_shape) # type: ignore
[docs] def fp(self, x: NDArray) -> NDArray: """ Define the interface for the forward-projection. Parameters ---------- x : NDArray Input volume. Returns ------- NDArray The projection data. """ return self.A.dot(x.flatten()).reshape(self.prj_shape)
[docs] def bp(self, x: NDArray) -> NDArray: """ Define the interface for the back-projection. Parameters ---------- x : NDArray Input projection data. Returns ------- NDArray The back-projected volume. """ return self.A.transpose().dot(x.flatten()).reshape(self.vol_shape)
[docs]class ProjectorUncorrected(operators.ProjectorOperator): """Base projection class.""" vol_geom: models.VolumeGeometry projector_backend: prj_backends.ProjectorBackend prj_intensities: Union[NDArray[np.floating], None] psf: Union[NDArray[np.floating], float, None] def __init__( self, vol_geom: Union[Sequence[int], NDArray[np.integer], models.VolumeGeometry], angles_rot_rad: Union[Sequence[float], NDArray], rot_axis_shift_pix: Union[float, ArrayLike, NDArray, None] = None, *, prj_geom: Optional[models.ProjectionGeometry] = None, prj_intensities: Optional[ArrayLike] = None, psf: Optional[ArrayLike] = None, backend: Union[str, prj_backends.ProjectorBackend] = "astra" if astra_available else "skimage", create_single_projs: bool = False, ): """Initialize the base projection class. It implements the forward and back projection of the single lines of a sinogram. It takes care of initializing and disposing the ASTRA projectors when used in a *with* statement. It supports both 2D and 3D geometries. Parameters ---------- vol_geom : Sequence[int] | NDArray[np.integer] | models.VolumeGeometry The volume shape in Y X and optionally Z. angles_rot_rad : Sequence[float] | NDArray The rotation angles. rot_axis_shift_pix : float | ArrayLike | NDArray, optional The rotation axis shift(s) in pixels, by default None. prj_geom : models.ProjectionGeometry | None, optional The fully specified projection geometry. When active, the rotation axis shift is ignored, by default None. prj_intensities : ArrayLike | NDArray | None, optional Projection scaling factor, by default None. psf : ArrayLike | NDArray | None, optional The "point spread function" of the detector, by default None. backend : bool, optional Whether to use ASTRA or fall back to scikit-image. The default is True if CUDA and ASTRA are available, otherwise False. create_single_projs : bool, optional Whether to create projectors for single projections. Used for corrections and SART, by default False. Raises ------ ValueError When the geometry is not correct. """ if not astra_available: astra_status = f"astra: {prj_backends.has_astra}, cuda: {prj_backends.has_cuda}" if isinstance(backend, str) and backend == "astra": backend = "skimage" print(f"WARNING: ASTRA backend requested but not available ({astra_status}). Falling back to scikit-image.") elif isinstance(backend, prj_backends.ProjectorBackendASTRA): raise ValueError(f"Passed ASTRA projector, but astra not available ({astra_status}).") if not isinstance(vol_geom, models.VolumeGeometry): vol_geom = models.VolumeGeometry(_vol_shape_xyz=np.array(vol_geom)) self.vol_geom = vol_geom self.prj_geom = prj_geom if not len(self.vol_geom.shape_xyz) in (2, 3): raise ValueError("Only 2D or 3D volumes are valid") if not self.vol_geom.is_square(): raise ValueError("Only square volumes are valid") if isinstance(backend, str): if backend == "astra": if prj_backends.has_astra_direct: self.projector_backend = prj_backends.ProjectorBackendDirectASTRA() else: self.projector_backend = prj_backends.ProjectorBackendASTRA() elif backend == "skimage": self.projector_backend = prj_backends.ProjectorBackendSKimage() else: raise ValueError(f"Unknown backend: {backend}. Available options are: 'astra', 'skimage'.") else: self.projector_backend = backend if not (prj_geom is None or isinstance(self.projector_backend, prj_backends.ProjectorBackendASTRA)): raise ValueError("Using class `ProjectionGeometry` requires using astra-toolbox.") self.projector_backend.initialize_geometry( vol_geom=vol_geom, angles_rot_rad=angles_rot_rad, rot_axis_shift_pix=rot_axis_shift_pix, prj_geom=prj_geom, create_single_projs=create_single_projs, ) if prj_intensities is not None: prj_intensities = np.array(prj_intensities, dtype=np.floating) self.prj_intensities = prj_intensities self._set_psf(psf) self.vol_shape = np.array(self.projector_backend.get_vol_shape(), ndmin=1) self.prj_shape = np.array(self.projector_backend.get_prj_shape(), ndmin=1) super().__init__() @property def angles_rot_rad(self) -> NDArray: """Simplify access to the rotation angles (in radians). Returns ------- NDArray The rotation angles (in radians). """ return self.projector_backend.angles_w_rad def __enter__(self): """Initialize the with statement block.""" self.projector_backend.make_ready() return self def __exit__(self, *args): """De-initialize the with statement block.""" self.projector_backend.dispose() def _set_psf(self, psf: Optional[ArrayLike], is_conv_symm: bool = False) -> None: if psf is not None: psf = np.squeeze(np.array(psf)) if len(psf.shape) >= len(self.vol_geom.shape_xyz): raise ValueError( "PSF should either be 1D (for 2D and 3D reconstructions) or 2D (for 3D reconstructions)." + f" Passed PSF has shape: {psf.shape}, and the reconstruction is {len(self.vol_geom.shape_xyz)}D." ) # This redundancy is required, due to the different results between the single-angle and multi-angle projections prj_shape_vu = [*self.projector_backend.prj_shape_vu[:-2], self.projector_backend.prj_shape_vu[-1]] prj_shape_vu = np.array(prj_shape_vu, ndmin=1) self.psf_vu = operators.TransformConvolution(prj_shape_vu, kernel=psf, is_symm=is_conv_symm) prj_shape_vwu = np.array(self.projector_backend.prj_shape_vwu, ndmin=1) self.psf_vwu = operators.TransformConvolution(prj_shape_vwu, kernel=psf[..., None, :], is_symm=is_conv_symm) else: self.psf_vu = self.psf_vwu = None
[docs] def get_pre_weights(self) -> Union[NDArray, None]: """Compute the pre-weights of the projector geometry (notably for cone-beam geometries). Returns ------- Union[NDArray, None] The computed detector weights """ if self.prj_geom is None: return None else: return self.prj_geom.get_pre_weights([*self.prj_shape[-3:-2], self.prj_shape[-1]])
[docs] def fp_angle(self, vol: NDArray, angle_ind: int) -> NDArray: """Forward-project a volume to a single sinogram line. Parameters ---------- vol : NDArray The volume to forward-project. angle_ind : int The angle index to foward project. Returns ------- x : NDArray The forward-projected sinogram line. """ prj_vu = self.projector_backend.fp(vol, angle_ind) if self.prj_intensities is not None: prj_vu *= self.prj_intensities[angle_ind] if self.psf_vu is not None: prj_vu = self.psf_vu(prj_vu) return prj_vu
[docs] def bp_angle(self, prj_vu: NDArray, angle_ind: int) -> NDArray: """Back-project a single sinogram line to the volume. Parameters ---------- prj_vu : NDArray The sinogram to back-project or a single line. angle_ind : int The angle index to foward project. Returns ------- NDArray The back-projected volume. """ if self.prj_intensities is not None: prj_vu = prj_vu * self.prj_intensities[angle_ind] # It will make a copy if self.psf_vu is not None: prj_vu = self.psf_vu.T(prj_vu) return self.projector_backend.bp(prj_vu, angle_ind)
[docs] def fp(self, vol: NDArray) -> NDArray: """ Forward-projection of the volume to the projection data. Parameters ---------- vol : NDArray The volume to forward-project. Returns ------- NDArray The forward-projected projection data. """ prj_vwu = self.projector_backend.fp(vol) if self.prj_intensities is not None: prj_vwu *= self.prj_intensities[:, np.newaxis] if self.psf_vwu is not None: prj_vwu = self.psf_vwu(prj_vwu) return prj_vwu
[docs] def bp(self, prj_vwu: NDArray) -> NDArray: """ Back-projection of the projection data to the volume. Parameters ---------- prj_vwu : NDArray The projection data to back-project. Returns ------- NDArray The back-projected volume. """ if self.prj_intensities is not None: prj_vwu = prj_vwu * self.prj_intensities[:, np.newaxis] # Needs copy if self.psf_vwu is not None: prj_vwu = self.psf_vwu.T(prj_vwu) return self.projector_backend.bp(prj_vwu)
[docs]class ProjectorAttenuationXRF(ProjectorUncorrected): """ Attenuation corrected projection class for XRF, with multi-detector support. It includes the computation of the attenuation volumes. """ att_vol_angles: NDArray[np.floating] def __init__( self, vol_geom: Union[Sequence[int], NDArray[np.integer], models.VolumeGeometry], angles_rot_rad: Union[Sequence[float], NDArray], rot_axis_shift_pix: Union[float, ArrayLike, NDArray, None] = None, *, prj_geom: Optional[models.ProjectionGeometry] = None, prj_intensities: Optional[ArrayLike] = None, backend: Union[str, prj_backends.ProjectorBackend] = "astra" if astra_available else "skimage", att_maps: Optional[NDArray[np.floating]] = None, att_in: Optional[NDArray[np.floating]] = None, att_out: Optional[NDArray[np.floating]] = None, angles_detectors_rad: Union[float, ArrayLike] = (np.pi / 2), weights_detectors: Optional[ArrayLike] = None, psf: Optional[ArrayLike] = None, is_symmetric: bool = False, weights_angles: Optional[ArrayLike] = None, use_multithreading: bool = True, data_type: DTypeLike = np.float32, verbose: bool = True, ): """ Initialize the (attenuation corrected) XRF dedicated projector. Parameters ---------- vol_geom : Sequence[int] | NDArray[np.integer] | models.VolumeGeometry The volume shape in X Y and optionally Z. angles_rot_rad : Sequence[float] | NDArray The rotation angles. rot_axis_shift_pix : float | ArrayLike | NDArray | None, optional The rotation axis shift(s) in pixels. The default is None. prj_geom : Optional[models.ProjectionGeometry], optional The fully specified projection geometry. When active, the rotation axis shift is ignored. The default is None. prj_intensities : Optional[ArrayLike], optional Projection scaling factor. The default is None. backend : str | prj_backends.ProjectorBackend, optional Projector backend to use, by default "astra" if astra is available, otherwise "skimage". att_maps : Optional[NDArray[np.floating]], optional Precomputed attenuation maps for each angle, by default None att_in : Optional[ArrayLike], optional Attenuation volume of the incoming beam. The default is None. att_out : Optional[ArrayLike], optional Attenuation volume of the outgoing beam. The default is None. angles_detectors_rad : float | ArrayLike, optional Angles of the detector elements with respect to incident beam. The default is (np.pi / 2). weights_detectors : Optional[ArrayLike], optional Weights (e.g. solid angle, efficiency, etc) of the detector elements. The default is None. psf : Optional[ArrayLike], optional Optical system's point spread function (PSF). The default is None. is_symmetric : bool, optional Whether the projector is symmetric or not. The default is False. weights_angles : Optional[ArrayLike], optional Projection weight for a given element at a given angle. The default is None. use_multithreading : bool, optional Whether to use multiple threads or not. The default is True. data_type : DTypeLike, optional Output data type. The default is np.float32. verbose : bool, optional Whether to produce verbose output. The default is True. Raises ------ ValueError When given inconsistent numbers of detector weights and detector angles. """ ProjectorUncorrected.__init__( self, vol_geom=vol_geom, angles_rot_rad=angles_rot_rad, rot_axis_shift_pix=rot_axis_shift_pix, prj_geom=prj_geom, psf=psf, prj_intensities=prj_intensities, backend=backend, create_single_projs=True, ) self.data_type = data_type self.use_multithreading = use_multithreading self.verbose = verbose self.is_symmetric = is_symmetric self.angles_det_rad = np.array(angles_detectors_rad, ndmin=1) num_det_angles = len(self.angles_det_rad) if weights_detectors is None: weights_detectors = np.ones_like(self.angles_det_rad) self.weights_det = np.array(weights_detectors, ndmin=1) num_det_weights = len(self.weights_det) if num_det_angles > 1: if num_det_weights == 1: self.weights_det = np.tile(self.weights_det, [num_det_angles]) elif num_det_weights > 1 and not num_det_weights == num_det_angles: raise ValueError( "Number of detector weights differs from number of" + " detector angles: %d vs %d" % (num_det_weights, num_det_angles) ) if weights_angles is None: weights_angles = np.ones((len(self.angles_rot_rad), num_det_angles)) else: weights_angles = np.array(weights_angles) self.weights_angles = weights_angles if att_maps is None: m = AttenuationVolume(att_in, att_out, self.angles_rot_rad, self.angles_det_rad) m.compute_maps(use_multithreading=self.use_multithreading, verbose=self.verbose) self.att_vol_angles = m.get_maps() else: if att_maps.shape[1] != num_det_angles: raise ValueError(f"Number of maps ({att_maps.shape[1]}) differs from number of detectors ({num_det_angles})") if not np.all(np.equal(att_maps.shape[-2:], self.vol_shape[-2:])): raise ValueError( f"Mismatching volume shape of input volume ({att_maps.shape[-2:]})" + f" with vol_shape ({self.vol_shape} in 2D -> {self.vol_shape[-2:]})" ) self.att_vol_angles = att_maps def __enter__(self): """Initialize the with statement block.""" if self.use_multithreading and isinstance(self.projector_backend, prj_backends.ProjectorBackendSKimage): self.executor = cf.ThreadPoolExecutor(max_workers=num_threads) return super().__enter__() def __exit__(self, *args): """De-initialize the with statement block.""" super().__exit__() if self.use_multithreading and isinstance(self.projector_backend, prj_backends.ProjectorBackendSKimage): self.executor.shutdown()
[docs] def collapse_detectors(self) -> None: """Convert multi-detector configurations into single-detector.""" weights = np.reshape(self.weights_det, [1, -1, 1, 1]) / np.sum(self.weights_det) self.att_vol_angles = np.sum(self.att_vol_angles * weights, axis=1) weights = np.squeeze(weights) self.angles_det_rad = np.sum(self.angles_det_rad * weights, keepdims=True) self.weights_angles = np.sum(self.weights_angles * weights, axis=1, keepdims=True) self.weights_det = np.sum(self.weights_det, keepdims=True)
[docs] def fp_angle(self, vol: NDArray, angle_ind: int) -> NDArray: """Forward-project the volume to a single sinogram line. It applies the attenuation corrections. Parameters ---------- vol : NDArray The volume to forward-project. angle_ind : int The angle index to foward project. Returns ------- sino_line : NDArray The forward-projected sinogram line. """ temp_vol = vol * self.att_vol_angles[angle_ind, ...] weights = self.weights_det * self.weights_angles[angle_ind, :] sino_line = [ weights[ii] * ProjectorUncorrected.fp_angle(self, temp_vol[ii], angle_ind) for ii in range(len(self.angles_det_rad)) ] sino_line = np.ascontiguousarray(np.stack(sino_line, axis=0)) if sino_line.shape[0] == 1: sino_line = np.squeeze(sino_line, axis=0) return sino_line
[docs] def bp_angle(self, sino: NDArray, angle_ind: int, single_line: bool = False) -> NDArray: """Back-project a single sinogram line to the volume. It only applies the attenuation corrections if the projector is symmetric. Parameters ---------- sino : NDArray The sinogram to back-project or a single line. angle_ind : int The angle index to foward project. single_line : bool, optional Whether the input is a single sinogram line. The default is False. Returns ------- NDArray The back-projected volume. """ if single_line: sino_line = sino else: sino_line = sino[..., angle_ind, :] sino_line = np.reshape(sino_line, [len(self.weights_det), *sino_line.shape[-(len(self.vol_shape) - 1) :]]) weights = self.weights_det * self.weights_angles[angle_ind, :] vol = [ weights[ii] * ProjectorUncorrected.bp_angle(self, sino_line[ii, ...], angle_ind) for ii in range(len(self.angles_det_rad)) ] vol = np.stack(vol, axis=0) if self.is_symmetric: vol *= self.att_vol_angles[angle_ind, ...] return np.sum(vol, axis=0)
[docs] def fp(self, vol: NDArray) -> NDArray: """Forward-project the volume to the sinogram. It applies the attenuation corrections. Parameters ---------- vol : NDArray The volume to forward-project. Returns ------- NDArray The forward-projected sinogram. """ if self.use_multithreading and isinstance(self.projector_backend, prj_backends.ProjectorBackendSKimage): sino_lines = self.executor.map(lambda x: self.fp_angle(vol, x), range(len(self.angles_rot_rad))) sino_lines = [*sino_lines] else: sino_lines = [self.fp_angle(vol, ii) for ii in range(len(self.angles_rot_rad))] sino_lines = np.ascontiguousarray(np.stack(sino_lines, axis=-2)) if sino_lines.shape[0] == 1: sino_lines = np.squeeze(sino_lines, axis=0) return sino_lines
[docs] def bp(self, sino: NDArray) -> NDArray: """Back-projection of the sinogram to the volume. Parameters ---------- sino : NDArray The sinogram to back-project. Returns ------- NDArray The back-projected volume. """ if self.is_symmetric: if self.use_multithreading and isinstance(self.projector_backend, prj_backends.ProjectorBackendSKimage): vols = self.executor.map(lambda x: self.bp_angle(sino, x), range(len(self.angles_rot_rad))) vols = [*vols] else: vols = [self.bp_angle(sino, ii) for ii in range(len(self.angles_rot_rad))] return np.sum(vols, axis=0) else: sino = np.reshape(sino, [len(self.weights_det), *self.prj_shape]) vol = [ProjectorUncorrected.bp(self, sino[ii, ...]) for ii in range(len(self.angles_det_rad))] return np.sum(np.stack(vol, axis=0), axis=0)