Coverage for larch/wxxas/xas_controller.py: 0%
381 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
1import os
2import time
3import shutil
4from glob import glob
5from pathlib import Path
6from copy import deepcopy
8import numpy as np
9import wx
11import larch
12from larch import Group, Journal, Entry
13from larch.larchlib import read_config, save_config
14from larch.utils import (group2dict, unique_name, fix_varname, get_cwd,
15 asfloat, get_sessionid, mkdir, unixpath)
16from larch.wxlib.plotter import last_cursor_pos
17from larch.wxlib import ExceptionPopup
18from larch.io import save_session
19from larch.site_config import home_dir, user_larchdir
21from .config import XASCONF, CONF_FILE, OLDCONF_FILE
23class XASController():
24 """
25 class holding the Larch session and doing the processing work for Larix
26 """
27 def __init__(self, wxparent=None, _larch=None):
28 self.wxparent = wxparent
29 self.filelist = None
30 self.group = None
31 self.groupname = None
32 self.plot_erange = None
33 self.report_frame = None
34 self.recentfiles = []
35 self.panels = {}
36 self.larch = _larch
37 if _larch is None:
38 self.larch = larch.Interpreter()
39 self.larix_folder = Path(user_larchdir, 'larix').as_posix()
40 self.config_file = Path(self.larix_folder, CONF_FILE).as_posix()
41 self.init_larch_session()
42 self.init_workdir()
44 def init_larch_session(self):
45 self.symtable = self.larch.symtable
46 self.file_groups = self.symtable._xasgroups = {}
48 config = {}
49 config.update(XASCONF)
50 # may migrate old 'xas_viewer' folder to 'larix' folder
51 xasv_folder = Path(user_larchdir, 'xas_viewer')
52 if Path(xasv_folder).exists() and not Path(self.larix_folder).exists():
53 print("Migrating xas_viewer to larix folder")
54 shutil.move(xasv_folder, self.larix_folder)
56 if not Path(self.larix_folder).exists():
57 try:
58 mkdir(self.larix_folder)
59 except:
60 title = "Cannot create Larix folder"
61 message = [f"Cannot create directory {larix_folder}"]
62 ExceptionPopup(self, title, message)
65 # may migrate old 'xas_viewer.conf' file to 'larix.conf'
66 old_config_file = Path(self.larix_folder, OLDCONF_FILE).as_posix()
67 if Path(old_config_file).exists() and not Path(self.config_file).exists():
68 shutil.move(old_config_file, self.config_file)
70 if Path(self.config_file).exists():
71 user_config = read_config(self.config_file)
72 if user_config is not None:
73 for sname in config:
74 if sname in user_config:
75 val = user_config[sname]
76 if isinstance(val, dict):
77 for k, v in val.items():
78 config[sname][k] = v
79 else:
80 config[sname] = val
82 self.config = self.larch.symtable._sys.larix_config = config
83 self.larch.symtable._sys.wx.plotopts = config['plot']
84 self.clean_autosave_sessions()
87 def install_group(self, groupname, filename, source=None, journal=None):
88 """add groupname / filename to list of available data groups"""
90 try:
91 thisgroup = getattr(self.symtable, groupname)
92 except AttributeError:
93 thisgroup = self.symtable.new_group(groupname)
95 # file /group may already exist in list
96 if filename in self.file_groups:
97 fbase, i = filename, 0
98 while i < 50000 and filename in self.file_groups:
99 filename = f"{fbase}_{i}"
100 i += 1
101 if i >= 50000:
102 raise ValueError(f"Too many repeated filenames: {fbase}")
104 filename = filename.strip()
105 if source is None:
106 source = filename
108 jopts = f"source='{source}'"
109 if isinstance(journal, dict):
110 jnl = {'source': f"{source}"}
111 jnl.update(journal)
112 jopts = ', '.join([f"{k}='{v}'" for k, v in jnl.items()])
113 elif isinstance(journal, (list, Journal)):
114 jopts = repr(journal)
116 cmds = [f"{groupname:s}.groupname = '{groupname:s}'",
117 f"{groupname:s}.filename = '{filename:s}'"]
118 needs_config = not hasattr(thisgroup, 'config')
119 if needs_config:
120 cmds.append(f"{groupname:s}.config = group(__name__='larix config')")
122 cmds.append(f"{groupname:s}.journal = journal({jopts:s})")
124 if hasattr(thisgroup, 'xdat') and not hasattr(thisgroup, 'xplot'):
125 thisgroup.xplot = deepcopy(thisgroup.xdat)
126 if hasattr(thisgroup, 'ydat') and not hasattr(thisgroup, 'yplot'):
127 thisgroup.yplot = deepcopy(thisgroup.ydat)
129 datatype = getattr(thisgroup, 'datatype', 'xydata')
130 if datatype == 'xas':
131 cmds.append(f"{groupname:s}.energy_orig = {groupname:s}.energy[:]")
132 array_labels = getattr(thisgroup, 'array_labels', [])
133 if len(array_labels) > 2 and getattr(thisgroup, 'data', None) is not None:
134 for i0name in ('i0', 'i_0', 'monitor'):
135 if i0name in array_labels:
136 i0x = array_labels.index(i0name)
137 cmds.append(f"{groupname:s}.i0 = {groupname:s}.data[{i0x}, :]")
139 self.larch.eval('\n'.join(cmds))
141 if needs_config:
142 # print("INSTALL GROUP, needs config", thisgroup)
143 # print('\n'.join(cmds))
144 self.init_group_config(thisgroup)
146 self.file_groups[filename] = groupname
147 self.filelist.Append(filename)
148 self.filelist.SetStringSelection(filename)
149 self.sync_xasgroups()
150 return filename
152 def sync_xasgroups(self):
153 "make sure `_xasgroups` is identical to file_groups"
154 if self.file_groups != self.symtable._xasgroups:
155 self.symtable._xasgroups = self.file_groups
157 def get_config(self, key, default=None):
158 "get top-level, program-wide configuration setting"
159 if key not in self.config:
160 return default
161 return deepcopy(self.config[key])
163 def init_group_config(self, dgroup):
164 """set up 'config' group with values from self.config"""
165 if not hasattr(dgroup, 'config'):
166 dgroup.config = larch.Group(__name__='larix config')
168 for sect in ('exafs', 'feffit', 'lincombo', 'pca', 'prepeaks',
169 'regression', 'xasnorm'):
170 setattr(dgroup.config, sect, deepcopy(self.config[sect]))
172 def get_plot_conf(self):
173 """get basic plot options to pass to plot() ** not window sizes **"""
174 dx = {'linewidth': 3, 'markersize': 4,
175 'show_grid': True, 'show_fullbox': True, 'theme': 'light'}
176 pconf = self.config['plot']
177 out = {}
178 for attr, val in dx.items():
179 out[attr] = pconf.get(attr, val)
180 return out
182 def save_config(self):
183 """save configuration"""
184 save_config(self.config_file, self.config)
186 def chdir_on_fileopen(self):
187 return self.config['main']['chdir_on_fileopen']
189 def set_workdir(self):
190 self.config['main']['workdir'] = get_cwd()
192 def save_workdir(self):
193 """save last workdir and recent session files"""
194 try:
195 with open(Path(self.larix_folder, 'workdir.txt'), 'w') as fh:
196 fh.write(f"{get_cwd()}\n")
197 except:
198 pass
200 buffer = []
201 rfiles = []
202 for tstamp, fname in sorted(self.recentfiles, key=lambda x: x[0], reverse=True)[:10]:
203 if fname not in rfiles:
204 buffer.append(f"{tstamp:.1f} {fname:s}")
205 rfiles.append(fname)
206 buffer.append('')
207 buffer = '\n'.join(buffer)
209 try:
210 with open(Path(self.larix_folder, 'recent_sessions.txt'), 'w') as fh:
211 fh.write(buffer)
212 except:
213 pass
215 def init_workdir(self):
216 """set initial working folder, read recent session files"""
217 if self.config['main'].get('use_last_workdir', False):
218 wfile = Path(self.larix_folder, 'workdir.txt')
219 if wfile.exists():
220 try:
221 with open(wfile, 'r') as fh:
222 workdir = fh.readlines()[0][:-1]
223 self.config['main']['workdir'] = workdir
224 except:
225 pass
226 try:
227 os.chdir(self.config['main']['workdir'])
228 except:
229 pass
231 rfile = Path(self.larix_folder, 'recent_sessions.txt')
232 if rfile.exists():
233 with open(rfile, 'r') as fh:
234 for line in fh.readlines():
235 if len(line) < 2 or line.startswith('#'):
236 continue
237 try:
238 w = line[:-1].split(None, maxsplit=1)
239 self.recentfiles.insert(0, (float(w[0]), w[1]))
240 except:
241 pass
244 def autosave_session(self):
245 conf = self.get_config('autosave', {})
246 fileroot = conf.get('fileroot', 'autosave')
247 nhistory = max(8, int(conf.get('nhistory', 4)))
249 fname = f"{fileroot:s}_{get_sessionid():s}.larix"
250 savefile = Path(self.larix_folder, fname).as_posix()
251 for i in reversed(range(1, nhistory)):
252 curf = savefile.replace('.larix', f'_{i:d}.larix' )
253 if Path(curf).exists():
254 newf = savefile.replace('.larix', f'_{i+1:d}.larix' )
255 shutil.move(curf, newf)
256 if Path(savefile).exists():
257 curf = savefile.replace('.larix', '_1.larix' )
258 shutil.move(savefile, curf)
259 save_session(savefile, _larch=self.larch)
260 return savefile
262 def clean_autosave_sessions(self):
263 conf = self.get_config('autosave', {})
264 fileroot = conf.get('fileroot', 'autosave')
265 max_hist = int(conf.get('maxfiles', 10))
267 def get_autosavefiles():
268 dat = []
269 for afile in os.listdir(self.larix_folder):
270 ffile = Path(self.larix_folder, afile)
271 if afile.endswith('.larix'):
272 mtime = os.stat(ffile).st_mtime
273 words = afile.replace('.larix', '').split('_')
274 try:
275 version = int(words[-1])
276 words.pop()
277 except:
278 version = 0
279 dat.append((ffile.as_posix(), version, mtime))
280 return sorted(dat, key=lambda x: x[2])
282 dat = get_autosavefiles()
283 nremove = max(0, len(dat) - max_hist)
284 # first remove oldest "version > 0" files
285 while nremove > 0 and len(dat) > 0:
286 dfile, version, mtime = dat.pop(0)
287 if version > 0:
288 os.unlink(dfile)
289 nremove -= 1
291 dat = get_autosavefiles()
292 nremove = max(0, len(dat) - max_hist)
293 # then remove the oldest "version 0" files
295 while nremove > 0 and len(dat) > 0:
296 dfile, vers, mtime = dat.pop(0)
297 if vers == 0 and abs(mtime - time.time()) > 86400:
298 os.unlink(dfile)
299 nremove -= 1
301 def get_recentfiles(self, max=10):
302 return sorted(self.recentfiles, key=lambda x: x[0], reverse=True)[:max]
304 def recent_autosave_sessions(self):
305 "return list of (timestamp, name) for most recent autosave session files"
306 conf = self.get_config('autosave', {})
307 fileroot = conf.get('fileroot', 'autosave')
308 max_hist = int(conf.get('maxfiles', 10))
309 flist = []
310 for afile in os.listdir(self.larix_folder):
311 ffile = Path(self.larix_folder, afile).as_posix()
312 if ffile.endswith('.larix'):
313 flist.append((os.stat(ffile).st_mtime, ffile))
315 return sorted(flist, key=lambda x: x[0], reverse=True)[:max_hist]
318 def clear_session(self):
319 self.larch.eval("clear_session()")
320 self.filelist.Clear()
321 self.init_larch_session()
324 def write_message(self, msg, panel=0):
325 """write a message to the Status Bar"""
326 self.wxparent.statusbar.SetStatusText(msg, panel)
328 def close_all_displays(self):
329 "close all displays, as at exit"
330 self.symtable._plotter.close_all_displays()
332 def get_display(self, win=1, stacked=False,
333 size=None, position=None):
334 wintitle='Larch XAS Plot Window %i' % win
336 conf = self.get_config('plot')
337 opts = dict(wintitle=wintitle, stacked=stacked,
338 size=size, position=position, win=win)
339 opts.update(conf)
340 return self.symtable._plotter.get_display(**opts)
342 def set_focus(self, topwin=None):
343 """
344 set wx focus to main window or selected Window,
345 even after plot
346 """
347 if topwin is None:
348 topwin = wx.GetApp().GetTopWindow()
349 flist = self.filelist
350 else:
351 flist = getattr(topwin, 'filelist', topwin)
352 time.sleep(0.025)
353 topwin.Raise()
355 def get_group(self, groupname=None):
356 if groupname is None:
357 groupname = self.groupname
358 if groupname is None:
359 return None
360 dgroup = getattr(self.symtable, groupname, None)
361 if dgroup is None and groupname in self.file_groups:
362 groupname = self.file_groups[groupname]
363 dgroup = getattr(self.symtable, groupname, None)
365 if dgroup is None and len(self.file_groups) > 0:
366 gname = list(self.file_groups.keys())[0]
367 dgroup = getattr(self.symtable, gname, None)
368 return dgroup
370 def filename2group(self, filename):
371 "convert filename (as displayed) to larch group"
372 return self.get_group(self.file_groups[str(filename)])
374 def merge_groups(self, grouplist, master=None, yarray='mu', outgroup=None):
375 """merge groups"""
376 if master is None:
377 master = grouplist[0]
378 gmaster = self.get_group(master)
379 xarray = 'xplot' if gmaster.datatype=='xydata' else 'energy'
380 outgroup = fix_varname(outgroup.lower())
381 if outgroup is None:
382 outgroup = 'merged'
383 outgroup = unique_name(outgroup, self.file_groups, max=1000)
385 glist = "[%s]" % (', '.join(grouplist))
387 cmd = f"""{outgroup} = merge_groups({glist}, master={master},
388 xarray='{xarray}', yarray='{yarray}', kind='cubic', trim=True)"""
389 self.larch.eval(cmd)
391 this = self.get_group(outgroup)
392 if not hasattr(gmaster, 'config'):
393 self.init_group_config(gmaster)
394 if not hasattr(this, 'config'):
395 self.init_group_config(this)
396 this.config.xasnorm.update(gmaster.config.xasnorm)
397 this.datatype = gmaster.datatype
398 if xarray == 'energy':
399 this.xplot = 1.0*this.energy
400 this.yplot = 1.0*getattr(this, yarray)
401 this.yerr = getattr(this, 'd' + yarray, 1.0)
402 if yarray != 'mu':
403 this.mu = this.yplot
404 this.plot_xlabel = xarray
405 this.plot_ylabel = yarray
406 return this
408 def set_plot_erange(self, erange):
409 self.plot_erange = erange
411 def copy_group(self, filename, new_filename=None):
412 """copy XAS group (by filename) to new group"""
413 groupname = self.file_groups[filename]
414 if not hasattr(self.larch.symtable, groupname):
415 return
417 ogroup = self.get_group(groupname)
418 ngroup = larch.Group(datatype=ogroup.datatype, copied_from=groupname)
420 for attr in dir(ogroup):
421 val = getattr(ogroup, attr, None)
422 if val is not None:
423 setattr(ngroup, attr, deepcopy(val))
425 if new_filename is None:
426 new_filename = filename + '_1'
427 ngroup.filename = unique_name(new_filename, self.file_groups.keys())
428 ngroup.groupname = unique_name(groupname, self.file_groups.values())
429 ngroup.journal.add('source_desc', f"copied from '{filename:s}'")
430 setattr(self.larch.symtable, ngroup.groupname, ngroup)
431 return ngroup
433 def get_cursor(self, win=None):
434 """get last cursor from selected window"""
435 return last_cursor_pos(win=win, _larch=self.larch)
437 def plot_group(self, groupname=None, title=None, plot_yarrays=None,
438 new=True, **kws):
439 ppanel = self.get_display(stacked=False).panel
440 newplot = ppanel.plot
441 oplot = ppanel.oplot
442 plotcmd = oplot
443 viewlims = ppanel.get_viewlimits()
444 if new:
445 plotcmd = newplot
447 dgroup = self.get_group(groupname)
448 if not hasattr(dgroup, 'xplot'):
449 if hasattr(dgroup, 'xdat'):
450 dgroup.xplot = deepcopy(dgroup.xdat)
451 else:
452 print("Cannot plot group ", groupname)
454 if ((getattr(dgroup, 'plot_yarrays', None) is None or
455 getattr(dgroup, 'energy', None) is None or
456 getattr(dgroup, 'mu', None) is None)):
457 self.process(dgroup)
459 if plot_yarrays is None and hasattr(dgroup, 'plot_yarrays'):
460 plot_yarrays = dgroup.plot_yarrays
462 popts = kws
463 fname = Path(dgroup.filename).name
464 if not 'label' in popts:
465 popts['label'] = dgroup.plot_ylabel
467 popts['xlabel'] = dgroup.plot_xlabel
468 popts['ylabel'] = dgroup.plot_ylabel
469 if getattr(dgroup, 'plot_y2label', None) is not None:
470 popts['y2label'] = dgroup.plot_y2label
472 plot_extras = None
473 if new:
474 if title is None:
475 title = fname
476 plot_extras = getattr(dgroup, 'plot_extras', None)
478 popts['title'] = title
480 narr = len(plot_yarrays) - 1
481 for i, pydat in enumerate(plot_yarrays):
482 yaname, yopts, yalabel = pydat
483 popts.update(yopts)
484 if yalabel is not None:
485 popts['label'] = yalabel
486 popts['delay_draw'] = (i != narr)
488 plotcmd(dgroup.xplot, getattr(dgroup, yaname), **popts)
489 plotcmd = oplot
491 if plot_extras is not None:
492 axes = ppanel.axes
493 for etype, x, y, opts in plot_extras:
494 if etype == 'marker':
495 popts = {'marker': 'o', 'markersize': 4,
496 'label': '_nolegend_',
497 'markerfacecolor': 'red',
498 'markeredgecolor': '#884444'}
499 popts.update(opts)
500 axes.plot([x], [y], **popts)
501 elif etype == 'vline':
502 popts = {'ymin': 0, 'ymax': 1.0,
503 'color': '#888888'}
504 popts.update(opts)
505 axes.axvline(x, **popts)
506 ppanel.canvas.draw()
507 self.set_focus()