我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用httplib.CREATED。
def _patch_upload(self, image, digest):
    """Upload one layer as a PATCH of its bytes followed by a closing PUT.

    The upload is opened through ``self._start_upload``. When that call
    reports the layer as mounted, only a log line is written and nothing
    else is sent.
    """
    was_mounted, upload_url = self._start_upload(digest, self._mount)
    if was_mounted:
        logging.info('Layer %s mounted.', digest)
        return

    # Send the blob bytes to the upload location.
    patch_resp, _ = self._transport.Request(
        self._get_absolute_url(upload_url),
        method='PATCH',
        body=self._get_blob(image, digest),
        content_type='application/octet-stream',
        accepted_codes=[
            httplib.NO_CONTENT, httplib.ACCEPTED, httplib.CREATED
        ])

    # Finish with a body-less PUT to the returned location plus the digest.
    commit_url = self._get_absolute_url(
        self._add_digest(patch_resp['location'], digest))
    self._transport.Request(
        commit_url, method='PUT', body=None, accepted_codes=[httplib.CREATED])
def _put_manifest(self, image, use_digest=False):
    """Upload the manifest for this image.

    With ``use_digest`` set, the manifest is addressed by ``image.digest()``;
    otherwise by ``_tag_or_digest(self._name)``.
    """
    reference = image.digest() if use_digest else _tag_or_digest(self._name)
    manifest_url = '{base_url}/manifests/{tag_or_digest}'.format(
        base_url=self._base_url(), tag_or_digest=reference)
    self._transport.Request(
        manifest_url,
        method='PUT',
        body=image.manifest(),
        content_type=image.media_type(),
        accepted_codes=[httplib.OK, httplib.CREATED, httplib.ACCEPTED])
def _patch_upload(self, image, digest):
    """PATCH the layer bytes from ``image.blob(digest)``, then commit with PUT.

    If ``self._start_upload`` reports the layer as mounted, the method logs
    that fact and sends nothing further.
    """
    is_mounted, session_location = self._start_upload(digest, self._mount)
    if is_mounted:
        logging.info('Layer %s mounted.', digest)
    else:
        patch_target = self._get_absolute_url(session_location)
        layer_bytes = image.blob(digest)
        response, _unused = self._transport.Request(
            patch_target,
            method='PATCH',
            body=layer_bytes,
            content_type='application/octet-stream',
            accepted_codes=[
                httplib.NO_CONTENT, httplib.ACCEPTED, httplib.CREATED
            ])

        # The closing PUT goes to the location returned by the PATCH, with
        # the digest added to it.
        put_target = self._add_digest(response['location'], digest)
        put_target = self._get_absolute_url(put_target)
        self._transport.Request(
            put_target,
            method='PUT',
            body=None,
            accepted_codes=[httplib.CREATED])
def _start_upload( self, digest, mount=None ): """POST to begin the upload process with optional cross-repo mount param.""" if not mount: # Do a normal POST to initiate an upload if mount is missing. url = '{base_url}/blobs/uploads/'.format(base_url=self._base_url()) accepted_codes = [httplib.ACCEPTED] else: # If we have a mount parameter, try to mount the blob from another repo. mount_from = '&'.join( ['from=' + urllib.quote(repo.repository, '') for repo in self._mount]) url = '{base_url}/blobs/uploads/?mount={digest}&{mount_from}'.format( base_url=self._base_url(), digest=digest, mount_from=mount_from) accepted_codes = [httplib.CREATED, httplib.ACCEPTED] resp, unused_content = self._transport.Request( url, method='POST', body=None, accepted_codes=accepted_codes) return resp.status == httplib.CREATED, resp.get('location') # type: ignore
def test_upload(self):
    """Check that ``files.upload`` posts the encoded payload and maps the reply."""
    self.client.post.return_value = mock_upload_response(
        '/files/content', httplib.CREATED, None, 'files', 'POST_response.json')
    file_name = 'upload.jpg'
    fixtures_dir = path.join(path.dirname(__file__), '..', 'fixtures', 'files')
    file_path = path.join(fixtures_dir, file_name)
    folder_id = 'folder-id'

    # Patches are applied bottom-up, so the encoder patch is the first argument.
    @mock.patch('orangecloud_client.files.guess_type', return_value=('image/jpeg', 'binary'))
    @mock.patch('__builtin__.open', spec=open, return_value=MockFile())
    @mock.patch('orangecloud_client.files.MultipartEncoder', return_value=mock.Mock())
    def run_with_patches(encoder_patch, _open_patch, _guess_type_patch):
        encoded = encoder_patch()
        encoded.content_type = 'upload content type'

        uploaded = self.files.upload(file_path, folder_id)

        self.client.post.assert_called_with(
            self.client.post.return_value.url,
            data=encoded,
            headers={'Content-Type': encoded.content_type})
        self.assertEqual('file-id', uploaded.fileId)
        self.assertEqual('file-name', uploaded.fileName)

    run_with_patches()
def test__raise_for_status(self, mock_resp):
    """
    Verify that only statuses outside the accepted list raise.

    :param mock_resp: mocked httplib Response
    """
    # A status listed among the accepted ones must pass silently.
    mock_resp.status = httplib.CREATED
    self.up.resp = mock_resp
    self.up.content = ''
    try:
        self.up._raise_for_status([httplib.OK, httplib.CREATED])
    except Exception as exc:
        self.fail('_raise_for_status unexpectedly threw {}'.format(exc))

    # Any status that is not listed must raise.
    mock_resp.status = httplib.ACCEPTED
    with self.assertRaises(upapi.exceptions.UnexpectedAPIResponse):
        self.up._raise_for_status([httplib.OK, httplib.CREATED])
def parse_response(self, response):
    """Record the created flag and the etcd/raft counters from ``response``.

    ``newKey`` is set to True only when the status is 201 Created. The
    ``x-etcd-index``, ``x-raft-index`` and ``x-raft-term`` headers are stored
    as ints, falling back to 1, 1 and 0 when a header is missing.
    """
    if response.status == httplib.CREATED:
        self.newKey = True

    hdrs = response.headers
    counters = (
        ('etcd_index', 'x-etcd-index', 1),
        ('raft_index', 'x-raft-index', 1),
        ('raft_term', 'x-raft-term', 0),
    )
    for attr_name, header_name, fallback in counters:
        setattr(self, attr_name, int(hdrs.get(header_name, fallback)))
def _handle_server_response(self, response): if response.status in (httplib.OK, httplib.CREATED, httplib.NO_CONTENT): return response logger.debug('invalid response status:{st} body:{body}'.format( st=response.status, body=response.data)) EtcdError.handle(response)
def _monolithic_upload(self, image, digest):
    """Upload a layer in a single POST that carries the digest in the query."""
    upload_url = '{base_url}/blobs/uploads/?digest={digest}'.format(
        base_url=self._base_url(), digest=digest)
    layer_bytes = self._get_blob(image, digest)
    # Only 201 Created is accepted for this request.
    self._transport.Request(
        upload_url,
        method='POST',
        body=layer_bytes,
        accepted_codes=[httplib.CREATED])
def _put_upload(self, image, digest):
    """Open an upload, then send the whole layer in one PUT.

    If ``self._start_upload`` reports the layer as mounted, the method only
    logs that fact.
    """
    already_mounted, upload_location = self._start_upload(digest, self._mount)
    if already_mounted:
        logging.info('Layer %s mounted.', digest)
    else:
        put_url = self._add_digest(upload_location, digest)
        self._transport.Request(
            put_url,
            method='PUT',
            body=self._get_blob(image, digest),
            accepted_codes=[httplib.CREATED])
def _start_upload( self, digest, mount=None ): """POST to begin the upload process with optional cross-repo mount param.""" if not mount: # Do a normal POST to initiate an upload if mount is missing. url = '{base_url}/blobs/uploads/'.format(base_url=self._base_url()) accepted_codes = [httplib.ACCEPTED] else: # If we have a mount parameter, try to mount the blob from another repo. mount_from = '&'.join( ['from=' + urllib.quote(repo.repository, '') for repo in self._mount]) url = '{base_url}/blobs/uploads/?mount={digest}&{mount_from}'.format( base_url=self._base_url(), digest=digest, mount_from=mount_from) accepted_codes = [httplib.CREATED, httplib.ACCEPTED] resp, unused_content = self._transport.Request( url, method='POST', body=None, accepted_codes=accepted_codes) # pytype: disable=attribute-error return resp.status == httplib.CREATED, resp.get('location') # pytype: enable=attribute-error
def _monolithic_upload(self, image, digest):
    """POST the bytes of ``image.blob(digest)`` in a single upload request."""
    target = '{base_url}/blobs/uploads/?digest={digest}'.format(
        base_url=self._base_url(), digest=digest)
    request_args = {
        'method': 'POST',
        'body': image.blob(digest),
        'accepted_codes': [httplib.CREATED],
    }
    self._transport.Request(target, **request_args)
def _put_upload(self, image, digest):
    """Open an upload, then PUT ``image.blob(digest)`` in one request.

    If ``self._start_upload`` reports the layer as mounted, the method logs
    that fact and returns without uploading.
    """
    mount_hit, started_at = self._start_upload(digest, self._mount)
    if mount_hit:
        logging.info('Layer %s mounted.', digest)
        return

    destination = self._add_digest(started_at, digest)
    payload = image.blob(digest)
    self._transport.Request(
        destination,
        method='PUT',
        body=payload,
        accepted_codes=[httplib.CREATED])
def _put_manifest(self, image):
    """Upload the manifest for this image."""
    manifest_url = '{base_url}/manifests/{tag_or_digest}'.format(
        base_url=self._base_url(),
        tag_or_digest=_tag_or_digest(self._name))
    ok_codes = [httplib.OK, httplib.CREATED, httplib.ACCEPTED]
    self._transport.Request(
        manifest_url,
        method='PUT',
        body=image.manifest(),
        accepted_codes=ok_codes)
def test_send_project_valid(self, mock_cs):
    """send_project returns the decoded JSON body when the post replies 201."""
    mock_cs.side_effect = [None]

    api = API.apiCalls.ApiCalls(
        client_id="",
        client_secret="",
        base_URL="",
        username="",
        password=""
    )

    expected = {
        "resource": {
            "name": "project1",
            "projectDescription": "projectDescription",
            "identifier": "1"
        }
    }

    # Stand-in response: a 201 whose text is the expected JSON.
    fake_response = Foo()
    fake_response.status_code = httplib.CREATED
    fake_response.text = json.dumps(expected)

    fake_session = Foo()
    fake_session.post = MagicMock(side_effect=[fake_response])

    api.session = fake_session
    api.get_link = lambda x, y, targ_dict="": None

    project = API.apiCalls.Project("project1", "projectDescription", "1")
    actual = api.send_project(project)

    self.assertEqual(expected, actual)
def authenticate(self, auth_data, expected_status=httplib.CREATED):
    """POST ``auth_data`` as JSON to the keystone token endpoint.

    Asserts that the response status equals ``expected_status``, using the
    response text as the failure message, and returns the response.
    """
    endpoint = self.keystone_token_endpoint
    post_kwargs = {
        'headers': self.headers,
        'json': auth_data,
        'verify': '/etc/ssl/certs/keystone.pem',
    }
    resp = requests.post(endpoint, **post_kwargs)
    self.assertEqual(expected_status, resp.status_code, resp.text)
    return resp
def _post(self, endpoint, post_data=None):
    """POST ``post_data`` as JSON to ``endpoint`` and return the decoded reply.

    Raises APIError with the response text and status code unless the status
    is 200 or 201.
    """
    target = '{}/{}'.format(self._api_root, endpoint)
    json_headers = {'Content-Type': 'application/json',
                    'Accept': 'application/json'}
    credentials = (self._login + '/token', self._token)

    response = requests.request(
        'POST',
        url=target,
        headers=json_headers,
        auth=credentials,
        json=post_data)

    if response.status_code not in (httplib.OK, httplib.CREATED):
        raise APIError(response.text, response.status_code)
    return response.json()
def _put(self, endpoint, put_data=None):
    """PUT ``put_data`` as JSON to ``endpoint`` and return the decoded reply.

    Raises APIError with the response text and status code unless the status
    is 200 or 201.
    """
    target = '{}/{}'.format(self._api_root, endpoint)
    json_headers = {'Content-Type': 'application/json',
                    'Accept': 'application/json'}
    credentials = (self._login + '/token', self._token)

    response = requests.request(
        'PUT',
        url=target,
        headers=json_headers,
        auth=credentials,
        json=put_data)

    if response.status_code not in (httplib.OK, httplib.CREATED):
        raise APIError(response.text, response.status_code)
    return response.json()
def _upload_file(self, endpoint, path, form_data=None):
    """Upload the file at ``path`` to ``endpoint`` as a multipart POST.

    Args:
      endpoint: path appended to ``self._api_root``.
      path: local file, opened in binary mode and sent as the ``file`` part.
      form_data: optional mapping of extra fields to send with the file.

    Returns:
      The decoded JSON body for a 201 reply, or None for a 204 reply.

    Raises:
      APIError: for any status other than 201 or 204.
    """
    multipart_data = {}
    if form_data is not None:
        # Each extra field is passed as a (None, value) pair.
        # NOTE(review): presumably so requests sends it without a filename.
        for key, value in form_data.items():
            multipart_data[key] = (None, value)

    with open(path, 'rb') as src_file:
        multipart_data['file'] = src_file
        response = requests.request(
            'POST',
            url='{}/{}'.format(self._api_root, endpoint),
            headers={'Accept': 'application/json'},
            auth=(self._login + '/token', self._token),
            files=multipart_data)

    if response.status_code == httplib.NO_CONTENT:
        # A 204 reply has no body, so response.json() would raise here.
        return None
    if response.status_code == httplib.CREATED:
        return response.json()
    raise APIError(response.text, response.status_code)
def create_credentials(token, account_id):
    """
    Get client credentials, given a client token and an account_id.

    Reference:
        https://docs.brightcove.com/en/video-cloud/oauth-api/guides/get-client-credentials.html

    Arguments:
        token (str): BC token sent in the ``Authorization`` header.
        account_id: Account id; converted with ``int()`` before sending.

    Returns:
        tuple: ``(client_secret, client_id, error_message)``. The
        ``error_message`` element is always an empty string, because every
        failure raises instead of returning.

    Raises:
        BrightcoveApiClientError: if the status is not 201 Created, or the
        reply has no usable JSON body.
    """
    headers = {'Authorization': 'BC_TOKEN {}'.format(token)}
    data = {
        "type": "credential",
        "maximum_scope": [{
            "identity": {
                "type": "video-cloud-account",
                "account-id": int(account_id),
            },
            "operations": [
                "video-cloud/video/all",
                "video-cloud/ingest-profiles/profile/read",
                "video-cloud/ingest-profiles/account/read",
                "video-cloud/ingest-profiles/profile/write",
                "video-cloud/ingest-profiles/account/write",
            ],
        }],
        "name": "Open edX Video XBlock"
    }
    url = 'https://oauth.brightcove.com/v4/client_credentials'
    response = requests.post(url, json=data, headers=headers)

    try:
        response_data = response.json()
    except ValueError:
        # A non-JSON reply must surface as BrightcoveApiClientError below,
        # not as a decoding error.
        response_data = None

    # New resource must have been created.
    if response.status_code != httplib.CREATED or not response_data:
        # For dev purposes, response_data.get('error_description') may also be considered.
        error_message = "Authentication to Brightcove API failed: no client credentials have been retrieved.\n" \
                        "Please ensure you have provided an appropriate BC token, using Video API Token field."
        raise BrightcoveApiClientError(error_message)

    client_secret = response_data.get('client_secret')
    client_id = response_data.get('client_id')
    error_message = ''
    return client_secret, client_id, error_message
def post(self, url, payload, headers=None, can_retry=True):
    """
    Issue REST POST request to a given URL. Can throw ApiClientError or its subclass.

    Arguments:
        url (str): API url to post to.
        payload (dict): POST data.
        headers (dict): Extra headers merged over the default bearer
            authorization and JSON content-type headers.
        can_retry (bool): True if, on a 401 reply, the access token may be
            refreshed and the call repeated once.
    Returns:
        Response in Python native data format.
    """
    request_headers = {
        'Authorization': 'Bearer ' + self.access_token,
        'Content-type': 'application/json'
    }
    if headers is not None:
        request_headers.update(headers)

    resp = requests.post(url, data=payload, headers=request_headers)

    if resp.status_code in (httplib.OK, httplib.CREATED):
        return resp.json()

    if resp.status_code == httplib.UNAUTHORIZED and can_retry:
        # Refresh the token and repeat once; the retry itself cannot retry.
        self.access_token = self._refresh_access_token()
        return self.post(url, payload, headers, can_retry=False)

    raise BrightcoveApiClientError
def _handle_post(gcs_stub, filename, headers):
    """Handle POST that starts object creation.

    Returns a fake 201 Created result with an empty body. Its ``location``
    header carries the upload token obtained from the stub.
    """
    content_type = _ContentType(headers)
    token = gcs_stub.post_start_creation(filename, headers)

    quoted_name = urllib.quote(filename)
    query = urllib.urlencode({'upload_id': token})
    location = 'https://storage.googleapis.com/%s?%s' % (quoted_name, query)

    response_headers = {
        'location': location,
        'content-type': content_type.value,
        'content-length': 0
    }
    return _FakeUrlFetchResult(httplib.CREATED, response_headers, '')
def test_send_samples_valid(self, mock_cs):
    """send_samples returns one decoded JSON result for a single 201 reply."""
    mock_cs.side_effect = [None]

    api = API.apiCalls.ApiCalls(
        client_id="",
        client_secret="",
        base_URL="",
        username="",
        password=""
    )

    expected = {
        "resource": {
            "sequencerSampleId": "03-3333",
            "description": "The 53rd sample",
            "sampleName": "03-3333",
            "sampleProject": "1"
        }
    }

    # Stand-in response: a 201 whose text is the expected JSON.
    fake_response = Foo()
    fake_response.status_code = httplib.CREATED
    fake_response.text = json.dumps(expected)

    fake_session = Foo()
    fake_session.post = MagicMock(side_effect=[fake_response])

    api.get_link = lambda x, y, targ_dict="": None
    api.session = fake_session

    sample = API.apiCalls.Sample({
        "sequencerSampleId": "03-3333",
        "description": "The 53rd sample",
        "sampleName": "03-3333",
        "sampleProject": "1"
    })

    results = api.send_samples([sample])

    self.assertEqual(len(results), 1)
    self.assertEqual(results[0], expected)
def test_send_sequence_files_valid(self, getsize, mock_cs):
    """send_sequence_files returns one decoded JSON result for a 201 reply."""
    mock_cs.side_effect = [None]

    api = API.apiCalls.ApiCalls(
        client_id="",
        client_secret="",
        base_URL="",
        username="",
        password=""
    )

    expected = {
        "resource": [
            {"file": "03-3333_S1_L001_R1_001.fastq.gz"},
            {"file": "03-3333_S1_L001_R2_001.fastq.gz"}
        ]
    }

    # Stand-in response: a 201 whose text is the expected JSON.
    fake_response = Foo()
    fake_response.status_code = httplib.CREATED
    fake_response.text = json.dumps(expected)

    fake_session = Foo()
    fake_session.post = MagicMock(side_effect=[fake_response])

    api.get_link = lambda x, y, targ_dict="": None
    api.session = fake_session
    # Replaces the method on the class itself, not only on this instance.
    API.apiCalls.ApiCalls.get_file_size_list = MagicMock()

    sample = API.apiCalls.Sample({
        "sequencerSampleId": "03-3333",
        "description": "The 53rd sample",
        "sampleName": "03-3333",
        "sampleProject": "1"
    })

    fastq_pair = ["03-3333_S1_L001_R1_001.fastq.gz",
                  "03-3333_S1_L001_R2_001.fastq.gz"]
    sample.set_seq_file(SequenceFile({}, fastq_pair))
    sample.run = SequencingRun(sample_sheet="sheet", sample_list=[sample])
    sample.run._sample_sheet_name = "sheet"

    results = api.send_sequence_files(samples_list=[sample])

    self.assertEqual(len(results), 1)
    self.assertEqual(results[0], expected)
def send_project(self, project, clear_cache=True):
    """
    post request to send a project to IRIDA via API
    the project being sent requires a name that is at least
        5 characters long

    arguments:
        project -- a Project object to be sent.
        clear_cache -- when true, the cached project list is reset first.

    returns a dictionary decoded from the body of a 201 reply.

    raises ProjectError when the name is shorter than 5 characters or when
        the reply status is not 201, so in those cases nothing is returned.
    """
    if clear_cache:
        self.cached_projects = None

    # Reject short names before any request is made.
    if len(project.get_name()) < 5:
        raise ProjectError("Invalid project name: " +
                           project.get_name() +
                           ". A project requires a name that must be " +
                           "5 or more characters.")

    url = self.get_link(self.base_URL, "projects")
    payload = json.dumps(project.get_dict())
    response = self.session.post(
        url, payload, headers={"Content-Type": "application/json"})

    if response.status_code != httplib.CREATED:  # anything but 201
        raise ProjectError("Error: " +
                           str(response.status_code) + " " + response.text)

    return json.loads(response.text)
def send_samples(self, samples_list):
    """
    post request to send sample(s) to the given project
    the project that the sample will be sent to is in its dictionary's
    "sampleProject" key

    arguments:
        samples_list -- list containing Sample object(s) to send

    returns a list with one decoded JSON dictionary per sample sent.

    raises ProjectError when the project link lookup raises StopIteration,
        and SampleError when a reply status is not 201.
    """
    # reset the caches, we're updating stuff
    self.cached_samples = {}
    self.cached_projects = None

    results = []
    for sample in samples_list:
        try:
            project_id = sample.get_project_id()
            projects_url = self.get_link(self.base_URL, "projects")
            url = self.get_link(
                projects_url,
                "project/samples",
                targ_dict={"key": "identifier", "value": project_id})
        except StopIteration:
            raise ProjectError("The given project ID: " +
                               project_id + " doesn't exist")

        payload = json.dumps(sample, cls=Sample.JsonEncoder)
        response = self.session.post(
            url, payload, headers={"Content-Type": "application/json"})

        if response.status_code != httplib.CREATED:  # anything but 201
            logging.error("Didn't create sample on server, response code is [{}] and error message is [{}]".format(response.status_code, response.text))
            raise SampleError(
                "Error {status_code}: {err_msg}.\nSample data: {sample_data}".format(
                    status_code=str(response.status_code),
                    err_msg=response.text,
                    sample_data=str(sample)),
                ["IRIDA rejected the sample."])

        results.append(json.loads(response.text))

    return results