The following 22 code examples, extracted from open-source Python projects, show how pandas.read_sql_table() is used in practice.
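Before the project examples, here is a minimal sketch of a typical call. It assumes a hypothetical SQLite database example.db containing a users table; note that read_sql_table reads a whole table by name and requires a SQLAlchemy connectable (unlike read_sql_query, which also accepts a raw DBAPI connection).

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical database and table, used only for illustration.
engine = create_engine('sqlite:///example.db')

# Read the whole 'users' table into a DataFrame, using the 'id' column
# as the index and restricting the read to a subset of columns.
df = pd.read_sql_table('users', con=engine, index_col='id',
                       columns=['id', 'name', 'email'])
print(df.head())

The project snippets below follow the same pattern: build or reuse a connectable, pass the table name, and optionally set index_col and columns.
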
def import_data_from_psql(user_id):
    """Import data from psql; clean & merge dataframes."""
    library = pd.read_sql_table(
        'library',
        con='postgres:///nextbook',
        columns=['book_id', 'title', 'author', 'pub_year', 'original_pub_year', 'pages'])

    book_subjects = pd.read_sql_table(
        'book_subjects',
        con='postgres:///nextbook')

    subjects = pd.read_sql_table(
        'subjects',
        con='postgres:///nextbook',
        columns=['subject_id', 'subject'])

    user_ratings = pd.read_sql_query(
        sql=('SELECT book_id, user_id, status, rating FROM user_books WHERE user_id=%s' % user_id),
        con='postgres:///nextbook')

    library = library.merge(user_ratings, how='left', on='book_id')
    library['pages'].fillna(0, inplace=True)

    # merge subject names into book_subjects; drop uninteresting subjects from book_subjects table
    book_subjects = book_subjects.merge(subjects, how='left', on='subject_id')
    delete_values = ["protected daisy", "accessible book", "in library", "overdrive",
                     "large type books", 'ficci\xc3\xb3n juvenil', 'ficci\xc3\xb3n',
                     'lending library']
    book_subjects = book_subjects[~book_subjects['subject'].isin(delete_values)]

    return [library, book_subjects, subjects]

def modify_table(self):
    engine_line = get_engine('db_selection')
    df = pd.read_sql_table('xiayinxian', engine_line, index_col='index')
    df['ocupy_ration'] = df['ocupy_ration'].map(lambda x: '%.3f' % x)
    # print df
    df.to_sql('xiayingxian', engine_line)

def break_low(self, date):
    '''
    Find stocks whose low price on the given day breaks their yearly low,
    and record them in the break_low table.
    :param date: trading date string, e.g. '2017-11-11'
    :return:
    '''
    # cmd = 'select * from `{}`'.format(date)
    # the `index` column written by to_sql is reused as the DataFrame index
    df = pd.read_sql_table(date, daily_engine, index_col='index')
    low_db = get_mysql_conn('db_selection')
    low_cursor = low_db.cursor()
    for i in range(len(df)):
        code = df.loc[i]['code']
        cur_low = df.loc[i]['low']
        mins_date, mins = self.get_lowest(code, '2017', date)
        if not mins_date:
            continue
        if mins and float(cur_low) <= float(mins) and float(cur_low) != 0.0:
            print(code, df.loc[i]['name'])
            print('year mins {} at {}'.format(mins, mins_date))
            print('current mins', cur_low)
            create_cmd = ('create table if not exists break_low'
                          '(`index` int primary key auto_increment, datetime datetime, code text, name text,'
                          ' low_price float, last_price float, last_price_date datetime);')
            low_cursor.execute(create_cmd)
            insert_cmd = ('insert into break_low (datetime, code, name, low_price, last_price, last_price_date)'
                          ' values (%s, %s, %s, %s, %s, %s);')
            insert_data = (date, code, df.loc[i]['name'], cur_low, mins, mins_date)
            low_cursor.execute(insert_cmd, insert_data)
            low_db.commit()

def _xiayingxian(self, row, ratio):
    '''
    Detect a long lower-shadow (xiayingxian) candlestick and store the hit.
    ratio: minimum share of the day's range taken up by the lower shadow
    row: a Series holding one day's quote data
    '''
    open_p = float(row['open'])
    closed = float(row['close'])
    low = float(row['low'])
    high = float(row['high'])
    p = min(closed, open_p)
    try:
        diff = (p - low) * 1.00 / (high - low)
        diff = round(diff, 3)
    except ZeroDivisionError:
        diff = 0

    if diff > ratio:
        xiayinxian_engine = get_engine('db_selection')
        date, code, name, ocupy_ration, standards = row['datetime'], row['code'], row['name'], diff, ratio
        df = pd.DataFrame(
            {'datetime': [date], 'code': [code], 'name': [name],
             'ocupy_ration': [ocupy_ration], 'standards': [standards]})
        try:
            df1 = pd.read_sql_table('xiayingxian', xiayinxian_engine, index_col='index')
            df = pd.concat([df1, df])
        except Exception as e:
            print(e)
            # return None
        df = df.reset_index(drop=True)
        df.to_sql('xiayingxian', xiayinxian_engine, if_exists='replace')
        return row

def get_hist_data(code, name, start_data):
    try:
        # start_data = datetime.datetime.strptime(str(start_data), '%Y%m%d').strftime('%Y-%m-%d')
        df = ts.bar(code, conn=conn, start_date=start_data, adj='qfq')
    except Exception as e:
        print(e)
        return
    hist_con = get_engine('history')
    df.insert(1, 'name', name)
    df = df.reset_index()
    # print df
    df2 = pd.read_sql_table(code, hist_con, index_col='index')
    try:
        new_df = pd.concat([df, df2])
        new_df = new_df.reset_index(drop=True)
        new_df.to_sql(code, engine, if_exists='replace')
    except Exception as e:
        print(e)
        return

def read_attrs(db_path, table=Annotation.__tablename__, index_col='TranscriptId'):
    """
    Read the attributes database file into a pandas DataFrame
    :param db_path: path to the attributes database
    :param table: table name. should generally be annotation
    :param index_col: column to index on. should generally be tx_id.
    :return: pandas DataFrame
    """
    engine = create_engine('sqlite:///{}'.format(db_path))
    return pd.read_sql_table(table, engine, index_col=index_col)

def load_annotation(ref_db_path):
    """
    Load the reference annotation table
    :param ref_db_path: path to reference genome database. Must have table Annotation.__tablename__
    :return: DataFrame
    """
    engine = create_engine('sqlite:///' + ref_db_path)
    df = pd.read_sql_table(Annotation.__tablename__, engine)
    return df

def load_alignment_evaluation(db_path):
    """
    Loads the transMap alignment evaluation table
    :param db_path: path to genome database
    :return: DataFrame
    """
    engine = create_engine('sqlite:///' + db_path)
    df = pd.read_sql_table(TmEval.__tablename__, engine)
    df = pd.pivot_table(df, index=['TranscriptId', 'AlignmentId'], columns='classifier', values='value')
    return df.reset_index()

def load_filter_evaluation(db_path):
    """
    Loads the transMap alignment filtering evaluation table
    :param db_path: path to genome database
    :return: DataFrame
    """
    engine = create_engine('sqlite:///' + db_path)
    return pd.read_sql_table(TmFilterEval.__tablename__, engine)

def load_luigi_stats(db_path, table):
    """
    Loads the luigi stats from the stats db
    :param db_path: path to database
    :return: DataFrame
    """
    engine = create_engine('sqlite:///' + db_path)
    return pd.read_sql_table(table, engine)

def test_readonly_axis_blosc_to_sql(self):
    # GH11880
    if not _BLOSC_INSTALLED:
        pytest.skip('no blosc')
    if not self._SQLALCHEMY_INSTALLED:
        pytest.skip('no sqlalchemy')
    expected = DataFrame({'A': list('abcd')})
    df = self.encode_decode(expected, compress='blosc')
    eng = self._create_sql_engine("sqlite:///:memory:")

    df.to_sql('test', eng, if_exists='append')
    result = pandas.read_sql_table('test', eng, index_col='index')
    result.index.names = [None]
    assert_frame_equal(expected, result)

def test_readonly_axis_zlib_to_sql(self):
    # GH11880
    if not _ZLIB_INSTALLED:
        pytest.skip('no zlib')
    if not self._SQLALCHEMY_INSTALLED:
        pytest.skip('no sqlalchemy')
    expected = DataFrame({'A': list('abcd')})
    df = self.encode_decode(expected, compress='zlib')
    eng = self._create_sql_engine("sqlite:///:memory:")

    df.to_sql('test', eng, if_exists='append')
    result = pandas.read_sql_table('test', eng, index_col='index')
    result.index.names = [None]
    assert_frame_equal(expected, result)

def test_readonly_axis_blosc_to_sql(self):
    # GH11880
    if not _BLOSC_INSTALLED:
        raise nose.SkipTest('no blosc')
    if not self._SQLALCHEMY_INSTALLED:
        raise nose.SkipTest('no sqlalchemy')
    expected = DataFrame({'A': list('abcd')})
    df = self.encode_decode(expected, compress='blosc')
    eng = self._create_sql_engine("sqlite:///:memory:")

    df.to_sql('test', eng, if_exists='append')
    result = pandas.read_sql_table('test', eng, index_col='index')
    result.index.names = [None]
    assert_frame_equal(expected, result)

def test_readonly_axis_zlib_to_sql(self):
    # GH11880
    if not _ZLIB_INSTALLED:
        raise nose.SkipTest('no zlib')
    if not self._SQLALCHEMY_INSTALLED:
        raise nose.SkipTest('no sqlalchemy')
    expected = DataFrame({'A': list('abcd')})
    df = self.encode_decode(expected, compress='zlib')
    eng = self._create_sql_engine("sqlite:///:memory:")

    df.to_sql('test', eng, if_exists='append')
    result = pandas.read_sql_table('test', eng, index_col='index')
    result.index.names = [None]
    assert_frame_equal(expected, result)

def save_json():
    df = pd.read_sql_table(filename, SQLALCHEMY_DATABASE_URI)
    data = {}
    for i, row in df.iterrows():
        spatial_id = str(row['spatial_id'])
        if spatial_id not in data.keys():
            data[spatial_id] = {}
        for key, value in row.iteritems():
            if key != 'spatial_id':
                data[spatial_id][key] = str(value)
    with open(DATA_FOLDER + 'results/' + filename + '.json', 'w') as fp:
        json.dump(data, fp, indent=2)

def __init__(self, path=None):
    if path is None:
        path = "sqlite:///" + os.path.join(os.environ["HOME"], "tmp", "keras_logs.db")
    db_path = path.replace("sqlite:///", "")
    try:
        self.logs = pd.read_sql_table("log", path)
        self.runs = (pd.read_sql_table("run", path)
                     .rename(columns={"id": "runid"})
                     .sort_values("runid", ascending=False))
        self.df = self.logs.merge(self.runs)
    except ValueError:
        self.runs = pd.DataFrame({"runid": [], "comment": [], "user": []})

def get_load_areas_table(schema, table, index_col, conn, columns=None):
    r"""Retrieve load areas intermediate results table from oedb
    """
    # retrieve table with processed input data
    load_areas = pd.read_sql_table(table, conn, schema=schema,
                                   index_col=index_col, columns=columns)

    return load_areas

def main(config):
    '''
    After we added the new columns drsstep and roi, existing rois contained nulls.
    This script updates those rows with the correct values.
    '''
    config = load_config(config)
    database.init(**config['processing_database'])
    database.connect()

    engine = create_mysql_engine(**config['fact_database'])
    with engine.connect() as conn:
        df = pd.read_sql_table('RunInfo', conn, columns=[
            'fNight', 'fRunID', 'fDrsStep', 'fROI'
        ])
    df.set_index(['fNight', 'fRunID'], inplace=True)

    query = RawDataFile.select().where(RawDataFile.roi == None)
    for raw_data_file in tqdm(query, total=query.count()):
        night = (raw_data_file.night.year * 10000
                 + raw_data_file.night.month * 100
                 + raw_data_file.night.day)
        raw_data_file.roi = df.loc[(night, raw_data_file.run_id), 'fROI']
        raw_data_file.save()

    query = DrsFile.select().where(DrsFile.roi == None)
    for drs_file in tqdm(query, total=query.count()):
        night = (drs_file.night.year * 10000
                 + drs_file.night.month * 100
                 + drs_file.night.day)
        drs_file.roi = df.loc[(night, drs_file.run_id), 'fROI']
        drs_file.drs_step = df.loc[(night, drs_file.run_id), 'fDrsStep']
        drs_file.save()

def sql_table(self):
    df = pd.read_sql_table('2017-11-17', engine, index_col='index')

def update_daily():
    '''
    Fetch today's market snapshot and append each stock's quote
    to its history table.
    :return:
    '''
    # save today's daily market data first
    SaveData.daily_market()
    time.sleep(20)
    daily_conn = get_mysql_conn('daily')
    cursor = daily_conn.cursor()
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    cmd = 'select * from `{}`;'.format(today)
    cursor.execute(cmd)
    # today = '2017-11-17'
    # daily_df = pd.read_sql_table(today, daily_conn, index_col='index')
    days_info = cursor.fetchall()
    for i in days_info:
        code = i[1]
        name = i[2]
        close = i[4]
        opens = i[5]
        high = i[6]
        low = i[7]
        vol = i[9]
        amount = i[11]
        try:
            history_conn = get_mysql_conn('history')
            history_cur = history_conn.cursor()
            history_cur.execute('select count(*) from `{}`;'.format(code))
        except Exception as e:
            print(e)
            continue
        l = history_cur.fetchone()
        df = pd.DataFrame(columns=['datetime', 'code', 'name', 'open', 'close',
                                   'high', 'low', 'vol', 'amount'])
        df.loc[l] = [today, code, name, opens, close, high, low, vol, amount]
        try:
            df.to_sql(code, engine, if_exists='append')
            print(code)
        except Exception as e:
            print(df)
            print(e)

def run(self):
    def load_evals(tx_mode):
        """Loads the error tracks from the database"""
        cds_table = tools.sqlInterface.tables['CDS'][tx_mode]['evaluation']
        mrna_table = tools.sqlInterface.tables['mRNA'][tx_mode]['evaluation']
        cds_df = pd.read_sql_table(cds_table.__tablename__, engine).set_index('AlignmentId')
        mrna_df = pd.read_sql_table(mrna_table.__tablename__, engine).set_index('AlignmentId')
        return {'CDS': cds_df, 'mRNA': mrna_df}

    pipeline_args = self.get_pipeline_args()
    track, trackdb = self.output()
    chrom_sizes = GenomeFiles.get_args(pipeline_args, self.genome).sizes
    engine = tools.sqlInterface.create_engine('sqlite:///' + pipeline_args.dbs[self.genome])
    evals = {tx_mode: load_evals(tx_mode) for tx_mode in self.tx_modes}
    consensus_args = Consensus.get_args(pipeline_args, self.genome)
    consensus_gp_info = pd.read_csv(consensus_args.consensus_gp_info, sep='\t', header=0,
                                    na_filter=False).set_index('transcript_id')
    aln_ids = set(consensus_gp_info.alignment_id)
    rows = []
    for aln_id in aln_ids:
        tx_mode = tools.nameConversions.alignment_type(aln_id)
        if tx_mode not in ['transMap', 'augTM', 'augTMR']:
            continue
        mode = 'CDS'
        df = tools.misc.slice_df(evals[tx_mode][mode], aln_id)
        if len(df) == 0:
            mode = 'mRNA'
            df = tools.misc.slice_df(evals[tx_mode][mode], aln_id)
        for tx_id, s in df.iterrows():
            bed = s.tolist()
            bed[3] = '/'.join([tx_id, bed[3], mode])
            rows.append(bed)
    tmp = luigi.LocalTarget(is_tmp=True)
    with tmp.open('w') as tmp_handle:
        tools.fileOps.print_rows(tmp_handle, rows)
    tools.procOps.run_proc(['bedSort', tmp.path, tmp.path])
    with track.open('w') as outf:
        cmd = ['bedToBigBed', '-type=bed12', '-tab', tmp.path, chrom_sizes, '/dev/stdout']
        tools.procOps.run_proc(cmd, stdout=outf, stderr='/dev/null')
    with trackdb.open('w') as outf:
        outf.write(error_template.format(genome=self.genome, path=os.path.basename(track.path)))

def table_archiver(server, database, source_table, destination_table, timestamp_column_name='ArchivedDTS'):
    """
    Takes a table and archives a complete copy of it with the addition of a timestamp of when the archive occurred
    to a given destination table on the same database.

    This should build a new table if the table doesn't exist.

    Args:
        server (str): Server name
        database (str): Database name
        source_table (str): Source table name
        destination_table (str): Destination table name
        timestamp_column_name (str): New timestamp column name

    Returns:
        (str): A string with details on records archived.

    Example usage:

    ```
    from healthcareai.common.table_archiver import table_archiver
    table_archiver('localhost', 'SAM_123', 'RiskScores', 'RiskScoreArchive', 'ArchiveDTS')
    ```
    """
    # Basic input validation
    if type(server) is not str:
        raise HealthcareAIError('Please specify a server address')
    if type(database) is not str:
        raise HealthcareAIError('Please specify a database name')
    if type(source_table) is not str:
        raise HealthcareAIError('Please specify a source table name')
    if type(destination_table) is not str:
        raise HealthcareAIError('Please specify a destination table name')

    start_time = time.time()

    connection_string = 'mssql+pyodbc://{}/{}?driver=SQL+Server+Native+Client+11.0'.format(server, database)

    # Load the table to be archived
    df = pd.read_sql_table(source_table, connection_string)
    number_records_to_add = len(df)

    # Add timestamp to dataframe
    df[timestamp_column_name] = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

    # Save the new dataframe out to the db without the index, appending values
    df.to_sql(destination_table, connection_string, index=False, if_exists='append')

    end_time = time.time()
    delta_time = end_time - start_time
    result = 'Archived {0} records from {1}/{2}/{3} to {4} in {5} seconds'.format(
        number_records_to_add,
        server,
        database,
        source_table,
        destination_table,
        delta_time)

    return result