Python sqlalchemy.ext.declarative 模块,DeclarativeMeta() 实例源码
我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用sqlalchemy.ext.declarative.DeclarativeMeta()。
def default(self, obj):
fields = {}
if isinstance(obj.__class__, DeclarativeMeta):
# for sqlalchemy orm
for key, value in obj.__dict__.items():
if key.startswith('_'):
continue
if(isinstance(value, list)):
inner_jsonObj = []
for _row in value:
inner_jsonObj.append(
self.default(_row))
fields[key] = inner_jsonObj
else:
fields[key] = self.type_convert(value)
else:
# for raw sql
for field in obj.keys():
fields[field] = self.type_convert(
str(getattr(obj, field)))
return fields
def decode(obj):
if obj and isinstance(obj.__class__, DeclarativeMeta):
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and not x.endswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
if isinstance(data, datetime.datetime):
fields[field] = data.timestamp()
elif isinstance(data, datetime.date):
fields[field] = data.isoformat()
elif isinstance(data, datetime.timedelta):
fields[field] = (datetime.datetime.min + data).time().isoformat()
elif isinstance(data, int) or isinstance(data, float) or isinstance(data, str):
fields[field] = data
elif isinstance(data, enum.Enum):
fields[field] = data.value
elif isinstance(data.__class__, DeclarativeMeta):
fields[field] = AlchemyEncoder.decode(data)
elif isinstance(data, list):
fields[field] = [AlchemyEncoder.decode(d) for d in data]
return fields
else:
return obj
def _sqlalchemy_model():
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.orm import sessionmaker
return [sessionmaker, DeclarativeMeta]
def test_declarative_base():
"""Test declarative_base()."""
class MetaClass(DeclarativeMeta):
pass
metadata = MetaData()
Model = declarative_base(metadata=metadata, metaclass=MetaClass)
assert Model.__bases__[0] is ModelBase
assert Model.metadata is metadata
assert Model.metaclass is MetaClass
def build_history_class(
cls: declarative.DeclarativeMeta,
prop: T_PROPS,
schema: str = None) -> nine.Type[TemporalProperty]:
"""build a sqlalchemy model for given prop"""
class_name = "%s%s_%s" % (cls.__name__, 'History', prop.key)
table = build_history_table(cls, prop, schema)
base_classes = (
TemporalProperty,
declarative.declarative_base(metadata=table.metadata),
)
class_attrs = {
'__table__': table,
'entity': orm.relationship(
lambda: cls,
backref=orm.backref('%s_history' % prop.key, lazy='dynamic')
),
}
if isinstance(prop, orm.RelationshipProperty):
class_attrs[prop.key] = orm.relationship(
prop.argument,
lazy='noload')
model = type(class_name, base_classes, class_attrs)
return model
def compile_zdb_score(element, compiler, **kw):
clauses = list(element.clauses)
if len(clauses) != 1:
raise ValueError("Incorrect params")
c = clauses[0]
if isinstance(c, BindParameter) and isinstance(c.value, DeclarativeMeta):
return "zdb_score(\'%s\', %s.ctid)" % (c.value.__tablename__, c.value.__tablename__)
raise ValueError("Incorrect param")
def destroy(session, data, model_class=None, synchronize_session=False):
"""Delete bulk `data`.
The `data` argument can be any of the following:
- Single instance of `model_class`
- List of `model_class` instances
- Primary key value (single value or ``tuple`` of values for composite
keys)
- List of primary key values.
- Dict containing primary key(s) mapping
- List of dicts with primary key(s) mappings
If a non-`model_class` instances are passed in, then `model_class` is
required to know which table to delete from.
Args:
session (Session): SQLAlchemy session object.
data (mixed): Data to delete from database.
synchronize_session (bool|str): Argument passed to
``Query.delete``.
Returns:
int: Number of deleted records.
"""
if not isinstance(data, list):
data = [data]
valid_model_class = isinstance(model_class, DeclarativeMeta)
mapped_data = defaultdict(list)
for idx, item in enumerate(data):
item_class = type(item)
if not isinstance(item_class, DeclarativeMeta) and valid_model_class:
class_ = model_class
else:
class_ = item_class
if not isinstance(class_, DeclarativeMeta):
raise TypeError('Type of value given to destory() function is not '
'a valid SQLALchemy declarative class and/or '
'model class argument is not valid. '
'Item with index {0} and with value "{1}" is '
'an instance of "{2}" and model class is {3}.'
.format(idx, item, type(item), model_class))
mapped_data[class_].append(item)
delete_count = 0
with transaction(session):
for model_class, data in iteritems(mapped_data):
count = (session.query(model_class)
.filter(primary_key_filter(data, model_class))
.options(orm.lazyload('*'))
.delete(synchronize_session=synchronize_session))
delete_count += count
return delete_count
def build_history_table(
cls: declarative.DeclarativeMeta,
prop: T_PROPS,
schema: str = None) -> sa.Table:
"""build a sql alchemy table for given prop"""
if isinstance(prop, orm.RelationshipProperty):
columns = [util.copy_column(column) for column in prop.local_columns]
else:
columns = [util.copy_column(column) for column in prop.columns]
local_table = cls.__table__
table_name = util.truncate_identifier(
_generate_history_table_name(local_table, columns)
)
# Build the foreign key(s), specifically adding an index since we may use
# a casted foreign key in our constraints. See _exclusion_in_uuid
entity_foreign_keys = list(util.foreign_key_to(local_table, index=True))
entity_constraints = [
_exclusion_in(fk.type, fk.key)
for fk in entity_foreign_keys
]
constraints = [
sa.Index(
util.truncate_identifier('%s_effective_idx' % table_name),
'effective',
postgresql_using='gist'
),
sap.ExcludeConstraint(
*itertools.chain(entity_constraints, [('vclock', '&&')]),
name=util.truncate_identifier('%s_excl_vclock' % table_name)
),
sap.ExcludeConstraint(
*itertools.chain(entity_constraints, [('effective', '&&')]),
name=util.truncate_identifier('%s_excl_effective' % table_name)
),
]
return sa.Table(
table_name,
local_table.metadata,
sa.Column('id',
sap.UUID(as_uuid=True),
default=uuid.uuid4,
primary_key=True),
sa.Column('effective',
sap.TSTZRANGE,
default=util.effective_now,
nullable=False),
sa.Column('vclock', sap.INT4RANGE, nullable=False),
*itertools.chain(entity_foreign_keys, columns, constraints),
schema=schema or local_table.schema,
keep_existing=True
) # memoization ftw
def compile_zdb_query(element, compiler, **kw):
query = []
tables = set()
format_args = []
limit = ""
for i, c in enumerate(element.clauses):
add_to_query = True
if isinstance(c, BinaryExpression):
tables.add(c.left.table.name)
elif isinstance(c, BindParameter):
if isinstance(c.value, str):
pass
elif isinstance(c.value, DeclarativeMeta):
if i > 0:
raise ValueError("Table can be specified only as first param")
tables.add(c.value.__tablename__)
add_to_query = False
elif isinstance(c, BooleanClauseList):
pass
elif isinstance(c, Column):
pass
else:
raise ValueError("Unsupported filter")
if add_to_query:
query.append(compile_clause(c, compiler, tables, format_args))
if not tables:
raise ValueError("No filters passed")
elif len(tables) > 1:
raise ValueError("Different tables passed")
else:
table = tables.pop()
if hasattr(element, "_zdb_order_by") and isinstance(element._zdb_order_by, (UnaryExpression, ZdbScore)):
limit = compile_limit(order_by=element._zdb_order_by,
offset=element._zdb_offset,
limit=element._zdb_limit)
sql = "zdb(\'%s\', ctid) ==> " % table
if format_args and isinstance(format_args, list):
sql += "\'%sformat(\'%s\', %s)\'" % (
limit,
" and ".join(query),
", ".join(format_args)
)
else:
sql += "\'%s%s\'" % (
limit,
" and ".join(query))
return sql