# 台股選股回測 — 穩健性 / 敏感度 / 年度 / turnover 全套(2018~)
# 本檔輸出文章「穩健性檢查」所有表格的可追溯數字。
# 環境:finlab(conda)。benchmark 0050 一律用 etl:adj_close(含息)。
# 跑法:conda activate finlab && python robustness.py
import warnings; warnings.filterwarnings("ignore")
import numpy as np, pandas as pd
from finlab import data
from finlab.backtest import sim

START = '2018-01-01'
close  = data.get('price:收盤價')
adj    = data.get('etl:adj_close')
roe    = data.get('fundamental_features:ROE稅後').index_str_to_date().reindex(close.index, method='ffill')
rev    = data.get('monthly_revenue:去年同月增減(%)').reindex(close.index, method='ffill')
amount = (close * data.get('price:成交股數')).rolling(60).mean()
cats   = data.get('security_categories').set_index('stock_id')['category']

def pr(df):
    return pd.DataFrame(df).astype(float).rank(axis=1, pct=True)

def liquid_mask():
    return amount.rank(axis=1, pct=True) > 0.5

def composite(mom_days=120, vol_days=120, lowvol_w=2, topn=25, power=3,
              liquidity=True, exclude=None, weight='power3'):
    quality  = pr(roe)
    momentum = pr(close / close.shift(mom_days) - 1)
    lowvol   = pr(-close.pct_change().rolling(vol_days).std())
    revenue  = pr(rev)
    score = (quality + momentum + lowvol_w * lowvol + revenue) / (3 + lowvol_w)
    if liquidity:
        score = score.where(liquid_mask())
    if exclude is not None:
        excl_ids = cats[cats.isin(exclude)].index.astype(str)
        keep = [c for c in score.columns if c not in set(excl_ids)]
        score = score[keep]
    top = score.rank(axis=1, ascending=False) <= topn
    if weight == 'equal':
        w = top.astype(float)
    elif weight == 'score':
        w = score.where(top)
    elif weight == 'power2':
        w = (score ** 2).where(top)
    else:  # power3
        w = (score ** 3).where(top)
    pos = w.div(w.sum(axis=1), axis=0).fillna(0)
    return pos[pos.index >= START]

def stats(pos, fee=None, extra_cost=None):
    # finlab sim() 無 slippage 參數;用提高 fee_ratio 模擬「滑價/交易摩擦加倍」
    kw = {}
    if fee is not None: kw['fee_ratio'] = fee
    if extra_cost is not None: kw['fee_ratio'] = 0.001425 + extra_cost
    r = sim(pos, resample='M', resample_offset='14D', upload=False, **kw)
    s = r.get_stats()
    return r, s

def row(name, s):
    print(f"{name:30s} CAGR={s['cagr']*100:6.2f}%  Sharpe(d)={s.get('daily_sharpe'):.2f}  "
          f"Sortino(m)={s.get('monthly_sortino'):.2f}  MDD={s['max_drawdown']*100:6.2f}%")

# ---------- 0050 benchmark(含息),含月 Sortino 同口徑 ----------
e = adj['0050'].dropna(); e = e[e.index >= START]
yrs = (e.index[-1] - e.index[0]).days / 365.25
cagr0050 = (e.iloc[-1] / e.iloc[0]) ** (1 / yrs) - 1
r0 = e.pct_change().dropna()
sharpe0050 = r0.mean() / r0.std() * 252 ** 0.5
mdd0050 = ((e / e.cummax()) - 1).min()
# 月 Sortino:月報酬 → 下行標準差
em = e.resample('M').last(); rm0 = em.pct_change().dropna()
downside0 = rm0[rm0 < 0].std()
sortino_m0050 = rm0.mean() / downside0 * (12 ** 0.5)
print("== 0050 含息 benchmark ==")
print(f"  CAGR={cagr0050*100:.2f}%  Sharpe(d)={sharpe0050:.2f}  "
      f"monthly_sortino={sortino_m0050:.2f}  MDD={mdd0050*100:.2f}%")

# ---------- 低波動單因子(top 30 月頻)— 補進單因子表 ----------
# 註:純取全市場最低波動會選到近乎停滯的全額交割/低流動股(波動被低估),
#     故與複合策略同口徑加上「近 60 日成交額前 50%」流動性過濾。
print("\n== 低波動單因子(近 60 日波動,選 30 檔,月頻,含流動性過濾)==")
lv = (-close.pct_change().rolling(60).std()).where(liquid_mask())
lvpos = (lv.rank(axis=1, ascending=False) <= 30)
lvpos = lvpos[lvpos.index >= START]
r_lv = sim(lvpos, resample='M', upload=False); slv = r_lv.get_stats()
row("低波動(60日,30檔,過濾)", slv)
print("  (無流動性過濾版本 CAGR 近 0%、選到停滯股,不具代表性,故採過濾版)")

# ---------- Hero 基準 ----------
print("\n== Hero 複合(預設)==")
hero, sh = stats(composite())
row("Hero 品質+動能+低波+營收", sh)

# ---------- 年度績效(策略 vs 0050)----------
print("\n== 年度報酬(策略 vs 0050)==")
heq = hero.eval_strategy if False else None
# 用 report 的權益曲線
he = hero  # position
rep, _ = stats(composite())
eq = rep.creturn  # cumulative return series of strategy
ann_s = eq.resample('Y').last().pct_change()
ann_s.iloc[0] = eq.resample('Y').last().iloc[0] / 1 - 1
ann_b = e.resample('Y').last().pct_change()
ann_b.iloc[0] = e.resample('Y').last().iloc[0] / e.iloc[0] - 1
for y in range(2018, 2027):
    sv = ann_s[ann_s.index.year == y]
    bv = ann_b[ann_b.index.year == y]
    s_str = f"{sv.iloc[0]*100:6.2f}%" if len(sv) else "  n/a"
    b_str = f"{bv.iloc[0]*100:6.2f}%" if len(bv) else "  n/a"
    print(f"  {y}: 策略 {s_str}   0050 {b_str}")

# ---------- 敏感度:動能天數 ----------
print("\n== 敏感度 1:動能回看期(60/120/240 日)==")
for d in (60, 120, 240):
    _, s = stats(composite(mom_days=d))
    row(f"動能 {d} 日", s)

# ---------- 敏感度:持股檔數 ----------
print("\n== 敏感度 2:持股檔數(20/25/30/50)==")
for n in (20, 25, 30, 50):
    _, s = stats(composite(topn=n))
    row(f"持股 {n} 檔", s)

# ---------- 敏感度:低波權重 ----------
print("\n== 敏感度 3:低波權重(1/2/3 倍)==")
for w in (1, 2, 3):
    _, s = stats(composite(lowvol_w=w))
    row(f"低波 {w} 倍權重", s)

# ---------- 敏感度:加權方式 ----------
print("\n== 敏感度 4:加權方式(等權/分數/平方/三次方)==")
for wm, lbl in [('equal','等權'), ('score','分數'), ('power2','分數平方'), ('power3','分數三次方')]:
    _, s = stats(composite(weight=wm))
    row(lbl, s)

# ---------- 成本壓力 ----------
print("\n== 敏感度 5:成本壓力(預設 vs 手續費2倍 vs 滑價0.2%)==")
_, s = stats(composite()); row("預設成本", s)
_, s = stats(composite(), fee=0.001425*2); row("手續費2倍", s)
_, s = stats(composite(), extra_cost=0.002); row("摩擦+0.2%(滑價代理)", s)

# ---------- 排除半導體/電子族群 ----------
print("\n== 敏感度 6:排除電子/半導體族群後再測 ==")
ELEC = ['電子零組件業','光電業','電腦及週邊設備業','其他電子業','通信網路業',
        '資訊服務業','數位雲端','電子通路業','電機機械']
_, s = stats(composite(exclude=ELEC)); row("排除電子族群", s)

# ---------- turnover ----------
print("\n== turnover(平均月周轉率,估算)==")
pos = composite()
# 月頻 resample:取每月有變動的持股集合,算換手
mp = pos.resample('M').last()
mp = mp[(mp.T != 0).any()]
turn = (mp.diff().abs().sum(axis=1) / 2).dropna()  # 單邊換手率
print(f"  平均月單邊周轉率 = {turn.mean()*100:.1f}%   單月最高 = {turn.max()*100:.1f}%")

# ---------- MDD 發生時點 ----------
print("\n== MDD 發生時點 ==")
def mdd_period(series):
    dd = series / series.cummax() - 1
    trough = dd.idxmin()
    peak = series[:trough].idxmax()
    return peak, trough, dd.min()
pk, tr, m = mdd_period(rep.creturn)
print(f"  策略 MDD={m*100:.2f}%  期間 {pk.date()} → {tr.date()}")
pk, tr, m = mdd_period(e)
print(f"  0050 MDD={m*100:.2f}%  期間 {pk.date()} → {tr.date()}")
