我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.orm.aliased()。
def correlate(self, *args):
    """Add FROM clauses to the set this :class:`.Query` correlates to.

    Each positional argument may be a mapped class, an :func:`.aliased`
    construct, a :func:`.mapper` construct or a plain expression
    construct; everything except ``None`` is passed through
    ``_interpret_as_from`` before being stored.  ``None`` is stored
    unchanged.

    The collected clauses are ultimately handed to
    :meth:`.Select.correlate`; they matter when
    :meth:`.Query.from_self` is used or when a subquery produced by
    :meth:`.Query.subquery` is embedded in another
    :func:`~.expression.select`.

    NOTE(review): as written here the method rebinds
    ``self._correlate`` and returns nothing; presumably a generative
    decorator outside this view supplies the returned ``Query`` --
    confirm.
    """
    resolved = (
        None if clause is None else _interpret_as_from(clause)
        for clause in args
    )
    # ``union`` builds a new collection, so the previous value of
    # ``self._correlate`` is left untouched.
    self._correlate = self._correlate.union(resolved)
def outerjoin(self, *props, **kwargs):
    """Apply a LEFT OUTER JOIN to this ``Query`` and return the result.

    Accepts the same arguments as ``join()``.  The keyword options
    recognised are ``aliased``, ``from_joinpoint`` and ``full`` (each
    defaulting to ``False``); any other keyword raises ``TypeError``.
    """
    create_aliases = kwargs.pop('aliased', False)
    from_joinpoint = kwargs.pop('from_joinpoint', False)
    full = kwargs.pop('full', False)

    # Whatever is left after popping the known options is unsupported.
    if kwargs:
        leftover = ', '.join(sorted(kwargs))
        raise TypeError("unknown arguments: %s" % leftover)

    return self._join(
        props,
        outerjoin=True,
        full=full,
        create_aliases=create_aliases,
        from_joinpoint=from_joinpoint,
    )
def process_vote(self, comment_id, username, value):
    """Record ``username``'s vote on a comment and adjust the rating.

    If the user has not voted on the comment yet, a new ``CommentVote``
    is created and ``value`` is added to the comment's rating.
    Otherwise the existing vote is overwritten and the rating is
    adjusted by the difference between the new and the old value.

    :param comment_id: id of the ``Comment`` being voted on.
    :param username: name of the voting user.
    :param value: the vote value to store.
    :raises: whatever ``Query.one()`` raises when the lookup does not
        yield exactly one row (e.g. the comment does not exist); the
        session is closed in that case as well.
    """
    session = Session()
    try:
        # Restrict the votes to this user *before* the outer join so a
        # comment without a vote from this user still yields a row
        # (with ``vote`` set to None).
        user_votes = session.query(CommentVote).filter(
            CommentVote.username == username).subquery()
        vote_alias = aliased(CommentVote, user_votes)
        query = session.query(Comment, vote_alias).outerjoin(
            vote_alias).filter(Comment.id == comment_id)
        comment, vote = query.one()

        if vote is None:
            vote = CommentVote(comment_id, username, value)
            comment.rating += value
        else:
            # Replace the previous vote: only the delta affects the rating.
            comment.rating += value - vote.value
            vote.value = value

        session.add(vote)
        session.commit()
    finally:
        # Always release the session, even when the lookup or the
        # commit fails; previously it leaked on any exception.
        session.close()
def _addPipeline(self):
    """Join the DRP and DAP pipeline-info tables into ``self.query``.

    Two aliases of ``marvindb.datadb.PipelineInfo`` are always created
    and stored on the instance (``_drp_alias`` / ``_dap_alias``).  A
    join plus a primary-key filter is only added for a pipeline when
    ``self._getPipeInfo`` returns a truthy value for it.
    """
    pipeline_info = marvindb.datadb.PipelineInfo
    self._drp_alias = aliased(pipeline_info, name='drpalias')
    self._dap_alias = aliased(pipeline_info, name='dapalias')

    drp_info = self._getPipeInfo('drp')
    dap_info = self._getPipeInfo('dap')

    # Restrict to the requested DRP pipeline version.
    if drp_info:
        joined = self.query.join(
            self._drp_alias, marvindb.datadb.Cube.pipelineInfo)
        self.query = joined.filter(self._drp_alias.pk == drp_info.pk)

    # Restrict to the requested DAP pipeline version.
    if dap_info:
        joined = self.query.join(
            self._dap_alias, marvindb.dapdb.File.pipelineinfo)
        self.query = joined.filter(self._dap_alias.pk == dap_info.pk)
def get_shots(criterions=None):
    """Return the serialized shots matching ``criterions``.

    :param criterions: optional mapping of ``filter_by`` keyword
        criteria applied to ``Entity``.  It is copied before use, so
        neither the caller's mapping nor a shared default is modified
        (the previous ``{}`` default was mutated on every call).
    :returns: a list of dicts produced by
        ``serialize(obj_type="Shot")``, each extended with
        ``project_name`` and ``sequence_name``.
    """
    shot_type = get_shot_type()

    # Work on a private copy: the entity-type restriction must not leak
    # back into the caller's dict.
    filters = dict(criterions or {})
    filters["entity_type_id"] = shot_type["id"]

    # The parent of a shot is its sequence; alias Entity to join it
    # against itself.
    Sequence = aliased(Entity, name='sequence')
    query = Entity.query.filter_by(**filters)
    query = query.join(Project)
    query = query.join(Sequence, Sequence.id == Entity.parent_id)
    query = query.add_columns(Project.name)
    query = query.add_columns(Sequence.name)

    shots = []
    for shot_model, project_name, sequence_name in query.all():
        shot = shot_model.serialize(obj_type="Shot")
        shot["project_name"] = project_name
        shot["sequence_name"] = sequence_name
        shots.append(shot)
    return shots
def get_scenes(criterions=None):
    """Return the serialized scenes matching ``criterions``.

    :param criterions: optional mapping of ``filter_by`` keyword
        criteria applied to ``Entity``.  It is copied before use, so
        neither the caller's mapping nor a shared default is modified
        (the previous ``{}`` default was mutated on every call).
    :returns: a list of dicts produced by
        ``serialize(obj_type="Scene")``, each extended with
        ``project_name`` and ``sequence_name``.
    """
    scene_type = get_scene_type()

    # Work on a private copy: the entity-type restriction must not leak
    # back into the caller's dict.
    filters = dict(criterions or {})
    filters["entity_type_id"] = scene_type["id"]

    # The parent of a scene is its sequence; alias Entity to join it
    # against itself.
    Sequence = aliased(Entity, name='sequence')
    query = Entity.query.filter_by(**filters)
    query = query.join(Project)
    query = query.join(Sequence, Sequence.id == Entity.parent_id)
    query = query.add_columns(Project.name)
    query = query.add_columns(Sequence.name)

    scenes = []
    for scene_model, project_name, sequence_name in query.all():
        scene = scene_model.serialize(obj_type="Scene")
        scene["project_name"] = project_name
        scene["sequence_name"] = sequence_name
        scenes.append(scene)
    return scenes
def get_project_episodes(project_id):
    """Return the serialized episodes of ``project_id``.

    An episode is selected when it has a sequence containing a shot
    that has a task, and the row additionally passes
    ``assignee_filter()`` and ``open_project_filter()``.  ``Entity`` is
    aliased twice so the episode -> sequence -> shot hierarchy can be
    walked in one query.
    """
    shot_type = shots_service.get_shot_type()
    sequence_type = shots_service.get_sequence_type()
    episode_type = shots_service.get_episode_type()

    Shot = aliased(Entity, name='shot')
    Sequence = aliased(Entity, name='sequence')

    # Joins: episode (Entity) -> sequence -> shot -> task, plus project.
    episodes = Entity.query
    episodes = episodes.join(Sequence, Sequence.parent_id == Entity.id)
    episodes = episodes.join(Shot, Shot.parent_id == Sequence.id)
    episodes = episodes.join(Task, Task.entity_id == Shot.id)
    episodes = episodes.join(Project, Project.id == Entity.project_id)
    episodes = episodes.join(ProjectStatus)

    # Pin every level of the hierarchy to its entity type.
    episodes = episodes.filter(Shot.entity_type_id == shot_type["id"])
    episodes = episodes.filter(
        Sequence.entity_type_id == sequence_type["id"])
    episodes = episodes.filter(Entity.entity_type_id == episode_type["id"])

    episodes = episodes.filter(Project.id == project_id)
    episodes = episodes.filter(assignee_filter())
    episodes = episodes.filter(open_project_filter())

    return Entity.serialize_list(episodes.all(), obj_type="Episode")
def _tenant_networks_by_network_query(self, context, network_id,
                                      bgp_speaker_id):
    """Build the query for tenant networks by binding network ID.

    The query selects the router id, subnet CIDR, subnet IP version and
    address scope id, and is returned unexecuted.
    """
    scope = aliased(address_scope_db.AddressScope, name='address_scope')
    attrs = aliased(l3_attrs_db.RouterExtraAttributes,
                    name='router_attrs')

    selection = context.session.query(
        l3_db.RouterPort.router_id,
        models_v2.Subnet.cidr,
        models_v2.Subnet.ip_version,
        scope.id)

    criteria = [
        # Skip gateway and SNAT router ports.
        l3_db.RouterPort.port_type != lib_consts.DEVICE_OWNER_ROUTER_GW,
        l3_db.RouterPort.port_type != lib_consts.DEVICE_OWNER_ROUTER_SNAT,
        l3_db.RouterPort.router_id == attrs.router_id,
        models_v2.IPAllocation.port_id == l3_db.RouterPort.port_id,
        models_v2.IPAllocation.subnet_id == models_v2.Subnet.id,
        # Exclude subnets that live on the given network itself.
        models_v2.Subnet.network_id != network_id,
        models_v2.Subnet.subnetpool_id == models_v2.SubnetPool.id,
        models_v2.SubnetPool.address_scope_id == scope.id,
        BgpSpeaker.id == bgp_speaker_id,
        BgpSpeaker.ip_version == scope.ip_version,
        models_v2.Subnet.ip_version == scope.ip_version,
    ]
    return selection.filter(*criteria)
def _next_hop_ip_addresses_by_binding_filters(self, network_id,
                                              bgp_speaker_id):
    """Return the filter criteria for next hops by binding network.

    The result is a list of SQL expressions; the caller is expected to
    apply them to a query.
    """
    scope = aliased(address_scope_db.AddressScope, name='address_scope')

    criteria = [
        models_v2.IPAllocation.port_id == l3_db.RouterPort.port_id,
        models_v2.IPAllocation.subnet_id == models_v2.Subnet.id,
        BgpSpeaker.id == bgp_speaker_id,
        BgpSpeakerNetworkBinding.bgp_speaker_id == BgpSpeaker.id,
        BgpSpeakerNetworkBinding.network_id == network_id,
        models_v2.Subnet.network_id == BgpSpeakerNetworkBinding.network_id,
        models_v2.Subnet.subnetpool_id == models_v2.SubnetPool.id,
        models_v2.SubnetPool.address_scope_id == scope.id,
        models_v2.Subnet.ip_version == scope.ip_version,
        # Only gateway router ports are considered.
        l3_db.RouterPort.port_type == DEVICE_OWNER_ROUTER_GW,
    ]
    return criteria
def _tenant_prefixes_by_router_filters(self, router_id, bgp_speaker_id):
    """Return the filter criteria for tenant prefixes of one router.

    The result is a list of SQL expressions.  ``bgp_speaker_id`` is
    accepted but not referenced by any of the criteria built here.
    """
    net_binding = aliased(BgpSpeakerNetworkBinding,
                          name='network_binding')
    pool = aliased(models_v2.SubnetPool, name='subnetpool')
    attrs = aliased(l3_attrs_db.RouterExtraAttributes,
                    name='router_attrs')

    criteria = [
        models_v2.Subnet.id == models_v2.IPAllocation.subnet_id,
        models_v2.Subnet.subnetpool_id == pool.id,
        l3_db.RouterPort.router_id == router_id,
        l3_db.Router.id == l3_db.RouterPort.router_id,
        l3_db.Router.id == attrs.router_id,
        # The router's gateway port ties it to the bound network.
        l3_db.Router.gw_port_id == models_v2.Port.id,
        models_v2.Port.network_id == net_binding.network_id,
        net_binding.bgp_speaker_id == BgpSpeaker.id,
        # Only router interface ports are considered.
        l3_db.RouterPort.port_type == DEVICE_OWNER_ROUTER_INTF,
        models_v2.IPAllocation.port_id == l3_db.RouterPort.port_id,
    ]
    return criteria
def list_subscription_rule_states(name=None, account=None, session=None):
    """Yield the number of rules per state for subscriptions.

    :param name: Name of the subscription; falsy means "any".
    :param account: Account identifier; falsy means "any".
    :param session: The database session in use.
    :returns: Generator of rows (account, name, state, count).
    """
    sub = aliased(models.Subscription)
    rule_alias = aliased(models.ReplicationRule)

    columns = (sub.account, sub.name, rule_alias.state, func.count())
    query = session.query(*columns).join(
        rule_alias, sub.id == rule_alias.subscription_id)

    # NOTE: the IntegrityError handler only guards the construction of
    # the optional filters, exactly as before; it is kept for parity.
    try:
        optional = ((name, sub.name), (account, sub.account))
        for wanted, column in optional:
            if wanted:
                query = query.filter(column == wanted)
    except IntegrityError as error:
        print(error)
        raise

    query = query.group_by(sub.account, sub.name, rule_alias.state)

    for row in query:
        yield row
def filter_by_roles(current_user, roles):
    """Return an EXISTS clause matching users who share a group with
    ``current_user`` in which ``current_user`` holds one of ``roles``.

    The clause is correlated on ``User.id`` and is meant to be used as
    a filter on an enclosing query over ``User``.
    """
    # GroupUser is joined twice: once for the candidate user and once
    # (aliased) for the current user's membership of the same group.
    own_membership = aliased(GroupUser)

    memberships = db.session.query(GroupUser)
    memberships = memberships.join(GroupUser.group)
    memberships = memberships.join(own_membership, Group.group_users)
    memberships = memberships.filter(GroupUser.user_id == User.id)

    # Keep only groups where the current user has a qualifying role.
    memberships = memberships.filter(
        own_membership.user_id == current_user.id,
        own_membership.role.in_(roles),
    )
    return memberships.exists()
def filter_consultants_by_patient_id(query, patient_id):
    """Restrict a consultant ``query`` to consultants of one patient.

    Two EXISTS filters are added: the consultant must belong to a group
    the patient belongs to, and the patient must be visible through the
    ``PatientQueryBuilder`` built for ``current_user``.
    """
    consultant_alias = aliased(Consultant)

    # Consultants sharing at least one group with the patient,
    # correlated to the outer query through Consultant.id.
    shared_group = (
        db.session.query(consultant_alias)
        .join(consultant_alias.group_consultants)
        .join(GroupConsultant.group)
        .join(Group.group_patients)
        .filter(
            GroupPatient.patient_id == patient_id,
            Consultant.id == consultant_alias.id,
        )
    )
    query = query.filter(shared_group.exists())

    # The current user must be permitted to view this patient.
    visible_patient = PatientQueryBuilder(current_user).build()
    visible_patient = visible_patient.filter(Patient.id == patient_id)
    return query.filter(visible_patient.exists())
def filter_by_group_roles(current_user, roles, current=None):
    """Return an EXISTS clause matching patients in a group where
    ``current_user`` holds one of ``roles``.

    The clause is correlated on ``Patient.id``.  When ``current`` is
    truthy, only group memberships flagged as current are considered.
    """
    patient_alias = aliased(Patient)

    memberships = (
        db.session.query(patient_alias)
        .join(patient_alias.group_patients)
        .join(GroupPatient.group)
        .join(Group.group_users)
        .filter(
            patient_alias.id == Patient.id,
            GroupUser.user_id == current_user.id,
            GroupUser.role.in_(roles),
        )
    )

    if current:
        # ``== True`` is deliberate: it builds a SQL comparison.
        memberships = memberships.filter(
            GroupPatient.current == True)  # noqa

    return memberships.exists()
def _set_select_from(self, obj, set_base_alias):
    """Set ``self._from_obj`` from the iterable of FROM objects ``obj``.

    Each element is inspected: mappers and aliased classes contribute
    their ``selectable`` (and are recorded as
    ``self._select_from_entity``); other selectables are used as-is,
    with ``SelectBase`` instances wrapped via ``.alias()`` first.

    :param obj: iterable of mapped classes, mappers, ``aliased()``
        constructs or FromClause instances.
    :param set_base_alias: when true, only plain selectables are
        accepted, and if exactly one FROM object results and it is an
        ``expression.Alias``, a ``ColumnAdapter`` for it is stored in
        ``self._from_obj_alias``.
    :raises sa_exc.ArgumentError: if an element is not selectable, or
        if a mapped entity is given while ``set_base_alias`` is true.
    """
    fa = []
    select_from_alias = None

    for from_obj in obj:
        info = inspect(from_obj)
        if hasattr(info, 'mapper') and \
                (info.is_mapper or info.is_aliased_class):
            # Mapped entity: remember it, then select from its table.
            # Note the attribute is assigned before the check below, so
            # it is set even when ArgumentError is raised.
            self._select_from_entity = from_obj
            if set_base_alias:
                raise sa_exc.ArgumentError(
                    "A selectable (FromClause) instance is "
                    "expected when the base alias is being set.")
            fa.append(info.selectable)
        elif not info.is_selectable:
            raise sa_exc.ArgumentError(
                "argument is not a mapped class, mapper, "
                "aliased(), or FromClause instance.")
        else:
            # Plain selectable; a SELECT statement has to be aliased
            # before it is placed in the FROM list.
            if isinstance(from_obj, expression.SelectBase):
                from_obj = from_obj.alias()
            if set_base_alias:
                # With several elements the last one wins, but the
                # adapter below is only built for a single FROM object.
                select_from_alias = from_obj
            fa.append(from_obj)

    self._from_obj = tuple(fa)

    if set_base_alias and \
            len(self._from_obj) == 1 and \
            isinstance(select_from_alias, expression.Alias):
        # NOTE(review): ``__all_equivs`` is a double-underscore name and
        # so depends on name mangling in the enclosing class, which is
        # not visible here -- confirm before moving this code.
        equivs = self.__all_equivs()
        self._from_obj_alias = sql_util.ColumnAdapter(
            self._from_obj[0], equivs)
def add_entity(self, entity, alias=None):
    """Add a mapped entity to the list of result columns to be returned.

    :param entity: the mapped entity to add.
    :param alias: optional alias; when given, the entity is wrapped
        with ``aliased(entity, alias)`` before being added.
    """
    target = entity if alias is None else aliased(entity, alias)

    # Rebind to a fresh list before the new entity is created, so the
    # previous list object is not the one that gets extended.
    # NOTE(review): presumably ``_MapperEntity`` registers itself on
    # the query it is given -- confirm against its constructor.
    self._entities = list(self._entities)
    mapper_entity = _MapperEntity(self, target)
    self._set_entity_selectables([mapper_entity])
def outerjoin(self, *props, **kwargs):
    """Apply a LEFT OUTER JOIN to this ``Query`` and return the result.

    Accepts the same arguments as ``join()``.  The keyword options
    recognised are ``aliased`` and ``from_joinpoint`` (both defaulting
    to ``False``); any other keyword raises ``TypeError``.
    """
    create_aliases = kwargs.pop('aliased', False)
    from_joinpoint = kwargs.pop('from_joinpoint', False)

    # Whatever is left after popping the known options is unsupported.
    if kwargs:
        leftover = ', '.join(sorted(kwargs))
        raise TypeError("unknown arguments: %s" % leftover)

    return self._join(
        props,
        outerjoin=True,
        create_aliases=create_aliases,
        from_joinpoint=from_joinpoint,
    )
def __init__(self, alias):
    """Return a :class:`.MapperOption` that will indicate to the
    :class:`.Query` that the main table has been aliased.

    This is a seldom-used option to suit the very rare case that
    :func:`.contains_eager` is being used in conjunction with a
    user-defined SELECT statement that aliases the parent table.
    E.g.::

        # define an aliased UNION called 'ulist'
        ulist = users.select(users.c.user_id==7).\\
                        union(users.select(users.c.user_id>7)).\\
                        alias('ulist')

        # add on an eager load of "addresses"
        statement = ulist.outerjoin(addresses).\\
                        select().apply_labels()

        # create query, indicating "ulist" will be an
        # alias for the main table, "addresses"
        # property should be eager loaded
        query = session.query(User).options(
                                contains_alias(ulist),
                                contains_eager(User.addresses))

        # then get results via the statement
        results = query.from_statement(statement).all()

    :param alias: the string name of an alias, or a
        :class:`~.sql.expression.Alias` object representing the alias.
        It is stored unchanged on ``self.alias``; no validation happens
        here.
    """
    self.alias = alias
def next_alias(aliases, name, obj_class, use_existing=True, prefix=''):
    """Return an alias of ``obj_class`` registered under ``name``.

    ``aliases`` maps a name to ``{'number': int, 'aliased': list}`` and
    is updated in place.

    :param aliases: registry of aliases created so far.
    :param name: registry key, also used to build the alias name.
    :param obj_class: class handed to ``aliased()`` when a new alias
        is needed.
    :param use_existing: when true and ``name`` is already registered,
        the most recent alias is reused instead of creating another.
    :param prefix: string prepended to generated alias names.
    :returns: ``(alias, is_new)`` where ``is_new`` is false only when
        an existing alias was reused.
    """
    # First alias for this name: start numbering at 1.
    if name not in aliases:
        aliases[name] = {
            'number': 1,
            'aliased': [aliased(obj_class, name=prefix + name + '_1')],
        }
        return aliases[name]['aliased'][-1], True

    entry = aliases[name]
    if use_existing:
        return entry['aliased'][-1], False

    # An additional alias was requested: bump the counter and append.
    entry['number'] += 1
    alias_name = prefix + name + '_' + str(entry['number'])
    entry['aliased'].append(aliased(obj_class, name=alias_name))
    return entry['aliased'][-1], True
def _get_model_and_conditions(trait_type, key, value, op='eq'):
    """Build an aliased trait model plus the conditions to filter it.

    :param trait_type: key into ``trait_models_dict`` selecting the
        model to alias.
    :param key: trait key the model's ``key`` column must equal.
    :param value: value the model's ``value`` column is compared to.
    :param op: one of ``'eq'``, ``'lt'``, ``'le'``, ``'gt'``, ``'ge'``,
        ``'ne'``; anything else raises ``KeyError``.
    :returns: ``(trait_model, [key_condition, value_condition])``.
    """
    trait_model = aliased(trait_models_dict[trait_type])
    column = trait_model.value

    # Every comparison is built up front; ``op`` then picks one.
    comparisons = dict(
        eq=(column == value),
        lt=(column < value),
        le=(column <= value),
        gt=(column > value),
        ge=(column >= value),
        ne=(column != value),
    )

    key_condition = trait_model.key == key
    return (trait_model, [key_condition, comparisons[op]])
def add_all_columns(cols, class_name):
    """Append every column of ``ALLOWED_CLASSES[class_name]`` to ``cols``.

    Each column is fetched from the class by its key and labelled
    ``<class_name>_<column name>``.  ``cols`` is extended in place;
    nothing is returned.
    """
    target = ALLOWED_CLASSES[class_name]

    # This exception has to be caught because inspecting an aliased
    # class does not directly return a mapper; the columns are then
    # reached through ``.mapper``.
    try:
        named_columns = sqlalchemy.inspection.inspect(
            target).columns.items()
    except AttributeError:
        named_columns = sqlalchemy.inspection.inspect(
            target).mapper.columns.items()

    cols.extend(
        getattr(target, col.key).label(
            "{}_{}".format(class_name, col_name))
        for col_name, col in named_columns
    )
def get_for_protocolize(db_path, class_name, code):
    """Return a dataframe for one entry of a class, joined with its
    related tables.

    The entry is selected by ``code`` from
    ``ALLOWED_CLASSES[class_name]``.  Its relationships, and the
    relationships of those related classes (except ones whose name
    contains ``"contains"``), are outer-joined and their columns added
    with ``<relationship>_<column>`` /
    ``<relationship>_<sub-relationship>_<column>`` labels.

    :param db_path: passed to ``load_session`` to obtain the session
        and engine.
    :param class_name: key into ``ALLOWED_CLASSES``.
    :param code: value the root class's ``code`` column must equal.
    :returns: the ``pandas`` dataframe produced by
        ``pd.read_sql_query``.

    The session is closed and the engine disposed even if building or
    running the query fails.
    """
    session, engine = load_session(db_path)
    try:
        cols = []
        joins = []
        classobject = ALLOWED_CLASSES[class_name]
        insp = sqlalchemy.inspection.inspect(classobject)

        # Columns of the root class keep their own names.
        for name, col in insp.columns.items():
            cols.append(col.label(name))

        for name, rel in insp.relationships.items():
            alias = aliased(rel.mapper.class_, name=name)
            joins.append((alias, rel.class_attribute))
            related_columns = sqlalchemy.inspection.inspect(
                rel.mapper).columns.items()
            for col_name, col in related_columns:
                # The id column causes double entries, as it is mapped
                # once on the parent table (related_table_id) and once
                # on the child table (table_id).
                if col.key != "id":
                    aliased_col = getattr(alias, col.key)
                    cols.append(aliased_col.label(
                        "{}_{}".format(name, col_name)))

            sub_insp = sqlalchemy.inspection.inspect(rel.mapper.class_)
            for sub_name, sub_rel in sub_insp.relationships.items():
                if "contains" in sub_name:
                    continue
                sub_alias = aliased(
                    sub_rel.mapper.class_, name=name + "_" + sub_name)
                joins.append((sub_alias, sub_rel.class_attribute))
                sub_columns = sqlalchemy.inspection.inspect(
                    sub_rel.mapper).columns.items()
                for sub_col_name, sub_col in sub_columns:
                    # Same id de-duplication as for the first level.
                    if sub_col.key != "id":
                        sub_aliased_col = getattr(sub_alias, sub_col.key)
                        cols.append(sub_aliased_col.label(
                            "{}_{}_{}".format(
                                name, sub_name, sub_col_name)))

        sql_query = session.query(*cols).select_from(classobject)
        for join in joins:
            sql_query = sql_query.outerjoin(*join)
        sql_query = sql_query.filter(classobject.code == code)

        return pd.read_sql_query(sql_query.statement, engine)
    finally:
        # Previously skipped whenever anything above raised, leaking
        # the session and the engine's connections.
        session.close()
        engine.dispose()