The following 40 code examples, extracted from open-source Python projects, illustrate how to use django.db.transaction.on_commit().
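Before the extracted examples, here is a minimal sketch of the pattern they all share: register a callback with transaction.on_commit() inside an atomic block, and Django runs it only after the surrounding transaction commits (never after a rollback). The Article model, create_article helper, and index_article Celery task below are hypothetical names used purely for illustration.

from django.db import transaction

from myapp.models import Article        # hypothetical model
from myapp.tasks import index_article    # hypothetical Celery task


def create_article(title):
    with transaction.atomic():
        article = Article.objects.create(title=title)
        # Runs only if this atomic block commits; on rollback the task is
        # never enqueued, so the worker can't race against a missing row.
        transaction.on_commit(lambda: index_article.delay(article.pk))
    return article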
def on_action_save(self, sender, instance, created, raw, **kwargs):
    if created and not raw:
        transaction.on_commit(lambda: send_notifications(instance))
def provision_run(self, spark_job, first_run=False):
    """
    Actually run the given Spark job.

    If this is the first run we'll update the "last_run_at" value
    to the start date of the spark_job so Celery beat knows what's going on.
    """
    spark_job.run()
    if first_run:
        def update_last_run_at():
            schedule_entry = spark_job.schedule.get()
            if schedule_entry is None:
                schedule_entry = spark_job.schedule.add()
            schedule_entry.reschedule(last_run_at=spark_job.start_date)
        transaction.on_commit(update_last_run_at)
def save(self, *args, **kwargs):
    # whether the job is being created for the first time
    first_save = self.pk is None
    # resetting expired_date in case a user resets the end_date
    if self.expired_date and self.end_date and self.end_date > timezone.now():
        self.expired_date = None
    super().save(*args, **kwargs)
    # Remove the cached latest run so this object will requery it.
    try:
        delattr(self, 'latest_run')
    except AttributeError:  # pragma: no cover
        pass  # It didn't have a `latest_run` and that's ok.
    # first remove if it exists
    self.schedule.delete()
    # and then add it, but only if the end date is in the future
    if self.has_future_end_date(timezone.now()):
        self.schedule.add()
    if first_save:
        transaction.on_commit(self.first_run)
def _send_notify(self, message_type, updated_fields=None, data=None, always_send=True):
    group_name = self._get_group_name(message_type=message_type)
    message_data = {
        'group': group_name,
        'type': self._get_message_type(message_type),
        'object': data or self._get_push_data(updated_fields=updated_fields,
                                              message_type=message_type),
    }
    if updated_fields and 'updated_fields' not in message_data['object']:
        message_data['object']['updated_fields'] = updated_fields
    if message_type == 'update' and updated_fields and always_send is False:
        if not set(updated_fields) & set(message_data['object'].keys()):
            return
    payload = {
        'text': json.dumps(build_message(
            generate_id=True,
            handler='subscription-update',
            **message_data
        ), cls=self.encoder)
    }
    on_transaction_commit(
        lambda: Group(group_name).send(payload)
    )
def save(self, **kwargs):
    new = self.pk is None
    if not new and (self.was_processed or not self.processed):
        raise TypeError
    super().save(**kwargs)
    with suppress(FileExistsError):
        os.mkdir(os.path.dirname(self._changed_geometries_filename()))
    from c3nav.mapdata.utils.cache.changes import changed_geometries
    pickle.dump(changed_geometries, open(self._changed_geometries_filename(), 'wb'))
    if new:
        transaction.on_commit(
            lambda: cache.set('mapdata:last_update', self.to_tuple, 300)
        )
        if settings.HAS_CELERY:
            transaction.on_commit(
                lambda: process_map_updates.delay()
            )
def update_history(instance, **kwargs):
    if instance.body != instance.db_body \
            or instance.layout != instance.db_layout \
            or instance.subject != instance.db_subject:
        instance.version += 1
    if instance.version != instance.db_version:
        version = instance.version

        @transaction.on_commit
        def save():
            instance.history.create(
                version=version,
                body=instance.body,
                subject=instance.subject,
                layout=instance.layout,
            )
def model_post_save(sender, instance, created=False, **kwargs):
    """
    Signal emitted after any model is saved via Django ORM.

    :param sender: Model class that was saved
    :param instance: The actual instance that was saved
    :param created: True if a new row was created
    """
    def notify():
        table = sender._meta.db_table
        if created:
            observer_client.notify_table_insert(table)
        else:
            observer_client.notify_table_update(table)
    transaction.on_commit(notify)
def model_m2m_changed(sender, instance, action, **kwargs):
    """
    Signal emitted after any M2M relation changes via Django ORM.

    :param sender: M2M intermediate model
    :param instance: The actual instance that was saved
    :param action: M2M action
    """
    def notify():
        table = sender._meta.db_table
        if action == 'post_add':
            observer_client.notify_table_insert(table)
        elif action in ('post_remove', 'post_clear'):
            observer_client.notify_table_remove(table)
    transaction.on_commit(notify)
def profile_following_change(sender, instance, action, pk_set, **kwargs):
    """Deliver notification on new followers."""
    logger.debug("profile_following_change - sender %s, instance %s, action %s, pk_set %s, kwargs: %s",
                 sender, instance, action, pk_set, kwargs)
    if action in ["post_add", "post_remove"]:
        logger.debug("profile_following_change - Got %s from %s for %s", action, sender, instance)
        logger.debug("profile_following_change - pk_set %s", pk_set)
        transaction.on_commit(lambda: on_commit_profile_following_change(action, pk_set, instance))
def profile_post_save(instance, **kwargs):
    if instance.is_local:
        transaction.on_commit(lambda: federate_profile(instance))
def content_post_save(instance, **kwargs):
    fetch_preview(instance)
    render_content(instance)
    if kwargs.get("created"):
        notify_listeners(instance)
        if instance.content_type == ContentType.REPLY:
            transaction.on_commit(lambda: django_rq.enqueue(send_reply_notifications, instance.id))
        elif instance.content_type == ContentType.SHARE and instance.share_of.local:
            transaction.on_commit(lambda: django_rq.enqueue(send_share_notification, instance.id))
    transaction.on_commit(lambda: update_streams_with_content(instance))
    if instance.local:
        transaction.on_commit(lambda: federate_content(instance))
def on_commit(self, fun, *args, **kwargs):
    if args or kwargs:
        fun = partial(fun, *args, **kwargs)
    if on_commit is not None:
        try:
            return on_commit(fun)
        except TransactionManagementError:
            pass  # not in transaction management, execute now.
    return fun()
def sync_user_profile(sender, instance, created, **kwargs):  # pylint: disable=unused-argument
    """
    Signal handler to create/update a DiscussionUser every time a profile is created/updated
    """
    if not settings.FEATURES.get('OPEN_DISCUSSIONS_USER_SYNC', False):
        return
    transaction.on_commit(lambda: tasks.sync_discussion_user.delay(instance.user_id))
def handle_create_programenrollment(sender, instance, created, **kwargs):  # pylint: disable=unused-argument
    """
    When a ProgramEnrollment model is created/updated, update index.
    """
    transaction.on_commit(lambda: index_program_enrolled_users.delay([instance.id]))
def handle_delete_programenrollment(sender, instance, **kwargs):  # pylint: disable=unused-argument
    """
    When a ProgramEnrollment model is deleted, update index.
    """
    enrollment_id = instance.id  # this is modified in-place on delete, so store it in a local variable
    transaction.on_commit(lambda: remove_program_enrolled_user.delay(enrollment_id))
def handle_create_coursecertificate(sender, instance, created, **kwargs):  # pylint: disable=unused-argument
    """
    When a MicromastersCourseCertificate model is created
    """
    if created:
        user = instance.final_grade.user
        program = instance.final_grade.course_run.course.program
        transaction.on_commit(lambda: generate_program_certificate(user, program))
def handle_update_profile(sender, instance, **kwargs):
    """Update index when Profile model is updated."""
    transaction.on_commit(lambda: index_users.delay([instance.user.id], check_if_changed=True))
def handle_update_employment(sender, instance, **kwargs):
    """Update index when Employment model is updated."""
    transaction.on_commit(lambda: index_users.delay([instance.profile.user.id], check_if_changed=True))
def handle_delete_education(sender, instance, **kwargs):
    """Update index when Education model instance is deleted."""
    transaction.on_commit(lambda: index_users.delay([instance.profile.user.id]))
def handle_delete_employment(sender, instance, **kwargs):
    """Update index when Employment model instance is deleted."""
    transaction.on_commit(lambda: index_users.delay([instance.profile.user.id]))
def handle_update_percolate(sender, instance, **kwargs):
    """When a new query is created or a query is updated, update Elasticsearch too"""
    transaction.on_commit(lambda: index_percolate_queries.delay([instance.id]))
def handle_delete_percolate(sender, instance, **kwargs):
    """When a query is deleted, make sure we also delete it on Elasticsearch"""
    transaction.on_commit(lambda: delete_percolate_query.delay(instance.id))
def handle_remove_role(sender, instance, **kwargs):
    """Update index when Role model instance is deleted."""
    transaction.on_commit(lambda: index_users.delay([instance.user.id]))
def save(self, *args, **kwargs):
    created = not self.pk
    super(Key, self).save(*args, **kwargs)
    if created:
        from .tasks import update_or_create_key
        transaction.on_commit(lambda: update_or_create_key.delay(self.uid))
def render_submission_form(submission_form_id=None):
    logger = render_submission_form.get_logger()

    # XXX: Loop to wait for submission form to appear. The celery task is
    # triggered on submit before the request transaction is committed, so we
    # have to wait. We should start using transaction.on_commit() as soon as
    # we've updated to Django 1.9.
    for i in range(60):
        with transaction.atomic():
            try:
                sf = SubmissionForm.unfiltered.get(id=submission_form_id)
            except SubmissionForm.DoesNotExist:
                pass
            else:
                sf.render_pdf_document()
                break
        time.sleep(1)
    else:
        logger.error("SubmissionForm(id=%d) didn't appear", submission_form_id)
        return
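The comment above keeps a 60-iteration polling loop only because the task is queued before the request transaction commits; on Django 1.9+ the caller could defer the queueing until after the commit instead, which is exactly what on_commit() is for. A minimal sketch of that caller side follows; submit_view and create_submission_form are hypothetical names, and render_submission_form is assumed to be the Celery task shown above (its import path is also assumed).

from django.db import transaction
from django.shortcuts import redirect

from core.tasks import render_submission_form  # task from the example above (path assumed)


def submit_view(request):
    with transaction.atomic():
        sf = create_submission_form(request)  # hypothetical helper that saves a SubmissionForm
        # Enqueue the PDF rendering only after the transaction that created the
        # SubmissionForm has committed, making the polling loop unnecessary.
        transaction.on_commit(
            lambda: render_submission_form.delay(submission_form_id=sf.pk)
        )
    return redirect(sf)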
def save(self, *args, **kwargs):
    created = False
    if not self.pk:
        created = True

    # If no API key exists, just produce one.
    # Typically: at instance creation
    with transaction.atomic():
        if not self.secret:
            self.regen_secret()
        super().save(*args, **kwargs)

    if created and not self.password:
        transaction.on_commit(self.send_invitation_email)
def post(self, request, *args, **kwargs):
    data = request.POST
    from_name, from_email = parseaddr(data['from'])
    msg = IncomingMessage.objects.create(
        body_html=data.get('html', ''),
        body_text=data.get('text', ''),
        from_email=from_email,
        from_name=from_name,
        original_post_data=dict(data),
        subject=data.get('subject', '<No subject>'),
        to_email=json.loads(data['envelope'])['to'][0],
    )
    for name, info in json.loads(data.get('attachment-info', '{}')).items():
        attachment = Attachment(
            content_id=info.get('content-id', ''),
            content_type=info.get('type', ''),
            file=request.FILES[name],
            msg=msg,
        )
        if attachment.content_type:
            attachment.file.content_type = attachment.content_type
        attachment.save()
    transaction.on_commit(partial(process_incoming_message.delay, msg.id))
    return HttpResponse()
def destination_post_save(instance, **kwargs):
    if hasattr(transaction, 'on_commit'):
        transaction.on_commit(clear_destination_cache)
    else:
        clear_destination_cache()


# noinspection PyUnusedLocal
def destination_post_delete(instance, **kwargs):
    if hasattr(transaction, 'on_commit'):
        transaction.on_commit(clear_destination_cache)
    else:
        clear_destination_cache()


# noinspection PyUnusedLocal
def router_post_save(instance, **kwargs):
    if hasattr(transaction, 'on_commit'):
        transaction.on_commit(clear_router_cache)
    else:
        clear_router_cache()


# noinspection PyUnusedLocal
def router_deleted(instance, **kwargs):
    if hasattr(transaction, 'on_commit'):
        transaction.on_commit(clear_router_cache)
    else:
        clear_router_cache()
def run(self):
    """Actually run the scheduled Spark job."""
    # if the job ran before and is still running, don't start it again
    if not self.is_runnable:
        return
    jobflow_id = self.provisioner.run(
        user_username=self.created_by.username,
        user_email=self.created_by.email,
        identifier=self.identifier,
        emr_release=self.emr_release.version,
        size=self.size,
        notebook_key=self.notebook_s3_key,
        is_public=self.is_public,
        job_timeout=self.job_timeout,
    )
    # Create new job history record.
    run = self.runs.create(
        spark_job=self,
        jobflow_id=jobflow_id,
        scheduled_at=timezone.now(),
        emr_release_version=self.emr_release.version,
        size=self.size,
    )
    # Remove the cached latest run so this object will requery it.
    try:
        delattr(self, 'latest_run')
    except AttributeError:  # pragma: no cover
        pass  # It didn't have a `latest_run` and that's ok.
    with transaction.atomic():
        Metric.record('sparkjob-emr-version',
                      data={'version': self.emr_release.version})
        # sync with EMR API
        transaction.on_commit(run.sync)
def save(self, *args, **kwargs): """Insert the cluster into the database or update it if already present, spawning the cluster if it's not already spawned. """ # actually start the cluster if self.jobflow_id is None: self.jobflow_id = self.provisioner.start( user_username=self.created_by.username, user_email=self.created_by.email, identifier=self.identifier, emr_release=self.emr_release.version, size=self.size, public_key=self.ssh_key.key, ) # once we've stored the jobflow id we can fetch the status for the first time transaction.on_commit(self.sync) with transaction.atomic(): Metric.record('cluster-emr-version', data={'version': self.emr_release.version}) # set the dates if not self.expires_at: # clusters should expire after the lifetime it's set to self.expires_at = timezone.now() + timedelta(hours=self.lifetime) super().save(*args, **kwargs)
def on_transaction_commit(func):
    func()
def start_grader_communication(solution_id: int, solution_model: str):
    transaction.on_commit(lambda: submit_solution.delay(solution_id, solution_model))
def save(self, *args, **kwargs):
    with transaction.atomic():
        super().save(*args, **kwargs)
        transaction.on_commit(lambda: cache.delete(self.user_access_permission_key(self.user_id)))
def delete(self, *args, **kwargs):
    with transaction.atomic():
        super().delete(*args, **kwargs)
        transaction.on_commit(lambda: cache.delete(self.user_access_permission_key(self.user_id)))
def model_post_delete(sender, instance, **kwargs):
    """
    Signal emitted after any model is deleted via Django ORM.

    :param sender: Model class that was deleted
    :param instance: The actual instance that was removed
    """
    def notify():
        table = sender._meta.db_table
        observer_client.notify_table_remove(table)
    transaction.on_commit(notify)
def update_clusters(self):
    """
    Update the cluster metadata from AWS for the pending clusters.

    - To be used periodically.
    - Won't update state if not needed.
    - Will queue updating the Cluster's public IP address if needed.
    """
    # only update the cluster info for clusters that are pending
    active_clusters = Cluster.objects.active()

    # Short-circuit for no active clusters (e.g. on weekends)
    if not active_clusters.exists():
        return []

    # get the start dates of the active clusters, set to the start of the day
    # to counteract time differences between atmo and AWS and use the oldest
    # start date to limit the ListCluster API call to AWS
    oldest_created_at = active_clusters.datetimes('created_at', 'day')

    try:
        # build a mapping between jobflow ID and cluster info
        cluster_mapping = {}
        provisioner = ClusterProvisioner()
        cluster_list = provisioner.list(
            created_after=oldest_created_at[0]
        )
        for cluster_info in cluster_list:
            cluster_mapping[cluster_info['jobflow_id']] = cluster_info

        # go through pending clusters and update the state if needed
        updated_clusters = []
        for cluster in active_clusters:
            with transaction.atomic():
                info = cluster_mapping.get(cluster.jobflow_id)
                # ignore if no info was found for some reason,
                # the cluster was deleted in AWS but it wasn't deleted here yet
                if info is None:
                    continue

                # update cluster status
                cluster.sync(info)
                updated_clusters.append(cluster.identifier)

                # if not given enqueue a job to update the public IP address
                # but only if the cluster is running or waiting, so the
                # API call isn't wasted
                if (not cluster.master_address and
                        cluster.most_recent_status in cluster.READY_STATUS_LIST):
                    transaction.on_commit(
                        lambda: update_master_address.delay(cluster.id)
                    )
        return updated_clusters
    except ClientError as exc:
        self.retry(
            exc=exc,
            countdown=celery.backoff(self.request.retries),
        )
def process_updates(cls):
    logger = logging.getLogger('c3nav')

    with transaction.atomic():
        new_updates = tuple(cls.objects.filter(processed=False).select_for_update(nowait=True))
        if not new_updates:
            return ()

        from c3nav.mapdata.utils.cache.changes import changed_geometries
        changed_geometries.reset()

        logger.info('Recalculating altitude areas...')
        from c3nav.mapdata.models import AltitudeArea
        AltitudeArea.recalculate()
        logger.info('%.3f m² of altitude areas affected.' % changed_geometries.area)

        last_processed_update = cls.objects.filter(processed=True).latest().to_tuple

        for new_update in new_updates:
            logger.info('Applying changed geometries from MapUpdate #%(id)s (%(type)s)...' %
                        {'id': new_update.pk, 'type': new_update.type})
            try:
                new_changes = pickle.load(open(new_update._changed_geometries_filename(), 'rb'))
            except FileNotFoundError:
                logger.warning('changed_geometries pickle file not found.')
            else:
                logger.info('%.3f m² affected by this update.' % new_changes.area)
                changed_geometries.combine(new_changes)
            new_update.processed = True
            new_update.save()

        logger.info('%.3f m² of geometries affected in total.' % changed_geometries.area)

        changed_geometries.save(last_processed_update, new_updates[-1].to_tuple)

        logger.info('Rebuilding level render data...')
        from c3nav.mapdata.render.renderdata import LevelRenderData
        LevelRenderData.rebuild()

        logger.info('Rebuilding router...')
        from c3nav.routing.router import Router
        Router.rebuild()

        transaction.on_commit(
            lambda: cache.set('mapdata:last_processed_update', new_updates[-1].to_tuple, 300)
        )

        return new_updates