The following 50 code examples, extracted from open-source Python projects, illustrate how to use django.db.models().
def emit_post_migrate_signal(created_models, verbosity, interactive, db):
    """Send the post_migrate (and legacy post_syncdb) signal for every app.

    Apps whose ``models_module`` is ``None`` are skipped: they have no models
    and therefore no handlers to run.
    """
    for config in apps.get_app_configs():
        if config.models_module is None:
            continue
        if verbosity >= 2:
            print("Running post-migrate handlers for application %s" % config.label)
        models.signals.post_migrate.send(
            sender=config,
            app_config=config,
            verbosity=verbosity,
            interactive=interactive,
            using=db,
        )
        # For backwards-compatibility -- remove in Django 1.9.
        models.signals.post_syncdb.send(
            sender=config.models_module,
            app=config.models_module,
            created_models=created_models,
            verbosity=verbosity,
            interactive=interactive,
            db=db,
        )
def db_type(self, connection):
    """Return the column type spec: ``CHAR(n)`` plus an optional DEFAULT clause.

    The parent CharField already validates max_length in CharField.check(),
    so no validity checking is repeated here.

    See:
    https://docs.djangoproject.com/en/dev/ref/models/fields/#django.db.models.Field.db_type
    https://docs.djangoproject.com/en/dev/ref/checks/
    https://github.com/django/django/blob/master/django/db/models/fields/__init__.py
    """
    spec_parts = ['CHAR({!s})'.format(self.max_length)]
    if self.has_default():
        rendered_default = self._get_db_type_default_value(self.get_default(), connection)
        spec_parts.append('DEFAULT {!s}'.format(rendered_default))
    return ' '.join(spec_parts)
def get_ts_model_class_name(**kwargs):
    """Return the dynamic model class name used in TimestampField tests.

    Returns:
        str: The model class name.
    """
    return get_model_class_name(TS_MODEL_CLASS_NAME_PREFIX, **kwargs)


#######################
# Test Configurations #
#######################

# Django's ORM doesn't permit omitting a value for a field in insert/update, and
# so attempts to insert NULL when no value is provided for a field on a model
# (django.db.models.NOT_PROVIDED). This affects the expected results of insert
# operations, most commonly by failing to emit IntegrityError and returning a
# None value instead.

# Configurations for FixedCharField tests.
# Note that empty string insert values are not tested. PostgreSQL returns an
# empty string of max_length while MySQL and SQLite return an empty string.
def make_cropping_field(cls, size_key, origin_field_name="image", verbose_name=None, help_text=None):
    """Build an image_cropping.fields.ImageRatioField for admin-side cropping.

    NB: make sure to call self.crop_image_if_needed in the save method of
    models using these fields.
    """
    dims = cls._image_sizes[size_key]
    if not verbose_name:
        pretty_key = " ".join(capfirst(part) for part in size_key.split("_"))
        verbose_name = "%s crop" % pretty_key
    if not help_text:
        help_text = "Crops image to %s×%s" % (dims[0], dims[1])
    return ImageRatioField(
        image_field=origin_field_name,
        size="%sx%s" % (dims[0], dims[1]),
        size_warning=True,
        verbose_name=verbose_name,
        help_text=help_text,
    )
def forwards(self, orm):
    """Create Subscription rows from the legacy accept_* contact flags.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    newsletter = orm.SubscriptionType.objects.get_or_create(name="Newsletter")[0]
    third_party = orm.SubscriptionType.objects.get_or_create(name="Partenaires")[0]
    flag_to_type = (
        ('accept_3rdparty', third_party),
        ('accept_newsletter', newsletter),
    )
    for contact in orm.Contact.objects.all():
        for flag_attr, subscription_type in flag_to_type:
            if not getattr(contact, flag_attr):
                continue
            subscription, created = orm.Subscription.objects.get_or_create(
                subscription_type=subscription_type, contact=contact
            )
            if created:
                subscription.accept_subscription = True
                subscription.subscription_date = datetime.datetime.now()
                subscription.save()
def forwards(self, orm):
    """Create TeamMembers for staff users and relink actions via the backup field.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    members_by_username = {}
    staff_users = orm['auth.User'].objects.filter(is_staff=True).exclude(last_name='')
    for staff_user in staff_users:
        members_by_username[staff_user.username] = orm.TeamMember.objects.create(
            user=staff_user,
            name=u'{0} {1}'.format(staff_user.first_name, staff_user.last_name)
        )
    for action in orm.Action.objects.filter(in_charge_backup__isnull=False):
        try:
            # Backup user may not be staff anymore: skip those silently.
            action.in_charge = members_by_username[action.in_charge_backup.username]
            action.save()
        except KeyError:
            pass
def forwards(self, orm):
    """Strip surrounding whitespace from store category, brand and item names.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    # Same clean-up applied to the three Store models, in this order.
    for model_label in ('Store.StoreItemCategory', 'Store.Brand', 'Store.StoreItem'):
        for record in orm[model_label].objects.all():
            record.name = record.name.strip()
            record.save()
def sql_flush(style, connection, only_django=False, reset_sequences=True, allow_cascade=False):
    """Return the list of SQL statements used to flush the database.

    If only_django is True, only table names that have associated Django
    models and are in INSTALLED_APPS are included.
    """
    introspection = connection.introspection
    if only_django:
        tables = introspection.django_table_names(only_existing=True, include_views=False)
    else:
        tables = introspection.table_names(include_views=False)
    sequences = introspection.sequence_list() if reset_sequences else ()
    return connection.ops.sql_flush(style, tables, sequences, allow_cascade)
def sql_indexes(app_config, style, connection):
    """Return the CREATE INDEX SQL statements for all models in the given app."""
    check_for_migrations(app_config, connection)
    migratable = router.get_migratable_models(
        app_config, connection.alias, include_auto_created=True
    )
    statements = []
    for model in migratable:
        statements += connection.creation.sql_indexes_for_model(model, style)
    return statements
def sql_destroy_indexes(app_config, style, connection):
    """Return the DROP INDEX SQL statements for all models in the given app."""
    check_for_migrations(app_config, connection)
    migratable = router.get_migratable_models(
        app_config, connection.alias, include_auto_created=True
    )
    statements = []
    for model in migratable:
        statements += connection.creation.sql_destroy_indexes_for_model(model, style)
    return statements
def custom_sql_for_model(model, style, connection):
    """Return the custom SQL statements to run after creating *model*'s table.

    Gathers post_create_sql from the model's fields (managed models only) and
    then loads backend-specific and generic ``.sql`` files from the app's
    ``sql/`` directory (and the deprecated ``models/sql/`` location).
    """
    opts = model._meta
    # Candidate directories that may hold per-model .sql files.
    app_dirs = []
    app_dir = apps.get_app_config(model._meta.app_label).path
    app_dirs.append(os.path.normpath(os.path.join(app_dir, 'sql')))

    # Deprecated location -- remove in Django 1.9
    old_app_dir = os.path.normpath(os.path.join(app_dir, 'models/sql'))
    if os.path.exists(old_app_dir):
        warnings.warn("Custom SQL location '<app_label>/models/sql' is "
                      "deprecated, use '<app_label>/sql' instead.",
                      RemovedInDjango19Warning)
        app_dirs.append(old_app_dir)

    output = []

    # Post-creation SQL should come before any initial SQL data is loaded.
    # However, this should not be done for models that are unmanaged or
    # for fields that are part of a parent model (via model inheritance).
    if opts.managed:
        post_sql_fields = [f for f in opts.local_fields if hasattr(f, 'post_create_sql')]
        for f in post_sql_fields:
            output.extend(f.post_create_sql(style, model._meta.db_table))

    # Find custom SQL, if it's available. Backend-specific files
    # (<model>.<backend>.sql) are appended before the generic <model>.sql.
    backend_name = connection.settings_dict['ENGINE'].split('.')[-1]
    sql_files = []
    for app_dir in app_dirs:
        sql_files.append(os.path.join(app_dir, "%s.%s.sql" % (opts.model_name, backend_name)))
        sql_files.append(os.path.join(app_dir, "%s.sql" % opts.model_name))
    for sql_file in sql_files:
        if os.path.exists(sql_file):
            with io.open(sql_file, encoding=settings.FILE_CHARSET) as fp:
                # Let the backend split the script into individual statements.
                output.extend(connection.ops.prepare_sql_script(fp.read(), _allow_fallback=True))
    return output
def db_type(self, connection):
    """Return the backend-specific column type for this field.

    Type spec additions for self.null are not needed: Django appends NULL or
    NOT NULL itself (see BaseDatabaseSchemaEditor.column_sql). Returning None
    would make Django skip the field in generated CREATE TABLE statements,
    allowing manual definition outside the ORM.

    See:
    https://github.com/django/django/blob/master/django/db/backends/base/schema.py
    https://github.com/django/django/blob/master/django/db/models/fields/__init__.py
    https://docs.djangoproject.com/en/dev/howto/custom-model-fields/#useful-methods
    """
    handlers = {
        'django.db.backends.mysql': self._db_type_mysql,
        'django.db.backends.postgresql': self._db_type_postgresql,
        'django.db.backends.sqlite3': self._db_type_sqlite,
    }
    handler = handlers.get(connection.settings_dict['ENGINE'])
    if handler is not None:
        return handler(connection)
    # Unknown backend: defer to the parent implementation.
    return super().db_type(connection)
def deconstruct(self):
    """Deconstruct the field, preserving a truthy auto_now_update flag.

    See:
    https://docs.djangoproject.com/en/dev/ref/models/fields/#django.db.models.Field.deconstruct
    """
    name, path, args, kwargs = super().deconstruct()
    if self.auto_now_update:
        # Only serialized when set, so default-valued fields stay compact.
        kwargs['auto_now_update'] = True
    return name, path, args, kwargs
def _test_insert_dict(self, db_alias, model_class, model_attr, insert_value, expected_value):
    """
    Test INSERT and SELECT field attribute values from a FieldTestConfig.

    Inserts the test values for the given model, selects the value from the
    database, and compares the result with the expected value. Interprets the
    FieldTestCase.insert_values_dict and runs assertions based on the type and
    value of the data retrieved from the database after a successful insert.

    Args:
        db_alias (str): The string key under which a database configuration is
            defined. Usable in django.conf.settings or directly through
            django.db.connections.
        model_class (class): The class of the model that will issue the insert.
        model_attr (str): The model attribute through which the test field may
            be accessed.
        insert_value: The value to save in the new model instance's attribute.
            django.db.models.NOT_PROVIDED means "omit the field entirely" so
            the field's default applies.
        expected_value: The value that is expected to be retrieved from the
            database after a successful save() call. May also be a class: an
            Exception subclass means save() must raise it; any other class
            means the retrieved value's type (not value) is compared.
    """
    if insert_value is django.db.models.NOT_PROVIDED:
        # No kwargs: let the field's default (or NULL) be used.
        model = model_class()
    else:
        model_kwargs = {model_attr: insert_value}
        model = model_class(**model_kwargs)
    class_expected = inspect.isclass(expected_value)
    if class_expected and issubclass(expected_value, Exception):
        # An Exception class as expectation: the save itself must fail.
        self.assertRaises(expected_value, model.save, using=db_alias)
    else:
        model.save(using=db_alias)
        # Re-read from the database to see what was actually stored.
        retrieved_record_model = model_class.objects.using(db_alias).get(id=model.id)
        retrieved_value = getattr(retrieved_record_model, model_attr)
        if class_expected:
            # Non-exception class expectation: compare types, not values.
            retrieved_value = retrieved_value.__class__
        self.assertEqual(retrieved_value, expected_value)
def get_model_class_name(prefix, **kwargs):
    """Build a deterministic dynamic class name from *prefix* and field kwargs.

    Used together with the built-in type() to generate unique names for test
    model classes, and called again by the test suites to recover a specific
    model class name from a desired field configuration (kwargs). See sample
    usage in the tests.models module.

    This algorithm is somewhat brittle: each kwarg pair is title-cased and
    stripped of separator characters, and datetime values collapse to the
    literal token 'Datetime'.

    Args:
        prefix (str): The class name prefix.
        kwargs: The keyword args that would be passed to the field in the
            test model class.

    See:
        https://docs.python.org/3/library/functions.html#type
    """
    name_fragments = []
    for key, value in kwargs.items():
        normalized_key = str(key).replace('_', '').title()
        if isinstance(value, datetime.datetime):
            normalized_value = 'Datetime'
        else:
            normalized_value = re.sub(r'[\s:\-\.]', '', str(value)).title()
        name_fragments.append(normalized_key + normalized_value)
    return prefix + ''.join(name_fragments)
def make_autocropping_field(cls, width, height, upload_to="images", processor="fit",
                            image_format="JPEG", matte_colour=(255, 255, 255, 255), **kwargs):
    """Build an imagekit ProcessedImageField that auto-scales and crops images.

    "fill" crops to exactly width×height; "fit" letterboxes onto a matte of
    matte_colour. PNG output is optimized; JPEG uses the module-level quality.
    """
    assert processor in ("fit", "fill"), "invalid processor"
    if processor == "fill":
        pipeline = [ResizeToFill(width, height)]
    else:
        pipeline = [ResizeToFit(width, height, mat_color=matte_colour)]
    assert image_format in ("JPEG", "PNG"), "invalid image_format"
    options = {"optimize": True} if image_format == "PNG" else {"quality": IMAGE_QUALITY}
    help_text = "Automatically scaled to %s×%s" % (width, height)
    extra_help = kwargs.pop("help_text", None)
    if extra_help:
        help_text = "%s. %s" % (extra_help, help_text)
    return ProcessedImageField(
        processors=pipeline,
        format=image_format,
        options=options,
        upload_to=upload_to,
        help_text=help_text,
        **kwargs
    )
def _crop_image(cls, origin_image, target_image, size, crop_box, image_format="JPEG"): """ Resizes an image from one model field and saves into another :param origin_image: django.db.models.fields.files.ImageFieldFile :param target_image: django.db.models.fields.files.ImageFieldFile :param size: tuple of final desired width and height :param crop_box: str, 4-coordinate crop box :param image_format: str, Pillow Image format """ # Original photo origin_image.seek(0) image_file = Image.open(origin_image) # Convert to RGB if image_file.mode not in ("L", "RGB"): image_file = image_file.convert("RGB") if crop_box: try: values = [int(x) for x in crop_box.split(",")] width = abs(values[2] - values[0]) height = abs(values[3] - values[1]) if width and height and (width != image_file.size[0] or height != image_file.size[1]): image_file = image_file.crop(values) except (ValueError, TypeError, IndexError): # There's garbage in the cropping field, ignore print("Unable to parse crop_box parameter value '%s'. Ignoring." % crop_box) image_file = ImageOps.fit(image_file, size, method=Image.LANCZOS) image_content = BytesIO() image_file.save(image_content, format=image_format, quality=IMAGE_QUALITY) image_content = ImageFile(image_content, origin_image.name) target_image.save(name=image_content.name, content=image_content, save=False)
def _get_aggregate(params):
    """Resolve the django.db.models aggregate class named in *params*.

    Falls back to Count when no 'aggregate' key is present.
    """
    aggregate_name = params.get('aggregate', 'Count')
    return getattr(models, aggregate_name)
def _get_aggregate(params):
    """Resolve the django.db.models aggregate class named in *params*.

    'Week' is mapped to the custom 'WeekCount' aggregate. Defaults to 'Count'
    when no 'aggregate' key is present — previously a missing key produced
    getattr(models, None), which raises TypeError; the default also makes this
    helper consistent with its sibling _get_aggregate implementation.
    """
    aggregate = params.get('aggregate', 'Count')
    if aggregate == 'Week':
        aggregate = 'WeekCount'
    return getattr(models, aggregate)
def generate(self):
    """Build, cache and return the apimap for the configured Forest app.

    Serializes one schema per model plus the registered smart collections.
    """
    app_models = apps.get_app_config(settings.FOREST_APP_NAME).get_models()
    schemas = [self.get_schema(model) for model in app_models]
    schemas += self.smart_collections
    self.apimap = self.serialize(schemas)
    return self.apimap
def forward(apps, schema_editor):
    """
    Fills the organization_name field of the following models:
        * ``openwisp_controller.pki.Ca``
        * ``openwisp_controller.pki.Cert``
    """
    # Only run against the default database.
    if schema_editor.connection.alias != 'default':
        return
    from ..models import Ca, Cert
    for model in (Ca, Cert):
        for instance in model.objects.all():
            subject = instance.x509.get_subject()
            instance.organization_name = subject.organizationName or ''
            instance.save()
def handle_pre_delete(self, sender, instance, **kwargs):
    """Remove *instance* from the documents of its related model instances.

    Must run before the actual delete: once the row is gone the relation no
    longer exists and the related instances can't be resolved anymore.
    """
    registry.delete_related(instance)
def setup(self):
    """Connect this object's handlers to every relevant model signal.

    post_save/post_delete cover direct saves; m2m_changed and pre_delete are
    used to keep related objects up to date.
    """
    bindings = (
        (models.signals.post_save, self.handle_save),
        (models.signals.post_delete, self.handle_delete),
        (models.signals.m2m_changed, self.handle_m2m_changed),
        (models.signals.pre_delete, self.handle_pre_delete),
    )
    for signal, handler in bindings:
        signal.connect(handler)
def teardown(self):
    """Disconnect the handlers registered by setup() from the model signals."""
    bindings = (
        (models.signals.post_save, self.handle_save),
        (models.signals.post_delete, self.handle_delete),
        (models.signals.m2m_changed, self.handle_m2m_changed),
        (models.signals.pre_delete, self.handle_pre_delete),
    )
    for signal, handler in bindings:
        signal.disconnect(handler)
def forwards(self, orm):
    """Re-save every Crm.Group; saving is expected to strip the names.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    for group in orm['Crm.Group'].objects.all():
        group.save()
def forwards(self, orm):
    """Copy Action.in_charge into the in_charge_backup field.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    actions_with_owner = orm.Action.objects.filter(in_charge__isnull=False)
    for action in actions_with_owner:
        action.in_charge_backup = action.in_charge
        action.save()
def forwards(self, orm):
    """Associate every existing site with each subscription type.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    for subscription_type in orm.SubscriptionType.objects.all():
        every_site = list(orm["sites.Site"].objects.all())
        subscription_type.sites.add(*every_site)
        subscription_type.save()
def forwards(self, orm):
    """Re-save every single-contact entity so save-time logic reapplies.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    single_contact_entities = orm.Entity.objects.filter(is_single_contact=True)
    for entity in single_contact_entities:
        entity.save()
def forwards(self, orm):
    """Migrate the single contact/entity FKs onto the new M2M relations.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    for action in orm.Action.objects.all():
        if action.contact:
            action.contacts.add(action.contact)
        if action.entity:
            action.entities.add(action.entity)
        action.save()
def forwards(self, orm):
    """Give every emailing without a subscription type the first known one.

    Note: don't use "from appname.models import ModelName"; use orm.ModelName
    (or orm['appname.ModelName'] for other applications) in migrations.
    """
    for emailing in orm.Emailing.objects.all():
        if not emailing.subscription_type:
            emailing.subscription_type = orm['Crm.SubscriptionType'].objects.all()[0]
            emailing.save()