# -*- coding: utf-8 -*-
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import warnings
from collections.abc import Iterable
from multiprocessing import Pool
import astropy.units as u
import emcee
import numpy as np
from astropy import log
from .utils import sed_conversion, validate_data_table
__all__ = [
"normal_prior",
"uniform_prior",
"log_uniform_prior",
"get_sampler",
"run_sampler",
]
# Define physical types used in plot and utils.validate_data_table
u.def_physical_type(u.erg / u.cm**2 / u.s, "flux")
u.def_physical_type(u.Unit("1/(s cm2 erg)"), "differential flux")
u.def_physical_type(u.Unit("1/(s erg)"), "differential power")
u.def_physical_type(u.Unit("1/TeV"), "differential energy")
u.def_physical_type(u.Unit("1/cm3"), "number density")
u.def_physical_type(u.Unit("1/(eV cm3)"), "differential number density")
# Prior functions
def normal_prior(value, mean, sigma):
"""Normal prior distribution."""
    return -0.5 * np.log(2 * np.pi * sigma**2) - (value - mean) ** 2 / (
        2.0 * sigma**2
    )
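def uniform_prior(value, umin, umax):
    """Uniform prior distribution."""
    # uniform_prior is exported in __all__ above but was missing here; this is
    # a minimal implementation returning 0. inside the bounds, -inf outside.
    if umin <= value <= umax:
        return 0.0
    else:
        return -np.inf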
def log_uniform_prior(value, umin=0, umax=None):
    """Log-uniform prior distribution."""
    # The prior density is proportional to 1/value, so the log-prior is
    # -log(value) inside the allowed range and -inf outside it.
    if value <= 0 or value < umin:
        return -np.inf
    if umax is not None and value > umax:
        return -np.inf
    return -np.log(value)
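# Illustrative sketch only (hypothetical two-parameter layout): per-parameter
# priors can be combined into the single prior function that get_sampler
# expects by summing their log-probabilities.
def _example_prior(pars):
    # pars = [norm, index]: keep the norm positive, constrain the index
    return uniform_prior(pars[0], 0, np.inf) + normal_prior(pars[1], 2.0, 0.5)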
# Probability function
def lnprobmodel(model, data):
# Check if conversion is required
model_is_sed = model.unit.physical_type in ["power", "flux"]
data_is_sed = data["flux"].unit.physical_type in ["power", "flux"]
if model_is_sed != data_is_sed:
unit, sed_factor = sed_conversion(data["energy"], model.unit, data_is_sed)
model = (model * sed_factor).to(data["flux"].unit)
ul = data["ul"]
notul = ~ul
difference = model[notul] - data["flux"][notul]
# use different errors for model above or below data
sign = difference > 0
loerr, hierr = 1 * ~sign, 1 * sign
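    # where the model overshoots the data use the upper error bar
    # (flux_error_hi), where it undershoots use the lower one (flux_error_lo)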
logprob = -(difference**2) / (
2.0
* (loerr * data["flux_error_lo"][notul] + hierr * data["flux_error_hi"][notul])
** 2
)
totallogprob = np.sum(logprob)
if np.sum(ul) > 0:
# deal with upper limits at CL set by data['cl']
        violated_uls = np.sum(model[ul] > data["flux"][ul])
        totallogprob += violated_uls * np.log(1.0 - data["cl"])
return totallogprob
def lnprob(pars, data, modelfunc, priorfunc):
if priorfunc is None:
lnprob_priors = 0.0
else:
lnprob_priors = priorfunc(pars)
modelout = modelfunc(pars, data)
# Save blobs or save model if no blobs given
if isinstance(modelout, Iterable) and not isinstance(modelout, np.ndarray):
model = modelout[0]
blob = modelout
else:
model = modelout
# Avoid squeezing of the model output in order to retain units by
# adding a second dummy object blob
blob = (modelout, np.nan)
if not np.isinf(lnprob_priors):
lnprob_model = lnprobmodel(model, data)
total_lnprob = lnprob_model + lnprob_priors
else:
total_lnprob = lnprob_priors
return (total_lnprob, *blob)
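# Illustrative sketch only (hypothetical model): any objects returned after
# the spectrum are stored as blobs in the chain, here a derived energy flux.
def _example_model(pars, data):
    # pars = [norm, index]; the first return value is the model spectrum
    flux = (
        pars[0]
        * (data["energy"] / (1 * u.TeV)) ** (-pars[1])
        * u.Unit("1/(s cm2 TeV)")
    )
    return flux, np.trapezoid(flux * data["energy"], data["energy"])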
# Sampler funcs
def _run_mcmc(sampler, pos, nrun):
for i, state in enumerate(sampler.sample(pos, iterations=nrun, store=True)):
progress = 100.0 * float(i) / float(nrun)
if progress % 5 < (5.0 / float(nrun)):
print(
"\nProgress of the run: {0:.0f} percent ({1} of {2} steps)".format(
int(progress), i, nrun
)
)
paravg, parstd = [], []
npars = sampler.get_chain().shape[-1]
for npar in range(npars):
paravg.append(np.median(state.coords[:, npar]))
parstd.append(np.std(state.coords[:, npar]))
print(
" "
+ (" ".join(["{%i:-^15}" % i for i in range(npars)])).format(
*sampler.labels
)
)
print(
" Last ensemble median : "
+ (" ".join(["{%i:^15.3g}" % i for i in range(npars)])).format(*paravg)
)
print(
" Last ensemble std : "
+ (" ".join(["{%i:^15.3g}" % i for i in range(npars)])).format(*parstd)
)
print(
" Last ensemble lnprob : avg: {0:.3f}, max: {1:.3f}".format(
np.average(state.log_prob), np.max(state.log_prob)
)
)
return sampler, state
def _prefit(p0, data, model, prior):
P0_IS_ML = False
from .extern.minimize import minimize
def flat_prior(*args):
return 0.0
if prior is None:
prior = flat_prior
def nll(*args):
return -lnprob(*args)[0]
log.info("Finding Maximum Likelihood parameters through Nelder-Mead fitting...")
log.info(" Initial parameters: {0}".format(p0))
log.info(" Initial lnprob(p0): {0:.3f}".format(-nll(p0, data, model, prior)))
with warnings.catch_warnings():
warnings.simplefilter("ignore")
result = minimize(
nll,
p0,
args=(data, model, flat_prior),
method="Nelder-Mead",
options={"maxfev": 500, "xtol": 1e-1, "ftol": 1e-3},
)
ll_prior = lnprob(result["x"], data, model, prior)[0]
if (result["success"] or result["status"] == 1) and not np.isinf(ll_prior):
# also keep result if we have reached maxiter, it is likely
# better than p0
if result["status"] == 1:
log.info(" Maximum number of function evaluations reached!")
if result["status"] == 1:
log.info(" New parameters : {0}".format(result["x"]))
else:
log.info(" New ML parameters : {0}".format(result["x"]))
P0_IS_ML = True
if -result["fun"] == ll_prior:
log.info(" Maximum lnprob(p0): {0:.3f}".format(-result["fun"]))
else:
log.info("flat prior lnprob(p0): {0:.3f}".format(-result["fun"]))
log.info("full prior lnprob(p0): {0:.3f}".format(ll_prior))
p0 = result["x"]
elif np.isinf(ll_prior):
log.warning(
"Maximum Likelihood procedure converged on a parameter"
" vector forbidden by prior,"
" using original parameters for MCMC"
)
else:
log.warning(
"Maximum Likelihood procedure failed to converge,"
" using original parameters for MCMC"
)
return p0, P0_IS_ML
def get_sampler(
data_table=None,
p0=None,
model=None,
prior=None,
nwalkers=500,
nburn=100,
guess=True,
interactive=False,
prefit=False,
labels=None,
threads=None,
data_sed=None,
):
"""Generate a new MCMC sampler.
Parameters
----------
data_table : `~astropy.table.Table` or list of `~astropy.table.Table`
Table containing the observed spectrum. If multiple tables are passed
    as a list, they will be concatenated in the order given. Each table
needs at least these columns, with the appropriate associated units
(with the physical type indicated in brackets below) as either a
`~astropy.units.Unit` instance or parseable string:
- ``energy``: Observed photon energy [``energy``]
- ``flux``: Observed fluxes [``flux`` or ``differential flux``]
- ``flux_error``: 68% CL gaussian uncertainty of the flux [``flux`` or
``differential flux``]. It can also be provided as ``flux_error_lo``
and ``flux_error_hi`` (see below).
Optional columns:
- ``energy_width``: Width of the energy bin [``energy``], or
- ``energy_error``: Half-width of the energy bin [``energy``], or
- ``energy_error_lo`` and ``energy_error_hi``: Distance from bin center
to lower and upper bin edges [``energy``], or
- ``energy_lo`` and ``energy_hi``: Energy edges of the corresponding
energy bin [``energy``]
- ``flux_error_lo`` and ``flux_error_hi``: 68% CL gaussian lower and
upper uncertainties of the flux.
- ``ul``: Flag to indicate that a flux measurement is an upper limit.
- ``flux_ul``: Upper limit to the flux. If not present, the ``flux``
column will be taken as an upper limit for those measurements with
the ``ul`` flag set to True or 1.
The ``keywords`` metadata field of the table can be used to provide the
confidence level of the upper limits with the keyword ``cl``, which
defaults to 90%. The `astropy.io.ascii` reader can recover all
the needed information from ASCII tables in the
:class:`~astropy.io.ascii.Ipac` and :class:`~astropy.io.ascii.Daophot`
formats, and everything except the ``cl`` keyword from tables in the
    :class:`~astropy.io.ascii.Sextractor` format. For the latter format, the
    ``cl`` keyword can be added after reading the table with::

        data.meta['keywords']['cl'] = 0.99
p0 : array
Initial position vector. The distribution for the ``nwalkers`` walkers
will be computed as a multidimensional gaussian of width 5% around the
initial position vector ``p0``.
model : function
A function that takes a vector in the parameter space and the data
dictionary, and returns the expected fluxes at the energies in the
spectrum. Additional return objects will be saved as blobs in the
sampler chain, see `the emcee documentation for the
format
<http://dan.iel.fm/emcee/current/user/advanced/#arbitrary-metadata-blobs>`_.
prior : function, optional
A function that takes a vector in the parameter space and returns the
log-likelihood of the Bayesian prior. Parameter limits can be specified
through a uniform prior, returning 0. if the vector is within the
parameter bounds and ``-np.inf`` otherwise.
nwalkers : int, optional
The number of Goodman & Weare “walkers”. Default is 500.
nburn : int, optional
Number of burn-in steps. After ``nburn`` steps, the sampler is reset
and chain history discarded. It is necessary to settle the sampler into
the maximum of the parameter space density. Default is 100.
labels : iterable of strings, optional
Labels for the parameters included in the position vector ``p0``. If
not provided ``['par1','par2', ... ,'parN']`` will be used.
threads : int, optional
Number of parallel processes to use for sampling. Default is the number
of CPU cores.
guess : bool, optional
Whether to attempt to guess the normalization (first) parameter of the
model. Default is True.
interactive : bool, optional
Whether to launch the interactive fitting window to set the initial
values for the prefitting or the MCMC run. Requires matplotlib. Default
is False.
prefit : bool, optional
Whether to attempt to find the maximum likelihood parameters with a
Nelder-Mead algorithm and use them as starting point of the MCMC run.
The parameter values in `p0` will be used as starting points for the
minimization. Note that the initial optimization is done without taking
the prior function into account to avoid the possibility of infinite
values in the objective function. If the best-fit parameter vector
without prior is forbidden by the prior given, it will be discarded.
data_sed : bool, optional
When providing more than one data table, whether to convert them to SED
format. If unset or None, all tables will be converted to the format of
the first table.
Returns
-------
sampler : :class:`~emcee.EnsembleSampler` instance
Ensemble sampler with walker positions after ``nburn`` burn-in steps.
pos : :class:`~numpy.ndarray`
Final position vector array.
See also
--------
emcee.EnsembleSampler
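
    Examples
    --------
    A minimal sketch with a hypothetical power-law model and data file; any
    model and table following the conventions above will work::

        import astropy.units as u
        from astropy.io import ascii

        # table with energy, flux and flux_error columns (hypothetical file)
        data = ascii.read("spectrum.dat")

        def pl_model(pars, data):
            # pars = [norm, index]
            return (
                pars[0]
                * (data["energy"] / (1 * u.TeV)) ** (-pars[1])
                * u.Unit("1/(s cm2 TeV)")
            )

        sampler, pos = get_sampler(
            data_table=data,
            p0=[1e-12, 2.0],
            model=pl_model,
            prior=lambda pars: uniform_prior(pars[1], -1, 5),
            nwalkers=64,
            nburn=50,
        )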
"""
if data_table is None:
raise TypeError("Data table is missing!")
else:
data = validate_data_table(data_table, sed=data_sed)
if model is None:
raise TypeError("Model function is missing!")
# Add parameter labels if not provided or too short
if labels is None:
# First is normalization
labels = ["norm"] + ["par{0}".format(i) for i in range(1, len(p0))]
elif len(labels) < len(p0):
labels += ["par{0}".format(i) for i in range(len(labels), len(p0))]
# Check that the model returns fluxes in same physical type as data
modelout = model(p0, data)
    if isinstance(modelout, (tuple, list)):
spec = modelout[0]
else:
spec = modelout
# check whether both can be converted to same physical type through
# sed_conversion
try:
# If both can be converted to differential flux, they can be compared
# Otherwise, sed_conversion will raise a u.UnitsError
sed_conversion(data["energy"], spec.unit, False)
sed_conversion(data["energy"], data["flux"].unit, False)
except u.UnitsError:
raise u.UnitsError(
"The physical type of the model and data units are not compatible,"
" please modify your model or data so they match:\n"
" Model units: {0} [{1}]\n Data units: {2} [{3}]\n".format(
spec.unit,
spec.unit.physical_type,
data["flux"].unit,
data["flux"].unit.physical_type,
)
)
if guess:
normNames = ["norm", "ampl", "we", "wp"]
normNameslog = ["log({0}".format(name) for name in normNames]
normNameslog10 = ["log10({0}".format(name) for name in normNames]
normNames += normNameslog + normNameslog10
idxs = []
for normName in normNames:
for l2 in labels:
if l2.lower().startswith(normName):
# check with startswith to include normalization,
# amplitude, etc.
idxs.append(labels.index(l2))
if len(idxs) == 1:
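            # scale the normalization so the model's integrated energy flux
            # matches the data's (log-space parameters are shifted by the
            # logarithm of the ratio below)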
nunit, sedf = sed_conversion(data["energy"], spec.unit, False)
currFlux = np.trapezoid(
data["energy"] * (spec * sedf).to(nunit), data["energy"]
)
nunit, sedf = sed_conversion(data["energy"], data["flux"].unit, False)
dataFlux = np.trapezoid(
data["energy"] * (data["flux"] * sedf).to(nunit),
data["energy"],
)
ratio = dataFlux / currFlux
if labels[idxs[0]].startswith("log("):
p0[idxs[0]] += np.log(ratio)
elif labels[idxs[0]].startswith("log10("):
p0[idxs[0]] += np.log10(ratio)
else:
p0[idxs[0]] *= ratio
elif len(idxs) == 0:
log.warning(
"No label starting with [{0}] found: not applying"
" normalization guess.".format(",".join(normNames))
)
elif len(idxs) > 1:
log.warning(
"More than one label starting with [{0}] found:"
" not applying normalization guess.".format(",".join(normNames))
)
P0_IS_ML = False
if interactive:
try:
log.info("Launching interactive model fitter, close when finished")
import matplotlib.pyplot as plt
from .model_fitter import InteractiveModelFitter
iprev = plt.rcParams["interactive"]
plt.rcParams["interactive"] = False
imf = InteractiveModelFitter(model, p0, data, labels=labels, sed=True)
p0 = imf.pars
P0_IS_ML = imf.P0_IS_ML
plt.rcParams["interactive"] = iprev
except ImportError as e:
log.warning(
"Interactive fitting is not available because"
" matplotlib is not installed: {0}".format(e)
)
# If we already did the prefit call in ModelWidget (and didn't modify the
# parameters afterwards), avoid doing it here
if prefit and not P0_IS_ML:
p0, P0_IS_ML = _prefit(p0, data, model, prior)
sampler = emcee.EnsembleSampler(
nwalkers,
len(p0),
lnprob,
args=[data, model, prior],
pool=Pool(threads),
blobs_dtype=np.dtype(object),
)
# Add data and parameters properties to sampler
sampler.data_table = data_table
sampler.data = data
sampler.labels = labels
# Add model function to sampler
sampler.modelfn = model
# Add run_info dict
sampler.run_info = {
"n_walkers": nwalkers,
"n_burn": nburn,
        # convert from numpy floats to plain Python floats
"p0": [float(p) for p in p0],
"guess": guess,
}
# Initialize walkers in a ball of relative size 0.5% in all dimensions if
# the parameters have been fit to their ML values, or to 10% otherwise
spread = 0.005 if P0_IS_ML else 0.1
p0var = np.array([spread * pp for pp in p0])
p0 = np.vstack(
[p0 + p0var * np.random.normal(size=len(p0)) for i in range(nwalkers)]
)
if nburn > 0:
print("Burning in the {0} walkers with {1} steps...".format(nwalkers, nburn))
sampler, state = _run_mcmc(sampler, p0, nburn)
else:
state = emcee.State(p0)
sampler.run_info["p0_burn_median"] = [
float(p) for p in np.median(state.coords, axis=0)
]
return sampler, state
def run_sampler(nrun=100, sampler=None, pos=None, **kwargs):
"""Run an MCMC sampler.
If no sampler or initial position vector is provided, extra ``kwargs`` are
passed to `get_sampler` to generate a new sampler.
Parameters
----------
nrun : int, optional
        Number of steps to run.
sampler : :class:`~emcee.EnsembleSampler` instance, optional
Sampler.
pos : :class:`~numpy.ndarray`, optional
A list of initial position vectors for the walkers. It should have
dimensions of ``(nwalkers,dim)``, where ``dim`` is the number of free
parameters. `emcee.utils.sample_ball` can be used to generate a
multidimensional gaussian distribution around a single initial
position.
Returns
-------
sampler : :class:`~emcee.EnsembleSampler` instance
Sampler containing the paths of the walkers during the ``nrun`` steps.
pos : array
List of final position vectors after the run.
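
    Examples
    --------
    A minimal sketch, reusing ``data`` and ``pl_model`` from the
    `get_sampler` example::

        sampler, pos = run_sampler(
            nrun=100,
            data_table=data,
            p0=[1e-12, 2.0],
            model=pl_model,
            nwalkers=64,
            nburn=50,
        )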
"""
if sampler is None or pos is None:
sampler, pos = get_sampler(**kwargs)
sampler.run_info["n_run"] = nrun
print("\nWalker burn in finished, running {0} steps...".format(nrun))
sampler.reset()
sampler, pos = _run_mcmc(sampler, pos, nrun)
return sampler, pos