Python sqlalchemy.orm 模块,class_mapper() 实例源码

我们从 Python 开源项目中，提取了以下 42 个代码示例，用于说明如何使用 sqlalchemy.orm.class_mapper()。

项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def is_class_sa_mapped(klass):
    """
    Tell whether C{klass} (or, for an instance, its type) has a mapper.

    Anything accepted by one of the C{class_checkers} callables is reported
    as not mapped without asking SQLAlchemy at all.

    @rtype: C{bool}
    """
    target = klass if isinstance(klass, type) else type(klass)

    if any(checker(target) for checker in class_checkers):
        return False

    try:
        class_mapper(target)
    except UnmappedInstanceError:
        return False
    else:
        return True
项目:flask-alcohol    作者:natfoster82    | 项目源码 | 文件源码
def _auto_update(self, mapper=None):
        """Set every mapped column according to its ``set_by`` API rule.

        :param mapper: mapper whose columns are walked; when ``None`` the
            mapper of this instance's class is looked up.

        For each column the ``set_by`` entry of ``_get_api_info`` decides
        the source: ``'json'`` reads ``g.fields``, ``'url'`` reads
        ``request.view_args`` and ``'server'`` delegates entirely to
        ``_set_field_value``. Any other value leaves the column untouched.
        """
        model = self.__class__
        if mapper is None:
            mapper = class_mapper(model)
        for column in mapper.columns:
            # this order is the listed order except that hybrid properties go first
            field = column.name
            origin = model._get_api_info(field)['set_by']
            if origin == 'json' and field in g.fields:
                self._set_field_value(field, g.fields.get(field))
            elif origin == 'url' and field in request.view_args:
                self._set_field_value(field, request.view_args.get(field))
            elif origin == 'server':
                # important not to let it try to set it from json
                # requires a @setter decorated function
                self._set_field_value(name=field, try_auto=False)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def __unicode__(self): # pragma: no cover
        """Return ``<ClassName col=value ...>`` listing every table column.

        A column whose value cannot be read or formatted is rendered with
        the raised exception in place of the value, so building the
        representation itself does not fail on a single bad attribute.
        """
        # Collect fragments in a list instead of rebinding a local called
        # ``repr``, which would shadow the builtin.
        parts = [u'<%s' % self.__class__.__name__]
        table = class_mapper(self.__class__).mapped_table
        for col in table.c:
            try:
                parts.append(u' %s=%s' % (col.name, getattr(self, col.name)))
            # ``except X as e`` parses on Python 2.6+ and 3; the old
            # ``except X, e`` spelling is a syntax error on Python 3.
            except Exception as inst:
                parts.append(u' %s=%s' % (col.name, inst))

        parts.append(u'>')
        return u''.join(parts)
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def getCustomProperties(self):
        """Collect mapper property keys and refresh the exclusion flags.

        Stores the mapper of ``self.klass``, merges ``EXCLUDED_ATTRS`` into
        ``exclude_attrs``, records every mapper property key in
        ``self.properties`` and registers those keys as both encodable and
        decodable. Finally notes whether ``KEY_ATTR`` and ``LAZY_ATTR``
        are excluded.
        """
        self.mapper = class_mapper(self.klass)
        self.exclude_attrs.update(self.EXCLUDED_ATTRS)

        self.properties = []
        self.properties.extend(
            mapped_prop.key for mapped_prop in self.mapper.iterate_properties)

        self.encodable_properties.update(self.properties)
        self.decodable_properties.update(self.properties)

        excluded = self.exclude_attrs
        self.exclude_sa_key = self.KEY_ATTR in excluded
        self.exclude_sa_lazy = self.LAZY_ATTR in excluded
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def as_dict(self):
        """Return an ``OrderedDict`` of column name -> value for this object.

        Columns are visited in the order of the mapped table. Any
        ``datetime.date`` value (which includes ``datetime.datetime``) is
        converted with ``str()``; everything else is stored unchanged.
        """
        table = orm.class_mapper(self.__class__).mapped_table
        serialized = OrderedDict()
        for column in table.c:
            key = column.name
            value = getattr(self, key)
            if isinstance(value, datetime.date):
                value = str(value)
            # NOTE(review): datetime.datetime subclasses datetime.date, so a
            # datetime has already been turned into a str above and this
            # branch never fires. Kept as-is because switching to
            # isoformat() would change the serialized output.
            if isinstance(value, datetime.datetime):
                value = value.isoformat()
            serialized[key] = value
        return serialized
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def __unicode__(self):
        """Return ``<ClassName col=value ...>`` listing every table column.

        A column whose value cannot be read or formatted is rendered with
        the raised exception in place of the value, so building the
        representation itself does not fail on a single bad attribute.
        """
        # Collect fragments in a list instead of rebinding a local called
        # ``repr``, which would shadow the builtin.
        parts = [u'<%s' % self.__class__.__name__]
        table = orm.class_mapper(self.__class__).mapped_table
        for col in table.c:
            try:
                parts.append(u' %s=%s' % (col.name, getattr(self, col.name)))
            # ``except X as e`` parses on Python 2.6+ and 3; the old
            # ``except X, e`` spelling is a syntax error on Python 3.
            except Exception as inst:
                parts.append(u' %s=%s' % (col.name, inst))

        parts.append(u'>')
        return u''.join(parts)
项目:threatdetectionservice    作者:flyballlabs    | 项目源码 | 文件源码
def serialize(model):
    """Return a dict mapping each mapped column key of ``model`` to its value."""
    mapper = class_mapper(model.__class__)
    # one entry per mapped column, read straight off the instance
    return {column.key: getattr(model, column.key) for column in mapper.columns}
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _model_attrs(model):
    """List the column attribute keys of ``model``, plus public aliases.

    When the private keys ``_cidr`` or ``_deallocated`` are present, the
    corresponding names ``cidr`` / ``deallocated`` are appended as well.
    """
    attr_names = [attr.key for attr in class_mapper(model).column_attrs]
    for private_key, public_key in (("_cidr", "cidr"),
                                    ("_deallocated", "deallocated")):
        if private_key in attr_names:
            attr_names.append(public_key)
    return attr_names
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:flask-boilerplate    作者:ItEngine    | 项目源码 | 文件源码
def serialize(self, model):
        """
        Build a plain dictionary from a model so it can be dumped to JSON.

        Keys are the mapped column keys of the model's class; values are
        read from the instance attributes of the same name.
        """
        mapper = class_mapper(model.__class__)
        return {column.key: getattr(model, column.key) for column in mapper.columns}
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def set(self, state, dict_, initiator,
            passive=attributes.PASSIVE_OFF,
            check_old=None,
            pop=False):
        """Assign ``initiator`` and mirror it into the id/discriminator keys.

        ``dict_[self.key]`` always receives ``initiator``. With ``None`` the
        id entries and the discriminator entry of ``parent_token`` are
        cleared. Otherwise they are filled from the initiator's identity
        key and its class name. ``state``, ``passive``, ``check_old`` and
        ``pop`` are accepted but not used here.
        """
        # Set us on the state.
        dict_[self.key] = initiator

        if initiator is None:
            # Nullify relationship args
            token = self.parent_token
            for id_attr in token.id:
                dict_[id_attr.key] = None
            dict_[token.discriminator.key] = None
            return

        # Get the primary key of the initiator and ensure we
        # can support this assignment.
        initiator_cls = type(initiator)
        identity = class_mapper(initiator_cls).identity_key_from_instance(initiator)
        pk = identity[1]

        # Set the identifier and the discriminator.
        discriminator = six.text_type(initiator_cls.__name__)

        token = self.parent_token
        for position, id_attr in enumerate(token.id):
            dict_[id_attr.key] = pk[position]
        dict_[token.discriminator.key] = discriminator
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def fetch_mappers():
    """Return the mappers of every public attribute of ``_all`` that has one.

    Attributes whose mapper lookup fails are reported on stdout and
    skipped. The one exception is SQLAlchemy's "One or more mappers failed
    to initialize" ``InvalidRequestError``, which is re-raised because it
    signals a broken model rather than a non-mapped attribute.
    """
    # lets find all the mappers in our model
    mappers = []
    for attr in dir(_all):
        if attr.startswith('_'):
            continue
        try:
            cls = getattr(_all, attr)
            mappers.append(class_mapper(cls))
        except Exception as ex:
            if isinstance(ex, sqlalchemy.exc.InvalidRequestError):
                # ``ex.message`` only exists on Python 2 exceptions; fall
                # back to ``str(ex)`` so the check also works on Python 3.
                message = getattr(ex, 'message', None)
                if message is None:
                    message = str(ex)
                if message.startswith("One or more mappers failed to initialize"):
                    raise
            # Function-call form prints the same text on Python 2 and 3.
            print("ignoring %s" % attr)

    return mappers
项目:m    作者:comynli    | 项目源码 | 文件源码
def dictify(self, found=None, relationships=True, exclude=None):
        """Convert this object, and optionally its relations, to a dict.

        :param found: set of relationship properties already expanded; it
            is shared with the recursive calls so each relationship is
            followed at most once.
        :param relationships: when falsy only column values are returned.
        :param exclude: names to leave out; an entry ``"rel.col"`` is
            passed down to relation ``rel`` as ``"col"``.
        :return: dict of column key -> value (datetimes as ISO strings),
            plus one entry per expanded, non-``None`` relationship.
        """
        found = set() if found is None else found
        exclude = set() if exclude is None else exclude

        mapper = class_mapper(self.__class__)
        column_keys = [col.key for col in mapper.columns]

        result = {}
        for key in column_keys:
            if key in exclude:
                continue
            value = getattr(self, key)
            result[key] = value.isoformat() if isinstance(value, datetime) else value

        if not relationships:
            return result

        for name, relation in mapper.relationships.items():
            if relation in found:
                continue
            found.add(relation)
            related_obj = getattr(self, name)
            if related_obj is None:
                continue
            prefix = '{}.'.format(name)
            # strip the "<relation>." prefix for the nested call
            nested_exclude = {
                entry.replace(prefix, '', 1)
                for entry in exclude
                if entry.startswith(prefix)
            }
            if relation.uselist:
                result[name] = [
                    item.dictify(found=found, exclude=nested_exclude)
                    for item in related_obj
                ]
            else:
                result[name] = related_obj.dictify(found=found, exclude=nested_exclude)
        return result
项目:m    作者:comynli    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with
        ``self.sa.session`` (passed as-is, not called), to
        ``type.query_class``. Returns ``None`` when the mapper is falsy or
        ``UnmappedClassError`` is raised anywhere along the way.
        """
        try:
            mapped = class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session)
        except UnmappedClassError:
            return None
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:ray    作者:felipevolpone    | 项目源码 | 文件源码
def columns(cls):
        """Return the mapped column keys of ``cls`` as a sorted list."""
        column_keys = [column.key for column in class_mapper(cls).columns]
        column_keys.sort()
        return column_keys
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:ngx_status    作者:YoYoAdorkable    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Descriptor hook returning a query object for the owner class.

        Looks up the mapper of ``type`` and hands it, together with a
        session obtained from ``self.sa.session()``, to ``type.query_class``.
        Returns ``None`` when the mapper is falsy or ``UnmappedClassError``
        is raised anywhere along the way.
        """
        try:
            mapped = orm.class_mapper(type)
            if not mapped:
                return None
            return type.query_class(mapped, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def before_commit(self, session):
        """Update revision-table rows for objects touched in this session.

        After flushing, this does nothing unless the session carries both
        ``_object_cache`` and ``revision`` and ``revisioning_disabled`` is
        not set. For every new/changed/deleted object that declares a
        ``__revision_class__`` it:

        1. resets ``current`` from ``'1'`` to ``'0'`` on the revision rows
           with the object's id, using a direct UPDATE statement;
        2. loads the revision rows with that id whose
           ``expired_timestamp`` is 9999-12-31 and, per row, sets
           ``revision_timestamp`` (row belongs to the session's revision)
           or ``expired_timestamp`` (any other row) to the session
           revision's timestamp.

        :param session: the session about to commit.
        """
        session.flush()
        try:
            obj_cache = session._object_cache
            revision = session.revision
        except AttributeError:
            # Session was not set up with an object cache / revision.
            return
        if getattr(session, 'revisioning_disabled', False):
            return
        new = obj_cache['new']
        changed = obj_cache['changed']
        deleted = obj_cache['deleted']
        # The ``|`` union implies these are sets (or set-like).
        for obj in new | changed | deleted:
            if not hasattr(obj, '__revision_class__'):
                continue
            revision_cls = obj.__revision_class__
            revision_table = orm.class_mapper(revision_cls).mapped_table
            ## when a normal active transaction happens

            ### this is an sql statement as we do not want it in object cache
            session.execute(
                revision_table.update().where(
                    and_(revision_table.c.id == obj.id,
                         revision_table.c.current == '1')
                ).values(current='0')
            )

            # NOTE(review): 9999-12-31 presumably marks "not yet expired"
            # rows -- confirm against the code that writes revision rows.
            q = session.query(revision_cls)
            q = q.filter_by(expired_timestamp=datetime.datetime(9999, 12, 31), id=obj.id)
            results = q.all()
            for rev_obj in results:
                values = {}
                if rev_obj.revision_id == revision.id:
                    # Row written under the session's own revision.
                    values['revision_timestamp'] = revision.timestamp
                else:
                    # Row from another revision: stamp its expiry.
                    values['expired_timestamp'] = revision.timestamp
                session.execute(
                    revision_table.update().where(
                        and_(revision_table.c.id == rev_obj.id,
                             revision_table.c.revision_id == rev_obj.revision_id)
                    ).values(**values)
                )
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def resource_dict_save(res_dict, context):
    """Create or update a ``model.Resource`` from ``res_dict`` and add it to the session.

    :param res_dict: mapping of resource fields. Keys matching a column of
        the resource table are set as attributes; all other keys are
        collected into ``obj.extras``. List values and the keys ``extras``,
        ``revision_timestamp`` and ``tracking_summary`` are ignored.
    :param context: dict providing ``"model"`` and ``"session"``.
    :return: the new or updated resource object, already added to the
        session with state ``u'active'``.

    ``obj.url_changed`` is set on an existing resource when its ``url``
    differs from the submitted one, or when a datetime-valued
    ``last_modified`` differs from the submitted ISO string.
    """
    model = context["model"]
    session = context["session"]

    # Named ``resource_id`` so the builtin ``id`` is not shadowed.
    resource_id = res_dict.get("id")
    obj = None
    if resource_id:
        obj = session.query(model.Resource).get(resource_id)
    new = not obj
    if new:
        obj = model.Resource()

    table = class_mapper(model.Resource).mapped_table
    fields = set(field.name for field in table.c)

    # Resource extras not submitted will be removed from the existing extras
    # dict
    new_extras = {}
    # ``items()`` exists on both Python 2 and 3; ``iteritems()`` is
    # Python 2 only.
    for key, value in res_dict.items():
        if isinstance(value, list):
            continue
        if key in ('extras', 'revision_timestamp', 'tracking_summary'):
            continue
        if key in fields:
            if isinstance(getattr(obj, key), datetime.datetime):
                # An unchanged datetime arrives as its ISO string: nothing
                # to write for this key.
                if getattr(obj, key).isoformat() == value:
                    continue
                if key == 'last_modified' and not new:
                    obj.url_changed = True
            if key == 'url' and not new and obj.url != value:
                obj.url_changed = True
            setattr(obj, key, value)
        else:
            # resources save extras directly onto the object, instead
            # of in a separate extras field like packages and groups
            new_extras[key] = value

    obj.state = u'active'
    obj.extras = new_extras

    session.add(obj)
    return obj
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def table_dictize(obj, context, **kw):
    '''Represent any model object (or result row) as a plain dict.

    Field names come from ``obj.keys()`` for a ``RowProxy`` and from the
    mapped table's columns otherwise. The fields ``current``,
    ``expired_timestamp``, ``expired_id`` and ``continuity_id`` are left
    out. ``None``, dict, int and list values are copied unchanged,
    datetimes become ISO strings and anything else goes through
    ``unicode()``. ``kw`` entries are merged in last, and
    ``context['metadata_modified']`` is raised to the dict's
    ``revision_timestamp`` when that is larger.
    '''
    skipped = ('current', 'expired_timestamp', 'expired_id', 'continuity_id')

    model = context["model"]
    # Not used below; the attribute lookup is retained from the original.
    session = model.Session

    if isinstance(obj, RowProxy):
        fields = obj.keys()
    else:
        table = class_mapper(obj.__class__).mapped_table
        fields = [column.name for column in table.c]

    result_dict = {}
    for name in fields:
        if name in skipped:
            continue
        value = getattr(obj, name)
        if value is None or isinstance(value, (dict, int, list)):
            result_dict[name] = value
        elif isinstance(value, datetime.datetime):
            result_dict[name] = value.isoformat()
        else:
            result_dict[name] = unicode(value)

    result_dict.update(kw)

    ##HACK For optimisation to get metadata_modified created faster.

    latest = max(result_dict.get('revision_timestamp', ''),
                 context.get('metadata_modified', ''))
    context['metadata_modified'] = latest

    return result_dict
项目:lib9    作者:Jumpscale    | 项目源码 | 文件源码
def object_to_dict(obj, found=None, path="", notfollow=[], depth=0, maxdepth=1, subprimkeyonly=True):
    """Serialize a mapped object, following relationships up to ``maxdepth``.

    :param obj: mapped instance to convert.
    :param found: set of relationship properties already followed; shared
        with recursive calls so each relationship is expanded only once.
    :param path: slash-separated relationship path leading to ``obj``.
    :param notfollow: relationship paths that must not be expanded (only
        read, never modified here).
    :param depth: nesting level of the caller; 0 for the top-level call.
    :param maxdepth: relationships are followed only while the current
        level does not exceed this value.
    :param subprimkeyonly: when true, related objects (level > 1) are
        represented by their primary key value alone.
    :return: a dict of column key -> value, with one extra entry per
        expanded relationship; or a bare primary key value for a related
        object when ``subprimkeyonly`` is set.
    """
    depth += 1
    if found is None:
        found = set()
    mapper = class_mapper(obj.__class__)

    # if subprimkeyonly then we only return the primary key and not the other
    # properties of related objects
    if depth > 1 and subprimkeyonly:
        key_value = None
        for column in mapper.columns:
            if column.primary_key:
                # With a composite key the last primary key column wins,
                # as before.
                key_value = getattr(obj, column.key)
        # Return straight away: the result is a scalar, so the
        # relationship loop below (which does ``out[name] = ...``) used to
        # raise TypeError here whenever maxdepth allowed another level.
        return key_value

    out = {column.key: getattr(obj, column.key) for column in mapper.columns}
    for name, relation in list(mapper.relationships.items()):
        child_path = path + "/%s" % name
        if child_path in notfollow or depth > maxdepth:
            continue
        if relation in found:
            continue
        found.add(relation)
        related_obj = getattr(obj, name)
        if related_obj is None:
            continue
        if relation.uselist:
            out[name] = [
                object_to_dict(child, found, child_path, notfollow, depth,
                               maxdepth, subprimkeyonly)
                for child in related_obj
            ]
        else:
            out[name] = object_to_dict(
                related_obj, found, child_path, notfollow, depth, maxdepth,
                subprimkeyonly)
    return out