# -*- coding: utf-8 -*-
"""LPPL 泡沫模型:0050 的 LPPLS 信心指標計算與擇時回測(可重現版)。

對應文章:
  https://finlab.finance/blog/lppl-bubble-crash-prediction-python

執行:
  pip install finlab numpy scipy pandas
  python strategy.py

方法說明:
  1. LPPL 校準採 Filimonov & Sornette (2013) 線性化:
     非線性參數只剩 (tc, m, omega),線性參數 (A, B, C1, C2) 用最小平方法解。
  2. LPPLS 信心指標 (Sornette et al. 2015):每 5 個交易日為一個觀測點,
     對 30~120 天(間隔 5 天,共 19 個)的回看窗格各做一次擬合,
     通過參數過濾的窗格比例 = 信心值。
  3. 擇時規則:持有 0050,正泡沫信心 >= 0.2 出場;
     信心退回 <= 0.1 或負泡沫信心 >= 0.2 回補。
  4. 所有亂數固定種子,重跑會得到相同數字。
  5. 交易成本:策略用 finlab sim() 台股預設值(手續費 0.1425%、證交稅 0.3%);
     0050 基準為 etl:adj_close 含息買進持有(純指數算術,不含成本)。

投資警語:本程式僅供量化研究與教學用途,過去績效不代表未來表現,
不構成任何投資建議;實際交易前請自行評估風險、滑價與交易容量。
"""
from __future__ import annotations

import os
from concurrent.futures import ProcessPoolExecutor

import numpy as np
import pandas as pd
from scipy.optimize import minimize

START = "2018-01-01"
END = os.environ.get("DATA_END", "2026-06-09")  # 文章數據截止日,想跑最新拿掉即可

# ---------- LPPLS 設定 ----------
WINDOWS = list(range(30, 121, 5))      # 回看窗格 30~120 個交易日,共 19 個
OBS_STEP = 5                           # 每 5 個交易日計算一次信心指標
N_STARTS = 6                           # 每個窗格的多起點搜尋次數
SEED_BASE = 20260611                   # 固定種子,確保可重現

# 合格擬合的參數過濾條件(Sornette et al. 2015)
M_MIN, M_MAX = 0.01, 1.2               # 冪律指數 m
W_MIN, W_MAX = 2.0, 25.0               # log 週期角頻率 omega
OSC_MIN = 2.5                          # 窗格內至少 2.5 次完整震盪
DAMPING_MIN = 0.8                      # 阻尼比 m|B| / (omega|C|)
TC_LO_FRAC, TC_HI_FRAC = -0.05, 0.10   # tc 相對窗格長度的允許範圍

# 擇時門檻
EXIT_THRESHOLD = 0.20                  # 正泡沫信心 >= 0.2 出場
REENTER_THRESHOLD = 0.10               # 信心退回 <= 0.1 回補
NEG_REENTER = 0.20                     # 負泡沫信心 >= 0.2 直接回補(超跌反轉)

# 子行程共用的對數價格陣列(由 initializer 填入,避免重複序列化)
_LOGP: np.ndarray | None = None


def _init_worker(logp: np.ndarray) -> None:
    global _LOGP
    _LOGP = logp


def lppl_design_matrix(t: np.ndarray, tc: float, m: float, w: float) -> np.ndarray:
    """LPPL 線性部分的設計矩陣 [1, f, g, h]。"""
    dt = np.abs(tc - t) + 1e-8
    f = dt ** m
    log_dt = np.log(dt)
    g = f * np.cos(w * log_dt)
    h = f * np.sin(w * log_dt)
    return np.column_stack([np.ones_like(t), f, g, h])


def lppl_sse(theta: np.ndarray, t: np.ndarray, y: np.ndarray) -> float:
    """給定 (tc, m, omega),解出線性參數後的殘差平方和。"""
    tc, m, w = theta
    X = lppl_design_matrix(t, tc, m, w)
    coef, _, _, _ = np.linalg.lstsq(X, y, rcond=None)
    residual = y - X @ coef
    return float(residual @ residual)


def fit_one_window(args: tuple[int, int]) -> dict:
    """擬合單一 (觀測點, 窗格長度),回傳參數與過濾結果。"""
    end_idx, win = args
    t1 = end_idx - win + 1
    t = np.arange(t1, end_idx + 1, dtype=float)
    y = _LOGP[t1:end_idx + 1]
    t2 = float(end_idx)
    dt_win = float(win)

    # 多起點 Nelder-Mead,種子由 (觀測點, 窗格) 決定,完全可重現
    rng = np.random.default_rng(SEED_BASE + end_idx * 1000 + win)
    bounds = [
        (t2 - 0.2 * dt_win, t2 + 0.2 * dt_win),  # tc
        (0.01, 1.99),                            # m
        (2.0, 25.0),                             # omega
    ]
    best = None
    for _ in range(N_STARTS):
        x0 = np.array([rng.uniform(lo, hi) for lo, hi in bounds])
        result = minimize(
            lppl_sse, x0, args=(t, y), method="Nelder-Mead",
            bounds=bounds, options={"maxiter": 400, "xatol": 1e-4, "fatol": 1e-8},
        )
        if best is None or result.fun < best.fun:
            best = result

    tc, m, w = best.x
    X = lppl_design_matrix(t, tc, m, w)
    coef, _, _, _ = np.linalg.lstsq(X, y, rcond=None)
    A, B, C1, C2 = coef
    C = float(np.hypot(C1, C2))

    # 過濾條件:全部通過才算「合格擬合」
    osc = (w / (2 * np.pi)) * np.log((tc - t1) / (tc - t2)) if tc > t2 else 0.0
    damping = (m * abs(B)) / (w * C) if C > 0 else np.inf
    tc_ok = (t2 + TC_LO_FRAC * dt_win) <= tc <= (t2 + TC_HI_FRAC * dt_win)
    qualified = (
        (M_MIN < m < M_MAX)
        and (W_MIN < w < W_MAX)
        and tc_ok
        and (osc > OSC_MIN)
        and (damping > DAMPING_MIN)
    )
    return {
        "end_idx": end_idx,
        "window": win,
        "tc": float(tc),
        "m": float(m),
        "omega": float(w),
        "B": float(B),
        "qualified_pos": bool(qualified and B < 0),
        "qualified_neg": bool(qualified and B > 0),
    }


def build_exposure(pos_conf: pd.Series, neg_conf: pd.Series) -> pd.Series:
    """狀態機:正泡沫信心高出場,信心退潮或負泡沫高回補。"""
    exposure = pd.Series(1.0, index=pos_conf.index)
    holding = True
    for date in pos_conf.index:
        p = pos_conf.loc[date]
        n = neg_conf.loc[date]
        if holding and p >= EXIT_THRESHOLD:
            holding = False
        elif not holding and (p <= REENTER_THRESHOLD or n >= NEG_REENTER):
            holding = True
        exposure.loc[date] = 1.0 if holding else 0.0
    return exposure


def perf_stats(creturn: pd.Series) -> dict:
    """純指數算術績效統計(基準與策略用同一套公式)。"""
    cr = creturn.dropna()
    ret = cr.pct_change().dropna()
    years = (cr.index[-1] - cr.index[0]).days / 365.25
    total = float(cr.iloc[-1] / cr.iloc[0] - 1)
    mret = cr.resample("ME").last().pct_change().dropna()
    return {
        "cagr": round(((1 + total) ** (1 / years) - 1) * 100, 2),
        "daily_sharpe": round(float(ret.mean() / ret.std() * 252 ** 0.5), 2),
        "monthly_sortino": round(float(mret.mean() / mret[mret < 0].std() * 12 ** 0.5), 2),
        "max_drawdown": round(float((cr / cr.cummax() - 1).min()) * 100, 2),
        "total_return": round(total * 100, 1),
    }


def main() -> None:
    from finlab import data
    from finlab.backtest import sim

    # ---------- 資料(finlab 會自動引導登入) ----------
    adj_close = data.get("etl:adj_close")
    price = adj_close["0050"]
    price = price[price.index <= END].dropna()
    del adj_close

    logp = np.log(price.values)
    dates = price.index

    # 觀測點:每 5 個交易日一個,需有 120 天歷史,從 2017-07 起算暖身
    first_obs = max(120, int(np.searchsorted(dates, np.datetime64("2017-07-01"))))
    obs_idx = list(range(first_obs, len(dates), OBS_STEP))

    # ---------- 滾動 LPPLS 擬合(多核心平行) ----------
    tasks = [(i, w) for i in obs_idx for w in WINDOWS]
    print(f"fitting {len(tasks)} windows on {len(obs_idx)} observation dates...")
    rows = []
    with ProcessPoolExecutor(max_workers=8, initializer=_init_worker,
                             initargs=(logp,)) as executor:
        for row in executor.map(fit_one_window, tasks, chunksize=32):
            rows.append(row)
    fits = pd.DataFrame(rows)

    # ---------- 信心指標:通過過濾的窗格比例 ----------
    grouped = fits.groupby("end_idx")[["qualified_pos", "qualified_neg"]].mean()
    pos_conf = pd.Series(grouped["qualified_pos"].values,
                         index=dates[grouped.index]).reindex(dates).ffill().fillna(0)
    neg_conf = pd.Series(grouped["qualified_neg"].values,
                         index=dates[grouped.index]).reindex(dates).ffill().fillna(0)

    in_sample = dates >= START
    pos_conf = pos_conf[in_sample]
    neg_conf = neg_conf[in_sample]
    price_in = price[in_sample]

    # ---------- 0050 買進持有基準(純指數算術,不含成本) ----------
    bench_creturn = price_in / price_in.iloc[0]
    print("0050 buy-and-hold:", perf_stats(bench_creturn))

    # ---------- LPPLS 擇時策略(finlab sim 含台股預設成本) ----------
    exposure = build_exposure(pos_conf, neg_conf)
    position = pd.DataFrame({"0050": exposure})
    report = sim(position, upload=False)
    strat_creturn = report.creturn
    # sim() 的 creturn 會延伸到執行當日,必須雙端截斷到 END 釘日,否則統計混入快照日後的行情
    strat_creturn = strat_creturn[(strat_creturn.index >= START) & (strat_creturn.index <= END)]
    print("LPPLS timing:", perf_stats(strat_creturn))
    print("time in market: {:.1%}".format(exposure.mean()))
    print("number of trades:", int((exposure.diff().abs() > 0).sum()))

    # ---------- 輸出 ----------
    pd.DataFrame({
        "adj_close": price_in,
        "pos_conf": pos_conf,
        "neg_conf": neg_conf,
        "exposure": exposure,
        "strategy_creturn": strat_creturn.reindex(price_in.index).ffill(),
        "bench_creturn": bench_creturn,
    }).to_csv("lppls_0050_data.csv")
    print("saved lppls_0050_data.csv")


if __name__ == "__main__":
    main()
