The following code examples, extracted from open-source Python projects, illustrate how to use django.db.connections.
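In short, `connections` is a dict-like handler holding one lazily opened connection per alias defined in `settings.DATABASES`: iterating over it yields alias names, and indexing it by an alias returns the connection wrapper. A minimal sketch of that pattern (the function name `ping_all_databases` is our own, not taken from the projects below):

from django.db import connections

def ping_all_databases():
    # Iterate over every configured alias and run a trivial query on each;
    # connections[alias] opens the underlying connection on first use.
    results = {}
    for alias in connections:
        with connections[alias].cursor() as cursor:
            cursor.execute("SELECT 1")
            results[alias] = cursor.fetchone()[0] == 1
    return results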
def test_find_unsafe_migrations(self):
    conn = connections[DEFAULT_DB_ALIAS]
    result = migration_utils.find_unsafe_migrations(conn)
    assert len(result) == 0

    self.add_django_app('tests.unsafe_migrations')
    result = migration_utils.find_unsafe_migrations(conn)
    assert len(result) == 2

    unsafemodel_field_added = result[0]
    assert unsafemodel_field_added.app_name == 'unsafe_migrations'
    assert unsafemodel_field_added.migration_name == '0002_unsafemodel_field_added'
    assert len(unsafemodel_field_added.offending_operations) == 1

    unsafemodel_kitchen_sink = result[1]
    assert unsafemodel_kitchen_sink.app_name == 'unsafe_migrations'
    assert unsafemodel_kitchen_sink.migration_name == '0003_unsafemodel_kitchen_sink'
    assert len(unsafemodel_kitchen_sink.offending_operations) == 5
def has_key(self, key, version=None):
    key = self.make_key(key, version=version)
    self.validate_key(key)
    db = router.db_for_read(self.cache_model_class)
    connection = connections[db]
    table = connection.ops.quote_name(self._table)

    if settings.USE_TZ:
        now = datetime.utcnow()
    else:
        now = datetime.now()
    now = now.replace(microsecond=0)

    with connection.cursor() as cursor:
        cursor.execute("SELECT cache_key FROM %s "
                       "WHERE cache_key = %%s and expires > %%s" % table,
                       [key, connection.ops.adapt_datetimefield_value(now)])
        return cursor.fetchone() is not None
def scale(self, x, y, z=0.0, **kwargs):
    """
    Scales the geometry to a new size by multiplying the ordinates
    with the given x,y,z scale factors.
    """
    if connections[self.db].ops.spatialite:
        if z != 0.0:
            raise NotImplementedError('SpatiaLite does not support 3D scaling.')
        s = {'procedure_fmt': '%(geo_col)s,%(x)s,%(y)s',
             'procedure_args': {'x': x, 'y': y},
             'select_field': GeomField(),
             }
    else:
        s = {'procedure_fmt': '%(geo_col)s,%(x)s,%(y)s,%(z)s',
             'procedure_args': {'x': x, 'y': y, 'z': z},
             'select_field': GeomField(),
             }
    return self._spatial_attribute('scale', s, **kwargs)
def translate(self, x, y, z=0.0, **kwargs):
    """
    Translates the geometry to a new location using the given numeric
    parameters as offsets.
    """
    if connections[self.db].ops.spatialite:
        if z != 0.0:
            raise NotImplementedError('SpatiaLite does not support 3D translation.')
        s = {'procedure_fmt': '%(geo_col)s,%(x)s,%(y)s',
             'procedure_args': {'x': x, 'y': y},
             'select_field': GeomField(),
             }
    else:
        s = {'procedure_fmt': '%(geo_col)s,%(x)s,%(y)s,%(z)s',
             'procedure_args': {'x': x, 'y': y, 'z': z},
             'select_field': GeomField(),
             }
    return self._spatial_attribute('translate', s, **kwargs)
def is_nullable(self, field):
    """
    A helper to check if the given field should be treated as nullable.

    Some backends treat '' as null and Django treats such fields as
    nullable for those backends. In such situations field.null can be
    False even if we should treat the field as nullable.
    """
    # We need to use DEFAULT_DB_ALIAS here, as QuerySet does not have
    # (nor should it have) knowledge of which connection is going to be
    # used. The proper fix would be to defer all decisions where
    # is_nullable() is needed to the compiler stage, but that is not easy
    # to do currently.
    if ((connections[DEFAULT_DB_ALIAS].features.interprets_empty_strings_as_nulls)
            and field.empty_strings_allowed):
        return True
    else:
        return field.null
def _init_worker(counter):
    """
    Switch to databases dedicated to this worker.

    This helper lives at module-level because of the multiprocessing
    module's requirements.
    """
    global _worker_id

    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value

    for alias in connections:
        connection = connections[alias]
        settings_dict = connection.creation.get_test_db_clone_settings(_worker_id)
        # connection.settings_dict must be updated in place for changes to be
        # reflected in django.db.connections. If the following line assigned
        # connection.settings_dict = settings_dict, new threads would connect
        # to the default database instead of the appropriate clone.
        connection.settings_dict.update(settings_dict)
        connection.close()
def _cull(self, db, cursor, now):
    if self._cull_frequency == 0:
        self.clear()
    else:
        connection = connections[db]
        table = connection.ops.quote_name(self._table)
        cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
                       [connection.ops.adapt_datetimefield_value(now)])
        cursor.execute("SELECT COUNT(*) FROM %s" % table)
        num = cursor.fetchone()[0]
        if num > self._max_entries:
            cull_num = num // self._cull_frequency
            cursor.execute(
                connection.ops.cache_key_culling_sql() % table,
                [cull_num])
            cursor.execute("DELETE FROM %s "
                           "WHERE cache_key < %%s" % table,
                           [cursor.fetchone()[0]])
def _geomset_attribute(self, func, geom, tolerance=0.05, **kwargs):
    """
    DRY routine for setting up a GeoQuerySet method that attaches a
    Geometry attribute and takes a Geometry parameter.  This is used
    for geometry set-like operations (e.g., intersection, difference,
    union, sym_difference).
    """
    s = {
        'geom_args': ('geom',),
        'select_field': GeomField(),
        'procedure_fmt': '%(geo_col)s,%(geom)s',
        'procedure_args': {'geom': geom},
    }
    if connections[self.db].ops.oracle:
        s['procedure_fmt'] += ',%(tolerance)s'
        s['procedure_args']['tolerance'] = tolerance
    return self._spatial_attribute(func, s, **kwargs)
def run_tests(self, tested_model, count):
    connection = connections[self.current_db_alias]
    for (test_name, model, y_label), test_class in self.tests.items():
        if model is not tested_model:
            continue
        benchmark_test = test_class(self, model)
        with transaction.atomic(using=self.current_db_alias):
            try:
                benchmark_test.setup()
            except SkipTest:
                value = elapsed_time = None
            else:
                start = time()
                value = benchmark_test.run()
                elapsed_time = time() - start
            connection.needs_rollback = True
        if value is None:
            value = elapsed_time
        self.add_data(model, test_name, count, value, y_label=y_label)
def reset_sequences(*models):
    """
    After loading data the sequences must be reset in the database if the
    primary keys are manually specified. This is handled automatically by
    django for fixtures. Much of this is modeled after
    django.core.management.commands.loaddata.
    """
    # connection = connections[DEFAULT_DB_ALIAS]
    # cursor = connection.cursor()
    # sequence_sql = connection.ops.sequence_reset_sql(no_style(), models)
    # if sequence_sql:
    #     for line in sequence_sql:
    #         cursor.execute(line)
    #
    # transaction.commit_unless_managed()
    # cursor.close()
    pass
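The body above is commented out and relies on transaction.commit_unless_managed(), which no longer exists in modern Django. A minimal working sketch of the same idea under autocommit (the default since Django 1.6); the name reset_sequences_modern is ours, not from the original project:

from django.core.management.color import no_style
from django.db import DEFAULT_DB_ALIAS, connections

def reset_sequences_modern(*models):
    # sequence_reset_sql() returns backend-specific statements; it may be
    # an empty list on backends (e.g. SQLite) that need no reset.
    connection = connections[DEFAULT_DB_ALIAS]
    sequence_sql = connection.ops.sequence_reset_sql(no_style(), models)
    if sequence_sql:
        with connection.cursor() as cursor:
            for line in sequence_sql:
                cursor.execute(line)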
def get_compiler(self, using=None, connection=None):
    """
    Overrides the Query method get_compiler in order to return an
    instance of the above custom compiler.
    """
    # Copy the body of this method from Django except the final
    # return statement. We will ignore code coverage for this.
    if using is None and connection is None:  # pragma: no cover
        raise ValueError("Need either using or connection")
    if using:
        connection = connections[using]
    # Check that the compiler will be able to execute the query
    for alias, aggregate in self.annotation_select.items():
        connection.ops.check_expression_support(aggregate)
    # Instantiate the custom compiler.
    return {
        CTEUpdateQuery: CTEUpdateQueryCompiler,
        CTEInsertQuery: CTEInsertQueryCompiler,
        CTEDeleteQuery: CTEDeleteQueryCompiler,
        CTEAggregateQuery: CTEAggregateQueryCompiler,
    }.get(self.__class__, CTEQueryCompiler)(self, connection, using)
def readiness(self, request):
    # Connect to each database and do a generic standard SQL query
    # that doesn't write any data and doesn't depend on any tables
    # being present.
    try:
        from django.db import connections
        for name in connections:
            cursor = connections[name].cursor()
            cursor.execute("SELECT 1;")
            row = cursor.fetchone()
            if row is None:
                return HttpResponseServerError("db: invalid response")
    except Exception as e:
        logger.exception(e)
        return HttpResponseServerError("db: cannot connect to database.")

    return HttpResponse("OK")
def run_query(query):
    """Take a raw SQL query and return a list of lists.

    This function uses a special `purged` database, which connects to a
    read-only replica with a special user, to protect the data.

    Returns a list of lists, or a string on error.
    """
    with connections[settings.CLIPS_DATABASE_ALIAS].cursor() as cursor:
        try:
            cursor.execute(query)
        except Exception as e:
            message = [[str(e)]]
            return message
        desc = cursor.description
        headers = [h.name for h in desc]
        result = cursor.fetchall()
        result.insert(0, headers)
        return result
def __exit__(self, exc_type, exc_val, exc_tb):
    exec_time = datetime.now() - self._start_time
    exec_time = (exec_time.seconds * 1000) + (exec_time.microseconds / 1000.0)
    self.exec_time = exec_time

    sql_count = 0
    sql_time = Decimal()
    for dbname in connections:
        queries = connections[dbname].queries[self._db_start_queries[dbname]:]
        if not queries:
            continue
        self.sqls[dbname] = queries
        sql_time += sum(Decimal(row['time']) for row in queries)
        sql_count += len(queries)
    self.sql_count = sql_count
    self.sql_time = int(sql_time * 1000)

    self.print_info()
    self.stdout.flush()
    if self._close_stdout:
        self.stdout.close()
def _init_discrete_filter_masks(self):
    """Create an array of passing ids for every discrete valued filter.

    :rtype: dict `{filter_name: {value: [ids]}}`
    """
    translated = tuple(TRANSLATION.get(f, f) for f in ['variant_id'] + DISCRETE_FILTER_NAMES)
    cursor = connections[self.db].cursor()
    cursor.execute("SELECT {} FROM variants".format(','.join(translated)))
    # Create a variants mask per couple (filter, value), with 1 at indices
    # corresponding to passing variants
    variant_masks = {t: defaultdict(partial(np.zeros, self._N, dtype=np.bool_))
                     for t in DISCRETE_FILTER_NAMES}
    enum_values = {t: set() for t in DISCRETE_FILTER_NAMES}
    irange = range(1, len(translated))
    for row in cursor:
        vid = row[0]  # variant id
        for i in irange:
            val = row[i]
            fname = DISCRETE_FILTER_NAMES[i - 1]
            variant_masks[fname][val][vid - 1] = 1
            enum_values[fname].add(val)
    # Pack and cache the result
    for fname in DISCRETE_FILTER_NAMES:
        for val, mask in variant_masks[fname].items():
            mask = masking.pack(mask)
            self.save_mask(mask, fname, val)
    self.save_enum_values(enum_values)
    self._masks_ready = True
def gene_dict(self):
    """Build (and cache) a map {gene_name -> annotation}.

    :return: a dictionary {gene_name -> {annotation}}
    """
    if self._gene_dict is None:
        cursor = connections[self._db].cursor()
        request = 'SELECT gene,chrom,transcript_min_start,transcript_max_end FROM gene_summary'
        cursor.execute(request)
        self._gene_dict = {}
        for r in cursor.fetchall():
            symbol = r[0]
            if symbol is None:
                continue
            symbol = symbol.lower()
            self._gene_dict.setdefault(symbol, {'location': []})
            self._gene_dict[symbol]['location'].append(GenomicRange(r[1], int(r[2]), int(r[3])))
    return self._gene_dict
def user_factory(u: Users):
    """Create a more useful User instance from a Django Users instance *u*.

    In particular, its 'databases' attribute stores all active database
    names the user has access to, with a runtime check of the connection
    and physical presence.
    """
    role = role_factory(u.role)
    person = person_factory(u.person)
    accesses_qs = DbAccess.objects.filter(user=u, is_active=1)
    user_dbs = [acc.variants_db for acc in accesses_qs if acc.variants_db.is_active]
    databases = []
    for db in user_dbs:
        if db.name not in connections:
            logger.warning("Database '{}' found in users db but not in "
                           "settings.DATABASES. It was probably introduced "
                           "manually. Syncing.".format(db.name))
            manage_dbs.add_db_to_settings(db.name, db.filename)
        if not os.path.exists(settings.DATABASES.get(db.name)['NAME']):
            logger.warning("Database '{}' not found on disk!".format(db.name))
            vdb = VariantsDb.objects.get(name=db.name)
            manage_dbs.deactivate_if_not_found_on_disk(vdb)
            continue
        databases.append(db)
    databases = [database_factory(db) for db in databases]
    return User(u.username, u.email, u.code, u.salt, u.is_active,
                person, role, databases)
def has_key(self, key, version=None):
    key = self.make_key(key, version=version)
    self.validate_key(key)

    db = router.db_for_read(self.cache_model_class)
    table = connections[db].ops.quote_name(self._table)

    if settings.USE_TZ:
        now = datetime.utcnow()
    else:
        now = datetime.now()
    now = now.replace(microsecond=0)

    with connections[db].cursor() as cursor:
        cursor.execute("SELECT cache_key FROM %s "
                       "WHERE cache_key = %%s and expires > %%s" % table,
                       [key, connections[db].ops.value_to_db_datetime(now)])
        return cursor.fetchone() is not None
def _cull(self, db, cursor, now):
    if self._cull_frequency == 0:
        self.clear()
    else:
        # When USE_TZ is True, 'now' will be an aware datetime in UTC.
        now = now.replace(tzinfo=None)
        table = connections[db].ops.quote_name(self._table)
        cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
                       [connections[db].ops.value_to_db_datetime(now)])
        cursor.execute("SELECT COUNT(*) FROM %s" % table)
        num = cursor.fetchone()[0]
        if num > self._max_entries:
            cull_num = num // self._cull_frequency
            cursor.execute(
                connections[db].ops.cache_key_culling_sql() % table,
                [cull_num])
            cursor.execute("DELETE FROM %s "
                           "WHERE cache_key < %%s" % table,
                           [cursor.fetchone()[0]])
def handle(self, *args, **options):
    changed = set()

    self.stdout.write("Checking...")
    for db in settings.DATABASES.keys():
        try:
            executor = MigrationExecutor(connections[db])
        except OperationalError:
            sys.exit("Unable to check migrations: cannot connect to database\n")

        autodetector = MigrationAutodetector(
            executor.loader.project_state(),
            ProjectState.from_apps(apps),
        )
        changed.update(autodetector.changes(graph=executor.loader.graph).keys())

    changed -= set(options['ignore'])

    if changed:
        sys.exit(
            "Apps with model changes but no corresponding migration file: "
            "%(changed)s\n" % {'changed': list(changed)})
    else:
        sys.stdout.write("All migration files present\n")
def check(request):
    databases_info = []
    for db in connections:
        databases_info.append(get_connection_info(connections[db]))
    return databases_info
def wait_for_db(max_attempts=15, seconds_between_attempts=1):
    # type: (int, int) -> None
    '''
    Some manage.py commands interact with the database, and we want
    them to be directly callable from `docker-compose run`. However,
    because docker may start the database container at the same time
    as it runs `manage.py`, we potentially face a race condition, and
    the manage.py command may attempt to connect to a database that
    isn't yet ready for connections.

    To alleviate this, we'll just wait for the database before calling
    the manage.py command.
    '''
    from django.db import DEFAULT_DB_ALIAS, connections
    from django.db.utils import OperationalError

    connection = connections[DEFAULT_DB_ALIAS]
    attempts = 0
    while True:
        try:
            connection.ensure_connection()
            break
        except OperationalError as e:
            if attempts >= max_attempts:
                raise e
            attempts += 1
            time.sleep(seconds_between_attempts)
            info("Attempting to connect to database.")
    info("Connection to database established.")
def test_find_migration_conflicts(self):
    conn = connections[DEFAULT_DB_ALIAS]
    self.add_django_app('tests.conflicting_migrations')
    result = migration_utils.find_unsafe_migrations(conn)
    assert result[0].app_name == 'conflicting_migrations'
    assert result[0].migration_names == {
        '0002_unsafemodel_some_other_changes',
        '0002_unsafemodel_first_changes'
    }
def handle(self, *args, **options):
    connection = connections['CHECK_MIGRATIONS']
    unsafe_migrations = find_unsafe_migrations(connection)
    if len(unsafe_migrations) > 0:
        self.print_error_report(unsafe_migrations)
        exit(len(unsafe_migrations))
def check_migrations():
    from django.db.migrations.autodetector import MigrationAutodetector
    from django.db.migrations.executor import MigrationExecutor
    from django.db.migrations.state import ProjectState

    changed = set()

    print("Checking {} migrations...".format(APP_NAME))
    for db in settings.DATABASES.keys():
        try:
            executor = MigrationExecutor(connections[db])
        except OperationalError as e:
            sys.exit("Unable to check migrations due to database: {}".format(e))

        autodetector = MigrationAutodetector(
            executor.loader.project_state(),
            ProjectState.from_apps(apps),
        )
        changed.update(
            autodetector.changes(graph=executor.loader.graph).keys()
        )

    if changed and APP_NAME in changed:
        sys.exit(
            "A migration file is missing. Please run "
            "`python makemigrations.py` to generate it."
        )
    else:
        print("All migration files present.")
def handle_app_config(self, app_config, **options):
    if app_config.models_module is None:
        return
    connection = connections[options.get('database')]
    models = app_config.get_models(include_auto_created=True)
    statements = connection.ops.sequence_reset_sql(self.style, models)
    return '\n'.join(statements)
def handle(self, *args, **options):
    self.verbosity = options.get('verbosity')

    # Get the database we're operating from
    db = options.get('database')
    connection = connections[db]

    if options['format'] == "plan":
        return self.show_plan(connection)
    else:
        return self.show_list(connection, options['app_labels'])
def handle(self, **options):
    return '\n'.join(sql_flush(self.style, connections[options['database']],
                               only_django=True))
def handle(self, *args, **options):
    # Get the database we're operating from
    connection = connections[options['database']]

    # Load up an executor to get all the migration data
    executor = MigrationExecutor(connection)

    # Resolve command-line arguments into a migration
    app_label, migration_name = options['app_label'], options['migration_name']
    if app_label not in executor.loader.migrated_apps:
        raise CommandError("App '%s' does not have migrations" % app_label)
    try:
        migration = executor.loader.get_migration_by_prefix(app_label, migration_name)
    except AmbiguityError:
        raise CommandError("More than one migration matches '%s' in app '%s'. "
                           "Please be more specific." % (migration_name, app_label))
    except KeyError:
        raise CommandError("Cannot find a migration matching '%s' from app '%s'. "
                           "Is it in INSTALLED_APPS?" % (migration_name, app_label))
    targets = [(app_label, migration.name)]

    # Make a plan that represents just the requested migrations and show SQL
    # for it
    plan = [(executor.loader.graph.nodes[targets[0]], options['backwards'])]
    sql_statements = executor.collect_sql(plan)
    return '\n'.join(sql_statements)
def handle(self, **options):
    connection = connections[options.get('database')]
    try:
        connection.client.runshell()
    except OSError:
        # Note that we're assuming OSError means that the client program
        # isn't installed. There's a possibility OSError would be raised
        # for some other reason, in which case this error message would be
        # inaccurate. Still, this message catches the common case.
        raise CommandError('You appear not to have the %r program installed '
                           'or on your path.' % connection.client.executable_name)
def get(self, key, default=None, version=None):
    key = self.make_key(key, version=version)
    self.validate_key(key)
    db = router.db_for_read(self.cache_model_class)
    connection = connections[db]
    table = connection.ops.quote_name(self._table)

    with connection.cursor() as cursor:
        cursor.execute("SELECT cache_key, value, expires FROM %s "
                       "WHERE cache_key = %%s" % table, [key])
        row = cursor.fetchone()
    if row is None:
        return default

    expires = row[2]
    expression = models.Expression(output_field=models.DateTimeField())
    for converter in (connection.ops.get_db_converters(expression) +
                      expression.get_db_converters(connection)):
        expires = converter(expires, expression, connection, {})

    if expires < timezone.now():
        db = router.db_for_write(self.cache_model_class)
        connection = connections[db]
        with connection.cursor() as cursor:
            cursor.execute("DELETE FROM %s "
                           "WHERE cache_key = %%s" % table, [key])
        return default

    value = connection.ops.process_clob(row[1])
    return pickle.loads(base64.b64decode(force_bytes(value)))
def delete(self, key, version=None):
    key = self.make_key(key, version=version)
    self.validate_key(key)

    db = router.db_for_write(self.cache_model_class)
    connection = connections[db]
    table = connection.ops.quote_name(self._table)

    with connection.cursor() as cursor:
        cursor.execute("DELETE FROM %s WHERE cache_key = %%s" % table, [key])
def clear(self):
    db = router.db_for_write(self.cache_model_class)
    connection = connections[db]
    table = connection.ops.quote_name(self._table)
    with connection.cursor() as cursor:
        cursor.execute('DELETE FROM %s' % table)
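The get/delete/clear methods above, like has_key and _cull earlier, come from Django's database cache backend. For context, enabling that backend takes only a settings entry plus one management command; the table name below is arbitrary:

# settings.py -- then run: python manage.py createcachetable
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}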