我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用socket.SocketType()。
def get_arwsocks(vals): # asocks is all extant sockets socknames = ['cmdsock', 'stsock'] asocks = [ s for s in [ vals.get(n) for n in socknames ] if s is not None ] + \ [ info[1] for info in vals.setdefault('runinfo', []) ] # rsocks is all objects that we could select upon rsocks = [ s for s in asocks if isinstance(s, socket.SocketType) or isinstance(s, SSL.Connection) or (isinstance(s, SocketNB) and s.sock is not None) ] # wsocks is all rsocks that indicate they want to be written wsocks = [ s for s in asocks if isinstance(s, SocketNB) and (s.ssl_write or s.want_write) ] return (asocks, rsocks, wsocks) ### # make command string ###
def select_recv(conn, buff_size, timeout=None): """add timeout for socket.recv() :type conn: socket.SocketType :type buff_size: int :type timeout: float :rtype: Union[bytes, None] """ rlist, _, _ = select.select([conn], [], [], timeout) if not rlist: # timeout raise RuntimeError("recv timeout") buff = conn.recv(buff_size) if not buff: raise RuntimeError("received zero bytes, socket was closed") return buff
def _rd_shutdown(self, conn, once=False): """action when connection should be read-shutdown :type conn: socket.SocketType """ if conn in self.conn_rd: self.conn_rd.remove(conn) try: conn.shutdown(socket.SHUT_RD) except: pass if not once and conn in self.map: # use the `once` param to avoid infinite loop # if a socket is rd_shutdowned, then it's # pair should be wr_shutdown. self._wr_shutdown(self.map[conn], True) if self.map.get(conn) not in self.conn_rd: # if both two connection pair was rd-shutdowned, # this pair sockets are regarded to be completed # so we gonna close them self._terminate(conn)
def add_conn_pair(self, conn1, conn2, callback=None): """ transfer anything between two sockets :type conn1: socket.SocketType :type conn2: socket.SocketType :param callback: callback in connection finish :type callback: Callable """ # mark as readable self.conn_rd.add(conn1) self.conn_rd.add(conn2) # record sockets pairs self.map[conn1] = conn2 self.map[conn2] = conn1 # record callback if callback is not None: self.callbacks[conn1] = callback
def patch_socket(): socket.socket = MockSocket socket.socket = socket.__dict__['socket'] = MockSocket socket._socketobject = socket.__dict__['_socketobject'] = MockSocket socket.SocketType = socket.__dict__['SocketType'] = MockSocket socket.create_connection = socket.__dict__['create_connection'] = create_connection socket.getaddrinfo = socket.__dict__['getaddrinfo'] = getaddrinfo socket.gethostname = socket.__dict__['gethostname'] = lambda: 'localhost' socket.gethostbyname = socket.__dict__['gethostbyname'] = lambda host: '127.0.0.1' socket.inet_aton = socket.__dict__['inet_aton'] = lambda host: '127.0.0.1'
def add_conn_pair(self, conn1, conn2,tmp=None, callback=None): """ transfer anything between two sockets :type conn1: socket.SocketType :type conn2: socket.SocketType :param callback: callback in connection finish :type callback: Callable """ # mark as readable self.conn_rd.add(conn1) self.conn_rd.add(conn2) # record sockets pairs self.map[conn1] = conn2 self.map[conn2] = conn1 # record callback if callback is not None: self.callbacks[conn1] = callback if tmp is not None: conn2.send(tmp) logging.info("tmp send:{}".format(len(tmp)))
def _wr_shutdown(self, conn, once=False): """action when connection should be write-shutdown :type conn: socket.SocketType """ try: conn.shutdown(socket.SHUT_WR) except: pass if not once and conn in self.map: # use the `once` param to avoid infinite loop # pair should be rd_shutdown. # if a socket is wr_shutdowned, then it's self._rd_shutdown(self.map[conn], True)
def _terminate(self, conn): """terminate a sockets pair (two socket) :type conn: socket.SocketType :param conn: any one of the sockets pair """ try_close(conn) # close the first socket server_pool.ServerPool.bridgeRemove += 1 # ------ close and clean the mapped socket, if exist ------ if conn in self.map: _mapped_conn = self.map[conn] try_close(_mapped_conn) if _mapped_conn in self.map: del self.map[_mapped_conn] del self.map[conn] # clean the first socket else: _mapped_conn = None # just a fallback # ------ callback -------- # because we are not sure which socket are assigned to callback, # so we should try both if conn in self.callbacks: try: self.callbacks[conn]() except Exception as e: log.error("traceback error: {}".format(e)) log.debug(traceback.format_exc()) del self.callbacks[conn] elif _mapped_conn and _mapped_conn in self.callbacks: try: self.callbacks[_mapped_conn]() except Exception as e: log.error("traceback error: {}".format(e)) log.debug(traceback.format_exc()) del self.callbacks[_mapped_conn]
def _terminate(self, conn): """terminate a sockets pair (two socket) :type conn: socket.SocketType :param conn: any one of the sockets pair """ try_close(conn) # close the first socket # ------ close and clean the mapped socket, if exist ------ if conn in self.map: _mapped_conn = self.map[conn] try_close(_mapped_conn) if _mapped_conn in self.map: del self.map[_mapped_conn] del self.map[conn] # clean the first socket else: _mapped_conn = None # just a fallback # ------ callback -------- # because we are not sure which socket are assigned to callback, # so we should try both if conn in self.callbacks: try: self.callbacks[conn]() except Exception as e: log.error("traceback error: {}".format(e)) log.debug(traceback.format_exc()) del self.callbacks[conn] elif _mapped_conn and _mapped_conn in self.callbacks: try: self.callbacks[_mapped_conn]() except Exception as e: log.error("traceback error: {}".format(e)) log.debug(traceback.format_exc()) del self.callbacks[_mapped_conn]
def test_SocketType_is_socketobject(self): import _socket self.assertTrue(socket.SocketType is _socket.socket) s = socket.socket() self.assertIsInstance(s, socket.SocketType) s.close()
def disable(cls): cls._is_enabled = False socket.socket = old_socket socket.SocketType = old_SocketType socket._socketobject = old_socket socket.create_connection = old_create_connection socket.gethostname = old_gethostname socket.gethostbyname = old_gethostbyname socket.getaddrinfo = old_getaddrinfo socket.__dict__['socket'] = old_socket socket.__dict__['_socketobject'] = old_socket socket.__dict__['SocketType'] = old_SocketType socket.__dict__['create_connection'] = old_create_connection socket.__dict__['gethostname'] = old_gethostname socket.__dict__['gethostbyname'] = old_gethostbyname socket.__dict__['getaddrinfo'] = old_getaddrinfo if socks: socks.socksocket = old_socksocket socks.__dict__['socksocket'] = old_socksocket if ssl: ssl.wrap_socket = old_ssl_wrap_socket ssl.SSLSocket = old_sslsocket ssl.__dict__['wrap_socket'] = old_ssl_wrap_socket ssl.__dict__['SSLSocket'] = old_sslsocket if not PY3: ssl.sslwrap_simple = old_sslwrap_simple ssl.__dict__['sslwrap_simple'] = old_sslwrap_simple if pyopenssl_override: # Replace PyOpenSSL Monkeypatching inject_into_urllib3()
def test_socktype_bad_python_version_regression(): """ Some versions of python accidentally internally shadowed the SockType variable, so it was no longer the socket object but and int Enum representing the socket type e.g. AF_INET. Make sure we don't patch SockType in these cases https://bugs.python.org/issue20386 """ import socket someObject = object() with patch('socket.SocketType', someObject): HTTPretty.enable() expect(socket.SocketType).to.equal(someObject) HTTPretty.disable()
def test_socktype_good_python_version(): import socket with patch('socket.SocketType', socket.socket): HTTPretty.enable() expect(socket.SocketType).to.equal(socket.socket) HTTPretty.disable()
def enable(cls): cls._is_enabled = True # Some versions of python internally shadowed the # SocketType variable incorrectly https://bugs.python.org/issue20386 bad_socket_shadow = (socket.socket != socket.SocketType) socket.socket = fakesock.socket socket._socketobject = fakesock.socket if not bad_socket_shadow: socket.SocketType = fakesock.socket socket.create_connection = create_fake_connection socket.gethostname = fake_gethostname socket.gethostbyname = fake_gethostbyname socket.getaddrinfo = fake_getaddrinfo socket.__dict__['socket'] = fakesock.socket socket.__dict__['_socketobject'] = fakesock.socket if not bad_socket_shadow: socket.__dict__['SocketType'] = fakesock.socket socket.__dict__['create_connection'] = create_fake_connection socket.__dict__['gethostname'] = fake_gethostname socket.__dict__['gethostbyname'] = fake_gethostbyname socket.__dict__['getaddrinfo'] = fake_getaddrinfo if socks: socks.socksocket = fakesock.socket socks.__dict__['socksocket'] = fakesock.socket if ssl: ssl.wrap_socket = fake_wrap_socket ssl.SSLSocket = FakeSSLSocket ssl.__dict__['wrap_socket'] = fake_wrap_socket ssl.__dict__['SSLSocket'] = FakeSSLSocket if not PY3: ssl.sslwrap_simple = fake_wrap_socket ssl.__dict__['sslwrap_simple'] = fake_wrap_socket if pyopenssl_override: # Remove PyOpenSSL monkeypatch - use the default implementation extract_from_urllib3()