The following 29 code examples, extracted from open-source Python projects, illustrate how to use tarfile.DIRTYPE.
def create_user_dirs(self, tar_handle):
    """Populate the tar archive with the per-user root filesystem tree.

    For every user recorded in ``self.rootfs_tree``, add that user's
    directory entries (DIRTYPE members) and file entries (REGTYPE
    members), stamped with the user's uid/gid, via
    ``self.create_tar_members``.
    """
    member_kinds = (
        ('dirs', tarfile.DIRTYPE),
        ('files', tarfile.REGTYPE),
    )
    for entry in self.rootfs_tree.values():
        for key, member_type in member_kinds:
            self.create_tar_members(
                tar_handle,
                entry[key],
                member_type,
                uid=entry['uid'],
                gid=entry['gid'],
            )
def install_freeze(self, container, arch=None):
    """Copy the bundled 'freeze' helper binary into a running container.

    Streams a tar archive (a ``/bin`` directory plus ``/bin/freeze``)
    over stdin to ``docker cp - <container>:/``, forcing root ownership
    on everything added.

    :param container: name or id of the target container.
    :param arch: architecture suffix used to pick the freeze binary;
        defaults to ``self.platform`` when None.
    :raises ExternalProcessError: if the docker process exits non-zero.
    """
    self.logger.info("Installing freeze on container \"%s\"", container)

    # tarfile add-filter: normalise ownership of the freeze binary to
    # root (uid/gid 0) regardless of who owns it on the build host.
    def layout_filter(obj):
        obj.uid = 0
        obj.gid = 0
        return obj

    if arch is None:
        arch = self.platform
    # 'docker cp - <container>:/' unpacks a tar archive read from stdin
    # at the container's filesystem root.
    args = ['cp', '-', "{}:/".format(container)]
    proc = DockerProcess(self, args, stdin=PIPE)
    # 'w|' writes an uncompressed tar as a non-seekable stream, which is
    # required because proc.stdin is a pipe.
    tar = tarfile.open(fileobj=proc.stdin, mode='w|')
    # Emit an explicit root-owned /bin directory entry (mode 0755) so
    # the binary's parent directory exists with known ownership.
    bindir = tarfile.TarInfo(name="bin")
    bindir.uid = 0
    bindir.gid = 0
    bindir.mode = 0o0755
    bindir.type = tarfile.DIRTYPE
    tar.addfile(bindir)
    # The architecture-specific binary ships next to this module under
    # freeze/freeze_<arch>; it lands in the container as /bin/freeze.
    tar.add(os.path.join(os.path.dirname(__file__),
                         'freeze',
                         'freeze_{}'.format(arch)),
            arcname="bin/freeze", filter=layout_filter)
    tar.close()
    # Closing stdin signals EOF to docker; only then can it finish.
    proc.stdin.close()
    if proc.wait() != 0:
        raise ExternalProcessError(
            "Error installing freeze on container \"{}\"".format(container),
            proc)
def test_v7_dirtype(self):
    """Old-style V7 directory members must be detected as DIRTYPE."""
    # Test old style dirtype member (bug #1336623):
    # Old V7 tars create directory members using an AREGTYPE
    # header with a "/" appended to the filename field.
    tarinfo = self.tar.getmember("misc/dirtype-old-v7")
    # assertEqual reports both operands on failure (assertTrue(a == b)
    # does not) and matches the assertion style of the sibling tests.
    self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                     "v7 dirtype failed")
def test_read_longname(self):
    """A GNU longname member must not be misread as a directory."""
    # Test reading of longname (bug #1471427).
    longname = self.subdir + "/" + "123/" * 125 + "longname"
    try:
        tarinfo = self.tar.getmember(longname)
    except KeyError:
        self.fail("longname not found")
    # assertNotEqual reports both operands on failure (assertTrue(a != b)
    # does not) and matches the assertion style of the sibling tests.
    self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE,
                        "read longname as dirtype")
def _make_tar(archive_dest_dir, contents, compression=None):
    """Create a test tar archive named ``foo.tar[.gz|.bz2]`` in
    *archive_dest_dir* and return its path.

    *contents* maps member names to the module's ``_CONTENTS_*``
    sentinels, which choose the tar member type (directory, regular
    file, or symlink). Scratch entries (a directory, a file, and
    optionally a symlink) are created on disk for the duration of the
    call and removed before returning.
    """
    # Map the compression choice onto tarfile mode / filename suffixes;
    # anything unrecognised falls back to a plain uncompressed tar.
    suffixes = {'gz': (':gz', '.gz'), 'bz2': (':bz2', '.bz2')}
    mode_suffix, ext_suffix = suffixes.get(compression, ('', ''))
    mode = 'w' + mode_suffix
    extension = '.tar' + ext_suffix
    # the tarfile API only lets us put in files, so we need
    # files to put in
    a_directory = os.path.join(archive_dest_dir, "a_directory")
    os.mkdir(a_directory)
    a_file = os.path.join(archive_dest_dir, "a_file")
    with open(a_file, 'w') as scratch:
        scratch.write("hello")
    a_symlink = os.path.join(archive_dest_dir, "a_link")
    if _CONTENTS_SYMLINK in contents.values():
        os.symlink("/somewhere", a_symlink)
    archivefile = os.path.join(archive_dest_dir, "foo" + extension)
    with tarfile.open(archivefile, mode) as tf:
        for name, kind in contents.items():
            info = tarfile.TarInfo(name)
            if kind is _CONTENTS_DIR:
                info.type = tarfile.DIRTYPE
            elif kind is _CONTENTS_SYMLINK:
                info.type = tarfile.SYMTYPE
            # _CONTENTS_FILE keeps TarInfo's default REGTYPE.
            tf.addfile(info)
    # Clean up the scratch filesystem entries.
    os.remove(a_file)
    os.rmdir(a_directory)
    if os.path.exists(a_symlink):
        os.remove(a_symlink)
    return archivefile
def test_v7_dirtype(self):
    # Regression check for bug #1336623: ancient V7 archives mark
    # directories with an AREGTYPE header whose name field ends in
    # "/"; the reader must still classify such members as DIRTYPE.
    member = self.tar.getmember("misc/dirtype-old-v7")
    self.assertEqual(member.type, tarfile.DIRTYPE, "v7 dirtype failed")
def test_read_longname(self):
    # Regression check for bug #1471427: a very deep GNU-longname
    # member must be readable and must not be classified as a
    # directory.
    longname = "/".join([self.subdir] + ["123"] * 125) + "/longname"
    try:
        member = self.tar.getmember(longname)
    except KeyError:
        self.fail("longname not found")
    self.assertNotEqual(member.type, tarfile.DIRTYPE,
                        "read longname as dirtype")
def make_build_context(self): """ Makes a Docker build context from a local directory. Normalises all file ownership and times so that the docker hashes align better. """ # Start temporary tar file fileobj = tempfile.NamedTemporaryFile() tfile = tarfile.open(mode='w:gz', fileobj=fileobj) # Get list of files/dirs to add to the tar paths = exclude_paths(self.container.path, []) # For each file, add it to the tar with normalisation for path in paths: disk_location = os.path.join(self.container.path, path) # Directory addition if os.path.isdir(disk_location): info = tarfile.TarInfo(name=path) info.mtime = 0 info.mode = 0o775 info.type = tarfile.DIRTYPE info.uid = 0 info.gid = 0 info.uname = "root" info.gname = "root" tfile.addfile(info) # Normal file addition elif os.path.isfile(disk_location): stat = os.stat(disk_location) info = tarfile.TarInfo(name=path) info.mtime = 0 info.size = stat.st_size info.mode = 0o755 info.type = tarfile.REGTYPE info.uid = 0 info.gid = 0 info.uname = "root" info.gname = "root" # Rewrite docker FROM lines with a : in them and raise a warning # TODO: Deprecate this! if path.lstrip("/") == self.container.dockerfile_name: # Read in dockerfile line by line, replacing the FROM line dockerfile = io.BytesIO() with open(disk_location, "r") as fh: for line in fh: if line.upper().startswith("FROM ") and self.container.build_parent_in_prefix: line = line.replace(":", "-") dockerfile.write(line.encode("utf8")) dockerfile.seek(0) tfile.addfile(info, dockerfile) else: with open(disk_location, "rb") as fh: tfile.addfile(info, fh) # Error for anything else else: raise ValueError( "Cannot add non-file/dir %s to docker build context" % path ) # Return that tarfile tfile.close() fileobj.seek(0) return fileobj
def check_duplicates_files(info):
    """Scan every package tarball for files that collide across packages.

    Builds two maps over all non-info/recipe members of the dists in
    ``info['_dists']``: one case-sensitive (exact duplicate paths) and
    one case-insensitive (paths that collide on case-insensitive
    filesystems such as HFS). Exact duplicates are fatal unless
    ``info['ignore_duplicate_files']`` is set; case-insensitive
    collisions are only a warning on Linux. Also accumulates approximate
    tarball and unpacked-package sizes into *info* as a side effect.

    :param info: build-info dict (mutated by the size accumulators).
    """
    print('Checking for duplicate files ...')
    map_members_scase = defaultdict(set)
    map_members_icase = {}
    for dist in info['_dists']:
        fn = filename_dist(dist)
        fn_path = join(info['_download_dir'], fn)
        update_approx_tarballs_size(info, os.path.getsize(fn_path))
        # Use a context manager so the tarball is closed even if a
        # member read raises (the previous code leaked the handle then).
        with tarfile.open(fn_path) as t:
            for member in t.getmembers():
                update_approx_pkgs_size(info, member.size)
                # Directories cannot collide meaningfully; skip them.
                if member.type == tarfile.DIRTYPE:
                    continue
                mname = member.name
                # Package metadata under info/ and recipe/ is expected
                # to repeat across packages; only payload files count.
                if not mname.split('/')[0] in ['info', 'recipe']:
                    map_members_scase[mname].add(fn)
                    key = mname.lower()
                    if key not in map_members_icase:
                        map_members_icase[key] = {'files': set(), 'fns': set()}
                    map_members_icase[key]['files'].add(mname)
                    map_members_icase[key]['fns'].add(fn)
    for member in map_members_scase:
        fns = map_members_scase[member]
        msg_str = "File '%s' found in multiple packages: %s" % (
            member, ', '.join(fns))
        if len(fns) > 1:
            if info.get('ignore_duplicate_files'):
                print('Warning: {}'.format(msg_str))
            else:
                sys.exit('Error: {}'.format(msg_str))
    for member in map_members_icase:
        # Some filesystems are not case sensitive by default (e.g HFS)
        # Throw warning on linux and error out on macOS/windows
        fns = map_members_icase[member]['fns']
        files = list(map_members_icase[member]['files'])
        msg_str = "Files %s found in the package(s): %s" % (
            str(files)[1:-1], ', '.join(fns))
        if len(files) > 1:
            if (info.get('ignore_duplicate_files')
                    or info['_platform'].startswith('linux')):
                print('Warning: {}'.format(msg_str))
            else:
                sys.exit('Error: {}'.format(msg_str))
def export_tar(tree, storage, output, compression=None):
    """
    Export a tree in tar format.

    Walks *tree* via ``walk_tree`` and writes one tar member per item:
    blobs become regular files (content fetched from *storage*), trees
    become directories, and links/fifos become symlink/fifo members.
    Unknown file types are silently skipped.

    :param tree: root tree object to export.
    :param storage: backend used to resolve blob content by ref.
    :param output: path of the tar file to create.
    :param compression: 'gz', 'bz2' or 'xz' for a compressed archive;
        any other value writes an uncompressed tar.
    """
    mode = 'w'
    if compression in ('gz', 'bz2', 'xz'):
        mode += ':' + compression
    with tarfile.open(output, mode) as tar:
        for fullname, item in walk_tree(storage, tree):
            # payload stays None except for blobs; tar.addfile treats
            # None as "member has no content".
            payload = None
            info = tarfile.TarInfo()
            # fullname is bytes; undecodable characters are dropped.
            info.name = fullname.decode('utf-8', 'ignore')
            if item.type == 'blob':
                # Regular file: fetch its content from storage.
                payload = storage.get_blob(item.ref).blob
                info.type = tarfile.REGTYPE
                info.size = item['size']
                printer.verbose('Adding to {out}: <b>{fn}</b> ({size})',
                                out=output,
                                fn=fullname.decode('utf-8', errors='ignore'),
                                size=humanize.naturalsize(item['size'], binary=True))
            elif item.type == 'tree':
                info.type = tarfile.DIRTYPE
                printer.verbose('Adding to {out}: <b>{fn}</b> (directory)',
                                out=output,
                                fn=fullname.decode('utf-8', errors='ignore'))
            else:
                # Neither blob nor tree: dispatch on the item's filetype.
                if item['filetype'] == 'link':
                    info.type = tarfile.SYMTYPE
                    info.linkname = item['link']
                    printer.verbose('Adding to {out}: <b>{fn}</b> (link to {link})',
                                    out=output,
                                    fn=fullname.decode('utf-8', errors='ignore'),
                                    link=item['link'].decode('utf-8', errors='replace'))
                elif item['filetype'] == 'fifo':
                    info.type = tarfile.FIFOTYPE
                    printer.verbose('Adding to {out}: <b>{fn}</b> (fifo)',
                                    out=output,
                                    fn=fullname.decode('utf-8', errors='ignore'))
                else:
                    continue  # Ignore unknown file types
            # Set optional attributes:
            info.mode = item.get('mode')
            info.uid = item.get('uid')
            info.gid = item.get('gid')
            info.mtime = item.get('mtime')
            # Add the item into the tar file:
            tar.addfile(info, payload)
def make_tar(tfn, source_dirs, ignore_path=None):
    '''
    Make a gzipped tar file `tfn` from the contents of `source_dirs`.

    :param tfn: path of the tar.gz file to create.
    :param source_dirs: directories whose (compiled, filtered) contents
        are added, with paths stored relative to each source dir.
    :param ignore_path: optional list of real-path prefixes to exclude
        (a trailing '/' on a prefix is ignored).
    '''
    # A mutable default ([]) would be shared across calls; use None as
    # the default and create a fresh list per call instead.
    if ignore_path is None:
        ignore_path = []

    # selector function: True if `fn` should be included in the archive
    def select(fn):
        rfn = realpath(fn)
        for p in ignore_path:
            if p.endswith('/'):
                p = p[:-1]
            if rfn.startswith(p):
                return False
        if rfn in python_files:
            return False
        return not is_blacklist(fn)

    # get the files and relpath file of all the directory we asked for
    files = []
    for sd in source_dirs:
        sd = realpath(sd)
        compile_dir(sd)
        files += [(x, relpath(realpath(x), sd))
                  for x in listfiles(sd) if select(x)]

    # create tar.gz of those files
    tf = tarfile.open(tfn, 'w:gz', format=tarfile.USTAR_FORMAT)
    dirs = []
    for fn, afn in files:
        dn = dirname(afn)
        if dn not in dirs:
            # create every parent directory first if it does not exist yet
            d = ''
            for component in split(dn):
                d = join(d, component)
                if d.startswith('/'):
                    d = d[1:]
                if d == '' or d in dirs:
                    continue
                dirs.append(d)
                tinfo = tarfile.TarInfo(d)
                tinfo.type = tarfile.DIRTYPE
                tf.addfile(tinfo)
        # put the file
        tf.add(fn, afn)
    tf.close()
def import_archives(self, image, *archives):
    """Stream one or more tar archives into a new Docker image.

    Pipes a single merged tar stream to ``docker import - <image>``.
    Each archive's members are re-rooted (name normalised to an
    absolute path, optional ``archive.prefix`` prepended, trailing
    slash stripped), synthetic directory entries are emitted for the
    prefix's ancestors, and duplicate paths across archives are
    skipped (first occurrence wins).

    :param image: name of the image to create.
    :param archives: archive objects exposing ``path`` and ``prefix``.
    :raises ExternalProcessError: if the docker process exits non-zero.
    """
    # Paths already written to the output stream; used to dedupe.
    paths = set()
    args = ['import', '-', image]
    proc = DockerProcess(self, args, stdin=PIPE)
    # 'w|' = uncompressed tar written as a non-seekable stream (pipe).
    tar_out = tarfile.open(fileobj=proc.stdin, mode='w|')

    # Rewrites one member's name (and hard-link target) for archive
    # `arc`: force absolute, normalise, apply the prefix, strip any
    # trailing slash.
    def layout_filter(obj, arc):
        if not obj.name.startswith(os.sep):
            obj.name = "/{}".format(obj.name)
        obj.name = os.path.normpath(obj.name)
        if arc.prefix:
            obj.name = os.path.join(
                arc.prefix, obj.name.lstrip(os.sep))
        if obj.name.endswith('/') and len(obj.name) > 1:
            obj.name = obj.name[:-1]
        # Hard links: keep the link target consistent with the renamed
        # member namespace.
        if obj.type == tarfile.LNKTYPE and obj.linkname:
            if obj.linkname.startswith(".{}".format(os.sep)):
                obj.linkname = obj.linkname[1:]
            if obj.linkname.startswith(os.sep) and arc.prefix:
                obj.linkname = os.path.join(
                    arc.prefix, obj.linkname.lstrip(os.sep))
            if obj.linkname.endswith('/') and len(obj.linkname) > 1:
                obj.linkname = obj.linkname[:-1]
        return obj

    for archive in archives:
        self.logger.info("Importing archive \"%s\" into image \"%s:%s\"",
                         archive, image, archive.prefix or '/')
        tar_in = tarfile.open(name=archive.path, mode='r')
        if archive.prefix:
            # Emit a root-owned DIRTYPE entry for each ancestor of the
            # prefix that has not been written yet, so prefixed members
            # have existing parent directories.
            # NOTE(review): reconstructed nesting — each loop iteration
            # appends one segment and emits that partial path; confirm
            # against the original source.
            segments = [os.sep]
            for segment in os.path.dirname(archive.prefix).split(os.sep):
                segments.append(segment)
                path = os.path.join(*segments)
                if path in paths:
                    continue
                tarinfo = tarfile.TarInfo(path)
                tarinfo.mode = 0o755
                tarinfo.uid = 0
                tarinfo.gid = 0
                tarinfo.type = tarfile.DIRTYPE
                tar_out.addfile(tarinfo)
                paths.add(tarinfo.name)
        # Re-layout every member, then copy it (with content for
        # regular files) unless an earlier archive already wrote that
        # path.
        for tarinfo in [layout_filter(obj, archive) for obj in tar_in]:  # pylint: disable=cell-var-from-loop
            if tarinfo.name in paths:
                continue
            paths.add(tarinfo.name)
            if tarinfo.isreg():
                tar_out.addfile(
                    tarinfo, fileobj=tar_in.extractfile(tarinfo))
            else:
                tar_out.addfile(tarinfo)
    tar_out.close()
    # Closing stdin signals EOF to docker; then wait for completion.
    proc.stdin.close()
    if proc.wait() != 0:
        raise ExternalProcessError(
            "Error importing archives \"{}\" in image \"{}\"".format(archives, image),
            proc)