Python sqlparse 模块,format() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlparse.format()

项目:ddquery    作者:elinaldosoft    | 项目源码 | 文件源码
def format(self, record):
        """
        Render the SQL carried by a log record as a display string.

        The parent formatter is invoked first (its return value is not used).
        The stripped ``record.sql`` is then optionally reformatted with
        sqlparse and optionally passed through ``highlight``.

        :param record: log record exposing a ``sql`` attribute and,
            optionally, a ``duration`` attribute.
        :return: the resulting SQL string.
        """
        super(SqlFormatter, self).format(record)
        statement = record.sql.strip()

        if self.parse:
            statement = sqlparse.format(
                statement,
                reindent=self.reindent,
                keyword_case=self.keyword_case
            )
            # The duration prefix is only added on the parsed path.
            if hasattr(record, 'duration'):
                statement = "({0:.3f}ms) {1}".format(record.duration, statement)

        if not self.highlight:
            return statement

        return highlight(statement, self._lexer, self._formatter)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
        """
        Break an SQL script into a list of individual statements.

        The script is split with ``sqlparse.split``; empty chunks are dropped
        and comments are stripped from each remaining statement, so that the
        result can be fed to successive cursor.execute() calls.

        :param sql: the SQL script, possibly containing several statements.
        :return: list of statement strings.
        :raises ImproperlyConfigured: when sqlparse cannot be imported.
        """
        try:
            import sqlparse
        except ImportError:
            raise ImproperlyConfigured(
                "sqlparse is required if you don't split your SQL "
                "statements manually."
            )

        statements = []
        for chunk in sqlparse.split(sql):
            if chunk:
                statements.append(sqlparse.format(chunk, strip_comments=True))
        return statements
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def create_table(self):
        """
        Create the local table and its indexes unless the table already exists.

        ``self.schema`` falls back to ``DB_ETL_SCHEMA`` when it is empty.

        :return: ``False`` when ``create_schema()`` returns a falsy value,
            ``True`` when the table already exists, otherwise the result of
            executing the ``CreateTable`` construct.
        """
        if not self.schema:
            self.schema = DB_ETL_SCHEMA

        schema_ready = self.create_schema()
        if not schema_ready:
            return False

        message = 'try to create table {} in {}'.format(
            self.sql_table_name, self.schema)
        logger.info(message)

        if self.exist_table():
            return True

        table = self.get_sql_table_object(need_columns=True)
        created = self.local_engine.execute(CreateTable(table))

        # Indexes are created one by one after the table itself.
        for table_index in table.indexes:
            self.local_engine.execute(CreateIndex(table_index))

        return created
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def locale_sql_count(self):
        """
        Build the row-count query for the local table.

        :return: a ``SELECT COUNT(*)`` statement qualified with the schema
            when one is set, an unqualified one when only the table name is
            set, or an empty string when there is no table name (or when
            formatting raised and the error was logged).
        """
        if not self.sql_table_name:
            return ''

        query = ''
        if self.schema:
            # Use schema and table name
            try:
                query = 'SELECT COUNT(*) AS rows_count FROM {schema}.{table};'.format(
                    schema=self.schema,
                    table=self.sql_table_name
                )
            except Exception as e:
                logger.exception(str(e))
        else:
            # Use table name
            try:
                query = 'SELECT COUNT(*) AS rows_count FROM {table};'.format(
                    table=self.sql_table_name
                )
            except Exception as e:
                logger.exception(str(e))

        return query
项目:djangolab    作者:DhiaTN    | 项目源码 | 文件源码
def format(self, record):
        """
        Attach a prettified SQL statement to the record, then format it.

        The stripped ``record.sql`` is reindented when the ``sqlparse`` name
        is truthy and highlighted when the ``pygments`` name is truthy. The
        result is stored on ``record.statement`` before delegating to the
        parent formatter.

        :param record: log record exposing a ``sql`` attribute.
        :return: whatever the parent formatter returns for the record.
        """
        statement = record.sql.strip()

        # Indent the SQL query
        if sqlparse:
            statement = sqlparse.format(statement, reindent=True)

        # Highlight the SQL query
        if pygments:
            lexer = SqlLexer()
            formatter = Terminal256Formatter(style='monokai')
            statement = pygments.highlight(statement, lexer, formatter)

        record.statement = statement
        return super(SQLFormatter, self).format(record)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_strip_comments_multi(self):
        # Block comments (leading, multi-line, nested in parentheses) are
        # removed when strip_comments=True.
        cases = [
            ('/* sql starts here */\nselect', 'select'),
            ('/* sql starts here */ select', 'select'),
            ('/*\n * sql starts here\n */\nselect', 'select'),
            ('select (/* sql starts here */ select 2)', 'select (select 2)'),
            ('select (/* sql /* starts here */ select 2)', 'select (select 2)'),
        ]
        for sql, expected in cases:
            res = sqlparse.format(sql, strip_comments=True)
            self.ndiffAssertEqual(res, expected)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_notransform_of_quoted_crlf(self):
        # Make sure that CR/CR+LF characters inside string literals don't get
        # affected by the formatter. As the expected values show, line endings
        # outside of the literals come back as a plain "\n".
        cases = (
            ("SELECT some_column LIKE 'value\r'",
             "SELECT some_column LIKE 'value\r'"),
            ("SELECT some_column LIKE 'value\r'\r\nWHERE id = 1\n",
             "SELECT some_column LIKE 'value\r'\nWHERE id = 1\n"),
            ("SELECT some_column LIKE 'value\\'\r' WHERE id = 1\r",
             "SELECT some_column LIKE 'value\\'\r' WHERE id = 1\n"),
            ("SELECT some_column LIKE 'value\\\\\\'\r' WHERE id = 1\r\n",
             "SELECT some_column LIKE 'value\\\\\\'\r' WHERE id = 1\n"),
        )

        for raw, expected in cases:
            self.ndiffAssertEqual(sqlparse.format(raw), expected)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_join(self):
        # Every join flavour starts a new line when reindenting.
        cases = (
            ('select * from foo join bar on 1 = 2',
             ('select *', 'from foo', 'join bar on 1 = 2')),
            ('select * from foo inner join bar on 1 = 2',
             ('select *', 'from foo', 'inner join bar on 1 = 2')),
            ('select * from foo left outer join bar on 1 = 2',
             ('select *', 'from foo', 'left outer join bar on 1 = 2')),
            ('select * from foo straight_join bar on 1 = 2',
             ('select *', 'from foo', 'straight_join bar on 1 = 2')),
        )
        for s, expected_lines in cases:
            formatted = sqlparse.format(s, reindent=True)
            self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_duplicate_linebreaks(self):  # issue3
        # Each case: input SQL, extra format options, expected output lines.
        cases = (
            ('select c1 -- column1\nfrom foo',
             {},
             ('select c1 -- column1', 'from foo')),
            ('select c1 -- column1\nfrom foo',
             {'strip_comments': True},
             ('select c1', 'from foo')),
            ('select c1\nfrom foo\norder by c1',
             {},
             ('select c1', 'from foo', 'order by c1')),
            ('select c1 from t1 where (c1 = 1) order by c1',
             {},
             ('select c1', 'from t1', 'where (c1 = 1)', 'order by c1')),
        )
        for s, extra_options, expected_lines in cases:
            r = sqlparse.format(s, reindent=True, **extra_options)
            self.ndiffAssertEqual(r, '\n'.join(expected_lines))
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_issue90():
    # Reindenting a long UPDATE puts every SET assignment on its own line,
    # aligned under the first one.
    sql = ('UPDATE "gallery_photo" SET "owner_id" = 4018, "deleted_at" = NULL,'
           ' "width" = NULL, "height" = NULL, "rating_votes" = 0,'
           ' "rating_score" = 0, "thumbnail_width" = NULL,'
           ' "thumbnail_height" = NULL, "price" = 1, "description" = NULL')
    expected_lines = (
        'UPDATE "gallery_photo"',
        'SET "owner_id" = 4018,',
        '    "deleted_at" = NULL,',
        '    "width" = NULL,',
        '    "height" = NULL,',
        '    "rating_votes" = 0,',
        '    "rating_score" = 0,',
        '    "thumbnail_width" = NULL,',
        '    "thumbnail_height" = NULL,',
        '    "price" = 1,',
        '    "description" = NULL',
    )
    assert sqlparse.format(sql, reindent=True) == '\n'.join(expected_lines)
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
        """
        Break an SQL script into a list of individual statements.

        The script is split with ``sqlparse.split``; empty chunks are dropped
        and comments are stripped from each remaining statement, so that the
        result can be fed to successive cursor.execute() calls.

        :param sql: the SQL script, possibly containing several statements.
        :return: list of statement strings.
        :raises ImproperlyConfigured: when sqlparse cannot be imported.
        """
        try:
            import sqlparse
        except ImportError:
            raise ImproperlyConfigured(
                "sqlparse is required if you don't split your SQL "
                "statements manually."
            )

        statements = []
        for chunk in sqlparse.split(sql):
            if chunk:
                statements.append(sqlparse.format(chunk, strip_comments=True))
        return statements
项目:pgmigrate    作者:yandex    | 项目源码 | 文件源码
def _is_initialized(cursor):
    """
    Check that database is initialized
    """
    cursor.execute('SELECT EXISTS(SELECT 1 FROM '
                   'information_schema.tables '
                   'WHERE table_schema = %s '
                   'AND table_name = %s)',
                   ('public', 'schema_version'))
    table_exists = cursor.fetchone()[0]

    if not table_exists:
        return False

    cursor.execute('SELECT * from public.schema_version limit 1')

    colnames = [desc[0] for desc in cursor.description]

    if colnames != REF_COLUMNS:
        raise MalformedSchema(
            'Table schema_version has unexpected '
            'structure: {struct}'.format(struct='|'.join(colnames)))

    return True
项目:pgmigrate    作者:yandex    | 项目源码 | 文件源码
def _set_baseline(baseline_v, user, cursor):
    """
    Clean up schema_version and record a forced baseline.

    :param baseline_v: version to record as the baseline.
    :param user: value stored in the ``installed_by`` column.
    :param cursor: database cursor used for all statements.
    :raises BaselineError: when a row with ``version >= baseline_v`` already
        exists in ``public.schema_version``.
    """
    cursor.execute('SELECT EXISTS(SELECT 1 FROM public'
                   '.schema_version WHERE version >= %s::bigint)',
                   (baseline_v,))
    already_applied = cursor.fetchone()[0]

    if already_applied:
        raise BaselineError(
            'Unable to baseline, version '
            '{version} already applied'.format(version=text(baseline_v)))

    # Wipe the table, then insert the single baseline row.
    LOG.info('cleaning up table schema_version')
    cursor.execute('DELETE FROM public.schema_version')
    LOG.info(cursor.statusmessage)

    LOG.info('setting baseline')
    insert_params = (text(baseline_v), 'manual', 'Forced baseline', user)
    cursor.execute('INSERT INTO public.schema_version '
                   '(version, type, description, installed_by) '
                   'VALUES (%s::bigint, %s, %s, %s)',
                   insert_params)
    LOG.info(cursor.statusmessage)
项目:pgmigrate    作者:yandex    | 项目源码 | 文件源码
def _get_statements(path):
    """
    Yield the non-empty statements of an SQL file as UTF-8 encoded strings.

    The file is read as UTF-8. Unless it contains the marker
    ``/* pgmigrate-encoding: utf-8 */`` its content must be pure ASCII.
    Comments are stripped before the content is split into statements.

    :param path: path of the SQL file.
    :raises MalformedStatement: when the marker is absent and the content
        cannot be encoded as ASCII.
    """
    with codecs.open(path, encoding='utf-8') as handle:
        content = handle.read()

    declares_utf8 = u'/* pgmigrate-encoding: utf-8 */' in content
    if not declares_utf8:
        try:
            content.encode('ascii')
        except UnicodeError as exc:
            raise MalformedStatement(
                'Non ascii symbols in file: {0}, {1}'.format(
                    path, text(exc)))

    stripped = sqlparse.format(content, strip_comments=True)
    for statement in sqlparse.parsestream(stripped, encoding='utf-8'):
        encoded = text(statement).strip().encode('utf-8')
        if encoded:
            yield encoded
项目:pgmigrate    作者:yandex    | 项目源码 | 文件源码
def _parse_dict_callbacks(callbacks, ret, base_dir):
    for i in callbacks:
        if i in ret._fields:
            for j in callbacks[i]:
                path = os.path.join(base_dir, j)
                if not os.path.exists(path):
                    raise ConfigurationError(
                        'Path unavailable: {path}'.format(path=text(path)))
                if os.path.isdir(path):
                    for fname in sorted(os.listdir(path)):
                        getattr(ret, i).append(os.path.join(path, fname))
                else:
                    getattr(ret, i).append(path)
        else:
            raise ConfigurationError(
                'Unexpected callback type: {type}'.format(type=text(i)))

    return ret
项目:jatumba-backend    作者:YetAnotherTeam    | 项目源码 | 文件源码
def process_response(request, response):
        """
        Wrap non-HTML responses in a small HTML page when ``?debug`` is given.

        Only requests whose ``debug`` query parameter is the empty string are
        affected. Binary responses are replaced by a page stating their
        length; other non-``text/html`` responses are shown inside ``<pre>``,
        pretty-printed when the body parses as JSON.

        :param request: request object exposing ``GET``.
        :param response: response object supporting header lookup and
            ``content``.
        :return: the original response or a new ``HttpResponse``.
        """
        if request.GET.get('debug') != '':
            return response

        content_type = response['Content-Type']

        if content_type == 'application/octet-stream':
            page = ('<html><body>Binary Data, Length: {}</body></html>'
                    .format(len(response.content)))
            return HttpResponse(page)

        if content_type == 'text/html':
            return response

        body = response.content
        try:
            body = json.dumps(json.loads(body), sort_keys=True, indent=2)
        except ValueError:
            # Not JSON: show the body as it is.
            pass
        return HttpResponse('<html><body><pre>{}</pre></body></html>'.format(body))
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
        """
        Break an SQL script into a list of individual statements.

        The script is split with ``sqlparse.split``; empty chunks are dropped
        and comments are stripped from each remaining statement, so that the
        result can be fed to successive cursor.execute() calls.

        :param sql: the SQL script, possibly containing several statements.
        :return: list of statement strings.
        :raises ImproperlyConfigured: when sqlparse cannot be imported.
        """
        try:
            import sqlparse
        except ImportError:
            raise ImproperlyConfigured(
                "sqlparse is required if you don't split your SQL "
                "statements manually."
            )

        statements = []
        for chunk in sqlparse.split(sql):
            if chunk:
                statements.append(sqlparse.format(chunk, strip_comments=True))
        return statements
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
        """
        Break an SQL script into a list of individual statements.

        The script is split with ``sqlparse.split``; empty chunks are dropped
        and comments are stripped from each remaining statement, so that the
        result can be fed to successive cursor.execute() calls.

        :param sql: the SQL script, possibly containing several statements.
        :return: list of statement strings.
        :raises ImproperlyConfigured: when sqlparse cannot be imported.
        """
        try:
            import sqlparse
        except ImportError:
            raise ImproperlyConfigured(
                "sqlparse is required if you don't split your SQL "
                "statements manually."
            )

        statements = []
        for chunk in sqlparse.split(sql):
            if chunk:
                statements.append(sqlparse.format(chunk, strip_comments=True))
        return statements
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
        """
        Break an SQL script into a list of individual statements.

        The script is split with ``sqlparse.split``; empty chunks are dropped
        and comments are stripped from each remaining statement, so that the
        result can be fed to successive cursor.execute() calls.

        :param sql: the SQL script, possibly containing several statements.
        :return: list of statement strings.
        :raises ImproperlyConfigured: when sqlparse cannot be imported.
        """
        try:
            import sqlparse
        except ImportError:
            raise ImproperlyConfigured(
                "sqlparse is required if you don't split your SQL "
                "statements manually."
            )

        statements = []
        for chunk in sqlparse.split(sql):
            if chunk:
                statements.append(sqlparse.format(chunk, strip_comments=True))
        return statements
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
        """
        Break an SQL script into a list of individual statements.

        The script is split with ``sqlparse.split``; empty chunks are dropped
        and comments are stripped from each remaining statement, so that the
        result can be fed to successive cursor.execute() calls.

        :param sql: the SQL script, possibly containing several statements.
        :return: list of statement strings.
        :raises ImproperlyConfigured: when sqlparse cannot be imported.
        """
        try:
            import sqlparse
        except ImportError:
            raise ImproperlyConfigured(
                "sqlparse is required if you don't split your SQL "
                "statements manually."
            )

        statements = []
        for chunk in sqlparse.split(sql):
            if chunk:
                statements.append(sqlparse.format(chunk, strip_comments=True))
        return statements
项目:dustbunny    作者:Teamworksapp    | 项目源码 | 文件源码
def render_sql(db, q, inline=False):
    """
    Render the sql used by a query (only works for Postgres)

    :param db: object whose ``session`` provides the connection used when
        ``inline`` is true
    :param q (query): an SQLAlchemy query object
    :param inline (bool): inline parameters?
    :return: str
    """
    compiled = q.statement.compile(dialect=postgresql.dialect())
    pretty = sql_format(str(compiled), reindent=True)

    if not inline:
        # Parameters are listed after the statement only when there are any.
        suffix = ''
        if compiled.params:
            suffix = "\nparameters: {}".format(str(compiled.params))
        return pretty + suffix

    with db.session.connection().connection.connection.cursor() as cur:
        return cur.mogrify(pretty, compiled.params).decode('utf-8')
项目:ddquery    作者:elinaldosoft    | 项目源码 | 文件源码
def test_sqlformatter(self):
        # The formatter output must equal the reference statement after it is
        # reindented, prefixed with the duration and highlighted.
        actual = SqlFormatter().format(self.model)

        raw_sql = 'SELECT "django_migrations"."app","django_migrations"."name" FROM "django_migrations"'
        expected = sqlparse.format(raw_sql, reindent=True, keyword_case='upper')
        expected = "(0.005ms) {0}".format(expected)
        expected = highlight(
            expected,
            SqlLexer(),
            Terminal256Formatter(style='default')
        )

        self.assertEqual(expected, actual)
项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def prettify_sql(sql):
    """
    Pretty-print an SQL string.

    Keywords are upper-cased, identifiers lower-cased, comments kept, and
    the statement is reindented using spaces.

    :param sql: SQL text, or ``None``.
    :return: the formatted SQL, or ``None`` when ``sql`` is ``None``.
    """
    if sql is None:
        return None
    return sqlparse.format(
        sql,
        keyword_case="upper",
        # This option was previously misspelled "identfier_case"; sqlparse
        # ignores unknown options, so identifiers were never lower-cased.
        identifier_case="lower",
        strip_comments=False,
        reindent=True,
        indent_tabs=False)
项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def get_safe_connection_string(conn):
    """
    Describe a connection as ``user@host:port/dbname``.

    Only the user, host, port and dbname parameters are included in the
    result; any other DSN parameter is left out.

    :param conn: connection object providing ``get_dsn_parameters()`` or,
        failing that, a ``dsn`` string of space-separated ``key=value``
        pairs.
    :return: the formatted connection description.
    :raises KeyError: when one of the four parameters is missing.
    """
    try:
        # 2.7+ (presumably the psycopg2 version that added this method)
        params = conn.get_dsn_parameters()
    except AttributeError:
        # Split on the first '=' only: a value may itself contain '='
        # (e.g. a quoted password), which used to make dict() raise.
        params = dict(i.split('=', 1) for i in shlex.split(conn.dsn))
    return '{user}@{host}:{port}/{dbname}'.format(**params)
项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def callproc(self, procname, vars=None):
        """
        Call a stored procedure and record how long the call took.

        The duration is recorded through ``self.connection._record`` whether
        or not the call raises.

        :param procname: name of the stored procedure.
        :param vars: parameters forwarded to the parent ``callproc``.
        :return: whatever the parent ``callproc`` returns.
        """
        started = time.time()
        try:
            return super(TaliskerCursor, self).callproc(procname, vars)
        finally:
            elapsed_ms = (time.time() - started) * 1000
            # no query parameters, cannot safely record
            label = 'stored proc: {}'.format(procname)
            self.connection._record(label, None, elapsed_ms)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def return_insert_id(self):
        """
        Hook for backends that can return the last insert ID as part of an
        insert query: such a backend returns the SQL and params to append
        to the INSERT query, with a format string for the appropriate
        column in the returned fragment.

        This implementation returns ``None``.
        """
        return None
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def get_db_converters(self, expression):
        """
        Return the list of functions needed to convert field data.

        This is the hook for converter functions, for field types that a
        backend does not provide in the correct format. This implementation
        returns a new empty list.
        """
        converters = []
        return converters
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def get_perm(self):
        """Return the permission label built from the table name and id."""
        template = '[{obj.sql_table_name}](id:{obj.id})'
        return template.format(obj=self)
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def sql_table_name(self):
        """Return ``DB_ETL_PREFIX`` and ``self.name`` joined by an underscore."""
        template = '{}_{}'
        return template.format(DB_ETL_PREFIX, self.name)
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def create_schema(self):
        """
        Create ``self.schema`` unless it already exists.

        :return: ``True`` when the schema already exists, otherwise the
            result of executing the ``CreateSchema`` construct.
        """
        logger.info('try to create schema {}'.format(self.schema))
        if not self.exist_schema():
            return self.local_engine.execute(CreateSchema(self.schema))
        return True
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def delete_table(self):
        """
        Drop the local table.

        :return: the result of executing the ``DropTable`` construct.
        """
        message = 'try to delete table {} in {}'.format(
            self.sql_table_name, self.schema)
        logger.info(message)

        target = self.get_sql_table_object(need_columns=False)
        return self.local_engine.execute(DropTable(target))
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def remote_sql_count(self):
        """
        Build the SQL query that counts rows in the remote source.

        The remote query is compiled with literal binds when possible; when
        that raises ``AttributeError`` the value returned by
        ``remote_etl_sql()`` is embedded as is.
        """
        template = 'SELECT COUNT(*) AS rows_count FROM ({sql}) AS CHITER;'
        try:
            inner = self.remote_etl_sql().compile(
                compile_kwargs={"literal_binds": True})
            return template.format(sql=inner)
        except AttributeError:
            return template.format(sql=self.remote_etl_sql())
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def clear(self):
        """
        Stop the ETL task and clear the local ETL table.

        The scheduling and progress fields are reset and committed first,
        then the local table is truncated.
        """
        now = datetime.utcnow().replace(microsecond=0)

        self.status = EtlStatus.STOPPED
        self.sync_last = ''
        self.sync_last_time = now
        self.sync_periodic = 0
        self.is_scheduled = False
        # self.chunk_size = 0
        self.progress = 0
        self.sync_next_time = self.get_next_sync()

        db.session.merge(self)
        db.session.commit()

        # Qualify the table with the schema when one is set.
        target = self.sql_table_name
        if self.schema and self.sql_table_name:
            target = '{schema}.{table}'.format(
                schema=self.schema,
                table=self.sql_table_name
            )

        statement = 'TRUNCATE TABLE {} CONTINUE IDENTITY RESTRICT;'.format(target)

        with self.local_engine.connect() as connection:
            connection.execution_options(autocommit=True).execute(statement)
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def clear_column_type(cls, column_type=''):
        """
        Map a raw column type string to a type expression string.

        Everything after the first ``') '`` is dropped and the remainder is
        prefixed with ``sa.``. Types containing one of the known keywords are
        replaced by a fixed expression; the first matching rule wins.

        :param column_type: raw column type; an empty value yields
            ``'sa.Text()'``.
        :return: the type expression as a string.
        """
        if not column_type:
            return 'sa.Text()'

        pieces = column_type.split(') ')
        candidate = pieces[0]
        if len(pieces) > 1:
            # The split removed the closing parenthesis; put it back.
            candidate = '{})'.format(candidate)
        candidate = 'sa.{}'.format(candidate)

        rules = (
            (('INTEGER', 'TINYINT', 'BIGINT'), 'sa.Integer()'),
            (('string',), 'sa.Text()'),
            (('OBJECT',), 'sa.Text()'),
            (('DATETIME', 'TIMESTAMP WITHOUT TIME ZONE',
              'TIMESTAMP WITH TIME ZONE'), 'sa.DateTime()'),
            # column_type = 'postgresql.HSTORE(text_type=sa.Text())'
            (('HSTORE',), 'postgresql.HSTORE'),
        )
        for keywords, replacement in rules:
            if any(keyword in candidate for keyword in keywords):
                return replacement

        return candidate
项目:django-modeltrans    作者:zostera    | 项目源码 | 文件源码
def sqlformat(sql):
    '''
    Format SQL queries: ``sql`` is converted to ``str`` and reindented by
    sqlparse with ``wrap_after=120``.
    '''
    statement = str(sql)
    return sqlparse.format(statement, reindent=True, wrap_after=120)
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def prepare_sql_script(self, sql, _allow_fallback=False):
        """
        Break a SQL script into a list of individual statements.

        With sqlparse available the script is split, empty chunks are dropped
        and comments are stripped from every statement. Without it the
        ImportError propagates unless ``_allow_fallback`` is true, in which
        case a warning is issued and the legacy splitter is used.

        :param sql: the SQL script, possibly containing several statements.
        :param _allow_fallback: permit the legacy splitter when sqlparse is
            missing.
        :return: list of statement strings.
        """
        # Remove _allow_fallback and keep only 'return ...' in Django 1.9.
        try:
            # This import must stay inside the method because it's optional.
            import sqlparse
        except ImportError:
            if not _allow_fallback:
                raise
            # Without sqlparse, fall back to the legacy (and buggy) logic.
            warnings.warn(
                "Providing initial SQL data on a %s database will require "
                "sqlparse in Django 1.9." % self.connection.vendor,
                RemovedInDjango19Warning)
            from django.core.management.sql import _split_statements
            return _split_statements(sql)

        statements = []
        for chunk in sqlparse.split(sql):
            if chunk:
                statements.append(sqlparse.format(chunk, strip_comments=True))
        return statements
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def return_insert_id(self):
        """
        Hook for backends that can return the last insert ID as part of an
        insert query: such a backend returns the SQL and params to append
        to the INSERT query, with a format string for the appropriate
        column in the returned fragment.

        This implementation returns ``None``.
        """
        return None
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def get_db_converters(self, expression):
        """Return the list of functions needed to convert field data.

        This is the hook for converter functions, for field types that a
        backend does not provide in the correct format. This implementation
        returns a new empty list.
        """
        converters = []
        return converters
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_keywordcase(self):
        # keyword_case changes keywords only; identifiers and comments stay
        # untouched, and an unknown value is rejected.
        sql = 'select * from bar; -- select foo\n'
        cases = (
            (sql, 'upper', 'SELECT * FROM bar; -- select foo\n'),
            (sql, 'capitalize', 'Select * From bar; -- select foo\n'),
            (sql.upper(), 'lower', 'select * from BAR; -- SELECT FOO\n'),
        )
        for statement, case, expected in cases:
            res = sqlparse.format(statement, keyword_case=case)
            self.ndiffAssertEqual(res, expected)

        self.assertRaises(SQLParseError, sqlparse.format, sql,
                          keyword_case='foo')
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_identifiercase(self):
        # identifier_case changes identifiers only; keywords and comments
        # stay untouched, and an unknown value is rejected.
        sql = 'select * from bar; -- select foo\n'
        cases = (
            (sql, 'upper', 'select * from BAR; -- select foo\n'),
            (sql, 'capitalize', 'select * from Bar; -- select foo\n'),
            (sql.upper(), 'lower', 'SELECT * FROM bar; -- SELECT FOO\n'),
        )
        for statement, case, expected in cases:
            res = sqlparse.format(statement, identifier_case=case)
            self.ndiffAssertEqual(res, expected)

        self.assertRaises(SQLParseError, sqlparse.format, sql,
                          identifier_case='foo')

        # Quoted identifiers keep their case.
        quoted = 'select * from "foo"."bar"'
        res = sqlparse.format(quoted, identifier_case="upper")
        self.ndiffAssertEqual(res, 'select * from "foo"."bar"')
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_strip_comments_single(self):
        # Single-line comments are removed when strip_comments=True.
        cases = (
            ('select *-- statement starts here\nfrom foo', 'select * from foo'),
            ('select * -- statement starts here\nfrom foo', 'select * from foo'),
            ('select-- foo\nfrom -- bar\nwhere', 'select from where'),
        )
        for sql, expected in cases:
            res = sqlparse.format(sql, strip_comments=True)
            self.ndiffAssertEqual(res, expected)

        # strip_comments=None is rejected.
        last_sql = cases[-1][0]
        self.assertRaises(SQLParseError, sqlparse.format, last_sql,
                          strip_comments=None)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_strip_ws(self):
        # strip_whitespace collapses redundant whitespace; the line break
        # that ends a single-line comment is kept.
        cases = (
            ('select\n* from      foo\n\twhere  ( 1 = 2 )\n',
             'select * from foo where (1 = 2)'),
            ('select -- foo\nfrom    bar\n',
             'select -- foo\nfrom bar'),
        )
        for s, expected in cases:
            stripped = sqlparse.format(s, strip_whitespace=True)
            self.ndiffAssertEqual(stripped, expected)

        # strip_whitespace=None is rejected.
        last_sql = cases[-1][0]
        self.assertRaises(SQLParseError, sqlparse.format, last_sql,
                          strip_whitespace=None)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_outputformat(self):
        # An unknown output_format is rejected.
        statement = 'select * from foo;'
        bad_options = {'output_format': 'foo'}
        self.assertRaises(SQLParseError, sqlparse.format, statement,
                          **bad_options)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_option(self):
        # Each of these option sets must be rejected with SQLParseError.
        bad_option_sets = (
            {'reindent': 2},
            {'indent_tabs': 2},
            {'reindent': True, 'indent_width': 'foo'},
            {'reindent': True, 'indent_width': -12},
        )
        for options in bad_option_sets:
            self.assertRaises(SQLParseError, sqlparse.format, 'foo',
                              **options)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_stmts(self):
        # Consecutive statements are separated by a blank line.
        cases = (
            ('select foo; select bar', 'select foo;\n\nselect bar'),
            ('select foo', 'select foo'),
            ('select foo; -- test\n select bar',
             'select foo; -- test\n\nselect bar'),
        )
        for s, expected in cases:
            self.ndiffAssertEqual(sqlparse.format(s, reindent=True), expected)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_keywords(self):
        # UNION is placed on a line of its own between the two selects.
        s = 'select * from foo union select * from bar;'
        expected_lines = (
            'select *',
            'from foo',
            'union',
            'select *',
            'from bar;',
        )
        formatted = sqlparse.format(s, reindent=True)
        self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_parenthesis(self):
        # A parenthesised subquery is moved to its own indented lines.
        s = 'select count(*) from (select * from foo);'
        expected_lines = (
            'select count(*)',
            'from',
            '  (select *',
            '   from foo);',
        )
        formatted = sqlparse.format(s, reindent=True)
        self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_where(self):
        # AND/OR conditions of a WHERE clause each get an indented line;
        # conditions inside parentheses align with the opening parenthesis.
        cases = (
            ('select * from foo where bar = 1 and baz = 2 or bzz = 3;',
             ('select *\nfrom foo\n'
              'where bar = 1\n'
              '  and baz = 2\n'
              '  or bzz = 3;')),
            ('select * from foo where bar = 1 and (baz = 2 or bzz = 3);',
             ('select *\nfrom foo\n'
              'where bar = 1\n'
              '  and (baz = 2\n'
              '       or bzz = 3);')),
        )
        for s, expected in cases:
            self.ndiffAssertEqual(sqlparse.format(s, reindent=True), expected)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_identifier_list(self):
        # Items of an identifier list are put on separate lines, aligned
        # with the first item.
        cases = (
            ('select foo, bar, baz from table1, table2 where 1 = 2',
             ('select foo,',
              '       bar,',
              '       baz',
              'from table1,',
              '     table2',
              'where 1 = 2')),
            ('select a.*, b.id from a, b',
             ('select a.*,',
              '       b.id',
              'from a,',
              '     b')),
        )
        for s, expected_lines in cases:
            formatted = sqlparse.format(s, reindent=True)
            self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_identifier_list_with_functions(self):
        # Commas inside a function call do not split the identifier list.
        s = ("select 'abc' as foo, coalesce(col1, col2)||col3 as bar,"
             "col3 from my_table")
        expected_lines = (
            "select 'abc' as foo,",
            "       coalesce(col1, col2)||col3 as bar,",
            "       col3",
            "from my_table",
        )
        formatted = sqlparse.format(s, reindent=True)
        self.ndiffAssertEqual(formatted, '\n'.join(expected_lines))