Source code for nikamap.nikamap

from __future__ import absolute_import, division, print_function

from collections.abc import MutableMapping
from pathlib import Path

import numpy as np

from astropy.io import fits, registry
from astropy import units as u
from astropy.wcs import WCS
from astropy.nddata import StdDevUncertainty

import warnings

from .contmap import ContMap, ContBeam
from .utils import update_header

Jy_beam = u.Jy / u.beam

__all__ = ["NikaMap", "NikaFits", "NikaBeam"]


# TODO: Take care of operations (add/subtract/...) to add extra parameters...


[docs] class NikaBeam(object): """NikaBeam describe the beam of a NikaMap. This class is for back-compatibility, returning a ContBeam object Parameters ---------- fwhm : :class:`astropy.units.Quantity` Full width half maximum of the Gaussian kernel. pixel_scale : `astropy.units.equivalencies.pixel_scale` The pixel scale either in units of angle/pixel or pixel/angle. See also -------- :class:`astropy.convolution.Gaussian2DKernel` """ def __new__(cls, *args, **kwargs): warnings.warn("NikaBeam is deprecated, use ContBeam instead") fwhm = args[0] if len(args) > 0 else kwargs.pop("fwhm", None) pixel_scale = args[1] if len(args) > 1 else kwargs.pop("pixel_scale", None) return ContBeam(major=fwhm, pixscale=(1 * u.pix).to(fwhm.unit, equivalencies=pixel_scale))
[docs] class NikaMap(ContMap): """A NikaMap object represent a nika map with additionnal capabilities. It contains the metadata, wcs, and all attribute (data/stddev/time/unit/mask) as well as potential source list detected in these maps. Parameters ---------- data : :class:`~numpy.ndarray` or :class:`astropy.nddata.NDData` The actual data contained in this `NDData` object. Not that this will always be copies by *reference* , so you should make copy the ``data`` before passing it in if that's the desired behavior. uncertainty : :class:`astropy.nddata.NDUncertainty`, optional Uncertainties on the data. mask : :class:`~numpy.ndarray`-like, optional Mask for the data, given as a boolean Numpy array or any object that can be converted to a boolean Numpy array with a shape matching that of the data. The values must be ``False`` where the data is *valid* and ``True`` when it is not (like Numpy masked arrays). If ``data`` is a numpy masked array, providing ``mask`` here will causes the mask from the masked array to be ignored. hits : :class:`~numpy.ndarray`-like, optional The hit per pixel on the map sampling_freq : float or :class:`~astropy.units.Quantity` the sampling frequency of the experiment, default 1 Hz wcs : undefined, optional WCS-object containing the world coordinate system for the data. meta : `dict`-like object, optional Metadata for this object. "Metadata" here means all information that is included with this object but not part of any other attribute of this particular object. e.g., creation date, unique identifier, simulation parameters, exposure time, telescope name, etc. unit : :class:`astropy.units.UnitBase` instance or str, optional The units of the data. beam : :class:`radio_beam.Beam` The beam corresponding to the data, by default a gaussian constructed from the header 'BMAJ' 'BMIN', 'PA' keyword. fake_source : :class:`astropy.table.Table`, optional The table of potential fake sources included in the data .. note:: The table must contain at least 3 columns: ['ID', 'ra', 'dec'] sources : :class`astropy.table.Table`, optional The table of detected sources in the data. """ primary_header = None def __init__(self, data, *args, **kwargs): self.primary_header = kwargs.pop("primary_header", None) super().__init__(data, *args, **kwargs) if isinstance(data, NikaMap): if self.primary_header is None and data.primary_header is not None: self.primary_header = data.primary_header
def retrieve_primary_keys(filename, band="1mm", **kwd): """Retrieve usefulle keys in primary header.""" assert band in ["1mm", "2mm", "1", "2", "3"], "band should be either '1mm', '2mm', '1', '2', '3'" with fits.open(filename, **kwd) as hdus: # Fiddling to "fix" the fits file # extension params and info # hdus[14].header['EXTNAME'] = 'Param' # hdus[15].header['EXTNAME'] = 'Info' f_sampling = hdus[0].header["f_sampli"] * u.Hz if band in ["1mm", "1", "3"]: bmaj = hdus[0].header["FWHM_260"] * u.arcsec elif band in ["2mm", "2"]: bmaj = hdus[0].header["FWHM_150"] * u.arcsec return f_sampling, bmaj def idl_fits_nikamap_reader(filename, band="1mm", revert=False, **kwd): """NIKA2 IDL Pipeline Map reader. Parameters ---------- filename : str the fits filename band : str (1mm | 2mm | 1 | 2 | 3) the requested band revert : boolean use if to return -1 * data """ f_sampling, bmaj = retrieve_primary_keys(filename, band, **kwd) with fits.open(filename, **kwd) as hdus: primary_header = hdus[0].header brightness_key = "Brightness_{}".format(band) stddev_key = "Stddev_{}".format(band) hits_key = "Nhits_{}".format(band) data = hdus[brightness_key].data.astype(float) header = hdus[brightness_key].header e_data = hdus[stddev_key].data.astype(float) hits = hdus[hits_key].data.astype(int) header = update_header(header, bmaj) # time = (hits / f_sampling).to(u.h) # Mask unobserved regions unobserved = hits == 0 data[unobserved] = np.nan e_data[unobserved] = np.nan if revert: data *= -1 data = NikaMap( data, mask=unobserved, uncertainty=StdDevUncertainty(e_data), hits=hits, sampling_freq=f_sampling, unit=header["UNIT"], wcs=WCS(header), header=header, primary_header=primary_header, ) return data def idl_fits_nikamap_writer(nm_data, filename, band="1mm", append=False, **kwd): """Write NikaMap object on IDL Pipeline fits file format. Parameters ---------- filename : str the fits filename band : str (1mm | 2mm | 1 | 2 | 3) the output band append : boolean append nikamap to file """ assert band in ["1mm", "2mm", "1", "2", "3"], "band should be either '1mm', '2mm', '1', '2', '3'" if append: hdus = fits.HDUList.fromfile(filename, mode="update") else: hdus = fits.HDUList([fits.PrimaryHDU(None, getattr(nm_data, "primary_header", None))]) for hdu in nm_data.to_hdus( hdu_data="Brightness_{}".format(band), hdu_mask=None, hdu_uncertainty="Stddev_{}".format(band), hdu_hits="Nhits_{}".format(band), ): hdus.append(hdu) if append: hdus.flush() else: hdus.writeto(filename, **kwd) def piic_fits_nikamap_reader(filename, band=None, revert=False, unit="mJy/beam", **kwd): """NIKA2 PIIC Pipeline Map reader. Parameters ---------- filename : str or `Path the fits data filename band : str (1mm | 2mm | 1 | 2 | 3 ) the corresponding band revert : boolean use if to return -1 * data unit : str unit of the data (assuming mJy/beam) Notes ----- the snr filenames is assumed to be in the same directory ending in '_snr.fits' """ data_file = Path(filename) rgw_file = data_file.parent / (data_file.with_suffix("").name + "rgw.fits") assert data_file.exists() & rgw_file.exists(), "Either {} or {} could not be found".format( data_file.name, rgw_file.name ) with fits.open(data_file) as data_hdu, fits.open(rgw_file) as rgw_hdu: header = data_hdu[0].header data = data_hdu[0].data.astype(float) rgw = rgw_hdu[0].data.astype(float) rgw_header = rgw_hdu[0].header assert WCS(rgw_header).to_header() == WCS(header).to_header(), "{} and {} do not share the same WCS".format( data_file.name, rgw_file.name ) with np.errstate(invalid="ignore", divide="ignore"): e_data = 1 / np.sqrt(rgw) unobserved = np.isnan(data) | np.isnan(e_data) if revert: data *= -1 data = NikaMap( data, mask=unobserved, uncertainty=StdDevUncertainty(e_data), unit=unit, wcs=WCS(header), header=header, primary_header=None, hits=None, ) return data def identify_piic(origin, *args, **kwargs): data_file = Path(args[0]) rgw_file = data_file.parent / (data_file.with_suffix("").name + "rgw.fits") check = data_file.exists() & rgw_file.exists() if check: check &= fits.connect.is_fits("read", data_file.parent, data_file.open(mode="rb")) check &= fits.connect.is_fits("read", rgw_file.parent, rgw_file.open(mode="rb")) return check with registry.delay_doc_updates(NikaMap): registry.register_reader("piic", NikaMap, piic_fits_nikamap_reader) registry.register_identifier("piic", NikaMap, identify_piic) registry.register_reader("idl", NikaMap, idl_fits_nikamap_reader) registry.register_writer("idl", NikaMap, idl_fits_nikamap_writer) registry.register_identifier("idl", NikaMap, fits.connect.is_fits) class NikaFits(MutableMapping): def __init__(self, filename=None, **kwd): self._filename = filename self._kwd = kwd self.__data = {"1mm": None, "2mm": None, "1": None, "2": None, "3": None} with fits.open(filename, **kwd) as hdus: self.primary_header = hdus[0].header def __repr__(self): return "<NIKAFits(filename={})>".format(self._filename) def __getitem__(self, key): value = self.__data[key] if not isinstance(value, NikaMap): value = NikaMap.read(self._filename, band=key, **self._kwd) self.__data[key] = value return value def __delitem__(self, key): raise NotImplementedError def __setitem__(self, key, value): if key not in self.__data: raise KeyError(key) def __iter__(self): return iter(self.__data) def __len__(self): return len(self.__data) @classmethod def read(cls, *args, **kwargs): """NIKA2 IDL Pipeline Map reader Parameters if isinstance(data, ContMap): if self.hits is None and data.hits is not None: self.hits = data.hits if self.beam is None and data.beam is not None: self.beam = data.beam ---------- filename : str the fits filename revert : boolean use if to return -1 * data """ return cls(*args, **kwargs) def write(self, filename, *args, **kwargs): """Write NikaFits object on IDL Pipeline fits file format Parameters ---------- filename : str the fits filename """ hdus = [fits.PrimaryHDU(None, self.primary_header)] for band, item in self.items(): hdus += item.to_hdus( hdu_data="Brightness_{}".format(band), hdu_mask=None, hdu_uncertainty="Stddev_{}".format(band), hdu_hits="Nhits_{}".format(band), ) hdus = fits.HDUList(hdus) hdus.writeto(filename, **kwargs)