Sync
Handbook
【第1回】環境構築とプロジェクト設計【第4回】DB設計とAlembicマイグレーション
【第2回】htmx でSPA風ダッシュボードを作る【第3回】ダークテーマUIとコンポーネント設計
【第5回】証券APIとの接続npm install さようなら — FastAPI + htmx でトレーディングシステムを作るnpm install さようなら — FastAPI + htmx でトレーディングシステムを作る
npm install さようなら — FastAPI + htmx でトレーディングシステムを作る
Zennライクな記事プラットフォームを自作するZennライクな記事プラットフォームを自作するClaude Code「スキル」と「エージェント」徹底解説
Google GeminiとImagen 4の全貌
OpenAI / ChatGPTAI & 機械学習アイデア & ノート

その他

Zennライクな記事プラットフォームを自作するGemini画像生成技術のレビュー

© 2026 Sync. All rights reserved.

Tech
2026/2/12

npm install さようなら — FastAPI + htmx でトレーディングシステムを作る

【第6回】トレーディングエンジン — 司令塔の設計

npm install さようなら — FastAPI + htmx でトレーディングシステムを作る

このシリーズについて

「npm install さようなら — FastAPI + htmx でトレーディングシステムを作る」シリーズの第6回へようこそ!

本シリーズは全8回構成で、日経225先物・オプションの自動売買ダッシュボードをゼロから構築していきます。

# タイトル 状態
1 環境構築とプロジェクト設計 公開済み
2 htmx でSPA風ダッシュボードを作る 公開済み
3 ダークテーマUIとコンポーネント設計 公開済み
4 DB設計とAlembicマイグレーション 公開済み
5 証券APIとの接続 公開済み
6 トレーディングエンジン — 司令塔の設計(本記事) 今ここ
7 戦略とリスク管理 近日公開
8 バックテストと本番デプロイ 近日公開

前提: 第1〜4回を読んでいることを前提としますので、未読の方はそちらから読み進めてください。特に第4回のDB設計(リポジトリパターン)と第5回の証券API接続の知識が本記事で活きてきます。


目次

  1. エンジンの全体アーキテクチャ
  2. EngineState — ステートマシン設計
  3. ライフサイクル — start() から stop() まで
  4. バックグラウンドタスク群 — エンジンの心臓部
  5. シグナル処理パイプライン
  6. OrderManager — 注文の一生を管理する
  7. 損切り最優先アーキテクチャ
  8. app.state との連携
  9. まとめと次回予告

1. エンジンの全体アーキテクチャ

まずはトレーディングエンジンの全体像を俯瞰しましょう。このエンジンは、前回(第5回)で構築した証券API接続層の上に乗る「司令塔」です。市場データの受信からシグナル生成、リスクチェック、発注まで、すべてのフローを統括します。

┌─────────────────────────────────────────────────────────────┐
│                     TradingEngine(司令塔)                    │
│                                                             │
│  ┌──────────┐   ┌──────────────┐   ┌───────────────────┐   │
│  │EngineState│   │  戦略群        │   │  RiskManager      │   │
│  │ステート    │   │ FuturesTrend  │   │  損切り/利確/      │   │
│  │マシン     │   │ OptionSeller  │   │  トレーリング/     │   │
│  │          │   │ DeltaHedge    │   │  日次損失上限      │   │
│  └──────────┘   └──────┬───────┘   └─────────┬─────────┘   │
│                        │  Signal               │ RiskCheck   │
│                        v                       v             │
│              ┌──────────────────────────┐                    │
│              │  _process_signal()       │                    │
│              │  シグナル処理パイプライン    │                    │
│              └────────────┬─────────────┘                    │
│                           │ OrderRequest                     │
│                           v                                  │
│              ┌──────────────────────────┐                    │
│              │  OrderManager            │                    │
│              │  発注 / 取消 / 約定追跡    │                    │
│              └────────────┬─────────────┘                    │
│                           │                                  │
├───────────────────────────┼──────────────────────────────────┤
│                           v                                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │               MarketDataService                      │    │
│  │  KabuWebSocket(PUSH) ──> ティック ──> コールバック     │    │
│  └─────────────────────────────────────────────────────┘    │
│                           │                                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │               KabuApi(REST)                        │    │
│  │  認証 / 銘柄解決 / 板情報 / 発注 / ポジション取得      │    │
│  └─────────────────────────────────────────────────────┘    │
├─────────────────────────────────────────────────────────────┤
│  バックグラウンドタスク群                                      │
│  [注文同期 5s] [ポジション監視 3s] [バー確定 60s] [OP戦略]    │
└─────────────────────────────────────────────────────────────┘

データの流れは 下から上 です。WebSocketで受信したティックデータが MarketDataService を経由して戦略に届き、戦略がシグナルを生成し、リスクチェックを通過した注文だけが OrderManager 経由でブローカーに送信されます。


2. EngineState -- ステートマシン設計

トレーディングエンジンのような長時間稼働するシステムでは、「今エンジンがどういう状態にあるのか」を厳密に管理することが極めて重要です。起動中にもう一度起動を叩いたり、停止中に注文が流れたりしたら大変ですよね。

本システムでは、エンジンの状態を 5つの EngineState で管理しています。

class EngineState(str, Enum):
    STOPPED = "stopped"      # 停止中
    STARTING = "starting"    # 起動処理中
    RUNNING = "running"      # 稼働中
    STOPPING = "stopping"    # 停止処理中
    ERROR = "error"          # エラー発生

状態遷移は次のとおりです。

                  start()
  STOPPED ──────────────> STARTING
     ^                       │
     │                       │ 成功
     │                       v
     │                    RUNNING
     │                       │
     │         stop()        │
     │    <──────────── STOPPING
     │
     │    start()失敗
     └──────────────── ERROR
                          │
                          │ start() で復帰可能
                          v
                       STARTING ...

ポイントは str を継承した Enum にしていることです。これにより、JSON シリアライズが自動的に文字列になるため、FastAPI のレスポンスや htmx のSSE配信でそのまま使えます。第2回で作ったダッシュボードに state: "running" のような形でリアルタイム表示できるわけですね。

もう一つ注目してほしいのが、STARTING と STOPPING という 中間状態 の存在です。認証やWebSocket接続には数秒かかるため、その間に重複起動されないようガードしています。

async def start(self) -> None:
    """エンジン起動: 認証 → 銘柄解決 → 戦略初期化 → バックグラウンドタスク開始"""
    if self._state == EngineState.RUNNING:
        logger.warning("エンジンは既に稼働中です")
        return

    self._state = EngineState.STARTING
    self._error_message = ""
    try:
        # ... 起動処理 ...
        self._state = EngineState.RUNNING
    except Exception as e:
        self._state = EngineState.ERROR
        self._error_message = str(e)
        await self._cleanup()
        raise

RUNNING の場合は早期リターンし、try/except で失敗時は ERROR に遷移してクリーンアップを行います。_error_message にエラー内容を保持しているので、ダッシュボードのUI側でユーザーに何が起きたか表示できます。


3. ライフサイクル -- start() から stop() まで

3.1 start() -- 8ステップの起動シーケンス

start() メソッドは、エンジンの「心臓を動かし始める」処理です。8つのステップを順番に実行していきます。

async def start(self) -> None:
    self._state = EngineState.STARTING

    # 1. ブローカー認証
    self._broker = KabuApi(self._config.broker)
    await self._broker.authenticate()

    # 2. 先物銘柄解決(期近限月の自動取得)
    product = "NK225mini" if self._config.strategy.futures_trend.product == "mini" else "NK225"
    info = await self._broker.get_symbol_info_future(0, product)
    self._futures_symbol = info.symbol

    # 3. PUSH配信の登録
    await self._broker.register([
        {"Symbol": self._futures_symbol, "Exchange": 2}
    ])

    # 4. サービス初期化(DB リポジトリの注入)
    self._ws = KabuWebSocket()
    self._risk_manager = RiskManager(self._config.risk, pnl_repo=pnl_repo)
    await self._risk_manager.restore_from_db()
    self._order_manager = OrderManager(self._broker, self._risk_manager, order_repo=order_repo)
    self._market_data = MarketDataService(self._broker, self._ws, tick_repo=tick_repo)

    # 5. 戦略初期化
    self._init_strategies()

    # 6. 既存ポジション復元
    await self._restore_positions()

    # 7. ティックコールバック登録 → MarketData開始
    self._market_data.on_tick(self._on_tick)
    await self._market_data.start()

    # 8. バックグラウンドタスク起動
    self._tasks = [
        asyncio.create_task(self._order_sync_loop()),
        asyncio.create_task(self._position_monitor_loop()),
        asyncio.create_task(self._bar_builder_loop()),
    ]

    self._state = EngineState.RUNNING
    self._started_at = datetime.now()

各ステップの役割を整理します。

ステップ 処理 解説
1 ブローカー認証 kabuステーションAPIにトークンを取得
2 銘柄解決 get_symbol_info_future(0, ...) で期近限月の銘柄コードを自動取得
3 PUSH登録 WebSocketでリアルタイムティックを受信するための銘柄登録
4 サービス初期化 リスク管理・注文管理・市場データの各サービスをDIで組み立て
5 戦略初期化 先物トレンド・オプション売り・デルタヘッジの3戦略を構築
6 ポジション復元 ブローカーから既存ポジションを取得し、戦略の状態にセット
7 ティック配信開始 WebSocket接続を開始し、コールバックチェーンを構築
8 バックグラウンドタスク 注文同期・ポジション監視・バー確定のループを起動

特に ステップ6のポジション復元 は見落としがちですが非常に重要です。エンジンを再起動した際に、既に保有しているポジションの存在を戦略に伝えないと、「ポジションがあるのに新規エントリーしてしまう」という事故が起きます。

async def _restore_positions(self) -> None:
    """既存ポジションからの戦略状態復元"""
    positions = await self._broker.get_positions()
    for pos in positions:
        if pos.product_type in (
            ProductType.FUTURES_LARGE,
            ProductType.FUTURES_MINI,
            ProductType.FUTURES_MICRO,
        ):
            if self._futures_strategy and pos.symbol == self._futures_symbol:
                self._futures_strategy.set_position(pos.side, pos.avg_price)
                logger.info(
                    "先物ポジション復元: %s %s @ %.0f",
                    pos.side.value, pos.symbol, pos.avg_price,
                )

3.2 stop() -- 安全な停止処理

停止処理は起動の逆順で、まずバックグラウンドタスクを止めてから、注文の取消やリソースのクリーンアップを行います。

async def stop(self, cancel_orders: bool = True, close_positions: bool = False) -> None:
    if self._state == EngineState.STOPPED:
        return

    self._state = EngineState.STOPPING

    # 1. バックグラウンドタスク停止
    for task in self._tasks:
        task.cancel()
    for task in self._tasks:
        try:
            await task
        except asyncio.CancelledError:
            pass
    self._tasks.clear()

    # 2. 注文取消(オプション)
    if cancel_orders and self._order_manager:
        count = await self._order_manager.cancel_all()
        logger.info("注文取消: %d件", count)

    # 3. 全決済(オプション)
    if close_positions:
        await self._close_all_positions()

    # 4. リソースクリーンアップ
    await self._cleanup()
    self._state = EngineState.STOPPED

ここで注目してほしいのが、stop() の引数です。

  • cancel_orders=True: デフォルトで未約定の注文をすべて取り消します
  • close_positions=False: デフォルトではポジションは保持します。明示的に True を渡したときだけ全決済します

なぜデフォルトで全決済しないのか? それは、エンジンの再起動やメンテナンス時にポジションを意図せず手仕舞いしたくないケースが多いからです。全決済は「日次終了時」や「緊急停止時」だけ使います。

タスクのキャンセルでは、task.cancel() を呼んでから await task で CancelledError を拾っています。これはPythonの asyncio で非同期タスクを安全に停止するイディオムです。各バックグラウンドタスク内でも except asyncio.CancelledError: return でキャンセルを正しくハンドリングしています。


4. バックグラウンドタスク群 -- エンジンの心臓部

エンジンが RUNNING 状態の間、複数のバックグラウンドタスクが並行して動き続けます。これらは asyncio.create_task() で起動される非同期ループです。

┌───────────────────────────────────────────────────────┐
│              バックグラウンドタスク群                      │
│                                                       │
│  ┌─────────────┐  ┌─────────────┐  ┌──────────────┐  │
│  │注文同期ループ │  │ポジション    │  │バー確定ループ │  │
│  │_order_sync  │  │監視ループ    │  │_bar_builder  │  │
│  │_loop()      │  │_position    │  │_loop()       │  │
│  │             │  │_monitor     │  │              │  │
│  │ 5秒間隔     │  │_loop()      │  │ 60秒間隔     │  │
│  │             │  │             │  │              │  │
│  │・注文同期    │  │ 3秒間隔     │  │・1分足確定   │  │
│  │・約定検出    │  │             │  │・戦略on_bar  │  │
│  │・日次PnL    │  │・損切り判定  │  │  呼び出し    │  │
│  │  チェック    │  │・トレーリング│  │              │  │
│  │・PnL保存    │  │  ストップ   │  │              │  │
│  │             │  │・利確判定    │  │              │  │
│  └─────────────┘  └─────────────┘  └──────────────┘  │
│                                                       │
│  ┌─────────────────────────────────────────────────┐  │
│  │ オプション戦略ループ _options_loop()              │  │
│  │ (config.options.chain_refresh_interval 間隔)     │  │
│  │ ・チェーン取得 ・IV/グリークス計算 ・デルタヘッジ   │  │
│  │ ※ options_seller.enabled=true のときだけ起動      │  │
│  └─────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────┘

各タスクを詳しく見ていきましょう。

4.1 注文同期ループ (5秒間隔)

ブローカーの注文状態をポーリングで同期し、約定を検出するタスクです。

async def _order_sync_loop(self) -> None:
    """注文同期ループ(5秒毎)"""
    while True:
        try:
            await asyncio.sleep(5)
            if self._state != EngineState.RUNNING or not self._order_manager:
                continue

            # ブローカーと同期
            prev_filled = set(o.id for o in self._order_manager.filled_orders)
            await self._order_manager.sync_orders()

            # 新規約定の処理
            for order in self._order_manager.filled_orders:
                if order.id not in prev_filled:
                    self._on_order_filled(order)

            # 日次損失上限チェック
            if self._risk_manager and self._risk_manager.is_daily_loss_limit():
                if not self._daily_loss_stopped:
                    self._daily_loss_stopped = True
                    await self._order_manager.cancel_all()
                    self._disable_all_strategies()

            # 日次目標チェック
            if self._risk_manager and self._risk_manager.is_daily_target_reached():
                if not self._daily_target_reached:
                    self._daily_target_reached = True

            # PnL状態をDBに定期保存
            if self._risk_manager:
                unrealized = 0.0
                if self._broker:
                    positions = await self._broker.get_positions()
                    unrealized = sum(p.unrealized_pnl for p in positions)
                await self._risk_manager.save_to_db(unrealized)

        except asyncio.CancelledError:
            return
        except Exception as e:
            logger.error("注文同期エラー: %s", e)

このループには重要な責務が 4つ 詰まっています。

  1. 注文状態の同期: OrderManager.sync_orders() でブローカーのAPIから最新の注文状態を取得
  2. 約定検出: 前回のsync時点からの新規約定を検出し、_on_order_filled() で後処理(通知・戦略状態リセット)
  3. 日次リスクチェック: 損失上限到達で全注文取消 + 全戦略無効化、目標利益到達で新規注文停止
  4. PnL永続化: 実現損益 + 含み損益をDBに定期保存(第4回で設計したリポジトリ経由)

「なぜWebSocketの約定通知ではなくポーリングなのか?」と疑問に思うかもしれません。kabuステーションAPIのPUSH通知は板情報がメインで、注文の約定通知は別途ポーリングが必要だからです。5秒間隔なら十分な応答速度ですし、ポーリングのほうが状態の整合性を保ちやすいというメリットもあります。

4.2 ポジション監視ループ (3秒間隔)

損切りの最前線 がこのタスクです。全ポジションの含み損益を3秒ごとにチェックし、損切り条件に抵触したら即座に成行決済シグナルを出します。

async def _position_monitor_loop(self) -> None:
    """ポジション監視ループ(3秒毎): 損切り最優先"""
    while True:
        try:
            await asyncio.sleep(3)
            if self._state != EngineState.RUNNING or not self._broker or not self._risk_manager:
                continue

            positions = await self._broker.get_positions()

            for pos in positions:
                # 既に決済中のシンボルはスキップ
                if pos.symbol in self._closing_symbols:
                    continue

                # チェック優先順位: 損切り > トレーリング > 利確
                if self._risk_manager.check_stop_loss(pos):
                    await self._emit_close_signal(pos, "損切り")
                elif self._risk_manager.check_trailing_stop(pos):
                    await self._emit_close_signal(pos, "トレーリングストップ")
                elif self._risk_manager.check_take_profit(pos):
                    await self._emit_close_signal(pos, "利確")

        except asyncio.CancelledError:
            return
        except Exception as e:
            logger.error("ポジション監視エラー: %s", e)

if/elif の順番が全てを物語っています。損切り > トレーリングストップ > 利確の順で評価し、損切りが最も高い優先度 を持ちます。なぜこの設計が重要なのかは、セクション7で詳しく解説します。

4.3 バー確定ループ (60秒間隔)

ティックデータ(個々の約定値)を1分足のOHLCV(始値・高値・安値・終値・出来高)に集約し、戦略の on_bar() を呼び出すタスクです。

async def _bar_builder_loop(self) -> None:
    """バー確定ループ: 1分毎に確定した足で戦略のon_bar()を呼ぶ"""
    while True:
        try:
            await asyncio.sleep(self._bar_interval)  # 60秒
            if self._state != EngineState.RUNNING:
                continue

            for symbol, bar in list(self._current_bar.items()):
                if not bar.get("completed"):
                    bar["completed"] = True

                ohlcv = {
                    "open": bar["open"],
                    "high": bar["high"],
                    "low": bar["low"],
                    "close": bar["close"],
                    "volume": bar["volume"],
                }

                # 先物トレンド戦略: エントリーシグナル生成
                if (
                    self._futures_strategy
                    and self._futures_strategy.enabled
                    and symbol == self._futures_symbol
                ):
                    signal = await self._futures_strategy.on_bar(ohlcv)
                    if signal:
                        await self._process_signal(signal)

                self._current_bar.pop(symbol, None)

        except asyncio.CancelledError:
            return

ティック→バーの集約は _update_bar() メソッドが _on_tick() コールバック内で毎ティック呼ばれて行っています。

def _update_bar(self, tick: Tick) -> None:
    """ティックをバーに集約"""
    bar = self._current_bar.get(tick.symbol)
    now = tick.timestamp

    if bar is None or (now - bar["start"]).total_seconds() >= self._bar_interval:
        # 新しいバー開始
        self._current_bar[tick.symbol] = {
            "open": tick.price,
            "high": tick.price,
            "low": tick.price,
            "close": tick.price,
            "volume": tick.volume,
            "start": now,
            "completed": False,
        }
    else:
        bar["high"] = max(bar["high"], tick.price)
        bar["low"] = min(bar["low"], tick.price)
        bar["close"] = tick.price
        bar["volume"] += tick.volume

この二段構えの設計がミソです。ティックが来るたびにバーを更新(O/H/L/C/Vを計算)しつつ、バーの確定判定は独立したループで行います。こうすることで、ティック処理の遅延がバー確定のタイミングに影響しません。


5. シグナル処理パイプライン

戦略が生成した Signal は、すぐに発注されるわけではありません。_process_signal() というパイプラインを通過して、複数のフィルターやチェックを経て初めて注文になります。

Signal(戦略が生成)
  │
  ├─ [1] 日次制限チェック ── 日次目標 or 損失上限に達していたら新規注文を拒否
  │                         ※ 決済注文は常に通過
  │
  ├─ [2] 重複シグナル排除 ── 同一銘柄・同一方向で10秒以内の重複を排除
  │
  ├─ [3] クローズ重複排除 ── 既に決済処理中の銘柄をスキップ
  │
  ├─ [4] シグナルログ記録 ── メモリ(直近50件)+ DB保存
  │
  ├─ [5] ポジションサイジング ── 先物新規注文の枚数を動的に計算
  │
  └─ [6] 発注 ── OrderManager.submit_order() → ブローカーAPI

実際のコードを見てみましょう。

async def _process_signal(self, signal: Signal) -> None:
    """シグナルを処理して発注"""
    if not signal.order_request or not self._order_manager:
        return

    req = signal.order_request
    is_close = req.trade_type == TradeType.CLOSE

    # [1] 日次目標チェック(決済は通す)
    if not is_close and (self._daily_target_reached or self._daily_loss_stopped):
        logger.info("新規注文拒否(日次制限): %s", signal.reason)
        return

    # [2] 重複シグナル排除(同一銘柄・同一方向で10秒以内)
    sig_key = f"{req.symbol}:{req.side.value}:{req.trade_type.value}"
    now = datetime.now()
    last = self._last_signal.get(sig_key)
    if last and (now - last).total_seconds() < 10:
        logger.debug("重複シグナルスキップ: %s", sig_key)
        return
    self._last_signal[sig_key] = now

    # [3] クローズ重複排除
    if is_close:
        if req.symbol in self._closing_symbols:
            return
        self._closing_symbols.add(req.symbol)

    # [4] シグナルログ記録
    self._recent_signals.append({
        "time": now.strftime("%H:%M:%S"),
        "type": signal.signal_type.value,
        "symbol": req.symbol,
        "side": req.side.value,
        "trade_type": req.trade_type.value,
        "reason": signal.reason,
        "strategy": req.strategy_name,
    })
    if len(self._recent_signals) > 50:
        self._recent_signals = self._recent_signals[-50:]

    # [5] ポジションサイジング(先物新規注文のみ)
    if (
        self._position_sizer
        and req.strategy_name == "futures_trend"
        and req.trade_type == TradeType.NEW
    ):
        daily_pnl = self._risk_manager.realized_pnl_today if self._risk_manager else 0.0
        req.qty = self._position_sizer.calculate_qty(signal, daily_pnl)

    # [6] 発注
    order = await self._order_manager.submit_order(req, positions, is_option=is_option)

なぜ決済注文は日次制限をスルーするのか?

is_close のチェックに注目してください。日次損失上限に達しても、決済注文だけは必ず通します。損失上限に達したのに損切りの決済ができない、という状況は本末転倒ですよね。「壊れたブレーキ」にならないための設計です。

重複シグナル排除の仕組み

同じ銘柄・同じ方向のシグナルが10秒以内に連続発生した場合、2回目以降はスキップします。ティックが高頻度で飛んでくる局面では、戦略が立て続けに同じシグナルを出すことがあります。これを素通しすると重複発注になってしまうので、sig_key(銘柄+方向+売買区分)を辞書で管理して排除しています。

_closing_symbols も同じ発想です。損切りシグナルが出て決済注文を送った直後に、次のポジション監視ループでまた損切りシグナルが出る、という「二重決済」を防いでいます。


6. OrderManager -- 注文の一生を管理する

OrderManager は注文のライフサイクル全体を管理するクラスです。発注から約定(またはキャンセル・拒否)まで、注文の状態遷移を追跡します。

6.1 注文の状態遷移

まず、注文がたどる状態遷移を確認しましょう。

class OrderStatus(str, Enum):
    PENDING = "pending"       # 未発注
    SUBMITTED = "submitted"   # 発注済み
    PARTIAL = "partial"       # 一部約定
    FILLED = "filled"         # 全約定
    CANCELLED = "cancelled"   # 取消済み
    REJECTED = "rejected"     # 拒否
    EXPIRED = "expired"       # 期限切れ
                                submit_order()
  PENDING ────────────────────────> SUBMITTED
                                       │
                             ┌─────────┼──────────┐
                             │         │          │
                             v         v          v
                         PARTIAL    FILLED    CANCELLED
                             │                    ^
                             │                    │ cancel()
                             v                    │
                          FILLED             SUBMITTED ───> REJECTED
                                                       ───> EXPIRED

6.2 発注 -- リスクチェックが門番

submit_order() の最初の行が全てを物語っています。

async def submit_order(
    self,
    request: OrderRequest,
    positions: list | None = None,
    is_option: bool = False,
) -> Order | None:
    """注文を発注(リスクチェック付き)"""
    # リスクチェック --- ここが門番
    check = self._risk.check_order(request, positions or [])
    if not check.allowed:
        logger.warning("注文拒否: %s", check.reason)
        return None

    try:
        if is_option:
            order = await self._broker.send_option_order(request)
        else:
            order = await self._broker.send_future_order(request)

        self._active_orders[order.id] = order

        if self._order_repo:
            await self._order_repo.save_order(order)

        return order
    except Exception as e:
        logger.error("注文発注エラー: %s", e)
        return None

RiskManager.check_order() が 門番 として機能し、以下をチェックします。

  • 日次損失上限: 既に上限に達していたら発注させない
  • ポジション上限: 先物・オプションそれぞれの最大枚数を超えないか
  • 証拠金チェック: 証拠金が足りるか(将来の拡張ポイント)

この「OrderManagerは必ずRiskManagerを経由する」という設計が、後述する損切り最優先アーキテクチャの基盤になっています。

6.3 注文同期 -- ブローカーとの状態合わせ

async def sync_orders(self) -> None:
    """ブローカーの注文状態と同期"""
    broker_orders = await self._broker.get_orders()
    for bo in broker_orders:
        if bo.id in self._active_orders:
            local = self._active_orders[bo.id]
            local.status = bo.status
            local.filled_qty = bo.filled_qty
            local.filled_price = bo.filled_price
            local.updated_at = datetime.now()

            if self._order_repo:
                await self._order_repo.update_order(local)

            if bo.status == OrderStatus.FILLED:
                self._active_orders.pop(bo.id)
                self._filled_orders.append(local)
            elif bo.status in (
                OrderStatus.CANCELLED,
                OrderStatus.REJECTED,
                OrderStatus.EXPIRED,
            ):
                self._active_orders.pop(bo.id)

ローカル(_active_orders 辞書)とリモート(ブローカーAPI)を照合し、ステータスの変化を検出します。約定した注文は _active_orders から _filled_orders に移動、取消・拒否・期限切れは _active_orders から削除されます。

この同期のたびにDBへも永続化しているので、エンジンが途中で落ちても注文履歴は失われません。

6.4 全注文取消

async def cancel_all(self) -> int:
    """全アクティブ注文を取消"""
    count = 0
    for order_id in list(self._active_orders.keys()):
        if await self.cancel(order_id):
            count += 1
    return count

cancel_all() は日次損失上限到達時やエンジン停止時に呼ばれます。list() でコピーを取ってからイテレートしているのは、ループ中に辞書が変更されるのを防ぐためです。Pythonの定番パターンですね。


7. 損切り最優先アーキテクチャ

ここが本記事で最も伝えたいポイントです。

自動売買システムにおいて、損切りは他の全てに優先します。利確よりも、新規エントリーよりも、何よりも。その理由をトレーダーの視点で説明します。

なぜ損切りが最優先なのか

相場には「利益は逃げないが、損失は爆発する」という格言があります。特に日経225先物のようなレバレッジ商品では、1ティック(5円)の動きが数千円の損益になります。

例を出しましょう。日経225ミニ先物を39,000円で1枚買ったとします。

  • 損切りラインを38,500円(-500円 = -50,000円の損失)に設定
  • 利確ラインを39,500円(+500円 = +50,000円の利益)に設定

ここで、急落が起きたとします。38,500円を一瞬で割り込んで38,000円まで落ちた場合、損切りが1秒遅れただけで損失は 50,000円 → 100,000円 と倍増します。

一方、利確が1秒遅れても? 39,500円から39,400円に戻っただけで、まだ40,000円の利益が残っています。

この非対称性が、損切り最優先の根拠です。

アーキテクチャレベルでの実装

この思想は、コードの構造そのものに組み込まれています。

優先度 1: 日次損失上限 → 全停止・全注文取消(_order_sync_loop)
優先度 2: 個別損切り   → 即座に成行CLOSE(_position_monitor_loop)
優先度 3: トレーリング → 利益確保しつつ追従(_position_monitor_loop)
優先度 4: 利確         → 目標到達でCLOSE(_position_monitor_loop)
優先度 5: 日次目標利益 → 新規注文停止(_order_sync_loop)

この優先順位は trading_engine.py の冒頭コメントにも明記されています。

"""ライブトレーディングエンジン(司令塔)

損切り最優先の設計:
  1. 日次損失上限 → 全停止・全注文取消
  2. 個別損切り (stop_loss_pct) → 即座に成行CLOSE
  3. トレーリングストップ → 利益確保しつつ追従
  4. 利確 (take_profit_pct) → 目標到達でCLOSE
  5. 日次目標利益 → 新規注文停止(決済は許可)
"""

具体的にどう実装されているか、3つの層で見ていきましょう。

層1: ポジション監視ループの if/elif 順序

# チェック優先順位: 損切り > トレーリング > 利確
if self._risk_manager.check_stop_loss(pos):
    await self._emit_close_signal(pos, "損切り")
elif self._risk_manager.check_trailing_stop(pos):
    await self._emit_close_signal(pos, "トレーリングストップ")
elif self._risk_manager.check_take_profit(pos):
    await self._emit_close_signal(pos, "利確")

if/elif なので、損切り条件を満たしていたらトレーリングストップや利確の判定はそもそも実行されません。

層2: シグナルパイプラインでの決済優遇

# 日次目標チェック(決済は通す)
if not is_close and (self._daily_target_reached or self._daily_loss_stopped):
    logger.info("新規注文拒否(日次制限): %s", signal.reason)
    return

日次の損失上限に達しても、決済注文は必ず通過します。損切りが日次制限でブロックされることは絶対にありません。

層3: 日次損失上限の即時対応

if self._risk_manager.is_daily_loss_limit():
    if not self._daily_loss_stopped:
        self._daily_loss_stopped = True
        await self._order_manager.cancel_all()     # 全注文取消
        self._disable_all_strategies()              # 全戦略無効化

日次損失上限に到達した瞬間、cancel_all() で未約定の注文を全て取消し、_disable_all_strategies() で全ての戦略を無効化します。もうこれ以上の損失は出させない、という強い意志の表れです。

決済注文のフロー

損切り(や利確)が発動するとき、_emit_close_signal() が呼ばれます。

async def _emit_close_signal(self, pos, reason: str) -> None:
    """ポジションに対するCLOSEシグナルを発行"""
    exit_side = Side.SELL if pos.side == Side.BUY else Side.BUY
    signal = Signal(
        signal_type=SignalType.SELL if exit_side == Side.SELL else SignalType.BUY,
        symbol=pos.symbol,
        reason=reason,
        strength=1.0,
        order_request=OrderRequest(
            symbol=pos.symbol,
            side=exit_side,
            qty=pos.qty,
            order_type=OrderType.MARKET,     # 成行注文
            trade_type=TradeType.CLOSE,       # 返済
            strategy_name="risk_manager",     # 発信元はリスク管理
        ),
    )
    await self._process_signal(signal)

注目ポイントを整理します。

  • OrderType.MARKET: 損切りは成行注文です。指値にすると滑って約定しないリスクがあるからです
  • TradeType.CLOSE: 返済注文なので、シグナルパイプラインの日次制限チェックをスルーします
  • strategy_name="risk_manager": 戦略ではなくリスク管理が発信元。ログで追跡しやすくなります
  • strength=1.0: 最大強度。迷いなく即座に実行すべきことを表しています

8. app.state との連携

ここまで見てきたトレーディングエンジンは、FastAPIの app.state に格納されて、アプリケーション全体からアクセスできるようになります。

# main.py(起動時)
app.state.engine = TradingEngine(config, notifier)

ダッシュボードのエンドポイントからは request.app.state.engine でアクセスします。

@router.get("/api/engine/status")
async def engine_status(request: Request):
    engine: TradingEngine = request.app.state.engine
    return engine.get_status()

get_status() は、ダッシュボードが必要とする情報を辞書で返します。

def get_status(self) -> dict:
    return {
        "state": self._state.value,
        "error": self._error_message,
        "started_at": self._started_at.isoformat() if self._started_at else None,
        "futures_symbol": self._futures_symbol_name or self._futures_symbol,
        "active_orders": len(self._order_manager.active_orders) if self._order_manager else 0,
        "realized_pnl": self._risk_manager.realized_pnl_today if self._risk_manager else 0,
        "daily_target_reached": self._daily_target_reached,
        "daily_loss_stopped": self._daily_loss_stopped,
        "strategies": {
            "futures_trend": self._futures_strategy.enabled if self._futures_strategy else False,
            "options_seller": self._options_strategy.enabled if self._options_strategy else False,
            "delta_hedge": self._delta_hedge.enabled if self._delta_hedge else False,
        },
    }

htmxのポーリング(hx-trigger="every 2s")でこのエンドポイントを叩けば、ダッシュボードにリアルタイムでエンジンの状態が反映されます。第2回で構築したhtmxの仕組みが、ここで威力を発揮するわけですね。

エンジンの起動・停止もエンドポイント経由で行います。

@router.post("/api/engine/start")
async def start_engine(request: Request):
    engine: TradingEngine = request.app.state.engine
    await engine.start()
    return {"status": "started"}

@router.post("/api/engine/stop")
async def stop_engine(request: Request):
    engine: TradingEngine = request.app.state.engine
    await engine.stop()
    return {"status": "stopped"}

app.state をハブにすることで、エンジンの内部状態がHTTPエンドポイント、SSE配信、テンプレートレンダリングなど、どこからでも統一的にアクセスできます。グローバル変数を使うよりも遥かにクリーンですし、テスト時にはモックを注入するのも容易です。


9. まとめと次回予告

まとめ

今回はトレーディングエンジンの設計を深掘りしました。重要なポイントを整理します。

  • EngineState: 5つの状態(STOPPED / STARTING / RUNNING / STOPPING / ERROR)で管理するステートマシン。str を継承した Enum でJSON互換にし、htmxダッシュボードとシームレスに連携
  • start() の8ステップ: 認証 → 銘柄解決 → PUSH登録 → サービスDI → 戦略初期化 → ポジション復元 → ティック配信開始 → バックグラウンドタスク起動。特にポジション復元は再起動時の事故防止に不可欠
  • stop() の安全設計: デフォルトで全注文取消するが、全決済は明示的に指定したときだけ実行。タスクのキャンセルは asyncio のイディオムに従う
  • 4つのバックグラウンドタスク: 注文同期(5秒)・ポジション監視(3秒)・バー確定(60秒)・オプション戦略。それぞれ独立した間隔で非同期に並行動作
  • シグナル処理パイプライン: 日次制限チェック → 重複排除 → ログ記録 → ポジションサイジング → 発注。決済注文は日次制限をスルーする
  • OrderManager: RiskManagerを門番として組み込み、リスクチェックを通過した注文だけを発注。ブローカーとの状態同期でローカルとリモートの整合性を保つ
  • 損切り最優先: if/elif の順序、決済注文の日次制限スルー、日次損失上限での全停止という3つの層で、損切りが他の全てに優先することをアーキテクチャレベルで保証

次回予告

第7回: 戦略とリスク管理 では、今回のエンジンが呼び出す「戦略」と「リスク管理」の中身に踏み込みます。

  • 先物トレンド戦略: EMA(指数移動平均)クロスとATR(真の値幅)を使ったエントリー・エグジット
  • オプション売り戦略: プット売りの銘柄選定ロジックとIV(インプライドボラティリティ)による行使価格選択
  • デルタヘッジ: ポートフォリオ全体のデルタをニュートラルに保つ先物ヘッジ
  • RiskManager の全メソッド解説: 損切り・利確・トレーリングストップの判定ロジック
  • PositionSizer: 日次損益に応じた動的な枚数調整

今回のエンジンが「何をするか」を決める部分なので、トレーディングの面白さが一番詰まった回になると思います。お楽しみに!

最後まで読んでくださりありがとうございました!