乐于分享
好东西不私藏

【源码分享】基于PTrade的ETF轮动策略

【源码分享】基于PTrade的ETF轮动策略

今天开始陆续跟大家分享一些PTrade及QMT量化交易策略。第一弹:基于动量评分的ETF轮动策略,源码直接可用。

一、策略概述

核心思想:动量效应——“强者恒强”。近期表现好的资产,往往在一段时间内继续保持强势;表现差的资产,容易持续低迷。我们通过动量评分,选出当前最强的1只ETF并满仓持有。

二、资产池

4只ETF,覆盖四大方向,相关性低:

  • 518880 黄金ETF(大宗商品)

  • 513100 纳指100(美股科技)

  • 159915 创业板ETF(A股成长)

  • 510180 上证180ETF(A股蓝筹)

三、动量评分模型

  1. 取过去25日收盘价(前复权)

  2. 对数价格做线性回归:ln(price) = slope × day + intercept

  3. 计算年化收益率 = exp(slope)^250 − 1

  4. 计算拟合优度R²

  5. 动量得分 = 年化收益率 × R²

    既看上涨速度,也看趋势稳定性,避免追高杀跌。

四、交易规则

  • 集中持仓:只持有得分最高的1只ETF

  • 全仓操作:每日14:50(实盘)/15:00(回测)计算得分并调仓

  • 交易成本:ETF免印花税,佣金万二,最低5元

五、PTrade源码

"""ETF轮动策略 - PTRADE平台策略概述:基于动量评分的ETF轮动策略核心逻辑:1. ETF池包含5只不同类型的ETF(黄金、纳指、创业板、上证180、沪深300)2. 动量评分算法:使用线性回归计算年化收益率和R²,得分 = 年化收益率 × R²3. 每天选择动量得分最高的1只ETF持有,卖出其他4. 日线策略:回测15:00运行,实盘14:50运行"""import numpy as npimport pandas as pdimport mathimport picklefrom datetime import datetime# ========== 辅助函数 ==========def normalize_security_code(security):    """    标准化股票代码尾缀    支持两种格式:    - .SS/.SZ (两位尾缀)    - .XSHG/.XSHE (四位尾缀)    统一转换为 .SS/.SZ 格式    """    if not security:        return security    # 将 .XSHG 转为 .SS,.XSHE 转为 .SZ    if security.endswith('.XSHG'):        return security.replace('.XSHG''.SS')    elif security.endswith('.XSHE'):        return security.replace('.XSHE''.SZ')    # 已经是 .SS 或 .SZ 格式,直接返回    return securitydef normalize_security_list(security_list):    """    标准化股票代码列表    """    return [normalize_security_code(sec) for sec in security_list]# ========== 策略函数 ==========def initialize(context):    """策略初始化"""    log.info('=' * 60)    log.info('ETF轮动策略启动')    log.info('=' * 60)    # 文件保存路径    g.notebook_path = get_research_path()    # ========== 策略参数配置 ==========    # 动量参考天数    g.m_days = 25    # 买入模式:全仓操作(使用全部可用资金)    # g.buy_money = 100000  # 已废弃:改为全仓模式    # ETF池(支持 .SS/.SZ 或 .XSHG/.XSHE 尾缀)    etf_pool_raw = [        '518880.SS',  # 黄金ETF(大宗商品)        '513100.SS',  # 纳指100(海外资产)        '159915.SZ',  # 创业板ETF(成长股、科技股、中小盘)        '510180.SS',  # 上证180ETF(价值股、蓝筹股、中大盘)    ]    # 标准化股票代码(统一转为 .SS/.SZ 格式)    g.etf_pool = normalize_security_list(etf_pool_raw)    # 目标持仓数量(只持有动量最高的N只)    g.target_num = 1    # 交易状态(存储持仓信息)    # 回测模式下使用空字典,实盘模式下从文件加载    g.is_backtest = not is_trade()    g.trade_status = {} if g.is_backtest else load_trade_status()    set_commission(commission_ratio =0.0002, min_commission=5.0type='ETF')    set_commission(commission_ratio =0.0002, min_commission=5.0type='STOCK')    log.info('参数配置完成')    log.info('运行模式: {}'.format('回测' if g.is_backtest else '实盘'))    log.info('ETF池: {}'.format(', '.join(g.etf_pool)))    log.info('动量天数: {},目标持仓数: {},买入模式: 全仓'.format(        g.m_days, g.target_num))    if not g.is_backtest:        log.info('初始交易状态: {}'.format(g.trade_status))    log.info('=' * 60)def before_trading_start(context, data):    """盘前处理"""    log.info('=' * 60)    log.info('盘前准备开始 {}'.format(context.current_dt.date()))    log.info('=' * 60)def handle_data(context, data):    """主处理函数(日线级)    日线策略说明:    - 回测模式:每天15:00运行    - 实盘模式:每天14:50运行    - 系统自动控制运行时间,无需手动判断    """    execute_trade(context)def after_trading_end(context, data):    """盘后处理"""    log.info('=' * 60)    log.info('盘后处理开始 {}'.format(context.current_dt.date()))    # 保存交易状态(仅实盘模式)    if not g.is_backtest:        save_trade_status()    # 统计持仓    positions = get_positions()    log.info('当前持仓数量: {}'.format(len(positions)))    for stock, pos in positions.items():        # 显示标准化后的代码,方便阅读        stock_normalized = normalize_security_code(stock)        market_value = pos.amount * pos.last_sale_price        log.info('  {}: {}股,成本 {:.2f},市值 {:.2f}'.format(            stock_normalized, pos.amount, pos.cost_basis, market_value))    # 统计账户    total_value = context.portfolio.total_value    cash = context.portfolio.cash    log.info('账户总资产: {:.2f}元,可用资金: {:.2f}元'.format(total_value, cash))    log.info('盘后处理完成')    log.info('=' * 60)# ========== 核心交易逻辑 ==========def get_momentum_rank(context, etf_pool):    """    计算ETF池中所有ETF的动量得分并排序    算法:    1. 获取过去m_days天的收盘价    2. 对收盘价取对数    3. 线性回归拟合:log(price) = slope * day + intercept    4. 计算年化收益率 = exp(slope)^250 - 1    5. 计算R²(拟合优度)    6. 最终得分 = 年化收益率 × R²    返回:按得分降序排列的ETF列表    """    score_list = []    current_date = context.current_dt.date()    for etf in etf_pool:        try:            # 标准化股票代码            etf_normalized = normalize_security_code(etf)            # 获取历史数据(最近m_days天,使用前复权,包括当天)            df = get_history(                g.m_days,                '1d',                'close',                etf_normalized,                fq='pre',                include=True            )            if df is None or len(df) < g.m_days:                log.info('{} 数据不足,跳过'.format(etf))                score_list.append(-999999)  # 给一个很低的分数                continue            log.info('[{}] 获取到的行情数据:\n{}'.format(current_date, df))            # 对收盘价取对数            y = np.log(df['close'].values)            # 创建天数序列(0, 1, 2, ..., m_days-1)            x = np.arange(len(y))            # 线性回归拟合            slope, intercept = np.polyfit(x, y, 1)            # 计算年化收益率            annualized_returns = math.pow(math.exp(slope), 250) - 1            # 计算R²(拟合优度)            y_pred = slope * x + intercept            ss_res = np.sum((y - y_pred) ** 2)            ss_tot = (len(y) - 1) * np.var(y, ddof=1)            r_squared = 1 - (ss_res / ss_tot)            # 最终得分 = 年化收益率 × R²            score = annualized_returns * r_squared            score_list.append(score)            log.info('{}: 年化收益率={:.2%}, R²={:.4f}, 得分={:.4f}'.format(                etf, annualized_returns, r_squared, score))        except Exception as e:            log.info('{} 计算动量得分失败: {}'.format(etf, str(e)))            score_list.append(-999999)    # 创建DataFrame并排序    df_score = pd.DataFrame(index=etf_pool, data={'score': score_list})    df_score = df_score.sort_values(by='score', ascending=False)    rank_list = list(df_score.index)    log.info('动量排名: {}'.format(rank_list))    return rank_listdef execute_trade(context):    """执行调仓交易"""    log.info('-' * 60)    log.info('开始执行调仓 {}'.format(context.current_dt))    # 获取动量最高的N只ETF(标准化格式)    target_list = get_momentum_rank(context, g.etf_pool)[:g.target_num]    log.info('目标持仓: {}'.format(target_list))    # 获取当前持仓(建立原始代码到标准化代码的映射)    positions = get_positions()    # 创建双向映射:标准化代码 <-> 原始代码    normalized_to_original = {}    for original_code in positions.keys():        normalized_code = normalize_security_code(original_code)        normalized_to_original[normalized_code] = original_code    hold_list = list(normalized_to_original.keys())    log.info('当前持仓: {}'.format(hold_list))    # ===== 步骤1:卖出不在目标列表中的持仓 =====    for etf_normalized in hold_list:        if etf_normalized not in target_list:            # 使用原始代码调用API            etf_original = normalized_to_original[etf_normalized]            position = get_position(etf_original)            if position and position.amount > 0:                log.info('卖出 {},数量 {}股'.format(etf_normalized, position.amount))                order(etf_original, -position.amount)                # 更新交易状态(仅实盘模式,使用标准化代码)                if not g.is_backtest and etf_normalized in g.trade_status:                    g.trade_status[etf_normalized]['status'] = 1  # 标记为已卖出                    g.trade_status[etf_normalized]['nums'] = 0        else:            log.info('持有不动: {}'.format(etf_normalized))    # ===== 步骤2:买入目标ETF =====    # 如果当前持仓数量 < 目标数量,则买入    if len(hold_list) < g.target_num:        # 计算每只ETF的买入金额(全仓模式:使用全部可用资金)        cash = context.portfolio.cash        buy_count = g.target_num - len(hold_list)        value_per_etf = cash / buy_count        for etf_normalized in target_list:            # 如果当前未持有该ETF,则买入            # 优先使用原始代码(如果存在映射),否则使用标准化代码            etf_to_use = normalized_to_original.get(etf_normalized, etf_normalized)            position = get_position(etf_to_use)            if position is None or position.amount == 0:                log.info('买入 {},金额 {:.2f}元'.format(etf_normalized, value_per_etf))                order_value(etf_to_use, value_per_etf)                # 更新交易状态(仅实盘模式,使用标准化代码)                if not g.is_backtest:                    if etf_normalized not in g.trade_status:                        g.trade_status[etf_normalized] = {}                    g.trade_status[etf_normalized]['status'] = 0  # 标记为持有中    log.info('调仓完成')    log.info('-' * 60)# ========== 交易状态管理 ==========def get_state_filename():    """获取状态文件名(完整路径)"""    return g.notebook_path + 'ETF_rotation_trade_status.pkl'def save_trade_status():    """使用pickle保存交易状态"""    filename = get_state_filename()    # 清理已卖出的记录    keys_to_delete = []    for stk in g.trade_status.keys():        if g.trade_status[stk].get('status'0) == 1:            keys_to_delete.append(stk)    for stk in keys_to_delete:        del g.trade_status[stk]    try:        with open(filename, 'wb'as f:            pickle.dump(g.trade_status, f, -1)        log.info('交易状态已保存到 {}'.format(filename))    except Exception as e:        log.info('保存交易状态失败: {}'.format(str(e)))def load_trade_status():    """从pickle文件加载交易状态"""    filename = get_state_filename()    try:        with open(filename, 'rb'as f:            data = pickle.load(f)        log.info('从文件加载交易状态: {}'.format(filename))        return data    except:        log.info('交易状态文件不存在或加载失败,使用空状态')        return {}

六、效果验证:回测结果

把生成的代码复制到PTrade里回测,可以看到,回测6年(2020年1月1日至2025年12月31日)下来,年化收益达到29%

七、风险提示

本策略仅供学习参考,不构成投资建议。市场有风险,回测收益不代表未来表现,实盘需谨慎。

以上策略内容主要来自于粉丝投稿,由我通过PTrade软件实现,如有侵权,还望联系!

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » 【源码分享】基于PTrade的ETF轮动策略

评论 抢沙发

6 + 1 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮