Python pymysql 模块，Error() 实例源码
我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 pymysql.Error()。
def Execute(self, p_sql):
    """Run a SQL statement whose result set, if any, is not needed.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards, whether or not the
    statement succeeded. A connection that was already open is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :raises Spartacus.Database.Exception: for every failure; sqlite3 and
        other errors are re-raised wrapped around their string form.
    """
    v_was_open = None
    try:
        v_was_open = self.v_con is not None
        if not v_was_open:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception as err:
        raise err
    except sqlite3.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_was_open:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run a query and return the first column of its first row.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards; an already-open connection
    is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :returns: ``row[0]`` of the first fetched row, or None if the query
        produced no row.
    :raises Spartacus.Database.Exception: for every failure; other errors
        are re-raised wrapped around their string form.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        v_row = self.v_cur.fetchone()
        # Identity test: "no row" is exactly None, whatever ``!=`` means
        # for the row type.
        return v_row[0] if v_row is not None else None
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers sqlite3.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of a block into a table as one batched script.

    Builds ``begin; insert ...; insert ...; commit;`` with one insert per
    row and sends the whole text through self.Execute().

    :param p_block: object exposing ``Columns`` (column names) and ``Rows``.
    :param p_tablename: target table name, concatenated into the SQL as-is.
    :param p_fields: optional field objects exposing ``v_name``; when None,
        one DataField is created per entry of ``p_block.Columns``.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [DataField(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        # The statement prefix is the same for every row: build it once.
        v_prefix = 'insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values '
        # self.Mogrify presumably renders one row as a SQL value list --
        # TODO confirm against its definition.
        v_statements = [v_prefix + self.Mogrify(r, v_fields) + '; ' for r in p_block.Rows]
        # NOTE(review): this sends several statements in one Execute() call;
        # confirm the underlying cursor accepts multi-statement text.
        self.Execute('begin; ' + ''.join(v_statements) + 'commit;')
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers sqlite3.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def ExecuteScalar(self, p_sql):
    """Run a query on the open connection and return its first value.

    Unlike the auto-opening variant, this one refuses to run unless a
    connection is already open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :returns: ``row[0]`` of the first fetched row, or None if the query
        produced no row.
    :raises Spartacus.Database.Exception: if ``self.v_con`` is None, or for
        any failure while executing or fetching.
    """
    try:
        if self.v_con is None:
            raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.')
        self.v_cur.execute(p_sql)
        v_row = self.v_cur.fetchone()
        # Identity test: "no row" is exactly None.
        return v_row[0] if v_row is not None else None
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers sqlite3.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def GetFields(self, p_sql):
    """Describe the columns a query returns, using its first row.

    Wraps the query as ``select * from ( <p_sql> ) t limit 1`` and builds
    one DataField per entry of the cursor description. Requires an open
    connection.

    :param p_sql: query text, concatenated into the wrapper as-is.
    :returns: list of DataField. Both ``p_type`` and ``p_dbtype`` are the
        Python type of the matching value in the first row, or
        ``type(None)`` for every column when the query returns no row.
    :raises Spartacus.Database.Exception: if ``self.v_con`` is None, or for
        any failure while executing or fetching.
    """
    try:
        if self.v_con is None:
            raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.')
        self.v_cur.execute('select * from ( ' + p_sql + ' ) t limit 1')
        v_row = self.v_cur.fetchone()
        v_fields = []
        for k, c in enumerate(self.v_cur.description):
            # Without a sample row there is no value to take a type from.
            v_type = type(v_row[k]) if v_row is not None else type(None)
            v_fields.append(DataField(c[0], p_type=v_type, p_dbtype=v_type))
        return v_fields
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers sqlite3.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def Open(self, p_autocommit=True):
    """Open a psycopg2 connection and cache the server's type names.

    Connects with the string from GetConnectionString() using DictCursor
    rows, creates the cursor, and loads ``pg_type`` into ``self.v_types``
    as an ``oid -> typname`` mapping.

    :param p_autocommit: value assigned to the connection's ``autocommit``
        attribute; when false, the type-lookup query is committed here.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        self.v_con = psycopg2.connect(
            self.GetConnectionString(),
            cursor_factory=psycopg2.extras.DictCursor
        )
        self.v_con.autocommit = p_autocommit
        self.v_cur = self.v_con.cursor()
        self.v_start = True
        # PostgreSQL types
        self.v_cur.execute('select oid, typname from pg_type')
        self.v_types = {v_type['oid']: v_type['typname'] for v_type in self.v_cur.fetchall()}
        if not p_autocommit:
            self.v_con.commit()
        self.v_con.notices = DataList()
    except Spartacus.Database.Exception as err:
        raise err
    except psycopg2.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
def Execute(self, p_sql):
    """Run a SQL statement whose result set, if any, is not needed.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards, whether or not the
    statement succeeded. A connection that was already open is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :raises Spartacus.Database.Exception: for every failure; psycopg2 and
        other errors are re-raised wrapped around their string form.
    """
    v_was_open = None
    try:
        v_was_open = self.v_con is not None
        if not v_was_open:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception as err:
        raise err
    except psycopg2.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_was_open:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run a query and return the first column of its first row.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards; an already-open connection
    is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :returns: ``row[0]`` of the first fetched row, or None if the query
        produced no row.
    :raises Spartacus.Database.Exception: for every failure; other errors
        are re-raised wrapped around their string form.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        v_row = self.v_cur.fetchone()
        # Identity test: "no row" is exactly None, whatever ``!=`` means
        # for the row type.
        return v_row[0] if v_row is not None else None
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers psycopg2.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of a block with one multi-row INSERT statement.

    :param p_block: object exposing ``Columns`` (column names) and ``Rows``.
    :param p_tablename: target table name, concatenated into the SQL as-is.
    :param p_fields: optional field objects exposing ``v_name``; when None,
        one DataField is created per entry of ``p_block.Columns``.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [DataField(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        # self.Mogrify presumably renders one row as a SQL value list --
        # TODO confirm against its definition.
        v_values = [self.Mogrify(r, v_fields) for r in p_block.Rows]
        if not v_values:
            # An empty block would yield "insert ... values " with nothing
            # after it, which is not valid SQL; nothing to insert.
            return
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers psycopg2.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run a SQL statement whose result set, if any, is not needed.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards, whether or not the
    statement succeeded. A connection that was already open is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :raises Spartacus.Database.Exception: for every failure; pymysql and
        other errors are re-raised wrapped around their string form.
    """
    v_was_open = None
    try:
        v_was_open = self.v_con is not None
        if not v_was_open:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception as err:
        raise err
    except pymysql.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_was_open:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of a block with one multi-row INSERT statement.

    :param p_block: object exposing ``Columns`` (column names) and ``Rows``.
    :param p_tablename: target table name, concatenated into the SQL as-is.
    :param p_fields: optional field objects exposing ``v_name``; when None,
        one DataField is created per entry of ``p_block.Columns``.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [DataField(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        # self.Mogrify presumably renders one row as a SQL value list --
        # TODO confirm against its definition.
        v_values = [self.Mogrify(r, v_fields) for r in p_block.Rows]
        if not v_values:
            # An empty block would yield "insert ... values " with nothing
            # after it, which is not valid SQL; nothing to insert.
            return
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers pymysql.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run a SQL statement whose result set, if any, is not needed.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards, whether or not the
    statement succeeded. A connection that was already open is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :raises Spartacus.Database.Exception: for every failure; pymysql and
        other errors are re-raised wrapped around their string form.
    """
    v_was_open = None
    try:
        v_was_open = self.v_con is not None
        if not v_was_open:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception as err:
        raise err
    except pymysql.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_was_open:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run a query and return the first column of its first row.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards; an already-open connection
    is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :returns: ``row[0]`` of the first fetched row, or None if the query
        produced no row.
    :raises Spartacus.Database.Exception: for every failure; other errors
        are re-raised wrapped around their string form.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        v_row = self.v_cur.fetchone()
        # Identity test: "no row" is exactly None, whatever ``!=`` means
        # for the row type.
        return v_row[0] if v_row is not None else None
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers pymysql.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of a block with one multi-row INSERT statement.

    :param p_block: object exposing ``Columns`` (column names) and ``Rows``.
    :param p_tablename: target table name, concatenated into the SQL as-is.
    :param p_fields: optional field objects exposing ``v_name``; when None,
        one DataField is created per entry of ``p_block.Columns``.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [DataField(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        # self.Mogrify presumably renders one row as a SQL value list --
        # TODO confirm against its definition.
        v_values = [self.Mogrify(r, v_fields) for r in p_block.Rows]
        if not v_values:
            # An empty block would yield "insert ... values " with nothing
            # after it, which is not valid SQL; nothing to insert.
            return
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers pymysql.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run a SQL statement whose result set, if any, is not needed.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards, whether or not the
    statement succeeded. A connection that was already open is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :raises Spartacus.Database.Exception: for every failure; fdb and other
        errors are re-raised wrapped around their string form.
    """
    v_was_open = None
    try:
        v_was_open = self.v_con is not None
        if not v_was_open:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception as err:
        raise err
    except fdb.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_was_open:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of a block with one multi-row INSERT statement.

    :param p_block: object exposing ``Columns`` (column names) and ``Rows``.
    :param p_tablename: target table name, concatenated into the SQL as-is.
    :param p_fields: optional field objects exposing ``v_name``; when None,
        one DataField is created per entry of ``p_block.Columns``.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [DataField(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        # self.Mogrify presumably renders one row as a SQL value list --
        # TODO confirm against its definition.
        v_values = [self.Mogrify(r, v_fields) for r in p_block.Rows]
        if not v_values:
            # An empty block would yield "insert ... values " with nothing
            # after it, which is not valid SQL; nothing to insert.
            return
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers fdb.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run a SQL statement whose result set, if any, is not needed.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards, whether or not the
    statement succeeded. A connection that was already open is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :raises Spartacus.Database.Exception: for every failure; cx_Oracle and
        other errors are re-raised wrapped around their string form.
    """
    v_was_open = None
    try:
        v_was_open = self.v_con is not None
        if not v_was_open:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception as err:
        raise err
    except cx_Oracle.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_was_open:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run a query and return the first column of its first row.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards; an already-open connection
    is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :returns: ``row[0]`` of the first fetched row, or None if the query
        produced no row.
    :raises Spartacus.Database.Exception: for every failure; other errors
        are re-raised wrapped around their string form.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        v_row = self.v_cur.fetchone()
        # Identity test: "no row" is exactly None, whatever ``!=`` means
        # for the row type.
        return v_row[0] if v_row is not None else None
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers cx_Oracle.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of a block with one multi-row INSERT statement.

    :param p_block: object exposing ``Columns`` (column names) and ``Rows``.
    :param p_tablename: target table name, concatenated into the SQL as-is.
    :param p_fields: optional field objects exposing ``v_name``; when None,
        one DataField is created per entry of ``p_block.Columns``.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [DataField(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        # self.Mogrify presumably renders one row as a SQL value list --
        # TODO confirm against its definition.
        v_values = [self.Mogrify(r, v_fields) for r in p_block.Rows]
        if not v_values:
            # An empty block would yield "insert ... values " with nothing
            # after it, which is not valid SQL; nothing to insert.
            return
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers cx_Oracle.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run a SQL statement whose result set, if any, is not needed.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards, whether or not the
    statement succeeded. A connection that was already open is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :raises Spartacus.Database.Exception: for every failure; pymssql and
        other errors are re-raised wrapped around their string form.
    """
    v_was_open = None
    try:
        v_was_open = self.v_con is not None
        if not v_was_open:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception as err:
        raise err
    except pymssql.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_was_open:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of a block with one multi-row INSERT statement.

    :param p_block: object exposing ``Columns`` (column names) and ``Rows``.
    :param p_tablename: target table name, concatenated into the SQL as-is.
    :param p_fields: optional field objects exposing ``v_name``; when None,
        one DataField is created per entry of ``p_block.Columns``.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [DataField(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        # self.Mogrify presumably renders one row as a SQL value list --
        # TODO confirm against its definition.
        v_values = [self.Mogrify(r, v_fields) for r in p_block.Rows]
        if not v_values:
            # An empty block would yield "insert ... values " with nothing
            # after it, which is not valid SQL; nothing to insert.
            return
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers pymssql.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run a SQL statement whose result set, if any, is not needed.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards, whether or not the
    statement succeeded. A connection that was already open is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :raises Spartacus.Database.Exception: for every failure; ibm_db_dbi and
        other errors are re-raised wrapped around their string form.
    """
    v_was_open = None
    try:
        v_was_open = self.v_con is not None
        if not v_was_open:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception as err:
        raise err
    except ibm_db_dbi.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_was_open:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run a query and return the first column of its first row.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards; an already-open connection
    is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :returns: ``row[0]`` of the first fetched row, or None if the query
        produced no row.
    :raises Spartacus.Database.Exception: for every failure; other errors
        are re-raised wrapped around their string form.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        v_row = self.v_cur.fetchone()
        # Identity test: "no row" is exactly None, whatever ``!=`` means
        # for the row type.
        return v_row[0] if v_row is not None else None
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers ibm_db_dbi.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of a block with one multi-row INSERT statement.

    :param p_block: object exposing ``Columns`` (column names) and ``Rows``.
    :param p_tablename: target table name, concatenated into the SQL as-is.
    :param p_fields: optional field objects exposing ``v_name``; when None,
        one DataField is created per entry of ``p_block.Columns``.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [DataField(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        # self.Mogrify presumably renders one row as a SQL value list --
        # TODO confirm against its definition.
        v_values = [self.Mogrify(r, v_fields) for r in p_block.Rows]
        if not v_values:
            # An empty block would yield "insert ... values " with nothing
            # after it, which is not valid SQL; nothing to insert.
            return
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers ibm_db_dbi.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def setUp(self):
    """Connect the test case to MySQL, or skip the test if that fails.

    Tries the Travis defaults first (root with a blank password on
    localhost, database ``pandas_nosetest``). If that fails, falls back to
    the ``[pandas]`` group of the MySQL defaults file.

    :raises nose.SkipTest: when the fallback connection cannot be made.
    """
    _skip_if_no_pymysql()
    import pymysql
    try:
        # Try Travis defaults.
        # No real user should allow root access with a blank password.
        self.conn = pymysql.connect(host='localhost', user='root',
                                    passwd='', db='pandas_nosetest')
    except Exception:
        # Best-effort probe: any failure just means "use the fallback".
        # Not a bare except, so Ctrl-C and SystemExit still propagate.
        pass
    else:
        return
    try:
        self.conn = pymysql.connect(read_default_group='pandas')
    except pymysql.ProgrammingError:
        raise nose.SkipTest(
            "Create a group of connection parameters under the heading "
            "[pandas] in your system's mysql default file, "
            "typically located at ~/.my.cnf or /etc/.my.cnf. ")
    except pymysql.Error:
        raise nose.SkipTest(
            "Cannot connect to database. "
            "Create a group of connection parameters under the heading "
            "[pandas] in your system's mysql default file, "
            "typically located at ~/.my.cnf or /etc/.my.cnf. ")
def __connect( self, host, port, user, password, charset = 'utf8' ):
    '''
    Connect to a MySQL server and create a cursor on the new connection.

    :param host: server host name or IP
    :param port: server port
    :param user: user name
    :param password: password
    :param charset: connection character set
    :return: None; the connection and cursor are stored on the instance
    :raises pymysql.Error: if the connection cannot be established
    '''
    try:
        self.__conn = pymysql.connect( host = host, port = port, user = user, password = password,
                                       charset = charset )
    except pymysql.Error as e:
        # NOTE(review): the message text looks mis-encoded in this copy of
        # the file; it is left untouched because it is runtime output.
        print( 'MySQL?????%d?%s' % (e.args[ 0 ], e.args[ 1 ]) )
        # Re-raise: continuing would create the cursor on a missing or
        # stale connection and hide the real error.
        raise
    self.__cur = self.__conn.cursor( )
def insert(self, tablename=None, values=None, letCommit=True):
    """Build an INSERT statement from a mapping and run it via select().

    Does nothing when ``values`` is empty.

    :param tablename: target table name; required.
    :param values: mapping of column name to value; required. Each value
        is rendered with ``'%s'`` inside single quotes.
    :param letCommit: forwarded unchanged to self.select().
    :raises AssertionError: if ``tablename`` or ``values`` is None.
    """
    assert (tablename is not None), "Error: no tablename given"
    assert (values is not None), "Error: no values given"
    if not values:
        return
    columns = ", ".join("%s" % key for key in values)
    # NOTE(review): values are quoted but not escaped, so a value
    # containing a single quote breaks the statement (SQL injection risk
    # if input is untrusted). Parameter binding would need support in
    # self.select(), which is not visible here.
    literals = ", ".join("'%s'" % values.get(key) for key in values)
    query = "insert into %s (%s) values (%s)" % (tablename, columns, literals)
    self.select(query=query, letCommit=letCommit)
def getData(self, section=None, key=None, default=None, type=None):
    """Look up a configuration value, with an optional default and cast.

    :param section: configuration section name; required.
    :param key: option name inside the section; required.
    :param default: returned when the section or option is missing, or the
        stored value is empty.
    :param type: when given, a non-empty value is passed through
        Helper.safe_cast(value, type) before being returned.
    :returns: the (optionally cast) value, else ``default``.
    :raises AssertionError: if ``section`` or ``key`` is None.
    """
    assert (section is not None), "Error: no section given"
    assert (key is not None), "Error: no key given"
    found = None
    if self.conf.has_section(section):
        try:
            found = self.conf.get(section, key)
        except cParser.NoOptionError:
            # A missing option is treated like a missing value.
            pass
    if found is None or len(found) == 0:
        return default
    if type is None:
        return found
    return Helper.safe_cast(found, type)
def __init__(self, log=None, dbHost=None, dbUser=None, dbPassword=None, dbName=None, dbTableName=None,
             nameMapping=None, charset="utf8"):
    """Store the database settings and connect immediately.

    :param log: stored as ``self.log``.
    :param dbHost: database host; required.
    :param dbUser: database user; required.
    :param dbPassword: database password; required.
    :param dbName: database name; required.
    :param dbTableName: table name; required.
    :param nameMapping: stored as ``self.nameMapping``.
    :param charset: stored as ``self.charset``.
    :raises AssertionError: if any required argument is None.
    """
    required = (
        (dbHost, "Error: database host ip is mandatory"),
        (dbUser, "Error: database user is mandatory"),
        (dbPassword, "Error: database password is mandatory"),
        (dbName, "Error: database name is mandatory"),
        (dbTableName, "Error: database table name is mandatory"),
    )
    for value, message in required:
        assert (value is not None), message
    self.log = log
    self.host = dbHost
    self.user = dbUser
    self.password = dbPassword
    self.dbName = dbName
    self.dbTableName = dbTableName
    self.nameMapping = nameMapping
    self.charset = charset
    self.mydb = None
    self.connect()
def __init__(self, environ):
    """ Connect to the 'odrs' database using settings from environ.

    Uses the MYSQL_DB_HOST, MYSQL_DB_USERNAME and MYSQL_DB_PASSWORD entries
    when MYSQL_DB_HOST is present, otherwise localhost with the test
    account. A connection error is printed; the final assertion then fails
    because no connection was stored.
    """
    assert environ
    self._db = None
    try:
        if 'MYSQL_DB_HOST' in environ:
            host = environ['MYSQL_DB_HOST']
            username = environ['MYSQL_DB_USERNAME']
            password = environ['MYSQL_DB_PASSWORD']
        else:
            host, username, password = 'localhost', 'test', 'test'
        self._db = mdb.connect(host, username, password, 'odrs',
                               use_unicode=True, charset='utf8')
        self._db.autocommit(True)
    except mdb.Error as e:
        print("Error %d: %s" % (e.args[0], e.args[1]))
    assert self._db
def review_modify(self, review):
    """ Modifies a review

    Updates the row matching ``review.review_id`` with the review's
    version, distro, locale, summary, description, user_display, reported,
    user_hash and date_deleted (0 when falsy).

    Returns True. Raises CursorError if the UPDATE fails.
    """
    # Created outside the try so the except handler can always refer to
    # it; previously a failing cursor() left `cur` unbound there.
    cur = self._db.cursor()
    try:
        cur.execute("UPDATE reviews SET version = %s, "
                    "distro = %s, locale = %s, "
                    "summary = %s, description = %s, "
                    "user_display = %s, reported = %s, "
                    "user_hash = %s, date_deleted = %s "
                    "WHERE review_id = %s;",
                    (review.version,
                     review.distro,
                     review.locale,
                     review.summary,
                     review.description,
                     review.user_display,
                     review.reported,
                     review.user_hash,
                     review.date_deleted or 0,
                     review.review_id,))
    except mdb.Error as e:
        raise CursorError(cur, e)
    return True
def review_add(self, review, user_addr):
    """ Add a review to the database

    Inserts one row into ``reviews`` from the review's fields plus
    ``user_addr``. Raises CursorError if the INSERT fails.
    """
    # Created outside the try so the except handler can always refer to
    # it; previously a failing cursor() left `cur` unbound there.
    cur = self._db.cursor()
    try:
        cur.execute("INSERT INTO reviews (app_id, locale, summary, "
                    "description, user_hash, user_display, version, "
                    "distro, rating, user_addr) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
                    (review.app_id,
                     review.locale,
                     review.summary,
                     review.description,
                     review.user_hash,
                     review.user_display,
                     review.version,
                     review.distro,
                     review.rating,
                     user_addr,))
    except mdb.Error as e:
        raise CursorError(cur, e)
def vote_add(self, review_id, val, user_hash):
    """ Votes on a specific review and add to the votes database

    ``val`` selects the counter bumped on the review: -5 increments
    ``reported``, 1 increments ``karma_up``, -1 increments ``karma_down``;
    any other value changes no counter. The vote itself is always recorded
    in ``votes``. Raises CursorError if a statement fails.
    """
    # Created outside the try so the except handler can always refer to
    # it; previously a failing cursor() left `cur` unbound there.
    cur = self._db.cursor()
    try:
        if val == -5:
            cur.execute("UPDATE reviews SET reported = reported + 1 "
                        "WHERE review_id = %s;", (review_id,))
        elif val == 1:
            cur.execute("UPDATE reviews SET karma_up = karma_up + 1 "
                        "WHERE review_id = %s;", (review_id,))
        elif val == -1:
            cur.execute("UPDATE reviews SET karma_down = karma_down + 1 "
                        "WHERE review_id = %s;", (review_id,))
        cur.execute("INSERT INTO votes (user_hash, review_id, val) "
                    "VALUES (%s, %s, %s);",
                    (user_hash, review_id, val,))
    except mdb.Error as e:
        raise CursorError(cur, e)
def review_get_all(self):
    """ Gets all reviews from the server for all applications

    Selects every row of ``reviews`` (newest first; the query has no
    filter on date_deleted) and converts each with _create_review().
    Returns a list, empty when there are no rows. Raises CursorError if
    the SELECT fails.
    """
    # Created outside the try so the except handler can always refer to
    # it; previously a failing cursor() left `cur` unbound there.
    cur = self._db.cursor()
    try:
        cur.execute("SELECT review_id, date_created, app_id, locale, summary, "
                    "description, version, distro, karma_up, karma_down, "
                    "user_hash, user_display, rating, date_deleted, reported "
                    "FROM reviews ORDER BY date_created DESC;")
    except mdb.Error as e:
        raise CursorError(cur, e)
    res = cur.fetchall()
    if not res:
        return []
    return [_create_review(e) for e in res]
def user_get_all(self):
    """ Get all the users on the system

    Selects every row of ``users`` (highest user_id first) and converts
    each with _create_user(). Returns a list, empty when there are no
    rows. Raises CursorError if the SELECT fails.
    """
    # Created outside the try so the except handler can always refer to
    # it; previously a failing cursor() left `cur` unbound there.
    cur = self._db.cursor()
    try:
        cur.execute("SELECT user_id, date_created, "
                    "user_hash, karma, is_banned "
                    "FROM users ORDER BY user_id DESC;")
    except mdb.Error as e:
        raise CursorError(cur, e)
    res = cur.fetchall()
    if not res:
        return []
    return [_create_user(e) for e in res]
def get_users_by_karma(self, best=True):
    """ Returns up to ten users with non-zero karma, ordered by karma

    ``best`` picks the ordering: highest karma first when true, lowest
    first otherwise. Rows are converted with _create_user(). Raises
    CursorError if the SELECT fails.
    """
    # Created outside the try so the except handler can always refer to
    # it; previously a failing cursor() left `cur` unbound there.
    cur = self._db.cursor()
    try:
        if best:
            cur.execute("SELECT user_id, date_created, "
                        "user_hash, karma, is_banned FROM users "
                        "WHERE karma != 0 ORDER BY karma DESC LIMIT 10;")
        else:
            cur.execute("SELECT user_id, date_created, "
                        "user_hash, karma, is_banned FROM users "
                        "WHERE karma != 0 ORDER BY karma ASC LIMIT 10;")
    except mdb.Error as e:
        raise CursorError(cur, e)
    return [_create_user(res) for res in cur.fetchall()]
def user_update_karma(self, user_hash, val):
    """ Add val to the karma of the user identified by user_hash

    If the user does not exist yet it is created with user_add() and the
    method returns without applying ``val``. Raises CursorError if the
    UPDATE fails.
    """
    # if not existing, create it
    user = self.user_get_by_hash(user_hash)
    if not user:
        self.user_add(user_hash)
        return
    # update the karma value
    # Cursor created outside the try so the except handler can always
    # refer to it; previously a failing cursor() left `cur` unbound there.
    cur = self._db.cursor()
    try:
        cur.execute("UPDATE users SET karma = karma + %s "
                    "WHERE user_hash = %s;", (val, user_hash,))
    except mdb.Error as e:
        raise CursorError(cur, e)
def get_stats_by_interval(self, size, interval, msg):
    """ Counts eventlog rows with a given message per time bucket

    Runs one COUNT query for each of ``size`` buckets of ``interval``
    days, stepping back from today, and returns the counts as a list of
    ints with the most recent bucket first. Raises CursorError if a query
    fails.
    """
    counts = []
    today = datetime.date.today()
    # yes, there's probably a way to do this in one query
    cur = self._db.cursor()
    for idx in range(size):
        offset = idx * interval
        start = today - datetime.timedelta(offset + interval - 1)
        end = today - datetime.timedelta(offset - 1)
        try:
            cur.execute("SELECT COUNT(*) FROM eventlog "
                        "WHERE message = %s AND date_created BETWEEN %s "
                        "AND %s", (msg, start, end,))
        except mdb.Error as e:
            raise CursorError(cur, e)
        row = cur.fetchone()
        counts.append(int(row[0]))
    return counts
def get_analytics_by_interval(self, size, interval):
    """ Sums analytics fetch counts per time bucket

    Runs one query for each of ``size`` buckets of ``interval`` days,
    stepping back from today, and returns the summed ``fetch_cnt`` of each
    bucket as a list with the most recent bucket first. Bucket bounds are
    converted with _get_datestr_from_dt(). Raises CursorError if a query
    fails.
    """
    totals = []
    today = datetime.date.today()
    # yes, there's probably a way to do this in one query
    cur = self._db.cursor()
    for idx in range(size):
        offset = idx * interval
        start = _get_datestr_from_dt(today - datetime.timedelta(offset + interval - 1))
        end = _get_datestr_from_dt(today - datetime.timedelta(offset - 1))
        try:
            cur.execute("SELECT fetch_cnt FROM analytics WHERE "
                        "datestr BETWEEN %s "
                        "AND %s", (start, end,))
        except mdb.Error as e:
            raise CursorError(cur, e)
        # add all these up
        totals.append(sum(int(row[0]) for row in cur.fetchall()))
    return totals
def mysql_query(host, user, password, port, charset, database, sql):
    """Run one SQL statement on a fresh MySQL connection.

    Connects with DictCursor rows and a 5 second connect timeout, executes
    ``sql``, commits, and closes the connection.

    :returns: the cursor the statement was executed on. The connection is
        already closed when it is returned.
    :raises SystemExit: with status 1 when the connection cannot be
        established (the error is printed first).
    """
    try:
        connection = pymysql.connect(host=host, user=user, password=password, database=database, port=port,
                                     charset=charset,
                                     cursorclass=pymysql.cursors.DictCursor, connect_timeout=5)
    except pymysql.Error as e:
        # Exceptions have no `.message` on Python 3, so look it up
        # defensively; print() with one argument behaves the same on 2 and 3.
        print(getattr(e, 'message', None) or e.args)
        sys.exit(1)
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql)
        connection.commit()
    finally:
        connection.close()
    if cursor is not None:
        return cursor
    else:
        sys.exit(1)
def mdb_query(sql, host, port, user, passwd, db='', dictType = False):
    '''
    Run a query against a MySQL server and return column names and rows.

    :param sql: statement to execute
    :param host: server host
    :param port: server port
    :param user: user name
    :param passwd: password
    :param db: database name
    :param dictType: when true, rows are fetched with DictCursor and
        returned as fetched; otherwise each row is converted to a list
    :return: None if the connection fails; otherwise a tuple
        ``(field_names, results)``. Both are None if executing the
        statement raised a pymysql error. ``field_names`` is an empty list
        for a statement with no result description.
    '''
    results = None
    field_names = None
    # Connect to the cluster database
    try:
        conn = pymysql.connect(host=host,
                               port=port,
                               user=user,
                               passwd=passwd,
                               db=db,
                               charset='utf8mb4')
    except pymysql.Error as e:
        print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
        return None
    try:
        if dictType:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
        else:
            cursor = conn.cursor()
        with cursor:
            cursor.execute(sql)
            # description is None when the statement yields no result set;
            # iterating it unguarded raised TypeError.
            description = cursor.description
            field_names = [col[0] for col in description] if description else []
            results = cursor.fetchall()
            if not dictType:
                results = list(map(list, results))
    except pymysql.Error as e:
        print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
    finally:
        conn.close()
    return field_names, results
def Open(self, p_autocommit=True):
    """Open the SQLite connection and its cursor.

    Connects to ``self.v_service`` with timeout ``self.v_timeout`` and,
    when ``self.v_foreignkeys`` is set, switches on foreign-key
    enforcement for the connection.

    :param p_autocommit: accepted but not used by this implementation.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        v_connection = sqlite3.connect(self.v_service, self.v_timeout)
        self.v_con = v_connection
        self.v_cur = v_connection.cursor()
        if self.v_foreignkeys:
            self.v_cur.execute('PRAGMA foreign_keys = ON')
        self.v_start = True
    except sqlite3.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
def Query(self, p_sql, p_alltypesstr=False):
    """Run a query and return all of its rows as a DataTable.

    When no connection is open (``self.v_con`` is None) one is opened with
    Open() and released with Close() afterwards; an already-open connection
    is left open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :param p_alltypesstr: when true, every value is converted with str()
        and None becomes the empty string.
    :returns: DataTable whose ``Columns`` holds the column names and whose
        ``Rows`` holds one OrderedDict (column name -> value) per row. It
        is empty when the statement has no result description.
    :raises Spartacus.Database.Exception: for every failure.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        v_table = DataTable()
        if self.v_cur.description:
            for c in self.v_cur.description:
                v_table.Columns.append(c[0])
            v_row = self.v_cur.fetchone()
            while v_row is not None:
                if p_alltypesstr:
                    # Identity test against None rather than ``!=``.
                    v_row = tuple('' if v is None else str(v) for v in v_row)
                v_table.Rows.append(OrderedDict(zip(v_table.Columns, v_row)))
                v_row = self.v_cur.fetchone()
        return v_table
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers sqlite3.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_keep:
            self.Close()
def Close(self):
    """Commit and close the connection, if one is open.

    Commits pending work, closes the cursor (when there is one) and the
    connection, and resets ``self.v_cur`` / ``self.v_con`` to None. Does
    nothing when ``self.v_con`` is falsy.

    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if not self.v_con:
            return
        self.v_con.commit()
        if self.v_cur:
            self.v_cur.close()
            self.v_cur = None
        self.v_con.close()
        self.v_con = None
    except sqlite3.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
def GetFields(self, p_sql):
    """Describe the columns a query returns, using its first row.

    Wraps the query as ``select * from ( <p_sql> ) t limit 1`` and builds
    one DataField per entry of the cursor description. When no connection
    is open, one is opened with Open() and released with Close()
    afterwards.

    :param p_sql: query text, concatenated into the wrapper as-is.
    :returns: list of DataField. Both ``p_type`` and ``p_dbtype`` are the
        Python type of the matching value in the first row, or
        ``type(None)`` for every column when the query returns no row.
    :raises Spartacus.Database.Exception: for every failure.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute('select * from ( ' + p_sql + ' ) t limit 1')
        v_row = self.v_cur.fetchone()
        v_fields = []
        for k, c in enumerate(self.v_cur.description):
            # Without a sample row there is no value to take a type from.
            v_type = type(v_row[k]) if v_row is not None else type(None)
            v_fields.append(DataField(c[0], p_type=v_type, p_dbtype=v_type))
        return v_fields
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers sqlite3.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
    finally:
        # Falsy covers both "opened here" and "failed before deciding".
        if not v_keep:
            self.Close()
def Open(self, p_autocommit=True):
    """Open the SQLite connection and its cursor.

    Connects to ``self.v_service`` with timeout ``self.v_timeout`` and,
    when ``self.v_foreignkeys`` is set, switches on foreign-key
    enforcement for the connection.

    :param p_autocommit: accepted but not used by this implementation.
    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        v_connection = sqlite3.connect(self.v_service, self.v_timeout)
        self.v_con = v_connection
        self.v_cur = v_connection.cursor()
        if self.v_foreignkeys:
            self.v_cur.execute('PRAGMA foreign_keys = ON')
        self.v_start = True
    except sqlite3.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
def Query(self, p_sql, p_alltypesstr=False):
    """Run a query on the open connection and return its rows.

    Unlike the auto-opening variant, this one refuses to run unless a
    connection is already open.

    :param p_sql: SQL text handed unchanged to the cursor's execute().
    :param p_alltypesstr: when true, every value is converted with str()
        and None becomes the empty string.
    :returns: DataTable whose ``Columns`` holds the column names and whose
        ``Rows`` holds one OrderedDict (column name -> value) per row. It
        is empty when the statement has no result description.
    :raises Spartacus.Database.Exception: if ``self.v_con`` is None, or for
        any failure while executing or fetching.
    """
    try:
        if self.v_con is None:
            raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.')
        self.v_cur.execute(p_sql)
        v_table = DataTable()
        if self.v_cur.description:
            for c in self.v_cur.description:
                v_table.Columns.append(c[0])
            v_row = self.v_cur.fetchone()
            while v_row is not None:
                if p_alltypesstr:
                    # Identity test against None rather than ``!=``.
                    v_row = tuple('' if v is None else str(v) for v in v_row)
                v_table.Rows.append(OrderedDict(zip(v_table.Columns, v_row)))
                v_row = self.v_cur.fetchone()
        return v_table
    except Spartacus.Database.Exception:
        raise
    except Exception as exc:
        # Also covers sqlite3.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))
def Close(self):
    """Commit and close the connection, if one is open.

    Commits pending work, closes the cursor (when there is one) and the
    connection, and resets ``self.v_cur`` / ``self.v_con`` to None. Does
    nothing when ``self.v_con`` is falsy.

    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if not self.v_con:
            return
        self.v_con.commit()
        if self.v_cur:
            self.v_cur.close()
            self.v_cur = None
        self.v_con.close()
        self.v_con = None
    except sqlite3.Error as err:
        raise Spartacus.Database.Exception(str(err))
    except Exception as err:
        raise Spartacus.Database.Exception(str(err))
def Cancel(self):
    """Abort any running query and close the connection, if one is open.

    Interrupts the connection, closes the cursor (when there is one) and
    the connection, and resets ``self.v_cur`` / ``self.v_con`` to None.
    Does nothing when ``self.v_con`` is falsy. Nothing is committed here.

    :raises Spartacus.Database.Exception: for every failure.
    """
    try:
        if self.v_con:
            # sqlite3.Connection has no cancel(); interrupt() is the call
            # that aborts queries executing on the connection.
            self.v_con.interrupt()
            if self.v_cur:
                self.v_cur.close()
                self.v_cur = None
            self.v_con.close()
            self.v_con = None
    except Exception as exc:
        # Also covers sqlite3.Error, which derives from Exception.
        raise Spartacus.Database.Exception(str(exc))