Source code for thenamelisttool.namadapter

# -*- coding: utf-8 -*-

"""
Adapter classes for namelist's parsers.

Set of adapter classes that uses external namelist's parsers to provide
functionalities that fit the needs of TNT utilities.
"""

from __future__ import absolute_import, division, print_function, unicode_literals

import abc
import re

import six

from bronx.compat.moves import collections_abc
from bronx.fancies import loggers

tntlog = loggers.getLogger('tntlog')

# Macros List from vortex's common.data.namelists
#: TNT's predefined list of macros
KNOWN_NAMELIST_MACROS = set(['NPROC', 'NBPROC', 'NBPROC_IO', 'NCPROC', 'NDPROC',
                             'NBPROCIN', 'NBPROCOUT', 'IDAT', 'CEXP',
                             'TIMESTEP', 'FCSTOP', 'NMODVAL', 'NBE', 'SEED',
                             'MEMBER', 'NUMOD', 'OUTPUTID', 'NRESX', 'PERTURB',
                             'JOUR', 'RES', 'LLADAJ', 'LLADMON', 'LLFLAG',
                             'LLARO', 'LLVRP', 'LLCAN', ])
# Other known namelist's macros (not to be substituted)
KNOWN_NAMELIST_MACROS.update(['substr6', 'substrA', 'substrC', 'XMP_TYPE',
                              'XLOPT_SCALAR', 'XNCOMBFLEN', 'val_sitr', 'val_sipr',
                              '_lbias_', '_lincr_'])

#: Output namelist sorting option: NO_SORTING. Keep the initial ordering.
NO_SORTING = 0
#: Output namelist sorting option: FIRST_ORDER_SORTING. Sort all keys within blocks.
FIRST_ORDER_SORTING = 1
#: Output namelist sorting option: SECOND_ORDER_SORTING. Sort only between indexes.
SECOND_ORDER_SORTING = 2


[docs]@six.add_metaclass(abc.ABCMeta) class AbstractNamelistAdapter(collections_abc.Mapping): """Every Namelist adapter must derive from this abstract class.""" @abc.abstractmethod def __init__(self, namelistsfile, macros=None): # @UnusedVariable """ :param str namelistsfile: The namelist itself or a path to a namelist file. """ self._parser = None @property def parser(self): """The internal namelist's parser.""" return self._parser # Public methods that operates on namelist's blocks
[docs] def add_blocks(self, blocks): """Add a set of new blocks in the present namelist's set. :param list[str] blocks: ['BLOCK1', 'BLOCK2', ...] """ for b in blocks: if b not in self: self._actual_newblock(b) else: tntlog.info('block "%s" is already present.', b)
[docs] def remove_blocks(self, blocks): """Remove a set of blocks from the present namelist's set. :param list[str] blocks: ['BLOCK1', 'BLOCK2', ...] """ for b in blocks: if b in self: self._actual_rmblock(b) else: tntlog.info('block "%s" to be removed but already missing.', b)
[docs] def move_blocks(self, blocks): """Move/Rename a set of blocks within the present namelist's set. :param dict[str] blocks: {'BLOCK_OLD':'BLOCK_NEW', ...} """ for (old_b, new_b) in blocks.items(): if old_b in self: if new_b not in self: self._actual_mvblock(old_b, new_b) else: raise ValueError('block "{:s} already present'.format(new_b)) else: tntlog.warning('block "%s" to be moved but missing from namelist: ignored.', old_b)
[docs] def check_blocks(self, another, macros=None): """ Check that the present namelist's set contains the same set of blocks as **another** does. **another** can be either the filename of a namelist to be read, or any kind of :class:`AbstractNamelistAdapter` instance. If **macros** is not None, it can contain the macros a.k.a. values to be replaced, e.g.: {'NPROC':8, 'substrA':None} will replace all NPROC values by 8 and will let substrA untouched. :return set: The set of blocks that differ. """ if not isinstance(another, AbstractNamelistAdapter): another = self.__class__(another, macros=macros) return set(self.keys()).symmetric_difference(set(another.keys()))
# Public methods that operates on namelist's keys
[docs] def add_keys(self, keys, doctor=False, indexes=None): """Set a set of keys in the present namelist's set. :param dict[tuple] keys: {('BLOCK','KEY'):value, ...} :param bool doctor: if true, try to convert value to DOCTOR norm according type :param dict[tuple] indexes: if present, set the keys at given index in block {('BLOCK','KEY'):index, ...} """ if indexes is None: indexes = {} for ((b, k), v) in keys.items(): idx = indexes.get((b, k), None) if b in self: self._actual_newkey(b, k, self._DOCTOR_convert(k, v) if doctor else v, index=idx) else: raise KeyError('block "{:s}" is missing: cannot set its "{:s}" key' .format(b, k))
[docs] def remove_keys(self, keys): """Remove a set of keys from the present namelist's set. :param list[tuple] keys: [('BLOCK1','KEY1'), ('BLOCK2','KEY2'), ...] """ for (b, k) in self._expand_keys(keys): if b in self: if k in self[b]: self._actual_rmkey(b, k) else: tntlog.info(('key "%s" to be removed but already missing from block "%s".', k, b)) else: tntlog.info('block "%s" missing: cannot remove its "%s" key.', b, k)
[docs] def move_keys(self, keys, doctor=False, keep_index=False): """ Move a set of keys within the present namelist's set. :param dict[tuple] keys: {('BLOCK_OLD','KEY_OLD'):('BLOCK_NEW','KEY_NEW'), ...} :param bool doctor: if True, try to convert value to DOCTOR norm according type :param bool keep_index: if True, moved keys in identical block keep the original index of key in block (except a sorting is requested later on. """ origin_keys = self._expand_keys(keys.keys(), radics=True) expanded_keys = {} for (ob, o_r, ok) in origin_keys: (nb, n_r) = keys[(ob, o_r)] expanded_keys[(ob, ok)] = (nb, ok.replace(o_r, n_r, 1)) for (ob, ok), (nb, nk) in expanded_keys.items(): if ob in self: if ok in self[ob]: if keep_index and ob == nb: idx = list(self[ob].keys()).index(ok) else: idx = None v = self[ob][ok] self.remove_keys([(ob, ok)]) if nb in self: if nk not in self[nb]: self.add_keys({(nb, nk): v}, doctor=doctor, indexes={(nb, nk): idx}) else: raise ValueError(('key "{:s}" in block "{:s}" ' + 'already exists: prevent moving from ' + 'block "{:s}" key "{:s}".') .format(nk, nb, ob, ok)) else: raise KeyError(('block "{:s}" missing: cannot move key "{:s}"' + 'from block "{:s}" to it as key "{:s}.') .format(nb, ok, ob, nk)) else: tntlog.warning('key "%s" missing from block "%s": cannot move it.', ok, ob) else: tntlog.warning('block "%s" missing: cannot move its key "%s".', ob, ok)
[docs] def squeeze(self): """Squeeze the namelist: remove empty blocks.""" self._actual_squeeze()
# Generic utility methods @staticmethod def _all_macros(arg_macros): macros = {k: None for k in KNOWN_NAMELIST_MACROS} if arg_macros is not None: macros.update(arg_macros) return macros def _expand_keys(self, keys, radics=False): """ Find all entries corresponding to the given keys, due to attributes and/or indexes. :param list[tuple] keys: [('BLOCK1','KEY1'), ('BLOCK2','KEY2'), ...] :param bool radics: add the radical in the tuples :return list[tuple]: The expanded list of namelist keys """ expanded_keys = [] for (b, k) in keys: if b in self: ek = [(b, nk) for nk in self[b].keys() if re.match(k.replace('(', r'\(').replace(')', r'\)') + r'(\(.+\)|%.+)*$', nk)] if radics: ek = [(b, k, nk) for (b, nk) in ek] expanded_keys.extend(ek) return set(expanded_keys) @staticmethod def _DOCTOR_convert(key, value, fatal=False): """ According to the DOCTOR norm, try to convert value to the adequate type. Cf. http://www.umr-cnrm.fr/gmapdoc/IMG/pdf/coding-rules.pdf """ t = key.split('%')[-1][0] try: if t in ('I', 'J', 'K', 'M', 'N'): try: value = int(value) except ValueError: raise ValueError('unable to convert variable {} from {} to int'. format(key, str(value))) elif t in ('L',): try: value = bool(value) except ValueError: raise ValueError('unable to convert variable {} from {} to bool'. format(key, str(value))) elif t in ('C',): try: value = str(value) except ValueError: raise ValueError('unable to convert variable {} from {} to str'. format(key, str(value))) else: try: value = float(value) except ValueError: raise ValueError('unable to convert variable {} from {} to float'. format(key, str(value))) except ValueError: if fatal: raise return value # Abstract methods that should be made available by concrete classes @abc.abstractmethod def __contains__(self, item): """Check if a block exists in the present namelist's set.""" pass @abc.abstractmethod def __getitem__(self, item): """Retrieve the content of a namelist block. :return: The namelist block content :rtype: any subclass of collections_abc.Mapping """ pass @abc.abstractmethod def __iter__(self): """Iterate through namelist blocks.""" pass @abc.abstractmethod def __len__(self): """The number of namelist blocks.""" pass @abc.abstractmethod def _actual_newblock(self, item): """Add a new block into the present namelist's set.""" pass @abc.abstractmethod def _actual_rmblock(self, item): """Remove a block from the present namelist's set.""" pass @abc.abstractmethod def _actual_mvblock(self, item, targetitem): """Move/Rename a block within the present namelist's set.""" pass @abc.abstractmethod def _actual_newkey(self, block, key, value, index=None): """Set a new key in the present namelist's set.""" pass @abc.abstractmethod def _actual_rmkey(self, block, key): """Remove a namelist keys from the present namelist's set.""" pass @abc.abstractmethod def _actual_squeeze(self): """Squeeze the namelist: remove empty blocks.""" pass
[docs] @abc.abstractmethod def dumps(self, sorting=NO_SORTING): """ Returns a string that represent the namelist's set (i.e. something readable by Fortran !) :param int sorting: The kind of sorting to apply within blocks """ pass
[docs] @abc.abstractmethod def merge(self, other): """Merge another namelist in the current one. :param AbstractNamelistAdapter other: Another namelist to merge in. """ pass
[docs]class AbstractMapableNamelistAdapter(AbstractNamelistAdapter): """ A generic NamelistAdapter that presumes that self._parser is itself some kind of Mapping object. """ @staticmethod def _assert_mapping(obj): assert isinstance(obj, collections_abc.Mapping), \ 'The "{!r}" must be some kind of Mapping.' def __contains__(self, item): """Check if a block exists in the present namelist's set.""" self._assert_mapping(self._parser) return item in self._parser def __getitem__(self, item): """Retrieve the content of a namelist block. :return: The namelist block content :rtype: any subclass of collections_abc.Mapping """ self._assert_mapping(self._parser) value = self._parser[item] self._assert_mapping(value) return value def __iter__(self): """Iterate through namelist blocks.""" self._assert_mapping(self._parser) return self._parser.__iter__() def __len__(self): """The number of namelist blocks.""" self._assert_mapping(self._parser) return len(self._parser)
[docs]class BronxNamelistAdapter(AbstractMapableNamelistAdapter): """ A NamelistAdapter that relies on the namelist parser provided by the :mod:`bronx` package. """ def __init__(self, namelistsfile, macros=None): super(BronxNamelistAdapter, self).__init__(namelistsfile) # Delay the import of the bronx library since one may want to use another backend from bronx.datagrip import namelist actual_macros = self._all_macros(macros) self._parser = namelist.namparse(namelistsfile, macros=actual_macros) for macro, value in actual_macros.items(): self._parser.setmacro(macro, value) def _actual_newblock(self, item): self.parser.newblock(item) def _actual_rmblock(self, item): del self.parser[item] def _actual_mvblock(self, item, targetitem): self.parser.mvblock(item, targetitem) def _actual_newkey(self, block, key, value, index=None): self[block].setvar(key, value, index=index) def _actual_rmkey(self, block, key): del self[block][key] def _actual_squeeze(self): for b in list(self.parser.keys()): if len(self.parser[b]) == 0: self._actual_rmblock(b)
[docs] def dumps(self, sorting=NO_SORTING): """Returns a string that represent the namelist's set.""" from bronx.datagrip import namelist as bnamelists sorting_map = dict(NO_SORTING=bnamelists.NO_SORTING, FIRST_ORDER_SORTING=bnamelists.FIRST_ORDER_SORTING, SECOND_ORDER_SORTING=bnamelists.SECOND_ORDER_SORTING) return self.parser.dumps(sorting=sorting_map.get(sorting, sorting))
[docs] def merge(self, other): """Merge another namelist in the current one. :param AbstractNamelistAdapter other: Another namelist to merge in. """ assert isinstance(other, self.__class__) self.parser.merge(other.parser)