我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用tarfile.LNKTYPE。
def _test(self, name, link=None): # See GNUWriteTest. tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) try: tar.addfile(tarinfo) finally: tar.close() tar = tarfile.open(tmpname) try: if link: l = tar.getmembers()[0].linkname self.assertTrue(link == l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name self.assertTrue(name == n, "PAX longname creation failed") finally: tar.close()
def _test(self, name, link=None): # See GNUWriteTest. tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) try: tar.addfile(tarinfo) finally: tar.close() tar = tarfile.open(tmpname) try: if link: l = tar.getmembers()[0].linkname self.assertEqual(link, l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name self.assertEqual(name, n, "PAX longname creation failed") finally: tar.close()
def _test(self, name, link=None): # See GNUWriteTest. tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) tar.addfile(tarinfo) tar.close() tar = tarfile.open(tmpname) try: if link: l = tar.getmembers()[0].linkname self.assertTrue(link == l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name self.assertTrue(name == n, "PAX longname creation failed") finally: tar.close()
def _test(self, name, link=None): tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w") tar.format = tarfile.GNU_FORMAT tar.addfile(tarinfo) v1 = self._calc_size(name, link) v2 = tar.offset self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") tar.close() tar = tarfile.open(tmpname) member = tar.next() self.assertIsNotNone(member, "unable to read longname member") self.assertEqual(tarinfo.name, member.name, "unable to read longname member") self.assertEqual(tarinfo.linkname, member.linkname, "unable to read longname member")
def _test(self, name, link=None): # See GNUWriteTest. tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) tar.addfile(tarinfo) tar.close() tar = tarfile.open(tmpname) if link: l = tar.getmembers()[0].linkname self.assertTrue(link == l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name self.assertTrue(name == n, "PAX longname creation failed")
def _test(self, name, link=None): tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w") try: tar.format = tarfile.GNU_FORMAT tar.addfile(tarinfo) v1 = self._calc_size(name, link) v2 = tar.offset self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") finally: tar.close() tar = tarfile.open(tmpname) try: member = tar.next() self.assertIsNotNone(member, "unable to read longname member") self.assertEqual(tarinfo.name, member.name, "unable to read longname member") self.assertEqual(tarinfo.linkname, member.linkname, "unable to read longname member") finally: tar.close()
def test_add_hardlink(self): tarinfo = self.tar.gettarinfo(self.bar) self.assertTrue(tarinfo.type == tarfile.LNKTYPE, "add file as hardlink failed")
def _test(self, name, link=None): tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w") try: tar.format = tarfile.GNU_FORMAT tar.addfile(tarinfo) v1 = self._calc_size(name, link) v2 = tar.offset self.assertEqual(v1, v2, "GNU longname/longlink creation failed") finally: tar.close() tar = tarfile.open(tmpname) try: member = tar.next() self.assertIsNotNone(member, "unable to read longname member") self.assertEqual(tarinfo.name, member.name, "unable to read longname member") self.assertEqual(tarinfo.linkname, member.linkname, "unable to read longname member") finally: tar.close()
def test_add_hardlink(self): tarinfo = self.tar.gettarinfo(self.bar) self.assertEqual(tarinfo.type, tarfile.LNKTYPE, "add file as hardlink failed")
def _test(self, name, link=None): tarinfo = tarfile.TarInfo(name) if link: tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE tar = tarfile.open(tmpname, "w") tar.format = tarfile.GNU_FORMAT tar.addfile(tarinfo) v1 = self._calc_size(name, link) v2 = tar.offset self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") tar.close() tar = tarfile.open(tmpname) try: member = tar.next() self.assertIsNotNone(member, "unable to read longname member") self.assertEqual(tarinfo.name, member.name, "unable to read longname member") self.assertEqual(tarinfo.linkname, member.linkname, "unable to read longname member") finally: tar.close()
def copy_path(self, container_src, container_dst, src, dst=None, dst_exec=False, processor=None): if dst is None: dst = os.path.join(*os.path.dirname(src).split(os.sep)) if processor is None: processor = lambda x: x self.logger.info("Copying files from container \"%s:%s\" to container \"%s:%s\"", container_src, src, container_dst, dst) if src.endswith('/'): src_prefix = os.path.dirname(src).split(os.sep)[-1] + '/' else: src_prefix = '' def layout_filter(obj): if dst is not None: if len(obj.name) > len(src_prefix): obj.name = os.path.join(dst, obj.name[len(src_prefix):]) else: obj.name = dst if obj.type == tarfile.LNKTYPE: if len(obj.linkname) > len(src_prefix): obj.linkname = os.path.join( dst, obj.linkname[len(src_prefix):]) else: obj.linkname = dst return obj p_in = DockerProcess(self, ['cp', "{}:{}".format( container_src, src), "-"], stdout=PIPE) if dst_exec: p_out = DockerProcess(self, ['exec', '-i', container_dst, "tar", "-xpf", "-", "-C", "/"], stdin=PIPE) else: p_out = DockerProcess(self, ['cp', "-", "{}:/".format(container_dst)], stdin=PIPE) tar_in = tarfile.open(fileobj=p_in.stdout, mode='r|') tar_out = tarfile.open(fileobj=p_out.stdin, mode='w|') for tarinfo in tar_in: tarinfo = processor(layout_filter(tarinfo)) if tarinfo is None: continue if tarinfo.isreg(): tar_out.addfile(tarinfo, fileobj=tar_in.extractfile(tarinfo)) else: tar_out.addfile(tarinfo) tar_in.close() tar_out.close() p_in.stdout.close() p_out.stdin.close() if p_in.wait() != 0: raise ExternalProcessError( "Error processing path on container \"{}\"".format(container_src), p_in) if p_out.wait() != 0: raise ExternalProcessError( "Error processing path on container \"{}\"".format(container_dst), p_out)
def import_archives(self, image, *archives): paths = set() args = ['import', '-', image] proc = DockerProcess(self, args, stdin=PIPE) tar_out = tarfile.open(fileobj=proc.stdin, mode='w|') def layout_filter(obj, arc): if not obj.name.startswith(os.sep): obj.name = "/{}".format(obj.name) obj.name = os.path.normpath(obj.name) if arc.prefix: obj.name = os.path.join( arc.prefix, obj.name.lstrip(os.sep)) if obj.name.endswith('/') and len(obj.name) > 1: obj.name = obj.name[:-1] if obj.type == tarfile.LNKTYPE and obj.linkname: if obj.linkname.startswith(".{}".format(os.sep)): obj.linkname = obj.linkname[1:] if obj.linkname.startswith(os.sep) and arc.prefix: obj.linkname = os.path.join( arc.prefix, obj.linkname.lstrip(os.sep)) if obj.linkname.endswith('/') and len(obj.linkname) > 1: obj.linkname = obj.linkname[:-1] return obj for archive in archives: self.logger.info("Importing archive \"%s\" into image \"%s:%s\"", archive, image, archive.prefix or '/') tar_in = tarfile.open(name=archive.path, mode='r') if archive.prefix: segments = [os.sep] for segment in os.path.dirname(archive.prefix).split(os.sep): segments.append(segment) path = os.path.join(*segments) if path in paths: continue tarinfo = tarfile.TarInfo(path) tarinfo.mode = 0o755 tarinfo.uid = 0 tarinfo.gid = 0 tarinfo.type = tarfile.DIRTYPE tar_out.addfile(tarinfo) paths.add(tarinfo.name) for tarinfo in [layout_filter(obj, archive) for obj in tar_in]: # pylint: disable=cell-var-from-loop if tarinfo.name in paths: continue paths.add(tarinfo.name) if tarinfo.isreg(): tar_out.addfile( tarinfo, fileobj=tar_in.extractfile(tarinfo)) else: tar_out.addfile(tarinfo) tar_out.close() proc.stdin.close() if proc.wait() != 0: raise ExternalProcessError( "Error importing archives \"{}\" in image \"{}\"".format(archives, image), proc)