"""
Functions and classes for generating and projecting lists of photons
"""
import h5py
import numpy as np
from soxs import __version__ as soxs_version
from soxs.constants import erg_per_keV
from soxs.utils import parse_prng
from tqdm.auto import tqdm
from unyt.array import unyt_array
from yt import __version__ as yt_version
from yt.utilities.cosmology import Cosmology
from yt.utilities.orientation import Orientation
from yt.utilities.parallel_tools.parallel_analysis_interface import (
communication_system,
parallel_objects,
)
from yt.utilities.physical_constants import clight
from pyxsim import __version__ as pyxsim_version
from pyxsim.lib.sky_functions import (
doppler_shift,
pixel_to_cel,
scatter_events,
scatter_events_allsky,
)
from pyxsim.spectral_models import absorb_models
from pyxsim.utils import mylog, parse_value
comm = communication_system.communicators[-1]
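# Initial allocation size for the resizable HDF5 datasets created below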
init_chunk = 100000
def determine_fields(ds, source_type, point_sources):
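"""
Determine the position, velocity, and width fields for the given
source type, depending on whether the dataset is particle-based or
grid-based and whether the sources are treated as point sources.
"""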
from yt.geometry.particle_geometry_handler import ParticleIndex
# Figure out if this is a particle field or otherwise
ptype = (
(source_type in ds.particle_types)
| (source_type in ds.known_filters)
| ((source_type == "gas") & isinstance(ds.index, ParticleIndex))
)
if ptype:
ppos = [f"particle_position_{ax}" for ax in "xyz"]
pvel = [f"particle_velocity_{ax}" for ax in "xyz"]
if source_type in ds.known_filters:
if ds.known_filters[source_type].filtered_type == "gas":
ppos = ["x", "y", "z"]
pvel = [f"velocity_{ax}" for ax in "xyz"]
elif source_type == "gas":
source_type = ds._sph_ptypes[0]
position_fields = [(source_type, ppos[i]) for i in range(3)]
velocity_fields = [(source_type, pvel[i]) for i in range(3)]
if source_type in ds._sph_ptypes:
width_field = (source_type, "smoothing_length")
else:
width_field = None
else:
position_fields = [("index", ax) for ax in "xyz"]
velocity_fields = [(source_type, f"velocity_{ax}") for ax in "xyz"]
width_field = ("index", "dx")
if point_sources:
width_field = None
return position_fields, velocity_fields, width_field
def find_object_bounds(data_source):
"""
This logic is required to determine the bounds of the object, which is
solely for fixing coordinates at periodic boundaries
"""
if hasattr(data_source, "base_object"):
# This is a cut region so we'll figure out
# its bounds from its parent object
data_src = data_source.base_object
else:
data_src = data_source
if hasattr(data_src, "left_edge"):
# Region or grid
c = 0.5 * (data_src.left_edge + data_src.right_edge)
w = data_src.right_edge - data_src.left_edge
le = -0.5 * w + c
re = 0.5 * w + c
elif hasattr(data_src, "radius") and not hasattr(data_src, "height"):
# Sphere
le = -data_src.radius + data_src.center
re = data_src.radius + data_src.center
else:
# Not sure what to do with any other object yet, so just
# return the domain edges and punt.
mylog.warning(
"You are using a region that is not currently "
"supported for straddling periodic boundaries. "
"Check to make sure that your region does not "
"do so."
)
le = data_source.ds.domain_left_edge
re = data_source.ds.domain_right_edge
return le.to_value("kpc"), re.to_value("kpc")
def make_photons(
photon_prefix,
data_source,
redshift,
area,
exp_time,
source_model,
point_sources=False,
parameters=None,
center=None,
dist=None,
cosmology=None,
velocity_fields=None,
bulk_velocity=None,
observer="external",
fields_to_keep=None,
):
r"""
Write a photon list dataset to disk from a yt data source, assuming a
source model for the photons. The redshift, collecting area, exposure
time, and cosmology are stored in the *parameters* dictionary which is
passed to the *source_model* function.

Parameters
----------
photon_prefix : string
The prefix of the filename(s) to contain the photon list. If run in
serial, the filename will be "{photon_prefix}.h5"; if run in
parallel, the filenames will be "{photon_prefix}.{mpi_rank}.h5".
data_source : :class:`~yt.data_objects.data_containers.YTSelectionContainer`
The data source from which the photons will be generated.
redshift : float
The cosmological redshift for the photons.
area : float, (value, unit) tuple, :class:`~yt.units.yt_array.YTQuantity`, or :class:`~astropy.units.Quantity`
The collecting area to determine the number of photons. If units are
not specified, it is assumed to be in cm^2.
exp_time : float, (value, unit) tuple, :class:`~yt.units.yt_array.YTQuantity`, or :class:`~astropy.units.Quantity`
The exposure time to determine the number of photons. If units are
not specified, it is assumed to be in seconds.
source_model : :class:`~pyxsim.source_models.sources.SourceModel`
A source model used to generate the photons.
point_sources : boolean, optional
If True, the photons will be assumed to be generated from the exact
positions of the cells or particles and not smeared around within
a volume. Default: False
parameters : dict, optional
A dictionary of parameters to be passed for the source model to use,
if necessary.
center : string or array_like, optional
The origin of the photon spatial coordinates. Accepts "c", "max", or
a coordinate. If array-like and without units, it is assumed to be in
units of kpc. If not specified, pyxsim attempts to use the "center"
field parameter of the data_source.
dist : float, (value, unit) tuple, :class:`~yt.units.yt_array.YTQuantity`, or :class:`~astropy.units.Quantity`, optional
The angular diameter distance, used for nearby sources. This may be
optionally supplied instead of it being determined from the
*redshift* and given *cosmology*. If units are not specified, it is
assumed to be in kpc. To use this, the redshift must be set to zero.
cosmology : :class:`~yt.utilities.cosmology.Cosmology`, optional
Cosmological information. If not supplied, we try to get the
cosmology from the dataset. Otherwise, LCDM with the default yt
parameters is assumed.
velocity_fields : list of fields
The yt fields to use for the velocity. If not specified, the
following will be assumed:
['velocity_x', 'velocity_y', 'velocity_z'] for grid datasets
['particle_velocity_x', 'particle_velocity_y', 'particle_velocity_z'] for particle datasets
bulk_velocity : array-like, optional
A 3-element array or list specifying the local velocity frame of
reference. If not a :class:`~yt.units.yt_array.YTArray`, it is assumed
to have units of km/s. Default: [0.0, 0.0, 0.0] km/s.
observer : string, optional
Controls whether the source is at a large distance, in which case
the observer sits far outside the source ("external"), or if the
observer is inside the X-ray-emitting source ("internal"), such as a
galaxy. Default: "external"
fields_to_keep : list of tuples
A list of fields to add to the HDF5 photon list in addition to the cell
or particle positions, velocities, and sizes. Default: None

Returns
-------
A tuple of two integers: the number of photons generated and the
number of cells/particles with photons.

Examples
--------
>>> thermal_model = pyxsim.CIESourceModel("apec", 0.1, 10.0, 1000, 0.3)
>>> redshift = 0.05
>>> area = 6000.0 # assumed here in cm**2
>>> exp_time = 2.0e5 # assumed here in seconds
>>> sp = ds.sphere("c", (500., "kpc"))
>>> fields_to_keep = [("gas", "density"), ("gas", "temperature")]
>>> n_photons, n_cells = pyxsim.make_photons(
...     "my_photons", sp, redshift, area, exp_time, thermal_model,
...     fields_to_keep=fields_to_keep
... )
"""
ds = data_source.ds
if photon_prefix.endswith(".h5"):
photon_prefix = photon_prefix[:-3]
if comm.size > 1:
photon_file = f"{photon_prefix}.{comm.rank:04d}.h5"
photon_files = [f"{photon_prefix}.{i:04d}.h5" for i in range(comm.size)]
else:
photon_file = f"{photon_prefix}.h5"
photon_files = [photon_file]
if parameters is None:
parameters = {}
if cosmology is None:
if hasattr(ds, "cosmology"):
cosmo = ds.cosmology
else:
cosmo = Cosmology()
else:
cosmo = cosmology
if observer == "external":
if dist is None:
if redshift <= 0.0:
msg = (
"If redshift <= 0.0, you must specify a distance to the "
"source using the 'dist' argument!"
)
mylog.error(msg)
raise ValueError(msg)
D_A = cosmo.angular_diameter_distance(0.0, redshift).to("Mpc")
else:
D_A = parse_value(dist, "kpc")
if redshift > 0.0:
mylog.warning(
"Redshift must be zero for nearby sources. "
"Resetting redshift to 0.0."
)
redshift = 0.0
else:
D_A = parse_value(0.0, "kpc")
if redshift > 0.0:
mylog.warning(
"Redshift must be zero for internal observers. "
"Resetting redshift to 0.0."
)
redshift = 0.0
if isinstance(center, str):
if center == "center" or center == "c":
parameters["center"] = ds.domain_center
elif center == "max" or center == "m":
parameters["center"] = ds.find_max("density")[-1]
elif isinstance(center, unyt_array):
parameters["center"] = center.in_units("code_length")
elif isinstance(center, tuple):
if center[0] == "min":
parameters["center"] = ds.find_min(center[1])[-1]
elif center[0] == "max":
parameters["center"] = ds.find_max(center[1])[-1]
else:
raise RuntimeError(f"Unknown center specification: {center}")
elif isinstance(center, (list, np.ndarray)):
parameters["center"] = ds.arr(center, "code_length")
elif center is None:
if hasattr(data_source, "left_edge"):
parameters["center"] = 0.5 * (
data_source.left_edge + data_source.right_edge
)
else:
parameters["center"] = data_source.get_field_parameter("center")
if bulk_velocity is None:
bulk_velocity = ds.arr([0.0] * 3, "km/s")
elif isinstance(bulk_velocity, unyt_array):
bulk_velocity = bulk_velocity.to("km/s")
elif isinstance(bulk_velocity, (list, np.ndarray)):
bulk_velocity = ds.arr(bulk_velocity, "km/s")
parameters["bulk_velocity"] = bulk_velocity
parameters["fid_exp_time"] = parse_value(exp_time, "s")
parameters["fid_area"] = parse_value(area, "cm**2")
parameters["fid_redshift"] = redshift
parameters["observer"] = observer
parameters["hubble"] = cosmo.hubble_constant
parameters["omega_matter"] = cosmo.omega_matter
parameters["omega_lambda"] = cosmo.omega_lambda
parameters["center"].convert_to_units("kpc")
parameters["fid_d_a"] = D_A
if observer == "external":
if redshift > 0.0:
mylog.info(
"Cosmology: h = %g, omega_matter = %g, omega_lambda = %g",
cosmo.hubble_constant,
cosmo.omega_matter,
cosmo.omega_lambda,
)
else:
mylog.info("Observing local source at distance %g.", D_A)
else:
mylog.info("The observer is internal to the source.")
local_exp_time = parameters["fid_exp_time"].to_value("s")
D_A = parameters["fid_d_a"].to_value("cm")
dist_fac = 1.0 / (4.0 * np.pi)
if observer == "external":
dist_fac /= D_A * D_A * (1.0 + redshift) ** 2
spectral_norm = parameters["fid_area"].v * local_exp_time * dist_fac
dw = ds.domain_width.to_value("kpc")
le, re = find_object_bounds(data_source)
c = parameters["center"].to_value("kpc")
source_model.setup_model("photons", data_source, redshift)
p_fields, v_fields, w_field = determine_fields(
ds, source_model.ftype, point_sources
)
if velocity_fields is not None:
v_fields = velocity_fields
fields_store = []
if fields_to_keep is not None:
for field in fields_to_keep:
fd = ds._get_field_info(field)
fields_store.append(fd.name)
if p_fields[0] == ("index", "x"):
parameters["data_type"] = "cells"
else:
parameters["data_type"] = "particles"
source_model.set_pv(p_fields, v_fields, le, re, dw, c, ds.periodicity, observer)
f = h5py.File(photon_file, "w")
# Info
info = f.create_group("info")
info.attrs["yt_version"] = yt_version
info.attrs["pyxsim_version"] = pyxsim_version
info.attrs["soxs_version"] = soxs_version
info.attrs["dataset"] = str(data_source.ds)
info.attrs["data_source"] = str(data_source)
info.attrs["source_model"] = repr(source_model)
info.attrs["filenames"] = photon_files
# Parameters
p = f.create_group("parameters")
p.create_dataset("fid_area", data=float(parameters["fid_area"]))
p.create_dataset("fid_exp_time", data=float(parameters["fid_exp_time"]))
p.create_dataset("fid_redshift", data=parameters["fid_redshift"])
p.create_dataset("hubble", data=parameters["hubble"])
p.create_dataset("omega_matter", data=parameters["omega_matter"])
p.create_dataset("omega_lambda", data=parameters["omega_lambda"])
p.create_dataset("fid_d_a", data=parameters["fid_d_a"].to_value("Mpc"))
p.create_dataset("data_type", data=parameters["data_type"])
p.create_dataset("observer", data=parameters["observer"])
p.create_dataset("center", data=parameters["center"].d)
p.create_dataset("bulk_velocity", data=parameters["bulk_velocity"].d)
p.create_dataset("velocity_fields", data=np.array(v_fields).astype("S"))
n_cells = 0
n_photons = 0
c_offset = 0
p_offset = 0
c_size = init_chunk
p_size = init_chunk
cell_fields = ["x", "y", "z", "vx", "vy", "vz", "num_photons", "dx"]
if len(fields_store) > 0:
for field in fields_store:
cell_fields.append(field[1])
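# Create resizable, chunked HDF5 datasets; these are grown by doubling
# as chunks are processed and trimmed to their final size at the end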
d = f.create_group("data")
for field in cell_fields + ["energy"]:
if field == "num_photons":
dtype = "int64"
else:
dtype = "float64"
d.create_dataset(
field,
data=np.zeros(init_chunk, dtype=dtype),
maxshape=(None,),
dtype=dtype,
chunks=True,
)
f.flush()
for chunk in parallel_objects(data_source.chunks([], "io")):
chunk_data = source_model.process_data("photons", chunk, spectral_norm)
if chunk_data is not None:
chunk_nc, number_of_photons, idxs, energies = chunk_data
chunk_nph = np.sum(number_of_photons)
if chunk_nph == 0:
continue
if c_size < n_cells + chunk_nc:
while chunk_nc + n_cells > c_size:
c_size *= 2
for field in cell_fields:
d[field].resize((c_size,))
if p_size < n_photons + chunk_nph:
while chunk_nph + n_photons > p_size:
p_size *= 2
d["energy"].resize((p_size,))
if w_field is None:
dx = 0.0
else:
dx = chunk[w_field][idxs].to_value("kpc")
for i, ax in enumerate("xyz"):
pos = chunk[p_fields[i]][idxs].to_value("kpc")
# Fix photon coordinates for regions crossing a periodic boundary
if ds.periodicity[i]:
tfl = pos + dx < le[i]
tfr = pos - dx > re[i]
pos[tfl] += dw[i]
pos[tfr] -= dw[i]
vel = chunk[v_fields[i]][idxs].to_value("km/s")
# Coordinates are centered
d[ax][c_offset : c_offset + chunk_nc] = pos - c[i]
# Velocities have the bulk velocity subtracted off
d[f"v{ax}"][c_offset : c_offset + chunk_nc] = vel - bulk_velocity.v[i]
d["num_photons"][c_offset : c_offset + chunk_nc] = number_of_photons
d["energy"][p_offset : p_offset + chunk_nph] = energies
if w_field is None:
d["dx"][c_offset : c_offset + chunk_nc] = 0.0
else:
d["dx"][c_offset : c_offset + chunk_nc] = dx
for field in fields_store:
d[field[1]][c_offset : c_offset + chunk_nc] = chunk[field][idxs]
n_cells += chunk_nc
n_photons += chunk_nph
c_offset = n_cells
p_offset = n_photons
f.flush()
if c_size > n_cells:
for field in cell_fields:
d[field].resize((n_cells,))
if p_size > n_photons:
d["energy"].resize((n_photons,))
f.close()
source_model.cleanup_model("photons")
all_nphotons = comm.mpi_allreduce(n_photons)
all_ncells = comm.mpi_allreduce(n_cells)
mylog.info("Finished generating photons.")
mylog.info("Number of photons generated: %d", all_nphotons)
mylog.info("Number of cells with photons: %d", all_ncells)
return all_nphotons, all_ncells
def _project_photons(
obs,
photon_prefix,
event_prefix,
normal,
sky_center,
absorb_model=None,
nH=None,
abund_table="angr",
no_shifting=False,
north_vector=None,
flat_sky=False,
sigma_pos=None,
kernel="top_hat",
save_los=False,
prng=None,
):
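"""
Internal worker shared by project_photons() and project_photons_allsky().
The *obs* argument selects the observer mode ("external" or "internal")
with which the photon file must have been generated.
"""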
from yt.funcs import ensure_numpy_array
prng = parse_prng(prng)
if photon_prefix.endswith(".h5"):
photon_prefix = photon_prefix[:-3]
if event_prefix.endswith(".h5"):
event_prefix = event_prefix[:-3]
if not isinstance(normal, str):
L = np.array(normal)
orient = Orientation(L, north_vector=north_vector)
x_hat = orient.unit_vectors[0]
y_hat = orient.unit_vectors[1]
z_hat = orient.unit_vectors[2]
north_vector = orient.north_vector
else:
x_hat = np.zeros(3)
y_hat = np.zeros(3)
z_hat = np.zeros(3)
north_vector = None
if comm.size > 1:
photon_file = f"{photon_prefix}.{comm.rank:04d}.h5"
event_file = f"{event_prefix}.{comm.rank:04d}.h5"
event_files = [f"{event_prefix}.{i:04d}.h5" for i in range(comm.size)]
else:
photon_file = f"{photon_prefix}.h5"
event_file = f"{event_prefix}.h5"
event_files = [event_file]
sky_center = ensure_numpy_array(sky_center)
scale_shift = -1.0 / clight.to_value("km/s")
scale_shift2 = scale_shift * scale_shift
if isinstance(absorb_model, str):
if absorb_model not in absorb_models:
raise KeyError(f"{absorb_model} is not a known absorption model!")
absorb_model = absorb_models[absorb_model]
if absorb_model is not None:
if nH is None:
raise RuntimeError(
"You specified an absorption model, but didn't "
"specify a value for nH!"
)
absorb_model = absorb_model(nH, abund_table=abund_table)
if comm.rank == 0:
mylog.info(
"Foreground galactic absorption: using the %s model and nH = %g.",
absorb_model._name,
nH,
)
abs_model_name = absorb_model._name if absorb_model else "none"
if nH is None:
nH = 0.0
f = h5py.File(photon_file, "r")
p = f["parameters"]
data_type = p["data_type"].asstr()[()]
if "observer" in p:
observer = p["observer"].asstr()[()]
else:
observer = "external"
if obs != observer:
which_func = {"external": "", "internal": "_allsky"}
raise RuntimeError(
f"The function 'project_photons{which_func['obs']}' "
f"does not work with '{observer}' photon lists!"
)
if observer == "internal" and isinstance(normal, str):
raise RuntimeError(
"Must specify a vector for 'normal' if you are "
"doing an 'internal' observation!"
)
if sigma_pos is not None and data_type == "particles":
raise RuntimeError(
"The 'smooth_positions' argument should "
"not be used with particle-based datasets!"
)
d = f["data"]
D_A = p["fid_d_a"][()] * 1.0e3 # assumes that the value from the file is in Mpc
if d["energy"].size == 0:
mylog.warning("No photons are in file %s, so I am done.", photon_file)
n_events = 0
else:
fe = h5py.File(event_file, "w")
ie = fe.create_group("info")
ie.attrs["pyxsim_version"] = pyxsim_version
ie.attrs["yt_version"] = yt_version
ie.attrs["soxs_version"] = soxs_version
ie.attrs["photon_file"] = photon_file
ie.attrs["filenames"] = event_files
exp_time = float(p["fid_exp_time"][()])
area = float(p["fid_area"][()])
pe = fe.create_group("parameters")
pe.create_dataset("exp_time", data=exp_time)
pe.create_dataset("area", data=area)
pe.create_dataset("sky_center", data=sky_center)
pe.create_dataset("observer", data=observer)
pe.create_dataset("no_shifting", data=int(no_shifting))
pe.create_dataset("flat_sky", data=int(flat_sky))
pe.create_dataset("normal", data=normal)
if north_vector is not None:
pe.create_dataset("north_vector", data=north_vector)
pe.create_dataset("absoption_model", data=abs_model_name)
if absorb_model is not None:
pe.create_dataset("nH", data=nH)
pe.create_dataset("abund_table", data=abund_table)
if sigma_pos is not None:
pe.create_dataset("sigma_pos", data=sigma_pos)
pe.create_dataset("kernel", data=kernel)
event_fields = ["xsky", "ysky", "eobs"]
if save_los:
event_fields.append("los")
n_events = 0
e_offset = 0
e_size = init_chunk
cell_chunk = init_chunk
start_e = 0
de = fe.create_group("data")
for field in event_fields:
de.create_dataset(
field, data=np.zeros(init_chunk), maxshape=(None,), chunks=True
)
if isinstance(normal, str):
norm = "xyz".index(normal)
else:
norm = normal
n_cells = d["num_photons"].size
pbar = tqdm(
leave=True, total=n_cells, desc="Projecting photons from cells/particles "
)
flux = 0.0
emin = 1.0e99
emax = -1.0e99
for start_c in range(0, n_cells, cell_chunk):
end_c = min(start_c + cell_chunk, n_cells)
n_ph = d["num_photons"][start_c:end_c]
x = d["x"][start_c:end_c]
y = d["y"][start_c:end_c]
z = d["z"][start_c:end_c]
dx = d["dx"][start_c:end_c]
end_e = start_e + n_ph.sum()
eobs = d["energy"][start_e:end_e]
if observer == "internal":
r = np.sqrt(x * x + y * y + z * z)
else:
r = None
if not no_shifting:
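# Compute the line-of-sight velocity component (vn) and the speed
# squared (v2); doppler_shift then adjusts the photon energies in place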
if observer == "internal":
vn = (
-(
d["vx"][start_c:end_c] * x
+ d["vy"][start_c:end_c] * y
+ d["vz"][start_c:end_c] * z
)
/ r
)
else:
if isinstance(normal, str):
vn = d[f"v{normal}"][start_c:end_c]
else:
vn = (
d["vx"][start_c:end_c] * z_hat[0]
+ d["vy"][start_c:end_c] * z_hat[1]
+ d["vz"][start_c:end_c] * z_hat[2]
)
v2 = (
d["vx"][start_c:end_c] * d["vx"][start_c:end_c]
+ d["vy"][start_c:end_c] * d["vy"][start_c:end_c]
+ d["vz"][start_c:end_c] * d["vz"][start_c:end_c]
)
doppler_shift(vn * scale_shift, v2 * scale_shift2, n_ph, eobs)
if absorb_model is None:
det = np.ones(eobs.size, dtype="bool")
num_det = np.int64(eobs.size)
else:
det = absorb_model.absorb_photons(eobs, prng=prng)
num_det = np.int64(det.sum())
if num_det > 0:
if observer == "external":
if data_type == "particles":
dx *= 0.5
xsky, ysky, los = scatter_events(
norm,
prng,
kernel,
data_type,
num_det,
det,
n_ph,
x,
y,
z,
dx,
x_hat,
y_hat,
z_hat,
)
if data_type == "cells" and sigma_pos is not None:
sigma = sigma_pos * np.repeat(dx, n_ph)[det]
xsky += sigma * prng.normal(loc=0.0, scale=1.0, size=num_det)
ysky += sigma * prng.normal(loc=0.0, scale=1.0, size=num_det)
xsky /= D_A
ysky /= D_A
if flat_sky:
xsky = sky_center[0] - np.rad2deg(xsky)
ysky = sky_center[1] + np.rad2deg(ysky)
else:
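# Convert the angular offsets to celestial (RA, Dec) coordinates in place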
pixel_to_cel(xsky, ysky, sky_center)
elif observer == "internal":
xsky, ysky, los = scatter_events_allsky(
data_type,
kernel,
prng,
num_det,
det,
n_ph,
x,
y,
z,
dx,
x_hat,
y_hat,
z_hat,
)
if e_size < n_events + num_det:
while n_events + num_det > e_size:
e_size *= 2
for field in event_fields:
de[field].resize((e_size,))
de["xsky"][e_offset : e_offset + num_det] = xsky
de["ysky"][e_offset : e_offset + num_det] = ysky
de["eobs"][e_offset : e_offset + num_det] = eobs[det]
flux += eobs[det].sum()
emin = min(eobs[det].min(), emin)
emax = max(eobs[det].max(), emax)
if save_los:
de["los"][e_offset : e_offset + num_det] = los
n_events += num_det
e_offset = n_events
fe.flush()
pbar.update(end_c - start_c)
start_e = end_e
pbar.close()
if e_size > n_events:
for field in event_fields:
de[field].resize((n_events,))
flux *= erg_per_keV / exp_time / area
pe.create_dataset("flux", data=flux)
pe.create_dataset("emin", data=emin)
pe.create_dataset("emax", data=emax)
fe.close()
f.close()
all_nevents = comm.mpi_allreduce(n_events)
mylog.info("Detected %d events.", all_nevents)
return all_nevents
def project_photons(
photon_prefix,
event_prefix,
normal,
sky_center,
absorb_model=None,
nH=None,
abund_table="angr",
no_shifting=False,
north_vector=None,
sigma_pos=None,
flat_sky=False,
kernel="top_hat",
save_los=False,
prng=None,
):
r"""
Projects photons onto an image plane given a line of sight, and
stores them in an HDF5 dataset which contains an event list.

Parameters
----------
photon_prefix : string
The prefix of the filename(s) containing the photon list. If run in
serial, the filename will be "{photon_prefix}.h5"; if run in
parallel, the filenames will be "{photon_prefix}.{mpi_rank}.h5".
event_prefix : string
The prefix of the filename(s) which will be written to contain the
event list. If run in serial, the filename will be "{event_prefix}.h5";
if run in parallel, the filenames will be "{event_prefix}.{mpi_rank}.h5".
normal : character or array-like
Normal vector to the plane of projection. If "x", "y", or "z", the
projection is assumed to be along that axis (and will probably be
faster). Otherwise, this should be an off-axis normal vector,
e.g. [1.0, 2.0, -3.0].
sky_center : array-like
Center RA, Dec of the events in degrees.
absorb_model : string
A model for foreground galactic absorption, to simulate the
absorption of events before being detected. Known options are
"wabs" and "tbabs".
nH : float, optional
The foreground column density in units of 10^22 cm^{-2}. Only used
if absorption is applied.
abund_table : string
The abundance table to be used for abundances in the
absorption model (only used for TBabs). Default is set in the SOXS
configuration file, the default for which is "angr".
Built-in options are:
"angr" : from Anders E. & Grevesse N. (1989, Geochimica et
Cosmochimica Acta 53, 197)
"aspl" : from Asplund M., Grevesse N., Sauval A.J. & Scott
P. (2009, ARAA, 47, 481)
"feld" : from Feldman U. (1992, Physica Scripta, 46, 202)
"wilm" : from Wilms, Allen & McCray (2000, ApJ 542, 914
except for elements not listed which are given zero abundance)
"lodd" : from Lodders, K (2003, ApJ 591, 1220)
"cl17.03" : the default abundance table in Cloudy 17.03
no_shifting : boolean, optional
If set, the photon energies will not be Doppler shifted. Default: False
north_vector : a sequence of floats
A vector defining the "up" direction. This option sets the
orientation of the plane of projection. If not set, an arbitrary
grid-aligned north_vector perpendicular to the normal is chosen.
Ignored in the case where a particular axis (e.g., "x", "y", or
"z") is explicitly specified.
sigma_pos : float, optional
Apply a gaussian smoothing operation to the sky positions of the
events. This may be useful when the binned events appear blocky due
to their uniform distribution within simulation cells. However, this
will move the events away from their originating position on the
sky, and so may distort surface brightness profiles and/or spectra.
Should probably only be used for visualization purposes. Supply a
float here to smooth with a standard deviation equal to this fraction
of the cell size. Default: None
flat_sky : boolean, optional
If set, we assume that the sky is "flat" and RA, Dec positions are
computed using simple linear offsets. Default: False
kernel : string, optional
The kernel used when smoothing positions of X-rays originating from
SPH particles, "gaussian" or "top_hat". Default: "top_hat".
save_los : boolean, optional
If True, save the line-of-sight positions along the projection axis in
units of kpc to the events list. Default: False
prng : integer or :class:`~numpy.random.RandomState` object
A pseudo-random number generator. Typically will only be specified
if you have a reason to generate the same set of random numbers,
such as for a test. Default is to use the :mod:`numpy.random`
module.

Returns
-------
An integer: the number of events created.

Examples
--------
>>> L = np.array([0.1,-0.2,0.3])
>>> n_events = pyxsim.project_photons("my_photons.h5", "my_events.h5", L,
... [30., 45.], absorb_model='tbabs',
... nH=0.04)
"""
return _project_photons(
"external",
photon_prefix,
event_prefix,
normal,
sky_center,
absorb_model=absorb_model,
nH=nH,
abund_table=abund_table,
no_shifting=no_shifting,
north_vector=north_vector,
sigma_pos=sigma_pos,
flat_sky=flat_sky,
kernel=kernel,
save_los=save_los,
prng=prng,
)
def project_photons_allsky(
photon_prefix,
event_prefix,
normal,
absorb_model=None,
nH=None,
abund_table="angr",
no_shifting=False,
center_vector=None,
kernel="top_hat",
save_los=False,
prng=None,
):
r"""
Projects photons onto the sky sphere given a normal vector ("z" or "up" in
spherical coordinates), and stores them in an HDF5 dataset which contains
an event list.

Parameters
----------
photon_prefix : string
The prefix of the filename(s) containing the photon list. If run in
serial, the filename will be "{photon_prefix}.h5"; if run in
parallel, the filenames will be "{photon_prefix}.{mpi_rank}.h5".
event_prefix : string
The prefix of the filename(s) which will be written to contain the
event list. If run in serial, the filename will be "{event_prefix}.h5";
if run in parallel, the filenames will be "{event_prefix}.{mpi_rank}.h5".
normal : array-like
The vector determining the "z" or "up" vector for the spherical coordinate
system for the all-sky projection, something like [1.0, 2.0, -3.0]. It
will be normalized before use.
absorb_model : string
A model for foreground galactic absorption, to simulate the
absorption of events before being detected. Known options are "wabs"
and "tbabs".
nH : float, optional
The foreground column density in units of 10^22 cm^{-2}. Only used
if absorption is applied.
abund_table : string
The abundance table to be used for abundances in the
absorption model (only used for TBabs). Default is set in the SOXS
configuration file, the default for which is "angr".
Built-in options are:
"angr" : from Anders E. & Grevesse N. (1989, Geochimica et
Cosmochimica Acta 53, 197)
"aspl" : from Asplund M., Grevesse N., Sauval A.J. & Scott
P. (2009, ARAA, 47, 481)
"feld" : from Feldman U. (1992, Physica Scripta, 46, 202)
"wilm" : from Wilms, Allen & McCray (2000, ApJ 542, 914
except for elements not listed which are given zero abundance)
"lodd" : from Lodders, K (2003, ApJ 591, 1220)
"cl17.03" : the default abundance table in Cloudy 17.03
no_shifting : boolean, optional
If set, the photon energies will not be Doppler shifted. Default: False
center_vector : a sequence of floats
A vector defining what direction will be placed at the center of
the lat/lon coordinate system. If not set, an arbitrary
grid-aligned center_vector perpendicular to the normal is chosen.
kernel : string, optional
The kernel used when smoothing positions of X-rays originating from
SPH particles, "gaussian" or "top_hat". Default: "top_hat".
save_los : boolean, optional
If True, save the line-of-sight radii in units of kpc to the events
file. Default: False
prng : integer or :class:`~numpy.random.RandomState` object
A pseudo-random number generator. Typically will only be specified
if you have a reason to generate the same set of random numbers,
such as for a test. Default is to use the :mod:`numpy.random`
module.

Returns
-------
An integer: the number of events created.

Examples
--------
>>> L = np.array([0.1,-0.2,0.3])
>>> n_events = pyxsim.project_photons_allsky("my_photons.h5", "my_events.h5", L)
"""
return _project_photons(
"internal",
photon_prefix,
event_prefix,
normal,
[0.0, 0.0],
absorb_model=absorb_model,
nH=nH,
abund_table=abund_table,
no_shifting=no_shifting,
kernel=kernel,
save_los=save_los,
north_vector=center_vector,
prng=prng,
)
class PhotonList:
def __init__(self, filespec):
"""
Read a PhotonList from disk to get information about it
or to export to other formats.

Parameters
----------
filespec : str or list of str
A filename, a list of filenames, or a globbable path that
yields one or more HDF5 files containing photon data.
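
Examples
--------
A minimal sketch, assuming photon files written by
make_photons() with the prefix "my_photons":
>>> photons = pyxsim.PhotonList("my_photons.h5")
>>> print(photons.tot_num_photons)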
"""
from glob import glob
if filespec.endswith(".h5"):
filenames = glob(filespec)
elif isinstance(filespec, list):
if not np.all([fn.endswith(".h5") for fn in filespec]):
raise RuntimeError("Not all filenames are valid!")
filenames = filespec
else:
raise RuntimeError(f"Invalid PhotonList file spec: {filespec}")
self.filenames = filenames
self.filenames.sort()
self.parameters = {}
self.info = {}
self.num_photons = []
for i, fn in enumerate(self.filenames):
with h5py.File(fn, "r") as f:
p = f["parameters"]
info = f["info"]
self.num_photons.append(f["data"]["energy"].size)
if i == 0:
for field in p:
if isinstance(p[field][()], (str, bytes)):
self.parameters[field] = p[field].asstr()[()]
else:
self.parameters[field] = p[field][()]
for k, v in info.attrs.items():
self.info[k] = v
self.tot_num_photons = np.sum(self.num_photons)
self.observer = self.parameters.get("observer", "external")
def write_spectrum(self, specfile, emin, emax, nchan, overwrite=False):
"""
Bin photon energies into a spectrum and write it to a FITS binary
table. This is for an *unconvolved* spectrum.

Parameters
----------
specfile : string
The name of the FITS file to be written.
emin : float
The minimum energy of the spectral bins in keV.
emax : float
The maximum energy of the spectral bins in keV.
nchan : integer
The number of channels.
overwrite : boolean, optional
Set to True to overwrite a previous file.
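
Examples
--------
A minimal sketch, assuming a PhotonList ``photons`` read from disk:
>>> photons.write_spectrum("my_spec.fits", 0.1, 10.0, 1000,
...                        overwrite=True)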
"""
from astropy.io import fits
spec = np.zeros(nchan)
ebins = np.linspace(emin, emax, nchan + 1, endpoint=True)
emid = 0.5 * (ebins[1:] + ebins[:-1])
for fn in self.filenames:
with h5py.File(fn, "r") as f:
d = f["data"]
spec += np.histogram(d["energy"][:], bins=ebins)[0]
col1 = fits.Column(
name="CHANNEL", format="1J", array=np.arange(nchan).astype("int32") + 1
)
col2 = fits.Column(name="ENERGY", format="1D", array=emid.astype("float64"))
col3 = fits.Column(name="COUNTS", format="1J", array=spec.astype("int32"))
col4 = fits.Column(
name="COUNT_RATE", format="1D", array=spec / self.parameters["fid_exp_time"]
)
coldefs = fits.ColDefs([col1, col2, col3, col4])
tbhdu = fits.BinTableHDU.from_columns(coldefs)
tbhdu.name = "SPECTRUM"
tbhdu.header["DETCHANS"] = spec.shape[0]
tbhdu.header["TOTCTS"] = spec.sum()
tbhdu.header["EXPOSURE"] = self.parameters["fid_exp_time"]
tbhdu.header["LIVETIME"] = self.parameters["fid_exp_time"]
tbhdu.header["CONTENT"] = "pi"
tbhdu.header["HDUCLASS"] = "OGIP"
tbhdu.header["HDUCLAS1"] = "SPECTRUM"
tbhdu.header["HDUCLAS2"] = "TOTAL"
tbhdu.header["HDUCLAS3"] = "TYPE:I"
tbhdu.header["HDUCLAS4"] = "COUNT"
tbhdu.header["HDUVERS"] = "1.1.0"
tbhdu.header["HDUVERS1"] = "1.1.0"
tbhdu.header["CHANTYPE"] = "pi"
tbhdu.header["BACKFILE"] = "none"
tbhdu.header["CORRFILE"] = "none"
tbhdu.header["POISSERR"] = True
tbhdu.header["RESPFILE"] = "none"
tbhdu.header["ANCRFILE"] = "none"
tbhdu.header["MISSION"] = "none"
tbhdu.header["TELESCOP"] = "none"
tbhdu.header["INSTRUME"] = "none"
tbhdu.header["AREASCAL"] = 1.0
tbhdu.header["CORRSCAL"] = 0.0
tbhdu.header["BACKSCAL"] = 1.0
hdulist = fits.HDUList([fits.PrimaryHDU(), tbhdu])
hdulist.writeto(specfile, overwrite=overwrite)