Python httplib 模块,CREATED 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用httplib.CREATED

项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _patch_upload(
      self,
      image,
      digest
  ):
    mounted, location = self._start_upload(digest, self._mount)

    if mounted:
      logging.info('Layer %s mounted.', digest)
      return

    location = self._get_absolute_url(location)

    resp, unused_content = self._transport.Request(
        location, method='PATCH', body=self._get_blob(image, digest),
        content_type='application/octet-stream',
        accepted_codes=[httplib.NO_CONTENT, httplib.ACCEPTED, httplib.CREATED])

    location = self._add_digest(resp['location'], digest)
    location = self._get_absolute_url(location)
    self._transport.Request(
        location, method='PUT', body=None,
        accepted_codes=[httplib.CREATED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _put_manifest(
      self,
      image,
      use_digest=False
  ):
    """Upload the manifest for this image."""
    if use_digest:
      tag_or_digest = image.digest()
    else:
      tag_or_digest = _tag_or_digest(self._name)

    self._transport.Request(
        '{base_url}/manifests/{tag_or_digest}'.format(
            base_url=self._base_url(),
            tag_or_digest=tag_or_digest),
        method='PUT',
        body=image.manifest(),
        content_type=image.media_type(),
        accepted_codes=[httplib.OK, httplib.CREATED, httplib.ACCEPTED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _patch_upload(
      self,
      image,
      digest
  ):
    mounted, location = self._start_upload(digest, self._mount)

    if mounted:
      logging.info('Layer %s mounted.', digest)
      return

    location = self._get_absolute_url(location)

    resp, unused_content = self._transport.Request(
        location, method='PATCH', body=image.blob(digest),
        content_type='application/octet-stream',
        accepted_codes=[httplib.NO_CONTENT, httplib.ACCEPTED, httplib.CREATED])

    location = self._add_digest(resp['location'], digest)
    location = self._get_absolute_url(location)
    self._transport.Request(
        location, method='PUT', body=None,
        accepted_codes=[httplib.CREATED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _start_upload(
      self,
      digest,
      mount=None
  ):
    """POST to begin the upload process with optional cross-repo mount param."""
    if not mount:
      # Do a normal POST to initiate an upload if mount is missing.
      url = '{base_url}/blobs/uploads/'.format(base_url=self._base_url())
      accepted_codes = [httplib.ACCEPTED]
    else:
      # If we have a mount parameter, try to mount the blob from another repo.
      mount_from = '&'.join(
          ['from=' + urllib.quote(repo.repository, '') for repo in self._mount])
      url = '{base_url}/blobs/uploads/?mount={digest}&{mount_from}'.format(
          base_url=self._base_url(),
          digest=digest,
          mount_from=mount_from)
      accepted_codes = [httplib.CREATED, httplib.ACCEPTED]

    resp, unused_content = self._transport.Request(
        url, method='POST', body=None,
        accepted_codes=accepted_codes)
    return resp.status == httplib.CREATED, resp.get('location')  # type: ignore
项目:orangecloud-client    作者:antechrestos    | 项目源码 | 文件源码
def test_upload(self):
        self.client.post.return_value = mock_upload_response('/files/content',
                                                             httplib.CREATED,
                                                             None,
                                                             'files', 'POST_response.json')
        file_name = 'upload.jpg'
        file_path = path.join(path.dirname(__file__), '..', 'fixtures', 'files', file_name)
        folder_id = 'folder-id'

        @mock.patch('orangecloud_client.files.guess_type', return_value=('image/jpeg', 'binary'))
        @mock.patch('__builtin__.open', spec=open, return_value=MockFile())
        @mock.patch('orangecloud_client.files.MultipartEncoder', return_value=mock.Mock())
        def fire_test(mock_multipart_encoder, mock_open, _):
            data = mock_multipart_encoder()
            data.content_type = 'upload content type'
            response_file = self.files.upload(file_path, folder_id)
            self.client.post.assert_called_with(self.client.post.return_value.url,
                                                data=data,
                                                headers={'Content-Type': data.content_type})
            self.assertEqual('file-id', response_file.fileId)
            self.assertEqual('file-name', response_file.fileName)

        fire_test()
项目:UPPlatform_Python_SDK    作者:Jawbone    | 项目源码 | 文件源码
def test__raise_for_status(self, mock_resp):
        """
        Verify that an exceptions gets raised for unexpected responses.

        :param mock_resp: mocked httplib Response
        """
        #
        # ok_statuses should not raise
        #
        mock_resp.status = httplib.CREATED
        self.up.resp = mock_resp
        self.up.content = ''
        try:
            self.up._raise_for_status([httplib.OK, httplib.CREATED])
        except Exception as exc:
            self.fail('_raise_for_status unexpectedly threw {}'.format(exc))

        #
        # Anything else should raise.
        #
        mock_resp.status = httplib.ACCEPTED
        self.assertRaises(
            upapi.exceptions.UnexpectedAPIResponse,
            self.up._raise_for_status,
            [httplib.OK, httplib.CREATED])
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def parse_response(self, response):

        if response.status == httplib.CREATED:
            self.newKey = True

        headers = response.headers
        self.etcd_index = int(headers.get('x-etcd-index', 1))
        self.raft_index = int(headers.get('x-raft-index', 1))
        self.raft_term = int(headers.get('x-raft-term', 0))
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def _handle_server_response(self, response):

        if response.status in (httplib.OK, httplib.CREATED,
                               httplib.NO_CONTENT):
            return response

        logger.debug('invalid response status:{st} body:{body}'.format(
                     st=response.status, body=response.data))

        EtcdError.handle(response)
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _monolithic_upload(
      self,
      image,
      digest
  ):
    self._transport.Request(
        '{base_url}/blobs/uploads/?digest={digest}'.format(
            base_url=self._base_url(),
            digest=digest),
        method='POST', body=self._get_blob(image, digest),
        accepted_codes=[httplib.CREATED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _put_upload(
      self,
      image,
      digest
  ):
    mounted, location = self._start_upload(digest, self._mount)

    if mounted:
      logging.info('Layer %s mounted.', digest)
      return

    location = self._add_digest(location, digest)
    self._transport.Request(
        location, method='PUT', body=self._get_blob(image, digest),
        accepted_codes=[httplib.CREATED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _start_upload(
      self,
      digest,
      mount=None
  ):
    """POST to begin the upload process with optional cross-repo mount param."""
    if not mount:
      # Do a normal POST to initiate an upload if mount is missing.
      url = '{base_url}/blobs/uploads/'.format(base_url=self._base_url())
      accepted_codes = [httplib.ACCEPTED]
    else:
      # If we have a mount parameter, try to mount the blob from another repo.
      mount_from = '&'.join(
          ['from=' + urllib.quote(repo.repository, '') for repo in self._mount])
      url = '{base_url}/blobs/uploads/?mount={digest}&{mount_from}'.format(
          base_url=self._base_url(),
          digest=digest,
          mount_from=mount_from)
      accepted_codes = [httplib.CREATED, httplib.ACCEPTED]

    resp, unused_content = self._transport.Request(
        url, method='POST', body=None,
        accepted_codes=accepted_codes)
    # pytype: disable=attribute-error
    return resp.status == httplib.CREATED, resp.get('location')
    # pytype: enable=attribute-error
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _monolithic_upload(self, image,
                         digest):
    self._transport.Request(
        '{base_url}/blobs/uploads/?digest={digest}'.format(
            base_url=self._base_url(),
            digest=digest),
        method='POST', body=image.blob(digest),
        accepted_codes=[httplib.CREATED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _put_upload(
      self,
      image,
      digest
  ):
    mounted, location = self._start_upload(digest, self._mount)

    if mounted:
      logging.info('Layer %s mounted.', digest)
      return

    location = self._add_digest(location, digest)
    self._transport.Request(
        location, method='PUT', body=image.blob(digest),
        accepted_codes=[httplib.CREATED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _put_manifest(self, image):
    """Upload the manifest for this image."""
    self._transport.Request(
        '{base_url}/manifests/{tag_or_digest}'.format(
            base_url=self._base_url(),
            tag_or_digest=_tag_or_digest(self._name)),
        method='PUT', body=image.manifest(),
        accepted_codes=[httplib.OK, httplib.CREATED, httplib.ACCEPTED])
项目:irida-miseq-uploader    作者:phac-nml    | 项目源码 | 文件源码
def test_send_project_valid(self, mock_cs):

        mock_cs.side_effect = [None]

        api = API.apiCalls.ApiCalls(
            client_id="",
            client_secret="",
            base_URL="",
            username="",
            password=""
        )

        json_dict = {
            "resource": {
                "name": "project1",
                "projectDescription": "projectDescription",
                "identifier": "1"
            }
        }

        json_obj = json.dumps(json_dict)

        session_response = Foo()
        setattr(session_response, "status_code", httplib.CREATED)
        setattr(session_response, "text", json_obj)

        session_post = MagicMock(side_effect=[session_response])
        session = Foo()
        setattr(session, "post", session_post)

        api.session = session
        api.get_link = lambda x, y, targ_dict="": None
        proj = API.apiCalls.Project("project1", "projectDescription", "1")

        json_res = api.send_project(proj)
        self.assertEqual(json_dict, json_res)
项目:capstone    作者:rackerlabs    | 项目源码 | 文件源码
def authenticate(self, auth_data, expected_status=httplib.CREATED):
        resp = requests.post(
            self.keystone_token_endpoint, headers=self.headers, json=auth_data,
            verify='/etc/ssl/certs/keystone.pem',
        )
        self.assertEqual(expected_status, resp.status_code, resp.text)
        return resp
项目:zendesk-help-center-localization    作者:mykola-mokhnach    | 项目源码 | 文件源码
def _post(self, endpoint, post_data=None):
        response = requests.request('POST',
                                    url='{}/{}'.format(self._api_root, endpoint),
                                    headers={'Content-Type': 'application/json',
                                             'Accept': 'application/json'},
                                    auth=(self._login + '/token', self._token),
                                    json=post_data)
        if response.status_code in (httplib.OK, httplib.CREATED):
            return response.json()
        raise APIError(response.text, response.status_code)
项目:zendesk-help-center-localization    作者:mykola-mokhnach    | 项目源码 | 文件源码
def _put(self, endpoint, put_data=None):
        response = requests.request('PUT',
                                    url='{}/{}'.format(self._api_root, endpoint),
                                    headers={'Content-Type': 'application/json',
                                             'Accept': 'application/json'},
                                    auth=(self._login + '/token', self._token),
                                    json=put_data)
        if response.status_code in (httplib.OK, httplib.CREATED):
            return response.json()
        raise APIError(response.text, response.status_code)
项目:zendesk-help-center-localization    作者:mykola-mokhnach    | 项目源码 | 文件源码
def _upload_file(self, endpoint, path, form_data=None):
        multipart_data = {}
        if form_data is not None:
            for key, value in form_data.iteritems():
                multipart_data[key] = (None, value)
        with open(path, 'rb') as src_file:
            multipart_data['file'] = src_file
            response = requests.request('POST',
                                        url='{}/{}'.format(self._api_root, endpoint),
                                        headers={'Accept': 'application/json'},
                                        auth=(self._login + '/token', self._token),
                                        files=multipart_data)
        if response.status_code in (httplib.NO_CONTENT, httplib.CREATED):
            return response.json()
        raise APIError(response.text, response.status_code)
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def create_credentials(token, account_id):
        """
        Get client credentials, given a client token and an account_id.

        Reference:
            https://docs.brightcove.com/en/video-cloud/oauth-api/guides/get-client-credentials.html
        """
        headers = {'Authorization': 'BC_TOKEN {}'.format(token)}
        data = {
            "type": "credential",
            "maximum_scope": [{
                "identity": {
                    "type": "video-cloud-account",
                    "account-id": int(account_id),
                },
                "operations": [
                    "video-cloud/video/all",
                    "video-cloud/ingest-profiles/profile/read",
                    "video-cloud/ingest-profiles/account/read",
                    "video-cloud/ingest-profiles/profile/write",
                    "video-cloud/ingest-profiles/account/write",
                ],
            }],
            "name": "Open edX Video XBlock"
        }
        url = 'https://oauth.brightcove.com/v4/client_credentials'
        response = requests.post(url, json=data, headers=headers)
        response_data = response.json()
        # New resource must have been created.
        if response.status_code == httplib.CREATED and response_data:
            client_secret = response_data.get('client_secret')
            client_id = response_data.get('client_id')
            error_message = ''
        else:
            # For dev purposes, response_data.get('error_description') may also be considered.
            error_message = "Authentication to Brightcove API failed: no client credentials have been retrieved.\n" \
                            "Please ensure you have provided an appropriate BC token, using Video API Token field."
            raise BrightcoveApiClientError(error_message)
        return client_secret, client_id, error_message
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def post(self, url, payload, headers=None, can_retry=True):
        """
        Issue REST POST request to a given URL. Can throw ApiClientError or its subclass.

        Arguments:
            url (str): API url to fetch a resource from.
            payload (dict): POST data.
            headers (dict): Headers necessary as per API, e.g. authorization bearer to perform authorised requests.
            can_retry (bool): True if in a case of authentication error it can refresh access token and retry a call.
        Returns:
            Response in Python native data format.
        """
        headers_ = {
            'Authorization': 'Bearer ' + self.access_token,
            'Content-type': 'application/json'
        }
        if headers is not None:
            headers_.update(headers)

        resp = requests.post(url, data=payload, headers=headers_)
        if resp.status_code in (httplib.OK, httplib.CREATED):
            return resp.json()
        elif resp.status_code == httplib.UNAUTHORIZED and can_retry:
            self.access_token = self._refresh_access_token()
            return self.post(url, payload, headers, can_retry=False)
        else:
            raise BrightcoveApiClientError
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _handle_post(gcs_stub, filename, headers):
  """Handle POST that starts object creation."""
  content_type = _ContentType(headers)
  token = gcs_stub.post_start_creation(filename, headers)
  response_headers = {
      'location': 'https://storage.googleapis.com/%s?%s' % (
          urllib.quote(filename),
          urllib.urlencode({'upload_id': token})),
      'content-type': content_type.value,
      'content-length': 0
  }
  return _FakeUrlFetchResult(httplib.CREATED, response_headers, '')
项目:irida-miseq-uploader    作者:phac-nml    | 项目源码 | 文件源码
def test_send_samples_valid(self, mock_cs):

        mock_cs.side_effect = [None]

        api = API.apiCalls.ApiCalls(
            client_id="",
            client_secret="",
            base_URL="",
            username="",
            password=""
        )

        json_dict = {
            "resource": {
                "sequencerSampleId": "03-3333",
                "description": "The 53rd sample",
                "sampleName": "03-3333",
                "sampleProject": "1"
            }
        }

        json_obj = json.dumps(json_dict)

        session_response = Foo()
        setattr(session_response, "status_code", httplib.CREATED)
        setattr(session_response, "text", json_obj)

        session_post = MagicMock(side_effect=[session_response])
        session = Foo()
        setattr(session, "post", session_post)

        api.get_link = lambda x, y, targ_dict="": None
        api.session = session

        sample_dict = {
            "sequencerSampleId": "03-3333",
            "description": "The 53rd sample",
            "sampleName": "03-3333",
            "sampleProject": "1"
        }

        sample = API.apiCalls.Sample(sample_dict)
        json_res_list = api.send_samples([sample])

        self.assertEqual(len(json_res_list), 1)

        json_res = json_res_list[0]
        self.assertEqual(json_res, json_dict)
项目:irida-miseq-uploader    作者:phac-nml    | 项目源码 | 文件源码
def test_send_sequence_files_valid(self, getsize, mock_cs):

        mock_cs.side_effect = [None]

        api = API.apiCalls.ApiCalls(
            client_id="",
            client_secret="",
            base_URL="",
            username="",
            password=""
        )

        json_dict = {
            "resource": [
                {
                    "file": "03-3333_S1_L001_R1_001.fastq.gz"
                },
                {
                    "file": "03-3333_S1_L001_R2_001.fastq.gz"
                }
            ]
        }

        json_obj = json.dumps(json_dict)

        session_response = Foo()
        setattr(session_response, "status_code", httplib.CREATED)
        setattr(session_response, "text", json_obj)

        session_post = MagicMock(side_effect=[session_response])
        session = Foo()
        setattr(session, "post", session_post)

        api.get_link = lambda x, y, targ_dict="": None
        api.session = session
        API.apiCalls.ApiCalls.get_file_size_list = MagicMock()

        sample_dict = {
            "sequencerSampleId": "03-3333",
            "description": "The 53rd sample",
            "sampleName": "03-3333",
            "sampleProject": "1"
        }

        sample = API.apiCalls.Sample(sample_dict)
        files = ["03-3333_S1_L001_R1_001.fastq.gz",
                      "03-3333_S1_L001_R2_001.fastq.gz"]
        seq_file = SequenceFile({}, files)
        sample.set_seq_file(seq_file)
        sample.run = SequencingRun(sample_sheet="sheet", sample_list=[sample])
        sample.run._sample_sheet_name = "sheet"

        kwargs = {
            "samples_list": [sample]
        }
        json_res_list = api.send_sequence_files(**kwargs)

        self.assertEqual(len(json_res_list), 1)

        json_res = json_res_list[0]
        self.assertEqual(json_res, json_dict)
项目:irida-miseq-uploader    作者:phac-nml    | 项目源码 | 文件源码
def send_project(self, project, clear_cache=True):
        """
        post request to send a project to IRIDA via API
        the project being sent requires a name that is at least
            5 characters long

        arguments:
            project -- a Project object to be sent.

        returns a dictionary containing the result of post request.
        when post is successful the dictionary it returns will contain the same
            name and projectDescription that was originally sent as well as
            additional keys like createdDate and identifier.
        when post fails then an error will be raised so return statement is
            not even reached.
        """
        if clear_cache:
            self.cached_projects = None

        json_res = {}
        if len(project.get_name()) >= 5:
            url = self.get_link(self.base_URL, "projects")
            json_obj = json.dumps(project.get_dict())
            headers = {
                "headers": {
                    "Content-Type": "application/json"
                }
            }

            response = self.session.post(url, json_obj, **headers)

            if response.status_code == httplib.CREATED:  # 201
                json_res = json.loads(response.text)
            else:
                raise ProjectError("Error: " +
                                   str(response.status_code) + " " +
                                   response.text)

        else:
            raise ProjectError("Invalid project name: " +
                               project.get_name() +
                               ". A project requires a name that must be " +
                               "5 or more characters.")

        return json_res
项目:irida-miseq-uploader    作者:phac-nml    | 项目源码 | 文件源码
def send_samples(self, samples_list):
        """
        post request to send sample(s) to the given project
        the project that the sample will be sent to is in its dictionary's
        "sampleProject" key

        arguments:
            samples_list -- list containing Sample object(s) to send

        returns a list containing dictionaries of the result of post request.
        """

        self.cached_samples = {} # reset the cache, we're updating stuff
        self.cached_projects = None
        json_res_list = []

        for sample in samples_list:

            try:
                project_id = sample.get_project_id()
                proj_URL = self.get_link(self.base_URL, "projects")
                url = self.get_link(proj_URL, "project/samples",
                                    targ_dict={
                                        "key": "identifier",
                                        "value": project_id
                                    })

            except StopIteration:
                raise ProjectError("The given project ID: " +
                                   project_id + " doesn't exist")

            headers = {
                "headers": {
                    "Content-Type": "application/json"
                }
            }

            json_obj = json.dumps(sample, cls=Sample.JsonEncoder)
            response = self.session.post(url, json_obj, **headers)

            if response.status_code == httplib.CREATED:  # 201
                json_res = json.loads(response.text)
                json_res_list.append(json_res)
            else:
                logging.error("Didn't create sample on server, response code is [{}] and error message is [{}]".format(response.status_code, response.text))
                raise SampleError("Error {status_code}: {err_msg}.\nSample data: {sample_data}".format(
                                  status_code=str(response.status_code),
                                  err_msg=response.text,
                                  sample_data=str(sample)), ["IRIDA rejected the sample."])
        return json_res_list