我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tarfile.TarFile()。
def make_tar_stream(sdict):
    """Build an in-memory tar archive from a mapping of file references.

    Args:
        sdict (Mapping[str, pyccc.FileReferenceBase]): dict mapping
            filenames to file references

    Returns:
        Filelike: seekable buffer positioned at the start of the tar data
    """
    # TODO: this is currently done in memory - bad for big files!
    buffer = io.BytesIO()
    archive = tarfile.TarFile(fileobj=buffer, mode='w')
    for filename, ref in sdict.items():
        tar_add_bytes(archive, filename, ref.read('rb'))
    archive.close()
    buffer.seek(0)
    return buffer
def read_data(self, password=None):
    """Return a tarfile.TarFile over the backup payload.

    Decrypts and decompresses the underlying stream as necessary before
    handing it to ``tarfile.open``.
    """
    stream = self.fp
    stream.seek(self.__data_start)
    if self.is_encrypted():
        stream = self._decrypt(stream, password=password)
    if self.compression == CompressionType.ZLIB:
        stream = self._decompress(stream)
    # non-seekable sources must be read as a stream ('r|*')
    mode = 'r|*' if self.stream else 'r:*'
    return tarfile.open(fileobj=stream, mode=mode)
def tar_compiled(file, dir, expression='^.+$', exclude_content_from=None):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """
    archive = tarfile.TarFile(file, 'w')
    for entry in listdir(dir, expression, add_dirs=True,
                         exclude_content_from=exclude_content_from):
        fullname = os.path.join(dir, entry)
        if os.path.islink(fullname):
            # symlinks are never archived
            continue
        if os.path.isfile(fullname) and not entry.endswith('.pyc'):
            # plain source files from these app folders are excluded;
            # their compiled .pyc counterparts are still packed
            if entry.startswith(('models', 'views', 'controllers', 'modules')):
                continue
        archive.add(fullname, entry, False)
    archive.close()
def copy_to_container(self, container, source, dest):
    """Copy a local file into a container via the docker API.

    Args:
        container: container dict exposing an 'Id' key.
        source: path of the local file to copy.
        dest: destination directory inside the container.

    Returns:
        The response of ``client.put_archive``.
    """
    tar_stream = BytesIO()
    tar_file = tarfile.TarFile(fileobj=tar_stream, mode='w')
    # fix: read the source inside a context manager so the file
    # descriptor is not leaked
    with open(source, mode='rb') as src:
        file_data = src.read()
    tarinfo = tarfile.TarInfo(name=os.path.basename(source))
    tarinfo.size = os.stat(source).st_size
    tarinfo.mtime = time.time()
    # tarinfo.mode = 0600
    tar_file.addfile(tarinfo, BytesIO(file_data))
    tar_file.close()
    tar_stream.seek(0)
    res = self.client.put_archive(container=container['Id'],
                                  path=dest,
                                  data=tar_stream)
    return res
def extract_layer_in_tmp_dir(img: tarfile.TarFile, layer_digest: str) -> str:
    """Extract one docker image layer into a temporary directory and
    yield that directory's path (used as a context manager).

    >>> with open_docker_image(image_path) as (img, top_layer, _, manifest):
            last_layer_digest = get_last_image_layer(manifest)
            with extract_layer_in_tmp_dir(img, last_layer_digest) as d:
                print(d)
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        log.debug(" > Extracting layer content in temporal "
                  "dir: {}".format(tmp_dir))
        extract_docker_layer(img, layer_digest, tmp_dir)
        yield tmp_dir
def get_root_json_from_image(img: tarfile.TarFile) -> Tuple[str, dict]:
    """Locate and load the root metadata .json of a docker image.

    Every docker image has a root .json file with the metadata information.
    Returns the file's base name (without extension) and its parsed
    content, or (None, None) if no such member exists.

    >>> get_docker_image_layers(img)
    ('db079554b4d2f7c65c4df3adae88cb72d051c8c3b8613eb44e86f60c945b1ca7', dict(...))
    """
    for member in img.getmembers():
        # the root json lives at the top level of the archive
        if "/" in member.name or not member.name.endswith("json"):
            continue
        raw = img.extractfile(member.name).read()
        if hasattr(raw, "decode"):
            raw = raw.decode()
        return member.name.split(".")[0], json.loads(raw)
    return None, None
def fileCopyOut(self, containerid, filename, path):
    """Copy file from container to the local machine.

    Args:
        containerid: Container ID
        filename: Name of file to be copied out.
        path: Path in the container where the file lives.

    Returns:
        Nothing.
    """
    # The file is transmitted as a tar stream: save it to local disk as
    # a tar, then extract the requested file.
    tarObj, stats = super(scClient, self).get_archive(container=containerid,
                                                      path=path + filename)
    # fix: the tar stream is binary data, so the temp file must be opened
    # in binary mode ('wb'); text mode corrupts it on Python 3.  The
    # pointless seek(0) on the write handle is dropped.
    with open('temp.tar', 'wb') as destination:
        for chunk in tarObj:
            destination.write(chunk)
    # fix: close the archive deterministically via the context manager
    with tarfile.TarFile('temp.tar') as archive:
        archive.extract(filename)
    os.remove('temp.tar')
def test_ignore_zeros(self):
    # Test TarFile's ignore_zeros option.
    if self.mode.endswith(":gz"):
        opener = gzip.GzipFile
    elif self.mode.endswith(":bz2"):
        opener = bz2.BZ2File
    else:
        opener = open
    for filler in (b'\0', b'a'):
        # EOFHeaderError ('\0') and InvalidHeaderError ('a') blocks must
        # both be skipped when ignore_zeros=True.
        with opener(tmpname, "wb") as stream:
            stream.write(filler * 1024)
            stream.write(tarfile.TarInfo("foo").tobuf())
        archive = tarfile.open(tmpname, mode="r", ignore_zeros=True)
        try:
            self.assertListEqual(
                archive.getnames(), ["foo"],
                "ignore_zeros=True should have skipped the %r-blocks" % filler)
        finally:
            archive.close()
def test_init_close_fobj(self):
    # Issue #7341: when TarFile.__init__ fails, it must close the file
    # object it opened internally.  An empty file triggers a ReadError.
    empty = os.path.join(TEMPDIR, "empty")
    with open(empty, "wb") as fobj:
        fobj.write(b"")
    try:
        tar = object.__new__(tarfile.TarFile)
        raised = False
        try:
            tar.__init__(empty)
        except tarfile.ReadError:
            raised = True
        if not raised:
            self.fail("ReadError not raised")
        self.assertTrue(tar.fileobj.closed)
    finally:
        support.unlink(empty)
def test_init_close_fobj(self):
    # Issue #7341: Close the internal file object in the TarFile
    # constructor in case of an error. For the test we rely on
    # the fact that opening an empty file raises a ReadError.
    empty = os.path.join(TEMPDIR, "empty")
    # fix: write bytes, not str -- the file is opened in binary mode and
    # a str argument raises TypeError on Python 3 (b"" is fine on 2.x)
    with open(empty, "wb") as fobj:
        fobj.write(b"")
    try:
        tar = object.__new__(tarfile.TarFile)
        try:
            tar.__init__(empty)
        except tarfile.ReadError:
            self.assertTrue(tar.fileobj.closed)
        else:
            self.fail("ReadError not raised")
    finally:
        support.unlink(empty)
def test_ignore_zeros(self):
    # Test TarFile's ignore_zeros option.
    for filler in (b'\0', b'a'):
        # Both EOFHeaderError ('\0') and InvalidHeaderError ('a')
        # blocks must be skipped when ignore_zeros=True.
        with self.open(tmpname, "w") as stream:
            stream.write(filler * 1024)
            stream.write(tarfile.TarInfo("foo").tobuf())
        archive = tarfile.open(tmpname, mode="r", ignore_zeros=True)
        try:
            self.assertListEqual(
                archive.getnames(), ["foo"],
                "ignore_zeros=True should have skipped the %r-blocks" % filler)
        finally:
            archive.close()
def _tar_add_string_file(self, tarobj, fpath, content):
    """Add an in-memory string as a file inside a tar archive.

    Largely based on: http://stackoverflow.com/a/40392022

    :param tarobj: the tarfile to add to
    :type tarobj: tarfile.TarFile
    :param fpath: path to put the file at in the archive
    :type fpath: str
    :param content: file content
    :type content: str
    """
    logger.debug('Adding %d-length string to tarfile at %s',
                 len(content), fpath)
    encoded = content.encode('utf-8')
    member = tarfile.TarInfo(name=fpath)
    member.size = len(encoded)
    tarobj.addfile(tarinfo=member, fileobj=BytesIO(encoded))
def backup_img(self, disk, target, target_filename=None):
    """Backup a disk image.

    :param disk: path of the image to backup
    :param target: dir or filename to copy into/as. If self.compression
                   is set, target has to be a tarfile.TarFile
    :param target_filename: destination file will have this name, or keep
                            the original one. target has to be a dir (if
                            not exists, will be created)
    :returns backup_path: complete path of our backup
    """
    if self.compression:
        copied_path = self._add_img_to_tarfile(disk, target, target_filename)
    else:
        copied_path = self._copy_img_to_file(disk, target, target_filename)
    logger.debug("{} successfully copied".format(disk))
    return os.path.abspath(copied_path)
def _add_img_to_tarfile(self, img, target, target_filename):
    """Add a disk image to an open tar archive, showing a progress bar.

    :param img: source img path
    :param target: tarfile.TarFile where img will be added
    :param target_filename: img name in the tarfile
    :returns: path of the archive file the image was written into
    """
    total_size = os.path.getsize(img)
    tqdm_kwargs = {
        "total": total_size,
        "unit": "B",
        "unit_scale": True,
        "ncols": 0,
        "mininterval": 0.5
    }
    logger.debug("Copy {}…".format(img))
    # Recover the on-disk path of the archive from the TarFile's
    # underlying file object.  NOTE(review): for xz this reaches into the
    # private ._fp attribute of the compression wrapper -- fragile;
    # confirm against the tarfile/lzma version in use.
    if self.compression == "xz":
        backup_path = target.fileobj._fp.name
    else:
        backup_path = target.fileobj.name
    # Record the tar name before copying so an interrupted backup can
    # be identified from the pending-info file.
    self.pending_info["tar"] = os.path.basename(backup_path)
    self._dump_pending_info()
    with tqdm(**tqdm_kwargs) as pbar:
        # NOTE(review): `fileobject` is TarFile's member-file factory
        # attribute; presumably get_progress_bar_tar returns a wrapper
        # that ticks the bar as the image is read -- confirm upstream.
        target.fileobject = get_progress_bar_tar(pbar)
        target.add(img, arcname=target_filename)
    return backup_path
def get_archive_names(self, filename):
    """Return names of files within archive having filename.

    Supports .zip and .tar archives; any other extension, or a failure
    to open the archive, yields an empty list.
    """
    # fix: initialise `names` up front so unknown extensions return []
    # instead of raising NameError at the return statement
    names = []
    try:
        if filename.lower().endswith(".zip"):
            with zipfile.ZipFile(
                os.path.join(self.path, filename), "r"
            ) as z:
                names = z.namelist()
        elif filename.lower().endswith(".tar"):
            with tarfile.TarFile(
                os.path.join(self.path, filename), "r"
            ) as t:
                names = t.getnames()
    except Exception:
        logging.exception(
            "Exception opening file:" + str(os.path.join(self.path, filename))
        )
        names = []
    return names
def extractall(file_path, destination, ext):
    """Extracts an archive file.

    This function extracts an archive file to a destination.

    Args:
        file_path (str): The path of a file to be extracted.
        destination (str): A directory path. The archive file will be
            extracted under this directory.
        ext (str): An extension suffix of the archive file. This function
            supports :obj:`'.zip'`, :obj:`'.tar'`, :obj:`'.gz'` and
            :obj:`'.tgz'`.
    """
    # pick the archive opener matching the extension; unknown
    # extensions are a silent no-op, as before
    if ext == '.zip':
        factory, mode = zipfile.ZipFile, 'r'
    elif ext == '.tar':
        factory, mode = tarfile.TarFile, 'r'
    elif ext in ('.gz', '.tgz'):
        factory, mode = tarfile.open, 'r:gz'
    else:
        return
    with factory(file_path, mode) as archive:
        archive.extractall(destination)
def test_ignore_zeros(self):
    # Test TarFile's ignore_zeros option.
    if self.mode.endswith(":gz"):
        _open = gzip.GzipFile
    elif self.mode.endswith(":bz2"):
        _open = bz2.BZ2File
    else:
        _open = open
    # fix: the stream is opened in binary mode, so the padding must be
    # bytes; str literals raise TypeError on Python 3
    for char in (b'\0', b'a'):
        # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
        # are ignored correctly.
        with _open(tmpname, "wb") as fobj:
            fobj.write(char * 1024)
            fobj.write(tarfile.TarInfo("foo").tobuf())
        tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
        try:
            self.assertListEqual(
                tar.getnames(), ["foo"],
                "ignore_zeros=True should have skipped the %r-blocks" % char)
        finally:
            tar.close()
def test_init_close_fobj(self):
    # Issue #7341: the TarFile constructor must close its internal file
    # object when construction fails; an empty file raises a ReadError.
    empty = os.path.join(TEMPDIR, "empty")
    with open(empty, "wb") as fobj:
        fobj.write(b"")
    try:
        tar = object.__new__(tarfile.TarFile)
        raised = False
        try:
            tar.__init__(empty)
        except tarfile.ReadError:
            raised = True
        if not raised:
            self.fail("ReadError not raised")
        self.assertTrue(tar.fileobj.closed)
    finally:
        test_support.unlink(empty)
def test_ignore_zeros(self):
    # Test TarFile's ignore_zeros option.
    if self.mode.endswith(":gz"):
        _open = gzip.GzipFile
    elif self.mode.endswith(":bz2"):
        _open = bz2.BZ2File
    else:
        _open = open
    # fix: write bytes to the binary stream (str raises TypeError on
    # Python 3) and close both files deterministically even when the
    # assertion fails
    for char in (b'\0', b'a'):
        # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
        # are ignored correctly.
        with _open(tmpname, "wb") as fobj:
            fobj.write(char * 1024)
            fobj.write(tarfile.TarInfo("foo").tobuf())
        tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
        try:
            self.assertListEqual(
                tar.getnames(), ["foo"],
                "ignore_zeros=True should have skipped the %r-blocks" % char)
        finally:
            tar.close()
def test_init_close_fobj(self):
    # Issue #7341: Close the internal file object in the TarFile
    # constructor in case of an error. For the test we rely on
    # the fact that opening an empty file raises a ReadError.
    empty = os.path.join(TEMPDIR, "empty")
    # fix: close the created file (the original leaked the handle) and
    # write bytes, since the file is opened in binary mode
    with open(empty, "wb") as fobj:
        fobj.write(b"")
    try:
        tar = object.__new__(tarfile.TarFile)
        try:
            tar.__init__(empty)
        except tarfile.ReadError:
            self.assertTrue(tar.fileobj.closed)
        else:
            self.fail("ReadError not raised")
    finally:
        os.remove(empty)
def copy_to_container(src, dest, owner, group, container):
    """Copy a local text file into a docker container and chown it.

    :param src: local source file path (read as text, sent as utf-8)
    :param dest: absolute destination path inside the container
    :param owner: owner for the copied file
    :param group: group for the copied file
    :param container: docker container object (put_archive / exec_run)
    """
    def create_tar_stream(file_content, file_name):
        # metadata for internal file
        tarinfo = tarfile.TarInfo(name=file_name)
        tarinfo.size = len(file_content)
        tarinfo.mtime = time.time()
        tarstream = BytesIO()
        tar = tarfile.TarFile(fileobj=tarstream, mode='w')
        tar.addfile(tarinfo, BytesIO(file_content))
        tar.close()
        tarstream.seek(0)
        return tarstream

    log.info('copying to container: %s', locals())
    dest_file = os.path.basename(dest)
    dest_dir = os.path.dirname(dest)
    # fix: read the source within a context manager so the file handle
    # is not leaked
    with open(src) as src_fh:
        file_data = src_fh.read().encode('utf-8')
    tar_stream = create_tar_stream(file_content=file_data, file_name=dest_file)
    container.put_archive(path=dest_dir, data=tar_stream)
    container.exec_run('chown {0}:{1} -R {2}'.format(owner, group, dest))
def _download_cifar(out_dir):
    """Fetch the CIFAR-10 python archive with wget (if missing) and unpack it."""
    url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
    print(url)
    file_path = os.path.join(out_dir, 'cifar-10-python.tar.gz')
    if not os.path.exists(file_path):
        # download only when the archive is not already present
        print('Downloading CIFAR')
        subprocess.call(['wget', url, '-P', out_dir])
    with tarfile.open(name=file_path, mode='r:gz') as archive:
        archive.extractall(path=out_dir)
def load_chunk(tarfile, size=None):
    """Load a number of images from a single imagenet .tar file.

    This function also converts the image from grayscale to RGB if necessary.

    Args:
        tarfile (tarfile.TarFile): The archive from which the files get loaded.
        size (Optional[Tuple[int, int]]): Resize the image to this size if
            provided.

    Returns:
        numpy.ndarray: Contains the image data in format [batch, w, h, c]
    """
    result = []
    filenames = []
    for member in tarfile.getmembers():
        filename = member.path
        content = tarfile.extractfile(member)
        img = Image.open(content)
        # pasting onto a fresh RGB canvas converts grayscale sources
        rgbimg = Image.new("RGB", img.size)
        rgbimg.paste(img)
        # fix: identity-compare against None (PEP 8) instead of `!= None`
        if size is not None:
            rgbimg = rgbimg.resize(size, Image.ANTIALIAS)
        result.append(np.array(rgbimg).reshape(1, rgbimg.size[0],
                                               rgbimg.size[1], 3))
        filenames.append(filename)
    return np.concatenate(result), filenames
def tar_error(tmpdir):
    """Fixture factory: build and re-extract a tar archive in *tmpdir*.

    The returned callable creates ``filename`` inside *tmpdir* containing
    the entries a, b, c and d/e, deletes the source files, extracts the
    archive again and sanity-checks the resulting layout.
    """
    def fixture(filename):
        path = os.path.join(str(tmpdir), filename)

        def reset(path):
            os.mkdir('d')
            with tarfile.TarFile(path, 'w') as archive:
                for name in ('a', 'b', 'c', 'd/e'):
                    with open(name, 'w') as handle:
                        handle.write('*')
                    archive.add(name)
                    os.remove(name)
            with tarfile.TarFile(path, 'r') as archive:
                archive.extractall()

        os.chdir(str(tmpdir))
        reset(path)
        assert set(os.listdir('.')) == {filename, 'a', 'b', 'c', 'd'}
        assert set(os.listdir('./d')) == {'e'}

    return fixture
def test_load_from_tar(self):
    # Verify IncludeFiles.load_from_tar decodes member contents as UTF-8
    # and rejects other encodings.
    # NOTE(review): Python 2 only -- relies on StringIO.StringIO, the
    # internal .buf attribute and dict .values() indexing.
    def generate_inmemory_tar(content):
        import StringIO
        import tarfile
        tar_sio = StringIO.StringIO()
        with tarfile.TarFile(fileobj=tar_sio, mode="w") as tar:
            content_sio = StringIO.StringIO()
            content_sio.write(content)
            content_sio.seek(0)
            # non-ASCII member name, encoded to bytes for the tar header
            info = tarfile.TarInfo(name=u"fóø".encode("utf-8"))
            # .buf is the internal buffer of Python 2's StringIO
            info.size = len(content_sio.buf)
            tar.addfile(tarinfo=info, fileobj=content_sio)
        tar_sio.seek(0)
        tar_sio.name = "foo"
        return tar_sio
    content = u"testing únicódè"
    inc_files = IncludeFiles.load_from_tar(
        generate_inmemory_tar(content.encode("utf-8")))
    self.assertEqual(inc_files.files.values()[0], content)
    # if encoding is not utf-8 then we cant load the file
    self.assertRaises(UnicodeDecodeError, IncludeFiles.load_from_tar,
                      generate_inmemory_tar(content.encode("latin-1")))
def setUp(self):
    """Unpack the fake cgroup filesystem fixture and patch convirt paths."""
    self.pid = 0
    testdir = os.path.dirname(os.path.abspath(__file__))
    self.root = os.path.join(testdir, 'fake')
    self.procfsroot = os.path.join(
        self.root, convirt.metrics.cgroups.PROCFS
    )
    self.cgroupfsroot = os.path.join(
        self.root, convirt.metrics.cgroups.CGROUPFS
    )
    with move_into(self.root):
        cgroupsdata = os.path.join(self.root, 'cgroups.tgz')
        # fix: close both the gzip stream and the TarFile -- they were
        # previously left open for the life of the test
        with gzip.GzipFile(cgroupsdata) as gz:
            with tarfile.TarFile(fileobj=gz) as tar:
                tar.extractall()
    self.patch = monkey.Patch([
        (convirt.metrics.cgroups, '_PROCBASE', self.procfsroot),
        (convirt.metrics.cgroups, '_CGROUPBASE', self.cgroupfsroot),
    ])
    self.patch.apply()
def extract_filesystem_bundle(docker_driver, container_id=None, image_name=None):
    """Export a container's or image's filesystem into a fresh temp directory.

    :param docker_driver: wrapper exposing get_docker_client()
    :param container_id: export this container's filesystem, if given
    :param image_name: otherwise save this image's filesystem
    :returns: path of the temporary directory holding the extracted tree

    The caller is responsible for cleaning the returned directory.
    """
    temporary_dir = tempfile.mkdtemp()
    # Get and save filesystem bundle
    if container_id is not None:
        data = docker_driver.get_docker_client().export(container=container_id).data
        name = container_id
    else:
        data = docker_driver.get_docker_client().get_image(image=image_name).data
        name = image_name.replace('/', '_').replace(':', '_')
    bundle_path = temporary_dir + "/" + name + ".tar"
    with open(bundle_path, "wb") as file:
        file.write(data)
    # Untar filesystem bundle.  fix: close the archive (it was leaked)
    # and stop shadowing the `tarfile` module name with a local
    with TarFile(bundle_path) as bundle:
        bundle.extractall(temporary_dir)
    os.remove(bundle_path)
    if image_name is not None:
        layers = _get_layers_from_manifest(temporary_dir)
        _untar_layers(temporary_dir, layers)
    return temporary_dir
def _untar_layers(dir, layers):
    """Extract docker image layers into *dir*, later layers overriding earlier ones."""
    # Determine, per path, which layer provides the winning copy
    # (later layers override earlier ones).
    output = {}
    for layer in layers:
        # fix: close each layer archive instead of leaking it
        with TarFile(dir + "/" + layer) as archive:
            for member in archive.getmembers():
                output[member.name] = (layer, member)
    # fix: the original extracted every member through the *last* opened
    # archive, misreading members that live in earlier layers; extract
    # each winning member from the layer it actually belongs to.
    members_by_layer = {}
    for layer, member in output.values():
        members_by_layer.setdefault(layer, []).append(member)
    for layer, members in members_by_layer.items():
        with TarFile(dir + "/" + layer) as archive:
            for member in members:
                try:
                    archive.extract(member, path=dir, set_attrs=False)
                except (ValueError, ReadError):
                    # tolerate unextractable members, as before
                    pass
    # Clean up
    for layer in layers:
        clean_up(dir + "/" + layer[:-10])
def write_tar_raw_data_stream(path, stream, uid, gid):
    """Extract data from tar raw in stream data.

    It extracts the data from a stream to a file, and changes the owner
    to uid with gid.

    :param path: file path
    :param stream: data in stream.
    :param uid: user uid
    :param gid: group uid
    """
    try:
        stream_io = io.BytesIO(stream)
    except TypeError:
        # str payloads fall back to a text buffer
        stream_io = io.StringIO(stream)
    # fix: close the archive when done instead of leaking it
    with tarfile.TarFile(fileobj=stream_io) as archive:
        archive.extractall(path=path)
    change_owner_dir(path, uid, gid)
def test_tarball(self):
    """Round-trip a temp filesystem through a tarball-backed TarballItem."""
    with self._temp_filesystem() as fs_path:
        fs_prefix = fs_path.lstrip('/')

        def strip_fs_prefix(tarinfo):
            # rewrite archive paths so they are relative to the fs root
            if fs_prefix == tarinfo.path:
                tarinfo.path = '.'
            elif tarinfo.path.startswith(fs_prefix + '/'):
                tarinfo.path = tarinfo.path[len(fs_prefix) + 1:]
            else:
                raise AssertionError(
                    f'{tarinfo.path} must start with {fs_prefix}'
                )
            return tarinfo

        with tempfile.NamedTemporaryFile() as t:
            with tarfile.TarFile(t.name, 'w') as tar_obj:
                tar_obj.add(fs_path, filter=strip_fs_prefix)
            self._check_item(
                TarballItem(from_target='t', into_dir='y', tarball=t.name),
                self._temp_filesystem_provides('y'),
                {require_directory('y')},
                f'tar --directory=y {t.name}',
            )
def test_get_size(self):
    # RawTGZImage.get_size must return the size of the first tar member
    # and cache both the tar file and the tar info on the image.
    # NOTE: mox record/replay -- the order of the recorded calls below
    # is behaviorally significant.
    tar_file = self.mox.CreateMock(tarfile.TarFile)
    tar_info = self.mox.CreateMock(tarfile.TarInfo)
    image = utils.RawTGZImage(None)
    self.mox.StubOutWithMock(image, '_as_tarfile')
    # record phase: expected calls and their return values
    image._as_tarfile().AndReturn(tar_file)
    tar_file.next().AndReturn(tar_info)
    tar_info.size = 124
    self.mox.ReplayAll()
    result = image.get_size()
    self.assertEqual(124, result)
    # the tar handle and member info must be cached for later use
    self.assertEqual(image._tar_info, tar_info)
    self.assertEqual(image._tar_file, tar_file)
def test_get_size_called_twice(self):
    # A second get_size() call must reuse the cached tar info: only one
    # _as_tarfile()/next() pair is recorded, so a second lookup would
    # fail mox verification.
    # NOTE: mox record/replay -- recorded call order is significant.
    tar_file = self.mox.CreateMock(tarfile.TarFile)
    tar_info = self.mox.CreateMock(tarfile.TarInfo)
    image = utils.RawTGZImage(None)
    self.mox.StubOutWithMock(image, '_as_tarfile')
    # record phase: the tar is opened and read exactly once
    image._as_tarfile().AndReturn(tar_file)
    tar_file.next().AndReturn(tar_info)
    tar_info.size = 124
    self.mox.ReplayAll()
    image.get_size()
    result = image.get_size()
    self.assertEqual(124, result)
    self.assertEqual(image._tar_info, tar_info)
    self.assertEqual(image._tar_file, tar_file)
def test_stream_to_without_size_retrieved(self):
    # stream_to() without a prior get_size() must open the tar itself,
    # extract the first member, copy it to the target and close the tar.
    # NOTE: mox record/replay -- recorded call order is significant.
    source_tar = self.mox.CreateMock(tarfile.TarFile)
    first_tarinfo = self.mox.CreateMock(tarfile.TarInfo)
    target_file = self.mox.CreateMock(open)
    source_file = self.mox.CreateMock(open)
    image = utils.RawTGZImage(None)
    image._image_service_and_image_id = ('service', 'id')
    self.mox.StubOutWithMock(image, '_as_tarfile', source_tar)
    self.mox.StubOutWithMock(utils.shutil, 'copyfileobj')
    # record phase: open tar, take first member, stream it out, close
    image._as_tarfile().AndReturn(source_tar)
    source_tar.next().AndReturn(first_tarinfo)
    source_tar.extractfile(first_tarinfo).AndReturn(source_file)
    utils.shutil.copyfileobj(source_file, target_file)
    source_tar.close()
    self.mox.ReplayAll()
    image.stream_to(target_file)
def test_stream_to_with_size_retrieved(self):
    # When get_size() ran first, stream_to() must reuse the already
    # opened tar and member instead of opening the archive again: only
    # one _as_tarfile()/next() pair is recorded.
    # NOTE: mox record/replay -- recorded call order is significant.
    source_tar = self.mox.CreateMock(tarfile.TarFile)
    first_tarinfo = self.mox.CreateMock(tarfile.TarInfo)
    target_file = self.mox.CreateMock(open)
    source_file = self.mox.CreateMock(open)
    first_tarinfo.size = 124
    image = utils.RawTGZImage(None)
    image._image_service_and_image_id = ('service', 'id')
    self.mox.StubOutWithMock(image, '_as_tarfile', source_tar)
    self.mox.StubOutWithMock(utils.shutil, 'copyfileobj')
    # record phase: a single open/next, then the streaming copy
    image._as_tarfile().AndReturn(source_tar)
    source_tar.next().AndReturn(first_tarinfo)
    source_tar.extractfile(first_tarinfo).AndReturn(source_file)
    utils.shutil.copyfileobj(source_file, target_file)
    source_tar.close()
    self.mox.ReplayAll()
    image.get_size()
    image.stream_to(target_file)
def test_start(self):
    # TarGzProducer.start() must wrap the source file into a gzip'd tar
    # stream written to outf, set the member size from the constructor's
    # size argument, and close outf afterwards.
    # NOTE: mox record/replay -- recorded call order is significant.
    outf = six.StringIO()
    producer = vdi_through_dev.TarGzProducer('fpath', outf, '100', 'fname')
    tfile = self.mox.CreateMock(tarfile.TarFile)
    tinfo = self.mox.CreateMock(tarfile.TarInfo)
    inf = self.mox.CreateMock(open)
    self.mox.StubOutWithMock(vdi_through_dev, 'tarfile')
    self.mox.StubOutWithMock(producer, '_open_file')
    # record phase: TarInfo creation, gzip'd tar opened over outf,
    # source opened read-binary, member added, output closed
    vdi_through_dev.tarfile.TarInfo(name='fname').AndReturn(tinfo)
    vdi_through_dev.tarfile.open(fileobj=outf, mode='w|gz').AndReturn(
        fake_context(tfile))
    producer._open_file('fpath', 'rb').AndReturn(fake_context(inf))
    tfile.addfile(tinfo, fileobj=inf)
    outf.close()
    self.mox.ReplayAll()
    producer.start()
    # '100' (str) must have been converted to the int member size
    self.assertEqual(100, tinfo.size)
def tar_compiled(file, dir, expression='^.+$'):
    """Used to tar a compiled application.
    The content of models, views, controllers is not stored in the tar file.
    """
    archive = tarfile.TarFile(file, 'w')
    for entry in listdir(dir, expression, add_dirs=True):
        fullname = os.path.join(dir, entry)
        if os.path.islink(fullname):
            # symlinks are never archived
            continue
        if os.path.isfile(fullname) and not entry.endswith('.pyc'):
            # skip app source folders; their compiled .pyc files are kept
            if entry.startswith(('models', 'views', 'controllers', 'modules')):
                continue
        archive.add(fullname, entry, False)
    archive.close()
def unpack(self, target_dir=None, password=None, pickle_fname=None):
    """Unpack the backup into *target_dir* and record the member order.

    High level function for unpacking a backup file into the given target
    directory (generated from the backup filename if not given).  Also
    writes a pickle file containing the exact order of the included
    files, which is required for repacking.

    :param target_dir: directory to extract the backup into
        (default: basename of the backup + '_unpacked')
    :param password: optional password for decrypting the backup
        (can also be set in the constructor)
    :param pickle_fname: where to store the pickled member list
        (default: basename of the backup + '.pickle')
    """
    if target_dir is None:
        target_dir = os.path.basename(self.fp.name) + '_unpacked'
    if pickle_fname is None:
        # fix: the original referenced an undefined `fname` here
        # (NameError); derive the default from the backup file's name,
        # mirroring the target_dir default above
        pickle_fname = os.path.basename(self.fp.name) + '.pickle'
    if not os.path.exists(target_dir):
        os.mkdir(target_dir)
    tar = self.read_data(password)
    members = tar.getmembers()
    # reopen stream (TarFile is not able to seek)
    tar = self.read_data(password)
    tar.extractall(path=target_dir, members=members)
    with open(pickle_fname, 'wb') as fp:
        pickle.dump(members, fp)
def tar_directory(path):
    """Return the bytes of a tar archive containing *path*'s directory.

    The archive stores the directory under its basename, so extracting
    it recreates the directory relative to the extraction root.

    :param path: directory to archive
    :returns: raw tar archive contents as bytes
    """
    cwd = os.getcwd()
    parent = os.path.dirname(path)
    directory = os.path.basename(path)
    tmp = tempfile.TemporaryFile()
    os.chdir(parent)
    try:
        tarball = tarfile.TarFile(fileobj=tmp, mode='w')
        tarball.add(directory)
        tarball.close()
    finally:
        # fix: always restore the working directory, even when
        # archiving raises -- the original left the process chdir'd
        os.chdir(cwd)
    tmp.seek(0)
    out = tmp.read()
    tmp.close()
    return out