我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用sqlparse.split()。
def prepare_sql_script(self, sql):
    """
    Break an SQL script into a list of individual statements.

    Each returned statement is meant to be passed to its own
    cursor.execute() call: few databases accept a whole script in one
    call and PEP 249 does not cover that use case, so this default
    implementation stays conservative. Comments are stripped from every
    statement and empty statements are dropped.

    Raises ImproperlyConfigured when sqlparse cannot be imported.
    """
    try:
        import sqlparse
    except ImportError:
        raise ImproperlyConfigured(
            "sqlparse is required if you don't split your SQL "
            "statements manually."
        )

    prepared = []
    for raw_statement in sqlparse.split(sql):
        if raw_statement:
            prepared.append(sqlparse.format(raw_statement, strip_comments=True))
    return prepared
def execute_sql_file(filename):
    """
    Execute every statement of the SQL file `filename` in one transaction.

    The file is split with sqlparse and the non-empty statements are run
    one by one. On the first statement that fails, the transaction is
    rolled back, the driver's error text is printed and execution stops.
    The transaction is committed only when no statement failed.

    If the transaction cannot be started, a message is printed and the
    process exits with status 1.

    NOTE(review): the earlier docstring spoke of MySQL being unable to roll
    back CREATE/ALTER/DROP and of a full backup being taken and reapplied on
    failure; no backup or restore happens in this function.
    """
    with database.Database() as db, open(filename) as fd:
        if not db.transaction():
            print("Failed to start the migration")
            sys.exit(1)

        query = QtSql.QSqlQuery(db)
        failed = False
        for sql in sqlparse.split(fd.read()):
            if not sql:
                continue
            if query.exec_(sql):
                continue
            # First failure aborts the whole run.
            db.rollback()
            print(query.lastError().text())
            failed = True
            break

        if not failed:
            db.commit()
def test_psql_quotation_marks():
    """Dollar-quoted PostgreSQL function bodies must not confuse the splitter."""
    # issue83
    # regression: make sure plain $$ work
    plain_dollar_sql = """ CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $$ .... $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $$ .... $$ LANGUAGE plpgsql;"""
    statements = sqlparse.split(plain_dollar_sql)
    assert len(statements) == 2

    # make sure $SOMETHING$ works too
    tagged_dollar_sql = """ CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $PROC_1$ .... $PROC_1$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $PROC_2$ .... $PROC_2$ LANGUAGE plpgsql;"""
    statements = sqlparse.split(tagged_dollar_sql)
    assert len(statements) == 2
def tokenize_content(file_path, content):
    """
    Take a SQL file with multiple statements, yield its blocks.

    `content` is split into statements with sqlparse; empty statements are
    discarded. Every remaining statement is split into lines and handed to
    `_extract_parts` together with a builder that creates `Block` objects
    bound to `file_path`. Whatever `_extract_parts` produces is yielded.
    """
    # Build a real list: on Python 3 `filter()` returns a lazy iterator that
    # is always truthy, which made the emptiness check below dead code.
    sql_blocks = [block for block in sqlparse.split(content) if block]
    if not sql_blocks:
        return

    def builder(content, has_body=False):
        return Block(file_path, has_body, content)

    for block in sql_blocks:
        # would have used `yield from' below but py2...
        for r in _extract_parts(builder, block.split('\n')):
            yield r
def is_single_statement(sql):
    """Return True when sqlparse finds zero or one statement in `sql`."""
    statement_count = len(sqlparse.split(sql))
    return statement_count < 2
def parse(self, s):
    """
    Parse a comma separated duration query such as ``1d,2h,3m,4s``.

    Every element is printed, then checked against ``self.elm_pattern``;
    a non-matching element raises ValueError. For each unit letter
    (d, h, m, s) contained in an element, the integer found before that
    letter is stored on ``self`` under the attribute named after the unit.
    """
    # str: 1d,2h,3m,4s
    units = ('d', 'h', 'm', 's')
    for part in s.split(','):
        print(part)
        if self.elm_pattern.match(part) is None:
            raise ValueError('Cannot parse query: {}'.format(part))
        for unit in units:
            if unit not in part:
                continue
            amount = int(part.split(unit)[0])
            setattr(self, unit, amount)
def timestamp_from_string(str):
    """
    Turn a timestamp expression into a datetime.

    The input is printed first. A string starting with ``'last '`` is read
    as a relative query: the text after the prefix is parsed by
    ``Timeseries_query`` and the resulting days/hours/minutes/seconds are
    subtracted from ``utcnow()``. Any other string is handed to
    ``iso8601.parse_date``.
    """
    print(str)
    is_relative = str.startswith('last ', 0, 5)
    if not is_relative:
        return iso8601.parse_date(str)

    # sample queries: 1m; 1m,2s; 1d,2h,3m,4s
    query = Timeseries_query(str.split('last ')[1])
    seconds, minutes, hours, days = query.s, query.m, query.h, query.d
    offset = datetime.timedelta(
        seconds=seconds,
        minutes=minutes,
        hours=hours,
        days=days,
    )
    return utcnow() - offset
def apply_migrations():
    """
    Apply every migration newer than the last one recorded in __migrations.

    The ``up.sql`` of each pending migration directory under
    ``../migrations`` is concatenated into a temporary file, followed by an
    INSERT recording the newest version, and the file is executed through
    ``execute_sql_file``.

    When there is nothing to apply, "Nothing to do" is printed and the
    process exits with status 1.

    The temporary file descriptor is always closed and the temporary file is
    always removed, including on the early exit and on errors.
    """
    # Get the latest applied migration
    with database.Cursor() as cursor:
        cursor.prepare("SELECT version FROM __migrations ORDER BY version DESC LIMIT 1")
        cursor.exec_()
        last_applied = -1
        if cursor.next():
            last_applied = cursor.value("version")

    def numeric_order(name):
        # Order by the numeric prefix so that "10_x" comes after "2_x";
        # a plain string sort would place it before.
        return int(name.split("_")[0]), name

    migration_file, migration_name = tempfile.mkstemp()
    try:
        try:
            should_apply = False
            nb = 0
            for migration in sorted(os.listdir("../migrations"), key=numeric_order):
                nb = int(migration.split("_")[0])
                if nb <= last_applied:
                    continue
                should_apply = True
                print(f"Applying {migration}")
                with open(os.path.join("../migrations", migration, "up.sql")) as fd:
                    os.write(migration_file, fd.read().encode())

            if not should_apply:
                print("Nothing to do")
                sys.exit(1)

            os.write(migration_file, f"INSERT INTO __migrations(version) VALUES({nb});\n".encode())
        finally:
            # mkstemp hands back an open OS-level descriptor; close it so it
            # is not leaked and the file can be reopened and deleted.
            os.close(migration_file)

        # Execute the generated file.
        execute_sql_file(migration_name)
    finally:
        # Since mkstemp doesn't handle automatically the file deletion, do it
        # ourselves
        os.remove(migration_name)
def rollback_migrations():
    """
    Roll back the most recently applied batch of migrations.

    The two highest versions recorded in __migrations delimit the batch:
    every migration directory under ``../migrations`` whose numeric prefix
    ``nb`` satisfies ``lower < nb <= upper`` has its ``down.sql`` appended
    to a temporary file, newest first. A DELETE for the ``upper`` version
    row is appended and the file is executed through ``execute_sql_file``.
    When no migration falls in the range, "Nothing to do" is printed.

    The temporary file descriptor is always closed and the temporary file is
    always removed, including when an error occurs.
    """
    lower = -1
    upper = -1
    with database.Cursor() as cursor:
        cursor.prepare("SELECT version FROM __migrations ORDER BY version DESC LIMIT 2")
        cursor.exec_()
        if cursor.next():
            upper = int(cursor.value("version"))
        if cursor.next():
            lower = int(cursor.value("version"))

    def numeric_order(name):
        # Order by the numeric prefix so that "10_x" is treated as newer
        # than "2_x"; a plain string sort would not.
        return int(name.split("_")[0]), name

    migration_file, migration_name = tempfile.mkstemp()
    try:
        should_apply = False
        try:
            for migration in sorted(os.listdir("../migrations"), key=numeric_order, reverse=True):
                nb = int(migration.split("_")[0])
                if lower < nb <= upper:
                    should_apply = True
                    print(f"Rollback {migration}")
                    with open(os.path.join("../migrations", migration, "down.sql")) as fd:
                        os.write(migration_file, fd.read().encode())
            if should_apply:
                # One DELETE is enough: the statement is idempotent.
                os.write(migration_file, f"DELETE FROM __migrations WHERE version={upper};\n".encode())
        finally:
            # mkstemp hands back an open OS-level descriptor; close it so it
            # is not leaked and the file can be reopened and deleted.
            os.close(migration_file)

        if should_apply:
            execute_sql_file(migration_name)
        else:
            print("Nothing to do")
    finally:
        os.remove(migration_name)
def prepare_sql_script(self, sql, _allow_fallback=False):
    """
    Break a SQL script into a list of individual statements.

    Each returned statement is meant to be passed to its own
    cursor.execute() call: few databases accept a whole script in one
    call and PEP 249 does not cover that use case, so this default
    implementation stays conservative.

    When sqlparse is importable, comments are stripped from each statement
    and empty statements are dropped. When it is not, the ImportError is
    re-raised unless `_allow_fallback` is true, in which case a
    RemovedInDjango19Warning is issued and the legacy `_split_statements`
    helper is used instead.
    """
    # Remove _allow_fallback and keep only 'return ...' in Django 1.9.
    try:
        # This import must stay inside the method because it's optional.
        import sqlparse
    except ImportError:
        if not _allow_fallback:
            raise
        # Without sqlparse, fall back to the legacy (and buggy) logic.
        warnings.warn(
            "Providing initial SQL data on a %s database will require "
            "sqlparse in Django 1.9." % self.connection.vendor,
            RemovedInDjango19Warning)
        from django.core.management.sql import _split_statements
        return _split_statements(sql)

    prepared = []
    for raw_statement in sqlparse.split(sql):
        if raw_statement:
            prepared.append(sqlparse.format(raw_statement, strip_comments=True))
    return prepared
def test_casewhen(self):
    """A CASE ... END expression must not swallow the statement after it."""
    case_stmt = 'SELECT case when val = 1 then 2 else null end as foo;\n'
    comment_stmt = 'comment on table actor is \'The actor table.\';'
    stmts = sqlparse.split(case_stmt + comment_stmt)
    self.assertEqual(len(stmts), 2)
def test_cursor_declare(self):
    """DECLARE CURSOR followed by another statement splits into two."""
    declare_stmt = 'DECLARE CURSOR "foo" AS SELECT 1;\n'
    select_stmt = 'SELECT 2;'
    stmts = sqlparse.split(declare_stmt + select_stmt)
    self.assertEqual(len(stmts), 2)
def test_if_function(self):
    """IF used as a function call must not be taken for a block opener."""
    # see issue 33
    # don't let IF as a function confuse the splitter
    pieces = (
        'CREATE TEMPORARY TABLE tmp ',
        'SELECT IF(a=1, a, b) AS o FROM one; ',
        'SELECT t FROM two',
    )
    stmts = sqlparse.split(''.join(pieces))
    self.assertEqual(len(stmts), 2)
def test_split_simple():
    """Two semicolon-terminated statements come back as two strings."""
    script = 'select * from foo; select * from bar;'
    stmts = sqlparse.split(script)
    assert len(stmts) == 2
    first, second = stmts[0], stmts[1]
    assert first == 'select * from foo;'
    assert second == 'select * from bar;'
def execute_query(self, segment, query):
    '''Run a single SELECT against the segment's sqlite file and return the cursor.

    Raises Exception unless `query` consists of exactly one statement whose
    sqlparse type is SELECT. The query is decoded as UTF-8 before execution.
    '''
    logging.info('Servicing request: {query}'.format(query=query))
    # Reject anything that is not exactly one SELECT statement. The type is
    # only inspected once the statement count is known to be one.
    is_single_select = (
        len(sqlparse.split(query)) == 1
        and sqlparse.parse(query)[0].get_type() == 'SELECT'
    )
    if not is_single_select:
        raise Exception('Exactly one SELECT query per request, please.')
    assert os.path.isfile(segment.local_path())
    logging.info("Connecting to sqlite database: {segment}".format(segment=segment.local_path()))
    connection = sqlite3.connect(segment.local_path())
    trough.sync.setup_connection(connection)
    cursor = connection.cursor()
    decoded_query = query.decode('utf-8')
    cursor.execute(decoded_query)
    return cursor
# uwsgi endpoint
def __call__(self, env, start_response):
    """
    WSGI entry point: run the posted query against a segment database.

    The segment id comes from the ``?segment=`` query string variable or,
    failing that, from the first label of the HTTP host. The request body
    is read as the query. If another node holds the write lock for the
    segment, the request is handed to ``proxy_for_write_host``; otherwise
    the query is executed locally and the rows are returned through
    ``sql_result_json_iter`` with a ``200 OK`` JSON response.

    Any exception is logged and turned into a ``500 Server Error``
    plain-text response carrying the exception message.
    """
    try:
        query_dict = urllib.parse.parse_qs(env['QUERY_STRING'])
        # use the ?segment= query string variable or the host string to figure out which sqlite database to talk to.
        segment_id = query_dict.get('segment', env.get('HTTP_HOST', "").split("."))[0]
        logging.info('Connecting to Rethinkdb on: %s' % settings['RETHINKDB_HOSTS'])
        segment = trough.sync.Segment(segment_id=segment_id, size=0, rethinker=self.rethinker, services=self.services, registry=self.registry)
        # PEP 3333 allows CONTENT_LENGTH to be absent *or* empty; treat both
        # as a zero-length body instead of failing on int('').
        content_length = int(env.get('CONTENT_LENGTH') or 0)
        query = env.get('wsgi.input').read(content_length)

        write_lock = segment.retrieve_write_lock()
        if write_lock and write_lock['node'] != settings['HOSTNAME']:
            # Another node owns the write lock: let it answer the request.
            logging.info('Found write lock for {segment}. Proxying {query} to {host}'.format(segment=segment.id, query=query, host=write_lock['node']))
            return self.proxy_for_write_host(write_lock['node'], segment, query, start_response)

        cursor = self.execute_query(segment, query)
        start_response('200 OK', [('Content-Type','application/json')])
        return self.sql_result_json_iter(cursor)
    except Exception as e:
        logging.error('500 Server Error due to exception', exc_info=True)
        start_response('500 Server Error', [('Content-Type', 'text/plain')])
        return [('500 Server Error: %s\n' % str(e)).encode('utf-8')]
def handle_input(self, input_data, verbose=True, refresh_metadata=True):
    """
    Run every query contained in `input_data`.

    A trailing ``\\p`` marker (str or bytes) is removed from the input and
    forces the pager for all queries. The input is split with sqlparse; each
    query gets a fresh UUID that is recorded in ``self.query_ids`` before the
    query is passed to ``self.handle_query``. Afterwards the completer
    metadata is refreshed when `refresh_metadata` is set and the input is
    non-empty.
    """
    pager_marker = r'\p' if isinstance(input_data, str) else rb'\p'
    force_pager = input_data.endswith(pager_marker)
    if force_pager:
        # Drop the two-character marker itself.
        input_data = input_data[:-2]

    # FIXME: A dirty dirty hack to make multiple queries (per one paste) work.
    self.query_ids = []
    for single_query in sqlparse.split(input_data):
        new_id = str(uuid4())
        self.query_ids.append(new_id)
        self.handle_query(
            single_query,
            verbose=verbose,
            query_id=new_id,
            force_pager=force_pager,
        )

    if refresh_metadata and input_data:
        self.cli.application.buffer.completer.refresh_metadata()
def statements_split(cls, statements):
    """
    Yield the individual statements of `statements`.

    Each statement is stripped of surrounding whitespace and of one trailing
    semicolon (plus the whitespace before it). Statements that end up empty
    are skipped.
    """
    for raw in sqlparse.split(statements):
        cleaned = raw.strip()
        if cleaned.endswith(';'):
            cleaned = cleaned[:-1].strip()
        if not cleaned:
            # remove empty statements
            continue
        yield cleaned
def execute_sql_file(db, fp):
    """
    Execute every statement of the SQL file at path `fp` through `db`.

    The file content is split with sqlparse and each part is wrapped in
    ``text()`` and passed to ``db.execute`` in order. A debug line with the
    statement index and its first 20 characters is logged for each part.
    """
    log.debug("executing sql file: `%s'", fp)
    with open(fp, 'r') as f:
        # Read the content verbatim. Joining readlines() with '\n' doubled
        # every line break (each line already ends with one), which altered
        # multi-line string literals inside the SQL.
        txt = f.read()
    parts = sqlparse.split(txt)
    # TODO: use transaction
    for i, part in enumerate(parts):
        log.debug("executing statement %s, `%s...'", i, part[:20].replace('\n', ''))
        db.execute(text(part))
def convert_to_sqlalchemy_statement(raw_sql_script):
    """Strip a raw SQL script of comments and split it into statements.

    Leading and trailing whitespace is removed from the script before the
    comments are stripped; the result of ``sqlparse.split`` is returned.
    """
    trimmed_script = raw_sql_script.strip()
    comment_free_script = sqlparse.format(trimmed_script, strip_comments=True)
    return sqlparse.split(comment_free_script)
def column_windows(session, w_column, w_size, fb_kw=None, f_expr=None):
    """Return a series of WHERE clauses against a given column that break it
    into windows.

    Parameters
    ----------
    session : object
        An instance of SQLAlchemy Session.
    w_column : object
        Column object that is used to split into windows, should be an
        integer column.
    w_size : int
        Size of the window
    fb_kw : dict
        The filter_by keywords, used by query.filter_by().
    f_expr : list
        The filter expressions, used by query.filter().

    Returns
    -------
    iterable
        Each element of the iterable is a whereclause expression, which
        specify the range of the window over the column `w_column`. The
        last window has no upper bound.

    Example
    -------
    for whereclause in column_windows(q.session, w_column, w_size):
        for row in q.filter(whereclause).order_by(w_column):
            yield row
    """
    def int_for_range(start_id, end_id):
        """Internal function to build range."""
        # Compare against None explicitly: a boundary value of 0 is a valid
        # upper bound and must not be mistaken for "no upper bound".
        if end_id is not None:
            return and_(w_column >= start_id, w_column < end_id)
        return w_column >= start_id

    q = session.query(
        w_column,
        func.row_number().over(order_by=w_column).label('w_row_num'))
    if fb_kw:
        q = q.filter_by(**fb_kw)
    if f_expr:
        q = q.filter(*f_expr)
    q = q.from_self(w_column)
    if w_size > 1:
        # Keep only the first row of every window as a boundary value.
        q = q.filter(sqlalchemy.text("w_row_num % {}=1".format(w_size)))

    boundaries = [value for value, in q]
    last_index = len(boundaries) - 1
    # Walk the boundaries by index instead of popping from the front of the
    # list, which costs O(n) per pop.
    for index, start in enumerate(boundaries):
        end = boundaries[index + 1] if index < last_index else None
        yield int_for_range(start, end)
def run(self, engine, step=None):
    """Runs SQL script through raw dbapi execute call

    The script returned by ``self.source()`` is stripped of comments and
    extra whitespace, split into statements and executed one by one inside
    an explicit transaction on a connection obtained from `engine`.
    Statements starting with BEGIN, END or COMMIT are skipped with a
    warning. On any error the transaction is rolled back, the error is
    logged and re-raised. The connection is always closed.
    """
    text = self.source()
    # Don't rely on SA's autocommit here
    # (SA uses .startswith to check if a commit is needed. What if script
    # starts with a comment?)
    conn = engine.connect()
    try:
        trans = conn.begin()
        try:
            # ignore transaction management statements that are
            # redundant in SQL script context and result in
            # operational error being returned.
            #
            # Note: we don't ignore ROLLBACK in migration scripts
            # since its usage would be insane anyway, and we're
            # better to fail on its occurance instead of ignoring it
            # (and committing transaction, which is contradictory to
            # the whole idea of ROLLBACK)
            ignored_statements = ('BEGIN', 'END', 'COMMIT')
            # Raw string: '\s' in a plain literal is an invalid escape
            # sequence and triggers a warning on current Python versions.
            # The resulting pattern is unchanged.
            ignored_regex = re.compile(r'^\s*(%s).*;?$' % '|'.join(ignored_statements),
                                       re.IGNORECASE)

            # NOTE(ihrachys): script may contain multiple statements, and
            # not all drivers reliably handle multistatement queries or
            # commands passed to .execute(), so split them and execute one
            # by one
            text = sqlparse.format(text, strip_comments=True, strip_whitespace=True)
            for statement in sqlparse.split(text):
                if not statement:
                    continue
                if ignored_regex.match(statement):
                    log.warning('"%s" found in SQL script; ignoring', statement)
                else:
                    conn.execute(statement)
            trans.commit()
        except Exception as e:
            log.error("SQL script %s failed: %s", self.path, e)
            trans.rollback()
            raise
    finally:
        conn.close()
def connect(self):
    """
    Open a client to the configured server and read its version.

    When ``self.host`` contains ``://`` it is parsed as a URL and the host,
    port (if present) and scheme are taken from it; otherwise the scheme is
    ``http``. A ``Client`` is built for the resulting URL and
    ``SELECT version();`` is sent. The first three dot-separated parts of
    the answer are stored as integers in ``self.server_version``.

    Returns True on success. Returns False after reporting the problem when
    the query times out, the connection fails, the server raises a
    DBException, or the answer does not end with a newline.
    """
    self.scheme = 'http'
    if '://' in self.host:
        parsed = urlparse(self.host, allow_fragments = False)
        self.host = parsed.hostname
        self.port = parsed.port or self.port
        self.scheme = parsed.scheme

    self.url = '{scheme}://{host}:{port}/'.format(
        scheme=self.scheme, host=self.host, port=self.port
    )
    self.client = Client(
        self.url,
        self.user,
        self.password,
        self.database,
        self.settings,
        self.stacktrace,
        self.conn_timeout,
        self.conn_timeout_retry,
        self.conn_timeout_retry_delay,
    )

    connecting_message = "Connecting to {host}:{port}".format(
        host=self.host, port=self.port
    )
    self.echo.print(connecting_message)

    try:
        response = self.client.query('SELECT version();', fmt='TabSeparated')
    except TimeoutError:
        self.echo.error("Error: Connection timeout.")
        return False
    except ConnectionError:
        self.echo.error("Error: Failed to connect.")
        return False
    except DBException as e:
        self.echo.error("Error:")
        self.echo.error(e.error)

        show_trace = self.stacktrace and e.stacktrace
        if show_trace:
            self.echo.print("Stack trace:")
            self.echo.print(e.stacktrace)

        return False

    if not response.data.endswith('\n'):
        self.echo.error("Error: Request failed: `SELECT version();` query failed.")
        return False

    version_parts = response.data.strip().split('.')
    # Exactly the first three components are converted, in order.
    self.server_version = tuple(int(version_parts[i]) for i in range(3))

    success_message = "Connected to ClickHouse server v{0}.{1}.{2}.\n".format(
        *self.server_version
    )
    self.echo.success(success_message)
    return True