Python MySQLdb 模块,escape_string() 实例源码
我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用MySQLdb.escape_string()。
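def login_page():
    """Authenticate a user from the POSTed login form and start a session.

    GET requests (and failed logins) render ``login.html``; a successful
    POST fills ``session`` and redirects to ``user_page``.  The cursor and
    connection from ``connection()`` are closed on every path.

    Returns:
        A Flask response (rendered template or redirect).
    """
    error = None
    try:
        c, conn = connection()
        try:
            if request.method == 'POST':
                username = request.form['username']
                # The driver binds the value (quoting for the connection's
                # charset); escape_string() + "%s" formatting cannot guarantee
                # that and stored the escaped spelling of the name.
                c.execute('SELECT * FROM users WHERE username = %s;', (username,))
                data = c.fetchone()
                # `data is None` for an unknown user -- the original indexed
                # into None and relied on the bare except to mask the TypeError.
                if (data is not None
                        and sha256_crypt.verify(request.form['password'], data[2])
                        and data[1] == username):
                    session['logged_in'] = True
                    session['username'] = username
                    session['favourites'] = data[4]
                    flash('You are now logged in')
                    return redirect(url_for('user_page'))
                error = 'Invalid credentials, try again'
        finally:
            c.close()
            conn.close()
            gc.collect()
        return render_template('login.html', error=error)
    except Exception:
        # DB/driver failures are reported as a generic login error; nothing
        # about the underlying exception reaches the client.
        error = 'Invalid credentials, try again'
        return render_template('login.html', error=error)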
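def addrecipe():
    """Store a recipe submitted from the form and redirect to ``newrecipe``.

    Ingredients arrive one per line (CRLF-separated) and are stored as a
    comma-separated string with empty leading/trailing entries stripped.
    Non-POST requests render ``main.html``.

    Returns:
        A Flask response (redirect on POST, rendered template otherwise).
    """
    if request.method != 'POST':
        return render_template('main.html')

    title = request.form['title']
    location = request.form['country']
    ingredients = ','.join(request.form['ingredients'].split('\r\n')).strip(',')
    recipe = request.form['recipe']
    username = session['username']

    c, conn = connection()
    try:
        # Values are bound by the driver instead of escape_string() + "%s"
        # formatting: quoting matches the connection charset and the rows
        # hold the original text rather than its escaped spelling.
        c.execute(
            'INSERT INTO recipes (title, location, ingredients, recipe, user) '
            'VALUES (%s, %s, %s, %s, %s);',
            (title, location, ingredients, recipe, username),
        )
        conn.commit()  # Save to the database
    finally:
        c.close()
        conn.close()
        gc.collect()  # Garbage collection
    flash("Thanks for your recipe :)")
    return redirect(url_for('newrecipe'))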
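def find_chebi_term2(term):
    """Run the FLOR ChEBI matcher on ``term`` and return its best hit.

    Invokes ``xldb.flor.match.FlorTextChebi3star`` from the jars under
    ``florchebi_path`` and parses the tab-separated ``id, name, score`` line
    it prints.

    Args:
        term: Text to match against ChEBI.

    Returns:
        ``(chebi_id, name, score)``, or ``('0', 'null', 0.0)`` when the
        matcher output is not exactly three tab-separated fields.

    Raises:
        RuntimeError: on a platform other than Linux/Windows, where the
            classpath separator is unknown (the original left ``cp`` unbound
            and failed later with a NameError).
    """
    jars = ("florchebi.jar", "mysql-connector-java-5.1.24-bin.jar", "Tokenizer.jar")
    if _platform in ("linux", "linux2"):
        separator = ":"
    elif _platform == "win32":
        separator = ";"
    else:
        raise RuntimeError("no FLOR classpath separator known for platform %r" % (_platform,))
    cp = separator.join("{0}/{1}".format(florchebi_path, jar) for jar in jars)
    # argv list, no shell: the term needs no shell quoting.  db.escape_string()
    # is kept so the matcher receives exactly the argument it always did.
    florcall = ["java", "-cp", cp, "xldb.flor.match.FlorTextChebi3star", db.escape_string(term),
                "children", "true", "mychebi201301", "false", "false", "chebi", stoplist, "1"]
    flor = Popen(florcall, stdout=PIPE)
    florresult, _ = flor.communicate()
    chebires = florresult.strip().split('\t')
    if len(chebires) == 3:
        return (chebires[0], chebires[1], float(chebires[2]))
    return ('0', 'null', 0.0)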
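def __call__(self, *args, **kwargs):
    """Coerce ``self.value`` to the rule's ``direct_type`` and return it.

    In order:
      * a value already of the target type is returned as-is when the rule
        is ``safe``;
      * for ``str`` targets a non-safe value is passed through
        ``MySQLdb.escape_string`` and decoded back to text;
      * for ``bool`` targets any string in ``_false_str_list`` is ``False``;
      * otherwise the value is converted with ``direct_type(value)``.

    Raises:
        ParamsValueError: when that conversion fails (``ValueError`` or
            ``TypeError``).
    """
    super(TypeFilter, self).__call__()
    direct_type = self.rule.direct_type
    # Already the right type and the rule trusts it: nothing to do.
    if isinstance(self.value, direct_type) and self.rule.safe:
        return self.value
    if direct_type == str:
        if self.rule.safe:
            return self.value
        import MySQLdb
        # escape_string() only backslash-escapes quotes/backslashes for the
        # default charset; it is not a substitute for binding the value as a
        # query parameter further down the line.
        self.value = MySQLdb.escape_string(self.value)
        if isinstance(self.value, bytes):
            self.value = self.value.decode('utf-8')
        return self.value
    # Accepted spellings of "false" for boolean parameters.
    if direct_type == bool and self.value in _false_str_list:
        return False
    try:
        return self.rule.direct_type(self.value)
    except (ValueError, TypeError):
        # int(None) / float([]) raise TypeError, not ValueError; both are
        # conversion failures from the caller's point of view.
        raise ParamsValueError(self.error_code, filter=self)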
def _encode(obj):
    """Render a Python value as a MySQL literal.

    ``None`` -> ``NULL``; bool/int/long/float -> the bare number; strings ->
    single-quoted after ``MySQLdb.escape_string``; anything else is JSON
    serialised and then encoded as a string literal.
    """
    if obj is None:
        return 'NULL'
    if isinstance(obj, (bool, int, long, float)):
        return str(obj)
    if isinstance(obj, basestring):
        # SECURITY NOTE: PAY SPECIAL CARE THIS WHEN CONNECTION IS NOT utf-8
        # CHECK THE SAFETY OF THE ENCODING:
        #
        # encoding = 'utf-8'
        # p = ['\\', '"', "'"]
        # for i in range(0x110000):
        #     c = unichr(i)
        #     try:
        #         e = c.encode(encoding)
        #     except UnicodeEncodeError:
        #         pass
        #     else:
        #         if any(map(lambda q: q in e, p)) and c not in p:
        #             print i, c
        #
        # DO NOT USE THIS IF ANYTHING IS IN THE OUTPUT
        escaped = MySQLdb.escape_string(_strify(obj))
        return "'%s'" % escaped
    import json
    # Non-scalar values are stored as their JSON text.
    return _encode(json.dumps(obj))
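def _encode_name(identifier):
    """Quote a table/column identifier (or a list of them) for MySQL.

    ``RawSQL`` values are emitted verbatim, lists become a comma-separated
    list of quoted names, anything else is backtick-quoted.

    Inside a backtick-quoted identifier the only special character is the
    backtick itself, which MySQL expects to be doubled; quotes and
    backslashes are literal there.  ``MySQLdb.escape_string`` (used before)
    therefore missed the one character that matters and corrupted names
    containing ``'`` or ``\\``.
    """
    if isinstance(identifier, RawSQL):
        return identifier.sql
    if isinstance(identifier, list):
        return ','.join(_encode_name(item) for item in identifier)
    # _strify() is assumed to return a str here (the module targets Python 2).
    name = _strify(identifier)
    return '`%s`' % name.replace('`', '``')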
def _escape_string(self, item):
    """Return ``item`` as UTF-8 bytes escaped with ``MySQLdb.escape_string``."""
    encoded = unicode(item).encode('utf-8')
    return MySQLdb.escape_string(encoded)  # pylint: disable=no-member
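def _str_escape(s, d):
    """Escape ``s`` for ``MySQLdb.escape_dict``; ``None`` becomes ``''``.

    Args:
        s: String to escape, or ``None``.
        d: Conversion mapping that ``escape_dict`` passes to each converter;
           unused here but part of that callback signature.
    """
    # Identity test: `== None` would call __eq__ on arbitrary objects.
    if s is None:
        return ''
    return MySQLdb.escape_string(s)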
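def mysql_escape(f):
    """Decorator: escape string and dict positional arguments before calling ``f``.

    str/unicode arguments go through ``MySQLdb.escape_string``; dicts through
    ``MySQLdb.escape_dict`` (string values via ``_str_escape``, int/float via
    ``_no_escape``).  Other positional arguments and all keyword arguments are
    forwarded unchanged.
    """
    def _escape_arg(arg):
        # isinstance() instead of `type(arg) is ...`, so str/unicode
        # subclasses are escaped too rather than slipping through raw.
        if isinstance(arg, (types.StringType, types.UnicodeType)):
            return MySQLdb.escape_string(arg)
        if isinstance(arg, dict):
            return MySQLdb.escape_dict(arg, {
                types.StringType: _str_escape,
                types.UnicodeType: _str_escape,
                types.IntType: _no_escape,
                types.FloatType: _no_escape,
            })
        return arg

    @wraps(f)
    def decorated_function(*args, **kwargs):
        newargs = tuple(_escape_arg(arg) for arg in args)
        return f(*newargs, **kwargs)
    return decorated_function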
def reply():
    """Flask handler: answer the POSTed ``msg`` with the chatbot model's reply.

    Chinese input is refused with a fixed message; otherwise the seq2seq
    model decodes a reply, ``_UNK`` tokens are replaced and an empty reply is
    substituted with a placeholder.  The answer is returned as JSON.
    """
    req_msg = request.form['msg']
    res_msg = '^_^'
    # ensure not Chinese
    match = zhPattern.search(req_msg)
    if match:
        res_msg = "Sorry, I can't speak Chinese right now, maybe later."
    else:
        res_msg = execute.decode_line(sess, model, enc_vocab, rev_dec_vocab, req_msg )
        res_msg = res_msg.replace('_UNK', '^_^')
    # ensure not empty
    if res_msg == '':
        res_msg = 'Let me think about it ...'
    return jsonify( { 'text': res_msg } )
    # NOTE(review): the listing lost its indentation. As reconstructed here
    # the return above makes the DB insert below unreachable; if the return
    # belonged inside the `if res_msg == ''` branch, the insert runs for
    # every non-empty reply instead -- confirm against the upstream source.
    #insert msg to db
    # NOTE(review): the statement is %-formatted; escape_string() assumes the
    # connection's default charset, and `conn` (presumably module-level) is
    # closed at the end, so a later request would find it closed.
    sql = "insert into t_dialogs(dialog_type, dialog_time, req_msg, res_msg, req_user, res_user, remark) values('webpage',%d,'%s','%s','%s','%s','')"
    cur = conn.cursor()
    cur.execute(sql % (int(time.time()), MySQLdb.escape_string(req_msg), MySQLdb.escape_string(res_msg), 'websession', 'easybot'))
    conn.commit()
    conn.close()
# Wechat auth
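def dbInsert_st(self, table, param):
    """Insert one row into ``<self.dbname>.<table>`` and return the new id.

    Args:
        table: Table name; backtick-quoted as an identifier.
        param: Mapping column-name -> value for the row.

    Returns:
        The result of ``self.getRecord("SELECT LAST_INSERT_ID()", 1)``.
    """
    def _ident(name):
        # Identifier quoting: the only escape MySQL honours inside `...` is
        # a doubled backtick (escape_string() is for string literals).
        return '`%s`' % ('%s' % (name,)).replace('`', '``')

    columns = list(param.keys())
    values = [param[k] for k in columns]  # same order as `columns`
    field = ",".join(_ident(k) for k in columns)
    placeholders = ",".join(["%s"] * len(values))
    sql = "INSERT INTO %s.%s(%s) VALUES (%s)" % (_ident(self.dbname), _ident(table), field, placeholders)
    # Values are bound by the driver instead of escape_string() + "'%s'":
    # quoting matches the connection charset, and non-string values (ints,
    # None, datetimes) are accepted where escape_string() required str.
    self.cursor.execute(sql, values)
    self.db.commit()
    return self.getRecord("SELECT LAST_INSERT_ID()", 1)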
#????
def genstr(str1):
    """Return ``str1`` as a single-quoted, escaped SQL literal; ``''`` when falsy."""
    if not str1:
        return "''"
    return "'%s'" % MySQLdb.escape_string(str1)
def genstr(str1):
    """Return ``str1`` as a single-quoted, escaped SQL literal; ``''`` when falsy."""
    if not str1:
        return "''"
    return "'%s'" % MySQLdb.escape_string(str1)
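def register_page():
    """Register a new user from the POSTed form and log them in.

    Validates ``RegistrationForm``, rejects an already-taken username, and
    otherwise stores username, sha256-crypt password hash and e-mail, opens
    the session and redirects to ``favourites_page``.  Any other request
    (GET, invalid form, DB failure) re-renders the form.

    Returns:
        A Flask response.
    """
    form = RegistrationForm(request.form)
    try:
        if request.method == 'POST' and form.validate():
            username = form.username.data
            email = form.email.data
            password = sha256_crypt.encrypt(str(form.password.data))
            c, conn = connection()
            try:
                # Parameter binding replaces escape_string() + "%s" formatting:
                # the driver quotes for the connection charset and the stored
                # values are the originals, not their escaped spellings.
                taken = c.execute('SELECT * FROM users WHERE username = %s;', (username,))
                if int(taken) > 0:
                    flash('That username is already taken, please choose another')
                    return render_template('register.html', form=form)
                c.execute(
                    'INSERT INTO users (username, password, email) VALUES (%s, %s, %s);',
                    (username, password, email),
                )
                conn.commit()
            finally:
                c.close()
                conn.close()
                gc.collect()
            flash('Thanks for registering!')
            session['logged_in'] = True
            session['username'] = username
            return redirect(url_for('favourites_page'))
        return render_template('register.html', form=form)
    except Exception:
        # DB/driver failures fall back to the form; no error detail is
        # exposed to the client.
        return render_template('register.html', form=form)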
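def edit_recipe(rid):
    """Show and update recipe ``rid``.

    GET pre-fills ``RecipeForm`` from the stored row; POST writes the
    submitted fields back and redirects to ``user_page``.  A ``rid`` that
    matches no row redirects to ``user_page`` with a flash message.

    Args:
        rid: Recipe id from the URL; bound as a query parameter (the original
            %-formatted it straight into the SQL text).
    """
    # Get the recipe
    c, conn = connection()
    try:
        c.execute('SELECT * FROM recipes WHERE rid=%s', (rid,))
        recipe = c.fetchone()
    finally:
        c.close()
        conn.close()
        gc.collect()
    if recipe is None:
        # The original indexed into None here and raised.
        flash('Recipe not found')
        return redirect(url_for('user_page'))
    # NOTE(review): nothing checks that the recipe belongs to
    # session['username'], so any logged-in user can edit any rid. The
    # `user` column written on insert is the natural thing to compare
    # against; confirm its position in the row before adding that check.

    # Fill the form
    form = RecipeForm(request.form)
    form.title.data = recipe[1]
    form.country.data = recipe[2]
    form.ingredients.data = '\n'.join(recipe[3].split(','))
    form.recipe.data = recipe[4]

    if request.method == 'POST':
        title = request.form['title']
        country = request.form['country']
        ingredients = ','.join(request.form['ingredients'].split('\r\n')).strip(',')
        body = request.form['recipe']
        # Update the DB: values and rid bound by the driver, not %-formatted
        c, conn = connection()
        try:
            c.execute(
                'UPDATE recipes SET title=%s, location=%s, ingredients=%s, recipe=%s WHERE rid=%s',
                (title, country, ingredients, body, rid),
            )
            conn.commit()
        finally:
            # Close connection
            c.close()
            conn.close()
            gc.collect()
        flash('Recipe updated')
        return redirect(url_for('user_page'))
    return render_template('edit_recipe.html', form=form)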
def reply():
    """Flask handler: answer the POSTed ``msg`` with the chatbot model's reply.

    Chinese input is refused with a fixed message; otherwise the seq2seq
    model decodes a reply, ``_UNK`` tokens are replaced and an empty reply is
    substituted with a placeholder.  The answer is returned as JSON.
    """
    req_msg = request.form['msg']
    res_msg = '^_^'
    # ensure not Chinese
    match = zhPattern.search(req_msg)
    if match:
        res_msg = "Sorry, I can't speak Chinese right now, maybe later."
    else:
        res_msg = execute.decode_line(sess, model, enc_vocab, rev_dec_vocab, req_msg )
        res_msg = res_msg.replace('_UNK', '^_^')
    # ensure not empty
    if res_msg == '':
        res_msg = 'Let me think about it ...'
    return jsonify( { 'text': res_msg } )
    # NOTE(review): the listing lost its indentation. As reconstructed here
    # the return above makes the DB insert below unreachable; if the return
    # belonged inside the `if res_msg == ''` branch, the insert runs for
    # every non-empty reply instead -- confirm against the upstream source.
    #insert msg to db
    # NOTE(review): the statement is %-formatted; escape_string() assumes the
    # connection's default charset, and `conn` (presumably module-level) is
    # closed at the end, so a later request would find it closed.
    sql = "insert into t_dialogs(dialog_type, dialog_time, req_msg, res_msg, req_user, res_user, remark) values('webpage',%d,'%s','%s','%s','%s','')"
    cur = conn.cursor()
    cur.execute(sql % (int(time.time()), MySQLdb.escape_string(req_msg), MySQLdb.escape_string(res_msg), 'websession', 'easybot'))
    conn.commit()
    conn.close()
# Wechat auth
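def sql_escape(self, value):
    """Escape a text value with ``MySQLdb.escape_string``.

    ``str`` values are escaped and returned as text; ``bytes`` are decoded as
    UTF-8 first and handed back unchanged when they are not valid UTF-8;
    values of any other type are returned untouched.
    """
    if isinstance(value, str):
        return MySQLdb.escape_string(value).decode("utf-8")
    if isinstance(value, bytes):
        try:
            text = value.decode("utf-8")
        except UnicodeDecodeError:
            # Not text: return the raw bytes as before.  Only the decode is
            # guarded now; the former bare `except:` also hid failures in
            # the escaping call itself.
            return value
        return self.sql_escape(text)
    return value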
def sqlForNonNone(self, value):
    """MySQL provides a quoting function for string -- this method uses it."""
    escaped = MySQLdb.escape_string(value)
    return "".join(("'", escaped, "'"))
def __init__(self, datetime, cmd):
    """Build a tty-log line and, when ``is_log2db`` is set, store it.

    Args:
        datetime: Timestamp of the command (the parameter name shadows any
            ``datetime`` module inside this method).
        cmd: The command text as typed on the terminal.
    """
    self.msg = str(datetime) + '\r\r' + cmd + '\r\n\r\n\r\n'
    if is_log2db:
        try:
            global log_id
            db._ensure_connected()
            # NOTE(review): statement text built with %-formatting. `cmd` is
            # presumably user-typed input; escape_string() backslash-escapes
            # quotes for the default charset only. The execlog writer below
            # passes values to db.insert(sql, *args) with %s placeholders --
            # if that binds parameters, the same form would be safer here.
            db.insert(
                "INSERT INTO ttylog (`datetime`,`cmd`,`log_id`) VALUES ('%s','%s','%d')"
                % (str(datetime), MySQLdb.escape_string(cmd), log_id))
        except Exception as err:
            # Best-effort: a failed DB write must not break the session.
            pass
def __init__(self, host, cmd, remote_ip, result):
    """Build an exec-log line and, when ``is_log2db`` is set, store it.

    Args:
        host: Host the command ran on.
        cmd: The command text.
        remote_ip: Address the request came from.
        result: Command result; stringified for the message and the row.
    """
    self.msg = str(
        host) + '\r\r' + cmd + '\r\r' + remote_ip + '\r\r' + str(result)
    sql = "INSERT INTO execlog (`user`,`host`,`cmd`,`remote_ip`,`result`) VALUES (%s,%s,%s,%s,%s)"
    if is_log2db:
        try:
            db._ensure_connected()
            # `user` is not defined in this method -- presumably module-level.
            # NOTE(review): the %s placeholders and separately passed values
            # look like parameter binding; if db.insert() does bind them, the
            # escape_string() calls escape `cmd`/`result` a second time and
            # the stored text keeps the backslashes. Check db.insert().
            db.insert(sql, user, host,
                      MySQLdb.escape_string(cmd), remote_ip,
                      MySQLdb.escape_string(str(result)))
        except Exception as err:
            # Best-effort: logging failures are deliberately ignored.
            pass
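def __init__(self, host, filename, type, remote_ip, result):
    """Build a file-transfer log line and, when ``is_log2db`` is set, store it.

    Args:
        host: Target host.
        filename: Transferred file name.
        type: Transfer type label (shadows the builtin ``type`` here).
        remote_ip: Address the request came from.
        result: Transfer result; stringified.
    """
    self.msg = str(
        host
    ) + '\r\r' + filename + '\r\r' + type + '\r\r' + remote_ip + '\r\r' + str(
        result) + '\r\n\r\n\r\n'
    if is_log2db:
        def _esc(value):
            # Same stringification "%s" formatting applied, then escaped.
            return MySQLdb.escape_string('%s' % (value,))

        try:
            db._ensure_connected()
            # Every interpolated value is escaped, not just `result`: the
            # original spliced user/host/filename/type/remote_ip into the
            # statement raw, and `filename` is externally supplied text.
            # `user` is not defined here -- presumably module-level.
            db.insert(
                "INSERT INTO filelog (`user`,`host`,`filename`,`type`,`remote_ip`,`result`) VALUES ('%s','%s','%s','%s','%s','%s')"
                % (_esc(user), _esc(host), _esc(filename), _esc(type),
                   _esc(remote_ip), _esc(result)))
        except Exception as err:
            # Best-effort: a failed DB write must not break the transfer.
            pass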
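def init_database(host, user, passwd, dbname):
    """Create ``dbname`` (utf8mb4) and its thread/post/comment tables if absent.

    MySQLdb's "already exists" warnings are silenced while the DDL runs;
    the connection is closed even when a statement fails.

    Args:
        host, user, passwd: Arguments for ``MySQLdb.connect``.
        dbname: Database name.  It is quoted as an identifier with embedded
            backticks doubled -- the only escape MySQL honours inside
            a backtick-quoted identifier, which escape_string() (used before)
            does not apply.
    """
    warnings.filterwarnings('ignore', message = "Table.*already exists")
    warnings.filterwarnings('ignore', message = "Can't create.*database exists")
    db = MySQLdb.connect(host, user, passwd)
    try:
        tx = db.cursor()
        tx.execute('set names utf8mb4')
        quoted_name = dbname.replace('`', '``')
        tx.execute('create database if not exists `%s` default charset utf8mb4 '
                   'default collate utf8mb4_general_ci;' % quoted_name)
        # select_db() takes the raw name, not the quoted identifier.
        db.select_db(dbname)
        tx.execute("create table if not exists thread("
                   "id BIGINT(12), title VARCHAR(100), author VARCHAR(30), reply_num INT(4), "
                   "good BOOL, PRIMARY KEY (id)) CHARSET=utf8mb4;")
        tx.execute("create table if not exists post("
                   "id BIGINT(12), floor INT(4), author VARCHAR(30), content TEXT, "
                   "time DATETIME, comment_num INT(4), thread_id BIGINT(12),PRIMARY KEY (id), "
                   "FOREIGN KEY (thread_id) REFERENCES thread(id)) CHARSET=utf8mb4;")
        tx.execute("create table if not exists comment(id BIGINT(12), "
                   "author VARCHAR(30), content TEXT, time DATETIME, post_id BIGINT(12), "
                   "PRIMARY KEY (id), FOREIGN KEY (post_id) REFERENCES post(id)) CHARSET=utf8mb4;")
        db.commit()
    finally:
        db.close()
        warnings.resetwarnings()
    warnings.filterwarnings('ignore', message = ".*looks like a ")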
# bs.get_text???url?????????
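def wooyun(pages=0):
    """Render the wooyun search page.

    ``?id=`` fetches one row by ``gid``; ``?key=`` lists rows whose title
    contains the search word.  Both come from the query string and are bound
    as parameters -- the original formatted them into the SQL text and then
    applied escape_string() to the finished statement, which changes nothing
    useful.

    Args:
        pages: Unused; kept for the route signature.
    """
    searchword = request.args.get('key', '').strip()
    log_id = request.args.get('id', '').strip()
    data = {}
    table = list()
    cursor = conn()
    try:
        if log_id:
            cursor.execute("SELECT * from {MYSQL_TABLES} where gid=%s".format(MYSQL_TABLES=MYSQL_TABLES),
                           (log_id,))
            results = cursor.fetchone()
            # Unknown id: leave `data` empty instead of indexing into None.
            if results is not None:
                data["id"] = results[0]
                data["text"] = results[2]
                data["title"] = results[1]
        if searchword:
            sql = 'SELECT gid,title from {MYSQL_TABLES} where title like %s'.format(MYSQL_TABLES=MYSQL_TABLES)
            # The % wildcards belong to the bound pattern, not the SQL text.
            cursor.execute(sql, ('%' + searchword + '%',))
            table = [{"id": rows[0], "title": rows[1]} for rows in cursor.fetchall()]
    finally:
        cursor.close()
    return render_template("wooyun.html", title="??????", data=data, table=table)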
# ??wooyun???? -??????
def __get_stock_ltgd(self, stock):
    """Fetch the shareholder list for ``stock`` from xueqiu and upsert it.

    Every (end_date, shareholder) entry becomes a row in
    ``s_stock_shareholder``; entries with ``end_date`` on or before 20140930
    are skipped.  Returns 1 when the API returns no list, ``None`` otherwise.

    Args:
        stock: dict with at least ``'s_code'`` (the symbol).
    """
    # (original comment was mis-encoded; presumably "top-10 shareholders")
    s_code = stock['s_code'].upper()
    # Warm-up request before the JSON call -- presumably to pick up cookies.
    self.curl_get('https://xueqiu.com/8205215793')
    url = 'https://xueqiu.com/stock/f10/otsholder.json?symbol=%s&page=1&size=4&_=1472904975952' % s_code
    _data = self.curl_get(url)
    re = json.loads(_data)  # NOTE: shadows the `re` module for the rest of the method
    if re['list'] is None:
        print "=========="
        return 1
    for i in range(0, len(re['list'])):
        one = re['list'][i]
        for j in range(0, len(one['list'])):
            chg = one['list'][j]['chg']
            if chg is None:
                chg = 0
            sh_code = one['list'][j]['shholdercode']
            if sh_code is None:
                sh_code = 0
            # Backslashes and single quotes are stripped (not escaped): the
            # raw `name` is spliced into `_where` below.
            name = one['list'][j]['shholdername'].replace("\\", "")
            name = name.replace("'", "")
            indata = {
                'report_date': one['list'][j]['publishdate'],
                'end_date': one['list'][j]['enddate'],
                's_code': s_code,
                'sh_code': sh_code,
                'sh_name': MySQLdb.escape_string(name),
                'sh_type': one['list'][j]['shholdertype'],
                'sh_rank': one['list'][j]['rank2'],
                # Python 2: integer division if holderamt is an int -- TODO confirm
                'sh_shares': one['list'][j]['holderamt']/10000,
                'sh_shares_p': one['list'][j]['pctoffloatshares'],
                'sh_shares_a_p': one['list'][j]['holderrto'],
                'sh_equity_type': one['list'][j]['shholdernature'],
                'ishis': one['list'][j]['ishis'],
                'chg': chg,
            }
            if int(indata['end_date']) <= 20140930:
                continue
            # NOTE(review): `_where` is %-formatted from `s_code`, `enddate`
            # and the quote-stripped `name`; only the stored `sh_name` goes
            # through escape_string(). Binding these as parameters would be
            # the safer form -- depends on the self.mysql helper's API,
            # which is not visible here.
            _where = "s_code='%s' and end_date=%s and sh_name='%s'" % (s_code, one['list'][j]['enddate'], name)
            _has = self.mysql.fetch_one("select * from s_stock_shareholder where %s" % _where)
            if _has is not None:
                self.mysql.dbUpdate('s_stock_shareholder', indata, _where)
            else:
                self.mysql.dbInsert('s_stock_shareholder', indata)
            print indata
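def moviedb(i,globalmovieid,title,datelist,genre,content_rating,ratings,rating_value,plot,link,poster):
    """Insert a movie row unless it already exists or is older than two months.

    Args:
        i: IMDb id.
        globalmovieid: Internal movie id.
        title, genre, content_rating, ratings, rating_value, plot, poster:
            Column values.  ``rating_value`` defaults to 5 and
            ``content_rating`` to "R" when empty; ``plot`` is cleaned with
            ``remove_all_special_chars`` first.
        datelist: ``[day, month_name, year, ...]``; lists shorter than 4
            entries are replaced by ``["1","January","1971"]``.
        link: Unused.
    """
    plot = remove_all_special_chars(plot)
    #db utils
    db = getCursor()
    cur = db.cursor()
    #check if movie already in database
    cur.execute("select count(1) from Movie where imdbid = %s or movieid = %s ", [i,globalmovieid])
    if cur.fetchone()[0]:
        print 'Movie exits'
        #if record exists do nothing ,its movie,movie never changes
        return
    print 'Movie not exists'
    print 'insert..'
    if len(datelist) < 4:
        datelist = ["1","January","1971"]
    #if movie is less than 2 months old
    #insert into movie
    if(is_date_older_2months(datelist) == False):
        print globalmovieid,i,title
        print ' '.join(datelist)
        if not rating_value:
            rating_value = 5
        if not content_rating:
            content_rating = "R"
        # Values are bound by the driver, exactly as the existence check
        # above already does: the original %-formatted title/plot/genre/
        # poster into the statement without any escaping.
        sql = ("insert into Movie(movieid,imdbid,title,plot,altplot,date,year,month,day,genre,ratings,ratingvalue,contentrating,poster) "
               "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        params = (globalmovieid, i, title.strip(), plot.strip(), " ", ' '.join(datelist),
                  int(datelist[2]), month_to_int(datelist[1]), 0, genre,
                  int(ratings), float(rating_value), content_rating, poster)
        try:
            cur.execute(sql, params)
            db.commit()
        except MySQLdb.Error as e:
            db.rollback()
            print e
            # Closed only on the error path, as in the original -- getCursor()
            # may hand out a shared connection.
            db.close()
    else:
        print 'do nothing'
        #coming soon will be maintained by other program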
def run(self):
    """Crawl youku search results for each keyword and insert them into gx_video.

    Connects with the credentials in ``self.web``, fetches the first result
    page per keyword, parses it with ``youkuSGML`` and inserts one row per
    result, reconnecting if the connection dropped.  ``self.result`` counts
    parsed entries, ``self.rfinally`` successful inserts.
    """
    try:
        self.conn = MySQLdb.connect(host=self.web.host,user=self.web.user,passwd = self.web.passwd,db=self.web.db,charset="utf8",connect_timeout=5)
        print "connet to %s ,%s\n" % (self.web.host,self.web.db)
    except:
        print "can't connet to %s ,%s\n" % (self.web.host,self.web.db)
        return
    i = 0
    # Pattern for non-BMP characters; `highpoints` is not used in the code below.
    try:
        # python UCS-4 build
        highpoints = re.compile('[\\x00-\\xFF]{2,4}')
    except re.error:
        # python UCS-2 build
        highpoints = re.compile('[\uD800-\uDBFF][\uDC00-\uDFFF]')
    for index,keyword in enumerate(self.web.keyword):
        index = index + 1  # 1-based; written to the `cid` column
        # search pages for this keyword
        for i in range(1,2):  # only the first page
            myurl = url % (keyword , i)
            # parser for the result page
            food = youkuSGML(self.web.scope)
            # fetch the page
            try:
                context = urllib2.urlopen(myurl,timeout=5)
                content = context.read()
            except:
                print "can't read from %s " % myurl
                continue
            food.feed(content)
            self.result += len(food.result)
            # one row per parsed entry
            for clist in food.result:
                score = round(random.random(),2)*10
                scoreer = random.randint(10,100)
                atime = int(time.time())
                e = False
                if clist["title"] == '':
                    continue
                # NOTE(review): only the title is escaped; `pic` and `link`
                # are %-formatted into the statement unescaped. Binding all
                # values as parameters (cursor.execute(sql, params)) would
                # cover every field.
                clist["title"] = MySQLdb.escape_string(clist["title"])
                sql = "insert into gx_video(`cid`,`intro`,`title`,`picurl`,`playurl`,`score`,`scoreer`,`keywords`,`color`,`actor`,`director`,`content`,`area`,`language`,`year`,`serial`,`addtime`,`hits`,`monthhits`,`weekhits`,`dayhits`,`hitstime`,`stars`,`status`,`up`,`down`,`downurl`,`inputer`,`reurl`,`letter`,`genuine`) values (%d,'',\'%s\',\'%s\',\'%s\',%d,%d,'','','','','','','',0,0,%d,0,0,0,0,0,0,1,0,0,'','','','',0)" % (index,clist["title"],clist["pic"],clist["link"],score,scoreer,atime)
                print sql
                try:
                    try:
                        self.conn.ping()
                    except Exception,e:
                        # NOTE: `e` is now the exception object (truthy), so
                        # `if not e` below skips the insert even when the
                        # reconnect succeeds -- the flag and the caught
                        # exception share a name.
                        try:
                            self.conn = MySQLdb.connect(host=self.web.host,user=self.web.user,passwd = self.web.passwd,db=self.web.db,charset="utf8",connect_timeout=5)
                            print "Reconnet to %s ,%s\n" % (self.web.host,self.web.db)
                        except:
                            print "can't Reconnet to %s ,%s\n" % (self.web.host,self.web.db)
                            e = True
                    if not e:
                        self.conn.query(sql)
                        self.rfinally += 1
                except:
                    print sql+"/n"  # "/n" is presumably meant to be "\n"
    print "%s get %d results\n and %s insert successfully" % (self.web.db,self.result,self.rfinally)
    self.conn.close()
def get_best_go(self):
    """Pick the GO term with the lowest ``ic`` among ``self.go_ids``.

    Queries the module-level ``db`` connection, stores the accession in
    ``self.best_go`` ("" when no id matches) and logs the outcome.
    """
    cur = db.cursor()
    placeholders = ','.join(['%s'] * len(self.go_ids))
    # synonym
    query = """SELECT DISTINCT t.acc, t.name, t.ic
    FROM term t
    WHERE t.acc IN (%s)
    ORDER BY t.ic ASC
    LIMIT 1;""" % placeholders  # or DESC
    cur.execute(query, (self.go_ids))
    res = cur.fetchone()
    if res is None:
        logging.info("NO GO for {}".format(self.text))
        self.best_go = ""
        return
    logging.info("best GO for {}: {}".format(self.text, " ".join(str(r) for r in res)))
    self.best_go = res[0]
# def normalize(self):
# term = MySQLdb.escape_string(self.text)
# # adjust - adjust the final score
# match = ()
# cur = db.cursor()
# # synonym
# query = """SELECT DISTINCT t.acc, t.name, s.term_synonym
# FROM term t, term_synonym s
# WHERE s.term_synonym LIKE %s and s.term_id = t.id
# ORDER BY t.ic ASC
# LIMIT 1;""" # or DESC
# # print "QUERY", query
#
# cur.execute(query, ("%" + term + "%",))
#
# res = cur.fetchone()
# if res is not None:
# print res
# else:
# query = """SELECT DISTINCT t.acc, t.name, p.name
# FROM term t, prot p, prot_GOA_BP a
# WHERE p.name LIKE %s and p.id = a.prot_id and a.term_id = t.id
# ORDER BY t.ic ASC
# LIMIT 1;""" # or DESC
# cur.execute(query, (term,))
# res = cur.fetchone()
# print res
# token = Token2("IL-2")
# token.start, token.dstart, token.end, token.dend = 0,0,0,0
# p = ProteinEntity([token], "", text=sys.argv[1])
# p.normalize()
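def register_page():
    """Register a new user from the POSTed form and log them in.

    On a valid POST the username is checked for uniqueness, the row is
    inserted with the sha256-crypt password hash, the session is opened and
    the user is redirected to ``dashboard``.  Otherwise the form is
    re-rendered.  Unexpected errors are logged and the form re-rendered (the
    original returned ``str(e)`` to the client).

    Returns:
        A Flask response.
    """
    import logging  # function-scope: the module's import block is not in view
    form = RegistrationForm(request.form)
    try:
        if request.method == "POST" and form.validate():
            # Pull data from html form
            username = form.username.data
            email = form.email.data
            # Hash immediately; the plaintext password is never stored
            password = sha256_crypt.encrypt((str(form.password.data)))
            # Connect to database
            cursor, conn = connection()
            try:
                # Parameter binding: the driver quotes the value.
                # NOTE(review): if inject_attk_check() is escape_string()-based
                # its output is escaped a second time here and the stored
                # username/email keep the backslashes; passing the raw values
                # to execute() would avoid that.
                un_attempt = cursor.execute("SELECT * FROM users WHERE username = (%s)",
                                            (inject_attk_check(username),))
                # execute() returns the number of matched rows; the original
                # called len() on that int, which always raised.
                if int(un_attempt) > 0:
                    flash("That username is already taken, please try another")
                    return render_template('register.html', form=form)
                # Parameters are one sequence, not separate positional args.
                cursor.execute("INSERT INTO users(username, password, email) VALUES (%s, %s, %s)",
                               (inject_attk_check(username), inject_attk_check(password), inject_attk_check(email)))
                # Commit changes to database
                conn.commit()
            finally:
                # Close cursor and connection
                cursor.close()
                conn.close()
                # Garbage collect after closing database connections. This is to ensure we don't have any leaks.
                gc.collect()
            flash("Thanks for registering")
            session["logged_in"] = True
            session['username'] = username
            return redirect(url_for('dashboard'))
        return render_template("register.html", form=form)
    except Exception:
        # Logged server-side; the client only gets the form back.
        logging.exception("register_page failed")
        return render_template("register.html", form=form)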
# Check to make sure we only run the web server when this file is run directly