from numbers import Number
import numpy as np
from astropy import wcs
from astropy.io import fits
from pathlib import Path
from soxs.events.utils import _combine_events, _region_filter, wcs_from_header
from soxs.utils import parse_prng, parse_value
[docs]
def filter_events(
evtfile,
newfile,
region=None,
emin=None,
emax=None,
tmin=None,
tmax=None,
format="ds9",
exclude=False,
overwrite=False,
):
r"""
Parameters
----------
evtfile : string
The input events file to be read in.
newfile : string
The new event file that will be written.
region : string, Region, or Regions, optional
The region(s) to be used for the filtering. Default: None
emin : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The minimum energy of the events to be included, in keV.
Default is the lowest energy available.
emax : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The maximum energy of the events to be included, in keV.
Default is the highest energy available.
tmin : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The minimum energy of the events to be included, in seconds.
Default is the earliest time available.
tmax : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The maximum energy of the events to be included, in seconds.
Default is the latest time available.
format : string, optional
The file format specifier for the region, if supplied. "ds9",
"crtf", "fits", etc. Default: "ds9"
exclude : boolean, optional
If True, the events in a specified *region* will be excluded instead of
included. Default: False
overwrite : boolean, optional
Whether to overwrite an existing file with the
same name. Default: False
"""
with fits.open(evtfile) as f:
hdu = f["EVENTS"]
evt_mask = np.ones(hdu.data["ENERGY"].size, dtype="bool")
if region is not None:
evt_mask &= _region_filter(hdu, region, format=format, exclude=exclude)
if emin is not None:
emin = parse_value(emin, "keV") * 1000.0
evt_mask &= hdu.data["ENERGY"] > emin
if emax is not None:
emax = parse_value(emax, "keV") * 1000.0
evt_mask &= hdu.data["ENERGY"] < emax
if tmin is not None:
tmin = parse_value(tmin, "s")
evt_mask &= hdu.data["TIME"] > tmin
else:
tmin = 0.0
if tmax is not None:
tmax = parse_value(tmax, "s")
evt_mask &= hdu.data["TIME"] < tmax
else:
tmax = hdu.header["EXPOSURE"]
hdu.data = hdu.data[evt_mask]
hdu.header["EXPOSURE"] = tmax - tmin
gtihdu = f["STDGTI"]
gtihdu.data["START"][0] = tmin
gtihdu.data["STOP"][0] = tmax
gtihdu.header["TSTART"] = tmin
gtihdu.header["TSTOP"] = tmax
f.writeto(newfile, overwrite=overwrite)
[docs]
def write_radial_profile(
evt_file,
out_file,
ctr,
rmin,
rmax,
nbins,
ctr_type="celestial",
emin=None,
emax=None,
expmap_file=None,
overwrite=False,
):
r"""
Bin up events into a radial profile and write them to a FITS
table.
Parameters
----------
evt_file : string
Input event file.
out_file : string
The output file to write the profile to.
ctr : array-like
The central coordinate of the profile. Can either be in
celestial coordinates (the default) or "physical" pixel
coordinates. If the former, the ``ctr_type`` keyword
argument must be explicity set to "physical".
rmin : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`
The minimum radius of the profile, in arcseconds.
rmax : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`
The maximum radius of the profile, in arcseconds.
nbins : integer
The number of bins in the profile.
ctr_type : string, optional
The type of center coordinate. Either "celestial" for
(RA, Dec) coordinates (the default), or "physical" for
pixel coordinates.
emin : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The minimum energy of the events to be binned in keV.
Default is the lowest energy available.
emax : float, (value, unit) tuple, or :class:`~astropy.units.Quantity`, optional
The maximum energy of the events to be binned in keV.
Default is the highest energy available.
overwrite : boolean, optional
Whether to overwrite an existing file with the
same name. Default: False
expmap_file : string, optional
Supply an exposure map file to determine fluxes.
Default: None
"""
rmin = parse_value(rmin, "arcsec")
rmax = parse_value(rmax, "arcsec")
f = fits.open(evt_file)
hdu = f["EVENTS"]
orig_dx = hdu.header["TCDLT3"]
e = hdu.data["ENERGY"]
if emin is None:
emin = e.min()
else:
emin = parse_value(emin, "keV")
emin *= 1000.0
if emax is None:
emax = e.max()
else:
emax = parse_value(emax, "keV")
emax *= 1000.0
idxs = np.logical_and(e > emin, e < emax)
x = hdu.data["X"][idxs]
y = hdu.data["Y"][idxs]
exp_time = hdu.header["EXPOSURE"]
w = wcs_from_header(hdu.header)
dtheta = np.abs(w.wcs.cdelt[1]) * 3600.0
f.close()
if ctr_type == "celestial":
ctr = w.all_world2pix(ctr[0], ctr[1], 1)
r = np.sqrt((x - ctr[0]) ** 2 + (y - ctr[1]) ** 2)
rr = np.linspace(rmin / dtheta, rmax / dtheta, nbins + 1)
C = np.histogram(r, bins=rr)[0]
rbin = rr * dtheta
rmid = 0.5 * (rbin[1:] + rbin[:-1])
A = np.pi * (rbin[1:] ** 2 - rbin[:-1] ** 2)
Cerr = np.sqrt(C)
R = C / exp_time
Rerr = Cerr / exp_time
S = R / A
Serr = Rerr / A
col1 = fits.Column(name="RLO", format="D", unit="arcsec", array=rbin[:-1])
col2 = fits.Column(name="RHI", format="D", unit="arcsec", array=rbin[1:])
col3 = fits.Column(name="RMID", format="D", unit="arcsec", array=rmid)
col4 = fits.Column(name="AREA", format="D", unit="arcsec**2", array=A)
col5 = fits.Column(name="NET_COUNTS", format="D", unit="count", array=C)
col6 = fits.Column(name="NET_ERR", format="D", unit="count", array=Cerr)
col7 = fits.Column(name="NET_RATE", format="D", unit="count/s", array=R)
col8 = fits.Column(name="ERR_RATE", format="D", unit="count/s", array=Rerr)
col9 = fits.Column(name="SUR_BRI", format="D", unit="count/s/arcsec**2", array=S)
col10 = fits.Column(name="SUR_BRI_ERR", format="1D", unit="count/s/arcsec**2", array=Serr)
coldefs = [col1, col2, col3, col4, col5, col6, col7, col8, col9, col10]
if expmap_file is not None:
f = fits.open(expmap_file)
ehdu = f["EXPMAP"]
wexp = wcs.WCS(header=ehdu.header)
cel = w.all_pix2world(ctr[0], ctr[1], 1)
ectr = wexp.all_world2pix(cel[0], cel[1], 1)
exp = ehdu.data[:, :]
nx, ny = exp.shape
reblock = ehdu.header["CDELT2"] / orig_dx
x, y = np.mgrid[1 : nx + 1, 1 : ny + 1]
r = np.sqrt((x - ectr[0]) ** 2 + (y - ectr[1]) ** 2)
f.close()
E = np.histogram(r, bins=rr / reblock, weights=exp)[0] / np.histogram(r, bins=rr / reblock)[0]
with np.errstate(invalid="ignore", divide="ignore"):
F = R / E
Ferr = Rerr / E
SF = F / A
SFerr = Ferr / A
col11 = fits.Column(name="MEAN_SRC_EXP", format="D", unit="cm**2", array=E)
col12 = fits.Column(name="NET_FLUX", format="D", unit="count/s/cm**2", array=F)
col13 = fits.Column(name="NET_FLUX_ERR", format="D", unit="count/s/cm**2", array=Ferr)
col14 = fits.Column(name="SUR_FLUX", format="D", unit="count/s/cm**2/arcsec**2", array=SF)
col15 = fits.Column(name="SUR_FLUX_ERR", format="D", unit="count/s/cm**2/arcsec**2", array=SFerr)
coldefs += [col11, col12, col13, col14, col15]
tbhdu = fits.BinTableHDU.from_columns(fits.ColDefs(coldefs))
tbhdu.name = "PROFILE"
hdulist = fits.HDUList([fits.PrimaryHDU(), tbhdu])
hdulist.writeto(out_file, overwrite=overwrite)
[docs]
def fill_regions(in_img, out_img, src_reg, bkg_val, median=False, overwrite=False, format="ds9", prng=None):
"""
Fill regions which have been removed from an image (e.g. by
wavdetect) with a Poisson distribution of counts, either
from a single background region, a number of background regions
equal to the number of source regions, or from a single background
value.
Parameters
----------
in_img : string
The file path to the input image.
out_img : string
The file path to the image to be written with the filled
regions.
src_reg : string, Region, or Regions
The region(s) which will be filled.
bkg_val : float, string, Region, or Regions
The background value(s) to use for the Poisson distribution.
Can be a single value, a region, or a list of regions.
median : boolean
If True, then the median value of the counts within the
region will be used as the mean of the Poisson distribution
overwrite : boolean, optional
Whether to overwrite an existing file with
the same name. Default: False
format : string, optional
The file format specifier for the region. "ds9",
"crtf", "fits", etc. Default: "ds9"
"""
from regions import PixelRegion, Region, Regions, SkyRegion
prng = parse_prng(prng)
fill_op = np.median if median else np.mean
if isinstance(src_reg, str):
if Path(src_reg).exists():
src_reg = Regions.read(src_reg, format=format)
else:
src_reg = Regions.parse(src_reg, format=format)
elif not isinstance(src_reg, (Region, Regions)):
raise RuntimeError("'src_reg' argument is not valid!")
if isinstance(src_reg, Region):
src_reg = [src_reg]
if not isinstance(bkg_val, Number):
if isinstance(bkg_val, str):
if Path(bkg_val).exists():
bkg_val = Regions.read(bkg_val, format=format)
else:
bkg_val = Regions.parse(bkg_val, format=format)
elif not isinstance(bkg_val, (Region, Regions)):
raise RuntimeError("'bkg_val' argument is not valid!")
if isinstance(bkg_val, (Number, Region)):
bkg_val = [bkg_val] * len(src_reg)
if len(src_reg) != len(bkg_val):
raise ValueError(
"The number of background regions must either "
"be the same as the number of the source regions, "
"or there must be one background region!"
)
with fits.open(in_img) as f:
if f[0].is_image and f[0].header["NAXIS"] == 2:
hdu = f[0]
else:
hdu = f[1]
for src_r, bkg_v in zip(src_reg, bkg_val, strict=True):
if isinstance(src_r, PixelRegion):
src_mask = src_r.to_mask(mode="exact").astype("bool")
elif isinstance(src_r, SkyRegion):
w = wcs.WCS(header=hdu.header)
src_mask = src_r.to_pixel(w).to_mask(mode="exact")
else:
raise NotImplementedError
src_mask = src_mask.to_image(hdu.shape).astype("bool")
if isinstance(bkg_v, Number):
lam = bkg_v
else:
if isinstance(bkg_v, PixelRegion):
bkg_mask = bkg_v.to_mask().astype("bool")
elif isinstance(bkg_v, SkyRegion):
w = wcs.WCS(header=hdu.header)
bkg_mask = bkg_v.to_pixel(w).to_mask()
else:
raise NotImplementedError
bkg_mask = bkg_mask.to_image(hdu.shape).astype("bool")
lam = fill_op(hdu.data[bkg_mask])
n_src = src_mask.sum()
hdu.data[src_mask] = prng.poisson(lam=lam, size=n_src)
f.writeto(out_img, overwrite=overwrite)
[docs]
def merge_event_files(input_files, output_file, overwrite=False):
"""
Merge SOXS-generated event files together. The WCS used for the final
file will be based on the first source file, so the original celestial
coordinates for the other files will be discarded.
Parameters
----------
input_files : list of strings
The events file(s) containing the background events.
output_file : string
The merged events file.
overwrite : boolean, optional
Whether to overwrite an existing file with
the same name. Default: False
"""
with fits.open(input_files[0], memmap=True) as f:
wcs_out = wcs_from_header(f["EVENTS"].header)
shape_out = 2.0 * wcs_out.wcs.crpix - 1
_combine_events(input_files, wcs_out, shape_out, output_file, overwrite=overwrite)