# -*- coding: utf-8 -*-
"""三種月營收選股法(均線法 / 創新高法 / 成長法)同條件對決回測。

對應文章:
  https://finlab.finance/blog/python-monthly-revenue-three-methods

執行方式(finlab 會在需要資料時自動引導登入):
  pip install finlab
  python strategy.py

共同設定(同股池、同再平衡節奏,只換訊號):
  - 股票池:有公布月營收的台股上市櫃公司,近 20 日成交金額中位數 > 5,000 萬
  - 再平衡:跟隨月營收公告節奏(finlab 月營收索引已對齊公告截止日,無前視)
  - 交易成本:finlab sim() 台股預設(手續費 0.1425% 與證交稅 0.3%)
  - 基準:0050 還原價(etl:adj_close)買進持有,純指數算術、不含交易成本

投資警語:本程式僅供量化研究與教學用途,過去績效不代表未來表現,
不構成任何投資建議;實際交易前請自行評估風險、滑價與交易容量。
"""
import pandas as pd
from finlab import data
from finlab.backtest import sim

END = "2026-06-09"  # 文章數據的資料截止日;留空可跑到最新


# ---------- 載入資料 ----------
rev = data.get("monthly_revenue:當月營收")
value = data.get("price:成交金額")
adj = data.get("etl:adj_close")

if END:
    rev = rev[rev.index <= END]
    value = value[value.index <= END]
    adj = adj[adj.index <= END]


# ---------- 股票池:流動性濾網 ----------
# 近 20 日成交金額中位數 > 5,000 萬,排除回測買得到、實單買不到的低流動性股
liquid = value.rolling(20).median() > 50_000_000

# 對齊到月營收公告日;只留兩邊都有的股票(ETF 與指數沒有月營收,自然排除)
common = rev.columns.intersection(liquid.columns)
liquid = liquid.reindex(rev.index, method="ffill")[common].fillna(False)
rev = rev[common]


# ---------- 三種月營收訊號(不疊任何價格條件) ----------
# 1. 平均線法:近 3 個月營收平均 > 近 12 個月營收平均
signal_ma = (rev.average(3) > rev.average(12)) & liquid

# 2. 創新高法:當月營收創近 24 個月新高
signal_high = (rev == rev.rolling(24).max()) & liquid

# 3. 成長法:先用 4 個月平均把營收曲線平滑,再要求平滑線連續 5 個月走高
smooth = rev.average(4)
signal_growth = (smooth > smooth.shift()).sustain(5) & liquid

# 傳統基準訊號:MoM > 0 與 YoY > 0
signal_mom = (rev / rev.shift(1) - 1 > 0) & liquid
yoy = rev / rev.shift(12) - 1
signal_yoy = (yoy > 0) & liquid


# ---------- 回測:全持有版(訊號為真的股票全部等權持有) ----------
START = "2012-01-01"  # 確保最長的 24 個月新高訊號在起點就算得出來


def clip_creturn(creturn):
    """sim() 的淨值曲線會延伸到執行當日;統計前先把兩端截斷到回測區間。"""
    clipped = creturn[creturn.index >= START]
    if END:
        clipped = clipped[clipped.index <= END]
    return clipped


def print_stats(name, report):
    """對截斷後的淨值曲線用純算術計算統計,與文中 0050 基準同一套公式。"""
    cr = clip_creturn(report.creturn)
    ret = cr.pct_change().dropna()
    total = cr.iloc[-1] / cr.iloc[0] - 1
    years = (cr.index[-1] - cr.index[0]).days / 365.25
    cagr = (1 + total) ** (1 / years) - 1
    sharpe = ret.mean() / ret.std() * 252 ** 0.5
    mdd = (cr / cr.cummax() - 1).min()
    print(name, "CAGR", round(cagr * 100, 2), "%",
          "Sharpe", round(sharpe, 2),
          "MDD", round(mdd * 100, 2), "%")


for name, signal in [
    ("均線法", signal_ma),
    ("創新高法", signal_high),
    ("成長法", signal_growth),
    ("MoM>0", signal_mom),
    ("YoY>0", signal_yoy),
]:
    position = signal[signal.index >= START]
    report = sim(position, upload=False)
    print_stats(name, report)


# ---------- 回測:Top 30 版(訊號池內按 YoY 由高到低取前 30 檔) ----------
for name, signal in [
    ("均線法 Top30", signal_ma),
    ("創新高法 Top30", signal_high),
    ("成長法 Top30", signal_growth),
]:
    position = yoy.where(signal).is_largest(30)
    position = position[position.index >= START]
    report = sim(position, upload=False)
    print_stats(name, report)


# ---------- 0050 含息基準(純指數算術,不經 sim、不含成本) ----------
benchmark = adj["0050"]
benchmark = benchmark[benchmark.index >= START].dropna()
total_return = benchmark.iloc[-1] / benchmark.iloc[0] - 1
years = (benchmark.index[-1] - benchmark.index[0]).days / 365.25
cagr = (1 + total_return) ** (1 / years) - 1
mdd = (benchmark / benchmark.cummax() - 1).min()
print("0050 含息", "CAGR", round(cagr * 100, 2), "%", "MDD", round(mdd * 100, 2), "%")
