我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 errno.EOPNOTSUPP。
def queryWirelessDevice(self, iface):
    """Report whether ``iface`` responds like a wireless interface.

    The check asks ``pythonwifi`` for the access point address of the
    interface.

    Returns:
        False when ``pythonwifi`` (or ``errno``) cannot be imported, or when
        the query fails with EOPNOTSUPP, ENODEV or EPERM.  True when the
        query succeeds, and also when it fails with any other ``IOError``
        (that error is printed first).
    """
    try:
        from pythonwifi.iwlibs import Wireless
        import errno
    except ImportError:
        return False

    try:
        # Only the success or failure of the query matters, not its result.
        Wireless(iface).getAPaddr()
    except IOError as exc:
        # "except ... as" works on Python 2.6+ and Python 3, unlike the
        # tuple-unpacking form that only Python 2 accepts.
        if exc.errno in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
            return False
        # Single pre-formatted string so the output is identical whether
        # print is a statement (Python 2) or a function (Python 3).
        print("error:  %s %s" % (exc.errno, exc.strerror))
        return True
    return True
def test_listen(self, llc, ldl, dlc, bind):
    """Check the errors ``llc.listen`` raises and the resulting LISTEN state.

    The first two calls must fail with ``nfc.llcp.Error`` carrying a
    specific errno, the next two with ``TypeError`` / ``ValueError`` for a
    bad backlog.  Finally a valid call (optionally preceded by ``llc.bind``
    when ``bind`` is true) must leave ``dlc.state.LISTEN`` set.
    """
    errno_cases = (
        (object(), errno.ENOTSOCK),
        (ldl, errno.EOPNOTSUPP),
    )
    for candidate, expected_errno in errno_cases:
        with pytest.raises(nfc.llcp.Error) as caught:
            llc.listen(candidate, 0)
        assert caught.value.errno == expected_errno

    backlog_cases = (
        (TypeError, 0.1, "backlog must be int type"),
        (ValueError, -1, "backlog can not be negative"),
    )
    for exc_type, backlog, message in backlog_cases:
        with pytest.raises(exc_type) as caught:
            llc.listen(dlc, backlog)
        assert str(caught.value) == message

    if bind:
        llc.bind(dlc)
    llc.listen(dlc, 0)
    assert dlc.state.LISTEN is True
def test_accept_connect(self, llc, ldl, dlc, peer_miu, send_miu):
    """Check ``llc.accept`` error cases and one successful accept.

    Three calls must fail with ``nfc.llcp.Error`` and a specific errno.
    Then a Connect PDU is dispatched from a timer thread while ``dlc`` is
    bound and listening, and the accepted socket is inspected.
    """
    errno_cases = (
        (object(), errno.ENOTSOCK),
        (ldl, errno.EOPNOTSUPP),
        (dlc, errno.EINVAL),
    )
    for candidate, expected_errno in errno_cases:
        with pytest.raises(nfc.llcp.Error) as caught:
            llc.accept(candidate)
        assert caught.value.errno == expected_errno

    connect_pdu = nfc.llcp.pdu.Connect(4, 32, peer_miu)
    # Deliver the PDU shortly after starting, from a separate thread.
    dispatcher = threading.Timer(0.01, llc.dispatch, (connect_pdu,))
    dispatcher.start()

    llc.bind(dlc, b'urn:nfc:sn:snep')
    llc.listen(dlc, 0)
    accepted = llc.accept(dlc)

    assert isinstance(accepted, nfc.llcp.tco.DataLinkConnection)
    assert llc.getsockopt(accepted, nfc.llcp.SO_SNDMIU) == send_miu
    assert llc.getpeername(accepted) == 32
    assert llc.getsockname(accepted) == 4
def test_copystat_handles_harmless_chflags_errors(self):
    """Check which ``os.chflags`` errors ``shutil.copystat`` tolerates.

    ``os.chflags`` is temporarily replaced by a stand-in that raises an
    ``OSError`` with a chosen errno.  The original function is restored
    when the test ends, whatever the outcome.
    """
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        self.write_file(path, 'xxx')

    def make_chflags_raiser(err):
        failure = OSError()

        def _chflags_raiser(path, flags):
            failure.errno = err
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        for harmless in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(harmless)
            shutil.copystat(src, dst)
        # any other errno has to propagate out of copystat
        unexpected = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unexpected)
        with self.assertRaises(OSError):
            shutil.copystat(src, dst)
    finally:
        os.chflags = saved_chflags
def _test_chflags_regular_file(self, chflags_func, target_file):
    """Set UF_IMMUTABLE on ``target_file`` via ``chflags_func`` and verify it.

    Args:
        chflags_func: callable invoked as ``chflags_func(path, flags)``.
        target_file: path of the file whose flags are changed.

    The test is skipped when setting the flag fails with EOPNOTSUPP.  The
    original flags are always restored with ``posix.chflags`` afterwards.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    immutable_flags = st.st_flags | stat.UF_IMMUTABLE
    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    try:
        chflags_func(target_file, immutable_flags)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(immutable_flags, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except IOError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open succeeded after all; close it so the handle is not
            # leaked for the rest of the test run.
            fd.close()
    finally:
        posix.chflags(target_file, st.st_flags)
def test_copystat_handles_harmless_chflags_errors(self):
    """Check which ``os.chflags`` errors ``shutil.copystat`` tolerates.

    ``os.chflags`` is temporarily replaced by a stand-in that raises an
    ``OSError`` with a chosen errno.  The original function is restored
    when the test ends, whatever the outcome.
    """
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        write_file(path, 'xxx')

    def make_chflags_raiser(err):
        failure = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            failure.errno = err
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        for harmless in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(harmless)
            shutil.copystat(src, dst)
        # any other errno has to propagate out of copystat
        unexpected = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unexpected)
        with self.assertRaises(OSError):
            shutil.copystat(src, dst)
    finally:
        os.chflags = saved_chflags
def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
    """Set UF_IMMUTABLE on ``target_file`` via ``chflags_func`` and verify it.

    Args:
        chflags_func: callable invoked as
            ``chflags_func(path, flags, **kwargs)``.
        target_file: path of the file whose flags are changed.
        **kwargs: extra keyword arguments forwarded to ``chflags_func``.

    The test is skipped when setting the flag fails with EOPNOTSUPP.  The
    original flags are always restored with ``posix.chflags`` afterwards.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    flags = st.st_flags | stat.UF_IMMUTABLE
    try:
        chflags_func(target_file, flags, **kwargs)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except IOError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open succeeded after all; close it so the handle is not
            # leaked for the rest of the test run.
            fd.close()
    finally:
        posix.chflags(target_file, st.st_flags)
def queryWirelessDevice(self, iface):
    """Report whether ``iface`` responds like a wireless interface.

    The check asks ``pythonwifi`` for the access point address of the
    interface.

    Returns:
        False when ``pythonwifi`` (or ``errno``) cannot be imported, or when
        the query fails with EOPNOTSUPP, ENODEV or EPERM.  True when the
        query succeeds, and also when it fails with any other ``IOError``
        (that error is printed first).
    """
    try:
        from pythonwifi.iwlibs import Wireless
        import errno
    except ImportError:
        return False

    try:
        # Only the success or failure of the query matters, not its result.
        Wireless(iface).getAPaddr()
    except IOError as exc:
        # "except ... as" works on Python 2.6+ and Python 3, unlike the
        # tuple-unpacking form that only Python 2 accepts.
        if exc.errno in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
            return False
        # Single pre-formatted string so the output is identical whether
        # print is a statement (Python 2) or a function (Python 3).
        print("[AdapterSetupConfiguration] error:  %s %s"
              % (exc.errno, exc.strerror))
        return True
    return True
def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
    """Set UF_IMMUTABLE on ``target_file`` via ``chflags_func`` and verify it.

    Args:
        chflags_func: callable invoked as
            ``chflags_func(path, flags, **kwargs)``.
        target_file: path of the file whose flags are changed.
        **kwargs: extra keyword arguments forwarded to ``chflags_func``.

    The test is skipped when setting the flag fails with EOPNOTSUPP.  The
    original flags are always restored with ``posix.chflags`` afterwards.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    flags = st.st_flags | stat.UF_IMMUTABLE
    try:
        chflags_func(target_file, flags, **kwargs)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except OSError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open succeeded after all; close it so the handle is not
            # leaked for the rest of the test run.
            fd.close()
    finally:
        posix.chflags(target_file, st.st_flags)
def set_hardlinks(self, testdir):
    """Probe ``testdir`` and record whether hard links work there.

    A file is created in ``testdir`` and hard linked from a new ``hl``
    subdirectory.  ``self.hardlinks`` is set to 1 when the link call
    succeeds and both paths report the same inode, otherwise to None
    (with a warning logged unless ``Globals.preserve_hardlinks`` is 0).
    """
    link_source = testdir.append("hardlinked_file1")
    link_dir = testdir.append("hl")
    link_dir.mkdir()
    link_dest = link_dir.append("hardlinked_file2")
    link_source.touch()

    try:
        link_dest.hardlink(link_source.path)
        inodes_differ = link_source.getinode() != link_dest.getinode()
    except (IOError, OSError, AttributeError):
        supported = False
    else:
        # A "link" that yields a different inode is no real hard link.
        supported = not inodes_differ

    if supported:
        self.hardlinks = 1
        return

    if Globals.preserve_hardlinks != 0:
        log.Log("Warning: hard linking not supported by filesystem at %s"
                % (self.root_rp.path,), 3)
    self.hardlinks = None
def clear_rp(self, rp):
    """Delete all the extended attributes in rpath

    Attributes that cannot be removed because of EACCES are skipped with a
    low-priority warning.  EOPNOTSUPP or EPERM from the outer operation is
    treated as "nothing to clear"; ENOENT is logged and ignored.  Any other
    IOError propagates to the caller.
    """
    try:
        for name in rp.conn.xattr.listxattr(encode(rp.path), rp.issym()):
            try:
                rp.conn.xattr.removexattr(encode(rp.path), name, rp.issym())
            except IOError as exc:
                # "except ... as" and exc.errno work on Python 2.6+ and
                # Python 3; "except X, e" and exc[0] are Python 2 only.
                if exc.errno != errno.EACCES:
                    raise
                # SELinux attributes cannot be removed, and we don't want
                # to bail out or be too noisy at low log levels.
                log.Log("Warning: unable to remove xattr %s from %s"
                        % (name, repr(rp.path)), 7)
    except IOError as exc:
        if exc.errno in (errno.EOPNOTSUPP, errno.EPERM):
            return  # if not supported, consider empty
        if exc.errno == errno.ENOENT:  # path is bad
            log.Log("Warning: unable to clear xattrs on %s: %s"
                    % (repr(rp.path), exc), 3)
            return
        raise
def write_to_rp(self, rp):
    """Write extended attributes to rpath rp

    Existing attributes are cleared first via ``self.clear_rp``.  Each entry
    of ``self.attr_dict`` is then set; a failure with EOPNOTSUPP, EPERM,
    EACCES, ENOENT or EINVAL is logged and skipped, any other IOError
    propagates to the caller.
    """
    self.clear_rp(rp)
    tolerated = (errno.EOPNOTSUPP, errno.EPERM, errno.EACCES,
                 errno.ENOENT, errno.EINVAL)
    # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    for name, value in self.attr_dict.items():
        try:
            rp.conn.xattr.setxattr(encode(rp.path), name, value, 0,
                                   rp.issym())
        except IOError as exc:
            # Mac and Linux attributes have different namespaces, so
            # fail gracefully if can't call setxattr
            if exc.errno not in tolerated:
                raise
            log.Log("Warning: unable to write xattr %s to %s"
                    % (name, repr(rp.path)), 6)
def set_rp_acl(rp, entry_list = None, default_entry_list = None, map_names = 1):
    """Apply the ACL described by entry_list to rp.  rp should be local

    An empty ``posix1e.ACL()`` is applied when ``entry_list`` is empty or
    None.  If applying the access ACL fails with EOPNOTSUPP a warning is
    logged and the function returns without touching the default ACL; any
    other IOError propagates.  For directories the default ACL is set the
    same way from ``default_entry_list``.
    """
    assert rp.conn is Globals.local_connection
    if entry_list:
        acl = list_to_acl(entry_list, map_names)
    else:
        acl = posix1e.ACL()

    try:
        acl.applyto(encode(rp.path))
    except IOError as exc:
        # "except ... as" and exc.errno work on Python 2.6+ and Python 3;
        # "except X, e" and exc[0] are Python 2 only.
        if exc.errno != errno.EOPNOTSUPP:
            raise
        log.Log("Warning: unable to set ACL on %s: %s"
                % (repr(rp.path), exc), 4)
        return

    if rp.isdir():
        if default_entry_list:
            def_acl = list_to_acl(default_entry_list, map_names)
        else:
            def_acl = posix1e.ACL()
        def_acl.applyto(encode(rp.path), posix1e.ACL_TYPE_DEFAULT)
def do_fallocate(filename: str, size: int):
    """Call ``fallocate`` on ``filename`` for ``size`` bytes from offset 0.

    The file is opened read/write, ``fallocate`` is invoked with
    ``FALLOC_FL_KEEP_SIZE``, and the descriptor is always closed.  An
    ``OSError`` with errno EOPNOTSUPP is downgraded to a warning; any other
    ``OSError`` propagates to the caller.
    """
    log.debug('Fallocating %s for size %d.', filename, size)

    descriptor = os.open(filename, os.O_RDWR)
    try:
        # os.posix_fallocate is deliberately not used here because it:
        # 1. does not support FALLOC_FL_KEEP_SIZE
        # 2. has bug: http://bugs.python.org/issue31106
        # 3. will emulate instead of raising of error if operation is not
        #    supported (see libc docs on posix_fallocate)
        fallocate(descriptor, FALLOC_FL_KEEP_SIZE, 0, size)
    except OSError as exc:
        if exc.errno == errno.EOPNOTSUPP:
            log.warning('Fallocate is not supported on your FS. Skipped.')
        else:
            raise
    finally:
        os.close(descriptor)
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst

    Each piece is copied only when the platform provides the matching
    ``os`` function.  An ``OSError`` from ``os.chflags`` is ignored when its
    errno is EOPNOTSUPP or ENOTSUP (whichever of the two the platform
    defines); any other error propagates.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno name may be missing on a given platform, so look
            # both up defensively.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst

    Each piece is copied only when the platform provides the matching
    ``os`` function.  An ``OSError`` from ``os.chflags`` is ignored when its
    errno is EOPNOTSUPP or ENOTSUP (whichever of the two the platform
    defines); any other error propagates.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        # "except ... as" is accepted by Python 2.6+ and Python 3, whereas
        # the comma form is a syntax error on Python 3.
        except OSError as why:
            # Either errno name may be missing on a given platform, so look
            # both up defensively.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
def bind(self, *pos, **kw):
    """
    Bind the socket, setting up the proxy association for UDP sockets.

    Without a proxy type, or for a non-datagram socket, the call is passed
    straight to ``_orig_socket.bind``.  Otherwise the socket is bound
    locally, a connection to the proxy is opened, a SOCKS5 request with
    command byte 0x03 is sent over it, and this socket is connected to the
    returned relay port on the proxy host.

    Raises ``socket.error`` with EINVAL if a proxy connection already
    exists, or with EOPNOTSUPP if the proxy type is not SOCKS5.
    """
    proxy_type, _addr, _port, _rdns, _username, _password = self.proxy
    if not (proxy_type and self.type == socket.SOCK_DGRAM):
        return _orig_socket.bind(self, *pos, **kw)

    if self._proxyconn:
        raise socket.error(EINVAL, "Socket already bound to an address")
    if proxy_type != SOCKS5:
        raise socket.error(EOPNOTSUPP,
                           "UDP only supported by SOCKS5 proxy type")

    super(socksocket, self).bind(*pos, **kw)

    # Need to specify actual local port because
    # some relays drop packets if a port of zero is specified.
    # Avoid specifying host address in case of NAT though.
    _local_host, local_port = self.getsockname()
    association_addr = ("0", local_port)

    self._proxyconn = _orig_socket()
    proxy_endpoint = self._proxy_addr()
    self._proxyconn.connect(proxy_endpoint)

    udp_associate_cmd = b"\x03"
    _reply, relay_endpoint = self._SOCKS5_request(
        self._proxyconn, udp_associate_cmd, association_addr)

    # The relay is most likely on the same host as the SOCKS proxy,
    # but some proxies return a private IP address (10.x.y.z)
    proxy_host, _proxy_port = proxy_endpoint
    _relay_host, relay_port = relay_endpoint
    super(socksocket, self).connect((proxy_host, relay_port))
    super(socksocket, self).settimeout(self._timeout)
    self.proxy_sockname = ("0.0.0.0", 0)  # Unknown
def bind(self, *pos, **kw):
    """
    Bind the socket, setting up the proxy association for UDP sockets.

    Without a proxy type, or for a non-datagram socket, the call is passed
    straight to ``_orig_socket.bind``.  Otherwise the socket is bound
    locally, a connection to the proxy is opened, a SOCKS5 request with
    command byte 0x03 is sent over it, and this socket is connected to the
    returned relay port on the proxy host.

    Raises ``socket.error`` with EINVAL if a proxy connection already
    exists, or with EOPNOTSUPP if the proxy type is not SOCKS5.
    """
    proxy_type, _addr, _port, _rdns, _username, _password = self.proxy
    if not (proxy_type and self.type == socket.SOCK_DGRAM):
        return _orig_socket.bind(self, *pos, **kw)

    if self._proxyconn:
        raise socket.error(EINVAL, "Socket already bound to an address")
    if proxy_type != SOCKS5:
        raise socket.error(EOPNOTSUPP,
                           "UDP only supported by SOCKS5 proxy type")

    _BaseSocket.bind(self, *pos, **kw)

    # Need to specify actual local port because
    # some relays drop packets if a port of zero is specified.
    # Avoid specifying host address in case of NAT though.
    _local_host, local_port = self.getsockname()
    association_addr = ("0", local_port)

    self._proxyconn = _orig_socket()
    proxy_endpoint = self._proxy_addr()
    self._proxyconn.connect(proxy_endpoint)

    udp_associate_cmd = b"\x03"
    _reply, relay_endpoint = self._socks5_request(
        self._proxyconn, udp_associate_cmd, association_addr)

    # The relay is most likely on the same host as the SOCKS proxy,
    # but some proxies return a private IP address (10.x.y.z)
    proxy_host, _proxy_port = proxy_endpoint
    _relay_host, relay_port = relay_endpoint
    _BaseSocket.connect(self, (proxy_host, relay_port))
    self.proxy_sockname = ("0.0.0.0", 0)  # Unknown
def bind(self, *pos, **kw):
    """
    Bind the socket, setting up the proxy association for UDP sockets.

    Without a proxy type, or for a non-datagram socket, the call is passed
    straight to ``_orig_socket.bind``.  Otherwise the socket is bound
    locally, a connection to the proxy is opened, a SOCKS5 request with
    command byte 0x03 is sent over it, and this socket is connected to the
    returned relay port on the proxy host.

    Raises ``socket.error`` with EINVAL if a proxy connection already
    exists, or with EOPNOTSUPP if the proxy type is not SOCKS5.
    """
    proxy_type, _addr, _port, _rdns, _username, _password = self.proxy
    if not (proxy_type and self.type == socket.SOCK_DGRAM):
        return _orig_socket.bind(self, *pos, **kw)

    if self._proxyconn:
        raise socket.error(EINVAL, "Socket already bound to an address")
    if proxy_type != SOCKS5:
        raise socket.error(EOPNOTSUPP,
                           "UDP only supported by SOCKS5 proxy type")

    _BaseSocket.bind(self, *pos, **kw)

    # Need to specify actual local port because
    # some relays drop packets if a port of zero is specified.
    # Avoid specifying host address in case of NAT though.
    _local_host, local_port = self.getsockname()
    association_addr = ("0", local_port)

    self._proxyconn = _orig_socket()
    proxy_endpoint = self._proxy_addr()
    self._proxyconn.connect(proxy_endpoint)

    udp_associate_cmd = b"\x03"
    _reply, relay_endpoint = self._SOCKS5_request(
        self._proxyconn, udp_associate_cmd, association_addr)

    # The relay is most likely on the same host as the SOCKS proxy,
    # but some proxies return a private IP address (10.x.y.z)
    proxy_host, _proxy_port = proxy_endpoint
    _relay_host, relay_port = relay_endpoint
    _BaseSocket.connect(self, (proxy_host, relay_port))
    self.proxy_sockname = ("0.0.0.0", 0)  # Unknown