我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用sqlalchemy.orm.load_only()。
def cache_provider(self, provider): if self.memcached_provider == provider: return self.memcached_provider = provider self.memcache = {} logger.info("Caching archived indicators for provider {}".format(provider)) q = self.handle().query(Indicator) \ .filter_by(provider=provider) \ .order_by(asc(Indicator.lasttime), asc(Indicator.firsttime), asc(Indicator.created_at)) q = q.options(load_only("indicator", "group", "tags", "firsttime", "lasttime")) q = q.yield_per(1000) for i in q: self.memcache[i.indicator] = (i.group, i.tags, i.firsttime, i.lasttime) logger.info("Cached provider {} in memory, {} objects".format(provider, len(self.memcache)))
def wbgetentities(self, debug=False): sub = (session.query(Item.item_id) .join(ItemTag) .group_by(Item.item_id) .subquery()) q = (self.items.filter(Item.item_id == sub.c.item_id) .options(load_only(Item.qid))) if debug: print('running wbgetentities query') print(q) print(q.count()) items = {i.qid: i for i in q} if debug: print('{} items'.format(len(items))) for qid, entity in wikidata.entity_iter(items.keys(), debug=debug): if debug: print(qid) items[qid].entity = entity
def space(): overpass_dir = app.config['OVERPASS_DIR'] files = [{'file': f, 'size': f.stat().st_size} for f in os.scandir(overpass_dir) if '_' not in f.name and f.name.endswith('.xml')] files.sort(key=lambda f: f['size'], reverse=True) files = files[:200] place_lookup = {int(f['file'].name[:-4]): f for f in files} # q = Place.query.outerjoin(Changeset).filter(Place.place_id.in_(place_lookup.keys())).add_columns(func.count(Changeset.id)) q = (database.session.query(Place, func.count(Changeset.id)) .outerjoin(Changeset) .filter(Place.place_id.in_(place_lookup.keys())) .options(load_only(Place.place_id, Place.display_name, Place.state)) .group_by(Place.place_id, Place.display_name, Place.state)) for place, num in q: place_id = place.place_id place_lookup[place_id]['place'] = place place_lookup[place_id]['changesets'] = num return render_template('space.html', files=files)
def handleGetPlayerInfoById(self, data): playerId = data[4] if playerId.isdigit(): playerId = int(playerId) if playerId == self.user.Id: playerSwid = self.user.Swid username = self.user.Username else: playerModel = self.session.query(User).options(load_only("Username", "Swid")) \ .filter_by(Id=playerId).first() if playerModel is None: return username = playerModel.Username playerSwid = playerModel.Swid self.sendXt("pbi", playerSwid, playerId, username)
def getTimeSeries(self, simId, paramName, expList): ''' Retrieve all timeseries rows for the given simId and paramName. :param simId: simulation ID :param paramName: name of output parameter :param expList: (list of str) the names of the experiments to select results for. :return: list of TimeSeries tuples or None ''' cols = ['seriesId', 'runId', 'outputId', 'units'] + self.yearCols() with self.sessionScope() as session: query = session.query(TimeSeries, Experiment.expName).options(load_only(*cols)). \ join(Run).filter_by(simId=simId).filter_by(status='succeeded'). \ join(Experiment).filter(Experiment.expName.in_(expList)). \ join(Output).filter_by(name=paramName) rslt = query.all() return rslt # Single instance of the class. Use 'getDatabase' constructor # to ensure that this instance is returned if already created.
def get_all_board_names() -> List[str]: local_cached = local_cache.get('all_board_names') if local_cached: return local_cached all_board_names_cached = cache.get(cache_key('all_board_names')) if all_board_names_cached is not None: # No need to map a list of strings res = all_board_names_cached else: with session() as s: q = s.query(BoardOrmModel).options(load_only('name')).order_by(BoardOrmModel.name) # No mapping here either res = list(map(lambda i: i.name, q.all())) s.commit() cache.set(cache_key('all_board_names'), res) local_cache.set('all_board_names', res) return res
def get_top_existing(limit=39): cols = [Place.place_id, Place.display_name, Place.area, Place.state, Place.candidate_count, Place.item_count] c = func.count(Changeset.place_id) q = (Place.query.filter(Place.state.in_(['ready', 'refresh']), Place.area > 0, Place.candidate_count > 4) .options(load_only(*cols)) .outerjoin(Changeset) .group_by(*cols) .having(c == 0) .order_by((Place.item_count / Place.area).desc())) return q[:limit]
def load_only(self, *columns): return self.options(load_only(*columns))
def get_all_user_ids(self, session=None) -> list: from sqlalchemy.orm import load_only users = session.query(Users).options(load_only('uuid')).all() return [user.uuid for user in users]
def list_breakfasts(): """List all breakfasts currently in the database""" breakfasts = Breakfast.query.with_hint(Breakfast, "WITH (NOLOCK)").options(load_only("id")).all() return jsonify(breakfasts=[{"id": x.id} for x in breakfasts])
def list_ingredients(): """List all ingredients currently in the database""" ingredients = Ingredient.query.with_hint(Ingredient, "WITH (NOLOCK)").options(load_only("id")).all() return jsonify(ingredients=[{"id": x.id} for x in ingredients])
def getseg(session, segment_id, cols2load=None): '''Returns the segment identified by id `segment_id` by querying the session and, if not found, by querying the database :param cols2load: if the db has to be queried, specifies a list of columns to load. E.g.: `cols2load=[Segment.id]` ''' seg = session.query(Segment).get(segment_id) if seg: return seg query = session.query(Segment).filter(Segment.id == segment_id) if cols2load: query = query.options(load_only(*cols2load)) return query.first()
def toggle_class_id(session, segment_id, class_id): segment = session.query(Segment).options(load_only(Segment.id)).first() clz = segment.classes if any(c.id == class_id for c in clz): segment.del_classes(class_id) else: segment.add_classes(class_id) # re-query the database to be sure: return {'classes': get_classes(session), 'segment_class_ids': get_classes(session, segment_id)}
def is_url_parsed(self, *url_tuple): """Test whether this URL is parsed. Before parse the article, test whether this URL is parsed before. If so, ignore and return True. Otherwise, need to parse, return False. Parameters ---------- url_tuple : tuple Tuple (url_id, created_at, canonical) Returns ------- bool Whether this URL is parsed or not. """ url_id, created_at, canonical = url_tuple marticle = self.session.query(Article)\ .filter_by(canonical_url=canonical)\ .options(load_only('id', 'date_captured'))\ .one_or_none() # marticle exists # update date_captured of this article if marticle is not None: # marticle.date_captured > article['date_captured'] if marticle.date_captured > created_at: marticle.date_captured = created_at try: self.session.query(Url).filter_by(id=url_id).update( dict(article_id=marticle.id, status_code=U_WP_SUCCESS)) self.session.commit() return True except SQLAlchemyError as e: logger.error('Error when update url: %s', e) raise else: return False
def _set_all_board_names_cache(s): all_board_names_q = s.query(BoardOrmModel).options(load_only('name')).order_by(BoardOrmModel.name) cache.set(cache_key('all_board_names'), list(map(lambda i: i.name, all_board_names_q.all())))
def get_or_create_murl(session, data, platform_id=None, load_cols=['id', 'date_published']): """Get a URL record from table, if not exists, insert it. The function is similar as `get_or_create_m`. The difference is how to handle duplications. In this function, try to update 'date_published' if `data['date_published']` is not None. Parameters ---------- session : object An instance of SQLAlchemy Session. data : dict A dict that contains necessary attributes of the ORM objects. platform_id : int The id of a platform object. load_cols : list The columns to be loaded. Default is ['id', 'date_published']. Returns ------- object A URL model object. """ q = session.query(Url).filter_by(raw=data['raw'])\ .options(load_only(*load_cols)) murl = q.one_or_none() if murl: # update date_published if possible if murl.date_published is None and \ data.get('date_published', None) is not None: murl.date_published = data['date_published'] session.commit() else: murl = Url(**data) session.add(murl) try: session.commit() except IntegrityError as e: logger.warning('Concurrecy conflict %s', e) session.rollback() murl = q.one() if platform_id is not None: append_platform_to_url(session, murl.id, platform_id) return murl
def process_item(self, item, spider): """Main function that process Article item (third phase).""" url_id = item.pop('url_id') marticle = spider.session.query(Article)\ .filter_by(canonical_url=item['canonical_url'])\ .options(load_only('id', 'date_published', 'date_captured'))\ .one_or_none() # marticle exists # update datetime of this article if marticle: # marticle.date_published is None if marticle.date_published is None: marticle.date_published = item['date_published'] # marticle.date_captured > article['date_captured'] if marticle.date_captured > item['date_captured']: marticle.date_captured = item['date_captured'] # new article else: if item['site_id'] is not None: item['group_id'] = self.get_or_next_group_id( spider.session, item['title'], item['site_id']) # create this article marticle = Article(**item) spider.session.add(marticle) # commit changes try: spider.session.commit() except SQLAlchemyError as e: logger.error('Error when inserting article: %s', e) spider.session.rollback() raise DropItem() try: # finally update url status and article_id spider.session.query(Url).filter_by(id=url_id)\ .update(dict(status_code=U_WP_SUCCESS, article_id=marticle.id)) spider.session.commit() except SQLAlchemyError as e: logger.error('Error when update url: %s', e) spider.session.rollback() raise DropItem() item['url_id'] = url_id item['id'] = marticle.id return item