我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 django.db.transaction.TransactionManagementError()。
def schedule_changed(self):
    """Report whether the schedule has changed since the previous check.

    Reads the latest change timestamp from ``self.Changes.last_change()``
    and compares it with ``self._last_timestamp``.  Returns ``True`` only
    when a previous timestamp exists and the new one is strictly greater.
    Once the timestamp has been read successfully it is always stored in
    ``self._last_timestamp``, even if the comparison raises.

    Returns ``False`` (after logging) when the database raises
    ``DatabaseError``; the stored timestamp is left untouched in that case.
    """
    try:
        # If MySQL is running with transaction isolation level
        # REPEATABLE-READ (default), then we won't see changes done by
        # other transactions until the current transaction is
        # committed (Issue #41).
        try:
            transaction.commit()
        except transaction.TransactionManagementError:
            pass  # not in transaction management.
        previous = self._last_timestamp
        latest = self.Changes.last_change()
    except DatabaseError as exc:
        logger.exception('Database gave error: %r', exc)
        return False

    try:
        if not latest:
            return False
        # With no previous timestamp the value is compared against itself,
        # which can never be "newer".
        return bool(latest > (previous or latest))
    finally:
        self._last_timestamp = latest
def validate_in_transaction(connection):
    """Check that `connection` is inside a transaction.

    The check is delegated to ``in_transaction(connection)``, i.e. it
    reflects Django's view of the connection; the database itself is not
    consulted.

    :raise TransactionManagementError: If no transaction is in progress.
    """
    if in_transaction(connection):
        return

    # XXX: GavinPanella 2015-08-07 bug=1482563: This error message is
    # specific to lobjects, but this lives in a general utils module.
    raise TransactionManagementError(
        "PostgreSQL's large object support demands that all interactions "
        "are done in a transaction. Further, lobject() has been known to "
        "segfault when used outside of a transaction. This assertion has "
        "prevented the use of lobject() outside of a transaction. Please "
        "investigate.")
def test_on_signal__no_transaction(self, partial, on_commit):
    # Scenario: the event honors transactions, but on_commit() raises
    # TransactionManagementError (no transaction is being managed), so
    # the prepared callback is expected to be invoked directly.

    # -- arrange --
    evt = self.mock_event('x.y', sender_field=None)
    evt.signal_honors_transaction = True
    evt._on_signal = Mock(name='_on_signal')
    obj = self.Model()
    on_commit.side_effect = TransactionManagementError()
    assert django.TransactionManagementError is TransactionManagementError

    # -- act --
    evt.on_signal(obj, kw=1)

    # -- assert --
    partial.assert_called_once_with(evt._on_signal, obj, {'kw': 1})
    on_commit.assert_called_once_with(partial())
    partial.return_value.assert_called_once_with()
def on_commit(self, fun, *args, **kwargs):
    """Schedule `fun` to run on commit, or run it right away.

    Any extra positional/keyword arguments are bound to `fun` first.  If
    the module-level ``on_commit`` hook is available (not ``None``) the
    callback is handed to it and its result is returned.  When the hook is
    unavailable, or it raises ``TransactionManagementError``, the callback
    is executed immediately and its result is returned instead.
    """
    callback = partial(fun, *args, **kwargs) if (args or kwargs) else fun

    if on_commit is not None:
        try:
            return on_commit(callback)
        except TransactionManagementError:
            # Not in transaction management: fall through and execute now.
            pass

    return callback()
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when called while
    ``self.in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback

    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store `rollback` as the "needs rollback" flag -- for *advanced use*
    only.

    Raises TransactionManagementError when called while
    ``self.in_atomic_block`` is false; the flag is left unchanged then.
    """
    inside_atomic = self.in_atomic_block
    if not inside_atomic:
        raise TransactionManagementError(
            "The rollback flag doesn't work outside of an 'atomic' block.")

    self.needs_rollback = rollback
def validate_no_atomic_block(self):
    """
    Guard for operations that must not run inside an 'atomic' block.

    Returns None when ``self.in_atomic_block`` is false and raises
    TransactionManagementError otherwise.
    """
    if not self.in_atomic_block:
        return

    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Refuse to continue once the current transaction is flagged for rollback.

    Returns None when ``self.needs_rollback`` is false and raises
    TransactionManagementError otherwise.
    """
    if not self.needs_rollback:
        return

    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")

# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """Register `func` to run on commit, or run it immediately.

    * Inside an atomic block: `func` is queued in ``self.run_on_commit``
      together with a snapshot (set) of the current savepoint ids.
    * Outside an atomic block with autocommit on: `func` is called at once.
    * Outside an atomic block with autocommit off:
      TransactionManagementError is raised.
    """
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        active_savepoints = set(self.savepoint_ids)
        self.run_on_commit.append((active_savepoints, func))
        return

    if self.get_autocommit():
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
        return

    raise TransactionManagementError('on_commit() cannot be used in manual transaction management')
def release_lock(name):
    """
    Release the lock called `name` for everybody by deleting its cache key.

    The key is deleted unconditionally, so the lock is released even if it
    was never acquired.  If the deletion raises TransactionManagementError
    the failure is logged as an error and not re-raised.
    """
    # Why the exception can happen: unit tests run inside atomic blocks,
    # and when the wrapped code raised an IntegrityError the transaction
    # needs to be rolled back.  No query may be executed inside an atomic
    # block in that state, so releasing the lock fails with a
    # TransactionManagementError.
    try:
        cache.delete(name)
    except TransactionManagementError:
        logger.error("Could not release lock %s", name)
def atomic_save(model):
    """Save `model` inside ``transaction.atomic()``.

    If that attempt raises ``transaction.TransactionManagementError``, a
    plain ``model.save()`` is attempted instead.
    """
    try:
        atomic_block = transaction.atomic()
        with atomic_block:
            model.save()
    except transaction.TransactionManagementError:
        # sqlite isn't happy
        model.save()
def test__crashes_on_enter_if_hooks_exist(self):
    # Entering `post_commit_hooks` while a hook is already registered is
    # expected to raise TransactionManagementError.
    pending_hook = Deferred()
    post_commit_hooks.add(pending_hook)

    with ExpectedException(TransactionManagementError), post_commit_hooks:
        pass

    # The hook has been cancelled, but CancelledError is suppressed in
    # hooks, so we don't see it here.
    self.assertThat(pending_hook, IsFiredDeferred())
    # The hook list is cleared so that the exception is raised only once.
    self.assertThat(post_commit_hooks.hooks, HasLength(0))
def test__crashes_if_hooks_exist_before_entering_transaction(self):
    # A hook registered before the transactional function is called is
    # expected to make that call raise TransactionManagementError.
    post_commit(lambda failure: None)
    transactional_noop = orm.transactional(lambda: None)

    self.assertRaises(TransactionManagementError, transactional_noop)

    # The hook list is cleared so that the exception is raised only once.
    self.assertThat(post_commit_hooks.hooks, HasLength(0))
def test__crashes_if_not_already_within_transaction(self):
    # Using savepoint() here is expected to raise
    # TransactionManagementError.
    with ExpectedException(TransactionManagementError), savepoint():
        pass
def test__explodes_when_no_transaction_is_active(self):
    # validate_in_transaction(connection) is expected to raise here.
    expected_error = TransactionManagementError
    self.assertRaises(expected_error, validate_in_transaction, connection)
def __enter__(self):
    """Fail hard if hooks are already registered when entering.

    When ``self.hooks`` is non-empty, a textual description of the hooks
    is captured, ``self.reset()`` is called, and TransactionManagementError
    is raised carrying that description.  Otherwise nothing happens.
    """
    if len(self.hooks) == 0:
        return

    # Capture a textual description of the hooks to help us understand
    # why this is about to blow oodles of egg custard in our faces.
    hook_lines = gen_description_of_hooks(self.hooks)
    description = "\n".join(hook_lines)

    # Crash when there are orphaned post-commit hooks. These might
    # only turn up in testing, where transactions are managed by the
    # test framework instead of this decorator. We need to fail hard
    # -- not just warn about it -- to ensure it gets fixed.
    self.reset()
    raise TransactionManagementError(
        "Orphaned post-commit hooks found:\n" + description)
def after_feature(context, feature):
    """Tear down after a feature.

    Runs the test case's tearDown, restores the attributes that were saved
    on `context` (``old_*``) back onto the classes they were taken from,
    tears down the test databases and test environment, and restores the
    original database names on the connections.  `feature` is not used.
    """
    context.test_case.tearDown()

    from chroma_core.services.job_scheduler.job_scheduler_client import JobSchedulerClient
    from chroma_core.models import Command

    # Restore the originals saved on the context.
    JobSchedulerClient.create_host_ssh = context.old_create_host_ssh
    JobSchedulerClient.create_targets = context.old_create_targets
    JobSchedulerClient.create_filesystem = context.old_create_filesystem
    JobSchedulerClient.command_run_jobs = context.old_run_jobs
    Command.set_state = context.old_set_state

    # If one of the steps fails, an exception will be thrown and the
    # transaction rolled back.  In which case, this teardown will cause
    # a TransactionManagementError which we don't care about.
    from django.db.transaction import TransactionManagementError
    try:
        context.test_case._post_teardown()
    except TransactionManagementError:
        pass

    context.runner.teardown_databases(context.old_db_config)
    context.runner.teardown_test_environment()

    # As of Django 1.4, teardown_databases() no longer restores the
    # original db name in the connection's settings dict.  The reasoning
    # is documented in https://code.djangoproject.com/ticket/10868, and
    # their concerns are understandable.  In our case, however, we're not
    # going on to do anything here which might affect the production DB.
    # Therefore, the following hack restores pre-1.4 behavior:
    for db_connection, original_name, _destroy in context.old_db_config[0]:
        db_connection.settings_dict['NAME'] = original_name

    from chroma_cli.api import ApiHandle
    ApiHandle.ApiClient = context.old_api_client

    from chroma_api.authentication import CsrfAuthentication
    CsrfAuthentication.is_authenticated = context.old_is_authenticated

    from django.contrib.contenttypes.models import ContentType
    ContentType.objects.clear_cache()

    from chroma_core.services.job_scheduler.job_scheduler_client import JobSchedulerClient
    JobSchedulerClient.test_host_contact = context.old_test_host_contact

#
#--def before_scenario(context, scenario):
#--    # Set up the scenario test environment
#--    context.runner.setup_test_environment()
#--    # We must set up and tear down the entire database between
#--    # scenarios. We can't just use db transactions, as Django's
#--    # TestClient does, if we're doing full-stack tests with Mechanize,
#--    # because Django closes the db connection after finishing the HTTP
#--    # response.
#--    context.old_db_config = context.runner.setup_databases()
#--def after_scenario(context, scenario):
#--    # Tear down the scenario test environment.
#--    context.runner.teardown_databases(context.old_db_config)
#--    context.runner.teardown_test_environment()
#--    # Bob's your uncle.