我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 mock.ANY。
def test_ensure_loadbalancer(self):
    """``ensure_loadbalancer`` hands a request object to ``_ensure``.

    The request is matched with ``mock.ANY`` in the call assertion and
    then pulled out of ``call_args`` so its fields can be checked.
    """
    driver_cls = d_lbaasv2.LBaaSv2Driver
    m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
    sentinel_resp = mock.sentinel.expected_resp
    m_driver._ensure.return_value = sentinel_resp

    ns = 'TEST_NAMESPACE'
    ep_name = 'TEST_NAME'
    project = 'TEST_PROJECT'
    subnet = 'D3FA400A-F543-4B91-9CD3-047AF0CE42D1'
    ip_addr = '1.2.3.4'
    # TODO(ivc): handle security groups
    security_group_ids = []
    endpoints = {'metadata': {'namespace': ns, 'name': ep_name}}

    actual = driver_cls.ensure_loadbalancer(
        m_driver, endpoints, project, subnet, ip_addr, security_group_ids)

    m_driver._ensure.assert_called_once_with(
        mock.ANY,
        m_driver._create_loadbalancer,
        m_driver._find_loadbalancer)
    # First positional argument of the recorded call is the request.
    request = m_driver._ensure.call_args[0][0]
    self.assertEqual("%s/%s" % (ns, ep_name), request.name)
    self.assertEqual(project, request.project_id)
    self.assertEqual(subnet, request.subnet_id)
    self.assertEqual(ip_addr, str(request.ip))
    self.assertEqual(sentinel_resp, actual)
def test_ensure_pool(self):
    """``ensure_pool`` passes a pool built from the listener on.

    The pool handed to ``_ensure_provisioned`` is matched with
    ``mock.ANY`` and then inspected through ``call_args``.
    """
    driver_cls = d_lbaasv2.LBaaSv2Driver
    m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
    sentinel_resp = mock.sentinel.expected_resp
    m_driver._ensure_provisioned.return_value = sentinel_resp

    lb = obj_lbaas.LBaaSLoadBalancer(
        id='00EE9E11-91C2-41CF-8FD4-7970579E5C4C',
        project_id='TEST_PROJECT')
    lsnr = obj_lbaas.LBaaSListener(
        id='A57B7771-6050-4CA8-A63C-443493EC98AB',
        name='TEST_LISTENER_NAME',
        protocol='TCP')

    actual = driver_cls.ensure_pool(
        m_driver, mock.sentinel.endpoints, lb, lsnr)

    m_driver._ensure_provisioned.assert_called_once_with(
        lb, mock.ANY, m_driver._create_pool, m_driver._find_pool)
    # Second positional argument of the recorded call is the pool.
    built_pool = m_driver._ensure_provisioned.call_args[0][1]
    self.assertEqual(lsnr.name, built_pool.name)
    self.assertEqual(lb.project_id, built_pool.project_id)
    self.assertEqual(lsnr.id, built_pool.listener_id)
    self.assertEqual(lsnr.protocol, built_pool.protocol)
    self.assertEqual(sentinel_resp, actual)
def test_annotate(self, m_patch, m_count):
    """``annotate`` PATCHes the serialized metadata and returns annotations.

    Headers are not pinned; they are matched with ``mock.ANY``.
    """
    m_count.return_value = list(range(1, 5))
    target_path = '/test'
    annotations = {'a1': 'v1', 'a2': 'v2'}
    version = "123"
    body = {'metadata': {'annotations': annotations,
                         "resourceVersion": version}}
    serialized = jsonutils.dumps(body, sort_keys=True)

    fake_response = mock.MagicMock()
    fake_response.ok = True
    fake_response.json.return_value = body
    m_patch.return_value = fake_response

    returned = self.client.annotate(
        target_path, annotations, resource_version=version)

    self.assertEqual(annotations, returned)
    m_patch.assert_called_once_with(
        self.base_url + target_path,
        data=serialized,
        headers=mock.ANY,
        cert=(None, None),
        verify=False)
def test_execute_clean_step_no_success_log(
        self, log_mock, run_mock, utils_mock, parse_driver_info_mock):
    """A failing clean step logs an error and never logs success."""
    run_mock.side_effect = exception.InstanceDeployFailure('Boom')
    clean_step = {
        'priority': 10,
        'interface': 'deploy',
        'step': 'erase_devices',
        'args': {'tags': ['clean']},
    }

    internal_info = self.node.driver_internal_info
    internal_info['agent_url'] = 'http://127.0.0.1'
    self.node.driver_internal_info = internal_info
    self.node.save()

    with task_manager.acquire(self.context, self.node.uuid) as task:
        self.driver.execute_clean_step(task, clean_step)
        # The log format string is not pinned, only its arguments.
        expected_log_args = {'node': task.node['uuid'],
                             'step': 'erase_devices'}
        log_mock.error.assert_called_once_with(mock.ANY, expected_log_args)
        utils_mock.assert_called_once_with(task, 'Boom')
        self.assertFalse(log_mock.info.called)
def test_send_magic_packets(self, mock_socket):
    """Magic packets are broadcast through a single socket.

    The packet payload is matched with ``mock.ANY``; only the
    destination address and the call sequence are checked.
    """
    fake_socket = mock.Mock(spec=socket, spec_set=True)
    mock_socket.return_value = fake_socket()
    obj_utils.create_test_port(
        self.context,
        uuid=uuidutils.generate_uuid(),
        address='aa:bb:cc:dd:ee:ff',
        node_id=self.node.id)

    broadcast_target = ('255.255.255.255', 9)
    with task_manager.acquire(
            self.context, self.node.uuid, shared=True) as task:
        wol_power._send_magic_packets(task, '255.255.255.255', 9)

        sock_call = mock.call()
        expected_calls = [
            mock.call(),
            sock_call.setsockopt(socket.SOL_SOCKET,
                                 socket.SO_BROADCAST, 1),
            sock_call.sendto(mock.ANY, broadcast_target),
            sock_call.sendto(mock.ANY, broadcast_target),
            sock_call.close(),
        ]
        fake_socket.assert_has_calls(expected_calls)
        self.assertEqual(1, mock_socket.call_count)
def test__set_boot_device_order_fail(self, mock_aw, mock_client_pywsman):
    """``_set_boot_device_order`` raises on a bad or missing reply."""
    ns = resource_uris.CIM_BootConfigSetting
    boot_device = boot_devices.PXE

    # Case 1: the client answers with a non-zero ReturnValue.
    failure_xml = test_utils.build_soap_xml([{'ReturnValue': '2'}], ns)
    failure_root = test_utils.mock_wsman_root(failure_xml)
    wsman_client = mock_client_pywsman.Client.return_value
    wsman_client.invoke.return_value = failure_root

    self.assertRaises(exception.AMTFailure,
                      amt_mgmt._set_boot_device_order,
                      self.node, boot_device)
    wsman_client.invoke.assert_called_once_with(
        mock.ANY, ns, 'ChangeBootOrder', mock.ANY)

    # Case 2: the client returns nothing at all.
    wsman_client = mock_client_pywsman.Client.return_value
    wsman_client.invoke.return_value = None

    self.assertRaises(exception.AMTConnectFailure,
                      amt_mgmt._set_boot_device_order,
                      self.node, boot_device)
    self.assertTrue(mock_aw.called)
def test__enable_boot_config_fail(self, mock_aw, mock_client_pywsman):
    """``_enable_boot_config`` raises on a bad or missing reply."""
    ns = resource_uris.CIM_BootService

    # Case 1: the client answers with a non-zero ReturnValue.
    failure_xml = test_utils.build_soap_xml([{'ReturnValue': '2'}], ns)
    failure_root = test_utils.mock_wsman_root(failure_xml)
    wsman_client = mock_client_pywsman.Client.return_value
    wsman_client.invoke.return_value = failure_root

    self.assertRaises(exception.AMTFailure,
                      amt_mgmt._enable_boot_config, self.node)
    wsman_client.invoke.assert_called_once_with(
        mock.ANY, ns, 'SetBootConfigRole', mock.ANY)

    # Case 2: the client returns nothing at all.
    wsman_client = mock_client_pywsman.Client.return_value
    wsman_client.invoke.return_value = None

    self.assertRaises(exception.AMTConnectFailure,
                      amt_mgmt._enable_boot_config, self.node)
    self.assertTrue(mock_aw.called)
def test__get_nm_address_raw_fail(self, parse_mock, dump_mock, raw_mock,
                                  unlink_mock):
    """A raw-command failure propagates and stores ``False`` markers."""
    raw_mock.side_effect = exception.IPMIFailure('raw error')
    parse_mock.return_value = ('0x0A', '0x0B')

    with task_manager.acquire(self.context, self.node.uuid,
                              shared=False) as task:
        self.assertRaises(exception.IPMIFailure,
                          nm_vendor._get_nm_address, task)

        self.node.refresh()
        stored = self.node.driver_internal_info
        self.assertEqual(False, stored['intel_nm_address'])
        self.assertEqual(False, stored['intel_nm_channel'])

        tmp_name = self.temp_filename
        parse_mock.assert_called_once_with(tmp_name)
        dump_mock.assert_called_once_with(task, tmp_name)
        unlink_mock.assert_called_once_with(tmp_name)
        # The raw command payload itself is not pinned.
        raw_mock.assert_called_once_with(task, mock.ANY)
def test_rpc(
    self, container, entrypoint_tracker, rpc_proxy, wait_for_result,
    backoff_count
):
    """The RPC entrypoint is retried with backoff until it succeeds."""
    waiter = entrypoint_waiter(
        container, 'method', callback=wait_for_result
    )
    with waiter as result:
        res = rpc_proxy.service.method("arg")

    assert res == result.get() == "result"

    # The entrypoint ran backoff_count + 1 times in total: every run
    # except the last produced no result ...
    expected_results = [None] * backoff_count + ["result"]
    assert entrypoint_tracker.get_results() == expected_results

    # ... because it raised `Backoff`; only the final run was clean.
    backoff_exc_info = (Backoff, ANY, ANY)
    expected_exceptions = [backoff_exc_info] * backoff_count + [None]
    assert entrypoint_tracker.get_exceptions() == expected_exceptions
def test_events(
    self, container, entrypoint_tracker, dispatch_event, wait_for_result,
    backoff_count
):
    """The event handler is retried with backoff until it succeeds."""
    waiter = entrypoint_waiter(
        container, 'method', callback=wait_for_result
    )
    with waiter as result:
        dispatch_event("src_service", "event_type", {})

    assert result.get() == "result"

    # Every run except the last produced no result ...
    expected_results = [None] * backoff_count + ["result"]
    assert entrypoint_tracker.get_results() == expected_results

    # ... because it raised `Backoff`; only the final run was clean.
    backoff_exc_info = (Backoff, ANY, ANY)
    expected_exceptions = [backoff_exc_info] * backoff_count + [None]
    assert entrypoint_tracker.get_exceptions() == expected_exceptions
def test_messaging(
    self, container, entrypoint_tracker, publish_message, exchange, queue,
    wait_for_result, backoff_count
):
    """The message consumer is retried with backoff until it succeeds."""
    waiter = entrypoint_waiter(
        container, 'method', callback=wait_for_result
    )
    with waiter as result:
        publish_message(exchange, "msg", routing_key=queue.routing_key)

    assert result.get() == "result"

    # Every run except the last produced no result ...
    expected_results = [None] * backoff_count + ["result"]
    assert entrypoint_tracker.get_results() == expected_results

    # ... because it raised `Backoff`; only the final run was clean.
    backoff_exc_info = (Backoff, ANY, ANY)
    expected_exceptions = [backoff_exc_info] * backoff_count + [None]
    assert entrypoint_tracker.get_exceptions() == expected_exceptions
def test_expiry(
    self, container, entrypoint_tracker, publish_message, exchange, queue,
    limited_backoff, wait_for_backoff_expired
):
    """Message consumption gives up with ``Backoff.Expired`` at the limit."""
    waiter = entrypoint_waiter(
        container, 'method', callback=wait_for_backoff_expired
    )
    with waiter as result:
        publish_message(exchange, "msg", routing_key=queue.routing_key)

    with pytest.raises(Backoff.Expired) as raised:
        result.get()
    expected_message = (
        "Backoff aborted after '{}' retries".format(limited_backoff)
    )
    assert expected_message in str(raised.value)

    # No run ever produced a result, including the expiring one.
    expected_results = [None] * limited_backoff + [None]
    assert entrypoint_tracker.get_results() == expected_results

    # `limited_backoff` plain backoffs followed by the expiry.
    expected_exceptions = (
        [(Backoff, ANY, ANY)] * limited_backoff +
        [(Backoff.Expired, ANY, ANY)]
    )
    assert entrypoint_tracker.get_exceptions() == expected_exceptions
def test_create_resource(self, options, config):
    """``_create_resource`` issues a CREATE request to the backend."""
    driver = self._get_driver(options, config)
    backend = mock.MagicMock()
    driver._request_backend = backend
    context = mock.MagicMock()

    res_type = 'network'
    res_data = {
        'id': 'network-123',
        'tenant_id': ATTR_NOT_SPECIFIED,
    }
    backend.return_value = (requests.codes.ok, res_data)

    response = driver._create_resource(
        res_type, context, {res_type: res_data})

    self.assertEqual(response, res_data)
    # The request payload (second argument) is not pinned.
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'CREATE')
def test_get_resource(self, options, config):
    """``_get_resource`` issues a READ request to the backend."""
    driver = self._get_driver(options, config)
    backend = mock.MagicMock()
    driver._request_backend = backend
    context = mock.MagicMock()

    res_type = 'network'
    res_data = {
        'id': 'network-123',
    }
    wanted_fields = ['id']
    backend.return_value = (requests.codes.ok, res_data)

    response = driver._get_resource(
        res_type, context, res_data['id'], wanted_fields)

    self.assertEqual(response, res_data)
    # The request payload (second argument) is not pinned.
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'READ')
def test_update_resource(self, options, config):
    """``_update_resource`` issues an UPDATE request to the backend."""
    driver = self._get_driver(options, config)
    backend = mock.MagicMock()
    driver._request_backend = backend
    context = mock.MagicMock()

    res_type = 'network'
    res_data = {
        'id': 'network-123',
    }
    backend.return_value = (requests.codes.ok, res_data)

    response = driver._update_resource(
        res_type, context, res_data['id'], {res_type: res_data})

    self.assertEqual(response, res_data)
    # The request payload (second argument) is not pinned.
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'UPDATE')
def test_list_resource(self, options, config):
    """``_list_resource`` issues a READALL request to the backend."""
    driver = self._get_driver(options, config)
    backend = mock.MagicMock()
    driver._request_backend = backend
    context = mock.MagicMock()

    res_type = 'network'
    res_data = {
        'id': 'network-123',
    }
    wanted_fields = ['id']
    backend.return_value = (requests.codes.ok, [res_data])

    response = driver._list_resource(
        res_type, context, None, wanted_fields)

    self.assertEqual(response, [res_data])
    # The request payload (second argument) is not pinned.
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'READALL')
def test_add_router_interface(self, options, config):
    """``add_router_interface`` issues an ADDINTERFACE request."""
    driver = self._get_driver(options, config)
    backend = mock.MagicMock()
    driver._request_backend = backend
    context = mock.MagicMock()

    res_type = 'router'
    res_data = {
        'id': 'router-123',
    }
    router_id = res_data['id']
    backend.return_value = (requests.codes.ok, res_data)

    response = driver.add_router_interface(context, router_id, res_data)

    self.assertEqual(response, res_data)
    # The request payload (second argument) is not pinned.
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'ADDINTERFACE')
def test_remove_router_interface(self, options, config):
    """``remove_router_interface`` issues a DELINTERFACE request."""
    driver = self._get_driver(options, config)
    backend = mock.MagicMock()
    driver._request_backend = backend
    context = mock.MagicMock()

    res_type = 'router'
    res_data = {
        'id': 'router-123',
    }
    router_id = res_data['id']
    backend.return_value = (requests.codes.ok, res_data)

    response = driver.remove_router_interface(
        context, router_id, res_data)

    self.assertEqual(response, res_data)
    # The request payload (second argument) is not pinned.
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'DELINTERFACE')
def test_bad_protocol_version(self, *args):
    """A header with protocol version 0x7f defuncts the connection."""
    conn = self.make_connection()
    conn._requests = Mock()
    conn.defunct = Mock()

    # Build a SupportedMessage response carrying the bad version byte.
    bad_header = self.make_header_prefix(SupportedMessage, version=0x7f)
    body = self.make_options_body()
    frame_bytes = self.make_msg(bad_header, body)

    conn._iobuf = BytesIO()
    conn._iobuf.write(frame_bytes)
    conn.process_io_buffer()

    # The connection must have been defuncted once, with a ProtocolError.
    conn.defunct.assert_called_once_with(ANY)
    defunct_args, _ = conn.defunct.call_args
    self.assertIsInstance(defunct_args[0], ProtocolError)
def test_negative_body_length(self, *args):
    """A frame announcing a negative body length defuncts the connection."""
    conn = self.make_connection()
    conn._requests = Mock()
    conn.defunct = Mock()

    # Build a SupportedMessage header followed by a body length of -13.
    header = self.make_header_prefix(SupportedMessage)
    frame_bytes = header + int32_pack(-13)

    conn._iobuf = BytesIO()
    conn._iobuf.write(frame_bytes)
    conn.process_io_buffer()

    # The connection must have been defuncted once, with a ProtocolError.
    conn.defunct.assert_called_once_with(ANY)
    defunct_args, _ = conn.defunct.call_args
    self.assertIsInstance(defunct_args[0], ProtocolError)
def test_unsupported_cql_version(self, *args):
    """A server that only offers CQL 7.8.9 defuncts a 3.0.3 connection."""
    conn = self.make_connection()
    conn._requests = {
        0: (conn._handle_options_response,
            ProtocolHandler.decode_message,
            []),
    }
    conn.defunct = Mock()
    conn.cql_version = "3.0.3"

    # Build a SupportedMessage response advertising the other version.
    header = self.make_header_prefix(SupportedMessage)

    body_buf = BytesIO()
    supported = {
        'CQL_VERSION': ['7.8.9'],
        'COMPRESSION': []
    }
    write_stringmultimap(body_buf, supported)
    body = body_buf.getvalue()

    frame = _Frame(version=4, flags=0, stream=0,
                   opcode=SupportedMessage.opcode,
                   body_offset=9, end_pos=9 + len(body))
    conn.process_msg(frame, body)

    # The connection must have been defuncted once, with a ProtocolError.
    conn.defunct.assert_called_once_with(ANY)
    defunct_args, _ = conn.defunct.call_args
    self.assertIsInstance(defunct_args[0], ProtocolError)
def test_handle_topology_change(self):
    """Each topology change type schedules the matching callback.

    The scheduling delay (first argument) is matched with ``ANY``.
    """
    # (change type, attribute owner, callback name, callback argument)
    scenarios = (
        ('NEW_NODE', 'control_connection',
         '_refresh_nodes_if_not_up', '1.2.3.4'),
        ('REMOVED_NODE', 'cluster', 'remove_host', None),
        ('MOVED_NODE', 'control_connection',
         '_refresh_nodes_if_not_up', '1.2.3.4'),
    )
    for change_type, owner_name, callback_name, callback_arg in scenarios:
        event = {
            'change_type': change_type,
            'address': ('1.2.3.4', 9000)
        }
        self.cluster.scheduler.reset_mock()
        self.control_connection._handle_topology_change(event)

        # Resolve the callback only after the handler has run.
        expected_callback = getattr(getattr(self, owner_name),
                                    callback_name)
        self.cluster.scheduler.schedule_unique.assert_called_once_with(
            ANY, expected_callback, callback_arg)
def test_handle_schema_change(self):
    """Every schema change type schedules a schema refresh.

    Each change type is exercised twice: once as a table-level event
    and once as a keyspace-level event. The scheduling delay is matched
    with ``ANY``.
    """
    public_names = [name for name in vars(SchemaChangeType)
                    if name[0] != '_']
    change_types = [getattr(SchemaChangeType, name)
                    for name in public_names]

    scheduler = self.cluster.scheduler
    for change_type in change_types:
        # Table-level change.
        event = {
            'target_type': SchemaTargetType.TABLE,
            'change_type': change_type,
            'keyspace': 'ks1',
            'table': 'table1'
        }
        scheduler.reset_mock()
        self.control_connection._handle_schema_change(event)
        scheduler.schedule_unique.assert_called_once_with(
            ANY, self.control_connection.refresh_schema, **event)

        # Keyspace-level change: same event without the table key.
        scheduler.reset_mock()
        event['target_type'] = SchemaTargetType.KEYSPACE
        del event['table']
        self.control_connection._handle_schema_change(event)
        scheduler.schedule_unique.assert_called_once_with(
            ANY, self.control_connection.refresh_schema, **event)
def test_result_message(self):
    """A response future sends its message and exposes the result rows."""
    session = self.make_basic_session()
    lb_policy = session.cluster._default_load_balancing_policy
    lb_policy.make_query_plan.return_value = ['ip1', 'ip2']

    pool = session._pools.get.return_value
    pool.is_shutdown = False
    connection = Mock(spec=Connection)
    pool.borrow_connection.return_value = (connection, 1)

    future = self.make_response_future(session)
    future.send_request()

    # The first host of the query plan is used; timeout and callback
    # values are not pinned.
    future.session._pools.get.assert_called_once_with('ip1')
    pool.borrow_connection.assert_called_once_with(timeout=ANY)
    connection.send_msg.assert_called_once_with(
        future.message, 1,
        cb=ANY,
        encoder=ProtocolHandler.encode_message,
        decoder=ProtocolHandler.decode_message,
        result_metadata=[])

    rows = [{'col': 'val'}]
    future._set_result(None, None, None, self.make_mock_response(rows))
    self.assertEqual(future.result(), [{'col': 'val'}])
def test_update_host(self, mock_host, mock_get_device_type):
    """PUT on a host updates it and links to its new parent device."""
    mock_get_device_type.return_value = "network_devices"
    payload = {'name': 'Host_New', 'parent_id': 2}
    db_data = payload.copy()
    # The DB layer returns the stored host with the payload applied.
    updated_record = dict(fake_resources.HOST1.items())
    updated_record.update(payload)
    mock_host.return_value = updated_record

    resp = self.put('/v1/hosts/1', data=payload)

    body = resp.json
    self.assertEqual(body['name'], db_data['name'])
    self.assertEqual(body['parent_id'], db_data['parent_id'])
    self.assertEqual(200, resp.status_code)
    # The request context (first argument) is not pinned.
    mock_host.assert_called_once_with(mock.ANY, '1', db_data)
    mock_get_device_type.assert_called_once()

    expected_up_link = {
        "rel": "up",
        "href": "http://localhost/v1/network-devices/2"
    }
    self.assertIn(expected_up_link, resp.json["links"])
def test_get_host_by_ip_address_filter(self, fake_hosts):
    """Listing hosts forwards region and IP filters to the DB layer."""
    region_id = 1
    ip_address = '10.10.0.1'
    expected_filters = {
        'region_id': 1,
        'ip_address': ip_address,
        'resolved-values': True,
    }
    expected_pagination = {'limit': 30, 'marker': None}
    url_template = '/v1/hosts?region_id={}&ip_address={}'
    path_query = url_template.format(region_id, ip_address)
    fake_hosts.return_value = (fake_resources.HOSTS_LIST_R2, {})

    resp = self.get(path_query)

    stored_hosts = fake_resources.HOSTS_LIST_R2
    self.assertEqual(len(resp.json['hosts']), 1)
    self.assertEqual(resp.json['hosts'][0]["name"], stored_hosts[0].name)
    # The request context (first argument) is not pinned.
    fake_hosts.assert_called_once_with(
        mock.ANY, expected_filters, expected_pagination,
    )
def test_projects_put_variables(self, mock_project):
    """PUT on project variables merges the payload into the stored set."""
    project = fake_resources.PROJECT1
    project_id = str(project.id)

    # The DB layer returns a copy of the project with "a" added.
    stored_project = copy.deepcopy(project)
    stored_project.variables["a"] = "b"
    mock_project.return_value = stored_project

    payload = {"a": "b"}
    db_data = payload.copy()
    url = 'v1/projects/{}/variables'.format(project_id)

    resp = self.put(url, data=payload)

    self.assertEqual(resp.status_code, 200)
    # The request context (first argument) is not pinned.
    mock_project.assert_called_once_with(
        mock.ANY, "projects", project_id, db_data)
    expected_body = {
        "variables": {"key1": "value1", "key2": "value2", "a": "b"},
    }
    self.assertDictEqual(expected_body, resp.json)
def test_put_network_device(self, fake_device, mock_get_device_type):
    """PUT on a network device updates it and links to its parent."""
    mock_get_device_type.return_value = "network_devices"
    payload = {"name": "NetDev_New1", "parent_id": 2}
    # The DB layer returns the stored device with the payload applied.
    stored_fields = fake_resources.NETWORK_DEVICE1.items()
    fake_device.return_value = dict(stored_fields, **payload)

    resp = self.put('v1/network-devices/1', data=payload)

    self.assertEqual(resp.status_code, 200)
    self.assertEqual(resp.json['name'], "NetDev_New1")
    self.assertEqual(resp.json['parent_id'], 2)
    # The request context (first argument) is not pinned.
    expected_db_data = {"name": "NetDev_New1", "parent_id": 2}
    fake_device.assert_called_once_with(mock.ANY, '1', expected_db_data)
    mock_get_device_type.assert_called_once()

    expected_up_link = {
        "rel": "up",
        "href": "http://localhost/v1/network-devices/2"
    }
    self.assertIn(expected_up_link, resp.json["links"])
def test_get_network_devices_by_ip_address_filter(self, fake_devices):
    """Listing network devices forwards region and IP filters."""
    region_id = '1'
    ip_address = '10.10.0.1'
    expected_filters = {
        'region_id': region_id,
        'ip_address': ip_address,
        'resolved-values': True,
    }
    expected_pagination = {'limit': 30, 'marker': None}
    url_template = '/v1/network-devices?region_id={}&ip_address={}'
    path_query = url_template.format(region_id, ip_address)
    fake_devices.return_value = (fake_resources.NETWORK_DEVICE_LIST1, {})

    resp = self.get(path_query)

    stored_devices = fake_resources.NETWORK_DEVICE_LIST1
    self.assertEqual(len(resp.json['network_devices']), 1)
    self.assertEqual(resp.json['network_devices'][0]["ip_address"],
                     stored_devices[0].ip_address)
    # The request context (first argument) is not pinned.
    fake_devices.assert_called_once_with(
        mock.ANY, expected_filters, expected_pagination,
    )
def test_get_netinterfaces_by_ip_address_filter(self, fake_interfaces):
    """Listing network interfaces forwards device and IP filters."""
    device_id = 1
    ip_address = '10.10.0.1'
    expected_filters = {'device_id': device_id, 'ip_address': ip_address}
    expected_pagination = {'limit': 30, 'marker': None}
    url_template = '/v1/network-interfaces?device_id={}&ip_address={}'
    path_query = url_template.format(device_id, ip_address)
    fake_interfaces.return_value = (
        fake_resources.NETWORK_INTERFACE_LIST1, {})

    resp = self.get(path_query)

    stored_interfaces = fake_resources.NETWORK_INTERFACE_LIST1
    self.assertEqual(len(resp.json['network_interfaces']), 1)
    self.assertEqual(resp.json['network_interfaces'][0]["name"],
                     stored_interfaces[0].name)
    # The request context (first argument) is not pinned.
    fake_interfaces.assert_called_once_with(
        mock.ANY, expected_filters, expected_pagination,
    )
def test_network_interfaces_update(self, fake_interfaces):
    """PUT on a network interface renames it and keeps its IP address."""
    payload = {'name': 'New'}
    db_data = payload.copy()
    # The DB layer returns the stored interface with the payload applied.
    updated_record = dict(fake_resources.NETWORK_INTERFACE1.items())
    updated_record.update(payload)
    fake_interfaces.return_value = updated_record

    resp = self.put('/v1/network-interfaces/1', data=payload)

    self.assertEqual(resp.json['name'], db_data['name'])
    self.assertEqual(200, resp.status_code)
    original_ip = fake_resources.NETWORK_INTERFACE1.ip_address
    self.assertEqual(resp.json['ip_address'], original_ip)
    # The request context (first argument) is not pinned.
    fake_interfaces.assert_called_once_with(mock.ANY, '1', db_data)
def test_send_mail(self):
    """``send_mail`` sends the draft and marks the job as done.

    The second argument passed to ``send_draft`` is matched with
    ``mock.ANY``; the remaining mail-client calls are checked exactly.
    """
    job = self.create_job(state='queued')
    with mailman_mock() as mailman:
        job.send_mail('token')
        mailman.send_draft.assert_called_with(
            job.message_id_int, mock.ANY)
        mailman.mark_as_sent.assert_called_with(
            job.message_id_int, job.sent_mail_rfc_id, 'testmail')
        mailman.quit.assert_called_with()

    # Re-read the job through its key to check the stored state.
    job = job.key.get()
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(job.state, 'done')
    self.assertTrue(job.sent_mail_rfc_id)
def test_create_without_instance_name(self):
    """``create`` without a name still requests exactly one instance.

    The generated instance-profile name is matched with ``mock.ANY``.
    """
    self.mock_object(
        self.fake_driver.resource, 'create_instances',
        mock.Mock(return_value=mock.Mock))

    self.fake_driver.create(
        'fake_image_id',
        'fake_flavor_id',
        'fake_net_id'
    )

    expected_profile = {
        'Arn': '',
        'Name': mock.ANY
    }
    create_instances = self.fake_driver.resource.create_instances
    create_instances.assert_called_once_with(
        ImageId='fake_image_id',
        MinCount=1,
        MaxCount=1,
        InstanceType='fake_flavor_id',
        SubnetId='fake_net_id',
        IamInstanceProfile=expected_profile
    )
def test_create_without_instance_name(self):
    """``create`` without a name still creates a server.

    The generated server name is matched with ``mock.ANY``.
    """
    # NOTE: in fact: mock.Mock is novaclient.v2.servers.Server
    self.mock_object(
        self.fake_driver.client.servers, 'create',
        mock.Mock(return_value=mock.Mock))

    self.fake_driver.create(
        'fake_image_id',
        'fake_flavor_id',
        'fake_net_id'
    )

    expected_nics = [{'net-id': 'fake_net_id'}]
    create_server = self.fake_driver.client.servers.create
    create_server.assert_called_once_with(
        name=mock.ANY,
        image='fake_image_id',
        flavor='fake_flavor_id',
        nics=expected_nics
    )
def test_get_orders(requests_post, logger):
    """``get_orders`` posts a signed request and logs the lookup.

    The request signature is matched with ``mock.ANY``. An unknown
    order book is rejected with ``InvalidOrderBookError``.
    """
    client = build_client()

    output = client.get_orders()

    assert output == test_body
    expected_payload = {
        'book': test_book,
        'key': test_key,
        'nonce': test_nonce,
        'signature': mock.ANY
    }
    requests_post.assert_called_with(
        url=build_url('/open_orders'),
        json=expected_payload
    )
    logger.debug.assert_called_with(
        "[client: test_client_id] get user's open orders for btc_usd")

    with pytest.raises(InvalidOrderBookError):
        client.get_orders(book='invalid_book')
def test_proxy_info(http_mock, resp_mock):
    """Proxy settings configured on ``Connection`` reach the HTTP client.

    The proxy object is matched with ``ANY`` in the call assertion and
    then read back from ``call_args`` to check its fields.
    """
    fake_http = Mock()
    fake_http.request.return_value = (Mock(), Mock())
    http_mock.return_value = fake_http

    Connection.set_proxy_info(
        'example.com',
        8080,
        proxy_type=PROXY_TYPE_SOCKS5,
    )
    make_request("GET", "http://httpbin.org/get")

    http_mock.assert_called_with(timeout=None, ca_certs=ANY,
                                 proxy_info=ANY)
    fake_http.request.assert_called_with(
        "http://httpbin.org/get", "GET", body=None, headers=None)

    # Keyword arguments of the recorded call hold the proxy object.
    _, http_kwargs = http_mock.call_args
    used_proxy = http_kwargs['proxy_info']
    assert_equal(used_proxy.proxy_host, 'example.com')
    assert_equal(used_proxy.proxy_port, 8080)
    assert_equal(used_proxy.proxy_type, PROXY_TYPE_SOCKS5)
def testPassThrough(self, mock_request):
    """``request`` forwards to the HTTP layer and decodes the JSON body.

    Headers and auth are matched with ``ANY``; the URL suffix and the
    timeout are checked exactly.
    """
    mock_response = Mock()
    # Plain boolean: the original trailing comma stored the tuple
    # (True,) here, which is only truthy by accident.
    mock_response.ok = True
    mock_response.content = json.dumps({'key': 'value'})
    mock_request.return_value = mock_response

    assert_equal(self.r.timeout, sentinel.timeout)
    assert_equal((mock_response, {'key': 'value'}),
                 self.r.request('GET', base_uri))
    mock_request.assert_called_once_with(
        'GET',
        base_uri + '.json',
        headers=ANY,
        timeout=sentinel.timeout,
        auth=ANY
    )
def test_add(self, mock_check_conn, mock_makedirs, mock_configparser,
             mock_open):
    """``add`` writes every parameter into a new VirtualBMC section.

    The file object passed to ``config.write`` is matched with
    ``mock.ANY``.
    """
    config = mock_configparser.return_value
    call_params = copy.copy(self.add_params)

    self.manager.add(**call_params)

    # One config.set call per parameter, in any order.
    expected_calls = [mock.call('VirtualBMC', key, self.add_params[key])
                      for key in self.add_params]
    self.assertEqual(sorted(expected_calls),
                     sorted(config.set.call_args_list))
    config.add_section.assert_called_once_with('VirtualBMC')
    config.write.assert_called_once_with(mock.ANY)

    domain = self.add_params['domain_name']
    mock_check_conn.assert_called_once_with(
        self.add_params['libvirt_uri'], domain,
        sasl_username=self.add_params['libvirt_sasl_username'],
        sasl_password=self.add_params['libvirt_sasl_password'])
    mock_makedirs.assert_called_once_with(
        os.path.join(_CONFIG_PATH, domain))
    mock_configparser.assert_called_once_with()
def test_add_with_port_as_int(self, mock_check_conn, mock_makedirs,
                              mock_configparser, mock_open):
    """``add`` given an integer port writes the same config values.

    The expected ``config.set`` calls are built from the unmodified
    ``self.add_params``. The file object passed to ``config.write`` is
    matched with ``mock.ANY``.
    """
    config = mock_configparser.return_value
    call_params = copy.copy(self.add_params)
    call_params['port'] = int(call_params['port'])

    self.manager.add(**call_params)

    # One config.set call per parameter, in any order.
    expected_calls = [mock.call('VirtualBMC', key, self.add_params[key])
                      for key in self.add_params]
    self.assertEqual(sorted(expected_calls),
                     sorted(config.set.call_args_list))
    config.add_section.assert_called_once_with('VirtualBMC')
    config.write.assert_called_once_with(mock.ANY)

    domain = self.add_params['domain_name']
    mock_check_conn.assert_called_once_with(
        self.add_params['libvirt_uri'], domain,
        sasl_username=self.add_params['libvirt_sasl_username'],
        sasl_password=self.add_params['libvirt_sasl_password'])
    mock_makedirs.assert_called_once_with(
        os.path.join(_CONFIG_PATH, domain))
    mock_configparser.assert_called_once_with()
def test_get_all_capsules(self, mock_container_get_by_uuid,
                          mock_capsule_list, mock_container_show):
    """Listing capsules returns the stored capsule for one tenant."""
    container_fields = utils.get_test_container()
    container_obj = objects.Container(self.context, **container_fields)
    mock_container_get_by_uuid.return_value = container_obj
    mock_container_show.return_value = container_obj

    capsule_fields = utils.get_test_capsule()
    capsule_obj = objects.Capsule(self.context, **capsule_fields)
    mock_capsule_list.return_value = [capsule_obj]

    response = self.app.get('/capsules/')

    mock_capsule_list.assert_called_once_with(
        mock.ANY, 1000, None, 'id', 'asc', filters=None)
    # The context was matched with ANY above; inspect it separately.
    request_context = mock_capsule_list.call_args[0][0]
    self.assertIs(False, request_context.all_tenants)
    self.assertEqual(200, response.status_int)
    listed = response.json['capsules']
    self.assertEqual(1, len(listed))
    self.assertEqual(capsule_fields['uuid'], listed[0].get('uuid'))
def test_get_all_hosts(self, mock_host_list, mock_policy):
    """Listing hosts returns the stored compute node."""
    mock_policy.return_value = True
    host_fields = utils.get_test_compute_node()
    # Replace the raw topology with its object form before building
    # the compute node.
    host_fields['numa_topology'] = numa.NUMATopology._from_dict(
        host_fields['numa_topology'])
    mock_host_list.return_value = [
        objects.ComputeNode(self.context, **host_fields)]

    response = self.get('/v1/hosts')

    # The request context (first argument) is not pinned.
    mock_host_list.assert_called_once_with(
        mock.ANY, 1000, None, 'hostname', 'asc', filters=None)
    self.assertEqual(200, response.status_int)
    listed = response.json['hosts']
    self.assertEqual(1, len(listed))
    self.assertEqual(host_fields['uuid'], listed[0].get('uuid'))
def test_get_all_containers(self, mock_container_list,
                            mock_container_show):
    """Listing containers is scoped to a single tenant by default."""
    container_fields = utils.get_test_container()
    container_obj = objects.Container(self.context, **container_fields)
    mock_container_list.return_value = [container_obj]
    mock_container_show.return_value = container_obj

    response = self.get('/v1/containers/')

    mock_container_list.assert_called_once_with(
        mock.ANY, 1000, None, 'id', 'asc', filters=None)
    # The context was matched with ANY above; inspect it separately.
    request_context = mock_container_list.call_args[0][0]
    self.assertIs(False, request_context.all_tenants)
    self.assertEqual(200, response.status_int)
    listed = response.json['containers']
    self.assertEqual(1, len(listed))
    self.assertEqual(container_fields['uuid'], listed[0].get('uuid'))
def test_get_all_containers_all_tenants(self, mock_container_list,
                                        mock_container_show, mock_policy):
    """``all_tenants=1`` sets the all-tenants flag on the context."""
    mock_policy.return_value = True
    container_fields = utils.get_test_container()
    container_obj = objects.Container(self.context, **container_fields)
    mock_container_list.return_value = [container_obj]
    mock_container_show.return_value = container_obj

    response = self.get('/v1/containers/?all_tenants=1')

    mock_container_list.assert_called_once_with(
        mock.ANY, 1000, None, 'id', 'asc', filters=None)
    # The context was matched with ANY above; inspect it separately.
    request_context = mock_container_list.call_args[0][0]
    self.assertIs(True, request_context.all_tenants)
    self.assertEqual(200, response.status_int)
    listed = response.json['containers']
    self.assertEqual(1, len(listed))
    self.assertEqual(container_fields['uuid'], listed[0].get('uuid'))
def test_get_one_by_uuid_all_tenants(self, mock_container_get_by_uuid,
                                     mock_container_show, mock_policy):
    """Fetching one container with ``all_tenants=1`` sets the flag."""
    mock_policy.return_value = True
    container_fields = utils.get_test_container()
    container_obj = objects.Container(self.context, **container_fields)
    mock_container_get_by_uuid.return_value = container_obj
    mock_container_show.return_value = container_obj
    url = '/v1/containers/%s/?all_tenants=1' % container_fields['uuid']

    response = self.get(url)

    mock_container_get_by_uuid.assert_called_once_with(
        mock.ANY, container_fields['uuid'])
    # The context was matched with ANY above; inspect it separately.
    request_context = mock_container_get_by_uuid.call_args[0][0]
    self.assertIs(True, request_context.all_tenants)
    self.assertEqual(200, response.status_int)
    self.assertEqual(container_fields['uuid'], response.json['uuid'])
def _action_test(self, container, action, ident_field,
                 mock_container_action, status_code, query_param=''):
    """Shared driver for container action endpoint tests.

    POSTs ``action`` for the container identified by ``ident_field``,
    checks the status code, checks that a GET on the same URL raises
    ``AppError``, and verifies how ``mock_container_action`` was called.
    """
    container_obj = objects.Container(self.context, **container)
    ident = container.get(ident_field)
    patch_target = 'zun.objects.Container.get_by_%s' % ident_field

    with patch(patch_target) as mock_get_by_ident:
        mock_get_by_ident.return_value = container_obj
        action_url = '/v1/containers/%s/%s/?%s' % (ident, action,
                                                   query_param)
        response = self.post(action_url)
        self.assertEqual(status_code, response.status_int)

        # Only PUT should work, others like GET should fail
        plain_url = '/v1/containers/%s/%s/' % (ident, action)
        self.assertRaises(AppError, self.get, plain_url)

    # The request context (first argument) is not pinned.
    if not query_param:
        mock_container_action.assert_called_once_with(
            mock.ANY, container_obj)
        return
    # "key=value": the value is forwarded as an extra argument.
    value = query_param.split('=')[1]
    mock_container_action.assert_called_once_with(
        mock.ANY, container_obj, value)
def test_delete_container_by_uuid_all_tenants(self, mock_get_by_uuid,
                                              mock_container_delete,
                                              mock_validate, mock_policy):
    """Deleting with ``all_tenants=1`` sets the all-tenants flag."""
    mock_policy.return_value = True
    container_fields = utils.get_test_container()
    container_obj = objects.Container(self.context, **container_fields)
    mock_get_by_uuid.return_value = container_obj
    url = ('/v1/containers/%s/?all_tenants=1'
           % container_fields.get('uuid'))

    response = self.delete(url)

    self.assertEqual(204, response.status_int)
    mock_container_delete.assert_called_once_with(
        mock.ANY, container_obj, False)
    # The context was matched with ANY above; inspect it separately.
    request_context = mock_container_delete.call_args[0][0]
    self.assertIs(True, request_context.all_tenants)
def test_kill_container_by_uuid(self, mock_get_by_uuid,
                                mock_container_kill, mock_validate):
    """POSTing ``kill`` forwards the requested signal."""
    # Object returned by the kill operation itself.
    killed_obj = objects.Container(self.context,
                                   **utils.get_test_container())
    mock_container_kill.return_value = killed_obj

    # Separate object returned by the UUID lookup.
    container_fields = utils.get_test_container()
    looked_up_obj = objects.Container(self.context, **container_fields)
    mock_get_by_uuid.return_value = looked_up_obj

    url = '/v1/containers/%s/%s/' % (container_fields.get('uuid'), 'kill')
    cmd = {'signal': '9'}

    response = self.post(url, cmd)

    self.assertEqual(202, response.status_int)
    # The request context (first argument) is not pinned.
    mock_container_kill.assert_called_once_with(
        mock.ANY, looked_up_obj, cmd['signal'])
def test_resize_container_by_uuid(self, mock_get_by_uuid,
                                  mock_container_resize, mock_validate):
    """POSTing ``resize`` forwards the requested height and width."""
    # Object returned by the resize operation itself.
    resized_obj = objects.Container(self.context,
                                    **utils.get_test_container())
    mock_container_resize.return_value = resized_obj

    # Separate object returned by the lookup.
    container_fields = utils.get_test_container()
    looked_up_obj = objects.Container(self.context, **container_fields)
    mock_get_by_uuid.return_value = looked_up_obj

    # The URL is built from the container name.
    url = '/v1/containers/%s/%s/' % (container_fields.get('name'),
                                     'resize')
    cmd = {'h': '100', 'w': '100'}

    response = self.post(url, cmd)

    self.assertEqual(200, response.status_int)
    # The request context (first argument) is not pinned.
    mock_container_resize.assert_called_once_with(
        mock.ANY, looked_up_obj, cmd['h'], cmd['w'])
def test_get_archive_by_uuid(self, mock_get_by_uuid,
                             container_get_archive, mock_validate):
    """GETting ``get_archive`` forwards the requested path."""
    container_get_archive.return_value = ("", "")
    container_fields = utils.get_test_container()
    container_obj = objects.Container(self.context, **container_fields)
    mock_get_by_uuid.return_value = container_obj

    url = '/v1/containers/%s/%s/' % (container_fields.get('uuid'),
                                     'get_archive')
    cmd = {'path': '/home/1.txt'}

    response = self.get(url, cmd)

    self.assertEqual(200, response.status_int)
    # The request context (first argument) is not pinned.
    container_get_archive.assert_called_once_with(
        mock.ANY, container_obj, cmd['path'])