Python sqlparse module: tokens() example source code

The following 28 code examples, extracted from open-source Python projects, illustrate how to work with sqlparse tokens: the token stream produced by sqlparse.parse() and the token types defined in sqlparse.tokens.
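Every example below follows the same basic pattern: parse a statement with sqlparse.parse(), then walk the statement's tokens and inspect each token's ttype and value. A minimal, self-contained sketch of that pattern (the query string is only an illustration):

import sqlparse

stmt = sqlparse.parse("SELECT id, name FROM users WHERE id = 1")[0]  # parse() returns a tuple of statements
for token in stmt.tokens:  # grouped tokens (Identifier, Where, ...) have ttype None
    print(token.ttype, repr(token.value))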

Project: etl_schedule (author: yxl040840219)
def parse_sql_tables(sql):
        tables = []
        parsed = sqlparse.parse(sql)
        stmt = parsed[0]
        from_seen = False
        for token in stmt.tokens:
            if from_seen:
                if token.ttype is Keyword:
                    continue
                else:
                    if isinstance(token, IdentifierList):
                        for identifier in token.get_identifiers():
                            tables.append(SQLParser.get_table_name(identifier))
                    elif isinstance(token, Identifier):
                        tables.append(SQLParser.get_table_name(token))
                    else:
                        pass
            if token.ttype is Keyword and token.value.upper() == "FROM":
                from_seen = True
        return tables
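A usage sketch for the function above. SQLParser.get_table_name is not included in the excerpt, so a hypothetical stand-in that returns the identifier's real name is assumed, along with the imports the snippet relies on:

import sqlparse
from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword

class SQLParser:
    @staticmethod
    def get_table_name(identifier):  # hypothetical stand-in for the project's real helper
        return identifier.get_real_name()

print(parse_sql_tables("SELECT a, b FROM t1, t2"))  # expected: ['t1', 't2']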
Project: codenn (author: sriniiyer)
def test_simple(self):
        from cStringIO import StringIO  # Python 2; on Python 3 use io.StringIO

        stream = StringIO("SELECT 1; SELECT 2;")
        lex = lexer.Lexer()

        tokens = lex.get_tokens(stream)
        self.assertEqual(len(list(tokens)), 9)

        stream.seek(0)
        lex.bufsize = 4
        tokens = list(lex.get_tokens(stream))
        self.assertEqual(len(tokens), 9)

        stream.seek(0)
        lex.bufsize = len(stream.getvalue())
        tokens = list(lex.get_tokens(stream))
        self.assertEqual(len(tokens), 9)
Project: baconql (author: lsenta)
def _gather_sql_placeholders(body):
    p = sqlparse.parse(body)
    assert len(p) == 1
    p = p[0]
    tokens = list(p.flatten())
    names = [x.value[1:] for x in tokens if x.ttype == Token.Name.Placeholder]
    return sorted(set(names))
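A usage sketch, assuming the imports the snippet relies on. Placeholder names lose their leading colon and come back deduplicated and sorted:

import sqlparse
from sqlparse.tokens import Token

print(_gather_sql_placeholders("UPDATE users SET name = :name WHERE id = :id"))
# expected: ['id', 'name']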
Project: etl_schedule (author: yxl040840219)
def parse_sql_columns(sql):
        columns = []
        parsed = sqlparse.parse(sql)
        stmt = parsed[0]
        for token in stmt.tokens:
            if isinstance(token, IdentifierList):
                for identifier in token.get_identifiers():
                    columns.append(identifier.get_real_name())
            if isinstance(token, Identifier):
                columns.append(token.get_real_name())
            if token.ttype is Keyword:  # stop at the first plain keyword (FROM); columns come before it
                break
        return columns
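A usage sketch (again assuming the sqlparse imports the excerpt omits). Columns are collected until the first plain keyword, i.e. FROM:

import sqlparse
from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword

print(parse_sql_columns("SELECT a, b FROM t"))  # expected: ['a', 'b']
print(parse_sql_columns("SELECT a FROM t"))     # expected: ['a']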
Project: codenn (author: sriniiyer)
def is_subselect(parsed):
    if not parsed.is_group():  # a method in the old sqlparse vendored by codenn; a property in current sqlparse
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() == 'SELECT':
            return True
    return False
Project: codenn (author: sriniiyer)
def extract_from_part(parsed):
    from_seen = False
    for item in parsed.tokens:
        if from_seen:
            if is_subselect(item):
                for x in extract_from_part(item):
                    yield x
            elif item.ttype is Keyword:
                return  # was `raise StopIteration`: PEP 479 makes that a RuntimeError inside generators
            else:
                yield item
        elif item.ttype is Keyword and item.value.upper() == 'FROM':
            from_seen = True
Project: codenn (author: sriniiyer)
def test_simple(self):
        s = 'select * from foo;'
        stream = lexer.tokenize(s)
        self.assertTrue(isinstance(stream, types.GeneratorType))
        tokens = list(stream)
        self.assertEqual(len(tokens), 8)
        self.assertEqual(len(tokens[0]), 2)
        self.assertEqual(tokens[0], (Keyword.DML, u'select'))
        self.assertEqual(tokens[-1], (Punctuation, u';'))
Project: codenn (author: sriniiyer)
def test_backticks(self):
        s = '`foo`.`bar`'
        tokens = list(lexer.tokenize(s))
        self.assertEqual(len(tokens), 3)
        self.assertEqual(tokens[0], (Name, u'`foo`'))
Project: codenn (author: sriniiyer)
def test_linebreaks(self):  # issue1
        s = 'foo\nbar\n'
        tokens = lexer.tokenize(s)
        self.assertEqual(''.join(str(x[1]) for x in tokens), s)
        s = 'foo\rbar\r'
        tokens = lexer.tokenize(s)
        self.assertEqual(''.join(str(x[1]) for x in tokens), s)
        s = 'foo\r\nbar\r\n'
        tokens = lexer.tokenize(s)
        self.assertEqual(''.join(str(x[1]) for x in tokens), s)
        s = 'foo\r\nbar\n'
        tokens = lexer.tokenize(s)
        self.assertEqual(''.join(str(x[1]) for x in tokens), s)
Project: codenn (author: sriniiyer)
def test_negative_numbers(self):
        s = "values(-1)"
        tokens = list(lexer.tokenize(s))
        self.assertEqual(len(tokens), 4)
        self.assertEqual(tokens[2][0], Number.Integer)
        self.assertEqual(tokens[2][1], '-1')

Project: codenn (author: sriniiyer)
def test_tab_expansion(self):
        s = "\t"
        lex = lexer.Lexer()
        lex.tabsize = 5
        tokens = list(lex.get_tokens(s))
        self.assertEqual(tokens[0][1], " " * 5)
Project: codenn (author: sriniiyer)
def test_repr(self):
        p = sqlparse.parse('foo, bar, baz')[0]
        tst = "<IdentifierList 'foo, b...' at 0x"
        self.assertEqual(repr(p.tokens[0])[:len(tst)], tst)
Project: codenn (author: sriniiyer)
def test_error(self):
        from cStringIO import StringIO  # Python 2; on Python 3 use io.StringIO

        stream = StringIO("FOOBAR{")

        lex = lexer.Lexer()
        lex.bufsize = 4
        tokens = list(lex.get_tokens(stream))
        self.assertEqual(len(tokens), 2)
        self.assertEqual(tokens[1][0], Error)
Project: codenn (author: sriniiyer)
def test_parse_endifloop():
    p = sqlparse.parse('END IF')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    p = sqlparse.parse('END   IF')[0]
    assert len(p.tokens) == 1
    p = sqlparse.parse('END\t\nIF')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    p = sqlparse.parse('END LOOP')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
    p = sqlparse.parse('END  LOOP')[0]
    assert len(p.tokens) == 1
    assert p.tokens[0].ttype is Keyword
Project: db_platform (author: speedocjx)
def is_subselect(self, parsed):
        if not parsed.is_group:
            return False
        for item in parsed.tokens:
            if item.ttype is DML and item.value.upper() == 'SELECT':
                return True
        return False
Project: db_platform (author: speedocjx)
def extract_from_part(self, parsed):
        from_seen = False
        for item in parsed.tokens:
            if from_seen:
                if self.is_subselect(item):
                    for x in self.extract_from_part(item):
                        yield x
                elif item.ttype is Keyword:
                    return  # was `raise StopIteration`: PEP 479 makes that a RuntimeError inside generators
                else:
                    yield item
            elif item.ttype is Keyword and item.value.upper() == 'FROM':
                from_seen = True
Project: sqlitis (author: pglass)
def debug_tokens(tokens):
    for t in tokens:
        LOG.debug('  %r %s', t, type(t))
Project: sqlitis (author: pglass)
def debug(f):

    def wrapped(*args, **kwargs):
        debug_args = []
        for a in args:
            if is_tokens(a):
                debug_args.append("[<%s tokens>]" % len(a))
            else:
                debug_args.append("%r" % a)

        args_str = " ".join(str(a) for a in debug_args)
        kwargs_str = " ".join("%s=%s" % (k, v) for k, v in kwargs.items())
        LOG.debug("%s %s %s", f.__name__, args_str, kwargs_str)

        # try to find tokens
        if 'tokens' in kwargs:
            if is_tokens(kwargs['tokens']):
                debug_tokens(kwargs['tokens'])
        for a in args:
            if is_tokens(a):
                debug_tokens(a)

        result = f(*args, **kwargs)
        if result is not None:
            LOG.debug("%s returned %r", f.__name__, result)
        return result

    return wrapped
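A usage sketch. LOG and is_tokens live elsewhere in sqlitis; hypothetical stand-ins are assumed here so the decorator can be tried in isolation:

import logging
import sqlparse

logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)

def is_tokens(x):  # hypothetical stand-in for sqlitis' own helper
    return isinstance(x, list) and len(x) > 0 and hasattr(x[0], "ttype")

@debug
def count_tokens(tokens):
    return len(tokens)

count_tokens(sqlparse.parse("SELECT 1")[0].tokens)  # logs the call, each token, and the result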
Project: sqlitis (author: pglass)
def remove_whitespace(tokens):
    return [x for x in tokens if not x.is_whitespace]
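A quick check of the helper above (is_whitespace is a token property in current sqlparse):

import sqlparse

print([t.value for t in remove_whitespace(sqlparse.parse("SELECT 1")[0].tokens)])
# expected: ['SELECT', '1']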
Project: sqlitis (author: pglass)
def to_sqla(sql):
    tokens = sqlparse.parse(sql)[0].tokens
    tokens = remove_whitespace(tokens)
    return tokens_to_sqla(tokens).render()
Project: sqlitis (author: pglass)
def build_comparison(tok):
    assert type(tok) is S.Comparison

    m = M.Comparison()
    for tok in remove_whitespace(tok.tokens):
        LOG.debug("  %s %s", tok, type(tok))
        m = sql_literal_to_model(tok, m)
        if not m:
            raise Exception("[BUG] Failed to convert %s to model" % tok)

    return m
Project: django-boardinghouse (author: schinckel)
def group_tokens(parsed):
    grouped = defaultdict(list)
    identifiers = []

    for token in parsed.tokens:
        if token.ttype:
            grouped[token.ttype].append(token.value)
        elif token.get_name():
            identifiers.append(token)

    return grouped, identifiers
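A usage sketch, assuming the imports the excerpt omits. Leaf tokens are bucketed by ttype; grouped tokens with a name (ttype is None) are collected as identifiers:

import sqlparse
from collections import defaultdict

grouped, identifiers = group_tokens(sqlparse.parse("SELECT a FROM t")[0])
print([i.get_name() for i in identifiers])  # expected: ['a', 't']
print(list(grouped))  # ttype buckets, e.g. Token.Keyword.DML, Token.Text.Whitespace, Token.Keyword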
Project: clickhouse-cli (author: hatarist)
def find_prev_keyword(sql, n_skip=0):
    """
    Find the last sql keyword in an SQL statement.

    Returns the value of the last keyword, and the text of the query with
    everything after the last keyword stripped.
    """
    if not sql.strip():
        return None, ''

    parsed = sqlparse.parse(sql)[0]
    flattened = list(parsed.flatten())
    flattened = flattened[:len(flattened) - n_skip]

    logical_operators = ('AND', 'OR', 'NOT', 'BETWEEN')

    for t in reversed(flattened):
        if t.value == '(' or (t.is_keyword and (
                              t.value.upper() not in logical_operators)):
            # Find the location of token t in the original parsed statement
            # We can't use parsed.token_index(t) because t may be a child token
            # inside a TokenList, in which case token_index throws an error
            # Minimal example:
            #   p = sqlparse.parse('select * from foo where bar')[0]
            #   t = list(p.flatten())[-3]  # The "Where" token
            #   p.token_index(t)  # Throws ValueError: not in list
            idx = flattened.index(t)

            # Combine the string values of all tokens in the original list
            # up to and including the target keyword token t, to produce a
            # query string with everything after the keyword token removed
            text = ''.join(tok.value for tok in flattened[:idx + 1])
            return t, text

    return None, ''
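A usage sketch (assuming import sqlparse at module level):

kw, text = find_prev_keyword("SELECT * FROM users WHERE ")
print(kw.value, repr(text))  # expected: WHERE 'SELECT * FROM users WHERE'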


Project: clickhouse-cli (author: hatarist)
def parse_partial_identifier(word):
    """
    Attempt to parse a (partially typed) word as an identifier.

    word may include a schema qualification, like `schema_name.partial_name`
    or `schema_name.` There may also be unclosed quotation marks, like
    `"schema` or `schema."partial_name`.

    :param word: string representing a (partially complete) identifier
    :return: sqlparse.sql.Identifier, or None
    """

    p = sqlparse.parse(word)[0]
    n_tok = len(p.tokens)
    if n_tok == 1 and isinstance(p.tokens[0], Identifier):
        return p.tokens[0]
    elif p.token_next_by(m=(Error, '"'))[1]:
        # An unmatched double quote, e.g. '"foo', 'foo."', or 'foo."bar'
        # Close the double quote, then reparse
        return parse_partial_identifier(word + '"')
    else:
        return None
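A usage sketch, assuming the excerpt's imports (sqlparse, Identifier from sqlparse.sql, Error from sqlparse.tokens). The unclosed quote is closed and the word reparses as a single Identifier:

ident = parse_partial_identifier('schema."partial')
print(ident.get_parent_name(), ident.get_real_name())  # expected: schema partial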
Project: clickhouse-cli (author: hatarist)
def is_subselect(parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT', 'CREATE'):
            return True
    return False
Project: clickhouse-cli (author: hatarist)
def _identifier_is_function(identifier):
    return any(isinstance(t, Function) for t in identifier.tokens)
Project: clickhouse-cli (author: hatarist)
def extract_from_part(parsed, stop_at_punctuation=True):
    tbl_prefix_seen = False
    for item in parsed.tokens:
        if tbl_prefix_seen:
            if is_subselect(item):
                for x in extract_from_part(item, stop_at_punctuation):
                    yield x
            elif stop_at_punctuation and item.ttype is Punctuation:
                return  # was `raise StopIteration`: PEP 479 makes that a RuntimeError inside generators
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
            # the second FROM to trigger this elif condition and end the
            # generator prematurely. So we need to ignore the keyword if it
            # is FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its variants
            # INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (
                    not item.value.upper() == 'FROM') and (
                    not item.value.upper().endswith('JOIN')):
                tbl_prefix_seen = False
            else:
                yield item
        elif item.ttype is Keyword or item.ttype is Keyword.DML:
            item_val = item.value.upper()
            if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
                    item_val.endswith('JOIN')):
                tbl_prefix_seen = True
        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
        # So this check here is necessary.
        elif isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                if (identifier.ttype is Keyword and
                        identifier.value.upper() == 'FROM'):
                    tbl_prefix_seen = True
                    break
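A usage sketch, assuming the module's imports and the is_subselect helper shown above. JOIN keywords are yielded along with the table tokens, so this filters the stream down to identifiers:

import sqlparse
from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword, Punctuation, DML

parsed = sqlparse.parse("SELECT * FROM foo JOIN bar")[0]
print([t.value for t in extract_from_part(parsed)
       if isinstance(t, (Identifier, IdentifierList))])
# expected: ['foo', 'bar']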
Project: ravel (author: ravel-net)
def discoverComponents(sql):
    """Search for installable/removable components within a string containing
       one or more SQL statemnets"""
    components = []
    parsed = sqlparse.parse(sql)
    for statement in parsed:
        # remove newlines and extra spaces so the component regexes can match
        # (loop-invariant, so computed once per statement rather than per token)
        stmt = str(statement).replace("\n", " ")
        stmt = " ".join(stmt.split())

        for token in statement.tokens:
            name = None
            typ = None

            for comp in sqlComponents:
                if token.match(Keyword, comp.typ):
                    name = comp.match(stmt)
                    typ = comp.typ

            if name is not None:
                component = AppComponent(name, typ)
                if component not in components:
                    components.append(component)

    # sort alphabetically, should fix drop issues when 'rule on table'
    # is dropped before 'table'
    return sorted(components, key=lambda x: x.typ)