我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用tarfile.open()。
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    # BUG FIX: the original built these arguments with b'{0}'.format(size),
    # which raises AttributeError on Python 3 (bytes objects have no
    # .format method). Plain str arguments work on both Python versions.
    cmd = [
        'sips',
        '-z', str(size), str(size),
        inpath,
        '--out', outpath,
    ]
    # log().debug(cmd)
    # Discard sips output; only the exit status matters.
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)
    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
def read_text_file(filename):
    """Return the contents of *filename* decoded to text.

    Tries utf-8 first, then the preferred system encoding (e.g. cp1252 on
    some Windows machines), then latin1. Decoding with latin1 never raises,
    so in the worst case the result contains some garbage characters.
    """
    with open(filename, 'rb') as fp:
        raw = fp.read()
    for encoding in ('utf-8', locale.getpreferredencoding(False), 'latin1'):
        try:
            text = raw.decode(encoding)
        except UnicodeDecodeError:
            continue
        else:
            break
    assert type(text) != bytes  # latin1 should always have succeeded
    return text
def maybe_download_and_extract():
    """Download and extract the tarball from Alex's website.

    Uses module-level FLAGS.data_dir as the destination and DATA_URL as the
    source URL. The download is skipped when the tarball is already present;
    extraction always runs.
    """
    dest_directory = FLAGS.data_dir
    if not os.path.exists(dest_directory):
        os.makedirs(dest_directory)
    filename = DATA_URL.split('/')[-1]
    filepath = os.path.join(dest_directory, filename)
    if not os.path.exists(filepath):
        def _progress(count, block_size, total_size):
            # \r keeps the progress report on a single console line.
            sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
                float(count * block_size) / float(total_size) * 100.0))
            sys.stdout.flush()
        filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath,
                                                 reporthook=_progress)
        print()
        statinfo = os.stat(filepath)
        print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    # 'with' closes the tarball handle that the original version leaked.
    with tarfile.open(filepath, 'r:gz') as tar:
        tar.extractall(dest_directory)
def __init__(self, fname=None, password=None, stream=True):
    """
    :param fname: The filename of the backup file or a file-like object
    :param password: The password to use for the en-/decryption
    :param stream: Open the backup file in stream mode. Reduces memory
                   usage but allows only sequential reads. Default: True
    """
    # Header-derived fields start unset; presumably populated by open()
    # when a path is given -- confirm against the class's open() method.
    self.fp = None
    self.version = None
    self.compression = None
    self.encryption = None
    self.stream = stream
    self.password = password
    # position of the actual file data (after the header)
    self.__data_start = 0
    if isinstance(fname, str):
        # A path was supplied: delegate to open().
        self.open(fname)
    else:
        # Otherwise treat fname as an already-open file object (may be None).
        self.fp = fname
def read_data(self, password=None):
    """
    Helper function which decrypts and decompresses the data if necessary
    and returns a tarfile.TarFile to interact with.

    :param password: Optional password overriding the one stored on self.
    """
    fp = self.fp
    # Skip the backup header; __data_start was recorded when it was parsed.
    fp.seek(self.__data_start)
    # Stack decryption / decompression wrappers as required by the header.
    if self.is_encrypted():
        fp = self._decrypt(fp, password=password)
    if self.compression == CompressionType.ZLIB:
        fp = self._decompress(fp)
    if self.stream:
        # 'r|*': sequential-only stream mode, transparent compression.
        mode = 'r|*'
    else:
        # 'r:*': random access; requires a seekable file object.
        mode = 'r:*'
    tar = tarfile.open(fileobj=fp, mode=mode)
    return tar
def download_driver_file(whichbin, url, base_path):
    """Download a webdriver archive from *url* and unpack it into *base_path*.

    Supports .tar.gz and .zip archives (chosen by URL suffix) and marks the
    extracted geckodriver/chromedriver binary executable.

    Args:
        whichbin: 'wires' selects geckodriver; anything else chromedriver.
        url: Download URL; its extension decides how the file is unpacked.
        base_path: Directory the archive contents are extracted into.
    """
    ext = '.tar.gz' if url.endswith('.tar.gz') else '.zip'
    print("Downloading from: {}".format(url))
    download_file(url, '/tmp/pwr_temp{}'.format(ext))
    if ext == '.tar.gz':
        import tarfile
        # Context manager guarantees the archive is closed even on error
        # (the zip branch already did this; the tar branch now matches).
        with tarfile.open('/tmp/pwr_temp{}'.format(ext), "r:gz") as tar:
            tar.extractall('{}/'.format(base_path))
    else:
        import zipfile
        with zipfile.ZipFile('/tmp/pwr_temp{}'.format(ext), "r") as z:
            z.extractall('{}/'.format(base_path))
    if whichbin == 'wires':
        os.chmod('{}/geckodriver'.format(base_path), 0o775)
    else:
        os.chmod('{}/chromedriver'.format(base_path), 0o775)
def testUploadPackageNoFail(self):
    """The nofail option must prevent fatal error on upload failures"""
    # Archive configured to upload/download but never abort the build.
    archive = self.__getArchiveInstance({"flags" : ["upload", "download", "nofail"]})
    archive.wantUpload(True)
    with TemporaryDirectory() as tmp:
        # create simple workspace: dummy audit record plus one content file
        audit = os.path.join(tmp, "audit.json.gz")
        content = os.path.join(tmp, "workspace")
        with open(audit, "wb") as f:
            f.write(b"AUDIT")
        os.mkdir(content)
        with open(os.path.join(content, "data"), "wb") as f:
            f.write(b"DATA")
        # must not throw; exercised with last argument 0 and 1
        # (presumably a verbosity/executor flag -- confirm in Archive API)
        archive.uploadPackage(ERROR_UPLOAD_ARTIFACT, audit, content, 0)
        archive.uploadPackage(ERROR_UPLOAD_ARTIFACT, audit, content, 1)
        # also live-build-id upload errors must not throw with nofail
        archive.uploadLocalLiveBuildId(ERROR_UPLOAD_ARTIFACT, b'\x00', 0)
        archive.uploadLocalLiveBuildId(ERROR_UPLOAD_ARTIFACT, b'\x00', 1)
def testUploadJenkinsNoFail(self):
    """The nofail option must prevent fatal error on upload failures"""
    archive = self.__getArchiveInstance({"flags" : ["upload", "download", "nofail"]})
    archive.wantUpload(True)
    with TemporaryDirectory() as tmp:
        # Build-id file whose content the dummy archive treats as an error.
        with open(os.path.join(tmp, "error.buildid"), "wb") as f:
            f.write(ERROR_UPLOAD_ARTIFACT)
        self.__createArtifactByName(os.path.join(tmp, "result.tgz"))
        # these uploads must not fail even though they do not succeed
        script = archive.upload(None, "error.buildid", "result.tgz")
        callJenkinsScript(script, tmp)
        script = archive.uploadJenkinsLiveBuildId(None, "error.buildid", "test.buildid")
        callJenkinsScript(script, tmp)
def testInvalidServer(self):
    """Test download on non-existent server"""
    # 127.1.2.3:7257 is assumed unreachable; all downloads must fail
    # gracefully instead of raising.
    spec = { 'url' : "https://127.1.2.3:7257" }
    archive = SimpleHttpArchive(spec)
    archive.wantDownload(True)
    archive.wantUpload(True)
    # Local mode: package download attempts with a dummy 20-byte build-id.
    archive.downloadPackage(b'\x00'*20, "unused", "unused", 0)
    archive.downloadPackage(b'\x00'*20, "unused", "unused", 1)
    self.assertEqual(archive.downloadLocalLiveBuildId(b'\x00'*20, 0), None)
    # Jenkins mode: the generated shell script must tolerate the failure.
    with TemporaryDirectory() as workspace:
        with open(os.path.join(workspace, "test.buildid"), "wb") as f:
            f.write(b'\x00'*20)
        script = archive.download(None, "test.buildid", "result.tgz")
        callJenkinsScript(script, workspace)
def makeSrcTar(self, container):
    # this function violates the chroot encapsulation quite badly...
    """Pack the container's source directory into the configured tarball.

    Raises:
        Exception: when the tarball name is neither .tar nor .tar.gz.
    """
    filename = os.path.join(self._pkgBuildDir, self._pkgTar)
    if filename.endswith(".tar.gz"):
        mode = "w:gz"
    elif filename.endswith(".tar"):
        mode = "w"
    else:
        raise Exception("Unknown tar format '%s'" % filename)
    # make tarball from source, stored under the directory's basename
    src_dir = container.getSourceDir()
    with tarfile.open(filename, mode) as tar:
        tar.add(src_dir, arcname=os.path.basename(src_dir))

##
# @brief Write a control file for building the debian package.
#
# @return None
def safeInstall():
    """Create the Factorio directory layout and install shell completion.

    Creates FACTORIOPATH (via sudo when the parent is not writable, then
    chowns it back to the current user), adds saves/ and config/, appends a
    click completion hook to ~/.bashrc once, and runs updateFactorio().
    Exits the process with status 1 on IOError.
    """
    FACTORIOPATH = getFactorioPath()
    try:
        if not os.path.isdir("%s" % (FACTORIOPATH) ):
            # Prefer creating the directory ourselves; fall back to sudo
            # and hand ownership back to the invoking user.
            if os.access("%s/.." % (FACTORIOPATH), os.W_OK):
                os.mkdir(FACTORIOPATH, 0o777)
            else:
                subprocess.call(['sudo', 'mkdir', '-p', FACTORIOPATH])
                subprocess.call(['sudo', 'chown', getpass.getuser(), FACTORIOPATH])
            os.mkdir(os.path.join(FACTORIOPATH, "saves"))
            os.mkdir(os.path.join(FACTORIOPATH, "config"))
            # Append the completion hook only if not already present.
            with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc:
                lines = bashrc.read()
                if lines.find("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n") == -1:
                    bashrc.write("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n")
                    print("You'll want to restart your shell for command autocompletion. Tab is your friend.")
            updateFactorio()
    except IOError as e:
        print("Cannot make %s. Please check permissions. Error %s" % (FACTORIOPATH, e))
        sys.exit(1)
def setUp(self):
    """Create a scratch project directory containing a generated setup.py
    and point the user-site machinery at throwaway directories, saving the
    previous values so tearDown can restore them."""
    self.dir = tempfile.mkdtemp()
    with open(os.path.join(self.dir, 'setup.py'), 'w') as setup_file:
        setup_file.write(SETUP_PY)
    self.old_cwd = os.getcwd()
    os.chdir(self.dir)
    # Remember the global state these tests mutate.
    self.old_enable_site = site.ENABLE_USER_SITE
    self.old_file = easy_install_pkg.__file__
    self.old_base = site.USER_BASE
    site.USER_BASE = tempfile.mkdtemp()
    self.old_site = site.USER_SITE
    site.USER_SITE = tempfile.mkdtemp()
    easy_install_pkg.__file__ = site.USER_SITE
def make_trivial_sdist(dist_path, setup_py):
    """Create a simple sdist tarball at dist_path, containing just a setup.py,
    the contents of which are provided by the setup_py string.
    """
    setup_py_file = tarfile.TarInfo(name='setup.py')
    try:
        # Python 3 (StringIO gets converted to io module)
        MemFile = BytesIO
    except AttributeError:
        MemFile = StringIO
    setup_py_bytes = MemFile(setup_py.encode('utf-8'))
    # TarInfo.size must match the payload exactly or extraction truncates.
    setup_py_file.size = len(setup_py_bytes.getvalue())
    # 'with' replaces the try/finally and still closes the archive on error.
    with tarfile.open(dist_path, 'w:gz') as dist:
        dist.addfile(setup_py_file, fileobj=setup_py_bytes)
def _tar(self):
    '''Tar processor: extract every member of the current attachment and
    run each through the payload processor.

    Returns the list of processed attachments, or the (now dangerous)
    current attachment when any member fails to process.
    '''
    # The archive is read straight from the attachment's file object.
    archive = tarfile.open(mode='r', fileobj=self.cur_attachment.file_obj)
    loc_attach = []
    for subfile in archive.getmembers():
        f = archive.extractfile(subfile)
        if f is None:
            # Directory
            continue
        try:
            cur_file = File(f.read(), subfile.name)
            # NOTE(review): process_payload appears to replace
            # self.cur_attachment as a side effect, which is what makes the
            # append below meaningful -- confirm against the caller.
            self.process_payload(cur_file)
            loc_attach.append(self.cur_attachment)
        except Exception:
            # Any member failure condemns the whole attachment.
            self.cur_attachment.make_dangerous()
            return self.cur_attachment
    return loc_attach
def maybe_download_and_extract(data_dir, url='http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'):
    """Download and unpack the CIFAR-10 python archive into *data_dir*.

    No-op when 'cifar-10-batches-py' already exists under *data_dir*;
    otherwise downloads the tarball (if missing) and extracts it.
    """
    if not os.path.exists(os.path.join(data_dir, 'cifar-10-batches-py')):
        if not os.path.exists(data_dir):
            os.makedirs(data_dir)
        filename = url.split('/')[-1]
        filepath = os.path.join(data_dir, filename)
        if not os.path.exists(filepath):
            def _progress(count, block_size, total_size):
                # \r keeps the progress report on one console line.
                sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
                    float(count * block_size) / float(total_size) * 100.0))
                sys.stdout.flush()
            filepath, _ = urllib.request.urlretrieve(url, filepath, _progress)
            print()
            statinfo = os.stat(filepath)
            print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
        # 'with' closes the tarball handle that the original version leaked.
        with tarfile.open(filepath, 'r:gz') as tar:
            tar.extractall(data_dir)
def create_tar_archive(issuer, test_profile):
    """Bundle the non-hidden files of *test_profile* into <test_profile>.tar
    under tar/<issuer>, restoring the working directory afterwards."""
    mk_tar_dir(issuer, test_profile)
    previous_cwd = os.getcwd()
    os.chdir(os.path.join(previous_cwd, "tar", issuer))
    tar = tarfile.open("{}.tar".format(test_profile), "w")
    for entry in os.listdir(test_profile):
        # Skip hidden files such as .DS_Store.
        if entry.startswith("."):
            continue
        path = os.path.join(test_profile, entry)
        if os.path.isfile(path):
            tar.add(path)
    tar.close()
    os.chdir(previous_cwd)
def unpack(self):
    """Find the first *.poco bundle in the current directory, extract it,
    and 'docker load' the tar image it contains."""
    EnvironmentUtils.check_docker()
    poco_file = None
    # next(os.walk(...))[2] yields the plain files of the cwd only.
    for file in next(os.walk(os.getcwd()))[2]:
        if file.endswith(".poco"):
            poco_file = file
            tar = tarfile.open(file)
            tar.extractall()
            tar.close()
            break
    if poco_file is None:
        ColorPrint.exit_after_print_messages(message=".poco file not exists in current directory")
    cmd = list()
    cmd.append("docker")
    cmd.append("load")
    cmd.append("-i")
    # rstrip("poco") strips trailing characters from the set {p,o,c}; the
    # '.' separator stops it, so "name.poco" becomes "name." + "tar".
    # NOTE(review): fragile -- a slice like file[:-len("poco")] would be
    # clearer and immune to renaming.
    cmd.append(poco_file.rstrip("poco") + "tar")
    self.run_script(cmd=cmd)
def set_admin_token(admin_token='None'):
    """Set admin token according to deployment config or use a randomly
    generated token if none is specified (default).
    """
    if admin_token != 'None':
        # Operator supplied a token explicitly.
        log('Configuring Keystone to use a pre-configured admin token.')
        return admin_token
    log('Configuring Keystone to use a random admin token.')
    if os.path.isfile(STORED_TOKEN):
        # Reuse the token generated on a previous run.
        log('Loading a previously generated admin token from %s' % STORED_TOKEN)
        with open(STORED_TOKEN, 'r') as f:
            return f.read().strip()
    # First run: generate, persist, and return a fresh token.
    token = pwgen(length=64)
    with open(STORED_TOKEN, 'w') as out:
        out.write('%s\n' % token)
    return token
def maybe_download_and_extract():
    """Download and extract model tar file.

    Uses FLAGS.model_dir as the destination and module-level DATA_URL as the
    source; the download is skipped when the file already exists.
    """
    dest_directory = FLAGS.model_dir
    if not os.path.exists(dest_directory):
        os.makedirs(dest_directory)
    filename = DATA_URL.split('/')[-1]
    filepath = os.path.join(dest_directory, filename)
    if not os.path.exists(filepath):
        def _progress(count, block_size, total_size):
            # \r keeps the progress report on one console line.
            sys.stdout.write('\r>> Downloading %s %.1f%%' % (
                filename, float(count * block_size) / float(total_size) * 100.0))
            sys.stdout.flush()
        filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
        print()
        statinfo = os.stat(filepath)
        print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    # 'with' closes the tarball handle that the original version leaked.
    with tarfile.open(filepath, 'r:gz') as tar:
        tar.extractall(dest_directory)
def add_tar_file(self, x, tar):
    """
    Add a file to the tar archive. Transform symlinks into files if the
    files lie out of the project tree.
    """
    p = self.get_tar_path(x)
    tinfo = tar.gettarinfo(name=p,
                           arcname=self.get_tar_prefix() + '/' + x.path_from(self.base_path))
    # Normalize ownership so archives are reproducible whoever builds them.
    tinfo.uid = 0
    tinfo.gid = 0
    tinfo.uname = 'root'
    tinfo.gname = 'root'
    # 'with' replaces the manual fu/try/finally bookkeeping and still
    # closes the source file when addfile raises.
    with open(p, 'rb') as fu:
        tar.addfile(tinfo, fileobj=fu)
def make_tarfile(self, filename, files, **kw):
    """Create the tarball *filename* containing *files*.

    Keyword args:
        add_to_package: when True (default), record the tarball in self.files.
        bare: when True (default), store entries under their basenames;
              otherwise prefix them with '<filename-stem>/'.
    """
    if kw.get('add_to_package', True):
        self.files.append(filename)
    with tarfile.open(filename, TARFORMAT) as tar:
        # Prefix used for non-bare archives: the filename up to its first dot.
        endname = os.path.split(filename)[-1]
        endname = endname.split('.')[0] + '/'
        for x in files:
            tarinfo = tar.gettarinfo(x, x)
            # Normalize ownership for reproducible archives.
            tarinfo.uid = tarinfo.gid = 0
            tarinfo.uname = tarinfo.gname = 'root'
            tarinfo.size = os.stat(x).st_size
            # TODO - more archive creation options?
            if kw.get('bare', True):
                tarinfo.name = os.path.split(x)[1]
            else:
                tarinfo.name = endname + x  # todo, if tuple, then..
            Logs.debug("adding %r to %s" % (tarinfo.name, filename))
            with open(x, 'rb') as f:
                tar.addfile(tarinfo, f)
    Logs.info('Created %s' % filename)
def open_compressed(filename, open_flag='r', compression_type='bz2'):
    """Opens a compressed HDF5File with the given opening flags.

    For the 'r' flag, the given compressed file will be extracted to a local
    space. For 'w', an empty HDF5File is created. In any case, the opened
    HDF5File is returned, which needs to be closed using the
    close_compressed() function.
    """
    # create temporary HDF5 file name.
    # BUG FIX: mkstemp returns (fd, name); the original kept only the name
    # and leaked the open descriptor for the process lifetime.
    fd, hdf5_file_name = tempfile.mkstemp('.hdf5', 'bob_')
    os.close(fd)

    if open_flag == 'r':
        # extract the single archive member into the temporary file
        with tarfile.open(filename, mode="r:" + compression_type) as tar:
            memory_file = tar.extractfile(tar.next())
            with open(hdf5_file_name, 'wb') as real_file:
                real_file.write(memory_file.read())

    return bob.io.base.HDF5File(hdf5_file_name, open_flag)
def close_compressed(filename, hdf5_file, compression_type='bz2', create_link=False):
    """Closes the compressed hdf5_file that was opened with open_compressed.

    When the file was opened for writing (using the 'w' flag in
    open_compressed), the created HDF5 file is compressed into the given file
    name. To be able to read the data using the real tools, a link with the
    correct extension is created when create_link is set to True.
    """
    hdf5_file_name = hdf5_file.filename
    is_writable = hdf5_file.writable
    hdf5_file.close()

    if is_writable:
        # compress the temporary HDF5 file into the target archive
        with tarfile.open(filename, mode="w:" + compression_type) as tar:
            tar.add(hdf5_file_name, os.path.basename(filename))

    if create_link:
        # BUG FIX: the 'gz' entry used to be 'tar.gz' (missing leading dot),
        # which produced link names like 'foo.hdf5tar.gz'.
        extension = {'': '.tar', 'bz2': '.tar.bz2', 'gz': '.tar.gz'}[compression_type]
        link_file = filename + extension
        if not os.path.exists(link_file):
            os.symlink(os.path.basename(filename), link_file)

    # clean up locally generated files
    os.remove(hdf5_file_name)
def add_pkg_metadata(self, metadata_tar, deb):
    """Extract the control metadata from *metadata_tar* and register it under
    the dpkg status directory inside the image.

    Raises:
        self.DebError: when the metadata file is missing or malformed.
    """
    try:
        with tarfile.open(metadata_tar) as tar:
            # Metadata is expected to be in a file.
            # BUG FIX: the original used filter(...), whose lazy result
            # object is always truthy on Python 3, so the missing-file
            # check below could never fire. A list works on both versions.
            control_file_member = [f for f in tar.getmembers()
                                   if os.path.basename(f.name) == TarFile.PKG_METADATA_FILE]
            if not control_file_member:
                raise self.DebError(deb + ' does not Metadata File!')
            control_file = tar.extractfile(control_file_member[0])
            metadata = ''.join(control_file.readlines())
            destination_file = os.path.join(TarFile.DPKG_STATUS_DIR,
                                            TarFile.parse_pkg_name(metadata, deb))
            with self.write_temp_file(data=metadata) as metadata_file:
                self.add_file(metadata_file, destination_file)
    except (KeyError, TypeError) as e:
        raise self.DebError(deb + ' contains invalid Metadata! Exeception {0}'.format(e))
    except Exception as e:
        raise self.DebError('Unknown Exception {0}. Please report an issue at'
                            ' github.com/bazelbuild/rules_docker.'.format(e))
def create_bundle(output, tag_to_config, diffid_to_blobsum,
                  blobsum_to_unzipped, blobsum_to_zipped, blobsum_to_legacy):
    """Creates a multi-image Docker tarball from configs and blob lookups.

    (Docstring rewritten: the original documented parameters -- layers,
    tag_to_layer, layer_to_tags -- that this function does not take.)

    Args:
        output: the name of the docker image file to create.
        tag_to_config: map from docker_name.Tag to that image's config.
        diffid_to_blobsum: map from layer diff-id to blob checksum.
        blobsum_to_unzipped: map from blob checksum to the unzipped layer.
        blobsum_to_zipped: map from blob checksum to the zipped layer.
        blobsum_to_legacy: map from blob checksum to the legacy layer form.
    """
    with tarfile.open(output, 'w') as tar:
        # NOTE(review): add_file appears unused within this function body --
        # confirm whether it can be removed.
        def add_file(filename, contents):
            info = tarfile.TarInfo(filename)
            info.size = len(contents)
            tar.addfile(tarinfo=info, fileobj=cStringIO.StringIO(contents))
        # Assemble one image object per tag, then emit them all at once.
        tag_to_image = {}
        for (tag, config) in six.iteritems(tag_to_config):
            tag_to_image[tag] = FromParts(
                config, diffid_to_blobsum,
                blobsum_to_unzipped, blobsum_to_zipped, blobsum_to_legacy)
        v2_2_save.multi_image_tarball(tag_to_image, tar)
def extract_tarfile(archive_name, destpath):
    "Unpack a tar archive, optionally compressed"
    # 'with' closes the archive handle that the original version leaked.
    with tarfile.open(archive_name) as archive:
        archive.extractall(destpath)
def load_batch(fpath):
    """Load one CIFAR-style pickled batch and return (data, labels).

    NOTE: pickle.load can execute arbitrary code -- only use on trusted
    data-set files.
    """
    with open(fpath, 'rb') as handle:
        batch = pickle.load(handle, encoding='latin1')
    return batch["data"], batch["labels"]
def untar(fname):
    """Extract *fname* into the current directory when it is a .tar.gz.

    Prints a message instead of raising when the name does not look like a
    gzipped tarball.
    """
    if (fname.endswith("tar.gz")):
        # 'with' closes the archive even if extraction fails.
        with tarfile.open(fname) as tar:
            tar.extractall()
        print("File Extracted in Current Directory")
    else:
        # BUG FIX: the original printed sys.argv[0] (the script name)
        # instead of the offending file name.
        print("Not a tar.gz file: '%s '" % fname)
def write_set_file(fout, labels):
    """Write one '<cwd>/<path> <label>' line per entry of *labels* to *fout*.

    Relies on the module-level ``cwd`` to build absolute paths.
    """
    with open(fout, 'w+') as handle:
        for entry in labels:
            handle.write('%s/%s %s\n' % (cwd, entry[0], entry[1]))

# Images are ordered by species, so shuffle them
def copy_stream(self, instream, outfile, encoding=None):
    """Copy the file-like object *instream* to the path *outfile*.

    Args:
        instream: Source stream (binary unless *encoding* is given).
        outfile: Destination path; must not be an existing directory.
        encoding: When given, the destination is opened in text mode with
            that encoding; otherwise it is written as raw bytes.
    """
    assert not os.path.isdir(outfile)
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying stream %s to %s', instream, outfile)
    if not self.dry_run:
        if encoding is None:
            outstream = open(outfile, 'wb')
        else:
            outstream = codecs.open(outfile, 'w', encoding=encoding)
        # 'with' replaces the try/finally and closes even if the copy raises.
        with outstream:
            shutil.copyfileobj(instream, outstream)
    self.record_as_written(outfile)
def write_binary_file(self, path, data):
    """Write the bytes *data* to *path*, creating parent directories as
    needed; honors dry_run and records the path as written."""
    self.ensure_dir(os.path.dirname(path))
    if not self.dry_run:
        with open(path, 'wb') as stream:
            stream.write(data)
    self.record_as_written(path)
def write_text_file(self, path, data, encoding):
    """Encode the string *data* with *encoding* and write it to *path*;
    honors dry_run and records the path as written."""
    self.ensure_dir(os.path.dirname(path))
    if not self.dry_run:
        with open(path, 'wb') as stream:
            stream.write(data.encode(encoding))
    self.record_as_written(path)
def _csv_open(fn, mode, **kwargs): if sys.version_info[0] < 3: mode += 'b' else: kwargs['newline'] = '' return open(fn, mode, **kwargs)
def file_contents(filename):
    """Return the UTF-8 decoded contents of *filename*."""
    with open(filename, 'rb') as stream:
        raw = stream.read()
    return raw.decode('utf-8')
def get_terminal_size():
    """Returns a tuple (x, y) representing the width(x) and the height(y)
    in characters of the terminal window."""
    def ioctl_GWINSZ(fd):
        # Query the kernel for the window size of the tty behind *fd*.
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack(
                'hh',
                fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            )
        except Exception:
            # Narrowed from a bare 'except:' so SystemExit and
            # KeyboardInterrupt are no longer swallowed.
            return None
        if cr == (0, 0):
            return None
        return cr
    # Try stdin, stdout, stderr in turn (any may be a tty).
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        # Fall back to the controlling terminal device.
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except Exception:
            pass
    if not cr:
        # Last resort: environment variables, then the classic 80x25.
        cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
    return int(cr[1]), int(cr[0])
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`. All
    files are written based on system defaults and umask (i.e. permissions
    are not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    NOTE(review): member names are not validated; an archive containing
    '..' components could escape `location` (zip-slip) -- confirm callers
    only pass trusted archives.
    """
    ensure_dir(location)
    # 'with' blocks replace the manual try/finally close() bookkeeping.
    with open(filename, 'rb') as zipfp:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                with open(fn, 'wb') as fp:
                    fp.write(data)
                # Unix permission bits live in the high word of external_attr.
                mode = info.external_attr >> 16
                # if mode and regular file and any execute permissions for
                # user/group/world?
                if mode and stat.S_ISREG(mode) and mode & 0o111:
                    # make dest file have execute for user/group/world
                    # (chmod +x) no-op on windows per python docs
                    os.chmod(fn, (0o777 - current_umask() | 0o111))
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack zip `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as
    determined by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an
    explanation of the `progress_filter` argument.
    """
    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))
    with ContextualZipFile(filename) as z:
        for info in z.infolist():
            name = info.filename
            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name.split('/'):
                continue
            target = os.path.join(extract_dir, *name.split('/'))
            # progress_filter may veto (return falsy) or redirect the target.
            target = progress_filter(name, target)
            if not target:
                continue
            if name.endswith('/'):
                # directory
                ensure_directory(target)
            else:
                # file
                ensure_directory(target)
                data = z.read(info.filename)
                with open(target, 'wb') as f:
                    f.write(data)
            # Restore Unix permission bits stored in the high word of
            # external_attr (zero when built on platforms that omit them).
            unix_attributes = info.external_attr >> 16
            if unix_attributes:
                os.chmod(target, unix_attributes)