Python django.db.transaction 模块,TransactionManagementError() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.transaction.TransactionManagementError()。
def schedule_changed(self):
    """Report whether the stored schedule changed since the previous check.

    Reads ``self.Changes.last_change()`` and compares it with the value
    remembered in ``self._last_timestamp``; the remembered value is then
    replaced by the one just read.

    Returns True when the freshly read timestamp is truthy and greater than
    the remembered one. Returns False otherwise, including when the database
    raises ``DatabaseError`` (the error is logged and ``_last_timestamp`` is
    left untouched in that case).
    """
    try:
        # If MySQL is running with transaction isolation level
        # REPEATABLE-READ (default), then we won't see changes done by
        # other transactions until the current transaction is
        # committed (Issue #41).
        try:
            transaction.commit()
        except transaction.TransactionManagementError:
            pass  # not in transaction management.
        previous = self._last_timestamp
        latest = self.Changes.last_change()
    except DatabaseError as exc:
        logger.exception('Database gave error: %r', exc)
        return False
    try:
        # Without a remembered timestamp the value is compared with itself,
        # so the first observation is not reported as a change.
        return bool(latest and latest > (previous or latest))
    finally:
        # Remember what was just read even if the comparison raised.
        self._last_timestamp = latest
def validate_in_transaction(connection):
    """Ensure that `connection` is within a transaction.

    The decision is delegated to `in_transaction(connection)`, so this only
    reflects Django's perspective on the situation; it does not check that
    the database agrees with Django.

    :raise TransactionManagementError: If no transaction is in progress.
    """
    if in_transaction(connection):
        return
    # XXX: GavinPanella 2015-08-07 bug=1482563: This error message is
    # specific to lobjects, but this lives in a general utils module.
    raise TransactionManagementError(
        "PostgreSQL's large object support demands that all interactions "
        "are done in a transaction. Further, lobject() has been known to "
        "segfault when used outside of a transaction. This assertion has "
        "prevented the use of lobject() outside of a transaction. Please "
        "investigate.")
def test_on_signal__no_transaction(self, partial, on_commit):
    # Scenario: the event honours transactions (signal_honors_transaction)
    # but on_commit() reports that no transaction is being managed.
    # Sanity check: the module under test exposes the very same exception.
    assert django.TransactionManagementError is TransactionManagementError

    evt = self.mock_event('x.y', sender_field=None)
    evt.signal_honors_transaction = True
    evt._on_signal = Mock(name='_on_signal')
    obj = self.Model()
    on_commit.side_effect = TransactionManagementError()

    evt.on_signal(obj, kw=1)

    # The handler is wrapped in a partial, handed to on_commit(), and,
    # because on_commit() raised, invoked directly with no arguments.
    partial.assert_called_once_with(evt._on_signal, obj, {'kw': 1})
    on_commit.assert_called_once_with(partial())
    partial.return_value.assert_called_once_with()
def on_commit(self, fun, *args, **kwargs):
    """Hand ``fun`` to the module-level ``on_commit`` hook, or call it now.

    Any extra positional/keyword arguments are bound to ``fun`` with
    ``partial`` first. If the module-level ``on_commit`` is ``None``, or it
    raises ``TransactionManagementError``, the callable is invoked
    immediately and its result returned; otherwise the hook's return value
    is returned.
    """
    callback = partial(fun, *args, **kwargs) if (args or kwargs) else fun
    if on_commit is not None:
        try:
            return on_commit(callback)
        except TransactionManagementError:
            # Not in transaction management: fall through and execute now.
            pass
    return callback()
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store ``rollback`` as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if an atomic block is active.

    Returns None when ``in_atomic_block`` is false.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if the "needs rollback" flag is set.

    Returns None when ``needs_rollback`` is false.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """
    Register ``func`` for execution on commit, or call it right away.

    * Inside an 'atomic' block, ``func`` is appended to ``run_on_commit``
      paired with a set copy of the current ``savepoint_ids``.
    * Outside an 'atomic' block with autocommit enabled, ``func`` is called
      immediately.
    * Outside an 'atomic' block with autocommit disabled,
      TransactionManagementError is raised.
    """
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        active_savepoints = set(self.savepoint_ids)
        self.run_on_commit.append((active_savepoints, func))
        return
    if self.get_autocommit():
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
        return
    raise TransactionManagementError('on_commit() cannot be used in manual transaction management')
def release_lock(name):
    """
    Release the lock called ``name`` for everyone.

    The cache entry is deleted unconditionally, so the lock ends up released
    even if it was never acquired. A TransactionManagementError raised by
    the delete is logged and not propagated.
    """
    # In unit tests, and whenever the wrapped code raises an IntegrityError,
    # deleting the cache entry results in a TransactionManagementError: unit
    # tests run inside atomic blocks, and no query can be executed inside an
    # atomic block once its transaction needs to be rolled back.
    try:
        cache.delete(name)
    except TransactionManagementError:
        logger.error("Could not release lock %s", name)
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store ``rollback`` as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if an atomic block is active.

    Returns None when ``in_atomic_block`` is false.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if the "needs rollback" flag is set.

    Returns None when ``needs_rollback`` is false.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store ``rollback`` as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if an atomic block is active.

    Returns None when ``in_atomic_block`` is false.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if the "needs rollback" flag is set.

    Returns None when ``needs_rollback`` is false.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """
    Register ``func`` for execution on commit, or call it right away.

    * Inside an 'atomic' block, ``func`` is appended to ``run_on_commit``
      paired with a set copy of the current ``savepoint_ids``.
    * Outside an 'atomic' block with autocommit enabled, ``func`` is called
      immediately.
    * Outside an 'atomic' block with autocommit disabled,
      TransactionManagementError is raised.
    """
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        active_savepoints = set(self.savepoint_ids)
        self.run_on_commit.append((active_savepoints, func))
        return
    if self.get_autocommit():
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
        return
    raise TransactionManagementError('on_commit() cannot be used in manual transaction management')
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store ``rollback`` as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if an atomic block is active.

    Returns None when ``in_atomic_block`` is false.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if the "needs rollback" flag is set.

    Returns None when ``needs_rollback`` is false.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """
    Register ``func`` for execution on commit, or call it right away.

    * Inside an 'atomic' block, ``func`` is appended to ``run_on_commit``
      paired with a set copy of the current ``savepoint_ids``.
    * Outside an 'atomic' block with autocommit enabled, ``func`` is called
      immediately.
    * Outside an 'atomic' block with autocommit disabled,
      TransactionManagementError is raised.
    """
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        active_savepoints = set(self.savepoint_ids)
        self.run_on_commit.append((active_savepoints, func))
        return
    if self.get_autocommit():
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
        return
    raise TransactionManagementError('on_commit() cannot be used in manual transaction management')
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store ``rollback`` as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if an atomic block is active.

    Returns None when ``in_atomic_block`` is false.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if the "needs rollback" flag is set.

    Returns None when ``needs_rollback`` is false.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """
    Register ``func`` for execution on commit, or call it right away.

    * Inside an 'atomic' block, ``func`` is appended to ``run_on_commit``
      paired with a set copy of the current ``savepoint_ids``.
    * Outside an 'atomic' block with autocommit enabled, ``func`` is called
      immediately.
    * Outside an 'atomic' block with autocommit disabled,
      TransactionManagementError is raised.
    """
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        active_savepoints = set(self.savepoint_ids)
        self.run_on_commit.append((active_savepoints, func))
        return
    if self.get_autocommit():
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
        return
    raise TransactionManagementError('on_commit() cannot be used in manual transaction management')
def atomic_save(model):
    """Save ``model`` inside ``transaction.atomic()``, with a plain fallback.

    If the atomic attempt raises ``TransactionManagementError``,
    ``model.save()`` is called once more without the atomic wrapper.
    Any other exception propagates to the caller.
    """
    try:
        with transaction.atomic():
            model.save()
    except transaction.TransactionManagementError:
        # sqlite isn't happy with the atomic block; save without it.
        model.save()
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store ``rollback`` as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if an atomic block is active.

    Returns None when ``in_atomic_block`` is false.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if the "needs rollback" flag is set.

    Returns None when ``needs_rollback`` is false.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """
    Register ``func`` for execution on commit, or call it right away.

    * Inside an 'atomic' block, ``func`` is appended to ``run_on_commit``
      paired with a set copy of the current ``savepoint_ids``.
    * Outside an 'atomic' block with autocommit enabled, ``func`` is called
      immediately.
    * Outside an 'atomic' block with autocommit disabled,
      TransactionManagementError is raised.
    """
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        active_savepoints = set(self.savepoint_ids)
        self.run_on_commit.append((active_savepoints, func))
        return
    if self.get_autocommit():
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
        return
    raise TransactionManagementError('on_commit() cannot be used in manual transaction management')
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store ``rollback`` as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if an atomic block is active.

    Returns None when ``in_atomic_block`` is false.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if the "needs rollback" flag is set.

    Returns None when ``needs_rollback`` is false.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def on_commit(self, func):
    """
    Register ``func`` for execution on commit, or call it right away.

    * Inside an 'atomic' block, ``func`` is appended to ``run_on_commit``
      paired with a set copy of the current ``savepoint_ids``.
    * Outside an 'atomic' block with autocommit enabled, ``func`` is called
      immediately.
    * Outside an 'atomic' block with autocommit disabled,
      TransactionManagementError is raised.
    """
    if self.in_atomic_block:
        # Transaction in progress; save for execution on commit.
        active_savepoints = set(self.savepoint_ids)
        self.run_on_commit.append((active_savepoints, func))
        return
    if self.get_autocommit():
        # No transaction in progress and in autocommit mode; execute
        # immediately.
        func()
        return
    raise TransactionManagementError('on_commit() cannot be used in manual transaction management')
def test__crashes_on_enter_if_hooks_exist(self):
    # Register a hook ahead of time, then check that entering the
    # post-commit hook context raises TransactionManagementError.
    orphan = Deferred()
    post_commit_hooks.add(orphan)
    with ExpectedException(TransactionManagementError), post_commit_hooks:
        pass
    # The hook has been cancelled, but CancelledError is suppressed in
    # hooks, so we don't see it here.
    self.assertThat(orphan, IsFiredDeferred())
    # The hook list is cleared so that the exception is raised only once.
    self.assertThat(post_commit_hooks.hooks, HasLength(0))
def test__crashes_if_hooks_exist_before_entering_transaction(self):
    # A hook registered before the transactional function runs must make
    # that function raise TransactionManagementError.
    def ignore_failure(failure):
        return None

    def do_nothing():
        return None

    post_commit(ignore_failure)
    transactional_noop = orm.transactional(do_nothing)
    self.assertRaises(TransactionManagementError, transactional_noop)
    # The hook list is cleared so that the exception is raised only once.
    self.assertThat(post_commit_hooks.hooks, HasLength(0))
def test__crashes_if_not_already_within_transaction(self):
    # Entering savepoint() here is expected to raise
    # TransactionManagementError.
    with ExpectedException(TransactionManagementError), savepoint():
        pass
def test__explodes_when_no_transaction_is_active(self):
    # validate_in_transaction(connection) is expected to raise
    # TransactionManagementError here.
    self.assertRaises(TransactionManagementError, validate_in_transaction, connection)
def __enter__(self):
    """Refuse to enter the context while hooks are already registered.

    When ``self.hooks`` is empty this does nothing and returns None.
    Otherwise a description of the hooks is built with
    ``gen_description_of_hooks``, ``self.reset()`` is called, and
    TransactionManagementError is raised with that description.
    """
    if len(self.hooks) == 0:
        return None
    # Capture a textual description of the hooks before resetting, to help
    # us understand why this is about to fail.
    description = "\n".join(gen_description_of_hooks(self.hooks))
    # Crash when there are orphaned post-commit hooks. These might
    # only turn up in testing, where transactions are managed by the
    # test framework instead of this decorator. We need to fail hard
    # -- not just warn about it -- to ensure it gets fixed.
    self.reset()
    raise TransactionManagementError(
        "Orphaned post-commit hooks found:\n" + description)
def get_rollback(self):
    """
    Return the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        return self.needs_rollback
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def set_rollback(self, rollback):
    """
    Store ``rollback`` as the "needs rollback" flag -- for *advanced use* only.

    Raises TransactionManagementError when ``in_atomic_block`` is false.
    """
    if self.in_atomic_block:
        self.needs_rollback = rollback
        return
    raise TransactionManagementError(
        "The rollback flag doesn't work outside of an 'atomic' block.")
def validate_no_atomic_block(self):
    """
    Raise TransactionManagementError if an atomic block is active.

    Returns None when ``in_atomic_block`` is false.
    """
    if not self.in_atomic_block:
        return
    raise TransactionManagementError(
        "This is forbidden when an 'atomic' block is active.")
def validate_no_broken_transaction(self):
    """
    Raise TransactionManagementError if the "needs rollback" flag is set.

    Returns None when ``needs_rollback`` is false.
    """
    if not self.needs_rollback:
        return
    raise TransactionManagementError(
        "An error occurred in the current transaction. You can't "
        "execute queries until the end of the 'atomic' block.")
# ##### Foreign key constraints checks handling #####
def after_feature(context, feature):
    """Undo the per-feature test setup recorded on ``context``.

    Runs the test case's teardown, puts back the callables saved on
    ``context`` as ``old_*`` attributes (job scheduler client, ``Command``,
    API client, CSRF authentication), tears down the test databases and
    test environment, and clears the ContentType cache.

    ``feature`` is accepted but not used here.
    """
    context.test_case.tearDown()

    from chroma_core.services.job_scheduler.job_scheduler_client import JobSchedulerClient
    from chroma_core.models import Command

    # Put back the attributes that were saved on the context.
    JobSchedulerClient.create_host_ssh = context.old_create_host_ssh
    JobSchedulerClient.create_targets = context.old_create_targets
    JobSchedulerClient.create_filesystem = context.old_create_filesystem
    JobSchedulerClient.command_run_jobs = context.old_run_jobs
    Command.set_state = context.old_set_state

    # If one of the steps fails, an exception will be thrown and the
    # transaction rolled back. In which case, this teardown will cause
    # a TransactionManagementError which we don't care about.
    from django.db.transaction import TransactionManagementError
    try:
        context.test_case._post_teardown()
    except TransactionManagementError:
        pass

    context.runner.teardown_databases(context.old_db_config)
    context.runner.teardown_test_environment()

    # As of Django 1.4, teardown_databases() no longer restores the
    # original db name in the connection's settings dict. The reasoning
    # is documented in https://code.djangoproject.com/ticket/10868, and
    # their concerns are understandable. In our case, however, we're not
    # going on to do anything here which might affect the production DB.
    # Therefore, the following hack restores pre-1.4 behavior:
    for conn, original_name, _destroy in context.old_db_config[0]:
        conn.settings_dict['NAME'] = original_name

    from chroma_cli.api import ApiHandle
    ApiHandle.ApiClient = context.old_api_client

    from chroma_api.authentication import CsrfAuthentication
    CsrfAuthentication.is_authenticated = context.old_is_authenticated

    from django.contrib.contenttypes.models import ContentType
    ContentType.objects.clear_cache()

    from chroma_core.services.job_scheduler.job_scheduler_client import JobSchedulerClient
    JobSchedulerClient.test_host_contact = context.old_test_host_contact
##--def before_scenario(context, scenario):
#-- # Set up the scenario test environment
#-- context.runner.setup_test_environment()
#-- # We must set up and tear down the entire database between
#-- # scenarios. We can't just use db transactions, as Django's
#-- # TestClient does, if we're doing full-stack tests with Mechanize,
#-- # because Django closes the db connection after finishing the HTTP
#-- # response.
#-- context.old_db_config = context.runner.setup_databases()
#--def after_scenario(context, scenario):
#-- # Tear down the scenario test environment.
#-- context.runner.teardown_databases(context.old_db_config)
#-- context.runner.teardown_test_environment()
#-- # Bob's your uncle.