Python django.db.transaction module: commit_on_success() example source code
The following code examples, extracted from open-source Python projects, illustrate how to use django.db.transaction.commit_on_success().
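Note: commit_on_success() was deprecated in Django 1.6 and removed in Django 1.8; transaction.atomic() is its modern replacement. Before the examples, here is a minimal compatibility shim (a sketch of ours, not taken from any of the projects quoted below) that mirrors the fallback pattern several of these snippets use:

from django.db import transaction

try:
    atomic = transaction.atomic  # Django >= 1.6
except AttributeError:
    atomic = transaction.commit_on_success  # Django < 1.6

with atomic():
    pass  # writes here commit together on success and roll back together on error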
def file_create(request):
    if not can_upload(request.user):
        raise Http404
    if request.method == "POST":
        form = FileUploadForm(request.POST, request.FILES)
        if form.is_valid():
            with transaction.commit_on_success():
                kwargs = {
                    "file": form.cleaned_data["file"],
                }
                File.objects.create(**kwargs)
            return redirect("file_index")
    else:
        form = FileUploadForm()
    ctx = {
        "form": form,
    }
    return render(request, "cms/file_create.html", ctx)
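On Django 1.6 and later the same view can be written with transaction.atomic(). A sketch, assuming the can_upload helper, FileUploadForm, File model and "file_index" URL name above are unchanged:

from django.db import transaction
from django.http import Http404
from django.shortcuts import redirect, render

def file_create_atomic(request):
    if not can_upload(request.user):
        raise Http404
    if request.method == "POST":
        form = FileUploadForm(request.POST, request.FILES)
        if form.is_valid():
            with transaction.atomic():  # replaces commit_on_success()
                File.objects.create(file=form.cleaned_data["file"])
            return redirect("file_index")
    else:
        form = FileUploadForm()
    return render(request, "cms/file_create.html", {"form": form})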
def nested_commit_on_success(func):
    """Like commit_on_success, but doesn't commit existing transactions.

    This decorator is used to run a function within the scope of a
    database transaction, committing the transaction on success and
    rolling it back if an exception occurs.

    Unlike the standard transaction.commit_on_success decorator, this
    version first checks whether a transaction is already active. If so
    then it doesn't perform any commits or rollbacks, leaving that up to
    whoever is managing the active transaction.
    """
    from django.db import transaction
    commit_on_success = transaction.commit_on_success(func)

    def _nested_commit_on_success(*args, **kwds):
        if transaction.is_managed():
            return func(*args, **kwds)
        else:
            return commit_on_success(*args, **kwds)
    return transaction.wraps(func)(_nested_commit_on_success)
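On Django 1.6+ the nesting concern this decorator solves is handled by atomic() itself: only the outermost atomic() block commits or rolls back, and inner blocks become savepoints (or join the outer block outright with savepoint=False). A rough equivalent, as a sketch:

from django.db import transaction

def nested_commit_on_success_16(func):
    # The outermost caller commits/rolls back; when invoked inside an existing
    # atomic block this adds no savepoint and defers to the outer block.
    return transaction.atomic(savepoint=False)(func)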
def update(self, stat_name, stat_properties, stat_data):
    from chroma_core.lib.storage_plugin.api import statistics

    if isinstance(stat_properties, statistics.BytesHistogram):
        # Histograms
        for dp in stat_data:
            ts = dp['timestamp']
            bin_vals = dp['value']

            from django.db import transaction
            with transaction.commit_on_success():
                time = SimpleHistoStoreTime.objects.create(time = ts, storage_resource_statistic = self)
                for i in range(0, len(stat_properties.bins)):
                    SimpleHistoStoreBin.objects.create(bin_idx = i, value = bin_vals[i], histo_store_time = time)

                # Only keep latest time
                SimpleHistoStoreTime.objects.filter(~Q(id = time.id), storage_resource_statistic = self).delete()
        return []

    for i in stat_data:
        i['value'] = float(i['value'])
    return self.metrics.serialize(stat_name, stat_properties, stat_data)
def update_corosync_configuration(self, corosync_configuration_id, mcast_port, network_interface_ids):
    with self._lock:
        with transaction.commit_on_success():
            # For now we only support 1 or 2 network configurations, jobs aren't so helpful at supporting lists
            corosync_configuration = CorosyncConfiguration.objects.get(id=corosync_configuration_id)

            assert len(network_interface_ids) == 1 or len(network_interface_ids) == 2
            network_interface_0 = NetworkInterface.objects.get(id = network_interface_ids[0])
            network_interface_1 = None if len(network_interface_ids) == 1 else NetworkInterface.objects.get(id = network_interface_ids[1])

            command_id = CommandPlan(self._lock_cache, self._job_collection).command_run_jobs_preserve_states(
                [{"class_name": corosync_configuration.configure_job_name,
                  "args": {"corosync_configuration": corosync_configuration,
                           "mcast_port": mcast_port,
                           "network_interface_0": network_interface_0,
                           "network_interface_1": network_interface_1}}],
                [corosync_configuration, corosync_configuration.host.pacemaker_configuration],
                "Update Corosync Configuration on host %s" % corosync_configuration.host.fqdn)

    self.progress.advance()

    return command_id
def _create_client_mount(self, host, filesystem, mountpoint):
    # Used for intra-JobScheduler calls
    log.debug("Creating client mount for %s as %s:%s" % (filesystem, host, mountpoint))

    with self._lock:
        from django.db import transaction
        with transaction.commit_on_success():
            mount, created = LustreClientMount.objects.get_or_create(
                host = host,
                filesystem = filesystem)
            mount.mountpoint = mountpoint
            mount.save()

        ObjectCache.add(LustreClientMount, mount)

    if created:
        log.info("Created client mount: %s" % mount)

    return mount
def change(self, request, new_email, confirm=True):
    """
    Given a new email address, change self and re-confirm.
    """
    try:
        atomic_transaction = transaction.atomic
    except AttributeError:
        atomic_transaction = transaction.commit_on_success

    with atomic_transaction():
        user_email(self.user, new_email)
        self.user.save()
        self.email = new_email
        self.verified = False
        self.save()
        if confirm:
            self.send_confirmation(request)
def change(self, new_email, confirm=True):
    """
    Given a new email address, change self and re-confirm.
    """
    #with transaction.commit_on_success():
    #todo: no longer exists as of django 1.6
    #commented out for now
    #fix later, see https://docs.djangoproject.com/en/1.8/topics/db/transactions/
    self.user.email = new_email
    self.user.save()
    self.email = new_email
    self.verified = False
    self.save()
    if confirm:
        self.send_confirmation()
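The commented-out transaction handling above can be restored on Django 1.6+ with transaction.atomic(), per the linked documentation. A sketch of the same method with atomicity back in place:

from django.db import transaction

def change(self, new_email, confirm=True):
    """Given a new email address, change self and re-confirm."""
    with transaction.atomic():  # replaces the removed commit_on_success()
        self.user.email = new_email
        self.user.save()
        self.email = new_email
        self.verified = False
        self.save()
    if confirm:
        self.send_confirmation()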
def save(self, must_create=False):
    """
    Saves the current session data to the database. If 'must_create' is
    True, a database error will be raised if the saving operation doesn't
    create a *new* entry (as opposed to possibly updating an existing
    entry).
    """
    obj = Session(
        session_key=self._get_or_create_session_key(),
        session_data=self.encode(self._get_session(no_load=must_create)),
        expire_date=self.get_expiry_date(),
        user_agent=self.user_agent,
        user_id=self.user_id,
        ip=self.ip,
    )
    using = router.db_for_write(Session, instance=obj)
    try:
        if django.VERSION >= (1, 6):
            with transaction.atomic(using):
                obj.save(force_insert=must_create, using=using)
        else:
            with transaction.commit_on_success(using):
                obj.save(force_insert=must_create, using=using)
    except IntegrityError as e:
        if must_create and 'session_key' in str(e):
            raise CreateError
        raise
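An alternative to the django.VERSION check is feature detection, matching the pattern the change() examples earlier on this page use. A sketch of the same save path:

try:
    atomic_transaction = transaction.atomic  # Django >= 1.6
except AttributeError:
    atomic_transaction = transaction.commit_on_success  # Django < 1.6

with atomic_transaction(using):
    obj.save(force_insert=must_create, using=using)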
def view_with_context_manager(request):
    with commit_on_success():
        return HttpResponse('Request in a transaction')
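commit_on_success() also works as a decorator, so the equivalent of the context-manager view above is:

@commit_on_success
def view_with_decorator(request):
    return HttpResponse('Request in a transaction')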
def managed_transaction(func):
    """ This decorator wraps a function so that all sql executions in the function are atomic
    It's used instead of django.db.transaction.commit_on_success in cases where reporting exceptions is necessary
    as commit_on_success swallows exceptions
    """
    @wraps(func)
    @transaction.commit_manually
    def _inner(*args, **kwargs):
        try:
            ret = func(*args, **kwargs)
        except Exception:
            transaction.rollback()
            raise
        else:
            transaction.commit()
            return ret
    return _inner
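A usage sketch for the decorator above; the Account objects and balance fields are hypothetical, the point is the commit/rollback behaviour:

@managed_transaction
def transfer(from_account, to_account, amount):
    from_account.balance -= amount
    from_account.save()
    to_account.balance += amount
    to_account.save()
    # Both saves commit together; if either raises, transaction.rollback()
    # runs and the exception propagates to the caller.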
def validate_token(key, credits=1):
    """
    Validate that a token is valid to authorize a setup/register operation:
     * Check it's not expired
     * Check it has some credits

    :param credits: number of credits to decrement if valid
    :return 2-tuple (<http response if error, else None>, <registration token if valid, else None>)
    """
    try:
        with transaction.commit_on_success():
            token = RegistrationToken.objects.get(secret = key)
            if not token.credits:
                log.warning("Attempt to register with exhausted token %s" % key)
                return HttpForbidden(), None
            else:
                # Decrement .credits
                RegistrationToken.objects.filter(secret = key).update(credits = token.credits - credits)
    except RegistrationToken.DoesNotExist:
        log.warning("Attempt to register with non-existent token %s" % key)
        return HttpForbidden(), None
    else:
        now = IMLDateTime.utcnow()
        if token.expiry < now:
            log.warning("Attempt to register with expired token %s (now %s, expired at %s)" % (key, now, token.expiry))
            return HttpForbidden(), None
        elif token.cancelled:
            log.warning("Attempt to register with cancelled token %s" % key)
            return HttpForbidden(), None

        return None, token
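The read-then-update decrement above can race between two concurrent registrations that read the same .credits value. An F() expression pushes the arithmetic into SQL; a sketch assuming the same RegistrationToken model:

from django.db.models import F

RegistrationToken.objects.filter(secret=key, credits__gte=credits) \
                         .update(credits=F('credits') - credits)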
def get_steps(self):
    from chroma_core.models.registration_token import RegistrationToken

    # Commit token so that registration request handler will see it
    with transaction.commit_on_success():
        token = RegistrationToken.objects.create(credits=1, profile=self.managed_host.server_profile)

    return [
        (DeployStep, {
            'token': token,
            'host': self.managed_host,
            'profile_name': self.managed_host.server_profile.name,
            '__auth_args': self.auth_args},)
    ]
def run(self, kwargs):
    from chroma_core.models import ManagedHost
    from chroma_core.lib.detection import DetectScan

    # Get all the host data
    host_data = {}
    threads = []
    host_target_devices = defaultdict(list)

    for host in ManagedHost.objects.filter(id__in = kwargs['host_ids']):
        volume_nodes = VolumeNode.objects.filter(host = host)
        for volume_node in volume_nodes:
            resource = volume_node.volume.storage_resource.to_resource()
            try:
                uuid = resource.uuid
            except AttributeError:
                uuid = None

            host_target_devices[host].append({"path": volume_node.path,
                                              "type": resource.device_type(),
                                              "uuid": uuid})

        with transaction.commit_on_success():
            self.log("Scanning server %s..." % host)

        thread = ExceptionThrowingThread(target=self.detect_scan,
                                         args=(host, host_data, host_target_devices[host]))
        thread.start()
        threads.append(thread)

    ExceptionThrowingThread.wait_for_threads(threads)  # This will raise an exception if any of the threads raise an exception

    with transaction.commit_on_success():
        DetectScan(self).run(host_data)
def run(self, kwargs):
    job_log.info("%s passed pre-format check, allowing subsequent reformats" % kwargs['target'])
    with transaction.commit_on_success():
        kwargs['target'].reformat = True
        kwargs['target'].save()
def global_remove_resource(self, resource_id):
    with self._instance_lock:
        with transaction.commit_manually():
            # Be extra-sure to see a fresh view (HYD-1301)
            transaction.commit()
        with transaction.commit_on_success():
            log.debug("global_remove_resource: %s" % resource_id)
            try:
                record = StorageResourceRecord.objects.get(pk = resource_id)
            except StorageResourceRecord.DoesNotExist:
                log.error("ResourceManager received invalid request to remove non-existent resource %s" % resource_id)
                return

            self._delete_resource(record)
def add_jobs(self, jobs, command):
    """Add a job, and any others which are required in order to reach its prerequisite state"""
    # Important: the Job must not be committed until all
    # its dependencies and locks are in.
    assert transaction.is_managed()

    for job in jobs:
        for dependency in self._dep_cache.get(job).all():
            if not dependency.satisfied():
                log.info("add_jobs: setting required dependency %s %s" % (dependency.stateful_object, dependency.preferred_state))
                self._set_state(dependency.get_stateful_object(), dependency.preferred_state, command)

        log.info("add_jobs: done checking dependencies")
        locks = self._create_locks(job)
        job.locks_json = json.dumps([l.to_dict() for l in locks])
        self._create_dependencies(job, locks)
        with transaction.commit_on_success():
            job.save()

        log.info("add_jobs: created Job %s (%s)" % (job.pk, job.description()))

        for l in locks:
            self._lock_cache.add(l)

        command.jobs.add(job)

    self._job_collection.add_command(command, jobs)
def _handle(self, msg):
    fn = getattr(self, "_%s" % msg[0])

    # Commit after each message to ensure the next message handler
    # doesn't see a stale transaction
    with transaction.commit_on_success():
        fn(*msg[1], **msg[2])
def _start_step(self, job_id, **kwargs):
    with transaction.commit_on_success():
        result = StepResult(job_id=job_id, **kwargs)
        result.save()

    self._job_to_result[job_id] = result
def _console(self, job_id, log_string):
    result = self._job_to_result[job_id]
    with transaction.commit_on_success():
        result.console += log_string
        result.save()
def _step_failure(self, job_id, backtrace):
    result = self._job_to_result[job_id]
    with transaction.commit_on_success():
        result.state = 'failed'
        result.backtrace = backtrace
        result.save()
def _step_success(self, job_id, step_result):
    result = self._job_to_result[job_id]
    with transaction.commit_on_success():
        result.state = 'success'
        result.result = json.dumps(step_result)
        result.save()
def set_state(self, object_ids, message, run):
    with self._lock:
        with transaction.commit_on_success():
            command = self.CommandPlan.command_set_state(object_ids, message)
        if run:
            self.progress.advance()

    return command.id
def create_copytool(self, copytool_data):
    from django.db import transaction
    log.debug("Creating copytool from: %s" % copytool_data)

    with self._lock:
        host = ObjectCache.get_by_id(ManagedHost, int(copytool_data['host']))
        copytool_data['host'] = host

        filesystem = ObjectCache.get_by_id(ManagedFilesystem, int(copytool_data['filesystem']))
        copytool_data['filesystem'] = filesystem

        with transaction.commit_on_success():
            copytool = Copytool.objects.create(**copytool_data)

        # Add the copytool after the transaction commits
        ObjectCache.add(Copytool, copytool)

    log.debug("Created copytool: %s" % copytool)

    mount = self._create_client_mount(host, filesystem, copytool_data['mountpoint'])

    # Make the association between the copytool and client mount
    with self._lock:
        copytool.client_mount = mount
        with transaction.commit_on_success():
            copytool.save()

        ObjectCache.update(copytool)

    self.progress.advance()
    return copytool.id
def register_copytool(self, copytool_id, uuid):
    from django.db import transaction
    with self._lock:
        copytool = ObjectCache.get_by_id(Copytool, int(copytool_id))
        log.debug("Registering copytool %s with uuid %s" % (copytool, uuid))

        with transaction.commit_on_success():
            copytool.register(uuid)

        ObjectCache.update(copytool)

    self.progress.advance()
def unregister_copytool(self, copytool_id):
    from django.db import transaction
    with self._lock:
        copytool = ObjectCache.get_by_id(Copytool, int(copytool_id))
        log.debug("Unregistering copytool %s" % copytool)

        with transaction.commit_on_success():
            copytool.unregister()

        ObjectCache.update(copytool)

    self.progress.advance()
def set_host_profile(self, host_id, server_profile_id):
    '''
    Set the profile for the given host to the given profile.

    :param host_id:
    :param server_profile_id:
    :return: Command for the host job or None if no commands were created.
    '''
    with self._lock:
        with transaction.commit_on_success():
            server_profile = ServerProfile.objects.get(pk=server_profile_id)
            host = ObjectCache.get_one(ManagedHost, lambda mh: mh.id == host_id)

            commands_required = host.set_profile(server_profile_id)

            if commands_required:
                command = self.CommandPlan.command_run_jobs(commands_required,
                                                            help_text['change_host_profile'] % (host.fqdn, server_profile.ui_name))
            else:
                command = None

    if command:
        self.progress.advance()

    return command
def create_host(self, fqdn, nodename, address, server_profile_id):
    """
    Create a new host, or update a host in the process of being deployed.
    """
    server_profile = ServerProfile.objects.get(pk=server_profile_id)

    with self._lock:
        with transaction.commit_on_success():
            try:
                # If there is already a host record (SSH-assisted host addition) then
                # update it
                host = ManagedHost.objects.get(fqdn=fqdn, state='undeployed')
                # host.fqdn = fqdn
                # host.nodename = nodename
                # host.save()
                job = DeployHostJob.objects.filter(~Q(state='complete'), managed_host=host)
                command = Command.objects.filter(jobs=job)[0]
            except ManagedHost.DoesNotExist:
                # Else create a new one
                host = ManagedHost.objects.create(
                    fqdn=fqdn,
                    nodename=nodename,
                    immutable_state=not server_profile.managed,
                    address=address,
                    server_profile=server_profile,
                    install_method = ManagedHost.INSTALL_MANUAL)
                lnet_configuration = LNetConfiguration.objects.create(host=host)

                ObjectCache.add(LNetConfiguration, lnet_configuration)
                ObjectCache.add(ManagedHost, host)

                with transaction.commit_on_success():
                    command = self.CommandPlan.command_set_state(
                        [(ContentType.objects.get_for_model(host).natural_key(), host.id, server_profile.initial_state)],
                        help_text["deploying_host"] % host)

    self.progress.advance()
    return host.id, command.id
def update_nids(self, nid_list):
    # Although this is creating/deleting a NID it actually rewrites the whole NID configuration for the node
    # this is all in here for now, but as we move to dynamic lnet it will probably get its own file.
    with self._lock:
        lnet_configurations = set()
        lnet_nid_data = defaultdict(lambda: {'nid_updates': {}, 'nid_deletes': {}})

        for nid_data in nid_list:
            network_interface = NetworkInterface.objects.get(id = nid_data['network_interface'])
            lnet_configuration = LNetConfiguration.objects.get(host = network_interface.host_id)
            lnet_configurations.add(lnet_configuration)

            if str(nid_data['lnd_network']) == '-1':
                lnet_nid_data[lnet_configuration]['nid_deletes'][network_interface.id] = nid_data
            else:
                lnet_nid_data[lnet_configuration]['nid_updates'][network_interface.id] = nid_data

        jobs = []
        for lnet_configuration in lnet_configurations:
            jobs.append(ConfigureLNetJob(lnet_configuration = lnet_configuration,
                                         config_changes = json.dumps(lnet_nid_data[lnet_configuration])))

        with transaction.commit_on_success():
            command = Command.objects.create(message = "Configuring NIDS for hosts")
            self.CommandPlan.add_jobs(jobs, command)

    self.progress.advance()
    return command.id
def trigger_plugin_update(self, include_host_ids, exclude_host_ids, plugin_names):
    """
    Cause the plugins on the hosts passed to send an update irrespective of whether any
    changes have occurred.

    :param include_host_ids: List of host ids to include in the trigger update.
    :param exclude_host_ids: List of host ids to exclude from the include list (makes for easy usage)
    :param plugin_names: list of plugins to trigger update on - empty list means all.
    :return: command id that caused updates to be sent.
    """
    host_ids = [host.id for host in ManagedHost.objects.all()] if include_host_ids is None else include_host_ids
    host_ids = host_ids if exclude_host_ids is None else list(set(host_ids) - set(exclude_host_ids))

    if host_ids:
        with self._lock:
            jobs = [TriggerPluginUpdatesJob(host_ids=json.dumps(host_ids),
                                            plugin_names_json=json.dumps(plugin_names))]

            with transaction.commit_on_success():
                command = Command.objects.create(message="%s triggering updates from agents" % ManagedHost.objects.get(id=exclude_host_ids[0]).fqdn)
                self.CommandPlan.add_jobs(jobs, command)

        self.progress.advance()

        return command.id
    else:
        return None
def _check_size(self):
    """Apply a size limit to the table of log messages"""
    MAX_ROWS_PER_TRANSACTION = 10000

    removed_num_entries = 0
    overflow_filename = os.path.join(settings.LOG_PATH, "db_log")

    if self._table_size > settings.DBLOG_HW:
        remove_num_entries = self._table_size - settings.DBLOG_LW

        trans_size = min(MAX_ROWS_PER_TRANSACTION, remove_num_entries)
        with transaction.commit_on_success():
            while remove_num_entries > 0:
                removed_entries = LogMessage.objects.all().order_by('id')[0:trans_size]
                self.log.debug("writing %s batch of entries" % trans_size)
                try:
                    with open(overflow_filename, "a") as f:
                        for line in removed_entries:
                            f.write("%s\n" % line)
                    LogMessage.objects.filter(id__lte = removed_entries[-1].id).delete()
                except Exception as e:
                    self.log.error("error opening/writing/closing the db_log: %s" % e)

                remove_num_entries -= trans_size
                removed_num_entries += trans_size
                if remove_num_entries < trans_size:
                    trans_size = remove_num_entries

        self._table_size -= removed_num_entries
        self.log.info("Wrote %s DB log entries to %s" % (removed_num_entries, overflow_filename))

    return removed_num_entries
def remove_host(self, fqdn):
    log.info("remove_host: %s" % fqdn)

    self.sessions.remove_host(fqdn)
    self.queues.remove_host(fqdn)
    self.hosts.remove_host(fqdn)

    with transaction.commit_on_success():
        for cert in ClientCertificate.objects.filter(host__fqdn = fqdn, revoked = False):
            log.info("Revoking %s:%s" % (fqdn, cert.serial))
            self.valid_certs.pop(cert.serial, None)
        ClientCertificate.objects.filter(host__fqdn = fqdn, revoked = False).update(revoked = True)

    # TODO: ensure there are no GETs left in progress after this completes
    # TODO: drain plugin_rx_queue so that anything we will send to AMQP has been sent before this returns
def change(self, new_email, confirm=True):
    """
    Given a new email address, change self and re-confirm.
    """
    with transaction.commit_on_success():
        self.user.email = new_email
        self.user.save()
        self.email = new_email
        self.verified = False
        self.save()
        if confirm:
            self.send_confirmation()
def complete_job(self, job_id, errored = False, cancelled = False):
    # TODO: document the rules here: jobs may only modify objects that they
    # have taken out a writelock on, and they may only modify instances obtained
    # via ObjectCache, or via their stateful_object attribute. Jobs may not
    # modify objects via .update() calls, all changes must be done on loaded instances.
    # They do not have to .save() their stateful_object, but they do have to .save()
    # any other objects that they modify (having obtained them from ObjectCache and
    # held a writelock on them)

    job = self._job_collection.get(job_id)
    with self._lock:
        with transaction.commit_on_success():
            if not errored and not cancelled:
                try:
                    job.on_success()
                except Exception:
                    log.error("Error in Job %s on_success:%s" % (job.id, traceback.format_exc()))
                    errored = True

            log.info("Job %d: complete_job: Updating cache" % job.pk)
            # Freshen cached information about anything that this job held a writelock on
            for lock in self._lock_cache.get_by_job(job):
                if lock.write:
                    if hasattr(lock.locked_item, 'not_deleted'):
                        log.info("Job %d: locked_item %s %s %s %s" % (
                            job.id,
                            id(lock.locked_item),
                            lock.locked_item.__class__,
                            isinstance(lock.locked_item, DeletableStatefulObject),
                            lock.locked_item.not_deleted
                        ))
                    if hasattr(lock.locked_item, 'not_deleted') and lock.locked_item.not_deleted is None:
                        log.debug("Job %d: purging %s/%s" %
                                  (job.id, lock.locked_item.__class__, lock.locked_item.id))
                        ObjectCache.purge(lock.locked_item.__class__, lambda o: o.id == lock.locked_item.id)
                    else:
                        log.debug("Job %d: updating write-locked %s/%s" %
                                  (job.id, lock.locked_item.__class__, lock.locked_item.id))

                        # Ensure that any notifications prior to release of the writelock are not
                        # applied
                        if hasattr(lock.locked_item, 'state_modified_at'):
                            lock.locked_item.__class__.objects.filter(pk=lock.locked_item.pk).update(
                                state_modified_at=django.utils.timezone.now())

                        ObjectCache.update(lock.locked_item)

        if job.state != 'tasked':
            # This happens if a Job is cancelled while it's calling this
            log.info("Job %s has state %s in complete_job" % (job.id, job.state))
            return

        self._complete_job(job, errored, cancelled)

        with transaction.commit_on_success():
            self._drain_notification_buffer()
            self._run_next()