我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tarfile.TarError(异常类,用于except子句捕获,而非直接调用)。
def try_open_tar(file):
    """Try to interpret *file* as a tar archive.

    Returns an ``open(name)`` callable that extracts members by name with
    thread-safe reads, or ``None`` when *file* is not a tar archive.
    Raises FileNotFoundError from the returned callable for unknown names.
    """
    try:
        archive = tarfile.open(fileobj=file)
    except tarfile.TarError:
        # Not a tar archive at all.
        return None

    lock = RLock()

    def open_member(name):
        """Return a file object for member *name*."""
        try:
            with lock:
                member = archive.extractfile(name)
        except KeyError:
            raise FileNotFoundError(name) from None
        raw_read = member.read

        def locked_read(n=-1):
            # Serialize reads: extracted file objects share the archive's
            # underlying file handle.
            with lock:
                return raw_read(n)

        member.read = locked_read
        return member

    return open_member
def copy_revision(self, id_or_num: t.Union[int, str], sub_dir: str, dest_dirs: t.List[str]):
    """Copy the state of ``sub_dir`` at the given revision into each destination.

    :param id_or_num: commit number, commit id, -1 or "HEAD"
    :param sub_dir: source directory relative to the repository root
    :param dest_dirs: one destination directory or a list of them
    :raises VCSError: when reading the generated tar archive fails
    """
    typecheck_locals(id_or_num=self.id_type, dest_dirs=List(Str())|Str())
    if isinstance(dest_dirs, str):
        dest_dirs = [dest_dirs]
    if id_or_num == -1 or id_or_num == "HEAD":
        # NOTE(review): execution falls through to the git-archive path below
        # even for HEAD; a `return` here may be missing — verify against callers.
        self._copy_dir(sub_dir, dest_dirs)
    sub_dir = os.path.join(self.base_path, sub_dir)
    tar_file = os.path.abspath(os.path.join(Settings()["tmp_dir"], "tmp.tar"))
    cmd = "git archive --format tar --output {} {}".format(tar_file, self._commit_number_to_id(id_or_num))
    self._exec_command(cmd)
    try:
        with tarfile.open(tar_file) as tar:
            for dest in dest_dirs:
                if sub_dir == ".":
                    tar.extractall(os.path.abspath(dest))
                else:
                    subdir_and_files = [
                        tarinfo for tarinfo in tar.getmembers()
                        # BUG FIX: compare names with == — the original used
                        # `is`, an identity check that is unreliable for str.
                        if tarinfo.name.startswith(sub_dir + "/")
                        or tarinfo.name == sub_dir
                    ]
                    tar.extractall(members=subdir_and_files, path=os.path.abspath(dest))
    except tarfile.TarError as err:
        raise VCSError(str(err))
    finally:
        # Always remove the temporary archive (previously leaked when an
        # exception other than TarError escaped).
        os.remove(tar_file)
def extract(self, password=None):
    """Verify and extract the archive ``self.filename`` into ``self.dest``.

    Returns the list of extracted member names, or ``None`` when only a
    non-fatal ExtractError occurred (it is logged, not raised).

    :raises CRCError: on compression errors
    :raises ArchiveError: on OS errors or other tar errors
    """
    self.verify(password)
    try:
        # errorlevel=2 makes non-fatal extraction problems raise ExtractError.
        with tarfile.open(self.filename, errorlevel=2) as t:
            t.extractall(self.dest)
            self.files = t.getnames()
            return self.files
    # BUG FIX: the original used Python 2 `except X, e` syntax, which is a
    # SyntaxError under Python 3 (used elsewhere in this file).
    except tarfile.ExtractError as e:
        self.log_warning(e)
    except tarfile.CompressionError as e:
        raise CRCError(e)
    except (OSError, tarfile.TarError) as e:
        raise ArchiveError(e)
def __find_extract_offsets(self):
    """Private helper method to find offsets for individual archive
    member extraction."""
    # Already populated on a previous call; the cache is never invalidated.
    if self.__extract_offsets:
        return

    # This reads the entire archive, but it is the only way to learn the
    # offset of every member.
    try:
        offsets = self.__extract_offsets
        for member in self.__arc_tfile.getmembers():
            offsets[member.name] = member.offset
    except tf.TarError:
        # Read error encountered.
        raise InvalidArchive(self.__arc_name)
    except EnvironmentError as e:
        raise apx._convert_error(e)
def downloadPackage(self, buildId, audit, content, verbose):
    """Download a binary artifact and unpack it into audit/content.

    Returns True when the artifact was found and extracted, False when
    downloads are disabled or the artifact does not exist.
    """
    if not self.canDownloadLocal():
        return False

    if verbose > 0:
        banner = " DOWNLOAD {} from {} .. ".format(content, self._remoteName(buildId, ARTIFACT_SUFFIX))
    else:
        banner = " DOWNLOAD {} .. ".format(content)
    print(colorize(banner, "32"), end="")

    try:
        with self._openDownloadFile(buildId, ARTIFACT_SUFFIX) as (name, fileobj):
            # "r|*": stream mode with transparent compression detection.
            with tarfile.open(name, "r|*", fileobj=fileobj, errorlevel=1) as tar:
                # Wipe any stale state before unpacking fresh content.
                removePath(audit)
                removePath(content)
                os.makedirs(content)
                self.__extractPackage(tar, audit, content)
        print(colorize("ok", "32"))
        return True
    except ArtifactNotFoundError:
        print(colorize("not found", "33"))
        return False
    except ArtifactDownloadError as e:
        print(colorize(e.reason, "33"))
        return False
    except BuildError:
        print(colorize("error", "31"))
        raise
    except OSError as e:
        print(colorize("error", "31"))
        raise BuildError("Cannot download artifact: " + str(e))
    except tarfile.TarError as e:
        print(colorize("error", "31"))
        raise BuildError("Error extracting binary artifact: " + str(e))
def uploadPackage(self, buildId, audit, content, verbose):
    """Pack audit trail + content into a gzipped PAX tar and upload it."""
    if not self.canUploadLocal():
        return

    shown = False
    try:
        with self._openUploadFile(buildId, ARTIFACT_SUFFIX) as (name, fileobj):
            pax = { 'bob-archive-vsn' : "1" }
            if verbose > 0:
                banner = " UPLOAD {} to {} .. ".format(content, self._remoteName(buildId, ARTIFACT_SUFFIX))
            else:
                banner = " UPLOAD {} .. ".format(content)
            print(colorize(banner, "32"), end="")
            shown = True
            # Compress manually (gzip level 6) regardless of whether the
            # backend handed out a file name or a raw file object.
            with gzip.open(name or fileobj, 'wb', 6) as gzf:
                with tarfile.open(name, "w", fileobj=gzf,
                                  format=tarfile.PAX_FORMAT, pax_headers=pax) as tar:
                    tar.add(audit, "meta/" + os.path.basename(audit))
                    tar.add(content, arcname="content")
        print(colorize("ok", "32"))
    except ArtifactExistsError:
        if shown:
            print("skipped ({} exists in archive)".format(content))
        else:
            print(" UPLOAD skipped ({} exists in archive)".format(content))
    except (ArtifactUploadError, tarfile.TarError, OSError) as e:
        if shown:
            if verbose > 0:
                print(colorize("error ("+str(e)+")", "31"))
            else:
                print(colorize("error", "31"))
        if not self.__ignoreErrors:
            raise BuildError("Cannot upload artifact: " + str(e))
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    try:
        archive.chown = lambda *args: None  # don't do any chowning!
        for member in archive:
            name = member.name
            # skip absolute paths and parent-directory escapes
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))
            # follow link chains so link targets extract as normal files
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    linkpath = posixpath.join(posixpath.dirname(member.name), linkpath)
                linkpath = posixpath.normpath(linkpath)
                member = archive._getmember(linkpath)
            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                archive._extract_member(member, final_dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass  # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        archive.close()
def _generate_tar(dir_path): """Private function that reads a local directory and generates a tar archive from it""" try: with tarfile.open(dir_path + '.tar', 'w') as tar: tar.add(dir_path) except tarfile.TarError as e: stderr("Error: tar archive creation failed [" + str(e) + "]", exit=1)
def _extract_tar(self, file_obj, path): try: with tarfile.open(fileobj=file_obj, mode='r:gz') as archive: archive.extractall(path) except tarfile.TarError: raise self.BadArchive
def _unpack_tarfile(filename, extract_dir): """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir` """ try: tarobj = tarfile.open(filename) except tarfile.TarError: raise ReadError( "%s is not a compressed or uncompressed tar file" % filename) try: tarobj.extractall(extract_dir) finally: tarobj.close()
def _unpack_tarfile(filename, extract_dir): """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` """ try: tarobj = tarfile.open(filename) except tarfile.TarError: raise ReadError( "%s is not a compressed or uncompressed tar file" % filename) try: tarobj.extractall(extract_dir) finally: tarobj.close()
def unpack(self, file_path, temp_dir):
    """Extract *file_path* (any tar flavour, 'r:*') into *temp_dir*.

    Raises ProcessingException when the archive cannot be read.
    """
    try:
        with tarfile.open(name=file_path, mode='r:*') as archive:
            archive.extractall(temp_dir)
    except tarfile.TarError as err:
        raise ProcessingException('Invalid Tar file: %s' % err)
def verify(self, password=None):
    """Check that ``self.filename`` is a readable tar archive.

    ``password`` is accepted only for interface compatibility with other
    archive backends; tar archives are not encrypted.

    :raises CRCError: on compression errors
    :raises ArchiveError: on OS errors or other tar errors
    """
    # BUG FIX: the original used Python 2 `except X, e` syntax, which is a
    # SyntaxError under Python 3 (used elsewhere in this file).
    try:
        t = tarfile.open(self.filename, errorlevel=1)
    except tarfile.CompressionError as e:
        raise CRCError(e)
    except (OSError, tarfile.TarError) as e:
        raise ArchiveError(e)
    else:
        t.close()
def Archive(path_or_file):
    """ return in-memory Archive object, wrapping ZipArchive or TarArchive
    with uniform methods.

    If an error is raised, any passed in file will be closed.

    An Archive instance acts as a context manager so that you can use::

        with Archive(...) as archive:
            archive.extract(...)  # or other methods

    and be sure that file handles will be closed.  If you do not use it as
    a context manager, you need to call archive.close() yourself.
    """
    # Accept either an open binary file or a path-like value.
    fileobj = path_or_file if hasattr(path_or_file, "seek") else open(str(path_or_file), "rb")
    try:
        try:
            return ZipArchive(fileobj)
        except zipfile.BadZipfile:
            # Not a zip: rewind and try tar before giving up.
            fileobj.seek(0)
            try:
                return TarArchive(fileobj)
            except tarfile.TarError:
                raise UnsupportedArchive()
    except Exception:
        fileobj.close()
        raise
def archivate(src_dir, dest_dir, prefix="", console=False):
    """Create a gzipped tar archive of *src_dir* inside *dest_dir*.

    The archive is named ``<prefix><basename(src_dir)>.tgz``.  When
    *console* is True the system ``tar`` binary is used; when the Python
    tarfile path fails, the function retries once via the console tar.
    """
    filename = os.path.join(dest_dir, "%s%s.tgz" % (prefix, os.path.basename(src_dir)))
    error_msg = None
    try:
        if console:
            # BUG FIX: 'x' extracts — creating an archive needs 'c'.
            tar_cmd = "tar cvfz %s %s" % (filename, src_dir)
            debug("Running " + tar_cmd)
            # BUG FIX: Popen needs an argument list without shell=True; the
            # original passed the whole command string as the program name.
            tar = subprocess.Popen(tar_cmd.split(),
                                   stdout=subprocess.DEVNULL,
                                   stderr=subprocess.PIPE)
            tar.wait(timeout=TAR_TIMEOUT)
            tar_error = str(tar.stderr.read()).lower()
            if "fatal" in tar_error or "error" in tar_error:
                error_msg = tar_error
        else:
            debug("Creating tar archive %s from %s" % (filename, src_dir))
            tar = tarfile.open(filename, "w:gz")
            tar.add(src_dir, arcname=".", recursive=True)
            tar.close()
    except (FileExistsError):
        # Stale archive from a previous run: remove it and retry.
        os.unlink(filename)
        archivate(src_dir, dest_dir, prefix, console)
    except (FileNotFoundError) as e:
        log("Failed to tar %s with Python lib. Trying to use console tar. Error was %s" % (src_dir, str(e)))
    except (tarfile.TarError, OSError) as e:
        error_msg = str(e)
    if error_msg and not console:
        # Python tarfile failed: fall back to the console tar binary once.
        archivate(src_dir, dest_dir, prefix, console=True)
    elif error_msg:
        error(error_msg)
def extract_tarpath(self, tarpath, destpath):
    """ Extract the tar path """
    try:
        # BUG FIX: use a context manager so the archive handle is closed
        # even when extraction fails (the original never closed it).
        with tarfile.open(tarpath) as tar:
            tar.extractall(path=destpath)
    except tarfile.TarError:
        msg = "Can not open the tar/gz file: %s" % tarpath
        log.error(msg)
        raise introexceptions.ImageLoadErrorFromTarfile(msg)
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
    with contextlib.closing(archive):
        archive.chown = lambda *args: None  # don't do any chowning!
        for member in archive:
            name = member.name
            # skip absolute paths and parent-directory escapes
            if name.startswith('/') or '..' in name.split('/'):
                continue
            prelim_dst = os.path.join(extract_dir, *name.split('/'))
            # follow link chains so link targets extract as normal files
            while member is not None and (member.islnk() or member.issym()):
                linkpath = member.linkname
                if member.issym():
                    linkpath = posixpath.join(posixpath.dirname(member.name), linkpath)
                linkpath = posixpath.normpath(linkpath)
                member = archive._getmember(linkpath)
            if member is None or not (member.isfile() or member.isdir()):
                continue
            final_dst = progress_filter(name, prelim_dst)
            if not final_dst:
                continue
            if final_dst.endswith(os.sep):
                final_dst = final_dst[:-1]
            try:
                # XXX Ugh
                archive._extract_member(member, final_dst)
            except tarfile.ExtractError:
                # chown/chmod/mkfifo/mknode/makedev failed
                pass
    return True
def __scan(self, fileName, verbose):
    # Read one artifact's audit trail and cache its metadata in self.__db,
    # keyed by the build-id derived from the file name.  Cached entries are
    # revalidated against the file's stat record and refreshed when stale.
    try:
        st = binStat(fileName)
        # Presumably the path looks like "xx/xx/<36 hex>...": skipping the
        # characters at index 2 and 5 drops directory separators to recover
        # a 40-hex-digit build id — verify against the archive layout.
        bid = bytes.fromhex(fileName[0:2] + fileName[3:5] + fileName[6:42])

        # validate entry in caching db
        if bid in self.__db:
            info = pickle.loads(self.__db[bid])
            if info['stat'] == st:
                # Unchanged since last scan; keep the cached entry.
                return
            # Stale: drop and re-read below.
            del self.__db[bid]

        # read audit trail
        if verbose: print(fileName)
        with tarfile.open(fileName, errorlevel=1) as tar:
            # validate
            if tar.pax_headers.get('bob-archive-vsn') != "1":
                print("Not a Bob archive:", fileName, "Ignored!")
                return

            # find audit trail (while/else: raises only when the member
            # list is exhausted without a break)
            f = tar.next()
            while f:
                if f.name == "meta/audit.json.gz": break
                f = tar.next()
            else:
                raise Error("Missing audit trail!")

            # read audit trail
            auditJsonGz = tar.extractfile(f)
            auditJson = gzip.GzipFile(fileobj=auditJsonGz)
            audit = Audit.fromByteStream(auditJson)

        # import data
        artifact = audit.getArtifact()
        self.__db[bid] = pickle.dumps({
            'stat' : st,
            'refs' : audit.getReferencedBuildIds(),
            'vars' : {
                'meta' : artifact.getMetaData(),
                'build' : artifact.getBuildInfo(),
                'metaEnv' : artifact.getMetaEnv(),
            }
        })
    except tarfile.TarError as e:
        raise BobError("Cannot read {}: {}".format(fileName, str(e)))
    except OSError as e:
        raise BobError(str(e))
def export(local_root, commit, target):
    """Export git commit to directory.

    "Extracts" all files at the commit to the target directory. Set mtime of
    RST files to last commit date.

    :raise CalledProcessError: Unhandled git command failure.

    :param str local_root: Local path to git root directory.
    :param str commit: Git commit SHA to export.
    :param str target: Directory to export to.
    """
    log = logging.getLogger(__name__)
    target = os.path.realpath(target)
    mtimes = list()

    # Define extract function.
    def extract(stdout):
        """Extract tar archive from "git archive" stdout.

        :param file stdout: Handle to git's stdout pipe.
        """
        queued_links = list()
        try:
            with tarfile.open(fileobj=stdout, mode='r|') as tar:
                for info in tar:
                    log.debug('name: %s; mode: %d; size: %s; type: %s', info.name, info.mode, info.size, info.type)
                    path = os.path.realpath(os.path.join(target, info.name))
                    # BUG FIX: compare against target + os.sep — a bare
                    # prefix test wrongly accepted siblings like "<target>2".
                    if path != target and not path.startswith(target + os.sep):
                        # Handle bad paths.
                        log.warning('Ignoring tar object path %s outside of target directory.', info.name)
                    elif info.isdir():
                        # Handle directories.
                        if not os.path.exists(path):
                            os.makedirs(path, mode=info.mode)
                    elif info.issym() or info.islnk():
                        # Queue links until their targets exist.
                        queued_links.append(info)
                    else:
                        # Handle files.
                        tar.extract(member=info, path=target)
                        if os.path.splitext(info.name)[1].lower() == '.rst':
                            mtimes.append(info.name)
                for info in (i for i in queued_links if os.path.exists(os.path.join(target, i.linkname))):
                    tar.extract(member=info, path=target)
        except tarfile.TarError as exc:
            log.debug('Failed to extract output from "git archive" command: %s', str(exc))

    # Run command.
    run_command(local_root, ['git', 'archive', '--format=tar', commit], pipeto=extract)

    # Set mtime.
    for file_path in mtimes:
        last_committed = int(run_command(local_root, ['git', 'log', '-n1', '--format=%at', commit, '--', file_path]))
        os.utime(os.path.join(target, file_path), (last_committed, last_committed))