Source code for stsynphot.spectrum

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""This module contains spectrum classes specific to STScI formats."""

# STDLIB
import os
import re
import warnings

# THIRD-PARTY
import numpy as np

# ASTROPY
from astropy import log
from astropy import units as u
from astropy.utils.exceptions import AstropyUserWarning

# SYNPHOT
from synphot import binning, reddening, units
from synphot import exceptions as synexceptions
from synphot.config import conf as synconf
from synphot.models import Empirical1D
from synphot.spectrum import SourceSpectrum, SpectralElement

# LOCAL
from stsynphot import stio
from stsynphot.exceptions import PixscaleNotFoundError

__all__ = ['reset_cache', 'interpolate_spectral_element',
           'ObservationSpectralElement', 'band', 'ebmvx', 'load_vega', 'Vega']

_interpfilepatt = re.compile(r'\[(?P<col>.*?)\]')
_REDLAWS = {}  # Cache previously loaded reddening laws
Vega = None  # Cache Vega spectrum


[docs] def reset_cache(): """Empty the reddening laws cache.""" global _REDLAWS _REDLAWS.clear()
def _interp_spec(interpval, waves, lower_val, upper_val, lower_thru, upper_thru, doshift): """Interpolate between two spectra to parameter at given value. Called by :func:`interpolate_spectral_element`. Parameters ---------- interpval : float Desired parameter value. waves : array-like Wavelengths for all the parameterized spectra. lower_val, upper_val : float Low and high values of parameter to interpolate. lower_thru, upper_thru : array-like Throughput values are ``lower_val`` and ``upper_val``. doshift : bool Perform wavelength shift before interpolation. Returns ------- thru : array-like Interpolated throughput at ``interpval``. """ if doshift: # Adjust the wavelength table to bracket the range lwave = waves + (lower_val - interpval) uwave = waves + (upper_val - interpval) # Interpolate the columns at those ranges lower_thru = np.interp(lwave, waves, lower_thru) upper_thru = np.interp(uwave, waves, upper_thru) # Then interpolate between the two columns w = (interpval - lower_val) / (upper_val - lower_val) return (upper_thru * w) + lower_thru * (1.0 - w) def _extrap_spec(interpval, lower_val, upper_val, lower_thru, upper_thru): """Extrapolate using two spectra to parameter at given value. Called by :func:`interpolate_spectral_element`. Also see :func:`_interp_spec`. """ m = (upper_thru - lower_thru) / (upper_val - lower_val) b = lower_thru - m * lower_val return m * interpval + b
[docs] def interpolate_spectral_element(parfilename, interpval, ext=1): """Interpolate (or extrapolate) throughput spectra in given parameterized FITS table to given parameter value. FITS table is parsed with :func:`stsynphot.stio.read_interp_spec`. Parameterized values must be in ascending order in the table columns. If extrapolation is needed but not allowed, default throughput from ``THROUGHPUT`` column will be used. Parameters ---------- parfilename : str Parameterized filename contains a suffix followed by a column name specificationin between square brackets. For example, ``path/acs_fr656n_006_syn.fits[fr656n#]``. interpval : float Desired parameter value. ext : int, optional FITS extension index of the data table. Returns ------- sp : `synphot.spectrum.SpectralElement` Empirical bandpass at ``interpval``. Raises ------ synphot.exceptions.ExtrapolationNotAllowed Extrapolation is not allowed by data table. synphot.exceptions.SynphotError No columns available for interpolation or extrapolation. """ def_colname = 'THROUGHPUT' warndict = {} # Separate real filename and column name specification xre = _interpfilepatt.search(parfilename) if xre is None: raise synexceptions.SynphotError( f'{parfilename} must be in the format of "path/filename.fits' '[col#]"') filename = parfilename[0:xre.start()] col_prefix = xre.group('col').upper() # Read data table try: data, wave_unit, doshift, extrapolate = stio.read_interp_spec( filename, tab_ext=ext) except Exception as e: # pragma: no cover raise IOError(f'Failed to read {filename}[{ext}]: {repr(e)}') wave_unit = units.validate_unit(wave_unit) wave0 = data['WAVELENGTH'] # Determine the columns that bracket the desired value. # Grab all columns that begin with the parameter name (e.g. 'MJD#') # and then split off the numbers after the '#'. col_names = [] col_pars = [] for n in data.names: cn = n.upper() if cn.startswith(col_prefix): col_names.append(cn) col_pars.append(float(cn.split('#')[1])) if len(col_names) < 1: raise synexceptions.SynphotError( f'{filename} contains no interpolated columns for {col_prefix}.') # Assumes ascending order of parameter values in table. min_par = col_pars[0] max_par = col_pars[-1] # Exact match. No interpolation needed. if interpval in col_pars: thru = data[col_names[col_pars.index(interpval)]] # Need interpolation. elif (interpval > min_par) and (interpval < max_par): upper_ind = np.searchsorted(col_pars, interpval) lower_ind = upper_ind - 1 thru = _interp_spec( interpval, wave0, col_pars[lower_ind], col_pars[upper_ind], data[col_names[lower_ind]], data[col_names[upper_ind]], doshift) # Need extrapolation, if allowed. elif extrapolate: # Extrapolate below lowest columns. if interpval < min_par: thru = _extrap_spec(interpval, min_par, col_pars[1], data[col_names[0]], data[col_names[1]]) # Extrapolate above highest columns. else: # interpval > max_par thru = _extrap_spec(interpval, col_pars[-2], max_par, data[col_names[-2]], data[col_names[-1]]) # Extrapolation not allowed. else: # Use default, if available. if def_colname in data.names: warnings.warn( 'Extrapolation not allowed, using default throughput for ' f'{parfilename}.', AstropyUserWarning) warndict['DefaultThroughput'] = True thru = data[def_colname] # Nothing can be done. else: raise synexceptions.ExtrapolationNotAllowed( f'No default throughput for {parfilename}.') meta = {'expr': f'{filename}#{interpval:g}', 'warnings': warndict} return SpectralElement( Empirical1D, points=wave0 * wave_unit, lookup_table=thru, meta=meta)
[docs] class ObservationSpectralElement(SpectralElement): """Class to handle bandpass from observation mode. This class has additional methods that are specific to observation mode and instrument-specific wavelength set. .. note:: For methods that take ``area``, it is *recommended* but not required to use `area`. Parameters ---------- modelclass, kwargs See `~synphot.spectrum.BaseSpectrum`. obsmode : `~stsynphot.observationmode.ObservationMode` Observation mode for this bandpass. """ def __init__(self, modelclass, obsmode=None, **kwargs): if obsmode is None: raise synexceptions.SynphotError('Missing OBSMODE.') super(ObservationSpectralElement, self).__init__(modelclass, **kwargs) self._obsmode = obsmode self.meta['expr'] = str(obsmode) # Check for zero bounds, if applicable try: self.bounded_by_zero() except synexceptions.SynphotError: # pragma: no cover warnings.warn( 'Zero-bound check not done due to undefined waveset.', AstropyUserWarning) @property def obsmode(self): """Observation mode for this bandpass.""" return self._obsmode @property def binset(self): """Instrument-specific wavelength set from ``stsynphot.wavetable.WAVECAT`` based on `obsmode`.""" return self.obsmode.bandwave @property def area(self): """Telescope collecting area based on `obsmode`.""" return self.obsmode.primary_area def __len__(self): """Get the number of components in `obsmode`.""" return len(self.obsmode)
[docs] def taper(self, **kwargs): """Disabled.""" raise NotImplementedError('Method disabled.')
[docs] def bounded_by_zero(self, wavelengths=None, verbose=True): """Check if sampled throughout is bounded by zeroes. Parameters ---------- wavelengths : array-like, `~astropy.units.quantity.Quantity`, or `None` Wavelength values for sampling. If not a Quantity, assumed to be in Angstrom. If `None`, ``self.waveset`` is used. verbose : bool Print warning if unbounded. Returns ------- bounded : bool `True` if bounded by zeroes, `False` otherwise. """ thru = self(wavelengths) y = thru[::thru.size - 1].value bounded = np.all(y == 0) if not bounded and verbose: warnings.warn( f'Unbounded throughput; {y} when expecting zeroes.', AstropyUserWarning) return bounded
[docs] def showfiles(self): # pragma: no cover """Display ``self.obsmode`` optical component filenames. .. note:: Similar to IRAF SYNPHOT SHOWFILES. """ self.obsmode.showfiles()
[docs] def thermback(self, area=None, thermtable=None): """Calculate thermal background count rate for ``self.obsmode``. Calculation uses :func:`~stsynphot.observationmode.ObservationMode.thermal_spectrum` to extract thermal component source spectrum in PHOTLAM per square arcsec. Then this spectrum is integrated and multiplied by detector pixel scale and telescope collecting area to produce a count rate in count/s/pix. This unit is non-standard but used widely by STScI Exposure Time Calculator. .. note:: Similar to IRAF SYNPHOT THERMBACK. Parameters ---------- area : float, `~astropy.units.quantity.Quantity`, or `None` Area that flux covers. If not a Quantity, assumed to be in :math:`cm^{2}`. If `None`, use `area`. thermtable : str or `None` Thermal component table filename. If `None`, uses ``stsynphot.config.conf.thermtable``. Returns ------- bg : `~astropy.units.quantity.Quantity` Thermal background count rate. Raises ------ stsynphot.exceptions.PixscaleNotFoundError Undefined pixel scale for the given observation mode. """ if self.obsmode.pixscale is None: raise PixscaleNotFoundError( f'Undefined pixel scale for {self.obsmode}.') if area is None: area = self.area area = units.validate_quantity(area, units.AREA) sp = self.obsmode.thermal_spectrum(thermtable=thermtable) bg = sp.integrate() * self.obsmode.pixscale ** 2 * area return bg.value * (u.count / u.s / u.pix)
[docs] def binned_waverange(self, cenwave, npix, **kwargs): """Calculate the wavelength range covered by the given number of pixels centered on the given central wavelengths of `binset`. Parameters ---------- cenwave : float or `~astropy.units.quantity.Quantity` Desired central wavelength. If not a Quantity, assumed to be in Angstrom. npix : int Desired number of pixels, centered on ``cenwave``. kwargs : dict Keywords accepted by :func:`synphot.binning.wave_range`. Returns ------- waverange : `~astropy.units.quantity.Quantity` Lower and upper limits of the wavelength range, in the unit of ``cenwave``. Raises ------ synphot.exceptions.UndefinedBinset Undefined `binset`. """ if self.binset is None: raise synexceptions.UndefinedBinset( 'No binset specified for this passband.') # Calculation is done in the unit of cenwave. if not isinstance(cenwave, u.Quantity): cenwave = cenwave * self._internal_wave_unit bin_wave = units.validate_quantity( self.binset, cenwave.unit, equivalencies=u.spectral()) return binning.wave_range( bin_wave.value, cenwave.value, npix, **kwargs) * cenwave.unit
[docs] def binned_pixelrange(self, waverange, **kwargs): """Calculate the number of pixels within the given wavelength range and `binset`. Parameters ---------- waverange : tuple of float or `~astropy.units.quantity.Quantity` Lower and upper limits of the desired wavelength range. If not a Quantity, assumed to be in Angstrom. kwargs : dict Keywords accepted by :func:`synphot.binning.pixel_range`. Returns ------- npix : number Number of pixels. Raises ------ synphot.exceptions.UndefinedBinset Undefined `binset`. """ if self.binset is None: raise synexceptions.UndefinedBinset( 'No binset specified for this passband.') x = units.validate_quantity( waverange, self._internal_wave_unit, equivalencies=u.spectral()) return binning.pixel_range(self.binset.value, x.value, **kwargs)
[docs] def to_fits(self, filename, wavelengths=None, **kwargs): """Write the spectrum to a FITS file. Throughput column is automatically named 'THROUGHPUT'. Graph and optical component tables are written to table header (not primary) under these keywords: * ``GRFTABLE`` * ``CMPTABLE`` Parameters ---------- filename : str Output filename. wavelengths : array-like, `~astropy.units.quantity.Quantity`, or `None` Wavelength values for sampling. If not a Quantity, assumed to be in Angstrom. If `None`, ``self.waveset`` is used. kwargs : dict Keywords accepted by :func:`synphot.specio.write_fits_spec`. """ bkeys = { 'grftable': (os.path.basename(self.obsmode.gtname), 'graph table used'), 'cmptable': (os.path.basename(self.obsmode.ctname), 'component table used')} if 'ext_header' in kwargs: kwargs['ext_header'].update(bkeys) else: kwargs['ext_header'] = bkeys super(ObservationSpectralElement, self).to_fits( filename, wavelengths=wavelengths, **kwargs)
[docs] @classmethod def from_file(cls, filename, **kwargs): """Disabled.""" raise NotImplementedError('Class method disabled.')
[docs] @classmethod def from_filter(cls, filtername, **kwargs): """Disabled.""" raise NotImplementedError('Class method disabled.')
[docs] @classmethod def from_obsmode(cls, obsmode, graphtable=None, comptable=None, component_dict={}): """Create a bandpass from observation mode string. Parameters ---------- obsmode : str Observation mode. graphtable : str or `None` Graph table filename. If `None`, uses ``stsynphot.config.conf.graphtable``. comptable : str or `None` Optical component table filename. If `None`, uses ``stsynphot.config.conf.comptable``. component_dict : dict Maps component filename to corresponding `~stsynphot.observationmode.Component`. Returns ------- bp : `ObservationSpectralElement` Empirical bandpass. Raises ------ synphot.exceptions.SynphotError Observation mode yields no throughput. """ from stsynphot.observationmode import ObservationMode # Avoid circular import # noqa ob = ObservationMode( obsmode, graphtable=graphtable, comptable=comptable, component_dict=component_dict) if not isinstance(ob.throughput, SpectralElement): # pragma: no cover raise synexceptions.SynphotError(f'{obsmode} has no throughput.') return cls(ob.throughput, obsmode=ob)
[docs] def band(*args, **kwargs): """Convenience function to create a bandpass with an observation mode string. See :func:`ObservationSpectralElement.from_obsmode` and :ref:`stsynphot-obsmode`. """ return ObservationSpectralElement.from_obsmode(*args, **kwargs)
[docs] def ebmvx(redlaw_name, ebv): """Convenience function to create extinction curve for given reddening law and :math:`E(B-V)`. Parameters ---------- redlaw_name : str Reddening law model name (see :meth:`synphot.reddening.ReddeningLaw.from_extinction_model`). Choose from 'lmc30dor', 'lmcavg', 'mwavg', 'mwdense', 'mwrv21', 'mwrv40', 'smcbar', 'xgalsb', 'gal3', or `None`. ``gal3`` and `None` are same as ``mwavg`` and used for testing only. ebv : float or `~astropy.units.quantity.Quantity` :math:`E(B-V)` value in magnitude. See :meth:`synphot.reddening.ReddeningLaw.extinction_curve`. Returns ------- extcurve : `synphot.reddening.ExtinctionCurve` Extinction curve. """ global _REDLAWS if redlaw_name in ('gal3', None): m = 'mwavg' log.info(f'{redlaw_name} uses {m} reddening law.') else: m = redlaw_name if m not in _REDLAWS: _REDLAWS[m] = reddening.ReddeningLaw.from_extinction_model( m, encoding='binary') return _REDLAWS[m].extinction_curve(ebv)
[docs] def load_vega(vegafile=None, **kwargs): """Convenience function to load Vega spectrum that is used throughout ``stsynphot``. Parameters ---------- vegafile : str or `None`, optional Vega spectrum filename. If `None`, use ``synphot.config.conf.vega_file``. kwargs : dict Keywords acceptable by :func:`synphot.specio.read_remote_spec`. Returns ------- sp : `synphot.spectrum.SourceSpectrum` or `None` Vega spectrum. `None` if failed. """ global Vega if vegafile is None: vegafile = synconf.vega_file with synconf.set_temp('vega_file', vegafile): try: Vega = SourceSpectrum.from_vega(**kwargs) except Exception as e: Vega = None warnings.warn( f'Failed to load Vega spectrum from {vegafile}; Functionality ' f'involving Vega will be severely limited: {repr(e)}', AstropyUserWarning)
# Load default Vega load_vega(encoding='binary')