Python bz2 module: open() example source code

We have extracted the following 50 code examples from open-source Python projects to show how bz2.open() is used.
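
Before the project examples, here is a minimal sketch of the call itself (the file name and contents are illustrative): bz2.open() accepts binary modes ('rb', 'wb', 'ab') and text modes ('rt', 'wt', 'at'); the encoding, errors and newline arguments are only valid in text mode.

import bz2

# Write, then read back, a bz2-compressed text file.
with bz2.open('example.txt.bz2', 'wt', encoding='utf-8') as f:
    f.write('hello, bz2\n')

with bz2.open('example.txt.bz2', 'rt', encoding='utf-8') as f:
    print(f.read())        # -> hello, bz2

# Binary mode returns decompressed bytes; no encoding argument is allowed here.
with bz2.open('example.txt.bz2', 'rb') as f:
    raw = f.read()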

Project: smappdragon    Author: SMAPPNYU    | project source | file source
def dump_to_csv(self, output_csv, input_fields, write_header=True, top_level=False, mode='a', encoding='utf-8', compression=None):
        if compression == 'bz2':
            # csv.writer writes str, so open compressed output in text mode
            # ('at' for the default mode='a') and honour the encoding argument.
            filehandle = bz2.open(output_csv, mode + 't', encoding=encoding)
        elif compression == 'gzip':
            filehandle = gzip.open(output_csv, mode + 't', encoding=encoding)
        else:
            filehandle = open(output_csv, mode, encoding=encoding)

        writer = csv.writer(filehandle)
        if write_header:
            writer.writerow(input_fields)
        tweet_parser = TweetParser()

        for tweet in self.get_iterator():
            if top_level:
                ret = list(zip(input_fields, [tweet.get(field) for field in input_fields]))
            else:
                ret = tweet_parser.parse_columns_from_tweet(tweet,input_fields)
            ret_values = [col_val[1] for col_val in ret]
            writer.writerow(ret_values)
        filehandle.close()
Project: smappdragon    Author: SMAPPNYU    | project source | file source
def get_iterator(self):
        tweet_parser = TweetParser()
        if self.compression == 'bz2':
            # bz2/gzip accept an encoding only in text mode, and csv.DictReader
            # needs str lines, so open compressed input as 'rt'.
            csv_handle = bz2.open(self.filepath, 'rt', encoding=self.encoding)
        elif self.compression == 'gzip':
            csv_handle = gzip.open(self.filepath, 'rt', encoding=self.encoding)
        else:
            csv_handle = open(self.filepath, self.mode, encoding=self.encoding)
        for count, tweet in enumerate(csv.DictReader(csv_handle)):
            if self.limit < count+1 and self.limit != 0:
                csv_handle.close()
                return
            elif tweet_parser.tweet_passes_filter(self.filter, tweet) \
            and tweet_parser.tweet_passes_custom_filter_list(self.custom_filters, tweet):
                if self.should_strip:
                    yield tweet_parser.strip_tweet(self.keep_fields, tweet) 
                else: 
                    yield dict(tweet)
        csv_handle.close()
Project: udapi-python    Author: udapi    | project source | file source
def _token_to_filenames(token):
        if token[0] == '!':
            pattern = token[1:]
            filenames = glob.glob(pattern)
            if not filenames:
                raise RuntimeError('No filenames matched "%s" pattern' % pattern)
        elif token[0] == '@':
            # '@-' means read the list of filenames from standard input;
            # sys.stdin cannot be passed to open(), so handle it separately.
            if token == '@-':
                filenames = [line.rstrip('\n') for line in sys.stdin]
            else:
                with open(token[1:]) as filelist:
                    filenames = [line.rstrip('\n') for line in filelist]
                directory = os.path.dirname(token[1:])
                if directory not in ('', '.'):
                    # Prefix relative entries with the filelist's directory.
                    filenames = [f if f[0] == '/' else directory + '/' + f
                                 for f in filenames]
        else:
            filenames = [token]
        return filenames
Project: udapi-python    Author: udapi    | project source | file source
def next_filehandle(self):
        """Go to the next file and retrun its filehandle or None (meaning no more files)."""
        filename = self.next_filename()
        if filename is None:
            fhandle = None
        elif filename == '-':
            fhandle = sys.stdin
        else:
            filename_extension = filename.split('.')[-1]
            if filename_extension == 'gz':
                myopen = gzip.open
            elif filename_extension == 'xz':
                myopen = lzma.open
            elif filename_extension == 'bz2':
                myopen = bz2.open
            else:
                myopen = open
            fhandle = myopen(filename, 'rt', encoding=self.encoding)
        self.filehandle = fhandle
        return fhandle
Project: HOPSTACK    Author: willpatterson    | project source | file source
def get_data(self, save_directory):
        """
        Retrieves data from remote location
        saves data in: save_directory
        TODO:
            figure out how to handle local file paths
            consider directory downloads from html pages with hyperlinks
            ** Implement custom URL schemes -- Now needs to be done in lasubway.py
            How does raw data fit into this function?
        """

        url = urlunparse(self)
        file_name = os.path.basename(os.path.normpath(self.path))
        save_path = os.path.join(save_directory, file_name)
        with closing(urlopen(url)) as request:
            with open(save_path, 'wb') as sfile:
                shutil.copyfileobj(request, sfile)
Project: mccurse    Author: khardix    | project source | file source
def find(cls: Type['Game'], name: str, *, gamedb: Path = SUPPORTED_GAMES) -> 'Game':
        """Find and create instance of a supported game.

        Keyword arguments:
            name: Name of the game to instantiate.
            gamedb: Path to the YAML dictionary of supported games.

        Returns:
            Instance of the supported game.

        Raises:
            UnsupportedGameError: When the name is not found among supported games.
        """

        with gamedb.open(encoding='utf-8') as gamestream:
            # safe_load suffices for a plain dictionary file and works without
            # an explicit Loader argument on modern PyYAML.
            games = yaml.safe_load(gamestream)

        defaults = games.get(name.lower(), None)
        if defaults is None:
            msg = _("Game not supported: '{name}'").format_map(locals())
            raise UnsupportedGameError(msg)

        return cls(name=name.capitalize(), **defaults)
Project: xphyle    Author: jdidion    | project source | file source
def test_decompress_file(self):
        path = self.root.make_file()
        gzfile = path + '.gz'
        with gzip.open(gzfile, 'wt') as o:
            o.write('foo')

        path2 = decompress_file(gzfile, keep=True)
        self.assertEqual(path, path2)
        self.assertTrue(os.path.exists(gzfile))
        self.assertTrue(os.path.exists(path))
        with open(path, 'rt') as i:
            self.assertEqual(i.read(), 'foo')

        with open(gzfile, 'rb') as i:
            path2 = decompress_file(i, keep=True)
            self.assertEqual(path, path2)
            self.assertTrue(os.path.exists(gzfile))
            self.assertTrue(os.path.exists(path))
            with open(path, 'rt') as i:
                self.assertEqual(i.read(), 'foo')
Project: xphyle    Author: jdidion    | project source | file source
def test_pending(self):
        file1 = self.root.make_file(suffix='.gz')
        with gzip.open(file1, 'wt') as o:
            o.write('foo\nbar\n')
        f = FileInput(char_mode=TextMode)
        self.assertTrue(f._pending)
        f.add(file1)
        list(f)
        self.assertTrue(f.finished)
        self.assertFalse(f._pending)
        file2 = self.root.make_file(suffix='.gz')
        with gzip.open(file2, 'wt') as o:
            o.write('baz\n')
        f.add(file2)
        self.assertTrue(f._pending)
        self.assertFalse(f.finished)
        self.assertEqual('baz\n', f.readline())
        self.assertEqual('', f.readline())
        with self.assertRaises(StopIteration):
            next(f)
        self.assertTrue(f.finished)
        self.assertFalse(f._pending)
Project: xphyle    Author: jdidion    | project source | file source
def test_rolling_fileoutput_write(self):
        path = self.root.make_file()
        with textoutput(
                path + '{index}.txt', file_output_type=RollingFileOutput,
                lines_per_file=3) as out:
            for i in range(6):
                out.write(i, False)
            for ch in ('a', 'b', 'c'):
                out.write(ch, False)
            out.write("d\ne\nf")
        with open(path + '0.txt', 'rt') as infile:
            self.assertEqual('0\n1\n2\n', infile.read())
        with open(path + '1.txt', 'rt') as infile:
            self.assertEqual('3\n4\n5\n', infile.read())
        with open(path + '2.txt', 'rt') as infile:
            self.assertEqual('a\nb\nc\n', infile.read())
        with open(path + '3.txt', 'rt') as infile:
            self.assertEqual('d\ne\nf\n', infile.read())
Project: RemoteCpp    Author: ruibm    | project source | file source
def clear_local_caches():
  files = []
  roots = set()
  for window in sublime.windows():
    # All views in a window share the same settings.
    view = window.views()[0]
    cwd = s_cwd(view)
    local_root = File.local_root_for_cwd(cwd)
    roots.add(local_root)
  for root in roots:
      log('Deleting local cache directory [{0}]...'.format(root))
      if os.path.exists(root):
        shutil.rmtree(root)
  for file in files:
    log("Refreshing open file [{0}]...".format(file.remote_path()))
    download_file(file)
Project: RemoteCpp    Author: ruibm    | project source | file source
def on_text_command(self, view, command_name, args):
    # log('cmd={cmd} args={args}'.format(cmd=command_name, args=args))
    if RemoteCppListFilesCommand.owns_view(view) and \
        command_name == 'insert' and args['characters'] == '\n':
      all_lines = get_multiple_sel_lines(view)
      paths = []
      for line in all_lines:
        if self._is_valid_path(line):
          paths.append(line)
      def run_in_background():
        for path in paths:
          file = File(cwd=s_cwd(), path=path)
          Commands.open_file(view, file.to_args())
      if len(paths) > 10:
        msg = ('This will open {0} files which could be slow. \n'
               'Are you sure you want to do that?').format(len(paths),)
        button_text = 'Open {0} Files'.format(len(paths))
        if not sublime.ok_cancel_dialog(msg, button_text):
          return None
      THREAD_POOL.run(run_in_background)
    return None
Project: smappdragon    Author: SMAPPNYU    | project source | file source
def get_iterator(self):
        tweet_parser = TweetParser()
        if self.compression == 'bz2':
            # bz2/gzip accept an encoding only in text mode, so open compressed
            # input as 'rt' and iterate over decoded lines.
            json_handle = bz2.open(self.filepath, 'rt', encoding=self.encoding)
        elif self.compression == 'gzip':
            json_handle = gzip.open(self.filepath, 'rt', encoding=self.encoding)
        else:
            json_handle = open(self.filepath, self.mode, encoding=self.encoding)
        bad_lines = 0
        for count, tweet in enumerate(json_handle):
            if not self.throw_error:
                try:
                    tweet = json_util.loads(tweet)
                except Exception:
                    # Count the corrupt line and skip it, rather than passing
                    # the raw string through the filters below.
                    bad_lines += 1
                    continue
            else:
                tweet = json_util.loads(tweet)
            if self.limit != 0 and self.limit <= count:
                return
            elif tweet_parser.tweet_passes_filter(self.filter, tweet) \
            and tweet_parser.tweet_passes_custom_filter_list(self.custom_filters, tweet):
                if self.should_strip:
                    yield tweet_parser.strip_tweet(self.keep_fields, tweet)
                else:
                    yield tweet
        if self.verbose:
            print("{} rows are ok.".format(count - bad_lines))
            print("{} rows are corrupt.".format(bad_lines))
        json_handle.close()
Project: smappdragon    Author: SMAPPNYU    | project source | file source
def dump_to_bson(self, output_bson):
        filehandle = open(output_bson, 'ab+')

        for tweet in self.get_iterator():
            filehandle.write(BSON.encode(tweet))
        filehandle.close()
Project: smappdragon    Author: SMAPPNYU    | project source | file source
def dump_to_json(self, output_json, compression=None, mode='a'):
        if compression == 'bz2':
            # The loop below writes str, so open compressed output in text
            # mode ('at' for the default mode='a') instead of a binary mode.
            filehandle = bz2.open(output_json, mode + 't')
        elif compression == 'gzip':
            filehandle = gzip.open(output_json, mode + 't')
        else:
            filehandle = open(output_json, mode)

        for tweet in self.get_iterator():
            filehandle.write(json_util.dumps(tweet)+'\n')
        filehandle.close()
Project: chess_book_learning    Author: Aloril    | project source | file source
def load_evaluations(filename):
    d = {}
    if filename.endswith(".bz2"):
        fp = bz2.open(filename, "rt")
    else:
        fp = open(filename)
    for line in fp:
        key = fen2key(line)
        l = line.strip().split()
        fen = " ".join(l[:6])
        score_type = l[6]
        score = l[7]
        pv = " ".join(l[8:])
        d[key] = (fen, score_type, score, pv)
    return d
Project: chess_book_learning    Author: Aloril    | project source | file source
def init_log(logname):
    global log_fp
    if log_fp is None and logname:
        log_fp = open(logname, "a")
Project: neural-doodle    Author: alexjc    | project source | file source
def load_data(self):
        """Open the serialized parameters from a pre-trained network, and load them into the model created.
        """
        vgg19_file = os.path.join(os.path.dirname(__file__), 'vgg19_conv.pkl.bz2')
        if not os.path.exists(vgg19_file):
            error("Model file with pre-trained convolution layers not found. Download here...",
                  "https://github.com/alexjc/neural-doodle/releases/download/v0.0/vgg19_conv.pkl.bz2")

        data = pickle.load(bz2.open(vgg19_file, 'rb'))
        params = lasagne.layers.get_all_param_values(self.network['main'])
        lasagne.layers.set_all_param_values(self.network['main'], data[:len(params)])
Project: visualizations    Author: ContentMine    | project source | file source
def get_raw(filename):
    with open(filename) as infile:
        raw = infile.read()
        # the next line needs rewriting as soon as the zenodo-dump conforms to 'records'-format
        # [{k:v}, {k:v},...]
        rawfacts = pd.read_json('[%s]' % ','.join(raw.splitlines()), orient='records')
    return rawfacts


### functions for ingesting from CProject



### functions for preprocessing
Project: visualizations    Author: ContentMine    | project source | file source
def get_preprocessed_df(cacheddatapath=None, rawdatapath=None):
    try:
        with gzip.open(os.path.join(cacheddatapath, "preprocessed_df.pklz"), "rb") as infile:
            df = pickle.load(infile)
    except Exception:
        df = preprocess(rawdatapath)
        if rawdatapath is None:
            pass
            # needs an io error for missing rawdatapath
        with gzip.open(os.path.join(cacheddatapath, "preprocessed_df.pklz"), "wb") as outfile:
            pickle.dump(df, outfile, protocol=4)
    return df
Project: visualizations    Author: ContentMine    | project source | file source
def get_wikidata_dict(cacheddatapath, rawdatapath):
    try:
        with gzip.open(os.path.join(cacheddatapath, "wikidata_dict.pklz"), "rb") as infile:
            wikidataIDs = pickle.load(infile)
    except Exception:
        wikidataIDs = make_wikidata_dict(cacheddatapath, rawdatapath)
        with gzip.open(os.path.join(cacheddatapath, "wikidata_dict.pklz"), "wb") as outfile:
            pickle.dump(wikidataIDs, outfile, protocol=4)
    return wikidataIDs

## functions to extract features
Project: visualizations    Author: ContentMine    | project source | file source
def get_series(cacheddatapath, rawdatapath, column):
    try:
        with gzip.open(os.path.join(cacheddatapath, column+"_series.pklz"), "rb") as infile:
            series = pickle.load(infile)
    except Exception:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        series = make_series(df, column)
        with gzip.open(os.path.join(cacheddatapath, column+"_series.pklz"), "wb") as outfile:
            pickle.dump(series, outfile, protocol=4)
    return series
Project: visualizations    Author: ContentMine    | project source | file source
def get_coocc_features(cacheddatapath, rawdatapath):
    try:
        with bz2.open(os.path.join(cacheddatapath, "coocc_features.pklz2"), "r") as infile:
            coocc_features = pickle.load(infile)
    except Exception:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        coocc_features = count_cooccurrences(df)
        with bz2.BZ2File(os.path.join(cacheddatapath, "coocc_features.pklz2"), "w") as outfile:
            pickle.dump(coocc_features, outfile, protocol=4)
    return coocc_features
Project: visualizations    Author: ContentMine    | project source | file source
def get_timeseries_features(cacheddatapath, rawdatapath):
    try:
        with gzip.open(os.path.join(cacheddatapath, "timeseries_features.pklz"), "rb") as infile:
            ts_features = pickle.load(infile)
    except Exception:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        ts_features = make_timeseries(df)
        with gzip.open(os.path.join(cacheddatapath, "timeseries_features.pklz"), "wb") as outfile:
            pickle.dump(ts_features, outfile, protocol=4)
    return ts_features
Project: visualizations    Author: ContentMine    | project source | file source
def get_journal_features(cacheddatapath, rawdatapath):
    try:
        with gzip.open(os.path.join(cacheddatapath, "journal_features.pklz"), "rb") as infile:
            journ_raw = pickle.load(infile)
    except Exception:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        journ_raw = make_journal_features(df)
        with gzip.open(os.path.join(cacheddatapath, "journal_features.pklz"), "wb") as outfile:
            pickle.dump(journ_raw, outfile, protocol=4)
    return journ_raw
Project: visualizations    Author: ContentMine    | project source | file source
def get_distribution_features(cacheddatapath, rawdatapath):
    try:
        with gzip.open(os.path.join(cacheddatapath, "dist_features.pklz"), "rb") as infile:
            dist_features = pickle.load(infile)
    except Exception:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        dist_features = make_distribution_features(df)
        with gzip.open(os.path.join(cacheddatapath, "dist_features.pklz"), "wb") as outfile:
            pickle.dump(dist_features, outfile, protocol=4)
    return dist_features
Project: supic    Author: Hirico    | project source | file source
def load_perceptual(self):
        """Open the serialized parameters from a pre-trained network, and load them into the model created.
        """
        vgg19_file = os.path.join(os.path.dirname(__file__), 'vgg19_conv.pkl.bz2')
        if not os.path.exists(vgg19_file):
            error("Model file with pre-trained convolution layers not found. Download here...",
                  "https://github.com/alexjc/neural-doodle/releases/download/v0.0/vgg19_conv.pkl.bz2")

        data = pickle.load(bz2.open(vgg19_file, 'rb'))
        layers = lasagne.layers.get_all_layers(self.last_layer(), treat_as_input=[self.network['percept']])
        for p, d in zip(itertools.chain(*[l.get_params() for l in layers]), data): p.set_value(d)
Project: supic    Author: Hirico    | project source | file source
def save_generator(self):
        def cast(p): return p.get_value().astype(np.float16)
        params = {k: [cast(p) for p in l.get_params()] for (k, l) in self.list_generator_layers()}
        config = {k: getattr(args, k) for k in ['generator_blocks', 'generator_residual', 'generator_filters'] + \
                                               ['generator_upscale', 'generator_downscale']}

        pickle.dump((config, params), bz2.open(self.get_filename(absolute=True), 'wb'))
        print('  - Saved model as `{}` after training.'.format(self.get_filename()))
Project: supic    Author: Hirico    | project source | file source
def load_model(self):
        if not os.path.exists(self.get_filename(absolute=True)):
            if args.train: return {}, {}
            error("Model file with pre-trained convolution layers not found. Download it here...",
                  "https://github.com/alexjc/neural-enhance/releases/download/v%s/%s"%(__version__, self.get_filename()))
        print('  - Loaded file `{}` with trained model.'.format(self.get_filename()))
        return pickle.load(bz2.open(self.get_filename(absolute=True), 'rb'))
Project: python-cookbook-3rd    Author: tuanavu    | project source | file source
def gen_opener(filenames):
    '''
    Open a sequence of filenames one at a time producing a file object.
    The file is closed immediately when proceeding to the next iteration. 
    '''
    for filename in filenames:
        if filename.endswith('.gz'):
            f = gzip.open(filename, 'rt')
        elif filename.endswith('.bz2'):
            f = bz2.open(filename, 'rt')
        else:
            f = open(filename, 'rt')
        yield f
        f.close()
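
In the cookbook this generator is chained with other generators into a pipeline; a minimal, self-contained usage sketch (the file names are hypothetical, and gen_opener plus its gzip/bz2 imports come from the recipe above) might look like:

from itertools import chain

# Iterate over the lines of several mixed plain/.gz/.bz2 logs, one file open
# at a time; each file yields its lines before the next one is opened.
filenames = ['a.log', 'b.log.gz', 'c.log.bz2']
total = 0
for line in chain.from_iterable(gen_opener(filenames)):
    total += 1
print(total, 'lines read')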
Project: HOPSTACK    Author: willpatterson    | project source | file source
def __init__(self, path):
        self.path = path
        self.accessor = self.open()
Project: HOPSTACK    Author: willpatterson    | project source | file source
def decompress(self):
        with gzip.open(self.path, 'rb') as gfile:
            return gfile.read()
Project: HOPSTACK    Author: willpatterson    | project source | file source
def decompress(self, outpath):
        with bz2.open(self.path, 'rb') as bfile:
            return bfile.read()
Project: py-etlt    Author: SetBased    | project source | file source
def _open_file(self, mode, encoding=None):
        """
        Opens the next current file.

        :param str mode: The mode for opening the file.
        :param str encoding: The encoding of the file.
        """
        if self._filename[-4:] == '.bz2':
            self._file = bz2.open(self._filename, mode=mode, encoding=encoding)
        else:
            self._file = open(self._filename, mode=mode, encoding=encoding)

    # ------------------------------------------------------------------------------------------------------------------
Project: py-etlt    Author: SetBased    | project source | file source
def _get_sample(self, mode, encoding):
        """
        Get a sample from the next current input file.

        :param str mode: The mode for opening the file.
        :param str|None encoding: The encoding of the file. None opens the file in binary mode.
        """
        self._open_file(mode, encoding)
        self._sample = self._file.read(UniversalCsvReader.sample_size)
        self._file.close()

    # ------------------------------------------------------------------------------------------------------------------
Project: DeepRes    Author: Aneeshers    | project source | file source
def load_perceptual(self):

        vgg19_file = os.path.join(os.path.dirname(__file__), 'vgg19_conv.pkl.bz2')
        if not os.path.exists(vgg19_file):
            error("MOdel was not found",
                  "Reformat model directory")

        data = pickle.load(bz2.open(vgg19_file, 'rb'))
        layers = lasagne.layers.get_all_layers(self.last_layer(), treat_as_input=[self.network['percept']])
        for p, d in zip(itertools.chain(*[l.get_params() for l in layers]), data): p.set_value(d)
Project: DeepRes    Author: Aneeshers    | project source | file source
def save_generator(self):
        def cast(p): return p.get_value().astype(np.float16)
        params = {k: [cast(p) for p in l.get_params()] for (k, l) in self.list_generator_layers()}
        config = {k: getattr(args, k) for k in ['generator_blocks', 'generator_residual', 'generator_filters'] + \
                                               ['generator_upscale', 'generator_downscale']}

        pickle.dump((config, params), bz2.open(self.get_filename(), 'wb'))
        print('  - Saved model as `{}` after training.'.format(self.get_filename()))
Project: DeepRes    Author: Aneeshers    | project source | file source
def load_model(self):
        if not os.path.exists(self.get_filename()):
            if args.train: return {}, {}
            error("Model Xfile with pre-trained convolution layers not found. Download it here...")

        print('  - Loaded file `{}`.'.format(self.get_filename()))
        return pickle.load(bz2.open(self.get_filename(), 'rb'))
Project: de_charlm    Author: scfrank    | project source | file source
def open_f(filename):
    if filename.endswith('.bz2'):
        return bz2.open(filename, 'r')
    else: # assume it's normal text
        return open(filename, 'r')


# prints to stdout for piping into kenlm
Project: MAP-IT    Author: alexmarder    | project source | file source
def __enter__(self):
        if self.compression == 'gzip':
            self.f = gzip.open(self.filename, 'rt' if self.read else 'wt')
        elif self.compression == 'bzip2':
            self.f = bz2.open(self.filename, 'rt' if self.read else 'wt')
        else:
            self.f = open(self.filename, 'r' if self.read else 'w')
        return self.f
Project: MAP-IT    Author: alexmarder    | project source | file source
def load_pickle(filename):
    with open(filename, 'rb') as f:
        return pickle.load(f)
Project: MAP-IT    Author: alexmarder    | project source | file source
def save_pickle(filename, obj):
    with open(filename, 'wb') as f:
        pickle.dump(obj, f)
Project: MAP-IT    Author: alexmarder    | project source | file source
def save_json(filename, obj):
    with open(filename, 'w') as f:
        json.dump(obj, f)
Project: webextaware    Author: cr    | project source | file source
def load(self, metadata_filename):
        global logger
        self.__ext = []
        try:
            with bz2.open(metadata_filename, "r") as f:
                logger.debug("Retrieving metadata state from `%s`" % metadata_filename)
                for e in json.load(f):
                    self.__ext.append(Extension(e))
        except FileNotFoundError:
            logger.warning("No metadata state stored in `%s`" % metadata_filename)
Project: webextaware    Author: cr    | project source | file source
def save(self):
        global logger
        logger.debug("Writing metadata state to `%s`" % self.__filename)
        with bz2.open(self.__filename, "w") as f:
            f.write(json.dumps(self.__ext).encode("utf-8"))
Project: vsmlib    Author: undertherain    | project source | file source
def detect_archive_format_and_open(path):
    if path.endswith(".bz2"):
        return bz2.open(path, mode='rt')  # text mode, matching the other branches
    if path.endswith(".gz"):
        return gzip.open(path, mode='rt')
    return open(path)
Project: vsmlib    Author: undertherain    | project source | file source
def save_json(data, path):
    # if not os.path.isdir(path):
        # os.makedirs(path)
    s = json.dumps(data, ensure_ascii=False, indent=4, sort_keys=True)
    f = open(path, 'w')
    f.write(s)
    f.close()
Project: vsmlib    Author: undertherain    | project source | file source
def load_json(path):
    f = open(path)
    s_data = f.read()
    data = json.loads(s_data)
    f.close()
    return data
Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    Author: SignalMedia    | project source | file source
def _wrap_compressed(f, compression, encoding=None):
    """wraps compressed fileobject in a decompressing fileobject
    NOTE: For all files in Python 3.2 and for bzip'd files under all Python
    versions, this means reading in the entire file and then re-wrapping it in
    StringIO.
    """
    compression = compression.lower()
    encoding = encoding or get_option('display.encoding')

    if compression == 'gzip':
        import gzip

        f = gzip.GzipFile(fileobj=f)
        if compat.PY3:
            from io import TextIOWrapper

            f = TextIOWrapper(f)
        return f
    elif compression == 'bz2':
        import bz2

        if compat.PY3:
            f = bz2.open(f, 'rt', encoding=encoding)
        else:
            # Python 2's bz2 module can't take file objects, so have to
            # run through decompress manually
            data = bz2.decompress(f.read())
            f = StringIO(data)
        return f
    else:
        raise ValueError('do not recognize compression method %s'
                         % compression)
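
This private helper backs pandas' user-facing readers; for reference, the equivalent public call is a one-liner (a sketch assuming a file named data.csv.bz2 exists):

import pandas as pd

# pandas decompresses transparently; the default compression='infer'
# would also detect bz2 from the .bz2 suffix.
df = pd.read_csv('data.csv.bz2', compression='bz2')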
Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    Author: SignalMedia    | project source | file source
def __next__(self):
        if self.buffer is not None:
            try:
                line = next(self.buffer)
            except StopIteration:
                self.buffer = None
                line = next(self.f)
        else:
            line = next(self.f)
        # Note: 'colspecs' is a sequence of half-open intervals.
        return [line[fromm:to].strip(self.delimiter)
                for (fromm, to) in self.colspecs]
Project: lol-subreddit-tools    Author: TheEnigmaBlade    | project source | file source
def load_cached_storage(cache_file, default_size=1000):
    if cache_file is not None and os.path.exists(cache_file):
        print("Loading cache: {0}".format(cache_file))
        with bz2.open(cache_file, "rb") as file:
            try:
                cache = pickle.load(file)
                return cache
            # 'A and B' in an except clause catches only EOFError; use a tuple.
            except (pickle.PickleError, EOFError):
                return None
    return ThingCache(cache_size=default_size, file=cache_file)