Coverage for larch/io/gse_xdiscan.py: 6%

277 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1#!/usr/bin/env python 

2 

3import os 

4import sys 

5import copy 

6import os 

7import time 

8import gc 

9 

10import numpy as np 

11from .. import Group 

12from ..utils import read_textfile 

13from ..utils.strutils import bytes2str 

14 

15from . import XDIFile, XDIFileException 

16 

17from .xsp3_hdf5 import XSPRESS3_TAU, estimate_icr 

18 

19def read_gsexdi(fname, nmca=128, bad=None, **kws): 

20 """Read GSE XDI Scan Data to larch group, 

21 summing ROI data for MCAs and apply deadtime corrections 

22 """ 

23 

24 MAX_MCAS = nmca 

25 group = Group() 

26 group.__name__ ='GSE XDI Data file %s' % fname 

27 xdi = XDIFile(str(fname)) 

28 

29 group.path = fname 

30 path, suffix = os.path.split(fname) 

31 group.filename = suffix 

32 group.npts = xdi.npts 

33 group.bad_channels = bad 

34 for family in ('scan', 'beamline', 'mono', 'facility'): 

35 for key, val in xdi.attrs.get(family, {}).items(): 

36 if '||' in val: 

37 val, addr = val.split('||') 

38 try: 

39 val = float(val) 

40 except: 

41 pass 

42 setattr(group, "%s_%s" % (family, key), val) 

43 

44 group._xdi = Group() 

45 

46 for thing in ('array_addrs', 'array_labels', 'array_units', 'attrs', 

47 'comments', 'status', 'user_labels', 'xdi_libversion', 

48 'xdi_version', 'xdi_pyversion'): 

49 

50 val = copy.deepcopy(getattr(xdi, thing, None)) 

51 if isinstance(val, bytes): 

52 val = val.decode('utf-8') 

53 setattr(group._xdi, thing, val) 

54 

55 scanparams = xdi.attrs.get('scanparameters', None) 

56 if scanparams is not None: 

57 scan_e0 = scanparams.get('e0', None) 

58 if scan_e0 is not None: 

59 group.scan_e0 = float(scan_e0) 

60 scan_elem = scanparams.get('element', None) 

61 if scan_elem is not None: 

62 group.element = scan_elem 

63 scan_edge = scanparams.get('edge', None) 

64 if scan_edge is not None: 

65 group.edge = scan_edge 

66 scan_type = scanparams.get('scantype', None) 

67 if scan_type is not None: 

68 group.scan_type = scan_type 

69 

70 ocrs, icrs = [], [] 

71 ctime = None 

72 for attrname in dir(xdi): 

73 if attrname.lower() == 'counttime': 

74 ctime = getattr(xdi, attrname) 

75 if ctime is None: 

76 try: 

77 ctime = xdi.TSCALER / 5.e7 

78 except AttributeError: 

79 ctime = 1.0 

80 

81 is_old_xsp3 = any(['13QX4' in a[1] for a in xdi.attrs['column'].items()]) 

82 

83 dtc_mode = 'icr/ocr' 

84 for i in range(MAX_MCAS): 

85 mca = "mca%i" % (i+1) 

86 ocr = getattr(xdi, 'OutputCounts_%s' % mca, None) 

87 clock = getattr(xdi, 'Clock_%s' % mca, None) 

88 icr = getattr(xdi, 'InputCounts_%s' % mca, None) 

89 dtfact = getattr(xdi, 'DTFactor_%s' % mca, None) 

90 resets = getattr(xdi, 'ResetTicks_%s' % mca, None) 

91 allevt = getattr(xdi, 'AllEvent_%s' % mca, None) 

92 if (ocr is None and icr is None and clock is None and 

93 dtfact is None and resets is None): 

94 nmca = i 

95 break 

96 if ocr is None: 

97 ocr = ctime 

98 ocr = ocr/ctime 

99 if icr is not None: 

100 icr = icr/ctime 

101 

102 # Get ICR from one of several alternatives: 

103 # 1. InputCounts_mca was given 

104 # 2. DTFactor_mca was given 

105 # 3. ResetTicks_mca and AllEvets_mca were given 

106 # 4. Use "known values" for tau 

107 if icr is None: 

108 if dtfact is not None: 

109 icr = ocr * dtfact 

110 dtc_mode = 'dtfactor' 

111 elif (clock is not None and 

112 resets is not None and 

113 allevt is not None): 

114 dtfact = clock/(clock - (6*allevt + resets)) 

115 icr = ocr * dtfact 

116 dtc_mode = 'resets' 

117 # finally estimate from measured values of tau: 

118 if icr is None: 

119 icr = 1.0*ocr 

120 if is_old_xsp3: 

121 icr = estimate_icr(ocr*1.00, XSPRESS3_TAU, niter=7) 

122 ocrs.append(ocr) 

123 icrs.append(icr) 

124 

125 group.dtc_mode = dtc_mode 

126 labels = [] 

127 sums = {} 

128 for i, arrname in enumerate(xdi.array_labels): 

129 dat = getattr(xdi, arrname) 

130 if arrname.lower() == 'data': 

131 arrname = '_data' 

132 aname = sumname = rawname = arrname.lower() 

133 if ('_mca' in aname and 

134 'outputcounts' not in aname and 

135 'dtfactor' not in aname and 

136 'clock' not in aname): 

137 sumname, imca = sumname.split('_mca') 

138 if bad is not None and int(imca) in bad: 

139 sumname = sumname + '_bad' 

140 datraw = dat*1.0 

141 rawname = sumname + '_nodtc' 

142 imca = int(imca) - 1 

143 dat = dat * icrs[imca]/ ocrs[imca] 

144 if any(np.isnan(dat)): 

145 nan_pts = np.where(np.isnan(dat))[0] 

146 dat[nan_pts] = datraw[nan_pts] 

147 

148 setattr(group, aname, dat) 

149 if sumname not in labels: 

150 labels.append(sumname) 

151 sums[sumname] = dat 

152 if rawname != sumname: 

153 sums[rawname] = datraw 

154 if rawname not in labels: 

155 labels.append(rawname) 

156 

157 else: 

158 sums[sumname] = sums[sumname] + dat 

159 if rawname != sumname: 

160 sums[rawname] = sums[rawname] + datraw 

161 

162 for name, dat in sums.items(): 

163 if not hasattr(group, name): 

164 setattr(group, name, dat) 

165 

166 for arrname in xdi.array_labels: 

167 sname = arrname.lower() 

168 if sname not in labels: 

169 labels.append(sname) 

170 

171 data = [] 

172 for name in labels: 

173 data.append(getattr(group, name)) 

174 group.data = np.array(data) 

175 for imca in range(nmca): 

176 setattr(group, 'ocr_mca%i' % (imca+1), ocrs[imca]) 

177 setattr(group, 'icr_mca%i' % (imca+1), icrs[imca]) 

178 group.array_labels = labels 

179 return group 

180 

181 

182DTC_header = '''# XDI/1.1 Epics StepScan File/2.0 

183# Beamline.name: 13-ID-E, GSECARS 

184# Monochromator.name: %(mono_cut)s, LN2 Cooled 

185# Monochromator.dspacing: %(mono_dspace)s 

186# Facility.name: APS 

187# Facility.xray_source: 3.6 cm undulator 

188# Detectors.i0: 20cm ion chamber, He 

189# Detectors.ifluor: Si SDD Vortex ME-4, 4 elements 

190# Detectors.ifluor_electronics: Quantum Xspress3 3.1.10''' 

191 

192def is_GSEXDI(filename): 

193 """test if file is GSE XDI data file 

194 reads only the first line of file 

195 """ 

196 line1 = read_textfile(filename, size=40).split('\n')[0] 

197 return (line1.startswith('#XDI/1') and 'Epics StepScan File' in line1) 

198 

199def gsexdi_deadtime_correct(fname, channelname, subdir='DT_Corrected', 

200 bad=None): 

201 """convert GSE XDI fluorescence XAFS scans to dead time corrected files""" 

202 if not is_GSEXDI(fname): 

203 print("'%s' is not a GSE XDI scan file\n" % fname) 

204 return 

205 

206 out = Group() 

207 out.orig_filename = fname 

208 try: 

209 xdi = read_gsexdi(fname, bad=bad) 

210 except: 

211 print('Could not read XDI file ', fname) 

212 return 

213 

214 for attr in ('energy', 'i0', 'i1', 'i2', 'tscaler', 

215 'counttime', 'scan_start_time', 'scan_end_time'): 

216 if hasattr(xdi, attr): 

217 setattr(out, attr, getattr(xdi, attr)) 

218 

219 # some scans may not record separate counttime, but TSCALER 

220 # is clock ticks for a 50MHz clock 

221 if not hasattr(out, 'counttime'): 

222 out.counttime = xdi.tscaler * 2.e-8 

223 

224 if hasattr(xdi, 'energy_readback'): 

225 out.energy = xdi.energy_readback 

226 

227 arrname = None 

228 channelname = channelname.lower().replace(' ', '_') 

229 

230 for arr in xdi.array_labels: 

231 if arr.lower().startswith(channelname): 

232 arrname = arr 

233 break 

234 if arrname is None: 

235 print('Cannot find Channel %s in file %s '% (channelname, fname)) 

236 return 

237 

238 out.ifluor = getattr(xdi, arrname) 

239 out.ifluor_raw = getattr(xdi, arrname) 

240 arrname_raw = arrname + '_nodtc' 

241 if arrname_raw in xdi.array_labels: 

242 out.ifluor_raw = getattr(xdi, arrname_raw) 

243 

244 out.mufluor = out.ifluor / out.i0 

245 TINY = 2.e-20 

246 if hasattr(out, 'i1') or hasattr(out, 'itrans'): 

247 i1 = getattr(out, 'i1', None) 

248 if i1 is None: 

249 i1 = getattr(out, 'itrans', None) 

250 if i1 is not None: 

251 i1[np.isnan(i1)] = TINY 

252 i1 = i1 / out.i0 

253 i1[np.where(i1<TINY)] = TINY 

254 out.mutrans = -np.log(i1) 

255 

256 

257 npts = len(out.i0) 

258 col0_name = xdi.array_labels[0].lower() 

259 col0_units = None 

260 col0_data = xdi.data[0, :] 

261 if col0_name == 'energy': 

262 col0_data = out.energy 

263 col0_units = 'eV' 

264 

265 buff = ['# XDI/1.0 GSE/1.0'] 

266 

267 header = {} 

268 hgroups = ['beamline', 'facility', 'mono', 'undulator', 'detectors', 

269 'scaler', 'detectorstage', 'samplestage', 'scan', 'scanparameters'] 

270 hskip = ['scanparameters.end', 'scanparameters.start'] 

271 for agroup in hgroups: 

272 attrs = xdi._xdi.attrs.get(agroup, {}) 

273 if agroup == 'mono': agroup = 'monochromator' 

274 header[agroup] = {} 

275 for sname in sorted(attrs.keys()): 

276 if "%s.%s" %( agroup, sname) not in hskip: 

277 header[agroup][sname] = attrs[sname] 

278 

279 

280 header['facility']['name'] = 'APS' 

281 header['facility']['xray_source'] = '3.6 cm undulator' 

282 header['beamline']['name'] = '13-ID-E, GSECARS' 

283 

284 header['detectors']['i0'] = '20cm ion chamber, He' 

285 header['detectors']['ifluor'] = 'Si SDD Vortex ME-4, 4 elements' 

286 header['detectors']['ifluor_electronics'] = 'Quantum Xspress3 3.1.10' 

287 

288 mono_cut = 'Si(111)' 

289 if xdi.mono_dspacing < 2: 

290 mono_cut = 'Si(311)' 

291 header['monochromator']['name'] = "%s, LN2 cooled" % mono_cut 

292 

293 out_arrays = {} 

294 out_arrays[col0_name] = (col0_name, col0_units) 

295 out_arrays['mufluor'] = ('mufluor', None) 

296 if hasattr(out, 'i1'): 

297 out_arrays['mutrans'] = ('mutrans', None) 

298 

299 out_arrays['ifluor'] = ('ifluor', '# deadtime-corrected') 

300 out_arrays['ifluor_raw'] = ('ifluor_raw', '# not deadtime-corrected') 

301 out_arrays['i0'] = ('i0', None) 

302 

303 if hasattr(out, 'i1'): 

304 out_arrays['itrans'] = ('i1', None) 

305 if hasattr(out, 'i2'): 

306 out_arrays['irefer'] = ('i2', None) 

307 

308 if hasattr(out, 'counttime'): 

309 out_arrays['counttime'] = ('counttime', 'sec') 

310 

311 arrlabel = [] 

312 for iarr, aname in enumerate(out_arrays): 

313 lab = "%12s " % aname 

314 if iarr == 0: lab = "%11s " % aname 

315 arrlabel.append(lab) 

316 extra = out_arrays[aname][1] 

317 if extra is None: extra = '' 

318 buff.append("# Column.%i: %s %s" % (iarr+1, aname, extra)) 

319 

320 arrlabel = '#%s' % (' '.join(arrlabel)) 

321 ncol = len(out_arrays) 

322 

323 for family, fval in header.items(): 

324 for attr, val in fval.items(): 

325 buff.append("# %s.%s: %s" % (family.title(), attr, val)) 

326 

327 

328 buff.append("# ///") 

329 for comment in bytes2str(xdi._xdi.comments).split('\n'): 

330 c = comment.strip() 

331 if len(c) > 0: 

332 buff.append('# %s' % c) 

333 buff.extend(["# summed %s fluorescence data from %s" % (channelname, fname), 

334 "# Dead-time correction applied", 

335 "#"+ "-"*78, arrlabel]) 

336 

337 efmt = "%11.4f" 

338 ffmt = "%13.7f" 

339 gfmt = "%13.7g" 

340 for i in range(npts): 

341 dline = ["", efmt % col0_data[i], ffmt % out.mufluor[i]] 

342 

343 if hasattr(out, 'i1'): 

344 dline.append(ffmt % out.mutrans[i]) 

345 

346 dline.extend([gfmt % out.ifluor[i], 

347 gfmt % out.ifluor_raw[i], 

348 gfmt % out.i0[i]]) 

349 

350 if hasattr(out, 'i1'): 

351 dline.append(gfmt % out.i1[i]) 

352 if hasattr(out, 'i2'): 

353 dline.append(gfmt % out.i2[i]) 

354 if hasattr(out, 'counttime'): 

355 dline.append(gfmt % out.counttime[i]) 

356 

357 buff.append(" ".join(dline)) 

358 ofile = fname[:] 

359 if ofile.startswith('..'): 

360 ofile = ofile[3:] 

361 ofile = ofile.replace('.', '_') + '.dat' 

362 ofile = os.path.join(subdir, ofile) 

363 if not os.path.exists(subdir): 

364 os.mkdir(subdir) 

365 try: 

366 fout = open(ofile, 'w', encoding=sys.getdefaultencoding()) 

367 fout.write("\n".join(buff)) 

368 fout.close() 

369 print("wrote %s, npts=%i, channel='%s'" % (ofile, npts, channelname)) 

370 except: 

371 print("could not open / write to output file %s" % ofile) 

372 

373 return out