我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用pandas.pivot_table()。
def data_as_triangle(self, inplace=False):
    """Convert tabular data to triangle form.

    When ``self.dataform`` is ``'tabular'``, ``self.data`` is pivoted with
    ``self.origin`` as the index, ``self.dev`` as the columns and
    ``self.values`` as the cell values, the rows are sorted by index and the
    column labels are converted to strings. For any other ``dataform`` the
    stored data is returned untouched.

    Arguments:
        inplace: bool
            When truthy, the instance ``data`` attribute is replaced by the
            triangle and ``dataform`` is set to ``'triangle'``.

    Returns:
        The triangle as a pandas.DataFrame (regardless of ``inplace``), or
        ``self.data`` unchanged when the data is not in tabular form.
    """
    if self.dataform != 'tabular':
        return self.data

    tri = pivot_table(self.data, values=self.values,
                      index=[self.origin], columns=[self.dev]).sort_index()
    tri.columns = [str(item) for item in tri.columns]
    if inplace:
        self.data = tri
        self.dataform = 'triangle'
    return tri
def test_pivot_dtypes(self): # can convert dtypes f = DataFrame({'a': ['cat', 'bat', 'cat', 'bat'], 'v': [ 1, 2, 3, 4], 'i': ['a', 'b', 'a', 'b']}) self.assertEqual(f.dtypes['v'], 'int64') z = pivot_table(f, values='v', index=['a'], columns=[ 'i'], fill_value=0, aggfunc=np.sum) result = z.get_dtype_counts() expected = Series(dict(int64=2)) tm.assert_series_equal(result, expected) # cannot convert dtypes f = DataFrame({'a': ['cat', 'bat', 'cat', 'bat'], 'v': [ 1.5, 2.5, 3.5, 4.5], 'i': ['a', 'b', 'a', 'b']}) self.assertEqual(f.dtypes['v'], 'float64') z = pivot_table(f, values='v', index=['a'], columns=[ 'i'], fill_value=0, aggfunc=np.mean) result = z.get_dtype_counts() expected = Series(dict(float64=2)) tm.assert_series_equal(result, expected)
def test_pivot_multi_functions(self):
    """Pivoting with a list of aggregators should equal the concatenation
    of the single-aggregator pivots, both without and with margins."""
    # The second pass carries the original remark: "margins not supported??"
    for extra in ({}, {'margins': True}):

        def pivot(func, extra=extra):
            return pivot_table(self.data, values=['D', 'E'],
                               index=['A', 'B'], columns='C',
                               aggfunc=func, **extra)

        result = pivot([np.mean, np.std])
        means = pivot(np.mean)
        stds = pivot(np.std)
        expected = concat([means, stds], keys=['mean', 'std'], axis=1)
        tm.assert_frame_equal(result, expected)
def test_pivot_table_with_margins_set_margin_name(self):
    """Each listed ``margins_name`` must make ``pivot_table`` raise
    ``ValueError`` for every index/column layout below (GH 3335)."""
    layouts = [
        (['A', 'B'], ['C']),   # multi-index index
        (['C'], ['A', 'B']),   # multi-index column
        (['A'], ['B']),        # non-multi-index index/column
    ]
    for margin_name in ['foo', 'one', 666, None, ['a', 'b']]:
        for index, columns in layouts:
            with self.assertRaises(ValueError):
                pivot_table(self.data, values='D', index=index,
                            columns=columns, margins=True,
                            margins_name=margin_name)
def test_pivot_table_with_iterator_values(self): # GH 12017 aggs = {'D': 'sum', 'E': 'mean'} pivot_values_list = pd.pivot_table( self.data, index=['A'], values=list(aggs.keys()), aggfunc=aggs, ) pivot_values_keys = pd.pivot_table( self.data, index=['A'], values=aggs.keys(), aggfunc=aggs, ) tm.assert_frame_equal(pivot_values_keys, pivot_values_list) agg_values_gen = (value for value in aggs.keys()) pivot_values_gen = pd.pivot_table( self.data, index=['A'], values=agg_values_gen, aggfunc=aggs, ) tm.assert_frame_equal(pivot_values_gen, pivot_values_list)
def generate_overlaps(all_info):
    """Compute pairwise style-overlap scores between artists.

    A ``short_style`` column is derived from ``all_info['style']`` (lowercased,
    the word "art" stripped, last word kept, parentheses removed) and written
    back onto ``all_info`` itself, so the caller's frame gains that column.
    Works are then counted per (artist, short_style), each artist's counts are
    normalised to fractions, and the overlap of two artists is the dot product
    of their fraction vectors.

    Arguments:
        all_info: pandas.DataFrame with at least ``style`` and ``artist``
            columns.

    Returns:
        A tuple ``(overlaps, artists)`` where ``overlaps`` is a symmetric
        ``(n, n)`` numpy array with ones on the diagonal and ``artists`` is
        the index of artist labels in the same order.
    """
    def _short_style(style):
        text = str(style).lower().replace('art ', '').replace(' art', '')
        last_word = text.split(' ')[-1]
        return last_word.replace('(', '').replace(')', '').lower()

    b = all_info
    b['short_style'] = b['style'].apply(_short_style)

    d = b.groupby(['artist', 'short_style']).size().reset_index()
    e = pd.pivot_table(d, index='artist', columns='short_style', values=0,
                       fill_value=0)
    # NOTE(review): the first style column is excluded here, exactly as in the
    # original code; 'artist' is already the index, so confirm this is intended.
    counts = e.iloc[:, 1:]
    f = 1.0 * counts.div(counts.sum(axis=1), axis=0)

    # All pairwise dot products at once. The previous Python-2 ``xrange`` loop
    # started at row 1, which left row/column 0 at the initial value of 1.
    weights = f.values
    overlaps = weights.dot(weights.T)
    np.fill_diagonal(overlaps, 1.0)
    return overlaps, f.index
def create_answer_matrix(data, user_column, item_column, value_column,
                         aggfunc=np.mean, time_column=None):
    """Pivot long-format answers into a user-by-item matrix.

    When ``time_column`` is given, only the row with the smallest time per
    (item, user) pair is kept. Remaining duplicate (item, user) rows are then
    dropped, and ``value_column`` is pivoted with users as rows and items as
    columns using ``aggfunc``. A non-empty result is narrowed to the
    ``value_column`` level of the pivoted columns.
    """
    pair = [item_column, user_column]
    if time_column:
        # select only the first response
        first_rows = data.groupby(pair)[time_column].idxmin()
        data = data.loc[first_rows]
    unique_rows = data.drop_duplicates(subset=pair)
    answers = pd.pivot_table(unique_rows, values=[value_column],
                             index=[user_column], columns=[item_column],
                             aggfunc=aggfunc)
    if answers.empty:
        return answers
    return answers[value_column]
# TODO: add Cronbach's alpha to item statistics
# see http://stackoverflow.com/questions/20799403/improving-performance-of-cronbach-alpha-code-python-numpy
def make_df_stitched(df_meeting):
    """Pivot meeting signals by member and run the speech/stitching helpers.

    Returns the output of ``make_stitched(is_speaking(...))``, or the string
    ``"No meeting data"`` when ``df_meeting`` is ``None``.
    """
    if df_meeting is None:
        return "No meeting data"

    # An earlier variant dropped incomplete rows with ``.dropna()``; missing
    # signals are filled with ``False`` instead.
    by_member = pd.pivot_table(df_meeting.reset_index(), index="datetime",
                               columns="member", values="signal")
    by_member = by_member.fillna(False)

    # Expected input: A dataframe with a datetime index and one column per badge.
    df_is_speech = is_speaking(by_member)
    return make_stitched(df_is_speech)
# takes in df from make_df_stitched
def _create_response_matrix(self):
    """Replace ``self.response`` with a row-by-category count matrix.

    A two-column frame is built from the training frame's index and the
    column named by ``self.response``; it is pivoted with ``aggfunc=len`` and
    ``fill_value=0`` so there is one row per index value and one column per
    distinct response value. The resulting array is stored in
    ``self.response`` and its column count in ``self.J``.
    """
    LOGGER.info("Creating response matrix.")
    df = pd.DataFrame({'index': self.train_df.index,
                       'y': self.train_df[self.response]})
    # DataFrame.as_matrix() is gone from pandas >= 1.0; ``.values`` returns
    # the same ndarray on old and new versions.
    Y = pd.pivot_table(df, index='index', columns=['y'],
                       aggfunc=len, fill_value=0).values
    self.response = Y
    self.J = Y.shape[1]
    LOGGER.info("Created response matrix with shape (%d, %d)",
                Y.shape[0], Y.shape[1])
def pivot_stations(df, engine):
    """Join per-arrival-station mean distances onto ``df``.

    Distances are read through ``engine``, pivoted to one column per arrival
    station name (mean distance per stationfrom/date/trip), re-indexed to
    ``df``'s index (renamed to stationfrom/date/vehicle) and outer-joined.

    Returns a tuple of the joined frame and the list of distinct arrival
    station names.
    """
    query = """ SELECT d.*, s.name AS arrivalname FROM distance d INNER JOIN station s ON d.stationto = s.id """
    distances = pd.read_sql_query(query, con=engine)
    stations = distances['arrivalname'].unique().tolist()

    dist_pivot = pd.pivot_table(distances, values='distance',
                                index=['stationfrom', 'date', 'trip'],
                                columns=['arrivalname'], aggfunc=np.mean)
    target_index = df.index.rename(['stationfrom', 'date', 'vehicle'])
    dist_pivot = dist_pivot.reindex(target_index)

    joined = df.join(dist_pivot, how='outer')
    return joined, stations
def load_alignment_evaluation(db_path):
    """
    Loads the transMap alignment evaluation table and reshapes it so that each
    classifier becomes a column, keyed by TranscriptId and AlignmentId.
    :param db_path: path to genome database
    :return: DataFrame
    """
    engine = create_engine('sqlite:///' + db_path)
    long_df = pd.read_sql_table(TmEval.__tablename__, engine)
    wide_df = pd.pivot_table(long_df,
                             index=['TranscriptId', 'AlignmentId'],
                             columns='classifier',
                             values='value')
    return wide_df.reset_index()
def test_pivot_table(self):
    """Pivoting on two ordered categoricals should yield every category
    combination, with NaN where no data exists."""
    levels_a = ["a", "b", "z"]
    levels_b = ["c", "d", "y"]
    raw_cat1 = Categorical(["a", "a", "b", "b"], categories=levels_a,
                           ordered=True)
    raw_cat2 = Categorical(["c", "d", "c", "d"], categories=levels_b,
                           ordered=True)
    df = DataFrame({"A": raw_cat1, "B": raw_cat2, "values": [1, 2, 3, 4]})

    result = pd.pivot_table(df, values='values', index=['A', 'B'])

    expected_index = pd.MultiIndex.from_product([levels_a, levels_b],
                                                names=['A', 'B'])
    expected = Series([1, 2, np.nan, 3, 4, np.nan, np.nan, np.nan, np.nan],
                      index=expected_index, name='values')
    tm.assert_series_equal(result, expected)
def test_pivot_table(self):
    """The ``pivot_table`` function, the DataFrame method and the equivalent
    groupby/unstack should all agree, and the axes should be named after the
    keys."""
    index = ['A', 'B']
    columns = 'C'
    pivot_kwargs = dict(values='D', index=index, columns=columns)

    table = pivot_table(self.data, **pivot_kwargs)
    table2 = self.data.pivot_table(**pivot_kwargs)
    tm.assert_frame_equal(table, table2)

    # this works
    pivot_table(self.data, values='D', index=index)

    if len(index) <= 1:
        self.assertEqual(table.index.name, index[0])
    else:
        self.assertEqual(table.index.names, tuple(index))

    # ``columns`` is a one-character string here, so its length is 1 and
    # ``columns[0]`` is the label itself.
    if len(columns) <= 1:
        self.assertEqual(table.columns.name, columns[0])
    else:
        self.assertEqual(table.columns.names, columns)

    grouped = self.data.groupby(index + [columns])['D']
    expected = grouped.agg(np.mean).unstack()
    tm.assert_frame_equal(table, expected)
def test_pivot_table_nocols(self): df = DataFrame({'rows': ['a', 'b', 'c'], 'cols': ['x', 'y', 'z'], 'values': [1, 2, 3]}) rs = df.pivot_table(columns='cols', aggfunc=np.sum) xp = df.pivot_table(index='cols', aggfunc=np.sum).T tm.assert_frame_equal(rs, xp) rs = df.pivot_table(columns='cols', aggfunc={'values': 'mean'}) xp = df.pivot_table(index='cols', aggfunc={'values': 'mean'}).T tm.assert_frame_equal(rs, xp)
def test_pivot_table_dropna(self):
    """With ``dropna=False`` the customer/product axis should contain the
    full cartesian product of customers and products."""
    raw = [
        ('amount', [60000, 100000, 50000, 30000]),
        ('customer', ['A', 'A', 'B', 'C']),
        ('month', [201307, 201309, 201308, 201310]),
        ('product', ['a', 'b', 'c', 'd']),
        ('quantity', [2000000, 500000, 1000000, 1000000]),
    ]
    # Each column is passed as a {row_number: value} mapping.
    df = DataFrame({name: dict(enumerate(cells)) for name, cells in raw})

    pv_col = df.pivot_table('quantity', 'month', ['customer', 'product'],
                            dropna=False)
    pv_ind = df.pivot_table('quantity', ['customer', 'product'], 'month',
                            dropna=False)

    m = MultiIndex.from_tuples([(u(customer), u(product))
                                for customer in 'ABC'
                                for product in 'abcd'])
    assert_equal(pv_col.columns.values, m.values)
    assert_equal(pv_ind.index.values, m.values)
def test_pass_array(self): result = self.data.pivot_table( 'D', index=self.data.A, columns=self.data.C) expected = self.data.pivot_table('D', index='A', columns='C') tm.assert_frame_equal(result, expected)
def test_pivot_table_multiple(self):
    """Without ``values``, the pivot should equal a groupby-mean over the
    same keys followed by ``unstack``."""
    index = ['A', 'B']
    columns = 'C'
    keys = index + [columns]

    expected = self.data.groupby(keys).agg(np.mean).unstack()
    table = pivot_table(self.data, index=index, columns=columns)
    tm.assert_frame_equal(table, expected)
def test_pivot_multi_values(self):
    """Naming ``D`` and ``E`` as values should match pivoting the frame with
    column ``F`` dropped and no explicit values."""
    shared = dict(index='A', columns=['B', 'C'], fill_value=0)

    result = pivot_table(self.data, values=['D', 'E'], **shared)

    without_f = self.data.drop(['F'], axis=1)
    expected = pivot_table(without_f, **shared)
    tm.assert_frame_equal(result, expected)
def test_pivot_integer_columns(self): # caused by upstream bug in unstack d = date.min data = list(product(['foo', 'bar'], ['A', 'B', 'C'], ['x1', 'x2'], [d + timedelta(i) for i in range(20)], [1.0])) df = DataFrame(data) table = df.pivot_table(values=4, index=[0, 1, 3], columns=[2]) df2 = df.rename(columns=str) table2 = df2.pivot_table( values='4', index=['0', '1', '3'], columns=['2']) tm.assert_frame_equal(table, table2, check_names=False)
def test_pivot_complex_aggfunc(self): f = {'D': ['std'], 'E': ['sum']} expected = self.data.groupby(['A', 'B']).agg(f).unstack('B') result = self.data.pivot_table(index='A', columns='B', aggfunc=f) tm.assert_frame_equal(result, expected)
def test_margins_no_values_no_cols(self): # Regression test on pivot table: no values or cols passed. result = self.data[['A', 'B']].pivot_table( index=['A', 'B'], aggfunc=len, margins=True) result_list = result.tolist() self.assertEqual(sum(result_list[:-1]), result_list[-1])
def test_margins_no_values_two_rows(self): # Regression test on pivot table: no values passed but rows are a # multi-index result = self.data[['A', 'B', 'C']].pivot_table( index=['A', 'B'], columns='C', aggfunc=len, margins=True) self.assertEqual(result.All.tolist(), [3.0, 1.0, 4.0, 3.0, 11.0])
def test_margins_no_values_one_row_one_col(self): # Regression test on pivot table: no values passed but row and col # defined result = self.data[['A', 'B']].pivot_table( index='A', columns='B', aggfunc=len, margins=True) self.assertEqual(result.All.tolist(), [4.0, 7.0, 11.0])
def test_crosstab_pass_values(self): a = np.random.randint(0, 7, size=100) b = np.random.randint(0, 3, size=100) c = np.random.randint(0, 5, size=100) values = np.random.randn(100) table = crosstab([a, b], c, values, aggfunc=np.sum, rownames=['foo', 'bar'], colnames=['baz']) df = DataFrame({'foo': a, 'bar': b, 'baz': c, 'values': values}) expected = df.pivot_table('values', index=['foo', 'bar'], columns='baz', aggfunc=np.sum) tm.assert_frame_equal(table, expected)
def table_pivots(pivot_pairs, df, outputs_dir):
    """Save one HTML count table per column pair.

    For each ``(a, b)`` in ``pivot_pairs`` the table from
    ``DataFeaturesAnalysis.pivot_table`` is written to
    ``outputs_dir + <joined name> + '.html'``; ``outputs_dir`` is
    concatenated as-is, with no path separator inserted.
    """
    for pair in pivot_pairs:
        a, b = pair
        file_name = DataFeaturesAnalysis.join_names((a, b)) + '.html'
        count_matrix = DataFeaturesAnalysis.pivot_table(a, b, df)
        DataAnalysis.save_df_as_html(count_matrix, outputs_dir + file_name)
def pivot_table(a, b, df):
    """Cross-tabulate columns ``a`` and ``b`` of ``df``.

    Only those two columns are kept; the result has ``a`` values as rows and
    ``b`` values as columns, aggregated with ``len`` and with missing
    combinations filled with 0.
    """
    two_columns = df.loc[:, (a, b)]
    options = dict(index=a, columns=b, aggfunc=len, fill_value=0)
    return pd.pivot_table(two_columns, **options)
def run_data_composition_analyses_for_rsmeval(df_test_metadata,
                                              df_test_excluded,
                                              subgroups,
                                              candidate_column,
                                              exclude_zero_scores=True,
                                              exclude_listwise=False):
    """
    Similar to `run_data_composition_analyses_for_rsmtool()`
    but for RSMEval.

    Returns a tuple of the excluded-responses analysis frame, the data
    composition frame and a dictionary mapping each subgroup name to a
    per-group response-count frame.
    """
    # analyze excluded responses
    df_test_excluded_analysis = analyze_excluded_responses(
        df_test_excluded, ['raw'], 'Human/System',
        exclude_zero_scores=exclude_zero_scores,
        exclude_listwise=exclude_listwise)

    # rename the columns and index in the analysis data frame
    new_names = {'all features numeric': 'numeric system score',
                 'non-numeric feature values': 'non-numeric system score'}
    df_test_excluded_analysis.rename(columns=new_names, inplace=True)

    df_data_composition = analyze_used_predictions(df_test_metadata,
                                                   subgroups,
                                                   candidate_column)

    def _count_by_group(grouping_variable):
        # number of 'spkitemid' entries per value of the grouping variable
        counts = pd.pivot_table(df_test_metadata,
                                values='spkitemid',
                                index=[grouping_variable],
                                aggfunc=len)
        df_counts = pd.DataFrame(counts)
        df_counts.insert(0, grouping_variable, df_counts.index)
        df_counts.rename(columns={'spkitemid': 'N responses'}, inplace=True)
        return df_counts

    # create contingency table for each group
    data_composition_by_group_dict = {group: _count_by_group(group)
                                      for group in subgroups}

    return (df_test_excluded_analysis,
            df_data_composition,
            data_composition_by_group_dict)
def unstack_report(report):
    """Unstack performance report.

    Reshapes a :class:`pandas.DataFrame` of :func:`evaluate_outputs` such that
    performance metrics are listed as columns.

    Every column other than ``metric`` and ``value`` is treated as an
    identifier column. Metric columns are ordered following
    ``CAT_METRICS + CLA_METRICS + REG_METRICS`` (matched by name prefix),
    followed by any remaining columns. Rows are sorted by the first of
    ``auc`` (descending), ``mse`` (ascending) or ``acc`` (descending) that is
    present; if none is present the row order is left as produced by the
    pivot.

    Parameters
    ----------
    report: :class:`pandas.DataFrame`
        :class:`pandas.DataFrame` from :func:`evaluate_outputs`.

    Returns
    -------
    :class:`pandas.DataFrame`
        :class:`pandas.DataFrame` with performance metrics as columns.
    """
    index = list(report.columns[~report.columns.isin(['metric', 'value'])])
    report = pd.pivot_table(report, index=index, columns='metric',
                            values='value')
    report.reset_index(index, inplace=True)
    report.columns.name = None

    # Sort columns: identifiers first, then known metrics, then the rest.
    # The membership checks keep a column from being selected twice.
    columns = list(report.columns)
    sorted_columns = list(index)
    for fun in CAT_METRICS + CLA_METRICS + REG_METRICS:
        for column in columns:
            if column.startswith(fun.__name__) and \
                    column not in sorted_columns:
                sorted_columns.append(column)
    sorted_columns += [col for col in columns if col not in sorted_columns]
    report = report[sorted_columns]

    # Pick the single ranking metric and its direction.
    if 'auc' in report.columns:
        by, ascending = 'auc', False
    elif 'mse' in report.columns:
        by, ascending = 'mse', True
    elif 'acc' in report.columns:
        by, ascending = 'acc', False
    else:
        # No ranking metric: sorting by an empty key list is skipped because
        # it is an error on older pandas versions.
        return report

    # Non-inplace sort: ``report`` is a column selection of the pivot, and
    # sorting it in place can trigger chained-assignment warnings.
    return report.sort_values([by], ascending=[ascending])
def test_pivot_columns_lexsorted(self): n = 10000 dtype = np.dtype([ ("Index", object), ("Symbol", object), ("Year", int), ("Month", int), ("Day", int), ("Quantity", int), ("Price", float), ]) products = np.array([ ('SP500', 'ADBE'), ('SP500', 'NVDA'), ('SP500', 'ORCL'), ('NDQ100', 'AAPL'), ('NDQ100', 'MSFT'), ('NDQ100', 'GOOG'), ('FTSE', 'DGE.L'), ('FTSE', 'TSCO.L'), ('FTSE', 'GSK.L'), ], dtype=[('Index', object), ('Symbol', object)]) items = np.empty(n, dtype=dtype) iproduct = np.random.randint(0, len(products), n) items['Index'] = products['Index'][iproduct] items['Symbol'] = products['Symbol'][iproduct] dr = pd.date_range(date(2000, 1, 1), date(2010, 12, 31)) dates = dr[np.random.randint(0, len(dr), n)] items['Year'] = dates.year items['Month'] = dates.month items['Day'] = dates.day items['Price'] = np.random.lognormal(4.0, 2.0, n) df = DataFrame(items) pivoted = df.pivot_table('Price', index=['Month', 'Day'], columns=['Index', 'Symbol', 'Year'], aggfunc='mean') self.assertTrue(pivoted.columns.is_monotonic)
def test_pivot_datetime_tz(self):
    """Pivot on timezone-aware datetime keys, first as the row index only and
    then as both the row index and the column key."""
    hours = ['2011-07-19 07:00:00', '2011-07-19 08:00:00',
             '2011-07-19 09:00:00']
    january = '2013-01-01 15:00:00'
    february = '2013-02-01 15:00:00'

    dates1 = hours * 2
    dates2 = [january] * 3 + [february] * 3
    df = DataFrame({'label': ['a'] * 3 + ['b'] * 3,
                    'dt1': dates1,
                    'dt2': dates2,
                    'value1': np.arange(6, dtype='int64'),
                    'value2': [1, 2] * 3})
    for column, zone in (('dt1', 'US/Pacific'), ('dt2', 'Asia/Tokyo')):
        df[column] = df[column].apply(
            lambda d, zone=zone: pd.Timestamp(d, tz=zone))

    exp_idx = pd.DatetimeIndex(hours, tz='US/Pacific', name='dt1')

    # tz-aware index, plain label columns
    exp_col = MultiIndex.from_arrays([Index(['value1', 'value1']),
                                      Index(['a', 'b'], name='label')])
    expected = DataFrame([[0, 3], [1, 4], [2, 5]],
                         index=exp_idx, columns=exp_col)
    result = pivot_table(df, index=['dt1'], columns=['label'],
                         values=['value1'])
    tm.assert_frame_equal(result, expected)

    # tz-aware index and tz-aware columns, two aggregators
    exp_col1 = Index(['sum'] * 4 + ['mean'] * 4)
    exp_col2 = Index(['value1', 'value1', 'value2', 'value2'] * 2)
    exp_col3 = pd.DatetimeIndex([january, february] * 4,
                                tz='Asia/Tokyo', name='dt2')
    exp_col = MultiIndex.from_arrays([exp_col1, exp_col2, exp_col3])
    cells = np.array([[0, 3, 1, 2] * 2,
                      [1, 4, 2, 1] * 2,
                      [2, 5, 1, 2] * 2], dtype='int64')
    expected = DataFrame(cells, index=exp_idx, columns=exp_col)
    result = pivot_table(df, index=['dt1'], columns=['dt2'],
                         values=['value1', 'value2'],
                         aggfunc=[np.sum, np.mean])
    tm.assert_frame_equal(result, expected)
def test_pivot_dtaccessor(self):
    """Pivot using ``.dt`` accessor results (hour, month, year) and plain
    arrays as index/column keys (GH 8103)."""
    hours = ['2011-07-19 07:00:00', '2011-07-19 08:00:00',
             '2011-07-19 09:00:00']
    dates1 = hours * 2
    dates2 = ['2013-01-01 15:00:00'] * 3 + ['2013-02-01 15:00:00'] * 3
    df = DataFrame({'label': ['a'] * 3 + ['b'] * 3,
                    'dt1': dates1,
                    'dt2': dates2,
                    'value1': np.arange(6, dtype='int64'),
                    'value2': [1, 2] * 3})
    for column in ('dt1', 'dt2'):
        df[column] = df[column].apply(lambda d: pd.Timestamp(d))

    by_hour = {7: [0, 3], 8: [1, 4], 9: [2, 5]}
    hour_columns = Index([7, 8, 9], name='dt1')

    # accessor result as the column key
    result = pivot_table(df, index='label', columns=df['dt1'].dt.hour,
                         values='value1')
    expected = DataFrame(by_hour, index=Index(['a', 'b'], name='label'),
                         columns=hour_columns)
    tm.assert_frame_equal(result, expected)

    # accessor results as both keys
    result = pivot_table(df, index=df['dt2'].dt.month,
                         columns=df['dt1'].dt.hour, values='value1')
    expected = DataFrame(by_hour, index=Index([1, 2], name='dt2'),
                         columns=hour_columns)
    tm.assert_frame_equal(result, expected)

    exp_col = MultiIndex.from_arrays([[7, 7, 8, 8, 9, 9], [1, 2] * 3],
                                     names=['dt1', 'dt2'])

    # raw ndarray index plus two accessor column keys
    result = pivot_table(df, index=df['dt2'].dt.year.values,
                         columns=[df['dt1'].dt.hour, df['dt2'].dt.month],
                         values='value1')
    expected = DataFrame(np.array([[0, 3, 1, 4, 2, 5]], dtype='int64'),
                         index=[2013], columns=exp_col)
    tm.assert_frame_equal(result, expected)

    # arbitrary label array as the index
    nan = np.nan
    result = pivot_table(df, index=np.array(['X', 'X', 'X', 'X', 'Y', 'Y']),
                         columns=[df['dt1'].dt.hour, df['dt2'].dt.month],
                         values='value1')
    expected = DataFrame(np.array([[0, 3, 1, nan, 2, nan],
                                   [nan, nan, nan, 4, nan, 5]]),
                         index=['X', 'Y'], columns=exp_col)
    tm.assert_frame_equal(result, expected)
def plot_doy_heatmap(data, cmap='nipy_spectral', vmin=None, vmax=None,
                     overlay=None, title=None, figsize=(7.0, 3.0)):
    """
    Create a day-of-year (X-axis) vs. time-of-day (Y-axis) heatmap.

    The input is not modified; the helper columns used for pivoting are
    added to a copy.

    Parameters
    ----------
    data : pandas DataFrame or pandas Series
        Data (single column), indexed by time

    cmap : string (optional)
        Colormap, default = nipy_spectral

    vmin : float (optional)
        Colormap minimum, default = None (autoscale)

    vmax : float (optional)
        Colormap maximum, default = None (autoscale)

    overlay : pandas DataFrame (optional)
        Data to overlay on the heatmap.
        Time index should be in day-of-year (X-axis)
        Values should be in time-of-day in minutes (Y-axis)

    title : string (optional)
        Title, default = None

    figsize : tuple (optional)
        Figure size, default = (7.0, 3.0)
    """
    if isinstance(data, pd.Series):
        data = data.to_frame()
    else:
        # work on a copy so the caller's frame does not gain X/Y columns
        data = data.copy()

    # Convert data to a pivot table
    col_name = data.columns[0]
    data['X'] = data.index.dayofyear
    # minutes since midnight; float divisors keep this true division on
    # Python 2 as well
    data['Y'] = (data.index.hour * 60
                 + data.index.minute
                 + data.index.second / 60.0
                 + data.index.microsecond / (60 * 1000000.0))
    # np.nan rather than the np.NaN alias, which NumPy 2.0 removed
    piv = pd.pivot_table(data, values=col_name, index=['Y'], columns=['X'],
                         fill_value=np.nan)

    # Create the heatmap (a single figure; a preceding plt.figure() call
    # would only leave an extra empty figure open)
    fig, ax = plt.subplots(figsize=figsize)
    im = ax.imshow(piv, cmap=cmap, aspect='auto', vmin=vmin, vmax=vmax,
                   extent=[data['X'].min() - 0.5, data['X'].max() + 0.5,
                           data['Y'].max() - 0.5, data['Y'].min() + 0.5])
    fig.colorbar(im, ax=ax)

    # Add overlay
    if isinstance(overlay, pd.DataFrame):
        overlay.plot(ax=ax)

    # Add title and labels
    if title:
        ax.set_title(title)
    ax.set_xlabel("Day of the year")
    ax.set_ylabel("Time of day (minutes)")

    plt.tight_layout()