我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用MySQLdb.Connect()。
def getConnection(self): return MySQLdb.Connect( host=self.DB_HOST, port=self.DB_PORT, user=self.DB_USER, passwd=self.DB_PWD, db=self.DB_NAME, charset='utf8' )
def connect(self): conn = None try: conn = MySQLdb.Connect( host=self.hostname, port=self.port, user=self.username, password=self.password, db=self.db) except Exception as e: print("MySQL Connection error : {0}".format(e)) return conn
def connect(self): conn = None try: conn = MySQLdb.Connect(host=self.hostname, port=self.port, user=self.username, password=self.password, db=self.db) except Exception as e: print("MySQL Connection error : {0}".format(e)) return conn
def getConn(conninfo): host = conninfo['host'] user = conninfo['user'] password = conninfo['password'] port = conninfo['port'] sock = conninfo['sock'] db = conninfo['db'] # print host, user, password, port, db try: conn = MySQLdb.Connect(host=host, user=user, passwd=password, port=port, unix_socket=sock, db=db, charset='utf8') except Exception as e: print e sys.exit() return conn
def db_reconnect(db_user, db_id): db_pass = settings.DB_AUTH[db_user] pc = prpcryptec.prpcrypt(KEY_DB_AUTH) db_instance = get_setttings("db_info", db_id) db_host, db_port = db_instance.replace(' ', '').split(':') db_conn = None while not db_conn: try: logger.warn("Reconnect Database %s: host='%s', user='%s, port=%s", db_id, db_host, db_user, db_port) db_conn = MySQLdb.Connect(host=db_host, user=db_user, passwd=pc.decrypt(db_pass), port=int(db_port), connect_timeout=5, use_unicode=False) except MySQLdb.Error, e: if e.args[0] in (2013, 2003): logger.critical('Error %d: %s', e.args[0], e.args[1]) logger.warn("Reconnect Database %s: host='%s', user='%s, port=%s", db_id, db_host, db_user, db_port) db_conn = MySQLdb.Connect(host=db_host, user=db_user, passwd=pc.decrypt(db_pass), port=int(db_port), connect_timeout=5, use_unicode=False) except Exception as inst: print "Error %s %s" % type(inst), inst.args.__str__() time.sleep(10) return db_conn # judge this thread meet kill_opt or not
def __init__(self): self.__cfgReporting = Config(os.path.join(RAGPICKER_ROOT, 'config', 'reporting.conf')) self.__mysqlEnabled = self.__cfgReporting.getOption("mysql", "enabled") if self.__mysqlEnabled: #Anbindung an Datenbank MySQL herstellen try: mysqlHost = self.__cfgReporting.getOption("mysql", "host") mysqlPort = self.__cfgReporting.getOption("mysql", "port") mysqlDatabase = self.__cfgReporting.getOption("mysql", "database") mysqlUser = self.__cfgReporting.getOption("mysql", "user") mysqlPassword = self.__cfgReporting.getOption("mysql", "password") self.__mysqlConnection = MySQLdb.Connect(host=mysqlHost, port=mysqlPort, db=mysqlDatabase, user=mysqlUser, passwd=mysqlPassword) except (Exception) as e: raise Exception("Cannot connect to MySQL (ragpicker): %s" % e)
def default_get_mysql_connection( user_name, user_pass, socket, dbname='', timeout=60, connect_timeout=10, charset=None): """ Default method for connection to a MySQL instance. You can override this behaviour by define/import in cli.py and pass it to Payload at instantiation time. The function should return a valid Connection object just as MySQLdb.Connect does. """ connection_config = { 'user': user_name, 'passwd': user_pass, 'unix_socket': socket, 'db': dbname, 'use_unicode': True, 'connect_timeout': connect_timeout } if charset: connection_config['charset'] = charset dbh = MySQLdb.Connect(**connection_config) dbh.autocommit(True) if timeout: cursor = dbh.cursor() cursor.execute("SET SESSION WAIT_TIMEOUT = %s", (timeout,)) return dbh
def c2db(self): self.conn = MySQLdb.Connect( host = self.host, port = self.port, user = self.user, passwd = self.passwd, db = self.db, ) self.cur = self.conn.cursor()
def query_table_info(*dbinfo): sql_str = """ SELECT IFNULL(@@hostname, @@server_id) SERVER_NAME, %s as HOST, t.TABLE_SCHEMA, t.TABLE_NAME, t.TABLE_ROWS, t.DATA_LENGTH, t.INDEX_LENGTH, t.AUTO_INCREMENT, c.COLUMN_NAME, c.DATA_TYPE, LOCATE('unsigned', c.COLUMN_TYPE) COL_UNSIGNED # CONCAT(c.DATA_TYPE, IF(LOCATE('unsigned', c.COLUMN_TYPE)=0, '', '_unsigned')) FROM information_schema.`TABLES` t LEFT JOIN information_schema.`COLUMNS` c ON t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME AND c.EXTRA = 'auto_increment' WHERE t.TABLE_SCHEMA NOT IN ( 'mysql', 'information_schema', 'performance_schema', 'sys' ) AND t.TABLE_TYPE = 'BASE TABLE' """ dbinfo_str = "%s:%d" % (dbinfo[0], dbinfo[1]) rs = ({},) try: db_conn = MySQLdb.Connect(host=dbinfo[0], port=dbinfo[1], user=dbinfo[2], passwd=dbinfo[3], connect_timeout=5) cur = db_conn.cursor(MySQLdb.cursors.DictCursor) param = (dbinfo_str,) print "\n[%s] Get schema info from db: '%s'..." % (datetime.today(), dbinfo_str) cur.execute(sql_str, param) rs = cur.fetchall() except MySQLdb.Error, e: print "Error[%d]: %s (%s)" % (e.args[0], e.args[1], dbinfo_str) exit(-1) if InfluxDB_INFO is not None: print "Write '%s' schema table info to influxdb ..." % dbinfo_str write_influxdb(*rs) else: print rs