动量选股策略源码及回测数据



def calculate_momentum(df):# 计算动量、RSRS斜率、R2mon_window = 20rsrs_window_N = 18rsrs_window_M = 600df[f"mon{mon_window}"] = df[COL_CLOSE].pct_change(mon_window)rsrs_slopes = []rsrs_r2 = []for i in range(len(df)):if i < rsrs_window_N:rsrs_slopes.append(np.nan)rsrs_r2.append(np.nan)continue# 获取最近N天的最高价和最低价high_prices = df[COL_HIGH].iloc[i-rsrs_window_N+1:i+1].valueslow_prices = df[COL_LOW].iloc[i-rsrs_window_N+1:i+1].values# 线性回归计算斜率if len(high_prices) == rsrs_window_N and len(low_prices) == rsrs_window_N:X = low_prices.reshape(-1, 1)y = high_pricestry:model = LinearRegression()model.fit(X, y)beta = model.coef_[0]r2 = model.score(X, y)rsrs_slopes.append(beta)rsrs_r2.append(r2)except:rsrs_slopes.append(np.nan)rsrs_r2.append(np.nan)logger.info(f"{df['ts_code'].iloc[i]}{df['trade_date'].iloc[i]}: rsrs计算报错。")else:rsrs_slopes.append(np.nan)df['rsrs_slope'] = rsrs_slopesdf['rsrs_r2'] = rsrs_r2# 计算标准分df['rsrs_mean'] = df['rsrs_slope'].rolling(window=rsrs_window_M, min_periods=60).mean()# 计算RSRS斜率的M日滚动标准差,衡量RSRS斜率的波动性/离散程度df['rsrs_std'] = df['rsrs_slope'].rolling(window=rsrs_window_M, min_periods=60).std()# Z=(当前值-均值)/标准差,将原始RSRS斜率转换为标准正态分布分数df['rsrs_std_score'] = (df['rsrs_slope'] - df['rsrs_mean']) / df['rsrs_std']return df
def _select_stocks(self, prev_date, current_date):# prev_date : datetime 前一日日期# current_date : datetime 当日日期(用于检查可交易性)candidate_stocks = []for stock_code, df in self.data.items():prev_row = df.loc[prev_date]current_row = df.loc[current_date]# 获取指标rsrs_score = prev_row.get('rsrs_std_score')r2 = prev_row.get('rsrs_r2')momentum = prev_row.get('mon20')current_open = current_row.get('open')# 检查条件:指标有效、RSRS标准分大于买入阈值、R²大于0.2if (pd.notna(rsrs_score) and pd.notna(r2) and pd.notna(momentum) and pd.notna(current_open)):if (rsrs_score > self.buy_threshold and r2 > 0.2):# 计算综合得分:动量 × R²composite_score = momentum * r2candidate_stocks.append((stock_code, composite_score))# 按综合得分排序(降序)candidate_stocks.sort(key=lambda x: x[1], reverse=True)# 返回前self.stock_num个selected_stocks = [stock_code for stock_code, score in candidate_stocks[:self.stock_num]]return selected_stocks
夜雨聆风
