乐于分享
好东西不私藏

趋势增强策略(附带PY源码)

趋势增强策略(附带PY源码)

附带PY源码:
"""策略名称:趋势过滤策略 (ADX 增强版)策略描述:双均线趋势过滤 + 支撑压力位 + 突破加仓 + ADX 趋势强度开关策略核心逻辑:1. 双均线系统:主趋势均线 (26) + 触发均线 (5)2. ADX 趋势过滤:ADX<25 震荡市禁止开仓,ADX>=25 趋势市允许交易3. 支撑压力位:前高前低 + 交易密集区4. 突破加仓:盈利达到 ATR 倍数后突破压力位可加仓5. 趋势反转:主趋势反转时强制平仓6. 价格修正:针对 next_bar_open 订单,在下一根 K 线确认实际成交价"""import pandas as pdimport numpy as npfrom ssquant.api.strategy_api import StrategyAPIfrom ssquant.backtest.unified_runner import UnifiedStrategyRunner, RunModefrom ssquant.config.trading_config import get_config# ============== 全局状态变量 ==============# 持仓状态g_entry_price = 0              # 入场价格 (实际成交价)g_avg_entry_price = 0          # 平均入场价格(加仓后)g_entry_bar = 0                # 入场 K 线索引g_add_position_count = 0       # 加仓次数# 待确认订单状态 (用于处理 next_bar_open 的价格延迟)g_pending_action = 0           # 0=无,1=开多,-1=开空,2=加多,-2=加空g_pending_old_pos = 0          # 挂单时的旧持仓g_pending_add_lots = 0         # 挂单时的计划加仓手数# 加仓控制g_max_add_times = 2            # 最大加仓次数g_profit_for_add = 1           # 盈利多少倍 ATR 后允许加仓# 到期控制g_expire_date = 20260630       # 到期日期 YYYYMMDDg_enable_expire = False        # 是否启用到期控制# 日志控制g_last_log_bar = -100          # 上次日志输出的 K 线索引# ============== 技术指标计算函数 ==============def calculate_atr(high, low, close, period=20):    """计算 ATR (平均真实波幅)"""    tr1 = high - low    tr2 = abs(high - close.shift(1))    tr3 = abs(low - close.shift(1))    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)    return tr.rolling(period).mean()def calculate_adx(high, low, close, period=14):    """    计算 ADX (平均趋向指数) - 标准 Wilder's Smoothing 实现    - ADX 只衡量趋势强弱,不区分多空方向    - ADX < 25: 震荡市 (禁止开仓)    - ADX >= 25: 趋势市 (允许开仓)    """    # 1. 计算 +DM 和 -DM    high_diff = high.diff()    low_diff = -low.diff()    # 原始 DM    plus_dm_raw = np.where((high_diff > low_diff) & (high_diff > 0), high_diff, 0)    minus_dm_raw = np.where((low_diff > high_diff) & (low_diff > 0), low_diff, 0)    # 2. 计算 TR    tr1 = high - low    tr2 = abs(high - close.shift(1))    tr3 = abs(low - close.shift(1))    tr_raw = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)    # 3. 平滑处理 (使用 Rolling Mean 近似 SMA 平滑)    tr_smooth = tr_raw.rolling(period).mean()    plus_dm_smooth = pd.Series(plus_dm_raw).rolling(period).mean()    minus_dm_smooth = pd.Series(minus_dm_raw).rolling(period).mean()    # 4. 计算 +DI 和 -DI (防止除以零)    plus_di = pd.Series(np.zeros(len(close)))    minus_di = pd.Series(np.zeros(len(close)))    mask = tr_smooth > 0    plus_di[mask] = 100 * plus_dm_smooth[mask] / tr_smooth[mask]    minus_di[mask] = 100 * minus_dm_smooth[mask] / tr_smooth[mask]    # 5. 计算 DX 和 ADX    di_sum = plus_di + minus_di    di_diff = abs(plus_di - minus_di)    dx = pd.Series(np.zeros(len(close)))    mask_sum = di_sum > 0    dx[mask_sum] = 100 * di_diff[mask_sum] / di_sum[mask_sum]    adx = dx.rolling(period).mean()    return adx, plus_di, minus_didef calculate_donchian(high, low, period=20):    """计算唐奇安通道 (前高前低)"""    upper = high.rolling(period).max()    lower = low.rolling(period).min()    return upper, lowerdef calculate_density_zone(high, low, close, atr, period=30):    """    计算交易密集区    使用均线作为中心,ATR 作为宽度    """    density_mid = close.rolling(period).mean()    density_high = density_mid + atr * 2    density_low = density_mid - atr * 2    return density_high, density_low# ============== 策略初始化函数 ==============def initialize(api: StrategyAPI):    """    策略初始化函数,在策略开始前调用一次    """    api.log("=" * 60)    api.log("MA 趋势过滤策略 (ADX 增强版) 已启动")    api.log("=" * 60)    # 获取并打印参数    length_trend = api.get_param('length_trend'26)    length_trigger = api.get_param('length_trigger'5)    lookback_highlow = api.get_param('lookback_highlow'20)    density_zone_period = api.get_param('density_zone_period'30)    enable_add_position = api.get_param('enable_add_position'1)    max_add_times = api.get_param('max_add_times'2)    add_position_ratio = api.get_param('add_position_ratio'0.5)    profit_for_add = api.get_param('profit_for_add'1)    adx_threshold = api.get_param('adx_threshold'25)    enable_expire = api.get_param('enable_expire'0)    expire_year = api.get_param('expire_year'2026)    expire_month = api.get_param('expire_month'6)    expire_day = api.get_param('expire_day'30)    api.log(f"主趋势均线周期:{length_trend}")    api.log(f"触发均线周期:{length_trigger}")    api.log(f"前高前低周期:{lookback_highlow}")    api.log(f"密集区周期:{density_zone_period}")    api.log(f"ADX 趋势阈值:{adx_threshold} (<25 震荡禁止开仓)")    api.log(f"突破加仓:{'启用'if enable_add_position == 1else'关闭'}")    api.log(f"最大加仓次数:{max_add_times}")    api.log(f"加仓手数比例:{add_position_ratio * 100:.0f}%")    api.log(f"盈利 ATR 倍数要求:{profit_for_add}")    api.log(f"到期时间:{expire_year}-{expire_month:02d}-{expire_day:02d}")    api.log(f"到期控制:{'启用'if enable_expire == 1else'关闭'}")    api.log("=" * 60)    # 设置全局变量    global g_max_add_times, g_profit_for_add, g_expire_date, g_enable_expire    g_max_add_times = max_add_times    g_profit_for_add = profit_for_add    g_expire_date = expire_year * 10000 + expire_month * 100 + expire_day    g_enable_expire = (enable_expire == 1)# ============== 策略主函数 ==============def strategy(api: StrategyAPI):    """    策略主函数    - 每根 K 线完成时调用    """    global g_entry_price, g_avg_entry_price, g_entry_bar    global g_add_position_count    global g_pending_action, g_pending_old_pos, g_pending_add_lots    global g_last_log_bar    # ========== 1. 获取参数 ==========    length_trend = api.get_param('length_trend'26)    length_trigger = api.get_param('length_trigger'5)    lookback_highlow = api.get_param('lookback_highlow'20)    density_zone_period = api.get_param('density_zone_period'30)    enable_add_position = api.get_param('enable_add_position'1)    add_position_ratio = api.get_param('add_position_ratio'0.5)    adx_threshold = api.get_param('adx_threshold'25)    enable_expire = api.get_param('enable_expire'0)    # ========== 2. 数据检查 ==========    min_bars = max(length_trend, density_zone_period, 14) + 10  # ADX 需要 14+period    if api.get_idx() < min_bars:        return    # ========== 3. 获取 K 线数据 ==========    close = api.get_close()    high = api.get_high()    low = api.get_low()    open_price = api.get_open()    if close is None or len(close) < min_bars:        return    current_price = close.iloc[-1]    current_open = open_price.iloc[-1]  # 当前 K 线开盘价 (即上一根 K 线 next_bar_open 的实际成交价)    current_dt = api.get_datetime()    # ========== 3.5 处理上一根 K 线的挂单成交 (核心修复) ==========    # 如果是 next_bar_open 下单,实际成交价是当前 K 线的开盘价    # 在此处更新全局状态,确保后续盈亏计算基于真实成本    if g_pending_action != 0:        pos = api.get_pos()        if g_pending_action == 1:  # 开多成交            g_entry_price = current_open            g_avg_entry_price = current_open            g_entry_bar = api.get_idx()            api.log(f"✅ 多头开仓成交 | 价格:{current_open:.2f}")        elif g_pending_action == -1:  # 开空成交            g_entry_price = current_open            g_avg_entry_price = current_open            g_entry_bar = api.get_idx()            api.log(f"✅ 空头开仓成交 | 价格:{current_open:.2f}")        elif g_pending_action == 2:  # 加多成交            if pos > 0 and g_pending_old_pos > 0:                # 准确计算新均价:(旧总成本 + 新成本) / 新总手数                g_avg_entry_price = (g_avg_entry_price * g_pending_old_pos + current_open * g_pending_add_lots) / pos                api.log(f"✅ 多头加仓成交 | 价格:{current_open:.2f} | 新均价:{g_avg_entry_price:.2f}")        elif g_pending_action == -2:  # 加空成交            if pos < 0 and g_pending_old_pos < 0:                # 注意 pos 和 old_pos 都是负数,计算时取绝对值                g_avg_entry_price = (g_avg_entry_price * abs(g_pending_old_pos) + current_open * g_pending_add_lots) / abs(pos)                api.log(f"✅ 空头加仓成交 | 价格:{current_open:.2f} | 新均价:{g_avg_entry_price:.2f}")        # 清空 pending 状态        g_pending_action = 0        g_pending_old_pos = 0        g_pending_add_lots = 0    # ========== 4. 检查到期时间 ==========    if enable_expire == 1 and current_dt is not None:        current_date = current_dt.year * 10000 + current_dt.month * 100 + current_dt.day        if current_date >= g_expire_date:            pos = api.get_pos()            if pos > 0:                api.sell(order_type='next_bar_open', reason='策略到期强制平多')                api.log(f"⚠️ 策略已到期,强制平多仓")            elif pos < 0:                api.buycover(order_type='next_bar_open', reason='策略到期强制平空')                api.log(f"⚠️ 策略已到期,强制平空仓")            g_entry_price = 0            g_avg_entry_price = 0            g_add_position_count = 0            return    # ========== 5. 计算技术指标 ==========    # 5.1 均线    ma_trend = close.rolling(length_trend).mean()    ma_trigger = close.rolling(length_trigger).mean()    # 5.2 ATR    atr_value = calculate_atr(high, low, close, 20)    # 5.3 ADX (趋势强度指标)    adx, plus_di, minus_di = calculate_adx(high, low, close, 14)    # 5.4 支撑压力位    prev_high, prev_low = calculate_donchian(high, low, lookback_highlow)    density_high, density_low = calculate_density_zone(high, low, close, atr_value, density_zone_period)    # 检查 NaN 值    if pd.isna(ma_trend.iloc[-1]) or pd.isna(ma_trigger.iloc[-1]) or pd.isna(atr_value.iloc[-1]) or pd.isna(adx.iloc[-1]):        return    # ========== 6. 判断趋势状态 ==========    # 主趋势方向    is_long_trend = current_price > ma_trend.iloc[-1]    is_short_trend = current_price < ma_trend.iloc[-1]    # ADX 趋势强度过滤    adx_current = adx.iloc[-1]    adx_prev = adx.iloc[-2if len(adx) >= 2 else adx_current    adx_turning_up = adx_current > adx_prev    adx_turning_down = adx_current < adx_prev    # 趋势市判断 (ADX >= 阈值)    is_trend_market = adx_current >= adx_threshold    # ========== 7. 交叉信号 ==========    cross_up_trigger = (close.iloc[-2] <= ma_trigger.iloc[-2and                         close.iloc[-1] > ma_trigger.iloc[-1])    cross_down_trigger = (close.iloc[-2] >= ma_trigger.iloc[-2and                           close.iloc[-1] < ma_trigger.iloc[-1])    # ========== 8. 突破信号 ==========    break_high = current_price > prev_high.iloc[-2]    break_low = current_price < prev_low.iloc[-2]    break_density_high = current_price > density_high.iloc[-2]    break_density_low = current_price < density_low.iloc[-2]    # ========== 9. 持仓和盈亏计算 ==========    pos = api.get_pos()    # 计算当前盈利 ATR 倍数    profit_atr = 0    if pos > 0 and g_avg_entry_price > 0 and atr_value.iloc[-1] > 0:        profit_atr = (current_price - g_avg_entry_price) / atr_value.iloc[-1]    elif pos < 0 and g_avg_entry_price > 0 and atr_value.iloc[-1] > 0:        profit_atr = (g_avg_entry_price - current_price) / atr_value.iloc[-1]    # ========== 10. 资金管理 ==========    base_lots = 1    add_lots = max(1int(base_lots * add_position_ratio))    # ========== 11. 交易逻辑 ==========    # --- 开仓逻辑 (受 ADX 严格过滤) ---    # 只有当是趋势市 (ADX>=25) 时才允许开新仓    if is_trend_market:        # 多头开仓        if is_long_trend and cross_up_trigger and pos <= 0:            if pos < 0:                api.buycover(order_type='next_bar_open', reason='平空开多')            api.buy(volume=base_lots, order_type='next_bar_open', reason='多头趋势上穿触发均线')            # 标记 pending 状态,等待下一根 K 线确认价格            g_pending_action = 1            g_pending_old_pos = pos            g_pending_add_lots = base_lots            api.log(f"📈 发出多头开仓指令 | 信号价:{current_price:.2f} | ADX:{adx_current:.2f}")        # 空头开仓        elif is_short_trend and cross_down_trigger and pos >= 0:            if pos > 0:                api.sell(order_type='next_bar_open', reason='平多开空')            api.sellshort(volume=base_lots, order_type='next_bar_open', reason='空头趋势下穿触发均线')            # 标记 pending 状态            g_pending_action = -1            g_pending_old_pos = pos            g_pending_add_lots = base_lots            api.log(f"📉 发出空头开仓指令 | 信号价:{current_price:.2f} | ADX:{adx_current:.2f}")        # --- 加仓逻辑 (受 ADX 过滤) ---        elif enable_add_position == 1 and g_add_position_count < g_max_add_times:            # 多头加仓            if pos > 0:                if break_high and profit_atr >= g_profit_for_add:                    api.buy(volume=add_lots, order_type='next_bar_open', reason='突破前高加仓')                    g_pending_action = 2                    g_pending_old_pos = pos                    g_pending_add_lots = add_lots                    api.log(f"➕ 发出多头加仓指令 | 信号价:{current_price:.2f}")                elif break_density_high and profit_atr >= g_profit_for_add:                    api.buy(volume=add_lots, order_type='next_bar_open', reason='突破密集区加仓')                    g_pending_action = 2                    g_pending_old_pos = pos                    g_pending_add_lots = add_lots                    api.log(f"➕ 发出多头加仓指令 (密集区) | 信号价:{current_price:.2f}")            # 空头加仓            elif pos < 0:                if break_low and profit_atr >= g_profit_for_add:                    api.sellshort(volume=add_lots, order_type='next_bar_open', reason='突破前低加仓')                    g_pending_action = -2                    g_pending_old_pos = pos                    g_pending_add_lots = add_lots                    api.log(f"➕ 发出空头加仓指令 | 信号价:{current_price:.2f}")                elif break_density_low and profit_atr >= g_profit_for_add:                    api.sellshort(volume=add_lots, order_type='next_bar_open', reason='突破密集区加仓')                    g_pending_action = -2                    g_pending_old_pos = pos                    g_pending_add_lots = add_lots                    api.log(f"➕ 发出空头加仓指令 (密集区) | 信号价:{current_price:.2f}")    # --- 平仓逻辑 (不受 ADX 限制,只要持仓且触发信号即平仓) ---    # 修复:将平仓逻辑移出 ADX 过滤块,避免震荡市无法止损    # 多头平仓:下穿触发均线    if pos > 0 and cross_down_trigger and (api.get_idx() - g_entry_bar) >= 1:        api.sell(order_type='next_bar_open', reason='下穿触发均线平仓')        pnl = current_price - g_avg_entry_price        close_type = "止损" if pnl < 0 else "止盈"        api.log(f"📉 发出多头平仓指令 ({close_type}) | 均价:{g_avg_entry_price:.2f} | 信号价:{current_price:.2f}")        g_add_position_count = 0        # 注意:价格和状态将在成交后或反转时重置,这里先不重置以避免逻辑冲突    # 空头平仓:上穿触发均线    elif pos < 0 and cross_up_trigger and (api.get_idx() - g_entry_bar) >= 1:        api.buycover(order_type='next_bar_open', reason='上穿触发均线平仓')        pnl = g_avg_entry_price - current_price        close_type = "止损" if pnl < 0 else "止盈"        api.log(f"📈 发出空头平仓指令 ({close_type}) | 均价:{g_avg_entry_price:.2f} | 信号价:{current_price:.2f}")        g_add_position_count = 0    # --- 趋势反转强制平仓 (不受 ADX 限制) ---    # 多头持仓但主趋势转空    if pos > 0 and not is_long_trend:        api.sell(order_type='next_bar_open', reason='主趋势转空强制平仓')        pnl = current_price - g_avg_entry_price        close_type = "止损" if pnl < 0 else "止盈"        api.log(f"⚠️ 发出趋势反转强制平多指令 ({close_type}) | 均价:{g_avg_entry_price:.2f}")        g_entry_price = 0        g_avg_entry_price = 0        g_add_position_count = 0    # 空头持仓但主趋势转多    elif pos < 0 and not is_short_trend:        api.buycover(order_type='next_bar_open', reason='主趋势转多强制平仓')        pnl = g_avg_entry_price - current_price        close_type = "止损" if pnl < 0 else "止盈"        api.log(f"⚠️ 发出趋势反转强制平空指令 ({close_type}) | 均价:{g_avg_entry_price:.2f}")        g_entry_price = 0        g_avg_entry_price = 0        g_add_position_count = 0    # 如果当前无持仓,清理残留状态    if pos == 0 and g_avg_entry_price != 0 and g_pending_action == 0:        g_avg_entry_price = 0        g_entry_price = 0    # ========== 12. 定期状态输出 ==========    current_idx = api.get_idx()    if current_idx % 100 == 0 and current_idx != g_last_log_bar:        adx_status = "趋势市" if is_trend_market else "震荡市 (禁止开仓)"        adx_direction = "↗增强" if adx_turning_up else ("↘减弱" if adx_turning_down else "→持平")        api.log(f"[状态] K 线:{current_idx} | 趋势:{adx_status} | ADX:{adx_current:.2f}{adx_direction} | 持仓:{pos}")        g_last_log_bar = current_idx    # ========== 13. 图表注释 ==========    if current_idx % 10 == 0:        adx_status = "趋势市" if is_trend_market else "震荡市"        trend_dir = "多头" if is_long_trend else ("空头" if is_short_trend else "震荡")        api.log(f"[注释] 趋势:{trend_dir} | 市场:{adx_status} | ADX:{adx_current:.2f} | 持仓:{pos}手 | 加仓:{g_add_position_count}/{g_max_add_times}")# ============== 主函数 ==============if __name__ == "__main__":    # ========== 运行模式 ==========    RUN_MODE = RunMode.BACKTEST    # ========== 策略参数 ==========    strategy_params = {        # 均线参数        'length_trend'26,              # 主趋势均线周期        'length_trigger'5,             # 触发交易均线周期        # 支撑压力参数        'lookback_highlow'20,          # 前高前低回溯周期        'density_zone_period'30,       # 交易密集区统计周期        # 加仓参数        'enable_add_position'1,        # 是否启用突破加仓 (1=启用,0=关闭)        'max_add_times'2,              # 最大加仓次数        'add_position_ratio'0.5,       # 加仓手数比例 (0.5=半仓加仓)        'profit_for_add'1,             # 盈利多少倍 ATR 后允许加仓        # ADX 趋势过滤参数        'adx_threshold'25,             # ADX 趋势阈值 (<25 震荡,>=25 趋势)        # 到期时间设置        'enable_expire'0,              # 是否启用到期时间 (1=启用,0=不启用)        'expire_year'2026,             # 到期年份        'expire_month'6,               # 到期月份        'expire_day'30,                # 到期日期    }    # ========== 配置 ==========    if RUN_MODE == RunMode.BACKTEST:        config = get_config(RUN_MODE,            # -------- 合约与周期 --------            symbol='rb888',               # 主力连续合约            kline_period='5m',            # K 线周期            adjust_type='1',              # 后复权            # -------- 数据范围 --------            start_date='2025-01-01',            end_date='2025-12-31',            # -------- 回测参数 --------            initial_capital=100000,       # 初始资金            slippage_ticks=1,             # 滑点            # -------- 数据窗口 --------            lookback_bars=500,            debug=False,        )    elif RUN_MODE == RunMode.SIMNOW:        config = get_config(RUN_MODE,            account='simnow_default',            server_name='电信 1',            symbol='rb888',            kline_period='5m',            order_offset_ticks=5,            algo_trading=False,            preload_history=True,            history_lookback_bars=100,            adjust_type='1',            lookback_bars=500,            enable_tick_callback=False,        )    elif RUN_MODE == RunMode.REAL_TRADING:        config = get_config(RUN_MODE,            account='real_default',            symbol='rb888',            kline_period='5m',            order_offset_ticks=5,            algo_trading=True,            preload_history=True,            history_lookback_bars=100,            adjust_type='1',            lookback_bars=500,        )    # ========== 运行 ==========    print(f"\n运行模式:{RUN_MODE.value}")    print(f"合约代码:{config.get('symbol''N/A')}")    runner = UnifiedStrategyRunner(mode=RUN_MODE)    runner.set_config(config)    try:        results = runner.run(            strategy=strategy,            initialize=initialize,            strategy_params=strategy_params        )    except KeyboardInterrupt:        print("\n用户中断")        runner.stop()    except Exception as e:        print(f"\n运行出错:{e}")        import traceback        traceback.print_exc()        runner.stop()