# -*- coding: utf-8 -*-
"""
A bunch of classes and functions that deal with TNT configuration files and
directives.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import six
import collections
import io
import os
import pprint
import re
import string
import sys
from bronx.compat.moves import collections_abc
from bronx.fancies import loggers
from bronx.syntax.decorators import secure_getattr
from .namadapter import BronxNamelistAdapter
tntlog = loggers.getLogger('tntlog')
# Detect TNT's configuration files directory
TPL_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'templates')
# EXCEPTIONS
#
[docs]class TntDirectiveError(Exception):
"""Any exception raised by a TNT configuration class should derive form this one."""
pass
[docs]class TntDirectiveUnkownError(TntDirectiveError):
"""The TNT directive is unknown."""
def __init__(self, name):
errmsg = 'The "{:s}" directive is not allowed with TNT.'.format(name)
super(TntDirectiveUnkownError, self).__init__(errmsg)
[docs]class TntDirectiveValueError(TntDirectiveError):
"""An inappropriate value was given as a TNT directive."""
def __init__(self, name, value):
errmsg = '"{!s}" is not an appropriate value for the "{:s}" directive.'.format(value, name)
super(TntDirectiveValueError, self).__init__(errmsg)
[docs]class TntStackDirectiveError(TntDirectiveError):
"""Syntax error in the TNT stack directive file."""
pass
# TNT directives part
#
[docs]class TntDirective(object):
"""Class that holds all the necessary informations about TNT directives.
It is in charge of checking the correctness of all the provided attributes.
"""
_ALLOWED_DIRECTIVES = set(('keys_to_remove', 'keys_to_set', 'keys_to_move',
'blocks_to_move', 'blocks_to_remove', 'new_blocks',
'macros', 'namdelta'))
def _check_keytuple(self, val, theexc, unique=False):
"""Verify if *val* is some kind of a (block, key) tuple description."""
keylist = list()
# Reformat dictionaries, iterables, ...
if isinstance(val, collections_abc.Mapping):
for b, v in val.items():
if isinstance(v, collections_abc.Iterable) and not isinstance(v, six.string_types):
keylist.extend([(b, kk) for kk in v])
else:
keylist.append((b, v), )
elif isinstance(val, collections_abc.Iterable):
if all([isinstance(v, collections_abc.Iterable) and not isinstance(v, six.string_types)
for v in val]):
keylist.extend(val)
else:
keylist.append(tuple(val))
else:
raise theexc
# Final check...
if not all([len(k) == 2 and
isinstance(k[0], six.string_types) and
isinstance(k[1], six.string_types)
for k in keylist]):
raise theexc
# Unique element wanted ?
if unique:
if len(keylist) > 1:
raise theexc
return keylist[0]
else:
return set(keylist)
def _process_keys_to_remove(self, val):
myexc = TntDirectiveValueError('keys_to_remove', val)
return self._check_keytuple(val, myexc)
def _process_keys_to_set(self, val, theexc=None):
kdict = dict()
myexc = theexc or TntDirectiveValueError('keys_to_set', val)
if isinstance(val, collections_abc.Mapping):
for k, v in val.items():
if isinstance(v, collections_abc.Mapping):
for kk, vv in v.items():
kdict[self._check_keytuple((k, kk), myexc, unique=True)] = vv
elif isinstance(k, collections_abc.Iterable):
kdict[self._check_keytuple(k, myexc, unique=True)] = v
else:
raise myexc
else:
raise myexc
return kdict
def _process_keys_to_move(self, val):
kdict = dict()
myexc = TntDirectiveValueError('keys_to_move', val)
keystructure = self._process_keys_to_set(val, theexc=myexc)
for k, v in keystructure.items():
kdict[k] = self._check_keytuple(v, myexc, unique=True)
return kdict
def _process_blocks_to_move(self, val):
if not (isinstance(val, collections_abc.Mapping) and
all([isinstance(k, six.string_types) and isinstance(v, six.string_types)
for k, v in val.items()])):
raise TntDirectiveValueError('blocks_to_move', val)
return {k: v for k, v in val.items()}
def _process_set_of_blocks(self, val, realname):
if (isinstance(val, collections_abc.Iterable) and
not isinstance(val, six.string_types) and
all([isinstance(v, six.string_types) for v in val])):
return set(val)
elif isinstance(val, six.string_types):
return set([val, ])
else:
raise TntDirectiveValueError(realname, val)
def _process_blocks_to_remove(self, val):
return self._process_set_of_blocks(val, 'blocks_to_remove')
def _process_new_blocks(self, val):
return self._process_set_of_blocks(val, 'new_blocks')
def _process_macros(self, val):
if not (isinstance(val, collections_abc.Mapping) and
all([isinstance(k, six.string_types) for k in val.keys()])):
raise TntDirectiveValueError('macros', val)
return {k: v for k, v in val.items()}
def _process_namdelta(self, val):
if not (isinstance(val, six.string_types)):
raise TntDirectiveValueError('namdelta', val)
return val
def __init__(self, **kwargs):
self._internals = dict()
# Is the directive allowed ?
for k, v in kwargs.items():
if k not in self._ALLOWED_DIRECTIVES:
raise TntDirectiveUnkownError(k)
self._internals[k] = getattr(self, '_process_{:s}'.format(k))(v)
@secure_getattr
def __getattr__(self, item):
if item not in self._ALLOWED_DIRECTIVES:
raise TntDirectiveUnkownError(item)
else:
return self._internals.get(item, None)
[docs]def read_directives(filename):
"""
Read TNT directives in an external file (**filename**).
For a template of directives, call function *write_directives_template()*.
"""
if os.path.splitext(filename)[1] in ('.yaml', '.yml'):
import yaml
with io.open(filename, 'r') as yamlfh:
return TntDirective(**yaml.load(yamlfh, Loader=yaml.SafeLoader))
else:
prev_bytecode_flag = sys.dont_write_bytecode
try:
sys.dont_write_bytecode = True
if sys.version_info.major == 3 and sys.version_info.minor >= 4:
from importlib.machinery import SourceFileLoader # @UnresolvedImport
import importlib.util as imputil # @UnresolvedImport
spec = imputil.spec_from_loader('thenamelisttool.tnt_raw_configuration_module',
SourceFileLoader('thenamelisttool.tnt_raw_configuration_module',
os.path.abspath(filename)))
m = imputil.module_from_spec(spec)
spec.loader.exec_module(m)
else:
import imp # @UnresolvedImport
m = imp.load_source(filename, os.path.abspath(filename))
finally:
sys.dont_write_bytecode = prev_bytecode_flag
return TntDirective(**{k: v for k, v in m.__dict__.items() if not k.startswith('_')})
# TNTstack directives part
#
[docs]class TntStackDirective(object):
"""Class that holds all the necessary informations about TNTstack directives.
:param str basedir: The name of the directory where the TNTstack reqests lies
:param list[dict] todolist: The list of actions to be performed
:param dict[dict] directives: A list of TNT directives
"""
def __init__(self, basedir, todolist, directives=None):
self._basedir = basedir
self._directives = dict()
self._todolist = list()
self._directives_init(directives or dict())
self._todolist_init(todolist)
def _directives_init(self, directives):
"""
Read the *directives* attribute and create :class:`TntDirective` objects
from that.
"""
if not (isinstance(directives, collections_abc.Mapping) and
all([isinstance(v, collections_abc.Mapping) for v in directives.values()])):
raise TntStackDirectiveError('The directives argument must be a mapping of mappings')
for k, v in directives.items():
if 'external' in v:
newdir = read_directives(os.path.join(self._basedir, v['external']))
else:
newdir = TntDirective(**v)
self._directives[k] = newdir
def _checkdict(self, action, values, attr, str_or_list=False):
"""Check that the *values* dictionary has a valid *attr* item.
:param str action: The name of the current action (used to print meaningful
error messages).
:param bool str_or_list: It *True*, the *attr* item can be either a string
or a list of strings. (otherwise, it has to be a string).
"""
stuff = values.get(attr, None)
if stuff is None:
tntlog.error("Error while processing todo list item:\n%s", values)
raise TntStackDirectiveError('The "{:s}" entry is mandatory with the "{:s}" action'
.format(attr, action))
if isinstance(stuff, six.string_types) or not isinstance(stuff, collections_abc.Iterable):
stuff = [stuff, ]
else:
stuff = [v for v in stuff]
if str_or_list:
return stuff
else:
if len(stuff) != 1:
tntlog.error("Error while processing todo list item:\n%s", values)
raise TntStackDirectiveError('The "{:s}" entry requires a unique element ("{:s}" action).'
.format(attr, action))
return stuff[0]
def _todolist_init(self, todolist):
"""Read the *todolist* attribute and check that everything is ok."""
if not (isinstance(todolist, collections_abc.Iterable) and
all([isinstance(v, collections_abc.Mapping) for v in todolist])):
raise TntStackDirectiveError('The todolist argument must be an iterable of mappings')
for todo in todolist:
action = todo.get('action', None)
action_d = dict(action=action)
if action == 'tnt':
action_d['namelist'] = self._checkdict(action, todo, 'namelist', str_or_list=True)
action_d['directive'] = self._checkdict(action, todo, 'directive', str_or_list=True)
elif action == 'create':
action_d['target'] = self._checkdict(action, todo, 'target')
if 'external' in todo:
action_d['external'] = os.path.join(self._basedir,
self._checkdict(action, todo, 'external'))
if not os.path.isfile(action_d['external']):
raise TntStackDirectiveError('The "{:s}" does not exists.'
.format(action_d['external']))
elif 'copy' in todo:
action_d['copy'] = self._checkdict(action, todo, 'copy')
else:
action_d['namelist'] = self._checkdict(action, todo, 'namelist')
action_d['directive'] = self._checkdict(action, todo, 'directive', str_or_list=True)
elif action in ('delete', 'touch'):
action_d['namelist'] = self._checkdict(action, todo, 'namelist', str_or_list=True)
elif action in ('link', 'move'):
action_d['target'] = self._checkdict(action, todo, 'target')
action_d['namelist'] = self._checkdict(action, todo, 'namelist')
elif action == 'clean_untouched':
pass
else:
raise TntStackDirectiveError('Unknown action "{!s}" requested in the todolist'.format(action))
# Check the directive entry against existing directives
if 'directive' in action_d:
tocheck = ([action_d['directive'], ]
if isinstance(action_d['directive'], six.string_types)
else action_d['directive'])
for a_dir in tocheck:
if a_dir not in self.directives:
raise TntStackDirectiveError('The "{:s}" directive is not defined.'.format(a_dir))
self._todolist.append(action_d)
@property
def directives(self):
"""The dictionary of TNT directives (as :class:`TntDirective` objects)."""
return self._directives
@property
def todolist(self):
"""The todo's list (as a list of dictionaries)."""
return self._todolist
[docs]class TntRecipeSyntaxError(ValueError):
"""Raised when a syntax error is detected in the recipe file."""
pass
[docs]class TntRecipe(object):
"""
A YAML Recipe reader, that collects namelists and possibly filter them,
in sight of finally merging them.
"""
_ingredient_name_re = re.compile(r'(?P<nam>.+?)(?:/(?P<filter>(?:-|\+)))?$')
_ingredient_item_re = re.compile(r'(?P<block>[^/]+)/(?P<filter>(?:-|\+))$')
def __init__(self, recipe_filename, sourcenam_directory=None):
"""
:param recipe_filename: filepath to the YAML recipe
:param sourcenam_directory: an optional external directory in which to
pick the ingredient namelists
"""
self.sourcenam_directory = sourcenam_directory
self._load_recipe(recipe_filename)
def _throw_syntax_err(self, entry, wholeentry, msg):
"""Deal with a syntax error."""
tntlog.critical("Syntax error in the '%s' entry: %s.", entry, msg)
if wholeentry is not None:
tntlog.critical("The '%s' entry content is:\n%s.", entry,
pprint.pformat(wholeentry, indent=2))
raise TntRecipeSyntaxError('Syntax error in the {:s} entry of the Recipe file.'
.format(entry))
def _read_init_final_elements(self, what, ingredient):
"""Read '__initial__' or '__final__' step **ingredient**."""
if ingredient is not None:
if isinstance(ingredient, six.string_types):
# external namelist
if self.sourcenam_directory:
ingredient = os.path.join(self.sourcenam_directory, ingredient)
nam = BronxNamelistAdapter(ingredient, macros=self.macros)
elif isinstance(ingredient, dict):
# internal dict/yaml namelist
nam = BronxNamelistAdapter(six.StringIO(), macros=self.macros)
nam.add_blocks(list(ingredient.keys()))
keys_to_add = {}
for b, kv in ingredient.items():
if kv is not None:
if isinstance(kv, dict):
for k, v in ingredient[b].items():
keys_to_add[(b, k)] = v
else:
self._throw_syntax_err(what, ingredient,
"'{!s}': should be 'null' or a dictionary"
.format(b))
nam.add_keys(keys_to_add)
else:
self._throw_syntax_err(what, ingredient,
"Should be 'null', a string or a dictionary")
else:
nam = BronxNamelistAdapter(six.StringIO(), macros=self.macros)
return nam
def _process_ingredient(self, input_nam, blocks):
"""Preprocess ingredient: read according namelist and filter it."""
# Check input_nam
input_nam_m = self._ingredient_name_re.match(input_nam)
if not input_nam_m:
self._throw_syntax_err(input_nam, None, "Invalid ingredient name.")
# read namelist
input_nam_filename = input_nam_m.group('nam')
if self.sourcenam_directory:
input_nam_filename = os.path.join(self.sourcenam_directory,
input_nam_filename)
ingredient = BronxNamelistAdapter(input_nam_filename,
macros=self.macros)
# prepare filtering elements
blocks_filter = collections.defaultdict(list)
keys_filter = collections.defaultdict(dict)
# Process the various blocks provided by the user
if isinstance(blocks, (list, tuple, set)):
for b in blocks:
if isinstance(b, six.string_types):
blocks_filter[input_nam_m.group('filter')].append(b)
if isinstance(b, dict) and len(b) == 1:
bname, blist = list(b.items())[0]
bname_m = self._ingredient_item_re.match(bname)
if bname_m:
if not isinstance(blist, (list, tuple, set)):
self._throw_syntax_err(input_nam, blocks,
"'{:s}' is not a list".format(blist))
keys_filter[bname_m.group('filter')][bname_m.group('block')] = set(blist)
else:
self._throw_syntax_err(input_nam, blocks,
"invalid namelist block definition: '{!s}'"
.format(bname))
if input_nam_m.group('filter') == '+': # Include only case
authorized_blocks = set(blocks_filter['+'])
for a_keys_filter in keys_filter.values():
authorized_blocks.update(a_keys_filter.keys())
blocks_filter['-'] = list(set(ingredient) - authorized_blocks)
elif isinstance(blocks, six.string_types) and blocks == '__all__':
pass # Ok, includes everything
else:
self._throw_syntax_err(input_nam, blocks, 'invalid ingredient description')
# then filter
# blocks that are not requested
ingredient.remove_blocks(blocks_filter['-'])
# keys that are excluded
for b, keys in keys_filter['-'].items():
ingredient.remove_keys([(b, k) for k in keys])
# keys that are not requested
for b, keys in keys_filter['+'].items():
to_remove = [(b, k) for k in ingredient[b] if k not in keys]
ingredient.remove_keys(to_remove)
return ingredient
def _load_recipe(self, recipe_filename):
"""Read YAML file and preprocess ingredients."""
from bronx.datagrip.misc import load_ordered_yaml
# load yaml
recipe = load_ordered_yaml(recipe_filename)
if not isinstance(recipe, collections.OrderedDict):
raise TntRecipeSyntaxError('The recipe must be a dictionary.')
# specific cases: initialization, finalization, macros
self.macros = recipe.pop('__macros__', {})
initial = self._read_init_final_elements('__initial__',
recipe.pop('__initial__', None))
final = self._read_init_final_elements('__final__',
recipe.pop('__final__', None))
self.ingredients = []
# convert each ingredient into a namelist to be merged
for input_nam, blocks in recipe.items():
self.ingredients.append(self._process_ingredient(input_nam, blocks))
self.ingredients.insert(0, initial)
self.ingredients.append(final)
# Utility function that deals with template files
#
[docs]def get_template(tplname, encoding=None):
"""Retrieve a template file in the dedicated directory."""
tplfile = os.path.join(TPL_DIRECTORY, tplname)
with io.open(tplfile, 'r', encoding=encoding) as fhtpl:
tpl = string.Template(fhtpl.read())
return tpl
[docs]def write_directives_template(out=sys.stdout, tplname='tnt-directive.tpl.py'):
"""Write out a directives template."""
if isinstance(out, six.string_types):
outfh = io.open(out, 'w')
else:
outfh = out
outtpl = os.path.join(TPL_DIRECTORY, tplname)
try:
with io.open(outtpl, 'r') as tplfh:
for line in tplfh:
outfh.write(line)
finally:
if isinstance(out, six.string_types):
outfh.close()