Coverage for larch/io/csvfiles.py: 15%

100 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1#!/usr/bin/env python 

2""" 

3Code to write and read CSV files 

4 

5""" 

6import sys 

7import os 

8import time 

9import json 

10import platform 

11import csv 

12 

13import numpy as np 

14from dateutil.parser import parse as dateparse 

15from larch import Group 

16from larch.math import interp, remove_dups 

17from larch.utils import bytes2str, fix_varname, gformat 

18 

19maketrans = str.maketrans 

20 

def groups2csv(grouplist, filename, delim=',',
               x='energy', y='norm', individual=False):
    """Save data from a list of groups to a CSV file.

    Arguments
    ---------
     grouplist   list of groups to save arrays from
     filename    name of output file (not used when `individual` is True)
     delim       column delimiter; surrounding whitespace is stripped
                 and a single space is appended [',']
     x           name of group member to use for `x` ['energy']
     y           name of group member to use for `y` ['norm']
     individual  toggle saving individual x/y in separate files [False]

    Notes
    -----
     1. With `individual=True`, each group is written to its own file
        named '<label>.csv', where the label is the first attribute of
        'filename', 'label', 'name', 'file', '__name__' that is not
        None, falling back to the repr of the group.
     2. Otherwise a single file is written using the `x` array of the
        first group for all rows.  The `y` array of any later group is
        linearly interpolated onto that array when its `x` array (after
        `remove_dups`) has a different length or when the summed
        absolute difference from the first `x` array exceeds 1.0.
     3. Indexing `grouplist[0]` raises IndexError for an empty list
        when `individual` is not True.
    """
    delim = delim.strip() + ' '

    def get_label(grp):
        'get label for group'
        for attr in ('filename', 'label', 'name', 'file', '__name__'):
            value = getattr(grp, attr, None)
            if value is not None:
                return value
        # no naming attribute was found: identify the group by its own
        # repr so that distinct groups do not all end up labelled 'None'
        return repr(grp)

    def get_source(grp, label):
        'get filename for header comment, falling back to the label'
        return getattr(grp, 'filename', label)

    def save_group(grp):
        'write x, y arrays of a single group to <label>.csv'
        label = get_label(grp)
        xdat = getattr(grp, x)
        ydat = getattr(grp, y)
        buff = [f"#saved {time.ctime()}",
                f"#saving x array={x}, y array={y}",
                f"#{label}: {get_source(grp, label)}",
                "#------------------------------------------",
                "# %s" % delim.join([x, label])]
        # index by the length of x so that a shorter y array raises
        # an error instead of silently truncating the output
        for i in range(len(xdat)):
            buff.append(delim.join([gformat(xdat[i]), gformat(ydat[i])]))
        buff.append('')
        fnout = f"{label}.csv"
        with open(fnout, 'w', encoding=sys.getdefaultencoding()) as fh:
            fh.write("\n".join(buff))
        print(f"Wrote group to {fnout}")

    if individual is True:
        for grp in grouplist:
            save_group(grp)
        return

    first = grouplist[0]
    x0 = getattr(first, x)
    npts = len(x0)
    columns = [x0, getattr(first, y)]
    labels = [x, get_label(first)]

    buff = ["# %d files saved %s" % (len(grouplist), time.ctime()),
            "# saving x array='%s', y array='%s'" % (x, y),
            "# %s: %s" % (labels[1], get_source(first, labels[1]))]

    for grp in grouplist[1:]:
        label = get_label(grp)
        buff.append("# %s: %s" % (label, get_source(grp, label)))
        labels.append(label)
        _x = remove_dups(getattr(grp, x))
        _y = getattr(grp, y)
        # the length test must come first: the array difference is only
        # meaningful when both x arrays have the same number of points
        if (len(_x) != npts) or (abs(_x - x0)).sum() > 1.0:
            columns.append(interp(_x, _y, x0, kind='linear'))
        else:
            columns.append(_y)

    buff.append("#------------------------------------------")
    buff.append("# %s" % delim.join(labels))
    for i in range(npts):
        buff.append(delim.join([gformat(col[i]) for col in columns]))

    buff.append('')
    with open(filename, 'w', encoding=sys.getdefaultencoding()) as fh:
        fh.write("\n".join(buff))

    print("Wrote %i groups to %s" % (len(columns)-1, filename))

99 

100 

def str2float(word, allow_times=True):
    """Convert a word to a float.

    Arguments
    ---------
      word         str, word to be converted
      allow_times  bool, whether to support time stamps [True]

    Returns
    -------
      either a float or text: the input `word` is returned unchanged
      when it cannot be converted.

    Notes
    -----
      With `allow_times=True`, a word that is not a plain number is
      passed to the dateutil parser to support common date-time
      strings, returning a numerical value as the Unix timestamp, using
           time.mktime(dateutil.parser.parse(word).timetuple())
      With `allow_times=False`, no date parsing is attempted.
    """
    try:
        return float(word)
    except ValueError:
        pass

    if allow_times:
        try:
            return time.mktime(dateparse(word).timetuple())
        except (ValueError, OverflowError):
            # not a recognizable date, or a date outside of the range
            # that can be turned into a timestamp: keep the text
            pass
    return word

130 

def read_csv(filename):
    """Read a CSV file, return a group with the data as columns.

    Arguments
    ---------
      filename   str, name of CSV file to read

    Returns
    -------
      Group with attributes `filename`, `data` (list of columns, each a
      list of values converted with `str2float`), and one attribute per
      column named `col_01`, `col_02`, ....

    Notes
    -----
      1. The delimiter is detected with `csv.Sniffer` from the full
         file contents, choosing among ',', ';', and tab.
      2. The number of columns is taken from the first non-blank row.
         A later row with more fields than that raises IndexError.
      3. A column is returned as a numpy array only if its value in
         the first non-blank row can be converted with `float()`;
         otherwise it is left as a list.
    """
    data = None
    isfloat = None
    # the context manager closes the file even if sniffing or parsing fails
    with open(filename, 'r') as csvfile:
        dialect = csv.Sniffer().sniff(csvfile.read(), [',', ';', '\t'])
        csvfile.seek(0)

        for row in csv.reader(csvfile, dialect):
            if not row:
                # blank line: carries no data, and must not be used
                # to set the number of columns
                continue
            if data is None:
                data = [[] for _ in row]
                isfloat = [None]*len(row)
            for i, word in enumerate(row):
                data[i].append(str2float(word))
                if isfloat[i] is None:
                    try:
                        float(word)
                        isfloat[i] = True
                    except ValueError:
                        isfloat[i] = False

    if data is None:
        # no non-blank rows were found
        data, isfloat = [], []

    out = Group(filename=filename, data=data)
    for icol, val in enumerate(data):
        cname = 'col_%2.2d' % (icol+1)
        if isfloat[icol]:
            val = np.array(val)
        setattr(out, cname, val)

    return out