Python builtins 模块,open() 实例源码


项目:octoconf    作者:andras-tim    | 项目源码 | 文件源码
def patch_open_read(files):
    """
    Patch open() command and mock read() results

    Every key of ``files`` is normalised with ``os.path.abspath``; the
    replacement ``open`` asserts that the requested path is one of those
    keys and returns a ``mock.mock_open`` handle whose ``read()`` yields
    the mapped content.

    :type files: dict
    :param files: mapping of path -> content to return from read()
    :return: the ``mock.patch`` object targeting the builtin ``open``
    """
    files = {os.path.abspath(path): content for path, content in files.items()}

    def mock_open_wrapper(path, *args, **kwargs):
        # Hide this frame from pytest tracebacks so failures point at the caller.
        __tracebackhide__ = True  # pylint: disable=unused-variable

        assert path in files, 'try to open a non-mocked path\n    desired={desired!r}\n    mocked={mocked!r}'.format(
            desired=path, mocked=files.keys())

        open_mock = mock.mock_open(read_data=files[path])
        return open_mock(path, *args, **kwargs)

    return mock.patch(__get_open_ref(), mock_open_wrapper)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __init__(self, stream, errors='strict'):

        """ Creates a StreamWriter instance.

            stream must be a file-like object open for writing.

            The StreamWriter may use different error handling
            schemes by providing the errors keyword argument. These
            parameters are predefined:

             'strict' - raise a ValueError (or a subclass)
             'ignore' - ignore the character and continue with the next
             'replace'- replace with a suitable replacement character
             'xmlcharrefreplace' - Replace with the appropriate XML
                                   character reference.
             'backslashreplace'  - Replace with backslashed escape
                                   sequences.
             'namereplace'       - Replace with \\N{...} escape sequences.

            The set of allowed parameter values can be extended via
            register_error.
        """
        # Only store the arguments; nothing is written to the stream here.
        self.stream = stream
        self.errors = errors
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __init__(self, stream, errors='strict'):

        """ Creates a StreamReader instance.

            stream must be a file-like object open for reading.

            The StreamReader may use different error handling
            schemes by providing the errors keyword argument. These
            parameters are predefined:

             'strict' - raise a ValueError (or a subclass)
             'ignore' - ignore the character and continue with the next
             'replace'- replace with a suitable replacement character
             'backslashreplace' - Replace with backslashed escape sequences;

            The set of allowed parameter values can be extended via
            register_error.
        """
        self.stream = stream
        self.errors = errors
        # Undecoded bytes carried over between reads.
        self.bytebuffer = b""
        # One shared empty value of the character-buffer type, used to reset
        # charbuffer without building a new object each time.
        self._empty_charbuffer = self.charbuffertype()
        self.charbuffer = self._empty_charbuffer
        self.linebuffer = None
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def open(filename):
    """Load a Quake2 WAL texture and return it as a "P" mode image.

    :param filename: path of the WAL file, or an object with a ``read``
        method that is used directly as the input stream.
    :returns: the image built by ``Image.frombytes``, with ``format``,
        ``format_description`` and ``info["name"]`` (plus
        ``info["next_name"]`` when non-empty) filled in.
    """
    # FIXME: modify to return a WalImageFile instance instead of
    # plain Image object ?

    if hasattr(filename, "read"):
        fp = filename
    else:
        # ``open`` is shadowed by this function, so use the builtins module.
        fp = builtins.open(filename, "rb")

    # read header fields
    header = fp.read(32+24+32+12)
    size = i32(header, 32), i32(header, 36)
    offset = i32(header, 40)

    # load pixel data
    fp.seek(offset)

    im = Image.frombytes("P", size, fp.read(size[0] * size[1]))
    im.putpalette(quake2palette)

    im.format = "WAL"
    im.format_description = "Quake2 Texture"

    # strings are null-terminated
    im.info["name"] = header[:32].split(b"\0", 1)[0]
    next_name = header[56:56+32].split(b"\0", 1)[0]
    if next_name:
        im.info["next_name"] = next_name

    return im
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def open(fp, mode="r"):
    """
    Load texture from a GD image file.

    :param fp: GD file name, or an opened file handle.
    :param mode: Optional mode.  In this version, if the mode argument
        is given, it must be "r".
    :returns: An image instance.
    :raises ValueError: If ``mode`` is anything other than "r".
    :raises IOError: If the image could not be read.
    """
    if mode != "r":
        raise ValueError("bad mode")

    if isPath(fp):
        filename = fp
        # ``open`` is shadowed by this function, so use the builtins module.
        fp = builtins.open(fp, "rb")
    else:
        # Already a file handle: there is no file name to record.
        filename = ""

    try:
        return GdImageFile(fp, filename)
    except SyntaxError:
        raise IOError("cannot identify this image file")
项目:octoconf    作者:andras-tim    | 项目源码 | 文件源码
def __get_open_ref():
    """
    Return the dotted name of the builtin ``open`` for ``mock.patch``.

    The result is cached in the module-level ``__OPEN_REF`` so the import
    probing runs only once.

    :rtype str
    """
    # pylint: disable=import-error,redefined-builtin,unused-import,unused-variable
    global __OPEN_REF

    if __OPEN_REF:
        return __OPEN_REF

    try:
        # Python 3 module name
        from builtins import open
        __OPEN_REF = 'builtins.open'
    except ImportError:
        # Python 2 module name
        from __builtin__ import open
        __OPEN_REF = '__builtin__.open'

    return __OPEN_REF
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def open(fp, mode="r"):
    """
    Load texture from a GD image file.

    :param fp: GD file name, or an opened file handle.
    :param mode: Optional mode.  In this version, if the mode argument
        is given, it must be "r".
    :returns: An image instance.
    :raises ValueError: If ``mode`` is anything other than "r".
    :raises IOError: If the image could not be read.
    """
    if mode != "r":
        raise ValueError("bad mode")

    if isPath(fp):
        filename = fp
        # ``open`` is shadowed by this function, so use the builtins module.
        fp = builtins.open(fp, "rb")
    else:
        # Already a file handle: there is no file name to record.
        filename = ""

    try:
        return GdImageFile(fp, filename)
    except SyntaxError:
        raise IOError("cannot identify this image file")
项目:ampersand    作者:natejms    | 项目源码 | 文件源码
def plugin_remove(self, name):
        root = relative(self.root)

            # Delete the directory containing the plugin
            print("Removing plugin '%s'" % name)
            rmtree(root(self.config["modules"], name ))
        except FileNotFoundError:
        except (IOError, OSError) as e:
            print("Couldn't remove plugin. You may need to delete it manually.")

            # Update _ampersand.json by adding the plugin
            with open(root("_ampersand.json"), "w", encoding="utf-8") as updated:
                    updated.write(json.dumps(self.config, indent=4, ensure_ascii=False))
        except KeyError:
            print("Failed to remove plugin '%s' as it is not installed." % name)
项目:meme_get    作者:memegen    | 项目源码 | 文件源码
def _read_cache(self):
        """ Read the saved cache file (no side effects)

        Read a tuple containing the data

        Returns whatever object was pickled into the file at
        ``self._filepath()``; raises ``OSError`` when that file does not
        exist.
        """
        if os.path.isfile(self._filepath()):
            # Read the file in using Pickle.
            # NOTE(review): pickle.load executes arbitrary code from the
            # file; only safe while the cache file is written by this
            # program itself.
            with open(self._filepath(), 'rb') as file_obj:
                data = pickle.load(file_obj)
            return data
        else:
            raise OSError("No cache exists.")
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def open(fp, mode="r"):
    """
    Load texture from a GD image file.

    :param fp: GD file name, or an opened file handle.
    :param mode: Optional mode.  In this version, if the mode argument
        is given, it must be "r".
    :returns: An image instance.
    :raises ValueError: If ``mode`` is anything other than "r".
    :raises IOError: If the image could not be read.
    """
    if mode != "r":
        raise ValueError("bad mode")

    if isPath(fp):
        filename = fp
        # ``open`` is shadowed by this function, so use the builtins module.
        fp = builtins.open(fp, "rb")
    else:
        # Already a file handle: there is no file name to record.
        filename = ""

    try:
        return GdImageFile(fp, filename)
    except SyntaxError:
        raise IOError("cannot identify this image file")
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def open(filename):
    """Load a Quake2 WAL texture and return it as a "P" mode image.

    :param filename: path of the WAL file, or an object with a ``read``
        method that is used directly as the input stream.
    :returns: the image built by ``Image.frombytes``, with ``format``,
        ``format_description`` and ``info["name"]`` (plus
        ``info["next_name"]`` when non-empty) filled in.
    """
    # FIXME: modify to return a WalImageFile instance instead of
    # plain Image object ?

    if hasattr(filename, "read"):
        fp = filename
    else:
        # ``open`` is shadowed by this function, so use the builtins module.
        fp = builtins.open(filename, "rb")

    # read header fields
    header = fp.read(32+24+32+12)
    size = i32(header, 32), i32(header, 36)
    offset = i32(header, 40)

    # load pixel data
    fp.seek(offset)

    im = Image.frombytes("P", size, fp.read(size[0] * size[1]))
    im.putpalette(quake2palette)

    im.format = "WAL"
    im.format_description = "Quake2 Texture"

    # strings are null-terminated
    im.info["name"] = header[:32].split(b"\0", 1)[0]
    next_name = header[56:56+32].split(b"\0", 1)[0]
    if next_name:
        im.info["next_name"] = next_name

    return im
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def __init__(self, stream, errors='strict'):

        """ Creates a StreamWriter instance.

            stream must be a file-like object open for writing.

            The StreamWriter may use different error handling
            schemes by providing the errors keyword argument. These
            parameters are predefined:

             'strict' - raise a ValueError (or a subclass)
             'ignore' - ignore the character and continue with the next
             'replace'- replace with a suitable replacement character
             'xmlcharrefreplace' - Replace with the appropriate XML
                                   character reference.
             'backslashreplace'  - Replace with backslashed escape
                                   sequences.
             'namereplace'       - Replace with \\N{...} escape sequences.

            The set of allowed parameter values can be extended via
            register_error.
        """
        # Only store the arguments; nothing is written to the stream here.
        self.stream = stream
        self.errors = errors
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def __init__(self, stream, errors='strict'):

        """ Creates a StreamReader instance.

            stream must be a file-like object open for reading.

            The StreamReader may use different error handling
            schemes by providing the errors keyword argument. These
            parameters are predefined:

             'strict' - raise a ValueError (or a subclass)
             'ignore' - ignore the character and continue with the next
             'replace'- replace with a suitable replacement character
             'backslashreplace' - Replace with backslashed escape sequences;

            The set of allowed parameter values can be extended via
            register_error.
        """
        self.stream = stream
        self.errors = errors
        # Undecoded bytes carried over between reads.
        self.bytebuffer = b""
        # One shared empty value of the character-buffer type, used to reset
        # charbuffer without building a new object each time.
        self._empty_charbuffer = self.charbuffertype()
        self.charbuffer = self._empty_charbuffer
        self.linebuffer = None
项目:KiField    作者:xesscorp    | 项目源码 | 文件源码
def csvfile_to_wb(csv_filename):
    '''Open a CSV file and return an openpyxl workbook.'''

        'Converting CSV file {} into an XLSX workbook.'.format(csv_filename))

    with open(csv_filename) as csv_file:
        dialect = csv.Sniffer().sniff(
        if USING_PYTHON2:
            for attr in dir(dialect):
                a = getattr(dialect, attr)
                if type(a) == unicode:
                    setattr(dialect, attr, bytes(a))
        reader = csv.reader(csv_file, dialect)
        wb = pyxl.Workbook()
        ws =
        for row_index, row in enumerate(reader, 1):
            for column_index, cell in enumerate(row, 1):
                if cell not in ('', None):
                    ws.cell(row=row_index, column=column_index).value = cell
    return (wb, dialect)
项目:awslogin    作者:byu-oit    | 项目源码 | 文件源码
def write_to_config_file(profile, net_id, region, role, account):
    if profile != 'default':
        profile = 'profile {}'.format(profile)
    file = _aws_file('config')
    one_hour = datetime.timedelta(hours=1)
    expires = + one_hour
    config = _open_config_file(file)
    if not net_id and config.get(profile, 'adfs_netid', fallback=None):
        net_id = config[profile]['adfs_netid']
    config[profile] = {
        'region': region,
        'adfs_role': "{}@{}".format(role, account),
        'adfs_expires': expires.strftime('%m-%d-%Y %H:%M')
    if net_id:
        config[profile]['adfs_netid'] = net_id
    with open(file, 'w') as configfile:
项目:buildstrap    作者:guyzmo    | 项目源码 | 文件源码
def mocked_open(self, target):
        import builtins
        import buildstrap.buildstrap

        buf = StringIO()
        orig_open =

        def mock_open(fname, *args, **kwarg):
            if fname == target:
                # mimic open() by sending buffer as a generator
                return buf
                return orig_open(fname, *args, **kwarg) = mock_open
        yield buf = orig_open
项目:skidl    作者:xesscorp    | 项目源码 | 文件源码
def load_backup_lib():
    """Load a backup library that stores the parts used in the circuit."""

    global backup_lib

    # Don't keep reloading the backup library once it's loaded.
    if not backup_lib:
            # The backup library is a SKiDL lib stored as a Python module.
            # Copy the backup library in the local storage to the global storage.
            backup_lib = locals()[BACKUP_LIB_NAME]

        except (FileNotFoundError, ImportError, NameError, IOError) as e:

    return backup_lib
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __init__(self, name, mode):
        """Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

        :param name: path handed to ``os.open``.
        :param mode: ``"r"`` (read-only) or ``"w"`` (write-only, create,
            truncate); any other value raises ``KeyError``.
        """
        # Translate the one-letter mode into os.open() flags.
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        # O_BINARY is only defined on some platforms; add it when present.
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
        """Open bzip2 compressed tar archive name for reading or writing.
           Appending is not allowed.

           Raises ValueError for any mode other than 'r' or 'w',
           CompressionError when the bz2 module cannot be imported, and
           ReadError when the data cannot be read as a bzip2 tar archive.
        """
        if len(mode) > 1 or mode not in "rw":
            raise ValueError("mode must be 'r' or 'w'.")

        try:
            import bz2
        except ImportError:
            raise CompressionError("bz2 module is not available")

        if fileobj is not None:
            # Wrap the caller's file object so it is (de)compressed on the fly.
            fileobj = _BZ2Proxy(fileobj, mode)
        else:
            fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

        try:
            t = cls.taropen(name, mode, fileobj, **kwargs)
        except (IOError, EOFError):
            # Do not leak the compressed stream when the archive is unreadable.
            fileobj.close()
            raise ReadError("not a bzip2 file")
        # The file object was created here, so the TarFile owns and closes it.
        t._extfileobj = False
        return t

    # All *open() methods are registered here.
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _check(self, mode=None):
        """Check if TarFile is still open, and if the operation's mode
           corresponds to TarFile's mode.

           mode, when given, is a string of allowed mode characters;
           IOError is raised if the archive is closed or self.mode is not
           contained in it.
        """
        if self.closed:
            raise IOError("%s is closed" % self.__class__.__name__)
        if mode is not None and self.mode not in mode:
            raise IOError("bad operation for mode %r" % self.mode)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def is_tarfile(name):
    """Return True if name points to a tar archive that we
       are able to handle, else return False.
    """
    try:
        # ``open`` here is this module's archive opener, not the builtin.
        t = open(name)
        # Only the probe matters; release the archive immediately.
        t.close()
        return True
    except TarError:
        return False
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __getattr__(self, name,
                getattr=getattr):

        """ Inherit all other methods from the underlying stream.
        """
        # ``getattr`` is bound as a default argument so the lookup uses a
        # local name on every call.
        return getattr(self.stream, name)

    # these are needed to make "with" work properly
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def open(filename, mode='r', encoding=None, errors='strict', buffering=1):

    """ Open an encoded file using the given mode and return
        a wrapped version providing transparent encoding/decoding.

        Note: The wrapped version will only accept the object format
        defined by the codecs, i.e. Unicode objects for most builtin
        codecs. Output is also codec dependent and will usually be
        Unicode as well.

        Underlying encoded files are always opened in binary mode.
        The default file mode is 'r', meaning to open the file in read mode.

        encoding specifies the encoding which is to be used for the
        file.

        errors may be given to define the error handling. It defaults
        to 'strict' which causes ValueErrors to be raised in case an
        encoding error occurs.

        buffering has the same meaning as for the builtin open() API.
        It defaults to line buffered.

        The returned wrapped file object provides an extra attribute
        .encoding which allows querying the used encoding. This
        attribute is only available if an encoding was specified as
        parameter.

    """
    if encoding is not None and \
       'b' not in mode:
        # Force opening of the file in binary mode
        mode = mode + 'b'
    # ``open`` is shadowed by this function, so use the builtins module.
    file = builtins.open(filename, mode, buffering)
    if encoding is None:
        return file
    try:
        info = lookup(encoding)
        srw = StreamReaderWriter(file, info.streamreader, info.streamwriter, errors)
        # Add attributes to simplify introspection
        srw.encoding = encoding
        return srw
    except BaseException:
        # Do not leak the already-opened file when the codec is unusable.
        file.close()
        raise
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def open(filename):
    """Open a file in read only mode using the encoding detected by
    detect_encoding().

    Returns a line-buffered TextIOWrapper over the binary file, positioned
    at the start, with its ``mode`` attribute set to 'r'.
    """
    buffer = _builtin_open(filename, 'rb')
    try:
        encoding, lines = detect_encoding(buffer.readline)
        # detect_encoding consumed up to two lines; rewind before wrapping.
        buffer.seek(0)
        text = TextIOWrapper(buffer, encoding, line_buffering=True)
        text.mode = 'r'
        return text
    except BaseException:
        # Close the raw file on any failure, then let the error propagate.
        buffer.close()
        raise
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def __init__(self, name, mode):
        """Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

        :param name: path handed to ``os.open``.
        :param mode: ``"r"`` (read-only) or ``"w"`` (write-only, create,
            truncate); any other value raises ``KeyError``.
        """
        # Translate the one-letter mode into os.open() flags.
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        # O_BINARY is only defined on some platforms; add it when present.
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
        """Open bzip2 compressed tar archive name for reading or writing.
           Appending is not allowed.

           Raises ValueError for any mode other than 'r' or 'w',
           CompressionError when the bz2 module cannot be imported, and
           ReadError when the data cannot be read as a bzip2 tar archive.
        """
        if len(mode) > 1 or mode not in "rw":
            raise ValueError("mode must be 'r' or 'w'.")

        try:
            import bz2
        except ImportError:
            raise CompressionError("bz2 module is not available")

        if fileobj is not None:
            # Wrap the caller's file object so it is (de)compressed on the fly.
            fileobj = _BZ2Proxy(fileobj, mode)
        else:
            fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

        try:
            t = cls.taropen(name, mode, fileobj, **kwargs)
        except (IOError, EOFError):
            # Do not leak the compressed stream when the archive is unreadable.
            fileobj.close()
            raise ReadError("not a bzip2 file")
        # The file object was created here, so the TarFile owns and closes it.
        t._extfileobj = False
        return t

    # All *open() methods are registered here.
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _check(self, mode=None):
        """Check if TarFile is still open, and if the operation's mode
           corresponds to TarFile's mode.

           mode, when given, is a string of allowed mode characters;
           IOError is raised if the archive is closed or self.mode is not
           contained in it.
        """
        if self.closed:
            raise IOError("%s is closed" % self.__class__.__name__)
        if mode is not None and self.mode not in mode:
            raise IOError("bad operation for mode %r" % self.mode)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def is_tarfile(name):
    """Return True if name points to a tar archive that we
       are able to handle, else return False.
    """
    try:
        # ``open`` here is this module's archive opener, not the builtin.
        t = open(name)
        # Only the probe matters; release the archive immediately.
        t.close()
        return True
    except TarError:
        return False
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def open(fp, mode="r"):
    """
    Load texture from a GD image file.

    :param fp: GD file name, or an opened file handle.
    :param mode: Optional mode.  In this version, if the mode argument
        is given, it must be "r".
    :returns: An image instance.
    :raises ValueError: If ``mode`` is anything other than "r".
    :raises IOError: If the image could not be read.
    """
    if mode != "r":
        raise ValueError("bad mode")

    if isPath(fp):
        filename = fp
        # ``open`` is shadowed by this function, so use the builtins module.
        fp = builtins.open(fp, "rb")
    else:
        # Already a file handle: there is no file name to record.
        filename = ""

    try:
        return GdImageFile(fp, filename)
    except SyntaxError:
        raise IOError("cannot identify this image file")
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def frombytes(mode, size, data, decoder_name="raw", *args):
    """
    Creates a copy of an image memory from pixel data in a buffer.

    In its simplest form, this function takes three arguments
    (mode, size, and unpacked pixel data).

    You can also use any pixel decoder supported by PIL.  For more
    information on available decoders, see the section
    :ref:`Writing Your Own File Decoder <file-decoders>`.

    Note that this function decodes pixel data only, not entire images.
    If you have an entire image in a string, wrap it in a
    :py:class:`~io.BytesIO` object, and use :py:func:`~PIL.Image.open` to load
    it.

    :param mode: The image mode. See: :ref:`concept-modes`.
    :param size: The image size.
    :param data: A byte buffer containing raw data for the given mode.
    :param decoder_name: What decoder to use.
    :param args: Additional parameters for the given decoder.
    :returns: An :py:class:`~PIL.Image.Image` object.
    """

    # may pass tuple instead of argument list
    if len(args) == 1 and isinstance(args[0], tuple):
        args = args[0]

    # The raw decoder with no explicit arguments is given the image mode
    # as its argument.
    if decoder_name == "raw" and args == ():
        args = mode

    im = new(mode, size)
    im.frombytes(data, decoder_name, args)
    return im
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def frombytes(mode, size, data, decoder_name="raw", *args):
    """
    Creates a copy of an image memory from pixel data in a buffer.

    In its simplest form, this function takes three arguments
    (mode, size, and unpacked pixel data).

    You can also use any pixel decoder supported by PIL.  For more
    information on available decoders, see the section
    :ref:`Writing Your Own File Decoder <file-decoders>`.

    Note that this function decodes pixel data only, not entire images.
    If you have an entire image in a string, wrap it in a
    :py:class:`~io.BytesIO` object, and use :py:func:`~PIL.Image.open` to load
    it.

    :param mode: The image mode. See: :ref:`concept-modes`.
    :param size: The image size.
    :param data: A byte buffer containing raw data for the given mode.
    :param decoder_name: What decoder to use.
    :param args: Additional parameters for the given decoder.
    :returns: An :py:class:`~PIL.Image.Image` object.
    """

    # may pass tuple instead of argument list
    if len(args) == 1 and isinstance(args[0], tuple):
        args = args[0]

    # The raw decoder with no explicit arguments is given the image mode
    # as its argument.
    if decoder_name == "raw" and args == ():
        args = mode

    im = new(mode, size)
    im.frombytes(data, decoder_name, args)
    return im
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __init__(self, name, mode):
        """Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

        :param name: path handed to ``os.open``.
        :param mode: ``"r"`` (read-only) or ``"w"`` (write-only, create,
            truncate); any other value raises ``KeyError``.
        """
        # Translate the one-letter mode into os.open() flags.
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        # O_BINARY is only defined on some platforms; add it when present.
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
        """Open bzip2 compressed tar archive name for reading or writing.
           Appending is not allowed.

           Raises ValueError for any mode other than 'r' or 'w',
           CompressionError when the bz2 module cannot be imported, and
           ReadError when the data cannot be read as a bzip2 tar archive.
        """
        if len(mode) > 1 or mode not in "rw":
            raise ValueError("mode must be 'r' or 'w'.")

        try:
            import bz2
        except ImportError:
            raise CompressionError("bz2 module is not available")

        if fileobj is not None:
            # Wrap the caller's file object so it is (de)compressed on the fly.
            fileobj = _BZ2Proxy(fileobj, mode)
        else:
            fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

        try:
            t = cls.taropen(name, mode, fileobj, **kwargs)
        except (IOError, EOFError):
            # Do not leak the compressed stream when the archive is unreadable.
            fileobj.close()
            raise ReadError("not a bzip2 file")
        # The file object was created here, so the TarFile owns and closes it.
        t._extfileobj = False
        return t

    # All *open() methods are registered here.
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _check(self, mode=None):
        """Check if TarFile is still open, and if the operation's mode
           corresponds to TarFile's mode.

           mode, when given, is a string of allowed mode characters;
           IOError is raised if the archive is closed or self.mode is not
           contained in it.
        """
        if self.closed:
            raise IOError("%s is closed" % self.__class__.__name__)
        if mode is not None and self.mode not in mode:
            raise IOError("bad operation for mode %r" % self.mode)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def is_tarfile(name):
    """Return True if name points to a tar archive that we
       are able to handle, else return False.
    """
    try:
        # ``open`` here is this module's archive opener, not the builtin.
        t = open(name)
        # Only the probe matters; release the archive immediately.
        t.close()
        return True
    except TarError:
        return False
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def __init__(self, name, mode):
        """Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

        :param name: path handed to ``os.open``.
        :param mode: ``"r"`` (read-only) or ``"w"`` (write-only, create,
            truncate); any other value raises ``KeyError``.
        """
        # Translate the one-letter mode into os.open() flags.
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        # O_BINARY is only defined on some platforms; add it when present.
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
        """Open bzip2 compressed tar archive name for reading or writing.
           Appending is not allowed.

           Raises ValueError for any mode other than 'r' or 'w',
           CompressionError when the bz2 module cannot be imported, and
           ReadError when the data cannot be read as a bzip2 tar archive.
        """
        if len(mode) > 1 or mode not in "rw":
            raise ValueError("mode must be 'r' or 'w'.")

        try:
            import bz2
        except ImportError:
            raise CompressionError("bz2 module is not available")

        if fileobj is not None:
            # Wrap the caller's file object so it is (de)compressed on the fly.
            fileobj = _BZ2Proxy(fileobj, mode)
        else:
            fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

        try:
            t = cls.taropen(name, mode, fileobj, **kwargs)
        except (IOError, EOFError):
            # Do not leak the compressed stream when the archive is unreadable.
            fileobj.close()
            raise ReadError("not a bzip2 file")
        # The file object was created here, so the TarFile owns and closes it.
        t._extfileobj = False
        return t

    # All *open() methods are registered here.
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def _check(self, mode=None):
        """Check if TarFile is still open, and if the operation's mode
           corresponds to TarFile's mode.

           mode, when given, is a string of allowed mode characters;
           IOError is raised if the archive is closed or self.mode is not
           contained in it.
        """
        if self.closed:
            raise IOError("%s is closed" % self.__class__.__name__)
        if mode is not None and self.mode not in mode:
            raise IOError("bad operation for mode %r" % self.mode)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def is_tarfile(name):
    """Return True if name points to a tar archive that we
       are able to handle, else return False.
    """
    try:
        # ``open`` here is this module's archive opener, not the builtin.
        t = open(name)
        # Only the probe matters; release the archive immediately.
        t.close()
        return True
    except TarError:
        return False
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def __init__(self, name, mode):
        """Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

        :param name: path handed to ``os.open``.
        :param mode: ``"r"`` (read-only) or ``"w"`` (write-only, create,
            truncate); any other value raises ``KeyError``.
        """
        # Translate the one-letter mode into os.open() flags.
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        # O_BINARY is only defined on some platforms; add it when present.
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
        """Open bzip2 compressed tar archive name for reading or writing.
           Appending is not allowed.

           Raises ValueError for any mode other than 'r' or 'w',
           CompressionError when the bz2 module cannot be imported, and
           ReadError when the data cannot be read as a bzip2 tar archive.
        """
        if len(mode) > 1 or mode not in "rw":
            raise ValueError("mode must be 'r' or 'w'.")

        try:
            import bz2
        except ImportError:
            raise CompressionError("bz2 module is not available")

        if fileobj is not None:
            # Wrap the caller's file object so it is (de)compressed on the fly.
            fileobj = _BZ2Proxy(fileobj, mode)
        else:
            fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

        try:
            t = cls.taropen(name, mode, fileobj, **kwargs)
        except (IOError, EOFError):
            # Do not leak the compressed stream when the archive is unreadable.
            fileobj.close()
            raise ReadError("not a bzip2 file")
        # The file object was created here, so the TarFile owns and closes it.
        t._extfileobj = False
        return t

    # All *open() methods are registered here.
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _check(self, mode=None):
        """Check if TarFile is still open, and if the operation's mode
           corresponds to TarFile's mode.

           mode, when given, is a string of allowed mode characters;
           IOError is raised if the archive is closed or self.mode is not
           contained in it.
        """
        if self.closed:
            raise IOError("%s is closed" % self.__class__.__name__)
        if mode is not None and self.mode not in mode:
            raise IOError("bad operation for mode %r" % self.mode)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def is_tarfile(name):
    """Return True if name points to a tar archive that we
       are able to handle, else return False.
    """
    try:
        # ``open`` here is this module's archive opener, not the builtin.
        t = open(name)
        # Only the probe matters; release the archive immediately.
        t.close()
        return True
    except TarError:
        return False
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, name, mode):
        """Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

        :param name: path handed to ``os.open``.
        :param mode: ``"r"`` (read-only) or ``"w"`` (write-only, create,
            truncate); any other value raises ``KeyError``.
        """
        # Translate the one-letter mode into os.open() flags.
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        # O_BINARY is only defined on some platforms; add it when present.
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
        """Open bzip2 compressed tar archive name for reading or writing.
           Appending is not allowed.

           Raises ValueError for any mode other than 'r' or 'w',
           CompressionError when the bz2 module cannot be imported, and
           ReadError when the data cannot be read as a bzip2 tar archive.
        """
        if len(mode) > 1 or mode not in "rw":
            raise ValueError("mode must be 'r' or 'w'.")

        try:
            import bz2
        except ImportError:
            raise CompressionError("bz2 module is not available")

        if fileobj is not None:
            # Wrap the caller's file object so it is (de)compressed on the fly.
            fileobj = _BZ2Proxy(fileobj, mode)
        else:
            fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

        try:
            t = cls.taropen(name, mode, fileobj, **kwargs)
        except (IOError, EOFError):
            # Do not leak the compressed stream when the archive is unreadable.
            fileobj.close()
            raise ReadError("not a bzip2 file")
        # The file object was created here, so the TarFile owns and closes it.
        t._extfileobj = False
        return t

    # All *open() methods are registered here.
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _check(self, mode=None):
        """Check if TarFile is still open, and if the operation's mode
           corresponds to TarFile's mode.

           mode, when given, is a string of allowed mode characters;
           IOError is raised if the archive is closed or self.mode is not
           contained in it.
        """
        if self.closed:
            raise IOError("%s is closed" % self.__class__.__name__)
        if mode is not None and self.mode not in mode:
            raise IOError("bad operation for mode %r" % self.mode)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def is_tarfile(name):
    """Return True if name points to a tar archive that we
       are able to handle, else return False.
    """
    try:
        # ``open`` here is this module's archive opener, not the builtin.
        t = open(name)
        # Only the probe matters; release the archive immediately.
        t.close()
        return True
    except TarError:
        return False
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, name, mode):
        """Open ``name`` with ``os.open`` and keep the descriptor in ``self.fd``.

        :param name: path handed to ``os.open``.
        :param mode: ``"r"`` (read-only) or ``"w"`` (write-only, create,
            truncate); any other value raises ``KeyError``.
        """
        # Translate the one-letter mode into os.open() flags.
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        # O_BINARY is only defined on some platforms; add it when present.
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _check(self, mode=None):
        """Check if TarFile is still open, and if the operation's mode
           corresponds to TarFile's mode.

           mode, when given, is a string of allowed mode characters;
           IOError is raised if the archive is closed or self.mode is not
           contained in it.
        """
        if self.closed:
            raise IOError("%s is closed" % self.__class__.__name__)
        if mode is not None and self.mode not in mode:
            raise IOError("bad operation for mode %r" % self.mode)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def is_tarfile(name):
    """Return True if name points to a tar archive that we
       are able to handle, else return False.
    """
    try:
        # ``open`` here is this module's archive opener, not the builtin.
        t = open(name)
        # Only the probe matters; release the archive immediately.
        t.close()
        return True
    except TarError:
        return False