Source code for binoculars.backends.id03_xu

"""
BINocular backend for beamline ID03:EH2
This backend should serve as a basic example of a backend based on
xrayutilities [1]. It still uses PyMCA for parsing the spec,edf files.
The 'original' ID03 backend was used as a template.

Created on 2014-10-16

[1] http://xrayutilities.sourceforge.net/

author: Dominik Kriegner (dominik.kriegner@gmail.com)
"""

import sys
import os
import glob
import numpy

import xrayutilities as xu

from PyMca5.PyMca import EdfFile, specfile, specfilewrapper

from .. import backend, errors, util


[docs] class HKLProjection(backend.ProjectionBase): # scalars: mu, theta, [chi, phi, "omitted"] delta, gamR, gamT, ty, wavelength # 3x3 matrix: UB
[docs] def project(self, mu, theta, delta, gamR, gamT, ty, wavelength, UB, qconv): qconv.wavelength = wavelength h, k, l = qconv.area( mu, theta, mu, delta, ty, gamT, gamR, UB=UB.reshape((3, 3)) ) return (h, k, l)
[docs] def get_axis_labels(self): return "H", "K", "L"
[docs] class HKProjection(HKLProjection):
[docs] def project(self, mu, theta, delta, gamR, gamT, ty, wavelength, UB, qconv): H, K, L = super().project( mu, theta, delta, gamR, gamT, ty, wavelength, UB, qconv ) return (H, K)
[docs] def get_axis_labels(self): return "H", "K"
[docs] class QProjection(backend.ProjectionBase):
[docs] def project(self, mu, theta, delta, gamR, gamT, ty, wavelength, UB, qconv): qconv.wavelength = wavelength qx, qy, qz = qconv.area( mu, theta, mu, delta, ty, gamT, gamR, UB=numpy.identity(3) ) return (qx, qy, qz)
[docs] def get_axis_labels(self): return "qx", "qy", "qz"
[docs] class ID03Input(backend.InputBase): # OFFICIAL API
[docs] def generate_jobs(self, command): scans = util.parse_multi_range(",".join(command).replace(" ", ",")) if not len(scans): sys.stderr.write("error: no scans selected, nothing to do\n") for scanno in scans: scan = self.get_scan(scanno) try: pointcount = scan.lines() except specfile.error: # no points continue next(self.get_images(scan, 0, pointcount - 1, dry_run=True)) # dryrun if ( self.config.target_weight and pointcount > self.config.target_weight * 1.4 ): for s in util.chunk_slicer(pointcount, self.config.target_weight): yield backend.Job( scan=scanno, firstpoint=s.start, lastpoint=s.stop - 1, weight=s.stop - s.start, ) else: yield backend.Job( scan=scanno, firstpoint=0, lastpoint=pointcount - 1, weight=pointcount, )
[docs] def process_job(self, job): super().process_job(job) scan = self.get_scan(job.scan) scanparams = self.get_scan_params(scan) # wavelength, UB pointparams = self.get_point_params( scan, job.firstpoint, job.lastpoint ) # 1D array of diffractometer angles + mon + transm images = self.get_images(scan, job.firstpoint, job.lastpoint) # iterator! for pp, image in zip(pointparams, images): yield self.process_image(scanparams, pp, image)
[docs] def parse_config(self, config): super().parse_config(config) self.config.xmask = util.parse_multi_range(config.pop("xmask")) self.config.ymask = util.parse_multi_range(config.pop("ymask")) self.config.specfile = config.pop("specfile") self.config.imagefolder = config.pop("imagefolder", None) self.config.UB = config.pop("ub", None) if self.config.UB: self.config.UB = util.parse_tuple(self.config.UB, length=9, type=float) self.config.sdd = float(config.pop("sdd")) self.config.pixelsize = util.parse_tuple( config.pop("pixelsize"), length=2, type=float ) self.config.centralpixel = util.parse_tuple( config.pop("centralpixel"), length=2, type=int )
[docs] def get_destination_options(self, command): if not command: return False command = ",".join(command).replace(" ", ",") scans = util.parse_multi_range(command) return dict(first=min(scans), last=max(scans), range=",".join(command))
# CONVENIENCE FUNCTIONS _spec = None
[docs] def get_scan(self, scannumber): if self._spec is None: self._spec = specfilewrapper.Specfile(self.config.specfile) return self._spec.select(f"{scannumber}.1")
[docs] def find_edfs(self, pattern, scanno): files = glob.glob(pattern) ret = {} for file in files: try: filename = os.path.basename(file).split(".")[0] scan, point, image = filename.split("_")[-3:] scan, point, image = int(scan), int(point), int(image) if scan == scanno and point not in list(ret.keys()): ret[point] = file except ValueError: continue return ret
[docs] @staticmethod def apply_mask(data, xmask, ymask): roi = data[ymask, :] return roi[:, xmask]
# MAIN LOGIC
[docs] def get_scan_params(self, scan): UB = numpy.array(scan.header("G")[2].split(" ")[-9:], dtype=numpy.float) wavelength = float(scan.header("G")[1].split(" ")[-1]) return wavelength, UB
[docs] def get_images(self, scan, first, last, dry_run=False): try: uccdtagline = scan.header("UCCD")[0] UCCD = os.path.split(os.path.dirname(uccdtagline.split()[-1])) except Exception: print( "warning: UCCD tag not found, use imagefolder for proper file specification" ) UCCD = [] pattern = self._get_pattern(UCCD) matches = self.find_edfs(pattern, scan.number()) if set(range(first, last + 1)) > set(matches.keys()): raise errors.FileError( f"incorrect number of matches for scan {scan.number()}" f" using pattern {pattern}" ) if dry_run: yield else: for i in range(first, last + 1): edf = EdfFile.EdfFile(matches[i]) yield edf.GetData(0)
def _get_pattern(self, UCCD): imagefolder = self.config.imagefolder if imagefolder: try: imagefolder = imagefolder.format(UCCD=UCCD, rUCCD=list(reversed(UCCD))) except Exception as e: raise errors.ConfigError( "invalid 'imagefolder' specification" f" '{self.config.imagefolder}': {e!r}" ) else: imagefolder = os.path.join(*UCCD) if not os.path.exists(imagefolder): raise ValueError( "invalid 'imagefolder' specification" f" '{self.config.imagefolder}'." f" Path {imagefolder} does not exist" ) return os.path.join(imagefolder, "*")
[docs] class EH2(ID03Input): monitor_counter = "Monitor" # define ID03 goniometer, SIXC geometry with 2D detector mounted on a # translation-axis (distance changing with changing Gamma) # The geometry is: 1+3S+2D # sample axis mu, th, chi, phi -> here chi,phi are omitted # detector axis mu, del, gam # gam is realized by a translation along z (gamT) and rotation around x+ (gamR) qconv = xu.experiment.QConversion( ["x+", "z-"], ["x+", "z-", "ty", "tz", "x+"], [0, 1, 0] # 'y+', 'z+' ) # convention for coordinate system: y downstream; z outwards; x upwards # (righthanded) # QConversion will set up the goniometer geometry. So the first argument # describes the sample rotations, the second the detector rotations and the # third the primary beam direction. ty = 600.0 # mm
[docs] def parse_config(self, config): super().parse_config(config) centralpixel = self.config.centralpixel # (row, column) = (gamma, delta) # define detector parameters roi = ( self.config.ymask[0], self.config.ymask[-1] + 1, self.config.xmask[0], self.config.xmask[-1] + 1, ) self.qconv.init_area( "x+", "z-", cch1=centralpixel[1], cch2=centralpixel[0], Nch1=516, Nch2=516, pwidth1=self.config.pixelsize[1], pwidth2=self.config.pixelsize[0], distance=self.config.sdd - self.ty, roi=roi, ) # distance sdd-600 corresponds to distance of the detector chip from # the gamR rotation axis (rest is handled by the translations ty and # gamT (along z)) print(f"{'Mu':>9} {'Theta':>10} {'Delta':>9} {'Gamma':>9}")
[docs] def process_image(self, scanparams, pointparams, image): mu, theta, chi, phi, delta, gamma, mon, transm = pointparams wavelength, UB = scanparams data = image / mon / transm print(f"{mu:9.4f} {theta:10.4f} {delta:9.4f} {gamma:9.4f}") # recalculate detector translation (which should be saved!) gamT = self.ty * numpy.tan(numpy.radians(gamma)) # masking intensity = self.apply_mask(data, self.config.xmask, self.config.ymask) # no polarization correction for the moment! return ( intensity, numpy.ones_like(intensity), ( mu, theta, delta, gamma, gamT, # weights added to API. keeps functionality identical with wights of one self.ty, wavelength, UB, self.qconv, ), )
[docs] def get_point_params(self, scan, first, last): sl = slice(first, last + 1) MU, TH, CHI, PHI, DEL, GAM, MON, TRANSM = list(range(8)) params = numpy.zeros((last - first + 1, 8)) # Mu, Theta, Chi, Phi, Delta, Gamma, MON, transm params[:, CHI] = scan.motorpos("Chi") params[:, PHI] = scan.motorpos("Phi") params[:, TH] = scan.datacol("thcnt")[sl] params[:, GAM] = scan.datacol("gamcnt")[sl] params[:, DEL] = scan.datacol("delcnt")[sl] params[:, MON] = scan.datacol(self.monitor_counter)[sl] params[:, TRANSM] = scan.datacol("transm")[sl] params[:, MU] = scan.datacol("mucnt")[sl] return params