"""Functions for loading and preprocessing raw data from the disk."""
__author__ = "Tekin Bicer, Daniel Ching"
__copyright__ = "Copyright (c) 2021, UChicago Argonne, LLC."
import warnings
import logging
import typing
import h5py
import numpy as np
import numpy.typing as npt
from tike.constants import wavelength
import tike.precision
logger = logging.getLogger(__name__)
[docs]def position_units_to_pixels(
positions,
detector_distance,
detector_pixel_count,
detector_pixel_width,
photon_energy,
):
"""Convert scanning positions units from meters to pixels coordinates.
Parameters
----------
positions : array[float] [m]
Coordinates of the position of the beam when frames were collected.
detector_distance : float [m]
The propagation distance of the beam from the sample to the detector.
detector_pixel_count : int
The number of pixels across one edge of the detector. Assumes a square
detector.
detector_pixel_width : float [m]
The width of one detector pixel. Assumes square pixels.
photon_energy : float [eV]
The energy of the incident beam.
Returns
-------
positions : float [pixels]
The scanning positions in pixel coordinates.
"""
pixel_per_meter = (
(detector_pixel_width * detector_pixel_count) /
(detector_distance * wavelength(photon_energy / 1000) / 100))
logger.info(
f"For a detector of {detector_pixel_count:d} pixels"
f" each {detector_pixel_width:.3e} m wide"
f" with propagation distance {detector_distance:.3e} m"
f" and photon energy {photon_energy:.3e} eV;"
f" the reconstruction pixel size will be {1 / pixel_per_meter:.3e} m.")
return positions * pixel_per_meter
[docs]def read_aps_velociprobe(
diffraction_path,
position_path,
xy_columns: typing.Tuple[int, int] = (5, 1),
trigger_column: int = 7,
max_crop: int = 2048,
binned_pix: int = 1,
) -> typing.Tuple[npt.NDArray, npt.NDArray]:
"""Load ptychography data from the Advanced Photon Source Velociprobe.
Expects one HDF5 file and one CSV file with the following organization::
diffraction_path:
/entry
/data
/data_000000:int[FRAME, WIDE, HIGH] {unit: counts}
/data_000001:int[FRAME, WIDE, HIGH] {unit: counts}
...
/instrument
/detector
/beam_center_x:float {unit: pixel}
/beam_center_y:float {unit: pixel}
/detectorSpecific
/photon_energy:float {unit: eV}
/x_pixels_in_detector:int
/y_pixels_in_detector:int
/detector_distance:float {unit: m}
/x_pixel_size:float {unit: m}
/sample
/goniometer
/chi:float[] {unit: degree }
Where FRAME is the number of detector frames recorded and WIDE/HIGH is the
width and height. The number of data_000000 links may be more than the
actual number of files because of some problem where the master file is
created before the linked files are created.
We use lz4 to compress the data. In order to open these compressed
datasets, you have to install the lz4 filter plugins
(https://github.com/nexusformat/HDF5-External-Filter-Plugins).
The CSV position raw data file is a 8 column file with columns
corresponding to the following parameters: samz, samx, samy, zpx, zpy,
encoder y, encoder x, trigger number. Because the horizontal stage is on
top of the rotation, stage, we must use the rotation stage position to
correct the horizontal scanning positions. By default we use samx, encoder
y for the horizontal and vertical positions.
Parameters
----------
diffraction_path : string
The absolute path to the HDF5 file containing diffraction patterns and
other metadata.
position_path : string OR List[string]
The absolute path to the CSV file(s) containing position information.
xy_columns : 2-tuple of int
The columns in the 8 column raw position file to use for x,y positions
trigger_column : int
The column in the 8 column raw position file to use for grouping
positions together.
max_crop : int
Crop the diffraction pattern to at most this width.
binned_pix : int
The number of pixels binned together along each dimension
Returns
-------
data : (FRAME, WIDE, HIGH)
Diffraction patterns; cropped square and peak FFT shifted to corner.
scan : (POSI, 2) float32
Scan positions; rescaled to pixel coordinates but uncentered.
"""
with h5py.File(diffraction_path, 'r') as f:
photon_energy = f['/entry/instrument/detector'
'/detectorSpecific/photon_energy'][()] # eV
detect_width = int(f['/entry/instrument/detector'
'/detectorSpecific/x_pixels_in_detector'][()])
detect_height = int(f['/entry/instrument/detector'
'/detectorSpecific/y_pixels_in_detector'][()])
detector_dist = f['/entry/instrument/detector'
'/detector_distance'][()] # meter
det_pix_width = f['/entry/instrument/detector'
'/x_pixel_size'][()] # meter
beam_center_x = int(f['/entry/instrument/detector/beam_center_x'][()])
beam_center_y = int(f['/entry/instrument/detector/beam_center_y'][()])
chi = float(f['entry/sample/goniometer/chi'][0])
logger.info('Loading 2-ID-D ptychography data:\n'
f'\tstage rotation {chi} degrees\n'
f'\tphoton energy {photon_energy} eV\n'
f'\twidth: {detect_width}, center: {beam_center_x}\n'
f'\theight: {detect_height}, center: {beam_center_y}')
# Autodetect the diffraction pattern size by doubling until it
# doesn't fit on the detector anymore.
max_radius = max_crop // 2
radius = 2
while (
radius <= max_radius
and beam_center_x + radius < detect_width
and beam_center_y + radius < detect_height
and beam_center_x - radius >= 0 and beam_center_y - radius >= 0
): # yapf:disable
radius *= 2
radius = radius // 2
logger.info(f'Autodetected diffraction size is {2* radius}.')
binned_width = (2 * radius) // binned_pix
if binned_width * binned_pix != 2 * radius:
msg = (f"Invalid pixel binning provided! {2 * radius} cannot be "
f"evenly collected into bins of {binned_pix}.")
raise ValueError(msg)
logger.info(f'Post-binning diffraction size is {binned_width}.')
data = []
def crop_and_shift(x):
# yapf: disable
cropped = x[
...,
beam_center_y - radius:beam_center_y + radius,
beam_center_x - radius:beam_center_x + radius,
]
binned = np.sum(
np.reshape(
cropped,
(-1, binned_width, binned_pix, binned_width, binned_pix),
),
axis=(-3, -1),
keepdims=False,
dtype=x.dtype,
)
data.append(
np.fft.ifftshift(
binned,
axes=(-2, -1),
))
# yapf: enable
for x in f['/entry/data']:
try:
crop_and_shift(f[f'/entry/data/{x}'])
except KeyError:
# Catches links to non-files.
# TODO: Should be able to predict the number of datasets.
# However, let's just catch exception for now.
break
except OSError as error:
warnings.warn(
"The HDF5 compression plugin is probably missing. "
"See the conda-forge hdf5-external-filter-plugins package.")
raise error
data = np.concatenate(data, axis=0)
# Load data from six column file
if isinstance(position_path, list):
raw_position = [
np.genfromtxt(
p,
usecols=(*xy_columns, trigger_column),
delimiter=',',
dtype=tike.precision.integer,
) for p in position_path
]
raw_position = np.concatenate(raw_position, axis=0)
else:
raw_position = np.genfromtxt(
position_path,
usecols=(*xy_columns, trigger_column),
delimiter=',',
dtype=tike.precision.integer,
)
# Split positions where trigger number increases by 1. Assumes that
# positions are ordered by trigger number in file. Shift indices by 1
# because of how np.diff is defined.
sections = np.nonzero(np.diff(raw_position[:, -1]))[0] + 1
groups = np.split(
raw_position[:, :-1],
indices_or_sections=sections,
axis=0,
)
# Apply a reduction function to handle multiple positions per trigger
def position_reduce(g):
"""Average of the first and last position in each trigger group."""
# return np.mean(g, axis=0, keepdims=True)
return (g[:1] + g[-1:]) / 2
groups = list(map(position_reduce, groups))
scan = np.concatenate(groups, axis=0)
# Rescale according to geometry of velociprobe
scan[:, 0] *= -1e-9
scan -= np.mean(scan, axis=0, keepdims=True)
scan[:, 1] *= 1e-9 * np.cos(chi / 180 * np.pi)
logger.info(f'Loaded {len(scan)} scan positions.')
if len(data) != len(scan):
warnings.warn(
f"The number of positions {scan.shape} and frames {data.shape}"
" is not equal. One of the two will be truncated.")
num_frame = min(len(data), len(scan))
scan = scan[:num_frame, ...]
data = data[:num_frame, ...]
scan = position_units_to_pixels(
scan,
detector_dist,
data.shape[-1],
det_pix_width * binned_pix,
photon_energy,
)
if np.any(np.logical_not(np.isfinite(data))):
warnings.warn("Some values in the diffraction data are not finite. "
"Photon counts must be >= 0 and finite.")
if np.any(data < 0):
warnings.warn("Some values in the diffraction data are negative. "
"Photon counts must be >= 0 and finite.")
return data, scan.astype(tike.precision.floating)
[docs]def read_aps_lynx(
diffraction_path,
position_path,
photon_energy,
beam_center_x,
beam_center_y,
detector_dist,
xy_columns: typing.Tuple[int, int] = (6, 3),
trigger_column: int = 0,
max_crop: int = 2048,
gap_value: int = 2**12 - 1,
binned_pix: int = 1,
) -> typing.Tuple[npt.NDArray, npt.NDArray]:
"""Load ptychography data from Advanced Photon Source LYNX.
Expects one h5 file and one dat file with the following organization::
diffraction_path:
entry:NXentry
@NX_class = NXentry
data:NXdata
@NX_class = NXdata
eiger_4:NX_UINT16[12320,1030,1614] =
@Count_cutoff = 199998
@Exposure_period = 0.052000000000000005
@Exposure_time = 0.005
@Pixel_size = [7.5e-05]
DAT is a space-separated text file with two rows of headers with positions
in nanometers.
Parameters
----------
diffraction_path : string
The absolute path to the HDF5 file containing diffraction patterns.
position_path : string
The absolute path to the CSV file containing position information.
xy_columns : 2-tuple of int
The columns in the 8 column raw position file to use for x,y positions
trigger_column : int
The column in the 8 column raw position file to use for grouping
positions together.
max_crop : int
Crop the diffraction pattern to at most this width.
gap_value : int
The value used to encode detector gaps
binned_pix : int
The number of pixels binned together along each dimension
Returns
-------
data : (FRAME, WIDE, HIGH)
Diffraction patterns; cropped square and peak FFT shifted to corner.
scan : (POSI, 2) float32
Scan positions; rescaled to pixel coordinates but uncentered.
"""
with h5py.File(diffraction_path, 'r') as f:
det_pix_width = f['/entry/data/eiger_4'].attrs['Pixel_size'].item(
) # meter
_, detect_height, detect_width = f['/entry/data/eiger_4'].shape
logger.info('Loading 28-ID-C ptychography data:\n'
f'\tphoton energy {photon_energy} eV\n'
f'\twidth: {detect_width}, center: {beam_center_x}\n'
f'\theight: {detect_height}, center: {beam_center_y}\n'
f'\tdetector pixel width: {det_pix_width} m\n')
# Autodetect the diffraction pattern size by doubling until it
# doesn't fit on the detector anymore.
max_radius = max_crop // 2
radius = 2
while (
radius <= max_radius
and beam_center_x + radius < detect_width
and beam_center_y + radius < detect_height
and beam_center_x - radius >= 0 and beam_center_y - radius >= 0
): # yapf:disable
radius *= 2
radius = radius // 2
logger.info(f'Autodetected diffraction size is {2 * radius}.')
binned_width = (2 * radius) // binned_pix
if binned_width * binned_pix != 2 * radius:
msg = (f"Invalid pixel binning provided! {2 * radius} cannot be "
f"evenly collected into bins of {binned_pix}.")
raise ValueError(msg)
logger.info(f'Post-binning diffraction size is {binned_width}.')
data = []
def crop_and_shift(x):
# yapf: disable
cropped = x[
...,
beam_center_y - radius:beam_center_y + radius,
beam_center_x - radius:beam_center_x + radius,
]
# Set between panel values to zero
cropped[cropped == gap_value] = 0
binned = np.sum(
np.reshape(
cropped,
(-1, binned_width, binned_pix, binned_width, binned_pix),
),
axis=(-3, -1),
keepdims=False,
dtype=x.dtype,
)
data.append(
np.fft.ifftshift(
binned,
axes=(-2, -1),
))
# yapf: enable
try:
crop_and_shift(f['/entry/data/eiger_4'])
except OSError as error:
warnings.warn(
"The HDF5 compression plugin is probably missing. "
"See the conda-forge hdf5-external-filter-plugins package.")
raise error
data = np.concatenate(data, axis=0)
raw_position = np.genfromtxt(
position_path,
usecols=(*xy_columns, trigger_column),
delimiter=' ',
dtype=tike.precision.floating,
skip_header=2,
)
scan = raw_position[:, :2] * -1e-6
logger.info(f'Loaded {len(scan)} scan positions.')
if len(data) != len(scan):
warnings.warn(
f"The number of positions {scan.shape} and frames {data.shape}"
" is not equal. One of the two will be truncated.")
num_frame = min(len(data), len(scan))
scan = scan[:num_frame, ...]
data = data[:num_frame, ...]
scan = position_units_to_pixels(
scan,
detector_dist,
data.shape[-1],
det_pix_width * binned_pix,
photon_energy,
)
if np.any(np.logical_not(np.isfinite(data))):
warnings.warn("Some values in the diffraction data are not finite. "
"Photon counts must be >= 0 and finite.")
if np.any(data < 0):
warnings.warn("Some values in the diffraction data are negative. "
"Photon counts must be >= 0 and finite.")
return data, scan.astype(tike.precision.floating)