Source code for stsynphot.observationmode

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Module to handle observations based on observation modes."""

import re
import warnings

import numpy as np

from astropy import log
from astropy import units as u
from astropy.utils.decorators import lazyproperty
from astropy.utils.exceptions import AstropyUserWarning

from synphot import exceptions as synexceptions
from synphot import units
from synphot.models import Empirical1D
from synphot.spectrum import SourceSpectrum, SpectralElement
from synphot.thermal import ThermalSpectralElement
from synphot.utils import merge_wavelengths

from . import exceptions, stio
from .config import conf
from .spectrum import Vega, interpolate_spectral_element
from .tables import GraphTable, CompTable
from .wavetable import WAVECAT

__all__ = ['reset_cache', 'Component', 'ThermalComponent',
           'BaseObservationMode', 'ObservationMode', 'ThermalObservationMode']

_band_patt = re.compile(r'band\((.*?)\)', re.IGNORECASE)

# Cache previously loaded graph, component, thermal, and detector tables

[docs]def reset_cache(): """Empty the table dictionaries cache.""" global _GRAPHDICT, _COMPDICT, _THERMDICT, _DETECTORDICT _GRAPHDICT.clear() _COMPDICT.clear() _THERMDICT.clear() _DETECTORDICT.clear()
[docs]class Component: """Class to handle individual components in `BaseObservationMode`. Parameters ---------- throughput_name : str Component filename. interpval : float or `None` If not `None`, interpolate to the given value. See :func:`stsynphot.spectrum.interpolate_spectral_element`. Attributes ---------- throughput_name : str Component filename. throughput : `synphot.spectrum.SpectralElement` or `None` Component spectrum object. """ def __init__(self, throughput_name, interpval=None): self.throughput_name = throughput_name # Extract bandpass unless component is a CLEAR filter. if throughput_name != conf.clear_filter: if interpval is None: self.throughput = SpectralElement.from_file(throughput_name) else: self.throughput = interpolate_spectral_element( throughput_name, interpval) else: self.throughput = None @property def empty(self): """`True` if ``self.throughput`` is empty.""" return self.throughput is None def __str__(self): return str(self.throughput)
[docs]class ThermalComponent(Component): """Class to handle thermal components in `BaseObservationMode`. Parameters ---------- throughput_name, thermal_name : str Optical and thermal component filenames. interpval : float or `None` If not `None`, interpolate to the given value (optical only). See :func:`stsynphot.spectrum.interpolate_spectral_element`. Attributes ---------- throughput_name, thermal_name : str Optical and thermal component filenames. throughput : `synphot.spectrum.SpectralElement` or `None` Optical component spectrum object. emissivity : `synphot.thermal.ThermalSpectralElement` or `None` Thermal component spectrum object. """ def __init__(self, throughput_name, thermal_name, interpval=None): super(ThermalComponent, self).__init__( throughput_name, interpval=interpval) self.thermal_name = thermal_name # Extract thermal spectrum unless component is a CLEAR filter. if thermal_name != conf.clear_filter: self.emissivity = ThermalSpectralElement.from_file(thermal_name) else: self.emissivity = None @property def empty(self): """`True` if component is empty.""" return self.throughput is None and self.emissivity is None def __str__(self): return str(self.emissivity)
def _process_graphtable(graphtable): """Load and cache graphtable. Get primary area.""" global _GRAPHDICT # Use default graph table if not given if graphtable is None: gtname = conf.graphtable else: gtname = graphtable # Load and cache graphtable, if not in cache if gtname in _GRAPHDICT: gt = _GRAPHDICT[gtname] else: gt = GraphTable(gtname) _GRAPHDICT[gtname] = gt # Set telescope collecting area if gt.primary_area is None: area = conf.area else: area = gt.primary_area primary_area = units.validate_quantity(area, units.AREA) return gt, gtname, primary_area
[docs]class BaseObservationMode: """Base class to handle an observation that uses the graph and optical component tables, common to both optical and thermal observation modes. .. note:: ``modes`` for parameterized mode is set such that the parameterized value is stripped away; e.g., ``'acs,wfc1,f555w,mjd#53000'`` becomes ``['acs', 'wfc1', 'f555w', 'mjd#']``. Instead, the parameterized value is kept in ``pardict``; e.g., ``'mjd#53000'`` becomes ``{'mjd': 53000.0}``. ``pixscale`` is set from ``stsynphot.config.conf.detectorfile``, which is parsed with :func:`stsynphot.stio.read_detector_pars`. ``binset`` is set by ``stsynphot.wavetable.WAVECAT``. Parameters ---------- obsmode : str Observation mode. graphtable : str or `None` Graph table filename. If `None`, uses ``stsynphot.config.conf.graphtable``. comptable : str or `None` Optical component table filename. If `None`, uses ``stsynphot.config.conf.comptable``. Attributes ---------- modes : list List of individual modes within observation mode. pardict : dict Maps parameterized mode to its value. gtname, ctname : str Graph and component table filenames. compnames, thcompnames : list of str Optical and thermal components. components : list of obj List of component objects. primary_area : `astropy.units.quantity.Quantity` Telescope collecting area. pixscale : `astropy.units.quantity.Quantity` Detector pixel scale. binset : str Wavelength table filename/param string from matching obsmode. bandwave : `astropy.units.quantity.Quantity` Wavelength set defined by ``binset``. """ def __init__(self, obsmode, graphtable=None, comptable=None): global _GRAPHDICT, _COMPDICT # Strip "band()" syntax if present, and force lowercase tmatch = if tmatch: self._obsmode = else: self._obsmode = obsmode.lower() # Split obsmode and separate parameterized modes modes = self._obsmode.replace(' ', '').split(',') self.pardict = {} if '#' in self._obsmode: self.modes = [] for m in modes: if '#' in m: key, val = m.split('#') self.pardict[key] = float(val) self.modes.append(f'{key:s}#') else: self.modes.append(m) else: self.modes = modes # Get graph table and primary area gt, self.gtname, self.primary_area = _process_graphtable(graphtable) # Get optical and thermal components self.compnames, self.thcompnames = gt.get_comp_from_gt(self.modes, 1) # Use default optical component table if not given if comptable is None: self.ctname = conf.comptable else: self.ctname = comptable # Load and cache comptable, if not in cache if self.ctname in _COMPDICT: ct = _COMPDICT[self.ctname] else: ct = CompTable(self.ctname) _COMPDICT[self.ctname] = ct # Set by sub-classes self.components = None # Get optical component filenames self._throughput_filenames = ct.get_filenames(self.compnames) # Set detector pixel scale self._set_pixscale() # For sensitivity calculations self._constant = self.primary_area / units.HC # Get wavelength set try: self.binset, self.bandwave = WAVECAT.load_waveset(self._obsmode) except (KeyError, exceptions.AmbiguousObsmode): self.binset = '' self.bandwave = None def _set_pixscale(self): """Set pixel scale. If multiple matches found, only first match is used. """ global _DETECTORDICT det_file = stio.irafconvert(conf.detectorfile) if det_file in _DETECTORDICT: data = _DETECTORDICT[det_file] else: data = stio.read_detector_pars(det_file) _DETECTORDICT[det_file] = data obsmode = ','.join(self._obsmode.split(',')[:2]) pixscales = data[data['OBSMODE'] == obsmode]['SCALE'].data if pixscales.size < 1: self.pixscale = None else: self.pixscale = pixscales[0] * u.arcsec def _get_components(self): raise NotImplementedError('To be implemented by subclasses.') def __str__(self): return self._obsmode def __len__(self): return len(self.components) @staticmethod def _parkey_from_filename(filename): """Extract parkey from component filename in the format of ``name[parkey#]``. """ if filename.endswith('#]'): x = filename.split('[') parkey = x[1][:-2] else: parkey = None return parkey
[docs] def showfiles(self): # pragma: no cover """Display optical component filenames. .. note:: Similar to IRAF SYNPHOT SHOWFILES. """ info_str = '#Throughput table names:\n' for name in self._throughput_filenames: if name != conf.clear_filter: info_str += f'{name}\n'
[docs]class ObservationMode(BaseObservationMode): """Class to handle an observation that uses the graph and optical component tables. See `BaseObservationMode` for additional attributes. Parameters ---------- obsmode, graphtable, comptable See `BaseObservationMode`. component_dict : dict Maps component filename to corresponding `Component`. """ def __init__(self, obsmode, graphtable=None, comptable=None, component_dict={}): super(ObservationMode, self).__init__( obsmode, graphtable=graphtable, comptable=comptable) self._component_dict = component_dict self.components = self._get_components() def _get_components(self): """Get optical components.""" components = [] for throughput_name in self._throughput_filenames: parkey = self._parkey_from_filename(throughput_name) # Hack for WFPC2 ramp filters if parkey == 'wave' and self.modes[0] == 'wfpc2': parkey = 'lrf' cdict_key = (throughput_name, self.pardict.get(parkey)) if cdict_key not in self._component_dict: self._component_dict[cdict_key] = Component( cdict_key[0], interpval=cdict_key[1]) component = self._component_dict[cdict_key] if not component.empty: components.append(component) return components def _mul_thru(self, index): """Multiply all component spectra starting at given index.""" product = self.components[index].throughput if len(self.components) > index: for component in self.components[index + 1:]: if not component.empty: product = product * component.throughput product.meta['header'] = '' # Clean up messy header return product @lazyproperty def throughput(self): """Combined throughput from multiplying all the components together.""" try: thru = self._mul_thru(0) except IndexError as e: # pragma: no cover thru = None warnings.warn( f'Graph table is broken: {repr(e)}', AstropyUserWarning) return thru @lazyproperty def sensitivity(self): """Sensitivity spectrum to convert flux in :math:`erg \\; cm^{-2} \\; s^{-1} \\; \\AA^{-1}` to :math:`count s^{-1} \\AA^{-1}`. Calculation is done by combining the throughput curves with :math:`\\frac{h \\; c}{\\lambda}` . """ x = self.throughput.waveset y = self.throughput(x) thru = y.value * x.value * self._constant.value meta = {'expr': f'Sensitivity for {self._obsmode}'} return SpectralElement( Empirical1D, points=x, lookup_table=thru, meta=meta)
[docs] def thermal_spectrum(self, thermtable=None): """Calculate thermal spectrum using :func:`ThermalObservationMode.to_spectrum`. Parameters ---------- thermtable : str or `None` Thermal component table filename. If `None`, uses ``stsynphot.config.conf.thermtable``. Returns ------- sp : `synphot.spectrum.SourceSpectrum` Thermal spectrum in PHOTLAM. Raises ------ synphot.exceptions.SynphotError Calculation failed. """ thom = ThermalObservationMode( self._obsmode, graphtable=self.gtname, comptable=self.ctname, thermtable=thermtable) try: sp = thom.to_spectrum() except IndexError as e: # pragma: no cover raise synexceptions.SynphotError(f'Broken graph table: {repr(e)}') return sp
[docs]class ThermalObservationMode(BaseObservationMode): """Class to handle an observation that uses the graph, optical component, and thermal component tables. See `BaseObservationMode` for additional attributes. Parameters ---------- obsmode, graphtable, comptable See `BaseObservationMode`. thermtable : str or `None` Thermal component table filename. If `None`, uses ``stsynphot.config.conf.thermtable``. Attributes ---------- thname : str Thermal component table filename. Raises ------ NotImplementedError Clear filters only not supported. """ def __init__(self, obsmode, graphtable=None, comptable=None, thermtable=None): global _GRAPHDICT, _COMPDICT, _THERMDICT super(ThermalObservationMode, self).__init__( obsmode, graphtable=graphtable, comptable=comptable) # Check here to see if there are any valid filters. # "0.0" was added in tae17277m_tmt.fits (Apr 2017). if set(self.thcompnames).issubset(set([conf.clear_filter, '', '0.0'])): raise NotImplementedError( f'No thermal support provided for {self._obsmode}') # Use default thermal component table if not given if thermtable is None: self.thname = conf.thermtable else: self.thname = thermtable # Load and cache thermtable, if not in cache if self.thname in _THERMDICT: thct = _THERMDICT[self.thname] else: thct = CompTable(self.thname) _THERMDICT[self.thname] = thct # Get thermal component filenames self._thermal_filenames = thct.get_filenames(self.thcompnames) self.components = self._get_components() def _get_components(self): """Get thermal components.""" components = [] for throughput_name, thermal_name in zip( self._throughput_filenames, self._thermal_filenames): parkey = self._parkey_from_filename(throughput_name) component = ThermalComponent(throughput_name, thermal_name, interpval=self.pardict.get(parkey)) if not component.empty: components.append(component) return components def __str__(self): return f'{self._obsmode} (thermal)' def _merge_em_wave(self): """Merge emissivity wavelength sets in Angstrom.""" index = 1 # Find first wavelength set for component in self.components: emissivity = component.emissivity if emissivity is None: index += 1 else: result = emissivity.waveset.value break # Merge subsequent wavelength sets for component in self.components[index:]: emissivity = component.emissivity if emissivity is not None: result = merge_wavelengths(result, emissivity.waveset.value) return result def _get_wave_intersection(self): """Find wavelengths in Angstrom where ``stsynphot.config.conf.waveset_array`` intersects with ``stsynphot.spectrum.Vega``. .. note:: No one knows why Vega is the chosen one. """ def_wave = conf.waveset_array # Angstrom minw = min(def_wave) maxw = max(def_wave) # Refine min and max wavelengths using thermal components for component in self.components[1:]: emissivity = component.emissivity if emissivity is not None: w = emissivity.waveset.value # Angstrom minw = max(minw, w.min()) maxw = min(maxw, w.max()) w = self._merge_em_wave() result = w[(w > minw) & (w < maxw)] # Intersect with Vega if Vega is None: raise synexceptions.SynphotError('Missing Vega spectrum.') w = Vega.waveset.value # Angstrom return result[(result > w.min()) & (result < w.max())]
[docs] def to_spectrum(self): """Get thermal spectrum. The calculations start with zero-flux spectrum. Then for each component: #. Multiply with optical throughput. #. Add in thermal source spectrum from :meth:`synphot.thermal.ThermalSpectralElement.thermal_source`. Returns ------- sp : `synphot.spectrum.SourceSpectrum` Thermal spectrum in PHOTLAM. """ # Create zero-flux spectrum x = self._get_wave_intersection() # Angstrom y = np.zeros_like(x, dtype=np.float64) # PHOTLAM minw, maxw = x[([0, -1], )] sp = SourceSpectrum(Empirical1D, points=x, lookup_table=y) for component in self.components: # Transmissive section (optical passband) if component.throughput is not None: sp = sp * component.throughput # Thermal section if component.emissivity is not None: sp = sp + component.emissivity.thermal_source() # Trim spectrum w = sp.waveset.value mask = (w >= minw) & (w <= maxw) x = w[mask] y = sp(x) sp = SourceSpectrum(Empirical1D, points=x, lookup_table=y) meta = {'expr': f'{self._obsmode} ThermalSpectrum'} sp.meta.update(meta) return sp