#!/usr/bin/env python3
"""
Provide utility functions for testing.
Created on Thu Jun 4 12:28:21 2020
@author: Nicola VIGANÒ, Computational Imaging group, CWI, The Netherlands,
and ESRF - The European Synchrotron, Grenoble, France
"""
import numpy as np
from . import projectors, processing
try:
from . import physics
__has_physics__ = True
except ImportError as exc:
print("WARNING:", exc)
print("Physics based phantom creation will not be available.")
__has_physics__ = False
from collections.abc import Sequence
from typing import Optional
from typing import Union
from numpy.typing import DTypeLike
from numpy.typing import NDArray
import skimage.data as skd
import skimage.filters as skf
import skimage.transform as skt
import skimage.segmentation as sks
import skimage.morphology as skm
NDArrayFloat = NDArray[np.floating]
[docs]
def roundup_to_pow2(x: Union[int, float, NDArrayFloat], p: int, dtype: DTypeLike = int) -> Union[int, float, NDArrayFloat]:
"""Round first argument to the power of 2 indicated by second argument.
Parameters
----------
x : int | float | NDArrayFloat
Number to round up.
p : int
Power of 2.
dtype : DTypeLike, optional
data type of the output. The default is int.
Returns
-------
int | float | NDArrayFloat
Rounding up of input.
"""
return np.ceil(np.array(x) / (2**p)).astype(dtype) * (2**p)
[docs]
def download_phantom():
"""Download the phantom generation module from Nicolas Barbey's repository on github."""
phantom_url = "https://raw.githubusercontent.com/nbarbey/TomograPy/master/tomograpy/phantom.py"
phantom_path = "./phantom.py"
print(
"This example uses the phantom definition from the package Tomograpy, "
f"developed by Nicolas Barbey. The needed module will be downloaded from: {phantom_url}"
)
import urllib.request as urlreq
urlreq.urlretrieve(phantom_url, phantom_path)
with open(phantom_path, "r", encoding="utf-8") as f:
file_content = f.read()
with open(phantom_path, "w", encoding="utf-8") as f:
f.write(file_content.replace("xrange", "range"))
[docs]
def create_phantom_nuclei3d(
FoV_size: Union[int, None] = 100, dtype: DTypeLike = np.float32
) -> tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat]:
"""Create a 3D phantom of cell nuclei.
Parameters
----------
FoV_size : int | None
Size of the field-of-view in pixels, per edge, by default None
dtype : DTypeLike, optional
The dtype of the produced data, by default np.float32
Returns
-------
tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat]
The nuclei 3D phantom, the background, and the pixel sizes along each axis
"""
phantom = np.array(skd.cells3d(), dtype=dtype).swapaxes(0, 1)
nuclei: NDArrayFloat = phantom[1]
if FoV_size is None:
FoV_size = nuclei.shape[-1]
else:
nuclei = skt.rescale(nuclei, scale=FoV_size / nuclei.shape[-1], mode="reflect", channel_axis=0)
print(f"{FoV_size = }")
scale_factor = FoV_size / nuclei.shape[-1]
pixel_size_um = np.array([0.29, 0.26, 0.26], dtype=dtype) / scale_factor
circ_mask = processing.circular_mask([FoV_size, FoV_size], dtype=dtype)
altitude = skf.sobel(nuclei)
seeds_precip = nuclei > ((nuclei.max() - nuclei.min()) * 0.8 + nuclei.min())
seeds_nuclei = nuclei > (nuclei.mean() + nuclei.std())
seeds = np.zeros_like(nuclei, dtype=int)
seeds[seeds_nuclei] = 2
seeds[nuclei == nuclei.min()] = 1
seg_nuclei = sks.watershed(altitude, seeds)
seeds = np.zeros_like(nuclei, dtype=int)
seeds[np.logical_and(seeds_nuclei, np.logical_not(circ_mask))] = 2
seeds[nuclei == nuclei.min()] = 1
seg_nuclei_out = sks.watershed(altitude, seeds)
seg_nuclei = np.logical_and(seg_nuclei == 2, np.logical_not(seg_nuclei_out == 2))
seg_nuclei = skm.remove_small_objects(seg_nuclei)
seg_nuclei = skm.binary_dilation(seg_nuclei, footprint=skm.ball(1.5))
seeds = np.zeros_like(nuclei, dtype=int)
seeds[seeds_precip] = 2
seeds[nuclei == nuclei.min()] = 1
seg_precip = sks.watershed(skf.sobel(seg_nuclei), seeds)
seg_precip = skm.binary_dilation(seg_precip == 2)
ph_seg = seg_nuclei.copy() * 2
ph_seg[seg_precip] = 3
ph_seg[skm.dilation(seg_nuclei_out == 2, footprint=skm.ball(1.5))] = 1
background = nuclei * (ph_seg == 0)
for _ in range(15):
if np.all(background != 0):
break
dil_back = skm.dilation(nuclei * (ph_seg == 0), footprint=skm.ball(1))
background[background == 0] = dil_back[background == 0]
background = skf.gaussian(background, sigma=9)
return nuclei * (ph_seg == 2) + (ph_seg != 2) * background, background, pixel_size_um
[docs]
def phantom_assign_concentration(
ph_or: NDArrayFloat, element: str = "Ca", em_line: str = "KA", in_energy_keV: float = 20.0, voxel_size_um: float = 0.5
) -> tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat]:
"""Build an XRF phantom.
The created phantom has been used in:
- N. Viganò and V. A. Solé, “Physically corrected forward operators for
induced emission tomography: a simulation study,” Meas. Sci. Technol., no.
Advanced X-Ray Tomography, pp. 1–26, Nov. 2017.
Parameters
----------
ph_or : NDArrayFloat
The phases phantom map.
element : str, optional
Element symbol. The default is "Ca".
em_line : str, optional
Emission line. The default is "KA" (corresponding to K-alpha).
in_energy_keV : float, optional
Incoming beam energy in keV. The default is 20.0.
Returns
-------
vol_fluo_yield : NDArrayFloat
Voxel-wise fluorescence yields.
vol_att_in : NDArrayFloat
Voxel-wise attenuation at the incoming beam energy.
vol_att_out : NDArrayFloat
Voxel-wise attenuation at the emitted energy.
"""
if not __has_physics__:
raise RuntimeError("Physics module not available!")
ph_air = ph_or < 0.1
ph_FeO = 0.5 < ph_or
ph_CaO = np.logical_and(0.25 < ph_or, ph_or < 0.5)
ph_CaC = np.logical_and(0.1 < ph_or, ph_or < 0.25)
conv_mm_to_cm = 1e-1
conv_um_to_mm = 1e-3
voxel_size_cm = voxel_size_um * conv_um_to_mm * conv_mm_to_cm # cm to micron
print(f"Sample size: [{ph_or.shape[0] * voxel_size_um} {ph_or.shape[1] * voxel_size_um}] um")
phase_fractions = (ph_air, ph_FeO, ph_CaO, ph_CaC)
phase_compound_names = ("Air, Dry (near sea level)", "Ferric Oxide", "Calcium Oxide", "Calcium Carbonate")
phantom = physics.VolumeMaterial(phase_fractions, phase_compound_names, voxel_size_cm)
out_energy_keV, vol_fluo_yield = phantom.get_fluo_yield(element, in_energy_keV, em_line)
vol_lin_att_in = phantom.get_attenuation(in_energy_keV)
vol_lin_att_out = phantom.get_attenuation(out_energy_keV)
return (vol_fluo_yield, vol_lin_att_in, vol_lin_att_out)
[docs]
def phantom_assign_concentration_multi(
ph_or: NDArrayFloat,
elements: Sequence[str] = ("Ca", "Fe"),
em_lines: Union[str, Sequence[str]] = "KA",
in_energy_keV: float = 20.0,
detectors_pos_rad: Optional[float] = None,
) -> tuple[list[NDArrayFloat], NDArrayFloat, list[NDArrayFloat]]:
"""Build an XRF phantom.
The created phantom has been used in:
- N. Viganò and V. A. Solé, “Physically corrected forward operators for
induced emission tomography: a simulation study,” Meas. Sci. Technol., no.
Advanced X-Ray Tomography, pp. 1–26, Nov. 2017.
Parameters
----------
ph_or : NDArrayFloat
The phases phantom map.
elements : Sequence[str], optional
Element symbols. The default is ["Ca", "Fe"].
em_lines : str | Sequence[str], optional
Emission lines. The default is "KA" (corresponding to K-alpha).
in_energy_keV : float, optional
Incoming beam energy in keV. The default is 20.0.
detectors_pos_rad : float | tuple | list | NDArrayFloat, optional
Detector(s) positions in radians, with respect to incoming beam.
If None, Compton is not produced. The default is None.
Returns
-------
vol_yield : List[NDArrayFloat]
Voxel-wise fluorescence and Compton yields.
vol_att_in : NDArrayFloat
Voxel-wise attenuation at the incoming beam energy.
vol_att_out : List[NDArrayFloat]
Voxel-wise attenuation at the emitted energy.
"""
if not __has_physics__:
raise RuntimeError("Physics module not available!")
ph_air = ph_or < 0.1
ph_FeO = 0.5 < ph_or
ph_CaO = np.logical_and(0.25 < ph_or, ph_or < 0.5)
ph_CaC = np.logical_and(0.1 < ph_or, ph_or < 0.25)
conv_mm_to_cm = 1e-1
conv_um_to_mm = 1e-3
voxel_size_um = 0.5
voxel_size_cm = voxel_size_um * conv_um_to_mm * conv_mm_to_cm # cm to micron
print(f"Sample size: [{ph_or.shape[0] * voxel_size_um} {ph_or.shape[1] * voxel_size_um}] um")
phase_fractions = (ph_air, ph_FeO, ph_CaO, ph_CaC)
phase_compound_names = ("Air, Dry (near sea level)", "Ferric Oxide", "Calcium Oxide", "Calcium Carbonate")
phantom = physics.VolumeMaterial(phase_fractions, phase_compound_names, voxel_size_cm)
vol_lin_att_in = phantom.get_attenuation(in_energy_keV)
num_vols_out = len(elements) + (detectors_pos_rad is not None)
vol_yield = [np.array([])] * num_vols_out
vol_lin_att_out = [np.array([])] * num_vols_out
for ii, el in enumerate(elements):
if isinstance(em_lines, str):
line = em_lines
else:
line = em_lines[ii]
out_energy_keV, vol_yield[ii] = phantom.get_fluo_yield(el, in_energy_keV, line)
vol_lin_att_out[ii] = phantom.get_attenuation(out_energy_keV)
if detectors_pos_rad:
out_energy_keV, vol_yield[-1] = phantom.get_compton_scattering(in_energy_keV, angle_rad=detectors_pos_rad)
vol_lin_att_out[-1] = phantom.get_attenuation(out_energy_keV)
return (vol_yield, vol_lin_att_in, vol_lin_att_out)
[docs]
def add_noise(
img_clean: NDArray,
num_photons: Union[int, float],
add_poisson: bool = False,
readout_noise_std: Optional[float] = None,
background_avg: Optional[float] = None,
background_std: Optional[float] = None,
detection_efficiency: float = 1.0,
dtype: DTypeLike = np.float32,
) -> tuple[NDArray, NDArray, float]:
"""Add noise to an image (sinogram).
Parameters
----------
img_clean : NDArray
The clean input image.
num_photons : Union[int, float]
Number of photons corresponding to the value 1.0 in the image.
add_poisson : bool, optional
Whether to add Poisson noise, by default False.
readout_noise_std : Optional[float], optional
Standard deviation of the readout noise, by default None.
background_avg : Optional[float], optional
Average value of the background, by default None.
background_std : Optional[float], optional
Standard deviation of the background, by default None.
detection_efficiency : float, optional
Efficiency of the detection (e.g. detector solid angle, inclination, etc), by default 1.0.
dtype : DTypeLike, optional
Data type of the volumes, by default np.float32.
Returns
-------
Tuple[NDArray, NDArray, float]
The noised and clean images (scaled by the photons and efficiency), and the background.
"""
img_clean = num_photons * detection_efficiency * img_clean.copy().astype(dtype)
img_noise = img_clean.copy()
if background_avg is not None:
if background_std is None:
background_std = background_avg * 5e-2
img_noise += (
num_photons * detection_efficiency * np.abs(np.random.normal(background_avg, background_std, img_clean.shape))
)
background = float(np.mean((img_noise - img_clean).flatten()))
if add_poisson:
img_noise = np.random.poisson(img_noise).astype(dtype)
if readout_noise_std is not None:
img_noise += np.random.normal(0, readout_noise_std, img_clean.shape)
return img_noise, img_clean, background
[docs]
def create_sino(
ph: NDArrayFloat,
num_angles: int,
start_angle_deg: float = 0.0,
end_angle_deg: float = 180.0,
dwell_time_s: float = 1.0,
photon_flux: float = 1e9,
detectors_pos_rad: Union[float, Sequence[float], NDArrayFloat] = (np.pi / 2),
vol_att_in: Optional[NDArrayFloat] = None,
vol_att_out: Optional[NDArrayFloat] = None,
psf: Optional[NDArrayFloat] = None,
background_avg: Optional[float] = None,
background_std: Optional[float] = None,
add_poisson: bool = False,
readout_noise_std: Optional[float] = None,
dtype: DTypeLike = np.float32,
) -> tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat, float]:
"""Compute the sinogram from a given phantom.
Parameters
----------
ph : NDArrayFloat
The phantom volume, with the expected average photon production per voxel per impinging photon.
num_angles : int
The number of angles.
start_angle_deg : float, optional
Initial scan angle in degrees. The default is 0.
end_angle_deg : float, optional
Final scan angle in degrees. The default is 180.
dwell_time_s : float, optional
The acquisition time per sinogram point. The default is 1.
photon_flux : float, optional
The impinging photon flux per unit time (second). The default is 1e9.
detectors_pos_rad : float | Sequence[float] | NDArrayFloat, optional
Detector(s) positions in radians, with respect to incoming beam. The default is (np.pi / 2).
vol_att_in : NDArrayFloat, optional
Attenuation volume for the incoming beam. The default is None.
vol_att_out : NDArrayFloat, optional
Attenuation volume for the outgoing beam. The default is None.
psf : NDArrayFloat, optional
Point spread function or probing beam profile. The default is None.
background_avg : float, optional
Background average value. The default is None.
background_std : float, optional
Background standard deviation. The default is None.
add_poisson : bool, optional
Switch to turn on Poisson noise. The default is False.
readout_noise_std : float, optional
Read-out noise standard deviation. The default is None.
dtype : numpy.dtype, optional
Output datatype. The default is np.float32.
Returns
-------
Tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat, float]
The sinogram (detector readings), the angular positions, and the expected average phton production per voxel.
"""
print(f"Creating Sino with {num_angles} angles")
angles_deg = np.linspace(start_angle_deg, end_angle_deg, num_angles, endpoint=False)
angles_rad = np.deg2rad(angles_deg)
print(angles_deg)
num_photons = photon_flux * dwell_time_s
detector_solidangle_sr = (2.4 / 2 / 16 / 2) ** 2
if vol_att_in is None and vol_att_out is None:
with projectors.ProjectorUncorrected(ph.shape, angles_rad) as p:
sino = p.fp(ph)
else:
with projectors.ProjectorAttenuationXRF(
ph.shape, angles_rad, att_in=vol_att_in, att_out=vol_att_out, angles_detectors_rad=detectors_pos_rad, psf=psf
) as p:
sino = p.fp(ph)
# Adding noise
sino_noise, sino, background = add_noise(
sino,
num_photons=num_photons,
add_poisson=add_poisson,
readout_noise_std=readout_noise_std,
background_avg=background_avg,
background_std=background_std,
detection_efficiency=detector_solidangle_sr,
dtype=dtype,
)
return (sino_noise, angles_rad, ph * num_photons * detector_solidangle_sr, background)
[docs]
def create_sino_transmission(
ph: NDArrayFloat,
num_angles: int,
start_angle_deg: float = 0,
end_angle_deg: float = 180,
dwell_time_s: float = 1,
photon_flux: float = 1e9,
psf: Optional[NDArrayFloat] = None,
add_poisson: bool = False,
readout_noise_std: Optional[float] = None,
dtype: DTypeLike = np.float32,
) -> tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat, NDArrayFloat]:
"""Compute the sinogram from a given phantom.
Parameters
----------
ph : NDArrayFloat
The phantom volume, with the expected average photon production per voxel per impinging photon.
num_angles : int
The number of angles.
start_angle_deg : float, optional
Initial scan angle in degrees. The default is 0.
end_angle_deg : float, optional
Final scan angle in degrees. The default is 180.
dwell_time_s : float, optional
The acquisition time per sinogram point. The default is 1.
photon_flux : float, optional
The impinging photon flux per unit time (second). The default is 1e9.
psf : NDArrayFloat, optional
Point spread function or probing beam profile. The default is None.
add_poisson : bool, optional
Switch to turn on Poisson noise. The default is False.
readout_noise_std : float, optional
Read-out noise standard deviation. The default is None.
dtype : numpy.dtype, optional
Output datatype. The default is np.float32.
Returns
-------
Tuple[NDArrayFloat, NDArrayFloat, NDArrayFloat, NDArrayFloat, float]
The sinogram (detector readings), the flat-field, and the angular positions.
"""
print(f"Creating attenuation Sino with {num_angles} angles")
angles_deg = np.linspace(start_angle_deg, end_angle_deg, num_angles, endpoint=False)
angles_rad = np.deg2rad(angles_deg)
print(angles_deg)
num_photons = photon_flux * dwell_time_s
with projectors.ProjectorUncorrected(ph.shape, angles_rad, psf=psf) as p:
sino = np.exp(-p.fp(ph))
# Adding noise
sino_noise, _, _ = add_noise(
sino,
num_photons=num_photons,
add_poisson=add_poisson,
readout_noise_std=readout_noise_std,
dtype=dtype,
)
flat_noise, _, _ = add_noise(
np.ones_like(sino),
num_photons=num_photons,
add_poisson=add_poisson,
readout_noise_std=readout_noise_std,
dtype=dtype,
)
return (sino_noise, flat_noise, angles_rad, ph)
[docs]
def compute_error_power(expected_vol: NDArrayFloat, computed_vol: NDArrayFloat) -> tuple[float, float]:
"""Compute the expected volume signal power, and computed volume error power.
Parameters
----------
expected_vol : NDArrayFloat
The expected volume.
computed_vol : NDArrayFloat
The computed volume.
Returns
-------
Tuple[float, float]
The expected volume signal power, and the computed volume.
"""
vol_power = np.sqrt(np.sum((expected_vol) ** 2) / expected_vol.size)
error_power = np.sqrt(np.sum(np.abs(expected_vol - computed_vol) ** 2) / expected_vol.size)
return vol_power, error_power