我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用google.appengine.api.memcache.flush_all()。
def test_delete_db_ndb_mixed(self): # Start empty storage_ndb = appengine.StorageByKeyName( appengine.CredentialsNDBModel, 'foo', 'credentials') storage = appengine.StorageByKeyName( appengine.CredentialsModel, 'foo', 'credentials') # First DB, then NDB self.assertEqual(None, storage.get()) storage.put(self.credentials) self.assertNotEqual(None, storage.get()) storage_ndb.delete() self.assertEqual(None, storage.get()) # First NDB, then DB self.assertEqual(None, storage_ndb.get()) storage_ndb.put(self.credentials) storage.delete() self.assertNotEqual(None, storage_ndb.get()) # NDB uses memcache and an instance cache (Context) ndb.get_context().clear_cache() memcache.flush_all() self.assertEqual(None, storage_ndb.get())
def get(self): logging.debug("Caching devos for today.") memcache.flush_all() status = "Status : " try: # cache only today and tmr for delta in range(0, 2): for version in range(V.get_size()): logging.debug("how many times {}".format(version)) get_devo(delta=delta, version=V.get_version_letters(version)) except Exception as e: status += "Cache Failed - " + str(e) else: status += "Cache Passed: " finally: send_message(CREATOR_ID, status) self.response.write("Success...") return
def post(self): """Handle modifying actions and redirect to a GET page.""" if self.request.get('action:flush_memcache'): if memcache.flush_all(): message = 'Cache flushed, all keys dropped.' else: message = 'Flushing the cache failed. Please try again.' self.redirect(self._construct_url(remove=['action:flush_memcache'], add={'message': message})) elif self.request.get('action:delete_entities'): entity_keys = self.request.params.getall('entity_key') db.delete(entity_keys) self.redirect(self._construct_url( remove=['action:delete_entities'], add={'message': '%d entities deleted' % len(entity_keys)})) else: self.error(404)
def cron_scrape(): try: scrapers.scrape_intelli() scrapers.scrape_victrola() scrapers.scrape_stumptown() scrapers.scrape_bluebottle() scrapers.scrape_fortyninth() scrapers.scrape_matchstick() scrapers.scrape_heart() memcache.flush_all() except Exception as e: logging.warning("Error: {}".format(e)) return "Finished scraping"
def get(self): memcache.flush_all() self.response.out.write("Flushed memcache!")
def post(self): """Handle POST.""" if self.request.get('flush_memcache'): if memcache.flush_all(): message = 'Cache flushed, all keys dropped.' else: message = 'Flushing the cache failed. Please try again.' self.redirect_with_message(message) return kind = self.request.get('kind') keys = [] index = 0 num_keys = int(self.request.get('numkeys')) for i in xrange(1, num_keys+1): key = self.request.get('key%d' % i) if key: keys.append(key) if self.request.get('action') == 'Delete': num_deleted = 0 for key in keys: datastore.Delete(datastore.Key(key)) num_deleted = num_deleted + 1 message = '%d entit%s deleted. %s' % ( num_deleted, ('ies', 'y')[num_deleted == 1], _DATASTORE_CACHING_WARNING) self.redirect_with_message(message) return self.error(404)
def post(self): """Handle modifying actions and/or redirect to GET page.""" next_param = {} if self.request.get('action:flush'): if memcache.flush_all(): next_param['message'] = 'Cache flushed, all keys dropped.' else: next_param['message'] = 'Flushing the cache failed. Please try again.' elif self.request.get('action:display'): next_param['key'] = self.request.get('key') elif self.request.get('action:edit'): next_param['edit'] = self.request.get('key') elif self.request.get('action:delete'): key = self.request.get('key') result = memcache.delete(key) if result == memcache.DELETE_NETWORK_FAILURE: next_param['message'] = ('ERROR: Network failure, key "%s" not deleted.' % key) elif result == memcache.DELETE_ITEM_MISSING: next_param['message'] = 'Key "%s" not in cache.' % key elif result == memcache.DELETE_SUCCESSFUL: next_param['message'] = 'Key "%s" deleted.' % key else: next_param['message'] = ('Unknown return value. Key "%s" might still ' 'exist.' % key) elif self.request.get('action:save'): key = self.request.get('key') value = self.request.get('value') type_ = self.request.get('type') next_param['key'] = key try: if self._SetValue(key, type_, value): next_param['message'] = 'Key "%s" saved.' % key else: next_param['message'] = 'ERROR: Failed to save key "%s".' % key except ValueError, e: next_param['message'] = 'ERROR: Unable to encode value: %s' % e elif self.request.get('action:cancel'): next_param['key'] = self.request.get('key') else: next_param['message'] = 'Unknown action.' next = self.request.path_url if next_param: next = '%s?%s' % (next, self._urlencode(next_param)) self.redirect(next)
def post(self): """Handle modifying actions and/or redirect to GET page.""" next_param = {} if self.request.get('action:flush'): if memcache.flush_all(): next_param['message'] = 'Cache flushed, all keys dropped.' else: next_param['message'] = 'Flushing the cache failed. Please try again.' elif self.request.get('action:display'): next_param['key'] = self.request.get('key') elif self.request.get('action:edit'): next_param['edit'] = self.request.get('key') elif self.request.get('action:delete'): key = self.request.get('key') result = memcache.delete(key) if result == memcache.DELETE_NETWORK_FAILURE: next_param['message'] = ('ERROR: Network failure, key "%s" not deleted.' % key) elif result == memcache.DELETE_ITEM_MISSING: next_param['message'] = 'Key "%s" not in cache.' % key elif result == memcache.DELETE_SUCCESSFUL: next_param['message'] = 'Key "%s" deleted.' % key else: next_param['message'] = ('Unknown return value. Key "%s" might still ' 'exist.' % key) elif self.request.get('action:save'): key = self.request.get('key') value = self.request.get('value') type_ = self.request.get('type') next_param['key'] = key converter = self.FRIENDLY_TYPE_NAME_TO_CONVERTER[type_] try: memcache_value = converter.to_cache(value) except ValueError as e: next_param['message'] = 'ERROR: Failed to save key "%s": %s.' % (key, e) else: if self._set_memcache_value(key, memcache_value, converter.memcache_type): next_param['message'] = 'Key "%s" saved.' % key else: next_param['message'] = 'ERROR: Failed to save key "%s".' % key elif self.request.get('action:cancel'): next_param['key'] = self.request.get('key') else: next_param['message'] = 'Unknown action.' next = self.request.path_url if next_param: next = '%s?%s' % (next, self._urlencode(next_param)) self.redirect(next)