Background: MLOpsインフラストラクチャ
デプロイされた瞬間から、モデルは劣化を始めます。MLOpsはオプションの高度な設定ではありません。Quant戦略を生き続けさせるインフラストラクチャです。
「モデルが動く」から「モデルが役立つ」へ
2023年、あるQuant研究者がモメンタム戦略モデルを完成させた:
- バックテストシャープ: 1.8
- IC平均: 0.04
- クリーンなコード、テスト合格
彼は興奮して本番環境にデプロイした。
3ヶ月後:
- 1ヶ月目: シャープ1.2(「市場状況」)
- 2ヶ月目: シャープ0.4(「もっと観察しよう」)
- 3ヶ月目: シャープ-0.3(「モデルが壊れている?」)
何が起きたのか?
調査の結果判明:
- 本番環境の特徴量計算コードがバックテストと異なっていた。バグによりRSIが1日ずれていた
- モデルバージョン管理が混乱していた。実際に実行されているバージョンが不明
- 特徴量スナップショットとモデル入力が保存されていなかったため、問題を追跡する方法がなかった
- 問題が発見された時には、いつから始まったか誰も分からなかった
教訓: モデル開発は始まりに過ぎません。再現性、バージョン管理、ドリフト監視が本番システムの核心です。これがMLOpsです。
1. なぜQuantにMLOpsが必要か
Quant固有の課題
| 従来のML | Quant ML |
|---|---|
| デプロイ後モデルは比較的安定 | 市場構造は絶えず変化、モデルは必然的に劣化 |
| データ分布は比較的固定 | 金融データは高度に非定常 |
| モデルエラーはユーザー体験に影響 | モデルエラーは直接的に資本損失を引き起こす |
| オフラインバッチ予測で許容 | リアルタイム推論が必要、レイテンシに敏感 |
| 特徴量は安定したデータソースから来る | 特徴量は複数ベンダーから、遅延や欠損の可能性 |
MLOpsの三本柱
Quant MLOps = Feature Store + Model Registry + Drift Monitor
機能:
1. Feature Store -> バックテストとライブの特徴量一貫性を保証(再現性)
2. Model Registry -> モデルバージョンとパフォーマンスを追跡(監査可能性)
3. Drift Monitor -> モデル劣化を検出(タイムリーなストップロス)
2. Feature Store
コア問題: Point-in-Time正確性
Quantにおける最も陰湿なバグはルックアヘッドバイアスです。
誤った例(ルックアヘッドバイアス):
2024-01-15トレーニングサンプル:
特徴量: RSI = 65(2024-01-15終値を使用して計算)
ラベル: 翌日のリターン
問題:
2024-01-15終値は16:00まで分からない
しかしRSI計算でこの値を使用した
-> モデルは「未来の情報」から学習
正しいアプローチ:
2024-01-15トレーニングサンプル:
特徴量: 2024-01-14終値を使用して計算したRSI
ラベル: 2024-01-15から2024-01-16のリターン
Feature Storeのコア能力はPoint-in-Timeクエリの保証です: 任意の過去のタイムスタンプを与えると、その時点で既知だった特徴量値を返します。
デュアルタイムスタンプ設計
特徴量イベントテーブル (feature_events):
+--------------+---------------+-----------------+-----------------+---------+
| entity_key | feature_name | event_time | ingest_time | value |
+--------------+---------------+-----------------+-----------------+---------+
| AAPL.NASDAQ | momentum_5d | 2024-01-15 | 2024-01-15 20:00 | 0.035 |
| AAPL.NASDAQ | rsi_14 | 2024-01-15 | 2024-01-15 20:00 | 62.5 |
+--------------+---------------+-----------------+-----------------+---------+
2つのタイムスタンプの意味:
- event_time: 特徴量が対応するビジネス時間(例: 「これは2024-01-15のRSI」)
- ingest_time: 特徴量がシステムに書き込まれた時刻(例: 「20:00に計算」)
Point-in-Timeクエリルール:
WHERE event_time <= as_of_time AND ingest_time <= as_of_time
なぜ2つのタイムスタンプが必要か?
シナリオ: 2024-01-16 09:30の取引決定をバックテスト
event_timeのみ使用の場合:
クエリ: event_time <= '2024-01-16 09:30'
event_time='2024-01-15'だがingest_time='2024-01-16 22:00'のデータを返す可能性
-> ルックアヘッドバイアス!
正しいデュアルタイムスタンプクエリ:
クエリ: event_time <= '2024-01-16 09:30' AND ingest_time <= '2024-01-16 09:30'
その時点で実際に利用可能だった特徴量のみを返す
データベース設計(TimescaleDB)
-- TimescaleDBは時系列データに最適化されたPostgreSQL拡張
CREATE TABLE IF NOT EXISTS feature_events (
entity_key TEXT NOT NULL, -- 例: 'AAPL.NASDAQ'
feature_name TEXT NOT NULL, -- 特徴量名
feature_version INT NOT NULL DEFAULT 1, -- バージョン(計算ロジック変更時にインクリメント)
event_time TIMESTAMPTZ NOT NULL, -- ビジネス時間
value_double DOUBLE PRECISION, -- 数値特徴量
value_json JSONB, -- 複雑な特徴量(ベクトルなど)
ingest_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- トレーサビリティ
producer TEXT, -- プロデューサー(例: 'momentum_job')
producer_version TEXT, -- コードバージョン(git SHA)
run_id TEXT, -- ジョブID
PRIMARY KEY (entity_key, feature_name, feature_version, event_time)
);
-- ハイパーテーブルに変換して自動パーティショニング
SELECT create_hypertable('feature_events', 'event_time', if_not_exists => TRUE);
-- 最新特徴量クエリ最適化
CREATE INDEX IF NOT EXISTS idx_feature_events_latest
ON feature_events (entity_key, feature_name, feature_version, event_time DESC);
-- 圧縮ポリシー(7日後に圧縮、90%+のスペース節約)
ALTER TABLE feature_events SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'entity_key, feature_name, feature_version',
timescaledb.compress_orderby = 'event_time DESC'
);
SELECT add_compression_policy('feature_events', INTERVAL '7 days');
Python実装
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
@dataclass
class FeatureValue:
"""クエリから返される特徴量値"""
entity_key: str
feature_name: str
feature_version: int
event_time: datetime
value: float | dict[str, Any]
class FeatureStore:
"""
TimescaleDBベースのFeature Store
コア機能:
1. write_features: 特徴量を書き込む
2. get_latest: 最新の特徴量値を取得
3. get_point_in_time: バッチPoint-in-Timeクエリ(トレーニングセット構築用)
"""
def __init__(self, conninfo: str, producer: str | None = None):
self._conninfo = conninfo
self._producer = producer
def write_features(
self,
entity_key: str,
timestamp: datetime,
features: dict[str, float],
*,
feature_version: int = 1,
availability_lag: timedelta | None = None,
) -> int:
"""
特徴量値を書き込む
Args:
entity_key: エンティティ識別子(例: 'AAPL.NASDAQ')
timestamp: 特徴量のビジネス時間(event_time)
features: 特徴量辞書 {feature_name: value}
feature_version: 特徴量バージョン(計算ロジック変更時にインクリメント)
availability_lag: データ利用可能性遅延(バックフィル用)
特徴量がT+1でのみ利用可能な場合、availability_lag=timedelta(days=1)
これによりingest_time = event_time + 1日となる
Returns:
書き込まれた特徴量の数
"""
if not features:
return 0
# ingest_timeを計算
ingest_time = datetime.now()
if availability_lag is not None:
ingest_time = timestamp + availability_lag
# バッチ挿入を構築(ON CONFLICTで冪等性を保証)
sql = """
INSERT INTO feature_events
(entity_key, feature_name, feature_version, event_time, value_double, ingest_time, producer)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (entity_key, feature_name, feature_version, event_time) DO NOTHING
"""
with self._get_connection() as conn:
with conn.cursor() as cur:
for name, value in features.items():
cur.execute(sql, [
entity_key, name, feature_version,
timestamp, float(value), ingest_time, self._producer
])
conn.commit()
return len(features)
def get_latest(
self,
entity_key: str,
feature_names: list[str] | None = None,
*,
as_of: datetime | None = None,
) -> dict[str, FeatureValue]:
"""
エンティティの最新特徴量値を取得
Args:
entity_key: エンティティ識別子
feature_names: クエリする特徴量リスト(Noneですべて)
as_of: Point-in-Timeタイムスタンプ(None で現在)
Returns:
{feature_name: FeatureValue}
"""
# キー: デュアルタイムスタンプフィルタリング
sql = """
SELECT DISTINCT ON (feature_name, feature_version)
feature_name, feature_version, value_double, event_time
FROM feature_events
WHERE entity_key = %s
AND feature_version = 1
"""
params = [entity_key]
# Point-in-Timeフィルター
if as_of is not None:
sql += " AND event_time <= %s AND ingest_time <= %s"
params.extend([as_of, as_of])
# 特徴量名フィルター
if feature_names:
sql += " AND feature_name = ANY(%s)"
params.append(feature_names)
sql += " ORDER BY feature_name, feature_version, event_time DESC"
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute(sql, params)
rows = cur.fetchall()
return {
row[0]: FeatureValue(
entity_key=entity_key,
feature_name=row[0],
feature_version=row[1],
event_time=row[3],
value=row[2],
)
for row in rows
}
def get_point_in_time(
self,
entity_times: list[tuple[str, datetime]],
feature_names: list[str] | None = None,
) -> list[FeatureValue]:
"""
バッチPoint-in-Timeクエリ(トレーニングセット構築のコアメソッド)
Args:
entity_times: [(entity_key, as_of_time), ...]
feature_names: クエリする特徴量リスト
Returns:
各(entity, time)ペアに対して、その時点で利用可能な最新特徴量を返す
"""
if not entity_times:
return []
# CTEとDISTINCT ONを使用した効率的なPITクエリ
values_sql = ", ".join(["(%s, %s)"] * len(entity_times))
params = []
for entity_key, as_of_time in entity_times:
params.extend([entity_key, as_of_time])
sql = f"""
WITH entity_times(entity_key, as_of_time) AS (
VALUES {values_sql}
)
SELECT DISTINCT ON (et.entity_key, fe.feature_name)
et.entity_key,
et.as_of_time,
fe.feature_name,
fe.feature_version,
fe.value_double,
fe.event_time AS feature_time
FROM entity_times et
JOIN feature_events fe
ON fe.entity_key = et.entity_key
AND fe.event_time <= et.as_of_time
AND fe.ingest_time <= et.as_of_time
WHERE fe.feature_version = 1
ORDER BY et.entity_key, fe.feature_name, fe.event_time DESC
"""
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute(sql, params)
rows = cur.fetchall()
return [
FeatureValue(
entity_key=row[0],
feature_name=row[2],
feature_version=row[3],
event_time=row[5],
value=row[4],
)
for row in rows
]
使用例
# 初期化
store = FeatureStore(
conninfo="postgres://localhost:5432/trading",
producer="momentum_job_v2"
)
# 特徴量を書き込む
store.write_features(
entity_key="AAPL.NASDAQ",
timestamp=datetime(2024, 1, 15, 16, 0), # 市場終了
features={
"momentum_5d": 0.035,
"rsi_14": 62.5,
"volume_ratio": 1.15,
}
)
# リアルタイム推論: 最新特徴量を取得
latest = store.get_latest("AAPL.NASDAQ", ["momentum_5d", "rsi_14"])
print(f"Latest RSI: {latest['rsi_14'].value}")
# トレーニングセット構築: Point-in-Timeクエリ
training_dates = [
("AAPL.NASDAQ", datetime(2024, 1, 10, 9, 30)),
("AAPL.NASDAQ", datetime(2024, 1, 11, 9, 30)),
("AAPL.NASDAQ", datetime(2024, 1, 12, 9, 30)),
("MSFT.NASDAQ", datetime(2024, 1, 10, 9, 30)),
("MSFT.NASDAQ", datetime(2024, 1, 11, 9, 30)),
]
features = store.get_point_in_time(training_dates, ["momentum_5d", "rsi_14"])
# 各時点で利用可能な特徴量値を返す、ルックアヘッドバイアスなし
3. Model Registry
なぜモデル登録が必要か?
シナリオ: モデルパフォーマンス低下、調査が必要
レジストリなしの場合:
- 「今実行しているバージョンは?」 -> 不明
- 「このバージョンのパラメータは?」 -> ファイルを検索
- 「前のバージョンはどこ?」 -> 上書きされた可能性
- 「このバージョンのバックテストパフォーマンスは?」 -> 再実行
レジストリありの場合:
SELECT * FROM models WHERE name = 'momentum_v2';
-> バージョン、パラメータ、メトリクス、トレーニング時間、コードバージョンが一目で分かる
データベース設計
-- モデルメタデータ
CREATE TABLE IF NOT EXISTS models (
model_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
version INT NOT NULL,
strategy_type TEXT, -- 'momentum', 'mean_reversion'など
description TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(name, version)
);
-- モデルメトリクス
CREATE TABLE IF NOT EXISTS model_metrics (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
model_id UUID REFERENCES models(model_id),
metric_name TEXT NOT NULL, -- 'sharpe_ratio', 'ic', 'max_drawdown'
value DOUBLE PRECISION,
dataset_type TEXT, -- 'train', 'val', 'test', 'backtest', 'live'
evaluated_at TIMESTAMPTZ DEFAULT NOW()
);
-- モデルアーティファクト(重みファイルなど)
CREATE TABLE IF NOT EXISTS model_artifacts (
artifact_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
model_id UUID REFERENCES models(model_id),
artifact_path TEXT NOT NULL, -- 's3://models/momentum_v2/weights.pkl'
artifact_type TEXT, -- 'weights', 'config', 'scaler', 'onnx'
checksum TEXT, -- SHA256
size_bytes BIGINT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- トレーニング実行記録
CREATE TABLE IF NOT EXISTS model_training_runs (
run_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
model_id UUID REFERENCES models(model_id),
params JSONB, -- トレーニングハイパーパラメータ
dataset_start TIMESTAMPTZ,
dataset_end TIMESTAMPTZ,
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
status TEXT DEFAULT 'running' -- 'running', 'completed', 'failed'
);
Python実装
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from uuid import UUID
import hashlib
import json
@dataclass
class ModelInfo:
"""モデルメタデータ"""
model_id: UUID
name: str
version: int
strategy_type: str | None
description: str | None
created_at: datetime
@dataclass
class ModelWithMetrics:
"""メトリクスを含むモデル"""
model: ModelInfo
metrics: dict[str, float] # {metric_name_dataset: value}
class ModelRegistry:
"""
Model Registry
機能:
1. register_model: 新しいモデルバージョンを登録
2. log_metrics: 評価メトリクスを記録
3. log_artifact: モデルアーティファクトを記録
4. get_best_model: 最高パフォーマンスのモデルを取得
"""
def __init__(self, dsn: str):
self.dsn = dsn
def register_model(
self,
name: str,
strategy_type: str | None = None,
params: dict | None = None,
description: str | None = None,
version: int | None = None,
) -> UUID:
"""
新しいモデルバージョンを登録
Args:
name: モデル名(例: 'momentum_v2')
strategy_type: 戦略タイプ
params: トレーニングパラメータ
description: 説明
version: バージョン番号(None で自動インクリメント)
Returns:
モデルUUID
"""
with self._get_connection() as conn:
with conn.cursor() as cur:
# 自動バージョン番号
if version is None:
cur.execute(
"SELECT COALESCE(MAX(version), 0) + 1 FROM models WHERE name = %s",
(name,)
)
version = cur.fetchone()[0]
# モデル挿入
cur.execute(
"""
INSERT INTO models (name, version, strategy_type, description)
VALUES (%s, %s, %s, %s)
RETURNING model_id
""",
(name, version, strategy_type, description)
)
model_id = cur.fetchone()[0]
# トレーニングパラメータを記録
if params:
cur.execute(
"""
INSERT INTO model_training_runs (model_id, params, started_at, status)
VALUES (%s, %s, %s, 'completed')
""",
(model_id, json.dumps(params), datetime.now())
)
conn.commit()
return model_id
def log_metrics(
self,
model_id: UUID,
metrics: dict[str, float],
dataset_type: str | None = None,
) -> None:
"""
モデルメトリクスを記録
Args:
model_id: モデルUUID
metrics: {metric_name: value}、例: {'sharpe_ratio': 1.5, 'ic': 0.04}
dataset_type: データセットタイプ('train', 'val', 'test', 'backtest', 'live')
"""
with self._get_connection() as conn:
with conn.cursor() as cur:
for metric_name, value in metrics.items():
cur.execute(
"""
INSERT INTO model_metrics (model_id, metric_name, value, dataset_type)
VALUES (%s, %s, %s, %s)
""",
(model_id, metric_name, value, dataset_type)
)
conn.commit()
def log_artifact(
self,
model_id: UUID,
path: str | Path,
artifact_type: str | None = None,
) -> UUID:
"""
モデルアーティファクトを記録
Args:
model_id: モデルUUID
path: アーティファクトパス(ローカルまたはS3)
artifact_type: タイプ('weights', 'config', 'scaler')
Returns:
アーティファクトUUID
"""
path = Path(path)
checksum = None
size_bytes = None
if path.exists():
size_bytes = path.stat().st_size
# SHA256を計算
sha256 = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
checksum = sha256.hexdigest()
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO model_artifacts
(model_id, artifact_path, artifact_type, checksum, size_bytes)
VALUES (%s, %s, %s, %s, %s)
RETURNING artifact_id
""",
(model_id, str(path), artifact_type, checksum, size_bytes)
)
artifact_id = cur.fetchone()[0]
conn.commit()
return artifact_id
def get_model(self, name: str, version: int | None = None) -> ModelInfo | None:
"""モデルを取得(デフォルトで最新バージョン)"""
with self._get_connection() as conn:
with conn.cursor() as cur:
if version is None:
cur.execute(
"""
SELECT model_id, name, version, strategy_type, description, created_at
FROM models WHERE name = %s
ORDER BY version DESC LIMIT 1
""",
(name,)
)
else:
cur.execute(
"""
SELECT model_id, name, version, strategy_type, description, created_at
FROM models WHERE name = %s AND version = %s
""",
(name, version)
)
row = cur.fetchone()
if row:
return ModelInfo(*row)
return None
def get_best_model(
self,
strategy_type: str,
metric_name: str,
dataset_type: str = "test",
higher_is_better: bool = True,
) -> ModelWithMetrics | None:
"""
戦略タイプの最高パフォーマンスモデルを取得
Args:
strategy_type: 戦略タイプ
metric_name: ソートメトリクス(例: 'sharpe_ratio')
dataset_type: データセットタイプ
higher_is_better: 高い値が良いかどうか
Returns:
最高モデルとそのメトリクス
"""
order = "DESC" if higher_is_better else "ASC"
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
f"""
SELECT m.model_id, m.name, m.version, m.strategy_type,
m.description, m.created_at, mm.value
FROM models m
JOIN model_metrics mm ON m.model_id = mm.model_id
WHERE m.strategy_type = %s
AND mm.metric_name = %s
AND mm.dataset_type = %s
ORDER BY mm.value {order}
LIMIT 1
""",
(strategy_type, metric_name, dataset_type)
)
row = cur.fetchone()
if not row:
return None
model = ModelInfo(*row[:6])
# このモデルのすべてのメトリクスを取得
cur.execute(
"""
SELECT metric_name, value, dataset_type
FROM model_metrics
WHERE model_id = %s
""",
(model.model_id,)
)
metrics = {
f"{r[0]}_{r[2]}": r[1]
for r in cur.fetchall()
}
return ModelWithMetrics(model=model, metrics=metrics)
使用例
registry = ModelRegistry(dsn="postgres://localhost:5432/trading")
# 新しいモデルを登録
model_id = registry.register_model(
name="momentum_xgb",
strategy_type="momentum",
params={
"n_estimators": 100,
"max_depth": 5,
"learning_rate": 0.1,
"features": ["ret_5d", "ret_20d", "vol_20d", "rsi_14"],
},
description="XGBoost momentum model with RSI features"
)
# バックテストメトリクスを記録
registry.log_metrics(model_id, {
"sharpe_ratio": 1.65,
"total_return": 0.28,
"max_drawdown": 0.12,
"ic": 0.042,
"ir": 0.85,
}, dataset_type="backtest")
# テストセットメトリクスを記録
registry.log_metrics(model_id, {
"sharpe_ratio": 1.35,
"ic": 0.035,
}, dataset_type="test")
# モデルアーティファクトを保存
registry.log_artifact(model_id, "models/momentum_xgb_v3.pkl", "weights")
registry.log_artifact(model_id, "models/momentum_xgb_v3_config.json", "config")
# 最高のモメンタムモデルを取得
best = registry.get_best_model("momentum", "sharpe_ratio", "test")
if best:
print(f"Best model: {best.model.name} v{best.model.version}")
print(f"Test Sharpe: {best.metrics.get('sharpe_ratio_test', 'N/A')}")
4. Drift Monitor
ドリフトの三次元
| 次元 | 検出メトリクス | 意味 | 閾値推奨 |
|---|---|---|---|
| Data Drift | PSI | 特徴量分布の変化 | < 0.10 正常、> 0.25 深刻 |
| Prediction Drift | IC | 予測と実際のリターンの相関 | > 0.02 正常、< 0.01 深刻 |
| Performance Drift | Rolling Sharpe | 戦略のリスク調整後リターン | > 0.5 正常、< 0 深刻 |
コアメトリクス計算
import numpy as np
from scipy.stats import spearmanr
def calculate_ic(signals: np.ndarray, returns: np.ndarray) -> float:
"""
Information Coefficientを計算
IC = Spearman相関(予測シグナル、実際のリターン)
解釈:
- IC > 0.05: 優秀
- IC 0.02-0.05: 良好
- IC < 0.02: 要注意
- IC < 0: モデルに問題がある可能性
"""
if len(signals) < 2:
return 0.0
# NaNを削除
mask = ~(np.isnan(signals) | np.isnan(returns))
signals, returns = signals[mask], returns[mask]
if len(signals) < 2:
return 0.0
ic, _ = spearmanr(signals, returns)
return float(ic) if not np.isnan(ic) else 0.0
def calculate_psi(
expected: np.ndarray,
actual: np.ndarray,
bins: int = 10,
) -> float:
"""
Population Stability Index (PSI)を計算
PSI = sum((actual% - expected%) * ln(actual% / expected%))
解釈:
- PSI < 0.10: 分布安定
- PSI 0.10-0.25: 軽度のドリフト、監視
- PSI > 0.25: 重大なドリフト、対応が必要
"""
eps = 1e-6
# ベースライン分布に基づいてビンを作成
_, bin_edges = np.histogram(expected, bins=bins)
# 各ビンの割合を計算
expected_counts, _ = np.histogram(expected, bins=bin_edges)
actual_counts, _ = np.histogram(actual, bins=bin_edges)
expected_pct = expected_counts / len(expected) + eps
actual_pct = actual_counts / len(actual) + eps
# PSI公式
psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
return float(psi)
def calculate_sharpe(
returns: np.ndarray,
periods_per_year: int = 252,
) -> float:
"""
年率換算シャープレシオを計算
Sharpe = mean(returns) / std(returns) * sqrt(252)
"""
returns = returns[~np.isnan(returns)]
if len(returns) < 2:
return 0.0
mean_ret = np.mean(returns)
std_ret = np.std(returns, ddof=1)
if std_ret < 1e-10:
return 0.0
return (mean_ret / std_ret) * np.sqrt(periods_per_year)
Drift Monitor実装
from dataclasses import dataclass
from datetime import date
@dataclass
class DriftMetrics:
"""日次ドリフトメトリクス"""
date: date
strategy_id: str
ic: float | None = None
ic_5d_avg: float | None = None
psi: float | None = None
sharpe_5d: float | None = None
sharpe_20d: float | None = None
ic_alert: bool = False
psi_alert: bool = False
sharpe_alert: bool = False
@dataclass
class AlertConfig:
"""アラート閾値設定"""
ic_warning: float = 0.02
ic_critical: float = 0.01
psi_warning: float = 0.10
psi_critical: float = 0.25
sharpe_warning: float = 0.5
sharpe_critical: float = 0.0
class DriftMonitor:
"""
ドリフト監視サービス
日次実行、IC、PSI、シャープメトリクスを計算、データベースに保存、アラートをトリガー。
"""
def __init__(self, dsn: str, strategy_id: str = "default"):
self.dsn = dsn
self.strategy_id = strategy_id
self.config = AlertConfig()
def calculate_metrics(self, target_date: date) -> DriftMetrics:
"""指定された日付のドリフトメトリクスを計算"""
metrics = DriftMetrics(date=target_date, strategy_id=self.strategy_id)
# シグナルとリターンを取得
signals, returns = self._get_signals_and_returns(target_date)
if len(signals) > 0:
metrics.ic = calculate_ic(signals, returns)
# シャープを計算するための過去リターンを取得
daily_returns = self._get_daily_returns(lookback_days=60)
if len(daily_returns) >= 5:
metrics.sharpe_5d = calculate_sharpe(daily_returns[-5:])
if len(daily_returns) >= 20:
metrics.sharpe_20d = calculate_sharpe(daily_returns[-20:])
# アラートをチェック
if metrics.ic is not None:
metrics.ic_alert = metrics.ic < self.config.ic_critical
if metrics.psi is not None:
metrics.psi_alert = metrics.psi > self.config.psi_critical
if metrics.sharpe_20d is not None:
metrics.sharpe_alert = metrics.sharpe_20d < self.config.sharpe_critical
return metrics
def save_metrics(self, metrics: DriftMetrics) -> None:
"""メトリクスをデータベースに保存"""
sql = """
INSERT INTO drift_metrics (
date, strategy_id, ic, ic_5d_avg, psi, sharpe_5d, sharpe_20d,
ic_alert, psi_alert, sharpe_alert
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (date, strategy_id) DO UPDATE SET
ic = EXCLUDED.ic,
psi = EXCLUDED.psi,
sharpe_5d = EXCLUDED.sharpe_5d,
sharpe_20d = EXCLUDED.sharpe_20d,
ic_alert = EXCLUDED.ic_alert,
psi_alert = EXCLUDED.psi_alert,
sharpe_alert = EXCLUDED.sharpe_alert
"""
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute(sql, [
metrics.date, metrics.strategy_id,
metrics.ic, metrics.ic_5d_avg, metrics.psi,
metrics.sharpe_5d, metrics.sharpe_20d,
metrics.ic_alert, metrics.psi_alert, metrics.sharpe_alert,
])
conn.commit()
def run_daily(self, target_date: date | None = None) -> DriftMetrics:
"""日次ドリフト監視ジョブ"""
if target_date is None:
target_date = date.today()
print(f"Running drift monitoring for {target_date}")
metrics = self.calculate_metrics(target_date)
self.save_metrics(metrics)
# アラートを出力
if metrics.ic_alert:
print(f"[ALERT] IC = {metrics.ic:.4f} below threshold {self.config.ic_critical}")
if metrics.psi_alert:
print(f"[ALERT] PSI = {metrics.psi:.4f} above threshold {self.config.psi_critical}")
if metrics.sharpe_alert:
print(f"[ALERT] Sharpe = {metrics.sharpe_20d:.4f} below threshold {self.config.sharpe_critical}")
return metrics
アラート対応マトリクス
| アラートタイプ | 深刻度 | 推奨アクション |
|---|---|---|
| IC < 0.02 が5日連続 | 警告 | 特徴量計算が正しいか確認 |
| IC < 0.01 | 深刻 | ポジションを50%削減、モデル診断開始 |
| IC < 0 が3日連続 | 重大 | 戦略を一時停止、手動レビュー |
| PSI > 0.10 | 警告 | 後続の変化を監視 |
| PSI > 0.25 | 深刻 | 再トレーニングプロセスをトリガー |
| シャープ < 0.5 が10日連続 | 警告 | 市場状況を確認 |
| シャープ < 0 が5日連続 | 深刻 | ポジション削減、再トレーニング準備 |
5. 統合: 研究から本番へ
完全ワークフロー
+----------------------------------------------------------------------+
| 研究フェーズ |
+----------------------------------------------------------------------+
| 1. 特徴量開発 |
| +-> Feature Storeに書き込む(正しいavailability_lagを設定) |
| |
| 2. トレーニングセット構築 |
| +-> FeatureStore.get_point_in_time() |
| +-> Parquetエクスポート(不変スナップショット) |
| |
| 3. モデルトレーニング |
| +-> パラメータ、コードバージョンを記録 |
| +-> Model Registryに登録 |
| |
| 4. バックテスト評価 |
| +-> log_metrics(dataset_type='backtest') |
+----------------------------------------------------------------------+
|
v
+----------------------------------------------------------------------+
| デプロイフェーズ |
+----------------------------------------------------------------------+
| 5. モデル選択 |
| +-> get_best_model(strategy_type, metric, dataset_type='test') |
| |
| 6. モデルロード |
| +-> artifact_pathから重みをロード |
| +-> チェックサムを検証 |
+----------------------------------------------------------------------+
|
v
+----------------------------------------------------------------------+
| ランタイムフェーズ |
+----------------------------------------------------------------------+
| 7. リアルタイム推論 |
| +-> FeatureStore.get_latest()で特徴量を取得 |
| +-> モデル予測 |
| +-> シグナル出力 |
| |
| 8. 日次監視 |
| +-> Drift MonitorがIC/PSI/シャープを計算 |
| +-> アラートをトリガー |
| |
| 9. 再トレーニング(必要に応じて) |
| +-> ステップ2に戻る |
+----------------------------------------------------------------------+
再現性チェックリスト
| チェック項目 | 実装方法 | 検証方法 |
|---|---|---|
| コードバージョン | git SHAを記録 | producer_versionフィールド |
| 特徴量バージョン | feature_versionカラム | クエリでバージョン指定 |
| トレーニングデータ | Parquetスナップショット + フィンガープリント | 再トレーニングで同じ結果 |
| モデルパラメータ | model_training_runs.params | JSON保存 |
| モデル重み | model_artifacts.checksum | SHA256検証 |
| 評価メトリクス | model_metricsテーブル | 時間で追跡 |
日次運用スクリプト例
from datetime import date, datetime
def daily_mlops_job(
feature_store: FeatureStore,
model_registry: ModelRegistry,
drift_monitor: DriftMonitor,
strategy_id: str,
):
"""日次MLOpsジョブ"""
today = date.today()
print(f"=== MLOps Daily Job: {today} ===")
# 1. 特徴量ヘルスチェック
print("\n[1] Feature Health Check")
latest = feature_store.get_latest("AAPL.NASDAQ")
for name, fv in latest.items():
age_hours = (datetime.now() - fv.event_time).total_seconds() / 3600
if age_hours > 24:
print(f" WARNING: {name} is {age_hours:.1f} hours old")
else:
print(f" OK: {name} updated {age_hours:.1f} hours ago")
# 2. モデルステータスチェック
print("\n[2] Model Status Check")
current_model = model_registry.get_model("momentum_xgb")
if current_model:
print(f" Current: {current_model.name} v{current_model.version}")
print(f" Created: {current_model.created_at}")
# 3. ドリフト監視
print("\n[3] Drift Monitoring")
drift_metrics = drift_monitor.run_daily(today)
print(f" IC: {drift_metrics.ic}")
print(f" Sharpe (20d): {drift_metrics.sharpe_20d}")
# 4. 決定
if drift_metrics.ic_alert or drift_metrics.sharpe_alert:
print("\n[ACTION REQUIRED] Consider retraining or reducing position size")
else:
print("\n[OK] All metrics within normal range")
# スケジュールジョブ(例: cron)
# 0 6 * * * python -c "from mlops import daily_mlops_job; daily_mlops_job(...)"
さらなる読書
- Machine Learning Design Patterns by Lakshmanan, Robinson, and Munn
- Model Drift and Retraining Strategies