Python pymysql 模块,cursors() 实例源码
我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用pymysql.cursors()。
def execute_cursor(self, S, server_side=False):
def dict_factory(cursor, row):
d = {}
for i, column in enumerate(cursor.description):
d[column[0]] = row[i]
return d
S = S.strip()
if options.cfg.explain_queries:
self.explain(S)
logger.debug(S)
if self.db_type == SQL_MYSQL:
if not self.Con.open:
self.Con = self.get_connection()
if server_side:
cursor = self.Con.cursor(pymysql.cursors.SSDictCursor)
else:
cursor = self.Con.cursor(pymysql.cursors.DictCursor)
elif self.db_type == SQL_SQLITE:
con = self.get_connection()
con.row_factory = dict_factory
cursor = con.cursor()
cursor.execute(S)
return cursor
def create_connection(self, host, user, passwd, database):
"""This creates a connection object\
to the database and returns it for use
"""
try:
self.connection = pymysql.connect(host,
user,
passwd,
database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
except (pymysql.err.DatabaseError,
pymysql.err.IntegrityError,
pymysql.err.MySQLError) as exception:
sys.stderr.write(str(exception))
return 2
finally:
if self.connection is None:
sys.stderr.write("Problem connecting to database\n")
return 0
def create_connection(self, host, user, passwd, database):
"""This creates a connection object\
to the database and returns it for use
"""
try:
self.connection = pymysql.connect(host,
user,
passwd,
database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
except (pymysql.err.DatabaseError,
pymysql.err.IntegrityError,
pymysql.err.MySQLError) as exception:
sys.stderr.write(exception)
return 2
finally:
if self.connection is None:
sys.stderr.write("Problem connecting to database\n")
return 0
def iter(self, query, *parameters, **kwparameters):
"""Returns an iterator for the given query and parameters."""
self._ensure_connected()
cursor = MySQLdb.cursors.SSCursor(self._db)
try:
self._execute(cursor, query, parameters, kwparameters)
column_names = [d[0] for d in cursor.description]
for row in cursor:
yield Row(zip(column_names, row))
finally:
cursor.close()
def make_connection(args):
"""
Factory function to create and return a connection object.
`args` is the parsed result from main arguments parser.
"""
try:
return pymysql.connect(
host=args.host,
port=args.port,
user=args.user,
password=args.password,
db='igem',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
except pymysql.err.DatabaseError as e:
_display_error(e)
def __init__(self,cfg_name='mysql_tunnel'):
import paramiko
import pymysql.cursors
import pymysql
self.host = luigi.configuration.get_config().get(cfg_name, 'host')
self.database = luigi.configuration.get_config().get(cfg_name, 'database')
self.user = luigi.configuration.get_config().get(cfg_name, 'user')
self.password = luigi.configuration.get_config().get(cfg_name, 'password')
self.port = int(luigi.configuration.get_config().get(cfg_name, 'port'))
self.tunnel_dest_host = luigi.configuration.get_config().get(cfg_name, 'tunnel_dest_host')
self.tunnel_dest_user = luigi.configuration.get_config().get(cfg_name, 'user')
self.tunnel_source_host = luigi.configuration.get_config().get(cfg_name, 'tunnel_source_host ')
self.tunnel_source_port = luigi.configuration.get_config().get(cfg_name, 'tunnel_source_port')
self.channel = self.__tunnel__()
self.connection = self.__connect__()
self.cursor = self.connection.cursor()
def connect(self):
await self.instance.db.connect()
await self.instance.apps.discover()
await self.instance.db.initiate()
if self.db_type != 'mysql':
raise Exception('We only support mysql converting right now!')
self.connection = pymysql.connect(
host=self.db_host, user=self.db_user, password=self.db_password, db=self.db_name, charset=self.charset,
port=self.db_port or 3306,
cursorclass=pymysql.cursors.DictCursor
)
def __init__(self):
self.params = get_params()
self.conn = pymysql.connect(
host=self.params.host,
user=self.params.username, password=self.params.password,
db='ceredron', charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
def mysql_connect(self):
"""This method connects to MySQL, returning the connection handle.
:rtype: pymysql.Connection
"""
logger.info('Connecting to MySQL %s', self._mysql_url)
# Connect to the database
return pymysql.connect(host=self._mysql_url,
user=self._mysql_username,
password=self._mysql_psw,
db=self._mysql_dbname,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
def handler(event, context):
global connection, kmsKeyARN, application
try:
print(event)
kmsKeyARN = event['ResourceProperties']['KMSKeyARN']
application = event['ResourceProperties']['Application']
print('Connecting to mysql instance..')
connection = pymysql.connect(host=event['ResourceProperties']['Hostname'],
port=int(event['ResourceProperties']['Port']),
user=event['ResourceProperties']['Username'],
password=decrypt(event['ResourceProperties']['Password']),
cursorclass=pymysql.cursors.DictCursor)
print('Connected.')
response = {}
if event['RequestType'] == 'Create':
response = create(database=event['ResourceProperties']['Database'])
elif event['RequestType'] == 'Update':
response = update(database=event['ResourceProperties']['Database'],
old_database=event['OldResourceProperties']['Database'], username=event['PhysicalResourceId'])
elif event['RequestType'] == 'Delete':
if event['ResourceProperties']['RetainDatabase'] == 'false':
delete(database=event['ResourceProperties']['Database'], username=event['PhysicalResourceId'])
else:
print('Retaining Database.')
else:
print('Invalid RequestType: ' + event['RequestType'])
return cfnresponse.send(event,
context,
cfnresponse.SUCCESS,
response_data=response,
physical_resource_id=(
response['PhysicalResourceId'] if 'PhysicalResourceId' in response else None))
except:
logging.exception("Unhandled Exception")
cfnresponse.send(event, context, cfnresponse.FAILED)
finally:
print('closing connection.')
connection.close()
def __init__(self, host, port, database, user, password):
self.conn = pymysql.connect(host=host, port=port, db=database,
user=user, password=password,
cursorclass=pymysql.cursors.DictCursor)
def pymysql_conn_from_info(conn_info: ConnectionInfo) -> pymysql.Connection:
return pymysql.connect(
host=conn_info.host,
user=conn_info.user,
password=conn_info.password,
db=conn_info.db,
charset=conn_info.charset,
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)
def _connect(self):
if self._connection is None:
'''
don't set "use_unicode=0"
SQLAlchemy people say don't set this at all
http://docs.sqlalchemy.org/en/latest/dialects/mysql.html
'''
DB_CHARSET = os.getenv('DB_CHARSET', 'utf8mb4')
DB_COLLATION = os.getenv('DB_COLLATION', 'utf8mb4_general_ci')
DB_SQL_MODE = os.getenv('DB_SQL_MODE', "'ANSI,STRICT_TRANS_TABLES,STRICT_ALL_TABLES,ONLY_FULL_GROUP_BY'")
DB_TIMEZONE = os.getenv('DB_TIMEZONE', "'+2:00'")
self._connection = pymysql.connect(
host=self._host,
user=self._user,
passwd=self._password,
database=self._database,
port=self._port,
charset=DB_CHARSET,
cursorclass=pymysql.cursors.DictCursor,
# persistent
)
# Force MySQL to UTF-8 Encodingclear
self.set_charset(DB_CHARSET, DB_COLLATION)
# self.query("SET NAMES %s COLLATE %s" % (DB_CHARSET, DB_COLLATION))
# self.query("SET CHARACTER_SET_RESULTS='%s'" % DB_CHARSET)
# Default MySQL behavior to conform more closely to SQL standards.
# This allows to run almost seamlessly on many different kinds of database systems.
# These settings force MySQL to behave the same as postgresql, or sqlite
# in regards to syntax interpretation and invalid data handling. See
# https://www.drupal.org/node/344575 for further discussion. Also, as MySQL
# 5.5 changed the meaning of TRADITIONAL we need to spell out the modes one by one
self.query("SET SESSION sql_mode = %s" % DB_SQL_MODE) # Force MySQL ANSI compatibilities
self.query("SET SESSION time_zone = %s" % DB_TIMEZONE)
def saveData(self, category):
import pymysql.cursors
import pymysql
# Connect to the database
connection = pymysql.connect(host='localhost',
user='root',
password='',
db='hht',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
try:
# ????
with connection.cursor() as cursor:
# Read a single record
sql = "SELECT `id`, `typename`, `py` FROM `type` WHERE `py` = %s"
cursor.execute(sql, category)
typeArr = cursor.fetchone()
# ??????????
with connection.cursor() as cursor:
for resOne in self._res:
sql = "INSERT INTO `res` (`type_id`, `name`, `link`) VALUES (%s, %s, %s)"
cursor.execute(sql, (typeArr['id'], resOne['name'], resOne['res']))
connection.commit()
print 'Insert ' + str(len(self._res)) + ' datas with category ' + typeArr['typename']
del typeArr
finally:
connection.close()
return
# ????
def from_settings(cls,settings):
args = settings['MYSQLDB']
# args['cursorclass'] = pymysql.cursors.DictCursor
dbpool = adbapi.ConnectionPool('pymysql',**args)
return cls(dbpool)
def __init__(self, cfg_name='mysql'):
import pymysql.cursors
import pymysql
self.host = luigi.configuration.get_config().get(cfg_name, 'host')
self.database = luigi.configuration.get_config().get(cfg_name, 'database')
self.user = luigi.configuration.get_config().get(cfg_name, 'user')
self.password = luigi.configuration.get_config().get(cfg_name, 'password')
self.port = int(luigi.configuration.get_config().get(cfg_name, 'port'))
self.connection = self.__connect__()
self.cursor = self.connection.cursor()
def query(self, query: str, args=None) -> Cursor:
cursor = self.cnx.cursor(pymysql.cursors.DictCursor)
cursor.execute(query, args)
return cursor