import copy
import os
import shutil
import subprocess
import tempfile
import warnings
import astropy.units as u
import h5py
import numpy as np
from astropy.modeling.functional_models import Gaussian1D
from astropy.table import QTable
from pathlib import Path, PurePath
from soxs.constants import ckms, erg_per_keV, hc, sigma_to_fwhm, sqrt2pi
from soxs.spectra.foreground_absorption import tbabs_cross_section, wabs_cross_section
from soxs.utils import (
issue_deprecation_warning,
line_width_equiv,
mylog,
parse_prng,
parse_value,
regrid_spectrum,
)
class Energies(u.Quantity):
def __new__(cls, energy, flux):
ret = u.Quantity.__new__(cls, energy, unit="keV")
ret.flux = u.Quantity(flux, "erg/(cm**2*s)")
return ret
def _generate_energies(spec, t_exp, rate, prng, binscale, quiet=False):
cumspec = spec.cumspec
n_ph = prng.poisson(t_exp * rate)
if not quiet:
mylog.info("Creating %d energies from this spectrum.", n_ph)
randvec = prng.uniform(size=n_ph)
randvec.sort()
if binscale in ["linear", "custom"]:
bin_edges = spec.ebins.value
elif binscale == "log":
bin_edges = np.log10(spec.ebins.value)
e = np.interp(randvec, cumspec, bin_edges)
if binscale == "log":
e = 10**e
if not quiet:
mylog.info("Finished creating energies.")
return e
def distance_handler(redshift, dist, cosmology):
from astropy.cosmology import Planck18
if cosmology is None:
cosmology = Planck18
if redshift == 0.0 and dist is None:
raise ValueError("Either 'redshift' must be > 0 or 'dist' cannot be None!")
if dist is None:
D_A = cosmology.angular_diameter_distance(redshift).to_value("cm")
else:
D_A = u.Quantity(parse_value(dist, "kpc"), "kpc").to_value("cm")
return D_A
class BaseSpectrum:
_units = "" # set by subclasses
def __init__(self, ebins, spec, binscale="linear"):
self.ebins = u.Quantity(ebins, "keV")
self.emid = 0.5 * (self.ebins[1:] + self.ebins[:-1])
self._spec = u.Quantity(spec, self._units)
self.nbins = len(self.emid)
self.de = np.diff(self.ebins)
self.binscale = binscale
self._compute_totals()
self.noisy = False
self._spec_err = None
self.exp_time = None
self.arf = None
self.rmf = None
def __copy__(self):
# Create a new instance and copy attributes
return type(self)(self.ebins, self._spec.value, binscale=self.binscale)
def __deepcopy__(self, memo):
# Create a new instance and deep copy attributes
return type(self)(
copy.deepcopy(self.ebins, memo),
copy.deepcopy(self._spec, memo),
binscale=self.binscale,
)
def _compute_totals(self):
self._energy_spec = self._spec * self.emid.to("erg") / (1.0 * u.photon)
self._total_spec = (self._spec * self.de).sum()
self._total_energy_spec = (self._energy_spec * self.de).sum()
cumspec = np.cumsum((self._spec * self.de).value)
cumspec = np.insert(cumspec, 0, 0.0)
cumspec /= cumspec[-1]
self.cumspec = cumspec
self.func = lambda e: np.interp(e, self.emid.value, self._spec.value)
self._binned_spec = self._spec * self.de
self._binned_energy_spec = self._energy_spec * self.de
def _check_binning_units(self, other):
if self.nbins != other.nbins or not np.isclose(self.ebins.value, other.ebins.value).all():
raise RuntimeError("Energy binning for these two spectra is not the same!!")
if self._units != other._units:
raise RuntimeError("The units for these two spectra are not the same!")
def _compute_waves(self):
self._wvbins = self.ebins.to("angstrom", equivalencies=u.spectral())
self._wvmid = 0.5 * (self._wvbins[1:] + self._wvbins[:-1])
self._dwv = -np.diff(self._wvbins)
def _compute_freqs(self):
self._fbins = self.ebins.to("Hz", equivalencies=u.spectral())
self._fmid = 0.5 * (self._fbins[1:] + self._fbins[:-1])
self._df = np.diff(self._fbins)
_wvbins = None
@property
def wvbins(self):
if self._wvbins is None:
self._compute_waves()
return self._wvbins
_wvmid = None
@property
def wvmid(self):
if self._wvmid is None:
self._compute_waves()
return self._wvmid
_dwv = None
@property
def dwv(self):
if self._dwv is None:
self._compute_waves()
return self._dwv
_fbins = None
@property
def fbins(self):
if self._fbins is None:
self._compute_freqs()
return self._fbins
_fmid = None
@property
def fmid(self):
if self._fmid is None:
self._compute_freqs()
return self._fmid
_df = None
@property
def df(self):
if self._df is None:
self._compute_freqs()
return self._df
def __add__(self, other):
self._check_binning_units(other)
return type(self)(self.ebins, self._spec + other._spec, binscale=self.binscale)
def __sub__(self, other):
self._check_binning_units(other)
return type(self)(self.ebins, self._spec - other._spec, binscale=self.binscale)
def __mul__(self, other):
if hasattr(other, "eff_area"):
return ConvolvedSpectrum.convolve(self, other)
else:
return type(self)(self.ebins, other * self._spec, binscale=self.binscale)
__rmul__ = __mul__
def __truediv__(self, other):
return type(self)(self.ebins, self._spec / other)
__div__ = __truediv__
def __repr__(self):
s = f"{type(self).__name__} ({self.ebins[0]} - {self.ebins[-1]})\n"
s += f" Total Values:\n {self._total_spec}\n {self._total_energy_spec}\n"
return s
def __call__(self, e):
if hasattr(e, "to_astropy"):
e = e.to_astropy()
if isinstance(e, u.Quantity):
e = e.to("keV").value
return u.Quantity(self.func(e), self._units)
def regrid_spectrum(self, emin, emax, nbins, binscale="linear", vlos=0.0, vtot=None):
"""
Regrid an existing spectrum to a new energy binning and
return a new spectrum.
Parameters
----------
emin : float, (value, unit) tuple, or Quantity
The minimum energy in the band, in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy in the band, in keV.
nbins : integer
The number of bins in the spectrum.
binscale : str, optional
The scale of the energy binning: "linear" or "log".
Default: "linear"
vlos : float, (value, unit) tuple, or Quantity, optional
The line-of-sight velocity of the source, in km/s.
Used for Doppler-shifting the spectrum. Default: 0.0
vtot : float, (value, unit) tuple, or Quantity, optional
The total velocity magnitude of the source, in km/s.
If not set, it is assumed to be equal to the absolute
value of the vlos argument. This value is mainly relevant
if the velocity is relativistic and is not completely
along the sight line. Default: None
"""
emin = parse_value(emin, "keV")
emax = parse_value(emax, "keV")
vlos = parse_value(vlos, "km/s") / ckms
if vtot is None:
vtot = np.abs(vlos)
else:
vtot = parse_value(vtot, "km/s") / ckms
shift = np.sqrt(1.0 - vtot**2) / (1.0 + vlos)
if binscale == "linear":
ebins = np.linspace(emin, emax, nbins + 1)
elif binscale == "log":
ebins = np.logspace(np.log10(emin), np.log10(emax), nbins + 1)
de = np.diff(ebins)
spec_new = (
shift
* shift
* shift
* regrid_spectrum(ebins / shift, self.ebins.value, self._spec.value * self.de.value)
/ de
)
return type(self)(ebins, spec_new, binscale=binscale)
def restrict_within_band(self, emin=None, emax=None):
"""
Zeros out the spectrum outside a specified energy band.
Parameters
----------
emin : float, (value, unit) tuple, or Quantity, optional
The minimum energy in the band, in keV. Default is to use the lowest energy.
emax : float, (value, unit) tuple, or Quantity, optional
The maximum energy in the band, in keV. Default is to use the highest energy.
"""
if emin is not None:
emin = parse_value(emin, "keV")
self._spec[self.emid.value < emin] = 0.0
if emax is not None:
emax = parse_value(emax, "keV")
self._spec[self.emid.value > emax] = 0.0
self._compute_totals()
def _get_total_in_band(self, emin, emax):
emin = parse_value(emin, "keV")
emax = parse_value(emax, "keV")
range = np.logical_and(self.emid.value >= emin, self.emid.value <= emax)
pflux = self._binned_spec[range].sum()
eflux = self._binned_energy_spec[range].sum()
return pflux, eflux
@classmethod
def _from_ext_model(cls, ebins, flux):
ebins = np.array(ebins)
de = np.diff(ebins)
dloge = np.diff(np.log10(ebins))
flux = np.array(flux) / de
if np.isclose(de, de[0]).all():
binscale = "linear"
elif np.isclose(dloge, dloge[0]).all():
binscale = "log"
else:
binscale = "custom"
return cls(ebins, flux, binscale=binscale)
@classmethod
def from_constant(cls, const_value, emin, emax, nbins, binscale="linear"):
"""
Create a spectrum from a constant model.
Parameters
----------
const_value : float
The value of the constant in the units of the spectrum.
emin : float, (value, unit) tuple, or Quantity
The minimum energy of the spectrum in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy of the spectrum in keV.
nbins : integer
The number of bins in the spectrum.
binscale : str, optional
The scale of the energy binning: "linear" or "log".
Default: "linear"
"""
emin = parse_value(emin, "keV")
emax = parse_value(emax, "keV")
if binscale == "linear":
ebins = np.linspace(emin, emax, nbins + 1)
elif binscale == "log":
ebins = np.logspace(np.log10(emin), np.log10(emax), nbins + 1)
flux = const_value * np.ones(nbins)
return cls(ebins, flux, binscale=binscale)
@classmethod
def from_powerlaw(cls, photon_index, redshift, norm, emin, emax, nbins, binscale="linear"):
"""
Create a spectrum from a power-law model. The form of the model
is F(E) = norm*(e*(1+z)/(1 keV))**-photon_index.
Parameters
----------
photon_index : float
The photon index of the source.
redshift : float
The redshift of the source.
norm : float
The normalization of the source in units of
the spectrum, photons/s/cm**2/keV, at 1 keV
in the source frame.
emin : float, (value, unit) tuple, or Quantity
The minimum energy of the spectrum in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy of the spectrum in keV.
nbins : integer
The number of bins in the spectrum.
binscale : str, optional
The scale of the energy binning: "linear" or "log".
Default: "linear"
"""
emin = parse_value(emin, "keV")
emax = parse_value(emax, "keV")
if binscale == "linear":
ebins = np.linspace(emin, emax, nbins + 1)
elif binscale == "log":
ebins = np.logspace(np.log10(emin), np.log10(emax), nbins + 1)
emid = 0.5 * (ebins[1:] + ebins[:-1])
flux = norm * (emid * (1.0 + redshift)) ** (-photon_index)
return cls(ebins, flux, binscale=binscale)
@classmethod
def from_file(cls, filename):
"""
Read a spectrum from an ASCII ECSV, FITS, or HDF5 file, in
the forms written by ``write_ascii_file``, ``write_fits_file``,
and ``write_h5_file``.
Parameters
----------
filename : str
The path to the file containing the spectrum.
"""
kwargs = {}
try:
# First try reading the file as HDF5
with h5py.File(filename, "r") as f:
flux = f["spectrum"][()]
nbins = flux.size
binscale = f.attrs.get("binscale", None)
if binscale is None:
warnings.warn(
"This spectrum file was written without a 'binscale' attribute! "
"A linear energy binning will be assumed.",
stacklevel=2,
)
binscale = "linear"
if binscale == "linear":
ebins = np.linspace(f["emin"][()], f["emax"][()], nbins + 1)
elif binscale == "log":
ebins = np.logspace(np.log10(f["emin"][()]), np.log10(f["emax"][()]), nbins + 1)
for key in ["arf", "rmf", "noisy", "exp_time"]:
kwargs[key] = f.attrs.get(key, None)
except OSError:
# If that fails, try reading the file as ASCII or FITS
# using AstroPy QTable
try:
t = QTable.read(filename, format="fits")
except OSError:
t = QTable.read(filename, format="ascii.ecsv")
ebins = np.append(t["elo"].value, t["ehi"].value[-1])
flux = t["flux"].value
binscale = t.meta.get("binscale", t.meta.get("BINSCALE", None))
if binscale is None:
warnings.warn(
"This spectrum file was written without a 'binscale' attribute! "
"A linear energy binning will be assumed.",
stacklevel=2,
)
binscale = "linear"
units = t.meta.get("units", t.meta.get("UNITS", None))
if units is None or units != cls._units:
raise ValueError(
f"Spectrum units in file ({t.meta['UNITS']}) do not match "
f"the expected units ({cls._units})!"
) from None
for key in ["arf", "rmf", "noisy", "exp_time"]:
kwargs[key] = t.meta.get(key, t.meta.get(key.upper(), None))
if kwargs["arf"] is not None:
arf = kwargs.pop("arf")
return cls(
ebins,
flux,
arf,
binscale=binscale,
**kwargs,
)
else:
return cls(ebins, flux, binscale=binscale)
def _new_spec_from_band(self, emin, emax):
emin = parse_value(emin, "keV")
emax = parse_value(emax, "keV")
band = np.logical_and(self.ebins.value >= emin, self.ebins.value <= emax)
idxs = np.where(band)[0]
ebins = self.ebins.value[idxs]
flux = self._spec.value[idxs[:-1]]
return ebins, flux
def new_spec_from_band(self, emin, emax):
"""
Create a new Spectrum object
from a subset of an existing one defined by a particular
energy band.
Parameters
----------
emin : float, (value, unit) tuple, or Quantity
The minimum energy of the band in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy of the band in keV.
"""
ebins, flux = self._new_spec_from_band(emin, emax)
return type(self)(ebins, flux, binscale=self.binscale)
def _write_fits_or_ascii(self):
meta = {"binscale": self.binscale, "noisy": self.noisy, "units": self._units}
if self.arf is not None:
meta["arf"] = self.arf.filename
if self.rmf is not None:
meta["rmf"] = self.rmf.filename
if self.exp_time is not None:
meta["exp_time"] = self.exp_time.value
t = QTable(
[self.ebins[:-1], self.ebins[1:], self._spec],
names=["elo", "ehi", "flux"],
meta=meta,
)
return t
def write_ascii_file(self, specfile, overwrite=False):
"""
Write the spectrum to an ASCII file in the ECSV format
(https://docs.astropy.org/en/stable/io/ascii/ecsv.html).
Parameters
----------
specfile : str
The filename to write the file to.
overwrite : bool, optional
Whether to overwrite an existing
file with the same name. Default: False
"""
t = self._write_fits_or_ascii()
t.write(specfile, overwrite=overwrite, format="ascii.ecsv")
def write_file(self, specfile, overwrite=False):
"""
Write a Spectrum object to disk, in
any of three formats, which will be determined by
the chosen suffix:
ASCII ECSV: .dat, .txt. or .ecsv
HDF5: .hdf5 or .h5
FITS: .fits
Parameters
----------
specfile : str
The name of the file to write to.
overwrite : bool, optional
Whether to overwrite an existing
file with the same name. Default: False
"""
suffix = PurePath(specfile).suffix.lower()
if suffix in [".dat", ".txt", ".ecsv"]:
mylog.info("Writing ASCII ECSV file %s.", specfile)
self.write_ascii_file(specfile, overwrite=overwrite)
elif suffix in [".hdf5", ".h5"]:
mylog.info("Writing HDF5 file %s.", specfile)
self.write_hdf5_file(specfile, overwrite=overwrite)
elif suffix == ".fits":
mylog.info("Writing FITS file %s.", specfile)
self.write_fits_file(specfile, overwrite=overwrite)
else:
raise NotImplementedError(f"Unknown file suffix {suffix}!")
def write_hdf5_file(self, specfile, overwrite=False):
"""
Write the spectrum to an HDF5 file.
Parameters
----------
specfile : str
The filename to write the file to.
overwrite : bool, optional
Whether to overwrite an existing
file with the same name. Default: False
"""
if Path(specfile).exists() and not overwrite:
raise OSError(f"File {specfile} exists and overwrite=False!")
with h5py.File(specfile, "w") as f:
f.create_dataset("emin", data=self.ebins[0].value)
f.create_dataset("emax", data=self.ebins[-1].value)
f.create_dataset("spectrum", data=self._spec.value)
f.attrs["binscale"] = self.binscale
f.attrs["noisy"] = self.noisy
f.attrs["units"] = self._units
if self.arf is not None:
f.attrs["arf"] = self.arf.filename
if self.rmf is not None:
f.attrs["rmf"] = self.rmf.filename
if self.exp_time is not None:
f.attrs["exp_time"] = self.exp_time.value
def write_fits_file(self, specfile, overwrite=False):
"""
Write the spectrum to a FITS file.
Parameters
----------
specfile : str
The filename to write the file to.
overwrite : bool, optional
Whether to overwrite an existing
file with the same name. Default: False
"""
t = self._write_fits_or_ascii()
t.write(specfile, overwrite=overwrite, format="fits")
def add_emission_line(self, line_center, line_width, line_amp, line_type="gaussian"):
"""
Add an emission line to this spectrum.
Parameters
----------
line_center : float, (value, unit) tuple, or Quantity
The line center position in units of keV, in the observer frame.
line_width : one or more float, (value, unit) tuple, or Quantity
The line width (FWHM) in units of keV, in the observer frame. Can also
input the line width in units of velocity in the rest frame. For the Voigt
profile, a list, tuple, or array of two values should be provided since there
are two line widths, the Lorentzian and the Gaussian (in that order).
line_amp : float, (value, unit) tuple, or Quantity
The integrated line amplitude in the units of the spectrum
line_type : str, optional
The line profile type. Default: "gaussian"
"""
line_center = parse_value(line_center, "keV")
line_width = parse_value(line_width, "keV", equivalence=line_width_equiv(line_center))
line_amp = parse_value(line_amp, self._units)
if line_type == "gaussian":
sigma = line_width / sigma_to_fwhm
line_amp /= sqrt2pi * sigma
f = Gaussian1D(line_amp, line_center, sigma)
else:
raise NotImplementedError(f'Line profile type "{line_type}" not implemented!')
self._spec += u.Quantity(f(self.emid.value), self._units)
self._compute_totals()
def add_absorption_line(self, line_center, line_width, equiv_width, line_type="gaussian"):
"""
Add an absorption line to this spectrum.
Parameters
----------
line_center : float, (value, unit) tuple, or Quantity
The line center position in units of keV, in the observer frame.
line_width : one or more float, (value, unit) tuple, or Quantity
The line width (FWHM) in units of keV, in the observer frame. Can also
input the line width in units of velocity in the rest frame. For the Voigt
profile, a list, tuple, or array of two values should be provided since there
are two line widths, the Lorentzian and the Gaussian (in that order).
equiv_width : float, (value, unit) tuple, or Quantity
The equivalent width of the line, in units of milli-Angstrom
line_type : str, optional
The line profile type. Default: "gaussian"
"""
line_center = parse_value(line_center, "keV")
line_width = parse_value(line_width, "keV", equivalence=line_width_equiv(line_center))
equiv_width = parse_value(equiv_width, "1.0e-3*angstrom") # in milliangstroms
equiv_width *= 1.0e-3 # convert to angstroms
if line_type == "gaussian":
sigma = line_width / sigma_to_fwhm
B = equiv_width * line_center * line_center
B /= hc * sqrt2pi * sigma
f = Gaussian1D(B, line_center, sigma)
else:
raise NotImplementedError(f"Line profile type '{line_type}' not implemented!")
self._spec *= np.exp(-f(self.emid.value))
self._compute_totals()
def apply_foreground_absorption(self, nH, model="wabs", redshift=0.0, abund_table="angr"):
"""
Given a hydrogen column density, apply
galactic foreground absorption to the spectrum.
Parameters
----------
nH : float, (value, unit) tuple, or Quantity
The hydrogen column in units of 10**22 atoms/cm**2
model : str, optional
The model for absorption to use. Options are "wabs"
(Wisconsin, Morrison and McCammon; ApJ 270, 119) or
"tbabs" (Tuebingen-Boulder, Wilms, J., Allen, A., &
McCray, R. 2000, ApJ, 542, 914). Default: "wabs".
redshift : float, optional
The redshift of the absorbing material. Default: 0.0
abund_table : str
The abundance table to be used for abundances in the
absorption model. Default is set in the SOXS
configuration file, the default for which is "angr".
Built-in options are:
"angr" : from Anders E. & Grevesse N. (1989, Geochimica et
Cosmochimica Acta 53, 197)
"aspl" : from Asplund M., Grevesse N., Sauval A.J. & Scott
P. (2009, ARAA, 47, 481)
"feld" : from Feldman U. (1992, Physica Scripta, 46, 202)
"wilm" : from Wilms, Allen & McCray (2000, ApJ 542, 914
except for elements not listed which are given zero abundance)
"lodd" : from Lodders, K (2003, ApJ 591, 1220)
"cl17.03" : the default abundance table in Cloudy 17.03
"""
nH = parse_value(nH, "1.0e22*cm**-2")
e = self.emid.value * (1.0 + redshift)
if model == "wabs":
sigma = wabs_cross_section(e)
elif model == "tbabs":
if not isinstance(abund_table, str):
raise ValueError(
"Must supply a string corresponding to one of "
"the abundance tables provided in SOXS for "
"the TBabs model!"
)
sigma = tbabs_cross_section(e, abund_table=abund_table)
self._spec *= np.exp(-nH * 1.0e22 * sigma)
self._compute_totals()
def plot(
self,
lw=2,
xmin=None,
xmax=None,
ymin=None,
ymax=None,
xscale=None,
yscale=None,
label=None,
fontsize=18,
fig=None,
ax=None,
**kwargs,
):
"""
Make a quick Matplotlib plot of the spectrum. A Matplotlib
figure and axis is returned.
Parameters
----------
lw : float, optional
The width of the lines in the plots. Default: 2.0 px.
xmin : float, optional
The left-most energy in keV to plot. Default is the
minimum value in the spectrum.
xmax : float, optional
The right-most energy in keV to plot. Default is the
maximum value in the spectrum.
ymin : float, optional
The lower extent of the y-axis. By default, it is set automatically.
ymax : float, optional
The upper extent of the y-axis. By default, it is set automatically.
xscale : str, optional
The scaling of the x-axis of the plot. Default: "log"
yscale : str, optional
The scaling of the y-axis of the plot. Default: "log"
label : str, optional
The label of the spectrum. Default: None
fontsize : int
Font size for labels and axes. Default: 18
fig : :class:`~matplotlib.figure.Figure`, optional
A Figure instance to plot in. Default: None, one will be
created if not provided.
ax : :class:`~matplotlib.axes.Axes`, optional
An Axes instance to plot in. Default: None, one will be
created if not provided.
Returns
-------
A tuple of the :class:`~matplotlib.figure.Figure` and the
:class:`~matplotlib.axes.Axes` objects.
"""
import matplotlib.pyplot as plt
if fig is None:
fig = plt.figure(figsize=(10, 10))
if xscale is None:
if ax is None:
xscale = "log"
else:
xscale = ax.get_xscale()
if yscale is None:
if ax is None:
yscale = "log"
else:
yscale = ax.get_yscale()
if ax is None:
ax = fig.add_subplot(111)
if self.noisy:
ax.errorbar(
self.emid,
self._spec,
xerr=0.5 * self.de,
yerr=self._spec_err,
fmt=".",
lw=lw,
ms=lw,
label=label,
**kwargs,
)
else:
ax.plot(self.emid, self._spec, lw=lw, label=label, **kwargs)
ax.set_xscale(xscale)
ax.set_yscale(yscale)
ax.set_xlim(xmin, xmax)
ax.set_ylim(ymin, ymax)
ax.set_xlabel("Energy (keV)", fontsize=fontsize)
yunit = u.Unit(self._units).to_string("latex").replace("{}^{\\prime}", "arcmin")
ax.set_ylabel(f"Spectrum ({yunit})", fontsize=fontsize)
ax.tick_params(reset=True, axis="both", labelsize=fontsize)
return fig, ax
[docs]
class Spectrum(BaseSpectrum):
_units = "photon/(cm**2*s*keV)"
@property
def flux(self):
return self._spec
@property
def energy_flux(self):
return self._energy_spec
@property
def total_flux(self):
return self._total_spec
@property
def total_energy_flux(self):
return self._total_energy_spec
@property
def binned_flux(self):
return self._binned_spec
@property
def binned_energy_flux(self):
return self._binned_energy_spec
@property
def flux_per_wavelength(self):
return self.binned_flux / self.dwv
@property
def energy_flux_per_wavelength(self):
return self.binned_energy_flux / self.dwv
@property
def flux_per_frequency(self):
return self.binned_flux / self.df
@property
def energy_flux_per_frequency(self):
return self.binned_energy_flux / self.df
[docs]
def rescale_flux(self, new_flux, emin=None, emax=None, flux_type="photons"):
"""
Rescale the flux of the spectrum, optionally using
a specific energy band.
Parameters
----------
new_flux : float
The new flux in units of photons/s/cm**2.
emin : float, (value, unit) tuple, or Quantity, optional
The minimum energy of the band to consider,
in keV. Default: Use the minimum energy of
the entire spectrum.
emax : float, (value, unit) tuple, or Quantity, optional
The maximum energy of the band to consider,
in keV. Default: Use the maximum energy of
the entire spectrum.
flux_type : str, optional
The units of the flux to use in the rescaling:
"photons": photons/s/cm**2
"energy": erg/s/cm**2
"""
if emin is None:
emin = self.ebins[0].value
if emax is None:
emax = self.ebins[-1].value
emin = parse_value(emin, "keV")
emax = parse_value(emax, "keV")
idxs = np.logical_and(self.emid.value >= emin, self.emid.value <= emax)
if flux_type == "photons":
f = self._binned_spec[idxs].sum()
elif flux_type == "energy":
f = self._binned_energy_spec[idxs].sum()
self._spec *= new_flux / f.value
self._compute_totals()
[docs]
def get_flux_in_band(self, emin, emax):
"""
Determine the total flux within a band specified
by an energy range.
Parameters
----------
emin : float, (value, unit) tuple, or Quantity
The minimum energy in the band, in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy in the band, in keV.
Returns
-------
A tuple of values for the flux/intensity in the
band: the first value is in terms of the photon
rate, the second value is in terms of the energy rate.
"""
return self._get_total_in_band(emin, emax)
[docs]
def get_lum_in_band(self, emin, emax, redshift=0.0, dist=None, cosmology=None):
"""
Determine the total photon count rate and luminosity
within a band specified by an energy range. Either a redshift or
or an explicit distance must be specified in order to convert
from the observer frame to the source frame.
Parameters
----------
emin : float, (value, unit) tuple, or Quantity
The minimum energy in the band, in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy in the band, in keV.
redshift : float
The redshift to the source, assuming it is cosmological.
dist : float, (value, unit) tuple, or Quantity
The distance to the source, if it is not cosmological. If a unit
is not specified, it is assumed to be in kpc.
cosmology : :class:`~astropy.cosmology.Cosmology` object
An AstroPy cosmology object used to determine the luminosity
distance if needed. If not set, the default is the Planck 2018
cosmology.
Returns
-------
A tuple of values for the luminosity in the band: the first
value is in terms of the photon rate, the second value is
in terms of the energy rate.
"""
D_L = distance_handler(redshift, dist, cosmology) * (1.0 + redshift) ** 2
pflux, eflux = self.get_flux_in_band(emin / (1.0 + redshift), emax / (1.0 + redshift))
dist_fac = 4.0 * np.pi * D_L**2
elum = dist_fac * eflux
plum = dist_fac * pflux / (1.0 + redshift)
return plum, elum
[docs]
def generate_energies(self, t_exp, area, prng=None, quiet=False):
"""
Generate photon energies from this spectrum
given an exposure time and effective area.
Parameters
----------
t_exp : float, (value, unit) tuple, or Quantity
The exposure time in seconds.
area : float, (value, unit) tuple, or Quantity
The effective area in cm**2. If one is creating
events for a SIMPUT file, a constant should be
used, and it must be large enough so that a
sufficiently large sample is drawn for the ARF.
prng : RandomState object, integer, or None
A pseudo-random number generator. This will typically only
be specified if you have a reason to generate the same
set of random numbers, such as for a test. Default is None,
which sets the seed based on the system time.
quiet : bool, optional
If True, log messages will not be displayed when
creating energies. Useful if you have to loop over
a lot of spectra. Default: False
"""
t_exp = parse_value(t_exp, "s")
area = parse_value(area, "cm**2")
prng = parse_prng(prng)
rate = area * self.total_flux.value
energy = _generate_energies(self, t_exp, rate, prng, self.binscale, quiet=quiet)
flux = np.sum(energy) * erg_per_keV / t_exp / area
energies = Energies(energy, flux)
return energies
[docs]
@classmethod
def from_count_rate_spectrum(cls, spec, redshift=0.0, dist=None, cosmology=None):
"""
Create a Spectrum from a CountRateSpectrum object, assuming that the latter
is in the rest frame of the source. To get the Spectrum in the observer
frame, you must either provide a redshift or a distance to the source to
convert to flux. Note that the energy bins of the original CountRateSpectrum
object will be cosmologically redshifted if the redshift is nonzero.
Parameters
----------
spec : CountRateSpectrum
The CountRateSpectrum object to create the Spectrum from.
redshift : float
The redshift to the source, assuming it is cosmological.
dist : float, (value, unit) tuple, or Quantity
The distance to the source, if it is not cosmological. If a unit
is not specified, it is assumed to be in kpc.
cosmology : :class:`~astropy.cosmology.Cosmology` object
An AstroPy cosmology object used to determine the luminosity
distance if needed. If not set, the default is the Planck 2018
cosmology.
"""
D_A = distance_handler(redshift, dist, cosmology)
flux = spec.rate.value / (4.0 * np.pi * D_A**2 * (1.0 + redshift) ** 2)
return cls(spec.ebins.value / (1.0 + redshift), flux, binscale=spec.binscale)
[docs]
@classmethod
def from_spex_model(cls, spex_session, isect=1):
"""
Create a spectrum from a SPEX session.
Parameters
----------
spex_session : :class:`~pyspex.spex.Session`
The SPEX session object.
isect : integer, optional
The sector in the SPEX session to use. Default: 1.
"""
ss = spex_session.mod_spectrum
ss.get(isect)
ehi = ss.energy_upper.to_value("keV")
elo = ehi - ss.energy_width.to_value("keV")
ebins = np.append(elo, ehi[-1])
flux = ss.spectrum.to_value("ph/(bin*s*cm**2)")
return cls._from_ext_model(ebins, flux)
@classmethod
def _from_xspec(cls, xspec_in, emin, emax, nbins, binscale="linear", xspec_settings=None):
emin = parse_value(emin, "keV")
emax = parse_value(emax, "keV")
tmpdir = Path(tempfile.mkdtemp())
curdir = Path.cwd()
outscript = []
if xspec_settings is not None:
for key, value in xspec_settings.items():
outscript.append(f"xset {key} {value}\n")
outscript.extend(xspec_in)
outscript.append(f"dummyrsp {emin} {emax} {nbins} {binscale[:3]}\n")
outscript += [
f"set fp [open {str(tmpdir)}/spec_therm.xspec w+]\n",
"tclout energies\n",
"puts $fp $xspec_tclout\n",
"tclout modval\n",
"puts $fp $xspec_tclout\n",
"close $fp\n",
"quit\n",
]
with open(tmpdir / "xspec.in", "w") as f_xin:
f_xin.writelines(outscript)
logfile = curdir / "xspec.log"
if os.environ["SHELL"] in ["tcsh", "csh"]:
init_suffix = "csh"
else:
init_suffix = "sh"
sh_file = [
f"#!{os.environ['SHELL']}\n",
f". {os.environ['HEADAS']}/headas-init.{init_suffix}\n",
f"xspec - {str(tmpdir)}/xspec.in\n",
]
with open(tmpdir / "xspec.sh", "w") as f_xs:
f_xs.writelines(sh_file)
with open(logfile, "ab") as xsout:
subprocess.call(
[os.environ["SHELL"], f"{str(tmpdir)}/xspec.sh"],
stdout=xsout,
stderr=xsout,
)
with open(tmpdir / "spec_therm.xspec") as f_s:
lines = f_s.readlines()
ebins = np.array(lines[0].split()).astype("float64")
de = np.diff(ebins)
flux = np.array(lines[1].split()).astype("float64") / de
shutil.rmtree(str(tmpdir))
return cls(ebins, flux, binscale=binscale)
[docs]
@classmethod
def from_xspec_script(cls, infile, emin, emax, nbins, binscale="linear", xspec_settings=None):
"""
Create a model spectrum using a script file as
input to XSPEC.
Parameters
----------
infile : str
Path to the script file to use.
emin : float, (value, unit) tuple, or Quantity
The minimum energy of the spectrum in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy of the spectrum in keV.
nbins : integer
The number of bins in the spectrum.
binscale : str, optional
The scale of the energy binning: "linear" or "log".
Default: "linear"
xspec_settings : dict, optional
A dictionary of XSPEC settings to set before running the
model, each of which will be run as (e.g.) "xset {key} {value}".
Default: None.
"""
with open(infile) as f:
xspec_in = f.readlines()
return cls._from_xspec(
xspec_in,
emin,
emax,
nbins,
binscale=binscale,
xspec_settings=xspec_settings,
)
[docs]
@classmethod
def from_xspec_model(
cls,
model_string,
params,
emin,
emax,
nbins,
binscale="linear",
xspec_settings=None,
):
"""
Create a model spectrum using a model string and parameters
as input to XSPEC.
Parameters
----------
model_string : str
The model to create the spectrum from. Use standard XSPEC
model syntax. Example: "wabs*mekal"
params : list
The list of parameters for the model. Must be in the order
that XSPEC expects.
emin : float, (value, unit) tuple, or Quantity
The minimum energy of the spectrum in keV
emax : float, (value, unit) tuple, or Quantity
The maximum energy of the spectrum in keV
nbins : integer
The number of bins in the spectrum.
binscale : str, optional
The scale of the energy binning: "linear" or "log".
Default: "linear"
xspec_settings : dict, optional
A dictionary of XSPEC settings to set before running the
model, each of which will be run as (e.g.) "xset {key} {value}".
Default: None.
"""
xspec_in = []
model_str = f"{model_string} &"
for param in params:
model_str += f" {param} &"
model_str += " /*"
xspec_in.append(f"model {model_str}\n")
return cls._from_xspec(
xspec_in,
emin,
emax,
nbins,
binscale=binscale,
xspec_settings=xspec_settings,
)
[docs]
@classmethod
def from_pyxspec_model(cls, model, spectrum_index=None):
"""
Create a spectrum from a PyXspec model object.
Parameters
----------
model : :class:`~xspec.Model`
The PyXspec model object.
spectrum_index : integer, optional
The index of the spectrum in the model object to
use for getting the energies and fluxes. Default: Will
use the first valid value of the spectral index, starting
from zero.
"""
if spectrum_index is None:
spectrum_index = 0
try:
ebins = model.energies(spectrum_index)
except Exception as e:
if spectrum_index == 0:
spectrum_index = 1
ebins = model.energies(spectrum_index)
else:
raise e
flux = model.values(spectrum_index)
return cls._from_ext_model(ebins, flux)
[docs]
class CountRateSpectrum(BaseSpectrum):
_units = "photon/(s*keV)"
@property
def rate(self):
return self._spec
@property
def luminosity(self):
return self._energy_spec
@property
def total_rate(self):
return self._total_spec
@property
def total_luminosity(self):
return self._total_energy_spec
@property
def binned_rate(self):
return self._binned_spec
@property
def binned_luminosity(self):
return self._binned_energy_spec
@property
def rate_per_wavelength(self):
return self.binned_rate / self.dwv
@property
def luminosity_per_wavelength(self):
return self.binned_luminosity / self.dwv
@property
def rate_per_frequency(self):
return self.binned_rate / self.df
@property
def luminosity_per_frequency(self):
return self.binned_luminosity / self.df
[docs]
def generate_energies(self, t_exp, prng=None, quiet=False):
"""
Generate photon energies from this count rate spectrum given an
exposure time.
Parameters
----------
t_exp : float, (value, unit) tuple, or Quantity
The exposure time in seconds.
prng : RandomState object, integer, or None
A pseudo-random number generator. Typically will only
be specified if you have a reason to generate the same
set of random numbers, such as for a test. Default is None,
which sets the seed based on the system time.
quiet : bool, optional
If True, log messages will not be displayed when
creating energies. Useful if you have to loop over
a lot of spectra. Default: False
"""
t_exp = parse_value(t_exp, "s")
prng = parse_prng(prng)
rate = self.total_flux.value
energy = _generate_energies(self, t_exp, rate, prng, self.binscale, quiet=quiet)
energies = u.Quantity(energy, "keV")
return energies
[docs]
@classmethod
def from_spectrum(cls, spec, redshift=0.0, dist=None, cosmology=None):
"""
Create a CountRateSpectrum from a Spectrum object, assuming that the latter
is in the observer frame. To get the CountRateSpectrum in the source
frame, you must either provide a redshift or a distance to the source to
convert to photon count rate and luminosity. Note that the energy bins
of the original Spectrum object will be cosmologically blueshifted if
the redshift is nonzero.
Parameters
----------
spec : Spectrum
The Spectrum object to create the CountRateSpectrum from.
redshift : float
The redshift to the source, assuming it is cosmological.
dist : float, (value, unit) tuple, or Quantity
The distance to the source, if it is not cosmological. If a unit
is not specified, it is assumed to be in kpc.
cosmology : :class:`~astropy.cosmology.Cosmology` object
An AstroPy cosmology object used to determine the luminosity
distance if needed. If not set, the default is the Planck 2018
cosmology.
"""
D_A = distance_handler(redshift, dist, cosmology)
rate = spec.flux.value * (4.0 * np.pi * D_A**2 * (1.0 + redshift) ** 2)
return cls(spec.ebins.value * (1.0 + redshift), rate, binscale=spec.binscale)
[docs]
@classmethod
def from_powerlaw(cls, photon_index, norm, emin, emax, nbins, binscale="linear"):
"""
Create a count rate spectrum from a power-law model, in the frame of the
source. The form of the model is R(E) = norm*(e/(1 keV))**-photon_index.
Parameters
----------
photon_index : float
The photon index of the source.
norm : float
The normalization of the source in units of
the spectrum, photons/s/keV, at 1 keV
in the source frame.
emin : float, (value, unit) tuple, or Quantity
The minimum energy of the spectrum in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy of the spectrum in keV.
nbins : integer
The number of bins in the spectrum.
binscale : str, optional
The scale of the energy binning: "linear" or "log".
Default: "linear"
"""
return super().from_powerlaw(photon_index, 0.0, norm, emin, emax, nbins, binscale=binscale)
@classmethod
def from_xstar_model(cls, xstar_file, which_spectrum):
"""
Load a count rate spectrum from an XSTAR output FITS file.
See the XSTAR manual (link below) for details on how to generate the
output file.
https://heasarc.gsfc.nasa.gov/docs/software/xstar/
Parameters
----------
xstar_file : str
The path to the XSTAR output FITS file, typically with
"spect1" in the filename.
which_spectrum : str
Which spectrum to extract from the file. Options are:
"emit_outward", "emit_inward", "transmitted", "incident"
"""
from astropy.io import fits
with fits.open(xstar_file) as f:
if "XSTAR_SPECTRA" not in f:
raise ValueError("No XSTAR_SPECTRA extension found in file!")
hdu = f["XSTAR_SPECTRA"]
# energies in file are bin left edges in eV
elow = hdu.data["energy"].astype("float64") * 1.0e-3 # convert from eV to keV
# bin sizes are logarithmic, use the size of the last bin to get the upper edge
de = elow[-1] / elow[-2]
emax = elow[-1] * de
ebins = np.append(elow, emax)
emid = 0.5 * (ebins[1:] + ebins[:-1])
if which_spectrum not in hdu.data.dtype.names:
raise ValueError(
f"Spectrum '{which_spectrum}' not found in XSTAR file! "
f"Options are: {hdu.data.dtype.names}"
)
rate = hdu.data[which_spectrum] * 1.0e38
rate /= emid * erg_per_keV
rate *= erg_per_keV
return cls(ebins, rate, binscale="custom")
[docs]
def get_lum_in_band(self, emin, emax):
"""
Determine the total photon count rate and luminosity
within a band specified by an energy range.
Parameters
----------
emin : float, (value, unit) tuple, or Quantity
The minimum energy in the band, in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy in the band, in keV.
Returns
-------
A tuple of values for the rate/luminosity in the
band: the first value is in terms of the photon
rate, the second value is in terms of the energy rate
(or simply the luminosity).
"""
return self._get_total_in_band(emin, emax)
def get_flux_in_band(self, emin, emax):
issue_deprecation_warning(
"'get_flux_in_band' is deprecated for CountRateSpectrum, use 'get_lum_in_band' instead!",
)
return self.get_lum_in_band(emin, emax)
[docs]
class ConvolvedSpectrum(CountRateSpectrum):
def __init__(self, ebins, flux, arf, rmf=None, binscale="linear", noisy=False, exp_time=None):
from numbers import Number
from soxs.response import (
AuxiliaryResponseFile,
FlatResponse,
RedistributionMatrixFile,
)
super().__init__(ebins, flux, binscale=binscale)
if isinstance(arf, Number):
arf = FlatResponse(ebins[0], ebins[-1], arf, ebins.size - 1)
elif isinstance(arf, str):
arf = AuxiliaryResponseFile(arf)
self.arf = arf
if rmf is not None and isinstance(rmf, str):
rmf = RedistributionMatrixFile(rmf)
self.rmf = rmf
self.noisy = noisy
if exp_time is not None:
exp_time = parse_value(exp_time, "s") * u.s
self.exp_time = exp_time
if self.exp_time is not None and self.noisy:
self._spec_err = (
np.sqrt(self._spec * self.exp_time * self.de).value * u.photon / (self.exp_time * self.de)
)
else:
self._spec_err = None
_counts = None
@property
def counts(self):
if self._counts is None and self.exp_time is not None:
counts = (self.rate * self.exp_time).value
if self.noisy:
counts = np.rint(counts.value).astype("int")
self._counts = counts * u.photon
return self._counts
_counts_err = None
@property
def counts_err(self):
if self._spec_err is not None:
self._counts_err = self._spec_err * self.exp_time * self.de
elif self._counts_err is None and self.exp_time is not None:
self._counts_err = np.sqrt(self.counts.value) * u.photon
return self._counts_err
def _check_binning_units(self, other):
super()._check_binning_units(other)
if self.noisy != other.noisy:
raise RuntimeError("The noisy flags for these two spectra are not the same!")
bad_exp = self.exp_time is None and other.exp_time is not None
bad_exp |= self.exp_time is not None and other.exp_time is None
if self.exp_time is not None and other.exp_time is not None:
bad_exp |= not np.isclose(self.exp_time.value, other.exp_time.value)
if bad_exp:
raise RuntimeError("The exposure times for these two spectra are not the same!")
bad_rmf = self.rmf is not None and other.rmf is None
bad_rmf |= self.rmf is None and other.rmf is not None
bad_rmf |= self.rmf.filename != other.rmf.filename
if bad_rmf:
raise RuntimeError("The RMF for these two spectra are not the same!")
bad_arf = self.arf is not None and other.arf is None
bad_arf |= self.arf is None and other.arf is not None
bad_arf |= self.arf.filename != other.arf.filename
if bad_arf:
raise RuntimeError("The ARF for these two spectra are not the same!")
def __copy__(self):
# Create a new instance and copy attributes
return type(self)(
self.ebins,
self._spec,
self.arf,
rmf=self.rmf,
binscale=self.binscale,
noisy=self.noisy,
exp_time=self.exp_time,
)
def __deepcopy__(self, memo):
# Create a new instance and deep copy attributes
from soxs.response import (
AuxiliaryResponseFile,
FlatResponse,
RedistributionMatrixFile,
)
if self.arf is None:
arf = None
elif isinstance(self.arf, FlatResponse):
arf = FlatResponse(self.arf.ebins[0], self.arf.ebins[-1], self.arf.max_area, self.arf.nbins)
else:
arf = AuxiliaryResponseFile(self.arf.filename)
rmf = None if self.rmf is None else RedistributionMatrixFile(self.rmf.filename)
return type(self)(
copy.deepcopy(self.ebins, memo),
copy.deepcopy(self._spec, memo),
arf,
rmf=rmf,
binscale=self.binscale,
noisy=self.noisy,
exp_time=self.exp_time,
)
def __add__(self, other):
self._check_binning_units(other)
return ConvolvedSpectrum(
self.ebins,
self._spec + other._spec,
self.arf,
binscale=self.binscale,
rmf=self.rmf,
noisy=self.noisy,
exp_time=self.exp_time,
)
def __sub__(self, other):
self._check_binning_units(other)
return ConvolvedSpectrum(
self.ebins,
self._spec - other._spec,
self.arf,
binscale=self.binscale,
rmf=self.rmf,
noisy=self.noisy,
exp_time=self.exp_time,
)
def __mul__(self, other):
return ConvolvedSpectrum(
self.ebins,
other * self._spec,
self.arf,
binscale=self.binscale,
rmf=self.rmf,
noisy=self.noisy,
exp_time=self.exp_time,
)
@classmethod
def from_pha_file(cls, specfile):
"""
Create a :class:`~soxs.spectra.base.ConvolvedSpectrum` object by reading it in
from a PHA/PI file.
Parameters
----------
specfile : str
The PHA/PI file to be opened.
"""
from astropy.io import fits
from soxs.response import AuxiliaryResponseFile, RedistributionMatrixFile
with fits.open(specfile) as f:
hdu = f["SPECTRUM"]
if "COUNT_RATE" in hdu.data:
rate = hdu.data["COUNT_RATE"].astype("float64")
elif "RATE" in hdu.data:
rate = hdu.data["RATE"].astype("float64")
elif "COUNTS" in hdu.data and "EXPOSURE" in hdu.header:
rate = hdu.data["COUNTS"].astype("float64") / hdu.header["EXPOSURE"]
else:
raise RuntimeError("Cannot determine count rate from this spectrum file!!")
rate *= u.photon / u.s
arf = AuxiliaryResponseFile(hdu.header["ANCRFILE"])
rmf = RedistributionMatrixFile(hdu.header["RESPFILE"])
ebins = np.append(rmf.ebounds_data["E_MIN"], rmf.ebounds_data["E_MAX"][-1]) * u.keV
de = np.diff(ebins)
rate /= de
return cls(
ebins,
rate,
arf,
binscale="linear",
rmf=rmf,
noisy=bool(hdu.header["POISSERR"]),
exp_time=hdu.header["EXPOSURE"],
)
def to_pha_file(self, specfile, overwrite=False):
"""
Write a :class:`~soxs.spectra.base.ConvolvedSpectrum` object to a
PHA/PI file.
Parameters
----------
specfile : str
The PHA/PI file to be written.
overwrite : bool, optional
Whether to overwrite an already existing file. Default: False
"""
from soxs.events.spectra import _write_spectrum
event_params = {
"RESPFILE": os.path.split(self.rmf.filename)[-1],
"ANCRFILE": os.path.split(self.arf.filename)[-1],
"TELESCOP": self.rmf.header["TELESCOP"],
"INSTRUME": self.rmf.header["INSTRUME"],
"MISSION": self.rmf.header.get("MISSION", ""),
"CHANTYPE": self.rmf.chan_type,
}
if self.exp_time is not None:
event_params["EXPOSURE"] = self.exp_time.value
spec = self.counts.value
else:
spec = self.rate.value
bins = (np.arange(self.rmf.n_ch) + self.rmf.cmin).astype("int32")
_write_spectrum(
bins,
spec,
event_params,
specfile,
overwrite=overwrite,
noisy=self.noisy,
)
@classmethod
def convolve(cls, spectrum, arf, use_arf_energies=False, rmf=None):
"""
Generate a model convolved spectrum by convolving a spectrum with an
ARF, and optionally an RMF.
Parameters
----------
spectrum : Spectrum object
The input spectrum to convolve with.
arf : str or :class:`~soxs.response.AuxiliaryResponseFile`
The ARF to use in the convolution.
use_arf_energies : bool, optional
If True, use the energy binning of the ARF for
the convolved spectrum. Default: False, which uses
the original spectral binning.
rmf : str or :class:`~soxs.response.RedistributionMatrixFile`, optional
The RMF to use in the convolution if desired. Default is no RMF.
"""
from soxs.response import AuxiliaryResponseFile, RedistributionMatrixFile
if not isinstance(arf, AuxiliaryResponseFile):
arf = AuxiliaryResponseFile(arf)
if rmf is not None and not isinstance(rmf, RedistributionMatrixFile):
rmf = RedistributionMatrixFile(rmf)
if use_arf_energies or rmf is not None:
rate = (
regrid_spectrum(
arf.ebins,
spectrum.ebins.value,
spectrum._spec.value * spectrum.de.value,
)
* arf.eff_area
)
if rmf is not None:
rate = rmf.convolve_spectrum((rate * arf.de).value, 1.0, noisy=False, rate=True)
rate = u.Quantity(rate / arf.de, "keV-1 ph s-1")
binscale = "linear"
ebins = u.Quantity(arf.ebins, "keV")
else:
earea = arf.interpolate_area(spectrum.emid.value)
rate = spectrum._spec * earea
binscale = spectrum.binscale
ebins = spectrum.ebins
return cls(ebins, rate, arf, binscale=binscale, rmf=rmf)
[docs]
def new_spec_from_band(self, emin, emax):
"""
Create a new :class:`~soxs.spectra.base.ConvolvedSpectrum` object
from a subset of an existing one defined by a particular
energy band.
Parameters
----------
emin : float, (value, unit) tuple, or Quantity
The minimum energy of the band in keV.
emax : float, (value, unit) tuple, or Quantity
The maximum energy of the band in keV.
"""
ebins, flux = self._new_spec_from_band(emin, emax)
return ConvolvedSpectrum(
ebins,
flux,
self.arf,
binscale=self.binscale,
noisy=self.noisy,
rmf=self.rmf,
exp_time=self.exp_time,
)
[docs]
def deconvolve(self):
"""
Return the deconvolved Spectrum
object associated with this convolved spectrum.
"""
if self.rmf is not None:
raise NotImplementedError("deconvolve is not implemented for ConvolvedSpectra with RMFs!")
earea = self.arf.interpolate_area(self.emid)
flux = self._spec / earea
flux = np.nan_to_num(flux.value)
return Spectrum(self.ebins.value, flux, binscale=self.binscale)
[docs]
def generate_energies(self, t_exp, prng=None, quiet=False):
"""
Generate photon energies from this convolved spectrum given an
exposure time.
Parameters
----------
t_exp : float, (value, unit) tuple, or Quantity
The exposure time in seconds.
prng : RandomState object, integer, or None
A pseudo-random number generator. Typically will only
be specified if you have a reason to generate the same
set of random numbers, such as for a test. Default is None,
which sets the seed based on the system time.
quiet : bool, optional
If True, log messages will not be displayed when
creating energies. Useful if you have to loop over
a lot of spectra. Default: False
"""
if self.rmf is not None:
raise NotImplementedError("generate_energies is not implemented for ConvolvedSpectra with RMFs!")
t_exp = parse_value(t_exp, "s")
prng = parse_prng(prng)
rate = self.total_flux.value
energy = _generate_energies(self, t_exp, rate, prng, self.binscale, quiet=quiet)
earea = self.arf.interpolate_area(energy).value
flux = np.sum(energy) * erg_per_keV / t_exp / earea.sum()
energies = Energies(energy, flux)
return energies
@classmethod
def from_constant(cls, const_value, emin, emax, nbins, binscale="linear"):
raise NotImplementedError
@classmethod
def from_powerlaw(cls, photon_index, norm, emin, emax, nbins, binscale="linear"):
raise NotImplementedError
@classmethod
def from_xspec_model(
cls,
model_string,
params,
emin,
emax,
nbins,
binscale="linear",
xspec_settings=None,
):
raise NotImplementedError
@classmethod
def from_xspec_script(cls, infile, emin, emax, nbins, binscale="linear", xspec_settings=None):
raise NotImplementedError
def apply_foreground_absorption(self, nH, model="wabs", redshift=0.0, abund_table="angr"):
raise NotImplementedError
def add_absorption_line(self, line_center, line_width, equiv_width, line_type="gaussian"):
raise NotImplementedError
def add_absorption_lines(self, line_center, line_width, equiv_lines):
raise NotImplementedError