The following 50 code examples, extracted from open-source Python projects, illustrate how to use tarfile.TarInfo().
def do_tarfile_save(self, tf):
    """ Write images and calibration solution to a tarfile object """

    def taradd(name, buf):
        # Wrap the payload in a file-like object and synthesize a header
        # for it, since the data never exists on disk.
        payload = StringIO(buf)
        entry = tarfile.TarInfo(name)
        entry.size = len(payload.getvalue())
        entry.uname = 'calibrator'
        entry.mtime = int(time.time())
        tf.addfile(tarinfo=entry, fileobj=payload)

    # One PNG per database entry: left-0000.png, left-0001.png, ...
    for idx, (_, im) in enumerate(self.db):
        taradd("left-%04d.png" % idx, cv2.imencode(".png", im)[1].tostring())

    if self.calibrated:
        taradd('ost.yaml', self.yaml())
        taradd('ost.txt', self.ost())
    else:
        print("Doing none-calibration tarfile save!")
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a setup.py,
    the contents of which are provided by the setup_py string.

    Fixes: the original guarded ``MemFile = BytesIO`` with
    ``except AttributeError`` — a plain name assignment cannot raise
    AttributeError, so the StringIO fallback was unreachable dead code.
    The archive is now also closed via a context manager.
    """
    setup_py_bytes = BytesIO(setup_py.encode('utf-8'))
    setup_py_file = tarfile.TarInfo(name='setup.py')
    setup_py_file.size = len(setup_py_bytes.getvalue())
    with tarfile.open(dist_path, 'w:gz') as dist:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
def create_bundle(output, tag_to_config, diffid_to_blobsum, blobsum_to_unzipped, blobsum_to_zipped, blobsum_to_legacy):
    """Creates a "docker save"-compatible image bundle tarball.

    NOTE(review): the original docstring documented parameters (`layers`,
    `tag_to_layer`, `layer_to_tags`) that do not exist; rewritten to match
    the actual signature.

    Args:
      output: the name of the docker image file to create.
      tag_to_config: map from tag to image config; each entry is assembled
        into an image via FromParts.
      diffid_to_blobsum: lookup map forwarded to FromParts — exact semantics
        defined by FromParts (not visible here); TODO confirm.
      blobsum_to_unzipped: lookup map forwarded to FromParts.
      blobsum_to_zipped: lookup map forwarded to FromParts.
      blobsum_to_legacy: lookup map forwarded to FromParts.
    """
    with tarfile.open(output, 'w') as tar:
        def add_file(filename, contents):
            # Synthesize a tar entry for an in-memory payload.
            info = tarfile.TarInfo(filename)
            info.size = len(contents)
            tar.addfile(tarinfo=info, fileobj=cStringIO.StringIO(contents))
        tag_to_image = {}
        for (tag, config) in six.iteritems(tag_to_config):
            tag_to_image[tag] = FromParts(
                config, diffid_to_blobsum,
                blobsum_to_unzipped, blobsum_to_zipped, blobsum_to_legacy)
        # Emit every assembled image into the single output tarball.
        v2_2_save.multi_image_tarball(tag_to_image, tar)
def backup_media():
    """Build a gzip tarball of every file in the default media storage.

    Returns:
        (outputfile, filename): a spooled temporary file rewound to the
        start of the archive, and the generated archive name.
    """
    extension = "tar.gz"
    filename = utils.filename_generate(extension, content_type='media')
    # Create tarball
    media_storage = get_storage_class()()
    outputfile = utils.create_spooled_temporary_file()
    tar_file = tarfile.open(name=filename, fileobj=outputfile, mode='w:gz')
    for media_filename in explore_storage(media_storage):
        tarinfo = tarfile.TarInfo(media_filename)
        media_file = media_storage.open(media_filename)
        # NOTE(review): len() on the storage File object — assumes the
        # backend's File implements __len__ as its byte size; TODO confirm
        # for non-default storage backends.
        tarinfo.size = len(media_file)
        tar_file.addfile(tarinfo, media_file)
    # Close the TAR for writing
    tar_file.close()
    # Store backup
    outputfile.seek(0)
    return outputfile, filename
def tarball(name, image, tar):
    """Produce a "docker save" compatible tarball from the DockerImage.

    Args:
      name: The tag name to write into the repositories file.
      image: a docker image to save.
      tar: the open tarfile into which we are writing the image tarball.
    """

    def add_file(filename, contents):
        # Build a header for the in-memory payload and append it.
        entry = tarfile.TarInfo(filename)
        entry.size = len(contents)
        tar.addfile(tarinfo=entry, fileobj=cStringIO.StringIO(contents))

    multi_image_tarball({name: image}, tar)
    # Add our convenience file with the top layer's ID.
    add_file('top', image.top())
def copy_to_container(self, container, source, dest):
    """Copy a local file into a container via the docker put_archive API.

    Args:
        container: container dict; its 'Id' selects the target container.
        source: path of the local file to copy.
        dest: destination directory inside the container.

    Returns:
        The result of the docker client's put_archive call.
    """
    tar_stream = BytesIO()
    tar_file = tarfile.TarFile(fileobj=tar_stream, mode='w')
    # Fix: the original opened `source` without ever closing it,
    # leaking the file handle.
    with open(source, mode='rb') as src:
        file_data = src.read()
    tarinfo = tarfile.TarInfo(name=os.path.basename(source))
    tarinfo.size = os.stat(source).st_size
    tarinfo.mtime = time.time()
    # tarinfo.mode = 0600
    tar_file.addfile(tarinfo, BytesIO(file_data))
    tar_file.close()
    tar_stream.seek(0)
    res = self.client.put_archive(container=container['Id'],
                                  path=dest,
                                  data=tar_stream)
    return res
def tarbuilder(asset_list=None):
    """Create a gzipped tar archive from rendered assets.

    Add each asset in ``asset_list`` to a tar file with the defined path and
    permission. The assets need to have the rendered_bytes field populated.

    NOTE(review): the original docstring documented ``hostname`` and
    ``balltype`` parameters that do not exist, and claimed a
    ``tarfile.TarFile`` return — the function actually returns the raw
    bytes of the gzip-compressed archive.

    :param asset_list: list of objects.BootActionAsset instances
    :return: bytes of the gzip-compressed tarball
    """
    tarbytes = io.BytesIO()
    tarball = tarfile.open(
        mode='w:gz', fileobj=tarbytes, format=tarfile.GNU_FORMAT)
    asset_list = asset_list or []
    for a in asset_list:
        fileobj = io.BytesIO(a.rendered_bytes)
        tarasset = tarfile.TarInfo(name=a.path)
        tarasset.size = len(a.rendered_bytes)
        # Default to owner read/write when the asset carries no mode.
        tarasset.mode = a.permissions if a.permissions else 0o600
        tarasset.uid = 0
        tarasset.gid = 0
        tarball.addfile(tarasset, fileobj=fileobj)
    tarball.close()
    return tarbytes.getvalue()
def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0):
    """Add members (names or (name, mode[, data]) tuples) to a tar file."""
    for member in members:
        payload = ''
        mode = DEFAULT_FILE_MODE
        if isinstance(member, tuple):
            # A 3-tuple additionally carries the member's content.
            if len(member) == 3:
                payload = member[2]
            member, mode = member[:2]
        encoded = payload.encode('utf-8')
        entry = tarfile.TarInfo(member)
        entry.type = m_type
        entry.mode = mode
        entry.uid = uid
        entry.gid = gid
        entry.size = len(encoded)
        tar_handle.addfile(entry, io.BytesIO(encoded))
def test_ignore_zeros(self):
    # Test TarFile's ignore_zeros option: junk blocks preceding a valid
    # member header must be skipped on read.
    if self.mode.endswith(":gz"):
        _open = gzip.GzipFile
    elif self.mode.endswith(":bz2"):
        _open = bz2.BZ2File
    else:
        _open = open
    for char in (b'\0', b'a'):
        # b'\0' exercises EOFHeaderError, b'a' InvalidHeaderError.
        with _open(tmpname, "wb") as fobj:
            fobj.write(char * 1024)
            fobj.write(tarfile.TarInfo("foo").tobuf())
        with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
            self.assertListEqual(
                tar.getnames(), ["foo"],
                "ignore_zeros=True should have skipped the %r-blocks" % char)
def test_100_char_name(self):
    # A 100-char name exactly fills the header's name field, so it must be
    # stored without the usual trailing '\0' padding; shorter names are
    # NUL-padded.
    name = "0123456789" * 10
    with tarfile.open(tmpname, self.mode) as tar:
        tar.addfile(tarfile.TarInfo(name))
    with tarfile.open(tmpname) as tar:
        self.assertTrue(tar.getnames()[0] == name,
                        "failed to store 100 char filename")
def test_pax_extended_header(self):
    # Values carried in the pax extended header take priority over the
    # plain TarInfo fields.
    pax_headers = {"path": "foo", "uid": "123"}
    with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                      encoding="iso8859-1") as tar:
        t = tarfile.TarInfo()
        t.name = "\xe4\xf6\xfc"  # non-ASCII
        t.uid = 8**8  # too large
        t.pax_headers = pax_headers
        tar.addfile(t)
    with tarfile.open(tmpname, encoding="iso8859-1") as tar:
        t = tar.getmembers()[0]
        self.assertEqual(t.pax_headers, pax_headers)
        self.assertEqual(t.name, "foo")
        self.assertEqual(t.uid, 123)
def test_unicode_filename_error(self):
    if self.format == tarfile.PAX_FORMAT:
        # PAX_FORMAT ignores encoding in write mode.
        return
    with tarfile.open(tmpname, "w", format=self.format,
                      encoding="ascii", errors="strict") as tar:
        tarinfo = tarfile.TarInfo()
        # A non-ASCII name must fail under the strict ascii encoder.
        tarinfo.name = "\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        # Likewise a non-ASCII user name.
        tarinfo.name = "foo"
        tarinfo.uname = "\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)
def test_uname_unicode(self):
    # Round-trip non-ASCII uname/gname through the format under test.
    t = tarfile.TarInfo("foo")
    t.uname = "\xe4\xf6\xfc"
    t.gname = "\xe4\xf6\xfc"
    tar = tarfile.open(tmpname, mode="w", format=self.format,
                       encoding="iso8859-1")
    try:
        tar.addfile(t)
    finally:
        tar.close()
    tar = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        t = tar.getmember("foo")
        self.assertEqual(t.uname, "\xe4\xf6\xfc")
        self.assertEqual(t.gname, "\xe4\xf6\xfc")
        if self.format != tarfile.PAX_FORMAT:
            # Non-PAX formats store the raw iso8859-1 bytes; re-reading with
            # an ascii encoding must surface them as surrogate escapes.
            tar.close()
            tar = tarfile.open(tmpname, encoding="ascii")
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
            self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
    finally:
        tar.close()
def test_premature_end_of_archive(self):
    # An archive truncated at various points must raise ReadError from
    # iteration, extract() and extractfile() alike.
    eod = "unexpected end of data"
    for size in (512, 600, 1024, 1200):
        with tarfile.open(tmpname, "w:") as tar:
            t = tarfile.TarInfo("foo")
            t.size = 1024
            tar.addfile(t, StringIO.StringIO("a" * 1024))
        with open(tmpname, "r+b") as fobj:
            fobj.truncate(size)
        with tarfile.open(tmpname) as tar:
            with self.assertRaisesRegexp(tarfile.ReadError, eod):
                for t in tar:
                    pass
        with tarfile.open(tmpname) as tar:
            t = tar.next()
            with self.assertRaisesRegexp(tarfile.ReadError, eod):
                tar.extract(t, TEMPDIR)
            with self.assertRaisesRegexp(tarfile.ReadError, eod):
                tar.extractfile(t).read()
def _test(self, name, link=None):
    # See GNUWriteTest.
    tarinfo = tarfile.TarInfo(name)
    if link:
        tarinfo.linkname = link
        tarinfo.type = tarfile.LNKTYPE
    with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
        tar.addfile(tarinfo)
    with tarfile.open(tmpname) as tar:
        member = tar.getmembers()[0]
        if link:
            self.assertTrue(link == member.linkname,
                            "PAX longlink creation failed")
        else:
            self.assertTrue(name == member.name,
                            "PAX longname creation failed")
def test_pax_extended_header(self):
    # Fields from the pax extended header take priority over the TarInfo.
    pax_headers = {u"path": u"foo", u"uid": u"123"}
    with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                      encoding="iso8859-1") as tar:
        t = tarfile.TarInfo()
        t.name = u"\xe4\xf6\xfc"  # non-ASCII
        t.uid = 8**8  # too large
        t.pax_headers = pax_headers
        tar.addfile(t)
    with tarfile.open(tmpname, encoding="iso8859-1") as tar:
        t = tar.getmembers()[0]
        self.assertEqual(t.pax_headers, pax_headers)
        self.assertEqual(t.name, "foo")
        self.assertEqual(t.uid, 123)
def test_unicode_filename_error(self):
    with tarfile.open(tmpname, "w", format=self.format,
                      encoding="ascii", errors="strict") as tar:
        tarinfo = tarfile.TarInfo()
        tarinfo.name = "\xe4\xf6\xfc"
        if self.format == tarfile.PAX_FORMAT:
            # PAX refuses an undecodable raw byte-string name outright.
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        else:
            tar.addfile(tarinfo)
        # A unicode name the ascii codec cannot encode must fail.
        tarinfo.name = u"\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        # Same for a non-ascii uname.
        tarinfo.name = "foo"
        tarinfo.uname = u"\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)
def _test_partial_input(self, mode):
    # Feed tarfile.open() every possible prefix of a small compressed
    # archive and verify it never loops forever on truncated input.
    class MyStringIO(StringIO.StringIO):
        # Set once a read is attempted at EOF; a second read at EOF means
        # tarfile is spinning instead of raising.
        hit_eof = False
        def read(self, n):
            if self.hit_eof:
                raise AssertionError("infinite loop detected in tarfile.open()")
            # NOTE(review): relies on Py2 StringIO internals (.pos/.len)
            # to detect a read attempt at end of buffer.
            self.hit_eof = self.pos == self.len
            return StringIO.StringIO.read(self, n)
        def seek(self, *args):
            # Any seek resets the EOF flag: forward progress is possible.
            self.hit_eof = False
            return StringIO.StringIO.seek(self, *args)

    data = bz2.compress(tarfile.TarInfo("foo").tobuf())
    for x in range(len(data) + 1):
        try:
            tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
        except tarfile.ReadError:
            pass  # we have no interest in ReadErrors
def add(self, *, path, data, mode):
    """Add an in-memory file at ``path`` with permission ``mode``."""
    # Tar member names are relative: drop a single leading slash.
    path = path[1:] if path.startswith('/') else path
    tar_info = tarfile.TarInfo(name=path)
    data_bytes = data.encode('utf-8') if isinstance(data, str) else data
    tar_info.size = len(data_bytes)
    tar_info.mode = mode
    if tar_info.size > 0:
        # Ignore bandit false positive: B303:blacklist
        # This is a basic checksum for debugging not a secure hash.
        LOG.debug(  # nosec
            'Adding file path=%s size=%s md5=%s',
            path, tar_info.size,
            hashlib.md5(data_bytes).hexdigest())
    else:
        LOG.warning('Zero length file added to path=%s', path)
    self._tf.addfile(tar_info, io.BytesIO(data_bytes))
def test_ignore_zeros(self):
    # Garbage blocks before a valid member must be skipped when
    # ignore_zeros is enabled.
    for char in (b'\0', b'a'):
        # b'\0' triggers EOFHeaderError, b'a' InvalidHeaderError.
        with self.open(tmpname, "w") as fobj:
            fobj.write(char * 1024)
            fobj.write(tarfile.TarInfo("foo").tobuf())
        with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
            self.assertListEqual(
                tar.getnames(), ["foo"],
                "ignore_zeros=True should have skipped the %r-blocks" % char)
def test_100_char_name(self):
    # A 100-char name exactly fills the header's name field and so is
    # stored without the usual trailing '\0'; shorter names are NUL-padded.
    name = "0123456789" * 10
    with tarfile.open(tmpname, self.mode) as tar:
        tar.addfile(tarfile.TarInfo(name))
    with tarfile.open(tmpname) as tar:
        self.assertEqual(tar.getnames()[0], name,
                         "failed to store 100 char filename")
def write(self, worker, probe, dest):
    """ Serialize probe data, compress it and write it exclusively to output file. """
    data = StringIO()
    # Pickle with the highest protocol, then compress the pickle.
    data.write(compress(dumps(probe, HIGHEST_PROTOCOL)))
    data.seek(0)
    info = tarfile.TarInfo()
    info.name = 'Probe_%s.lzo' % dest
    info.uid = 0
    info.gid = 0
    # NOTE(review): .buf is a Py2 StringIO internal holding the buffered
    # string — presumably equivalent to len(data.getvalue()); TODO confirm.
    info.size = len(data.buf)
    # World-readable, read-only member.
    info.mode = S_IMODE(0o0444)
    info.mtime = mktime(probe.circs[0].created.timetuple())
    with self._lock:
        # Maximum file size is about 1 GB: roll over to a fresh tar file
        # once the running byte count passes the limit.
        if self._bytes_written >= 1 * 1000 * 1000 * 1000:
            self._tar.close()
            self._tar = self._create_tar_file()
            self._bytes_written = 0
        self._tar.addfile(tarinfo=info, fileobj=data)
        self._bytes_written += info.size
        self._threads_finished.append(worker)
        self._worker_finished.set()
def mkbuildcontext(dockerfile):
    """Pack *dockerfile* into a tar build context; return the open temp file
    rewound to its start."""
    ctx = tempfile.NamedTemporaryFile()
    tar = tarfile.open(mode='w', fileobj=ctx)
    if isinstance(dockerfile, io.StringIO):
        dfinfo = tarfile.TarInfo('Dockerfile')
        if six.PY3:
            raise TypeError('Please use io.BytesIO to create in-memory '
                            'Dockerfiles with Python 3')
        dfinfo.size = len(dockerfile.getvalue())
        dockerfile.seek(0)
    elif isinstance(dockerfile, io.BytesIO):
        dfinfo = tarfile.TarInfo('Dockerfile')
        dfinfo.size = len(dockerfile.getvalue())
        dockerfile.seek(0)
    else:
        # A real file on disk: let tarfile derive the header itself.
        dfinfo = tar.gettarinfo(fileobj=dockerfile, arcname='Dockerfile')
    tar.addfile(dfinfo, dockerfile)
    tar.close()
    ctx.seek(0)
    return ctx
def _tar_add_string_file(self, tarobj, fpath, content):
    """
    Given a tarfile object, add a file to it at ``fpath``, with content
    ``content``.

    Largely based on: http://stackoverflow.com/a/40392022

    :param tarobj: the tarfile to add to
    :type tarobj: tarfile.TarFile
    :param fpath: path to put the file at in the archive
    :type fpath: str
    :param content: file content
    :type content: str
    """
    logger.debug('Adding %d-length string to tarfile at %s',
                 len(content), fpath)
    payload = content.encode('utf-8')
    info = tarfile.TarInfo(name=fpath)
    info.size = len(payload)
    tarobj.addfile(tarinfo=info, fileobj=BytesIO(payload))
def write_files_in_sandbox(self, files: List[Tuple[str, bytes]], dest_dir: str) -> bool:
    """Pack *files* into an in-memory tar and copy it into the container at *dest_dir*."""
    archive = BytesIO()  # use BytesIO to store tarfile in memory
    with tarfile.open(fileobj=archive, mode="w") as tar:
        for file_name, file_contents in files:
            info = tarfile.TarInfo(name=file_name)  # set file name
            info.size = len(file_contents)  # set file size
            tar.addfile(info, BytesIO(file_contents))  # add file to archive
    archive.seek(0)  # set stream position is start of the stream
    return self.docker_client.put_archive(self.container.get('Id'), dest_dir, archive.read())

# get file from sandbox
# file_path : A path to a file or a directory
# For a directory, file_path should be end with '/' or '/.'
# If path ends in /. then this indicates that only the contents of the path directory should be copied.
# A symlink is always resolved to its target.
def test_ignore_zeros(self):
    # Test TarFile's ignore_zeros option: leading junk blocks must be
    # skipped when reading.
    if self.mode.endswith(":gz"):
        _open = gzip.GzipFile
    elif self.mode.endswith(":bz2"):
        _open = bz2.BZ2File
    else:
        _open = open
    for char in ('\0', 'a'):
        # '\0' exercises EOFHeaderError, 'a' InvalidHeaderError.
        with _open(tmpname, "wb") as fobj:
            fobj.write(char * 1024)
            fobj.write(tarfile.TarInfo("foo").tobuf())
        with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
            self.assertListEqual(
                tar.getnames(), ["foo"],
                "ignore_zeros=True should have skipped the %r-blocks" % char)
def test_pax_extended_header(self):
    # Pax extended header values override the plain TarInfo fields.
    pax_headers = {u"path": u"foo", u"uid": u"123"}
    with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                      encoding="iso8859-1") as tar:
        t = tarfile.TarInfo()
        t.name = u"äöü"  # non-ASCII
        t.uid = 8**8  # too large
        t.pax_headers = pax_headers
        tar.addfile(t)
    with tarfile.open(tmpname, encoding="iso8859-1") as tar:
        t = tar.getmembers()[0]
        self.assertEqual(t.pax_headers, pax_headers)
        self.assertEqual(t.name, "foo")
        self.assertEqual(t.uid, 123)
def test_unicode_filename_error(self):
    with tarfile.open(tmpname, "w", format=self.format,
                      encoding="ascii", errors="strict") as tar:
        tarinfo = tarfile.TarInfo()
        tarinfo.name = "äöü"
        if self.format == tarfile.PAX_FORMAT:
            # PAX rejects undecodable byte-string names outright.
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        else:
            tar.addfile(tarinfo)
        # A unicode name unencodable as ascii must fail.
        tarinfo.name = u"äöü"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        # Likewise a non-ascii uname.
        tarinfo.name = "foo"
        tarinfo.uname = u"äöü"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)
def test_uname_unicode(self):
    # uname/gname must round-trip whether supplied as unicode or raw bytes.
    for name in (u"äöü", "äöü"):
        t = tarfile.TarInfo("foo")
        t.uname = name
        t.gname = name
        buf = StringIO.StringIO()
        tar = tarfile.open("foo.tar", mode="w", fileobj=buf,
                           format=self.format, encoding="iso8859-1")
        tar.addfile(t)
        tar.close()
        buf.seek(0)
        tar = tarfile.open("foo.tar", fileobj=buf, encoding="iso8859-1")
        t = tar.getmember("foo")
        self.assertEqual(t.uname, "äöü")
        self.assertEqual(t.gname, "äöü")
def test_premature_end_of_archive(self):
    # A truncated archive must raise ReadError from iteration, extract()
    # and extractfile() alike, at several truncation points.
    eod = "unexpected end of data"
    for size in (512, 600, 1024, 1200):
        with tarfile.open(tmpname, "w:") as tar:
            t = tarfile.TarInfo("foo")
            t.size = 1024
            tar.addfile(t, io.BytesIO(b"a" * 1024))
        with open(tmpname, "r+b") as fobj:
            fobj.truncate(size)
        with tarfile.open(tmpname) as tar:
            with self.assertRaisesRegex(tarfile.ReadError, eod):
                for t in tar:
                    pass
        with tarfile.open(tmpname) as tar:
            t = tar.next()
            with self.assertRaisesRegex(tarfile.ReadError, eod):
                tar.extract(t, TEMPDIR)
            with self.assertRaisesRegex(tarfile.ReadError, eod):
                tar.extractfile(t).read()
def _test(self, name, link=None):
    # See GNUWriteTest.
    tarinfo = tarfile.TarInfo(name)
    if link:
        tarinfo.linkname = link
        tarinfo.type = tarfile.LNKTYPE
    with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
        tar.addfile(tarinfo)
    with tarfile.open(tmpname) as tar:
        member = tar.getmembers()[0]
        if link:
            self.assertEqual(link, member.linkname,
                             "PAX longlink creation failed")
        else:
            self.assertEqual(name, member.name,
                             "PAX longname creation failed")