我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用oslo_serialization.jsonutils.dump_as_bytes()。
def test_request_invalid_image(self, mock_get_image):
    """A join request naming an unknown image must produce a 400 Fault.

    The image lookup mock raises a ``Fault`` wrapping
    ``webob.exc.HTTPBadRequest``; the test passes only if ``create``
    raises a ``Fault`` whose ``status_int`` is 400.
    """
    mock_get_image.side_effect = Fault(webob.exc.HTTPBadRequest())
    body = {"metadata": {"ipa_enroll": "False"},
            "instance-id": fake.INSTANCE_ID,
            "project-id": fake.PROJECT_ID,
            "image-id": "invalid",
            "hostname": "test"}
    req = fakes.HTTPRequest.blank('/v1')
    req.method = 'POST'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    # Not using assertRaises because the exception is wrapped as
    # a Fault
    try:
        self.join_controller.create(req, body)
    except Fault as fault:
        # TestCase assertions instead of bare ``assert`` so the check is
        # not stripped when Python runs with -O.
        self.assertEqual(400, fault.status_int)
    else:
        self.fail("Expected a Fault with status 400 to be raised")
def test_invalid_instance_id(self, mock_get_image, mock_get_instance):
    """A join request for a non-existent instance must produce a 400 Fault.

    The instance lookup mock returns ``None`` so there is nothing to
    enroll; the test passes only if ``create`` raises a ``Fault`` whose
    ``status_int`` is 400.
    """
    mock_get_image.return_value = FakeImageService()
    mock_get_instance.return_value = None
    body = {"metadata": {"ipa_enroll": "True"},
            "instance-id": "invalid",
            "project-id": fake.PROJECT_ID,
            "image-id": fake.IMAGE_ID,
            "hostname": "test"}
    req = fakes.HTTPRequest.blank('/v1')
    req.method = 'POST'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    # Not using assertRaises because the exception is wrapped as
    # a Fault
    try:
        self.join_controller.create(req, body)
    except Fault as fault:
        # TestCase assertions instead of bare ``assert`` so the check is
        # not stripped when Python runs with -O.
        self.assertEqual(400, fault.status_int)
    else:
        self.fail("Expected a Fault with status 400 to be raised")
def update_image(self, image_uuid, values):
    """Merge ``values`` into the stored image record and write it back.

    :param image_uuid: key suffix of the record under ``/images/``.
    :param values: mapping of fields to update; must not contain ``uuid``.
    :returns: the result of ``translate_etcd_result`` for the updated
        record.
    :raises InvalidParameterValue: if ``values`` contains ``uuid``.
    :raises ImageNotFound: if the key does not exist.
    """
    if 'uuid' in values:
        raise exception.InvalidParameterValue(
            err=_('Cannot overwrite UUID for an existing image.'))

    key = '/images/' + image_uuid
    try:
        target = self.client.read(key)
        merged = json.loads(target.value)
        merged.update(values)
        target.value = json.dump_as_bytes(merged)
        self.client.update(target)
    except etcd.EtcdKeyNotFound:
        raise exception.ImageNotFound(image=image_uuid)
    except Exception as e:
        # Log and let any other failure propagate unchanged.
        LOG.error('Error occurred while updating image: %s',
                  six.text_type(e))
        raise

    return translate_etcd_result(target, 'image')
def update_resource_class(self, context, uuid, values):
    """Merge ``values`` into the stored resource class and write it back.

    :param context: not used by this method.
    :param uuid: key suffix of the record under ``/resource_classes/``.
    :param values: mapping of fields to update; must not contain ``uuid``.
    :returns: the result of ``translate_etcd_result`` for the updated
        record.
    :raises InvalidParameterValue: if ``values`` contains ``uuid``.
    :raises ResourceClassNotFound: if the key does not exist.
    """
    if 'uuid' in values:
        raise exception.InvalidParameterValue(
            err=_("Cannot override UUID for an existing resource class."))

    key = '/resource_classes/' + uuid
    try:
        target = self.client.read(key)
        merged = json.loads(target.value)
        merged.update(values)
        target.value = json.dump_as_bytes(merged)
        self.client.update(target)
    except etcd.EtcdKeyNotFound:
        raise exception.ResourceClassNotFound(resource_class=uuid)
    except Exception as e:
        # Log and let any other failure propagate unchanged.
        LOG.error('Error occurred while updating resource class: %s',
                  six.text_type(e))
        raise

    return translate_etcd_result(target, 'resource_class')
def update_pci_device(self, node_id, address, values):
    """Update the PCI device at ``address`` on ``node_id``, or create it.

    If the device lookup raises ``PciDeviceNotFound``, ``values`` is
    extended in place with ``compute_node_uuid`` and ``address`` and
    handed to ``_create_pci_device`` instead.

    :returns: the result of ``translate_etcd_result`` for the updated
        record, or whatever ``_create_pci_device`` returns.
    """
    try:
        device = self.get_pci_device_by_addr(node_id, address)
        key = '/pcidevices/' + device.uuid
        target = self.client.read(key)
        merged = json.loads(target.value)
        merged.update(values)
        target.value = json.dump_as_bytes(merged)
        self.client.update(target)
    except exception.PciDeviceNotFound:
        values.update(compute_node_uuid=node_id, address=address)
        return self._create_pci_device(values)
    except Exception as e:
        # Log and let any other failure propagate unchanged.
        LOG.error('Error occurred while updating pci device: %s',
                  six.text_type(e))
        raise

    return translate_etcd_result(target, 'pcidevice')
def update_volume_mapping(self, context, volume_mapping_uuid, values):
    """Merge ``values`` into the stored volume mapping and write it back.

    :param context: passed through to ``get_volume_mapping_by_uuid``.
    :param volume_mapping_uuid: identifier used to look the mapping up.
    :param values: mapping of fields to update; must not contain ``uuid``.
    :returns: the result of ``translate_etcd_result`` for the updated
        record.
    :raises InvalidParameterValue: if ``values`` contains ``uuid``.
    :raises VolumeMappingNotFound: if the etcd key does not exist.
    """
    if 'uuid' in values:
        raise exception.InvalidParameterValue(
            err=_('Cannot overwrite UUID for an existing VolumeMapping.'))

    try:
        existing = self.get_volume_mapping_by_uuid(
            context, volume_mapping_uuid)
        target = self.client.read('/volume_mapping/' + existing.uuid)
        merged = json.loads(target.value)
        merged.update(values)
        target.value = json.dump_as_bytes(merged)
        self.client.update(target)
    except etcd.EtcdKeyNotFound:
        raise exception.VolumeMappingNotFound(
            volume_mapping=volume_mapping_uuid)
    except Exception as e:
        # Log and let any other failure propagate unchanged.
        LOG.error('Error occurred while updating volume mappping: %s',
                  six.text_type(e))
        raise

    return translate_etcd_result(target, 'volume_mapping')
def __call__(self, environ, start_response):
    """Rewrite the error body with microversion details, then respond.

    The last chunk of ``self.app_iter`` is decoded as JSON (an undecodable
    chunk yields an empty dict), extended with ``max_version``,
    ``min_version``, ``code``, ``links`` and ``title``, and re-serialized
    as the sole element of ``self.app_iter``. ``Content-Length`` is
    updated to match before delegating to the parent ``__call__``.

    :param environ: WSGI environ, passed through to the parent class.
    :param start_response: WSGI callable, passed through to the parent.
    :returns: whatever the parent ``__call__`` returns.
    """
    # Bound up front so an empty app_iter still produces a valid body.
    err = {}
    for err_str in self.app_iter:
        try:
            err = json.loads(err_str.decode('utf-8'))
        except ValueError:
            err = {}

    links = {'rel': 'help',
             'href': 'https://developer.openstack.org'
                     '/api-guide/compute/microversions.html'}
    err['max_version'] = self.max_version
    err['min_version'] = self.min_version
    err['code'] = "zun.microversion-unsupported"
    err['links'] = [links]
    err['title'] = "Requested microversion is unsupported"

    # dump_as_bytes already returns bytes; wrapping it in six.b() fails on
    # Python 3 because six.b() calls .encode() on its argument.
    self.app_iter = [json.dump_as_bytes(err)]
    self.headers['Content-Length'] = str(len(self.app_iter[0]))

    return super(HTTPNotAcceptableAPIVersion, self).__call__(
        environ, start_response)
def _test_create_extra(self, params, no_image=False,
                       override_controller=None):
    """Create a server with extra parameters and return the server dict.

    :param params: extra keys merged into the default server definition.
    :param no_image: when true, ``imageRef`` is removed before ``params``
        is merged.
    :param override_controller: controller to use instead of
        ``self.controller`` when truthy.
    :returns: the ``'server'`` entry of the create response object.
    """
    server_spec = {
        'name': 'server_test',
        'imageRef': 'c905cedb-7281-47e4-8a62-f26bc5fc4c77',
        'flavorRef': 2,
    }
    if no_image:
        server_spec.pop('imageRef', None)
    server_spec.update(params)
    body = {'server': server_spec}

    req = os_api_fakes.HTTPRequestV21.blank('/servers')
    req.method = 'POST'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers["content-type"] = "application/json"

    controller = override_controller or self.controller
    return controller.create(req, body=body).obj['server']
def json_request(self, method, conn_url, **kwargs):
    """Send a JSON request and return ``(response, decoded_body)``.

    ``Content-Type`` and ``Accept`` default to ``application/json``; a
    ``body`` keyword, if given, is serialized to JSON bytes.

    :returns: ``(resp, [])`` for NO_CONTENT / RESET_CONTENT or a missing
        content type; ``(resp, None)`` for non-JSON content types;
        otherwise ``(resp, body)`` where ``body`` is the decoded JSON, or
        the raw joined text if decoding fails.
    """
    headers = kwargs.setdefault('headers', {})
    headers.setdefault('Content-Type', 'application/json')
    headers.setdefault('Accept', 'application/json')

    if 'body' in kwargs:
        kwargs['body'] = jsonutils.dump_as_bytes(kwargs['body'])

    resp, body_iter = self._http_request(conn_url, method, **kwargs)
    content_type = resp.headers.get('Content-Type')

    empty_statuses = (http_client.NO_CONTENT, http_client.RESET_CONTENT)
    if resp.status_code in empty_statuses or content_type is None:
        return resp, []

    if 'application/json' not in content_type:
        return resp, None

    body = ''.join(body_iter)
    try:
        body = jsonutils.loads(body)
    except ValueError:
        # Fall through with the undecoded text as the body.
        LOG.error(_LE('Could not decode response body as JSON'))
    return resp, body
def _metadata_as_json(self, version, path):
    """Build the instance metadata document and return it as JSON bytes.

    ``version`` and ``path`` are not used by this method. Optional
    sections (files, extra metadata, keypair) are included only when the
    corresponding attribute is truthy.
    """
    doc = {'uuid': self.uuid}

    if self.files:
        doc['files'] = self.files
    if self.extra_md:
        doc.update(self.extra_md)

    if self.keypair:
        keypair = self.keypair
        doc['public_keys'] = {keypair.name: keypair.public_key}
        doc['keys'] = [{
            'name': keypair.name,
            'type': keypair.type,
            'data': keypair.public_key,
        }]

    doc['hostname'] = self.hostname
    doc['name'] = self.server.name
    doc['availability_zone'] = self.availability_zone

    return jsonutils.dump_as_bytes(doc)
def json_request(self, method, url, **kwargs):
    """Send a JSON request and return ``(response, decoded_body)``.

    ``Content-Type`` and ``Accept`` default to ``application/json``; a
    ``body`` keyword, if given, is serialized to JSON bytes and passed on
    as ``data``.

    :returns: ``(resp, [])`` for NO_CONTENT / RESET_CONTENT or a missing
        content type; ``(resp, None)`` for non-JSON content types;
        otherwise ``(resp, body)`` where ``body`` is ``resp.json()``, or
        ``resp.content`` if decoding fails.
    """
    headers = kwargs.setdefault('headers', {})
    headers.setdefault('Content-Type', 'application/json')
    headers.setdefault('Accept', 'application/json')

    if 'body' in kwargs:
        kwargs['data'] = jsonutils.dump_as_bytes(kwargs.pop('body'))

    resp = self._http_request(url, method, **kwargs)
    raw_body = resp.content
    content_type = resp.headers.get('content-type')

    empty_statuses = (http_client.NO_CONTENT, http_client.RESET_CONTENT)
    if resp.status_code in empty_statuses or content_type is None:
        return resp, []

    if 'application/json' not in content_type:
        return resp, None

    try:
        return resp, resp.json()
    except ValueError:
        # Fall back to the raw content when it is not valid JSON.
        LOG.error(_LE('Could not decode response body as JSON'))
        return resp, raw_body
def test_create_success_with_201_response_code(
        self, mock_client, mock_create):
    """POST a valid notification and check the response status.

    NOTE(review): despite the "201" in the name, the assertion checks
    for ``http.ACCEPTED``.
    """
    payload = {
        "event": "STOPPED",
        "host_status": "NORMAL",
        "cluster_status": "ONLINE",
    }
    body = {
        "notification": {
            "hostname": "fake_host",
            "payload": payload,
            "type": "VM",
            "generated_time": NOW,
        }
    }

    request = self.req
    request.headers['Content-Type'] = 'application/json'
    request.method = 'POST'
    request.body = jsonutils.dump_as_bytes(body)

    response = request.get_response(self.app)
    self.assertEqual(http.ACCEPTED, response.status_code)
def test_create_success_with_201_response_code(
        self, mock_client, mock_create):
    """POST a valid segment and expect ``http.CREATED``."""
    segment = {
        "name": "segment1",
        "service_type": "COMPUTE",
        "recovery_method": "auto",
        "description": "failover_segment for compute",
    }
    body = {"segment": segment}

    request = self.req
    request.headers['Content-Type'] = 'application/json'
    request.method = 'POST'
    request.body = jsonutils.dump_as_bytes(body)

    response = request.get_response(self.app)
    self.assertEqual(http.CREATED, response.status_code)
def test_create_success_with_201_response_code(
        self, mock_client, mock_create):
    """POST a valid host and expect ``http.CREATED``."""
    host = {
        "name": "host-1",
        "type": "fake",
        "reserved": False,
        "on_maintenance": False,
        "control_attributes": "fake-control_attributes",
    }
    body = {"host": host}

    request = self.req
    request.headers['Content-Type'] = 'application/json'
    request.method = 'POST'
    request.body = jsonutils.dump_as_bytes(body)

    response = request.get_response(self.app)
    self.assertEqual(http.CREATED, response.status_code)
def index(req):
    """Build the MULTIPLE_CHOICES response listing the API versions.

    :param req: user request object
    :return: a ``webob.Response`` whose JSON body has a ``versions`` list
    """
    stable = {
        'version': '1.0',
        'status': 'STABLE',
        'links': _LINKS,
        'media-type': 'application/vnd.openstack.artifacts-1.0',
    }
    experimental = {
        'version': '1.1',
        'status': 'EXPERIMENTAL',
        'links': _LINKS,
        'media-type': 'application/vnd.openstack.artifacts-1.1',
    }
    payload = {'versions': [stable, experimental]}

    response = webob.Response(request=req,
                              status=http_client.MULTIPLE_CHOICES,
                              content_type='application/json')
    response.body = jsonutils.dump_as_bytes(payload)
    return response
def test_microversions_schema(self, mock_namespace, mock_maxver):
    """POST at version 2.2 is accepted and echoes the version header."""
    mock_maxver.return_value = api_version.APIVersionRequest("3.3")
    app = fakes.wsgi_app_v21(init_only='test-microversions')

    # Request setup is kept in its original order.
    req = fakes.HTTPRequest.blank('/v2/fake/microversions3')
    req.method = 'POST'
    req.headers = {self.header_name: '2.2'}
    req.environ['CONTENT_TYPE'] = "application/json"
    req.body = jsonutils.dump_as_bytes({'dummy': {'val': 'foo'}})

    response = req.get_response(app)
    self.assertEqual(200, response.status_int)

    payload = jsonutils.loads(response.body)
    self.assertEqual('create_val1', payload['param'])
    self.assertEqual("2.2", response.headers[self.header_name])
    self.assertEqual(self.header_name, response.headers['Vary'])
def test_microversions_schema_out_of_version_check(self, mock_namespace,
                                                   mock_maxver):
    """PUT at version 2.2 with ``inv_val`` still returns 200."""
    mock_maxver.return_value = api_version.APIVersionRequest("3.3")
    app = fakes.wsgi_app_v21(init_only='test-microversions')

    # Request setup is kept in its original order.
    req = fakes.HTTPRequest.blank('/v2/fake/microversions3/1')
    req.method = 'PUT'
    req.headers = {self.header_name: '2.2'}
    req.body = jsonutils.dump_as_bytes({'dummy': {'inv_val': 'foo'}})
    req.environ['CONTENT_TYPE'] = "application/json"

    response = req.get_response(app)
    self.assertEqual(200, response.status_int)

    payload = jsonutils.loads(response.body)
    self.assertEqual('update_val1', payload['param'])
    self.assertEqual("2.2", response.headers[self.header_name])
def test_microversions_schema_second_version(self, mock_namespace,
                                             mock_maxver):
    """PUT at version 2.10 with ``val2`` returns 200 and echoes 2.10."""
    mock_maxver.return_value = api_version.APIVersionRequest("3.3")
    app = fakes.wsgi_app_v21(init_only='test-microversions')

    # Request setup is kept in its original order.
    req = fakes.HTTPRequest.blank('/v2/fake/microversions3/1')
    req.headers = {self.header_name: '2.10'}
    req.environ['CONTENT_TYPE'] = "application/json"
    req.method = 'PUT'
    req.body = jsonutils.dump_as_bytes({'dummy': {'val2': 'foo'}})

    response = req.get_response(app)
    self.assertEqual(200, response.status_int)

    payload = jsonutils.loads(response.body)
    self.assertEqual('update_val1', payload['param'])
    self.assertEqual("2.10", response.headers[self.header_name])
def test_create_server_keypair_name_with_leading_trailing_compat_mode(
        self, mock_create):
    """In legacy v2 mode a keypair name padded with spaces yields 202."""
    instances = objects.InstanceList(
        objects=[fakes.stub_instance_obj(ctxt=None, id=1)])
    mock_create.return_value = (instances, None)

    server = {
        'name': 'test',
        'flavorRef': 1,
        'keypair_name': ' abc ',
        'imageRef': FAKE_UUID,
    }
    req = fakes.HTTPRequest.blank(self.base_url + '/servers')
    req.method = 'POST'
    req.headers["content-type"] = "application/json"
    req.body = jsonutils.dump_as_bytes({'server': server})
    req.set_legacy_v2()

    response = req.get_response(self.app_server)
    self.assertEqual(202, response.status_code)
def test_rebuild_instance_onset_file_limit_over_quota(self):
    """Rebuild raising OnsetFileLimitExceeded maps to HTTPForbidden."""

    def fake_get_image(self, context, image_href, **kwargs):
        return {
            'id': '76fa36fc-c930-4bf3-8c8a-ea2a2420deb6',
            'name': 'public image',
            'is_public': True,
            'status': 'active',
        }

    show_patch = mock.patch.object(
        fake._FakeImageService, 'show', side_effect=fake_get_image)
    rebuild_patch = mock.patch.object(
        self.controller.compute_api, 'rebuild',
        side_effect=exception.OnsetFileLimitExceeded)

    with test.nested(show_patch, rebuild_patch) as (show_mock,
                                                    rebuild_mock):
        self.req.body = jsonutils.dump_as_bytes(self.body)
        self.assertRaises(webob.exc.HTTPForbidden,
                          self.controller._action_rebuild,
                          self.req, FAKE_UUID, body=self.body)
def test_create_instance_name_all_blank_spaces(self):
    """A server name of 64 spaces is rejected with ValidationError."""
    # proper local hrefs must start with 'http://localhost/v2/'
    image_uuid = '76fa36fc-c930-4bf3-8c8a-ea2a2420deb6'
    image_href = 'http://localhost/v2/images/%s' % image_uuid
    flavor_ref = 'http://localhost/fake/flavors/3'

    server = {
        'name': ' ' * 64,
        'imageRef': image_href,
        'flavorRef': flavor_ref,
        'metadata': {
            'hello': 'world',
            'open': 'stack',
        },
    }
    body = {'server': server}

    req = fakes.HTTPRequest.blank('/fake/servers')
    req.method = 'POST'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers["content-type"] = "application/json"

    self.assertRaises(exception.ValidationError,
                      self.controller.create, req, body=body)
def test_create_instance_with_group_hint(self):
    """A server created with a group hint becomes a member of the group."""
    ctxt = self.req.environ['nova.context']

    group = objects.InstanceGroup(ctxt)
    group.project_id = ctxt.project_id
    group.user_id = ctxt.user_id
    group.create()

    def fake_instance_destroy(context, uuid, constraint):
        return fakes.stub_instance(1)

    self.stub_out('nova.db.instance_destroy', fake_instance_destroy)

    self.body['os:scheduler_hints'] = {'group': group.uuid}
    self.req.body = jsonutils.dump_as_bytes(self.body)
    result = self.controller.create(self.req, body=self.body)
    server = result.obj['server']

    # Reload the group to observe the membership change.
    reloaded = objects.InstanceGroup.get_by_uuid(ctxt, group.uuid)
    self.assertIn(server['id'], reloaded.members)
def test_create_instance_invalid_personality(self, mock_create):
    """A UnicodeDecodeError from the create call maps to HTTPBadRequest.

    ``mock_create`` is made to raise ``UnicodeDecodeError``; the
    controller is expected to turn that into ``webob.exc.HTTPBadRequest``.
    """
    codec = 'utf8'
    # UnicodeDecodeError requires a bytes object on Python 3; passing a
    # str raises TypeError before the test can run.
    content = b'b25zLiINCg0KLVJpY2hhcmQgQ$$%QQmFjaA=='
    start_position = 19
    end_position = 20
    msg = 'invalid start byte'
    mock_create.side_effect = UnicodeDecodeError(codec, content,
                                                 start_position,
                                                 end_position, msg)

    self.body['server']['personality'] = [
        {
            "path": "/etc/banner.txt",
            "contents": "b25zLiINCg0KLVJpY2hhcmQgQ$$%QQmFjaA==",
        },
    ]
    self.req.body = jsonutils.dump_as_bytes(self.body)

    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, self.req, body=self.body)
def test_create_missing_server(self):
    """A body without a ``server`` key is rejected with status 400."""

    # Test create with malformed body.
    def fake_create(*args, **kwargs):
        raise test.TestingException("Should not reach the compute API.")

    self.stubs.Set(compute_api.API, 'create', fake_create)

    malformed = {'foo': {'a': 'b'}}
    req = fakes.HTTPRequestV21.blank('/fake/servers')
    req.method = 'POST'
    req.content_type = 'application/json'
    req.body = jsonutils.dump_as_bytes(malformed)

    response = req.get_response(self.app)
    self.assertEqual(400, response.status_int)
def test_create_server_without_hints(self):
    """Without hints in the body, create() gets empty scheduler_hints."""

    def fake_create(*args, **kwargs):
        self.assertEqual(kwargs['scheduler_hints'], {})
        return ([self.fake_instance], '')

    self.stubs.Set(nova.compute.api.API, 'create', fake_create)

    req = self._get_request()
    req.method = 'POST'
    req.content_type = 'application/json'
    server = {
        'name': 'server_test',
        'imageRef': 'cedef40a-ed67-4d10-800e-17455edce175',
        'flavorRef': '1',
    }
    req.body = jsonutils.dump_as_bytes({'server': server})

    response = req.get_response(self.app)
    self.assertEqual(202, response.status_int)
def _test_create_server_with_hint(self, hint):
    """Create a server with ``hint`` and check it reaches create().

    :param hint: value sent as ``os:scheduler_hints`` and expected as the
        ``scheduler_hints`` keyword of the stubbed compute API call.
    """

    def fake_create(*args, **kwargs):
        self.assertEqual(kwargs['scheduler_hints'], hint)
        return ([self.fake_instance], '')

    self.stubs.Set(nova.compute.api.API, 'create', fake_create)

    req = self._get_request()
    req.method = 'POST'
    req.content_type = 'application/json'
    server = {
        'name': 'server_test',
        'imageRef': 'cedef40a-ed67-4d10-800e-17455edce175',
        'flavorRef': '1',
    }
    payload = {'server': server, 'os:scheduler_hints': hint}
    req.body = jsonutils.dump_as_bytes(payload)

    response = req.get_response(self.app)
    self.assertEqual(202, response.status_int)
def test_update_server_adminPass_ignored(self):
    """``adminPass`` in an update body is not passed to the DB layer."""
    body = {'server': {'name': 'server_test', 'adminPass': 'bacon'}}

    def server_update(context, id, params):
        # Only the display name should reach the DB update call.
        expected_params = {'display_name': 'server_test'}
        self.assertEqual(expected_params, params)
        expected_params['uuid'] = id
        return expected_params

    self.stub_out('nova.db.instance_update', server_update)
    # FIXME (comstud)
    #        self.stub_out('nova.db.instance_get',
    #                return_server_with_attributes(name='server_test'))

    req = fakes.HTTPRequest.blank('/fake/servers/%s' % FAKE_UUID)
    req.method = 'PUT'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    result = self.controller.update(req, FAKE_UUID, body)
    updated = result['server']
    self.assertEqual(FAKE_UUID, updated['id'])
    self.assertEqual('server_test', updated['name'])
def test_create_server_image_too_large(self):
    """An image larger than the flavor's disk yields HTTPBadRequest."""
    image_uuid = '76fa36fc-c930-4bf3-8c8a-ea2a2420deb6'

    # Get the fake image service so we can update the size of the image
    image_service, image_id = glance.get_remote_image_service(
        context, image_uuid)
    original_size = image_service.show(context, image_id)['size']
    inflated_size = str(1000 * (1024 ** 3))

    image_service.update(context, image_uuid, {'size': inflated_size})
    # Restore the original size once the test finishes.
    self.addCleanup(image_service.update, context, image_uuid,
                    {'size': original_size})

    self.body['server']['flavorRef'] = 2
    self.req.body = jsonutils.dump_as_bytes(self.body)

    expected_msg = "Flavor's disk is too small for requested image."
    with testtools.ExpectedException(webob.exc.HTTPBadRequest,
                                     expected_msg):
        self.controller.create(self.req, self.body)
def test_create_instance_invalid_personality(self):
    """A UnicodeDecodeError from the compute API maps to HTTPBadRequest.

    ``compute_api.API.create`` is stubbed to raise ``UnicodeDecodeError``;
    the controller is expected to turn that into
    ``webob.exc.HTTPBadRequest``.
    """

    def fake_create(*args, **kwargs):
        codec = 'utf8'
        # UnicodeDecodeError requires a bytes object on Python 3; passing
        # a str raises TypeError instead of the intended exception.
        content = b'b25zLiINCg0KLVJpY2hhcmQgQ$$%QQmFjaA=='
        start_position = 19
        end_position = 20
        msg = 'invalid start byte'
        raise UnicodeDecodeError(codec, content, start_position,
                                 end_position, msg)

    self.stubs.Set(compute_api.API, 'create', fake_create)

    self.body['server']['personality'] = [
        {
            "path": "/etc/banner.txt",
            "contents": "b25zLiINCg0KLVJpY2hhcmQgQ$$%QQmFjaA==",
        },
    ]
    self.req.body = jsonutils.dump_as_bytes(self.body)

    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, self.req, self.body)
def test_create_instance_with_group_hint(self):
    """A server created with a group hint becomes a member of the group."""
    ctxt = self.req.environ['nova.context']

    group = objects.InstanceGroup(ctxt)
    group.project_id = ctxt.project_id
    group.user_id = ctxt.user_id
    group.create()

    def fake_instance_destroy(context, uuid, constraint):
        return fakes.stub_instance(1)

    self.stub_out('nova.db.instance_destroy', fake_instance_destroy)
    self.ext_mgr.extensions = {'OS-SCH-HNT': 'fake',
                               'os-server-group-quotas': 'fake'}

    self.body['server']['scheduler_hints'] = {'group': group.uuid}
    self.req.body = jsonutils.dump_as_bytes(self.body)
    result = self.controller.create(self.req, self.body)
    server = result.obj['server']

    # Reload the group to observe the membership change.
    reloaded = objects.InstanceGroup.get_by_uuid(ctxt, group.uuid)
    self.assertIn(server['id'], reloaded.members)
def test_controller_action_extension_early(self):
    """With the early extension controller the body is extension_body."""
    base_controller = StubActionController(response_body)
    resource_ext = base_extensions.ResourceExtension(
        'tweedles', base_controller, member_actions={'action': 'POST'})

    ext_controller = StubEarlyExtensionController(extension_body)
    extension = StubControllerExtension()
    controller_ext = base_extensions.ControllerExtension(
        extension, 'tweedles', ext_controller)

    manager = StubExtensionManager(resource_ext=resource_ext,
                                   controller_ext=controller_ext)
    app = compute.APIRouter(manager)

    request = webob.Request.blank("/fake/tweedles/foo/action")
    request.method = 'POST'
    request.headers['Content-Type'] = 'application/json'
    request.body = jsonutils.dump_as_bytes({'fooAction': True})

    response = request.get_response(app)
    self.assertEqual(200, response.status_int)
    self.assertEqual(extension_body, response.body)
def test_controller_action_extension_late(self):
    """With the late extension controller the body is extension_body."""
    # Need a dict for the body to convert to a ResponseObject
    base_controller = StubActionController({'foo': response_body})
    resource_ext = base_extensions.ResourceExtension(
        'tweedles', base_controller, member_actions={'action': 'POST'})

    ext_controller = StubLateExtensionController(extension_body)
    extension = StubControllerExtension()
    controller_ext = base_extensions.ControllerExtension(
        extension, 'tweedles', ext_controller)

    manager = StubExtensionManager(resource_ext=resource_ext,
                                   controller_ext=controller_ext)
    app = compute.APIRouter(manager)

    request = webob.Request.blank("/fake/tweedles/foo/action")
    request.method = 'POST'
    request.headers['Content-Type'] = 'application/json'
    request.body = jsonutils.dump_as_bytes({'fooAction': True})

    response = request.get_response(app)
    self.assertEqual(200, response.status_int)
    self.assertEqual(extension_body, response.body)
def test_invalid_metadata_items_on_create(self):
    """Over-long keys, over-long values and empty keys are rejected."""
    self.stub_out('nova.db.instance_metadata_update',
                  return_create_instance_metadata)
    req = self._get_request()
    req.method = 'POST'
    req.headers["content-type"] = "application/json"

    cases = (
        # test for long key
        ({"metadata": {"a" * 260: "value1"}}, 'validation_ex_large'),
        # test for long value
        ({"metadata": {"key": "v" * 260}}, 'validation_ex_large'),
        # test for empty key.
        ({"metadata": {"": "value1"}}, 'validation_ex'),
    )
    for data, exc_attr in cases:
        req.body = jsonutils.dump_as_bytes(data)
        self.assertRaises(getattr(self, exc_attr),
                          self.controller.create, req, self.uuid,
                          body=data)
def test_create(self, get_mocked, update_mocked, quota_mocked):
    """Creating metadata adds the new key alongside the existing one."""
    updated_image = copy.deepcopy(get_image_123())
    updated_image['properties']['key7'] = 'value7'
    update_mocked.return_value = updated_image

    body = {"metadata": {"key7": "value7"}}
    req = fakes.HTTPRequest.blank('/v2/fake/images/123/metadata')
    req.method = 'POST'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers["content-type"] = "application/json"

    result = self.controller.create(req, '123', body=body)

    get_mocked.assert_called_once_with(mock.ANY, '123')
    expected_image = copy.deepcopy(get_image_123())
    expected_image['properties'] = {
        'key1': 'value1',  # existing meta
        'key7': 'value7'  # new meta
    }
    quota_mocked.assert_called_once_with(mock.ANY,
                                         expected_image["properties"])
    update_mocked.assert_called_once_with(mock.ANY, '123', expected_image,
                                          data=None, purge_props=True)

    self.assertEqual({'metadata': {'key1': 'value1', 'key7': 'value7'}},
                     result)
def test_update_all(self, get_mocked, update_mocked, quota_mocked):
    """update_all replaces the image metadata with the given mapping."""
    body = {"metadata": {"key9": "value9"}}
    req = fakes.HTTPRequest.blank('/v2/fake/images/123/metadata')
    req.method = 'PUT'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers["content-type"] = "application/json"

    result = self.controller.update_all(req, '123', body=body)

    get_mocked.assert_called_once_with(mock.ANY, '123')
    expected_image = copy.deepcopy(get_image_123())
    expected_image['properties'] = {
        'key9': 'value9'  # replace meta
    }
    quota_mocked.assert_called_once_with(mock.ANY,
                                         expected_image["properties"])
    update_mocked.assert_called_once_with(mock.ANY, '123', expected_image,
                                          data=None, purge_props=True)

    self.assertEqual({'metadata': {'key9': 'value9'}}, result)
def test_update_item(self, _get_mocked, update_mocked, quota_mocked):
    """Updating one metadata item changes only that key's value."""
    body = {"meta": {"key1": "zz"}}
    req = fakes.HTTPRequest.blank('/v2/fake/images/123/metadata/key1')
    req.method = 'PUT'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers["content-type"] = "application/json"

    result = self.controller.update(req, '123', 'key1', body=body)

    expected_image = copy.deepcopy(get_image_123())
    expected_image['properties'] = {
        'key1': 'zz'  # changed meta
    }
    quota_mocked.assert_called_once_with(mock.ANY,
                                         expected_image["properties"])
    update_mocked.assert_called_once_with(mock.ANY, '123', expected_image,
                                          data=None, purge_props=True)

    self.assertEqual(result, {'meta': {'key1': 'zz'}})
def test_volume_create(self):
    """POSTing a volume returns 200 and echoes the submitted fields."""
    self.stubs.Set(cinder.API, "create", fakes.stub_volume_create)

    vol = {"size": 100,
           "display_name": "Volume Test Name",
           "display_description": "Volume Test Desc",
           "availability_zone": "zone1:host1"}
    body = {"volume": vol}

    req = fakes.HTTPRequest.blank(self.url_prefix + '/os-volumes')
    req.method = 'POST'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers['content-type'] = 'application/json'
    resp = req.get_response(self.app)

    self.assertEqual(200, resp.status_int)

    resp_dict = jsonutils.loads(resp.body)
    self.assertIn('volume', resp_dict)

    # Request key -> response key for each echoed field.
    field_pairs = (
        ('size', 'size'),
        ('display_name', 'displayName'),
        ('display_description', 'displayDescription'),
        ('availability_zone', 'availabilityZone'),
    )
    for request_key, response_key in field_pairs:
        self.assertEqual(vol[request_key],
                         resp_dict['volume'][response_key])
def test_request_no_enrollment(self, mock_get_image):
    """With ``ipa_enroll`` set to "False", create() returns an empty dict."""
    mock_get_image.return_value = FakeImageService()

    body = {
        "metadata": {"ipa_enroll": "False"},
        "instance-id": fake.INSTANCE_ID,
        "project-id": fake.PROJECT_ID,
        "image-id": fake.IMAGE_ID,
        "hostname": "test",
    }
    req = fakes.HTTPRequest.blank('/v1')
    req.method = 'POST'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    result = self.join_controller.create(req, body)
    self.assertEqual({}, result)
def test_valid_request(self, mock_get_domain, mock_get_image,
                       mock_get_instance, mock_conf_parser):
    """A valid enrollment request returns the hostname and realm."""
    mock_get_image.return_value = FakeImageService()
    mock_get_instance.return_value = fake.fake_instance
    mock_get_domain.return_value = "test"

    # The config parser's get() yields these two values in order.
    parser = mock.MagicMock()
    parser.get = mock.Mock(side_effect=["novajoin", "REALM"])
    mock_conf_parser.return_value = parser

    body = {
        "metadata": {"ipa_enroll": "True"},
        "instance-id": fake.INSTANCE_ID,
        "project-id": fake.PROJECT_ID,
        "image-id": fake.IMAGE_ID,
        "hostname": "test",
    }
    req = fakes.HTTPRequest.blank('/v1')
    req.method = 'POST'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    res_dict = self.join_controller.create(req, body)

    # There should be no OTP because IPA shouldn't be
    # configured, but we'll handle both cases.
    otp = res_dict.get('ipaotp')
    if otp:
        self.assertThat(otp, MatchesRegex('^[a-z0-9]{32}'))
        self.assertEqual(len(otp), 32)

    self.assertEqual(res_dict.get('hostname'), 'test.test')
    self.assertEqual(res_dict.get('krb_realm'), 'REALM')

    # Note that on failures this will generate to stdout a Krb5Error
    # because in all likelihood the keytab cannot be read (and
    # probably doesn't exist. This can be ignored.
def test_valid_hostclass_request(self, mock_get_domain, mock_get_image,
                                 mock_get_instance, mock_get_project_name,
                                 mock_conf_parser):
    """A valid enrollment request with a project name mocked succeeds."""
    mock_get_image.return_value = FakeImageService()
    mock_get_instance.return_value = fake.fake_instance
    mock_get_domain.return_value = "test"
    mock_get_project_name.return_value = "test"

    # The config parser's get() yields these two values in order.
    parser = mock.MagicMock()
    parser.get = mock.Mock(side_effect=["novajoin", "REALM"])
    mock_conf_parser.return_value = parser

    body = {
        "metadata": {"ipa_enroll": "True"},
        "instance-id": fake.INSTANCE_ID,
        "project-id": fake.PROJECT_ID,
        "image-id": fake.IMAGE_ID,
        "hostname": "test",
    }
    req = fakes.HTTPRequest.blank('/v1')
    req.method = 'POST'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    res_dict = self.join_controller.create(req, body)

    # There should be no OTP because IPA shouldn't be
    # configured, but we'll handle both cases.
    otp = res_dict.get('ipaotp')
    if otp:
        self.assertThat(otp, MatchesRegex('^[a-z0-9]{32}'))
        self.assertEqual(len(otp), 32)

    self.assertEqual(res_dict.get('hostname'), 'test.test')
    self.assertEqual(res_dict.get('krb_realm'), 'REALM')

    # Note that on failures this will generate to stdout a Krb5Error
    # because in all likelihood the keytab cannot be read (and
    # probably doesn't exist. This can be ignored.
def default(self, data):
    """Serialize ``data`` to JSON bytes via ``jsonutils.dump_as_bytes``."""
    serialized = jsonutils.dump_as_bytes(data)
    return serialized
def process_bind_param(self, value, dialect):
    """Validate ``value`` against ``self.type`` and serialize it to JSON.

    :param value: object to store; ``None`` is replaced by a default
        instance of ``self.type``.
    :param dialect: not used by this method.
    :returns: the JSON serialization produced by ``json.dump_as_bytes``.
    :raises TypeError: if ``value`` is neither ``None`` nor an instance
        of ``self.type``.
    """
    expected_type = self.type

    if value is None:
        # Save default value according to current type to keep the
        # interface the consistent.
        value = expected_type()
    elif not isinstance(value, expected_type):
        names = (self.__class__.__name__,
                 expected_type.__name__,
                 type(value).__name__)
        raise TypeError(
            "%s supposes to store %s objects, but %s given" % names)

    return json.dump_as_bytes(value)
def update_container(self, context, container_uuid, values):
    """Merge ``values`` into the stored container and write it back.

    :param context: passed to the name-uniqueness check and the lookup.
    :param container_uuid: identifier used to look the container up.
    :param values: mapping of fields to update; must not contain ``uuid``.
    :returns: the result of ``translate_etcd_result`` for the updated
        record.
    :raises InvalidParameterValue: if ``values`` contains ``uuid``.
    :raises ContainerNotFound: if the etcd key does not exist.
    """
    # NOTE(yuywz): Update would fail if any other client
    # write '/containers/$CONTAINER_UUID' in the meanwhile
    if 'uuid' in values:
        raise exception.InvalidParameterValue(
            err=_("Cannot overwrite UUID for an existing Container."))

    if 'name' in values:
        self._validate_unique_container_name(context, values['name'])

    try:
        existing = self.get_container_by_uuid(context, container_uuid)
        target = self.client.read('/containers/' + existing.uuid)
        merged = json.loads(target.value)
        merged.update(values)
        target.value = json.dump_as_bytes(merged)
        self.client.update(target)
    except etcd.EtcdKeyNotFound:
        raise exception.ContainerNotFound(container=container_uuid)
    except Exception as e:
        # Log and let any other failure propagate unchanged.
        LOG.error('Error occurred while updating container: %s',
                  six.text_type(e))
        raise

    return translate_etcd_result(target, 'container')
def update_zun_service(self, host, binary, values):
    """Merge ``values`` into the stored service record and write it back.

    ``values`` is modified in place: ``updated_at`` is set to the ISO
    format of ``timeutils.utcnow()`` before the merge.

    :raises ZunServiceNotFound: if the etcd key does not exist.
    """
    key = '/zun_services/' + host + '_' + binary
    try:
        target = self.client.read(key)
        merged = json.loads(target.value)
        values['updated_at'] = datetime.isoformat(timeutils.utcnow())
        merged.update(values)
        target.value = json.dump_as_bytes(merged)
        self.client.update(target)
    except etcd.EtcdKeyNotFound:
        raise exception.ZunServiceNotFound(host=host, binary=binary)
    except Exception as e:
        # Log and let any other failure propagate unchanged.
        LOG.error('Error occurred while updating service: %s',
                  six.text_type(e))
        raise
def save(self, session=None):
    """Write this object to etcd under its UUID path.

    :param session: object exposing a ``client`` attribute; defaults to
        ``db.api.get_connection()``.
    :raises ResourceExists: if the path already exists.
    """
    if session is None:
        session = db.api.get_connection()
    client = session.client

    path = self.etcd_path(self.uuid)
    if self.path_already_exist(client, path):
        raise exception.ResourceExists(name=self.__class__)

    payload = json.dump_as_bytes(self.as_dict())
    client.write(path, payload)
def save(self, session=None):
    """Write this service to etcd under its ``<host>_<binary>`` path.

    :param session: object exposing a ``client`` attribute; defaults to
        ``db.api.get_connection()``.
    :raises ZunServiceAlreadyExists: if the path already exists.
    """
    if session is None:
        session = db.api.get_connection()
    client = session.client

    path = self.etcd_path(self.host + '_' + self.binary)
    if self.path_already_exist(client, path):
        raise exception.ZunServiceAlreadyExists(host=self.host,
                                                binary=self.binary)

    payload = json.dump_as_bytes(self.as_dict())
    client.write(path, payload)
def save(self, session=None):
    """Write this compute node to etcd under its UUID path.

    :param session: object exposing a ``client`` attribute; defaults to
        ``db.api.get_connection()``.
    :raises ComputeNodeAlreadyExists: if the path already exists.
    """
    if session is None:
        session = db.api.get_connection()
    client = session.client

    path = self.etcd_path(self.uuid)
    if self.path_already_exist(client, path):
        raise exception.ComputeNodeAlreadyExists(
            field='UUID', value=self.uuid)

    payload = json.dump_as_bytes(self.as_dict())
    client.write(path, payload)
def __init__(self, value):
    """Store the JSON serialization of ``value`` on ``self.value``."""
    serialized = json.dump_as_bytes(value)
    self.value = serialized
def to_primitive(self, obj, attr, value):
    """Return ``value`` serialized via ``json.dump_as_bytes``.

    ``obj`` and ``attr`` are not used by this method.
    """
    primitive = json.dump_as_bytes(value)
    return primitive