Python sqlparse 模块,split() 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用sqlparse.split()

项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
    """
    Break an SQL script into a list of individual statements.

    The script is split with ``sqlparse.split()``; every non-empty piece
    is then passed through ``sqlparse.format(..., strip_comments=True)``.
    The resulting list is meant to be fed, one entry at a time, to
    successive cursor.execute() calls.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    # sqlparse is imported lazily so that it is only needed when a script
    # actually has to be split.
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    prepared = []
    for chunk in sqlparse.split(sql):
        if chunk:
            prepared.append(sqlparse.format(chunk, strip_comments=True))
    return prepared
项目:Enibar    作者:ENIB    | 项目源码 | 文件源码
def execute_sql_file(filename):
    """Run every statement of the SQL file *filename* in one transaction.

    The file is split into statements with ``sqlparse.split()``.  Each
    non-empty statement is executed in order; on the first failure the
    transaction is rolled back, the driver error is printed and the
    remaining statements are skipped.  The transaction is committed only
    when every statement succeeded.

    Exits the process with status 1 if the transaction cannot be started.

    NOTE(review): the original author remarks that MySQL cannot roll back
    CREATE/ALTER/DROP statements, so a rollback may not undo schema changes.
    """
    with database.Database() as db, open(filename) as fd:
        started = db.transaction()
        if not started:
            print("Failed to start the migration")
            sys.exit(1)

        cursor = QtSql.QSqlQuery(db)
        for statement in sqlparse.split(fd.read()):
            if not statement:
                continue

            succeeded = cursor.exec_(statement)
            if not succeeded:
                db.rollback()
                print(cursor.lastError().text())
                return

        # Reached only when no statement failed.
        db.commit()
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_psql_quotation_marks():  # issue83
    """Dollar-quoted PostgreSQL function bodies must not confuse split()."""
    # regression: two functions using the anonymous $$ delimiter
    anonymous_delimiters = """
    CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $$
          ....
    $$ LANGUAGE plpgsql;
    CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $$
          ....
    $$ LANGUAGE plpgsql;"""
    assert len(sqlparse.split(anonymous_delimiters)) == 2

    # the same with tagged $SOMETHING$ delimiters
    tagged_delimiters = """
    CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $PROC_1$
          ....
    $PROC_1$ LANGUAGE plpgsql;
    CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $PROC_2$
          ....
    $PROC_2$ LANGUAGE plpgsql;"""
    assert len(sqlparse.split(tagged_delimiters)) == 2
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
    """
    Break an SQL script into a list of individual statements.

    The script is split with ``sqlparse.split()``; every non-empty piece
    is then passed through ``sqlparse.format(..., strip_comments=True)``.
    The resulting list is meant to be fed, one entry at a time, to
    successive cursor.execute() calls.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    # sqlparse is imported lazily so that it is only needed when a script
    # actually has to be split.
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    prepared = []
    for chunk in sqlparse.split(sql):
        if chunk:
            prepared.append(sqlparse.format(chunk, strip_comments=True))
    return prepared
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
    """
    Break an SQL script into a list of individual statements.

    The script is split with ``sqlparse.split()``; every non-empty piece
    is then passed through ``sqlparse.format(..., strip_comments=True)``.
    The resulting list is meant to be fed, one entry at a time, to
    successive cursor.execute() calls.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    # sqlparse is imported lazily so that it is only needed when a script
    # actually has to be split.
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    prepared = []
    for chunk in sqlparse.split(sql):
        if chunk:
            prepared.append(sqlparse.format(chunk, strip_comments=True))
    return prepared
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
    """
    Break an SQL script into a list of individual statements.

    The script is split with ``sqlparse.split()``; every non-empty piece
    is then passed through ``sqlparse.format(..., strip_comments=True)``.
    The resulting list is meant to be fed, one entry at a time, to
    successive cursor.execute() calls.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    # sqlparse is imported lazily so that it is only needed when a script
    # actually has to be split.
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    prepared = []
    for chunk in sqlparse.split(sql):
        if chunk:
            prepared.append(sqlparse.format(chunk, strip_comments=True))
    return prepared
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
    """
    Break an SQL script into a list of individual statements.

    The script is split with ``sqlparse.split()``; every non-empty piece
    is then passed through ``sqlparse.format(..., strip_comments=True)``.
    The resulting list is meant to be fed, one entry at a time, to
    successive cursor.execute() calls.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    # sqlparse is imported lazily so that it is only needed when a script
    # actually has to be split.
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    prepared = []
    for chunk in sqlparse.split(sql):
        if chunk:
            prepared.append(sqlparse.format(chunk, strip_comments=True))
    return prepared
项目:baconql    作者:lsenta    | 项目源码 | 文件源码
def tokenize_content(file_path, content):
    """
    Take a SQL file with multiple statements and yield its blocks.

    ``content`` is split into statements with ``sqlparse.split()``; empty
    statements are dropped.  Each remaining statement is cut into lines and
    handed to ``_extract_parts`` together with a factory that builds
    ``Block(file_path, has_body, content)`` objects; whatever
    ``_extract_parts`` produces is yielded unchanged.

    This is a generator: nothing is yielded when ``content`` holds no
    non-empty statement.
    """
    # Build a real list: on Python 3 ``filter()`` returns a lazy iterator
    # that is always truthy, which made the emptiness check below a no-op.
    sql_blocks = [block for block in sqlparse.split(content) if block]

    if not sql_blocks:
        return

    def builder(content, has_body=False):
        return Block(file_path, has_body, content)

    for block in sql_blocks:
        # would have used `yield from' below but py2...
        for r in _extract_parts(builder, block.split('\n')):
            yield r
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def prepare_sql_script(self, sql):
    """
    Break an SQL script into a list of individual statements.

    The script is split with ``sqlparse.split()``; every non-empty piece
    is then passed through ``sqlparse.format(..., strip_comments=True)``.
    The resulting list is meant to be fed, one entry at a time, to
    successive cursor.execute() calls.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    # sqlparse is imported lazily so that it is only needed when a script
    # actually has to be split.
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    prepared = []
    for chunk in sqlparse.split(sql):
        if chunk:
            prepared.append(sqlparse.format(chunk, strip_comments=True))
    return prepared
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def is_single_statement(sql):
    '''Tell whether ``sql`` holds no more than one statement.

    The string is split with ``sqlparse.split()``; the result is True when
    that yields zero or one statement and False otherwise.
    '''
    statements = sqlparse.split(sql)
    return not len(statements) > 1
项目:ckan-timeseries    作者:namgk    | 项目源码 | 文件源码
def parse(self,s):
    """Parse a comma-separated duration such as ``1d,2h,3m,4s``.

    Every element is printed, validated against ``self.elm_pattern`` and,
    for each unit letter (d, h, m, s) it contains, the integer that
    precedes the letter is stored on ``self`` under that letter.

    Raises ValueError for an element that does not match the pattern.
    """
    # str: 1d,2h,3m,4s
    for elm in s.split(','):
        print(elm)
        matched = self.elm_pattern.match(elm)
        if not matched:
            raise ValueError('Cannot parse query: {}'.format(elm))

        for unit in ('d', 'h', 'm', 's'):
            if unit not in elm:
                continue
            amount = elm.split(unit)[0]
            setattr(self, unit, int(amount))
项目:ckan-timeseries    作者:namgk    | 项目源码 | 文件源码
def timestamp_from_string(str):
    """Turn a timestamp expression into a point in time.

    The argument is printed first.  A value starting with ``'last '`` is
    treated as a relative query (sample queries: 1m; 1m,2s; 1d,2h,3m,4s):
    the remainder is parsed by ``Timeseries_query`` and the resulting
    duration is subtracted from ``utcnow()``.  Anything else is handed to
    ``iso8601.parse_date``.
    """
    print(str)
    is_relative = str.startswith('last ', 0, 5)
    if not is_relative:
        return iso8601.parse_date(str)

    query = Timeseries_query(str.split('last ')[1])
    diff = datetime.timedelta(
        days=query.d, hours=query.h, minutes=query.m, seconds=query.s)
    return utcnow() - diff
项目:ckan-timeseries    作者:namgk    | 项目源码 | 文件源码
def is_single_statement(sql):
    '''Tell whether ``sql`` holds no more than one statement.

    The string is split with ``sqlparse.split()``; the result is True when
    that yields zero or one statement and False otherwise.
    '''
    statements = sqlparse.split(sql)
    return not len(statements) > 1
项目:Enibar    作者:ENIB    | 项目源码 | 文件源码
def apply_migrations():
    """Apply every migration newer than the last recorded one.

    The highest ``version`` stored in ``__migrations`` is read first (or -1
    when the table is empty).  The ``up.sql`` of every directory in
    ``../migrations`` whose numeric prefix (text before the first ``_``)
    is greater than that version is concatenated into a temporary script,
    followed by an INSERT recording the newest version, and the script is
    run through ``execute_sql_file``.

    Exits the process with status 1 when there is nothing to apply.  The
    temporary script is always removed, including on that exit path and
    when an exception is raised.
    """
    # Get the latest applied migration
    with database.Cursor() as cursor:
        cursor.prepare("SELECT version FROM __migrations ORDER BY version DESC LIMIT 1")
        cursor.exec_()
        last_applied = -1
        if cursor.next():
            last_applied = cursor.value("version")

    def version_of(name):
        return int(name.split("_")[0])

    migration_file, migration_name = tempfile.mkstemp()
    try:
        should_apply = False
        nb = 0
        # Wrapping the descriptor guarantees it is closed (mkstemp leaves
        # that to the caller) before the script is reopened by name.
        with os.fdopen(migration_file, "wb") as script:
            # Order by numeric prefix: a plain string sort would put "10_x"
            # before "2_x" and record the wrong final version.
            ordered = sorted(os.listdir("../migrations"),
                             key=lambda name: (version_of(name), name))
            for migration in ordered:
                nb = version_of(migration)
                if nb <= last_applied:
                    continue
                should_apply = True
                print(f"Applying {migration}")
                with open(os.path.join("../migrations", migration, "up.sql")) as fd:
                    script.write(fd.read().encode())

            if not should_apply:
                print("Nothing to do")
                sys.exit(1)

            script.write(f"INSERT INTO __migrations(version) VALUES({nb});\n".encode())

        # Execute the generated file.
        execute_sql_file(migration_name)
    finally:
        # Since mkstemp doesn't handle automatically the file deletion, do
        # it ourselves -- also when exiting early or failing.
        os.remove(migration_name)
项目:Enibar    作者:ENIB    | 项目源码 | 文件源码
def rollback_migrations():
    """Roll back the most recently applied migration(s).

    The two highest versions in ``__migrations`` are read as ``upper`` and
    ``lower`` (each -1 when missing).  The ``down.sql`` of every directory
    in ``../migrations`` whose numeric prefix lies in ``(lower, upper]`` is
    concatenated, newest first, into a temporary script, followed by a
    DELETE of the ``upper`` row.  The script is run through
    ``execute_sql_file`` only if at least one migration matched; otherwise
    "Nothing to do" is printed.

    The temporary script is always removed, even when an exception is
    raised.
    """
    lower = -1
    upper = -1
    with database.Cursor() as cursor:
        cursor.prepare("SELECT version FROM __migrations ORDER BY version DESC LIMIT 2")
        cursor.exec_()
        if cursor.next():
            upper = int(cursor.value("version"))
        if cursor.next():
            lower = int(cursor.value("version"))

    def version_of(name):
        return int(name.split("_")[0])

    migration_file, migration_name = tempfile.mkstemp()
    try:
        should_apply = False
        # Wrapping the descriptor guarantees it is closed (mkstemp leaves
        # that to the caller) before the script is reopened by name.
        with os.fdopen(migration_file, "wb") as script:
            # Order by numeric prefix, newest first; a plain string sort
            # would mis-order unpadded prefixes such as "10_x" and "2_x".
            ordered = sorted(os.listdir("../migrations"),
                             key=lambda name: (version_of(name), name),
                             reverse=True)
            for migration in ordered:
                nb = version_of(migration)
                if lower < nb <= upper:
                    should_apply = True
                    print(f"Rollback {migration}")
                    with open(os.path.join("../migrations", migration, "down.sql")) as fd:
                        script.write(fd.read().encode())

            script.write(f"DELETE FROM __migrations WHERE version={upper};\n".encode())

        if should_apply:
            execute_sql_file(migration_name)
        else:
            print("Nothing to do")
    finally:
        os.remove(migration_name)
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def prepare_sql_script(self, sql, _allow_fallback=False):
    """
    Break a SQL script into a list of individual statements.

    With sqlparse available, the script is split with ``sqlparse.split()``
    and every non-empty piece is passed through
    ``sqlparse.format(..., strip_comments=True)``.

    Without sqlparse, the ImportError propagates unless ``_allow_fallback``
    is true; in that case a RemovedInDjango19Warning is issued and the
    legacy ``_split_statements`` helper does the splitting.
    """
    # Remove _allow_fallback and keep only 'return ...' in Django 1.9.
    try:
        # This import must stay inside the method because it's optional.
        import sqlparse
    except ImportError:
        if not _allow_fallback:
            raise
        # Without sqlparse, fall back to the legacy (and buggy) logic.
        warnings.warn(
            "Providing initial SQL data on a %s database will require "
            "sqlparse in Django 1.9." % self.connection.vendor,
            RemovedInDjango19Warning)
        from django.core.management.sql import _split_statements
        return _split_statements(sql)

    prepared = []
    for chunk in sqlparse.split(sql):
        if chunk:
            prepared.append(sqlparse.format(chunk, strip_comments=True))
    return prepared
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_casewhen(self):
    """A CASE ... END expression must not swallow the next statement."""
    script = ('SELECT case when val = 1 then 2 else null end as foo;\n'
              'comment on table actor is \'The actor table.\';')
    statement_count = len(sqlparse.split(script))
    self.assertEqual(statement_count, 2)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_cursor_declare(self):
    """DECLARE CURSOR followed by a SELECT splits into two statements."""
    script = ('DECLARE CURSOR "foo" AS SELECT 1;\n'
              'SELECT 2;')
    statement_count = len(sqlparse.split(script))
    self.assertEqual(statement_count, 2)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_if_function(self):  # see issue 33
    """IF used as a function call must not confuse the splitter."""
    script = ('CREATE TEMPORARY TABLE tmp '
              'SELECT IF(a=1, a, b) AS o FROM one; '
              'SELECT t FROM two')
    statement_count = len(sqlparse.split(script))
    self.assertEqual(statement_count, 2)
项目:codenn    作者:sriniiyer    | 项目源码 | 文件源码
def test_split_simple():
    """Two semicolon-terminated SELECTs come back as two exact strings."""
    pieces = sqlparse.split('select * from foo; select * from bar;')
    assert len(pieces) == 2
    first, second = pieces[0], pieces[1]
    assert first == 'select * from foo;'
    assert second == 'select * from bar;'
项目:trough    作者:internetarchive    | 项目源码 | 文件源码
def execute_query(self, segment, query):
    '''Run a single SELECT against the segment's sqlite file.

    The request is rejected with a plain Exception unless ``query`` splits
    into exactly one statement whose sqlparse type is SELECT.  The sqlite
    database at ``segment.local_path()`` is then opened, prepared with
    ``trough.sync.setup_connection`` and the UTF-8 decoded query executed.

    Returns the cursor holding the result.
    '''
    logging.info('Servicing request: {query}'.format(query=query))
    # if the user sent more than one query, or the query is not a SELECT, raise an exception.
    # The statement count is checked first so that parse() is only
    # consulted when exactly one statement was found.
    is_single_select = (
        len(sqlparse.split(query)) == 1
        and sqlparse.parse(query)[0].get_type() == 'SELECT'
    )
    if not is_single_select:
        raise Exception('Exactly one SELECT query per request, please.')
    assert os.path.isfile(segment.local_path())

    logging.info("Connecting to sqlite database: {segment}".format(segment=segment.local_path()))
    connection = sqlite3.connect(segment.local_path())
    trough.sync.setup_connection(connection)
    cursor = connection.cursor()
    sql_text = query.decode('utf-8')
    cursor.execute(sql_text)
    return cursor

    # uwsgi endpoint
项目:trough    作者:internetarchive    | 项目源码 | 文件源码
def __call__(self, env, start_response):
        """Serve one read request: run the posted query against a segment.

        The segment id is taken from the ``segment`` query-string variable,
        falling back to the first dot-separated label of ``HTTP_HOST``.  The
        request body (``CONTENT_LENGTH`` bytes of ``wsgi.input``) is the
        query.  If another node holds the segment's write lock, the request
        is proxied to that node; otherwise the query is executed locally and
        the rows are returned through ``sql_result_json_iter``.

        Any exception is logged and turned into a ``500 Server Error``
        plain-text response carrying the exception text.
        """
        try:
            query_dict = urllib.parse.parse_qs(env['QUERY_STRING'])
            # use the ?segment= query string variable or the host string to figure out which sqlite database to talk to.
            # Both alternatives are lists (parse_qs values / split result), hence the [0].
            segment_id = query_dict.get('segment', env.get('HTTP_HOST', "").split("."))[0]
            logging.info('Connecting to Rethinkdb on: %s' % settings['RETHINKDB_HOSTS'])
            segment = trough.sync.Segment(segment_id=segment_id, size=0, rethinker=self.rethinker, services=self.services, registry=self.registry)
            content_length = int(env.get('CONTENT_LENGTH', 0))
            query = env.get('wsgi.input').read(content_length)

            # A write lock owned by a different host means that host should answer.
            write_lock = segment.retrieve_write_lock()
            if write_lock and write_lock['node'] != settings['HOSTNAME']:
                logging.info('Found write lock for {segment}. Proxying {query} to {host}'.format(segment=segment.id, query=query, host=write_lock['node']))
                return self.proxy_for_write_host(write_lock['node'], segment, query, start_response)

                # NOTE(review): the commented-out lines below sit after the return
                # above and look like an earlier inline proxy implementation.
                ## # enforce that we are querying the correct database, send an explicit hostname.
                ## write_url = "http://{node}:{port}/?segment={segment}".format(node=node, segment=segment.id, port=settings['READ_PORT'])
                ## with requests.post(write_url, stream=True, data=query) as r:
                ##     status_line = '{status_code} {reason}'.format(status_code=r.status_code, reason=r.reason)
                ##     headers = [("Content-Type", r.headers['Content-Type'],)]
                ##     start_response(status_line, headers)
                ##     return r.iter_content()
            cursor = self.execute_query(segment, query)
            start_response('200 OK', [('Content-Type','application/json')])
            return self.sql_result_json_iter(cursor)
        except Exception as e:
            logging.error('500 Server Error due to exception', exc_info=True)
            start_response('500 Server Error', [('Content-Type', 'text/plain')])
            return [('500 Server Error: %s\n' % str(e)).encode('utf-8')]
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def handle_input(self, input_data, verbose=True, refresh_metadata=True):
    """Split pasted input into queries and run each one.

    A trailing ``\\p`` (str or bytes) is stripped and turns on
    ``force_pager`` for every query.  Each statement produced by
    ``sqlparse.split()`` gets a fresh UUID, which is recorded in
    ``self.query_ids`` and passed to ``handle_query``.  Afterwards the
    completer metadata is refreshed if ``refresh_metadata`` is set and the
    (possibly stripped) input is non-empty.
    """
    pager_suffix = r'\p' if isinstance(input_data, str) else rb'\p'
    force_pager = input_data.endswith(pager_suffix)
    if force_pager:
        input_data = input_data[:-2]

    # FIXME: A dirty dirty hack to make multiple queries (per one paste) work.
    self.query_ids = []
    for query in sqlparse.split(input_data):
        query_id = str(uuid4())
        self.query_ids.append(query_id)
        self.handle_query(
            query,
            verbose=verbose,
            query_id=query_id,
            force_pager=force_pager,
        )

    if refresh_metadata and input_data:
        self.cli.application.buffer.completer.refresh_metadata()
项目:omniduct    作者:airbnb    | 项目源码 | 文件源码
def statements_split(cls, statements):
    """Yield each statement of ``statements`` without its final semicolon.

    The text is split with ``sqlparse.split()``.  Every piece is stripped
    of surrounding whitespace, loses one trailing ``;`` (plus whitespace
    before it) if present, and is skipped when nothing remains.
    """
    for piece in sqlparse.split(statements):
        cleaned = piece.strip()
        if cleaned.endswith(';'):
            cleaned = cleaned[:-1].strip()
        if not cleaned:  # remove empty statements
            continue
        yield cleaned
项目:baconql    作者:lsenta    | 项目源码 | 文件源码
def execute_sql_file(db, fp):
    """Execute every statement of the SQL file at path ``fp`` on ``db``.

    The file content is split with ``sqlparse.split()`` and each part is
    wrapped in ``text()`` and passed to ``db.execute()`` in order.  A debug
    line with the statement index and its first 20 characters is logged
    before each execution.
    """
    log.debug("executing sql file: `%s'", fp)
    with open(fp, 'r') as f:
        # Read the file verbatim.  Joining readlines() with '\n' would
        # double every newline (each line already ends with one) and alter
        # multi-line string literals inside the SQL.
        txt = f.read()

    parts = sqlparse.split(txt)

    # TODO: use transaction
    for i, part in enumerate(parts):
        log.debug("executing statement %s, `%s...'", i, part[:20].replace('\n', ''))
        db.execute(text(part))
项目:hoaxy-backend    作者:IUNetSci    | 项目源码 | 文件源码
def convert_to_sqlalchemy_statement(raw_sql_script):
    """Split a raw SQL script into comment-free statements.

    The script is stripped of surrounding whitespace, formatted by
    ``sqlparse.format`` with ``strip_comments=True`` and the result is
    split with ``sqlparse.split``; the list of statements is returned.
    """
    trimmed_script = raw_sql_script.strip()
    # remove comments before splitting
    without_comments = sqlparse.format(trimmed_script, strip_comments=True)
    return sqlparse.split(without_comments)
项目:hoaxy-backend    作者:IUNetSci    | 项目源码 | 文件源码
def column_windows(session, w_column, w_size, fb_kw=None, f_expr=None):
    """Return a series of WHERE clauses against a given column that break it
    into windows.

    Parameters
    ----------
    session : object
        An instance of SQLAlchemy Session.
    w_column : object
        Column object that is used to split into windows, should be an
        integer column.
    w_size : int
        Size of the window
    fb_kw : dict
        The filter_by keywords, used by query.filter_by().
    f_expr : list
        The filter expressions, used by query.filter().

    Returns
    -------
    iterable
        Each element of the iterable is a whereclause expression, which
        specify the range of the window over the column `w_column`.  The
        last window has no upper bound.

    Example
    -------
    for whereclause in column_windows(q.session, w_column, w_size):
        for row in q.filter(whereclause).order_by(w_column):
            yield row

    """

    def int_for_range(start_id, end_id):
        """Internal function to build range."""
        # Compare with None explicitly: an upper bound of 0 is a valid
        # bound and must not be mistaken for "no upper bound".
        if end_id is not None:
            return and_(w_column >= start_id, w_column < end_id)
        return w_column >= start_id

    # Number the rows in column order and keep every w_size-th value as a
    # window boundary.
    q = session.query(
        w_column, func.row_number().over(order_by=w_column).label('w_row_num'))
    if fb_kw:
        q = q.filter_by(**fb_kw)
    if f_expr:
        q = q.filter(*f_expr)
    q = q.from_self(w_column)
    if w_size > 1:
        q = q.filter(sqlalchemy.text("w_row_num % {}=1".format(w_size)))

    boundaries = [value for value, in q]

    # Pair each boundary with its successor by index instead of repeatedly
    # popping from the front of the list (which is quadratic).
    last_index = len(boundaries) - 1
    for index, start in enumerate(boundaries):
        end = boundaries[index + 1] if index < last_index else None
        yield int_for_range(start, end)
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def run(self, engine, step=None):
    """Runs SQL script through raw dbapi execute call.

    The script text comes from ``self.source()``.  It is stripped of
    comments and whitespace, split into statements, and every statement is
    executed on a fresh connection inside one explicit transaction.
    Statements starting with BEGIN, END or COMMIT are skipped with a
    warning.  On any error the transaction is rolled back, the failure is
    logged with ``self.path`` and the exception is re-raised.  The
    connection is always closed.

    ``step`` is accepted but not used by this method.
    """
    text = self.source()
    # Don't rely on SA's autocommit here
    # (SA uses .startswith to check if a commit is needed. What if script
    # starts with a comment?)
    conn = engine.connect()
    try:
        trans = conn.begin()
        try:
            # ignore transaction management statements that are
            # redundant in SQL script context and result in
            # operational error being returned.
            #
            # Note: we don't ignore ROLLBACK in migration scripts
            # since its usage would be insane anyway, and we're
            # better to fail on its occurrence instead of ignoring it
            # (and committing transaction, which is contradictory to
            # the whole idea of ROLLBACK)
            ignored_statements = ('BEGIN', 'END', 'COMMIT')
            # Raw string: '\s' in a normal string literal is an invalid
            # escape sequence and triggers a warning on current Python.
            ignored_regex = re.compile(r'^\s*(%s).*;?$' % '|'.join(ignored_statements),
                                       re.IGNORECASE)

            # NOTE(ihrachys): script may contain multiple statements, and
            # not all drivers reliably handle multistatement queries or
            # commands passed to .execute(), so split them and execute one
            # by one
            text = sqlparse.format(text, strip_comments=True, strip_whitespace=True)
            for statement in sqlparse.split(text):
                if not statement:
                    continue
                if ignored_regex.match(statement):
                    log.warning('"%s" found in SQL script; ignoring', statement)
                else:
                    conn.execute(statement)
            trans.commit()
        except Exception as e:
            log.error("SQL script %s failed: %s", self.path, e)
            trans.rollback()
            raise
    finally:
        conn.close()
项目:django-wechat-api    作者:crazy-canux    | 项目源码 | 文件源码
def prepare_sql_script(self, sql, _allow_fallback=False):
    """
    Break a SQL script into a list of individual statements.

    With sqlparse available, the script is split with ``sqlparse.split()``
    and every non-empty piece is passed through
    ``sqlparse.format(..., strip_comments=True)``.

    Without sqlparse, the ImportError propagates unless ``_allow_fallback``
    is true; in that case a RemovedInDjango19Warning is issued and the
    legacy ``_split_statements`` helper does the splitting.
    """
    # Remove _allow_fallback and keep only 'return ...' in Django 1.9.
    try:
        # This import must stay inside the method because it's optional.
        import sqlparse
    except ImportError:
        if not _allow_fallback:
            raise
        # Without sqlparse, fall back to the legacy (and buggy) logic.
        warnings.warn(
            "Providing initial SQL data on a %s database will require "
            "sqlparse in Django 1.9." % self.connection.vendor,
            RemovedInDjango19Warning)
        from django.core.management.sql import _split_statements
        return _split_statements(sql)

    prepared = []
    for chunk in sqlparse.split(sql):
        if chunk:
            prepared.append(sqlparse.format(chunk, strip_comments=True))
    return prepared
项目:clickhouse-cli    作者:hatarist    | 项目源码 | 文件源码
def connect(self):
    """Create the client and verify the server answers a version query.

    If ``self.host`` contains ``://`` it is parsed as a URL and the host,
    port (when given) and scheme are taken from it; otherwise the scheme is
    ``http``.  A ``Client`` is built for the resulting URL and
    ``SELECT version();`` is sent.

    Returns True and stores the first three version components as integers
    in ``self.server_version`` on success.  Returns False, after reporting
    through ``self.echo``, on timeout, connection failure, a DBException or
    a response that does not end with a newline.
    """
    self.scheme = 'http'
    if '://' in self.host:
        parsed = urlparse(self.host, allow_fragments=False)
        self.host = parsed.hostname
        self.port = parsed.port or self.port
        self.scheme = parsed.scheme

    self.url = '{scheme}://{host}:{port}/'.format(
        scheme=self.scheme, host=self.host, port=self.port)
    self.client = Client(
        self.url,
        self.user,
        self.password,
        self.database,
        self.settings,
        self.stacktrace,
        self.conn_timeout,
        self.conn_timeout_retry,
        self.conn_timeout_retry_delay,
    )

    banner = "Connecting to {host}:{port}".format(host=self.host, port=self.port)
    self.echo.print(banner)

    try:
        response = self.client.query('SELECT version();', fmt='TabSeparated')
    except TimeoutError:
        self.echo.error("Error: Connection timeout.")
        return False
    except ConnectionError:
        self.echo.error("Error: Failed to connect.")
        return False
    except DBException as e:
        self.echo.error("Error:")
        self.echo.error(e.error)

        show_trace = self.stacktrace and e.stacktrace
        if show_trace:
            self.echo.print("Stack trace:")
            self.echo.print(e.stacktrace)

        return False

    if not response.data.endswith('\n'):
        self.echo.error("Error: Request failed: `SELECT version();` query failed.")
        return False

    # Keep major, minor and patch as integers.
    version = response.data.strip().split('.')
    self.server_version = tuple(int(version[position]) for position in range(3))

    connected_message = "Connected to ClickHouse server v{0}.{1}.{2}.\n".format(
        *self.server_version
    )
    self.echo.success(connected_message)
    return True