Source code for aiida_kkr.parsers.kkrimp

# -*- coding: utf-8 -*-
"""
Parser for the KKR-impurity code (KKRimp).
The parser should never fail; instead it should catch
all errors and warnings and show them to the user.
"""

from __future__ import absolute_import
import tarfile
import os
from aiida import __version__ as aiida_core_version
from aiida.orm import Dict
from aiida.common.exceptions import InputValidationError, NotExistent
from aiida.parsers.parser import Parser
from aiida_kkr.calculations.kkrimp import KkrimpCalculation
from aiida_kkr.tools.context import open_files_in_context
from masci_tools.io.parsers.kkrparser_functions import check_error_category
from masci_tools.io.parsers.kkrimp_parser_functions import KkrimpParserFunctions
from six.moves import range
from pprint import pprint
from contextlib import ExitStack

__copyright__ = (u'Copyright (c), 2018, Forschungszentrum Jülich GmbH, '
                 'IAS-1/PGI-1, Germany. All rights reserved.')
__license__ = 'MIT license, see LICENSE.txt file'
__version__ = '0.5.0'
__contributors__ = (u'Philipp Rüßmann',)


class KkrimpParser(Parser):
    """
    Parser class for parsing output of the KKRimp code.
    """

    def __init__(self, calc):
        """
        Initialize the instance of KkrimpParser.
        """
        self._ParserVersion = __version__

        # reuse init of base class
        super(KkrimpParser, self).__init__(calc)
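
    # Parsers are normally invoked automatically by the AiiDA engine once a
    # calculation's files have been retrieved. An illustrative manual
    # invocation on a finished calculation node could look roughly like this
    # (the pk 1234 is hypothetical):
    #
    #   from aiida.orm import load_node
    #   calc = load_node(1234)        # finished KkrimpCalculation node
    #   parser = KkrimpParser(calc)
    #   parser.parse(debug=True)      # results end up in the
    #                                 # 'output_parameters' output node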
    # pylint: disable=protected-access
    def parse(self, debug=False, **kwargs):
        """
        Parse the retrieved output files of a KKRimp calculation and store the
        results in the database.

        :param debug: if True, print additional debug output during parsing
        """
        success = False

        # Check that the retrieved folder is there
        try:
            out_folder = self.retrieved
        except NotExistent:
            return self.exit_codes.ERROR_NO_RETRIEVED_FOLDER

        file_errors = []
        files = {}

        # Parse output files of the KKRimp calculation:
        # first get the file names and catch errors if files are not present;
        # append tuples (error_category, error_message) where error_category is
        # 1: critical error, always leads to failing of the calculation
        # 2: warning, is inspected and checked for consistency with read-in
        #    out_dict values (e.g. nspin, newsosol, ...)

        # first go through the list of critical files that should always be there
        critical_files = [
            ('outfile', KkrimpCalculation._DEFAULT_OUTPUT_FILE),
            ('out_log', KkrimpCalculation._OUTPUT_000),
            ('out_pot', KkrimpCalculation._OUT_POTENTIAL),
            ('out_timing', KkrimpCalculation._OUT_TIMING_000),
            ('out_enersp_at', KkrimpCalculation._OUT_ENERGYSP_PER_ATOM),
            ('out_enertot_at', KkrimpCalculation._OUT_ENERGYTOT_PER_ATOM),
        ]
        for keyname, fname in critical_files:
            self._check_file_existance(files, keyname, fname, 1, file_errors)

        # then check the optional files that are not always present
        additional_files = [
            ('kkrflex_llyfac', KkrimpCalculation._KKRFLEX_LLYFAC),
            ('kkrflex_angles', KkrimpCalculation._KKRFLEX_ANGLE),
            ('out_spinmoms', KkrimpCalculation._OUT_MAGNETICMOMENTS),
            ('out_orbmoms', KkrimpCalculation._OUT_ORBITALMOMENTS),
        ]
        for keyname, fname in additional_files:
            self._check_file_existance(files, keyname, fname, 2, file_errors)

        if debug:
            pprint(files)

        # now parse the file output
        out_dict = {
            'parser_version': self._ParserVersion,
            'calculation_plugin_version': KkrimpCalculation._CALCULATION_PLUGIN_VERSION
        }

        # open all files in context managers
        with ExitStack() as stack:
            # open files only if they exist; opened files are added to the
            # context manager stack so that all of them are closed again when
            # the context manager is exited after parsing
            fhandles = open_files_in_context(stack, out_folder, *files.values())
            named_file_handles = {key: fhandles[ik] for ik, key in enumerate(files.keys())}

            # now we can parse the output files
            success, msg_list, out_dict = KkrimpParserFunctions().parse_kkrimp_outputfile(
                out_dict, named_file_handles, debug=debug
            )

        # add file open errors to parser output of error messages
        for (err_cat, f_err) in file_errors:
            if err_cat == 1:
                msg_list.append(f_err)
            elif check_error_category(err_cat, f_err, out_dict):
                msg_list.append(f_err)
            else:
                if 'parser_warnings' not in list(out_dict.keys()):
                    out_dict['parser_warnings'] = []
                out_dict['parser_warnings'].append(f_err.replace('Error', 'Warning'))
        out_dict['parser_errors'] = msg_list

        # create output node and link
        self.out('output_parameters', Dict(dict=out_dict))

        # cleanup after parsing (only if parsing was successful); this only
        # works below aiida-core v2.0, where repository files are still mutable
        if success:
            if int(aiida_core_version.split('.')[0]) < 2:
                # check if we should do the cleanup or not
                cleanup_outfiles = False
                if 'cleanup_outfiles' in self.node.inputs:
                    cleanup_outfiles = self.node.inputs.cleanup_outfiles.value
                if cleanup_outfiles:
                    # reduce size of timing file
                    self.cleanup_outfiles(files['out_timing'], ['Iteration number', 'time until scf starts'])
                    # reduce size of out_log file
                    self.cleanup_outfiles(files['out_log'], ['Iteration Number'])
                    # delete completely parsed output files and create a
                    # tarball to reduce the size of the retrieved folder
                    self.remove_unnecessary_files()
                    self.final_cleanup()
        else:
            return self.exit_codes.ERROR_PARSING_KKRIMPCALC
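
    # The resulting 'output_parameters' Dict node can be inspected like this
    # (illustrative sketch; keys other than the ones set in `parse` above come
    # from masci_tools' KkrimpParserFunctions):
    #
    #   params = calc.outputs.output_parameters.get_dict()
    #   params['parser_version']       # version string of this parser
    #   params['parser_errors']        # error messages collected during parsing
    #   params.get('parser_warnings')  # set only if warnings were demoted above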

    def _check_file_existance(self, files, keyname, fname, icrit, file_errors):
        """
        Check if a file called `fname` exists and, if so, add it to the `files`
        dict under the given `keyname`. The `icrit` index determines how
        critical a missing file is (1 = critical error, 2 = only a warning).
        """
        if fname in self.retrieved.list_object_names():
            files[keyname] = fname
        else:
            # file not found
            if icrit == 1:
                crit_level = 'Critical file error!'
            elif icrit == 2:
                crit_level = 'Warning!'
            else:
                raise ValueError('icrit should be either 1 or 2')
            file_errors.append((icrit, crit_level + f" File '{fname}' not found."))
            files[keyname] = None
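
    # Illustration of the bookkeeping done by this helper (the file name below
    # is hypothetical): a missing critical file (icrit=1) leaves
    #
    #   files['out_log'] = None
    #   file_errors == [(1, "Critical file error! File 'out_log.000.txt' not found.")]
    #
    # while icrit=2 records a 'Warning!' entry that `parse` may later demote
    # to a parser warning via `check_error_category`.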

    def cleanup_outfiles(self, fileidentifier, keyslist):
        """Open the file and remove unneeded output (keep only the last iteration)."""
        if fileidentifier is not None:
            lineids = []
            with self.retrieved.open(fileidentifier) as tfile:
                txt = tfile.readlines()
                for iline in range(len(txt)):
                    for key in keyslist:  # go through all keys
                        if key in txt[iline]:  # add line id to list if key has been found
                            lineids.append(iline)
            # rewrite file deleting the middle part
            if len(lineids) > 1:  # cut only if more than one iteration was found
                txt = txt[:lineids[0]] + \
                    ['# ... [removed output except for last iteration] ...\n'] + \
                    txt[lineids[-1]:]
                with self.retrieved.open(fileidentifier, 'w') as tfilenew:
                    tfilenew.writelines(txt)
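
    # Sketch of the slicing above on toy data: with
    #
    #   txt = ['head\n', 'Iteration Number 1\n', 'noise\n', 'Iteration Number 2\n', 'tail\n']
    #   lineids = [1, 3]
    #
    # the rewritten file becomes
    #
    #   ['head\n', '# ... [removed output except for last iteration] ...\n',
    #    'Iteration Number 2\n', 'tail\n']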

    def remove_unnecessary_files(self):
        """
        Remove files that are not needed anymore after parsing.
        Their information is completely parsed (i.e. contained in the output
        dict of the calculation) and keeping the files would just be a
        duplication.
        """
        # delete files whose content is completely contained in the parsed output
        files_to_delete = [
            KkrimpCalculation._OUT_ENERGYSP_PER_ATOM,
            KkrimpCalculation._OUT_ENERGYTOT_PER_ATOM,
            KkrimpCalculation._SHAPEFUN,
        ]
        for fileid in files_to_delete:
            if fileid in self.retrieved.list_object_names():
                self.retrieved.delete_object(fileid, force=True)
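
    # `delete_object(..., force=True)` removes files from the repository of an
    # already stored node; this kind of mutation is only possible in
    # aiida-core < 2.0, hence the version guard in `parse` above.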

    def final_cleanup(self):
        """Create a tarball of the remaining retrieved files."""
        # short name for retrieved folder
        ret = self.retrieved

        # create the output tarball only if it has not been packed already
        if KkrimpCalculation._FILENAME_TAR not in ret.list_object_names():
            # first create a dummy file which is used to extract the full path
            # that is then given to tarfile.open
            with ret.open(KkrimpCalculation._FILENAME_TAR, 'w') as f:
                filepath_tar = f.name

            # now create the tarfile and loop over the content of the retrieved directory
            to_delete = []
            with tarfile.open(filepath_tar, 'w:gz') as tf:
                for f in ret.list_object_names():
                    with ret.open(f) as ftest:
                        filesize = os.stat(ftest.name).st_size
                        ffull = ftest.name
                    if (
                        f != KkrimpCalculation._FILENAME_TAR  # ignore the tar file itself
                        and filesize > 0  # ignore empty files
                        and f[0] != '.'  # ignore hidden files like '.nfs...'
                    ):
                        tf.add(ffull, arcname=os.path.basename(ffull))
                        to_delete.append(f)

            # finally delete the files that have been added to the tarfile
            for f in to_delete:
                ret.delete_object(f, force=True)
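
    # The packed files can be recovered later with the same `f.name` trick used
    # above (illustrative sketch; the extraction path is hypothetical):
    #
    #   with calc.outputs.retrieved.open(KkrimpCalculation._FILENAME_TAR) as f:
    #       with tarfile.open(f.name) as tf:
    #           tf.extractall(path='/tmp/kkrimp_extracted')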