Coverage for larch/io/specfile_reader.py: 11%

552 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3"""Utility wrapper for h5py-like API to Spec files 

4=================================================== 

5 

6This is a wrapper on top of `silx.io.open` to read Spec_ files via an HDF5-like API. 

7 

8.. _SPEC: http://www.certif.com/content/spec 

9 

10Requirements 

11------------ 

12- silx (http://www.silx.org/doc/silx/latest/modules/io/spech5.html) 

13""" 

14 

15__author__ = ["Mauro Rovezzi", "Matt Newville"] 

16__version__ = "2024.1" 

17 

18import os 

19import copy 

20import datetime 

21import six 

22import collections 

23import numpy as np 

24import h5py 

25from silx.io.utils import open as silx_open 

26from silx.io.h5py_utils import File as silx_h5py_file 

27from silx.io.convert import write_to_h5 

28 

29# from scipy.interpolate import interp1d 

30# from scipy.ndimage import map_coordinates 

31# from larch.math.utils import savitzky_golay 

32from larch import Group 

33from larch.utils.strutils import bytes2str 

34from larch.math.normalization import norm1D 

35from larch.math.deglitch import remove_spikes_medfilt1d 

36 

37#: Python 3.8+ compatibility 

38try: 

39 collectionsAbc = collections.abc 

40except Exception: 

41 collectionsAbc = collections 

42 

43# UTILITIES (the class is below!) 

44 

45 

46def _str2rng(rngstr, keeporder=True, rebin=None): 

47 """simple utility to convert a generic string representing a compact 

48 list of scans to a (sorted) list of integers 

49 

50 Parameters 

51 ---------- 

52 rngstr : string 

53 with given syntax (see Example below) 

54 keeporder : boolean [True] 

55 to keep the original order 

56 keeporder=False turn into a sorted list 

57 rebin : integer [None] 

58 force rebinning of the final range 

59 

60 Example 

61 ------- 

62 > _str2rng('100, 7:9, 130:140:5, 14, 16:18:1') 

63 > [7, 8, 9, 14, 16, 17, 18, 100, 130, 135, 140] 

64 

65 the string can also have file index prefix  

66 

67 > _str2rng('00019/100, 7:9, 130:140:5, 14, 16:18:1') 

68 > ('0019', [7, 8, 9, 14, 16, 17, 18, 100, 130, 135, 140]) 

69 

70 """ 

71 

72 try: 

73 file_idx, scan_str = rngstr.split("/") 

74 return file_idx, _str2rng(scan_str) 

75 except Exception: 

76 pass 

77 _rng = [] 

78 for _r in rngstr.split(", "): # the space is important! 

79 if len(_r.split(",")) > 1: 

80 raise NameError("Space after comma(s) is missing in '{0}'".format(_r)) 

81 _rsplit2 = _r.split(":") 

82 if len(_rsplit2) == 1: 

83 _rng.append(_r) 

84 elif len(_rsplit2) == 2 or len(_rsplit2) == 3: 

85 if len(_rsplit2) == 2: 

86 _rsplit2.append("1") 

87 if _rsplit2[0] == _rsplit2[1]: 

88 raise NameError("Wrong range '{0}' in string '{1}'".format(_r, rngstr)) 

89 if int(_rsplit2[0]) > int(_rsplit2[1]): 

90 raise NameError("Wrong range '{0}' in string '{1}'".format(_r, rngstr)) 

91 _rng.extend(range(int(_rsplit2[0]), int(_rsplit2[1]) + 1, int(_rsplit2[2]))) 

92 else: 

93 raise NameError("Too many colon in {0}".format(_r)) 

94 

95 # create the list and return it (removing the duplicates) 

96 _rngout = [int(x) for x in _rng] 

97 

98 if rebin is not None: 

99 try: 

100 _rngout = _rngout[:: int(rebin)] 

101 except Exception: 

102 raise NameError("Wrong rebin={0}".format(int(rebin))) 

103 

104 def uniquify(seq): 

105 # Order preserving uniquifier by Dave Kirby 

106 seen = set() 

107 return [x for x in seq if x not in seen and not seen.add(x)] 

108 

109 if keeporder: 

110 return uniquify(_rngout) 

111 else: 

112 return list(set(_rngout)) 

113 

114 

115def _mot2array(motor, acopy): 

116 """simple utility to generate a copy of an array containing a 

117 constant value (e.g. motor position) 

118 

119 """ 

120 a = np.ones_like(acopy) 

121 return np.multiply(a, motor) 

122 

123 

124def _make_dlist(dall, rep=1): 

125 """make a list of strings representing the scans to average 

126 

127 Parameters 

128 ---------- 

129 dall : list of all good scans 

130 rep : int, repetition 

131 

132 Returns 

133 ------- 

134 dlist : list of lists of int 

135 

136 """ 

137 dlist = [[] for d in range(rep)] 

138 for idx in range(rep): 

139 dlist[idx] = dall[idx::rep] 

140 return dlist 

141 

142 

def is_specfile(filename, require_multiple_scans=True):
    """Test whether `filename` may be a Specfile (text or HDF5).

    Parameters
    ----------
    filename : str
        path of the candidate file
    require_multiple_scans : bool [True]
        for text-based Spec files, return True only if the file contains
        more than one scan

    Returns
    -------
    bool
    """
    if not os.path.exists(filename):
        return False

    with open(filename, "rb") as fh:
        header = fh.read(10)

    #: sniff the file type from the first bytes
    looks_hdf5 = header.startswith(b"\x89HDF\r")       # HDF5 magic
    looks_partial_spec = header.startswith(b"#S ")     # partial Spec file (1 scan)
    looks_full_spec = header.startswith(b"#F ")        # full Spec file

    if not (looks_hdf5 or looks_full_spec or looks_partial_spec):
        return False
    if looks_partial_spec and require_multiple_scans:
        return False

    #: the file must actually be readable as a scan container
    try:
        scans = DataSourceSpecH5(filename)._scans
    except Exception:
        return False

    if looks_full_spec and require_multiple_scans and len(scans) < 2:
        return False
    return True

175 

176 

def update_nested(d, u):
    """Recursively merge mapping `u` into mapping `d` (in place).

    Nested mappings are merged key by key; a non-mapping value in `u`
    (or a non-mapping value already stored in `d`) simply overwrites.

    From: https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth

    Parameters
    ----------
    d : mapping
        dictionary updated in place
    u : mapping
        dictionary whose items are merged into `d`

    Returns
    -------
    d : the updated mapping (same object passed in)
    """
    #: py3: plain dict.items() replaces the obsolete six.iteritems()
    for k, v in u.items():
        dv = d.get(k, {})
        if not isinstance(dv, collections.abc.Mapping):
            d[k] = v
        elif isinstance(v, collections.abc.Mapping):
            d[k] = update_nested(dv, v)
        else:
            d[k] = v
    return d

191 

192 

193def _atoi(text): 

194 return int(text) if text.isdigit() else text 

195 

196 

def natural_keys(text):
    """Sort key for natural (human) ordering of strings with embedded numbers.

    FROM: https://stackoverflow.com/questions/5967500/how-to-correctly-sort-a-string-with-a-number-inside
    (see also http://nedbatchelder.com/blog/200712/human_sorting.html)

    Usage
    -----
    ``alist.sort(key=natural_keys)`` sorts e.g. 'something2' before
    'something12'.

    Returns
    -------
    list of str and int
        the string split on digit runs, digit runs converted to int
    """
    import re

    pieces = re.split(r"(\d+)", text)
    return [int(piece) if piece.isdigit() else piece for piece in pieces]

223 

224 

225# ================================================================== 

226# CLASS BASED ON SPECH5 (CURRENT/RECOMMENDED) 

227# ================================================================== 

class DataSourceSpecH5(object):
    """Data source utility wrapper for a Spec/BLISS file read as HDF5 object
    via silx.io.open"""

    #: substrings searched in str(self._sourcefile) to classify the source type
    _file_types = ("Spec", "HDF5")

    def __init__(self, fname=None, logger=None, urls_fmt="silx", verbose=False):
        """init with file name and default attributes

        Parameters
        ----------
        fname : str
            path string of a file that can be read by silx.io.open() [None]
        logger : logging.getLogger() instance
            [None -> larch.utils.logging.getLogger()]
        urls_fmt : str
            how the data are organized in the HDF5 container
            'silx' : default
            'spec2nexus' : as converted by spec2nexus
        verbose : bool [False]
            if True it lowers the logger level to INFO
            if 'debug', it lowers the logger level to DEBUG (for testing)
            otherwise WARNING by default
        """
        if logger is None:
            from larch.utils.logging import getLogger

            _logger_name = "DataSourceSpecH5"
            self._logger = getLogger(_logger_name, level="WARNING")
        else:
            self._logger = logger

        if verbose:
            self._logger.setLevel("INFO")

        if isinstance(verbose, str) and verbose.lower() == "debug":
            self._logger.setLevel("DEBUG")

        self._fname = fname
        self._fn = self._fname  # short name (basename), set in _init_source_file
        self._sourcefile = None
        self._sourcefile_type = None
        self._scans = None  # [[name, title, start_time], ...]
        self._scans_names = None
        self._scan_n = None  # current scan number (int)
        self._scan_str = None  # current scan name (str)

        self._scan_kws = {  # to get data from scan
            "ax_name": None,
            "to_energy": None,
            "sig_name": None,
            "mon": None,
            "deglitch": None,
            "norm": None,
        }
        self._scangroup = None  # ScanGroup

        #: HDF5 urls (relative to the scan group) for the 'silx' layout
        self._mots_url = "instrument/positioners"
        self._cnts_url = "measurement"
        self._title_url = "title"
        self._time_start_url = "start_time"
        self._time_end_url = "end_time"
        self._sample_url = "sample/name"
        self._plotcnts_url = "plotselect"
        self._scan_header_url = "instrument/specfile/scan_header"
        self._file_header_url = "instrument/specfile/file_header"
        self._urls_fmt = "silx"

        if urls_fmt == "spec2nexus":
            self._mots_url = "positioners"
            self._cnts_url = "data"
            self._title_url = "title"
            self._urls_fmt = "spec2nexus"
        elif urls_fmt != "silx":
            self._urls_fmt = None
            self._logger.error("'urls_fmt' not understood")
        self.set_group()

        if self._fname is not None:
            self._init_source_file()

    def __enter__(self):
        """enter method for with statement"""
        if h5py.is_hdf5(self._fname):
            self._sourcefile = silx_h5py_file(self._fname, mode="r")
        else:
            self._sourcefile = silx_open(self._fname)
        return self

    def __exit__(self):
        """exit method for with statement"""
        # NOTE(review): __exit__ lacks the standard (exc_type, exc_value,
        # traceback) parameters, so leaving a 'with' block raises TypeError —
        # confirm whether the context-manager protocol is actually used.
        self.close()
        return self

    def _init_source_file(self):
        """Open the source file, classify it, load the scan list and select
        the first usable scan.

        Raises
        ------
        FileNotFoundError
            if self._fname does not exist
        OSError
            if the file cannot be opened
        """
        #: source file object (h5py-like)
        if not os.path.exists(self._fname):
            _errmsg = f"{self._fname} does not exist"
            self._logger.error(_errmsg)
            raise FileNotFoundError(_errmsg)
        try:
            if h5py.is_hdf5(self._fname):
                self._sourcefile = silx_h5py_file(self._fname, mode="r")
                self._logger.debug("HDF5 open with silx.io.h5py_utils")
            else:
                self._sourcefile = silx_open(self._fname)
            for ft in self._file_types:
                if ft in str(self._sourcefile):
                    self._sourcefile_type = ft
            self._scans = self.get_scans()
            self._scans_names = [scn[0] for scn in self._scans]
            try:
                _iscn = 0
                self.set_scan(self._scans[_iscn][0])  # set the first scan at init
                #: skip scans with a single counter (no usable data)
                while len(self.get_counters()) == 1:
                    self._logger.warning(
                        f"not enough data in scan {_iscn+1} '{self.get_title()}'"
                    )
                    _iscn += 1
                    self.set_scan(self._scans[_iscn][0])
            except Exception as e:
                self._logger.error(e)
                #self.close()
        except OSError:
            _errmsg = f"cannot open {self._fname}"
            self._logger.error(_errmsg)
            raise OSError(_errmsg)
        try:
            #: keep only the basename for log messages
            self._fn = self._fname.split(os.sep)[-1]
        except Exception:
            self._logger.debug(f"cannot split {self._fname}")
            pass

    def open(self, mode="r"):
        """Open the source file object with h5py in given mode

        Raises
        ------
        ValueError
            if the file is not HDF5 (only HDF5 re-opening is supported here)
        OSError
            if the file cannot be opened
        """
        try:
            if h5py.is_hdf5(self._fname):
                self._sourcefile = silx_h5py_file(self._fname, mode)
            else:
                _errmsg = f"{self._fname} is not HDF5 file"
                self._logger.error(_errmsg)
                raise ValueError(_errmsg)
        except OSError:
            _errmsg = f"cannot open {self._fname}"
            self._logger.error(_errmsg)
            raise OSError(_errmsg)

    def close(self):
        """Close the source file"""
        self._sourcefile.close()
        self._sourcefile = None

    def get_scangroup(self, scan=None):
        """get current scan group

        Parameters
        ----------
        scan : str, int, or None
            scan address; when given, the scan is selected first via set_scan

        Raises
        ------
        AttributeError
            if no scan has been selected yet
        """
        if scan is not None:
            self.set_scan(scan)
        if self._scangroup is None:
            raise AttributeError(
                "Group/Scan not selected -> use 'self.set_scan()' first"
            )
        else:
            return self._scangroup

    def set_group(self, group_url=None):
        """Select group url

        Parameters
        ----------
        group_url : str (optional)
            hdf5 url with respect to / where scans are stored [None -> /scans]

        Returns
        -------
        none: sets attribute self._group_url
        """
        self._group_url = group_url
        if self._group_url is not None:
            self._logger.info(f"selected group {self._group_url}")

    def set_scan(self, scan, scan_idx=1, group_url=None, scan_kws=None):
        """Select a given scan

        Parameters
        ----------
        scan : int or str
            scan number or name
        scan_idx : int (optional)
            scan repetition index [1]
        group_url : str
            hdf5 url with respect to / where scans are stored [None -> /scans]
        scan_kws : None or dict
            additional keyword arguments used to get data from scan

        Returns
        -------
        none: set attributes
            self._scan_n, self._scan_str, self._scan_url, self._scangroup

        Raises
        ------
        ValueError
            if `scan` cannot be resolved to an integer scan number
        KeyError
            if the resolved scan url is not present in the source file
        """
        if scan_kws is not None:
            self._scan_kws = update_nested(self._scan_kws, scan_kws)

        #: int scan -> match the '<n>_' prefix among known scan names
        if isinstance(scan, int):
            scn = f"{scan}_"
            for slist in self._scans:
                sl0 = slist[0]
                if scn in sl0.lower():
                    self._logger.debug(f"scan '{scan}' -> '{sl0}'")
                    scan = sl0
                    break

        if scan in self._scans_names:
            self._scan_str = scan
            self._scan_n = self._scans_names.index(scan)
        else:
            #: fall back to parsing '<n>.<idx>' (silx) or plain numbers
            scan_n = scan
            if isinstance(scan, str):
                scan_split = scan.split(".")
                scan_n = scan_split[0]
                try:
                    scan_idx = scan_split[1]
                except IndexError:
                    self._logger.warning("'scan_idx' kept at 1")
                    pass
            try:
                scan_n = int(scan_n)
                scan_idx = int(scan_idx)
            except ValueError:
                _errmsg = "scan not selected, wrong 'scan' parameter!"
                self._logger.error(_errmsg)
                raise ValueError(_errmsg)
            assert isinstance(scan_n, int), "'scan_n' must be an integer"
            assert isinstance(scan_idx, int), "'scan_idx' must be an integer"
            self._scan_n = scan_n
            if self._urls_fmt == "silx":
                self._scan_str = f"{scan_n}.{scan_idx}"
            elif self._urls_fmt == "spec2nexus":
                self._scan_str = f"S{scan_n}"
            else:
                _errmsg = "wrong 'urls_fmt'"
                self._logger.error(_errmsg)
                raise ValueError(_errmsg)
        if group_url is not None:
            self.set_group(group_url)
        if self._group_url is not None:
            self._scan_url = f"{self._group_url}/{self._scan_str}"
        else:
            self._scan_url = f"{self._scan_str}"
        try:
            self._scangroup = self._sourcefile[self._scan_url]
            self._scan_title = self.get_title()
            self._scan_start = self.get_time()
            self._logger.info(
                f"selected scan '{self._scan_url}' | '{self._scan_title}' | '{self._scan_start}'"
            )
        except KeyError:
            self._scangroup = None
            self._scan_title = None
            _errmsg = f"'{self._scan_url}' is not valid"
            self._logger.error(_errmsg)
            raise KeyError(_errmsg)

    def _list_from_url(self, url_str):
        """Utility method to get a list from a scan url

        .. warning:: the list is **not ordered**

        """
        try:
            return [i for i in self.get_scangroup()[url_str].keys()]
        except Exception:
            #: NOTE(review): on failure this logs and implicitly returns None,
            #: which callers iterate over — confirm whether raising is intended.
            _errmsg = f"[{self._fn}//{self._scan_n}] '{url_str}' not found"
            self._logger.error(_errmsg)
            #raise ValueError(_errmsg)

    # ================== #
    #: READ DATA METHODS
    # ================== #

    def _repr_html_(self):
        """HTML representation for Jupyter notebook"""

        scns = self.get_scans()
        html = ["<table>"]
        html.append("<tr>")
        html.append("<td><b>Scan</b></td>")
        html.append("<td><b>Title</b></td>")
        html.append("<td><b>Start_time</b></td>")
        html.append("</tr>")
        for scn, tlt, sct in scns:
            html.append("<tr>")
            html.append(f"<td>{scn}</td>")
            html.append(f"<td>{tlt}</td>")
            html.append(f"<td>{sct}</td>")
            html.append("</tr>")
        html.append("</table>")
        return "".join(html)

    def get_scans(self):
        """Get list of scans

        Returns
        -------
        list of strings: [['scan.n', 'title', 'start_time'], ... ]
        """
        allscans = []
        for sn in self._sourcefile["/"].keys():
            try:
                sg = self._sourcefile[sn]
            except KeyError:
                continue  # broken HDF5 link
            try:
                allscans.append(
                    [
                        sn,
                        bytes2str(sg[self._title_url][()]),
                        bytes2str(sg[self._time_start_url][()]),
                    ]
                )
            except KeyError:
                self._logger.error(f"'{sn}' is a datagroup!")
                # go one level below and try take first dataset only
                dt0 = list(sg.keys())[0]
                sgg = sg[dt0]
                try:
                    scname = f"{sn}/{dt0}"
                    allscans.append(
                        [
                            scname,
                            bytes2str(sgg[self._title_url][()]),
                            bytes2str(sgg[self._time_start_url][()]),
                        ]
                    )
                except Exception:
                    self._logger.error(
                        f"{scname} does not have standard title/time URLs"
                    )
                    #: NOTE(review): a [None, None, None] row makes the
                    #: natural_keys sort below fail on row[0] — confirm.
                    allscans.append([None, None, None])

        # sort scan in natural/human order
        allscans.sort(key=lambda row: natural_keys(row[0]))

        return allscans

    def get_motors(self):
        """Get list of all available motors names"""
        return self._list_from_url(self._mots_url)

    def get_scan_motors(self):
        """Get list of motors names actually used in the scan"""
        all_motors = self._list_from_url(self._mots_url)
        counters = self._list_from_url(self._cnts_url)
        return [i for i in counters if i in all_motors]

    def get_counters(self, remove_motors=False):
        """Get list of counters names

        Parameters
        ----------
        remove_motors: bool [False]
            whether to remove counters that would also be in the motors list
        """
        counters = self._list_from_url(self._cnts_url)
        if remove_motors:
            motors = self._list_from_url(self._mots_url)
            counters = [i for i in counters if i not in motors]
        return counters

    def get_title(self):
        """Get title str for the current scan

        Returns
        -------
        title (str): scan title self._scangroup[self._title_url][()]
        """
        sg = self.get_scangroup()
        return bytes2str(sg[self._title_url][()])

    def get_time(self):
        """Get start time str for the current scan

        Returns
        -------
        start_time (str): scan start time self._scangroup[self._time_start_url][()]
        """
        sg = self.get_scangroup()
        return bytes2str(sg[self._time_start_url][()])

    def get_timestamp(self):
        """Get timestamp (float, seconds since epoch) from the current scan"""
        dt = np.datetime64(self.get_time())
        return dt.astype(datetime.datetime).timestamp()

    def get_scan_info_from_title(self):
        """Parser to get scan information from title

        Known types of scans
        --------------------
        Generic: <scan_type> <scan_axis> <start> <end> <npoints> <counting_time>
        'Escan' (ESRF BM30/BM16 Spec -> Energy)
        'Emiscan' (ESRF BM30/BM16 Spec -> Emi_Energy)
        'fscan' (ESRF ID26 Spec -> mono_energy)
        'contscan.motor' (ESRF ID24-DCM BLISS 2023-06 -> energy_enc)
        'contscan.EnergyCont' (ESRF BM16 BLISS 2023-09 -> energy_enc)
        'scans.exafs*' (ESRF BM23 BLISS 2023-06 -> energy_cenc)

        Returns
        -------
        iscn : dict of str
            {
             scan_type : "type of scan",
             scan_axis : "scanned axis",
             scan_start : "",
             scan_end : "",
             scan_pts : "",
             scan_ct : "",
             scan_info : ""
            }
        """
        iscn = dict(
            scan_type=None,
            scan_axis=None,
            scan_start=None,
            scan_end=None,
            scan_pts=None,
            scan_ct=None,
            scan_info=None,
        )

        _title = self.get_title()
        if isinstance(_title, np.ndarray):
            _title = np.char.decode(_title)[0]
        _title_splitted = [s for s in _title.split(" ") if not s == ""]
        _scntype = _title_splitted[0]
        iscn.update(dict(scan_type=_scntype))
        try:
            #: generic layout: <type> <axis> <start> <end> <npts> <ct>
            iscn.update(
                dict(
                    scan_axis=_title_splitted[1],
                    scan_start=_title_splitted[2],
                    scan_end=_title_splitted[3],
                    scan_pts=_title_splitted[4],
                    scan_ct=_title_splitted[5],
                )
            )
        except IndexError:
            try:
                #: shorter layout without an explicit axis field
                iscn.update(
                    dict(
                        scan_start=_title_splitted[1],
                        scan_end=_title_splitted[2],
                        scan_pts=_title_splitted[3],
                        scan_ct=_title_splitted[4],
                    )
                )
            except IndexError:
                pass

        # === CUSTOM SCANS -> TODO(move to NeXus)
        if _scntype == "Escan":
            iscn.update(dict(scan_axis="Energy"))
            iscn.update(dict(scan_info="ESRF/BM30-BM16 Energy scans with Spec"))
        if _scntype == "Emiscan":
            iscn.update(dict(scan_axis="Emi_Energy"))
            iscn.update(dict(scan_info="ESRF/BM30-BM16 emission scans with Spec"))
        if _scntype == "fscan":
            iscn.update(dict(scan_axis="mono_energy"))
            iscn.update(dict(scan_info="ESRF/ID26 fscan"))
        if "scans.exafs" in _scntype:
            iscn.update(dict(scan_axis="energy_cenc"))
            iscn.update(dict(scan_info="ESRF/BM23 BLISS 2023-June"))
        if _scntype == "contscan.motor":
            iscn.update(dict(scan_axis="energy_enc"))
            iscn.update(dict(scan_info="ESRF/ID24-DCM BLISS 2023-June"))
        if _scntype == "contscan.EnergyCont":
            iscn.update(dict(scan_axis="energy_enc"))
            iscn.update(dict(scan_info="ESRF/BM16 BLISS 2023-Sept"))
        if _scntype == "trigscan":
            iscn.update(dict(scan_axis="energy_enc"))
            iscn.update(dict(scan_info="ESRF/BLISS 2023-Dec"))
        return iscn

    def get_scan_axis(self):
        """Get the name of the scanned axis from scan title

        Falls back to the first counter if the parsed axis is not found
        among motors/counters.
        """
        iscn = self.get_scan_info_from_title()
        _axisout = iscn["scan_axis"]
        _mots, _cnts = self.get_motors(), self.get_counters()
        if not (_axisout in _mots):
            self._logger.debug(f"'{_axisout}' not in (real) motors")
        if not (_axisout in _cnts):
            self._logger.debug(f"'{_axisout}' not in counters")
            _axisout = _cnts[0]
            self._logger.info(f"using the first counter: '{_axisout}'")
        return _axisout

    def get_array(self, cnt=0):
        """Get array of a given counter

        Parameters
        ----------
        cnt : str or int
            counter name or index in the list of counters

        Returns
        -------
        array

        Raises
        ------
        ValueError
            if `cnt` is not among the available counters
        """
        sg = self.get_scangroup()
        cnts = self.get_counters()
        if type(cnt) is int:
            cnt = cnts[cnt]
            self._logger.info("Selected counter %s", cnt)
        if cnt in cnts:
            sel_cnt = f"{self._cnts_url}/{cnt}"
            return copy.deepcopy(sg[sel_cnt][()])
        else:
            errmsg = f"[{self._fn}//{self._scan_n}] '{cnt}' not found in available counters"
            self._logger.error(errmsg)
            raise ValueError(errmsg)

    def get_motor_position(self, mot):
        """Get motor position

        Parameters
        ----------
        mot : str or int
            motor name or index in the list of motors

        Returns
        -------
        value, or None if the motor is not found
        """
        sg = self.get_scangroup()
        mots = self.get_motors()
        if type(mot) is int:
            mot = mots[mot]
            self._logger.info(f"Selected motor '{mot}'")
        if mot in mots:
            sel_mot = f"{self._mots_url}/{mot}"
            return copy.deepcopy(sg[sel_mot][()])
        else:
            self._logger.error(f"[{self._fn}//{self._scan_n}] '{mot}' not found in available motors")
            return None

    def get_scan(self, scan=None, datatype=None):
        """Get Larch group for the current scan

        Parameters
        ----------
        scan : str, int, or None
            scan address
        datatype : str
            type of data, e.g. 'raw', 'xas'

        Returns
        -------
        larch Group with scan data
        """
        scan_group = self.get_scangroup(scan)
        scan_index = self._scan_n
        scan_name = self._scan_str
        all_labels = self.get_counters()
        motor_names = self.get_scan_motors()
        title = self.get_title()
        timestring = self.get_time()
        timestamp = self.get_timestamp()
        path, filename = os.path.split(self._fname)
        axis = self.get_scan_axis()
        #: axis first, then scanned motors, then remaining counters (no dups)
        array_labels = [axis]
        array_labels.extend([i for i in motor_names if i not in array_labels])
        array_labels.extend([i for i in all_labels if i not in array_labels])

        scan_header = list(scan_group.get(self._scan_header_url, []))
        file_header = list(scan_group.get(self._file_header_url, []))
        file_type = self._sourcefile_type
        header = []
        for scanh in scan_header:
            if scanh.startswith("#CXDI "):
                header.append(scanh[6:].strip())
        out = Group(
            __name__=f"{file_type} file: (unknown), scan: {scan_name}",
            path=path,
            filename=filename,
            datatype=datatype,
            array_labels=array_labels,
            motor_names=motor_names,
            axis=axis,
            scan_index=scan_index,
            scan_name=scan_name,
            title=title,
            header=header,
            scan_header=scan_header,
            file_header=file_header,
            timestring=timestring,
            timestamp=timestamp,
        )

        arr_axis = self.get_array(axis).astype(np.float64)
        axis_size = arr_axis.size
        data = [arr_axis]
        ptsdiffs = [0]
        pop_labels = []
        self._logger.debug(f"X array (=scan axis): `{axis}` (size: {axis_size})")
        self._logger.debug("Y arrays >>> loading all arrays in array_labels (check size match with scan axis) <<<")
        #: iterate over a slice copy, so popping from array_labels is safe
        for label in array_labels[1:]:  #: avoid loading twice arr_axis
            arr = self.get_array(label).astype(np.float64)
            ptsdiff = axis_size - arr.size
            self._logger.debug(f"`{label}` ({arr.size}) -> {abs(ptsdiff)}")
            #: drop arrays longer than the axis or >10 points shorter
            if abs(ptsdiff) > 10 or ptsdiff < 0:
                ipop = array_labels.index(label)
                pop_labels.append(array_labels.pop(ipop))
            else:
                ptsdiffs.append(ptsdiff)
                data.append(arr)
                setattr(out, label, arr)
        assert len(array_labels) == len(data) == len(ptsdiffs), "length of array_labels and data do not match"
        if len(pop_labels):
            self._logger.info(f"Y arrays >>> not loaded: `{pop_labels}` [excessive size mismatch with `{axis}`]")
        #: in case of array shape mismatch strip last points
        ptsdiff_max = max(ptsdiffs)
        if ptsdiff_max > 0:
            out_size = axis_size - ptsdiff_max
            for iarr, (lab, arr) in enumerate(zip(array_labels, data)):
                ptsdiff = axis_size - arr.size
                arr = arr[:out_size]
                data[iarr] = arr
                setattr(out, lab, arr)
            self._logger.info(f"Y arrays >>> removed {ptsdiff_max} last points")
        out.data = np.array(data)
        return out

    def get_axis_data(self, ax_name=None, to_energy=None):
        """Get data for the scan axis

        Description
        -----------
        This method returns the data (=label and array) for a given axis of the
        selected scan. It is primarily targeted to a "scanning" axis, but any
        counter can be used. It is possible to control common conversions, like
        Bragg angle to energy.

        Parameters
        ----------
        ax_name : str or None

        to_energy : dict
            Controls the conversion of the signal to energy [None]

            .. note:: Bragg angle assumed in mrad, output in eV

            {
             "bragg_ax": "str", #: name of counter used for Bragg angle
             "bragg_ax_type": "str", #: 'motor' or 'counter'
             "bragg_enc_units": float, #: units to convert encoder to deg (bragg_ax should contain 'enc')
            }

        Returns
        -------
        label, data
        """
        if (ax_name is not None) and (ax_name not in self.get_counters()):
            self._logger.error("%s not a counter", ax_name)
            return None, None
        ax_label = ax_name or self.get_scan_axis()
        ax_data = self.get_array(ax_label)
        if to_energy is not None:
            try:
                from sloth.utils.bragg import ang2kev
            except ImportError:

                def ang2kev(theta, d):
                    from larch.utils.physical_constants import PLANCK_HC

                    theta = np.deg2rad(theta)
                    wlen = 2 * d * np.sin(theta)
                    return (PLANCK_HC / wlen) / 1000.0

            bragg_ax = to_energy["bragg_ax"]
            bragg_ax_type = to_energy["bragg_ax_type"]
            bragg_d = to_energy["bragg_d"]
            if bragg_ax_type == "counter":
                bragg_deg = self.get_array(bragg_ax).mean()
            elif bragg_ax_type == "motor":
                #: NOTE(review): self.get_value is not defined on this class —
                #: presumably self.get_motor_position was intended; confirm.
                bragg_deg = self.get_value(bragg_ax)
            else:
                self._logger.error("wrong 'bragg_ax_type' (motor or counter?)")
            if "enc" in bragg_ax:
                bragg_deg = (np.abs(bragg_deg) / to_energy["bragg_enc_units"]) * 360
            ax_abs_deg = bragg_deg + np.rad2deg(ax_data) / 1000.0
            ax_abs_ev = ang2kev(ax_abs_deg, bragg_d) * 1000.0
            ax_data = ax_abs_ev
            ax_label += "_abs_ev"
            self._logger.debug("Converted axis %s", ax_label)
            xmin = ax_data.min()
            xmax = ax_data.max()
            self._logger.info("%s range: [%.3f, %.3f]", ax_label, xmin, xmax)
        return ax_label, ax_data

    def get_signal_data(self, sig_name, mon=None, deglitch=None, norm=None):
        """Get data for the signal counter

        Description
        -----------
        This method returns the data (=label and array) for a given signal of the
        selected scan. It is possible to control normalization and/or deglitching.

        Order followed in the basic processing:
        - raw data
        - divide by monitor signal (+ multiply back by average)
        - deglitch
        - norm

        Parameters
        ----------
        sig_name : str
        mon : dict
            Controls the normalization of the signal by a monitor signal [None]
            {
             "monitor": "str", #: name of counter used for normalization
             "cps": bool, #: multiply back to np.average(monitor)
            }
        deglitch : dict
            Controls :func:`larch.math.deglitch.remove_spikes_medfilt1d` [None]
        norm : dict
            Controls the normalization by given method

        Returns
        -------
        label, data
        """
        #: get raw data
        sig_data = self.get_array(sig_name)
        sig_label = sig_name
        #: (opt) divide by monitor signal + multiply back by average
        if mon is not None:
            if isinstance(mon, str):
                mon = dict(monitor=mon, cps=False)
            mon_name = mon["monitor"]
            mon_data = self.get_array(mon_name)
            sig_data /= mon_data
            sig_label += f"_mon({mon_name})"
            if mon["cps"]:
                sig_data *= np.average(mon_data)  #: put back in counts
                sig_label += "_cps"
        #: (opt) deglitch
        if deglitch is not None:
            sig_data = remove_spikes_medfilt1d(sig_data, **deglitch)
            sig_label += "_dgl"
        #: (opt) normalization
        if norm is not None:
            norm_meth = norm["method"]
            sig_data = norm1D(sig_data, norm=norm_meth, logger=self._logger)
            if norm_meth is not None:
                sig_label += f"_norm({norm_meth})"
        self._logger.info("Loaded signal: %s", sig_label)
        return sig_label, sig_data

    def get_curve(
        self,
        sig_name,
        ax_name=None,
        to_energy=None,
        mon=None,
        deglitch=None,
        norm=None,
        **kws,
    ):
        """Get XY data (=curve) for current scan

        Parameters
        ----------
        *args, **kws -> self.get_axis_data() and self.get_signal_data()

        Returns
        -------
        [ax_data, sig_data, label, attrs] : list of [array, array, str, dict]

        """
        ax_label, ax_data = self.get_axis_data(ax_name=ax_name, to_energy=to_energy)
        sig_label, sig_data = self.get_signal_data(
            sig_name, mon=mon, deglitch=deglitch, norm=norm
        )
        label = f"S{self._scan_n}_X({ax_label})_Y{sig_label}"
        attrs = dict(
            xlabel=ax_label,
            ylabel=sig_label,
            label=label,
            ax_label=ax_label,
            sig_label=sig_label,
        )
        return [ax_data, sig_data, label, attrs]

    # =================== #
    #: WRITE DATA METHODS
    # =================== #

    def write_scans_to_h5(
        self,
        scans,
        fname_out,
        scans_groups=None,
        h5path=None,
        overwrite=False,
        conf_dict=None,
    ):
        """Export a selected list of scans to HDF5 file

        .. note:: This is a simple wrapper to
            :func:`silx.io.convert.write_to_h5`

        Parameters
        ----------
        scans : str, list of ints or list of lists (str/ints)
            scan numbers to export (parsed by _str2rng)
            if a list of lists, scans_groups is required
        fname_out : str
            output file name
        scans_groups : list of strings
            groups of scans
        h5path : str (optional)
            path inside HDF5 [None -> '/']
        overwrite : boolean (optional)
            force overwrite if the file exists [False]
        conf_dict : None or dict (optional)
            configuration dictionary saved as '{hdfpath}/.config'
        """
        self._fname_out = fname_out
        self._logger.info(f"output file: {self._fname_out}")
        if os.path.isfile(self._fname_out) and os.access(self._fname_out, os.R_OK):
            self._logger.info(f"output file exists (overwrite is {overwrite})")
            _fileExists = True
        else:
            _fileExists = False

        #: out hdf5 file
        if overwrite and _fileExists:
            os.remove(self._fname_out)
        h5out = h5py.File(self._fname_out, mode="a", track_order=True)

        #: h5path
        if h5path is None:
            h5path = "/"
        else:
            h5path += "/"

        #: write group configuration dictionary, if given
        if conf_dict is not None:
            from silx.io.dictdump import dicttoh5

            _h5path = f"{h5path}.config/"
            dicttoh5(
                conf_dict,
                h5out,
                h5path=_h5path,
                create_dataset_args=dict(track_order=True),
            )
            self._logger.info(f"written dictionary: {_h5path}")

        #: write scans
        def _loop_scans(scns, group=None):
            for scn in scns:
                self.set_scan(scn)
                _scangroup = self._scangroup
                if _scangroup is None:
                    continue
                if group is not None:
                    _h5path = f"{h5path}{group}/{self._scan_str}/"
                else:
                    _h5path = f"{h5path}{self._scan_str}/"
                write_to_h5(
                    _scangroup,
                    h5out,
                    h5path=_h5path,
                    create_dataset_args=dict(track_order=True),
                )
                self._logger.info(f"written scan: {_h5path}")

        if type(scans) is list:
            assert type(scans_groups) is list, "'scans_groups' should be a list"
            assert len(scans) == len(
                scans_groups
            ), "'scans_groups' not matching 'scans'"
            for scns, group in zip(scans, scans_groups):
                _loop_scans(_str2rng(scns), group=group)
        else:
            _loop_scans(_str2rng(scans))

        #: close output file
        h5out.close()

1123 

1124 

def str2rng_larch(rngstr, keeporder=True):
    """larch equivalent of _str2rng()"""
    result = _str2rng(rngstr, keeporder=keeporder)
    return result

1128 

1129 

1130str2rng_larch.__doc__ = _str2rng.__doc__ 

1131 

1132 

def open_specfile(filename):
    """Open a Spec/BLISS file and return the :class:`DataSourceSpecH5` wrapper."""
    return DataSourceSpecH5(filename)

1135 

1136 

def read_specfile(filename, scan=None):
    """simple mapping of a Spec/BLISS file to a Larch group"""
    source = DataSourceSpecH5(filename)
    group = source.get_scan(scan)
    return group