#!/usr/bin/python
# -*- encoding: utf-8 -*-
"""
"""
from collections import namedtuple
import inspect
import logging
from sqlalchemy import inspect as sqinspect
from sqlalchemy.exc import NoInspectionAvailable
from sqlalchemy.ext.associationproxy import AssociationProxy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import ColumnProperty, Query
from sqlalchemy.orm.attributes import QueryableAttribute
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.interfaces import MapperProperty
from sqlalchemy.orm.properties import RelationshipProperty
from sqlalchemy.sql.operators import is_ordering_modifier
from sqlalchemy.util import memoized_property
__author__ = 'Martin Martimeo <martin@martimeo.de>'
__date__ = '27.04.13 - 00:14'
def _filter(instance, condition) -> dict:
    """
    Collect the properties of ``instance`` that satisfy ``condition``

    The candidates come from the first source that is usable:
    ``instance.iterate_properties`` if that attribute exists, otherwise the
    ``all_orm_descriptors`` of the sqlalchemy inspection of ``instance``, and
    finally the plain members reported by :func:`inspect.getmembers` when
    sqlalchemy cannot inspect the object.

    :param instance: Object whose properties are examined
    :param condition: Callable receiving a candidate and returning a truthy
                      value for those that should be kept
    :return: Mapping of each kept candidate's ``key`` to the candidate itself
    :rtype: dict
    """

    def collect(candidates):
        # The condition is checked before ``.key`` is read, so candidates
        # without a ``key`` attribute are fine as long as they are rejected.
        selected = {}
        for candidate in candidates:
            if condition(candidate):
                selected[candidate.key] = candidate
        return selected

    # Use iterate_properties when available
    if hasattr(instance, 'iterate_properties'):
        return collect(instance.iterate_properties)

    try:
        # Try sqlalchemy inspection
        descriptors = sqinspect(instance).all_orm_descriptors.items()
        return collect(descriptor for _, descriptor in descriptors)
    except NoInspectionAvailable:
        # Fall back to plain python inspection
        return collect(member for _, member in inspect.getmembers(instance))
def _is_ordering_expression(expression):
    """
    Tell whether an expression is an ordering clause

    The expression counts as ordering when its ``operator`` or, failing that,
    its ``modifier`` attribute is accepted by ``is_ordering_modifier``.
    Attributes the expression does not have are skipped.

    :param expression: Expression to examine
    :return: True for an ordering clause, False otherwise
    """
    for attribute in ('operator', 'modifier'):
        if hasattr(expression, attribute) and is_ordering_modifier(getattr(expression, attribute)):
            return True
    return False
class ModelWrapper(object):
    """
    Wrapper around sqlalchemy model for having some easier functions

    Every ``get_*`` static method inspects the object handed to it; the
    memoized property of the matching name applies it to the wrapped
    ``model``.
    """

    def __init__(self, model):
        """
        :param model: The model to wrap
        """
        self.model = model

    @property
    def __name__(self):
        """
        Name of the wrapped model
        """
        return self.model.__name__

    @property
    def __tablename__(self):
        """
        Table name of the wrapped model
        """
        return self.model.__tablename__

    @property
    def __collectionname__(self):
        """
        Collection name of the wrapped model

        Falls back to the table name (and logs a warning) when the model does
        not define ``__collectionname__``.
        """
        try:
            return self.model.__collectionname__
        except AttributeError:
            # Lazy logging arguments: the message is only built when emitted
            logging.warning("Missing collection name for %s using tablename", self.model.__name__)
            return self.model.__tablename__

    @staticmethod
    def get_primary_keys(instance) -> dict:
        """
        Returns the primary keys

        Inspired by flask-restless.helpers.primary_key_names

        :param instance: Object to inspect
        :return: Mapping of key to property for every primary key column
        """
        return _filter(instance, lambda field: isinstance(field, ColumnProperty) and field.primary_key or (
            isinstance(field, QueryableAttribute) and isinstance(field.property, ColumnProperty) and
            hasattr(field.property.columns[0], 'primary_key') and field.property.columns[0].primary_key))

    @memoized_property
    def primary_keys(self):
        """
        @see get_primary_keys
        """
        return self.get_primary_keys(self.model)

    # Expose the documentation of the static getter on the property
    primary_keys.__doc__ = get_primary_keys.__func__.__doc__

    @staticmethod
    def get_unique_keys(instance) -> dict:
        """
        Returns the unique keys

        Inspired by flask-restless.helpers.primary_key_names

        :param instance: Object to inspect
        :return: Mapping of key to property for every unique column
        """
        return _filter(instance, lambda field: isinstance(field, ColumnProperty) and field.unique or (
            isinstance(field, QueryableAttribute) and isinstance(field.property, ColumnProperty) and
            hasattr(field.property.columns[0], 'unique') and field.property.columns[0].unique))

    @memoized_property
    def unique_keys(self):
        """
        @see get_unique_keys
        """
        return self.get_unique_keys(self.model)

    unique_keys.__doc__ = get_unique_keys.__func__.__doc__

    @staticmethod
    def get_foreign_keys(instance) -> dict:
        """
        Returns the foreign keys

        Inspired by flask-restless.helpers.primary_key_names

        :param instance: Object to inspect
        :return: Mapping of key to attribute for every column attribute with
                 a non-empty ``foreign_keys``
        """
        return {field.key: field for key, field in inspect.getmembers(instance)
                if isinstance(field, QueryableAttribute)
                and isinstance(field.property, ColumnProperty)
                and field.foreign_keys}

    @memoized_property
    def foreign_keys(self):
        """
        @see get_foreign_keys
        """
        return self.get_foreign_keys(self.model)

    foreign_keys.__doc__ = get_foreign_keys.__func__.__doc__

    @staticmethod
    def get_columns(instance) -> dict:
        """
        Returns the columns objects of the model

        :param instance: Object to inspect
        :return: Mapping of key to column property / column attribute
        """
        return _filter(instance, lambda field: isinstance(field, ColumnProperty) or (
            isinstance(field, QueryableAttribute) and isinstance(field.property, ColumnProperty)))

    @memoized_property
    def columns(self):
        """
        @see get_columns
        """
        return self.get_columns(self.model)

    columns.__doc__ = get_columns.__func__.__doc__

    @staticmethod
    def get_attributes(instance) -> dict:
        """
        Returns the attributes of the model

        :param instance: Object to inspect
        :return: Mapping of key to every mapper property / queryable attribute
        """
        return _filter(instance,
                       lambda field: isinstance(field, MapperProperty) or isinstance(field, QueryableAttribute))

    @memoized_property
    def attributes(self):
        """
        @see get_attributes
        """
        return self.get_attributes(self.model)

    attributes.__doc__ = get_attributes.__func__.__doc__

    @staticmethod
    def get_relations(instance) -> dict:
        """
        Returns the relations objects of the model

        :param instance: Object to inspect
        :return: Mapping of key to relationship property / relationship attribute
        """
        return _filter(instance, lambda field: isinstance(field, RelationshipProperty) or (
            isinstance(field, QueryableAttribute) and isinstance(field.property, RelationshipProperty)))

    @memoized_property
    def relations(self):
        """
        @see get_relations
        """
        return self.get_relations(self.model)

    relations.__doc__ = get_relations.__func__.__doc__

    @staticmethod
    def _collect_descriptors(instance, descriptor_type) -> list:
        """
        List the members of ``instance`` that are of ``descriptor_type``

        Members are taken from the sqlalchemy inspection when ``instance``
        has ``iterate_properties`` and from :func:`inspect.getmembers`
        otherwise.

        :param instance: Object to inspect
        :param descriptor_type: Type the members are matched against
        :return: List of ``Proxy(key, field)`` named tuples
        """
        Proxy = namedtuple('Proxy', ['key', 'field'])

        if hasattr(instance, 'iterate_properties'):
            members = sqinspect(instance).all_orm_descriptors.items()
        else:
            members = inspect.getmembers(instance)

        return [Proxy(key, field) for key, field in members if isinstance(field, descriptor_type)]

    @staticmethod
    def get_hybrids(instance) -> list:
        """
        Returns the hybrid properties of the model

        :param instance: Object to inspect
        :return: List of ``Proxy(key, field)`` named tuples
        """
        return ModelWrapper._collect_descriptors(instance, hybrid_property)

    @memoized_property
    def hybrids(self) -> list:
        """
        @see get_hybrids
        """
        return self.get_hybrids(self.model)

    hybrids.__doc__ = get_hybrids.__func__.__doc__

    @staticmethod
    def get_proxies(instance) -> list:
        """
        Returns the proxies objects of the model

        Inspired by https://groups.google.com/forum/?fromgroups=#!topic/sqlalchemy/aDi_M4iH7d0

        :param instance: Object to inspect
        :return: List of ``Proxy(key, field)`` named tuples
        """
        return ModelWrapper._collect_descriptors(instance, AssociationProxy)

    @memoized_property
    def proxies(self):
        """
        @see get_proxies
        """
        return self.get_proxies(self.model)

    proxies.__doc__ = get_proxies.__func__.__doc__
class SessionedModelWrapper(ModelWrapper):
    """
    Wrapper around sqlalchemy model bound to a session

    Adds query helpers (one, all, update, delete, count, get) and makes the
    wrapper callable to create new instances that are added to the session.
    """

    def __init__(self, model, session):
        """
        :param model: The model to wrap
        :param session: Session used for queries and for adding new instances
        """
        super().__init__(model)
        self.session = session

    @staticmethod
    def _base_query(instance):
        """
        Returns the query the helper methods operate on

        A wrapper yields a fresh query of its model on its session; any other
        object is returned unchanged and used as the query itself (presumably
        a Query passed as ``self`` through an unbound call -- verify against
        callers).
        """
        if isinstance(instance, SessionedModelWrapper):
            return instance.session.query(instance.model)
        return instance

    @staticmethod
    def _apply_kwargs(instance: Query, **kwargs) -> Query:
        """
        Applies filters, ordering, offset and limit to a query

        :param instance: Query to refine
        :keyword filters: Expressions; ordering expressions go to order_by,
                          everything else to filter
        :keyword offset: Offset for request
        :keyword limit: Limit for request
        :param kwargs: Remaining keywords are passed to filter_by
        :return: The refined query
        """
        for expression in kwargs.pop('filters', []):
            if _is_ordering_expression(expression):
                instance = instance.order_by(expression)
            else:
                instance = instance.filter(expression)

        # Take the paging keywords out before the rest is handed to filter_by
        paging = {name: kwargs.pop(name) for name in ('offset', 'limit') if name in kwargs}

        # Offset and limit are applied after every filter, offset first
        instance = instance.filter_by(**kwargs)
        if 'offset' in paging:
            instance = instance.offset(paging['offset'])
        if 'limit' in paging:
            instance = instance.limit(paging['limit'])

        return instance

    def one(self, filters: list=(), **kwargs) -> object:
        """
        Gets one instance of the model filtered by filters

        :param filters: Filters and OrderBy Clauses
        :param kwargs: Additional filters passed to filter_by
        :keyword offset: Offset for request
        """
        query = SessionedModelWrapper._base_query(self)
        return SessionedModelWrapper._apply_kwargs(query, filters=filters, **kwargs).one()

    def all(self, filters: list=(), **kwargs) -> list:
        """
        Gets all instances of the query instance

        :param filters: Filters and OrderBy Clauses
        :param kwargs: Additional filters passed to filter_by
        :keyword limit: Limit for request
        :keyword offset: Offset for request
        """
        query = SessionedModelWrapper._base_query(self)
        return SessionedModelWrapper._apply_kwargs(query, filters=filters, **kwargs).all()

    def update(self, values: dict, filters: list=(), **kwargs) -> int:
        """
        Updates all instances of the model filtered by filters

        NOTE(review): sqlalchemy's Query.update() is documented to reject
        queries that have limit/offset applied -- confirm the paging keywords
        are usable here.

        :param values: Dictionary of values
        :param filters: Filters and OrderBy Clauses
        :param kwargs: Additional filters passed to filter_by
        :keyword limit: Limit for request
        :keyword offset: Offset for request
        """
        query = SessionedModelWrapper._base_query(self)
        return SessionedModelWrapper._apply_kwargs(query, filters=filters, **kwargs).update(values)

    def delete(self, filters: list=(), **kwargs) -> int:
        """
        Delete all instances of the model filtered by filters

        NOTE(review): sqlalchemy's Query.delete() is documented to reject
        queries that have limit/offset applied -- confirm the paging keywords
        are usable here.

        :param filters: Filters and OrderBy Clauses
        :param kwargs: Additional filters passed to filter_by
        :keyword limit: Limit for request
        :keyword offset: Offset for request
        """
        query = SessionedModelWrapper._base_query(self)
        return SessionedModelWrapper._apply_kwargs(query, filters=filters, **kwargs).delete()

    def count(self, filters: list=(), **kwargs) -> int:
        """
        Gets the instance count

        :param filters: Filters and OrderBy Clauses
        :param kwargs: Additional filters passed to filter_by
        """
        query = SessionedModelWrapper._base_query(self)
        return SessionedModelWrapper._apply_kwargs(query, filters=filters, **kwargs).count()

    def get(self, *pargs) -> object:
        """
        Gets one instance of the model based on primary_keys

        :param pargs: ident
        :raise NoResultFound: If no element has been received
        """
        query = SessionedModelWrapper._base_query(self)

        # ``pargs`` is always a tuple, so it is handed over as the identity
        rtn = query.get(pargs)

        # Only a missing row counts as "not found", not a falsy instance
        if rtn is None:
            if isinstance(self, SessionedModelWrapper):
                # The property falls back to the table name, so a model
                # without a collection name cannot mask the NoResultFound
                name = self.__collectionname__
            else:
                # No wrapper to ask for a name; use the type of the query object
                name = type(self).__name__
            raise NoResultFound("No element recieved for %s(%s)" % (name, pargs))

        return rtn

    def __call__(self, **kwargs):
        """
        Creates a new instance of the model and adds it to the session

        :param kwargs: Attributes set on the new instance
        :return: The new instance
        """
        instance = self.model()
        for key, value in kwargs.items():
            setattr(instance, key, value)
        self.session.add(instance)
        return instance