The following 50 code examples, extracted from open-source Python projects, illustrate how to use scipy.stats.linregress().
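As a quick orientation before the extracted examples: linregress takes two equal-length measurement arrays and returns the slope, intercept, correlation coefficient (r-value), two-sided p-value, and standard error of the estimated slope. A minimal, self-contained sketch with made-up data (not taken from any of the projects below):

import numpy as np
from scipy import stats

x = np.arange(10, dtype=float)
y = 2.0 * x + 1.0  # made-up, perfectly linear data for illustration
result = stats.linregress(x, y)
print(result.slope, result.intercept, result.rvalue, result.pvalue, result.stderr)
# Older code unpacks the same five values positionally:
slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)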
def regression(nx, ny):
    """
    Parameters
    ==========
    spectrum : pd.Series
        Pandas Series object

    nodes : list
        of nodes to be used for the continuum

    Returns
    =======
    corrected : array
        Continuum corrected array

    continuum : array
        The continuum used to correct the data

    x : array
        The potentially truncated x values
    """
    m, b, r_value, p_value, stderr = ss.linregress(nx, ny)
    c = m * nx + b
    return c
def test_alpha(self, returns, benchmark, expected):
    observed = self.empyrical.alpha(returns, benchmark)
    assert_almost_equal(
        observed,
        expected,
        DECIMAL_PLACES)

    if len(returns) == len(benchmark):
        # Compare to scipy linregress
        returns_arr = returns.values
        benchmark_arr = benchmark.values
        mask = ~np.isnan(returns_arr) & ~np.isnan(benchmark_arr)
        slope, intercept, _, _, _ = stats.linregress(benchmark_arr[mask],
                                                     returns_arr[mask])

        assert_almost_equal(
            observed,
            intercept * 252,
            DECIMAL_PLACES
        )

# Alpha/beta translation tests.
def test_beta(self, returns, benchmark, expected):
    observed = self.empyrical.beta(returns, benchmark)
    assert_almost_equal(
        observed,
        expected,
        DECIMAL_PLACES)

    if len(returns) == len(benchmark):
        # Compare to scipy linregress
        returns_arr = returns.values
        benchmark_arr = benchmark.values
        mask = ~np.isnan(returns_arr) & ~np.isnan(benchmark_arr)
        slope, intercept, _, _, _ = stats.linregress(benchmark_arr[mask],
                                                     returns_arr[mask])

        assert_almost_equal(
            observed,
            slope
        )
def Regress(self):
    if len(self._independent_data) == 0:
        print('Loaded no data for contest "%s" party "%s"' %
              (self._independent, self._party))
        return
    if len(self._dependent_data) == 0:
        print('Loaded no data for contest "%s" party "%s"' %
              (self._dependent, self._party))
        return
    y = []
    x = []
    for precinct, votes in self._independent_data.items():  # iteritems() in the Python 2 original
        if precinct not in self._dependent_data:
            continue
        x.append(votes)
        y.append(self._dependent_data[precinct])
    if not x or len(x) != len(y):
        print('Mismatched or empty data')
        return
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    print('Slope=%6.4f Intercept=%6.4f R^2=%.4f p=%.6f' %
          (slope, intercept, r_value**2, p_value))
    self._PrintResiduals(slope, intercept)
def correlation_plot(x, y, save_path, title, xlabel, ylabel):
    plt.scatter(x, y)
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    line_x = np.arange(x.min(), x.max())
    line_y = slope*line_x + intercept
    plt.plot(line_x, line_y,
             label='$%.2fx + %.2f$, $R^2=%.2f$' % (slope, intercept, r_value**2))
    plt.legend(loc='best')
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.tight_layout()
    plt.savefig(save_path)
    plt.clf()  # clear figure
    plt.close()
def __get_tail_se(self):
    """ Method to produce the standard error of the Mack Chainladder
    model tail factor

    Returns:
        This calculation is consistent with the R calculation
        MackChainLadder$tail.se
    """
    tailse = np.array(self.sigma[-2] /
                      np.sqrt(self.full_triangle.iloc[0, -3]**self.alpha[-1]))
    if self.chainladder.tail == True:
        time_pd = self.__get_tail_weighted_time_period()
        fse = np.append(self.fse, tailse)
        x = np.array([i + 1 for i in range(len(fse))])
        fse_reg = stats.linregress(x, np.log(fse))
        tailse = np.append(tailse, np.exp(time_pd * fse_reg[0] + fse_reg[1]))
    else:
        tailse = np.append(tailse, 0)
    return tailse
def __get_tail_weighted_time_period(self):
    """ Method to approximate the weighted-average development age assuming
    exponential tail fit.

    Returns: float32
    """
    #n = self.triangle.ncol-1
    #y = self.f[:n]
    #x = np.array([i + 1 for i in range(len(y))])
    #ldf_reg = stats.linregress(x, np.log(y - 1))
    #time_pd = (np.log(self.f[n] - 1) - ldf_reg[1]) / ldf_reg[0]

    n = self.triangle.ncol-1
    y = Series(self.f[:n])
    x = [num+1 for num, item in enumerate(y)]
    y.index = x
    x = sm.add_constant((y.index)[y>1])
    y = y[y>1]
    ldf_reg = sm.OLS(np.log(y-1), x).fit()
    time_pd = (np.log(self.f[n] - 1) - ldf_reg.params[0]) / ldf_reg.params[1]
    #tail_factor = np.exp(tail_model.params[0] + np.array([i+2 for i in range(n,n+100)]) * tail_model.params[1]).astype(float) + 1)

    return time_pd
def getModelDiffs(self, dependentContainer, playoffTeamsOnly=False):
    ## so generate our model here based on the data provided in each
    ## stat container, then output a new statContainer object with
    ## the values of the model diffs
    print("Consistency check between %s and %s containers is a %r" %
          (self.getShortStatName(), dependentContainer.getShortStatName(),
           consistencyCheck(self, dependentContainer)))  # print statement in the Python 2 original
    x_values = self.getStat(playoffTeamsOnly)
    y_values = dependentContainer.getStat(playoffTeamsOnly)
    gradient, intercept, r_value, p_value, std_err = stats.linregress(
        x_values, y_values)
    modelDiffs = []

    def modelValue(x, gradient, intercept):
        return (x*gradient + intercept)

    for i in range(0, len(x_values)):
        modelDiffs.append(y_values[i] - modelValue(x_values[i], gradient, intercept))
    return statContainer("%sby%s Model Diffs" % (dependentContainer.getShortStatName(),
                                                 self.getShortStatName()),
                         "Deltas from the Model for %s by %s, gradient=%.3f, intercept=%.3f" %
                         (dependentContainer.getLongStatName(),  # the original was missing the call parentheses here
                          self.getShortStatName(), gradient, intercept),
                         modelDiffs,
                         self.getTeamIds(playoffTeamsOnly),
                         self.getTeamNames(playoffTeamsOnly),
                         self.getYears(playoffTeamsOnly),
                         self.getMadePlayoffsList(playoffTeamsOnly))
def corrPlotV(dv_true, dv_pred, sv_true, sv_pred, teamName, outName):
    slope, intercept, rvalue, pvalue, std_err = stats.linregress(
        np.append(dv_true, sv_true), np.append(dv_pred, sv_pred))
    plt.scatter(dv_true, dv_pred, label='Diastolic Volume',
                marker='o', facecolors='none', edgecolors='r')
    plt.scatter(sv_true, sv_pred, label='Systolic Volume',
                marker='o', facecolors='none', edgecolors='b')
    plt.xlabel("True Volume (mL)")
    plt.ylabel("Predicted Volume (mL)")
    plt.title("%s\nCorrelation of Volume Predictions with Test Values" % teamName)
    x = np.linspace(0, 500, 10)
    plt.plot(x, x, color='k', label='guide y = x')
    plt.plot(x, x*slope+intercept, color='k', linestyle='--',
             label='y=%.2fx+%.2f\n$R^2=$%.3f p=%.2e' % (slope, intercept, rvalue**2, pvalue))
    plt.gca().set_xlim((0, 500))
    plt.gca().set_ylim((0, 500))
    plt.legend(loc='upper left')
    plt.grid()
    plt.savefig("%sCorrVols.png" % outName)
    plt.close()

#plotting prediction vs truth... EF
def corrPlotEF(true, pred, teamName, outName):
    slope, intercept, rvalue, pvalue, std_err = stats.linregress(true*100.0, pred*100.0)
    plt.scatter(true*100.0, pred*100.0, marker='x', color='#990099', label="Ejection Fraction")
    x = np.linspace(0, 90, 10)
    plt.plot(x, x, color='k', label='guide y = x')
    plt.plot(x, x*slope+intercept, color='k', linestyle='--',
             label='y=%.2fx+%.2f\n$R^2=$%.3f p=%.2e' % (slope, intercept, rvalue**2, pvalue))
    plt.gca().set_xlim((0, 90))
    plt.gca().set_ylim((0, 90))
    plt.xlabel("True Ejection Fraction (%)")
    plt.ylabel("Predicted Ejection Fraction (%)")
    plt.title("%s\nCorrelation of EF Predictions with Test Values" % teamName)
    plt.legend(loc='upper left')
    plt.grid()
    plt.savefig("%sCorrEF.png" % outName)
    plt.close()

##################
#
# Bland - Altman plots
#
#plotting Bland-Altman for dv and sv
def plot2dRegression(x, y, nameX, nameY, namePlot):
    model = LinearRegression()
    linearModel = model.fit(x, y)
    predictModel = linearModel.predict(x)
    plt.scatter(x, y, color='g')
    plt.plot(x, predictModel, color='k')
    plt.xlabel(nameX)
    plt.ylabel(nameY)
    test = stats.linregress(predictModel, y)
    print("The square of the correlation coefficient R^2 is " + str(test.rvalue**2))
    plt.savefig("plot/loadings/"+namePlot, bbox_inches='tight')
    plt.show()
    return test.rvalue**2

#plot the 2D regression between the performance values and the loadings.
#return the correlation factor: R squared
def plot_corr(param_1, param_2, title="", x_label="", y_label="", legend=[]):
    """
    It creates a graph with all the time series of the list parameters.

    :param title: Title of the graph.
    :param x_label: X label of the graph.
    :param y_label: Y label of the graph.
    :param legend: Labels to show in the legend.
    :param param_1: Parameter to correlate.
    :param param_2: Parameter to correlate.
    :return: The graph.
    """
    slope, intercept, r_value, p_value, std_err = stats.linregress(param_1.values,
                                                                   param_2.values)
    fig_correlation, axes = plt.subplots(nrows=1, ncols=1)
    axes.plot(param_1.values, param_2.values, marker='.', linestyle="")
    axes.plot(param_1.values, param_1.values * slope + intercept)
    axes.set_title(title)
    axes.set_xlabel(x_label)
    axes.set_ylabel(y_label)
    legend.append("$y = {:.2f}x+{:.2f}$ $r^2={:.2f}$".format(slope, intercept, r_value ** 2))
    axes.legend(legend, loc='best')
    return fig_correlation
def test_alpha_beta_equality(self, returns, benchmark):
    alpha, beta = self.empyrical(
        pandas_only=len(returns) != len(benchmark),
        return_types=tuple,
    ).alpha_beta(returns, benchmark)
    assert_almost_equal(
        alpha,
        self.empyrical.alpha(returns, benchmark),
        DECIMAL_PLACES)
    assert_almost_equal(
        beta,
        self.empyrical.beta(returns, benchmark),
        DECIMAL_PLACES)

    if len(returns) == len(benchmark):
        # Compare to scipy linregress
        returns_arr = returns.values
        benchmark_arr = benchmark.values
        mask = ~np.isnan(returns_arr) & ~np.isnan(benchmark_arr)
        slope, intercept, _, _, _ = stats.linregress(returns_arr[mask],
                                                     benchmark_arr[mask])

        assert_almost_equal(
            alpha,
            intercept
        )
        assert_almost_equal(
            beta,
            slope
        )
def stability_of_timeseries(returns):
    """Determines R-squared of a linear fit to the cumulative
    log returns. Computes an ordinary least squares linear fit,
    and returns R-squared.

    Parameters
    ----------
    returns : pd.Series or np.ndarray
        Daily returns of the strategy, noncumulative.
        - See full explanation in :func:`~empyrical.stats.cum_returns`.

    Returns
    -------
    float
        R-squared.
    """
    if len(returns) < 2:
        return np.nan

    returns = np.asanyarray(returns)
    returns = returns[~np.isnan(returns)]

    cum_log_returns = np.log1p(returns).cumsum()
    rhat = stats.linregress(np.arange(len(cum_log_returns)),
                            cum_log_returns)[2]

    return rhat ** 2
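A quick usage sketch for stability_of_timeseries above; the daily returns here are synthetic and purely illustrative:

import numpy as np
from scipy import stats  # used inside stability_of_timeseries

rets = np.random.normal(loc=0.0005, scale=0.01, size=252)  # one year of fake daily returns
r_squared = stability_of_timeseries(rets)  # R^2 of a line fit to cumulative log returns
print(r_squared)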
def calcFashionEvoFunc(pNow):
    '''
    Calculates a new approximate dynamic rule for the evolution of the proportion
    of punks as a linear function and a "shock width".

    Parameters
    ----------
    pNow : [float]
        List describing the history of the proportion of punks in the population.

    Returns
    -------
    (unnamed) : FashionEvoFunc
        A new rule for the evolution of the population punk proportion, based on
        the history in input pNow.
    '''
    pNowX = np.array(pNow)
    T = pNowX.size
    p_t = pNowX[100:(T-1)]
    p_tp1 = pNowX[101:T]
    pNextSlope, pNextIntercept, trash1, trash2, trash3 = stats.linregress(p_t, p_tp1)
    pPopExp = pNextIntercept + pNextSlope*p_t
    pPopErrSq = (pPopExp - p_tp1)**2
    pNextStd = np.sqrt(np.mean(pPopErrSq))
    print(str(pNextIntercept) + ', ' + str(pNextSlope) + ', ' + str(pNextStd))
    return FashionEvoFunc(pNextIntercept, pNextSlope, 2*pNextStd)

###############################################################################
###############################################################################
def linear_regression(x_vals, y_vals):
    '''
    least-squares regression of scipy
    '''
    a_value, b_value, r_value, p_value, std_err = linregress(x_vals, y_vals)
    est_yvals = a_value * x_vals + b_value
    k = 1 / a_value
    plot.plot(x_vals, est_yvals,
              label='Least-squares fit, k = ' + str(round(k)) +
                    ", RSquare = " + str(r_value**2))
def model_test(model, X_test, y_test, outputfile):
    #net.load_params_from('/path/to/weights_file')
    y_pred = model.predict(X_test)
    print(stats.linregress(y_test, y_pred[:, 0]))  # print statement in the Python 2 original
    hkl.dump([y_pred[:, 0], y_test], outputfile)

#save model parameters
def linregress_ting(bst, tuningcurve, n_shuffles=250):
    """perform linear regression on all the events in bst, and return the R^2 values"""

    if float(n_shuffles).is_integer():  # the original was missing the call parentheses, so the check was always truthy
        n_shuffles = int(n_shuffles)
    else:
        raise ValueError("n_shuffles must be an integer!")

    posterior, bdries, mode_pth, mean_pth = decode(bst=bst, ratemap=tuningcurve)

    # bdries = np.insert(np.cumsum(bst.lengths), 0, 0)
    r2values = np.zeros(bst.n_epochs)
    r2values_shuffled = np.zeros((n_shuffles, bst.n_epochs))
    for idx in range(bst.n_epochs):
        y = mode_pth[bdries[idx]:bdries[idx+1]]
        x = np.arange(bdries[idx], bdries[idx+1], step=1)
        x = x[~np.isnan(y)]
        y = y[~np.isnan(y)]

        if len(y) > 0:
            slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
            r2values[idx] = rvalue**2
        else:
            r2values[idx] = np.nan
        for ss in range(n_shuffles):
            if len(y) > 0:
                slope, intercept, rvalue, pvalue, stderr = stats.linregress(np.random.permutation(x), y)
                r2values_shuffled[ss, idx] = rvalue**2
            else:
                r2values_shuffled[ss, idx] = np.nan  # event contained NO decoded activity... unlikely or even impossible with current code

    # sig_idx = np.argwhere(r2values[0,:] > np.percentile(r2values, q=q, axis=0))
    # np.argwhere(((R2[1:,:] >= R2[0,:]).sum(axis=0))/(R2.shape[0]-1)<0.05) # equivalent to above
    if n_shuffles > 0:
        return r2values, r2values_shuffled
    return r2values
def linregress_array(posterior):
    """perform linear regression on the posterior matrix, and return the slope, intercept, and R^2 value"""

    mode_pth = get_mode_pth_from_array(posterior)

    y = mode_pth
    x = np.arange(len(y))
    x = x[~np.isnan(y)]
    y = y[~np.isnan(y)]

    if len(y) > 0:
        slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
        return slope, intercept, rvalue**2
    else:
        return np.nan, np.nan, np.nan
def linregress_bst(bst, tuningcurve):
    """perform linear regression on all the events in bst, and return the slopes, intercepts, and R^2 values"""

    posterior, bdries, mode_pth, mean_pth = decode(bst=bst, ratemap=tuningcurve)

    slopes = np.zeros(bst.n_epochs)
    intercepts = np.zeros(bst.n_epochs)
    r2values = np.zeros(bst.n_epochs)
    for idx in range(bst.n_epochs):
        y = mode_pth[bdries[idx]:bdries[idx+1]]
        x = np.arange(bdries[idx], bdries[idx+1], step=1)
        x = x[~np.isnan(y)]
        y = y[~np.isnan(y)]

        if len(y) > 0:
            slope, intercept, rvalue, pvalue, stderr = stats.linregress(x, y)
            slopes[idx] = slope
            intercepts[idx] = intercept
            r2values[idx] = rvalue**2
        else:
            slopes[idx] = np.nan
            intercepts[idx] = np.nan
            r2values[idx] = np.nan

    # if bst.n_epochs == 1:
    #     return np.asscalar(slopes), np.asscalar(intercepts), np.asscalar(r2values)
    return slopes, intercepts, r2values
def __get_tail_sigma(self):
    """ Method to produce the sigma of the Mack Chainladder
    model tail factor

    Returns:
        This calculation is consistent with the R calculation
        MackChainLadder$tail.sigma
    """
    y = np.log(self.sigma[:len(self.triangle.data.columns[:-2])])
    x = np.array([i + 1 for i in range(len(y))])
    model = stats.linregress(x, y)
    tailsigma = np.exp((x[-1] + 1) * model[0] + model[1])
    if model[3] > 0.05:  # p-value of slope parameter
        y = self.sigma
        tailsigma = np.sqrt(
            abs(min((y[-1]**4 / y[-2]**2), min(y[-2]**2, y[-1]**2))))
    if self.chainladder.tail == True:
        time_pd = self.__get_tail_weighted_time_period()
        y = np.log(np.append(self.sigma, tailsigma))
        x = np.array([i + 1 for i in range(len(y))])
        sigma_reg = stats.linregress(x, y)
        tailsigma = np.append(tailsigma, np.exp(time_pd * sigma_reg[0] + sigma_reg[1]))
    else:
        tailsigma = np.append(tailsigma, 0)
    return tailsigma
def get_stats(twoDarray):
    print(np.mean(twoDarray[0]))
    print(np.mean(twoDarray[1]))
    print(np.var(twoDarray[0]))
    print(np.var(twoDarray[1]))
    print(np.corrcoef(twoDarray[0], twoDarray[1]))
    print(stats.linregress(twoDarray[0], twoDarray[1]))
def wilson(data):
    '''
    Calculate useful things like mass ratio and systemic velocity.

    Parameters
    ----------
    data : list
        Radial velocity pairs in a 2D list.

    Returns
    -------
    -slope : float
        Mass Ratio of the system. The ratio of the secondary component
        mass to the primary.
    intercept : float
        y-intercept of the line which fits data.
    stderr : float
        Standard error of the estimated gradient.
    '''
    from scipy.stats import linregress

    # Primary RVs on y.
    y = [datum[1] for datum in data if not np.isnan(datum[1]+datum[2])]
    # Secondary RVs on x.
    x = [datum[2] for datum in data if not np.isnan(datum[1]+datum[2])]

    slope, intercept, rvalue, pvalue, stderr = linregress(x, y)
    return -slope, intercept, stderr
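A hypothetical call to wilson(); each row of data is assumed to be (phase, primary RV, secondary RV), matching the datum[1]/datum[2] indexing above, and the values are made up so the velocities fall on an exact line:

import numpy as np

data = [(0.1, 30.0, -45.0),
        (0.3, 10.0, -15.0),
        (0.5, -20.0, 30.0),
        (0.7, -40.0, 60.0)]
mass_ratio, v_systemic, err = wilson(data)
print(mass_ratio)  # 2/3: the secondary swings 1.5x faster, so it is 2/3 as massive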
def load_calibration():
    '''Load calibration data and calculate calibration values.'''
    # TODO: handle different calibration measurements, not just one for datadir
    global p1_cal, p2_cal
    try:
        # Load the variables in 'calibration.py'.
        calglobals = runpy.run_path(
            os.path.join(datadir, 'calibration.py')
        )

        # Store the calibration data and the linear regression.
        p1_cal = dict(data=calglobals['p1_data'])
        try:
            p1_zero_idx = p1_cal['data']['refinputs'].index(0.0)
            p1_offset = p1_cal['data']['measurements'][p1_zero_idx]
        except IndexError:
            p1_offset = 0.0
        p1_cal['regression'] = stats.linregress(
            np.array(p1_cal['data']['measurements']) - p1_offset,
            np.array(p1_cal['data']['refinputs'])
        )

        p2_cal = dict(data=calglobals['p2_data'])
        try:
            p2_zero_idx = p2_cal['data']['refinputs'].index(0.0)
            p2_offset = p2_cal['data']['measurements'][p2_zero_idx]
        except IndexError:
            p2_offset = 0.0
        p2_cal['regression'] = stats.linregress(
            np.array(p2_cal['data']['measurements']) - p2_offset,
            np.array(p2_cal['data']['refinputs'])
        )
    except Exception as e:
        # TODO: print info/warning?
        print(e)
        p1_cal = None
        p2_cal = None
    print('p1_cal: ', p1_cal)
    print('p2_cal: ', p2_cal)
def getLinearModel(x_values, y_values, k=1.0, l=1.0):
    gradient, intercept, r_value, p_value, std_err = stats.linregress(x_values, y_values)
    y_model = []
    grad = k*gradient
    interc = l*intercept
    for x in x_values:
        y = trendline(x, grad, interc)
        y_model.append(y)
    return y_model
def getLinearModel_rValue(x_values, y_values):
    gradient, intercept, r_value, p_value, std_err = stats.linregress(x_values, y_values)
    return r_value
def getLinearModel_pValue(x_values, y_values):
    gradient, intercept, r_value, p_value, std_err = stats.linregress(x_values, y_values)
    return p_value
def test_Φ_w(self):
    """ Φ_w = [A][A^T]. See p. 533.

    DESCRIPTION: Since A is a free parameter, it will not necessarily
                 recover the original A. Φ_w is what really describes the
                 covariance between cluster means (see p. 533), so that is
                 what you want to test - it is 'closer' to the data.
    """
    n_experiments = int(np.log10(1000000) / 2)
    n_list = [100 ** x for x in range(1, n_experiments + 1)]
    n_list = np.array(n_list).astype(float)
    n_dims = self.n_dims
    n_classes = 30  # self.n_classes

    Φ_w_L1_errors = []
    for n in n_list:
        A, Ψ, model = self.experiment(int(n), n_dims, n_classes)
        Φ_w = np.matmul(A, A.T)
        Φ_w_model = np.matmul(model.A, model.A.T)
        L1_error = np.abs(Φ_w - Φ_w_model).mean()
        abs_Φ = (np.abs(Φ_w).mean() + np.abs(Φ_w_model).mean()) * .5
        percent_error = L1_error / abs_Φ * 100
        print('Testing Φ_w with {} samples: {} percent error'.format(n, percent_error))
        Φ_w_L1_errors.append(percent_error)

    Y = Φ_w_L1_errors
    X = [x for x in range(len(Φ_w_L1_errors))]
    slope_of_error_vs_N = linregress(X, Y)[0]

    self.assertTrue(slope_of_error_vs_N < 0)
def test_Φ_b(self):
    # """ Φ_b = [A][Ψ][A^T]. See p. 533.
    #
    # DESCRIPTION: Since A and Ψ are free parameters, they will not necessarily
    #              recover the original A & Ψ. Φ_b is what really describes
    #              the covariance between cluster means (see p. 533), so that
    #              is what you want to test - they are 'closer' to the data.
    # """
    n_classes_list = [4 ** x for x in range(1, 6)]
    n_list = [100 * n for n in n_classes_list]
    n_list = np.array(n_list).astype(float)
    n_dims = self.n_dims

    Φ_b_L1_errors = []
    for n, n_classes in zip(n_list, n_classes_list):
        A, Ψ, model = self.experiment(int(n), n_dims, n_classes)
        Φ_b = np.matmul(np.matmul(A, Ψ), A.T)
        Φ_b_model = np.matmul(np.matmul(model.A, model.Ψ), model.A.T)
        L1_error = np.abs(Φ_b - Φ_b_model).mean()
        abs_Φ = (np.abs(Φ_b).mean() + np.abs(Φ_b_model).mean()) * .5
        percent_error = L1_error / abs_Φ * 100
        Φ_b_L1_errors.append(percent_error)
        print('Testing Φ_b with {} classes: {} percent error'.format(
              n_classes, percent_error))

    Y = Φ_b_L1_errors
    X = [x for x in range(len(Φ_b_L1_errors))]
    slope_of_error_vs_N = linregress(X, Y)[0]

    self.assertTrue(slope_of_error_vs_N < 0)
def linreg(x, y, units='PW/Sv'):
    """ Return linear regression model and plot label """
    if len(x) > 1:
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
        y_model = x * slope + intercept
        label = '(%5.3f %s)' % (slope, units)
    else:
        y_model = None
        label = ''
    return y_model, label
def Get_Summary_Stats(df):
    slope, intercept, r_value, p_value, std_err = \
        stats.linregress(df['act_ret_diff'], df['expec_ret_diff'])
    r2_value = r_value**2
    df_stats = pd.DataFrame({'slope': [slope],
                             'intercept': [intercept],
                             'r2_value': [r2_value],
                             'p_value': [p_value],
                             'std_err': [std_err]})
    df_stats.to_csv(cfg.DATA_PATH + cfg.CLEAN_DATA_FILE + '_summary_stats.csv',
                    index=False)
def get_slopes(X, y):
    slope, __, __, __, __ = linregress(X, y)
    return slope
def fit_line(self):
    if self.received_data and self.xs.shape[0] > 0:
        # fit line to euclidean space laser data in the window of interest
        slope, intercept, r_val, p_val, std_err = stats.linregress(self.xs, self.ys)
        self.m = slope
        self.c = intercept

# window the data, compute the line fit and associated control
def getAlphaBeta(self, interval=100):
    """Formula: (cov(dailyROR, marketROR)/var(marketROR)) or linear-regression: intercept, slope"""
    linreg = np.array([stats.linregress(self.marketROR[i:i+interval][:, 1].astype(float),
                                        self.dailyROR[i:i+interval][:, 1].astype(float))
                       for i in range(min(len(self.dailyROR), len(self.marketROR))-interval)])
    Alpha = [(self.Date[i], linreg[i, 0])
             for i in range(min(len(self.dailyROR), len(self.marketROR))-interval)]
    Beta = [(self.Date[i], linreg[i, 1])
            for i in range(min(len(self.dailyROR), len(self.marketROR))-interval)]
    return np.array(Alpha), np.array(Beta)
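The docstring above equates beta with cov(dailyROR, marketROR)/var(marketROR). The sketch below, on synthetic data unrelated to this class, confirms that the linregress slope is exactly that ratio (with matching degrees of freedom), while the intercept is the alpha term:

import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
market = rng.normal(0.0, 0.01, 500)                           # fake market daily returns
daily = 0.0002 + 0.8 * market + rng.normal(0.0, 0.002, 500)   # fake strategy returns
res = stats.linregress(market, daily)
beta = np.cov(market, daily, ddof=1)[0, 1] / np.var(market, ddof=1)
print(abs(res.slope - beta) < 1e-12)  # True: same estimator up to rounding
print(res.intercept)                  # the regression alpha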
def GetDeseasonalizedData(period, aodvalues, allLats, allLongs, months, meanAOD,
                          rejectzeros=True, uprof=0):
    mlist = []
    for m in months:
        if int(m.split('-')[0]) in period:
            mlist.append(m.split('-')[1]+m.split('-')[0])
    mlist.sort()
    data = np.zeros((len(allLats), len(allLongs), len(mlist)))
    slopedata = np.zeros((len(allLats), len(allLongs)))
    interceptdata = np.zeros((len(allLats), len(allLongs)))
    for e in aodvalues:
        i = allLats.index(e.latitude)
        j = allLongs.index(e.longitude)
        if not rejectzeros:
            numcheck = isfloat(e.aod_12)
        else:
            numcheck = False
            if isfloat(e.aod_12):
                if float(e.aod_12) > 0.0:
                    numcheck = True
        if e.month in period and numcheck and int(e.uprofiles) >= uprof:
            k = mlist.index(str(e.year)+str(e.month).zfill(2))
            if meanAOD[i][j] > 0:
                data[i][j][k] = (float(e.aod_12)-meanAOD[i][j])/meanAOD[i][j]*100
    x = np.arange(0, len(mlist))
    for ind, e in np.ndenumerate(data[:, :, -1]):
        slope, intercept, r_value, p_value, std_err = linregress(x, data[ind[0]][ind[1]])
        slopedata[ind[0]][ind[1]] = slope
        interceptdata[ind[0]][ind[1]] = intercept
    return slopedata, interceptdata, data
def slope_create(F0, F1):
    # Calculate slopes (OLS)
    slope, intercept, r_value, p_value, std_err = stats.linregress(F0, F1)
    # Slope with robust regression
    x = sm.add_constant(F0)
    y = F1
    rlm_results = sm.RLM(y, x, M=sm.robust.norms.HuberT()).fit()
    slope_r = rlm_results.params[-1]
    intercept_r = rlm_results.params[0]
    del x, y, rlm_results
    return slope, intercept, r_value, slope_r, intercept_r
def weighted_least_squares(X, Y, W):
    '''
    Initialize power law fit
    EPS = 1e-10
    use = (X>EPS)&(Y>EPS)
    weighted_least_squares(np.log(X+EPS)[use],np.log(Y+EPS)[use],1/(EPS+X[use]))

    Parameters
    ----------
    X: List of distances
    Y: List of amplitudes
    W: Weights for points

    Returns
    -------
    result : object
        Optimization result returned by scipy.optimize.minimize. See
        scipy.optimize documentation for details.
    '''
    X = np.float64(X)
    Y = np.float64(Y)

    def objective(ab):
        a, b = ab  # the original read `a,b=(a,b)`, which never unpacked the argument
        return np.sum(W*(Y-(X*a+b))**2)

    a, b, _, _, _ = linregress(X, Y)
    result = minimize(objective, [a, b])
    if not result.success:
        print(result.message)
        warnings.warn('Optimization failed: %s' % result.message)
    return result
def fit_lm(y):
    x = np.array(range(len(y)))
    gradient, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    fit_line = [(x_val * gradient) + intercept for x_val in x]
    return gradient, intercept, r_value, p_value, std_err, fit_line
def computeSlope(y):
    """compute slope of best fit line to y under a linear regression"""
    x = np.arange(len(y))
    y -= np.mean(y)  # demeaning y shifts the intercept but leaves the slope unchanged
    slope, _, _, _, _ = stats.linregress(x, y)
    return slope
def calculate_breaking_points(quant_list):
    MAX_ERROR = 5
    RANGE = 5
    CENTER = 50
    BRAKE_POINTS = dict()
    # right to left window
    for x_iter in range(100, CENTER, -1):
        x_proj = range(x_iter - RANGE, x_iter)
        # pylab.plot(x_proj, quant[x-RANGE:x], 'k', alpha=0.5)
        y_subset = quant_list[x_iter - RANGE:x_iter]
        slope, intercept, r_value, p_value, std_err = stats.linregress(x_proj, y_subset)
        g, l = calculate_error(slope, intercept, x_proj, y_subset)
        # print x-RANGE,x, slope, intercept, y[0], f(x, slope, intercept), l,r
        if l > MAX_ERROR:
            BRAKE_POINTS[x_iter - RANGE / 2] = {"error": l,
                                                "slope": slope,
                                                "offset": intercept}
    # left to right window
    for x_iter in range(0, CENTER, 1):
        x_proj = range(x_iter, x_iter + RANGE)
        # pylab.plot(x_proj, quant_list[x_iter:x_iter + RANGE], 'k', alpha=0.3)
        y_subset = quant_list[x_iter:x_iter + RANGE]
        slope, intercept, r_value, p_value, std_err = stats.linregress(x_proj, y_subset)
        g, l = calculate_error(slope, intercept, x_proj, y_subset)
        # print x,x+RANGE, slope, intercept, y[0], f(x, slope, intercept), l,r
        if l > MAX_ERROR:
            if ((x_iter - RANGE + x_iter) / 2) not in BRAKE_POINTS.keys():
                BRAKE_POINTS[x_iter + (RANGE / 2)] = {"error": l,
                                                      "slope": slope,
                                                      "offset": intercept}
                # pylab.plot([x_iter, x_iter + RANGE, ], [f(x_iter, slope, intercept), f(x_iter + RANGE, slope, intercept)], "b", alpha=0.3)
    return BRAKE_POINTS
def fit_exponential(series, *, p0=(1.0, 0.01, 0.0), n0: float = None):
    """
    Fits an exponential to a series. First attempts an exponential fit
    in linear space using p0, then falls back to a fit in log space to
    attempt to find parameters p0 for a linear fit; if all else fails
    returns the linear fit.
    """
    if n0 is None:
        fit_fn = exponential
    else:
        def exponential_constrain_n0(x, a, b):
            return a * numpy.exp(b * x) + n0

        fit_fn = exponential_constrain_n0
        p0 = p0[:2]

    try:
        popt, pcov = curve_fit(fit_fn, series.index, series.values, p0=p0, maxfev=10000)
        if n0 is not None:
            popt = tuple(popt) + (n0,)
    except RuntimeError:
        pass
    else:
        slope = popt[1]
        intercept = numpy.log(1 / popt[0]) / slope
        N0 = popt[2]
        if slope >= 0:
            snr = signal_noise_ratio(series, *popt)
            return slope, intercept, N0, snr, False

    log_series = numpy.log(series[series > 0] - (n0 or 0.0))
    slope, c, *__ = linregress(log_series.index, log_series.values)
    intercept = -c / slope
    if n0 is None:
        p0 = (numpy.exp(c), slope, 0.0)
    else:
        p0 = (numpy.exp(c), slope)

    try:
        popt, pcov = curve_fit(fit_fn, series.index, series.values, p0=p0, maxfev=10000)
        if n0 is not None:
            popt = tuple(popt) + (n0,)
    except RuntimeError:
        snr = signal_noise_ratio(series, c, slope, 0.0)
        return slope, intercept, (n0 or 0.0), snr, True
    else:
        slope = popt[1]
        intercept = numpy.log(1 / popt[0]) / slope
        n0 = popt[2]
        snr = signal_noise_ratio(series, *popt)
        return slope, intercept, n0, snr, False
def analyze_r01(state, human, time):
    """ Iteration 1 """
    # Read data
    if human:
        DATA_CSV = 'data/'+state+'_2a.csv'
    else:
        DATA_CSV = 'data/'+state+'_2b.csv'
    data_points = pd.read_csv(DATA_CSV)

    # Shift carbon and year one row back
    nr1 = data_points['carb']
    nr1 = nr1.iloc[1:len(nr1)]
    nr2 = data_points['py']
    nr2 = nr2.iloc[1:len(nr2)]
    data_points = data_points.iloc[0:len(data_points.index)-1]
    nr1.index = np.arange(len(nr1.index))
    nr2.index = np.arange(len(nr2.index))

    # Now we can calculate difference in carbon
    if time:
        data_points.loc[:, 'growth'] = (nr1 - data_points['carb']) / (nr2 - data_points['py'])
    else:
        data_points.loc[:, 'growth'] = nr1
    data_points.loc[:, 'post_py'] = nr2
    data_points = data_points[data_points.post_py // 10000 == data_points.py // 10000]
    data_points.drop(['py', 'post_py'], axis=1, inplace=True)
    data_points.index = np.arange(len(data_points.index))
    data_points = data_points.loc[:, ['carb', 'growth']]
    data_points = data_points.as_matrix().tolist()

    # Split data into training and testing
    random.shuffle(data_points)
    training = data_points[0:len(data_points) // 2]  # integer division; the Python 2 original used /
    test = data_points[len(data_points) // 2:len(data_points)]
    training = np.array(training)

    # Create the linear regression function
    m = stats.linregress(training).slope
    n = stats.linregress(training).intercept
    sq_error = 0.0

    # Perform validation
    for elem in test:
        predicted = m * elem[0] + n
        actual = elem[1]
        sq_error += (actual - predicted) ** 2
    mse = math.sqrt(sq_error/len(test))
    return mse
def analyze_r02(state, human, time):
    """ Revision 2 """
    # Read data
    if human:
        DATA_CSV = 'data/'+state+'_2a.csv'
    else:
        DATA_CSV = 'data/'+state+'_2b.csv'
    data_points = pd.read_csv(DATA_CSV)

    # Shift carbon and year one row back
    nr1 = data_points['carb']
    nr1 = nr1.iloc[1:len(nr1)]
    nr2 = data_points['py']
    nr2 = nr2.iloc[1:len(nr2)]
    data_points = data_points.iloc[0:len(data_points.index)-1]
    nr1.index = np.arange(len(nr1.index))
    nr2.index = np.arange(len(nr2.index))

    # Now we can calculate difference in carbon
    if time:
        data_points.loc[:, 'growth'] = (nr1 - data_points['carb']) / (nr2 - data_points['py'])
    else:
        data_points.loc[:, 'growth'] = nr1
    data_points.loc[:, 'post_py'] = nr2
    data_points = data_points[data_points.post_py // 10000 == data_points.py // 10000]
    data_points.drop(['py', 'post_py'], axis=1, inplace=True)
    data_points.index = np.arange(len(data_points.index))
    data_points = data_points.loc[:, ['carb', 'growth']]
    data_points = data_points.as_matrix().tolist()

    # Split data into 10 groups
    N_GROUPS = 10
    random.shuffle(data_points)
    groups = []
    prev_cutoff = 0

    # Create the model while performing cross-validation
    for i in np.arange(N_GROUPS):
        next_cutoff = (i + 1) * len(data_points) // N_GROUPS  # integer division; the Python 2 original used /
        groups.append(data_points[prev_cutoff:next_cutoff])
        prev_cutoff = next_cutoff
    sum_mse = 0
    for i in np.arange(N_GROUPS):
        training = []
        test = []
        for j, group in enumerate(groups):
            if j == i:
                test = group
            else:
                training.extend(group)
        training = np.array(training)
        m = stats.linregress(training).slope
        n = stats.linregress(training).intercept
        sq_error = 0.0
        for elem in test:
            predicted = m * elem[0] + n
            actual = elem[1]
            sq_error += (actual - predicted) ** 2
        mse = math.sqrt(sq_error/len(test))
        sum_mse += mse
    return sum_mse/N_GROUPS
def calcAFunc(self, MaggNow, AaggNow):
    '''
    Calculate a new aggregate savings rule based on the history of the
    aggregate savings and aggregate market resources from a simulation.

    Parameters
    ----------
    MaggNow : [float]
        List of the history of the simulated aggregate market resources
        for an economy.
    AaggNow : [float]
        List of the history of the simulated aggregate savings for an economy.

    Returns
    -------
    (unnamed) : CapDynamicRule
        Object containing a new savings rule
    '''
    verbose = True
    discard_periods = 200  # Throw out the first T periods to allow the simulation to approach the SS
    update_weight = 0.5    # Proportional weight to put on new function vs old function parameters
    total_periods = len(MaggNow)

    # Regress the log savings against log market resources
    logAagg = np.log(AaggNow[discard_periods:total_periods])
    logMagg = np.log(MaggNow[discard_periods-1:total_periods-1])
    slope, intercept, r_value, p_value, std_err = stats.linregress(logMagg, logAagg)

    # Make a new aggregate savings rule by combining the new regression parameters
    # with the previous guess
    intercept = update_weight*intercept + (1.0-update_weight)*self.intercept_prev
    slope = update_weight*slope + (1.0-update_weight)*self.slope_prev
    AFunc = AggregateSavingRule(intercept, slope)  # Make a new next-period capital function

    # Save the new values as "previous" values for the next iteration
    self.intercept_prev = intercept
    self.slope_prev = slope

    # Plot aggregate resources vs aggregate savings for this run and print the new parameters
    if verbose:
        print('intercept=' + str(intercept) + ', slope=' + str(slope) +
              ', r-sq=' + str(r_value**2))
        #plot_start = discard_periods
        #plt.plot(logMagg[plot_start:],logAagg[plot_start:],'.k')
        #plt.show()

    return AggShocksDynamicRule(AFunc)
def log_linear_fitting(x, y, method='lsq'):
    """Function to perform log linear regression.

    Parameters
    ----------
    x : ndarray, shape (n_samples)
        Corresponds to the x. In our case, it should be the time.

    y : ndarray, shape (n_samples)
        Corresponds to the y. In our case, it should be the rpp.

    method : string, optional (default='lsq')
        The method to use to perform the regression. The choices are:

        - If 'lsq', an ordinary least-square approach is used.
        - If 'lm', the Levenberg-Marquardt is used.

    Returns
    -------
    slope : float
        Slope of the regression line.

    intercept : float
        Intercept of the regression line.

    std_err : float
        Standard error of the estimate.

    coeff_det : float
        Coefficient of determination.
    """
    # Check that the arrays x and y have the same size
    if x.shape != y.shape:
        raise ValueError('The size of x and y should be the same.')

    if method == 'lsq':
        # Perform the fitting using least-square
        slope, intercept, _, _, _ = linregress(np.log(x), y)
        std_err = res_std_dev(y, log_linear_model(x, slope, intercept))
        coeff_det = r_squared(y, log_linear_model(x, slope, intercept))
    elif method == 'lm':
        # Perform the fitting using non-linear least-square
        # Levenberg-Marquardt
        popt, _ = curve_fit(linear_model, np.log(x), y)
        slope = popt[0]
        intercept = popt[1]
        std_err = res_std_dev(y, log_linear_model(x, slope, intercept))
        coeff_det = r_squared(y, log_linear_model(x, slope, intercept))
    else:
        raise NotImplementedError

    return slope, intercept, std_err, coeff_det
def SymmetryTest(signal1, signal2, error, binary_names, name_signal1="", comment="ok"):
    """
    From two signals, this function calculates the relative error at each
    time index, and therefore returns the time indexes where anomalies are
    found. Computes as well the linear regression coefficients.

    Inputs :

    - signal1, signal2 : two signals of the class SignalData to compare
      (generally signal1 -> "...CHA", and signal2 -> "...CHB" or "..AMSC1"
      and "..AMSC2")
    - error : the result will be False if there is at least one value which
      is out of the relative error box
    - binary_names : list of binary files names
    - [name_signal1] : optional, a string, the name of the first input.
      Allows the algorithm to check if the inputs are boolean or not (from
      specifications). By default, consider the signal as non boolean.
    - [comment] : optional, a string, if equals 'none' then don't print any
      result, prints the result if nothing is specified. The print is done
      through logger.info

    Outputs :

    - result : a bool, indicates if the two input signals are the same
      (according to the accepted error)
    - index : time indexes of the signal.data where the differences were found
    - lin_reg : an array which contains first the type of the signals ('b'
      for boolean and 'c' for continuous), then the slope, the intercept and
      finally the R2 value of linear regression between the two signals.
    """
    n = 6  # truncation of digits in res
    result = True
    error = abs(error)
    index = []
    lin_reg = []
    sig1 = signal1.data
    sig2 = signal2.data

    if is_bool(name_signal1, binary_names):
        # The signals are categorized as boolean : we test if they are different
        for i, s in enumerate(sig1):
            if sig2[i] != s:
                result = False
                index.append(i)
        lin_reg = ["b", " -", " -", " -"]  # boolean signals : no linear regression
    else:
        # The signals are 'reg' (continuous)
        for i, s in enumerate(sig1):
            if s or sig2[i]:  # avoid division by 0
                if abs(2*(s-sig2[i])/(abs(s)+abs(sig2[i]))) > error:
                    result = False
                    index.append(i)
        np.seterr(divide="ignore")
        np.seterr(invalid="ignore")
        a, b, r_value, p_value, std_err = stats.linregress(sig1, sig2)
        np.seterr(divide="warn")
        np.seterr(invalid="warn")  # the original repeated divide="ignore" here, which left divide warnings disabled
        lin_reg = ["c", str(a)[0:n], str(b)[0:n], str(r_value**2)]  # continuous signals : linear regression parameters

    logger.info(result)
    if result:
        # messages translated from the French original
        logger.info("Signals %s and %s are identical (within the accepted error)\n",
                    signal1.name, signal2.name)
    else:
        logger.info("The relative error between the signals exceeds error over some range\n")
    return result, index, lin_reg

#%%
def get_trend(books, trades):
    '''
    Returns the linear trend in previous trades for each data point in
    DataFrame of book data
    '''
    def trend(x):
        trades_n = trades.iloc[x.trades_indexes[0]:x.trades_indexes[1]]
        if len(trades_n) < 3:
            return 0
        else:
            return linregress(trades_n.index.values, trades_n.price.values)[0]
    return books.apply(trend, axis=1)

# def get_tick_df(min_ts, max_ts, live, convert_timestamps=False):
#     '''
#     Returns a DataFrame of ticks in time range
#     '''
#     if not live:
#         query = {'_id': {'$gt': min_ts, '$lt': max_ts}}
#         cursor = ticks_db.find(query).sort('_id', pymongo.ASCENDING)
#     else:
#         cursor = ticks_db.find({}).sort('$natural', pymongo.DESCENDING).limit(1)
#     ticks = pd.DataFrame(list(cursor))
#     if not ticks.empty:
#         ticks = ticks.set_index('_id')
#         if convert_timestamps:
#             ticks.index = pd.to_datetime(ticks.index, unit='s')
#     return ticks
#
# def get_ticks_indexes(books, ticks):
#     '''
#     Returns indexes of ticks closest to each data point in DataFrame
#     of book data
#     '''
#     def ticks_indexes(ts):
#         ts = int(ts)
#         return ticks.index.get_loc(ts, method='nearest')
#     return books.index.map(ticks_indexes)
#
# def get_buys_from_ticks(books, ticks):
#     '''
#     Returns a count of trades for each data point in DataFrame of book data
#     '''
#     def get_buy(x):
#         return ticks.iloc[x.ticks_indexes].buy
#     return books.apply(get_buy, axis=1)
#
# def get_sells_from_ticks(books, ticks):
#     '''
#     Returns a count of trades for each data point in DataFrame of book data
#     '''
#     def get_sell(x):
#         return ticks.iloc[x.ticks_indexes].sell
#     return books.apply(get_sell, axis=1)
def validate(self, plot=False, cutoff=0.0, validation_set=None, fname=None):
    '''
    predict titers of the validation set (separate set of test_titers set
    aside previously) and compare against known values. If requested by
    plot=True, a figure comparing predicted and measured titers is produced
    '''
    from scipy.stats import linregress, pearsonr
    if validation_set is None:
        validation_set = self.test_titers
    self.validation = {}
    for key, val in validation_set.items():  # iteritems() in the Python 2 original
        pred_titer = self.predict_titer(key[0], key[1], cutoff=cutoff)
        self.validation[key] = (val, pred_titer)

    a = np.array(list(self.validation.values()))  # list() needed on Python 3
    print("number of prediction-measurement pairs", a.shape)
    self.abs_error = np.mean(np.abs(a[:, 0]-a[:, 1]))
    self.rms_error = np.sqrt(np.mean((a[:, 0]-a[:, 1])**2))
    self.slope, self.intercept, tmpa, tmpb, tmpc = linregress(a[:, 0], a[:, 1])
    print("error (abs/rms): ", self.abs_error, self.rms_error)
    print("slope, intercept:", self.slope, self.intercept)
    self.r2 = pearsonr(a[:, 0], a[:, 1])[0]**2
    print("pearson correlation:", self.r2)

    if plot:
        import matplotlib.pyplot as plt
        import seaborn as sns
        fs = 16
        sns.set_style('darkgrid')
        plt.figure()
        ax = plt.subplot(111)
        plt.plot([-1, 6], [-1, 6], 'k')
        plt.scatter(a[:, 0], a[:, 1])
        plt.ylabel(r"predicted $\log_2$ distance", fontsize=fs)
        plt.xlabel(r"measured $\log_2$ distance", fontsize=fs)
        ax.tick_params(axis='both', labelsize=fs)
        plt.text(-2.5, 6, 'regularization:\nprediction error:\nR^2:', fontsize=fs-2)
        plt.text(1.2, 6, str(self.lam_drop)+'/'+str(self.lam_pot)+'/'+str(self.lam_avi)+' (HI/pot/avi)'
                 + '\n' + str(round(self.abs_error, 2))+'/'+str(round(self.rms_error, 2))+' (abs/rms)'
                 + '\n' + str(self.r2), fontsize=fs-2)
        plt.tight_layout()
        if fname:
            plt.savefig(fname)
def calc_time_censored_tree_frequencies(self):
    print("fitting time censored tree frequencies")
    # this doesn't interfere with the previous freq estimates via difference
    # in region: global_censored vs global
    region = "global_censored"
    freq_cutoff = 25.0
    pivots_fit = 6
    freq_window = 1.0
    for node in self.nodes:
        node.fit_frequencies = {}
        node.freq_slope = {}
    for time in self.timepoints:
        time_interval = [time - freq_window, time]
        pivots = make_pivots(
            time_interval[0],
            time_interval[1],
            1 / self.pivot_spacing
        )
        node_filter_func = lambda node: (node.numdate >= time_interval[0]
                                         and node.numdate < time_interval[1])

        # Recalculate tree frequencies for the given time interval and its
        # corresponding pivots.
        tree_freqs = tree_frequencies(self.tree, pivots, node_filter=node_filter_func)
        tree_freqs.estimate_clade_frequencies()
        self.frequencies[time] = tree_freqs.frequencies

        # Annotate censored frequencies on nodes.
        # TODO: replace node-based annotation with dicts indexed by node name.
        for node in self.nodes:
            node.freq = {
                region: self.frequencies[time][node.clade]
            }
            node.logit_freq = {
                region: logit_transform(self.frequencies[time][node.clade], 1e-4)
            }

        for node in self.nodes:
            if node.logit_freq[region] is not None:
                node.fit_frequencies[time] = np.minimum(
                    freq_cutoff,
                    np.maximum(-freq_cutoff, node.logit_freq[region])
                )
            else:
                node.fit_frequencies[time] = self.node_parents[node].fit_frequencies[time]
            try:
                slope, intercept, rval, pval, stderr = linregress(
                    pivots[pivots_fit:-1],
                    node.fit_frequencies[time][pivots_fit:-1]
                )
                node.freq_slope[time] = slope
            except:
                import ipdb; ipdb.set_trace()

    # reset pivots in tree to global pivots
    self.rootnode.pivots = self.pivots