Coverage for larch/io/xafs_beamlines.py: 93%

341 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1#!/usr/bin/env python 

2""" 

3classes for handling XAFS data in plaintext column files for various beamlines. 

4 

5 

6Basically, a class for XAFS Beamline data. This defines 

7 a) how to name the arrays for columns in the data file 

8 b) which column is most likely to hold the energy (or energy-defining) array 

9 c) what the energy units are most likely to be. 

10 

11Specific beamline data should define a class that derives from GenericBeamlineData 

12and has the following attributes/methods: 

13 

14 

15 energy_column : int index for default energy column 

16 

17 energy_units : str ('eV', 'keV', 'deg') for default expected energy units 

18 

19 beamline_matches(): method to decide whether data may be from the beamline 

20 should give more false positives than false negatives. 

21 

22 get_array_labels(): method to guess array labels. 

23 

24The XXX__BeamlineData class will be given *only* the headerlines (a list of lines) 

25from the text file. 

26 

27By default, that header will be defined as all the text before the data table. 

28 

29""" 

30 

31import numpy as np 

32from ..utils import fix_varname 

33 

def guess_beamline(header=None):
    """
    guess beamline data class used to parse headers from header lines

    Arguments
    ---------
    header : list of str or None
        header lines of the data file; None is treated as [''].

    Returns
    -------
    A beamline data *class* (not an instance).  GenericBeamlineData is
    returned when nothing more specific is recognized, which includes
    every header holding fewer than two lines.

    Notes
    -----
    Most tests look only at the first header line, lower-cased, with
    '#' comment characters and surrounding whitespace removed.  The
    APS 12BM test searches the full header text instead.
    """
    if header is None:
        header = ['']
    if len(header) > 1:
        # strip whitespace so that a commented first line such as
        # '# XDAC ...' still satisfies the startswith() tests below,
        # as it does in the beamline classes' own beamline_matches().
        line1 = header[0].lower().strip()
        full = '\n'.join(header).lower()

        if line1.startswith('#'):
            line1 = line1.replace('#', '').strip()

        if 'xdi/1' in line1 and 'epics stepscan' in line1:
            return APSGSE_BeamlineData
        elif line1.startswith('; epics scan 1 dim'):
            return APSGSE_BeamlineData
        elif 'labview control panel' in line1:
            return APSXSD_BeamlineData
        elif 'mrcat_xafs' in line1:
            return APSMRCAT_BeamlineData
        elif line1.startswith('xdac'):
            return NSLSXDAC_BeamlineData
        elif 'ssrl' in line1 and 'exafs data collector' in line1:
            return SSRL_BeamlineData
        elif 'cls data acquisition' in line1:
            return CLSHXMA_BeamlineData
        elif 'kek-pf' in line1:
            return KEKPF_BeamlineData
        elif 'exafsscan' in full and 'exafs_region' in full:
            return APS12BM_BeamlineData
    return GenericBeamlineData

66 

67 

class GenericBeamlineData:
    """
    Generic beamline data file - use as last resort

    Array labels are taken from the last header line:
      1. each of the leading markers '#L', '#C', '#' and 'C' (as written
         by Spec and many other collection systems) is tested in turn
         and removed when the line starts with it,
      2. tabs, quotes and the characters ',#@%&' are replaced by spaces,
      3. the result is split on whitespace and every word is turned
         into a valid variable name.
    """
    energy_column = 1
    energy_units = 'eV'
    mono_dspace = -1
    name = 'generic'

    def __init__(self, headerlines=None):
        """store a private copy of the header lines ([''] when None)"""
        self.headerlines = [''] if headerlines is None else list(headerlines)

    def beamline_matches(self):
        """True when the header holds more than one line"""
        return len(self.headerlines) > 1

    def get_array_labels(self, ncolumns=None):
        """
        guess array labels from the last header line

        ncolumns, when given, is the minimum number of labels returned.
        """
        text = self.headerlines[-1].strip() if self.headerlines else "# "
        for marker in ('#L', '#C', '#', 'C'):
            if text.startswith(marker):
                text = text[len(marker):]
        for junk in '\t,#@%&"\'':
            text = text.replace(junk, ' ')
        return self._set_labels(text.split(), ncolumns=ncolumns)

    def _set_labels(self, inlabels, ncolumns=None):
        """
        final parsing, cleaning, ensuring number of columns is satisfied

        The resulting list is stored as self.labels and returned.
        """
        cleaned = []
        for pos, raw in enumerate(inlabels, start=1):
            candidate = raw.strip().lower()
            if candidate:
                cleaned.append(fix_varname(candidate))
            else:
                # blank entry: fall back to a positional name
                cleaned.append('col%d' % pos)

        # a label already seen earlier gets its column number appended
        for pos, candidate in enumerate(cleaned):
            if candidate in cleaned[:pos]:
                cleaned[pos] = candidate + '_col%d' % (pos+1)

        # pad with positional names up to the requested column count
        if ncolumns is not None and len(cleaned) < ncolumns:
            cleaned.extend('col%d' % num
                           for num in range(len(cleaned)+1, ncolumns+1))

        self.labels = cleaned
        return cleaned

125 

126 

class APSGSE_BeamlineData(GenericBeamlineData):
    """
    GSECARS EpicsScan data, APS 13ID, some NSLS-II XFM 4BM data

    Two header layouts are handled: an older one whose first line
    starts with '; Epics Scan 1 dim', and an XDI-style one whose first
    line mentions both 'XDI/1' and 'Epics StepScan'.
    """
    name = 'GSE EpicsScan'
    energy_column = 1

    def __init__(self, headerlines=None):
        super().__init__(headerlines=headerlines)

    def beamline_matches(self):
        """test the (lower-cased) first header line for either layout"""
        first = self.headerlines[0].lower() if self.headerlines else ''
        if first.startswith('; epics scan 1 dim'):
            return True
        return 'xdi/1' in first and 'epics stepscan' in first

    def get_array_labels(self, ncolumns=None):
        """
        collect array labels from the column legend in the header

        Raises ValueError if the header is not from this beamline.
        May set self.energy_units from an XDI-style energy column entry.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        found = []
        if self.headerlines[0].lower().startswith('; epics scan 1 dim'):
            # old layout: legend lines 'xx = {label} --> pv' follow the
            # 'column labels:' line and stop at the first other line
            in_legend = False
            for raw in self.headerlines:
                text = raw[1:].strip()
                if not in_legend:
                    in_legend = 'column labels:' in text
                    continue
                if len(text) < 2 or '-->' not in text:
                    break
                lhs = text.split('-->', 1)[0]
                _, desc = lhs.split('=')
                found.append(desc.replace('{', '').replace('}', '').strip())
        else:
            # XDI layout: '# Column.N: label [units] || pvname' lines,
            # read until the first line that does not start with '#'
            for raw in self.headerlines:
                if not raw.startswith('#'):
                    break
                text = raw[1:].strip()
                if not (text.lower().startswith('column.') and '||' in text):
                    continue
                lhs = text.split('||', 1)[0]
                _, entry = lhs.split(':')
                entry = entry.strip()
                if ' ' in entry:
                    words = entry.split()
                    if len(words) > 1:
                        entry, units = words[0], words[1]
                        if 'energy' in entry.lower() and len(units) > 1:
                            self.energy_units = units
                found.append(entry)
        return self._set_labels(found, ncolumns=ncolumns)

187 

188 

class APS12BM_BeamlineData(GenericBeamlineData):
    """
    APS sector 12BM data
    """
    name = 'APS 12BM'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        must see 'exafs_region'

        Every header line has to start with '#'; the first line that
        does not makes the match fail.
        """
        match = False
        for line in self.headerlines:
            if not line.startswith('#'):
                return False
            if 'exafs_region' in line:
                match = True
        return match

    def get_array_labels(self, ncolumns=None):
        """
        read labels from the last header line

        Words of the form '<int>_<label>' give the labels.  A word
        without '_' seen while exactly one label has been collected is
        taken as the energy units (parentheses removed) and stored in
        self.energy_units.

        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        labelline = self.headerlines[-1].replace('#C', ' ').strip()

        labels = []
        for word in labelline.split():
            if '_' in word:
                # split at the first '_' only, so that labels which
                # themselves contain '_' do not fail to unpack
                pref, suff = word.split('_', 1)
                try:
                    int(pref)
                except ValueError:
                    continue
                labels.append(suff)
            elif len(labels) == 1:
                self.energy_units = word.replace('(', '').replace(')', '')
        return self._set_labels(labels, ncolumns=ncolumns)

233 

234 

class APSMRCAT_BeamlineData(GenericBeamlineData):
    """
    APS sector 10ID or 10BM data
    """
    name = 'APS MRCAT'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        True when the first header line contains 'MRCAT_XAFS'

        The test ignores case, matching how guess_beamline() selects
        this class.
        """
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0]
        return 'mrcat_xafs' in line1.lower()

    def get_array_labels(self, ncolumns=None):
        """
        use the header line following the first '-------' separator

        That line is split on whitespace to give the labels; no labels
        are found if there is no separator or nothing follows it.

        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        labels = []
        lines = iter(self.headerlines)
        for line in lines:
            if '-------' in line:
                labels = next(lines, '').strip().split()
                break

        return self._set_labels(labels, ncolumns=ncolumns)

265 

266 

class APSXSD_BeamlineData(GenericBeamlineData):
    """
    APS sector 20ID, 20BM, 9BM
    """
    name = 'APS XSD'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        True when the first header line contains 'LabVIEW Control Panel'

        The test ignores case, matching how guess_beamline() selects
        this class.
        """
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0]
        return 'labview control panel' in line1.lower()

    def get_array_labels(self, ncolumns=None):
        """
        read labels from an explicit column legend, or failing that
        from the '*'-separated last header line

        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        # here we try two different ways for "older" and "newer" 20BM/9BM files
        labels = []
        mode = 'search'
        tmplabels = {}
        maxkey = -1
        for line in self.headerlines:
            line = line[1:].strip()
            if mode == 'search' and 'is a readable list of column' in line:
                mode = 'found legend'
            elif mode == 'found legend':
                if len(line) < 2:
                    break
                if ')' in line:
                    if line.startswith('#'):
                        line = line[1:].strip()

                    # positions of every ')', plus an end-of-line sentinel
                    pars = [k for k, char in enumerate(line) if char == ')']
                    pars.append(len(line))
                    for k in range(len(pars)-1):
                        j = pars[k]
                        # the key is the (up to) two characters before ')'
                        i = max(0, j-2)
                        key = line[i:j]
                        z = pars[k+1]
                        if z < len(line)-3:
                            # step back over the digits of the next key
                            # so they are not included in this value
                            for o in range(1, 4):
                                try:
                                    int(line[z-o])
                                except ValueError:
                                    break
                            z = z-o+1
                        val = line[j+1:z].strip()
                        if val.endswith('*'):
                            val = val[:-1].strip()

                        try:
                            key = int(key)
                        except ValueError:
                            # not a numbered entry: give up on this line
                            break
                        maxkey = max(maxkey, key)
                        tmplabels[key] = val

        if len(tmplabels) > 1:
            # place the values by key, then drop the unused slots
            maxkey = max(maxkey, len(tmplabels))
            labels = ['']* (maxkey+5)
            for k, v in tmplabels.items():
                labels[k] = v
            labels = [o for o in labels if len(o) > 0]

        # older version: no explicit legend, parse last header line, uses '*'
        if len(labels) == 0:
            labelline = self.headerlines[-1].replace('#', '')
            words = labelline.split('*')
            if len(words) > 1:
                lastword = words.pop()
                words.extend(lastword.split())
            labels = words

        return self._set_labels(labels, ncolumns=ncolumns)

350 

351 

class NSLSXDAC_BeamlineData(GenericBeamlineData):
    """
    NSLS (I) XDAC collected data
    """
    name = 'NSLS XDAC'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        True when the first header line, with '#' and surrounding
        whitespace removed, starts with 'XDAC'

        The test ignores case, matching how guess_beamline() selects
        this class.
        """
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0].replace('#', '').strip()
        return line1.lower().startswith('xdac')

    def get_array_labels(self, ncolumns=None):
        """
        use the header line following the first '-------' separator

        That line is split on whitespace to give the labels; no labels
        are found if there is no separator or nothing follows it.

        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        labels = []
        lines = iter(self.headerlines)
        for line in lines:
            if '-------' in line:
                labels = next(lines, '').strip().split()
                break

        return self._set_labels(labels, ncolumns=ncolumns)

382 

383 

class SSRL_BeamlineData(GenericBeamlineData):
    """
    SSRL EXAFS Data Collect beamline data
    """
    name = 'SSRL'
    energy_column = 1

    def __init__(self, headerlines=None):
        super().__init__(headerlines=headerlines)

    def beamline_matches(self):
        """first header line must mention 'ssrl' and 'exafs data collector'"""
        first = self.headerlines[0].lower() if self.headerlines else ''
        return 'ssrl' in first and 'exafs data collector' in first

    def get_array_labels(self, ncolumns=None):
        """
        take one label per line from the block following 'Data:'

        The block ends at the first line shorter than two characters.
        self.energy_column is set to the 1-based position of the last
        label containing 'energy'.

        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        found = []
        in_legend = False
        for raw in self.headerlines:
            text = raw.strip()
            if in_legend:
                if len(text) < 2:
                    break
                found.append(text)
                if 'energy' in text.lower():
                    self.energy_column = len(found)
            elif text == 'Data:':
                in_legend = True

        return self._set_labels(found, ncolumns=ncolumns)

420 

421 

class CLSHXMA_BeamlineData(GenericBeamlineData):
    """
    CLS HXMA beamline data
    """
    name = 'CLS HXMA'
    energy_column = 1

    def __init__(self, headerlines=None):
        super().__init__(headerlines=headerlines)

    def beamline_matches(self):
        """first header line must mention 'cls data acquisition'"""
        first = self.headerlines[0] if self.headerlines else ''
        return 'cls data acquisition' in first.lower()

    def get_array_labels(self, ncolumns=None):
        """
        read labels from the header line starting with '#(1)' that
        also contains '$(' (the last such line wins)

        self.energy_column is set to the 1-based position of the last
        label containing 'energy'.

        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        # quote, '#', '$', parentheses and tab all become spaces
        to_space = str.maketrans(dict.fromkeys('"#$()\t', ' '))

        words = []
        for raw in self.headerlines:
            text = raw.strip()
            if text.startswith('#(1)') and '$(' in text:
                words = text.replace('#(1)', '').translate(to_space).split()

        names = [fix_varname(word.strip().lower()) for word in words]
        for pos, label in enumerate(names, start=1):
            if 'energy' in label:
                self.energy_column = pos
        return self._set_labels(names, ncolumns=ncolumns)

456 

457 

class KEKPF_BeamlineData(GenericBeamlineData):
    """
    KEK-PF (Photon Factory Data), as from BL12C
    """
    name = 'KEK PF'
    energy_column = 2
    energy_units = 'deg'

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        True when the first header line contains 'KEK-PF'

        The test ignores case, matching how guess_beamline() selects
        this class.
        """
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0].replace('#', '').strip()
        return 'kek-pf' in line1.lower()

    def get_array_labels(self, ncolumns=None):
        """
        return the fixed labels 'angle_drive', 'angle_read', 'time',
        padded with 'colN' names

        The number of labels is at least the number of whitespace
        separated words in the last header line, and at least ncolumns
        when that is given.  On lines containing 'mono :', a number
        following a 'd' token is stored in self.mono_dspace.

        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        for line in self.headerlines:
            line = line.lower().replace('#', ' ').strip()
            if 'mono :' in line:
                words = line.replace('=', ' ').split()
                for previous, word in zip(words, words[1:]):
                    if previous == 'd':
                        try:
                            self.mono_dspace = float(word)
                        except ValueError:
                            pass

        ncols = len(self.headerlines[-1].strip().split())
        if ncolumns is not None:
            ncols = max(ncols, ncolumns)

        labels = ['angle_drive', 'angle_read', 'time']
        return self._set_labels(labels, ncolumns=ncols)