我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用sqlalchemy.orm.class_mapper()。
def is_class_sa_mapped(klass): """ @rtype: C{bool} """ if not isinstance(klass, type): klass = type(klass) for c in class_checkers: if c(klass): return False try: class_mapper(klass) except UnmappedInstanceError: return False return True
def _auto_update(self, mapper=None): cls = self.__class__ if mapper is None: mapper = class_mapper(cls) for col in mapper.columns: # this order is the listed order except that hybrid properties go first api_info = cls._get_api_info(col.name) set_by = api_info['set_by'] if set_by == 'json': if col.name in g.fields: val = g.fields.get(col.name) self._set_field_value(col.name, val) elif set_by == 'url': if col.name in request.view_args: val = request.view_args.get(col.name) self._set_field_value(col.name, val) elif set_by == 'server': # important not to let it try to set it from json # requires a @setter decorated function self._set_field_value(name=col.name, try_auto=False)
def __get__(self, obj, type): try: mapper = orm.class_mapper(type) if mapper: return type.query_class(mapper, session=self.sa.session()) except UnmappedClassError: return None
def __unicode__(self): # pragma: no cover repr = u'<%s' % self.__class__.__name__ table = class_mapper(self.__class__).mapped_table for col in table.c: try: repr += u' %s=%s' % (col.name, getattr(self, col.name)) except Exception, inst: repr += u' %s=%s' % (col.name, inst) repr += '>' return repr
def getCustomProperties(self): self.mapper = class_mapper(self.klass) self.exclude_attrs.update(self.EXCLUDED_ATTRS) self.properties = [] for prop in self.mapper.iterate_properties: self.properties.append(prop.key) self.encodable_properties.update(self.properties) self.decodable_properties.update(self.properties) self.exclude_sa_key = self.KEY_ATTR in self.exclude_attrs self.exclude_sa_lazy = self.LAZY_ATTR in self.exclude_attrs
def as_dict(self): _dict = OrderedDict() table = orm.class_mapper(self.__class__).mapped_table for col in table.c: val = getattr(self, col.name) if isinstance(val, datetime.date): val = str(val) if isinstance(val, datetime.datetime): val = val.isoformat() _dict[col.name] = val return _dict
def __unicode__(self): repr = u'<%s' % self.__class__.__name__ table = orm.class_mapper(self.__class__).mapped_table for col in table.c: try: repr += u' %s=%s' % (col.name, getattr(self, col.name)) except Exception, inst: repr += u' %s=%s' % (col.name, inst) repr += '>' return repr
def serialize(model): # get names of all columns in model columns = [c.key for c in class_mapper(model.__class__).columns] # return values in a dict return dict((c, getattr(model, c)) for c in columns)
def _model_attrs(model): model_map = class_mapper(model) model_attrs = [x.key for x in model_map.column_attrs] if "_cidr" in model_attrs: model_attrs.append("cidr") if "_deallocated" in model_attrs: model_attrs.append("deallocated") return model_attrs
def serialize(self, model): """ Transforms a model into a dictionary which can be dumped to JSON. """ columns = [c.key for c in class_mapper(model.__class__).columns] return dict((c, getattr(model, c)) for c in columns)
def set(self, state, dict_, initiator, passive=attributes.PASSIVE_OFF, check_old=None, pop=False): # Set us on the state. dict_[self.key] = initiator if initiator is None: # Nullify relationship args for id in self.parent_token.id: dict_[id.key] = None dict_[self.parent_token.discriminator.key] = None else: # Get the primary key of the initiator and ensure we # can support this assignment. class_ = type(initiator) mapper = class_mapper(class_) pk = mapper.identity_key_from_instance(initiator)[1] # Set the identifier and the discriminator. discriminator = six.text_type(class_.__name__) for index, id in enumerate(self.parent_token.id): dict_[id.key] = pk[index] dict_[self.parent_token.discriminator.key] = discriminator
def fetch_mappers(): # lets find all the mappers in our model mappers = [] for attr in dir(_all): if attr[0] == '_': continue try: cls = getattr(_all, attr) mappers.append(class_mapper(cls)) except Exception as ex: if isinstance(ex, sqlalchemy.exc.InvalidRequestError): if ex.message.startswith("One or more mappers failed to initialize"): raise print "ignoring %s" % attr return mappers
def dictify(self, found=None, relationships=True, exclude=None): if found is None: found = set() if exclude is None: exclude = set() result = {} mapper = class_mapper(self.__class__) columns = [column.key for column in mapper.columns] for column in columns: if column in exclude: continue value = getattr(self, column) if isinstance(value, datetime): result[column] = value.isoformat() else: result[column] = value if relationships: for name, relation in mapper.relationships.items(): if relation not in found: found.add(relation) related_obj = getattr(self, name) prefix = '{}.'.format(name) if related_obj is not None: _exclude = {x.replace(prefix, '', 1) for x in exclude if x.startswith(prefix)} if relation.uselist: result[name] = [o.dictify(found=found, exclude=_exclude) for o in related_obj] else: result[name] = related_obj.dictify(found=found, exclude=_exclude) return result
def __get__(self, obj, type): try: mapper = class_mapper(type) if mapper: return type.query_class(mapper, session=self.sa.session) except UnmappedClassError: return None
def columns(cls): return sorted([column.key for column in class_mapper(cls).columns])
def before_commit(self, session): session.flush() try: obj_cache = session._object_cache revision = session.revision except AttributeError: return if getattr(session, 'revisioning_disabled', False): return new = obj_cache['new'] changed = obj_cache['changed'] deleted = obj_cache['deleted'] for obj in new | changed | deleted: if not hasattr(obj, '__revision_class__'): continue revision_cls = obj.__revision_class__ revision_table = orm.class_mapper(revision_cls).mapped_table ## when a normal active transaction happens ### this is an sql statement as we do not want it in object cache session.execute( revision_table.update().where( and_(revision_table.c.id == obj.id, revision_table.c.current == '1') ).values(current='0') ) q = session.query(revision_cls) q = q.filter_by(expired_timestamp=datetime.datetime(9999, 12, 31), id=obj.id) results = q.all() for rev_obj in results: values = {} if rev_obj.revision_id == revision.id: values['revision_timestamp'] = revision.timestamp else: values['expired_timestamp'] = revision.timestamp session.execute( revision_table.update().where( and_(revision_table.c.id == rev_obj.id, revision_table.c.revision_id == rev_obj.revision_id) ).values(**values) )
def resource_dict_save(res_dict, context): model = context["model"] session = context["session"] id = res_dict.get("id") obj = None if id: obj = session.query(model.Resource).get(id) if not obj: new = True obj = model.Resource() else: new = False table = class_mapper(model.Resource).mapped_table fields = [field.name for field in table.c] # Resource extras not submitted will be removed from the existing extras # dict new_extras = {} for key, value in res_dict.iteritems(): if isinstance(value, list): continue if key in ('extras', 'revision_timestamp', 'tracking_summary'): continue if key in fields: if isinstance(getattr(obj, key), datetime.datetime): if getattr(obj, key).isoformat() == value: continue if key == 'last_modified' and not new: obj.url_changed = True if key == 'url' and not new and obj.url != value: obj.url_changed = True setattr(obj, key, value) else: # resources save extras directly onto the object, instead # of in a separate extras field like packages and groups new_extras[key] = value obj.state = u'active' obj.extras = new_extras session.add(obj) return obj
def table_dictize(obj, context, **kw): '''Get any model object and represent it as a dict''' result_dict = {} model = context["model"] session = model.Session if isinstance(obj, RowProxy): fields = obj.keys() else: ModelClass = obj.__class__ table = class_mapper(ModelClass).mapped_table fields = [field.name for field in table.c] for field in fields: name = field if name in ('current', 'expired_timestamp', 'expired_id'): continue if name == 'continuity_id': continue value = getattr(obj, name) if value is None: result_dict[name] = value elif isinstance(value, dict): result_dict[name] = value elif isinstance(value, int): result_dict[name] = value elif isinstance(value, datetime.datetime): result_dict[name] = value.isoformat() elif isinstance(value, list): result_dict[name] = value else: result_dict[name] = unicode(value) result_dict.update(kw) ##HACK For optimisation to get metadata_modified created faster. context['metadata_modified'] = max(result_dict.get('revision_timestamp', ''), context.get('metadata_modified', '')) return result_dict
def object_to_dict(obj, found=None, path="", notfollow=[], depth=0, maxdepth=1, subprimkeyonly=True): # print "%s %s"%(depth,path) depth += 1 if found is None: found = set() mapper = class_mapper(obj.__class__) # #to work with dates, but we don't use that # def get_key_value (c): # if isinstance(getattr(obj, c), datetime): # return c,getattr(obj, c).isoformat() # else: # return c,getattr(obj, c) # if subprimkeyonly then we only return the primary key and not the other # properties of related objects if depth > 1 and subprimkeyonly: for column in mapper.columns: if column.primary_key: out = getattr(obj, column.key) else: out = {} for column in mapper.columns: out[column.key] = getattr(obj, column.key) path0 = copy(path) for name, relation in list(mapper.relationships.items()): path = path0 + "/%s" % name if path in notfollow: continue if depth > maxdepth: continue if relation not in found: found.add(relation) related_obj = getattr(obj, name) if related_obj is not None: if relation.uselist: out[name] = [object_to_dict( child, found, path, notfollow, depth, maxdepth, subprimkeyonly) for child in related_obj] else: out[name] = object_to_dict( related_obj, found, path, notfollow, depth, maxdepth, subprimkeyonly) return out