Python django.db.transaction 模块,on_commit() 实例源码

我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用django.db.transaction.on_commit()

项目:buggy    作者:fusionbox    | 项目源码 | 文件源码
def on_action_save(self, sender, instance, created, raw, **kwargs):
        if created and not raw:
            transaction.on_commit(lambda: send_notifications(instance))
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def provision_run(self, spark_job, first_run=False):
        """
        Actually run the given Spark job.

        If this is the first run we'll update the "last_run_at" value
        to the start date of the spark_job so Celery beat knows what's
        going on.
        """
        spark_job.run()
        if first_run:
            def update_last_run_at():
                schedule_entry = spark_job.schedule.get()
                if schedule_entry is None:
                    schedule_entry = spark_job.schedule.add()
                schedule_entry.reschedule(last_run_at=spark_job.start_date)
            transaction.on_commit(update_last_run_at)
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        # whether the job is being created for the first time
        first_save = self.pk is None
        # resetting expired_date in case a user resets the end_date
        if self.expired_date and self.end_date and self.end_date > timezone.now():
            self.expired_date = None
        super().save(*args, **kwargs)
        # Remove the cached latest run to this objects will requery it.
        try:
            delattr(self, 'latest_run')
        except AttributeError:  # pragma: no cover
            pass  # It didn't have a `latest_run` and that's ok.
        # first remove if it exists
        self.schedule.delete()
        # and then add it, but only if the end date is in the future
        if self.has_future_end_date(timezone.now()):
            self.schedule.add()
        if first_save:
            transaction.on_commit(self.first_run)
项目:rohrpost    作者:axsemantics    | 项目源码 | 文件源码
def _send_notify(self, message_type, updated_fields=None, data=None, always_send=True):
        group_name = self._get_group_name(message_type=message_type)
        message_data = {
            'group': group_name,
            'type': self._get_message_type(message_type),
            'object': data or self._get_push_data(updated_fields=updated_fields, message_type=message_type),
        }
        if updated_fields and 'updated_fields' not in message_data['object']:
            message_data['object']['updated_fields'] = updated_fields

        if message_type == 'update' and updated_fields and always_send is False:
            if not set(updated_fields) & set(message_data['object'].keys()):
                return

        payload = {
            'text': json.dumps(build_message(
                generate_id=True,
                handler='subscription-update',
                **message_data
            ), cls=self.encoder)
        }

        on_transaction_commit(
            lambda: Group(group_name).send(payload)
        )
项目:c3nav    作者:c3nav    | 项目源码 | 文件源码
def save(self, **kwargs):
        new = self.pk is None
        if not new and (self.was_processed or not self.processed):
            raise TypeError

        super().save(**kwargs)

        with suppress(FileExistsError):
            os.mkdir(os.path.dirname(self._changed_geometries_filename()))

        from c3nav.mapdata.utils.cache.changes import changed_geometries
        pickle.dump(changed_geometries, open(self._changed_geometries_filename(), 'wb'))

        if new:
            transaction.on_commit(
                lambda: cache.set('mapdata:last_update', self.to_tuple, 300)
            )
            if settings.HAS_CELERY:
                transaction.on_commit(
                    lambda: process_map_updates.delay()
                )
项目:django-happymailer    作者:barbuza    | 项目源码 | 文件源码
def update_history(instance, **kwargs):
    if instance.body != instance.db_body \
            or instance.layout != instance.db_layout \
            or instance.subject != instance.db_subject:
        instance.version += 1

    if instance.version != instance.db_version:
        version = instance.version

        @transaction.on_commit
        def save():
            instance.history.create(
                version=version,
                body=instance.body,
                subject=instance.subject,
                layout=instance.layout,
            )
项目:django-rest-framework-reactive    作者:genialis    | 项目源码 | 文件源码
def model_post_save(sender, instance, created=False, **kwargs):
    """
    Signal emitted after any model is saved via Django ORM.

    :param sender: Model class that was saved
    :param instance: The actual instance that was saved
    :param created: True if a new row was created
    """

    def notify():
        table = sender._meta.db_table
        if created:
            observer_client.notify_table_insert(table)
        else:
            observer_client.notify_table_update(table)

    transaction.on_commit(notify)
项目:django-rest-framework-reactive    作者:genialis    | 项目源码 | 文件源码
def model_m2m_changed(sender, instance, action, **kwargs):
    """
    Signal emitted after any M2M relation changes via Django ORM.

    :param sender: M2M intermediate model
    :param instance: The actual instance that was saved
    :param action: M2M action
    """

    def notify():
        table = sender._meta.db_table
        if action == 'post_add':
            observer_client.notify_table_insert(table)
        elif action in ('post_remove', 'post_clear'):
            observer_client.notify_table_remove(table)

    transaction.on_commit(notify)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def profile_following_change(sender, instance, action, pk_set, **kwargs):
    """Deliver notification on new followers."""
    logger.debug("profile_following_change - sender %s, instance %s, action %s, pk_set %s, kwargs: %s",
                 sender, instance, action, pk_set, kwargs)
    if action in ["post_add", "post_remove"]:
        logger.debug("profile_following_change - Got %s from %s for %s", action, sender, instance)
        logger.debug("profile_following_change - pk_set %s", pk_set)
        transaction.on_commit(lambda: on_commit_profile_following_change(action, pk_set, instance))
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def profile_post_save(instance, **kwargs):
    if instance.is_local:
        transaction.on_commit(lambda: federate_profile(instance))
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def content_post_save(instance, **kwargs):
    fetch_preview(instance)
    render_content(instance)
    if kwargs.get("created"):
        notify_listeners(instance)
        if instance.content_type == ContentType.REPLY:
            transaction.on_commit(lambda: django_rq.enqueue(send_reply_notifications, instance.id))
        elif instance.content_type == ContentType.SHARE and instance.share_of.local:
            transaction.on_commit(lambda: django_rq.enqueue(send_share_notification, instance.id))
        transaction.on_commit(lambda: update_streams_with_content(instance))
    if instance.local:
        transaction.on_commit(lambda: federate_content(instance))
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def on_commit(self, fun, *args, **kwargs):
        if args or kwargs:
            fun = partial(fun, *args, **kwargs)
        if on_commit is not None:
            try:
                return on_commit(fun)
            except TransactionManagementError:
                pass  # not in transaction management, execute now.
        return fun()
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def sync_user_profile(sender, instance, created, **kwargs):  # pylint: disable=unused-argument
    """
    Signal handler create/update a DiscussionUser every time a profile is created/updated
    """
    if not settings.FEATURES.get('OPEN_DISCUSSIONS_USER_SYNC', False):
        return
    transaction.on_commit(lambda: tasks.sync_discussion_user.delay(instance.user_id))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_create_programenrollment(sender, instance, created, **kwargs):  # pylint: disable=unused-argument
    """
    When a ProgramEnrollment model is created/updated, update index.
    """
    transaction.on_commit(lambda: index_program_enrolled_users.delay([instance.id]))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_delete_programenrollment(sender, instance, **kwargs):  # pylint: disable=unused-argument
    """
    When a ProgramEnrollment model is deleted, update index.
    """
    enrollment_id = instance.id  # this is modified in-place on delete, so store it on a local
    transaction.on_commit(lambda: remove_program_enrolled_user.delay(enrollment_id))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_create_coursecertificate(sender, instance, created, **kwargs):  # pylint: disable=unused-argument
    """
    When a MicromastersCourseCertificate model is created
    """
    if created:
        user = instance.final_grade.user
        program = instance.final_grade.course_run.course.program
        transaction.on_commit(lambda: generate_program_certificate(user, program))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_update_profile(sender, instance, **kwargs):
    """Update index when Profile model is updated."""
    transaction.on_commit(lambda: index_users.delay([instance.user.id], check_if_changed=True))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_update_employment(sender, instance, **kwargs):
    """Update index when Employment model is updated."""
    transaction.on_commit(lambda: index_users.delay([instance.profile.user.id], check_if_changed=True))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_delete_education(sender, instance, **kwargs):
    """Update index when Education model instance is deleted."""
    transaction.on_commit(lambda: index_users.delay([instance.profile.user.id]))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_delete_employment(sender, instance, **kwargs):
    """Update index when Employment model instance is deleted."""
    transaction.on_commit(lambda: index_users.delay([instance.profile.user.id]))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_update_percolate(sender, instance, **kwargs):
    """When a new query is created or a query is updated, update Elasticsearch too"""
    transaction.on_commit(lambda: index_percolate_queries.delay([instance.id]))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_delete_percolate(sender, instance, **kwargs):
    """When a query is deleted, make sure we also delete it on Elasticsearch"""
    transaction.on_commit(lambda: delete_percolate_query.delay(instance.id))
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def handle_remove_role(sender, instance, **kwargs):
    """Update index when Role model instance is deleted."""
    transaction.on_commit(lambda: index_users.delay([instance.user.id]))
项目:buildinfo.debian.net    作者:lamby    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        created = not self.pk

        super(Key, self).save(*args, **kwargs)

        if created:
            from .tasks import update_or_create_key

            transaction.on_commit(lambda: update_or_create_key.delay(self.uid))
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def render_submission_form(submission_form_id=None):
    logger = render_submission_form.get_logger()

    # XXX: Look to wait for submission form to appear. The celery task is
    # triggered on submit before the request transaction is committed, so we
    # have to wait. We should start using transaction.on_commit() as soon as
    # we've updated to Django 1.9.
    for i in range(60):
        with transaction.atomic():
            try:
                sf = SubmissionForm.unfiltered.get(id=submission_form_id)
            except SubmissionForm.DoesNotExist:
                pass
            else:
                sf.render_pdf_document()
                break
        time.sleep(1)
    else:
        logger.error("SubmissionForm(id=%d) didn't appear", submission_form_id)
        return
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        created = False
        if not self.pk:
            created = True

        # If no API key exists, just produce one.
        # Typically: at instance creation
        with transaction.atomic():
            if not self.secret:
                self.regen_secret()
            super().save(*args, **kwargs)

        if created and not self.password:
            transaction.on_commit(self.send_invitation_email)
项目:fire    作者:FundersClub    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
        data = request.POST
        from_name, from_email = parseaddr(data['from'])

        msg = IncomingMessage.objects.create(
            body_html=data.get('html', ''),
            body_text=data.get('text', ''),
            from_email=from_email,
            from_name=from_name,
            original_post_data=dict(data),
            subject=data.get('subject', '<No subject>'),
            to_email=json.loads(data['envelope'])['to'][0],
        )

        for name, info in json.loads(data.get('attachment-info', '{}')).items():
            attachment = Attachment(
                content_id=info.get('content-id', ''),
                content_type=info.get('type', ''),
                file=request.FILES[name],
                msg=msg,
            )
            if attachment.content_type:
                attachment.file.content_type = attachment.content_type
            attachment.save()

        transaction.on_commit(partial(process_incoming_message.delay, msg.id))
        return HttpResponse()
项目:django-route    作者:vinayinvicible    | 项目源码 | 文件源码
def destination_post_save(instance, **kwargs):
    if hasattr(transaction, 'on_commit'):
        transaction.on_commit(clear_destination_cache)
    else:
        clear_destination_cache()


# noinspection PyUnusedLocal
项目:django-route    作者:vinayinvicible    | 项目源码 | 文件源码
def destination_post_delete(instance, **kwargs):
    if hasattr(transaction, 'on_commit'):
        transaction.on_commit(clear_destination_cache)
    else:
        clear_destination_cache()


# noinspection PyUnusedLocal
项目:django-route    作者:vinayinvicible    | 项目源码 | 文件源码
def router_post_save(instance, **kwargs):
    if hasattr(transaction, 'on_commit'):
        transaction.on_commit(clear_router_cache)
    else:
        clear_router_cache()


# noinspection PyUnusedLocal
项目:django-route    作者:vinayinvicible    | 项目源码 | 文件源码
def router_deleted(instance, **kwargs):
    if hasattr(transaction, 'on_commit'):
        transaction.on_commit(clear_router_cache)
    else:
        clear_router_cache()
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def run(self):
        """Actually run the scheduled Spark job."""
        # if the job ran before and is still running, don't start it again
        if not self.is_runnable:
            return
        jobflow_id = self.provisioner.run(
            user_username=self.created_by.username,
            user_email=self.created_by.email,
            identifier=self.identifier,
            emr_release=self.emr_release.version,
            size=self.size,
            notebook_key=self.notebook_s3_key,
            is_public=self.is_public,
            job_timeout=self.job_timeout,
        )
        # Create new job history record.
        run = self.runs.create(
            spark_job=self,
            jobflow_id=jobflow_id,
            scheduled_at=timezone.now(),
            emr_release_version=self.emr_release.version,
            size=self.size,
        )
        # Remove the cached latest run to this objects will requery it.
        try:
            delattr(self, 'latest_run')
        except AttributeError:  # pragma: no cover
            pass  # It didn't have a `latest_run` and that's ok.

        with transaction.atomic():
            Metric.record('sparkjob-emr-version',
                          data={'version': self.emr_release.version})

        # sync with EMR API
        transaction.on_commit(run.sync)
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        """Insert the cluster into the database or update it if already
        present, spawning the cluster if it's not already spawned.
        """
        # actually start the cluster
        if self.jobflow_id is None:
            self.jobflow_id = self.provisioner.start(
                user_username=self.created_by.username,
                user_email=self.created_by.email,
                identifier=self.identifier,
                emr_release=self.emr_release.version,
                size=self.size,
                public_key=self.ssh_key.key,
            )
            # once we've stored the jobflow id we can fetch the status for the first time
            transaction.on_commit(self.sync)

            with transaction.atomic():
                Metric.record('cluster-emr-version',
                              data={'version': self.emr_release.version})

        # set the dates
        if not self.expires_at:
            # clusters should expire after the lifetime it's set to
            self.expires_at = timezone.now() + timedelta(hours=self.lifetime)

        super().save(*args, **kwargs)
项目:rohrpost    作者:axsemantics    | 项目源码 | 文件源码
def on_transaction_commit(func):
        func()
项目:Odin    作者:HackSoftware    | 项目源码 | 文件源码
def start_grader_communication(solution_id: int, solution_model: str):
    transaction.on_commit(lambda: submit_solution.delay(solution_id, solution_model))
项目:c3nav    作者:c3nav    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        with transaction.atomic():
            super().save(*args, **kwargs)
            transaction.on_commit(lambda: cache.delete(self.user_access_permission_key(self.user_id)))
项目:c3nav    作者:c3nav    | 项目源码 | 文件源码
def delete(self, *args, **kwargs):
        with transaction.atomic():
            super().delete(*args, **kwargs)
            transaction.on_commit(lambda: cache.delete(self.user_access_permission_key(self.user_id)))
项目:django-rest-framework-reactive    作者:genialis    | 项目源码 | 文件源码
def model_post_delete(sender, instance, **kwargs):
    """
    Signal emitted after any model is deleted via Django ORM.

    :param sender: Model class that was deleted
    :param instance: The actual instance that was removed
    """

    def notify():
        table = sender._meta.db_table
        observer_client.notify_table_remove(table)

    transaction.on_commit(notify)
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def update_clusters(self):
    """
    Update the cluster metadata from AWS for the pending clusters.

    - To be used periodically.
    - Won't update state if not needed.
    - Will queue updating the Cluster's public IP address if needed.
    """
    # only update the cluster info for clusters that are pending
    active_clusters = Cluster.objects.active()

    # Short-circuit for no active clusters (e.g. on weekends)
    if not active_clusters.exists():
        return []

    # get the start dates of the active clusters, set to the start of the day
    # to counteract time differences between atmo and AWS and use the oldest
    # start date to limit the ListCluster API call to AWS
    oldest_created_at = active_clusters.datetimes('created_at', 'day')

    try:
        # build a mapping between jobflow ID and cluster info
        cluster_mapping = {}
        provisioner = ClusterProvisioner()
        cluster_list = provisioner.list(
            created_after=oldest_created_at[0]
        )
        for cluster_info in cluster_list:
            cluster_mapping[cluster_info['jobflow_id']] = cluster_info

        # go through pending clusters and update the state if needed
        updated_clusters = []
        for cluster in active_clusters:
            with transaction.atomic():
                info = cluster_mapping.get(cluster.jobflow_id)
                # ignore if no info was found for some reason,
                # the cluster was deleted in AWS but it wasn't deleted here yet
                if info is None:
                    continue
                # update cluster status
                cluster.sync(info)
                updated_clusters.append(cluster.identifier)

                # if not given enqueue a job to update the public IP address
                # but only if the cluster is running or waiting, so the
                # API call isn't wasted
                if (not cluster.master_address and
                        cluster.most_recent_status in cluster.READY_STATUS_LIST):
                    transaction.on_commit(
                        lambda: update_master_address.delay(cluster.id)
                    )
        return updated_clusters
    except ClientError as exc:
        self.retry(
            exc=exc,
            countdown=celery.backoff(self.request.retries),
        )
项目:c3nav    作者:c3nav    | 项目源码 | 文件源码
def process_updates(cls):
        logger = logging.getLogger('c3nav')

        with transaction.atomic():
            new_updates = tuple(cls.objects.filter(processed=False).select_for_update(nowait=True))
            if not new_updates:
                return ()

            from c3nav.mapdata.utils.cache.changes import changed_geometries
            changed_geometries.reset()

            logger.info('Recalculating altitude areas...')

            from c3nav.mapdata.models import AltitudeArea
            AltitudeArea.recalculate()

            logger.info('%.3f m² of altitude areas affected.' % changed_geometries.area)

            last_processed_update = cls.objects.filter(processed=True).latest().to_tuple

            for new_update in new_updates:
                logger.info('Applying changed geometries from MapUpdate #%(id)s (%(type)s)...' %
                            {'id': new_update.pk, 'type': new_update.type})
                try:
                    new_changes = pickle.load(open(new_update._changed_geometries_filename(), 'rb'))
                except FileNotFoundError:
                    logger.warning('changed_geometries pickle file not found.')
                else:
                    logger.info('%.3f m² affected by this update.' % new_changes.area)
                    changed_geometries.combine(new_changes)
                new_update.processed = True
                new_update.save()

            logger.info('%.3f m² of geometries affected in total.' % changed_geometries.area)

            changed_geometries.save(last_processed_update, new_updates[-1].to_tuple)

            logger.info('Rebuilding level render data...')

            from c3nav.mapdata.render.renderdata import LevelRenderData
            LevelRenderData.rebuild()

            logger.info('Rebuilding router...')
            from c3nav.routing.router import Router
            Router.rebuild()

            transaction.on_commit(
                lambda: cache.set('mapdata:last_processed_updatee', new_updates[-1].totuple, 300)
            )

            return new_updates