We have extracted the following 28 code examples from open source Python projects to illustrate how to use sqlparse.tokens, the module that defines the token types (Keyword, DML, Name, Punctuation, and so on) used throughout sqlparse.
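Before the examples, a minimal orientation sketch (assuming a current sqlparse release) of how the types defined in sqlparse.tokens appear when a statement is parsed:

import sqlparse
from sqlparse.tokens import DML, Keyword

# Parse one statement and walk its leaf tokens.
stmt = sqlparse.parse("SELECT id FROM users;")[0]
for tok in stmt.flatten():
    # tok.ttype is a member of the sqlparse.tokens hierarchy, e.g. Keyword.DML
    print(tok.ttype, repr(tok.value))

# Token types are singletons compared with `is`; `in` also matches parent types.
first = next(stmt.flatten())
assert first.ttype is DML      # SELECT
assert first.ttype in Keyword  # DML is a sub-type of Keyword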
def parse_sql_tables(sql):
    tables = []
    parsed = sqlparse.parse(sql)
    stmt = parsed[0]
    from_seen = False
    for token in stmt.tokens:
        if from_seen:
            if token.ttype is Keyword:
                continue
            else:
                if isinstance(token, IdentifierList):
                    for identifier in token.get_identifiers():
                        tables.append(SQLParser.get_table_name(identifier))
                elif isinstance(token, Identifier):
                    tables.append(SQLParser.get_table_name(token))
                else:
                    pass
        if token.ttype is Keyword and token.value.upper() == "FROM":
            from_seen = True
    return tables
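SQLParser.get_table_name comes from the snippet's host project and is not shown above; a hedged usage sketch with a minimal stand-in:

import sqlparse
from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword

class SQLParser:
    # Hypothetical stand-in for the host project's helper: resolve an
    # identifier (possibly aliased, e.g. "users u") to its table name.
    @staticmethod
    def get_table_name(identifier):
        return identifier.get_real_name()

print(parse_sql_tables("SELECT * FROM users u, orders o WHERE u.id = o.uid"))
# expected with current sqlparse: ['users', 'orders']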
def test_simple(self):
    from cStringIO import StringIO  # Python 2 module; use io.StringIO on Python 3
    stream = StringIO("SELECT 1; SELECT 2;")
    lex = lexer.Lexer()
    tokens = lex.get_tokens(stream)
    self.assertEqual(len(list(tokens)), 9)
    stream.seek(0)
    lex.bufsize = 4
    tokens = list(lex.get_tokens(stream))
    self.assertEqual(len(tokens), 9)
    stream.seek(0)
    lex.bufsize = len(stream.getvalue())
    tokens = list(lex.get_tokens(stream))
    self.assertEqual(len(tokens), 9)
def _gather_sql_placeholders(body):
    p = sqlparse.parse(body)
    assert len(p) == 1
    p = p[0]
    tokens = list(p.flatten())
    names = [x.value[1:] for x in tokens if x.ttype == Token.Name.Placeholder]
    return sorted(set(names))
def parse_sql_columns(sql):
    columns = []
    parsed = sqlparse.parse(sql)
    stmt = parsed[0]
    for token in stmt.tokens:
        if isinstance(token, IdentifierList):
            for identifier in token.get_identifiers():
                columns.append(identifier.get_real_name())
        if isinstance(token, Identifier):
            columns.append(token.get_real_name())
        if token.ttype is Keyword:  # from
            break
    return columns
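A short usage sketch, assuming the imports the host project supplies:

import sqlparse
from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword

# SELECT is typed Keyword.DML, not Keyword, so the `is Keyword` check only
# fires at FROM, after the column list has been collected.
print(parse_sql_columns("SELECT first_name, last_name FROM employees"))
# expected: ['first_name', 'last_name']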
def is_subselect(parsed):
    # is_group is an attribute in current sqlparse; older releases exposed
    # it as a method, is_group()
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() == 'SELECT':
            return True
    return False
def extract_from_part(parsed):
    from_seen = False
    for item in parsed.tokens:
        if from_seen:
            if is_subselect(item):
                for x in extract_from_part(item):
                    yield x
            elif item.ttype is Keyword:
                return  # 'raise StopIteration' breaks inside generators on Python 3.7+ (PEP 479)
            else:
                yield item
        elif item.ttype is Keyword and item.value.upper() == 'FROM':
            from_seen = True
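These two generators are the core of the widely circulated sqlparse table-extraction recipe. A hedged driver sketch; extract_table_identifiers is a simplified stand-in for the recipe's third helper, not part of the snippets above:

import sqlparse
from sqlparse.sql import Identifier, IdentifierList

def extract_table_identifiers(token_stream):
    # Hypothetical, simplified stand-in: pull names out of what FROM yielded.
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                yield identifier.get_real_name()
        elif isinstance(item, Identifier):
            yield item.get_real_name()

stream = extract_from_part(sqlparse.parse("SELECT * FROM users, orders WHERE id = 1")[0])
print(list(extract_table_identifiers(stream)))
# expected: ['users', 'orders']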
def test_simple(self):
    s = 'select * from foo;'
    stream = lexer.tokenize(s)
    self.assert_(isinstance(stream, types.GeneratorType))
    tokens = list(stream)
    self.assertEqual(len(tokens), 8)
    self.assertEqual(len(tokens[0]), 2)
    self.assertEqual(tokens[0], (Keyword.DML, u'select'))
    self.assertEqual(tokens[-1], (Punctuation, u';'))
def test_backticks(self):
    s = '`foo`.`bar`'
    tokens = list(lexer.tokenize(s))
    self.assertEqual(len(tokens), 3)
    self.assertEqual(tokens[0], (Name, u'`foo`'))
def test_linebreaks(self):  # issue1
    s = 'foo\nbar\n'
    tokens = lexer.tokenize(s)
    self.assertEqual(''.join(str(x[1]) for x in tokens), s)
    s = 'foo\rbar\r'
    tokens = lexer.tokenize(s)
    self.assertEqual(''.join(str(x[1]) for x in tokens), s)
    s = 'foo\r\nbar\r\n'
    tokens = lexer.tokenize(s)
    self.assertEqual(''.join(str(x[1]) for x in tokens), s)
    s = 'foo\r\nbar\n'
    tokens = lexer.tokenize(s)
    self.assertEqual(''.join(str(x[1]) for x in tokens), s)
def test_negative_numbers(self):
    s = "values(-1)"
    tokens = list(lexer.tokenize(s))
    self.assertEqual(len(tokens), 4)
    self.assertEqual(tokens[2][0], Number.Integer)
    self.assertEqual(tokens[2][1], '-1')

# Somehow this test fails on Python 3.2
def test_tab_expansion(self):
    s = "\t"
    lex = lexer.Lexer()
    lex.tabsize = 5
    tokens = list(lex.get_tokens(s))
    self.assertEqual(tokens[0][1], " " * 5)
def test_repr(self):
    p = sqlparse.parse('foo, bar, baz')[0]
    tst = "<IdentifierList 'foo, b...' at 0x"
    self.assertEqual(repr(p.tokens[0])[:len(tst)], tst)
def test_error(self):
    from cStringIO import StringIO  # Python 2 module; use io.StringIO on Python 3
    stream = StringIO("FOOBAR{")
    lex = lexer.Lexer()
    lex.bufsize = 4
    tokens = list(lex.get_tokens(stream))
    self.assertEqual(len(tokens), 2)
    self.assertEqual(tokens[1][0], Error)
def test_parse_endifloop():
    p = sqlparse.parse('END IF')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    # extra whitespace between the keywords should be ignored
    p = sqlparse.parse('END  IF')[0]
    assert len(p.tokens) == 1
    p = sqlparse.parse('END\t\nIF')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    p = sqlparse.parse('END LOOP')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    p = sqlparse.parse('END  LOOP')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
def is_subselect(self, parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() == 'SELECT':
            return True
    return False
def extract_from_part(self, parsed):
    from_seen = False
    for item in parsed.tokens:
        if from_seen:
            if self.is_subselect(item):
                for x in self.extract_from_part(item):
                    yield x
            elif item.ttype is Keyword:
                return  # 'raise StopIteration' breaks inside generators on Python 3.7+ (PEP 479)
            else:
                yield item
        elif item.ttype is Keyword and item.value.upper() == 'FROM':
            from_seen = True
def debug_tokens(tokens):
    for t in tokens:
        LOG.debug(' %r %s', t, type(t))
def debug(f):
    def wrapped(*args, **kwargs):
        debug_args = []
        for a in args:
            if is_tokens(a):
                debug_args.append("[<%s tokens>]" % len(a))
            else:
                debug_args.append("%r" % a)
        args_str = " ".join(str(a) for a in debug_args)
        # fixed: the original joined the bare format string "%s=%s" without
        # interpolating k and v
        kwargs_str = " ".join("%s=%s" % (k, v) for k, v in kwargs.items())
        LOG.debug("%s %s", f.__name__, args_str + kwargs_str)

        # try to find tokens
        if 'tokens' in kwargs:
            if is_tokens(kwargs['tokens']):
                debug_tokens(kwargs['tokens'])
        for a in args:
            if is_tokens(a):
                debug_tokens(a)

        result = f(*args, **kwargs)
        if result is not None:
            LOG.debug("%s returned %r", f.__name__, result)
        return result
    return wrapped
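debug and debug_tokens assume a module-level LOG and an is_tokens predicate from their host project. A speculative sketch of plausible definitions, just so the decorator can be tried stand-alone:

import logging

import sqlparse

LOG = logging.getLogger(__name__)  # assumed module-level logger

def is_tokens(x):
    # Hypothetical predicate: a non-empty list of sqlparse tokens.
    return isinstance(x, list) and bool(x) and isinstance(x[0], sqlparse.sql.Token)

@debug
def count_tokens(tokens):
    return len(tokens)

logging.basicConfig(level=logging.DEBUG)
count_tokens(sqlparse.parse("select 1")[0].tokens)  # logs the call, a token dump, and the result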
def remove_whitespace(tokens):
    return [x for x in tokens if not x.is_whitespace]
def to_sqla(sql):
    tokens = sqlparse.parse(sql)[0].tokens
    tokens = remove_whitespace(tokens)
    return tokens_to_sqla(tokens).render()
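tokens_to_sqla comes from the host project and is not shown, but remove_whitespace is easy to demonstrate on its own (exact token counts may vary across sqlparse versions):

import sqlparse

tokens = sqlparse.parse("SELECT a FROM b")[0].tokens
print(len(tokens))                     # 7 with current sqlparse: 4 tokens plus 3 whitespace
print(len(remove_whitespace(tokens)))  # 4: SELECT, a, FROM, b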
def build_comparison(tok):
    assert type(tok) is S.Comparison

    m = M.Comparison()
    for tok in remove_whitespace(tok.tokens):
        LOG.debug("  %s %s", tok, type(tok))
        m = sql_literal_to_model(tok, m)
        if not m:
            raise Exception("[BUG] Failed to convert %s to model" % tok)
    return m
def group_tokens(parsed):
    grouped = defaultdict(list)
    identifiers = []
    for token in parsed.tokens:
        if token.ttype:
            grouped[token.ttype].append(token.value)
        elif token.get_name():
            identifiers.append(token)
    return grouped, identifiers
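A hedged usage sketch of group_tokens: grouped is keyed by sqlparse.tokens types, while named groups (identifiers) are collected separately:

import sqlparse
from collections import defaultdict

grouped, identifiers = group_tokens(sqlparse.parse("SELECT name FROM users")[0])
for ttype, values in grouped.items():
    print(ttype, values)  # e.g. Token.Keyword.DML ['SELECT'], Token.Keyword ['FROM'], ...
print([i.get_real_name() for i in identifiers])  # expected: ['name', 'users']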
def find_prev_keyword(sql, n_skip=0):
    """ Find the last sql keyword in an SQL statement.

    Returns the value of the last keyword, and the text of the query with
    everything after the last keyword stripped.
    """
    if not sql.strip():
        return None, ''

    parsed = sqlparse.parse(sql)[0]
    flattened = list(parsed.flatten())
    flattened = flattened[:len(flattened) - n_skip]

    logical_operators = ('AND', 'OR', 'NOT', 'BETWEEN')

    for t in reversed(flattened):
        if t.value == '(' or (t.is_keyword and (
                t.value.upper() not in logical_operators)):
            # Find the location of token t in the original parsed statement
            # We can't use parsed.token_index(t) because t may be a child
            # token inside a TokenList, in which case token_index throws an
            # error. Minimal example:
            #   p = sqlparse.parse('select * from foo where bar')
            #   t = list(p.flatten())[-3]  # The "Where" token
            #   p.token_index(t)  # Throws ValueError: not in list
            idx = flattened.index(t)

            # Combine the string values of all tokens in the original list
            # up to and including the target keyword token t, to produce a
            # query string with everything after the keyword token removed
            text = ''.join(tok.value for tok in flattened[:idx + 1])
            return t, text

    return None, ''


# Postgresql dollar quote signs look like `$$` or `$tag$`
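A hedged usage sketch of find_prev_keyword, the pattern completion engines use to decide what may follow the cursor (assuming import sqlparse):

import sqlparse

token, text = find_prev_keyword('SELECT * FROM users WHERE id = 1 AND ')
print(token.value, repr(text))
# expected with current sqlparse: WHERE 'SELECT * FROM users WHERE'
# AND is skipped because it is listed in logical_operators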
def parse_partial_identifier(word):
    """ Attempt to parse a (partially typed) word as an identifier.

    word may include a schema qualification, like `schema_name.partial_name`
    or `schema_name.` There may also be unclosed quotation marks, like
    `"schema` or `schema."partial_name`.

    :param word: string representing a (partially complete) identifier
    :return: sqlparse.sql.Identifier, or None
    """
    p = sqlparse.parse(word)[0]
    n_tok = len(p.tokens)
    if n_tok == 1 and isinstance(p.tokens[0], Identifier):
        return p.tokens[0]
    elif p.token_next_by(m=(Error, '"'))[1]:
        # An unmatched double quote, e.g. '"foo', 'foo."', or 'foo."bar'
        # Close the double quote, then reparse
        return parse_partial_identifier(word + '"')
    else:
        return None
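A usage sketch for the unclosed-quote path (assuming the Identifier and Error imports the snippet requires); the output is hedged against sqlparse version differences:

import sqlparse
from sqlparse.sql import Identifier
from sqlparse.tokens import Error

ident = parse_partial_identifier('public."use')
if ident is not None:
    # The unmatched quote was closed and the word reparsed as public."use"
    print(ident.get_parent_name(), ident.get_real_name())
    # expected: public use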
def is_subselect(parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() in (
                'SELECT', 'INSERT', 'CREATE'):
            return True
    return False
def _identifier_is_function(identifier):
    return any(isinstance(t, Function) for t in identifier.tokens)
def extract_from_part(parsed, stop_at_punctuation=True):
    tbl_prefix_seen = False
    for item in parsed.tokens:
        if tbl_prefix_seen:
            if is_subselect(item):
                for x in extract_from_part(item, stop_at_punctuation):
                    yield x
            elif stop_at_punctuation and item.ttype is Punctuation:
                return  # 'raise StopIteration' breaks inside generators on Python 3.7+ (PEP 479)
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This
            # causes the second FROM to trigger this elif condition, ending
            # the generator early. So we need to ignore the keyword if the
            # keyword is FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its
            # variants INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (
                    not item.value.upper() == 'FROM') and (
                    not item.value.upper().endswith('JOIN')):
                tbl_prefix_seen = False
            else:
                yield item
        elif item.ttype is Keyword or item.ttype is Keyword.DML:
            item_val = item.value.upper()
            if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
                    item_val.endswith('JOIN')):
                tbl_prefix_seen = True
        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
        # So this check here is necessary.
        elif isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                if (identifier.ttype is Keyword and
                        identifier.value.upper() == 'FROM'):
                    tbl_prefix_seen = True
                    break
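An end-to-end sketch of this more robust variant, reusing the hypothetical extract_table_identifiers stand-in from the earlier example:

import sqlparse

sql = "SELECT * FROM users u INNER JOIN orders o ON u.id = o.uid"
stream = extract_from_part(sqlparse.parse(sql)[0])
print(list(extract_table_identifiers(stream)))
# expected: ['users', 'orders']; the JOIN keyword passes through the
# generator but is ignored by the identifier filter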
def discoverComponents(sql):
    """Search for installable/removable components within a string
    containing one or more SQL statements"""
    components = []
    parsed = sqlparse.parse(sql)
    for statement in parsed:
        for token in statement.tokens:
            name = None
            typ = None
            # remove newlines, extra spaces for regex
            stmt = str(statement).replace("\n", " ")
            stmt = " ".join(stmt.split())
            for comp in sqlComponents:
                if token.match(Keyword, comp.typ):
                    name = comp.match(stmt)
                    typ = comp.typ
            if name is not None:
                component = AppComponent(name, typ)
                if component not in components:
                    components.append(component)
    # sort alphabetically, should fix drop issues when 'rule on table'
    # is dropped before 'table'
    return sorted(components, key=lambda x: x.typ)