Python os 模块,close() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.close()。
def main():
'''
Run code specifed by data received over pipe
'''
assert is_forking(sys.argv)
handle = int(sys.argv[-1])
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
from_parent = os.fdopen(fd, 'rb')
process.current_process()._inheriting = True
preparation_data = load(from_parent)
prepare(preparation_data)
self = load(from_parent)
process.current_process()._inheriting = False
from_parent.close()
exitcode = self._bootstrap()
exit(exitcode)
def _dump(self, file=None, format=None):
import tempfile
suffix = ''
if format:
suffix = '.'+format
if not file:
f, file = tempfile.mkstemp(suffix)
os.close(f)
self.load()
if not format or format == "PPM":
self.im.save_ppm(file)
else:
if not file.endswith(format):
file = file + "." + format
self.save(file, format)
return file
def close(self):
"""It is advised to call 'close' on the pipe so both
handles of pipe are closed.
"""
if isinstance(self.stdin, AsyncFile):
self.stdin.close()
self.stdin = None
if self.stdin_rh:
win32pipe.DisconnectNamedPipe(self.stdin_rh)
win32file.CloseHandle(self.stdin_rh)
self.stdin_rh = None
if isinstance(self.stdout, AsyncFile):
self.stdout.close()
self.stdout = None
if isinstance(self.stderr, AsyncFile):
self.stderr.close()
self.stderr = None
def _socketpair():
if hasattr(socket, 'socketpair'):
return socket.socketpair()
srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv_sock.bind(('127.0.0.1', 0))
srv_sock.listen(1)
write_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
write_sock.setblocking(False)
try:
write_sock.connect(srv_sock.getsockname()[:2])
except socket.error as e:
if e.args[0] in (EINPROGRESS, EWOULDBLOCK):
pass
else:
raise
write_sock.setblocking(True)
read_sock = srv_sock.accept()[0]
except:
write_sock.close()
raise
finally:
srv_sock.close()
return (read_sock, write_sock)
def terminate(self):
if hasattr(self.cmd_write, 'getsockname'):
self.cmd_write.close()
self.cmd_read.close()
for fd in self._fds.itervalues():
try:
self._poller.unregister(fd._fileno)
except:
logger.warning('unregister of %s failed with %s',
fd._fileno, traceback.format_exc())
fd._notifier = None
self._fds.clear()
self._timeouts = []
if hasattr(self._poller, 'terminate'):
self._poller.terminate()
self._poller = None
self.cmd_read = self.cmd_write = None
def _terminate_task(self, task):
"""Internal use only.
"""
self._lock.acquire()
tid = task._id
task = self._tasks.get(tid, None)
if task is None:
logger.warning('invalid task %s to terminate', tid)
self._lock.release()
return -1
# TODO: if currently waiting I/O or holding locks, warn?
if task._state == Pycos._Running:
logger.warning('task to terminate %s/%s is running', task._name, tid)
else:
self._suspended.discard(tid)
self._scheduled.add(tid)
task._state = Pycos._Scheduled
task._timeout = None
task._callers = []
if self._polling and len(self._scheduled) == 1:
self._notifier.interrupt()
task._exceptions.append((GeneratorExit, GeneratorExit('close')))
self._lock.release()
return 0
def close(self):
"""It is advised to call 'close' on the pipe so both
handles of pipe are closed.
"""
if isinstance(self.stdin, AsyncFile):
self.stdin.close()
self.stdin = None
if self.stdin_rh:
win32pipe.DisconnectNamedPipe(self.stdin_rh)
win32file.CloseHandle(self.stdin_rh)
self.stdin_rh = None
if isinstance(self.stdout, AsyncFile):
self.stdout.close()
self.stdout = None
if isinstance(self.stderr, AsyncFile):
self.stderr.close()
self.stderr = None
def _socketpair():
if hasattr(socket, 'socketpair'):
return socket.socketpair()
srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv_sock.bind(('127.0.0.1', 0))
srv_sock.listen(1)
write_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
write_sock.setblocking(False)
try:
write_sock.connect(srv_sock.getsockname()[:2])
except socket.error as e:
if e.args[0] in (EINPROGRESS, EWOULDBLOCK):
pass
else:
raise
write_sock.setblocking(True)
read_sock = srv_sock.accept()[0]
except:
write_sock.close()
raise
finally:
srv_sock.close()
return (read_sock, write_sock)
def terminate(self):
if hasattr(self.cmd_write, 'getsockname'):
self.cmd_write.close()
self.cmd_read.close()
for fd in self._fds.values():
try:
self._poller.unregister(fd._fileno)
except:
logger.warning('unregister of %s failed with %s',
fd._fileno, traceback.format_exc())
fd._notifier = None
self._fds.clear()
self._timeouts = []
if hasattr(self._poller, 'terminate'):
self._poller.terminate()
self._poller = None
self.cmd_read = self.cmd_write = None
def _terminate_task(self, task):
"""Internal use only.
"""
self._lock.acquire()
tid = task._id
task = self._tasks.get(tid, None)
if task is None:
logger.warning('invalid task %s to terminate', tid)
self._lock.release()
return -1
# TODO: if currently waiting I/O or holding locks, warn?
if task._state == Pycos._Running:
logger.warning('task to terminate %s/%s is running', task._name, tid)
else:
self._suspended.discard(tid)
self._scheduled.add(tid)
task._state = Pycos._Scheduled
task._timeout = None
task._callers = []
if self._polling and len(self._scheduled) == 1:
self._notifier.interrupt()
task._exceptions.append((GeneratorExit, GeneratorExit('close')))
self._lock.release()
return 0
def close(self):
"""Close the TarFile. In write-mode, two finishing zero blocks are
appended to the archive.
"""
if self.closed:
return
if self.mode in "aw":
self.fileobj.write(NUL * (BLOCKSIZE * 2))
self.offset += (BLOCKSIZE * 2)
# fill up the end with zero-blocks
# (like option -b20 for tar does)
blocks, remainder = divmod(self.offset, RECORDSIZE)
if remainder > 0:
self.fileobj.write(NUL * (RECORDSIZE - remainder))
if not self._extfileobj:
self.fileobj.close()
self.closed = True
def close(self):
"""Close the _Stream object. No operation should be
done on it afterwards.
"""
if self.closed:
return
self.closed = True
try:
if self.mode == "w" and self.comptype != "tar":
self.buf += self.cmp.flush()
if self.mode == "w" and self.buf:
self.fileobj.write(self.buf)
self.buf = b""
if self.comptype == "gz":
self.fileobj.write(struct.pack("<L", self.crc))
self.fileobj.write(struct.pack("<L", self.pos & 0xffffFFFF))
finally:
if not self._extfileobj:
self.fileobj.close()
def close(self):
"""Close the TarFile. In write-mode, two finishing zero blocks are
appended to the archive.
"""
if self.closed:
return
self.closed = True
try:
if self.mode in ("a", "w", "x"):
self.fileobj.write(NUL * (BLOCKSIZE * 2))
self.offset += (BLOCKSIZE * 2)
# fill up the end with zero-blocks
# (like option -b20 for tar does)
blocks, remainder = divmod(self.offset, RECORDSIZE)
if remainder > 0:
self.fileobj.write(NUL * (RECORDSIZE - remainder))
finally:
if not self._extfileobj:
self.fileobj.close()
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
"""Upload copies files to non-native store correctly with no
progress"""
import shutil
import tempfile
temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
temp_src_dir = os.path.dirname(temp_file.name)
temp_dst_dir = tempfile.mkdtemp()
shutil_mock = MagicMock()
shutil_mock.copyfile.return_value = None
with patch('sfctl.custom_app.shutil', new=shutil_mock):
sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
shutil_mock.copyfile.assert_called_once()
temp_file.close()
shutil.rmtree(os.path.dirname(temp_file.name))
shutil.rmtree(temp_dst_dir)
def close(self):
"""Close the TarFile. In write-mode, two finishing zero blocks are
appended to the archive.
"""
if self.closed:
return
if self.mode in "aw":
self.fileobj.write(NUL * (BLOCKSIZE * 2))
self.offset += (BLOCKSIZE * 2)
# fill up the end with zero-blocks
# (like option -b20 for tar does)
blocks, remainder = divmod(self.offset, RECORDSIZE)
if remainder > 0:
self.fileobj.write(NUL * (RECORDSIZE - remainder))
if not self._extfileobj:
self.fileobj.close()
self.closed = True
def close(self):
"""Close the TarFile. In write-mode, two finishing zero blocks are
appended to the archive.
"""
if self.closed:
return
if self.mode in "aw":
self.fileobj.write(NUL * (BLOCKSIZE * 2))
self.offset += (BLOCKSIZE * 2)
# fill up the end with zero-blocks
# (like option -b20 for tar does)
blocks, remainder = divmod(self.offset, RECORDSIZE)
if remainder > 0:
self.fileobj.write(NUL * (RECORDSIZE - remainder))
if not self._extfileobj:
self.fileobj.close()
self.closed = True
def __init__(self, cmd, bufsize=-1):
_cleanup()
self.cmd = cmd
p2cread, p2cwrite = os.pipe()
c2pread, c2pwrite = os.pipe()
self.pid = os.fork()
if self.pid == 0:
# Child
os.dup2(p2cread, 0)
os.dup2(c2pwrite, 1)
os.dup2(c2pwrite, 2)
self._run_child(cmd)
os.close(p2cread)
self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
os.close(c2pwrite)
self.fromchild = os.fdopen(c2pread, 'r', bufsize)
def importfile(path):
"""Import a Python source file or compiled file given its path."""
magic = imp.get_magic()
file = open(path, 'r')
if file.read(len(magic)) == magic:
kind = imp.PY_COMPILED
else:
kind = imp.PY_SOURCE
file.close()
filename = os.path.basename(path)
name, ext = os.path.splitext(filename)
file = open(path, 'r')
try:
module = imp.load_module(name, file, path, (ext, 'r', kind))
except:
raise ErrorDuringImport(path, sys.exc_info())
file.close()
return module
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
subpath = self._lookup(key)
f = open(os.path.join(self._path, subpath), 'r')
try:
if self._factory:
msg = self._factory(f)
else:
msg = MaildirMessage(f)
finally:
f.close()
subdir, name = os.path.split(subpath)
msg.set_subdir(subdir)
if self.colon in name:
msg.set_info(name.split(self.colon)[-1])
msg.set_date(os.path.getmtime(os.path.join(self._path, subpath)))
return msg
def __setitem__(self, key, message):
"""Replace the keyed message; raise KeyError if it doesn't exist."""
path = os.path.join(self._path, str(key))
try:
f = open(path, 'rb+')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
else:
raise
try:
if self._locked:
_lock_file(f)
try:
os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
self._dump_message(message, f)
if isinstance(message, MHMessage):
self._dump_sequences(message, key)
finally:
if self._locked:
_unlock_file(f)
finally:
_sync_close(f)
def get_string(self, key):
"""Return a string representation or raise a KeyError."""
try:
if self._locked:
f = open(os.path.join(self._path, str(key)), 'r+')
else:
f = open(os.path.join(self._path, str(key)), 'r')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
else:
raise
try:
if self._locked:
_lock_file(f)
try:
return f.read()
finally:
if self._locked:
_unlock_file(f)
finally:
f.close()
def _get_soname(f):
# assuming GNU binutils / ELF
if not f:
return None
cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \
"objdump -p -j .dynamic 2>/dev/null " + f
f = os.popen(cmd)
dump = f.read()
rv = f.close()
if rv == 10:
raise OSError, 'objdump command not found'
f = os.popen(cmd)
try:
data = f.read()
finally:
f.close()
res = re.search(r'\sSONAME\s+([^\s]+)', data)
if not res:
return None
return res.group(1)
def _findLib_ldconfig(name):
# XXX assuming GLIBC's ldconfig (with option -p)
expr = r'/[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
f = os.popen('LC_ALL=C LANG=C /sbin/ldconfig -p 2>/dev/null')
try:
data = f.read()
finally:
f.close()
res = re.search(expr, data)
if not res:
# Hm, this works only for libs needed by the python executable.
cmd = 'ldd %s 2>/dev/null' % sys.executable
f = os.popen(cmd)
try:
data = f.read()
finally:
f.close()
res = re.search(expr, data)
if not res:
return None
return res.group(0)
def run_with_args(args, parser):
# type: (argparse.Namespace, argparse.ArgumentParser) -> int
set_logging_parameters(args, parser)
start_time = time.time()
ret = OK
try:
if args.profile:
outline("Profiling...")
profile("ret = whatstyle(args, parser)", locals(), globals())
else:
ret = whatstyle(args, parser)
except IOError as exc:
# If the output is piped into a pager like 'less' we get a broken pipe when
# the pager is quit early and that is ok.
if exc.errno == errno.EPIPE:
pass
elif str(exc) == 'Stream closed':
pass
else:
raise
if not PY2:
sys.stderr.close()
iprint(INFO_TIME, 'Run time: %s seconds' % (time.time() - start_time))
return ret
def get_terminal_size():
def ioctl_GWINSZ(fd):
try:
import fcntl
import termios
import struct
cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
'1234'))
except:
return None
return cr
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except:
pass
if not cr:
try:
cr = (os.env['LINES'], os.env['COLUMNS'])
except:
cr = (25, 80)
return int(cr[1]), int(cr[0])
def grab(bbox=None):
if sys.platform == "darwin":
f, file = tempfile.mkstemp('.png')
os.close(f)
subprocess.call(['screencapture', '-x', file])
im = Image.open(file)
im.load()
os.unlink(file)
else:
size, data = grabber()
im = Image.frombytes(
"RGB", size, data,
# RGB, 32-bit line padding, origo in lower left corner
"raw", "BGR", (size[0]*3 + 3) & -4, -1
)
if bbox:
im = im.crop(bbox)
return im
def close(self):
"""
Closes the file pointer, if possible.
This operation will destroy the image core and release its memory.
The image data will be unusable afterward.
This function is only required to close images that have not
had their file read and closed by the
:py:meth:`~PIL.Image.Image.load` method.
"""
try:
self.fp.close()
except Exception as msg:
logger.debug("Error closing: %s", msg)
# Instead of simply setting to None, we're setting up a
# deferred error that will better explain that the core image
# object is gone.
self.im = deferred_error(ValueError("Operation on closed image"))
def terminate(self):
"""Close pipe and terminate child process.
"""
self.close()
super(Popen, self).terminate()
def close(self):
"""Similar to 'close' of file descriptor.
"""
if self._handle:
try:
flags = win32pipe.GetNamedPipeInfo(self._handle)[0]
except:
flags = 0
if flags & win32con.PIPE_SERVER_END:
win32pipe.DisconnectNamedPipe(self._handle)
# TODO: if pipe, need to call FlushFileBuffers?
def _close_(rc, n):
win32file.CloseHandle(self._handle)
self._overlap = None
if self._notifier:
self._notifier.unregister(self._handle)
self._handle = None
self._read_result = self._write_result = None
self._read_task = self._write_task = None
self._buflist = []
if self._overlap.object:
self._overlap.object = _close_
win32file.CancelIo(self._handle)
else:
_close_(0, 0)
def close(self):
"""Close file descriptor.
"""
if self._fileno:
self._notifier.unregister(self)
if self._fd:
self._fd.close()
self._fd = self._fileno = None
self._read_task = self._write_task = None
self._read_fn = self._write_fn = None
self._buflist = []
def close(self):
"""Close pipe.
"""
self.first = None
self.last = None
def __exit__(self, exc_type, exc_value, trace):
self.close()
return True
def close(self):
"""'close' must be called when done with socket.
"""
self._unregister()
if self._rsock:
self._rsock.close()
self._rsock = None
self._read_fn = self._write_fn = None
self._read_task = self._write_task = None
def terminate(self):
if self.iocp:
self.async_poller.terminate()
self.cmd_rsock.close()
self.cmd_wsock.close()
self.cmd_rsock_buf = None
iocp, self.iocp = self.iocp, None
win32file.CloseHandle(iocp)
self._timeouts = []
self.cmd_rsock = self.cmd_wsock = None
self.__class__._instance = None
def terminate(self):
"""Close pipe and terminate child process.
"""
self.close()
super(Popen, self).terminate()
def close(self):
"""Similar to 'close' of file descriptor.
"""
if self._handle:
try:
flags = win32pipe.GetNamedPipeInfo(self._handle)[0]
except:
flags = 0
if flags & win32con.PIPE_SERVER_END:
win32pipe.DisconnectNamedPipe(self._handle)
# TODO: if pipe, need to call FlushFileBuffers?
def _close_(rc, n):
win32file.CloseHandle(self._handle)
self._overlap = None
if self._notifier:
self._notifier.unregister(self._handle)
self._handle = None
self._read_result = self._write_result = None
self._read_task = self._write_task = None
self._buflist = []
if self._overlap.object:
self._overlap.object = _close_
win32file.CancelIo(self._handle)
else:
_close_(0, 0)
def __exit__(self, exc_type, exc_value, trace):
self.close()
return True
def close(self):
"""Close pipe.
"""
self.first = None
self.last = None
def __exit__(self, exc_type, exc_value, trace):
self.close()
return True
def terminate(self):
if self.iocp:
self.async_poller.terminate()
self.cmd_rsock.close()
self.cmd_wsock.close()
self.cmd_rsock_buf = None
iocp, self.iocp = self.iocp, None
win32file.CloseHandle(iocp)
self._timeouts = []
self.cmd_rsock = self.cmd_wsock = None
self.__class__._instance = None
def close(self):
if not self._location:
self.unregister()
self._subscribers = set()
self._scheduler._lock.acquire()
self._scheduler._channels.pop(self._name, None)
self._scheduler._lock.release()
def tmpfilepath():
fd, fname = tempfile.mkstemp(prefix='djpt-test-')
os.close(fd)
yield fname
os.remove(fname)
def __init__(self, content):
if not isinstance(content, str):
raise Exception("Note content must be an instance "
"of string, '%s' given." % type(content))
(tempfileHandler, tempfileName) = tempfile.mkstemp(suffix=".markdown")
os.write(tempfileHandler, self.ENMLtoText(content))
os.close(tempfileHandler)
self.content = content
self.tempfile = tempfileName
def close(self):
"""Close the ssh connection"""
ftp = self.client.open_sftp()
ftp.remove(self.script_filename)
ftp.close()
self.client.close()
def upload_file(self, filename, content, mode=None):
ftp = self.client.open_sftp()
file = ftp.file(filename, 'w', -1)
file.write(content)
file.flush()
if mode:
ftp.chmod(self.script_filename, 0o744)
ftp.close()