The following 50 code examples, extracted from open-source Python projects, illustrate how to use django.db.connection.
def test_for_missing_migrations(self):
    """Fail when the models declare changes that no migration captures yet."""
    # Snapshot of the models as currently declared in code.
    desired_state = ProjectState.from_apps(apps)
    # Test-only models are never migrated, so drop them from the snapshot.
    desired_state.remove_model('onfido', 'testbasemodel')
    desired_state.remove_model('onfido', 'testbasestatusmodel')
    loader = MigrationExecutor(connection).loader
    detector = MigrationAutodetector(
        from_state=loader.project_state(),
        to_state=desired_state,
    )
    if detector.changes(graph=loader.graph):
        self.fail(
            'Your models have changes that are not yet reflected '
            'in a migration. You should add them now.'
        )
def setUp(self):
    """Rewind to ``migrate_from``, let the test seed data, then apply ``migrate_to``."""
    assert self.migrate_from and self.migrate_to, \
        "TestCase '{}' must define migrate_from and migrate_to properties".format(type(self).__name__)
    source = [(self.app, self.migrate_from)]
    target = [(self.app, self.migrate_to)]
    executor = MigrationExecutor(connection)
    old_apps = executor.loader.project_state(source).apps
    # Reverse to the original migration.
    executor.migrate(source)
    self.setUpBeforeMigration(old_apps)
    # Run the migration under test on a fresh executor.
    executor = MigrationExecutor(connection)
    executor.loader.build_graph()  # reload.
    executor.migrate(target)
    self.apps = executor.loader.project_state(target).apps
def _check_sql_mode(self, **kwargs):
    """Emit warning mysql.W002 when the connection is not in MySQL strict mode."""
    with self.connection.cursor() as cursor:
        cursor.execute("SELECT @@sql_mode")
        row = cursor.fetchone()
    active_modes = set(row[0].split(',') if row else ())
    # Either strict flag is enough; warn only when both are absent.
    if active_modes & {'STRICT_TRANS_TABLES', 'STRICT_ALL_TABLES'}:
        return []
    return [checks.Warning(
        "MySQL Strict Mode is not set for database connection '%s'" % self.connection.alias,
        hint="MySQL's Strict Mode fixes many data integrity problems in MySQL, "
             "such as data truncation upon insertion, by escalating warnings into "
             "errors. It is strongly recommended you activate it. See: "
             "https://docs.djangoproject.com/en/%s/ref/databases/#mysql-sql-mode"
             % (get_docs_version(),),
        id='mysql.W002',
    )]
def setUp(self):
    """Create two throw-away models: one with a fixed language, one with a language column."""
    class TextDocument(models.Model):
        body = models.TextField()
        other = models.TextField()
        search = SearchVectorField([
            WeightedColumn('body', 'D'),
        ], 'english')

    class TextDocumentLanguageColumn(models.Model):
        body = models.TextField()
        lang = models.TextField(null=True)
        search = SearchVectorField([
            WeightedColumn('body', 'D'),
        ], language_column='lang', language='english')

    # Materialize both model tables for the duration of the test.
    with DatabaseSchemaEditor(connection) as editor:
        editor.create_model(TextDocument)
        editor.create_model(TextDocumentLanguageColumn)
    self.create = TextDocument.objects.create
    self.lang = TextDocumentLanguageColumn.objects.create
def setUp(self):
    """Create a weighted-column search model with two mirrored sample rows."""
    class TextDocument(models.Model):
        title = models.CharField(max_length=128)
        body = models.TextField()
        search = SearchVectorField([
            WeightedColumn('title', 'A'),
            WeightedColumn('body', 'D'),
        ], 'english')

    with DatabaseSchemaEditor(connection) as editor:
        editor.create_model(TextDocument)

    # Same two phrases, swapped between title and body, to exercise weighting.
    TextDocument.objects.create(
        title="My hovercraft is full of eels.",
        body="Spam! Spam! Spam! Spam! Spam! Spam!",
    )
    TextDocument.objects.create(
        title="Spam! Spam! Spam! Spam! Spam! Spam!",
        body="My hovercraft is full of eels."
    )
    self.objects = TextDocument.objects
def test_validate_bearer_token_should_not_reach_db_when_cached(
    self, access_token, validator, http_request, scopes
):
    """A cache-warmed token validation must issue zero SQL queries."""
    db_result = self._warm_up_cache(
        validator, access_token.token, scopes, http_request
    )
    with CaptureQueriesContext(connection) as ctx:
        cached_result = validator.validate_bearer_token(
            access_token.token, scopes, http_request
        )
    # No queries were captured, and the cached answer matches the DB answer.
    assert not ctx.captured_queries
    assert db_result == cached_result
def test_no_reverse_match_docs(self):
    """Old notification docs URLs must be rewritten to their new location by the migration."""
    Notification.objects.create(
        message='Fake',
        redirect_to='frontend:docs',  # Non-existing legacy.
    )
    # Resolving the stale URL SHOULD crash.
    response = self.client.get(reverse('frontend:dashboard'))
    self.assertEqual(response.status_code, 500)

    # Fake re-applying the migration for this test.
    MigrationRecorder.Migration.objects.filter(
        app='dsmr_frontend', name='0009_docs_no_reverse_match'
    ).delete()
    MigrationExecutor(connection=connection).migrate([(self.app, '0009_docs_no_reverse_match')])

    # The error should be fixed now.
    response = self.client.get(reverse('frontend:dashboard'))
    self.assertEqual(response.status_code, 200)
def test_next_sync_setting_retroactive(self):
    """The migration must derive ``next_sync`` from the most recent temperature reading."""
    now = timezone.now().replace(microsecond=0)
    # Two readings; the later one should drive next_sync.
    TemperatureReading.objects.create(
        read_at=now + timezone.timedelta(hours=1),
        degrees_celcius=20,
    )
    TemperatureReading.objects.create(
        read_at=now,
        degrees_celcius=20,
    )
    self.assertIsNone(WeatherSettings.get_solo().next_sync)

    # Fake re-applying the migration for this test.
    MigrationRecorder.Migration.objects.filter(
        app='dsmr_weather', name='0004_next_sync_setting_retroactive'
    ).delete()
    MigrationExecutor(connection=connection).migrate([(self.app, '0004_next_sync_setting_retroactive')])

    # With existing data, next_sync should be based on the latest reading.
    self.assertEqual(WeatherSettings.get_solo().next_sync, now + timezone.timedelta(hours=2))
def test_for_missing_migrations(self):
    """Fail when the models declare changes that no migration captures yet."""
    desired_state = ProjectState.from_apps(apps)
    # TestModel is test-only and never migrated; exclude it from the diff.
    desired_state.remove_model('elasticsearch_django', 'testmodel')
    loader = MigrationExecutor(connection).loader
    detector = MigrationAutodetector(
        from_state=loader.project_state(),
        to_state=desired_state,
    )
    if detector.changes(graph=loader.graph):
        self.fail(
            'Your models have changes that are not yet reflected '
            'in a migration. You should add them now.'
        )
def _get_default(self):
    """Variant of Django's ``Field.get_default`` that keeps the default binary.

    Django coerces the fallback default to a unicode string, which is wrong
    for a binary column; this version returns ``b""`` instead.
    """
    if self.has_default():
        return self.default() if callable(self.default) else self.default
    if not self.empty_strings_allowed:
        return None
    # Backends that store '' as NULL need an explicit None for nullable fields.
    if self.null and not connection.features.interprets_empty_strings_as_nulls:
        return None
    return b""
def handle(self, *args, **options):
    """Delete applied migration records and their files.

    When ``--apps`` is given, only records for those apps are removed;
    otherwise every recorded migration is deleted.  Files are removed
    first, then the DB records.
    """
    from django.db import connection

    # Original code contained a no-op ``connection = connection`` -- removed.
    recorded = recorder.MigrationRecorder(connection).Migration.objects
    app_names = options.get('apps')
    if app_names:
        migrations_applied = recorded.filter(app__in=app_names)
    else:
        migrations_applied = recorded.all()

    for migration in migrations_applied:
        delete_migration_file(get_migration_files(migration.app, migration.name))

    # Fix: ``print message`` was a Python 2 print statement (SyntaxError on py3).
    print("Deleting migration records from db")
    migrations_applied.delete()
def handle(self, *args, **options):
    """Run the selected sqlibrist sub-command, reporting tool errors cleanly."""
    config = get_config()
    # Sub-commands expect to run from the sqlibrist working directory.
    with chdir(SQLIBRIST_DIRECTORY):
        try:
            options['func'](Args(options), config, connection)
        except SqlibristException as error:
            handle_exception(error)
def add_spatial_version_related_fields(sender, **kwargs):
    """Add version-dependent model fields once a database connection exists.

    Registered on ``connection_created``; disconnecting first ensures the
    fields are only added a single time (disconnect returns True exactly
    once), and defers DB access past import time.
    """
    still_connected = connection_created.disconnect(
        add_spatial_version_related_fields, sender=DatabaseWrapper)
    if still_connected:
        spatial_version = connection.ops.spatial_version[0]
        if spatial_version >= 4:
            # SpatiaLite 4+ exposes srtext and a numeric geometry type.
            SpatialiteSpatialRefSys.add_to_class('srtext', models.CharField(max_length=2048))
            SpatialiteGeometryColumns.add_to_class('type', models.IntegerField(db_column='geometry_type'))
        else:
            SpatialiteGeometryColumns.add_to_class('type', models.CharField(max_length=30))
def check_field(self, field, **kwargs):
    """Flag unique CharFields wider than 255 chars (mysql.E001).

    MySQL cannot create a unique index on a varchar column longer than
    255 characters.
    """
    from django.db import connection
    errors = super(DatabaseValidation, self).check_field(field, **kwargs)
    if getattr(field, 'remote_field', None) is not None:
        # Related fields are exempt from this check.
        return errors
    field_type = field.db_type(connection)
    if field_type is None:
        # Non-concrete fields have no column type.
        return errors
    if (field_type.startswith('varchar') and field.unique
            and (field.max_length is None or int(field.max_length) > 255)):
        errors.append(
            checks.Error(
                ('MySQL does not allow unique CharFields to have a max_length > 255.'),
                hint=None,
                obj=field,
                id='mysql.E001',
            )
        )
    return errors
def db_type(self, connection):
    """Related fields point at columns on another table, so they get no column type."""
    return None
def get_extra_restriction(self, where_class, alias, related_alias):
    """Hook for an extra join / subquery-pushdown condition.

    Anything returned must respond to ``as_sql(compiler, connection)``.
    Note that referring to both ``alias`` and ``related_alias`` currently
    breaks some cases, such as subquery pushdown.  The parallel hook
    ``get_extra_descriptor_filter`` covers ``instance.fieldname`` related
    object fetching.  This base implementation contributes nothing.
    """
    return None
def get_db_prep_save(self, value, connection):
    """Prepare the FK value for saving; empty values collapse to NULL where required."""
    if value is None:
        return None
    if value == '' and (not self.target_field.empty_strings_allowed
                        or connection.features.interprets_empty_strings_as_nulls):
        # '' is not a valid key here (or the backend stores it as NULL anyway).
        return None
    return self.target_field.get_db_prep_save(value, connection=connection)
def db_type(self, connection):
    """Mirror the column type of the field this ForeignKey points to.

    AutoFields are stored as plain integers; on backends where related
    fields need not match types exactly, the positive integer fields are
    stored as plain integers as well.
    """
    target = self.target_field
    use_plain_integer = isinstance(target, AutoField) or (
        not connection.features.related_fields_match_type
        and isinstance(target, (PositiveIntegerField, PositiveSmallIntegerField))
    )
    if use_plain_integer:
        return IntegerField().db_type(connection=connection)
    return target.db_type(connection=connection)
def db_parameters(self, connection):
    """Column parameters: the concrete column type plus an empty check list."""
    return {"type": self.db_type(connection), "check": []}
def convert_empty_strings(self, value, expression, connection, context):
    """Database converter mapping empty strings coming from the DB to ``None``."""
    is_empty_string = not value and isinstance(value, six.string_types)
    return None if is_empty_string else value
def get_db_converters(self, connection):
    """Extend the converter chain when the backend stores '' as NULL."""
    converters = super(ForeignKey, self).get_db_converters(connection)
    if connection.features.interprets_empty_strings_as_nulls:
        converters.append(self.convert_empty_strings)
    return converters
def _get_m2m_db_table(self, opts):
    """Curried helper that yields the m2m table name for this relation."""
    through = self.remote_field.through
    if through is not None:
        # An explicit through model dictates the table.
        return through._meta.db_table
    if self.db_table:
        return self.db_table
    # Default: "<model_table>_<field_name>", truncated to the backend limit.
    default_name = '%s_%s' % (opts.db_table, self.name)
    return utils.truncate_name(default_name, connection.ops.max_name_length())
def db_parameters(self, connection):
    """Virtual fields have no backing column: neither a type nor a check."""
    return {"type": None, "check": None}
def check_field(self, field, **kwargs):
    """Flag unique CharFields wider than 255 chars (mysql.E001).

    Django 1.7 checks-framework variant; MySQL cannot create a unique
    index on a varchar column longer than 255 characters.
    """
    errors = super(DatabaseValidation, self).check_field(field, **kwargs)
    if getattr(field, 'rel', None) is not None:
        # Related fields are exempt from this check.
        return errors
    field_type = field.db_type(connection)
    if field_type is None:
        # Non-concrete fields have no column type.
        return errors
    if (field_type.startswith('varchar') and field.unique
            and (field.max_length is None or int(field.max_length) > 255)):
        errors.append(
            checks.Error(
                ('MySQL does not allow unique CharFields to have a '
                 'max_length > 255.'),
                hint=None,
                obj=field,
                id='mysql.E001',
            )
        )
    return errors
def test_db_parameters_respects_db_type_filefield(self):
    """EncryptedFileField keeps FileField's underlying varchar(100) column type."""
    field = EncryptedFileField()
    self.assertEqual(field.db_parameters(connection)['type'], 'varchar(100)')
def test_db_parameters_respects_db_type_imagefilefield(self):
    """EncryptedImageField keeps ImageField's underlying varchar(100) column type."""
    field = EncryptedImageField()
    self.assertEqual(field.db_parameters(connection)['type'], 'varchar(100)')
def test_lbheartbeat_makes_no_db_queries(dockerflow_middleware, rf):
    """The load-balancer heartbeat must answer 200 without touching the database."""
    request = rf.get('/__lbheartbeat__')
    with CaptureQueriesContext(connection) as queries:
        response = dockerflow_middleware.process_request(request)
    assert response.status_code == 200
    assert len(queries) == 0
def test_check_database_connected_cannot_connect(mocker):
    """OperationalError during connect maps to ERROR_CANNOT_CONNECT_DATABASE."""
    mocker.patch(
        'django.db.connection.ensure_connection',
        side_effect=OperationalError,
    )
    errors = checks.check_database_connected([])
    assert len(errors) == 1
    assert errors[0].id == checks.ERROR_CANNOT_CONNECT_DATABASE
def test_check_database_connected_misconfigured(mocker):
    """ImproperlyConfigured during connect maps to ERROR_MISCONFIGURED_DATABASE."""
    mocker.patch(
        'django.db.connection.ensure_connection',
        side_effect=ImproperlyConfigured,
    )
    errors = checks.check_database_connected([])
    assert len(errors) == 1
    assert errors[0].id == checks.ERROR_MISCONFIGURED_DATABASE
def test_check_database_connected_unsuable(mocker):
    """An unusable connection maps to ERROR_UNUSABLE_DATABASE."""
    # NOTE(review): "unsuable" is a typo for "unusable" in the public test name;
    # kept as-is so external references to the test keep working.
    mocker.patch('django.db.connection.is_usable', return_value=False)
    errors = checks.check_database_connected([])
    assert len(errors) == 1
    assert errors[0].id == checks.ERROR_UNUSABLE_DATABASE
def is_expired(self):
    """Return True when more than ``ttl`` seconds passed since ``updated``.

    Uses the database clock rather than the application clock.
    """
    if not self.ttl:
        # No TTL configured -> never expires.
        return False
    elapsed = (dbutils.get_db_time(connection) - self.updated).total_seconds()
    return elapsed > self.ttl
def inspect_db(dbname=''):
    """Debugging tool: print the table names and available models for that db."""
    from django.db import connections, connection
    # With no name, fall back to the default connection.
    conn = connections[dbname] if dbname else connection
    tables = conn.introspection.table_names()
    seen_models = conn.introspection.installed_models(tables)
    print('Tables: ',tables)
    print('Models:', seen_models)
    return tables, seen_models
def add_db_to_settings(dbname, filename, gemini_path=GEMINI_DB_PATH):
    """Register a new sqlite database under ``dbname`` in settings.DATABASES."""
    # Renamed from ``connection`` to avoid shadowing django.db.connection.
    db_config = {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': join(gemini_path, filename)
    }
    settings.DATABASES[dbname] = db_config
    connections.databases[dbname] = db_config
    logger.debug("(+) Adding connection '{}'".format(dbname))
def remove_db_from_settings(dbname):
    """Drop ``dbname`` from settings.DATABASES and connections.databases (no-op if absent)."""
    for registry in (settings.DATABASES, connections.databases):
        registry.pop(dbname, None)
def check_field(self, field, **kwargs):
    """Flag unique CharFields wider than 255 chars (mysql.E001).

    MySQL cannot create a unique index on a varchar column longer than
    255 characters.
    """
    from django.db import connection
    errors = super(DatabaseValidation, self).check_field(field, **kwargs)
    if getattr(field, 'rel', None) is not None:
        # Related fields are exempt from this check.
        return errors
    field_type = field.db_type(connection)
    if field_type is None:
        # Non-concrete fields have no column type.
        return errors
    if (field_type.startswith('varchar') and field.unique
            and (field.max_length is None or int(field.max_length) > 255)):
        errors.append(
            checks.Error(
                ('MySQL does not allow unique CharFields to have a max_length > 255.'),
                hint=None,
                obj=field,
                id='mysql.E001',
            )
        )
    return errors
def check_field(self, field, **kwargs):
    """Flag unique CharFields wider than 255 chars (mysql.E001)."""
    from django.db import connection
    errors = super(DatabaseValidation, self).check_field(field, **kwargs)
    if getattr(field, 'remote_field', None) is not None:
        # Related fields are exempt from this check.
        return errors
    field_type = field.db_type(connection)
    if field_type is None:
        # Non-concrete fields have no column type.
        return errors
    if (field_type.startswith('varchar') and field.unique
            and (field.max_length is None or int(field.max_length) > 255)):
        errors.append(
            checks.Error(
                'MySQL does not allow unique CharFields to have a max_length > 255.',
                obj=field,
                id='mysql.E001',
            )
        )
    return errors