Source code for thenamelisttool.util

"""
Utility methods widely used in the various TNT utilities.
"""

import contextlib
import glob
import logging
import os
import shutil

from bronx.fancies import loggers
from bronx.fancies.colors import termcolors
from .namadapter import BronxNamelistAdapter, NO_SORTING, FIRST_ORDER_SORTING, SECOND_ORDER_SORTING
from .config import TntRecipe

tntlog = loggers.getLogger('tntlog')
tntstacklog = loggers.getLogger('tntstacklog')


[docs]@contextlib.contextmanager def set_verbose(verbose, filename): """Set or reset the verbosity level.""" new_lformat = logging.Formatter( fmt=('# [%(name)s@{:s}][%(funcName)s:%(lineno)04d][%(levelname)s]: %(message)s' .format(os.path.basename(filename))) ) old_lformat = loggers.default_console.formatter old_tntstack_level = tntstacklog.level old_tnt_level = tntlog.level loggers.default_console.setFormatter(new_lformat) tntstacklog.setLevel('INFO') tntlog.setLevel('INFO' if verbose else 'WARNING') try: yield finally: tntstacklog.setLevel(old_tntstack_level) tntlog.setLevel(old_tnt_level) loggers.default_console.setFormatter(old_lformat)
[docs]def process_namelist(filename, directives, # options sorting=NO_SORTING, blocks_ref=None, in_place=False, outfilename=None, doctor=False, keep_index=False, squeeze=False): """ For the syntax of keys & blocks arguments, please refer to the according functions. The order of processing is that of the arguments. As movings are done first, check consistency. If **macros** is not None, it can contain the macros a.k.a. values to be replaced, e.g.: {'NPROC':8, 'substrA':None} will replace all NPROC values by 8 and will let substrA untouched. Other options: :param in_place: if True, the namelist is written back in the same file; else (default), the target namelist is suffixed with '.tnt' if not given as **outfilename**. :param outfilename: target file for out namelist :param sorting: Sorting option (from bronx.datagrip.namelist): NO_SORTING; FIRST_ORDER_SORTING => sort all keys within blocks; SECOND_ORDER_SORTING => sort only within indexes or attributes of the same key, within blocks. :param blocks_ref: if not None, defines the path for a reference namelist to which the set of blocks is asserted to be equal. :param doctor: if True, try to convert value to DOCTOR norm according type for moved keys :param keep_index: if True, moved keys in identical block keep the original index of key in block (except a sorting is requested later on. :param squeeze: squeeze the namelist: remove empty blocks. """ if not isinstance(directives, (list, tuple)): directives = [directives, ] # The initial namelist initial_nam = BronxNamelistAdapter(filename, macros=directives[0].macros) # Target namelist file if not in_place: target_namfile = outfilename or filename + '.tnt' else: target_namfile = filename if outfilename is not None: raise ValueError("Incompatibility between arguments *outfilename* and *in_place*.") for d in directives: # process (in the appropriate order !) if d.new_blocks is not None: initial_nam.add_blocks(d.new_blocks) if d.blocks_to_move is not None: initial_nam.move_blocks(d.blocks_to_move) if d.keys_to_move is not None: initial_nam.move_keys(d.keys_to_move, doctor=doctor, keep_index=keep_index) if d.keys_to_remove is not None: initial_nam.remove_keys(d.keys_to_remove) if d.keys_to_set is not None: initial_nam.add_keys(d.keys_to_set) if d.blocks_to_remove is not None: initial_nam.remove_blocks(d.blocks_to_remove) if d.namdelta is not None: try: ndelta = BronxNamelistAdapter(d.namdelta, macros=d.macros) except ValueError: tntlog.error("Error while parsing the following namelist's delta:\n%s", d.namdelta) raise initial_nam.merge(ndelta) if squeeze: initial_nam.squeeze() if blocks_ref is not None: cb = initial_nam.check_blocks(blocks_ref, directives[0].macros) if len(cb) != 0: tntlog.warning('Set of blocks is different from reference: ' + blocks_ref) tntlog.warning('diff: ' + str(cb)) with open(target_namfile, 'w', encoding='ascii') as fh_namout: fh_namout.write(initial_nam.dumps(sorting=sorting))
[docs]def process_tnt_stack(directive, sorting=SECOND_ORDER_SORTING): """Apply *directive* to the current working directory. :param TntStackDirective directive: The tntstack directive to apply :param sorting: Sorting option (from bronx.datagrip.namelist): NO_SORTING; FIRST_ORDER_SORTING => sort all keys within blocks; SECOND_ORDER_SORTING => sort only within indexes or attributes of the same key, within blocks. """ # Record the list of file contained in the directory initial_files = set() initial_subdirectories = set() for root, directories, files in os.walk('.'): for f in files: initial_files.add(os.path.normpath(os.path.join(root, f))) for d in directories: initial_subdirectories.add(os.path.normpath(os.path.join(root, d))) # Process the todolist for todo in directive.todolist: action = todo['action'] if action == 'tnt': for nam in todo['namelist']: for realnam in glob.glob(nam): tntstacklog.info("Namelist '%s': applying the following directives: %s", realnam, ",".join(todo['directive'])) process_namelist(realnam, [directive.directives[name] for name in todo['directive']], in_place=True, sorting=sorting) initial_files.discard(os.path.normpath(realnam)) elif action == 'create': if 'external' in todo: tntstacklog.info("Creating namelist '%s' from external file '%s'", todo['target'], todo['external']) shutil.copy(todo['external'], todo['target']) elif 'copy' in todo: tntstacklog.info("Creating namelist '%s' from file '%s'", todo['target'], todo['copy']) shutil.copy(todo['copy'], todo['target']) else: tntstacklog.info("Creating namelist '%s' from namelist '%s' by applying the following directives: %s", todo['target'], todo['namelist'], ",".join(todo['directive'])) process_namelist(todo['namelist'], [directive.directives[name] for name in todo['directive']], outfilename=todo['target'], sorting=sorting) initial_files.discard(os.path.normpath(todo['target'])) elif action in ('delete', 'touch'): for nam in todo['namelist']: for realnam in glob.glob(nam): if action == 'delete': tntstacklog.info("Deleting namelist '%s'", realnam) os.unlink(realnam) elif action == 'touch': tntstacklog.info("Marking file '%s' as touched'", realnam) initial_files.discard(os.path.normpath(realnam)) elif action == 'link': tntstacklog.info("Linking '%s' -> '%s'", todo['target'], todo['namelist']) os.symlink(todo['namelist'], todo['target']) initial_files.discard(os.path.normpath(todo['namelist'])) elif action == 'move': tntstacklog.info("Moving '%s' to '%s'", todo['namelist'], todo['target']) shutil.move(todo['namelist'], todo['target']) initial_files.discard(os.path.normpath(todo['namelist'])) initial_files.discard(os.path.normpath(todo['target'])) elif action == 'clean_untouched': for f in initial_files: tntstacklog.info("Deleting file '%s'", f) os.unlink(f) for d in [d for d in initial_subdirectories if not os.listdir(d)]: # Remove empty directories tntstacklog.info("Deleting empty directory '%s'", d) os.rmdir(d)
[docs]def namelist_read_and_sort(namfile): """Read a namelist and return it as a sorted string.""" try: namp = BronxNamelistAdapter(namfile) except (ValueError, OSError): tntlog.error("Something went wrong will reading: %s", namfile) raise if not len(namp): raise ValueError('Nothing to read in "{:s}": Is it a namelist ?'.format(namfile)) return namp.dumps(sorting=FIRST_ORDER_SORTING)
def _check_diffline(line, expected): return line and len(line) >= 2 and line[0] == expected and line[1] == ' ' def _color_diffline(prevline, line=''): """Colorise an individual line of a difflib output.""" if _check_diffline(prevline, '-') and _check_diffline(line, '?'): return termcolors.error(prevline) elif _check_diffline(prevline, '+') and _check_diffline(line, '?'): return termcolors.error(prevline) elif _check_diffline(prevline, '-'): return termcolors.critical(prevline) elif _check_diffline(prevline, '+'): return termcolors.okgreen(prevline) elif _check_diffline(prevline, '?'): return termcolors.error(prevline) else: return prevline
[docs]def colorise_diff(lines): """Colorise the lines of any kind of difflib outputs.""" newdiff = list() prevline = None for line in lines: if prevline: newdiff.append(_color_diffline(prevline, line)) prevline = line newdiff.append(_color_diffline(prevline)) return newdiff
[docs]def compose_namelist(recipe_filename, sourcenam_directory=None, suffix='.nam', sorting=NO_SORTING, squeeze=False, fhoutput=None): """ Compose a namelist from a **recipe_filename**. For the syntax of recipe, see template recipe. :param recipe_filename: The name of the recipe/directive YAML file :param sourcenam_directory: path to directory in which to look for source namelists :param suffix: suffix to add to generated namelist: output name is ./recipe_basename[suffix] :param sorting: Sorting option (from bronx.datagrip.namelist): NO_SORTING; FIRST_ORDER_SORTING => sort all keys within blocks; SECOND_ORDER_SORTING => sort only within indexes or attributes of the same key, within blocks. :param squeeze: squeeze the namelist: remove empty blocks. :param fhoutput: a file object where the result is written (if None, a new file named `basename(recipe_filename)` is created). """ # read recipe = TntRecipe(recipe_filename, sourcenam_directory=sourcenam_directory) # merge nam = recipe.ingredients[0] for ingredient in recipe.ingredients[1:]: nam.merge(ingredient) if squeeze: nam.squeeze() # write if fhoutput is None: namelistname = os.path.basename(recipe_filename.replace('.yaml', suffix)) with open(namelistname, 'w', encoding='ascii') as fh_namout: fh_namout.write(nam.dumps(sorting=sorting)) else: fhoutput.write(nam.dumps(sorting=sorting))