The following 50 code examples, extracted from open-source Python projects, illustrate how to use mock.DEFAULT.
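Before the collected examples, here is a minimal, self-contained sketch of the two idioms that recur below: returning mock.DEFAULT from a side_effect (so the mock falls back to its configured return_value) and passing mock.DEFAULT to mock.patch.multiple (so each named attribute is replaced with an auto-created MagicMock). This is an illustrative sketch, not code from any of the projects listed; the class name MockDefaultSketch and the name fetch are made up, while the mock API calls themselves are standard unittest.mock.

import os.path
import unittest
from unittest import mock


class MockDefaultSketch(unittest.TestCase):
    """Minimal sketch of the two mock.DEFAULT idioms used in the examples below."""

    def test_side_effect_falls_back_to_return_value(self):
        # A side_effect iterable (or callable) that yields mock.DEFAULT makes
        # the mock return its configured return_value for that call.
        fetch = mock.Mock(return_value='cached')   # 'fetch' is an arbitrary example name
        fetch.side_effect = [IOError('transient'), mock.DEFAULT]

        self.assertRaises(IOError, fetch)    # first call raises the queued exception
        self.assertEqual('cached', fetch())  # second call falls back to return_value

    def test_patch_multiple_with_default(self):
        # Passing mock.DEFAULT to patch.multiple replaces each named attribute
        # with an auto-created MagicMock; the mocks come back in a dict keyed by name.
        with mock.patch.multiple('os.path', exists=mock.DEFAULT,
                                 isdir=mock.DEFAULT) as mocks:
            mocks['exists'].return_value = True
            self.assertTrue(os.path.exists('/no/such/file'))
            self.assertIs(mocks['isdir'], os.path.isdir)


if __name__ == '__main__':
    unittest.main()

The examples collected below show these same idioms as they appear in real projects.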
def test_init(self, mock_get_env_var, mock_get_dir_path):
    """Test initialization of config object"""
    # set return values of project utility functions
    mock_get_env_var.side_effect = (
        lambda *a, **k: ':0.0' if a and a[0] == 'display' else mock.DEFAULT)
    mock_get_dir_path.side_effect = (
        lambda *a, **k: 'es_path' if a and a[0] == 'es_root' else mock.DEFAULT)
    # create mock config object
    mock_config_object = mock.MagicMock(name='ConfigObject_instance')
    settings = {}
    mock_config_object.__getitem__ = lambda s, k: settings.__getitem__(k)
    mock_config_object.__setitem__ = lambda s, k, v: settings.__setitem__(k, v)
    mock_config_object.get = settings.get
    # call init method with mock variables
    CONFIG_VARS.update([('sec1', ['var1']), ('sec2', ['var2', 'var3'])])
    CONFIG_DEFAULTS.update(var1='foo', var3=42)
    ConfigObject.__init__(mock_config_object)
    # check values of settings variables
    exp_settings = dict(var1='foo', var2=None, var3=42, batchMode=False,
                        esRoot='es_path', resultsDir='es_path/results',
                        dataDir='es_path/data', macrosDir='es_path/tutorials',
                        templatesDir='es_path/templates',
                        configDir='es_path/config')
    self.assertDictEqual(settings, exp_settings,
                         'unexpected resulting settings dictionary')
def test_sends_notification_if_resubmitted(self, run_check):
    # Include whimsical set to True to avoid error in the False code branch:
    config = {
        'duration': 1.0,
        'host': 'fakehost.herokuapp.com',
        'whimsical': False
    }
    fake_assignment = mock.Mock(AssignmentStatus='Submitted')
    mturk = mock.Mock(**{'get_assignment.return_value': [fake_assignment]})
    participants = [self.a.participant()]
    session = None
    # Move the clock forward so assignment is overdue:
    reference_time = datetime.datetime.now() + datetime.timedelta(hours=6)
    mock_messager = mock.Mock(spec=NullHITMessager)
    with mock.patch.multiple('dallinger.heroku.clock',
                             requests=mock.DEFAULT,
                             NullHITMessager=mock.DEFAULT) as mocks:
        mocks['NullHITMessager'].return_value = mock_messager
        run_check(config, mturk, participants, session, reference_time)
        mock_messager.send_resubmitted_msg.assert_called()
def test_no_assignement_on_mturk_sends_hit_cancelled_message(self, run_check):
    # Include whimsical set to True to avoid error in the False code branch:
    config = {
        'duration': 1.0,
        'host': 'fakehost.herokuapp.com',
        'whimsical': False
    }
    mturk = mock.Mock(**{'get_assignment.return_value': []})
    participants = [self.a.participant()]
    session = None
    # Move the clock forward so assignment is overdue:
    reference_time = datetime.datetime.now() + datetime.timedelta(hours=6)
    mock_messager = mock.Mock(spec=NullHITMessager)
    with mock.patch.multiple('dallinger.heroku.clock',
                             requests=mock.DEFAULT,
                             NullHITMessager=mock.DEFAULT) as mocks:
        mocks['NullHITMessager'].return_value = mock_messager
        run_check(config, mturk, participants, session, reference_time)
        mock_messager.send_hit_cancelled_msg.assert_called()
def test_retry(self, mock_exc_to_code, mock_time):
    mock_exc_to_code.side_effect = lambda e: e.code
    to_attempt = 3
    retry = RetryOptions(
        [_FAKE_STATUS_CODE_1],
        BackoffSettings(0, 0, 0, 0, 0, 0, 1))

    # Succeeds on the to_attempt'th call, and never again afterward
    mock_call = mock.Mock()
    mock_call.side_effect = ([CustomException('', _FAKE_STATUS_CODE_1)] *
                             (to_attempt - 1) + [mock.DEFAULT])
    mock_call.return_value = 1729
    mock_time.return_value = 0
    settings = _CallSettings(timeout=0, retry=retry)
    my_callable = api_callable.create_api_call(mock_call, settings)
    self.assertEqual(my_callable(None), 1729)
    self.assertEqual(mock_call.call_count, to_attempt)
def test_retryable_without_timeout(self, mock_time, mock_exc_to_code):
    mock_time.return_value = 0
    mock_exc_to_code.side_effect = lambda e: e.code
    to_attempt = 3
    mock_call = mock.Mock()
    mock_call.side_effect = ([CustomException('', _FAKE_STATUS_CODE_1)] *
                             (to_attempt - 1) + [mock.DEFAULT])
    mock_call.return_value = 1729
    retry_options = RetryOptions(
        [_FAKE_STATUS_CODE_1],
        BackoffSettings(0, 0, 0, None, None, None, None))
    my_callable = retry.retryable(mock_call, retry_options)
    self.assertEqual(my_callable(None), 1729)
    self.assertEqual(to_attempt, mock_call.call_count)
def test_retryable_with_timeout(self, mock_time, mock_exc_to_code):
    mock_time.return_value = 1
    mock_exc_to_code.side_effect = lambda e: e.code
    mock_call = mock.Mock()
    mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1),
                             mock.DEFAULT]
    mock_call.return_value = 1729
    retry_options = RetryOptions(
        [_FAKE_STATUS_CODE_1],
        BackoffSettings(0, 0, 0, 0, 0, 0, 0))
    my_callable = retry.retryable(mock_call, retry_options)
    self.assertRaises(errors.RetryError, my_callable)
    self.assertEqual(0, mock_call.call_count)
def test_retryable_when_no_codes(self, mock_time, mock_exc_to_code):
    mock_time.return_value = 0
    mock_exc_to_code.side_effect = lambda e: e.code
    mock_call = mock.Mock()
    mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1),
                             mock.DEFAULT]
    mock_call.return_value = 1729
    retry_options = RetryOptions(
        [], BackoffSettings(0, 0, 0, 0, 0, 0, 1))
    my_callable = retry.retryable(mock_call, retry_options)

    try:
        my_callable(None)
        self.fail('Should not have been reached')
    except errors.RetryError as exc:
        self.assertIsInstance(exc.cause, CustomException)

    self.assertEqual(1, mock_call.call_count)
def test_retryable_aborts_on_unexpected_exception(
        self, mock_time, mock_exc_to_code):
    mock_time.return_value = 0
    mock_exc_to_code.side_effect = lambda e: e.code
    mock_call = mock.Mock()
    mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_2),
                             mock.DEFAULT]
    mock_call.return_value = 1729
    retry_options = RetryOptions(
        [_FAKE_STATUS_CODE_1],
        BackoffSettings(0, 0, 0, 0, 0, 0, 1))
    my_callable = retry.retryable(mock_call, retry_options)

    try:
        my_callable(None)
        self.fail('Should not have been reached')
    except errors.RetryError as exc:
        self.assertIsInstance(exc.cause, CustomException)

    self.assertEqual(1, mock_call.call_count)
def test_nrpe_dependency_installed(self, mock_config):
    config = copy.deepcopy(CHARM_CONFIG)
    mock_config.side_effect = lambda key: config[key]
    with patch.multiple(ceph_hooks,
                        apt_install=DEFAULT,
                        rsync=DEFAULT,
                        log=DEFAULT,
                        write_file=DEFAULT,
                        nrpe=DEFAULT) as mocks:
        ceph_hooks.update_nrpe_config()
        mocks["apt_install"].assert_called_once_with(
            ["python-dbus", "lockfile-progs"])
def test_train_1(self):
    j = 2
    ret_train = np.zeros((6, 3, N_CLASSES))
    ret_dev = np.zeros((6, 3, N_CLASSES))
    func_ret = np.zeros((1, N_CLASSES))
    func_ret[0, j] = 1.
    with patch.multiple(self.wb, _gs=True,
                        _generate_ts=lambda *x: (self.TRAIN_X, Y),
                        _extract_features=MagicMock(return_value=FEATS),
                        _model=MOCK_DEFAULT):
        with patch("dsenser.wang.wangbase.GridSearchCV"):
            self.wb._model.decision_function = \
                MagicMock(return_value=func_ret)
            self.wb._model.classes_ = CLASSES_
            self.wb.train(([(0, REL1)], [PARSE1]),
                          ([(0, REL1)], [PARSE1]),
                          1, 1, ret_train, ret_dev)
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(self):
    with patch.multiple(
            ceph_hooks,
            apt_install=DEFAULT,
            rsync=DEFAULT,
            log=DEFAULT,
            write_file=DEFAULT,
            nrpe=DEFAULT,
            emit_cephconf=DEFAULT,
            mon_relation_joined=DEFAULT,
            is_relation_made=DEFAULT) as mocks, patch(
                "charmhelpers.contrib.hardening.harden.config"):
        mocks["is_relation_made"].return_value = True
        ceph_hooks.upgrade_charm()
        mocks["apt_install"].assert_called_with(
            ["python-dbus", "lockfile-progs"])
def test__allocate_sockets(self):
    """Test allocating sockets.
    """
    # access protected module _allocate_sockets
    # pylint: disable=w0212
    socket.socket.bind.side_effect = [
        socket.error(errno.EADDRINUSE, 'In use'),
        mock.DEFAULT,
        mock.DEFAULT,
        mock.DEFAULT
    ]

    sockets = treadmill.runtime._allocate_sockets(
        'prod', '0.0.0.0', socket.SOCK_STREAM, 3
    )

    self.assertEqual(3, len(sockets))
def patch(
    target,
    new=mock.DEFAULT,
    spec=None,
    create=False,
    mocksignature=False,
    spec_set=None,
    autospec=False,
    new_callable=None,
    **kwargs
):
    """Mocks an async function.

    Should be a drop-in replacement for mock.patch that handles async
    automatically. The .asynq attribute is automatically created and
    shouldn't be used when accessing data on the mock.

    """
    getter, attribute = _get_target(target)
    return _make_patch_async(
        getter,
        attribute,
        new,
        spec,
        create,
        mocksignature,
        spec_set,
        autospec,
        new_callable,
        kwargs,
    )
def test_attach_volume_fail(self, mock_execute):
    self.encryptor._get_key = mock.MagicMock()
    self.encryptor._get_key.return_value = \
        test_cryptsetup.fake__get_key(None)

    mock_execute.side_effect = [
        processutils.ProcessExecutionError(exit_code=1),  # luksOpen
        mock.DEFAULT,  # isLuks
    ]

    self.assertRaises(processutils.ProcessExecutionError,
                      self.encryptor.attach_volume, None)

    mock_execute.assert_has_calls([
        mock.call('cryptsetup', 'luksOpen', '--key-file=-',
                  self.dev_path, self.dev_name, process_input='0' * 32,
                  run_as_root=True, check_exit_code=True),
        mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
                  run_as_root=True, check_exit_code=True),
    ], any_order=False)
    self.assertEqual(2, mock_execute.call_count)
def test_deploy_no_callback(self, power_mock, get_ip_mock):
    self.config(group='ansible', use_ramdisk_callback=False)
    with mock.patch.multiple(self.driver,
                             _ansible_deploy=mock.DEFAULT,
                             reboot_to_instance=mock.DEFAULT) as moks:
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=False) as task:
            driver_return = self.driver.deploy(task)
            self.assertEqual(driver_return, states.DEPLOYDONE)
            power_mock.assert_called_once_with(task, states.REBOOT)
            get_ip_mock.assert_called_once_with(task)
            moks['_ansible_deploy'].assert_called_once_with(task, '127.0.0.1')
            moks['reboot_to_instance'].assert_called_once_with(task)
def test_continue_deploy(self, getip_mock):
    self.node.provision_state = states.DEPLOYWAIT
    self.node.target_provision_state = states.ACTIVE
    self.node.save()
    with task_manager.acquire(self.context, self.node.uuid) as task:
        with mock.patch.multiple(self.driver, autospec=True,
                                 _ansible_deploy=mock.DEFAULT,
                                 reboot_to_instance=mock.DEFAULT):
            self.driver.continue_deploy(task)
            getip_mock.assert_called_once_with(task)
            self.driver._ansible_deploy.assert_called_once_with(
                task, '1.2.3.4')
            self.driver.reboot_to_instance.assert_called_once_with(task)
        self.assertEqual(states.ACTIVE, task.node.target_provision_state)
        self.assertEqual(states.DEPLOYING, task.node.provision_state)
def test_run(self, mCreateSession, mSample):
    mSample.return_value = 'XXX'
    iSession = MockSession()
    mCreateSession.return_value = (iSession, '123456')
    client = iSession.client('stepfunctions')
    client.list_activities.return_value = {
        'activities': [{
            'name': 'name',
            'activityArn': 'XXX'
        }]
    }
    client.get_activity_task.return_value = {
        'taskToken': 'YYY',
        'input': '{}'
    }

    target = mock.MagicMock()
    activity = ActivityMixin(handle_task=target)

    def stop_loop(*args, **kwargs):
        activity.polling = False
        return mock.DEFAULT
    target.side_effect = stop_loop

    activity.run('name')

    calls = [
        mock.call.list_activities(),
        mock.call.get_activity_task(activityArn='XXX',
                                    workerName='name-XXX')
    ]
    self.assertEqual(client.mock_calls, calls)

    calls = [
        mock.call('YYY', {}),
        mock.call().start()
    ]
    self.assertEqual(target.mock_calls, calls)
def patchobject(self, target, attribute, new=mock.DEFAULT, autospec=True):
    """Convenient wrapper around `mock.patch.object`

    Returns a started mock that will be automatically stopped after the
    test ran.
    """
    p = mock.patch.object(target, attribute, new, autospec=autospec)
    m = p.start()
    self.addCleanup(p.stop)
    return m
def test_check_db_for_missing_notifications_assembles_resources(self, run_check):
    # Can't import until after config is loaded:
    from dallinger.heroku.clock import check_db_for_missing_notifications
    with mock.patch.multiple('dallinger.heroku.clock',
                             run_check=mock.DEFAULT,
                             MTurkConnection=mock.DEFAULT) as mocks:
        mocks['MTurkConnection'].return_value = 'fake connection'
        check_db_for_missing_notifications()

        mocks['run_check'].assert_called()
def recruiter(self):
    from dallinger.recruiters import BotRecruiter
    with mock.patch.multiple('dallinger.recruiters',
                             _get_queue=mock.DEFAULT,
                             get_base_url=mock.DEFAULT) as mocks:
        mocks['get_base_url'].return_value = 'fake_base_url'
        r = BotRecruiter()
        r._get_bot_factory = mock.Mock()
        yield r
def recruiter(self, active_config):
    from dallinger.mturk import MTurkService
    from dallinger.recruiters import MTurkRecruiter
    with mock.patch.multiple('dallinger.recruiters',
                             os=mock.DEFAULT,
                             get_base_url=mock.DEFAULT) as mocks:
        mocks['get_base_url'].return_value = 'http://fake-domain'
        mocks['os'].getenv.return_value = 'fake-host-domain'
        mockservice = mock.create_autospec(MTurkService)
        active_config.extend({'mode': u'sandbox'})
        r = MTurkRecruiter()
        r.mturkservice = mockservice('fake key', 'fake secret')
        r.mturkservice.check_credentials.return_value = True
        r.mturkservice.create_hit.return_value = {'type_id': 'fake type id'}
        return r
def faster(tempdir):
    with mock.patch.multiple('dallinger.command_line',
                             time=mock.DEFAULT,
                             setup_experiment=mock.DEFAULT) as mocks:
        mocks['setup_experiment'].return_value = ('fake-uid', tempdir)

        yield mocks
def testDEFAULT(self):
    self.assertIs(DEFAULT, sentinel.DEFAULT)
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(
        self, mock_config):
    config = copy.deepcopy(CHARM_CONFIG)
    mock_config.side_effect = lambda key: config[key]
    with patch.multiple(
            ceph_hooks,
            apt_install=DEFAULT,
            rsync=DEFAULT,
            log=DEFAULT,
            write_file=DEFAULT,
            nrpe=DEFAULT,
            emit_cephconf=DEFAULT,
            mon_relation_joined=DEFAULT,
            is_relation_made=DEFAULT) as mocks, patch(
                "charmhelpers.contrib.hardening.harden.config"):
        mocks["is_relation_made"].return_value = True
        ceph_hooks.upgrade_charm()
        mocks["apt_install"].assert_called_with(
            ["python-dbus", "lockfile-progs"])
def test_send_request(self):
    """ Test the execution of a deferred Supervisor request. """
    from supvisors.mainloop import SupvisorsMainLoop
    from supvisors.utils import DeferredRequestHeaders
    main_loop = SupvisorsMainLoop(self.supvisors)
    # patch main loop subscriber
    with patch.multiple(main_loop, check_address=DEFAULT,
                        start_process=DEFAULT, stop_process=DEFAULT,
                        restart=DEFAULT, shutdown=DEFAULT) as mocked_loop:
        # test check address
        self.check_call(main_loop, mocked_loop, 'check_address',
                        DeferredRequestHeaders.CHECK_ADDRESS,
                        ('10.0.0.2', ))
        # test start process
        self.check_call(main_loop, mocked_loop, 'start_process',
                        DeferredRequestHeaders.START_PROCESS,
                        ('10.0.0.2', 'dummy_process', 'extra args'))
        # test stop process
        self.check_call(main_loop, mocked_loop, 'stop_process',
                        DeferredRequestHeaders.STOP_PROCESS,
                        ('10.0.0.2', 'dummy_process'))
        # test restart
        self.check_call(main_loop, mocked_loop, 'restart',
                        DeferredRequestHeaders.RESTART,
                        ('10.0.0.2', ))
        # test shutdown
        self.check_call(main_loop, mocked_loop, 'shutdown',
                        DeferredRequestHeaders.SHUTDOWN,
                        ('10.0.0.2', ))
def test_on_remote_event(self):
    """ Test the reception of a Supervisor remote comm event. """
    from supvisors.listener import SupervisorListener
    listener = SupervisorListener(self.supvisors)
    # add patches for what is tested just above
    with patch.multiple(listener, unstack_event=DEFAULT,
                        unstack_info=DEFAULT, authorization=DEFAULT):
        # test unknown type
        event = Mock(type='unknown', data='')
        listener.on_remote_event(event)
        self.assertFalse(listener.unstack_event.called)
        self.assertFalse(listener.unstack_info.called)
        self.assertFalse(listener.authorization.called)
        # test event
        event = Mock(type='event', data={'state': 'RUNNING'})
        listener.on_remote_event(event)
        self.assertEqual([call({'state': 'RUNNING'})],
                         listener.unstack_event.call_args_list)
        self.assertFalse(listener.unstack_info.called)
        self.assertFalse(listener.authorization.called)
        listener.unstack_event.reset_mock()
        # test info
        event = Mock(type='info', data={'name': 'dummy_process'})
        listener.on_remote_event(event)
        self.assertFalse(listener.unstack_event.called)
        self.assertEqual([call({'name': 'dummy_process'})],
                         listener.unstack_info.call_args_list)
        self.assertFalse(listener.authorization.called)
        listener.unstack_info.reset_mock()
        # test authorization
        event = Mock(type='auth', data=('10.0.0.1', True))
        listener.on_remote_event(event)
        self.assertFalse(listener.unstack_event.called)
        self.assertFalse(listener.unstack_info.called)
        self.assertEqual([call(('10.0.0.1', True))],
                         listener.authorization.call_args_list)
def test_run(self):
    self.cls.reactor = Mock(spec_set=reactor)
    with patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    ) as mod_mocks:
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
            listentls=DEFAULT
        ) as cls_mocks:
            cls_mocks['get_active_node'].return_value = 'consul:1234'
            self.cls.run()
    assert self.cls.active_node_ip_port == 'consul:1234'
    assert mod_mocks['logger'].mock_calls == [
        call.warning('Initial Vault active node: %s', 'consul:1234'),
        call.warning('Starting Twisted reactor (event loop)')
    ]
    assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
    assert mod_mocks['Site'].mock_calls == [
        call(mod_mocks['VaultRedirectorSite'].return_value)
    ]
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == [call()]
    assert mod_mocks['LoopingCall'].mock_calls == []
    assert cls_mocks['listentcp'].mock_calls == [
        call(mod_mocks['Site'].return_value)
    ]
    assert cls_mocks['add_update_loop'].mock_calls == [call()]
    assert cls_mocks['listentls'].mock_calls == []
def test_run_tls(self):
    self.cls.reactor = Mock(spec_set=reactor)
    self.cls.tls_factory = Mock()
    with patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    ) as mod_mocks:
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
            listentls=DEFAULT
        ) as cls_mocks:
            cls_mocks['get_active_node'].return_value = 'consul:1234'
            self.cls.run()
    assert self.cls.active_node_ip_port == 'consul:1234'
    assert mod_mocks['logger'].mock_calls == [
        call.warning('Initial Vault active node: %s', 'consul:1234'),
        call.warning('Starting Twisted reactor (event loop)')
    ]
    assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
    assert mod_mocks['Site'].mock_calls == [
        call(mod_mocks['VaultRedirectorSite'].return_value)
    ]
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == [call()]
    assert mod_mocks['LoopingCall'].mock_calls == []
    assert cls_mocks['listentls'].mock_calls == [
        call(mod_mocks['Site'].return_value)
    ]
    assert cls_mocks['add_update_loop'].mock_calls == [call()]
    assert cls_mocks['listentcp'].mock_calls == []
def test_run_error(self):
    self.cls.reactor = Mock(spec_set=reactor)
    with patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    ) as mod_mocks:
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT
        ) as cls_mocks:
            cls_mocks['get_active_node'].return_value = None
            with pytest.raises(SystemExit) as excinfo:
                self.cls.run()
    assert excinfo.value.code == 3
    assert mod_mocks['logger'].mock_calls == [
        call.critical("ERROR: Could not get active vault node from "
                      "Consul. Exiting.")
    ]
    assert mod_mocks['VaultRedirectorSite'].mock_calls == []
    assert mod_mocks['Site'].mock_calls == []
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == []
    assert mod_mocks['LoopingCall'].mock_calls == []
def run_and_exit(cli_args=None, prompt_commands=None):
    """Run http-prompt executable, execute some prompt commands, and exit."""
    if cli_args is None:
        cli_args = []

    # Make sure last command is 'exit'
    if prompt_commands is None:
        prompt_commands = ['exit']
    else:
        prompt_commands += ['exit']

    # Fool cli() so that it believes we're running from CLI instead of pytest.
    # We will restore it at the end of the function.
    orig_argv = sys.argv
    sys.argv = ['http-prompt'] + cli_args

    try:
        with patch.multiple('http_prompt.cli',
                            prompt=DEFAULT, execute=DEFAULT) as mocks:
            mocks['execute'].side_effect = execute

            # prompt() is mocked to return the command in 'prompt_commands' in
            # sequence, i.e., prompt() returns prompt_commands[i-1] when it is
            # called for the ith time
            mocks['prompt'].side_effect = prompt_commands
            result = CliRunner().invoke(cli, cli_args)
            context = mocks['execute'].call_args[0][1]

        return result, context
    finally:
        sys.argv = orig_argv
def test_train_0(self):
    with patch.multiple(self.wb,
                        _extract_features=MOCK_DEFAULT,
                        _model=MOCK_DEFAULT):
        self.wb.train(([], []), None)
def test_train_0(self): with patch("dsenser.wang.wangbase.GridSearchCV"): with patch.multiple(self.wb, _generate_ts=lambda *x: (self.TRAIN_X, Y), _model=MOCK_DEFAULT): self.wb.train(([], []), None)
def test_train_1(self):
    rels = [REL1] * NFOLDS
    parses = [PARSE1] * NFOLDS
    with patch("dsenser.wang.wangbase.GridSearchCV"):
        with patch.multiple(self.wb,
                            _generate_ts=lambda *x: (self.TRAIN_X, Y),
                            _model=MOCK_DEFAULT):
            self.wb.train((rels, parses), (rels, parses))
def test_nrpe_dependency_installed(self):
    with patch.multiple(ceph_hooks,
                        apt_install=DEFAULT,
                        rsync=DEFAULT,
                        log=DEFAULT,
                        write_file=DEFAULT,
                        nrpe=DEFAULT) as mocks:
        ceph_hooks.update_nrpe_config()
        mocks["apt_install"].assert_called_once_with(
            ["python-dbus", "lockfile-progs"])
def test_cached(self):
    mock_t = Mock()
    mock_std = Mock()
    mock_stpp = Mock()
    mock_stm = Mock()
    mock_mct = Mock()
    mock_mbs = Mock()
    mock_mos = Mock()
    with patch.multiple(
        pb,
        autospec=True,
        _transactions=DEFAULT,
        _scheduled_transactions_date=DEFAULT,
        _scheduled_transactions_per_period=DEFAULT,
        _scheduled_transactions_monthly=DEFAULT,
        _make_combined_transactions=DEFAULT,
        _make_budget_sums=DEFAULT,
        _make_overall_sums=DEFAULT
    ) as mocks:
        mocks['_transactions'].return_value.all.return_value = mock_t
        mocks['_scheduled_transactions_date'
              ''].return_value.all.return_value = mock_std
        mocks['_scheduled_transactions_per_period'
              ''].return_value.all.return_value = mock_stpp
        mocks['_scheduled_transactions_monthly'
              ''].return_value.all.return_value = mock_stm
        mocks['_make_combined_transactions'].return_value = mock_mct
        mocks['_make_budget_sums'].return_value = mock_mbs
        mocks['_make_overall_sums'].return_value = mock_mos
        self.cls._data_cache = {'foo': 'bar'}
        res = self.cls._data
    assert res == {'foo': 'bar'}
    assert mocks['_transactions'].mock_calls == []
    assert mocks['_scheduled_transactions_date'].mock_calls == []
    assert mocks['_scheduled_transactions_per_period'].mock_calls == []
    assert mocks['_scheduled_transactions_monthly'].mock_calls == []
    assert mocks['_make_combined_transactions'].mock_calls == []
    assert mocks['_make_budget_sums'].mock_calls == []
    assert mocks['_make_overall_sums'].mock_calls == []
def test_insert_violations_with_error(self):
    """Test insert_violations handles errors during insert.

    Setup:
        * Create mocks:
            * self.dao.conn
            * self.dao.get_latest_snapshot_timestamp
            * self.dao.create_snapshot_table
        * Create side effect for one violation to raise an error.

    Expect:
        * Log MySQLError when table insert error occurs and return list
          of errors.
        * Return a tuple of (num_violations-1, [violation])
    """
    resource_name = 'policy_violations'
    self.dao.get_latest_snapshot_timestamp = mock.MagicMock(
        return_value=self.fake_snapshot_timestamp)
    self.dao.create_snapshot_table = mock.MagicMock(
        return_value=self.fake_table_name)
    violation_dao.LOGGER = mock.MagicMock()

    def insert_violation_side_effect(*args, **kwargs):
        if args[2] == self.expected_fake_violations[1]:
            raise MySQLdb.DataError(
                self.resource_name, mock.MagicMock())
        else:
            return mock.DEFAULT

    self.dao.execute_sql_with_commit = mock.MagicMock(
        side_effect=insert_violation_side_effect)

    actual = self.dao.insert_violations(
        self.fake_flattened_violations,
        self.resource_name)

    expected = (2, [self.expected_fake_violations[1]])

    self.assertEqual(expected, actual)
    self.assertEquals(1, violation_dao.LOGGER.error.call_count)
def test_rebuild_indices(self):
    with patch.multiple(
        Command, _create=DEFAULT, _delete=DEFAULT, _populate=DEFAULT
    ) as handles:
        handles['_delete'].return_value = True
        call_command('search_index', stdout=self.out, action='rebuild')
        handles['_delete'].assert_called()
        handles['_create'].assert_called()
        handles['_populate'].assert_called()
def test_rebuild_indices_aborted(self):
    with patch.multiple(
        Command, _create=DEFAULT, _delete=DEFAULT, _populate=DEFAULT
    ) as handles:
        handles['_delete'].return_value = False
        call_command('search_index', stdout=self.out, action='rebuild')
        handles['_delete'].assert_called()
        handles['_create'].assert_not_called()
        handles['_populate'].assert_not_called()
def test_with_statement(self):
    with patch.multiple(WireMockServer, start=DEFAULT, stop=DEFAULT) as mocks:
        with WireMockServer() as wm:
            self.assertIsInstance(wm, WireMockServer)
            mocks['start'].assert_called_once_with()

        mocks['stop'].assert_called_once_with()
def test_ioctl_fn_ptr_r(self, ioctl_mock):
    def _handle_ioctl(fd, request, int_ptr):
        assert fd == 12
        assert request == 32
        assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
        int_ptr.contents.value = 42
        return mock.DEFAULT

    ioctl_mock.side_effect = _handle_ioctl

    fn = ioctl.ioctl_fn_ptr_r(32, ctypes.c_int)
    res = fn(12)
    assert res == 42
def test_ioctl_fn_ptr_w(self, ioctl_mock):
    def _handle_ioctl(fd, request, int_ptr):
        assert fd == 12
        assert request == 32
        assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
        assert int_ptr.contents.value == 42
        return mock.DEFAULT

    ioctl_mock.side_effect = _handle_ioctl

    fn = ioctl.ioctl_fn_ptr_w(32, ctypes.c_int)
    fn(12, 42)
def test_ioctl_fn_ptr_wr(self, ioctl_mock):
    def _handle_ioctl(fd, request, int_ptr):
        assert fd == 12
        assert request == 32
        assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
        assert int_ptr.contents.value == 24
        int_ptr.contents.value = 42
        return mock.DEFAULT

    ioctl_mock.side_effect = _handle_ioctl

    fn = ioctl.ioctl_fn_ptr_wr(32, ctypes.c_int)
    res = fn(12, 24)
    assert res == 42
def test_ioctl_fn_w(self, ioctl_mock):
    def _handle_ioctl(fd, request, int_val):
        assert fd == 12
        assert request == 32
        assert type(int_val) == ctypes.c_int
        assert int_val.value == 42
        return mock.DEFAULT

    ioctl_mock.side_effect = _handle_ioctl

    fn = ioctl.ioctl_fn_w(32, ctypes.c_int)
    fn(12, 42)
def _patch_object(
    target,
    attribute,
    new=mock.DEFAULT,
    spec=None,
    create=False,
    mocksignature=False,
    spec_set=None,
    autospec=False,
    new_callable=None,
    **kwargs
):
    getter = lambda: target
    return _make_patch_async(
        getter,
        attribute,
        new,
        spec,
        create,
        mocksignature,
        spec_set,
        autospec,
        new_callable,
        kwargs,
    )
def _maybe_wrap_new(new):
    """If the mock replacement cannot have attributes set on it, wraps it in a function.

    Also, if the replacement object is a method, applies the async() decorator.

    This is needed so that we support patch(..., x.method) where x.method is an
    instancemethod object, because instancemethods do not support attribute
    assignment.

    """
    if new is mock.DEFAULT:
        return new
    if inspect.isfunction(new) or isinstance(new, (classmethod, staticmethod)):
        return asynq(sync_fn=new)(new)
    elif not callable(new):
        return new

    try:
        new._maybe_wrap_new_test_attribute = None
        del new._maybe_wrap_new_test_attribute
    except (AttributeError, TypeError):
        # setting something on a bound method raises AttributeError, setting
        # something on a Cythonized class raises TypeError
        should_wrap = True
    else:
        should_wrap = False

    if should_wrap:
        # we can't just use a lambda because that overrides __get__ and creates
        # bound methods we don't want, so we make a wrapper class that
        # overrides __call__
        class Wrapper(object):
            def __call__(self, *args, **kwargs):
                return new(*args, **kwargs)

        return Wrapper()
    else:
        return new
def __init__(self, obj, attr, new=mock.DEFAULT, **kwargs):
    self.obj = obj
    self.attr = attr
    self.kwargs = kwargs
    self.new = new
def test_attach_volume_not_formatted(self, mock_execute):
    self.encryptor._get_key = mock.MagicMock()
    self.encryptor._get_key.return_value = \
        test_cryptsetup.fake__get_key(None)

    mock_execute.side_effect = [
        processutils.ProcessExecutionError(exit_code=1),  # luksOpen
        processutils.ProcessExecutionError(exit_code=1),  # isLuks
        mock.DEFAULT,  # luksFormat
        mock.DEFAULT,  # luksOpen
        mock.DEFAULT,  # ln
    ]

    self.encryptor.attach_volume(None)

    mock_execute.assert_has_calls([
        mock.call('cryptsetup', 'luksOpen', '--key-file=-',
                  self.dev_path, self.dev_name, process_input='0' * 32,
                  run_as_root=True, check_exit_code=True),
        mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
                  run_as_root=True, check_exit_code=True),
        mock.call('cryptsetup', '--batch-mode', 'luksFormat',
                  '--key-file=-', self.dev_path, process_input='0' * 32,
                  run_as_root=True, check_exit_code=True, attempts=3),
        mock.call('cryptsetup', 'luksOpen', '--key-file=-',
                  self.dev_path, self.dev_name, process_input='0' * 32,
                  run_as_root=True, check_exit_code=True),
        mock.call('ln', '--symbolic', '--force',
                  '/dev/mapper/%s' % self.dev_name, self.symlink_path,
                  run_as_root=True, check_exit_code=True),
    ], any_order=False)
    self.assertEqual(5, mock_execute.call_count)
def test_get_rdp_console(self):
    mock_get_host_ip = self.rdpconsoleops._hostops.get_host_ip_addr
    mock_get_rdp_port = (
        self.rdpconsoleops._rdpconsoleutils.get_rdp_console_port)
    mock_get_vm_id = self.rdpconsoleops._vmutils.get_vm_id

    connect_info = self.rdpconsoleops.get_rdp_console(mock.DEFAULT)

    self.assertEqual(mock_get_host_ip.return_value, connect_info.host)
    self.assertEqual(mock_get_rdp_port.return_value, connect_info.port)
    self.assertEqual(mock_get_vm_id.return_value,
                     connect_info.internal_access_path)
def test_spawn_no_admin_permissions(self):
    self._vmops._vmutils.check_admin_permissions.side_effect = (
        os_win_exc.HyperVException)

    self.assertRaises(os_win_exc.HyperVException,
                      self._vmops.spawn,
                      self.context, mock.DEFAULT, mock.DEFAULT,
                      [mock.sentinel.FILE], mock.sentinel.PASSWORD,
                      mock.sentinel.INFO, mock.sentinel.DEV_INFO)
def test_get_volume_connector(self):
    mock_instance = mock.DEFAULT
    initiator = self._volumeops._volutils.get_iscsi_initiator.return_value
    expected = {'ip': CONF.my_ip,
                'host': CONF.host,
                'initiator': initiator}

    response = self._volumeops.get_volume_connector(instance=mock_instance)

    self._volumeops._volutils.get_iscsi_initiator.assert_called_once_with()
    self.assertEqual(expected, response)