Coverage for larch/utils/__init__.py: 35%
129 statements

#!/usr/bin/env python
import sys
from traceback import format_tb
import time
from datetime import datetime
from gzip import GzipFile
import io
import copy
import json
import logging

from charset_normalizer import from_bytes
from .gformat import gformat, getfloat_attr
from .paths import uname, bindir, unixpath, get_homedir, path_split, get_cwd
from .debugtimer import debugtimer

from .strutils import (fixName, isValidName, isNumber, bytes2str, str2bytes,
                       fix_filename, fix_varname, strip_quotes, isLiteralStr,
                       strip_comments, asfloat, find_delims, version_ge,
                       unique_name, get_sessionid, strict_ascii)

from .shellutils import (_more, _parent, ls, cd, cwd, mkdir)

logging.basicConfig(format='%(levelname)s [%(asctime)s]: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S', level=logging.WARNING)

def format_exception(with_traceback=True):
    """return exception message as list of strings,
    optionally including traceback
    """
    etype, exc, tb = sys.exc_info()
    out = []
    if with_traceback:
        out = ["Traceback (most recent call last):"]
        for tline in format_tb(tb):
            if tline.endswith('\n'):
                tline = tline[:-1]
            out.append(tline)
        out.append('\n')
    out.append(f"{etype.__name__}: {exc}")
    return out
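
# Illustrative sketch (not part of the module): format_exception() reads
# sys.exc_info(), so it is meant to be called from inside an ``except`` block.
# A hypothetical use might look like:
#
#     try:
#         1 / 0
#     except ZeroDivisionError:
#         for line in format_exception(with_traceback=True):
#             print(line)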

def write_log(msg, level='debug'):
    """write a message to the root logger at the given level
    (default 'debug'); `level` may be a name or a logging constant"""
    f = logging.debug
    if level in ('warn', 'warning', logging.WARNING):
        f = logging.warning
    elif level in ('info', logging.INFO):
        f = logging.info
    elif level in ('error', logging.ERROR):
        f = logging.error
    elif level in ('critical', logging.CRITICAL):
        f = logging.critical
    return f(msg)
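
# Illustrative sketch (not part of the module): write_log() dispatches to the
# root logger at the requested level, accepting either a string name or the
# corresponding logging constant. Note that with the WARNING level configured
# above, 'debug' and 'info' messages are filtered out by default.
#
#     write_log("bad input value", level=logging.ERROR)   # -> logging.error
#     write_log("starting scan", level='info')            # -> logging.info
#     write_log("detail message")                          # default -> logging.debug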

def log_warning(msg):
    return logging.warning(msg)

def log_debug(msg):
    return logging.debug(msg)

def log_info(msg):
    return logging.info(msg)

def log_error(msg):
    return logging.error(msg)

def log_critical(msg):
    return logging.critical(msg)

def is_gzip(filename):
    "is a file gzipped?"
    with open(unixpath(filename), 'rb') as fh:
        return fh.read(3) == b'\x1f\x8b\x08'
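
# Illustrative sketch (not part of the module): is_gzip() checks only the
# three-byte gzip magic number at the start of the file. The filename below
# is made up:
#
#     if is_gzip('scan001.dat.gz'):
#         print("compressed data file")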

def read_textfile(filename, size=None):
    """read text from a file as string

    Arguments
    ---------
    filename (str or file): name of file to read or file-like object
    size (int or None): number of bytes to read

    Returns
    -------
    text of file as string.

    Notes
    -----
    1. the encoding is detected with charset_normalizer.from_bytes,
       which is then used to decode bytes read from the file.
    2. line endings are normalized to '\n', so that
       splitting on '\n' will give a list of lines.
    3. if filename is given, it can be a gzip-compressed file
    """
    text = ''

    def decode(bytedata):
        return str(from_bytes(bytedata).best())

    if isinstance(filename, io.IOBase):
        text = filename.read(size)
        if filename.mode == 'rb':
            text = decode(text)
    else:
        fopen = GzipFile if is_gzip(filename) else open
        with fopen(unixpath(filename), 'rb') as fh:
            text = decode(fh.read(size))
    return text.replace('\r\n', '\n').replace('\r', '\n')
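
# Illustrative sketch (not part of the module): read_textfile() accepts either
# a filename (plain or gzip-compressed) or an already-open file object, and
# always returns text with '\n' line endings. The filenames below are made up:
#
#     text = read_textfile('scan001.dat')        # plain text file
#     text = read_textfile('scan001.dat.gz')     # transparently gunzipped
#     lines = read_textfile('scan001.dat').split('\n')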

def group2dict(group, _larch=None):
    "return dictionary of group members"
    return group.__dict__

def dict2group(d, _larch=None):
    "return group created from a dictionary"
    from larch import Group
    return Group(**d)
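
# Illustrative sketch (not part of the module): group2dict() and dict2group()
# are near-inverses, converting between a larch Group and a plain dict (the
# dict may also contain internal Group attributes):
#
#     from larch import Group
#     g = Group(energy=[7100, 7110], label='Fe K edge')
#     d = group2dict(g)      # {'energy': [7100, 7110], 'label': 'Fe K edge', ...}
#     g2 = dict2group(d)     # a new Group with the same attributes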

def copy_group(group, _larch=None):
    "return a deep copy of a group and its attributes"
    from larch import Group
    out = Group(datatype=getattr(group, 'datatype', 'unknown'),
                copied_from=getattr(group, 'groupname', repr(group)))
    for attr in dir(group):
        try:
            setattr(out, attr, copy.deepcopy(getattr(group, attr)))
        except Exception:
            print(f"cannot copy attribute {attr} from {group}")
    return out

def copy_xafs_group(group, _larch=None):
    """specialized group copy for XAFS data groups"""
    from larch import Group
    out = Group(datatype=getattr(group, 'datatype', 'unknown'),
                copied_from=getattr(group, 'groupname', repr(group)))

    for attr in dir(group):
        if attr not in ('norm', 'flat', 'deriv', 'deconv',
                        'post_edge', 'pre_edge', 'norm_mback',
                        'norm_vict', 'norm_poly'):
            try:
                val = copy.deepcopy(getattr(group, attr))
                setattr(out, attr, val)
            except ValueError:
                pass
    return out
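
# Illustrative sketch (not part of the module): copy_group() deep-copies every
# attribute of a group, while copy_xafs_group() skips derived normalization
# arrays ('norm', 'flat', 'deriv', ...) so they can be recalculated for the
# copy. 'dgroup' below stands in for an existing data group:
#
#     backup = copy_group(dgroup)
#     fresh = copy_xafs_group(dgroup)    # re-run normalization on this copy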

def isotime(t=None, with_tzone=False, filename=False):
    """return timestamp for time t (default: now) as
    'YYYY-mm-dd HH:MM:SS', optionally with a timezone offset
    or made safe for use in a filename"""
    if t is None:
        t = time.time()
    sout = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t))
    if with_tzone:
        sout = "%s-%2.2i:00" % (sout, time.timezone/3600)
    if filename:
        sout = sout.replace(' ', '_').replace(':', '')
    return sout
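
# Illustrative sketch (not part of the module): example isotime() output
# (values depend on the local clock):
#
#     isotime()                  # e.g. '2024-10-16 21:04:12'
#     isotime(filename=True)     # e.g. '2024-10-16_210412'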

def time_ago(timestamp, precision=2):
    """return a human-readable 'time ago' string for a timestamp.

    The output gives days, hours, minutes, and seconds, as in
        '52 days, 1 hour'

    The `precision` argument gives the number of time units to
    show, defaulting to 2, as in 'N days, H hours' or
    'H hours, M minutes'.
    """
    def format(x, unit):
        return "%d %s%s" % (x, unit, "s" if x > 1 else "")

    tdiff = datetime.now() - datetime.fromtimestamp(timestamp)
    days = tdiff.days
    hours = tdiff.seconds//3600
    minutes = tdiff.seconds%3600//60
    seconds = tdiff.seconds%3600%60

    out = []
    if days > 0:
        out.append(format(days, "day"))
    if hours > 0:
        out.append(format(hours, "hour"))
    if minutes > 0:
        out.append(format(minutes, "minute"))
    out.append(format(seconds, "second"))
    return ", ".join(out[:precision])
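
# Illustrative sketch (not part of the module): time_ago() reports the elapsed
# time since a Unix timestamp, truncated to `precision` units:
#
#     import time
#     time_ago(time.time() - 90061)              # -> '1 day, 1 hour'
#     time_ago(time.time() - 125, precision=1)   # -> '2 minutes'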

def json_dump(data, filename):
    """
    dump object or group to file using json
    """
    from .jsonutils import encode4js
    with open(unixpath(filename), 'w') as fh:
        fh.write(json.dumps(encode4js(data)))
        fh.write('\n')

def json_load(filename):
    """
    load object from json dump file
    """
    from .jsonutils import decode4js
    with open(unixpath(filename), 'rb') as fh:
        data = fh.read().decode('utf-8')
    return decode4js(json.loads(data))
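
# Illustrative sketch (not part of the module): json_dump() and json_load()
# round-trip larch objects through the encoders in .jsonutils. The filename
# below is made up:
#
#     json_dump(my_group, 'session_backup.json')
#     restored = json_load('session_backup.json')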

def _larch_init(_larch):
    """initialize display colors for the larch symbol table"""
    from ..symboltable import Group
    _larch.symtable._sys.display = Group(use_color=True,
                                         colors=dict(text={'color': 'black'},
                                                     text2={'color': 'blue'},
                                                     error={'color': 'red'}))

_larch_builtins = dict(copy=copy.copy, deepcopy=copy.deepcopy, more=_more,
                       parent=_parent, ls=ls, mkdir=mkdir, cd=cd,
                       cwd=cwd, group2dict=group2dict,
                       copy_group=copy_group, copy_xafs_group=copy_xafs_group,
                       dict2group=dict2group, debugtimer=debugtimer,
                       isotime=isotime, json_dump=json_dump,
                       json_load=json_load, gformat=gformat,
                       fix_filename=fix_filename,
                       fix_varname=fix_varname,
                       strip_quotes=strip_quotes)