我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 sqlparse.format()。
def format(self, record):
    """Build the display text for a log record that carries an SQL statement.

    The parent formatter is invoked first; its return value is not used.
    The stripped ``record.sql`` is then, in order: reformatted with sqlparse
    when ``self.parse`` is set, prefixed with ``record.duration`` when the
    record has that attribute, and highlighted when ``self.highlight`` is set.

    :param record: log record exposing ``sql`` and optionally ``duration``.
    :return: the processed SQL text.
    """
    super(SqlFormatter, self).format(record)

    statement = record.sql.strip()

    if self.parse:
        parse_options = {
            'reindent': self.reindent,
            'keyword_case': self.keyword_case,
        }
        statement = sqlparse.format(statement, **parse_options)

    if hasattr(record, 'duration'):
        statement = "({0:.3f}ms) {1}".format(record.duration, statement)

    if self.highlight:
        statement = highlight(statement, self._lexer, self._formatter)

    return statement
def prepare_sql_script(self, sql):
    """
    Split a (possibly multi-statement) SQL script into a list of statements
    suitable for successive cursor.execute() calls.

    Few databases can process a raw SQL script in one cursor.execute() call
    and PEP 249 does not cover this use case, so the default implementation
    is conservative: each statement found by sqlparse.split() is returned
    with its comments stripped.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    statements = []
    for statement in sqlparse.split(sql):
        # sqlparse.split() may yield empty chunks; skip them.
        if statement:
            statements.append(sqlparse.format(statement, strip_comments=True))
    return statements
def create_table(self):
    """Create the local table (and its indexes) if it does not exist yet.

    Falls back to ``DB_ETL_SCHEMA`` when no schema is set and makes sure the
    schema exists first.

    :return: ``False`` when ``create_schema()`` reports failure, ``True``
        when the table already exists, otherwise the result of executing
        the ``CreateTable`` statement.
    """
    if not self.schema:
        self.schema = DB_ETL_SCHEMA

    schema_ready = self.create_schema()
    if not schema_ready:
        return False

    message = 'try to create table {} in {}'.format(
        self.sql_table_name, self.schema)
    logger.info(message)

    if self.exist_table():
        return True

    engine = self.local_engine
    table = self.get_sql_table_object(need_columns=True)
    creation_result = engine.execute(CreateTable(table))
    # Indexes are created one by one after the table itself.
    for table_index in table.indexes:
        engine.execute(CreateIndex(table_index))
    return creation_result
def locale_sql_count(self):
    """Build the row-count query for the local table.

    :return: a ``SELECT COUNT(*)`` statement, schema-qualified when a schema
        is set; an empty string when no table name is available or when
        building the statement raised (the exception is logged).
    """
    query = ''
    if self.schema and self.sql_table_name:
        # Use schema and table name
        template = 'SELECT COUNT(*) AS rows_count FROM {schema}.{table};'
        try:
            query = template.format(
                schema=self.schema, table=self.sql_table_name)
        except Exception as exc:
            logger.exception(str(exc))
    elif self.sql_table_name:
        # Use table name
        template = 'SELECT COUNT(*) AS rows_count FROM {table};'
        try:
            query = template.format(table=self.sql_table_name)
        except Exception as exc:
            logger.exception(str(exc))
    return query
def format(self, record):
    """Attach a prettified copy of ``record.sql`` and format the record.

    The stripped SQL is re-indented when ``sqlparse`` is truthy and
    highlighted when ``pygments`` is truthy; the result is stored on
    ``record.statement`` before delegating to the parent formatter.

    :param record: log record exposing a ``sql`` attribute.
    :return: whatever the parent formatter returns for the record.
    """
    statement = record.sql.strip()

    if sqlparse:
        # Indent the SQL query
        statement = sqlparse.format(statement, reindent=True)

    if pygments:
        # Highlight the SQL query
        lexer = SqlLexer()
        terminal_formatter = Terminal256Formatter(style='monokai')
        statement = pygments.highlight(statement, lexer, terminal_formatter)

    record.statement = statement
    return super(SQLFormatter, self).format(record)
def test_strip_comments_multi(self):
    """Block comments are removed when ``strip_comments=True``."""
    cases = [
        ('/* sql starts here */\nselect', 'select'),
        ('/* sql starts here */ select', 'select'),
        ('/*\n * sql starts here\n */\nselect', 'select'),
        ('select (/* sql starts here */ select 2)', 'select (select 2)'),
        ('select (/* sql /* starts here */ select 2)', 'select (select 2)'),
    ]
    for sql, expected in cases:
        res = sqlparse.format(sql, strip_comments=True)
        self.ndiffAssertEqual(res, expected)
def test_notransform_of_quoted_crlf(self):
    """CR / CR+LF inside string literals must survive formatting untouched.

    Each pair is (input, expected output of ``sqlparse.format`` with no
    options).
    """
    cases = [
        ("SELECT some_column LIKE 'value\r'",
         "SELECT some_column LIKE 'value\r'"),
        ("SELECT some_column LIKE 'value\r'\r\nWHERE id = 1\n",
         "SELECT some_column LIKE 'value\r'\nWHERE id = 1\n"),
        ("SELECT some_column LIKE 'value\\'\r' WHERE id = 1\r",
         "SELECT some_column LIKE 'value\\'\r' WHERE id = 1\n"),
        ("SELECT some_column LIKE 'value\\\\\\'\r' WHERE id = 1\r\n",
         "SELECT some_column LIKE 'value\\\\\\'\r' WHERE id = 1\n"),
    ]
    for statement, expected in cases:
        self.ndiffAssertEqual(sqlparse.format(statement), expected)
def test_join(self):
    """Each JOIN variant starts a new line when re-indenting."""
    cases = [
        ('select * from foo join bar on 1 = 2',
         ['select *', 'from foo', 'join bar on 1 = 2']),
        ('select * from foo inner join bar on 1 = 2',
         ['select *', 'from foo', 'inner join bar on 1 = 2']),
        ('select * from foo left outer join bar on 1 = 2',
         ['select *', 'from foo', 'left outer join bar on 1 = 2']),
        ('select * from foo straight_join bar on 1 = 2',
         ['select *', 'from foo', 'straight_join bar on 1 = 2']),
    ]
    for sql, expected_lines in cases:
        formatted = sqlparse.format(sql, reindent=True)
        self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
def test_duplicate_linebreaks(self):
    """Re-indenting must not produce duplicated line breaks (issue3)."""
    # Each case: (input, extra format options, expected output lines).
    cases = [
        ('select c1 -- column1\nfrom foo',
         {},
         ['select c1 -- column1', 'from foo']),
        ('select c1 -- column1\nfrom foo',
         {'strip_comments': True},
         ['select c1', 'from foo']),
        ('select c1\nfrom foo\norder by c1',
         {},
         ['select c1', 'from foo', 'order by c1']),
        ('select c1 from t1 where (c1 = 1) order by c1',
         {},
         ['select c1', 'from t1', 'where (c1 = 1)', 'order by c1']),
    ]
    for sql, extra_options, expected_lines in cases:
        formatted = sqlparse.format(sql, reindent=True, **extra_options)
        self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
def test_issue90():
    """Re-indenting an UPDATE puts one SET assignment per line.

    Every assignment after the first is expected to be aligned with the
    first one, i.e. indented by the width of ``'SET '`` (four columns).
    """
    sql = ('UPDATE "gallery_photo" SET "owner_id" = 4018, "deleted_at" = NULL,'
           ' "width" = NULL, "height" = NULL, "rating_votes" = 0,'
           ' "rating_score" = 0, "thumbnail_width" = NULL,'
           ' "thumbnail_height" = NULL, "price" = 1, "description" = NULL')
    formatted = sqlparse.format(sql, reindent=True)
    # The leading whitespace of the continuation lines is significant.
    tformatted = '\n'.join(['UPDATE "gallery_photo"',
                            'SET "owner_id" = 4018,',
                            '    "deleted_at" = NULL,',
                            '    "width" = NULL,',
                            '    "height" = NULL,',
                            '    "rating_votes" = 0,',
                            '    "rating_score" = 0,',
                            '    "thumbnail_width" = NULL,',
                            '    "thumbnail_height" = NULL,',
                            '    "price" = 1,',
                            '    "description" = NULL'])
    assert formatted == tformatted
def _is_initialized(cursor):
    """
    Check that database is initialized

    Returns False when table public.schema_version does not exist and True
    when it exists with column names equal to REF_COLUMNS. Raises
    MalformedSchema when the column names differ.
    """
    exists_query = ('SELECT EXISTS(SELECT 1 FROM '
                    'information_schema.tables '
                    'WHERE table_schema = %s '
                    'AND table_name = %s)')
    cursor.execute(exists_query, ('public', 'schema_version'))
    if not cursor.fetchone()[0]:
        return False

    # Fetch a single row only to obtain the column names from the
    # cursor description.
    cursor.execute('SELECT * from public.schema_version limit 1')
    colnames = []
    for column in cursor.description:
        colnames.append(column[0])

    if colnames == REF_COLUMNS:
        return True

    raise MalformedSchema(
        'Table schema_version has unexpected '
        'structure: {struct}'.format(struct='|'.join(colnames)))
def _set_baseline(baseline_v, user, cursor):
    """
    Cleanup schema_version and set baseline

    Raises BaselineError when a version greater than or equal to
    baseline_v is already recorded. Otherwise all rows of
    public.schema_version are deleted and a single 'manual' row for
    baseline_v, installed by user, is inserted.
    """
    already_applied_query = ('SELECT EXISTS(SELECT 1 FROM public'
                             '.schema_version WHERE version >= %s::bigint)')
    cursor.execute(already_applied_query, (baseline_v,))
    if cursor.fetchone()[0]:
        raise BaselineError(
            'Unable to baseline, version '
            '{version} already applied'.format(version=text(baseline_v)))

    LOG.info('cleaning up table schema_version')
    cursor.execute('DELETE FROM public.schema_version')
    LOG.info(cursor.statusmessage)

    LOG.info('setting baseline')
    insert_query = ('INSERT INTO public.schema_version '
                    '(version, type, description, installed_by) '
                    'VALUES (%s::bigint, %s, %s, %s)')
    insert_params = (text(baseline_v), 'manual', 'Forced baseline', user)
    cursor.execute(insert_query, insert_params)
    LOG.info(cursor.statusmessage)
def _get_statements(path):
    """
    Get statements from file

    The file is read as UTF-8. Unless it contains the marker
    '/* pgmigrate-encoding: utf-8 */' its content must be pure ASCII,
    otherwise MalformedStatement is raised. Comments are stripped and every
    non-empty statement is yielded as UTF-8 encoded bytes.
    """
    with codecs.open(path, encoding='utf-8') as source:
        content = source.read()

    utf8_allowed = u'/* pgmigrate-encoding: utf-8 */' in content
    if not utf8_allowed:
        try:
            # Only used as a check; the encoded result is discarded.
            content.encode('ascii')
        except UnicodeError as exc:
            raise MalformedStatement(
                'Non ascii symbols in file: {0}, {1}'.format(
                    path, text(exc)))

    content = sqlparse.format(content, strip_comments=True)
    for parsed in sqlparse.parsestream(content, encoding='utf-8'):
        encoded = text(parsed).strip().encode('utf-8')
        if not encoded:
            continue
        yield encoded
def _parse_dict_callbacks(callbacks, ret, base_dir):
    """
    Fill the lists held by ret with callback file paths.

    callbacks maps a callback type (a field name of ret) to paths relative
    to base_dir. A directory contributes all of its entries in sorted
    order; any other existing path is appended as is. Raises
    ConfigurationError for an unknown callback type or a missing path.
    Returns ret.
    """
    for callback_type in callbacks:
        if callback_type not in ret._fields:
            raise ConfigurationError(
                'Unexpected callback type: {type}'.format(
                    type=text(callback_type)))

        collected = getattr(ret, callback_type)
        for relative in callbacks[callback_type]:
            path = os.path.join(base_dir, relative)
            if not os.path.exists(path):
                raise ConfigurationError(
                    'Path unavailable: {path}'.format(path=text(path)))

            if not os.path.isdir(path):
                collected.append(path)
                continue

            collected.extend(
                os.path.join(path, fname)
                for fname in sorted(os.listdir(path)))
    return ret
def process_response(request, response):
    """Wrap non-HTML responses in an HTML page when ``?debug`` is requested.

    Only acts when the ``debug`` query parameter is present with an empty
    value. Binary responses are replaced by a page stating their length;
    any other non-``text/html`` response has its content (pretty-printed
    when it parses as JSON) embedded in a ``<pre>`` element.

    :return: the original response, or the replacement ``HttpResponse``.
    """
    if request.GET.get('debug') != '':
        return response

    content_type = response['Content-Type']

    if content_type == 'application/octet-stream':
        page = ('<html><body>Binary Data, Length: {}</body></html>'
                .format(len(response.content)))
        return HttpResponse(page)

    if content_type == 'text/html':
        return response

    content = response.content
    try:
        content = json.dumps(json.loads(content), sort_keys=True, indent=2)
    except ValueError:
        # Not JSON: show the raw content unchanged.
        pass
    return HttpResponse('<html><body><pre>{}</pre></body></html>'.format(content))
def render_sql(db, q, inline=False):
    """
    Render the sql used by a query (only works for Postgres)

    :param db: object whose ``session`` gives access to a DB-API cursor
        (only used when ``inline`` is true)
    :param q (query): an SQLAlchemy query object
    :param inline (bool): inline parameters?
    :return: str
    """
    compiled = q.statement.compile(dialect=postgresql.dialect())
    pretty = sql_format(str(compiled), reindent=True)

    if not inline:
        params = compiled.params
        if not params:
            return pretty
        return pretty + "\nparameters: {}".format(str(params))

    # Let the driver substitute the parameters into the statement.
    with db.session.connection().connection.connection.cursor() as cur:
        rendered = cur.mogrify(pretty, compiled.params)
        return rendered.decode('utf-8')
def test_sqlformatter(self):
    """SqlFormatter output equals the manually formatted, highlighted SQL."""
    actual = SqlFormatter().format(self.model)

    raw_sql = 'SELECT "django_migrations"."app","django_migrations"."name" FROM "django_migrations"'
    expected = sqlparse.format(raw_sql, reindent=True, keyword_case='upper')
    expected = "(0.005ms) {0}".format(expected)
    expected = highlight(
        expected, SqlLexer(), Terminal256Formatter(style='default'))

    self.assertEqual(expected, actual)
def prettify_sql(sql):
    """Return ``sql`` pretty-printed with sqlparse.

    Keywords are upper-cased, identifiers lower-cased, comments are kept and
    the statement is re-indented using spaces.

    :param sql: SQL text, or ``None``.
    :return: the formatted SQL, or ``None`` when ``sql`` is ``None``.
    """
    if sql is None:
        return None
    return sqlparse.format(
        sql,
        keyword_case="upper",
        # The sqlparse option is spelled "identifier_case"; a different
        # spelling is not recognised and leaves identifiers unchanged.
        identifier_case="lower",
        strip_comments=False,
        reindent=True,
        indent_tabs=False,
    )
def get_safe_connection_string(conn):
    """Describe a connection as ``user@host:port/dbname`` (no password).

    :param conn: connection object exposing ``get_dsn_parameters()`` or,
        failing that, a ``dsn`` string of shell-quoted ``key=value`` pairs.
    :return: the formatted connection description.
    :raises KeyError: when user, host, port or dbname is not available.
    """
    try:
        # 2.7+ (presumably the driver version that added this method)
        params = conn.get_dsn_parameters()
    except AttributeError:
        # Split each pair on the first '=' only: a value such as a password
        # may itself contain '=' and must not break the key/value parsing.
        params = dict(
            pair.split('=', 1) for pair in shlex.split(conn.dsn)
        )
    return '{user}@{host}:{port}/{dbname}'.format(**params)
def callproc(self, procname, vars=None):
    """Call a stored procedure and record how long the call took.

    The timing is recorded on the connection whether or not the call
    raises.

    :param procname: name of the stored procedure.
    :param vars: parameters forwarded to the parent ``callproc``.
    :return: the parent cursor's ``callproc`` result.
    """
    started = time.time()
    try:
        return super(TaliskerCursor, self).callproc(procname, vars)
    finally:
        elapsed_ms = (time.time() - started) * 1000
        label = 'stored proc: {}'.format(procname)
        # no query parameters, cannot safely record
        self.connection._record(label, None, elapsed_ms)
def return_insert_id(self):
    """
    Hook for backends that can return the last insert ID as part of the
    INSERT query itself.

    Such backends return the SQL and params to append to the INSERT query;
    the returned fragment should contain a format string to hold the
    appropriate column. This default implementation returns None.
    """
    return None
def get_db_converters(self, expression):
    """
    Return the list of functions needed to convert field data.

    Some field types on some backends do not provide data in the correct
    format; this is the hook for converter functions. This implementation
    supplies none.
    """
    return list()
def get_perm(self):
    """Return the permission label ``[<sql_table_name>](id:<id>)``."""
    template = '[{obj.sql_table_name}](id:{obj.id})'
    return template.format(obj=self)
def sql_table_name(self):
    """Return the table name: ``DB_ETL_PREFIX`` and ``self.name`` joined by ``_``."""
    parts = (DB_ETL_PREFIX, self.name)
    return '{}_{}'.format(*parts)
def create_schema(self):
    """Create ``self.schema`` unless it already exists.

    :return: ``True`` when the schema already exists, otherwise the result
        of executing the ``CreateSchema`` statement on the local engine.
    """
    message = 'try to create schema {}'.format(self.schema)
    logger.info(message)

    if not self.exist_schema():
        statement = CreateSchema(self.schema)
        return self.local_engine.execute(statement)
    return True
def delete_table(self):
    """Drop the local table.

    :return: the result of executing the ``DropTable`` statement on the
        local engine.
    """
    message = 'try to delete table {} in {}'.format(
        self.sql_table_name, self.schema)
    logger.info(message)

    # Column definitions are not requested for the drop.
    table = self.get_sql_table_object(need_columns=False)
    drop_statement = DropTable(table)
    return self.local_engine.execute(drop_statement)
def remote_sql_count(self):
    """Build the row-count query for the remote source.

    The remote ETL query is wrapped as a sub-select. When the object
    returned by ``remote_etl_sql()`` can be compiled, it is compiled with
    literal binds first; when that raises ``AttributeError`` the value of a
    fresh ``remote_etl_sql()`` call is embedded as is.

    :return: the ``SELECT COUNT(*)`` statement as a string.
    """
    template = 'SELECT COUNT(*) AS rows_count FROM ({sql}) AS CHITER;'
    try:
        compiled = self.remote_etl_sql().compile(
            compile_kwargs={"literal_binds": True})
        return template.format(sql=compiled)
    except AttributeError:
        return template.format(sql=self.remote_etl_sql())
def clear(self):
    """Stop the ETL task and clear the ETL local table.

    Resets the synchronisation state on the instance, persists it through
    ``db.session`` and then truncates the local table (schema-qualified
    when a schema is set).
    """
    now = datetime.utcnow().replace(microsecond=0)

    reset_values = (
        ('status', EtlStatus.STOPPED),
        ('sync_last', ''),
        ('sync_last_time', now),
        ('sync_periodic', 0),
        ('is_scheduled', False),
        ('progress', 0),
    )
    for attribute, value in reset_values:
        setattr(self, attribute, value)
    # Computed only after the fields above have been reset.
    self.sync_next_time = self.get_next_sync()

    db.session.merge(self)
    db.session.commit()

    if self.schema and self.sql_table_name:
        target = '{schema}.{table}'.format(
            schema=self.schema, table=self.sql_table_name)
    else:
        target = self.sql_table_name

    truncate_statement = (
        'TRUNCATE TABLE {} CONTINUE IDENTITY RESTRICT;'.format(target))

    with self.local_engine.connect() as connection:
        autocommit_connection = connection.execution_options(autocommit=True)
        autocommit_connection.execute(truncate_statement)
def clear_column_type(cls, column_type=''):
    """Map a reflected column type string to SQLAlchemy type source text.

    Only the part of ``column_type`` up to the first ``') '`` is used (the
    closing parenthesis is restored) and prefixed with ``sa.``. Known type
    families are then replaced by a fixed type; the first matching rule
    wins.

    :param column_type: type description string; empty yields ``sa.Text()``.
    :return: a string such as ``'sa.Integer()'`` or ``'sa.VARCHAR(255)'``.
    """
    if not column_type:
        return 'sa.Text()'

    pieces = column_type.split(') ')
    base = pieces[0]
    if len(pieces) > 1:
        # The split consumed the closing parenthesis; put it back.
        base = '{})'.format(base)
    result = 'sa.{}'.format(base)

    # (substrings to look for, replacement type), checked in order.
    rules = (
        (('INTEGER', 'TINYINT', 'BIGINT'), 'sa.Integer()'),
        (('string',), 'sa.Text()'),
        (('OBJECT',), 'sa.Text()'),
        (('DATETIME', 'TIMESTAMP WITHOUT TIME ZONE',
          'TIMESTAMP WITH TIME ZONE'), 'sa.DateTime()'),
        (('HSTORE',), 'postgresql.HSTORE'),
    )
    for needles, replacement in rules:
        if any(needle in result for needle in needles):
            return replacement
    return result
def sqlformat(sql):
    '''
    Format SQL queries.

    The argument is converted with ``str()`` and re-indented by sqlparse
    with ``wrap_after=120``.
    '''
    options = {'reindent': True, 'wrap_after': 120}
    return sqlparse.format(str(sql), **options)
def prepare_sql_script(self, sql, _allow_fallback=False):
    """
    Split a (possibly multi-statement) SQL script into a list of statements
    suitable for successive cursor.execute() calls.

    Few databases can process a raw SQL script in one cursor.execute() call
    and PEP 249 does not cover this use case, so the default implementation
    is conservative.

    When sqlparse is not installed, the ImportError propagates unless
    _allow_fallback is true, in which case a deprecation warning is issued
    and the legacy splitter is used instead.
    """
    # Remove _allow_fallback and keep only 'return ...' in Django 1.9.
    try:
        # This import must stay inside the method because it's optional.
        import sqlparse
    except ImportError:
        if not _allow_fallback:
            raise
        # Without sqlparse, fall back to the legacy (and buggy) logic.
        warnings.warn(
            "Providing initial SQL data on a %s database will require "
            "sqlparse in Django 1.9." % self.connection.vendor,
            RemovedInDjango19Warning)
        from django.core.management.sql import _split_statements
        return _split_statements(sql)

    statements = []
    for statement in sqlparse.split(sql):
        if statement:
            statements.append(sqlparse.format(statement, strip_comments=True))
    return statements
def get_db_converters(self, expression):
    """Return the list of functions needed to convert field data.

    Some field types on some backends do not provide data in the correct
    format; this is the hook for converter functions. This implementation
    supplies none.
    """
    return list()
def test_keywordcase(self):
    """``keyword_case`` changes keywords only; invalid values are rejected."""
    sql = 'select * from bar; -- select foo\n'
    cases = [
        (sql, 'upper', 'SELECT * FROM bar; -- select foo\n'),
        (sql, 'capitalize', 'Select * From bar; -- select foo\n'),
        (sql.upper(), 'lower', 'select * from BAR; -- SELECT FOO\n'),
    ]
    for statement, case, expected in cases:
        res = sqlparse.format(statement, keyword_case=case)
        self.ndiffAssertEqual(res, expected)

    self.assertRaises(SQLParseError, sqlparse.format, sql,
                      keyword_case='foo')
def test_identifiercase(self):
    """``identifier_case`` changes unquoted identifiers only."""
    sql = 'select * from bar; -- select foo\n'
    cases = [
        (sql, 'upper', 'select * from BAR; -- select foo\n'),
        (sql, 'capitalize', 'select * from Bar; -- select foo\n'),
        (sql.upper(), 'lower', 'SELECT * FROM bar; -- SELECT FOO\n'),
    ]
    for statement, case, expected in cases:
        res = sqlparse.format(statement, identifier_case=case)
        self.ndiffAssertEqual(res, expected)

    self.assertRaises(SQLParseError, sqlparse.format, sql,
                      identifier_case='foo')

    # Quoted identifiers are expected to come back unchanged.
    quoted = 'select * from "foo"."bar"'
    res = sqlparse.format(quoted, identifier_case="upper")
    self.ndiffAssertEqual(res, 'select * from "foo"."bar"')
def test_strip_comments_single(self):
    """Single-line comments are removed when ``strip_comments=True``."""
    cases = [
        ('select *-- statement starts here\nfrom foo', 'select * from foo'),
        ('select * -- statement starts here\nfrom foo', 'select * from foo'),
        ('select-- foo\nfrom -- bar\nwhere', 'select from where'),
    ]
    for sql, expected in cases:
        res = sqlparse.format(sql, strip_comments=True)
        self.ndiffAssertEqual(res, expected)

    # ``None`` is not an accepted value for the option.
    last_sql = cases[-1][0]
    self.assertRaises(SQLParseError, sqlparse.format, last_sql,
                      strip_comments=None)
def test_strip_ws(self):
    """``strip_whitespace=True`` collapses redundant whitespace."""
    cases = [
        ('select\n* from foo\n\twhere ( 1 = 2 )\n',
         'select * from foo where (1 = 2)'),
        ('select -- foo\nfrom bar\n',
         'select -- foo\nfrom bar'),
    ]
    for sql, expected in cases:
        stripped = sqlparse.format(sql, strip_whitespace=True)
        self.ndiffAssertEqual(stripped, expected)

    # ``None`` is not an accepted value for the option.
    last_sql = cases[-1][0]
    self.assertRaises(SQLParseError, sqlparse.format, last_sql,
                      strip_whitespace=None)
def test_outputformat(self):
    """An unknown ``output_format`` value raises ``SQLParseError``."""
    statement = 'select * from foo;'
    invalid_options = {'output_format': 'foo'}
    self.assertRaises(SQLParseError, sqlparse.format, statement,
                      **invalid_options)
def test_option(self):
    """Invalid re-indent option values raise ``SQLParseError``."""
    invalid_option_sets = [
        {'reindent': 2},
        {'indent_tabs': 2},
        {'reindent': True, 'indent_width': 'foo'},
        {'reindent': True, 'indent_width': -12},
    ]
    for options in invalid_option_sets:
        self.assertRaises(SQLParseError, sqlparse.format, 'foo', **options)
def test_stmts(self):
    """Re-indenting separates consecutive statements by a blank line."""
    cases = [
        ('select foo; select bar', 'select foo;\n\nselect bar'),
        ('select foo', 'select foo'),
        ('select foo; -- test\n select bar',
         'select foo; -- test\n\nselect bar'),
    ]
    for sql, expected in cases:
        formatted = sqlparse.format(sql, reindent=True)
        self.ndiffAssertEqual(formatted, expected)
def test_keywords(self):
    """Re-indenting puts UNION on a line of its own."""
    statement = 'select * from foo union select * from bar;'
    expected_lines = [
        'select *',
        'from foo',
        'union',
        'select *',
        'from bar;',
    ]
    formatted = sqlparse.format(statement, reindent=True)
    self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
def test_parenthesis(self):
    """Re-indenting moves a parenthesised sub-select onto indented lines.

    The sub-select is expected one indent step (two spaces) in, and its
    continuation line one further column in so that it lines up behind the
    opening parenthesis.
    """
    def f(sql):
        return sqlparse.format(sql, reindent=True)

    s = 'select count(*) from (select * from foo);'
    # The leading whitespace of the sub-select lines is significant.
    self.ndiffAssertEqual(f(s),
                          '\n'.join(['select count(*)',
                                     'from',
                                     '  (select *',
                                     '   from foo);',
                                     ])
                          )
def test_where(self):
    """Re-indenting a WHERE clause puts each AND/OR on an indented line.

    Top-level conditions are expected one indent step (two spaces) in;
    conditions inside parentheses are expected to line up behind the
    opening parenthesis.
    """
    def f(sql):
        return sqlparse.format(sql, reindent=True)

    # The leading whitespace of the condition lines is significant.
    s = 'select * from foo where bar = 1 and baz = 2 or bzz = 3;'
    self.ndiffAssertEqual(f(s), ('select *\nfrom foo\n'
                                 'where bar = 1\n'
                                 '  and baz = 2\n'
                                 '  or bzz = 3;'))
    s = 'select * from foo where bar = 1 and (baz = 2 or bzz = 3);'
    self.ndiffAssertEqual(f(s), ('select *\nfrom foo\n'
                                 'where bar = 1\n'
                                 '  and (baz = 2\n'
                                 '       or bzz = 3);'))
def test_identifier_list(self):
    """Re-indenting puts one identifier per line, aligned with the first.

    Continuation lines are expected to be indented by the width of the
    preceding keyword plus one space: seven columns after ``select`` and
    five after ``from``.
    """
    def f(sql):
        return sqlparse.format(sql, reindent=True)

    # The leading whitespace of the continuation lines is significant.
    s = 'select foo, bar, baz from table1, table2 where 1 = 2'
    self.ndiffAssertEqual(f(s), '\n'.join(['select foo,',
                                           '       bar,',
                                           '       baz',
                                           'from table1,',
                                           '     table2',
                                           'where 1 = 2']))
    s = 'select a.*, b.id from a, b'
    self.ndiffAssertEqual(f(s), '\n'.join(['select a.*,',
                                           '       b.id',
                                           'from a,',
                                           '     b']))
def test_identifier_list_with_functions(self):
    """Function calls inside a select list stay on one line when re-indenting.

    Continuation lines are expected to be aligned with the first select
    item, i.e. indented by seven columns.
    """
    def f(sql):
        return sqlparse.format(sql, reindent=True)

    s = ("select 'abc' as foo, coalesce(col1, col2)||col3 as bar,"
         "col3 from my_table")
    # The leading whitespace of the continuation lines is significant.
    self.ndiffAssertEqual(f(s), '\n'.join(
        ["select 'abc' as foo,",
         "       coalesce(col1, col2)||col3 as bar,",
         "       col3",
         "from my_table"]))