The following 27 code examples, extracted from open source Python projects, illustrate how to use sqlparse.sql().
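Before the examples, a minimal orientation sketch (not taken from any of the projects below): sqlparse.parse() returns a tuple of sqlparse.sql.Statement objects, and each statement is a tree of sqlparse.sql nodes (Identifier, IdentifierList, Function, Where, ...) that the examples traverse.

import sqlparse

stmt = sqlparse.parse('select a, b from foo where a = 1')[0]
print(type(stmt))  # <class 'sqlparse.sql.Statement'>
for tok in stmt.tokens:
    # Top-level tokens here: the DML keyword, whitespace, an IdentifierList,
    # the FROM keyword, an Identifier for the table, and a Where group
    print(tok.ttype, repr(tok.value))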
def parse_sql_tables(sql):
    tables = []
    parsed = sqlparse.parse(sql)
    stmt = parsed[0]
    from_seen = False
    for token in stmt.tokens:
        if from_seen:
            if token.ttype is Keyword:
                continue
            else:
                if isinstance(token, IdentifierList):
                    for identifier in token.get_identifiers():
                        tables.append(SQLParser.get_table_name(identifier))
                elif isinstance(token, Identifier):
                    tables.append(SQLParser.get_table_name(token))
                else:
                    pass
        if token.ttype is Keyword and token.value.upper() == "FROM":
            from_seen = True
    return tables
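The SQLParser.get_table_name() helper used above is not shown. Assuming it simply unwraps an identifier to its underlying table name, a self-contained sketch of the same FROM-clause walk could look like this (the function name here is illustrative, not from the original project):

import sqlparse
from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword

def tables_after_from(sql):
    tables = []
    from_seen = False
    for token in sqlparse.parse(sql)[0].tokens:
        if from_seen:
            if isinstance(token, IdentifierList):
                # get_real_name() strips any alias, e.g. "users u" -> "users"
                tables.extend(t.get_real_name() for t in token.get_identifiers())
            elif isinstance(token, Identifier):
                tables.append(token.get_real_name())
        if token.ttype is Keyword and token.value.upper() == 'FROM':
            from_seen = True
    return tables

print(tables_after_from('select * from users, orders'))  # expected: ['users', 'orders']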
def test_placeholder(self):
    def _get_tokens(sql):
        return sqlparse.parse(sql)[0].tokens[-1].tokens
    t = _get_tokens('select * from foo where user = ?')
    self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
    self.assertEqual(t[-1].value, '?')
    t = _get_tokens('select * from foo where user = :1')
    self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
    self.assertEqual(t[-1].value, ':1')
    t = _get_tokens('select * from foo where user = :name')
    self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
    self.assertEqual(t[-1].value, ':name')
    t = _get_tokens('select * from foo where user = %s')
    self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
    self.assertEqual(t[-1].value, '%s')
    t = _get_tokens('select * from foo where user = $a')
    self.assert_(t[-1].ttype is sqlparse.tokens.Name.Placeholder)
    self.assertEqual(t[-1].value, '$a')
def extract_tables():
    stream = extract_from_part(sqlparse.parse(sql)[0])
    return list(extract_table_identifiers(stream))
def _allow_join_condition(statement):
    """
    Tests if a join condition should be suggested

    We need this to avoid bad suggestions when entering e.g.
        select * from tbl1 a join tbl2 b on a.id = <cursor>
    So check that the preceding token is an ON, AND, or OR keyword, instead of
    e.g. an equals sign.

    :param statement: an sqlparse.sql.Statement
    :return: boolean
    """
    if not statement or not statement.tokens:
        return False

    last_tok = statement.token_prev(len(statement.tokens))[1]
    return last_tok.value.lower() in ('on', 'and', 'or')
def _allow_join(statement):
    """
    Tests if a join should be suggested

    We need this to avoid bad suggestions when entering e.g.
        select * from tbl1 a join tbl2 b <cursor>
    So check that the preceding token is a JOIN keyword

    :param statement: an sqlparse.sql.Statement
    :return: boolean
    """
    if not statement or not statement.tokens:
        return False

    last_tok = statement.token_prev(len(statement.tokens))[1]
    return (last_tok.value.lower().endswith('join')
            and last_tok.value.lower() not in ('cross join', 'natural join'))
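A quick way to exercise the two helpers above, assuming both are in scope and sqlparse 0.2+ (where token_prev() returns an (index, token) pair): parse the text up to the cursor position and inspect the last non-whitespace token.

import sqlparse

print(_allow_join(sqlparse.parse('select * from tbl1 a join ')[0]))
# expected: True -- the token before the cursor is a JOIN keyword

print(_allow_join(sqlparse.parse('select * from tbl1 a join tbl2 b ')[0]))
# expected: False -- the joined table has already been typed

print(_allow_join_condition(sqlparse.parse('select * from tbl1 a join tbl2 b on ')[0]))
# expected: True -- ON precedes the cursor, so a join condition fits here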
def parse_sql_columns(sql):
    columns = []
    parsed = sqlparse.parse(sql)
    stmt = parsed[0]
    for token in stmt.tokens:
        if isinstance(token, IdentifierList):
            for identifier in token.get_identifiers():
                columns.append(identifier.get_real_name())
        if isinstance(token, Identifier):
            columns.append(token.get_real_name())
        if token.ttype is Keyword:  # from
            break
    return columns
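The column collection above leans on Identifier.get_real_name(), which strips aliases. A standalone sanity check of that behaviour:

import sqlparse
from sqlparse.sql import IdentifierList

stmt = sqlparse.parse('select id, name as n from users')[0]
id_list = next(tok for tok in stmt.tokens if isinstance(tok, IdentifierList))
print([ident.get_real_name() for ident in id_list.get_identifiers()])
# expected: ['id', 'name'] -- the alias "n" is dropped by get_real_name()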
def test_tokenize(self):
    sql = 'select * from foo;'
    stmts = sqlparse.parse(sql)
    self.assertEqual(len(stmts), 1)
    self.assertEqual(str(stmts[0]), sql)
def test_newlines(self):
    sql = u'select\n*from foo;'
    p = sqlparse.parse(sql)[0]
    self.assertEqual(unicode(p), sql)
    sql = u'select\r\n*from foo'
    p = sqlparse.parse(sql)[0]
    self.assertEqual(unicode(p), sql)
    sql = u'select\r*from foo'
    p = sqlparse.parse(sql)[0]
    self.assertEqual(unicode(p), sql)
    sql = u'select\r\n*from foo\n'
    p = sqlparse.parse(sql)[0]
    self.assertEqual(unicode(p), sql)
def test_within(self):
    sql = 'foo(col1, col2)'
    p = sqlparse.parse(sql)[0]
    col1 = p.tokens[0].tokens[1].tokens[1].tokens[0]
    self.assert_(col1.within(sqlparse.sql.Function))
def test_child_of(self):
    sql = '(col1, col2)'
    p = sqlparse.parse(sql)[0]
    self.assert_(p.tokens[0].tokens[1].is_child_of(p.tokens[0]))
    sql = 'select foo'
    p = sqlparse.parse(sql)[0]
    self.assert_(not p.tokens[2].is_child_of(p.tokens[0]))
    self.assert_(p.tokens[2].is_child_of(p))
def test_access_symbol(self):
    # see issue27
    t = sqlparse.parse('select a.[foo bar] as foo')[0].tokens
    self.assert_(isinstance(t[-1], sqlparse.sql.Identifier))
    self.assertEqual(t[-1].get_name(), 'foo')
    self.assertEqual(t[-1].get_real_name(), '[foo bar]')
    self.assertEqual(t[-1].get_parent_name(), 'a')
def test_square_brackets_notation_isnt_too_greedy(self):
    # see issue153
    t = sqlparse.parse('[foo], [bar]')[0].tokens
    self.assert_(isinstance(t[0], sqlparse.sql.IdentifierList))
    self.assertEqual(len(t[0].tokens), 4)
    self.assertEqual(t[0].tokens[0].get_real_name(), '[foo]')
    self.assertEqual(t[0].tokens[-1].get_real_name(), '[bar]')
def test_keyword_like_identifier(self):
    # see issue47
    t = sqlparse.parse('foo.key')[0].tokens
    self.assertEqual(len(t), 1)
    self.assert_(isinstance(t[0], sqlparse.sql.Identifier))
def test_function_parameter(self):
    # see issue94
    t = sqlparse.parse('abs(some_col)')[0].tokens[0].get_parameters()
    self.assertEqual(len(t), 1)
    self.assert_(isinstance(t[0], sqlparse.sql.Identifier))
def test_quoted_identifier():
    t = sqlparse.parse('select x.y as "z" from foo')[0].tokens
    assert isinstance(t[2], sqlparse.sql.Identifier)
    assert t[2].get_name() == 'z'
    assert t[2].get_real_name() == 'y'
def test_valid_identifier_names(name):
    # issue175
    t = sqlparse.parse(name)[0].tokens
    assert isinstance(t[0], sqlparse.sql.Identifier)
def test_double_precision_is_builtin():
    sql = 'DOUBLE PRECISION'
    t = sqlparse.parse(sql)[0].tokens
    assert (len(t) == 1
            and t[0].ttype == sqlparse.tokens.Name.Builtin
            and t[0].value == 'DOUBLE PRECISION')
def test_double_quotes_are_identifiers():
    p = sqlparse.parse('"foo"')[0].tokens
    assert len(p) == 1
    assert isinstance(p[0], sqlparse.sql.Identifier)
def test_sqlite_identifiers():
    # Make sure we still parse sqlite style escapes
    p = sqlparse.parse('[col1],[col2]')[0].tokens
    assert (len(p) == 1
            and isinstance(p[0], sqlparse.sql.IdentifierList)
            and [id.get_name() for id in p[0].get_identifiers()] == ['[col1]', '[col2]'])

    p = sqlparse.parse('[col1]+[col2]')[0]
    types = [tok.ttype for tok in p.flatten()]
    assert types == [T.Name, T.Operator, T.Name]
def test_single_line_comments(sql):
    p = sqlparse.parse(sql)[0]
    assert len(p.tokens) == 5
    assert p.tokens[-1].ttype == T.Comment.Single
def __init__(self, sql=None):
    self.sql = sql
def extract_tables(self):
    # print sqlparse.parse(self.sql)[0]
    stream = self.extract_from_part(sqlparse.parse(self.sql)[0])
    # print stream
    return list(self.extract_table_identifiers(stream))

# black/white list check
def check_query_table(self, dbtag, username):
    list = Tb_blacklist.objects.filter(dbtag=dbtag)
    if list:
        # user in white list
        if User.objects.get(username=username) in list[0].user_permit.all():
            existTb = []
        else:
            # if table in black list
            blackTblist = list[0].tbname.split(',')
            parser = Sqlparse(self.sql)
            tblist = parser.extract_tables()
            existTb = [val for val in blackTblist if val in tblist]
        if existTb:
            return True, existTb
    return False, []
def to_sqla(sql):
    tokens = sqlparse.parse(sql)[0].tokens
    tokens = remove_whitespace(tokens)
    return tokens_to_sqla(tokens).render()
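remove_whitespace() and tokens_to_sqla() belong to the surrounding project and are not shown here. Presumably remove_whitespace() just filters whitespace tokens out of the top-level token list, roughly like this sketch:

def remove_whitespace(tokens):
    # Drop pure-whitespace tokens so the translation step only sees
    # meaningful tokens (Token.is_whitespace is an attribute in sqlparse 0.2+)
    return [tok for tok in tokens if not tok.is_whitespace]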
def find_prev_keyword(sql, n_skip=0):
    """ Find the last sql keyword in an SQL statement.

    Returns the value of the last keyword, and the text of the query with
    everything after the last keyword stripped.
    """
    if not sql.strip():
        return None, ''

    parsed = sqlparse.parse(sql)[0]
    flattened = list(parsed.flatten())
    flattened = flattened[:len(flattened) - n_skip]

    logical_operators = ('AND', 'OR', 'NOT', 'BETWEEN')

    for t in reversed(flattened):
        if t.value == '(' or (t.is_keyword and (
                t.value.upper() not in logical_operators)):
            # Find the location of token t in the original parsed statement
            # We can't use parsed.token_index(t) because t may be a child token
            # inside a TokenList, in which case token_index throws an error
            # Minimal example:
            #   p = sqlparse.parse('select * from foo where bar')
            #   t = list(p.flatten())[-3]  # The "Where" token
            #   p.token_index(t)  # Throws ValueError: not in list
            idx = flattened.index(t)

            # Combine the string values of all tokens in the original list
            # up to and including the target keyword token t, to produce a
            # query string with everything after the keyword token removed
            text = ''.join(tok.value for tok in flattened[:idx + 1])
            return t, text

    return None, ''


# Postgresql dollar quote signs look like `$$` or `$tag$`
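Example of what find_prev_keyword() returns, assuming the function above is in scope; logical operators such as AND are skipped, so the cut happens at the previous structural keyword:

import sqlparse

tok, text = find_prev_keyword('select * from foo where bar = 1 and ')
print(tok.value, repr(text))
# expected: where 'select * from foo where' -- AND is skipped, WHERE is the
# last structural keyword, and everything after it is stripped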
def is_open_quote(sql):
    """Returns true if the query contains an unclosed quote."""

    # parsed can contain one or more semi-colon separated commands
    parsed = sqlparse.parse(sql)
    return any(_parsed_is_open_quote(p) for p in parsed)
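The helper _parsed_is_open_quote() is not shown. One plausible sketch (not the original implementation, and ignoring the dollar-quote case hinted at by the comment above) relies on sqlparse emitting a Token.Error for a stray quote character it cannot pair up, the same Error token the parse_partial_identifier example below looks for:

import sqlparse
from sqlparse import tokens as T

def _parsed_is_open_quote(parsed):
    # A lone quote character that is never closed is lexed as an Error token
    return any(tok.ttype in T.Error and tok.value in ("'", '"')
               for tok in parsed.flatten())

print(is_open_quote("select 'unterminated"))  # expected: True
print(is_open_quote("select 'closed'"))       # expected: False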
def parse_partial_identifier(word):
    """
    Attempt to parse a (partially typed) word as an identifier.

    word may include a schema qualification, like `schema_name.partial_name`
    or `schema_name.` There may also be unclosed quotation marks, like
    `"schema` or `schema."partial_name`.

    :param word: string representing a (partially complete) identifier
    :return: sqlparse.sql.Identifier, or None
    """
    p = sqlparse.parse(word)[0]
    n_tok = len(p.tokens)
    if n_tok == 1 and isinstance(p.tokens[0], Identifier):
        return p.tokens[0]
    elif p.token_next_by(m=(Error, '"'))[1]:
        # An unmatched double quote, e.g. '"foo', 'foo."', or 'foo."bar'
        # Close the double quote, then reparse
        return parse_partial_identifier(word + '"')
    else:
        return None
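Usage sketch for parse_partial_identifier(), assuming the imports the function relies on (`import sqlparse`, `from sqlparse.sql import Identifier`, `from sqlparse.tokens import Error`) are in place:

ident = parse_partial_identifier('my_schema."us')
print(type(ident))              # expected: <class 'sqlparse.sql.Identifier'>
print(ident.get_parent_name())  # expected: my_schema
print(ident.get_real_name())    # expected: us (quotes stripped after reparsing)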