Coverage for larch/xrd/amcsd.py: 10%
869 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-16 21:04 +0000
1#!/usr/bin/env python
2"""
3AMCIFDB: American Mineralogical CIF database as sqlite3 database/python
5Usage:
6 amcifdb = AMCIFDB('amcif.db')
8add a CIF file:
9 amcifdb.add_ciffile('NewFile.cif')
11generatt the text of a CIF file from index:
12 cif_text = amcifdb.get_ciftext(300)
14OK, that looks like 'well, why not just save the CIF files'?
16And the answers are that there are simple methods for:
17 a) getting the XRD Q points
18 b) getting structure factors
19 c) getting atomic clustes as for feff files
20 d) saving Feff.inp files
22"""
24import sys
25import os
26import re
27import time
28import json
29from io import StringIO
30from string import ascii_letters
31from base64 import b64encode, b64decode
32from collections import namedtuple
34import requests
35from requests.packages.urllib3.exceptions import InsecureRequestWarning
36import atexit
37import numpy as np
39from sqlalchemy import MetaData, create_engine, func, text, and_, Table
40from sqlalchemy import __version__ as sqla_version
41from sqlalchemy.sql import select as sqla_select
42from sqlalchemy.orm import sessionmaker
45from .amcsd_utils import (make_engine, isAMCSD, put_optarray, get_optarray,
46 PMG_CIF_OPTS, CifParser, SpacegroupAnalyzer, pmg_version)
48from xraydb.chemparser import chemparse
49from xraydb import f0, f1_chantler, f2_chantler
52from .xrd_tools import generate_hkl, d_from_hkl, twth_from_q, E_from_lambda
53from .cif2feff import cif2feffinp
54from ..utils import isotime, mkdir
55from ..utils.strutils import version_ge, bytes2str
56from ..utils.physical_constants import TAU, ATOM_SYMS
57from ..site_config import user_larchdir
58from .. import logger
60_CIFDB = None
61ALL_HKLS = None
62AMCSD_TRIM = 'amcsd_cif1.db'
63AMCSD_FULL = 'amcsd_cif2.db'
65SOURCE_URLS = ('https://docs.xrayabsorption.org/databases/',
66 'https://millenia.cars.aps.anl.gov/xraylarch/downloads/')
68CIF_TEXTCOLUMNS = ('formula', 'compound', 'pub_title', 'formula_title', 'a',
69 'b', 'c', 'alpha', 'beta', 'gamma', 'cell_volume',
70 'crystal_density', 'atoms_sites', 'atoms_x', 'atoms_y',
71 'atoms_z', 'atoms_occupancy', 'atoms_u_iso',
72 'atoms_aniso_label', 'atoms_aniso_u11', 'atoms_aniso_u22',
73 'atoms_aniso_u33', 'atoms_aniso_u12', 'atoms_aniso_u13',
74 'atoms_aniso_u23', 'qdat','url', 'hkls')
78CifPublication = namedtuple('CifPublication', ('id', 'journalname', 'year',
79 'volume', 'page_first',
80 'page_last', 'authors'))
83StructureFactor = namedtuple('StructureFactor', ('q', 'intensity', 'hkl',
84 'twotheta', 'd',
85 'wavelength', 'energy',
86 'f2hkl', 'degen', 'lorentz'))
89# for packing/unpacking H, K, L to 2-character hash
90HKL_ENCODE = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_%'
91def pack_hkl(h, k, l):
92 """pack H, K, L values into 2 character sequence of
93 printable characters for storage and transmission
95 H, K, L must be unsigned integers from 0 to 15
97 see also unpack_hkl() to reverse the process.
98 """
99 if (h > 15 or k > 15 or l > 15 or
100 h < 0 or k < 0 or l < 0):
101 raise ValueError("hkl values out of range (max=15)")
102 x = h*256 + k*16 + l
103 return HKL_ENCODE[x//64] + HKL_ENCODE[x%64]
106def unpack_hkl(hash):
107 """unpack encoded H, K, L integers packed with pack_hkl()"""
108 a, b = HKL_ENCODE.index(hash[0]), HKL_ENCODE.index(hash[1])
109 s = a*64 + b
110 t = s//16
111 return t//16, t%16, s%16
114def pack_hkl_degen(hkls, degen):
115 """pack array of H, K, L and degeneracy values into printable
116 string for storage and transmission
117 hkl must be an array or list of list/tuples for H, K, L, with
118 each H, K, L value an unsigned integers from 0 to 15
120 hkls and degen must be ndarrays or lists of the same length
121 see also unpack_hkl_degen() to reverse the process.
122 """
123 if len(hkls) != len(degen):
124 raise ValueError("hkls and degen must be the same length in pack_hkl_degen()")
126 shkl = [pack_hkl(h, k, l) for h, k, l in hkls]
127 sdegen = json.dumps(degen.tolist()).replace(' ', '')
128 return f"{''.join(shkl)}|{sdegen}"
131def unpack_hkl_degen(sinp):
132 """pack arrays of h, k, l and degeneracies from string stored by pack_hkl_degen
133 see also pack_hkl_degen()
134 """
135 shkl, sdegen = sinp.split('|')
136 n = len(shkl)//2
137 hkls = []
138 for i in range(n):
139 hkls.append(unpack_hkl(shkl[2*i:2*i+2]))
140 degen = json.loads(sdegen)
141 return np.array(hkls), np.array(degen)
145def select(*args):
146 """wrap sqlalchemy select for version 1.3 and 2.0"""
147 # print("SELECT ", args, type(args))
148 # print(sqla_version, version_ge(sqla_version, '1.4.0'))
149 if version_ge(sqla_version, '1.4.0'):
150 return sqla_select(*args)
151 else:
152 return sqla_select(tuple(args))
155def get_nonzero(thing):
156 try:
157 if len(thing) == 1 and abs(thing[0]) < 1.e-5:
158 return None
159 except:
160 pass
161 return thing
163def clean_elemsym(sym):
164 sx = (sym + ' ')[:2]
165 return ''.join([s.strip() for s in sx if s in ascii_letters])
168def parse_cif_file(filename):
169 """parse ciffile, extract data for 1st listed structure,
170 and do some basic checks:
171 must have formula
172 must have spacegroup
173 returns dat, formula, json-dumped symm_xyz
174 """
175 if CifParser is None:
176 raise ValueError("CifParser from pymatgen not available. Try 'pip install pymatgen'.")
178 cif = CifParser(filename, **PMG_CIF_OPTS)
179 cifkey = list(cif._cif.data.keys())[0]
180 dat = cif._cif.data[cifkey].data
182 formula = None
183 for formname in ('_chemical_formula_sum', '_chemical_formula_moiety'):
184 if formname in dat:
185 try:
186 parsed_formula = chemparse(dat[formname])
187 formula = dat[formname]
188 except:
189 pass
190 if formula is None and '_atom_site_type_symbol' in dat:
191 comps = {}
192 complist = dat['_atom_site_type_symbol']
193 for c in complist:
194 if c not in comps:
195 nx = complist.count(c)
196 comps[c] = '%s%d' % (c, nx) if nx != 1 else c
197 formula = ''.join(comps.values())
199 if formula is None:
200 raise ValueError(f'Cannot read chemical formula from file {filename:s}')
202 # get spacegroup and symmetry
203 sgroup_name = dat.get('_symmetry_space_group_name_H-M', None)
204 if sgroup_name is None:
205 for key, val in dat.items():
206 if 'space_group' in key and 'H-M' in key:
207 sgroup_name = val
209 symm_xyz = dat.get('_space_group_symop_operation_xyz', None)
210 if symm_xyz is None:
211 symm_xyz = dat.get('_symmetry_equiv_pos_as_xyz', None)
212 if symm_xyz is None:
213 raise ValueError(f'Cannot read symmetries from file {filename:s}')
215 symm_xyz = json.dumps(symm_xyz)
216 return dat, formula, symm_xyz
219class CifStructure():
220 """representation of a Cif Structure
221 """
223 def __init__(self, ams_id=None, ams_db=None, publication=None, mineral=None,
224 spacegroup=None, hm_symbol=None, formula_title=None,
225 compound=None, formula=None, pub_title=None, a=None, b=None,
226 c=None, alpha=None, beta=None, gamma=None, hkls=None,
227 cell_volume=None, crystal_density=None,
228 atoms_sites='<missing>', atoms_aniso_label='<missing>',
229 atoms_x=None, atoms_y=None, atoms_z=None,
230 atoms_occupancy=None, atoms_u_iso=None, atoms_aniso_u11=None,
231 atoms_aniso_u22=None, atoms_aniso_u33=None,
232 atoms_aniso_u12=None, atoms_aniso_u13=None,
233 atoms_aniso_u23=None):
235 self.ams_id = ams_id
236 self.ams_db = ams_db
237 self.publication = publication
238 self.mineral = mineral
239 self.spacegroup = spacegroup
240 self.hm_symbol = hm_symbol
241 self.formula_title = formula_title
242 self.compound = compound
243 self.formula = formula
244 self.pub_title = pub_title
245 self.a = a
246 self.b = b
247 self.c = c
248 self.alpha = alpha
249 self.beta = beta
250 self.gamma = gamma
251 self.hkls = hkls
252 self.cell_volume = cell_volume
253 self.crystal_density = crystal_density
254 self.atoms_sites = atoms_sites
255 self.atoms_aniso_label = atoms_aniso_label
256 self.atoms_x = atoms_x
257 self.atoms_y = atoms_y
258 self.atoms_z = atoms_z
259 self.atoms_occupancy = get_nonzero(atoms_occupancy)
260 self.atoms_u_iso = get_nonzero(atoms_u_iso)
261 self.atoms_aniso_u11 = get_nonzero(atoms_aniso_u11)
262 self.atoms_aniso_u22 = get_nonzero(atoms_aniso_u22)
263 self.atoms_aniso_u33 = get_nonzero(atoms_aniso_u33)
264 self.atoms_aniso_u12 = get_nonzero(atoms_aniso_u12)
265 self.atoms_aniso_u13 = get_nonzero(atoms_aniso_u13)
266 self.atoms_aniso_u23 = get_nonzero(atoms_aniso_u23)
267 self.natoms = 0
268 self._ciftext = None
269 self.pmg_pstruct = None
270 self.pmg_cstruct = None
271 if atoms_sites not in (None, '<missing>'):
272 self.natoms = len(atoms_sites)
274 def __repr__(self):
275 if self.ams_id is None or self.formula is None:
276 return '<CifStructure empty>'
277 return f'<CifStructure, ams_id={self.ams_id:d}, formula={self.formula:s}>'
279 def get_mineralname(self):
280 minname = self.mineral.name
281 if minname == '<missing>':
282 minname =self.formula_title
283 if minname == '<missing>':
284 minname = 'missing'
285 return minname
288 @property
289 def ciftext(self):
290 if self._ciftext is not None:
291 return self._ciftext
293 out = ['data_global']
294 if self.formula_title != '<missing>':
295 out.append(f"_amcsd_formula_title '{self.formula_title:s}'")
297 if self.mineral.name != '<missing>':
298 out.append(f"_chemical_name_mineral '{self.mineral.name:s}'")
299 out.append('loop_')
300 out.append('_publ_author_name')
301 for a in self.publication.authors:
302 out.append(f"'{a:s}'")
304 out.append(f"_journal_name_full '{self.publication.journalname}'")
305 out.append(f"_journal_volume {self.publication.volume}")
306 out.append(f"_journal_year {self.publication.year}")
307 out.append(f"_journal_page_first {self.publication.page_first}")
308 out.append(f"_journal_page_last {self.publication.page_last}")
309 out.append('_publ_section_title')
310 out.append(';')
311 out.append(f"{self.pub_title:s}")
312 out.append(';')
313 out.append(f"_database_code_amcsd {self.ams_id:07d}")
314 if self.compound != '<missing>':
315 out.append(f"_chemical_compound_source '{self.compound}'")
316 out.append(f"_chemical_formula_sum '{self.formula}'")
317 out.append(f"_cell_length_a {self.a}")
318 out.append(f"_cell_length_b {self.b}")
319 out.append(f"_cell_length_c {self.c}")
320 out.append(f"_cell_angle_alpha {self.alpha}")
321 out.append(f"_cell_angle_beta {self.beta}")
322 out.append(f"_cell_angle_gamma {self.gamma}")
323 out.append(f"_cell_volume {self.cell_volume}")
324 out.append(f"_exptl_crystal_density_diffrn {self.crystal_density}")
325 out.append(f"_symmetry_space_group_name_H-M '{self.hm_symbol}'")
326 out.append('loop_')
327 out.append('_space_group_symop_operation_xyz')
328 for xyzop in json.loads(self.spacegroup.symmetry_xyz):
329 out.append(f" '{xyzop:s}'")
331 atoms_sites = self.atoms_sites
332 if atoms_sites not in (None, 'None', '0', '<missing>'):
333 out.append('loop_')
334 out.append('_atom_site_label')
335 out.append('_atom_site_fract_x')
336 out.append('_atom_site_fract_y')
337 out.append('_atom_site_fract_z')
340 natoms = len(atoms_sites)
341 atoms_x = self.atoms_x
342 atoms_y = self.atoms_y
343 atoms_z = self.atoms_z
344 atoms_occ = self.atoms_occupancy
345 atoms_u_iso = self.atoms_u_iso
346 if atoms_occ is not None:
347 out.append('_atom_site_occupancy')
348 if atoms_u_iso is not None:
349 out.append('_atom_site_U_iso_or_equiv')
350 for i in range(natoms):
351 adat = f"{atoms_sites[i]} {atoms_x[i]} {atoms_y[i]} {atoms_z[i]}"
352 if atoms_occ is not None:
353 adat += f" {atoms_occ[i]}"
354 if atoms_u_iso is not None:
355 adat += f" {atoms_u_iso[i]}"
356 out.append(adat)
358 aniso_label = self.atoms_aniso_label
359 if aniso_label not in (None, '0', '<missing>'):
360 out.append('loop_')
361 out.append('_atom_site_aniso_label')
362 out.append('_atom_site_aniso_U_11')
363 out.append('_atom_site_aniso_U_22')
364 out.append('_atom_site_aniso_U_33')
365 out.append('_atom_site_aniso_U_12')
366 out.append('_atom_site_aniso_U_13')
367 out.append('_atom_site_aniso_U_23')
368 natoms = len(aniso_label)
369 u11 = self.atoms_aniso_u11
370 u22 = self.atoms_aniso_u22
371 u33 = self.atoms_aniso_u33
372 u12 = self.atoms_aniso_u12
373 u13 = self.atoms_aniso_u13
374 u23 = self.atoms_aniso_u23
376 for i in range(natoms):
377 out.append(f"{aniso_label[i]} {u11[i]} {u22[i]} {u33[i]} {u12[i]} {u13[i]} {u23[i]}")
379 out.append('')
380 out.append('')
381 self._ciftext = '\n'.join(out)
382 return self.ciftext
385 def find_hkls(self, nmax=64, qmax=10, wavelength=0.75):
386 """find the HKLs and degeneracies of the strongest reflections
388 this will calculate structure factors, and sort them, but the
389 purpose is really to do a filter to find the strongest HKLs that
390 can then be saved and restored for structure factor calcs using
391 only the most important HKL values.
393 returns hkls, degen of the nmax reflections with the highest scattered intensity
394 """
395 self.get_pmg_struct()
397 pstruct = self.pmg_pstruct
398 cstruct = self.pmg_cstruct
399 if pstruct is None:
400 print(f"pymatgen could not parse CIF structure for CIF {self.ams_id}")
401 return
403 global ALL_HKLS
404 if ALL_HKLS is None:
405 ALL_HKLS = generate_hkl(hmax=15, kmax=15, lmax=15, positive_only=False)
407 hkls = ALL_HKLS[:]
408 unitcell = self.get_unitcell()
409 qhkls = TAU / d_from_hkl(hkls, **unitcell)
411 # remove q values outside of range
412 qfilt = (qhkls < qmax)
413 qhkls = qhkls[qfilt]
414 hkls = hkls[qfilt]
416 # find duplicate q-values, set degen
417 # scale up q values to better find duplicates
418 qscaled = [int(round(q*1.e9)) for q in qhkls]
419 q_unique, q_degen, hkl_unique = [], [], []
420 for i, q in enumerate(qscaled):
421 if q in q_unique:
422 q_degen[q_unique.index(q)] += 1
423 else:
424 q_unique.append(q)
425 q_degen.append(1)
426 hkl_unique.append(hkls[i])
428 qorder = np.argsort(q_unique)
429 qhkls = 1.e-9*np.array(q_unique)[qorder]
430 hkls = abs(np.array(hkl_unique)[qorder])
431 degen = np.array(q_degen)[qorder]
433 # note the f2 is calculated here without resonant corrections
434 f2 = self.calculate_f2(hkls, qhkls=qhkls, wavelength=None)
436 # filter out very small structure factors
437 ffilt = (f2 > 1.e-6*max(f2))
438 qhkls = qhkls[ffilt]
439 hkls = hkls[ffilt]
440 degen = degen[ffilt]
441 f2 = f2[ffilt]
443 # lorentz and polarization correction
444 arad = (TAU/360)*twth_from_q(qhkls, wavelength)
445 corr = (1+np.cos(arad)**2)/(np.sin(arad/2)**2*np.cos(arad/2))
447 intensity = f2 * degen * corr
448 ifilt = (intensity > 0.005*max(intensity))
450 intensity = intensity[ifilt] / max(intensity)
451 qhkls = qhkls[ifilt]
452 hkls = hkls[ifilt]
453 degen = degen[ifilt]
455 # indices of peaks in descending order of intensity
456 main_peaks = np.argsort(intensity)[::-1][:nmax]
458 hkls_main, degen_main = hkls[main_peaks], degen[main_peaks]
459 if self.ams_db is not None:
460 self.hkls = self.ams_db.set_hkls(self.ams_id, hkls_main, degen_main)
462 return hkls_main, degen_main
464 def get_structure_factors(self, wavelength=0.75):
465 """given arrays of HKLs and degeneracies (perhaps from find_hkls(),
466 return structure factors
468 This is a lot like find_hkls(), but with the assumption that HKLs
469 are not to be filtered or altered.
470 """
471 if self.hkls is None:
472 self.find_hkls(nmax=64, qmax=10, wavelength=wavelength)
474 hkls, degen = unpack_hkl_degen(self.hkls)
476 self.get_pmg_struct()
477 pstruct = self.pmg_pstruct
478 if pstruct is None:
479 print(f"pymatgen could not parse CIF structure for CIF {self.ams_id}")
480 return
482 unitcell = self.get_unitcell()
483 dhkls = d_from_hkl(hkls, **unitcell)
484 qhkls = TAU / dhkls
486 # sort by q
487 qsort = np.argsort(qhkls)
488 qhkls = qhkls[qsort]
489 dhkls = dhkls[qsort]
490 hkls = hkls[qsort]
491 degen = degen[qsort]
493 energy = E_from_lambda(wavelength, E_units='eV')
495 f2hkl = self.calculate_f2(hkls, qhkls=qhkls, wavelength=wavelength)
497 # lorentz and polarization correction
498 twoth = twth_from_q(qhkls, wavelength)
499 arad = (TAU/360)*twoth
500 corr = (1+np.cos(arad)**2)/(np.sin(arad/2)**2*np.cos(arad/2))
502 intensity = f2hkl * degen * corr
504 return StructureFactor(q=qhkls, intensity=intensity, hkl=hkls, d=dhkls,
505 f2hkl=f2hkl, twotheta=twoth, degen=degen,
506 lorentz=corr, wavelength=wavelength,
507 energy=energy)
510 def calculate_f2(self, hkls, qhkls=None, energy=None, wavelength=None):
511 """calculate F*F'.
513 If wavelength (in Ang) or energy (in eV) is not None, then
514 resonant corrections will be included.
515 """
516 if qhkls is None:
517 unitcell = self.get_unitcell()
518 qhkls = TAU / d_from_hkl(hkls, **unitcell)
519 sq = qhkls/(2*TAU)
520 sites = self.get_sites()
522 if energy is None and wavelength is not None:
523 energy = E_from_lambda(wavelength, E_units='eV')
525 # get f0 and resonant scattering factors
526 f0vals, f1vals, f2vals = {}, {}, {}
527 for elem in sites.keys():
528 if elem not in f0vals:
529 f0vals[elem] = f0(elem, sq)
530 if energy is not None:
531 f1vals[elem] = f1_chantler(elem, energy)
532 f2vals[elem] = f2_chantler(elem, energy)
534 # and f2
535 f2 = np.zeros(len(hkls))
536 for i, hkl in enumerate(hkls):
537 fsum = 0.
538 for elem in f0vals:
539 fval = f0vals[elem][i]
540 if energy is not None:
541 fval += f1vals[elem] - 1j*f2vals[elem]
542 for occu, fcoord in sites[elem]:
543 fsum += fval*occu*np.exp(1j*TAU*(fcoord*hkl).sum())
544 f2[i] = (fsum*fsum.conjugate()).real
545 return f2
548 def get_pmg_struct(self):
549 if self.pmg_cstruct is not None and self.pmg_pstruct is not None:
550 return
551 err = f"pymatgen {pmg_version} could not"
552 try:
553 pmcif = CifParser(StringIO(self.ciftext), **PMG_CIF_OPTS)
554 except:
555 print(f"{err} parse CIF text for CIF {self.ams_id}")
557 try:
558 self.pmg_cstruct = pmcif.parse_structures()[0]
559 except:
560 print(f"{err} parse structure for CIF {self.ams_id}")
562 try:
563 self.pmg_pstruct = SpacegroupAnalyzer(self.pmg_cstruct
564 ).get_conventional_standard_structure()
565 except:
566 print(f"{err} could not analyze spacegroup for CIF {self.ams_id}")
568 def get_unitcell(self):
569 "unitcell as dict, from PMG structure"
570 self.get_pmg_struct()
571 pstruct = self.pmg_pstruct
572 if pstruct is None:
573 print(f"pymatgen could not parse CIF structure for CIF {self.ams_id}")
574 return
575 pdict = pstruct.as_dict()
576 unitcell = {}
577 for a in ('a', 'b', 'c', 'alpha', 'beta', 'gamma', 'volume'):
578 unitcell[a] = pdict['lattice'][a]
579 return unitcell
581 def get_sites(self):
582 "dictionary of sites, from PMG structure"
583 self.get_pmg_struct()
584 pstruct = self.pmg_pstruct
585 if pstruct is None:
586 print(f"pymatgen could not parse CIF structure for CIF {self.ams_id}")
587 return
589 sites = {}
590 for site in pstruct.sites:
591 sdat = site.as_dict()
592 fcoords = sdat['abc']
594 for spec in sdat['species']:
595 elem = spec['element']
596 if elem == 'Nh': elem = 'N'
597 if elem == 'Og':
598 elem = 'O'
599 if elem in ('Hs', 'D'):
600 elem = 'H'
601 if elem.startswith('Dh') or elem.startswith('Dd') or elem.startswith('Dw'):
602 elem = 'H'
603 if elem == 'Fl':
604 elem = 'F'
605 occu = spec['occu']
606 if elem not in sites:
607 sites[elem] = [(occu, fcoords)]
608 else:
609 sites[elem].append([occu, fcoords])
610 return sites
614 def get_feffinp(self, absorber, edge=None, cluster_size=8.0, absorber_site=1,
615 with_h=False, version8=True):
616 pub = self.publication
617 journal = f"{pub.journalname} {pub.volume}, pp. {pub.page_first}-{pub.page_last} ({pub.year:d})"
618 authors = ', '.join(pub.authors)
619 titles = [f'Structure from AMCSD, AMS_ID: {self.ams_id:d}',
620 f'Mineral Name: {self.mineral.name:s}']
622 if not self.formula_title.startswith('<missing'):
623 titles.append(f'Formula Title: {self.formula_title}')
625 titles.extend([f'Journal: {journal}', f'Authors: {authors}'])
626 if not self.pub_title.startswith('<missing'):
627 for i, line in enumerate(self.pub_title.split('\n')):
628 titles.append(f'Title{i+1:d}: {line}')
630 return cif2feffinp(self.ciftext, absorber, edge=edge,
631 cluster_size=cluster_size, with_h=with_h,
632 absorber_site=absorber_site,
633 extra_titles=titles, version8=version8)
635 def save_feffinp(self, absorber, edge=None, cluster_size=8.0, absorber_site=1,
636 filename=None, version8=True):
637 feff6text = self.get_feffinp(absorber, edge=edge, cluster_size=cluster_size,
638 absorber_site=absorber_site, version8=version8)
639 if filename is None:
640 min_name = self.mineral.name.lower()
641 if min_name in ('', '<missing>', 'None'):
642 name = f'{absorber:s}_{edge:s}_CIF{self.ams_id:06d}'
643 else:
644 name = f'{absorber:s}_{edge:s}_{min_name:s}_CIF{self.ams_id:06d}'
646 ffolder = os.path.join(user_larchdir, 'feff', name)
647 mkdir(ffolder)
648 filename = os.path.join(ffolder, 'feff.inp')
649 with open(filename, 'w', encoding=sys.getdefaultencoding()) as fh:
650 fh.write(feff6text)
651 return filename
653class AMCSD():
654 """
655 Database of CIF structure data from the American Mineralogical Crystal Structure Database
657 http://rruff.geo.arizona.edu/AMS/amcsd.php
659 """
660 def __init__(self, dbname=None, read_only=False):
661 "connect to an existing database"
662 if dbname is None:
663 parent, _ = os.path.split(__file__)
664 dbname = os.path.join(parent, AMCSD_TRIM)
665 if not os.path.exists(dbname):
666 raise IOError("Database '%s' not found!" % dbname)
668 if not isAMCSD(dbname):
669 raise ValueError("'%s' is not a valid AMCSD Database!" % dbname)
671 self.connect(dbname, read_only=read_only)
672 atexit.register(self.finalize_amcsd)
673 ciftab = self.tables['cif']
674 for colname in CIF_TEXTCOLUMNS:
675 if colname not in ciftab.columns and not read_only:
676 self.session.execute(text(f'alter table cif add column {colname} text'))
677 self.close()
678 self.connect(dbname, read_only=read_only)
679 time.sleep(0.1)
680 self.insert('version', tag=f'with {colname}', date=isotime(),
681 notes=f'added {colname} column to cif table')
683 def finalize_amcsd(self):
684 conn = getattr(self, 'conn', None)
685 if conn is not None:
686 conn.close()
688 def connect(self, dbname, read_only=False):
689 self.dbname = dbname
690 self.engine = make_engine(dbname)
691 self.conn = self.engine.connect()
692 kwargs = {'bind': self.engine, 'autoflush': True, 'autocommit': False}
693 self.session = sessionmaker(**kwargs)()
694 if read_only:
695 def readonly_flush(*args, **kwargs):
696 return
697 self.session.flush = readonly_flush
699 self.metadata = MetaData()
700 self.metadata.reflect(bind=self.engine)
701 self.tables = self.metadata.tables
702 self.cif_elems = None
704 def close(self):
705 "close session"
706 self.session.flush()
707 self.session.close()
709 def query(self, *args, **kws):
710 "generic query"
711 return self.session.query(*args, **kws)
713 def insert(self, tablename, **kws):
714 if isinstance(tablename, Table):
715 table = tablename
716 else:
717 table = self.tables[tablename]
718 stmt = table.insert().values(kws)
719 out = self.session.execute(stmt)
720 self.session.commit()
721 self.session.flush()
723 def update(self, tablename, whereclause=False, **kws):
724 if isinstance(tablename, Table):
725 table = tablename
726 else:
727 table = self.tables[tablename]
729 stmt = table.update().where(whereclause).values(kws)
730 out = self.session.execute(stmt)
731 self.session.commit()
732 self.session.flush()
734 def execall(self, query):
735 return self.session.execute(query).fetchall()
737 def execone(self, query):
738 results = self.session.execute(query).fetchone()
739 if results is None or len(results) < 1:
740 return None
741 return results
743 def get_all(self, tablename):
744 return self.execall(self.tables[tablename].select())
747 def get_version(self, long=False, with_history=False):
748 """
749 return sqlite3 database and python library version numbers
751 Parameters:
752 long (bool): show timestamp and notes of latest version [False]
753 with_history (bool): show complete version history [False]
755 Returns:
756 string: version information
757 """
758 out = []
759 rows = self.get_all('version')
760 if not with_history:
761 rows = rows[-1:]
762 if long or with_history:
763 for row in rows:
764 out.append(f"AMCSD Version: {row.tag} [{row.date}] '{row.notes}'")
765 out.append(f"Python Version: {__version__}")
766 out = "\n".join(out)
767 elif rows is None:
768 out = f"AMCSD Version: unknown, Python Version: {__version__}"
769 else:
770 out = f"AMCSD Version: {rows[0].tag}, Python Version: {__version__}"
771 return out
773 def _get_tablerow(self, table, name, add=True):
774 tab = self.tables[table]
775 if '"' in name:
776 name = name.replace('"', '\"')
777 rows = self.execall(tab.select().where(tab.c.name==name))
778 if len(rows) == 0:
779 if not add:
780 return None
781 self.insert(tab, name=name)
782 rows = self.execall(tab.select().where(tab.c.name==name))
783 return rows[0]
785 def get_spacegroup(self, hm_name):
786 """get row from spacegroups table by HM notation. See add_spacegroup()
787 """
788 tab = self.tables['spacegroups']
789 rows = self.execall(tab.select().where(tab.c.hm_notation==hm_name))
790 if len(rows) >0:
791 return rows[0]
792 return None
795 def add_spacegroup(self, hm_name, symmetry_xyz, category=None):
796 """add entry to spacegroups table, including HM notation and CIF symmetry operations
797 """
798 sg = self.get_spacegroup(hm_name)
799 if sg is not None and sg.symmetry_xyz == symmetry_xyz:
800 return sg
802 args = {'hm_notation': hm_name, 'symmetry_xyz': symmetry_xyz}
803 if category is not None:
804 args['category'] = category
805 self.insert('spacegroups', **args)
806 return self.get_spacegroup(hm_name)
808 def get_publications(self, journalname=None, year=None, volume=None,
809 page_first=None, page_last=None, id=None):
810 """get rows from publications table by journalname, year (required)
811 and optionally volume, page_first, or page_last.
812 """
813 tab = self.tables['publications']
815 args = []
816 if journalname is not None:
817 args.append(func.lower(tab.c.journalname)==journalname.lower())
818 if year is not None:
819 args.append(tab.c.year==int(year))
820 if volume is not None:
821 args.append(tab.c.volume==str(volume))
822 if page_first is not None:
823 args.append(tab.c.page_first==str(page_first))
824 if page_last is not None:
825 args.append(tab.c.page_last==str(page_last))
826 if id is not None:
827 args.append(tab.c.id==id)
829 rows = self.execall(tab.select().where(and_(*args)))
830 if len(rows) > 0:
831 out = []
832 authtab = self.tables['authors']
833 patab = self.tables['publication_authors']
834 for row in rows:
835 q = select(authtab.c.name).where(and_(authtab.c.id==patab.c.author_id,
836 patab.c.publication_id==row.id))
837 authors = tuple([i[0] for i in self.execall(q)])
838 out.append(CifPublication(row.id, row.journalname, row.year,
839 row.volume, row.page_first,
840 row.page_last, authors))
841 return out
842 return None
845 def add_publication(self, journalname, year, authorlist, volume=None,
846 page_first=None, page_last=None, with_authors=True):
848 args = dict(journalname=journalname, year=year)
849 if volume is not None:
850 args['volume'] = volume
851 if page_first is not None:
852 args['page_first'] = page_first
853 if page_last is not None:
854 args['page_last'] = page_last
856 self.insert('publications', **args)
857 self.session.flush()
858 pub = self.get_publications(journalname, year, volume=volume,
859 page_first=page_first,
860 page_last=page_last)[0]
862 if with_authors:
863 for name in authorlist:
864 auth = self._get_tablerow('authors', name, add=True)
865 self.insert('publication_authors',
866 publication_id=pub.id, author_id=auth.id)
867 return pub
869 def add_cifdata(self, cif_id, mineral_id, publication_id,
870 spacegroup_id, formula=None, compound=None,
871 formula_title=None, pub_title=None, a=None, b=None,
872 c=None, alpha=None, beta=None, gamma=None, url='',
873 cell_volume=None, crystal_density=None,
874 atoms_sites=None, atoms_x=None, atoms_y=None,
875 atoms_z=None, atoms_occupancy=None, atoms_u_iso=None,
876 atoms_aniso_label=None, atoms_aniso_u11=None,
877 atoms_aniso_u22=None, atoms_aniso_u33=None,
878 atoms_aniso_u12=None, atoms_aniso_u13=None,
879 atoms_aniso_u23=None, with_elements=True):
881 self.insert('cif', id=cif_id, mineral_id=mineral_id,
882 publication_id=publication_id,
883 spacegroup_id=spacegroup_id,
884 formula_title=formula_title, pub_title=pub_title,
885 formula=formula, compound=compound, url=url, a=a, b=b,
886 c=c, alpha=alpha, beta=beta, gamma=gamma,
887 cell_volume=cell_volume,
888 crystal_density=crystal_density,
889 atoms_sites=atoms_sites, atoms_x=atoms_x,
890 atoms_y=atoms_y, atoms_z=atoms_z,
891 atoms_occupancy=atoms_occupancy,
892 atoms_u_iso=atoms_u_iso,
893 atoms_aniso_label=atoms_aniso_label,
894 atoms_aniso_u11=atoms_aniso_u11,
895 atoms_aniso_u22=atoms_aniso_u22,
896 atoms_aniso_u33=atoms_aniso_u33,
897 atoms_aniso_u12=atoms_aniso_u12,
898 atoms_aniso_u13=atoms_aniso_u13,
899 atoms_aniso_u23=atoms_aniso_u23)
901 if with_elements:
902 for element in chemparse(formula).keys():
903 self.insert('cif_elements', cif_id=cif_id, element=element)
904 return self.get_cif(cif_id)
907 def add_ciffile(self, filename, cif_id=None, url='', debug=False):
909 if CifParser is None:
910 raise ValueError("CifParser from pymatgen not available. Try 'pip install pymatgen'.")
911 try:
912 dat, formula, symm_xyz = parse_cif_file(filename)
913 except:
914 raise ValueError(f"unknown error trying to parse CIF file: {filename}")
916 # compound
917 compound = '<missing>'
918 for compname in ('_chemical_compound_source',
919 '_chemical_name_systematic',
920 '_chemical_name_common'):
921 if compname in dat:
922 compound = dat[compname]
925 # spacegroup
926 sgroup_name = dat.get('_symmetry_space_group_name_H-M', None)
927 if sgroup_name is None:
928 for key, val in dat.items():
929 if 'space_group' in key and 'H-M' in key:
930 sgroup_name = val
932 sgroup = self.get_spacegroup(sgroup_name)
933 if sgroup is not None and sgroup.symmetry_xyz != symm_xyz:
934 for i in range(1, 11):
935 tgroup_name = sgroup_name + f' %var{i:d}%'
936 sgroup = self.get_spacegroup(tgroup_name)
937 if sgroup is None or sgroup.symmetry_xyz == symm_xyz:
938 sgroup_name = tgroup_name
939 break
940 if sgroup is None:
941 sgroup = self.add_spacegroup(sgroup_name, symm_xyz)
943 min_name = '<missing>'
944 for mname in ('_chemical_name_mineral',
945 '_chemical_name_common'):
946 if mname in dat:
947 min_name = dat[mname]
948 mineral = self._get_tablerow('minerals', min_name)
950 # get publication data (including ISCD style of 'citation' in place of 'journal' )
951 pubdict = dict(journalname=dat.get('_journal_name_full', None),
952 year=dat.get('_journal_year', None),
953 volume=dat.get('_journal_volume', None),
954 page_first=dat.get('_journal_page_first', None),
955 page_last=dat.get('_journal_page_last', None))
957 for key, alt, dval in (('journalname', 'journal_full', 'No Journal'),
958 ('year', None, -1),
959 ('volume', 'journal_volume', 0),
960 ('page_first', None, 0),
961 ('page_last', None, 0)):
962 if pubdict[key] is None:
963 if alt is None:
964 alt = key
965 alt = '_citation_%s' % alt
966 pubdict[key] = dat.get(alt, [dval])[0]
967 authors = dat.get('_publ_author_name', None)
968 if authors is None:
969 authors = dat.get('_citation_author_name', ['Anonymous'])
971 pubs = self.get_publications(**pubdict)
972 if pubs is None:
973 pub = self.add_publication(pubdict['journalname'],
974 pubdict['year'], authors,
975 volume=pubdict['volume'],
976 page_first=pubdict['page_first'],
977 page_last=pubdict['page_last'])
978 else:
979 pub = pubs[0]
981 density = dat.get('_exptl_crystal_density_meas', None)
982 if density is None:
983 density = dat.get('_exptl_crystal_density_diffrn', -1.0)
985 if cif_id is None:
986 cif_id = dat.get('_database_code_amcsd', None)
987 if cif_id is None:
988 cif_id = dat.get('_cod_database_code', None)
989 if cif_id is None:
990 cif_id = self.next_cif_id()
991 cif_id = int(cif_id)
993 # check again for this cif id (must match CIF AMS id and formula
994 tabcif = self.tables['cif']
995 this = self.execone(select(tabcif.c.id, tabcif.c.formula
996 ).where(tabcif.c.id==int(cif_id)))
997 if this is not None:
998 _cid, _formula = this
999 if formula.replace(' ', '') == _formula.replace(' ', ''):
1000 return cif_id
1001 else:
1002 cif_id = self.next_cif_id()
1004 if debug:
1005 print("##CIF Would add Cif Data !" )
1006 print(cif_id, mineral.id, pub.id, sgroup.id)
1007 print("##CIF formuala / compound: ", formula, compound)
1008 print("titles: ",
1009 dat.get('_amcsd_formula_title', '<missing>'),
1010 dat.get('_publ_section_title', '<missing>'))
1011 print("##CIF atom sites :", json.dumps(dat['_atom_site_label']))
1012 print("##CIF locations : ",
1013 put_optarray(dat, '_atom_site_fract_x'),
1014 put_optarray(dat, '_atom_site_fract_y'),
1015 put_optarray(dat, '_atom_site_fract_z'),
1016 put_optarray(dat, '_atom_site_occupancy'),
1017 put_optarray(dat, '_atom_site_U_iso_or_equiv'))
1018 print("##CIF aniso label : ",
1019 json.dumps(dat.get('_atom_site_aniso_label', '<missing>')))
1020 print("##CIF aniso : ",
1021 put_optarray(dat, '_atom_site_aniso_U_11'),
1022 put_optarray(dat, '_atom_site_aniso_U_22'),
1023 put_optarray(dat, '_atom_site_aniso_U_33'),
1024 put_optarray(dat, '_atom_site_aniso_U_12'),
1025 put_optarray(dat, '_atom_site_aniso_U_13'),
1026 put_optarray(dat, '_atom_site_aniso_U_23'))
1027 print('##CIF cell data: ', dat['_cell_length_a'],
1028 dat['_cell_length_b'],
1029 dat['_cell_length_c'],
1030 dat['_cell_angle_alpha'],
1031 dat['_cell_angle_beta'],
1032 dat['_cell_angle_gamma'])
1033 print("##CIF volume/ density ", dat.get('_cell_volume', -1), density)
1034 print("##CIF url : ", type(url), url)
1036 self.add_cifdata(cif_id, mineral.id, pub.id, sgroup.id,
1037 formula=formula, compound=compound,
1038 formula_title=dat.get('_amcsd_formula_title', '<missing>'),
1039 pub_title=dat.get('_publ_section_title', '<missing>'),
1040 atoms_sites=json.dumps(dat['_atom_site_label']),
1041 atoms_x=put_optarray(dat, '_atom_site_fract_x'),
1042 atoms_y=put_optarray(dat, '_atom_site_fract_y'),
1043 atoms_z=put_optarray(dat, '_atom_site_fract_z'),
1044 atoms_occupancy=put_optarray(dat, '_atom_site_occupancy'),
1045 atoms_u_iso=put_optarray(dat, '_atom_site_U_iso_or_equiv'),
1046 atoms_aniso_label=json.dumps(dat.get('_atom_site_aniso_label', '<missing>')),
1047 atoms_aniso_u11=put_optarray(dat, '_atom_site_aniso_U_11'),
1048 atoms_aniso_u22=put_optarray(dat, '_atom_site_aniso_U_22'),
1049 atoms_aniso_u33=put_optarray(dat, '_atom_site_aniso_U_33'),
1050 atoms_aniso_u12=put_optarray(dat, '_atom_site_aniso_U_12'),
1051 atoms_aniso_u13=put_optarray(dat, '_atom_site_aniso_U_13'),
1052 atoms_aniso_u23=put_optarray(dat, '_atom_site_aniso_U_23'),
1053 a=dat['_cell_length_a'],
1054 b=dat['_cell_length_b'],
1055 c=dat['_cell_length_c'],
1056 alpha=dat['_cell_angle_alpha'],
1057 beta=dat['_cell_angle_beta'],
1058 gamma=dat['_cell_angle_gamma'],
1059 cell_volume=dat.get('_cell_volume', -1),
1060 crystal_density=density,
1061 url=url)
1062 return cif_id
1064 def get_cif(self, cif_id, as_strings=False):
1065 """get Cif Structure object """
1066 tab = self.tables['cif']
1068 cif = self.execone(tab.select().where(tab.c.id==cif_id))
1069 if cif is None:
1070 return
1072 tab_pub = self.tables['publications']
1073 tab_auth = self.tables['authors']
1074 tab_pa = self.tables['publication_authors']
1075 tab_min = self.tables['minerals']
1076 tab_sp = self.tables['spacegroups']
1077 mineral = self.execone(tab_min.select().where(tab_min.c.id==cif.mineral_id))
1078 sgroup = self.execone(tab_sp.select().where(tab_sp.c.id==cif.spacegroup_id))
1079 hm_symbol = sgroup.hm_notation
1080 if '%var' in hm_symbol:
1081 hm_symbol = hm_symbol.split('%var')[0]
1083 pub = self.get_publications(id=cif.publication_id)[0]
1085 out = CifStructure(ams_id=cif_id, publication=pub,
1086 mineral=mineral, spacegroup=sgroup,
1087 hm_symbol=hm_symbol, ams_db=self)
1089 for attr in ('formula_title', 'compound', 'formula', 'pub_title'):
1090 setattr(out, attr, getattr(cif, attr, '<missing>'))
1091 for attr in ('a', 'b', 'c', 'alpha', 'beta', 'gamma',
1092 'cell_volume', 'crystal_density'):
1093 val = getattr(cif, attr, '-1')
1094 if not as_strings:
1095 if val is not None:
1096 if '(' in val:
1097 val = val.split('(')[0]
1098 if ',' in val and '.' not in val:
1099 val = val.replace(',', '.')
1100 try:
1101 val = float(val)
1102 except:
1103 pass
1104 setattr(out, attr, val)
1106 for attr in ('atoms_sites', 'atoms_aniso_label'):
1107 val = getattr(cif, attr, '<missing>')
1108 val = '<missing>' if val in (None, '<missing>') else json.loads(val)
1109 setattr(out, attr, val)
1111 if out.atoms_sites not in (None, '<missing>'):
1112 out.natoms = len(out.atoms_sites)
1113 for attr in ('atoms_x', 'atoms_y', 'atoms_z', 'atoms_occupancy',
1114 'atoms_u_iso', 'atoms_aniso_u11', 'atoms_aniso_u22',
1115 'atoms_aniso_u33', 'atoms_aniso_u12',
1116 'atoms_aniso_u13', 'atoms_aniso_u23'):
1117 try:
1118 val = get_optarray(getattr(cif, attr))
1119 if val == '0':
1120 val = None
1121 elif not as_strings:
1122 tmp = []
1123 for i in range(len(val)):
1124 v = val[i]
1125 if v in ('?', '.'):
1126 v = 2.
1127 else:
1128 v = float(v)
1129 tmp.append(v)
1130 val = tmp
1131 setattr(out, attr, val)
1132 except:
1133 print(f"could not parse CIF entry for {cif_id} '{attr}': {val} ")
1135 # we're now ignoring per-cif qvalues
1136 # out.qval = None
1137 # if cif.qdat is not None:
1138 # out.qval = np.unpackbits(np.array([int(b) for b in b64decode(cif.qdat)],
1139 # dtype='uint8'))
1141 out.hkls = None
1142 if hasattr(cif, 'hkls'):
1143 out.hkls = cif.hkls
1145 return out
1147 def next_cif_id(self):
1148 """next available CIF ID > 200000 that is not in current table"""
1149 max_id = 200_000
1150 tabcif = self.tables['cif']
1151 for row in self.execall(select(tabcif.c.id).where(tabcif.c.id>200000)):
1152 if row[0] > max_id:
1153 max_id = row[0]
1154 return max_id + 1
1157 def all_minerals(self):
1158 names = []
1159 for row in self.get_all('minerals'):
1160 if row.name not in names:
1161 names.append(row.name)
1162 return names
1164 def all_authors(self):
1165 names = []
1166 for row in self.get_all('authors'):
1167 if row.name not in names:
1168 names.append(row.name)
1169 return names
1171 def all_journals(self):
1172 names = []
1173 for row in self.get_all('publications'):
1174 if row.journalname not in names:
1175 names.append(row.journalname)
1176 return names
1178 def get_cif_elems(self):
1179 if self.cif_elems is None:
1180 out = {}
1181 for row in self.get_all('cif_elements'):
1182 cifid = int(row.cif_id)
1183 if cifid not in out:
1184 out[cifid] = []
1185 if row.element not in out[cifid]:
1186 out[cifid].append(row.element)
1188 self.cif_elems = out
1189 return self.cif_elems
1192 def find_cifs(self, id=None, mineral_name=None, author_name=None,
1193 journal_name=None, contains_elements=None,
1194 excludes_elements=None, strict_contains=False,
1195 full_occupancy=False, max_matches=1000):
1196 """return list of CIF Structures matching mineral, publication, or elements
1197 """
1198 if id is not None:
1199 thiscif = self.get_cif(id)
1200 if thiscif is not None:
1201 return [thiscif]
1203 tabcif = self.tables['cif']
1204 tabmin = self.tables['minerals']
1205 tabpub = self.tables['publications']
1206 tabaut = self.tables['authors']
1207 tab_ap = self.tables['publication_authors']
1208 tab_ce = self.tables['cif_elements']
1210 matches = []
1211 t0 = time.time()
1212 if mineral_name is None:
1213 mineral_name = ''
1214 mineral_name = mineral_name.strip()
1216 if mineral_name not in (None, '') and ('*' in mineral_name or
1217 '^' in mineral_name or
1218 '$' in mineral_name):
1219 pattern = mineral_name.replace('*', '.*').replace('..*', '.*')
1220 matches = []
1221 for row in self.get_all('minerals'):
1222 if re.search(pattern, row.name, flags=re.IGNORECASE) is not None:
1223 query = select(tabcif.c.id).where(tabcif.c.mineral_id==row.id)
1224 for m in [row[0] for row in self.execall(query)]:
1225 if m not in matches:
1226 matches.append(m)
1228 if journal_name not in (None, ''):
1229 pattern = journal_name.replace('*', '.*').replace('..*', '.*')
1230 new_matches = []
1231 for c in matches:
1232 pub_id = self.execone(select(tabcif.c.publication_id
1233 ).where(tabcif.c.id==c))
1234 this_journal = self.execone(select(tabpub.c.journalname
1235 ).where(tabpub.c.id==pub_id))
1236 if re.search(pattern, this_journal, flags=re.IGNORECASE) is not None:
1237 new_matches.append[c]
1238 matches = new_matches
1241 else: # strict mineral name or no mineral name
1242 args = []
1243 if mineral_name not in (None, ''):
1244 args.append(func.lower(tabmin.c.name)==mineral_name.lower())
1245 args.append(tabmin.c.id==tabcif.c.mineral_id)
1247 if journal_name not in (None, ''):
1248 args.append(func.lower(tabpub.c.journalname)==journal_name.lower())
1249 args.append(tabpub.c.id==tabcif.c.publication_id)
1251 if author_name not in (None, ''):
1252 args.append(func.lower(tabaut.c.name)==author_name.lower())
1253 args.append(tabcif.c.publication_id==tab_ap.c.publication_id)
1254 args.append(tabaut.c.id==tab_ap.c.author_id)
1256 query = select(tabcif.c.id)
1257 if len(args) > 0:
1258 query = select(tabcif.c.id).where(and_(*args))
1259 matches = [row[0] for row in self.execall(query)]
1260 matches = list(set(matches))
1261 #
1262 cif_elems = self.get_cif_elems()
1263 if contains_elements is not None:
1264 for el in contains_elements:
1265 new_matches = []
1266 for row in matches:
1267 if row in cif_elems and el in cif_elems[row]:
1268 new_matches.append(row)
1269 matches = new_matches
1271 if strict_contains:
1272 excludes_elements = ATOM_SYMS[:]
1273 for c in contains_elements:
1274 if c in excludes_elements:
1275 excludes_elements.remove(c)
1276 if excludes_elements is not None:
1277 bad = []
1278 for el in excludes_elements:
1279 for row in matches:
1280 if el in cif_elems[row] and row not in bad:
1281 bad.append(row)
1282 for row in bad:
1283 matches.remove(row)
1286 if full_occupancy:
1287 good = []
1288 for cif_id in matches:
1289 cif = self.execone(tabcif.select().where(tabcif.c.id==cif_id))
1290 occ = get_optarray(getattr(cif, 'atoms_occupancy'))
1291 if occ in ('0', 0, None):
1292 good.append(cif_id)
1293 else:
1294 try:
1295 min_wt = min([float(x) for x in occ])
1296 except:
1297 min_wt = 0
1298 if min_wt > 0.96:
1299 good.append(cif_id)
1300 matches = good
1302 if len(matches) > max_matches:
1303 matches = matches[:max_matches]
1304 return [self.get_cif(cid) for cid in matches]
1306 def set_hkls(self, cifid, hkls, degens):
1307 ctab = self.tables['cif']
1308 packed_hkls = pack_hkl_degen(hkls, degens)
1309 self.update(ctab, whereclause=(ctab.c.id == cifid), hkls=packed_hkls)
1310 return packed_hkls
1312def get_amcsd(download_full=True, timeout=30):
1313 """return instance of the AMCSD CIF Database
1315 Returns:
1316 AMCSD database
1317 Example:
1319 """
1320 global _CIFDB
1321 if _CIFDB is not None:
1322 return _CIFDB
1324 dbfull = os.path.join(user_larchdir, AMCSD_FULL)
1325 if os.path.exists(dbfull):
1326 _CIFDB = AMCSD(dbfull)
1327 return _CIFDB
1328 t0 = time.time()
1329 if download_full:
1330 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
1331 for src in SOURCE_URLS:
1332 url = f"{src:s}/{AMCSD_FULL:s}"
1333 req = requests.get(url, verify=True, timeout=timeout)
1334 if req.status_code == 200:
1335 break
1336 if req.status_code == 200:
1337 with open(dbfull, 'wb') as fh:
1338 fh.write(req.content)
1339 print("Downloaded %s : %.2f sec" % (dbfull, time.time()-t0))
1340 time.sleep(0.25)
1341 _CIFDB = AMCSD(dbfull)
1342 return _CIFDB
1343 # finally download of full must have failed
1344 return AMCSD()
1346def get_cif(ams_id):
1347 """
1348 get CIF Structure by AMS ID
1349 """
1350 db = get_amcsd()
1351 return db.get_cif(ams_id)
1353def find_cifs(mineral_name=None, journal_name=None, author_name=None,
1354 contains_elements=None, excludes_elements=None,
1355 strict_contains=False, full_occupancy=False):
1357 """
1358 return a list of CIF Structures matching a set of criteria:
1360 mineral_name: case-insensitive match of mineral name
1361 journal_name:
1362 author_name:
1363 containselements: list of atomic symbols required to be in structure
1364 excludes_elements: list of atomic symbols required to NOT be in structure
1365 strict_contains: `contains_elements` is complete -- no other elements
1368 """
1369 db = get_amcsd()
1370 return db.find_cifs(mineral_name=mineral_name,
1371 journal_name=journal_name,
1372 author_name=author_name,
1373 contains_elements=contains_elements,
1374 excludes_elements=excludes_elements,
1375 strict_contains=strict_contains,
1376 full_occupancy=full_occupancy)