我们从 Python 开源项目中，提取了以下 41 个代码示例，用于说明如何使用 httplib.BAD_REQUEST。
def upload_file():
    """Store an uploaded or remotely fetched file and return its result URL.

    Two request shapes are handled:

    * a multipart upload under the ``file`` key, saved through
      ``uploaded_photos``;
    * a form field ``fb_url`` whose target is downloaded into
      ``UPLOADED_PHOTOS_DEST`` under a random UUID file name.

    In both cases the stored file is passed to ``trumpify`` and the URL of
    the ``view_raw`` endpoint for the produced file (with any ``.png``
    removed from its name) is returned.

    A request matching neither shape, or an upload rejected with
    ``UploadNotAllowed``, is aborted with 400 Bad Request.
    """
    if 'file' in request.files:
        try:
            filename = uploaded_photos.save(request.files.get('file'))
            filename = os.path.join(UPLOADED_PHOTOS_DEST, filename)
            output_file = trumpify(filename)
        except UploadNotAllowed:
            abort(httplib.BAD_REQUEST)
        return url_for('view_raw', hash=output_file.replace('.png', ''))
    elif 'fb_url' in request.form:
        url = request.form.get('fb_url')
        # NOTE(review): the URL is caller-supplied and fetched verbatim, so
        # this endpoint can be pointed at arbitrary hosts or local schemes
        # (SSRF).  Consider restricting the allowed scheme and host.
        remote = urllib.urlopen(url)
        try:
            data = remote.read()
        finally:
            # Release the connection even when the read fails.
            remote.close()
        filename = os.path.join(UPLOADED_PHOTOS_DEST, str(uuid.uuid4()))
        # Binary mode: the payload is raw bytes, and text mode would
        # translate newlines on some platforms and corrupt it.
        with open(filename, 'wb') as f:
            f.write(data)
        output_file = trumpify(filename)
        return url_for('view_raw', hash=output_file.replace('.png', ''))
    abort(httplib.BAD_REQUEST)
def test_validate_image_status_before_upload_unexpected_resp_v1(self):
    """A 400 answer to the v1 status check is handed to the retry helper.

    The HEAD response is faked with status 400; the test then checks that
    the response body was read twice, that ``check_resp_status_and_retry``
    received the response, image id and URL, and that exactly one request
    was issued on the connection.
    """
    fake_url = 'http://fake_host/fake_path/fake_image_id'
    # The image id is the last segment of the URL path.
    fake_image_id = urlparse(fake_url)[2].rsplit('/', 1)[-1]

    head_resp = mock.Mock()
    head_resp.status = httplib.BAD_REQUEST
    head_resp.read.return_value = 'fakeData'
    head_resp.getheader.return_value = 'queued'

    conn = mock.Mock()
    conn.getresponse.return_value = head_resp

    self.mock_patch_object(self.glance, 'check_resp_status_and_retry')

    self.glance.validate_image_status_before_upload_v1(
        conn, fake_url, extra_headers=mock.Mock())

    self.assertEqual(head_resp.read.call_count, 2)
    self.glance.check_resp_status_and_retry.assert_called_with(
        head_resp, fake_image_id, fake_url)
    conn.request.assert_called_once()
def test_with_no_scope(self):
    """Token authentication without a scope is expected to yield 400.

    A token is first obtained with password credentials, then reused in a
    token-method request that carries no scope.
    """
    password_request = generate_password_auth_data({
        'id': self.username,
        'password': self.password,
    })
    first_response = self.authenticate(password_request)
    subject_token = first_response.headers['X-Subject-Token']

    # Build the unscoped token request from the inside out.
    identity = {
        'methods': ['token'],
        'token': {'id': subject_token},
    }
    token_request = {'auth': {'identity': identity}}

    self.authenticate(token_request, httplib.BAD_REQUEST)
def do_GET(self):
    """Check every query of a GET request, then pass it to the backend.

    The request path is rebuilt from the ``Host`` header and split into its
    parts.  Each query extracted from the parameters goes through
    ``_check_query``; the first one that is not ok ends the request with
    400 Bad Request, using the check's value as the error message.  If all
    queries are ok, the headers are filtered and the request is handed to
    ``_handle_request`` for ``backend_netloc``.
    """
    self.path = self._build_url(self.path, self.headers['Host'])
    scheme, _netloc, path, parameters = self._analyze_url(self.path)

    for query in self.get_queries(parameters):
        verdict = self._check_query(query)
        if verdict.is_ok():
            logging.debug("Query ok: %s", query)
            continue
        logging.warning("Query blocked: %s. Reason: %s",
                        query, verdict.value)
        self.send_error(httplib.BAD_REQUEST, verdict.value)
        return

    # TODO: Is this needed?
    # self.headers['Host'] = self.backend_netloc
    self.filter_headers(self.headers)
    self._handle_request(scheme, self.backend_netloc, path, self.headers)
def wopiRenameFile(fileid, reqheaders, acctok):
    '''Implements the RenameFile WOPI call.

    This seems to be broken in Office Online, thus it is disabled for the
    time being.

    Args:
      fileid: identifier passed on to the lock lookup.
      reqheaders: request headers; 'X-WOPI-RequestedName' and 'X-WOPI-Lock'
        are read.
      acctok: access token dict; 'filename', 'ruid' and 'rgid' are read.

    Returns:
      A conflict response when a lock is held that does not match the one
      supplied; a JSON response carrying the requested 'Name' on success;
      or a 400 response with the 'X-WOPI-InvalidFileNameError' header set
      when the rename raised IOError.
    '''
    targetName = reqheaders['X-WOPI-RequestedName']
    lock = reqheaders['X-WOPI-Lock']
    retrievedLock = _retrieveWopiLock(fileid, 'RENAMEFILE', lock, acctok)
    if retrievedLock is not None and not _compareWopiLocks(retrievedLock, lock):
        return _makeConflictResponse('RENAMEFILE', retrievedLock, lock, '', acctok['filename'])
    try:
        # the destination name comes without base path and without extension
        # NOTE(review): the requested name is taken from a request header and
        # joined into the path as-is; a value containing '/' or '..' would
        # leave the source directory -- confirm it is validated upstream.
        targetName = os.path.dirname(acctok['filename']) + '/' + targetName + os.path.splitext(acctok['filename'])[1]
        Wopi.log.info('msg="RenameFile" user="%s:%s" filename="%s" token="%s" targetname="%s"' %
                      (acctok['ruid'], acctok['rgid'], acctok['filename'],
                       flask.request.args['access_token'][-20:], targetName))
        xrdcl.renamefile(acctok['filename'], targetName, acctok['ruid'], acctok['rgid'])
        # the lock file is renamed along with the file it protects
        xrdcl.renamefile(_getLockName(acctok['filename']), _getLockName(targetName), Wopi.lockruid, Wopi.lockrgid)
        # prepare and send the response as JSON
        renamemd = {}
        renamemd['Name'] = reqheaders['X-WOPI-RequestedName']
        return flask.Response(json.dumps(renamemd), mimetype='application/json')
    except IOError as e:
        # assume the rename failed because of the destination filename and report the error
        Wopi.log.info('msg="RenameFile" token="%s" error="%s"' % (flask.request.args['access_token'][-20:], e))
        resp = flask.Response()
        resp.headers['X-WOPI-InvalidFileNameError'] = 'Failed to rename: %s' % e
        resp.status_code = httplib.BAD_REQUEST
        return resp
def put_empty(self, token):
  """Report the progress of an upload that has not finished yet.

  An empty PUT is how a client asks how much has been uploaded so far.

  Args:
    token: upload token returned by post_start_creation.

  Returns:
    The last offset uploaded, or -1 if nothing has been uploaded.

  Raises:
    ValueError: if token matches no in-progress upload.
  """
  saved_namespace = namespace_manager.get_namespace()
  try:
    # The lookup runs in the empty namespace; the caller's namespace is
    # put back in the finally clause whatever happens.
    namespace_manager.set_namespace('')
    upload = _AE_GCSFileInfo_.get_by_key_name(token)
    if upload:
      return upload.next_offset - 1
    raise ValueError('Invalid token', httplib.BAD_REQUEST)
  finally:
    namespace_manager.set_namespace(saved_namespace)
def _GetTokenInfo(access_token):
    """Fetch the token-info document for the given access token.

    Returns:
      The decoded JSON body of the response, or an empty dict when the
      endpoint answers 400 Bad Request.

    Raises:
      ValueError: if the response carries no status, or a status other
        than 200 or 400.
    """
    url = _OAUTH2_TOKENINFO_TEMPLATE.format(access_token=access_token)
    response, content = httplib2.Http().request(url)

    if 'status' not in response:
        raise ValueError('No status in HTTP response')

    status_code = int(response['status'])
    if status_code == http_client.BAD_REQUEST:
        # A 400 is answered with an empty result rather than an error.
        return {}
    if status_code != http_client.OK:
        msg = (
            'Error making HTTP request to <{}>: status <{}>, '
            'content <{}>'.format(url, response['status'], content))
        raise ValueError(msg)
    return json.loads(_AsText(content))
def test_check_resp_status_and_retry_plugin_error(self):
    """A 400 response makes check_resp_status_and_retry raise PluginError."""
    bad_request_resp = mock.Mock()
    bad_request_resp.status = httplib.BAD_REQUEST

    expected_error = self.glance.PluginError
    checker = self.glance.check_resp_status_and_retry
    self.assertRaises(
        expected_error, checker,
        bad_request_resp, 'fake_image_id', 'fake_url')
def test_validate_image_status_before_upload_unexpected_resp_v2(self):
    """A 400 answer to the v2 status check is handed to the retry helper.

    Checks that the connection saw one request and one getresponse call,
    that the response body was read once, and that
    ``check_resp_status_and_retry`` was called once.
    """
    conn = mock.Mock()
    fake_url = 'http://fake_host/fake_path/fake_image_id'
    self.mock_patch_object(self.glance, 'check_resp_status_and_retry')

    head_resp = mock.Mock()
    head_resp.status = httplib.BAD_REQUEST
    conn.getresponse.return_value = head_resp

    expected_wsgi_path = '/fake_path/v2/images/%s' % 'fake_image_id'

    self.glance.validate_image_status_before_upload_v2(
        conn, fake_url, mock.Mock(), expected_wsgi_path)

    for called_once in (conn.request,
                        conn.getresponse,
                        head_resp.read,
                        self.glance.check_resp_status_and_retry):
        called_once.assert_called_once()
def AddFailureResponse(self):
  """Queues a 400 Bad Request response via AddResponse."""
  failure_status = httplib.BAD_REQUEST
  self.AddResponse(failure_status)
def LoadEntities(self, iter, loader, key_format=None):
  """Generates entities and loads them into the datastore.

  Rows are converted one at a time.  The first row that fails stops the
  load: nothing is written and the traceback is included in the reply.
  Entities are written in a single Put once every row has been converted.

  Args:
    iter: an iterator yielding pairs of a line number and row contents.
    loader: object whose CreateEntity(columns, key_name=...) turns a row
      into a collection of entities (or a false value for none).
    key_format: a format string to convert a line number into an entity id.
      If None, then entity ID's are automatically generated.

  Returns:
    A tuple of HTTP code and string reply: httplib.OK with the progress
    log on success, httplib.BAD_REQUEST with the log and traceback on the
    first failing row.
  """
  entities = []
  output = []
  for line_num, columns in iter:
    key_name = None
    if key_format is not None:
      key_name = key_format % line_num
    if columns:
      try:
        output.append('\nLoading from line %d...' % line_num)
        new_entities = loader.CreateEntity(columns, key_name=key_name)
        if new_entities:
          entities.extend(new_entities)
        output.append('done.')
      except Exception:
        # Exception rather than a bare except: interpreter-level signals
        # such as KeyboardInterrupt and SystemExit must propagate instead
        # of being reported to the client as a bad request.
        stacktrace = traceback.format_exc()
        output.append('error:\n%s' % stacktrace)
        return (httplib.BAD_REQUEST, ''.join(output))

  datastore.Put(entities)

  return (httplib.OK, ''.join(output))
def Load(self, kind, data):
  """Parses CSV data and stores the entities a Loader builds from it.

  Fails fast: on error a "bad request" HTTP response code is returned,
  with the error text in the output.

  Args:
    kind: a string containing the entity kind that this loader handles
    data: a string containing the CSV data to load

  Returns:
    tuple (response code, output) where:
      response code: integer HTTP response code to return
      output: string containing the HTTP response body
  """
  data = data.encode('utf-8')
  Validate(kind, basestring)
  Validate(data, basestring)

  try:
    loader = Loader.RegisteredLoaders()[kind]
  except KeyError:
    message = 'Error: no Loader defined for kind %s.' % kind
    return (httplib.BAD_REQUEST, message)

  csv_source = StringIO.StringIO(data)
  reader = csv.reader(csv_source, skipinitialspace=True)

  try:
    csv.field_size_limit(800000)
  except AttributeError:
    # csv modules without field_size_limit keep their built-in limit.
    pass

  return self.LoadEntities(self.IterRows(reader), loader)
def test_with_empty_username(self):
    """Password auth with an empty user name is expected to yield 400."""
    credentials = {
        'name': '',
        'password': self.password,
        'domain': {'id': self.domain_id},
    }
    auth_request = generate_password_auth_data(credentials)
    self.authenticate(auth_request, expected_status=httplib.BAD_REQUEST)
def test_with_empty_password(self):
    """Password auth with an empty password is expected to yield 400."""
    credentials = {
        'name': self.username,
        'password': '',
        'domain': {'id': self.domain_id},
    }
    auth_request = generate_password_auth_data(credentials)
    self.authenticate(auth_request, expected_status=httplib.BAD_REQUEST)
def test_with_no_password(self):
    """Password auth without a password key is expected to yield 400."""
    credentials = {
        'name': self.username,
        'domain': {'id': self.domain_id},
    }
    auth_request = generate_password_auth_data(credentials)
    self.authenticate(auth_request, expected_status=httplib.BAD_REQUEST)
def test_with_no_username(self):
    """Password auth without a name key is expected to yield 400."""
    credentials = {
        'password': self.password,
        'domain': {'id': self.domain_id},
    }
    auth_request = generate_password_auth_data(credentials)
    self.authenticate(auth_request, expected_status=httplib.BAD_REQUEST)
def test_with_no_user(self):
    """Password auth with an empty password section is expected to yield 400."""
    # The password method is named but carries no user information at all.
    identity = {
        'methods': ['password'],
        'password': {},
    }
    auth_request = {'auth': {'identity': identity}}
    self.authenticate(auth_request, expected_status=httplib.BAD_REQUEST)
def test_with_empty_project_id(self):
    """Token auth scoped to an empty project id is expected to yield 400.

    A token is first obtained with password credentials, then reused in a
    request whose project scope has an empty id.
    """
    password_request = generate_password_auth_data({
        'name': self.username,
        'password': self.password,
    })
    subject_token = self.authenticate(
        password_request).headers['X-Subject-Token']

    empty_project_scope = {'project': {'id': ''}}
    scoped_request = generate_token_auth_data_with_scope(
        token_id=subject_token, scope=empty_project_scope)
    self.authenticate(scoped_request, httplib.BAD_REQUEST)
def test_with_empty_token_id(self):
    """Scoped token auth with an empty token id is expected to yield 400."""
    project_scope = {'project': {'id': self.project_id}}
    auth_request = generate_token_auth_data_with_scope(
        token_id='', scope=project_scope)
    self.authenticate(auth_request, httplib.BAD_REQUEST)
def _preprocess(method, headers, url):
  """Normalize the pieces of an incoming request.

  Example:
    _preprocess('POST', {'Content-Type': 'Foo'},
                'http://localhost:8080/_ah/gcs/b/f?foo=bar')
    -> 'POST', {'content-type': 'Foo'}, '/b/f', {'foo':'bar'}

  Args:
    method: HTTP method used by the request.
    headers: HTTP request headers in a dict.
    url: HTTP request url.

  Returns:
    method: the method, passed through unchanged.
    headers: headers with keys in all lower case.
    filename: the unquoted URL path with the local GCS endpoint prefix
      removed, i.e. /bucket/filename or /bucket.
    param_dict: a dict of query parameters; each key maps to the unquoted
      first value given for it.

  Raises:
    ValueError: if the URL path does not start with the local GCS endpoint.
  """
  parsed = urlparse.urlsplit(url)
  path = parsed.path
  endpoint = common.LOCAL_GCS_ENDPOINT
  if not path.startswith(endpoint):
    raise ValueError('Invalid GCS path: %s' % path, httplib.BAD_REQUEST)

  filename = path[len(endpoint):]

  # keep_blank_values=True so parameters given without a value survive.
  raw_params = urlparse.parse_qs(parsed.query, True)
  param_dict = dict(
      (name, urllib.unquote(values[0]))
      for name, values in raw_params.items())

  lowered_headers = dict(
      (name.lower(), value) for name, value in headers.iteritems())

  return method, lowered_headers, urllib.unquote(filename), param_dict
def test_dispatch_value_error(self):
  """Checks the response produced when the dispatch stub raises ValueError."""
  expected_error = ValueError('Invalid Token', httplib.BAD_REQUEST)
  # dispatch is recorded with four arbitrary arguments.
  any_args = [mox.IgnoreArg() for _ in range(4)]
  stub_dispatcher.dispatch(*any_args).AndRaise(expected_error)

  self.run_request('GET', {}, '/_ah/some_bucket', '', '',
                   '400 Bad Request', [], 'Invalid Token')
def test_metrics_limit(self):
    """A Metrics POST for 2017-01-21..2017-01-29 is expected to fail with 400."""
    date_range = {
        'start_date': '2017-01-21',
        'end_date': '2017-01-29',
    }
    with self.assertRaises(HttpException) as raised:
        self.client.request_json('Metrics', 'POST', date_range)
    self.assertEqual(raised.exception.code, httplib.BAD_REQUEST)
def test_metrics_empty_dates(self):
    """A Metrics POST with empty start and end dates is expected to fail with 400."""
    date_range = {
        'start_date': '',
        'end_date': '',
    }
    with self.assertRaises(HttpException) as raised:
        self.client.request_json('Metrics', 'POST', date_range)
    self.assertEqual(raised.exception.code, httplib.BAD_REQUEST)
def test_insert_before_consent_fails(self):
    """Posting physical measurements here is expected to yield 400."""
    participant_id = self.participant_id
    payload = load_measurement_json(participant_id)
    endpoint = 'Participant/%s/PhysicalMeasurements' % participant_id
    self.send_post(endpoint, payload, expected_status=httplib.BAD_REQUEST)
def test_update_no_ifmatch_specified(self):
    """Updating a participant without an If-Match header is expected to yield 400."""
    participant = self.send_post('Participant', self.participant)

    # Swap in the second provider link so the PUT carries a real change.
    participant['providerLink'] = [self.provider_link_2]

    endpoint = 'Participant/%s' % participant['participantId']
    self.send_put(endpoint, participant, expected_status=httplib.BAD_REQUEST)
def test_update_bad_ifmatch_specified(self):
    """Updating a participant with If-Match 'Blah' is expected to yield 400."""
    participant = self.send_post('Participant', self.participant)

    # Swap in the second provider link so the PUT carries a real change.
    participant['providerLink'] = [self.provider_link_2]

    endpoint = 'Participant/%s' % participant['participantId']
    bad_headers = {'If-Match': 'Blah'}
    self.send_put(endpoint, participant, headers=bad_headers,
                  expected_status=httplib.BAD_REQUEST)
def test_error_missing_required_fields(self):
    """Posting an order without its 'identifier' field is expected to yield 400."""
    incomplete_order = load_biobank_order_json(self.participant.participantId)
    del incomplete_order['identifier']
    self.send_post(
        self.path, incomplete_order, expected_status=httplib.BAD_REQUEST)
def test_no_duplicate_test_within_order(self):
    """Posting an order whose samples are all listed twice is expected to yield 400."""
    order = load_biobank_order_json(self.participant.participantId)
    samples = order['samples']
    # Append a copy of the current samples so each one appears twice.
    samples.extend(list(samples))
    self.send_post(self.path, order, expected_status=httplib.BAD_REQUEST)
def test_update_before_insert(self):
    """A PUT to Questionnaire/1 with no prior insert is expected to yield 400."""
    with open(data_path('questionnaire1.json')) as source:
        payload = json.load(source)
    self.send_put(
        'Questionnaire/1', payload, expected_status=httplib.BAD_REQUEST)
def test_update_no_ifmatch_specified(self):
    """Updating a questionnaire without an If-Match header is expected to yield 400."""
    inserted = self.insert_questionnaire()
    with open(data_path('questionnaire2.json')) as source:
        replacement = json.load(source)
    endpoint = 'Questionnaire/%s' % inserted['id']
    self.send_put(endpoint, replacement, expected_status=httplib.BAD_REQUEST)
def test_update_invalid_ifmatch_specified(self):
    """Updating a questionnaire with If-Match 'Blah' is expected to yield 400."""
    inserted = self.insert_questionnaire()
    with open(data_path('questionnaire2.json')) as source:
        replacement = json.load(source)
    endpoint = 'Questionnaire/%s' % inserted['id']
    bad_headers = {'If-Match': 'Blah'}
    self.send_put(endpoint, replacement,
                  expected_status=httplib.BAD_REQUEST,
                  headers=bad_headers)
def check_resp_status_and_retry(resp, image_id, url):
    """Raise the exception that matches the status of a glance response.

    Statuses are sorted into permanent, not-found, ephemeral and unexpected
    ones.  Every path through this function raises.

    Raises:
        PluginError: the status is in the permanent list.
        XenAPI.Failure: the status is 404 ('ImageNotFound').
        RetryableError: the status is in the ephemeral list, or is in none
            of the lists (unexpected statuses are assumed to be retryable).
    """
    status = resp.status
    details = (status, image_id, url)

    permanent_statuses = (
        httplib.BAD_REQUEST,  # 400
        httplib.UNAUTHORIZED,  # 401
        httplib.PAYMENT_REQUIRED,  # 402
        httplib.FORBIDDEN,  # 403
        httplib.METHOD_NOT_ALLOWED,  # 405
        httplib.NOT_ACCEPTABLE,  # 406
        httplib.PROXY_AUTHENTICATION_REQUIRED,  # 407
        httplib.CONFLICT,  # 409
        httplib.GONE,  # 410
        httplib.LENGTH_REQUIRED,  # 411
        httplib.PRECONDITION_FAILED,  # 412
        httplib.REQUEST_ENTITY_TOO_LARGE,  # 413
        httplib.REQUEST_URI_TOO_LONG,  # 414
        httplib.UNSUPPORTED_MEDIA_TYPE,  # 415
        httplib.REQUESTED_RANGE_NOT_SATISFIABLE,  # 416
        httplib.EXPECTATION_FAILED,  # 417
        httplib.UNPROCESSABLE_ENTITY,  # 422
        httplib.LOCKED,  # 423
        httplib.FAILED_DEPENDENCY,  # 424
        httplib.UPGRADE_REQUIRED,  # 426
        httplib.NOT_IMPLEMENTED,  # 501
        httplib.HTTP_VERSION_NOT_SUPPORTED,  # 505
        httplib.NOT_EXTENDED,  # 510
    )
    if status in permanent_statuses:
        raise PluginError("Got Permanent Error response [%i] while "
                          "uploading image [%s] to glance [%s]" % details)

    if status == httplib.NOT_FOUND:  # 404
        # The Nova service processes this exception.
        raise XenAPI.Failure('ImageNotFound')

    # Only a sub-set of the 500 errors are retryable; 500 itself is
    # retried optimistically.
    ephemeral_statuses = (
        httplib.REQUEST_TIMEOUT,  # 408
        httplib.INTERNAL_SERVER_ERROR,  # 500
        httplib.BAD_GATEWAY,  # 502
        httplib.SERVICE_UNAVAILABLE,  # 503
        httplib.GATEWAY_TIMEOUT,  # 504
        httplib.INSUFFICIENT_STORAGE,  # 507
    )
    if status in ephemeral_statuses:
        raise RetryableError("Got Ephemeral Error response [%i] while "
                             "uploading image [%s] to glance [%s]" % details)

    # Unexpected statuses are assumed to be retryable.  If this message
    # shows up, the status should probably be added to either the
    # ephemeral or the permanent list above.
    raise RetryableError("Got Unexpected Error response [%i] while "
                         "uploading image [%s] to glance [%s]" % details)
def _handle_put(gcs_stub, filename, param_dict, headers, payload):
  """Handle PUT.

  Depending on the headers this is a copy, a progress query, or the
  creation/continuation of an upload.

  Returns:
    A _FakeUrlFetchResult, or whatever _copy / _find_progress return for
    copy and progress-query requests.

  Raises:
    ValueError: if the content-range header is missing, or if a request
      without an upload token has no final length or does not start at
      offset 0.
  """
  if _iscopy(headers):
    return _copy(gcs_stub, filename, headers)

  token = _get_param('upload_id', param_dict)
  content_range = _ContentRange(headers)

  if _is_query_progress(content_range):
    return _find_progress(gcs_stub, filename, token)

  if not content_range.value:
    raise ValueError('Missing header content-range.', httplib.BAD_REQUEST)

  # A generation-match of '0' asks that the object must not exist yet.
  must_not_exist = headers.get('x-goog-if-generation-match') == '0'
  if must_not_exist and gcs_stub.head_object(filename) is not None:
    return _FakeUrlFetchResult(httplib.PRECONDITION_FAILED, {}, '')

  if not token:
    # No upload in progress: the request must describe a whole object.
    if content_range.length is None:
      raise ValueError('Content-Range must have a final length.',
                       httplib.BAD_REQUEST)
    if not content_range.no_data and content_range.range[0] != 0:
      raise ValueError('Content-Range must specify complete object.',
                       httplib.BAD_REQUEST)
    token = gcs_stub.post_start_creation(filename, headers)

  try:
    gcs_stub.put_continue_creation(token,
                                   payload,
                                   content_range.range,
                                   content_range.length)
  except ValueError as e:
    # The stub raises ValueError(message, status); relay both.
    return _FakeUrlFetchResult(e.args[1], {}, e.args[0])

  if content_range.length is None:
    # No final length yet: answer with status 308.
    return _FakeUrlFetchResult(308, {}, '')
  return _FakeUrlFetchResult(httplib.OK, {'content-length': 0}, '')