我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 mock.MagicMock()。
def test_cli_with_login_username_only(self, mocked_method):
    """Invoke ``dev --version test --apikey apikey`` and verify login/push.

    ``mocked_method`` is configured to return a MagicMock client. The CLI
    must exit with code 0, log in with the API key as username and a
    single-space password, and push the ``test`` tag.
    """
    client = mock.MagicMock()
    client.push.return_value = [
        {"stream": "In process"},
        {"status": "Successfully pushed"},
    ]
    client.login.return_value = {"Status": "Login Succeeded"}
    mocked_method.return_value = client

    cli_args = ["dev", "--version", "test", "--apikey", "apikey"]
    with FakeProjectDirectory() as tmpdir:
        add_sh_fake_config(tmpdir)
        result = CliRunner().invoke(cli, cli_args)

        assert result.exit_code == 0
        client.login.assert_called_with(
            email=None,
            password=' ',
            reauth=False,
            registry='registry',
            username='apikey',
        )
        client.push.assert_called_with(
            'registry/user/project:test',
            decode=True,
            insecure_registry=False,
            stream=True,
        )
def test_transform_to_recordstore(self):
    """Smoke-test ``MonMetricsKafkaProcessor.transform_to_recordstore``.

    A mocked Kafka stream is passed in; its ``transform`` returns a second
    mock. The call-order assertions are disabled (see the TODO below), so
    this only checks that the call completes without raising.
    """
    # Intended check: the transform method is called first, then
    # rdd to recordstore.
    stream_after_transform = MagicMock(name='transformed_stream')
    source_stream = MagicMock(name='kafka_stream')
    source_stream.transform.return_value = stream_after_transform

    MonMetricsKafkaProcessor.transform_to_recordstore(source_stream)

    # TODO(someone) figure out why these asserts now fail
    # transformed_stream_expected = call.foreachRDD(
    #     MonMetricsKafkaProcessor.rdd_to_recordstore
    # ).call_list()
    # kafka_stream_expected = call.transform(
    #     MonMetricsKafkaProcessor.store_offset_ranges
    # ).call_list()
    # self.assertEqual(kafka_stream_expected, kafka_stream.mock_calls)
    # self.assertEqual(transformed_stream_expected,
    #                  transformed_stream.mock_calls)
def test_transform_service_heartbeat(self, coordinator):
    """Start a ``TransformService`` and expect a heartbeat on the driver.

    ``coordinator`` is configured to return a MagicMock spec'd on
    ``KazooDriver``. After a two-second wait, its ``heartbeat`` must have
    been called with no arguments.
    """
    # mock coordinator
    kazoo_double = MagicMock(name="MagicKazooDriver", spec=KazooDriver)
    coordinator.return_value = kazoo_double

    # option1: run the service as a daemon thread in this process
    service = transform_service.TransformService()
    service.daemon = True
    service.start()
    time.sleep(2)

    # option2 (unused): mocks dont seem to work when spawning a service
    # pid = _spawn_transform_service()
    # time.sleep()
    # os.kill(pid, signal.SIGNAL_SIGTERM)

    kazoo_double.heartbeat.assert_called_with()
def test_make_vif_subnets(self, m_mk_fixed_ip, m_make_vif_subnet):
    """Build VIF subnets for a port that carries a single fixed IP.

    The two injected mocks return a sentinel fixed IP and a Mock subnet.
    ``ovu._make_vif_subnets`` must return that subnet in a list and append
    the fixed IP to ``subnet.ips.objects``.
    """
    subnet_key = mock.sentinel.subnet_id
    address = mock.sentinel.ip_address
    made_fixed_ip = mock.sentinel.fixed_ip
    vif_subnet = mock.Mock()

    known_subnets = mock.MagicMock()
    known_subnets.__contains__.return_value = True  # any subnet id is "known"

    m_make_vif_subnet.return_value = vif_subnet
    m_mk_fixed_ip.return_value = made_fixed_ip
    port = {
        'fixed_ips': [
            {'subnet_id': subnet_key, 'ip_address': address},
        ],
    }

    result = ovu._make_vif_subnets(port, known_subnets)

    self.assertEqual([vif_subnet], result)
    m_make_vif_subnet.assert_called_once_with(known_subnets, subnet_key)
    m_mk_fixed_ip.assert_called_once_with(address=address)
    vif_subnet.ips.objects.append.assert_called_once_with(made_fixed_ip)
def test_get_parent_port(self):
    """Call ``_get_parent_port`` with a mocked pod on a spec'd driver.

    Subscripting the pod yields a status mock whose items all resolve to a
    sentinel IP. ``_get_parent_port_by_host_ip`` must be called exactly
    once.
    """
    driver_cls = nested_vif.NestedPodVIFDriver
    driver = mock.Mock(spec=driver_cls)
    neutron = self.useFixture(k_fix.MockNeutronClient()).client

    status = mock.MagicMock()
    status.__getitem__.return_value = mock.sentinel.node_fixed_ip
    pod = mock.MagicMock()
    pod.__getitem__.return_value = status

    driver._get_parent_port_by_host_ip.return_value = (
        mock.sentinel.parent_port)

    driver_cls._get_parent_port(driver, neutron, pod)

    driver._get_parent_port_by_host_ip.assert_called_once()
def test_get_subnet(self, m_osv_subnet, m_osv_network):
    """Resolve a subnet id into a network object via mocked Neutron.

    Neutron returns a subnet dict pointing at a network id. The two
    injected converter mocks return MagicMock subnet/network objects.
    ``_get_subnet`` must return the network and append the subnet to
    ``network.subnets.objects``.
    """
    neutron = self.useFixture(k_fix.MockNeutronClient()).client
    subnet_id = mock.sentinel.subnet_id
    network_id = mock.sentinel.network_id

    raw_subnet = {'network_id': network_id}
    raw_network = mock.sentinel.neutron_network
    neutron.show_subnet.return_value = {'subnet': raw_subnet}
    neutron.show_network.return_value = {'network': raw_network}

    os_vif_subnet = mock.MagicMock()
    os_vif_network = mock.MagicMock()
    m_osv_subnet.return_value = os_vif_subnet
    m_osv_network.return_value = os_vif_network

    result = default_subnet._get_subnet(subnet_id)

    self.assertEqual(os_vif_network, result)
    neutron.show_subnet.assert_called_once_with(subnet_id)
    neutron.show_network.assert_called_once_with(network_id)
    m_osv_subnet.assert_called_once_with(raw_subnet)
    m_osv_network.assert_called_once_with(raw_network)
    os_vif_network.subnets.objects.append.assert_called_once_with(
        os_vif_subnet)
def test_get_in_use_vlan_ids_set(self):
    """Collect the in-use VLAN ids from a mocked trunk.

    ``neutron.show_trunk`` returns nested MagicMocks: indexing the trunk
    gives an object whose items are a one-port list, and every key on that
    port resolves to ``'100'``. The driver method must return ``{'100'}``.
    """
    driver_cls = nested_vlan_vif.NestedVlanPodVIFDriver
    driver = mock.Mock(spec=driver_cls)
    neutron = self.useFixture(k_fix.MockNeutronClient()).client
    trunk_id = mock.sentinel.trunk_id

    subport = mock.MagicMock()
    subport.__getitem__.return_value = '100'
    trunk_body = mock.MagicMock()
    trunk_body.__getitem__.return_value = [subport]
    trunk_response = mock.MagicMock()
    trunk_response.__getitem__.return_value = trunk_body
    neutron.show_trunk.return_value = trunk_response

    expected = {'100'}
    actual = driver_cls._get_in_use_vlan_ids_set(driver, trunk_id)
    self.assertEqual(expected, actual)
def test_request_vif(self):
    """``request_vif`` returns the VIF supplied by ``_get_port_from_pool``.

    ``ports_pool_min`` is overridden to 5 and the mocked pool size is also
    5. The sentinel VIF from the pool must be returned unchanged.
    """
    driver_cls = vif_pool.BaseVIFPool
    driver = mock.MagicMock(spec=driver_cls)
    pod = get_pod_obj()

    pooled_vif = mock.sentinel.vif
    driver._get_port_from_pool.return_value = pooled_vif
    oslo_cfg.CONF.set_override('ports_pool_min', 5, group='vif_pool')
    driver._get_pool_size.return_value = 5

    result = driver_cls.request_vif(
        driver,
        pod,
        mock.sentinel.project_id,
        mock.sentinel.subnets,
        [mock.sentinel.security_groups],
    )

    self.assertEqual(pooled_vif, result)
def test_request_vif_empty_pool(self, m_eventlet):
    """``request_vif`` propagates ``ResourceNotReady`` from the pool.

    ``_get_port_from_pool`` is set to raise ``ResourceNotReady``. The
    exception must surface and the injected ``m_eventlet`` mock must have
    been called exactly once.
    """
    driver_cls = vif_pool.BaseVIFPool
    driver = mock.MagicMock(spec=driver_cls)

    status = mock.MagicMock()
    status.__getitem__.return_value = mock.sentinel.host_addr
    pod = mock.MagicMock()
    pod.__getitem__.return_value = status

    project_id = mock.sentinel.project_id
    subnets = mock.sentinel.subnets
    security_groups = [mock.sentinel.security_groups]
    driver._get_port_from_pool.side_effect = (
        exceptions.ResourceNotReady(pod))

    with self.assertRaises(exceptions.ResourceNotReady):
        driver_cls.request_vif(
            driver, pod, project_id, subnets, security_groups)

    m_eventlet.assert_called_once()
def test__get_in_use_ports(self):
    """``_get_in_use_ports`` extracts port ids from pod VIF annotations.

    The mocked Kubernetes client lists one pod whose VIF annotation is a
    JSON document holding a port id. The method must return that id in a
    list.
    """
    driver_cls = vif_pool.BaseVIFPool
    driver = mock.MagicMock(spec=driver_cls)
    kubernetes = self.useFixture(k_fix.MockK8sClient()).client

    port_id = 'f2c1b73a-6a0c-4dca-b986-0d07d09e0c02'
    vif_payload = {
        'versioned_object.data': {
            'active': True,
            'address': 'fa:16:3e:ef:e6:9f',
            'id': port_id,
        },
    }
    pod = get_pod_obj()
    annotations = pod['metadata']['annotations']
    annotations[constants.K8S_ANNOTATION_VIF] = jsonutils.dumps(vif_payload)
    kubernetes.get.return_value = {'items': [pod]}

    found = driver_cls._get_in_use_ports(driver)

    self.assertEqual(found, [port_id])
def test__return_ports_to_pool_no_update(self, max_pool, m_sleep):
    """Recycling a port with unchanged security groups touches nothing.

    ``max_pool`` is applied as the ``ports_pool_max`` override. The call is
    expected to end with ``SystemExit`` (presumably raised through the
    injected ``m_sleep`` mock; the patch setup is not visible here).
    Neither ``update_port`` nor ``delete_port`` may be called.
    """
    driver_cls = vif_pool.NeutronVIFPool
    driver = mock.MagicMock(spec=driver_cls)
    neutron = self.useFixture(k_fix.MockNeutronClient()).client

    port_id = mock.sentinel.port_id
    pool_key = ('node_ip', 'project_id', ('security_group',))
    driver._recyclable_ports = {port_id: pool_key}
    driver._available_ports_pools = {}

    oslo_cfg.CONF.set_override('ports_pool_max', max_pool, group='vif_pool')
    oslo_cfg.CONF.set_override('port_debug', False, group='kubernetes')
    driver._get_ports_by_attrs.return_value = [
        {'id': port_id, 'security_groups': ['security_group']},
    ]
    driver._get_pool_size.return_value = 5

    with self.assertRaises(SystemExit):
        driver_cls._return_ports_to_pool(driver)

    neutron.update_port.assert_not_called()
    neutron.delete_port.assert_not_called()
def test__return_ports_to_pool_delete_port(self, m_sleep):
    """A recyclable port is deleted when the pool is already at its max.

    Pool size and ``ports_pool_max`` are both 10, and the port's security
    groups differ from the pool key. ``delete_port`` must be called once
    with the port id and ``update_port`` not at all. The call is expected
    to end with ``SystemExit``.
    """
    driver_cls = vif_pool.NeutronVIFPool
    driver = mock.MagicMock(spec=driver_cls)
    neutron = self.useFixture(k_fix.MockNeutronClient()).client

    port_id = mock.sentinel.port_id
    pool_key = ('node_ip', 'project_id', ('security_group',))
    driver._recyclable_ports = {port_id: pool_key}
    driver._available_ports_pools = {}
    driver._existing_vifs = {port_id: mock.sentinel.vif}

    oslo_cfg.CONF.set_override('ports_pool_max', 10, group='vif_pool')
    driver._get_ports_by_attrs.return_value = [
        {'id': port_id, 'security_groups': ['security_group_modified']},
    ]
    driver._get_pool_size.return_value = 10

    with self.assertRaises(SystemExit):
        driver_cls._return_ports_to_pool(driver)

    neutron.update_port.assert_not_called()
    neutron.delete_port.assert_called_once_with(port_id)
def test__return_ports_to_pool_delete_exception(self, m_sleep):
    """``PortNotFoundClient`` from ``delete_port`` does not escape.

    Pool size (10) exceeds ``ports_pool_max`` (5) and ``delete_port`` is
    set to raise ``PortNotFoundClient``. Only ``SystemExit`` is expected
    to escape; ``delete_port`` must have been attempted once and
    ``update_port`` never.
    """
    driver_cls = vif_pool.NeutronVIFPool
    driver = mock.MagicMock(spec=driver_cls)
    neutron = self.useFixture(k_fix.MockNeutronClient()).client

    port_id = mock.sentinel.port_id
    pool_key = ('node_ip', 'project_id', ('security_group',))
    driver._recyclable_ports = {port_id: pool_key}
    driver._available_ports_pools = {}
    driver._existing_vifs = {port_id: mock.sentinel.vif}

    oslo_cfg.CONF.set_override('ports_pool_max', 5, group='vif_pool')
    driver._get_ports_by_attrs.return_value = [
        {'id': port_id, 'security_groups': ['security_group_modified']},
    ]
    driver._get_pool_size.return_value = 10
    neutron.delete_port.side_effect = n_exc.PortNotFoundClient

    with self.assertRaises(SystemExit):
        driver_cls._return_ports_to_pool(driver)

    neutron.update_port.assert_not_called()
    neutron.delete_port.assert_called_once_with(port_id)
def test__return_ports_to_pool_delete_key_error(self, m_sleep):
    """No Neutron call is made when the port is missing from known VIFs.

    ``_existing_vifs`` is empty while the port is recyclable and the pool
    (size 10) exceeds ``ports_pool_max`` (5). Neither ``update_port`` nor
    ``delete_port`` may be called. The call is expected to end with
    ``SystemExit``.
    """
    driver_cls = vif_pool.NeutronVIFPool
    driver = mock.MagicMock(spec=driver_cls)
    neutron = self.useFixture(k_fix.MockNeutronClient()).client

    port_id = mock.sentinel.port_id
    pool_key = ('node_ip', 'project_id', ('security_group',))
    driver._recyclable_ports = {port_id: pool_key}
    driver._available_ports_pools = {}
    driver._existing_vifs = {}

    oslo_cfg.CONF.set_override('ports_pool_max', 5, group='vif_pool')
    driver._get_ports_by_attrs.return_value = [
        {'id': port_id, 'security_groups': ['security_group_modified']},
    ]
    driver._get_pool_size.return_value = 10

    with self.assertRaises(SystemExit):
        driver_cls._return_ports_to_pool(driver)

    neutron.update_port.assert_not_called()
    neutron.delete_port.assert_not_called()
def test__recover_precreated_ports_empty(self, m_get_subnet, m_to_osvif):
    """With no ports returned, recovery performs no further lookups.

    ``_get_ports_by_attrs`` returns an empty list. It must be queried once,
    and the two injected mocks must not be called.
    """
    driver_cls = vif_pool.NeutronVIFPool
    driver = mock.MagicMock(spec=driver_cls)
    driver._get_ports_by_attrs.return_value = []
    oslo_cfg.CONF.set_override('port_debug', False, group='kubernetes')

    driver_cls._recover_precreated_ports(driver)

    driver._get_ports_by_attrs.assert_called_once()
    for untouched in (m_get_subnet, m_to_osvif):
        untouched.assert_not_called()
def test__return_ports_to_pool_no_update(self, max_pool, m_sleep):
    """Nested pool: an unchanged recyclable port triggers no Neutron call.

    ``max_pool`` is applied as the ``ports_pool_max`` override. The call is
    expected to end with ``SystemExit`` (presumably raised through the
    injected ``m_sleep`` mock; the patch setup is not visible here).
    Neither ``update_port`` nor ``delete_port`` may be called.
    """
    driver_cls = vif_pool.NestedVIFPool
    driver = mock.MagicMock(spec=driver_cls)
    neutron = self.useFixture(k_fix.MockNeutronClient()).client

    port_id = mock.sentinel.port_id
    pool_key = ('node_ip', 'project_id', ('security_group',))
    driver._recyclable_ports = {port_id: pool_key}
    driver._available_ports_pools = {}

    oslo_cfg.CONF.set_override('ports_pool_max', max_pool, group='vif_pool')
    oslo_cfg.CONF.set_override('port_debug', False, group='kubernetes')
    driver._get_ports_by_attrs.return_value = [
        {'id': port_id, 'security_groups': ['security_group']},
    ]
    driver._get_pool_size.return_value = 5

    with self.assertRaises(SystemExit):
        driver_cls._return_ports_to_pool(driver)

    neutron.update_port.assert_not_called()
    neutron.delete_port.assert_not_called()
def test_release_vif_parent_not_found(self):
    """``release_vif`` re-raises when the parent port lookup fails.

    ``_get_parent_port`` raises ``NeutronClientException``. The container
    port must have been fetched and the parent looked up once, while the
    allowed-address-pairs helper and ``delete_port`` stay untouched.
    """
    driver_cls = nested_macvlan_vif.NestedMacvlanPodVIFDriver
    driver = mock.Mock(spec=driver_cls)
    neutron = self.useFixture(k_fix.MockNeutronClient()).client

    port_id = lib_utils.get_hash()
    pod = mock.sentinel.pod
    vif = mock.Mock()
    vif.id = port_id

    neutron.show_port.return_value = self._get_fake_port(
        port_id, mock.sentinel.ip_address, mock.sentinel.mac_address)
    driver.lock = mock.MagicMock(spec=threading.Lock())
    driver._get_parent_port.side_effect = n_exc.NeutronClientException

    with self.assertRaises(n_exc.NeutronClientException):
        driver_cls.release_vif(driver, pod, vif)

    neutron.show_port.assert_called_once_with(port_id)
    driver._get_parent_port.assert_called_once_with(neutron, pod)
    driver._remove_from_allowed_address_pairs.assert_not_called()
    neutron.delete_port.assert_not_called()
def test_has_port_changes(self):
    """``_has_port_changes`` is true when the port lists differ.

    The service exposes only port X while the LBaaS spec lists X and Y.
    """
    handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
    handler._get_service_ports.return_value = [
        {'port': 1, 'name': 'X', 'protocol': 'TCP'},
    ]
    service = mock.MagicMock()
    lbaas_spec = mock.MagicMock()
    lbaas_spec.ports = [
        obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1),
        obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2),
    ]

    changed = h_lbaas.LBaaSSpecHandler._has_port_changes(
        handler, service, lbaas_spec)

    self.assertTrue(changed)
def test_has_port_changes__no_changes(self):
    """``_has_port_changes`` is false when both sides list ports X and Y."""
    handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
    handler._get_service_ports.return_value = [
        {'port': 1, 'name': 'X', 'protocol': 'TCP'},
        {'port': 2, 'name': 'Y', 'protocol': 'TCP'},
    ]
    service = mock.MagicMock()
    lbaas_spec = mock.MagicMock()
    lbaas_spec.ports = [
        obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1),
        obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2),
    ]

    changed = h_lbaas.LBaaSSpecHandler._has_port_changes(
        handler, service, lbaas_spec)

    self.assertFalse(changed)
def test_annotate(self, m_patch, m_count):
    """``client.annotate`` PATCHes the annotations and returns them.

    The mocked response is ``ok`` and echoes the metadata. The PATCH must
    be issued once against ``base_url + path`` with the sorted-key JSON
    body, ``cert=(None, None)`` and ``verify=False``.
    """
    m_count.return_value = list(range(1, 5))

    path = '/test'
    resource_version = "123"
    annotations = {'a1': 'v1', 'a2': 'v2'}
    body = {
        'metadata': {
            'annotations': annotations,
            "resourceVersion": resource_version,
        },
    }
    # Serialise before the call so the expected payload is fixed up front.
    expected_data = jsonutils.dumps(body, sort_keys=True)

    response = mock.MagicMock()
    response.ok = True
    response.json.return_value = body
    m_patch.return_value = response

    returned = self.client.annotate(
        path, annotations, resource_version=resource_version)

    self.assertEqual(annotations, returned)
    m_patch.assert_called_once_with(
        self.base_url + path,
        data=expected_data,
        headers=mock.ANY,
        cert=(None, None),
        verify=False,
    )
def test_watch(self, m_get):
    """``client.watch`` keeps re-requesting and yields decoded events.

    Each mocked response streams three JSON lines. Consuming three cycles'
    worth of events must issue three GETs and close the response three
    times.
    """
    path = '/test'
    cycles = 3
    events = [{'obj': 'obj%s' % i} for i in range(3)]

    response = mock.MagicMock()
    response.ok = True
    response.iter_lines.return_value = [
        jsonutils.dumps(event) for event in events
    ]
    m_get.return_value = response

    wanted = len(events) * cycles
    received = list(itertools.islice(self.client.watch(path), wanted))

    self.assertEqual(events * cycles, received)
    self.assertEqual(cycles, m_get.call_count)
    self.assertEqual(cycles, response.close.call_count)
    m_get.assert_called_with(
        self.base_url + path,
        headers={},
        stream=True,
        params={'watch': 'true'},
        cert=(None, None),
        verify=False,
    )
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
    """Upload copies files to non-native store correctly with no progress

    A source directory holding one temporary file is uploaded to a second
    temporary directory while ``sfctl.custom_app.shutil`` is replaced by a
    MagicMock; ``copyfile`` must be called exactly once.

    Both temporary directories are removed in a ``finally`` block so that a
    failing call or assertion no longer leaks them.
    """
    import shutil
    import tempfile

    temp_src_dir = tempfile.mkdtemp()
    temp_dst_dir = tempfile.mkdtemp()
    try:
        # The context manager closes (and deletes) the file even on failure.
        with tempfile.NamedTemporaryFile(dir=temp_src_dir):
            shutil_mock = MagicMock()
            shutil_mock.copyfile.return_value = None
            with patch('sfctl.custom_app.shutil', new=shutil_mock):
                sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
                shutil_mock.copyfile.assert_called_once()
    finally:
        # This is the real shutil: only the name inside sfctl.custom_app
        # was patched.
        shutil.rmtree(temp_src_dir)
        shutil.rmtree(temp_dst_dir)
def test__power_status_ironic_exception_index_error(self, mock_get_conn):
    """``_power_status`` reports ERROR when ``get_relays`` raises IndexError.

    The connection mock only exposes ``get_relays``. It must be called once
    with no arguments, and the connection must be requested once with the
    parsed driver info.
    """
    connection = mock.MagicMock(spec_set=['get_relays'])
    connection.get_relays.side_effect = IndexError("Gotcha!")
    mock_get_conn.return_value = connection

    node = obj_utils.create_test_node(
        self.context,
        driver='fake_iboot_fake',
        driver_info=INFO_DICT,
    )
    info = iboot_power._parse_driver_info(node)

    status = iboot_power._power_status(info)

    self.assertEqual(states.ERROR, status)
    mock_get_conn.assert_called_once_with(info)
    connection.get_relays.assert_called_once_with()
def test__power_status_retries(self, mock_get_conn):
    """``_power_status`` retries ``get_relays`` before reporting ERROR.

    With ``max_retry=1`` and ``get_relays`` always raising TypeError, the
    method must call ``get_relays`` twice in total and return ERROR.
    """
    self.config(max_retry=1, group='iboot')

    connection = mock.MagicMock(spec_set=['get_relays'])
    connection.get_relays.side_effect = TypeError("Surprise!")
    mock_get_conn.return_value = connection

    node = obj_utils.create_test_node(
        self.context,
        driver='fake_iboot_fake',
        driver_info=INFO_DICT,
    )
    info = iboot_power._parse_driver_info(node)

    status = iboot_power._power_status(info)

    self.assertEqual(states.ERROR, status)
    mock_get_conn.assert_called_once_with(info)
    self.assertEqual(2, connection.get_relays.call_count)
def test_reboot_good(self, mock_switch, mock_power_status,
                     mock_sleep_switch):
    """Reboot switches off, sleeps ``reboot_delay`` seconds, switches on.

    The switch and sleep mocks are attached to one manager mock so their
    relative call order can be compared against the expected sequence. The
    power status mock reports POWER_ON.
    """
    self.config(reboot_delay=3, group='iboot')
    mock_power_status.return_value = states.POWER_ON

    recorder = mock.MagicMock(spec_set=['switch', 'sleep'])
    recorder.attach_mock(mock_switch, 'switch')
    recorder.attach_mock(mock_sleep_switch, 'sleep')

    expected_calls = [
        mock.call.switch(self.info, False),
        mock.call.sleep(3),
        mock.call.switch(self.info, True),
    ]

    with task_manager.acquire(self.context, self.node.uuid) as task:
        task.driver.power.reboot(task)
        self.assertEqual(expected_calls, recorder.mock_calls)
def test_reboot_bad(self, mock_switch, mock_power_status,
                    mock_sleep_switch):
    """Reboot raises ``PowerStateFailure`` if the node stays powered off.

    The power status mock reports POWER_OFF. The same off/sleep/on call
    sequence as in the successful case must still have been issued.
    """
    self.config(reboot_delay=3, group='iboot')
    mock_power_status.return_value = states.POWER_OFF

    recorder = mock.MagicMock(spec_set=['switch', 'sleep'])
    recorder.attach_mock(mock_switch, 'switch')
    recorder.attach_mock(mock_sleep_switch, 'sleep')

    expected_calls = [
        mock.call.switch(self.info, False),
        mock.call.sleep(3),
        mock.call.switch(self.info, True),
    ]

    with task_manager.acquire(self.context, self.node.uuid) as task:
        # Resolve the bound method first so that only the call itself is
        # covered by assertRaises.
        reboot = task.driver.power.reboot
        with self.assertRaises(ironic_exception.PowerStateFailure):
            reboot(task)
        self.assertEqual(expected_calls, recorder.mock_calls)
def test__execute_nm_command(self, addr_mock, raw_mock):
    """``_execute_nm_command`` builds, sends and parses a raw command.

    The address mock returns channel/address ``('0x0A', '0x0B')``. The
    command builder returns two bytes that must be joined and sent through
    ``raw_mock``, and the raw output is split before being handed to the
    parser. The bridging settings must be stored in the node's
    ``driver_info``.
    """
    addr_mock.return_value = ('0x0A', '0x0B')
    raw_mock.return_value = ('0x03 0x04', '')

    payload = {'foo': 'bar'}
    build_command = mock.MagicMock()
    build_command.return_value = ('0x01', '0x02')
    parse_response = mock.MagicMock()

    with task_manager.acquire(self.context, self.node.uuid,
                              shared=False) as task:
        nm_vendor._execute_nm_command(
            task, payload, build_command, parse_response)

        driver_info = task.node.driver_info
        self.assertEqual('single', driver_info['ipmi_bridging'])
        self.assertEqual('0x0A', driver_info['ipmi_target_channel'])
        self.assertEqual('0x0B', driver_info['ipmi_target_address'])
        build_command.assert_called_once_with(payload)
        raw_mock.assert_called_once_with(task, '0x01 0x02')
        parse_response.assert_called_once_with(['0x03', '0x04'])
def test_bind_port(self):
    """Binding a port sets the binding with the ``ovs`` VIF type.

    The port context is a MagicMock spec'd on ``ctx.PortContext`` carrying
    one valid segment. ``set_binding`` must be called once with that
    segment's id, ``'ovs'`` and the configured VIF details.
    """
    self.vif_details = {'ovs_hybrid_plug': True}
    context = mock.MagicMock(
        spec=ctx.PortContext,
        current={'id': 'CURRENT_CONTEXT_ID'},
        segments_to_bind=[self.valid_segment],
        network=mock.MagicMock(spec=api.NetworkContext),
    )

    # when the port is bound
    self.bind_port(context)

    # then the context binding is set up with the returned vif_type and
    # the valid segment's api ID
    segment_id = self.valid_segment[api.ID]
    context.set_binding.assert_called_once_with(
        segment_id, 'ovs', self.vif_details)
def test_fetch_from_url_or_retry_post_json(self):
    """POST/JSON variant: the fetched response decodes to the canned JSON.

    NOTE(review): ``self.fetch_from_url_or_retry`` is replaced by a
    MagicMock here, so this exercises the mocked response rather than the
    real fetch function. Confirm that is intended.
    """
    # mocked requests
    identifier = "1csb, 2pah"
    base_url = c.http_pdbe
    endpoint_url = "api/pdb/entry/summary/"
    canned_response = response_mocker(
        kwargs={},
        base_url=base_url,
        endpoint_url=endpoint_url,
        content_type='application/octet-stream',
        post=True,
        data=identifier,
    )
    self.fetch_from_url_or_retry = MagicMock(return_value=canned_response)

    url = base_url + endpoint_url + identifier
    # NOTE(review): ``header`` is a one-element set, not a dict.
    fetched = self.fetch_from_url_or_retry(
        url,
        json=True,
        post=True,
        data=identifier,
        header={'application/octet-stream'},
        retry_in=None,
        wait=0,
        n_retries=10,
        stream=False,
    )
    decoded = fetched.json()

    expected = json.loads('{"data": "some json formatted output"}')
    self.assertEqual(decoded, expected)
def setUp(self):
    """Create a ``TrailingOrders`` system with logging and scheduling mocked.

    The module's ``logging`` is patched before the system is built.
    ``get_pending_orders`` is stubbed to return an empty list and
    ``TrailingOrders.set_next_operation`` is replaced by
    ``self.set_next_operation``. Both patches are stopped on cleanup.
    """
    client = mock.MagicMock()
    bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(
        self.INITIAL_SETUP)

    module_path = 'trading_system.systems.trailing_orders.system'

    logging_patch = mock.patch(module_path + '.logging')
    self.addCleanup(logging_patch.stop)
    logging_patch.start()

    self.system = TrailingOrders(client, bootstrap)
    self.system.get_pending_orders = mock.MagicMock(return_value=[])

    self.set_next_operation = mock.MagicMock()
    next_operation_patcher = mock.patch(
        module_path + '.TrailingOrders.set_next_operation',
        self.set_next_operation,
    )
    self.addCleanup(next_operation_patcher.stop)
    next_operation_patcher.start()
def test_decrypt(self, mock_import_module):
    """``StrictSecret`` parses its spec and delegates to the crypter module.

    The import hook returns a MagicMock module for ``.mock_crypter`` and
    rejects anything else. After two ``decrypt()`` calls, the module's
    ``decrypt`` must last have been called with the ciphertext bytes and
    the parsed parameters.
    """
    crypter = mock.MagicMock()
    crypter.__name__ = 'secretcrypt.mock_crypter'

    def fake_import(*args, **kwargs):
        self.assertEqual(kwargs['package'], secretcrypt.__name__)
        if args[0] != '.mock_crypter':
            raise Exception('Importing wrong module')
        return crypter

    mock_import_module.side_effect = fake_import

    secret = StrictSecret('mock_crypter:key=value&key2=value2:myciphertext')
    self.assertEqual(
        secret._decrypt_params, {'key': 'value', 'key2': 'value2'})
    self.assertEqual(secret._ciphertext, b'myciphertext')

    for _ in range(2):
        secret.decrypt()

    crypter.decrypt.assert_called_with(
        b'myciphertext',
        key='value',
        key2='value2',
    )
def test_eager_decrypt(self, mock_import_module):
    """``Secret`` decrypts at construction; ``get()`` does not decrypt again.

    The import hook returns a MagicMock module whose ``decrypt`` yields
    ``b'plaintext'``. Constructing the secret must already call
    ``decrypt`` with the ciphertext and parsed parameters. After the mock
    is reset, ``get()`` must return the plaintext without a further
    ``decrypt`` call.
    """
    mock_crypter_module = mock.MagicMock()
    mock_crypter_module.decrypt.side_effect = (
        lambda *args, **kwargs: b'plaintext')
    mock_crypter_module.__name__ = 'secretcrypt.mock_crypter'

    def mock_import_side_effect(*args, **kwargs):
        self.assertEqual(kwargs['package'], secretcrypt.__name__)
        if args[0] == '.mock_crypter':
            return mock_crypter_module
        raise Exception('Importing wrong module')

    mock_import_module.side_effect = mock_import_side_effect

    secret = Secret('mock_crypter:key=value&key2=value2:myciphertext')
    mock_crypter_module.decrypt.assert_called_with(
        b'myciphertext',
        key='value',
        key2='value2',
    )

    # reset_mock() clears the recorded calls (child mocks included) but
    # keeps the configured side_effect.
    mock_crypter_module.reset_mock()
    plaintext = secret.get()
    self.assertEqual(b'plaintext', plaintext)

    # Assert on ``decrypt``: the module mock itself is never called, so
    # checking it (as the test used to) could not fail.
    mock_crypter_module.decrypt.assert_not_called()
def test_run_with_existing_remote_build_container(self, skipper_runner_run_mock, requests_get_mock):
    """``run`` uses the registry image when its tag is listed remotely.

    The mocked registry response is OK and lists ``build-container-tag``.
    The runner must be invoked once with the fully qualified registry
    image name and the default run options.
    """
    # NOTE(review): ``spec`` is given as a string, so the mock is spec'd
    # on ``str`` rather than on requests.Response. Confirm intent.
    response_cls = mock.MagicMock(spec='requests.Response')
    response = response_cls.return_value
    response.status_code = http_client.OK
    response.json.return_value = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb', 'build-container-tag'],
    }
    requests_get_mock.return_value = response

    command = ['ls', '-l']
    self._invoke_cli(
        global_params=self.global_params,
        subcmd='run',
        subcmd_params=command,
    )

    expected_image = (
        'registry.io:5000/build-container-image:build-container-tag')
    skipper_runner_run_mock.assert_called_once_with(
        command,
        fqdn_image=expected_image,
        environment=[],
        interactive=False,
        name=None,
        net='host',
        volumes=None,
        workdir=None,
        use_cache=False,
    )
def test_run_with_non_existing_build_container(self, requests_get_mock):
    """``run`` fails with a ``ClickException`` when the tag is not listed.

    The mocked registry response lists tags that do not include the build
    container tag.
    """
    # NOTE(review): ``spec`` is given as a string, so the mock is spec'd
    # on ``str`` rather than on requests.Response. Confirm intent.
    response_cls = mock.MagicMock(spec='requests.Response')
    response = response_cls.return_value
    response.json.return_value = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb'],
    }
    requests_get_mock.return_value = response

    command = ['ls', '-l']
    outcome = self._invoke_cli(
        global_params=self.global_params,
        subcmd='run',
        subcmd_params=command,
    )

    self.assertIsInstance(
        outcome.exception, click.exceptions.ClickException)
def test_cli_with_custom_login(self, mocked_method):
    """Explicit ``--username/--password/--email`` are passed on to login.

    ``mocked_method`` is configured to return a MagicMock client. The CLI
    must exit with code 0, log in with the supplied credentials and push
    the ``test`` tag.
    """
    client = mock.MagicMock()
    client.push.return_value = [
        {"stream": "In process"},
        {"status": "Successfully pushed"},
    ]
    client.login.return_value = {"Status": "Login Succeeded"}
    mocked_method.return_value = client

    cli_args = [
        "dev", "--version", "test",
        "--username", "user",
        "--password", "pass",
        "--email", "mail",
    ]
    with FakeProjectDirectory() as tmpdir:
        add_sh_fake_config(tmpdir)
        result = CliRunner().invoke(cli, cli_args)

        assert result.exit_code == 0
        client.login.assert_called_with(
            email=u'mail',
            password=u'pass',
            reauth=False,
            registry='registry',
            username=u'user',
        )
        client.push.assert_called_with(
            'registry/user/project:test',
            decode=True,
            insecure_registry=False,
            stream=True,
        )
def test_cli_with_insecure_registry(self, mocked_method):
    """``--insecure`` skips login and pushes with ``insecure_registry=True``.

    ``mocked_method`` is configured to return a MagicMock client. The CLI
    must exit with code 0.
    """
    client = mock.MagicMock()
    client.push.return_value = [
        {"stream": "In process"},
        {"status": "Successfully pushed"},
    ]
    client.login.return_value = {"Status": "Login Succeeded"}
    mocked_method.return_value = client

    cli_args = ["dev", "--version", "test", "--insecure"]
    with FakeProjectDirectory() as tmpdir:
        add_sh_fake_config(tmpdir)
        result = CliRunner().invoke(cli, cli_args)

        assert result.exit_code == 0
        assert not client.login.called
        client.push.assert_called_with(
            'registry/user/project:test',
            decode=True,
            insecure_registry=True,
            stream=True,
        )
def test_cli(self, test_mock, mocked_method):
    """``dev -d`` builds the default-version image and creates setup.py.

    ``mocked_method`` is configured to return a MagicMock client whose
    build output ends in a success line. ``setup.py`` must not exist
    before the run and must exist afterwards. ``test_mock`` must be called
    with ``("dev", None)``.
    """
    client = mock.MagicMock()
    client.build.return_value = [
        {"stream": "all is ok"},
        {"stream": "Successfully built 12345"},
    ]
    mocked_method.return_value = client

    with FakeProjectDirectory() as tmpdir:
        for prepare in (add_scrapy_fake_config,
                        add_sh_fake_config,
                        add_fake_dockerfile):
            prepare(tmpdir)

        setup_py_path = os.path.join(tmpdir, 'setup.py')
        assert not os.path.isfile(setup_py_path)

        result = CliRunner().invoke(cli, ["dev", "-d"])

        assert result.exit_code == 0
        client.build.assert_called_with(
            decode=True,
            path=tmpdir,
            tag='registry/user/project:1.0',
        )
        assert os.path.isfile(setup_py_path)
        test_mock.assert_called_with("dev", None)
def test_cli_custom_version(self, test_mock, mocked_method):
    """``dev --version test`` builds the image tagged ``test``.

    ``mocked_method`` is configured to return a MagicMock client whose
    build output ends in a success line. ``test_mock`` must be called with
    ``("dev", "test")``.
    """
    client = mock.MagicMock()
    client.build.return_value = [
        {"stream": "all is ok"},
        {"stream": "Successfully built 12345"},
    ]
    mocked_method.return_value = client

    with FakeProjectDirectory() as tmpdir:
        for prepare in (add_scrapy_fake_config,
                        add_sh_fake_config,
                        add_fake_dockerfile):
            prepare(tmpdir)

        result = CliRunner().invoke(cli, ["dev", "--version", "test"])

        assert result.exit_code == 0
        client.build.assert_called_with(
            decode=True,
            path=tmpdir,
            tag='registry/user/project:test',
        )
        test_mock.assert_called_with("dev", "test")
def test_request_backend(self, options, config):
    """``_request_backend`` returns the relayed status code and a message.

    ``_relay_request`` is stubbed to return an empty ``requests.Response``
    with status OK. The result must be ``(ok, {'message': None})`` and the
    relay must have been called.
    """
    driver = self._get_driver(options, config)

    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok
    driver._relay_request = mock.MagicMock()
    driver._relay_request.return_value = ok_response

    context = mock.MagicMock()
    code, message = driver._request_backend(context, {}, 'object', 'TEST')

    self.assertEqual(requests.codes.ok, code)
    self.assertEqual({'message': None}, message)
    driver._relay_request.assert_called()
def test_create_resource(self, options, config):
    """``_create_resource`` issues CREATE and returns the backend's data.

    ``_request_backend`` is stubbed to return ``(ok, res_data)``.
    """
    driver = self._get_driver(options, config)
    context = mock.MagicMock()
    res_type = 'network'
    res_data = {
        'id': 'network-123',
        'tenant_id': ATTR_NOT_SPECIFIED,
    }
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver._create_resource(
        res_type, context, {res_type: res_data})

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'CREATE')
def test_get_resource(self, options, config):
    """``_get_resource`` issues READ and returns the backend's data.

    ``_request_backend`` is stubbed to return ``(ok, res_data)``.
    """
    driver = self._get_driver(options, config)
    context = mock.MagicMock()
    res_type = 'network'
    res_data = {'id': 'network-123'}
    fields = ['id']
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver._get_resource(
        res_type, context, res_data['id'], fields)

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'READ')
def test_update_resource(self, options, config):
    """``_update_resource`` issues UPDATE and returns the backend's data.

    ``_request_backend`` is stubbed to return ``(ok, res_data)``.
    """
    driver = self._get_driver(options, config)
    context = mock.MagicMock()
    res_type = 'network'
    res_data = {'id': 'network-123'}
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver._update_resource(
        res_type, context, res_data['id'], {res_type: res_data})

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'UPDATE')
def test_list_resource(self, options, config):
    """``_list_resource`` issues READALL and returns the backend's list.

    ``_request_backend`` is stubbed to return ``(ok, [res_data])``.
    """
    driver = self._get_driver(options, config)
    context = mock.MagicMock()
    res_type = 'network'
    res_data = {'id': 'network-123'}
    fields = ['id']
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, [res_data])

    response = driver._list_resource(res_type, context, None, fields)

    self.assertEqual(response, [res_data])
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'READALL')
def test_add_router_interface(self, options, config):
    """``add_router_interface`` issues ADDINTERFACE on the router resource.

    ``_request_backend`` is stubbed to return ``(ok, res_data)``, which
    must be returned unchanged.
    """
    driver = self._get_driver(options, config)
    context = mock.MagicMock()
    res_type = 'router'
    res_data = {'id': 'router-123'}
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver.add_router_interface(
        context, res_data['id'], res_data)

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'ADDINTERFACE')
def test_remove_router_interface(self, options, config):
    """``remove_router_interface`` issues DELINTERFACE on the router.

    ``_request_backend`` is stubbed to return ``(ok, res_data)``, which
    must be returned unchanged.
    """
    driver = self._get_driver(options, config)
    context = mock.MagicMock()
    res_type = 'router'
    res_data = {'id': 'router-123'}
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver.remove_router_interface(
        context, res_data['id'], res_data)

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'DELINTERFACE')
def test_prepared_query_not_found(self):
    """A ``PreparedQueryNotFound`` result triggers a re-prepare submit.

    The cached prepared statement and the connection share the keyspace
    ``FooKeyspace``. ``session.submit`` must have been called with
    ``rf._reprepare`` and a ``PrepareMessage`` for the statement's query
    at positions -5 and -4 of its positional arguments.
    """
    session = self.make_session()
    pool = session._pools.get.return_value
    pool.borrow_connection.return_value = (Mock(spec=Connection), 1)

    rf = self.make_response_future(session)
    rf.send_request()

    session.cluster._prepared_statements = MagicMock(spec=dict)
    statements = session.cluster._prepared_statements
    cached_statement = statements.__getitem__.return_value
    cached_statement.query_string = "SELECT * FROM foobar"
    cached_statement.keyspace = "FooKeyspace"
    rf._connection.keyspace = "FooKeyspace"

    not_found = Mock(spec=PreparedQueryNotFound, info='a' * 16)
    rf._set_result(None, None, None, not_found)

    self.assertTrue(session.submit.call_args)
    submit_args = session.submit.call_args[0]
    prepare_message = submit_args[-4]
    self.assertEqual(rf._reprepare, submit_args[-5])
    self.assertIsInstance(prepare_message, PrepareMessage)
    self.assertEqual(prepare_message.query, "SELECT * FROM foobar")
def test_prepared_query_not_found_bad_keyspace(self):
    """A keyspace mismatch on re-prepare makes ``rf.result`` raise.

    The cached prepared statement belongs to ``FooKeyspace`` while the
    connection is on ``BarKeyspace``. After a ``PreparedQueryNotFound``
    result, ``rf.result()`` must raise ``ValueError``.
    """
    session = self.make_session()
    pool = session._pools.get.return_value
    pool.borrow_connection.return_value = (Mock(spec=Connection), 1)

    rf = self.make_response_future(session)
    rf.send_request()

    session.cluster._prepared_statements = MagicMock(spec=dict)
    statements = session.cluster._prepared_statements
    cached_statement = statements.__getitem__.return_value
    cached_statement.query_string = "SELECT * FROM foobar"
    cached_statement.keyspace = "FooKeyspace"
    rf._connection.keyspace = "BarKeyspace"

    not_found = Mock(spec=PreparedQueryNotFound, info='a' * 16)
    rf._set_result(None, None, None, not_found)

    # Resolve the bound method first so that only the call itself is
    # covered by assertRaises.
    fetch_result = rf.result
    with self.assertRaises(ValueError):
        fetch_result()
def runTest(self, mock_template, mock_yaml, mock_isdir, mock_isfile, mock_open):
    ''' Execute test

    Runs ``cache_runbooks`` with the filesystem, ``open`` and the YAML
    loader mocked, and checks that the returned mapping has the keys
    ``'*'`` and ``'target'``, that each mock was used, and that
    ``mock_isfile`` was called four times.
    '''
    self.config = mock.MagicMock(spec_set={'runbook_path' : '/path/'})
    mock_isfile.return_value = True
    mock_isdir.return_value = True
    # NOTE(review): this rebinds the local name only and the new mock is
    # not used again in this method; the injected ``mock_template`` is left
    # unconfigured. Possibly ``mock_template.return_value`` was meant;
    # confirm against the patch decorators.
    mock_template = mock.MagicMock(**{
        'render.return_value' : """ '*': - book 'target': - book1 - book2 """
    })
    mock_yaml.return_value = {'*':['book'], 'target':['book1', 'book2']}
    # ``file`` is the Python 2 builtin file type.
    mock_open.return_value = mock.MagicMock(spec=file)

    result = cache_runbooks(self.config, self.logger)

    # Sort the keys so the check does not depend on dict ordering.
    self.assertEqual(sorted(result.keys()), ['*', 'target'],
                     "Expected dictionary keys not found")
    self.assertTrue(mock_open.called, "open not called")
    self.assertTrue(mock_yaml.called, "yaml.safe_load not called")
    self.assertTrue(mock_isdir.called, "os.path.isdir not called")
    # The message now reports the mock that is actually being checked.
    self.assertEqual(mock_isfile.call_count, 4,
                     "mock_isfile.call_count {0} is not 4".format(
                         mock_isfile.call_count))
def runTest(self, mock_run, mock_put, mock_local, mock_hide, mock_env, mock_set_env):
    ''' Execute test

    Runs ``execute_runbook`` for a ``cmd`` action and checks that it
    returns a truthy result, logs no warning, and calls ``mock_local``
    with ``("bash", capture=True)``.
    '''
    # Set mock_env to empty dict
    mock_env = mock.MagicMock(spec={})
    mock_set_env.return_value = mock_env
    mock_local.return_value = mock.MagicMock(**{'succeeded': True})
    mock_run.return_value = mock.MagicMock(**{'succeeded': True})
    # NOTE(review): the former ``mock_put = True`` / ``mock_hide = True``
    # lines only rebound local names and had no effect on the patched
    # objects, so they were removed. If a return value was intended, set
    # ``mock_put.return_value`` / ``mock_hide.return_value`` instead.

    action = {
        'type' : 'cmd',
        'execute_from' : 'remote',
        'cmd' : "bash"
    }

    results = execute_runbook(action, self.target, self.config, self.logger)

    self.assertTrue(results)
    self.assertFalse(self.logger.warn.called)
    self.assertTrue(mock_local.called)
    mock_local.assert_called_with("bash", capture=True)