# -*- coding: utf-8 -*-
"""
Utility methods widely used in the various TNT utilities.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import contextlib
import glob
import io
import logging
import os
import shutil
from bronx.fancies import loggers
from bronx.fancies.colors import termcolors
from .namadapter import BronxNamelistAdapter, NO_SORTING, FIRST_ORDER_SORTING, SECOND_ORDER_SORTING
from .config import TntRecipe
tntlog = loggers.getLogger('tntlog')
tntstacklog = loggers.getLogger('tntstacklog')
[docs]@contextlib.contextmanager
def set_verbose(verbose, filename):
"""Set or reset the verbosity level."""
new_lformat = logging.Formatter(
fmt=('# [%(name)s@{:s}][%(funcName)s:%(lineno)04d][%(levelname)s]: %(message)s'
.format(os.path.basename(filename)))
)
old_lformat = loggers.default_console.formatter
old_tntstack_level = tntstacklog.level
old_tnt_level = tntlog.level
loggers.default_console.setFormatter(new_lformat)
tntstacklog.setLevel('INFO')
tntlog.setLevel('INFO' if verbose else 'WARNING')
try:
yield
finally:
tntstacklog.setLevel(old_tntstack_level)
tntlog.setLevel(old_tnt_level)
loggers.default_console.setFormatter(old_lformat)
[docs]def process_namelist(filename, directives,
# options
sorting=NO_SORTING,
blocks_ref=None,
in_place=False,
outfilename=None,
doctor=False,
keep_index=False,
squeeze=False):
"""
For the syntax of keys & blocks arguments, please refer to the according
functions.
The order of processing is that of the arguments. As movings are done first,
check consistency.
If **macros** is not None, it can contain the macros a.k.a. values to be
replaced, e.g.: {'NPROC':8, 'substrA':None} will replace all NPROC values
by 8 and will let substrA untouched.
Other options:
:param in_place: if True, the namelist is written back in the same file;
else (default), the target namelist is suffixed with '.tnt'
if not given as **outfilename**.
:param outfilename: target file for out namelist
:param sorting: Sorting option (from bronx.datagrip.namelist):
NO_SORTING;
FIRST_ORDER_SORTING => sort all keys within blocks;
SECOND_ORDER_SORTING => sort only within indexes or
attributes of the same key, within blocks.
:param blocks_ref: if not None, defines the path for a reference namelist to
which the set of blocks is asserted to be equal.
:param doctor: if True, try to convert value to DOCTOR norm according type
for moved keys
:param keep_index: if True, moved keys in identical block keep the original
index of key in block (except a sorting is requested
later on.
:param squeeze: squeeze the namelist: remove empty blocks.
"""
if not isinstance(directives, (list, tuple)):
directives = [directives, ]
# The initial namelist
initial_nam = BronxNamelistAdapter(filename, macros=directives[0].macros)
# Target namelist file
if not in_place:
target_namfile = outfilename or filename + '.tnt'
else:
target_namfile = filename
if outfilename is not None:
raise ValueError("Incompatibility between arguments *outfilename* and *in_place*.")
for d in directives:
# process (in the appropriate order !)
if d.new_blocks is not None:
initial_nam.add_blocks(d.new_blocks)
if d.blocks_to_move is not None:
initial_nam.move_blocks(d.blocks_to_move)
if d.keys_to_move is not None:
initial_nam.move_keys(d.keys_to_move,
doctor=doctor, keep_index=keep_index)
if d.keys_to_remove is not None:
initial_nam.remove_keys(d.keys_to_remove)
if d.keys_to_set is not None:
initial_nam.add_keys(d.keys_to_set)
if d.blocks_to_remove is not None:
initial_nam.remove_blocks(d.blocks_to_remove)
if d.namdelta is not None:
try:
ndelta = BronxNamelistAdapter(d.namdelta, macros=d.macros)
except ValueError:
tntlog.error("Error while parsing the following namelist's delta:\n%s",
d.namdelta)
raise
initial_nam.merge(ndelta)
if squeeze:
initial_nam.squeeze()
if blocks_ref is not None:
cb = initial_nam.check_blocks(blocks_ref, directives[0].macros)
if len(cb) != 0:
tntlog.warning('Set of blocks is different from reference: ' + blocks_ref)
tntlog.warning('diff: ' + str(cb))
with io.open(target_namfile, 'w', encoding='ascii') as fh_namout:
fh_namout.write(initial_nam.dumps(sorting=sorting))
[docs]def process_tnt_stack(directive, sorting=SECOND_ORDER_SORTING):
"""Apply *directive* to the current working directory.
:param TntStackDirective directive: The tntstack directive to apply
:param sorting: Sorting option (from bronx.datagrip.namelist):
NO_SORTING;
FIRST_ORDER_SORTING => sort all keys within blocks;
SECOND_ORDER_SORTING => sort only within indexes or
attributes of the same key, within blocks.
"""
# Record the list of file contained in the directory
initial_files = set()
initial_subdirectories = set()
for root, directories, files in os.walk('.'):
for f in files:
initial_files.add(os.path.normpath(os.path.join(root, f)))
for d in directories:
initial_subdirectories.add(os.path.normpath(os.path.join(root, d)))
# Process the todolist
for todo in directive.todolist:
action = todo['action']
if action == 'tnt':
for nam in todo['namelist']:
for realnam in glob.glob(nam):
tntstacklog.info("Namelist '%s': applying the following directives: %s",
realnam, ",".join(todo['directive']))
process_namelist(realnam,
[directive.directives[name] for name in todo['directive']],
in_place=True, sorting=sorting)
initial_files.discard(os.path.normpath(realnam))
elif action == 'create':
if 'external' in todo:
tntstacklog.info("Creating namelist '%s' from external file '%s'", todo['target'], todo['external'])
shutil.copy(todo['external'], todo['target'])
elif 'copy' in todo:
tntstacklog.info("Creating namelist '%s' from file '%s'", todo['target'], todo['copy'])
shutil.copy(todo['copy'], todo['target'])
else:
tntstacklog.info("Creating namelist '%s' from namelist '%s' by applying the following directives: %s",
todo['target'], todo['namelist'], ",".join(todo['directive']))
process_namelist(todo['namelist'],
[directive.directives[name] for name in todo['directive']],
outfilename=todo['target'], sorting=sorting)
initial_files.discard(os.path.normpath(todo['target']))
elif action in ('delete', 'touch'):
for nam in todo['namelist']:
for realnam in glob.glob(nam):
if action == 'delete':
tntstacklog.info("Deleting namelist '%s'", realnam)
os.unlink(realnam)
elif action == 'touch':
tntstacklog.info("Marking file '%s' as touched'", realnam)
initial_files.discard(os.path.normpath(realnam))
elif action == 'link':
tntstacklog.info("Linking '%s' -> '%s'", todo['target'], todo['namelist'])
os.symlink(todo['namelist'], todo['target'])
initial_files.discard(os.path.normpath(todo['namelist']))
elif action == 'move':
tntstacklog.info("Moving '%s' to '%s'", todo['namelist'], todo['target'])
shutil.move(todo['namelist'], todo['target'])
initial_files.discard(os.path.normpath(todo['namelist']))
initial_files.discard(os.path.normpath(todo['target']))
elif action == 'clean_untouched':
for f in initial_files:
tntstacklog.info("Deleting file '%s'", f)
os.unlink(f)
for d in [d for d in initial_subdirectories if not os.listdir(d)]:
# Remove empty directories
tntstacklog.info("Deleting empty directory '%s'", d)
os.rmdir(d)
[docs]def namelist_read_and_sort(namfile):
"""Read a namelist and return it as a sorted string."""
try:
namp = BronxNamelistAdapter(namfile)
except (ValueError, IOError):
tntlog.error("Something went wrong will reading: %s", namfile)
raise
if not len(namp):
raise ValueError('Nothing to read in "{:s}": Is it a namelist ?'.format(namfile))
return namp.dumps(sorting=FIRST_ORDER_SORTING)
def _check_diffline(line, expected):
return line and len(line) >= 2 and line[0] == expected and line[1] == ' '
def _color_diffline(prevline, line=''):
"""Colorise an individual line of a difflib output."""
if _check_diffline(prevline, '-') and _check_diffline(line, '?'):
return termcolors.error(prevline)
elif _check_diffline(prevline, '+') and _check_diffline(line, '?'):
return termcolors.error(prevline)
elif _check_diffline(prevline, '-'):
return termcolors.critical(prevline)
elif _check_diffline(prevline, '+'):
return termcolors.okgreen(prevline)
elif _check_diffline(prevline, '?'):
return termcolors.error(prevline)
else:
return prevline
[docs]def colorise_diff(lines):
"""Colorise the lines of any kind of difflib outputs."""
newdiff = list()
prevline = None
for l in lines:
if prevline:
newdiff.append(_color_diffline(prevline, l))
prevline = l
newdiff.append(_color_diffline(prevline))
return newdiff
[docs]def compose_namelist(recipe_filename,
sourcenam_directory=None,
suffix='.nam',
sorting=NO_SORTING,
squeeze=False,
fhoutput=None):
"""
Compose a namelist from a **recipe_filename**. For the syntax of recipe,
see template recipe.
:param recipe_filename: The name of the recipe/directive YAML file
:param sourcenam_directory: path to directory in which to look for source
namelists
:param suffix: suffix to add to generated namelist:
output name is ./recipe_basename[suffix]
:param sorting: Sorting option (from bronx.datagrip.namelist):
NO_SORTING;
FIRST_ORDER_SORTING => sort all keys within blocks;
SECOND_ORDER_SORTING => sort only within indexes or
attributes of the same key, within blocks.
:param squeeze: squeeze the namelist: remove empty blocks.
:param fhoutput: a file object where the result is written (if None, a
new file named `basename(recipe_filename)` is created).
"""
# read
recipe = TntRecipe(recipe_filename, sourcenam_directory=sourcenam_directory)
# merge
nam = recipe.ingredients[0]
for ingredient in recipe.ingredients[1:]:
nam.merge(ingredient)
if squeeze:
nam.squeeze()
# write
if fhoutput is None:
namelistname = os.path.basename(recipe_filename.replace('.yaml', suffix))
with io.open(namelistname, 'w', encoding='ascii') as fh_namout:
fh_namout.write(nam.dumps(sorting=sorting))
else:
fhoutput.write(nam.dumps(sorting=sorting))