【第4回】DB設計とAlembicマイグレーション
npm install さようなら — FastAPI + htmx でトレーディングシステムを作る

このシリーズについて
こんにちは、souです。 「npm install さようなら — FastAPI + htmx でトレーディングシステムを作る」シリーズの第4回へようこそ!
本シリーズは全8回構成で、日経225先物・オプションの自動売買ダッシュボードをゼロから構築していきます。
| # | タイトル | 状態 |
|---|---|---|
| 1 | 環境構築とプロジェクト設計 | 公開済み |
| 2 | htmx でSPA風ダッシュボードを作る | 公開済み |
| 3 | ダークテーマUIとコンポーネント設計 | 公開済み |
| 4 | DB設計とAlembicマイグレーション(本記事) | 今ここ |
| 5 | 証券APIとの接続 👑 | 近日公開 |
| 6 | トレーディングエンジン — 司令塔の設計 👑 | 近日公開 |
| 7 | 戦略とリスク管理 👑 | 近日公開 |
| 8 | バックテストと本番デプロイ 👑 | 近日公開 |
👑 マークの付いた第5回以降は有料パートになります。
目次
- 今回のゴール
- なぜ SQLAlchemy ORM ではなく Core か
- なぜ SQLite か
- 非同期エンジンの初期化 — engine.py
- テーブル定義 — tables.py
- リポジトリパターンによるデータアクセス
- Alembic の async 対応マイグレーション
- まとめと次回予告
1. 今回のゴール
前回(第3回)まででダークテーマの美しいダッシュボード UI が完成しました。しかし、データはまだどこにも永続化されていません。
今回はデータベース層をまるごと構築します。具体的には以下の4つをやっていきます。
- SQLAlchemy Core + aiosqlite による非同期DBエンジンの構築
- 6つのテーブル(orders / executions / signals / daily_pnl / ticks / config_changes)の設計
- リポジトリパターンによるデータアクセス層の抽象化
- Alembic の async 対応マイグレーション設定
完成後のディレクトリ構造は次のようになります。
src/systrade/db/
├── __init__.py
├── engine.py # 非同期エンジン & セッションファクトリ
├── tables.py # SQLAlchemy Core テーブル定義
└── repositories/
├── __init__.py
├── order_repo.py # 注文・約定リポジトリ
├── signal_repo.py # シグナルリポジトリ
├── pnl_repo.py # 日次PnLリポジトリ
├── tick_repo.py # ティックリポジトリ
└── config_repo.py # 設定変更監査証跡リポジトリ
alembic/
├── env.py # async対応マイグレーション環境
├── script.py.mako # マイグレーションテンプレート
└── versions/
└── 001_initial.py # 初期スキーマ
alembic.ini # Alembic設定ファイル
DB層のアーキテクチャは、FastAPI アプリからリポジトリ層を経由し、SQLAlchemy Core と aiosqlite を通じて SQLite に接続する構成です。それでは順番に作っていきましょう!
2. なぜ SQLAlchemy ORM ではなく Core か
SQLAlchemy には2つのレイヤーがあります。
| レイヤー | 特徴 |
|---|---|
| ORM | モデルクラスをPythonオブジェクトとして扱う。リレーション管理、遅延ロードなどリッチな機能 |
| Core | SQLをPythonic に組み立てるDSL。軽量でオーバーヘッドが少ない |
トレーディングシステムでは書き込み性能が重要です。リアルタイムでティックデータが流れ込み、注文ステータスが次々に更新されます。ORM はオブジェクトの状態追跡(Identity Map)やリレーションの遅延ロードなど便利な機能を持ちますが、そのぶんオーバーヘッドがあります。
本プロジェクトでは次の理由から SQLAlchemy Core を採用しました。
- パフォーマンス: ORM のセッション管理(Identity Map)を経由しないため、INSERT/UPDATE が高速
- 明示的なSQL制御: 発行されるSQLが予測しやすく、デバッグが容易
- テーブル数が少ない: 6テーブルと小規模なので、ORM のリレーション管理機能が不要
- リポジトリパターンとの相性: データアクセスを自前のリポジトリクラスに閉じ込めるため、ORM のメリットが薄い
ここで一点注意が必要なのは、Core を使う場合でも
async_sessionmakerを使ってトランザクション管理を行うという点です。SQLAlchemy 2.0 以降は Core でもセッション経由の実行がサポートされており、トランザクションの commit/rollback を安全に制御できます。
3. なぜ SQLite か
「トレーディングシステムなのに PostgreSQL じゃないの?」と思う方もいるかもしれません。SQLite を選んだ理由を整理します。
| 観点 | SQLite | PostgreSQL |
|---|---|---|
| デプロイ | ファイル1つ。サーバー不要 | 別プロセスのDBサーバーが必要 |
| 同時接続 | WALモードで読み書き並行可能 | ネイティブで高い並行性 |
| 書き込み性能 | 単一ライターだが十分高速 | 複数ライターに対応 |
| 運用コスト | バックアップ = ファイルコピー | pg_dump / レプリケーション設定が必要 |
本システムは 単一プロセスで動く個人向けの自動売買システム です。書き込みは1プロセスからのみなので、SQLite の「単一ライター」制約は問題になりません。
さらに WALモード(Write-Ahead Logging)を有効にすることで、書き込み中でも読み取りがブロックされなくなります。ダッシュボードから最新の注文状況を閲覧しつつ、裏でティックデータを書き込む――そんなユースケースにぴったりです。
将来的にスケールアウトが必要になったら PostgreSQL に移行すればよく、その際も SQLAlchemy を使っていれば接続文字列の変更だけで済みます。この辺りが SQLAlchemy を使うメリットですね。
4. 非同期エンジンの初期化 -- engine.py
まずはDBエンジンを作りましょう。すべての起点となるファイルです。
"""Async SQLAlchemy engine & session factory"""
import logging
from pathlib import Path
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from src.systrade.db.tables import metadata
logger = logging.getLogger(__name__)
_engine: AsyncEngine | None = None
async_session: async_sessionmaker | None = None
async def init_db(db_path: str = "data/systrade.db") -> None:
"""DB初期化: エンジン作成 → WALモード有効化 → テーブル作成"""
global _engine, async_session
# data/ ディレクトリ確保
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
url = f"sqlite+aiosqlite:///{db_path}"
_engine = create_async_engine(url, echo=False)
# WALモード有効化(読み書き並行性向上)
async with _engine.begin() as conn:
await conn.exec_driver_sql("PRAGMA journal_mode=WAL")
await conn.exec_driver_sql("PRAGMA busy_timeout=5000")
# テーブル作成(存在しなければ)
async with _engine.begin() as conn:
await conn.run_sync(metadata.create_all)
async_session = async_sessionmaker(_engine, expire_on_commit=False)
logger.info("DB初期化完了: %s", db_path)
async def close_db() -> None:
"""DB接続クローズ"""
global _engine, async_session
if _engine:
await _engine.dispose()
_engine = None
async_session = None
logger.info("DB接続クローズ")
ポイントを順番に見ていきましょう。
4-1. 接続文字列
url = f"sqlite+aiosqlite:///{db_path}"
sqlite+aiosqlite というドライバ指定がポイントです。aiosqlite は SQLite を非同期(asyncio)で使うためのラッパーライブラリで、内部的にスレッドプールを使って同期の sqlite3 モジュールを非同期APIで呼び出します。
4-2. WAL モードと busy_timeout
async with _engine.begin() as conn:
await conn.exec_driver_sql("PRAGMA journal_mode=WAL")
await conn.exec_driver_sql("PRAGMA busy_timeout=5000")
ここが今回のキモです。
WALモード(Write-Ahead Logging) は SQLite のジャーナルモードの一つで、デフォルトの DELETE モードとは書き込みの仕組みが異なります。通常モード(DELETE)では読み取りが書き込みをブロックしますが、WAL モードでは読み取りと書き込みを並行して行えます。両者の違いを表にまとめます。
| 項目 | DELETEモード(デフォルト) | WALモード |
|---|---|---|
| 読み取り中の書き込み | ブロックされる | 並行可能 |
| 書き込み中の読み取り | ブロックされる | 並行可能 |
| 複数ライター | 不可 | 不可(SQLiteの制約) |
| ファイル | .db のみ | .db + .db-wal + .db-shm |
トレーディングダッシュボードでは「ティックデータを書き込みながら、画面には最新の注文一覧を表示する」というシーンが頻繁に発生します。WAL モードならこの2つが互いをブロックしません。
busy_timeout=5000 は、ロック取得を5秒間リトライするという設定です。万が一書き込みが競合しても、即座にエラーにならずに待ってくれます。
4-3. テーブル自動作成
async with _engine.begin() as conn:
await conn.run_sync(metadata.create_all)
metadata.create_all は、テーブル定義ファイル(後述の tables.py)に基づいて「存在しないテーブルだけ」を CREATE します。開発中はこれでサクッとテーブルを作れるので便利です。
本番ではこの自動作成の代わりに Alembic マイグレーションを使いますが、開発環境でDBファイルを消してやり直す場面では重宝します。
4-4. セッションファクトリ
async_session = async_sessionmaker(_engine, expire_on_commit=False)
expire_on_commit=False を指定しているのは、commit 後にカラムの値を再度読み込まないようにするためです。Core を使う場合はオブジェクトの状態追跡が不要なので、この設定がパフォーマンス上有利になります。
5. テーブル定義 -- tables.py
次はテーブル定義です。SQLAlchemy Core では Table オブジェクトでスキーマを宣言します。
"""SQLAlchemy Core テーブル定義"""
from sqlalchemy import (
Column,
Date,
DateTime,
Float,
Index,
Integer,
MetaData,
String,
Table,
Text,
)
metadata = MetaData()
orders = Table(
"orders",
metadata,
Column("id", String(64), primary_key=True),
Column("symbol", String(32), nullable=False),
Column("side", String(8), nullable=False),
Column("qty", Integer, nullable=False),
Column("order_type", String(16), nullable=False),
Column("status", String(16), nullable=False),
Column("price", Float, nullable=True),
Column("filled_qty", Integer, nullable=False, default=0),
Column("filled_price", Float, nullable=False, default=0.0),
Column("trade_type", String(8), nullable=False),
Column("strategy_name", String(64), nullable=False, default=""),
Column("created_at", DateTime, nullable=False),
Column("updated_at", DateTime, nullable=False),
Column("broker_order_id", String(64), nullable=False, default=""),
Column("message", Text, nullable=False, default=""),
Index("ix_orders_symbol", "symbol"),
Index("ix_orders_status", "status"),
)
executions = Table(
"executions",
metadata,
Column("id", String(64), primary_key=True),
Column("order_id", String(64), nullable=False),
Column("symbol", String(32), nullable=False),
Column("side", String(8), nullable=False),
Column("qty", Integer, nullable=False),
Column("price", Float, nullable=False),
Column("executed_at", DateTime, nullable=False),
Index("ix_executions_order_id", "order_id"),
)
signals = Table(
"signals",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("time", DateTime, nullable=False),
Column("signal_type", String(8), nullable=False),
Column("symbol", String(32), nullable=False),
Column("side", String(8), nullable=False),
Column("trade_type", String(8), nullable=False),
Column("reason", Text, nullable=False, default=""),
Column("strategy", String(64), nullable=False, default=""),
Index("ix_signals_time", "time"),
)
daily_pnl = Table(
"daily_pnl",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("date", Date, nullable=False, unique=True),
Column("realized_pnl", Float, nullable=False, default=0.0),
Column("unrealized_pnl", Float, nullable=False, default=0.0),
Column("peak_pnl_json", Text, nullable=False, default="{}"),
Column("updated_at", DateTime, nullable=False),
Index("ix_daily_pnl_date", "date", unique=True),
)
ticks = Table(
"ticks",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("symbol", String(32), nullable=False),
Column("price", Float, nullable=False),
Column("volume", Integer, nullable=False, default=0),
Column("bid", Float, nullable=False, default=0.0),
Column("ask", Float, nullable=False, default=0.0),
Column("timestamp", DateTime, nullable=False),
Index("ix_ticks_symbol", "symbol"),
Index("ix_ticks_timestamp", "timestamp"),
)
config_changes = Table(
"config_changes",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("changed_at", DateTime, nullable=False),
Column("section", String(64), nullable=False),
Column("field", String(64), nullable=False),
Column("old_value", Text, nullable=False, default=""),
Column("new_value", Text, nullable=False, default=""),
Column("source", String(32), nullable=False, default="dashboard"),
)
テーブルが6つあるので、それぞれの役割を整理しましょう。
5-1. テーブル一覧と設計意図
| テーブル | 用途 | 主キー |
|---|---|---|
orders |
注文の管理(新規・返済) | id (UUID文字列) |
executions |
約定の記録 | id (UUID文字列) |
signals |
売買シグナルのログ | id (自動採番) |
daily_pnl |
日次の損益サマリ | id (自動採番) |
ticks |
ティック(価格)データ | id (自動採番) |
config_changes |
設定変更の監査証跡 | id (自動採番) |
5-2. orders テーブルの詳細
注文テーブルは一番カラムが多いので、もう少し詳しく見てみましょう。
orders
├── id : String(64) -- UUIDで一意に識別
├── symbol : String(32) -- 銘柄コード(例: "NK225F")
├── side : String(8) -- "buy" or "sell"
├── qty : Integer -- 注文数量
├── order_type : String(16) -- "market"(成行) or "limit"(指値)
├── status : String(16) -- pending → submitted → filled/cancelled
├── price : Float? -- 指値価格(成行なら NULL)
├── filled_qty : Integer -- 約定済み数量
├── filled_price : Float -- 約定平均価格
├── trade_type : String(8) -- "new"(新規) or "close"(返済)
├── strategy_name : String(64) -- どの戦略が発注したか
├── created_at : DateTime -- 注文作成日時
├── updated_at : DateTime -- 最終更新日時
├── broker_order_id : String(64) -- 証券会社側の注文ID
└── message : Text -- エラーメッセージ等
side や status を Enum 型ではなく String にしているのは、SQLite が Enum 型をネイティブでサポートしていないためです。アプリケーション側では Python の Enum クラスで型安全に管理し、DB保存時に .value で文字列に変換しています(後述のリポジトリで確認できます)。
5-3. インデックス設計
各テーブルにはクエリパターンに合わせたインデックスを設定しています。
Index("ix_orders_symbol", "symbol"), # 銘柄での絞り込み
Index("ix_orders_status", "status"), # ステータスでの絞り込み
例えば「status = 'submitted' の注文一覧を取得する」というクエリが頻繁に実行されるため、status カラムにインデックスを張っています。
ticks テーブルには symbol と timestamp の両方にインデックスがあります。「直近N分の特定銘柄のティックデータ」を高速に取得するためですね。
5-4. daily_pnl テーブルのJSON列
Column("peak_pnl_json", Text, nullable=False, default="{}"),
peak_pnl_json は JSON 文字列を TEXT 型に格納しています。ここにはトレーリングストップ用のピーク損益情報が入ります。SQLite は JSON 関数もサポートしていますが、今回はアプリケーション側で json.dumps / json.loads を使ってシリアライズしています。
なぜ JSON カラムか? ピーク損益は戦略ごとにキーが異なる可能性があり、スキーマを固定しにくいためです。柔軟性を持たせるために JSON を採用しました。
5-5. config_changes テーブル -- 監査証跡
config_changes = Table(
"config_changes",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("changed_at", DateTime, nullable=False),
Column("section", String(64), nullable=False),
Column("field", String(64), nullable=False),
Column("old_value", Text, nullable=False, default=""),
Column("new_value", Text, nullable=False, default=""),
Column("source", String(32), nullable=False, default="dashboard"),
)
これはちょっとユニークなテーブルです。ダッシュボードから設定(config.toml に対応するパラメータ)を変更したとき、何をいつ変えたかを記録します。
トレーディングシステムでは「あのとき損切りパーセンテージを何%にしていたか」を後から確認したいことがよくあります。source カラムで「ダッシュボード経由の変更」なのか「API経由の変更」なのかも区別できるようにしています。
6. リポジトリパターンによるデータアクセス
6-1. リポジトリパターンとは
リポジトリパターンは、データベースへのアクセスロジックを専用のクラスに閉じ込める設計パターンです。ビジネスロジック層がリポジトリを介してDB層にアクセスする3層構造になっています。
FastAPI ルーター
↓ 呼び出し
トレーディングエンジン(ビジネスロジック)
↓ 呼び出し
リポジトリ(データアクセス) ← ★ ここを今回作る
↓ SQL実行
SQLAlchemy Core → aiosqlite → SQLite
このパターンのメリットは以下の通りです。
- テスト容易性: リポジトリをモックに差し替えれば、DBなしでビジネスロジックをテストできる
- 関心の分離: SQL の組み立てがリポジトリに閉じるので、ビジネスロジックがスッキリする
- DB移行の容易さ: 将来 PostgreSQL に乗り換える場合、リポジトリの内部実装だけ変更すればよい
6-2. OrderRepository -- 注文・約定リポジトリ
まずは一番重要な注文リポジトリから見ていきます。
"""注文・約定リポジトリ"""
import logging
from datetime import datetime
from sqlalchemy import insert, update
from sqlalchemy.ext.asyncio import async_sessionmaker
from src.systrade.db.tables import executions, orders
from src.systrade.models.order import Execution, Order
logger = logging.getLogger(__name__)
class OrderRepository:
def __init__(self, session_factory: async_sessionmaker) -> None:
self._sf = session_factory
async def save_order(self, order: Order) -> None:
"""注文をupsert"""
async with self._sf() as session:
# INSERT OR REPLACE (SQLite)
stmt = insert(orders).values(
id=order.id,
symbol=order.symbol,
side=order.side.value,
qty=order.qty,
order_type=order.order_type.value,
status=order.status.value,
price=order.price,
filled_qty=order.filled_qty,
filled_price=order.filled_price,
trade_type=order.trade_type.value,
strategy_name=order.strategy_name,
created_at=order.created_at,
updated_at=order.updated_at,
broker_order_id=order.broker_order_id,
message=order.message,
).prefix_with("OR REPLACE")
await session.execute(stmt)
await session.commit()
async def update_order(self, order: Order) -> None:
"""注文ステータス更新"""
async with self._sf() as session:
stmt = (
update(orders)
.where(orders.c.id == order.id)
.values(
status=order.status.value,
filled_qty=order.filled_qty,
filled_price=order.filled_price,
updated_at=order.updated_at,
message=order.message,
)
)
await session.execute(stmt)
await session.commit()
async def save_execution(self, execution: Execution) -> None:
"""約定を保存"""
async with self._sf() as session:
stmt = insert(executions).values(
id=execution.id,
order_id=execution.order_id,
symbol=execution.symbol,
side=execution.side.value,
qty=execution.qty,
price=execution.price,
executed_at=execution.executed_at,
).prefix_with("OR IGNORE")
await session.execute(stmt)
await session.commit()
いくつかポイントを解説します。
コンストラクタで session_factory を受け取る
def __init__(self, session_factory: async_sessionmaker) -> None:
self._sf = session_factory
リポジトリは async_sessionmaker を注入されて使います。これにより、テスト時にインメモリDBのセッションファクトリを渡すといったことが簡単にできます。
INSERT OR REPLACE による Upsert
stmt = insert(orders).values(...).prefix_with("OR REPLACE")
SQLite 固有の構文ですが、INSERT OR REPLACE は主キーが重複した場合に既存行を DELETE → INSERT します。注文の初回保存と更新保存を一つのメソッドで処理できるので便利です。
Enum の .value 変換
side=order.side.value, # Side.BUY → "buy"
order_type=order.order_type.value, # OrderType.MARKET → "market"
Python 側では Side.BUY のように Enum で扱い、DB に保存する際に .value で文字列に変換しています。これにより、アプリケーション層では型安全なコードを書きつつ、DB層ではシンプルな文字列で格納できます。
約定保存の OR IGNORE
stmt = insert(executions).values(...).prefix_with("OR IGNORE")
約定データは証券APIからのWebSocket通知で受け取りますが、同じ約定が複数回通知されることがあります。OR IGNORE を使うことで、重複した約定IDの INSERT を静かに無視できます。
6-3. SignalRepository -- シグナルリポジトリ
"""シグナルリポジトリ"""
import logging
from datetime import datetime
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import async_sessionmaker
from src.systrade.db.tables import signals
logger = logging.getLogger(__name__)
class SignalRepository:
def __init__(self, session_factory: async_sessionmaker) -> None:
self._sf = session_factory
async def save(
self,
*,
time: datetime,
signal_type: str,
symbol: str,
side: str,
trade_type: str,
reason: str = "",
strategy: str = "",
) -> None:
"""シグナルを保存"""
async with self._sf() as session:
stmt = insert(signals).values(
time=time,
signal_type=signal_type,
symbol=symbol,
side=side,
trade_type=trade_type,
reason=reason,
strategy=strategy,
)
await session.execute(stmt)
await session.commit()
シグナルリポジトリはシンプルな INSERT のみです。* を使ったキーワード専用引数にしているのがポイントで、呼び出し側で引数名を明示的に指定する必要があるため、うっかりパラメータを間違える心配がありません。
# 呼び出し例
await signal_repo.save(
time=datetime.now(),
signal_type="entry",
symbol="NK225F",
side="buy",
trade_type="new",
reason="SMA crossover detected",
strategy="futures_trend",
)
6-4. PnLRepository -- 日次損益リポジトリ
"""日次PnLリポジトリ"""
import json
import logging
from datetime import date, datetime
from sqlalchemy import insert, select, update
from sqlalchemy.ext.asyncio import async_sessionmaker
from src.systrade.db.tables import daily_pnl
logger = logging.getLogger(__name__)
class PnLRepository:
def __init__(self, session_factory: async_sessionmaker) -> None:
self._sf = session_factory
async def upsert_daily(
self,
*,
target_date: date | None = None,
realized_pnl: float = 0.0,
unrealized_pnl: float = 0.0,
peak_pnl: dict[str, float] | None = None,
) -> None:
"""日次PnLをupsert"""
target_date = target_date or date.today()
peak_json = json.dumps(peak_pnl or {})
now = datetime.now()
async with self._sf() as session:
# 既存レコードチェック
row = await session.execute(
select(daily_pnl.c.id).where(daily_pnl.c.date == target_date)
)
existing = row.scalar_one_or_none()
if existing:
stmt = (
update(daily_pnl)
.where(daily_pnl.c.id == existing)
.values(
realized_pnl=realized_pnl,
unrealized_pnl=unrealized_pnl,
peak_pnl_json=peak_json,
updated_at=now,
)
)
else:
stmt = insert(daily_pnl).values(
date=target_date,
realized_pnl=realized_pnl,
unrealized_pnl=unrealized_pnl,
peak_pnl_json=peak_json,
updated_at=now,
)
await session.execute(stmt)
await session.commit()
async def get_today(self) -> dict | None:
"""本日のPnLレコードを取得"""
async with self._sf() as session:
row = await session.execute(
select(daily_pnl).where(daily_pnl.c.date == date.today())
)
result = row.mappings().first()
if not result:
return None
return {
"realized_pnl": result["realized_pnl"],
"unrealized_pnl": result["unrealized_pnl"],
"peak_pnl": json.loads(result["peak_pnl_json"]),
}
PnL リポジトリは SELECT → 条件分岐 → INSERT or UPDATE という「手動 upsert」パターンを使っています。
daily_pnl テーブルは1日に1レコードなので、「今日のレコードがあれば更新、なければ新規作成」というロジックです。orders テーブルのように INSERT OR REPLACE でもよいのですが、PnL の場合は date が UNIQUE 制約で、主キーの id とは別なため、明示的に SELECT で存在チェックしています。
get_today() メソッドでは row.mappings().first() を使っています。これは SQLAlchemy 2.0 の Core API で、結果を辞書ライクなオブジェクトとして取得する方法です。ORM のモデルクラスを使わない Core ならではのアプローチですね。
6-5. TickRepository -- ティックリポジトリ
"""ティックリポジトリ(サンプリング保存)"""
import logging
from datetime import datetime
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import async_sessionmaker
from src.systrade.db.tables import ticks
logger = logging.getLogger(__name__)
class TickRepository:
def __init__(self, session_factory: async_sessionmaker) -> None:
self._sf = session_factory
async def save(
self,
*,
symbol: str,
price: float,
volume: int = 0,
bid: float = 0.0,
ask: float = 0.0,
timestamp: datetime | None = None,
) -> None:
"""ティックを保存"""
async with self._sf() as session:
stmt = insert(ticks).values(
symbol=symbol,
price=price,
volume=volume,
bid=bid,
ask=ask,
timestamp=timestamp or datetime.now(),
)
await session.execute(stmt)
await session.commit()
ティックリポジトリは INSERT のみのシンプルな設計です。
ファイル名のコメントに「サンプリング保存」とあるのは、すべてのティックを保存するわけではなく、一定間隔でサンプリングしたデータのみを保存する想定だからです。日経225先物は取引時間中に大量のティックが発生しますが、すべてを SQLite に入れるとファイルサイズが膨大になります。実際のサンプリングロジックはティックリポジトリの呼び出し側(トレーディングエンジン)で制御します。
6-6. ConfigChangeRepository -- 設定変更監査証跡
"""設定変更監査証跡リポジトリ"""
import logging
from datetime import datetime
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import async_sessionmaker
from src.systrade.db.tables import config_changes
logger = logging.getLogger(__name__)
class ConfigChangeRepository:
def __init__(self, session_factory: async_sessionmaker) -> None:
self._sf = session_factory
async def log_change(
self,
*,
section: str,
field: str,
old_value: str,
new_value: str,
source: str = "dashboard",
) -> None:
"""設定変更を記録"""
async with self._sf() as session:
stmt = insert(config_changes).values(
changed_at=datetime.now(),
section=section,
field=field,
old_value=old_value,
new_value=new_value,
source=source,
)
await session.execute(stmt)
await session.commit()
使い方のイメージとしてはこんな感じです。
# ダッシュボードで損切りパーセンテージを変更したとき
await config_repo.log_change(
section="risk",
field="stop_loss_pct",
old_value="3.0",
new_value="2.5",
source="dashboard",
)
config.toml の [risk] セクションの stop_loss_pct を 3.0 から 2.5 に変更した、という履歴が残ります。あとから「いつ何を変えたか」を追跡できるので、トレードの振り返りに役立ちます。
6-7. リポジトリの共通パターン
ここまで5つのリポジトリを見てきましたが、共通パターンが見えてきたと思います。
class XxxRepository:
def __init__(self, session_factory: async_sessionmaker) -> None:
self._sf = session_factory
async def some_method(self, ...) -> None:
async with self._sf() as session:
stmt = insert(some_table).values(...)
await session.execute(stmt)
await session.commit()
すべてのリポジトリが以下のルールに従っています。
- コンストラクタで
async_sessionmakerを受け取る -- 依存性注入 - 各メソッドで
async with self._sf() as session-- メソッド単位でセッションを管理 - 明示的な
await session.commit()-- 自動コミットに頼らない - キーワード専用引数 (
*) -- 呼び出し側の可読性を確保
このパターンを守ることで、リポジトリが増えてもコードの一貫性が保たれます。
7. Alembic の async 対応マイグレーション
metadata.create_all は開発中に便利ですが、本番運用ではスキーマの変更履歴を管理したいですよね。そこで Alembic を使います。
Alembic は SQLAlchemy 公式のマイグレーションツールで、Rails でいう db:migrate のようなものです。
7-1. Alembic のセットアップ
まずは必要なパッケージがインストールされていることを確認しましょう。pyproject.toml に以下が含まれています。
dependencies = [
# ...
"sqlalchemy>=2.0.0",
"aiosqlite>=0.20.0",
"greenlet>=3.0.0",
"alembic>=1.13.0",
]
greenlet は SQLAlchemy の async 機能で内部的に使われるパッケージです。明示的に依存に入れておくと安心です。
Alembic の初期化は以下のコマンドで行います。
# Alembic初期化(asyncテンプレート)
alembic init -t async alembic
ここで一点注意。通常の
alembic init alembicではなく、-t asyncテンプレートを指定しています。これにより、async対応のenv.pyが生成されます。
7-2. alembic.ini -- 設定ファイル
[alembic]
script_location = alembic
sqlalchemy.url = sqlite+aiosqlite:///data/systrade.db
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
sqlalchemy.url に sqlite+aiosqlite:///data/systrade.db を指定しています。engine.py と同じドライバ・パスを使う点がポイントです。
7-3. env.py -- 非同期マイグレーション環境
ここが Alembic の async 対応で最も重要なファイルです。
"""Alembic async migration environment"""
import asyncio
import os
import sys
from logging.config import fileConfig
from pathlib import Path
# プロジェクトルートをsys.pathに追加
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from src.systrade.db.tables import metadata as target_metadata
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# 環境変数でDB pathをオーバーライド
db_path = os.environ.get("DATABASE_PATH", "data/systrade.db")
config.set_main_option("sqlalchemy.url", f"sqlite+aiosqlite:///{db_path}")
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Run migrations in 'online' mode (async)."""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
順番にポイントを解説します。
sys.path の追加
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
Alembic はプロジェクトルートから実行されますが、src.systrade.db.tables をインポートするためにプロジェクトルートを sys.path に追加しています。
target_metadata の指定
from src.systrade.db.tables import metadata as target_metadata
tables.py で定義した metadata を target_metadata として渡すことで、Alembic の --autogenerate がテーブル定義の差分を検出できるようになります。
環境変数によるDBパスのオーバーライド
db_path = os.environ.get("DATABASE_PATH", "data/systrade.db")
config.set_main_option("sqlalchemy.url", f"sqlite+aiosqlite:///{db_path}")
alembic.ini に書かれた接続先を、環境変数 DATABASE_PATH でオーバーライドできるようにしています。テスト用に別のDBファイルを使いたい場合などに便利です。
async エンジンの利用
async def run_async_migrations() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
async_engine_from_config で非同期エンジンを作り、connection.run_sync でマイグレーション関数を同期的に実行しています。Alembic のマイグレーション自体は同期関数(upgrade() / downgrade())ですが、エンジンの接続が非同期なので、この run_sync ブリッジが必要になります。
poolclass=pool.NullPool を指定しているのは、マイグレーション実行時にはコネクションプールが不要なためです。
7-4. 初期マイグレーション
マイグレーションファイルの自動生成は以下のコマンドで行います。
# マイグレーションファイルの自動生成
alembic revision --autogenerate -m "Initial schema"
本プロジェクトでは 001_initial.py として手動で作成しています。
"""Initial schema
Revision ID: 001
Revises:
Create Date: 2025-01-01 00:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "001"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"orders",
sa.Column("id", sa.String(64), primary_key=True),
sa.Column("symbol", sa.String(32), nullable=False),
sa.Column("side", sa.String(8), nullable=False),
sa.Column("qty", sa.Integer, nullable=False),
sa.Column("order_type", sa.String(16), nullable=False),
sa.Column("status", sa.String(16), nullable=False),
sa.Column("price", sa.Float, nullable=True),
sa.Column("filled_qty", sa.Integer, nullable=False, server_default="0"),
sa.Column("filled_price", sa.Float, nullable=False, server_default="0.0"),
sa.Column("trade_type", sa.String(8), nullable=False),
sa.Column("strategy_name", sa.String(64), nullable=False, server_default=""),
sa.Column("created_at", sa.DateTime, nullable=False),
sa.Column("updated_at", sa.DateTime, nullable=False),
sa.Column("broker_order_id", sa.String(64), nullable=False, server_default=""),
sa.Column("message", sa.Text, nullable=False, server_default=""),
)
op.create_index("ix_orders_symbol", "orders", ["symbol"])
op.create_index("ix_orders_status", "orders", ["status"])
op.create_table(
"executions",
sa.Column("id", sa.String(64), primary_key=True),
sa.Column("order_id", sa.String(64), nullable=False),
sa.Column("symbol", sa.String(32), nullable=False),
sa.Column("side", sa.String(8), nullable=False),
sa.Column("qty", sa.Integer, nullable=False),
sa.Column("price", sa.Float, nullable=False),
sa.Column("executed_at", sa.DateTime, nullable=False),
)
op.create_index("ix_executions_order_id", "executions", ["order_id"])
op.create_table(
"signals",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("time", sa.DateTime, nullable=False),
sa.Column("signal_type", sa.String(8), nullable=False),
sa.Column("symbol", sa.String(32), nullable=False),
sa.Column("side", sa.String(8), nullable=False),
sa.Column("trade_type", sa.String(8), nullable=False),
sa.Column("reason", sa.Text, nullable=False, server_default=""),
sa.Column("strategy", sa.String(64), nullable=False, server_default=""),
)
op.create_index("ix_signals_time", "signals", ["time"])
op.create_table(
"daily_pnl",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("date", sa.Date, nullable=False, unique=True),
sa.Column("realized_pnl", sa.Float, nullable=False, server_default="0.0"),
sa.Column("unrealized_pnl", sa.Float, nullable=False, server_default="0.0"),
sa.Column("peak_pnl_json", sa.Text, nullable=False, server_default="{}"),
sa.Column("updated_at", sa.DateTime, nullable=False),
)
op.create_index("ix_daily_pnl_date", "daily_pnl", ["date"], unique=True)
op.create_table(
"ticks",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("symbol", sa.String(32), nullable=False),
sa.Column("price", sa.Float, nullable=False),
sa.Column("volume", sa.Integer, nullable=False, server_default="0"),
sa.Column("bid", sa.Float, nullable=False, server_default="0.0"),
sa.Column("ask", sa.Float, nullable=False, server_default="0.0"),
sa.Column("timestamp", sa.DateTime, nullable=False),
)
op.create_index("ix_ticks_symbol", "ticks", ["symbol"])
op.create_index("ix_ticks_timestamp", "ticks", ["timestamp"])
op.create_table(
"config_changes",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("changed_at", sa.DateTime, nullable=False),
sa.Column("section", sa.String(64), nullable=False),
sa.Column("field", sa.String(64), nullable=False),
sa.Column("old_value", sa.Text, nullable=False, server_default=""),
sa.Column("new_value", sa.Text, nullable=False, server_default=""),
sa.Column("source", sa.String(32), nullable=False, server_default="dashboard"),
)
def downgrade() -> None:
op.drop_table("config_changes")
op.drop_table("ticks")
op.drop_table("daily_pnl")
op.drop_table("signals")
op.drop_table("executions")
op.drop_table("orders")
この辺りは tables.py の定義内容をそのままマイグレーションに落としたものです。
注目してほしいのは default と server_default の違いです。
| 指定場所 | 意味 |
|---|---|
tables.py の default=0 |
Python アプリケーション側でデフォルト値を設定 |
マイグレーションの server_default="0" |
DB(SQLite)側でデフォルト値を設定 |
tables.py では default(アプリ側デフォルト)を使い、マイグレーションでは server_default(DB側デフォルト)を使っています。マイグレーションで server_default を指定しておくと、SQLAlchemy を経由せずに直接 SQL を叩いた場合でもデフォルト値が効きます。
7-5. マイグレーションの実行
# マイグレーション実行(最新まで適用)
alembic upgrade head
# 現在のリビジョン確認
alembic current
# マイグレーション履歴の確認
alembic history
# 1つ前に戻す
alembic downgrade -1
実行すると、INFO レベルでマイグレーション適用のログが表示され、各テーブルが作成されたことを確認できます。
7-6. 新しいカラムを追加する場合
将来的にテーブルにカラムを追加したくなった場合は、以下の手順で行います。
# 1. tables.py にカラムを追加
# 2. マイグレーションファイルを自動生成
alembic revision --autogenerate -m "Add xxx column to orders"
# 3. 生成されたファイルを確認・修正
# 4. マイグレーション実行
alembic upgrade head
この辺りは --autogenerate が tables.py の metadata と実際のDB スキーマを比較して差分を検出してくれるので、非常に楽です。ただし SQLite の場合、ALTER TABLE でカラムの削除やリネームが制限される点には注意が必要です。Alembic はそのような場合にテーブルの再作成(batch mode)で対応しますが、データ量が多いテーブルでは時間がかかることがあります。
7-7. マイグレーションテンプレート
新しいマイグレーションファイルが生成される際のテンプレートも確認しておきましょう。
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}
alembic revision --autogenerate を実行すると、このテンプレートを元にマイグレーションファイルが生成されます。${upgrades} と ${downgrades} の部分に、検出された差分のコードが自動的に埋め込まれます。
8. まとめと次回予告
まとめ
今回のポイントを整理します。
- SQLAlchemy Core を採用し、ORM のオーバーヘッドを排除。トレーディングシステムに必要な書き込み性能を確保した
- aiosqlite で SQLite を非同期対応に。FastAPI の async ルーターとシームレスに連携できる
- WAL モード を有効化して読み書きの並行性を向上。ダッシュボード表示とデータ書き込みが互いをブロックしない
- 6つのテーブル(orders, executions, signals, daily_pnl, ticks, config_changes)を設計。トレーディングシステムの基本的なデータモデルをカバー
- リポジトリパターンでデータアクセスを抽象化。テスト容易性・保守性・DB移行の容易さを確保
- Alembic の async 対応で、aiosqlite ドライバでもマイグレーションが実行できるようにした
INSERT OR REPLACEやINSERT OR IGNOREなど、SQLite 固有の機能もリポジトリ内に閉じ込めて利用
次回予告: 第5回「証券APIとの接続」
次回からはいよいよ有料パートに入ります!
第5回では、kabuステーションAPI(auカブコム証券の REST / WebSocket API)と接続し、リアルタイムの板情報取得・注文発注・約定通知受信を実装します。今回作ったリポジトリ層が、実際の注文データや約定データを保存する「受け皿」として活躍します。
次回の内容(予定):
- kabuステーションAPIのトークン認証
- REST API での注文発注・キャンセル
- WebSocket での板情報・約定通知のリアルタイム受信
- 非同期 HTTP クライアント(httpx)の活用
:::message ここから先(第5回〜第8回)は有料パートになります。 証券APIとの接続、トレーディングエンジンの設計、戦略実装、バックテスト・デプロイまで、実践的な内容を詳しく解説していきます。 興味のある方はぜひ購読してください! :::
最後まで読んでくださりありがとうございました! DB層ができたことで、いよいよアプリケーションの「骨格」が完成しました。次回からは外部の証券APIと繋がり、リアルなデータが流れ始めます。楽しみにしていてください😆