Python django.core.cache.cache 模块,get_many() 实例源码

我们从 Python 开源项目中提取了以下 10 个代码示例，用于说明如何使用 django.core.cache.cache.get_many()。

项目:eoj3    作者:ultmaster    | 项目源码 | 文件源码
def get_all_contest_participants_detail(contest: Contest, users=None, privilege=False):
    """Return a ``{user: detail}`` dict for participants of ``contest``.

    Values are looked up in the cache first; users whose entry is absent are
    recomputed through ``recalculate_for_participants`` and written back.

    :param contest: contest whose ``pk`` is used to build the cache keys.
    :param users: users to look up; when falsy, ``contest.participants_ids``
        is used instead.
    :param privilege: selects the private cache template and a timeout of 60
        instead of the public template with ``FORTNIGHT``.
    :return: dict with the cached entries first, then the recomputed ones.
    """
    if privilege:
        cache_template, timeout = PARTICIPANT_RANK_DETAIL_PRIVATE, 60
    else:
        cache_template, timeout = PARTICIPANT_RANK_DETAIL, FORTNIGHT
    contest_users = users if users else contest.participants_ids

    def key_for(user_id):
        return cache_template.format(contest=contest.pk, user=user_id)

    cached = cache.get_many([key_for(user_id) for user_id in contest_users])

    ans = {}
    missing = []
    for user_id in contest_users:
        key = key_for(user_id)
        if key in cached:
            ans[user_id] = cached[key]
        else:
            missing.append(user_id)

    if missing:
        recalculated = recalculate_for_participants(contest, missing, privilege)
        fresh_entries = {key_for(user_id): val for user_id, val in recalculated.items()}
        # The timeout is scaled by a factor in [0.8, 1] -- presumably so that
        # entries written together do not all expire at once.
        cache.set_many(fresh_entries, timeout * uniform(0.8, 1))
        ans.update(recalculated)
    return ans
项目:eoj3    作者:ultmaster    | 项目源码 | 文件源码
def _get_many_or_invalidate(problem_ids, contest_id, cache_template):
    """Return ``{problem_id: cached value}`` for every id in ``problem_ids``.

    Ids whose key is absent on the first lookup are passed to
    ``invalidate_problems`` (presumably this repopulates the cache -- confirm
    against its definition) and then looked up a second time.

    :param problem_ids: ids to resolve; iterated several times and measured
        with ``len``.
    :param contest_id: contest id interpolated into each cache key.
    :param cache_template: format string taking ``problem`` and ``contest``.
    :raises AssertionError: if the result does not contain exactly
        ``len(problem_ids)`` entries after the second lookup.
    """
    def key_of(problem_id):
        return cache_template.format(problem=problem_id, contest=contest_id)

    first_pass = cache.get_many([key_of(pid) for pid in problem_ids])

    ans = {}
    missing = []
    for pid in problem_ids:
        key = key_of(pid)
        if key in first_pass:
            ans[pid] = first_pass[key]
        else:
            missing.append(pid)

    # Invoked unconditionally, i.e. also with an empty list.
    invalidate_problems(missing, contest_id)

    if missing:
        second_pass = cache.get_many([key_of(pid) for pid in missing])
        for pid in problem_ids:
            key = key_of(pid)
            if key in second_pass:
                ans[pid] = second_pass[key]

    assert len(ans) == len(problem_ids)
    return ans
项目:django-lrucache-backend    作者:kogan    | 项目源码 | 文件源码
def test_get_many(self):
        """get_many returns the requested keys that are present, and only those."""
        # Multiple cache keys can be returned using get_many
        backend = self.cache
        for key in ('a', 'b', 'c', 'd'):
            backend.set(key, key)

        all_present = backend.get_many(['a', 'c', 'd'])
        self.assertEqual(all_present, {'a': 'a', 'c': 'c', 'd': 'd'})

        # 'e' was never stored, so it is expected to be absent from the result
        partly_present = backend.get_many(['a', 'b', 'e'])
        self.assertEqual(partly_present, {'a': 'a', 'b': 'b'})
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def cache_get_many(self, keys):
        """Return whatever ``cache.get_many`` yields for ``keys``."""
        found = cache.get_many(keys)
        return found
项目:grical    作者:wikical    | 项目源码 | 文件源码
def global_template_vars(request):
    """
    Adds variables to all templates.

    It uses the cache to minimize hitting the db: the site-wide values
    (``SITE_NAME``, ``SITE_DOMAIN``, ``USERS_NR``, ``EVENTS_NR``,
    ``GROUPS_NR``) are fetched with a single ``cache.get_many`` call, and
    every value that is absent from the cache is computed and stored back.
    Previously values were recomputed only when *none* of them was cached,
    so a partial hit produced a context with variables missing.

    The remaining entries (``PROTOCOL``, ``VERSION``, ``MEDIA_URL``, ``USER``,
    ``READ_ONLY``) are derived from the request or the settings on every call
    and are never cached.

    Returns the dictionary of template variables.
    """
    def get_user():
        if request.user.is_authenticated():
            return ExtendedUser.objects.get( id = request.user.id )
        else:
            return None
    vars_funcs = {
            'SITE_NAME':    lambda: Site.objects.get_current().name,
            'SITE_DOMAIN':  lambda: Site.objects.get_current().domain,
            'USERS_NR':     lambda: User.objects.count(),
            'EVENTS_NR':    lambda: Event.objects.count(),
            'GROUPS_NR':    lambda: Group.objects.count(), }
            # protocol  (computed below)
    # get_many only returns the keys that are actually present in the cache
    vars_dic = cache.get_many( vars_funcs.keys() ) or {}
    # we get the values that were not cached (all of them on a cold cache)
    missing = {}
    for key, func in vars_funcs.items():
        if key not in vars_dic:
            missing[ key ] = func()
    if missing:
        # we put the freshly computed values in the cache
        cache.set_many( missing )
        vars_dic.update( missing )
    # we add protocol
    if request.is_secure():
        vars_dic['PROTOCOL'] = "https"
    else:
        vars_dic['PROTOCOL'] = "http"
    vars_dic['VERSION'] = settings.VERSION
    vars_dic['MEDIA_URL'] = settings.MEDIA_URL
    # TODO: think on the trick to get the user out of a signed Django-1.4 cookie
    vars_dic['USER'] = get_user()
    vars_dic['READ_ONLY'] = settings.READ_ONLY
    # return
    return vars_dic
项目:missioncontrol    作者:mozilla    | 项目源码 | 文件源码
def channel_platform_summary(request):
    """Return a ``JsonResponse`` listing cached measure summaries.

    The optional, repeatable ``platform`` and ``channel`` GET parameters
    (lower-cased before use) restrict which platforms/channels are reported.
    For every channel/platform pair, the measure names of the platform are
    mapped to cache keys and fetched with one ``cache.get_many`` call; only
    measures whose key is present in the cache appear in the output.

    Response shape: ``{'summaries': [{'channel', 'platform', 'measures'}]}``.
    """
    wanted_platforms = [name.lower() for name in request.GET.getlist('platform')]
    wanted_channels = [name.lower() for name in request.GET.getlist('channel')]

    platforms = Platform.objects.all()
    if wanted_platforms:
        platforms = platforms.filter(name__in=wanted_platforms)

    channels = Channel.objects.all()
    if wanted_channels:
        channels = channels.filter(name__in=wanted_channels)

    def build_measure(name, summary):
        # Key order matters for the serialized output: 'name' first, then the
        # cached summary fields, with 'lastUpdated' overridden in place.
        entry = {'name': name}
        entry.update(summary)
        if summary.get('lastUpdated'):
            entry['lastUpdated'] = datetime_to_utc(summary['lastUpdated'])
        else:
            entry['lastUpdated'] = None
        return entry

    summaries = []
    for channel in channels:
        for platform in platforms:
            measure_names = Measure.objects.filter(
                platform=platform).values_list('name', flat=True)
            names_by_cache_key = {
                get_measure_summary_cache_key(platform.name, channel.name, name): name
                for name in measure_names
            }
            cached_summaries = cache.get_many(names_by_cache_key.keys())
            measures = [
                build_measure(names_by_cache_key[cache_key], summary)
                for cache_key, summary in cached_summaries.items()
            ]
            summaries.append({
                'channel': channel.name,
                'platform': platform.name,
                'measures': measures
            })
    return JsonResponse(data={'summaries': summaries})
项目:django-asyncio-redis    作者:mackeyja92    | 项目源码 | 文件源码
async def test_set_get_many(self):
        """Round-trip several keys through the awaitable set_many/get_many.

        Declared ``async`` because the body awaits the cache calls; ``await``
        inside a plain ``def`` is a SyntaxError.
        NOTE(review): the test runner must be able to drive coroutine test
        methods -- confirm how the surrounding test case executes them.
        """
        data = {
            'key1': 'value1',
            'key2': 'value2'
        }
        await cache.set_many(data)
        results = await cache.get_many(data.keys())
        self.assertDictEqual(results, data)
        # A key that was never stored is expected to come back mapped to None
        # by this backend (rather than being left out of the result).
        data['non_existing'] = None
        results = await cache.get_many(data.keys())
        self.assertDictEqual(results, data)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def get_cached_multiple_link_load(items, time_interval):
    """Cached version of get_multiple_link_load()

    ``items`` maps a key to a properties dict. Entries found in the cache get
    their ``load_in``/``load_out`` filled from the cached pair; the rest are
    handed to ``get_multiple_link_load()``, which is expected to fill those
    two properties in place, and are then written to the cache with
    ``CACHE_TIMEOUT``. Nothing is returned; ``items`` is updated in place.
    """
    cache_keys = {}
    for name in iterkeys(items):
        cache_keys[name] = _cache_key(name, time_interval)

    # cache lookup
    hits = cache.get_many(cache_keys.values())
    _logger.debug(
        "get_cached_multiple_link_load: got %d/%d values from cache (%r)",
        len(hits), len(items), time_interval)

    # retrieve data for cache misses
    misses = {}
    for name, properties in iteritems(items):
        if cache_keys[name] not in hits:
            misses[name] = properties
    if misses:
        get_multiple_link_load(misses, time_interval)

    # set data from cache
    names_by_cache_key = {}
    for name, cache_key in iteritems(cache_keys):
        names_by_cache_key[cache_key] = name
    for cache_key, value in iteritems(hits):
        properties = items[names_by_cache_key[cache_key]]
        properties['load_in'], properties['load_out'] = value

    # add new data to cache
    fresh = {
        cache_keys[name]: (properties['load_in'], properties['load_out'])
        for name, properties in iteritems(misses)
    }
    _logger.debug("get_cached_multiple_link_load: caching %d values",
                  len(fresh))
    cache.set_many(fresh, CACHE_TIMEOUT)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def get_cached_multiple_cpu_load(items, time_interval):
    """Cached version of get_multiple_cpu_load()

    ``items`` maps a key to a properties dict. Entries found in the cache get
    their ``load`` property set from the cached value; the rest are handed to
    ``get_multiple_cpu_load()``, which is expected to fill ``load`` in place,
    and are then written to the cache with ``CACHE_TIMEOUT``. Nothing is
    returned; ``items`` is updated in place.
    """
    cache_keys = {}
    for name in iterkeys(items):
        cache_keys[name] = _cache_key(name, time_interval)

    # cache lookup
    hits = cache.get_many(cache_keys.values())
    _logger.debug(
        "get_cached_multiple_cpu_load: got %d/%d values from cache (%r)",
        len(hits), len(items), time_interval)

    # retrieve data for cache misses
    misses = {}
    for name, properties in iteritems(items):
        if cache_keys[name] not in hits:
            misses[name] = properties
    if misses:
        get_multiple_cpu_load(misses, time_interval)

    # set data from cache
    names_by_cache_key = {}
    for name, cache_key in iteritems(cache_keys):
        names_by_cache_key[cache_key] = name
    for cache_key, value in iteritems(hits):
        properties = items[names_by_cache_key[cache_key]]
        properties['load'] = value

    # add new data to cache
    fresh = {
        cache_keys[name]: properties['load']
        for name, properties in iteritems(misses)
    }
    _logger.debug("get_cached_multiple_cpu_load: caching %d values",
                  len(fresh))
    cache.set_many(fresh, CACHE_TIMEOUT)
项目:django-lrucache-backend    作者:kogan    | 项目源码 | 文件源码
def test_cache_versioning_get_set_many(self):
        """set_many/get_many respect the default and the explicit key version.

        Two cache objects share one underlying store: ``cache`` (default
        version 1, per the assertions below) and ``cache2`` (created with
        ``VERSION=2``). Each scenario writes a pair of keys and then checks,
        for both objects, a lookup with the default version followed by
        explicit ``version=1`` and ``version=2``: the data must be visible
        only under the version it was stored with.
        """
        cache = self.cache
        cache2 = LRUObjectCache('lru2', dict(VERSION=2))
        cache2._cache = cache._cache

        readers = ((cache, 1), (cache2, 2))
        scenarios = (
            # (key suffix, writing cache, set_many kwargs, version stored under)
            # set, using default version = 1
            ('1', cache, {}, 1),
            # set, default version = 1, but manually override version = 2
            ('2', cache, {'version': 2}, 2),
            # v2 set, using default version = 2
            ('3', cache2, {}, 2),
            # v2 set, default version = 2, but manually override version = 1
            ('4', cache2, {'version': 1}, 1),
        )

        for suffix, writer, set_kwargs, stored_version in scenarios:
            ford, arthur = 'ford' + suffix, 'arthur' + suffix
            writer.set_many({ford: 37, arthur: 42}, **set_kwargs)

            for reader, default_version in readers:
                self.assertEqual(
                    reader.get_many([ford, arthur]),
                    {ford: 37, arthur: 42} if default_version == stored_version else {})
                for explicit_version in (1, 2):
                    self.assertEqual(
                        reader.get_many([ford, arthur], version=explicit_version),
                        {ford: 37, arthur: 42} if explicit_version == stored_version else {})