我们从Python开源项目中,提取了以下5个代码示例,用于说明如何使用django.test.TransactionTestCase()。
def test_process_no_sieve(self): """ Tests the process method for a chute with no sieve. """ mock_doc_id = 1 email = {'Message-ID': 'abc', 'Subject': 'This is an Urgent Alert'} doc_obj = DocumentObj(data=email) mailchute = MailChute.objects.get(pk=3) mailchute.enabled = True mailchute.munger.process = Mock(return_value=mock_doc_id) doc_id = mailchute.process(doc_obj) mailchute.munger.process.assert_called_once_with(doc_obj) self.assertEqual(doc_id, mock_doc_id) # NOTE: use TransactionTestCase to handle threading
def test_multiprocess_muzzled(self): """ Tests muzzling when multiple duplicate Alerts are being processed concurrently. """ with patch('alerts.models.Alert._format_title', return_value=self.data['content']['subject']): incident_num = 20 alert_count = Alert.objects.count() args = [self.doc_obj] alert = self.email_wdog.process(*args) # check that a new Alert was created self.assertEqual(Alert.objects.count(), alert_count + 1) # NOTE: we can't use multiprocessing with Mocks, # so we have to settle for using threading to mimic concurrency threads = [] for dummy_index in range(incident_num): new_thread = threading.Thread(target=self.email_wdog.process, args=args) threads.append(new_thread) new_thread.start() for thread in threads: thread.join() # NOTE: we can't check Alert counts because saved Alerts # won't be committed in the TransactionTestCase # but we can check that the previous Alert was incremented alert = Alert.objects.get(pk=alert.pk) self.assertEqual(alert.incidents, incident_num + 1)
def __call__(self, test_func): from django.test import TransactionTestCase if isinstance(test_func, type): if not issubclass(test_func, TransactionTestCase): raise Exception( "Only subclasses of Django SimpleTestCase " "can be decorated with override_settings") original_pre_setup = test_func._pre_setup original_post_teardown = test_func._post_teardown def _pre_setup(innerself): self.enable() original_pre_setup(innerself) def _post_teardown(innerself): original_post_teardown(innerself) self.disable() test_func._pre_setup = _pre_setup test_func._post_teardown = _post_teardown return test_func else: @wraps(test_func) def inner(*args, **kwargs): with self: return test_func(*args, **kwargs) return inner
def test_run_migrations_success(self): """ Test the migration success path. """ # Using TransactionTestCase sets up the migrations as set up for the test. # Reset the release_util migrations to the very beginning - i.e. no tables. call_command("migrate", "release_util", "zero", verbosity=0) input_yaml = """ database: 'default', migrations: - [release_util, 0001_initial] - [release_util, 0002_second] - [release_util, 0003_third] initial_states: - [release_util, zero] """ output = [ { 'database': 'default', 'duration': None, 'failed_migration': None, 'migration': 'all', 'output': None, 'succeeded_migrations': [ ['release_util', '0001_initial'], ['release_util', '0002_second'], ['release_util', '0003_third'], ], 'traceback': None, 'succeeded': True, }, ] out_file = tempfile.NamedTemporaryFile(suffix='.yml') in_file = tempfile.NamedTemporaryFile(suffix='.yml') in_file.write(input_yaml.encode('utf-8')) in_file.flush() # Check the stdout output against the expected output. self._check_command_output( cmd='run_migrations', cmd_args=(in_file.name,), cmd_kwargs={'output_file': out_file.name}, output=output, ) in_file.close() # Check the contents of the output file against the expected output. with open(out_file.name, 'r') as f: output_yaml = f.read() parsed_yaml = yaml.safe_load(output_yaml) self.assertTrue(isinstance(parsed_yaml, list)) parsed_yaml = self._null_certain_fields(parsed_yaml) self.assertEqual(yaml.dump(output), yaml.dump(parsed_yaml)) out_file.close()
def test_run_migrations_failure(self, migration_name, migration_output): """ Test the first, second, and last migration failing. """ # Using TransactionTestCase sets up the migrations as set up for the test. # Reset the release_util migrations to the very beginning - i.e. no tables. call_command("migrate", "release_util", "zero", verbosity=0) input_yaml = """ database: 'default', migrations: - [release_util, 0001_initial] - [release_util, 0002_second] - [release_util, 0003_third] initial_states: - [release_util, zero] """ out_file = tempfile.NamedTemporaryFile(suffix='.yml') in_file = tempfile.NamedTemporaryFile(suffix='.yml') in_file.write(input_yaml.encode('utf-8')) in_file.flush() # A bogus class for creating a migration object that will raise a CommandError. class MigrationFail(object): atomic = False def state_forwards(self, app_label, state): pass def database_forwards(self, app_label, schema_editor, from_state, to_state): raise CommandError("Yo") # Insert the bogus object into the first operation of a migration. current_migration_list = release_util.tests.migrations.test_migrations.__dict__[migration_name].__dict__['Migration'].operations current_migration_list.insert(0, MigrationFail()) try: # Check the stdout output. self._check_command_output( cmd="run_migrations", cmd_args=(in_file.name,), cmd_kwargs={'output_file': out_file.name}, output=migration_output, err_output="Migration error: Migration failed for app 'release_util' - migration '{}'.".format(migration_name), exit_value=1 ) finally: # Whether the test passes or fails, always pop the failure migration of the list. current_migration_list.pop(0) in_file.close() out_file.close()