【第5回】証券APIとの接続
pm install さようなら — FastAPI + htmx でトレーディングシステムを作る

このシリーズについて
「npm install さようなら — FastAPI + htmx でトレーディングシステムを作る」シリーズの第5回へようこそ!
本シリーズは全8回構成で、日経225先物・オプションの自動売買ダッシュボードをゼロから構築していきます。
| # | タイトル | 状態 |
|---|---|---|
| 1 | 環境構築とプロジェクト設計 | 公開済み |
| 2 | htmx でSPA風ダッシュボードを作る | 公開済み |
| 3 | ダークテーマUIとコンポーネント設計 | 公開済み |
| 4 | DB設計とAlembicマイグレーション | 公開済み |
| 5 | 証券APIとの接続(本記事) | 今ここ |
| 6 | トレーディングエンジン — 司令塔の設計 | 近日公開 |
| 7 | 戦略とリスク管理 | 近日公開 |
| 8 | バックテストと本番デプロイ | 近日公開 |
前提: 第1〜4回を読んでいることを前提に進めますので、まだの方は先にそちらをご覧ください。
目次
- 今回のゴール
- kabuステーションAPI連携の全体像
- ドメインモデルの設計
- ブローカー抽象基底クラス — base.py
- 指数バックオフ付きリトライ機構 — retry.py
- REST API クライアント実装 — kabu_api.py
- WebSocket PUSH リアルタイム配信 — kabu_ws.py
- 市場データサービス — market_data.py
- 設定ファイルと環境変数
- まとめと次回予告
1. 今回のゴール
前回(第4回)まででDB層が完成し、注文・約定・ティックデータを永続化できるようになりました。しかし、肝心の証券会社との接続がまだありません。
今回はいよいよkabuステーションAPIとの連携を構築します。ダッシュボードが「飾り」から「実際にお金が動く本番システム」へと変わる、シリーズの大きなターニングポイントです。
具体的に作るものは以下のとおりです。
- ブローカー抽象基底クラス: 将来的に他の証券APIにも差し替え可能な設計
- REST APIクライアント: 認証・注文発注/取消・板情報取得・ポジション管理
- WebSocketクライアント: リアルタイム板情報のPUSH受信
- リトライ機構: 指数バックオフによる堅牢なエラーハンドリング
- 市場データサービス: ティック配信・DB保存・コールバック管理
完成後のディレクトリ構造は次のようになります。
src/systrade/
├── broker/
│ ├── __init__.py
│ ├── base.py # ブローカー抽象基底クラス
│ ├── kabu_api.py # kabuステーション REST クライアント
│ ├── kabu_ws.py # kabuステーション WebSocket クライアント
│ └── retry.py # 指数バックオフ付きリトライデコレータ
├── models/
│ ├── market.py # Board, Tick, SymbolInfo 等
│ ├── order.py # Order, OrderRequest 等
│ └── position.py # Position, PortfolioSummary
├── services/
│ └── market_data.py # 市場データ取得・キャッシュ・配信
└── config.py # 設定(BrokerConfig 含む)
全体の構成としては、KabuApi(REST)で認証・注文・板情報取得などのリクエスト/レスポンス型の操作を行い、KabuWebSocketでリアルタイムの板情報PUSH配信を受信します。その上にMarketDataServiceがデータの集約・キャッシュ・配信を担う3層構造です。
2. kabuステーションAPI連携の全体像
kabuステーションAPIは、auカブコム証券が提供するローカルREST + WebSocket APIです。PC上で動作する「kabuステーション」アプリケーションがローカルサーバーとして稼働し、localhost:18080 でリクエストを受け付けます。
2つの通信方式
kabuステーションAPIには大きく2つの通信方式があります。
| 方式 | 用途 | エンドポイント例 |
|---|---|---|
| REST API | 認証・注文・板情報取得・ポジション取得 | POST /token, POST /sendorder/future |
| WebSocket | リアルタイム板情報のPUSH配信 | ws://localhost:18080/kabusapi/websocket |
REST APIで「リクエストすれば返ってくる」類の操作を行い、WebSocketで「値動きをリアルタイムに受け取り続ける」という使い分けです。
APIの主要エンドポイント
今回実装するエンドポイントを整理しておきましょう。
■ 認証
POST /token → APIトークン取得
■ 銘柄情報
GET /symbolname/future → 先物の銘柄コード取得
GET /symbolname/option → オプションの銘柄コード取得
■ 板情報
GET /board/{symbol} → 板情報(10本値含む)
■ 注文
POST /sendorder/future → 先物注文(v1.9.0: Password不要)
POST /sendorder/option → オプション注文(v1.9.0: Password不要)
PUT /cancelorder → 注文取消(v1.9.0: Password不要)
■ 照会
GET /orders → 注文一覧
GET /positions → ポジション一覧
GET /wallet/future → 先物余力
GET /wallet/option → OP余力
■ PUSH配信登録
PUT /register → WebSocket配信銘柄登録(最大50銘柄)
PUT /unregister/all → 全銘柄登録解除
:::message
v1.9.0での重要な変更: 以前は注文発注・取消時に Password フィールドが必要でしたが、v1.9.0で廃止されました。本シリーズのコードはv1.9.0以降を前提としています。
:::
デリバティブ固有のコード体系
先物・オプションAPIでは、いくつかの数値コードが頻出します。最初に押さえておくと後がスムーズです。
| パラメータ | 値 | 意味 |
|---|---|---|
| Exchange | 2 |
日通し(デリバティブ推奨) |
| Exchange | 23 |
日中取引のみ |
| Exchange | 24 |
夜間取引のみ |
| Side | "1" |
売り |
| Side | "2" |
買い |
| TradeType | 1 |
新規建て |
| TradeType | 2 |
返済(決済) |
| FrontOrderType | 120 |
成行 |
| FrontOrderType | 20 |
指値 |
| FrontOrderType | 30 |
逆指値 |
| TimeInForce | 1 |
FAS(Fill and Store) |
| TimeInForce | 2 |
FAK(Fill and Kill) |
| TimeInForce | 3 |
FOK(Fill or Kill) |
ここでTimeInForceについて補足しておくと、FAKは「約定できる分だけ約定して残りはキャンセル」、FASは「約定できなかった分は板に残す」、FOKは「全数量が一度に約定しなければ全キャンセル」という意味です。デリバティブ取引では流動性の観点からFAKがよく使われます。
3. ドメインモデルの設計
API連携コードを書く前に、まずドメインモデルを整理しましょう。これは第4回で作ったDB層と証券APIの間をつなぐ「共通言語」になります。
市場データモデル(market.py)
板情報やティックデータを表現するモデルです。
# src/systrade/models/market.py
"""市場データモデル: 価格・板情報・銘柄"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class Exchange(str, Enum):
OSE = "OSE" # 大阪取引所
class ProductType(str, Enum):
FUTURES_LARGE = "futures_large" # 日経225先物(ラージ)
FUTURES_MINI = "futures_mini" # 日経225ミニ先物
FUTURES_MICRO = "futures_micro" # 日経225マイクロ先物
OPTION_CALL = "option_call" # コールオプション
OPTION_PUT = "option_put" # プットオプション
# 先物1枚当たりの乗数
MULTIPLIER = {
ProductType.FUTURES_LARGE: 1000,
ProductType.FUTURES_MINI: 100,
ProductType.FUTURES_MICRO: 10,
ProductType.OPTION_CALL: 1000,
ProductType.OPTION_PUT: 1000,
}
@dataclass
class Quote:
"""板の気配値1段"""
price: float
qty: int
@dataclass
class Board:
"""板情報"""
symbol: str
symbol_name: str
current_price: float
timestamp: datetime
bid_price: float = 0.0
bid_qty: int = 0
ask_price: float = 0.0
ask_qty: int = 0
volume: int = 0
open_price: float = 0.0
high_price: float = 0.0
low_price: float = 0.0
previous_close: float = 0.0
bids: list[Quote] = field(default_factory=list)
asks: list[Quote] = field(default_factory=list)
@dataclass
class SymbolInfo:
"""銘柄情報"""
symbol: str
symbol_name: str
product_type: ProductType
exchange: Exchange = Exchange.OSE
strike_price: float | None = None # オプションのみ
expiry_date: datetime | None = None # SQ日
underlying_symbol: str | None = None # 原資産
@dataclass
class Tick:
"""リアルタイムティックデータ"""
symbol: str
price: float
volume: int
timestamp: datetime
bid: float = 0.0
ask: float = 0.0
ProductType で先物のラージ/ミニ/マイクロとオプションのコール/プットを区別しています。MULTIPLIER は1枚あたりの乗数で、たとえばミニ先物なら1円の値動きが100円の損益になります。
Board は板情報を表すモデルです。ベスト気配(bid/ask)に加えて、bids / asks で最大10本値の気配情報を保持できます。
注文モデル(order.py)
# src/systrade/models/order.py
"""注文・約定モデル"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class Side(str, Enum):
BUY = "buy"
SELL = "sell"
class OrderType(str, Enum):
MARKET = "market" # 成行
LIMIT = "limit" # 指値
class TimeInForce(str, Enum):
FAK = "FAK" # Fill and Kill
FAS = "FAS" # Fill and Store
FOK = "FOK" # Fill or Kill
class OrderStatus(str, Enum):
PENDING = "pending" # 未発注
SUBMITTED = "submitted" # 発注済み
PARTIAL = "partial" # 一部約定
FILLED = "filled" # 全約定
CANCELLED = "cancelled" # 取消済み
REJECTED = "rejected" # 拒否
EXPIRED = "expired" # 期限切れ
class TradeType(str, Enum):
NEW = "new" # 新規
CLOSE = "close" # 返済
@dataclass
class OrderRequest:
"""注文リクエスト"""
symbol: str
side: Side
qty: int
order_type: OrderType = OrderType.MARKET
price: float | None = None
trade_type: TradeType = TradeType.NEW
time_in_force: TimeInForce = TimeInForce.FAK
strategy_name: str = ""
@dataclass
class Order:
"""注文"""
id: str
symbol: str
side: Side
qty: int
order_type: OrderType
status: OrderStatus = OrderStatus.PENDING
price: float | None = None
filled_qty: int = 0
filled_price: float = 0.0
trade_type: TradeType = TradeType.NEW
strategy_name: str = ""
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
broker_order_id: str = ""
message: str = ""
ポイントは OrderRequest と Order を分離していることです。OrderRequest は「これから出す注文」、Order は「証券会社に出した後の注文」を表します。注文発注後にAPIから返ってくる OrderId は broker_order_id に格納します。
ポジションモデル(position.py)
# src/systrade/models/position.py
"""ポジションモデル"""
from dataclasses import dataclass, field
from datetime import datetime
from src.systrade.models.market import ProductType, MULTIPLIER
from src.systrade.models.order import Side
@dataclass
class Position:
"""個別ポジション"""
symbol: str
side: Side
qty: int
avg_price: float
product_type: ProductType
current_price: float = 0.0
strategy_name: str = ""
opened_at: datetime = field(default_factory=datetime.now)
@property
def multiplier(self) -> int:
return MULTIPLIER.get(self.product_type, 100)
@property
def unrealized_pnl(self) -> float:
"""含み損益(円)"""
sign = 1 if self.side == Side.BUY else -1
return sign * (self.current_price - self.avg_price) * self.qty * self.multiplier
@property
def notional(self) -> float:
"""想定元本"""
return self.current_price * self.qty * self.multiplier
Position の unrealized_pnl プロパティに注目してください。買いポジションなら (現在値 - 平均取得単価) * 枚数 * 乗数 で含み損益を計算し、売りポジションなら符号を反転させています。ミニ先物1枚で日経平均が100円動くと 100 * 1 * 100 = 10,000円 の損益が発生する、という計算です。
4. ブローカー抽象基底クラス -- base.py
ここからがいよいよAPI連携の本丸です。まずは「どの証券会社でも共通のインターフェース」を定義する抽象基底クラスを作ります。
# src/systrade/broker/base.py
"""ブローカー抽象インターフェース"""
from abc import ABC, abstractmethod
from src.systrade.models.market import Board, SymbolInfo
from src.systrade.models.order import Order, OrderRequest
from src.systrade.models.position import Position
class Broker(ABC):
"""ブローカーAPI抽象基底クラス"""
@abstractmethod
async def authenticate(self) -> str:
"""認証してAPIトークンを取得"""
...
@abstractmethod
async def get_board(self, symbol: str) -> Board:
"""板情報を取得"""
...
@abstractmethod
async def get_symbol_info_future(
self, deriv_month: int, future_code: str = ""
) -> SymbolInfo:
"""先物銘柄情報を取得"""
...
@abstractmethod
async def get_symbol_info_option(
self,
deriv_month: int,
put_or_call: str,
strike_price: int,
) -> SymbolInfo:
"""オプション銘柄情報を取得"""
...
@abstractmethod
async def send_future_order(self, request: OrderRequest) -> Order:
"""先物注文を発注"""
...
@abstractmethod
async def send_option_order(self, request: OrderRequest) -> Order:
"""オプション注文を発注"""
...
@abstractmethod
async def cancel_order(self, order_id: str) -> bool:
"""注文を取消"""
...
@abstractmethod
async def get_positions(self) -> list[Position]:
"""ポジション一覧を取得"""
...
@abstractmethod
async def get_orders(self) -> list[Order]:
"""注文一覧を取得"""
...
@abstractmethod
async def get_wallet_future(self) -> dict:
"""先物余力を取得"""
...
@abstractmethod
async def get_wallet_option(self) -> dict:
"""オプション余力を取得"""
...
なぜ抽象クラスを挟むのか?
「kabuステーションしか使わないのに抽象化する意味あるの?」と思うかもしれません。これには3つの理由があります。
- テスト容易性: テスト時にモックブローカーを差し込めます。実際のAPIを叩かずにトレーディングエンジンのロジックをテストできます
- 将来の拡張性: 他の証券APIに乗り換えたくなった場合、
Brokerを実装した新しいクラスを作るだけで済みます - 設計の明確化: インターフェースを先に定義することで「証券APIに何を求めるか」が明確になります
すべてのメソッドが async になっている点も重要です。証券APIへの通信はI/O待ちが発生するため、asyncio による非同期処理が自然にフィットします。
5. 指数バックオフ付きリトライ機構 -- retry.py
REST APIクライアントの実装に入る前に、リトライ機構を先に作ります。ネットワーク通信は失敗がつきものですから、ここは堅牢に作っておきたいところです。
# src/systrade/broker/retry.py
"""指数バックオフ付きリトライデコレータ
リトライ対象: 5xx、タイムアウト、接続エラー
4xx(クライアントエラー)はリトライしない。
"""
import asyncio
import functools
import logging
import httpx
logger = logging.getLogger(__name__)
def with_retry(max_retries: int = 3, base_delay: float = 1.0):
"""非同期メソッド用リトライデコレータ(指数バックオフ)
Args:
max_retries: 最大リトライ回数(初回を除く)
base_delay: 初回リトライまでの待機秒数(以降2倍ずつ増加)
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
last_exc: Exception | None = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except httpx.HTTPStatusError as e:
# 4xx はリトライしない
if 400 <= e.response.status_code < 500:
raise
last_exc = e
except (httpx.TimeoutException, httpx.ConnectError) as e:
last_exc = e
if attempt < max_retries:
delay = base_delay * (2 ** attempt)
logger.warning(
"%s リトライ %d/%d (%.1f秒後): %s",
func.__qualname__, attempt + 1, max_retries, delay, last_exc,
)
await asyncio.sleep(delay)
# 全リトライ失敗
logger.error("%s 全リトライ失敗: %s", func.__qualname__, last_exc)
raise last_exc # type: ignore[misc]
return wrapper
return decorator
設計のポイント
このリトライデコレータには3つの重要な設計判断が含まれています。
1. 4xxはリトライしない
if 400 <= e.response.status_code < 500:
raise
4xx系(クライアントエラー)は、リクエスト内容が間違っているのでリトライしても結果は変わりません。たとえば「存在しない銘柄コードで板情報を取得しようとした」場合は400番台が返ります。これをリトライしても無駄なので、即座に例外を上げます。
一方、5xx系(サーバーエラー)やタイムアウト・接続エラーは一時的な障害の可能性があるのでリトライ対象とします。
2. 指数バックオフ(Exponential Backoff)
delay = base_delay * (2 ** attempt)
リトライ間隔を 1秒 → 2秒 → 4秒 と倍々に増やしていきます。一定間隔でリトライすると、サーバーが過負荷のときにさらに負荷をかけてしまう「リトライストーム」が起きるリスクがあります。指数バックオフはこれを緩和する定番パターンです。
3. デコレータパターンで宣言的に適用
@with_retry()
async def authenticate(self) -> str:
...
@with_retry(max_retries=5, base_delay=0.5)
async def get_board(self, symbol: str) -> Board:
...
@with_retry() を付けるだけでリトライロジックが適用されます。メソッド本体にリトライのコードが混ざらないので、ビジネスロジックが読みやすく保たれます。デフォルトは3回リトライですが、メソッドごとにカスタマイズも可能です。
6. REST API クライアント実装 -- kabu_api.py
いよいよメインディッシュ、kabuステーションAPIのRESTクライアントです。ここが最もボリュームのあるセクションですが、順番に見ていけば怖くありません。
6.1 クラスの初期化とHTTPクライアント
# src/systrade/broker/kabu_api.py
"""kabuステーションAPI REST実装
API仕様: https://kabucom.github.io/kabusapi/reference/index.html
v1.9.0: 注文・取消からPassword削除
"""
import logging
from datetime import datetime
import httpx
from src.systrade.broker.base import Broker
from src.systrade.broker.retry import with_retry
from src.systrade.config import BrokerConfig
from src.systrade.models.market import (
Board, Exchange, ProductType, Quote, SymbolInfo,
)
from src.systrade.models.order import (
Order, OrderRequest, OrderStatus, OrderType, Side, TradeType,
)
from src.systrade.models.position import Position
logger = logging.getLogger(__name__)
まず、APIの数値コードとドメインモデルの対応を辞書で定義します。
# --- kabuステーションAPI定数 ---
# Side: "1"=売, "2"=買
_SIDE_MAP = {Side.BUY: "2", Side.SELL: "1"}
# TradeType: 1=新規, 2=返済
_TRADE_TYPE_MAP = {TradeType.NEW: 1, TradeType.CLOSE: 2}
# FrontOrderType(デリバティブ): 120=成行, 20=指値, 18=引成(FAK), 28=引指(FAS), 30=逆指値
_FRONT_ORDER_TYPE_MAP = {OrderType.MARKET: 120, OrderType.LIMIT: 20}
# SecurityType → ProductType
_SEC_TYPE_MAP = {
101: ProductType.FUTURES_LARGE, # 日経225先物
901: ProductType.FUTURES_MINI, # 日経225ミニ先物
103: ProductType.OPTION_CALL, # 日経225OP(Call/Putは別途判定)
}
# FutureCode → ProductType
_FUTURE_CODE_MAP = {
"NK225": ProductType.FUTURES_LARGE,
"NK225mini": ProductType.FUTURES_MINI,
"NK225micro": ProductType.FUTURES_MICRO,
}
# Orders API State: 1=待機, 2=処理中, 3=処理済, 4=訂正取消送信中, 5=終了
_ORDER_STATE_MAP = {
1: OrderStatus.PENDING,
2: OrderStatus.SUBMITTED,
3: OrderStatus.SUBMITTED,
4: OrderStatus.SUBMITTED,
5: OrderStatus.FILLED, # 終了(約定/取消/失効を含む)
}
APIが返す数値コード("1", "2", 101, 901 など)と、私たちのドメインモデル(Side.BUY, ProductType.FUTURES_MINI など)を辞書でマッピングしています。APIの生の数値をコード全体にばらまかず、ここで一元管理するのがポイントです。
続いてクラス本体の初期化部分です。
class KabuApi(Broker):
"""kabuステーションAPI REST クライアント"""
def __init__(self, config: BrokerConfig) -> None:
self._config = config
self._base_url = config.base_url
self._token: str = ""
self._client = httpx.AsyncClient(timeout=10.0)
@property
def _headers(self) -> dict[str, str]:
return {"X-API-KEY": self._token, "Content-Type": "application/json"}
async def close(self) -> None:
await self._client.aclose()
HTTPクライアントにはhttpxを使っています。Pythonの requests ライブラリとほぼ同じAPIでありながら、AsyncClient で非同期通信に対応しているのが最大の利点です。FastAPIのイベントループ内で requests を使うとブロッキングしてしまいますが、httpxならその心配がありません。
timeout=10.0 はデフォルトのタイムアウト値で、10秒以内にレスポンスが返ってこなければ TimeoutException が発生します。リトライデコレータがこれをキャッチしてリトライしてくれます。
認証トークンは _headers プロパティで X-API-KEY ヘッダーとして自動付与されます。
6.2 認証(トークン取得)
@with_retry()
async def authenticate(self) -> str:
"""POST /token で認証トークンを取得"""
resp = await self._client.post(
f"{self._base_url}/token",
json={"APIPassword": self._config.password},
)
resp.raise_for_status()
self._token = resp.json()["Token"]
logger.info("kabuステーションAPI認証成功")
return self._token
kabuステーションAPIの認証は非常にシンプルです。POST /token にAPIパスワードを送ると、トークンが返ってきます。このトークンを以後のすべてのリクエストの X-API-KEY ヘッダーに付けます。
:::message トークンはkabuステーションを再起動するまで有効です。一度取得すれば、1日中使い回すことができます。 :::
6.3 銘柄情報の取得
先物やオプションの銘柄コードは、限月(期限)によって変わります。たとえば同じ「日経225ミニ先物」でも、2025年6月限と2025年9月限では異なる銘柄コードが割り当てられます。
@with_retry()
async def get_symbol_info_future(
self, deriv_month: int, future_code: str = "NK225mini"
) -> SymbolInfo:
"""GET /symbolname/future — 先物銘柄コードを取得
Args:
deriv_month: 限月 yyyyMM形式。0=期近
future_code: NK225 / NK225mini / NK225micro
"""
resp = await self._client.get(
f"{self._base_url}/symbolname/future",
params={"DerivMonth": deriv_month, "FutureCode": future_code},
headers=self._headers,
)
resp.raise_for_status()
data = resp.json()
return SymbolInfo(
symbol=data["Symbol"],
symbol_name=data["SymbolName"],
product_type=_FUTURE_CODE_MAP.get(future_code, ProductType.FUTURES_MINI),
)
deriv_month=0 を指定すると期近(最も直近の限月)の銘柄コードが返ります。これは便利で、限月のロールオーバー(期限切れによる乗り換え)を意識せずに「今一番流動性のある銘柄」を自動で取得できます。
オプションの場合は、加えて put_or_call("P" or "C")と strike_price(権利行使価格)も指定します。
@with_retry()
async def get_symbol_info_option(
self,
deriv_month: int,
put_or_call: str,
strike_price: int,
) -> SymbolInfo:
"""GET /symbolname/option — オプション銘柄コードを取得
Args:
deriv_month: 限月 yyyyMM形式。0=期近
put_or_call: "P" or "C"
strike_price: 権利行使価格。0=ATM
"""
resp = await self._client.get(
f"{self._base_url}/symbolname/option",
params={
"DerivMonth": deriv_month,
"PutOrCall": put_or_call,
"StrikePrice": strike_price,
},
headers=self._headers,
)
resp.raise_for_status()
data = resp.json()
pt = ProductType.OPTION_PUT if put_or_call.upper() == "P" else ProductType.OPTION_CALL
return SymbolInfo(
symbol=data["Symbol"],
symbol_name=data["SymbolName"],
product_type=pt,
strike_price=float(strike_price) if strike_price else None,
)
6.4 板情報の取得
板情報はトレーディングの要です。現在の最良気配(ベストビッド/ベストアスク)と10本値の気配情報を取得します。
@with_retry()
async def get_board(self, symbol: str) -> Board:
"""GET /board/{symbol} — 板情報を取得
symbol: "銘柄コード@取引所コード" (例: "165120019@2")
"""
resp = await self._client.get(
f"{self._base_url}/board/{symbol}",
headers=self._headers,
)
resp.raise_for_status()
d = resp.json()
# 板情報はネストオブジェクト: Sell1={Price,Qty,Time,Sign}, Sell2〜10={Price,Qty}
def _parse_quotes(data: dict, prefix: str, count: int) -> list[Quote]:
quotes = []
for i in range(1, count + 1):
level = data.get(f"{prefix}{i}")
if level and level.get("Price") is not None:
quotes.append(Quote(
price=float(level["Price"]),
qty=int(level.get("Qty", 0)),
))
return quotes
return Board(
symbol=str(d.get("Symbol", "")),
symbol_name=str(d.get("SymbolName", "")),
current_price=float(d.get("CurrentPrice", 0) or 0),
timestamp=datetime.now(),
bid_price=float(d.get("BidPrice", 0) or 0),
bid_qty=int(d.get("BidQty", 0) or 0),
ask_price=float(d.get("AskPrice", 0) or 0),
ask_qty=int(d.get("AskQty", 0) or 0),
volume=int(d.get("TradingVolume", 0) or 0),
open_price=float(d.get("OpeningPrice", 0) or 0),
high_price=float(d.get("HighPrice", 0) or 0),
low_price=float(d.get("LowPrice", 0) or 0),
previous_close=float(d.get("PreviousClose", 0) or 0),
bids=_parse_quotes(d, "Sell", 10), # Sell=売り気配(こちらが買える値段)
asks=_parse_quotes(d, "Buy", 10), # Buy=買い気配(こちらが売れる値段)
)
ここで一点注意が必要なのは、APIの Sell / Buy とドメインモデルの bids / asks が逆になっていることです。
- APIの
Sell1〜Sell10:売り気配 = 私たちが買える値段 =bids(板の「売り板」) - APIの
Buy1〜Buy10:買い気配 = 私たちが売れる値段 =asks(板の「買い板」)
これはkabuステーションAPI固有の命名規則で、「誰が」売り/買いなのかの視点が異なります。コメントで明記しておくと混乱を防げます。
また、板情報の各レベルは {Price, Qty, Time, Sign} というネストオブジェクトになっていて、Sell1 だけ Time と Sign を持ち、Sell2 以降は Price と Qty のみという微妙に非対称な構造です。_parse_quotes ヘルパー関数でこの差異を吸収しています。
6.5 注文発注と取消
注文発注は先物とオプションで別のエンドポイントですが、リクエストボディの構造はほぼ同じです。
@with_retry()
async def send_future_order(self, request: OrderRequest) -> Order:
"""POST /sendorder/future — 先物注文
Exchange: 2=日通し, 23=日中, 24=夜間
"""
body = {
"Symbol": request.symbol,
"Exchange": 2, # 日通し
"TradeType": _TRADE_TYPE_MAP[request.trade_type],
"TimeInForce": 2, # FAK
"Side": _SIDE_MAP[request.side],
"Qty": request.qty,
"FrontOrderType": _FRONT_ORDER_TYPE_MAP[request.order_type],
"Price": request.price if request.price else 0,
"ExpireDay": 0,
}
resp = await self._client.post(
f"{self._base_url}/sendorder/future",
json=body,
headers=self._headers,
)
resp.raise_for_status()
data = resp.json()
logger.info("先物注文発注: %s", data)
return Order(
id=str(data.get("OrderId", "")),
symbol=request.symbol,
side=request.side,
qty=request.qty,
order_type=request.order_type,
price=request.price,
status=OrderStatus.SUBMITTED,
trade_type=request.trade_type,
strategy_name=request.strategy_name,
broker_order_id=str(data.get("OrderId", "")),
)
注目すべきポイントをいくつか挙げます。
- Exchange=2(日通し): デリバティブ取引では、日中セッション(8:45〜15:15)と夜間セッション(16:30〜翌6:00)を通して有効な「日通し」注文が一般的です
- TimeInForce=2(FAK): 自動売買では「約定できる分だけ即座に約定し、残りはキャンセル」するFAKが安全です
- Price=0: 成行注文の場合は0を指定します
- v1.9.0でPassword不要: 以前のバージョンではリクエストボディに
"Password": "xxx"が必要でしたが、廃止されました
注文取消もシンプルです。
@with_retry()
async def cancel_order(self, order_id: str) -> bool:
"""PUT /cancelorder — 注文取消(v1.9.0: Password不要)"""
resp = await self._client.put(
f"{self._base_url}/cancelorder",
json={"OrderId": order_id},
headers=self._headers,
)
resp.raise_for_status()
data = resp.json()
logger.info("注文取消: %s", data)
return data.get("OrderId") == order_id
OrderId だけで取消できます。返り値の OrderId が一致していれば取消成功です。
6.6 ポジション一覧の取得
@with_retry()
async def get_positions(self) -> list[Position]:
"""GET /positions — ポジション一覧
product=3(先物)+4(OP), addinfo=trueで現在値・評価損益取得
"""
positions: list[Position] = []
for product in (3, 4): # 3=先物, 4=OP
resp = await self._client.get(
f"{self._base_url}/positions",
params={"product": product, "addinfo": "true"},
headers=self._headers,
)
resp.raise_for_status()
for d in resp.json():
leaves = int(d.get("LeavesQty", 0) or 0)
if leaves == 0:
continue
side = Side.BUY if d.get("Side") == "2" else Side.SELL
sec_type = d.get("SecurityType", 0)
pt = _SEC_TYPE_MAP.get(sec_type, ProductType.FUTURES_MINI)
# OPのPut/Call判定はSymbolNameから推定
if sec_type == 103:
name = d.get("SymbolName", "")
pt = (ProductType.OPTION_PUT
if "プット" in name or "P" in name.upper().split()[-1:]
else ProductType.OPTION_CALL)
positions.append(
Position(
symbol=str(d.get("Symbol", "")),
side=side,
qty=leaves,
avg_price=float(d.get("Price", 0) or 0),
product_type=pt,
current_price=float(d.get("CurrentPrice", 0) or 0),
)
)
return positions
いくつかのポイントを解説します。
product=3,4のループ: kabuステーションAPIでは、先物(product=3)とオプション(product=4)を別々にリクエストする必要があります。1回のリクエストでまとめて取れないため、ループで2回取得しています。
addinfo=true: このパラメータを付けると、ポジションの CurrentPrice(現在値)や ProfitLoss(評価損益)も返ってきます。付けないと建値だけしか分からないので、基本的に常に true にしておくのがおすすめです。
LeavesQty=0のスキップ: 全量決済済みのポジションも一覧に残ることがあるため、残数量が0のものは除外しています。
Put/Callの判定: ここが少し泥臭いところで、APIレスポンスには明示的なPut/Callフラグがありません。SecurityType=103(オプション)の場合、SymbolName に含まれる「プット」という文字列やシンボル末尾の "P" から推定しています。
6.7 注文一覧の取得 -- State=5の落とし穴
@with_retry()
async def get_orders(self) -> list[Order]:
"""GET /orders — 注文一覧
State: 1=待機, 2=処理中, 3=処理済, 4=訂正取消送信中, 5=終了
終了は約定/取消/失効を含む → Detailsで判別
"""
orders = []
for product in (3, 4):
resp = await self._client.get(
f"{self._base_url}/orders",
params={"product": product},
headers=self._headers,
)
resp.raise_for_status()
for d in resp.json():
side = Side.BUY if d.get("Side") == "2" else Side.SELL
state = d.get("State", 1)
status = _ORDER_STATE_MAP.get(state, OrderStatus.PENDING)
# State=5(終了)の場合、Detailsから約定/取消を判別
if state == 5:
details = d.get("Details", [])
cum_qty = float(d.get("CumQty", 0) or 0)
order_qty = float(d.get("OrderQty", 0) or 0)
if cum_qty >= order_qty and order_qty > 0:
status = OrderStatus.FILLED
elif any(det.get("RecType") == 6 for det in details):
status = OrderStatus.CANCELLED
elif any(det.get("RecType") == 3 for det in details):
status = OrderStatus.EXPIRED
else:
status = OrderStatus.FILLED
orders.append(
Order(
id=str(d.get("ID", "")),
symbol=str(d.get("Symbol", "")),
side=side,
qty=int(d.get("OrderQty", 0) or 0),
order_type=OrderType.MARKET,
status=status,
price=float(d.get("Price", 0) or 0),
filled_qty=int(d.get("CumQty", 0) or 0),
filled_price=float(d.get("Price", 0) or 0),
broker_order_id=str(d.get("ID", "")),
)
)
return orders
:::message alert State=5 は「終了」だが、中身は3通りある
kabuステーションAPIの注文ステータスで最も注意が必要なのが State=5(終了)です。これは以下の3つの意味を含みます。
| 条件 | 意味 |
|---|---|
CumQty >= OrderQty かつ OrderQty > 0 |
全量約定(FILLED) |
Details に RecType=6 がある |
取消済み(CANCELLED) |
Details に RecType=3 がある |
失効(EXPIRED) |
単に State=5 を全部 FILLED にしてしまうと、取り消した注文も「約定した」と認識されてしまい、ポジション管理が狂います。必ず Details の中身まで見て判別しましょう。
:::
6.8 PUSH配信用の銘柄登録
WebSocket PUSHでリアルタイム配信を受けるには、事前に PUT /register で銘柄を登録する必要があります。
@with_retry()
async def register(self, symbols: list[dict]) -> dict:
"""PUT /register — 銘柄登録(WebSocket PUSH配信用)
Args:
symbols: [{"Symbol": "165120019", "Exchange": 2}, ...]
"""
resp = await self._client.put(
f"{self._base_url}/register",
json={"Symbols": symbols},
headers=self._headers,
)
resp.raise_for_status()
return resp.json()
@with_retry()
async def unregister_all(self) -> dict:
"""PUT /unregister/all — 全銘柄登録解除"""
resp = await self._client.put(
f"{self._base_url}/unregister/all",
headers=self._headers,
)
resp.raise_for_status()
return resp.json()
登録できる銘柄数は最大50銘柄です。日経225オプションは権利行使価格ごとに銘柄コードが異なるため、チェーン全体を監視しようとすると50銘柄の制限にすぐ到達します。この辺りは監視対象を絞る戦略が必要になりますが、それは第7回(戦略とリスク管理)で詳しく扱います。
使い方の例を示しておきます。
# ミニ先物の期近を登録
symbol_info = await api.get_symbol_info_future(deriv_month=0, future_code="NK225mini")
await api.register([
{"Symbol": symbol_info.symbol, "Exchange": 2}
])
# 複数銘柄をまとめて登録
await api.register([
{"Symbol": "165120019", "Exchange": 2}, # ミニ先物
{"Symbol": "165220019", "Exchange": 2}, # ラージ先物
])
6.9 余力照会
余力(証拠金の空き状況)の照会もシンプルです。
@with_retry()
async def get_wallet_future(self) -> dict:
"""GET /wallet/future — 先物余力(新規建玉可能額等)"""
resp = await self._client.get(
f"{self._base_url}/wallet/future",
headers=self._headers,
)
resp.raise_for_status()
return resp.json()
@with_retry()
async def get_wallet_option(self) -> dict:
"""GET /wallet/option — OP余力(買建玉可能額・売建玉可能額)"""
resp = await self._client.get(
f"{self._base_url}/wallet/option",
headers=self._headers,
)
resp.raise_for_status()
return resp.json()
余力情報はリスク管理で重要になります。「新しい注文を出す前に証拠金が足りるか」を確認する際に使います。
7. WebSocket PUSH リアルタイム配信 -- kabu_ws.py
RESTは「聞いたら答えてくれる」通信ですが、リアルタイムの価格配信にはWebSocketを使います。一度接続すれば、登録した銘柄の板情報が値動きのたびにプッシュされてきます。
7.1 全体構造
# src/systrade/broker/kabu_ws.py
"""kabuステーションAPI WebSocket リアルタイム配信
PUSH仕様: https://kabucom.github.io/kabusapi/ptal/push.html
メッセージはBoard APIと同じ形式(IV, Delta等グリークス含む)
事前にPUT /registerで銘柄登録が必要(最大50銘柄)
"""
import asyncio
import json
import logging
from datetime import datetime
import websockets
from src.systrade.models.market import Tick
logger = logging.getLogger(__name__)
WS_URL = "ws://localhost:18080/kabusapi/websocket"
class KabuWebSocket:
"""WebSocket接続でリアルタイム価格をasyncioキューに配信
PUSHメッセージはBoardSuccessと同じフォーマット。
値の更新がある場合にのみ配信される。
"""
def __init__(self, url: str = WS_URL) -> None:
self._url = url
self._ws = None
self._running = False
self._queue: asyncio.Queue[Tick] = asyncio.Queue(maxsize=10000)
self._raw_queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=10000)
self._task: asyncio.Task | None = None
@property
def queue(self) -> asyncio.Queue[Tick]:
return self._queue
@property
def raw_queue(self) -> asyncio.Queue[dict]:
"""生のPUSHメッセージ(Board形式)を取得するキュー"""
return self._raw_queue
2つのキューを持っているのがポイントです。
_queue:Tickオブジェクトに変換済みのキュー。価格・出来高のみ必要な場面で使います_raw_queue: APIレスポンスの生辞書をそのまま入れるキュー。オプションのIV(インプライド・ボラティリティ)やグリークス(Delta, Gamma等)を参照する戦略向けです
7.2 接続・停止
async def start(self) -> None:
"""WebSocket接続を開始してバックグラウンドで受信"""
self._running = True
self._task = asyncio.create_task(self._receive_loop())
logger.info("WebSocket受信開始: %s", self._url)
async def stop(self) -> None:
"""WebSocket接続を停止"""
self._running = False
if self._ws:
await self._ws.close()
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("WebSocket受信停止")
start() は asyncio.create_task() でバックグラウンドタスクを起動するだけです。実際の受信処理は _receive_loop() に委譲されます。
stop() では、まず _running フラグを False にしてループ終了を通知し、WebSocket接続を閉じ、最後にタスクをキャンセルします。CancelledError は正常な終了なので握りつぶしています。
7.3 受信ループと自動再接続
ここが WebSocket 実装のハイライトです。
async def _receive_loop(self) -> None:
while self._running:
try:
async with websockets.connect(self._url) as ws:
self._ws = ws
logger.info("WebSocket接続確立")
async for message in ws:
try:
data = json.loads(message)
# 生データも配信(グリークス等を利用する戦略向け)
self._enqueue(self._raw_queue, data)
tick = self._parse_tick(data)
if tick:
self._enqueue(self._queue, tick)
except (json.JSONDecodeError, KeyError) as e:
logger.warning("WebSocketメッセージ解析エラー: %s", e)
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket接続切断、3秒後に再接続...")
await asyncio.sleep(3)
except Exception as e:
logger.error("WebSocketエラー: %s", e)
await asyncio.sleep(5)
この受信ループには3層の防御が組み込まれています。
第1層: while self._running による永続ループ
WebSocketが切断されても、_running が True である限り再接続を試みます。接続が切れるたびにwhileループの先頭に戻り、websockets.connect() で新しい接続を張り直します。
第2層: ConnectionClosed のハンドリング
ネットワーク障害やkabuステーションの再起動などで接続が切断された場合、3秒待ってから再接続します。即座に再接続するとサーバーに負荷をかけるので、少し間を空けるのがマナーです。
第3層: 汎用 Exception のキャッチ
予期しないエラー(DNS解決失敗、権限エラーなど)の場合は5秒待ちます。ここをキャッチしないと、タスク全体が死んでしまい二度と復活しません。
つまり、接続 → 受信 → 切断検知 → 待機 → 再接続というサイクルが _running フラグが True である限り繰り返されます。
7.4 キューの溢れ対策
高頻度にメッセージが飛んでくると、消費側(トレーディングエンジンなど)の処理が追いつかずキューが溢れる可能性があります。
@staticmethod
def _enqueue(q: asyncio.Queue, item) -> None:
try:
q.put_nowait(item)
except asyncio.QueueFull:
try:
q.get_nowait()
except asyncio.QueueEmpty:
pass
q.put_nowait(item)
キューが満杯(maxsize=10000)になった場合、一番古いデータを捨てて最新データを入れます。金融データでは「最新の価格」が最も重要で、古い価格を処理するより新しい価格を優先すべきだからです。
これは「リングバッファ」的な発想で、メモリを無限に消費することなく、常に直近のデータを保持できます。
7.5 PUSHメッセージのパース
@staticmethod
def _parse_tick(data: dict) -> Tick | None:
"""PUSHメッセージ(Board形式)からTickを生成"""
symbol = data.get("Symbol")
price = data.get("CurrentPrice")
if symbol is None or price is None:
return None
return Tick(
symbol=str(symbol),
price=float(price),
volume=int(data.get("TradingVolume", 0) or 0),
timestamp=datetime.now(),
bid=float(data.get("BidPrice", 0) or 0),
ask=float(data.get("AskPrice", 0) or 0),
)
PUSHメッセージはREST APIの GET /board と同じJSON形式です。ただし、値が変化したフィールドのみが含まれる場合があります。Symbol か CurrentPrice がない場合は有効なティックとして扱わず None を返します。
:::message
WebSocket PUSHメッセージにはオプションのIV(インプライド・ボラティリティ)やDelta/Gamma/Theta/Vegaといったグリークスも含まれます。_raw_queue にはパース前の生データが入るので、オプション戦略ではそちらを参照します。
:::
8. 市場データサービス -- market_data.py
REST APIクライアントとWebSocketクライアントを組み合わせて、上位レイヤー(トレーディングエンジン)に使いやすいインターフェースを提供するのが MarketDataService の役割です。
# src/systrade/services/market_data.py
"""市場データ取得・管理"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from typing import TYPE_CHECKING
from src.systrade.broker.kabu_api import KabuApi
from src.systrade.broker.kabu_ws import KabuWebSocket
from src.systrade.models.market import Board, Tick
if TYPE_CHECKING:
from src.systrade.db.repositories.tick_repo import TickRepository
logger = logging.getLogger(__name__)
TICK_SAMPLE_INTERVAL = 10 # 秒
class MarketDataService:
"""市場データの取得・キャッシュ・配信を管理"""
def __init__(
self,
broker: KabuApi,
ws: KabuWebSocket,
tick_repo: TickRepository | None = None,
) -> None:
self._broker = broker
self._ws = ws
self._tick_repo = tick_repo
self._boards: dict[str, Board] = {}
self._latest_ticks: dict[str, Tick] = {}
self._last_saved: dict[str, datetime] = {} # サンプリング制御
self._callbacks: list = []
self._running = False
self._task: asyncio.Task | None = None
@property
def latest_ticks(self) -> dict[str, Tick]:
return self._latest_ticks
def get_price(self, symbol: str) -> float | None:
tick = self._latest_ticks.get(symbol)
return tick.price if tick else None
async def fetch_board(self, symbol: str) -> Board:
board = await self._broker.get_board(symbol)
self._boards[symbol] = board
return board
def on_tick(self, callback) -> None:
"""ティック受信時のコールバックを登録"""
self._callbacks.append(callback)
8.1 ライフサイクル管理
async def start(self) -> None:
"""WebSocketからのティック受信を開始"""
await self._ws.start()
self._running = True
self._task = asyncio.create_task(self._process_ticks())
logger.info("MarketDataService開始")
async def stop(self) -> None:
self._running = False
await self._ws.stop()
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("MarketDataService停止")
start() でWebSocket受信とティック処理の両方を起動し、stop() でクリーンに終了します。FastAPIの lifespan イベントからこれらを呼び出すことで、アプリケーションの起動・終了と連動させます。
8.2 ティック処理ループ
async def _process_ticks(self) -> None:
while self._running:
try:
tick = await asyncio.wait_for(self._ws.queue.get(), timeout=1.0)
self._latest_ticks[tick.symbol] = tick
# サンプリング保存(10秒間隔)
await self._maybe_save_tick(tick)
for cb in self._callbacks:
try:
result = cb(tick)
if asyncio.iscoroutine(result):
await result
except Exception as e:
logger.error("ティックコールバックエラー: %s", e)
except asyncio.TimeoutError:
continue
このループは以下の3つの処理を行います。
- 最新ティックのキャッシュ:
_latest_ticks辞書に銘柄ごとの最新価格を保持。get_price()メソッドで即座に参照できます - DB保存: 10秒間隔でティックをサンプリング保存(後述)
- コールバック呼び出し: トレーディングエンジンなど、ティック受信時に処理を実行したいコンポーネントにイベントを通知
asyncio.wait_for(..., timeout=1.0) でタイムアウト付きにしているのは、キューが空のときに無限に待ち続けないためです。1秒ごとにタイムアウトして continue し、_running フラグをチェックすることで、stop() が呼ばれたときにループを抜けられます。
コールバックの呼び出し部分では asyncio.iscoroutine() でコルーチンかどうかを判定しています。これにより、同期コールバックも非同期コールバックも同じ on_tick() で登録できます。
8.3 ティックのサンプリング保存
WebSocket PUSHは1秒間に数十〜数百回のメッセージが飛んできます。これを全部DBに保存するとあっという間にストレージが溢れてしまいます。
async def _maybe_save_tick(self, tick: Tick) -> None:
"""10秒間隔でティックをDBに保存"""
if not self._tick_repo:
return
last = self._last_saved.get(tick.symbol)
now = tick.timestamp
if last and (now - last).total_seconds() < TICK_SAMPLE_INTERVAL:
return
self._last_saved[tick.symbol] = now
try:
await self._tick_repo.save(
symbol=tick.symbol,
price=tick.price,
volume=tick.volume,
bid=tick.bid,
ask=tick.ask,
timestamp=tick.timestamp,
)
except Exception as e:
logger.error("ティックDB保存エラー: %s", e)
TICK_SAMPLE_INTERVAL = 10 秒間隔でサンプリングすることで、1銘柄あたり1日約3,000レコード(日通し約8.5時間)に抑えています。全ティックが必要なバックテスト用途では別途全量保存の仕組みを用意しますが、通常運用ではこのサンプリングで十分です。
_tick_repo が None の場合(DBなしで動かす場合)は何もしません。DB接続がオプショナルになっているので、開発中はDB無しでも動作確認できます。
9. 設定ファイルと環境変数
最後に、これらのコンポーネントを繋ぐ設定まわりを見ておきましょう。
config.toml
[broker]
base_url = "http://localhost:18080/kabusapi"
password = "" # 環境変数 KABU_API_PASSWORD で上書き推奨
sandbox = false
config.py(ブローカー関連部分)
# src/systrade/config.py
import os
import tomllib
from dataclasses import dataclass
from pathlib import Path
CONFIG_PATH = Path(__file__).resolve().parent.parent.parent / "config.toml"
@dataclass
class BrokerConfig:
base_url: str = "http://localhost:18080/kabusapi"
password: str = ""
sandbox: bool = False
def load_config(path: Path | None = None) -> AppConfig:
"""config.toml を読み込んで AppConfig を返す"""
path = path or CONFIG_PATH
if not path.exists():
return AppConfig()
with open(path, "rb") as f:
raw = tomllib.load(f)
broker = BrokerConfig(**raw.get("broker", {}))
# ... 他の設定 ...
cfg = AppConfig(broker=broker, ...)
# 環境変数オーバーライド
if env_pw := os.environ.get("KABU_API_PASSWORD"):
cfg.broker.password = env_pw
if env_url := os.environ.get("KABU_API_BASE_URL"):
cfg.broker.base_url = env_url
return cfg
パスワードは環境変数で管理するのがベストプラクティスです。config.toml にパスワードを直書きして Git にコミットしてしまう事故を防げます。
# .env ファイル(.gitignore に追加すること!)
KABU_API_PASSWORD=your_api_password_here
起動時の初期化フロー
すべてのコンポーネントを組み合わせて起動するフローは次のようになります。
from src.systrade.config import load_config
from src.systrade.broker.kabu_api import KabuApi
from src.systrade.broker.kabu_ws import KabuWebSocket
from src.systrade.services.market_data import MarketDataService
# 1. 設定読み込み
config = load_config()
# 2. ブローカーAPIクライアント初期化
broker = KabuApi(config.broker)
# 3. 認証
await broker.authenticate()
# 4. 監視銘柄の登録
symbol_info = await broker.get_symbol_info_future(deriv_month=0, future_code="NK225mini")
await broker.register([
{"Symbol": symbol_info.symbol, "Exchange": 2}
])
# 5. WebSocket + MarketDataService 起動
ws = KabuWebSocket()
market_data = MarketDataService(broker=broker, ws=ws)
await market_data.start()
# 6. ティック受信のコールバック登録
def on_new_tick(tick):
print(f"{tick.symbol}: {tick.price}")
market_data.on_tick(on_new_tick)
このフローがFastAPIの lifespan で実行され、アプリケーション起動と同時にリアルタイム配信が始まります。処理の流れとしては、config読み込み → KabuApi初期化 → authenticate → register → WebSocket開始 → ティック受信開始という順序です。
10. まとめと次回予告
まとめ
今回は証券APIとの接続層をまるごと構築しました。要点を整理します。
- 抽象基底クラス
Brokerでインターフェースを定義し、テスト容易性と拡張性を確保した with_retryデコレータ で指数バックオフ付きリトライを実現。5xxとタイムアウトはリトライし、4xxはリトライしない設計KabuApiでkabuステーションAPIのREST操作(認証・注文・板情報・ポジション取得)を網羅。APIの数値コードはマッピング辞書で一元管理- 注文State=5の落とし穴: 「終了」は約定・取消・失効の3通りを含むため、
DetailsのRecTypeまで見て判別する必要がある KabuWebSocketで自動再接続付きのリアルタイム配信を実現。キュー溢れ対策として古いデータを捨てて最新を優先- PUT /register でWebSocket PUSH配信を受ける銘柄を事前登録(最大50銘柄)
MarketDataServiceでREST + WebSocketを統合し、ティックのキャッシュ・サンプリング保存・コールバック配信を提供- httpx
AsyncClientでFastAPIのイベントループ内からノンブロッキングにAPI通信 - ティックデータは10秒間隔でサンプリング保存し、DBサイズの肥大化を防止
- パスワードは環境変数で管理。config.tomlには直書きしない
次回予告
第6回「トレーディングエンジン -- 司令塔の設計」では、今回作ったAPI層の上に乗るトレーディングエンジンを構築します。市場データの受信、戦略シグナルの評価、注文の発行、ポジション管理を統合する「司令塔」の設計パターンについて詳しく解説します。
お楽しみに!
最後まで読んでくださりありがとうございました! 質問やフィードバックがあれば、コメント欄でお気軽にどうぞ。