我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用httplib.FORBIDDEN。
def does_bucket_exist(self, bucket_name, config=None): """ Check whether there is a bucket with specific name :param bucket_name: None :type bucket_name: str :return:True or False :rtype: bool """ try: self._send_request(http_methods.HEAD, bucket_name, config=config) return True except BceHttpClientError as e: if isinstance(e.last_error, BceServerError): if e.last_error.status_code == httplib.FORBIDDEN: return True if e.last_error.status_code == httplib.NOT_FOUND: return False raise e
def test_should_include_project_id_in_error_text_when_needed(self): resp = sc_messages.CheckResponse( checkErrors = [ sc_messages.CheckError( code=sc_messages.CheckError.CodeValueValuesEnum.PROJECT_DELETED) ] ) code, got, _ = check_request.convert_response(resp, self.PROJECT_ID) want = u'Project %s has been deleted' % (self.PROJECT_ID,) expect(code).to(equal(httplib.FORBIDDEN)) expect(got).to(equal(want))
def test_should_include_detail_in_error_text_when_needed(self): detail = u'details, details, details' resp = sc_messages.CheckResponse( checkErrors = [ sc_messages.CheckError( code=sc_messages.CheckError.CodeValueValuesEnum.IP_ADDRESS_BLOCKED, detail=detail) ] ) code, got, _ = check_request.convert_response(resp, self.PROJECT_ID) expect(code).to(equal(httplib.FORBIDDEN)) expect(got).to(equal(detail))
def test_should_include_project_id_in_error_text_when_needed(self): resp = sc_messages.AllocateQuotaResponse( allocateErrors = [ sc_messages.QuotaError( code=sc_messages.QuotaError.CodeValueValuesEnum.PROJECT_DELETED) ] ) code, got = quota_request.convert_response(resp, self.PROJECT_ID) want = u'Project %s has been deleted' % (self.PROJECT_ID,) expect(code).to(equal(httplib.FORBIDDEN)) expect(got).to(equal(want))
def test_with_disabled_user(self): data = generate_password_auth_data({ 'name': 'disabled', 'password': self.password, 'domain': {'id': uuid.uuid4().hex}, }) self.authenticate(data, expected_status=httplib.FORBIDDEN)
def test_with_domain_id(self): data = generate_password_auth_data({ 'name': self.username, 'password': self.password, }) resp = self.authenticate(data) token = resp.headers['X-Subject-Token'] data = generate_token_auth_data_with_scope( token_id=token, scope={'domain': {'id': self.project_id}}) resp = self.authenticate(data, httplib.FORBIDDEN)
def test_add_to_cart_fails_csrf(self): """ adding product fails without including the CSRF token to POST request parameters """ quantity = 2 product_url = self.product.get_absolute_url() response = self.client.get(product_url) self.assertEqual(response.status_code, httplib.OK ) # perform the post of adding to the cart postdata = {'product_slug': self.product.slug, 'quantity': quantity } response = self.client.post(product_url, postdata ) # assert forbidden error due to missing CSRF input self.assertEqual(response.status_code, httplib.FORBIDDEN )
def _is_token_expired(i): if i.status_code == httplib.FORBIDDEN and type(i.body) == dict and i.body.get('error') is not None: code = i.body['error'].get('code') return code == NetatmoClient.INVALID_ACCESS_TOKEN or code == NetatmoClient.ACCESS_TOKEN_EXPIRED else: return False
def check_resp_status_and_retry(resp, image_id, url): # Note(Jesse): This branch sorts errors into those that are permanent, # those that are ephemeral, and those that are unexpected. if resp.status in (httplib.BAD_REQUEST, # 400 httplib.UNAUTHORIZED, # 401 httplib.PAYMENT_REQUIRED, # 402 httplib.FORBIDDEN, # 403 httplib.METHOD_NOT_ALLOWED, # 405 httplib.NOT_ACCEPTABLE, # 406 httplib.PROXY_AUTHENTICATION_REQUIRED, # 407 httplib.CONFLICT, # 409 httplib.GONE, # 410 httplib.LENGTH_REQUIRED, # 411 httplib.PRECONDITION_FAILED, # 412 httplib.REQUEST_ENTITY_TOO_LARGE, # 413 httplib.REQUEST_URI_TOO_LONG, # 414 httplib.UNSUPPORTED_MEDIA_TYPE, # 415 httplib.REQUESTED_RANGE_NOT_SATISFIABLE, # 416 httplib.EXPECTATION_FAILED, # 417 httplib.UNPROCESSABLE_ENTITY, # 422 httplib.LOCKED, # 423 httplib.FAILED_DEPENDENCY, # 424 httplib.UPGRADE_REQUIRED, # 426 httplib.NOT_IMPLEMENTED, # 501 httplib.HTTP_VERSION_NOT_SUPPORTED, # 505 httplib.NOT_EXTENDED, # 510 ): raise PluginError("Got Permanent Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url)) # Nova service would process the exception elif resp.status == httplib.NOT_FOUND: # 404 exc = XenAPI.Failure('ImageNotFound') raise exc # NOTE(nikhil): Only a sub-set of the 500 errors are retryable. We # optimistically retry on 500 errors below. elif resp.status in (httplib.REQUEST_TIMEOUT, # 408 httplib.INTERNAL_SERVER_ERROR, # 500 httplib.BAD_GATEWAY, # 502 httplib.SERVICE_UNAVAILABLE, # 503 httplib.GATEWAY_TIMEOUT, # 504 httplib.INSUFFICIENT_STORAGE, # 507 ): raise RetryableError("Got Ephemeral Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url)) else: # Note(Jesse): Assume unexpected errors are retryable. If you are # seeing this error message, the error should probably be added # to either the ephemeral or permanent error list. raise RetryableError("Got Unexpected Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url))
def check_status(status, expected, path, headers=None, resp_headers=None, body=None, extras=None): """Check HTTP response status is expected. Args: status: HTTP response status. int. expected: a list of expected statuses. A list of ints. path: filename or a path prefix. headers: HTTP request headers. resp_headers: HTTP response headers. body: HTTP response body. extras: extra info to be logged verbatim if error occurs. Raises: AuthorizationError: if authorization failed. NotFoundError: if an object that's expected to exist doesn't. TimeoutError: if HTTP request timed out. ServerError: if server experienced some errors. FatalError: if any other unexpected errors occurred. """ if status in expected: return msg = ('Expect status %r from Google Storage. But got status %d.\n' 'Path: %r.\n' 'Request headers: %r.\n' 'Response headers: %r.\n' 'Body: %r.\n' 'Extra info: %r.\n' % (expected, status, path, headers, resp_headers, body, extras)) if status == httplib.UNAUTHORIZED: raise AuthorizationError(msg) elif status == httplib.FORBIDDEN: raise ForbiddenError(msg) elif status == httplib.NOT_FOUND: raise NotFoundError(msg) elif status == httplib.REQUEST_TIMEOUT: raise TimeoutError(msg) elif status == httplib.REQUESTED_RANGE_NOT_SATISFIABLE: raise InvalidRange(msg) elif (status == httplib.OK and 308 in expected and httplib.OK not in expected): raise FileClosedError(msg) elif status >= 500: raise ServerError(msg) else: raise FatalError(msg)
def _authenticate_with_kerberos(conn_info, url, agent, gss_client=None): service = '{0}@{1}'.format(conn_info.scheme.upper(), conn_info.hostname) if gss_client is None: gss_client = AuthGSSClient( service, conn_info) base64_client_data = yield gss_client.get_base64_client_data() auth = 'Kerberos {0}'.format(base64_client_data) k_headers = Headers(_CONTENT_TYPE) k_headers.addRawHeader('Authorization', auth) k_headers.addRawHeader('Content-Length', '0') response = yield agent.request('POST', url, k_headers, None) auth_header = response.headers.getRawHeaders('WWW-Authenticate')[0] auth_details = get_auth_details(auth_header) if response.code == httplib.UNAUTHORIZED: try: if auth_details: gss_client._step(auth_details) except kerberos.GSSError as e: msg = "HTTP Unauthorized received on kerberos initialization. "\ "Kerberos error code {0}: {1}.".format(e.args[1][1], e.args[1][0]) raise Exception(msg) raise UnauthorizedError( "HTTP Unauthorized received on initial kerberos request. Check username and password") elif response.code == httplib.FORBIDDEN: raise ForbiddenError( "Forbidden. Check WinRM port and version.") elif response.code != httplib.OK: proto = _StringProtocol() response.deliverBody(proto) xml_str = yield proto.d xml_str = gss_client.decrypt_body(xml_str) raise Exception( "status code {0} received on initial kerberos request {1}" .format(response.code, xml_str)) if not auth_details: raise Exception( 'negotiate not found in WWW-Authenticate header: {0}' .format(auth_header)) k_username = gss_client.get_username(auth_details) log.debug('kerberos auth successful for user: {0} / {1} ' .format(conn_info.username, k_username)) defer.returnValue(gss_client)