Python mock 模块,PropertyMock() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mock.PropertyMock()

项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_request_block_whenIndexIsLatest_thenRequestsLatestBlockFromNode(self):
        """Requesting the "latest" block queries ``/block/latest`` on the peer.

        ``requests.get`` is replaced by a canned 200 response and
        ``Block.current_hash`` by a ``PropertyMock``; the test then checks the
        fields of the returned block and the single URL that was requested.
        """
        canned_response = Mock(status_code=200)
        canned_response.json.return_value = '{"nonce": 12345, "index": 35, "transactions": [], "timestamp": 1234567890, "current_hash": "current_hash", "previous_hash": "previous_hash"}'

        # Build the patchers up front, then enter them in the original order.
        init_patch = patch.object(FullNode, '__init__', return_value=None)
        hash_patch = patch("crankycoin.node.Block.current_hash", new_callable=PropertyMock)
        get_patch = patch("crankycoin.requests.get", return_value=canned_response)

        with init_patch, hash_patch as current_hash_prop, get_patch as requests_get:
            current_hash_prop.return_value = "current_hash"
            full_node = FullNode("127.0.0.1", "reward_address")

            fetched = full_node.request_block("127.0.0.2", "30013", "latest")

            self.assertIsNotNone(fetched)
            self.assertEqual(fetched.index, 35)
            self.assertEqual(fetched.transactions, [])
            self.assertEqual(fetched.previous_hash, "previous_hash")
            self.assertEqual(fetched.current_hash, "current_hash")
            self.assertEqual(fetched.timestamp, 1234567890)
            self.assertEqual(fetched.nonce, 12345)
            requests_get.assert_called_once_with('http://127.0.0.2:30013/block/latest')
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_request_block_whenIndexIsNumeric_thenRequestsCorrectBlockFromNode(self):
        """Requesting block 29 queries ``/block/29`` on the peer.

        ``requests.get`` is replaced by a canned 200 response and
        ``Block.current_hash`` by a ``PropertyMock``; the test then checks the
        fields of the returned block and the single URL that was requested.
        """
        canned_response = Mock(status_code=200)
        canned_response.json.return_value = '{"nonce": 12345, "index": 29, "transactions": [], "timestamp": 1234567890, "current_hash": "current_hash", "previous_hash": "previous_hash"}'

        # Build the patchers up front, then enter them in the original order.
        init_patch = patch.object(FullNode, '__init__', return_value=None)
        hash_patch = patch("crankycoin.node.Block.current_hash", new_callable=PropertyMock)
        get_patch = patch("crankycoin.requests.get", return_value=canned_response)

        with init_patch, hash_patch as current_hash_prop, get_patch as requests_get:
            current_hash_prop.return_value = "current_hash"
            full_node = FullNode("127.0.0.1", "reward_address")

            fetched = full_node.request_block("127.0.0.2", "30013", 29)

            self.assertIsNotNone(fetched)
            self.assertEqual(fetched.index, 29)
            self.assertEqual(fetched.transactions, [])
            self.assertEqual(fetched.previous_hash, "previous_hash")
            self.assertEqual(fetched.current_hash, "current_hash")
            self.assertEqual(fetched.timestamp, 1234567890)
            self.assertEqual(fetched.nonce, 12345)
            requests_get.assert_called_once_with('http://127.0.0.2:30013/block/29')
项目:pg2kinesis    作者:handshake    | 项目源码 | 文件源码
def test_delete_slot(slot):
    """Exercise ``slot.delete_slot()`` against failing replication-cursor mocks.

    Three scenarios are run in sequence on the same ``slot`` fixture:

    1. ``drop_replication_slot`` raises a ``ProgrammingError`` whose
       ``pgcode`` is ``UNDEFINED_OBJECT``; ``delete_slot()`` must not raise.
    2. ``pgcode`` is patched to ``-1``; ``delete_slot()`` must raise
       ``ProgrammingError`` carrying that code.
    3. Outside any ``pgcode`` patch, ``delete_slot()`` must still raise.

    In every scenario the drop must have been requested for ``'pg2kinesis'``.
    """
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.UNDEFINED_OBJECT):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        slot.delete_slot()
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')

    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        pe = psycopg2.ProgrammingError()
        # NOTE(review): this configures create_replication_slot while the
        # code under test is delete_slot(); possibly a copy/paste slip for
        # drop_replication_slot. Left unchanged -- confirm intent.
        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)
        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.delete_slot()
        # These checks used to live inside the ``raises`` block after the
        # raising call, where they could never execute. They stay inside the
        # ``pgcode`` patch because the property is patched on the class.
        slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
        assert e_info.value.pgcode == -1

    slot._repl_cursor.create_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.delete_slot()
    # Moved out of the ``raises`` block so that it actually runs.
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_rackspace_uploader_lookup_url(self, mock1, mock2):
        """Test RACKSPACE UPLOADER lookup returns a valid link.

        The container returned by the patched cloudfiles client reports
        ``cdn_enabled`` as True through a ``PropertyMock``.
        """
        filename = 'test.jpg'
        uri = 'https://rackspace.com'
        expected_url = "%s/%s" % (uri, filename)
        with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as cloudfiles:
            # Properties live on the class, hence the assignment via type().
            type(fake_container).cdn_enabled = PropertyMock(return_value=True)
            cloudfiles.get_container.return_value = fake_container

            uploader = RackspaceUploader()
            uploader.init_app(self.flask_app)
            lookup_args = {'filename': filename, 'container': 'user_3'}
            found_url = uploader._lookup_url('rackspace', lookup_args)
            assert found_url == expected_url, (
                "We should get the following URL: %s" % expected_url)
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_rackspace_uploader_lookup_url_enable_cdn(self, mock1, mock2):
        """Test RACKSPACE UPLOADER lookup enables CDN for non enabled CDN.

        The container reports ``cdn_enabled`` as False through a
        ``PropertyMock``; the lookup must still return the URL and must have
        called ``make_public()`` on the container.
        """
        filename = 'test.jpg'
        url = 'https://rackspace.com/test.jpg'
        with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as cloudfiles:
            # Properties live on the class, hence the assignment via type().
            type(fake_container).cdn_enabled = PropertyMock(return_value=False)
            cloudfiles.get_container.return_value = fake_container

            uploader = RackspaceUploader()
            uploader.init_app(self.flask_app)
            lookup_args = {'filename': filename, 'container': 'user_3'}
            res = uploader._lookup_url('rackspace', lookup_args)
            assert res == url, "We should get the %s but we got %s " % (url, res)
            fake_container.assert_has_calls([call.make_public()],
                                            any_order=True)
项目:osfclient    作者:osfclient    | 项目源码 | 文件源码
def test_create_existing_folder_exist_ok():
    """``create_folder(..., exist_ok=True)`` returns the already-present folder.

    The PUT is stubbed to answer 409 and ``Folder.folders`` is patched to
    list two folders, one of which has the requested name.
    """
    new_folder_url = ('https://files.osf.io/v1/resources/9zpcy/providers/' +
                      'osfstorage/foo123/?kind=folder')
    parent = Folder({})
    parent._new_folder_url = new_folder_url
    parent._put = MagicMock(return_value=FakeResponse(409, None))

    siblings = [MockFolder('foobar'), MockFolder('fudge')]
    with patch.object(Folder, 'folders', new_callable=PropertyMock,
                      return_value=siblings):
        found = parent.create_folder('foobar', exist_ok=True)

    assert found.name == 'foobar'

    parent._put.assert_called_once_with(new_folder_url,
                                        params={'name': 'foobar'})
项目:hdlcc    作者:suoto    | 项目源码 | 文件源码
def test():
            """Only VUnit sources are found when the project lists no files."""
            # We'll add no project file, so the only sources that should
            # be found are VUnit's files
            it.assertIn('vunit', sys.modules)
            project_filename = p.join(TEST_CONFIG_PARSER_SUPPORT_PATH,
                                      'builder_only_project.prj')
            # Restrict the builder to VHDL while the sources are collected.
            with mock.patch('hdlcc.builders.MSim.file_types',
                            new_callable=mock.PropertyMock,
                            return_value=('vhdl', )):
                sources = ConfigParser(project_filename).getSources()

            vunit_files = sum(
                1 for src in sources if 'vunit' in src.filename.lower())

            it.assertEqual(len(sources), vunit_files,
                           "We should only find VUnit files")

            # Check that we find no verilog or systemverilog files
            for unwanted in ('verilog', 'systemverilog'):
                seen_filetypes = [src.filetype for src in sources]
                it.assertNotIn(unwanted, seen_filetypes,
                               "We should only find VUnit VHDL files")
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_init_T4A(mocker, rats_response, max_send, max_recv, result):
    """Activating a 106A target sends RATS and yields a ``Type4Tag``.

    The frontend's ``exchange`` is autospec-patched and its max send/receive
    sizes are replaced by ``PropertyMock`` objects returning the parametrized
    values. The expected RATS command depends on whether ``max_recv`` is
    below 256.
    """
    clf = nfc.ContactlessFrontend()
    mocker.patch.object(clf, 'exchange', autospec=True)
    send_size_prop = mocker.patch('nfc.ContactlessFrontend.max_send_data_size',
                                  new_callable=mock.PropertyMock)
    send_size_prop.return_value = max_send
    recv_size_prop = mocker.patch('nfc.ContactlessFrontend.max_recv_data_size',
                                  new_callable=mock.PropertyMock)
    recv_size_prop.return_value = max_recv

    target = nfc.clf.RemoteTarget("106A")
    target.sens_res = HEX("4403")
    target.sel_res = HEX("20")
    target.sdd_res = HEX("04832F9A272D80")

    if max_recv < 256:
        expected_rats = 'E070'
    else:
        expected_rats = 'E080'
    clf.exchange.return_value = HEX(rats_response)
    tag = nfc.tag.activate(clf, target)
    clf.exchange.assert_called_once_with(HEX(expected_rats), 0.03)
    assert isinstance(tag, nfc.tag.tt4.Type4Tag)
    assert str(tag) == result
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_init_T4B(mocker, sensb_res, max_send, max_recv, result):
    """Activating a 106B target sends ATTRIB and yields a ``Type4Tag``.

    The frontend's ``exchange`` is autospec-patched and its max send/receive
    sizes are replaced by ``PropertyMock`` objects returning the parametrized
    values. The tail of the expected ATTRIB command depends on whether
    ``max_recv`` is below 256.
    """
    clf = nfc.ContactlessFrontend()
    mocker.patch.object(clf, 'exchange', autospec=True)
    send_size_prop = mocker.patch('nfc.ContactlessFrontend.max_send_data_size',
                                  new_callable=mock.PropertyMock)
    send_size_prop.return_value = max_send
    recv_size_prop = mocker.patch('nfc.ContactlessFrontend.max_recv_data_size',
                                  new_callable=mock.PropertyMock)
    recv_size_prop.return_value = max_recv

    target = nfc.clf.RemoteTarget("106B")
    target.sensb_res = HEX(sensb_res)

    if max_recv < 256:
        attrib_tail = '00070100'
    else:
        attrib_tail = '00080100'
    expected_attrib = '1D30702A1C' + attrib_tail
    clf.exchange.return_value = HEX('00')
    tag = nfc.tag.activate(clf, target)
    clf.exchange.assert_called_once_with(HEX(expected_attrib), 0.03)
    assert isinstance(tag, nfc.tag.tt4.Type4Tag)
    assert str(tag) == result
项目:networking-ovn    作者:netgroup-polito    | 项目源码 | 文件源码
def setUp(self):
        """Configure the OVN L3 plugin test fixture.

        Sets the ``allow_overlapping_ips`` and ``max_routes`` config defaults,
        runs the base ``setUp`` with the OVN L3 router plugin registered as a
        service plugin, and replaces ``OVNL3RouterPlugin._ovn`` with a
        ``PropertyMock`` returning a fake NB IDL for the duration of the test.
        """
        plugin = 'neutron.tests.unit.extensions.test_l3.TestNoL3NatPlugin'
        l3_plugin = ('networking_ovn.l3.l3_ovn.OVNL3RouterPlugin')
        service_plugins = {'l3_plugin_name': l3_plugin}
        # For these tests we need to enable overlapping ips
        cfg.CONF.set_default('allow_overlapping_ips', True)
        cfg.CONF.set_default('max_routes', 3)
        ext_mgr = test_extraroute.ExtraRouteTestExtensionManager()
        super(test_l3.L3BaseForIntTests, self).setUp(
            plugin=plugin, ext_mgr=ext_mgr,
            service_plugins=service_plugins)
        patcher = mock.patch(
            'networking_ovn.l3.l3_ovn.OVNL3RouterPlugin._ovn',
            new_callable=mock.PropertyMock,
            return_value=fakes.FakeOvsdbNbOvnIdl())
        patcher.start()
        # Undo the class-level patch after each test so that it cannot leak
        # into tests that run later in the same process.
        self.addCleanup(patcher.stop)
        self.setup_notification_driver()

    # TODO(rtheis): Skip the following test cases since they are for
    # L3 service plugins that support L3 agent RPC. These tests should
    # be refactored in neutron.
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def setUp(self):
        """Build a ``Hotstore`` under test and start a mocked S3 backend.

        ``Hotstore._READ_AUTH`` is patched only while the instance is being
        constructed. The S3 mock is started afterwards and the hotstore bucket
        named in ``CONFIG_DATA`` is created inside it.
        """
        video_proto = VideoProto()
        video_proto.veda_id = 'XXXXXXXX2014-V00TEST'
        self.upload_filepath = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'test_files',
            'OVTESTFILE_01.mp4'
        )

        with patch.object(Hotstore, '_READ_AUTH', PropertyMock(return_value=lambda: CONFIG_DATA)):
            self.hotstore = Hotstore(
                video_object=video_proto,
                upload_filepath=self.upload_filepath,
                video_proto=video_proto
            )

        # do s3 mocking
        s3_mock = mock_s3_deprecated()
        s3_mock.start()
        # Register the cleanup right after start() so the mock is stopped
        # even if creating the bucket below fails.
        self.addCleanup(s3_mock.stop)
        conn = S3Connection()
        conn.create_bucket(CONFIG_DATA['veda_s3_hotstore_bucket'])
项目:blobxfer    作者:Azure    | 项目源码 | 文件源码
def test_worker_thread_transfer():
    """The transfer worker processes both queued items and records the error.

    ``_process_transfer`` succeeds once and raises once; ``termination_check``
    is a ``PropertyMock`` that reports True on its third read.
    """
    uploader = ops.Uploader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    for _ in range(2):
        uploader._transfer_queue.put((mock.MagicMock,) * 4)
    uploader._process_transfer = mock.MagicMock(
        side_effect=[None, Exception()])

    with mock.patch(
            'blobxfer.operations.upload.Uploader.termination_check',
            new_callable=mock.PropertyMock,
            side_effect=[False, False, True]):
        uploader._worker_thread_transfer()
        assert uploader._process_transfer.call_count == 2
        assert len(uploader._exceptions) == 1
项目:blobxfer    作者:Azure    | 项目源码 | 文件源码
def test_worker_thread_upload(ts):
    """The upload worker processes both queued items and records the error.

    ``_process_upload_descriptor`` succeeds once and raises once;
    ``termination_check`` is a ``PropertyMock`` that reports True on its
    fifth read.
    """
    uploader = ops.Uploader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    uploader._general_options.concurrency.transfer_threads = 1

    transfer_set = mock.MagicMock()
    transfer_set.__len__.side_effect = [5, 0, 0, 0]
    uploader._transfer_set = transfer_set
    for _ in range(2):
        uploader._upload_queue.put(mock.MagicMock)
    uploader._process_upload_descriptor = mock.MagicMock(
        side_effect=[None, Exception()])

    with mock.patch(
            'blobxfer.operations.upload.Uploader.termination_check',
            new_callable=mock.PropertyMock,
            side_effect=[False, False, False, False, True]):
        uploader._worker_thread_upload()
        assert uploader._process_upload_descriptor.call_count == 2
        assert len(uploader._exceptions) == 1
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_submit_run(self, mock_requests):
        """``submit_run`` posts the payload and returns the reported run id.

        The patched ``requests`` module answers the POST with status 200 and
        a JSON body of ``{'run_id': '1'}``. The test checks the returned id
        and the exact arguments of the single POST call.
        """
        mock_requests.codes.ok = 200
        mock_requests.post.return_value.json.return_value = {'run_id': '1'}
        status_code_mock = mock.PropertyMock(return_value=200)
        type(mock_requests.post.return_value).status_code = status_code_mock
        # Named ``payload`` rather than ``json`` to avoid shadowing the
        # standard-library module name.
        payload = {
          'notebook_task': NOTEBOOK_TASK,
          'new_cluster': NEW_CLUSTER
        }
        run_id = self.hook.submit_run(payload)

        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed in Python 3.12.
        self.assertEqual(run_id, '1')
        mock_requests.post.assert_called_once_with(
            submit_run_endpoint(HOST),
            json={
                'notebook_task': NOTEBOOK_TASK,
                'new_cluster': NEW_CLUSTER,
            },
            auth=(LOGIN, PASSWORD),
            headers=USER_AGENT_HEADER,
            timeout=self.hook.timeout_seconds)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_get_run_state(self, mock_requests):
        """``get_run_state`` issues one GET and returns the parsed ``RunState``.

        The patched ``requests`` module answers the GET with status 200 and
        ``GET_RUN_RESPONSE`` as its JSON body.
        """
        mock_requests.codes.ok = 200
        mock_requests.get.return_value.json.return_value = GET_RUN_RESPONSE
        status_code_mock = mock.PropertyMock(return_value=200)
        type(mock_requests.get.return_value).status_code = status_code_mock

        run_state = self.hook.get_run_state(RUN_ID)

        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed in Python 3.12.
        self.assertEqual(run_state, RunState(
            LIFE_CYCLE_STATE,
            RESULT_STATE,
            STATE_MESSAGE))
        mock_requests.get.assert_called_once_with(
            get_run_endpoint(HOST),
            json={'run_id': RUN_ID},
            auth=(LOGIN, PASSWORD),
            headers=USER_AGENT_HEADER,
            timeout=self.hook.timeout_seconds)
项目:charm-trove    作者:openstack    | 项目源码 | 文件源码
def test_setup_endpoint(self):
        """``setup_endpoint`` registers the charm's URLs with keystone.

        Five ``TroveCharm`` properties are replaced with ``PropertyMock``
        objects; the registered URLs must carry the ``/v1.0/%(tenant_id)s``
        suffix.
        """
        patched_properties = (
            'service_name', 'region', 'public_url', 'internal_url',
            'admin_url')
        for prop_name in patched_properties:
            self.patch_object(trove.TroveCharm, prop_name,
                              new_callable=mock.PropertyMock)

        self.service_name.return_value = 'type1'
        self.region.return_value = 'region1'
        self.public_url.return_value = 'public_url'
        self.internal_url.return_value = 'internal_url'
        self.admin_url.return_value = 'admin_url'

        keystone = mock.MagicMock()
        trove.setup_endpoint(keystone)
        keystone.register_endpoints.assert_called_once_with(
            'trove', 'region1', 'public_url/v1.0/%(tenant_id)s',
            'internal_url/v1.0/%(tenant_id)s',
            'admin_url/v1.0/%(tenant_id)s')
项目:clusterfuzz-tools    作者:google    | 项目源码 | 文件源码
def test_write(self):
    """Tests write_content."""
    # ``name`` is a constructor argument of mock.Mock, so it cannot be set as
    # a plain attribute; a PropertyMock on the type supplies it instead.
    temp_handle = mock.Mock()
    type(temp_handle).name = mock.PropertyMock(return_value='/tmp/file')

    # Context manager whose ``with`` target is the handle above.
    context = mock.Mock()
    context.__enter__ = mock.Mock(return_value=temp_handle)
    context.__exit__ = mock.Mock(return_value=False)
    self.mock.NamedTemporaryFile.return_value = context

    android.write_content('/test.html', 'content')

    self.mock.NamedTemporaryFile.assert_called_once_with(delete=False)
    temp_handle.write.assert_called_once_with('content')
    self.mock.adb.assert_called_once_with('push /tmp/file /test.html')
    self.mock.adb_shell.assert_called_once_with('chmod 0644 /test.html')
    self.mock.delete_if_exists.assert_called_once_with('/tmp/file')
项目:murano-pkg-check    作者:openstack    | 项目源码 | 文件源码
def test_errors(self, m_error):
        """Registering and reporting an error code round-trips through the registry.

        Covers: registering ``F042``, rejecting a second lookup of it on the
        register, the stored registry entry, the arguments passed to the
        patched error constructor when reporting, and rejecting an unknown
        code on the report.
        """
        registry = {}
        register = error.Register(registry, prefix='PRE')
        report = error.Report(registry, prefix='PRE')

        register.F042(description='Fake error')
        self.assertRaises(ValueError, register.__getattr__,
                          code='F042')
        expected_registry = {
            'PRE:F042': {'code': 'PRE:F042', 'description': 'Fake error'},
        }
        self.assertEqual(expected_registry, registry)

        yaml_obj = mock.MagicMock(__yaml_meta__=mock.Mock(line=1,
                                                          column=1))
        meta = yaml_obj.__yaml_meta__
        # ``name`` cannot be set as a plain Mock attribute; use a property.
        type(meta).name = mock.PropertyMock(return_value='fake.yaml')
        meta.get_snippet.return_value = 'fake_code'

        report.F042('It is an error!', yaml_obj)
        m_error.assert_called_once_with(
            code='PRE:F042',
            column=2,
            line=2,
            filename='fake.yaml',
            message='It is an error!',
            source='fake_code')
        self.assertRaises(ValueError, report.__getattr__,
                          code='FAKE')
项目:charm-manila    作者:openstack    | 项目源码 | 文件源码
def test_url_endpoints_creation(self):
        """Endpoint properties extend the base-class URLs with a version path."""
        # Tests that the endpoint functions call through to the baseclass
        base_charm = manila.charms_openstack.charm.OpenStackCharm
        for prop_name in ('public_url', 'internal_url', 'admin_url'):
            self.patch_object(base_charm, prop_name,
                              new_callable=mock.PropertyMock)
        self.public_url.return_value = 'p1'
        self.internal_url.return_value = 'i1'
        self.admin_url.return_value = 'a1'

        charm = self._patch_config_and_charm({})
        self.assertEqual(charm.public_url, 'p1/v1/%(tenant_id)s')
        self.assertEqual(charm.internal_url, 'i1/v1/%(tenant_id)s')
        self.assertEqual(charm.admin_url, 'a1/v1/%(tenant_id)s')
        self.assertEqual(charm.public_url_v2, 'p1/v2/%(tenant_id)s')
        self.assertEqual(charm.internal_url_v2, 'i1/v2/%(tenant_id)s')
        self.assertEqual(charm.admin_url_v2, 'a1/v2/%(tenant_id)s')
项目:inspire-crawler    作者:inspirehep    | 项目源码 | 文件源码
def test_receivers_exception(app, db, sample_record_string):
    """``receive_oaiharvest_job`` raises when the scheduler returns no job id.

    The scheduling endpoint is stubbed to answer 200 with ``"jobid": None``.
    """
    scheduler_reply = json.dumps({"jobid": None, "status": "ok"})
    responses.add(
        responses.POST, "http://localhost:6800/schedule.json",
        body=scheduler_reply,
        status=200
    )

    # A record whose ``raw`` property yields the sample record string.
    harvested = MagicMock()
    type(harvested).raw = PropertyMock(return_value=sample_record_string)

    with app.app_context(), pytest.raises(CrawlerScheduleError):
        receive_oaiharvest_job(
            request=None,
            records=[harvested],
            name="",
            spider="Test",
            workflow="test"
        )
项目:odl-video-service    作者:mitodl    | 项目源码 | 文件源码
def test_video_task_chain(mocker):
    """
    Test that video task get_task_id method returns the correct id from the chain.
    """
    chained_step = {
        'task': 'cloudsync.tasks.transcode_from_s3',
        'args': [351],
        'kwargs': {},
        'options': {
            'task_id': '1a859e5a-8f71-4e01-9349-5ebc6dc66631'
        }
    }
    # Mock request context served by the patched ``VideoTask.request``.
    request_context = celery.app.task.Context({
        'lang': 'py',
        'task': 'cloudsync.tasks.stream_to_s3',
        'id': '1853b857-84d8-4af4-8b19-1c307c1e07d5',
        'chain': [chained_step]
    })
    mocker.patch('cloudsync.tasks.VideoTask.request', new_callable=PropertyMock, return_value=request_context)
    task = VideoTask()
    assert task.get_task_id() == task.request.chain[0]['options']['task_id']
项目:odl-video-service    作者:mitodl    | 项目源码 | 文件源码
def test_video_task_bad_chain(mocker):
    """
    Test that get_task_id returns None when the chained step's options
    carry no task_id.
    """
    chained_step = {
        'task': 'cloudsync.tasks.transcode_from_s3',
        'args': [351],
        'kwargs': {},
        'options': {}
    }
    # Mock request context served by the patched ``VideoTask.request``.
    request_context = celery.app.task.Context({
        'lang': 'py',
        'task': 'cloudsync.tasks.stream_to_s3',
        'id': '1853b857-84d8-4af4-8b19-1c307c1e07d5',
        'chain': [chained_step]
    })
    mocker.patch('cloudsync.tasks.VideoTask.request', new_callable=PropertyMock, return_value=request_context)
    task = VideoTask()
    assert task.get_task_id() is None
项目:annotran    作者:BirkbeckCTP    | 项目源码 | 文件源码
def test_read_unauthenticated_user():
    """
        ``views.read`` must return None for a request that carries no
        authenticated user id.

        The Language and Group models are patched so that their lookups
        return mocks whose ``id`` property is 2897.
    """
    with mock.patch('annotran.languages.models.Language') as language:
        propLang = PropertyMock(return_value=2897)
        type(language).id = propLang
        language.get_by_public_language_id = MagicMock(return_value=language)

        with mock.patch('h.groups.models.Group') as group:
            propGroup = PropertyMock(return_value=2897)
            type(group).id = propGroup
            group.get_by_pubid = MagicMock(return_value=group)

            request = mock.Mock(authenticated_userid=None,
                                matchdict={'public_group_id': '12345',
                                           'public_language_id': '12345'})
            result = views.read(request)
            # Identity check: a Mock return value must not satisfy this.
            assert result is None
项目:annotran    作者:BirkbeckCTP    | 项目源码 | 文件源码
def test_read_private_group_unauthenticated():
    """
        ``views.read`` must return None when the requesting user belongs to
        no groups.

        The Language and Group models are patched so that their lookups
        return mocks whose ``id`` property is 2897.
    """
    with mock.patch('annotran.languages.models.Language') as language:
        propLang = PropertyMock(return_value=2897)
        type(language).id = propLang
        language.get_by_public_language_id = MagicMock(return_value=language)

        with mock.patch('h.groups.models.Group') as group:
            propGroup = PropertyMock(return_value=2897)
            type(group).id = propGroup
            group.get_by_pubid = MagicMock(return_value=group)

            request = _mock_request(authenticated_user=mock.Mock(groups=[]),
                                    matchdict={'public_group_id': '12345',
                                               'public_language_id': '12345'})
            result = views.read(request)
            # Identity check: a Mock return value must not satisfy this.
            assert result is None
项目:annotran    作者:BirkbeckCTP    | 项目源码 | 文件源码
def test_read_private_group_authenticated():
    """
        ``views.read`` must return the value produced by ``read_group`` when
        the requested group is in the authenticated user's groups.

        ``read_group`` is mocked to return the group mock, and the Language
        and Group models are patched so that their lookups return mocks whose
        ``id`` property is 2897.
    """
    with mock.patch('annotran.languages.models.Language') as language:
        propLang = PropertyMock(return_value=2897)
        type(language).id = propLang
        language.get_by_public_language_id = MagicMock(return_value=language)

        with mock.patch('h.groups.models.Group') as group:
            propGroup = PropertyMock(return_value=2897)
            type(group).id = propGroup
            group.get_by_pubid = MagicMock(return_value=group)

            # Patch read_group with a scoped patcher instead of overwriting
            # the module attribute, so the real function is restored when the
            # test finishes and the mock cannot leak into other tests.
            with mock.patch.object(annotran.groups.views, 'read_group',
                                   return_value=group):
                request = _mock_request(authenticated_user=mock.Mock(groups=[group]),
                                        matchdict={'public_group_id': '12345',
                                                   'public_language_id': '12345'})
                result = views.read(request)
                assert result == group
项目:charms.openstack    作者:openstack    | 项目源码 | 文件源码
def test_assess_status_active(self):
        """With every check neutral, the unit is reported active and ready.

        The application version is served by a ``PropertyMock`` returning
        ``"abc"`` and must be passed on to ``application_version_set``.
        """
        check_names = (
            'check_if_paused',
            'check_interfaces',
            'custom_assess_status_check',
            'check_services_running',
        )
        self.patch_object(chm_core.hookenv, 'status_set')
        # disable all of the check functions
        for check_name in check_names:
            self.patch_target(check_name, return_value=(None, None))
        self.patch_object(chm_core.hookenv, 'application_version_set')

        with mock.patch.object(AssessStatusCharm, 'application_version',
                               new_callable=mock.PropertyMock,
                               return_value="abc"):
            self.target._assess_status()

        self.status_set.assert_called_once_with('active', 'Unit is ready')
        self.application_version_set.assert_called_once_with("abc")
        # check all the check functions got called
        for check_name in check_names:
            getattr(self, check_name).assert_called_once_with()
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments(self):
        """Lowest-balance-first: the extra goes to the smallest principal."""
        def fake_statement(min_payment, principal):
            # Properties are attached to the mock's own type.
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            return stmt

        method = LowestBalanceFirstMethod(Decimal('100.00'))
        statements = [
            fake_statement('2.00', '10.00'),
            fake_statement('5.00', '25.00'),
            fake_statement('2.00', '1234.56'),
            fake_statement('7.00', '3.00'),
        ]
        expected = [
            Decimal('2.00'),
            Decimal('5.00'),
            Decimal('2.00'),
            Decimal('91.00')
        ]
        assert method.find_payments(statements) == expected
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments_total_too_low(self):
        """Lowest-balance-first: a 3.00 budget makes find_payments raise TypeError."""
        def fake_statement(min_payment, principal):
            # Properties are attached to the mock's own type.
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            return stmt

        method = LowestBalanceFirstMethod(Decimal('3.00'))
        statements = [
            fake_statement('2.00', '10.00'),
            fake_statement('5.00', '25.00'),
            fake_statement('2.00', '1234.56'),
            fake_statement('7.00', '3.00'),
        ]
        with pytest.raises(TypeError):
            method.find_payments(statements)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments(self):
        """Highest-balance-first: the extra goes to the largest principal."""
        def fake_statement(min_payment, principal):
            # Properties are attached to the mock's own type.
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            return stmt

        method = HighestBalanceFirstMethod(Decimal('100.00'))
        statements = [
            fake_statement('2.00', '10.00'),
            fake_statement('5.00', '25.00'),
            fake_statement('2.00', '1234.56'),
            fake_statement('7.00', '3.00'),
        ]
        expected = [
            Decimal('2.00'),
            Decimal('5.00'),
            Decimal('86.00'),
            Decimal('7.00')
        ]
        assert method.find_payments(statements) == expected
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments(self):
        """Highest-interest-first: the extra goes to the highest APR."""
        def fake_statement(min_payment, apr, principal):
            # Properties are attached to the mock's own type.
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).apr = PropertyMock(return_value=Decimal(apr))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            return stmt

        method = HighestInterestRateFirstMethod(Decimal('100.00'))
        statements = [
            fake_statement('2.00', '0.0100', '10.00'),
            fake_statement('5.00', '0.0200', '25.00'),
            fake_statement('2.00', '0.0800', '1234.56'),
            fake_statement('7.00', '0.0300', '3.00'),
        ]
        expected = [
            Decimal('2.00'),
            Decimal('5.00'),
            Decimal('86.00'),
            Decimal('7.00')
        ]
        assert method.find_payments(statements) == expected
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments_total_too_low(self):
        """Highest-interest-first: a 3.00 budget makes find_payments raise TypeError."""
        def fake_statement(min_payment, principal):
            # Properties are attached to the mock's own type.
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            return stmt

        method = HighestInterestRateFirstMethod(Decimal('3.00'))
        statements = [
            fake_statement('2.00', '10.00'),
            fake_statement('5.00', '25.00'),
            fake_statement('2.00', '1234.56'),
            fake_statement('7.00', '3.00'),
        ]
        with pytest.raises(TypeError):
            method.find_payments(statements)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments(self):
        """
        Check the per-statement payments from ``find_payments``.

        The method is constructed with ``Decimal('100.00')``. The expected
        result gives the second, third and fourth statements exactly their
        minimum payments (5.00, 2.00, 7.00) and gives 86.00 to the first
        statement, which is the one configured with the lowest APR (0.0100).
        """
        method = LowestInterestRateFirstMethod(Decimal('100.00'))
        # (minimum_payment, apr, principal) for each mocked statement.
        fixtures = (
            ('2.00', '0.0100', '10.00'),
            ('5.00', '0.0200', '25.00'),
            ('2.00', '0.0800', '1234.56'),
            ('7.00', '0.0300', '3.00'),
        )
        statements = []
        for min_payment, apr, principal in fixtures:
            stmt = Mock(spec_set=CCStatement)
            # PropertyMock must live on the mock's type to act as a property.
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment)
            )
            type(stmt).apr = PropertyMock(return_value=Decimal(apr))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal)
            )
            statements.append(stmt)
        expected = [
            Decimal('86.00'),
            Decimal('5.00'),
            Decimal('2.00'),
            Decimal('7.00')
        ]
        assert method.find_payments(statements) == expected
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments_total_too_low(self):
        """
        ``find_payments`` is expected to raise ``TypeError`` here.

        The method under test is constructed with ``Decimal('3.00')`` and is
        handed four mocked statements whose minimum payments add up to 16.00.
        Only ``minimum_payment`` and ``principal`` are configured on the mocks.
        """
        method = LowestInterestRateFirstMethod(Decimal('3.00'))
        # (minimum_payment, principal) for each mocked statement, in order.
        fixtures = (
            ('2.00', '10.00'),
            ('5.00', '25.00'),
            ('2.00', '1234.56'),
            ('7.00', '3.00'),
        )
        statements = []
        for min_payment, principal in fixtures:
            stmt = Mock(spec_set=CCStatement)
            # PropertyMock must live on the mock's type to act as a property.
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment)
            )
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal)
            )
            statements.append(stmt)
        with pytest.raises(TypeError):
            method.find_payments(statements)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_next_with_transactions(self):
        """
        Check how ``next_with_transactions`` constructs its result.

        With ``biweeklybudget.interest.CCStatement`` patched, the call must
        return what the patched class returns, and the class must have been
        called exactly once with the same interest-calculation and
        minimum-payment objects, the statement's end balance (1.50), the
        billing period's ``next_period`` and the transactions passed in.
        """
        period = Mock(spec_set=_BillingPeriod)
        following_period = Mock(spec_set=_BillingPeriod)
        type(period).next_period = PropertyMock(return_value=following_period)
        interest_calc = Mock(spec_set=_InterestCalculation)
        min_pay_formula = Mock(spec_set=_MinPaymentFormula)
        statement = CCStatement(
            interest_calc,
            Decimal('1.23'),
            min_pay_formula,
            period,
            end_balance=Decimal('1.50'),
            interest_amt=Decimal('0.27')
        )
        replacement = Mock(spec_set=CCStatement)
        with patch(
            'biweeklybudget.interest.CCStatement', autospec=True
        ) as patched_cls:
            patched_cls.return_value = replacement
            result = statement.next_with_transactions({'foo': 'bar'})
        assert result == replacement
        expected_calls = [
            call(
                interest_calc,
                Decimal('1.50'),
                min_pay_formula,
                following_period,
                transactions={'foo': 'bar'}
            )
        ]
        assert patched_cls.mock_calls == expected_calls
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_pay(self):
        """
        Check that ``pay`` goes through ``next_with_transactions``.

        With ``CCStatement.next_with_transactions`` patched (autospec), paying
        98.76 must return the patched method's return value, and that method
        must have been called once with the statement itself and a dict that
        maps the next period's ``payment_date`` (2017-01-15) to the amount.
        """
        current_period = Mock(spec_set=_BillingPeriod)
        upcoming_period = Mock(spec_set=_BillingPeriod)
        type(upcoming_period).payment_date = PropertyMock(
            return_value=date(2017, 1, 15)
        )
        type(current_period).next_period = PropertyMock(
            return_value=upcoming_period
        )
        interest_calc = Mock(spec_set=_InterestCalculation)
        min_pay_formula = Mock(spec_set=_MinPaymentFormula)
        statement = CCStatement(
            interest_calc,
            Decimal('1.23'),
            min_pay_formula,
            current_period,
            end_balance=Decimal('1.50'),
            interest_amt=Decimal('0.27')
        )
        replacement = Mock(spec_set=CCStatement)
        target = 'biweeklybudget.interest.CCStatement.next_with_transactions'
        with patch(target, autospec=True) as patched_method:
            patched_method.return_value = replacement
            result = statement.pay(Decimal('98.76'))
        assert result == replacement
        expected_calls = [
            call(statement, {date(2017, 1, 15): Decimal('98.76')})
        ]
        assert patched_method.mock_calls == expected_calls
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def test__finish_no_info(self):
        """Tests docker runtime finish when not aborted or exited.

        The container returned by the mocked docker client exposes an
        ``attrs`` property whose ``'State'`` entry is ``None``; ``_finish``
        is only required to complete without raising.
        """
        # Access to a protected member
        # pylint: disable=W0212
        treadmill.runtime.save_app(self.manifest, self.data_dir)

        docker_client = mock.MagicMock()
        docker.from_env.return_value = docker_client

        fake_container = mock.MagicMock()
        docker_client.containers.get.return_value = fake_container

        # PropertyMock has to be set on the mock's type to act as a property.
        container_attrs = {'State': None}
        type(fake_container).attrs = mock.PropertyMock(
            return_value=container_attrs
        )

        runtime_under_test = runtime.DockerRuntime(
            self.tm_env, self.container_dir
        )

        # Should not throw any exception
        runtime_under_test._finish()
项目:charm-aodh    作者:openstack    | 项目源码 | 文件源码
def test_setup_endpoint(self):
        """
        Check the arguments ``setup_endpoint`` passes to keystone.

        Five ``AodhCharm`` attributes are replaced with ``PropertyMock``
        objects returning fixed values; ``register_endpoints`` on the keystone
        mock must then be called exactly once with those values in order.
        """
        # (attribute patched on AodhCharm, value its PropertyMock returns)
        patched_values = (
            ('service_name', 'type1'),
            ('region', 'region1'),
            ('public_url', 'public_url'),
            ('internal_url', 'internal_url'),
            ('admin_url', 'admin_url'),
        )
        for attr_name, _value in patched_values:
            self.patch_object(aodh.AodhCharm, attr_name,
                              new_callable=mock.PropertyMock)
        # The patched mocks are read back from ``self`` under the same names.
        for attr_name, value in patched_values:
            getattr(self, attr_name).return_value = value
        keystone_mock = mock.MagicMock()
        aodh.setup_endpoint(keystone_mock)
        keystone_mock.register_endpoints.assert_called_once_with(
            'type1', 'region1', 'public_url', 'internal_url', 'admin_url')
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def test_vimeo_get_default_transcripts(self):
        """
        Test Vimeo's default transcripts fetching (positive scenario).

        The API client and the texttracks parser are both mocked; the test
        checks the URL requested, the data handed to the parser, and the
        returned transcripts/message pair.
        """
        # Arrange
        api_payload = {"data": [{"test_key": "test_value"}]}
        expected_message = _('Default transcripts successfully fetched from a video platform.')

        with patch.object(self.vimeo_player, 'api_client') as client_mock:
            with patch.object(self.vimeo_player, 'parse_vimeo_texttracks') as parser_mock:
                type(client_mock).access_token = PropertyMock(return_value="test_token")
                client_mock.get.return_value = api_payload
                parser_mock.return_value = api_payload["data"]

                # Act
                result = self.vimeo_player.get_default_transcripts(video_id="test_video_id")
                transcripts, message = result

                # Assert
                client_mock.get.assert_called_with(
                    'https://api.vimeo.com/videos/test_video_id/texttracks'
                )
                parser_mock.assert_called_with(api_payload["data"])

                self.assertIsInstance(transcripts, list)
                self.assertEqual(message, expected_message)
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def test_vimeo_get_default_transcripts_no_token(self):
        """
        Test Vimeo's default transcripts fetching without provided API token.

        The API client's ``access_token`` property is mocked to return
        ``None``; ``get_default_transcripts`` must raise
        ``vimeo.VimeoApiClientError`` carrying the expected message.
        """
        # Arrange
        failure_message = _('No API credentials provided.')

        with patch.object(self.vimeo_player, 'api_client') as api_client_mock:
            type(api_client_mock).access_token = PropertyMock(return_value=None)

            # Act
            with self.assertRaises(vimeo.VimeoApiClientError) as raised:
                self.vimeo_player.get_default_transcripts()

            # Assert
            # This check must sit outside the ``assertRaises`` block: once the
            # call above raises, the remainder of that block is skipped, so an
            # assertion placed inside it would never run.
            self.assertEqual(str(raised.exception), failure_message)
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def test_vimeo_get_default_transcripts_no_data(self):
        """
        Test Vimeo's default transcripts fetching with no data returned.

        The mocked API client returns an empty list; the method must answer
        with an empty transcripts list and the "no default transcripts" message.
        """
        # Arrange
        empty_payload = []
        expected_message = _('There are no default transcripts for the video on the video platform.')

        with patch.object(self.vimeo_player, 'api_client') as client_mock:
            type(client_mock).access_token = PropertyMock(return_value="test_token")
            client_mock.get.return_value = empty_payload

            # Act
            result = self.vimeo_player.get_default_transcripts(video_id="test_video_id")
            transcripts, message = result

            # Assert
            self.assertEqual(transcripts, [])
            self.assertEqual(message, expected_message)
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def test_vimeo_get_default_transcripts_parsing_failure(self):
        """
        Test Vimeo's default transcripts fetching with data parsing failure.

        The texttracks parser is mocked to raise ``vimeo.VimeoApiClientError``;
        the method must return an empty transcripts list together with the
        error's message.
        """
        # Arrange
        api_payload = {"data": [{"test_key": "test_value"}]}
        expected_message = "test_message"

        with patch.object(self.vimeo_player, 'api_client') as client_mock:
            with patch.object(self.vimeo_player, 'parse_vimeo_texttracks') as parser_mock:
                type(client_mock).access_token = PropertyMock(return_value="test_token")
                client_mock.get.return_value = api_payload
                parser_mock.side_effect = vimeo.VimeoApiClientError(expected_message)

                # Act
                result = self.vimeo_player.get_default_transcripts(video_id="test_video_id")
                transcripts, message = result

                # Assert
                self.assertEqual(transcripts, [])
                self.assertEqual(message, expected_message)
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def test_course_default_language(self):
        """
        Test xBlock's `course_default_language` property works properly.

        The xBlock's ``runtime`` is patched so that the object returned by
        ``runtime.service(...).get_course(...)`` exposes a mocked ``language``
        property. The test then checks the property value, the arguments of
        the ``service`` call, and how often the mocks were invoked.
        """
        with patch.object(self.xblock, 'runtime') as runtime_mock:
            service_mock = runtime_mock.service
            # Chained assignment: the same PropertyMock is bound to
            # ``lang_mock`` and installed as ``language`` on the *type* of the
            # mocked course object, so that it behaves as a property there.
            lang_mock = type(service_mock.return_value.get_course.return_value).language = PropertyMock(
                return_value='test_lang'
            )
            # Repeats the value already passed to the constructor above.
            lang_mock.return_value = 'test_lang'
            # Note: this PropertyMock is assigned on the xblock instance (not
            # on its type) and is also kept as ``course_id_mock`` for the
            # ``assert_not_called`` check below.
            self.xblock.course_id = course_id_mock = PropertyMock()

            self.assertEqual(self.xblock.course_default_language, 'test_lang')
            service_mock.assert_called_once_with(self.xblock, 'modulestore')
            lang_mock.assert_called_once()
            course_id_mock.assert_not_called()
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def test_srt_to_vtt(self, convert_caps_to_vtt_mock, requests_mock):
        """
        Test xBlock's srt-to-vtt conversion works properly.

        The converter mock returns a fixed string; the handler's response must
        be a ``Response`` whose text equals it, and the converter must have
        been called once with the mocked ``text`` of the ``requests.get`` result.
        """
        # Arrange
        expected_vtt = 'vtt transcripts'
        fake_request = MagicMock()
        convert_caps_to_vtt_mock.return_value = expected_vtt
        text_mock = PropertyMock()
        requests_mock.get.return_value.text = text_mock
        text_mock.return_value = expected_vtt

        # Act
        response = self.xblock.srt_to_vtt(fake_request, 'unused suffix')

        # Assert
        self.assertIsInstance(response, Response)
        self.assertEqual(response.text, expected_vtt)
        convert_caps_to_vtt_mock.assert_called_once_with(text_mock)
项目:streamalert    作者:airbnb    | 项目源码 | 文件源码
def test_dispatch_issue_empty_comment(self, post_mock, get_mock, log_mock):
        """JiraOutput - Dispatch Success, Empty Comment"""
        # GET (issue search) succeeds and reports one existing issue
        search_response = get_mock.return_value
        search_response.status_code = 200
        search_response.json.return_value = {
            'issues': [{'fields': {'summary': 'Bogus'}, 'id': '5000'}]
        }
        # Every POST answers 200; the JSON bodies are, in order: the auth
        # session, an empty dict, and a body carrying an issue id
        post_response = post_mock.return_value
        session_payload = {'session': {'name': 'cookie_name', 'value': 'cookie_value'}}
        type(post_response).status_code = PropertyMock(side_effect=[200, 200, 200])
        post_response.json.side_effect = [session_payload, {}, {'id': 5000}]

        dispatched = self._dispatcher.dispatch(
            descriptor=self.DESCRIPTOR,
            rule_name='rule_name',
            alert=get_alert()
        )
        assert_true(dispatched)

        log_mock.assert_called_with('Successfully sent alert to %s', self.SERVICE)
项目:streamalert    作者:airbnb    | 项目源码 | 文件源码
def test_issue_creation_failure(self, post_mock, get_mock, log_mock):
        """JiraOutput - Issue Creation, Failure"""
        # GET (issue search) succeeds but finds no issues
        search_response = get_mock.return_value
        search_response.status_code = 200
        search_response.json.return_value = {'issues': []}
        # First POST (auth) answers 200, second POST (issue creation) answers 400
        post_response = post_mock.return_value
        type(post_response).status_code = PropertyMock(side_effect=[200, 400])
        session_payload = {'session': {'name': 'cookie_name', 'value': 'cookie_value'}}
        post_response.content = 'some bad content'
        post_response.json.side_effect = [session_payload, {}]

        dispatched = self._dispatcher.dispatch(
            descriptor=self.DESCRIPTOR,
            rule_name='rule_name',
            alert=get_alert()
        )
        assert_false(dispatched)

        log_mock.assert_called_with('Failed to send alert to %s', self.SERVICE)
项目:streamalert    作者:airbnb    | 项目源码 | 文件源码
def test_issue_creation_empty_search(self, post_mock, get_mock, log_mock):
        """JiraOutput - Issue Creation, Failure Empty Search"""
        # GET (issue search) succeeds but the JSON body is an empty dict
        search_response = get_mock.return_value
        search_response.status_code = 200
        search_response.json.return_value = {}
        # First POST (auth) answers 200, second POST (issue creation) answers 400
        post_response = post_mock.return_value
        type(post_response).status_code = PropertyMock(side_effect=[200, 400])
        session_payload = {'session': {'name': 'cookie_name', 'value': 'cookie_value'}}
        post_response.content = 'some bad content'
        post_response.json.side_effect = [session_payload, {}]

        dispatched = self._dispatcher.dispatch(
            descriptor=self.DESCRIPTOR,
            rule_name='rule_name',
            alert=get_alert()
        )
        assert_false(dispatched)

        log_mock.assert_called_with('Failed to send alert to %s', self.SERVICE)
项目:streamalert    作者:airbnb    | 项目源码 | 文件源码
def test_comment_creation_failure(self, post_mock, get_mock, log_mock):
        """JiraOutput - Comment Creation, Failure"""
        # GET (issue search) succeeds and reports one existing issue
        search_response = get_mock.return_value
        search_response.status_code = 200
        search_response.json.return_value = {
            'issues': [{'fields': {'summary': 'Bogus'}, 'id': '5000'}]
        }
        # POST status codes in order: auth 200, comment creation 400,
        # issue creation 200
        post_response = post_mock.return_value
        type(post_response).status_code = PropertyMock(side_effect=[200, 400, 200])
        session_payload = {'session': {'name': 'cookie_name', 'value': 'cookie_value'}}
        post_response.json.side_effect = [session_payload, {'id': 6000}]

        dispatched = self._dispatcher.dispatch(
            descriptor=self.DESCRIPTOR,
            rule_name='rule_name',
            alert=get_alert()
        )
        assert_true(dispatched)

        log_mock.assert_called_with('Encountered an error when adding alert to existing Jira '
                                    'issue %s. Attempting to create new Jira issue.', 5000)
项目:streamalert    作者:airbnb    | 项目源码 | 文件源码
def test_dispatch_success_good_user(self, get_mock, post_mock, log_mock):
        """PagerDutyIncidentOutput - Dispatch Success, Good User"""
        # GET sequence: /users, /users, /services - all answering 200
        get_response = get_mock.return_value
        type(get_response).status_code = PropertyMock(side_effect=[200, 200, 200])
        users_payload = {'users': [{'id': 'valid_user_id'}]}
        services_payload = {'services': [{'id': 'service_id'}]}
        get_response.json.side_effect = [users_payload, users_payload, services_payload]

        # POST to /incidents succeeds
        post_mock.return_value.status_code = 200

        alert_context = {'pagerduty-incident': {'assigned_user': 'valid_user'}}

        dispatched = self._dispatcher.dispatch(
            descriptor=self.DESCRIPTOR,
            rule_name='rule_name',
            alert=get_alert(context=alert_context)
        )
        assert_true(dispatched)

        log_mock.assert_called_with('Successfully sent alert to %s', self.SERVICE)
项目:streamalert    作者:airbnb    | 项目源码 | 文件源码
def test_dispatch_success_good_policy(self, get_mock, post_mock, log_mock):
        """PagerDutyIncidentOutput - Dispatch Success, Good Policy"""
        # GET sequence: /users, /escalation_policies, /services - all answering 200
        get_response = get_mock.return_value
        type(get_response).status_code = PropertyMock(side_effect=[200, 200, 200])
        users_payload = {'users': [{'id': 'user_id'}]}
        policies_payload = {'escalation_policies': [{'id': 'policy_id'}]}
        services_payload = {'services': [{'id': 'service_id'}]}
        get_response.json.side_effect = [users_payload, policies_payload, services_payload]

        # POST to /incidents succeeds
        post_mock.return_value.status_code = 200

        alert_context = {'pagerduty-incident': {'assigned_policy': 'valid_policy'}}

        dispatched = self._dispatcher.dispatch(
            descriptor=self.DESCRIPTOR,
            rule_name='rule_name',
            alert=get_alert(context=alert_context)
        )
        assert_true(dispatched)

        log_mock.assert_called_with('Successfully sent alert to %s', self.SERVICE)
项目:streamalert    作者:airbnb    | 项目源码 | 文件源码
def test_dispatch_success_no_context(self, get_mock, post_mock, log_mock):
        """PagerDutyIncidentOutput - Dispatch Success, No Context"""
        # GET sequence: /users, /escalation_policies, /services - all answering 200
        get_response = get_mock.return_value
        type(get_response).status_code = PropertyMock(side_effect=[200, 200, 200])
        users_payload = {'users': [{'id': 'user_id'}]}
        policies_payload = {'escalation_policies': [{'id': 'policy_id'}]}
        services_payload = {'services': [{'id': 'service_id'}]}
        get_response.json.side_effect = [users_payload, policies_payload, services_payload]

        # POST to /incidents succeeds
        post_mock.return_value.status_code = 200

        # The alert is built without any context
        dispatched = self._dispatcher.dispatch(
            descriptor=self.DESCRIPTOR,
            rule_name='rule_name',
            alert=get_alert()
        )
        assert_true(dispatched)

        log_mock.assert_called_with('Successfully sent alert to %s', self.SERVICE)