Coverage for larch/io/xafs_beamlines.py: 93%
341 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
1#!/usr/bin/env python
2"""
3classes for handling XAFS data in plaintext column files for various beamlines.
6Basically, a class for XAFS Beamline data. This defines
7 a) how to name the arrays for columns in the data file
8 b) which column is most likely to hold the energy (or energy-defining) array
9 c) what the energy units are most likely to be.
11Specific beamline data should define a class that derives from GenericBeamlineData
12and has the following attributes/methods:
15 energy_column : int index for default energy column
17 energy_units : str ('eV', 'keV', 'deg') for default expected energy units
19 beamline_matches(): method to decide whether data may be from the beamline
20 should give more false positives than false negatives.
22 get_array_labels(): method to guess array labels.
24The XXX__BeamlineData class will be given *only* the headerlines (a list of lines)
25from the text file.
27By default, that header will be defined as all the text before the data table.
29"""
31import numpy as np
32from ..utils import fix_varname
def guess_beamline(header=None):
    """
    Guess the beamline data class to use for parsing a file header.

    Arguments
    ---------
    header : list of str or None
        header lines of the data file.  None, or a header with fewer
        than two lines, always selects GenericBeamlineData.

    Returns
    -------
    The beamline data *class* (not an instance) whose signature text
    was found in the header, or GenericBeamlineData if none was found.

    All comparisons are done on lower-cased text.  Most signatures are
    looked for in the first header line only; the APS 12BM signature is
    looked for in the full header.
    """
    if header is None or len(header) < 2:
        return GenericBeamlineData

    line1 = header[0].lower()
    full = '\n'.join(header).lower()

    if line1.startswith('#'):
        line1 = line1.replace('#', '')

    if 'xdi/1' in line1 and 'epics stepscan' in line1:
        return APSGSE_BeamlineData
    if line1.startswith('; epics scan 1 dim'):
        return APSGSE_BeamlineData
    if 'labview control panel' in line1:
        return APSXSD_BeamlineData
    if 'mrcat_xafs' in line1:
        return APSMRCAT_BeamlineData
    # Strip whitespace before the prefix test so that '# XDAC ...' is
    # recognized: NSLSXDAC_BeamlineData.beamline_matches() removes '#' and
    # surrounding whitespace before testing the same prefix.
    if line1.strip().startswith('xdac'):
        return NSLSXDAC_BeamlineData
    if 'ssrl' in line1 and 'exafs data collector' in line1:
        return SSRL_BeamlineData
    if 'cls data acquisition' in line1:
        return CLSHXMA_BeamlineData
    if 'kek-pf' in line1:
        return KEKPF_BeamlineData
    if 'exafsscan' in full and 'exafs_region' in full:
        return APS12BM_BeamlineData
    return GenericBeamlineData
class GenericBeamlineData:
    """
    Generic beamline data file - use as last resort

    Array labels are taken from the last header line:
      1. leading '#L', '#C', '#', and 'C' markers are removed, as written
         by Spec and many other collection systems;
      2. tabs, quotes, and the characters ',#@%&' are turned into spaces;
      3. the result is split on whitespace and each word is converted
         to a valid variable name.
    """
    energy_column = 1
    energy_units = 'eV'
    mono_dspace = -1
    name = 'generic'

    def __init__(self, headerlines=None):
        """Store a private copy of the header lines ([''] if None)."""
        self.headerlines = [''] if headerlines is None else list(headerlines)

    def beamline_matches(self):
        """Return True if the header has more than one line."""
        return len(self.headerlines) > 1

    def get_array_labels(self, ncolumns=None):
        """
        Guess array labels from the last header line.

        ncolumns, if given, is the minimum number of labels to return.
        """
        text = "# "
        if len(self.headerlines) > 0:
            text = self.headerlines[-1].strip()
        # each marker is tested in turn against what is left of the line
        for marker in ('#L', '#C', '#', 'C'):
            if text.startswith(marker):
                text = text[len(marker):]
        to_space = str.maketrans({char: ' ' for char in '\t,#@%&"\''})
        words = text.translate(to_space).split()
        return self._set_labels(words, ncolumns=ncolumns)

    def _set_labels(self, inlabels, ncolumns=None):
        """
        Clean labels, make them unique, and pad to at least ncolumns.

        The resulting list is stored as self.labels and returned.
        """
        labels = []
        for colnum, raw in enumerate(inlabels, start=1):
            name = raw.strip().lower()
            labels.append(fix_varname(name) if name else 'col%d' % colnum)

        # a label already used by an earlier column gets a column suffix
        for index, name in enumerate(labels):
            if name in labels[:index]:
                labels[index] = name + '_col%d' % (index + 1)

        if ncolumns is not None:
            first_missing = len(labels) + 1
            labels.extend('col%d' % colnum
                          for colnum in range(first_missing, ncolumns + 1))
        self.labels = labels
        return labels
class APSGSE_BeamlineData(GenericBeamlineData):
    """
    GSECARS EpicsScan data, APS 13ID, some NSLS-II XFM 4BM data

    Two header layouts are handled, selected by the first header line:
      * old style: first line starts with '; epics scan 1 dim'; labels are
        read from the legend that follows a 'column labels:' line.
      * XDI style: first line contains 'xdi/1' and 'epics stepscan';
        labels are read from '# Column.N: label units || pvname' lines.
    """
    name = 'GSE EpicsScan'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """Return True if the first header line has a GSE signature."""
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0].lower()
        return (('xdi/1' in line1 and 'epics stepscan' in line1) or
                line1.startswith('; epics scan 1 dim'))

    def get_array_labels(self, ncolumns=None):
        """
        Return array labels parsed from the header.

        ncolumns, if given, is the minimum number of labels to return.
        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        line1 = self.headerlines[0].lower()
        if line1.startswith('; epics scan 1 dim'):
            labels = self._legend_labels()
        else:
            labels = self._xdi_labels()
        return self._set_labels(labels, ncolumns=ncolumns)

    def _legend_labels(self):
        """Read labels from the old-style 'column labels:' legend."""
        labels = []
        in_legend = False
        for line in self.headerlines:
            line = line[1:].strip()    # drop the leading comment character
            if in_legend:
                if len(line) < 2 or '-->' not in line:
                    break              # legend is finished
                pref = line.split('-->', 1)[0]
                # Take the text after the first '='.  A bounded split is
                # used so that an entry with no '=' or with several does
                # not raise an unpacking error.
                arg = pref.split('=', 1)[-1]
                arg = arg.replace('{', '').replace('}', '')
                labels.append(arg.strip())
            elif 'column labels:' in line:
                in_legend = True
        return labels

    def _xdi_labels(self):
        """
        Read labels from XDI-style 'Column.N: label units || pv' lines.

        Sets self.energy_units when a column whose label contains 'energy'
        also gives a units word longer than one character.
        """
        labels = []
        for line in self.headerlines:
            if not line.startswith('#'):
                break                  # end of the commented header
            line = line[1:].strip()
            if line.lower().startswith('column.') and '||' in line:
                label = line.split('||', 1)[0]
                # Take the text after the first ':' so that a label that
                # itself contains ':' does not raise an unpacking error.
                entry = label.partition(':')[2].strip()
                if ' ' in entry:
                    words = entry.split()
                    if len(words) > 1:
                        entry, units = words[0], words[1]
                        if 'energy' in entry.lower() and len(units) > 1:
                            self.energy_units = units
                labels.append(entry)
        return labels
class APS12BM_BeamlineData(GenericBeamlineData):
    """
    APS sector 12BM data
    """
    name = 'APS 12BM'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        Return True if every header line starts with '#' and at least
        one of them contains 'exafs_region'.

        The text test ignores case, because guess_beamline() selects this
        class by matching against a lower-cased header.
        """
        match = False
        for line in self.headerlines:
            if not line.startswith('#'):
                # any uncommented line disqualifies the whole header
                return False
            if 'exafs_region' in line.lower():
                match = True
        return match

    def get_array_labels(self, ncolumns=None):
        """
        Return array labels parsed from the last header line.

        Words of the form '<int>_<name>' give the label <name>.  A word
        without '_' seen when exactly one label has been collected is
        taken as the energy units, with parentheses removed.

        ncolumns, if given, is the minimum number of labels to return.
        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        labelline = self.headerlines[-1].replace('#C', ' ').strip()

        labels = []
        for word in labelline.split():
            if '_' in word:
                # split on the first '_' only, so that names which contain
                # underscores themselves ('3_I0_norm') are kept whole
                pref, suff = word.split('_', 1)
                try:
                    int(pref)
                except ValueError:
                    continue
                labels.append(suff)
            elif len(labels) == 1:
                self.energy_units = word.replace('(', '').replace(')', '')
        return self._set_labels(labels, ncolumns=ncolumns)
class APSMRCAT_BeamlineData(GenericBeamlineData):
    """
    APS sector 10ID or 10BM data
    """
    name = 'APS MRCAT'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        Return True if the first header line contains 'MRCAT_XAFS'.

        The test ignores case, because guess_beamline() selects this class
        by matching against a lower-cased first line.
        """
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0]
        return 'mrcat_xafs' in line1.lower()

    def get_array_labels(self, ncolumns=None):
        """
        Return array labels from the line after the first '-------' line.

        ncolumns, if given, is the minimum number of labels to return.
        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        labels = []
        lines = iter(self.headerlines)
        for line in lines:
            if '-------' in line:
                # the labels are on the line right after the separator;
                # if the separator is the last line there are none
                labels = next(lines, '').strip().split()
                break

        return self._set_labels(labels, ncolumns=ncolumns)
class APSXSD_BeamlineData(GenericBeamlineData):
    """
    APS sector 20ID, 20BM, 9BM
    """
    name = 'APS XSD'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        Return True if the first header line contains
        'LabVIEW Control Panel'.

        The test ignores case, because guess_beamline() selects this class
        by matching against a lower-cased first line.
        """
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0]
        return 'labview control panel' in line1.lower()

    def get_array_labels(self, ncolumns=None):
        """
        Return array labels parsed from the header.

        Two layouts are tried.  Newer files have a legend, introduced by a
        line containing 'is a readable list of column', with entries of
        the form 'N) label'.  If that gives no labels, the last header
        line is split on '*'.

        ncolumns, if given, is the minimum number of labels to return.
        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        # here we try two different ways for "older" and "newer" 20BM/9BM files
        labels = []
        mode = 'search'
        tmplabels = {}
        maxkey = -1
        for line in self.headerlines:
            line = line[1:].strip()    # drop the leading comment character
            if mode == 'search' and 'is a readable list of column' in line:
                mode = 'found legend'
            elif mode == 'found legend':
                if len(line) < 2:
                    break              # blank line ends the legend
                if ')' in line:
                    if line.startswith('#'):
                        line = line[1:].strip()

                    # positions of every ')', plus an end-of-line sentinel
                    closers = [pos for pos, char in enumerate(line)
                               if char == ')']
                    closers.append(len(line))
                    for k in range(len(closers) - 1):
                        start = closers[k]
                        # the column number is the (up to) two characters
                        # just before the ')'
                        key = line[max(0, start-2):start]
                        end = closers[k+1]
                        if end < len(line) - 3:
                            # back up over the digits of the next entry's
                            # column number so they are not part of this
                            # entry's label
                            for ndigit in range(1, 4):
                                try:
                                    int(line[end-ndigit])
                                except ValueError:
                                    break
                            end = end - ndigit + 1
                        val = line[start+1:end].strip()
                        if val.endswith('*'):
                            val = val[:-1].strip()

                        try:
                            key = int(key)
                        except ValueError:
                            # not a numbered entry: give up on this line
                            break
                        maxkey = max(maxkey, key)
                        tmplabels[key] = val

        if len(tmplabels) > 1:
            maxkey = max(maxkey, len(tmplabels))
            # place labels by column number, then drop the unused slots
            labels = [''] * (maxkey+5)
            for key, val in tmplabels.items():
                labels[key] = val
            labels = [lab for lab in labels if len(lab) > 0]

        # older version: no explicit legend, parse last header line, uses '*'
        if len(labels) == 0:
            labelline = self.headerlines[-1].replace('#', '')
            words = labelline.split('*')
            if len(words) > 1:
                # text after the last '*' is split on whitespace
                lastword = words.pop()
                words.extend(lastword.split())
            labels = words

        return self._set_labels(labels, ncolumns=ncolumns)
class NSLSXDAC_BeamlineData(GenericBeamlineData):
    """
    NSLS (I) XDAC collected data
    """
    name = 'NSLS XDAC'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        Return True if the first header line, with '#' and surrounding
        whitespace removed, starts with 'XDAC'.

        The test ignores case, because guess_beamline() selects this class
        by matching against a lower-cased first line.
        """
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0].replace('#', '').strip()
        return line1.lower().startswith('xdac')

    def get_array_labels(self, ncolumns=None):
        """
        Return array labels from the line after the first '-------' line.

        ncolumns, if given, is the minimum number of labels to return.
        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        labels = []
        lines = iter(self.headerlines)
        for line in lines:
            if '-------' in line:
                # the labels are on the line right after the separator;
                # if the separator is the last line there are none
                labels = next(lines, '').strip().split()
                break

        return self._set_labels(labels, ncolumns=ncolumns)
class SSRL_BeamlineData(GenericBeamlineData):
    """
    SSRL EXAFS Data Collect beamline data
    """
    name = 'SSRL'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        Return True if the first header line contains both 'ssrl' and
        'exafs data collector', ignoring case.
        """
        first = self.headerlines[0].lower() if len(self.headerlines) > 0 else ''
        return ('ssrl' in first and 'exafs data collector' in first)

    def get_array_labels(self, ncolumns=None):
        """
        Return array labels listed one per line after the 'Data:' line.

        The list ends at the first line shorter than two characters.
        The energy column is set to the position (counting from 1) of the
        last label that contains 'energy'.

        ncolumns, if given, is the minimum number of labels to return.
        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        labels = []
        in_legend = False
        for raw in self.headerlines:
            text = raw.strip()
            if not in_legend:
                in_legend = (text == 'Data:')
                continue
            if len(text) < 2:
                break
            labels.append(text)
            if 'energy' in text.lower():
                self.energy_column = len(labels)

        return self._set_labels(labels, ncolumns=ncolumns)
class CLSHXMA_BeamlineData(GenericBeamlineData):
    """
    CLS HXMA beamline data
    """
    name = 'CLS HXMA'
    energy_column = 1

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        Return True if the first header line contains
        'cls data acquisition', ignoring case.
        """
        first = self.headerlines[0] if len(self.headerlines) > 0 else ''
        return ('cls data acquisition' in first.lower())

    def get_array_labels(self, ncolumns=None):
        """
        Return array labels from the last header line that starts with
        '#(1)' and contains '$('.

        The energy column is set to the position (counting from 1) of the
        last label that contains 'energy'.

        ncolumns, if given, is the minimum number of labels to return.
        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        to_space = str.maketrans({char: ' ' for char in '"#$()\t'})
        words = []
        for raw in self.headerlines:
            text = raw.strip()
            if text.startswith('#(1)') and '$(' in text:
                text = text.replace('#(1)', '')
                words = text.translate(to_space).split()

        labels = []
        for word in words:
            labels.append(fix_varname(word.strip().lower()))
        for colnum, label in enumerate(labels, start=1):
            if 'energy' in label:
                self.energy_column = colnum
        return self._set_labels(labels, ncolumns=ncolumns)
class KEKPF_BeamlineData(GenericBeamlineData):
    """
    KEK-PF (Photon Factory Data), as from BL12C
    """
    name = 'KEK PF'
    energy_column = 2
    energy_units = 'deg'

    def __init__(self, headerlines=None):
        GenericBeamlineData.__init__(self, headerlines=headerlines)

    def beamline_matches(self):
        """
        Return True if the first header line, with '#' removed,
        contains 'KEK-PF'.

        The test ignores case, because guess_beamline() selects this class
        by matching against a lower-cased first line.
        """
        line1 = ''
        if len(self.headerlines) > 0:
            line1 = self.headerlines[0].replace('#', '').strip()
        return 'kek-pf' in line1.lower()

    def get_array_labels(self, ncolumns=None):
        """
        Return array labels, and read the mono d-spacing from the header.

        The first three labels are fixed ('angle_drive', 'angle_read',
        'time').  The number of labels is at least the number of words in
        the last header line, and at least ncolumns if that is given.

        For header lines containing 'mono :', the word that follows a
        'd' (with '=' treated as a space) is stored as self.mono_dspace
        if it can be read as a float.

        Raises ValueError if the header is not from this beamline.
        """
        if not self.beamline_matches():
            raise ValueError('header is not from beamline %s' % self.name)

        for line in self.headerlines:
            line = line.lower().replace('#', ' ').strip()
            if 'mono :' in line:
                words = line.replace('=', ' ').split()
                # look at each word together with the one before it
                for prev, word in zip(words, words[1:]):
                    if prev == 'd':
                        try:
                            self.mono_dspace = float(word)
                        except ValueError:
                            pass

        lastline = self.headerlines[-1]
        ncols = len(lastline.strip().split())
        if ncolumns is not None:
            ncols = max(ncols, ncolumns)

        labels = ['angle_drive', 'angle_read', 'time']
        return self._set_labels(labels, ncolumns=ncols)