Python pandas module: read_sql_query() example source code
We have extracted the following 50 code examples from open-source Python projects to illustrate how to use pandas.read_sql_query().
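Before the project examples, here is a minimal, self-contained sketch of the basic call. The in-memory SQLite database, the users table, and its contents are assumptions made purely for illustration:

import sqlite3
import pandas as pd

# Build a throwaway in-memory database with a few sample rows (illustrative only)
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)')
conn.executemany('INSERT INTO users (name, age) VALUES (?, ?)',
                 [('alice', 30), ('bob', 25)])
conn.commit()

# read_sql_query() executes the SQL and returns the result set as a DataFrame;
# params binds values safely instead of formatting them into the query string.
df = pd.read_sql_query('SELECT * FROM users WHERE age >= ?', conn, params=(26,))
print(df)

conn.close()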
def get_classified_songs(self, telegram_id):
conn = sqlite3.connect(self._DATABASE)
sql = """
SELECT
danceability, energy, loudness, speechiness, acousticness,
instrumentalness, liveness, valence, tempo, activity
FROM songs s, users u, song_user su
WHERE
activity IS NOT NULL AND
s.id = su.song_id AND
su.user_id = u.id AND
u.telegram_user_id = {}
""".format(telegram_id)
resp = pd.read_sql_query(sql, conn)
conn.close()
return resp
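The example above formats telegram_id directly into the SQL string. A sketch of the same query that binds the value through the params argument instead (assuming the same schema and connection as above):

sql = """
    SELECT
        danceability, energy, loudness, speechiness, acousticness,
        instrumentalness, liveness, valence, tempo, activity
    FROM songs s, users u, song_user su
    WHERE
        activity IS NOT NULL AND
        s.id = su.song_id AND
        su.user_id = u.id AND
        u.telegram_user_id = ?
"""
# '?' is the sqlite3 placeholder style; the driver binds the value
resp = pd.read_sql_query(sql, conn, params=(telegram_id,))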
def build_df(table: str = 'articles',
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> pd.DataFrame:
"""Build dataframe with derived fields."""
with closing(sqlite3.connect(DB_FILE_NAME)) as conn:
articles = pd.read_sql_query(f'select * from {table}', conn)
articles['date'] = pd.to_datetime(articles['publish_date'])
if start_date:
articles = articles.loc[articles['date'] >= start_date]
if end_date:
articles = articles.loc[articles['date'] <= end_date]
articles = articles.replace([None], [''], regex=True)
articles['base_url'] = articles.apply(get_url_base, axis=1)
articles['word_count'] = articles.apply(count_words, axis=1)
return articles
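Note that build_df interpolates the table name with an f-string because table names cannot be bound as query parameters. A defensive variant (the allowlist below is an assumption, not something the original project defines) might validate the name first:

import sqlite3
import pandas as pd

ALLOWED_TABLES = {'articles', 'articles_archive'}  # hypothetical allowlist

def read_table(table: str, conn: sqlite3.Connection) -> pd.DataFrame:
    # Reject anything not explicitly allowed before interpolating into the SQL text
    if table not in ALLOWED_TABLES:
        raise ValueError(f'unknown table: {table}')
    return pd.read_sql_query(f'SELECT * FROM {table}', conn)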
def SensitivityQuery(self, table, data_set):
# Returns the number of times an analyte is found at each concentration and the
# number of repetitions in a particular data set.
sql_statement = "SELECT COUNT(%s.id) AS Count, %s.Concentration_pg AS Conc_pg, \
DataSetConcentrations.Repetitions AS Repetitions \
FROM \
Sample \
INNER JOIN %s ON \
%s.id = Sample.%s_foreignkey \
INNER JOIN DataSetConcentrations ON \
DataSetConcentrations.id = Sample.DataSetConcentrations_foreignkey \
WHERE \
Sample.DataSetName = '%s' \
GROUP BY \
Conc_pg \
ORDER BY \
Conc_pg;" % (table, table, table, table, table, data_set)
return pd.read_sql_query(sql_statement, self.conn)
def GetRepsAtEachConcentration(self, analyte_table_lst, data_set):
df = pd.DataFrame()
for table in analyte_table_lst:
sql_statement = "SELECT \
%s.Concentration_pg AS Conc, COUNT(%s.Concentration_pg) AS %s \
FROM \
Sample \
Inner Join %s ON \
%s.id = Sample.%s_foreignkey \
WHERE \
DataSetName = '%s' \
GROUP BY 1 \
ORDER BY 1 ASC;" % (table, table, table, table, table, table, data_set)
df1 = pd.read_sql_query(sql_statement, self.conn)
df1.set_index('Conc', inplace=True)
df = pd.concat([df, df1], axis=1)
return df
def import_data_from_psql(user_id):
"""Import data from psql; clean & merge dataframes."""
library = pd.read_sql_table(
'library',
con='postgres:///nextbook',
columns=['book_id', 'title', 'author', 'pub_year', 'original_pub_year', 'pages'])
book_subjects = pd.read_sql_table(
'book_subjects',
con='postgres:///nextbook')
subjects = pd.read_sql_table(
'subjects', con='postgres:///nextbook',
columns=['subject_id', 'subject'])
user_ratings = pd.read_sql_query(
sql=('SELECT book_id, user_id, status, rating FROM user_books WHERE user_id=%s' % user_id),
con='postgres:///nextbook')
library = library.merge(user_ratings, how='left', on='book_id')
library['pages'].fillna(0, inplace=True)
# merge subject names into book_subjects; drop uninteresting subjects from book_subjects table
book_subjects = book_subjects.merge(subjects, how='left', on='subject_id')
delete_values = ["protected daisy", "accessible book", "in library", "overdrive", "large type books", 'ficci\xc3\xb3n juvenil', 'ficci\xc3\xb3n', 'lending library']
book_subjects = book_subjects[~book_subjects['subject'].isin(delete_values)]
return [library, book_subjects, subjects]
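In the example above, con is given as a database URI string, which pandas hands to SQLAlchemy internally. A sketch that creates the engine explicitly and binds the user id as a named parameter (the 'postgresql' scheme and the user id 42 are assumptions for illustration):

import pandas as pd
from sqlalchemy import create_engine, text

# Newer SQLAlchemy releases expect 'postgresql://' rather than the 'postgres://' scheme
engine = create_engine('postgresql:///nextbook')

library = pd.read_sql_table('library', con=engine,
                            columns=['book_id', 'title', 'author', 'pages'])
user_ratings = pd.read_sql_query(
    text('SELECT book_id, user_id, status, rating FROM user_books WHERE user_id = :uid'),
    con=engine, params={'uid': 42})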
def test_sql_open_close(self):
# Test if the IO in the database still works if the connection is closed
# between the writing and reading (as in many real situations).
with tm.ensure_clean() as name:
conn = self.connect(name)
sql.to_sql(self.test_frame3, "test_frame3_legacy", conn,
flavor="sqlite", index=False)
conn.close()
conn = self.connect(name)
result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;",
conn)
conn.close()
tm.assert_frame_equal(self.test_frame3, result)
def test_datetime(self):
df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
'B': np.arange(3.0)})
df.to_sql('test_datetime', self.conn)
# with read_table -> type information from schema used
result = sql.read_sql_table('test_datetime', self.conn)
result = result.drop('index', axis=1)
tm.assert_frame_equal(result, df)
# with read_sql -> no type information -> sqlite has no native datetime type
result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
result = result.drop('index', axis=1)
if self.flavor == 'sqlite':
self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
result['A'] = to_datetime(result['A'])
tm.assert_frame_equal(result, df)
else:
tm.assert_frame_equal(result, df)
def test_datetime_NaT(self):
df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
'B': np.arange(3.0)})
df.loc[1, 'A'] = np.nan
df.to_sql('test_datetime', self.conn, index=False)
# with read_table -> type information from schema used
result = sql.read_sql_table('test_datetime', self.conn)
tm.assert_frame_equal(result, df)
# with read_sql -> no type information -> sqlite has no native datetime type
result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
if self.flavor == 'sqlite':
self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
result['A'] = to_datetime(result['A'], errors='coerce')
tm.assert_frame_equal(result, df)
else:
tm.assert_frame_equal(result, df)
def test_datetime_time(self):
# test support for datetime.time
df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"])
df.to_sql('test_time', self.conn, index=False)
res = read_sql_table('test_time', self.conn)
tm.assert_frame_equal(res, df)
# GH8341
# first, use the fallback to have the sqlite adapter put in place
sqlite_conn = TestSQLiteFallback.connect()
sql.to_sql(df, "test_time2", sqlite_conn, index=False)
res = sql.read_sql_query("SELECT * FROM test_time2", sqlite_conn)
ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
tm.assert_frame_equal(ref, res) # check if adapter is in place
# then test if sqlalchemy is unaffected by the sqlite adapter
sql.to_sql(df, "test_time3", self.conn, index=False)
if self.flavor == 'sqlite':
res = sql.read_sql_query("SELECT * FROM test_time3", self.conn)
ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
tm.assert_frame_equal(ref, res)
res = sql.read_sql_table("test_time3", self.conn)
tm.assert_frame_equal(df, res)
def test_connectable_issue_example(self):
# This tests the example raised in issue
# https://github.com/pydata/pandas/issues/10104
def foo(connection):
query = 'SELECT test_foo_data FROM test_foo_data'
return sql.read_sql_query(query, con=connection)
def bar(connection, data):
data.to_sql(name='test_foo_data',
con=connection, if_exists='append')
def main(connectable):
with connectable.connect() as conn:
with conn.begin():
foo_data = conn.run_callable(foo)
conn.run_callable(bar, foo_data)
DataFrame({'test_foo_data': [0, 1, 2]}).to_sql(
'test_foo_data', self.conn)
main(self.conn)
def test_temporary_table(self):
test_data = u'Hello, World!'
expected = DataFrame({'spam': [test_data]})
Base = declarative.declarative_base()
class Temporary(Base):
__tablename__ = 'temp_test'
__table_args__ = {'prefixes': ['TEMPORARY']}
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)
Session = sa_session.sessionmaker(bind=self.conn)
session = Session()
with session.transaction:
conn = session.connection()
Temporary.__table__.create(conn)
session.add(Temporary(spam=test_data))
session.flush()
df = sql.read_sql_query(
sql=sqlalchemy.select([Temporary.spam]),
con=conn,
)
tm.assert_frame_equal(df, expected)
def _get_index_columns(self, tbl_name):
ixs = sql.read_sql_query(
"SHOW INDEX IN %s" % tbl_name, self.conn)
ix_cols = {}
for ix_name, ix_col in zip(ixs.Key_name, ixs.Column_name):
if ix_name not in ix_cols:
ix_cols[ix_name] = []
ix_cols[ix_name].append(ix_col)
return list(ix_cols.values())
# TODO: cruft?
# def test_to_sql_save_index(self):
# self._to_sql_save_index()
# for ix_name, ix_col in zip(ixs.Key_name, ixs.Column_name):
# if ix_name not in ix_cols:
# ix_cols[ix_name] = []
# ix_cols[ix_name].append(ix_col)
# return ix_cols.values()
def get_data(query):
"""
Pulls data from the db based on the query
Input
-----
query: str
SQL query to run against the database
Output
------
data: DataFrame
Query result dumped into a DataFrame
"""
from setup_environment import db_dict
with setup_environment.connect_to_syracuse_db(**db_dict) as conn:
data = pd.read_sql_query(query, conn)
return data
def show_hexbin(self, query):
"""shows hexbin plot over map
Args:
query: name of sql
"""
self.load()
data = pd.read_sql_query(con=self.con, sql=query)
points = self.gen_points(data, self.data_map)
hx = self.base_map.hexbin(
np.array([geom.x for geom in points]),
np.array([geom.y for geom in points]),
gridsize=275,
bins='log',
mincnt=1,
edgecolor='none',
alpha=1.,
lw=0.2,
cmap=plt.get_cmap('afmhot'))
plt.tight_layout()
plt.show()
def show_scatter(self, query, color='blue'):
"""shows scatter plot over map
Args:
query: SQL query string selecting the points to plot
"""
self.load()
data = pd.read_sql_query(con=self.con, sql=query)
points = self.gen_points(data, self.data_map)
plot = self.base_map.scatter(
[point.x for point in points],
[point.y for point in points],
10, marker='o', lw=.25,
facecolor=color, edgecolor='w',
alpha=0.9, antialiased=True,
zorder=3)
plt.show()
def get_upstream_stops_ratio(self, target, trough_stops, ratio):
"""
Selects the stops for which at least the given ratio of trips to the target pass through the given set of trough stops
:param target: target of trips
:param trough_stops: stops through which the selected trips must pass
:param ratio: threshold for inclusion
:return:
"""
if isinstance(trough_stops, list):
trough_stops = ",".join(trough_stops)
query = """SELECT stops.* FROM other.stops,
(SELECT q2.from_stop_I AS stop_I FROM
(SELECT journeys.from_stop_I, count(*) AS n_total FROM journeys
WHERE journeys.to_stop_I = {target}
GROUP BY from_stop_I) q1,
(SELECT journeys.from_stop_I, count(*) AS n_trough FROM journeys, legs
WHERE journeys.journey_id=legs.journey_id AND legs.from_stop_I IN ({trough_stops}) AND journeys.to_stop_I = {target}
GROUP BY journeys.from_stop_I) q2
WHERE q1.from_stop_I = q2.from_stop_I AND n_trough/(n_total*1.0) >= {ratio}) q1
WHERE stops.stop_I = q1.stop_I""".format(target=target, trough_stops=trough_stops, ratio=ratio)
df = read_sql_query(query, self.conn)
return df
def get_directly_accessible_stops_within_distance(self, stop, distance):
"""
Returns stops that are accessible without transfer from the stops that are within a specific walking distance
:param stop: int
:param distance: int
:return:
"""
query = """SELECT stop.* FROM
(SELECT st2.* FROM
(SELECT * FROM stop_distances
WHERE from_stop_I = %s) sd,
(SELECT * FROM stop_times) st1,
(SELECT * FROM stop_times) st2
WHERE sd.d < %s AND sd.to_stop_I = st1.stop_I AND st1.trip_I = st2.trip_I
GROUP BY st2.stop_I) sq,
(SELECT * FROM stops) stop
WHERE sq.stop_I = stop.stop_I""" % (stop, distance)
return pd.read_sql_query(query, self.conn)
def setUp(self):
self.data = {
"DC_PEC": '''
import pandas as pd
from sqlalchemy import create_engine
from urllib.request import urlretrieve; urlretrieve('https://s3.amazonaws.com/assets.datacamp.com/production/course_998/datasets/Chinook.sqlite', 'Chinook.sqlite')
engine = create_engine('sqlite:///Chinook.sqlite')
''',
"DC_CODE": '''
# Execute query and store records in dataframe: df
df = pd.read_sql_query("ELECT * FROM PlaylistTrack INNER JOIN Track on PlaylistTrack.TrackId = Track.TrackId WHERE Milliseconds < 250000", engine)
# Print head of dataframe
print(df.head())
''',
"DC_SOLUTION": '''
# Execute query and store records in dataframe: df
df = pd.read_sql_query("SELECT * FROM PlaylistTrack INNER JOIN Track on PlaylistTrack.TrackId = Track.TrackId WHERE Milliseconds < 250000", engine)
# Print head of dataframe
print(df.head())
'''
}
def test_Pass(self):
self.data["DC_SCT"] = '''
# Test: call to read_sql_query() and 'df' variable
test_correct(
lambda: test_object("df"),
lambda: test_function("pandas.read_sql_query", do_eval = False)
)
# Test: Predefined code
predef_msg = "You don't have to change any of the predefined code."
test_function("print", incorrect_msg = predef_msg)
success_msg("Great work!")
'''
sct_payload = helper.run(self.data)
self.assertFalse(sct_payload['correct'])
def run(x, string):
print("Processing chunk: {}".format(string))
conn = sqlite3.connect(':memory:')
c = conn.cursor()
try:
c.execute("""CREATE TABLE war (nb_trick int)""")
conn.commit()
except sqlite3.OperationalError:
pass
for i in range(x):
b = Cython_War_Trick.Battle()
result = b.trick()
c.execute("""INSERT INTO war VALUES (?)""", [result])
conn.commit()
chunk = pd.read_sql_query("""SELECT nb_trick FROM war""", conn)
f = chunk['nb_trick'].value_counts()
return f
def run(self):
result_list = []
sql = None
while True:
self.lock.acquire()
if not self.queue.empty():
sql = self.queue.get()
self.lock.release()
else:
self.lock.release()
break
stock_data = pd.read_sql_query(sql, con=self.db)
stock_data = stock_data.set_index('datetime')
result_list.append(stock_data)
print "A stock has finished reading and {} stocks left".format(self.queue.qsize())
self.lock.acquire()
self.parent_list.extend(result_list)
self.lock.release()
self.db.close()
def get_nr_particles_per_population(self) -> pd.Series:
"""
Returns
-------
nr_particles_per_population: pd.Series
A pandas Series containing the number
of particles for each population
"""
query = (self._session.query(Population.t)
.join(ABCSMC)
.join(Model)
.join(Particle)
.filter(ABCSMC.id == self.id))
df = pd.read_sql_query(query.statement, self._engine)
nr_particles_per_population = df.t.value_counts().sort_index()
return nr_particles_per_population
def get_predict_acc2(debug=False):
db = Db()
engine = db._get_engine()
sql_stocklist = "select * from acc1"
if debug:
pass
df = pd.read_sql_query(sql_stocklist, engine)
acc2 = df.sort_values('c_yearmonthday', ascending=0)
acc2 = acc2.head(2)
acc2 = acc2.groupby('c_yearmonthday').sum()
acc2_final = pd.DataFrame()
acc2_final['h_p_acc'] = [df['acc'].sum() / float(df['acc'].count())]
acc2_final['h_p_change'] = [df['p_change'].sum() / 2.0]
acc2_final['p_acc'] = [acc2['acc'].sum() / 2.0]
acc2_final['p_change'] = [acc2['p_change'].sum() / 2.0]
return acc2_final
def get_data(self,labels_=None, data=None):
print('Loading CleanText from DataBase from...')
conn = connect('/home/gondin/metis/project/clinton-email-download/hrcemail3.sqlite')
sql = """SELECT Keywords, Polarity, Subjectivity, "from", cluster_labels, pdf_path as "Email" FROM document;"""
self.data = pd.read_sql_query(sql, conn)
self.data['Similarity'] = self.similarity[:,0]
conn.close()
#self.data = self.data.sample(1000)
self.data = self.data.sample(15000,random_state=44)
# labels_ =self.labels_ ==self.label_
labels_ = self.labels_
print(self.data.shape)
print(labels_.shape)
self.data.Polarity = self.data.Polarity.apply(lambda x: round(x,2))
return (self.data.ix[labels_ & (self.data.cluster_labels>0), ['Keywords','Similarity','Polarity', 'Subjectivity', "from","Email"]].sort_values('Similarity'))
#return (self.data.ix[labels_, ['Keywords','dist', "Email"]].sort_values('dist'))
def query_wikidata_mysql(query):
with SSHTunnelForwarder((ssh_host, ssh_port), ssh_username=ssh_user, ssh_pkey=mypkey,
remote_bind_address=(sql_hostname, sql_port)) as tunnel:
conn = pymysql.connect(host='127.0.0.1', user=sql_user, password=sql_pass, db=sql_main_database,
port=tunnel.local_bind_port)
df = pd.read_sql_query(query, conn)
conn.close()
return df
def tweet_dates(self):
conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_COLNAMES)
df = pd.read_sql_query(
'SELECT created_at FROM tweets', conn, parse_dates=['created_at'],
index_col=['created_at']
)
return df
def all(self):
conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
df = pd.read_sql_query(
'SELECT * FROM tweets', conn, parse_dates=['created_at']
)
return df
def tweets_since(self, dt):
"""
Retrieves all tweets since a particular datetime as a generator that
iterates on ``chunksize``.
:param dt: The starting datetime to query from.
"""
conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
df = pd.read_sql_query(
'SELECT * FROM tweets WHERE created_at > ?', conn, params=(dt,),
parse_dates=['created_at']
)
return TweetBin(df, dt, datetime.now())
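The docstring above mentions iterating on ``chunksize``; read_sql_query itself accepts a chunksize argument that turns the result into an iterator of DataFrames instead of one large frame. A sketch assuming the same tweets schema (the file name and cutoff date are placeholders):

import sqlite3
from datetime import datetime
import pandas as pd

conn = sqlite3.connect('tweets.db', detect_types=sqlite3.PARSE_DECLTYPES)
dt = datetime(2020, 1, 1)  # hypothetical cutoff

# With chunksize set, read_sql_query yields DataFrames of at most 10000 rows,
# which keeps memory bounded for large result sets.
chunks = pd.read_sql_query(
    'SELECT * FROM tweets WHERE created_at > ?', conn,
    params=(dt,), parse_dates=['created_at'], chunksize=10000)
for chunk in chunks:
    print(len(chunk))  # placeholder for per-chunk processing

conn.close()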
def tweets_between(self, start, end):
"""
Retrieve tweets between the start and end datetimes. Returns a generator
that iterates on ``chunksize``.
:param start: The start of the search range.
:type start: datetime
:param end: The end of the search range.
:type end: datetime
"""
conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
df = pd.read_sql_query(
'SELECT * FROM tweets WHERE created_at > ? AND created_at <= ?',
conn, params=(start, end), parse_dates=['created_at']
)
return TweetBin(df, start, end)
def get_related_id(session, engine, parameters):
category = parameters.split(":",1)[0]
sql_query=session.query(ALLOWED_CLASSES[category])
for field_value in parameters.split(":",1)[1].split("&&"):
field, value = field_value.split(".",1)
if ":" in value:
values = get_related_id(session, engine, value)
for value in values:
value=int(value) # the value is returned as a numpy object
if field[-4:] == "date": # support for date entry matching (the values have to be passes as string but matched as datetime)
value = datetime.datetime(*[int(i) for i in value.split(",")])
sql_query = sql_query.filter(getattr(ALLOWED_CLASSES[category], field)==value)
else:
if field[-4:] == "date": # support for date entry matching (the values have to be passes as string but matched as datetime)
value = datetime.datetime(*[int(i) for i in value.split(",")])
sql_query = sql_query.filter(getattr(ALLOWED_CLASSES[category], field)==value)
mystring = sql_query.statement
mydf = pd.read_sql_query(mystring,engine)
mydf = mydf.T.groupby(level=0).first().T #awkward hack to deal with polymorphic tables returning multiple IDs
related_table_ids = mydf["id"]
input_values = list(related_table_ids)
if input_values == []:
raise BaseException("No entry was found with a value of \""+str(value)+"\" on the \""+field+"\" column of the \""+category+"\" CATEGORY, in the database.")
session.close()
engine.dispose()
return input_values
def get_for_protocolize(db_path, class_name, code):
"""Return a dataframe containing a specific entry from a given class name, joined with its related tables up to three levels down.
"""
session, engine = load_session(db_path)
cols = []
joins = []
classobject = ALLOWED_CLASSES[class_name]
insp = sqlalchemy.inspection.inspect(classobject)
for name, col in insp.columns.items():
cols.append(col.label(name))
for name, rel in insp.relationships.items():
alias = aliased(rel.mapper.class_, name=name)
joins.append((alias, rel.class_attribute))
for col_name, col in sqlalchemy.inspection.inspect(rel.mapper).columns.items():
#the id column causes double entries, as it is mapped once on the parent table (related_table_id) and once on the child table (table_id)
if col.key != "id":
aliased_col = getattr(alias, col.key)
cols.append(aliased_col.label("{}_{}".format(name, col_name)))
sub_insp = sqlalchemy.inspection.inspect(rel.mapper.class_)
for sub_name, sub_rel in sub_insp.relationships.items():
if "contains" not in sub_name:
sub_alias = aliased(sub_rel.mapper.class_, name=name+"_"+sub_name)
joins.append((sub_alias, sub_rel.class_attribute))
for sub_col_name, sub_col in sqlalchemy.inspection.inspect(sub_rel.mapper).columns.items():
#the id column causes double entries, as it is mapped once on the parent table (related_table_id) and once on the child table (table_id)
if sub_col.key != "id":
sub_aliased_col = getattr(sub_alias, sub_col.key)
cols.append(sub_aliased_col.label("{}_{}_{}".format(name, sub_name, sub_col_name)))
sql_query = session.query(*cols).select_from(classobject)
for join in joins:
sql_query = sql_query.outerjoin(*join)
sql_query = sql_query.filter(classobject.code == code)
mystring = sql_query.statement
mydf = pd.read_sql_query(mystring,engine)
session.close()
engine.dispose()
return mydf
def DataSetAnalytes(self, data_set, TableProcessingToReturn='Both_PeakFinding_TargetAnalyteFinding'):
# Query all foreign key columns in the Sample table and return a list of all analytes that are
# found in a given data set.
column_string = self.createQueryColumnsStr(TableProcessingToReturn)
# Build SQL statement & Query sample table for all foreign key columns of a given data set
sql_statement = "SELECT %s FROM Sample WHERE Sample.DataSetName = '%s';" % (column_string, data_set)
df = pd.read_sql_query(sql_statement, self.conn)
return self.GetFoundAnalytesLst(df)
def Get_100fgArea(self, table, data_set):
sql_statement = "SELECT %s.Area AS Area_100fg \
FROM \
Sample \
Inner Join %s ON \
%s.id = Sample.%s_foreignkey \
WHERE \
Sample.DataSetName = '%s' AND \
%s.Concentration_pg = 0.1;" % (table, table, table, table, data_set, table)
return pd.read_sql_query(sql_statement, self.conn)
def GetAveSimilarities(self, table, data_set):
sql_statement = "SELECT AVG(%s.Similarity) AS Ave_Similarity, %s.Concentration_pg AS Conc_pg \
FROM \
Sample \
Inner Join %s ON \
%s.id = Sample.%s_foreignkey \
WHERE \
Sample.DataSetName = '%s' \
GROUP BY Conc_pg;" % (table, table, table, table, table, data_set)
return pd.read_sql_query(sql_statement, self.conn)
def Similarities(self, table, data_sets):
# This query provides all the data to create the Concentration vs Similarity plot and tables
#condition = "DataSet = '" + "' OR DataSet = '".join(data_sets) + "' "
condition = self.CreateConditionClause_OrSeriesStr(data_sets)
sql_statement = "SELECT %s.Similarity AS Similarity, %s.Concentration_pg AS Conc_pg, \
Sample.Instrument AS SerNo, Sample.DataSetName AS DataSet \
FROM \
Sample \
Inner Join %s ON \
%s.id = Sample.%s_foreignkey \
WHERE \
%s \
ORDER BY SerNo, Conc_pg ASC;" % (table, table, table, table, table, condition)
return pd.read_sql_query(sql_statement, self.conn)
def ClearDataSetData(self, data_sets, analyte_table_lst):
# Clears all the data from the database for the defined data sets
# append the DataSetConcentrations table so it too will be included in the data clearing
analyte_table_lst.append('DataSetConcentrations')
# Create a single, comma-separated string that contains all the foreign key columns
# in the Sample table that hold data to be removed
ForeignKeyColumn_lst = [col + '_foreignkey' for col in analyte_table_lst]
ForeignKeyColumn_Columns = ', '.join(ForeignKeyColumn_lst)
# Get condition string from data sets
data_set_condition = self.CreateConditionClause_OrSeriesStr(data_sets, "DataSetName")
# Resulting df: columns correspond to tables that contain data to be removed, the values in each column
# are the primary keys of records within that table that need to be deleted.
sql_statement = 'SELECT %s FROM Sample WHERE %s;' % (ForeignKeyColumn_Columns, data_set_condition)
df = pd.read_sql_query(sql_statement, self.conn)
df.columns = analyte_table_lst
# Iterate through the dataframe by column to delete each record from the db
# note: column = table name
for column in df:
condition = self.CreateConditionClause_OrSeriesStr(set(df[column]), "id")
self.conn.execute('DELETE FROM %s WHERE %s' % (column, condition))
# Finally remove the defined records from the sample table as well
self.conn.execute('DELETE FROM Sample WHERE %s' % data_set_condition)
self.CommitDB()
def write_locations(field, query_template, plate_name, row, conn, config):
# Read cells file for each image
query = query_template.replace("@@@",field).format(
plate_name,
row["Metadata_Well"],
row["Metadata_Site"]
)
locations = pd.read_sql_query(query, conn)
# Keep center coordinates only, remove NaNs, and transform to integers
locations = locations.dropna(axis=0, how="any")
locations[field+"_Location_Center_X"] = locations[field+"_Location_Center_X"]*config["compression"]["scaling_factor"]
locations[field+"_Location_Center_Y"] = locations[field+"_Location_Center_Y"]*config["compression"]["scaling_factor"]
locations[field+"_Location_Center_X"] = locations[field+"_Location_Center_X"].astype(int)
locations[field+"_Location_Center_Y"] = locations[field+"_Location_Center_Y"].astype(int)
# Save the resulting dataset frame in the output directory
loc_file = "{}/{}/locations/{}-{}-{}.csv".format(
config["compression"]["output_dir"],
row["Metadata_Plate"],
row["Metadata_Well"],
row["Metadata_Site"],
field
)
dataset.utils.check_path(loc_file)
locations.to_csv(loc_file, index=False)
def run(self):
self.initialize()
tags = pd.read_sql_query("SELECT * FROM TAGS", self.con)
self.collect_changes(tags)
self.apply_changes_if_not_dry()
self.close()
def df_query(self, query, with_labels=False):
"""
Run a :mod:`sqlalchemy` query and return result as a :class:`pandas.DataFrame`
Args:
query (sqlalchemy.orm.query.Query): query object, usually generated by :func:`session.query()` in
an :class:`sqlalchemy.orm.session.Session`
with_labels (bool): A query for fields with the same name from different tables will cause problems
when converting it to a :class:`pandas.DataFrame`, because there will be duplicate column names.
When setting `with_labels=True`, disambiguation labels are assigned to all (!)
fields in the query - each field name is prefixed with its table name. This enables
querying fields with identical names from multiple tables but getting unique column names in the output.
:return: query result as :class:`pandas.DataFrame`
"""
import pandas as pd
if with_labels:
query = query.with_labels()
# compile sql statement, including arguments
statement = query.statement.compile(self.engine)
# run query
return pd.read_sql_query(sql=statement, con=self.engine)
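As a self-contained illustration of the same pattern (the User model and the in-memory SQLite engine are assumptions for this sketch, which also assumes SQLAlchemy 1.4+): an ORM query's statement can be handed straight to read_sql_query, while df_query() above additionally compiles it so that bound parameters and optional labels are baked in.

import pandas as pd
import sqlalchemy as sa
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String(30))

engine = sa.create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([User(name='alice'), User(name='bob')])
    session.commit()
    query = session.query(User).filter(User.name != 'bob')
    # query.statement is a Selectable that read_sql_query can execute directly
    df = pd.read_sql_query(sql=query.statement, con=engine)

print(df)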
def get_pandas_df(self, sql):
"""
We need to override this method in order to use connections other than the default_conn_name
:param sql: A query input via the web UI at /admin/queryview/
:return: a pandas DataFrame
"""
conn = self.get_conn(self.snowflake_conn_id)
df = pd.read_sql_query(sql, conn)
return df
def get_test_2(self):
return pd.read_sql_query('select date,open,close,high,low,vol,code '
'from (select * from t_stick_data_m_test order by date desc limit 15) a '
'order by date asc;',self.sqlconn)
def build_model(all_features, categorical_features, target, connection_string, filename):
engine = create_engine(connection_string)
query = """SELECT * FROM connection_features"""
df = pd.read_sql_query(query, con=engine, index_col=['departurestop', 'departuredate', 'route'])
df.index.levels[0].name = 'stationfrom'
df.index.levels[1].name = 'date'
df.index.levels[2].name = 'vehicle'
df = df.reset_index()
model = build_model_random_forest(df, all_features, categorical_features, target)
joblib.dump(model, filename)
def predict(all_features, categorical_features, connection_string, filename):
engine = create_engine(connection_string)
model = joblib.load(filename)
print model
query = """SELECT * FROM connection_features"""
df = pd.read_sql_query(query, con=engine, index_col=['departurestop', 'departuredate', 'route'])
df.index.levels[0].name = 'stationfrom'
df.index.levels[1].name = 'date'
df.index.levels[2].name = 'vehicle'
df = df.reset_index()
predicted = model.predict(df[all_features])
print predicted
def pivot_stations(df, engine):
query = """
SELECT
d.*,
s.name AS arrivalname
FROM distance d INNER JOIN station s ON d.stationto = s.id
"""
distances = pd.read_sql_query(query, con=engine)
stations = distances['arrivalname'].unique().tolist()
dist_pivot = pd.pivot_table(distances, values='distance', index=['stationfrom', 'date', 'trip'],
columns=['arrivalname'], aggfunc=np.mean)
dist_pivot = dist_pivot.reindex(df.index.rename(['stationfrom', 'date', 'vehicle']))
df = df.join(dist_pivot, how='outer')
return df, stations
def select_fromindexs(dbname,field,indexs,**kwargs):
normal_db = False
tablename = False
# handle indexes even if they aren't in str format
if type(indexs[0]) == int:
indexs = [str(row) for row in indexs]
for key,value in kwargs.iteritems():
if key == 'size':
size = value
if key == 'normal_db':
normal_db = value
if key == 'tablename':
tablename = value
a,engine = make_query(dbname,tablename=tablename,normal_db=normal_db)
stringindexs = ','.join(indexs)
if not tablename == False:
dbname = tablename
# now build the query
query = '''SELECT * FROM %s WHERE %s IN (%s);''' % (dbname,field,stringindexs)
return pd.read_sql_query(query,engine)
def runCode(self, code):
errors = None
output = undefined
try:
if code.lstrip().upper().startswith('SELECT '):
output = pandas.read_sql_query(code, self._connection)
else:
self._connection.execute(code)
except Exception as exc:
errors = [{
'line': 0,
'column': 0,
'message': str(exc)
}]
return {
'errors': errors,
'output': None if output is undefined else pack(output)
}
def query_object(self, starnames, key='*'):
"""
Get information about the given star.
Parameters:
===========
starnames: string, or iterable of strings
The name(s) of the star.
key: string, default='*' (return everything)
What data do you want? Can be anything that sql will take
Returns:
========
A pandas DataFrame with the given information for each star
"""
if isinstance(starnames, str):
starnames = [starnames,]
starnames = ["'{}'".format(n) for n in starnames]
name_list = '(' + ', '.join(starnames) + ')'
sql_query = "SELECT {} FROM star WHERE name IN {}".format(key, name_list)
print(sql_query)
df = pd.read_sql_query(sql_query, self.db_con)
return df