# -*- coding: utf-8 -*-
"""BT-ATR:實測「ATR 移動停損」在台股月營收動能策略上的效果。

基準策略(baseline):
  - 池:60 日均量 > 1,000 張、收盤價 > 10 元、月營收 YoY > 0
  - 選股:月營收 YoY(去年同月增減%)排名前 25 名,等權
  - 月再平衡(月底訊號、次一交易日收盤成交)
對照組:同策略 + ATR(14) 移動停損(chandelier 式,2x / 3x 各一版)
  - ATR 用還原高低收價計算(Wilder EWM, alpha=1/14),避免除權息缺口誤觸
  - 停損規則:持有期間內,還原收盤 < 持有期高點 - k*ATR(14) → 次日收盤出場,
    出場後該檔閒置為現金,直到下一次月再平衡才可重新入選(與 finlab 原生
    stop_trading_next_period 語意一致)
成本:fee_ratio=1.425/1000/3(三折手續費)、tax_ratio=3/1000(finlab 預設台股成本)
benchmark:0050(etl:adj_close 還原含息),fee=0
回測區間:2018-01-01 ~ 資料最新日

輸出(/tmp/seo-bt/atr/):metrics.json、report_baseline.html、report_strategy.html、
equity_monthly.csv(畫圖用)、atr_strategy.py 由本檔複製。
用法:cd ~/Documents/finlab && uv run python /tmp/seo-bt/atr/run_backtest.py
"""
import warnings; warnings.filterwarnings("ignore")
import json
from pathlib import Path

import numpy as np
import pandas as pd

from finlab import data
from finlab.backtest import sim

OUT = Path("/tmp/seo-bt/atr")
OUT.mkdir(parents=True, exist_ok=True)
START = "2018-01-01"
TOPN = 25
FEE = 1.425 / 1000 / 3
TAX = 3 / 1000

# ---------- 資料 ----------
adj = data.get("etl:adj_close")
adj_h = data.get("etl:adj_high")
adj_l = data.get("etl:adj_low")
rev = data.get("monthly_revenue:去年同月增減(%)")  # index 已對齊公告截止日

dates = adj.index
cols = adj.columns

close = data.get("price:收盤價").reindex(index=dates, columns=cols)
vol = data.get("price:成交股數").reindex(index=dates, columns=cols)

# ---------- 月底選股訊號(資訊皆為當日可得,次日才交易) ----------
revd = rev.reindex(index=dates, method="ffill").reindex(columns=cols)
liq = vol.rolling(60).mean() > 1_000_000  # 60 日均量 > 1,000 張
pool = liq & (close > 10) & (revd > 0)
score = revd.where(pool)
sel_daily_raw = score.rank(axis=1, ascending=False, method="first") <= TOPN  # 每日皆算,僅月底取值

# 月底(每月最後一個交易日)取訊號,次一交易日生效,持有到下一次再平衡
month_id = pd.Series(dates).dt.to_period("M").values
is_month_end = np.r_[month_id[:-1] != month_id[1:], True]  # 各月最後交易日
sel_me = pd.DataFrame(
    np.where(is_month_end[:, None], sel_daily_raw.values, np.nan),
    index=dates, columns=cols,
)
held = sel_me.ffill().shift(1).fillna(0).astype(bool)  # 次日起生效
held.loc[held.index < START] = False

# 各期(再平衡日~下次再平衡前一日)固定權重 1/N,停損後閒置為現金不再分配
n_sel = held.sum(axis=1)
period_start = np.r_[True, is_month_end[:-1]]  # 再平衡生效日 = 月初第一個交易日
period_id_arr = np.cumsum(period_start)  # 每日所屬持有期編號(全市場共用)

# ---------- ATR(14)(Wilder)與 chandelier 移動停損 ----------
prev_c = adj.shift(1)
tr = pd.DataFrame(
    np.fmax(np.fmax((adj_h - adj_l).values, (adj_h - prev_c).abs().values),
            (adj_l - prev_c).abs().values),
    index=dates, columns=cols,
)
atr = tr.ewm(alpha=1 / 14, adjust=False).mean()


def grouped_cummax(values: np.ndarray, period_ids: np.ndarray) -> np.ndarray:
    """對每檔股票、每個持有期做 cummax(stacked groupby,單次 O(n))。"""
    n_d, n_c = values.shape
    key = (period_ids[:, None] * (n_c + 1) + np.arange(n_c)).ravel(order="F")
    s = pd.Series(values.ravel(order="F"))
    out = s.groupby(key).cummax()
    return out.values.reshape((n_d, n_c), order="F")


def apply_atr_stop(held_bool: pd.DataFrame, mult: float) -> pd.DataFrame:
    """持有期內:還原收盤 < 期間高點 - mult*ATR(14) → 次日出場,期末前不回補。"""
    a = adj.where(held_bool)  # 只看持有中的價格
    hi = grouped_cummax(a.values, period_id_arr)  # 持有期內最高還原收盤
    with np.errstate(invalid="ignore"):
        trigger = a.values < (hi - mult * atr.values)
    trigger = np.where(np.isnan(a.values), False, trigger)
    stopped = grouped_cummax(trigger.astype(float), period_id_arr) > 0
    # 次日出場;shift 不可跨期(新期第一天不繼承前期停損狀態)
    same_period = np.r_[False, period_id_arr[1:] == period_id_arr[:-1]]
    stopped_s = np.vstack([np.zeros((1, len(cols)), bool), stopped[:-1]])
    stopped_s = stopped_s & same_period[:, None]
    return pd.DataFrame(held_bool.values & ~stopped_s, index=dates, columns=cols)


def to_weights(held_bool: pd.DataFrame) -> pd.DataFrame:
    """期初等權 1/N(N=該期入選檔數);停損出場後其權重回到現金。"""
    n_per_day = pd.Series(n_sel.values, index=dates)
    n_period = n_per_day.groupby(period_id_arr).transform("max")
    w = held_bool.astype(float).div(n_period.replace(0, np.nan), axis=0)
    return w.fillna(0)


def annual_turnover(w: pd.DataFrame) -> float:
    """以目標權重估算的單邊年化換手率(|Δw|/2 年加總平均)。"""
    ww = w[w.index >= START]
    t = ww.diff().abs().sum(axis=1) / 2
    yearly = t.groupby(ww.index.year).sum()
    yearly = yearly[(yearly.index > 2017) & (yearly.index < ww.index[-1].year)]
    return float(yearly.mean())


def run(w: pd.DataFrame, name: str):
    r = sim(w[w.index >= START], resample=None, upload=False,
            fee_ratio=FEE, tax_ratio=TAX, trade_at_price="close")
    s = r.get_stats()
    m = dict(
        name=name,
        cagr=round(s["cagr"] * 100, 1),
        daily_sharpe=round(s["daily_sharpe"], 2),
        daily_sortino=round(s["daily_sortino"], 2),
        max_drawdown=round(s["max_drawdown"] * 100, 1),
        win_ratio=round(s.get("win_ratio", float("nan")) * 100, 1),
        annual_turnover_oneway=round(annual_turnover(w), 2),
    )
    return r, m


# ---------- 跑四個版本 ----------
w_base = to_weights(held)
held_atr2 = apply_atr_stop(held, 2.0)
held_atr3 = apply_atr_stop(held, 3.0)
w_atr2 = to_weights(held_atr2)
w_atr3 = to_weights(held_atr3)

r_base, m_base = run(w_base, "月營收動能 25 檔(無停損)")
r_atr2, m_atr2 = run(w_atr2, "同策略 + ATR(14)x2 移動停損")
r_atr3, m_atr3 = run(w_atr3, "同策略 + ATR(14)x3 移動停損")

# benchmark 0050 含息
w_b = pd.DataFrame(index=dates)
w_b["0050"] = 1.0
r_bench = sim(w_b[w_b.index >= START], resample="Q", upload=False, fee_ratio=0)
s_b = r_bench.get_stats()
m_bench = dict(
    name="0050 含息(etl:adj_close)",
    cagr=round(s_b["cagr"] * 100, 1),
    daily_sharpe=round(s_b["daily_sharpe"], 2),
    daily_sortino=round(s_b["daily_sortino"], 2),
    max_drawdown=round(s_b["max_drawdown"] * 100, 1),
)

# 停損觸發統計
stop_days2 = int((held & ~held_atr2).any(axis=1).sum())
stops2 = int(((held & ~held_atr2) & ~(held & ~held_atr2).shift(1, fill_value=False)).values.sum())
stops3 = int(((held & ~held_atr3) & ~(held & ~held_atr3).shift(1, fill_value=False)).values.sum())

# 交叉驗證:同選股用 finlab 原生 trail_stop=0.10(百分比移動停損),確認方向一致
pos_daily_bool = sel_daily_raw[sel_daily_raw.index >= START]
r_native_base = sim(pos_daily_bool, resample="M", upload=False, fee_ratio=FEE, tax_ratio=TAX)
r_native_ts = sim(pos_daily_bool, resample="M", upload=False, fee_ratio=FEE, tax_ratio=TAX,
                  trail_stop=0.10)
s_nb, s_nt = r_native_base.get_stats(), r_native_ts.get_stats()
crosscheck = {
    "note": "同一選股以 finlab 原生 sim(resample='M') 重跑:無停損 vs trail_stop=0.10",
    "native_baseline": {"cagr": round(s_nb["cagr"] * 100, 1),
                        "daily_sharpe": round(s_nb["daily_sharpe"], 2),
                        "max_drawdown": round(s_nb["max_drawdown"] * 100, 1)},
    "native_trail10": {"cagr": round(s_nt["cagr"] * 100, 1),
                       "daily_sharpe": round(s_nt["daily_sharpe"], 2),
                       "max_drawdown": round(s_nt["max_drawdown"] * 100, 1)},
}

end_date = str(dates[-1])[:10]
out = {
    "window": f"2018-01-01 ~ {end_date}",
    "cost": {"fee_ratio": FEE, "tax_ratio": TAX, "trade_at_price": "close",
             "note": "finlab 預設台股成本(手續費三折 0.0475%+證交稅 0.3%)"},
    "universe": "60 日均量>1000 張、收盤>10 元、月營收 YoY>0,取 YoY 前 25 名等權,月再平衡",
    "atr_rule": "Wilder ATR(14)(還原高低收);持有期內還原收盤 < 期間高點 - k*ATR → 次日收盤出場,當期不回補",
    "stop_events": {"atr2x": stops2, "atr3x": stops3},
    "baseline": m_base, "atr2x": m_atr2, "atr3x": m_atr3, "benchmark": m_bench,
    "crosscheck_native_trail_stop": crosscheck,
}
(OUT / "metrics.json").write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(out, ensure_ascii=False, indent=2))

# ---------- 報告 HTML ----------
r_base.to_html(str(OUT / "report_baseline.html"), title="月營收動能 25 檔(無停損)2018-2026")
best = r_atr3 if m_atr3["daily_sharpe"] >= m_atr2["daily_sharpe"] else r_atr2
best_name = "ATR(14)x3" if best is r_atr3 else "ATR(14)x2"
best.to_html(str(OUT / "report_strategy.html"), title=f"月營收動能 + {best_name} 移動停損 2018-2026")
print("best ATR variant for report_strategy.html:", best_name)

# ---------- 權益曲線(月頻,給 plotly 畫 PNG) ----------
def eq(r):
    c = r.creturn[r.creturn.index >= START]
    c = c.resample("ME").last().dropna()
    return c / c.iloc[0]

eqdf = pd.DataFrame({
    "baseline": eq(r_base), "atr2x": eq(r_atr2), "atr3x": eq(r_atr3), "benchmark": eq(r_bench),
}).dropna()
eqdf.to_csv(OUT / "equity_monthly.csv")
print("wrote", OUT / "equity_monthly.csv", eqdf.shape)
