Source code for aiida_kkr.calculations.kkrnano

# -*- coding: utf-8 -*-
Input plug-in for a KKRnano calculation.

import numpy as np
from import get_Ang2aBohr
from aiida.orm import CalcJobNode, Dict, Bool, Float, RemoteData, StructureData
from aiida.engine import CalcJob
from aiida.common import NotExistent
from aiida.common.datastructures import (CalcInfo, CodeInfo)
from aiida.common.exceptions import InputValidationError, UniquenessError
from import get_remote, get_parent
from import StrucWithPotData

__copyright__ = (u'Copyright (c), 2021, Forschungszentrum Jülich GmbH, '
                 'IAS-1/PGI-1, Germany. All rights reserved.')
__license__ = 'MIT license, see LICENSE.txt file'
__version__ = '0.0.2'
__contributors__ = ('Markus Struckmann', 'Philipp Rüßmann')

[docs]class KKRnanoCalculation(CalcJob): """ AiiDA calculation plugin for a KKRnano calculation """ #################### # File names etc. #################### # calculation plugin version _CALCULATION_PLUGIN_VERSION = __version__ # Default input and output files _DEFAULT_INPUT_FILE = 'input.conf' # will be shown with inputcat _DEFAULT_EFERMI_FILE = 'EFERMI' _DEFAULT_NOCO_INPUT_FILE = 'nonco_angle.dat' _DEFAULT_OUTPUT_FILE = 'out' #'shell output will be shown with outputcat _CONVERT_OUTPUT_FILE = 'convert_out' _DEFAULT_OUTPUT_PREP_FILE = 'output.0.txt' _DEFAULT_NOCO_OUTPUT_FILE = 'nonco_angle_out.dat' # template.product entry point defined in setup.json _DEFAULT_PARSER = 'kkr.kkrnanoparser' # File names _SHAPEFUN = 'shapefun' _POTENTIAL = 'potential' _OUT_POTENTIAL = 'out_potential' _RBASIS = '' _DEFAULT_KKRNANO_PARA = { 'bzdivide': {'value': [10,10,10],'required': True,'description': 'number of k-points in each direction'}, 'emin': {'value': -1.2, 'unit':'Rydberg', 'required': True, 'description': 'lower energy of contour'}, 'emax': {'value': 1.0 , 'unit':'Rydberg', 'required': True,\ 'description': 'upper energy of contour (relevant only for DOS calculation)'}, 'npnt1': {'value': 3, 'unit':'Rydberg', 'required': True,\ 'description': 'number of points starting at emin, parallel to imaginary axis'}, 'npnt2': {'value': 20, 'required': True,\ 'description': 'number of points parallel to real axis starting from emin + imag part.'}, 'npnt3': {'value': 3, 'required': True,\ 'description': 'number of points parallel to real axis in interval (E_F - 30*k*T + imag, E_F + imag)'}, 'npol': {'value': 7, 'required': True,\ 'description': 'Number of Matsubara poles, npol=0 triggers DOS calculation'} , 'scfsteps': {'value': 1,'required': True, 'description': 'Number of scf steps'}, 'imix': {'value': 6,'required': True,\ 'description': "mixing method: imix = 0 -> straight mixing; imix = 1 -> straight mixing;\ imix = 4 -> Broyden's 2nd method; imix = 5 -> gen. Anderson mixing;\ imix = 6 -> Broyden's 2nd method with support for >1 atom per process" }, 'mixing': {'value': 0.01, 'required': True,\ 'description': 'straight mixing parameter'}, 'rmax': {'value': 8.0, 'required': True,\ 'description': 'Ewald sum cutoff in real space in units of lattice constants'} , 'gmax': {'value': 48.0, 'required': True,\ 'description': 'Ewald sum cutoff in reciprocal space in units of 2*Pi/alat'}, 'nsra': {'value': 2, 'required': True, 'description': '1=non-scalar-relativistic 2=scalar-relativistic'}, 'kte': {'value': 1, 'required': True,\ 'description': '1=calculate energies, -1 = total energy only, less I/O'} , 'rclust_voronoi': {'value': 2.00, 'required': True,\ 'description': 'radius of cluster used for Voronoi (Should be irrelevant for use with aiida)'}#, #"soc":{"value": True} }
[docs] @classmethod def define(cls, spec): """ define internals and inputs / outputs of calculation """ # reuse base class (i.e. CalcJob) functions super(KKRnanoCalculation, cls).define(spec) # now define input files and parser spec.inputs['metadata']['options']['parser_name'].default = cls._DEFAULT_PARSER spec.inputs['metadata']['options']['input_filename'].default = cls._DEFAULT_INPUT_FILE spec.inputs['metadata']['options']['output_filename'].default = cls._DEFAULT_OUTPUT_FILE #spec.inputs['metadata']['options']['output_prep_filename'].default = cls._DEFAULT_OUTPUT_PREP_FILE ->does not work # define input nodes (optional ones have required=False) spec.input( 'parameters', valid_type=Dict, required=False, default=lambda: Dict(dict=cls._DEFAULT_KKRNANO_PARA), help='Dict node that specifies the input parameters for KKRnano (k-point density etc.)' ) spec.input( 'nocoangles', valid_type=Dict, required=False, default=lambda: Dict(dict={}), help='Dict node that specifies the starting angles for non-colinear calculations\ (only needed in conjunction with non-colinear calculations, i. e. KORBIT=1\ (which is also necessary for SOC calculations!))' ) spec.input( 'parent_folder', valid_type=RemoteData, required=False, help='Use a node that specifies a parent KKRnano or voronoi calculation' ) spec.input( 'convert', valid_type=Bool, required=False, default=lambda: Bool(False), help='Activate to use together with set up convert code in order to retrieve potential files.' ) spec.input( 'passed_lattice_param_angs', valid_type=Float, required=False, default=lambda: Float(-10000.0), help='Use a prespecified lattice constant in Angstrom as input for KKRnano, i. e. in the input.conf file. \ Default is the length of the longest Bravais vector in the structure object used for the voronoi calculation. \ This can be useful in the context of treating supercells.' ) spec.input('strucwithpot', valid_type=StrucWithPotData, required=False) #spec.input('structure', valid_type=StructureData, required=False, default:) # define outputs spec.output('output_parameters', valid_type=Dict, required=True, help='results of the calculation') spec.default_output_node = 'output_parameters' # define exit codes, also used in parser spec.exit_code(301, 'ERROR_NO_OUTPUT_FILE', message='KKRnano output file not found') spec.exit_code(302, 'ERROR_PARSING_FAILED', message='KKRnano parser retuned an error')
[docs] def prepare_for_submission(self, tempfolder): """Create the input files from the input nodes passed to this instance of the `CalcJob`. :param tempfolder: an `aiida.common.folders.Folder` to temporarily write files on disk :return: `aiida.common.datastructures.CalcInfo` instance """ #prepare inputs parameters = self.inputs.parameters.get_dict() #passed_lattice_const = self.inputs.passed_lattice_param_angs.value nonco_angles = self.inputs.nocoangles.get_dict() #parent_calc=self.inputs.parent_folder#.get_incoming(node_class=CalcJobNode).first().node #parent_calc_calc_node=parent_calc.get_incoming(node_class=CalcJobNode).first().node #structure = self._find_parent_struc_from_voro_or_stwpd(parent_calc_calc_node)[0].get_pymatgen_structure() #parent_outfolder = parent_calc_calc_node.outputs.retrieved code = self.inputs.code #determine number of MPI processes try: num_mpi_procs = self.metadata.options.resources['tot_num_mpiprocs'] except: try: num_mpi_procs=self.metadata.options.resources['num_machines']*\ self.metadata.options.resources['num_mpiprocs_per_machine'] except: raise InputValidationError( "The total number of MPI processes could not be determined. In case of doubt: Specify `builder.metadata.options.resources['tot_num_mpiprocs']=?` Do not forget number of machines" ) #Check if convert mode has been activated convert = self.inputs.convert.value #StrucWithPot object as starting point -> contains passed_lattice_constant, shapefun, potential, and structure if hasattr(self.inputs, 'strucwithpot') and not hasattr(self.inputs, 'parent_folder'): use_strucwithpot = True strucwithpot = self.inputs.strucwithpot structure = strucwithpot.structure.get_pymatgen_structure() if self.inputs.passed_lattice_param_angs.value == -10000.0: #check if default value is used if hasattr(strucwithpot, 'specified_lattice_constant'): passed_lattice_const = strucwithpot.specified_lattice_constant.value else: passed_lattice_const = self.inputs.passed_lattice_param_angs.value else: passed_lattice_const = self.inputs.passed_lattice_param_angs.value parent_outfolder_uuid = None #use dummy so that copylists are still available elif not hasattr(self.inputs, 'strucwithpot') and hasattr(self.inputs, 'parent_folder'): use_strucwithpot = False passed_lattice_const = self.inputs.passed_lattice_param_angs.value parent_calc = self.inputs.parent_folder #.get_incoming(node_class=CalcJobNode).first().node parent_calc_calc_node = parent_calc.get_incoming(node_class=CalcJobNode).first().node #Find structure unless convert mode is activated, as for that no structure is needed if not convert: structure = self.find_parent_struc_from_voro_or_stwpd(parent_calc_calc_node)[0].get_pymatgen_structure() parent_outfolder = parent_calc_calc_node.outputs.retrieved parent_outfolder_uuid = parent_outfolder.uuid else: raise InputValidationError( 'Either `strucwithpot` or a `parent_folder` has to be provided.\ If necessary remove one of the inputs.' ) print('passed lattice constant=', passed_lattice_const) # Local copy list for continuing KKRnano calculations (Note: parent_outfolder_uuid is None if use_strucwithpot) # if necessary, change some names in order to start from a preconverged calculation # (cf. jukkr/source/KKRnano/scripts/ local_copy_list_for_continued = [(parent_outfolder_uuid, 'bin.vpotnew', 'bin.vpotnew.0'), (parent_outfolder_uuid, 'bin.vpotnew.idx', 'bin.vpotnew.0.idx'), (parent_outfolder_uuid, 'bin.meshes', 'bin.meshes.0'), (parent_outfolder_uuid, 'bin.meshes.idx', 'bin.meshes.0.idx'), (parent_outfolder_uuid, 'bin.energy_mesh', 'bin.energy_mesh.0'), (parent_outfolder_uuid, 'bin.atoms', 'bin.atoms')] #(parent_outfolder_uuid,"nonco_angle_out.dat","bin.atoms" )] #TODO: Parse noco_angle_out.dat and write a new file from that #Convert mode is not available for call using strucwithpot if use_strucwithpot and convert: raise InputValidationError('Convert mode cannot be used for call using a strucwithpot object.') if convert: if not parent_calc_calc_node.process_label == 'KKRnanoCalculation': raise InputValidationError( 'A convert process can only be started from a KKRnano calculation. Check also that convert code is used!' ) #mark files from previous calc as such (Note: parent_outfolder_uuid is None if use_strucwithpot) local_copy_list_for_continued = [ (parent_outfolder_uuid, 'bin.vpotnew', 'bin.vpotnew'), (parent_outfolder_uuid, 'bin.vpotnew.idx', 'bin.vpotnew.idx'), (parent_outfolder_uuid, 'bin.meshes', 'bin.meshes'), (parent_outfolder_uuid, 'bin.meshes.idx', 'bin.meshes.idx'), (parent_outfolder_uuid, 'bin.energy_mesh', 'bin.energy_mesh'), (parent_outfolder_uuid, 'bin.atoms', 'bin.atoms'), (parent_outfolder_uuid, 'bin.dims', 'bin.dims'), (parent_outfolder_uuid, self._DEFAULT_OUTPUT_PREP_FILE, self._DEFAULT_OUTPUT_PREP_FILE), (parent_outfolder_uuid, self._DEFAULT_OUTPUT_FILE, self._DEFAULT_OUTPUT_FILE) ] # Check inputdict parameters = self._check_input_dict(parameters, num_mpi_procs, convert) #Check if parent is a KKRnano calculation write_efermi = False if not use_strucwithpot: self._check_valid_parent(parent_outfolder) if parent_calc_calc_node.process_label == 'KKRnanoCalculation': fermi = parent_calc_calc_node.outputs.output_parameters.get_dict()['fermi_energy_in_ryd'][-1] write_efermi = True # Check if non-colinear calculation mode is activated noco = False if 'KORBIT' in parameters: if parameters['KORBIT']['value'] == 1: noco = True if 'soc' in parameters: if parameters['soc']['value'] == True: if noco == False: parameters['KORBIT']['value'] = 1 self.logger.warn( 'KORBIT was set to 1 -> SOC is implemented for the NOCO-Chebyshev solver, only! This is however not changed in the input node and might lead to inconsistencies if this feature has been added in KKRnano!' ) noco = True if noco: with, u'w') as nonco_angles_handle: self._write_nonco_angles(nonco_angles_handle, nonco_angles, structure) # Prepare and input.conf from Structure and input parameter data unless convert mode if not convert: with, u'w') as input_file_handle: self._write_input_file(input_file_handle, parameters, structure, passed_lattice_const) with, u'w') as rbasis_handle: self._write_rbasis(rbasis_handle, structure, passed_lattice_const) if write_efermi: with, u'w') as efermi_file_handle: self._write_efermi_file(efermi_file_handle, fermi) # Prepare potential and shapefun file from strucwithpot, if necessary if use_strucwithpot: with, u'w') as potential_file_handle: self._write_potential_file(potential_file_handle, strucwithpot) with, u'w') as shapefun_file_handle: self._write_shapefun_file(shapefun_file_handle, strucwithpot) # Prepare CalcInfo to be returned to aiida calcinfo = CalcInfo() calcinfo.uuid = self.uuid if not use_strucwithpot: calcinfo.local_copy_list = self._get_local_copy_list(parent_calc, local_copy_list_for_continued) if use_strucwithpot: calcinfo.local_copy_list = [] calcinfo.remote_copy_list = [] #retrieve list #TODO add NOCO calcinfo.retrieve_list = [ self._DEFAULT_OUTPUT_PREP_FILE, self._DEFAULT_OUTPUT_FILE, self._DEFAULT_NOCO_OUTPUT_FILE, 'bin.dims', self._CONVERT_OUTPUT_FILE, 'vpot*', 'DOS*', 'nonco_angle_out.dat' ] # bin.dims is added here, as this should be retrieved for generating vpot-files (negligable in size anyway) for j in range( len(local_copy_list_for_continued) ): #add the binary files that are necessary to restart a calculation to the retrieved list calcinfo.retrieve_list.append(local_copy_list_for_continued[j][1]) codeinfo = CodeInfo() if convert: codeinfo.cmdline_params = ['--convert'] codeinfo.stdout_name = self._CONVERT_OUTPUT_FILE else: codeinfo.cmdline_params = [] codeinfo.stdout_name = self._DEFAULT_OUTPUT_FILE codeinfo.code_uuid = code.uuid calcinfo.codes_info = [codeinfo] return calcinfo
def _list2string(self, list0): string = '' for item in list0: string += str(item) + ' ' return string def _array2string(self, array): if array.shape == (): return str(array) else: return self._list2string(array)
[docs] def _getParametersEntry(self, key, value): """writing out various entry types from a dict into the format suitable for KKRnano""" if type(value) == str or type(value) == int: content = f'{key} = {value}' elif type(value) == float or type(value) == np.float_: content = f"{key} = {str.replace(str(value), 'e', 'D')}" elif type(value) == np.ndarray: content = f'{key} = {self._array2string(value)}' elif type(value) == list: content = f'{key} = {self._list2string(value)}' elif type(value) is bool: value2write = 'f' if value: value2write = 't''TEST123') content = f'{key} = {value2write}' else: print(f'WARNING: Unknown datatype of entry "{value}". Assume array and proceed') try: value = np.array(value) content = f'{key} = {self._array2string(value)}' print(f'{key} = {self._array2string(value)}') return content except: self.logger.error('ERROR: Datatype cannot be used as array!') content = content + '\n' return content
[docs] def _get_lattice_constant(self, structure, passed_lattice_const): """determine whether a passed lattice constant should be used""" # default value, see above. (Probably) No one would use this large a lattice constant to replace the other if passed_lattice_const == -10000.0: lattice_param_angs = max( elif passed_lattice_const > 0.0: lattice_param_angs = passed_lattice_const'Using passed lattice constant. The vectors are scaled accordingly!') else: raise InputValidationError('The passed lattice constant must not be negative!') return lattice_param_angs
[docs] def _write_input_file(self, input_file_handle, parameters, structure, passed_lattice_const): """write the input.conf file for KKRnano""" #lattice_param_angs=max( lattice_param_angs = self._get_lattice_constant(structure, passed_lattice_const) write_list = [] #header of input.conf write_list.append('# LATTICE \n \n# lattice parameter in units of the Bohr radius \n') write_list.append(f'alat = {lattice_param_angs * get_Ang2aBohr()}') write_list.append('\n# scale basis coordinates by these factors \nbasisscale = 1.0 1.0 1.0 \n \n') latt_dict = structure.lattice.as_dict() write_list.append('#BRAVAIS\n') rbrav = np.array(latt_dict['matrix']) / lattice_param_angs directions = ['a', 'b', 'c'] for i in range(3): write_list.append(f'bravais_{directions[i]}= {rbrav[i][0]:.15f} {rbrav[i][1]:.15f} {rbrav[i][2]:.15f}\n') write_list.append('\ncartesian = t\n') #writing other parameters from parameter dict #params=parameters.get_dict() params = parameters for key in params: write_list.append(self._getParametersEntry(key, params[key]['value'])) write_list.append('# CHANGED SCRIPT') input_file_handle.writelines(write_list)
[docs] def _write_nonco_angles(self, nonco_angles_handle, nonco_angles, structure): """ write nonco_angles.dat file for KKRnano created from dictionary with structure dict={'atom':{1:{'theta':0.0,'phi':00.0, 'fix_angle_mode':1},...} The angles 'theta' (polar angle going from z- to x-direction) and 'phi' (azimuthal angle) are given in deg. 'fix_angle_mode' takes values 0,1,2,3. 0 is for relaxation of the spin-direciton, 1 is for fixing it; 2 and 3 are for constraining fields calculations. """ n_atoms = len(structure.atomic_numbers) #TODO: Adapt to KKRhost style #Create dictionary if none was given in the input if nonco_angles == {}: nonco_angles = {'atom': {}} for i in range(n_atoms): nonco_angles['atom'][i + 1] = {'theta': 0.0, 'phi': 0.0, 'fix_angle_mode': 1} if n_atoms != len(nonco_angles['atom'].keys()): raise InputValidationError( 'The number of atoms in the structure must\ match the number of atoms specified for the non-colinear angles.' ) write_list = [] for key in nonco_angles['atom']: write_list.append('{} {} {}\n'.format(nonco_angles['atom'][key]['theta'],\ nonco_angles['atom'][key]['phi'],\ nonco_angles['atom'][key]['fix_angle_mode'])) nonco_angles_handle.writelines(write_list)
[docs] def _get_rbasis_atom_symbol(self, atomic_number): """returns either an element symbol or a vacuum symbol""" #PSE element symbols symbols = ( '__', 'H', 'He', 'Li', 'Be', 'B', 'C', 'N', 'O', 'F', 'Ne', 'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'Cl', 'Ar', 'K', 'Ca', 'Sc', 'Ti', 'V', 'Cr', 'Mn', 'Fe', 'Co', 'Ni', 'Cu', 'Zn', 'Ga', 'Ge', 'As', 'Se', 'Br', 'Kr', 'Rb', 'Sr', 'Y', 'Zr', 'Nb', 'Mo', 'Tc', 'Ru', 'Rh', 'Pd', 'Ag', 'Cd', 'In', 'Sn', 'Sb', 'Te', 'I', 'Xe', 'Cs', 'Ba', 'La', 'Ce', 'Pr', 'Nd', 'Pm', 'Sm', 'Eu', 'Gd', 'Tb', 'Dy', 'Ho', 'Er', 'Tm', 'Yb', 'Lu', 'Hf', 'Ta', 'W', 'Re', 'Os', 'Ir', 'Pt', 'Au', 'Hg', 'Tl', 'Pb', 'Bi', 'Po', 'At', 'Rn', 'Fr', 'Ra', 'Ac', 'Th', 'Pa', 'U', 'Np', 'Pu', 'Am', 'Cm', 'Bk', 'Cf', 'Es', 'Fm', 'Md', 'No', 'Lr', 'Rf', 'Db', 'Sg', 'Bh', 'Hs', 'Mt', 'Ds', 'Rg', 'Cn', 'Uut', 'Fl', 'Uup', 'Lv', 'Uus', 'Uuo' ) if abs(atomic_number) > len( symbols ) + 30: #pymatgen stores X sites with very high random `atomic numbers` that can also be negative return symbols[0] else: return symbols[atomic_number]
[docs] def _write_rbasis(self, rbasis_handle, structure, passed_lattice_const): """write the file for KKRnano""" try: space_group_info = str(structure.get_space_group_info()) except: space_group_info = 'No space group could be identifed' write_list=[str(len(structure.atomic_numbers)),'\n',\ str(structure.composition), ', ', space_group_info,'\n'] lattice_param_angs = self._get_lattice_constant(structure, passed_lattice_const) for i in range(np.shape(structure.cart_coords)[0]): write_list.append('{} {:.15f} {:.15f} {:.15f}\n'.format(self._get_rbasis_atom_symbol(structure.atomic_numbers[i]), \ structure.cart_coords[i,0]/lattice_param_angs, \ structure.cart_coords[i,1]/lattice_param_angs,\ structure.cart_coords[i,2]/lattice_param_angs)) rbasis_handle.writelines(write_list)
[docs] def _write_efermi_file(self, efermi_handle, efermi): """ write file EFERMI necessary to restart a calculation. """ efermi_handle.writelines([str(efermi)])
[docs] def _write_potential_file(self, potential_handle, strucwithpot): """ write file potential from strucwithpot object's potential files. """ for vpot in strucwithpot.potentials: potential_handle.writelines(vpot.get_content()) #.split('\n'))
[docs] def _write_shapefun_file(self, shapefun_handle, strucwithpot): """ write file potential from strucwithpot object's potential files. """ shapefun_handle.writelines(strucwithpot.make_shapefun()) #.split('\n'))
[docs] def _get_local_copy_list(self, parent_calc_remote, local_copy_list_for_continued): """find files to copy from the parent_folder's retrieved to the input of a KKRnano calculation""" #Note: the check if a non-empty copylist is needed, i. e. if started # from a parent calculation, is done in prepare_for_submission parent_calc = parent_calc_remote.get_incoming(node_class=CalcJobNode).first().node retrieved = parent_calc.outputs.retrieved local_copy_list = [] if not self._is_KkrnanoCalc(parent_calc): # copy input potential from voronoi output local_copy_list += [(retrieved.uuid, parent_calc.process_class._OUT_POTENTIAL_voronoi, self._POTENTIAL)] # copy shapefun from voronoi output local_copy_list += [(retrieved.uuid, parent_calc.process_class._SHAPEFUN, self._SHAPEFUN)] elif parent_calc.process_label == 'KKRnanoCalculation': local_copy_list = local_copy_list_for_continued return local_copy_list
[docs] def _check_valid_parent(self, parent_calc_folder): """ Check that calc is a valid parent for a KKRnano. It can be a VoronoiCalculation, KKRnanoCalculation """ # extract parent calculation parent_calcs = parent_calc_folder.get_incoming(node_class=CalcJobNode) n_parents = len(parent_calcs.all_link_labels()) if n_parents != 1: raise UniquenessError( 'Input RemoteData is child of {} ' 'calculation{}, while it should have a single parent' ''.format(n_parents, '' if n_parents == 0 else 's') ) else: parent_calc = parent_calcs.first().node overwrite_pot = True if (not parent_calc.process_label=='KKRnanoCalculation')\ and (not parent_calc.process_label=='VoronoiCalculation'): raise ValueError('Parent Calculation has to be another KKRnano or a Voronoi calculation.')
[docs] def _check_input_dict(self, inputdict, num_mpi_procs, convert): """ checks if all essential keys are contained in the inputdict and if it has the right format """ #Check if all essential keys are present all_present = True missing_keys = [] for key in self._DEFAULT_KKRNANO_PARA: if not key in inputdict: all_present = False missing_keys.append(key) if not all_present: raise InputValidationError( 'Not all essential keys were given in the input dictionary. \ At least the following keys are missing: \ {}'.format(missing_keys) ) #Check for format keys_in_wrong_format = [] correct_format = True for key in inputdict: try: if not 'value' in inputdict[key]: correct_format = False keys_in_wrong_format.append(key) except: correct_format = False keys_in_wrong_format.append(key) if not correct_format: self.logger.warn( 'Some keys were provided in an incorrect format. Dict entries must have entry `value`\ (see e. g. KKRnano-Plugin/aiida-kkr/aiida_kkr/calculations/ At least the following keys are in an incorrect format: \ {} \nMoving on by assuming just the key `value` was missing and adapt accordingly'.format(keys_in_wrong_format) ) #try to correct incorrect entries, most likely they do not have the 'value' feature try: for key in keys_in_wrong_format: likely_value = inputdict[key] inputdict[key] = {'value': likely_value} except: raise InputValidationError( 'Some keys were provided in an incorrect format. Dict entries must have entry `value`\ (see e. g. KKRnano-Plugin/aiida-kkr/aiida_kkr/calculations/ At least the following keys are in an incorrect format: \ {}.'.format(keys_in_wrong_format) ) #Check if the values are provided in the correct data type all_correct_dtype = True keys_with_incorrect_dtypes = [] values_with_incorrect_dtypes = [] correct_dtypes_for_incorrect_keys = [] for key in self._DEFAULT_KKRNANO_PARA: default_value = self._DEFAULT_KKRNANO_PARA[key]['value'] dtype_required = type(default_value) entered_value = inputdict[key]['value'] dtype_entered = type(entered_value) if dtype_required == list: try: for item_index in range(len(default_value)): default_entry = default_value[item_index] entered_entry = entered_value[item_index] if not type(default_entry) == type(entered_entry): all_correct_dtype = False keys_with_incorrect_dtypes.append(key) values_with_incorrect_dtypes.append(str(entered_value)) correct_dtypes_for_incorrect_keys.append( f'array/list of {str(type(default_entry))} (wrong entry type)' ) except IndexError: all_correct_dtype = False keys_with_incorrect_dtypes.append(key) values_with_incorrect_dtypes.append(str(entered_value)) correct_dtypes_for_incorrect_keys.append( 'array/list of {} (wrong length, \ length should be {} )'.format(str(type(default_entry)), str(len(default_value))) ) elif not dtype_required == dtype_entered: all_correct_dtype = False keys_with_incorrect_dtypes.append(key) values_with_incorrect_dtypes.append(str(entered_value)) correct_dtypes_for_incorrect_keys.append(f'{str(type(default_entry))}') if not all_correct_dtype: #Generate helpful error message dtype_error_message = 'Some keys were provided with an incorrect datatype. These values are \n' for j in range(len(keys_with_incorrect_dtypes)): dtype_error_message+='key: {}, entered value: {}, needed datatype: {}\n'.format(keys_with_incorrect_dtypes[j],\ values_with_incorrect_dtypes[j],correct_dtypes_for_incorrect_keys[j]) raise InputValidationError(dtype_error_message) #Check if the number of mpi processes, specified with the builder matches the settings in the inputdict mpikeys = ['NTHRDS', 'EMPID', 'SMPID', 'num_atom_procs'] needed_num_mpi_procs = 1 for key in mpikeys: if key in inputdict: needed_num_mpi_procs *= inputdict[key]['value'] if num_mpi_procs % needed_num_mpi_procs != 0 and not convert: raise ( InputValidationError( "Number of MPI processes does not match! In builder.metadata.options.resources['num_mpi_procs']= {}, however according to the builder.parameters node, the needed number of MPI process is {}" .format(num_mpi_procs, needed_num_mpi_procs) ) ) elif convert and num_mpi_procs != 1: #TODO check if convert step works with more cores raise ( InputValidationError( "Number of MPI processes does not match! In builder.metadata.options.resources['num_mpi_procs']= {}, however for the convert step the needed number of MPI processes is 1." .format(num_mpi_procs) ) ) return inputdict
[docs] def _is_KkrnanoCalc(self, calc): """ check if calc contains the file out_potential """ is_KKR = False if calc.process_type == 'aiida.calculations:kkr.kkrnano': retrieved_node = calc.get_retrieved_node() if True: #'out_potential' in retrieved_node.list_object_names(): is_KKR = True return is_KKR
[docs] @classmethod def _get_struc(self, parent_calc): """ Get strucwithpot from a parent_folder (result of a calculation, typically a remote folder) """ try: return parent_calc.inputs.strucwithpot.structure except: return parent_calc.inputs.structure
[docs] @classmethod def _has_struc(self, parent_folder): """ Check if parent_folder has strucwithpot information in its input """ success = True link_labels = parent_folder.get_incoming().all_link_labels() if 'strucwithpot' not in link_labels and 'structure' not in link_labels: success = False return success
[docs] @classmethod def find_parent_struc_from_voro_or_stwpd(self, parent_folder): """ Find the StructureData node recuresively in chain of parent calculations (structure node is input to voronoi calculation or in input StrucWithPotData node of KKRnano calculation) returns structure, parent_folder """ iiter = 0 Nmaxiter = 1000 parent_folder_tmp = get_remote(parent_folder) while not self._has_struc(parent_folder_tmp) and iiter < Nmaxiter: parent_folder_tmp = get_remote(get_parent(parent_folder_tmp)) iiter += 1 if iiter % 200 == 0: print( 'Warning: find_parent_structure takes quite long (already searched {} ancestors). Stop after {}'. format(iiter, Nmaxiter) ) if self._has_struc(parent_folder_tmp): struc = self._get_struc(parent_folder_tmp) return struc, parent_folder_tmp else: raise ValueError(f'structure not found {parent_folder_tmp}')