我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 errno.errorcode（一个将 errno 数值映射为符号名称的字典，而非函数）。
def _handle_connect(self):
    """Finish a pending non-blocking connect.

    Reads ``SO_ERROR`` from ``self.socket``. A non-zero value is stored on
    ``self.error`` as a ``socket.error``, the stream is closed and the
    method returns early. Otherwise the pending connect callback is run,
    the pending connect future is resolved with ``self``, and
    ``self._connecting`` is cleared.
    """
    so_error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if so_error:
        self.error = socket.error(so_error, os.strerror(so_error))
        # IOLoop implementations may vary: some of them return
        # an error state before the socket becomes writable, so
        # in that case a connection failure would be handled by the
        # error path in _handle_events instead of here.
        if self._connect_future is None:
            # Only log when no future is pending.
            gen_log.warning("Connect error on fd %s: %s",
                            self.socket.fileno(), errno.errorcode[so_error])
        self.close()
        return

    if self._connect_callback is not None:
        # Clear the attribute before running so the callback fires once.
        pending_callback, self._connect_callback = self._connect_callback, None
        self._run_callback(pending_callback)

    if self._connect_future is not None:
        pending_future, self._connect_future = self._connect_future, None
        pending_future.set_result(self)

    self._connecting = False
async def _serve_one_listener(listener, handler_nursery, handler):
    """Accept connections on *listener* forever, starting a handler for each.

    The listener is used as an async context manager around the accept
    loop. Every accepted stream is handed to ``_run_handler`` through
    ``handler_nursery.start_soon``.

    An ``OSError`` whose errno is in ``ACCEPT_CAPACITY_ERRNOS`` is logged
    and the accept is retried after ``SLEEP_TIME`` seconds; any other
    ``OSError`` propagates to the caller.

    This must be a coroutine function: the body uses ``async with`` and
    ``await``, which are a SyntaxError inside a plain ``def``.
    """
    async with listener:
        while True:
            try:
                stream = await listener.accept()
            except OSError as exc:
                if exc.errno not in ACCEPT_CAPACITY_ERRNOS:
                    raise
                LOGGER.error(
                    "accept returned %s (%s); retrying in %s seconds",
                    errno.errorcode[exc.errno],
                    os.strerror(exc.errno),
                    SLEEP_TIME,
                    exc_info=True,
                )
                await trio.sleep(SLEEP_TIME)
            else:
                handler_nursery.start_soon(_run_handler, stream, handler)
def init(self, *event_handlers):
    """Create the inotify watch tree and register the event handlers.

    An ``InotifyTree`` over ``self.rootpath`` is stored on ``self.watch``.
    Each positional argument must be a ``(func, mask)`` pair; every pair is
    passed on to ``self.add_event_handler``.

    Raises:
        OSError: if the inotify library reports an ``InotifyError``; the
            message carries the symbolic errno name and its number.
    """
    try:
        self.watch = inotify.adapters.InotifyTree(self.rootpath,
                                                  mask=_EVENT_MASK)
    except inotify.calls.InotifyError as err:
        code = err.errno
        message = 'Could not initialize inotify API: {} ({})'.format(
            errno.errorcode[code], code
        )
        raise OSError(message)

    for func, mask in event_handlers:
        self.add_event_handler(func, mask)
def _handle_connect(self):
    """Finish a pending non-blocking connect.

    Reads ``SO_ERROR`` from ``self.socket``. A non-zero value is logged as
    a warning, the stream is closed and the method returns early. Otherwise
    the pending connect callback (if any) is run and ``self._connecting``
    is cleared.
    """
    so_error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if so_error:
        # IOLoop implementations may vary: some of them return
        # an error state before the socket becomes writable, so
        # in that case a connection failure would be handled by the
        # error path in _handle_events instead of here.
        logging.warning("Connect error on fd %d: %s",
                        self.socket.fileno(), errno.errorcode[so_error])
        self.close()
        return

    if self._connect_callback is not None:
        # Clear the attribute before running so the callback fires once.
        pending_callback, self._connect_callback = self._connect_callback, None
        self._run_callback(pending_callback)

    self._connecting = False
def _handle_connect(self):
    """Finish a pending non-blocking connect.

    Reads ``SO_ERROR`` from ``self.socket``. A non-zero value is stored on
    ``self.error`` as a ``socket.error``, logged as a warning, the stream
    is closed and the method returns early. Otherwise the pending connect
    callback (if any) is run and ``self._connecting`` is cleared.
    """
    so_error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if so_error:
        self.error = socket.error(so_error, os.strerror(so_error))
        # IOLoop implementations may vary: some of them return
        # an error state before the socket becomes writable, so
        # in that case a connection failure would be handled by the
        # error path in _handle_events instead of here.
        gen_log.warning("Connect error on fd %d: %s",
                        self.socket.fileno(), errno.errorcode[so_error])
        self.close()
        return

    if self._connect_callback is not None:
        # Clear the attribute before running so the callback fires once.
        pending_callback, self._connect_callback = self._connect_callback, None
        self._run_callback(pending_callback)

    self._connecting = False
def test_length_limit(self):
    """Confirm that length limit is max_len

    Some of these tests depend on the length being at most
    max_len, so check to make sure it's accurate.

    """
    Myrm(self.out_rp.path)
    self.out_rp.mkdir()

    # A name of exactly max_len characters must be creatable.
    really_long = self.out_rp.append('a' * max_len)
    really_long.touch()

    # One character more must be rejected with ENAMETOOLONG.
    # ``except ... as e`` and ``e.errno`` are valid on Python 2.6+ and 3,
    # unlike the former ``except X, e`` / ``e[0]`` spelling.
    try:
        self.out_rp.append("a" * (max_len + 1))
    except EnvironmentError as e:
        assert errno.errorcode[e.errno] == 'ENAMETOOLONG', e
    else:
        assert 0, "File made successfully with length " + str(max_len + 1)
def catch_error(exc):
    """Return true if exception exc should be caught

    Returns 1 when *exc* is an instance of one of the listed recoverable
    exception classes, or when it is an ``EnvironmentError`` whose first
    argument is either one of two known message strings or an errno whose
    symbolic name is in the recoverable list. Returns 0 otherwise.

    The first argument is read through ``exc.args`` and the errno table is
    queried with ``dict.get``, so this works on both Python 2 and 3; the
    former ``exc[0]`` / ``has_key`` spelling is Python-2-only. An exception
    with empty ``args`` is treated as not recoverable instead of raising
    IndexError.
    """
    recoverable_classes = (rpath.SkipFileException, rpath.RPathException,
                           librsync.librsyncError, C.UnknownFileTypeError,
                           zlib.error)
    if isinstance(exc, recoverable_classes):
        return 1

    if not isinstance(exc, EnvironmentError):
        return 0

    first_arg = exc.args[0] if exc.args else None

    # the invalid mode shows up in backups of /proc for some reason
    if first_arg in ('invalid mode: rb', 'Not a gzipped file'):
        return 1

    recoverable_errnos = ('EPERM', 'ENOENT', 'EACCES', 'EBUSY', 'EEXIST',
                          'ENOTDIR', 'EILSEQ', 'ENAMETOOLONG', 'EINTR',
                          'ESTALE', 'ENOTEMPTY', 'EIO', 'ETXTBSY', 'ESRCH',
                          'EINVAL', 'EDEADLOCK', 'EDEADLK', 'EOPNOTSUPP',
                          'ETIMEDOUT')
    # ``get`` yields None for values missing from the errno table, and None
    # is not in the list above.
    if errno.errorcode.get(first_arg) in recoverable_errnos:
        return 1
    return 0
def setdata(self):
    """Refresh stat cache

    Stats ``self.name`` (following symlinks when ``globals.copy_links`` is
    set, otherwise with ``lstat``) and stores the result on ``self.stat``.
    On success ``self.set_from_stat()`` is called and, for symlinks,
    ``self.symtext`` is set to the link target.

    If the stat fails with ENOENT, ENOTDIR, ELOOP or ENOTCONN the path is
    treated as non-existent: ``stat``, ``type`` and ``mode`` are set to
    None. Any other ``OSError`` is re-raised unchanged.
    """
    try:
        # We may be asked to look at the target of symlinks rather than
        # the link itself.
        # NOTE(review): ``globals`` here must be a project module that
        # shadows the builtin -- confirm against the file's imports.
        if globals.copy_links:
            self.stat = os.stat(self.name)
        else:
            self.stat = os.lstat(self.name)
    except OSError as e:
        # ``e.errno`` works on Python 2.6+ and 3; ``e[0]`` is Python-2-only.
        # ``get`` keeps an errno missing from the table from raising
        # KeyError and hiding the original OSError.
        err_string = errno.errorcode.get(e.errno)
        if err_string in ("ENOENT", "ENOTDIR", "ELOOP", "ENOTCONN"):
            self.stat, self.type = None, None  # file doesn't exist
            self.mode = None
        else:
            raise
    else:
        self.set_from_stat()
        if self.issym():
            self.symtext = os.readlink(self.name)
def run(self):
    """Poll the relay sockets in an endless loop.

    Each iteration sleeps for ``relay.delay``, logs the active and pending
    channels, then calls ``select`` (15 second timeout) with
    ``self.input_list`` as the read set and the keys of
    ``self.establishing_dict`` as the write set.

    * ``KeyboardInterrupt``: sends ``relay.CLOSE_RELAY`` over
      ``self.bc_sock`` and calls ``self.shutdown()``.
    * ``select.error`` / ``socket.error``: logs the errno name and message
      and calls ``self.shutdown()``.

    The exception arguments are unpacked from ``exc.args`` inside the
    handler; ``except E as (code, msg)`` is Python-2-only syntax.
    """
    inputready = None
    outputready = None
    exceptready = None
    while True:
        try:
            time.sleep(relay.delay)
            logger.debug("Active channels: {0}. Pending Channels {1}".format(
                self.channel.keys(), self.establishing_dict.values()))
            inputready, outputready, exceptready = select.select(
                self.input_list, self.establishing_dict.keys(), [], 15)
        except KeyboardInterrupt:
            logger.info('SIGINT received. Closing relay and exiting')
            self.send_remote_cmd(self.bc_sock, relay.CLOSE_RELAY)
            self.shutdown()
        except select.error as exc:
            code, msg = exc.args
            logger.debug('Select error on select. Errno: {0} Msg: {1}'.format(
                errno.errorcode[code], msg))
            self.shutdown()
        except socket.error as exc:
            # On Python 3 this is the same class as select.error (OSError),
            # so the handler above wins there; kept for Python 2.
            code, msg = exc.args
            logger.debug('Socket error on select. Errno: {0} Msg: {1}'.format(
                errno.errorcode[code], msg))
            self.shutdown()
def relay(self, data, to_socket):
    """Send *data* to *to_socket*, tearing the channel down on failure.

    Does nothing when *to_socket* is None. If ``send`` raises
    ``socket.error`` the failure is logged. When the failing socket is
    ``self.bc_sock`` a ``relay.RelayError`` is raised; otherwise the socket
    is closed, removed from ``self.input_list``, its channel is unset and a
    ``relay.CHANNEL_CLOSE_CMD`` is sent over ``self.socket_with_server``.

    The errno and message are read from ``exc.errno`` / ``exc.strerror``
    (available on Python 2.6+ and 3) because ``except E as (code, msg)`` is
    Python-2-only syntax. One-argument errors such as timeouts no longer
    break the unpacking, and an errno missing from ``errno.errorcode`` is
    logged as its raw value instead of raising KeyError.
    """
    if to_socket is None:
        return
    try:
        to_socket.send(data)
    except socket.error as exc:
        code = exc.errno
        msg = exc.strerror if exc.strerror is not None else str(exc)
        logger.debug('Exception on relaying data to socket {0}'.format(to_socket))
        logger.debug('Errno: {0} Msg: {1}'.format(
            errno.errorcode.get(code, code), msg))
        if to_socket == self.bc_sock:
            raise relay.RelayError
        else:
            logger.debug('Closing socket')
            to_socket.close()
            self.input_list.remove(to_socket)
            channel_id = self.id_by_socket[to_socket]
            self.unset_channel(channel_id)
            self.send_remote_cmd(self.socket_with_server,
                                 relay.CHANNEL_CLOSE_CMD, channel_id)
def relay(self, data, to_socket):
    """Send *data* to *to_socket*, tearing the channel down on failure.

    Does nothing when *to_socket* is None. If ``send`` raises
    ``socket.error`` the failure is logged. When the failing socket is
    ``self.socket_with_server`` a ``relay.RelayError`` is raised; otherwise
    the socket is closed, removed from ``self.input_list``, its channel is
    unset and a ``relay.CHANNEL_CLOSE_CMD`` is sent over
    ``self.socket_with_server``.

    The errno and message are read from ``exc.errno`` / ``exc.strerror``
    (available on Python 2.6+ and 3) because ``except E as (code, msg)`` is
    Python-2-only syntax. One-argument errors such as timeouts no longer
    break the unpacking, and an errno missing from ``errno.errorcode`` is
    logged as its raw value instead of raising KeyError.
    """
    if to_socket is None:
        return
    try:
        to_socket.send(data)
    except socket.error as exc:
        code = exc.errno
        msg = exc.strerror if exc.strerror is not None else str(exc)
        logger.debug('Exception on relaying data to socket {0}'.format(to_socket))
        logger.debug('Errno: {0} Msg: {1}'.format(
            errno.errorcode.get(code, code), msg))
        if to_socket == self.socket_with_server:
            raise relay.RelayError
        else:
            logger.debug('Closing socket')
            to_socket.close()
            self.input_list.remove(to_socket)
            channel_id = self.id_by_socket[to_socket]
            self.unset_channel(channel_id)
            self.send_remote_cmd(self.socket_with_server,
                                 relay.CHANNEL_CLOSE_CMD, channel_id)
def _remove_watch_for_path(self, path):
    """Drop the inotify watch for *path* and for every recorded subdirectory.

    Removes the watch via ``inotify_rm_watch`` (a failure is only logged),
    detaches *path* from its parent's subdirectory set, recurses into the
    subdirectories recorded for *path*, and finally deletes *path* from the
    three bookkeeping mappings.
    """
    # Must be called with _inotify_fd_lock held.
    logging.debug('_remove_watch_for_path(%r)', path)

    watch_descriptor = self._directory_to_watch_descriptor[path]
    status = _libc.inotify_rm_watch(self._inotify_fd, watch_descriptor)
    if status < 0:
        # If the directory is deleted then the watch will removed automatically
        # and inotify_rm_watch will fail. Just log the error.
        logging.debug('inotify_rm_watch failed for %r: %d [%r]',
                      path,
                      ctypes.get_errno(),
                      errno.errorcode[ctypes.get_errno()])

    parent_dir = os.path.dirname(path)
    if parent_dir in self._directory_to_subdirs:
        self._directory_to_subdirs[parent_dir].remove(path)

    # _directory_to_subdirs must be copied because it is mutated in the
    # recursive call.
    children = frozenset(self._directory_to_subdirs[path])
    for child in children:
        self._remove_watch_for_path(child)

    del self._watch_to_directory[watch_descriptor]
    del self._directory_to_watch_descriptor[path]
    del self._directory_to_subdirs[path]
def clean_up(self):
    """Disable LE scanning and close the socket.

    When ``self.sock`` is already None a notice is printed and nothing else
    happens. Otherwise ``hci_le_set_scan_enable`` is called to turn scanning
    off; a negative result prints the errno name and message. The socket is
    then closed and ``self.sock`` reset to None.
    """
    if self.sock is None:
        print("Double clean_up", flush=True)
        return

    scan_enable = 0        # 1 - turn on; 0 - turn off
    filter_duplicates = 0  # 0-filtering disabled, 1-filter out duplicates
    timeout = 1000
    status = self.bluez.hci_le_set_scan_enable(
        self.sock.fileno(), scan_enable, filter_duplicates, timeout
    )
    if status < 0:
        errnum = get_errno()
        print("{} {}".format(errno.errorcode[errnum], os.strerror(errnum)))

    self.sock.close()
    self.sock = None
def _acceptFailureTest(self, socketErrorNumber):
    """
    Test behavior in the face of an exception from C{accept(2)}.

    On any exception which indicates the platform is unable or unwilling
    to allocate further resources to us, the existing port should remain
    listening, a message should be logged, and the exception should not
    propagate outward from doRead.

    @param socketErrorNumber: The errno to simulate from accept.
    """
    class FakeSocket(object):
        """
        Pretend to be a socket in an overloaded system.
        """
        def accept(self):
            raise socket.error(
                socketErrorNumber, os.strerror(socketErrorNumber))

    port = self.port(0, ServerFactory(), interface='127.0.0.1')
    originalSocket = port.socket
    try:
        port.socket = FakeSocket()
        port.doRead()

        errorName = errno.errorcode[socketErrorNumber]
        expectedMessage = "Could not accept new connection (%s)" % (errorName,)
        # The message is compared against a 1-tuple holding the text.
        wanted = (expectedMessage,)
        found = any(msg.get('message') == wanted for msg in self.messages)
        if not found:
            self.fail("Log event for failed accept not found in "
                      "%r" % (self.messages,))
    finally:
        # Always restore the real socket.
        port.socket = originalSocket
def __str__(self):
    """Render the error as ``nfc.llcp.Error: [<errno name>] <strerror>``."""
    symbol = errno.errorcode[self.errno]
    template = "nfc.llcp.Error: [{0}] {1}"
    return template.format(symbol, self.strerror)
def __str__(self):
    """Render the error with its errno name, message and refusal reason."""
    symbol = errno.errorcode[self.errno]
    template = "nfc.llcp.ConnectRefused: [{0}] {1} with reason {2}"
    return template.format(symbol, self.strerror, self.reason)
def str_errno(self):
    """Return a printable description of the value from ``self.get_errno()``.

    Yields ``'Errno: no errno support'`` when ``get_errno()`` returns None,
    otherwise the OS error message followed by the symbolic errno name.
    """
    code = self.get_errno()
    if code is not None:
        message = os.strerror(code)
        symbol = errno.errorcode[code]
        return 'Errno=%s (%s)' % (message, symbol)
    return 'Errno: no errno support'
def test_using_errorcode(self): # Every key value in errno.errorcode should be on the module. for value in errno.errorcode.values(): self.assertTrue(hasattr(errno, value), 'no %s attr in errno' % value)
def test_attributes_in_errorcode(self): for attribute in errno.__dict__.keys(): if attribute.isupper(): self.assertIn(getattr(errno, attribute), errno.errorcode, 'no %s attr in errno.errorcode' % attribute)
def handle_stream_closed_error(self, error, event):
    """Report a failed outbound connect to the source and raise.

    Generator-style handler: it yields the result of
    ``self.send_event_to_src_conn`` and then raises
    ``DestNotConnectedError`` carrying ``(event.addr, event.port)``.

    The absolute value of ``error.real_error[0]`` is taken as the errno and
    mapped to a key of ``RESP_STATUS``:

    * ENOEXEC / EBADF -> ``ADDRESS_TYPE_NOT_SUPPORTED``
    * ETIMEDOUT       -> ``NETWORK_UNREACHABLE``
    * anything else   -> logged as unhandled, ``GENRAL_FAILURE``

    When ``error.real_error`` is falsy the handler issues a bare ``raise``.
    """
    if not error.real_error:  # pragma: no cover
        # TODO: StreamClosedError without real_error?
        # need check that tornado would occur this situation?
        raise

    err_num = abs(error.real_error[0])
    errorcode = errno.errorcode.get(err_num)
    if errorcode is None:
        errorcode = "undefined(code={0})".format(err_num)

    logger.debug("connect to {0}:{1} with error code {2}".format(
        event.addr, event.port, errorcode))

    # NOTE: if we submit an incorrect address type,
    # the error code will be:
    # - ENOEXEC in macos.
    # - EBADF in linux.
    if err_num in (errno.ENOEXEC, errno.EBADF):
        reason = "ADDRESS_TYPE_NOT_SUPPORTED"
    elif err_num == errno.ETIMEDOUT:
        reason = "NETWORK_UNREACHABLE"
    else:
        logger.error("unhandled error code {0} received".format(errorcode))
        reason = "GENRAL_FAILURE"

    response = Response(RESP_STATUS[reason], event.atyp, event.addr, event.port)
    yield self.send_event_to_src_conn(response, raise_exception=False)
    raise DestNotConnectedError((event.addr, event.port))
def test_using_errorcode(self): # Every key value in errno.errorcode should be on the module. for value in errno.errorcode.itervalues(): self.assertTrue(hasattr(errno, value), 'no %s attr in errno' % value)
def test_attributes_in_errorcode(self): for attribute in errno.__dict__.iterkeys(): if attribute.isupper(): self.assertIn(getattr(errno, attribute), errno.errorcode, 'no %s attr in errno.errorcode' % attribute)
def test_errno_mapping(self): # The OSError constructor maps errnos to subclasses # A sample test for the basic functionality e = OSError(EEXIST, "Bad file descriptor") self.assertIs(type(e), FileExistsError) # Exhaustive testing for errcode, exc in self._map.items(): e = OSError(errcode, "Some message") self.assertIs(type(e), exc) othercodes = set(errno.errorcode) - set(self._map) for errcode in othercodes: e = OSError(errcode, "Some message") self.assertIs(type(e), OSError)
def _IOR(type, nr, struct):
    """Build a function that performs one read-direction ioctl.

    The request number is computed once from ``_IOC_READ``, ``ord(type)``,
    *nr* and ``ctypes.sizeof(struct)``. The returned function takes a file
    descriptor, creates a fresh *struct* instance, passes it by reference
    to ``c.ioctl`` and returns it.

    Raises (from the returned function):
        OSError: when ``ioctl`` returns a negative value; carries the errno
            value and its symbolic name.
    """
    size = ctypes.sizeof(struct)
    request = _IOC(_IOC_READ, ord(type), nr, size)

    def f(fileno):
        result = struct()
        status = c.ioctl(fileno, request, ctypes.byref(result))
        if status < 0:
            # errno is read from the library's ``errno`` symbol.
            err = ctypes.c_int.in_dll(c, 'errno').value
            raise OSError(err, errno.errorcode[err])
        return result

    return f
def _IOR_len(type, nr):
    """Build a function that performs a read ioctl sized by its buffer.

    Unlike a fixed-size request, the request number is computed on each
    call from ``ctypes.sizeof(buffer)``. The returned function takes a file
    descriptor and a caller-supplied ctypes buffer, passes the buffer by
    reference to ``c.ioctl`` and returns it.

    Raises (from the returned function):
        OSError: when ``ioctl`` returns a negative value; carries the errno
            value and its symbolic name.
    """
    type_code = ord(type)

    def f(fileno, buffer):
        size = ctypes.sizeof(buffer)
        request = _IOC(_IOC_READ, type_code, nr, size)
        status = c.ioctl(fileno, request, ctypes.byref(buffer))
        if status < 0:
            # errno is read from the library's ``errno`` symbol.
            err = ctypes.c_int.in_dll(c, 'errno').value
            raise OSError(err, errno.errorcode[err])
        return buffer

    return f
def socket_connect(descriptor, address):
    """
    Attempts to connect to the address, returns the descriptor if it succeeds,
    returns None if it needs to trampoline, and raises any exceptions.
    """
    result = descriptor.connect_ex(address)
    if result in CONNECT_ERR:
        # Connect is still pending; the caller has to wait and retry.
        return None
    if result in CONNECT_SUCCESS:
        return descriptor
    raise socket.error(result, errno.errorcode[result])