【源码分享】基于PTrade的ETF轮动策略

今天开始陆续跟大家分享一些PTrade及QMT量化交易策略。第一弹:基于动量评分的ETF轮动策略,源码直接可用。
一、策略概述
核心思想:动量效应——“强者恒强”。近期表现好的资产,往往在一段时间内继续保持强势;表现差的资产,容易持续低迷。我们通过动量评分,选出当前最强的1只ETF并满仓持有。
二、资产池
4只ETF,覆盖四大方向,相关性低:
-
518880 黄金ETF(大宗商品)
-
513100 纳指100(美股科技)
-
159915 创业板ETF(A股成长)
-
510180 上证180ETF(A股蓝筹)
三、动量评分模型
-
取过去25日收盘价(前复权)
-
对数价格做线性回归:ln(price) = slope × day + intercept
-
计算年化收益率 = exp(slope)^250 − 1
-
计算拟合优度R²
-
动量得分 = 年化收益率 × R²
既看上涨速度,也看趋势稳定性,避免追高杀跌。
四、交易规则
-
集中持仓:只持有得分最高的1只ETF
-
全仓操作:每日14:50(实盘)/15:00(回测)计算得分并调仓
-
交易成本:ETF免印花税,佣金万二,最低5元
五、PTrade源码
"""ETF轮动策略 - PTRADE平台策略概述:基于动量评分的ETF轮动策略核心逻辑:1. ETF池包含5只不同类型的ETF(黄金、纳指、创业板、上证180、沪深300)2. 动量评分算法:使用线性回归计算年化收益率和R²,得分 = 年化收益率 × R²3. 每天选择动量得分最高的1只ETF持有,卖出其他4. 日线策略:回测15:00运行,实盘14:50运行"""import numpy as npimport pandas as pdimport mathimport picklefrom datetime import datetime# ========== 辅助函数 ==========def normalize_security_code(security):"""标准化股票代码尾缀支持两种格式:- .SS/.SZ (两位尾缀)- .XSHG/.XSHE (四位尾缀)统一转换为 .SS/.SZ 格式"""if not security:return security# 将 .XSHG 转为 .SS,.XSHE 转为 .SZif security.endswith('.XSHG'):return security.replace('.XSHG', '.SS')elif security.endswith('.XSHE'):return security.replace('.XSHE', '.SZ')# 已经是 .SS 或 .SZ 格式,直接返回return securitydef normalize_security_list(security_list):"""标准化股票代码列表"""return [normalize_security_code(sec) for sec in security_list]# ========== 策略函数 ==========def initialize(context):"""策略初始化"""log.info('=' * 60)log.info('ETF轮动策略启动')log.info('=' * 60)# 文件保存路径g.notebook_path = get_research_path()# ========== 策略参数配置 ==========# 动量参考天数g.m_days = 25# 买入模式:全仓操作(使用全部可用资金)# g.buy_money = 100000 # 已废弃:改为全仓模式# ETF池(支持 .SS/.SZ 或 .XSHG/.XSHE 尾缀)etf_pool_raw = ['518880.SS', # 黄金ETF(大宗商品)'513100.SS', # 纳指100(海外资产)'159915.SZ', # 创业板ETF(成长股、科技股、中小盘)'510180.SS', # 上证180ETF(价值股、蓝筹股、中大盘)]# 标准化股票代码(统一转为 .SS/.SZ 格式)g.etf_pool = normalize_security_list(etf_pool_raw)# 目标持仓数量(只持有动量最高的N只)g.target_num = 1# 交易状态(存储持仓信息)# 回测模式下使用空字典,实盘模式下从文件加载g.is_backtest = not is_trade()g.trade_status = {} if g.is_backtest else load_trade_status()set_commission(commission_ratio =0.0002, min_commission=5.0, type='ETF')set_commission(commission_ratio =0.0002, min_commission=5.0, type='STOCK')log.info('参数配置完成')log.info('运行模式: {}'.format('回测' if g.is_backtest else '实盘'))log.info('ETF池: {}'.format(', '.join(g.etf_pool)))log.info('动量天数: {},目标持仓数: {},买入模式: 全仓'.format(g.m_days, g.target_num))if not g.is_backtest:log.info('初始交易状态: {}'.format(g.trade_status))log.info('=' * 60)def before_trading_start(context, data):"""盘前处理"""log.info('=' * 60)log.info('盘前准备开始 {}'.format(context.current_dt.date()))log.info('=' * 60)def handle_data(context, data):"""主处理函数(日线级)日线策略说明:- 回测模式:每天15:00运行- 实盘模式:每天14:50运行- 系统自动控制运行时间,无需手动判断"""execute_trade(context)def after_trading_end(context, data):"""盘后处理"""log.info('=' * 60)log.info('盘后处理开始 {}'.format(context.current_dt.date()))# 保存交易状态(仅实盘模式)if not g.is_backtest:save_trade_status()# 统计持仓positions = get_positions()log.info('当前持仓数量: {}'.format(len(positions)))for stock, pos in positions.items():# 显示标准化后的代码,方便阅读stock_normalized = normalize_security_code(stock)market_value = pos.amount * pos.last_sale_pricelog.info(' {}: {}股,成本 {:.2f},市值 {:.2f}'.format(stock_normalized, pos.amount, pos.cost_basis, market_value))# 统计账户total_value = context.portfolio.total_valuecash = context.portfolio.cashlog.info('账户总资产: {:.2f}元,可用资金: {:.2f}元'.format(total_value, cash))log.info('盘后处理完成')log.info('=' * 60)# ========== 核心交易逻辑 ==========def get_momentum_rank(context, etf_pool):"""计算ETF池中所有ETF的动量得分并排序算法:1. 获取过去m_days天的收盘价2. 对收盘价取对数3. 线性回归拟合:log(price) = slope * day + intercept4. 计算年化收益率 = exp(slope)^250 - 15. 计算R²(拟合优度)6. 最终得分 = 年化收益率 × R²返回:按得分降序排列的ETF列表"""score_list = []current_date = context.current_dt.date()for etf in etf_pool:try:# 标准化股票代码etf_normalized = normalize_security_code(etf)# 获取历史数据(最近m_days天,使用前复权,包括当天)df = get_history(g.m_days,'1d','close',etf_normalized,fq='pre',include=True)if df is None or len(df) < g.m_days:log.info('{} 数据不足,跳过'.format(etf))score_list.append(-999999) # 给一个很低的分数continuelog.info('[{}] 获取到的行情数据:\n{}'.format(current_date, df))# 对收盘价取对数y = np.log(df['close'].values)# 创建天数序列(0, 1, 2, ..., m_days-1)x = np.arange(len(y))# 线性回归拟合slope, intercept = np.polyfit(x, y, 1)# 计算年化收益率annualized_returns = math.pow(math.exp(slope), 250) - 1# 计算R²(拟合优度)y_pred = slope * x + interceptss_res = np.sum((y - y_pred) ** 2)ss_tot = (len(y) - 1) * np.var(y, ddof=1)r_squared = 1 - (ss_res / ss_tot)# 最终得分 = 年化收益率 × R²score = annualized_returns * r_squaredscore_list.append(score)log.info('{}: 年化收益率={:.2%}, R²={:.4f}, 得分={:.4f}'.format(etf, annualized_returns, r_squared, score))except Exception as e:log.info('{} 计算动量得分失败: {}'.format(etf, str(e)))score_list.append(-999999)# 创建DataFrame并排序df_score = pd.DataFrame(index=etf_pool, data={'score': score_list})df_score = df_score.sort_values(by='score', ascending=False)rank_list = list(df_score.index)log.info('动量排名: {}'.format(rank_list))return rank_listdef execute_trade(context):"""执行调仓交易"""log.info('-' * 60)log.info('开始执行调仓 {}'.format(context.current_dt))# 获取动量最高的N只ETF(标准化格式)target_list = get_momentum_rank(context, g.etf_pool)[:g.target_num]log.info('目标持仓: {}'.format(target_list))# 获取当前持仓(建立原始代码到标准化代码的映射)positions = get_positions()# 创建双向映射:标准化代码 <-> 原始代码normalized_to_original = {}for original_code in positions.keys():normalized_code = normalize_security_code(original_code)normalized_to_original[normalized_code] = original_codehold_list = list(normalized_to_original.keys())log.info('当前持仓: {}'.format(hold_list))# ===== 步骤1:卖出不在目标列表中的持仓 =====for etf_normalized in hold_list:if etf_normalized not in target_list:# 使用原始代码调用APIetf_original = normalized_to_original[etf_normalized]position = get_position(etf_original)if position and position.amount > 0:log.info('卖出 {},数量 {}股'.format(etf_normalized, position.amount))order(etf_original, -position.amount)# 更新交易状态(仅实盘模式,使用标准化代码)if not g.is_backtest and etf_normalized in g.trade_status:g.trade_status[etf_normalized]['status'] = 1 # 标记为已卖出g.trade_status[etf_normalized]['nums'] = 0else:log.info('持有不动: {}'.format(etf_normalized))# ===== 步骤2:买入目标ETF =====# 如果当前持仓数量 < 目标数量,则买入if len(hold_list) < g.target_num:# 计算每只ETF的买入金额(全仓模式:使用全部可用资金)cash = context.portfolio.cashbuy_count = g.target_num - len(hold_list)value_per_etf = cash / buy_countfor etf_normalized in target_list:# 如果当前未持有该ETF,则买入# 优先使用原始代码(如果存在映射),否则使用标准化代码etf_to_use = normalized_to_original.get(etf_normalized, etf_normalized)position = get_position(etf_to_use)if position is None or position.amount == 0:log.info('买入 {},金额 {:.2f}元'.format(etf_normalized, value_per_etf))order_value(etf_to_use, value_per_etf)# 更新交易状态(仅实盘模式,使用标准化代码)if not g.is_backtest:if etf_normalized not in g.trade_status:g.trade_status[etf_normalized] = {}g.trade_status[etf_normalized]['status'] = 0 # 标记为持有中log.info('调仓完成')log.info('-' * 60)# ========== 交易状态管理 ==========def get_state_filename():"""获取状态文件名(完整路径)"""return g.notebook_path + 'ETF_rotation_trade_status.pkl'def save_trade_status():"""使用pickle保存交易状态"""filename = get_state_filename()# 清理已卖出的记录keys_to_delete = []for stk in g.trade_status.keys():if g.trade_status[stk].get('status', 0) == 1:keys_to_delete.append(stk)for stk in keys_to_delete:del g.trade_status[stk]try:with open(filename, 'wb') as f:pickle.dump(g.trade_status, f, -1)log.info('交易状态已保存到 {}'.format(filename))except Exception as e:log.info('保存交易状态失败: {}'.format(str(e)))def load_trade_status():"""从pickle文件加载交易状态"""filename = get_state_filename()try:with open(filename, 'rb') as f:data = pickle.load(f)log.info('从文件加载交易状态: {}'.format(filename))return dataexcept:log.info('交易状态文件不存在或加载失败,使用空状态')return {}
六、效果验证:回测结果
把生成的代码复制到PTrade里回测,可以看到,回测6年(2020年1月1日至2025年12月31日)下来,年化收益达到29%。

七、风险提示
本策略仅供学习参考,不构成投资建议。市场有风险,回测收益不代表未来表现,实盘需谨慎。
以上策略内容主要来自于粉丝投稿,由我通过PTrade软件实现,如有侵权,还望联系!
夜雨聆风
