Coverage for larch/xrf/xrf_model.py: 11%
494 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
1from collections import namedtuple
2import time
3import json
4import numpy as np
5from numpy.linalg import lstsq
6from scipy.optimize import nnls
9from lmfit import Parameters, minimize, fit_report
11from xraydb import (material_mu, mu_elam, ck_probability, xray_edge,
12 xray_edges, xray_lines, xray_line)
13from xraydb.xray import XrayLine
15from .. import Group
16from ..math import index_of, interp, interp1d, savitzky_golay, hypermet, erfc
17from ..xafs import ftwindow
18from ..utils import group2dict, json_dump, json_load, gformat, unixpath
20xrf_prediction = namedtuple("xrf_prediction", ("weights", "total"))
21xrf_peak = namedtuple('xrf_peak', ('name', 'amplitude', 'center', 'step',
22 'tail', 'sigmax', 'beta', 'gamma',
23 'vary_center', 'vary_step', 'vary_tail',
24 'vary_sigmax', 'vary_beta', 'vary_gamma'))
26predict_methods = {'lstsq': lstsq, 'nnls': nnls}
28# Note on units: energies are in keV, lengths in cm
31# note on Fano Factors
32# efano = (energy to create e-h pair) * FanoFactor
33# material E-h excitation (eV) Fano Factor EFano (keV)
34# Si 3.66 0.115 0.000 4209
35# Ge 3.0 0.130 0.000 3900
36FanoFactors = {'Si': 0.4209e-3, 'Ge': 0.3900e-3}
38XRAY_EDGES = ('M5', 'M4', 'M3', 'M2', 'M1', 'L3', 'L2', 'L1', 'K')
40class XRF_Material:
41 def __init__(self, material='Si', thickness=0.050, density=None,
42 thickness_units='mm'):
43 self.material = material
44 self.density = density
45 self.thickness_units = thickness_units
46 self.thickness = thickness
47 self.mu_total = self.mu_photo = None
49 def calc_mu(self, energy):
50 "calculate mu for energy in keV"
51 # note material_mu works in eV!
52 self.mu_total = material_mu(self.material, 1000*energy,
53 density=self.density,
54 kind='total')
55 self.mu_photo = material_mu(self.material, 1000*energy,
56 density=self.density,
57 kind='photo')
59 def absorbance(self, energy, thickness=None, kind='total'):
60 """calculate absorbance (fraction absorbed)
62 Arguments
63 ----------
64 energy float or ndarray energy (keV) of X-ray
65 thicknesss float material thickness (cm)
67 Returns
68 -------
69 fraction of X-rays absorbed by material
70 """
71 if thickness is None:
72 thickness = self.thickness
73 if self.mu_total is None:
74 self.calc_mu(energy)
75 mu = self.mu_total
76 if kind == 'photo':
77 mu = self.mu_photo
79 t = 0.1*thickness
80 return (1.0 - np.exp(-t*mu))
82 def transmission(self, energy, thickness=None, kind='total'):
83 """calculate transmission (fraction transmitted through material)
85 Arguments
86 ---------
87 energy float or ndarray energy (keV) of X-ray
88 thicknesss float material thickness (cm)
90 Returns
91 -------
92 fraction of X-rays transmitted through material
93 """
95 if thickness is None:
96 thickness = self.thickness
97 if self.mu_total is None:
98 self.calc_mu(energy)
99 mu = self.mu_total
100 if kind == 'photo':
101 mu = self.mu_photo
103 t = 0.1*thickness
104 return np.exp(-t*mu)
107class XRF_Element:
108 def __init__(self, symbol, xray_energy=10, energy_min=1.5):
109 """get Xray emission lines for an element"""
110 self.symbol = symbol
112 # note: 'xray_energy' and 'energy_min' are in keV,
113 # most of this code works in eV!
114 if xray_energy is None: xray_energy = 30.0
115 if energy_min is None: energy_min = 1.5
116 self.xray_energy = xray_energy
117 self.energy_min = energy_min
119 en_xray_ev = xray_energy * 1000.0
120 en_low_ev = energy_min * 1000.0
121 self.mu = mu_elam(symbol, en_xray_ev, kind='photo')
122 self.edges = []
123 self.fyields = {edge: 0.0 for edge in XRAY_EDGES}
124 self.taus = {edge: 0.0 for edge in XRAY_EDGES}
125 jumps = {}
126 for ename, xedge in xray_edges(self.symbol).items():
127 if ename.lower() in ('k', 'l1', 'l2', 'l3', 'm5'):
128 if (xedge.energy < en_xray_ev and xedge.energy > en_low_ev):
129 self.edges.append(ename)
130 self.fyields[ename] = xedge.fyield
131 jumps[ename] = xedge.jump_ratio
133 # tau is sub-shell strength
134 for ename in XRAY_EDGES:
135 if ename in self.edges and ename in jumps:
136 jump = max(1.000001, jumps[ename])
137 self.taus[ename] = jump - 1.0
138 for key in self.taus:
139 self.taus[key] = self.taus[key]/jump
141 # look up X-ray lines, keep track of very close lines
142 # so that they can be consolidated
143 # slightly confusing (and working with XrayLine energies in ev)
144 self.lines = {}
145 all_lines = {}
146 en_max = 0
147 for ename in self.edges:
148 for key, xline in xray_lines(symbol, ename).items():
149 all_lines[key] = xline
150 en_max = max(en_max, xline.energy)
152 if en_max < 100:
153 en_max = en_xray_ev
154 overlap_energy = en_max*0.001 # should be around 1 to 10 eV.
156 dups = {}
157 for i, key in enumerate(all_lines):
158 xline = all_lines[key]
159 dup_found = False
160 for okey, oline in self.lines.items():
161 if (okey != key and ((xline.energy-oline.energy) < overlap_energy) and
162 xline.initial_level == oline.initial_level):
163 dups[key] = okey
164 if key not in dups:
165 self.lines[key] = xline
167 for key, dest in dups.items():
168 if dest in self.lines:
169 w1, w2 = all_lines[key].intensity, all_lines[dest].intensity
170 e1, e2 = all_lines[key].energy, all_lines[dest].energy
171 f1, f2 = all_lines[key].final_level, all_lines[dest].final_level
172 wt = w1+w2
173 en = (e1*w1 + e2*w2)/(wt)
175 newkey = "%s,%s" % (key, dest)
176 if key[:2] == dest[:2]:
177 newkey = "%s,%s" % (key, dest[2:])
179 flevel = "%s,%s" % (f1, f2)
180 if f1[0] == f2[0]:
181 flevel = "%s,%s" % (f1, f2[1:])
183 self.lines.pop(dest)
184 self.lines[newkey] = XrayLine(energy=en, intensity=wt,
185 initial_level=all_lines[key].initial_level,
186 final_level=flevel)
187 for k, d in dups.items():
188 if d == dest:
189 dups[k] = newkey
192class XRF_Model:
193 """model for X-ray fluorescence data
195 consists of parameterized components for
197 incident beam (energy, angle_in, angle_out)
198 matrix (list of material, thickness)
199 filters (list of material, thickness)
200 detector (material, thickness, step, tail, beta, gamma)
201 """
202 def __init__(self, xray_energy=None, energy_min=1.5, energy_max=30.,
203 count_time=1, bgr=None, iter_callback=None, **kws):
204 self.xray_energy = xray_energy
205 self.energy_min = energy_min
206 self.energy_max = energy_max
207 self.count_time = count_time
208 self.iter_callback = None
209 self.fit_in_progress = False
210 self.params = Parameters()
211 self.elements = []
212 self.scatter = []
213 self.comps = {}
214 self.eigenvalues = {}
215 self.transfer_matrix = None
216 self.matrix_layers = []
217 self.matrix = None
218 self.matrix_atten = 1.0
219 self.filters = []
220 self.det_atten = self.filt_atten = self.escape_amp = 0.0
221 self.mu_lines = {}
222 self.mu_energies = []
223 self.fit_iter = 0
224 self.fit_toler = 1.e-4
225 self.fit_step = 1.e-4
226 self.max_nfev = 1000
227 self.fit_log = False
228 self.bgr = None
229 self.use_pileup = False
230 self.use_escape = False
231 self.escape_scale = None
232 self.script = ''
233 self.mca = None
234 if bgr is not None:
235 self.add_background(bgr)
237 def set_detector(self, material='Si', thickness=0.40, noise=0.05,
238 peak_step=1e-3, peak_tail=0.01, peak_gamma=0,
239 peak_beta=0.5, cal_offset=0, cal_slope=10., cal_quad=0,
240 vary_thickness=False, vary_noise=True,
241 vary_peak_step=True, vary_peak_tail=True,
242 vary_peak_gamma=False, vary_peak_beta=False,
243 vary_cal_offset=True, vary_cal_slope=True,
244 vary_cal_quad=False):
245 """
246 set up detector material, calibration, and general settings for
247 the hypermet functions for the fluorescence and scatter peaks
248 """
249 self.detector = XRF_Material(material, thickness)
250 matname = material.title()
251 if matname not in FanoFactors:
252 matname = 'Si'
253 self.efano = FanoFactors[matname]
254 self.params.add('det_thickness', value=thickness, vary=vary_thickness, min=0)
255 self.params.add('det_noise', value=noise, vary=vary_noise, min=0)
256 self.params.add('cal_offset', value=cal_offset, vary=vary_cal_offset, min=-500, max=500)
257 self.params.add('cal_slope', value=cal_slope, vary=vary_cal_slope, min=0)
258 self.params.add('cal_quad', value=cal_quad, vary=vary_cal_quad)
259 self.params.add('peak_step', value=peak_step, vary=vary_peak_step, min=0, max=10)
260 self.params.add('peak_tail', value=peak_tail, vary=vary_peak_tail, min=0, max=10)
261 self.params.add('peak_beta', value=peak_beta, vary=vary_peak_beta, min=0)
262 self.params.add('peak_gamma', value=peak_gamma, vary=vary_peak_gamma, min=0)
264 def add_scatter_peak(self, name='elastic', amplitude=1000, center=None,
265 step=0.010, tail=0.5, sigmax=1.0, beta=0.5,
266 vary_center=True, vary_step=True, vary_tail=True,
267 vary_sigmax=True, vary_beta=False):
268 """add Rayleigh (elastic) or Compton (inelastic) scattering peak
269 """
270 if name not in self.scatter:
271 self.scatter.append(xrf_peak(name, amplitude, center, step, tail,
272 sigmax, beta, 0.0, vary_center, vary_step,
273 vary_tail, vary_sigmax, vary_beta, False))
275 if center is None:
276 center = self.xray_energy
278 self.params.add('%s_amp' % name, value=amplitude, vary=True, min=0)
279 self.params.add('%s_center' % name, value=center, vary=vary_center,
280 min=center*0.5, max=center*1.25)
281 self.params.add('%s_step' % name, value=step, vary=vary_step, min=0, max=10)
282 self.params.add('%s_tail' % name, value=tail, vary=vary_tail, min=0, max=20)
283 self.params.add('%s_beta' % name, value=beta, vary=vary_beta, min=0, max=20)
284 self.params.add('%s_sigmax' % name, value=sigmax, vary=vary_sigmax,
285 min=0, max=100)
287 def add_element(self, elem, amplitude=1.e6, vary_amplitude=True):
288 """add Element to XRF model
289 """
290 self.elements.append(XRF_Element(elem, xray_energy=self.xray_energy,
291 energy_min=self.energy_min))
292 self.params.add('amp_%s' % elem.lower(), value=amplitude,
293 vary=vary_amplitude, min=0)
295 def add_filter(self, material, thickness, density=None, vary_thickness=False):
296 self.filters.append(XRF_Material(material=material,
297 density=density,
298 thickness=thickness))
299 self.params.add('filterlen_%s' % material,
300 value=thickness, min=0, vary=vary_thickness)
302 def set_matrix(self, material, thickness, density=None):
303 self.matrix = XRF_Material(material=material, density=density,
304 thickness=thickness)
305 self.matrix_atten = 1.0
307 def add_background(self, data, vary=True):
308 self.bgr = data
309 self.params.add('background_amp', value=1.0, min=0, vary=vary)
311 def add_escape(self, scale=1.0, vary=True):
312 self.use_escape = True
313 self.params.add('escape_amp', value=scale, min=0, vary=vary)
315 def add_pileup(self, scale=1.0, vary=True):
316 self.use_pileup = True
317 self.params.add('pileup_amp', value=scale, min=0, vary=vary)
319 def clear_background(self):
320 self.bgr = None
321 self.params.pop('background_amp')
323 def calc_matrix_attenuation(self, energy):
324 """
325 calculate beam attenuation by a matrix built from layers
326 note that matrix layers and composition cannot be variable
327 so the calculation can be done once, ahead of time.
328 """
329 atten = 1.0
330 if self.matrix is not None:
331 ixray_en = index_of(energy, self.xray_energy)
332 # print("MATRIX ", ixray_en, self.matrix)
333 # layer_trans = self.matrix.transmission(energy) # transmission through layer
334 # incid_trans = layer_trans[ixray_en] # incident beam trans to lower layers
335 # ncid_absor = 1.0 - incid_trans # incident beam absorption by layer
336 # atten = layer_trans * incid_absor
337 self.matrix_atten = atten
339 def calc_escape_scale(self, energy, thickness=None):
340 """
341 calculate energy dependence of escape effect
343 X-rays penetrate a depth 1/mu(material, energy) and the
344 detector fluorescence escapes from that depth as
345 exp(-mu(material, KaEnergy)*thickness)
346 with a fluorecence yield of the material
348 """
349 det = self.detector
350 # note material_mu, xray_edge, xray_line work in eV!
351 escape_energy_ev = xray_line(det.material, 'Ka').energy
352 mu_emit = material_mu(det.material, escape_energy_ev)
353 self.escape_energy = 0.001 * escape_energy_ev
355 mu_input = material_mu(det.material, 1000*energy)
357 edge = xray_edge(det.material, 'K')
358 self.escape_scale = edge.fyield * np.exp(-mu_emit / (2*mu_input))
359 self.escape_scale[np.where(energy < 0.001*edge.energy)] = 0.0
361 def det_sigma(self, energy, noise=0):
362 """ energy width of peak """
363 return np.sqrt(self.efano*energy + noise**2)
365 def calc_spectrum(self, energy, params=None):
366 if params is None:
367 params = self.params
368 pars = params.valuesdict()
369 self.comps = {}
370 self.eigenvalues = {}
372 det_noise = pars['det_noise']
373 step = pars['peak_step']
374 tail = pars['peak_tail']
375 beta = pars['peak_beta']
376 gamma = pars['peak_gamma']
378 # escape: calc only if needed
379 if ((not self.fit_in_progress) or
380 self.params['det_thickness'].vary or
381 self.params['escape_amp'].vary):
382 if self.escape_scale is None:
383 self.calc_escape_scale(energy, thickness=pars['det_thickness'])
384 self.escape_amp = pars.get('escape_amp', 0.0) * self.escape_scale
385 if not self.use_escape:
386 self.escape_amp = 0
387 # detector attenuation: calc only if needed
388 if (not self.fit_in_progress) or self.params['det_thickness'].vary:
389 self.det_atten = self.detector.absorbance(energy, thickness=pars['det_thickness'])
390 # filter attenuation: calc only if needed
391 filt_pars = [self.params['filterlen_%s' % f.material] for f in self.filters]
392 if (not self.fit_in_progress) or any([f.vary for f in filt_pars]):
393 self.filt_atten = np.ones(len(energy))
394 for f in self.filters:
395 thickness = pars.get('filterlen_%s' % f.material, None)
396 if thickness is not None and int(thickness*1e6) > 1:
397 fx = f.transmission(energy, thickness=thickness)
398 self.filt_atten *= fx
400 self.atten = self.det_atten * self.filt_atten
401 # matrix corrections #1: get X-ray line energies, only if needed
402 if ((not self.fit_in_progress) or len(self.mu_energies)==0):
403 self.mu_lines = {}
404 self.mu_energies = []
405 for elem in self.elements:
406 for key, line in elem.lines.items():
407 self.mu_lines[f'{elem.symbol:s}_{key:s}'] = line.energy
408 self.mu_energies.append(line.energy)
410 self.mu_energies.append(1000*self.xray_energy)
411 self.mu_energies = np.array(self.mu_energies)
412 # print("Mu energies ", self.mu_energies, len(self.mu_energies))
413 # matrix
414 # if self.matrix_atten is None:
415 # self.calc_matrix_attenuation(energy)
416 # atten *= self.matrix_atten
418 nlines = 0
419 for elem in self.elements:
420 comp = 0. * energy
421 amp = pars.get('amp_%s' % elem.symbol.lower(), None)
422 if amp is None:
423 continue
424 for key, line in elem.lines.items():
425 ilevel = line.initial_level
426 ecen = 0.001*line.energy
427 nlines += 1
428 line_amp = (line.intensity * elem.mu *
429 elem.fyields[ilevel] * elem.taus[ilevel])
430 sigma = self.det_sigma(ecen, det_noise)
431 comp += hypermet(energy, amplitude=line_amp, center=ecen,
432 sigma=sigma, step=step, tail=tail,
433 beta=beta, gamma=gamma)
434 comp *= amp * self.atten * self.count_time
435 comp += self.escape_amp * interp1d(energy-self.escape_energy, comp, energy)
436 comp[np.where(np.isnan(comp))] = 0.0
437 self.comps[elem.symbol] = comp
438 self.eigenvalues[elem.symbol] = amp
440 # scatter peaks for Rayleigh and Compton
441 for peak in self.scatter:
442 p = peak.name
443 amp = pars.get('%s_amp' % p, None)
444 if amp is None:
445 continue
446 ecen = pars['%s_center' % p]
447 step = pars['%s_step' % p]
448 tail = pars['%s_tail' % p]
449 beta = pars['%s_beta' % p]
450 sigma = pars['%s_sigmax' % p]
451 sigma *= self.det_sigma(ecen, det_noise)
452 comp = hypermet(energy, amplitude=1.0, center=ecen,
453 sigma=sigma, step=step, tail=tail, beta=beta,
454 gamma=gamma)
455 comp *= amp * self.atten * self.count_time
456 comp += self.escape_amp * interp1d(energy-self.escape_energy, comp, energy)
457 comp[np.where(np.isnan(comp))] = 0.0
458 self.comps[p] = comp
459 self.eigenvalues[p] = amp
460 if self.bgr is not None:
461 bgr_amp = pars.get('background_amp', 0.0)
462 self.comps['background'] = bgr_amp * self.bgr
463 self.eigenvalues['background'] = bgr_amp
464 # calculate total spectrum
465 total = 0. * energy
466 for comp in self.comps.values():
467 total += comp
468 if self.use_pileup:
469 pamp = pars.get('pileup_amp', 0.0)
470 npts = len(energy)
471 pileup = pamp*1.e-9*np.convolve(total, total*1.0, 'full')[:npts]
472 self.comps['pileup'] = pileup
473 self.eigenvalues['pileup'] = pamp
474 total += pileup
475 # remove tiny values so that log plots are usable
476 floor = 1.e-10*max(total)
477 total[np.where(total<floor)] = floor
478 self.current_model = total
479 return total
481 def __resid(self, params, data, index):
482 pars = params.valuesdict()
483 self.best_en = (pars['cal_offset'] + pars['cal_slope'] * index +
484 pars['cal_quad'] * index**2)
485 self.fit_iter += 1
486 model = self.calc_spectrum(self.best_en, params=params)
487 if callable(self.iter_callback):
488 self.iter_callback(iter=self.fit_iter, pars=params)
489 return ((data - model) * self.fit_weight)[self.imin:self.imax]
491 def set_fit_weight(self, energy, counts, emin, emax, ewid=0.050):
492 """
493 set weighting factor to smoothed square-root of data
494 """
495 ewin = ftwindow(energy, xmin=emin, xmax=emax, dx=ewid, window='hanning')
496 self.fit_window = ewin
497 fit_wt = 0.1 + savitzky_golay( (counts+1.0)**(2/3.0), 15, 1)
498 self.fit_weight = 1.0/fit_wt
500 def fit_spectrum(self, mca, energy_min=None, energy_max=None,
501 fit_toler=None, fit_step=None, max_nfev=None):
502 if fit_toler is not None:
503 self.fit_toler = max(1.e-7, min(0.001, fit_toler))
504 if fit_step is not None:
505 self.fit_step = max(1.e-7, min(0.1, fit_step))
506 if max_nfev is not None:
507 self.max_nfev = max(200, min(12000, max_nfev))
509 self.mca = mca
510 work_energy = 1.0*mca.energy
511 work_counts = 1.0*mca.counts
512 floor = 1.e-10*np.percentile(work_counts, [99])[0]
513 work_counts[np.where(work_counts<floor)] = floor
515 if max(work_energy) > 250.0: # if input energies are in eV
516 work_energy /= 1000.0
518 imin, imax = 0, len(work_counts)
519 if energy_min is None:
520 energy_min = self.energy_min
521 if energy_min is not None:
522 imin = index_of(work_energy, energy_min)
523 if energy_max is None:
524 energy_max = self.energy_max
525 if energy_max is not None:
526 imax = index_of(work_energy, energy_max)
528 self.imin = max(0, imin-5)
529 self.imax = min(len(work_counts), imax+5)
530 self.npts = (self.imax - self.imin)
531 self.set_fit_weight(work_energy, work_counts, energy_min, energy_max)
532 self.fit_iter = 0
534 # reset attenuation calcs for matrix, detector, filters
535 self.matrix_atten = 1.0
536 self.escape_scale = None
537 self.detector.mu_total = None
538 for f in self.filters:
539 f.mu_total = None
541 self.fit_in_progress = False
542 self.init_fit = self.calc_spectrum(work_energy, params=self.params)
543 index = np.arange(len(work_counts))
544 userkws = dict(data=work_counts, index=index)
546 tol = self.fit_toler
547 self.fit_in_progress = True
548 self.result = minimize(self.__resid, self.params, kws=userkws,
549 method='leastsq', maxfev=self.max_nfev,
550 scale_covar=True, epsfcn=self.fit_step,
551 gtol=tol, ftol=tol, xtol=tol)
553 self.fit_report = fit_report(self.result, min_correl=0.5)
554 pars = self.result.params
555 self.fit_in_progress = False
557 self.best_en = (pars['cal_offset'] + pars['cal_slope'] * index +
558 pars['cal_quad'] * index**2)
559 self.fit_iter += 1
560 self.best_fit = self.calc_spectrum(work_energy, params=self.result.params)
562 # calculate transfer matrix for linear analysis using this model
563 tmat= []
564 for key, val in self.comps.items():
565 arr = val / self.eigenvalues[key]
566 floor = 1.e-12*max(arr)
567 arr[np.where(arr<floor)] = 0.0
568 tmat.append(arr)
569 self.transfer_matrix = np.array(tmat).transpose()
570 return self.get_fitresult()
572 def get_fitresult(self, label='XRF fit result', script='# no script supplied'):
573 """a simple compilation of fit settings results
574 to be able to easily save and inspect"""
575 out = XRFFitResult(label=label, script=script, mca=self.mca)
577 for attr in ('filename', 'label'):
578 setattr(out, 'mca' + attr, getattr(self.mca, attr, 'unknown'))
580 for attr in ('params', 'var_names', 'chisqr', 'redchi', 'nvarys',
581 'nfev', 'ndata', 'aic', 'bic', 'aborted', 'covar', 'ier',
582 'message', 'method', 'nfree', 'init_values', 'success',
583 'residual', 'errorbars', 'lmdif_message', 'nfree'):
584 setattr(out, attr, getattr(self.result, attr, None))
586 for attr in ('atten', 'best_en', 'best_fit', 'bgr', 'comps', 'count_time',
587 'eigenvalues', 'energy_max', 'energy_min', 'fit_iter', 'fit_log',
588 'fit_report', 'fit_toler', 'fit_weight', 'fit_window', 'init_fit',
589 'scatter', 'script', 'transfer_matrix', 'xray_energy'):
590 setattr(out, attr, getattr(self, attr, None))
592 elem_attrs = ('edges', 'fyields', 'lines', 'mu', 'symbol', 'xray_energy')
593 out.elements = []
594 for el in self.elements:
595 out.elements.append({attr: getattr(el, attr) for attr in elem_attrs})
597 mater_attrs = ('material', 'mu_photo', 'mu_total', 'thickness')
598 out.detector = {attr: getattr(self.detector, attr) for attr in mater_attrs}
599 out.matrix = None
600 if self.matrix is not None:
601 out.matrix = {attr: getattr(self.matrix, attr) for attr in mater_attrs}
602 out.filters = []
603 for ft in self.filters:
604 out.filters.append({attr: getattr(ft, attr) for attr in mater_attrs})
605 return out
607class XRFFitResult(Group):
608 """Result of XRF Fit"""
609 def __init__(self, label='xrf fit', filename=None, script='# No script',
610 mca=None, **kws):
611 kwargs = dict(label=label, filename=filename, script=script, mca=mca)
612 kwargs.update(kws)
613 Group.__init__(self, **kwargs)
615 def __repr__(self):
616 return 'XRFFitResult(%r, filename=%r)' % (self.label, self.filename)
618 def save(self, filename):
619 """save XRFFitResult result in a manner that can be loaded later"""
620 tmp = {}
621 for key, val in group2dict(self).items():
622 if key in ('__name__', '__repr__', 'save', 'load', 'export',
623 '_prep_decompose', 'decompose_mca', 'decompose_map'):
624 continue
625 if key == 'mca':
626 val = val.dump_mcafile()
627 tmp[key] = val
628 json_dump(tmp, unixpath(filename))
630 def load(self, filename):
631 from ..io import GSEMCA_File
632 for key, val in json_load(filename).items():
633 if key == 'mca':
634 val = GSEMCA_File(text=val)
635 setattr(self, key, val)
637 def export(self, filename):
638 """save result to text file"""
639 buff = ['# XRF Fit %s: %s' % (self.mca.label, self.label),
640 '#### Fit Script:']
641 for a in self.script.split('\n'):
642 buff.append('# %s' % a)
643 buff.append('#'*60)
644 buff.append('#### Fit Report:')
645 for a in self.fit_report.split('\n'):
646 buff.append('# %s' % a)
647 buff.append('#'*60)
648 labels = ['energy', 'counts', 'best_fit', 'best_energy', 'fit_window',
649 'fit_weight', 'attenuation']
650 labels.extend(list(self.comps.keys()))
651 buff.append('# %s' % (' '.join(labels)))
653 npts = len(self.mca.energy)
654 for i in range(npts):
655 dline = [gformat(self.mca.energy[i]),
656 gformat(self.mca.counts[i]),
657 gformat(self.best_fit[i]),
658 gformat(self.best_en[i]),
659 gformat(self.fit_window[i]),
660 gformat(self.fit_weight[i]),
661 gformat(self.atten[i])]
662 for c in self.comps.values():
663 dline.append(gformat(c[i]))
664 buff.append(' '.join(dline))
665 buff.append('\n')
666 with open(filename, 'w') as fh:
667 fh.write('\n'.join(buff))
669 def _prep_decompose(self, scale, count_time, method):
670 if self.transfer_matrix is None:
671 raise ValueError("XRFFitResult incomplete: need to fit a spectrum first")
672 fit_method = predict_methods.get(method, lstsq)
673 if count_time is None:
674 count_time = self.count_time
675 scale *= self.count_time / count_time
676 return fit_method, scale
678 def decompose_spectrum(self, counts, scale=1.0, count_time=None, method='lstsq'):
679 """
680 Apply XRFFitResult to another spectrum, decomposing it into elemenetal weights
682 Arguments:
683 ----------
684 counts MCA counts for spectrum, on the same energy grid as the fitted data.
685 scale scale factor to apply to output weights [1]
686 count_time count time in seconds [None - use count_time of fitted spectrum]
687 method decomposition method: one of `lstsq` for basic least-squares or
688 `nnls` for non-negative least-squares [`lstsq`]
690 Returns:
691 ---------
692 namedtuple of XRF prediction with elements:
693 weights dict of element: weights for all components used in the fit
694 total predicted total spectrum
695 """
696 method, scale = self._prep_decompose(scale, count_time, method)
697 results = method(self.transfer_matrix, counts*self.fit_window)
698 weights = {}
699 total = 0.0*counts
700 for i, name in enumerate(self.eigenvalues.keys()):
701 weights[name] = results[0][i] * scale
702 total += results[0][i] * self.transfer_matrix[:, i]
704 return xrf_prediction(weights, total)
706 def decompose_map(self, map, scale=1.0, pixel_time=1.0, method='lstsq',
707 nworkers=4):
708 """
709 Apply XRFFitResult to an XRF Map, decomposing it into maps of elemental weights
711 Arguments:
712 ----------
713 map XRF map array: [NY, NX, NMCA], on the same energy grid as the fitted data.
714 scale scale factor to apply to output weights [1]
715 pixel_time count time in seconds for each pixel [1.0]
716 method decomposition method: one of `lstsq` for basic least-squares or
717 `nnls` for non-negative least-squares [`lstsq`]
719 Returns:
720 ---------
721 dict of elements: weights maps (NY, NX) for all components used in the fit
722 """
723 method, scale = self._prep_decompose(scale, pixel_time, method)
724 ny, nx, nchan = map.shape
725 nchanx, ncomps = self.transfer_matrix.shape
726 nchanw = self.fit_window.shape[0]
727 if nchan != nchanx or nchan != nchanw:
728 raise ValueError("map data has wrong shape ", map.shape)
730 win = np.where(self.fit_window > 0)[0]
731 w0 = max(0, win[0]-100)
732 w1 = min(nchan-1, win[-1]+100)
734 xfer = self.transfer_matrix[w0:w1, :]
735 win = self.fit_window[w0:w1]
736 result = np.zeros((ny, nx, ncomps), dtype='float32')
738 def decomp_lstsq(i0, i1):
739 """very efficient lstsq"""
740 tmap = map[i0:i1, :, w0:w1].swapaxes(1, 2)
741 ny = tmap.shape[0]
742 win.shape = (win.shape[0], 1)
743 for iy in range(ny):
744 results = lstsq(xfer, win*tmap[iy])
745 for i in range(ncomps):
746 result[i0+iy,:,i] = results[0][i] * scale
748 def decomp_nnls(i0, i1):
749 """need to explicitly loop of nx as well as ny"""
750 tmap = map[i0:i1, :, w0:w1]
751 ny = tmap.shape[0]
752 for iy in range(ny):
753 for ix in range(nx):
754 results = nnls(xfer, win*tmap[iy,ix,:])
755 for i in range(ncomps):
756 result[i0+iy,ix,i] = results[0][i] * scale
758 decomp = decomp_lstsq
759 if method == nnls:
760 decomp = decomp_nnls
761 # if we're going to use up more than ~1Gb per lstsq, do it in chunks
762 if (ny*nx*(w1-w0) > 1e8):
763 nchunks = 1+int(1.e-8*ny*nx*(w1-w0))
764 ns = int(ny/nchunks)
765 for i in range(nchunks):
766 ilast = (i+1)*ns
767 if i == nchunks-1: ilast = ny
768 decomp(i*ns, ilast)
769 else:
770 decomp(0, ny)
771 return {name: result[:,:,i] for i, name in enumerate(self.eigenvalues.keys())}
773def xrf_model(xray_energy=None, energy_min=1500, energy_max=None, use_bgr=False, **kws):
774 """create an XRF Peak
776 Returns:
777 ---------
778 an XRF_Model instance
779 """
780 return XRF_Model(xray_energy=xray_energy, use_bgr=use_bgr,
781 energy_min=energy_min, energy_max=energy_max, **kws)
783def xrf_fitresult(save_file=None):
784 """create an XRF Fit Result, possibly restoring from saved file
786 Returns:
787 ---------
788 an XRFFitResult instance
789 """
791 out = XRFFitResult()
792 if save_file is not None:
793 out.load(save_file)
794 return out