Python MySQLdb 模块,Connection() 实例源码
我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用MySQLdb.Connection()。
def db_connect(self):
    """Open a MySQL connection and cursor while holding ``self.db_lock``.

    On success the connection is stored in ``self.conn`` and a cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is logged via ``log_db``
    and not re-raised; the cursor is then requested from whatever
    ``self.conn`` already holds (``AttributeError`` if it was never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host,
                port=self.port,
                user=self.user,
                passwd=self.passwd,
                charset=self.charset,
                use_unicode=False)
        except MySQLdb.Error as e:
            log_db.error('connect error:' + str(e))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
        # NOTE(review): the message text looks encoding-damaged; it is
        # runtime output, so it is kept exactly as found.
        log_db.warning("???????")
    finally:
        # Always hand the lock back, even when connecting or cursor
        # creation raised, so later callers cannot block forever.
        self.db_lock.release()
def db_connect(self):
    """Open a MySQL connection and cursor while holding ``self.db_lock``.

    On success the connection is stored in ``self.conn`` and a cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is logged via ``log_db``
    and not re-raised; the cursor is then requested from whatever
    ``self.conn`` already holds (``AttributeError`` if it was never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host,
                port=self.port,
                user=self.user,
                passwd=self.passwd,
                charset=self.charset,
                use_unicode=False)
        except MySQLdb.Error as e:
            log_db.error('connect error:' + str(e))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
        # NOTE(review): the message text looks encoding-damaged; it is
        # runtime output, so it is kept exactly as found.
        log_db.warning("???????")
    finally:
        # Always hand the lock back, even when connecting or cursor
        # creation raised, so later callers cannot block forever.
        self.db_lock.release()
def db_connect(self):
    """Open a MySQL connection and cursor while holding ``self.db_lock``.

    On success the connection is stored in ``self.conn`` and a cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is printed and not
    re-raised; the cursor is then requested from whatever ``self.conn``
    already holds (``AttributeError`` if it was never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host,
                port=self.port,
                user=self.user,
                passwd=self.passwd,
                charset=self.charset)
        except MySQLdb.Error as e:
            print('connect error:' + str(e))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
        # NOTE(review): the message text looks encoding-damaged; it is
        # runtime output, so it is kept exactly as found.
        print("???????")
    finally:
        # Release the lock on every exit path, including exceptions.
        self.db_lock.release()
def get_connect(self):
    """Connect to MySQL and create a cursor while holding ``self.db_lock``.

    Stores the connection in ``self.conn`` and its cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is printed and not
    re-raised; the cursor is then requested from whatever ``self.conn``
    already holds (``AttributeError`` if it was never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host, user=self.user, passwd=self.passwd,
                charset=self.charset)
        except MySQLdb.Error as e:
            print('get_connect error_info: %d: %s' % (e.args[0], e.args[1]))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
        # Disabled in the original: force utf8 for the session's
        # connection, result and client character sets.
        # try:
        #     self.cursor.execute('SET character_set_connection=utf8, character_set_results=utf8, character_set_client=utf8')
        # except MySQLdb.Error as e:
        #     print(str(datetime.datetime.now()).split(".")[0], "ERROR %d: %s" % (e.args[0], e.args[1]))
    finally:
        # Release the lock on every exit path, including exceptions.
        self.db_lock.release()
# ???????
def get_data():
    """Load the newest ``t_data`` row into the module-level ``data_dict``.

    Connects to the local ``mms`` database, reads the row whose
    ``item_id`` is the table maximum, and pushes each column into the
    matching ``data_dict`` entry through that entry's ``set`` method.
    ``data_dict['location']`` is always set to ``0``.

    Returns:
        The updated ``data_dict``.

    Raises:
        IndexError: if the query returns no rows (empty table).
    """
    # (data_dict key, result column); a column of None means "set 0".
    field_map = (
        ('humidity', 'humidity'),
        ('temperature', 'temperature'),
        ('pressure', 'pressure'),
        ('sea_level_press', 'sea_level_press'),
        ('illuminance', 'illuminance'),
        ('API', 'api'),
        ('location', None),
        ('altitude', 'altitude'),
        ('rainfall', 'rainfall'),
    )

    conn = MySQLdb.Connection(host="localhost", user="root", passwd="123456",
                              charset="UTF8")
    # Nested try/finally so the cursor and the connection are closed even
    # when the query or row access raises.
    try:
        conn.select_db('mms')
        cursor = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
        try:
            cursor.execute("select * from t_data where item_id = (select max(item_id)from t_data)")
            conn.commit()
            row = cursor.fetchall()[0]
        finally:
            cursor.close()
    finally:
        conn.close()

    for key, column in field_map:
        data_dict[key].set(0 if column is None else row[column])
    return data_dict
def get_connect(self):
    """Connect to MySQL and create a cursor while holding ``self.db_lock``.

    Stores the connection in ``self.conn`` and its cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is printed with a
    timestamp and not re-raised; the cursor is then requested from
    whatever ``self.conn`` already holds (``AttributeError`` if it was
    never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host, user=self.user, passwd=self.passwd,
                charset=self.charset)
        except MySQLdb.Error as e:
            # Timestamp without the fractional seconds.
            stamp = str(datetime.datetime.now()).split(".")[0]
            print("%s ERROR %d: %s" % (stamp, e.args[0], e.args[1]))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
    finally:
        # Release the lock on every exit path, including exceptions.
        self.db_lock.release()
#?????
def get_connect(self):
    """Connect to MySQL and create a cursor while holding ``self.db_lock``.

    Stores the connection in ``self.conn`` and its cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is printed with a
    timestamp and not re-raised; the cursor is then requested from
    whatever ``self.conn`` already holds (``AttributeError`` if it was
    never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host, user=self.user, passwd=self.passwd,
                charset=self.charset)
        except MySQLdb.Error as e:
            # Timestamp without the fractional seconds.
            stamp = str(datetime.datetime.now()).split(".")[0]
            print("%s ERROR %d: %s" % (stamp, e.args[0], e.args[1]))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
    finally:
        # Release the lock on every exit path, including exceptions.
        self.db_lock.release()
#?????
def get_connect(self):
    """Connect to MySQL and create a cursor while holding ``self.db_lock``.

    Stores the connection in ``self.conn`` and its cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is printed with a
    timestamp and not re-raised; the cursor is then requested from
    whatever ``self.conn`` already holds (``AttributeError`` if it was
    never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host, user=self.user, passwd=self.passwd,
                charset=self.charset)
        except MySQLdb.Error as e:
            # Timestamp without the fractional seconds.
            stamp = str(datetime.datetime.now()).split(".")[0]
            print("%s ERROR %d: %s" % (stamp, e.args[0], e.args[1]))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
    finally:
        # Release the lock on every exit path, including exceptions.
        self.db_lock.release()
# ?????
def get_connect(self):
    """Connect to MySQL and create a cursor while holding ``self.db_lock``.

    Stores the connection in ``self.conn`` and its cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is printed with a
    timestamp and not re-raised; the cursor is then requested from
    whatever ``self.conn`` already holds (``AttributeError`` if it was
    never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host, user=self.user, passwd=self.passwd,
                charset=self.charset)
        except MySQLdb.Error as e:
            # Timestamp without the fractional seconds.
            stamp = str(datetime.datetime.now()).split(".")[0]
            print("%s ERROR %d: %s" % (stamp, e.args[0], e.args[1]))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
    finally:
        # Release the lock on every exit path, including exceptions.
        self.db_lock.release()
# ?????
def saved(res, database='1', db_name='test', username='root', passwd='a8416459'):
    """Store the URLs in *res* in MySQL or MongoDB, replacing earlier rows.

    Args:
        res: iterable of URL strings to store.
        database: ``'1'`` selects MySQL; any other value selects MongoDB.
        db_name: name of the MySQL database / MongoDB database to use.
        username: MySQL user name (unused by the MongoDB branch).
        passwd: MySQL password (unused by the MongoDB branch).

    Returns:
        True when every URL was stored, False when an error occurred.
        The error itself is printed, not raised.
    """
    if database == '1':
        # Pre-bind both handles so the cleanup below cannot hit an
        # unbound name when the connection attempt itself fails.
        conn = None
        cursor = None
        try:
            conn = MySQLdb.Connection(
                user=username, passwd=passwd, db=db_name, use_unicode=True)
            cursor = conn.cursor()
            # "if exists" lets the very first run succeed when the table
            # has not been created yet.
            cursor.execute('drop table if exists urls')
            # NOTE(review): ID is a tinyint and URL a varchar(50), so long
            # URL lists or long URLs may be rejected or truncated.
            cursor.execute(
                'create table if not exists urls(ID tinyint primary key,URL varchar(50),CTIME timestamp not null default current_timestamp)')
            for row_id, url in enumerate(res, 1):
                cursor.execute(
                    "insert into urls (ID,URL) values (%s,%s)",
                    [str(row_id), url])
            # Inserts are not auto-committed by default; without this a
            # transactional engine discards them when the connection closes.
            conn.commit()
            return True
        except Exception as e:
            print(e)
            return False
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()
    else:  # MongoDB
        ISOTIMEFORMAT = '%Y-%m-%d %X'
        conn = None
        try:
            conn = pymongo.MongoClient(host='localhost', port=27017)
            coll = conn[db_name]['urls']
            # Empty the collection before inserting the new URLs.
            coll.remove()
            for url in res:
                coll.insert({'URL': url, 'CTIME': time.strftime(
                    ISOTIMEFORMAT, time.localtime(time.time()))})
            return True
        except Exception as e:
            print(e)
            # Report failure, matching the MySQL branch.
            return False
        finally:
            if conn is not None:
                conn.close()
def get_connect(self):
    """Connect to MySQL and create a cursor while holding ``self.db_lock``.

    Stores the connection in ``self.conn`` and its cursor in
    ``self.cursor``. If ``self.db_lock.acquire()`` returns a false value
    nothing is done.

    A ``MySQLdb.Error`` raised while connecting is printed with a
    timestamp and not re-raised; the cursor is then requested from
    whatever ``self.conn`` already holds (``AttributeError`` if it was
    never set).

    Raises:
        NameError: if the connection yields a falsy cursor.
    """
    if not self.db_lock.acquire():
        return
    try:
        try:
            self.conn = MySQLdb.Connection(
                host=self.host, user=self.user, passwd=self.passwd,
                charset=self.charset)
        except MySQLdb.Error as e:
            # Timestamp without the fractional seconds.
            stamp = str(datetime.datetime.now()).split(".")[0]
            print("%s ERROR %d: %s" % (stamp, e.args[0], e.args[1]))
        self.cursor = self.conn.cursor()
        if not self.cursor:
            # Raise an exception instance; raising a tuple drops the
            # message on Python 2 and is a TypeError on Python 3.
            raise NameError("Connect failure")
    finally:
        # Release the lock on every exit path, including exceptions.
        self.db_lock.release()
# ?????