Coverage for larch/io/specfile_reader.py: 11%
552 statements
coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Utility wrapper for h5py-like API to Spec files
===================================================

This is a wrapper on top of `silx.io.open` to read Spec_ files via an HDF5-like API.

.. _SPEC: http://www.certif.com/content/spec

Requirements
------------
- silx (http://www.silx.org/doc/silx/latest/modules/io/spech5.html)
"""

__author__ = ["Mauro Rovezzi", "Matt Newville"]
__version__ = "2024.1"

import os
import copy
import datetime
import six
import collections
import numpy as np
import h5py
from silx.io.utils import open as silx_open
from silx.io.h5py_utils import File as silx_h5py_file
from silx.io.convert import write_to_h5

# from scipy.interpolate import interp1d
# from scipy.ndimage import map_coordinates
# from larch.math.utils import savitzky_golay
from larch import Group
from larch.utils.strutils import bytes2str
from larch.math.normalization import norm1D
from larch.math.deglitch import remove_spikes_medfilt1d

#: Python 3.8+ compatibility
try:
    collectionsAbc = collections.abc
except Exception:
    collectionsAbc = collections
# UTILITIES (the class is below!)


def _str2rng(rngstr, keeporder=True, rebin=None):
    """simple utility to convert a generic string representing a compact
    list of scans to an (optionally sorted) list of integers

    Parameters
    ----------
    rngstr : string
        with given syntax (see Example below)
    keeporder : boolean [True]
        to keep the original order
        keeporder=False returns a sorted list
    rebin : integer [None]
        force rebinning of the final range

    Example
    -------
    > _str2rng('100, 7:9, 130:140:5, 14, 16:18:1')
    > [7, 8, 9, 14, 16, 17, 18, 100, 130, 135, 140]

    the string can also have a file index prefix

    > _str2rng('00019/100, 7:9, 130:140:5, 14, 16:18:1')
    > ('00019', [7, 8, 9, 14, 16, 17, 18, 100, 130, 135, 140])

    """
    try:
        file_idx, scan_str = rngstr.split("/")
        return file_idx, _str2rng(scan_str)
    except Exception:
        pass
    _rng = []
    for _r in rngstr.split(", "):  # the space is important!
        if len(_r.split(",")) > 1:
            raise NameError("Space after comma(s) is missing in '{0}'".format(_r))
        _rsplit2 = _r.split(":")
        if len(_rsplit2) == 1:
            _rng.append(_r)
        elif len(_rsplit2) == 2 or len(_rsplit2) == 3:
            if len(_rsplit2) == 2:
                _rsplit2.append("1")
            if _rsplit2[0] == _rsplit2[1]:
                raise NameError("Wrong range '{0}' in string '{1}'".format(_r, rngstr))
            if int(_rsplit2[0]) > int(_rsplit2[1]):
                raise NameError("Wrong range '{0}' in string '{1}'".format(_r, rngstr))
            _rng.extend(range(int(_rsplit2[0]), int(_rsplit2[1]) + 1, int(_rsplit2[2])))
        else:
            raise NameError("Too many colons in {0}".format(_r))

    # create the list and return it (removing the duplicates)
    _rngout = [int(x) for x in _rng]

    if rebin is not None:
        try:
            _rngout = _rngout[:: int(rebin)]
        except Exception:
            raise NameError("Wrong rebin={0}".format(int(rebin)))

    def uniquify(seq):
        # Order preserving uniquifier by Dave Kirby
        seen = set()
        return [x for x in seq if x not in seen and not seen.add(x)]

    if keeporder:
        return uniquify(_rngout)
    else:
        return list(set(_rngout))
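
# A minimal usage sketch for _str2rng (the scan strings below are
# illustrative, not taken from a real dataset):
#
#   _str2rng("1:5")          -> [1, 2, 3, 4, 5]
#   _str2rng("1:9:2, 12")    -> [1, 3, 5, 7, 9, 12]
#   _str2rng("0019/1:3")     -> ("0019", [1, 2, 3])  # with a file index prefix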
def _mot2array(motor, acopy):
    """simple utility to generate a copy of an array containing a
    constant value (e.g. motor position)

    """
    a = np.ones_like(acopy)
    return np.multiply(a, motor)
def _make_dlist(dall, rep=1):
    """make a list of strings representing the scans to average

    Parameters
    ----------
    dall : list of all good scans
    rep : int, repetition

    Returns
    -------
    dlist : list of lists of int

    """
    dlist = [[] for d in range(rep)]
    for idx in range(rep):
        dlist[idx] = dall[idx::rep]
    return dlist
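
# Sketch of what _make_dlist returns (hypothetical scan numbers), assuming the
# scans in `dall` were acquired as `rep` interleaved repetitions:
#
#   _make_dlist([1, 2, 3, 4, 5, 6], rep=2) -> [[1, 3, 5], [2, 4, 6]]
#   _make_dlist([1, 2, 3, 4, 5, 6], rep=3) -> [[1, 4], [2, 5], [3, 6]]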
def is_specfile(filename, require_multiple_scans=True):
    """tests whether file may be a Specfile (text or HDF5)

    Parameters
    ----------
    require_multiple_scans: bool [True]
        for Text-based scans, return True only if the file contains
        multiple scans.

    """
    if not os.path.exists(filename):
        return False
    with open(filename, "rb") as fh:
        topbytes = fh.read(10)

    is_hdf5 = topbytes.startswith(b"\x89HDF\r")  # HDF5
    is_text_one = topbytes.startswith(b"#S ")  # partial Spec file (1 scan)
    is_text_full = topbytes.startswith(b"#F ")  # full Spec file

    if not (is_hdf5 or is_text_full or is_text_one) or (
        is_text_one and require_multiple_scans
    ):
        return False

    try:
        scans = DataSourceSpecH5(filename)._scans
    except Exception:
        return False

    if is_text_full and require_multiple_scans and len(scans) < 2:
        return False
    return True
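
# Hedged example of guarding file access with is_specfile (paths are
# hypothetical; DataSourceSpecH5 is defined below):
#
#   if is_specfile("data/sample_0001.dat"):
#       ds = DataSourceSpecH5("data/sample_0001.dat")
#   if is_specfile("single_scan.dat", require_multiple_scans=False):
#       pass  # also accept text files containing a single scan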
def update_nested(d, u):
    """Update a nested dictionary

    From: https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth
    """
    for k, v in six.iteritems(u):
        dv = d.get(k, {})
        if not isinstance(dv, collectionsAbc.Mapping):
            d[k] = v
        elif isinstance(v, collectionsAbc.Mapping):
            d[k] = update_nested(dv, v)
        else:
            d[k] = v
    return d
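
# Minimal sketch of update_nested with toy dictionaries (keys modeled on the
# self._scan_kws structure used below; the values are hypothetical):
#
#   d = {"mon": {"monitor": "I0", "cps": False}}
#   u = {"mon": {"cps": True}, "norm": {"method": "area"}}
#   update_nested(d, u)
#   # -> {"mon": {"monitor": "I0", "cps": True}, "norm": {"method": "area"}}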
def _atoi(text):
    return int(text) if text.isdigit() else text


def natural_keys(text):
    """
    FROM: https://stackoverflow.com/questions/5967500/how-to-correctly-sort-a-string-with-a-number-inside

    alist.sort(key=natural_keys) sorts in human order
    http://nedbatchelder.com/blog/200712/human_sorting.html
    (See Toothy's implementation in the comments)

    Usage
    -----

    alist=[
        "something1",
        "something12",
        "something17",
        "something2",
        "something25",
        "something29"]

    alist.sort(key=natural_keys)
    print(alist)

    """
    import re

    return [_atoi(c) for c in re.split(r"(\d+)", text)]
# ==================================================================
# CLASS BASED ON SPECH5 (CURRENT/RECOMMENDED)
# ==================================================================
class DataSourceSpecH5(object):
    """Data source utility wrapper for a Spec/BLISS file read as HDF5 object
    via silx.io.open"""

    _file_types = ("Spec", "HDF5")

    def __init__(self, fname=None, logger=None, urls_fmt="silx", verbose=False):
        """init with file name and default attributes

        Parameters
        ----------
        fname : str
            path string of a file that can be read by silx.io.open() [None]
        logger : logging.getLogger() instance
            [None -> larch.utils.logging.getLogger()]
        urls_fmt : str
            how the data are organized in the HDF5 container
            'silx' : default
            'spec2nexus' : as converted by spec2nexus
        verbose : bool [False]
            if True it lowers the logger level to INFO
            if 'debug', it lowers the logger level to DEBUG (for testing)
            otherwise WARNING by default
        """
        if logger is None:
            from larch.utils.logging import getLogger

            _logger_name = "DataSourceSpecH5"
            self._logger = getLogger(_logger_name, level="WARNING")
        else:
            self._logger = logger

        if verbose:
            self._logger.setLevel("INFO")

        if isinstance(verbose, str) and verbose.lower() == "debug":
            self._logger.setLevel("DEBUG")

        self._fname = fname
        self._fn = self._fname
        self._sourcefile = None
        self._sourcefile_type = None
        self._scans = None
        self._scans_names = None
        self._scan_n = None
        self._scan_str = None

        self._scan_kws = {  # to get data from scan
            "ax_name": None,
            "to_energy": None,
            "sig_name": None,
            "mon": None,
            "deglitch": None,
            "norm": None,
        }
        self._scangroup = None  # ScanGroup

        self._mots_url = "instrument/positioners"
        self._cnts_url = "measurement"
        self._title_url = "title"
        self._time_start_url = "start_time"
        self._time_end_url = "end_time"
        self._sample_url = "sample/name"
        self._plotcnts_url = "plotselect"
        self._scan_header_url = "instrument/specfile/scan_header"
        self._file_header_url = "instrument/specfile/file_header"
        self._urls_fmt = "silx"

        if urls_fmt == "spec2nexus":
            self._mots_url = "positioners"
            self._cnts_url = "data"
            self._title_url = "title"
            self._urls_fmt = "spec2nexus"
        elif urls_fmt != "silx":
            self._urls_fmt = None
            self._logger.error("'urls_fmt' not understood")
        self.set_group()

        if self._fname is not None:
            self._init_source_file()
    def __enter__(self):
        """enter method for with statement"""
        if h5py.is_hdf5(self._fname):
            self._sourcefile = silx_h5py_file(self._fname, mode="r")
        else:
            self._sourcefile = silx_open(self._fname)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """exit method for with statement"""
        #: do not return a truthy value here, otherwise exceptions raised
        #: inside the with-block would be silently suppressed
        self.close()
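
    # Hedged usage sketch for the context-manager protocol above (the file
    # name is hypothetical):
    #
    #   with DataSourceSpecH5("experiment_0001.h5") as ds:
    #       print(ds.get_scans()[:3])
    #       ds.set_scan(1)
    #       x = ds.get_array(0)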
    def _init_source_file(self):
        """init source file object"""
        #: source file object (h5py-like)
        if not os.path.exists(self._fname):
            _errmsg = f"{self._fname} does not exist"
            self._logger.error(_errmsg)
            raise FileNotFoundError(_errmsg)
        try:
            if h5py.is_hdf5(self._fname):
                self._sourcefile = silx_h5py_file(self._fname, mode="r")
                self._logger.debug("HDF5 open with silx.io.h5py_utils")
            else:
                self._sourcefile = silx_open(self._fname)
            for ft in self._file_types:
                if ft in str(self._sourcefile):
                    self._sourcefile_type = ft
            self._scans = self.get_scans()
            self._scans_names = [scn[0] for scn in self._scans]
            try:
                _iscn = 0
                self.set_scan(self._scans[_iscn][0])  # set the first scan at init
                while len(self.get_counters()) == 1:
                    self._logger.warning(
                        f"not enough data in scan {_iscn+1} '{self.get_title()}'"
                    )
                    _iscn += 1
                    self.set_scan(self._scans[_iscn][0])
            except Exception as e:
                self._logger.error(e)
                # self.close()
        except OSError:
            _errmsg = f"cannot open {self._fname}"
            self._logger.error(_errmsg)
            raise OSError(_errmsg)
        try:
            self._fn = self._fname.split(os.sep)[-1]
        except Exception:
            self._logger.debug(f"cannot split {self._fname}")
            pass
    def open(self, mode="r"):
        """Open the source file object with h5py in given mode"""
        try:
            if h5py.is_hdf5(self._fname):
                self._sourcefile = silx_h5py_file(self._fname, mode)
            else:
                _errmsg = f"{self._fname} is not an HDF5 file"
                self._logger.error(_errmsg)
                raise ValueError(_errmsg)
        except OSError:
            _errmsg = f"cannot open {self._fname}"
            self._logger.error(_errmsg)
            raise OSError(_errmsg)

    def close(self):
        """Close the source file"""
        self._sourcefile.close()
        self._sourcefile = None
    def get_scangroup(self, scan=None):
        """get current scan group

        Parameters
        ----------
        scan : str, int, or None
            scan address
        """
        if scan is not None:
            self.set_scan(scan)
        if self._scangroup is None:
            raise AttributeError(
                "Group/Scan not selected -> use 'self.set_scan()' first"
            )
        else:
            return self._scangroup

    def set_group(self, group_url=None):
        """Select group url

        Parameters
        ----------
        group_url : str (optional)
            hdf5 url with respect to / where scans are stored [None -> /scans]

        Returns
        -------
        none: sets attribute self._group_url
        """
        self._group_url = group_url
        if self._group_url is not None:
            self._logger.info(f"selected group {self._group_url}")
    def set_scan(self, scan, scan_idx=1, group_url=None, scan_kws=None):
        """Select a given scan

        Parameters
        ----------
        scan : int or str
            scan number or name
        scan_idx : int (optional)
            scan repetition index [1]
        group_url : str
            hdf5 url with respect to / where scans are stored [None -> /scans]
        scan_kws : None or dict
            additional keyword arguments used to get data from scan

        Returns
        -------
        none: set attributes
            self._scan_n, self._scan_str, self._scan_url, self._scangroup
        """
        if scan_kws is not None:
            self._scan_kws = update_nested(self._scan_kws, scan_kws)

        if isinstance(scan, int):
            scn = f"{scan}_"
            for slist in self._scans:
                sl0 = slist[0]
                if scn in sl0.lower():
                    self._logger.debug(f"scan '{scan}' -> '{sl0}'")
                    scan = sl0
                    break

        if scan in self._scans_names:
            self._scan_str = scan
            self._scan_n = self._scans_names.index(scan)
        else:
            scan_n = scan
            if isinstance(scan, str):
                scan_split = scan.split(".")
                scan_n = scan_split[0]
                try:
                    scan_idx = scan_split[1]
                except IndexError:
                    self._logger.warning("'scan_idx' kept at 1")
                    pass
            try:
                scan_n = int(scan_n)
                scan_idx = int(scan_idx)
            except ValueError:
                _errmsg = "scan not selected, wrong 'scan' parameter!"
                self._logger.error(_errmsg)
                raise ValueError(_errmsg)
            assert isinstance(scan_n, int), "'scan_n' must be an integer"
            assert isinstance(scan_idx, int), "'scan_idx' must be an integer"
            self._scan_n = scan_n
            if self._urls_fmt == "silx":
                self._scan_str = f"{scan_n}.{scan_idx}"
            elif self._urls_fmt == "spec2nexus":
                self._scan_str = f"S{scan_n}"
            else:
                _errmsg = "wrong 'urls_fmt'"
                self._logger.error(_errmsg)
                raise ValueError(_errmsg)
        if group_url is not None:
            self.set_group(group_url)
        if self._group_url is not None:
            self._scan_url = f"{self._group_url}/{self._scan_str}"
        else:
            self._scan_url = f"{self._scan_str}"
        try:
            self._scangroup = self._sourcefile[self._scan_url]
            self._scan_title = self.get_title()
            self._scan_start = self.get_time()
            self._logger.info(
                f"selected scan '{self._scan_url}' | '{self._scan_title}' | '{self._scan_start}'"
            )
        except KeyError:
            self._scangroup = None
            self._scan_title = None
            _errmsg = f"'{self._scan_url}' is not valid"
            self._logger.error(_errmsg)
            raise KeyError(_errmsg)
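
    # Hedged sketch of the accepted 'scan' forms, given an instance `ds` of
    # this class (scan identifiers and counter names are hypothetical):
    #
    #   ds.set_scan(3)          # by scan number
    #   ds.set_scan("3.2")      # by 'number.repetition' name (silx urls_fmt)
    #   ds.set_scan(3, scan_kws={"mon": {"monitor": "I0", "cps": True}})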
    def _list_from_url(self, url_str):
        """Utility method to get a list from a scan url

        .. warning:: the list is **not ordered**

        """
        try:
            return [i for i in self.get_scangroup()[url_str].keys()]
        except Exception:
            _errmsg = f"[{self._fn}//{self._scan_n}] '{url_str}' not found"
            self._logger.error(_errmsg)
            # raise ValueError(_errmsg)

    # ================== #
    #: READ DATA METHODS
    # ================== #
    def _repr_html_(self):
        """HTML representation for Jupyter notebook"""
        scns = self.get_scans()
        html = ["<table>"]
        html.append("<tr>")
        html.append("<td><b>Scan</b></td>")
        html.append("<td><b>Title</b></td>")
        html.append("<td><b>Start_time</b></td>")
        html.append("</tr>")
        for scn, tlt, sct in scns:
            html.append("<tr>")
            html.append(f"<td>{scn}</td>")
            html.append(f"<td>{tlt}</td>")
            html.append(f"<td>{sct}</td>")
            html.append("</tr>")
        html.append("</table>")
        return "".join(html)
    def get_scans(self):
        """Get list of scans

        Returns
        -------
        list of strings: [['scan.n', 'title', 'start_time'], ... ]
        """
        allscans = []
        for sn in self._sourcefile["/"].keys():
            try:
                sg = self._sourcefile[sn]
            except KeyError:
                continue  # broken HDF5 link
            try:
                allscans.append(
                    [
                        sn,
                        bytes2str(sg[self._title_url][()]),
                        bytes2str(sg[self._time_start_url][()]),
                    ]
                )
            except KeyError:
                self._logger.error(f"'{sn}' is a datagroup!")
                # go one level below and try to take the first dataset only
                dt0 = list(sg.keys())[0]
                sgg = sg[dt0]
                try:
                    scname = f"{sn}/{dt0}"
                    allscans.append(
                        [
                            scname,
                            bytes2str(sgg[self._title_url][()]),
                            bytes2str(sgg[self._time_start_url][()]),
                        ]
                    )
                except Exception:
                    self._logger.error(
                        f"{scname} does not have standard title/time URLs"
                    )
                    allscans.append([None, None, None])

        # sort scans in natural/human order
        allscans.sort(key=lambda row: natural_keys(row[0]))

        return allscans
    def get_motors(self):
        """Get list of all available motor names"""
        return self._list_from_url(self._mots_url)

    def get_scan_motors(self):
        """Get list of motor names actually used in the scan"""
        all_motors = self._list_from_url(self._mots_url)
        counters = self._list_from_url(self._cnts_url)
        return [i for i in counters if i in all_motors]

    def get_counters(self, remove_motors=False):
        """Get list of counter names

        Parameters
        ----------
        remove_motors: bool [False]
            whether to remove counters that would also be in the motors list
        """
        counters = self._list_from_url(self._cnts_url)
        if remove_motors:
            motors = self._list_from_url(self._mots_url)
            counters = [i for i in counters if i not in motors]
        return counters
    def get_title(self):
        """Get title str for the current scan

        Returns
        -------
        title (str): scan title self._scangroup[self._title_url][()]
        """
        sg = self.get_scangroup()
        return bytes2str(sg[self._title_url][()])

    def get_time(self):
        """Get start time str for the current scan

        Returns
        -------
        start_time (str): scan start time self._scangroup[self._time_start_url][()]
        """
        sg = self.get_scangroup()
        return bytes2str(sg[self._time_start_url][()])

    def get_timestamp(self):
        """Get timestamp from the current scan"""
        dt = np.datetime64(self.get_time())
        return dt.astype(datetime.datetime).timestamp()
    def get_scan_info_from_title(self):
        """Parser to get scan information from title

        Known types of scans
        --------------------
        Generic: <scan_type> <scan_axis> <start> <end> <npoints> <counting_time>
        'Escan' (ESRF BM30/BM16 Spec -> Energy)
        'Emiscan' (ESRF BM30/BM16 Spec -> Emi_Energy)
        'fscan' (ESRF ID26 Spec -> mono_energy)
        'contscan.motor' (ESRF ID24-DCM BLISS 2023-06 -> energy_enc)
        'contscan.EnergyCont' (ESRF BM16 BLISS 2023-09 -> energy_enc)
        'scans.exafs*' (ESRF BM23 BLISS 2023-06 -> energy_cenc)

        Returns
        -------
        iscn : dict of str
            {
                scan_type : "type of scan",
                scan_axis : "scanned axis",
                scan_start : "",
                scan_end : "",
                scan_pts : "",
                scan_ct : "",
                scan_info : ""
            }
        """
        iscn = dict(
            scan_type=None,
            scan_axis=None,
            scan_start=None,
            scan_end=None,
            scan_pts=None,
            scan_ct=None,
            scan_info=None,
        )

        _title = self.get_title()
        if isinstance(_title, np.ndarray):
            _title = np.char.decode(_title)[0]
        _title_splitted = [s for s in _title.split(" ") if not s == ""]
        _scntype = _title_splitted[0]
        iscn.update(dict(scan_type=_scntype))
        try:
            iscn.update(
                dict(
                    scan_axis=_title_splitted[1],
                    scan_start=_title_splitted[2],
                    scan_end=_title_splitted[3],
                    scan_pts=_title_splitted[4],
                    scan_ct=_title_splitted[5],
                )
            )
        except IndexError:
            try:
                iscn.update(
                    dict(
                        scan_start=_title_splitted[1],
                        scan_end=_title_splitted[2],
                        scan_pts=_title_splitted[3],
                        scan_ct=_title_splitted[4],
                    )
                )
            except IndexError:
                pass

        # === CUSTOM SCANS -> TODO(move to NeXus)
        if _scntype == "Escan":
            iscn.update(dict(scan_axis="Energy"))
            iscn.update(dict(scan_info="ESRF/BM30-BM16 Energy scans with Spec"))
        if _scntype == "Emiscan":
            iscn.update(dict(scan_axis="Emi_Energy"))
            iscn.update(dict(scan_info="ESRF/BM30-BM16 emission scans with Spec"))
        if _scntype == "fscan":
            iscn.update(dict(scan_axis="mono_energy"))
            iscn.update(dict(scan_info="ESRF/ID26 fscan"))
        if "scans.exafs" in _scntype:
            iscn.update(dict(scan_axis="energy_cenc"))
            iscn.update(dict(scan_info="ESRF/BM23 BLISS 2023-June"))
        if _scntype == "contscan.motor":
            iscn.update(dict(scan_axis="energy_enc"))
            iscn.update(dict(scan_info="ESRF/ID24-DCM BLISS 2023-June"))
        if _scntype == "contscan.EnergyCont":
            iscn.update(dict(scan_axis="energy_enc"))
            iscn.update(dict(scan_info="ESRF/BM16 BLISS 2023-Sept"))
        if _scntype == "trigscan":
            iscn.update(dict(scan_axis="energy_enc"))
            iscn.update(dict(scan_info="ESRF/BLISS 2023-Dec"))
        return iscn
    def get_scan_axis(self):
        """Get the name of the scanned axis from scan title"""
        iscn = self.get_scan_info_from_title()
        _axisout = iscn["scan_axis"]
        _mots, _cnts = self.get_motors(), self.get_counters()
        if not (_axisout in _mots):
            self._logger.debug(f"'{_axisout}' not in (real) motors")
        if not (_axisout in _cnts):
            self._logger.debug(f"'{_axisout}' not in counters")
            _axisout = _cnts[0]
            self._logger.info(f"using the first counter: '{_axisout}'")
        return _axisout
    def get_array(self, cnt=0):
        """Get array of a given counter

        Parameters
        ----------
        cnt : str or int
            counter name or index in the list of counters

        Returns
        -------
        array
        """
        sg = self.get_scangroup()
        cnts = self.get_counters()
        if type(cnt) is int:
            cnt = cnts[cnt]
            self._logger.info("Selected counter %s", cnt)
        if cnt in cnts:
            sel_cnt = f"{self._cnts_url}/{cnt}"
            return copy.deepcopy(sg[sel_cnt][()])
        else:
            errmsg = f"[{self._fn}//{self._scan_n}] '{cnt}' not found in available counters"
            self._logger.error(errmsg)
            raise ValueError(errmsg)
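
    # Hedged example, given an instance `ds` (counter names depend on the file
    # and are hypothetical here):
    #
    #   mu = ds.get_array("mu_trans")   # by counter name
    #   x = ds.get_array(0)             # or by index in get_counters()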
    def get_motor_position(self, mot):
        """Get motor position

        Parameters
        ----------
        mot : str or int
            motor name or index in the list of motors

        Returns
        -------
        value
        """
        sg = self.get_scangroup()
        mots = self.get_motors()
        if type(mot) is int:
            mot = mots[mot]
            self._logger.info(f"Selected motor '{mot}'")
        if mot in mots:
            sel_mot = f"{self._mots_url}/{mot}"
            return copy.deepcopy(sg[sel_mot][()])
        else:
            self._logger.error(
                f"[{self._fn}//{self._scan_n}] '{mot}' not found in available motors"
            )
            return None
    def get_scan(self, scan=None, datatype=None):
        """Get Larch group for the current scan

        Parameters
        ----------
        scan : str, int, or None
            scan address
        datatype : str
            type of data, e.g. 'raw', 'xas'

        Returns
        -------
        larch Group with scan data
        """
        scan_group = self.get_scangroup(scan)
        scan_index = self._scan_n
        scan_name = self._scan_str
        all_labels = self.get_counters()
        motor_names = self.get_scan_motors()
        title = self.get_title()
        timestring = self.get_time()
        timestamp = self.get_timestamp()
        path, filename = os.path.split(self._fname)
        axis = self.get_scan_axis()
        array_labels = [axis]
        array_labels.extend([i for i in motor_names if i not in array_labels])
        array_labels.extend([i for i in all_labels if i not in array_labels])

        scan_header = list(scan_group.get(self._scan_header_url, []))
        file_header = list(scan_group.get(self._file_header_url, []))
        file_type = self._sourcefile_type
        header = []
        for scanh in scan_header:
            if scanh.startswith("#CXDI "):
                header.append(scanh[6:].strip())
        out = Group(
            __name__=f"{file_type} file: {filename}, scan: {scan_name}",
            path=path,
            filename=filename,
            datatype=datatype,
            array_labels=array_labels,
            motor_names=motor_names,
            axis=axis,
            scan_index=scan_index,
            scan_name=scan_name,
            title=title,
            header=header,
            scan_header=scan_header,
            file_header=file_header,
            timestring=timestring,
            timestamp=timestamp,
        )

        arr_axis = self.get_array(axis).astype(np.float64)
        axis_size = arr_axis.size
        data = [arr_axis]
        ptsdiffs = [0]
        pop_labels = []
        self._logger.debug(f"X array (=scan axis): `{axis}` (size: {axis_size})")
        self._logger.debug("Y arrays >>> loading all arrays in array_labels (check size match with scan axis) <<<")
        for label in array_labels[1:]:  #: avoid loading arr_axis twice
            arr = self.get_array(label).astype(np.float64)
            ptsdiff = axis_size - arr.size
            self._logger.debug(f"`{label}` ({arr.size}) -> {abs(ptsdiff)}")
            if abs(ptsdiff) > 10 or ptsdiff < 0:
                ipop = array_labels.index(label)
                pop_labels.append(array_labels.pop(ipop))
            else:
                ptsdiffs.append(ptsdiff)
                data.append(arr)
                setattr(out, label, arr)
        assert len(array_labels) == len(data) == len(ptsdiffs), "length of array_labels and data do not match"
        if len(pop_labels):
            self._logger.info(f"Y arrays >>> not loaded: `{pop_labels}` [excessive size mismatch with `{axis}`]")
        #: in case of array shape mismatch strip last points
        ptsdiff_max = max(ptsdiffs)
        if ptsdiff_max > 0:
            out_size = axis_size - ptsdiff_max
            for iarr, (lab, arr) in enumerate(zip(array_labels, data)):
                ptsdiff = axis_size - arr.size
                arr = arr[:out_size]
                data[iarr] = arr
                setattr(out, lab, arr)
            self._logger.info(f"Y arrays >>> removed {ptsdiff_max} last points")
        out.data = np.array(data)
        return out
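
    # Hedged sketch of turning a scan into a Larch group, given an instance
    # `ds` (the scan id is hypothetical):
    #
    #   g = ds.get_scan(3, datatype="raw")
    #   print(g.array_labels, g.axis)
    #   print(g.data.shape)   # first row of g.data is the scan axis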
    def get_axis_data(self, ax_name=None, to_energy=None):
        """Get data for the scan axis

        Description
        -----------
        This method returns the data (=label and array) for a given axis of the
        selected scan. It is primarily targeted to a "scanning" axis, but any
        counter can be used. It is possible to control common conversions, like
        Bragg angle to energy.

        Parameters
        ----------
        ax_name : str or None

        to_energy : dict
            Controls the conversion of the signal to energy [None]

            .. note:: Bragg angle assumed in mrad, output in eV

            {
                "bragg_ax": "str", #: name of counter used for Bragg angle
                "bragg_ax_type": "str", #: 'motor' or 'counter'
                "bragg_enc_units": float, #: units to convert encoder to deg (bragg_ax should contain 'enc')
                "bragg_d": float, #: crystal d-spacing used for the angle-to-energy conversion
            }

        Returns
        -------
        label, data
        """
        if (ax_name is not None) and (ax_name not in self.get_counters()):
            self._logger.error("%s not a counter", ax_name)
            return None, None
        ax_label = ax_name or self.get_scan_axis()
        ax_data = self.get_array(ax_label)
        if to_energy is not None:
            try:
                from sloth.utils.bragg import ang2kev
            except ImportError:

                def ang2kev(theta, d):
                    from larch.utils.physical_constants import PLANCK_HC

                    theta = np.deg2rad(theta)
                    wlen = 2 * d * np.sin(theta)
                    return (PLANCK_HC / wlen) / 1000.0

            bragg_ax = to_energy["bragg_ax"]
            bragg_ax_type = to_energy["bragg_ax_type"]
            bragg_d = to_energy["bragg_d"]
            if bragg_ax_type == "counter":
                bragg_deg = self.get_array(bragg_ax).mean()
            elif bragg_ax_type == "motor":
                bragg_deg = self.get_motor_position(bragg_ax)
            else:
                self._logger.error("wrong 'bragg_ax_type' (motor or counter?)")
            if "enc" in bragg_ax:
                bragg_deg = (np.abs(bragg_deg) / to_energy["bragg_enc_units"]) * 360
            ax_abs_deg = bragg_deg + np.rad2deg(ax_data) / 1000.0
            ax_abs_ev = ang2kev(ax_abs_deg, bragg_d) * 1000.0
            ax_data = ax_abs_ev
            ax_label += "_abs_ev"
            self._logger.debug("Converted axis %s", ax_label)
        xmin = ax_data.min()
        xmax = ax_data.max()
        self._logger.info("%s range: [%.3f, %.3f]", ax_label, xmin, xmax)
        return ax_label, ax_data
    def get_signal_data(self, sig_name, mon=None, deglitch=None, norm=None):
        """Get data for the signal counter

        Description
        -----------
        This method returns the data (=label and array) for a given signal of the
        selected scan. It is possible to control normalization and/or deglitching.

        Order followed in the basic processing:
        - raw data
        - divide by monitor signal (+ multiply back by average)
        - deglitch
        - norm

        Parameters
        ----------
        sig_name : str
        mon : dict
            Controls the normalization of the signal by a monitor signal [None]
            {
                "monitor": "str", #: name of counter used for normalization
                "cps": bool, #: multiply back to np.average(monitor)
            }
        deglitch : dict
            Controls :func:`larch.math.deglitch.remove_spikes_medfilt1d` [None]
        norm : dict
            Controls the normalization by given method

        Returns
        -------
        label, data
        """
        #: get raw data
        sig_data = self.get_array(sig_name)
        sig_label = sig_name
        #: (opt) divide by monitor signal + multiply back by average
        if mon is not None:
            if isinstance(mon, str):
                mon = dict(monitor=mon, cps=False)
            mon_name = mon["monitor"]
            mon_data = self.get_array(mon_name)
            sig_data /= mon_data
            sig_label += f"_mon({mon_name})"
            if mon["cps"]:
                sig_data *= np.average(mon_data)  #: put back in counts
                sig_label += "_cps"
        #: (opt) deglitch
        if deglitch is not None:
            sig_data = remove_spikes_medfilt1d(sig_data, **deglitch)
            sig_label += "_dgl"
        #: (opt) normalization
        if norm is not None:
            norm_meth = norm["method"]
            sig_data = norm1D(sig_data, norm=norm_meth, logger=self._logger)
            if norm_meth is not None:
                sig_label += f"_norm({norm_meth})"
        self._logger.info("Loaded signal: %s", sig_label)
        return sig_label, sig_data
    def get_curve(
        self,
        sig_name,
        ax_name=None,
        to_energy=None,
        mon=None,
        deglitch=None,
        norm=None,
        **kws,
    ):
        """Get XY data (=curve) for current scan

        Parameters
        ----------
        *args, **kws -> self.get_axis_data() and self.get_signal_data()

        Returns
        -------
        [ax_data, sig_data, label, attrs] : list of [array, array, str, dict]

        """
        ax_label, ax_data = self.get_axis_data(ax_name=ax_name, to_energy=to_energy)
        sig_label, sig_data = self.get_signal_data(
            sig_name, mon=mon, deglitch=deglitch, norm=norm
        )
        label = f"S{self._scan_n}_X({ax_label})_Y{sig_label}"
        attrs = dict(
            xlabel=ax_label,
            ylabel=sig_label,
            label=label,
            ax_label=ax_label,
            sig_label=sig_label,
        )
        return [ax_data, sig_data, label, attrs]
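
    # Hedged example of pulling a monitor-normalized curve, given an instance
    # `ds` (the counter names "fluo" and "I0" are hypothetical):
    #
    #   x, y, label, attrs = ds.get_curve("fluo", mon={"monitor": "I0", "cps": True})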
    # =================== #
    #: WRITE DATA METHODS
    # =================== #

    def write_scans_to_h5(
        self,
        scans,
        fname_out,
        scans_groups=None,
        h5path=None,
        overwrite=False,
        conf_dict=None,
    ):
        """Export a selected list of scans to HDF5 file

        .. note:: This is a simple wrapper to
            :func:`silx.io.convert.write_to_h5`

        Parameters
        ----------
        scans : str, list of ints or list of lists (str/ints)
            scan numbers to export (parsed by _str2rng)
            if a list of lists, scans_groups is required
        fname_out : str
            output file name
        scans_groups : list of strings
            groups of scans
        h5path : str (optional)
            path inside HDF5 [None -> '/']
        overwrite : boolean (optional)
            force overwrite if the file exists [False]
        conf_dict : None or dict (optional)
            configuration dictionary saved as '{hdfpath}/.config'
        """
        self._fname_out = fname_out
        self._logger.info(f"output file: {self._fname_out}")
        if os.path.isfile(self._fname_out) and os.access(self._fname_out, os.R_OK):
            self._logger.info(f"output file exists (overwrite is {overwrite})")
            _fileExists = True
        else:
            _fileExists = False

        #: out hdf5 file
        if overwrite and _fileExists:
            os.remove(self._fname_out)
        h5out = h5py.File(self._fname_out, mode="a", track_order=True)

        #: h5path
        if h5path is None:
            h5path = "/"
        else:
            h5path += "/"

        #: write group configuration dictionary, if given
        if conf_dict is not None:
            from silx.io.dictdump import dicttoh5

            _h5path = f"{h5path}.config/"
            dicttoh5(
                conf_dict,
                h5out,
                h5path=_h5path,
                create_dataset_args=dict(track_order=True),
            )
            self._logger.info(f"written dictionary: {_h5path}")

        #: write scans
        def _loop_scans(scns, group=None):
            for scn in scns:
                self.set_scan(scn)
                _scangroup = self._scangroup
                if _scangroup is None:
                    continue
                if group is not None:
                    _h5path = f"{h5path}{group}/{self._scan_str}/"
                else:
                    _h5path = f"{h5path}{self._scan_str}/"
                write_to_h5(
                    _scangroup,
                    h5out,
                    h5path=_h5path,
                    create_dataset_args=dict(track_order=True),
                )
                self._logger.info(f"written scan: {_h5path}")

        if type(scans) is list:
            assert type(scans_groups) is list, "'scans_groups' should be a list"
            assert len(scans) == len(
                scans_groups
            ), "'scans_groups' not matching 'scans'"
            for scns, group in zip(scans, scans_groups):
                _loop_scans(_str2rng(scns), group=group)
        else:
            _loop_scans(_str2rng(scans))

        #: close output file
        h5out.close()
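
    # Hedged sketch of exporting scans to HDF5, given an instance `ds` (file
    # names, scan ranges and group names are hypothetical):
    #
    #   ds.write_scans_to_h5("1:10, 12", "export.h5", overwrite=True)
    #   ds.write_scans_to_h5(["1:5", "6:10"], "export_groups.h5",
    #                        scans_groups=["sampleA", "sampleB"])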
def str2rng_larch(rngstr, keeporder=True):
    """larch equivalent of _str2rng()"""
    return _str2rng(rngstr, keeporder=keeporder)


str2rng_larch.__doc__ = _str2rng.__doc__


def open_specfile(filename):
    """open a Spec/BLISS file and return a DataSourceSpecH5 object"""
    return DataSourceSpecH5(filename)


def read_specfile(filename, scan=None):
    """simple mapping of a Spec/BLISS file to a Larch group"""
    df = DataSourceSpecH5(filename)
    return df.get_scan(scan)
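
# Hedged end-to-end sketch (the file name and scan number are hypothetical):
#
#   from larch.io.specfile_reader import open_specfile, read_specfile
#   ds = open_specfile("experiment_0001.h5")
#   print(ds.get_scans()[:5])
#   g = read_specfile("experiment_0001.h5", scan=1)
#   print(g.array_labels)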