# Licensed under a 3-clause BSD style license - see LICENSE.rst
import ast
import warnings
import astropy.units as u
import numpy as np
from astropy import log
from astropy.table import QTable, Table
from .extern.validator import validate_array, validate_scalar
__all__ = [
"generate_energy_edges",
"sed_conversion",
"build_data_table",
"estimate_B",
]
# Input validation tools
def validate_column(data_table, key, pt, domain="positive"):
try:
column = data_table[key]
array = validate_array(
key,
u.Quantity(column, unit=column.unit),
physical_type=pt,
domain=domain,
)
except KeyError:
raise TypeError(
'Data table does not contain required column "{0}"'.format(key)
)
return array
def validate_data_table(data_table, sed=None):
"""
Validate all columns of a data table. If a list of tables is passed, all
tables will be validated and then concatenated
Parameters
----------
data_table : `astropy.table.Table` or list of `astropy.table.Table`.
sed : bool, optional
Whether to convert the fluxes to SED. If unset, all data tables are
converted to the format of the first data table.
"""
if isinstance(data_table, Table) or isinstance(data_table, QTable):
data_table = [data_table]
try:
for dt in data_table:
if not isinstance(dt, Table) and not isinstance(dt, QTable):
raise TypeError(
"An object passed as data_table is not an astropy Table!"
)
except TypeError:
raise TypeError(
"Argument passed to validate_data_table is not a table and "
"not a list"
)
def dt_sed_conversion(dt, sed):
f_unit, sedf = sed_conversion(dt["energy"], dt["flux"].unit, sed)
# roundtrip to Table to change the units
t = Table(dt)
for col in ["flux", "flux_error_lo", "flux_error_hi"]:
t[col].unit = f_unit
ndt = QTable(t)
ndt["flux"] = (dt["flux"] * sedf).to(f_unit)
ndt["flux_error_lo"] = (dt["flux_error_lo"] * sedf).to(f_unit)
ndt["flux_error_hi"] = (dt["flux_error_hi"] * sedf).to(f_unit)
return ndt
data_list = []
for group, dt in enumerate(data_table):
dt_val = _validate_single_data_table(dt, group=group)
data_list.append(dt_val)
# concatenate input data tables
data_new = data_list[0].copy()
f_pt = data_new["flux"].unit.physical_type
if sed is None:
sed = f_pt in ["flux", "power"]
data_new = dt_sed_conversion(data_new, sed)
for dt in data_list[1:]:
nf_pt = dt["flux"].unit.physical_type
if ("flux" in nf_pt and "power" in f_pt) or (
"power" in nf_pt and "flux" in f_pt
):
raise TypeError(
"The physical types of the data tables could not be "
"matched: Some are in flux and others in luminosity units"
)
dt = dt_sed_conversion(dt, sed)
for row in dt:
data_new.add_row(row)
return data_new
def _validate_single_data_table(data_table, group=0):
data = QTable()
flux_types = ["flux", "differential flux", "power", "differential power"]
# Energy and flux arrays
data["energy"] = validate_column(data_table, "energy", "energy")
data["flux"] = validate_column(data_table, "flux", flux_types)
# Flux uncertainties
if "flux_error" in data_table.keys():
dflux = validate_column(data_table, "flux_error", flux_types)
data["flux_error_lo"] = dflux
data["flux_error_hi"] = dflux
elif (
"flux_error_lo" in data_table.keys()
and "flux_error_hi" in data_table.keys()
):
data["flux_error_lo"] = validate_column(
data_table, "flux_error_lo", flux_types
)
data["flux_error_hi"] = validate_column(
data_table, "flux_error_hi", flux_types
)
else:
raise TypeError(
"Data table does not contain required column"
' "flux_error" or columns "flux_error_lo"'
' and "flux_error_hi"'
)
if "group" in data_table.colnames:
# avoid overwriting groups
data["group"] = data_table["group"]
else:
data["group"] = [group] * len(data["energy"])
# Energy bin edges
if "energy_width" in data_table.keys():
energy_width = validate_column(data_table, "energy_width", "energy")
data["energy_error_lo"] = energy_width / 2.0
data["energy_error_hi"] = energy_width / 2.0
elif "energy_error" in data_table.keys():
energy_error = validate_column(data_table, "energy_error", "energy")
data["energy_error_lo"] = energy_error
data["energy_error_hi"] = energy_error
elif (
"energy_error_lo" in data_table.keys()
and "energy_error_hi" in data_table.keys()
):
energy_error_lo = validate_column(
data_table, "energy_error_lo", "energy"
)
data["energy_error_lo"] = energy_error_lo
energy_error_hi = validate_column(
data_table, "energy_error_hi", "energy"
)
data["energy_error_hi"] = energy_error_hi
elif "energy_lo" in data_table.keys() and "energy_hi" in data_table.keys():
energy_lo = validate_column(data_table, "energy_lo", "energy")
data["energy_error_lo"] = data["energy"] - energy_lo
energy_hi = validate_column(data_table, "energy_hi", "energy")
data["energy_error_hi"] = energy_hi - data["energy"]
else:
(
data["energy_error_lo"],
data["energy_error_hi"],
) = generate_energy_edges(data["energy"], groups=data["group"])
# Upper limit flags
if "ul" in data_table.keys():
# Check if it is a integer or boolean flag
ul_col = data_table["ul"]
if ul_col.dtype.type is np.int_ or ul_col.dtype.type is np.bool_:
data["ul"] = np.array(ul_col, dtype=bool)
elif ul_col.dtype.type is np.str_:
strbool = True
for ul in ul_col:
if ul != "True" and ul != "False":
strbool = False
if strbool:
data["ul"] = np.array(
[ast.literal_eval(ul) for ul in ul_col], dtype=bool
)
else:
raise TypeError("UL column is in wrong format")
else:
raise TypeError("UL column is in wrong format")
else:
data["ul"] = np.array([False] * len(data["energy"]))
if "flux_ul" in data_table.keys():
data["flux"][data["ul"]] = u.Quantity(data_table["flux_ul"])[
data["ul"]
]
HAS_CL = False
if "keywords" in data_table.meta.keys():
if "cl" in data_table.meta["keywords"].keys():
HAS_CL = True
CL = validate_scalar(
"cl", data_table.meta["keywords"]["cl"]["value"]
)
data["cl"] = [CL] * len(data["energy"])
if not HAS_CL:
data["cl"] = [0.9] * len(data["energy"])
if np.sum(data["ul"]) > 0:
log.warning(
'"cl" keyword not provided in input data table, upper limits'
" will be assumed to be at 90% confidence level"
)
return data
# Convenience tools
[docs]def sed_conversion(energy, model_unit, sed):
"""
Manage conversion between differential spectrum and SED
"""
model_pt = model_unit.physical_type
is_integral = model_pt in {
u.get_physical_type("power"),
u.get_physical_type("energy"),
u.get_physical_type("energy flux"),
}
is_differential = model_pt in {
u.get_physical_type("differential flux"),
u.get_physical_type("differential power"),
u.get_physical_type("differential energy"),
u.get_physical_type("differential number density"),
}
ones = np.ones(energy.shape)
if (sed and is_integral) or (not sed and is_differential):
sedf = ones
elif sed and is_differential:
sedf = energy ** 2
elif not sed and is_integral:
sedf = 1 / (energy ** 2)
else:
raise u.UnitsError(
"Model physical type ({0}) is not supported".format(model_pt),
"Supported physical types are: power, flux, differential"
" power, differential flux",
)
is_energy_flux = model_pt in {
u.get_physical_type("energy"),
u.get_physical_type("differential energy"),
}
is_particle_flux = model_pt in {
u.get_physical_type("energy flux"),
u.get_physical_type("differential flux"),
}
if sed:
if is_energy_flux:
f_unit = u.erg
elif is_particle_flux:
f_unit = u.erg / u.s / u.cm ** 2
else:
f_unit = u.erg / u.s
else:
if is_energy_flux:
f_unit = u.Unit("1/TeV")
elif is_particle_flux:
f_unit = u.Unit("1/(s TeV cm2)")
else:
f_unit = u.Unit("1/(s TeV)")
log.debug(
"Converted from {0} ({1}) into {2} ({3}) for sed={4}".format(
model_unit, model_pt, f_unit, f_unit.physical_type, sed
)
)
return f_unit, sedf
def trapz_loglog(y, x, axis=-1, intervals=False):
"""
Integrate along the given axis using the composite trapezoidal rule in
loglog space.
Integrate `y` (`x`) along given axis in loglog space.
Parameters
----------
y : array_like
Input array to integrate.
x : array_like, optional
Independent variable to integrate over.
axis : int, optional
Specify the axis.
Returns
-------
trapz : float
Definite integral as approximated by trapezoidal rule in loglog space.
"""
try:
y_unit = y.unit
y = y.value
except AttributeError:
y_unit = 1.0
try:
x_unit = x.unit
x = x.value
except AttributeError:
x_unit = 1.0
y = np.asanyarray(y)
x = np.asanyarray(x)
slice1 = [slice(None)] * y.ndim
slice2 = [slice(None)] * y.ndim
slice1[axis] = slice(None, -1)
slice2[axis] = slice(1, None)
slice1 = tuple(slice1)
slice2 = tuple(slice2)
if x.ndim == 1:
shape = [1] * y.ndim
shape[axis] = x.shape[0]
x = x.reshape(shape)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# Compute the power law indices in each integration bin
b = np.log10(y[slice2] / y[slice1]) / np.log10(x[slice2] / x[slice1])
# if local powerlaw index is -1, use \int 1/x = log(x); otherwise use
# normal powerlaw integration
trapzs = np.where(
np.abs(b + 1.0) > 1e-10,
(
y[slice1]
* (x[slice2] * (x[slice2] / x[slice1]) ** b - x[slice1])
)
/ (b + 1),
x[slice1] * y[slice1] * np.log(x[slice2] / x[slice1]),
)
tozero = (y[slice1] == 0.0) + (y[slice2] == 0.0) + (x[slice1] == x[slice2])
trapzs[tozero] = 0.0
if intervals:
return trapzs * x_unit * y_unit
ret = np.add.reduce(trapzs, axis) * x_unit * y_unit
return ret
def _generate_energy_edges_single(ene):
"""Generate energy edges for single group"""
midene = np.sqrt((ene[1:] * ene[:-1]))
elo, ehi = np.zeros(len(ene)) * ene.unit, np.zeros(len(ene)) * ene.unit
elo[1:] = ene[1:] - midene
ehi[:-1] = midene - ene[:-1]
elo[0] = ene[0] * (1 - ene[0] / (ene[0] + ehi[0]))
ehi[-1] = elo[-1]
return u.Quantity([elo, ehi])
[docs]def generate_energy_edges(ene, groups=None):
"""Generate energy bin edges from given energy array.
Generate an array of energy edges from given energy array to be used as
abcissa error bar limits when no energy uncertainty or energy band is
provided.
Parameters
----------
ene : `astropy.units.Quantity` array instance
1-D array of energies with associated phsyical units.
Returns
-------
energy_err_lo, energy_error_hi : `astropy.units.Quantity` arrays
Arrays of low and high energy edges corresponding to each given energy
of the input array.
"""
if groups is None or len(ene) != len(groups):
return _generate_energy_edges_single(ene)
else:
eloehi = np.zeros((2, len(ene))) * ene.unit
for g in np.unique(groups):
group_edges = _generate_energy_edges_single(ene[groups == g])
eloehi[:, groups == g] = group_edges
# hstack throws away units
return eloehi
[docs]def build_data_table(
energy,
flux,
flux_error=None,
flux_error_lo=None,
flux_error_hi=None,
energy_width=None,
energy_lo=None,
energy_hi=None,
ul=None,
cl=None,
):
"""
Read data into data dict.
Parameters
----------
energy : :class:`~astropy.units.Quantity` array instance
Observed photon energy array [physical type ``energy``]
flux : :class:`~astropy.units.Quantity` array instance
Observed flux array [physical type ``flux`` or ``differential flux``]
flux_error, flux_error_hi, flux_error_lo : :class:`~astropy.units.Quantity`
array instance
68% CL gaussian uncertainty of the flux [physical type ``flux`` or
``differential flux``]. Either ``flux_error`` (symmetrical uncertainty)
or ``flux_error_hi`` and ``flux_error_lo`` (asymmetrical uncertainties)
must be provided.
energy_width, energy_lo, energy_hi : :class:`~astropy.units.Quantity` array
instance, optional
Width of the energy bins [physical type ``energy``]. Either
``energy_width`` (bin width) or ``energy_lo`` and ``energy_hi``
(Energies of the lower and upper bin edges) can be provided. If none
are provided, ``generate_energy_edges`` will be used.
ul : boolean or int array, optional
Boolean array indicating which of the flux values given in ``flux``
correspond to upper limits.
cl : float, optional
Confidence level of the flux upper limits given by ``ul``.
Returns
-------
data : :class:`astropy.table.QTable`
Data stored in an astropy Table.
"""
table = QTable()
if cl is not None:
cl = validate_scalar("cl", cl)
table.meta["keywords"] = {"cl": {"value": cl}}
table["energy"] = energy
if energy_width is not None:
table["energy_width"] = energy_width
elif energy_lo is not None and energy_hi is not None:
table["energy_lo"] = energy_lo
table["energy_hi"] = energy_hi
table["flux"] = flux
if flux_error is not None:
table["flux_error"] = flux_error
elif flux_error_lo is not None and flux_error_hi is not None:
table["flux_error_lo"] = flux_error_lo
table["flux_error_hi"] = flux_error_hi
else:
raise TypeError("Flux error not provided!")
if ul is not None:
ul = np.array(ul, dtype=int)
table["ul"] = ul
table.meta["comments"] = ["Table generated with naima.build_data_table"]
# test table units, format, etc
validate_data_table(table)
return table
[docs]def estimate_B(
xray_table, vhe_table, photon_energy_density=0.261 * u.eV / u.cm ** 3
):
"""Estimate magnetic field from synchrotron to Inverse Compton luminosity
ratio
Estimate the magnetic field from the ratio of X-ray to gamma-ray emission
according to:
.. math::
\\frac{L_\\mathrm{xray}}{L_\\gamma} =
\\frac{u_\\mathrm{B}}{u_\\mathrm{ph}} =
\\frac{B^2}{ 8 \\pi u_\\mathrm{ph}}
where :math:`L_\\mathrm{xray}` is the X-ray luminosity, :math:`L_\\gamma`
is the gamma-ray luminosity, and :math:`u_\\mathrm{ph}` is the seed photon
field energy density.
Note that this assumes that the ratio of observed fluxes is equal to the
ratio of bolometric synchrotron and IC luminosities, and that IC proceeds
in the Thomson regims. This assumption is safe as long as the X-ray and
gamma-ray emission contain the bulk of the bolometric emission (i.e., the
peak in the SED is in the X-ray and gamma-ray observed bands). Even if the
assumption does not hold, this is a good starting point for the magnetic
field when doing simultaneous X-ray and gamma-ray spectral fits.
Parameters
----------
xray_table : :class:`~astropy.table.Table`
Data table (see :ref:`dataformat` for details on the format) containing
the X-ray spectrum.
vhe_table : :class:`~astropy.table.Table`
Data table (see :ref:`dataformat` for details on the format) containing
the HE/VHE gamma-ray spectrum.
photon_energy_density : :class:`~astropy.units.Quantity` float, optional
Energy density of the seed photon field for IC emission. Defaults to
0.261 eV/cm3, the energy density of the CMB.
Returns
-------
B : :class:`~astropy.units.Quantity` float
Estimate of the magnetic flux density at the emitter.
"""
xray = validate_data_table(xray_table, sed=False)
vhe = validate_data_table(vhe_table, sed=False)
xray_lum = trapz_loglog(xray["flux"] * xray["energy"], xray["energy"])
vhe_lum = trapz_loglog(vhe["flux"] * vhe["energy"], vhe["energy"])
uph = (photon_energy_density.to("erg/cm3")).value
B0 = (
np.sqrt((xray_lum / vhe_lum).decompose().value * 8 * np.pi * uph) * u.G
).to("uG")
return B0