Python sqlalchemy.orm 模块,subqueryload() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用sqlalchemy.orm.subqueryload()

项目:sqlalchemy-mixins    作者:absent1706    | 项目源码 | 文件源码
def with_subquery(cls, *paths):
        """
        Eagerload for simple cases where we need to just
         joined load some relations
        In strings syntax, you can split relations with dot 
         (it's SQLAlchemy feature)

        :type paths: *List[str] | *List[InstrumentedAttribute]

        Example 1:
            User.with_subquery('posts', 'posts.comments').all()

        Example 2:
            User.with_subquery(User.posts, User.comments).all()
        """
        options = [subqueryload(path) for path in paths]
        return cls.query.options(*options)
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def filter_query(self, query):
        query = super(DiagnosisListView, self).filter_query(query)

        # Load codes and groups in subqueries rather than lazy-loading (to avoid O(n) queries)
        query = query.options(subqueryload('diagnosis_codes').joinedload('code'))
        query = query.options(subqueryload('group_diagnoses').joinedload('group'))

        args = parse_args(DiagnosisRequestSerializer)

        primary_group_ids = args['primary_group']
        secondary_group_ids = args['secondary_group']

        if primary_group_ids:
            query = query.filter(diagnosis_group_type_filter(primary_group_ids, GROUP_DIAGNOSIS_TYPE.PRIMARY))

        if secondary_group_ids:
            query = query.filter(diagnosis_group_type_filter(secondary_group_ids, GROUP_DIAGNOSIS_TYPE.SECONDARY))

        return query
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def __init__(self, current_user):
        self.current_user = current_user

        # True if the query is filtering on demographics
        self.filtering_by_demographics = False

        # Pre-load group_patients
        group_patients = subqueryload('group_patients')
        group_patients.joinedload('group')
        group_patients.joinedload('created_group')
        group_patients.joinedload('created_user')

        # Pre-load patient_numbers
        patient_numbers = subqueryload('patient_numbers')
        patient_numbers.joinedload('number_group')
        patient_numbers.joinedload('source_group')
        patient_numbers.joinedload('created_user')
        patient_numbers.joinedload('modified_user')

        self.query = Patient.query\
            .options(subqueryload('patient_demographics'))\
            .options(patient_numbers)\
            .options(group_patients)\
            .options(subqueryload('ukrdc_patient'))
项目:falcon-api    作者:Opentopic    | 项目源码 | 文件源码
def get_eager_queryset(self, req, resp, db_session=None, limit=None):
        """
        Return a default query with eager options set if any relations has been requested.

        :param req: Falcon request
        :type req: falcon.request.Request

        :param resp: Falcon response
        :type resp: falcon.response.Response

        :param db_session: SQLAlchemy session
        :type db_session: sqlalchemy.orm.session.Session

        :param limit: max number of records fetched
        :type limit: int | None

        :return: a query from `object_class`
        """
        query = db_session.query(self.objects_class)
        relations = self.clean_relations(self.get_param_or_post(req, self.PARAM_RELATIONS, ''))
        if self.eager_limit is not None and (limit is None or limit <= self.eager_limit):
            return query
        if relations is None:
            query = query.options(subqueryload('*'))
        elif len(relations):
            for relation in relations:
                query = query.options(subqueryload(relation))
        return query
项目:sqlalchemy-mixins    作者:absent1706    | 项目源码 | 文件源码
def _eager_expr_from_flat_schema(flat_schema):
    """
    :type flat_schema: dict
    """
    result = []
    for path, join_method in flat_schema.items():
        if join_method == JOINED:
            result.append(joinedload(path))
        elif join_method == SUBQUERY:
            result.append(subqueryload(path))
        else:
            raise ValueError('Bad join method `{}` in `{}`'
                             .format(join_method, path))
    return result
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def use_default_user():
    with database.session_scope() as session:
        request.account = session.query(m.User).\
            filter(m.User.username == 'default').\
            options(orm.subqueryload(m.User.roles)).\
            one_or_none()
        if request.account is not None:
            _expunge_user(request.account, session)
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_with_children(self, questionnaire_response_id):
    with self.session() as session:
      query = session.query(QuestionnaireResponse) \
          .options(subqueryload(QuestionnaireResponse.answers))
      result = query.get(questionnaire_response_id)
      if result:
        ParticipantDao().validate_participant_reference(session, result)
      return result
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_with_children_in_session(self, session, obj_id):
    return (session.query(BiobankOrder)
        .options(subqueryload(BiobankOrder.identifiers), subqueryload(BiobankOrder.samples))
        .get(obj_id))
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_with_children(self, questionnaireId):
    with self.session() as session:
      query = session.query(Questionnaire).options(subqueryload(Questionnaire.concepts),
                                                   subqueryload(Questionnaire.questions))
      return query.get(questionnaireId)
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_latest_questionnaire_with_concept(self, codeId):
    """Find the questionnaire most recently modified that has the specified concept code."""
    with self.session() as session:
      return (session.query(Questionnaire)
                .join(Questionnaire.concepts)
                .filter(QuestionnaireConcept.codeId == codeId)
                .order_by(Questionnaire.lastModified.desc())
                .options(subqueryload(Questionnaire.questions))
                .first())
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_with_children_with_session(self, session, questionnaireIdAndVersion):
    query = session.query(QuestionnaireHistory) \
        .options(subqueryload(QuestionnaireHistory.concepts),
                 subqueryload(QuestionnaireHistory.questions))
    return query.get(questionnaireIdAndVersion)
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def _make_query(self, session, query_def):
    # For now, no filtering, ordering, or pagination is supported; fetch child organizations and
    # sites.
    return (session.query(HPO).options(subqueryload(HPO.organizations)
                                       .subqueryload(Organization.sites))
                              .order_by(HPO.name),
            _ORDER_BY_ENDING)
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_with_children(self, metrics_version_id):
    with self.session() as session:
      query = session.query(MetricsVersion).options(subqueryload(MetricsVersion.buckets))
      return query.get(metrics_version_id)
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_with_children(self, physical_measurements_id):
    with self.session() as session:
      query = session.query(PhysicalMeasurements) \
          .options(subqueryload(PhysicalMeasurements.measurements).subqueryload(
              Measurement.measurements)) \
          .options(subqueryload(PhysicalMeasurements.measurements).subqueryload(
              Measurement.qualifiers))
      return query.get(physical_measurements_id)
项目:overwatch-counter-picker    作者:cheshire137    | 项目源码 | 文件源码
def get_pick_records(page=1):
  offset = (page - 1) * STATS_PER_PAGE
  return Pick.query.options(subqueryload(Pick.blue_team), \
                            subqueryload(Pick.red_team)).\
              order_by(Pick.uploaded_at.desc()).\
              limit(STATS_PER_PAGE).offset(offset).all()

# Returns how many pages there are of pick records, based on STATS_PER_PAGE
# records shown per page.