Coverage for larch/io/xdi.py: 14%

180 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1#!/usr/bin/env python 

2""" 

3Read/Write XAS Data Interchange Format for Python 

4""" 

5import os 

6from ctypes import c_long, c_double, c_char_p, c_void_p, pointer, Structure 

7 

8__version__ = '1.2.0larch' 

9 

10from numpy import array, exp, log, sin, arcsin 

11 

12from .. import Group 

13from ..larchlib import get_dll 

14from ..utils import read_textfile, bytes2str, str2bytes 

15from ..utils.physical_constants import RAD2DEG, PLANCK_HC 

16 

17class XDIFileStruct(Structure): 

18 "emulate XDI File" 

19 _fields_ = [('nmetadata', c_long), 

20 ('narrays', c_long), 

21 ('npts', c_long), 

22 ('narray_labels', c_long), 

23 ('nouter', c_long), 

24 ('error_lineno', c_long), 

25 ('dspacing', c_double), 

26 ('xdi_libversion',c_char_p), 

27 ('xdi_version', c_char_p), 

28 ('extra_version', c_char_p), 

29 ('filename', c_char_p), 

30 ('element', c_char_p), 

31 ('edge', c_char_p), 

32 ('comments', c_char_p), 

33 ('error_line', c_char_p), 

34 ('error_message', c_char_p), 

35 ('array_labels', c_void_p), 

36 ('outer_label', c_char_p), 

37 ('array_units', c_void_p), 

38 ('meta_families', c_void_p), 

39 ('meta_keywords', c_void_p), 

40 ('meta_values', c_void_p), 

41 ('array', c_void_p), 

42 ('outer_array', c_void_p), 

43 ('outer_breakpts', c_void_p)] 

44 

45string_attrs = ('comments', 'edge', 'element', 'error_line', 

46 'error_message', 'extra_version', 'filename', 

47 'outer_label', 'xdi_libversion', 'xdi_pyversion', 

48 'xdi_version') 

49 

50 

51def tostr(val): 

52 if isinstance(val, str): 

53 return val 

54 if isinstance(val, bytes): 

55 return val.decode() 

56 return str(val) 

57 

58def tostrlist(address, nitems): 

59 return [str(i, 'utf-8') for i in (nitems*c_char_p).from_address(address)] 

60 

61def add_dot2path(): 

62 """add this folder to begninng of PATH environmental variable""" 

63 sep = ':' 

64 if os.name == 'nt': sep = ';' 

65 paths = os.environ.get('PATH','').split(sep) 

66 paths.insert(0, os.path.abspath(os.curdir)) 

67 os.environ['PATH'] = sep.join(paths) 

68 

69 

70XDILIB = None 

71def get_xdilib(): 

72 """make initial connection to XDI dll""" 

73 global XDILIB 

74 if XDILIB is None: 

75 XDILIB = get_dll('xdifile') 

76 XDILIB.XDI_errorstring.restype = c_char_p 

77 return XDILIB 

78 

79class XDIFileException(Exception): 

80 """XDI File Exception: General Errors""" 

81 def __init__(self, msg, **kws): 

82 Exception.__init__(self) 

83 self.msg = msg 

84 def __str__(self): 

85 return self.msg 

86 

87class XDIFile(object): 

88 """ XAS Data Interchange Format: 

89 

90 See https://github.com/XraySpectrscopy/XAS-Data-Interchange 

91 

92 for further details 

93 

94 >>> xdi_file = XDFIile(filename) 

95 

96 Principle methods: 

97 read(): read XDI data file, set column data and attributes 

98 write(filename): write xdi_file data to an XDI file. 

99 """ 

100 _invalid_msg = "invalid data for '%s': was expecting %s, got '%s'" 

101 

102 def __init__(self, filename=None, labels=None): 

103 self.filename = filename 

104 self.xdi_pyversion = __version__ 

105 # self.xdilib = get_xdilib() 

106 self.comments = [] 

107 self.data = [] 

108 self.attrs = {} 

109 self.status = None 

110 self.user_labels = labels 

111 if self.filename: 

112 self.read(self.filename) 

113 

114 def write(self, filename): 

115 "write out an XDI File" 

116 print( 'Writing XDI file not currently supported') 

117 

118 def read(self, filename=None): 

119 """read validate and parse an XDI datafile into python structures 

120 """ 

121 if filename is None and self.filename is not None: 

122 filename = self.filename 

123 

124 text = read_textfile(filename) 

125 lines = text.split('\n') 

126 if len(text) < 256 or len(lines) < 6: 

127 msg = [f'Error reading XDIFile {filename}', 

128 'data file too small to be valid XDI'] 

129 raise ValueError('\n'.join(msg)) 

130 

131 pxdi = pointer(XDIFileStruct()) 

132 

133 xdilib = get_xdilib() 

134 self.status = xdilib.XDI_readfile(filename.encode(), pxdi) 

135 if self.status < 0: 

136 msg = bytes2str(xdilib.XDI_errorstring(self.status)) 

137 xdilib.XDI_cleanup(pxdi, self.status) 

138 msg = 'Error reading XDIFile %s\n%s' % (filename, msg) 

139 raise ValueError(msg) 

140 

141 xdi = pxdi.contents 

142 for attr in dict(xdi._fields_): 

143 setattr(self, attr, getattr(xdi, attr)) 

144 

145 self.array_labels = tostrlist(xdi.array_labels, self.narrays) 

146 

147 if self.user_labels is not None: 

148 ulab = self.user_labels.replace(',', ' ') 

149 ulabs = [l.strip() for l in ulab.split()] 

150 self.array_labels[:len(ulabs)] = ulabs 

151 

152 arr_units = tostrlist(xdi.array_units, self.narrays) 

153 self.array_units = [] 

154 self.array_addrs = [] 

155 for unit in arr_units: 

156 addr = '' 

157 if '||' in unit: 

158 unit, addr = [x.strip() for x in unit.split('||', 1)] 

159 self.array_units.append(unit) 

160 self.array_addrs.append(addr) 

161 

162 mfams = tostrlist(xdi.meta_families, self.nmetadata) 

163 mkeys = tostrlist(xdi.meta_keywords, self.nmetadata) 

164 mvals = tostrlist(xdi.meta_values, self.nmetadata) 

165 self.attrs = {} 

166 for fam, key, val in zip(mfams, mkeys, mvals): 

167 fam = fam.lower() 

168 key = key.lower() 

169 if fam not in self.attrs: 

170 self.attrs[fam] = {} 

171 self.attrs[fam][key] = val 

172 

173 parrays = (xdi.narrays*c_void_p).from_address(xdi.array)[:] 

174 self.data = [(xdi.npts*c_double).from_address(p)[:] for p in parrays] 

175 

176 nout = xdi.nouter 

177 outer, breaks = [], [] 

178 if nout > 1: 

179 outer = (nout*c_double).from_address(xdi.outer_array)[:] 

180 breaks = (nout*c_long).from_address(xdi.outer_breakpts)[:] 

181 for attr in ('outer_array', 'outer_breakpts', 'nouter'): 

182 delattr(self, attr) 

183 self.outer_array = array(outer) 

184 self.outer_breakpts = array(breaks) 

185 

186 self.data = array(self.data) 

187 self.data.shape = (self.narrays, self.npts) 

188 self._assign_arrays() 

189 for attr in ('nmetadata', 'narray_labels', 'meta_families', 

190 'meta_keywords', 'meta_values', 'array'): 

191 delattr(self, attr) 

192 xdilib.XDI_cleanup(pxdi, 0) 

193 

194 def _assign_arrays(self): 

195 """assign data arrays for principle data attributes: 

196 energy, angle, i0, itrans, ifluor, irefer, 

197 mutrans, mufluor, murefer, etc. """ 

198 

199 xunits = 'eV' 

200 xname = None 

201 ix = -1 

202 self.data = array(self.data) 

203 

204 for idx, name in enumerate(self.array_labels): 

205 dat = self.data[idx,:] 

206 setattr(self, name, dat) 

207 if name in ('energy', 'angle'): 

208 ix = idx 

209 xname = name 

210 units = self.array_units[idx] 

211 if units is not None: 

212 xunits = units 

213 

214 # convert energy to angle, or vice versa 

215 monodat = {} 

216 if 'mono' in self.attrs: 

217 monodat = self.attrs['mono'] 

218 elif 'monochromator' in self.attrs: 

219 monodat = self.attrs['monochromator'] 

220 

221 if ix >= 0 and 'd_spacing' in monodat: 

222 dspacing = monodat['d_spacing'].strip() 

223 dunits = 'Angstroms' 

224 if ' ' in dspacing: 

225 dspacing, dunits = dspacing.split(None, 1) 

226 self.dspacing = float(dspacing) 

227 self.dspacing_units = dunits 

228 

229 omega = PLANCK_HC/(2*self.dspacing) 

230 if xname == 'energy' and not hasattr(self, 'angle'): 

231 energy_ev = self.energy 

232 if xunits.lower() == 'kev': 

233 energy_ev = 1000. * energy_ev 

234 self.angle = RAD2DEG * arcsin(omega/energy_ev) 

235 elif xname == 'angle' and not hasattr(self, 'energy'): 

236 angle_rad = self.angle 

237 if xunits.lower() in ('deg', 'degrees'): 

238 angle_rad = angle_rad / RAD2DEG 

239 self.energy = omega/sin(angle_rad) 

240 

241 if hasattr(self, 'i0'): 

242 if hasattr(self, 'itrans') and not hasattr(self, 'mutrans'): 

243 self.mutrans = -log(self.itrans / (self.i0+1.e-12)) 

244 elif hasattr(self, 'mutrans') and not hasattr(self, 'itrans'): 

245 self.itrans = self.i0 * exp(-self.mutrans) 

246 if hasattr(self, 'ifluor') and not hasattr(self, 'mufluor'): 

247 self.mufluor = self.ifluor/(self.i0+1.e-12) 

248 

249 elif hasattr(self, 'mufluor') and not hasattr(self, 'ifluor'): 

250 self.ifluor = self.i0 * self.mufluor 

251 

252 if hasattr(self, 'itrans'): 

253 if hasattr(self, 'irefer') and not hasattr(self, 'murefer'): 

254 self.murefer = -log(self.irefer / (self.itrans+1.e-12)) 

255 

256 elif hasattr(self, 'murefer') and not hasattr(self, 'irefer'): 

257 self.irefer = self.itrans * exp(-self.murefer) 

258 

259 

260def read_xdi(filename, labels=None): 

261 """read an XDI File into a Group 

262 

263 Arguments: 

264 filename (str): name of file to read 

265 labels (str or None): string to use for setting array names [None] 

266 

267 Returns: 

268 Group 

269 

270 A data group containing data read from file, with XDI attributes and 

271 conventions. 

272 

273 Notes: 

274 1. See https://github.com/XraySpectrscopy/XAS-Data-Interchange 

275 

276 2. if `labels` is `None` (default), the names of the output arrays 

277 will be determined from the XDI column label in the XDI header. 

278 To override these array names, use a string with space or comma 

279 separating names for the arrays. 

280 

281 

282 Example: 

283 >>> from larch.io import xdi 

284 >>> fe3_data = read_xdi('FeXAFS_Fe2O3.001') 

285 >>> print(fe3_data.array_labels) 

286 ['energy', 'mutrans', 'i0'] 

287 

288 >>> fec3 = read_xdi('fe3c_rt.xdi', labels='e, x, y') 

289 >>> print(fec3.array_labels) 

290 ['e', 'x', 'y'] 

291 

292 See Also: 

293 read_ascii 

294 

295 """ 

296 xdif = XDIFile(filename, labels=labels) 

297 group = Group() 

298 for key, val in xdif.__dict__.items(): 

299 if not key.startswith('_'): 

300 if key in string_attrs: 

301 val = tostr(val) 

302 setattr(group, key, val) 

303 group.__name__ ='XDI file %s' % filename 

304 doc = ['%i arrays, %i npts' % (xdif.narrays, xdif.npts)] 

305 arr_labels = getattr(xdif, 'array_labels', None) 

306 if arr_labels is not None: 

307 doc.append("Array Labels: %s" % repr(arr_labels)) 

308 group.__doc__ = '\n'.join(doc) 

309 

310 group.path = filename 

311 path, fname = os.path.split(filename) 

312 group.filename = fname 

313 return group