"""
Classes for generating lists of detected events
"""
import os
import astropy.wcs as pywcs
import h5py
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from soxs.constants import erg_per_keV
from pyxsim.utils import mylog, parse_value
[docs]
class EventList:
def __init__(self, filespec):
"""
Read an EventList from disk so it can be exported to
other formats.
Parameters
----------
filespec : str
A filename, list of filenames, or globbable path
that yields a single or list of HDF5 files containing
event data.
"""
from glob import glob
if filespec.endswith(".h5"):
try:
with h5py.File(filespec, "r") as f:
if "info" in f and "filenames" in f["info"].attrs:
filenames = list(f["info"].attrs["filenames"])
else:
filenames = glob(filespec)
except OSError:
# Old way of doing things
filenames = glob(filespec)
elif isinstance(filespec, list):
if not np.all([fn.endswith(".h5") for fn in filespec]):
raise RuntimeError("Not all filenames are valid!")
filenames = filespec
else:
raise RuntimeError(f"Invalid EventList file spec: {filespec}")
filenames.sort()
self.parameters = {}
self.info = {}
self.num_events = []
self.filenames = []
get_params = True
for fn in filenames:
with h5py.File(fn, "r") as f:
n_events = f["data"]["xsky"].size
if n_events == 0:
continue
p = f["parameters"]
self.num_events.append(f["data"]["xsky"].size)
if get_params:
for field in p:
if isinstance(p[field][()], (str, bytes)):
self.parameters[field] = p[field].asstr()[()]
else:
self.parameters[field] = p[field][()]
if "info" in f:
for k, v in f["info"].attrs.items():
self.info[k] = v
get_params = False
self.filenames.append(fn)
self.num_files = len(self.filenames)
self.tot_num_events = np.sum(self.num_events)
self.observer = self.parameters.get("observer", "external")
[docs]
def write_fits_file(self, fitsfile, fov, nx, overwrite=False):
"""
Write events to a FITS binary table file. The result is an
"event file" which can be opened in ds9, binned into a
spectrum, etc., but NOTE that this does NOT represent an
actual observation since no instrumental effects are
included.
Parameters
----------
fitsfile : string
The name of the event file to write.
fov : float, (value, unit) tuple, :class:`~yt.units.yt_array.YTQuantity`, or :class:`~astropy.units.Quantity`
The field of view of the event file. If units are not
provided, they are assumed to be in arcminutes.
nx : integer
The resolution of the image (number of pixels on a side).
overwrite : boolean, optional
Set to True to overwrite a previous file.
"""
from astropy.time import Time, TimeDelta
fov = parse_value(fov, "arcmin")
t_begin = Time.now()
dt = TimeDelta(self.parameters["exp_time"], format="sec")
t_end = t_begin + dt
dtheta = fov.to_value("deg") / nx
wcs = pywcs.WCS(naxis=2)
wcs.wcs.crpix = [0.5 * (nx + 1)] * 2
wcs.wcs.crval = self.parameters["sky_center"]
wcs.wcs.cdelt = [-dtheta, dtheta]
wcs.wcs.ctype = ["RA---TAN", "DEC--TAN"]
wcs.wcs.cunit = ["deg"] * 2
n_events = 0
e = []
x = []
y = []
for fn in self.filenames:
with h5py.File(fn, "r") as f:
d = f["data"]
xx, yy = wcs.wcs_world2pix(d["xsky"][:], d["ysky"][:], 1)
keepx = np.logical_and(xx >= 0.5, xx <= float(nx) + 0.5)
keepy = np.logical_and(yy >= 0.5, yy <= float(nx) + 0.5)
keep = np.logical_and(keepx, keepy)
n_events += keep.sum()
e.append(d["eobs"][keep])
x.append(xx[keep])
y.append(yy[keep])
mylog.info(
"Threw out %d events because they fell outside the " "field of view.",
self.tot_num_events - n_events,
)
col_e = fits.Column(
name="ENERGY", format="E", unit="eV", array=np.concatenate(e) * 1000.0
)
col_x = fits.Column(name="X", format="D", unit="pixel", array=np.concatenate(x))
col_y = fits.Column(name="Y", format="D", unit="pixel", array=np.concatenate(y))
cols = [col_e, col_x, col_y]
coldefs = fits.ColDefs(cols)
tbhdu = fits.BinTableHDU.from_columns(coldefs)
tbhdu.name = "EVENTS"
tbhdu.header["MTYPE1"] = "sky"
tbhdu.header["MFORM1"] = "x,y"
tbhdu.header["MTYPE2"] = "EQPOS"
tbhdu.header["MFORM2"] = "RA,DEC"
tbhdu.header["TCTYP2"] = "RA---TAN"
tbhdu.header["TCTYP3"] = "DEC--TAN"
tbhdu.header["TCRVL2"] = self.parameters["sky_center"][0]
tbhdu.header["TCRVL3"] = self.parameters["sky_center"][1]
tbhdu.header["TCDLT2"] = -dtheta
tbhdu.header["TCDLT3"] = dtheta
tbhdu.header["TCRPX2"] = 0.5 * (nx + 1)
tbhdu.header["TCRPX3"] = 0.5 * (nx + 1)
tbhdu.header["TLMIN2"] = 0.5
tbhdu.header["TLMIN3"] = 0.5
tbhdu.header["TLMAX2"] = float(nx) + 0.5
tbhdu.header["TLMAX3"] = float(nx) + 0.5
tbhdu.header["EXPOSURE"] = self.parameters["exp_time"]
tbhdu.header["TSTART"] = 0.0
tbhdu.header["TSTOP"] = self.parameters["exp_time"]
tbhdu.header["AREA"] = self.parameters["area"]
tbhdu.header["HDUVERS"] = "1.1.0"
tbhdu.header["RADECSYS"] = "FK5"
tbhdu.header["EQUINOX"] = 2000.0
tbhdu.header["HDUCLASS"] = "OGIP"
tbhdu.header["HDUCLAS1"] = "EVENTS"
tbhdu.header["HDUCLAS2"] = "ACCEPTED"
tbhdu.header["DATE"] = t_begin.tt.isot
tbhdu.header["DATE-OBS"] = t_begin.tt.isot
tbhdu.header["DATE-END"] = t_end.tt.isot
hdulist = [fits.PrimaryHDU(), tbhdu]
fits.HDUList(hdulist).writeto(fitsfile, overwrite=overwrite)
[docs]
def get_data(self, i):
with h5py.File(self.filenames[i]) as f:
d = f["data"]
if self.num_events[i] > 0:
if self.observer == "internal":
c = SkyCoord(
d["xsky"][()], d["ysky"][()], unit="deg", frame="galactic"
)
ra = c.icrs.ra.value
dec = c.icrs.dec.value
else:
ra = d["xsky"][()]
dec = d["ysky"][()]
energy = d["eobs"][()]
flux = (
np.sum(energy * erg_per_keV)
/ self.parameters["exp_time"]
/ self.parameters["area"]
)
else:
ra = np.array([])
dec = np.array([])
energy = np.array([])
flux = 0.0
return ra, dec, energy, flux
[docs]
def write_to_simput(self, prefix, overwrite=False):
r"""
Write events to a SIMPUT catalog that may be utilized by various
instrument simulators.
Parameters
----------
prefix : string
The filename prefix. The files to be written have the
signature:
f"{prefix}_simput.fits", f"{prefix}_phlist.fits", etc.
overwrite : boolean, optional
Set to True to overwrite previous files.
"""
from soxs.simput import SimputCatalog, SimputPhotonList
simput_file = f"{prefix}_simput.fits"
begin_cat = True
for i, fn in enumerate(self.filenames):
if len(self.filenames) == 1:
phlist_file = f"{prefix}_phlist.fits"
name = os.path.basename(prefix)
else:
phlist_file = f"{prefix}_phlist.{i:04d}.fits"
name = f"{os.path.basename(prefix)}.{i:04d}"
ra, dec, energy, flux = self.get_data(i)
if ra.size > 0:
src = SimputPhotonList(ra, dec, energy, flux, name=name)
if begin_cat:
cat = SimputCatalog.from_source(
simput_file,
src,
src_filename=phlist_file,
overwrite=overwrite,
)
begin_cat = False
else:
cat.append(src, src_filename=phlist_file, overwrite=overwrite)
else:
mylog.warning("No events found in file %s, so skipping.", fn)
[docs]
def write_fits_image(
self, imagefile, fov, nx, emin=None, emax=None, overwrite=False
):
r"""
Generate an image by binning X-ray counts and write it to a FITS file.
Parameters
----------
imagefile : string
The name of the image file to write.
fov : float, (value, unit) tuple, :class:`~yt.units.yt_array.YTQuantity`, or :class:`~astropy.units.Quantity`
The field of view of the image. If units are not provided, they
are assumed to be in arcminutes.
nx : integer
The resolution of the image (number of pixels on a side).
emin : float, optional
The minimum energy of the photons to put in the image, in keV.
emax : float, optional
The maximum energy of the photons to put in the image, in keV.
overwrite : boolean, optional
Set to True to overwrite a previous file.
"""
fov = parse_value(fov, "arcmin")
if emin is None:
emin = -1.0
if emax is None:
emax = 1.0e10
dtheta = fov.to_value("deg") / nx
xbins = np.linspace(0.5, float(nx) + 0.5, nx + 1, endpoint=True)
ybins = np.linspace(0.5, float(nx) + 0.5, nx + 1, endpoint=True)
wcs = pywcs.WCS(naxis=2)
wcs.wcs.crpix = [0.5 * (nx + 1)] * 2
wcs.wcs.crval = self.parameters["sky_center"]
wcs.wcs.cdelt = [-dtheta, dtheta]
wcs.wcs.ctype = ["RA---TAN", "DEC--TAN"]
wcs.wcs.cunit = ["deg"] * 2
H = np.zeros((nx, nx))
for fn in self.filenames:
with h5py.File(fn, "r") as f:
d = f["data"]
mask = np.logical_and(d["eobs"][:] >= emin, d["eobs"][:] <= emax)
xx, yy = wcs.wcs_world2pix(d["xsky"][mask], d["ysky"][mask], 1)
H += np.histogram2d(xx, yy, bins=[xbins, ybins])[0]
hdu = fits.PrimaryHDU(H.T)
hdu.header["MTYPE1"] = "EQPOS"
hdu.header["MFORM1"] = "RA,DEC"
hdu.header["CTYPE1"] = "RA---TAN"
hdu.header["CTYPE2"] = "DEC--TAN"
hdu.header["CRPIX1"] = 0.5 * (nx + 1)
hdu.header["CRPIX2"] = 0.5 * (nx + 1)
hdu.header["CRVAL1"] = self.parameters["sky_center"][0]
hdu.header["CRVAL2"] = self.parameters["sky_center"][1]
hdu.header["CUNIT1"] = "deg"
hdu.header["CUNIT2"] = "deg"
hdu.header["CDELT1"] = -dtheta
hdu.header["CDELT2"] = dtheta
hdu.header["EXPOSURE"] = self.parameters["exp_time"]
hdu.writeto(imagefile, overwrite=overwrite)
[docs]
def write_spectrum(self, specfile, emin, emax, nchan, overwrite=False):
r"""
Bin observer-frame event energies into a spectrum and write it to
a FITS binary table. This is for an *unconvolved* spectrum.
Parameters
----------
specfile : string
The name of the FITS file to be written.
emin : float
The minimum energy of the spectral bins in keV.
emax : float
The maximum energy of the spectral bins in keV.
nchan : integer
The number of channels.
overwrite : boolean, optional
Set to True to overwrite a previous file.
"""
spec = np.zeros(nchan)
ebins = np.linspace(emin, emax, nchan + 1, endpoint=True)
emid = 0.5 * (ebins[1:] + ebins[:-1])
for fn in self.filenames:
with h5py.File(fn, "r") as f:
d = f["data"]
spec += np.histogram(d["eobs"][:], bins=ebins)[0]
col1 = fits.Column(
name="CHANNEL", format="1J", array=np.arange(nchan).astype("int32") + 1
)
col2 = fits.Column(name="ENERGY", format="1D", array=emid.astype("float64"))
col3 = fits.Column(name="COUNTS", format="1J", array=spec.astype("int32"))
col4 = fits.Column(
name="COUNT_RATE", format="1D", array=spec / self.parameters["exp_time"]
)
coldefs = fits.ColDefs([col1, col2, col3, col4])
tbhdu = fits.BinTableHDU.from_columns(coldefs)
tbhdu.name = "SPECTRUM"
tbhdu.header["DETCHANS"] = spec.shape[0]
tbhdu.header["TOTCTS"] = spec.sum()
tbhdu.header["EXPOSURE"] = self.parameters["exp_time"]
tbhdu.header["LIVETIME"] = self.parameters["exp_time"]
tbhdu.header["CONTENT"] = "pi"
tbhdu.header["HDUCLASS"] = "OGIP"
tbhdu.header["HDUCLAS1"] = "SPECTRUM"
tbhdu.header["HDUCLAS2"] = "TOTAL"
tbhdu.header["HDUCLAS3"] = "TYPE:I"
tbhdu.header["HDUCLAS4"] = "COUNT"
tbhdu.header["HDUVERS"] = "1.1.0"
tbhdu.header["HDUVERS1"] = "1.1.0"
tbhdu.header["CHANTYPE"] = "pi"
tbhdu.header["BACKFILE"] = "none"
tbhdu.header["CORRFILE"] = "none"
tbhdu.header["POISSERR"] = True
tbhdu.header["RESPFILE"] = "none"
tbhdu.header["ANCRFILE"] = "none"
tbhdu.header["MISSION"] = "none"
tbhdu.header["TELESCOP"] = "none"
tbhdu.header["INSTRUME"] = "none"
tbhdu.header["AREASCAL"] = 1.0
tbhdu.header["CORRSCAL"] = 0.0
tbhdu.header["BACKSCAL"] = 1.0
hdulist = fits.HDUList([fits.PrimaryHDU(), tbhdu])
hdulist.writeto(specfile, overwrite=overwrite)