我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用MySQLdb.escape_string()。
def login_page(): try: error = None c, conn = connection() if request.method == 'POST': username = escape_string(request.form['username']).decode() data = c.execute('SELECT * FROM users WHERE username = ("%s");' % username) data = c.fetchone() if sha256_crypt.verify(request.form['password'], data[2]) and (data[1] == username): session['logged_in'] = True session['username'] = username session['favourites'] = data[4] flash('You are now logged in') return redirect(url_for('user_page')) else: error = 'Invalid credentials, try again' gc.collect() return render_template('login.html', error=error) except: error = 'Invalid credentials, try again' return render_template('login.html', error=error)
def addrecipe(): if request.method == 'POST': title = escape_string(request.form['title']) location = escape_string(request.form['country']) ingredients = escape_string(','.join(request.form['ingredients'].split('\r\n')).strip(',')) recipe = escape_string(request.form['recipe']) username = session['username'] c, conn = connection() c.execute('INSERT INTO recipes (title, location, ingredients, recipe, user) VALUES ("%s", "%s", "%s", "%s", "%s");' % (title, location, ingredients, recipe, username)) conn.commit() # Save to the database flash("Thanks for your recipe :)") c.close() conn.close() gc.collect() # Garbage collection return redirect(url_for('newrecipe')) else: return render_template('main.html')
def find_chebi_term2(term): if _platform == "linux" or _platform == "linux2": # linux cp = "{0}/florchebi.jar:{0}/mysql-connector-java-5.1.24-bin.jar:{0}/Tokenizer.jar".format(florchebi_path) elif _platform == "win32": # "Windows..." cp = "{0}/florchebi.jar;{0}/mysql-connector-java-5.1.24-bin.jar;{0}/Tokenizer.jar".format(florchebi_path) florcall = ["java", "-cp", cp, "xldb.flor.match.FlorTextChebi3star", db.escape_string(term), "children", "true", "mychebi201301", "false", "false", "chebi", stoplist, "1"] # print ' '.join(florcall) flor = Popen(florcall, stdout=PIPE) florresult, error = flor.communicate() chebires = florresult.strip().split('\t') # print "chebires: ", chebires if len(chebires) == 3: return (chebires[0], chebires[1], float(chebires[2])) else: return ('0', 'null', 0.0)
def __call__(self, *args, **kwargs): super(TypeFilter, self).__call__() direct_type = self.rule.direct_type # ????????????????????????? if isinstance(self.value, direct_type) and self.rule.safe: return self.value if direct_type == str: if self.rule.safe: return self.value else: import MySQLdb self.value = MySQLdb.escape_string(self.value) if isinstance(self.value, bytes): self.value = self.value.decode('utf-8') return self.value # ???????bool?? elif direct_type == bool and self.value in _false_str_list: return False else: try: return self.rule.direct_type(self.value) except ValueError: raise ParamsValueError(self.error_code, filter=self)
def _encode(obj): if obj is None: return 'NULL' elif isinstance(obj, (bool, int, long, float)): return str(obj) elif isinstance(obj, basestring): # SECURITY NOTE: PAY SPECIAL CARE THIS WHEN CONNECTION IS NOT utf-8 # CHECK THE SAFETY OF THE ENCODING: # # encoding = 'utf-8' # p = ['\\', '"', "'"] # for i in range(0x110000): # c = unichr(i) # try: # e = c.encode(encoding) # except UnicodeEncodeError: # pass # else: # if any(map(lambda q: q in e, p)) and c not in p: # print i, c # # DO NOT USE THIS IF ANYTHING IS IN THE OUTPUT return "'%s'" % MySQLdb.escape_string(_strify(obj)) else: import json return _encode(json.dumps(obj))
def _encode_name(identifier): if isinstance(identifier, RawSQL): return identifier.sql elif isinstance(identifier, list): return ','.join([_encode_name(item) for item in identifier]) return '`%s`' % MySQLdb.escape_string(_strify(identifier))
def _escape_string(self, item): return MySQLdb.escape_string(unicode(item).encode('utf-8')) # pylint: disable=no-member
def _str_escape(s, d): if s == None: return '' return MySQLdb.escape_string(s)
def mysql_escape(f): @wraps(f) def decorated_function(*args, **kwargs): newargs = [] #??????????? for arg in args: #???????? if type(arg) is types.StringType or type(arg) is types.UnicodeType: newargs.append(MySQLdb.escape_string(arg)) #?? elif isinstance(arg, dict): newargs.append(MySQLdb.escape_dict(arg, { types.StringType: _str_escape, types.UnicodeType: _str_escape, types.IntType: _no_escape, types.FloatType: _no_escape })) #??????? else: newargs.append(arg) newargs = tuple(newargs) func = f(*newargs, **kwargs) return func return decorated_function
def reply(): req_msg = request.form['msg'] res_msg = '^_^' # ensure not Chinese match = zhPattern.search(req_msg) if match: res_msg = "Sorry, I can't speak Chinese right now, maybe later." else: res_msg = execute.decode_line(sess, model, enc_vocab, rev_dec_vocab, req_msg ) res_msg = res_msg.replace('_UNK', '^_^') # ensure not empty if res_msg == '': res_msg = 'Let me think about it ...' return jsonify( { 'text': res_msg } ) #insert msg to db sql = "insert into t_dialogs(dialog_type, dialog_time, req_msg, res_msg, req_user, res_user, remark) values('webpage',%d,'%s','%s','%s','%s','')" cur = conn.cursor() cur.execute(sql % (int(time.time()), MySQLdb.escape_string(req_msg), MySQLdb.escape_string(res_msg), 'websession', 'easybot')) conn.commit() conn.close() # Wechat auth
def dbInsert_st(self, table, param): field = ",".join(param.keys()) field_v = ",".join(["'%s'" % MySQLdb.escape_string(k) for k in param.values()]) #field_v = '' sql = "INSERT INTO %s(%s) VALUES (%s)" % ("%s.%s" % (self.dbname, table), field, field_v) #print sql #filename='111.txt' #fp=open(filename,'a+') #fp.write(sql) self.cursor.execute(sql) self.db.commit() return self.getRecord("SELECT LAST_INSERT_ID()", 1) #????
def genstr(str1): if str1: return "'" + MySQLdb.escape_string(str1) + "'" else: return "''"
def register_page(): form = RegistrationForm(request.form) try: if request.method == 'POST' and form.validate(): username = form.username.data email = form.email.data password = sha256_crypt.encrypt(str(form.password.data)) c, conn = connection() x = c.execute('SELECT * FROM users WHERE username = ("%s");' % escape_string(username)) if int(x) > 0: flash('That username is already taken, please choose another') return render_template('register.html', form=form) else: c.execute('INSERT INTO users (username, password, email) VALUES ("%s", "%s", "%s");' % (escape_string(username), escape_string(password), escape_string(email))) conn.commit() flash('Thanks for registering!') c.close() conn.close() gc.collect() session['logged_in'] = True session['username'] = username return redirect(url_for('favourites_page')) return render_template('register.html', form=form) except Exception as e: return render_template('register.html', form=form)
def edit_recipe(rid): # Get the recipe # c.execute('INSERT INTO recipes (title, location, ingredients, recipe, user) VALUES ("%s", "%s", "%s", "%s", "%s");' % c, conn = connection() _ = c.execute('SELECT * FROM recipes WHERE rid="%s"' % rid) recipe = c.fetchone() c.close() conn.close() gc.collect() # Fill the form form = RecipeForm(request.form) form.title.data = recipe[1] form.country.data = recipe[2] form.ingredients.data = '\n'.join(recipe[3].split(',')) form.recipe.data = recipe[4] if request.method == 'POST': title = escape_string(request.form['title']) country = escape_string(request.form['country']) ingredients = escape_string(','.join(request.form['ingredients'].split('\r\n')).strip(',')) recipe = escape_string(request.form['recipe']) # Update the DB c, conn = connection() c.execute('UPDATE recipes SET title="%s", location="%s", ingredients="%s", recipe="%s" WHERE rid=%s' % (title, country, ingredients, recipe, rid)) conn.commit() # Close connection c.close() conn.close() gc.collect() flash('Recipe updated') return redirect(url_for('user_page')) return render_template('edit_recipe.html', form=form)
def sql_escape(self, value): if isinstance(value, str): return MySQLdb.escape_string(value).decode("utf-8") elif isinstance(value, bytes): try: return self.sql_escape(value.decode("utf-8")) except: return value else: return value
def safe(self,s): return MySQLdb.escape_string(s)
def sqlForNonNone(self, value): """MySQL provides a quoting function for string -- this method uses it.""" return "'" + MySQLdb.escape_string(value) + "'"
def __init__(self, datetime, cmd): self.msg = str(datetime) + '\r\r' + cmd + '\r\n\r\n\r\n' if is_log2db: try: global log_id db._ensure_connected() db.insert( "INSERT INTO ttylog (`datetime`,`cmd`,`log_id`) VALUES ('%s','%s','%d')" % (str(datetime), MySQLdb.escape_string(cmd), log_id)) except Exception as err: pass
def __init__(self, host, cmd, remote_ip, result): self.msg = str( host) + '\r\r' + cmd + '\r\r' + remote_ip + '\r\r' + str(result) sql = "INSERT INTO execlog (`user`,`host`,`cmd`,`remote_ip`,`result`) VALUES (%s,%s,%s,%s,%s)" if is_log2db: try: db._ensure_connected() db.insert(sql, user, host, MySQLdb.escape_string(cmd), remote_ip, MySQLdb.escape_string(str(result))) except Exception as err: pass
def __init__(self, host, filename, type, remote_ip, result): self.msg = str( host ) + '\r\r' + filename + '\r\r' + type + '\r\r' + remote_ip + '\r\r' + str( result) + '\r\n\r\n\r\n' if is_log2db: try: db._ensure_connected() db.insert( "INSERT INTO filelog (`user`,`host`,`filename`,`type`,`remote_ip`,`result`) VALUES ('%s','%s','%s','%s','%s','%s')" % (user, host, filename, type, remote_ip, MySQLdb.escape_string(str(result)))) except Exception as err: pass
def init_database(host, user, passwd, dbname): warnings.filterwarnings('ignore', message = "Table.*already exists") warnings.filterwarnings('ignore', message = "Can't create.*database exists") #???if not exists???? = = db = MySQLdb.connect(host, user, passwd) tx = db.cursor() tx.execute('set names utf8mb4') tx.execute('create database if not exists `%s`default charset utf8mb4\ default collate utf8mb4_general_ci;' % MySQLdb.escape_string(dbname)) #????????? #???MySQLdb???????? ?????? db.select_db(dbname) tx.execute("create table if not exists thread(\ id BIGINT(12), title VARCHAR(100), author VARCHAR(30), reply_num INT(4),\ good BOOL, PRIMARY KEY (id)) CHARSET=utf8mb4;") tx.execute("create table if not exists post(\ id BIGINT(12), floor INT(4), author VARCHAR(30), content TEXT,\ time DATETIME, comment_num INT(4), thread_id BIGINT(12),PRIMARY KEY (id),\ FOREIGN KEY (thread_id) REFERENCES thread(id)) CHARSET=utf8mb4;") tx.execute("create table if not exists comment(id BIGINT(12),\ author VARCHAR(30), content TEXT, time DATETIME, post_id BIGINT(12),\ PRIMARY KEY (id), FOREIGN KEY (post_id) REFERENCES post(id)) CHARSET=utf8mb4;") db.commit() db.close() warnings.resetwarnings() warnings.filterwarnings('ignore', message = ".*looks like a ") # bs.get_text???url?????????
def wooyun(pages=0): searchword = request.args.get('key', '').strip() log_id = request.args.get('id', '').strip() data = {} table = list() cursor = conn() if log_id: # ??execute????SQL?? cursor.execute(MySQLdb.escape_string("SELECT * from {MYSQL_TABLES} where gid={log_id}" .format(MYSQL_TABLES=MYSQL_TABLES,log_id=log_id))) # ?? fetchone() ?????????? results = cursor.fetchone() data["id"] = results[0] data["text"] = results[2] data["title"] = results[1] if searchword: sql = 'SELECT gid,title from {MYSQL_TABLES} where title like "%{searchword}%"'\ .format(MYSQL_TABLES=MYSQL_TABLES, searchword=searchword) cursor.execute(sql) results = cursor.fetchall() for rows in results: tdata = {"id": rows[0], "title": rows[1]} table.append(tdata) cursor.close() return render_template("wooyun.html", title="??????", data=data, table=table) # ??wooyun???? -??????
def __get_stock_ltgd(self, stock): #10????? s_code = stock['s_code'].upper() self.curl_get('https://xueqiu.com/8205215793') url = 'https://xueqiu.com/stock/f10/otsholder.json?symbol=%s&page=1&size=4&_=1472904975952' % s_code _data = self.curl_get(url) re = json.loads(_data) if re['list'] is None: print "==========" return 1 for i in range(0, len(re['list'])): one = re['list'][i] for j in range(0, len(one['list'])): chg = one['list'][j]['chg'] if chg is None: chg = 0 sh_code = one['list'][j]['shholdercode'] if sh_code is None: sh_code = 0 name = one['list'][j]['shholdername'].replace("\\", "") name = name.replace("'", "") indata = { 'report_date': one['list'][j]['publishdate'], 'end_date': one['list'][j]['enddate'], 's_code': s_code, 'sh_code': sh_code, 'sh_name': MySQLdb.escape_string(name), 'sh_type': one['list'][j]['shholdertype'], 'sh_rank': one['list'][j]['rank2'], 'sh_shares': one['list'][j]['holderamt']/10000, 'sh_shares_p': one['list'][j]['pctoffloatshares'], 'sh_shares_a_p': one['list'][j]['holderrto'], 'sh_equity_type': one['list'][j]['shholdernature'], 'ishis': one['list'][j]['ishis'], 'chg': chg, } if int(indata['end_date']) <= 20140930: continue _where = "s_code='%s' and end_date=%s and sh_name='%s'" % (s_code, one['list'][j]['enddate'], name) _has = self.mysql.fetch_one("select * from s_stock_shareholder where %s" % _where) if _has is not None: self.mysql.dbUpdate('s_stock_shareholder', indata, _where) else: self.mysql.dbInsert('s_stock_shareholder', indata) print indata
def moviedb(i,globalmovieid,title,datelist,genre,content_rating,ratings,rating_value,plot,link,poster): MySQLdb.escape_string("'") plot = remove_all_special_chars(plot) #db utils db = getCursor() cur = db.cursor() #check if movie already in database cur.execute("select count(1) from Movie where imdbid = %s or movieid = %s ", [i,globalmovieid]) if cur.fetchone()[0]: print 'Movie exits' #if record exists do nothing ,its movie,movie never changes else: print 'Movie not exists' print 'insert..' if len(datelist) < 4: datelist = ["1","January","1971"] #if movie is less than 2 months old #insert into movie if(is_date_older_2months(datelist) == False): print globalmovieid,i,title print ' '.join(datelist) if not rating_value: rating_value = 5 if not content_rating: content_rating = "R" sql = "insert into Movie(movieid,imdbid,title,plot,altplot,date,year,month,day,genre,ratings,ratingvalue,contentrating,poster) " \ "values('%s','%s','%s','%s','%s','%s','%d','%s','%s','%s','%s','%s','%s','%s') " % \ (globalmovieid,i,title.strip(),plot.strip()," ",' '.join(datelist),int(datelist[2]),month_to_int(datelist[1]),0,genre,int(ratings),float(rating_value),content_rating,poster) try: cur.execute(sql) db.commit() except MySQLdb.Error, e: db.rollback() print e db.close() else: print 'do nothing' #coming soon will be maintained by other program
def run(self): try: self.conn = MySQLdb.connect(host=self.web.host,user=self.web.user,passwd = self.web.passwd,db=self.web.db,charset="utf8",connect_timeout=5) print "connet to %s ,%s\n" % (self.web.host,self.web.db) except: print "can't connet to %s ,%s\n" % (self.web.host,self.web.db) return i = 0 try: # python UCS-4 build????? highpoints = re.compile('[\\x00-\\xFF]{2,4}') except re.error: # python UCS-2 build????? highpoints = re.compile('[\uD800-\uDBFF][\uDC00-\uDFFF]') for index,keyword in enumerate(self.web.keyword): index = index + 1 #???????url for i in range(1,2):#?????? myurl = url % (keyword , i) #????????? food = youkuSGML(self.web.scope) #?????????? try: context = urllib2.urlopen(myurl,timeout=5) content = context.read() except: print "can't read from %s " % myurl continue food.feed(content) self.result += len(food.result) #????????? for clist in food.result: score = round(random.random(),2)*10 scoreer = random.randint(10,100) atime = int(time.time()) e = False if clist["title"] == '': continue clist["title"] = MySQLdb.escape_string(clist["title"]) sql = "insert into gx_video(`cid`,`intro`,`title`,`picurl`,`playurl`,`score`,`scoreer`,`keywords`,`color`,`actor`,`director`,`content`,`area`,`language`,`year`,`serial`,`addtime`,`hits`,`monthhits`,`weekhits`,`dayhits`,`hitstime`,`stars`,`status`,`up`,`down`,`downurl`,`inputer`,`reurl`,`letter`,`genuine`) values (%d,'',\'%s\',\'%s\',\'%s\',%d,%d,'','','','','','','',0,0,%d,0,0,0,0,0,0,1,0,0,'','','','',0)" % (index,clist["title"],clist["pic"],clist["link"],score,scoreer,atime) print sql try: try: self.conn.ping() except Exception,e: try: self.conn = MySQLdb.connect(host=self.web.host,user=self.web.user,passwd = self.web.passwd,db=self.web.db,charset="utf8",connect_timeout=5) print "Reconnet to %s ,%s\n" % (self.web.host,self.web.db) except: print "can't Reconnet to %s ,%s\n" % (self.web.host,self.web.db) e = True if not e: self.conn.query(sql) self.rfinally += 1 except:print sql+"/n" print "%s get %d results\n and %s insert successfully" % (self.web.db,self.result,self.rfinally) self.conn.close()
def get_best_go(self): cur = db.cursor() # synonym query = """SELECT DISTINCT t.acc, t.name, t.ic FROM term t WHERE t.acc IN (%s) ORDER BY t.ic ASC LIMIT 1;""" # or DESC # print "QUERY", query format_strings = ','.join(['%s'] * len(self.go_ids)) cur.execute(query % format_strings, (self.go_ids)) res = cur.fetchone() if res is not None: # print self.text, res[1:] logging.info("best GO for {}: {}".format(self.text, " ".join([str(r) for r in res]))) self.best_go = res[0] else: logging.info("NO GO for {}".format(self.text)) self.best_go = "" # def normalize(self): # term = MySQLdb.escape_string(self.text) # # adjust - adjust the final score # match = () # cur = db.cursor() # # synonym # query = """SELECT DISTINCT t.acc, t.name, s.term_synonym # FROM term t, term_synonym s # WHERE s.term_synonym LIKE %s and s.term_id = t.id # ORDER BY t.ic ASC # LIMIT 1;""" # or DESC # # print "QUERY", query # # cur.execute(query, ("%" + term + "%",)) # # res = cur.fetchone() # if res is not None: # print res # else: # query = """SELECT DISTINCT t.acc, t.name, p.name # FROM term t, prot p, prot_GOA_BP a # WHERE p.name LIKE %s and p.id = a.prot_id and a.term_id = t.id # ORDER BY t.ic ASC # LIMIT 1;""" # or DESC # cur.execute(query, (term,)) # res = cur.fetchone() # print res # token = Token2("IL-2") # token.start, token.dstart, token.end, token.dend = 0,0,0,0 # p = ProteinEntity([token], "", text=sys.argv[1]) # p.normalize()
def register_page(): try: form = RegistrationForm(request.form) if request.method == "POST" and form.validate(): # Pull data from html form username = form.username.data email = form.email.data # Immediately encrypt via sha256 password = sha256_crypt.encrypt((str(form.password.data))) # Connect to database cursor, conn = connection() # Using cursor, select a username in database. inject_attk_check() protects against sql injection. un_attempt = cursor.execute("SELECT * FROM users WHERE username = (%s)", (inject_attk_check(username))) # Check to see if username is taken by searching for username in db first. # If returned value is longer than 0 then the username is already taken. if len(int(un_attempt)) > 0: # Call to flask.flash() flash("That username is already taken, please try another") render_template('register.html', form=form) else: cursor.execute("INSERT INTO users(username, password, email) VALUES (%s, %s, %s)", inject_attk_check(username), inject_attk_check(password), inject_attk_check(email)) # Commit changes to database conn.commit() flash("Thanks for registering") # Close cursor and connection cursor.close() conn.close() # Garbage collect after closing database connections. This is to ensure we don't have any leaks. gc.collect() session["logged_in"] = True session['username'] = username return redirect(url_for('dashboard')) return render_template("register.html", form=form) # fix this after debugging except Exception as e: return str(e) # Check to make sure we only run the web server when this file is run directly