We have extracted the following 50 code examples from open-source Python projects to illustrate how to use sqlite3.PARSE_DECLTYPES.
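Before the examples, here is a minimal sketch of what PARSE_DECLTYPES does: when set, sqlite3 looks at the column's declared type and runs the matching registered converter on every value it returns. The table below is illustrative only.

import datetime
import sqlite3

# detect_types=sqlite3.PARSE_DECLTYPES tells sqlite3 to inspect the declared
# column type ("DATE" below) and apply the matching converter on fetch.
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
cur = con.cursor()
cur.execute("CREATE TABLE events (d DATE)")  # hypothetical table
cur.execute("INSERT INTO events (d) VALUES (?)", (datetime.date(2020, 1, 1),))
cur.execute("SELECT d FROM events")
print(type(cur.fetchone()[0]))  # <class 'datetime.date'>, not str
con.close()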
def CheckConvertTimestampMicrosecondPadding(self):
    """
    http://bugs.python.org/issue14720

    The microsecond parsing of convert_timestamp() should pad with zeros,
    since the microsecond string "456" actually represents "456000".
    """
    con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    cur = con.cursor()
    cur.execute("CREATE TABLE t (x TIMESTAMP)")

    # Microseconds should be 456000
    cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

    # Microseconds should be truncated to 123456
    cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

    cur.execute("SELECT * FROM t")
    values = [x[0] for x in cur.fetchall()]

    self.assertEqual(values, [
        datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
        datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
    ])
def init_db(self):
    """Connect to the database, and create tables if necessary."""
    if not self.enabled:
        self.db = DummyDB()
        return

    # use detect_types so that timestamps return datetime objects
    kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    kwargs.update(self.connection_options)
    self.db = sqlite3.connect(self.hist_file, **kwargs)
    self.db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
                    primary key autoincrement, start timestamp,
                    end timestamp, num_cmds integer, remark text)""")
    self.db.execute("""CREATE TABLE IF NOT EXISTS history
                    (session integer, line integer, source text, source_raw text,
                    PRIMARY KEY (session, line))""")
    # Output history is optional, but ensure the table's there so it can be
    # enabled later.
    self.db.execute("""CREATE TABLE IF NOT EXISTS output_history
                    (session integer, line integer, output text,
                    PRIMARY KEY (session, line))""")
    self.db.commit()
    # success! reset corrupt db count
    self._corrupt_db_counter = 0
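The example above ORs PARSE_DECLTYPES with PARSE_COLNAMES. With PARSE_COLNAMES, the converter is picked from a type name embedded in the column alias rather than from the declared column type, which is useful for computed SELECT expressions that have no declared type. A minimal sketch (the converter name and alias are made up for illustration):

import sqlite3

sqlite3.register_converter("upper", lambda b: b.decode().upper())

con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES)
cur = con.execute('''SELECT 'hello' AS "greeting [upper]"''')
print(cur.fetchone()[0])  # HELLO -- converter chosen from the [upper] tag
con.close()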
def connect_to_database():
    """
    Connect to the log database, create the schema if needed.

    This method MUST be called once and only once.
    """
    if Logger.connected is True:
        raise RuntimeError("Database already loaded")
    Logger.connected = True
    Logger.conn = sqlite3.connect(
        Config.logfile,
        check_same_thread=False,
        isolation_level=None,
        detect_types=sqlite3.PARSE_DECLTYPES
    )
    Logger.c = Logger.conn.cursor()
    Logger.c.executescript("""
        PRAGMA JOURNAL_MODE = WAL;
        PRAGMA SYNCHRONOUS = NORMAL;

        CREATE TABLE IF NOT EXISTS logs (
            date INTEGER DEFAULT (strftime('%s','now')) NOT NULL,
            category TEXT NOT NULL,
            level INTEGER NOT NULL,
            message TEXT NOT NULL);

        CREATE INDEX IF NOT EXISTS log_date_level ON logs (date, level);
    """)
def connect_to_database():
    if Database.connected is True:
        raise RuntimeError("Database already loaded")
    Database.connected = True
    Database.conn = sqlite3.connect(
        Config.db,
        check_same_thread=False,
        isolation_level=None,
        detect_types=sqlite3.PARSE_DECLTYPES
    )
    Database.c = Database.conn.cursor()
    Database.c.executescript(Schema.INIT)

    version = Database.get_meta("schema_version", -1, int)
    if version == -1:
        Logger.info("DB_OPERATION", "Creating database")

    for upd in range(version + 1, len(Schema.UPDATERS)):
        Logger.info("DB_OPERATION", "Applying updater %d" % upd)
        Database.c.executescript(Schema.UPDATERS[upd])
        Database.set_meta("schema_version", upd)

    Database.conn.commit()
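The updater loop above is a simple versioned-migration scheme: each entry in Schema.UPDATERS upgrades the schema by one version, and the stored version number records how far the database has been migrated. A self-contained sketch of the same idea, using SQLite's built-in PRAGMA user_version in place of the project's get_meta/set_meta helpers (migration scripts are hypothetical):

import sqlite3

# Hypothetical migration scripts; index i upgrades the schema to version i + 1.
MIGRATIONS = [
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
    "ALTER TABLE users ADD COLUMN email TEXT;",
]

def migrate(conn):
    version = conn.execute("PRAGMA user_version").fetchone()[0]
    for target, script in enumerate(MIGRATIONS[version:], start=version + 1):
        conn.executescript(script)
        conn.execute("PRAGMA user_version = %d" % target)  # pragmas can't take ? parameters
    conn.commit()

conn = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
migrate(conn)  # applies both scripts; calling it again is a no-op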
def _restore_dict(self, path, read_only, cache_size):
    # specific cache dictionary for each table
    self._cache = defaultdict(dict)
    # ====== db manager ====== #
    self._conn = sqlite3.connect(path, detect_types=sqlite3.PARSE_DECLTYPES)
    self._conn.text_factory = str
    self._cursor = self._conn.cursor()
    # adjust pragma
    # SQLITE_OPEN_EXCLUSIVE
    self.connection.execute('PRAGMA main.locking_mode = EXCLUSIVE;')
    self.connection.execute("PRAGMA main.synchronous = 0;")
    self.connection.execute("PRAGMA journal_mode = MEMORY;")
    self.connection.commit()
    # ====== create default table ====== #
    self._current_table = SQLiteDict._DEFAULT_TABLE
    self.set_table(SQLiteDict._DEFAULT_TABLE)
def _open(self):
    if self._file is None:
        self._optimize_file_size()
        path = os.path.dirname(self._filename)
        if not os.path.exists(path):
            os.makedirs(path)
        self._file = sqlite3.connect(self._filename,
                                     check_same_thread=False,
                                     detect_types=sqlite3.PARSE_DECLTYPES,
                                     timeout=1)
        self._file.isolation_level = None
        self._cursor = self._file.cursor()
        self._cursor.execute('PRAGMA journal_mode=MEMORY')
        self._cursor.execute('PRAGMA busy_timeout=20000')
        # self._cursor.execute('PRAGMA synchronous=OFF')
        self._create_table()
def get_list_by_all_types(self, project_name):
    # get project
    result = self.get_project(project_name)
    if not result[0]:
        return (False, result[1])

    # write series to db
    conn = sqlite3.connect(self.assets_path,
                           detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()

    assets_list = []
    for asset_type in self.asset_types:
        try:
            table = asset_type
            str_ = 'select * from ' + table
            c.execute(str_)
            rows = c.fetchall()
            for row in rows:
                assets_list.append(row)
        except:
            # print(('not found table from type: " ' + asset_type + ' "'))
            continue

    conn.close()
    return (True, assets_list)
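This helper (and the ones that follow) sets conn.row_factory = sqlite3.Row, so fetched rows can be indexed by column name as well as by position. A minimal illustration with a hypothetical table:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows gain dict-style access by column name
conn.execute("CREATE TABLE assets (id INTEGER, name TEXT)")
conn.execute("INSERT INTO assets VALUES (1, 'tree')")
row = conn.execute("SELECT * FROM assets").fetchone()
print(row["name"], row[0])  # tree 1 -- by name or by position
conn.close()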
def get_name_list_by_type(self, project_name, asset_type):
    result = self.get_project(project_name)
    if not result[0]:
        return (False, result[1])

    # write series to db
    conn = sqlite3.connect(self.assets_path,
                           detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()

    try:
        table = asset_type
        str_ = 'select * from ' + table
        c.execute(str_)
        rows = c.fetchall()
        names = []
        for row in rows:
            names.append(row['name'])
        conn.close()
        # return the collected names, not the raw rows
        return (True, names)
    except:
        conn.close()
        return (True, [])
def get_id_name_dict_by_type(self, project_name, asset_type):
    result = self.get_project(project_name)
    if not result[0]:
        return (False, result[1])

    # write series to db
    conn = sqlite3.connect(self.assets_path,
                           detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()

    try:
        table = asset_type
        str_ = 'select * from ' + table
        c.execute(str_)
        rows = c.fetchall()
        asset_id_name_dict = {}
        for row in rows:
            asset_id_name_dict[row['id']] = row['name']
        conn.close()
        return (True, asset_id_name_dict)
    except:
        conn.close()
        # return an empty dict to match the success case
        return (True, {})
def get_by_name(self, project_name, asset_type, asset_name):
    result = self.get_project(project_name)
    if not result[0]:
        return (False, result[1])

    # write series to db
    conn = sqlite3.connect(self.assets_path,
                           detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()

    try:
        table = asset_type
        str_ = 'select * from ' + table + ' where "name" = ?'
        c.execute(str_, (asset_name,))
        row = c.fetchone()
        conn.close()
        return (True, row)
    except:
        conn.close()
        return (False, 'Not Asset With This Name!')
def get_list(self, project):
    result = self.get_project(project)
    if not result[0]:
        return (False, result[1])

    # write series to db
    try:
        conn = sqlite3.connect(self.assets_path,
                               detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
        conn.row_factory = sqlite3.Row
        c = conn.cursor()
    except:
        print(self.assets_path)
        return (False, ('Not Open .db' + self.assets_path))

    try:
        table = self.series_t
        str_ = 'select * from ' + table
        c.execute(str_)
        rows = c.fetchall()
        return (True, rows)
    except:
        conn.close()
        return (False, 'Not Table!')
def get_by_id(self, project, id_):
    result = self.get_project(project)
    if not result[0]:
        return (False, result[1])

    # write series to db
    conn = sqlite3.connect(self.assets_path,
                           detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()

    try:
        table = self.series_t
        str_ = 'select * from ' + table
        c.execute(str_)
        rows = c.fetchall()
        for row in rows:
            if row['id'] == id_:
                return (True, row)
        return (False, 'Not Found Series!')
    except:
        conn.close()
        return (False, 'Not Table!')
def __init__(self, file_name, max_queue_size=100):
    """Automatically starts the thread.

    Args:
        file_name: The name of the file.
        max_queue_size: The max queries that will be queued.
    """
    threading.Thread.__init__(self)
    self.daemon = True
    self.sqlite3_conn = sqlite3.connect(
        file_name,
        check_same_thread=False,
        detect_types=sqlite3.PARSE_DECLTYPES)
    self.sqlite3_cursor = self.sqlite3_conn.cursor()
    self.sql_queue = Queue.Queue(maxsize=max_queue_size)
    self.results = {}
    self.max_queue_size = max_queue_size
    self.exit_set = False
    # Token that is put into queue when close() is called.
    self.exit_token = str(uuid.uuid4())
    self.start()
    self.thread_running = True
def _open(self):
    if self._file is None:
        self._optimize_file_size()
        path = os.path.dirname(self._filename)
        if not os.path.exists(path):
            os.makedirs(path)
        self._file = sqlite3.connect(self._filename,
                                     check_same_thread=False,
                                     detect_types=sqlite3.PARSE_DECLTYPES,
                                     timeout=1)
        self._file.isolation_level = None
        self._cursor = self._file.cursor()
        self._cursor.execute('PRAGMA journal_mode=MEMORY')
        # self._cursor.execute('PRAGMA synchronous=OFF')
        self._create_table()
def __init__(self, path: str) -> None:
    self.path = path
    if os.path.isfile(self.path):
        self._conn = sqlite3.connect(
            self.path,
            detect_types=sqlite3.PARSE_DECLTYPES,
            isolation_level="DEFERRED")
        self._conn.create_function("gen_salt", 0, lambda: secure_string(8))
        self._cur = self._conn.cursor()
        self._cur.arraysize = 20
        self._cur.executescript("""\
            PRAGMA foreign_keys = ON;
        """)
    else:
        self._conn = None
        self._cur = None

    # Create adapter from python boolean to sqlite integer.
    sqlite3.register_adapter(bool, int)
    sqlite3.register_converter("BOOL", lambda x: bool(int(x)))
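register_adapter and register_converter form a round trip: the adapter turns a Python value into something SQLite can store on INSERT, and the converter rebuilds the Python value on SELECT when the declared column type matches. A minimal sketch of the bool round trip registered above (the table name is illustrative):

import sqlite3

sqlite3.register_adapter(bool, int)                          # Python -> storage
sqlite3.register_converter("BOOL", lambda x: bool(int(x)))   # storage -> Python

con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
con.execute("CREATE TABLE flags (active BOOL)")
con.execute("INSERT INTO flags VALUES (?)", (True,))
value = con.execute("SELECT active FROM flags").fetchone()[0]
print(value, type(value))  # True <class 'bool'>
con.close()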
def database(connclass):
    fname = getattr(connclass, 'database', 'default.sqlite')
    connection = sqlite3.connect(fname, detect_types=sqlite3.PARSE_DECLTYPES)
    for tablename in dir(connclass):
        if tablename.startswith('_'):
            continue
        tabledata = getattr(connclass, tablename, None)
        if not isinstance(tabledata, type):
            continue
        columns = []
        for colname in dir(tabledata):
            if colname.startswith('_'):
                continue
            coldata = getattr(tabledata, colname, None)
            if coldata in ('INTEGER', 'TEXT'):
                columns.append('{} {}'.format(colname, coldata))
        sql = 'CREATE TABLE IF NOT EXISTS {} ({});'
        sql = sql.format(tablename, ', '.join(columns))
        connection.execute(sql)
    return connection
def bind_roles(self, server_id):
    """
    This method reads all the role bindings from the server's database
    and adds them to their rank containers.

    :param server_id: ID of the server whose rank bindings should be loaded
    """
    self.servers[server_id] = None
    if not os.path.exists("cache/"):
        os.makedirs("cache")

    # Connect to SQLite file for server in cache/SERVERID.sqlite
    con = sqlite3.connect("cache/" + server_id + ".sqlite",
                          detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    with con:
        cur = con.cursor()
        cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
        cur.execute("SELECT * FROM rank_binding")
        rows = cur.fetchall()
        rc = Ranks.RankContainer()
        rc.default.append("@everyone")
        for row in rows:
            if row[1] == "Admin":
                rc.admin.append(row[0])
            if row[1] == "Mod":
                rc.mod.append(row[0])
            if row[1] == "Member":
                rc.member.append(row[0])
        self.servers[server_id] = rc
def admin(self, message_object, group):
    if message_object.author is message_object.server.owner:
        if not os.path.exists("cache/"):
            os.makedirs("cache")

        # Connect to SQLite file for server in cache/SERVERID.sqlite
        con = sqlite3.connect("cache/" + message_object.server.id + ".sqlite",
                              detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
        with con:
            cur = con.cursor()
            cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
            cur.execute("INSERT OR IGNORE INTO rank_binding(DiscordGroup, Rank) VALUES(?, ?)",
                        (group, "Admin"))
async def bind(self, message_object, group, rank):
    # declared async because the body awaits send_message below
    if message_object.author is message_object.server.owner:
        if not os.path.exists("cache/"):
            os.makedirs("cache")

        # Connect to SQLite file for server in cache/SERVERID.sqlite
        con = sqlite3.connect("cache/" + message_object.server.id + ".sqlite",
                              detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
        with con:
            cur = con.cursor()
            cur.execute("CREATE TABLE IF NOT EXISTS rank_binding(DiscordGroup TEXT PRIMARY KEY, Rank TEXT)")
            cur.execute("INSERT OR IGNORE INTO rank_binding(DiscordGroup, Rank) VALUES(?, ?)",
                        (group, rank))
        await self.pm.clientWrap.send_message(self.name, message_object.channel,
                                              "Group " + group + " was added as " + rank)
def __init__(self, database_file, database_name):
    self._connection = sqlite3.connect(database_file, detect_types=sqlite3.PARSE_DECLTYPES)
    self._database_name = database_name
    self._cursor = self._connection.cursor()
    self._cursor.execute(u'''SELECT name FROM sqlite_master
                             WHERE type='table' AND name='%s';''' % self._database_name)
    if self._cursor.fetchone() is None:
        self._cursor.execute(self._sql_table.format(self._database_name))
        self._connection.commit()
def connect(self):
    if self.database_name == ':memory:':
        path = ':memory:'
    else:
        db_filename = self.database_name + '.sqlite'
        path = os.path.join(config.get('database', 'path'), db_filename)
        if not os.path.isfile(path):
            raise IOError('Database "%s" doesn\'t exist!' % db_filename)
    if self._conn is not None:
        return self
    self._conn = sqlite.connect(path,
                                detect_types=sqlite.PARSE_DECLTYPES | sqlite.PARSE_COLNAMES)
    self._conn.create_function('extract', 2, SQLiteExtract.extract)
    self._conn.create_function('date_trunc', 2, date_trunc)
    self._conn.create_function('split_part', 3, split_part)
    self._conn.create_function('position', 2, SQLitePosition.position)
    self._conn.create_function('overlay', 3, SQLiteOverlay.overlay)
    self._conn.create_function('overlay', 4, SQLiteOverlay.overlay)
    if sqlite.sqlite_version_info < (3, 3, 14):
        self._conn.create_function('replace', 3, replace)
    self._conn.create_function('now', 0, now)
    self._conn.create_function('sign', 1, sign)
    self._conn.create_function('greatest', -1, max)
    self._conn.create_function('least', -1, min)
    self._conn.execute('PRAGMA foreign_keys = ON')
    return self
def __init__(self, path=None):
    """
    Create the connection with the database and attempt to make a table.

    :param path: relative path of database
    """
    if path is None:
        if sys.argv[0]:
            self.path = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])),
                                     'log.sqlite3')
        else:
            self.path = 'log.sqlite3'
    else:
        self.path = path
    self.conn = sqlite3.connect(self.path, detect_types=sqlite3.PARSE_DECLTYPES)
    self.cursor = self.conn.cursor()
    self.create_tables()
def _fields(self):
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
    c = conn.cursor()
    c.execute("SELECT * FROM tweets")
    fields = tuple([f[0] for f in c.description])
    c.close()
    return fields
def all(self):
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
    df = pd.read_sql_query(
        'SELECT * FROM tweets',
        conn,
        parse_dates=['created_at']
    )
    return df
def tweets_since(self, dt):
    """
    Retrieve all tweets since a particular datetime.

    :param dt: The starting datetime to query from.
    """
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
    df = pd.read_sql_query(
        'SELECT * FROM tweets WHERE created_at > ?',
        conn,
        params=(dt,),
        parse_dates=['created_at']
    )
    return TweetBin(df, dt, datetime.now())
def tweets_between(self, start, end):
    """
    Retrieve tweets between the start and end datetimes.

    :param start: The start of the search range.
    :type start: datetime
    :param end: The end of the search range.
    :type end: datetime
    """
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
    df = pd.read_sql_query(
        'SELECT * FROM tweets WHERE created_at > ? AND created_at <= ?',
        conn,
        params=(start, end),
        parse_dates=['created_at']
    )
    return TweetBin(df, start, end)
def update_tweet(self, tweet):
    """
    Updates a tweet already in the database.

    :param tweet: The :class:`Tweet` to update.
    """
    id_str = tweet['id_str']
    keys = [k for k in tweet.keys() if k != 'id_str']
    vals = tuple(tweet[k] for k in keys)
    # Build the SET clause in the same order the values were collected,
    # so each placeholder binds to the right column.
    updt = ', '.join(f'{k}=?' for k in keys)
    qry = f'UPDATE tweets SET {updt} WHERE id_str=?'
    try:
        conn = sqlite3.connect(
            self._db, detect_types=sqlite3.PARSE_DECLTYPES
        )
        c = conn.cursor()
        c.execute(qry, vals + (id_str,))
        c.close()
        conn.commit()
    except Exception:
        if self._debug:
            logging.warning(f'Failed Query: {qry}, {vals + (id_str,)}')
def setUp(self):
    self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    self.cur = self.con.cursor()
    self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))")

    # override float, make them always return the same number
    sqlite.converters["FLOAT"] = lambda x: 47.2

    # and implement two custom ones
    sqlite.converters["BOOL"] = lambda x: bool(int(x))
    sqlite.converters["FOO"] = DeclTypesTests.Foo
    sqlite.converters["WRONG"] = lambda x: "WRONG"
    sqlite.converters["NUMBER"] = float
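This test writes entries straight into the module-level sqlite.converters dictionary rather than calling register_converter(); the effect is the same, since register_converter() simply stores the callable in that dictionary under the type name (matched case-insensitively). Outside of test code, the public API is the safer choice; a short sketch:

import sqlite3

# Equivalent to sqlite3.converters["NUMBER"] = float, via the public API.
sqlite3.register_converter("number", float)

con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
con.execute("create table t (n number)")
con.execute("insert into t values ('3.5')")
print(con.execute("select n from t").fetchone()[0])  # 3.5, as a float
con.close()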
def CheckDeclTypeNotUsed(self):
    """
    Assures that the declared type is not used when PARSE_DECLTYPES
    is not set.
    """
    self.cur.execute("insert into test(x) values (?)", ("xxx",))
    self.cur.execute("select x from test")
    val = self.cur.fetchone()[0]
    self.assertEqual(val, "xxx")
def setUp(self):
    self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    self.cur = self.con.cursor()
    self.cur.execute("create table test(d date, ts timestamp)")
def CheckTypeMapUsage(self):
    """
    pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
    a statement. This test exhibits the problem.
    """
    SELECT = "select * from foo"
    con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    con.execute("create table foo(bar timestamp)")
    con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
    con.execute(SELECT)
    con.execute("drop table foo")
    con.execute("create table foo(bar integer)")
    con.execute("insert into foo(bar) values (5)")
    con.execute(SELECT)