背景知識:モデルドリフトと再訓練戦略
金融市場は非定常的。今日機能するモデルが明日には失敗する可能性がある。ドリフト検出とタイムリーな再訓練は本番環境システムの必須機能である。
1. モデルドリフトとは何か?
モデルドリフトとは、デプロイ後にモデルの予測パフォーマンスが時間とともに徐々に低下する現象を指す。
1.1 ドリフトの2つのタイプ
| タイプ | 定義 | 金融例 |
|---|---|---|
| Data Drift | 入力特徴量の分布が変化 | ボラティリティが15%から40%に上昇(COVID危機) |
| Concept Drift | 特徴量とターゲットの関係が変化 | Momentumファクターが無効化(レジームスイッチ) |
1.2 金融市場におけるドリフトの根本原因
なぜ金融モデルは必然的にドリフトするのか?
1. 市場参加者構造の変化
- 個人投資家の流入 → Momentum効果が強化
- クオンツファンドの増加 → Alphaの減衰
2. マクロ経済環境の変化
- 金利サイクルの変化(QE → 引き締め)
- 経済サイクルの移行(拡大 → 不況)
3. 規制政策の変化
- 空売り規制 → 価格発見メカニズムの変化
- HFT規制 → 市場マイクロ構造の変化
4. 技術と情報の変化
- 新しいデータソースの出現 → 古いファクターがフロントラン
- AIの普及 → 戦略の均質化
2. ドリフト検出方法
2.1 パフォーマンスモニタリング
最も直接的なアプローチ:ローリングウィンドウで戦略パフォーマンスを監視。
import numpy as np
class PerformanceMonitor:
"""パフォーマンスドリフトモニター"""
def __init__(self, window: int = 30, sharpe_threshold: float = 0.5):
self.window = window # ローリングウィンドウ(日数)
self.sharpe_threshold = sharpe_threshold
self.returns = []
def update(self, daily_return: float) -> dict:
"""更新してドリフトをチェック"""
self.returns.append(daily_return)
if len(self.returns) < self.window:
return {'status': 'warming_up'}
# ローリングSharpeを計算
recent = self.returns[-self.window:]
rolling_sharpe = np.mean(recent) / np.std(recent) * np.sqrt(252)
# ドリフトを検出
is_drifting = rolling_sharpe < self.sharpe_threshold
return {
'rolling_sharpe': rolling_sharpe,
'is_drifting': is_drifting,
'alert': 'DRIFT_DETECTED' if is_drifting else 'OK'
}
閾値設定の推奨:
| メトリクス | 警告閾値 | クリティカル閾値 | トリガーアクション |
|---|---|---|---|
| Rolling Sharpe | < 0.5 | < 0 | 再訓練をトリガー |
| Rolling Win Rate | < 45% | < 40% | シグナル品質をチェック |
| Rolling Return | < -5% | < -10% | ポジションサイズを削減 |
2.2 統計的検定方法
Kolmogorov-Smirnov検定(K-S検定)
特徴量分布が著しく変化したかを検出。
import numpy as np
from scipy.stats import ks_2samp
def detect_data_drift(
training_data: np.ndarray,
recent_data: np.ndarray,
significance: float = 0.05
) -> dict:
"""
データドリフト検出のためのK-S検定
原理:2つのサンプルが同じ分布から来たかを比較
H0:2つのサンプルは同じ分布から来る
p < significanceなら、H0を棄却しドリフトが発生したと結論
"""
statistic, p_value = ks_2samp(training_data, recent_data)
return {
'ks_statistic': statistic, # D値、大きいほど分布の差が大きい
'p_value': p_value,
'is_drifting': p_value < significance,
'interpretation': 'DRIFT' if p_value < significance else 'STABLE'
}
# 使用例
training_returns = returns['2020-01':'2022-12']
recent_returns = returns['2024-01':'2024-03']
result = detect_data_drift(training_returns, recent_returns)
print(f"K-S statistic: {result['ks_statistic']:.4f}")
print(f"P-value: {result['p_value']:.4f}")
print(f"Status: {result['interpretation']}")
カイ二乗検定
カテゴリ特徴量のドリフト検出に適している。
from scipy.stats import chi2_contingency
def detect_categorical_drift(
training_counts: dict,
recent_counts: dict,
significance: float = 0.05
) -> dict:
"""
カテゴリ特徴量ドリフトのカイ二乗検定
例:市場レジームラベル分布が変化したかを検出
training_counts = {'bull': 120, 'bear': 80, 'sideways': 50}
recent_counts = {'bull': 10, 'bear': 35, 'sideways': 5}
"""
# 分割表を構築
categories = set(training_counts.keys()) | set(recent_counts.keys())
train_freq = [training_counts.get(c, 0) for c in categories]
recent_freq = [recent_counts.get(c, 0) for c in categories]
contingency_table = [train_freq, recent_freq]
chi2, p_value, dof, expected = chi2_contingency(contingency_table)
return {
'chi2_statistic': chi2,
'p_value': p_value,
'degrees_of_freedom': dof,
'is_drifting': p_value < significance
}
2.3 CUSUM管理図法
Cumulative Sum管理図:予測誤差の持続的なシフトを検出。
class CUSUMDetector:
"""
CUSUM(Cumulative Sum)ドリフト検出器
原理:
- 予測誤差の偏差を累積
- 誤差がランダムなら、累積和は0付近で変動すべき
- 系統的バイアスがあれば、累積和は持続的にドリフト
"""
def __init__(self, threshold: float = 5.0, drift: float = 0.5):
"""
パラメータ:
- threshold: アラートトリガー閾値
- drift: 許容ドリフト量(感度制御)
"""
self.threshold = threshold
self.drift = drift
self.reset()
def reset(self):
self.s_pos = 0 # 正の累積和
self.s_neg = 0 # 負の累積和
self.history = []
def update(self, error: float) -> dict:
"""
CUSUM値を更新
パラメータ:
- error: 予測誤差(予測値 - 実際値)
戻り値:
- ドリフト検出結果
"""
# 正規化された誤差
normalized_error = error
# 累積和を更新
self.s_pos = max(0, self.s_pos + normalized_error - self.drift)
self.s_neg = max(0, self.s_neg - normalized_error - self.drift)
self.history.append({
's_pos': self.s_pos,
's_neg': self.s_neg,
'error': error
})
# ドリフトを検出
drift_up = self.s_pos > self.threshold
drift_down = self.s_neg > self.threshold
if drift_up or drift_down:
direction = 'UP' if drift_up else 'DOWN'
return {
'is_drifting': True,
'direction': direction,
'cusum_value': self.s_pos if drift_up else self.s_neg,
'action': 'RETRAIN_RECOMMENDED'
}
return {
'is_drifting': False,
'cusum_pos': self.s_pos,
'cusum_neg': self.s_neg,
'action': 'CONTINUE_MONITORING'
}
# 使用例
detector = CUSUMDetector(threshold=5.0, drift=0.5)
for pred, actual in zip(predictions, actuals):
error = pred - actual
result = detector.update(error)
if result['is_drifting']:
print(f"Drift detected! Direction: {result['direction']}")
break
CUSUMの利点:
- 徐々に小さい持続的シフトを検出可能
- 単一点検出より敏感
- 確固たる統計的基盤がある
2.4 マルチ指標総合検出
本番環境推奨:複数の検出方法を組み合わせて偽陽性率を削減。
class ComprehensiveDriftDetector:
"""総合ドリフト検出器"""
def __init__(self):
self.performance_monitor = PerformanceMonitor()
self.cusum_detector = CUSUMDetector()
def check_drift(self,
daily_return: float,
prediction_error: float,
training_features: np.array,
recent_features: np.array) -> dict:
results = {}
# 1. パフォーマンス監視
perf_result = self.performance_monitor.update(daily_return)
results['performance'] = perf_result
# 2. CUSUM検出
cusum_result = self.cusum_detector.update(prediction_error)
results['cusum'] = cusum_result
# 3. K-S検定(定期的に実行、例:週次)
ks_result = detect_data_drift(training_features, recent_features)
results['ks_test'] = ks_result
# 総合判断:多数決投票
drift_signals = [
perf_result.get('is_drifting', False),
cusum_result.get('is_drifting', False),
ks_result.get('is_drifting', False)
]
drift_count = sum(drift_signals)
results['overall'] = {
'drift_count': drift_count,
'is_drifting': drift_count >= 2, # 少なくとも2つの検出器がアラーム
'confidence': drift_count / 3,
'recommendation': self._get_recommendation(drift_count)
}
return results
def _get_recommendation(self, drift_count: int) -> str:
if drift_count == 0:
return 'CONTINUE_NORMAL'
elif drift_count == 1:
return 'INCREASE_MONITORING'
elif drift_count == 2:
return 'PREPARE_RETRAIN'
else:
return 'IMMEDIATE_RETRAIN'
3. 再訓練戦略
3.1 スケジュール再訓練
最もシンプルな戦略:固定スケジュールでモデルを再訓練。
| 戦略頻度 | 期間 | 適用可能シナリオ | 長所 | 短所 |
|---|---|---|---|---|
| 日次戦略 | 月次 | 中低頻度ファクター戦略 | シンプル、予測可能 | 遅れる可能性 |
| 週次戦略 | 四半期 | ポートフォリオ配分戦略 | 低コスト | 突然の変化に適応できない |
| 分次レベル戦略 | 週次 | 高頻度取引 | タイムリーな更新 | 高コスト |
# スケジュール再訓練スケジューラ
class ScheduledRetrainer:
def __init__(self, retrain_frequency: str = 'monthly'):
self.frequency = retrain_frequency
self.last_retrain = None
def should_retrain(self, current_date) -> bool:
if self.last_retrain is None:
return True
if self.frequency == 'weekly':
return (current_date - self.last_retrain).days >= 7
elif self.frequency == 'monthly':
return (current_date - self.last_retrain).days >= 30
elif self.frequency == 'quarterly':
return (current_date - self.last_retrain).days >= 90
return False
3.2 トリガー再訓練
よりスマートな戦略:ドリフトが検出された場合のみ再訓練をトリガー。
class TriggeredRetrainer:
"""トリガー再訓練器"""
def __init__(self,
performance_threshold: float = 0.3, # Sharpe閾値
cusum_threshold: float = 5.0,
min_interval_days: int = 7): # 最小再訓練間隔
self.performance_threshold = performance_threshold
self.cusum_threshold = cusum_threshold
self.min_interval_days = min_interval_days
self.last_retrain = None
self.detector = ComprehensiveDriftDetector()
def check_and_retrain(self, model, new_data, current_date) -> dict:
"""再訓練が必要かチェック、必要なら実行"""
# 過度に頻繁な再訓練を防ぐ
if self.last_retrain:
days_since = (current_date - self.last_retrain).days
if days_since < self.min_interval_days:
return {'action': 'SKIP', 'reason': 'Too soon since last retrain'}
# ドリフト検出
drift_result = self.detector.check_drift(...)
if drift_result['overall']['is_drifting']:
# 再訓練を実行
new_model = self._retrain(model, new_data)
self.last_retrain = current_date
return {
'action': 'RETRAINED',
'drift_confidence': drift_result['overall']['confidence'],
'new_model': new_model
}
return {'action': 'CONTINUE', 'drift_confidence': drift_result['overall']['confidence']}
3.3 オンライン学習
継続的更新:完全な再訓練の代わりに、モデルパラメータを段階的に更新。
class OnlineLearner:
"""
オンライン学習更新器
適用可能シナリオ:
- 市場変化に素早く適応する必要がある
- 完全な再訓練がコストがかかりすぎる
- データストリームが継続的に到着
リスク:
- 壊滅的忘却(履歴パターンを失う)
- ノイズに敏感
"""
def __init__(self, model, learning_rate: float = 0.001):
self.model = model
self.learning_rate = learning_rate
self.update_count = 0
def incremental_update(self, new_x, new_y):
"""
モデルを段階的に更新
小さな学習率で単一ステップ勾配降下を使用
"""
# 順伝播
prediction = self.model.predict(new_x)
error = new_y - prediction
# 逆伝播(簡略化した説明)
gradient = self._compute_gradient(new_x, error)
# パラメータ更新
for param, grad in zip(self.model.parameters(), gradient):
param -= self.learning_rate * grad
self.update_count += 1
return {
'prediction': prediction,
'error': error,
'update_count': self.update_count
}
def _compute_gradient(self, x, error):
# 実際の実装はモデルタイプに依存
pass
オンライン学習の落とし穴:
- 壊滅的忘却:新しいデータが古い知識を上書き
- ノイズの蓄積:単一サンプル更新はノイズに惑わされやすい
- 学習率の感度:大きすぎる → 不安定、小さすぎる → 適応が遅い
3.4 ハイブリッド戦略(推奨)
ベストプラクティス:スケジュールとトリガー再訓練を組み合わせる。
class HybridRetrainer:
"""ハイブリッド再訓練戦略"""
def __init__(self):
self.scheduled_interval_days = 30 # スケジュール:月次
self.drift_detector = ComprehensiveDriftDetector()
self.last_scheduled_retrain = None
self.last_triggered_retrain = None
def should_retrain(self, current_date, metrics) -> dict:
"""再訓練が必要か判断"""
# スケジュール再訓練をチェック
scheduled_due = self._check_scheduled(current_date)
# トリガー再訓練をチェック
drift_result = self.drift_detector.check_drift(metrics)
triggered_due = drift_result['overall']['is_drifting']
if scheduled_due and triggered_due:
return {
'should_retrain': True,
'reason': 'BOTH_SCHEDULED_AND_DRIFT',
'priority': 'HIGH'
}
elif triggered_due:
return {
'should_retrain': True,
'reason': 'DRIFT_DETECTED',
'priority': 'HIGH'
}
elif scheduled_due:
return {
'should_retrain': True,
'reason': 'SCHEDULED',
'priority': 'NORMAL'
}
return {'should_retrain': False, 'reason': 'NO_TRIGGER'}
4. 再訓練のベストプラクティス
4.1 訓練データ選択
| 戦略 | 説明 | 長所 | 短所 |
|---|---|---|---|
| Expanding Window | すべての履歴データを使用 | 大きなサンプルサイズ | 古いデータが時代遅れの可能性 |
| Sliding Window | 最近のN日のみを使用 | 新しいパターンに適応 | 重要な履歴を失う可能性 |
| Weighted Window | 最近のデータに高い重み | 履歴と現在をバランス | 重み選択が困難 |
推奨:スライディングウィンドウ + 危機期間データを保持
def prepare_training_data(all_data, window_days=252*2, keep_crisis=True):
"""再訓練データを準備"""
# スライディングウィンドウ
recent_data = all_data.iloc[-window_days:]
if keep_crisis:
# 重要な危機期間データを保持
crisis_periods = [
('2008-09', '2009-03'), # 金融危機
('2020-02', '2020-04'), # COVID
('2022-01', '2022-06'), # 金利上昇ショック
]
crisis_data = []
for start, end in crisis_periods:
if start in all_data.index:
crisis_data.append(all_data.loc[start:end])
# マージ
training_data = pd.concat([recent_data] + crisis_data)
training_data = training_data.drop_duplicates()
return training_data
4.2 モデルバージョン管理
# モデルバージョン管理
class ModelVersionManager:
def __init__(self, storage_path: str):
self.storage_path = storage_path
self.versions = []
def save_version(self, model, metrics: dict, reason: str):
"""モデルバージョンを保存"""
version_id = f"v{len(self.versions)+1}_{datetime.now():%Y%m%d_%H%M}"
version_info = {
'version_id': version_id,
'timestamp': datetime.now(),
'reason': reason,
'metrics': metrics,
'model_path': f"{self.storage_path}/{version_id}.pkl"
}
# モデルを保存
joblib.dump(model, version_info['model_path'])
self.versions.append(version_info)
return version_id
def rollback(self, version_id: str):
"""指定バージョンにロールバック"""
for v in self.versions:
if v['version_id'] == version_id:
return joblib.load(v['model_path'])
raise ValueError(f"Version {version_id} not found")
4.3 A/Bテスト
再訓練後、古いモデルを直接置き換えない。代わりに比較テストを実行。
class ABTester:
"""モデルA/Bテスト"""
def __init__(self, old_model, new_model, test_days: int = 5):
self.old_model = old_model
self.new_model = new_model
self.test_days = test_days
self.old_results = []
self.new_results = []
def run_comparison(self, data) -> dict:
"""比較テストを実行"""
for day_data in data:
old_pred = self.old_model.predict(day_data)
new_pred = self.new_model.predict(day_data)
self.old_results.append(old_pred)
self.new_results.append(new_pred)
# パフォーマンス比較を計算
old_sharpe = calculate_sharpe(self.old_results)
new_sharpe = calculate_sharpe(self.new_results)
improvement = (new_sharpe - old_sharpe) / abs(old_sharpe) if old_sharpe != 0 else 0
return {
'old_sharpe': old_sharpe,
'new_sharpe': new_sharpe,
'improvement': improvement,
'recommendation': 'DEPLOY_NEW' if improvement > 0.1 else 'KEEP_OLD'
}
5. 本番環境ドリフトモニタリングアーキテクチャ
前のセクションでは理論的なドリフト検出方法を扱った。このセクションでは本番環境のドリフトモニタリングシステム実装を提示する。
5.1 コア設計パターン
本番環境システムに必要なもの:
- マルチメトリクス監視:IC、PSI、Sharpeを同時に追跡
- 設定可能な閾値:異なる戦略は異なる許容度を持つ
- 永続ストレージ:分析と監査のためのドリフト履歴
- アラートレベル:警告とクリティカルの重大度を区別
AlertConfigパターン
from dataclasses import dataclass
@dataclass
class AlertConfig:
"""アラート閾値設定"""
# IC(Information Coefficient)閾値
ic_warning: float = 0.02 # IC < 0.02で警告トリガー
ic_critical: float = 0.01 # IC < 0.01でクリティカルアラート
# PSI(Population Stability Index)閾値
psi_warning: float = 0.10 # PSI > 0.10は分布シフトを示す
psi_critical: float = 0.25 # PSI > 0.25は重大なシフトを示す
# Sharpe閾値
sharpe_warning: float = 0.5 # Sharpe < 0.5でパフォーマンス低下
sharpe_critical: float = 0.0 # Sharpe < 0で戦略が損失
閾値の解釈:
| メトリクス | 警告閾値 | クリティカル閾値 | ビジネス意味 |
|---|---|---|---|
| IC | < 0.02 | < 0.01 | シグナル予測力が低下 |
| PSI | > 0.10 | > 0.25 | 特徴量分布がシフト |
| Sharpe | < 0.5 | < 0.0 | リスク調整後リターンが悪化 |
5.2 DriftMetricsデータ構造
ドリフトメトリクスを日次で計算・保存:
from dataclasses import dataclass
from datetime import date
@dataclass
class DriftMetrics:
"""日次ドリフトメトリクス"""
date: date
strategy_id: str
# ICメトリクス(Information Coefficient)
ic: float | None = None # 日次IC
ic_5d_avg: float | None = None # 5日間ローリング平均
ic_20d_avg: float | None = None # 20日間ローリング平均
# PSIメトリクス(分布安定性)
psi: float | None = None
psi_5d_avg: float | None = None
# Sharpeメトリクス(リスク調整後リターン)
sharpe_5d: float | None = None # 5日間Sharpe
sharpe_20d: float | None = None # 20日間Sharpe
sharpe_60d: float | None = None # 60日間Sharpe
# ビジネスメトリクス
daily_return: float | None = None
cumulative_return: float | None = None
trade_count: int = 0
signal_count: int = 0
# アラート状態
ic_alert: bool = False
psi_alert: bool = False
sharpe_alert: bool = False
複数の時間ウィンドウを使う理由:
- 5日間ウィンドウ:素早い反応、短期ドリフトを捉える
- 20日間ウィンドウ:ノイズをフィルタ、トレンドを確認
- 60日間ウィンドウ:長期ベースライン、構造的変化を識別
5.3 DriftMonitorコア実装
import logging
import numpy as np
import psycopg
from psycopg.rows import dict_row
logger = logging.getLogger(__name__)
class DriftMonitor:
"""
本番環境ドリフトモニタリングサービス
責務:
1. IC、PSI、Sharpeメトリクスを計算
2. 設定された閾値と比較してアラート
3. PostgreSQLに永続化
4. 戦略ごとの分離をサポート
"""
def __init__(self, dsn: str, strategy_id: str = "default"):
"""
引数:
dsn: PostgreSQL接続文字列
strategy_id: 戦略識別子(マルチ戦略分離をサポート)
"""
self.dsn = dsn
self.strategy_id = strategy_id
self._config: AlertConfig | None = None
def load_config(self) -> AlertConfig:
"""データベースからアラート設定を読み込み"""
with psycopg.connect(self.dsn) as conn:
with conn.cursor(row_factory=dict_row) as cur:
cur.execute(
"""
SELECT ic_warning, ic_critical, psi_warning, psi_critical,
sharpe_warning, sharpe_critical
FROM drift_alert_config
WHERE strategy_id = %s
""",
(self.strategy_id,),
)
row = cur.fetchone()
if row:
self._config = AlertConfig(**row)
else:
self._config = AlertConfig() # デフォルトを使用
return self._config
def calculate_metrics(self, target_date: date) -> DriftMetrics:
"""
指定日のすべてのドリフトメトリクスを計算
コアロジック:
1. シグナルとリターンを取得、ICを計算
2. 履歴リターンを取得、ローリングSharpeを計算
3. 閾値と比較してアラート状態
"""
if self._config is None:
self.load_config()
metrics = DriftMetrics(date=target_date, strategy_id=self.strategy_id)
# IC(シグナル-リターン相関)を計算
signals, returns = self.get_signals_and_returns(target_date)
if len(signals) > 0 and len(returns) > 0:
metrics.ic = calculate_ic(signals, returns)
metrics.signal_count = len(signals)
# ローリングSharpeを計算
daily_returns = self.get_daily_returns(lookback_days=60)
if len(daily_returns) >= 5:
metrics.sharpe_5d = calculate_sharpe(daily_returns[-5:])
if len(daily_returns) >= 20:
metrics.sharpe_20d = calculate_sharpe(daily_returns[-20:])
if len(daily_returns) >= 60:
metrics.sharpe_60d = calculate_sharpe(daily_returns)
# アラート状態を判定
config = self._config or AlertConfig()
if metrics.ic is not None:
metrics.ic_alert = metrics.ic < config.ic_critical
if metrics.psi is not None:
metrics.psi_alert = metrics.psi > config.psi_critical
if metrics.sharpe_20d is not None:
metrics.sharpe_alert = metrics.sharpe_20d < config.sharpe_critical
return metrics
5.4 PostgreSQL永続化
ドリフトメトリクスの永続化が必要な理由:
- 履歴トレンド分析
- コンプライアンス監査
- 再訓練決定の証拠
def save_metrics(self, metrics: DriftMetrics) -> None:
"""メトリクスをデータベースに保存(冪等upsertをサポート)"""
with psycopg.connect(self.dsn) as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO drift_metrics (
date, strategy_id, ic, ic_5d_avg, ic_20d_avg,
psi, psi_5d_avg, sharpe_5d, sharpe_20d, sharpe_60d,
daily_return, cumulative_return, trade_count, signal_count,
ic_alert, psi_alert, sharpe_alert
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s
)
ON CONFLICT (date, strategy_id) DO UPDATE SET
ic = EXCLUDED.ic,
sharpe_20d = EXCLUDED.sharpe_20d,
ic_alert = EXCLUDED.ic_alert,
psi_alert = EXCLUDED.psi_alert,
sharpe_alert = EXCLUDED.sharpe_alert
""",
(
metrics.date, metrics.strategy_id, metrics.ic,
metrics.ic_5d_avg, metrics.ic_20d_avg, metrics.psi,
metrics.psi_5d_avg, metrics.sharpe_5d, metrics.sharpe_20d,
metrics.sharpe_60d, metrics.daily_return,
metrics.cumulative_return, metrics.trade_count,
metrics.signal_count, metrics.ic_alert,
metrics.psi_alert, metrics.sharpe_alert,
),
)
conn.commit()
logger.info(f"Saved drift metrics for {metrics.date}")
データベーススキーマ:
CREATE TABLE drift_metrics (
date DATE NOT NULL,
strategy_id VARCHAR(64) NOT NULL,
ic FLOAT,
ic_5d_avg FLOAT,
ic_20d_avg FLOAT,
psi FLOAT,
psi_5d_avg FLOAT,
sharpe_5d FLOAT,
sharpe_20d FLOAT,
sharpe_60d FLOAT,
daily_return FLOAT,
cumulative_return FLOAT,
trade_count INT DEFAULT 0,
signal_count INT DEFAULT 0,
ic_alert BOOLEAN DEFAULT FALSE,
psi_alert BOOLEAN DEFAULT FALSE,
sharpe_alert BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (date, strategy_id)
);
CREATE TABLE drift_alert_config (
strategy_id VARCHAR(64) PRIMARY KEY,
ic_warning FLOAT DEFAULT 0.02,
ic_critical FLOAT DEFAULT 0.01,
psi_warning FLOAT DEFAULT 0.10,
psi_critical FLOAT DEFAULT 0.25,
sharpe_warning FLOAT DEFAULT 0.5,
sharpe_critical FLOAT DEFAULT 0.0
);
5.5 日次モニタリングジョブ
def run_daily(self, target_date: date | None = None) -> DriftMetrics:
"""
日次ドリフトモニタリングジョブエントリポイント
典型的なデプロイ:市場終了後にcronまたはAirflowで実行
"""
if target_date is None:
target_date = date.today()
logger.info(f"Running drift monitoring for {target_date}")
metrics = self.calculate_metrics(target_date)
self.save_metrics(metrics)
# アラートログ
if metrics.ic_alert:
logger.warning(f"IC ALERT: IC={metrics.ic:.4f} below threshold")
if metrics.psi_alert:
logger.warning(f"PSI ALERT: PSI={metrics.psi:.4f} above threshold")
if metrics.sharpe_alert:
logger.warning(f"SHARPE ALERT: Sharpe={metrics.sharpe_20d:.4f} below threshold")
return metrics
5.6 統合例:再訓練をトリガーするタイミング
ドリフトモニタリングと再訓練決定を組み合わせる:
class RetrainOrchestrator:
"""再訓練オーケストレーター"""
def __init__(self, drift_monitor: DriftMonitor):
self.monitor = drift_monitor
self.consecutive_alerts = 0
self.alert_threshold = 3 # 3日連続後にトリガー
def check_retrain_needed(self, target_date: date) -> dict:
"""
再訓練をトリガーすべきか判断
ルール:
1. IC < 0.01が3日連続 -> トリガー
2. PSI > 0.25の単一発生 -> トリガー
3. 20日間Sharpe < 0 -> トリガー
"""
metrics = self.monitor.run_daily(target_date)
# 連続アラートを追跡
if metrics.ic_alert or metrics.sharpe_alert:
self.consecutive_alerts += 1
else:
self.consecutive_alerts = 0
# トリガー条件を評価
triggers = []
if self.consecutive_alerts >= self.alert_threshold:
triggers.append(f"IC/Sharpe alert for {self.consecutive_alerts} consecutive days")
if metrics.psi_alert:
triggers.append(f"PSI={metrics.psi:.3f} exceeds critical threshold")
if metrics.sharpe_20d is not None and metrics.sharpe_20d < 0:
triggers.append(f"20-day Sharpe={metrics.sharpe_20d:.2f} is negative")
should_retrain = len(triggers) > 0
return {
'should_retrain': should_retrain,
'triggers': triggers,
'metrics': metrics,
'action': 'RETRAIN' if should_retrain else 'CONTINUE'
}
# 使用例
monitor = DriftMonitor(
dsn="postgres://trading:trading@localhost:5432/trading",
strategy_id="momentum_v2"
)
orchestrator = RetrainOrchestrator(monitor)
result = orchestrator.check_retrain_needed(date.today())
if result['should_retrain']:
print(f"Triggering retrain, reasons: {result['triggers']}")
# 再訓練パイプラインを呼び出し
5.7 アーキテクチャまとめ
| コンポーネント | 責務 | 主要設計 |
|---|---|---|
| AlertConfig | 閾値設定 | Dataclass、DB読み込みサポート |
| DriftMetrics | メトリクスコンテナ | マルチウィンドウ、アラート状態 |
| DriftMonitor | コアサービス | 計算 + 保存 + アラート |
| PostgreSQL | 永続化 | 冪等upsert、監査サポート |
| RetrainOrchestrator | 決定オーケストレーション | 連続アラート、マルチ条件トリガー |
本番環境デプロイ推奨:
- 市場終了後T+30分で実行(データ準備を待つ)
- アラートをSlack/PagerDutyに接続
- IC/PSI/Sharpeトレンドチャートを表示するダッシュボード
- 再訓練トリガーは自動的にA/Bテストフローに入る
6. まとめ
検出方法クイックリファレンス
| 方法 | 検出対象 | 感度 | 計算コスト | 推奨シナリオ |
|---|---|---|---|---|
| パフォーマンスモニタリング | 戦略リターン | 中 | 低 | すべての戦略(必須) |
| K-S検定 | 特徴量分布 | 高 | 中 | 定期チェック(週次/月次) |
| カイ二乗検定 | カテゴリ特徴量 | 高 | 低 | 市場レジームラベル |
| CUSUM | 予測誤差 | 高 | 低 | 継続的監視(日次) |
| 総合検出 | 多次元 | 最高 | 中 | 本番環境システム(推奨) |
再訓練戦略クイックリファレンス
| 戦略 | トリガータイプ | 長所 | 短所 | 適用可能シナリオ |
|---|---|---|---|---|
| スケジュール | 時間駆動 | シンプル、予測可能 | 遅れる可能性 | 安定した市場 |
| トリガー | ドリフト駆動 | タイムリーな応答 | 高い複雑さ | 変動の激しい市場 |
| オンライン学習 | 継続的更新 | 最速の適応 | 不安定 | 高頻度シナリオ |
| ハイブリッド | スケジュール + トリガー | バランス | チューニングが必要 | 本番環境(推奨) |
重要な洞察:モデルドリフトは「もし」ではなく「いつ」の問題。堅牢な検出と再訓練メカニズムを確立することがクオンツ戦略の長期的生存の鍵。