我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用sqlalchemy.orm.object_session()。
def clock_tick(self, activity: TemporalActivityMixin = None): warnings.warn("clock_tick is going away in 0.5.0", PendingDeprecationWarning) """Increments vclock by 1 with changes scoped to the session""" if self.temporal_options.activity_cls is not None and activity is None: raise ValueError("activity is missing on edit") from None session = orm.object_session(self) with session.no_autoflush: yield self if session.is_modified(self): self.vclock += 1 new_clock_tick = self.temporal_options.clock_model( entity=self, tick=self.vclock) if activity is not None: new_clock_tick.activity = activity session.add(new_clock_tick)
def _set_loaned_flag(self, flag): """Sets loaned flag in current collection and all associated movies. :param flag: if True and there are loaned movies in the collection already, exception will be raised (whole collection cannot be loaned if one of the movies is not available). Please also remember to create new entry in loans table later (no need to do that if flag is False). """ session = object_session(self) if flag: # loaning whole collection loaned_movies = session.execute(select([tables.movies.columns.movie_id])\ .where(and_(tables.movies.columns.collection_id == self.collection_id,\ tables.movies.columns.loaned == True))).fetchall() if loaned_movies: log.error('cannot loan it, collection contains loaned movie(s): %s', loaned_movies) raise Exception('loaned movies in the collection already') self._loaned = flag update_query = update(tables.movies, tables.movies.columns.collection_id == self.collection_id) session.execute(update_query, params={'loaned': flag})
def _composition_listener(attr): ''' Attach event listeners to an InstrumentedAttribute to trigger formula parsing on load and on change. ''' @event.listens_for(attr, "set") def _update_composition_from_formula(target, value, oldvalue, initiator): session = object_session(target) if value == "" or value is None: return # If the object hasn't been associated with a session, # we can't look up bricks. if session is None: return target.composition = _formula_parser(value, session) @event.listens_for(attr.class_, "load") def _update_composition_on_load(target, context): value = getattr(target, attr.prop.key) if value == "" or value is None: return session = object_session(target) target.composition = _formula_parser(value, session)
def composition(self): composition = CompositionType() session = object_session(self) for fragment_composition_relation in self._fragment_composition: symbol = fragment_composition_relation.brick_string isotope, element = re.search(r"(?P<isotope>\d*)?(?P<element>\S+)", symbol).groups() count = fragment_composition_relation.count if count is not None: count = int(count) else: count = 1 if isotope != "": name = _make_isotope_string(element, isotope) else: name = element is_brick = session.query(Brick).filter(Brick.brick == name).first() if is_brick is None: composition[str(name)] += count else: composition += is_brick.composition * count return composition
def covers(self, item): return object_session(self).scalar( select([func.ST_Covers(Place.geom, item['location'])]).where(Place.place_id == self.place_id))
def _record(mapper, target, operation): s = orm.object_session(target) if isinstance(s, _SignallingSession): pk = tuple(mapper.primary_key_from_instance(target)) s._model_changes[pk] = (target, operation)
def _record(mapper, target, operation): s = orm.object_session(target) if isinstance(s, SignallingSession) and s.emit_modification_signals: pk = tuple(mapper.primary_key_from_instance(target)) s._model_changes[pk] = (target, operation)
def dataset_count(self): from annotator.models import Dataset return object_session(self).query(Dataset).filter( Dataset.problem == self ).count()
def _get_loan_history(self): where = [tables.loans.c.movie_id == self.movie_id] if self.collection_id is not None: where.append(tables.loans.c.collection_id == self.collection_id) if self.volume_id is not None: where.append(tables.loans.c.volume_id == self.volume_id) return object_session(self).query(Loan).filter(\ and_(tables.loans.c.return_date != None, or_(*where))).all()
def _get_loan_details(self): where = [tables.loans.c.movie_id == self.movie_id] if self.collection_id is not None: where.append(tables.loans.c.collection_id == self.collection_id) if self.volume_id is not None: where.append(tables.loans.c.volume_id == self.volume_id) return object_session(self).query(Loan).filter(and_(tables.loans.c.return_date == None, or_(*where))).first()
def _set_loaned_flag(self, flag): """Sets loaned flag in current volume and all associated movies. :param flag: if True, remember to create new entry in loans table later! """ session = object_session(self) self._loaned = flag update_query = update(tables.movies, tables.movies.columns.volume_id == self.volume_id) session.execute(update_query, params={'loaned': flag})
def set_name(self, name): self.name = name unique_name = 'ARXIVID_{}_USERNAME_{}_TAGNAME_{}'.format(self.paper.arxiv_id, self.creator.name, name) if object_session(self).query(Tag).filter_by(unique_name=unique_name).first() is not None: raise ValueError("Tag with unique_name='{}' already exists.".format(unique_name)) self.unique_name = unique_name
def __init__(self, serv, token=None, session=None, connection=None): self.serv = serv if not token and not connection: raise TypeError("Need to provide a token or a connection") self._connection = connection if connection: token = connection.token session = object_session(connection) self.token = token self.session = session self.log = self.serv.log
def best_scores(self): """ Return the best score for each feet """ return [self.best_score(feet[0]) for feet in (object_session(self) .query(song_stat.SongStat.feet) .filter_by(song_id=self.id) .group_by(song_stat.SongStat.feet) .order_by(asc(song_stat.SongStat.feet))) ]
def highscores(self, feet=None): """ Return the highest score for this song """ query = (object_session(self) .query(song_stat.SongStat) .filter_by(song_id=self.id) .order_by(desc(song_stat.SongStat.score))) if feet: query = query.filter_by(feet=feet) return query
def room_privilege(self, room_id): if room_id in self._room_level: return self._room_level[room_id] priv = Privilege.find(room_id, self.id, object_session(self)) self._room_level[room_id] = priv return priv
def set_level(self, room_id, level): session = object_session(self) if not room_id: self.rank = level session.commit() return level priv = Privilege.find_or_update(room_id, self.id, session, level=level) self._room_level[room_id] = priv return level
def last_game(self): return (object_session(self) .query(game.Game) .filter_by(room_id=self.id) .order_by(desc(game.Game.created_at)) .first())
def nb_players(self): """ Get the numer of users in the room """ if self._nb_players: return self._nb_players self._nb_players = ( object_session(self) .query(func.count(user.User.id)) .filter_by(online=True, room_id=self.id) .scalar()) return self._nb_players
def online_users(self): """ Get the onlines user in this room """ return (object_session(self) .query(user.User) .filter_by(online=True, room_id=self.id))
def moderators(self): """ Get all the moderators in this room """ return (object_session(self) .query(user.User) .join(privilege.Privilege) .filter( privilege.Privilege.room_id == self.id, privilege.Privilege.level >= 5 ))
def default_roles(self): if self.username == 'default': return [] if self._default_roles_cache is not None: return self._default_roles_cache self._default_roles_cache = orm.object_session(self).\ query(Role).join(User, Role.users).\ filter(User.username == 'default').\ all() return self._default_roles_cache
def loan_to(self, person, whole_collection=False): """ Loans movie, all other movies from the same volume and optionally movie's collection. :param person: Person instance or person_id. :param whole_collection=False: if True, will loan all movies from the same collection. """ if self.loaned: log.warn('movie already loaned: %s', self.loan_details) return False session = object_session(self) if hasattr(person, 'person_id'): person = person.person_id elif not isinstance(person, int): raise ValueError("expecting int or Person instance, got %s instead" % type(person)) loan = Loan() loan.person_id = person loan.movie = self if whole_collection: if self.collection: # next line will update the status of all other movies in collection # or raise and OtherMovieAlreadyLoanedError self.collection.loaned = True loan.collection_id = self.collection_id else: log.debug('this movie doesn\'t have collection assigned, whole_collection param ignored') if self.volume: # next line will update the status of all other movies in volume self.volume.loaned = True loan.volume_id = self.volume_id self.loaned = True session.add(loan) return True