Python sqlparse 模块,sql() 实例源码

我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用sqlparse.sql()

项目:etl_schedule    作者:yxl040840219    | 项目源码 | 文件源码
def parse_sql_tables(sql):
        """Return the table names found in the FROM clause(s) of ``sql``.

        Only the first statement returned by ``sqlparse.parse`` is inspected.
        Identifiers are collected while a FROM clause is "open".  The clause
        is closed again by a keyword that starts a different clause (ORDER BY,
        GROUP BY, UNION, ...), so sort/group columns and the select list of a
        UNION-ed query are not reported as tables.  Other keywords (JOIN, ON,
        ...) leave the clause open so joined tables are still collected.

        :param sql: SQL text; ``None``, empty or whitespace-only input
            yields an empty list instead of raising
        :return: list with the result of ``SQLParser.get_table_name`` for
            each identifier, in order of appearance
        """
        # First word (upper-cased) of keywords that end the list of tables.
        # Compared on the first word only so that both 'ORDER' and a combined
        # 'ORDER BY' keyword token are recognised.
        clause_enders = ('GROUP', 'ORDER', 'HAVING', 'LIMIT', 'OFFSET',
                         'UNION', 'INTERSECT', 'EXCEPT')
        tables = []
        if not sql or not sql.strip():
            return tables
        parsed = sqlparse.parse(sql)
        if not parsed:
            return tables
        from_seen = False
        for token in parsed[0].tokens:
            if token.ttype is Keyword:
                keyword = token.value.upper()
                words = keyword.split()
                if keyword == "FROM":
                    from_seen = True
                elif words and words[0] in clause_enders:
                    from_seen = False
                continue
            if not from_seen:
                continue
            if isinstance(token, IdentifierList):
                for identifier in token.get_identifiers():
                    tables.append(SQLParser.get_table_name(identifier))
            elif isinstance(token, Identifier):
                tables.append(SQLParser.get_table_name(token))
        return tables
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_placeholder(self):
        """Each placeholder style must be lexed as ``Name.Placeholder``.

        For every placeholder spelling the query
        ``select * from foo where user = <placeholder>`` is parsed and the
        very last sub-token is checked for both its token type and its value.
        """
        def _get_tokens(sql):
            # Sub-tokens of the last top-level token of the first statement.
            return sqlparse.parse(sql)[0].tokens[-1].tokens

        for placeholder in ('?', ':1', ':name', '%s', '$a'):
            t = _get_tokens('select * from foo where user = ' + placeholder)
            # The placeholder is passed as the failure message so a failing
            # case can be identified inside the loop.
            self.assertIs(t[-1].ttype, sqlparse.tokens.Name.Placeholder,
                          placeholder)
            self.assertEqual(t[-1].value, placeholder)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def extract_tables(statement=None):
    """Return the table identifiers found in a SQL statement.

    :param statement: SQL text to analyse.  When omitted, the module-level
        ``sql`` variable is used, which is what the original zero-argument
        version always did.
    :return: list built from ``extract_table_identifiers`` applied to the
        stream produced by ``extract_from_part`` for the first parsed
        statement
    """
    source = sql if statement is None else statement
    stream = extract_from_part(sqlparse.parse(source)[0])
    return list(extract_table_identifiers(stream))
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def _allow_join_condition(statement):
    """
    Tests if a join condition should be suggested

    We need this to avoid bad suggestions when entering e.g.
        select * from tbl1 a join tbl2 b on a.id = <cursor>
    So check that the preceding token is a ON, AND, or OR keyword, instead of
    e.g. an equals sign.

    :param statement: an sqlparse.sql.Statement
    :return: boolean; False when the statement is missing, has no tokens,
        or no preceding token could be found
    """

    if not statement or not statement.tokens:
        return False

    # token_prev yields an (index, token) pair; the token part can be None
    # when nothing qualifies before the given index (e.g. only whitespace).
    last_tok = statement.token_prev(len(statement.tokens))[1]
    if last_tok is None:
        return False
    return last_tok.value.lower() in ('on', 'and', 'or')
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def _allow_join(statement):
    """
    Tests if a join should be suggested

    We need this to avoid bad suggestions when entering e.g.
        select * from tbl1 a join tbl2 b <cursor>
    So check that the preceding token is a JOIN keyword

    :param statement: an sqlparse.sql.Statement
    :return: boolean; False when the statement is missing, has no tokens,
        or no preceding token could be found
    """

    if not statement or not statement.tokens:
        return False

    # token_prev yields an (index, token) pair; the token part can be None
    # when nothing qualifies before the given index (e.g. only whitespace).
    last_tok = statement.token_prev(len(statement.tokens))[1]
    if last_tok is None:
        return False

    keyword = last_tok.value.lower()
    # CROSS JOIN and NATURAL JOIN are excluded even though they end in "join".
    return (keyword.endswith('join')
            and keyword not in ('cross join', 'natural join'))
项目:etl_schedule    作者:yxl040840219    | 项目源码 | 文件源码
def parse_sql_columns(sql):
        """Return the real names of the columns listed before FROM in ``sql``.

        Only the first statement returned by ``sqlparse.parse`` is inspected.
        Top-level tokens are scanned in order: identifiers (single or in a
        list) contribute their ``get_real_name()``; scanning stops at the
        first plain keyword, which for a simple SELECT is FROM.  The select
        modifiers DISTINCT and ALL are skipped instead of ending the scan, so
        ``SELECT DISTINCT a, b FROM t`` still reports ``a`` and ``b``.

        :param sql: SQL text; ``None``, empty or whitespace-only input
            yields an empty list instead of raising
        :return: list of column names in order of appearance
        """
        columns = []
        if not sql or not sql.strip():
            return columns
        parsed = sqlparse.parse(sql)
        if not parsed:
            return columns
        for token in parsed[0].tokens:
            if isinstance(token, IdentifierList):
                columns.extend(identifier.get_real_name()
                               for identifier in token.get_identifiers())
            elif isinstance(token, Identifier):
                columns.append(token.get_real_name())
            elif token.ttype is Keyword:
                if token.value.upper() in ('DISTINCT', 'ALL'):
                    # modifier of the select list, not the end of it
                    continue
                break  # e.g. FROM: the select list is over
        return columns
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_tokenize(self):
        """One statement in, one statement out, and str() round-trips it."""
        source = 'select * from foo;'
        parsed = sqlparse.parse(source)
        self.assertEqual(len(parsed), 1)
        first = parsed[0]
        self.assertEqual(str(first), source)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_newlines(self):
        """Parsing keeps \\n, \\r\\n and \\r line endings intact on round-trip."""
        samples = (
            u'select\n*from foo;',
            u'select\r\n*from foo',
            u'select\r*from foo',
            u'select\r\n*from foo\n',
        )
        for source in samples:
            statement = sqlparse.parse(source)[0]
            # Converting back to text must reproduce the input exactly.
            self.assertEqual(unicode(statement), source)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_within(self):
        """A token nested inside ``foo(col1, col2)`` reports being within a Function."""
        sql = 'foo(col1, col2)'
        p = sqlparse.parse(sql)[0]
        # Walk down the group tree to a token nested below the first
        # top-level token.
        col1 = p.tokens[0].tokens[1].tokens[1].tokens[0]
        self.assertTrue(col1.within(sqlparse.sql.Function))
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_child_of(self):
        """``is_child_of`` is true for a direct parent and false for a sibling."""
        sql = '(col1, col2)'
        p = sqlparse.parse(sql)[0]
        # A sub-token of the first top-level token is a child of that token.
        self.assertTrue(p.tokens[0].tokens[1].is_child_of(p.tokens[0]))
        sql = 'select foo'
        p = sqlparse.parse(sql)[0]
        # Two top-level tokens are siblings: children of the statement,
        # not of each other.
        self.assertFalse(p.tokens[2].is_child_of(p.tokens[0]))
        self.assertTrue(p.tokens[2].is_child_of(p))
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_access_symbol(self):  # see issue27
        """``a.[foo bar] as foo`` is one Identifier with alias, real name and parent."""
        t = sqlparse.parse('select a.[foo bar] as foo')[0].tokens
        self.assertIsInstance(t[-1], sqlparse.sql.Identifier)
        self.assertEqual(t[-1].get_name(), 'foo')
        self.assertEqual(t[-1].get_real_name(), '[foo bar]')
        self.assertEqual(t[-1].get_parent_name(), 'a')
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_square_brackets_notation_isnt_too_greedy(self):  # see issue153
        """``[foo], [bar]`` is a list of two bracketed names, not one long name."""
        t = sqlparse.parse('[foo], [bar]')[0].tokens
        self.assertIsInstance(t[0], sqlparse.sql.IdentifierList)
        self.assertEqual(len(t[0].tokens), 4)
        self.assertEqual(t[0].tokens[0].get_real_name(), '[foo]')
        self.assertEqual(t[0].tokens[-1].get_real_name(), '[bar]')
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_keyword_like_identifier(self):  # see issue47
        """``foo.key`` parses to a single Identifier token."""
        t = sqlparse.parse('foo.key')[0].tokens
        self.assertEqual(len(t), 1)
        self.assertIsInstance(t[0], sqlparse.sql.Identifier)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_function_parameter(self):  # see issue94
        """``abs(some_col)`` exposes exactly one parameter, an Identifier."""
        t = sqlparse.parse('abs(some_col)')[0].tokens[0].get_parameters()
        self.assertEqual(len(t), 1)
        self.assertIsInstance(t[0], sqlparse.sql.Identifier)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_quoted_identifier():
    """A double-quoted alias is reported without quotes; the real name is kept."""
    aliased = sqlparse.parse('select x.y as "z" from foo')[0].tokens[2]
    assert isinstance(aliased, sqlparse.sql.Identifier)
    assert aliased.get_name() == 'z'
    assert aliased.get_real_name() == 'y'
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_valid_identifier_names(name):  # issue175
    """The given ``name`` must parse with an Identifier as its first token."""
    first = sqlparse.parse(name)[0].tokens[0]
    assert isinstance(first, sqlparse.sql.Identifier)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_double_precision_is_builtin():
    """``DOUBLE PRECISION`` is a single token of type ``Name.Builtin``."""
    tokens = sqlparse.parse('DOUBLE PRECISION')[0].tokens
    assert len(tokens) == 1
    only = tokens[0]
    assert only.ttype == sqlparse.tokens.Name.Builtin
    assert only.value == 'DOUBLE PRECISION'
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_double_quotes_are_identifiers():
    """A double-quoted word parses to exactly one Identifier token."""
    tokens = sqlparse.parse('"foo"')[0].tokens
    assert len(tokens) == 1
    only = tokens[0]
    assert isinstance(only, sqlparse.sql.Identifier)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_sqlite_identifiers():
    """Square-bracket names work both in a list and in an expression."""
    # Make sure we still parse sqlite style escapes
    tokens = sqlparse.parse('[col1],[col2]')[0].tokens
    assert len(tokens) == 1
    assert isinstance(tokens[0], sqlparse.sql.IdentifierList)
    names = [ident.get_name() for ident in tokens[0].get_identifiers()]
    assert names == ['[col1]', '[col2]']

    # In an expression the brackets must still lex as plain names.
    statement = sqlparse.parse('[col1]+[col2]')[0]
    leaf_types = [tok.ttype for tok in statement.flatten()]
    assert leaf_types == [T.Name, T.Operator, T.Name]
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_single_line_comments(sql):
    """``sql`` must parse to five top-level tokens ending in a single-line comment."""
    tokens = sqlparse.parse(sql)[0].tokens
    assert len(tokens) == 5
    assert tokens[-1].ttype == T.Comment.Single
项目:db_platform    作者:speedocjx    | 项目源码 | 文件源码
def __init__(self, sql=None):
        """Store the SQL text that the other methods read from ``self.sql``.

        :param sql: SQL text, or ``None`` (the default) when not yet known
        """
        self.sql = sql
项目:db_platform    作者:speedocjx    | 项目源码 | 文件源码
def extract_tables(self):
        """Return the table identifiers found in ``self.sql``.

        Only the first statement parsed from ``self.sql`` is considered.

        :return: list built from ``self.extract_table_identifiers`` applied
            to the stream produced by ``self.extract_from_part``
        """
        first_statement = sqlparse.parse(self.sql)[0]
        from_stream = self.extract_from_part(first_statement)
        return list(self.extract_table_identifiers(from_stream))

    # blacklist / whitelist check
项目:db_platform    作者:speedocjx    | 项目源码 | 文件源码
def check_query_table(self, dbtag, username):
        """Check whether ``self.sql`` touches a blacklisted table for ``dbtag``.

        The first ``Tb_blacklist`` row matching ``dbtag`` is consulted.  A
        user contained in that row's ``user_permit`` set is never blocked.
        For everyone else the comma-separated ``tbname`` field is compared
        with the tables extracted from ``self.sql``.

        :param dbtag: value matched against ``Tb_blacklist.dbtag``
        :param username: username looked up through ``User.objects.get``
        :return: ``(True, tables)`` with the blacklisted tables that the
            query references, otherwise ``(False, [])``
        """
        rules = Tb_blacklist.objects.filter(dbtag=dbtag)
        if not rules:
            return False, []

        requester = User.objects.get(username=username)
        rule = rules[0]
        # user in white list
        if requester in rule.user_permit.all():
            return False, []

        # if table in black list
        forbidden = rule.tbname.split(',')
        referenced = Sqlparse(self.sql).extract_tables()
        hits = [name for name in forbidden if name in referenced]
        if hits:
            return True, hits
        return False, []
项目:sqlitis    作者:pglass    | 项目源码 | 文件源码
def to_sqla(sql):
    """Convert the first statement of ``sql`` and return its rendered form.

    The statement's top-level tokens are passed through
    ``remove_whitespace`` and then ``tokens_to_sqla``; the result of calling
    ``render()`` on that object is returned.
    """
    statement = sqlparse.parse(sql)[0]
    significant = remove_whitespace(statement.tokens)
    return tokens_to_sqla(significant).render()
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def find_prev_keyword(sql, n_skip=0):
    """
    Find the last sql keyword in an SQL statement.

    An opening parenthesis counts as a keyword here; the logical operators
    AND, OR, NOT and BETWEEN do not.

    :param sql: SQL text; only its first parsed statement is examined
    :param n_skip: number of trailing (flattened) tokens to ignore; values
        larger than the token count ignore everything
    :return: ``(token, text)`` where ``token`` is the matching token object
        and ``text`` is the query with everything after that token stripped,
        or ``(None, '')`` when the input is blank or nothing matches
    """
    if not sql.strip():
        return None, ''

    parsed = sqlparse.parse(sql)[0]
    flattened = list(parsed.flatten())
    # Clamp at zero: a negative slice bound would count from the end again
    # and keep tokens that were meant to be skipped.
    flattened = flattened[:max(len(flattened) - n_skip, 0)]

    logical_operators = ('AND', 'OR', 'NOT', 'BETWEEN')

    # Walk the flattened tokens backwards by position.  The position is used
    # directly instead of parsed.token_index(t), because t may be a child
    # token inside a TokenList, in which case token_index throws an error.
    # Minimal example:
    #   p = sqlparse.parse('select * from foo where bar')
    #   t = list(p.flatten())[-3]  # The "Where" token
    #   p.token_index(t)  # Throws ValueError: not in list
    for idx in range(len(flattened) - 1, -1, -1):
        t = flattened[idx]
        if t.value == '(' or (t.is_keyword and (
                              t.value.upper() not in logical_operators)):
            # Combine the string values of all tokens in the original list
            # up to and including the target keyword token t, to produce a
            # query string with everything after the keyword token removed
            text = ''.join(tok.value for tok in flattened[:idx + 1])
            return t, text

    return None, ''


# Postgresql dollar quote signs look like `$$` or `$tag$`
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def is_open_quote(sql):
    """Returns true if the query contains an unclosed quote."""

    # sqlparse may split the text into several semi-colon separated
    # commands; a single one with an open quote is enough.
    for statement in sqlparse.parse(sql):
        if _parsed_is_open_quote(statement):
            return True
    return False
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def parse_partial_identifier(word):
    """
    Try to interpret a word that is still being typed as an identifier.

    The word may carry a schema prefix (`schema_name.partial_name` or just
    `schema_name.`) and may contain a double quote that has not been closed
    yet (`"schema` or `schema."partial_name`).

    :param word: string representing a (partially complete) identifier
    :return: sqlparse.sql.Identifier, or None
    """

    statement = sqlparse.parse(word)[0]
    tokens = statement.tokens

    # Happy path: the whole word is exactly one Identifier.
    if len(tokens) == 1 and isinstance(tokens[0], Identifier):
        return tokens[0]

    # An unmatched double quote, e.g. '"foo', 'foo."', or 'foo."bar',
    # shows up as an Error token: close the quote and parse again.
    if statement.token_next_by(m=(Error, '"'))[1]:
        return parse_partial_identifier(word + '"')

    return None