我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用shelve.open()。
def write():
    """Capture up to 100 webcam frames and store them in a shelf.

    Frames are JPEG-encoded before being stored under the ``'imgs'`` key;
    ``range(100)`` is stored under ``'data'``. Any previous shelf file at
    ``filename`` is removed first.
    """
    # A missing file (e.g. on the first run) is not an error.
    try:
        os.remove(filename)
    except OSError:
        pass

    cap = cv2.VideoCapture(0)
    try:
        db = shelve.open(filename)
        try:
            imgs = []
            data = range(100)
            for i in range(100):
                ret, frame = cap.read()
                if ret:
                    # Storing raw frames took ~29 MB; JPEG bytes take ~1.9 MB.
                    # tobytes() replaces the deprecated ndarray.tostring().
                    jpg = cv2.imencode('.jpg', frame)[1].tobytes()
                    imgs.append(jpg)
                    print('frame[{}] {}'.format(i, frame.shape))
                    time.sleep(0.03)
            db['imgs'] = imgs
            db['data'] = data
        finally:
            # Close the shelf even if capturing or encoding failed.
            db.close()
    finally:
        # Always give the camera back to the OS.
        cap.release()
def open(self):
    """Open the cache shelf once, read-write if the lock can be taken.

    Tries a non-blocking exclusive ``flock`` on ``<CACHE_PATH>.lock``. On
    success the shelf is opened in ``'c'`` mode; if the lock is held
    elsewhere the shelf is opened read-only (``'r'``). A failure to open in
    read-only mode is re-raised; in ``'c'`` mode one retry is attempted.
    """
    if self.opened:
        return

    self.lock = open(SETTINGS.CACHE_PATH + '.lock', 'ab')
    try:
        fcntl.flock(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
        mode = 'c'
    except IOError:
        logger.warning("Cache locked, using read-only")
        mode = 'r'
        self.lock.close()
        self.lock = None

    try:
        self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
    except Exception as e:
        if mode != 'c':
            raise
        logger.warning("Dropping corrupted cache on %s", e)
        # NOTE(review): this truncates the *lock* file, not the cache file,
        # before retrying with the same mode -- confirm that is intended.
        self.lock.truncate(0)
        self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)

    self.opened = True
def __init__(self, path_entry):
    """Accept ``path_entry`` only if it can be opened as a read-only shelf.

    Raises ImportError when a nested call is detected, and re-raises
    whatever ``shelve.open`` raised when the entry is not a valid shelf.
    """
    # Opening a shelf imports dbm, which re-enters the import machinery and
    # would recurse into this finder. While the probe below is running,
    # refuse nested requests so that another finder handles them.
    if ShelveFinder._maybe_recursing:
        raise ImportError

    try:
        # Probe the path entry: a valid shelf opens in read-only mode.
        ShelveFinder._maybe_recursing = True
        try:
            with shelve.open(path_entry, 'r'):
                pass
        finally:
            ShelveFinder._maybe_recursing = False
    except Exception as err:
        print('shelf could not import from {}: {}'.format(
            path_entry, err))
        raise

    print('shelf added to import path:', path_entry)
    self.path_entry = path_entry
def new_profile(profile_name, address, port, username, password, activate):
    """Create a network proxy configuration profile.

    The settings are stored in the shelf under ``str(profile_name)``. When
    ``activate`` is truthy the new profile is activated afterwards.
    """
    settings = {
        'address': address,
        'port': port,
        'username': username,
        'password': password
    }

    store = shelve.open(db_file)
    try:
        store[str(profile_name)] = settings
        click.echo("Profile '{}' successfully created".format(profile_name))
    finally:
        store.close()

    if activate:
        activate_profile(profile_name)
def activate_profile(profile_name):
    """Activate a stored proxy profile, or clear the proxy for ``""``.

    An empty ``profile_name`` writes ``None`` through ``util.write`` and
    forgets the active profile. Any other name (except the reserved key
    ``"active"``) is looked up in the shelf, written through ``util.write``
    and recorded under ``'active'``. Unknown names are reported to the user.
    """
    store = shelve.open(db_file)
    try:
        if profile_name == "":
            util.write(None)
            try:
                del store['active']
            except KeyError:
                # Nothing was active; previously this leaked to the outer
                # handler and printed a misleading "No such profile ''".
                pass
            click.echo("No proxy mode activated")
        elif profile_name != "active":
            # "active" is the bookkeeping key, not a profile. Compare by
            # value: ``is not`` on a string literal tests identity.
            util.write(store[str(profile_name)])
            store['active'] = str(profile_name)
            click.echo("Profile '{}' successfully activated".
                       format(profile_name))
    except KeyError:
        click.echo("No such profile '{}'".format(str(profile_name)))
    finally:
        store.close()
def delete_profile(profile_name):
    """Delete the specified profile.

    Also clears the ``'active'`` entry when it points at the deleted
    profile. The reserved key ``"active"`` itself is never deleted through
    this function. Unknown names are reported to the user.
    """
    store = shelve.open(db_file)
    try:
        # Compare by value: ``is not`` on a string literal tests identity
        # and would let the reserved key through.
        if profile_name != "active":
            del store[str(profile_name)]
            try:
                if str(store["active"]) == profile_name:
                    del store["active"]
            except KeyError:
                # No profile is currently active.
                pass
            click.echo("Profile '{}' successfully deleted".format(str(profile_name)))
    except KeyError:
        click.echo("No such profile '{}'".format(str(profile_name)))
    finally:
        store.close()
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    Returns the response body (gunzipped when the server sent
    ``content-encoding: gzip``) or the content of the local file ``url``.
    Raises RuntimeError for any other content encoding.
    """
    # Accept https as well; it used to be treated as a local file path.
    if url.startswith(('http://', 'https://')):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()

        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            data = StringIO(data)
            data = gzip.GzipFile(fileobj=data).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        with open(url, 'r') as fid:
            data = fid.read()

    return data
def get_data(url, gallery_dir):
    """Persistent dictionary usage to retrieve the search indexes.

    Looks ``url`` up in the shelf ``<gallery_dir>/searchindex``; on a miss
    the data is fetched with ``_get_data`` and stored before it is returned.
    """
    # shelve keys need to be str in python 2
    if sys.version_info[0] == 2 and isinstance(url, unicode):
        url = url.encode('utf-8')

    cached_file = os.path.join(gallery_dir, 'searchindex')
    search_index = shelve.open(cached_file)
    try:
        if url in search_index:
            data = search_index[url]
        else:
            data = _get_data(url)
            search_index[url] = data
    finally:
        # Close the shelf even when the fetch fails.
        search_index.close()

    return data
def save_game():
    """Write the game data to a new, empty shelf.

    The shelf is opened with flag ``'n'``, so an existing savegame is
    overwritten. The cursor is deactivated before anything is stored.
    """
    # The context manager closes the shelf; no explicit close() is needed.
    with shelve.open('savegames/savegame', 'n') as savefile:
        gv.cursor.deactivate()
        savefile['map'] = gv.game_map
        savefile['objects'] = gv.game_objects
        savefile['log'] = gv.game_log
        savefile['gamestate'] = gv.gamestate
        savefile['dlevel'] = gv.dungeon_level

        # Store the index of special objects, so they can be later restored
        # from the gv.game_objects array
        savefile['p_index'] = gv.game_objects.index(gv.player)
        savefile['c_index'] = gv.game_objects.index(gv.cursor)
        savefile['sd_index'] = gv.game_objects.index(gv.stairs_down)
        savefile['su_index'] = gv.game_objects.index(gv.stairs_up)
def load_game():
    """Load an existing savegame into ``gv`` and greet the player."""
    # (attribute on gv, key in the shelf)
    plain_fields = (
        ('game_map', 'map'),
        ('game_objects', 'objects'),
        ('game_log', 'log'),
        ('gamestate', 'gamestate'),
        ('dungeon_level', 'dlevel'),
    )
    # Special objects are stored as indices into gv.game_objects.
    indexed_fields = (
        ('player', 'p_index'),
        ('cursor', 'c_index'),
        ('stairs_down', 'sd_index'),
        ('stairs_up', 'su_index'),
    )

    with shelve.open('savegames/savegame', 'r') as savefile:
        for attr, key in plain_fields:
            setattr(gv, attr, savefile[key])
        for attr, key in indexed_fields:
            setattr(gv, attr, gv.game_objects[savefile[key]])

    msgbox('Welcome back stranger to level {0} of {1}!'.format(gv.dungeon_level, settings.DUNGEONNAME),width=35, text_color=colors.red)
def extract_all(data): resp = dict(data) # text = get_sanitized(data) text = data['text'] # request feature extraction. text_hash = hashlib.sha256(text.encode('ascii', 'ignore')).hexdigest() print 'text_hash', text_hash cache_db = shelve.open(path.join(CACHE_DIR, 'feature')) if not cache_db.has_key(text_hash): print 'new call' call = alchemyapi.combined('text', text) cache_db[text_hash] = call else: print 'cached call' call = cache_db[text_hash] cache_db.close() # filter results. whitelist = ['concepts', 'entities', 'keywords', 'taxonomy'] for key in whitelist: if key not in call: resp[key] = [] continue resp[key] = call[key] return resp
def proj_create(args, config, _extra_args):
    """Creates a new em-managed project.

    Clones the configured template repository into ``args.dest``, replaces
    its history with a fresh repository, makes sure the ``experiments`` and
    ``data`` directories exist and initialises the ``.em`` shelf.
    """
    template_url = config['project']['template_repo']
    try:
        pygit2.clone_repository(template_url, args.dest)
        # Drop the template's history and start from a clean repository.
        template_git_dir = osp.join(args.dest, '.git')
        shutil.rmtree(template_git_dir, ignore_errors=True)
        pygit2.init_repository(args.dest)
    except ValueError:
        # already in a repo
        pass

    for subdir in ('experiments', 'data'):
        target = osp.join(args.dest, subdir)
        if osp.isdir(target):
            continue
        os.mkdir(target)

    db_path = osp.join(args.dest, '.em')
    with shelve.open(db_path) as emdb:
        emdb['__em__'] = {}
def resume(args, config, prog_args):
    """Resume a stopped experiment.

    Looks ``args.name`` up in the ``.em`` shelf and bails out through
    ``_die`` when the experiment is unknown, when its record has a ``pid``
    or a ``status`` of ``'running'``, or when looking up a git branch of the
    same name raises ``pygit2.GitError``. Otherwise ``--resume`` (and
    ``args.epoch`` when set) is appended to ``prog_args`` and the job is
    started with ``_run_job``.
    """
    name = args.name

    repo = pygit2.Repository('.')

    # NOTE(review): the extent of the ``with`` block was reconstructed from
    # flattened source -- confirm against the original layout.
    with shelve.open('.em') as emdb:
        if name not in emdb:
            return _die(E_NO_EXP.format(name))
        info = emdb[name]
        # A recorded pid or a 'running' status means it must not be resumed.
        if 'pid' in info or info.get('status') == 'running':
            return _die(E_IS_RUNNING.format(name))
        try:
            repo.lookup_branch(name)
        except pygit2.GitError:
            return _die(E_NO_EXP.format(name))

    # Note: prog_args is extended in place.
    prog_args.append('--resume')
    if args.epoch:
        prog_args.append(args.epoch)

    return _run_job(name, config, args.gpu, prog_args, args.background)
def list_experiments(args, _config, _extra_args):
    """List experiments.

    Prints the experiment names stored in the ``.em`` shelf (minus the
    bookkeeping key ``EM_KEY``) through the ``column`` utility. When
    ``args.filter`` is given as ``key=value`` only experiments whose record
    has that exact key/value pair are listed.
    """
    import subprocess

    if args.filter:
        # Split on the first '=' only so the value may itself contain '='.
        filter_key, filter_value = args.filter.split('=', 1)

        def _filt(stats):
            return filter_key in stats and stats[filter_key] == filter_value

    with shelve.open('.em') as emdb:
        if args.filter:
            names = {name for name, info in emdb.items() if _filt(info)}
        else:
            # Materialise the names while the shelf is still open.
            names = set(emdb.keys())

    names.discard(EM_KEY)
    if not names:
        return

    subprocess.run(
        ['column'], input='\n'.join(sorted(names)) + '\n', encoding='utf8')
def show(args, _config, _extra_args):
    """Show details about an experiment.

    Prints every field of the experiment's record, sorted by field name, and
    with ``args.opts`` set also pretty-prints the pickled run options.
    """
    import pickle
    import pprint

    name = args.name

    with shelve.open('.em') as emdb:
        if name not in emdb or name == EM_KEY:
            return _die(E_NO_EXP.format(name))
        for field, value in sorted(emdb[name].items()):
            # Dates are shown in their ctime() form.
            shown = value.ctime() if isinstance(value, datetime.date) else value
            print(f'{field}: {shown}')

    if args.opts:
        opts_path = _expath(name, 'run', 'opts.pkl')
        with open(opts_path, 'rb') as f_opts:
            print('\noptions:')
            opts = pickle.load(f_opts)
        # Use the terminal width, falling back to 80 columns.
        width = shutil.get_terminal_size((80, 20)).columns
        pprint.pprint(vars(opts), indent=2, compact=True, width=width)
def get_pdf(pdf_link):
    """Download ``pdf_link`` into ``tmp_dir`` unless it is already known.

    Every outcome (already downloaded, success, failure) is reported through
    ``log_download``; download errors are logged, not raised.
    """
    # The file name is the last path component of the link.
    pdf_name = pdf_link.rsplit('/', 1)[-1]

    # Skip anything already recorded in permanent storage, by name or link.
    if check_db(pdf_name) or check_db(pdf_link):
        log_download('File already downloaded: {}'.format(pdf_name))
        return

    try:
        opener = urllib2.build_opener()
        opener.addheaders = [('User-agent', USER_AGENT)]
        response = opener.open(pdf_link)
        target = tmp_dir + pdf_name
        with open(target, "wb") as out_file:
            out_file.write(response.read())
        # log successful download:
        log_download('DOWNLOADED: {}'.format(pdf_link))
    except Exception as e:
        log_download('FAILURE: {} | {}'.format(pdf_link, e))
def _get_data(url):
    """Return the UTF-8 decoded text behind ``url``.

    ``url`` is fetched over http(s) when it has such a scheme (the body is
    gunzipped for ``content-encoding: gzip``); otherwise it is read as a
    local file. Raises RuntimeError for any other content encoding.
    """
    scheme = urllib_parse.urlparse(url).scheme
    if scheme not in ('http', 'https'):
        # Anything that is not http(s) is treated as a local path.
        with codecs.open(url, mode='r', encoding='utf-8') as fid:
            return fid.read()

    resp = urllib_request.urlopen(url)
    encoding = resp.headers.get('content-encoding', 'plain')
    payload = resp.read()

    if encoding == 'gzip':
        payload = gzip.GzipFile(fileobj=BytesIO(payload)).read()
    elif encoding != 'plain':
        raise RuntimeError('unknown encoding')

    return payload.decode('utf-8')
def fill_tf_idf_shelve(self):
    """Add term and document frequencies of new triggers to the shelf.

    Ensures the ``TF``, ``DF``, ``D``, ``TF_IDF`` and ``CENTROID`` entries
    exist, then for every action in ``self.trigger_dict`` not yet present in
    ``TF`` stores the token counts of its trigger text and bumps the
    document frequency of each unique token. ``D`` is set to the number of
    actions in ``TF``. Finally the tf-idf values and centroids are
    recomputed.
    """
    tf_idf_shelve = shelve.open(self.tf_idf_shelve_file_name, writeback=True)
    try:
        for key, empty in ((TF, {}), (DF, {}), (D, 0),
                           (TF_IDF, {}), (CENTROID, {})):
            if key not in tf_idf_shelve:
                tf_idf_shelve[key] = empty

        # With writeback=True these are the cached objects; in-place changes
        # are written back when the shelf is closed.
        term_freqs = tf_idf_shelve[TF]
        doc_freqs = tf_idf_shelve[DF]

        for action, trigger_txt in self.trigger_dict.iteritems():
            # Membership on the dict itself avoids building a key list.
            if action not in term_freqs:
                trigger = self.tokenize_text(trigger_txt)
                term_freqs[action] = Counter(trigger)
                for word in unique(trigger):
                    doc_freqs[word] = doc_freqs.get(word, 0) + 1

        tf_idf_shelve[D] = len(term_freqs)
    finally:
        # Close the shelf even when tokenizing fails.
        tf_idf_shelve.close()

    self.compute_tf_idf()
    self.compute_centroids()
def add_list_of_words_in_w2v_model(self, unknown_words): huge_w2v_model_file = open(self.w2v_huge_model_path, "r") current_w2v_model_file = open(self.w2v_model_path, "a") line = huge_w2v_model_file.readline() unknown_words_left = len(unknown_words) while line and unknown_words_left: word = line.split()[0] if word in unknown_words: current_w2v_model_file.write(line) unknown_words = unknown_words - set([word]) unknown_words_left -= 1 line = huge_w2v_model_file.readline() for word in list(unknown_words): random_position = random(self.w2v_model.vector_size)*2-1 current_w2v_model_file.write(" ".join(([word]+[str(x) for x in random_position]))) print "warning random positions introduced for new words ... in the future this should be solved" current_w2v_model_file.close() huge_w2v_model_file.close()
def list_files(self, path):
    """Return the stored file records as CSV text, sorted by file name.

    Reads every record from the shelf named by ``self.args.storage``
    (located next to this module), adds its key as ``'hash'`` and renders
    the records with a header row. ``path`` is not used.
    """
    storage = self.args.storage
    shelve_name = os.path.join(os.path.dirname(__file__), storage)

    db = shelve.open(shelve_name)
    try:
        status = []
        for key, value in db.items():
            value['hash'] = key
            status.append(value)
    finally:
        # The shelf used to be left open.
        db.close()

    status.sort(key=lambda params: params['filename'])

    stream = StringIO.StringIO()
    writer = csv.writer(stream)
    # Six columns per row: the header used to omit CAMERA, which shifted
    # every later heading onto the wrong column.
    writer.writerow(('FILENAME', 'GIVEN_NAME', 'CAMERA', 'CREATED_ON',
                     'DISCOVERED_ON', 'UPLOADED_ON'))
    for params in status:
        writer.writerow((params['filename'], params['given_name'],
                         params['camera'], params['timestamp'],
                         params['discovered_on'], params['uploaded_on']))
    return stream.getvalue()
def read():
    """Replay the JPEG frames stored in the shelf in an OpenCV window.

    Reads the ``'imgs'`` and ``'data'`` entries, prints each index with its
    data value, decodes each image and shows it for 300 ms.
    """
    db = shelve.open(filename)
    try:
        imgs = db['imgs']
        data = db['data']

        for i, (img, d) in enumerate(zip(imgs, data)):
            print(i, d)
            # frombuffer replaces the deprecated np.fromstring for raw bytes.
            img = np.frombuffer(img, np.uint8)
            frame = cv2.imdecode(img, 1)
            print('frame[{}] {}'.format(i, frame.shape))
            cv2.imshow('camera', frame)
            cv2.waitKey(300)

        print('bye ...')
        cv2.destroyAllWindows()
    finally:
        # Close the shelf even when decoding or display fails.
        db.close()
def get_latest_episode(self, url, media=False): storage_path = join(self.file_system.path, 'feedcache') LOGGER.debug("storage_path:%s" % storage_path) storage = shelve.open(storage_path) ttl = 60 * 60 link = "" try: fc = cache.Cache(storage, timeToLiveSeconds=ttl) parsed_data = fc.fetch(url) print "parsed_data.feed.title:", parsed_data.feed.title for entry in parsed_data.entries: pprint(entry) if media: media_content = entry.media_content if media_content: link = entry.media_content[0]['url'] else: link = entry.link if link: break finally: storage.close() return link
def dump(self, result_storage):
    """Write every entry of ``result_storage`` to the answer shelf.

    Does nothing when ``self.answer_name`` is None or when the storage is
    flagged ``'tainted'``.
    """
    # 'tainted' is set to True when the dataset required for an answer test
    # is missing (see can_run_ds() and can_run_sim()); skipping here avoids
    # creating a shelf with empty answers.
    tainted = result_storage.get('tainted', False)
    if tainted or self.answer_name is None:
        return

    # Store data using shelve
    answers = shelve.open(self.answer_name, protocol=-1)
    for ds_name in result_storage:
        key = "%s" % ds_name
        if key in answers:
            mylog.info("Overwriting %s", key)
        answers[key] = result_storage[ds_name]
    answers.close()
def val_dump(rels, db):
    """
    Build a ``Valuation`` from relation metadata bundles and write it to a
    freshly created persistent database.

    :param rels: bundle of metadata needed for constructing a concept
    :type rels: list of dict
    :param db: name of file to which data is written.
               The suffix '.db' will be automatically appended.
    :type db: str
    """
    valuation = make_valuation(process_bundle(rels).values(), read=True)
    # Flag 'n' always creates a new, empty database.
    shelf = shelve.open(db, 'n')
    shelf.update(valuation)
    shelf.close()
def label_indivs(valuation, lexicon=False):
    """
    Add an individual constant for every individual in a ``Valuation``'s
    domain.

    Given a valuation with an entry of the form ``{'rel': {'a': True}}``,
    add a new entry ``{'a': 'a'}``. With ``lexicon`` set, the lexicon built
    from the domain is also written to ``chat_pnames.cfg``.

    :type valuation: Valuation
    :rtype: Valuation
    """
    individuals = valuation.domain

    if lexicon:
        with open("chat_pnames.cfg", 'w') as outfile:
            outfile.writelines(make_lex(individuals))

    # Each individual is labelled with its own string.
    valuation.update([(ind, ind) for ind in individuals])
    return valuation
def auth(self):
    """Prompt the peer on ``self.request`` for credentials and check them.

    Loads the ``"data"`` entry of the shelf, asks for a user name and a
    password over the connection and passes them to ``UserVerify.login``.
    Returns True when the login result is truthy, False otherwise.
    """
    # NOTE(review): the shelf is never closed in this method.
    connect_database = shelve.open("../database/database")
    database = connect_database.get("data")
    verify = UserVerify(database)
    conn = self.request
    # NOTE(review): no-op expression; looks like a leftover.
    str()
    # NOTE(review): the "?" prompts look like mis-encoded non-ASCII text.
    # bytes(text) without an encoding only works on Python 2.
    print(bytes(u"??????"))
    conn.send(bytes("??????:"))
    username = conn.recv(1024)
    conn.send(bytes("?????:"))
    password = conn.recv(1024)
    login = None
    if username and password:
        try:
            login = verify.login(user=username, password=password)
        except SystemExit as e:
            # NOTE(review): nesting reconstructed from flattened source --
            # presumably the connection is closed only on a failed login.
            print(e)
            conn.close()
    if not login:
        return False
    else:
        return True
def __init__(self, db_path):
    """Open the shelf at ``db_path`` (created if missing) and set up state.

    Also creates a ``threading.Lock`` and clears the ``stopped`` flag.
    """
    self.db_path = db_path
    self.db = shelve.open(self.db_path, flag='c')
    self.lock, self.stopped = threading.Lock(), False
def __init__(self, filename, flag='c', protocol=None, writeback=False):
    """Initialise the shelf on top of ``anydbm.open(filename, flag)``."""
    # Imported here rather than at module level, as in the original.
    import anydbm
    backing_db = anydbm.open(filename, flag)
    Shelf.__init__(self, backing_db, protocol, writeback)
def open(filename, flag='c', protocol=None, writeback=False):
    """Open a persistent dictionary for reading and writing.

    ``filename`` is the base filename of the underlying database; as a
    side-effect an extension may be added to it and more than one file may
    be created. ``flag`` has the same meaning as the flag parameter of
    anydbm.open(). ``protocol`` selects the version of the pickle protocol
    (0, 1, or 2).

    See the module's __doc__ string for an overview of the interface.
    """
    shelf = DbfilenameShelf(filename, flag, protocol, writeback)
    return shelf
def parse(self):
    """Parse the recipes and then the user config files.

    Raises ParseError when there is no ``recipes`` directory or when a
    configured ``<name>.yaml`` file does not exist. The cache is opened for
    the duration of the parse and always closed afterwards.
    """
    if not os.path.isdir("recipes"):
        raise ParseError("No recipes directory found.")

    self.__cache.open()
    try:
        self.__parse()

        # config files overrule everything else
        for entry in self.__configFiles:
            configFile = str(entry) + ".yaml"
            if os.path.isfile(configFile):
                self.__parseUserConfig(configFile)
            else:
                raise ParseError("Config file {} does not exist!".format(configFile))
    finally:
        self.__cache.close()
def __generatePackages(self, nameFormatter, env, cacheKey, sandboxEnabled):
    """Return the root package, from the on-disk cache when it is valid.

    The cache file starts with ``cacheKey``; when the stored key matches,
    the pickled package tree after it is loaded and returned. Otherwise the
    packages are calculated from the root recipe and the result is written
    back to the cache for the next invocation.
    """
    # use separate caches with and without sandbox
    if sandboxEnabled:
        cacheName = ".bob-packages-sb.pickle"
    else:
        cacheName = ".bob-packages.pickle"

    # try to load the persisted packages
    states = { n:s() for (n,s) in self.__states.items() }
    rootPkg = Package()
    rootPkg.construct("<root>", [], nameFormatter, None, [], [], states,
        {}, {}, None, None, [], {}, -1)
    try:
        with open(cacheName, "rb") as f:
            # The file is prefixed with the key it was generated for.
            persistedCacheKey = f.read(len(cacheKey))
            if cacheKey == persistedCacheKey:
                tmp = PackageUnpickler(f, self.getRecipe, self.__plugins,
                                       nameFormatter).load()
                return tmp.toStep(nameFormatter, rootPkg).getPackage()
    except (EOFError, OSError, pickle.UnpicklingError):
        # A missing, truncated or unreadable cache means: recalculate.
        pass

    # not cached -> calculate packages
    result = self.__rootRecipe.prepare(nameFormatter, env, sandboxEnabled, states)[0]

    # save package tree for next invocation
    tmp = CoreStepRef(rootPkg, result.getPackageStep())
    try:
        # Write to a temporary name and rename it over the cache file, so
        # the old cache is only replaced after the new one is written.
        newCacheName = cacheName + ".new"
        with open(newCacheName, "wb") as f:
            f.write(cacheKey)
            PackagePickler(f, nameFormatter).dump(tmp)
        os.replace(newCacheName, cacheName)
    except OSError as e:
        # Failing to persist the cache is reported but not fatal.
        print("Error saving internal state:", str(e), file=sys.stderr)

    return result
def open(self):
    """Open the ``.bob-cache.shelve`` shelf and reset the file table."""
    self.__shelve = shelve.open(".bob-cache.shelve")
    self.__files = dict()
def loadYaml(self, name, yamlSchema, default):
    """Load and validate the YAML file ``name``, using the shelf as cache.

    A cached entry is used when its stored ``binStat`` value equals the
    current one and its ``'vsn'`` equals ``BOB_INPUT_HASH``. Otherwise the
    file is parsed with ``yaml.safe_load``, an empty document is replaced by
    ``default``, the data is validated against ``yamlSchema`` and the result
    is cached. The file's digest is recorded in either case.

    Raises ParseError when the file cannot be parsed or fails validation.
    """
    bs = binStat(name)
    if name in self.__shelve:
        cached = self.__shelve[name]
        # Entries without a matching 'vsn' are treated as stale.
        if ((cached['lstat'] == bs) and
            (cached.get('vsn') == BOB_INPUT_HASH)):
            self.__files[name] = cached['digest']
            return cached['data']

    with open(name, "r") as f:
        try:
            rawData = f.read()
            data = yaml.safe_load(rawData)
            # SHA-1 of the UTF-8 encoded file content.
            digest = hashlib.sha1(rawData.encode('utf8')).digest()
        except Exception as e:
            raise ParseError("Error while parsing {}: {}".format(name, str(e)))

    # An empty YAML document loads as None.
    if data is None: data = default

    try:
        data = yamlSchema.validate(data)
    except schema.SchemaError as e:
        raise ParseError("Error while validating {}: {}".format(name, str(e)))

    self.__files[name] = digest
    self.__shelve[name] = {
        'lstat' : bs,
        'data' : data,
        'vsn' : BOB_INPUT_HASH,
        'digest' : digest
    }
    return data
def loadBinary(self, name):
    """Return the bytes of file ``name`` and record their SHA-1 digest."""
    with open(name, "rb") as f:
        content = f.read()
    digest = hashlib.sha1(content).digest()
    self.__files[name] = digest
    return content
def build_prj1_code_graph(self):
    """Build the prj1 scitools project and store it as ``'code_graph'``.

    Only prints a hint when the understand project does not exist yet.
    """
    if not self.prj1_scitools_client.project_exists():
        print('understand project does not exist, '
              'first run "$ prj1 understand --build"')
        return

    db_path = str(self.shelve_prj1_code_db_path)
    with shelve.open(db_path) as db:
        self.prj1_scitools_client.open_project()
        project = self.prj1_scitools_client.build_project(
            self.prj1_code_repo_path)
        self.prj1_scitools_client.close_project()
        db['code_graph'] = project
        print('loaded scitools project of size',
              len(project.code_graph))
        print('entity kinds:', project.entity_kinds)
        print('ref kinds:', project.ref_kinds)
def build_git_graph(self):
    """Build the commit graph of the git repo and store it in the shelf.

    The graph, including the commit tree of ``origin/master``, is saved
    under ``'git_graph'`` and kept on ``self.git_graph``.
    """
    db_path = str(self.shelve_db_path)
    with shelve.open(db_path) as db:
        client = GitClient(self.git_repo_path)
        graph = client.build_commit_graph()
        client.add_commit_tree(graph, ref_name='origin/master')
        db['git_graph'] = graph
        self.git_graph = graph
def load_git_graph(self):
    """Load ``'git_graph'`` from the shelf into ``self.git_graph``.

    Returns the loaded graph, or None when the shelf has no such entry.
    """
    with shelve.open(str(self.shelve_db_path)) as db:
        # Shelf.get returns None for a missing key.
        self.git_graph = db.get('git_graph')
    return self.git_graph
def build_code_graph(self):
    """Build the scitools project and store it as ``'code_graph'``.

    Only prints a hint when the understand project does not exist yet.
    Otherwise the project is built from ``self.git_repo_path``, kept on
    ``self.scitools_project``, saved in the shelf and summarised on stdout.
    """
    if not self.scitools_client.project_exists():
        print('understand project does not exist, '
              'first run "$ povray understand --build"')
    else:
        with shelve.open(str(self.shelve_db_path)) as db:
            self.scitools_client.open_project()
            self.scitools_project = self.scitools_client.build_project(
                self.git_repo_path)
            self.scitools_client.close_project()
            db['code_graph'] = self.scitools_project
            print('loaded scitools project of size',
                  len(self.scitools_project.code_graph))
            print('entity kinds:', self.scitools_project.entity_kinds)
            # This line used to print entity_kinds a second time.
            print('ref kinds:', self.scitools_project.ref_kinds)
def load_code_graph(self) -> ScitoolsProject:
    """Load ``'code_graph'`` from the shelf into ``self.scitools_project``.

    Returns the loaded project, or None when the shelf has no such entry.
    """
    with shelve.open(str(self.shelve_db_path)) as db:
        # Shelf.get returns None for a missing key.
        self.scitools_project = db.get('code_graph')
    return self.scitools_project
def test_unshelve_similarity_graph(data_root):
    """The shelved similarity graph loads and can group similar actors."""
    shelf_path = str(data_root / 'test-similarity_graph.shelve')
    with shelve.open(shelf_path) as db:
        graph = db['similarity_graph']
    assert graph is not None
    # Blank line to separate the dump from the test runner's output.
    print()
    pprint(graph.group_similar_actors())
def load_game(mySaveNum):
    """Load save slot ``mySaveNum`` and continue in the main menu.

    The stored collections are restored into the module-level names, and
    the main menu is entered with the saved league and club.
    """
    # Without this declaration the loaded collections were bound to unused
    # locals and discarded. These are the module-level names that
    # save_game() writes out.
    global cities, clubs, humans, leagues, managers, chairmans

    myShelfFile = "./save"+str(mySaveNum)
    print("Save file: {0}".format(myShelfFile))
    myOpenShelf = shelve.open(myShelfFile)
    try:
        cities = myOpenShelf['cities']
        clubs = myOpenShelf['clubs']
        humans = myOpenShelf['humans']
        leagues = myOpenShelf['leagues']
        managers = myOpenShelf['managers']
        chairmans = myOpenShelf['chairmans']
        # NOTE(review): currentCity is read but never used or published;
        # confirm where the current city is supposed to be restored.
        currentCity = myOpenShelf['currentCity']
        chosenClub = myOpenShelf['chosenClub']
        chosenLeague = myOpenShelf['chosenLeague']
    finally:
        # Close the shelf even when the save file lacks a key.
        myOpenShelf.close()
    menuMain(chosenLeague, chosenClub)
def save_game(myChosenLeague, myChosenClub, mySaveNum=0, myCurrentCity=0):
    """Write the game state to save slot ``mySaveNum``.

    Stores the module-level collections together with the chosen league,
    the chosen club and the current city in the shelf ``./save<mySaveNum>``.
    """
    myShelfFile = "./save"+str(mySaveNum)
    print("Save file: {0}".format(myShelfFile))
    myOpenShelf = shelve.open(myShelfFile)
    try:
        myOpenShelf['cities'] = cities
        myOpenShelf['clubs'] = clubs
        myOpenShelf['humans'] = humans
        myOpenShelf['leagues'] = leagues
        myOpenShelf['managers'] = managers
        myOpenShelf['chairmans'] = chairmans
        myOpenShelf['currentCity'] = myCurrentCity
        myOpenShelf['chosenClub'] = myChosenClub
        myOpenShelf['chosenLeague'] = myChosenLeague
    finally:
        # Close the shelf even when one of the objects cannot be pickled.
        myOpenShelf.close()
def get(self, *a, **kw):
    """Open the cache if needed, then defer to the parent class's ``get``."""
    self.open()
    parent = super(FileCache, self)
    return parent.get(*a, **kw)
def save(self):
    """Open the cache if needed and sync its storage."""
    self.open()
    storage = self.storage
    storage.sync()
    logger.debug("Saved %s.", SETTINGS.CACHE_PATH)