Coverage for larch/io/gse_mcafile.py: 12%

277 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1#!/usr/bin/python 

2 

3import os 

4import copy 

5import numpy as np 

6from scipy.interpolate import UnivariateSpline 

7 

8from larch import Group 

9from larch.xrf import MCA, ROI 

10 

11def str2floats(s, delim='&'): 

12 s = s.replace('&', ' ') 

13 return [float(i) for i in s.split()] 

14 

15def str2ints(s, delim='&'): 

16 return [int(i) for i in str2floats(s, delim=delim)] 

17 

18def str2str(s, delim='&'): 

19 s = s.strip() 

20 return [i.strip() for i in s.split(delim) if len(i) > 0] 

21 

22class GSEMCA_Header(object): 

23 version = 'unknown' 

24 date = '' 

25 elements = 1 

26 channels = 2048 

27 rois = [] 

28 live_time = [] 

29 real_time = [] 

30 cal_slope = [] 

31 cal_offset = [] 

32 cal_quad = [] 

33 

34class GSEMCA_File(Group): 

35 """ 

36 Read GSECARS style MCA / Multi-element MCA files 

37 """ 

38 def __init__(self, filename=None, text=None, bad=None, **kws): 

39 kwargs = {'name': 'GSE MCA File: %s' % filename} 

40 kwargs.update(kws) 

41 Group.__init__(self, **kwargs) 

42 self.mcas = [] 

43 self.__mca0 = None 

44 self.bad = bad 

45 if bad is None: 

46 self.bad = [] 

47 

48 self.filename = filename 

49 if filename: 

50 self.read(filename=filename) 

51 elif text is not None: 

52 self.readtext(text) 

53 

54 def __get_mca0(self, chan_min=2, min_counts=2): 

55 """ find first good detector for alignment 

56 'good' is defined as at least min_counts counts 

57 above channel chan_min 

58 """ 

59 if self.__mca0 is None: 

60 for imca, mca in enumerate(self.mcas): 

61 if mca.counts[chan_min:].sum() > min_counts: 

62 self.__mca0 = mca 

63 self.offset = mca.offset 

64 self.slope = mca.slope 

65 self.quad = mca.quad 

66 break 

67 elif imca not in self.bad: 

68 self.bad.append(imca) 

69 if self.__mca0 is None: 

70 self.__mca0 = mca = self.mcas[0] 

71 self.offset = mca.offset 

72 self.slope = mca.slope 

73 self.quad = mca.quad 

74 return self.__mca0 

75 

76 def get_energy(self, imca=None): 

77 "get energy, optionally selecting which mca to use" 

78 if imca is not None: 

79 mca = self.mcas[imca] 

80 else: 

81 mca = self.__get_mca0() 

82 return mca.get_energy() 

83 

84 def get_counts(self, dt_correct=True, align=True): 

85 """ get summed MCA spectra, 

86 

87 Options: 

88 -------- 

89 align align spectra in energy before summing (True). 

90 """ 

91 mca0 = self.__get_mca0() 

92 en = mca0.get_energy() 

93 dat = 0 

94 for mca in self.mcas: 

95 mdat = mca.counts 

96 if align and mca != mca0: 

97 _en = mca.get_energy() 

98 mdat = UnivariateSpline(_en, mdat, s=0)(en) 

99 if dt_correct: 

100 mdat = mdat * mca.dt_factor 

101 dat = dat + mdat 

102 return dat.astype(np.int32) 

103 

104 def predict_pileup(self, scale=None): 

105 """predict pileup for an MCA spectra from its auto-correlation. 

106 

107 Options: 

108 -------- 

109 scale factor to apply to convolution [found from data] 

110 

111 the output `pileup` will be the average of the `pileup` for each MCA 

112 

113 """ 

114 pileup = self.mcas[0].counts * 0.0 

115 for m in self.mcas: 

116 m.predict_pileup(scale=scale) 

117 pileup += m.pileup 

118 self.pileup = pileup / len(self.mcas) 

119 

120 def predict_escape(self, det='Si', scale=1): 

121 """predict detector escape, save to `escape` attribute 

122 

123 Options: 

124 -------- 

125 det detector material ['Si'] 

126 scale scale factor [1] 

127 

128 Outputs 'escape' attribute will contain the average `escape` for each MCA 

129 """ 

130 escape = self.mcas[0].counts * 0.0 

131 for m in self.mcas: 

132 m.predict_escape(det=det, scale=scale) 

133 escape += m.escape 

134 self.escape = escape / len(self.mcas) 

135 

136 def read(self, filename=None, bad=None): 

137 """read GSE MCA file""" 

138 self.filename = filename 

139 with open(filename, 'r') as fh: 

140 return self.readtext(fh.read(), bad=bad) 

141 

142 def readtext(self, text, bad=None): 

143 """read text of GSE MCA file""" 

144 lines = text.split('\n') 

145 if bad is None: 

146 bad = self.bad 

147 nrow = 0 

148 data_mode = 'HEADER' 

149 counts = [] 

150 rois = [] 

151 environ = [] 

152 self.incident_energy = None 

153 head = self.header = GSEMCA_Header() 

154 for l in lines: 

155 l = l.strip() 

156 if len(l) < 1: continue 

157 if data_mode == 'DATA': 

158 counts.append(str2ints(l)) 

159 else: 

160 pos = l.find(' ') 

161 if (pos == -1): pos = len(l) 

162 tag = l[0:pos].strip().lower() 

163 if tag.endswith(':'): 

164 tag = tag[:-1] 

165 val = l[pos:len(l)].strip() 

166 if tag in ('version', 'date'): 

167 setattr(head, tag, val) 

168 elif tag in ('elements', 'channels'): 

169 setattr(head, tag, int(val)) 

170 elif tag in ('real_time', 'live_time', 'cal_offset', 

171 'cal_slope', 'cal_quad'): 

172 setattr(head, tag, str2floats(val)) 

173 elif tag == 'rois': 

174 head.rois = str2ints(val) 

175 self.nrois = max(head.rois) 

176 elif tag == 'data': 

177 data_mode = 'DATA' 

178 elif tag == 'environment': 

179 addr, val = val.split('="') 

180 val, desc = val.split('"') 

181 val = val.strip() 

182 desc = desc.strip() 

183 if desc.startswith('(') and desc.endswith(')'): 

184 desc = desc[1:-1] 

185 environ.append((desc, val, addr)) 

186 if 'mono' in desc.lower() and 'energy' in desc.lower(): 

187 try: 

188 val = float(val) 

189 except ValueError: 

190 pass 

191 self.incident_energy = val 

192 elif tag[0:4] == 'roi_': 

193 iroi, item = tag[4:].split('_') 

194 iroi = int(iroi) 

195 if iroi >= len(rois): 

196 for ir in range(1 + iroi - len(rois)): 

197 rois.append({'label':[], 'right':[], 'left':[]}) 

198 if item == "label": 

199 rois[iroi]['label'] = str2str(val, delim='&') 

200 elif item == "left": 

201 rois[iroi]['left'] = str2ints(val) 

202 elif item == "right": 

203 rois[iroi]['right'] = str2ints(val) 

204 else: 

205 pass # print(" Warning: " , tag, " is not supported here!") 

206 

207 # 

208 counts = np.array(counts) 

209 ## Data has been read, now store in MCA objects 

210 sum_mca = None 

211 

212 for tag in ('real_time', 'live_time', 'cal_offset', 

213 'cal_slope', 'cal_quad'): 

214 val = getattr(head, tag) 

215 # print( ' Attr ', tag, val) 

216 if len(val) == 1 and head.elements > 1: 

217 val = [val[0]]*head.elements 

218 setattr(head, tag, val) 

219 for imca in range(head.elements): 

220 thismca = MCA(name='mca%i' % (imca+1), 

221 nchans=head.channels, 

222 counts=counts[:,imca], 

223 start_time=head.date, 

224 offset=head.cal_offset[imca], 

225 slope=head.cal_slope[imca], 

226 quad=head.cal_quad[imca], 

227 real_time=head.real_time[imca], 

228 live_time=head.live_time[imca]) 

229 

230 for desc, val, addr in environ: 

231 thismca.add_environ(desc=desc, val=val, addr=addr) 

232 

233 for roi in rois: 

234 left = roi['left'][imca] 

235 right = roi['right'][imca] 

236 label = roi['label'][imca] 

237 if right > 1 and len(label) > 1: 

238 thismca.add_roi(name=label, left=left, right=right, 

239 sort=False, counts=counts[:,imca]) 

240 thismca.rois.sort() 

241 self.mcas.append(thismca) 

242 

243 mca0 = self.__get_mca0() 

244 self.counts = self.get_counts() 

245 self.raw = self.get_counts(dt_correct=False) 

246 self.name = 'mcasum' 

247 self.energy = mca0.energy[:]*1.0 

248 self.environ = mca0.environ 

249 self.real_time = mca0.real_time 

250 self.live_time = mca0.live_time 

251 self.offset = mca0.offset 

252 self.slope = mca0.slope 

253 self.quad = mca0.quad 

254 self.rois = [] 

255 for roi in mca0.rois: 

256 self.add_roi(name=roi.name, left=roi.left, 

257 right=roi.right, sort=False, 

258 counts=counts, to_mcas=False) 

259 self.rois.sort() 

260 return 

261 

262 def add_roi(self, name='', left=0, right=0, bgr_width=3, 

263 counts=None, sort=True, to_mcas=True): 

264 """add an ROI to the sum spectra""" 

265 name = name.strip() 

266 # print('GSEMCA: Add ROI ', name, left, right) 

267 roi = ROI(name=name, left=left, right=right, 

268 bgr_width=bgr_width, counts=counts) 

269 rnames = [r.name.lower() for r in self.rois] 

270 if name.lower() in rnames: 

271 iroi = rnames.index(name.lower()) 

272 self.rois[iroi] = roi 

273 else: 

274 self.rois.append(roi) 

275 if sort: 

276 self.rois.sort() 

277 if to_mcas: 

278 mca0 = self.__get_mca0() 

279 slo0 = mca0.slope 

280 off0 = mca0.offset 

281 mca0.add_roi(name=name, left=left, right=right, 

282 bgr_width=bgr_width) 

283 for mca in self.mcas: 

284 if mca != mca0: 

285 xleft = int(0.5 + ((off0 + left*slo0) - mca.offset)/mca.slope) 

286 xright = int(0.5 + ((off0 + right*slo0) - mca.offset)/mca.slope) 

287 mca.add_roi(name=name, left=xleft, right=xright, 

288 bgr_width=bgr_width) 

289 

290 def save_mcafile(self, filename): 

291 """ 

292 write multi-element MCA file 

293 Parameters: 

294 ----------- 

295 * filename: output file name 

296 """ 

297 with open(filename, 'w') as fh: 

298 fh.write(self.dump_mcafile()) 

299 

300 def dump_mcafile(self): 

301 """return text of MCA file, not writing to disk, as for dumping""" 

302 nchans = len(self.counts) 

303 ndet = len(self.mcas) 

304 

305 # formatted count times and calibration 

306 rtimes = ["%f" % m.real_time for m in self.mcas] 

307 ltimes = ["%f" % m.live_time for m in self.mcas] 

308 offsets = ["%e" % m.offset for m in self.mcas] 

309 slopes = ["%e" % m.slope for m in self.mcas] 

310 quads = ["%e" % m.quad for m in self.mcas] 

311 

312 b = ['VERSION: 3.1'] 

313 b.append('ELEMENTS: %i' % ndet) 

314 b.append('DATE: %s' % self.mcas[0].start_time) 

315 b.append('CHANNELS: %i' % nchans) 

316 b.append('REAL_TIME: %s' % ' '.join(rtimes)) 

317 b.append('LIVE_TIME: %s' % ' '.join(ltimes)) 

318 b.append('CAL_OFFSET: %s' % ' '.join(offsets)) 

319 b.append('CAL_SLOPE: %s' % ' '.join(slopes)) 

320 b.append('CAL_QUAD: %s' % ' '.join(quads)) 

321 

322 # Write ROIS in channel units 

323 nrois = ["%i" % len(m.rois) for m in self.mcas] 

324 rois = [m.rois for m in self.mcas] 

325 b.append('ROIS: %s' % ' '.join(nrois)) 

326 

327 # don't assume number of ROIS is same for all elements 

328 nrois = max([len(r) for r in rois]) 

329 # print('NROIS ' , nrois, [len(r) for r in rois]) 

330 for ir, r in enumerate(rois): 

331 if len(r) < nrois: 

332 for i in range(nrois - len(r)): 

333 r.append(ROI(name='', left=0, right=0)) 

334 # print( 'NROIS ' , nrois, [len(r) for r in rois]) 

335 for i in range(len(rois[0])): 

336 names = ' & '.join([r[i].name for r in rois]) 

337 left = ' '.join(['%i' % r[i].left for r in rois]) 

338 right = ' '.join(['%i' % r[i].right for r in rois]) 

339 b.append('ROI_%i_LEFT: %s' % (i, left)) 

340 b.append('ROI_%i_RIGHT: %s' % (i, right)) 

341 b.append('ROI_%i_LABEL: %s &' % (i, names)) 

342 

343 # environment 

344 for e in self.environ: 

345 b.append('ENVIRONMENT: %s="%s" (%s)' % (e.addr, e.val, e.desc)) 

346 # data 

347 b.append('DATA: ') 

348 for i in range(nchans): 

349 d = ' '.join(["%i" % m.counts[i] for m in self.mcas]) 

350 b.append(" %s" % d) 

351 b.append('') 

352 return '\n'.join(b) 

353 

354 def save_ascii(self, filename): 

355 """ 

356 write multi-element MCA file to XDI-style ASCII file 

357 Parameters: 

358 ----------- 

359 * filename: output file name 

360 """ 

361 nchans = len(self.counts) 

362 ndet = len(self.mcas) 

363 

364 

365 mca0 = self.mcas[0] 

366 buff = ['# XDI/1.0 GSE/1.0', 

367 '# Collection.date: %s' % mca0.start_time, 

368 '# Collection.n_detectors: %i' % ndet, 

369 '# Collection.n_channels: %i' % nchans, 

370 '# Collection.real_time: %i' % mca0.real_time, 

371 '# Collection.live_time: %s' % mca0.live_time, 

372 '# Calibration.offset: %s' % mca0.offset, 

373 '# Calibration.slope: %s' % mca0.slope, 

374 '# Calibration.quad: %s' % mca0.quad, 

375 '# Column.1: energy keV'] 

376 

377 label = '# energy ' 

378 for i in range(ndet): 

379 buff.append('# Column.%i: MCA%i counts' % (i+2, i+1)) 

380 label = '%s MCA%i ' % (label, i+1) 

381 

382 froi = '# ROI.%i: %s [%i:%i]' 

383 fenv = '# ENV.%s: %s [%s]' 

384 for i, roi in enumerate(mca0.rois): 

385 buff.append(froi % (i, roi.name, roi.left, roi.right)) 

386 

387 

388 for e in self.environ: 

389 desc = e.desc.replace(' ', '_') 

390 buff.append(fenv % (desc, e.val, e.addr)) 

391 buff.append('#--------------------') 

392 buff.append(label) 

393 

394 # data 

395 for i in range(nchans): 

396 d = ['%9.3f' % self.energy[i]] 

397 d.extend(['%11i' % m.counts[i] for m in self.mcas]) 

398 buff.append(' %s' % ' '.join(d)) 

399 buff.append('') 

400 

401 fp = open(filename, 'w') 

402 fp.write('\n'.join(buff)) 

403 fp.close() 

404 

405 

406def gsemca_group(filename=None, text=None, **kws): 

407 """read GSECARS MCA file to larch group""" 

408 return GSEMCA_File(filename=filename, text=text)