Coverage for larch/io/xdi.py: 14%
180 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
1#!/usr/bin/env python
2"""
3Read/Write XAS Data Interchange Format for Python
4"""
5import os
6from ctypes import c_long, c_double, c_char_p, c_void_p, pointer, Structure
8__version__ = '1.2.0larch'
10from numpy import array, exp, log, sin, arcsin
12from .. import Group
13from ..larchlib import get_dll
14from ..utils import read_textfile, bytes2str, str2bytes
15from ..utils.physical_constants import RAD2DEG, PLANCK_HC
17class XDIFileStruct(Structure):
18 "emulate XDI File"
19 _fields_ = [('nmetadata', c_long),
20 ('narrays', c_long),
21 ('npts', c_long),
22 ('narray_labels', c_long),
23 ('nouter', c_long),
24 ('error_lineno', c_long),
25 ('dspacing', c_double),
26 ('xdi_libversion',c_char_p),
27 ('xdi_version', c_char_p),
28 ('extra_version', c_char_p),
29 ('filename', c_char_p),
30 ('element', c_char_p),
31 ('edge', c_char_p),
32 ('comments', c_char_p),
33 ('error_line', c_char_p),
34 ('error_message', c_char_p),
35 ('array_labels', c_void_p),
36 ('outer_label', c_char_p),
37 ('array_units', c_void_p),
38 ('meta_families', c_void_p),
39 ('meta_keywords', c_void_p),
40 ('meta_values', c_void_p),
41 ('array', c_void_p),
42 ('outer_array', c_void_p),
43 ('outer_breakpts', c_void_p)]
45string_attrs = ('comments', 'edge', 'element', 'error_line',
46 'error_message', 'extra_version', 'filename',
47 'outer_label', 'xdi_libversion', 'xdi_pyversion',
48 'xdi_version')
51def tostr(val):
52 if isinstance(val, str):
53 return val
54 if isinstance(val, bytes):
55 return val.decode()
56 return str(val)
58def tostrlist(address, nitems):
59 return [str(i, 'utf-8') for i in (nitems*c_char_p).from_address(address)]
61def add_dot2path():
62 """add this folder to begninng of PATH environmental variable"""
63 sep = ':'
64 if os.name == 'nt': sep = ';'
65 paths = os.environ.get('PATH','').split(sep)
66 paths.insert(0, os.path.abspath(os.curdir))
67 os.environ['PATH'] = sep.join(paths)
70XDILIB = None
71def get_xdilib():
72 """make initial connection to XDI dll"""
73 global XDILIB
74 if XDILIB is None:
75 XDILIB = get_dll('xdifile')
76 XDILIB.XDI_errorstring.restype = c_char_p
77 return XDILIB
79class XDIFileException(Exception):
80 """XDI File Exception: General Errors"""
81 def __init__(self, msg, **kws):
82 Exception.__init__(self)
83 self.msg = msg
84 def __str__(self):
85 return self.msg
87class XDIFile(object):
88 """ XAS Data Interchange Format:
90 See https://github.com/XraySpectrscopy/XAS-Data-Interchange
92 for further details
94 >>> xdi_file = XDFIile(filename)
96 Principle methods:
97 read(): read XDI data file, set column data and attributes
98 write(filename): write xdi_file data to an XDI file.
99 """
100 _invalid_msg = "invalid data for '%s': was expecting %s, got '%s'"
102 def __init__(self, filename=None, labels=None):
103 self.filename = filename
104 self.xdi_pyversion = __version__
105 # self.xdilib = get_xdilib()
106 self.comments = []
107 self.data = []
108 self.attrs = {}
109 self.status = None
110 self.user_labels = labels
111 if self.filename:
112 self.read(self.filename)
114 def write(self, filename):
115 "write out an XDI File"
116 print( 'Writing XDI file not currently supported')
118 def read(self, filename=None):
119 """read validate and parse an XDI datafile into python structures
120 """
121 if filename is None and self.filename is not None:
122 filename = self.filename
124 text = read_textfile(filename)
125 lines = text.split('\n')
126 if len(text) < 256 or len(lines) < 6:
127 msg = [f'Error reading XDIFile {filename}',
128 'data file too small to be valid XDI']
129 raise ValueError('\n'.join(msg))
131 pxdi = pointer(XDIFileStruct())
133 xdilib = get_xdilib()
134 self.status = xdilib.XDI_readfile(filename.encode(), pxdi)
135 if self.status < 0:
136 msg = bytes2str(xdilib.XDI_errorstring(self.status))
137 xdilib.XDI_cleanup(pxdi, self.status)
138 msg = 'Error reading XDIFile %s\n%s' % (filename, msg)
139 raise ValueError(msg)
141 xdi = pxdi.contents
142 for attr in dict(xdi._fields_):
143 setattr(self, attr, getattr(xdi, attr))
145 self.array_labels = tostrlist(xdi.array_labels, self.narrays)
147 if self.user_labels is not None:
148 ulab = self.user_labels.replace(',', ' ')
149 ulabs = [l.strip() for l in ulab.split()]
150 self.array_labels[:len(ulabs)] = ulabs
152 arr_units = tostrlist(xdi.array_units, self.narrays)
153 self.array_units = []
154 self.array_addrs = []
155 for unit in arr_units:
156 addr = ''
157 if '||' in unit:
158 unit, addr = [x.strip() for x in unit.split('||', 1)]
159 self.array_units.append(unit)
160 self.array_addrs.append(addr)
162 mfams = tostrlist(xdi.meta_families, self.nmetadata)
163 mkeys = tostrlist(xdi.meta_keywords, self.nmetadata)
164 mvals = tostrlist(xdi.meta_values, self.nmetadata)
165 self.attrs = {}
166 for fam, key, val in zip(mfams, mkeys, mvals):
167 fam = fam.lower()
168 key = key.lower()
169 if fam not in self.attrs:
170 self.attrs[fam] = {}
171 self.attrs[fam][key] = val
173 parrays = (xdi.narrays*c_void_p).from_address(xdi.array)[:]
174 self.data = [(xdi.npts*c_double).from_address(p)[:] for p in parrays]
176 nout = xdi.nouter
177 outer, breaks = [], []
178 if nout > 1:
179 outer = (nout*c_double).from_address(xdi.outer_array)[:]
180 breaks = (nout*c_long).from_address(xdi.outer_breakpts)[:]
181 for attr in ('outer_array', 'outer_breakpts', 'nouter'):
182 delattr(self, attr)
183 self.outer_array = array(outer)
184 self.outer_breakpts = array(breaks)
186 self.data = array(self.data)
187 self.data.shape = (self.narrays, self.npts)
188 self._assign_arrays()
189 for attr in ('nmetadata', 'narray_labels', 'meta_families',
190 'meta_keywords', 'meta_values', 'array'):
191 delattr(self, attr)
192 xdilib.XDI_cleanup(pxdi, 0)
194 def _assign_arrays(self):
195 """assign data arrays for principle data attributes:
196 energy, angle, i0, itrans, ifluor, irefer,
197 mutrans, mufluor, murefer, etc. """
199 xunits = 'eV'
200 xname = None
201 ix = -1
202 self.data = array(self.data)
204 for idx, name in enumerate(self.array_labels):
205 dat = self.data[idx,:]
206 setattr(self, name, dat)
207 if name in ('energy', 'angle'):
208 ix = idx
209 xname = name
210 units = self.array_units[idx]
211 if units is not None:
212 xunits = units
214 # convert energy to angle, or vice versa
215 monodat = {}
216 if 'mono' in self.attrs:
217 monodat = self.attrs['mono']
218 elif 'monochromator' in self.attrs:
219 monodat = self.attrs['monochromator']
221 if ix >= 0 and 'd_spacing' in monodat:
222 dspacing = monodat['d_spacing'].strip()
223 dunits = 'Angstroms'
224 if ' ' in dspacing:
225 dspacing, dunits = dspacing.split(None, 1)
226 self.dspacing = float(dspacing)
227 self.dspacing_units = dunits
229 omega = PLANCK_HC/(2*self.dspacing)
230 if xname == 'energy' and not hasattr(self, 'angle'):
231 energy_ev = self.energy
232 if xunits.lower() == 'kev':
233 energy_ev = 1000. * energy_ev
234 self.angle = RAD2DEG * arcsin(omega/energy_ev)
235 elif xname == 'angle' and not hasattr(self, 'energy'):
236 angle_rad = self.angle
237 if xunits.lower() in ('deg', 'degrees'):
238 angle_rad = angle_rad / RAD2DEG
239 self.energy = omega/sin(angle_rad)
241 if hasattr(self, 'i0'):
242 if hasattr(self, 'itrans') and not hasattr(self, 'mutrans'):
243 self.mutrans = -log(self.itrans / (self.i0+1.e-12))
244 elif hasattr(self, 'mutrans') and not hasattr(self, 'itrans'):
245 self.itrans = self.i0 * exp(-self.mutrans)
246 if hasattr(self, 'ifluor') and not hasattr(self, 'mufluor'):
247 self.mufluor = self.ifluor/(self.i0+1.e-12)
249 elif hasattr(self, 'mufluor') and not hasattr(self, 'ifluor'):
250 self.ifluor = self.i0 * self.mufluor
252 if hasattr(self, 'itrans'):
253 if hasattr(self, 'irefer') and not hasattr(self, 'murefer'):
254 self.murefer = -log(self.irefer / (self.itrans+1.e-12))
256 elif hasattr(self, 'murefer') and not hasattr(self, 'irefer'):
257 self.irefer = self.itrans * exp(-self.murefer)
260def read_xdi(filename, labels=None):
261 """read an XDI File into a Group
263 Arguments:
264 filename (str): name of file to read
265 labels (str or None): string to use for setting array names [None]
267 Returns:
268 Group
270 A data group containing data read from file, with XDI attributes and
271 conventions.
273 Notes:
274 1. See https://github.com/XraySpectrscopy/XAS-Data-Interchange
276 2. if `labels` is `None` (default), the names of the output arrays
277 will be determined from the XDI column label in the XDI header.
278 To override these array names, use a string with space or comma
279 separating names for the arrays.
282 Example:
283 >>> from larch.io import xdi
284 >>> fe3_data = read_xdi('FeXAFS_Fe2O3.001')
285 >>> print(fe3_data.array_labels)
286 ['energy', 'mutrans', 'i0']
288 >>> fec3 = read_xdi('fe3c_rt.xdi', labels='e, x, y')
289 >>> print(fec3.array_labels)
290 ['e', 'x', 'y']
292 See Also:
293 read_ascii
295 """
296 xdif = XDIFile(filename, labels=labels)
297 group = Group()
298 for key, val in xdif.__dict__.items():
299 if not key.startswith('_'):
300 if key in string_attrs:
301 val = tostr(val)
302 setattr(group, key, val)
303 group.__name__ ='XDI file %s' % filename
304 doc = ['%i arrays, %i npts' % (xdif.narrays, xdif.npts)]
305 arr_labels = getattr(xdif, 'array_labels', None)
306 if arr_labels is not None:
307 doc.append("Array Labels: %s" % repr(arr_labels))
308 group.__doc__ = '\n'.join(doc)
310 group.path = filename
311 path, fname = os.path.split(filename)
312 group.filename = fname
313 return group