我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 mock.patch()。
def test_resource_form_create_valid(self, mock_open):
    """Submit the new-resource form with a valid CSV and expect a successful validation.

    ``mock_open`` is not used in the body; presumably it is injected by a
    patch decorator that is outside this view.
    """
    pkg = Dataset()
    app = self._get_test_app()
    env, response = _get_resource_new_page_as_sysadmin(app, pkg['id'])
    form = response.forms['resource-edit']

    upload = ('upload', 'valid.csv', VALID_CSV)
    # Every io.open() call made while the form is handled returns this stream.
    csv_stream = io.BufferedReader(io.BytesIO(VALID_CSV))
    with mock.patch('io.open', return_value=csv_stream):
        submit_and_follow(app, form, env, 'save', upload_files=[upload])

    first_resource = call_action('package_show', id=pkg['id'])['resources'][0]
    assert_equals(first_resource['validation_status'], 'success')
    assert 'validation_timestamp' in first_resource
def _test_should_sleep(self, seconds_left, slept):
    """Drive ``Retry._sleep`` with a deadline ``seconds_left`` after ``self.now``.

    Asserts that ``_sleep`` returns ``slept``, that ``random.randint`` was
    called once with ``(1, 2 ** attempt - 1)`` and that ``time.sleep`` was
    called once with ``slept``.
    """
    attempt = 5
    fake_random = 2
    deadline = self.now + seconds_left
    retry = h_retry.Retry(mock.Mock(), timeout=20, interval=3)

    with mock.patch('random.randint', return_value=fake_random) as m_randint, \
            mock.patch('time.sleep') as m_sleep:
        result = retry._sleep(deadline, attempt, _EX2())

    self.assertEqual(slept, result)
    m_randint.assert_called_once_with(1, 2 ** attempt - 1)
    m_sleep.assert_called_once_with(slept)
def upload_to_fileshare_test(self):  # pylint: disable=no-self-use
    """Upload copies files to non-native store correctly with no progress"""
    import shutil
    import tempfile

    temp_src_dir = tempfile.mkdtemp()
    temp_dst_dir = tempfile.mkdtemp()
    temp_file = tempfile.NamedTemporaryFile(dir=temp_src_dir)
    try:
        shutil_mock = MagicMock()
        shutil_mock.copyfile.return_value = None
        # Only the shutil name inside sfctl.custom_app is replaced; the
        # locally imported shutil used for cleanup below stays real.
        with patch('sfctl.custom_app.shutil', new=shutil_mock):
            sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
        shutil_mock.copyfile.assert_called_once()
    finally:
        # Clean up even when the upload or the assertion fails, so a failing
        # run does not leave temporary directories behind.
        temp_file.close()
        shutil.rmtree(temp_src_dir)
        shutil.rmtree(temp_dst_dir)
def test_reboot_and_finish_deploy_force_reboot(self, power_action_mock,
                                               get_pow_state_mock):
    """Check reboot_and_finish_deploy when ``deploy_forces_oob_reboot`` is set.

    Expects the provisioning network to be removed, tenant networks to be
    configured, the power action mock to be called with POWER_OFF then
    POWER_ON, and the power-state getter mock never to be called.
    """
    driver_info = self.node.driver_info
    driver_info['deploy_forces_oob_reboot'] = True
    self.node.driver_info = driver_info
    self.node.save()

    self.config(group='ansible',
                post_deploy_get_power_state_retry_interval=0)
    self.node.provision_state = states.DEPLOYING
    self.node.save()

    with task_manager.acquire(self.context, self.node.uuid) as task:
        with mock.patch.object(task.driver, 'network') as net_mock:
            self.driver.reboot_and_finish_deploy(task)
            net_mock.remove_provisioning_network.assert_called_once_with(
                task)
            net_mock.configure_tenant_networks.assert_called_once_with(
                task)
        power_calls = [
            ((task, states.POWER_OFF),),
            ((task, states.POWER_ON),),
        ]
        self.assertEqual(power_calls, power_action_mock.call_args_list)
    get_pow_state_mock.assert_not_called()
def test_requires_usergroup_no_acc(self):
    """Test requires usergroup decorator if the user has no access.

    With ``has_access`` returning False the decorated handler is expected
    to return None instead of its own ``True``.
    """
    with patch("ownbot.auth.User") as user_cls_mock:
        user_cls_mock.return_value.has_access.return_value = False

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        bot_mock = Mock(spec=Bot)
        dummy_update = self.__get_dummy_update()
        result = my_command_handler(bot_mock, dummy_update)
        self.assertIsNone(result)
def test_requires_usergroup_acc(self):
    """Test requires usergroup decorator if the user has access.

    With ``has_access`` returning True the decorated handler is expected
    to run and return its own ``True``.
    """
    with patch("ownbot.auth.User") as user_cls_mock, \
            patch("test_auth.Update"):
        # Must be spelled `has_access`, the attribute the no-access test
        # configures. A misspelled name would be auto-created by the mock
        # and leave the real check on an unconfigured (truthy) mock.
        user_cls_mock.return_value.has_access.return_value = True

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        bot_mock = Mock(spec=Bot)
        # NOTE(review): presumably this module is test_auth, so Update is
        # the patched class here and this call returns a mock - confirm.
        update_mock = Update(1337)
        called = my_command_handler(bot_mock, update_mock)
        self.assertTrue(called)
def test_requires_usergroup_self(self):
    """Test requires usergroup decorator with self as first argument.

    The decorated handler takes ``(self, bot, update)`` and is called with
    ``None`` as ``self``; it is expected to run and return ``True``.
    """
    with patch("ownbot.auth.User") as user_cls_mock, \
            patch("test_auth.Update"):
        # Must be spelled `has_access`, the attribute the no-access test
        # configures. A misspelled name would be auto-created by the mock
        # and leave the real check on an unconfigured (truthy) mock.
        user_cls_mock.return_value.has_access.return_value = True

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(self, bot, update):
            """Dummy command handler"""
            print(self, bot, update)
            return True

        bot_mock = Mock(spec=Bot)
        # NOTE(review): presumably this module is test_auth, so Update is
        # the patched class here and this call returns a mock - confirm.
        update_mock = Update(1337)
        called = my_command_handler(None, bot_mock, update_mock)
        self.assertTrue(called)
def test_assign_first_to(self):
    """Test assign first to decorator.

    With ``group_is_empty`` returning True, calling the decorated handler
    is expected to query ``group_is_empty`` and call ``save`` on the user.
    """
    with patch("ownbot.auth.User") as user_cls_mock, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as usrmgr_cls_mock:
        user_instance = user_cls_mock.return_value
        group_is_empty = usrmgr_cls_mock.return_value.group_is_empty
        group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)

        bot_mock = Mock(spec=Bot)
        update_obj = Update(1337)
        my_command_handler(bot_mock, update_obj)

        self.assertTrue(group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def test_assign_first_to_with_self(self):
    """Test assign first to decorator with self as first argument.

    The decorated handler takes ``(self, bot, update)`` and is called with
    ``None`` as ``self``; ``group_is_empty`` and the user's ``save`` are
    both expected to have been called.
    """
    with patch("ownbot.auth.User") as user_cls_mock, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as usrmgr_cls_mock:
        user_instance = user_cls_mock.return_value
        group_is_empty = usrmgr_cls_mock.return_value.group_is_empty
        group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(self, bot, update):
            """Dummy command handler"""
            print(self, bot, update)

        bot_mock = Mock(spec=Bot)
        update_obj = Update(1337)
        my_command_handler(None, bot_mock, update_obj)

        self.assertTrue(group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def test_default_security_group_rest_callback(self):
    """Call create_security_group against three mocked REST status codes.

    ``requests.request`` is patched to return, in turn, responses built
    from ``all_good``, ``no_content`` and ``not_implemented``.
    """
    try:
        kwargs = test_sg_create
        with mock.patch.object(self.securityGroupDb,
                               'get_security_group',
                               return_value=security_group):
            status_codes = (
                requests.codes.all_good,
                requests.codes.no_content,
                requests.codes.not_implemented,
            )
            for status in status_codes:
                resp = self._mock_req_resp(status)
                with mock.patch('requests.request', return_value=resp):
                    self.secGroupSub.create_security_group(
                        None, None, None, **kwargs)
    except Exception:
        # NOTE(review): every exception is swallowed and nothing is
        # asserted, so this test cannot fail - confirm that is intended.
        pass
def test_resource_form_create_invalid(self, mock_open):
    """Submit the new-resource form with an invalid CSV and expect validation errors in the page.

    ``mock_open`` is not used in the body; presumably it is injected by a
    patch decorator that is outside this view.
    """
    pkg = Dataset()
    app = self._get_test_app()
    env, response = _get_resource_new_page_as_sysadmin(app, pkg['id'])
    form = response.forms['resource-edit']

    upload = ('upload', 'invalid.csv', INVALID_CSV)
    # Every io.open() call made while the form is handled returns this stream.
    csv_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))
    with mock.patch('io.open', return_value=csv_stream):
        response = webtest_submit(
            form, 'save', upload_files=[upload], extra_environ=env)

    expected_fragments = (
        'validation',
        'missing-value',
        'Row 2 has a missing value in column 4',
    )
    for fragment in expected_fragments:
        assert_in(fragment, response.body)
def test_resource_form_update_valid(self, mock_open):
    """Update an existing resource with a valid CSV and expect a successful validation.

    ``mock_open`` is not used in the body; presumably it is injected by a
    patch decorator that is outside this view.
    """
    pkg = Dataset(resources=[{'url': 'https://example.com/data.csv'}])
    app = self._get_test_app()
    env, response = _get_resource_update_page_as_sysadmin(
        app, pkg['id'], pkg['resources'][0]['id'])
    form = response.forms['resource-edit']

    upload = ('upload', 'valid.csv', VALID_CSV)
    # Every io.open() call made while the form is handled returns this stream.
    csv_stream = io.BufferedReader(io.BytesIO(VALID_CSV))
    with mock.patch('io.open', return_value=csv_stream):
        submit_and_follow(app, form, env, 'save', upload_files=[upload])

    first_resource = call_action('package_show', id=pkg['id'])['resources'][0]
    assert_equals(first_resource['validation_status'], 'success')
    assert 'validation_timestamp' in first_resource
def test_resource_form_update_invalid(self, mock_open):
    """Update an existing resource with an invalid CSV and expect validation errors in the page.

    ``mock_open`` is not used in the body; presumably it is injected by a
    patch decorator that is outside this view.
    """
    pkg = Dataset(resources=[{'url': 'https://example.com/data.csv'}])
    app = self._get_test_app()
    env, response = _get_resource_update_page_as_sysadmin(
        app, pkg['id'], pkg['resources'][0]['id'])
    form = response.forms['resource-edit']

    upload = ('upload', 'invalid.csv', INVALID_CSV)
    # Every io.open() call made while the form is handled returns this stream.
    csv_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))
    with mock.patch('io.open', return_value=csv_stream):
        response = webtest_submit(
            form, 'save', upload_files=[upload], extra_environ=env)

    expected_fragments = (
        'validation',
        'missing-value',
        'Row 2 has a missing value in column 4',
    )
    for fragment in expected_fragments:
        assert_in(fragment, response.body)
def test_delete_file_not_deleted_if_resources_first(self, mock_open):
    """Delete an upload stored directly under ``resources`` on a fake filesystem.

    After the delete the file must be gone while the
    ``/doesnt_exist/resources`` directory itself must still exist.
    ``mock_open`` is not used in the body; presumably it is injected by a
    patch decorator that is outside this view.
    """
    resource_id = str(uuid.uuid4())
    path = '/doesnt_exist/resources/{}'.format(resource_id)

    patcher = fake_filesystem_unittest.Patcher()
    patcher.setUp()
    try:
        patcher.fs.CreateFile(path)
        assert os.path.exists(path)

        with mock.patch('ckanext.validation.utils.get_local_upload_path',
                        return_value=path):
            delete_local_uploaded_file(resource_id)

        assert not os.path.exists(path)
        assert os.path.exists('/doesnt_exist/resources')
    finally:
        # Always restore the real filesystem modules, even when an
        # assertion above fails, so later tests are not left patched.
        patcher.tearDown()
def test_delete_passes_if_os_exeception(self, mock_open):
    """Check that delete_local_uploaded_file does not propagate an OSError from ``os.remove``.

    The test passes when the call returns without raising.
    ``mock_open`` is not used in the body; presumably it is injected by a
    patch decorator that is outside this view.
    """
    resource_id = str(uuid.uuid4())
    path = '/doesnt_exist/resources/{}/{}/{}'.format(
        resource_id[0:3], resource_id[3:6], resource_id[6:]
    )

    patcher = fake_filesystem_unittest.Patcher()
    patcher.setUp()
    try:
        patcher.fs.CreateFile(path)
        assert os.path.exists(path)

        with mock.patch('ckanext.validation.utils.os.remove',
                        side_effect=OSError):
            delete_local_uploaded_file(resource_id)
    finally:
        # Always restore the real filesystem modules, even when the call
        # or an assertion above fails, so later tests are not left patched.
        patcher.tearDown()
def test_validation_fails_on_upload(self, mock_open):
    """Expect resource_create to raise a ValidationError for an invalid CSV upload.

    ``mock_open`` is not used in the body; presumably it is injected by a
    patch decorator that is outside this view.
    """
    upload_buffer = StringIO.StringIO()
    upload_buffer.write(INVALID_CSV)
    mock_upload = MockFieldStorage(upload_buffer, 'invalid.csv')

    pkg = factories.Dataset()

    # Every io.open() call made during resource_create returns this stream.
    csv_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))
    with mock.patch('io.open', return_value=csv_stream):
        with assert_raises(t.ValidationError) as ctx:
            call_action(
                'resource_create',
                package_id=pkg['id'],
                format='CSV',
                upload=mock_upload
            )

    error = ctx.exception
    assert 'validation' in error.error_dict
    assert 'missing-value' in str(error)
    assert 'Row 2 has a missing value in column 4' in str(error)
def test_validation_passes_on_upload(self, mock_open):
    """Expect resource_create to report a successful validation for a valid CSV upload.

    The upload is still named 'invalid.csv' although its content is the
    valid CSV. ``mock_open`` is not used in the body; presumably it is
    injected by a patch decorator that is outside this view.
    """
    upload_buffer = StringIO.StringIO()
    upload_buffer.write(VALID_CSV)
    mock_upload = MockFieldStorage(upload_buffer, 'invalid.csv')

    pkg = factories.Dataset()

    # Every io.open() call made during resource_create returns this stream.
    csv_stream = io.BufferedReader(io.BytesIO(VALID_CSV))
    with mock.patch('io.open', return_value=csv_stream):
        created = call_action(
            'resource_create',
            package_id=pkg['id'],
            format='CSV',
            upload=mock_upload
        )

    assert_equals(created['validation_status'], 'success')
    assert 'validation_timestamp' in created
def test_get_swift_hash_env(self, mock_config, mock_service_name):
    """Check get_swift_hash when the hash comes from the environment lookup.

    ``os.environ.get`` (as seen from lib.swift_context) is patched to
    return a random UUID string. The test asserts the two expected lookups,
    that the returned hash equals the content of the hash file, and that
    the config mock was called.
    """
    import os
    import shutil

    mock_config.return_value = None
    mock_service_name.return_value = "testsvc"

    # A private temp directory gives a not-yet-existing file path without
    # the race inherent in the deprecated tempfile.mktemp().
    tmpdir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)
    tmpfile = os.path.join(tmpdir, 'swift-hash')

    # Restore the module-level setting afterwards so this test does not
    # leak its temporary path into other tests.
    self.addCleanup(setattr, swift_context, 'SWIFT_HASH_FILE',
                    swift_context.SWIFT_HASH_FILE)
    swift_context.SWIFT_HASH_FILE = tmpfile

    with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
        mock_env_get.return_value = str(uuid.uuid4())
        hash_ = swift_context.get_swift_hash()
        mock_env_get.assert_has_calls([
            mock.call('JUJU_MODEL_UUID'),
            mock.call('JUJU_ENV_UUID', mock_env_get.return_value)
        ])

    with open(tmpfile, 'r') as fd:
        self.assertEqual(hash_, fd.read())

    self.assertTrue(mock_config.called)
def test_get_log_message(
    self,
    log_consumer_instance,
    publish_log_messages,
    log_message,
    log_topic
):
    """Publish one log message and check the log consumer reads it back.

    ``yelp_kafka.discovery.get_region_cluster`` is patched to return the
    cluster config from ``get_config()`` for the duration of the test.
    """
    cluster_config = get_config().cluster_config
    with mock.patch(
        'yelp_kafka.discovery.get_region_cluster',
        return_value=cluster_config
    ), log_consumer_instance as consumer:
        publish_log_messages(log_topic, log_message, count=1)
        asserter = ConsumerAsserter(
            consumer=consumer,
            expected_message=log_message
        )
        received = consumer.get_message(blocking=True, timeout=TIMEOUT)
        asserter.assert_messages([received], expected_count=1)
def test_base_consumer_without_cluster_name(
    self,
    topic,
    consumer_init_kwargs
):
    """Check the cluster config a BaseConsumer builds when no cluster name is passed.

    Expects ``get_kafka_cluster`` not to be called and
    ``ClusterConfig.__init__`` to be called once with the broker list and
    zookeeper taken from ``get_config()``.
    """
    get_cluster_patch = mock.patch('yelp_kafka.discovery.get_kafka_cluster')
    cluster_init_patch = mock.patch(
        'kafka_utils.util.config.ClusterConfig.__init__',
        return_value=None
    )
    with get_cluster_patch as mock_get_kafka_cluster, \
            cluster_init_patch as mock_cluster_config_init:
        consumer = BaseConsumer(
            topic_to_consumer_topic_state_map={topic: None},
            auto_offset_reset='largest',
            **consumer_init_kwargs
        )
        # Bare attribute access: presumably evaluating it is what builds
        # the cluster config - verify against BaseConsumer.
        consumer._region_cluster_config

        assert mock_get_kafka_cluster.call_count == 0
        config = get_config()
        mock_cluster_config_init.assert_called_once_with(
            type='standard',
            name='data_pipeline',
            broker_list=config.kafka_broker_list,
            zookeeper=config.kafka_zookeeper
        )
def test_setup_connections(self, base_path, refresh_batch, cluster):
    """Check setup_connections builds a TransactionManager for ``cluster``.

    The manager is expected to be created once with ``cluster`` as the
    cluster, read-only replica and read-write replica name, and with the
    batch's ``get_connection_set_from_cluster`` as connection set getter.
    """
    manager_patch = mock.patch(base_path + '.TransactionManager')
    get_conn_patch = mock.patch.object(
        refresh_batch,
        'get_connection_set_from_cluster'
    )
    with manager_patch as mock_manager, get_conn_patch as mock_get_conn:
        refresh_batch.setup_connections()
        mock_manager.assert_called_once_with(
            cluster_name=cluster,
            ro_replica_name=cluster,
            rw_replica_name=cluster,
            connection_set_getter=mock_get_conn
        )
def test_after_row_processing(self, refresh_batch, write_session, rw_conn):
    """Check what unlock_tables and throttle_throughput do to the session and throttles.

    Expects one rollback, one 'UNLOCK TABLES' execute and one commit on the
    write session, one replication throttle call with ``rw_conn``, one
    throughput wait, and the rows-per-second cap left at its default.
    """
    throttle_patch = mock.patch.object(
        refresh_batch,
        'throttle_to_replication'
    )
    wait_patch = mock.patch.object(
        refresh_batch,
        '_wait_for_throughput',
        return_value=None
    )
    with throttle_patch as throttle_mock, wait_patch as mock_wait:
        # count can be anything since self.avg_throughput_cap is set to None
        refresh_batch.unlock_tables(write_session)
        refresh_batch.throttle_throughput(count=0)

        assert write_session.rollback.call_count == 1
        write_session.execute.assert_called_once_with('UNLOCK TABLES')
        assert write_session.commit.call_count == 1
        throttle_mock.assert_called_once_with(rw_conn)
        assert mock_wait.call_count == 1
        default_cap = refresh_batch.DEFAULT_AVG_ROWS_PER_SECOND_CAP
        assert refresh_batch.avg_rows_per_second_cap == default_cap
def test_create_table_from_src_table(
    self,
    refresh_batch,
    fake_original_table,
    fake_new_table,
    show_table_query,
    write_session
):
    """Check the queries create_table_from_src_table runs on the write session.

    ``_execute_query`` is mocked so that ``fetchone()`` yields
    ``['test_db', fake_original_table]``; both the show-table query and the
    new-table statement must have been executed, in any order.
    """
    with mock.patch.object(
        refresh_batch,
        '_execute_query',
        autospec=True
    ) as mock_execute:
        fetched_row = ['test_db', fake_original_table]
        mock_execute.return_value.fetchone.return_value = fetched_row

        refresh_batch.create_table_from_src_table(write_session)

        expected_calls = [
            mock.call(write_session, query)
            for query in (show_table_query, fake_new_table)
        ]
        mock_execute.assert_has_calls(expected_calls, any_order=True)
def test_log_error_on_exception(self, message):
    """Check ClogWriter.publish logs an error when ``clog.log_line`` raises.

    ``log_line`` is made to raise RandomException; the first
    ``logger.error`` call must carry the "Failed to scribe message" text.
    """
    log_line_patch = mock.patch.object(
        data_pipeline._clog_writer.clog,
        'log_line',
        side_effect=RandomException()
    )
    logger_patch = mock.patch.object(
        data_pipeline._clog_writer,
        'logger'
    )
    with log_line_patch as mock_log_line, logger_patch as mock_logger:
        ClogWriter().publish(message)

        expected_text = "Failed to scribe message - {}".format(str(message))
        assert mock_log_line.called
        first_error_call = mock_logger.error.call_args_list[0]
        assert first_error_call == mock.call(expected_text)
def test_meteorite_on_off(
    self,
    create_message,
    registered_schema,
    producer,
    enable_meteorite,
    expected_call_count
):
    """Check how often StatsCounter.process runs for one publish.

    ``producer.enable_meteorite`` is set from the ``enable_meteorite``
    argument and the call count is compared with ``expected_call_count``.
    """
    with mock.patch.object(
        data_pipeline.tools.meteorite_wrappers.StatsCounter,
        'process',
        autospec=True
    ) as mock_stats_counter:
        producer.enable_meteorite = enable_meteorite
        outgoing = create_message(registered_schema, timeslot=1.0)
        producer.publish(outgoing)
        assert mock_stats_counter.call_count == expected_call_count
def test_sensu_process_called_once_inside_window(
    self,
    create_message,
    registered_schema,
    producer,
    message_count
):
    """Check SensuTTLAlerter.process runs once when one message is published repeatedly.

    The same message (timeslot 1.0) is published ``message_count`` times
    with ``producer.enable_sensu`` set to True.
    """
    with mock.patch.object(
        data_pipeline.tools.sensu_ttl_alerter.SensuTTLAlerter,
        'process',
        autospec=True,
        return_value=None
    ) as mock_sensu_ttl_process:
        producer.enable_sensu = True
        repeated = create_message(registered_schema, timeslot=1.0)
        for _ in range(message_count):
            producer.publish(repeated)
        assert mock_sensu_ttl_process.call_count == 1
def test_ensure_messages_published_fails_when_overpublished(
    self,
    topic,
    messages,
    producer,
    topic_offsets
):
    """Expect PublicationUnensurableError when more messages were published than are ensured.

    All ``messages`` are published and flushed, then only the first two are
    passed to ``ensure_messages_published``; the logged info is checked
    afterwards.
    """
    for msg in messages:
        producer.publish(msg)
    producer.flush()

    ensured = messages[:2]
    logger_patch = mock.patch.object(data_pipeline.producer, 'logger')
    with pytest.raises(PublicationUnensurableError), \
            logger_patch as mock_logger:
        producer.ensure_messages_published(ensured, topic_offsets)

    # Checked outside the `with`: statements after the raising call inside
    # the block would never run.
    self._assert_logged_info_correct(
        mock_logger,
        len(messages),
        topic,
        topic_offsets,
        message_count=len(ensured)
    )
def test_publish_fails_after_retry(self, message, producer):
    """Expect MaxRetryError, and nothing published, when sending keeps failing.

    ``send_produce_request`` is mocked with FailedPayloadsError as its side
    effect; afterwards no new messages may exist on the topic, the request
    must have been attempted ``self.max_retry_count`` times, and the topic
    offsets must be unchanged.
    """
    # TODO(DATAPIPE-606|clin) investigate better way than mocking response
    send_patch = mock.patch.object(
        producer._kafka_producer.kafka_client,
        'send_produce_request',
        side_effect=[FailedPayloadsError]
    )
    with send_patch as mock_send_request, \
            capture_new_messages(message.topic) as get_messages, \
            pytest.raises(MaxRetryError):
        orig_topic_to_offset_map = self.get_orig_topic_to_offset_map(producer)
        producer.publish(message)
        producer.flush()

    # Checked outside the `with`: statements after the raising call inside
    # the block would never run.
    captured = get_messages()
    assert len(captured) == 0
    assert mock_send_request.call_count == self.max_retry_count
    self.assert_new_topic_to_offset_map(
        producer,
        message.topic,
        orig_topic_to_offset_map,
        published_message_count=0
    )
def setUp(self):
    """Build a TrailingOrders system with logging and set_next_operation mocked.

    Both patches are registered with ``addCleanup`` so they are stopped
    after each test.
    """
    system_module = 'trading_system.systems.trailing_orders.system'

    fake_client = mock.MagicMock()
    fake_bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(
        self.INITIAL_SETUP)

    # Logging is patched before the system is constructed.
    logging_patcher = mock.patch(system_module + '.logging')
    self.addCleanup(logging_patcher.stop)
    logging_patcher.start()

    self.system = TrailingOrders(fake_client, fake_bootstrap)
    self.system.get_pending_orders = mock.MagicMock(return_value=[])

    self.set_next_operation = mock.MagicMock()
    set_next_patcher = mock.patch(
        system_module + '.TrailingOrders.set_next_operation',
        self.set_next_operation
    )
    self.addCleanup(set_next_patcher.stop)
    set_next_patcher.start()
def _setup_operation(self, next_operation):
    """Create ``self.system`` from a fake bootstrap configured with ``next_operation``.

    The remaining setup values come from the class constants; logging in
    the system module is patched and the patch is stopped on cleanup.
    """
    fake_client = mock.Mock()
    setup = TrailingOrderSetup.make(
        next_operation=next_operation,
        start_value=self.START_VALUE,
        stop_value=self.STOP_VALUE,
        reversal=self.REVERSAL,
        stop_loss=self.STOP_LOSS,
        operational_cost=0.2,
        profit=0.5,
    )
    fake_bootstrap = TrailingOrdersFactory().make_fake_bootstrap(setup)

    # Logging is patched before the system is constructed.
    logging_patcher = mock.patch(
        'trading_system.systems.trailing_orders.system.logging')
    self.addCleanup(logging_patcher.stop)
    logging_patcher.start()

    self.system = TrailingOrders(fake_client, fake_bootstrap)
def test_push(self, skipper_runner_run_mock, requests_get_mock):
    """Check `push` tags, pushes and removes an image whose tag is not in the registry.

    The mocked registry response lists tags that do not include 1234567.
    """
    # One successful exit code per expected runner call (tag, push, rmi).
    # With fewer values the mock raises StopIteration on the third call.
    skipper_runner_run_mock.side_effect = [0, 0, 0]
    push_params = ['my_image']
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock

    self._invoke_cli(
        global_params=self.global_params,
        subcmd='push',
        subcmd_params=push_params
    )
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_already_in_registry(self, skipper_runner_run_mock, requests_get_mock):
    """Check `push` skips the docker push when the tag is already in the registry.

    The mocked registry response lists tag 1234567, so only the tag and
    rmi commands are expected.
    """
    skipper_runner_run_mock.side_effect = [0, 0]
    cli_args = ['my_image']
    registry_listing = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
    }
    with mock.patch('requests.Response', autospec=True) as response_cls_mock:
        response_mock = response_cls_mock.return_value
        response_mock.json.return_value = registry_listing
        requests_get_mock.return_value = response_mock

    self._invoke_cli(
        global_params=self.global_params,
        subcmd='push',
        subcmd_params=cli_args
    )

    remote_image = 'registry.io:5000/my_image:1234567'
    skipper_runner_run_mock.assert_has_calls([
        mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
        mock.call(['docker', 'rmi', remote_image]),
    ])
def test_push_already_in_registry_with_force(self, skipper_runner_run_mock, requests_get_mock):
    """Check `push --force` runs the tag, push and rmi commands.

    NOTE(review): despite the test name, the mocked registry response does
    not list tag 1234567 - confirm whether it was meant to.
    """
    # One successful exit code per expected runner call (tag, push, rmi).
    # With fewer values the mock raises StopIteration on the third call.
    skipper_runner_run_mock.side_effect = [0, 0, 0]
    push_params = ['my_image', "--force"]
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock

    self._invoke_cli(
        global_params=self.global_params,
        subcmd='push',
        subcmd_params=push_params
    )
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_fail(self, skipper_runner_run_mock, requests_get_mock):
    """Check `push` exits with code 1 when the docker push command fails.

    The runner mock returns 0 for the tag command and 1 for the push.
    """
    skipper_runner_run_mock.side_effect = [0, 1]
    cli_args = ['my_image']
    registry_listing = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
    }
    with mock.patch('requests.Response', autospec=True) as response_cls_mock:
        response_mock = response_cls_mock.return_value
        response_mock.json.return_value = registry_listing
        requests_get_mock.return_value = response_mock

    result = self._invoke_cli(
        global_params=self.global_params,
        subcmd='push',
        subcmd_params=cli_args
    )
    self.assertEqual(result.exit_code, 1)

    remote_image = 'registry.io:5000/my_image:1234567'
    skipper_runner_run_mock.assert_has_calls([
        mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
        mock.call(['docker', 'push', remote_image]),
    ])
def test_push_rmi_fail(self, skipper_runner_run_mock, requests_get_mock):
    """Check `push` still exits with code 0 when only the final rmi command fails.

    The runner mock returns 0 for tag and push, and 1 for rmi.
    """
    skipper_runner_run_mock.side_effect = [0, 0, 1]
    cli_args = ['my_image']
    registry_listing = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
    }
    with mock.patch('requests.Response', autospec=True) as response_cls_mock:
        response_mock = response_cls_mock.return_value
        response_mock.json.return_value = registry_listing
        requests_get_mock.return_value = response_mock

    result = self._invoke_cli(
        global_params=self.global_params,
        subcmd='push',
        subcmd_params=cli_args
    )
    self.assertEqual(result.exit_code, 0)

    remote_image = 'registry.io:5000/my_image:1234567'
    skipper_runner_run_mock.assert_has_calls([
        mock.call(['docker', 'tag', 'my_image:1234567', remote_image]),
        mock.call(['docker', 'push', remote_image]),
        mock.call(['docker', 'rmi', remote_image]),
    ])
def test_push_with_defaults_from_config_file(self, skipper_runner_run_mock, requests_get_mock):
    """Check `push` runs tag, push and rmi when invoked with defaults from ``config.load_defaults()``.

    The mocked registry response lists tags that do not include 1234567.
    """
    # One successful exit code per expected runner call (tag, push, rmi).
    # With fewer values the mock raises StopIteration on the third call.
    skipper_runner_run_mock.side_effect = [0, 0, 0]
    push_params = ['my_image']
    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock

    self._invoke_cli(
        defaults=config.load_defaults(),
        subcmd='push',
        subcmd_params=push_params
    )
    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_get_retry_logging(log_stream):
    """Check the messages logged while a GET keeps answering 429 with Retry-After: 3.

    ``Session.get`` is patched to always return that response, so
    ``make_call`` is expected to raise UnavailableError. The ``log_stream``
    fixture is defined in conftest.py.
    """
    # NOTE(review): assumes one logged message per line; the line breaks of
    # the expected text are reconstructed at message boundaries - confirm.
    expected_log = (
        "UMAPI timeout...service unavailable (code 429 on try 1)\n"
        "Next retry in 3 seconds...\n"
        "UMAPI timeout...service unavailable (code 429 on try 2)\n"
        "Next retry in 3 seconds...\n"
        "UMAPI timeout...service unavailable (code 429 on try 3)\n"
        "UMAPI timeout...giving up after 3 attempts (6 seconds).\n"
    )
    with mock.patch("umapi_client.connection.requests.Session.get") as mock_get:
        mock_get.return_value = MockResponse(429, headers={"Retry-After": "3"})
        stream, logger = log_stream
        params = dict(mock_connection_params)
        params["logger"] = logger
        conn = Connection(**params)
        pytest.raises(UnavailableError, conn.make_call, "")
        stream.flush()
        log = stream.getvalue()  # save as a local so can do pytest -l to see exact log
        assert log == expected_log
def test_post_retry_logging(log_stream):
    """Check the messages logged while a POST keeps answering 429 with Retry-After: 3.

    ``Session.post`` is patched to always return that response, so
    ``make_call`` (called with a body of ``[3, 5]``) is expected to raise
    UnavailableError.
    """
    # NOTE(review): assumes one logged message per line; the line breaks of
    # the expected text are reconstructed at message boundaries - confirm.
    expected_log = (
        "UMAPI timeout...service unavailable (code 429 on try 1)\n"
        "Next retry in 3 seconds...\n"
        "UMAPI timeout...service unavailable (code 429 on try 2)\n"
        "Next retry in 3 seconds...\n"
        "UMAPI timeout...service unavailable (code 429 on try 3)\n"
        "UMAPI timeout...giving up after 3 attempts (6 seconds).\n"
    )
    with mock.patch("umapi_client.connection.requests.Session.post") as mock_post:
        mock_post.return_value = MockResponse(429, headers={"Retry-After": "3"})
        stream, logger = log_stream
        params = dict(mock_connection_params)
        params["logger"] = logger
        conn = Connection(**params)
        pytest.raises(UnavailableError, conn.make_call, "", [3, 5])
        stream.flush()
        log = stream.getvalue()  # save as a local so can do pytest -l to see exact log
        assert log == expected_log
def test_plug(self, mock_plug):
    """Check os_vif.plug forwards the vif and instance info to the plugin's plug.

    The stevedore extension manager is patched to report a single name and
    to return a NoOpPlugin extension for any lookup.
    """
    noop_extension = extension.Extension(name="noop",
                                         entry_point="os-vif",
                                         plugin=NoOpPlugin,
                                         obj=None)
    names_patch = mock.patch('stevedore.extension.ExtensionManager.names',
                             return_value=['foobar'])
    getitem_patch = mock.patch(
        'stevedore.extension.ExtensionManager.__getitem__',
        return_value=noop_extension)

    with names_patch, getitem_patch:
        os_vif.initialize()
        instance_info = mock.sentinel.info
        vif = mock.MagicMock()
        vif.plugin_name = 'noop'
        os_vif.plug(vif, instance_info)
        mock_plug.assert_called_once_with(vif, instance_info)
def test_unplug(self, mock_unplug):
    """Check os_vif.unplug forwards the vif and instance info to the plugin's unplug.

    The stevedore extension manager is patched to report a single name and
    to return a NoOpPlugin extension for any lookup.
    """
    demo_extension = extension.Extension(name="demo",
                                         entry_point="os-vif",
                                         plugin=NoOpPlugin,
                                         obj=None)
    names_patch = mock.patch('stevedore.extension.ExtensionManager.names',
                             return_value=['foobar'])
    getitem_patch = mock.patch(
        'stevedore.extension.ExtensionManager.__getitem__',
        return_value=demo_extension)

    with names_patch, getitem_patch:
        os_vif.initialize()
        instance_info = mock.sentinel.info
        vif = mock.MagicMock()
        vif.plugin_name = 'noop'
        os_vif.unplug(vif, instance_info)
        mock_unplug.assert_called_once_with(vif, instance_info)
def test_wrap_consumer(self, m_retry_type, m_logging_type):
    """Check _wrap_consumer wraps the consumer in a retry handler, then a logging handler.

    The logging handler is what must be returned; the logging type must
    have been called with the retry handler and the retry type with the
    consumer.
    """
    consumer = mock.sentinel.consumer
    m_retry_type.return_value = mock.sentinel.retry_handler
    m_logging_type.return_value = mock.sentinel.logging_handler
    thread_group = mock.sentinel.thread_group

    # The base-class constructor is replaced while the pipeline is built.
    with mock.patch.object(h_dis.EventPipeline, '__init__'):
        pipeline = h_pipeline.ControllerPipeline(thread_group)
        wrapped = pipeline._wrap_consumer(consumer)

    self.assertEqual(mock.sentinel.logging_handler, wrapped)
    m_logging_type.assert_called_with(mock.sentinel.retry_handler)
    m_retry_type.assert_called_with(consumer, exceptions=mock.ANY)
def test_annotate_diff_resource_vers_no_conflict(self, m_patch, m_count):
    """Check annotate retries after a conflict when the server copy has the same annotations.

    The first PATCH answers with a conflict and the second succeeds. The
    retry must send the newer resourceVersion returned by ``client.get``.
    """
    m_count.return_value = list(range(1, 5))
    path = '/test'
    annotations = {'a1': 'v1', 'a2': 'v2'}
    resource_version = "123"
    new_resource_version = "456"

    conflicting_obj = {'metadata': {
        'annotations': annotations,
        'resourceVersion': resource_version}}
    good_obj = {'metadata': {
        'annotations': annotations,
        'resourceVersion': new_resource_version}}
    conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True)
    good_data = jsonutils.dumps(good_obj, sort_keys=True)

    m_resp_conflict = mock.MagicMock(
        ok=False, status_code=requests.codes.conflict)
    m_resp_good = mock.MagicMock(ok=True)
    m_resp_good.json.return_value = conflicting_obj
    m_patch.side_effect = [m_resp_conflict, m_resp_good]

    with mock.patch.object(self.client, 'get', return_value=good_obj):
        annotated = self.client.annotate(
            path, annotations, resource_version=resource_version)
        self.assertEqual(annotations, annotated)

    url = self.base_url + path
    m_patch.assert_has_calls([
        mock.call(url, data=payload, headers=mock.ANY,
                  cert=(None, None), verify=False)
        for payload in (conflicting_data, good_data)
    ])
def test_annotate_diff_resource_vers_no_annotation(self, m_patch, m_count):
    """Check annotate retries after a conflict when the server copy has no annotations.

    The first PATCH answers with a conflict and the second succeeds. The
    retry must send the annotations with the newer resourceVersion returned
    by ``client.get``.
    """
    m_count.return_value = list(range(1, 5))
    path = '/test'
    annotations = {'a1': 'v1', 'a2': 'v2'}

    annotating_resource_version = '123'
    annotating_obj = {'metadata': {
        'annotations': annotations,
        'resourceVersion': annotating_resource_version}}
    # Serialized before resolution_obj is edited below: copy() is shallow,
    # so both dicts share the same 'metadata' mapping.
    annotating_data = jsonutils.dumps(annotating_obj, sort_keys=True)

    new_resource_version = '456'
    new_obj = {'metadata': {
        'resourceVersion': new_resource_version}}

    resolution_obj = annotating_obj.copy()
    resolution_obj['metadata']['resourceVersion'] = new_resource_version
    resolution_data = jsonutils.dumps(resolution_obj, sort_keys=True)

    m_resp_conflict = mock.MagicMock(
        ok=False, status_code=requests.codes.conflict)
    m_resp_good = mock.MagicMock(ok=True)
    m_resp_good.json.return_value = resolution_obj
    m_patch.side_effect = (m_resp_conflict, m_resp_good)

    with mock.patch.object(self.client, 'get', return_value=new_obj):
        annotated = self.client.annotate(
            path, annotations,
            resource_version=annotating_resource_version)
        self.assertEqual(annotations, annotated)

    url = self.base_url + path
    m_patch.assert_has_calls([
        mock.call(url, data=payload, headers=mock.ANY,
                  cert=(None, None), verify=False)
        for payload in (annotating_data, resolution_data)
    ])
def test_annotate_diff_resource_vers_conflict(self, m_patch, m_count):
    """Expect K8sClientException when the server copy holds a different annotation value.

    PATCH always answers with a conflict and ``client.get`` returns an
    object whose 'a1' annotation differs; only one PATCH must be sent.
    """
    m_count.return_value = list(range(1, 5))
    path = '/test'
    annotations = {'a1': 'v1', 'a2': 'v2'}
    resource_version = "123"
    new_resource_version = "456"

    conflicting_obj = {'metadata': {
        'annotations': annotations,
        'resourceVersion': resource_version}}
    actual_obj = {'metadata': {
        'annotations': {'a1': 'v2'},
        'resourceVersion': new_resource_version}}
    conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True)

    m_patch.return_value = mock.MagicMock(
        ok=False, status_code=requests.codes.conflict)

    with mock.patch.object(self.client, 'get', return_value=actual_obj):
        self.assertRaises(exc.K8sClientException,
                          self.client.annotate,
                          path, annotations,
                          resource_version=resource_version)

    m_patch.assert_called_once_with(self.base_url + path,
                                    data=conflicting_data,
                                    headers=mock.ANY,
                                    cert=(None, None),
                                    verify=False)
def test_run(self, m_count):
    """Check Async._run passes the single queued event to the wrapped handler.

    The queue mock reports empty and returns one sentinel event from
    ``get``; ``time.sleep`` is patched for the duration of the run.
    """
    event = mock.sentinel.event
    group = mock.sentinel.group
    m_queue = mock.Mock(**{
        'empty.return_value': True,
        'get.return_value': event,
    })
    m_handler = mock.Mock()
    m_count.return_value = [1]
    async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
                                  queue_depth=1)

    with mock.patch('time.sleep'):
        async_handler._run(group, m_queue)

    m_handler.assert_called_once_with(event)
def test_run_empty(self, m_count):
    """Check Async._run handles each queued event once, up to the queue's Empty.

    The queue mock's ``get`` yields two sentinel events and then a
    ``six_queue.Empty``; the handler must be called exactly once per event.
    """
    events = [mock.sentinel.event1, mock.sentinel.event2]
    group = mock.sentinel.group
    m_queue = mock.Mock()
    m_queue.empty.return_value = True
    m_queue.get.side_effect = events + [six_queue.Empty()]
    m_handler = mock.Mock()
    m_count.return_value = list(range(5))
    async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())

    with mock.patch('time.sleep'):
        async_handler._run(group, m_queue)

    expected_calls = [mock.call(queued) for queued in events]
    m_handler.assert_has_calls(expected_calls)
    self.assertEqual(len(events), m_handler.call_count)
def test__prepare_variables_configdrive_file(self, mem_req_mock):
    """Check _prepare_variables writes a non-URL configdrive to a temp file.

    With ``tempdir`` configured as '/path/to/tmpfiles', the configdrive
    content must be written to '<tempdir>/<node uuid>.cndrive' and reported
    in the variables as a 'file' location.
    """
    instance_info = self.node.instance_info
    instance_info['configdrive'] = 'fake-content'
    self.node.instance_info = instance_info
    self.node.save()
    self.config(tempdir='/path/to/tmpfiles')

    configdrive_path = '/path/to/tmpfiles/%s.cndrive' % self.node.uuid
    expected = {
        "image": {
            "url": "http://image",
            "validate_certs": "yes",
            "source": "fake-image",
            "mem_req": 2000,
            "disk_format": "qcow2",
            "checksum": "md5:checksum",
        },
        'configdrive': {
            'type': 'file',
            'location': configdrive_path,
        },
    }

    # create=True lets the patch add an `open` attribute to the module
    # even if it does not define one itself.
    with mock.patch.object(ansible_deploy, 'open', mock.mock_open(),
                           create=True) as open_mock:
        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.assertEqual(expected,
                             ansible_deploy._prepare_variables(task))

        open_mock.assert_has_calls((
            mock.call(configdrive_path, 'w'),
            mock.call().__enter__(),
            mock.call().write('fake-content'),
            mock.call().__exit__(None, None, None)))