我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用MySQLdb.connect()。
def checkUserDbGrants(self, username): if self.cursor is None: self.connect() self.cursor.execute("select Drop_priv, Grant_priv, References_priv,Create_tmp_table_priv," "Lock_tables_priv,Create_routine_priv,Alter_routine_priv,Execute_priv," "Event_priv,Trigger_priv from mysql.db " "where User ='%s'" % username) res = self.cursor.fetchall() flag = 0 for row in res: # print row for key, value in row.items(): if value == 'Y': flag = 1 LOg.log_warn('setting %s = %s is suggested!' % (key, 'N')) if (flag == 0): Log.log_pass('All the setting are approriate!')
def createDefCon(self): try: host = "127.0.0.1" port = 3306 ### default mysql port, change if you know better user = "krisk" ### def parameters passwd = "kish" ### def parameters db = "loginfo" ### connection.user_info contains the autho users ### Create a connection object, use it to create a cursor con = MySQLdb.connect(host = host ,port = port , user = user,passwd = passwd ,db = db) return con ### returns a connection object except: return 0; ####################################### Test connection #######################################################
def populate_store(self, store): try: connection = None connection = MySQLdb.connect('localhost', 'annon', 'pass') cursor = connection.cursor() cursor.execute("Select * From `INFORMATION_SCHEMA`.`SCHEMATA`") rows = cursor.fetchall() for row in rows: store.append([row[0], row[1], row[2], row[3]]) except MySQLdb.Error, e: store.append([str(e.args[0]), e.args[1], '', '']) finally: if connection != None: connection.close()
def getdbconnection(configsection, as_dict=False, defaultargs={}): """ Helper function that gets a database connection object from a config dictionary """ dbengine = configsection['dbengine'] if 'dbengine' in configsection else 'MySQLDB' if dbengine == 'MySQLDB': dbargs = defaultargs.copy() # Convert the external configuration names to python PEP-249 config names translate = {"host": "host", "defaultsfile": "read_default_file", "user": "user", "pass": "passwd", "port": "port"} for confval, myval in translate.iteritems(): if confval in configsection: dbargs[myval] = configsection[confval] if as_dict: dbargs['cursorclass'] = MySQLdb.cursors.DictCursor return MySQLdb.connect(**dbargs) else: raise Exception("Unsupported database engine %s" % (dbengine))
def new_connection(conn_argkw, conv=None, options=None): # useful arg could be added in future.: # conn_argkw.init_command conv = conv or {} options = options or {} opt = { 'autocommit': 1, } opt.update(options) conn = MySQLdb.connect(conv=conv, **conn_argkw) for k, v in opt.items(): conn.query('set {k}={v}'.format(k=k, v=v)) return conn
def cursor(self, autocommit=False, readonly=False): conv = MySQLdb.converters.conversions.copy() conv[float] = lambda value, _: repr(value) conv[MySQLdb.constants.FIELD_TYPE.TIME] = MySQLdb.times.Time_or_None args = { 'db': self.database_name, 'sql_mode': 'traditional,postgresql', 'use_unicode': True, 'charset': 'utf8', 'conv': conv, } uri = parse_uri(config.get('database', 'uri')) assert uri.scheme == 'mysql' if uri.hostname: args['host'] = uri.hostname if uri.port: args['port'] = uri.port if uri.username: args['user'] = uri.username if uri.password: args['passwd'] = urllib.unquote_plus(uri.password) conn = MySQLdb.connect(**args) cursor = Cursor(conn, self.database_name) cursor.execute('SET time_zone = "+00:00"') return cursor
def list(cursor): now = time.time() timeout = config.getint('session', 'timeout') res = Database._list_cache if res and abs(Database._list_cache_timestamp - now) < timeout: return res cursor.execute('SHOW DATABASES') res = [] for db_name, in cursor.fetchall(): try: database = Database(db_name).connect() except Exception: continue cursor2 = database.cursor() if cursor2.test(): res.append(db_name) cursor2.close(close=True) else: cursor2.close() database.close() Database._list_cache = res Database._list_cache_timestamp = now return res
def AuthorizeMachineAndConnectDB(db_instance_name, project, username, database, logfile): """This function authorizes a machine to connect to a DB. Args: db_instance_name: name of the DB instance. project: name of the project in which the db_instance belongs to. username: username that will be used to log in to the DB database: name of the database to connect to logfile: logfile for the gcloud command in shell Returns: Returns the connection after being connected to the DB. """ hostname = GetInstanceIP(db_instance_name, project) password = utils.GetRandomPassword() sh_utils.RunGCloudService(["users", "set-password", username, "%", "--instance", db_instance_name, "--password", password], project=project, service="sql", logfile=logfile) print "DB Usernames and passwords are set." connection = MySQLdb.connect(host=hostname, user=username, passwd=password, db=database) return connection
def getImageName(_index): global MAX if _index > MAX: return "-1" conn = MySQLdb.connect(IP, DBUSERNAME, DBPWD, DBNAME) cursor = conn.cursor() sql = "select url from image where id = %d" % _index cursor.execute(sql) url = cursor.fetchone() print url if url != None: url = str(url[0]) items = url.split('/') if len(items) != 7: return "#error#" name = items[-3] + '/' + items[-2] return name else: print "+++None return" return "#error#" conn.close()
def __init__(self,host,user,password,db,charset='utf8',logFile=None,color=True,debug=4): ''' :param host: <class str|host name> :param user: <class str|user name> :param password: <class str|password> :param db: <class str|database name> :param charset: default='utf8' <class str> :param logFile: default=None <class str> :param color: default=True <class bool> :param debug: default=4 <class int|0 NONE,1 [Error],2 [Error][WARING],3 [Error][WARING][INFO],4 ALL> ''' self.logFile = logFile self.color = color self.debug = debug self.success = 0 self.fail = 0 self.repeat = 0 self._conn = MYSQL.connect(host,user,password,db,charset=charset) self._cursor = self._conn.cursor()
def get_data_from_mysql(host: str, username: str, password: str, db: str, sql: str): """ Run SQL query and get data from MySQL table. """ db = MySQLdb.connect(host, username, password, db) cursor = db.cursor(MySQLdb.cursors.DictCursor) try: cursor.execute(sql) data = cursor.fetchall() except Exception as e: print("MySQL error %s: %s" % (e.args[0], e.args[1])) data = None db.close() return data
def insertBooks(self, title, page, bookQuote): # connect to MySQL conn, cursor = self.connect() self.useGuiDB(cursor) # insert data cursor.execute("INSERT INTO books (Book_Title, Book_Page) VALUES (%s,%s)", (title, page)) # last inserted auto increment value keyID = cursor.lastrowid # print(keyID) cursor.execute("INSERT INTO quotations (Quotation, Books_Book_ID) VALUES (%s, %s)", \ (bookQuote, keyID)) # commit transaction conn.commit () # close cursor and connection self.close(cursor, conn) #------------------------------------------------------
def insertBooksExample(self): # connect to MySQL conn, cursor = self.connect() self.useGuiDB(cursor) # insert hard-coded data cursor.execute("INSERT INTO books (Book_Title, Book_Page) VALUES ('Design Patterns', 17)") # last inserted auto increment value keyID = cursor.lastrowid print(keyID) cursor.execute("INSERT INTO quotations (Quotation, Books_Book_ID) VALUES (%s, %s)", \ ('Programming to an Interface, not an Implementation', keyID)) # commit transaction conn.commit () # close cursor and connection self.close(cursor, conn) #------------------------------------------------------
def showBooks(self): # connect to MySQL conn, cursor = self.connect() self.useGuiDB(cursor) # print results cursor.execute("SELECT * FROM Books") allBooks = cursor.fetchall() print(allBooks) # close cursor and connection self.close(cursor, conn) return allBooks #------------------------------------------------------
def showData(self): # connect to MySQL conn, cursor = self.connect() self.useGuiDB(cursor) # execute command cursor.execute("SELECT * FROM books") print(cursor.fetchall()) cursor.execute("SELECT * FROM quotations") print(cursor.fetchall()) # close cursor and connection self.close(cursor, conn) #------------------------------------------------------
def showDataWithReturn(self): # connect to MySQL conn, cursor = self.connect() self.useGuiDB(cursor) # execute command cursor.execute("SELECT * FROM books") booksData = cursor.fetchall() cursor.execute("SELECT * FROM quotations") quoteData = cursor.fetchall() # close cursor and connection self.close(cursor, conn) # print(booksData, quoteData) for record in quoteData: print(record) return booksData, quoteData #------------------------------------------------------
def updateGOF(self): # connect to MySQL conn, cursor = self.connect() self.useGuiDB(cursor) # execute command cursor.execute("SELECT Book_ID FROM books WHERE Book_Title = 'Design Patterns'") primKey = cursor.fetchall()[0][0] print("Primary key=" + str(primKey)) cursor.execute("SELECT * FROM quotations WHERE Books_Book_ID = (%s)", (primKey,)) print(cursor.fetchall()) # close cursor and connection self.close(cursor, conn) #------------------------------------------------------
def deleteRecord(self): # connect to MySQL conn, cursor = self.connect() self.useGuiDB(cursor) try: # execute command cursor.execute("SELECT Book_ID FROM books WHERE Book_Title = 'Design Patterns'") primKey = cursor.fetchall()[0][0] # print(primKey) cursor.execute("DELETE FROM books WHERE Book_ID = (%s)", (primKey,)) # commit transaction conn.commit () except: pass # close cursor and connection self.close(cursor, conn) #==========================================================
def create_table(self): sql = """ CREATE TABLE """ + self._table_name + """ ( id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT, read_timestamp TIMESTAMP NOT NULL DEFAULT "0000-00-00 00:00:00", write_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, ost CHAR(7) NOT NULL, ip CHAR(15) NOT NULL, size BIGINT(20) UNSIGNED NOT NULL, read_throughput BIGINT(20) SIGNED NOT NULL, write_throughput BIGINT(20) SIGNED NOT NULL, read_duration INT(10) SIGNED NOT NULL, write_duration INT(10) SIGNED NOT NULL, PRIMARY KEY (id) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 """ logging.debug("Creating database table:\n" + sql) with closing(MySQLdb.connect(host=self._host, user=self._user, passwd=self._passwd, db=self._db)) as conn: with closing(conn.cursor()) as cur: cur.execute(sql)
def query_single(_db_config, _sql, _args): config = _db_config conn = MySQLdb.connect(host=config['host'], port=config['port'], user=config['user'], passwd=config['passwd'], db=config['db'], charset=config['charset'], use_unicode=True) cursor = conn.cursor(MySQLdb.cursors.DictCursor) result = () try: cursor.execute(_sql, _args) result = cursor.fetchall() except: pass rootLogger.error("query exception sql is %s ,_args is %s,stacks is %s", _sql, _args, get_caller_info_total()) rootLogger.exception("message") finally: cursor.close() conn.close() return result # =============================================== # FUNCTION ?????? # ===============================================
def insertOrUpdate_getId(_db_config, _sql, _args): result = 0 id = 0 config = _db_config conn = MySQLdb.connect(host=config['host'], port=config['port'], user=config['user'], passwd=config['passwd'], db=config['db'], charset=config['charset'], use_unicode=True) cursor = conn.cursor(MySQLdb.cursors.DictCursor) try: cursor.execute(_sql, _args) id = conn.insert_id() conn.commit() result = cursor.rowcount except: pass rootLogger.error("exception sql is %s ,_args is %s", _sql, _args) rootLogger.exception("message") conn.rollback() finally: print("affected rows = {}".format(cursor.rowcount)) cursor.close() conn.close() return result, id
def __init__(self, db_name): if db_name == 'wiki': kb_name = 'wiki_kb' elif db_name == 'baidu': kb_name = 'kb' elif db_name == 'hudong': kb_name = 'hudong_kb' self.db = MySQLdb.connect(host='192.168.1.104',user="root",passwd="",db=kb_name ,charset="utf8", cursorclass=cursors.SSCursor) self.cur = self.db.cursor() self.db_titles = [] self.db_disambiguations = [] print 'db is loading......' if os.path.exists('title_disambiguation.data'): with open('title_disambiguation.data','r') as f: self.db_titles, self.db_disambiguations = pickle.load(f) else: self.load_save_db() print 'db has been loaded !'
def search_ID_in_list_and_connect(search_ID): CUR.execute("SELECT Id, Ip, DeviceName, Session FROM %s" % line[7][:-1]) row = CUR.fetchone() while row is not None: rows = {row[0] : row[1]} type_protocol = row[3] row = CUR.fetchone() for key in rows.keys(): if search_ID == key: rb1 = str(rows.values()) ip_device = rb1[2:-2] # verify where you try to connect print 'Connect to: ' + ip_device # condition which protocol use, view in coloumn session in db if 'ssh' in type_protocol: proto_connections.ssh_connections(ip_device) else: proto_connections.telnet_connections(ip_device) CUR.close() CONN.close() # main function
def main(): banner() # search device but is empty return device not found! try: userdate = raw_input('Search: ') while len(userdate) <= 0: userdate = raw_input('Search: ') search(userdate) # search ID for connect but if empty exit global_ItemFound search_ID = input('Connect to ID: ') search_ID_in_list_and_connect(search_ID) # manage exception KeyboardError ctrl+c and NameError except KeyboardInterrupt, e: print "\nexit" except NameError, e: print "%s, not found !" % userdate except EOFError, e: print "\nEOFError, exit" except SyntaxError, e: print "\nSyntaxError, exit"
def execute(self,query,doCommit=False): '''exec query insert or delete''' retCount = 0 #self.logger.debug(query) try: cursor = self.conn.cursor() retCount = cursor.execute(query) except Exception: #self.logger.debug("query : {0}".format(query)) self.connect() if self.conn : cursor = self.conn.cursor() retCount = cursor.execute(query) if doCommit : #self.logger.debug("before commit") self.commit() #self.logger.debug("after commit") return retCount
def execWithRet(self,query): '''exec query ''' retList = [] try : cursor = self.conn.cursor() count = cursor.execute(query) retList = cursor.fetchmany(count) except Exception: #self.logger.debug("query : {0}".format(query)) self.connect() if self.conn : cursor = self.conn.cursor() count = cursor.execute(query) retList = cursor.fetchmany(count) return retList
def __init__(self): self.con = mdb.connect(host=config.mysql_host, user=config.mysql_user, passwd=config.mysql_passwd, db=config.mysql_db, charset=config.mysql_charset) self.cur = self.con.cursor(mdb.cursors.DictCursor) self.fromlist = [] self.wherelist = [] self.deletelist = [] self.selectlist = [] self.orderlist = [] self.limitlist = [] self.updatelist = [] self.inslist = [] self.leftjoinlist = [] self.selectas = '' self.selectas_b = '' self.sql = '' self.withtag = False
def __init__(self): env = C.get('sys', 'env') host = C.get('mysql_' + env, 'host') user = C.get('mysql_' + env, 'user') passwd = C.get('mysql_' + env, 'password') name = C.get('mysql_' + env, 'name') self.db = MySQLdb.connect( host = host, port = 3306, user = user, passwd = passwd, db = name, charset='utf8', cursorclass = MySQLdb.cursors.DictCursor) self.cursor = self.db.cursor()
def bannerread(host,port): try: if port==80 or port==8443 or port==2089 or port==10000: con = socket.socket(socket.AF_INET,socket.SOCK_STREAM) con.connect((host,port)) con.send("GET / HTTP/1.1\r\n\r\n\r\n") data = (con.recv(200)) dbcon(host,port,data) return(data) elif port==53: pkt = IP(dst=host)/UDP(dport=port,sport=RandShort())/DNS(aa=0L, qr=0L, qd=DNSQR(qclass=3, qtype=16, qname='version.bind.')) x = sr1(pkt) ban = x[DNS].summary() dbcon(host,port,data) return(ban) else: conexion = socket.socket(socket.AF_INET,socket.SOCK_STREAM) conexion.settimeout(1.0) #Timeout - socket non-blocking conexion.connect((host,port)) banner = conexion.recv(1024) dbcon(host,port,banner) return(banner) except: return("No Banner")
def CheckConn(self,port): retry_num = 0 while True: try: local_conn = MySQLdb.connect(host='127.0.0.1', user=mysql_user, passwd=mysql_password, port=int(port), db='',charset="utf8") local_conn.cursor() local_conn.close() state = True break except MySQLdb.Error,e: logging.error(e) state = None retry_num += 1 time.sleep(1) if retry_num >= 3: break return state
def sabo_db_connect(db_conf): ip = db_conf["db_addr"] port = db_conf["db_port"] user = db_conf["db_user"] passwd = db_conf["db_passwd"] db = db_conf["db_name"] charset = "utf8" for i in range(db_conf["db_retry"]): try: mysql_db = MySQLdb.connect(host=ip, port=port, user=user, passwd=passwd, db=db, charset=charset) except Exception as e: sabo_error_log("error", "Connect to mysql failed: {0}".format(e)) else: return mysql_db # if there are some tasks didn't be judged, # then the process will be blocked until the queue is empty.
def execute_sql(sql): connection, cursor = None, None try: connection = MySQLdb.connect(host=settings.inception_host, user=settings.inception_user, passwd=settings.inception_password, port=settings.inception_port, use_unicode=True, charset="utf8", connect_timeout=2) cursor = connection.cursor() cursor.execute(sql) return cursor.fetchall() finally: if (cursor is not None): cursor.close() if (connection is not None): connection.close() return []
def sql_init(ip,username,password,cmd): try: client = paramiko.SSHClient() # Default accept unknown keys client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Connect client.connect(ip, port=22, username=username, password=password,timeout="20s") # Execute shell remotely stdin, stdout, stderr = client.exec_command(cmd) stdout.read() print stdout client.close() #sftp = client.open_sftp() # Make a dir #sftp.mkdir('/home/testssh') # Down file from remote to local ???????????????? #sftp.get('firewall.sh', '/tmp/firewall.sh') # Upload file from local to remote ???????? #sftp.put('D:/iso/auto.sh', '/home/testssh/auto.sh') except Exception, e: print 'authentication failed or execute commands failed:',e #dropdatabase("172.30.121.54","webui","123456","drop database frontend") #sql_init("172.30.121.54", "root", "teamsun", "mysql -uwebui -p123456 < /etc/frontend/frontend.sql")
def DepositSampleInfo(self): ######################## # Sample_info database # ######################## # Insert into DB db = MySQLdb.connect(host="mbitdb1.cwnerb5rh4qg.us-east-1.rds.amazonaws.com", # your host, usually localhost user="cmitaws", # your username passwd="microbiome", # your password db="Sample_info") # name of the data base insert_OTU = ("INSERT INTO turnbaugh_mouse_diet_2015 " "(sample_ID, disease_labels, keywords)" "VALUES (%(sample_ID)s, %(disease_labels)s, %(keywords)s)") cursor = db.cursor() # CREATE sample info table mysql_cmd1 = "create table " + data_summary.datasetID + " (sampleID VARCHAR(20), disease_labels VARCHAR(30), keywords VARCHAR(200));" # Deposit sample info
def connect(self): if self.app.config['MYSQL_DATABASE_HOST']: self.connect_args['host'] = self.app.config['MYSQL_DATABASE_HOST'] if self.app.config['MYSQL_DATABASE_PORT']: self.connect_args['port'] = self.app.config['MYSQL_DATABASE_PORT'] if self.app.config['MYSQL_DATABASE_USER']: self.connect_args['user'] = self.app.config['MYSQL_DATABASE_USER'] if self.app.config['MYSQL_DATABASE_PASSWORD']: self.connect_args['passwd'] = self.app.config['MYSQL_DATABASE_PASSWORD'] if self.app.config['MYSQL_DATABASE_DB']: self.connect_args['db'] = self.app.config['MYSQL_DATABASE_DB'] if self.app.config['MYSQL_DATABASE_CHARSET']: self.connect_args['charset'] = self.app.config['MYSQL_DATABASE_CHARSET'] if self.app.config['MYSQL_USE_UNICODE']: self.connect_args['use_unicode'] = self.app.config['MYSQL_USE_UNICODE'] return MySQLdb.connect(**self.connect_args)
def _connect(self): """Function connects to the database""" logger.debug('Connecting to MySQL database.') try: if str(self.logsocket).lower() == 'tcp': self.connection = MySQLdb.connect( host=self.host, port=self.port, user=self.username, passwd=self.passphrase, db=self.db) elif str(self.logsocket).lower() == 'dev': self.connection = MySQLdb.connect( unix_socket=self.logdevice, user=self.username, passwd=self.passphrase, db=self.db) self._create_database() except (AttributeError, MySQLdb.OperationalError): logger.exception('Exception: Cannot connect to database.')
def require_db_connection(f): @wraps(f) def decorated_function(*args, **kwargs): ####????? if hasattr(g, 'conn') and g.conn != None and hasattr(g, 'cursor') and g.cursor != None: print 'has db connect, do nothing' else: (g.conn, g.cursor) = _connect_db() print 'create new db connect' #???? func = f(*args, **kwargs) ###??????? if hasattr(g, 'conn') and g.conn != None and hasattr(g, 'cursor') and g.cursor != None: g.cursor.close() g.cursor = None g.conn.close() g.conn = None print 'close db connect' else: print 'no db connect, no need to close...' return func return decorated_function
def __connect(self, db=None): if db or self.db: try: if db: self.conn = mdb.connect(self.host, self.user, self.passwd, db) else: self.conn = mdb.connect(self.host, self.user, self.passwd, self.db, charset='utf8') except Exception, e: print e return else: try: self.conn = mdb.connect(self.host, self.user, self.passwd) except Exception, e: print e return self.cursor = self.conn.cursor() #self.cursor = mdb.cursors.DictCursor(self.conn)
def checkTrafficTable(self,table): found = 0 db = MySQLdb.connect("localhost","root","toor", self.VEHICLE_NAME ) # prepare a cursor object using cursor() method cursor = db.cursor() try: # Check if the tags database exists cursor.execute("SHOW TABLES LIKE '%s';" % table) results = cursor.fetchall() for row in results: if(row[0] == table): found = 1 # Commit your changes in the database db.commit() except: # Rollback in case there is any error db.rollback() db.close() return(found)
def __init__(self): self.conn = MySQLdb.connect('127.0.0.1', 'root', 'ty158917', 'article_spider', charset="utf8", use_unicode=True) self.cursor = self.conn.cursor()
def connect(): try: log('I', 'db', 'Establishing connection...') db = dblib.connect(host='localhost', user=DB['username'], passwd=DB['password'], db=DB['database'], use_unicode=1, charset='utf8') return db except: log('E', 'db', 'Could not establish connection') print_tb()
def __init__(self): self.u = os.environ['htpuser'] self.p = os.environ['htppass'] self.d = "htp" self.con = MySQLdb.connect(host = 'localhost', user = self.u, passwd = self.p, db = self.d) self.cursor = self.con.cursor() # search database and return search values
def __init__(self): self.conn = mysql.connect( host=db_conf['host'], port=int(db_conf['port']), user=db_conf['user'], passwd=db_conf['passwd'], db=db_conf['db'], charset=db_conf['charset'] ) self.conn.autocommit(True) self.cursor = self.conn.cursor()
def getDB(connType): if connType == DBTYPE_MYSQL: conn = MySQLdb.connect (host = DBHOST, user = DBUSER, passwd = DBPASS, db = DBNAME) else: conn = sqlite3.connect ("{}.db".format(DBNAME)) return conn
def mysql_from_url(url): parsed = urlparse(url) assert parsed.scheme == 'mysql', ( 'MySQL url must be of the form mysql://hostname:port/db' ) return MySQLdb.connect( user=parsed.username or 'root', passwd=parsed.password or '', host=parsed.hostname, port=parsed.port or 3306, db=parsed.path.strip('/'), )
def __enter__(self): self.connect() return self
def connect(self): self.db = sqldb.connect(db=self.db_name, user=self.user, host=self.host, passwd=self.passwd) self.cur = self.db.cursor()