我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用pandas.rolling_max()。
def VCI(df, n, rng = 8): if n > 7: varA = pd.rolling_max(df.high, rng) - pd.rolling_min(df.low, rng) varB = varA.shift(rng) varC = varA.shift(rng*2) varD = varA.shift(rng*3) varE = varA.shift(rng*4) avg_tr = (varA+varB+varC+varD+varE)/25.0 else: tr = pd.concat([df.high - df.low, abs(df.close - df.close.shift(1))], join='outer', axis=1).max(1) avg_tr = pd.rolling_mean(tr, n) * 0.16 avg_pr = (pd.rolling_mean(df.high, n) + pd.rolling_mean(df.low, n))/2.0 VO = pd.Series((df.open - avg_pr)/avg_tr, name = 'VCIO') VH = pd.Series((df.high - avg_pr)/avg_tr, name = 'VCIH') VL = pd.Series((df.low - avg_pr)/avg_tr, name = 'VCIL') VC = pd.Series((df.close - avg_pr)/avg_tr, name = 'VCIC') return pd.concat([VO, VH, VL, VC], join='outer', axis=1)
def getWilliam(close, high, low): ''' ?????? :param DataFrame close: ??? :param DataFrame high: ????? :param DataFrame low: ????? :return: DataFrame w: ???? ''' # ?14??? n = 14 high = pd.rolling_max(high, n) high.index = range(high.shape[0]) low = pd.rolling_min(low, n) low.index = range(low.shape[0]) w = 100 - 100 * (close - low) / (high - low) w.replace([np.nan, np.inf, -np.inf], 0, inplace=True) return w
def HHV(self, param): if param[1] == 0: return pd.expanding_max(param[0]) return pd.rolling_max(param[0], param[1])
def minMaxScalerRelative(tab,wind): minn=pandas.rolling_min(tab,wind) maxx=pandas.rolling_max(tab,wind) minn[0:wind-1]=minn[wind] maxx[0:wind-1]=maxx[wind] norm=maxx-minn return (tab-minn)/norm
def gen_config_file(filename): sim_config = {} sim_config['sim_func'] = 'bktest_psar_test.psar_test_sim' sim_config['scen_keys'] = ['freq'] sim_config['sim_name'] = 'psar_test' sim_config['products'] = ['m'] #[ 'm', 'RM', 'y', 'p', 'a', 'rb', 'SR', 'TA', 'MA', 'i', 'ru', 'j', 'jm', 'ag', 'cu', 'au', 'al', 'zn' ] sim_config['start_date'] = '20141101' sim_config['end_date'] = '20151118' sim_config['freq'] = [ '15m', '60m' ] sim_config['pos_class'] = 'strat.TradePos' sim_config['proc_func'] = 'dh.min_freq_group' #chan_func = {'high': {'func': 'pd.rolling_max', 'args':{}}, # 'low': {'func': 'pd.rolling_min', 'args':{}}, # } config = {'capital': 10000, 'offset': 0, 'chan': 20, 'use_chan': True, 'sar_params': {'iaf': 0.02, 'maxaf': 0.2, 'incr': 0.02}, 'trans_cost': 0.0, 'close_daily': True, 'unit': 1, 'stoploss': 0.0, #'proc_args': {'minlist':[1500]}, 'proc_args': {'freq':15}, 'pos_args': {}, 'pos_update': False, #'chan_func': chan_func, } sim_config['config'] = config with open(filename, 'w') as outfile: json.dump(sim_config, outfile) return sim_config
def process_data(self, mdf): xdf = self.proc_func(mdf, **self.proc_args) if self.win == -1: tr= pd.concat([xdf.high - xdf.low, abs(xdf.close - xdf.close.shift(1))], join='outer', axis=1).max(axis=1) elif self.win == 0: tr = pd.concat([(pd.rolling_max(xdf.high, 2) - pd.rolling_min(xdf.close, 2))*self.multiplier, (pd.rolling_max(xdf.close, 2) - pd.rolling_min(xdf.low, 2))*self.multiplier, xdf.high - xdf.close, xdf.close - xdf.low], join='outer', axis=1).max(axis=1) else: tr= pd.concat([pd.rolling_max(xdf.high, self.win) - pd.rolling_min(xdf.close, self.win), pd.rolling_max(xdf.close, self.win) - pd.rolling_min(xdf.low, self.win)], join='outer', axis=1).max(axis=1) xdf['TR'] = tr xdf['chanh'] = self.chan_high(xdf['high'], self.chan, **self.chan_func['high']['args']) xdf['chanl'] = self.chan_low(xdf['low'], self.chan, **self.chan_func['low']['args']) xdf['ATR'] = dh.ATR(xdf, n = self.atr_len) xdf['MA'] = dh.MA(xdf, n=self.atr_len, field = 'close') xdata = pd.concat([xdf['TR'].shift(1), xdf['MA'].shift(1), xdf['ATR'].shift(1), xdf['chanh'].shift(1), xdf['chanl'].shift(1), xdf['open']], axis=1, keys=['tr','ma', 'atr', 'chanh', 'chanl', 'dopen']).fillna(0) self.df = mdf.join(xdata, how = 'left').fillna(method='ffill') self.df['datetime'] = self.df.index self.df['cost'] = 0 self.df['pos'] = 0 self.df['traded_price'] = self.df['open']
def gen_config_file(filename): sim_config = {} sim_config['sim_class'] = 'bktest_dtchan_vecsim.DTChanSim' sim_config['sim_func'] = 'run_vec_sim' sim_config['scen_keys'] = ['param', 'chan'] sim_config['sim_name'] = 'DTChan_VecSim' sim_config['products'] = ['m', 'RM', 'y', 'p', 'a', 'rb', 'SR', 'TA', 'MA', 'i', 'ru', 'j' ] sim_config['start_date'] = '20150102' sim_config['end_date'] = '20170428' sim_config['param'] = [ (0.5, 0, 0.5, 0.0), (0.6, 0, 0.5, 0.0), (0.7, 0, 0.5, 0.0), (0.8, 0, 0.5, 0.0), \ (0.9, 0, 0.5, 0.0), (1.0, 0, 0.5, 0.0), (1.1, 0, 0.5, 0.0), \ (0.5, 1, 0.5, 0.0), (0.6, 1, 0.5, 0.0), (0.7, 1, 0.5, 0.0), (0.8, 1, 0.5, 0.0), \ (0.9, 1, 0.5, 0.0), (1.0, 1, 0.5, 0.0), (1.1, 1, 0.5, 0.0), \ (0.2, 2, 0.5, 0.0), (0.25,2, 0.5, 0.0), (0.3, 2, 0.5, 0.0), (0.35, 2, 0.5, 0.0),\ (0.4, 2, 0.5, 0.0), (0.45, 2, 0.5, 0.0),(0.5, 2, 0.5, 0.0), \ (0.2, 4, 0.5, 0.0), (0.25, 4, 0.5, 0.0),(0.3, 4, 0.5, 0.0), (0.35, 4, 0.5, 0.0),\ (0.4, 4, 0.5, 0.0), (0.45, 4, 0.5, 0.0),(0.5, 4, 0.5, 0.0),\ ] sim_config['chan'] = [3, 5, 10, 15, 20] sim_config['pos_class'] = 'strat.TradePos' sim_config['proc_func'] = 'dh.day_split' sim_config['offset'] = 1 chan_func = {'high': {'func': 'pd.rolling_max', 'args':{}}, 'low': {'func': 'pd.rolling_min', 'args':{}}, } config = {'capital': 10000, 'use_chan': True, 'trans_cost': 0.0, 'close_daily': False, 'unit': 1, 'min_range': 0.0035, 'proc_args': {'minlist':[1500]}, 'chan_func': chan_func, } sim_config['config'] = config with open(filename, 'w') as outfile: json.dump(sim_config, outfile) return sim_config
def CMI(df, n): ts = pd.Series(abs(df['close'] - df['close'].shift(n))/(pd.rolling_max(df['high'], n) - pd.rolling_min(df['low'], n))*100, name='CMI'+str(n)) return ts
def CHENOW_PLUNGER(df, n, atr_n = 40): atr = ATR(df, atr_n) high = pd.Series((pd.rolling_max(df['high'], n) - df['close'])/atr, name = 'CPLUNGER_H'+ str(n)) low = pd.Series((df['close'] - pd.rolling_min(df['low'], n))/atr, name = 'CPLUNGER_L'+ str(n)) return pd.concat([high,low], join='outer', axis=1) #Donchian Channel
def DONCH_H(df, n, field = 'high'): DC_H = pd.rolling_max(df[field],n) return pd.Series(DC_H, name = 'DONCH_H' + field[0].upper() + str(n))
def FISHER(df, n, smooth_p = 0.7, smooth_i = 0.7): roll_high = pd.rolling_max(df.high, n) roll_low = pd.rolling_min(df.low, n) price_loc = (df.close - roll_low)/(roll_high - roll_low) * 2.0 - 1 sm_price = pd.Series(pd.ewma(price_loc, com = 1.0/smooth_p - 1, adjust = False), name = 'FISHER_P') fisher_ind = 0.5 * np.log((1 + sm_price)/(1 - sm_price)) sm_fisher = pd.Series(pd.ewma(fisher_ind, com = 1.0/smooth_i - 1, adjust = False), name = 'FISHER_I') return pd.concat([sm_price, sm_fisher], join='outer', axis=1)
def WPR(df, n): res = pd.Series((df['close'] - pd.rolling_min(df['low'], n))/(pd.rolling_max(df['high'], n) - pd.rolling_min(df['low'], n))*100, name = "WPR_%s" % str(n)) return res
def DT_RNG(df, win = 2, ratio = 0.7): if win == 0: tr_ts = pd.concat([(pd.rolling_max(df['high'], 2) - pd.rolling_min(df['close'], 2))*0.5, (pd.rolling_max(df['close'], 2) - pd.rolling_min(df['low'], 2))*0.5, df['high'] - df['close'], df['close'] - df['low']], join='outer', axis=1).max(axis=1) else: tr_ts = pd.concat([pd.rolling_max(df['high'], win) - pd.rolling_min(df['close'], win), pd.rolling_max(df['close'], win) - pd.rolling_min(df['low'], win)], join='outer', axis=1).max(axis=1) return pd.Series(tr_ts, name = 'DTRNG%s_%s' % (win, ratio))
def ichimoku(price_objs): """ computes the ichimoku cloud for price_objs """ dates = [pd.to_datetime(str(obj.created_on)) for obj in price_objs] prices = [obj.price for obj in price_objs] d = {'date': dates, 'price': prices} _prices = pd.DataFrame(d) # Tenkan-sen (Conversion Line): (9-period high + 9-period low)/2)) period9_high = pd.rolling_max(_prices['price'], window=9) period9_low = pd.rolling_min(_prices['price'], window=9) tenkan_sen = (period9_high + period9_low) / 2 # Kijun-sen (Base Line): (26-period high + 26-period low)/2)) period26_high = pd.rolling_max(_prices['price'], window=26) period26_low = pd.rolling_min(_prices['price'], window=26) kijun_sen = (period26_high + period26_low) / 2 # Senkou Span A (Leading Span A): (Conversion Line + Base Line)/2)) senkou_span_a = ((tenkan_sen + kijun_sen) / 2).shift(26) # Senkou Span B (Leading Span B): (52-period high + 52-period low)/2)) period52_high = pd.rolling_max(_prices['price'], window=52) period52_low = pd.rolling_min(_prices['price'], window=52) senkou_span_b = ((period52_high + period52_low) / 2).shift(26) # The most current closing price plotted 22 time periods behind (optional) chikou_span = _prices.shift(-22) # 22 according to investopedia return { 'tenkan_sen': tenkan_sen, 'kijun_sen': kijun_sen, 'senkou_span_a': senkou_span_a, 'senkou_span_b': senkou_span_b, 'chikou_span': chikou_span, }
def gen_config_file(filename): sim_config = {} sim_config['sim_class'] = 'bktest_dtsplit_psar.DTStopSim' sim_config['sim_func'] = 'run_loop_sim' sim_config['scen_keys'] = ['param', 'stoploss'] sim_config['sim_name'] = 'DT_psar' sim_config['products'] = ['m', 'RM', 'y', 'p', 'a', 'rb', 'SR', 'TA', 'MA', 'i', 'ru', 'j' ] sim_config['start_date'] = '20150102' sim_config['end_date'] = '20160311' sim_config['param'] = [ (0.5, 0, 0.5, 0.0), (0.6, 0, 0.5, 0.0), (0.7, 0, 0.5, 0.0), (0.8, 0, 0.5, 0.0), \ (0.9, 0, 0.5, 0.0), (1.0, 0, 0.5, 0.0), (1.1, 0, 0.5, 0.0), \ (0.5, 1, 0.5, 0.0), (0.6, 1, 0.5, 0.0), (0.7, 1, 0.5, 0.0), (0.8, 1, 0.5, 0.0), \ (0.9, 1, 0.5, 0.0), (1.0, 1, 0.5, 0.0), (1.1, 1, 0.5, 0.0), \ (0.2, 2, 0.5, 0.0), (0.25,2, 0.5, 0.0), (0.3, 2, 0.5, 0.0), (0.35, 2, 0.5, 0.0),\ (0.4, 2, 0.5, 0.0), (0.45, 2, 0.5, 0.0),(0.5, 2, 0.5, 0.0), \ #(0.2, 4, 0.5, 0.0), (0.25, 4, 0.5, 0.0),(0.3, 4, 0.5, 0.0), (0.35, 4, 0.5, 0.0),\ #(0.4, 4, 0.5, 0.0), (0.45, 4, 0.5, 0.0),(0.5, 4, 0.5, 0.0),\ ] sim_config['stoploss'] = [1, 2, 3] sim_config['pos_class'] = 'strat.TargetTrailTradePos' sim_config['proc_func'] = 'dh.day_split' sim_config['offset'] = 1 chan_func = {'high': {'func': 'pd.rolling_max', 'args':{}}, 'low': {'func': 'pd.rolling_min', 'args':{}}, } config = {'capital': 10000, 'chan': 10, 'use_chan': False, 'trans_cost': 0.0, 'close_daily': False, 'unit': 1, 'stoploss': 0.0, 'min_range': 0.004, 'proc_args': {'minlist':[1500]}, 'pos_args': {}, 'pos_update': True, 'atr_period': 10, 'chan_func': chan_func, } sim_config['config'] = config with open(filename, 'w') as outfile: json.dump(sim_config, outfile) return sim_config
def run_vec_sim(self): xdf = self.proc_func(self.df, **self.proc_args) if self.win == -1: tr= pd.concat([xdf.high - xdf.low, abs(xdf.close - xdf.close.shift(1))], join='outer', axis=1).max(axis=1) elif self.win == 0: tr = pd.concat([(pd.rolling_max(xdf.high, 2) - pd.rolling_min(xdf.close, 2)) * self.multiplier, (pd.rolling_max(xdf.close, 2) - pd.rolling_min(xdf.low, 2)) * self.multiplier, xdf.high - xdf.close, xdf.close - xdf.low], join='outer', axis=1).max(axis=1) else: tr= pd.concat([pd.rolling_max(xdf.high, self.win) - pd.rolling_min(xdf.close, self.win), pd.rolling_max(xdf.close, self.win) - pd.rolling_min(xdf.low, self.win)], join='outer', axis=1).max(axis=1) xdf['tr'] = tr xdf['chan_h'] = self.chan_high(xdf, self.chan, **self.chan_func['high']['args']) xdf['chan_l'] = self.chan_low(xdf, self.chan, **self.chan_func['low']['args']) xdf['atr'] = dh.ATR(xdf, self.machan) xdf['ma'] = pd.rolling_mean(xdf.close, self.machan) xdf['rng'] = pd.DataFrame([self.min_rng * xdf['open'], self.k * xdf['tr'].shift(1)]).max() xdf['upper'] = xdf['open'] + xdf['rng'] * (1 + (xdf['open'] < xdf['ma'].shift(1))*self.f) xdf['lower'] = xdf['open'] - xdf['rng'] * (1 + (xdf['open'] > xdf['ma'].shift(1))*self.f) xdata = pd.concat([xdf['upper'], xdf['lower'], xdf['chan_h'].shift(1), xdf['chan_l'].shift(1), xdf['open']], axis=1, keys=['upper','lower', 'chan_h', 'chan_l', 'xopen']).fillna(0) mdf = self.df.join(xdata, how = 'left').fillna(method='ffill') mdf['dt_signal'] = np.nan if self.price_mode == "HL": up_price = mdf['high'] dn_price = mdf['low'] elif self.price_mode == "TP": up_price = (mdf['high'] + mdf['low'] + mdf['close'])/3.0 dn_price = up_price elif self.price_mode == "CL": up_price = mdf['close'] dn_price = mdf['close'] else: print "unsupported price mode" mdf.ix[up_price >= mdf['upper'], 'dt_signal'] = 1 mdf.ix[dn_price <= mdf['lower'], 'dt_signal'] = -1 if self.close_daily: mdf.ix[ mdf['min_id'] >= self.exit_min, 'dt_signal'] = 0 addon_signal = copy.deepcopy(mdf['dt_signal']) mdf['dt_signal'] = mdf['dt_signal'].fillna(method='ffill').fillna(0) mdf['chan_sig'] = np.nan if combo_signal: mdf.ix[(up_price >= mdf['chan_h']) & (addon_signal > 0), 'chan_sig'] = 1 mdf.ix[(dn_price <= mdf['chan_l']) & (addon_signal < 0), 'chan_sig'] = -1 else: mdf.ix[(mdf['high'] >= mdf['chan_h']), 'chan_sig'] = 1 mdf.ix[(mdf['low'] <= mdf['chan_l']), 'chan_sig'] = -1 mdf['chan_sig'] = mdf['chan_sig'].fillna(method='ffill').fillna(0) pos = mdf['dt_signal'] * (self.unit[0] + (mdf['chan_sig'] * mdf['dt_signal'] > 0) * self.unit[1]) mdf['pos'] = pos.shift(1).fillna(0) mdf.ix[-3:, 'pos'] = 0 mdf['cost'] = abs(mdf['pos'] - mdf['pos'].shift(1)) * (self.offset + mdf['open'] * self.tcost) mdf['cost'] = mdf['cost'].fillna(0.0) mdf['traded_price'] = mdf['open'] self.closed_trades = backtest.simdf_to_trades1(mdf, slippage = self.offset ) return mdf, self.closed_trades
def dual_thrust_sim( mdf, config): close_daily = config['close_daily'] offset = config['offset'] k = config['param'][0] win = config['param'][1] proc_func = config['proc_func'] proc_args = config['proc_args'] chan_func = config['chan_func'] chan_high = eval(chan_func['high']['func']) chan_low = eval(chan_func['low']['func']) tcost = config['trans_cost'] min_rng = config['min_range'] chan = config['chan'] xdf = proc_func(mdf, **proc_args) tr= pd.concat([pd.rolling_max(xdf.high, win) - pd.rolling_min(xdf.close, win), pd.rolling_max(xdf.close, win) - pd.rolling_min(xdf.low, win)], join='outer', axis=1).max(axis=1) xdf['tr'] = tr xdf['chan_h'] = chan_high(xdf, chan, **chan_func['high']['args']) xdf['chan_l'] = chan_low(xdf, chan, **chan_func['low']['args']) #sar_param = config['sar_param'] #sar = dh.SAR(xdf, **sar_param) #sar_signal = pd.Series(0, index = sar.index) #sar_signal[(sar < xdf['close'])] = 1 #sar_signal[(sar > xdf['close'])] = -1 #xdf['atr'] = dh.ATR(xdf, chan) xdata = pd.concat([xdf['tr'].shift(1), xdf['chan_h'].shift(1), xdf['chan_l'].shift(1), xdf['open']], axis=1, keys=['tr','chan_h', 'chan_l', 'xopen']) mdf = mdf.join(xdata, how = 'left').fillna(method='ffill') rng = pd.DataFrame([min_rng * mdf['xopen'], k * mdf['tr']]).max() long_signal = pd.Series(np.nan, index = mdf.index) long_signal[(mdf['high'] >= mdf['xopen'] + rng) & (mdf['high'] >= mdf['chan_h']) ] = 1 long_signal[(mdf['low'] <= mdf['xopen'] - rng) | (mdf['low'] <= mdf['chan_l'])] = 0 if close_daily: long_signal[(mdf['min_id']>=config['exit_min'])] = 0 long_signal = long_signal.shift(1) long_signal = long_signal.fillna(method='ffill').fillna(0) short_signal = pd.Series(np.nan, index = mdf.index) short_signal[(mdf['low'] <= mdf['xopen'] - rng) & (mdf['low'] <= mdf['chan_l']) ] = -1 short_signal[(mdf['high'] >= mdf['xopen'] + rng) | (mdf['high'] >= mdf['chan_h'])] = 0 if close_daily: short_signal[(mdf['min_id']>=config['exit_min'])] = 0 short_signal = short_signal.shift(1) short_signal = short_signal.fillna(method='ffill').fillna(0) if len(mdf[(long_signal>0)&(short_signal<0)]) > 0: print "Warning: long and short signal happen at the same time" mdf['pos'] = long_signal + short_signal mdf.ix[-3:, 'pos'] = 0 mdf['cost'] = abs(mdf['pos'] - mdf['pos'].shift(1)) * (offset + mdf['open'] * tcost) mdf['cost'] = mdf['cost'].fillna(0.0) mdf['traded_price'] = mdf.open + (mdf['pos'] - mdf['pos'].shift(1)) * offset closed_trades = backtest.simdf_to_trades1(mdf, slippage = offset ) return (mdf, closed_trades)
def run_vec_sim(self): xdf = self.proc_func(self.df, **self.proc_args) if self.win == -1: tr= pd.concat([xdf.high - xdf.low, abs(xdf.close - xdf.close.shift(1))], join='outer', axis=1).max(axis=1) elif self.win == 0: tr = pd.concat([(pd.rolling_max(xdf.high, 2) - pd.rolling_min(xdf.close, 2)) * self.multiplier, (pd.rolling_max(xdf.close, 2) - pd.rolling_min(xdf.low, 2)) * self.multiplier, xdf.high - xdf.close, xdf.close - xdf.low], join='outer', axis=1).max(axis=1) else: tr= pd.concat([pd.rolling_max(xdf.high, self.win) - pd.rolling_min(xdf.close, self.win), pd.rolling_max(xdf.close, self.win) - pd.rolling_min(xdf.low, self.win)], join='outer', axis=1).max(axis=1) xdf['tr'] = tr xdf['chan_h'] = self.chan_high(xdf, self.chan, **self.chan_func['high']['args']) xdf['chan_l'] = self.chan_low(xdf, self.chan, **self.chan_func['low']['args']) xdf['atr'] = dh.ATR(xdf, self.machan) xdf['ma'] = pd.rolling_mean(xdf.close, self.machan) xdf['rng'] = pd.DataFrame([self.min_rng * xdf['open'], self.k * xdf['tr'].shift(1)]).max() xdf['upper'] = xdf['open'] + xdf['rng'] * (1 + (xdf['open'] < xdf['ma'].shift(1))*self.f) xdf['lower'] = xdf['open'] - xdf['rng'] * (1 + (xdf['open'] > xdf['ma'].shift(1))*self.f) xdata = pd.concat([xdf['upper'], xdf['lower'], xdf['chan_h'].shift(1), xdf['chan_l'].shift(1), xdf['open']], axis=1, keys=['upper','lower', 'chan_h', 'chan_l', 'xopen']).fillna(0) mdf = self.df.join(xdata, how = 'left').fillna(method='ffill') mdf['dt_signal'] = np.nan if self.price_mode == "HL": up_price = mdf['high'] dn_price = mdf['low'] elif self.price_mode == "TP": up_price = (mdf['high'] + mdf['low'] + mdf['close'])/3.0 dn_price = up_price elif self.price_mode == "CL": up_price = mdf['close'] dn_price = mdf['close'] else: print "unsupported price mode" mdf.ix[up_price >= mdf['upper'], 'dt_signal'] = 1 mdf.ix[dn_price <= mdf['lower'], 'dt_signal'] = -1 if self.close_daily: mdf.ix[ mdf['min_id'] >= self.exit_min, 'dt_signal'] = 0 addon_signal = copy.deepcopy(mdf['dt_signal']) mdf['dt_signal'] = mdf['dt_signal'].fillna(method='ffill').fillna(0) mdf['chan_sig'] = np.nan if self.combo_signal: mdf.ix[(up_price >= mdf['chan_h']) & (addon_signal > 0), 'chan_sig'] = 1 mdf.ix[(dn_price <= mdf['chan_l']) & (addon_signal < 0), 'chan_sig'] = -1 else: mdf.ix[(mdf['high'] >= mdf['chan_h']), 'chan_sig'] = 1 mdf.ix[(mdf['low'] <= mdf['chan_l']), 'chan_sig'] = -1 mdf['chan_sig'] = mdf['chan_sig'].fillna(method='ffill').fillna(0) pos = mdf['dt_signal'] * (self.unit[0] + (mdf['chan_sig'] * mdf['dt_signal'] > 0) * self.unit[1]) mdf['pos'] = pos.shift(1).fillna(0) mdf.ix[-3:, 'pos'] = 0 mdf['cost'] = abs(mdf['pos'] - mdf['pos'].shift(1)) * (self.offset + mdf['open'] * self.tcost) mdf['cost'] = mdf['cost'].fillna(0.0) mdf['traded_price'] = mdf['open'] self.closed_trades = simdf_to_trades1(mdf, slippage = self.offset ) return (mdf, self.closed_trades)
def getKDJ(close, high, low): ''' calculate KDJ value :param DataFrame close:close price :param DataFrame high:highest price of a day :param DataFrame low: lowest price of a day :return: [DataFrame,DataFrame,DataFrame,DataFrame] [RSV, K, D, KDJ]:KDJ value and some subproducts ''' # interval over which KDJ is calculated kdj_interval = 9 N = 3 # calculate RSV # get the close value to be used close = pd.DataFrame(close.iloc[(kdj_interval - 1):, :].values) # calculate maximum in (kdj_interval) days in high value high_max_in_interval = pd.rolling_max(high, kdj_interval) # rolling_sum function will set the first (kdj_interval-1) days as np.nan,drop them high_max_in_interval.dropna(inplace=True) # set index with 0,1,2...,otherwise it will be kdj_interval,kdj_interval+1,...(may not be explicit but fuck the index) high_max_in_interval.index = range(high_max_in_interval.shape[0]) low_min_in_interval = pd.rolling_min(low, kdj_interval) low_min_in_interval.dropna(inplace=True) low_min_in_interval.index = range(low_min_in_interval.shape[0]) # calculate RSV RSV = 100 * (close - low_min_in_interval) / (high_max_in_interval - low_min_in_interval) # replace np.nan and np.inf in RSV because there might be 0 in the denominator of the last formula RSV.replace([np.nan, np.inf,-np.inf], 0, inplace=True) # get matrix shape [row, col] = RSV.shape # calculate K # assuming N equals n in the formula # initialize both N and K with 50 K = pd.DataFrame(np.zeros([row, col])) D = pd.DataFrame(np.zeros([row, col])) K.iloc[0, :] = 50 * np.ones([1, col]) D.iloc[0, :] = 50 * np.ones([1, col]) # calculate K and D iteratively for i in range(1, row): K.iloc[i, :] = (RSV.iloc[i, :] + K.iloc[(i - 1), :]) / N D.iloc[i, :] = (K.iloc[i, :] - D.iloc[(i - 1), :]) / N KDJ = 3 * K - 2 * D return [RSV, K, D, KDJ]