我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用django.db.models.signals.pre_save.connect()。
def process_request(self, request):
    """
    Store request metadata in thread-local storage and, when the request
    carries an authenticated user, connect a ``pre_save`` receiver for
    ``LogAction`` that already has that user bound to it.

    :param request: Incoming request. ``request.META`` is read for the
        client address; ``request.user`` is read when present.
    """
    # Initialize thread local storage
    threadlocal.actionslog = {
        'signal_duid': (self.__class__, time.time()),
        'remote_ip': request.META.get('REMOTE_ADDR'),
    }

    # In case of proxy, set 'original' address (first entry of the header)
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        threadlocal.actionslog['remote_ip'] = forwarded_for.split(',')[0]

    # ``is_authenticated`` is a method on older Django releases and a plain
    # boolean attribute on newer ones. Calling a bool raises TypeError, so
    # only call it when it is actually callable.
    user = getattr(request, 'user', None)
    is_authenticated = getattr(user, 'is_authenticated', False)
    if callable(is_authenticated):
        is_authenticated = is_authenticated()

    # Connect signal for automatic logging
    if is_authenticated:
        set_user = curry(self.set_user, request.user)
        pre_save.connect(
            set_user,
            sender=LogAction,
            dispatch_uid=threadlocal.actionslog['signal_duid'],
            weak=False,
        )
def test_delete_key_old_way(env):
    """Deploy the public key using a password, connect without one, then delete the key."""
    from django_remote_submission.wrapper.remote import RemoteWrapper

    remote = RemoteWrapper(
        hostname=env.server_hostname,
        username=env.remote_user,
        port=env.server_port,
    )
    pub_key_path = os.path.expanduser('~/.ssh/id_rsa.pub')

    # Connect with the password and deploy the key if it is not there yet.
    with remote.connect(env.remote_password, pub_key_path):
        remote.deploy_key_if_it_does_not_exist()

    # Connect without a password.
    with remote.connect():
        pass

    # Connect with the password again and delete the key.
    with remote.connect(env.remote_password):
        remote.delete_key()
def create_referral(sender, request, user, *args, **kwargs):
    """
    Get a ReferralLink ID based on clicked redirect.
    Use ID and created User to connect created User to ReferralLink User.

    :param sender: Signal sender (unused).
    :param request: Request whose session may hold ``"referral_link_id"``.
    :param user: The newly created user (the invitee).
    """
    # Make sure the new user has a personal link; only the side effect of
    # get_or_create is needed, not the returned object.
    ReferralLink.objects.get_or_create(user=user)

    referral_link_id = request.session.get("referral_link_id")
    if not referral_link_id:
        return

    if ReferredUser.objects.filter(invitee=user).exists():
        return

    # A single first() replaces exists() + first(): one query fewer, and no
    # chance of dereferencing None if the row disappears between the two.
    link = ReferralLink.objects.filter(id=referral_link_id).first()
    if link is None:
        return

    referral = ReferredUser()
    referral.invitee = user
    referral.referrer = link.user
    referral.code = link
    referral.save()

    # NOTE(review): the original nesting was lost when this snippet was
    # flattened; the session cleanup and the success message are assumed to
    # belong to the "referral created" path -- confirm against upstream.
    del request.session["referral_link_id"]
    if COMPLETED_MESSAGE_DISPLAY:
        complete_msg = clean_message("COMPLETED_MESSAGE", COMPLETED_MESSAGE)
        messages.success(request, complete_msg)
def fix_model_for_attributes(cls):
    """
    Connect ``fixup_instance`` to ``post_init`` and
    ``attribute_model_pre_save`` to ``pre_save`` for ``cls``.

    Returns ``cls`` itself, so the function can be applied as a class
    decorator.
    """
    hooks = (
        (post_init, fixup_instance),
        (pre_save, attribute_model_pre_save),
    )
    for signal, receiver in hooks:
        signal.connect(receiver, sender=cls)
    return cls
def register(model):
    """Register a model to the audit code.

    Connects ``_pre_save``, ``_post_save`` and ``_pre_delete`` to the
    corresponding signals for ``model``, using ``str(model)`` as the
    ``dispatch_uid``. Any failure is logged instead of raised.

    :param model: Model to register.
    :type model: object
    """
    try:
        uid = str(model)
        pre_save.connect(_pre_save, sender=model, dispatch_uid=uid)
        post_save.connect(_post_save, sender=model, dispatch_uid=uid)
        pre_delete.connect(_pre_delete, sender=model, dispatch_uid=uid)
    except Exception as e:
        # Exceptions have no ``.message`` attribute on Python 3; reading it
        # here would raise AttributeError from inside the handler. Log the
        # exception object itself instead.
        logger.error("<Register> %s", e)
def register_pre_save_on_AUTH_USER_MODER_change(sender, setting, value, enter, **kwargs):
    """
    Connect or disconnect the password-expiry ``pre_save`` receiver when
    ``AUTH_USER_MODEL`` is changed to something other than ``USER_MODEL``.

    The receiver is connected for ``sender=value`` when ``enter`` is true
    and disconnected otherwise. Any other setting is ignored.
    NOTE(review): the signature looks like a ``setting_changed`` receiver --
    confirm where this is connected.
    """
    if setting != "AUTH_USER_MODEL" or value == USER_MODEL:
        return

    toggle = pre_save.connect if enter else pre_save.disconnect
    toggle(useraudit.password_expiry.user_pre_save, sender=value)
def test_signal(self):
    """``login_failure_limit_reached`` is sent with the user after two failed logins."""

    def handler(sender, user=None, **kwargs):
        self.handler_called = True
        # assertEqual: the ``assertEquals`` alias is deprecated and was
        # removed in Python 3.12.
        self.assertEqual(sender, type(self.user))
        self.assertEqual(user, self.user)

    self.handler_called = False
    login_failure_limit_reached.connect(handler)
    try:
        authenticate(username=self.username, password="INCORRECT")
        authenticate(username=self.username, password="INCORRECT")
    finally:
        # Always disconnect so a failure here cannot leak the handler
        # into other tests.
        login_failure_limit_reached.disconnect(handler)

    self.assertTrue(self.handler_called)
def contribute_to_class(self, cls, name, virtual_only=False):
    """
    Attach the field to ``cls``, record the field name on the class as
    ``_fsm_state_field`` and connect ``_fsm_check_transitions`` to
    ``pre_save`` for ``cls``.

    :param cls: Model class the field is being added to.
    :param name: Attribute name of the field on ``cls``.
    :param virtual_only: Forwarded unchanged to the parent implementation.
    """
    # Forward the caller's value; the previous hard-coded ``False``
    # silently discarded it.
    super().contribute_to_class(cls, name, virtual_only=virtual_only)
    cls._fsm_state_field = name
    pre_save.connect(
        self._fsm_check_transitions,
        sender=cls,
        dispatch_uid='_fsm_check_transitions',
    )
def register_signals():
    """Connect the ``LostKey`` and ``Member`` manager handlers to their save signals."""
    # LostKey: one handler before save, one after.
    pre_save.connect(receiver=LostKey.objects.on_pre_save, sender=LostKey)
    post_save.connect(receiver=LostKey.objects.send_notification, sender=LostKey)

    # Member: handler after save.
    post_save.connect(
        receiver=Member.objects.send_transfer_workspace_key_info,
        sender=Member,
    )
def ready(self):
    """Connect the SoundCloud malformed-width handler to ``pre_save`` for ``OEmbedWithCaptionItem``."""
    # Function-scope import kept from the original (presumably to avoid
    # importing models at module load time -- confirm).
    from icekit.plugins.oembed_with_caption.models import (
        OEmbedWithCaptionItem as item_model,
    )

    pre_save.connect(
        receiver=handle_soundcloud_malformed_widths_for_oembeds,
        sender=item_model,
    )
def ready(self):
    """Connect the SoundCloud malformed-width handler to ``pre_save`` for ``OEmbedItem``."""
    # Function-scope import kept from the original (presumably to avoid
    # importing models at module load time -- confirm).
    from fluent_contents.plugins.oembeditem.models import (
        OEmbedItem as item_model,
    )

    pre_save.connect(
        receiver=handle_soundcloud_malformed_widths_for_oembeds,
        sender=item_model,
    )
def job_model_saved(mocker):
    """
    Yield a mock that is connected as a ``pre_save`` receiver for ``Job``.

    The mock is disconnected again once the generator is resumed after the
    ``yield``.
    """
    from django.db.models.signals import pre_save
    from django_remote_submission.models import Job

    receiver = mocker.Mock()
    pre_save.connect(receiver, sender=Job)

    yield receiver

    pre_save.disconnect(receiver, sender=Job)
def connect(cls):
    """Connect ``on_pre_save`` and ``on_post_save`` to the save signals with ``cls`` as sender."""
    pre_save.connect(receiver=on_pre_save, sender=cls)
    post_save.connect(receiver=on_post_save, sender=cls)
def cleanup(cls):
    """
    Connect ``on_delete`` to ``pre_delete`` and ``on_save`` to ``pre_save``
    for ``cls``, using the class name as ``dispatch_uid``.

    Returns ``cls`` itself, so the function can be applied as a class
    decorator.
    """
    uid = cls.__name__
    for signal, handler in ((pre_delete, on_delete), (pre_save, on_save)):
        signal.connect(receiver=handler, sender=cls, dispatch_uid=uid)
    return cls
def _get_latest_changeset(self): try: return self.change_sets.latest() except ChangeSet.DoesNotExist: return None # connect the `pre_save` event to the subscribers for tracking changes. we make use of `dispatch_uid` so the # event is not connected twice.
def register():
    """Connect ``check_status_change`` to ``pre_save`` for ``Segment``."""
    pre_save.connect(receiver=check_status_change, sender=Segment)
def _run_listen():
    """Connect the ``run_watchers`` handlers to the ``TestRun`` and ``TestCaseRun`` signals."""
    case_run_uid = 'tcms.testruns.models.TestCaseRun'

    post_save.connect(run_watchers.post_run_saved, sender=TestRun)
    post_save.connect(
        run_watchers.post_case_run_saved,
        sender=TestCaseRun,
        dispatch_uid=case_run_uid,
    )
    post_delete.connect(
        run_watchers.post_case_run_deleted,
        sender=TestCaseRun,
        dispatch_uid=case_run_uid,
    )
    pre_save.connect(run_watchers.pre_save_clean, sender=TestRun)
def _listen():
    """Connect the ``plan_watchers`` handlers to the ``TestPlan`` signals."""
    post_save.connect(plan_watchers.on_plan_save, sender=TestPlan)
    pre_delete.connect(plan_watchers.on_plan_delete, sender=TestPlan)
    pre_save.connect(plan_watchers.pre_save_clean, sender=TestPlan)
def _listen():
    """Connect the ``case_watchers`` handlers to the ``TestCase`` signals."""
    # Case save/delete handlers (used for email notification, per the
    # original comment).
    post_save.connect(case_watchers.on_case_save, sender=TestCase)
    post_delete.connect(case_watchers.on_case_delete, sender=TestCase)
    pre_save.connect(case_watchers.pre_save_clean, sender=TestCase)
def contribute_to_class(self, cls, name, *args, **kwargs):
    """
    Attach the field to ``cls`` and connect its swap handlers.

    ``get_swap_objects`` is connected to ``pre_save`` and
    ``save_swap_objects`` to ``post_save`` for ``cls``.

    :param cls: Model class the field is being added to.
    :param name: Attribute name of the field on ``cls``.
    :raises TypeError: If the field appears in any ``unique_together``
        constraint of ``cls``.
    """
    # Forward the extra arguments; previously they were accepted but
    # dropped, so the parent never saw what the caller passed.
    super(SwapIntegerField, self).contribute_to_class(cls, name, *args, **kwargs)

    if any(self.name in constraint for constraint in cls._meta.unique_together):
        raise TypeError("%s can't be part of a unique constraint." % self.__class__.__name__)

    pre_save.connect(self.get_swap_objects, sender=cls)
    post_save.connect(self.save_swap_objects, sender=cls)
def test_deploy_and_delete_key(env):
    """
    This is the new way of deploying and deleting the private key.

    Copies the key to the server, checks that a password-less connection
    works, deletes the key, then checks that a password-less connection
    raises ``ValueError``.
    """
    from django_remote_submission.tasks import (
        copy_key_to_server,
        delete_key_from_server,
    )
    from django_remote_submission.wrapper.remote import RemoteWrapper

    copy_key_to_server(
        username=env.remote_user,
        password=env.remote_password,
        hostname=env.server_hostname,
        port=env.server_port,
        public_key_filename=None,
        remote=runs_remotely,
    )

    # This wrapper is just for testing
    # Note that no password is passed!
    wrapper = RemoteWrapper(
        hostname=env.server_hostname,
        username=env.remote_user,
        port=env.server_port,
    )

    # Connect without password
    with wrapper.connect():
        pass

    delete_key_from_server(
        username=env.remote_user,
        password=env.remote_password,
        hostname=env.server_hostname,
        port=env.server_port,
        public_key_filename=None,
        remote=runs_remotely,
    )

    # The ``message`` argument of ``pytest.raises`` was removed in pytest 5
    # and now raises TypeError. Calling ``pytest.fail`` at the end of the
    # block keeps the same custom failure message when nothing is raised.
    with pytest.raises(ValueError):
        # Connect without password fails!
        with wrapper.connect():
            pass
        pytest.fail("incorrect public key")