The following 50 code examples, extracted from open-source Python projects, illustrate how to use mock.call().

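Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below; the mock object and call arguments are invented for illustration) showing the basic pattern most of these tests rely on: exercise a Mock, then compare the recorded calls against mock.call() objects via assert_has_calls, call_args_list, or mock_calls.

from unittest import mock
from unittest.mock import call

# Create a mock and exercise it the way production code would.
notifier = mock.Mock()
notifier.send('alice', subject='hi')
notifier.send('bob', subject='bye')

# call() builds the same (args, kwargs) records the mock stored, so
# expected calls can be compared directly against what actually happened.
expected = [call('alice', subject='hi'), call('bob', subject='bye')]
notifier.send.assert_has_calls(expected)          # ordered subsequence check
assert notifier.send.call_args_list == expected   # exact, ordered comparison

# call() also chains, which is how several examples below assert on nested
# usage such as context managers: call().__enter__() means "the return value
# of the call was entered as a context manager".
m = mock.MagicMock()
with m('config.json') as f:
    f.read()
assert call('config.json') in m.mock_calls
assert call().__enter__() in m.mock_calls
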
def test_load_ok(self):
    in_config = json.dumps({'command': '/bin/true',
                            'config_files': {}})

    mo = mock.mock_open(read_data=in_config)
    with mock.patch.object(set_configs, 'open', mo):
        config = set_configs.load_config()
        set_configs.copy_config(config)
        self.assertEqual([
            mock.call('/var/lib/kolla/config_files/config.json'),
            mock.call().__enter__(),
            mock.call().read(),
            mock.call().__exit__(None, None, None),
            mock.call('/run_command', 'w+'),
            mock.call().__enter__(),
            mock.call().write(u'/bin/true'),
            mock.call().__exit__(None, None, None)],
            mo.mock_calls)

def test_create_ovs_vif_port(self):
    calls = [
        mock.call('ovs-vsctl', '--', '--if-exists',
                  'del-port', 'fake-dev', '--', 'add-port',
                  'fake-bridge', 'fake-dev',
                  '--', 'set', 'Interface', 'fake-dev',
                  'external-ids:iface-id=fake-iface-id',
                  'external-ids:iface-status=active',
                  'external-ids:attached-mac=fake-mac',
                  'external-ids:vm-uuid=fake-instance-uuid',
                  run_as_root=True)]
    with mock.patch.object(utils, 'execute', return_value=('', '')) as ex:
        linux_net.create_ovs_vif_port('fake-bridge', 'fake-dev',
                                      'fake-iface-id', 'fake-mac',
                                      'fake-instance-uuid')
        ex.assert_has_calls(calls)

def test_request(self):
    cls = d_lbaasv2.LBaaSv2Driver
    m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
    loadbalancer = mock.sentinel.loadbalancer
    obj = mock.sentinel.obj
    create = mock.sentinel.create
    find = mock.sentinel.find
    expected_result = mock.sentinel.expected_result
    timer = [mock.sentinel.t0, mock.sentinel.t1]
    m_driver._provisioning_timer.return_value = timer
    m_driver._ensure.side_effect = [n_exc.StateInvalidClient,
                                    expected_result]

    ret = cls._ensure_provisioned(m_driver, loadbalancer, obj, create,
                                  find)

    m_driver._wait_for_provisioning.assert_has_calls(
        [mock.call(loadbalancer, t) for t in timer])
    m_driver._ensure.assert_has_calls(
        [mock.call(obj, create, find) for _ in timer])
    self.assertEqual(expected_result, ret)

def test_ensure_not_ready(self):
    cls = d_lbaasv2.LBaaSv2Driver
    m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
    loadbalancer = mock.sentinel.loadbalancer
    obj = mock.sentinel.obj
    create = mock.sentinel.create
    find = mock.sentinel.find
    timer = [mock.sentinel.t0, mock.sentinel.t1]
    m_driver._provisioning_timer.return_value = timer
    m_driver._ensure.return_value = None

    self.assertRaises(k_exc.ResourceNotReady, cls._ensure_provisioned,
                      m_driver, loadbalancer, obj, create, find)

    m_driver._wait_for_provisioning.assert_has_calls(
        [mock.call(loadbalancer, t) for t in timer])
    m_driver._ensure.assert_has_calls(
        [mock.call(obj, create, find) for _ in timer])

def test_register(self, m_init, m_wrap_consumer):
    consumes = {mock.sentinel.key_fn1: mock.sentinel.key1,
                mock.sentinel.key_fn2: mock.sentinel.key2,
                mock.sentinel.key_fn3: mock.sentinel.key3}
    m_dispatcher = mock.Mock()
    m_consumer = mock.Mock()
    m_consumer.consumes = consumes
    m_wrap_consumer.return_value = mock.sentinel.handler
    m_init.return_value = None
    pipeline = _TestEventPipeline()
    pipeline._dispatcher = m_dispatcher

    pipeline.register(m_consumer)

    m_wrap_consumer.assert_called_once_with(m_consumer)
    m_dispatcher.register.assert_has_calls([
        mock.call(key_fn, key, mock.sentinel.handler)
        for key_fn, key in consumes.items()], any_order=True)

def test_call_retry(self, m_sleep, m_count):
    attempts = 3
    timeout = 10
    deadline = self.now + timeout
    failures = [_EX1()] * (attempts - 1)
    event = mock.sentinel.event
    m_handler = mock.Mock()
    m_handler.side_effect = failures + [None]
    m_sleep.return_value = 1
    m_count.return_value = list(range(1, 5))
    retry = h_retry.Retry(m_handler, timeout=timeout, exceptions=_EX1)

    retry(event)

    m_handler.assert_has_calls([mock.call(event)] * attempts)
    m_sleep.assert_has_calls([
        mock.call(deadline, i + 1, failures[i])
        for i in range(len(failures))])

def test_send_magic_packets(self, mock_socket):
    fake_socket = mock.Mock(spec=socket, spec_set=True)
    mock_socket.return_value = fake_socket()
    obj_utils.create_test_port(self.context,
                               uuid=uuidutils.generate_uuid(),
                               address='aa:bb:cc:dd:ee:ff',
                               node_id=self.node.id)
    with task_manager.acquire(
            self.context, self.node.uuid, shared=True) as task:
        wol_power._send_magic_packets(task, '255.255.255.255', 9)

        expected_calls = [
            mock.call(),
            mock.call().setsockopt(socket.SOL_SOCKET,
                                   socket.SO_BROADCAST, 1),
            mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
            mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
            mock.call().close()]
        fake_socket.assert_has_calls(expected_calls)
        self.assertEqual(1, mock_socket.call_count)

def test_get_swift_hash_env(self, mock_config, mock_service_name):
    mock_config.return_value = None
    mock_service_name.return_value = "testsvc"
    tmpfile = tempfile.mktemp()
    swift_context.SWIFT_HASH_FILE = tmpfile
    with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
        mock_env_get.return_value = str(uuid.uuid4())
        hash_ = swift_context.get_swift_hash()
        mock_env_get.assert_has_calls([
            mock.call('JUJU_MODEL_UUID'),
            mock.call('JUJU_ENV_UUID', mock_env_get.return_value)
        ])
    with open(tmpfile, 'r') as fd:
        self.assertEqual(hash_, fd.read())
    self.assertTrue(mock_config.called)

def test_failure(self):
    """Ensure that action_fail is called on failure."""
    self.config.return_value = "swauth"
    self.action_get.return_value = "test"
    self.determine_api_port.return_value = 8070
    self.CalledProcessError = ValueError
    self.check_call.side_effect = subprocess.CalledProcessError(
        0, "hi", "no")
    actions.add_user.add_user()
    self.leader_get.assert_called_with("swauth-admin-key")
    calls = [call("account"), call("username"), call("password")]
    self.action_get.assert_has_calls(calls)
    self.action_set.assert_not_called()
    self.action_fail.assert_called_once_with(
        'Adding user test failed with: "Command \'hi\' returned non-zero '
        'exit status 0"')

def test_final_action(
    self,
    refresh_batch,
    temp_name,
    write_session,
    mock_execute,
    database_name
):
    refresh_batch.final_action()
    calls = [
        mock.call(write_session, 'USE {0}'.format(database_name)),
        mock.call(
            write_session,
            'DROP TABLE IF EXISTS {0}'.format(temp_name),
        ),
    ]
    mock_execute.assert_has_calls(calls)

def test_create_table_from_src_table(
    self,
    refresh_batch,
    fake_original_table,
    fake_new_table,
    show_table_query,
    write_session
):
    with mock.patch.object(
        refresh_batch, '_execute_query', autospec=True
    ) as mock_execute:
        mock_execute.return_value.fetchone.return_value = [
            'test_db',
            fake_original_table
        ]
        refresh_batch.create_table_from_src_table(write_session)
        calls = [
            mock.call(write_session, show_table_query),
            mock.call(write_session, fake_new_table)
        ]
        mock_execute.assert_has_calls(calls, any_order=True)

def test_bootstrap_files_calls_register_file_for_each_file(
    self,
    containers
):
    file_paths = {'a.test', 'b.test'}
    bootstrapper = FileBootstrapperBase(
        schema_ref=None,
        file_paths=file_paths,
        override_metadata=True,
        file_extension='test'
    )
    bootstrapper.register_file = mock.Mock(return_value=None)
    bootstrapper.bootstrap_schema_result = mock.Mock()
    bootstrapper.bootstrap_files()
    assert bootstrapper.register_file.mock_calls == [
        mock.call(file_path) for file_path in file_paths
    ]
    assert bootstrapper.bootstrap_schema_result.mock_calls == [
        mock.call(None) for _ in file_paths
    ]

def test_run_converts_existing_if_overwrite_true(
    self,
    sql_file_path,
    avsc_file_path,
    globs,
    mock_get_file_paths_from_glob_patterns,
    mock_os_path_exists,
    mock_batch
):
    mock_batch.options.overwrite = True
    mock_batch.run()
    assert mock_get_file_paths_from_glob_patterns.mock_calls == [
        mock.call(glob_patterns=globs)
    ]
    assert mock_os_path_exists.mock_calls == [mock.call(avsc_file_path)]
    assert mock_batch.convert_sql_to_avsc.mock_calls == [
        mock.call(
            avsc_file_path=avsc_file_path,
            sql_file_path=sql_file_path
        )
    ]

def test_client_manager_setup_with_multiple_bot_tokens(mocked_slack_client):
    config = {
        'SLACK': {
            'BOTS': {
                'spam': 'abc-123',
                'ham': 'def-456',
            }
        }
    }
    client_manager = rtm.SlackRTMClientManager()
    client_manager.container = Mock(config=config)
    client_manager.setup()
    assert 'spam' in client_manager.clients
    assert 'ham' in client_manager.clients
    assert client_manager.clients['spam'] == mocked_slack_client.return_value
    assert client_manager.clients['ham'] == mocked_slack_client.return_value
    assert call('abc-123') in mocked_slack_client.call_args_list
    assert call('def-456') in mocked_slack_client.call_args_list

def test_handle_event_by_type(self, events, service_runner, tracker):
    class Service:
        name = 'sample'

        @rtm.handle_event('presence_change')
        def handle_event(self, event):
            tracker.handle_event(event)

    service_runner(Service, events)
    assert (
        tracker.handle_event.call_args_list ==
        [
            call(event) for event in events
            if event.get('type') == 'presence_change'
        ])

def test_handle_any_message(self, events, service_runner, tracker):
    class Service:
        name = 'sample'

        @rtm.handle_message
        def handle_event(self, event, message):
            tracker.handle_event(event, message)

    service_runner(Service, events)
    assert (
        tracker.handle_event.call_args_list ==
        [
            call(event, event.get('text')) for event in events
            if event.get('type') == 'message'
        ])

def test_replies_on_handle_message(events, service_runner):
    class Service:
        name = 'sample'

        @rtm.handle_message
        def handle_message(self, event, message):
            return 'sure, {}'.format(message)

    reply_calls = service_runner(Service, events)
    assert reply_calls == [
        call('D11', 'sure, spam ham'),
        call('D11', 'sure, ham spam'),
        call('D11', 'sure, spam egg'),
    ]

def test_make_without_build_container_tag_with_context(self, skipper_runner_run_mock):
    global_params = self.global_params[:-2]
    makefile = 'Makefile'
    target = 'all'
    make_params = ['-f', makefile, target]
    self._invoke_cli(
        defaults=config.load_defaults(),
        global_params=global_params,
        subcmd='make',
        subcmd_params=make_params
    )
    expected_commands = [
        mock.call(['docker', 'build', '-t', 'build-container-image', '-f',
                   'Dockerfile.build-container-image', SKIPPER_CONF_CONTAINER_CONTEXT]),
        mock.call(['make'] + make_params, fqdn_image='build-container-image',
                  environment=[], interactive=False, name=None, net='host',
                  volumes=None, workdir=None, use_cache=False),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)

def test_push(self, skipper_runner_run_mock, requests_get_mock):
    skipper_runner_run_mock.side_effect = [0, 0]
    push_params = ['my_image']
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock
        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)

def test_push_already_in_registry(self, skipper_runner_run_mock, requests_get_mock):
    skipper_runner_run_mock.side_effect = [0, 0]
    push_params = ['my_image']
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
        }
        requests_get_mock.return_value = requests_response_mock
        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)

def test_push_fail(self, skipper_runner_run_mock, requests_get_mock):
    skipper_runner_run_mock.side_effect = [0, 1]
    push_params = ['my_image']
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock
        result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
    self.assertEqual(result.exit_code, 1)
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)

def test_push_rmi_fail(self, skipper_runner_run_mock, requests_get_mock):
    skipper_runner_run_mock.side_effect = [0, 0, 1]
    push_params = ['my_image']
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock
        result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
    self.assertEqual(result.exit_code, 0)
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)

def test_push_to_namespace(self, skipper_runner_run_mock, requests_get_mock):
    skipper_runner_run_mock.side_effect = [0, 0]
    push_params = ['--namespace', 'my_namespace', 'my_image']
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock
        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_namespace/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_namespace/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_namespace/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)

def test_push_with_defaults_from_config_file(self, skipper_runner_run_mock, requests_get_mock):
    skipper_runner_run_mock.side_effect = [0, 0]
    push_params = ['my_image']
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock
        self._invoke_cli(
            defaults=config.load_defaults(),
            subcmd='push',
            subcmd_params=push_params
        )
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)

def test_make_without_build_container_tag(self, skipper_runner_run_mock):
    global_params = self.global_params[:-2]
    makefile = 'Makefile'
    target = 'all'
    make_params = ['-f', makefile, target]
    self._invoke_cli(
        global_params=global_params,
        subcmd='make',
        subcmd_params=make_params
    )
    expected_commands = [
        mock.call(['docker', 'build', '-t', 'build-container-image', '-f',
                   'Dockerfile.build-container-image', '.']),
        mock.call(['make'] + make_params, fqdn_image='build-container-image',
                  environment=[], interactive=False, name=None, net='host',
                  volumes=None, workdir=None, use_cache=False),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)

def test_resume_compute_exception_wait_slave_available(self, mock_sleep):
    side_effect_xenapi_failure = FakeXenAPIException
    side_effect_plugin_error = [self.pluginlib.PluginError(
        "Wait for the slave to become available"), None]
    self.mock_patch_object(self.session.xenapi.VM, 'start')
    self.session.xenapi.VM.start.side_effect = \
        side_effect_xenapi_failure
    self.host._run_command.side_effect = side_effect_plugin_error
    self.host.XenAPI.Failure = FakeXenAPIException
    expected = [call(["xe", "vm-start", "uuid=%s" % 'fake_compute_uuid']),
                call(["xe", "vm-start", "uuid=%s" % 'fake_compute_uuid'])]

    self.host._resume_compute(self.session, 'fake_compute_ref',
                              'fake_compute_uuid')
    self.session.xenapi.VM.start.assert_called_with(
        'fake_compute_ref', False, True)
    self.assertEqual(expected, self.host._run_command.call_args_list)
    mock_sleep.assert_called_once()

def test_power_action_input_cmd_result_not_empty(self):
    side_effects = [None, None, 'not_empty']
    temp_arg_dict = {'host_uuid': 'fake_host_uuid'}
    self.host._run_command.side_effect = side_effects
    cmds = {"reboot": "host-reboot",
            "startup": "host-power-on",
            "shutdown": "host-shutdown"}
    fake_action = 'reboot'  # 'startup' and 'shutdown' should behave the same
    expected_cmd_arg_list = [call(["xe", "host-disable",
                                   "uuid=%s" % 'fake_host_uuid']),
                             call(["xe", "vm-shutdown", "--multiple",
                                   "resident-on=%s" % 'fake_host_uuid']),
                             call(["xe", cmds[fake_action],
                                   "uuid=%s" % 'fake_host_uuid'])]

    self.assertRaises(self.pluginlib.PluginError,
                      self.host._power_action,
                      fake_action, temp_arg_dict)
    self.assertEqual(self.host._run_command.call_args_list,
                     expected_cmd_arg_list)

def test_power_action(self):
    temp_arg_dict = {'host_uuid': 'fake_host_uuid'}
    self.host._run_command.return_value = None
    cmds = {"reboot": "host-reboot",
            "startup": "host-power-on",
            "shutdown": "host-shutdown"}
    fake_action = 'reboot'  # 'startup' and 'shutdown' should behave the same
    expected_cmd_arg_list = [call(["xe", "host-disable",
                                   "uuid=%s" % 'fake_host_uuid']),
                             call(["xe", "vm-shutdown", "--multiple",
                                   "resident-on=%s" % 'fake_host_uuid']),
                             call(["xe", cmds[fake_action],
                                   "uuid=%s" % 'fake_host_uuid'])]
    expected_result = {"power_action": fake_action}

    action_result = self.host._power_action(fake_action, temp_arg_dict)
    self.assertEqual(self.host._run_command.call_args_list,
                     expected_cmd_arg_list)
    self.assertEqual(action_result, expected_result)

def test_ovs_add_patch_port(self):
    brige_name = 'fake_brige_name'
    port_name = 'fake_port_name'
    peer_port_name = 'fake_peer_port_name'
    side_effects = [brige_name, port_name, peer_port_name]
    self.mock_patch_object(self.pluginlib, 'exists')
    self.pluginlib.exists.side_effect = side_effects
    expected_cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port',
                         port_name, '--', 'add-port', brige_name,
                         'fake_port_name', '--', 'set', 'interface',
                         'fake_port_name', 'type=patch',
                         'options:peer=%s' % peer_port_name]
    expected_pluginlib_arg_list = [call('fake_args', 'bridge_name'),
                                   call('fake_args', 'port_name'),
                                   call('fake_args', 'peer_port_name')]

    self.host._ovs_add_patch_port('fake_args')
    self.host._run_command.assert_called_with(expected_cmd_args)
    self.assertEqual(self.pluginlib.exists.call_args_list,
                     expected_pluginlib_arg_list)

def test_ovs_del_port(self):
    bridge_name = 'fake_brige_name'
    port_name = 'fake_port_name'
    side_effects = [bridge_name, port_name]
    self.mock_patch_object(self.pluginlib, 'exists')
    self.pluginlib.exists.side_effect = side_effects
    expected_cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port',
                         bridge_name, port_name]
    expected_pluginlib_arg_list = [call('fake_args', 'bridge_name'),
                                   call('fake_args', 'port_name')]

    self.host._ovs_del_port('fake_args')
    self.host._run_command.assert_called_with(expected_cmd_args)
    self.assertEqual(self.pluginlib.exists.call_args_list,
                     expected_pluginlib_arg_list)

def test_ovs_set_if_external_id(self):
    interface = 'fake_interface'
    extneral_id = 'fake_extneral_id'
    value = 'fake_value'
    side_effects = [interface, extneral_id, value]
    self.mock_patch_object(self.pluginlib, 'exists')
    self.pluginlib.exists.side_effect = side_effects
    expected_cmd_args = ['ovs-vsctl', 'set', 'Interface', interface,
                         'external-ids:%s=%s' % (extneral_id, value)]
    expected_pluginlib_arg_list = [call('fake_args', 'interface'),
                                   call('fake_args', 'extneral_id'),
                                   call('fake_args', 'value')]

    self.host._ovs_set_if_external_id('fake_args')
    self.host._run_command.assert_called_with(expected_cmd_args)
    self.assertEqual(self.pluginlib.exists.call_args_list,
                     expected_pluginlib_arg_list)

def test_ip_link_add_veth_pair(self):
    dev1_name = 'fake_brige_name'
    dev2_name = 'fake_port_name'
    side_effects = [dev1_name, dev2_name]
    self.mock_patch_object(self.pluginlib, 'exists')
    self.pluginlib.exists.side_effect = side_effects
    expected_cmd_args = ['ip', 'link', 'add', dev1_name, 'type', 'veth',
                         'peer', 'name', dev2_name]
    expected_pluginlib_arg_list = [call('fake_args', 'dev1_name'),
                                   call('fake_args', 'dev2_name')]

    self.host._ip_link_add_veth_pair('fake_args')
    self.host._run_command.assert_called_with(expected_cmd_args)
    self.assertEqual(self.pluginlib.exists.call_args_list,
                     expected_pluginlib_arg_list)

def test_ip_link_set_promisc(self):
    device_name = 'fake_device_name'
    option = 'fake_option'
    side_effects = [device_name, option]
    self.mock_patch_object(self.pluginlib, 'exists')
    self.pluginlib.exists.side_effect = side_effects
    expected_cmd_args = ['ip', 'link', 'set', device_name, 'promisc',
                         option]
    expected_pluginlib_arg_list = [call('fake_args', 'device_name'),
                                   call('fake_args', 'option')]

    self.host._ip_link_set_promisc('fake_args')
    self.host._run_command.assert_called_with(expected_cmd_args)
    self.assertEqual(self.pluginlib.exists.call_args_list,
                     expected_pluginlib_arg_list)

def test_no_req_ids(self, *args):
    in_flight = 3
    get_holders = self.make_get_holders(1)
    max_connection = Mock(spec=Connection, host='localhost', lock=Lock(),
                          max_request_id=in_flight - 1,
                          in_flight=in_flight, is_idle=True,
                          is_defunct=False, is_closed=False)
    holder = get_holders.return_value[0]
    holder.get_connections.return_value.append(max_connection)

    self.run_heartbeat(get_holders)

    holder.get_connections.assert_has_calls([call()] * get_holders.call_count)
    self.assertEqual(max_connection.in_flight, in_flight)
    self.assertEqual(max_connection.send_msg.call_count, 0)
    self.assertEqual(max_connection.send_msg.call_count, 0)
    max_connection.defunct.assert_has_calls([call(ANY)] * get_holders.call_count)
    holder.return_connection.assert_has_calls(
        [call(max_connection)] * get_holders.call_count)

def test_scheduler_persistent(self):
    # TODO: Improve this test to avoid the need to check for log messages.
    self.spider.log = mock.Mock(spec=self.spider.log)

    self.scheduler.persist = True
    self.scheduler.open(self.spider)
    self.assertEqual(self.spider.log.call_count, 0)

    self.scheduler.enqueue_request(Request('http://example.com/page1'))
    self.scheduler.enqueue_request(Request('http://example.com/page2'))
    self.assertTrue(self.scheduler.has_pending_requests())
    self.scheduler.close('finish')

    self.scheduler.open(self.spider)
    self.spider.log.assert_has_calls([
        mock.call("Resuming crawl (2 requests scheduled)"),
    ])
    self.assertEqual(len(self.scheduler), 2)

    self.scheduler.persist = False
    self.scheduler.close('finish')
    self.assertEqual(len(self.scheduler), 0)

def test_register_configs_apache(self, resource_map, exists, renderer):
    exists.return_value = False
    self.os_release.return_value = 'havana'
    fake_renderer = MagicMock()
    fake_renderer.register = MagicMock()
    renderer.return_value = fake_renderer
    resource_map.return_value = self.rsc_map
    utils.register_configs()
    renderer.assert_called_with(openstack_release='havana',
                                templates_dir='templates/')
    ex_reg = [
        call('/etc/keystone/keystone.conf', [self.ctxt]),
        call(
            '/etc/apache2/sites-available/openstack_https_frontend',
            [self.ctxt]),
        call(
            '/etc/apache2/sites-available/openstack_https_frontend.conf',
            [self.ctxt]),
    ]
    self.assertEqual(fake_renderer.register.call_args_list, ex_reg)

def test_create_user_credentials_no_roles(self, mock_create_user,
                                          mock_create_role,
                                          mock_grant_role,
                                          mock_user_exists,
                                          get_callback, set_callback):
    mock_user_exists.return_value = False
    get_callback.return_value = 'passA'
    utils.create_user_credentials('userA', get_callback, set_callback,
                                  tenant='tenantA')
    mock_create_user.assert_has_calls([call('userA', 'passA', domain=None,
                                            tenant='tenantA')])
    mock_create_role.assert_has_calls([])
    mock_grant_role.assert_has_calls([])

def test_create_user_credentials(self, mock_create_user, mock_create_role,
                                 mock_grant_role, mock_user_exists,
                                 get_callback, set_callback):
    mock_user_exists.return_value = False
    get_callback.return_value = 'passA'
    utils.create_user_credentials('userA', get_callback, set_callback,
                                  tenant='tenantA', grants=['roleA'],
                                  new_roles=['roleB'])
    mock_create_user.assert_has_calls([call('userA', 'passA',
                                            tenant='tenantA',
                                            domain=None)])
    mock_create_role.assert_has_calls([call('roleB', user='userA',
                                            tenant='tenantA',
                                            domain=None)])
    mock_grant_role.assert_has_calls([call('userA', 'roleA',
                                           tenant='tenantA',
                                           user_domain=None,
                                           project_domain=None)])

def test_git_pre_install(self, adduser, add_group, add_user_to_group,
                         write_file, mkdir):
    utils.git_pre_install()
    adduser.assert_called_with('keystone', shell='/bin/bash',
                               system_user=True,
                               home_dir='/var/lib/keystone')
    add_group.assert_called_with('keystone', system_group=True)
    add_user_to_group.assert_called_with('keystone', 'keystone')
    expected = [
        call('/var/lib/keystone', owner='keystone',
             group='keystone', perms=0o755, force=False),
        call('/var/lib/keystone/cache', owner='keystone',
             group='keystone', perms=0o755, force=False),
        call('/var/log/keystone', owner='keystone',
             group='keystone', perms=0o755, force=False),
    ]
    self.assertEqual(mkdir.call_args_list, expected)
    write_file.assert_called_with('/var/log/keystone/keystone.log',
                                  '', owner='keystone', group='keystone',
                                  perms=0o600)

def test_ports_build(self, mock_iface_updown):
    expected_port_build = self.read_config_xml('irf_port_build')
    expected_activate = self.read_action_xml('irf_port_build_activate')
    expected_call_list = [
        [expected_port_build, 'edit_config'],
        ['startup.cfg', 'save'],
        [expected_activate, 'action']
    ]
    down_ifaces = ['FortyGigE1/0/2', 'FortyGigE1/0/1',
                   'FortyGigE1/0/3', 'FortyGigE1/0/4']
    up_ifaces = ['FortyGigE1/0/1', 'FortyGigE1/0/2',
                 'FortyGigE1/0/3', 'FortyGigE1/0/4']
    self.irf_port.build('1', [], [],
                        ['FortyGigE1/0/1', 'FortyGigE1/0/2'],
                        ['FortyGigE1/0/3', 'FortyGigE1/0/4'])
    self.assert_stage_requests_multiple(expected_call_list)
    mock_iface_updown.assert_has_calls([mock.call(down_ifaces, 'down'),
                                        mock.call(up_ifaces, 'up')])

def test_ports_build_remove(self, mock_iface_updown):
    expected_port_build = self.read_config_xml('irf_port_build_remove')
    expected_activate = self.read_action_xml('irf_port_build_activate')
    expected_call_list = [
        [expected_port_build, 'edit_config'],
        ['startup.cfg', 'save'],
        [expected_activate, 'action']
    ]
    down_ifaces = ['FortyGigE1/0/2', 'FortyGigE1/0/1',
                   'FortyGigE1/0/3', 'FortyGigE1/0/4']
    up_ifaces = []
    self.irf_port.build('1',
                        ['FortyGigE1/0/1', 'FortyGigE1/0/2'],
                        ['FortyGigE1/0/3', 'FortyGigE1/0/4'],
                        [], [])
    self.assert_stage_requests_multiple(expected_call_list)
    mock_iface_updown.assert_has_calls([mock.call(down_ifaces, 'down'),
                                        mock.call(up_ifaces, 'up')])

def test_init(self, mock_ping):
    ping = Ping(self.device, TARGET)
    self.assertEqual(ping.vrf, '')
    self.assertEqual(ping.host, TARGET)
    self.assertEqual(ping.v6, False)
    self.assertEqual(ping.detail, False)

    ping6 = Ping(self.device, TARGET, v6=True)
    self.assertEqual(ping6.vrf, '')
    self.assertEqual(ping6.host, TARGET)
    self.assertEqual(ping6.v6, True)
    self.assertEqual(ping6.detail, False)

    mock_ping.assert_has_calls([mock.call(), mock.call()])

def test_migrate_nova_databases_ocata(self, cellv2_ready, get_cell_uuid,
                                      check_output):
    "Migrate database with nova-manage in a clustered env"
    get_cell_uuid.return_value = 'c83121db-f1c7-464a-b657-38c28fac84c6'
    self.relation_ids.return_value = ['cluster:1']
    self.os_release.return_value = 'ocata'
    utils.migrate_nova_databases()
    check_output.assert_has_calls([
        call(['nova-manage', 'api_db', 'sync']),
        call(['nova-manage', 'cell_v2', 'map_cell0']),
        call(['nova-manage', 'cell_v2', 'create_cell', '--name', 'cell1',
              '--verbose']),
        call(['nova-manage', 'db', 'sync']),
        call(['nova-manage', 'db', 'online_data_migrations']),
        call(['nova-manage', 'cell_v2', 'discover_hosts', '--cell_uuid',
              'c83121db-f1c7-464a-b657-38c28fac84c6', '--verbose']),
    ])
    self.peer_store.assert_called_with('dbsync_state', 'complete')
    self.assertTrue(self.enable_services.called)
    self.cmd_all_services.assert_called_with('start')

def test_upgrade_icehouse_juno(self, determine_packages,
                               migrate_nova_databases,
                               get_step_upgrade_source):
    "Simulate a call to do_openstack_upgrade() for icehouse->juno"
    self.test_config.set('openstack-origin', 'cloud:trusty-juno')
    get_step_upgrade_source.return_value = None
    self.os_release.return_value = 'icehouse'
    self.get_os_codename_install_source.return_value = 'juno'
    self.is_leader.return_value = True
    self.relation_ids.return_value = []
    utils.do_openstack_upgrade(self.register_configs())
    self.apt_update.assert_called_with(fatal=True)
    self.apt_upgrade.assert_called_with(options=DPKG_OPTS, fatal=True,
                                        dist=True)
    self.apt_install.assert_called_with(determine_packages(), fatal=True)
    self.register_configs.assert_called_with(release='juno')
    self.assertTrue(migrate_nova_databases.call_count, 1)

def test_upgrade_kilo_liberty(self, determine_packages,
                              migrate_nova_databases,
                              migrate_nova_flavors,
                              get_step_upgrade_source):
    "Simulate a call to do_openstack_upgrade() for kilo->liberty"
    self.test_config.set('openstack-origin', 'cloud:trusty-liberty')
    get_step_upgrade_source.return_value = None
    self.os_release.return_value = 'kilo'
    self.get_os_codename_install_source.return_value = 'liberty'
    self.is_leader.return_value = True
    self.relation_ids.return_value = []
    utils.do_openstack_upgrade(self.register_configs())
    self.apt_update.assert_called_with(fatal=True)
    self.apt_upgrade.assert_called_with(options=DPKG_OPTS, fatal=True,
                                        dist=True)
    self.apt_install.assert_called_with(determine_packages(), fatal=True)
    self.register_configs.assert_called_with(release='liberty')
    self.assertTrue(migrate_nova_flavors.call_count, 1)
    self.assertTrue(migrate_nova_databases.call_count, 1)

def test_upgrade_liberty_mitaka(self, determine_packages,
                                migrate_nova_databases,
                                get_step_upgrade_source,
                                database_setup):
    "Simulate a call to do_openstack_upgrade() for liberty->mitaka"
    self.test_config.set('openstack-origin', 'cloud:trusty-kilo')
    get_step_upgrade_source.return_value = None
    self.os_release.return_value = 'liberty'
    self.get_os_codename_install_source.return_value = 'mitaka'
    self.is_leader.return_value = True
    self.relation_ids.return_value = []
    database_setup.return_value = False
    utils.do_openstack_upgrade(self.register_configs())
    self.apt_update.assert_called_with(fatal=True)
    self.apt_upgrade.assert_called_with(options=DPKG_OPTS, fatal=True,
                                        dist=True)
    self.apt_install.assert_called_with(determine_packages(), fatal=True)
    self.register_configs.assert_called_with(release='mitaka')
    self.assertFalse(migrate_nova_databases.called)
    database_setup.assert_called_with(prefix='novaapi')

def test_config_changed_region_change(self, mock_compute_changed,
                                      mock_config_https,
                                      mock_filter_packages,
                                      mock_service_resume,
                                      mock_is_db_initialised,
                                      mock_update_nova_consoleauth_config,
                                      mock_update_aws_compat_services):
    self.git_install_requested.return_value = False
    self.openstack_upgrade_available.return_value = False
    self.config_value_changed.return_value = True
    self.related_units.return_value = ['unit/0']
    self.relation_ids.side_effect = \
        lambda x: ['generic_rid'] if x == 'cloud-compute' else []
    mock_is_db_initialised.return_value = False
    self.os_release.return_value = 'diablo'
    hooks.config_changed()
    mock_compute_changed.assert_has_calls([call('generic_rid', 'unit/0')])
    self.assertTrue(mock_update_nova_consoleauth_config.called)
    self.assertTrue(mock_update_aws_compat_services.called)

def test_db_joined_mitaka(self):
    self.get_relation_ip.return_value = '10.10.10.10'
    self.os_release.return_value = 'mitaka'
    self.is_relation_made.return_value = False
    hooks.db_joined()
    self.relation_set.assert_has_calls([
        call(nova_database='nova', nova_username='nova',
             nova_hostname='10.10.10.10', relation_id=None),
        call(novaapi_database='nova_api', novaapi_username='nova',
             novaapi_hostname='10.10.10.10', relation_id=None),
    ])
    self.get_relation_ip.assert_called_with('shared-db', cidr_network=None)

def test_amqp_changed_api_rel(self, configs, cell_joined, api_joined,
                              quantum_joined, mock_is_db_initialised,
                              update_db_allowed, init_db_allowed):
    self.relation_ids.side_effect = [
        ['nova-cell-api/0'],
        ['nova-api/0'],
        ['quantum-service/0'],
    ]
    mock_is_db_initialised.return_value = False
    configs.complete_contexts = MagicMock()
    configs.complete_contexts.return_value = ['amqp']
    configs.write = MagicMock()
    self.os_release.return_value = 'diablo'
    self.is_relation_made.return_value = True
    hooks.amqp_changed()
    self.assertEqual(configs.write.call_args_list,
                     [call('/etc/nova/nova.conf')])
    cell_joined.assert_called_with(rid='nova-cell-api/0')
    api_joined.assert_called_with(rid='nova-api/0')
    quantum_joined.assert_called_with(rid='quantum-service/0',
                                      remote_restart=True)

def test_execute(self, mock_finalize, mock_execute, mock_initialize):
    c1 = Chain('c1')
    l1 = Link('l1')
    l2 = Link('l2')
    l3 = Link('l3')

    # test happy flow
    c1.links = [l1, l2, l3]
    mock_initialize.return_value = StatusCode.Success
    mock_execute.return_value = StatusCode.Success
    mock_finalize.return_value = StatusCode.Success
    mock_parent = mock.MagicMock(autospec=True)
    mock_parent.attach_mock(mock_initialize, 'initialize')
    mock_parent.attach_mock(mock_execute, 'execute')
    mock_parent.attach_mock(mock_finalize, 'finalize')

    status = c1.initialize()
    self.assertEqual(status, StatusCode.Success)
    status = c1.execute()
    self.assertEqual(status, StatusCode.Success)
    status = c1.finalize()
    self.assertEqual(status, StatusCode.Success)

    calls = [mock.call.initialize()] * 3 + [mock.call.execute()] * 3 + \
        [mock.call.finalize()] * 3
    mock_parent.assert_has_calls(calls, any_order=False)