Python MySQLdb 模块,query() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用MySQLdb.query()。
def query(self, paramstyle=None):
"""
Returns the query part of the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.query()
'SELECT * FROM test WHERE name=%s'
>>> q.query(paramstyle='qmark')
'SELECT * FROM test WHERE name=?'
"""
s = []
for x in self.items:
if isinstance(x, SQLParam):
x = x.get_marker(paramstyle)
s.append(safestr(x))
else:
x = safestr(x)
# automatically escape % characters in the query
# For backward compatability, ignore escaping when the query looks already escaped
if paramstyle in ['format', 'pyformat']:
if '%' in x and '%%' not in x:
x = x.replace('%', '%%')
s.append(x)
return "".join(s)
def _db_execute(self, cur, sql_query):
"""executes an sql query"""
self.ctx.dbq_count += 1
try:
a = time.time()
query, params = self._process_query(sql_query)
out = cur.execute(query, params)
b = time.time()
except:
if self.printing:
print >> debug, 'ERR:', str(sql_query)
if self.ctx.transactions:
self.ctx.transactions[-1].rollback()
else:
self.ctx.rollback()
raise
if self.printing:
print >> debug, '%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query))
return out
def query(self, paramstyle=None):
"""
Returns the query part of the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.query()
'SELECT * FROM test WHERE name=%s'
>>> q.query(paramstyle='qmark')
'SELECT * FROM test WHERE name=?'
"""
s = []
for x in self.items:
if isinstance(x, SQLParam):
x = x.get_marker(paramstyle)
s.append(safestr(x))
else:
x = safestr(x)
# automatically escape % characters in the query
# For backward compatability, ignore escaping when the query looks already escaped
if paramstyle in ['format', 'pyformat']:
if '%' in x and '%%' not in x:
x = x.replace('%', '%%')
s.append(x)
return "".join(s)
def _db_execute(self, cur, sql_query):
"""executes an sql query"""
self.ctx.dbq_count += 1
try:
a = time.time()
query, params = self._process_query(sql_query)
out = cur.execute(query, params)
b = time.time()
except:
if self.printing:
print('ERR:', str(sql_query), file=debug)
if self.ctx.transactions:
self.ctx.transactions[-1].rollback()
else:
self.ctx.rollback()
raise
if self.printing:
print('%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query)), file=debug)
return out
def query(self, paramstyle=None):
"""
Returns the query part of the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.query()
'SELECT * FROM test WHERE name=%s'
>>> q.query(paramstyle='qmark')
'SELECT * FROM test WHERE name=?'
"""
s = []
for x in self.items:
if isinstance(x, SQLParam):
x = x.get_marker(paramstyle)
s.append(safestr(x))
else:
x = safestr(x)
# automatically escape % characters in the query
# For backward compatability, ignore escaping when the query looks already escaped
if paramstyle in ['format', 'pyformat']:
if '%' in x and '%%' not in x:
x = x.replace('%', '%%')
s.append(x)
return "".join(s)
def _db_execute(self, cur, sql_query):
"""executes an sql query"""
self.ctx.dbq_count += 1
try:
a = time.time()
query, params = self._process_query(sql_query)
out = cur.execute(query, params)
b = time.time()
except:
if self.printing:
print >> debug, 'ERR:', str(sql_query)
if self.ctx.transactions:
self.ctx.transactions[-1].rollback()
else:
self.ctx.rollback()
raise
if self.printing:
print >> debug, '%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query))
return out
def query(self, paramstyle=None):
"""
Returns the query part of the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.query()
'SELECT * FROM test WHERE name=%s'
>>> q.query(paramstyle='qmark')
'SELECT * FROM test WHERE name=?'
"""
s = []
for x in self.items:
if isinstance(x, SQLParam):
x = x.get_marker(paramstyle)
s.append(safestr(x))
else:
x = safestr(x)
# automatically escape % characters in the query
# For backward compatability, ignore escaping when the query looks already escaped
if paramstyle in ['format', 'pyformat']:
if '%' in x and '%%' not in x:
x = x.replace('%', '%%')
s.append(x)
return "".join(s)
def _db_execute(self, cur, sql_query):
"""executes an sql query"""
self.ctx.dbq_count += 1
try:
a = time.time()
query, params = self._process_query(sql_query)
out = cur.execute(query, params)
b = time.time()
except:
if self.printing:
print >> debug, 'ERR:', str(sql_query)
if self.ctx.transactions:
self.ctx.transactions[-1].rollback()
else:
self.ctx.rollback()
raise
if self.printing:
print >> debug, '%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query))
return out
def select(self, tables, vars=None, what='*', where=None, order=None, group=None,
limit=None, offset=None, _test=False):
"""
Selects `what` from `tables` with clauses `where`, `order`,
`group`, `limit`, and `offset`. Uses vars to interpolate.
Otherwise, each clause can be a SQLQuery.
>>> db = DB(None, {})
>>> db.select('foo', _test=True)
<sql: 'SELECT * FROM foo'>
>>> db.select(['foo', 'bar'], where="foo.bar_id = bar.id", limit=5, _test=True)
<sql: 'SELECT * FROM foo, bar WHERE foo.bar_id = bar.id LIMIT 5'>
"""
if vars is None: vars = {}
sql_clauses = self.sql_clauses(what, tables, where, group, order, limit, offset)
clauses = [self.gen_clause(sql, val, vars) for sql, val in sql_clauses if val is not None]
qout = SQLQuery.join(clauses)
if _test: return qout
return self.query(qout, processed=True)
def query(self, paramstyle=None):
"""
Returns the query part of the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.query()
'SELECT * FROM test WHERE name=%s'
>>> q.query(paramstyle='qmark')
'SELECT * FROM test WHERE name=?'
"""
s = []
for x in self.items:
if isinstance(x, SQLParam):
x = x.get_marker(paramstyle)
s.append(safestr(x))
else:
x = safestr(x)
# automatically escape % characters in the query
# For backward compatability, ignore escaping when the query looks already escaped
if paramstyle in ['format', 'pyformat']:
if '%' in x and '%%' not in x:
x = x.replace('%', '%%')
s.append(x)
return "".join(s)
def _db_execute(self, cur, sql_query):
"""executes an sql query"""
self.ctx.dbq_count += 1
try:
a = time.time()
query, params = self._process_query(sql_query)
out = cur.execute(query, params)
b = time.time()
except:
if self.printing:
print >> debug, 'ERR:', str(sql_query)
if self.ctx.transactions:
self.ctx.transactions[-1].rollback()
else:
self.ctx.rollback()
raise
if self.printing:
print >> debug, '%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query))
return out
def _db_execute(self, cur, sql_query):
"""executes an sql query"""
self.ctx.dbq_count += 1
try:
a = time.time()
paramstyle = getattr(self, 'paramstyle', 'pyformat')
out = cur.execute(sql_query.query(paramstyle),
[self._py2sql(x)
for x in sql_query.values()])
b = time.time()
except:
if self.printing:
print >> debug, 'ERR:', str(sql_query)
if self.ctx.transactions:
self.ctx.transactions[-1].rollback()
else:
self.ctx.rollback()
raise
if self.printing:
print >> debug, '%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query))
return out
def select(self, tables, vars=None, what='*', where=None, order=None, group=None,
limit=None, offset=None, _test=False):
"""
Selects `what` from `tables` with clauses `where`, `order`,
`group`, `limit`, and `offset`. Uses vars to interpolate.
Otherwise, each clause can be a SQLQuery.
>>> db = DB(None, {})
>>> db.select('foo', _test=True)
<sql: 'SELECT * FROM foo'>
>>> db.select(['foo', 'bar'], where="foo.bar_id = bar.id", limit=5, _test=True)
<sql: 'SELECT * FROM foo, bar WHERE foo.bar_id = bar.id LIMIT 5'>
"""
if vars is None: vars = {}
sql_clauses = self.sql_clauses(what, tables, where, group, order, limit, offset)
clauses = [self.gen_clause(sql, val, vars) for sql, val in sql_clauses if val is not None]
qout = SQLQuery.join(clauses)
if _test: return qout
return self.query(qout, processed=True)
def query(self, paramstyle=None):
"""
Returns the query part of the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.query()
'SELECT * FROM test WHERE name=%s'
>>> q.query(paramstyle='qmark')
'SELECT * FROM test WHERE name=?'
"""
s = []
for x in self.items:
if isinstance(x, SQLParam):
x = x.get_marker(paramstyle)
s.append(safestr(x))
else:
x = safestr(x)
# automatically escape % characters in the query
# For backward compatability, ignore escaping when the query looks already escaped
if paramstyle in ['format', 'pyformat']:
if '%' in x and '%%' not in x:
x = x.replace('%', '%%')
s.append(x)
return "".join(s)
def _db_execute(self, cur, sql_query):
"""executes an sql query"""
self.ctx.dbq_count += 1
try:
a = time.time()
query, params = self._process_query(sql_query)
out = cur.execute(query, params)
b = time.time()
except:
if self.printing:
print >> debug, 'ERR:', str(sql_query)
if self.ctx.transactions:
self.ctx.transactions[-1].rollback()
else:
self.ctx.rollback()
raise
if self.printing:
print >> debug, '%s (%s): %s' % (round(b - a, 2), self.ctx.dbq_count, str(sql_query))
return out
def __init__(self, items=None):
r"""Creates a new SQLQuery.
>>> SQLQuery("x")
<sql: 'x'>
>>> q = SQLQuery(['SELECT * FROM ', 'test', ' WHERE x=', SQLParam(1)])
>>> q
<sql: 'SELECT * FROM test WHERE x=1'>
>>> q.query(), q.values()
('SELECT * FROM test WHERE x=%s', [1])
>>> SQLQuery(SQLParam(1))
<sql: '1'>
"""
if items is None:
self.items = []
elif isinstance(items, list):
self.items = items
elif isinstance(items, SQLParam):
self.items = [items]
elif isinstance(items, SQLQuery):
self.items = list(items.items)
else:
self.items = [items]
# Take care of SQLLiterals
for i, item in enumerate(self.items):
if isinstance(item, SQLParam) and isinstance(item.value, SQLLiteral):
self.items[i] = item.value.v
def values(self):
"""
Returns the values of the parameters used in the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.values()
['joe']
"""
return [i.value for i in self.items if isinstance(i, SQLParam)]
def _str(self):
try:
return self.query() % tuple([sqlify(x) for x in self.values()])
except (ValueError, TypeError):
return self.query()
def sqlquote(a):
"""
Ensures `a` is quoted properly for use in a SQL query.
>>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3)
<sql: "WHERE x = 't' AND y = 3">
>>> 'WHERE x = ' + sqlquote(True) + ' AND y IN ' + sqlquote([2, 3])
<sql: "WHERE x = 't' AND y IN (2, 3)">
"""
if isinstance(a, list):
return _sqllist(a)
else:
return sqlparam(a).sqlquery()
def _process_query(self, sql_query):
"""Takes the SQLQuery object and returns query string and parameters.
"""
paramstyle = getattr(self, 'paramstyle', 'pyformat')
query = sql_query.query(paramstyle)
params = sql_query.values()
return query, params
def query(self, sql_query, vars=None, processed=False, _test=False):
"""
Execute SQL query `sql_query` using dictionary `vars` to interpolate it.
If `processed=True`, `vars` is a `reparam`-style list to use
instead of interpolating.
>>> db = DB(None, {})
>>> db.query("SELECT * FROM foo", _test=True)
<sql: 'SELECT * FROM foo'>
>>> db.query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
<sql: "SELECT * FROM foo WHERE x = 'f'">
>>> db.query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
<sql: "SELECT * FROM foo WHERE x = 'f'">
"""
if vars is None: vars = {}
if not processed and not isinstance(sql_query, SQLQuery):
sql_query = reparam(sql_query, vars)
if _test: return sql_query
db_cursor = self._db_cursor()
self._db_execute(db_cursor, sql_query)
if db_cursor.description:
names = [x[0] for x in db_cursor.description]
def iterwrapper():
row = db_cursor.fetchone()
while row:
yield storage(dict(zip(names, row)))
row = db_cursor.fetchone()
out = iterbetter(iterwrapper())
out.__len__ = lambda: int(db_cursor.rowcount)
out.list = lambda: [storage(dict(zip(names, x))) \
for x in db_cursor.fetchall()]
else:
out = db_cursor.rowcount
if not self.ctx.transactions:
self.ctx.commit()
return out
def update(self, tables, where, vars=None, _test=False, **values):
"""
Update `tables` with clause `where` (interpolated using `vars`)
and setting `values`.
>>> db = DB(None, {})
>>> name = 'Joseph'
>>> q = db.update('foo', where='name = $name', name='bob', age=2,
... created=SQLLiteral('NOW()'), vars=locals(), _test=True)
>>> q
<sql: "UPDATE foo SET age = 2, name = 'bob', created = NOW() WHERE name = 'Joseph'">
>>> q.query()
'UPDATE foo SET age = %s, name = %s, created = NOW() WHERE name = %s'
>>> q.values()
[2, 'bob', 'Joseph']
"""
if vars is None: vars = {}
where = self._where(where, vars)
query = (
"UPDATE " + sqllist(tables) +
" SET " + sqlwhere(values, ', ') +
" WHERE " + where)
if _test: return query
db_cursor = self._db_cursor()
self._db_execute(db_cursor, query)
if not self.ctx.transactions:
self.ctx.commit()
return db_cursor.rowcount
def _process_insert_query(self, query, tablename, seqname):
return query
def _process_insert_query(self, query, tablename, seqname):
if seqname is None:
# when seqname is not provided guess the seqname and make sure it exists
seqname = tablename + "_id_seq"
if seqname not in self._get_all_sequences():
seqname = None
if seqname:
query += "; SELECT currval('%s')" % seqname
return query
def _get_all_sequences(self):
"""Query postgres to find names of all sequences used in this database."""
if self._sequences is None:
q = "SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'"
self._sequences = set([c.relname for c in self.query(q)])
return self._sequences
def _process_insert_query(self, query, tablename, seqname):
return query, SQLQuery('SELECT last_insert_rowid();')
def query(self, *a, **kw):
out = DB.query(self, *a, **kw)
if isinstance(out, iterbetter):
del out.__len__
return out
def _process_query(self, sql_query):
"""Takes the SQLQuery object and returns query string and parameters.
"""
# MSSQLDB expects params to be a tuple.
# Overwriting the default implementation to convert params to tuple.
paramstyle = getattr(self, 'paramstyle', 'pyformat')
query = sql_query.query(paramstyle)
params = sql_query.values()
return query, tuple(params)
def _process_insert_query(self, query, tablename, seqname):
if seqname is None:
# It is not possible to get seq name from table name in Oracle
return query
else:
return query + "; SELECT %s.currval FROM dual" % seqname
def __init__(self, items=None):
r"""Creates a new SQLQuery.
>>> SQLQuery("x")
<sql: 'x'>
>>> q = SQLQuery(['SELECT * FROM ', 'test', ' WHERE x=', SQLParam(1)])
>>> q
<sql: 'SELECT * FROM test WHERE x=1'>
>>> q.query(), q.values()
('SELECT * FROM test WHERE x=%s', [1])
>>> SQLQuery(SQLParam(1))
<sql: '1'>
"""
if items is None:
self.items = []
elif isinstance(items, list):
self.items = items
elif isinstance(items, SQLParam):
self.items = [items]
elif isinstance(items, SQLQuery):
self.items = list(items.items)
else:
self.items = [items]
# Take care of SQLLiterals
for i, item in enumerate(self.items):
if isinstance(item, SQLParam) and isinstance(item.value, SQLLiteral):
self.items[i] = item.value.v
def values(self):
"""
Returns the values of the parameters used in the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.values()
['joe']
"""
return [i.value for i in self.items if isinstance(i, SQLParam)]
def _str(self):
try:
return self.query() % tuple([sqlify(x) for x in self.values()])
except (ValueError, TypeError):
return self.query()
def sqlquote(a):
"""
Ensures `a` is quoted properly for use in a SQL query.
>>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3)
<sql: "WHERE x = 't' AND y = 3">
>>> 'WHERE x = ' + sqlquote(True) + ' AND y IN ' + sqlquote([2, 3])
<sql: "WHERE x = 't' AND y IN (2, 3)">
"""
if isinstance(a, list):
return _sqllist(a)
else:
return sqlparam(a).sqlquery()
def _process_query(self, sql_query):
"""Takes the SQLQuery object and returns query string and parameters.
"""
paramstyle = getattr(self, 'paramstyle', 'pyformat')
query = sql_query.query(paramstyle)
params = sql_query.values()
return query, params
def query(self, sql_query, vars=None, processed=False, _test=False):
"""
Execute SQL query `sql_query` using dictionary `vars` to interpolate it.
If `processed=True`, `vars` is a `reparam`-style list to use
instead of interpolating.
>>> db = DB(None, {})
>>> db.query("SELECT * FROM foo", _test=True)
<sql: 'SELECT * FROM foo'>
>>> db.query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
<sql: "SELECT * FROM foo WHERE x = 'f'">
>>> db.query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
<sql: "SELECT * FROM foo WHERE x = 'f'">
"""
if vars is None: vars = {}
if not processed and not isinstance(sql_query, SQLQuery):
sql_query = reparam(sql_query, vars)
if _test: return sql_query
db_cursor = self._db_cursor()
self._db_execute(db_cursor, sql_query)
if db_cursor.description:
names = [x[0] for x in db_cursor.description]
def iterwrapper():
row = db_cursor.fetchone()
while row:
yield storage(dict(zip(names, row)))
row = db_cursor.fetchone()
out = iterbetter(iterwrapper())
out.__len__ = lambda: int(db_cursor.rowcount)
out.list = lambda: [storage(dict(zip(names, x))) \
for x in db_cursor.fetchall()]
else:
out = db_cursor.rowcount
if not self.ctx.transactions:
self.ctx.commit()
return out
def update(self, tables, where, vars=None, _test=False, **values):
"""
Update `tables` with clause `where` (interpolated using `vars`)
and setting `values`.
>>> db = DB(None, {})
>>> name = 'Joseph'
>>> q = db.update('foo', where='name = $name', name='bob', age=2,
... created=SQLLiteral('NOW()'), vars=locals(), _test=True)
>>> q
<sql: "UPDATE foo SET age = 2, created = NOW(), name = 'bob' WHERE name = 'Joseph'">
>>> q.query()
'UPDATE foo SET age = %s, created = NOW(), name = %s WHERE name = %s'
>>> q.values()
[2, 'bob', 'Joseph']
"""
if vars is None: vars = {}
where = self._where(where, vars)
values = sorted(values.items(), key=lambda t: t[0])
query = (
"UPDATE " + sqllist(tables) +
" SET " + sqlwhere(values, ', ') +
" WHERE " + where)
if _test: return query
db_cursor = self._db_cursor()
self._db_execute(db_cursor, query)
if not self.ctx.transactions:
self.ctx.commit()
return db_cursor.rowcount
def _process_insert_query(self, query, tablename, seqname):
return query
def _process_insert_query(self, query, tablename, seqname):
if seqname is None:
# when seqname is not provided guess the seqname and make sure it exists
seqname = tablename + "_id_seq"
if seqname not in self._get_all_sequences():
seqname = None
if seqname:
query += "; SELECT currval('%s')" % seqname
return query
def _get_all_sequences(self):
"""Query postgres to find names of all sequences used in this database."""
if self._sequences is None:
q = "SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'"
self._sequences = set([c.relname for c in self.query(q)])
return self._sequences
def _process_insert_query(self, query, tablename, seqname):
return query, SQLQuery('SELECT last_insert_rowid();')
def query(self, *a, **kw):
out = DB.query(self, *a, **kw)
if isinstance(out, iterbetter):
del out.__len__
return out
def _process_query(self, sql_query):
"""Takes the SQLQuery object and returns query string and parameters.
"""
# MSSQLDB expects params to be a tuple.
# Overwriting the default implementation to convert params to tuple.
paramstyle = getattr(self, 'paramstyle', 'pyformat')
query = sql_query.query(paramstyle)
params = sql_query.values()
return query, tuple(params)
def _process_insert_query(self, query, tablename, seqname):
if seqname is None:
# It is not possible to get seq name from table name in Oracle
return query
else:
return query + "; SELECT %s.currval FROM dual" % seqname
def __init__(self, items=None):
r"""Creates a new SQLQuery.
>>> SQLQuery("x")
<sql: 'x'>
>>> q = SQLQuery(['SELECT * FROM ', 'test', ' WHERE x=', SQLParam(1)])
>>> q
<sql: 'SELECT * FROM test WHERE x=1'>
>>> q.query(), q.values()
('SELECT * FROM test WHERE x=%s', [1])
>>> SQLQuery(SQLParam(1))
<sql: '1'>
"""
if items is None:
self.items = []
elif isinstance(items, list):
self.items = items
elif isinstance(items, SQLParam):
self.items = [items]
elif isinstance(items, SQLQuery):
self.items = list(items.items)
else:
self.items = [items]
# Take care of SQLLiterals
for i, item in enumerate(self.items):
if isinstance(item, SQLParam) and isinstance(item.value, SQLLiteral):
self.items[i] = item.value.v
def values(self):
"""
Returns the values of the parameters used in the sql query.
>>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
>>> q.values()
['joe']
"""
return [i.value for i in self.items if isinstance(i, SQLParam)]
def _str(self):
try:
return self.query() % tuple([sqlify(x) for x in self.values()])
except (ValueError, TypeError):
return self.query()
def sqlquote(a):
"""
Ensures `a` is quoted properly for use in a SQL query.
>>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3)
<sql: "WHERE x = 't' AND y = 3">
>>> 'WHERE x = ' + sqlquote(True) + ' AND y IN ' + sqlquote([2, 3])
<sql: "WHERE x = 't' AND y IN (2, 3)">
"""
if isinstance(a, list):
return _sqllist(a)
else:
return sqlparam(a).sqlquery()