我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pickle.HIGHEST_PROTOCOL。
def lcdict_to_pickle(lcdict, outfile=None): '''This just writes the lcdict to a pickle. If outfile is None, then will try to get the name from the lcdict['objectid'] and write to <objectid>-hptxtlc.pkl. If that fails, will write to a file named hptxtlc.pkl'. ''' if not outfile and lcdict['objectid']: outfile = '%s-hplc.pkl' % lcdict['objectid'] elif not outfile and not lcdict['objectid']: outfile = 'hplc.pkl' with open(outfile,'wb') as outfd: pickle.dump(lcdict, outfd, protocol=pickle.HIGHEST_PROTOCOL) if os.path.exists(outfile): LOGINFO('lcdict for object: %s -> %s OK' % (lcdict['objectid'], outfile)) return outfile else: LOGERROR('could not make a pickle for this lcdict!') return None
def register(name): # hit api to see if name is already registered if check_name(name)['status'] == 'error': print('{} already registered.'.format(name)) else: # generate new keypair (pub, priv) = rsa.newkeys(512) if os.path.exists(KEY_LOCATION) == False: os.mkdir(KEY_LOCATION) # save to disk with open('{}/.key'.format(KEY_LOCATION), 'wb') as f: pickle.dump((pub, priv), f, pickle.HIGHEST_PROTOCOL) r = requests.post('{}/names'.format(API_LOCATION), data = {'name' : name, 'n' : pub.n, 'e' : pub.e}) if r.json()['status'] == 'success': print('Successfully registered new name: {}'.format(name)) else: print('Error registering name: {}'.format(name))
def get_item_history(self, prior_or_train, reconstruct = False, none_idx = 49689): filepath = self.cache_dir + './item_history_' + prior_or_train + '.pkl' if (not reconstruct) and os.path.exists(filepath): with open(filepath, 'rb') as f: item_history = pickle.load(f) else: up = self.get_users_orders(prior_or_train).sort_values(['user_id', 'order_number', 'product_id'], ascending = True) item_history = up.groupby(['user_id', 'order_number'])['product_id'].apply(list).reset_index() item_history.loc[item_history.order_number == 1, 'product_id'] = item_history.loc[item_history.order_number == 1, 'product_id'] + [none_idx] item_history = item_history.sort_values(['user_id', 'order_number'], ascending = True) # accumulate item_history['product_id'] = item_history.groupby(['user_id'])['product_id'].transform(pd.Series.cumsum) # get unique item list item_history['product_id'] = item_history['product_id'].apply(set).apply(list) item_history = item_history.sort_values(['user_id', 'order_number'], ascending = True) # shift each group to make it history item_history['product_id'] = item_history.groupby(['user_id'])['product_id'].shift(1) for row in item_history.loc[item_history.product_id.isnull(), 'product_id'].index: item_history.at[row, 'product_id'] = [none_idx] item_history = item_history.sort_values(['user_id', 'order_number'], ascending = True).groupby(['user_id'])['product_id'].apply(list).reset_index() item_history.columns = ['user_id', 'history_items'] with open(filepath, 'wb') as f: pickle.dump(item_history, f, pickle.HIGHEST_PROTOCOL) return item_history
def run(self): """ Entry point for the live plotting when started as a separate process. This starts the loop """ self.entity_name = current_process().name plogger.info("Starting new thread %s", self.entity_name) self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) self.socket.connect("tcp://localhost:%d" % self.port) topic = pickle.dumps(self.var_name, protocol=pickle.HIGHEST_PROTOCOL) self.socket.setsockopt(zmq.SUBSCRIBE, topic) plogger.info("Subscribed to topic %s on port %d", self.var_name, self.port) self.init(**self.init_kwargs) # Reference to animation required so that GC doesn't clean it up. # WILL NOT work if you remove it!!!!! # See: http://matplotlib.org/api/animation_api.html ani = animation.FuncAnimation(self.fig, self.loop, interval=100) self.plt.show()
def gt_roidb(self): """ Return the database of ground-truth regions of interest. This function loads/saves from/to a cache file to speed up future calls. """ cache_file = os.path.join(self.cache_path, self.name + '_gt_roidb.pkl') if os.path.exists(cache_file): with open(cache_file, 'rb') as fid: try: roidb = pickle.load(fid) except: roidb = pickle.load(fid, encoding='bytes') print('{} gt roidb loaded from {}'.format(self.name, cache_file)) return roidb gt_roidb = [self._load_pascal_annotation(index) for index in self.image_index] with open(cache_file, 'wb') as fid: pickle.dump(gt_roidb, fid, pickle.HIGHEST_PROTOCOL) print('wrote gt roidb to {}'.format(cache_file)) return gt_roidb
def set(self, key, value, timeout=None): if timeout is None: timeout = int(time() + self.default_timeout) elif timeout != 0: timeout = int(time() + timeout) filename = self._get_filename(key) self._prune() try: fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix, dir=self._path) with os.fdopen(fd, 'wb') as f: pickle.dump(timeout, f, 1) pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) rename(tmp, filename) os.chmod(filename, self._mode) except (IOError, OSError): return False else: return True
def _dump_cache_data(self, simstate, dump_fp=None): if self.tracer.predecessors[-1] != None: state = self.tracer.predecessors[-1] else: state = None if dump_fp: proj = state.project state.project = None state.history.trim() try: pickle.dump((self.tracer.bb_cnt, self.tracer.cgc_flag_bytes, state, claripy.ast.base.var_counter), dump_fp, pickle.HIGHEST_PROTOCOL) except RuntimeError as e: # maximum recursion depth can be reached here l.error("unable to cache state, '%s' during pickling", e.message) finally: state.project = proj # unhook receive receive.cache_hook = None # add preconstraints to tracer self.tracer._preconstrain_state(simstate)
def __init__(self, writer, reducers=None, protocol=pickle.HIGHEST_PROTOCOL): pickle.Pickler.__init__(self, writer, protocol=protocol) self.extended_init = set() if reducers is None: reducers = {} if hasattr(pickle.Pickler, 'dispatch'): # Make the dispatch registry an instance level attribute instead of # a reference to the class dictionary under Python 2 self.dispatch = pickle.Pickler.dispatch.copy() else: # Under Python 3 initialize the dispatch table with a copy of the # default registry self.dispatch_table = copyreg.dispatch_table.copy() for type, reduce_func in reducers.items(): self.register(type, reduce_func)
def _save(self): """ Export the data to a more permanent location. """ log.debug("Writing cache to file...") data = { 'pokemon_hist': self._pokemon_hist, 'pokestop_hist': self._pokestop_hist, 'gym_team': self._gym_team, 'gym_info': self._gym_info, 'egg_hist': self._egg_hist, 'raid_hist': self._raid_hist } log.debug(self._pokestop_hist) log.debug("SAVED: {}".format(data)) try: with portalocker.Lock(self._file, timeout=5, mode="wb+") as f: pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) except Exception as e: log.error("Encountered error while saving cache: {}: {}".format(type(e).__name__, e)) log.debug("Stack trace: \n {}".format(traceback.format_exc()))
def set(self, key, value, timeout=None): timeout = self._normalize_timeout(timeout) filename = self._get_filename(key) self._prune() try: fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix, dir=self._path) with os.fdopen(fd, 'wb') as f: pickle.dump(timeout, f, 1) pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) rename(tmp, filename) os.chmod(filename, self._mode) except (IOError, OSError): return False else: return True
def test_discover_with_init_module_that_raises_SkipTest_on_import(self): vfs = {abspath('/foo'): ['my_package'], abspath('/foo/my_package'): ['__init__.py', 'test_module.py']} self.setup_import_issue_package_tests(vfs) import_calls = [] def _get_module_from_name(name): import_calls.append(name) raise unittest.SkipTest('skipperoo') loader = unittest.TestLoader() loader._get_module_from_name = _get_module_from_name suite = loader.discover(abspath('/foo')) self.assertIn(abspath('/foo'), sys.path) self.assertEqual(suite.countTestCases(), 1) result = unittest.TestResult() suite.run(result) self.assertEqual(len(result.skipped), 1) self.assertEqual(result.testsRun, 1) self.assertEqual(import_calls, ['my_package']) # Check picklability for proto in range(pickle.HIGHEST_PROTOCOL + 1): pickle.loads(pickle.dumps(suite, proto))
def testPickle(self): # Issue 10326 # Can't use TestCase classes defined in Test class as # pickle does not work with inner classes test = unittest2.TestCase('run') for protocol in range(pickle.HIGHEST_PROTOCOL + 1): # blew up prior to fix pickled_test = pickle.dumps(test, protocol=protocol) unpickled_test = pickle.loads(pickled_test) self.assertEqual(test, unpickled_test) # exercise the TestCase instance in a way that will invoke # the type equality lookup mechanism unpickled_test.assertEqual(set(), set())
def cache_it(self, key, f, time_expire): if self.debug: self.r_server.incr('web2py_cache_statistics:misses') cache_set_key = self.cache_set_key expire_at = int(time.time() + time_expire) + 120 bucket_key = "%s:%s" % (cache_set_key, expire_at / 60) value = f() value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL) if time_expire == 0: time_expire = 1 self.r_server.setex(key, time_expire, value_) # print '%s will expire on %s: it goes in bucket %s' % (key, time.ctime(expire_at)) # print 'that will expire on %s' % (bucket_key, time.ctime(((expire_at / 60) + 1) * 60)) p = self.r_server.pipeline() # add bucket to the fixed set p.sadd(cache_set_key, bucket_key) # sets the key p.setex(key, time_expire, value_) # add the key to the bucket p.sadd(bucket_key, key) # expire the bucket properly p.expireat(bucket_key, ((expire_at / 60) + 1) * 60) p.execute() return value
def test_pickle(self): import pickle self.compile("mypoint.capnp", """ @0xbf5147cbbecf40c1; struct Point { x @0 :Int64; y @1 :Int64; } """) mypoint = self.import_('mypoint') p1 = mypoint.Point(1, 2) for proto in (0, pickle.HIGHEST_PROTOCOL): s = pickle.dumps(p1, proto) p2 = pickle.loads(s) assert p2.x == 1 assert p2.y == 2
def set(self, key, value, timeout=None): if timeout is None: timeout = self.default_timeout filename = self._get_filename(key) self._prune() try: fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix, dir=self._path) f = os.fdopen(fd, 'wb') try: pickle.dump(int(time() + timeout), f, 1) pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) finally: f.close() rename(tmp, filename) os.chmod(filename, self._mode) except (IOError, OSError): pass
def set(self, key, value, timeout=None): if timeout is None: timeout = self.default_timeout filename = self._get_filename(key) self._prune() try: fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix, dir=self._path) with os.fdopen(fd, 'wb') as f: pickle.dump(int(time() + timeout), f, 1) pickle.dump(value, f, pickle.HIGHEST_PROTOCOL) rename(tmp, filename) os.chmod(filename, self._mode) except (IOError, OSError): return False else: return True
def remap_start( paths, use_json=False, ): filepath_remap = "bam_remap.data" for p in paths: if not os.path.exists(p): fatal("Path %r not found!" % p) paths = [p.encode('utf-8') for p in paths] if os.path.exists(filepath_remap): fatal("Remap in progress, run with 'finish' or remove %r" % filepath_remap) from bam.blend import blendfile_path_remap remap_data = blendfile_path_remap.start( paths, use_json=use_json, ) with open(filepath_remap, 'wb') as fh: import pickle pickle.dump(remap_data, fh, pickle.HIGHEST_PROTOCOL) del pickle
def timing_experiments(TCC_dist, TCC_dls_dist, num_cells, distribution_flname, distance_flname): num_processes=1 distance_time=[] distance_time_dls=[] for num in num_cells: TCC_dist_short= TCC_dist[0:num, :] TCC_dls_dist_short= TCC_dls_dist[0:num, :] dist_flname='timing_exp/'+ distribution_flname + str(num) +'.dat' dist_dls_flname= 'timing_exp/'+ distribution_flname + '_dls_' + str(num) +'.dat' distan_flname='timing_exp/'+ distance_flname + str(num) +'.dat' distan_dls_flname= 'timing_exp/'+ distance_flname + '_dls_' + str(num) +'.dat' with open(dist_flname , 'wb') as outfile: pickle.dump(scipy.sparse.csr_matrix(TCC_dist_short.todense()), outfile, pickle.HIGHEST_PROTOCOL) with open(dist_dls_flname , 'wb') as outfile: pickle.dump(scipy.sparse.csr_matrix(TCC_dls_dist_short.todense()), outfile, pickle.HIGHEST_PROTOCOL) t=time() os.system('python get_pairwise_distances.py '+dist_flname +' '+distan_flname+' '+str(num_processes)) distance_time.append( time() - t ) t=time() os.system('python get_pairwise_distances.py '+dist_dls_flname +' '+distan_dls_flname+' '+str(num_processes)) distance_time_dls.append( time() - t) return(distance_time, distance_time_dls)
def test_load(): pu = p_utils.PickleUtils() filename = os.path.join(TEST_RESOURCES, 'test.pickle') expected_data = { 'test': 'string' } with open(filename, 'wb') as f: pickle.dump(expected_data, f, pickle.HIGHEST_PROTOCOL) data = pu.load(filename) assert os.path.exists(filename) assert expected_data['test'] == data['test'] # Cleanup os.remove(filename)
def savePosScores(pos_tags_scores_neutral, pos_tags_scores_positive,pos_tags_scores_negative,pos_bigrams_scores_neutral,pos_bigrams_scores_positive,pos_bigrams_scores_negative,pos_trigrams_scores_neutral,pos_trigrams_scores_positive,pos_trigrams_scores_negative,mpqaScores): with open('resources/scores.pkl', 'wb') as output: pickle.dump(pos_tags_scores_neutral, output, pickle.HIGHEST_PROTOCOL) pickle.dump(pos_tags_scores_positive, output, pickle.HIGHEST_PROTOCOL) pickle.dump(pos_tags_scores_negative, output, pickle.HIGHEST_PROTOCOL) pickle.dump(pos_bigrams_scores_neutral, output, pickle.HIGHEST_PROTOCOL) pickle.dump(pos_bigrams_scores_positive, output, pickle.HIGHEST_PROTOCOL) pickle.dump(pos_bigrams_scores_negative, output, pickle.HIGHEST_PROTOCOL) pickle.dump(pos_trigrams_scores_neutral, output, pickle.HIGHEST_PROTOCOL) pickle.dump(pos_trigrams_scores_positive, output, pickle.HIGHEST_PROTOCOL) pickle.dump(pos_trigrams_scores_negative, output, pickle.HIGHEST_PROTOCOL) pickle.dump(mpqaScores, output, pickle.HIGHEST_PROTOCOL) print "POS scores saved" #save lexicons
def trace(self, signum, frame): # pylint: disable=unused-argument """ Signal handler used to take snapshots of the running process. """ # the last pending signal after trace_stop if not self.profiling: return gc.collect() snapshot = tracemalloc.take_snapshot() timestamp = time.time() sample_data = (timestamp, snapshot) # *Must* use the HIGHEST_PROTOCOL, otherwise the serialization will # use GBs of memory pickle.dump(sample_data, self.trace_stream, protocol=pickle.HIGHEST_PROTOCOL) self.trace_stream.flush()
def snapshot(self, sess, iter_num): if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) # Store the model snapshot filename = cfg.TRAIN.SNAPSHOT_PREFIX + '_iter_{:d}'.format(iter_num) + '.ckpt' filename = os.path.join(self.output_dir, filename) self.saver.save(sess, filename) print('Wrote snapshot to: {:s}'.format(filename)) # Also store some meta information, random state, etc. nfilename = cfg.TRAIN.SNAPSHOT_PREFIX + '_iter_{:d}'.format(iter_num) + '.pkl' nfilename = os.path.join(self.output_dir, nfilename) # current state of numpy random st0 = np.random.get_state() # Dump the meta info with open(nfilename, 'wb') as fid: pickle.dump(st0, fid, pickle.HIGHEST_PROTOCOL) return filename, nfilename
def serialize(obj): return pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
def get_users_orders(self, prior_or_train): ''' get users' prior detailed orders ''' if os.path.exists(self.cache_dir + 'users_orders.pkl'): with open(self.cache_dir + 'users_orders.pkl', 'rb') as f: users_orders = pickle.load(f) else: orders = self.get_orders() order_products_prior = self.get_orders_items(prior_or_train) users_orders = pd.merge(order_products_prior, orders[['user_id', 'order_id', 'order_number', 'days_up_to_last']], on = ['order_id'], how = 'left') with open(self.cache_dir + 'users_orders.pkl', 'wb') as f: pickle.dump(users_orders, f, pickle.HIGHEST_PROTOCOL) return users_orders
def get_users_products(self, prior_or_train): ''' get users' all purchased products ''' if os.path.exists(self.cache_dir + 'users_products.pkl'): with open(self.cache_dir + 'users_products.pkl', 'rb') as f: users_products = pickle.load(f) else: users_products = self.get_users_orders(prior_or_train)[['user_id', 'product_id']].drop_duplicates() users_products['product_id'] = users_products.product_id.astype(int) users_products['user_id'] = users_products.user_id.astype(int) users_products = users_products.groupby(['user_id'])['product_id'].apply(list).reset_index() with open(self.cache_dir + 'users_products.pkl', 'wb') as f: pickle.dump(users_products, f, pickle.HIGHEST_PROTOCOL) return users_products
def get_baskets(self, prior_or_train, reconstruct = False, reordered = False, none_idx = 49689): ''' get users' baskets ''' if reordered: filepath = self.cache_dir + './reorder_basket_' + prior_or_train + '.pkl' else: filepath = self.cache_dir + './basket_' + prior_or_train + '.pkl' if (not reconstruct) and os.path.exists(filepath): with open(filepath, 'rb') as f: up_basket = pickle.load(f) else: up = self.get_users_orders(prior_or_train).sort_values(['user_id', 'order_number', 'product_id'], ascending = True) uid_oid = up[['user_id', 'order_number']].drop_duplicates() up = up[up.reordered == 1][['user_id', 'order_number', 'product_id']] if reordered else up[['user_id', 'order_number', 'product_id']] up_basket = up.groupby(['user_id', 'order_number'])['product_id'].apply(list).reset_index() up_basket = pd.merge(uid_oid, up_basket, on = ['user_id', 'order_number'], how = 'left') for row in up_basket.loc[up_basket.product_id.isnull(), 'product_id'].index: up_basket.at[row, 'product_id'] = [none_idx] up_basket = up_basket.sort_values(['user_id', 'order_number'], ascending = True).groupby(['user_id'])['product_id'].apply(list).reset_index() up_basket.columns = ['user_id', 'reorder_basket'] if reordered else ['user_id', 'basket'] #pdb.set_trace() with open(filepath, 'wb') as f: pickle.dump(up_basket, f, pickle.HIGHEST_PROTOCOL) return up_basket
def donations(filename='donationdata.pickle'): try: print("donation data pickled already. Grabbing data from donationdata.picke") with open(filename, 'rb') as handle: donations = pickle.load(handle) return donations except EOFError: print("donation data not pickled, grabbing directly from FEC and ProPublica APIs") donations = donations_helper() with open(filename, 'wb') as handle: pickle.dump(donations, handle, protocol=pickle.HIGHEST_PROTOCOL) return donations
def __init__(self, process_obj): # create pipe for communication with child rfd, wfd = os.pipe() # get handle for read end of the pipe and make it inheritable rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True) os.close(rfd) # start process cmd = get_command_line() + [rhandle] cmd = ' '.join('"%s"' % x for x in cmd) hp, ht, pid, tid = _subprocess.CreateProcess( _python_exe, cmd, None, None, 1, 0, None, None, None ) ht.Close() close(rhandle) # set attributes of self self.pid = pid self.returncode = None self._handle = hp # send information to child prep_data = get_preparation_data(process_obj._name) to_child = os.fdopen(wfd, 'wb') Popen._tls.process_handle = int(hp) try: dump(prep_data, to_child, HIGHEST_PROTOCOL) dump(process_obj, to_child, HIGHEST_PROTOCOL) finally: del Popen._tls.process_handle to_child.close()
def save_pickle(filename, save): try: f = open(filename, 'wb') pickle.dump(save, f, pickle.HIGHEST_PROTOCOL) f.close() except Exception as e: logging.error(f'Unable to save data to {filename}: {e}') raise
def _write_entries(self, entries): log_file = open(self._filename, 'wb') try: log_file.seek(0) for entry in entries: pickle.dump(entry, log_file, pickle.HIGHEST_PROTOCOL) log_file.flush() os.fsync(log_file.fileno()) finally: log_file.close()
def record(self, var_name, var_value): """ Call this method each time you want to record a variable with name `var_name` and value `var_value`. Usually, there is one plot for each `var_name`. :param var_name: Name of variable to record :param var_value: Value of variable to record """ assert not isinstance(var_value, type(SENTINEL)) or var_value != SENTINEL, \ "You cannot record a value {} since this conflicts with the internal SENTINEL string" topic = pickle.dumps(var_name, protocol=pickle.HIGHEST_PROTOCOL) messagedata = pickle.dumps(var_value, protocol=pickle.HIGHEST_PROTOCOL) self.socket.send_multipart([topic, messagedata]) rlogger.debug("Sent message to topic %s", var_name)
def close(self, var_name): """ Call this method for each variable name `var_name` to clean up the plotting process :param var_name: Name of variable to clean up. """ topic = pickle.dumps(var_name, protocol=pickle.HIGHEST_PROTOCOL) messagedata = pickle.dumps(SENTINEL, protocol=pickle.HIGHEST_PROTOCOL) self.socket.send_multipart([topic, messagedata]) rlogger.debug("Sent close message to topic %s", var_name)
def snapshot(self, sess, iter): net = self.net if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) # Store the model snapshot filename = cfg.TRAIN.SNAPSHOT_PREFIX + '_iter_{:d}'.format(iter) + '.ckpt' filename = os.path.join(self.output_dir, filename) self.saver.save(sess, filename) print('Wrote snapshot to: {:s}'.format(filename)) # Also store some meta information, random state, etc. nfilename = cfg.TRAIN.SNAPSHOT_PREFIX + '_iter_{:d}'.format(iter) + '.pkl' nfilename = os.path.join(self.output_dir, nfilename) # current state of numpy random st0 = np.random.get_state() # current position in the database cur = self.data_layer._cur # current shuffled indexes of the database perm = self.data_layer._perm # current position in the validation database cur_val = self.data_layer_val._cur # current shuffled indexes of the validation database perm_val = self.data_layer_val._perm # Dump the meta info with open(nfilename, 'wb') as fid: pickle.dump(st0, fid, pickle.HIGHEST_PROTOCOL) pickle.dump(cur, fid, pickle.HIGHEST_PROTOCOL) pickle.dump(perm, fid, pickle.HIGHEST_PROTOCOL) pickle.dump(cur_val, fid, pickle.HIGHEST_PROTOCOL) pickle.dump(perm_val, fid, pickle.HIGHEST_PROTOCOL) pickle.dump(iter, fid, pickle.HIGHEST_PROTOCOL) return filename, nfilename
def gt_roidb(self): """ Return the database of ground-truth regions of interest. This function loads/saves from/to a cache file to speed up future calls. """ cache_file = osp.join(self.cache_path, self.name + '_gt_roidb.pkl') if osp.exists(cache_file): with open(cache_file, 'rb') as fid: roidb = pickle.load(fid) print('{} gt roidb loaded from {}'.format(self.name, cache_file)) return roidb gt_roidb = [self._load_coco_annotation(index) for index in self._image_index] with open(cache_file, 'wb') as fid: pickle.dump(gt_roidb, fid, pickle.HIGHEST_PROTOCOL) print('wrote gt roidb to {}'.format(cache_file)) return gt_roidb
def _do_detection_eval(self, res_file, output_dir): ann_type = 'bbox' coco_dt = self._COCO.loadRes(res_file) coco_eval = COCOeval(self._COCO, coco_dt) coco_eval.params.useSegm = (ann_type == 'segm') coco_eval.evaluate() coco_eval.accumulate() self._print_detection_eval_metrics(coco_eval) eval_file = osp.join(output_dir, 'detection_results.pkl') with open(eval_file, 'wb') as fid: pickle.dump(coco_eval, fid, pickle.HIGHEST_PROTOCOL) print('Wrote COCO eval results to: {}'.format(eval_file))
def save_pickle(obj, dir, filename): path = os.path.join(dir + filename + '.pkl') with open(path, 'wb') as f: pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
def kepler_lcdict_to_pkl(lcdict, outfile=None): '''This simply writes the lcdict to a pickle. ''' if not outfile: outfile = '%s-keplc.pkl' % lcdict['objectid'].replace(' ','-') # we're using pickle.HIGHEST_PROTOCOL here, this will make Py3 pickles # unreadable for Python 2.7 with open(outfile,'wb') as outfd: pickle.dump(lcdict, outfd, protocol=pickle.HIGHEST_PROTOCOL) return os.path.abspath(outfile)
def test_unicode_pickle(self): # A tree containing Unicode characters can be pickled. html = u"<b>\N{SNOWMAN}</b>" soup = self.soup(html) dumped = pickle.dumps(soup, pickle.HIGHEST_PROTOCOL) loaded = pickle.loads(dumped) self.assertEqual(loaded.decode(), soup.decode())
def test_unicode_pickle(self): # A tree containing Unicode characters can be pickled. html = "<b>\N{SNOWMAN}</b>" soup = self.soup(html) dumped = pickle.dumps(soup, pickle.HIGHEST_PROTOCOL) loaded = pickle.loads(dumped) self.assertEqual(loaded.decode(), soup.decode())
def save_binary(obj, path): with open(path,'wb') as f: pickle.dump(obj, f, protocol=pickle.HIGHEST_PROTOCOL)
def save(self, session): fn = self.get_session_filename(session.sid) fd, tmp = tempfile.mkstemp(suffix=_fs_transaction_suffix, dir=self.path) f = os.fdopen(fd, 'wb') try: dump(dict(session), f, HIGHEST_PROTOCOL) finally: f.close() try: rename(tmp, fn) os.chmod(fn, self.mode) except (IOError, OSError): pass
def set(self, key, value, timeout=None): expires = self._get_expiration(timeout) self._prune() self._cache[key] = (expires, pickle.dumps(value, pickle.HIGHEST_PROTOCOL)) return True