Source code for soxs.utils

import logging
import os
import warnings
from configparser import ConfigParser

import appdirs
import astropy.units as u
import numpy as np
import regions
from astropy.units import Quantity
from numpy.random import Generator, RandomState, default_rng
from scipy.interpolate import interp1d

# Configuration

soxs_cfg_defaults = {
    "soxs_data_dir": "/does/not/exist",
    "soxs_answer_dir": "/does/not/exist",
    "abund_table": "angr",
    "apec_vers": "3.0.9",
    "spex_vers": "3.07.03",
    "bkgnd_nH": 0.018,
    "bkgnd_absorb_model": "tbabs",
    "frgnd_spec_model": "default",
    "frgnd_velocity": 0.0,
    "frgnd_abund": 1.0,
}

config_root = os.environ.get(
    "XDG_CONFIG_HOME", os.path.join(os.path.expanduser("~"), ".config")
)
CONFIG_DIR = os.path.join(config_root, "soxs")

if not os.path.exists(CONFIG_DIR):
    try:
        os.makedirs(CONFIG_DIR)
    except OSError:
        warnings.warn("unable to create soxs config directory")

CURRENT_CONFIG_FILE = os.path.join(CONFIG_DIR, "soxs.cfg")

if not os.path.exists(CURRENT_CONFIG_FILE):
    cp = ConfigParser()
    cp.add_section("soxs")
    try:
        with open(CURRENT_CONFIG_FILE, "w") as new_cfg:
            cp.write(new_cfg)
    except IOError:
        warnings.warn("unable to write new config file")


soxs_cfg = ConfigParser(soxs_cfg_defaults)
soxs_cfg.read([CURRENT_CONFIG_FILE, "soxs.cfg"])
if not soxs_cfg.has_section("soxs"):
    soxs_cfg.add_section("soxs")

# Logging

soxsLogger = logging.getLogger("soxs")

ufstring = "%(name)-3s : [%(levelname)-9s] %(asctime)s %(message)s"
cfstring = "%(name)-3s : [%(levelname)-18s] %(asctime)s %(message)s"

soxs_sh = logging.StreamHandler()
# create formatter and add it to the handlers
formatter = logging.Formatter(ufstring)
soxs_sh.setFormatter(formatter)
# add the handler to the logger
soxsLogger.addHandler(soxs_sh)
soxsLogger.setLevel("INFO")
soxsLogger.propagate = False

mylog = soxsLogger

mylog.setLevel("INFO")

if soxs_cfg.get("soxs", "soxs_data_dir") == "/does/not/exist":
    soxs_data_dir = appdirs.user_cache_dir("soxs")
    mylog.warning(
        "Setting 'soxs_data_dir' to %s for this session. "
        "Please update your configuration if you want it somewhere else.",
        soxs_data_dir,
    )
    soxs_cfg.set("soxs", "soxs_data_dir", appdirs.user_cache_dir("soxs"))


def issue_deprecation_warning(msg):
    import warnings

    from numpy import VisibleDeprecationWarning

    warnings.warn(msg, VisibleDeprecationWarning, stacklevel=3)


soxs_path = os.path.abspath(os.path.dirname(__file__))
soxs_files_path = os.path.join(soxs_path, "files")


def parse_prng(prng):
    if isinstance(prng, (RandomState, Generator)):
        return prng
    else:
        return default_rng(seed=prng)


def iterable(obj):
    """
    Grabbed from Python Cookbook / matplotlib.cbook.
    Returns true/false for *obj* iterable.
    """
    try:
        len(obj)
    except TypeError:
        return False
    return True


def ensure_list(obj):
    """
    This function ensures that *obj* is a list.  Typically used to convert a
    string to a list, for instance ensuring the *fields* as an argument is a
    list.
    """
    if obj is None:
        return [obj]
    if not isinstance(obj, list):
        return [obj]
    return obj


def ensure_numpy_array(obj):
    """
    This function ensures that *obj* is a numpy array.
    Typically used to convert scalar, list or tuple
    argument passed to functions using Cython.
    """
    if isinstance(obj, np.ndarray):
        if obj.shape == ():
            return np.array([obj])
        # We cast to ndarray to catch ndarray subclasses
        return np.array(obj)
    elif isinstance(obj, (list, tuple)):
        return np.asarray(obj)
    else:
        return np.asarray([obj])


def parse_value(value, default_units, equivalence=None):
    if isinstance(value, str):
        v = value.split(",")
        if len(v) == 2:
            value = (float(v[0]), v[1])
        else:
            value = float(v[0])
    if hasattr(value, "to_astropy"):
        value = value.to_astropy()
    if isinstance(value, Quantity):
        q = Quantity(value.value, value.unit)
    elif iterable(value):
        q = Quantity(value[0], value[1])
    else:
        q = Quantity(value, default_units)
    return q.to(default_units, equivalencies=equivalence).value


def get_rot_mat(roll_angle):
    roll_angle = np.deg2rad(roll_angle)
    rot_mat = np.array(
        [
            [np.cos(roll_angle), -np.sin(roll_angle)],
            [np.sin(roll_angle), np.cos(roll_angle)],
        ]
    )
    return rot_mat


def downsample(myarr, factor, estimator=np.mean):
    """
    Downsample a 2D array by averaging over *factor* pixels in each axis.
    Crops upper edge if the shape is not a multiple of factor.

    This code is pure numpy and should be fast.

    keywords:
        estimator - default to mean.  You can downsample by summing or
            something else if you want a different estimator
            (e.g., downsampling error: you want to sum & divide by sqrt(n))
    """
    ys, xs = myarr.shape
    crarr = myarr[: ys - (ys % int(factor)), : xs - (xs % int(factor))]
    dsarr = estimator(
        np.concatenate(
            [
                [crarr[i::factor, j::factor] for i in range(factor)]
                for j in range(factor)
            ]
        ),
        axis=0,
    )
    return dsarr


def line_width_equiv(rest):
    from astropy.constants import c

    ckms = c.to_value("km/s")

    def forward(x):
        return rest * x / ckms

    def backward(x):
        return x / rest * ckms

    return [(u.km / u.s, u.keV, forward, backward)]


class DummyPbar:
    def __init__(self):
        pass

    def update(self, *args, **kwargs):
        pass

    def close(self):
        pass


def create_region(rtype, args, dx, dy):
    if rtype in ["Rectangle", "Box"]:
        xctr, yctr, xw, yw = args
        x = xctr + dx
        y = yctr + dy
        center = regions.PixCoord(x=x, y=y)
        reg = regions.RectanglePixelRegion(center=center, width=xw, height=yw)
        bounds = [x - 0.5 * xw, x + 0.5 * xw, y - 0.5 * yw, y + 0.5 * yw]
    elif rtype == "Circle":
        xctr, yctr, radius = args
        x = xctr + dx
        y = yctr + dy
        center = regions.PixCoord(x=x, y=y)
        reg = regions.CirclePixelRegion(center=center, radius=radius)
        bounds = [x - radius, x + radius, y - radius, y + radius]
    elif rtype == "Polygon":
        x = np.array(args[0]) + dx
        y = np.array(args[1]) + dy
        vertices = regions.PixCoord(x=x, y=y)
        reg = regions.PolygonPixelRegion(vertices=vertices)
        bounds = [x.min(), x.max(), y.min(), y.max()]
    else:
        raise NotImplementedError
    return reg, bounds


def process_fits_string(fitsstr):
    import re

    from astropy.io import fits

    fn = fitsstr.split("[")[0]
    brackets = re.findall(r"[^[]*\[([^]]*)\]", fitsstr)
    with fits.open(fn) as f:
        if len(brackets) == 0:
            imgs = np.array([hdu.is_image for hdu in f])
            if imgs.sum() > 1:
                raise IOError("Multiple HDUs in this file, please specify one to read!")
            ext = np.where(imgs)[0][0]
        else:
            ext = brackets[0]
            if ext.isdigit():
                ext = int(ext)
        imhdu = f[ext].copy()
    return imhdu


class PoochHandle:
    r"""
    Container for a pooch object used to fetch remote response that isn't
    already stored locally.
    """

    def __init__(self, cache_dir=None):
        import json

        import pooch

        if cache_dir is None:
            if os.path.isdir(soxs_cfg.get("soxs", "soxs_data_dir")):
                cache_dir = soxs_cfg.get("soxs", "soxs_data_dir")
            else:
                cache_dir = pooch.os_cache("soxs")
        with open(os.path.join(soxs_path, "file_hash_registry.json")) as f:
            self._registry = json.load(f)
        self.pooch_obj = pooch.create(
            path=cache_dir,
            registry=self._registry,
            env="SOXS_DATA_DIR",
            base_url="https://www.jzuhone.com/soxs_data/",
        )
        self.dl = pooch.HTTPDownloader(progressbar=True)

    def fetch(self, fname):
        return self.pooch_obj.fetch(fname, downloader=self.dl)


finley = PoochHandle()


def get_data_file(fn):
    rel_fn = os.path.split(fn)[-1]
    if os.path.exists(rel_fn):
        mylog.warning(
            "Using local file %s instead of the one from the database.", rel_fn
        )
        return fn
    else:
        return finley.fetch(rel_fn)


def image_pos(im, nph, prng):
    im[im < 0.0] = 0.0
    im = im / im.sum()
    idxs = prng.choice(im.size, size=nph, p=im.flatten())
    x, y = np.unravel_index(idxs, im.shape)
    dx = prng.uniform(low=0.5, high=1.5, size=x.size)
    dy = prng.uniform(low=0.5, high=1.5, size=y.size)
    return x + dx, y + dy


[docs] def set_soxs_config(option, value): """ Set SOXS configuration values. Parameters ---------- option : string The option to change. value : number or string The value to set the option to. """ soxs_cfg.set("soxs", option, value=str(value))
[docs] def set_mission_config(mission): """ Set configuration options most appropriate for a specific *mission*. Currently only takes "lem". """ if mission == "lem": frgnd_spec_model = "halosat" bkgnd_absorb_model = "tbabs" frgnd_velocity = 100.0 # km/s set_soxs_config("frgnd_spec_model", frgnd_spec_model) set_soxs_config("bkgnd_absorb_model", bkgnd_absorb_model) set_soxs_config("frgnd_velocity", frgnd_velocity) else: raise RuntimeError(f"Mission '{mission}' is not implemented!")
def regrid_spectrum(ebins_new, ebins, spec, clip_neg=True): cspec = np.insert(np.cumsum(spec, axis=-1), 0, 0.0, axis=-1) f = interp1d( ebins, cspec, axis=-1, fill_value=0.0, bounds_error=False, assume_sorted=True, copy=False, ) new_spec = np.diff(f(ebins_new), axis=-1) if clip_neg: new_spec.clip(0.0, None, out=new_spec) return new_spec def convert_endian(arr): import sys sys_byteorder = (">", "<")[sys.byteorder == "little"] if arr.dtype.byteorder not in ("=", sys_byteorder): return arr.byteswap().newbyteorder(sys_byteorder)