import sys, os
from pathlib import Path
from copy import deepcopy, copy
import omnibelt as belt
from collections import defaultdict, OrderedDict
from omnibelt import save_yaml, load_yaml, get_printer
from .util import primitives, global_settings, configurize, pythonize, ConfigurizeFailed
from .errors import PythonizeError, MissingParameterError, UnknownActionError, InvalidKeyError
prt = get_printer(__name__)
_printing_instance = None
class Config_Printing:
'''
Internal class to manage the printing pulls/pushes of the config object. (eg. indent/line styles)
'''
def __new__(cls, *args, **kwargs): # singleton
global _printing_instance
if _printing_instance is None:
_printing_instance = super().__new__(cls)
_printing_instance.level = 0
_printing_instance.is_new_line = True
_printing_instance.unit = ' > '
_printing_instance.style = '| '
_printing_instance.silent = False
_printing_instance.src = None
return _printing_instance
def __repr__(self):
return f'ConfigPrinting[{hex(id(self))}]'
def __str__(self):
return f'ConfigPrinting'
# def set_src(self, src):
# self.src = src
def inc_indent(self):
self.level += 1
def dec_indent(self):
self.level = max(0, self.level-1)
def process_addr(self, *terms):
addr = []
skip = False
for term in terms[::-1]:
if term == '':
skip = True
addr.append('')
elif term is None:
pass
elif not skip:
addr.append(term)
else:
skip = False
if not len(addr):
return '.'
addr = '.'.join(addr[::-1])
return addr
def log_record(self, raw, end='\n',
silent=False):
indent = self.level * self.unit
style = self.style
src = '' if self.src is None else f'({self.src}) '
prefix = style + src + indent
msg = raw.replace('\n', '\n' + prefix)
if not self.is_new_line:
prefix = ''
msg = f'{prefix}{msg}{end}'
base = self.silent
if not (silent or base):
print(msg, end='')
self.is_new_line = (len(msg) == 0 and self.is_new_line) or msg[-1] == '\n'
return msg
[docs]class ConfigType(belt.Transactionable):
'''
The abstract super class of config objects.
The most important methods:
- ``push()`` - set a parameter in the config
- ``pull()`` - get a parameter in the config
- ``sub()`` - get a sub branch of this config
- ``seq()`` - iterate through all contents
- ``update()`` - update a config with a different config
- ``export()`` - save the config object as a yaml file
Another important property of the config object is that it acts as a tree where each node can hold parameters.
If a parameter cannot be found at one node, it will search up the tree for the parameter.
Config objects also allow for "deep" gets/sets, which means you can get and set parameters not just in
the current node, but any number of nodes deeper in the tree by passing a list/tuple of keys
or keys separated by "." (hereafter called the "address" of the parameter).
Note: that all parameters must not contain "." and should generally be valid python identifiers
(strings with no white space that don't start with a number).
'''
def __init__(self, parent=None, printer=None,
prefix=None, safe_mode=False, project=None,
data=None):
'''
Generally it should not be necessary to create a ConfigType manually, instead use ``get_config()``.
:param parent: parent config used for defaults
:param printer: printer object to handle printing messages
:param prefix: initial prefix used for printing
:param project: project this config responds to
:param safe_mode: don't save created component instances, unless during a transaction
:param data: raw parameters to immediately add to this config
'''
if printer is None:
printer = Config_Printing()
self.__dict__['_printer'] = printer
if prefix is None:
prefix = []
self.__dict__['_prefix'] = prefix
self.__dict__['_hidden_prefix'] = prefix.copy()
self.__dict__['_safe_mode'] = safe_mode
self.__dict__['_project'] = None
self.set_parent(parent)
# self._set_silent(silent)
if project is not None:
self.set_project(project)
super().__init__()
if data is not None:
self.update(data)
[docs] def __deepcopy__(self, memodict={}):
raise NotImplementedError
[docs] def __copy__(self):
raise NotImplementedError
[docs] def copy(self):
'''shallow copy of the config object'''
return copy(self)
[docs] def pythonize(self):
return pythonize(self)
[docs] @classmethod
def convert(cls, data, recurse):
'''used by configurize to turn a nested python object into a config object'''
return cls(data=[recurse(x) for x in data])
[docs] def sub(self, item):
'''
Used to get a subbranch of the overall config
:param item: address of the branch to return
:return: config object at the address
'''
# TODO: replace with a pull(raw=True)
val = self.get_nodefault(item)
if isinstance(item, (list, tuple)):
item = '.'.join(item)
if isinstance(val, ConfigType):
val._set_prefix(self.get_prefix() + [item])
val._store_prefix()
return val
[docs] def update(self, other):
'''
Used to merge two config nodes (and their children) together
This method must be implemented by child classes depending on how the contents of the node is stored
:param other: config node to overwrite ``self`` with
:return: None
'''
try:
return super().update(other)
except AttributeError:
raise NotImplementedError
def _record_action(self, action, suffix=None, val=None, silent=False, obj=None,
defaulted=False, pushed=False, _entry=False, _raw=False):
'''
Internal function to manage printing out messages after various actions have been taken with this config.
:param action: name of the action (see code for examples)
:param suffix: suffix of the address (aka. last item in the address)
:param val: contents at that address
:param silent: suppress printing a message for this action
:param obj: object created or used in this action (eg. a newly created component)
:param defaulted: is a default value being used
:param pushed: has this value just been pushed
:param _entry: internal flag used for printing messages
:return: formatted message corresponding to this action
'''
printer = self._get_printer()
if action == 'defaulted':
return ''
name = printer.process_addr(*self.get_prefix(), suffix)
if pushed:
name = '[Pushed] ' + name
if 'alias' in action:
return printer.log_record(f'{name} --> ', end='', silent=silent)
origins = ' (by default)' if defaulted else ''
if action == 'removing':
obj_type = action.split('-')[-1]
return printer.log_record(f'REMOVING {name}', silent=silent)
if action in {'creating', 'reusing'}:
assert val is not None, 'no info provided'
cmpn_type = val.pull('_type', silent=True)
mods = val.pull('_mod', None, silent=True)
mod_info = ''
if mods is not None and len(mods):
if not isinstance(mods, (list, tuple)):
mods = mods,
mod_info = ' (mods=[{}])'.format(', '.join(m for m in mods)) if len(mods) > 1 \
else f' (mod={mods[0]})'
end = ''
if action == 'reusing':
assert obj is not None, 'no object provided'
end = f': id={hex(id(obj))}'
# head = '' if suffix is None else f' {name}'
head = f' {name}'
out = printer.log_record(f'{action.upper()}{head} (type={cmpn_type}){mod_info}{origins}{end}',
silent=silent)
if action == 'creating':
printer.inc_indent()
return out
if action in {'iter-dict', 'iter-list'}:
obj_type = action.split('-')[-1]
head = '' if suffix is None else f'{name} [{obj_type} with {len(val)} item/s]'
return printer.log_record(f'ITERATOR {head}{origins}', silent=silent)
if action in {'pull-dict', 'pull-list'}:
assert val is not None, 'no obj provided'
obj_type = action.split('-')[-1]
out = printer.log_record(f'{name} [{obj_type} with {len(val)} item/s]', silent=silent)
printer.inc_indent()
return out
if action in {'created', 'pulled-dict', 'pulled-list'}:
assert obj is not None, 'no object provided'
printer.dec_indent()
return ''
# return printer.log_record(f'=> id={hex(id(obj))}', silent=silent)
pval = None if val is None else (f'[{type(val)}]' if _raw else repr(val))
if action == 'entry': # when pulling dict/list
assert suffix is not None, 'no suffix provided'
return printer.log_record(f'({suffix}): ', end='',
silent=(silent or self._get_silent()))
if action == 'pulled':
head = '' if suffix is None else f'{name}: '
if _entry:
head = ''
return printer.log_record(f'{head}{pval}{origins}', silent=silent)
raise UnknownActionError(action)
[docs] def pull(self, item, *defaults, silent=False, ref=False, no_parent=False, as_iter=False, raw=False):
'''
Top-level function to get parameters from the config object (including automatically creating components)
:param item: address of the parameter to get
:param defaults: default values to use if ``item`` is not found
:param silent: suppress printing message that this parameter was pulled
:param ref: if the parameter is a component that has already been created, get a reference to the created component instead of creating a new instance
:param no_parent: don't default to a parent node if the ``item`` is not found here
:param as_iter: return an iterator over the selected value (only works if the value is a dict/list)
:return: processed value of the parameter (or default if ``item`` is not found, or raises a ``MissingConfigError`` if not found)
'''
self._reset_prefix()
return self._pull(item, *defaults, silent=silent, ref=ref, no_parent=no_parent, as_iter=as_iter,
_origin=self, _raw=raw)
[docs] def pull_self(self, name=None, silent=False, as_iter=False, raw=False, ref=False):
'''
Process self as a value being pulled.
:param name: Name given to self for printed message
:param silent: suppress printing message
:param as_iter: Return self as an iterator (has same effect as calling ``seq()``)
:return: the processed value of self
'''
self._reset_prefix()
return self._process_val(name, self, silent=silent, reuse=ref, is_self=True, as_iter=as_iter,
_origin=self, _raw=raw)
def _pull(self, item, *defaults, silent=False, ref=False, no_parent=False, as_iter=False,
_defaulted=False, _origin=None, _raw=False, _allow_cousin=True):
'''
Internal pull method, should generally not be called manually (unless you know what you're doing)
:param item: remaining address to find
:param defaults: any default values that can be used if address is not found
:param silent: suppress messages
:param ref: return an instance of the value instead of creating a new instance, if one exists
:param no_parent: do not check for the parameter in the parent
:param as_iter: return the value as an iterator (only for dicts/lists)
:param _defaulted: flag that this value was once a provided default (used for printing)
:param _origin: reference to the original config node that was pulled (some pulls require returing to origin)
:param _raw: return unprocessed value
:return: processed value found at ``item`` or a processed default value
'''
# TODO: change defaults to be a keyword argument providing *1* default, and have item*s* instead,
# which are the keys to be checked in order
if '.' in item:
item = item.split('.')
line = []
if isinstance(item, (list, tuple)):
line = item[1:]
item = item[0]
defaulted = item not in self
byparent = not self.contains_nodefault(item)
if no_parent and byparent:
defaulted = True
if defaulted:
try:
if '__origin_key' in self and _allow_cousin: # cousins
origin = self['__origin_key']
if origin is not None:
parent = self.get_parent()
if parent is not None:
grandparent = parent.get_parent()
if grandparent is not None:
return grandparent._pull((origin, item), silent=silent, _defaulted=defaulted or _defaulted,
as_iter=as_iter, ref=ref, _origin=_origin, _raw=_raw, _allow_cousin=False)
except MissingParameterError:
pass
if len(defaults) == 0:
raise MissingParameterError(item)
val, *defaults = defaults
line = []
else:
val = self[item]
if len(line) and not isinstance(val, ConfigType):
defaulted = True
if len(defaults) == 0:
raise MissingParameterError(item)
val, *defaults = defaults
if defaulted and _origin is not None: # try again with new value
_origin._reset_prefix()
val = _origin._process_val(item, val, *defaults, silent=silent, defaulted=defaulted or _defaulted,
as_iter=as_iter, reuse=ref, _origin=_origin, _raw=_raw)
elif len(line): # child pull
if not isinstance(val, ConfigType):
prt.warning(f'Pulling through a non-config object: {val}')
val._set_prefix(self.get_prefix() + [item])
out = val._pull(line, *defaults, silent=silent, ref=ref, no_parent=no_parent, as_iter=as_iter,
_defaulted=_defaulted, _origin=_origin, _raw=_raw, _allow_cousin=_allow_cousin)
val = out
elif byparent and not item.startswith('_'): # parent pull
parent = self.get_parent()
parent._set_prefix(self.get_prefix() + [''])
val = parent._pull((item, *line), *defaults, silent=silent, ref=ref, no_parent=no_parent, as_iter=as_iter,
_origin=_origin, _raw=_raw, _allow_cousin=_allow_cousin)
else: # process val from me/defaults
val = self._process_val(item, val, *defaults, silent=silent, defaulted=defaulted or _defaulted,
as_iter=as_iter, reuse=ref, _origin=_origin, _raw=_raw)
if type(val) in {list, set}: # TODO: a little heavy handed
val = tuple(val)
return val
def _process_val(self, item, val, *defaults, silent=False,
reuse=False, is_self=False, as_iter=False, _origin=None, _raw=False,
**record_flags):
'''
This is used by ``pull()`` to process the recovered value and print the correct message if ``not silent``
:param item: remaining address where this value was found
:param val: value that was found given the original address
:param defaults: any additional defaults to use if processing fails with ``val``
:param silent: suppress messages
:param reuse: if an instance is found (under ``__obj``) then that should be returned instead of creating a new one
:param is_self: this config object should be returned after processing
:param as_iter: return an iterator of ``val`` (only works if ``val`` is a list/dict)
:param _origin: original config node where the pull or push request was intially called
:param record_flags: additional flags used for printing.
:return: processed value
'''
if as_iter and isinstance(val, ConfigType):
obj_type = 'list' if isinstance(val, list) else 'dict'
if obj_type == 'list' or '_type' not in val or not val.pull('_type', silent=True).startswith('iter'):
self._record_action(f'iter-{obj_type}', suffix=item, val=val, silent=silent, **record_flags)
if _raw:
return val
itr = ConfigIter(val, val)
return itr
if isinstance(val, ConfigDict) and not _raw:
val.push('__origin_key', item, silent=True)
typ = val._pull('_type', None, silent=True)
if typ is not None:
if reuse and '__obj' in val:
# print('WARNING: would usually reuse {} now, but instead creating a new one!!')
cmpn = val['__obj']
self._record_action('reusing', suffix=item, val=val, obj=cmpn, silent=silent, **record_flags)
else:
self._record_action('creating', suffix=item, val=val, silent=silent, **record_flags)
hidden = val._get_hidden_prefix()
val.get_prefix().clear()
val._store_prefix()
past = self._get_silent()
with self.silenced(silent or past):
cmpn = self.get_project().create_component(val)
if not self.in_safe_mode() or self.in_transaction():
if item is not None and len(item) and not is_self:
val['__obj'] = cmpn
else:
self['__obj'] = cmpn
val._set_prefix(hidden)
val._store_prefix()
self._record_action('created', suffix=item, val=val, obj=cmpn, silent=silent, **record_flags)
val = cmpn
else:
val.push('__origin_key', '_x_', silent=True)
self._record_action('pull-dict', suffix=item, val=val, silent=silent, **record_flags)
terms = {}
for k, v in val.items(): # WARNING: pulls all entries in dict
k = str(k)
self._record_action('entry', silent=silent, suffix=k)
terms[k] = val._process_val(k, v, reuse=reuse, silent=silent, _origin=_origin, _entry=True)
self._record_action('pulled-dict', suffix=item, val=val, obj=terms, silent=silent, **record_flags)
val = terms
elif isinstance(val, ConfigList) and not _raw:
self._record_action('pull-list', suffix=item, val=val, silent=silent, **record_flags)
terms = []
for i, v in enumerate(val): # WARNING: pulls all entries in list
self._record_action('entry', silent=silent, suffix=str(i))
terms.append(val._process_val(str(i), v, reuse=reuse, silent=silent, _origin=_origin, _entry=True))
self._record_action('pulled-list', suffix=item, val=val, obj=terms, silent=silent, **record_flags)
val = terms
elif isinstance(val, str) and val.startswith('<>'): # local alias (looks for alias locally)
alias = val[2:]
self._record_action('local-alias', suffix=item, val=alias, silent=silent, **record_flags)
val = self._pull(alias, *defaults, silent=silent, _origin=_origin, as_iter=as_iter, ref=reuse)
elif isinstance(val, str) and val.startswith('<o>'): # origin alias (returns to origin to find alias)
alias = val[3:]
self._record_action('origin-alias', suffix=item, val=alias, silent=silent, **record_flags)
_origin._reset_prefix()
val = _origin._pull(alias, *defaults, silent=silent, _origin=_origin, as_iter=as_iter)
elif isinstance(val, str) and val.startswith('<!>'): # copy alias (only for local aliases)
alias = val[3:]
self._record_action('copy-alias', suffix=item, val=alias, silent=silent, **record_flags)
val = self._pull(alias, *defaults, silent=silent, _origin=_origin, as_iter=as_iter, _raw=True)
# self[item] = deepcopy(val)
val = deepcopy(val)
self[item] = val
return self._process_val(item, val, silent=silent, _origin=_origin, as_iter=as_iter, _raw=_raw)
else:
self._record_action('pulled', suffix=item, val=val, silent=silent, _raw=_raw, **record_flags)
return val
[docs] def push(self, key, val, *_skip, silent=False, overwrite=True, no_parent=True, force_root=False, process=True):
'''
Set ``key`` with ``val`` in the config object, but pulls ``key`` first so that `val` is only set
if it is not found or ``overwrite`` is set to ``True``. It will return the current value of ``key`` after
possibly setting with ``val``.
:param key: key to check/set (can be list or '.' separated string)
:param val: data to possibly write into the config object
:param _skip: soak up all other positional arguments to make sure the remaining are keyword only
:param silent: Do not print messages
:param overwrite: If key is already set, overwrite with (configurized) 'val'
:param no_parent: Do not check parent object if not found in self
:param force_root: Push key to the root config object
:return: current val of key (updated if written)
'''
self._reset_prefix()
return self._push(key, val, silent=silent, overwrite=overwrite, no_parent=no_parent,
force_root=force_root, process=process)
def _push(self, key, val, silent=False, overwrite=True, no_parent=True, force_root=False,
process=True, _origin=None):
'''
Set ``key`` with ``val`` in the config object, but pulls ``key`` first so that `val` is only set
if it is not found or ``overwrite`` is set to ``True``. It will return the current value of ``key`` after
possibly setting with ``val``.
:param key: key to check/set (can be list or '.' separated string)
:param val: data to possibly write into the config object
:param _skip: soak up all other positional arguments to make sure the remaining are keyword only
:param silent: Do not print messages
:param overwrite: If key is already set, overwrite with (configurized) 'val'
:param no_parent: Do not check parent object if not found in self
:param force_root: Push key to the root config object
:return: current val of key (updated if written)
'''
if '.' in key:
key = key.split('.')
line = []
if isinstance(key, (list, tuple)):
line = key[1:]
key = key[0]
exists = self.contains_nodefault(key)
parent = self.get_parent()
if parent is not None and force_root:
return self.get_root().push((key, *line), val, silent=silent, overwrite=overwrite,
no_parent=no_parent, process=process)
elif no_parent:
parent = None
if exists and len(line): # push me
child = self.get_nodefault(key)
child._set_prefix(self.get_prefix() + [key])
out = child._push(line, val, silent=silent, overwrite=overwrite, no_parent=True, process=process)
return out
elif parent is not None and key in parent: # push parent
parent._set_prefix(self.get_prefix() + [''])
out = parent._push((key, *line), val, silent=silent, overwrite=overwrite,
no_parent=no_parent, process=process)
return out
elif len(line): # push child
child = self.get_nodefault(key)
child._set_prefix(self.get_prefix() + [key])
out = child._push(line, val, silent=silent, overwrite=overwrite, no_parent=True, process=process)
return out
if exists and not overwrite:
return self._pull(key, silent=True)
if isinstance(val, str) and val == '_x_':
if exists:
self._record_action('removing', suffix=key, val=val, silent=silent)
del self[key]
return
val = configurize(val)
self[key] = val
# val = self[key]
if process:
val = self._process_val(key, val, silent=silent, pushed=True)
return val
[docs] def export(self, path=None):
'''
Convert all data to raw data (using dict/list) and save as yaml file to ``path`` if provided.
Also returns yamlified data.
:param path: path to save data in this config (data is not saved to disk if not provided)
:return: raw "yamlified" data
'''
data = pythonize(self)
if path is not None:
path = Path(path)
if path.is_dir():
path = path / 'config.yaml'
save_yaml(data, path)
return path
return data
[docs] def seq(self):
'''
Returns an iterator over the contents of this config object where elements are lazily
processed during iteration (see ``ConfigIter`` for details).
:return: iterator over all arguments in self
'''
return ConfigIter(self, self)
[docs] def replace_vals(self, replacements):
raise NotImplementedError
# region Silencing
[docs] def set_silent(self, silent=True):
'''Sets whether pushes and pulls on this config object should be printed out to stdout'''
self.__dict__['_printer'].silent = silent
[docs] def silence(self, silent=True):
self.set_silent(silent)
# self._silent_config_flag = silent
def _get_silent(self):
return self.__dict__['_printer'].silent
class _Silent_Config:
'''Internal context manager to silence a config object'''
def __init__(self, config, setting):
self.config = config
self.setting = setting
self.prev = config._get_silent()
def __enter__(self):
self.config.set_silent(self.setting)
return self.config
def __exit__(self, exc_type, exc_val, exc_tb):
self.config.set_silent(self.prev)
[docs] def silenced(self, setting=True):
'''Returns a context manager to silence this config object'''
return ConfigType._Silent_Config(self, setting=setting)
# endregion
# region Parents
[docs] def is_root(self): # TODO: move to tree space
'''Check if this config object has a parent for defaults'''
return self.get_parent() is None
[docs] def set_parent(self, parent):
'''Sets the parent config object to be checked when a parameter is not found in `self`'''
self.__dict__['_parent'] = parent
[docs] def get_parent(self):
'''Get parent (returns None if this is the root)'''
return self.__dict__['_parent']
[docs] def set_process_id(self, name=None):
'''Set the unique ID to include when printing out pulls from this object'''
self._get_printer().src = name
[docs] def get_root(self):
'''Gets the root config object (returns ``self`` if ``self`` is the root)'''
parent = self.get_parent()
if parent is None:
return self
return parent.get_root()
# endregion
# region Addressing
def _get_printer(self):
return self.__dict__['_printer']
def get_prefix(self):
return self.__dict__['_prefix']
[docs] def _set_prefix(self, prefix):
self.__dict__['_prefix'] = prefix
[docs] def _reset_prefix(self):
self._set_prefix(self._get_hidden_prefix())
[docs] def _get_hidden_prefix(self):
return self.__dict__['_hidden_prefix']
def _store_prefix(self):
self.__dict__['_hidden_prefix'] = self.get_prefix().copy()
# endregion
# region Misc
[docs] def set_project(self, project):
self.get_root().__dict__['_project'] = project
[docs] def get_project(self):
return self.get_root().__dict__['_project']
[docs] def set_safe_mode(self, safe_mode):
if self.is_root():
self.__dict__['_safe_mode'] = safe_mode
else:
self.get_root().set_safe_mode(safe_mode)
[docs] def in_safe_mode(self):
if self.is_root():
return self.__dict__['_safe_mode']
return self.get_root().in_safe_mode()
[docs] def purge_volatile(self):
'''
Recursively remove any items where the key starts with "__"
Must be implemented by the child class
:return: None
'''
raise NotImplementedError
def __repr__(self):
return f'{type(self).__name__}[id={hex(id(self))}]'
def __str__(self):
# return repr(self)
return f'<{type(self).__name__}>'
# endregion
# region Get/Set/Contains
def __setitem__(self, key, value):
if isinstance(key, str) and '.' in key:
key = key.split('.')
if isinstance(key, (list, tuple)):
if len(key) == 1:
return self.__setitem__(key[0], value)
child = self.get_nodefault(*key)
assert isinstance(child, ConfigType)
return child.__setitem__(key[1:], value)
value = configurize(value)
if isinstance(value, ConfigType):
value.set_parent(self)
# value.set_project(self.get_project())
return self._single_set(key, value)
def __getitem__(self, item, *future):
if isinstance(item, str) and '.' in item:
item = item.split('.')
if isinstance(item, (list, tuple)):
if len(item) == 1:
item = item[0]
else:
return self.__getitem__(*item)[item[1:]]
parent = self.get_parent()
if not self.contains_nodefault(item) \
and parent is not None \
and item[0] != '_':
return parent[item]
return self._single_get(item, *future)
def get_nodefault(self, item, *future):
'''Get ``item`` without defaulting up the tree if not found.'''
if isinstance(item, str) and '.' in item:
item = item.split('.')
if isinstance(item, (list, tuple)):
if len(item) == 1:
item = item[0]
else:
return self.get_nodefault(*item)[item[1:]]
return self._single_get(item, *future)
[docs] def _single_set(self, key, val):
return super().__setitem__(key, val)
def _single_get(self, item, *context):
try:
val = super().__getitem__(item)
if val is EmptyElement and len(context):
raise KeyError(item)
except KeyError:
return self._missing_key(item, *context)
return val
[docs] def _missing_key(self, key, *context):
cls = ConfigDict
if len(context):
nxt = context[0]
try:
int(nxt)
except ValueError:
pass
else:
cls = ConfigList
obj = cls(parent=self)
self.__setitem__(key, obj)
return obj
def __contains__(self, item):
'''Check if ``item`` is in this config, item can be "deep" (multiple steps'''
if isinstance(item, str) and '.' in item:
item = item.split('.')
if isinstance(item, (tuple, list)):
if len(item) == 1:
item = item[0]
else:
return item[0] in self and item[1:] in self[item[0]]
parent = self.get_parent()
return self.contains_nodefault(item) \
or (not super().__contains__(item)
and parent is not None
and item[0] != '_'
and item in parent)
def contains_nodefault(self, item):
'''Check if ``item`` is contained in this config object without defaulting up the tree if ``item`` is not found'''
if isinstance(item, str) and '.' in item:
item = item.split('.')
if isinstance(item, (tuple, list)):
if len(item) == 1:
item = item[0]
else:
return self.contains_nodefault(item[0]) and self[item[0]].contains_nodefault(item[1:])
if super().__contains__(item):
return self.get_nodefault(item) is not '__x__'
return False
# endregion
[docs]class ConfigDict(ConfigType, belt.tdict):
'''
Dict like node in the config.
Keys should all be valid python attributes (strings with no whitespace, and not starting with a number).
NOTE: avoid setting keys that start with more than one underscore (especially '__obj')
(unless you really know what you are doing)
'''
[docs] def __deepcopy__(self, memodict={}):
new = self.__class__(data={k:deepcopy(v) for k,v in self.items() if not k.startswith('__')})
new.__dict__.update(self.__dict__)
return new
[docs] def __copy__(self):
new = self.__class__(data={k:v for k,v in self.items()})
new.__dict__.update(self.__dict__)
return new
[docs] def copy(self):
return copy(self)
[docs] def replace_vals(self, replacements):
for k,v in self.items():
if isinstance(v, primitives) and v in replacements:
self[k] = replacements[v]
elif isinstance(v, ConfigType):
v.replace_vals(replacements)
[docs] @classmethod
def convert(cls, data, recurse):
return cls(data={k: recurse(v) for k, v in data.items()})
[docs] def update(self, other):
'''
Merge self with another dict-like object
:param other: must be dict like
:return: None
'''
assert isinstance(other, dict), f'invalid: {type(other)}'
for k, v in other.items():
isconfig = isinstance(v, ConfigType)
exists = self.contains_nodefault(k)
if exists and '_x_' == v: # reserved for deleting settings in parents
del self[k]
elif exists and isconfig and \
(isinstance(v, self[k].__class__)
or isinstance(self[k], v.__class__)):
self[k].update(v)
else:
self[k] = v
[docs] def purge_volatile(self):
'''
Recursively remove any items where the key starts with "__"
:return: None
'''
bad = []
for k, v in self.items():
if k.startswith('__'):
bad.append(k)
elif isinstance(v, ConfigType):
v.purge_volatile()
for k in bad:
del self[k]
def __repr__(self):
info = '{{' + ', '.join(f'{k}' for k in self) + '}}'
return f'[{hex(id(self))}]{info}'
def __str__(self):
return repr(self)
return '{{' + ', '.join(f'{k}' for k in self) + '}}'
[docs]class EmptyElement:
pass
[docs]class ConfigList(ConfigType, belt.tlist):
'''
List like node in the config.
'''
def __init__(self, *args, empty_fill_value=EmptyElement, **kwargs):
super().__init__(*args, **kwargs)
self._empty_fill_value = empty_fill_value
[docs] def __deepcopy__(self, memodict={}):
new = self.__class__(data=[deepcopy(x) for x in self])
new.__dict__.update(self.__dict__)
return new
[docs] def __copy__(self):
new = self.__class__(data=[x for x in self])
new.__dict__.update(self.__dict__)
return new
def __repr__(self):
info = '[[' + ', '.join(f'{k}' for k in self) + ']]'
return f'[{hex(id(self))}]{info}'
def __str__(self):
return '[[' + ', '.join(f'{k}' for k in self) + ']]'
[docs] def replace_vals(self, replacements):
for i, x in enumerate(self):
if isinstance(x, primitives) and x in replacements:
self[i] = replacements[x]
elif isinstance(x, ConfigType):
x.replace_vals(replacements)
[docs] def purge_volatile(self):
'''
Recursively remove any items where the key starts with "__"
:return: None
'''
for x in self:
if isinstance(x, ConfigType):
x.purge_volatile()
[docs] def _str_to_int(self, item):
'''Convert the input items to indices of the list'''
if isinstance(item, int):
return item
try:
return int(item)
except (TypeError, ValueError):
pass
raise InvalidKeyError(f'failed to convert {item} to an index')
[docs] def update(self, other):
'''Overwrite ``self`` with the provided list ``other``'''
for i, x in enumerate(other):
isconfig = isinstance(x, ConfigType)
if len(self) == i:
self.append(x)
elif isconfig and (isinstance(x, self[i].__class__) or isinstance(self[i], x.__class__)):
self[i].update(x)
elif x is not EmptyElement:
self[i] = x
[docs] def _single_set(self, key, val):
if key == '_': # append to end of the list
key = len(self)
key = self._str_to_int(key)
if key >= len(self):
self.extend([self._empty_fill_value] * (key - len(self) + 1))
return super()._single_set(key, val)
def _single_get(self, item, *context):
if isinstance(item, slice):
return super(ConfigType, self).__getitem__(item)
if item == '_': # append to end of the list
item = len(self)
item = self._str_to_int(item)
if item < 0:
item += len(self)
if item >= len(self):
self.extend([self._empty_fill_value] * (item - len(self) + 1))
return super()._single_get(item, *context)
[docs] def push(self, first, *rest, overwrite=True, **kwargs):
'''
When pushing to a list, if you don't provide an index, the value is automatically pushed to the end of the list
:param first: if no additional args are provided in `rest`, then this is used as the value and the key is the end of the list, otherwise this is used as key and the first element in `rest` is the value
:param rest: optional second argument to specify the key, rather than defaulting to the end of the list
:param kwargs: same keyword args as for the ConfigDict
:return: same as for ConfigDict
'''
if len(rest):
val, *rest = rest
return super().push(first, val, *rest, overwrite=overwrite, **kwargs)
val = first
key = len(self)
self.append(None)
return super().push(key, val, *rest, overwrite=True, **kwargs) # note that *rest will have no effect
def contains_nodefault(self, item):
try:
idx = self._str_to_int(item)
except InvalidKeyError:
return isinstance(item, slice)
N = len(self)
return -N <= idx < N
[docs] def append(self, item):
super().append(item)
if isinstance(item, ConfigType):
item.set_parent(self)
# item.set_project(self.get_project())
[docs] def extend(self, item):
super().extend(item)
for x in item:
if isinstance(x, ConfigType):
x.set_parent(self)
# x.set_project(self.get_project())
[docs]class ConfigIter:
'''
Iterate through a list of parameters, processing each item lazily,
ie. only when it is iterated over (with ``next()``)
'''
def __init__(self, origin, elements=None, auto_pull=True, include_key=None, reversed=False):
'''
Can be used as a component or created manually (by providing the ``elements`` argument explicitly)
For dicts, this will behave like ``.items()``, ie. for each entry in the dict it will return
a tuple of the key and value.
:param origin: config object where the iterator info is
:param elements: manually provided elements to iterate over (uses contents of "_elements" in ``origin`` if not provided)
'''
# self._name = config._ident
# assert '_elements' in config, 'No elements found'
if elements is None:
elements = origin['_elements']
self._elms = elements
self._keys = [k for k in self._elms.keys()
if k not in {'_elements', '_mod', '_type', '__obj', '__origin_key'}
and self._elms[k] != '__x__'] \
if isinstance(self._elms, dict) else None
self._include_key = include_key if include_key is not None else self._keys is not None
self._prefix = origin.get_prefix().copy()
self._reversed = False
self._idx = 0
self.set_reversed(reversed)
self.set_auto_pull(auto_pull)
def __len__(self):
'''Returns the remaining length of this iterator instance'''
return len(self._elms if self._keys is None else self._keys) - self._idx
def _next_idx(self):
'''Find the next index or key'''
if self._keys is None:
if 0 > self._idx >= len(self._elms):
raise StopIteration
return str(self._idx)
# if not self._elms.contains_nodefault(self._idx):
# raise StopIteration
# return str(self._idx)
while 0 <= self._idx < len(self._elms):
idx = self._keys[self._idx]
if self._elms.contains_nodefault(idx):
return idx
self._idx += (-1)**self._reversed
raise StopIteration
[docs] def view(self):
'''Returns the next object without processing the item, may throw a StopIteration exception'''
idx = self._next_idx()
obj = self._elms.pull(idx, raw=True, silent=True)
# obj = self._elms[idx]
if isinstance(obj, ConfigType):
obj.push('_iter_key', idx, silent=True)
if isinstance(obj, global_settings['config_type']):
obj = self._elms.sub(idx)
return (idx, obj) if self._include_key else obj
[docs] def step(self):
obj = self.view()
self._idx += (-1)**self._reversed
return obj
[docs] def set_auto_pull(self, auto=True):
self._auto_pull = auto
[docs] def set_reversed(self, reversed=True):
self._reversed = reversed
if reversed:
self._idx = len(self)-1
[docs] def has_next(self):
return (self._reversed and self._idx >= 0) or (not self._reversed and self._idx < len(self._elms))
def __next__(self):
if not self.has_next():
raise StopIteration
obj = self.step()
key, val = obj if self._include_key else (None,obj)
if isinstance(val, global_settings['config_type']):
val = val.pull_self(raw=not self._auto_pull, silent=not self._auto_pull)
return (key,val) if self._include_key else val
[docs] def __iter__(self):
return self
nones = {'None', 'none', '_none', '_None', 'null', 'nil', }
[docs]def configurize_nones(s, recurse):
'''Turns strings into None, if they match the expected patterns'''
if s in nones:
return None
raise ConfigurizeFailed
global_settings.update({
'config_type': ConfigType,
'config_converters': OrderedDict([
(str, configurize_nones),
(dict, (False, ConfigDict.convert)),
(OrderedDict, (False, ConfigDict.convert)),
(list, ConfigList.convert),
# (tuple, ConfigList.convert),
(set, ConfigList.convert),
]),
})