Coverage for larch/math/pca.py: 19%

130 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1#!/usr/bin/env python 

2""" 

3linear combination fitting 

4""" 

5import os 

6import sys 

7import time 

8import json 

9from itertools import combinations 

10from gzip import GzipFile 

11 

12import numpy as np 

13from numpy.random import randint 

14 

15try: 

16 from sklearn.decomposition import PCA 

17 HAS_SKLEARN = True 

18except ImportError: 

19 HAS_SKLEARN = False 

20 

21from lmfit import minimize, Parameters 

22 

23from .. import Group 

24from .utils import interp, index_of 

25from larch.utils import str2bytes, bytes2str, read_textfile, unixpath 

26 

27from .lincombo_fitting import get_arrays, get_label, groups2matrix 

28 

29 

30def nmf_train(groups, arrayname='norm', xmin=-np.inf, xmax=np.inf, 

31 solver='cd', beta_loss=2): 

32 """use a list of data groups to train a Non-negative model 

33 

34 Arguments 

35 --------- 

36 groups list of groups to use as components 

37 arrayname string of array name to be fit (see Note 2) ['norm'] 

38 xmin x-value for start of fit range [-inf] 

39 xmax x-value for end of fit range [+inf] 

40 beta_loss beta parameter for NMF [2] 

41 

42 Returns 

43 ------- 

44 group with trained NMF model, to be used with pca_fit 

45 

46 Notes 

47 ----- 

48 1. The group members for the components must match each other 

49 in data content and array names. 

50 2. arrayname can be one of `norm` or `dmude` 

51 """ 

52 xdat, ydat = groups2matrix(groups, arrayname, xmin=xmin, xmax=xmax) 

53 

54 ydat[np.where(ydat<0)] = 0 

55 opts = dict(n_components=len(groups), solver=solver) 

56 if solver == 'mu': 

57 opts.update(dict(beta_loss=beta_loss)) 

58 ret = NMF(**opts).fit(ydat) 

59 labels = [get_label(g) for g in groups] 

60 

61 return Group(x=xdat, arrayname=arrayname, labels=labels, ydat=ydat, 

62 components=ret.components_, 

63 xmin=xmin, xmax=xmax, model=ret) 

64 

65 

66def pca_train_sklearn(groups, arrayname='norm', xmin=-np.inf, xmax=np.inf): 

67 """use a list of data groups to train a Principal Component Analysis 

68 

69 Arguments 

70 --------- 

71 groups list of groups to use as components 

72 arrayname string of array name to be fit (see Note 2) ['norm'] 

73 xmin x-value for start of fit range [-inf] 

74 xmax x-value for end of fit range [+inf] 

75 

76 Returns 

77 ------- 

78 group with trained PCA or N model, to be used with pca_fit 

79 

80 Notes 

81 ----- 

82 1. The group members for the components must match each other 

83 in data content and array names. 

84 2. arrayname can be one of `norm` or `dmude` 

85 """ 

86 xdat, ydat = groups2matrix(groups, arrayname, xmin=xmin, xmax=xmax) 

87 if not HAS_SKLEARN: 

88 raise ImportError("scikit-learn not installed") 

89 

90 ret = PCA().fit(ydat) 

91 labels = [get_label(g) for g in groups] 

92 

93 return Group(x=xdat, arrayname=arrayname, labels=labels, ydat=ydat, 

94 xmin=xmin, xmax=xmax, model=ret, mean=ret.mean_, 

95 components=ret.components_, 

96 variances=ret.explained_variance_ratio_) 

97 

98 

99def pca_athena(groups, arrayname='norm', subtract_mean=True, 

100 normalize=True, xmin=-np.inf, xmax=np.inf): 

101 xdat, data = groups2matrix(groups, arrayname, xmin=xmin, xmax=xmax) 

102 if subtract_mean: 

103 data = data - data.mean(axis=0) 

104 

105 data = data.T 

106 data = data - data.mean(axis=0) 

107 if normalize: 

108 data = data / data.std(axis=0) 

109 

110 cor = np.dot(data.T, data) / data.shape[0] 

111 evals, var = np.linalg.eigh(cor) 

112 iorder = np.argsort(evals)[::-1] 

113 evals = evals[iorder] 

114 evec = np.dot(data, var)[:, iorder] 

115 return evec, evals 

116 

117def pca_train(groups, arrayname='norm', xmin=-np.inf, xmax=np.inf): 

118 """use a list of data groups to train a Principal Component Analysis 

119 

120 Arguments 

121 --------- 

122 groups list of groups to use as components 

123 arrayname string of array name to be fit (see Note 2) ['norm'] 

124 xmin x-value for start of fit range [-inf] 

125 xmax x-value for end of fit range [+inf] 

126 

127 Returns 

128 ------- 

129 group with trained PCA or N model, to be used with pca_fit 

130 

131 Notes 

132 ----- 

133 1. The group members for the components must match each other 

134 in data content and array names. 

135 2. arrayname can be one of `norm` or `dmude` 

136 """ 

137 xdat, ydat = groups2matrix(groups, arrayname, xmin=xmin, xmax=xmax) 

138 labels = [get_label(g) for g in groups] 

139 narr, nfreq = ydat.shape 

140 

141 ymean = ydat.mean(axis=0) 

142 ynorm = ydat - ymean 

143 

144 # normalize data to be centered at 0 with unit standard deviation 

145 ynorm = (ynorm.T - ynorm.mean(axis=1)) / ynorm.std(axis=1) 

146 eigval, eigvec_ = np.linalg.eigh(np.dot(ynorm.T, ynorm) / narr) 

147 eigvec = (np.dot(ynorm, -eigvec_)/narr).T 

148 eigvec, eigval = eigvec[::-1, :], eigval[::-1] 

149 

150 variances = eigval/eigval.sum() 

151 

152 # calculate IND statistic 

153 ind = None 

154 for r in range(narr-1): 

155 nr = narr - r - 1 

156 indval = np.sqrt(nfreq*eigval[r:].sum()/nr)/nr**2 

157 if ind is None: 

158 ind = [indval] 

159 ind.append(indval) 

160 ind = np.array(ind) 

161 

162 nsig = int(np.argmin(ind)) 

163 return Group(x=xdat, arrayname=arrayname, labels=labels, ydat=ydat, 

164 xmin=xmin, xmax=xmax, mean=ymean, components=eigvec, 

165 eigenvalues=eigval, variances=variances, ind=ind, nsig=nsig) 

166 

167 

168def save_pca_model(pca_model, filename): 

169 """save a PCA model to a file""" 

170 from larch.utils.jsonutils import encode4js 

171 buff = ['##Larch PCA Model: 1.0 : %s' % time.strftime('%Y-%m-%d %H:%M:%S')] 

172 buff.append('%s' % json.dumps(encode4js(pca_model))) 

173 

174 fh = GzipFile(unixpath(filename), "w") 

175 fh.write(str2bytes("\n".join(buff))) 

176 fh.close() 

177 

178def read_pca_model(filename): 

179 """read a PCA model from a file""" 

180 from larch.utils.jsonutils import decode4js 

181 text = read_textfile(filename) 

182 lines = text.split('\n') 

183 if not lines[0].startswith('##Larch PCA Model'): 

184 raise ValueError(f"Invalid Larch PCA Model: '{fname:s}'") 

185 return decode4js(json.loads(lines[1])) 

186 

187 

188def pca_statistics(pca_model): 

189 """return PCA arrays of statistics IND and F 

190 

191 For data of shape (p, n) (that is, p frequencies/energies, n spectra) 

192 

193 For index r, and eigv = eigenvalues 

194 

195 IND(r) = sqrt( eigv[r:].sum() / (p*(n-r))) / (n-r)**2 

196 

197 F1R(r) = eigv[r] / (p+1-r)*(n+1-r) / sum_i=r^n-1 (eigv[i] / ((p+1-i)*(n+1-i))) 

198 """ 

199 p, n = pca_model.ydat.shape 

200 eigv = pca_model.eigenvalues 

201 ind, f1r = [], [] 

202 for r in range(n-1): 

203 nr = n-r-1 

204 ind.append( np.sqrt(eigv[r:].sum()/ (p*nr))/nr**2) 

205 f1sum = 0 

206 for i in range(r, n): 

207 f1sum += eigv[i]/((p+1-i)*(n+1-i)) 

208 f1sum = max(1.e-10, f1sum) 

209 f1r.append(eigv[r] / (max(1, (p+1-r)*(n-r+1)) * f1sum)) 

210 

211 pca_model.ind = np.array(ind) 

212 pca_model.f1r = np.array(f1r) 

213 

214 return pca_model.ind, pca_model.f1r 

215 

216def _pca_scale_resid(params, ydat=None, pca_model=None, comps=None): 

217 scale = params['scale'].value 

218 weights, chi2, rank, s = np.linalg.lstsq(comps, ydat*scale-pca_model.mean) 

219 yfit = (weights * comps).sum(axis=1) + pca_model.mean 

220 return (scale*ydat - yfit) 

221 

222 

223def pca_fit(group, pca_model, ncomps=None, rescale=True): 

224 """ 

225 fit a spectrum from a group to a PCA training model from pca_train() 

226 

227 Arguments 

228 --------- 

229 group group with data to fit 

230 pca_model PCA model as found from pca_train() 

231 ncomps number of components to included 

232 rescale whether to allow data to be renormalized (True) 

233 

234 Returns 

235 ------- 

236 None, the group will have a subgroup name `pca_result` created 

237 with the following members: 

238 

239 x x or energy value from model 

240 ydat input data interpolated onto `x` 

241 yfit linear least-squares fit using model components 

242 weights weights for PCA components 

243 chi_square goodness-of-fit measure 

244 pca_model the input PCA model 

245 

246 """ 

247 # get first nerate arrays and interpolate components onto the unknown x array 

248 xdat, ydat = groups2matrix([group], pca_model.arrayname, xmin=pca_model.xmin, xmax=pca_model.xmax) 

249 

250 if xdat is None or ydat is None: 

251 raise ValueError("cannot get arrays for arrayname='%s'" % arrayname) 

252 

253 xshape = xdat.shape 

254 if len(xshape) == 2: 

255 xdat = xdat[0] 

256 

257 ydat = ydat[0] 

258 ydat = interp(xdat, ydat, pca_model.x, kind='cubic') 

259 

260 params = Parameters() 

261 params.add('scale', value=1.0, vary=True, min=0) 

262 

263 if ncomps is None: 

264 ncomps=len(pca_model.components) 

265 comps = pca_model.components[:ncomps].transpose() 

266 

267 if rescale: 

268 weights, chi2, rank, s = np.linalg.lstsq(comps, ydat-pca_model.mean) 

269 yfit = (weights * comps).sum(axis=1) + pca_model.mean 

270 

271 result = minimize(_pca_scale_resid, params, method='leastsq', 

272 gtol=1.e-5, ftol=1.e-5, xtol=1.e-5, epsfcn=1.e-5, 

273 kws = dict(ydat=ydat, comps=comps, pca_model=pca_model)) 

274 scale = result.params['scale'].value 

275 ydat *= scale 

276 weights, chi2, rank, s = np.linalg.lstsq(comps, ydat-pca_model.mean) 

277 yfit = (weights * comps).sum(axis=1) + pca_model.mean 

278 

279 else: 

280 weights, chi2, rank, s = np.linalg.lstsq(comps, ydat-pca_model.mean) 

281 yfit = (weights * comps).sum(axis=1) + pca_model.mean 

282 scale = 1.0 

283 

284 group.pca_result = Group(x=pca_model.x, ydat=ydat, yfit=yfit, 

285 pca_model=pca_model, chi_square=chi2[0], 

286 data_scale=scale, weights=weights) 

287 return