我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用MySQLdb.query()。
def query(self, paramstyle=None): """ Returns the query part of the sql query. >>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')]) >>> q.query() 'SELECT * FROM test WHERE name=%s' >>> q.query(paramstyle='qmark') 'SELECT * FROM test WHERE name=?' """ s = [] for x in self.items: if isinstance(x, SQLParam): x = x.get_marker(paramstyle) s.append(safestr(x)) else: x = safestr(x) # automatically escape % characters in the query # For backward compatability, ignore escaping when the query looks already escaped if paramstyle in ['format', 'pyformat']: if '%' in x and '%%' not in x: x = x.replace('%', '%%') s.append(x) return "".join(s)
def _db_execute(self, cur, sql_query): """executes an sql query""" self.ctx.dbq_count += 1 try: a = time.time() query, params = self._process_query(sql_query) out = cur.execute(query, params) b = time.time() except: if self.printing: print >> debug, 'ERR:', str(sql_query) if self.ctx.transactions: self.ctx.transactions[-1].rollback() else: self.ctx.rollback() raise if self.printing: print >> debug, '%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query)) return out
def _db_execute(self, cur, sql_query): """executes an sql query""" self.ctx.dbq_count += 1 try: a = time.time() query, params = self._process_query(sql_query) out = cur.execute(query, params) b = time.time() except: if self.printing: print('ERR:', str(sql_query), file=debug) if self.ctx.transactions: self.ctx.transactions[-1].rollback() else: self.ctx.rollback() raise if self.printing: print('%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query)), file=debug) return out
def select(self, tables, vars=None, what='*', where=None, order=None, group=None, limit=None, offset=None, _test=False): """ Selects `what` from `tables` with clauses `where`, `order`, `group`, `limit`, and `offset`. Uses vars to interpolate. Otherwise, each clause can be a SQLQuery. >>> db = DB(None, {}) >>> db.select('foo', _test=True) <sql: 'SELECT * FROM foo'> >>> db.select(['foo', 'bar'], where="foo.bar_id = bar.id", limit=5, _test=True) <sql: 'SELECT * FROM foo, bar WHERE foo.bar_id = bar.id LIMIT 5'> """ if vars is None: vars = {} sql_clauses = self.sql_clauses(what, tables, where, group, order, limit, offset) clauses = [self.gen_clause(sql, val, vars) for sql, val in sql_clauses if val is not None] qout = SQLQuery.join(clauses) if _test: return qout return self.query(qout, processed=True)
def _db_execute(self, cur, sql_query): """executes an sql query""" self.ctx.dbq_count += 1 try: a = time.time() paramstyle = getattr(self, 'paramstyle', 'pyformat') out = cur.execute(sql_query.query(paramstyle), [self._py2sql(x) for x in sql_query.values()]) b = time.time() except: if self.printing: print >> debug, 'ERR:', str(sql_query) if self.ctx.transactions: self.ctx.transactions[-1].rollback() else: self.ctx.rollback() raise if self.printing: print >> debug, '%s (%s): %s' % (round(b-a, 2), self.ctx.dbq_count, str(sql_query)) return out
def _db_execute(self, cur, sql_query): """executes an sql query""" self.ctx.dbq_count += 1 try: a = time.time() query, params = self._process_query(sql_query) out = cur.execute(query, params) b = time.time() except: if self.printing: print >> debug, 'ERR:', str(sql_query) if self.ctx.transactions: self.ctx.transactions[-1].rollback() else: self.ctx.rollback() raise if self.printing: print >> debug, '%s (%s): %s' % (round(b - a, 2), self.ctx.dbq_count, str(sql_query)) return out
def __init__(self, items=None): r"""Creates a new SQLQuery. >>> SQLQuery("x") <sql: 'x'> >>> q = SQLQuery(['SELECT * FROM ', 'test', ' WHERE x=', SQLParam(1)]) >>> q <sql: 'SELECT * FROM test WHERE x=1'> >>> q.query(), q.values() ('SELECT * FROM test WHERE x=%s', [1]) >>> SQLQuery(SQLParam(1)) <sql: '1'> """ if items is None: self.items = [] elif isinstance(items, list): self.items = items elif isinstance(items, SQLParam): self.items = [items] elif isinstance(items, SQLQuery): self.items = list(items.items) else: self.items = [items] # Take care of SQLLiterals for i, item in enumerate(self.items): if isinstance(item, SQLParam) and isinstance(item.value, SQLLiteral): self.items[i] = item.value.v
def __len__(self): return len(self.query())
def values(self): """ Returns the values of the parameters used in the sql query. >>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')]) >>> q.values() ['joe'] """ return [i.value for i in self.items if isinstance(i, SQLParam)]
def _str(self): try: return self.query() % tuple([sqlify(x) for x in self.values()]) except (ValueError, TypeError): return self.query()
def sqlquote(a): """ Ensures `a` is quoted properly for use in a SQL query. >>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3) <sql: "WHERE x = 't' AND y = 3"> >>> 'WHERE x = ' + sqlquote(True) + ' AND y IN ' + sqlquote([2, 3]) <sql: "WHERE x = 't' AND y IN (2, 3)"> """ if isinstance(a, list): return _sqllist(a) else: return sqlparam(a).sqlquery()
def _process_query(self, sql_query): """Takes the SQLQuery object and returns query string and parameters. """ paramstyle = getattr(self, 'paramstyle', 'pyformat') query = sql_query.query(paramstyle) params = sql_query.values() return query, params
def query(self, sql_query, vars=None, processed=False, _test=False): """ Execute SQL query `sql_query` using dictionary `vars` to interpolate it. If `processed=True`, `vars` is a `reparam`-style list to use instead of interpolating. >>> db = DB(None, {}) >>> db.query("SELECT * FROM foo", _test=True) <sql: 'SELECT * FROM foo'> >>> db.query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True) <sql: "SELECT * FROM foo WHERE x = 'f'"> >>> db.query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True) <sql: "SELECT * FROM foo WHERE x = 'f'"> """ if vars is None: vars = {} if not processed and not isinstance(sql_query, SQLQuery): sql_query = reparam(sql_query, vars) if _test: return sql_query db_cursor = self._db_cursor() self._db_execute(db_cursor, sql_query) if db_cursor.description: names = [x[0] for x in db_cursor.description] def iterwrapper(): row = db_cursor.fetchone() while row: yield storage(dict(zip(names, row))) row = db_cursor.fetchone() out = iterbetter(iterwrapper()) out.__len__ = lambda: int(db_cursor.rowcount) out.list = lambda: [storage(dict(zip(names, x))) \ for x in db_cursor.fetchall()] else: out = db_cursor.rowcount if not self.ctx.transactions: self.ctx.commit() return out
def update(self, tables, where, vars=None, _test=False, **values): """ Update `tables` with clause `where` (interpolated using `vars`) and setting `values`. >>> db = DB(None, {}) >>> name = 'Joseph' >>> q = db.update('foo', where='name = $name', name='bob', age=2, ... created=SQLLiteral('NOW()'), vars=locals(), _test=True) >>> q <sql: "UPDATE foo SET age = 2, name = 'bob', created = NOW() WHERE name = 'Joseph'"> >>> q.query() 'UPDATE foo SET age = %s, name = %s, created = NOW() WHERE name = %s' >>> q.values() [2, 'bob', 'Joseph'] """ if vars is None: vars = {} where = self._where(where, vars) query = ( "UPDATE " + sqllist(tables) + " SET " + sqlwhere(values, ', ') + " WHERE " + where) if _test: return query db_cursor = self._db_cursor() self._db_execute(db_cursor, query) if not self.ctx.transactions: self.ctx.commit() return db_cursor.rowcount
def _process_insert_query(self, query, tablename, seqname): return query
def _process_insert_query(self, query, tablename, seqname): if seqname is None: # when seqname is not provided guess the seqname and make sure it exists seqname = tablename + "_id_seq" if seqname not in self._get_all_sequences(): seqname = None if seqname: query += "; SELECT currval('%s')" % seqname return query
def _get_all_sequences(self): """Query postgres to find names of all sequences used in this database.""" if self._sequences is None: q = "SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'" self._sequences = set([c.relname for c in self.query(q)]) return self._sequences
def _process_insert_query(self, query, tablename, seqname): return query, SQLQuery('SELECT last_insert_rowid();')
def query(self, *a, **kw): out = DB.query(self, *a, **kw) if isinstance(out, iterbetter): del out.__len__ return out
def _process_query(self, sql_query): """Takes the SQLQuery object and returns query string and parameters. """ # MSSQLDB expects params to be a tuple. # Overwriting the default implementation to convert params to tuple. paramstyle = getattr(self, 'paramstyle', 'pyformat') query = sql_query.query(paramstyle) params = sql_query.values() return query, tuple(params)
def _process_insert_query(self, query, tablename, seqname): if seqname is None: # It is not possible to get seq name from table name in Oracle return query else: return query + "; SELECT %s.currval FROM dual" % seqname
def update(self, tables, where, vars=None, _test=False, **values): """ Update `tables` with clause `where` (interpolated using `vars`) and setting `values`. >>> db = DB(None, {}) >>> name = 'Joseph' >>> q = db.update('foo', where='name = $name', name='bob', age=2, ... created=SQLLiteral('NOW()'), vars=locals(), _test=True) >>> q <sql: "UPDATE foo SET age = 2, created = NOW(), name = 'bob' WHERE name = 'Joseph'"> >>> q.query() 'UPDATE foo SET age = %s, created = NOW(), name = %s WHERE name = %s' >>> q.values() [2, 'bob', 'Joseph'] """ if vars is None: vars = {} where = self._where(where, vars) values = sorted(values.items(), key=lambda t: t[0]) query = ( "UPDATE " + sqllist(tables) + " SET " + sqlwhere(values, ', ') + " WHERE " + where) if _test: return query db_cursor = self._db_cursor() self._db_execute(db_cursor, query) if not self.ctx.transactions: self.ctx.commit() return db_cursor.rowcount