30分ブレークアウト戦略にショートもやってみたら往復ビンタくらった。

investment

前回は30分ブレークアウト戦略で買い(ロング)のみでバックテストやった。その結果がこれ。
11銘柄中、8銘柄で勝ち、3銘柄で負け、トータル勝ち。資金は100万円。イビデンでリターンが5.7%ということは57000円の利確となる。
期間は60日。そのうちトレードは各銘柄30日くらいなので、半分しかトレードしてない。

株価が下がっている日は空売り(ショート)すればトレード回数が倍になって利確も倍になるのではないかと考えて、ショートもできるようにChatGPTにソースを変更してもらった。

取引回数は倍くらいに増えて、リターンが増えた銘柄もあるけど、負け銘柄が増えて、トータルでプラマイゼロみたいな結果になった。これは予想外。ドテンは往復ビンタをくらうだけなのでやってはいけないのかな。確かに実トレードでも、空売りで入って負けて、今日は上昇トレンドかと思って買うと天井で往復ビンタをくらう。バックテストもドテンは負ける結果になった。


OpeningRangeBreakout_30m__Optimize-short.py

import yfinance as yf
import pandas as pd
from backtesting import Backtest, Strategy
from datetime import datetime, time, timedelta
import pytz
#import multiprocessing as mp
#import backtesting as btlib  # pip: Backtesting.py

# =============================
# 30分ブレークアウト(ORBO 30m)
# =============================
# 概要:
# ・市場オープン(09:00)〜09:30の高値・安値を Opening Range(OR) とする
# ・09:30以降、上抜けでロング(ORHブレイク)。ストップは ORL。任意でTP=R倍
# ・同日中は 1 回の新規のみ(デフォルト)。約定は Backtesting.py 仕様で次バー寄り。
# ・資金上限クリップ/未約定抑制/立ち上がり検出を実装
#
# 使い方:
#   python OpeningRangeBreakout_30m.py  (スクリプト単体実行)
#   下の __main__ の設定を必要に応じて変更
#
# 依存:
#   pip install yfinance backtesting pandas pytz

JST = "Asia/Tokyo"


# 1) Windowsの並列最適化は spawn 前提
#btlib.Pool = mp.get_context("spawn").Pool


class ORB30(Strategy):
    # === パラメータ ===
    MARKET_OPEN = time(9, 0)     # 東証オープン
    OR_MINUTES = 30              # オープニングレンジの分数
    NO_NEW_AFTER = time(14, 30)  # この時刻以降は新規を出さない

    use_fixed_size = False       # True: 固定枚数で発注 / False: リスク一定
    fixed_size = 100             # 固定発注サイズ

    risk_per_trade = 0.01        # 口座の1%を1トレードの最大リスクに
    tp_R = 1.5                   # 利確は R=1.5 倍(0 なら TP なし)
    one_entry_per_day = True     # 同日中は1回のみエントリー
    eod_flatten = True                 # 引け前に必ず手仕舞い
    eod_close_time = time(14, 55)      # 例: 14:55でクローズ

    # === 内部状態 ===
    def init(self):
        self.current_day = None
        self.orh = None
        self.orl = None
        self.or_ready = False
        self.entry_done_today = False
        self.entry_ok_prev = False
        self.last_entry_bar = -999999
        # ログ用
        self.afford_clip = 0  # 資金クリップ回数
        self.afford_block = 0 # 資金不足で size<1 になりブロック回数
        # VWAP 累積用(その日ごとにリセット)
        self._cum_pv = 0.0   # 累積(価格×出来高)
        self._cum_vol = 0.0  # 累積出来高

    def _jst_dt(self):
        ts = pd.Timestamp(self.data.index[-1])
        if ts.tzinfo is None:
            return ts.tz_localize(JST)
        return ts.tz_convert(JST)

    def _time_of_day(self):
        return self._jst_dt().time()

    def _date(self):
        return self._jst_dt().date()

    def _within_or_window(self, t: time) -> bool:
        start = self.MARKET_OPEN
        end = (datetime.combine(datetime.today(), start) +
            timedelta(minutes=self.OR_MINUTES)).time()
        return start <= t < end

    def _after_or_window(self, t: time) -> bool:
        start = self.MARKET_OPEN
        end = (datetime.combine(datetime.today(), start) +
            timedelta(minutes=self.OR_MINUTES)).time()
        return t >= end

    def next(self):
        t = self._time_of_day()
        d = self._date()
        # if not hasattr(self, "_dbg"):
        #     self._dbg = 0
        # if self._dbg < 6:  # 最初の数回だけ出力
        #     print("[DBG] t,d:", t, d, "| orh,orl:", self.orh, self.orl, "| or_ready:", self.or_ready)
        #     self._dbg += 1

        close = float(self.data.Close[-1])
        high  = float(self.data.High[-1])
        low   = float(self.data.Low[-1])
        vol   = float(self.data.Volume[-1])

        # === 日替わり初期化 ===
        if self.current_day != d:
            self.current_day = d
            self.orh = None; self.orl = None; self.or_ready = False
            self.entry_done_today = False
            self.entry_ok_prev = False
            self.last_entry_bar = -999999
            self._cum_pv = 0.0
            self._cum_vol = 0.0
            if self.position:
                self.position.close()
                return

        # === OR構築(オープニングレンジ内) ===
        if self._within_or_window(t):
            self.orh = high if (self.orh is None or high > self.orh) else self.orh
            self.orl = low  if (self.orl is None or low  < self.orl) else self.orl
            self.or_ready = False
            if vol > 0:
                self._cum_pv  += close * vol
                self._cum_vol += vol
            return

        # OR確定(ウィンドウ終了後に一度だけ)
        if (not self.or_ready) and (self.orh is not None) and (self.orl is not None) and self._after_or_window(t):
            self.or_ready = True

        # === VWAP更新(場中毎バー) ===
        if vol > 0:
            self._cum_pv  += close * vol
            self._cum_vol += vol
        vwap_now = self._cum_pv / max(self._cum_vol, 1e-9)

        # === デイトレ:引け前に必ずクローズ(最優先) ===
        if self.position and self.eod_flatten and t >= self.eod_close_time:
            self.position.close()
            return

        # 既存ポジ保有中は新規を出さない(EODクローズは上で済ませる)
        # if self.position:
        #     return

        # 新規発注の共通ガード
        if not self.or_ready:
            return
        if t >= self.NO_NEW_AFTER:
            return
        if self.one_entry_per_day and self.entry_done_today:
            return

        # === エントリー条件(VWAPコンファーム付き) ===
        long_ok  = (close > self.orh) and (close > vwap_now) if (self.orh is not None) else False
        short_ok = (close < self.orl) and (close < vwap_now) if (self.orl is not None) else False
        entry_ok = long_ok or short_ok

        # 立ち上がりだけ許可
        edge = entry_ok and (not self.entry_ok_prev)
        self.entry_ok_prev = entry_ok
        if not edge or self.orders:
            return

        # === リスク定義・SL/TP(方向別) ===
        if long_ok:
            risk = max(close - self.orl, 1e-6)
            sl   = self.orl
            tp   = close + self.tp_R * (close - sl) if self.tp_R > 0 else None
            side = self.buy
        else:
            risk = max(self.orh - close, 1e-6)
            sl   = self.orh
            tp   = close - self.tp_R * (sl - close) if self.tp_R > 0 else None
            side = self.sell

        # === サイジング(資金上限クリップ付き) ===
        if self.use_fixed_size:
            size = int(self.fixed_size)
        else:
            size = int((self.equity * self.risk_per_trade) / risk)
        size = max(1, size)
        max_affordable = int(self.equity // max(close, 1e-9))
        if max_affordable <= 0:
            self.afford_block += 1
            return
        if size > max_affordable:
            self.afford_clip += 1
            size = max_affordable

        # === 発注 ===
        if tp is not None:
            side(size=size, sl=sl, tp=tp)
        else:
            side(size=size, sl=sl)

        self.entry_done_today = True              


# =============================
# データ取得 & バックテスト実行
# =============================
def load_data(ticker: str = "7203.T", period: str = "60d", interval: str = "5m") -> pd.DataFrame:
    """
    yfinance の列体系(単一/二重)に頑強に対応し、
    Backtesting.py が期待する単一レベルの ['Open','High','Low','Close','Volume'] 列へ正規化する。
    さらに空データや重複インデックスも整える。
    """
    df = yf.download(
        ticker,
        period=period,
        interval=interval,
        auto_adjust=False,
        actions=False,
        group_by="column",   # 可能なら単一列
        progress=False,
    )

    if df is None or len(df) == 0:
        raise ValueError("No data returned from yfinance. Try a shorter 'period' or different 'interval'.")

    # --- 列正規化 ---
    def _ensure_ohlcv_columns(_df: pd.DataFrame) -> pd.DataFrame:
        # すでに単一レベルで Open などが揃っているならそのまま
        base = {"Open", "High", "Low", "Close"}
        if not isinstance(_df.columns, pd.MultiIndex):
            cols = set(map(str, _df.columns))
            # 小文字化対応
            if base.issubset(cols):
                return _df
            if base.issubset({c.capitalize() for c in cols}):
                _df.columns = [c.capitalize() for c in _df.columns]
                return _df

        # MultiIndex の場合:OHLCV が level=0 にいるケースが多い
        if isinstance(_df.columns, pd.MultiIndex):
            lv0 = list(map(str, _df.columns.get_level_values(0)))
            lv1 = list(map(str, _df.columns.get_level_values(1)))
            if base.issubset(set(lv0)):
                _df.columns = _df.columns.get_level_values(0)
                return _df
            if base.issubset(set(lv1)):
                _df.columns = _df.columns.get_level_values(1)
                return _df
            # それでもダメなら、各タプルの先頭を採用
            try:
                _df.columns = [t[0] if isinstance(t, tuple) and len(t) > 0 else str(t) for t in _df.columns]
                return _df
            except Exception:
                pass
        return _df

    df = _ensure_ohlcv_columns(df)

    # 余計な列を落とし、欠けている列がないか検証
    keep = ["Open", "High", "Low", "Close", "Volume"]
    # Volume が存在しない市場/足でも動くように、まずは可能な列だけ選択
    present = [c for c in keep if c in df.columns]
    df = df[present].copy()

    # Open/High/Low/Close が揃っているか最終確認
    required = {"Open", "High", "Low", "Close"}
    if not required.issubset(set(df.columns)):
        raise ValueError(f"Columns after normalization: {list(df.columns)} — Missing one of {sorted(required)}")

    # 欠損・重複・順序
    df = df.dropna()
    if df.index.has_duplicates:
        df = df[~df.index.duplicated(keep='last')]
    df = df.sort_index()

    return df

def normalize_heatmap(hm):
    """Backtesting.pyの版差対策:SeriesでもDataFrameでもtp_Rとmetricを取り出す"""
    if hm is None:
        return None, None
    if isinstance(hm, pd.Series):
        metric = hm.name or "Sharpe Ratio"
        df = hm.rename(metric).reset_index()
        df.columns = ['tp_R', metric]
        return df, metric
    # DataFrame
    metric_candidates = [c for c in hm.columns
                         if c.lower().replace(' ','') in ('sharperatio','sharpe')]
    metric = metric_candidates[0] if metric_candidates else hm.columns[-1]
    df = hm.reset_index()
    if 'tp_R' not in df.columns and df.index.name:
        df = df.rename(columns={df.index.name: 'tp_R'})
    return df, metric


brand_names = {
    "1570.T": "Nikkei NF",
    "6526.T": "Sociotech",
    "7203.T": "Toyota",
    "6857.T": "Advantest",
    "6146.T": "Disco",
    "4062.T": "Ibiden",
    "6920.T": "LaserTech",
    "5803.T": "Fujikura",
    "8035.T": "Tokyo Electron",
    "6758.T": "SONY group",
    "9984.T": "Softbank group",
    "7011.T": "Mitsubishi Heavy Ind.",
    "7013.T": "IHI",
#    "AAPL":   "Apple Inc.",
    }

PERIOD   = "60d"
INTERVAL = "5m"

param_grid = [1.0, 1.2, 1.5, 1.8, 2.0, 2.5, 3.0]  # tp_R 候補
summary_rows = []

for tk,name in brand_names.items():
    print(f"\n=== {tk} {name} ===")
    try:
        df = load_data(tk, PERIOD, INTERVAL)
        bt = Backtest(df, ORB30, cash=1_000_000, commission=0.0005, exclusive_orders=True)

        # 最適化(Sharpe基準を例に)
        res = bt.optimize(
            tp_R=param_grid,
            maximize='Sharpe Ratio',
            constraint=lambda p: p.tp_R > 0,
            return_heatmap=True
        )
        stats, heatmap = res if isinstance(res, tuple) else (res, None)
        best_tp = stats._strategy.tp_R

        # 全試行の表を整形して保存
        hm_df, metric_col = normalize_heatmap(heatmap)
        if hm_df is not None:
            hm_df.sort_values(metric_col, ascending=False).to_csv(f"{tk}_opt_heatmap.csv", index=False,float_format="%.6f")

        # ベスト設定の詳細
        best_trades = stats['_trades']
        best_trades.to_csv(f"{tk}_trades_best.csv", index=False)

        # ベスト設定で再実行→プロット保存(GUI不要)
        _ = bt.run(tp_R=best_tp)
        bt.plot(filename=f"{tk}_orb30_best.html", open_browser=False)

        # 要点をサマリー行に
        summary_rows.append({
            "Ticker": tk,
            "name":name,
            "Best tp_R": best_tp,
            "Sharpe": stats.get('Sharpe Ratio', None),
            "Return%": stats.get('Return [%]', None),
            "MaxDD%": stats.get('Max. Drawdown [%]', None),
            "#Trades": stats.get('# Trades', None),
            "PF": stats.get('Profit Factor', None)
        })

        print(f"  -> Best tp_R={best_tp}, Sharpe={stats.get('Sharpe Ratio')}, Return%={stats.get('Return [%]')}")
        print(f"  Saved: {tk}_opt_heatmap.csv, {tk}_trades_best.csv, {tk}_orb30_best.html")

    except Exception as e:
        print(f"  [ERROR] {tk}: {e}")

# 銘柄横断サマリー
summary = pd.DataFrame(summary_rows)
summary = summary.sort_values("Sharpe", ascending=False)
print("\n=== Summary ===")
print(summary)
summary.to_csv("summary_by_ticker.csv", index=False)
print("Saved summary_by_ticker.csv")


コメント

タイトルとURLをコピーしました