#! /usr/bin/env python
"""
Module with utility functions to the MCMC (``emcee``) sampling for
parameter estimation.
"""
__author__ = 'V. Christiaens, O. Wertz, Carlos Alberto Gomez Gonzalez'
__all__ = ['gelman_rubin',
'gelman_rubin_from_chain',
'autocorr_test']
import numpy as np
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
def gelman_rubin(x):
    r"""
    Determine the Gelman-Rubin \hat{R} statistical test between Markov chains.

    Parameters
    ----------
    x: numpy.array
        The numpy.array on which the Gelman-Rubin test is applied. This array
        should contain at least 2 set of data, i.e. x.shape >= (2,).

    Returns
    -------
    out: float
        The Gelman-Rubin \hat{R}.

    Raises
    ------
    ValueError
        If ``x`` holds fewer than 2 chains, or is not a 2-d
        (n_chains x n_samples) array.

    Example
    -------
    >>> x1 = np.random.normal(0.0,1.0,(1,100))
    >>> x2 = np.random.normal(0.1,1.3,(1,100))
    >>> x = np.vstack((x1,x2))
    >>> gelman_rubin(x)
    1.0366629898991262
    >>> gelman_rubin(np.vstack((x1,x1)))
    0.99

    """
    if np.shape(x) < (2,):
        msg = 'Gelman-Rubin diagnostic requires multiple chains of the same '
        msg += 'length'
        raise ValueError(msg)

    try:
        m, n = np.shape(x)
    except ValueError:
        # Raise instead of the former print-and-return-None, so a bad input
        # cannot silently propagate a None into downstream computations.
        raise ValueError("Bad shape for the chains") from None

    # Calculate between-chain variance
    B_over_n = np.sum((np.mean(x, 1) - np.mean(x)) ** 2) / (m - 1)

    # Calculate within-chain variances
    W = np.sum([(x[i] - xbar) ** 2 for i, xbar in
                enumerate(np.mean(x, 1))]) / (m * (n - 1))

    # (over) estimate of variance
    s2 = W * (n - 1) / n + B_over_n

    # Pooled posterior variance estimate
    V = s2 + B_over_n / m

    # Calculate PSRF
    R = V / W

    return R
def gelman_rubin_from_chain(chain, burnin):
    r""" Pack the MCMC chain and determine the Gelman-Rubin \hat{R} statistical
    test. In other words, two sub-sets are extracted from the chain (burnin
    parts are taken into account) and the Gelman-Rubin statistical test is
    performed.

    Parameters
    ----------
    chain: numpy.array
        The MCMC chain with the shape walkers x steps x model_parameters
    burnin: float \in [0,1]
        The fraction of a walker which is discarded.

    Returns
    -------
    out: float
        The Gelman-Rubin \hat{R}.

    """
    dim = chain.shape[2]
    k = chain.shape[1]
    # Number of steps discarded as burn-in, and the length of each of the two
    # compared sub-sets (one quarter of the post-burnin chain each).
    thr0 = int(np.floor(burnin * k))
    thr1 = int(np.floor((1 - burnin) * k * 0.25))
    rhat = np.zeros(dim)
    for j in range(dim):
        # Compare the first quarter against the last quarter of the
        # post-burnin chain, with all walkers flattened into one series each.
        part1 = chain[:, thr0:thr0 + thr1, j].reshape((-1))
        part2 = chain[:, thr0 + 3 * thr1:thr0 + 4 * thr1, j].reshape((-1))
        series = np.vstack((part1, part2))
        rhat[j] = gelman_rubin(series)
    return rhat
def next_pow_two(n):
    """Return the smallest power of two greater than or equal to ``n``."""
    power = 1
    while power < n:
        power *= 2
    return power
def autocorr_func_1d(x, norm=True):
    """Estimate the autocorrelation function of a 1-d series with the FFT.

    The series is mean-subtracted and zero-padded to twice the next power of
    two, so the circular correlation computed by the FFT equals the linear
    one (Wiener-Khinchin). If ``norm`` is True the result is scaled so that
    lag 0 equals 1.
    """
    x = np.atleast_1d(x)
    if x.ndim != 1:
        raise ValueError("invalid dimensions for 1D autocorrelation function")

    # Next power of two >= len(x), for an efficient FFT (helper inlined).
    n_fft = 1
    while n_fft < len(x):
        n_fft *= 2

    # Autocorrelation = inverse FFT of the power spectrum.
    spectrum = np.fft.fft(x - np.mean(x), n=2 * n_fft)
    acf = np.fft.ifft(spectrum * np.conjugate(spectrum))[:len(x)].real
    acf /= 4 * n_fft

    if norm:
        acf /= acf[0]
    return acf
def auto_window(taus, c):
    """Return the first index ``i`` satisfying ``i >= c * taus[i]``.

    Falls back to the last index when the criterion is never met (emcee's
    automated windowing heuristic).
    """
    still_growing = np.arange(len(taus)) < c * taus
    if not np.any(still_growing):
        return len(taus) - 1
    # argmin on a boolean array yields the position of the first False,
    # i.e. the first window where the criterion holds.
    return np.argmin(still_growing)
def autocorr(y, c=5.0):
    """Estimate the integrated autocorrelation time of a set of walkers.

    ``y`` has shape (walkers, steps); the per-walker autocorrelation
    functions are averaged, integrated cumulatively, and the estimate at the
    automatically chosen window (factor ``c``) is returned.
    """
    # Average the autocorrelation function over all walkers.
    mean_acf = sum(autocorr_func_1d(walker) for walker in y) / len(y)
    # Running estimates of the integrated autocorrelation time.
    taus = 2.0 * np.cumsum(mean_acf) - 1.0
    window = auto_window(taus, c)
    return taus[window]
def autocorr_test(chain):
    """
    Function to measure the auto-correlation 'timescale' of the chain,
    normalized by the length of the whole chain up to that point. This metrics
    can then be used to check if the chain has likely converged. More details
    here: https://emcee.readthedocs.io/en/stable/tutorials/autocorr/

    Parameters
    ----------
    chain: numpy.array
        The numpy.array on which the auto-correlation is calculated, with
        shape walkers x steps.

    Returns
    -------
    tau/N: float
        The normalized auto-correlation timescale.
    """
    # Number of steps per walker.
    N = chain.shape[1]
    tau = autocorr(chain)
    return tau/N