npm install さようなら — FastAPI + htmx でトレーディングシステムを作る
【第7回】戦略とリスク管理

このシリーズについて
こんにちは、souです。 「npm install さようなら — FastAPI + htmx でトレーディングシステムを作る」シリーズの第7回へようこそ!
本シリーズは全8回構成で、日経225先物・オプションの自動売買ダッシュボードをゼロから構築していきます。
| # | タイトル | 状態 |
|---|---|---|
| 1 | 環境構築とプロジェクト設計 | 公開済み |
| 2 | htmx でSPA風ダッシュボードを作る | 公開済み |
| 3 | ダークテーマUIとコンポーネント設計 | 公開済み |
| 4 | DB設計とAlembicマイグレーション | 公開済み |
| 5 | 証券APIとの接続 | 公開済み |
| 6 | トレーディングエンジン — 司令塔の設計 | 公開済み |
| 7 | 戦略とリスク管理(本記事) | 今ここ |
| 8 | バックテストと本番デプロイ | 近日公開 |
前提: このシリーズの第1〜4回を読んでいることを前提とします。
目次
- 今回のゴール
- Strategy 抽象基底クラスの設計
- テクニカル指標の計算 — indicators.py
- 先物トレンドフォロー戦略の実装
- リスクマネージャー — 生き残るための防御壁
- 動的ポジションサイジング — 信頼度ベースの枚数決定
- 戦略の ON/OFF 制御
- まとめと次回予告
1. 今回のゴール
前回(第6回)でトレーディングエンジンという「司令塔」が完成しました。しかし、司令塔には肝心の判断基準がまだありません。
今回は自動売買システムの頭脳にあたる部分を構築します。具体的には以下の4つです。
- Strategy 抽象基底クラス — すべての戦略が従う共通インターフェース
- テクニカル指標の計算モジュール — RSI、SMA、MACD、ボリンジャーバンド
- 先物トレンドフォロー戦略 — 3つのインジケータの複合シグナルでエントリー
- リスクマネージャー + ポジションサイザー — 損失を限定し、確信度に応じて枚数を動的調整
完成後のディレクトリ構造はこのようになります。
src/systrade/
├── strategy/
│ ├── __init__.py
│ ├── base.py # 抽象基底クラス + Signal データクラス
│ └── futures_trend.py # 先物トレンドフォロー戦略
└── services/
├── indicators.py # テクニカル指標(RSI, SMA, MACD, BB)
├── risk_manager.py # リスク管理(損切り・利確・日次上限)
└── position_sizer.py # 動的ポジションサイジング
データの流れを整理すると、Tick データが Strategy に入り、Signal が生成され、RiskManager でリスクチェックされた後、PositionSizer で発注枚数が決定されてエンジンに渡される、という一方向のパイプラインになっています。
2. Strategy 抽象基底クラスの設計
なぜ抽象基底クラスが必要か
自動売買システムでは、将来的に複数の戦略を並行運用したくなるものです。たとえば「先物のトレンドフォロー」と「オプションの売り戦略」を同時に動かしたい、といったケースですね。
このとき、すべての戦略が同じインターフェースに従っていれば、トレーディングエンジンは戦略の中身を知らなくても統一的に扱えます。これが抽象基底クラスを使う理由です。
Signal データクラス
まず、戦略が出力する「売買シグナル」を定義します。
# src/systrade/strategy/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from src.systrade.models.market import Tick
from src.systrade.models.order import OrderRequest
class SignalType(str, Enum):
BUY = "buy"
SELL = "sell"
HOLD = "hold"
@dataclass
class Signal:
"""売買シグナル"""
signal_type: SignalType
symbol: str
reason: str = ""
strength: float = 0.0 # 0.0〜1.0
order_request: OrderRequest | None = None
context: dict | None = None # インジケータ値等のメタデータ
Signal の各フィールドの役割を整理しておきます。
| フィールド | 型 | 役割 |
|---|---|---|
signal_type |
SignalType |
BUY / SELL / HOLD の3択 |
symbol |
str |
対象銘柄コード |
reason |
str |
シグナル発生理由(ログ・UI表示用) |
strength |
float |
シグナルの強さ(0.0〜1.0)。ポジションサイジングで利用 |
order_request |
OrderRequest |
具体的な発注内容。エンジンがそのまま証券APIに送る |
context |
dict |
テクニカル指標の値などメタデータ。ポジションサイザーが参照 |
context フィールドが地味に重要です。ここにインジケータの数値(RSI値やMACD値など)を入れておくことで、後段のポジションサイザーが「どれくらい自信のあるシグナルか」を判断できるようになります。
抽象基底クラス
class Strategy(ABC):
"""戦略の抽象基底クラス"""
def __init__(self, name: str, enabled: bool = True) -> None:
self.name = name
self.enabled = enabled
@abstractmethod
async def on_tick(self, tick: Tick) -> Signal | None:
"""ティック受信時の処理"""
...
@abstractmethod
async def on_bar(self, bar: dict) -> Signal | None:
"""バー(OHLCV)確定時の処理"""
...
@abstractmethod
async def generate_signal(self) -> Signal | None:
"""現在の状態からシグナルを生成"""
...
def toggle(self, enabled: bool) -> None:
self.enabled = enabled
ポイントは3つの抽象メソッドです。
on_tick()— リアルタイムの価格更新ごとに呼ばれます。スキャルピングやトレーリングストップの判定に使いますon_bar()— ローソク足(OHLCV)が確定したタイミングで呼ばれます。SMAやMACDなど足ベースの指標はここで計算しますgenerate_signal()— 現在のインジケータ状態からシグナルを生成する純粋な判定ロジックです
この設計により、ティック駆動型の高頻度戦略からバー駆動型のスイング戦略まで、さまざまなスタイルに対応できます。
toggle() メソッドで戦略の ON/OFF を切り替えられるようにしている点も見逃せません。これについてはセクション7で詳しく解説します。
3. テクニカル指標の計算 — indicators.py
戦略が判断を下すために必要なテクニカル指標を計算するモジュールです。ここでは4つの指標を実装しています。
3-1. RSI(相対力指数)
RSI は「最近の値動きのうち、上昇がどれくらいの割合を占めているか」を 0〜100 の数値で表す指標です。一般的に 30以下で売られすぎ、70以上で買われすぎ と判断されます。
計算式は以下の通りです。
$$ RSI = 100 - \frac{100}{1 + RS} $$
$$ RS = \frac{\text{平均上昇幅}}{\text{平均下落幅}} $$
コードでは Wilder の平滑移動平均(ewm の alpha=1/period)を使っています。これにより直近の値動きにより敏感に反応します。
# src/systrade/services/indicators.py
import numpy as np
import pandas as pd
def add_rsi(df: pd.DataFrame, period: int = 14, column: str = "Close") -> pd.DataFrame:
"""RSI(相対力指数)を追加"""
delta = df[column].diff()
gain = delta.where(delta > 0, 0.0)
loss = (-delta).where(delta < 0, 0.0)
avg_gain = gain.ewm(alpha=1.0 / period, min_periods=period).mean()
avg_loss = loss.ewm(alpha=1.0 / period, min_periods=period).mean()
rs = avg_gain / avg_loss.replace(0, np.nan)
df["RSI"] = 100.0 - (100.0 / (1.0 + rs))
return df
avg_loss がゼロの場合(全期間上昇し続けた場合)にゼロ除算が起きないよう、replace(0, np.nan) で安全に処理しているのがポイントです。
3-2. SMA(単純移動平均)
SMA は指定期間の終値の単純平均です。短期SMA(デフォルト5期間) と長期SMA(デフォルト25期間) の2本を計算し、クロス(交差)でトレンド転換を検知します。
$$ SMA_n = \frac{1}{n} \sum_{i=0}^{n-1} P_{t-i} $$
def add_sma(
df: pd.DataFrame,
short: int = 5,
long: int = 25,
column: str = "Close",
) -> pd.DataFrame:
"""短期・長期SMA(単純移動平均)を追加"""
df["SMA_short"] = df[column].rolling(window=short).mean()
df["SMA_long"] = df[column].rolling(window=long).mean()
return df
- ゴールデンクロス: 短期SMAが長期SMAを下から上に突き抜ける → 上昇トレンドの兆し
- デッドクロス: 短期SMAが長期SMAを上から下に突き抜ける → 下降トレンドの兆し
3-3. MACD(移動平均収束拡散法)
MACD はトレンドの方向と勢いを同時に捉える指標です。短期EMA(指数移動平均)と長期EMAの差を取り、さらにその移動平均(シグナルライン)との差分(ヒストグラム)を計算します。
$$ MACD = EMA_{12} - EMA_{26} $$
$$ \text{Signal} = EMA_9(MACD) $$
$$ \text{Histogram} = MACD - \text{Signal} $$
def add_macd(
df: pd.DataFrame,
fast: int = 12,
slow: int = 26,
signal: int = 9,
column: str = "Close",
) -> pd.DataFrame:
"""MACD(移動平均収束拡散法)を追加"""
ema_fast = df[column].ewm(span=fast, adjust=False).mean()
ema_slow = df[column].ewm(span=slow, adjust=False).mean()
df["MACD"] = ema_fast - ema_slow
df["MACD_signal"] = df["MACD"].ewm(span=signal, adjust=False).mean()
df["MACD_hist"] = df["MACD"] - df["MACD_signal"]
return df
ヒストグラムが正ならMACDがシグナルラインの上にある(=上昇トレンドの勢いがある)ことを意味します。
3-4. ボリンジャーバンド
ボリンジャーバンドは移動平均を中心に、標準偏差の倍数分だけ上下にバンドを描きます。バンドの幅が狭まる「スクイーズ」は、大きな値動きの前兆とされています。
$$ \text{Upper} = SMA_{20} + 2\sigma $$
$$ \text{Lower} = SMA_{20} - 2\sigma $$
def add_bollinger_bands(
df: pd.DataFrame,
period: int = 20,
std_dev: float = 2.0,
column: str = "Close",
) -> pd.DataFrame:
"""ボリンジャーバンドを追加"""
sma = df[column].rolling(window=period).mean()
std = df[column].rolling(window=period).std()
df["BB_middle"] = sma
df["BB_upper"] = sma + std_dev * std
df["BB_lower"] = sma - std_dev * std
return df
一括計算の便利関数
4つの指標を毎回個別に呼ぶのは面倒なので、一括計算する関数も用意しています。
def add_all_indicators(
df: pd.DataFrame,
rsi_period: int = 14,
sma_short: int = 5,
sma_long: int = 25,
macd_fast: int = 12,
macd_slow: int = 26,
macd_signal: int = 9,
bb_period: int = 20,
bb_std: float = 2.0,
) -> pd.DataFrame:
"""全テクニカル指標を一括追加"""
df = add_rsi(df, period=rsi_period)
df = add_sma(df, short=sma_short, long=sma_long)
df = add_macd(df, fast=macd_fast, slow=macd_slow, signal=macd_signal)
df = add_bollinger_bands(df, period=bb_period, std_dev=bb_std)
return df
すべてのパラメータをキーワード引数で指定できるようにしているので、config.toml から読み込んだ設定値をそのまま渡せます。
実際にチャート上に表示する場合、RSI はメインチャートの下部にサブチャートとして配置し、SMA とボリンジャーバンドはメインチャートに重ね、MACD は別のサブチャートに表示するのが一般的なレイアウトです。
4. 先物トレンドフォロー戦略の実装
なぜ「複合シグナル」か
テクニカル指標はそれぞれ得意分野が異なります。
| 指標 | 得意 | 弱点 |
|---|---|---|
| SMA クロス | トレンド転換の検知 | レンジ相場でダマシが多い |
| RSI | 過熱感の検知 | 強いトレンドで天井・底を誤判定 |
| MACD | トレンドの勢いの把握 | 遅行性がある |
単一の指標だけに頼ると、どうしても弱点に引っかかる場面が出てきます。そこで、3つの指標が「同じ方向を向いたとき」だけ エントリーすることで、ダマシを大幅に減らすのが複合シグナルの考え方です。
FuturesTrendStrategy クラス
戦略の全体像を見ていきましょう。まずはコンストラクタから。
# src/systrade/strategy/futures_trend.py
import logging
from collections import deque
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
from src.systrade.config import FuturesTrendConfig
from src.systrade.models.market import Tick
from src.systrade.models.order import OrderRequest, OrderType, Side, TradeType
from src.systrade.services.indicators import add_all_indicators
from src.systrade.strategy.base import Signal, SignalType, Strategy
logger = logging.getLogger(__name__)
class FuturesTrendStrategy(Strategy):
"""先物トレンドフォロー戦略"""
def __init__(self, config: FuturesTrendConfig, symbol: str = "") -> None:
super().__init__(name="futures_trend", enabled=config.enabled)
self._config = config
self._symbol = symbol
self._prices: deque[float] = deque(maxlen=max(config.sma_long, 26) + 50)
self._current_side: Side | None = None
self._entry_price: float = 0.0
self._trailing_stop: float = 0.0
self._trailing_pct: float = 1.0 # トレーリングストップ 1%
いくつか注目すべきポイントがあります。
deque(双方向キュー) で価格履歴を保持しています。maxlenを設定することで、古い価格は自動的に捨てられます。メモリ効率がよいですねmaxlenはmax(config.sma_long, 26) + 50としています。SMAの長期期間とMACDのスロー期間(26)のうち大きい方に、余裕として50を足しています- トレーリングストップは1% に設定。日経ミニ先物だと約400円の値幅です
設定パラメータ(config.toml)
FuturesTrendConfig は以下のようなデータクラスで定義されています。
@dataclass
class FuturesTrendConfig:
enabled: bool = True
product: str = "mini"
sma_short: int = 5
sma_long: int = 25
rsi_period: int = 14
rsi_oversold: int = 30
rsi_overbought: int = 70
position_sizer: PositionSizerConfig = field(default_factory=PositionSizerConfig)
config.toml では次のように設定できます。
[strategy.futures_trend]
enabled = true
product = "mini"
sma_short = 5
sma_long = 25
rsi_period = 14
rsi_oversold = 30
rsi_overbought = 70
ティック受信 — on_tick()
async def on_tick(self, tick: Tick) -> Signal | None:
if tick.symbol != self._symbol or not self.enabled:
return None
self._prices.append(tick.price)
self._update_trailing_stop(tick.price)
# トレーリングストップ判定
if self._current_side == Side.BUY and tick.price <= self._trailing_stop:
return self._exit_signal("トレーリングストップ(ロング)")
if self._current_side == Side.SELL and tick.price >= self._trailing_stop:
return self._exit_signal("トレーリングストップ(ショート)")
return None
on_tick() はティックごとに呼ばれるため、処理は軽く保つ必要があります。ここではトレーリングストップの判定だけを行い、テクニカル指標の計算は on_bar() に任せています。
トレーリングストップの更新ロジック
def _update_trailing_stop(self, price: float) -> None:
if self._current_side == Side.BUY:
new_stop = price * (1 - self._trailing_pct / 100)
if new_stop > self._trailing_stop:
self._trailing_stop = new_stop
elif self._current_side == Side.SELL:
new_stop = price * (1 + self._trailing_pct / 100)
if new_stop < self._trailing_stop:
self._trailing_stop = new_stop
トレーリングストップのポイントは「有利な方向にだけ動く」ことです。
- ロングポジションの場合: 価格が上がるたびにストップ水準を切り上げる。下がってもストップは下がらない
- ショートポジションの場合: 価格が下がるたびにストップ水準を切り下げる。上がってもストップは上がらない
これにより、利益が乗ったポジションは「ある程度の利益を確保しつつ、さらなる上昇を狙える」という理想的な動きになります。ロングポジションの場合、価格上昇に追従してストップ水準が切り上がり、価格が反転してストップに到達した時点でイグジットされます。
バー確定時の処理 — on_bar() と generate_signal()
async def on_bar(self, bar: dict) -> Signal | None:
if not self.enabled:
return None
self._prices.append(bar.get("close", 0))
return await self.generate_signal()
on_bar() はシンプルに終値を追加して generate_signal() を呼ぶだけです。判定ロジックの本体を見てみましょう。
async def generate_signal(self) -> Signal | None:
if not self.enabled or len(self._prices) < self._config.sma_long + 10:
return None
df = pd.DataFrame({"Close": list(self._prices)})
df = add_all_indicators(
df,
rsi_period=self._config.rsi_period,
sma_short=self._config.sma_short,
sma_long=self._config.sma_long,
)
latest = df.iloc[-1]
prev = df.iloc[-2]
rsi = latest.get("RSI", 50)
sma_s = latest.get("SMA_short", 0)
sma_l = latest.get("SMA_long", 0)
sma_s_prev = prev.get("SMA_short", 0)
sma_l_prev = prev.get("SMA_long", 0)
macd = latest.get("MACD", 0)
macd_signal = latest.get("MACD_signal", 0)
macd_hist = latest.get("MACD_hist", 0)
ここで add_all_indicators() を呼んで、蓄積された価格データから一気に全インジケータを計算しています。最新の行(iloc[-1])と1つ前の行(iloc[-2])を取得し、SMAクロスの判定に使います。
エントリー条件 — 3つの指標の複合判定
# --- エントリー判定 ---
# ロング: SMAゴールデンクロス + RSI売られすぎ回復 + MACD>Signal
buy_cond = (
sma_s > sma_l
and sma_s_prev <= sma_l_prev
and rsi < self._config.rsi_overbought
and rsi > self._config.rsi_oversold
and macd > macd_signal
)
# ショート: SMAデッドクロス + RSI買われすぎ回復 + MACD<Signal
sell_cond = (
sma_s < sma_l
and sma_s_prev >= sma_l_prev
and rsi > self._config.rsi_oversold
and rsi < self._config.rsi_overbought
and macd < macd_signal
)
ロングエントリーの条件を図解すると次のようになります。
ロング(買い)エントリー条件:
┌─────────────────────────────────────────┐
│ (1) SMA ゴールデンクロス発生 │
│ 今回: SMA_short > SMA_long │
│ 前回: SMA_short <= SMA_long │
│ │
│ AND │
│ │
│ (2) RSI が適正範囲内 │
│ 30 < RSI < 70 │
│ (過熱していない状態) │
│ │
│ AND │
│ │
│ (3) MACD がシグナルラインを上回っている │
│ MACD > MACD_signal │
└─────────────────────────────────────────┘
ここで一点注意が必要なのは、RSI の条件が「30〜70の範囲内」であることです。「30以下で買い」ではなく、「売られすぎ(30以下)から回復して30を超えたが、まだ70には達していない」状態を狙っています。極端な過熱状態を避けるフィルターとして機能しています。
コンテキストの構築
# インジケータ context(PositionSizer 用)
sma_spread = (sma_s - sma_l) / sma_l if sma_l else 0.0
bb_width = (bb_upper - bb_lower) / bb_middle if bb_middle else 0.0
bb_range = bb_upper - bb_lower
bb_position = (close - bb_lower) / bb_range if bb_range else 0.5
context = {
"rsi": float(rsi),
"macd_hist": float(macd_hist),
"sma_spread": float(sma_spread),
"bb_width": float(bb_width),
"bb_position": float(bb_position),
}
このコンテキスト情報は、後段のポジションサイザーが「このシグナルはどれくらい信頼できるか」を数値化するために使います。
sma_spread: 短期SMAと長期SMAの乖離率。大きいほどトレンドが強いbb_width: ボリンジャーバンドの幅を中央値で正規化。小さいほどスクイーズ(収束)状態bb_position: 現在の価格がバンド内のどこに位置しているか(0.0=下限、1.0=上限)
エントリー / イグジット シグナルの生成
# ポジション無しの場合 → エントリー
if self._current_side is None:
if buy_cond:
return self._entry_signal(Side.BUY, "SMAクロス+RSI+MACD ロング", context)
if sell_cond:
return self._entry_signal(Side.SELL, "SMAクロス+RSI+MACD ショート", context)
return None
# ポジション有りの場合 → 逆シグナルでイグジット
if self._current_side == Side.BUY and sell_cond:
return self._exit_signal("逆シグナル(売り転換)")
if self._current_side == Side.SELL and buy_cond:
return self._exit_signal("逆シグナル(買い転換)")
return None
ポジションを持っていないときはエントリー、持っているときは逆方向のシグナルが出たらイグジットします。つまり、ロングポジション中にショートのシグナルが出たら手仕舞い という「ドテン」ではなく「一旦決済」のスタイルです。
ヘルパーメソッドも見ておきましょう。
def _entry_signal(self, side: Side, reason: str, context: dict | None = None) -> Signal:
return Signal(
signal_type=SignalType.BUY if side == Side.BUY else SignalType.SELL,
symbol=self._symbol,
reason=reason,
strength=0.7,
order_request=OrderRequest(
symbol=self._symbol,
side=side,
qty=1,
order_type=OrderType.MARKET,
trade_type=TradeType.NEW,
strategy_name=self.name,
),
context=context,
)
def _exit_signal(self, reason: str) -> Signal:
exit_side = Side.SELL if self._current_side == Side.BUY else Side.BUY
return Signal(
signal_type=SignalType.SELL if exit_side == Side.SELL else SignalType.BUY,
symbol=self._symbol,
reason=reason,
strength=1.0,
order_request=OrderRequest(
symbol=self._symbol,
side=exit_side,
qty=1,
order_type=OrderType.MARKET,
trade_type=TradeType.CLOSE,
strategy_name=self.name,
),
)
注目すべきは、エントリーシグナルの strength が 0.7 なのに対し、イグジットシグナルの strength は 1.0 です。「入るときは慎重に、出るときは迅速に」という原則を反映しています。また、発注タイプはすべて MARKET(成行注文)を使用しています。先物のトレンドフォローではスリッページよりも約定確実性を重視するためです。
5. リスクマネージャー -- 生き残るための防御壁
どんなに優れた戦略でも、リスク管理がなければ一度の暴落で口座が吹き飛びます。リスクマネージャーは「損失を限定し、利益を伸ばす」ための守りの仕組みです。
RiskConfig — 設定パラメータ
@dataclass
class RiskConfig:
max_position_futures: int = 5 # 先物の最大保有枚数
max_position_options: int = 10 # オプションの最大保有枚数
max_loss_per_day: float = 50_000 # 日次最大損失(円)
stop_loss_pct: float = 3.0 # 損切りライン(%)
take_profit_pct: float = 5.0 # 利確ライン(%)
trailing_stop_pct: float = 1.5 # トレーリングストップ幅(%)
use_trailing_stop: bool = True # トレーリングストップ有効化
daily_profit_target: float = 100_000 # 日次利益目標(円)
デフォルト値は「日経ミニ先物を5枚以内で日中トレード」を想定した設定です。日次損失は5万円が上限、利益目標は10万円としています。
注文前のリスクチェックフロー
check_order() メソッドがエンジンから呼ばれ、注文を出してよいかどうかを判定します。
注文リクエスト受信
│
▼
┌──────────────────────┐
│ 日次損失上限チェック │ ← 本日の実現損益が -50,000円 以下か?
│ (max_loss_per_day) │
└──────────┬───────────┘
│ OK
▼
┌──────────────────────┐
│ ポジション上限チェック │ ← 先物5枚 / OP10枚 を超えないか?
│ (max_position_*) │
└──────────┬───────────┘
│ OK
▼
✅ 注文許可
コードを見てみましょう。
class RiskManager:
"""リスク管理"""
def __init__(self, config: RiskConfig, pnl_repo: PnLRepository | None = None) -> None:
self._config = config
self._pnl_repo = pnl_repo
self._realized_pnl_today: float = 0.0
self._peak_pnl: dict[str, float] = {} # 銘柄別の最高含み益%
def check_order(
self,
request: OrderRequest,
current_positions: list[Position],
margin_available: float = float("inf"),
) -> RiskCheck:
"""注文前のリスクチェック"""
# 日次損失上限チェック
if self._realized_pnl_today <= -self._config.max_loss_per_day:
return RiskCheck(
allowed=False,
reason=f"日次損失上限到達: {self._realized_pnl_today:.0f}円",
)
# ポジション上限チェック
futures_count = sum(
p.qty
for p in current_positions
if p.product_type
in (
ProductType.FUTURES_LARGE,
ProductType.FUTURES_MINI,
ProductType.FUTURES_MICRO,
)
)
options_count = sum(
p.qty
for p in current_positions
if p.product_type
in (ProductType.OPTION_CALL, ProductType.OPTION_PUT)
)
# 新規注文のみチェック(決済注文はチェック不要)
if request.trade_type == TradeType.NEW:
is_option = any(
pt.value in request.symbol.lower()
for pt in (ProductType.OPTION_CALL, ProductType.OPTION_PUT)
)
if is_option:
if options_count + request.qty > self._config.max_position_options:
return RiskCheck(
allowed=False,
reason=f"OP枚数上限: {options_count}+{request.qty} > {self._config.max_position_options}",
)
else:
if futures_count + request.qty > self._config.max_position_futures:
return RiskCheck(
allowed=False,
reason=f"先物枚数上限: {futures_count}+{request.qty} > {self._config.max_position_futures}",
)
return RiskCheck(allowed=True)
ここで重要なのは、決済注文に対してはポジション上限チェックをスキップしている 点です。損切りの決済注文が「枚数上限に引っかかって通らない」となったら大変ですからね。
損切り・利確・トレーリングストップ
ポジション保有中は3つの判定が常に走ります。
def check_stop_loss(self, position: Position) -> bool:
"""損切り判定(最優先)"""
if position.avg_price == 0:
return False
pnl_pct = self._calc_pnl_pct(position)
if pnl_pct <= -self._config.stop_loss_pct:
logger.warning(
"損切りシグナル: %s 含み損 %.2f%%", position.symbol, pnl_pct
)
return True
return False
def check_take_profit(self, position: Position) -> bool:
"""利確判定"""
if position.avg_price == 0:
return False
pnl_pct = self._calc_pnl_pct(position)
if pnl_pct >= self._config.take_profit_pct:
logger.info(
"利確シグナル: %s 含み益 %.2f%%", position.symbol, pnl_pct
)
return True
return False
def check_trailing_stop(self, position: Position) -> bool:
"""トレーリングストップ判定(ピーク含み益から下落で利確)"""
if not self._config.use_trailing_stop or position.avg_price == 0:
return False
pnl_pct = self._calc_pnl_pct(position)
# ピーク更新
prev_peak = self._peak_pnl.get(position.symbol, 0.0)
if pnl_pct > prev_peak:
self._peak_pnl[position.symbol] = pnl_pct
return False
peak = self._peak_pnl.get(position.symbol, 0.0)
# ピークが利益圏にあり、そこからtrailing_stop_pct分下落したらトリガー
if peak > self._config.trailing_stop_pct and (peak - pnl_pct) >= self._config.trailing_stop_pct:
logger.info(
"トレーリングストップ: %s ピーク %.2f%% → 現在 %.2f%%",
position.symbol, peak, pnl_pct,
)
return True
return False
3つの判定の優先順位と動作をまとめます。
| 判定 | 条件 | 優先度 | 意味 |
|---|---|---|---|
| 損切り | 含み損 >= 3.0% | 最高 | 損失が拡大する前に強制退場 |
| 利確 | 含み益 >= 5.0% | 中 | 目標利益を確保 |
| トレーリングストップ | ピーク含み益から 1.5% 下落 | 中 | 利益を伸ばしつつ、反転時にも利益を確保 |
トレーリングストップの挙動をもう少し詳しく説明します。たとえばこんなケースです。
(1) エントリー: 含み益 0.0% → ピーク記録: 0.0%
(2) 価格上昇: 含み益 +2.0% → ピーク更新: 2.0%
(3) さらに上昇: 含み益 +3.5% → ピーク更新: 3.5% ← ピーク > 1.5% なのでトレーリング有効
(4) 少し下落: 含み益 +2.5% → 3.5% - 2.5% = 1.0% < 1.5% → まだOK
(5) さらに下落: 含み益 +1.8% → 3.5% - 1.8% = 1.7% >= 1.5% → トリガー! 利確
ピーク含み益が +3.5% まで伸びた後、1.5% 以上下落して +1.8% になった時点で自動的に利確されます。+1.8% の利益は確保できるわけですね。
DB との連携
リスクマネージャーはDBと連携して、日次の損益状態を永続化しています。
async def restore_from_db(self) -> None:
"""DBから本日のPnL状態を復元"""
if not self._pnl_repo:
return
try:
data = await self._pnl_repo.get_today()
if data:
self._realized_pnl_today = data["realized_pnl"]
self._peak_pnl = data.get("peak_pnl", {})
except Exception as e:
logger.error("PnL復元エラー: %s", e)
async def save_to_db(self, unrealized_pnl: float = 0.0) -> None:
"""現在のPnL状態をDBに保存"""
if not self._pnl_repo:
return
try:
await self._pnl_repo.upsert_daily(
realized_pnl=self._realized_pnl_today,
unrealized_pnl=unrealized_pnl,
peak_pnl=self._peak_pnl,
)
except Exception as e:
logger.error("PnL保存エラー: %s", e)
アプリケーションの再起動時に restore_from_db() を呼ぶことで、「再起動したら日次損失上限がリセットされてしまった」という事故を防ぎます。本番運用では非常に大事なポイントです。
日次目標 / 損失上限の到達判定
def is_daily_target_reached(self) -> bool:
"""日次目標利益に達したか"""
if self._realized_pnl_today >= self._config.daily_profit_target:
return True
return False
def is_daily_loss_limit(self) -> bool:
"""日次損失上限に達したか"""
if self._realized_pnl_today <= -self._config.max_loss_per_day:
return True
return False
これらのメソッドをエンジン側で定期的にチェックし、日次目標 (+100,000円) に達したらその日の新規エントリーを停止する、日次損失上限 (-50,000円) に達したら全ポジションを強制決済する、といった制御が可能になります。
6. 動的ポジションサイジング -- 信頼度ベースの枚数決定
なぜ動的サイジングか
「毎回1枚ずつ」という固定枚数の運用はシンプルですが、最適ではありません。確信度の高いシグナルでは多めに、低いシグナルでは少なめに 張ることで、期待値を最大化できます。
ポジションサイザーは 4つのスコアリング基準 で信頼度(confidence)を 0.0〜1.0 で算出し、発注枚数に変換します。
設定パラメータ
@dataclass
class PositionSizerConfig:
enabled: bool = True
min_qty: int = 1 # 最小枚数
max_qty: int = 3 # 最大枚数
weight_trend: float = 0.3 # トレンド強度の重み
weight_confluence: float = 0.3 # インジケータ合致度の重み
weight_daily_pnl: float = 0.2 # 日次損益の重み
weight_volatility: float = 0.2 # ボラティリティの重み
confidence_threshold: float = 0.6 # この閾値未満は最小枚数
4つの重みの合計が 1.0 になるように設定するのがポイントです。
全体の流れ
class PositionSizer:
"""シグナルの context と日次損益から発注枚数を決定する"""
def __init__(self, config: PositionSizerConfig) -> None:
self._config = config
def calculate_qty(self, signal: Signal, daily_pnl: float = 0.0) -> int:
# 決済注文はサイジング対象外(元の qty をそのまま返す)
if not self._config.enabled:
return signal.order_request.qty if signal.order_request else 1
if signal.order_request and signal.order_request.trade_type == TradeType.CLOSE:
return signal.order_request.qty
ctx = signal.context or {}
is_buy = signal.signal_type == SignalType.BUY
trend = self._score_trend(ctx.get("sma_spread", 0.0))
confluence = self._score_confluence(ctx, is_buy)
pnl = self._score_daily_pnl(daily_pnl)
vol = self._score_volatility(ctx.get("bb_width", 0.0))
w = self._config
confidence = (
w.weight_trend * trend
+ w.weight_confluence * confluence
+ w.weight_daily_pnl * pnl
+ w.weight_volatility * vol
)
confidence = max(0.0, min(1.0, confidence))
qty = self._map_qty(confidence)
return qty
信頼度の計算式を数式で表すと以下のようになります。
$$ \text{confidence} = 0.3 \times S_{\text{trend}} + 0.3 \times S_{\text{confluence}} + 0.2 \times S_{\text{pnl}} + 0.2 \times S_{\text{vol}} $$
各スコア $S$ はすべて 0.0〜1.0 の範囲に正規化されます。
4つのスコアリング関数
それぞれの関数を詳しく見ていきましょう。
(1) トレンド強度スコア
@staticmethod
def _score_trend(sma_spread: float) -> float:
"""トレンド強度: SMA乖離率の絶対値で判定。
0.5%未満 → 0.0, 1.5%以上 → 1.0, 間は線形補間。
"""
abs_spread = abs(sma_spread) * 100 # %換算
if abs_spread < 0.5:
return 0.0
if abs_spread >= 1.5:
return 1.0
return (abs_spread - 0.5) / 1.0
SMAの短期と長期の乖離率(スプレッド)が大きいほど、はっきりしたトレンドが出ていることを意味します。0.5% 未満はノイズと見なしてスコア 0、1.5% 以上は明確なトレンドとしてスコア 1.0 です。
(2) インジケータ合致度スコア
@staticmethod
def _score_confluence(ctx: dict, is_buy: bool) -> float:
"""インジケータ合致度: RSI方向 + MACD方向 + BB位置の一致数 / 3。"""
matches = 0
# RSI: 買いなら50以下(売られすぎ寄り)、売りなら50以上
rsi = ctx.get("rsi", 50.0)
if is_buy and rsi < 50:
matches += 1
elif not is_buy and rsi > 50:
matches += 1
# MACD_hist: 買いならプラス、売りならマイナス
macd_hist = ctx.get("macd_hist", 0.0)
if is_buy and macd_hist > 0:
matches += 1
elif not is_buy and macd_hist < 0:
matches += 1
# BB位置: 買いなら下半分(< 0.5)、売りなら上半分(> 0.5)
bb_pos = ctx.get("bb_position", 0.5)
if is_buy and bb_pos < 0.5:
matches += 1
elif not is_buy and bb_pos > 0.5:
matches += 1
return matches / 3.0
3つの指標がシグナルの方向と一致しているかを数えます。3つ中3つ一致なら 1.0、2つなら約0.67、1つなら約0.33 です。
(3) 日次損益スコア
@staticmethod
def _score_daily_pnl(daily_pnl: float) -> float:
"""+3万以上 → 1.0, 0〜3万 → 0.5, マイナス → 0.0。"""
if daily_pnl < 0:
return 0.0
if daily_pnl >= 30_000:
return 1.0
return 0.5
「勝っている日はもう少し攻めてよい、負けている日は守りに入る」という発想です。マイナスの日はスコアが 0 になり、枚数が自動的に抑制されます。
(4) ボラティリティスコア
@staticmethod
def _score_volatility(bb_width: float) -> float:
"""BB幅が狭い(スクイーズ) → 1.0, 広い → 0.0。
bb_width < 0.02 → 1.0, > 0.06 → 0.0, 間は線形。
"""
if bb_width <= 0.02:
return 1.0
if bb_width >= 0.06:
return 0.0
return 1.0 - (bb_width - 0.02) / 0.04
ボリンジャーバンドが「スクイーズ(収縮)」しているとき、つまりボラティリティが低いときに高スコアを返します。スクイーズの後はブレイクアウトが起きやすく、トレンドフォロー戦略にとって好条件だからです。
信頼度 → 枚数のマッピング
def _map_qty(self, confidence: float) -> int:
"""confidence → qty のマッピング。
confidence < threshold → min_qty
threshold 以上 → min_qty から max_qty へ線形補間。
"""
cfg = self._config
if confidence < cfg.confidence_threshold:
return cfg.min_qty
ratio = (confidence - cfg.confidence_threshold) / (1.0 - cfg.confidence_threshold)
ratio = max(0.0, min(1.0, ratio))
qty = cfg.min_qty + ratio * (cfg.max_qty - cfg.min_qty)
return max(cfg.min_qty, min(cfg.max_qty, math.floor(qty + 0.5)))
閾値(デフォルト 0.6)を下回ると最小枚数(1枚)、閾値以上は 1〜3枚に線形マッピングされます。
具体例で理解する
実際の数値で計算してみましょう。
ケース: 好条件のロングシグナル
context:
- sma_spread: 0.012 (1.2%) → SMAが1.2%乖離
- rsi: 42 → 50以下 = 買い方向と一致
- macd_hist: 15.5 → プラス = 買い方向と一致
- bb_position: 0.35 → 下半分 = 買い方向と一致
- bb_width: 0.025 → スクイーズ気味
daily_pnl: +15,000円(本日プラス)
各スコアを計算します。
(1) トレンド強度:
abs_spread = 1.2% → (1.2 - 0.5) / 1.0 = 0.70
(2) インジケータ合致度:
RSI < 50 ✓, MACD_hist > 0 ✓, BB_pos < 0.5 ✓
matches = 3/3 = 1.00
(3) 日次損益:
15,000円 → 0〜30,000円の範囲 → 0.50
(4) ボラティリティ:
bb_width = 0.025 → 1.0 - (0.025 - 0.02) / 0.04 = 0.875
信頼度を計算します。
confidence = 0.3 * 0.70 + 0.3 * 1.00 + 0.2 * 0.50 + 0.2 * 0.875
= 0.21 + 0.30 + 0.10 + 0.175
= 0.785
枚数にマッピングします。
confidence = 0.785 >= threshold (0.6)
ratio = (0.785 - 0.6) / (1.0 - 0.6) = 0.4625
qty = 1 + 0.4625 * (3 - 1) = 1 + 0.925 = 1.925 → 四捨五入 → 2枚
好条件が揃ったので、最小の1枚ではなく 2枚 でエントリーすることになります。
7. 戦略の ON/OFF 制御
ダッシュボードから戦略を動的にON/OFFできるのは、運用上とても便利です。
Strategy 基底クラスの toggle()
class Strategy(ABC):
def __init__(self, name: str, enabled: bool = True) -> None:
self.name = name
self.enabled = enabled
def toggle(self, enabled: bool) -> None:
self.enabled = enabled
シンプルに enabled フラグを切り替えるだけです。各メソッドの冒頭で if not self.enabled: return None をチェックしているため、OFF にした瞬間から新しいシグナルは一切生成されなくなります。
実際のガード処理
FuturesTrendStrategy の各メソッドでは、以下のようにガードが入っています。
async def on_tick(self, tick: Tick) -> Signal | None:
if tick.symbol != self._symbol or not self.enabled:
return None
# ...
async def on_bar(self, bar: dict) -> Signal | None:
if not self.enabled:
return None
# ...
async def generate_signal(self) -> Signal | None:
if not self.enabled or len(self._prices) < self._config.sma_long + 10:
return None
# ...
ここで一点注意が必要なのは、戦略をOFFにしても既存のポジションは自動的に決済されない ということです。「戦略OFF = 新規シグナル停止」であり、ポジション管理は別の責務(エンジンやリスクマネージャー)が担当します。
htmx のダッシュボードからは、以下のようなリクエストで戦略を切り替えられます。
<button
hx-post="/api/strategy/futures_trend/toggle"
hx-vals='{"enabled": false}'
hx-swap="outerHTML"
class="btn btn-warning"
>
戦略を停止
</button>
ダッシュボードの戦略管理パネルでは、各戦略に ON/OFF トグルスイッチが配置されており、現在のシグナル状態やインジケータ値がリアルタイムで表示されます。
8. まとめと次回予告
まとめ
今回は自動売買システムの「頭脳」部分を構築しました。要点を整理します。
- Strategy 抽象基底クラス は
on_tick()/on_bar()/generate_signal()の3つのメソッドを定義し、あらゆるスタイルの戦略を統一的に扱えるようにした - テクニカル指標モジュール(indicators.py) では RSI / SMA / MACD / ボリンジャーバンドの4指標を実装。すべて pandas の DataFrame に列として追加する方式で、パラメータは設定ファイルから注入できる
- 先物トレンドフォロー戦略 は SMA クロス + RSI フィルター + MACD 確認の3条件複合シグナルで、単一指標のダマシを低減している
- リスクマネージャー は日次損失上限(5万円)・ポジション上限(先物5枚)・損切り(3%)・利確(5%)・トレーリングストップ(ピークから1.5%下落)の多層防御を実現。DB連携で再起動時も状態を復元できる
- ポジションサイザー はトレンド強度・インジケータ合致度・日次損益・ボラティリティの4軸で信頼度(0.0〜1.0)を算出し、1〜3枚の範囲で動的に枚数を調整する
- 戦略の ON/OFF は
toggle()メソッドで即座に切り替え可能。ダッシュボードの htmx ボタンからリアルタイム制御できる
次回予告
次回の第8回(最終回) は「バックテストと本番デプロイ」です!
- 今回作った戦略を過去データでバックテストする仕組み
- Plotly でバックテスト結果を可視化する方法
- Docker Compose による本番環境のデプロイ
- systemd でのプロセス管理と自動再起動
- 本番運用で気をつけるべきこと
いよいよ自動売買システムが本番稼働します。最後まで読んでくださりありがとうございました! 次回もお楽しみに!