The following code examples, extracted from open-source Python projects, illustrate how to use tarfile.PAX_FORMAT.
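Before the project excerpts, here is a minimal sketch of the pattern they all share: pass format=tarfile.PAX_FORMAT when opening an archive for writing (optionally together with pax_headers), then read the pax headers back from the resulting archive. The file names below are placeholders chosen for illustration, not taken from any of the projects.

import tarfile

# Write a POSIX.1-2001 (pax) archive with a custom global header.
# "example.tar" and "pyproject.toml" are placeholder names.
with tarfile.open("example.tar", "w", format=tarfile.PAX_FORMAT,
                  pax_headers={"comment": "created with PAX_FORMAT"}) as tar:
    tar.add("pyproject.toml")

# Read it back; the global pax headers are exposed on the TarFile object.
with tarfile.open("example.tar") as tar:
    print(tar.pax_headers)
    print(tar.getnames())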
def _test(self, name, link=None): # See GNUWriteTest. tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) try: tar.addfile(tarinfo) finally: tar.close() tar = tarfile.open(tmpname) try: if link: l = tar.getmembers()[0].linkname self.assertTrue(link == l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name self.assertTrue(name == n, "PAX longname creation failed") finally: tar.close()
def test_pax_extended_header(self): # The fields from the pax header have priority over the # TarInfo. pax_headers = {"path": "foo", "uid": "123"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") try: t = tarfile.TarInfo() t.name = "\xe4\xf6\xfc" # non-ASCII t.uid = 8**8 # too large t.pax_headers = pax_headers tar.addfile(t) finally: tar.close() tar = tarfile.open(tmpname, encoding="iso8859-1") try: t = tar.getmembers()[0] self.assertEqual(t.pax_headers, pax_headers) self.assertEqual(t.name, "foo") self.assertEqual(t.uid, 123) finally: tar.close()
def test_unicode_filename_error(self):
    if self.format == tarfile.PAX_FORMAT:
        # PAX_FORMAT ignores encoding in write mode.
        return

    tar = tarfile.open(tmpname, "w", format=self.format,
                       encoding="ascii", errors="strict")
    try:
        tarinfo = tarfile.TarInfo()

        tarinfo.name = "\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)

        tarinfo.name = "foo"
        tarinfo.uname = "\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)
    finally:
        tar.close()
def test_uname_unicode(self): t = tarfile.TarInfo("foo") t.uname = "\xe4\xf6\xfc" t.gname = "\xe4\xf6\xfc" tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") try: tar.addfile(t) finally: tar.close() tar = tarfile.open(tmpname, encoding="iso8859-1") try: t = tar.getmember("foo") self.assertEqual(t.uname, "\xe4\xf6\xfc") self.assertEqual(t.gname, "\xe4\xf6\xfc") if self.format != tarfile.PAX_FORMAT: tar.close() tar = tarfile.open(tmpname, encoding="ascii") t = tar.getmember("foo") self.assertEqual(t.uname, "\udce4\udcf6\udcfc") self.assertEqual(t.gname, "\udce4\udcf6\udcfc") finally: tar.close()
def test_pax_extended_header(self): # The fields from the pax header have priority over the # TarInfo. pax_headers = {u"path": u"foo", u"uid": u"123"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") try: t = tarfile.TarInfo() t.name = u"\xe4\xf6\xfc" # non-ASCII t.uid = 8**8 # too large t.pax_headers = pax_headers tar.addfile(t) finally: tar.close() tar = tarfile.open(tmpname, encoding="iso8859-1") try: t = tar.getmembers()[0] self.assertEqual(t.pax_headers, pax_headers) self.assertEqual(t.name, "foo") self.assertEqual(t.uid, 123) finally: tar.close()
def test_unicode_filename_error(self): tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") try: tarinfo = tarfile.TarInfo() tarinfo.name = "\xe4\xf6\xfc" if self.format == tarfile.PAX_FORMAT: self.assertRaises(UnicodeError, tar.addfile, tarinfo) else: tar.addfile(tarinfo) tarinfo.name = u"\xe4\xf6\xfc" self.assertRaises(UnicodeError, tar.addfile, tarinfo) tarinfo.name = "foo" tarinfo.uname = u"\xe4\xf6\xfc" self.assertRaises(UnicodeError, tar.addfile, tarinfo) finally: tar.close()
def _test(self, name, link=None): # See GNUWriteTest. tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) try: tar.addfile(tarinfo) finally: tar.close() tar = tarfile.open(tmpname) try: if link: l = tar.getmembers()[0].linkname self.assertEqual(link, l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name self.assertEqual(name, n, "PAX longname creation failed") finally: tar.close()
def extract(self, compression_type='*', direc='.'):
    """
    Extract contents of internal tar(pax) archive.

    :param compression_type: the compression type (*=transparent)
    :type compression_type: string
    :param direc: path to extract to
    :type direc: string
    """
    if not self.data:
        raise DataDictError('no data files')
    with tarfile.open(mode='r:%s' % compression_type,
                      format=tarfile.PAX_FORMAT,
                      fileobj=cStringIO.StringIO(self.data)) as tar:
        tar.extractall(direc)
def _test(self, name, link=None): # See GNUWriteTest. tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) tar.addfile(tarinfo) tar.close() tar = tarfile.open(tmpname) try: if link: l = tar.getmembers()[0].linkname self.assertTrue(link == l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name self.assertTrue(name == n, "PAX longname creation failed") finally: tar.close()
def test_pax_extended_header(self): # The fields from the pax header have priority over the # TarInfo. pax_headers = {u"path": u"foo", u"uid": u"123"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") try: t = tarfile.TarInfo() t.name = u"äöü" # non-ASCII t.uid = 8**8 # too large t.pax_headers = pax_headers tar.addfile(t) finally: tar.close() tar = tarfile.open(tmpname, encoding="iso8859-1") try: t = tar.getmembers()[0] self.assertEqual(t.pax_headers, pax_headers) self.assertEqual(t.name, "foo") self.assertEqual(t.uid, 123) finally: tar.close()
def test_unicode_filename_error(self): tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") try: tarinfo = tarfile.TarInfo() tarinfo.name = "äöü" if self.format == tarfile.PAX_FORMAT: self.assertRaises(UnicodeError, tar.addfile, tarinfo) else: tar.addfile(tarinfo) tarinfo.name = u"äöü" self.assertRaises(UnicodeError, tar.addfile, tarinfo) tarinfo.name = "foo" tarinfo.uname = u"äöü" self.assertRaises(UnicodeError, tar.addfile, tarinfo) finally: tar.close()
def _test(self, name, link=None): # See GNUWriteTest. tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) tar.addfile(tarinfo) tar.close() tar = tarfile.open(tmpname) if link: l = tar.getmembers()[0].linkname self.assertTrue(link == l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name self.assertTrue(name == n, "PAX longname creation failed")
def test_pax_extended_header(self): # The fields from the pax header have priority over the # TarInfo. pax_headers = {u"path": u"foo", u"uid": u"123"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") t = tarfile.TarInfo() t.name = u"" # non-ASCII t.uid = 8**8 # too large t.pax_headers = pax_headers tar.addfile(t) tar.close() tar = tarfile.open(tmpname, encoding="iso8859-1") t = tar.getmembers()[0] self.assertEqual(t.pax_headers, pax_headers) self.assertEqual(t.name, "foo") self.assertEqual(t.uid, 123)
def test_unicode_filename_error(self): tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") tarinfo = tarfile.TarInfo() tarinfo.name = "" if self.format == tarfile.PAX_FORMAT: self.assertRaises(UnicodeError, tar.addfile, tarinfo) else: tar.addfile(tarinfo) tarinfo.name = u"" self.assertRaises(UnicodeError, tar.addfile, tarinfo) tarinfo.name = "foo" tarinfo.uname = u"" self.assertRaises(UnicodeError, tar.addfile, tarinfo)
def save_files(self, imageId, namespace, rootfsdir, files):
    def tarfilter(member):
        subber = re.sub("^/*", "", rootfsdir)
        subber = re.sub("/*$", "", subber)
        finalstr = '/'.join(['imageroot', re.sub("^"+re.escape(subber)+"/*", "", member.name)])
        member.name = finalstr
        return(member)

    thedir = os.path.join(self.imagerootdir, imageId, "file_store", namespace)
    if not os.path.exists(thedir):
        os.makedirs(thedir)

    tar = tarfile.open('/'.join([thedir, 'stored_files.tar.gz']), mode='w:gz', format=tarfile.PAX_FORMAT)
    for thefile in files:
        if os.path.exists(thefile):
            print "INFO: storing file: " + str(thefile)
            tar.add(thefile, filter=tarfilter)
        else:
            print "WARN: could not find file ("+str(thefile)+") in image: skipping store"
    tar.close()

    return(True)
def uploadPackage(self, buildId, audit, content, verbose):
    if not self.canUploadLocal():
        return

    shown = False
    try:
        with self._openUploadFile(buildId, ARTIFACT_SUFFIX) as (name, fileobj):
            pax = { 'bob-archive-vsn' : "1" }
            if verbose > 0:
                print(colorize(" UPLOAD {} to {} .. "
                               .format(content, self._remoteName(buildId, ARTIFACT_SUFFIX)), "32"),
                      end="")
            else:
                print(colorize(" UPLOAD {} .. ".format(content), "32"), end="")
            shown = True
            with gzip.open(name or fileobj, 'wb', 6) as gzf:
                with tarfile.open(name, "w", fileobj=gzf,
                                  format=tarfile.PAX_FORMAT, pax_headers=pax) as tar:
                    tar.add(audit, "meta/" + os.path.basename(audit))
                    tar.add(content, arcname="content")
            print(colorize("ok", "32"))
    except ArtifactExistsError:
        if shown:
            print("skipped ({} exists in archive)".format(content))
        else:
            print(" UPLOAD skipped ({} exists in archive)".format(content))
    except (ArtifactUploadError, tarfile.TarError, OSError) as e:
        if shown:
            if verbose > 0:
                print(colorize("error ("+str(e)+")", "31"))
            else:
                print(colorize("error", "31"))
        if not self.__ignoreErrors:
            raise BuildError("Cannot upload artifact: " + str(e))
def __createArtifactByName(self, name, version="1"): pax = { 'bob-archive-vsn' : version } with tarfile.open(name, "w|gz", format=tarfile.PAX_FORMAT, pax_headers=pax) as tar: with NamedTemporaryFile() as audit: audit.write(b'AUDIT') audit.flush() tar.add(audit.name, "meta/audit.json.gz") with TemporaryDirectory() as content: with open(os.path.join(content, "data"), "wb") as f: f.write(b'DATA') tar.add(content, "content") return name
def handle_download(abe, page):
    name = abe.args.download_name
    if name is None:
        name = re.sub(r'\W+', '-', ABE_APPNAME.lower()) + '-' + ABE_VERSION
    fileobj = lambda: None
    fileobj.func_dict['write'] = page['start_response'](
        '200 OK',
        [('Content-type', 'application/x-gtar-compressed'),
         ('Content-disposition', 'filename=' + name + '.tar.gz')])
    import tarfile
    with tarfile.TarFile.open(fileobj=fileobj, mode='w|gz',
                              format=tarfile.PAX_FORMAT) as tar:
        tar.add(os.path.split(__file__)[0], name)
    raise Streamed()
def create_tarfile(env, compression_level='9'):
    print('Creating archive...')
    base = os.path.join(SW, 'dist')
    try:
        shutil.rmtree(base)
    except EnvironmentError as err:
        if err.errno != errno.ENOENT:
            raise
    os.mkdir(base)
    dist = os.path.join(base, '%s-%s-%s.tar' % (
        calibre_constants['appname'], calibre_constants['version'], arch))
    with tarfile.open(dist, mode='w', format=tarfile.PAX_FORMAT) as tf:
        cwd = os.getcwd()
        os.chdir(env.base)
        try:
            for x in os.listdir('.'):
                tf.add(x)
        finally:
            os.chdir(cwd)
    print('Compressing archive...')
    ans = dist.rpartition('.')[0] + '.txz'
    start_time = time.time()
    subprocess.check_call(['xz', '--threads=0', '-f', '-' + compression_level, dist])
    secs = time.time() - start_time
    print('Compressed in %d minutes %d seconds' % (secs // 60, secs % 60))
    os.rename(dist + '.xz', ans)
    print('Archive %s created: %.2f MB' % (
        os.path.basename(ans), os.stat(ans).st_size / (1024.**2)))
def test_pax_global_header(self): pax_headers = { "foo": "bar", "uid": "0", "mtime": "1.23", "test": "\xe4\xf6\xfc", "\xe4\xf6\xfc": "test"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, pax_headers=pax_headers) try: tar.addfile(tarfile.TarInfo("test")) finally: tar.close() # Test if the global header was written correctly. tar = tarfile.open(tmpname, encoding="iso8859-1") try: self.assertEqual(tar.pax_headers, pax_headers) self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) # Test if all the fields are strings. for key, val in tar.pax_headers.items(): self.assertTrue(type(key) is not bytes) self.assertTrue(type(val) is not bytes) if key in tarfile.PAX_NUMBER_FIELDS: try: tarfile.PAX_NUMBER_FIELDS[key](val) except (TypeError, ValueError): self.fail("unable to convert pax header field") finally: tar.close()
def test_open_nonwritable_fileobj(self):
    for exctype in IOError, EOFError, RuntimeError:
        class BadFile(StringIO.StringIO):
            first = True
            def write(self, data):
                if self.first:
                    self.first = False
                    raise exctype

        f = BadFile()
        with self.assertRaises(exctype):
            tar = tarfile.open(tmpname, self.mode, fileobj=f,
                               format=tarfile.PAX_FORMAT,
                               pax_headers={'non': 'empty'})
        self.assertFalse(f.closed)
def test_pax_global_header(self): pax_headers = { u"foo": u"bar", u"uid": u"0", u"mtime": u"1.23", u"test": u"\xe4\xf6\xfc", u"\xe4\xf6\xfc": u"test"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, pax_headers=pax_headers) try: tar.addfile(tarfile.TarInfo("test")) finally: tar.close() # Test if the global header was written correctly. tar = tarfile.open(tmpname, encoding="iso8859-1") try: self.assertEqual(tar.pax_headers, pax_headers) self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) # Test if all the fields are unicode. for key, val in tar.pax_headers.iteritems(): self.assertTrue(type(key) is unicode) self.assertTrue(type(val) is unicode) if key in tarfile.PAX_NUMBER_FIELDS: try: tarfile.PAX_NUMBER_FIELDS[key](val) except (TypeError, ValueError): self.fail("unable to convert pax header field") finally: tar.close()
def build_sdist(sdist_directory, config_settings):
    target = 'pkg2-0.5.tar.gz'
    with tarfile.open(pjoin(sdist_directory, target), 'w:gz',
                      format=tarfile.PAX_FORMAT) as tf:
        def _add(relpath):
            tf.add(relpath, arcname='pkg2-0.5/' + relpath)

        _add('pyproject.toml')
        for pyfile in glob('*.py'):
            _add(pyfile)
        for distinfo in glob('*.dist-info'):
            _add(distinfo)

    return target
def build_sdist(sdist_directory, config_settings):
    if config_settings.get('test_unsupported', False):
        raise UnsupportedOperation

    target = 'pkg1-0.5.tar.gz'
    with tarfile.open(pjoin(sdist_directory, target), 'w:gz',
                      format=tarfile.PAX_FORMAT) as tf:
        def _add(relpath):
            tf.add(relpath, arcname='pkg1-0.5/' + relpath)

        _add('pyproject.toml')
        for pyfile in glob('*.py'):
            _add(pyfile)

    return target
def test_pax_global_header(self): pax_headers = { "foo": "bar", "uid": "0", "mtime": "1.23", "test": "\xe4\xf6\xfc", "\xe4\xf6\xfc": "test"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, pax_headers=pax_headers) try: tar.addfile(tarfile.TarInfo("test")) finally: tar.close() # Test if the global header was written correctly. tar = tarfile.open(tmpname, encoding="iso8859-1") try: self.assertEqual(tar.pax_headers, pax_headers) self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) # Test if all the fields are strings. for key, val in tar.pax_headers.items(): self.assertIsNot(type(key), bytes) self.assertIsNot(type(val), bytes) if key in tarfile.PAX_NUMBER_FIELDS: try: tarfile.PAX_NUMBER_FIELDS[key](val) except (TypeError, ValueError): self.fail("unable to convert pax header field") finally: tar.close()
def test_pax_limits(self): tarinfo = tarfile.TarInfo("123/" * 126 + "longname") tarinfo.tobuf(tarfile.PAX_FORMAT) tarinfo = tarfile.TarInfo("longlink") tarinfo.linkname = "123/" * 126 + "longname" tarinfo.tobuf(tarfile.PAX_FORMAT) tarinfo = tarfile.TarInfo("name") tarinfo.uid = 0o4000000000000000000 tarinfo.tobuf(tarfile.PAX_FORMAT)
def list(self, compression_type='*'):
    """
    List contents of internal tar(pax) archive.

    :param compression_type: the compression type (*=transparent)
    :type compression_type: string
    """
    if not self.data:
        raise DataDictError('no data files')
    with tarfile.open(mode='r:%s' % compression_type,
                      format=tarfile.PAX_FORMAT,
                      fileobj=cStringIO.StringIO(self.data)) as tar:
        tar.list()
def fromarchive(archive_path, dir=None):
    """Extract an archive and return a SigMFFile.

    If `dir` is given, extract the archive to that directory. Otherwise,
    the archive will be extracted to a temporary directory. For example,
    `dir` == "." will extract the archive into the current working
    directory.
    """
    if not dir:
        dir = tempfile.mkdtemp()

    archive = tarfile.open(archive_path, mode="r", format=tarfile.PAX_FORMAT)
    members = archive.getmembers()

    try:
        archive.extractall(path=dir)

        data_file = None
        metadata = None

        for member in members:
            if member.name.endswith(SIGMF_DATASET_EXT):
                data_file = path.join(dir, member.name)
            elif member.name.endswith(SIGMF_METADATA_EXT):
                bytestream_reader = codecs.getreader("utf-8")  # bytes -> str
                mdfile_reader = bytestream_reader(archive.extractfile(member))
                metadata = json.load(mdfile_reader)
    finally:
        archive.close()

    return SigMFFile(metadata=metadata, data_file=data_file)
def __init__(self, sigmffile, name=None, fileobj=None):
    self.sigmffile = sigmffile
    self.name = name
    self.fileobj = fileobj

    self._check_input()

    archive_name = self._get_archive_name()
    sigmf_fileobj = self._get_output_fileobj()
    sigmf_archive = tarfile.TarFile(mode="w",
                                    fileobj=sigmf_fileobj,
                                    format=tarfile.PAX_FORMAT)
    tmpdir = tempfile.mkdtemp()
    sigmf_md_filename = archive_name + SIGMF_METADATA_EXT
    sigmf_md_path = os.path.join(tmpdir, sigmf_md_filename)
    sigmf_data_filename = archive_name + SIGMF_DATASET_EXT
    sigmf_data_path = os.path.join(tmpdir, sigmf_data_filename)

    with open(sigmf_md_path, "w") as mdfile:
        self.sigmffile.dump(mdfile, pretty=True)

    shutil.copy(self.sigmffile.data_file, sigmf_data_path)

    def chmod(tarinfo):
        if tarinfo.isdir():
            tarinfo.mode = 0o755  # dwrxw-rw-r
        else:
            tarinfo.mode = 0o644  # -wr-r--r--
        return tarinfo

    sigmf_archive.add(tmpdir, arcname=archive_name, filter=chmod)
    sigmf_archive.close()

    if not fileobj:
        sigmf_fileobj.close()

    shutil.rmtree(tmpdir)

    self.path = sigmf_archive.name
def create_test_archive(test_sigmffile, tmpfile):
    sigmf_archive = test_sigmffile.archive(fileobj=tmpfile)
    sigmf_tarfile = tarfile.open(sigmf_archive, mode="r", format=tarfile.PAX_FORMAT)
    return sigmf_tarfile
def test_pax_global_header(self): pax_headers = { u"foo": u"bar", u"uid": u"0", u"mtime": u"1.23", u"test": u"äöü", u"äöü": u"test"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, pax_headers=pax_headers) tar.addfile(tarfile.TarInfo("test")) tar.close() # Test if the global header was written correctly. tar = tarfile.open(tmpname, encoding="iso8859-1") try: self.assertEqual(tar.pax_headers, pax_headers) self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) # Test if all the fields are unicode. for key, val in tar.pax_headers.iteritems(): self.assertTrue(type(key) is unicode) self.assertTrue(type(val) is unicode) if key in tarfile.PAX_NUMBER_FIELDS: try: tarfile.PAX_NUMBER_FIELDS[key](val) except (TypeError, ValueError): self.fail("unable to convert pax header field") finally: tar.close()
def test_open_nonwritable_fileobj(self):
    for exctype in OSError, EOFError, RuntimeError:
        class BadFile(io.BytesIO):
            first = True
            def write(self, data):
                if self.first:
                    self.first = False
                    raise exctype

        f = BadFile()
        with self.assertRaises(exctype):
            tar = tarfile.open(tmpname, self.mode, fileobj=f,
                               format=tarfile.PAX_FORMAT,
                               pax_headers={'non': 'empty'})
        self.assertFalse(f.closed)
def test_pax_global_header(self): pax_headers = { u"foo": u"bar", u"uid": u"0", u"mtime": u"1.23", u"test": u"", u"": u"test"} tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, pax_headers=pax_headers) tar.addfile(tarfile.TarInfo("test")) tar.close() # Test if the global header was written correctly. tar = tarfile.open(tmpname, encoding="iso8859-1") self.assertEqual(tar.pax_headers, pax_headers) self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) # Test if all the fields are unicode. for key, val in tar.pax_headers.iteritems(): self.assertTrue(type(key) is unicode) self.assertTrue(type(val) is unicode) if key in tarfile.PAX_NUMBER_FIELDS: try: tarfile.PAX_NUMBER_FIELDS[key](val) except (TypeError, ValueError): self.fail("unable to convert pax header field")
def test_pax_limits(self): tarinfo = tarfile.TarInfo("123/" * 126 + "longname") tarinfo.tobuf(tarfile.PAX_FORMAT) tarinfo = tarfile.TarInfo("longlink") tarinfo.linkname = "123/" * 126 + "longname" tarinfo.tobuf(tarfile.PAX_FORMAT) tarinfo = tarfile.TarInfo("name") tarinfo.uid = 04000000000000000000L tarinfo.tobuf(tarfile.PAX_FORMAT)