Coverage for larch/epics/xrf_detectors.py: 18%

385 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

1import time 

2import numpy as np 

3from functools import partial 

4 

5try: 

6 import epics 

7 from epics.devices.mca import MultiXMAP 

8 from epics.devices.struck import Struck 

9 from epics.wx import EpicsFunction, DelayedEpicsCallback 

10 HAS_EPICS = True 

11except (NameError, ImportError, AttributeError): 

12 HAS_EPICS = False 

13 EpicsFunction = lambda fcn: fcn 

14 DelayedEpicsCallback = lambda fcn: fcn 

15 

16from .xspress3 import Xspress3, Xspress310 

17 

18from ..xrf import MCA, ROI, Environment 

19 

20def save_gsemcafile(filename, mcas, rois, environ=None): 

21 """save GSECARS-style MCA file 

22 

23 rois = self._xsp3.get_rois() 

24 realtime = self._xsp3.AcquireTime * self._xsp3.ArrayCounter_RBV 

25 nelem = len(self._xsp3.mcas) 

26 mcas = [self.get_mca(mca=i+1, with_rois=False) for i in range(nelem)] 

27 

28 npts = len(mcas[0].counts) 

29 nrois = len(rois[0]) 

30 nelem = len(mcas) 

31 """ 

32 nelem = len(mcas) 

33 npts = len(mcas[0].counts) 

34 nrois = len(rois[0]) 

35 

36 s_rtime = " ".join(["%.4f" % m.real_time for m in mcas]) 

37 s_off = " ".join(["%.6f" % m.offset for m in mcas]) 

38 s_quad = " ".join(["%.6f" % m.quad for m in mcas]) 

39 s_slope = " ".join(["%.6f" % m.slope for m in mcas]) 

40 s_rois = " ".join(["%i" % nrois for m in mcas]) 

41 

42 buff = [] 

43 buff.append('VERSION: 3.1') 

44 buff.append('ELEMENTS: %i' % nelem) 

45 buff.append('DATE: %s' % time.ctime()) 

46 buff.append('CHANNELS: %i' % npts) 

47 buff.append('REAL_TIME: %s' % s_rtime) 

48 buff.append('LIVE_TIME: %s' % s_rtime) 

49 buff.append('CAL_OFFSET: %s' % s_off) 

50 buff.append('CAL_SLOPE: %s' % s_slope) 

51 buff.append('CAL_QUAD: %s' % s_quad) 

52 

53 # Write ROIS in channel units 

54 buff.append('ROIS: %s' % s_rois) 

55 for iroi, roi in enumerate(rois[0]): 

56 name = [roi.name] 

57 left = ['%i' % roi.left] 

58 right = ['%i' % roi.right] 

59 for other in rois[1:]: 

60 name.append(other[iroi].name) 

61 left.append('%i' % other[iroi].left) 

62 right.append('%i' % other[iroi].right) 

63 name = '& '.join(name) 

64 left = '& '.join(left) 

65 right = '& '.join(right) 

66 buff.append('ROI_%i_LEFT: %s' % (iroi, left)) 

67 buff.append('ROI_%i_RIGHT: %s' % (iroi, right)) 

68 buff.append('ROI_%i_LABEL: %s' % (iroi, name)) 

69 

70 # environment 

71 if environ is None: 

72 environ = [] 

73 for addr, val, desc in environ: 

74 buff.append('ENVIRONMENT: %s="%s" (%s)' % (addr, val, desc)) 

75 

76 # data 

77 buff.append('DATA: ') 

78 for i in range(npts): 

79 x = ['%i' % m.counts[i] for m in mcas] 

80 buff.append("%s" % ' '.join(x)) 

81 

82 # write file 

83 buff.append('') 

84 fp = open(filename, 'w') 

85 fp.write("\n".join(buff)) 

86 fp.close() 

87 

88class Epics_Xspress3(object): 

89 """ 

90 multi-element MCA detector using Quantum Xspress3 electronics 

91 and Epics IOC based on AreaDetector2 IOC (3.2?) 

92 """ 

93 MIN_FRAMETIME = 0.25 

94 MAX_FRAMES = 12000 

95 def __init__(self, prefix=None, nmca=4, version=2, use_sum=True, **kws): 

96 self.nmca = nmca 

97 self.prefix = prefix 

98 self.version = version 

99 self.mca_array_name = 'MCASUM%i:ArrayData' 

100 if not use_sum: 

101 self.mca_array_name = 'MCA%i:ArrayData' 

102 if version < 2: 

103 self.mca_array_name = 'ARRSUM%i:ArrayData' 

104 self.environ = [] 

105 self.mcas = [] 

106 self.npts = 4096 

107 self.energies = [] 

108 self.connected = False 

109 self.elapsed_real = None 

110 self.elapsed_textwidget = None 

111 self.needs_refresh = False 

112 self._xsp3 = None 

113 if self.prefix is not None: 

114 self.connect() 

115 

116 self.nframes = 1 

117 self.frametime = 1.0 

118 

119 # determine max frames 

120 self.frametime = self.MIN_FRAMETIME 

121 self._xsp3._pvs['NumImages'].put(self.MAX_FRAMES, wait=True) 

122 time.sleep(0.05) 

123 rbv = self._xsp3.NumImages_RBV 

124 while rbv != self.MAX_FRAMES: 

125 self.MAX_FRAMES = self.MAX_FRAMES - 500.0 

126 self._xsp3._pvs['NumImages'].put(self.MAX_FRAMES, wait=True) 

127 time.sleep(0.1) 

128 rbv = self._xsp3.NumImages_RBV 

129 if self.MAX_FRAMES < 4000: 

130 break 

131 

132 # @EpicsFunction 

133 def connect(self): 

134 Creator = Xspress3 

135 if self.version < 2: 

136 Creator = Xspress310 

137 self._xsp3 = Creator(self.prefix, nmca=self.nmca) 

138 

139 counterpv = self._xsp3.PV('ArrayCounter_RBV') 

140 counterpv.clear_callbacks() 

141 counterpv.add_callback(self.onRealTime) 

142 for imca in range(1, self.nmca+1): 

143 self._xsp3.PV(self.mca_array_name % imca, auto_monitor=False) 

144 time.sleep(0.001) 

145 self.connected = True 

146 self.mcas = self._xsp3.mcas 

147 

148 @EpicsFunction 

149 def connect_displays(self, status=None, elapsed=None, deadtime=None): 

150 if elapsed is not None: 

151 self.elapsed_textwidget = elapsed 

152 if status is not None: 

153 pvs = self._xsp3._pvs 

154 attr = 'StatusMessage_RBV' 

155 pvs[attr].add_callback(partial(self.update_widget, wid=status)) 

156 

157 @DelayedEpicsCallback 

158 def update_widget(self, pvname, char_value=None, wid=None, **kws): 

159 if wid is not None: 

160 wid.SetLabel(char_value) 

161 

162 @DelayedEpicsCallback 

163 def onRealTime(self, pvname, value=None, **kws): 

164 self.elapsed_real = value * self.frametime 

165 self.needs_refresh = True 

166 if self.elapsed_textwidget is not None: 

167 self.elapsed_textwidget.SetLabel(" %8.2f" % self.elapsed_real) 

168 

169 def get_deadtime(self, mca=1): 

170 """return % deadtime""" 

171 try: 

172 dval = self._xsp3.get("C%iSCA:10:Value_RBV" % (mca)) 

173 except: 

174 dval = 0.0 

175 return dval 

176 

177 def get_dtfactor(self, mca=1): 

178 """return deadtime correction factor""" 

179 try: 

180 dval = self._xsp3.get("C%iSCA:9:Value_RBV" % (mca)) 

181 except: 

182 dval = 1.0 

183 return dval 

184 

185 def set_usesum(self, use_sum=True): 

186 self.mca_array_name = 'MCASUM%i:ArrayData' 

187 if not use_sum: 

188 self.mca_array_name = 'MCA%i:ArrayData' 

189 

190 def set_dwelltime(self, dtime=1.0, nframes=None, **kws): 

191 self._xsp3.useInternalTrigger() 

192 self._xsp3.FileCaptureOff() 

193 

194 if nframes is None: 

195 # count forever, or close to it 

196 frametime = self.MIN_FRAMETIME 

197 if dtime < self.MIN_FRAMETIME: 

198 nframes = self.MAX_FRAMES 

199 elif dtime > self.MAX_FRAMES*self.MIN_FRAMETIME: 

200 nframes = self.MAX_FRAMES 

201 frametime = 1.0*dtime/nframes 

202 else: 

203 nframes = int((dtime+frametime*0.1)/frametime) 

204 else: 

205 frametime = dtime 

206 

207 self._xsp3.NumImages = self.nframes = nframes 

208 self._xsp3.AcquireTime = self.frametime = frametime 

209 

210 def get_frametime(self): 

211 self.nframes = self._xsp3.NumImages 

212 self.frametime = self._xsp3.AcquireTime 

213 return self.frametime, self.nframes 

214 

215 def get_mca(self, mca=1, with_rois=True): 

216 if self._xsp3 is None: 

217 self.connect() 

218 time.sleep(0.5) 

219 

220 emca = self._xsp3.mcas[mca-1] 

221 if with_rois: 

222 emca.get_rois() 

223 

224 counts = self.get_array(mca=mca) 

225 if max(counts) < 1.0: 

226 counts = 0.5*np.ones(len(counts)) 

227 counts[0] = 2.0 

228 

229 thismca = MCA(counts=counts, offset=0.0, slope=0.01) 

230 thismca.energy = self.get_energy() 

231 thismca.dt_factor = self.get_dtfactor() 

232 thismca.counts = counts 

233 thismca.quad = 0.0 

234 thismca.rois = [] 

235 if with_rois: 

236 for eroi in emca.rois: 

237 thismca.rois.append(ROI(name=eroi.name, address=eroi._prefix, 

238 left=eroi.left, right=eroi.right)) 

239 return thismca 

240 

241 def get_energy(self, mca=1): 

242 return np.arange(self.npts)*.010 

243 

244 def get_array(self, mca=1): 

245 try: 

246 out = 1.0*self._xsp3.get(self.mca_array_name % mca) 

247 except TypeError: 

248 out = np.arange(self.npts)*0.91 

249 

250 if len(out) < 1: 

251 out = np.ones(self.npts)*1.7 + np.sin(np.arange(self.npts)/177.0)*0.8 

252 

253 

254 

255 if len(out) != self.npts and len(out)>0: 

256 self.npts = len(out) 

257 out[np.where(out<0.91)]= 0.91 

258 return out 

259 

260 def start(self, erase=True): 

261 'xspress3 start ' 

262 self.stop() 

263 if erase: 

264 self._xsp3.ERASE = 1 

265 time.sleep(0.01) 

266 

267 return self._xsp3.start(capture=False) 

268 

269 def stop(self, timeout=0.5): 

270 self._xsp3.stop() 

271 t0 = time.time() 

272 while self._xsp3.Acquire_RBV == 1 and time.time()-t0 < timeout: 

273 self._xsp3.stop() 

274 time.sleep(0.005) 

275 

276 def erase(self): 

277 self.stop() 

278 self._xsp3.ERASE = 1 

279 

280 def del_roi(self, roiname): 

281 for mca in self._xsp3.mcas: 

282 mca.del_roi(roiname) 

283 self.rois = self._xsp3.mcas[0].get_rois() 

284 

285 def add_roi(self, roiname, lo=-1, hi=-1): 

286 for mca in self._xsp3.mcas: 

287 mca.add_roi(roiname, lo=lo, hi=hi) 

288 self.rois = self._xsp3.mcas[0].get_rois() 

289 

290 @EpicsFunction 

291 def rename_roi(self, i, newname): 

292 for mca in self._xsp3.mcas: 

293 roi = mca.rois[i] 

294 roi.Name = newname 

295 

296 def restore_rois(self, roifile): 

297 self._xsp3.restore_rois(roifile) 

298 self.rois = self._xsp3.mcas[0].get_rois() 

299 

300 def save_rois(self, roifile): 

301 buff = self._xsp3.roi_calib_info() 

302 with open(roifile, 'w') as fout: 

303 fout.write("%s\n" % "\n".join(buff)) 

304 

305 def save_columnfile(self, filename, headerlines=None): 

306 "write summed counts to simple ASCII column file for mca counts" 

307 f = open(filename, "w+") 

308 f.write("#XRF counts for %s\n" % self.name) 

309 if headerlines is not None: 

310 for i in headerlines: 

311 f.write("#%s\n" % i) 

312 f.write("#\n") 

313 f.write("#EnergyCalib.offset = %.9g \n" % self.offset) 

314 f.write("#EnergyCalib.slope = %.9g \n" % self.slope) 

315 f.write("#EnergyCalib.quad = %.9g \n" % self.quad) 

316 f.write("#Acquire.RealTime = %.9g \n" % self.real_time) 

317 f.write("#Acquire.LiveTime = %.9g \n" % self.live_time) 

318 roiform = "#ROI_%i '%s': [%i, %i]\n" 

319 for i, r in enumerate(self.rois): 

320 f.write(roiform % (i+1, r.name, r.left, r.right)) 

321 

322 f.write("#-----------------------------------------\n") 

323 f.write("# energy counts log_counts\n") 

324 

325 for e, d in zip(self.energy, self.counts): 

326 dlog = 0. 

327 if d > 0: dlog = np.log10(max(d, 1)) 

328 f.write(" %10.4f %12i %12.6g\n" % (e, d, dlog)) 

329 f.write("\n") 

330 f.close() 

331 

332 def save_mcafile(self, filename, environ=None): 

333 """write MultiChannel MCA file 

334 

335 Parameters: 

336 ----------- 

337 * filename: output file name 

338 """ 

339 rois = self._xsp3.get_rois() 

340 nelem = len(self._xsp3.mcas) 

341 mcas = [self.get_mca(mca=i+1, with_rois=False) for i in range(nelem)] 

342 

343 realtime = self._xsp3.AcquireTime * self._xsp3.ArrayCounter_RBV 

344 for m in mcas: 

345 m.real_time = realtime 

346 m.live_time = realtime 

347 

348 save_gsemcafile(filename, mcas, rois, environ=environ) 

349 

350 

351class Epics_MultiXMAP(object): 

352 """multi-element MCA detector using XIA xMAP electronics 

353 and epics dxp 3.x series of software 

354 

355 mcas list of MCA objects 

356 

357 connect() 

358 set_dwelltime(dtime=0) 

359 start() 

360 stop() 

361 erase() 

362 add_roi(roiname, lo, hi) 

363 del_roi(roiname) 

364 clear_rois() 

365 save_ascii(filename) 

366 get_energy(mca=1) 

367 get_array(mca=1) 

368 """ 

369 def __init__(self, prefix=None, nmca=4, **kws): 

370 self.nmca = nmca 

371 self.prefix = prefix 

372 self.mcas = [] 

373 self.energies = [] 

374 self.connected = False 

375 self.elapsed_real = None 

376 self.elapsed_textwidget = None 

377 self.needs_refresh = False 

378 self.frametime = 1.0 

379 self.nframes = 1 

380 if self.prefix is not None: 

381 self.connect() 

382 

383 # @EpicsFunction 

384 def connect(self): 

385 self._xmap = MultiXMAP(self.prefix, nmca=self.nmca) 

386 self._xmap.PV('ElapsedReal', callback=self.onRealTime) 

387 self._xmap.PV('DeadTime') 

388 time.sleep(0.001) 

389 self.mcas = self._xmap.mcas 

390 self.connected = True 

391 # self._xmap.SpectraMode() 

392 self.rois = self._xmap.mcas[0].get_rois() 

393 

394 @EpicsFunction 

395 def connect_displays(self, status=None, elapsed=None, deadtime=None): 

396 pvs = self._xmap._pvs 

397 if elapsed is not None: 

398 self.elapsed_textwidget = elapsed 

399 for wid, attr in ((status, 'Acquiring'),(deadtime, 'DeadTime')): 

400 if wid is not None: 

401 pvs[attr].add_callback(partial(self.update_widget, wid=wid)) 

402 

403 @DelayedEpicsCallback 

404 def update_widget(self, pvname, char_value=None, wid=None, **kws): 

405 if wid is not None: 

406 wid.SetLabel(char_value) 

407 

408 @DelayedEpicsCallback 

409 def onRealTime(self, pvname, value=None, char_value=None, **kws): 

410 self.elapsed_real = value 

411 self.needs_refresh = True 

412 self.frametime = value 

413 if self.elapsed_textwidget is not None: 

414 self.elapsed_textwidget.SetLabel(" %8.2f" % value) 

415 

416 def set_usesum(self, usesum=True): 

417 pass 

418 

419 def get_deadtime(self, mca=1): 

420 """return deadtime info""" 

421 return self._xmap.get("DeadTime") 

422 

423 def get_dtfactor(self, mca=1): 

424 """return deadtime correction factor""" 

425 ocr = self._xmap.get("OutputCountRate") 

426 icr = self._xmap.get("InputCountRate") 

427 if icr < 1 or ocr < 1: 

428 return 1 

429 return icr/ocr 

430 

431 def set_dwelltime(self, dtime=0): 

432 if dtime <= 0.1: 

433 self._xmap.PresetMode = 0 

434 else: 

435 self._xmap.PresetMode = 1 

436 self._xmap.PresetReal = dtime 

437 

438 def start(self): 

439 return self._xmap.start() 

440 

441 def stop(self): 

442 return self._xmap.stop() 

443 

444 def erase(self): 

445 self._xmap.put('EraseAll', 1) 

446 

447 def get_array(self, mca=1): 

448 out = 1.0*self._xmap.mcas[mca-1].get('VAL') 

449 out[np.where(out<0.91)]= 0.91 

450 return out 

451 

452 def get_energy(self, mca=1): 

453 return self._xmap.mcas[mca-1].get_energy() 

454 

455 def get_mca(self, mca=1, with_rois=True): 

456 """return an MCA object """ 

457 emca = self._xmap.mcas[mca-1] 

458 if with_rois: 

459 emca.get_rois() 

460 counts = self.get_array(mca=mca) 

461 if max(counts) < 1.0: 

462 counts = 0.5*np.ones(len(counts)) 

463 counts[0] = 2.0 

464 

465 thismca = MCA(counts=counts, offset=emca.CALO, slope=emca.CALS) 

466 thismca.energy = emca.get_energy() 

467 thismca.counts = counts 

468 thismca.real_time = emca.ERTM 

469 thismca.live_time = emca.ELTM 

470 thismca.rois = [] 

471 if with_rois: 

472 for eroi in emca.rois: 

473 thismca.rois.append(ROI(name=eroi.NM, address=eroi.address, 

474 left=eroi.LO, right=eroi.HI)) 

475 return thismca 

476 

477 def clear_rois(self): 

478 for mca in self._xmap.mcas: 

479 mca.clear_rois() 

480 self.rois = self._xmap.mcas[0].get_rois() 

481 

482 @EpicsFunction 

483 def del_roi(self, roiname): 

484 for mca in self._xmap.mcas: 

485 mca.del_roi(roiname) 

486 self.rois = self._xmap.mcas[0].get_rois() 

487 

488 def add_roi(self, roiname, lo=-1, hi=-1): 

489 calib = self._xmap.mcas[0].get_calib() 

490 for mca in self._xmap.mcas: 

491 mca.add_roi(roiname, lo=lo, hi=hi, calib=calib) 

492 self.rois = self._xmap.mcas[0].get_rois() 

493 

494 @EpicsFunction 

495 def rename_roi(self, i, newname): 

496 roi = self._xmap.mcas[0].rois[i] 

497 roi.NM = newname 

498 rootname = roi._prefix 

499 for imca in range(1, len(self._xmap.mcas)): 

500 pvname = rootname.replace('mca1', 'mca%i' % (1+imca)) 

501 epics.caput(pvname+'NM', newname) 

502 

503 def restore_rois(self, roifile): 

504 self._xmap.restore_rois(roifile) 

505 self.rois = self._xmap.mcas[0].get_rois() 

506 

507 def save_rois(self, roifile): 

508 buff = self._xmap.roi_calib_info() 

509 with open(roifile, 'w') as fout: 

510 fout.write("%s\n" % "\n".join(buff)) 

511 

512 def save_mca(self, fname): 

513 buff = self._xmap 

514 

515 def save_mcafile(self, filename, environ=None): 

516 """write MultiChannel MCA file 

517 

518 Parameters: 

519 ----------- 

520 * filename: output file name 

521 """ 

522 nelem = len(self.mcas) 

523 rois = self._xmap.get_rois() 

524 mcas = [self.get_mca(mca=i+1, with_rois=False) for i in range(nelem)] 

525 

526 

527 save_gsemcafile(filename, mcas, rois, environ=environ)