The following 49 code examples, extracted from open-source Python projects, illustrate how to use unittest2.SkipTest().
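Before the project examples, a minimal sketch of the underlying pattern may help: raising SkipTest from a test method, setUp, setUpClass, or setUpModule marks the test as skipped rather than failed. The optional_dependency module below is a hypothetical placeholder for whatever resource a real test would need; substitute unittest2 for unittest if you rely on the backport.

import unittest

try:
    import optional_dependency  # hypothetical placeholder for a real requirement
except ImportError:
    optional_dependency = None


class SkipTestExample(unittest.TestCase):
    def test_requires_dependency(self):
        if optional_dependency is None:
            # The runner records this test as skipped, not failed
            raise unittest.SkipTest("optional_dependency is not installed")
        self.assertTrue(hasattr(optional_dependency, '__name__'))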
def download_test_files_if_not_present(self):
    '''
    Download %s file at G-node for testing
    url_for_tests is global at beginning of this file.
    ''' % self.ioclass.__name__
    if not self.use_network:
        raise unittest.SkipTest("Requires download of data from the web")

    url = url_for_tests + self.shortname
    try:
        make_all_directories(self.files_to_download, self.local_test_dir)
        download_test_file(self.files_to_download, self.local_test_dir, url)
    except IOError as exc:
        raise unittest.SkipTest(exc)
def test_paged_result_handling(self):
    if PROTOCOL_VERSION < 2:
        raise unittest.SkipTest("Paging requires native protocol 2+, currently using: {0}".format(PROTOCOL_VERSION))

    # addresses #225
    class PagingTest(Model):
        id = columns.Integer(primary_key=True)
        val = columns.Integer()

    sync_table(PagingTest)

    PagingTest.create(id=1, val=1)
    PagingTest.create(id=2, val=2)

    session = get_session()
    with mock.patch.object(session, 'default_fetch_size', 1):
        results = PagingTest.objects()[:]

    assert len(results) == 2
def setUp(self):
    """
    Test is skipped if run with native protocol version <4
    """
    self.support_v5 = True
    if PROTOCOL_VERSION < 4:
        raise unittest.SkipTest(
            "Native protocol 4,0+ is required for custom payloads, currently using %r"
            % (PROTOCOL_VERSION,))
    try:
        self.cluster = Cluster(protocol_version=ProtocolVersion.MAX_SUPPORTED,
                               allow_beta_protocol_version=True)
        self.session = self.cluster.connect()
    except NoHostAvailable:
        log.info("Protocol Version 5 not supported,")
        self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        self.session = self.cluster.connect()
        self.support_v5 = False

    self.nodes_currently_failing = []
    self.node1, self.node2, self.node3 = get_cluster().nodes.values()
def setUp(self):
    """
    Test is skipped if run with cql version < 2
    """
    if PROTOCOL_VERSION < 2:
        raise unittest.SkipTest(
            "Protocol 2.0+ is required for Lightweight transactions, currently testing against %r"
            % (PROTOCOL_VERSION,))

    self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    self.session = self.cluster.connect()

    ddl = '''
        CREATE TABLE test3rf.lwt (
            k int PRIMARY KEY,
            v int )'''
    self.session.execute(ddl)
def test_cql_compatibility(self):
    if CASS_SERVER_VERSION >= (3, 0):
        raise unittest.SkipTest("cql compatibility does not apply Cassandra 3.0+")

    # having more than one non-PK column is okay if there aren't any
    # clustering columns
    create_statement = self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True)
    self.session.execute(create_statement)
    tablemeta = self.get_table_metadata()

    self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key])
    self.assertEqual([], tablemeta.clustering_key)
    self.assertEqual([u'a', u'b', u'c', u'd'], sorted(tablemeta.columns.keys()))

    self.assertTrue(tablemeta.is_cql_compatible)

    # ... but if there are clustering columns, it's not CQL compatible.
    # This is a hacky way to simulate having clustering columns.
    tablemeta.clustering_key = ["foo", "bar"]
    tablemeta.columns["foo"] = None
    tablemeta.columns["bar"] = None
    self.assertFalse(tablemeta.is_cql_compatible)
def test_replicas(self):
    """
    Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace
    """
    if murmur3 is None:
        raise unittest.SkipTest('the murmur3 extension is not available')

    cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), [])

    cluster.connect('test3rf')

    self.assertNotEqual(list(cluster.metadata.get_replicas('test3rf', six.b('key'))), [])
    host = list(cluster.metadata.get_replicas('test3rf', six.b('key')))[0]
    self.assertEqual(host.datacenter, 'dc1')
    self.assertEqual(host.rack, 'r1')
    cluster.shutdown()
def test_paged_result_handling(self):
    if PROTOCOL_VERSION < 2:
        raise unittest.SkipTest(
            "Paging requires native protocol 2+, "
            "currently using: {0}".format(PROTOCOL_VERSION)
        )

    # addresses #225
    class PagingTest(Model):
        id = columns.Integer(primary_key=True)
        val = columns.Integer()

    sync_table(self.conn, PagingTest)

    PagingTest.create(self.conn, id=1, val=1)
    PagingTest.create(self.conn, id=2, val=2)

    with mock.patch.object(self.conn.session, 'default_fetch_size', 1):
        results = PagingTest.objects().find_all(self.conn)

    assert len(results) == 2

    drop_table(self.conn, PagingTest)
def test_skiptest_in_setupclass(self):
    class Test(unittest2.TestCase):
        @classmethod
        def setUpClass(cls):
            raise unittest2.SkipTest('foo')
        def test_one(self):
            pass
        def test_two(self):
            pass

    result = self.runTests(Test)
    self.assertEqual(result.testsRun, 0)
    self.assertEqual(len(result.errors), 0)
    self.assertEqual(len(result.skipped), 1)
    skipped = result.skipped[0][0]
    self.assertEqual(str(skipped),
                     'setUpClass (%s.%s)' % (__name__,
                                             getattr(Test, '__qualname__', Test.__name__)))
def test_skiptest_in_setupmodule(self):
    class Test(unittest2.TestCase):
        def test_one(self):
            pass
        def test_two(self):
            pass

    class Module(object):
        @staticmethod
        def setUpModule():
            raise unittest2.SkipTest('foo')

    Test.__module__ = 'Module'
    sys.modules['Module'] = Module

    result = self.runTests(Test)
    self.assertEqual(result.testsRun, 0)
    self.assertEqual(len(result.errors), 0)
    self.assertEqual(len(result.skipped), 1)
    skipped = result.skipped[0][0]
    self.assertEqual(str(skipped), 'setUpModule (Module)')
def test_discover_with_init_module_that_raises_SkipTest_on_import(self):
    vfs = {abspath('/foo'): ['my_package'],
           abspath('/foo/my_package'): ['__init__.py', 'test_module.py']}
    self.setup_import_issue_package_tests(vfs)
    import_calls = []
    def _get_module_from_name(name):
        import_calls.append(name)
        raise unittest.SkipTest('skipperoo')
    loader = unittest.TestLoader()
    loader._get_module_from_name = _get_module_from_name
    suite = loader.discover(abspath('/foo'))

    self.assertIn(abspath('/foo'), sys.path)
    self.assertEqual(suite.countTestCases(), 1)
    result = unittest.TestResult()
    suite.run(result)
    self.assertEqual(len(result.skipped), 1)
    self.assertEqual(result.testsRun, 1)
    self.assertEqual(import_calls, ['my_package'])

    # Check picklability
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        pickle.loads(pickle.dumps(suite, proto))
def assert_garbage_collect_test_after_run(self, TestSuiteClass):
    if not unittest.BaseTestSuite._cleanup:
        raise unittest.SkipTest("Suite cleanup is disabled")

    class Foo(unittest.TestCase):
        def test_nothing(self):
            pass
    test = Foo('test_nothing')
    wref = weakref.ref(test)

    suite = TestSuiteClass([wref()])
    suite.run(unittest.TestResult())

    del test

    # for the benefit of non-reference counting implementations
    gc.collect()

    self.assertEqual(suite._tests, [None])
    self.assertIsNone(wref())
def testSocketAuthInstallPlugin(self):
    # needs plugin. lets install it.
    cur = self.connections[0].cursor()
    try:
        cur.execute("install plugin auth_socket soname 'auth_socket.so'")
        TestAuthentication.socket_found = True
        self.socket_plugin_name = 'auth_socket'
        self.realtestSocketAuth()
    except pymysql.err.InternalError:
        try:
            cur.execute("install soname 'auth_socket'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'unix_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            TestAuthentication.socket_found = False
            raise unittest2.SkipTest('we couldn\'t install the socket plugin')
    finally:
        if TestAuthentication.socket_found:
            cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def test_json(self):
    args = self.databases[0].copy()
    args["charset"] = "utf8mb4"
    conn = pymysql.connect(**args)
    if not self.mysql_server_is(conn, (5, 7, 0)):
        raise SkipTest("JSON type is not supported on MySQL <= 5.6")

    self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
    cur = conn.cursor()

    json_str = u'{"hello": "?????"}'
    cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
    cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
    res = cur.fetchone()[0]
    self.assertEqual(json.loads(res), json.loads(json_str))

    cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
    res = cur.fetchone()[0]
    self.assertEqual(json.loads(res), json.loads(json_str))
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ then automatically return True.
    The possibility of False being returned occurs when regrtest.py is
    executing.
    """
    if resource == 'gui' and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe(1).f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the %r resource not enabled" % resource
        raise ResourceDenied(msg)
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.
def get_test_client(nowait=False, **kwargs):
    # construct kwargs from the environment
    kw = {'timeout': 30}
    if 'TEST_ES_CONNECTION' in os.environ:
        from elasticsearch import connection
        kw['connection_class'] = getattr(connection, os.environ['TEST_ES_CONNECTION'])

    kw.update(kwargs)
    client = Elasticsearch([os.environ.get('TEST_ES_SERVER', {})], **kw)

    # wait for yellow status
    for _ in range(1 if nowait else 100):
        try:
            client.cluster.health(wait_for_status='yellow')
            return client
        except ConnectionError:
            time.sleep(.1)
    else:
        # timeout
        raise SkipTest("Elasticsearch failed to start.")
def setUpClass(cls):
    ReusedPySparkTestCase.setUpClass()
    cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
    try:
        cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
    except py4j.protocol.Py4JError:
        cls.tearDownClass()
        raise unittest.SkipTest("Hive is not available")
    except TypeError:
        cls.tearDownClass()
        raise unittest.SkipTest("Hive is not available")
    os.unlink(cls.tempdir.name)
    _scala_HiveContext =\
        cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
    cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
    cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
    cls.df = cls.sc.parallelize(cls.testData).toDF()
def quiet_run(self, result, func, *args, **kwargs):
    try:
        func(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except unittest.SkipTest as e:
        if hasattr(result, 'addSkip'):
            result.addSkip(self, str(e))
        else:
            warnings.warn("TestResult has no addSkip method, skips not reported",
                          RuntimeWarning, 2)
            result.addSuccess(self)
        return False
    except:
        result.addError(self, self.__exc_info())
        return False
    return True
def setUp(self):
    self.dirname = os.path.join(tempfile.gettempdir(),
                                'files_for_testing_neo', 'klustakwik/test1')
    if not os.path.exists(self.dirname):
        raise unittest.SkipTest('data directory does not exist: ' + self.dirname)
def test1(self):
    """Tests that files can be loaded by basename"""
    kio = KlustaKwikIO(filename=os.path.join(self.dirname, 'basename'))
    if not BaseTestIO.use_network:
        raise unittest.SkipTest("Requires download of data from the web")

    fetfiles = kio._fp.read_filenames('fet')
    self.assertEqual(len(fetfiles), 2)
    self.assertEqual(os.path.abspath(fetfiles[0]),
                     os.path.abspath(os.path.join(self.dirname, 'basename.fet.0')))
    self.assertEqual(os.path.abspath(fetfiles[1]),
                     os.path.abspath(os.path.join(self.dirname, 'basename.fet.1')))
def test3(self):
    """Tests that files can be loaded by basename2"""
    kio = KlustaKwikIO(filename=os.path.join(self.dirname, 'basename2'))
    if not BaseTestIO.use_network:
        raise unittest.SkipTest("Requires download of data from the web")

    clufiles = kio._fp.read_filenames('clu')
    self.assertEqual(len(clufiles), 1)
    self.assertEqual(os.path.abspath(clufiles[1]),
                     os.path.abspath(os.path.join(self.dirname, 'basename2.clu.1')))
def setUp(self):
    self.dirname = os.path.join(tempfile.gettempdir(),
                                'files_for_testing_neo', 'klustakwik/test2')
    if not os.path.exists(self.dirname):
        raise unittest.SkipTest('data directory does not exist: ' + self.dirname)
def setUp(self):
    self.dirname = os.path.join(tempfile.gettempdir(),
                                'files_for_testing_neo', 'klustakwik/test3')
    if not os.path.exists(self.dirname):
        raise unittest.SkipTest('data directory does not exist: ' + self.dirname)
def setUp(self):
    super(CommonTests, self).setUp()

    data_dir = os.path.join(self.local_test_dir,
                            'Cheetah_v{}'.format(self.cheetah_version))
    self.sn = os.path.join(data_dir, 'original_data')
    self.pd = os.path.join(data_dir, 'plain_data')
    if not os.path.exists(self.sn):
        raise unittest.SkipTest('data file does not exist:' + self.sn)
def setUp(self):
    BaseTestIO.setUp(self)
    if sys.platform.startswith('win'):
        distantfile = 'http://download.multichannelsystems.com/download_data/software/neuroshare/nsMCDLibrary_3.7b.zip'
        localfile = os.path.join(tempfile.gettempdir(), 'nsMCDLibrary_3.7b.zip')
        if not os.path.exists(localfile):
            urlretrieve(distantfile, localfile)

        if platform.architecture()[0].startswith('64'):
            self.dllname = os.path.join(tempfile.gettempdir(),
                                        'Matlab/Matlab-Import-Filter/Matlab_Interface/nsMCDLibrary64.dll')
            if not os.path.exists(self.dllname):
                zip = zipfile.ZipFile(localfile)
                zip.extract('Matlab/Matlab-Import-Filter/Matlab_Interface/nsMCDLibrary64.dll',
                            path=tempfile.gettempdir())
        else:
            self.dllname = os.path.join(tempfile.gettempdir(),
                                        'Matlab/Matlab-Import-Filter/Matlab_Interface/nsMCDLibrary.dll')
            if not os.path.exists(self.dllname):
                zip = zipfile.ZipFile(localfile)
                zip.extract('Matlab/Matlab-Import-Filter/Matlab_Interface/nsMCDLibrary.dll',
                            path=tempfile.gettempdir())

    elif sys.platform.startswith('linux'):
        if platform.architecture()[0].startswith('64'):
            distantfile = 'http://download.multichannelsystems.com/download_data/software/neuroshare/nsMCDLibrary_Linux64_3.7b.tar.gz'
            localfile = os.path.join(tempfile.gettempdir(), 'nsMCDLibrary_Linux64_3.7b.tar.gz')
        else:
            distantfile = 'http://download.multichannelsystems.com/download_data/software/neuroshare/nsMCDLibrary_Linux32_3.7b.tar.gz'
            localfile = os.path.join(tempfile.gettempdir(), 'nsMCDLibrary_Linux32_3.7b.tar.gz')
        if not os.path.exists(localfile):
            urlretrieve(distantfile, localfile)

        self.dllname = os.path.join(tempfile.gettempdir(),
                                    'nsMCDLibrary/nsMCDLibrary.so')
        if not os.path.exists(self.dllname):
            tar = tarfile.open(localfile)
            tar.extract('nsMCDLibrary/nsMCDLibrary.so',
                        path=tempfile.gettempdir())

    else:
        raise unittest.SkipTest("Not currently supported on OS X")