我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tarfile.ReadError()。
def read_tar(f): result = {} try: with tarfile.open(f) as t: for m in t: if m.isfile(): f = t.extractfile(m) sha512 = sha512_file(f) else: sha512 = None result[m.name] = TarMemberInfo(m, sha512) except tarfile.ReadError: # if we can't read the tar archive, we should never consider it to have # the same contents as another tar archive... result[f] = None return result # # #
def unzip(self, path=None): if not path: path = os.path.join(os.path.split(self.zip_file)[0], self.name) if not self.zipped: raise Exception('Scene does not have a zip file associated with it') else: try: tar = tarfile.open(self.zip_file, 'r') tar.extractall(path=path) tar.close() except tarfile.ReadError: subprocess.check_call(['tar', '-xf', self.zip_file, '-C', path]) formats = ['*.tif', '*.TIF', '*.jp2'] for f in formats: for image in glob.glob(os.path.join(path, f)): self.add(image) self.zipped = False
def test_empty_tarfile(self): # Test for issue6123: Allow opening empty archives. # This test checks if tarfile.open() is able to open an empty tar # archive successfully. Note that an empty tar archive is not the # same as an empty file! with tarfile.open(tmpname, self.mode.replace("r", "w")): pass try: tar = tarfile.open(tmpname, self.mode) tar.getnames() except tarfile.ReadError: self.fail("tarfile.open() failed on empty archive") else: self.assertListEqual(tar.getmembers(), []) finally: tar.close()
def test_init_close_fobj(self): # Issue #7341: Close the internal file object in the TarFile # constructor in case of an error. For the test we rely on # the fact that opening an empty file raises a ReadError. empty = os.path.join(TEMPDIR, "empty") with open(empty, "wb") as fobj: fobj.write(b"") try: tar = object.__new__(tarfile.TarFile) try: tar.__init__(empty) except tarfile.ReadError: self.assertTrue(tar.fileobj.closed) else: self.fail("ReadError not raised") finally: support.unlink(empty)
def test_premature_end_of_archive(self): for size in (512, 600, 1024, 1200): with tarfile.open(tmpname, "w:") as tar: t = tarfile.TarInfo("foo") t.size = 1024 tar.addfile(t, StringIO.StringIO("a" * 1024)) with open(tmpname, "r+b") as fobj: fobj.truncate(size) with tarfile.open(tmpname) as tar: with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"): for t in tar: pass with tarfile.open(tmpname) as tar: t = tar.next() with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"): tar.extract(t, TEMPDIR) with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"): tar.extractfile(t).read()
def test_init_close_fobj(self): # Issue #7341: Close the internal file object in the TarFile # constructor in case of an error. For the test we rely on # the fact that opening an empty file raises a ReadError. empty = os.path.join(TEMPDIR, "empty") with open(empty, "wb") as fobj: fobj.write("") try: tar = object.__new__(tarfile.TarFile) try: tar.__init__(empty) except tarfile.ReadError: self.assertTrue(tar.fileobj.closed) else: self.fail("ReadError not raised") finally: support.unlink(empty)
def _test_partial_input(self, mode): class MyStringIO(StringIO.StringIO): hit_eof = False def read(self, n): if self.hit_eof: raise AssertionError("infinite loop detected in tarfile.open()") self.hit_eof = self.pos == self.len return StringIO.StringIO.read(self, n) def seek(self, *args): self.hit_eof = False return StringIO.StringIO.seek(self, *args) data = bz2.compress(tarfile.TarInfo("foo").tobuf()) for x in range(len(data) + 1): try: tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode) except tarfile.ReadError: pass # we have no interest in ReadErrors
def _test_partial_input(self, mode): class MyBytesIO(io.BytesIO): hit_eof = False def read(self, n): if self.hit_eof: raise AssertionError("infinite loop detected in " "tarfile.open()") self.hit_eof = self.tell() == len(self.getvalue()) return super(MyBytesIO, self).read(n) def seek(self, *args): self.hit_eof = False return super(MyBytesIO, self).seek(*args) data = bz2.compress(tarfile.TarInfo("foo").tobuf()) for x in range(len(data) + 1): try: tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) except tarfile.ReadError: pass # we have no interest in ReadErrors
def extract_files(self): self.extract_error = None location = self.get_setting('download_dir').value version = self.selected_version() for setting_name, setting in self.settings['export_settings'].items(): save_file_path = setting.save_file_path(version, location) try: if setting.value: extract_path = get_data_path('files/'+setting.name) setting.extract(extract_path, version) self.progress_text += '.' except (tarfile.ReadError, zipfile.BadZipfile) as e: if os.path.exists(save_file_path): os.remove(save_file_path) self.extract_error = e self.logger.error(self.extract_error) # cannot use GUI in thread to notify user. Save it for later self.progress_text = '\nDone.\n' return True
def test_init_close_fobj(self): # Issue #7341: Close the internal file object in the TarFile # constructor in case of an error. For the test we rely on # the fact that opening an empty file raises a ReadError. empty = os.path.join(TEMPDIR, "empty") with open(empty, "wb") as fobj: fobj.write(b"") try: tar = object.__new__(tarfile.TarFile) try: tar.__init__(empty) except tarfile.ReadError: self.assertTrue(tar.fileobj.closed) else: self.fail("ReadError not raised") finally: test_support.unlink(empty)
def test_premature_end_of_archive(self): for size in (512, 600, 1024, 1200): with tarfile.open(tmpname, "w:") as tar: t = tarfile.TarInfo("foo") t.size = 1024 tar.addfile(t, io.BytesIO(b"a" * 1024)) with open(tmpname, "r+b") as fobj: fobj.truncate(size) with tarfile.open(tmpname) as tar: with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): for t in tar: pass with tarfile.open(tmpname) as tar: t = tar.next() with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): tar.extract(t, TEMPDIR) with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): tar.extractfile(t).read()
def test_init_close_fobj(self): # Issue #7341: Close the internal file object in the TarFile # constructor in case of an error. For the test we rely on # the fact that opening an empty file raises a ReadError. empty = os.path.join(TEMPDIR, "empty") open(empty, "wb").write("") try: tar = object.__new__(tarfile.TarFile) try: tar.__init__(empty) except tarfile.ReadError: self.assertTrue(tar.fileobj.closed) else: self.fail("ReadError not raised") finally: os.remove(empty)
def _untar_layers(dir, layers): output = {} # Untar layer filesystem bundle for layer in layers: tarfile = TarFile(dir + "/" + layer) for member in tarfile.getmembers(): output[member.name] = member for member_name in output: try: tarfile.extract(output[member_name], path=dir, set_attrs=False) except (ValueError, ReadError): pass # Clean up for layer in layers: clean_up(dir + "/" + layer[:-10])
def untar(destination, source): """ Untar source file into destination folder. tar.tarfile.extractall will create all necessary (sub)directories. Note: When untaring into the existing folder to overwrite files, tarfile.extractall function will throw a FileExistsError if it can not overwrite broken symlinks of the tar'ed file. Nuke it from orbit, it's the only way to be sure. :param 'destination': [str] path to where to extract target into. :param 'source': [str] path to a .tar file to untar. :return: [str] path to untared content. Raise RuntimeError on problems. """ try: destination = destination + '/untar' remove_target(destination) # succeeds even if missing with tarfile.open(source) as tar_obj: tar_obj.extractall(path=destination) return destination except (AssertionError, tarfile.ReadError, tarfile.ExtractError) as err: raise RuntimeError('Error occured while untaring "%s": %s' % (source, str(err)))
def train_or_val_pairs(self, setn): """ untar imagenet tar files into directories that indicate their label. returns [(filename, label), ...] for train or val set partitions """ img_dir = os.path.join(self.out_dir, setn) neon_logger.display("Extracting %s files" % (setn)) root_tf_path = self.tars[setn] if not os.path.exists(root_tf_path): raise IOError(("tar file {} not found. Ensure you have ImageNet downloaded" ).format(root_tf_path)) try: root_tf = tarfile.open(root_tf_path) except tarfile.ReadError as e: raise ValueError('ReadError opening {}: {}'.format(root_tf_path, e)) label_dict = self.extract_labels(setn) subpaths = root_tf.getmembers() arg_iterator = zip(repeat(self.target_size), repeat(root_tf_path), repeat(img_dir), repeat(setn), repeat(label_dict), subpaths) pool = multiprocessing.Pool() pairs = [] for pair_list in tqdm.tqdm(pool.imap_unordered(process_i1k_tar_subpath, arg_iterator), total=len(subpaths)): pairs.extend(pair_list) pool.close() pool.join() root_tf.close() return pairs
def test_null_tarfile(self): # Test for issue6123: Allow opening empty archives. # This test guarantees that tarfile.open() does not treat an empty # file as an empty tar archive. with open(tmpname, "wb"): pass self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
def test_fail_comp(self): # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. if self.mode == "r:": return self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) with open(tarname, "rb") as fobj: self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)
def _testfunc_file(self, name, mode): try: tar = tarfile.open(name, mode) except tarfile.ReadError as e: self.fail() else: tar.close()
def _test_modes(self, testfunc): testfunc(tarname, "r") testfunc(tarname, "r:") testfunc(tarname, "r:*") testfunc(tarname, "r|") testfunc(tarname, "r|*") if gzip: self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") testfunc(gzipname, "r") testfunc(gzipname, "r:*") testfunc(gzipname, "r:gz") testfunc(gzipname, "r|*") testfunc(gzipname, "r|gz") if bz2: self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") testfunc(bz2name, "r") testfunc(bz2name, "r:*") testfunc(bz2name, "r:bz2") testfunc(bz2name, "r|*") testfunc(bz2name, "r|bz2")
def test_truncated_longname(self): longname = self.subdir + "/" + "123/" * 125 + "longname" tarinfo = self.tar.getmember(longname) offset = tarinfo.offset self.tar.fileobj.seek(offset) fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)
def test_append_gz(self): if gzip is None: return self._create_testtar("w:gz") self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
def test_append_bz2(self): if bz2 is None: return self._create_testtar("w:bz2") self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") # Append mode is supposed to fail if the tarfile to append to # does not end with a zero block.
def _test_error(self, data): with open(self.tarname, "wb") as fobj: fobj.write(data) self.assertRaises(tarfile.ReadError, self._add_testfile)
def test_fail_comp(self): # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. if self.mode == "r:": self.skipTest('needs a gz or bz2 mode') self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) with open(tarname, "rb") as fobj: self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)
def _testfunc_fileobj(self, name, mode): try: tar = tarfile.open(name, mode, fileobj=open(name, "rb")) except tarfile.ReadError: self.fail() else: tar.close()
def test_truncated_longname(self): longname = self.subdir + "/" + "123/" * 125 + "longname" tarinfo = self.tar.getmember(longname) offset = tarinfo.offset self.tar.fileobj.seek(offset) fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512)) self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)
def test_append_gz(self): self._create_testtar("w:gz") self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
def test_append_bz2(self): self._create_testtar("w:bz2") self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") # Append mode is supposed to fail if the tarfile to append to # does not end with a zero block.