Python MySQLdb 模块,connect() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用MySQLdb.connect()。
def checkUserDbGrants(self, username):
if self.cursor is None:
self.connect()
self.cursor.execute("select Drop_priv, Grant_priv, References_priv,Create_tmp_table_priv,"
"Lock_tables_priv,Create_routine_priv,Alter_routine_priv,Execute_priv,"
"Event_priv,Trigger_priv from mysql.db "
"where User ='%s'" % username)
res = self.cursor.fetchall()
flag = 0
for row in res:
# print row
for key, value in row.items():
if value == 'Y':
flag = 1
LOg.log_warn('setting %s = %s is suggested!' % (key, 'N'))
if (flag == 0):
Log.log_pass('All the setting are approriate!')
def createDefCon(self):
try:
host = "127.0.0.1"
port = 3306 ### default mysql port, change if you know better
user = "krisk" ### def parameters
passwd = "kish" ### def parameters
db = "loginfo" ### connection.user_info contains the autho users
### Create a connection object, use it to create a cursor
con = MySQLdb.connect(host = host ,port = port , user = user,passwd = passwd ,db = db)
return con ### returns a connection object
except:
return 0;
####################################### Test connection #######################################################
def populate_store(self, store):
try:
connection = None
connection = MySQLdb.connect('localhost', 'annon', 'pass')
cursor = connection.cursor()
cursor.execute("Select * From `INFORMATION_SCHEMA`.`SCHEMATA`")
rows = cursor.fetchall()
for row in rows:
store.append([row[0], row[1], row[2], row[3]])
except MySQLdb.Error, e:
store.append([str(e.args[0]), e.args[1], '', ''])
finally:
if connection != None:
connection.close()
def getdbconnection(configsection, as_dict=False, defaultargs={}):
""" Helper function that gets a database connection object from a config dictionary """
dbengine = configsection['dbengine'] if 'dbengine' in configsection else 'MySQLDB'
if dbengine == 'MySQLDB':
dbargs = defaultargs.copy()
# Convert the external configuration names to python PEP-249 config names
translate = {"host": "host",
"defaultsfile": "read_default_file",
"user": "user",
"pass": "passwd",
"port": "port"}
for confval, myval in translate.iteritems():
if confval in configsection:
dbargs[myval] = configsection[confval]
if as_dict:
dbargs['cursorclass'] = MySQLdb.cursors.DictCursor
return MySQLdb.connect(**dbargs)
else:
raise Exception("Unsupported database engine %s" % (dbengine))
def new_connection(conn_argkw, conv=None, options=None):
# useful arg could be added in future.:
# conn_argkw.init_command
conv = conv or {}
options = options or {}
opt = {
'autocommit': 1,
}
opt.update(options)
conn = MySQLdb.connect(conv=conv, **conn_argkw)
for k, v in opt.items():
conn.query('set {k}={v}'.format(k=k, v=v))
return conn
def cursor(self, autocommit=False, readonly=False):
conv = MySQLdb.converters.conversions.copy()
conv[float] = lambda value, _: repr(value)
conv[MySQLdb.constants.FIELD_TYPE.TIME] = MySQLdb.times.Time_or_None
args = {
'db': self.database_name,
'sql_mode': 'traditional,postgresql',
'use_unicode': True,
'charset': 'utf8',
'conv': conv,
}
uri = parse_uri(config.get('database', 'uri'))
assert uri.scheme == 'mysql'
if uri.hostname:
args['host'] = uri.hostname
if uri.port:
args['port'] = uri.port
if uri.username:
args['user'] = uri.username
if uri.password:
args['passwd'] = urllib.unquote_plus(uri.password)
conn = MySQLdb.connect(**args)
cursor = Cursor(conn, self.database_name)
cursor.execute('SET time_zone = "+00:00"')
return cursor
def list(cursor):
now = time.time()
timeout = config.getint('session', 'timeout')
res = Database._list_cache
if res and abs(Database._list_cache_timestamp - now) < timeout:
return res
cursor.execute('SHOW DATABASES')
res = []
for db_name, in cursor.fetchall():
try:
database = Database(db_name).connect()
except Exception:
continue
cursor2 = database.cursor()
if cursor2.test():
res.append(db_name)
cursor2.close(close=True)
else:
cursor2.close()
database.close()
Database._list_cache = res
Database._list_cache_timestamp = now
return res
def AuthorizeMachineAndConnectDB(db_instance_name, project, username, database,
logfile):
"""This function authorizes a machine to connect to a DB.
Args:
db_instance_name: name of the DB instance.
project: name of the project in which the db_instance belongs to.
username: username that will be used to log in to the DB
database: name of the database to connect to
logfile: logfile for the gcloud command in shell
Returns:
Returns the connection after being connected to the DB.
"""
hostname = GetInstanceIP(db_instance_name, project)
password = utils.GetRandomPassword()
sh_utils.RunGCloudService(["users", "set-password", username, "%",
"--instance", db_instance_name, "--password",
password], project=project,
service="sql", logfile=logfile)
print "DB Usernames and passwords are set."
connection = MySQLdb.connect(host=hostname, user=username,
passwd=password, db=database)
return connection
def getImageName(_index):
global MAX
if _index > MAX:
return "-1"
conn = MySQLdb.connect(IP, DBUSERNAME, DBPWD, DBNAME)
cursor = conn.cursor()
sql = "select url from image where id = %d" % _index
cursor.execute(sql)
url = cursor.fetchone()
print url
if url != None:
url = str(url[0])
items = url.split('/')
if len(items) != 7:
return "#error#"
name = items[-3] + '/' + items[-2]
return name
else:
print "+++None return"
return "#error#"
conn.close()
def __init__(self,host,user,password,db,charset='utf8',logFile=None,color=True,debug=4):
'''
:param host: <class str|host name>
:param user: <class str|user name>
:param password: <class str|password>
:param db: <class str|database name>
:param charset: default='utf8' <class str>
:param logFile: default=None <class str>
:param color: default=True <class bool>
:param debug: default=4 <class int|0 NONE,1 [Error],2 [Error][WARING],3 [Error][WARING][INFO],4 ALL>
'''
self.logFile = logFile
self.color = color
self.debug = debug
self.success = 0
self.fail = 0
self.repeat = 0
self._conn = MYSQL.connect(host,user,password,db,charset=charset)
self._cursor = self._conn.cursor()
def get_data_from_mysql(host: str, username: str, password: str, db: str, sql: str):
""" Run SQL query and get data from MySQL table. """
db = MySQLdb.connect(host, username, password, db)
cursor = db.cursor(MySQLdb.cursors.DictCursor)
try:
cursor.execute(sql)
data = cursor.fetchall()
except Exception as e:
print("MySQL error %s: %s" % (e.args[0], e.args[1]))
data = None
db.close()
return data
def insertBooks(self, title, page, bookQuote):
# connect to MySQL
conn, cursor = self.connect()
self.useGuiDB(cursor)
# insert data
cursor.execute("INSERT INTO books (Book_Title, Book_Page) VALUES (%s,%s)", (title, page))
# last inserted auto increment value
keyID = cursor.lastrowid
# print(keyID)
cursor.execute("INSERT INTO quotations (Quotation, Books_Book_ID) VALUES (%s, %s)", \
(bookQuote, keyID))
# commit transaction
conn.commit ()
# close cursor and connection
self.close(cursor, conn)
#------------------------------------------------------
def insertBooksExample(self):
# connect to MySQL
conn, cursor = self.connect()
self.useGuiDB(cursor)
# insert hard-coded data
cursor.execute("INSERT INTO books (Book_Title, Book_Page) VALUES ('Design Patterns', 17)")
# last inserted auto increment value
keyID = cursor.lastrowid
print(keyID)
cursor.execute("INSERT INTO quotations (Quotation, Books_Book_ID) VALUES (%s, %s)", \
('Programming to an Interface, not an Implementation', keyID))
# commit transaction
conn.commit ()
# close cursor and connection
self.close(cursor, conn)
#------------------------------------------------------
def showBooks(self):
# connect to MySQL
conn, cursor = self.connect()
self.useGuiDB(cursor)
# print results
cursor.execute("SELECT * FROM Books")
allBooks = cursor.fetchall()
print(allBooks)
# close cursor and connection
self.close(cursor, conn)
return allBooks
#------------------------------------------------------
def showData(self):
# connect to MySQL
conn, cursor = self.connect()
self.useGuiDB(cursor)
# execute command
cursor.execute("SELECT * FROM books")
print(cursor.fetchall())
cursor.execute("SELECT * FROM quotations")
print(cursor.fetchall())
# close cursor and connection
self.close(cursor, conn)
#------------------------------------------------------
def showDataWithReturn(self):
# connect to MySQL
conn, cursor = self.connect()
self.useGuiDB(cursor)
# execute command
cursor.execute("SELECT * FROM books")
booksData = cursor.fetchall()
cursor.execute("SELECT * FROM quotations")
quoteData = cursor.fetchall()
# close cursor and connection
self.close(cursor, conn)
# print(booksData, quoteData)
for record in quoteData:
print(record)
return booksData, quoteData
#------------------------------------------------------
def updateGOF(self):
# connect to MySQL
conn, cursor = self.connect()
self.useGuiDB(cursor)
# execute command
cursor.execute("SELECT Book_ID FROM books WHERE Book_Title = 'Design Patterns'")
primKey = cursor.fetchall()[0][0]
print("Primary key=" + str(primKey))
cursor.execute("SELECT * FROM quotations WHERE Books_Book_ID = (%s)", (primKey,))
print(cursor.fetchall())
# close cursor and connection
self.close(cursor, conn)
#------------------------------------------------------
def deleteRecord(self):
# connect to MySQL
conn, cursor = self.connect()
self.useGuiDB(cursor)
try:
# execute command
cursor.execute("SELECT Book_ID FROM books WHERE Book_Title = 'Design Patterns'")
primKey = cursor.fetchall()[0][0]
# print(primKey)
cursor.execute("DELETE FROM books WHERE Book_ID = (%s)", (primKey,))
# commit transaction
conn.commit ()
except:
pass
# close cursor and connection
self.close(cursor, conn)
#==========================================================
def create_table(self):
sql = """
CREATE TABLE """ + self._table_name + """ (
id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
read_timestamp TIMESTAMP NOT NULL DEFAULT "0000-00-00 00:00:00",
write_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
ost CHAR(7) NOT NULL,
ip CHAR(15) NOT NULL,
size BIGINT(20) UNSIGNED NOT NULL,
read_throughput BIGINT(20) SIGNED NOT NULL,
write_throughput BIGINT(20) SIGNED NOT NULL,
read_duration INT(10) SIGNED NOT NULL,
write_duration INT(10) SIGNED NOT NULL,
PRIMARY KEY (id)
) ENGINE=MyISAM DEFAULT CHARSET=latin1
"""
logging.debug("Creating database table:\n" + sql)
with closing(MySQLdb.connect(host=self._host, user=self._user, passwd=self._passwd, db=self._db)) as conn:
with closing(conn.cursor()) as cur:
cur.execute(sql)
def query_single(_db_config, _sql, _args):
config = _db_config
conn = MySQLdb.connect(host=config['host'], port=config['port'], user=config['user'], passwd=config['passwd'],
db=config['db'], charset=config['charset'], use_unicode=True)
cursor = conn.cursor(MySQLdb.cursors.DictCursor)
result = ()
try:
cursor.execute(_sql, _args)
result = cursor.fetchall()
except:
pass
rootLogger.error("query exception sql is %s ,_args is %s,stacks is %s", _sql, _args, get_caller_info_total())
rootLogger.exception("message")
finally:
cursor.close()
conn.close()
return result
# ===============================================
# FUNCTION ??????
# ===============================================
def insertOrUpdate_getId(_db_config, _sql, _args):
result = 0
id = 0
config = _db_config
conn = MySQLdb.connect(host=config['host'], port=config['port'], user=config['user'], passwd=config['passwd'],
db=config['db'], charset=config['charset'], use_unicode=True)
cursor = conn.cursor(MySQLdb.cursors.DictCursor)
try:
cursor.execute(_sql, _args)
id = conn.insert_id()
conn.commit()
result = cursor.rowcount
except:
pass
rootLogger.error("exception sql is %s ,_args is %s", _sql, _args)
rootLogger.exception("message")
conn.rollback()
finally:
print("affected rows = {}".format(cursor.rowcount))
cursor.close()
conn.close()
return result, id
def __init__(self, db_name):
if db_name == 'wiki':
kb_name = 'wiki_kb'
elif db_name == 'baidu':
kb_name = 'kb'
elif db_name == 'hudong':
kb_name = 'hudong_kb'
self.db = MySQLdb.connect(host='192.168.1.104',user="root",passwd="",db=kb_name ,charset="utf8", cursorclass=cursors.SSCursor)
self.cur = self.db.cursor()
self.db_titles = []
self.db_disambiguations = []
print 'db is loading......'
if os.path.exists('title_disambiguation.data'):
with open('title_disambiguation.data','r') as f:
self.db_titles, self.db_disambiguations = pickle.load(f)
else:
self.load_save_db()
print 'db has been loaded !'
def __init__(self, db_name):
if db_name == 'wiki':
kb_name = 'wiki_kb'
elif db_name == 'baidu':
kb_name = 'kb'
elif db_name == 'hudong':
kb_name = 'hudong_kb'
self.db = MySQLdb.connect(host='192.168.1.104',user="root",passwd="",db=kb_name ,charset="utf8", cursorclass=cursors.SSCursor)
self.cur = self.db.cursor()
self.db_titles = []
self.db_disambiguations = []
print 'db is loading......'
if os.path.exists('title_disambiguation.data'):
with open('title_disambiguation.data','r') as f:
self.db_titles, self.db_disambiguations = pickle.load(f)
else:
self.load_save_db()
print 'db has been loaded !'
def search_ID_in_list_and_connect(search_ID):
CUR.execute("SELECT Id, Ip, DeviceName, Session FROM %s" % line[7][:-1])
row = CUR.fetchone()
while row is not None:
rows = {row[0] : row[1]}
type_protocol = row[3]
row = CUR.fetchone()
for key in rows.keys():
if search_ID == key:
rb1 = str(rows.values())
ip_device = rb1[2:-2]
# verify where you try to connect
print 'Connect to: ' + ip_device
# condition which protocol use, view in coloumn session in db
if 'ssh' in type_protocol:
proto_connections.ssh_connections(ip_device)
else:
proto_connections.telnet_connections(ip_device)
CUR.close()
CONN.close()
# main function
def main():
banner()
# search device but is empty return device not found!
try:
userdate = raw_input('Search: ')
while len(userdate) <= 0:
userdate = raw_input('Search: ')
search(userdate)
# search ID for connect but if empty exit
global_ItemFound
search_ID = input('Connect to ID: ')
search_ID_in_list_and_connect(search_ID)
# manage exception KeyboardError ctrl+c and NameError
except KeyboardInterrupt, e:
print "\nexit"
except NameError, e:
print "%s, not found !" % userdate
except EOFError, e:
print "\nEOFError, exit"
except SyntaxError, e:
print "\nSyntaxError, exit"
def execute(self,query,doCommit=False):
'''exec query insert or delete'''
retCount = 0
#self.logger.debug(query)
try:
cursor = self.conn.cursor()
retCount = cursor.execute(query)
except Exception:
#self.logger.debug("query : {0}".format(query))
self.connect()
if self.conn :
cursor = self.conn.cursor()
retCount = cursor.execute(query)
if doCommit :
#self.logger.debug("before commit")
self.commit()
#self.logger.debug("after commit")
return retCount
def execWithRet(self,query):
'''exec query '''
retList = []
try :
cursor = self.conn.cursor()
count = cursor.execute(query)
retList = cursor.fetchmany(count)
except Exception:
#self.logger.debug("query : {0}".format(query))
self.connect()
if self.conn :
cursor = self.conn.cursor()
count = cursor.execute(query)
retList = cursor.fetchmany(count)
return retList
def __init__(self):
self.con = mdb.connect(host=config.mysql_host, user=config.mysql_user, passwd=config.mysql_passwd, db=config.mysql_db, charset=config.mysql_charset)
self.cur = self.con.cursor(mdb.cursors.DictCursor)
self.fromlist = []
self.wherelist = []
self.deletelist = []
self.selectlist = []
self.orderlist = []
self.limitlist = []
self.updatelist = []
self.inslist = []
self.leftjoinlist = []
self.selectas = ''
self.selectas_b = ''
self.sql = ''
self.withtag = False
def __init__(self):
env = C.get('sys', 'env')
host = C.get('mysql_' + env, 'host')
user = C.get('mysql_' + env, 'user')
passwd = C.get('mysql_' + env, 'password')
name = C.get('mysql_' + env, 'name')
self.db = MySQLdb.connect(
host = host,
port = 3306,
user = user,
passwd = passwd,
db = name,
charset='utf8',
cursorclass = MySQLdb.cursors.DictCursor)
self.cursor = self.db.cursor()
def bannerread(host,port):
try:
if port==80 or port==8443 or port==2089 or port==10000:
con = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
con.connect((host,port))
con.send("GET / HTTP/1.1\r\n\r\n\r\n")
data = (con.recv(200))
dbcon(host,port,data)
return(data)
elif port==53:
pkt = IP(dst=host)/UDP(dport=port,sport=RandShort())/DNS(aa=0L, qr=0L, qd=DNSQR(qclass=3, qtype=16, qname='version.bind.'))
x = sr1(pkt)
ban = x[DNS].summary()
dbcon(host,port,data)
return(ban)
else:
conexion = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
conexion.settimeout(1.0) #Timeout - socket non-blocking
conexion.connect((host,port))
banner = conexion.recv(1024)
dbcon(host,port,banner)
return(banner)
except:
return("No Banner")
def CheckConn(self,port):
retry_num = 0
while True:
try:
local_conn = MySQLdb.connect(host='127.0.0.1', user=mysql_user, passwd=mysql_password, port=int(port), db='',charset="utf8")
local_conn.cursor()
local_conn.close()
state = True
break
except MySQLdb.Error,e:
logging.error(e)
state = None
retry_num += 1
time.sleep(1)
if retry_num >= 3:
break
return state
def sabo_db_connect(db_conf):
ip = db_conf["db_addr"]
port = db_conf["db_port"]
user = db_conf["db_user"]
passwd = db_conf["db_passwd"]
db = db_conf["db_name"]
charset = "utf8"
for i in range(db_conf["db_retry"]):
try:
mysql_db = MySQLdb.connect(host=ip, port=port, user=user, passwd=passwd, db=db, charset=charset)
except Exception as e:
sabo_error_log("error", "Connect to mysql failed: {0}".format(e))
else:
return mysql_db
# if there are some tasks didn't be judged,
# then the process will be blocked until the queue is empty.
def execute_sql(sql):
connection, cursor = None, None
try:
connection = MySQLdb.connect(host=settings.inception_host,
user=settings.inception_user,
passwd=settings.inception_password,
port=settings.inception_port,
use_unicode=True, charset="utf8", connect_timeout=2)
cursor = connection.cursor()
cursor.execute(sql)
return cursor.fetchall()
finally:
if (cursor is not None):
cursor.close()
if (connection is not None):
connection.close()
return []
def sql_init(ip,username,password,cmd):
try:
client = paramiko.SSHClient()
# Default accept unknown keys
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Connect
client.connect(ip, port=22, username=username, password=password,timeout="20s")
# Execute shell remotely
stdin, stdout, stderr = client.exec_command(cmd)
stdout.read()
print stdout
client.close()
#sftp = client.open_sftp()
# Make a dir
#sftp.mkdir('/home/testssh')
# Down file from remote to local ????????????????
#sftp.get('firewall.sh', '/tmp/firewall.sh')
# Upload file from local to remote ????????
#sftp.put('D:/iso/auto.sh', '/home/testssh/auto.sh')
except Exception, e:
print 'authentication failed or execute commands failed:',e
#dropdatabase("172.30.121.54","webui","123456","drop database frontend")
#sql_init("172.30.121.54", "root", "teamsun", "mysql -uwebui -p123456 < /etc/frontend/frontend.sql")
def DepositSampleInfo(self):
########################
# Sample_info database #
########################
# Insert into DB
db = MySQLdb.connect(host="mbitdb1.cwnerb5rh4qg.us-east-1.rds.amazonaws.com", # your host, usually localhost
user="cmitaws", # your username
passwd="microbiome", # your password
db="Sample_info") # name of the data base
insert_OTU = ("INSERT INTO turnbaugh_mouse_diet_2015 "
"(sample_ID, disease_labels, keywords)"
"VALUES (%(sample_ID)s, %(disease_labels)s, %(keywords)s)")
cursor = db.cursor()
# CREATE sample info table
mysql_cmd1 = "create table " + data_summary.datasetID + " (sampleID VARCHAR(20), disease_labels VARCHAR(30), keywords VARCHAR(200));"
# Deposit sample info
def connect(self):
if self.app.config['MYSQL_DATABASE_HOST']:
self.connect_args['host'] = self.app.config['MYSQL_DATABASE_HOST']
if self.app.config['MYSQL_DATABASE_PORT']:
self.connect_args['port'] = self.app.config['MYSQL_DATABASE_PORT']
if self.app.config['MYSQL_DATABASE_USER']:
self.connect_args['user'] = self.app.config['MYSQL_DATABASE_USER']
if self.app.config['MYSQL_DATABASE_PASSWORD']:
self.connect_args['passwd'] = self.app.config['MYSQL_DATABASE_PASSWORD']
if self.app.config['MYSQL_DATABASE_DB']:
self.connect_args['db'] = self.app.config['MYSQL_DATABASE_DB']
if self.app.config['MYSQL_DATABASE_CHARSET']:
self.connect_args['charset'] = self.app.config['MYSQL_DATABASE_CHARSET']
if self.app.config['MYSQL_USE_UNICODE']:
self.connect_args['use_unicode'] = self.app.config['MYSQL_USE_UNICODE']
return MySQLdb.connect(**self.connect_args)
def _connect(self):
"""Function connects to the database"""
logger.debug('Connecting to MySQL database.')
try:
if str(self.logsocket).lower() == 'tcp':
self.connection = MySQLdb.connect(
host=self.host,
port=self.port,
user=self.username,
passwd=self.passphrase,
db=self.db)
elif str(self.logsocket).lower() == 'dev':
self.connection = MySQLdb.connect(
unix_socket=self.logdevice,
user=self.username,
passwd=self.passphrase,
db=self.db)
self._create_database()
except (AttributeError, MySQLdb.OperationalError):
logger.exception('Exception: Cannot connect to database.')
def require_db_connection(f):
@wraps(f)
def decorated_function(*args, **kwargs):
####?????
if hasattr(g, 'conn') and g.conn != None and hasattr(g, 'cursor') and g.cursor != None:
print 'has db connect, do nothing'
else:
(g.conn, g.cursor) = _connect_db()
print 'create new db connect'
#????
func = f(*args, **kwargs)
###???????
if hasattr(g, 'conn') and g.conn != None and hasattr(g, 'cursor') and g.cursor != None:
g.cursor.close()
g.cursor = None
g.conn.close()
g.conn = None
print 'close db connect'
else:
print 'no db connect, no need to close...'
return func
return decorated_function
def __connect(self, db=None):
if db or self.db:
try:
if db:
self.conn = mdb.connect(self.host, self.user, self.passwd, db)
else:
self.conn = mdb.connect(self.host, self.user, self.passwd, self.db, charset='utf8')
except Exception, e:
print e
return
else:
try:
self.conn = mdb.connect(self.host, self.user, self.passwd)
except Exception, e:
print e
return
self.cursor = self.conn.cursor()
#self.cursor = mdb.cursors.DictCursor(self.conn)
def checkTrafficTable(self,table):
found = 0
db = MySQLdb.connect("localhost","root","toor", self.VEHICLE_NAME )
# prepare a cursor object using cursor() method
cursor = db.cursor()
try:
# Check if the tags database exists
cursor.execute("SHOW TABLES LIKE '%s';" % table)
results = cursor.fetchall()
for row in results:
if(row[0] == table):
found = 1
# Commit your changes in the database
db.commit()
except:
# Rollback in case there is any error
db.rollback()
db.close()
return(found)
def __init__(self):
self.conn = MySQLdb.connect('127.0.0.1', 'root', 'ty158917', 'article_spider', charset="utf8", use_unicode=True)
self.cursor = self.conn.cursor()
def connect():
try:
log('I', 'db', 'Establishing connection...')
db = dblib.connect(host='localhost',
user=DB['username'],
passwd=DB['password'],
db=DB['database'],
use_unicode=1,
charset='utf8')
return db
except:
log('E', 'db', 'Could not establish connection')
print_tb()
def connect():
try:
log('I', 'db', 'Establishing connection...')
db = dblib.connect(host='localhost',
user=DB['username'],
passwd=DB['password'],
db=DB['database'],
use_unicode=1,
charset='utf8')
return db
except:
log('E', 'db', 'Could not establish connection')
print_tb()
def __init__(self):
self.u = os.environ['htpuser']
self.p = os.environ['htppass']
self.d = "htp"
self.con = MySQLdb.connect(host = 'localhost',
user = self.u,
passwd = self.p,
db = self.d)
self.cursor = self.con.cursor()
# search database and return search values
def __init__(self):
self.conn = mysql.connect(
host=db_conf['host'],
port=int(db_conf['port']),
user=db_conf['user'],
passwd=db_conf['passwd'],
db=db_conf['db'],
charset=db_conf['charset']
)
self.conn.autocommit(True)
self.cursor = self.conn.cursor()
def __init__(self):
self.conn = mysql.connect(
host=db_conf['host'],
port=int(db_conf['port']),
user=db_conf['user'],
passwd=db_conf['passwd'],
db=db_conf['db'],
charset=db_conf['charset']
)
self.conn.autocommit(True)
self.cursor = self.conn.cursor()
def getDB(connType):
if connType == DBTYPE_MYSQL:
conn = MySQLdb.connect (host = DBHOST, user = DBUSER, passwd = DBPASS, db = DBNAME)
else:
conn = sqlite3.connect ("{}.db".format(DBNAME))
return conn
def mysql_from_url(url):
parsed = urlparse(url)
assert parsed.scheme == 'mysql', (
'MySQL url must be of the form mysql://hostname:port/db'
)
return MySQLdb.connect(
user=parsed.username or 'root',
passwd=parsed.password or '',
host=parsed.hostname,
port=parsed.port or 3306,
db=parsed.path.strip('/'),
)