import numpy as np
from hexrd.matrixutil import findDuplicateVectors
from hexrd.fitting import fitpeak
[docs]class PowderCalibrator(object):
def __init__(self, instr, plane_data, img_dict, flags,
tth_tol=None, eta_tol=0.25,
pktype='pvoigt'):
assert list(instr.detectors.keys()) == list(img_dict.keys()), \
"instrument and image dict must have the same keys"
self._instr = instr
self._plane_data = plane_data.makeNew() # need a copy to munge
self._plane_data.wavelength = self._instr.beam_energy # force
self._img_dict = img_dict
self._params = np.asarray(self.plane_data.lparms, dtype=float)
self._full_params = np.hstack(
[self._instr.calibration_parameters, self._params]
)
assert len(flags) == len(self._full_params), \
"flags must have %d elements" % len(self._full_params)
self._flags = flags
# for polar interpolation
self._tth_tol = tth_tol or np.degrees(plane_data.tThWidth)
self._eta_tol = eta_tol
# for peak fitting
# ??? fitting only, or do alternative peak detection?
self._pktype = pktype
@property
def npi(self):
return len(self._instr.calibration_parameters)
@property
def instr(self):
return self._instr
@property
def plane_data(self):
self._plane_data.wavelength = self._instr.beam_energy
return self._plane_data
@property
def img_dict(self):
return self._img_dict
@property
def tth_tol(self):
return self._tth_tol
@tth_tol.setter
def tth_tol(self, x):
assert np.isscalar(x), "tth_tol must be a scalar value"
self._tth_tol = x
@property
def eta_tol(self):
return self._eta_tol
@eta_tol.setter
def eta_tol(self, x):
assert np.isscalar(x), "eta_tol must be a scalar value"
self._eta_tol = x
@property
def params(self):
return self._params
@params.setter
def params(self, x):
x = np.atleast_1d(x)
if len(x) != len(self.plane_data.lparms):
raise RuntimeError("params must have %d elements"
% len(self.plane_data.lparms))
self._params = x
self._plane_data.lparms = x
@property
def full_params(self):
return self._full_params
@property
def npe(self):
return len(self._params)
@property
def flags(self):
return self._flags
@flags.setter
def flags(self, x):
x = np.atleast_1d(x)
nparams_instr = len(self.instr.calibration_parameters)
nparams_extra = len(self.params)
nparams = nparams_instr + nparams_extra
if len(x) != nparams:
raise RuntimeError("flags must have %d elements" % nparams)
self._flags = np.asarrasy(x, dtype=bool)
self._instr.calibration_flags = self._flags[:nparams_instr]
@property
def pktype(self):
return self._pktype
@pktype.setter
def pktype(self, x):
"""
currently only 'gaussian', 'lorentzian, 'pvoigt' or 'split_pvoigt'
"""
assert x in ['gaussian', 'lorentzian', 'pvoigt', 'split_pvoigt'], \
"pktype '%s' not understood"
self._pktype = x
def _interpolate_images(self):
"""
returns the iterpolated powder line data from the images in img_dict
??? interpolation necessary?
"""
return self.instr.extract_line_positions(
self.plane_data, self.img_dict,
tth_tol=self.tth_tol, eta_tol=self.eta_tol,
npdiv=2, collapse_eta=False, collapse_tth=False,
do_interpolation=True)
def _extract_powder_lines(self, fit_tth_tol=None):
"""
return the RHS for the instrument DOF and image dict
The format is a dict over detectors, each containing
[index over ring sets]
[index over azimuthal patch]
[xy_meas, tth_meas, tth_ref, eta_ref]
FIXME: can not yet handle tth ranges with multiple peaks!
"""
if fit_tth_tol is None:
fit_tth_tol = self.tth_tol/4.
fit_tth_tol = np.radians(fit_tth_tol)
# ideal tth
wlen = self.instr.beam_wavelength
dsp_ideal = np.atleast_1d(self.plane_data.getPlaneSpacings())
hkls_ref = self.plane_data.hkls.T
dsp0 = []
hkls = []
for idx in self.plane_data.getMergedRanges()[0]:
if len(idx) > 1:
eqv, uidx = findDuplicateVectors(np.atleast_2d(dsp_ideal[idx]))
if len(uidx) > 1:
raise NotImplementedError("can not handle multipeak yet")
else:
# if here, only degenerate ring case
uidx = idx[0]
else:
uidx = idx[0]
dsp0.append(dsp_ideal[uidx])
hkls.append(hkls_ref[uidx])
powder_lines = self._interpolate_images()
# GRAND LOOP OVER PATCHES
rhs = dict.fromkeys(self.instr.detectors)
for det_key, panel in self.instr.detectors.items():
rhs[det_key] = []
for i_ring, ringset in enumerate(powder_lines[det_key]):
tmp = []
for angs, intensities in ringset:
tth_centers = np.average(
np.vstack([angs[0][:-1], angs[0][1:]]),
axis=0)
eta_ref = angs[1]
int1d = np.sum(np.array(intensities).squeeze(), axis=0)
# peak profile fitting
p0 = fitpeak.estimate_pk_parms_1d(
tth_centers, int1d, self.pktype
)
p = fitpeak.fit_pk_parms_1d(
p0, tth_centers, int1d, self.pktype
)
# !!! this is where we can kick out bunk fits
tth_meas = p[1]
tth_pred = 2.*np.arcsin(0.5*wlen/dsp0[i_ring])
center_err = abs(tth_meas - tth_pred)
if p[0] < 0.01 or center_err > fit_tth_tol:
tmp.append(np.empty((0, 5)))
continue
xy_meas = panel.angles_to_cart([[tth_meas, eta_ref], ])
# distortion
if panel.distortion is not None:
xy_meas = panel.distortion.apply_inverse(xy_meas)
# cat results
tmp.append(
np.hstack(
[xy_meas.squeeze(),
tth_meas,
hkls[i_ring],
dsp0[i_ring],
eta_ref]
)
)
pass
if len(tmp) == 0:
rhs[det_key].append(np.empty((0, 5)))
else:
rhs[det_key].append(np.vstack(tmp))
pass
pass
return rhs
def _evaluate(self, reduced_params, data_dict, output='residual'):
"""
"""
# need this for dsp
bmat = self.plane_data.latVecOps['B']
# first update instrument from input parameters
full_params = np.asarray(self.full_params)
full_params[self.flags] = reduced_params
self.instr.update_from_parameter_list(full_params[:self.npi])
self.params = full_params[self.npi:]
wlen = self.instr.beam_wavelength
# build residual
retval = []
for det_key, panel in self.instr.detectors.items():
pdata = np.vstack(data_dict[det_key])
if len(pdata) > 0:
hkls = pdata[:, 3:6]
gvecs = np.dot(hkls, bmat.T)
dsp0 = 1./np.sqrt(np.sum(gvecs*gvecs, axis=1))
# dsp0 = pdata[:, -2]
eta0 = pdata[:, -1]
# derive reference tth
tth0 = 2.*np.arcsin(0.5*wlen/dsp0)
calc_xy = panel.angles_to_cart(np.vstack([tth0, eta0]).T)
# distortion if applicable, from ideal --> warped
if panel.distortion is not None:
calc_xy = panel.distortion.apply_inverse(calc_xy)
if output == 'residual':
retval.append(
(pdata[:, :2].flatten() - calc_xy.flatten())
)
elif output == 'model':
retval.append(
calc_xy.flatten()
)
else:
raise RuntimeError("unrecognized output flag '%s'"
% output)
else:
continue
return np.hstack(retval)
[docs] def residual(self, reduced_params, data_dict):
return self._evaluate(reduced_params, data_dict)
[docs] def model(self, reduced_params, data_dict):
return self._evaluate(reduced_params, data_dict, output='model')