Coverage for larch/xrf/xrf_model.py: 11%

494 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1from collections import namedtuple 

2import time 

3import json 

4import numpy as np 

5from numpy.linalg import lstsq 

6from scipy.optimize import nnls 

7 

8 

9from lmfit import Parameters, minimize, fit_report 

10 

11from xraydb import (material_mu, mu_elam, ck_probability, xray_edge, 

12 xray_edges, xray_lines, xray_line) 

13from xraydb.xray import XrayLine 

14 

15from .. import Group 

16from ..math import index_of, interp, interp1d, savitzky_golay, hypermet, erfc 

17from ..xafs import ftwindow 

18from ..utils import group2dict, json_dump, json_load, gformat, unixpath 

19 

20xrf_prediction = namedtuple("xrf_prediction", ("weights", "total")) 

21xrf_peak = namedtuple('xrf_peak', ('name', 'amplitude', 'center', 'step', 

22 'tail', 'sigmax', 'beta', 'gamma', 

23 'vary_center', 'vary_step', 'vary_tail', 

24 'vary_sigmax', 'vary_beta', 'vary_gamma')) 

25 

26predict_methods = {'lstsq': lstsq, 'nnls': nnls} 

27 

28# Note on units: energies are in keV, lengths in cm 

29 

30 

31# note on Fano Factors 

32# efano = (energy to create e-h pair) * FanoFactor 

33# material E-h excitation (eV) Fano Factor EFano (keV) 

34# Si 3.66 0.115 0.000 4209 

35# Ge 3.0 0.130 0.000 3900 

36FanoFactors = {'Si': 0.4209e-3, 'Ge': 0.3900e-3} 

37 

38XRAY_EDGES = ('M5', 'M4', 'M3', 'M2', 'M1', 'L3', 'L2', 'L1', 'K') 

39 

40class XRF_Material: 

41 def __init__(self, material='Si', thickness=0.050, density=None, 

42 thickness_units='mm'): 

43 self.material = material 

44 self.density = density 

45 self.thickness_units = thickness_units 

46 self.thickness = thickness 

47 self.mu_total = self.mu_photo = None 

48 

49 def calc_mu(self, energy): 

50 "calculate mu for energy in keV" 

51 # note material_mu works in eV! 

52 self.mu_total = material_mu(self.material, 1000*energy, 

53 density=self.density, 

54 kind='total') 

55 self.mu_photo = material_mu(self.material, 1000*energy, 

56 density=self.density, 

57 kind='photo') 

58 

59 def absorbance(self, energy, thickness=None, kind='total'): 

60 """calculate absorbance (fraction absorbed) 

61 

62 Arguments 

63 ---------- 

64 energy float or ndarray energy (keV) of X-ray 

65 thicknesss float material thickness (cm) 

66 

67 Returns 

68 ------- 

69 fraction of X-rays absorbed by material 

70 """ 

71 if thickness is None: 

72 thickness = self.thickness 

73 if self.mu_total is None: 

74 self.calc_mu(energy) 

75 mu = self.mu_total 

76 if kind == 'photo': 

77 mu = self.mu_photo 

78 

79 t = 0.1*thickness 

80 return (1.0 - np.exp(-t*mu)) 

81 

82 def transmission(self, energy, thickness=None, kind='total'): 

83 """calculate transmission (fraction transmitted through material) 

84 

85 Arguments 

86 --------- 

87 energy float or ndarray energy (keV) of X-ray 

88 thicknesss float material thickness (cm) 

89 

90 Returns 

91 ------- 

92 fraction of X-rays transmitted through material 

93 """ 

94 

95 if thickness is None: 

96 thickness = self.thickness 

97 if self.mu_total is None: 

98 self.calc_mu(energy) 

99 mu = self.mu_total 

100 if kind == 'photo': 

101 mu = self.mu_photo 

102 

103 t = 0.1*thickness 

104 return np.exp(-t*mu) 

105 

106 

107class XRF_Element: 

108 def __init__(self, symbol, xray_energy=10, energy_min=1.5): 

109 """get Xray emission lines for an element""" 

110 self.symbol = symbol 

111 

112 # note: 'xray_energy' and 'energy_min' are in keV, 

113 # most of this code works in eV! 

114 if xray_energy is None: xray_energy = 30.0 

115 if energy_min is None: energy_min = 1.5 

116 self.xray_energy = xray_energy 

117 self.energy_min = energy_min 

118 

119 en_xray_ev = xray_energy * 1000.0 

120 en_low_ev = energy_min * 1000.0 

121 self.mu = mu_elam(symbol, en_xray_ev, kind='photo') 

122 self.edges = [] 

123 self.fyields = {edge: 0.0 for edge in XRAY_EDGES} 

124 self.taus = {edge: 0.0 for edge in XRAY_EDGES} 

125 jumps = {} 

126 for ename, xedge in xray_edges(self.symbol).items(): 

127 if ename.lower() in ('k', 'l1', 'l2', 'l3', 'm5'): 

128 if (xedge.energy < en_xray_ev and xedge.energy > en_low_ev): 

129 self.edges.append(ename) 

130 self.fyields[ename] = xedge.fyield 

131 jumps[ename] = xedge.jump_ratio 

132 

133 # tau is sub-shell strength 

134 for ename in XRAY_EDGES: 

135 if ename in self.edges and ename in jumps: 

136 jump = max(1.000001, jumps[ename]) 

137 self.taus[ename] = jump - 1.0 

138 for key in self.taus: 

139 self.taus[key] = self.taus[key]/jump 

140 

141 # look up X-ray lines, keep track of very close lines 

142 # so that they can be consolidated 

143 # slightly confusing (and working with XrayLine energies in ev) 

144 self.lines = {} 

145 all_lines = {} 

146 en_max = 0 

147 for ename in self.edges: 

148 for key, xline in xray_lines(symbol, ename).items(): 

149 all_lines[key] = xline 

150 en_max = max(en_max, xline.energy) 

151 

152 if en_max < 100: 

153 en_max = en_xray_ev 

154 overlap_energy = en_max*0.001 # should be around 1 to 10 eV. 

155 

156 dups = {} 

157 for i, key in enumerate(all_lines): 

158 xline = all_lines[key] 

159 dup_found = False 

160 for okey, oline in self.lines.items(): 

161 if (okey != key and ((xline.energy-oline.energy) < overlap_energy) and 

162 xline.initial_level == oline.initial_level): 

163 dups[key] = okey 

164 if key not in dups: 

165 self.lines[key] = xline 

166 

167 for key, dest in dups.items(): 

168 if dest in self.lines: 

169 w1, w2 = all_lines[key].intensity, all_lines[dest].intensity 

170 e1, e2 = all_lines[key].energy, all_lines[dest].energy 

171 f1, f2 = all_lines[key].final_level, all_lines[dest].final_level 

172 wt = w1+w2 

173 en = (e1*w1 + e2*w2)/(wt) 

174 

175 newkey = "%s,%s" % (key, dest) 

176 if key[:2] == dest[:2]: 

177 newkey = "%s,%s" % (key, dest[2:]) 

178 

179 flevel = "%s,%s" % (f1, f2) 

180 if f1[0] == f2[0]: 

181 flevel = "%s,%s" % (f1, f2[1:]) 

182 

183 self.lines.pop(dest) 

184 self.lines[newkey] = XrayLine(energy=en, intensity=wt, 

185 initial_level=all_lines[key].initial_level, 

186 final_level=flevel) 

187 for k, d in dups.items(): 

188 if d == dest: 

189 dups[k] = newkey 

190 

191 

192class XRF_Model: 

193 """model for X-ray fluorescence data 

194 

195 consists of parameterized components for 

196 

197 incident beam (energy, angle_in, angle_out) 

198 matrix (list of material, thickness) 

199 filters (list of material, thickness) 

200 detector (material, thickness, step, tail, beta, gamma) 

201 """ 

202 def __init__(self, xray_energy=None, energy_min=1.5, energy_max=30., 

203 count_time=1, bgr=None, iter_callback=None, **kws): 

204 self.xray_energy = xray_energy 

205 self.energy_min = energy_min 

206 self.energy_max = energy_max 

207 self.count_time = count_time 

208 self.iter_callback = None 

209 self.fit_in_progress = False 

210 self.params = Parameters() 

211 self.elements = [] 

212 self.scatter = [] 

213 self.comps = {} 

214 self.eigenvalues = {} 

215 self.transfer_matrix = None 

216 self.matrix_layers = [] 

217 self.matrix = None 

218 self.matrix_atten = 1.0 

219 self.filters = [] 

220 self.det_atten = self.filt_atten = self.escape_amp = 0.0 

221 self.mu_lines = {} 

222 self.mu_energies = [] 

223 self.fit_iter = 0 

224 self.fit_toler = 1.e-4 

225 self.fit_step = 1.e-4 

226 self.max_nfev = 1000 

227 self.fit_log = False 

228 self.bgr = None 

229 self.use_pileup = False 

230 self.use_escape = False 

231 self.escape_scale = None 

232 self.script = '' 

233 self.mca = None 

234 if bgr is not None: 

235 self.add_background(bgr) 

236 

237 def set_detector(self, material='Si', thickness=0.40, noise=0.05, 

238 peak_step=1e-3, peak_tail=0.01, peak_gamma=0, 

239 peak_beta=0.5, cal_offset=0, cal_slope=10., cal_quad=0, 

240 vary_thickness=False, vary_noise=True, 

241 vary_peak_step=True, vary_peak_tail=True, 

242 vary_peak_gamma=False, vary_peak_beta=False, 

243 vary_cal_offset=True, vary_cal_slope=True, 

244 vary_cal_quad=False): 

245 """ 

246 set up detector material, calibration, and general settings for 

247 the hypermet functions for the fluorescence and scatter peaks 

248 """ 

249 self.detector = XRF_Material(material, thickness) 

250 matname = material.title() 

251 if matname not in FanoFactors: 

252 matname = 'Si' 

253 self.efano = FanoFactors[matname] 

254 self.params.add('det_thickness', value=thickness, vary=vary_thickness, min=0) 

255 self.params.add('det_noise', value=noise, vary=vary_noise, min=0) 

256 self.params.add('cal_offset', value=cal_offset, vary=vary_cal_offset, min=-500, max=500) 

257 self.params.add('cal_slope', value=cal_slope, vary=vary_cal_slope, min=0) 

258 self.params.add('cal_quad', value=cal_quad, vary=vary_cal_quad) 

259 self.params.add('peak_step', value=peak_step, vary=vary_peak_step, min=0, max=10) 

260 self.params.add('peak_tail', value=peak_tail, vary=vary_peak_tail, min=0, max=10) 

261 self.params.add('peak_beta', value=peak_beta, vary=vary_peak_beta, min=0) 

262 self.params.add('peak_gamma', value=peak_gamma, vary=vary_peak_gamma, min=0) 

263 

264 def add_scatter_peak(self, name='elastic', amplitude=1000, center=None, 

265 step=0.010, tail=0.5, sigmax=1.0, beta=0.5, 

266 vary_center=True, vary_step=True, vary_tail=True, 

267 vary_sigmax=True, vary_beta=False): 

268 """add Rayleigh (elastic) or Compton (inelastic) scattering peak 

269 """ 

270 if name not in self.scatter: 

271 self.scatter.append(xrf_peak(name, amplitude, center, step, tail, 

272 sigmax, beta, 0.0, vary_center, vary_step, 

273 vary_tail, vary_sigmax, vary_beta, False)) 

274 

275 if center is None: 

276 center = self.xray_energy 

277 

278 self.params.add('%s_amp' % name, value=amplitude, vary=True, min=0) 

279 self.params.add('%s_center' % name, value=center, vary=vary_center, 

280 min=center*0.5, max=center*1.25) 

281 self.params.add('%s_step' % name, value=step, vary=vary_step, min=0, max=10) 

282 self.params.add('%s_tail' % name, value=tail, vary=vary_tail, min=0, max=20) 

283 self.params.add('%s_beta' % name, value=beta, vary=vary_beta, min=0, max=20) 

284 self.params.add('%s_sigmax' % name, value=sigmax, vary=vary_sigmax, 

285 min=0, max=100) 

286 

287 def add_element(self, elem, amplitude=1.e6, vary_amplitude=True): 

288 """add Element to XRF model 

289 """ 

290 self.elements.append(XRF_Element(elem, xray_energy=self.xray_energy, 

291 energy_min=self.energy_min)) 

292 self.params.add('amp_%s' % elem.lower(), value=amplitude, 

293 vary=vary_amplitude, min=0) 

294 

295 def add_filter(self, material, thickness, density=None, vary_thickness=False): 

296 self.filters.append(XRF_Material(material=material, 

297 density=density, 

298 thickness=thickness)) 

299 self.params.add('filterlen_%s' % material, 

300 value=thickness, min=0, vary=vary_thickness) 

301 

302 def set_matrix(self, material, thickness, density=None): 

303 self.matrix = XRF_Material(material=material, density=density, 

304 thickness=thickness) 

305 self.matrix_atten = 1.0 

306 

307 def add_background(self, data, vary=True): 

308 self.bgr = data 

309 self.params.add('background_amp', value=1.0, min=0, vary=vary) 

310 

311 def add_escape(self, scale=1.0, vary=True): 

312 self.use_escape = True 

313 self.params.add('escape_amp', value=scale, min=0, vary=vary) 

314 

315 def add_pileup(self, scale=1.0, vary=True): 

316 self.use_pileup = True 

317 self.params.add('pileup_amp', value=scale, min=0, vary=vary) 

318 

319 def clear_background(self): 

320 self.bgr = None 

321 self.params.pop('background_amp') 

322 

323 def calc_matrix_attenuation(self, energy): 

324 """ 

325 calculate beam attenuation by a matrix built from layers 

326 note that matrix layers and composition cannot be variable 

327 so the calculation can be done once, ahead of time. 

328 """ 

329 atten = 1.0 

330 if self.matrix is not None: 

331 ixray_en = index_of(energy, self.xray_energy) 

332 # print("MATRIX ", ixray_en, self.matrix) 

333 # layer_trans = self.matrix.transmission(energy) # transmission through layer 

334 # incid_trans = layer_trans[ixray_en] # incident beam trans to lower layers 

335 # ncid_absor = 1.0 - incid_trans # incident beam absorption by layer 

336 # atten = layer_trans * incid_absor 

337 self.matrix_atten = atten 

338 

339 def calc_escape_scale(self, energy, thickness=None): 

340 """ 

341 calculate energy dependence of escape effect 

342 

343 X-rays penetrate a depth 1/mu(material, energy) and the 

344 detector fluorescence escapes from that depth as 

345 exp(-mu(material, KaEnergy)*thickness) 

346 with a fluorecence yield of the material 

347 

348 """ 

349 det = self.detector 

350 # note material_mu, xray_edge, xray_line work in eV! 

351 escape_energy_ev = xray_line(det.material, 'Ka').energy 

352 mu_emit = material_mu(det.material, escape_energy_ev) 

353 self.escape_energy = 0.001 * escape_energy_ev 

354 

355 mu_input = material_mu(det.material, 1000*energy) 

356 

357 edge = xray_edge(det.material, 'K') 

358 self.escape_scale = edge.fyield * np.exp(-mu_emit / (2*mu_input)) 

359 self.escape_scale[np.where(energy < 0.001*edge.energy)] = 0.0 

360 

361 def det_sigma(self, energy, noise=0): 

362 """ energy width of peak """ 

363 return np.sqrt(self.efano*energy + noise**2) 

364 

365 def calc_spectrum(self, energy, params=None): 

366 if params is None: 

367 params = self.params 

368 pars = params.valuesdict() 

369 self.comps = {} 

370 self.eigenvalues = {} 

371 

372 det_noise = pars['det_noise'] 

373 step = pars['peak_step'] 

374 tail = pars['peak_tail'] 

375 beta = pars['peak_beta'] 

376 gamma = pars['peak_gamma'] 

377 

378 # escape: calc only if needed 

379 if ((not self.fit_in_progress) or 

380 self.params['det_thickness'].vary or 

381 self.params['escape_amp'].vary): 

382 if self.escape_scale is None: 

383 self.calc_escape_scale(energy, thickness=pars['det_thickness']) 

384 self.escape_amp = pars.get('escape_amp', 0.0) * self.escape_scale 

385 if not self.use_escape: 

386 self.escape_amp = 0 

387 # detector attenuation: calc only if needed 

388 if (not self.fit_in_progress) or self.params['det_thickness'].vary: 

389 self.det_atten = self.detector.absorbance(energy, thickness=pars['det_thickness']) 

390 # filter attenuation: calc only if needed 

391 filt_pars = [self.params['filterlen_%s' % f.material] for f in self.filters] 

392 if (not self.fit_in_progress) or any([f.vary for f in filt_pars]): 

393 self.filt_atten = np.ones(len(energy)) 

394 for f in self.filters: 

395 thickness = pars.get('filterlen_%s' % f.material, None) 

396 if thickness is not None and int(thickness*1e6) > 1: 

397 fx = f.transmission(energy, thickness=thickness) 

398 self.filt_atten *= fx 

399 

400 self.atten = self.det_atten * self.filt_atten 

401 # matrix corrections #1: get X-ray line energies, only if needed 

402 if ((not self.fit_in_progress) or len(self.mu_energies)==0): 

403 self.mu_lines = {} 

404 self.mu_energies = [] 

405 for elem in self.elements: 

406 for key, line in elem.lines.items(): 

407 self.mu_lines[f'{elem.symbol:s}_{key:s}'] = line.energy 

408 self.mu_energies.append(line.energy) 

409 

410 self.mu_energies.append(1000*self.xray_energy) 

411 self.mu_energies = np.array(self.mu_energies) 

412 # print("Mu energies ", self.mu_energies, len(self.mu_energies)) 

413 # matrix 

414 # if self.matrix_atten is None: 

415 # self.calc_matrix_attenuation(energy) 

416 # atten *= self.matrix_atten 

417 

418 nlines = 0 

419 for elem in self.elements: 

420 comp = 0. * energy 

421 amp = pars.get('amp_%s' % elem.symbol.lower(), None) 

422 if amp is None: 

423 continue 

424 for key, line in elem.lines.items(): 

425 ilevel = line.initial_level 

426 ecen = 0.001*line.energy 

427 nlines += 1 

428 line_amp = (line.intensity * elem.mu * 

429 elem.fyields[ilevel] * elem.taus[ilevel]) 

430 sigma = self.det_sigma(ecen, det_noise) 

431 comp += hypermet(energy, amplitude=line_amp, center=ecen, 

432 sigma=sigma, step=step, tail=tail, 

433 beta=beta, gamma=gamma) 

434 comp *= amp * self.atten * self.count_time 

435 comp += self.escape_amp * interp1d(energy-self.escape_energy, comp, energy) 

436 comp[np.where(np.isnan(comp))] = 0.0 

437 self.comps[elem.symbol] = comp 

438 self.eigenvalues[elem.symbol] = amp 

439 

440 # scatter peaks for Rayleigh and Compton 

441 for peak in self.scatter: 

442 p = peak.name 

443 amp = pars.get('%s_amp' % p, None) 

444 if amp is None: 

445 continue 

446 ecen = pars['%s_center' % p] 

447 step = pars['%s_step' % p] 

448 tail = pars['%s_tail' % p] 

449 beta = pars['%s_beta' % p] 

450 sigma = pars['%s_sigmax' % p] 

451 sigma *= self.det_sigma(ecen, det_noise) 

452 comp = hypermet(energy, amplitude=1.0, center=ecen, 

453 sigma=sigma, step=step, tail=tail, beta=beta, 

454 gamma=gamma) 

455 comp *= amp * self.atten * self.count_time 

456 comp += self.escape_amp * interp1d(energy-self.escape_energy, comp, energy) 

457 comp[np.where(np.isnan(comp))] = 0.0 

458 self.comps[p] = comp 

459 self.eigenvalues[p] = amp 

460 if self.bgr is not None: 

461 bgr_amp = pars.get('background_amp', 0.0) 

462 self.comps['background'] = bgr_amp * self.bgr 

463 self.eigenvalues['background'] = bgr_amp 

464 # calculate total spectrum 

465 total = 0. * energy 

466 for comp in self.comps.values(): 

467 total += comp 

468 if self.use_pileup: 

469 pamp = pars.get('pileup_amp', 0.0) 

470 npts = len(energy) 

471 pileup = pamp*1.e-9*np.convolve(total, total*1.0, 'full')[:npts] 

472 self.comps['pileup'] = pileup 

473 self.eigenvalues['pileup'] = pamp 

474 total += pileup 

475 # remove tiny values so that log plots are usable 

476 floor = 1.e-10*max(total) 

477 total[np.where(total<floor)] = floor 

478 self.current_model = total 

479 return total 

480 

481 def __resid(self, params, data, index): 

482 pars = params.valuesdict() 

483 self.best_en = (pars['cal_offset'] + pars['cal_slope'] * index + 

484 pars['cal_quad'] * index**2) 

485 self.fit_iter += 1 

486 model = self.calc_spectrum(self.best_en, params=params) 

487 if callable(self.iter_callback): 

488 self.iter_callback(iter=self.fit_iter, pars=params) 

489 return ((data - model) * self.fit_weight)[self.imin:self.imax] 

490 

491 def set_fit_weight(self, energy, counts, emin, emax, ewid=0.050): 

492 """ 

493 set weighting factor to smoothed square-root of data 

494 """ 

495 ewin = ftwindow(energy, xmin=emin, xmax=emax, dx=ewid, window='hanning') 

496 self.fit_window = ewin 

497 fit_wt = 0.1 + savitzky_golay( (counts+1.0)**(2/3.0), 15, 1) 

498 self.fit_weight = 1.0/fit_wt 

499 

500 def fit_spectrum(self, mca, energy_min=None, energy_max=None, 

501 fit_toler=None, fit_step=None, max_nfev=None): 

502 if fit_toler is not None: 

503 self.fit_toler = max(1.e-7, min(0.001, fit_toler)) 

504 if fit_step is not None: 

505 self.fit_step = max(1.e-7, min(0.1, fit_step)) 

506 if max_nfev is not None: 

507 self.max_nfev = max(200, min(12000, max_nfev)) 

508 

509 self.mca = mca 

510 work_energy = 1.0*mca.energy 

511 work_counts = 1.0*mca.counts 

512 floor = 1.e-10*np.percentile(work_counts, [99])[0] 

513 work_counts[np.where(work_counts<floor)] = floor 

514 

515 if max(work_energy) > 250.0: # if input energies are in eV 

516 work_energy /= 1000.0 

517 

518 imin, imax = 0, len(work_counts) 

519 if energy_min is None: 

520 energy_min = self.energy_min 

521 if energy_min is not None: 

522 imin = index_of(work_energy, energy_min) 

523 if energy_max is None: 

524 energy_max = self.energy_max 

525 if energy_max is not None: 

526 imax = index_of(work_energy, energy_max) 

527 

528 self.imin = max(0, imin-5) 

529 self.imax = min(len(work_counts), imax+5) 

530 self.npts = (self.imax - self.imin) 

531 self.set_fit_weight(work_energy, work_counts, energy_min, energy_max) 

532 self.fit_iter = 0 

533 

534 # reset attenuation calcs for matrix, detector, filters 

535 self.matrix_atten = 1.0 

536 self.escape_scale = None 

537 self.detector.mu_total = None 

538 for f in self.filters: 

539 f.mu_total = None 

540 

541 self.fit_in_progress = False 

542 self.init_fit = self.calc_spectrum(work_energy, params=self.params) 

543 index = np.arange(len(work_counts)) 

544 userkws = dict(data=work_counts, index=index) 

545 

546 tol = self.fit_toler 

547 self.fit_in_progress = True 

548 self.result = minimize(self.__resid, self.params, kws=userkws, 

549 method='leastsq', maxfev=self.max_nfev, 

550 scale_covar=True, epsfcn=self.fit_step, 

551 gtol=tol, ftol=tol, xtol=tol) 

552 

553 self.fit_report = fit_report(self.result, min_correl=0.5) 

554 pars = self.result.params 

555 self.fit_in_progress = False 

556 

557 self.best_en = (pars['cal_offset'] + pars['cal_slope'] * index + 

558 pars['cal_quad'] * index**2) 

559 self.fit_iter += 1 

560 self.best_fit = self.calc_spectrum(work_energy, params=self.result.params) 

561 

562 # calculate transfer matrix for linear analysis using this model 

563 tmat= [] 

564 for key, val in self.comps.items(): 

565 arr = val / self.eigenvalues[key] 

566 floor = 1.e-12*max(arr) 

567 arr[np.where(arr<floor)] = 0.0 

568 tmat.append(arr) 

569 self.transfer_matrix = np.array(tmat).transpose() 

570 return self.get_fitresult() 

571 

572 def get_fitresult(self, label='XRF fit result', script='# no script supplied'): 

573 """a simple compilation of fit settings results 

574 to be able to easily save and inspect""" 

575 out = XRFFitResult(label=label, script=script, mca=self.mca) 

576 

577 for attr in ('filename', 'label'): 

578 setattr(out, 'mca' + attr, getattr(self.mca, attr, 'unknown')) 

579 

580 for attr in ('params', 'var_names', 'chisqr', 'redchi', 'nvarys', 

581 'nfev', 'ndata', 'aic', 'bic', 'aborted', 'covar', 'ier', 

582 'message', 'method', 'nfree', 'init_values', 'success', 

583 'residual', 'errorbars', 'lmdif_message', 'nfree'): 

584 setattr(out, attr, getattr(self.result, attr, None)) 

585 

586 for attr in ('atten', 'best_en', 'best_fit', 'bgr', 'comps', 'count_time', 

587 'eigenvalues', 'energy_max', 'energy_min', 'fit_iter', 'fit_log', 

588 'fit_report', 'fit_toler', 'fit_weight', 'fit_window', 'init_fit', 

589 'scatter', 'script', 'transfer_matrix', 'xray_energy'): 

590 setattr(out, attr, getattr(self, attr, None)) 

591 

592 elem_attrs = ('edges', 'fyields', 'lines', 'mu', 'symbol', 'xray_energy') 

593 out.elements = [] 

594 for el in self.elements: 

595 out.elements.append({attr: getattr(el, attr) for attr in elem_attrs}) 

596 

597 mater_attrs = ('material', 'mu_photo', 'mu_total', 'thickness') 

598 out.detector = {attr: getattr(self.detector, attr) for attr in mater_attrs} 

599 out.matrix = None 

600 if self.matrix is not None: 

601 out.matrix = {attr: getattr(self.matrix, attr) for attr in mater_attrs} 

602 out.filters = [] 

603 for ft in self.filters: 

604 out.filters.append({attr: getattr(ft, attr) for attr in mater_attrs}) 

605 return out 

606 

607class XRFFitResult(Group): 

608 """Result of XRF Fit""" 

609 def __init__(self, label='xrf fit', filename=None, script='# No script', 

610 mca=None, **kws): 

611 kwargs = dict(label=label, filename=filename, script=script, mca=mca) 

612 kwargs.update(kws) 

613 Group.__init__(self, **kwargs) 

614 

615 def __repr__(self): 

616 return 'XRFFitResult(%r, filename=%r)' % (self.label, self.filename) 

617 

618 def save(self, filename): 

619 """save XRFFitResult result in a manner that can be loaded later""" 

620 tmp = {} 

621 for key, val in group2dict(self).items(): 

622 if key in ('__name__', '__repr__', 'save', 'load', 'export', 

623 '_prep_decompose', 'decompose_mca', 'decompose_map'): 

624 continue 

625 if key == 'mca': 

626 val = val.dump_mcafile() 

627 tmp[key] = val 

628 json_dump(tmp, unixpath(filename)) 

629 

630 def load(self, filename): 

631 from ..io import GSEMCA_File 

632 for key, val in json_load(filename).items(): 

633 if key == 'mca': 

634 val = GSEMCA_File(text=val) 

635 setattr(self, key, val) 

636 

637 def export(self, filename): 

638 """save result to text file""" 

639 buff = ['# XRF Fit %s: %s' % (self.mca.label, self.label), 

640 '#### Fit Script:'] 

641 for a in self.script.split('\n'): 

642 buff.append('# %s' % a) 

643 buff.append('#'*60) 

644 buff.append('#### Fit Report:') 

645 for a in self.fit_report.split('\n'): 

646 buff.append('# %s' % a) 

647 buff.append('#'*60) 

648 labels = ['energy', 'counts', 'best_fit', 'best_energy', 'fit_window', 

649 'fit_weight', 'attenuation'] 

650 labels.extend(list(self.comps.keys())) 

651 buff.append('# %s' % (' '.join(labels))) 

652 

653 npts = len(self.mca.energy) 

654 for i in range(npts): 

655 dline = [gformat(self.mca.energy[i]), 

656 gformat(self.mca.counts[i]), 

657 gformat(self.best_fit[i]), 

658 gformat(self.best_en[i]), 

659 gformat(self.fit_window[i]), 

660 gformat(self.fit_weight[i]), 

661 gformat(self.atten[i])] 

662 for c in self.comps.values(): 

663 dline.append(gformat(c[i])) 

664 buff.append(' '.join(dline)) 

665 buff.append('\n') 

666 with open(filename, 'w') as fh: 

667 fh.write('\n'.join(buff)) 

668 

669 def _prep_decompose(self, scale, count_time, method): 

670 if self.transfer_matrix is None: 

671 raise ValueError("XRFFitResult incomplete: need to fit a spectrum first") 

672 fit_method = predict_methods.get(method, lstsq) 

673 if count_time is None: 

674 count_time = self.count_time 

675 scale *= self.count_time / count_time 

676 return fit_method, scale 

677 

678 def decompose_spectrum(self, counts, scale=1.0, count_time=None, method='lstsq'): 

679 """ 

680 Apply XRFFitResult to another spectrum, decomposing it into elemenetal weights 

681 

682 Arguments: 

683 ---------- 

684 counts MCA counts for spectrum, on the same energy grid as the fitted data. 

685 scale scale factor to apply to output weights [1] 

686 count_time count time in seconds [None - use count_time of fitted spectrum] 

687 method decomposition method: one of `lstsq` for basic least-squares or 

688 `nnls` for non-negative least-squares [`lstsq`] 

689 

690 Returns: 

691 --------- 

692 namedtuple of XRF prediction with elements: 

693 weights dict of element: weights for all components used in the fit 

694 total predicted total spectrum 

695 """ 

696 method, scale = self._prep_decompose(scale, count_time, method) 

697 results = method(self.transfer_matrix, counts*self.fit_window) 

698 weights = {} 

699 total = 0.0*counts 

700 for i, name in enumerate(self.eigenvalues.keys()): 

701 weights[name] = results[0][i] * scale 

702 total += results[0][i] * self.transfer_matrix[:, i] 

703 

704 return xrf_prediction(weights, total) 

705 

706 def decompose_map(self, map, scale=1.0, pixel_time=1.0, method='lstsq', 

707 nworkers=4): 

708 """ 

709 Apply XRFFitResult to an XRF Map, decomposing it into maps of elemental weights 

710 

711 Arguments: 

712 ---------- 

713 map XRF map array: [NY, NX, NMCA], on the same energy grid as the fitted data. 

714 scale scale factor to apply to output weights [1] 

715 pixel_time count time in seconds for each pixel [1.0] 

716 method decomposition method: one of `lstsq` for basic least-squares or 

717 `nnls` for non-negative least-squares [`lstsq`] 

718 

719 Returns: 

720 --------- 

721 dict of elements: weights maps (NY, NX) for all components used in the fit 

722 """ 

723 method, scale = self._prep_decompose(scale, pixel_time, method) 

724 ny, nx, nchan = map.shape 

725 nchanx, ncomps = self.transfer_matrix.shape 

726 nchanw = self.fit_window.shape[0] 

727 if nchan != nchanx or nchan != nchanw: 

728 raise ValueError("map data has wrong shape ", map.shape) 

729 

730 win = np.where(self.fit_window > 0)[0] 

731 w0 = max(0, win[0]-100) 

732 w1 = min(nchan-1, win[-1]+100) 

733 

734 xfer = self.transfer_matrix[w0:w1, :] 

735 win = self.fit_window[w0:w1] 

736 result = np.zeros((ny, nx, ncomps), dtype='float32') 

737 

738 def decomp_lstsq(i0, i1): 

739 """very efficient lstsq""" 

740 tmap = map[i0:i1, :, w0:w1].swapaxes(1, 2) 

741 ny = tmap.shape[0] 

742 win.shape = (win.shape[0], 1) 

743 for iy in range(ny): 

744 results = lstsq(xfer, win*tmap[iy]) 

745 for i in range(ncomps): 

746 result[i0+iy,:,i] = results[0][i] * scale 

747 

748 def decomp_nnls(i0, i1): 

749 """need to explicitly loop of nx as well as ny""" 

750 tmap = map[i0:i1, :, w0:w1] 

751 ny = tmap.shape[0] 

752 for iy in range(ny): 

753 for ix in range(nx): 

754 results = nnls(xfer, win*tmap[iy,ix,:]) 

755 for i in range(ncomps): 

756 result[i0+iy,ix,i] = results[0][i] * scale 

757 

758 decomp = decomp_lstsq 

759 if method == nnls: 

760 decomp = decomp_nnls 

761 # if we're going to use up more than ~1Gb per lstsq, do it in chunks 

762 if (ny*nx*(w1-w0) > 1e8): 

763 nchunks = 1+int(1.e-8*ny*nx*(w1-w0)) 

764 ns = int(ny/nchunks) 

765 for i in range(nchunks): 

766 ilast = (i+1)*ns 

767 if i == nchunks-1: ilast = ny 

768 decomp(i*ns, ilast) 

769 else: 

770 decomp(0, ny) 

771 return {name: result[:,:,i] for i, name in enumerate(self.eigenvalues.keys())} 

772 

773def xrf_model(xray_energy=None, energy_min=1500, energy_max=None, use_bgr=False, **kws): 

774 """create an XRF Peak 

775 

776 Returns: 

777 --------- 

778 an XRF_Model instance 

779 """ 

780 return XRF_Model(xray_energy=xray_energy, use_bgr=use_bgr, 

781 energy_min=energy_min, energy_max=energy_max, **kws) 

782 

783def xrf_fitresult(save_file=None): 

784 """create an XRF Fit Result, possibly restoring from saved file 

785 

786 Returns: 

787 --------- 

788 an XRFFitResult instance 

789 """ 

790 

791 out = XRFFitResult() 

792 if save_file is not None: 

793 out.load(save_file) 

794 return out