我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用pandas.rolling_std()。
def BBANDS_STOP(df, n, nstd):
    """Compute Bollinger-band stop levels with a ratcheting trend flag.

    The bands are ``rolling_mean(close, n) +/- nstd * rolling_std(close, n)``.
    From row ``n`` onward the trend is carried forward from the previous row,
    set to 1 when the close exceeds the previous upper band and to -1 when it
    falls below the previous lower band. While the trend is 1 the lower band
    is never allowed to decrease; while it is -1 the upper band is never
    allowed to increase.

    Args:
        df: DataFrame with a ``'close'`` column.
        n: Rolling window length in rows.
        nstd: Number of standard deviations for the band width.

    Returns:
        DataFrame indexed like ``df`` with columns ``BBSTOP_upper``,
        ``BBSTOP_lower`` and ``BBSTOP_trend``.
    """
    close = df['close']
    # pd.rolling_mean / pd.rolling_std were removed from pandas; use the
    # .rolling() accessor instead.
    ma = close.rolling(n).mean()
    msd = close.rolling(n).std()

    # Work on plain arrays so that every lookup below is positional,
    # whatever kind of index the frame carries.
    upper = (ma + msd * nstd).to_numpy(copy=True)
    lower = (ma - msd * nstd).to_numpy(copy=True)
    price = close.to_numpy()
    trend = [0] * len(close)

    for idx in range(n, len(close)):
        trend[idx] = trend[idx - 1]
        if price[idx] > upper[idx - 1]:
            trend[idx] = 1
        if price[idx] < lower[idx - 1]:
            trend[idx] = -1
        # Ratchet the stop on the side the trend is protecting.
        if trend[idx] == 1 and lower[idx] < lower[idx - 1]:
            lower[idx] = lower[idx - 1]
        elif trend[idx] == -1 and upper[idx] > upper[idx - 1]:
            upper[idx] = upper[idx - 1]

    return pd.concat(
        [
            pd.Series(upper, index=close.index, name='BBSTOP_upper'),
            pd.Series(lower, index=close.index, name='BBSTOP_lower'),
            pd.Series(trend, index=close.index, name='BBSTOP_trend'),
        ],
        join='outer',
        axis=1,
    )
def SVAPO(df, period = 8, cutoff = 1, stdev_h = 1.5, stdev_l = 1.3, stdev_period = 100):
    """Compute the SVAPO oscillator together with its upper and lower bands.

    Relies on the helpers ``HEIKEN_ASHI``, ``TEMA``, ``MA`` and
    ``LINEAR_REG_SLOPE``, which are defined outside this block.

    Args:
        df: DataFrame with a ``'volume'`` column, plus whatever the helpers
            above need (presumably OHLC columns; verify against the helpers).
        period: Base smoothing period.
        cutoff: Minimum price move, in tenths of a percent, for volume to
            count as up or down volume.
        stdev_h: Multiplier of the rolling std for the upper band.
        stdev_l: Multiplier of the rolling std for the lower band.
        stdev_period: Window of the rolling standard deviation.

    Returns:
        DataFrame with columns ``SVAPO_<period>``, ``SVAPO_UB<period>`` and
        ``SVAPO_LB<period>``.
    """
    HA = HEIKEN_ASHI(df, 1)
    haCl = (HA.HAopen + HA.HAclose + HA.HAhigh + HA.HAlow) / 4.0
    haC = TEMA(haCl, 0.625 * period)

    # Previous-bar average volume; each bar's volume is capped at twice it.
    vave = MA(df, 5 * period, field='volume').shift(1)
    vc = pd.concat([df['volume'], vave * 2], axis=1).min(axis=1)
    vtrend = TEMA(LINEAR_REG_SLOPE(df.volume, period), period)

    # Explicit copy so that zeroing entries below cannot write through to vc.
    UpD = vc.copy()
    DoD = -vc
    UpD[(haC <= haC.shift(1) * (1 + cutoff / 1000.0)) | (vtrend < vtrend.shift(1))] = 0
    DoD[(haC >= haC.shift(1) * (1 - cutoff / 1000.0)) | (vtrend > vtrend.shift(1))] = 0

    # pd.rolling_sum / pd.rolling_std were removed from pandas; use the
    # .rolling() accessor instead.
    delta_sum = (UpD + DoD).rolling(period).sum() / (vave + 1)
    svapo = pd.Series(TEMA(delta_sum, period), name='SVAPO_%s' % period)
    svapo_std = svapo.rolling(stdev_period).std()
    svapo_ub = pd.Series(svapo_std * stdev_h, name='SVAPO_UB%s' % period)
    svapo_lb = pd.Series(-svapo_std * stdev_l, name='SVAPO_LB%s' % period)
    return pd.concat([svapo, svapo_ub, svapo_lb], join='outer', axis=1)
def bollinger_band_indicator(price, syms = ['AAPL'], lookback = 21):
    """Compute the Bollinger-band percentage and the two bands.

    Args:
        price: Series or DataFrame of prices.
        syms: Unused by this function; kept for interface compatibility.
        lookback: Window (and minimum number of observations) for the
            rolling standard deviation.

    Returns:
        Tuple ``(bbp, top_band, bottom_band)`` where ``bbp`` is
        ``(price - bottom_band) / (top_band - bottom_band)`` with leading
        gaps back-filled.
    """
    # NOTE(review): sma_indicator is called without `lookback`; confirm it
    # uses the same window as the standard deviation below.
    sma = sma_indicator(price)

    rolling_std = price.rolling(window=lookback, min_periods=lookback).std()
    top_band = sma + (2 * rolling_std)
    bottom_band = sma - (2 * rolling_std)

    bbp = (price - bottom_band) / (top_band - bottom_band)
    # fillna(method='bfill') is deprecated in recent pandas; .bfill() is the
    # equivalent supported spelling.
    bbp = bbp.bfill()
    return bbp, top_band, bottom_band
# Calculate relative strength, then RSI
def BBANDS(df_price, periods=20, mul=2):
    """Compute upper, middle and lower Bollinger bands.

    Args:
        df_price: Series or DataFrame of prices.
        periods: Rolling window length for both mean and standard deviation.
        mul: Number of standard deviations between the middle band and each
            outer band.

    Returns:
        Tuple ``(upper_band, middle_band, lower_band)``.
    """
    # pd.rolling_mean / pd.rolling_std were removed from pandas; use the
    # .rolling() accessor instead.
    window = df_price.rolling(window=periods)

    # Middle band: simple moving average.
    df_middle_band = window.mean()

    # Pandas uses the unbiased estimator (N-1 in the denominator), whereas
    # NumPy by default does not. To make them agree, pass ddof=1 to
    # numpy.std().
    df_std = window.std()

    df_upper_band = df_middle_band + (df_std * mul)
    df_lower_band = df_middle_band - (df_std * mul)
    return (df_upper_band, df_middle_band, df_lower_band)
def getVol(ret, interval=4):
    '''
    Calculate the rolling volatility (standard deviation) of log returns.

    :param DataFrame ret: return values
    :param int interval: window over which volatility is calculated; the
        default of 4 is the small test value, the real value noted by the
        original author is 26
    :return: DataFrame standard_error: volatility values with the incomplete
        leading windows dropped and the index renumbered from 0
    '''
    # The original used a Python 2 print statement, which is a syntax error
    # on Python 3; the call form works on both.
    print('''*************************************************************************************
a kind WARNING from the programmer(not the evil interpreter) function getVol:
we have different values for interval in test code and real code,because the sample file
may not have sufficient rows for real interval,leading to empty matrix.So be careful of
the value you choose
**************************************************************************************
''')
    # pd.rolling_std was removed from pandas; use the .rolling() accessor.
    standard_error = ret.rolling(interval).std()
    standard_error = standard_error.dropna().reset_index(drop=True)
    return standard_error
def STD(self, param):
    """Return the rolling sample standard deviation.

    Args:
        param: Sequence whose first item is the data and whose second item
            is the window length.

    Returns:
        The rolling standard deviation (pandas default ``ddof=1``).
    """
    data, window = param[0], param[1]
    # pd.rolling_std accepted plain arrays; wrap them so .rolling() exists.
    if not isinstance(data, (pd.Series, pd.DataFrame)):
        data = pd.Series(data)
    # pd.rolling_std was removed from pandas; use the .rolling() accessor.
    return data.rolling(window).std()
def STDP(self, param):
    """Return the rolling population standard deviation (``ddof=0``).

    Args:
        param: Sequence whose first item is the data and whose second item
            is the window length.

    Returns:
        The rolling standard deviation computed with ``ddof=0``.
    """
    data, window = param[0], param[1]
    # pd.rolling_std accepted plain arrays; wrap them so .rolling() exists.
    if not isinstance(data, (pd.Series, pd.DataFrame)):
        data = pd.Series(data)
    # pd.rolling_std was removed from pandas; use the .rolling() accessor.
    return data.rolling(window).std(ddof=0)
def rolling_std(series, window=200, min_periods=None):
    """Return the rolling standard deviation of ``series``.

    When ``min_periods`` equals ``window`` the ``numpy_rolling_std`` helper
    (defined outside this block) is tried first; if it fails, or if
    ``min_periods`` differs, pandas' rolling implementation is used.

    Args:
        series: Series, or any sequence that ``pd.Series`` can wrap.
        window: Rolling window length.
        min_periods: Minimum observations per window; defaults to ``window``.

    Returns:
        The rolling standard deviation.
    """
    min_periods = window if min_periods is None else min_periods

    if min_periods == window:
        try:
            return numpy_rolling_std(series, window, True)
        except Exception:
            # Deliberate best-effort: fall through to pandas. Catching
            # Exception rather than BaseException lets KeyboardInterrupt
            # and SystemExit propagate.
            pass

    try:
        return series.rolling(window=window, min_periods=min_periods).std()
    except Exception:
        # Input without a usable .rolling() (e.g. list or ndarray). The old
        # last-resort pd.rolling_std no longer exists in pandas.
        return pd.Series(series).rolling(
            window=window, min_periods=min_periods).std()
# ---------------------------------------------
def bollinger_bands(series, window=20, stds=2):
    """Build Bollinger bands around a rolling mean.

    Uses the ``rolling_mean`` and ``rolling_std`` helpers from this module.

    Args:
        series: Input price series.
        window: Window length passed to both helpers.
        stds: Number of standard deviations between the mid and outer bands.

    Returns:
        DataFrame indexed like ``series`` with columns ``upper``, ``mid``
        and ``lower``.
    """
    mid_band = rolling_mean(series, window=window)
    deviation = rolling_std(series, window=window)
    bands = {
        'upper': mid_band + deviation * stds,
        'mid': mid_band,
        'lower': mid_band - deviation * stds,
    }
    return pd.DataFrame(index=series.index, data=bands)
# ---------------------------------------------
def weighted_bollinger_bands(series, window=20, stds=2):
    """Build Bollinger bands around a rolling weighted mean.

    Uses the ``rolling_weighted_mean`` and ``rolling_std`` helpers from this
    module. Band values are inserted as raw arrays, so they are placed
    positionally against ``series.index`` rather than aligned by label.

    Args:
        series: Input price series.
        window: Window length passed to both helpers.
        stds: Number of standard deviations between the mid and outer bands.

    Returns:
        DataFrame indexed like ``series`` with columns ``upper``, ``mid``
        and ``lower``.
    """
    center = rolling_weighted_mean(series, window=window)
    deviation = rolling_std(series, window=window)
    top = center + deviation * stds
    bottom = center - deviation * stds
    bands = {
        'upper': top.values,
        'mid': center.values,
        'lower': bottom.values,
    }
    return pd.DataFrame(index=series.index, data=bands)
# ---------------------------------------------
def STDEV(df, n, field='close'):
    """Return the rolling standard deviation of one column.

    Args:
        df: DataFrame containing the column ``field``.
        n: Rolling window length in rows.
        field: Column name to process.

    Returns:
        Series named ``STDEV_<FIELD>_<n>`` (field upper-cased).
    """
    # pd.rolling_std was removed from pandas; use the .rolling() accessor.
    rolled = df[field].rolling(n).std()
    return rolled.rename('STDEV_' + field.upper() + '_' + str(n))
def BBANDS(df, n, k=2):
    """Compute Bollinger bandwidth and %b for the ``'close'`` column.

    Args:
        df: DataFrame with a ``'close'`` column.
        n: Rolling window length in rows.
        k: Number of standard deviations for the band half-width.

    Returns:
        DataFrame with columns ``BollingerB<n>`` (band width relative to the
        moving average, ``2*k*std/mean``) and ``Bollingerb<n>`` (position of
        the close inside the band, ``(close - mean + k*std) / (2*k*std)``).
    """
    close = df['close']
    # pd.rolling_mean / pd.rolling_std were removed from pandas; use the
    # .rolling() accessor instead.
    ma = close.rolling(n).mean()
    msd = close.rolling(n).std()

    bandwidth = (2 * k * msd / ma).rename('BollingerB' + str(n))
    percent_b = ((close - ma + k * msd) / (2 * k * msd)).rename('Bollingerb' + str(n))
    return pd.concat([bandwidth, percent_b], join='outer', axis=1)
# Pivot Points, Supports and Resistances
def CCI(df, n):
    """Compute a Commodity-Channel-Index style oscillator.

    The typical price ``(high + low + close) / 3`` is centred on its rolling
    mean and divided by its rolling standard deviation and by 0.015.

    Args:
        df: DataFrame with ``'high'``, ``'low'`` and ``'close'`` columns.
        n: Rolling window length in rows.

    Returns:
        Series named ``CCI<n>``.
    """
    typical = (df['high'] + df['low'] + df['close']) / 3
    # pd.rolling_mean / pd.rolling_std were removed from pandas; use the
    # .rolling() accessor instead.
    window = typical.rolling(n)
    cci = (typical - window.mean()) / window.std() / 0.015
    return cci.rename('CCI' + str(n))
def get_rolling_std(values, window):
    """Compute the rolling standard deviation of ``values``.

    Args:
        values: Object exposing pandas' ``.rolling()`` accessor.
        window: Rolling window length.

    Returns:
        The rolling standard deviation of ``values``.
    """
    windowed = values.rolling(window=window)
    return windowed.std()
def BBANDS(df, n):
    """Append Bollinger bandwidth and %b columns computed from ``'Close'``.

    The band half-width is fixed at two standard deviations.

    Args:
        df: DataFrame with a ``'Close'`` column.
        n: Rolling window length in rows.

    Returns:
        A new DataFrame equal to ``df`` joined with ``BollingerB_<n>``
        (``4*std/mean``) and ``Bollinger%b_<n>``
        (``(Close - mean + 2*std) / (4*std)``).
    """
    close = df['Close']
    # pd.rolling_mean / pd.rolling_std were removed from pandas; use the
    # .rolling() accessor instead.
    ma = close.rolling(n).mean()
    msd = close.rolling(n).std()

    bandwidth = (4 * msd / ma).rename('BollingerB_' + str(n))
    df = df.join(bandwidth)
    percent_b = ((close - ma + 2 * msd) / (4 * msd)).rename('Bollinger%b_' + str(n))
    df = df.join(percent_b)
    return df
# Pivot Points, Supports and Resistances
def CCI(df, n):
    """Append a CCI-style column computed from High/Low/Close.

    The typical price ``(High + Low + Close) / 3`` is centred on its rolling
    mean and divided by its rolling standard deviation. No 0.015 scaling
    constant is applied here.

    Args:
        df: DataFrame with ``'High'``, ``'Low'`` and ``'Close'`` columns.
        n: Rolling window length in rows.

    Returns:
        A new DataFrame equal to ``df`` joined with the column ``CCI_<n>``.
    """
    typical = (df['High'] + df['Low'] + df['Close']) / 3
    # pd.rolling_mean / pd.rolling_std were removed from pandas; use the
    # .rolling() accessor instead.
    window = typical.rolling(n)
    cci = ((typical - window.mean()) / window.std()).rename('CCI_' + str(n))
    df = df.join(cci)
    return df
# Coppock Curve
def STDDEV(df, n):
    """Append the rolling standard deviation of ``'Close'`` to ``df``.

    Args:
        df: DataFrame with a ``'Close'`` column.
        n: Rolling window length in rows.

    Returns:
        A new DataFrame equal to ``df`` joined with the column ``STD_<n>``.
    """
    # pd.rolling_std was removed from pandas; use the .rolling() accessor.
    rolled = df['Close'].rolling(n).std().rename('STD_' + str(n))
    df = df.join(rolled)
    return df
def get_rolling_30_day_vol(ticker, start, end):
    """Return the 30-row rolling standard deviation of simple returns.

    Prices come from ``get_stock_prices`` (defined outside this block) using
    its ``'Adj Close'`` column.

    Args:
        ticker: Passed through to ``get_stock_prices``.
        start: Passed through to ``get_stock_prices``.
        end: Passed through to ``get_stock_prices``.

    Returns:
        Rolling standard deviation of ``pct_change()`` returns, requiring a
        full 30 observations per window.
    """
    prices = get_stock_prices(ticker, start, end)['Adj Close']
    rets = prices.pct_change()
    # pd.rolling_std(rets, 30, 30) was (window=30, min_periods=30); that
    # function was removed from pandas, so use the .rolling() accessor.
    return rets.rolling(window=30, min_periods=30).std()
def calculateBollingerBandsValue(df):
    """Return the normalised distance of each value from its 5-row mean.

    Computes ``(df - mean_5) / (2 * std_5)`` and drops rows containing NaN
    (the leading rows where the window is incomplete).

    Args:
        df: Series or DataFrame of numeric values.

    Returns:
        Object of the same kind as ``df`` with incomplete rows removed.
    """
    # pandas.rolling_mean / pandas.rolling_std were removed from pandas; use
    # the .rolling() accessor instead.
    window = df.rolling(window=5)
    simpleMovingAverage = window.mean()
    stdDeviation = window.std()
    bollingerBandsValue = (df - simpleMovingAverage) / (2 * stdDeviation)
    return bollingerBandsValue.dropna()
def plot(df):
    """Plot the 100-row rolling mean, its +/-2 std bands, and the frame.

    Drawing is done through the module-level ``plotter`` object (defined
    outside this block), ending with a call to ``plotter.show()``.

    Args:
        df: DataFrame with a ``'Close Price'`` column.
    """
    # pandas.rolling_mean / pandas.rolling_std were removed from pandas; use
    # the .rolling() accessor instead.
    window = df['Close Price'].rolling(window=100)
    rollingMean = window.mean()
    rollingStdv = window.std()

    plotter.plot(rollingMean)
    # Bollinger bands: two standard deviations either side of the mean.
    plotter.plot(rollingMean + rollingStdv * 2)
    plotter.plot(rollingMean - rollingStdv * 2)
    plotter.plot(df)
    plotter.show()
def get_rolling_std(values, window, min_periods=None):
    """Return the rolling standard deviation of ``values``.

    Args:
        values: Series or DataFrame.
        window: Rolling window length.
        min_periods: Minimum observations required per window; ``None``
            leaves the pandas default in effect.

    Returns:
        The rolling standard deviation.
    """
    # pd.rolling_std was removed from pandas; use the .rolling() accessor.
    return values.rolling(window=window, min_periods=min_periods).std()
def get_rolling_std(df_values, windowSize):
    """Return the rolling standard deviation of ``df_values``.

    Args:
        df_values: Series or DataFrame.
        windowSize: Rolling window length.

    Returns:
        The rolling standard deviation.
    """
    # pd.rolling_std was removed from pandas; use the .rolling() accessor.
    return df_values.rolling(windowSize).std()
def get_bollinger_bands(rolling_mean, rolling_std):
    """Derive the Bollinger bands from a rolling mean and rolling std.

    Args:
        rolling_mean: Rolling mean values.
        rolling_std: Rolling standard deviation values.

    Returns:
        Tuple ``(upper_band, lower_band)``, each two standard deviations
        away from the mean.
    """
    band_offset = rolling_std * 2
    return rolling_mean + band_offset, rolling_mean - band_offset
def get_rolling_std(values, window):
    """Compute the rolling standard deviation of ``values``.

    Args:
        values: Object exposing pandas' ``.rolling()`` accessor.
        window: Rolling window length.

    Returns:
        The rolling standard deviation; each window's result is labelled at
        its right edge (``center=False``).
    """
    trailing_window = values.rolling(window=window, center=False)
    return trailing_window.std()
def get_myRatio(df_price, periods=20):
    """Return how far each price sits from its rolling mean, in band units.

    The result is ``(price - rolling_mean) / (2 * rolling_std)``.

    Args:
        df_price: Series or DataFrame of prices.
        periods: Rolling window length for both mean and standard deviation.

    Returns:
        Object shaped like ``df_price`` holding the ratio.
    """
    trailing = df_price.rolling(center=False, window=periods)
    # Middle band: simple moving average over the window.
    middle_band = trailing.mean()
    # Pandas' std uses the unbiased estimator (N-1 in the denominator).
    band_std = trailing.std()
    distance = df_price - middle_band
    return distance / (band_std * 2)
def bband(stock, window, num_std):
    """Compute Bollinger bands for ``stock.selected_line_info``.

    Args:
        stock: Object whose ``selected_line_info`` attribute exposes
            ``.values`` and ``.index`` (assumed one-dimensional; confirm
            against the caller).
        window: Rolling window length.
        num_std: Number of standard deviations for the band width.

    Returns:
        Tuple ``(upper, lower, moving_average)`` of Series sharing the index
        of ``stock.selected_line_info``.
    """
    line = stock.selected_line_info
    # pd.rolling_mean / pd.rolling_std (which accepted raw arrays) were
    # removed from pandas. Rolling on a Series that already carries the
    # index keeps the results labelled without any re-alignment.
    data = pd.Series(line.values, index=line.index)
    rolling = data.rolling(window)
    bband_ma = rolling.mean()
    std = rolling.std()
    bband_upper = bband_ma + std * num_std
    bband_lower = bband_ma - std * num_std
    return bband_upper, bband_lower, bband_ma
def generate_features(df):
    """Generate features for a stock/index from historical price and volume.

    Every feature except ``open`` and the target ``close`` is built from
    data shifted by one row, so it only uses information available before
    the row it is attached to. Column labels ``5``, ``30`` and ``365`` map
    to rolling windows / lags of 5, 21 and 252 rows respectively.

    Args:
        df: DataFrame with columns ``"Open"``, ``"Close"``, ``"High"``,
            ``"Low"`` and ``"Volume"``.

    Returns:
        DataFrame of features plus the target column ``close``, with every
        row containing a NaN removed.
    """
    # (column-name label, window/lag in rows)
    horizons = (('5', 5), ('30', 21), ('365', 252))
    ratio_pairs = (('5', '30'), ('5', '365'), ('30', '365'))

    df_new = pd.DataFrame()

    # Raw features: today's open plus yesterday's OHLCV.
    df_new['open'] = df['Open']
    df_new['open_1'] = df['Open'].shift(1)
    df_new['close_1'] = df['Close'].shift(1)
    df_new['high_1'] = df['High'].shift(1)
    df_new['low_1'] = df['Low'].shift(1)
    df_new['volume_1'] = df['Volume'].shift(1)

    def add_rolling_features(prefix, column, stat):
        """Add lagged rolling ``stat`` columns and their pairwise ratios."""
        for label, window in horizons:
            # pd.rolling_mean / pd.rolling_std were removed from pandas; use
            # the .rolling() accessor instead.
            rolled = getattr(df[column].rolling(window=window), stat)()
            df_new['{}_{}'.format(prefix, label)] = rolled.shift(1)
        for short, long_ in ratio_pairs:
            df_new['ratio_{}_{}_{}'.format(prefix, short, long_)] = (
                df_new['{}_{}'.format(prefix, short)]
                / df_new['{}_{}'.format(prefix, long_)]
            )

    # Call order fixes the column order of the result.
    add_rolling_features('avg_price', 'Close', 'mean')
    add_rolling_features('avg_volume', 'Volume', 'mean')
    add_rolling_features('std_price', 'Close', 'std')
    add_rolling_features('std_volume', 'Volume', 'std')

    # Lagged returns over 1 row and over each horizon.
    close = df['Close']
    for label, lag in (('1', 1),) + horizons:
        previous = close.shift(lag)
        df_new['return_' + label] = ((close - previous) / previous).shift(1)

    # Moving averages of the (already lagged) one-row return.
    for label, window in horizons:
        df_new['moving_avg_' + label] = (
            df_new['return_1'].rolling(window=window).mean()
        )

    # The target.
    df_new['close'] = df['Close']
    return df_new.dropna(axis=0)