我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用django.db.models.deletion.Collector()。
def delete(self): """ Deletes the records in the current QuerySet. """ assert self.query.can_filter(), \ "Cannot use 'limit' or 'offset' with delete." del_query = self._clone() # The delete is actually 2 queries - one to find related objects, # and one to delete. Make sure that the discovery of related # objects is performed on the same database as the deletion. del_query._for_write = True # Disable non-supported fields. del_query.query.select_for_update = False del_query.query.select_related = False del_query.query.clear_ordering(force_empty=True) collector = Collector(using=del_query.db) collector.collect(del_query) collector.delete() # Clear the result cache, in case this QuerySet gets reused. self._result_cache = None
def delete(self): """ Deletes the records in the current QuerySet. """ assert self.query.can_filter(), \ "Cannot use 'limit' or 'offset' with delete." if self._fields is not None: raise TypeError("Cannot call delete() after .values() or .values_list()") del_query = self._clone() # The delete is actually 2 queries - one to find related objects, # and one to delete. Make sure that the discovery of related # objects is performed on the same database as the deletion. del_query._for_write = True # Disable non-supported fields. del_query.query.select_for_update = False del_query.query.select_related = False del_query.query.clear_ordering(force_empty=True) collector = Collector(using=del_query.db) collector.collect(del_query) deleted, _rows_count = collector.delete() # Clear the result cache, in case this QuerySet gets reused. self._result_cache = None return deleted, _rows_count
def delete(self, using=None, keep_parents=False): using = using or router.db_for_write(self.__class__, instance=self) assert self._get_pk_val() is not None, ( "%s object can't be deleted because its %s attribute is set to None." % (self._meta.object_name, self._meta.pk.attname) ) collector = Collector(using=using) collector.collect([self], keep_parents=keep_parents) return collector.delete()
def get_context_data(self, **kwargs): context = super(DynamicDeleteView, self).get_context_data(**kwargs) collector = Collector(using='default') # or specific database collector.collect([self.object]) to_delete = collector.instances_with_model() context['to_delete_list'] = [] for x, y in to_delete: context['to_delete_list'].append((x.__name__, y,)) return context
def test_integrity_error_is_retried(self): # This doesn't simulate the actual way the failure is caused, but it # does simulate it to cause the same effect. # # The `calls` below is used to hold the context inside of the retry # loop. The first call will fail as `_clear_events` will be called, # the first retry will not call `_clear_events` which will allow the # transaction to be committed. calls = [] def _clear_events(collector): for idx, objs in enumerate(collector.fast_deletes): if len(objs) > 0 and isinstance(objs[0], Event): collector.fast_deletes[idx] = Event.objects.none() @transactional def _in_database(): node = factory.make_Node() for _ in range(10): factory.make_Event(node=node) # Use the collector directly instead of calling `delete`. collector = Collector(using="default") collector.collect([node]) if calls == 0: _clear_events(collector) calls.append(None) collector.delete() # Test is that no exception is raised. If this doesn't work then a # `django.db.utils.IntegrityError` will be raised. yield deferToDatabase(_in_database)
def delete(self, using=None): using = using or router.db_for_write(self.__class__, instance=self) assert self._get_pk_val() is not None, ( "%s object can't be deleted because its %s attribute is set to None." % (self._meta.object_name, self._meta.pk.attname) ) collector = Collector(using=using) collector.collect([self]) collector.delete()