Python os 模块：set_blocking() 实例源码
我们从 Python 开源项目中提取了以下 9 个代码示例，用于说明如何使用 os.set_blocking()。
def __init__(self, args, **kwargs):
    """Start a subprocess and wrap its pipe ends in FileStream objects.

    ``args`` and ``kwargs`` are forwarded unchanged to ``subprocess.Popen``.
    For each of stdin/stdout/stderr that the resulting Popen object exposes
    as a truthy pipe, an attribute of the same name is set on ``self``
    holding a ``FileStream`` around that pipe; otherwise no attribute is
    assigned here.

    Raises:
        RuntimeError: if ``universal_newlines`` appears in ``kwargs``
            (regardless of its value).
    """
    if 'universal_newlines' in kwargs:
        raise RuntimeError('universal_newlines argument not supported')

    # A FileStream passed as the child's stdin must be flipped back to
    # blocking mode before the child inherits it; per the original author,
    # leaving it non-blocking wreaks havoc in the child process.
    inherited_stdin = kwargs.get('stdin')
    if isinstance(inherited_stdin, FileStream):
        os.set_blocking(inherited_stdin.fileno(), True)

    self._popen = subprocess.Popen(args, **kwargs)

    # Wrap whichever standard pipes were actually created.
    for pipe_name in ('stdin', 'stdout', 'stderr'):
        pipe = getattr(self._popen, pipe_name)
        if pipe:
            setattr(self, pipe_name, FileStream(pipe))
def _set_nonblocking(fd):
    """Switch the file descriptor ``fd`` to non-blocking mode."""
    blocking = False
    os.set_blocking(fd, blocking)
def check_reopen(r1, w):
    """Probe whether a re-opened pipe read end has its own blocking flag.

    Opens ``/proc/self/fd/<r1>`` read-only to obtain a second descriptor
    ``r2``, verifies that both ``r1`` and ``r2`` receive bytes written to
    ``w``, switches ``r2`` to non-blocking mode, and then reports the
    blocking state of both descriptors. Progress is printed to stdout.

    Any exception raised along the way (including a failed check) is
    printed as ``ERROR: <repr>`` instead of being propagated, and ``r2`` is
    always closed before returning.

    Args:
        r1: file descriptor of the read end to re-open.
        w: file descriptor whose writes are readable from ``r1``.

    Note:
        The final check waits roughly one second for a helper thread to
        write to ``w``.
    """
    def expect_byte(fd, expected):
        # Deliberately not `assert os.read(...) == expected`: the read is a
        # required side effect and must still run under `python -O`, where
        # assert statements are removed entirely.
        if os.read(fd, 1) != expected:
            raise AssertionError()

    try:
        print("Reopening read end")
        r2 = os.open("/proc/self/fd/{}".format(r1), os.O_RDONLY)
        try:
            print("r1 is {}, r2 is {}".format(r1, r2))
            print("checking they both can receive from w...")
            os.write(w, b"a")
            expect_byte(r1, b"a")
            os.write(w, b"b")
            expect_byte(r2, b"b")
            print("...ok")

            print("setting r2 to non-blocking")
            os.set_blocking(r2, False)
            print("os.get_blocking(r1) ==", os.get_blocking(r1))
            print("os.get_blocking(r2) ==", os.get_blocking(r2))

            # Check r2 is really truly non-blocking: nothing is pending, so
            # a non-blocking read must raise instead of waiting.
            try:
                os.read(r2, 1)
            except BlockingIOError:
                print("r2 definitely seems to be in non-blocking mode")

            # Check that r1 is really truly still in blocking mode: the
            # read below can only succeed if it waits for the delayed write.
            def sleep_then_write():
                time.sleep(1)
                os.write(w, b"c")

            threading.Thread(target=sleep_then_write, daemon=True).start()
            expect_byte(r1, b"c")
            print("r1 definitely seems to be in blocking mode")
        finally:
            # Release the re-opened descriptor on every path so repeated
            # calls do not leak file descriptors.
            os.close(r2)
    except Exception as exc:
        print("ERROR: {!r}".format(exc))
def __init__(self, fd, map=None):
    """Set up a dispatcher around a file descriptor in non-blocking mode.

    ``fd`` may be a raw descriptor or any object providing ``fileno()``;
    ``map`` is handed through to ``dispatcher.__init__``. The instance is
    marked as connected, the descriptor is registered via ``set_file`` and
    then switched to non-blocking mode.
    """
    dispatcher.__init__(self, None, map)
    self.connected = True
    try:
        descriptor = fd.fileno()
    except AttributeError:
        # No usable fileno(): treat the argument as the descriptor itself.
        descriptor = fd
    self.set_file(descriptor)
    # The dispatcher must never block on this descriptor.
    os.set_blocking(descriptor, False)
def __init__(self, fileobj):
    """Wrap a binary-mode file object and put its descriptor in non-blocking mode.

    Args:
        fileobj: a file-like object opened in binary mode; its ``read`` and
            ``write`` bound methods are cached on the instance.

    Raises:
        AssertionError: if ``fileobj`` is a text-mode file
            (an ``io.TextIOBase`` instance).
    """
    # Raised explicitly rather than via `assert` so the check still runs
    # under `python -O`; the exception type and message are unchanged.
    if isinstance(fileobj, io.TextIOBase):
        raise AssertionError('Only binary mode files allowed')
    super().__init__(fileobj)
    # NOTE(review): self._fileno is presumably set by the base-class
    # __init__ above - confirm against the parent class.
    os.set_blocking(int(self._fileno), False)
    # Common bound methods
    self._file_read = fileobj.read
    self._file_write = fileobj.write
def blocking(self):
    '''
    Temporarily expose the underlying file in blocking mode.

    Generator: once started it raises IOError if ``self._buffer`` is
    non-empty; otherwise it switches the descriptor to blocking mode,
    yields ``self._file``, and switches the descriptor back to
    non-blocking mode when the generator is resumed or closed.

    NOTE(review): presumably intended to be driven through
    contextlib.contextmanager; no decorator is visible here - confirm.
    '''
    if self._buffer:
        raise IOError('There is unread buffered data.')

    def switch_mode(flag):
        os.set_blocking(int(self._fileno), flag)

    try:
        switch_mode(True)
        yield self._file
    finally:
        # Always return to non-blocking mode, even if the caller failed.
        switch_mode(False)