我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用mongoengine.Q。
def scene_search(request): if request.method == "POST": try: skip = int(request.POST.get('skip', 0)) except TypeError: skip = 0 except ValueError: skip = 0 try: limit = int(request.POST.get('limit', 5)) except TypeError: limit = 5 except ValueError: limit = 5 # # ????????? # try: # latitude = float(request.POST.get('latitude', None)) # longitude = float(request.POST.get('longitude', None)) # except: # return JsonResponse(resultMsg['CoordinatesError']) # coordinates = [longitude, latitude] # ????? # ?????? content = request.POST.get('searchContent', None) if not content: return JsonResponse(resultMsg['NeedParameter']) lists = Scene.objects( (Q(name={"$regex": content}) | Q(city={"$regex": content}) | Q(province={"$regex": content})), status='online' # location__near=coordinates, location__max_distance=50000 ).all()[skip: limit] lists = json.loads(lists.to_json()) map(more_replace, lists) return JsonResponse({"result": lists}) raise Http404
def q(self): return me.Q()
def owner_query(self): return me.Q(owner=self.owner)
def q(self): if self.operator == 'eq': return me.Q(**{self.field: self.value}) return me.Q(**{'%s__%s' % (self.field, self.operator): self.value})
def q(self): rtype = self._instance.condition_resource_cls._meta["collection"] ids = set() for key, value in self.tags.iteritems(): query = { 'owner': self._instance.owner, 'resource_type': rtype, 'key': key, } if value: query['value'] = value ids |= set(tag.resource.id for tag in Tag.objects(**query).only('resource')) return me.Q(id__in=ids)
def q(self): return me.Q(id__in=self.ids)
def _list_images__fetch_images(self, search=None): default_images = config.EC2_IMAGES[self.cloud.region] image_ids = default_images.keys() + self.cloud.starred if not search: try: # this might break if image_ids contains starred images # that are not valid anymore for AWS images = self.connection.list_images(None, image_ids) except Exception as e: bad_ids = re.findall(r'ami-\w*', e.message, re.DOTALL) for bad_id in bad_ids: self.cloud.starred.remove(bad_id) self.cloud.save() images = self.connection.list_images(None, default_images.keys() + self.cloud.starred) for image in images: if image.id in default_images: image.name = default_images[image.id] images += self.connection.list_images(ex_owner='self') else: image_models = CloudImage.objects( me.Q(cloud_provider=self.connection.type, image_id__icontains=search) | me.Q(cloud_provider=self.connection.type, name__icontains=search) )[:200] images = [NodeImage(id=image.image_id, name=image.name, driver=self.connection, extra={}) for image in image_models] if not images: # Actual search on EC2. images = self.connection.list_images( ex_filters={'name': '*%s*' % search} ) return images
def remove_tags_from_resource(owner, resource_obj, tags, *args, **kwargs): """ This function get a list of tags in the form [{'key': 'joe'}] or [{'key': 'joe', 'value': 'schmoe'}] and will delete them from the resource :param owner: the resource owner :param resource_obj: the resource object where the tags will be added :param rtype: resource type :param tags: list of tags to be deleted """ # ensure there are no duplicate tag keys because mongoengine will # raise exception for duplicates in query key_list = list(set(tags)) # create a query that will return all the tags with query = reduce(lambda q1, q2: q1.__or__(q2), map(lambda key: Q(key=key), key_list)) Tag.objects(Q(owner=owner) & Q(resource=resource_obj) & (query)).delete() # I think that the above overly complex query could simply be rewritten as # Tag.objects(owner=owner, resource=resource_obj, # key__in=key_list).delete() # SEC owner.mapper.update(resource_obj) rtype = resource_obj._meta["collection"] trigger_session_update(owner, [rtype + 's' if not rtype.endswith('s') else rtype])
def owner_query(self): return me.Q(cloud__in=Cloud.objects(owner=self.owner).only('id'))
def get_queryset(self): q_list = [] users_folows_list = Follow.objects.filter(user=self.request.user) for follow in users_folows_list: q_list.append(Q(fromAddress=follow.address, timestamp__gte=follow.created.timestamp())) q_list.append(Q(toAddress=follow.address, timestamp__gte=follow.created.timestamp())) if q_list: query = q_list.pop() for item in q_list: query |= item return EthTransactions.objects.filter(query) return EthTransactions.objects.none()
def _show_item(self, **kwargs): if self.user.offset < 0: self.send(self.t('NO_PREV')) return False condition = Q(bc_hash__ne=None) & \ Q(type=self.user.filter_buy_type) # Q(price__lte=self.user.filter_price) offers = Offer.objects(condition) \ .skip(self.user.offset) \ .limit(1) offer = None for item in offers: offer = item break if offer is None: self.send(self.t('NO_MORE')) return False try: path = offer.get_image_path(self.user.lang) if not path: raise Exception with open(path, 'rb') as f: inline_keyboard = InlineKeyboard(keyboard=[ [InlineKeyboardButton(text=self.t('BUY_CONTRACT'), callback_data=offer.get_id())] ]) self.send_photo( files=(('photo', path, f.read()),), reply_markup=inline_keyboard ) except Exception: if options.debug: traceback.print_exc() self.logger.error(offer.get_id()) self.send(self.t('IMAGE_NOT_FOUND')) return True
def twitter(bot, message): """#twitter [-p ??] -p : ???? """ try: cmd, *args = shlex.split(message.text) except ValueError: return False if not cmd[0] in config['trigger']: return False if not cmd[1:] == 'twitter': return False try: options, args = getopt.gnu_getopt(args, 'hp:') except getopt.GetoptError: # ???? reply(bot, message, twitter.__doc__) return True days = 0 for o, a in options: if o == '-p': # ???? try: days = int(a) if days < 0: raise ValueError except ValueError: reply(bot, message, twitter.__doc__) return True elif o == '-h': # ?? reply(bot, message, twitter.__doc__) return True tweets = Twitter.objects(Q(date__gte=datetime.now().date()+timedelta(days=-days)) & Q(date__lte=datetime.now().date()+timedelta(days=-days+1))) if tweets: reply(bot, message, '\n---------\n'.join([str(tweet) for tweet in tweets])) return True else: reply(bot, message, '??????...') return True