# -*- coding: utf-8 -*-
"""
Input plug-in for a KKRimp calculation.
"""
from __future__ import absolute_import
from aiida.engine import CalcJob
from aiida.orm import CalcJobNode, Dict, RemoteData, SinglefileData, Bool
from aiida.common.utils import classproperty
from aiida.common.exceptions import (InputValidationError, ValidationError, UniquenessError)
from aiida.common.datastructures import (CalcInfo, CodeInfo)
from masci_tools.io.kkr_params import kkrparams
from .voro import VoronoiCalculation
from .kkr import KkrCalculation
from aiida_kkr.tools.tools_kkrimp import modify_potential, make_scoef, write_scoef_full_imp_cls
from aiida_kkr.tools.common_workfunctions import get_username
from masci_tools.io.common_functions import search_string, get_ef_from_potfile
import os
import tarfile
from numpy import array, array_equal, sqrt, sum, where, loadtxt
import six
from six.moves import range
__copyright__ = (u'Copyright (c), 2018, Forschungszentrum Jülich GmbH, '
'IAS-1/PGI-1, Germany. All rights reserved.')
__license__ = 'MIT license, see LICENSE.txt file'
__version__ = '0.7.0'
__contributors__ = (u'Philipp Rüßmann', u'Fabian Bertoldo')
#TODO: implement 'ilayer_center' consistency check
[docs]class KkrimpCalculation(CalcJob):
"""
AiiDA calculation plugin for a KKRimp calculation.
"""
# calculation plugin version
_CALCULATION_PLUGIN_VERSION = __version__
# List of mandatory input files
_CONFIG = u'config.cfg'
_POTENTIAL = u'potential'
_KKRFLEX_GREEN = KkrCalculation._KKRFLEX_GREEN
_KKRFLEX_TMAT = KkrCalculation._KKRFLEX_TMAT
_KKRFLEX_ATOMINFO = KkrCalculation._KKRFLEX_ATOMINFO
_KKRFLEX_INTERCELL_REF = KkrCalculation._KKRFLEX_INTERCELL_REF
_KKRFLEX_INTERCELL_CMOMS = KkrCalculation._KKRFLEX_INTERCELL_CMOMS
# List of optional input files (may be mandatory for some setting in inputputcard)
_SHAPEFUN = u'shapefun'
_KKRFLEX_ANGLE = u'kkrflex_angle'
_KKRFLEX_LLYFAC = u'kkrflex_llyfac'
# full list of kkrflex files
_ALL_KKRFLEX_FILES = KkrCalculation._ALL_KKRFLEX_FILES
_ALL_KKRFLEX_FILES.append(_KKRFLEX_ANGLE)
_ALL_KKRFLEX_FILES.append(_KKRFLEX_LLYFAC)
# List of output files that should always be present (are always retrieved)
_OUT_POTENTIAL = u'out_potential'
_OUTPUT_000 = u'out_log.000.txt'
_OUT_TIMING_000 = u'out_timing.000.txt'
_OUT_ENERGYSP = u'out_energysp_eV'
_OUT_ENERGYSP_PER_ATOM = u'out_energysp_per_atom_eV'
_OUT_ENERGYTOT = u'out_energytotal_eV'
_OUT_ENERGYTOT_PER_ATOM = u'out_energytotal_per_atom_eV'
# List of output files that are retrieved if special conditions are fulfilled
_OUT_JIJMAT = u'out_Jijmatrix'
_OUT_JIJ_OF_E_BASE = 'out_Jijmatrix_Eres_IE%0.3i.dat'
_OUT_LDOS_BASE = u'out_ldos.atom=%2i_spin%i.dat'
_OUT_LDOS_INTERPOL_BASE = u'out_ldos.interpol.atom=%2i_spin%i.dat'
_OUT_LMDOS_BASE = u'out_lmdos.atom=%2i_spin%i.dat'
_OUT_LMDOS_INTERPOL_BASE = u'out_lmdos.interpol.atom=%2i_spin%i.dat'
_OUT_MAGNETICMOMENTS = u'out_magneticmoments'
_OUT_ORBITALMOMENTS = u'out_orbitalmoments'
_LDAUPOT = 'ldaupot'
# template.product entry point defined in setup.json
_default_parser = u'kkr.kkrimpparser'
# default names used within aiida (verdi calculation out(in)putcat)
_OUTPUT_FILE_NAME = u'out_kkrimp'
_INPUT_FILE_NAME = _CONFIG
_DEFAULT_OUTPUT_FILE = _OUTPUT_FILE_NAME # will be shown with outputcat
_DEFAULT_INPUT_FILE = _INPUT_FILE_NAME # will be shown with inputcat
# name of tarfile which is created by parser after successful parsing (to reduce amount of data stored in repo)
_FILENAME_TAR = 'output_all.tar.gz'
_DIRNAME_GF_UPLOAD = 'kkrflex_green_upload'
[docs] @classmethod
def define(cls, spec):
"""
Init internal parameters at class load time
"""
# reuse base class function
super(KkrimpCalculation, cls).define(spec)
# now define input files and parser
spec.input(
'metadata.options.parser_name', valid_type=six.string_types, default=cls._default_parser, non_db=True
)
spec.input(
'metadata.options.input_filename',
valid_type=six.string_types,
default=cls._DEFAULT_INPUT_FILE,
non_db=True
)
spec.input(
'metadata.options.output_filename',
valid_type=six.string_types,
default=cls._DEFAULT_OUTPUT_FILE,
non_db=True
)
# define input nodes (optional ones have required=False)
spec.input(
'parameters',
valid_type=Dict,
required=False,
help='Use a node that specifies the input parameters (calculation settings).'
)
spec.input(
'host_Greenfunction_folder',
valid_type=RemoteData,
required=True,
help=
'Use a node that specifies the host KKR calculation contaning the host Green function and tmatrix (KkrCalculation with impurity_info input).'
)
spec.input(
'host_Greenfunction_folder_Efshift',
valid_type=RemoteData,
required=False,
help=
'Use a node that specifies the host KKR calculation contaning the host Green function and tmatrix with Fermi level shift (used to set Fermi level).'
)
spec.input(
'impurity_potential',
valid_type=SinglefileData,
required=False,
help='Use a node that contains the input potential.'
)
spec.input(
'parent_calc_folder',
valid_type=RemoteData,
required=False,
help='Use a node that specifies a parent KKRimp calculation.'
)
spec.input(
'impurity_info',
valid_type=Dict,
required=False,
help='Use a parameter node that specifies properties for a immpurity calculation.'
)
spec.input(
'settings_LDAU',
valid_type=Dict,
required=False,
help="""
Settings for running a LDA+U calculation. The Dict node should be of the form
settings_LDAU = Dict(dict={'iatom=0':{
'L': 3, # l-block which gets U correction (1: p, 2: d, 3: f-electrons)
'U': 7., # U value in eV
'J': 0.75, # J value in eV
'Eref_EF': 0., # reference energy in eV relative to the Fermi energy. This is the energy where the projector wavefunctions are calculated (should be close in energy where the states that are shifted lie (e.g. for Eu use the Fermi energy))
}})
Note: you can add multiple entries like the one for iatom==0 in this example. The atom index refers to the corresponding atom in the impurity cluster.
"""
)
spec.input(
'initial_noco_angles',
valid_type=Dict,
required=False,
help="""
Initial non-collinear angles for the magnetic moments of the impurities. These values will be written into the `kkrflex_angle` input file of KKRimp.
The Dict node should be of the form
initial_noco_angles = Dict(dict={
'theta': [theta_at1, theta_at2, ..., theta_atN], # list theta values in degrees (0..180)
'phi': [phi_at1, phi_at2, ..., phi_atN], # list phi values in degrees (0..360)
'fix_dir': [True, False, ..., True/False], # list of booleans indicating of the direction of the magentic moment should be fixed or is allowed to be updated (True means keep the direction of the magnetic moment fixed)
})
Note: The length of the theta, phi and fix_dir lists have to be equal to the number of atoms in the impurity cluster.
"""
)
spec.input(
'cleanup_outfiles',
valid_type=Bool,
required=False,
default=lambda: Bool(False),
help='Cleanup and compress output (works only in aiida-core<2.0 and breaks caching ability).'
)
# define outputs
spec.output('output_parameters', valid_type=Dict, required=True, help='results of the KKRimp calculation')
spec.default_output_node = 'output_parameters'
# define exit codes, also used in parser
spec.exit_code(301, 'ERROR_NO_RETRIEVED_FOLDER', message='Retrieved folder of KKRimp calculation not found.')
spec.exit_code(302, 'ERROR_PARSING_KKRIMPCALC', message='KKRimp parser returned an error.')
#TBD
[docs] def prepare_for_submission(self, tempfolder):
"""
Create input files.
:param tempfolder: aiida.common.folders.Folder subclass where
the plugin should put all its files.
:param inputdict: dictionary of the input nodes as they would
be returned by get_inputs_dict
"""
# Check inputdict and extrace nodes
tmp = self._check_and_extract_input_nodes(tempfolder)
parameters, code, imp_info = tmp[0], tmp[1], tmp[2]
kkrflex_file_paths, shapefun_path, shapes = tmp[3], tmp[4], tmp[5]
host_parent_calc, params_host, impurity_potential = tmp[6], tmp[7], tmp[8]
parent_calc_folder, structure = tmp[9], tmp[10]
# Prepare input files for KKRimp calculation
# 1. fill kkr params for KKRimp, write config file and eventually also kkrflex_llyfac file
self._extract_and_write_config(
parent_calc_folder, params_host, parameters, tempfolder,
kkrflex_file_paths[KkrCalculation._KKRFLEX_ATOMINFO]
)
# 2. write shapefun from impurity info and host shapefun and copy imp. potential
self._get_pot_and_shape(
imp_info, shapefun_path, shapes, impurity_potential, parent_calc_folder, tempfolder, structure
)
# 3. change kkrflex_atominfo to match impurity case
self._change_atominfo(imp_info, kkrflex_file_paths, tempfolder)
# prepare copy and retrieve lists
local_copy_list = [] # potential already in tempfolder which is copied in full
for filename in list(kkrflex_file_paths.keys()):
if filename != self._KKRFLEX_ATOMINFO:
src_path = kkrflex_file_paths[filename]
local_copy_list.append((src_path.uuid, filename, filename))
retrieve_list = [
self._OUTPUT_FILE_NAME, self._CONFIG, self._KKRFLEX_ATOMINFO, self._KKRFLEX_ANGLE, self._KKRFLEX_LLYFAC,
self._OUT_POTENTIAL, self._OUTPUT_000, self._OUT_TIMING_000, self._OUT_ENERGYSP_PER_ATOM,
self._OUT_ENERGYTOT_PER_ATOM
]
# extract run and test options (these change retrieve list in some cases)
allopts = self.get_run_test_opts(parameters)
# retrieve l(m)dos files
retrieve_list = self.add_lmdos_files_to_retrieve(tempfolder, allopts, retrieve_list, kkrflex_file_paths)
# change retrieve list for Chebychev solver
retrieve_list = self.adapt_retrieve_tmatnew(tempfolder, allopts, retrieve_list)
# write ldaupot file and change retrieved list for LDA+U calculation
# this is triggered with having the settings_LDAU dict node in input.
retrieve_list = self.init_ldau(tempfolder, retrieve_list, parent_calc_folder)
# add Jij files to retrieve list if calculation if in Jij mode
retrieve_list = self.add_jij_files(tempfolder, retrieve_list)
# change local and remote copy list if GF is found on remote machine
remote_symlink_list, local_copy_list = self.get_remote_symlink(local_copy_list)
# Prepare CalcInfo to be returned to aiida (e.g. retreive_list etc.)
calcinfo = CalcInfo()
calcinfo.uuid = self.uuid
calcinfo.local_copy_list = local_copy_list
calcinfo.remote_copy_list = []
calcinfo.remote_symlink_list = remote_symlink_list
calcinfo.retrieve_list = retrieve_list
codeinfo = CodeInfo()
codeinfo.cmdline_params = []
codeinfo.stdout_name = self._OUTPUT_FILE_NAME
codeinfo.code_uuid = code.uuid
calcinfo.codes_info = [codeinfo]
return calcinfo
#####################################################
# helper functions
[docs] def _get_and_verify_hostfiles(self, tempfolder):
"""
Check inputdict for host_Greenfunction_folder and extract impurity_info, paths to kkrflex-files and path of shapefun file
:param inputdict: input dictionary containing all input nodes to KkrimpCalculation
:returns:
* imp_info: Dict node containing impurity information like position, Z_imp, cluster size, etc.
* kkrflex_file_paths: dict of absolute file paths for the kkrflex files
* shapefun_path: absolute path of the shapefunction file in the host calculation (needed to construct shapefun_imp)
* shapes: mapping array of atoms to shapes (<SHAPE> input)
:note: shapefun_path is None if host_Greenfunction calculation was not full-potential
:raises:
* InputValidationError, if inputdict does not contain 'host_Greenfunction'
* InputValidationError, if host_Greenfunction_folder not of right type
* UniquenessError, if host_Greenfunction_folder does not have exactly one parent
* InputValidationError, if host_Greenfunction does not have an input node impurity_info
* InputValidationError, if host_Greenfunction was not a KKRFLEX calculation
"""
# get mandatory input nodes (extract host_Greenfunction_folder)
host_parent = self.inputs.host_Greenfunction_folder
# extract parent calculation
parent_calcs = host_parent.get_incoming(node_class=CalcJob)
n_parents = len(parent_calcs.all_link_labels())
if n_parents != 1:
raise UniquenessError(
'Input RemoteData is child of {} '
'calculation{}, while it should have a single parent'
''.format(n_parents, '' if n_parents == 0 else 's')
)
else:
parent_calc = parent_calcs.first().node
# extract impurity_info
if 'impurity_info' in self.inputs:
imp_info_inputnode = self.inputs.impurity_info
if not isinstance(imp_info_inputnode, Dict):
raise InputValidationError('impurity_info not of type Dict')
if 'impurity_info' in parent_calc.get_incoming().all_link_labels():
imp_info = parent_calc.get_incoming().get_node_by_label('impurity_info')
else:
imp_info = None
if imp_info is None:
raise InputValidationError('host_Greenfunction calculation does not have an input node impurity_info')
found_impurity_inputnode = True
found_host_parent = True
else:
if 'impurity_info' in parent_calc.get_incoming().all_link_labels():
imp_info = parent_calc.get_incoming().get_node_by_label('impurity_info')
else:
imp_info = None
if imp_info is None:
raise InputValidationError('host_Greenfunction calculation does not have an input node impurity_info')
found_impurity_inputnode = False
# if impurity input is seperate input, check if it is the same as
# the one from the parent calc (except for 'Zimp'). If that's not the
# case, raise an error
if found_impurity_inputnode and found_host_parent:
check_consistency_imp_info = False
#TODO: implement also 'ilayer_center' check
if 'imp_cls' in imp_info_inputnode.get_dict().keys():
input_imp_cls_arr = array(imp_info_inputnode.get_dict()['imp_cls'])
parent_imp_cls_arr = array(imp_info.get_dict()['imp_cls'])
is_identical = array_equal(input_imp_cls_arr[:, 0:4], parent_imp_cls_arr[:, 0:4])
if is_identical:
check_consistency_imp_info = True
else:
self.report('impurity_info node from input and from previous GF calculation are NOT compatible!.')
elif imp_info_inputnode.get_dict().get('Rcut') == imp_info.get_dict().get('Rcut'):
check_consistency_imp_info = True
try:
if (
imp_info_inputnode.get_dict().get('hcut') == imp_info.get_dict().get('hcut') and
imp_info_inputnode.get_dict().get('cylinder_orient')
== imp_info.get_dict().get('cylinder_orient') and
imp_info_inputnode.get_dict().get('Rimp_rel') == imp_info.get_dict().get('Rimp_rel')
):
self.report('impurity_info node from input and from previous GF calculation are compatible')
check_consistency_imp_info = True
else:
self.report(
'impurity_info node from input and from previous GF calculation are NOT compatible!. '
'Please check your impurity_info nodes for consistency.'
)
check_consistency_imp_info = False
except AttributeError:
self.report(
'Non default values of the impurity_info node from input and from previous '
"GF calculation are compatible. Default values haven't been checked"
)
check_consistency_imp_info = True
else:
self.report(
'impurity_info node from input and from previous GF calculation are NOT compatible!. '
'Please check your impurity_info nodes for consistency.'
)
check_consistency_imp_info = False
if check_consistency_imp_info:
imp_info = imp_info_inputnode
else:
raise InputValidationError('impurity_info nodes (input and GF calc) are not compatible')
# check if host parent was KKRFLEX calculation
hostfolder = parent_calc.outputs.retrieved
with hostfolder.open(KkrCalculation._DEFAULT_INPUT_FILE) as fhandle:
# use read_keywords_from_inputcard of kkrparams class
params_host_calc = kkrparams(params_type='kkr')
params_host_calc.read_keywords_from_inputcard(inputcard=fhandle)
if 'RUNOPT' not in list(params_host_calc.get_dict().keys()):
host_ok = False
elif 'KKRFLEX' not in params_host_calc.get_dict().get('RUNOPT', []):
host_ok = False
else:
host_ok = True
if not host_ok:
raise InputValidationError('host_Greenfunction calculation was not a KKRFLEX run')
# extract information from Efshift host GF input node (not mandatory)
if 'host_Greenfunction_folder_Efshift' in self.inputs:
host_parent_Efshift = self.inputs.host_Greenfunction_folder_Efshift
parent_calcs_Efshift = host_parent_Efshift.get_incoming(node_class=CalcJob)
parent_calc_Efshift = parent_calcs_Efshift.first().node
hostfolder_Efshift = parent_calc_Efshift.outputs.retrieved
else:
hostfolder_Efshift = None
kkrflex_file_paths = {}
for filename in self._ALL_KKRFLEX_FILES:
if filename in hostfolder.list_object_names():
kkrflex_file_paths[filename] = hostfolder
# take tmat and green file from Fermi level overwrite directory (second GF_writeout calculation)
if hostfolder_Efshift is not None and filename in [self._KKRFLEX_TMAT, self._KKRFLEX_GREEN]:
if filename in hostfolder_Efshift.list_object_names():
kkrflex_file_paths[filename] = hostfolder_Efshift
# extract shapes array from parameters read from inputcard
shapes = params_host_calc.get_dict().get('<SHAPE>', None)
if shapes is None:
# fallback if SHAPES is not explicityl set (assume each atom has it's own shapefun
shapes = range(1, params_host_calc.get_dict().get('NAEZ') + 1)
elif type(shapes) == int:
shapes = [shapes]
# extract input structure and voro_parent to get shapefun in next step
try:
structure, voro_parent = VoronoiCalculation.find_parent_structure(parent_calc)
except:
raise InputValidationError('No structure node found from host GF parent')
# extract shapefun path for read-in
shapefun_path = {}
if VoronoiCalculation._SHAPEFUN in voro_parent.outputs.retrieved.list_object_names():
shapefun_path = voro_parent.outputs.retrieved
else:
shapefun_path = None
return imp_info, kkrflex_file_paths, shapefun_path, shapes, parent_calc, params_host_calc, structure
[docs] def _extract_and_write_config(self, parent_calc_folder, params_host, parameters, tempfolder, GFhost_folder):
"""
fill kkr params for KKRimp and write config file
also writes kkrflex_llyfac file if Lloyd is used in the host system
"""
# initialize kkrimp parameter set with default values
params_kkrimp = kkrparams(
params_type='kkrimp',
NPAN_LOGPANELFAC=2,
RADIUS_MIN=-1,
NCOLL=0,
SPINORBIT=0,
SCFSTEPS=1,
IMIX=0,
MIXFAC=0.05,
ITDBRY=20,
BRYMIX=0.05,
QBOUND=10**-7,
RUNFLAG=[],
TESTFLAG=[],
HFIELD=[0.0, 0],
CALCFORCE=0,
CALCJIJMAT=0,
CALCORBITALMOMENT=0,
ICST=2
)
# keys that are being overwritten from host calculation settings
keys_overwrite = [
'NSPIN', 'KVREL', 'XC', 'INS', 'ICST', 'RADIUS_LOGPANELS', 'NPAN_EQ', 'NPAN_LOG', 'NCHEB', 'QBOUND'
]
for key in keys_overwrite:
if key == 'XC':
key0 = 'KEXCOR'
elif key == 'RADIUS_LOGPANELS':
key0 = 'R_LOG'
elif key == 'MIXFAC':
key0 = 'STRMIX'
else:
key0 = key
val = params_host.get_value(key0)
if val is not None:
params_kkrimp.set_value(key, val)
# settings for SOC solver
runopts = params_host.get_value('RUNOPT')
if 'NEWSOSOL' in runopts:
params_kkrimp.set_multiple_values(NCOLL=1, SPINORBIT=1, CALCORBITALMOMENT=1, TESTFLAG=['tmatnew'])
else:
params_kkrimp.set_multiple_values(NCOLL=0, SPINORBIT=0, CALCORBITALMOMENT=0, TESTFLAG=[])
# extract input RUNFLAGS
runflag = None
if parameters is not None:
for (key, val) in parameters.get_set_values():
if key == 'RUNFLAG':
runflag = list(val)
if runflag is None:
runflag = []
# special settings
runopts = params_host.get_value('RUNOPT')
if 'SIMULASA' in runopts or (params_kkrimp.get_value('NCOLL') > 0 and params_kkrimp.get_value('INS') == 0):
runflag.append('SIMULASA')
# take care of LLYsimple (i.e. Lloyd in host system)
if 'LLOYD' in runopts:
# add runflag for imp code
runflag.append('LLYsimple')
# also extract renormalization factor and create kkrflex_llyfac file (contains one value only)
with GFhost_folder.open('output.000.txt') as f:
txt = f.readlines()
iline = search_string('RENORM_LLY: Renormalization factor of total charge', txt)
if iline >= 0:
llyfac = txt[iline].split()[-1]
# now write kkrflex_llyfac to tempfolder where later on config file is also written
with tempfolder.open(self._KKRFLEX_LLYFAC, 'w') as f2:
f2.writelines([llyfac])
# now set runflags
params_kkrimp.set_value('RUNFLAG', runflag)
# overwrite keys if found in parent_calc (previous KKRimp calculation)
# here `parent_calc_folder` is the `remote` output node of the previous KKRimp calculation
if parent_calc_folder is not None:
parent_calc = parent_calc_folder.get_incoming().get_node_by_label('remote_folder')
params_parent = parent_calc.get_incoming().get_node_by_label('parameters')
else:
params_parent = None
if params_parent is not None:
params_parent = kkrparams(params_type='kkrimp', **params_parent.get_dict())
for (key, val) in params_parent.get_set_values():
self._check_key_setting_consistency(params_kkrimp, key, val)
params_kkrimp.set_value(key, val)
# finally overwrite from input parameters
if parameters is not None:
for (key, val) in parameters.get_set_values():
self._check_key_setting_consistency(params_kkrimp, key, val)
params_kkrimp.set_value(key, val)
# special run mode: calculation of Jijs
if parameters.get_value('CALCJIJMAT') is not None and parameters.get_value('CALCJIJMAT') == 1:
self.report('Found CALCJIJMAT=1: trigger JIJ mode which overwrites IMIX, MIXFAC, SCFSTEPS and RUNFLAGs')
# settings in config file
runflag.append('force_angles')
# take care of LDA+U
if 'settings_LDAU' in self.inputs:
# this prevents mixing LDAU potential in between iterations
runflag.append('freezeldau')
# now add runflags
params_kkrimp.set_multiple_values(IMIX=0, MIXFAC=0., SCFSTEPS=3, RUNFLAG=runflag)
# for DOS mode add flag to writeout Jij info energy resolved (this will be retrieved if )
testflag = params_kkrimp.get_value('TESTFLAG')
testflag.append('Jij(E)')
params_kkrimp.set_value('TESTFLAG', testflag)
# extract NATOM from atominfo file
with GFhost_folder.open(self._KKRFLEX_ATOMINFO) as file:
atominfo = file.readlines()
itmp = search_string('NATOM', atominfo)
if itmp >= 0:
natom = int(atominfo[itmp + 1].split()[0])
else:
raise ValueError('Could not extract NATOM value from kkrflex_atominfo')
# now write kkrflex_angle file
with tempfolder.open(self._KKRFLEX_ANGLE, 'w') as kkrflex_angle_file:
for istep in range(3):
for iatom in range(natom):
if istep == 0:
kkrflex_angle_file.write(f' 0.0 0.0 1\n')
elif istep == 1:
kkrflex_angle_file.write(f' 90.0 0.0 1\n')
else:
kkrflex_angle_file.write(f' 90.0 90.0 1\n')
# write kkrflex_angle file
# DOES NOT WORK TOGETHER WITH JIJ mode!!!
if 'initial_noco_angles' in self.inputs:
self.report('Found `initial_noco_angles` input node, writing kkrflex_angle file')
# check if calculation is no Jij run
if parameters.get_value('CALCJIJMAT') is not None and parameters.get_value('CALCJIJMAT') == 1:
raise InputValidationError('ERROR: ')
# extract NATOM from atominfo file
with GFhost_folder.open(self._KKRFLEX_ATOMINFO) as file:
atominfo = file.readlines()
itmp = search_string('NATOM', atominfo)
if itmp >= 0:
natom = int(atominfo[itmp + 1].split()[0])
else:
raise ValueError('Could not extract NATOM value from kkrflex_atominfo')
# extract values from input node
thetas = self.inputs.initial_noco_angles['theta']
if len(thetas) != natom:
raise InputValidationError(
'Error: `theta` list in `initial_noco_angles` input node needs to have the same length as number of atoms in the impurity cluster!'
)
phis = self.inputs.initial_noco_angles['phi']
if len(phis) != natom:
raise InputValidationError(
'Error: `phi` list in `initial_noco_angles` input node needs to have the same length as number of atoms in the impurity cluster!'
)
fix_dirs = self.inputs.initial_noco_angles['fix_dir']
if len(fix_dirs) != natom:
raise InputValidationError(
'Error: `fix_dir` list in `initial_noco_angles` input node needs to have the same length as number of atoms in the impurity cluster!'
)
# now write kkrflex_angle file
with tempfolder.open(self._KKRFLEX_ANGLE, 'w') as kkrflex_angle_file:
for iatom in range(natom):
theta, phi, fix_dir = thetas[iatom], phis[iatom], fix_dirs[iatom]
# check consistency
if theta < 0 or theta > 180:
raise InputValidationError(
f'Error: theta value out of range (0..180): iatom={iatom}, theta={theta}'
)
if phi < 0 or phi > 360:
raise InputValidationError(f'Error: phi value out of range (0..360): iatom={iatom}, phi={phi}')
# write line
kkrflex_angle_file.write(f' {theta} {phi} {fix_dir}\n')
# write config.cfg
with tempfolder.open(self._CONFIG, u'w') as config_file:
params_kkrimp.fill_keywords_to_inputfile(output=config_file)
[docs] def _change_atominfo(self, imp_info, kkrflex_file_paths, tempfolder):
"""
change kkrflex_atominfo to match impurity case
"""
# read in atominfo file as it is written out
with kkrflex_file_paths[KkrCalculation._KKRFLEX_ATOMINFO].open(KkrCalculation._KKRFLEX_ATOMINFO) as file:
atominfo = file.readlines()
#TODO implement logic to extract this info from imp_info
replace_zatom_imp = []
# read scoef for comparison with Rimp_rel
scoef = []
with tempfolder.open(KkrCalculation._SCOEF, u'r') as scoeffile:
scoef = loadtxt(scoeffile, skiprows=1)[:, :3]
# find replaceZimp list from Zimp and Rimp_rel
imp_info_dict = imp_info.get_dict()
Zimp_list = imp_info_dict.get(u'Zimp')
if type(Zimp_list) != list:
Zimp_list = [Zimp_list] # fast fix for cases when Zimp is not a list but a single value
Rimp_rel_list = imp_info_dict.get(u'Rimp_rel', [[0, 0, 0]])
self.report(f'DEBUG: Rimp_rel_list: {Rimp_rel_list}.')
for iatom in range(len(Zimp_list)):
rtmp = array(Rimp_rel_list[iatom])[:3]
self.report(f'INFO: Rimp_rel {iatom}, {rtmp}')
diff = sqrt(sum((rtmp - scoef)**2, axis=1))
Zimp = Zimp_list[iatom]
ipos_replace = where(diff == diff.min())[0][0]
replace_zatom_imp.append([ipos_replace, Zimp])
for (iatom, zimp) in replace_zatom_imp:
tmp = atominfo[iatom + 4].split()
x, y, z = float(tmp[0]), float(tmp[1]), float(tmp[2])
zatom = float(tmp[3])
virt, remove, lmax = int(tmp[4]), int(tmp[5]), int(tmp[6])
zatom = zimp
tmp = u' %24.16f %24.16f %24.16f %5.2f %4i %4i %4i\n' % (x, y, z, zatom, virt, remove, lmax)
atominfo[iatom + 4] = tmp
# write atominfo file
with tempfolder.open(self._KKRFLEX_ATOMINFO, u'w') as atominfofile:
atominfofile.writelines(atominfo)
[docs] def _get_pot_and_shape(
self, imp_info, shapefun, shapes, impurity_potential, parent_calc_folder, tempfolder, structure
):
"""
write shapefun from impurity info and host shapefun and copy imp. potential
returns: file handle to potential file
"""
imp_info_dict = imp_info.get_dict()
# create scoef file
if 'imp_cls' not in imp_info_dict:
# this means cluster is found from parameters in imp_info
# extract cluster settings
Rcut = imp_info_dict.get('Rcut', None)
hcut = imp_info_dict.get('hcut', -1.)
cylinder_orient = imp_info_dict.get('cylinder_orient', [0., 0., 1.])
ilayer_center = imp_info_dict.get('ilayer_center', 0)
# now write scoef file
self.report('Input parameters for make_scoef read in correctly!')
with tempfolder.open(KkrCalculation._SCOEF, 'w') as scoef_file:
make_scoef(structure, Rcut, scoef_file, hcut, cylinder_orient, ilayer_center)
else:
# this means the full imp cluster is given in the input
self.report(f"Write scoef from imp_cls input! {len(imp_info_dict.get('imp_cls'))}")
with tempfolder.open(KkrCalculation._SCOEF, 'w') as scoef_file:
write_scoef_full_imp_cls(imp_info, scoef_file)
# now create impurity shapefun (reads scoef file)
with tempfolder.open(KkrCalculation._SCOEF, u'r') as scoef_file:
with tempfolder.open(KkrimpCalculation._SHAPEFUN, u'w') as shapefun_new:
with shapefun.open(KkrimpCalculation._SHAPEFUN, u'r') as shapefun_file:
shapelen = len(shapefun_file.readlines())
if shapelen > 1:
modify_potential().shapefun_from_scoef(scoef_file, shapefun_file, shapes, shapefun_new)
# get path of tempfolder
with tempfolder.open('.dummy', 'w') as tmpfile:
tempfolder_path = os.path.dirname(tmpfile.name)
# find path to input potential
if impurity_potential is not None:
potfile_name, potfile_folder = impurity_potential.filename, impurity_potential
elif parent_calc_folder is not None:
self.report(
f'parent_calc_folder {parent_calc_folder} {parent_calc_folder.get_incoming().all_link_labels()}'
)
retrieved = parent_calc_folder.get_incoming(node_class=CalcJobNode
).first().node.get_outgoing().get_node_by_label('retrieved')
self.report(f'potfile {retrieved} {self._OUT_POTENTIAL}')
# extract file from host's tarball (extract to tempfolder and use from there, this way the unnessesary files are deleted once submission is done)
tar_filenames = []
if self._FILENAME_TAR in retrieved.list_object_names():
# get path of tarfile
with retrieved.open(self._FILENAME_TAR) as tf:
tfpath = tf.name
# extract file from tarfile of retrieved to tempfolder
with tarfile.open(tfpath) as tf:
tar_filenames = [ifile.name for ifile in tf.getmembers()]
self.report(f'extract potfile from tarball {tar_filenames}')
filename = self._OUT_POTENTIAL
if filename in tar_filenames:
tf.extract(filename, tempfolder_path) # extract to tempfolder
else: # otherwise copy from retrieved to tempfolder (rest of calculation needs files to be in tempfolder)
filename = self._OUT_POTENTIAL
if filename in retrieved.list_object_names():
with tempfolder.open(filename, u'w') as newfile:
with retrieved.open(filename, u'r') as oldfile:
newfile.writelines(oldfile.readlines())
self.report('copied potfile from retrieved to tempfolder')
# now out_potential is in tempfolder (either copied or extracted) and can be copied from there
potfile_name, potfile_folder = self._OUT_POTENTIAL, tempfolder
else:
raise InputValidationError(
'ERROR in _get_pot_and_shape: neither impurity potential nor parent_calc_folder given!'
)
# copy potential to correct input name ('potential')
with potfile_folder.open(potfile_name, u'r') as oldfile:
with tempfolder.open(self._POTENTIAL, u'w') as newfile:
newfile.writelines(oldfile.readlines())
# remove old potential file if still present (saves unnecessary copying)
if self._OUT_POTENTIAL in os.listdir(tempfolder_path):
os.remove(os.path.join(tempfolder_path, self._OUT_POTENTIAL))
if '.dummy' in os.listdir(tempfolder_path):
os.remove(os.path.join(tempfolder_path, '.dummy'))
[docs] def _check_key_setting_consistency(self, params_kkrimp, key, val):
"""
Check if key/value pair that is supposed to be set is not in conflict with previous settings of parameters in params_kkrimp
"""
param_ok = True
#TODO implement checks
if not param_ok:
raise ValueError(f'Trying to set key "{key}" with value "{val}" which is in conflict to previous settings!')
[docs] def get_run_test_opts(self, parameters):
"""Extract run and test options from input parameters"""
runopts = parameters.get_dict().get('RUNFLAG')
if runopts is None:
runopts = []
testopts = parameters.get_dict().get('TESTFLAG')
if testopts is None:
testopts = []
allopts = runopts + testopts
return allopts
[docs] def add_lmdos_files_to_retrieve(self, tempfolder, allopts, retrieve_list, kkrflex_file_paths):
"""Add DOS files to retrieve list"""
if 'lmdos' in allopts or 'ldos' in allopts:
# extract NSPIN
with tempfolder.open(self._CONFIG) as file:
config = file.readlines()
itmp = search_string('NSPIN', config)
if itmp >= 0:
nspin = int(config[itmp].split()[-1])
else:
raise ValueError('Could not extract NSPIN value from config.cfg')
# check if mode is Jij
itmp = search_string('CALCJIJMAT', config)
if itmp >= 0:
calcjijmat = int(config[itmp].split()[-1])
else:
raise ValueError('Could not extract CALCJIJMAT value from config.cfg')
# extract NATOM from atominfo file
with tempfolder.open(self._KKRFLEX_ATOMINFO) as file:
atominfo = file.readlines()
itmp = search_string('NATOM', atominfo)
if itmp >= 0:
natom = int(atominfo[itmp + 1].split()[0])
else:
raise ValueError('Could not extract NATOM value from kkrflex_atominfo')
# loop over atoms and spins to add DOS output files accordingly
for iatom in range(1, natom + 1):
for ispin in range(1, nspin + 1):
retrieve_list.append((self._OUT_LDOS_BASE % (iatom, ispin)).replace(' ', '0'))
retrieve_list.append((self._OUT_LDOS_INTERPOL_BASE % (iatom, ispin)).replace(' ', '0'))
retrieve_list.append((self._OUT_LMDOS_BASE % (iatom, ispin)).replace(' ', '0'))
retrieve_list.append((self._OUT_LMDOS_INTERPOL_BASE % (iatom, ispin)).replace(' ', '0'))
# add Jij of E file if Jij mode
if calcjijmat > 0:
with kkrflex_file_paths[self._KKRFLEX_TMAT].open(self._KKRFLEX_TMAT, 'r') as f:
txt = []
for iline in range(3):
txt.append(f.readline())
nepts = int(txt[1].split()[3])
for ie in range(1, nepts + 1):
retrieve_list.append(self._OUT_JIJ_OF_E_BASE % ie) # energy resolved values
retrieve_list.append((self._OUT_JIJ_OF_E_BASE.replace('IE', 'IE_int')) % ie) # integrated values
return retrieve_list
[docs] def adapt_retrieve_tmatnew(self, tempfolder, allopts, retrieve_list):
"""Add out_magneticmoments and orbitalmoments files to retrieve list"""
# extract NSPIN value
with tempfolder.open(self._CONFIG) as file:
config = file.readlines()
itmp = search_string('NSPIN', config)
if itmp >= 0:
nspin = int(config[itmp].split()[-1])
else:
raise ValueError('Could not extract NSPIN value from config.cfg')
# change retrieve list
if 'tmatnew' in allopts and nspin > 1:
retrieve_list.append(self._OUT_MAGNETICMOMENTS)
with tempfolder.open(self._CONFIG) as file:
outorb = file.readlines()
itmp = search_string('CALCORBITALMOMENT', outorb)
if itmp >= 0:
calcorb = int(outorb[itmp].split()[-1])
else:
calcorb = 0
if calcorb == 1:
retrieve_list.append(self._OUT_ORBITALMOMENTS)
return retrieve_list
[docs] def init_ldau(self, tempfolder, retrieve_list, parent_calc_folder):
"""
Check if settings_LDAU is in input and set up LDA+U calculation. Reuse old ldaupot of parent_folder contains a file ldaupot.
"""
# first check if settings_LDAU is in inputs
if 'settings_LDAU' not in self.inputs:
# do nothing
return retrieve_list
else:
# this means we need to set up LDA+U
#TODO: check consistency of settings_LDAU
# add ldaupot to retrieve and local copy lists
retrieve_list.append(self._LDAUPOT)
# add runopt LDA+U
with tempfolder.open(self._CONFIG) as file:
config = file.readlines()
itmp = search_string('RUNFLAG', config)
if itmp >= 0:
config[itmp] = config[itmp].replace('\n', ' LDA+U \n')
# overwrite config.cfg
with tempfolder.open(self._CONFIG, 'w') as file:
file.writelines(config)
else:
raise ValueError('Could not find RUNFLAG in config.cfg to add LDA+U option')
# now create ldaupot file
self.create_or_update_ldaupot(parent_calc_folder, tempfolder)
return retrieve_list
[docs] def create_or_update_ldaupot(self, parent_calc_folder, tempfolder):
"""
Writes ldaupot to tempfolder.
If parent_calc_folder is found and it contains an onld ldaupot, we reuse the values for wldau, uldau and phi from there.
"""
# extract Fermi energy from potential file
with tempfolder.open(self._POTENTIAL) as potfile:
ef_Ry = get_ef_from_potfile(potfile)
# extract NATOM from atominfo file
with tempfolder.open(self._KKRFLEX_ATOMINFO) as file:
atominfo = file.readlines()
itmp = search_string('NATOM', atominfo)
if itmp >= 0:
natom = int(atominfo[itmp + 1].split()[0])
else:
raise ValueError('Could not extract NATOM value from kkrflex_atominfo')
# get old ldaupot file
reuse_old_ldaupot = self.get_old_ldaupot(parent_calc_folder, tempfolder)
# settings dict (defines U, J etc.)
ldau_settings = self.inputs.settings_LDAU.get_dict()
if reuse_old_ldaupot:
# reuse wldau, ildau and phi from old ldaupot file
# Attention the first number needs to be non-zero
txt = get_ldaupot_text(ldau_settings, ef_Ry, natom, initialize=False)
# now we read the old file
with tempfolder.open(self._LDAUPOT + '_old', 'r') as ldaupot_file:
txt0 = ldaupot_file.readlines()
# find start of wldau etc.
ii = 0
for line in txt0:
if 'wldau' in line:
break
ii += 1
txt0 = txt0[ii:]
#remove last line (is replace from txt0)
txt.pop(-1)
# put new header and old bottom together
newtxt = txt + ['\n'] + txt0
else:
# initialize ldaupot file
# Attention: here the first number needs to be 0 which triggers generating initial values in KKRimp
newtxt = get_ldaupot_text(ldau_settings, ef_Ry, natom, initialize=True)
# now write to file
with tempfolder.open(self._LDAUPOT, 'w') as out_filehandle:
out_filehandle.writelines(newtxt)
[docs] def get_old_ldaupot(self, parent_calc_folder, tempfolder):
"""
Copy old ldaupot from retrieved of parent or extract from tarball.
If no parent_calc_folder is present this step is skipped.
"""
has_ldaupot = False
if parent_calc_folder is not None:
retrieved = parent_calc_folder.get_incoming(node_class=CalcJobNode
).first().node.get_outgoing().get_node_by_label('retrieved')
# extract file from host's tarball (extract to tempfolder and use from there, this way the unnessesary files are deleted once submission is done)
has_ldaupot = self.get_ldaupot_from_retrieved(retrieved, tempfolder)
return has_ldaupot
[docs] def get_remote_symlink(self, local_copy_list):
"""Check if host GF is found on remote machine and reuse from there"""
remote_symlink_list = []
# extract remote computer information
code = self.inputs.code
comp = code.computer
# set upload dir (get the remote username and try 5 times if there was a connection error
remote_user = get_username(comp)
workdir = comp.get_workdir().format(username=remote_user)
GFpath_remote = os.path.join(workdir, self._DIRNAME_GF_UPLOAD)
self.report(f'local_copy_list: {local_copy_list}')
# extract GF information from retrieved folder of host GF calc
uuid_GF_calc = local_copy_list[0][0]
GF_local_copy_info = [i for i in local_copy_list if i[1] == self._KKRFLEX_GREEN]
TM_local_copy_info = [i for i in local_copy_list if i[1] == self._KKRFLEX_TMAT]
if len(TM_local_copy_info) > 0:
TM_local_copy_info = TM_local_copy_info[0]
else:
TM_local_copy_info = None
if len(GF_local_copy_info) > 0:
GF_local_copy_info = GF_local_copy_info[0]
else:
GF_local_copy_info = None
# open transport to remote computer
with comp.get_transport() as connection:
# do this for GMAT if it was found in retreived folder, otherwise we must assume to find it on the remote
filename = self._KKRFLEX_GREEN
GF_remote_path = os.path.join(GFpath_remote, uuid_GF_calc, filename)
# check if file exists on remote
if connection.isfile(GF_remote_path):
# remove GF from local copy list and add to remote symlink list
if GF_local_copy_info is not None:
local_copy_list.remove(GF_local_copy_info)
remote_symlink_list.append((comp.uuid, GF_remote_path, filename))
# do the same for TMAT
filename = self._KKRFLEX_TMAT
TM_remote_path = os.path.join(GFpath_remote, uuid_GF_calc, filename)
# check if file exists on remote
if connection.isfile(TM_remote_path):
# remove TMAT from local copy list and add to remote symlink list
if TM_local_copy_info is not None:
local_copy_list.remove(TM_local_copy_info)
remote_symlink_list.append((comp.uuid, TM_remote_path, filename))
# print symlink and local copy list (for debugging purposes)
self.report(f'local_copy_list: {local_copy_list}')
self.report(f'symlink_list: {remote_symlink_list}')
# now return updated remote_symlink and local_copy lists
return remote_symlink_list, local_copy_list
[docs] @classmethod
def get_ldaupot_from_retrieved(self, retrieved, tempfolder):
"""
Extract ldaupot from output of KKRimp retreived to tempfolder.
The extracted file in tempfolder will be named ldaupot_old.
returns True of ldaupot was found, otherwise returns False
"""
tar_filenames = []
if self._FILENAME_TAR in retrieved.list_object_names():
# get path of tempfolder
with tempfolder.open('.dummy', 'w') as tmpfile:
tempfolder_path = os.path.dirname(tmpfile.name)
# get path of tarfile
with retrieved.open(self._FILENAME_TAR) as tf:
tfpath = tf.name
# extract file from tarfile of retrieved to tempfolder
with tarfile.open(tfpath) as tf:
tar_filenames = [ifile.name for ifile in tf.getmembers()]
if self._LDAUPOT in tar_filenames:
has_ldaupot = True
tf.extract(self._LDAUPOT, tempfolder_path) # extract to tempfolder
# copy to new filename
if has_ldaupot:
with tempfolder.open(self._LDAUPOT + '_old', u'w') as newfile:
with tempfolder.open(self._LDAUPOT, u'r') as oldfile:
newfile.writelines(oldfile.readlines())
# remove dummy file
if '.dummy' in os.listdir(tempfolder_path):
os.remove(os.path.join(tempfolder_path, '.dummy'))
else: # otherwise copy from retrieved to tempfolder (rest of calculation needs files to be in tempfolder)
if self._LDAUPOT in retrieved.list_object_names():
has_ldaupot = True
with tempfolder.open(self._LDAUPOT + '_old', u'w') as newfile:
with retrieved.open(self._LDAUPOT, u'r') as oldfile:
newfile.writelines(oldfile.readlines())
return has_ldaupot
[docs] def add_jij_files(self, tempfolder, retrieve_list):
"""
check if KkrimpCalculation is in Jij mode and add OUT_JIJMAT to retrieve list if needed
"""
# check if Jij mode is set in config file and add OUT_JIJMAT to retrieved list if needed
with tempfolder.open(self._CONFIG) as file:
config = file.readlines()
itmp = search_string('CALCJIJMAT', config)
if itmp >= 0:
calcjijmat = int(config[itmp].split()[-1])
if calcjijmat > 0:
retrieve_list.append(self._OUT_JIJMAT)
else:
raise ValueError('Could not extract CALCJIJMAT value from config.cfg')
return retrieve_list
[docs]def get_ldaupot_text(ldau_settings, ef_Ry, natom, initialize=True):
"""
create the text for the ldaupot file
"""
from masci_tools.io.common_functions import get_Ry2eV
eV2Ry = 1. / get_Ry2eV()
# these lists are extracted from ldau_settings node and then written to the ldaupot file
iatoms_ldau = []
lopt = []
ueff = []
jeff = []
eref = []
# extract values from ldau_settings
for key, val in ldau_settings.items():
if 'iatom' in key:
iatoms_ldau.append(int(key.split('=')[1]))
lopt.append(val['L'])
# add values in Ry units
jeff.append(val['J'] * eV2Ry)
ueff.append(val['U'] * eV2Ry)
eref.append(val['Eref_EF'] * eV2Ry + ef_Ry)
if initialize:
# this means we initialize this file
ldaurun = 0
else:
# this means wldau etc are reused (need to be added to the file)
ldaurun = 1
# collect text which is written to ldaupot
txt = [f'{ldaurun} ']
txt_lopt, txt_jeff, txt_ueff, txt_eref = [], [], [], []
ii = 0
for iatom in range(natom):
if iatom not in iatoms_ldau:
txt_lopt += [f'{-1} ']
txt_jeff += [f'{0.0} ']
txt_ueff += [f'{0.0} ']
txt_eref += [f'{0.0} ']
else:
txt_lopt += [f'{lopt[ii]} ']
txt_jeff += [f'{jeff[ii]} ']
txt_ueff += [f'{ueff[ii]} ']
txt_eref += [f'{eref[ii]} ']
ii += 1
txt += ['\n'] + txt_lopt + ['\n'] + txt_ueff + ['\n'] + txt_jeff + ['\n'] + txt_eref
txt += ['\nwldau\nuldau\nphi\n']
# add initial matrices
if initialize and 'initial_matrices' in ldau_settings.keys():
# save for consistency check
nldauatoms = ii
# change first number from 0 to 1 to signal reading-in instead of calculating initial matrices
txt[0] = '1 '
# remove last dummy line, will be replaced with starting values now
txt[-1] = '\n'
txt_wldau = ['wldau\n']
txt_uldau = ['uldau\n']
txt_phi = ['phi\n']
ii = 0
for iatom in range(natom):
if iatom in iatoms_ldau:
txt_wldau += [f'atom {iatom+1}\n'] + ldau_settings['initial_matrices'][f'iatom={iatom}']['wldau']
txt_uldau += [f'atom {iatom+1}\n'] + ldau_settings['initial_matrices'][f'iatom={iatom}']['uldau']
txt_phi += [f'atom {iatom+1}\n'] + ldau_settings['initial_matrices'][f'iatom={iatom}']['phi']
ii += 1 # count number of atoms
# consistency check
if nldauatoms != ii:
raise ValueError('initial_matrices input inconsistent')
# add additional lines to txt
txt = txt + txt_wldau + txt_uldau + txt_phi
return txt