Python tarfile 模块,LNKTYPE 实例源码

我们从 Python 开源项目中提取了以下 26 个代码示例，用于说明如何使用 tarfile.LNKTYPE。

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Write one member to a PAX archive and check that it reads back.

        A ``TarInfo`` named *name* is written to ``tmpname`` using
        ``tarfile.PAX_FORMAT``.  When *link* is truthy the member is turned
        into a hard link (``LNKTYPE``) pointing at *link*.  The archive is
        then reopened and the first member's ``linkname`` (when *link* was
        given) or ``name`` is compared with the value that was written.
        """
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.getmembers()[0]
            # assertEqual reports both values on failure, which
            # assertTrue(a == b) does not.
            if link:
                self.assertEqual(link, member.linkname,
                        "PAX longlink creation failed")
            else:
                self.assertEqual(name, member.name,
                        "PAX longname creation failed")
        finally:
            tar.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Round-trip a single member through a PAX-format archive.

        The member is created from *name*; a truthy *link* makes it a hard
        link (``LNKTYPE``) whose ``linkname`` is *link*.  After writing the
        archive to ``tmpname`` it is reopened, and the first member's
        ``linkname`` (if *link* was given) or ``name`` must equal the
        value written.
        """
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            first = tar.getmembers()[0]
            # Use assertEqual so a failure shows the mismatching strings
            # instead of just "False is not true".
            if link:
                self.assertEqual(link, first.linkname,
                        "PAX longlink creation failed")
            else:
                self.assertEqual(name, first.name,
                        "PAX longname creation failed")
        finally:
            tar.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Store *name* (optionally as a hard link) in a PAX archive and verify it.

        With a truthy *link* the member gets ``type = LNKTYPE`` and
        ``linkname = link``, and the link name is what gets checked after
        re-reading ``tmpname``; otherwise the member name is checked.
        """
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            stored = tar.getmembers()[0]
            # assertEqual (rather than assertTrue(a == b)) includes both
            # operands in the failure report.
            if link:
                self.assertEqual(link, stored.linkname,
                        "PAX longlink creation failed")
            else:
                self.assertEqual(name, stored.name,
                        "PAX longname creation failed")
        finally:
            tar.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Write one member in PAX format and compare what is read back.

        *name* is the member name.  If *link* is truthy the member is
        written as a hard link (``LNKTYPE``) to *link* and the link name is
        the value verified; otherwise the member name is verified.
        """
        # See GNUWriteTest.
        entry = tarfile.TarInfo(name)
        if link:
            entry.type = tarfile.LNKTYPE
            entry.linkname = link

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            writer.addfile(entry)
        finally:
            writer.close()

        reader = tarfile.open(tmpname)
        try:
            first = reader.getmembers()[0]
            if not link:
                self.assertEqual(name, first.name,
                        "PAX longname creation failed")
            else:
                self.assertEqual(link, first.linkname,
                        "PAX longlink creation failed")
        finally:
            reader.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Write one member to a PAX archive and check that it reads back.

        A ``TarInfo`` named *name* is written to ``tmpname`` using
        ``tarfile.PAX_FORMAT``; a truthy *link* makes it a hard link
        (``LNKTYPE``) to *link*.  The archive is reopened and the first
        member's ``linkname`` (when *link* was given) or ``name`` is
        compared with the value written.
        """
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            # Close the archive even if addfile() raises, so the handle on
            # tmpname is not leaked.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.getmembers()[0]
            # assertEqual reports both values on failure, which
            # assertTrue(a == b) does not.
            if link:
                self.assertEqual(link, member.linkname,
                        "PAX longlink creation failed")
            else:
                self.assertEqual(name, member.name,
                        "PAX longname creation failed")
        finally:
            tar.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Check that a member survives a PAX write/read cycle.

        The member is named *name*.  A truthy *link* marks it as a hard
        link (``LNKTYPE``) to *link*, in which case the link name is the
        field compared after re-reading; otherwise the name is compared.
        """
        # See GNUWriteTest.
        info = tarfile.TarInfo(name)
        if link:
            info.linkname, info.type = link, tarfile.LNKTYPE

        out_tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            out_tar.addfile(info)
        finally:
            out_tar.close()

        in_tar = tarfile.open(tmpname)
        try:
            head = in_tar.getmembers()[0]
            if link:
                self.assertEqual(link, head.linkname,
                        "PAX longlink creation failed")
            else:
                self.assertEqual(name, head.name,
                        "PAX longname creation failed")
        finally:
            in_tar.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Write one member in GNU format, check the offset, and read it back.

        A ``TarInfo`` named *name* (a hard link to *link* when *link* is
        truthy) is added to a new archive at ``tmpname`` whose format is
        set to ``tarfile.GNU_FORMAT``.  The archive's ``offset`` after the
        write must equal ``self._calc_size(name, link)``.  The archive is
        then reopened and the first member's ``name`` and ``linkname`` are
        compared with the ones written.
        """
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            # assertEqual shows both sizes when they differ.
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
        finally:
            # Close even when addfile() or the assertion fails.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")
        finally:
            # The read-side archive was previously never closed.
            tar.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Write one member to a PAX archive and check that it reads back.

        A ``TarInfo`` named *name* is written to ``tmpname`` using
        ``tarfile.PAX_FORMAT``; a truthy *link* makes it a hard link
        (``LNKTYPE``) to *link*.  The archive is reopened and the first
        member's ``linkname`` (when *link* was given) or ``name`` is
        compared with the value written.
        """
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            # Close even if addfile() raises.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.getmembers()[0]
            # assertEqual reports both values on failure, which
            # assertTrue(a == b) does not.
            if link:
                self.assertEqual(link, member.linkname,
                        "PAX longlink creation failed")
            else:
                self.assertEqual(name, member.name,
                        "PAX longname creation failed")
        finally:
            # The read-side archive was previously never closed.
            tar.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Verify that a name or hard-link target survives a PAX archive.

        Builds a member called *name*; when *link* is truthy the member is
        a hard link (``LNKTYPE``) to *link*.  The archive at ``tmpname`` is
        written, reopened, and its first member is compared against the
        written link name (if *link*) or member name.
        """
        # See GNUWriteTest.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            archive.addfile(written)
        finally:
            archive.close()

        archive = tarfile.open(tmpname)
        try:
            read_back = archive.getmembers()[0]
            if link:
                self.assertEqual(link, read_back.linkname,
                        "PAX longlink creation failed")
            else:
                self.assertEqual(name, read_back.name,
                        "PAX longname creation failed")
        finally:
            archive.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Write one member in GNU format, check the offset, and read it back.

        A ``TarInfo`` named *name* (a hard link to *link* when *link* is
        truthy) is added to a new archive at ``tmpname`` whose format is
        set to ``tarfile.GNU_FORMAT``.  The archive's ``offset`` after the
        write must equal ``self._calc_size(name, link)``.  The archive is
        then reopened and the first member's ``name`` and ``linkname`` are
        compared with the ones written.
        """
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            # assertEqual shows both sizes when they differ, which
            # assertTrue(v1 == v2) does not.
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")
        finally:
            tar.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_add_hardlink(self):
        # The TarInfo that gettarinfo() builds for self.bar must be typed as
        # a hard link (LNKTYPE).
        # NOTE(review): self.tar and self.bar are fixtures created outside
        # this method -- confirm their setup in the enclosing test class.
        tarinfo = self.tar.gettarinfo(self.bar)
        # assertEqual reports the actual type on failure, which
        # assertTrue(a == b) does not.
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Round-trip one member through a GNU-format archive.

        The member is named *name* and, when *link* is truthy, is a hard
        link (``LNKTYPE``) to *link*.  After it is added, the archive's
        ``offset`` is checked against ``self._calc_size(name, link)``.
        The archive at ``tmpname`` is then reopened and the first member's
        ``name`` and ``linkname`` must match what was written.
        """
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            expected_size = self._calc_size(name, link)
            actual_size = tar.offset
            # Use assertEqual so a mismatch prints both sizes.
            self.assertEqual(expected_size, actual_size,
                    "GNU longname/longlink creation failed")
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")
        finally:
            tar.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_add_hardlink(self):
        # gettarinfo() on self.bar is expected to yield a hard-link entry.
        # NOTE(review): self.tar / self.bar come from fixtures that are not
        # visible here -- confirm against the enclosing test class.
        tarinfo = self.tar.gettarinfo(self.bar)
        # assertEqual shows the actual member type when the check fails.
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Add a member to a GNU archive, verify its size, then re-read it.

        *name* is the member name; a truthy *link* turns the member into a
        hard link (``LNKTYPE``) to *link*.  The write-side check compares
        ``self._calc_size(name, link)`` with the archive's ``offset``; the
        read-side check compares the first member's ``name`` and
        ``linkname`` with the written ``TarInfo``.
        """
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            # assertEqual (not assertTrue(v1 == v2)) so that a failure
            # reports both numbers.
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")
        finally:
            tar.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_add_hardlink(self):
        # Checks that the TarInfo created for self.bar has type LNKTYPE.
        # NOTE(review): self.tar and self.bar are set up elsewhere in the
        # test class -- confirm what self.bar refers to.
        tarinfo = self.tar.gettarinfo(self.bar)
        # assertEqual includes the actual type in the failure message.
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Write one member in GNU format, check the offset, and read it back.

        The member is named *name*; a truthy *link* makes it a hard link
        (``LNKTYPE``) to *link*.  After the write, the archive's ``offset``
        must equal ``self._calc_size(name, link)``.  The archive at
        ``tmpname`` is then reopened and the first member's ``name`` and
        ``linkname`` are compared with those written.
        """
        written = tarfile.TarInfo(name)
        if link:
            written.type = tarfile.LNKTYPE
            written.linkname = link

        writer = tarfile.open(tmpname, "w")
        try:
            writer.format = tarfile.GNU_FORMAT
            writer.addfile(written)

            expected = self._calc_size(name, link)
            self.assertEqual(expected, writer.offset,
                    "GNU longname/longlink creation failed")
        finally:
            writer.close()

        reader = tarfile.open(tmpname)
        try:
            read_back = reader.next()
            failure = "unable to read longname member"
            self.assertIsNotNone(read_back, failure)
            self.assertEqual(written.name, read_back.name, failure)
            self.assertEqual(written.linkname, read_back.linkname, failure)
        finally:
            reader.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_add_hardlink(self):
        # The TarInfo that gettarinfo() builds for self.bar must be typed as
        # a hard link (LNKTYPE).
        # NOTE(review): self.tar and self.bar are fixtures created outside
        # this method -- confirm their setup in the enclosing test class.
        info = self.tar.gettarinfo(self.bar)
        actual, expected = info.type, tarfile.LNKTYPE
        self.assertEqual(actual, expected, "add file as hardlink failed")
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Write one member in GNU format, check the offset, and read it back.

        A ``TarInfo`` named *name* (a hard link to *link* when *link* is
        truthy) is added to a new archive at ``tmpname`` whose format is
        set to ``tarfile.GNU_FORMAT``.  The archive's ``offset`` after the
        write must equal ``self._calc_size(name, link)``.  The archive is
        then reopened and the first member's ``name`` and ``linkname`` are
        compared with the ones written.
        """
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            # assertEqual shows both sizes when they differ.
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
        finally:
            # Close even when addfile() or the assertion fails; previously
            # the write-side archive leaked on any failure.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")
        finally:
            tar.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_add_hardlink(self):
        # gettarinfo() for self.bar should produce a hard-link (LNKTYPE)
        # entry.
        # NOTE(review): the self.tar / self.bar fixtures are defined outside
        # this snippet -- confirm against the test class setup.
        tarinfo = self.tar.gettarinfo(self.bar)
        # assertEqual prints the actual type on failure, which
        # assertTrue(a == b) does not.
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Round-trip one member through a GNU-format archive.

        *name* is the member name and a truthy *link* makes the member a
        hard link (``LNKTYPE``) to *link*.  The archive ``offset`` after
        adding the member is compared with ``self._calc_size(name, link)``;
        afterwards ``tmpname`` is reopened and the first member's ``name``
        and ``linkname`` are compared with the written values.
        """
        info = tarfile.TarInfo(name)
        if link:
            info.linkname, info.type = link, tarfile.LNKTYPE

        out_tar = tarfile.open(tmpname, "w")
        try:
            out_tar.format = tarfile.GNU_FORMAT
            out_tar.addfile(info)

            computed = self._calc_size(name, link)
            position = out_tar.offset
            self.assertEqual(computed, position,
                    "GNU longname/longlink creation failed")
        finally:
            out_tar.close()

        in_tar = tarfile.open(tmpname)
        try:
            head = in_tar.next()
            self.assertIsNotNone(head, "unable to read longname member")
            self.assertEqual(info.name, head.name,
                    "unable to read longname member")
            self.assertEqual(info.linkname, head.linkname,
                    "unable to read longname member")
        finally:
            in_tar.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_add_hardlink(self):
        # gettarinfo() on self.bar is expected to yield a hard-link entry.
        # NOTE(review): self.tar / self.bar come from fixtures that are not
        # visible here -- confirm against the enclosing test class.
        member_type = self.tar.gettarinfo(self.bar).type
        self.assertEqual(member_type, tarfile.LNKTYPE,
                "add file as hardlink failed")
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_add_hardlink(self):
        # The entry gettarinfo() returns for self.bar must have type LNKTYPE.
        # NOTE(review): self.tar and self.bar are prepared outside this
        # method -- confirm in the enclosing test class.
        tarinfo = self.tar.gettarinfo(self.bar)
        # assertEqual reports the actual type when the check fails.
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test(self, name, link=None):
        """Add a member to a GNU archive, verify its size, then re-read it.

        The member is called *name*; if *link* is truthy it is a hard link
        (``LNKTYPE``) to *link*.  The write phase asserts that the
        archive's ``offset`` equals ``self._calc_size(name, link)``.  The
        read phase reopens ``tmpname`` and asserts that the first member's
        ``name`` and ``linkname`` equal the written ones.
        """
        source = tarfile.TarInfo(name)
        if link:
            source.linkname = link
            source.type = tarfile.LNKTYPE

        archive = tarfile.open(tmpname, "w")
        try:
            archive.format = tarfile.GNU_FORMAT
            archive.addfile(source)

            size = self._calc_size(name, link)
            self.assertEqual(size, archive.offset,
                    "GNU longname/longlink creation failed")
        finally:
            archive.close()

        archive = tarfile.open(tmpname)
        try:
            entry = archive.next()
            self.assertIsNotNone(entry, "unable to read longname member")
            for written, stored in ((source.name, entry.name),
                                    (source.linkname, entry.linkname)):
                self.assertEqual(written, stored,
                        "unable to read longname member")
        finally:
            archive.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_add_hardlink(self):
        # Checks that the TarInfo created for self.bar has type LNKTYPE.
        # NOTE(review): self.tar and self.bar are set up elsewhere in the
        # test class -- confirm what self.bar refers to.
        hardlink_info = self.tar.gettarinfo(self.bar)
        self.assertEqual(
                hardlink_info.type, tarfile.LNKTYPE,
                "add file as hardlink failed")
项目:dockeroo    作者:dockeroo    | 项目源码 | 文件源码
def copy_path(self, container_src, container_dst, src, dst=None, dst_exec=False, processor=None):
        """Stream *src* out of one container and into another as a tar archive.

        A ``cp <container_src>:<src> -`` process produces a tar stream; each
        member is renamed by the local layout filter, passed through
        *processor*, and written to a second process that consumes the
        stream for *container_dst*.

        Args:
            container_src: Name of the container the path is read from.
            container_dst: Name of the container the path is written to.
            src: Path inside *container_src*.  A trailing ``/`` causes the
                last directory component to be stripped from member names.
            dst: Path that member names are re-rooted under.  When ``None``
                it is rebuilt from the components of ``os.path.dirname(src)``.
            dst_exec: When true the receiving process is
                ``exec -i <container_dst> tar -xpf - -C /``; otherwise it is
                ``cp - <container_dst>:/``.
            processor: Optional callable applied to every ``TarInfo`` after
                renaming; returning ``None`` drops the member.  Defaults to
                the identity function.

        Raises:
            ExternalProcessError: If either process's ``wait()`` returns a
                non-zero value.
        """
        if dst is None:
            parent_parts = os.path.dirname(src).split(os.sep)
            dst = os.path.join(*parent_parts)
        if processor is None:
            def _identity(entry):
                return entry
            processor = _identity
        self.logger.info("Copying files from container \"%s:%s\" to container \"%s:%s\"",
                         container_src, src, container_dst, dst)
        # With a trailing slash, member names start with the last directory
        # component of src; that prefix is removed when renaming.
        src_prefix = (os.path.dirname(src).split(os.sep)[-1] + '/') if src.endswith('/') else ''

        def layout_filter(obj):
            # Re-root the member name (and hard-link target) under dst.
            if dst is None:
                return obj
            skip = len(src_prefix)
            if len(obj.name) > skip:
                obj.name = os.path.join(dst, obj.name[skip:])
            else:
                obj.name = dst
            if obj.type != tarfile.LNKTYPE:
                return obj
            if len(obj.linkname) > skip:
                obj.linkname = os.path.join(dst, obj.linkname[skip:])
            else:
                obj.linkname = dst
            return obj

        source_proc = DockerProcess(
            self, ['cp', "{}:{}".format(container_src, src), "-"], stdout=PIPE)
        if dst_exec:
            sink_args = ['exec', '-i', container_dst, "tar", "-xpf", "-", "-C", "/"]
        else:
            sink_args = ['cp', "-", "{}:/".format(container_dst)]
        sink_proc = DockerProcess(self, sink_args, stdin=PIPE)

        # Both archives are opened in stream mode ('r|' / 'w|') on the pipes.
        tar_in = tarfile.open(fileobj=source_proc.stdout, mode='r|')
        tar_out = tarfile.open(fileobj=sink_proc.stdin, mode='w|')
        for member in tar_in:
            member = processor(layout_filter(member))
            if member is None:
                continue
            # Only regular files carry a payload to copy.
            payload = tar_in.extractfile(member) if member.isreg() else None
            tar_out.addfile(member, fileobj=payload)
        tar_in.close()
        tar_out.close()
        source_proc.stdout.close()
        sink_proc.stdin.close()
        if source_proc.wait() != 0:
            raise ExternalProcessError(
                "Error processing path on container \"{}\"".format(container_src), source_proc)
        if sink_proc.wait() != 0:
            raise ExternalProcessError(
                "Error processing path on container \"{}\"".format(container_dst), sink_proc)
项目:dockeroo    作者:dockeroo    | 项目源码 | 文件源码
def import_archives(self, image, *archives):
        """Merge several tar archives into one stream fed to ``import - <image>``.

        Every member of every archive is renamed by the local layout filter
        and written to the stdin of an ``import`` process.  A member whose
        resulting name was already written is skipped, so the first
        occurrence of a path wins.

        Args:
            image: Image name passed to the ``import`` command.
            *archives: Objects exposing ``path`` (the tar file to read) and
                ``prefix`` (a directory the members are re-rooted under; may
                be falsy).

        Raises:
            ExternalProcessError: If the import process's ``wait()`` returns
                a non-zero value.
        """
        paths = set()
        args = ['import', '-', image]
        proc = DockerProcess(self, args, stdin=PIPE)
        tar_out = tarfile.open(fileobj=proc.stdin, mode='w|')
        def layout_filter(obj, arc):
            # Make the member name absolute and normalized, then re-root it
            # under the archive prefix and drop any trailing slash.
            if not obj.name.startswith(os.sep):
                obj.name = "/{}".format(obj.name)
            obj.name = os.path.normpath(obj.name)
            if arc.prefix:
                obj.name = os.path.join(
                    arc.prefix, obj.name.lstrip(os.sep))
            if obj.name.endswith('/') and len(obj.name) > 1:
                obj.name = obj.name[:-1]
            # Hard-link targets name other members, so they get the same
            # treatment: "./x" becomes "/x" and absolute targets are
            # re-rooted under the prefix.
            if obj.type == tarfile.LNKTYPE and obj.linkname:
                if obj.linkname.startswith(".{}".format(os.sep)):
                    obj.linkname = obj.linkname[1:]
                if obj.linkname.startswith(os.sep) and arc.prefix:
                    obj.linkname = os.path.join(
                        arc.prefix, obj.linkname.lstrip(os.sep))
                if obj.linkname.endswith('/') and len(obj.linkname) > 1:
                    obj.linkname = obj.linkname[:-1]
            return obj
        for archive in archives:
            self.logger.info("Importing archive \"%s\" into image \"%s:%s\"",
                             archive, image, archive.prefix or '/')

            tar_in = tarfile.open(name=archive.path, mode='r')
            try:
                if archive.prefix:
                    # Emit a directory entry for each parent directory of
                    # the prefix that has not been written yet.
                    segments = [os.sep]
                    for segment in os.path.dirname(archive.prefix).split(os.sep):
                        segments.append(segment)
                        path = os.path.join(*segments)
                        if path in paths:
                            continue
                        tarinfo = tarfile.TarInfo(path)
                        tarinfo.mode = 0o755
                        tarinfo.uid = 0
                        tarinfo.gid = 0
                        tarinfo.type = tarfile.DIRTYPE
                        tar_out.addfile(tarinfo)
                        paths.add(tarinfo.name)
                for tarinfo in [layout_filter(obj, archive) for obj in tar_in]: # pylint: disable=cell-var-from-loop
                    if tarinfo.name in paths:
                        continue
                    paths.add(tarinfo.name)
                    if tarinfo.isreg():
                        tar_out.addfile(
                            tarinfo, fileobj=tar_in.extractfile(tarinfo))
                    else:
                        tar_out.addfile(tarinfo)
            finally:
                # Each source archive was previously left open, leaking one
                # file handle per imported archive.
                tar_in.close()
        tar_out.close()
        proc.stdin.close()
        if proc.wait() != 0:
            raise ExternalProcessError(
                "Error importing archives \"{}\" in image \"{}\"".format(archives, image), proc)