# -*- coding: utf-8 -*-
"""台股相關性分析 + 金控股配對交易回測(完整可重現腳本)。

對應文章:
  https://finlab.finance/blog/taiwan-stock-correlation-python

執行(任一有 finlab 的 Python 環境):
  pip install finlab yfinance statsmodels pandas numpy
  python strategy.py

兩段分析:
  A. 全球指數相關性(yfinance,2015-01 起):
     - 價格水準相關 vs 日報酬相關的對照(前者會高估)
     - 台股加權與各國指數的日報酬相關排行
     - 台股 vs 美股「同日」與「美股前一交易日」的相關(時差效應)
     - 台股 vs S&P 500 的 120 日滾動相關
  B. 台股金控股配對交易(finlab etl:adj_close,2018-01 起):
     - 形成期 252 個交易日:日報酬相關 > 0.6 且 Engle-Granger 共整合
       p < 0.05,取 p 值最小 5 組,每半年重選
     - 交易期:spread(形成期 OLS 對沖比)60 日 z-score,
       |z| > 2 進場(作多便宜腿、放空貴腿各 50%),z 回到 0 出場
     - 成本:手續費 0.1425%(雙邊)、證交稅 0.3%(賣出與融券放空)、
       借券成本年化 1%(估計);市值中性多空損益自行計算
     - 基準:0050 還原價買進持有(純指數算術,不含交易成本)

投資警語:本程式僅供量化研究與教學用途,過去績效不代表未來表現,
不構成任何投資建議;實際融券放空有強制回補、停券與借券費率變動風險。
"""
from __future__ import annotations

import json
import os
import warnings
from itertools import combinations

import numpy as np
import pandas as pd
import yfinance as yf
from statsmodels.tsa.stattools import coint

import finlab
from finlab import data

warnings.filterwarnings("ignore")

finlab.login()  # 第一次使用會自動引導登入

START = "2018-01-01"
END = os.environ.get("DATA_END", "2026-06-09")


# =====================================================================
# Part A. 全球指數相關性
# =====================================================================
INDICES = {
    "台股加權": "^TWII",
    "S&P 500": "^GSPC",
    "那斯達克": "^IXIC",
    "費城半導體": "^SOX",
    "道瓊工業": "^DJI",
    "日經 225": "^N225",
    "韓國 KOSPI": "^KS11",
    "香港恆生": "^HSI",
    "上證綜指": "000001.SS",
    "深證成指": "399001.SZ",
    "英國富時 100": "^FTSE",
    "Cboe UK 100": "^BUK100P",
    "德國 DAX": "^GDAXI",
}

raw = yf.download(list(INDICES.values()), start="2015-01-01", end=END,
                  auto_adjust=True, progress=False)["Close"]
px = raw.rename(columns={v: k for k, v in INDICES.items()})[list(INDICES.keys())]
px = px.dropna(how="all")

# 價格水準相關(會被共同趨勢灌水)vs 日報酬相關(應採用)
corr_price = px.corr()
returns = px.pct_change(fill_method=None)
corr_return = returns.corr()

print("台股與各指數的日報酬相關排行:")
print(corr_return["台股加權"].drop("台股加權").sort_values(ascending=False).round(2))

# 時差效應:台股收盤早於美股開盤,改用「美股前一交易日」報酬再算一次
both = returns[["台股加權", "S&P 500"]].copy()
both["美股前一日"] = both["S&P 500"].shift(1)
both = both.dropna()
print("台股 vs S&P 500 同日相關:", round(both["台股加權"].corr(both["S&P 500"]), 2))
print("台股 vs S&P 500 前一日相關:", round(both["台股加權"].corr(both["美股前一日"]), 2))

# 120 日滾動相關:相關係數本身會隨市況大幅變動
aligned = returns[["台股加權", "S&P 500"]].dropna()
rolling_corr = aligned["台股加權"].rolling(120).corr(aligned["S&P 500"])


# =====================================================================
# Part B. 金控股配對交易(walk-forward)
# =====================================================================
FIN = {
    "2880": "華南金", "2881": "富邦金", "2882": "國泰金", "2883": "凱基金",
    "2884": "玉山金", "2885": "元大金", "2886": "兆豐金", "2887": "台新金",
    "2888": "新光金", "2889": "國票金", "2890": "永豐金", "2891": "中信金",
    "2892": "第一金", "5880": "合庫金",
}

adj = data.get("etl:adj_close")
adj = adj[adj.index <= END]

# 0050 基準:還原價買進持有(純指數算術)
bench_price = adj["0050"]
bench_price = bench_price[(bench_price.index >= START)].dropna()
bench_return = bench_price.pct_change().dropna()

prices = adj[[c for c in FIN if c in adj.columns]]
prices = prices[prices.index >= "2016-01-01"]
rets = prices.pct_change(fill_method=None)
logp = np.log(prices)

FORM = 252          # 形成期長度(交易日)
ZWIN = 60           # z-score 滾動視窗
Z_IN, Z_OUT = 2.0, 0.0
CORR_MIN, P_MAX = 0.6, 0.05
MAX_PAIRS = 5
FEE = 0.001425      # 券商手續費(未打折)
TAX = 0.003         # 證交稅(現股賣出與融券放空)
BORROW = 0.01       # 借券成本(年化,估計值)

dates = prices.index


def select_pairs(form_idx):
    """形成期:報酬相關 + Engle-Granger 共整合,回傳 (p值, A, B, 對沖比)。"""
    candidates = []
    for a, b in combinations(prices.columns, 2):
        pa = logp.loc[form_idx, a]
        pb = logp.loc[form_idx, b]
        if pa.isna().any() or pb.isna().any():
            continue
        corr = rets.loc[form_idx, a].corr(rets.loc[form_idx, b])
        if not (corr > CORR_MIN):
            continue
        _, pvalue, _ = coint(pa, pb)
        if pvalue < P_MAX:
            beta = float(np.polyfit(pb, pa, 1)[0])
            candidates.append((pvalue, a, b, beta))
    candidates.sort()
    return candidates[:MAX_PAIRS]


def trade_pair(a, b, beta, trade_idx):
    """交易期:z-score 狀態機,回傳該 pair 的每日報酬(已扣成本)。"""
    spread = logp[a] - beta * logp[b]
    z = (spread - spread.rolling(ZWIN).mean()) / spread.rolling(ZWIN).std()
    position = 0  # +1:作多 A 放空 B(z < -2);-1:放空 A 作多 B(z > 2)
    pair_ret = pd.Series(0.0, index=trade_idx)
    for i, d in enumerate(trade_idx):
        ra, rb = rets.at[d, a], rets.at[d, b]
        zv = z.get(d, np.nan)
        if position != 0:
            if np.isnan(ra) or np.isnan(rb):   # 下市或停牌,強制出場
                position = 0
                continue
            pair_ret.iloc[i] = position * 0.5 * (ra - rb)
            pair_ret.iloc[i] -= BORROW / 252 * 0.5            # 借券成本
            crossed = (position == -1 and zv <= Z_OUT) or (position == 1 and zv >= -Z_OUT)
            if crossed or i == len(trade_idx) - 1:
                pair_ret.iloc[i] -= 0.5 * (FEE + TAX) + 0.5 * FEE   # 出場成本
                position = 0
        elif not np.isnan(zv) and abs(zv) > Z_IN and i < len(trade_idx) - 1:
            position = -1 if zv > 0 else 1
            pair_ret.iloc[i] -= 0.5 * FEE + 0.5 * (FEE + TAX)       # 進場成本
    return pair_ret


# 每半年重選一次 pairs,walk-forward 串接
period_returns = []
for period_start in pd.date_range(START, END, freq="6MS"):
    period_end = min(period_start + pd.DateOffset(months=6) - pd.Timedelta(days=1),
                     pd.Timestamp(END))
    history = dates[dates < period_start]
    trade_idx = dates[(dates >= period_start) & (dates <= period_end)]
    if len(history) < FORM or len(trade_idx) == 0:
        continue
    selected = select_pairs(history[-FORM:])
    port = pd.Series(0.0, index=trade_idx)
    for pvalue, a, b, beta in selected:
        port += trade_pair(a, b, beta, trade_idx) / len(selected)
    period_returns.append(port)

strategy_return = pd.concat(period_returns).sort_index()
strategy_return = strategy_return[~strategy_return.index.duplicated()]
equity = (1 + strategy_return).cumprod()


def summarize(name, daily_return, equity_curve):
    years = (daily_return.index[-1] - daily_return.index[0]).days / 365.25
    total = float(equity_curve.iloc[-1] / equity_curve.iloc[0] - 1)
    downside = daily_return[daily_return < 0]
    return {
        "name": name,
        "cagr": round(((1 + total) ** (1 / years) - 1) * 100, 2),
        "daily_sharpe": round(float(daily_return.mean() / daily_return.std() * 252 ** 0.5), 2),
        "daily_sortino": round(float(daily_return.mean() / downside.std() * 252 ** 0.5), 2),
        "max_drawdown": round(float((equity_curve / equity_curve.cummax() - 1).min()) * 100, 2),
        "total_return": round(total * 100, 1),
    }


stats_strategy = summarize("金控配對交易(含成本)", strategy_return, equity)
stats_bench = summarize("0050 含息買進持有", bench_return, bench_price / bench_price.iloc[0])
print(json.dumps([stats_strategy, stats_bench], ensure_ascii=False, indent=2))
print("策略與 0050 日報酬相關:", round(float(strategy_return.corr(bench_return)), 2))
