Coverage for larch/epics/ad_mca.py: 22%

191 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-16 21:04 +0000

import time

import numpy as np
from epics import PV, caget, caput, poll, Device, get_pv


# Default number of spectrum channels, used when the length of the
# spectrum cannot be read from the data PV.
MAX_CHAN = 4096

# Upper limit on the number of ROIs handled per MCA.
MAX_ROIS = 48

# Message for the ValueError raised when more than MAX_ROIS ROIs are requested.
TOOMANY_ROIS = 'Too many ROIS, only %i ROIS allowed.' % (MAX_ROIS,)


class ADMCAROI(Device):
    """
    MCA ROI using ROIStat plugin from areaDetector2,
    as used for Xspress3 detector.

    One instance wraps the PVs of a single ROI, named
    '<prefix>:<roi>:<attr>' (e.g. '<prefix>:1:MinX').

    Parameters:
    -----------
    prefix     PV prefix shared by all ROIs of one MCA
    roi        ROI index appended to the prefix (default=1)
    bgr_width  number of channels on each side of the ROI used to
               estimate the background in get_counts(net=True) (default=3)
    data_pv    optional PV object holding the spectrum; read by
               get_counts() when no data array is passed in
    with_poll  passed through to Device.__init__
    """

    _attrs = ('Use', 'Name', 'MinX', 'SizeX',
              'BgdWidth', 'SizeX_RBV', 'MinX_RBV',
              'Total_RBV', 'Net_RBV')

    _aliases = {'left': 'MinX',
                'width': 'SizeX',
                'name': 'Name',
                'sum': 'Total_RBV',
                'net': 'Net_RBV'}

    # Names kept as plain Python attributes rather than PVs (presumably
    # how Device uses this list -- confirm against pyepics).
    # 'bgr_width' is listed so that it can be stored on the instance.
    _nonpvs = ('_prefix', '_pvs', '_delim', '_init',
               '_aliases', 'data_pv', 'bgr_width')

    _reprfmt = "<ADMCAROI '%s', name='%s', range=[%s:%s]>"

    def __init__(self, prefix, roi=1, bgr_width=3, data_pv=None, with_poll=False):
        self._prefix = '%s:%i' % (prefix, roi)
        Device.__init__(self, self._prefix, delim=':',
                        attrs=('Name', 'MinX'), with_poll=with_poll)
        # Device.__init__ is called without aliases, so re-install the
        # class-level alias table on the instance.
        self._aliases = dict(ADMCAROI._aliases)

        self.data_pv = data_pv
        # Previously the argument was accepted but never stored, so
        # get_counts(net=True) failed when it read self.bgr_width.
        self.bgr_width = bgr_width

    def __eq__(self, other):
        """used for comparisons: equal when MinX, SizeX and BgdWidth all match"""
        return (self.MinX == getattr(other, 'MinX', None) and
                self.SizeX == getattr(other, 'SizeX', None) and
                self.BgdWidth == getattr(other, 'BgdWidth', None))

    def __ne__(self, other):
        return not self.__eq__(other)

    # Ordering is by lower channel only; this is what sorted() relies on
    # in ADMCA.sort_rois().
    def __lt__(self, other):
        return self.MinX < getattr(other, 'MinX', None)

    def __le__(self, other):
        return self.MinX <= getattr(other, 'MinX', None)

    def __gt__(self, other):
        return self.MinX > getattr(other, 'MinX', None)

    def __ge__(self, other):
        return self.MinX >= getattr(other, 'MinX', None)

    def __repr__(self):
        "string representation"
        pref = self._prefix
        if pref.endswith('.'):
            pref = pref[:-1]

        return self._reprfmt % (pref, self.Name, self.MinX,
                                self.MinX+self.SizeX)

    def get_right(self):
        """return the upper ROI limit, MinX + SizeX"""
        return self.MinX + self.SizeX

    def set_right(self, val):
        """set the upper ROI limit (adjusting size, leaving left unchanged)"""
        self._pvs['SizeX'].put(val - self.MinX)

    right = property(get_right, set_right)

    def get_center(self):
        """return the ROI center channel, rounded to an integer"""
        return int(round(self.MinX + self.SizeX/2.0))

    def set_center(self, val):
        """set the ROI center (adjusting left, leaving width unchanged)"""
        self._pvs['MinX'].put(int(round(val - self.SizeX/2.0)))

    center = property(get_center, set_center)

    def clear(self):
        """reset the ROI: empty name, zero start and zero size"""
        self.Name = ''
        self.MinX = 0
        self.SizeX = 0

    def get_counts(self, data=None, net=False):
        """
        calculate total and net counts for a spectra

        Parameters:
        -----------
        data   numpy array of spectra or None to read from PV
        net    bool to set net counts (default=False: total counts returned)

        Returns:
        --------
        If a numpy array is available (passed in, or read from data_pv),
        the counts summed from that array; otherwise the value of
        Net_RBV (net=True) or Total_RBV (net=False).
        """
        if data is None and self.data_pv is not None:
            data = self.data_pv.get()

        if not isinstance(data, np.ndarray):
            # no spectrum to sum: use the values reported by the plugin
            return self.Net_RBV if net else self.Total_RBV

        lo = self.MinX
        hi = self.MinX + self.SizeX
        # NOTE(review): the sum covers channels lo..hi inclusive (hi-lo+1
        # channels) while the background below is scaled by hi-lo;
        # kept as in the original -- confirm which is intended.
        out = data[lo:hi+1].sum()
        if net:
            wid = int(self.bgr_width)
            jlo = max((lo - wid), 0)
            jhi = min((hi + wid), len(data)-1) + 1
            bgr_chans = np.concatenate((data[jlo:lo], data[hi+1:jhi]))
            # With no channels outside the ROI (zero width, or ROI spanning
            # the whole spectrum) the mean would be NaN: skip the correction.
            if bgr_chans.size > 0:
                out = out - bgr_chans.mean()*(hi-lo)
        return out


class ADMCA(Device):
    """
    MCA using ROIStat plugin from areaDetector2,
    as used for Xspress3 detector.

    Parameters:
    -----------
    prefix      PV prefix for the detector ('AcquireTime', 'Acquire' and
                'NumImages' are appended with no delimiter)
    data_pv     name of the PV holding the spectrum, or None
    nrois       number of ROIs to look for (default: MAX_ROIS)
    roi_prefix  PV prefix for the ROIs ('<roi_prefix>:<i>:Name', ...),
                or None for no ROIs
    """
    _attrs = ('AcquireTime', 'Acquire', 'NumImages')
    _nonpvs = ('_prefix', '_pvs', '_delim', '_roi_prefix',
               '_npts', 'rois', '_nrois', '_calib')
    # energy calibration (offset, slope, quad), see get_energy()
    _calib = (0.00, 0.01, 0.00)

    def __init__(self, prefix, data_pv=None, nrois=None, roi_prefix=None):

        self._prefix = prefix
        Device.__init__(self, self._prefix, delim='',
                        attrs=self._attrs, with_poll=False)
        if data_pv is not None:
            self._pvs['VAL'] = PV(data_pv, auto_monitor=False)
        self._npts = None
        self._nrois = nrois
        if self._nrois is None:
            self._nrois = MAX_ROIS

        self._roi_prefix = roi_prefix
        # Request the ROI PVs up front (presumably so connections are
        # already under way before get_rois() reads them). Skipped
        # without a ROI prefix, which would otherwise create PVs named
        # 'None:1:Name' etc.
        if roi_prefix is not None:
            for i in range(self._nrois):
                for field in ('Name', 'MinX', 'SizeX'):
                    get_pv('%s:%i:%s' % (roi_prefix, i+1, field))
        self.get_rois()
        poll()

    def start(self):
        "Start AD MCA"
        self.Acquire = 1
        poll()
        return self.Acquire

    def stop(self):
        "Stop AD MCA"
        self.Acquire = 0
        return self.Acquire

    def get_calib(self):
        """get energy calibration tuple (offset, slope, quad)"""
        return self._calib

    def get_energy(self):
        """
        return energy for AD MCA

        The energy array is offset + slope*i + quad*i*i for channel
        index i. The number of channels is the length of the spectrum
        read from the data PV (cached after the first read), or
        MAX_CHAN when no data PV was given.
        """
        if self._npts is None and self._pvs.get('VAL') is not None:
            self._npts = len(self.get('VAL'))
        npts = MAX_CHAN if self._npts is None else self._npts
        en = np.arange(npts, dtype='f8')
        cal = self._calib
        return cal[0] + en*(cal[1] + en*cal[2])

    def clear_rois(self, nrois=None):
        "clear all rois (nrois is accepted but not used)"
        if self.rois is None:
            self.get_rois()
        for roi in self.rois:
            roi.clear()
        self.rois = []

    def get_rois(self, nrois=None):
        """
        get all rois

        Rebuilds self.rois by reading ROIs 1..nrois (default: the number
        given at construction) and returns the list. Reading stops at
        the first ROI with an empty name, MinX <= 0 or SizeX <= 0.
        ROIs whose Name cannot be read are skipped.
        """
        self.rois = []
        # .get(): there is no 'VAL' entry when no data PV was given
        data_pv = self._pvs.get('VAL')
        poll()
        if data_pv is not None:
            data_pv.connect()
        prefix = self._roi_prefix
        if prefix is None:
            return self.rois

        if nrois is None:
            nrois = self._nrois
        for i in range(nrois):
            roi = ADMCAROI(prefix=prefix, roi=i+1, data_pv=data_pv)
            if roi.Name is None:
                # retry once, this time polling while the Device is built
                roi = ADMCAROI(prefix=prefix, roi=i+1,
                               data_pv=data_pv, with_poll=True)
                if roi.Name is None:
                    continue
            if len(roi.Name.strip()) > 0 and roi.MinX > 0 and roi.SizeX > 0:
                self.rois.append(roi)
            else:
                break
        poll(0.001, 1.0)
        return self.rois

    def del_roi(self, roiname):
        "delete an roi by name (case-insensitive, surrounding blanks ignored)"
        if self.rois is None:
            self.get_rois()
        target = roiname.strip().lower()
        for roi in self.rois:
            if roi.Name.strip().lower() == target:
                roi.clear()
        poll(0.010, 1.0)
        self.sort_rois()

    def add_roi(self, roiname, lo, wid=None, hi=None, sort=True):
        """
        add an roi, given name, lo, and hi channels.

        Parameters:
        -----------
        roiname  name for the ROI (surrounding blanks are stripped)
        lo       lower channel
        wid      width in channels, used only when hi is None
        hi       upper channel (takes precedence over wid)
        sort     whether to call sort_rois() afterwards (default=True)

        Does nothing if lo is None or if both hi and wid are None.
        Raises ValueError if this would exceed MAX_ROIS ROIs.
        """
        if lo is None or (hi is None and wid is None):
            return
        if self.rois is None:
            self.get_rois()

        # ROI indices are 1-based: the new ROI goes after the current ones
        iroi = len(self.rois) + 1
        if iroi > MAX_ROIS:
            raise ValueError(TOOMANY_ROIS)

        data_pv = self._pvs.get('VAL')
        roi = ADMCAROI(prefix=self._roi_prefix, roi=iroi, data_pv=data_pv)
        roi.Name = roiname.strip()

        # Channel limit: the cached or freshly read spectrum length when
        # available, else MAX_CHAN.
        if self._npts is None and data_pv is not None:
            self._npts = len(self.get('VAL'))
        nmax = MAX_CHAN if self._npts is None else self._npts

        # Compute the size from the clamped lower limit held locally,
        # not from a read-back of the MinX PV that was just written.
        lo = min(nmax-1, lo)
        if hi is not None:
            size = min(nmax, hi) - lo
        else:
            size = min(nmax, wid + lo) - lo
        roi.MinX = lo
        roi.SizeX = size

        self.rois.append(roi)
        if sort:
            self.sort_rois()

    def sort_rois(self):
        """
        make sure rois are sorted, and Epics PVs are cleared

        Keeps ROIs with a non-empty name and right > 0, sorts them by
        MinX, writes them back to ROI slots 1..n, clears slot n+1,
        and then re-reads the ROI list with get_rois().
        """
        if self.rois is None:
            self.get_rois()

        poll(0.05, 1.0)
        unsorted = []
        empties = 0
        for roi in self.rois:
            if len(roi.Name) > 0 and roi.right > 0:
                unsorted.append(roi)
            else:
                # was 'empties =+ 1', which never counted past 1
                empties += 1
                if empties > 3:
                    break

        self.rois = sorted(unsorted)
        rpref = self._roi_prefix
        # Snapshot the values before writing: the ROI objects read the
        # same PVs that are about to be overwritten.
        roidat = [(r.Name, r.MinX, r.SizeX) for r in self.rois]

        for iroi, (name, left, size) in enumerate(roidat):
            caput("%s:%i:Name" % (rpref, iroi+1), name)
            caput("%s:%i:MinX" % (rpref, iroi+1), left)
            caput("%s:%i:SizeX" % (rpref, iroi+1), size)

        # Clear the slot after the last ROI so get_rois() stops there;
        # skipped when every configured slot is already in use.
        nfilled = len(roidat)
        if nfilled < self._nrois:
            caput("%s:%i:Name" % (rpref, nfilled+1), '')
            caput("%s:%i:MinX" % (rpref, nfilled+1), 0)
            caput("%s:%i:SizeX" % (rpref, nfilled+1), 0)
        self.get_rois()

    def set_rois(self, roidata):
        """
        set all rois from list/tuple of (Name, Lo, Hi),
        and ensures they are ordered and contiguous.

        Entries with an empty name, hi <= lo or hi <= 0 are ignored.
        Raises ValueError if more than MAX_ROIS valid entries are given.
        """
        iroi = 0
        self.clear_rois()
        for name, lo, hi in roidata:
            if len(name) > 0 and hi > lo and hi > 0:
                iroi += 1
                # '>' (not '>=') so that exactly MAX_ROIS ROIs are
                # accepted, matching the limit check in add_roi()
                if iroi > MAX_ROIS:
                    raise ValueError(TOOMANY_ROIS)
                self.add_roi(name, lo, hi=hi, sort=False)
        self.sort_rois()