Python os 模块,O_NONBLOCK 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用os.O_NONBLOCK。
def __init__(self, slave=0, pid=None):
    """Switch a process's PTY to unbuffered, non-echoing, non-blocking input.

    The PTY is located through ``/proc/<pid>/fd/<slave>``.  The termios
    attributes and file status flags found on entry are kept in
    ``self.oldtermios`` and ``self.oldflags`` so they can be restored later.

    :param slave: descriptor number (inside ``pid``) that refers to the PTY.
    :param pid: process whose descriptor is resolved; ``None`` (the default)
        means the calling process.
    """
    # Resolve the pid per call: an ``os.getpid()`` default in the signature
    # is evaluated only once, when the function is defined, and is wrong in
    # any process forked afterwards.
    if pid is None:
        pid = os.getpid()
    # apparently python GC's modules before class instances so, here
    # we have some hax to ensure we can restore the terminal state.
    self.termios, self.fcntl = termios, fcntl
    # open our controlling PTY
    self.pty = open(os.readlink("/proc/%d/fd/%d" % (pid, slave)), "rb+")
    # remember the termios settings so they can be restored once we are done
    self.oldtermios = termios.tcgetattr(self.pty)
    # fetch a second copy of the settings; this one gets modified
    newattr = termios.tcgetattr(self.pty)
    # leave canonical mode and turn off input echo
    newattr[3] &= ~termios.ICANON & ~termios.ECHO
    # disable the interrupt, quit and suspend control characters
    newattr[6][termios.VINTR] = '\x00'
    newattr[6][termios.VQUIT] = '\x00'
    newattr[6][termios.VSUSP] = '\x00'
    # apply the new attributes once pending output has drained
    termios.tcsetattr(self.pty, termios.TCSADRAIN, newattr)
    # remember the file status flags, then make the PTY non-blocking
    self.oldflags = fcntl.fcntl(self.pty, fcntl.F_GETFL)
    fcntl.fcntl(self.pty, fcntl.F_SETFL, self.oldflags | os.O_NONBLOCK)
def testOutputStreams(self):
    """Run ``_oscript`` with a stream-push adapter attached to the pipe.

    The pipe at ``_pipepath`` is opened non-blocking for reading and mapped
    to an HTTP PUT push adapter; afterwards the captured stdout/stderr and
    the recorded ``_req_chunks`` are checked.
    """
    push_spec = {
        'mode': 'http',
        'method': 'PUT',
        'url': 'http://localhost:%d' % _socket_port
    }
    pipe_fd = os.open(_pipepath, os.O_RDONLY | os.O_NONBLOCK)
    adapters = {pipe_fd: make_stream_push_adapter(push_spec)}
    command = [sys.executable, _oscript, _pipepath]

    try:
        with captureOutput() as stdpipes:
            run_process(command, adapters)
    except Exception:
        # Show whatever was captured before letting the failure propagate.
        print('Stdout/stderr from exception: ')
        print(stdpipes)
        raise

    self.assertEqual(stdpipes, ['start\ndone\n', ''])
    self.assertEqual(len(_req_chunks), 1)
    self.assertEqual(_req_chunks[0], (9, 'a message'))
def _open_ipipes(wds, fifos, input_pipes):
    """
    Try to open each named pipe listed in ``fifos`` for non-blocking writing.

    Opening fails with ``ENXIO`` while nobody has the pipe open for reading;
    such pipes are left in ``fifos`` untouched. For every pipe that opens, its
    entry is moved from ``fifos`` into ``input_pipes`` (keyed by the new
    descriptor) and the descriptor is appended to ``wds``. All three
    containers are modified in place and returned. Any other ``OSError`` is
    re-raised.
    """
    for name in list(fifos):
        try:
            descriptor = os.open(name, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENXIO:
                # No reader yet; keep waiting for this one.
                continue
            raise
        else:
            input_pipes[descriptor] = fifos.pop(name)
            wds.append(descriptor)
    return wds, fifos, input_pipes
def follow_file(filename):
    """Generator that keeps a read descriptor on ``filename`` across replacement.

    Yields ``None`` whenever the file cannot be opened. Otherwise yields
    ``(first, timestamp, fd)`` tuples, where ``first`` is True only for the
    first tuple after a (re)open. Once the path can no longer be stat'ed, or
    its inode differs from the opened descriptor's, the descriptor is closed
    and the file is opened again.
    """
    while True:
        try:
            handle = os.open(filename, os.O_RDONLY | os.O_NONBLOCK)
        except OSError:
            yield None
            continue

        try:
            opened_inode = os.fstat(handle).st_ino
            is_first = True
            replaced = False
            while not replaced:
                try:
                    on_disk = os.stat(filename)
                except OSError:
                    on_disk = None
                yield is_first, time.time(), handle
                replaced = on_disk is None or opened_inode != on_disk.st_ino
                is_first = False
        finally:
            os.close(handle)
def getInputDevices(self):
    """Scan ``/dev/input/`` and record every device whose name can be read.

    For each entry the device is opened, its name is queried with the
    ``EVIOCGNAME`` ioctl and the descriptor is closed again. Names containing
    "Keyboard" are normalised to ``'keyboard'``. Devices with a non-empty
    name are stored in ``self.Devices``; for box types starting with ``'et'``
    ``setDefaults`` is called for the device as well. Open/ioctl errors are
    printed and the device is skipped.
    """
    devices = os.listdir("/dev/input/")
    for evdev in devices:
        try:
            name_buffer = "\0" * 512
            self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
            try:
                self.name = ioctl(self.fd, EVIOCGNAME(256), name_buffer)
            finally:
                # Release the descriptor even when the ioctl fails; the
                # previous code leaked it on that path.
                os.close(self.fd)
            self.name = self.name[:self.name.find("\0")]
            if str(self.name).find("Keyboard") != -1:
                self.name = 'keyboard'
        except (IOError, OSError) as err:
            print('[iInputDevices] getInputDevices <ERROR: ioctl(EVIOCGNAME): ' + str(err) + ' >')
            self.name = None

        if self.name:
            self.Devices[evdev] = {'name': self.name, 'type': self.getInputDeviceType(self.name),'enabled': False, 'configuredName': None }
            if boxtype.startswith('et'):
                # load default remote control "delay" and "repeat" values for ETxxxx
                # ("QuickFix Scrollspeed Menues" proposed by Xtrend Support)
                self.setDefaults(evdev)
def _communicate(self):
    """Shuttle data between the agent connection and the local endpoint.

    The local endpoint is switched to non-blocking mode first. Until
    ``self._exit`` is set, both ends are polled with a 0.5 s timeout and up
    to 512 bytes are forwarded from whichever end is readable to the other.
    An empty read closes the proxy via ``self._close()``.
    """
    import fcntl

    local_flags = fcntl.fcntl(self.__inr, fcntl.F_GETFL)
    fcntl.fcntl(self.__inr, fcntl.F_SETFL, local_flags | os.O_NONBLOCK)
    while not self._exit:
        readable, _, _ = select([self._agent._conn, self.__inr], [], [], 0.5)
        for ready in readable:
            if self._agent._conn == ready:
                chunk = self._agent._conn.recv(512)
                if len(chunk) == 0:
                    self._close()
                    break
                self.__inr.send(chunk)
            elif self.__inr == ready:
                chunk = self.__inr.recv(512)
                if len(chunk) == 0:
                    self._close()
                    break
                self._agent._conn.send(chunk)
            time.sleep(io_sleep)
def train(self):
    """Run training steps until ``total_steps`` is reached (-1 means no limit).

    When ``args.ipython`` is set, stdin is made non-blocking up front and
    ``check_stdin`` is called after every step. The network is saved every
    ``args.save_every`` steps when that value is a positive number.
    """
    steps_done = 0
    if self.args.ipython:
        stdin_fd = sys.stdin.fileno()
        stdin_flags = fcntl.fcntl(stdin_fd, fcntl.F_GETFL)
        fcntl.fcntl(stdin_fd, fcntl.F_SETFL, stdin_flags | os.O_NONBLOCK)

    while self.total_steps == -1 or steps_done < self.total_steps:
        steps_done += 1
        # Timestamps are taken around each step; nothing in this block reads them.
        start_time = time.time()
        self.step()

        save_due = (self.args.save_every is not None
                    and self.args.save_every != -1
                    and self.args.save_every > 0
                    and steps_done % self.args.save_every == 0)
        if save_due:
            print(" |= Saving network")
            self.gan.save(self.save_file)

        if self.args.ipython:
            self.check_stdin()
        end_time = time.time()
def keyboard(callback, exit='q'):
    """Feed single key presses from stdin to ``callback`` until ``exit`` is typed.

    The terminal is taken out of canonical mode with echo disabled and stdin
    is made non-blocking for the duration of the call; both settings are
    restored on the way out, including when setup or the callback fails.

    :param callback: called with each character that was read.
    :param exit: character that ends the loop (after being passed to
        ``callback``).
    """
    import time  # local import: only needed for the idle back-off below

    fd = sys.stdin.fileno()
    oldterm = termios.tcgetattr(fd)
    oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
    try:
        # Everything that changes terminal state happens inside the ``try``
        # so a failure half-way through still restores the original state.
        newattr = termios.tcgetattr(fd)
        newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO
        termios.tcsetattr(fd, termios.TCSANOW, newattr)
        fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)

        while True:
            try:
                ch = sys.stdin.read(1)
                if ch:
                    callback(ch)
                    if ch == exit:
                        break
                    continue
            except IOError:
                # Nothing to read right now on the non-blocking stdin.
                pass
            # No key pending: back off briefly instead of spinning at full
            # CPU on the non-blocking read.
            time.sleep(0.01)
    finally:
        try:
            termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
        finally:
            fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
def _open_endpoint_fd(self, ep):
    '''
    Open the gadgetfs file that belongs to an endpoint, non-blocking and
    read/write, and store the resulting descriptor in ``ep.fd``.

    :param ep: USBEndpoint object
    :raises Exception: if no endpoint file found, or failed to open
    .. todo:: detect transfer-type specific endpoint files
    '''
    num = ep.number
    if ep.direction == USBEndpoint.direction_out:
        s_dir = 'out'
    else:
        s_dir = 'in'
    # Endpoint files are named ep<number><direction>, e.g. "ep1in".
    filename = 'ep%d%s' % (num, s_dir)
    endpoint_path = os.path.join(self.gadgetfs_dir, filename)
    fd = os.open(endpoint_path, os.O_RDWR | os.O_NONBLOCK)
    self.debug('Opened endpoint %d' % (num))
    self.debug('ep: %d dir: %s file: %s fd: %d' % (num, s_dir, filename, fd))
    ep.fd = fd
def init_func(self, creator_fd, tun_dev_name, *args, **kwargs):
    """
    Create the tun device and set this handler up around its descriptor.

    :param creator_fd: stored on the instance as the creator descriptor
    :param tun_dev_name: name of the tun device to create
    :param args: forwarded to ``dev_init``
    :param kwargs: forwarded to ``dev_init``
    :return: descriptor of the created tun device
    """
    tun_fd = self.__create_tun_dev(tun_dev_name)
    if tun_fd < 3:
        # Descriptors below 3 are treated as a failed creation; give up.
        failure = "error:create tun device failed:%s" % tun_dev_name
        print(failure)
        sys.exit(-1)

    self.__creator_fd = creator_fd
    self.__qos = simple_qos.qos(simple_qos.QTYPE_DST)

    self.set_fileno(tun_fd)
    fcntl.fcntl(tun_fd, fcntl.F_SETFL, os.O_NONBLOCK)
    self.dev_init(tun_dev_name, *args, **kwargs)

    return tun_fd
def getInputDevices(self):
    """Scan ``/dev/input/`` and record every device whose name can be read.

    For each entry the device is opened, its name is queried with the
    ``EVIOCGNAME`` ioctl and the descriptor is closed again. Devices with a
    non-empty name are stored in ``self.Devices``. Open/ioctl errors are
    printed together with the device entry and the device is skipped.
    """
    devices = os.listdir("/dev/input/")
    for evdev in devices:
        try:
            name_buffer = "\0" * 512
            self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
            try:
                self.name = ioctl(self.fd, EVIOCGNAME(256), name_buffer)
            finally:
                # Release the descriptor even when the ioctl fails; the
                # previous code leaked it on that path.
                os.close(self.fd)
            self.name = self.name[:self.name.find("\0")]
        except (IOError, OSError) as err:
            print('[iInputDevices] getInputDevices ' + evdev + ' <ERROR: ioctl(EVIOCGNAME): ' + str(err) + ' >')
            self.name = None

        if self.name:
            self.Devices[evdev] = {'name': self.name, 'type': self.getInputDeviceType(self.name),'enabled': False, 'configuredName': None }
def __init__(self, host='127.0.0.1', port=23, application=None, encoding='utf-8'):
    """Store the server settings and create the inter-thread wake-up pipe.

    :param host: text address stored on the instance.
    :param port: integer port stored on the instance.
    :param application: a ``TelnetApplication`` instance.
    :param encoding: text name of the encoding stored on the instance.
    """
    assert isinstance(host, text_type)
    assert isinstance(port, int)
    assert isinstance(application, TelnetApplication)
    assert isinstance(encoding, text_type)

    self.host, self.port = host, port
    self.application = application
    self.encoding = encoding

    self.connections = set()
    self._calls_from_executor = []

    # Pipe used for communication between threads; its read end must
    # never block.
    schedule_pipe = os.pipe()
    self._schedule_pipe = schedule_pipe
    fcntl.fcntl(schedule_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
def __init__(self, inputhook=None, selector=AutoSelector):
    """Initialise loop state, the selector and the inter-thread pipe.

    :param inputhook: optional callable; when given it is wrapped in an
        ``InputHookContext``.
    :param selector: ``Selector`` subclass that is instantiated for this loop.
    """
    assert inputhook is None or callable(inputhook)
    assert issubclass(selector, Selector)

    self.running = False
    self.closed = False
    self._running = False
    self._callbacks = None

    self._calls_from_executor = []
    self._read_fds = {}  # fd -> handler
    self.selector = selector()

    # Pipe used for communication between threads; its read end must
    # never block.
    schedule_pipe = os.pipe()
    self._schedule_pipe = schedule_pipe
    fcntl.fcntl(schedule_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)

    # The input hook context only exists when a hook was supplied.
    if inputhook:
        self._inputhook_context = InputHookContext(inputhook)
    else:
        self._inputhook_context = None
def __init__(self, host='127.0.0.1', port=23, application=None, encoding='utf-8'):
    """Record the server configuration and set up the scheduling pipe.

    :param host: text address stored on the instance.
    :param port: integer port stored on the instance.
    :param application: a ``TelnetApplication`` instance.
    :param encoding: text name of the encoding stored on the instance.
    """
    assert isinstance(host, text_type)
    assert isinstance(port, int)
    assert isinstance(application, TelnetApplication)
    assert isinstance(encoding, text_type)

    self.host = host
    self.port = port
    self.application, self.encoding = application, encoding

    self.connections = set()
    self._calls_from_executor = []

    # Inter-thread communication goes through a pipe whose read end is
    # switched to non-blocking mode.
    self._schedule_pipe = os.pipe()
    read_end = self._schedule_pipe[0]
    fcntl.fcntl(read_end, fcntl.F_SETFL, os.O_NONBLOCK)
def __init__(self, inputhook=None, selector=AutoSelector):
    """Set up loop bookkeeping, the selector and the scheduling pipe.

    :param inputhook: optional callable; when given it is wrapped in an
        ``InputHookContext``.
    :param selector: ``Selector`` subclass that is instantiated for this loop.
    """
    assert inputhook is None or callable(inputhook)
    assert issubclass(selector, Selector)

    self.running = False
    self.closed = False
    self._running = False
    self._callbacks = None

    self._calls_from_executor = []
    self._read_fds = {}  # Maps each fd to its handler.
    self.selector = selector()

    # Inter-thread communication goes through a pipe whose read end is
    # switched to non-blocking mode.
    self._schedule_pipe = os.pipe()
    read_end = self._schedule_pipe[0]
    fcntl.fcntl(read_end, fcntl.F_SETFL, os.O_NONBLOCK)

    # Wrap the input hook, if there is one.
    hook_context = None
    if inputhook:
        hook_context = InputHookContext(inputhook)
    self._inputhook_context = hook_context
def start_pty(self, *args):
    """Spawn a shell inside the container and attach it to this handler.

    Without a container id an error is sent and the connection is closed.
    Otherwise ``docker exec -ti`` is spawned on a pseudo terminal (bash if it
    is executable in the container, sh otherwise), the terminal stream is
    made non-blocking, and the process is stored on ``self.pty`` and in
    ``TerminalSocketHandler.clients``. Any failure is reported through
    ``send_error_and_close``.
    """
    if not self.container_id:
        self.send_error_and_close("Error: container not found.")
        return

    try:
        # The shell first records its own pid in /tmp/sh.pid.<uuid>.
        shell_script = 'echo $$ > /tmp/sh.pid.{} && [ -x /bin/bash ] && /bin/bash || /bin/sh'.format(self.uuid)
        command = ['docker', 'exec', '-ti', self.container_id, '/bin/sh', '-c',
                   shell_script]
        pty = PtyProcessUnicode.spawn(command)

        # Put the pseudo terminal's stream into non-blocking mode.
        stream_flags = fcntl(pty.fileobj, F_GETFL)
        fcntl(pty.fileobj, F_SETFL, stream_flags | O_NONBLOCK)

        self.pty = pty
        TerminalSocketHandler.clients[self] = pty
        logger.info('Connect to console of container {}'.format(self.container_id))
    except Exception as e:
        self.send_error_and_close("Error: cannot start console: {}".format(e))
def open(self):
    """Open ``/dev/input/js<num>`` non-blocking and size the state arrays.

    The axis and button counts are queried with the ``JSIOCGAXES`` and
    ``JSIOCGBUTTONS`` ioctls and ``self.axes`` / ``self.buttons`` are
    initialised to that many zeros.

    :raises Exception: if the device is already opened, or if either count
        cannot be read. On any failure after the device file has been
        opened, the file is closed and ``self._f`` is reset to ``None``.
    """
    if self._f:
        raise Exception("{} at {} is already "
                        "opened".format(self.name, self._f_name))

    self._f = open("/dev/input/js{}".format(self.num), "rb")
    try:
        fileno = self._f.fileno()
        fcntl.fcntl(fileno, fcntl.F_SETFL, os.O_NONBLOCK)

        # Get number of axes and buttons
        val = ctypes.c_int()
        if fcntl.ioctl(fileno, JSIOCGAXES, val) != 0:
            raise Exception("Failed to read number of axes")
        self.axes = [0] * val.value

        if fcntl.ioctl(fileno, JSIOCGBUTTONS, val) != 0:
            # This message used to say "axes", which pointed at the wrong ioctl.
            raise Exception("Failed to read number of buttons")
        self.buttons = [0] * val.value
    except Exception:
        # Do not keep a half-initialised device around: without this, an
        # error raised by fcntl/ioctl left the file open and made every
        # later open() fail with "already opened".
        self._f.close()
        self._f = None
        raise

    self.__initvalues()
def _setup_pipe(self, name):
    """Redirect the original ``sys.__<name>__`` descriptor into a pipe.

    A duplicate of the real descriptor is kept in ``self._save_fds[name]``,
    the real descriptor number is recorded in ``self._real_fds[name]``, and
    the write end of a new pipe is installed in its place.

    :param name: stream name used to look up ``sys.__<name>__``.
    :return: the non-blocking read end of the pipe.
    """
    from fcntl import fcntl, F_GETFL, F_SETFL

    stream = getattr(sys, '__%s__' % name)
    real_fd = stream.fileno()
    self._save_fds[name] = os.dup(real_fd)

    read_end, write_end = os.pipe()
    # The real descriptor now refers to the pipe's write end; the extra
    # write-end descriptor is no longer needed.
    os.dup2(write_end, real_fd)
    os.close(write_end)
    self._real_fds[name] = real_fd

    # Reads from the pipe must never block.
    current = fcntl(read_end, F_GETFL)
    fcntl(read_end, F_SETFL, current | os.O_NONBLOCK)
    return read_end
def open_fifo(self):
    """Create the copytool event FIFO if needed and open it for reading.

    An already existing FIFO is accepted. ``lsof`` is used to find processes
    that have the FIFO open; if any of them has it open for reading,
    ``FifoReaderConflict`` is raised with their pids. Otherwise the FIFO is
    opened read-only and non-blocking and stored in ``self.reader_fd``.
    """
    fifo_path = self.copytool.event_fifo
    try:
        os.mkfifo(fifo_path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    holders = lsof(file=self.copytool.event_fifo)
    readers = set()
    writers = set()
    for pid, files in holders.items():
        for info in files.values():
            mode = info['mode']
            if 'r' in mode:
                readers.add(pid)
            if 'w' in mode:
                writers.add(pid)

    if readers:
        raise FifoReaderConflict(readers)

    self.reader_fd = os.open(self.copytool.event_fifo,
                             os.O_RDONLY | os.O_NONBLOCK)
    copytool_log.info("Opened %s for reading" % self.copytool.event_fifo)
def _communicate(self):
    """Forward traffic in both directions between agent and local endpoint.

    After making the local endpoint non-blocking, the loop polls both ends
    (0.5 s timeout) until ``self._exit`` is set. Readable data is copied to
    the opposite end in pieces of at most 512 bytes; an empty read triggers
    ``self._close()``.
    """
    import fcntl

    previous = fcntl.fcntl(self.__inr, fcntl.F_GETFL)
    fcntl.fcntl(self.__inr, fcntl.F_SETFL, previous | os.O_NONBLOCK)
    while not self._exit:
        ready_to_read = select([self._agent._conn, self.__inr], [], [], 0.5)[0]
        for source in ready_to_read:
            if self._agent._conn == source:
                payload = self._agent._conn.recv(512)
                if len(payload) == 0:
                    self._close()
                    break
                self.__inr.send(payload)
            elif self.__inr == source:
                payload = self.__inr.recv(512)
                if len(payload) == 0:
                    self._close()
                    break
                self._agent._conn.send(payload)
            time.sleep(io_sleep)
def set_nonblock(self, set_flag=True):
    """Set or clear the non-blocking flag on ``self.ins``.

    The descriptor's status flags are read once and cached in
    ``self.fd_flags``; later calls derive the new flags from that cache.
    Failures are reported through ``warning`` and are not raised.

    :param set_flag: True to set ``O_NONBLOCK``, False to clear it.
    """
    # Get the current flags
    if self.fd_flags is None:
        try:
            self.fd_flags = fcntl.fcntl(self.ins, fcntl.F_GETFL)
        except IOError:
            warning("Can't get flags on this file descriptor !")
            return

    # Set the non blocking flag
    if set_flag:
        new_fd_flags = self.fd_flags | os.O_NONBLOCK
    else:
        new_fd_flags = self.fd_flags & ~os.O_NONBLOCK

    try:
        fcntl.fcntl(self.ins, fcntl.F_SETFL, new_fd_flags)
    except Exception:
        # Still best-effort, but no longer a bare ``except`` that would
        # also swallow KeyboardInterrupt / SystemExit.
        warning("Can't set flags on this file descriptor !")
    else:
        self.fd_flags = new_fd_flags
def _recv(self, which, maxsize):
    """Read whatever is available on one of the pipes without blocking.

    :param which: pipe selector passed to ``get_conn_maxsize`` / ``_close``.
    :param maxsize: requested read size, normalised by ``get_conn_maxsize``.
    :return: ``None`` when there is no such pipe, ``''`` when nothing is
        ready, the result of ``self._close(which)`` on an empty read,
        otherwise the data (newline-translated when ``universal_newlines``
        is set).
    """
    conn, maxsize = self.get_conn_maxsize(which, maxsize)
    if conn is None:
        return None

    saved_flags = fcntl.fcntl(conn, fcntl.F_GETFL)
    if not conn.closed:
        fcntl.fcntl(conn, fcntl.F_SETFL, saved_flags | os.O_NONBLOCK)

    try:
        # Zero timeout: just ask whether data is ready right now.
        readable = select.select([conn], [], [], 0)[0]
        if not readable:
            return ''

        data = conn.read(maxsize)
        if not data:
            return self._close(which)

        if self.universal_newlines:
            data = self._translate_newlines(data)
        return data
    finally:
        # Restore the flags the pipe had on entry.
        if not conn.closed:
            fcntl.fcntl(conn, fcntl.F_SETFL, saved_flags)
def open_terminal(command="bash", columns=80, lines=24):
    """Fork ``command`` on a new pseudo terminal.

    :param command: shell-style command line; split with ``shlex``.
    :param columns: terminal width, exported to the child as ``COLUMNS``.
    :param lines: terminal height, exported to the child as ``LINES``.
    :return: ``(Terminal(columns, lines), child_pid, master_file)`` where
        ``master_file`` is an unbuffered, non-blocking binary file object on
        the PTY master.
    """
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        path, *args = shlex.split(command)
        args = [path] + args
        env = dict(TERM="linux", LC_ALL="en_GB.UTF-8",
                   COLUMNS=str(columns), LINES=str(lines))
        try:
            os.execvpe(path, args, env)
        except FileNotFoundError:
            print("Could not find the executable in %s. Press any key to exit." % path)
            exit()

    # set non blocking read
    # The status flags must be read with F_GETFL: the previous F_GETFD
    # returned the *descriptor* flags (FD_CLOEXEC), which were then written
    # back as status flags.
    flag = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines), p_pid, p_out
def __init__(self, inputhook=None, selector=AutoSelector):
    """Initialise the event loop state and register the scheduling pipe.

    :param inputhook: optional callable; when given it is wrapped in an
        ``InputHookContext``.
    :param selector: ``Selector`` subclass that is instantiated for this loop.
    """
    assert inputhook is None or callable(inputhook)
    assert issubclass(selector, Selector)

    super(PosixEventLoop, self).__init__()

    self.closed = False
    self._running = False

    self._calls_from_executor = []
    self._read_fds = {}  # fd -> handler
    self.selector = selector()
    self._signal_handler_mappings = {}  # signal -> previous handler

    # Pipe for inter-thread communication: the read end is made
    # non-blocking and watched by the selector.
    self._schedule_pipe = os.pipe()
    read_end = self._schedule_pipe[0]
    fcntl.fcntl(read_end, fcntl.F_SETFL, os.O_NONBLOCK)
    self.selector.register(self._schedule_pipe[0])

    # The input hook context only exists when a hook was supplied.
    if inputhook:
        self._inputhook_context = InputHookContext(inputhook)
    else:
        self._inputhook_context = None
def _make_non_blocking(file_obj):
"""make file object non-blocking
Windows doesn't have the fcntl module, but someone on
stack overflow supplied this code as an answer, and it works
http://stackoverflow.com/a/34504971/2893090"""
if USING_WINDOWS:
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(file_obj.fileno())
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
raise ValueError(WinError())
else:
# Set the file status flag (F_SETFL) on the pipes to be non-blocking
# so we can attempt to read from a pipe with no new data without locking
# the program up
fcntl.fcntl(file_obj, fcntl.F_SETFL, os.O_NONBLOCK)
def _recv(self, which, maxsize):
    """Non-blocking read from one of the pipes.

    :param which: pipe selector passed to ``get_conn_maxsize`` / ``_close``.
    :param maxsize: requested read size, normalised by ``get_conn_maxsize``.
    :return: ``None`` when there is no such pipe, ``''`` when nothing is
        ready, the result of ``self._close(which)`` on an empty read,
        otherwise the data (with ``\\r\\n`` and ``\\r`` turned into ``\\n``
        when ``universal_newlines`` is set).
    """
    conn, maxsize = self.get_conn_maxsize(which, maxsize)
    if conn is None:
        return None

    original_flags = fcntl.fcntl(conn, fcntl.F_GETFL)
    if not conn.closed:
        fcntl.fcntl(conn, fcntl.F_SETFL, original_flags | os.O_NONBLOCK)

    try:
        ready, _, _ = select.select([conn], [], [], 0)
        if not ready:
            return ''

        received = conn.read(maxsize)
        if not received:
            return self._close(which)

        if not self.universal_newlines:
            return received
        return received.replace("\r\n", "\n").replace("\r", "\n")
    finally:
        # Put the pipe back into the mode it had on entry.
        if not conn.closed:
            fcntl.fcntl(conn, fcntl.F_SETFL, original_flags)
################################################################################
def getInputDevices(self):
    """Scan ``/dev/input/`` (in sorted order) and record the named devices.

    For each entry the device is opened, its name is queried with the
    ``EVIOCGNAME`` ioctl and the descriptor is closed again. Names containing
    "Keyboard" are normalised to ``'keyboard'``. Devices with a non-empty
    name are printed and stored in ``self.Devices``. Open/ioctl errors are
    printed and the device is skipped.
    """
    devices = sorted(os.listdir("/dev/input/"))
    for evdev in devices:
        try:
            name_buffer = "\0" * 512
            self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
            try:
                self.name = ioctl(self.fd, EVIOCGNAME(256), name_buffer)
            finally:
                # Release the descriptor even when the ioctl fails; the
                # previous code leaked it on that path.
                os.close(self.fd)
            self.name = self.name[:self.name.find("\0")]
            if str(self.name).find("Keyboard") != -1:
                self.name = 'keyboard'
        except (IOError, OSError) as err:
            print("[InputDevice] Error: evdev='%s' getInputDevices <ERROR: ioctl(EVIOCGNAME): '%s'>" % (evdev, str(err)))
            self.name = None

        if self.name:
            devtype = self.getInputDeviceType(self.name)
            print("[InputDevice] Found: evdev='%s', name='%s', type='%s'" % (evdev, self.name, devtype))
            self.Devices[evdev] = {'name': self.name, 'type': devtype, 'enabled': False, 'configuredName': None }
def __init__(self, fd):
    """'fd' is either a file object (e.g., obtained with 'open')
    or a file number (e.g., obtained with socket's fileno()).

    The descriptor is switched to non-blocking mode.

    :raises ValueError: if 'fd' is neither an object with 'fileno' nor an int.
    """
    if hasattr(fd, 'fileno'):
        self._fd = fd
        self._fileno = fd.fileno()
    elif isinstance(fd, int):
        # A bare descriptor number has no file object behind it. The old
        # code read ``self._fd`` here before it was ever assigned, which
        # raised AttributeError for every integer argument.
        self._fd, self._fileno = None, fd
    else:
        raise ValueError('invalid file descriptor')

    self._pycos = Pycos.scheduler()
    if self._pycos:
        self._notifier = self._pycos._notifier
        if hasattr(fd, '_fileno'):  # assume it is AsyncSocket
            self._notifier.unregister(fd)
    else:
        self._notifier = None

    self._timeout = None
    self._read_task = None
    self._read_fn = None
    self._write_task = None
    self._write_fn = None
    self._buflist = []

    flags = fcntl.fcntl(self._fileno, fcntl.F_GETFL)
    fcntl.fcntl(self._fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def __init__(self, fd):
    """'fd' is either a file object (e.g., obtained with 'open')
    or a file number (e.g., obtained with socket's fileno()).

    The descriptor is switched to non-blocking mode.

    :raises ValueError: if 'fd' is neither an object with 'fileno' nor an int.
    """
    if hasattr(fd, 'fileno'):
        self._fd = fd
        self._fileno = fd.fileno()
    elif isinstance(fd, int):
        # Use the number that was passed in: reading ``self._fd`` at this
        # point (as the old code did) fails with AttributeError because the
        # attribute has not been assigned yet.
        self._fd, self._fileno = None, fd
    else:
        raise ValueError('invalid file descriptor')

    self._pycos = Pycos.scheduler()
    if self._pycos:
        self._notifier = self._pycos._notifier
        if hasattr(fd, '_fileno'):  # assume it is AsyncSocket
            self._notifier.unregister(fd)
    else:
        self._notifier = None

    self._timeout = None
    self._read_task = None
    self._read_fn = None
    self._write_task = None
    self._write_fn = None
    self._buflist = []

    flags = fcntl.fcntl(self._fileno, fcntl.F_GETFL)
    fcntl.fcntl(self._fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def __init__(self, fd, map=None):
    """Attach the dispatcher to a descriptor and make it non-blocking.

    :param fd: a descriptor number, or an object providing ``fileno()``.
    :param map: passed through to ``dispatcher.__init__``.
    """
    dispatcher.__init__(self, None, map)
    self.connected = True
    try:
        fd = fd.fileno()
    except AttributeError:
        # Already a plain descriptor number.
        pass
    self.set_file(fd)
    # Non-blocking mode: add O_NONBLOCK to the current status flags.
    current = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
def open(self):
    """Open the underlying pipe read-only and non-blocking."""
    open_flags = os.O_RDONLY | os.O_NONBLOCK
    self._pipe.open(open_flags)
def open(self):
    """Open the underlying pipe write-only and non-blocking."""
    open_flags = os.O_WRONLY | os.O_NONBLOCK
    self._pipe.open(open_flags)
def _set_nonblocking(self, fd):
    """Add ``O_NONBLOCK`` to the status flags of ``fd``."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    updated = current | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, updated)
def _set_nonblocking(fd):
    """Add ``O_NONBLOCK`` to the status flags of ``fd``."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    updated = current | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, updated)
def _set_nonblocking(self, fd):
    """Switch ``fd`` to non-blocking mode, keeping its other status flags."""
    status_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    status_flags = status_flags | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, status_flags)
def _set_nonblocking(fd):
    """Switch ``fd`` to non-blocking mode, keeping its other status flags."""
    status_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    status_flags = status_flags | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, status_flags)
def _set_nonblocking(self, fd):
    """Turn on ``O_NONBLOCK`` for ``fd`` without touching its other flags."""
    existing = fcntl.fcntl(fd, fcntl.F_GETFL)
    wanted = existing | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, wanted)
def __init__(self, connection):
    """Wrap ``connection`` as a non-blocking socket with line buffering.

    :param connection: socket object; it is switched to non-blocking mode
        and registered with a poll object for ``POLLOUT``.
    """
    connection.setblocking(0)

    # Original author's note: deriving from the line buffer class was
    # observed to be much slower, so a separate instance is held instead.
    descriptor = connection.fileno()
    self.b = linebuffer.LineBuffer(descriptor)

    self.socket = connection
    self.out_buffer = ''

    poller = select.poll()
    self.pollout = poller
    poller.register(connection, select.POLLOUT)

    self.sendfail_msg = self.sendfail_cnt = 0
def init(cls):
    """
    Creates a pipe for waking up a select call when a signal has been received.

    The read end of the pipe is made non-blocking and the write end is
    installed as the signal wakeup descriptor.
    """
    wake_up_pipe = os.pipe()
    cls.__wake_up_pipe = wake_up_pipe
    read_end = cls.__wake_up_pipe[0]
    fcntl.fcntl(read_end, fcntl.F_SETFL, os.O_NONBLOCK)
    signal.set_wakeup_fd(EventQueueEmptyEventHandler.__wake_up_pipe[1])
# ------------------------------------------------------------------------------------------------------------------
def _set_nonblocking(fd):
    """Turn on ``O_NONBLOCK`` for ``fd`` without touching its other flags."""
    existing = fcntl.fcntl(fd, fcntl.F_GETFL)
    wanted = existing | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, wanted)
def setNonBlocking(fd):
    """Make a fd non-blocking.

    Adds ``O_NONBLOCK`` to the descriptor's current status flags.
    """
    # The command constants come from ``fcntl`` itself, the module whose
    # ``fcntl()`` function is already being called, rather than from the
    # legacy ``FCNTL`` constants module.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    flags = flags | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, flags)
def setBlocking(fd):
    """Make a fd blocking.

    Removes ``O_NONBLOCK`` from the descriptor's current status flags.
    """
    # The command constants come from ``fcntl`` itself, the module whose
    # ``fcntl()`` function is already being called, rather than from the
    # legacy ``FCNTL`` constants module.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    flags = flags & ~os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, flags)
def __init__(self, fd, map=None):
    """Bind the dispatcher to a descriptor and put it in non-blocking mode.

    :param fd: a descriptor number, or an object providing ``fileno()``.
    :param map: passed through to ``dispatcher.__init__``.
    """
    dispatcher.__init__(self, None, map)
    self.connected = True
    try:
        fd = fd.fileno()
    except AttributeError:
        # No fileno(): treat the argument as the descriptor itself.
        pass
    self.set_file(fd)
    # set it to non-blocking mode
    nonblocking_flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, nonblocking_flags)
def open(self):
    """Open the configured port and apply the current settings.

    Raises SerialException when no port is configured, when the port is
    already open, or when the device cannot be opened. If configuring the
    freshly opened port fails, the descriptor is closed again and the
    original exception propagates.
    """
    if self._port is None:
        raise SerialException("Port must be configured before it can be used.")
    if self.is_open:
        raise SerialException("Port is already open.")

    self.fd = None
    open_flags = os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK
    try:
        self.fd = os.open(self.portstr, open_flags)
    except OSError as msg:
        self.fd = None
        raise SerialException(msg.errno, "could not open port %s: %s" % (self._port, msg))

    try:
        self._reconfigure_port(force_update=True)
    except:
        # Best-effort close; any error from it is ignored so that the
        # exception raised during setup is the one that propagates.
        try:
            os.close(self.fd)
        except:
            pass
        self.fd = None
        raise

    self.is_open = True
    # Modem lines not driven by hardware flow control get their stored state.
    if not self._dsrdtr:
        self._update_dtr_state()
    if not self._rtscts:
        self._update_rts_state()
    self.reset_input_buffer()
def nonblocking(self):
    """internal - not portable!

    Sets the status flags of the open port's descriptor to ``O_NONBLOCK``;
    raises ``portNotOpenError`` when the port is not open.
    """
    if self.is_open:
        fcntl.fcntl(self.fd, fcntl.F_SETFL, os.O_NONBLOCK)
        return
    raise portNotOpenError
def _reconfigure_port(self, force_update=True):
    """Set communication parameters on opened port.

    After the base class has configured the port, the descriptor is put
    back into blocking mode and read timing is delegated to the tty layer
    through the VMIN / VTIME control characters (VTIME is expressed in
    tenths of a second).

    :raises serial.SerialException: if the port attributes cannot be read.
    :raises ValueError: if the resulting VTIME is outside 0..255.
    """
    super(VTIMESerial, self)._reconfigure_port()
    fcntl.fcntl(self.fd, fcntl.F_SETFL, 0)  # clear O_NONBLOCK

    if self._inter_byte_timeout is not None:
        vmin = 1
        vtime = int(self._inter_byte_timeout * 10)
    elif self._timeout is None:
        # No timeout configured: block until at least one byte arrives.
        # Previously this case fell through to ``int(None * 10)`` and
        # raised TypeError.
        vmin = 1
        vtime = 0
    else:
        vmin = 0
        vtime = int(self._timeout * 10)

    try:
        orig_attr = termios.tcgetattr(self.fd)
        iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr
    except termios.error as msg:  # if a port is nonexistent but has a /dev file, it'll fail here
        raise serial.SerialException("Could not configure port: %s" % msg)

    if vtime < 0 or vtime > 255:
        raise ValueError('Invalid vtime: %r' % vtime)
    cc[termios.VTIME] = vtime
    cc[termios.VMIN] = vmin

    termios.tcsetattr(
        self.fd,
        termios.TCSANOW,
        [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
def _set_nonblocking(fd):
    """Add ``O_NONBLOCK`` to whatever status flags ``fd`` currently has."""
    fcntl.fcntl(fd, fcntl.F_SETFL,
                fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
def __setNonBlocking(self, fileobj):
    """Add ``O_NONBLOCK`` to the status flags of ``fileobj``'s descriptor."""
    current = fcntl.fcntl(fileobj.fileno(), fcntl.F_GETFL)
    updated = current | os.O_NONBLOCK
    fcntl.fcntl(fileobj.fileno(), fcntl.F_SETFL, updated)