我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用fixtures.MockPatch()。
def test_parser(self): fixture = fixtures.MockPatch('fuel_ccp.deploy.deploy_components') dc_mock = self.useFixture(fixture).mock fixture = fixtures.MockPatch( 'fuel_ccp.validation.service.validate_service_definitions') self.useFixture(fixture) self.useFixture(fixtures.MockPatch( 'fuel_ccp.common.utils.get_deploy_components_info', return_value={})) self.useFixture(fixtures.MockPatch( 'fuel_ccp.validation.service.validate_service_versions')) self.argv += self.add_argv self._run_app() if self.components is None: components = None else: components = set(self.components) dc_mock.assert_called_once_with({}, components) for k, v in self.action_vals.items(): self.assertEqual(config.CONF.action[k], v)
def _setUp(self): self.client = mock.Mock(k8s_client.K8sClient) self.useFixture(fixtures.MockPatch( 'kuryr_kubernetes.clients.get_kubernetes_client', lambda: self.client))
def _setUp(self): self.client = mock.Mock() self.useFixture(fixtures.MockPatch( 'kuryr_kubernetes.clients.get_neutron_client', lambda: self.client))
def setUp(self): super(TestRetryHandler, self).setUp() self.now = time.time() f_time = self.useFixture(fixtures.MockPatch('time.time')) f_time.mock.return_value = self.now
def setUp(self): super(TestCase, self).setUp() self.conf = cfg.ConfigOpts() default_config_files = self.get_default_config_files() common_config.parse(self.conf, [], default_config_files=default_config_files) # NOTE(jeffrey4l): mock the _get_image_dir method to return a fake # docker images dir self.useFixture(fixtures.MockPatch( 'kolla.cmd.build.KollaWorker._get_images_dir', mock.Mock(return_value=os.path.join(TESTS_ROOT, 'docker'))))
def setUp(self): super(TestBase, self).setUp() db_url = os.environ.get('PIFPAF_URL', "sqlite://").replace( "mysql://", "mysql+pymysql://") engine = urlparse.urlparse(db_url).scheme # in case some drivers have additional specification, for example: # PyMySQL will have scheme mysql+pymysql engine = engine.split('+')[0] # NOTE(Alexei_987) Shortcut to skip expensive db setUp test_method = self._get_test_method() if (hasattr(test_method, '_run_with') and engine not in test_method._run_with): raise testcase.TestSkipped( 'Test is not applicable for %s' % engine) self.CONF = service.prepare_service([], []) manager = self.DRIVER_MANAGERS.get(engine) if not manager: self.skipTest("missing driver manager: %s" % engine) self.db_manager = manager(db_url, self.CONF) self.useFixture(self.db_manager) self.conn = self.db_manager.connection self.conn.upgrade() self.useFixture(fixtures.MockPatch('panko.storage.get_connection', side_effect=self._get_connection))
def setUp(self): super(TestQuery, self).setUp() self.useFixture(fixtures.MonkeyPatch( 'pecan.response', mock.MagicMock())) self.useFixture(fixtures.MockPatch('panko.api.controllers.v2.events' '._build_rbac_query_filters', return_value={'t_filter': [], 'admin_proj': None}))
def setUp(self): super(TestFetch, self).setUp() # Creating temporaty directory for repos self.tmp_path = self.useFixture(fixtures.TempDir()).path self.conf['repositories']['path'] = self.tmp_path if self.clone_side_effect: fixture_clone = fixtures.MockPatch('git.Repo.clone_from') self.mock_clone = self.useFixture(fixture_clone).mock self.mock_clone.side_effect = self.clone_side_effect else: self.repo = mock.Mock() self.repo.git.checkout.side_effect = self.checkout_side_effect fixture_clone = fixtures.MockPatch('git.Repo.clone_from', return_value=self.repo) self.mock_clone = self.useFixture(fixture_clone).mock
def test_fetch_repository(self): component_def = self.component_def.copy() component_def.update(self.update_def) fixture = fixtures.MockPatch('os.path.isdir') isdir_mock = self.useFixture(fixture).mock isdir_mock.return_value = self.dir_exists status = fetch.fetch_repository(component_def) git_path = os.path.join(self.tmp_path, component_def['name']) isdir_mock.assert_called_once_with(git_path) if self.dir_exists: self.assertEqual(status['status'], fetch.ALREADY_EXISTED_STATUS) self.mock_clone.assert_not_called() elif self.clone_side_effect: self.mock_clone.assert_called_once_with('theurl', git_path) self.assertEqual(status['status'], fetch.CLONE_FAILED_STATUS) else: git_ref = component_def.get('git_ref') self.mock_clone.assert_called_once_with('theurl', git_path) if git_ref and self.checkout_side_effect: self.repo.git.checkout.assert_called_once_with(git_ref) self.assertEqual(status['status'], fetch.CHECKOUT_FAILED_STATUS) elif git_ref: self.repo.git.checkout.assert_called_once_with(git_ref) self.assertEqual(status['status'], fetch.FETCH_SUCCEEDED_STATUS) else: self.assertEqual(status['status'], fetch.FETCH_SUCCEEDED_STATUS)
def test_get_configmaps_version(self): self.useFixture(fixtures.MockPatch( "fuel_ccp.deploy._get_service_files_hash", return_value='222')) cm_list = [mock.Mock(obj={'metadata': {'resourceVersion': '1'}}) for _ in range(3)] self.assertEqual('111222', deploy._get_configmaps_version( cm_list, mock.ANY, mock.ANY)) cm_list = [] self.assertEqual('222', deploy._get_configmaps_version( cm_list, mock.ANY, mock.ANY))
def test_get_service_files_hash(self): files = { 'file': {'content': '/tmp/file'} } self.useFixture(fixtures.MockPatch( "fuel_ccp.common.jinja_utils.jinja_render", return_value='rendered')) expected_hash = '86e85bd63aef5a740d4b7b887ade37ec9017c961' self.assertEqual( expected_hash, deploy._get_service_files_hash(files, {}))
def setUp(self): super(TestDeployProcessPorts, self).setUp() fixture = self.useFixture(fixtures.MockPatch( "fuel_ccp.kubernetes.process_object")) self.create_obj = fixture.mock
def setUp(self): super(TestParser, self).setUp() fixture = fixtures.MockPatch('fuel_ccp.fetch.fetch_repositories') self.fetch_mock = self.useFixture(fixture).mock self.useFixture( fixtures.MockPatch('fuel_ccp.config.load_component_defaults'))
def test_parser(self): fixture = fixtures.MockPatch('fuel_ccp.build.build_components') bc_mock = self.useFixture(fixture).mock self._run_app() bc_mock.assert_called_once_with(components=self.components)
def test_parser(self): self.useFixture(fixtures.MockPatch('os.path.exists', return_value=False)) self._run_app() self.fetch_mock.assert_called_once_with()
def test_parser(self): fixture = fixtures.MockPatch('fuel_ccp.cleanup.cleanup') c_mock = self.useFixture(fixture).mock self._run_app() insecure = self.margs.pop('insecure', None) ca_cert = self.margs.pop('ca_cert', None) self.margs['verify'] = ca_cert or not insecure c_mock.assert_called_once_with(**self.margs)
def test_get_cli_config(self): self.useFixture(fixtures.MockPatch( 'argparse.ArgumentParser.error', side_effect=ArgumentParserError)) result = cli.CCPApp.get_config_file(self.argv) self.assertEqual(result, self.expected_result)
def setUp(self): super(TestServiceValidation, self).setUp() self.useFixture(fixtures.MockPatch("fuel_ccp.dsl_version", "0.2.0"))
def test_load_with_includes(self): self.files_mocks = {} for name, content in six.iteritems(self.files): if content.startswith('\n'): lines = content.splitlines()[1:] indent = len(lines[0]) - len(lines[0].lstrip(' ')) content = '\n'.join(l[indent:] for l in lines) m = mock.mock_open(read_data=content) self.files_mocks[name] = m.return_value fixture = fixtures.MockPatch('six.moves.builtins.open') self.mock_open = self.useFixture(fixture).mock self.mock_open.side_effect = self.files_mocks.__getitem__ res = _yaml.load_with_includes('config') self.assertEqual(res, self.expected_result)
def test_object_create(self): obj_dict = {'kind': self.kind, 'metadata': {'name': 'test'}} m_obj = mock.Mock(exists=mock.Mock(return_value=False)) m_class = self.useFixture(fixtures.MockPatch( 'pykube.{}'.format(self.kind), return_value=m_obj)) kubernetes.process_object(obj_dict, client=mock.Mock()) m_class.mock.assert_called_once_with(mock.ANY, obj_dict) m_obj.create.assert_called_once_with()
def test_object_update(self): obj_dict = {'kind': self.kind, 'metadata': {'name': 'test'}} m_obj = mock.Mock(exists=mock.Mock(return_value=True)) m_class = self.useFixture(fixtures.MockPatch( 'pykube.{}'.format(self.kind), return_value=m_obj)) kubernetes.process_object(obj_dict, client=mock.Mock()) m_class.mock.assert_called_once_with(mock.ANY, obj_dict) m_obj.exists.assert_called_once_with() m_obj.create.assert_not_called() if self.reload: m_obj.reload.assert_called_once_with() if self.update: m_obj.update.assert_called_once_with()
def patch(self, target, **kwargs): mockfixture = self.useFixture(fixtures.MockPatch(target, **kwargs)) return mockfixture.mock
def setUp(self): self.server_group_mock = mock.Mock() self.useFixture(fixtures.MockPatch( 'moganclient.v1.server_group.ServerGroupManager', return_value=self.server_group_mock)) super(TestServerGroup, self).setUp()
def setUp(self): super(TestCase, self).setUp() if swexc: self.useFixture(fixtures.MockPatch( 'swiftclient.client.Connection', FakeSwiftClient)) if self.conf.storage.driver == 'file': tempdir = self.useFixture(fixtures.TempDir()) self.conf.set_override('file_basepath', tempdir.path, 'storage') elif self.conf.storage.driver == 'ceph': pool_name = uuid.uuid4().hex with open(os.devnull, 'w') as f: subprocess.call("rados -c %s mkpool %s" % ( os.getenv("CEPH_CONF"), pool_name), shell=True, stdout=f, stderr=subprocess.STDOUT) self.conf.set_override('ceph_pool', pool_name, 'storage') # Override the bucket prefix to be unique to avoid concurrent access # with any other test self.conf.set_override("s3_bucket_prefix", str(uuid.uuid4())[:26], "storage") self.storage = storage.get_driver(self.conf, self.coord) self.incoming = incoming.get_driver(self.conf) if self.conf.storage.driver == 'redis': # Create one prefix per test self.storage.STORAGE_PREFIX = str(uuid.uuid4()).encode() if self.conf.incoming.driver == 'redis': self.incoming.SACK_PREFIX = str(uuid.uuid4()) self.storage.upgrade() self.incoming.upgrade(128)
def setUp(self): self.rpc_api = mock.Mock() self.useFixture(fixtures.MockPatch('mogan.engine.rpcapi.EngineAPI', return_value=self.rpc_api)) self.image_api = mock.Mock() self.useFixture(fixtures.MockPatch('mogan.image.api.API', return_value=self.image_api)) self.network_api = mock.Mock() self.useFixture(fixtures.MockPatch('mogan.network.api.API', return_value=self.network_api)) self.image_api.get.return_value = _get_fake_image() self.network_api.validate_networks.return_value = 100 super(TestServers, self).setUp() self._prepare_flavor()
def setUp(self): super(TestCase, self).setUp() self.conf = cfg.ConfigOpts() default_config_files = self.get_default_config_files() common_config.parse(self.conf, [], default_config_files=default_config_files) # NOTE(jeffrey4l): mock the _get_image_dir method to return a fake # docker images dir self.useFixture(fixtures.MockPatch( 'kolla.image.build.KollaWorker._get_images_dir', mock.Mock(return_value=os.path.join(TESTS_ROOT, 'docker'))))