Source code for tike.ptycho.probe

"""Functions related to creating and manipulating probe arrays.

Ptychographic probes are represented as two separate components: a shared probe
whose values are the same for all positions and the varying component. The
former is required as it provides the shared probe constraint for ptychography
and the later relaxes the former constraint to accomodate real-world
illuminations which may vary with time.

The shared component consist of a single array representing at least one probe
each of which may have an accompanying varying component.

The varying components are stored sparsely as two arrays, and the full
representation of the varying comonents are only combined as needed. The first
array is an array of eigen probes (principal components) spanning the space of
the probe variation of all positions and the second is an array of weights that
map the variation for each position into this space.

Each probe may have its own set of eigen probes. The unique probe at a given
position is reconstructed by adding the shared probe to a weighted sum of the
eigen probes.

.. code-block:: python

    varying_probe = probe + np.sum(weights * eigen_probes)


Design comments
---------------
In theory, the probe representation could be implemented in as little as two
arrays: one with all of the shared components where the probe becomes the first
eigen probe and and one with the weights. Choosing to keep the eigen probes
separate from the probe as a third array provides backwards compatability and
allows for storing fewer eigen probes in the case when only some probes are
allowed to vary.

"""

import cupy as cp
import numpy as np

import tike.random


[docs]def get_varying_probe(shared_probe, eigen_probe=None, weights=None, m=None): """Construct the varying m-th probes. Parameters ---------- shared_probe : (..., 1, 1, SHARED, WIDE, HIGH) complex64 The shared probes amongst all positions. m : int or list(int) The index of the requested probe. eigen_probe : (..., 1, EIGEN, SHARED, WIDE, HIGH) complex64 The eigen probes for all positions. weights : (..., POSI, EIGEN, SHARED) float32 The relative intensity of the eigen probes at each position. Returns ------- unique_probes : (..., POSI, 1, 1, WIDE, HIGH) """ if m is None: m = list(range(shared_probe.shape[-3])) if type(m) is not list: m = [m] if weights is not None and eigen_probe is not None: return shared_probe[..., :, m, :, :] + np.sum( weights[..., m, None, None] * eigen_probe[..., m, :, :], axis=-4, keepdims=True, ) else: return shared_probe[..., :, m, :, :].copy()
[docs]def update_eigen_probe(comm, R, eigen_probe, weights, patches, diff, β=0.1): """Update eigen probes using residual probe updates. This update is copied from the source code of ptychoshelves. It is similar to, but not the same as, equation (31) described by Odstrcil et al (2018). It is is also different from updates described in Odstrcil et al (2016). However, they all aim to correct for probe variation. Parameters ---------- comm : :py:class:`tike.communicators.Comm` An object which manages communications between both GPUs and nodes. R : (..., POSI, 1, 1, WIDE, HIGH) complex64 Residual probe updates; what's left after subtracting the shared probe update from the varying probe updates for each position patches : (..., POSI, 1, 1, WIDE, HIGH) complex64 diff : (..., POSI, 1, 1, WIDE, HIGH) complex64 eigen_probe : (..., 1, 1, 1, WIDE, HIGH) complex64 The eigen probe being updated. β : float A relaxation constant that controls how quickly the eigen probe modes are updated. Recommended to be < 1 for mini-batch updates. weights : (..., POSI) float32 A vector whose elements are sums of the previous optimal updates for each posiiton. References ---------- M. Odstrcil, P. Baksh, S. A. Boden, R. Card, J. E. Chad, J. G. Frey, W. S. Brocklesby, "Ptychographic coherent diffractive imaging with orthogonal probe relaxation." Opt. Express 24, 8360 (2016). doi: 10.1364/OE.24.008360 Michal Odstrcil, Andreas Menzel, and Manuel Guizar-Sicaros. Iterative least-squares solver for generalized maximum-likelihood ptychography. Optics Express. 2018. """ assert R[0].shape[-3] == R[0].shape[-4] == 1 assert eigen_probe[0].shape[-3] == 1 == eigen_probe[0].shape[-5] assert R[0].shape[:-5] == eigen_probe[0].shape[:-5] == weights[0].shape[:-1] assert weights[0].shape[-1] == R[0].shape[-5] assert R[0].shape[-2:] == eigen_probe[0].shape[-2:] def _get_update(R, eigen_probe, weights): # (..., POSI, 1, 1, 1, 1) to match other arrays weights = weights[..., None, None, None, None] norm_weights = np.linalg.norm(weights[0], axis=-5, keepdims=True)**2 if np.all(norm_weights[0] == 0): raise ValueError('eigen_probe weights cannot all be zero?') # FIXME: What happens when weights is zero!? proj = (np.real(R.conj() * eigen_probe) + weights) / norm_weights return weights, np.mean( R * np.mean(proj, axis=(-2, -1), keepdims=True), axis=-5, keepdims=True, ) weights, update = (list(a) for a in zip(*comm.pool.map( _get_update, R, eigen_probe, weights, ))) update = comm.pool.bcast(comm.pool.reduce_mean( update, axis=-5, )) def _get_d(patches, diff, eigen_probe, update, β): eigen_probe += β * update / np.linalg.norm( update, axis=(-2, -1), keepdims=True, ) assert np.all(np.isfinite(eigen_probe)) eigen_probe /= np.linalg.norm(eigen_probe, axis=(-2, -1), keepdims=True) # Determine new eigen_weights for the updated eigen probe phi = patches * eigen_probe n = np.mean( np.real(diff * phi.conj()), axis=(-1, -2), keepdims=True, ) norm_phi = np.square(np.abs(phi)) d = np.mean(norm_phi, axis=(-1, -2), keepdims=True) d_mean = np.mean(d, axis=-5, keepdims=True) return n, d, d_mean (n, d, d_mean) = (list(a) for a in zip(*comm.pool.map( _get_d, patches, diff, eigen_probe, update, β=β, ))) d_mean = comm.pool.bcast(comm.pool.reduce_mean( d_mean, axis=-5, )) def _get_weights_mean(n, d, d_mean, weights): d += 0.1 * d_mean weight_update = (n / d).reshape(*weights.shape) assert np.all(np.isfinite(weight_update)) # (33) The sum of all previous steps constrained to zero-mean weights += weight_update return np.mean( weights, axis=-5, keepdims=True, ) weights_mean = list(comm.pool.map( _get_weights_mean, n, d, d_mean, weights, )) weights_mean = comm.pool.bcast(comm.pool.reduce_mean( weights_mean, axis=-5, )) def _update_weights(weights, weights_mean): weights -= weights_mean return weights[..., 0, 0, 0, 0] weights = comm.pool.map( _update_weights, weights, weights_mean, ) return eigen_probe, weights
[docs]def add_modes_random_phase(probe, nmodes): """Initialize additional probe modes by phase shifting the first mode. Parameters ---------- probe : (:, :, :, M, :, :) array A probe with M > 0 incoherent modes. nmodes : int The number of desired modes. References ---------- M. Odstrcil, P. Baksh, S. A. Boden, R. Card, J. E. Chad, J. G. Frey, W. S. Brocklesby, "Ptychographic coherent diffractive imaging with orthogonal probe relaxation." Opt. Express 24, 8360 (2016). doi: 10.1364/OE.24.008360 """ all_modes = np.empty((*probe.shape[:-3], nmodes, *probe.shape[-2:]), dtype='complex64') pw = probe.shape[-1] for m in range(nmodes): if m < probe.shape[-3]: # copy existing mode all_modes[..., m, :, :] = probe[..., m, :, :] else: # randomly shift the first mode shift = np.exp(-2j * np.pi * (np.random.rand(2, 1) - 0.5) * ((np.arange(0, pw) + 0.5) / pw - 0.5)) all_modes[..., m, :, :] = (probe[..., 0, :, :] * shift[0][None] * shift[1][:, None]) return all_modes
[docs]def simulate_varying_weights(scan, eigen_probe): """Generate weights for eigen probe that follow random sinusoid. The amplitude of the of weights is 1, the phase shift is (0, 2π], and the period is at most one full scan. """ N = scan.shape[1] x = np.arange(N)[..., :, None, None] period = N * np.random.rand(*eigen_probe.shape[:-2]) phase = 2 * np.pi * np.random.rand(*eigen_probe.shape[:-2]) return np.sin(2 * np.pi / period * x - phase)
[docs]def init_varying_probe(scan, shared_probe, N): """Initialize arrays for N eigen modes.""" eigen_probe = tike.random.numpy_complex( *shared_probe.shape[:-4], N, *shared_probe.shape[-3:], ).astype('complex64') eigen_probe /= np.linalg.norm(eigen_probe, axis=(-2, -1), keepdims=True) weights = 1e-6 * np.random.rand( *scan.shape[:-1], N, shared_probe.shape[-3], ).astype('float32') weights -= np.mean(weights, axis=-3, keepdims=True) return eigen_probe, weights
[docs]def orthogonalize_eig(x): """Orthogonalize modes of x using eigenvectors of the pairwise dot product. Parameters ---------- x : (..., nmodes, :, :) array_like complex64 An array of the probe modes vectorized References ---------- M. Odstrcil, P. Baksh, S. A. Boden, R. Card, J. E. Chad, J. G. Frey, W. S. Brocklesby, "Ptychographic coherent diffractive imaging with orthogonal probe relaxation." Opt. Express 24, 8360 (2016). doi: 10.1364/OE.24.008360 """ nmodes = x.shape[-3] # 'A' holds the dot product of all possible mode pairs. We only fill the # lower half of `A` because it is conjugate-symmetric A = cp.empty((*x.shape[:-3], nmodes, nmodes), dtype='complex64') for i in range(nmodes): for j in range(i + 1): A[..., i, j] = cp.sum(cp.conj(x[..., i, :, :]) * x[..., j, :, :], axis=(-1, -2)) _, vectors = cp.linalg.eigh(A, UPLO='L') # np.linalg.eigh guarantees that the eigen values are returned in ascending # order, so we just reverse the order of modes to have them sorted in # descending order. # TODO: Optimize this double-loop x_new = cp.zeros_like(x) for i in range(nmodes): for j in range(nmodes): # Sort new modes by eigen value in decending order. x_new[..., nmodes - 1 - j, :, :] += vectors[..., i, j, None, None] * x[..., i, :, :] assert x_new.shape == x.shape, [x_new.shape, x.shape] return x_new
[docs]def gaussian(size, rin=0.8, rout=1.0): """Return a complex gaussian probe distribution. Illumination probe represented on a 2D regular grid. A finite-extent circular shaped probe is represented as a complex wave. The intensity of the probe is maximum at the center and damps to zero at the borders of the frame. Parameters ---------- size : int The side length of the distribution rin : float [0, 1) < rout The inner radius of the distribution where the dampening of the intensity will start. rout : float (0, 1] > rin The outer radius of the distribution where the intensity will reach zero. """ r, c = np.mgrid[:size, :size] + 0.5 rs = np.sqrt((r - size / 2)**2 + (c - size / 2)**2) rmax = np.sqrt(2) * 0.5 * rout * rs.max() + 1.0 rmin = np.sqrt(2) * 0.5 * rin * rs.max() img = np.zeros((size, size), dtype='float32') img[rs < rmin] = 1.0 img[rs > rmax] = 0.0 zone = np.logical_and(rs > rmin, rs < rmax) img[zone] = np.divide(rmax - rs[zone], rmax - rmin) return img
if __name__ == "__main__": cp.random.seed(0) x = (cp.random.rand(7, 1, 9, 3, 3) + 1j * cp.random.rand(7, 1, 9, 3, 3)).astype('complex64') x1 = orthogonalize_eig(x) assert x1.shape == x.shape, x1.shape