The following 22 code examples, extracted from open-source Python projects, illustrate how to use django.db.OperationalError().
def check_duplicate_emails(app_configs=None, **kwargs):
    """System check: warn when several user accounts share an email."""
    from accounts.utils import get_duplicate_emails
    errors = []
    try:
        duplicates_found = len(get_duplicate_emails())
    except (OperationalError, ProgrammingError):
        # no accounts set up - most likely in a test
        return errors
    if duplicates_found:
        errors.append(
            checks.Warning(
                _("There are user accounts with duplicate emails. This "
                  "will not be allowed in Pootle 2.8."),
                hint=_("Try using 'pootle find_duplicate_emails', and "
                       "then update user emails with 'pootle "
                       "update_user_email username email'. You might also "
                       "want to consider using pootle merge_user or "
                       "purge_user commands"),
                id="pootle.W017"
            )
        )
    return errors
def check_users(app_configs=None, **kwargs):
    """System check: warn if the 'admin' account still uses password 'admin'."""
    from django.contrib.auth import get_user_model
    errors = []
    User = get_user_model()
    try:
        admin_user = User.objects.get(username='admin')
    except (User.DoesNotExist, OperationalError, ProgrammingError):
        # Missing user or unmigrated DB: nothing to report.
        return errors
    if admin_user.check_password('admin'):
        errors.append(checks.Warning(
            _("The default 'admin' user still has a password set to "
              "'admin'."),
            hint=_("Remove the 'admin' user or change its password."),
            id="pootle.W016",
        ))
    return errors
def check_revision(app_configs=None, **kwargs):
    """System check: flag a missing or stale revision counter."""
    from pootle.core.models import Revision
    from pootle_store.models import Unit
    errors = []
    current = Revision.get()
    try:
        highest = Unit.max_revision()
    except (OperationalError, ProgrammingError):
        # DB not ready (e.g. during migrations); nothing to check yet.
        return errors
    if current is None or current < highest:
        errors.append(checks.Critical(
            _("Revision is missing or has an incorrect value."),
            hint=_("Run `revision --restore` to reset the revision counter."),
            id="pootle.C016",
        ))
    return errors
def handle(self, *args, **options):
    """Initialize the Vaultier database.

    Runs syncdb + migrate, creates the cache table if it is missing,
    and collects static files.  On a DB misconfiguration
    (OperationalError) it prints a hint instead of crashing.

    Fix: the original used Python-2-only ``print`` statements and the
    long-removed ``e.message`` attribute; both fail on Python 3.
    """
    print(">>> Initializing your database")
    try:
        management.call_command('syncdb')
        management.call_command('migrate')
        try:
            # do we need cache table?
            cache.get('', None)
        except ProgrammingError:
            # yes we do
            management.call_command('createcachetable', 'vaultier_cache')
        # public static files
        management.call_command('collectstatic', interactive=False)
    except OperationalError as e:
        msg = ">>> Your DB is not configured correctly: {}"
        # str(e) replaces the Python-2-only e.message attribute
        print(msg.format(e))
    else:
        # NOTE(review): this fires the statistics collector only when the
        # 'no_statistics' option is set, which reads inverted — confirm
        # against the option's definition before changing.
        if options.get('no_statistics'):
            task_statistics_collector.delay()
        print(">>> DB is initialized, you can now try to run Vaultier "
              "using 'vaultier runserver'")
def test_first_access_nowait(self):
    """First-ever access with nowait=True succeeds even while another
    transaction holds the sequence row."""

    def one(output):
        with transaction.atomic():
            value = get_next_value()
            output.append(('one', value))
            # Hold the transaction open so `two` runs concurrently.
            time.sleep(0.5)
        connection.close()

    # One might expect an OperationalError here, but PostgreSQL doesn't
    # appear to report an error in this case.

    def two(output):
        time.sleep(0.1)
        with transaction.atomic():
            value = get_next_value(nowait=True)
            output.append(('two', value))
        connection.close()

    self.assertSequence(one, two, [('one', 1), ('two', 2)])
def close_on_exception(func):
    """
    A wrapper to close the database connection if a DB error occurs,
    so that it will get re-opened on the next use.

    Squashes the exception and logs it.
    """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except OperationalError:
            logger.error('Database error, closing connection', exc_info=True)
            db.connection.close()
            assert db.connection.closed_in_transaction is False, \
                'Could not close connection, probably because this wrapper ' \
                'was used inside an transaction.atomic() block.'
    return inner
def fetch_tables(self, database_name):
    """Return a list of {'name', 'type'} dicts for every table and view
    in *database_name*; an empty list if the query fails."""
    # escape input, then build the statement
    sql = 'SHOW FULL TABLES FROM %(database)s' % {
        'database': self.escape_identifier(database_name)
    }
    try:
        rows = self.fetchall(sql)
    except OperationalError as e:
        logger.error('Could not fetch from %s (%s)' % (database_name, e))
        return []
    return [{
        'name': row[0],
        'type': 'view' if row[1] == 'VIEW' else 'table'
    } for row in rows]
def fetch_table(self, database_name, table_name):
    """Return a {'name', 'type'} dict for one table or view.

    Returns {} when the query fails with an OperationalError, and —
    fix — also when no matching table exists: the original indexed
    into fetchone()'s result without checking for None, so a missing
    table raised TypeError instead of returning {}.
    """
    # prepare sql string (identifiers/values escaped by the helpers)
    sql = 'SHOW FULL TABLES FROM %(database)s LIKE %(table)s' % {
        'database': self.escape_identifier(database_name),
        'table': self.escape_string(table_name)
    }
    # execute query
    try:
        row = self.fetchone(sql)
    except OperationalError as e:
        logger.error('Could not fetch %s.%s (%s)' % (database_name, table_name, e))
        return {}
    if row is None:
        # No such table/view.
        return {}
    return {
        'name': row[0],
        'type': 'view' if row[1] == 'VIEW' else 'table'
    }
def execute_sql(connection, stmt):
    """Execute *stmt* on *connection* and return the first column of the
    first result row.

    Returns {'error': str(e)} if a cursor cannot be opened.

    Fix: the original never closed the cursor, leaking it on every call;
    the cursor is now closed even when execute/fetch raises.
    """
    try:
        cursor = connection.cursor()
    except OperationalError as e:
        return {'error': str(e)}
    try:
        cursor.execute(stmt)
        result = cursor.fetchone()[0]
    finally:
        cursor.close()
    return result
def save(self, *args, **kwargs):
    """Save inside a transaction, retrying every 0.5s for as long as
    SQLite reports 'database is locked'; any other OperationalError
    propagates."""
    while True:
        try:
            with transaction.atomic():
                return _base_save(self, *args, **kwargs)
        except OperationalError as err:
            if 'database is locked' not in str(err):
                raise
            logger.warning("%s, model: %s, args: %s, kwargs: %s",
                           err, self.__class__, args, kwargs)
            sleep(0.5)
def handle(self, *args, **options):
    """Delete the builds whose primary keys are given in *args*.

    Related rows are removed manually first (SQLite struggles with huge
    cascaded deletes), then the build itself is deleted, retrying with a
    bldcontrol migration if the delete hits an OperationalError.
    """
    for bid in args:
        try:
            b = Build.objects.get(pk = bid)
        except ObjectDoesNotExist:
            print('build %s does not exist, skipping...' %(bid))
            continue
        # theoretically, just b.delete() would suffice
        # however SQLite runs into problems when you try to
        # delete too many rows at once, so we delete some direct
        # relationships from Build manually.
        for t in b.target_set.all():
            t.delete()
        for t in b.task_build.all():
            t.delete()
        for p in b.package_set.all():
            p.delete()
        for lv in b.layer_version_build.all():
            lv.delete()
        for v in b.variable_build.all():
            v.delete()
        for l in b.logmessage_set.all():
            l.delete()
        # delete the build; some databases might have had problem with migration of the bldcontrol app
        retry_count = 0
        need_bldcontrol_migration = False
        while True:
            # Give up after 5 attempts (first try + retries after migrating).
            if retry_count >= 5:
                break
            retry_count += 1
            if need_bldcontrol_migration:
                from django.core import management
                management.call_command('migrate', 'bldcontrol', interactive=False)
            try:
                b.delete()
                break
            except OperationalError as e:
                # execute migrations
                # NOTE(review): the caught exception is discarded; the next
                # loop iteration runs the bldcontrol migration and retries.
                need_bldcontrol_migration = True
def convert_datetime_to_datetimetz(apps, schema_editor, model_name=None):
    """Data migration helper: re-save every row of *model_name* (in the
    kolibriauth app) so its datetime fields are rewritten as tz-aware.

    Does nothing when *model_name* is not given.
    """
    if not model_name:
        return
    Model = apps.get_model("kolibriauth", model_name)
    try:
        # Prevent the non-existence of this table from blowing up test runs.
        # Seems to only occur on test runs on Travis, but otherwise works
        # fine; future migration squashes should make this path unreachable
        # for new users.
        for instance in Model.objects.all():
            instance.save()
    except OperationalError:
        pass
def _check_table_exists(db, table_name):
    """Raise FileNotFoundError unless *table_name* exists on connection *db*.

    Fix: the original raised ``FileNotFoundError("Table not found: {}.")``
    without calling .format(), so the literal placeholder was shown to the
    user instead of the table name.
    """
    assert db is not None, "Expected table name (str), got None."
    cursor = connections[db].cursor()
    # NOTE(review): table_name is interpolated directly into the SQL; callers
    # must not pass untrusted input here (identifiers can't be parameterized).
    request = 'SELECT gene FROM {} LIMIT 1'.format(table_name)
    try:
        cursor.execute(request)
    except OperationalError:
        raise FileNotFoundError("Table not found: {}.".format(table_name))
def test_later_access_nowait(self):
    """Once the sequence row exists, nowait=True raises OperationalError
    while another transaction holds the row lock."""
    # Create the sequence row so later accesses contend on it.
    get_next_value()

    def one(output):
        with transaction.atomic():
            value = get_next_value()
            output.append(('one', value))
            # Keep the row locked while `two` attempts its access.
            time.sleep(0.5)
        connection.close()

    def two(output):
        time.sleep(0.1)
        with self.assertRaises(OperationalError):
            with transaction.atomic():
                value = get_next_value(nowait=True)
                output.append(('two', value))  # shouldn't be reached
        output.append(('two', 'exc'))
        connection.close()

    self.assertSequence(one, two, [('one', 2), ('two', 'exc')])
def build_choice_list(field):
    """Build a (machine_value, english_name) choices list from FieldChoice
    rows for *field*; returns whatever was collected if the table is not
    queryable yet (e.g. before migrations have run)."""
    choices = []
    try:
        for fc in FieldChoice.objects.filter(field=field):
            choices.append((str(fc.machine_value), fc.english_name))
    except OperationalError:
        # e.g. the db has no data yet (without this it is impossible
        # to migrate)
        pass
    return choices
def get_app_patterns():
    """Return the CMS app URL patterns, or [] when the DB is unavailable.

    Starting with Django 1.9 this code gets called even when creating or
    running migrations, so in many cases the DB will not be ready yet —
    those errors are deliberately ignored.
    """
    try:
        return _get_app_patterns()
    except (OperationalError, ProgrammingError):
        return []
def raw_id_fields(cls):
    """Return ['user'] when the user count exceeds the CMS_RAW_ID_USERS
    threshold, else []."""
    # Given a fresh django-cms install with
    # CMS_RAW_ID_USERS = CMS_PERMISSION = True, `manage migrate` raises
    # an OperationalError because auth_user doesn't exist yet — fall back
    # to not using raw id fields in that case.
    threshold = get_cms_setting('RAW_ID_USERS')
    try:
        use_raw_id = threshold and get_user_model().objects.count() > threshold
    except OperationalError:
        use_raw_id = False
    return ['user'] if use_raw_id else []
def get_list_filter(self, request):
    """Return list_filter without 'user' when the user count exceeds the
    CMS_RAW_ID_USERS threshold (filtering on that many users is unusable)."""
    threshold = get_cms_setting('RAW_ID_USERS')
    try:
        over_threshold = threshold and get_user_model().objects.count() > threshold
    except OperationalError:
        # DB not ready (e.g. during migrate) — keep all filters.
        over_threshold = False
    filters = deepcopy(self.list_filter)
    if over_threshold:
        filters.remove('user')
    return filters
def migrate(self, shard, *args, **options):
    """Run the parent migrate command against one shard, reporting the
    elapsed time, or the error when migration fails."""
    try:
        started = time.time()
        options['database'] = shard.alias
        super(Command, self).handle(*args, **options)
        shard.connection.close()
        elapsed = time.time() - started
        self.stdout.write(self.style.MIGRATE_HEADING(
            'Migrated database %s in %.4f sec' % (shard.database, elapsed)))
    except OperationalError as e:
        self.stdout.write(self.style.MIGRATE_HEADING(
            'Failed migrating database: %s. Error: %s' % (options['database'], e)))
def test_store_missing_symbol_client_operationalerror(
    client, botomock, settings
):
    """If the *storing* of a missing symbols causes an OperationalError,
    the main client that requests should still be a 404.
    On the inside, what we do is catch the operational error, and
    instead call out to a celery job that does it instead.

    This test is a bit cryptic. The S3 call is mocked. The calling of the
    'store_missing_symbol()' function (that is in 'downloads/views.py')
    is mocked. Lastly, the wrapped task function
    'store_missing_symbol_task()' is also mocked (so we don't actually
    call out to Redis).
    Inside the mocked call to the celery task, we actually call the
    original 'tecken.download.utils.store_missing_symbol' function just
    to make sure the MissingSymbol record gets created.
    """
    settings.ENABLE_STORE_MISSING_SYMBOLS = True
    reload_downloader('https://s3.example.com/private/prefix/')

    # The only AWS call the download view should make is the listing that
    # finds no matching object (hence the 404 path).
    def mock_api_call(self, operation_name, api_params):
        assert operation_name == 'ListObjectsV2'
        return {}

    url = reverse('download:download_symbol', args=(
        'foo.pdb',
        '44E4EC8C2F41492B9369D6B9A059577C2',
        'foo.ex_',
    ))
    task_arguments = []

    # Stand-in for store_missing_symbol_task.delay: records the call and
    # performs the real store so the MissingSymbol row is created.
    def fake_task(*args, **kwargs):
        store_missing_symbol(*args, **kwargs)
        task_arguments.append(args)

    store_args = []

    # Stand-in for the view's direct store call: always fails with the
    # OperationalError the view is expected to swallow.
    def mock_store_missing_symbols(*args, **kwargs):
        store_args.append(args)
        raise OperationalError('On noes!')

    _mock_function = 'tecken.download.views.store_missing_symbol_task.delay'
    with botomock(mock_api_call), mock.patch(
        'tecken.download.views.store_missing_symbol',
        new=mock_store_missing_symbols
    ), mock.patch(_mock_function, new=fake_task):
        response = client.get(url, {'code_file': 'something'})
        assert response.status_code == 404
        assert response.content == b'Symbol Not Found'
        # Direct store was attempted once, fell back to the task once, and
        # the record was created despite the OperationalError.
        assert len(store_args) == 1
        assert len(task_arguments) == 1
        assert MissingSymbol.objects.all().count() == 1
def log_symbol_get_404(
    symbol,
    debugid,
    filename,
    code_file='',
    code_id='',
):
    """Store the fact that a symbol could not be found.

    This lets us answer "What symbol fetches have recently been attempted
    and failed?" so commonly-needed-but-missing symbols can be tracked
    down and uploaded, reducing future symbol 404s.

    Expected to be called A LOT (in particular from Socorro's Processor),
    so the write has to be rapid; see
    https://bugzilla.mozilla.org/show_bug.cgi?id=1361854#c5 for the
    expected traffic.

    The request URL only ever carries 'symbol', 'debugid' and 'filename',
    but some services (e.g. Socorro's stackwalker) also send the optional
    'code_file' and 'code_id'.

    Returns the result of store_missing_symbol() (useful to know the
    record was *just* written, e.g. when attempting an automatic download
    from Microsoft) — except on OperationalError, where the write is
    handed to a celery task fire-and-forget style and nothing is
    returned.  The task path is more likely to succeed because (A) the
    natural delay before it retries the DB write may be all that's
    needed, and (B) celery tasks have built-in retry support.
    """
    if not settings.ENABLE_STORE_MISSING_SYMBOLS:
        return
    try:
        return store_missing_symbol(
            symbol,
            debugid,
            filename,
            code_file=code_file,
            code_id=code_id,
        )
    except OperationalError:
        # Background fire-and-forget fallback; deliberately no return value.
        store_missing_symbol_task.delay(
            symbol,
            debugid,
            filename,
            code_file=code_file,
            code_id=code_id,
        )