Coverage for larch/io/xas_data_source/hdf5_utils.py: 85%
55 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
1from contextlib import contextmanager
2from typing import Iterator, Optional
3import h5py
# First three components of the installed h5py version tuple.
H5PY_VERSION = h5py.version.version_tuple[:3]
# True when the installed h5py is at least 3.5; ``open`` below only passes
# the ``locking`` keyword to ``h5py.File`` when this flag is set.
H5PY_HAS_LOCKING = (3, 5) <= H5PY_VERSION
@contextmanager
def open(filename) -> Iterator[h5py.File]:
    """Open ``filename`` read-only with h5py and yield the file object.

    When ``H5PY_HAS_LOCKING`` is true, ``locking=False`` is passed to
    ``h5py.File``; otherwise only ``mode="r"`` is given. The file is closed
    when the ``with`` block that uses this context manager exits.

    NOTE: within this module the name shadows the built-in ``open``; the
    ``nexus_*`` helpers call this function, not the built-in.
    """
    options = {"mode": "r", "locking": False} if H5PY_HAS_LOCKING else {"mode": "r"}
    with h5py.File(filename, **options) as h5file:
        yield h5file
def nexus_creator(filename: str) -> str:
    """Return the ``creator`` attribute stored on the root of an HDF5 file.

    Args:
        filename: Path of the file, opened read-only via this module's
            ``open`` context manager.

    Returns:
        The value of the root-level ``creator`` attribute, or ``""`` when the
        attribute is absent. A bytes value is decoded to ``str``.
    """
    with open(filename) as nxroot:
        # Attribute values can be returned as bytes; normalise through
        # ``asstr`` exactly like the other string reads in this module so the
        # declared ``str`` return type actually holds.
        return asstr(nxroot.attrs.get("creator", ""))
def nexus_instrument(filename: str) -> str:
    """Return the instrument name recorded in a NeXus file.

    Looks up the first child of the root whose ``NX_class`` is ``NXentry``,
    then the first ``NXinstrument`` child of that entry, and reads its
    ``name`` member.

    Args:
        filename: Path of the file, opened read-only via this module's
            ``open`` context manager.

    Returns:
        The ``name`` value passed through ``asstr``, or ``""`` when the
        entry, the instrument, or the ``name`` member is missing.
    """
    with open(filename) as h5root:
        nxentry = find_nexus_class(h5root, "NXentry")
        if nxentry is None:
            return ""

        nxinstrument = find_nexus_class(nxentry, "NXinstrument")
        if nxinstrument is None or "name" not in nxinstrument:
            return ""

        # ``[()]`` reads the whole value of the ``name`` member.
        return asstr(nxinstrument["name"][()])
def nexus_source(filename: str) -> str:
    """Return the source name recorded in a NeXus file.

    The ``NXsource`` group is searched first directly under the first
    ``NXentry``; if none is found there, it is searched under the entry's
    first ``NXinstrument`` group.

    Args:
        filename: Path of the file, opened read-only via this module's
            ``open`` context manager.

    Returns:
        The ``name`` value of the source passed through ``asstr``, or ``""``
        when the entry, the source, or the ``name`` member is missing.
    """
    with open(filename) as h5root:
        nxentry = find_nexus_class(h5root, "NXentry")
        if nxentry is None:
            return ""

        nxsource = find_nexus_class(nxentry, "NXsource")
        if nxsource is None:
            # Fall back to a source nested inside the instrument group.
            nxinstrument = find_nexus_class(nxentry, "NXinstrument")
            if nxinstrument is not None:
                nxsource = find_nexus_class(nxinstrument, "NXsource")

        if nxsource is None or "name" not in nxsource:
            return ""
        return asstr(nxsource["name"][()])
def asstr(s):
    """Decode ``s`` when it is a ``bytes`` instance; return anything else as is.

    Decoding uses the default arguments of ``bytes.decode``.
    """
    return s.decode() if isinstance(s, bytes) else s
def find_nexus_class(parent: h5py.Group, nxclass: str) -> Optional[h5py.Group]:
    """Return the first child of ``parent`` whose ``NX_class`` equals ``nxclass``.

    Children are visited in the iteration order of ``parent``. Only direct
    children are examined; the search does not recurse.

    Args:
        parent: Group whose members are inspected.
        nxclass: Expected value of the child's ``NX_class`` attribute.

    Returns:
        The first matching child, or ``None`` when no child matches.
    """
    for key in parent:
        try:
            candidate = parent[key]
        except KeyError:
            # The name is listed but cannot be accessed (presumably a broken
            # link); skip it instead of failing the whole search.
            continue
        # A missing attribute defaults to "" and therefore never matches.
        if asstr(candidate.attrs.get("NX_class", "")) == nxclass:
            return candidate
    return None