Source code for mpol.gridding

from __future__ import annotations

import warnings
from collections.abc import Callable, Sequence
from typing import Any

import numpy as np
import numpy.typing as npt
import torch
from fast_histogram import histogram as fast_hist

from mpol import utils
from mpol.coordinates import GridCoords
from mpol.datasets import GriddedDataset
from mpol.exceptions import DataError, ThresholdExceededError, WrongDimensionError


def _check_data_inputs_2d(
    uu: npt.NDArray[np.floating[Any]],
    vv: npt.NDArray[np.floating[Any]],
    weight: npt.NDArray[np.floating[Any]],
    data_re: npt.NDArray[np.floating[Any]],
    data_im: npt.NDArray[np.floating[Any]],
) -> tuple[np.ndarray, ...]:
    """
    This validation function is called by :class:`mpol.gridding.DataAverager` and
    :class:`mpol.gridding.DirtyImager`. We have decided to allow these routines to
    accept either np.ndarrays *or* torch.Tensors, even though they use np.ndarrays
    internally.

    So within this routine we will cast to np.ndarrays just in case.

    In addition, this routine checks
    * that all data inputs are the same shape
    * the weights are positive,
    * and the data_re and data_im are floats.

    Make a reasonable effort to ensure that Hermitian pairs are *not* included.

    If the user supplied 1d vectors of shape ``(nvis,)``, make them all 2d with one
    channel, ``(1,nvis)``.

    Returns
    -------
    tuple
        the ``uu``, `vv`, ``weight``, ``data_re`` and ``data_im`` products.
    """

    if not 1 <= uu.ndim <= 2:
        raise WrongDimensionError(
            "Input data vectors should be either 1D or 2D numpy arrays."
        )

    if not all(array.shape == uu.shape for array in [vv, weight, data_re, data_im]):
        raise WrongDimensionError(
            "All dataset inputs must be the same input shape and size."
        )

    if np.any(weight <= 0.0):
        raise ValueError("Not all thermal weights are positive, check inputs.")

    if data_re.dtype not in (np.single, np.double):
        raise TypeError("data_re should be of type np.single or np.double.")

    if data_im.dtype not in (np.single, np.double):
        raise TypeError("data_im should be of type np.single or np.double.")

    if uu.ndim == 1:
        uu = np.atleast_2d(uu)
        vv = np.atleast_2d(vv)
        weight = np.atleast_2d(weight)
        data_re = np.atleast_2d(data_re)
        data_im = np.atleast_2d(data_im)

    # check to see that uu, vv and data do not contain Hermitian pairs
    verify_no_hermitian_pairs(uu, vv, data_re + 1.0j * data_im)

    return uu, vv, weight, data_re, data_im


[docs] def verify_no_hermitian_pairs( uu: npt.NDArray[np.floating[Any]], vv: npt.NDArray[np.floating[Any]], data: npt.NDArray[np.complexfloating[Any, Any]], test_vis: int = 5, test_channel: int = 0, ) -> bool: r""" Check that the dataset does not contain Hermitian pairs. Because the sky brightness :math:`I_\nu` is real, the visibility function :math:`\mathcal{V}` is Hermitian, meaning that .. math:: \mathcal{V}(u, v) = \mathcal{V}^*(-u, -v). Most datasets (e.g., those extracted from CASA) will only record one visibility measurement per baseline and not include the duplicate Hermitian pair (to save storage space). This routine attempts to determine if the dataset contains Hermitian pairs or not by choosing one data point at a time and then searching the dataset to see if its Hermitian pair exists. The routine will declare that a dataset contains Hermitian pairs or not after it has searched ``test_vis`` number of data points. If 0 Hermitian pairs have been found for all ``test_vis`` points, then the dataset will be declared to have no Hermitian pairs. If ``test_vis`` Hermitian pairs were found for ``test_vis`` points searched, then the dataset will be declared to have Hermitian pairs. If more than 0 but fewer than ``test_vis`` Hermitian pairs were found for ``test_vis`` points, an error will be raised. Gridding objects like :class:`mpol.gridding.DirtyImager` will naturally augment an input dataset to include the Hermitian pairs, so that images of :math:`I_\nu` produced with the inverse Fourier transform turn out to be real. Args: uu (numpy array): array of u spatial frequency coordinates. Units of [:math:`\lambda`] vv (numpy array): array of v spatial frequency coordinates. Units of [:math:`\lambda`] data (numpy complex): array of data values test_vis (int): the number of data points to search for Hermitian 'matches' test_channel (int): the index of the channel to perform the check Returns: boolean : ``True`` if dataset does contain Hermitian pairs, ``False`` if not. """ # make sure everything is in (nchan, nvis) format, to make our lives easier if uu.ndim == 1: uu = np.atleast_2d(uu) vv = np.atleast_2d(vv) data = np.atleast_2d(data) # but only test a single-channel uu = uu[test_channel] vv = vv[test_channel] data = data[test_channel] # if the dataset contains Hermitian pairs, # then there will be a large number of visibilities that have matching # (uu, vv) and conjugate data values # We don't know what order uu or vv might have been augmented in, or sorted # after the fact, so we can't # rely on quick differencing operations num_pairs = 0 # make uv array same shape as data (nvis, 2) uu_vv = np.array([uu, vv]).T # (nvis, 2) for i in range(test_vis): # we will approach this as a sort operation. # choose a u,v point uv_point = uu_vv[i] # see if its conjugate exists in the full array # nonzero returns a tuple of (2, found_vis) # we only need the first dimension, not the u_v dimension ind = np.nonzero(uu_vv == -uv_point)[0] # if we found nothing, move on to checking the next point if ind.size == 0: continue # if we found something, then take the first result ind = ind[0] # test to see whether the data is a conjugate if data[i] == np.conj(data[ind]): num_pairs += 1 if num_pairs == 0: return True if num_pairs == test_vis: raise DataError( "Hermitian pairs were found in the data. Please provide data without" " Hermitian pairs." ) return False else: raise DataError( f"{num_pairs} Hermitian pairs were found out of {test_vis} visibilities" " tested, dataset is inconsistent." ) return False
[docs] class GridderBase: r""" This class is not designed to be used directly, but rather to be subclassed. Subclasses will need to implement a `_grid_visibilities(self,...)` method. The GridderBase object uses desired image dimensions (via the ``cell_size`` and ``npix`` arguments) to define a corresponding Fourier plane grid as a :class:`.GridCoords` object. A pre-computed :class:`.GridCoords` can be supplied in lieu of ``cell_size`` and ``npix``, but all three arguments should never be supplied at once. For more details on the properties of the grid that is created, see the :class:`.GridCoords` documentation. Subclasses will accept "loose" *ungridded* visibility data and store the arrays to the object as instance attributes. The input visibility data should be the set of visibilities over the full :math:`[-u,u]` and :math:`[-v,v]` domain, and should not contain Hermitian pairs (an error will be raised, if they are encountered). The visibilities can be 1d for a single continuum channel, or 2d for an image cube. If 1d, visibilities will be converted to 2d arrays of shape ``(1, nvis)``. Like the :class:`~mpol.images.ImageCube` class, after construction, GridderBase assumes that you are operating with a multi-channel set of visibilities. These routines will still work with single-channel 'continuum' visibilities, they will just have `nchan = 1` in the first dimension of most products. Args: coords (GridCoords): an object already instantiated from the GridCoords class. If providing this, cannot provide ``cell_size`` or ``npix``. uu (numpy array): (nchan, nvis) array of u spatial frequency coordinates. Units of [:math:`\lambda`] vv (numpy array): (nchan, nvis) array of v spatial frequency coordinates. Units of [:math:`\lambda`] weight (2d numpy array): (nchan, nvis) array of thermal weights. Units of [:math:`1/\mathrm{Jy}^2`] data_re (2d numpy array): (nchan, nvis) array of the real part of the visibility measurements. Units of [:math:`\mathrm{Jy}`] data_im (2d numpy array): (nchan, nvis) array of the imaginary part of the visibility measurements. Units of [:math:`\mathrm{Jy}`] """ def __init__( self, coords=GridCoords, uu=npt.NDArray[np.floating[Any]], vv=npt.NDArray[np.floating[Any]], weight=npt.NDArray[np.floating[Any]], data_re=npt.NDArray[np.floating[Any]], data_im=npt.NDArray[np.floating[Any]], ) -> None: # check everything should be 2d, expand if not # also checks data does not contain Hermitian pairs uu, vv, weight, data_re, data_im = _check_data_inputs_2d( uu, vv, weight, data_re, data_im ) # setup the coordinates object self.coords = coords self.nchan = len(uu) # make sure we still fit into the grid self.coords.check_data_fit(uu, vv) # classes that inherit this will need to set data attributes # deciding on whether to include Hermitian pairs self.uu = uu self.vv = vv self.weight = weight self.data_re = data_re self.data_im = data_im # and register cell indices against data self._create_cell_indices() def _create_cell_indices(self) -> None: # figure out which visibility cell each datapoint lands in, so that # we can later assign it the appropriate robust weight for that cell # do this by calculating the nearest cell index [0, N] for all samples self.index_u = np.array( [np.digitize(u_chan, self.coords.u_edges) - 1 for u_chan in self.uu] ) self.index_v = np.array( [np.digitize(v_chan, self.coords.v_edges) - 1 for v_chan in self.vv] ) def _sum_cell_values_channel( self, uu: npt.NDArray[np.floating[Any]], vv: npt.NDArray[np.floating[Any]], values: npt.NDArray[np.floating[Any]] | None = None, ) -> npt.NDArray[np.floating[Any]]: r""" Given a list of loose visibility points :math:`(u,v)` and their corresponding values :math:`x`, partition the points up into 2D :math:`u-v` cells defined by the ``coords`` object attached to the gridder, such that ``cell[i,j]`` has bounds between ``coords.u_edges[j, j+1]`` and ``coords.v_edges[i, i+1]``. Then, sum the corresponding values for each :math:`(u,v)` point that falls within each cell. The resulting cell value is .. math:: \mathrm{result}_{i,j} = \sum_k \mathrm{values}_k where :math:`k` indexes all :math:`(u,v)` points that fall within ``coords.u_edges[j, j+1]`` and ``coords.v_edges[i, i+1]``. In the case that all values are :math:`1`, the result is the number of visibilities within each cell (i.e., a histogram). Args: uu (np.array): 1D array of East-West spatial frequency coordinates for a specific channel. Units of [:math:`\lambda`] vv (np.array): 1D array of North-South spatial frequency coordinates for a specific channel. Units of [:math:`\lambda`] values (np.array): 1D array of values (the same length as uu and vv) to use in the sum over each cell. The default (``values=None``) corresponds to using ``values=np.ones_like(uu)`` such that the routine is equivalent to a histogram. Returns: A 2D array of size ``(npix, npix)`` in ground format containing the summed cell quantities. """ result: npt.NDArray[np.floating[Any]] = fast_hist.histogram2d( vv, uu, bins=self.coords.npix_u, range=[ [self.coords.v_bin_min, self.coords.v_bin_max], [self.coords.u_bin_min, self.coords.u_bin_max], ], weights=values, ) # only return the "H" value return result def _sum_cell_values_cube( self, values: npt.NDArray[np.floating[Any]] | Sequence[None] | None = None ) -> npt.NDArray[np.floating[Any]]: r""" Perform the :func:`~mpol.gridding.DataAverager.sum_cell_values_channel` routine over all channels of the input visibilities. Args: values (iterable): ``(nchan, nvis)`` array of values to use in the sum over each cell. The default (``values=None``) corresponds to using ``values=np.ones_like(uu)`` such that the routine is equivalent to a histogram. Returns: A 3D array of size ``(nchan, npix, npix)`` in ground format containing the summed cell quantities. """ # calculate the histogrammed result for all channels cube = np.empty( (self.nchan, self.coords.npix_v, self.coords.npix_u), dtype="float", ) if values is None: # pass None to every channel values = [None] * self.nchan for i in range(self.nchan): cube[i] = self._sum_cell_values_channel(self.uu[i], self.vv[i], values[i]) return cube def _extract_gridded_values_to_loose( self, gridded_quantity: npt.NDArray[np.floating[Any]] ) -> npt.NDArray[np.floating[Any]]: r""" Extract the gridded cell quantity corresponding to each of the loose visibilities. Args: A 3D array of size ``(nchan, npix, npix)`` in ground format containing the gridded cell quantities. Returns: A ``(nchan, nvis)`` array of values corresponding to the loose visibilities, using the quantity in that cell. """ return np.array( [ gridded_quantity[i][self.index_v[i], self.index_u[i]] for i in range(self.nchan) ] ) def _grid_visibilities(self) -> None: r""" Average the loose data visibilities to the Fourier grid. """ # create the cells as edges around the existing points # note that at this stage, the UV grid is strictly increasing # when in fact, later on, we'll need to fftshift for the FFT cell_weight = self._sum_cell_values_cube(self.weight) # boolean index for cells that *contain* visibilities mask = cell_weight > 0.0 # calculate the density weights under "uniform" # the density weights have the same shape as the re, im samples. # cell_weight is (nchan, npix_v, npix_u) # self.index_v, self.index_u are (nchan, nvis) # we want density_weights to be (nchan, nvis) density_weight = 1 / self._extract_gridded_values_to_loose(cell_weight) # grid the reals and imaginaries separately # outputs from _sum_cell_values_cube are *not* pre-packed data_re_gridded = self._sum_cell_values_cube( self.data_re * density_weight * self.weight ) data_im_gridded = self._sum_cell_values_cube( self.data_im * density_weight * self.weight ) # store the pre-packed FFT products for access by outside routines self.mask = np.fft.fftshift(mask, axes=(1, 2)) self.data_re_gridded = np.fft.fftshift(data_re_gridded, axes=(1, 2)) self.data_im_gridded = np.fft.fftshift(data_im_gridded, axes=(1, 2)) self.vis_gridded = self.data_re_gridded + self.data_im_gridded * 1.0j def _estimate_cell_standard_deviation( self, ) -> tuple[npt.NDArray[np.floating[Any]], npt.NDArray[np.floating[Any]]]: r""" Estimate the `standard deviation <https://en.wikipedia.org/wiki/Standard_deviation>`__ of the real and imaginary visibility values within each :math:`u,v` cell (:math:`\mathrm{cell}_{i,j}`) defined by ``self.coords`` using the following steps. 1. Calculate the mean real :math:`\mu_\Re` and imaginary :math:`\mu_\Im` values within each cell using a weighted mean, assuming that the visibility function is constant across the cell. 2. For each visibility :math:`k` that falls within the cell, calculate the real and imaginary residuals (:math:`r_\Re` and :math:`r_\Im`) in units of :math:`\sigma_k`, where :math:`\sigma_k = \sqrt{1/w_k}` and :math:`w_k` is the weight of that visibility. 3. Calculate the standard deviation :math:`s_{i,j}` of the residual distributions within each cell .. math:: s_{i,j} = \sqrt{\frac{1}{N} \sum_k \left (\sigma_k - \bar{\sigma}_{i,j} \right )^2} where :math:`\bar{\sigma}_{i,j}` is first estimated as .. math:: \bar{\sigma}_{i,j} = \frac{1}{N} \sum_k \sigma_k Returns: std_real, std_imag: two 3D arrays of size ``(nchan, npix, npix)`` in ground format containing the standard deviation of the real and imaginary values within each cell, in units of :math:`\sigma`. If everything is correctly calibrated, we expect :math:`s_{i,j} \approx 1 \forall i,j`. """ # 1. use the gridding routine to calculate the mean real and imaginary # values on the grid self._grid_visibilities() # convert grid back to ground format mu_re_gridded = np.fft.fftshift(self.data_re_gridded, axes=(1, 2)) mu_im_gridded = np.fft.fftshift(self.data_im_gridded, axes=(1, 2)) # extract the real and imaginary values corresponding to the # "loose" visibilities # mu_re_gridded and mu_im_gridded are arrays with # shape (nchan, npix_v, npix_u) # self.index_v, self.index_u are (nchan, nvis) # we want mu_re and mu_im to be (nchan, nvis) mu_re = self._extract_gridded_values_to_loose(mu_re_gridded) mu_im = self._extract_gridded_values_to_loose(mu_im_gridded) # 2. calculate the real and imaginary residuals for the loose visibilities # 1/sigma = np.sqrt(weight) residual_re = (self.data_re - mu_re) * np.sqrt(self.weight) residual_im = (self.data_im - mu_im) * np.sqrt(self.weight) # 3. calculate the standard deviation of the residual visibilities # 3.1 first calculate the mean residuals # calculate the number of visibilities with each cell nvis_cell_grid = self._sum_cell_values_cube() # extract this out as a quantity for each visibility nvis_cell_loose = self._extract_gridded_values_to_loose(nvis_cell_grid) # calculate the mean residuals # sum residual values bar_sigma_re = self._sum_cell_values_cube(residual_re / nvis_cell_loose) bar_sigma_im = self._sum_cell_values_cube(residual_im / nvis_cell_loose) # extract back to loose bar_sigma_re_loose = self._extract_gridded_values_to_loose(bar_sigma_re) bar_sigma_im_loose = self._extract_gridded_values_to_loose(bar_sigma_im) # 3.2 calculate the standard deviation of the residuals s_re = np.sqrt( self._sum_cell_values_cube( (residual_re - bar_sigma_re_loose) ** 2 / nvis_cell_loose ) ) s_im = np.sqrt( self._sum_cell_values_cube( (residual_im - bar_sigma_im_loose) ** 2 / nvis_cell_loose ) ) return s_re, s_im def _check_scatter_error( self, max_scatter: float = 1.2 ) -> dict[str, bool | np.floating[Any]]: """ Checks/compares visibility scatter to a given threshold value ``max_scatter`` and raises an AssertionError if the median scatter across all cells exceeds ``max_scatter``. Args: max_scatter (float): the maximum permissible scatter in units of standard deviation. Returns: a dictionary containing keys ``return_status``, ``median_re``, and ``median_im``. ``return_status`` is a boolean that is ``False`` if scatter is within acceptable limits of max_scatter (good), and is ``True`` if scatter exceeds acceptable limits. ``median_re`` and ``median_im`` are the median scatter values returned across all cells, in units of standard deviation (estimated from the provided weights). """ s_re, s_im = self._estimate_cell_standard_deviation() median_re = np.median(s_re[s_re > 0]) median_im = np.median(s_im[s_im > 0]) return_status = (median_re > max_scatter) or (median_im > max_scatter) return { "return_status": return_status, "median_re": median_re, "median_im": median_im, } def _fliplr_cube(self, cube): return cube[:, :, ::-1] @property def ground_cube(self) -> npt.NDArray[np.floating[Any]]: r""" The visibility FFT cube fftshifted for plotting with ``imshow``. Returns: (torch.complex tensor, of shape ``(nchan, npix, npix)``): the FFT of the image cube, in sky plane format. """ return np.fft.fftshift(self.vis_gridded, axes=(1, 2))
[docs] class DataAverager(GridderBase): r""" The DataAverager object uses desired image dimensions (via the ``cell_size`` and ``npix`` arguments) to define a corresponding Fourier plane grid as a :class:`.GridCoords` object. A pre-computed :class:`.GridCoords` can be supplied in lieu of ``cell_size`` and ``npix``, but all three arguments should never be supplied at once. For more details on the properties of the grid that is created, see the :class:`.GridCoords` documentation. The :class:`.DataAverager` object accepts "loose" *ungridded* visibility data and stores the arrays to the object as instance attributes. The input visibility data should be the set of visibilities over the full :math:`[-u,u]` and :math:`[-v,v]` domain, and should not contain Hermitian pairs (an error will be raised, if they are encountered). (Note that, unlike :class:`~mpol.gridding.DirtyImager`, this class *will not* augment the dataset to include Hermitian pairs. This is by design, since Hermitian pairs should not be used in likelihood calculations). The input visibilities can be 1d for a single continuum channel, or 2d for image cube. If 1d, visibilities will be converted to 2d arrays of shape ``(1, nvis)``. Like the :class:`~mpol.images.ImageCube` class, after construction, the DataAverager assumes that you are operating with a multi-channel set of visibilities. These routines will still work with single-channel 'continuum' visibilities, they will just have nchan = 1 in the first dimension of most products. Once the DataAverager object is initialized with loose visibilities, you can average them and export them for use in Regularized Maximum Likelihood imaging with the :func:`mpol.gridding.DataAverager.to_pytorch_dataset` routine. Example:: averager = gridding.DataAverager( coords=coords, uu=uu, vv=vv, weight=weight, data_re=data_re, data_im=data_im, ) dset = averager.to_pytorch_dataset() Args: coords (GridCoords): an object already instantiated from the GridCoords class. If providing this, cannot provide ``cell_size`` or ``npix``. uu (numpy array): (nchan, nvis) array of u spatial frequency coordinates. Units of [:math:`\lambda`] vv (numpy array): (nchan, nvis) array of v spatial frequency coordinates. Units of [:math:`\lambda`] weight (2d numpy array): (nchan, nvis) array of thermal weights. Units of [:math:`1/\mathrm{Jy}^2`] data_re (2d numpy array): (nchan, nvis) array of the real part of the visibility measurements. Units of [:math:`\mathrm{Jy}`] data_im (2d numpy array): (nchan, nvis) array of the imaginary part of the visibility measurements. Units of [:math:`\mathrm{Jy}`] """ def _grid_weights(self) -> None: r""" Average the visibility weights to the Fourier grid contained in ``self.coords``, such that the ``self.weight_gridded`` corresponds to the equivalent weight on the averaged visibilities within that cell. """ # create the cells as edges around the existing points # note that at this stage, the UV grid is strictly increasing # when in fact, later on, we'll need to fftshift for the FFT cell_weight = self._sum_cell_values_cube(self.weight) # instantiate uncertainties for each averaged visibility. self.weight_gridded = np.fft.fftshift(cell_weight, axes=(1, 2))
[docs] def to_pytorch_dataset( self, check_visibility_scatter: bool = True, max_scatter: float = 1.2 ) -> GriddedDataset: r""" Export gridded visibilities to a PyTorch :class:`mpol.datasets.GriddedDataset` object. Args: check_visibility_scatter (bool): whether the routine should check the standard deviation of visibilities in each within each :math:`u,v` cell (:math:`\mathrm{cell}_{i,j}`) defined by ``self.coords``. Default is ``True``. A ``RuntimeError`` will be raised if any cell has a scatter larger than ``max_scatter``. max_scatter (float): the maximum allowable standard deviation of visibility values in a given :math:`u,v` cell (:math:`\mathrm{cell}_{i,j}`) defined by ``self.coords``. Defaults to a factor of 120%. Returns: :class:`~mpol.datasets.GriddedDataset` with gridded visibilities. """ # check the visibility scatter and flag user if there are issues if check_visibility_scatter: d = self._check_scatter_error(max_scatter) if d["return_status"]: raise RuntimeError( "Visibility scatter exceeds ``max_scatter``:{:}, indicating a" " potential problem with data weights. Consider inspecting weights" " using CASA tools before exporting visibilities for use with MPoL." " Median real scatter: {:} x sigma. Median imag scatter: {:} x" " sigma.".format(max_scatter, d["median_re"], d["median_im"]) ) # grid visibilities (uniform weighting necessary here) and weights self._grid_visibilities() self._grid_weights() return GriddedDataset( coords=self.coords, nchan=self.nchan, vis_gridded=torch.from_numpy(self.vis_gridded), weight_gridded=torch.from_numpy(self.weight_gridded), mask=torch.from_numpy(self.mask), )
[docs] class DirtyImager(GridderBase): r""" This class is mainly used for producing diagnostic "dirty" images of the visibility data. The DirtyImager object uses desired image dimensions (via the ``cell_size`` and ``npix`` arguments) to define a corresponding Fourier plane grid as a :class:`.GridCoords` object. A pre-computed :class:`.GridCoords` can be supplied in lieu of ``cell_size`` and ``npix``, but all three arguments should never be supplied at once. For more details on the properties of the grid that is created, see the :class:`.GridCoords` documentation. The :class:`.DirtyImager` object accepts "loose" *ungridded* visibility data and stores the arrays to the object as instance attributes. The input visibility data should be the normal set of visibilities over the full :math:`[-u,u]` and :math:`[-v,v]` domain; internally the DirtyImager will automatically augment the dataset to include the complex conjugates, i.e. the 'Hermitian pairs.' The input visibilities can be 1d for a single continuum channel, or 2d for image cube. If 1d, visibilities will be converted to 2d arrays of shape ``(1, nvis)``. Like the :class:`~mpol.images.ImageCube` class, after construction, the DirtyImager assumes that you are operating with a multi-channel set of visibilities. These routines will still work with single-channel 'continuum' visibilities, they will just have nchan = 1 in the first dimension of most products. Example:: imager = gridding.DirtyImager( coords=coords, uu=uu, vv=vv, weight=weight, data_re=data_re, data_im=data_im, ) img, beam = imager.get_dirty_image(weighting="briggs", robust=0.0) Args: cell_size (float): width of a single square pixel in [arcsec] npix (int): number of pixels in the width of the image coords (GridCoords): an object already instantiated from the GridCoords class. If providing this, cannot provide ``cell_size`` or ``npix``. uu (numpy array): (nchan, nvis) array of u spatial frequency coordinates. Units of [:math:`\lambda`] vv (numpy array): (nchan, nvis) array of v spatial frequency coordinates. Units of [:math:`\lambda`] weight (2d numpy array): (nchan, nvis) array of thermal weights. Units of [:math:`1/\mathrm{Jy}^2`] data_re (2d numpy array): (nchan, nvis) array of the real part of the visibility measurements. Units of [:math:`\mathrm{Jy}`] data_im (2d numpy array): (nchan, nvis) array of the imaginary part of the visibility measurements. Units of [:math:`\mathrm{Jy}`] """ def __init__( self, coords: GridCoords, uu: npt.NDArray[np.floating[Any]], vv: npt.NDArray[np.floating[Any]], weight: npt.NDArray[np.floating[Any]], data_re: npt.NDArray[np.floating[Any]], data_im: npt.NDArray[np.floating[Any]], ) -> None: # check everything should be 2d, expand if not # also checks data does not contain Hermitian pairs uu, vv, weight, data_re, data_im = _check_data_inputs_2d( uu, vv, weight, data_re, data_im ) # setup the coordinates object self.coords = coords self.nchan = len(uu) # make sure we still fit into the grid self.coords.check_data_fit(uu, vv) # initialize base objects # Hermitian conjugates taken care of by @property below self.uu_base = uu self.vv_base = vv self.weight_base = weight self.data_re_base = data_re self.data_im_base = data_im # make sure we still fit into the grid (with expansion) self.coords.check_data_fit(self.uu, self.vv) # and register cell indices against data self._create_cell_indices()
[docs] @classmethod def from_tensors( cls, coords: GridCoords, uu: torch.Tensor, vv: torch.Tensor, weight: torch.Tensor, data: torch.Tensor, ) -> DirtyImager: """ :class:`mpol.gridding.DirtyImager` is written internally in numpy. However, one may wish to use :class:`mpol.gridding.DirtyImager` to image residual visibilities, which are commonly of type :class:`torch.Tensor`. This classmethod converts the tensors to numpy arrays and instantiates a :class:`mpol.gridding.DirtyImager` instance. """ uu_np: npt.NDArray[np.floating[Any]] = utils.torch2npy(uu) vv_np: npt.NDArray[np.floating[Any]] = utils.torch2npy(vv) weight_np: npt.NDArray[np.floating[Any]] = utils.torch2npy(weight) data_np: npt.NDArray[np.floating[Any]] = utils.torch2npy(data) return cls(coords, uu_np, vv_np, weight_np, np.real(data_np), np.imag(data_np))
@property def uu(self) -> npt.NDArray[np.floating[Any]]: return np.concatenate([self.uu_base, -self.uu_base], axis=1) @property def vv(self) -> npt.NDArray[np.floating[Any]]: return np.concatenate([self.vv_base, -self.vv_base], axis=1) @property def weight(self) -> npt.NDArray[np.floating[Any]]: return np.concatenate([self.weight_base, self.weight_base], axis=1) @property def data_re(self) -> npt.NDArray[np.floating[Any]]: return np.concatenate([self.data_re_base, self.data_re_base], axis=1) @property def data_im(self) -> npt.NDArray[np.floating[Any]]: return np.concatenate([self.data_im_base, -self.data_im_base], axis=1) def _grid_visibilities( self, weighting: str = "uniform", robust: float | None = None, taper_function: Callable[ [npt.NDArray[np.floating[Any]], npt.NDArray[np.floating[Any]]], npt.NDArray[np.floating[Any]], ] | None = None, ) -> None: r""" Grid the loose data visibilities to the Fourier grid in preparation for imaging. Args: weighting (string): The type of cell averaging to perform. Choices of ``"natural"``, ``"uniform"``, or ``"briggs"``, following CASA tclean. If ``"briggs"``, also specify a robust value. robust (float): If ``weighting='briggs'``, specify a robust value in the range [-2, 2]. ``robust=-2`` approximately corresponds to uniform weighting and ``robust=2`` approximately corresponds to natural weighting. taper_function (function reference): a function assumed to be of the form :math:`f(u,v)` which calculates a prefactor in the range :math:`[0,1]`. This prefactor is used to premultiply the visibility data as a taper. The tapering function must assume that :math:`u` and :math:`v` will be supplied in units of :math:`\mathrm{k}\lambda`. By default no taper is applied. """ if taper_function is None: tapering_weight = np.ones_like(self.weight) else: tapering_weight = taper_function(self.uu, self.vv) # create the cells as edges around the existing points # note that at this stage, the UV grid is strictly increasing # when in fact, later on, we'll need to fftshift for the FFT cell_weight = self._sum_cell_values_cube(self.weight) # boolean index for cells that *contain* visibilities mask = cell_weight > 0.0 # calculate the density weights # the density weights have the same shape as the re, im samples. if weighting == "natural": density_weight = np.ones_like(self.weight) elif weighting == "uniform": # cell_weight is (nchan, npix_v, npix_u) # self.index_v, self.index_u are (nchan, nvis) # we want density_weights to be (nchan, nvis) density_weight = 1 / self._extract_gridded_values_to_loose(cell_weight) elif weighting == "briggs": if robust is None or not -2 <= robust <= 2: raise ValueError( "With 'briggs' weighting, a robust value must be specified between" " [-2, 2]." ) # implement robust weighting using the definition used in CASA # https://casa.nrao.edu/casadocs-devel/stable/imaging/synthesis-imaging/data-weighting # calculate the robust parameter f^2 for each channel f_sq = ((5 * 10 ** (-robust)) ** 2) / ( np.sum(cell_weight**2, axis=(1, 2)) / np.sum(self.weight, axis=1) ) # the robust weight corresponding to the cell cell_robust_weight = 1 / (1 + cell_weight * f_sq[:, np.newaxis, np.newaxis]) # zero out cells that have no visibilities # to prevent normalization error in next step cell_robust_weight[~mask] = 0 # now assign the cell robust weight to each visibility within that cell density_weight = self._extract_gridded_values_to_loose(cell_robust_weight) else: raise ValueError( "weighting must be specified as one of 'natural', 'uniform', or" " 'briggs'" ) # the factor of 2 in the denominator is needed because # we are approximating the Eqn 3.8 of Briggs' thesis # we need to sum over the Hermitian quantities in the # normalization constant. self.C = 1 / np.sum(tapering_weight * density_weight * self.weight, axis=1) # grid the reals and imaginaries separately # outputs from _sum_cell_values_cube are *not* pre-packed data_re_gridded = self._sum_cell_values_cube( self.data_re * tapering_weight * density_weight * self.weight ) data_im_gridded = self._sum_cell_values_cube( self.data_im * tapering_weight * density_weight * self.weight ) # the beam is the response to a point source, which is data_re = constant, data_im = 0 # so we save time and only calculate the reals, because gridded_beam_im = 0 re_gridded_beam = self._sum_cell_values_cube( tapering_weight * density_weight * self.weight ) # store the pre-packed FFT products for access by outside routines self.mask = np.fft.fftshift(mask) self.data_re_gridded = np.fft.fftshift(data_re_gridded, axes=(1, 2)) self.data_im_gridded = np.fft.fftshift(data_im_gridded, axes=(1, 2)) self.vis_gridded = self.data_re_gridded + self.data_im_gridded * 1.0j self.re_gridded_beam = np.fft.fftshift(re_gridded_beam, axes=(1, 2)) def _get_dirty_beam( self, C: npt.NDArray[np.floating[Any]], re_gridded_beam: npt.NDArray[np.floating[Any]], ) -> npt.NDArray[np.floating[Any]]: """ Compute the dirty beam corresponding to the gridded visibilities. Args: C (1D np.array): normalization constants for each channel re_gridded_beam (3d np.array): the gridded visibilities corresponding to a unit point source in the center of the field. Returns: numpy image cube with a dirty beam (PSF) for each channel. By definition, the peak is normalized to 1.0. """ # if we're sticking to the dirty beam and image equations in Briggs' Ph.D. thesis, # no correction for du or dv prefactors needed here # that is because we are using the FFT to compute an already discretized # equation, not approximating a continuous equation. beam = self._fliplr_cube( np.fft.fftshift( self.coords.npix**2 * np.fft.ifft2( C[:, np.newaxis, np.newaxis] * re_gridded_beam, ), axes=(1, 2), ) ) if np.max(beam.imag) >= 1e-10: raise ThresholdExceededError( "Dirty beam contained substantial imaginary values, check input" " visibilities, otherwise raise a github issue." ) self.beam: npt.NDArray[np.floating[Any]] = beam.real return self.beam def _null_dirty_beam( self, ntheta: int = 24, single_channel_estimate: bool = True ) -> npt.NDArray[np.floating[Any]]: r"""Zero out (null) all pixels in the dirty beam exterior to the first null, for each channel. Args: ntheta (int): number of azimuthal wedges to use for the 1st null calculation. More wedges will result in a more accurate estimate of dirty beam area, but will also take longer. single_channel_estimate (bool): If ``True`` (the default), use the area estimated from the first channel for all channels in the multi-channel image cube. If ``False``, calculate the beam area for all channels. Returns: a cube like the dirty beam, but with all pixels exterior to the first null set to 0. """ try: self.beam except AttributeError: self._get_dirty_beam(self.C, self.re_gridded_beam) # consider the 2D beam for each channel described by polar coordinates r, theta. # # this routine works by finding the smallest r for which the beam goes negative # (the first null) as a function of theta. Then, for this same theta, all pixels # (negative or not) with values of r larger than # this are set to 0. # the end product of this routine will be a "nulled" beam, which can be used in # the calculation of dirty beam area. # the angular extent for each "slice" # the smaller the slice, the more accurate the area estimate, but also the # longer it takes da = 2 * np.pi / ntheta # radians azimuths = np.arange(0, 2 * np.pi, da) # calculate a meshgrid (same for all channels) ll, mm = np.meshgrid(self.coords.l_centers, self.coords.m_centers) rr = np.sqrt(ll**2 + mm**2) theta = np.arctan2(mm, ll) + np.pi # radians in range [0, 2pi] nulled_beam = self.beam.copy() # for each channel, # find the first occurrence of a non-zero value, such that we end up with a # continuous ring of masked values. for i in range(self.nchan): nb = nulled_beam[i] ind_neg = nb < 0 for a in azimuths: # examine values between a, a+da with some overlap ind_azimuth = (theta >= a - 0.3 * da) & (theta <= (a + 1.3 * da)) # find all negative values within azimuth slice ind_neg_and_az = ind_neg & ind_azimuth # find the smallest r within this slice min_r = np.min(rr[ind_neg_and_az]) # null all pixels within this slice with radii r or greater ind_r = rr >= min_r ind_r_and_az = ind_r & ind_azimuth nb[ind_r_and_az] = 0 if single_channel_estimate: # just copy the mask from the first channel to all channels ind_0 = nb == 0 nulled_beam[:, ind_0] = 0 break return nulled_beam
[docs] def get_dirty_beam_area( self, ntheta: int = 24, single_channel_estimate: bool = True ) -> npt.NDArray[np.floating[Any]]: r""" Compute the effective area of the dirty beam for each channel. Assumes that the beam has already been generated by running :func:`~mpol.gridding.DirtyImager.get_dirty_image`. This is an approximate calculation involving a simple sum over all pixels out to the first null (zero crossing) of the dirty beam. This quantity is designed to approximate the conversion of image units from :math:`[\mathrm{Jy}\,\mathrm{beam}^{-1}]` to :math:`[\mathrm{Jy}\,\mathrm{arcsec}^{-2}]`, even though units of :math:`[\mathrm{Jy}\,\mathrm{dirty\;beam}^{-1}]` are technically undefined. Args: ntheta (int): number of azimuthal wedges to use for the 1st null calculation. More wedges will result in a more accurate estimate of dirty beam area, but will also take longer. single_channel_estimate (bool): If ``True`` (the default), use the area estimated from the first channel for all channels in the multi-channel image cube. If ``False``, calculate the beam area for all channels. Returns: (1D numpy array float) beam area for each channel in units of :math:`[\mathrm{arcsec}^{2}]` """ nulled = self._null_dirty_beam( ntheta=ntheta, single_channel_estimate=single_channel_estimate ) area: npt.NDArray[np.floating[Any]] = self.coords.cell_size**2 * np.sum( nulled, axis=(1, 2) ) # arcsec^2 return area
[docs] def get_dirty_image( self, weighting: str = "uniform", robust: float | None = None, taper_function: Callable[ [npt.NDArray[np.floating[Any]], npt.NDArray[np.floating[Any]]], npt.NDArray[np.floating[Any]], ] | None = None, unit: str = "Jy/beam", check_visibility_scatter: bool = True, max_scatter: float = 1.2, **beam_kwargs, ) -> tuple[npt.NDArray[np.floating[Any]], npt.NDArray[np.floating[Any]]]: r""" Calculate the dirty image. Args: weighting (string): The type of cell averaging to perform. Choices of ``"natural"``, ``"uniform"``, or ``"briggs"``, following CASA tclean. If ``"briggs"``, also specify a robust value. robust (float): If ``weighting='briggs'``, specify a robust value in the range [-2, 2]. ``robust=-2`` approxmately corresponds to uniform weighting and ``robust=2`` approximately corresponds to natural weighting. taper_function (function reference): a function assumed to be of the form :math:`f(u,v)` which calculates a prefactor in the range :math:`[0,1]` and premultiplies the visibility data. The function must assume that :math:`u` and :math:`v` will be supplied in units of :math:`\mathrm{k}\lambda`. By default no taper is applied. unit (string): what unit should the image be in. Default is ``"Jy/beam"``. If ``"Jy/arcsec^2"``, then the effective area of the dirty beam will be used to convert from ``"Jy/beam"`` to ``"Jy/arcsec^2"``. check_visibility_scatter (bool): whether the routine should check the standard deviation of visibilities in each within each :math:`u,v` cell (:math:`\mathrm{cell}_{i,j}`) defined by ``self.coords``. Default is ``True``. A ``RuntimeWarning`` will be raised if any cell has a scatter larger than ``max_scatter``. max_scatter (float): the maximum allowable standard deviation of visibility values in a given :math:`u,v` cell (:math:`\mathrm{cell}_{i,j}`) defined by ``self.coords``. Defaults to a factor of 120%. **beam_kwargs: all additional keyword arguments passed to :func:`~mpol.gridding.get_dirty_beam_area` if ``unit="Jy/arcsec^2"``. Returns: 2-tuple of (``image``, ``beam``) where ``image`` is an (nchan, npix, npix) numpy array of the dirty image cube in units ``unit``. ``beam`` is an numpy image cube with a dirty beam (PSF) for each channel. The units of the beam are always Jy/{dirty beam}, i.e., the peak of the beam is normalized to 1.0. """ # check unit input if unit not in ["Jy/beam", "Jy/arcsec^2"]: raise ValueError("Unknown unit", unit) # check the visibility scatter and flag user if there are issues if check_visibility_scatter: d = self._check_scatter_error(max_scatter) if d["return_status"]: warnings.warn( RuntimeWarning( "Visibility scatter exceeds ``max_scatter``:{:}, indicating a" " potential problem with data weights. Consider inspecting" " weights using CASA tools before exporting visibilities for" " use with MPoL. Median real scatter: {:} x sigma. Median imag" " scatter: {:} x sigma.".format( max_scatter, d["median_re"], d["median_im"] ) ) ) # call _grid_visibilities # inputs for weighting will be checked inside _grid_visibilities self._grid_visibilities( weighting=weighting, robust=robust, taper_function=taper_function, ) img = self._fliplr_cube( np.fft.fftshift( self.coords.npix**2 * np.fft.ifft2(self.C[:, np.newaxis, np.newaxis] * self.vis_gridded), axes=(1, 2), ) ) # Jy/beam # calculate the beam # also pre-stores internal self.beam value for area routine, if necessary beam = self._get_dirty_beam(self.C, self.re_gridded_beam) # for units of Jy/arcsec^2, we could just leave out the C constant *if* we were # doing uniform weighting. The relationships get more complex for robust or # natural weighting, however, so it's safer to calculate the number of # arcseconds^2 per beam if unit == "Jy/arcsec^2": beam_area_per_chan = self.get_dirty_beam_area(**beam_kwargs) # [arcsec^2] # convert image # (Jy/1 arcsec^2) = (Jy/ 1 beam) * (1 beam/ n arcsec^2) # beam_area_per_chan is the n of arcsec^2 per 1 beam img /= beam_area_per_chan[:, np.newaxis, np.newaxis] if np.max(img.imag) >= 1e-10: raise ThresholdExceededError( "Dirty image contained substantial imaginary values, check input" " visibilities, otherwise raise a github issue." ) return img.real, beam