# -*- coding: utf-8 -*-
'''
Quantitative Uncertainty Analysis for Diffraction (QUAD)
========================================================
Copyright (c) 2018, North Carolina State University
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. The names “North Carolina State University”, “NCSU” and any trade‐name,
personal name, trademark, trade device, service mark, symbol, image, icon, or
any abbreviation, contraction or simulation thereof owned by North Carolina
State University must not be used to endorse or promote products derived from
this software without prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
###############################################################################
'''
# Import specific functions
from __future__ import division # Use "real" division
from .utilities import gsas_install_display
# Import modules
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from timeit import default_timer as timer # Timing function
from scipy.stats import norm # Normal distribution
from scipy.stats import multivariate_normal as mvnorm # Multivariate normal
import statsmodels.api as sm # Library for lowess smoother
lowess = sm.nonparametric.lowess # Lowess smoothing function
try: # Bspline function
from .bspline import Bspline
except ModuleNotFoundError:
from bspline import Bspline
try: # Bspline helper function
from .splinelab import augknt
except ModuleNotFoundError:
from bspline.splinelab import augknt
try:
from .gsas_tools import Calculator as gsas_calculator
except ModuleNotFoundError:
gsas_install_display(module='gsas_tools')
[docs]def define_lattice(paramList, Calc):
'''
Convert the A values refined in GSAS-II to unit cell parameters
(a, b, c, alpha, beta, gamma). A = [G11, G22, G33, 2*G12, 2*G13, 2*G23]
where the G elements make up the reciprocal metric tensor. Add unit cell
parameters to the paramList to be refined in QUAD and add the respective
starting values to the starting parameter values list (start). Capabale of
handling crystal systems of cubic, tetragonal, hexagonal, rhombohedral,
trigonal, orthorhombic, monoclinic, and triclinic and can refine lattice
parameters of multiple phases.
Args:
* **paramList** (:py:class:`list`): List of parameter names for
refinement - size (q).
* **start** (:py:class:`list`): List of initial parameter values
in parameter space - size (q).
* **Calc** (:class:`.Calculator`): calculator operator that interacts
with the designated GPX file by referencing GSAS-II libraries.
Returns:
* 2-tuple. Tuple entries are
#. **paramList** (:py:class:`list`): Updated list of parameter names
for refinement - size (q).
#. **start** (:class:`~numpy.ndarray`): Updated list of initial
parameter values in parameter space - size (q).
'''
start = [Calc._parmDict[param] for param in paramList if (
'::A0' not in param and
'::A1' not in param and
'::A2' not in param and
'::A3' not in param and
'::A4' not in param and
'::A5' not in param)]
latparamList = []
for phase_index in range(len(Calc._phase)):
latparamList.append(str(phase_index) + '::A0')
check = any(item in paramList for item in latparamList)
if check is True:
for jj in range(len(Calc._phase)):
symmetry = Calc._symmetry[Calc._phase[jj]]
A = [Calc._parmDict[str(jj) + '::A0'],
Calc._parmDict[str(jj) + '::A1'],
Calc._parmDict[str(jj) + '::A2'],
Calc._parmDict[str(jj) + '::A3'],
Calc._parmDict[str(jj) + '::A4'],
Calc._parmDict[str(jj) + '::A5']]
latparams = [str(jj) + '::A0',
str(jj) + '::A1',
str(jj) + '::A2',
str(jj) + '::A3',
str(jj) + '::A4',
str(jj) + '::A5']
unitcell = Calc.convert_lattice(latparams, A)
paramList = [item for item in paramList if item not in latparams]
if symmetry == 'cubic':
paramList.append(str(jj) + '::a')
start.append(unitcell[0])
elif symmetry == 'tetragonal':
paramList.append(str(jj) + '::a')
paramList.append(str(jj) + '::c')
start.append(unitcell[0])
start.append(unitcell[2])
elif symmetry == 'orthorhombic':
paramList.append(str(jj) + '::a')
paramList.append(str(jj) + '::b')
paramList.append(str(jj) + '::c')
start.append(unitcell[0])
start.append(unitcell[1])
start.append(unitcell[2])
elif symmetry == 'hexagonal':
paramList.append(str(jj) + '::a')
paramList.append(str(jj) + '::c')
start.append(unitcell[0])
start.append(unitcell[2])
elif symmetry == 'trigonal':
paramList.append(str(jj) + '::a')
paramList.append(str(jj) + '::c')
start.append(unitcell[0])
start.append(unitcell[2])
elif symmetry == 'rhombohedral':
paramList.append(str(jj) + '::a')
paramList.append(str(jj) + '::alpha')
start.append(unitcell[0])
start.append(unitcell[3])
elif symmetry == 'monoclinic':
paramList.append(str(jj) + '::a')
paramList.append(str(jj) + '::b')
paramList.append(str(jj) + '::c')
paramList.append(str(jj) + '::beta')
start.append(unitcell[0])
start.append(unitcell[1])
start.append(unitcell[2])
start.append(unitcell[4])
elif symmetry == 'triclinic':
paramList.append(str(jj) + '::a')
paramList.append(str(jj) + '::b')
paramList.append(str(jj) + '::c')
paramList.append(str(jj) + '::alpha')
paramList.append(str(jj) + '::beta')
paramList.append(str(jj) + '::gamma')
start.append(unitcell[0])
start.append(unitcell[1])
start.append(unitcell[2])
start.append(unitcell[3])
start.append(unitcell[4])
start.append(unitcell[5])
else:
raise ValueError("You have entered an invlaid lattie system."
"Thank you for shopping, please come again.")
else:
raise ValueError("No lattice parameters are refined. Lattice"
"definition is not needed.")
return paramList, start
[docs]def start2z(start, lower, upper):
'''
Transform starting parameter values to latent z-space for calculations.
Check the compatibility of the dimensions and values for the lower and
upper prior bounds.
Args:
* **start** (:py:class:`list`): List of initial parameter values
in parameter space - size (q).
* **upper** (:class:`~numpy.ndarray`): Vector of upper limits on a
uniform prior distribution in the parameter space - size (q,).
* **lower** (:class:`~numpy.ndarray`): Vector of lower limits on a
uniform prior distribution in the parameter space - size (q,).
Returns:
* **init_z** (:class:`~numpy.ndarray`): Vector of initial parameter
values in z-space - size (q,).
'''
if len(start) != len(lower):
raise ValueError("Number of lower bounds provided is not valid.")
if len(start) != len(upper):
raise ValueError("Number of upper bounds provided is not valid.")
jj = 0
for value in start:
if value > upper[jj]:
raise ValueError(f"""Upper bound is less than starting value.
Check entry number {jj}.""")
elif value < lower[jj]:
raise ValueError(f"""Lower bound is greater than starting value.
Check entry number {jj}.""")
elif abs(upper[jj] - value) == abs(value - lower[jj]):
raise ValueError(f"""Starting value must not be exactly in the
middle of bound range. Check entry {jj}.""")
else:
jj = jj + 1
# Set initial z-space values
init_z = norm.ppf((start-lower)/(upper-lower))
return init_z
[docs]def estimatecovariance(paramList, start, Calc, upper, lower,
x=None, y=None, L=20, delta=1e-3):
'''
Estimate the covariance of the initial parameter values to initialize the
proposal covarianc for DRAM. This is done by first calculating the
sensitivity matrix, :math:`\\chi`, through finite differences. Next, the
Fisher Information matrix is calulated as :math:`\\chi^T\\chi`. Finally,
the estimated covariance matrix is calculated as :math:`s^2*\\chi^T\\chi`,
where s is the standard deviation estimate.
Args:
* **paramList** (:py:class:`list`): List of parameter names for
refinement - size (q).
* **start** (:py:class:`list`): List of initial parameter values
in parameter space - size (q).
* **Calc** (:class:`.Calculator`): calculator operator that interacts
with the designated GPX file by referencing GSAS-II libraries.
* **upper** (:class:`~numpy.ndarray`): Vector of upper limits on a
uniform prior distribution in the parameter space - size (q,).
* **lower** (:class:`~numpy.ndarray`): Vector of lower limits on a
uniform prior distribution in the parameter space - size (q,).
* **x** (:class:`~numpy.ndarray`): Vector of 2-theta values
- size (n,). Will be created from the GPX file if not
user-defined.
* **y** (:class:`~numpy.ndarray`): Vector of diffraction pattern
intensities - size (n,). Created from the GPX file if not
user-defined.
* **L** (:py:class:`int`): Number of cubic B-spline basis functions to
model the background intensity. Default is 20.
* **delta** (:py:class:`float`): Fraction by which the parameters are
perturbed with respect to the initial values for finite difference
calculation.
Returns:
* (:py:class:`dict`): Dictionary containing the estimated covariance
matrix, estimated variance, and the eigenvalues and eigenvectors of
the covariance matrix. The eigenvalues can be examined to determine
potentially non-indentifiable parameters by locating the eigenvalues
significantly close to zero in value.
'''
# Initialize inputs
init_z = start2z(start=start, lower=lower, upper=upper)
Calc.UpdateParameters(dict(zip(paramList, start)))
f0 = Calc.Calculate()
x, y = diffraction_file_data(x=x, y=y, Calc=Calc)
n = np.shape(y)[0]
p = np.shape(init_z)[0]
gamma = np.ones(L)
BG = np.matmul(calculate_bsplinebasis(x=x, L=L), gamma)
# Finite difference calculation
def finitediff(ii, init_z, paramList, f0, delta):
q_star = np.copy(init_z)
q_star[ii] = np.copy(init_z[ii])*(1+delta)
params_perturb = z2par(q_star, lower, upper)
Calc.UpdateParameters(dict(zip(paramList, params_perturb)))
f1 = Calc.Calculate()
diff = (f1-f0)/(delta*init_z[ii])
return diff
# Calculate sensitivity matrix
sensitivity = np.zeros([n, p])
for jj in range(0, np.shape(init_z)[0]):
sensitivity[:, jj] = finitediff(jj, init_z, paramList, f0, delta)
# Calculate fisher information matrix
fisher = np.dot(sensitivity.transpose(), sensitivity)
# Calcualte estimate for s^2
R = y-BG-f0 # residuals
s2 = (1./(n-p))*np.dot(R, R.transpose())
# Calculate covariance matrix
covariance = s2*np.linalg.inv(fisher)
evals = np.linalg.eig(covariance)
print("Covariance eignvalues:\n{}".format(evals[0]))
return dict(cov=covariance, s2=s2, evals=evals)
[docs]def z2par(z, lower, upper, grad=False):
'''
Transform between the bounded parameter space and continuous z-space.
Args:
* **z** (:class:`~numpy.ndarray`): Array of parameter values in
z-space - size (q,).
* **lower** (:class:`~numpy.ndarray`): Vector of lower limits on a
uniform prior distribution in the parameter space - size (q,).
* **upper** (:class:`~numpy.ndarray`): Vector of upper limits on a
uniform prior distribution in the parameter space - size (q,).
* **grad** (:py:class:`bool`): If False (default), converts from the
z-space to the parameter space.
Returns:
* **par** (:class:`~numpy.ndarray`): Array of parameter values in
parameter space from the given z values- size (q,).
'''
if (grad):
d = (upper-lower)*norm.pdf(z)
return d
else:
par = lower + (upper-lower)*norm.cdf(z)
# Avoid parameters approaching the boundaries
par[np.array([par[j] == upper[j] for j in range(len(par))])] -= 1e-10
par[np.array([par[j] == lower[j] for j in range(len(par))])] += 1e-10
return par
[docs]def prior_loglike(par, m0, sd0):
'''
Calculate the log likelihood of the parameters in z-space based on the
prior distributions. The prior is a uniform distribution in parameter space
which corresponds to the product of normal distributions for each value in
z-space.
Args:
* **par** (:class:`~numpy.ndarray`): Array of parameter values in
z-space - size (q,).
* **m0** (:py:class:`float`): Mean of prior normal distribution on
z. Default is 0.
* **sd0** (:py:class:`float`): Standard deviation of prior normal
distribution on z. Default is 1.
Returns:
* **prior** (:py:class:`float`): Value of prior distribution given
current z-values.
'''
return -0.5*(sd0**(-2))*np.inner(par-m0, par-m0)
[docs]def log_post(y, x, BG, Calc, paramList, z, lower,
upper, scale, tau_y, m0, sd0):
'''
Calculate the log of the posterior probabilities for a set of parameters.
The posterior is defined as a log-likelihood of the product of normal
distributions multiplied by the prior. For definition of the prior,
see :meth:`~prior_loglike`.
Args:
* **y** (:class:`~numpy.ndarray`): Vector of diffraction pattern
intensities - size (n,).
* **x** (:class:`~numpy.ndarray`): Vector of 2-theta values - size
(n,).
* **BG** (:class:`~numpy.ndarray`): Vector of background intensity
values - size (n,).
* **Calc** (:class:`~.Calculator`): calculator operator that interacts
with the designated GPX file by referencing GSAS-II libraries.
* **paramList** (:py:class:`list`): List of parameter names for
refinement - size (q,).
* **z** (:class:`~numpy.ndarray`): Current parameter values in
z-space - size (q,).
* **lower** (:class:`~numpy.ndarray`): Vector of lower limits on a
uniform prior distribution in the parameter space - size (q,).
* **upper** (:class:`~numpy.ndarray`): Vector of upper limits on a
uniform prior distribution in the parameter space - size (q,).
* **scale** (:class:`~numpy.ndarray`): Vector that scales with the
intensity of data, heteroscedastic. See function
:meth:`~initialize_intensity_weight`
* **tau_y** (:py:class:`float`): Model precision. Default initial
valus is 1.
* **m0** (:py:class:`float`): Mean of prior normal distribution on z.
Default is 0.
* **sd0** (:py:class:`float`): Standard deviation of prior normal
distribution on z. Default is 1.
Returns:
* **posterior** (:py:class:`float`): Value of the prior times
likelihood given current z-space candidate values.
'''
params = z2par(z=z, lower=lower, upper=upper)
# Update the calculator to reflect the current parameter estimates
Calc.UpdateParameters(dict(zip(paramList, params)))
# Calculate residuals
R = y-BG-Calc.Calculate()
# Calculate weighted sum of squares error
S = np.inner(R/np.sqrt(scale), R/np.sqrt(scale))
# Add log-prior and log-likelihood values
logpost = 0.5*tau_y*S - prior_loglike(par=z, m0=m0, sd0=sd0)
return (-1)*logpost
[docs]def calculate_bsplinebasis(x, L):
'''
Calculate a B-spline basis for the 2-theta values.
Args:
* **x** (:class:`~numpy.ndarray`): Vector of 2-theta values
- size (n,).
* **L** (:py:class:`int`): Number of cubic B-spline basis functions to
model the background intensity. Default is 20.
Returns:
* **B** (:class:`~numpy.ndarray`): B-spline basis for data
- size (n, L)
'''
# Calculate a B-spline basis for the range of x
unique_knots = np.percentile(a=x, q=np.linspace(0, 100, num=(L - 2)))
knots = augknt(unique_knots, 3)
objB = Bspline(knots, order=3)
B = objB.collmat(x)
return B
[docs]def diffraction_file_data(x, y, Calc):
'''
Extract the intensity values (y) and 2-theta angles (x) from the
underlying GPX file or set the user-defined data values.
Args:
* **x** (:class:`~numpy.ndarray`): Predefined vector of 2-theta
values, if it exists.
* **y** (:class:`~numpy.ndarray`): Predefined vector of diffraction
pattern intensities, if it exists.
* **Calc** (:class:`~.Calculator`): calculator operator that interacts
with the designated GPX file by referencing GSAS-II libraries.
Returns:
* 2-tuple containing the diffraction data. Tuple entries are
#. **x** (:class:`~numpy.ndarray`): Vector of 2-theta values
- size (n,).
#. **y** (:class:`~numpy.ndarray`): Vector of diffraction pattern
intensities - size (n,).
'''
# Assign the intensity vector (y) from the GPX file, if necessary
if y is None:
Index = np.where((Calc._tth > Calc._lowerLimit) &
(Calc._tth < Calc._upperLimit) == True) # noqa: E712
y = np.array(Calc._Histograms[list(Calc._Histograms.keys())[0]]['Data'][1][Index], copy=True) # noqa: E501
# Assign the grid of angles (x) from the GPX file, if no values are
# provided.
if x is None:
x = np.array(Calc._tthsample, copy=True)
# If values are provided externally, overwrite the _tthsample parameter
else:
# Update Calc internally for remainder of script
Calc._tthsample = np.array(x, copy=True)
return x, y
[docs]def smooth_ydata(x, y):
'''
Smooth diffraction data intensities at 2-theta values with
Locally Weighted Scatterplot Smoothing (lowess) function from statsmodels_.
.. _statsmodels: https://www.statsmodels.org/dev/generated/statsmodels
.nonparametric.smoothers_lowess.lowess.html
Args:
* **y** (:class:`~numpy.ndarray`): Vector of diffraction pattern
intensities - size (n,).
* **x** (:class:`~numpy.ndarray`): Vector of 2-theta values from
diffraction pattern- size (n,).
Returns:
* **y_sm** (:class:`~numpy.ndarray`): Vector of smoothed intensity
data - size (n,).
'''
# Smooth the observed Ys on the Xs, patch for negative or 0 values
y_sm = lowess(endog=y.reshape(y.size,), exog=x.reshape(x.size,),
frac=6.0/len(x), return_sorted=False)
y_sm = np.array([max(0, sm) for sm in y_sm])
return y_sm
[docs]def initialize_cov(initCov, q):
'''
If not previously defined, initialize the covariance for the proposal
distribution.
Args:
* **initCov** (:class:`~numpy.ndarray`): Pre-defined initial covariance
matrix in z-space - size (q, q).
* **q** (:py:class:`int`): Number of parameters.
Returns:
* **varS1** (:class:`~numpy.ndarray`): Covariance matrix in z-space
- size (q, q).
'''
if initCov is None:
varS1 = np.diag(0.05*np.ones(q))
elif initCov.shape == (q, q):
varS1 = initCov
else:
raise ValueError("Specification for initCov is not valid."
+ " Please provide a (%d x %d) matrix." % (q, q))
return varS1
def _initialize_output(iters, q, n_keep, L, update):
# Initialize output objects
all_Z = np.zeros((iters, q))
keep_params = np.zeros((n_keep, q))
keep_gamma = np.zeros((n_keep, L))
keep_b = np.zeros(n_keep)
keep_tau_y = np.zeros(n_keep)
keep_tau_b = np.zeros(n_keep)
accept_rate_S1 = np.zeros(n_keep//update)
accept_rate_S2 = np.zeros(n_keep//update)
return (all_Z, keep_params, keep_gamma, keep_b,
keep_tau_y, keep_tau_b, accept_rate_S1, accept_rate_S2)
[docs]def update_background(B, var_scale, tau_y, tau_b, L, Calc, y):
'''
Update the basis function loadings and then background values.
Args:
* **B** (:class:`~numpy.ndarray`): B-spline basis for 2-theta
range - size (n, L). See :meth:`~calculate_bsplinebasis`.
* **var_scale** (:class:`~numpy.ndarray`): Vector of scaling factors
corresponding to intensity data- size (n,).
See function :meth:`~initialize_intensity_weight`
* **tau_y** (:py:class:`float`): Model precision. See
:meth:`~updat_tauy`
* **tau_b** (:py:class:`float`): Loadings precision for background.
See :meth:`~update_taub`
* **L** (:py:class:`int`): Number of cubic B-spline basis functions to
model the background intensity. Default is 20.
* **Calc** (:class:`~.Calculator`): calculator operator that interacts
with the designated GPX file by referencing GSAS-II libraries.
* **y** (:class:`~numpy.ndarray`): Vector of diffraction pattern
intensity data - size(nx1). See :meth:`~diffraction_file_data`.
Returns:
* 2-tuple containing the updated information for the background fit of
the data. Tuple entries are
#. **gamma** (:class:`~numpy.ndarray`): Vector of updated basis
loadings - size (L,)
#. **BG** (:class:`~numpy.ndarray`): Vector of updated background
intensity values - size (n,).
'''
BtB = np.matmul(np.transpose(B)/var_scale, B)
VV = np.linalg.inv(tau_y*BtB + tau_b*np.identity(L))
err = (y-Calc.Calculate())/var_scale
MM = np.matmul(VV, tau_y*np.sum(np.transpose(B)*err, axis=1))
gamma = np.random.multivariate_normal(mean=MM, cov=VV)
BG = np.matmul(B, gamma)
return gamma, BG
[docs]def stage1_acceptprob(z, varS1, y, x, BG, Calc, paramList, lower, upper,
var_scale, tau_y, m0, sd0):
'''
Calculate the acceptance probability for Stage 1 of the DRAM algorithm.
Args:
* **z** (:class:`~numpy.ndarray`): Vector of current parameter
values in z-space - size(q,).
* **varS1** (:class:`~numpy.ndarray`): Current covariance matrix
- size(q, q).
* **y** (:class:`~numpy.ndarray`): Vector of diffraction pattern
intensities - size (n,).
* **x** (:class:`~numpy.ndarray`): Vector of 2-theta values
- size (n,).
* **BG** (:class:`~numpy.ndarray`): Vector of background intensity
values - size (n,).
* **Calc** (:class:`~.Calculator`): calculator operator that interacts
with the designated GPX file by referencing GSAS-II libraries.
* **paramList** (:py:class:`list`): List of parameter names for
refinement - size (q,).
* **lower** (:class:`~numpy.ndarray`): Vector of lower limits on a
uniform prior distribution in the parameter space - size (q,).
* **upper** (:class:`~numpy.ndarray`): Vector of upper limits on a
uniform prior distribution in the parameter space - size (q,).
* **var_scale** (:class:`~numpy.ndarray`): Vector that scales with the
intensity of data, heteroscedastic. See function
:meth:`~initialize_intensity_weight`
* **tau_y** (:py:class:`float`): Model precision. Default initial
valus is 1.
* **m0** (:py:class:`float`): Mean of prior normal distribution on z.
Default is 0.
* **sd0** (:py:class:`float`): Standard deviation of prior normal
distribution on z. Default is 1.
Returns:
* 4-tuple containing the stage 1 acceptance probability, candidates,
and respective log posterior values. Tuple entries are
#. **can_z1** (:class:`~numpy.ndarray`): Vector of candidate 1
parameter values in z-space - size(q,). The candidate is
constructed from a multivariate normal distribution with mean z
and covariance of varS1. :math:`can\\_z1 = N_q(z,varS1)`.
#. **can1_post** (:py:class:`float`): Value of the log posterior
probability for candidate 1 parameter values, can_z1. Computed with
:meth:`~log_post`.
#. **cur_post** (:py:class:`float`): Value of the log posterior
probability for the current parameter values, z. Computed with
:meth:`~log_post`.
#. **R1** (:py:class:`float`): Log of Stage 1 acceptance probability.
'''
# Draw a random candidate in z-space from a multivariate normal
can_z1 = np.random.multivariate_normal(mean=z, cov=varS1)
# Calculate the posterior probability of the candidate
can1_post = log_post(y=y, x=x, BG=BG, Calc=Calc, paramList=paramList,
z=can_z1, lower=lower, upper=upper, scale=var_scale,
tau_y=tau_y, m0=m0, sd0=sd0)
# Calculate the posterior probability of the current parameter values
# in z-space
cur_post = log_post(y=y, x=x, BG=BG, Calc=Calc, paramList=paramList,
z=z, lower=lower, upper=upper, scale=var_scale,
tau_y=tau_y, m0=m0, sd0=sd0)
# Calculate the acceptance probability
R1 = can1_post - cur_post
return can_z1, can1_post, cur_post, R1
[docs]def stage2_acceptprob(can1_post, can2_post, cur_post,
can_z1, can_z2, z, varS1):
'''
Calculate the acceptance probability for Stage 2 of the DRAM algorithm.
Args:
* **z** (:class:`~numpy.ndarray`): Vector of current parameter
values in z-space - size(q,).
* **cur_post** (:py:class:`float`): Value of the log posterior
probability for the current parameter values.
* **varS1** (:class:`~numpy.ndarray`): Current covariance matrix
- size(q, q).
* **can_z1** (:class:`~numpy.ndarray`): Vector of candidate 1 parameter
values in z-space - size(q,). The candidate is constructed from a
multivariate normal distribution with mean z and covariance of varS1.
:math:`can\\_z1 = N_q(z,varS1)`.
* **can_z2** (:class:`~numpy.ndarray`): Vector of candidate 2 parameter
values in z-space - size(q,). The candidate is constructed from a
multivariate normal distribution with mean z and covariance of
shrinkage*varS1. :math:`can\\_z2 = N_q(z,shrinkage*varS1)`
* **can1_post** (:py:class:`float`): Value of the log posterior
probability for candidate 1 parameter values from Stage 1. Computed
with :meth:`~log_post`.
* **can2_post** (:py:class:`float`): Value of the log posterior
probability for candidate 2 parameter values from Stage 2. Computed
with :meth:`~log_post`.
Returns:
* **R2** (:py:class:`float`): Log of Stage 2 acceptance probability.
'''
# Calculate the relative acceptance probabilities
inner_n = 1 - np.min([1, np.exp(can1_post - can2_post)])
inner_d = 1 - np.min([1, np.exp(can1_post - cur_post)])
# Adjust factors for inner_n and inner_d to avoid approaching
# the boundaries 0, 1
inner_n = inner_n + 1e-10 if inner_n == 0 else inner_n
inner_n = inner_n - 1e-10 if inner_n == 1 else inner_n
inner_d = inner_d + 1e-10 if inner_d == 0 else inner_d
inner_d = inner_d - 1e-10 if inner_d == 1 else inner_d
# Caclculate stage 2 acceptance probability
numer = (can2_post + mvnorm.logpdf(x=can_z1, mean=can_z2, cov=varS1)
+ np.log(inner_n))
denom = (cur_post + mvnorm.logpdf(x=can_z1, mean=z, cov=varS1)
+ np.log(inner_d))
R2 = numer - denom
return R2
[docs]def adapt_covariance(i, adapt, s_p, all_Z, epsilon, q, varS1):
'''
Adapt the covariance matrix at the given adaption interval.
Args:
* **i** (:py:class:`int`): Iteration number.
* **adapt** (:py:class:`int`): Adaption interval, user-defined.
Default is 20.
* **s_p** (:py:class:`float`): Scaling parameter for adapting the
covariance. Default is :math:`\\frac{2.4^2}{q}` where q is the size
of the parameter space.
* **all_Z** (:class:`~numpy.ndarray`): Storage of z-space samples for
each iteration - size(iters, q)
* **epsilon** (:py:class:`float`): Constant to prevent singularity of
adaptive covariance. Default is 0.0001.
* **q** (:py:class:`int`): Number of parameters.
* **varS1** (:class:`~numpy.ndarray`): Current covariance matrix
- size(q, q).
Returns:
* **varS1** (:class:`~numpy.ndarray`): Adapted covariance matrix
- size(q, q).
'''
if (0 < i) & (i % adapt == 0):
varS1 = (
s_p*np.cov(all_Z[range(i+1)].transpose())
+ s_p*epsilon*np.diag(np.ones(q))
)
else:
varS1 = varS1
return varS1
[docs]def update_taub(d_g, gamma, c_g, L):
'''
Update the background model precision.
Args:
* **d_g** (:py:class:`float`): Scale parameter for Gamma distribution
for the error in the prior distribution for the basis function
loadings. Default is 0.1.
* **gamma** (:class:`~numpy.ndarray`): Basis function loadings
- size (L,).
* **c_g** (:py:class:`float`): Shape parameter for Gamma distribution
for the error in the prior distribution for the basis function
loadings. Default is 0.1.
* **L** (:py:class:`int`): Number of cubic B-spline basis functions to
model the background intensity. Default is 20.
Returns:
* **tau_b** (:py:class:`float`): Loadings precision for background.
'''
rate = d_g + 0.5*np.inner(gamma, gamma)
tau_b = np.random.gamma(shape=(c_g + 0.5*L), scale=1/rate)
return tau_b
[docs]def update_tauy(y, BG, Calc, var_scale, d_y, c_y, n):
'''
Update the model precision.
Args:
* **y** (:class:`~numpy.ndarray`): Vector of diffraction pattern
intensities - size (n,).
* **BG** (:class:`~numpy.ndarray`): Vector of background intensity
values - size (n,).
* **Calc** (:class:`~.Calculator`): calculator operator that interacts
with the designated GPX file by referencing GSAS-II libraries.
* **var_scale** (:class:`~numpy.ndarray`): Vector that scales with the
intensity of data, heteroscedastic.
See function :meth:`~initialize_intensity_weight`
* **d_y** (:py:class:`float`): Scale parameter for Gamma distribution
of the error variance. Default is 0.1
* **c_y** (:py:class:`float`): Shape parameter for Gamma distribution
of the error variance.
* **n** (:py:class:`int`): Number data points
(equivalent to the length of intensities vector, y).
Returns:
* **tau_y** (:py:class:`float`): Updated model precision.
'''
err = (y-BG-Calc.Calculate())/np.sqrt(var_scale)
rate = d_y + 0.5*np.inner(err, err)
tau_y = np.random.gamma(shape=(c_y + 0.5*n), scale=1/rate)
return tau_y
def _print_update(curr_keep, update, n_keep, accept_S1, attempt_S1, accept_S2,
attempt_S2, accept_rate_S1, accept_rate_S2):
# Print an update if necessary
print("Collected %d of %d samples" % (curr_keep, n_keep))
print(' %03.2f acceptance rate for Stage 1 (%d attempts)'
% (accept_S1/attempt_S1, attempt_S1))
if attempt_S2 > 0:
rate_S2 = accept_S2/attempt_S2
else:
rate_S2 = 0
print(' %03.2f acceptance rate for Stage 2 (%d attempts)'
% (rate_S2, attempt_S2))
accept_rate_S1[(curr_keep//update)-1] = accept_S1/attempt_S1
accept_rate_S2[(curr_keep//update)-1] = rate_S2
return accept_rate_S1, accept_rate_S2
[docs]def traceplots(plot, q, keep_params, curr_keep, paramList, n_keep, update,
path):
'''
Produce traceplots of sampling at the update intervals. Plot in the console
window and save final traceplot in the current folder.
'''
if plot is True:
plt.figure(1, figsize=(20, 12))
plt.subplots_adjust(wspace=0.8)
for index in range(q):
plt.subplot(2, np.ceil(q/2.0).astype('int'), index+1)
plt.plot(keep_params[range(curr_keep), index], 'k')
plt.xlabel("Iteration")
plt.ylabel(paramList[index])
if ((n_keep-curr_keep) < update):
plt.savefig(path + '/DRAM_Trace.png')
plt.pause(0.1)
def _check_parameter_specification(Calc, paramList):
# Get indices of parameters to refine, even if they are "fixed" by bounds
useInd = [np.asscalar(np.where(np.array(Calc._varyList) == par)[0])
for par in paramList]
if (any(np.array(Calc._varyList)[useInd] != paramList)):
raise ValueError("Parameter list specification is not valid.")
def _check_parameter_initialization(paramList, init_z):
# Make sure initial z values are given for every parameter in paramList
if len(paramList) != len(init_z):
raise ValueError("Initial value specification for Z is not valid.")
[docs]def initialize_intensity_weight(x, y, scaling_factor=1):
'''
Define a heteroscedastic vector to scale the residuals between the model
and the data with the intensity of the smoothed data. The smoothed
intensity data, y_sm, is obtained from :meth:`smooth_ydata`.
Args:
* **y** (:class:`~numpy.ndarray`): Vector of diffraction pattern
intensities - size (n,).
* **x** (:class:`~numpy.ndarray`): Vector of 2-theta values from
diffraction pattern- size (n,).
* **scaling factor** (:py:class:`float`): Contribution of smoothed
intensity data. Default is 1.
Returns:
* **var_scale** (:class:`~numpy.ndarray`): Vector of scaling factors
corresponding to intensity data - size (n,).
'''
y_sm = smooth_ydata(x=x, y=y)
var_scale = scaling_factor*y_sm + 1 # Scale for y_sm/tau_y
return var_scale
# MCMC function
[docs]def sample(GPXfile, paramList, variables, start, lower, upper, path,
initCov=None, y=None, x=None, L=20, shrinkage=0.2,
s_p=(2.4**2), epsilon=1e-4, m0=0, sd0=1, c_y=0.1, d_y=0.1,
c_g=0.1, d_g=0.1, c_b=0.1, d_b=0.1, adapt=20, thin=1,
iters=5000, burn=2000, update=500, plot=True, fix=False):
'''
Args:
* **GPXfile** (:py:class:`str`):
Filepath for the GPX file underlying the current data
* **paramList** (:py:class:`list`):
(q) list of GSASII parameter names in the same order
as the upper and lower limits being provided
* **variables** (:class:`~numpy.ndarray`):
(q,) vector of parameter names that matches 'paramList'
* **start** (:py:class:`list`): List of initial parameter values
in parameter space - size (q)
* **lower** (:class:`~numpy.ndarray`):
(q,) vector of lower bounds for the parameter values
* **upper** (:class:`~numpy.ndarray`):
(q,) vector of upper bounds for the parameter values
* **path** (:py:class:`str`): File path to folder where results and
final plots from run will be saved. Folder must be created before
calling.
Kwargs:
* **initCov** (:class:`~numpy.ndarray`) - `None`: (q, q) matrix to
be used as the covariance matrix for the proposal distribution,
default value is None. Covariance matrix can be specified with the
estimate covariance function. If there is no matrix specified, the
function will use a diagonal matrix with 0.05 on the diagonal.
* **y** (:class:`~numpy.ndarray`) - `None`:
(n,) vector of intensities. If no values
are specified, the function uses the values from the provided GPX
file
* **x** (:class:`~numpy.ndarray`) - `None`:
(n,) vector of angles (2*theta). If no
values are specified, the function uses the values from the provided
GPX file
* **L** (:py:class:`float`) - `20`:
number of B-spline basis functions to
model the background intensity.
* **shrinkage** (:py:class:`float`) - `0.2`:
Governs covariance change between proposal stages,
default is 0.2
* **s_p** (:py:class:`float`): :math:`\\frac{2.4^2}{q}`
Scaling parameter for the adaptive covariance, default is
set to :math:`\\frac{2.4^2}{q}` as in Gelman (1995), where q is the
dimension of the parameter space
* **epsilon** (:py:class:`float`) - `0.0001`:
Ridge constant to prevent singularity of the adaptive
covariance.
* **m0** (:py:class:`float`) - `0`: Governs the mean value on the
latent z-space of the parameters.
* **sd0** (:py:class:`float`) - `1`: Governs the standard deviation
on the latent z-space of the parameters.
* **c_y** (:py:class:`float`) - 0.1: Shape parameter for Gamma
distribution of the error variance.
* **d_y** (:py:class:`float`) - 0.1: Scale parameter for Gamma
distribution of the error variance.
* **c_g** (:py:class:`float`) - 0.1: Shape parameter for Gamma
distribution for the error in the prior distribution for the
basis function loadings.
* **d_g** (:py:class:`float`) - 0.1: Scale parameter for Gamma
distribution for the error in the prior distribution for the
basis function loadings.
* **c_b** (:py:class:`float`) - 0.1: Shape parameter for Gamma
distribution for scale of the proportional constribution to the
error variance.
* **d_b** (:py:class:`float`) - 0.1: Scale parameter for Gamma
distribution for scale of the proportional constribution to the
error variance.
* **adapt** (:py:class:`float`): `20`: Controls the adaptation period.
* **thin** (:py:class:`float`) - `1`: Degree of thinning.
* **iters** (:py:class:`float`) - `5000`: Number of total iterations
to run.
* **burn** (:py:class:`float`) - `2000`: Number of samples to consider
as burn-in.
* **update** (:py:class:`float`) - `500`: Period between updates
printed to the console.
* **plot** (:py:class:`bool`) - `True`: Indicator for whether or not to
create trace plots as the sampler progresses.
Returns:
* dictionary containing the posterior samples for the parameters and
the model timing, dict entries are
#. **param_samples** (:class:`~numpy.ndarray`): Matrix of posterior
samples for the mean process parameters of interest - (nSamples, q)
#. **number_samples** (:py:class:`int`): Number of samples kept.
Equivalent to (iters - burn) if thin=1.
#. **final_covariance** (:class:`~numpy.ndarray`): Final adapated
covariance matrix - size(q, q).
#. **model_variance** (:class:`~numpy.ndarray`): Vector of posterior
samples for the overall model variance - size(nSamples,)
#. **gamma_samples** (:class:`~numpy.ndarray`): Matrix of posterior
samples for the basis function loadings modeling the background
intensity - (nSamples, L)
#. **run_time** (:py:class:`float`): Number of minutes the sampler
took to complete.
#. **stage1_accept** (:class:`~numpy.ndarray`): Acceptance rate of
stage 1 DRAM - size(n_keep//update,).
#. **stage2_accept** (:class:`~numpy.ndarray`): Acceptance rate of
stage 2 DRAM - size(n_keep//update).
'''
# Calculate starting values in z-space and check the lower and upper bounds
init_z = start2z(start=start, lower=lower, upper=upper)
Calc = gsas_calculator(GPXfile=GPXfile)
Calc._varyList = variables
# Set the scaling parameter based on the number of parameters
s_p = ((2.4**2)/len(paramList))
# Assign the intensity vector (y) and 2-theta angles (x) from the GPX file
# if no values are provided
x, y = diffraction_file_data(x=x, y=y, Calc=Calc)
# Calculate a B-spline basis for the range of x
B = calculate_bsplinebasis(x=x, L=L)
# Save dimensions
n = len(y) # Number of data observations
q = len(init_z) # Number of parameters of interest
# Smooth the observed intensities over the 2-theta data points with lowess
var_scale = initialize_intensity_weight(x=x, y=y)
_check_parameter_specification(Calc=Calc, paramList=paramList)
_check_parameter_initialization(paramList=paramList, init_z=init_z)
# Initialize parameter values
z = np.array(init_z, copy=True) # Latent process
params = z2par(z=init_z,
lower=lower,
upper=upper) # Parameters of interest
tau_y = 1 # Error variance for Y
gamma = np.ones(L) # Loadings
tau_b = 1 # Variance for loadings
BG = np.matmul(B, gamma) # Background intensity
# Update the parameters in the GSAS calculator
Calc.UpdateParameters(dict(zip(paramList, params)))
# Initialize covariance for proposal distribution
varS1 = initialize_cov(initCov=initCov, q=q)
# Set up counters for the parameters of interest
# Attempts / acceptances counters
attempt_S1 = attempt_S2 = accept_S1 = accept_S2 = 0
# Calculate the number of thinned samples to keep
n_keep = np.floor_divide(iters - burn - 1, thin) + 1
curr_keep = 0
# Initialize output objects
(all_Z, keep_params, keep_gamma, keep_b, keep_tau_y, keep_tau_b,
accept_rate_S1, accept_rate_S2) = _initialize_output(
iters=iters, q=q, n_keep=n_keep, L=L, update=update)
# Begin 2-stage Delayed Rejection Adaptive Metropolis
tick = timer()
for i in range(iters):
# Update basis function loadings and then background values
gamma, BG = update_background(B, var_scale, tau_y,
tau_b, L, Calc, y)
attempt_S1 += 1
# Stage 1 DRAM:
can_z1, can1_post, cur_post, R1 = stage1_acceptprob(
z=z, varS1=varS1, y=y, x=x, BG=BG, Calc=Calc,
paramList=paramList, lower=lower, upper=upper,
var_scale=var_scale, tau_y=tau_y, m0=m0, sd0=sd0)
# Accept candidate if acceptance probability is greater than a random
# draw and located away from the bounds
if (
(np.log(np.random.uniform()) < R1) &
(np.sum(np.abs(can_z1) > 3) == 0)
):
accept_S1 += 1
z = np.array(can_z1, copy=True) # Update latent
params = z2par(z=z, lower=lower, upper=upper) # Update parameters
Calc.UpdateParameters(dict(zip(paramList, params)))
# Continue to Stage 2 if candidate is rejected
else:
# Stage 2:
attempt_S2 += 1
# Propose the candidate
can_z2 = np.random.multivariate_normal(mean=z, cov=shrinkage*varS1)
if np.sum(np.abs(can_z2) > 3) == 0: # Ensures away from the bounds
can2_post = log_post(
y=y, x=x, BG=BG, Calc=Calc,
paramList=paramList, z=can_z2,
lower=lower, upper=upper, scale=var_scale,
tau_y=tau_y, m0=m0, sd0=sd0)
# Calculate the acceptance probability
R2 = stage2_acceptprob(
can1_post=can1_post,
can2_post=can2_post,
cur_post=cur_post,
can_z1=can_z1,
can_z2=can_z2,
z=z, varS1=varS1)
# Accept the candidate acceptance probability is greater than
# a random draw, otherwise reject
if np.log(np.random.uniform()) < R2:
accept_S2 += 1
z = np.array(can_z2, copy=True)
params = z2par(z=z, lower=lower, upper=upper)
Calc.UpdateParameters(dict(zip(paramList, params)))
del can_z2, can2_post, R2
else:
del can_z2
del can_z1, can1_post, cur_post, R1
# Store accepted candidates
all_Z[i] = z
# Adapt the proposal distribution covariance matrix
varS1 = adapt_covariance(i=i, adapt=adapt, s_p=s_p, all_Z=all_Z,
epsilon=epsilon, q=q, varS1=varS1)
# Update tau_b
tau_b = update_taub(d_g=d_g, gamma=gamma, c_g=c_g, L=L)
# Update tau_y
tau_y = update_tauy(
y=y, BG=BG, Calc=Calc,
var_scale=var_scale, d_y=d_y, c_y=c_y, n=n)
# Keep track of everything
if i >= burn:
# Store posterior draws if appropriate
if (i-burn) % thin == 0:
keep_params[curr_keep] = params
keep_gamma[curr_keep] = gamma
# keep_b[curr_keep] = b
keep_tau_y[curr_keep] = tau_y
keep_tau_b[curr_keep] = tau_b
curr_keep += 1
if curr_keep % update == 0:
# Print an update if necessary
accept_rate_S1, accpet_rate_S2 = _print_update(
curr_keep=curr_keep, update=update, n_keep=n_keep,
accept_S1=accept_S1, attempt_S1=attempt_S1,
accept_S2=accept_S2, attempt_S2=attempt_S2,
accept_rate_S1=accept_rate_S1,
accept_rate_S2=accept_rate_S2)
# Produce trace plots
traceplots(plot=plot, q=q, keep_params=keep_params,
curr_keep=curr_keep, paramList=paramList,
n_keep=n_keep, update=update, path=path)
tock = timer()
# Gather output into a dictionary
output = dict(param_samples=keep_params, number_samples=curr_keep,
final_covariance=varS1, model_variance=1.0/keep_tau_y,
gamma_samples=keep_gamma, run_time=(tock-tick)/60,
stage1_accept=accept_rate_S1, stage2_accept=accept_rate_S2)
return output
[docs]def run_summary(results, start, paramList, path):
'''
Print information to the console about the DRAM run after the sampling is
completed.
Args:
* **results** (:py:class:`dict`): Array of parameter values in
z-space - size (q,).
* **start** (:py:class:`list`): List of initial parameter values
in parameter space - size (q).
* **paramList** (:py:class:`list`): List of parameter names for
refinement - size (q).
* **path** (:py:class:`str`): File path to folder where results and
final plots from run will be saved.
'''
mins = results["run_time"]
params = results["param_samples"]
post_param_mean = np.mean(params, axis=0)
# Print true versus estimated values for the mean process parameters
print('Mean parameter estimates:')
for q in range(len(start)):
print(paramList[q] + ' Rietveld: %03.4f, QUAD: %03.4f' % (start[q],
post_param_mean[q]))
# Print run time
print("Model Time: %03.2f minutes (DRAM)" % (mins))
# Plot parameter posterior distributions
plt.figure(2, figsize=(25, 20))
plt.subplots_adjust(wspace=0.65)
sns.set_context("talk")
for index in range(0, len(start)):
plt.subplot(3, np.ceil((len(start)+1)/3.0).astype('int'), index+1)
sns.histplot(params[:, index], kde=True)
plt.xlabel(paramList[index])
plt.ylabel('Posterior Probability Density')
plt.savefig(path + '/PosteriorDensities')
[docs]def save_results(results, start, lower, upper, paramList, init_cov, path):
'''
Save DRAM results to individual files in selected file path. Also save the
input values of prior bounds, parameter starting values, and initial
proposal covariance. A text file of a list of the refined parameter names
is saved to the indicated file path.
Args:
* **results** (:py:class:`dict`): Array of parameter values in
z-space - size (q,).
* **start** (:py:class:`list`): List of initial parameter values
in parameter space - size (q).
* **lower** (:class:`~numpy.ndarray`): Vector of lower limits on a
uniform prior distribution in the parameter space - size (q,).
* **upper** (:class:`~numpy.ndarray`): Vector of upper limits on a
uniform prior distribution in the parameter space - size (q,).
* **paramList** (:py:class:`list`): List of parameter names for
refinement - size (q).
* **initCov** (:class:`~numpy.ndarray`): (q, q) matrix used as the
initial covariance matrix for the proposal distribution. Covariance
matrix can be specified with the estimate covariance function.
* **path** (:py:class:`str`): File path to folder where results and
final plots from run will be saved.
'''
mins = results["run_time"]
params = results["param_samples"]
post_param_mean = np.mean(params, axis=0)
# Save results to output folder
np.savetxt(path + '/parameter_samples', params)
np.savetxt(path + '/final_covariance', results["final_covariance"])
np.savetxt(path + '/initial_proposal_covariance', init_cov)
np.savetxt(path + '/gamma_samples', results["gamma_samples"])
np.savetxt(path + '/model_variance', results["model_variance"])
np.savetxt(path + '/run_time_mins', np.array([mins]))
np.savetxt(path + '/posterior_parameter_means', post_param_mean)
np.savetxt(path + '/parameter_starting_values', start)
np.savetxt(path + '/lower_prior_bounds', lower)
np.savetxt(path + '/upper_prior_bounds', upper)
np.savetxt(path + '/Stage1_acceptance', results["stage1_accept"])
np.savetxt(path + '/Stage2_acceptance', results["stage2_accept"])
# Save list of refined parameter names to text file
with open(path + '/parameter_list.txt', 'w') as outfile:
for item in paramList:
outfile.write("%s\n" % item)