Coverage for larch/io/xas_data_source/hdf5_utils.py: 85%

55 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1from contextlib import contextmanager 

2from typing import Iterator, Optional 

3import h5py 

4 

5H5PY_VERSION = h5py.version.version_tuple[:3] 

6H5PY_HAS_LOCKING = H5PY_VERSION >= (3, 5) 

7 

8 

9@contextmanager 

10def open(filename) -> Iterator[h5py.File]: 

11 kw = {"mode": "r"} 

12 if H5PY_HAS_LOCKING: 

13 kw["locking"] = False 

14 with h5py.File(filename, **kw) as f: 

15 yield f 

16 

17 

18def nexus_creator(filename: str) -> str: 

19 with open(filename) as nxroot: 

20 return nxroot.attrs.get("creator", "") 

21 

22 

23def nexus_instrument(filename: str) -> str: 

24 with open(filename) as nxroot: 

25 entry = find_nexus_class(nxroot, "NXentry") 

26 if entry is None: 

27 return "" 

28 

29 instrument = find_nexus_class(entry, "NXinstrument") 

30 if instrument is None: 

31 return "" 

32 

33 if "name" in instrument: 

34 return asstr(instrument["name"][()]) 

35 return "" 

36 

37 

38def nexus_source(filename: str) -> str: 

39 with open(filename) as nxroot: 

40 entry = find_nexus_class(nxroot, "NXentry") 

41 if entry is None: 

42 return "" 

43 

44 source = find_nexus_class(entry, "NXsource") 

45 if source is None: 

46 instrument = find_nexus_class(entry, "NXinstrument") 

47 if instrument is None: 

48 return "" 

49 source = find_nexus_class(instrument, "NXsource") 

50 if source is None: 

51 return "" 

52 

53 if "name" in source: 

54 return asstr(source["name"][()]) 

55 return "" 

56 

57 

58def asstr(s): 

59 if isinstance(s, bytes): 

60 return s.decode() 

61 return s 

62 

63 

64def find_nexus_class(parent: h5py.Group, nxclass: str) -> Optional[h5py.Group]: 

65 for name in parent: 

66 try: 

67 child = parent[name] 

68 except KeyError: 

69 continue # broken line 

70 if asstr(child.attrs.get("NX_class", "")) != nxclass: 

71 continue 

72 return child