我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用os.O_EXCL。
def write_pid_to_pidfile(pidfile_path):
    """Write the PID in the named PID file.

    Get the numeric process ID ("PID") of the current process
    and write it to the named file as a line of text.

    The file is created with ``O_CREAT | O_EXCL``, so the call fails
    instead of overwriting a PID file that already exists.

    :param pidfile_path: filesystem path of the PID file to create.
    :raises OSError: if the file already exists or cannot be created.
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    pid = os.getpid()

    # The context manager closes the descriptor even when the write fails,
    # so an I/O error (e.g. a full disk) no longer leaks it.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % pid)
def do_magic(self):
    """Try to take the single-instance lock at ``LOCK_PATH``.

    Sets ``self.is_running`` to ``True`` when the lock cannot be taken in
    the way each platform branch recognises; any other error propagates.
    The handle or descriptor is kept on ``self.fh``.
    """
    if not OS_WIN:
        # Non-Windows: open the lock file and request a non-blocking
        # exclusive lock on it.
        try:
            self.fh = open(LOCK_PATH, 'w')
            fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except EnvironmentError:
            # Without a handle on ``self.fh`` the failure is not treated as
            # "lock held elsewhere", so it is re-raised.
            if self.fh is None:
                raise
            self.is_running = True
        return

    # Windows: drop any existing lock file, then create it exclusively.
    try:
        if os.path.exists(LOCK_PATH):
            os.unlink(LOCK_PATH)
        self.fh = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_RDWR)
    except EnvironmentError as err:
        # errno 13 is EACCES; presumably the file is held open by another
        # instance - confirm against the caller's expectations.
        if err.errno != 13:
            raise
        self.is_running = True
def createTempFile(self):
    """Create a freshly named file under the mailbox's ``tmp`` directory.

    A candidate name from ``_generateMaildirName()`` is joined onto
    ``self.mbox.path``/``tmp`` and opened through ``self.osopen`` with
    exclusive-create flags. On success the value returned by
    ``self.osopen`` is stored on ``self.fh`` and the chosen path on
    ``self.tmpname``.

    Every ``OSError`` triggers a retry with a new name. After more than 500
    failures ``self.defer.errback`` is called with a ``RuntimeError`` and
    ``self.defer`` is reset to ``None``.

    :return: always ``None``; results are reported through attributes.
    """
    attr = (os.O_RDWR | os.O_CREAT | os.O_EXCL
            | getattr(os, "O_NOINHERIT", 0)
            | getattr(os, "O_NOFOLLOW", 0))
    tries = 0
    self.fh = -1
    while True:
        self.tmpname = os.path.join(self.mbox.path, "tmp", _generateMaildirName())
        try:
            # ``0o600`` replaces the Python 2-only literal ``0600``, which is
            # a SyntaxError on Python 3; the value (owner read/write) is the
            # same and the spelling is accepted from Python 2.6 onwards.
            self.fh = self.osopen(self.tmpname, attr, 0o600)
            return None
        except OSError:
            tries += 1
            if tries > 500:
                self.defer.errback(RuntimeError("Could not create tmp file for %s" % self.mbox.path))
                self.defer = None
                return None
def create_file(self, name, excl=False, mode="wb", **kwargs):
    """Creates a file with the given name in this storage.

    :param name: the name for the new file.
    :param excl: if True, try to open the file in "exclusive" mode, i.e.
        fail with ``OSError`` when the file already exists.
    :param mode: the mode flags with which to open the file. The default is
        ``"wb"``.
    :param kwargs: passed through unchanged to ``StructFile``.
    :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
    :raises ReadOnlyError: if this storage is read-only.
    """
    if self.readonly:
        raise ReadOnlyError

    path = self._fpath(name)
    if excl:
        flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
        # O_BINARY is not defined on every platform; contribute 0 where it
        # is missing.
        flags |= getattr(os, "O_BINARY", 0)
        # ``os.open`` defaults to mode 0o777, which would make exclusively
        # created files executable. Pass 0o666 (still subject to the umask)
        # so both branches create files with the same permissions as the
        # builtin ``open()``.
        fd = os.open(path, flags, 0o666)
        try:
            fileobj = os.fdopen(fd, mode)
        except (TypeError, ValueError):
            # A rejected ``mode`` is reported before the descriptor is
            # wrapped, so it is still ours to close.
            os.close(fd)
            raise
    else:
        fileobj = open(path, mode)

    f = StructFile(fileobj, name=name, **kwargs)
    return f
def _convert_pflags(self, pflags):
    """Convert SFTP-style open() flags to python's os.open() flags.

    :param pflags: bitmask built from the ``SFTP_FLAG_*`` constants.
    :return: the equivalent bitmask of ``os.O_*`` constants.
    """
    wants_read = pflags & SFTP_FLAG_READ
    wants_write = pflags & SFTP_FLAG_WRITE

    # Access mode: exactly one of O_RDWR / O_WRONLY / O_RDONLY.
    if wants_read and wants_write:
        flags = os.O_RDWR
    elif wants_write:
        flags = os.O_WRONLY
    else:
        flags = os.O_RDONLY

    # Modifier bits map one-to-one onto their os.open() counterparts.
    modifier_map = (
        (SFTP_FLAG_APPEND, os.O_APPEND),
        (SFTP_FLAG_CREATE, os.O_CREAT),
        (SFTP_FLAG_TRUNC, os.O_TRUNC),
        (SFTP_FLAG_EXCL, os.O_EXCL),
    )
    for sftp_bit, os_bit in modifier_map:
        if pflags & sftp_bit:
            flags |= os_bit
    return flags
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    The file is created with ``O_CREAT | O_EXCL | O_RDWR``, so the call
    fails rather than opening a path that already exists.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.
    :return: the file object wrapping the newly created file.
    :raises OSError: if ``path`` already exists or cannot be created.

    """
    # pylint: disable=star-args
    open_args = () if chmod is None else (chmod,)
    fdopen_args = () if buffering is None else (buffering,)
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args)
    try:
        return os.fdopen(fd, mode, *fdopen_args)
    except Exception:
        # ``os.fdopen`` can reject ``mode``/``buffering`` after the file has
        # been created; close the raw descriptor so it does not leak.
        try:
            os.close(fd)
        except OSError:
            # fdopen's own cleanup may already have closed it.
            pass
        raise
def touch(self, mode=0o666, exist_ok=True):
    """
    Create this file with the given access mode, if it doesn't exist.

    With ``exist_ok`` true, the timestamps are bumped first and the file is
    only created when that fails with ``OSError``. With ``exist_ok`` false,
    the file is created exclusively, so an existing file is an error.
    """
    if self._closed:
        self._raise_closed()

    creation_flags = os.O_CREAT | os.O_WRONLY
    if exist_ok:
        # Implementation note: GNU touch uses the UTIME_NOW option of
        # the utimensat() / futimens() functions.
        try:
            self._accessor.utime(self, None)
        except OSError:
            # Fall through and create the file outside this handler, which
            # avoids exception chaining if the creation fails as well.
            pass
        else:
            return
    else:
        creation_flags |= os.O_EXCL

    os.close(self._raw_open(creation_flags, mode))
def _obtain_lock_or_raise(self):
    """Create a lock file as a flag for other instances and mark this
    instance as the lock holder.

    Does nothing when ``self._has_lock()`` already reports true.

    :raise IOError: if a lock was already present or a lock file could not
        be written"""
    if self._has_lock():
        return

    lock_file = self._lock_file_path()
    if osp.isfile(lock_file):
        message = "Lock for file %r did already exist, delete %r in case the lock is illegal" % (
            self._file_path, lock_file)
        raise IOError(message)

    open_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    if is_win:
        open_flags |= os.O_SHORT_LIVED

    try:
        # Only the existence of the file matters, so the descriptor is
        # closed again immediately.
        os.close(os.open(lock_file, open_flags, 0))
    except OSError as e:
        raise IOError(str(e))

    self._owns_lock = True
def lock(lockfile):
    """Return a decorator that guards a callable with an exclusive lock file.

    Each call of the decorated function first creates ``lockfile`` with
    ``O_CREAT | O_EXCL``. If that fails, ``BackupError`` is raised and the
    function is not run. Otherwise the function runs and the lock file is
    removed afterwards, whether the function returned or raised.

    :param lockfile: path of the lock file to create around each call.
    :return: a decorator preserving the wrapped callable's metadata.
    """
    import functools  # local import keeps this block self-contained

    def decorator(clbl):
        @functools.wraps(clbl)
        def wrapper(*args, **kwargs):
            try:
                # Create or fail
                lock_fd = os.open(lockfile, os.O_CREAT | os.O_EXCL)
            except OSError:
                raise BackupError(
                    "Another backup/restore process already running."
                    " If it is not, try to remove `{0}` and "
                    "try again.".format(lockfile))
            # Only the file's existence acts as the lock; close the
            # descriptor right away so repeated calls do not leak one each.
            os.close(lock_fd)
            try:
                result = clbl(*args, **kwargs)
            finally:
                os.unlink(lockfile)
            return result
        return wrapper
    return decorator
def lock(lockfile):
    """Return a decorator that guards a callable with an exclusive lock file.

    Each call of the decorated function first creates ``lockfile`` with
    ``O_CREAT | O_EXCL``. If that fails, ``BackupError`` is raised and the
    function is not run. Otherwise the function runs and the lock file is
    removed afterwards, whether the function returned or raised.

    :param lockfile: path of the lock file to create around each call.
    :return: a decorator preserving the wrapped callable's metadata.
    """
    import functools  # local import keeps this block self-contained

    def decorator(clbl):
        @functools.wraps(clbl)
        def wrapper(*args, **kwargs):
            try:
                # Create or fail
                lock_fd = os.open(lockfile, os.O_CREAT | os.O_EXCL)
            except OSError:
                raise BackupError("Another backup process already running."
                                  " If it is not, try to remove `{0}` and "
                                  "try again.".format(lockfile))
            # Only the file's existence acts as the lock; close the
            # descriptor right away so repeated calls do not leak one each.
            os.close(lock_fd)
            try:
                result = clbl(*args, **kwargs)
            finally:
                os.unlink(lockfile)
            return result
        return wrapper
    return decorator
def touch(self, mode=0o666, exist_ok=True):
    """
    Create this file with the given access mode, if it doesn't exist.

    With ``exist_ok`` true, the access and modification times are first set
    to the current time, and the file is only created when that fails with
    ``OSError``. With ``exist_ok`` false, the file is created exclusively,
    so an existing file is an error.
    """
    if not exist_ok:
        # Exclusive creation: fails if the file is already there.
        fd = self._raw_open(os.O_CREAT | os.O_WRONLY | os.O_EXCL, mode)
        os.close(fd)
        return

    # Implementation note: GNU touch uses the UTIME_NOW option of
    # the utimensat() / futimens() functions.
    now = time.time()
    try:
        self._accessor.utime(self, (now, now))
    except OSError:
        # Fall through and create the file outside this handler, which
        # avoids exception chaining if the creation fails as well.
        pass
    else:
        return

    os.close(self._raw_open(os.O_CREAT | os.O_WRONLY, mode))