# copyright 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr -- mailto:contact@logilab.fr
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Mappers with "jsonschema.relation" regid."""
import abc
import iso8601
from six import (
add_metaclass,
text_type,
)
from logilab.common.decorators import cachedproperty
from yams import BadSchemaDefinition, ValidationError
from yams.constraints import StaticVocabularyConstraint
from cubicweb import (
Binary,
Unauthorized,
neg_role,
_,
)
from cubicweb.predicates import (
match_kwargs,
)
from cubicweb_jsonschema import (
CREATION_ROLE,
EDITION_ROLE,
VIEW_ROLE,
orm_rtype,
)
from .base import (
JSONSchemaMapper,
JSONSchemaDeserializer,
JSONSchemaSerializer,
add_descriptive_metadata,
add_links,
object_schema,
ProtectedDict,
)
from .predicates import (
_etype_from_context,
yams_component_target,
yams_final_rtype,
yams_match,
)
__all__ = [
'BaseRelationMapper',
'AttributeMapper',
'StringMapper',
'FloatMapper',
'IntMapper',
'BooleanMapper',
'PasswordMapper',
'DateMapper',
'DatetimeMapper',
'BytesMapper',
'InlinedRelationMapper',
'InlinedRelationItemMapper',
'RelationMapper',
'ETypeRelationItemMapper',
'EntityRelationItemMapper',
]
[docs]class BaseRelationMapper(JSONSchemaMapper):
"""Base abstract class to fill the gap between a yams relation and it's json
schema mapping.
They should be selected depending on the relation (`etype`, `rtype`, `role`
and optionaly `target_types`).
"""
__regid__ = 'jsonschema.relation'
__select__ = match_kwargs('rtype', 'role')
__abstract__ = True
@property
def etype(self):
"""The entity type bound to this mapper."""
return _etype_from_context(self.cw_extra_kwargs)
def __init__(self, _cw, **kwargs):
#: relation type name
self.rtype = kwargs.pop('rtype')
#: role of `etype` in relation
self.role = kwargs.pop('role')
#: possible target types of the relation (empty for attribute relations)
self.target_types = []
rschema = _cw.vreg.schema[self.rtype]
target_types = kwargs.pop('target_types', None)
for target_eschema in sorted(rschema.targets(role=self.role)):
if target_eschema.final:
continue
target_type = target_eschema.type
if target_types is not None and target_type not in target_types:
continue
self.target_types.append(target_type)
super(BaseRelationMapper, self).__init__(_cw, **kwargs)
def __repr__(self):
return ('<{0.__class__.__name__} etype={0.etype} rtype={0.rtype} '
'role={0.role} target_types={0.target_types}>'.format(self))
@cachedproperty
def description(self):
ntargets = len(self.target_types)
if ntargets > 1:
return None
elif ntargets == 1:
targettype = self.target_types[0]
else:
targettype = None
eschema = self._cw.vreg.schema[self.etype]
rdef = eschema.rdef(self.rtype, role=self.role,
targettype=targettype)
if rdef.description:
return self._cw._(rdef.description)
@cachedproperty
def title(self):
if self.role == 'object':
return self._cw._(self.rtype + '_object')
return self._cw._(self.rtype)
@cachedproperty
def orm_rtype(self):
return orm_rtype(self.rtype, self.role)
def links(self, schema_role=None, **kwargs):
"""Yield Link appobjects matching regid and selection context of this
mapper if schema_role is None.
"""
if schema_role is not None:
return
for link in super(BaseRelationMapper, self).links(
schema_role=schema_role, **kwargs):
yield link
[docs]@JSONSchemaDeserializer.register
@JSONSchemaSerializer.register
@add_metaclass(abc.ABCMeta)
class AttributeMapper(BaseRelationMapper):
"""Abstract base abstract class to map attribute relation.
Concrete class should implement the `json_type` property.
"""
__abstract__ = True
__select__ = yams_final_rtype() & match_kwargs('etype')
@abc.abstractproperty
def json_type(self):
"""JSON primitive type (e.g. "string", "number", etc.)"""
#: JSON Schema "format" keyword for semantic validation.
format = None
@property
def attr(self):
"""Relation definition for bound attribute."""
return self._cw.vreg.schema[self.etype].rdef(self.rtype)
def _constraint_mapper(self, cstr):
mapper = self._cw.vreg['mappers'].select_or_none(
'jsonschema.constraint', self._cw.vreg, self._cw._,
self.etype, self.rtype, cstr)
if mapper is not None:
return mapper
elif not isinstance(cstr, StaticVocabularyConstraint):
self.warning('ignored %s on %s', cstr.type(), self.attr)
return None
def required(self, schema_role):
"""Return True if mapped property is *required*."""
if schema_role in (CREATION_ROLE, EDITION_ROLE):
return self.attr.cardinality[0] == '1'
return False
@add_links
def schema_and_definitions(self, schema_role=None):
schema = {
'type': self.json_type,
}
if self.format is not None:
schema['format'] = self.format
if schema_role in (CREATION_ROLE, EDITION_ROLE):
vocabulary_constraint = next(
(cstr for cstr in self.attr.constraints
if isinstance(cstr, StaticVocabularyConstraint)), None)
if vocabulary_constraint:
# In presence of a vocabulary constraint, we wrap the field
# into a oneOf field with a single-value 'enum', ignoring
# other constraints.
oneof_items = []
for v in sorted(vocabulary_constraint.vocabulary()):
item_schema = schema.copy()
item_schema.update({
'enum': [v],
'title': self._cw._(v),
})
oneof_items.append(item_schema)
schema = {
'oneOf': oneof_items,
}
else:
for constraint in self.attr.constraints:
cstr_mapper = self._constraint_mapper(constraint)
if cstr_mapper is not None:
schema.update(cstr_mapper.json_schema(schema_role))
if self.attr.default is not None:
schema['default'] = self.attr.default
return add_descriptive_metadata(schema, self), None
def values(self, entity, instance):
"""Return a dictionary holding deserialized value for mapped attribute.
If mapped attribute is absent from `instance` and `entity` is not
None, the default value for attribute is returned as value of the
dictionnary.
"""
try:
value = instance.pop(self.rtype)
except KeyError:
if entity is None:
return {}
rschema = entity.e_schema.rdef(self.rtype, self.role)
value = rschema.default
else:
value = self._type(value)
if entity is not None and getattr(entity, self.orm_rtype) == value:
# Do not trigger update, if the value has not changed.
# This is useful for PUT requests for which all fields should
# be present even if they are not changed. By skipping
# unchanged value, we avoid possible security check that would
# be meaningless since the value is not changed.
return {}
return {self.orm_rtype: value}
[docs] @staticmethod
def _type(json_value):
"""Return properly typed value for use within a cubicweb's entity from
given JSON value.
Nothing to do by default.
"""
return json_value
def serialize(self, entity):
value = getattr(entity, self.orm_rtype)
if value is not None:
return self._value(value)
[docs] @staticmethod
def _value(value):
"""Return the serializable value from attribute `value`."""
return value
[docs]class StringMapper(AttributeMapper):
"""Attribute mapper for Yams' String type."""
__select__ = yams_match(target_types='String')
#:
json_type = 'string'
_type = text_type
[docs]class FloatMapper(AttributeMapper):
"""Attribute mapper for Yams' Float type."""
__select__ = yams_match(target_types='Float')
#:
json_type = 'number'
[docs]class IntMapper(AttributeMapper):
"""Attribute mapper for Yams' Int and BigInt types."""
__select__ = yams_match(target_types={'Int', 'BigInt'})
#:
json_type = 'integer'
[docs]class BooleanMapper(AttributeMapper):
"""Attribute mapper for Yams' Boolean type."""
__select__ = yams_match(target_types='Boolean')
#:
json_type = 'boolean'
[docs]class PasswordMapper(AttributeMapper):
"""Attribute mapper for Yams' Password type."""
__select__ = yams_match(target_types='Password')
#:
json_type = 'string'
#:
format = 'password'
def required(self, schema_role):
"""Possibly return True unless in *edition* role."""
if schema_role == EDITION_ROLE:
return False
return super(PasswordMapper, self).required(schema_role)
def values(self, entity, instance):
password_changed = self.orm_rtype in instance
values = super(PasswordMapper, self).values(entity, instance)
if entity is not None and not password_changed:
# We don't want the Password value to be reset if it has not
# changed.
del values[self.orm_rtype]
return values
def serialize(self, entity):
return None
[docs] @staticmethod
def _type(json_value):
"""Return an encoded string suitable for Password type."""
return json_value.encode('utf-8')
[docs]class DateMapper(StringMapper):
"""Attribute mapper for Yams' Date type."""
__select__ = yams_match(target_types=('Date'))
#:
format = 'date'
@staticmethod
def _type(value):
"""Return a datetime object parsed from ISO8601 `value` string."""
return iso8601.parse_date(value)
[docs]class DatetimeMapper(DateMapper):
"""Attribute mapper for Yams' (TZ)Datetime type."""
__select__ = yams_match(target_types=('Datetime', 'TZDatetime'))
#:
format = 'date-time'
[docs]class BytesMapper(StringMapper):
"""Attribute mapper for Yams' Bytes type."""
__select__ = yams_match(target_types='Bytes')
[docs] @staticmethod
def _type(value):
"""Return a Binary containing `value`."""
return Binary(value.encode('utf-8'))
[docs] @staticmethod
def _value(value):
"""Return a unicode string from Binary `value`."""
return value.getvalue().decode('utf-8')
class _RelationMapper(BaseRelationMapper):
"""Abstract class for true relation (as opposed to attribute) mapper.
"""
__abstract__ = True
__select__ = ~yams_final_rtype()
@add_links
def schema_and_definitions(self, schema_role=None):
item_mapper = self.select_mapper(
'jsonschema.item',
rtype=self.rtype, role=self.role, target_types=self.target_types,
**self.cw_extra_kwargs)
items_schema, defs = item_mapper.schema_and_definitions(schema_role)
schema = {
'type': 'array',
'items': items_schema,
}
if schema_role in (CREATION_ROLE, EDITION_ROLE):
cardinality = self._cardinality()
if cardinality in '+1':
schema['minItems'] = 1
if cardinality in '?1':
schema['maxItems'] = 1
return add_descriptive_metadata(schema, self), defs
def required(self, schema_role):
if schema_role in (CREATION_ROLE, EDITION_ROLE):
return self._cardinality() in '1+'
return False
def _cardinality(self):
"""Return role-cardinality if schema definition is consistent and
raise BadSchemaDefinition otherwise.
"""
rschema = self._cw.vreg.schema[self.rtype]
cardinality = None
for target_type in self.target_types:
rdef = rschema.role_rdef(self.etype, target_type, self.role)
card = rdef.role_cardinality(self.role)
if cardinality is None:
cardinality = card
elif card != cardinality:
raise BadSchemaDefinition(
'inconsistent {} cardinalities within {} relation '
'definitions'.format(self.role, self.rtype))
return cardinality
[docs]@JSONSchemaDeserializer.register
@JSONSchemaSerializer.register
class InlinedRelationMapper(_RelationMapper):
"""Map relation as 'inlined', i.e. the target of the relation is
created/edited along with its original entity.
"""
__select__ = (_RelationMapper.__select__
& yams_component_target()
& (match_kwargs('etype') | match_kwargs('entity')))
def values(self, entity, instance):
# Would require knownledge of the target type from "instance",
# but the generated JSON schema does not expose this yet.
assert len(self.target_types) == 1, \
'cannot handle multiple target types yet: {}'.format(
self.target_types)
target_type = self.target_types[0]
try:
values = instance.pop(self.rtype)
except KeyError:
if entity is None:
return {}
values = []
if not isinstance(values, list):
raise ValidationError(entity,
{self.rtype: _('value should be an array')})
if entity is not None:
# if entity already exists, delete entities related through
# this mapped relation
for linked_entity in getattr(entity, self.orm_rtype):
if linked_entity.cw_etype in self.target_types:
linked_entity.cw_delete()
target_mapper = self.select_mapper(
'jsonschema.entity', etype=target_type,
rtype=self.rtype, role=neg_role(self.role),
target_types={self.etype},
)
result = []
for subinstance in values:
subvalues = target_mapper.values(subinstance)
result.append(self._cw.create_entity(target_type, **subvalues))
return {self.orm_rtype: result}
def serialize(self, related_entities):
def serialize(entity):
mapper = self.select_mapper(
'jsonschema.entity', entity=entity,
rtype=self.rtype, role=neg_role(self.role),
target_types={self.etype},
)
return mapper.serialize()
return [serialize(related) for related in related_entities]
class InlinedRelationItemMapper(BaseRelationMapper):
"""Mapper for items of 'inlined' relation."""
__regid__ = 'jsonschema.item'
__select__ = InlinedRelationMapper.__select__
def schema_and_definitions(self, schema_role=None):
items, definitions = [], ProtectedDict()
for target_type in self.target_types:
mapper = self.select_mapper(
'jsonschema.entity', etype=target_type,
rtype=self.rtype, role=neg_role(self.role),
target_types={self.etype},
)
subschema, defs = mapper.schema_and_definitions(schema_role)
items.append({
'$ref': '#/definitions/{}'.format(target_type),
})
definitions[target_type] = subschema
if defs:
definitions.update(defs)
nitems = len(items)
if nitems == 0:
return None, None
elif nitems == 1:
return items[0], definitions
else:
schema = {
'oneOf': items,
}
return schema, definitions
[docs]@JSONSchemaDeserializer.register
@JSONSchemaSerializer.register
class RelationMapper(_RelationMapper):
"""Map relation as 'generic', i.e. the target of the relation may be
selected in preexisting possible targets.
"""
__select__ = (_RelationMapper.__select__
& ~yams_component_target()
& (match_kwargs('etype') | match_kwargs('entity')))
def values(self, entity, instance):
try:
values = instance.pop(self.rtype)
except KeyError:
return {}
if entity is not None:
entity.cw_set(**{self.orm_rtype: None})
try:
values = [int(x['id']) for x in values]
except (TypeError, ValueError):
msg = _('value should be an array of string-encoded integers')
raise ValidationError(entity, {self.rtype: msg})
return {self.orm_rtype: values}
def serialize(self, related_entities):
item_mapper = self.select_mapper(
'jsonschema.item',
rtype=self.rtype, role=self.role, **self.cw_extra_kwargs)
return [item_mapper.serialize(entity) for entity in related_entities]
class ETypeRelationItemMapper(BaseRelationMapper):
"""Map items of a 'generic' relation for an non-existant entity."""
__regid__ = 'jsonschema.item'
__select__ = (_RelationMapper.__select__
& ~yams_component_target()
& match_kwargs('etype', 'rtype', 'role'))
def relation_targets(self, schema_role):
entity = self._cw.vreg['etypes'].etype_class(self.etype)(self._cw)
potential_targets = []
for target_type in self.target_types:
try:
potential_targets.extend(entity.unrelated(
self.rtype, target_type, self.role).entities())
except Unauthorized:
continue
return potential_targets
@add_links
def schema_and_definitions(self, schema_role=None):
ids = [
{
'type': 'string',
'enum': [text_type(target.eid)],
'title': target.dc_title(),
}
for target in self.relation_targets(schema_role)
]
if not ids:
return False, None
properties = {
'id': {
'oneOf': ids,
},
}
required = []
if schema_role in (CREATION_ROLE, EDITION_ROLE):
required.append('id')
return object_schema(properties, required), None
# XXX copy of CollectionItemMapper's method
def links(self, schema_role=None, **kwargs):
kwargs['anchor'] = '#'
return super(ETypeRelationItemMapper, self).links(
schema_role=schema_role, **kwargs)
@JSONSchemaSerializer.register
class EntityRelationItemMapper(ETypeRelationItemMapper):
"""Map items of a 'generic' relation for an existing entity."""
__select__ = (_RelationMapper.__select__
& ~yams_component_target()
& match_kwargs('entity', 'rtype', 'role'))
@property
def entity(self):
return self.cw_extra_kwargs['entity']
def relation_targets(self, schema_role):
if schema_role == VIEW_ROLE:
return self.entity.related(
self.rtype, self.role,
targettypes=tuple(self.target_types)).entities()
if schema_role == CREATION_ROLE:
assert len(self.target_types) == 1, \
'cannot handle multiple target types in {} for {}'.format(
self, schema_role)
targettype = self.target_types[0]
return self.entity.unrelated(
self.rtype, targettype, role=self.role,
).entities()
return super(EntityRelationItemMapper, self).relation_targets(
schema_role)
def serialize(self, entity):
return {'id': str(entity.eid)}