我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.ssl()。
def __init__(self, conn, host, port):
    """Create the transport socket and layer legacy SSL on top of it.

    ``conn`` supplies ``ssl_keyfile``, ``ssl_certfile`` and ``ssl_trustfile``;
    ``conn``, ``host`` and ``port`` are forwarded unchanged to
    ``SocketTransport.__init__``.

    Raises ``socket.error`` when ``conn.ssl_trustfile`` is set, because this
    code path cannot validate the peer certificate.
    """
    SocketTransport.__init__(self, conn, host, port)

    # Bug (QPID-4337): this is the "old" version of python SSL.
    # The private key is required. If a certificate is given, but no
    # keyfile, assume the key is contained in the certificate.
    key_path = conn.ssl_keyfile
    cert_path = conn.ssl_certfile
    if not key_path and cert_path:
        key_path = cert_path

    # This version of SSL does NOT perform certificate validation. If the
    # connection has been configured with CA certs (via ssl_trustfile), then
    # the application expects the certificate to be validated against the
    # supplied CA certs. Since this version cannot validate, the peer cannot
    # be trusted.
    if conn.ssl_trustfile:
        raise socket.error("This version of Python does not support verification of the peer's certificate.")

    self.ssl = ssl(self.socket, keyfile=key_path, certfile=cert_path)
    self.socket.setblocking(1)
def _ssl_wrap_socket(sock, key_file, cert_file, disable_validation,
                     ca_certs, ssl_version, hostname):
    """Wrap ``sock`` in an SSL/TLS layer and return the wrapped socket.

    Args:
        sock: the plain socket to wrap.
        key_file: private key path, passed to ``load_cert_chain`` /
            ``ssl.wrap_socket``.
        cert_file: client certificate path; when falsy no client
            certificate is loaded on the ``SSLContext`` path.
        disable_validation: when true, the peer certificate is not
            required (``ssl.CERT_NONE``); otherwise ``ssl.CERT_REQUIRED``.
        ca_certs: CA bundle path used for verification, if given.
        ssl_version: protocol constant; ``None`` selects
            ``ssl.PROTOCOL_SSLv23``.
        hostname: passed as ``server_hostname`` on the ``SSLContext`` path.

    Returns:
        The SSL-wrapped socket.
    """
    cert_reqs = ssl.CERT_NONE if disable_validation else ssl.CERT_REQUIRED
    if ssl_version is None:
        ssl_version = ssl.PROTOCOL_SSLv23

    if not hasattr(ssl, 'SSLContext'):
        # Interpreters older than Python 2.7.9 have no SSLContext; fall back
        # to the module-level helper (no hostname checking on this path).
        return ssl.wrap_socket(sock, keyfile=key_file, certfile=cert_file,
                               cert_reqs=cert_reqs, ca_certs=ca_certs,
                               ssl_version=ssl_version)

    context = ssl.SSLContext(ssl_version)
    verify_peer = cert_reqs != ssl.CERT_NONE
    if not verify_peer:
        # Some protocols (e.g. PROTOCOL_TLS_CLIENT) create the context with
        # check_hostname already enabled; verify_mode cannot be lowered to
        # CERT_NONE until hostname checking has been switched off.
        context.check_hostname = False
    context.verify_mode = cert_reqs
    context.check_hostname = verify_peer
    if cert_file:
        context.load_cert_chain(cert_file, key_file)
    if ca_certs:
        context.load_verify_locations(ca_certs)
    return context.wrap_socket(sock, server_hostname=hostname)
def test_ssl_close(self):
    """Server side sees EOF (or ZeroReturnError) after the client shuts down."""

    def serve(server_sock):
        conn, _peer = server_sock.accept()
        # First read consumes the single byte the client sends.
        conn.recv(8192)
        try:
            # After the client's shutdown the next read must be empty.
            self.assertEqual(b'', conn.recv(8192))
        except greenio.SSL.ZeroReturnError:
            # A clean SSL shutdown reported as an exception is accepted too.
            pass

    listener = listen_ssl_socket()
    server_coro = eventlet.spawn(serve, listener)

    plain_client = eventlet.connect(listener.getsockname())
    secure_client = ssl.wrap_socket(plain_client)
    secure_client.sendall(b'X')
    greenio.shutdown_safe(secure_client)
    secure_client.close()

    server_coro.wait()
def test_ssl_unwrap(self):
    """Exchange data before, during and after an SSL session on one socket."""

    def serve():
        conn, _peer = listener.accept()
        # Plaintext phase.
        self.assertEqual(conn.recv(6), b'before')
        # Upgrade to SSL as the server side.
        secure_conn = ssl.wrap_socket(conn, tests.private_key_file,
                                      tests.certificate_file,
                                      server_side=True)
        secure_conn.do_handshake()
        self.assertEqual(secure_conn.recv(6), b'during')
        # Drop back to the socket returned by unwrap().
        plain_again = secure_conn.unwrap()
        self.assertEqual(plain_again.recv(5), b'after')
        plain_again.close()

    listener = eventlet.listen(('127.0.0.1', 0))
    server_coro = eventlet.spawn(serve)

    plain_client = eventlet.connect(listener.getsockname())
    plain_client.sendall(b'before')

    secure_client = ssl.wrap_socket(plain_client)
    secure_client.do_handshake()
    secure_client.sendall(b'during')

    unwrapped_client = secure_client.unwrap()
    unwrapped_client.sendall(b'after')

    server_coro.wait()
def DefaultOptions():
    """Return a fresh dict holding the default value of every option."""
    return dict(
        host='localhost',           # Host to attack
        port=80,                    # Port to connect to
        ssl=False,                  # Use SSL connections
        attacklimit=500,            # Total number of times to attack (0 for unlimited)
        connectionlimit=500,        # Total number of concurrent connections (0 for unlimited)
        threadlimit=50,             # Total number of threads (0 for unlimited)
        connectionspeed=1,          # Connection speed in bytes/second
        timebetweenthreads=1,       # Time delay between starting threads
        timebetweenconnections=1,   # Time delay between starting connections
        quitimmediately=False,      # Close connections immediately after completing request
        socksversion='',            # Enable SOCKS proxy, set to SOCKS4, SOCKS5, or HTTP
        sockshost='',               # SOCKS host
        socksport=0,                # SOCKS port
        socksuser='',               # SOCKS username
        sockspass='',               # SOCKS password
        request='',                 # The main body of the attack
    )
def _ssl_wrap_socket(sock, key_file, cert_file, disable_validation, ca_certs):
    """Wrap ``sock`` with ``ssl.wrap_socket`` and return the result.

    The peer certificate is required (``ssl.CERT_REQUIRED``) unless
    ``disable_validation`` is true, in which case ``ssl.CERT_NONE`` is used.
    ``key_file``, ``cert_file`` and ``ca_certs`` are passed through as-is.
    """
    cert_reqs = ssl.CERT_NONE if disable_validation else ssl.CERT_REQUIRED
    # We should be specifying SSL version 3 or TLS v1, but the ssl module
    # doesn't expose the necessary knobs. So we need to go with the default
    # of SSLv23.
    wrap_options = dict(keyfile=key_file, certfile=cert_file,
                        cert_reqs=cert_reqs, ca_certs=ca_certs)
    return ssl.wrap_socket(sock, **wrap_options)
def _ssl_wrap_socket(sock, key_file, cert_file, disable_validation, ca_certs):
    """Wrap ``sock`` using the legacy ``socket.ssl`` call.

    This path cannot validate certificates, so it only proceeds when
    ``disable_validation`` is true; ``ca_certs`` is not used.

    Returns an ``httplib.FakeSocket`` built from ``sock`` and the SSL object.
    Raises ``CertificateValidationUnsupported`` when validation is requested.
    """
    if disable_validation:
        legacy_ssl = socket.ssl(sock, key_file, cert_file)
        return httplib.FakeSocket(sock, legacy_ssl)

    raise CertificateValidationUnsupported(
        "SSL certificate validation is not supported without "
        "the ssl module installed. To avoid this error, install "
        "the ssl module, or explicity disable validation.")
def ssl_wrap_socket(self):
    """Replace ``self.sock`` with an SSL-wrapped socket.

    Reads ``self.ca_certs``, ``self.ssl_version``, ``self.keyfile`` and
    ``self.certfile``; sets ``self.sock`` and ``self.read_fd``. When
    ``self.cert_verify_cb`` is set it is called with the peer certificate
    and ``self.host``; a truthy return value is raised as ``ssl.SSLError``.

    Raises ``socket.sslerror`` when the ``ssl`` module is unavailable or
    ``self.ssl_version`` is not one of "tls1", "ssl2", "ssl3", "ssl23" or
    ``None``.
    """
    # Allow sending of keep-alive messages - seems to prevent some servers
    # from closing SSL, leading to deadlocks.
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    # Guard only the import, so an unrelated ImportError raised further down
    # is not misreported as "no ssl module".
    try:
        import ssl
    except ImportError:
        # No ssl module, and socket.ssl has no fileno(), and does not allow
        # certificate verification
        raise socket.sslerror("imaplib2 SSL mode does not work without ssl module")

    if self.ca_certs is not None:
        cert_reqs = ssl.CERT_REQUIRED
    else:
        cert_reqs = ssl.CERT_NONE

    if self.ssl_version == "tls1":
        ssl_version = ssl.PROTOCOL_TLSv1
    elif self.ssl_version == "ssl2":
        ssl_version = ssl.PROTOCOL_SSLv2
    elif self.ssl_version == "ssl3":
        ssl_version = ssl.PROTOCOL_SSLv3
    elif self.ssl_version == "ssl23" or self.ssl_version is None:
        ssl_version = ssl.PROTOCOL_SSLv23
    else:
        # Interpolate the value; previously the format string and the value
        # were passed as two separate exception arguments.
        raise socket.sslerror(
            "Invalid SSL version requested: %s" % (self.ssl_version,))

    self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile,
                                ca_certs=self.ca_certs, cert_reqs=cert_reqs,
                                ssl_version=ssl_version)
    self.read_fd = self.sock.fileno()

    if self.cert_verify_cb is not None:
        cert_err = self.cert_verify_cb(self.sock.getpeercert(), self.host)
        if cert_err:
            raise ssl.SSLError(cert_err)
def open(self, host=None, port=None):
    """open(host=None, port=None)
    Establish a secure connection to "host:port".

    ``host`` falls back to '' and ``port`` to ``IMAP4_SSL_PORT`` via
    ``self._choose_nonull_or_dflt``. The resulting socket is stored in
    ``self.sock`` and then upgraded by ``self.ssl_wrap_socket()``."""
    pick = self._choose_nonull_or_dflt
    self.host = pick('', host)
    self.port = pick(IMAP4_SSL_PORT, port)

    # Plain socket first, then the SSL upgrade.
    self.sock = self.open_socket()
    self.ssl_wrap_socket()
def ssl(self):
    """ssl = ssl()
    Return the object stored in ``self.sock``, which serves as the ssl
    instance for talking to the IMAP4 server."""
    return self.sock
def connect(self):
    """Connect through ``ProxyHTTPConnection`` and add SSL to the socket.

    Reads ``self.key_file`` and ``self.cert_file``; replaces ``self.sock``
    with an ``httplib.FakeSocket`` wrapping the connected socket.
    """
    ProxyHTTPConnection.connect(self)
    # Make the sock ssl-aware
    plain_sock = self.sock
    ssl_layer = socket.ssl(plain_sock, self.key_file, self.cert_file)
    self.sock = httplib.FakeSocket(plain_sock, ssl_layer)
def connect(self):
    """Connect with a ``TimeoutSocket`` and layer legacy SSL on top of it.

    Reads ``self.timeout``, ``self.host``, ``self.port``, ``self.key_file``
    and ``self.cert_file``; stores an ``httplib.FakeSocket`` in ``self.sock``.
    """
    timeout_sock = TimeoutSocket(self.timeout)
    timeout_sock.connect((self.host, self.port))

    # Prefer the inner ``_sock`` attribute when the wrapped socket has one;
    # otherwise use the wrapped socket itself.
    wrapped = timeout_sock.sock
    raw_sock = getattr(wrapped, '_sock', timeout_sock.sock)

    ssl_layer = socket.ssl(raw_sock, self.key_file, self.cert_file)
    self.sock = httplib.FakeSocket(timeout_sock, ssl_layer)
def _ssl_wrap_socket_unsupported(sock, key_file, cert_file, disable_validation,
                                 ca_certs, ssl_version, hostname):
    """Wrap ``sock`` using the legacy ``socket.ssl`` call.

    This path cannot validate certificates, so it only proceeds when
    ``disable_validation`` is true; ``ca_certs``, ``ssl_version`` and
    ``hostname`` are not used.

    Returns an ``httplib.FakeSocket`` built from ``sock`` and the SSL object.
    Raises ``CertificateValidationUnsupported`` when validation is requested.
    """
    if disable_validation:
        legacy_ssl = socket.ssl(sock, key_file, cert_file)
        return httplib.FakeSocket(sock, legacy_ssl)

    raise CertificateValidationUnsupported(
        "SSL certificate validation is not supported without "
        "the ssl module installed. To avoid this error, install "
        "the ssl module, or explicity disable validation.")
def request(self, operation, url, data=None, headers=None, url_params=None):
    """Build the final URL and headers, then perform the request.

    A string ``url`` is normalised before parsing: a relative URL is
    prefixed with ``self.server`` (https when ``self.ssl`` is truthy, http
    otherwise), and an ``http:`` URL is rewritten to ``https:`` when
    ``self.ssl`` is truthy. A non-string ``url`` is used as given.

    ``url_params`` entries are copied into ``url.params``; ``headers`` are
    layered over a copy of ``self.additional_headers``. String ``data`` is
    encoded as UTF-8. The request is delegated to the selected auth token's
    ``perform_request`` and its result is returned.
    """
    if isinstance(url, (str, unicode)):
        if not url.startswith('http'):
            # Relative URL: prepend scheme and server.
            scheme = 'https' if self.ssl else 'http'
            url = '%s://%s%s' % (scheme, self.server, url)
        elif url.startswith('http:') and self.ssl:
            # Force all requests to be https if self.ssl is True.
            url = 'https:' + url[5:]
        url = atom.url.parse_url(url)

    if url_params:
        for name, value in url_params.iteritems():
            url.params[name] = value

    all_headers = self.additional_headers.copy()
    if headers:
        all_headers.update(headers)

    if isinstance(data, types.StringTypes):
        data = data.encode('utf-8')

    # If the list of headers does not include a Content-Length, attempt to
    # calculate it based on the data object.
    if data and 'Content-Length' not in all_headers:
        content_length = CalculateDataLength(data)
        if content_length:
            all_headers['Content-Length'] = str(content_length)

    all_headers.setdefault('GData-Version', '2.0')

    # Find an Authorization token for this URL if one is available.
    auth_token = self.override_token or self.token_store.find_token(url)
    return auth_token.perform_request(self.http_client, operation, url,
                                      data=data, headers=all_headers)
def ProcessUrl(service, url, for_proxy=False):
    """Processes a passed URL. If the URL does not begin with https?, then
    the default value for server is used.

    This method is deprecated, use atom.url.parse_url instead.

    Returns a ``(server, port, ssl, request_uri)`` tuple. ``for_proxy`` is
    accepted but not used by this function.
    """
    if not isinstance(url, atom.url.Url):
        url = atom.url.parse_url(url)

    server = url.host
    ssl = False
    port = 80

    if server:
        # The URL names its own host: scheme and port come from the URL.
        ssl = url.protocol == 'https'
        if url.port:
            port = int(url.port)
        elif port == 80 and ssl:
            port = 443
    else:
        # No host in the URL: fall back to what ``service`` provides.
        server = service.server if hasattr(service, 'server') else service
        if not url.protocol and hasattr(service, 'ssl'):
            ssl = service.ssl
        if hasattr(service, 'port'):
            port = service.port

    return (server, port, ssl, url.get_request_uri())
def _ssl_wrap_socket(sock, key_file, cert_file, disable_validation, ca_certs):
    """Wrap ``sock`` with ``ssl.wrap_socket`` with validation forced off.

    The ``disable_validation`` argument is overwritten with ``True`` on
    entry, so the socket is always wrapped with ``ssl.CERT_NONE`` and
    ``ca_certs=None`` regardless of what the caller passed.
    """
    # NOTE(review): this override turns off peer certificate validation for
    # every caller. It is kept as found; confirm it is still intended.
    disable_validation = True  # Force to disable validation

    cert_reqs = ssl.CERT_NONE if disable_validation else ssl.CERT_REQUIRED
    if disable_validation:
        ca_certs = None

    # We should be specifying SSL version 3 or TLS v1, but the ssl module
    # doesn't expose the necessary knobs. So we need to go with the default
    # of SSLv23.
    wrap_options = dict(keyfile=key_file, certfile=cert_file,
                        cert_reqs=cert_reqs, ca_certs=ca_certs)
    return ssl.wrap_socket(sock, **wrap_options)
def connect(self):
    "Connect to a host on a given (SSL) port."
    # Reads self.proxy_info, self.timeout, self.host, self.port,
    # self.key_file and self.cert_file; stores an httplib.FakeSocket
    # wrapping the connected socket in self.sock.
    if self.proxy_info and self.proxy_info.isgood():
        # The proxy branch configures the socket already held in self.sock.
        # Bind it to the local name so the timeout/connect/SSL steps below
        # operate on it; previously ``sock`` was used here before it was
        # ever assigned, which raised UnboundLocalError.
        # NOTE(review): assumes self.sock is a proxy-capable socket exposing
        # setproxy() at this point - confirm against the caller.
        sock = self.sock
        sock.setproxy(*self.proxy_info.astuple())
    else:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    if self.timeout is not None:
        sock.settimeout(self.timeout)
    sock.connect((self.host, self.port))

    ssl_layer = socket.ssl(sock, self.key_file, self.cert_file)
    self.sock = httplib.FakeSocket(sock, ssl_layer)