import numpy as np
from astropy.io import fits
from pathlib import PurePath
from soxs.events.utils import _region_filter
from soxs.utils import parse_value
def _write_spectrum(
bins,
spec,
parameters,
specfile,
overwrite=False,
noisy=True,
):
exp_time = parameters.get("EXPOSURE", None)
spectype = parameters["CHANTYPE"]
if noisy:
cnt_fmt = "1J"
cnt_type = "int32"
else:
cnt_fmt = "1D"
cnt_type = "float64"
col_ch = fits.Column(name="CHANNEL", format="1J", array=bins)
col_chf = fits.Column(name=spectype.upper(), format="1D", array=bins.astype("float64"))
cols = [col_ch, col_chf]
if exp_time is None:
rate = spec
else:
rate = spec / exp_time
col_cnt = fits.Column(name="COUNTS", format=cnt_fmt, array=spec.astype(cnt_type))
cols.append(col_cnt)
col_rate = fits.Column(name="COUNT_RATE", format="1D", array=rate)
cols.append(col_rate)
coldefs = fits.ColDefs(cols)
tbhdu = fits.BinTableHDU.from_columns(coldefs)
tbhdu.name = "SPECTRUM"
tbhdu.header["DETCHANS"] = spec.size
tbhdu.header["TOTCTS"] = spec.sum()
tbhdu.header["EXPOSURE"] = exp_time
tbhdu.header["LIVETIME"] = exp_time
tbhdu.header["CONTENT"] = spectype
tbhdu.header["HDUCLASS"] = "OGIP"
tbhdu.header["HDUCLAS1"] = "SPECTRUM"
tbhdu.header["HDUCLAS2"] = "TOTAL"
tbhdu.header["HDUCLAS3"] = "TYPE:I"
tbhdu.header["HDUCLAS4"] = "COUNT"
tbhdu.header["HDUVERS"] = "1.1.0"
tbhdu.header["HDUVERS1"] = "1.1.0"
tbhdu.header["CHANTYPE"] = spectype
tbhdu.header["BACKFILE"] = "none"
tbhdu.header["CORRFILE"] = "none"
tbhdu.header["POISSERR"] = noisy
for key in ["RESPFILE", "ANCRFILE", "MISSION", "TELESCOP", "INSTRUME"]:
tbhdu.header[key] = parameters[key]
tbhdu.header["AREASCAL"] = 1.0
tbhdu.header["CORRSCAL"] = 0.0
tbhdu.header["BACKSCAL"] = 1.0
hdulist = fits.HDUList([fits.PrimaryHDU(), tbhdu])
hdulist.writeto(specfile, overwrite=overwrite)
def _make_spectrum(
evtfile,
region=None,
format="ds9",
exclude=False,
emin=None,
emax=None,
tmin=None,
tmax=None,
):
from soxs.response import RedistributionMatrixFile
parameters = {}
if isinstance(evtfile, str):
with fits.open(evtfile) as f:
hdu = f["EVENTS"]
evt_mask = np.ones(hdu.data["ENERGY"].size, dtype="bool")
if region is not None:
evt_mask &= _region_filter(hdu, region, format=format, exclude=exclude)
if tmin is not None:
tmin = parse_value(tmin, "s")
evt_mask &= hdu.data["TIME"] > tmin
else:
tmin = 0.0
if tmax is not None:
tmax = parse_value(tmax, "s")
evt_mask &= hdu.data["TIME"] < tmax
else:
tmax = hdu.header["EXPOSURE"]
if emin is not None:
emin = parse_value(emin, "keV") * 1000.0
evt_mask &= hdu.data["ENERGY"] > emin
if emax is not None:
emax = parse_value(emax, "keV") * 1000.0
evt_mask &= hdu.data["ENERGY"] < emax
spectype = hdu.header["CHANTYPE"]
rmf = hdu.header["RESPFILE"]
p = hdu.data[spectype][evt_mask]
parameters["EXPOSURE"] = tmax - tmin
for key in ["RESPFILE", "ANCRFILE", "MISSION", "TELESCOP", "INSTRUME"]:
parameters[key] = hdu.header[key]
else:
rmf = evtfile["rmf"]
spectype = evtfile["channel_type"]
p = evtfile[spectype]
parameters["RESPFILE"] = PurePath(rmf).parts[-1]
parameters["ANCRFILE"] = PurePath(evtfile["arf"]).parts[-1]
parameters["TELESCOP"] = evtfile["telescope"]
parameters["INSTRUME"] = evtfile["instrument"]
parameters["MISSION"] = evtfile["mission"]
parameters["EXPOSURE"] = evtfile["exposure_time"]
parameters["CHANTYPE"] = spectype
rmf = RedistributionMatrixFile(rmf)
spec = np.bincount(p, minlength=rmf.n_ch + rmf.cmin)[rmf.cmin :]
bins = (np.arange(rmf.n_ch) + rmf.cmin).astype("int32")
return bins, spec, parameters
[docs]
def write_spectrum(
evtfile,
specfile,
region=None,
format="ds9",
exclude=False,
emin=None,
emax=None,
tmin=None,
tmax=None,
overwrite=False,
):
r"""
Bin event energies into a spectrum and write it to
a FITS binary table. Does not do any grouping of
channels, and will automatically determine PI or PHA.
Optionally allows filtering based on time, energy, or
spatial region.
Parameters
----------
evtfile : string
The name of the event file to read the events from.
specfile : string
The name of the spectrum file to be written.
region : string, Region, or Regions, optional
The region(s) to be used for the filtering. Default: None
format : string, optional
The file format specifier for the region. "ds9",
"crtf", "fits", etc. Default: "ds9"
exclude : boolean, optional
If True, the events in a specified *region* will be excluded
instead of included in the spectrum. Default: False
emin : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The minimum energy of the events to be included, in keV.
Default is the lowest energy available.
emax : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The maximum energy of the events to be included, in keV.
Default is the highest energy available.
tmin : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The minimum energy of the events to be included, in seconds.
Default is the earliest time available.
tmax : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The maximum energy of the events to be included, in seconds.
Default is the latest time available.
overwrite : boolean, optional
Whether to overwrite an existing file with
the same name. Default: False
"""
bins, spec, parameters = _make_spectrum(
evtfile,
region=region,
format=format,
exclude=exclude,
emin=emin,
emax=emax,
tmin=tmin,
tmax=tmax,
)
_write_spectrum(bins, spec, parameters, specfile, overwrite=overwrite)
[docs]
def plot_spectrum(
specfile,
plot_energy=True,
ebins=None,
lw=2,
xmin=None,
xmax=None,
ymin=None,
ymax=None,
xscale=None,
yscale=None,
label=None,
fontsize=18,
fig=None,
ax=None,
plot_counts=False,
per_keV=True,
noerr=False,
plot_used=False,
plot_steps=False,
**kwargs,
):
"""
Make a quick Matplotlib plot of a convolved spectrum
from a file. A Matplotlib figure and axis is returned.
Parameters
----------
specfile : string
The file to be opened for plotting.
plot_energy : boolean, optional
Whether to plot in energy or channel space. Default is
to plot in energy, unless the RMF for the spectrum
cannot be found.
ebins : integer, tuple, or NumPy array, optional
If set, these are the energy bin edges in which the spectrum
will be binned. If an integer, the counts spectrum will be reblocked
by this number. If a 2-tuple, the first element is the minimum
significance (assuming Poisson statistics) of each bin and the
second element is the maximum number of channels to be combined
in the bin. If a NumPy array, these are the bins that will be
used. If not set, the counts will be binned according
to channel. Default: None
lw : float, optional
The width of the lines in the plots. Default: 2.0 px.
xmin : float, optional
The left-most energy (in keV) or channel to plot. Default is the
minimum value in the spectrum.
xmax : float, optional
The right-most energy (in keV) or channel to plot. Default is the
maximum value in the spectrum.
ymin : float, optional
The lower extent of the y-axis. By default it is set automatically.
ymax : float, optional
The upper extent of the y-axis. By default it is set automatically.
xscale : string, optional
The scaling of the x-axis of the plot. Default: "linear"
yscale : string, optional
The scaling of the y-axis of the plot. Default: "linear"
label : string, optional
The label of the spectrum. Default: None
fontsize : int
Font size for labels and axes. Default: 18
fig : :class:`~matplotlib.figure.Figure`, optional
A Figure instance to plot in. Default: None, one will be
created if not provided.
ax : :class:`~matplotlib.axes.Axes`, optional
An Axes instance to plot in. Default: None, one will be
created if not provided.
plot_counts : boolean, optional
If set to True, the counts instead of the count rate will
be plotted. Default: False
per_keV : boolean, optional
If set to True, the spectrum will be plotted in counts per keV.
Otherwise, it will be plotted in counts per bin. Default: True
noerr : boolean, optional
If True, the spectrum will be plotted without errorbars. This
will always be the case if the spectrum is not noisy.
Default: False
plot_used : boolean, optional
If set to True, only the bins which contain more than 0
counts will be plotted. Default: False
plot_steps : boolean, optional
If set to True, the spectrum will be plotted with steps. Default: False
Returns
-------
A tuple of the :class:`~matplotlib.figure.Figure` object, the
:class:`~matplotlib.axes.Axes` object, and the NumPy array of
energy bins that are used.
"""
import matplotlib.pyplot as plt
from astropy.nddata import block_reduce
from soxs.instrument import RedistributionMatrixFile
f = fits.open(specfile)
hdu = f["SPECTRUM"]
chantype = hdu.header["CHANTYPE"]
y = hdu.data["COUNTS"].astype("float64")
if not hdu.header["POISSERR"]:
noerr = True
if plot_energy:
rmf = hdu.header.get("RESPFILE", None)
if rmf.strip() == "":
rmf = None
if rmf is not None:
rmf = RedistributionMatrixFile(rmf)
emin = rmf.ebounds_data["E_MIN"]
emax = rmf.ebounds_data["E_MAX"]
e = 0.5 * (emin + emax)
if ebins is None:
ebins = np.append(emin, emax[-1])
elif isinstance(ebins, int):
y = block_reduce(y, ebins)
ebins = np.append(emin[::ebins], emax[-1])
else:
if isinstance(ebins, tuple):
if len(ebins) != 2:
raise RuntimeError("If ebins is a tuple, it must have 2 elements!")
sigma, max_size = ebins
sigma2 = sigma * sigma
ebins = [emin[0]]
sum = 0.0
bin_size = 0
for i in range(y.size):
sum += y[i]
bin_size += 1
if sum >= sigma2 or bin_size == max_size or i == y.size - 1:
ebins.append(emax[i])
sum = 0.0
max_size = 0
ebins = np.array(ebins)
y = np.histogram(e, ebins, weights=y)[0].astype("float64")
xlabel = "Energy (keV)"
xmid = 0.5 * (ebins[1:] + ebins[:-1])
xerr = 0.5 * np.diff(ebins)
else:
raise RuntimeError(
"Cannot find the RMF associated with this spectrum, so I cannot plot in energy!"
)
else:
try:
xmid = hdu.data[chantype]
except KeyError:
try:
xmid = hdu.data["CHANNEL"]
except KeyError as e:
raise KeyError("Cannot find CHANNEL or CHANTYPE in the spectrum!") from e
xerr = 0.5
xlabel = f"Channel ({chantype})"
dx = 2.0 * xerr
yerr = np.sqrt(y)
if not plot_counts:
y /= hdu.header["EXPOSURE"]
yerr /= hdu.header["EXPOSURE"]
if per_keV and plot_energy:
yunit = "keV"
y /= dx
yerr /= dx
else:
yunit = "bin"
f.close()
if fig is None:
fig = plt.figure(figsize=(10, 10))
if xscale is None:
if ax is None:
xscale = "linear"
else:
xscale = ax.get_xscale()
if yscale is None:
if ax is None:
yscale = "linear"
else:
yscale = ax.get_yscale()
if ax is None:
ax = fig.add_subplot(111)
if plot_used:
used = y > 0
xmid = xmid[used]
y = y[used]
xerr = xerr[used]
yerr = yerr[used]
if noerr:
ax.plot(xmid, y, lw=lw, label=label, **kwargs)
else:
if plot_steps:
fmt = ""
ax.step(xmid, y, where="mid", **kwargs)
else:
fmt = "."
ax.errorbar(xmid, y, yerr=yerr, xerr=xerr, fmt=fmt, lw=lw, label=label, **kwargs)
ax.set_xscale(xscale)
ax.set_yscale(yscale)
ax.set_xlim(xmin, xmax)
ax.set_ylim(ymin, ymax)
ax.set_xlabel(xlabel, fontsize=fontsize)
if plot_counts:
ylabel = "Counts (counts/{0})"
else:
ylabel = "Count Rate (counts/s/{0})"
ax.set_ylabel(ylabel.format(yunit), fontsize=fontsize)
ax.tick_params(reset=True, labelsize=fontsize)
return fig, ax, ebins