Source code for properties.base.base

"""base.py: HasProperties class and metaclass"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from collections import OrderedDict
from warnings import warn

from six import iteritems, itervalues, PY2, string_types, with_metaclass

from .. import basic
from .. import handlers
from .. import utils

if PY2:
    from types import ClassType                                                #pylint: disable=no-name-in-module
    CLASS_TYPES = (type, ClassType)
else:
    CLASS_TYPES = (type,)

GENERIC_ERRORS = (ValueError, KeyError, TypeError, AttributeError)


[docs]class PropertyMetaclass(type): """Metaclass to establish behavior of **HasProperties** classes On class construction: * Build Property dictionary from the class dictionary and the base classes' Properties. * Build listener dictionaries from class dictionary and the base classes' listeners. * Check Property names are not private. * Ensure the Property names referred to by :ref:`Renamed Properties <renamed>` and handlers are valid. * Build class docstring. * Construct default value dictionary, and check that any provided defaults are valid. * Add the class to the **HasProperties** :code:`_REGISTRY` or the closest parent class with a new registry defined On class instantiation: * Initialize private backend dictionary where Property values are stored. * Initialize private listener dictionary and set the listeners on the class instance. * Set all the default values on the class without firing change notifications. """ def __new__(mcs, name, bases, classdict): #pylint: disable=too-many-locals, too-many-branches, too-many-statements # Grab all the properties, observers, and validators prop_dict = { key: value for key, value in classdict.items() if isinstance(value, basic.GettableProperty) } observer_dict = { key: value for key, value in classdict.items() if isinstance(value, handlers.Observer) } validator_dict = { key: value for key, value in classdict.items() if isinstance(value, handlers.ClassValidator) } # Get pointers to all inherited properties, observers, and validators _props = dict() _prop_observers = OrderedDict() _class_validators = OrderedDict() for base in reversed(bases): if not all((hasattr(base, '_props'), hasattr(base, '_prop_observers'), hasattr(base, '_class_validators'))): continue for key, val in iteritems(base._props): if key not in prop_dict and key in classdict: continue _props.update({key: val}) for key, val in iteritems(base._prop_observers): if key not in observer_dict and key in classdict: continue _prop_observers.update({key: val}) for key, val in iteritems(base._class_validators): if key not in validator_dict and key in classdict: continue _class_validators.update({key: val}) # Overwrite with this class's properties _props.update(prop_dict) _prop_observers.update(observer_dict) _class_validators.update(validator_dict) # Save these to the class classdict['_props'] = _props classdict['_prop_observers'] = _prop_observers classdict['_class_validators'] = _class_validators # Ensure prop names are valid and overwrite properties with @property for key, prop in iteritems(prop_dict): if isinstance(prop, basic.Renamed) and prop.new_name not in _props: raise TypeError('Invalid new name for renamed property: ' '{}'.format(prop.new_name)) prop.name = key classdict[key] = prop.get_property() # Ensure observed names are valid for key, handler in iteritems(observer_dict): if handler.names is utils.everything: continue for prop in handler.names: if prop in _props and isinstance(_props[prop], basic.Property): continue raise TypeError('Observed name must be a mutable ' 'property: {}'.format(prop)) # Overwrite observers and validators with their function observer_dict.update(validator_dict) for key, handler in iteritems(observer_dict): classdict[key] = handler.func handler.func = key # Determine if private properties should be documented or just public _doc_private = False for base in reversed(bases): _doc_private = getattr(base, '_doc_private', _doc_private) _doc_private = classdict.get('_doc_private', _doc_private) if not isinstance(_doc_private, bool): raise AttributeError('_doc_private must be a boolean') if _doc_private: documented_props = sorted(_props) else: documented_props = sorted(p for p in _props if p[0] != '_') # Order the properties for the docs (default is alphabetical) _doc_order = None for base in reversed(bases): _doc_order = getattr(base, '_doc_order', _doc_order) if ( not isinstance(_doc_order, (list, tuple)) or sorted(list(_doc_order)) != documented_props ): _doc_order = None _doc_order = classdict.get('_doc_order', _doc_order) if _doc_order is None: _doc_order = documented_props elif not isinstance(_doc_order, (list, tuple)): raise AttributeError( '_doc_order must be a list of property names' ) elif sorted(list(_doc_order)) != documented_props: raise AttributeError( '_doc_order must be unspecified or contain ALL property names' ) # Sort props into required, optional, and immutable doc_str = classdict.get('__doc__', '') req = [key for key in _doc_order if key[0] != '_' and getattr(_props[key], 'required', False)] opt = [key for key in _doc_order if key[0] != '_' and not getattr(_props[key], 'required', True)] imm = [key for key in _doc_order if key[0] != '_' and not hasattr(_props[key], 'required')] priv = [key for key in _doc_order if key[0] == '_'] # Build the documentation based on above sorting if req: doc_str += '\n\n**Required Properties:**\n\n' + '\n'.join( ('* ' + _props[key].sphinx() for key in req) ) if opt: doc_str += '\n\n**Optional Properties:**\n\n' + '\n'.join( ('* ' + _props[key].sphinx() for key in opt) ) if imm: doc_str += '\n\n**Other Properties:**\n\n' + '\n'.join( ('* ' + _props[key].sphinx() for key in imm) ) if priv: doc_str += '\n\n**Private Properties:**\n\n' + '\n'.join( ('* ' + _props[key].sphinx() for key in priv) ) classdict['__doc__'] = doc_str # Create the new class newcls = super(PropertyMetaclass, mcs).__new__( mcs, name, bases, classdict ) # Update the class defaults to include inherited values _defaults = dict() for parent in reversed(newcls.__mro__): _defaults.update(getattr(parent, '_defaults', dict())) # Ensure defaults are valid and add them to the class for key, value in iteritems(_defaults): if key not in newcls._props: raise AttributeError( "Default input '{}' is not a known property".format(key) ) try: if callable(value): value = value() if value is utils.undefined: continue newcls._props[key].validate(None, value) except ValueError: raise AttributeError( "Invalid default for property '{}'".format(key) ) newcls._defaults = _defaults # Save the class in the registry newcls._REGISTRY[name] = newcls return newcls def __call__(cls, *args, **kwargs): """Here additional instance setup happens here before init is called This allows subclasses of :code:`HasProperties` to override the init method without worrying about breaking setup. """ obj = cls.__new__(cls, *args, **kwargs) object.__setattr__(obj, '_backend', dict()) object.__setattr__(obj, '_listeners', dict()) # Register the listeners for val in itervalues(obj._prop_observers): handlers._set_listener(obj, val) # Set the GettableProperties from defaults - these are only set here for key, prop in iteritems(obj._props): if not isinstance(prop, basic.Property): if key in obj._defaults: val = obj._defaults[key] else: val = prop.default if val is utils.undefined: continue if callable(val): val = val() obj._backend[key] = prop.validate(obj, val) # Set the other defaults without triggering change notifications with handlers.listeners_disabled(): obj._reset() obj.__init__(*args, **kwargs) return obj
[docs]class HasProperties(with_metaclass(PropertyMetaclass, object)): """Base class to enable :ref:`property` behavior Classes that inherit **HasProperties** need simply to declare the Properties they need. **HasProperties** will save these Properties as :code:`_props` on the class. Property values will be saved to :code:`_backend` on the instance. **HasProperties** classes also store a registry of all **HasProperties** classes in as :code:`_REGISTRY`. If a subclass re-declares :code:`_REGISTRY`, the subsequent subclasses will be saved to this new registry. The :class:`PropertyMetaclass <properties.base.PropertyMetaclass>` contains more information about what goes into **HasProperties** class construction and validation. """ _defaults = dict() _REGISTRY = dict() def __init__(self, **kwargs): # Set the keyword arguments with change notifications self._getting_validated = True self._validation_error_tuples = [] self._non_validation_error = None try: for key, val in iteritems(kwargs): prop = self._props.get(key, None) if not prop and not hasattr(self, key): raise AttributeError( "Keyword input '{}' is not a known property or " "attribute of {}".format(key, self.__class__.__name__) ) if isinstance(prop, basic.DynamicProperty): raise AttributeError( "Dynamic property '{}' of {} cannot be set on " "init".format(key, self.__class__.__name__) ) try: setattr(self, key, val) except utils.ValidationError as val_err: self._validation_error_tuples += val_err.error_tuples except GENERIC_ERRORS as err: if not self._non_validation_error: self._non_validation_error = err continue if self._validation_error_tuples: self._error_hook(self._validation_error_tuples) msgs = ['Initialization failed:'] msgs += [val.message for val in self._validation_error_tuples] raise utils.ValidationError( message='\n- '.join(msgs), _error_tuples=self._validation_error_tuples, ) elif self._non_validation_error: raise self._non_validation_error #pylint: disable=raising-bad-type finally: self._getting_validated = False self._validation_error_tuples = None self._non_validation_error = None def _get(self, name): return self._backend.get(name, None) def _notify(self, change): listeners = handlers._get_listeners(self, change) for listener in listeners: if isinstance(listener.func, string_types): getattr(self, listener.func)(change) else: listener.func(self, change) def _set(self, name, value): prev = self._backend.get(name, utils.undefined) change = dict(name=name, previous=prev, value=value, mode='validate') self._notify(change) if change['value'] is utils.undefined: self._backend.pop(name, None) else: self._backend[name] = change['value'] if prev is utils.undefined and change['value'] is utils.undefined: pass elif( prev is utils.undefined or change['value'] is utils.undefined or not self._props[name].equal(prev, change['value']) ): change.update(name=name, previous=prev, mode='observe_change') self._notify(change) change.update(name=name, previous=prev, mode='observe_set') self._notify(change) def _reset(self, name=None): """Revert specified property to default value If no property is specified, all properties are returned to default. """ if name is None: for key in self._props: if isinstance(self._props[key], basic.Property): self._reset(key) return if name not in self._props: raise AttributeError("Input name '{}' is not a known " "property or attribute".format(name)) if not isinstance(self._props[name], basic.Property): raise AttributeError("Cannot reset GettableProperty " "'{}'".format(name)) if name in self._defaults: val = self._defaults[name] else: val = self._props[name].default if callable(val): val = val() setattr(self, name, val)
[docs] def validate(self): """Call all registered class validator methods These are all methods decorated with :code:`@properties.validator`. Validator methods are expected to raise a ValidationError if they fail. """ if getattr(self, '_getting_validated', False): return True self._getting_validated = True self._validation_error_tuples = [] self._non_validation_error = None try: for val in itervalues(self._class_validators): try: if isinstance(val.func, string_types): valid = getattr(self, val.func)() else: valid = val.func(self) if valid is False: raise utils.ValidationError( 'Validation failed', None, None, self ) except utils.ValidationError as val_err: self._validation_error_tuples += val_err.error_tuples except GENERIC_ERRORS as err: if not self._non_validation_error: self._non_validation_error = err if self._validation_error_tuples: self._error_hook(self._validation_error_tuples) msgs = ['Validation failed:'] msgs += [val.message for val in self._validation_error_tuples] raise utils.ValidationError( message='\n- '.join(msgs), _error_tuples=self._validation_error_tuples, ) elif self._non_validation_error: raise self._non_validation_error #pylint: disable=raising-bad-type return True finally: self._getting_validated = False self._validation_error_tuples = None self._non_validation_error = None
@handlers.validator def _validate_props(self): """Assert that all the properties are valid on validate()""" for key, prop in iteritems(self._props): try: value = self._get(key) err_msg = 'Invalid value for property {}: {}'.format(key, value) if value is not None: change = dict(name=key, previous=value, value=value, mode='validate') self._notify(change) if not prop.equal(value, change['value']): raise utils.ValidationError(err_msg, 'invalid', prop.name, self) if not prop.assert_valid(self): raise utils.ValidationError(err_msg, 'invalid', prop.name, self) except utils.ValidationError as val_err: if getattr(self, '_validation_error_tuples', None) is not None: self._validation_error_tuples += val_err.error_tuples else: raise return True def _error_hook(self, error_tuples): """Method called when property validation fails This allows HasProperties classes to customize how the validation error is handled. """
[docs] def serialize(self, include_class=True, save_dynamic=False, **kwargs): """Serializes a **HasProperties** instance to dictionary This uses the Property serializers to serialize all Property values to a JSON-compatible dictionary. Properties that are undefined are not included. If the **HasProperties** instance contains a reference to itself, a :code:`properties.SelfReferenceError` will be raised. **Parameters**: * **include_class** - If True (the default), the name of the class will also be saved to the serialized dictionary under key :code:`'__class__'` * **save_dynamic** - If True, dynamic properties are written to the serialized dict (default: False). * Any other keyword arguments will be passed through to the Property serializers. """ if getattr(self, '_getting_serialized', False): raise utils.SelfReferenceError('Object contains unserializable ' 'self reference') self._getting_serialized = True try: kwargs.update({ 'include_class': include_class, 'save_dynamic': save_dynamic }) if save_dynamic: prop_source = self._props else: prop_source = self._backend data = ( (key, self._props[key].serialize(getattr(self, key), **kwargs)) for key in prop_source ) json_dict = {k: v for k, v in data if v is not None} if include_class: json_dict.update({'__class__': self.__class__.__name__}) return json_dict finally: self._getting_serialized = False
[docs] @classmethod def deserialize(cls, value, trusted=False, strict=False, #pylint: disable=too-many-locals assert_valid=False, **kwargs): """Creates **HasProperties** instance from serialized dictionary This uses the Property deserializers to deserialize all JSON-compatible dictionary values into their corresponding Property values on a new instance of a **HasProperties** class. Extra keys in the dictionary that do not correspond to Properties will be ignored. **Parameters**: * **value** - Dictionary to deserialize new instance from. * **trusted** - If True (and if the input dictionary has :code:`'__class__'` keyword and this class is in the registry), the new **HasProperties** class will come from the dictionary. If False (the default), only the **HasProperties** class this method is called on will be constructed. * **strict** - Requires :code:`'__class__'`, if present on the input dictionary, to match the deserialized instance's class. Also disallows unused properties in the input dictionary. Default is False. * **assert_valid** - Require deserialized instance to be valid. Default is False. * Any other keyword arguments will be passed through to the Property deserializers. """ if not isinstance(value, dict): raise ValueError( 'HasProperties class {} must deserialize from dictionary, ' 'not input of type {}'.format( cls.__name__, value.__class__.__name__ ) ) output_cls = cls._deserialize_class( input_cls_name=value.get('__class__'), trusted=trusted, strict=strict, ) instance = kwargs.pop('_instance', None) if instance is not None and not isinstance(instance, output_cls): raise ValueError( 'Input _instance must be of class {}, not {}'.format( output_cls.__name__, instance.__class__.__name__ ) ) state, unused = utils.filter_props(output_cls, value, True) unused.pop('__class__', None) if unused and strict: raise ValueError( 'Unused properties during deserialization: {}'.format( ', '.join(unused) ) ) kwargs.update({'trusted': trusted, 'strict': strict}) newstate = {} for key, val in iteritems(state): newstate[key] = output_cls._props[key].deserialize(val, **kwargs) mutable, immutable = utils.filter_props(output_cls, newstate, False) with handlers.listeners_disabled(): if instance is None: instance = output_cls(**mutable) else: for key, val in iteritems(mutable): setattr(instance, key, val) for key, val in iteritems(immutable): valid_val = output_cls._props[key].validate(instance, val) instance._backend[key] = valid_val if assert_valid and not instance.validate(): raise utils.ValidationError('Deserialized instance is not valid') return instance
@classmethod def _deserialize_class(cls, input_cls_name, trusted, strict): """Returns the HasProperties class to use for deserialization""" if not input_cls_name or input_cls_name == cls.__name__: return cls if trusted and input_cls_name in cls._REGISTRY: return cls._REGISTRY[input_cls_name] if strict: raise ValueError( 'Class name {} from deserialization input dictionary does ' 'not match input class {}'.format(input_cls_name, cls.__name__) ) return cls def equal(self, other): """Determine if two **HasProperties** instances are equivalent Equivalence is determined by checking if all Property values on two instances are equal, using :code:`Property.equal`. """ warn('HasProperties.equal has been deprecated in favor of ' 'properties.equal and will be removed in the next release', FutureWarning) return equal(self, other)
[docs]def equal(value_a, value_b): """Determine if two **HasProperties** instances are equivalent Equivalence is determined by checking if (1) the two instances are the same class and (2) all Property values on two instances are equal, using :code:`Property.equal`. If the two values are the same HasProperties instance (eg. :code:`value_a is value_b`) this method returns True. Finally, if either value is not a HasProperties instance, equality is simply checked with ==. .. note:: HasProperties objects with recursive self-references will not evaluate to equal, even if their property values and structure are equivalent. """ if ( not isinstance(value_a, HasProperties) or not isinstance(value_b, HasProperties) ): return value_a == value_b if getattr(value_a, '_testing_equality', False): return False value_a._testing_equality = True #pylint: disable=protected-access try: if value_a is value_b: return True if value_a.__class__ is not value_b.__class__: return False for prop in itervalues(value_a._props): prop_a = getattr(value_a, prop.name) prop_b = getattr(value_b, prop.name) if prop_a is None and prop_b is None: continue if ( prop_a is not None and prop_b is not None and prop.equal(prop_a, prop_b) ): continue return False return True finally: value_a._testing_equality = False #pylint: disable=protected-access
[docs]def copy(value, **kwargs): """Return a copy of a **HasProperties** instance A copy is produced by serializing the HasProperties instance then deserializing it to a new instance. Therefore, if any properties cannot be serialized/deserialized, :code:`copy` will fail. Any keyword arguments will be passed through to both :code:`serialize` and :code:`deserialize`. """ if not isinstance(value, HasProperties): raise ValueError('properties.copy may only be used to copy' 'HasProperties instances') kwargs.update({'include_class': kwargs.get('include_class', True)}) kwargs.update({'trusted': kwargs.get('trusted', True)}) return value.__class__.deserialize(value.serialize(**kwargs), **kwargs)