Coverage for larch/wxxas/xas_controller.py: 0%

381 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1import os 

2import time 

3import shutil 

4from glob import glob 

5from pathlib import Path 

6from copy import deepcopy 

7 

8import numpy as np 

9import wx 

10 

11import larch 

12from larch import Group, Journal, Entry 

13from larch.larchlib import read_config, save_config 

14from larch.utils import (group2dict, unique_name, fix_varname, get_cwd, 

15 asfloat, get_sessionid, mkdir, unixpath) 

16from larch.wxlib.plotter import last_cursor_pos 

17from larch.wxlib import ExceptionPopup 

18from larch.io import save_session 

19from larch.site_config import home_dir, user_larchdir 

20 

21from .config import XASCONF, CONF_FILE, OLDCONF_FILE 

22 

23class XASController(): 

24 """ 

25 class holding the Larch session and doing the processing work for Larix 

26 """ 

27 def __init__(self, wxparent=None, _larch=None): 

28 self.wxparent = wxparent 

29 self.filelist = None 

30 self.group = None 

31 self.groupname = None 

32 self.plot_erange = None 

33 self.report_frame = None 

34 self.recentfiles = [] 

35 self.panels = {} 

36 self.larch = _larch 

37 if _larch is None: 

38 self.larch = larch.Interpreter() 

39 self.larix_folder = Path(user_larchdir, 'larix').as_posix() 

40 self.config_file = Path(self.larix_folder, CONF_FILE).as_posix() 

41 self.init_larch_session() 

42 self.init_workdir() 

43 

44 def init_larch_session(self): 

45 self.symtable = self.larch.symtable 

46 self.file_groups = self.symtable._xasgroups = {} 

47 

48 config = {} 

49 config.update(XASCONF) 

50 # may migrate old 'xas_viewer' folder to 'larix' folder 

51 xasv_folder = Path(user_larchdir, 'xas_viewer') 

52 if Path(xasv_folder).exists() and not Path(self.larix_folder).exists(): 

53 print("Migrating xas_viewer to larix folder") 

54 shutil.move(xasv_folder, self.larix_folder) 

55 

56 if not Path(self.larix_folder).exists(): 

57 try: 

58 mkdir(self.larix_folder) 

59 except: 

60 title = "Cannot create Larix folder" 

61 message = [f"Cannot create directory {larix_folder}"] 

62 ExceptionPopup(self, title, message) 

63 

64 

65 # may migrate old 'xas_viewer.conf' file to 'larix.conf' 

66 old_config_file = Path(self.larix_folder, OLDCONF_FILE).as_posix() 

67 if Path(old_config_file).exists() and not Path(self.config_file).exists(): 

68 shutil.move(old_config_file, self.config_file) 

69 

70 if Path(self.config_file).exists(): 

71 user_config = read_config(self.config_file) 

72 if user_config is not None: 

73 for sname in config: 

74 if sname in user_config: 

75 val = user_config[sname] 

76 if isinstance(val, dict): 

77 for k, v in val.items(): 

78 config[sname][k] = v 

79 else: 

80 config[sname] = val 

81 

82 self.config = self.larch.symtable._sys.larix_config = config 

83 self.larch.symtable._sys.wx.plotopts = config['plot'] 

84 self.clean_autosave_sessions() 

85 

86 

87 def install_group(self, groupname, filename, source=None, journal=None): 

88 """add groupname / filename to list of available data groups""" 

89 

90 try: 

91 thisgroup = getattr(self.symtable, groupname) 

92 except AttributeError: 

93 thisgroup = self.symtable.new_group(groupname) 

94 

95 # file /group may already exist in list 

96 if filename in self.file_groups: 

97 fbase, i = filename, 0 

98 while i < 50000 and filename in self.file_groups: 

99 filename = f"{fbase}_{i}" 

100 i += 1 

101 if i >= 50000: 

102 raise ValueError(f"Too many repeated filenames: {fbase}") 

103 

104 filename = filename.strip() 

105 if source is None: 

106 source = filename 

107 

108 jopts = f"source='{source}'" 

109 if isinstance(journal, dict): 

110 jnl = {'source': f"{source}"} 

111 jnl.update(journal) 

112 jopts = ', '.join([f"{k}='{v}'" for k, v in jnl.items()]) 

113 elif isinstance(journal, (list, Journal)): 

114 jopts = repr(journal) 

115 

116 cmds = [f"{groupname:s}.groupname = '{groupname:s}'", 

117 f"{groupname:s}.filename = '{filename:s}'"] 

118 needs_config = not hasattr(thisgroup, 'config') 

119 if needs_config: 

120 cmds.append(f"{groupname:s}.config = group(__name__='larix config')") 

121 

122 cmds.append(f"{groupname:s}.journal = journal({jopts:s})") 

123 

124 if hasattr(thisgroup, 'xdat') and not hasattr(thisgroup, 'xplot'): 

125 thisgroup.xplot = deepcopy(thisgroup.xdat) 

126 if hasattr(thisgroup, 'ydat') and not hasattr(thisgroup, 'yplot'): 

127 thisgroup.yplot = deepcopy(thisgroup.ydat) 

128 

129 datatype = getattr(thisgroup, 'datatype', 'xydata') 

130 if datatype == 'xas': 

131 cmds.append(f"{groupname:s}.energy_orig = {groupname:s}.energy[:]") 

132 array_labels = getattr(thisgroup, 'array_labels', []) 

133 if len(array_labels) > 2 and getattr(thisgroup, 'data', None) is not None: 

134 for i0name in ('i0', 'i_0', 'monitor'): 

135 if i0name in array_labels: 

136 i0x = array_labels.index(i0name) 

137 cmds.append(f"{groupname:s}.i0 = {groupname:s}.data[{i0x}, :]") 

138 

139 self.larch.eval('\n'.join(cmds)) 

140 

141 if needs_config: 

142 # print("INSTALL GROUP, needs config", thisgroup) 

143 # print('\n'.join(cmds)) 

144 self.init_group_config(thisgroup) 

145 

146 self.file_groups[filename] = groupname 

147 self.filelist.Append(filename) 

148 self.filelist.SetStringSelection(filename) 

149 self.sync_xasgroups() 

150 return filename 

151 

152 def sync_xasgroups(self): 

153 "make sure `_xasgroups` is identical to file_groups" 

154 if self.file_groups != self.symtable._xasgroups: 

155 self.symtable._xasgroups = self.file_groups 

156 

157 def get_config(self, key, default=None): 

158 "get top-level, program-wide configuration setting" 

159 if key not in self.config: 

160 return default 

161 return deepcopy(self.config[key]) 

162 

163 def init_group_config(self, dgroup): 

164 """set up 'config' group with values from self.config""" 

165 if not hasattr(dgroup, 'config'): 

166 dgroup.config = larch.Group(__name__='larix config') 

167 

168 for sect in ('exafs', 'feffit', 'lincombo', 'pca', 'prepeaks', 

169 'regression', 'xasnorm'): 

170 setattr(dgroup.config, sect, deepcopy(self.config[sect])) 

171 

172 def get_plot_conf(self): 

173 """get basic plot options to pass to plot() ** not window sizes **""" 

174 dx = {'linewidth': 3, 'markersize': 4, 

175 'show_grid': True, 'show_fullbox': True, 'theme': 'light'} 

176 pconf = self.config['plot'] 

177 out = {} 

178 for attr, val in dx.items(): 

179 out[attr] = pconf.get(attr, val) 

180 return out 

181 

182 def save_config(self): 

183 """save configuration""" 

184 save_config(self.config_file, self.config) 

185 

186 def chdir_on_fileopen(self): 

187 return self.config['main']['chdir_on_fileopen'] 

188 

189 def set_workdir(self): 

190 self.config['main']['workdir'] = get_cwd() 

191 

192 def save_workdir(self): 

193 """save last workdir and recent session files""" 

194 try: 

195 with open(Path(self.larix_folder, 'workdir.txt'), 'w') as fh: 

196 fh.write(f"{get_cwd()}\n") 

197 except: 

198 pass 

199 

200 buffer = [] 

201 rfiles = [] 

202 for tstamp, fname in sorted(self.recentfiles, key=lambda x: x[0], reverse=True)[:10]: 

203 if fname not in rfiles: 

204 buffer.append(f"{tstamp:.1f} {fname:s}") 

205 rfiles.append(fname) 

206 buffer.append('') 

207 buffer = '\n'.join(buffer) 

208 

209 try: 

210 with open(Path(self.larix_folder, 'recent_sessions.txt'), 'w') as fh: 

211 fh.write(buffer) 

212 except: 

213 pass 

214 

215 def init_workdir(self): 

216 """set initial working folder, read recent session files""" 

217 if self.config['main'].get('use_last_workdir', False): 

218 wfile = Path(self.larix_folder, 'workdir.txt') 

219 if wfile.exists(): 

220 try: 

221 with open(wfile, 'r') as fh: 

222 workdir = fh.readlines()[0][:-1] 

223 self.config['main']['workdir'] = workdir 

224 except: 

225 pass 

226 try: 

227 os.chdir(self.config['main']['workdir']) 

228 except: 

229 pass 

230 

231 rfile = Path(self.larix_folder, 'recent_sessions.txt') 

232 if rfile.exists(): 

233 with open(rfile, 'r') as fh: 

234 for line in fh.readlines(): 

235 if len(line) < 2 or line.startswith('#'): 

236 continue 

237 try: 

238 w = line[:-1].split(None, maxsplit=1) 

239 self.recentfiles.insert(0, (float(w[0]), w[1])) 

240 except: 

241 pass 

242 

243 

244 def autosave_session(self): 

245 conf = self.get_config('autosave', {}) 

246 fileroot = conf.get('fileroot', 'autosave') 

247 nhistory = max(8, int(conf.get('nhistory', 4))) 

248 

249 fname = f"{fileroot:s}_{get_sessionid():s}.larix" 

250 savefile = Path(self.larix_folder, fname).as_posix() 

251 for i in reversed(range(1, nhistory)): 

252 curf = savefile.replace('.larix', f'_{i:d}.larix' ) 

253 if Path(curf).exists(): 

254 newf = savefile.replace('.larix', f'_{i+1:d}.larix' ) 

255 shutil.move(curf, newf) 

256 if Path(savefile).exists(): 

257 curf = savefile.replace('.larix', '_1.larix' ) 

258 shutil.move(savefile, curf) 

259 save_session(savefile, _larch=self.larch) 

260 return savefile 

261 

262 def clean_autosave_sessions(self): 

263 conf = self.get_config('autosave', {}) 

264 fileroot = conf.get('fileroot', 'autosave') 

265 max_hist = int(conf.get('maxfiles', 10)) 

266 

267 def get_autosavefiles(): 

268 dat = [] 

269 for afile in os.listdir(self.larix_folder): 

270 ffile = Path(self.larix_folder, afile) 

271 if afile.endswith('.larix'): 

272 mtime = os.stat(ffile).st_mtime 

273 words = afile.replace('.larix', '').split('_') 

274 try: 

275 version = int(words[-1]) 

276 words.pop() 

277 except: 

278 version = 0 

279 dat.append((ffile.as_posix(), version, mtime)) 

280 return sorted(dat, key=lambda x: x[2]) 

281 

282 dat = get_autosavefiles() 

283 nremove = max(0, len(dat) - max_hist) 

284 # first remove oldest "version > 0" files 

285 while nremove > 0 and len(dat) > 0: 

286 dfile, version, mtime = dat.pop(0) 

287 if version > 0: 

288 os.unlink(dfile) 

289 nremove -= 1 

290 

291 dat = get_autosavefiles() 

292 nremove = max(0, len(dat) - max_hist) 

293 # then remove the oldest "version 0" files 

294 

295 while nremove > 0 and len(dat) > 0: 

296 dfile, vers, mtime = dat.pop(0) 

297 if vers == 0 and abs(mtime - time.time()) > 86400: 

298 os.unlink(dfile) 

299 nremove -= 1 

300 

301 def get_recentfiles(self, max=10): 

302 return sorted(self.recentfiles, key=lambda x: x[0], reverse=True)[:max] 

303 

304 def recent_autosave_sessions(self): 

305 "return list of (timestamp, name) for most recent autosave session files" 

306 conf = self.get_config('autosave', {}) 

307 fileroot = conf.get('fileroot', 'autosave') 

308 max_hist = int(conf.get('maxfiles', 10)) 

309 flist = [] 

310 for afile in os.listdir(self.larix_folder): 

311 ffile = Path(self.larix_folder, afile).as_posix() 

312 if ffile.endswith('.larix'): 

313 flist.append((os.stat(ffile).st_mtime, ffile)) 

314 

315 return sorted(flist, key=lambda x: x[0], reverse=True)[:max_hist] 

316 

317 

318 def clear_session(self): 

319 self.larch.eval("clear_session()") 

320 self.filelist.Clear() 

321 self.init_larch_session() 

322 

323 

324 def write_message(self, msg, panel=0): 

325 """write a message to the Status Bar""" 

326 self.wxparent.statusbar.SetStatusText(msg, panel) 

327 

328 def close_all_displays(self): 

329 "close all displays, as at exit" 

330 self.symtable._plotter.close_all_displays() 

331 

332 def get_display(self, win=1, stacked=False, 

333 size=None, position=None): 

334 wintitle='Larch XAS Plot Window %i' % win 

335 

336 conf = self.get_config('plot') 

337 opts = dict(wintitle=wintitle, stacked=stacked, 

338 size=size, position=position, win=win) 

339 opts.update(conf) 

340 return self.symtable._plotter.get_display(**opts) 

341 

342 def set_focus(self, topwin=None): 

343 """ 

344 set wx focus to main window or selected Window, 

345 even after plot 

346 """ 

347 if topwin is None: 

348 topwin = wx.GetApp().GetTopWindow() 

349 flist = self.filelist 

350 else: 

351 flist = getattr(topwin, 'filelist', topwin) 

352 time.sleep(0.025) 

353 topwin.Raise() 

354 

355 def get_group(self, groupname=None): 

356 if groupname is None: 

357 groupname = self.groupname 

358 if groupname is None: 

359 return None 

360 dgroup = getattr(self.symtable, groupname, None) 

361 if dgroup is None and groupname in self.file_groups: 

362 groupname = self.file_groups[groupname] 

363 dgroup = getattr(self.symtable, groupname, None) 

364 

365 if dgroup is None and len(self.file_groups) > 0: 

366 gname = list(self.file_groups.keys())[0] 

367 dgroup = getattr(self.symtable, gname, None) 

368 return dgroup 

369 

370 def filename2group(self, filename): 

371 "convert filename (as displayed) to larch group" 

372 return self.get_group(self.file_groups[str(filename)]) 

373 

374 def merge_groups(self, grouplist, master=None, yarray='mu', outgroup=None): 

375 """merge groups""" 

376 if master is None: 

377 master = grouplist[0] 

378 gmaster = self.get_group(master) 

379 xarray = 'xplot' if gmaster.datatype=='xydata' else 'energy' 

380 outgroup = fix_varname(outgroup.lower()) 

381 if outgroup is None: 

382 outgroup = 'merged' 

383 outgroup = unique_name(outgroup, self.file_groups, max=1000) 

384 

385 glist = "[%s]" % (', '.join(grouplist)) 

386 

387 cmd = f"""{outgroup} = merge_groups({glist}, master={master}, 

388 xarray='{xarray}', yarray='{yarray}', kind='cubic', trim=True)""" 

389 self.larch.eval(cmd) 

390 

391 this = self.get_group(outgroup) 

392 if not hasattr(gmaster, 'config'): 

393 self.init_group_config(gmaster) 

394 if not hasattr(this, 'config'): 

395 self.init_group_config(this) 

396 this.config.xasnorm.update(gmaster.config.xasnorm) 

397 this.datatype = gmaster.datatype 

398 if xarray == 'energy': 

399 this.xplot = 1.0*this.energy 

400 this.yplot = 1.0*getattr(this, yarray) 

401 this.yerr = getattr(this, 'd' + yarray, 1.0) 

402 if yarray != 'mu': 

403 this.mu = this.yplot 

404 this.plot_xlabel = xarray 

405 this.plot_ylabel = yarray 

406 return this 

407 

408 def set_plot_erange(self, erange): 

409 self.plot_erange = erange 

410 

411 def copy_group(self, filename, new_filename=None): 

412 """copy XAS group (by filename) to new group""" 

413 groupname = self.file_groups[filename] 

414 if not hasattr(self.larch.symtable, groupname): 

415 return 

416 

417 ogroup = self.get_group(groupname) 

418 ngroup = larch.Group(datatype=ogroup.datatype, copied_from=groupname) 

419 

420 for attr in dir(ogroup): 

421 val = getattr(ogroup, attr, None) 

422 if val is not None: 

423 setattr(ngroup, attr, deepcopy(val)) 

424 

425 if new_filename is None: 

426 new_filename = filename + '_1' 

427 ngroup.filename = unique_name(new_filename, self.file_groups.keys()) 

428 ngroup.groupname = unique_name(groupname, self.file_groups.values()) 

429 ngroup.journal.add('source_desc', f"copied from '{filename:s}'") 

430 setattr(self.larch.symtable, ngroup.groupname, ngroup) 

431 return ngroup 

432 

433 def get_cursor(self, win=None): 

434 """get last cursor from selected window""" 

435 return last_cursor_pos(win=win, _larch=self.larch) 

436 

437 def plot_group(self, groupname=None, title=None, plot_yarrays=None, 

438 new=True, **kws): 

439 ppanel = self.get_display(stacked=False).panel 

440 newplot = ppanel.plot 

441 oplot = ppanel.oplot 

442 plotcmd = oplot 

443 viewlims = ppanel.get_viewlimits() 

444 if new: 

445 plotcmd = newplot 

446 

447 dgroup = self.get_group(groupname) 

448 if not hasattr(dgroup, 'xplot'): 

449 if hasattr(dgroup, 'xdat'): 

450 dgroup.xplot = deepcopy(dgroup.xdat) 

451 else: 

452 print("Cannot plot group ", groupname) 

453 

454 if ((getattr(dgroup, 'plot_yarrays', None) is None or 

455 getattr(dgroup, 'energy', None) is None or 

456 getattr(dgroup, 'mu', None) is None)): 

457 self.process(dgroup) 

458 

459 if plot_yarrays is None and hasattr(dgroup, 'plot_yarrays'): 

460 plot_yarrays = dgroup.plot_yarrays 

461 

462 popts = kws 

463 fname = Path(dgroup.filename).name 

464 if not 'label' in popts: 

465 popts['label'] = dgroup.plot_ylabel 

466 

467 popts['xlabel'] = dgroup.plot_xlabel 

468 popts['ylabel'] = dgroup.plot_ylabel 

469 if getattr(dgroup, 'plot_y2label', None) is not None: 

470 popts['y2label'] = dgroup.plot_y2label 

471 

472 plot_extras = None 

473 if new: 

474 if title is None: 

475 title = fname 

476 plot_extras = getattr(dgroup, 'plot_extras', None) 

477 

478 popts['title'] = title 

479 

480 narr = len(plot_yarrays) - 1 

481 for i, pydat in enumerate(plot_yarrays): 

482 yaname, yopts, yalabel = pydat 

483 popts.update(yopts) 

484 if yalabel is not None: 

485 popts['label'] = yalabel 

486 popts['delay_draw'] = (i != narr) 

487 

488 plotcmd(dgroup.xplot, getattr(dgroup, yaname), **popts) 

489 plotcmd = oplot 

490 

491 if plot_extras is not None: 

492 axes = ppanel.axes 

493 for etype, x, y, opts in plot_extras: 

494 if etype == 'marker': 

495 popts = {'marker': 'o', 'markersize': 4, 

496 'label': '_nolegend_', 

497 'markerfacecolor': 'red', 

498 'markeredgecolor': '#884444'} 

499 popts.update(opts) 

500 axes.plot([x], [y], **popts) 

501 elif etype == 'vline': 

502 popts = {'ymin': 0, 'ymax': 1.0, 

503 'color': '#888888'} 

504 popts.update(opts) 

505 axes.axvline(x, **popts) 

506 ppanel.canvas.draw() 

507 self.set_focus()