Coverage for larch/utils/__init__.py: 35%

129 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1#!/usr/bin/env python 

2import sys 

3from traceback import format_tb 

4import time 

5from datetime import datetime 

6from gzip import GzipFile 

7import io 

8import copy 

9import json 

10import logging 

11 

12from charset_normalizer import from_bytes 

13from .gformat import gformat, getfloat_attr 

14from .paths import uname, bindir, unixpath, get_homedir, path_split, get_cwd 

15from .debugtimer import debugtimer 

16 

17from .strutils import (fixName, isValidName, isNumber, bytes2str, str2bytes, 

18 fix_filename, fix_varname, strip_quotes, isLiteralStr, 

19 strip_comments, asfloat, find_delims, version_ge, 

20 unique_name, get_sessionid, strict_ascii) 

21 

22from .shellutils import (_more, _parent, ls, cd, cwd, mkdir) 

23 

24logging.basicConfig(format='%(levelname)s [%(asctime)s]: %(message)s', 

25 datefmt='%Y-%m-%d %H:%M:%S', level=logging.WARNING) 

26 

27def format_exception(with_traceback=True): 

28 """return exception message as list of strings, 

29 optionally including traceback 

30 """ 

31 etype, exc, tb = sys.exc_info() 

32 out = [] 

33 if with_traceback: 

34 out = ["Traceback (most recent calls last):"] 

35 for tline in format_tb(tb): 

36 if tline.endswith('\n'): tline = tline[:-1] 

37 out.append(tline) 

38 out.append('\n') 

39 out.append(f"{etype.__name__}: {exc}") 

40 return out 

41 

42 

43def write_log(msg, level='debug'): 

44 f = logging.debug 

45 if level in ('warn', 'warning', logging.WARNING): 

46 f = logging.warning 

47 elif level in ('info', logging.INFO): 

48 f = logging.info 

49 elif level in ('error', logging.ERROR): 

50 f = logging.error 

51 elif level in ('critical', logging.CRITICAL): 

52 f = logging.critical 

53 return f(msg) 

54 

55def log_warning(msg): 

56 return logging.warning(msg) 

57 

58def log_debug(msg): 

59 return logging.debug(msg) 

60 

61def log_info(msg): 

62 return logging.info(msg) 

63 

64def log_error(msg): 

65 return logging.error(msg) 

66 

67def log_critical(msg): 

68 return logging.critical(msg) 

69 

70 

71def is_gzip(filename): 

72 "is a file gzipped?" 

73 with open(unixpath(filename), 'rb') as fh: 

74 return fh.read(3) == b'\x1f\x8b\x08' 

75 return False 

76 

77def read_textfile(filename, size=None): 

78 """read text from a file as string 

79 

80 Argument 

81 -------- 

82 filename (str or file): name of file to read or file-like object 

83 size (int or None): number of bytes to read 

84 

85 Returns 

86 ------- 

87 text of file as string. 

88 

89 Notes 

90 ------ 

91 1. the encoding is detected with charset_normalizer.from_bytes 

92 which is then used to decode bytes read from file. 

93 2. line endings are normalized to be '\n', so that 

94 splitting on '\n' will give a list of lines. 

95 3. if filename is given, it can be a gzip-compressed file 

96 """ 

97 text = '' 

98 

99 def decode(bytedata): 

100 return str(from_bytes(bytedata).best()) 

101 

102 if isinstance(filename, io.IOBase): 

103 text = filename.read(size) 

104 if filename.mode == 'rb': 

105 text = decode(text) 

106 else: 

107 fopen = GzipFile if is_gzip(filename) else open 

108 with fopen(unixpath(filename), 'rb') as fh: 

109 text = decode(fh.read(size)) 

110 return text.replace('\r\n', '\n').replace('\r', '\n') 

111 

112 

113def group2dict(group, _larch=None): 

114 "return dictionary of group members" 

115 return group.__dict__ 

116 

117def dict2group(d, _larch=None): 

118 "return group created from a dictionary" 

119 from larch import Group 

120 return Group(**d) 

121 

122def copy_group(group, _larch=None): 

123 from larch import Group 

124 out = Group(datatype=getattr(group, 'datatype', 'unknown'), 

125 copied_from=getattr(group, 'groupname', repr(group))) 

126 for attr in dir(group): 

127 try: 

128 setattr(out, attr, copy.deepcopy(getattr(group, attr))) 

129 except: 

130 print(f"cannot copy attribute {attr} from {group}") 

131 return out 

132 

133def copy_xafs_group(group, _larch=None): 

134 """specialized group copy for XAFS data groups""" 

135 from larch import Group 

136 out = Group(datatype=getattr(group, 'datatype', 'unknown'), 

137 copied_from=getattr(group, 'groupname', repr(group))) 

138 

139 for attr in dir(group): 

140 if attr not in ('norm', 'flat', 'deriv', 'deconv', 

141 'post_edge', 'pre_edge', 'norm_mback', 

142 'norm_vict', 'norm_poly'): 

143 try: 

144 val = copy.deepcopy(getattr(group, attr)) 

145 setattr(out, attr, val) 

146 except ValueError: 

147 pass 

148 return out 

149 

150 

151def isotime(t=None, with_tzone=False, filename=False): 

152 if t is None: 

153 t = time.time() 

154 sout = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t)) 

155 if with_tzone: 

156 sout = "%s-%2.2i:00" % (sout, time.timezone/3600) 

157 if filename: 

158 sout = sout.replace(' ', '_').replace(':', '') 

159 return sout 

160 

161def time_ago(timestamp, precision=2): 

162 """ 

163 give a human-readable 'time ago' from the timestamp. 

164 

165 The output gives day, hours, minutes, seconds: 

166 52 days, 1 hour 

167 

168 the `precision` field gives the number of significant time units to 

169 show. This defaults to 2: 

170 'N days, H hours', 

171 'N hours, M minutes' 

172 """ 

173 def format(x, unit): 

174 return "%d %s%s" % (x, unit, "s" if x > 1 else "") 

175 

176 tdiff = datetime.now() - datetime.fromtimestamp(timestamp) 

177 days = tdiff.days 

178 hours = tdiff.seconds//3600 

179 minutes = tdiff.seconds%3600//60 

180 seconds = tdiff.seconds%3600%60 

181 

182 out = [] 

183 if days > 0: 

184 out.append(format(days, "day")) 

185 if hours > 0: 

186 out.append(format(hours, "hour")) 

187 if minutes > 0: 

188 out.append(format(minutes, "minute")) 

189 out.append(format(seconds, "second")) 

190 return ", ".join(out[:precision]) 

191 

192def json_dump(data, filename): 

193 """ 

194 dump object or group to file using json 

195 """ 

196 from .jsonutils import encode4js 

197 with open(unixpath(filename), 'w') as fh: 

198 fh.write(json.dumps(encode4js(data))) 

199 fh.write('\n') 

200 

201def json_load(filename): 

202 """ 

203 load object from json dump file 

204 """ 

205 from .jsonutils import decode4js 

206 with open(unixpath(filename), 'rb') as fh: 

207 data = fh.read().decode('utf-8') 

208 return decode4js(json.loads(data)) 

209 

210def _larch_init(_larch): 

211 """initialize xrf""" 

212 from ..symboltable import Group 

213 _larch.symtable._sys.display = Group(use_color=True, 

214 colors=dict(text={'color': 'black'}, 

215 text2={'color': 'blue'}, 

216 error={'color': 'red'})) 

217 

218_larch_builtins = dict(copy=copy.copy, deepcopy=copy.deepcopy, more= _more, 

219 parent=_parent, ls=ls, mkdir=mkdir, cd=cd, 

220 cwd=cwd, group2dict=group2dict, 

221 copy_group=copy_group, copy_xafs_group=copy_xafs_group, 

222 dict2group=dict2group, debugtimer=debugtimer, 

223 isotime=isotime, json_dump=json_dump, 

224 json_load=json_load, gformat=gformat, 

225 fix_filename=fix_filename, 

226 fix_varname=fix_varname, 

227 strip_quotes=strip_quotes)