第 21 課:プロジェクト実践

目標:最初の20レッスンからの知識を統合して、ゼロから実行可能なMulti-Agent Trading Systemプロトタイプを構築する。


プロジェクト概要

このレッスンでは、エンドツーエンドのMulti-Agent Trading Systemの構築を案内します。これはおもちゃのプロジェクトではなく、実際の取引システムに拡張できるフレームワークです。

システム目標

次元目標非目標
機能市場Regimeを特定、シグナルを生成、リスクを制御、取引を実行High-frequency trading、複雑なデリバティブ
市場US株式日次頻度戦略分未満の戦略
規模10-50銘柄のポートフォリオ数千からの銘柄選択
運用ローカル開発環境 + Paper Trading直接ライブ取引

最終成果物

multi-agent-trading-system/
├── agents/
   ├── regime_agent.py      # 市場Regime検出
   ├── signal_agent.py      # シグナル生成
   ├── risk_agent.py        # リスクコントロール
   ├── execution_agent.py   # 注文実行
   └── monitor_agent.py     # システム監視
├── core/
   ├── data_manager.py      # データ管理
   ├── portfolio.py         # ポートフォリオ管理
   └── order.py             # 注文モデル
├── strategies/
   ├── momentum.py          # Momentum戦略
   └── mean_revert.py       # Mean reversion戦略
├── config/
   └── settings.yaml        # 設定ファイル
├── tests/
   └── ...                  # テストケース
└── main.py                  # エントリーポイント

21.1 システムアーキテクチャ

全体アーキテクチャ図

Multi-Agent Trading System Architecture

データフロー

Market Data ---------> Data Manager ---------> All Agents
                         |
                         v
Positions/Balance <---- Portfolio <---------- Execution Agent
                         |
                         v
Logs/Metrics --------> Monitor Agent --------> Alerts

Modular Monolith First

上記のアーキテクチャ図は、クリーンな境界を持つ個別のAgentを示しています。よくある間違いは、各Agentを独自のプロセス、メッセージキュー、データベースを持つ別々のマイクロサービスとしてすぐにデプロイすることです。これをしないでください。

Modular Monolithから始める:モジュール間でクリーンなモジュール境界と共有Protobuf契約を持つ単一プロセス。

上記のAgentは論理的な分離であり、デプロイメント境界ではありません。最初のバージョンでは、regime_agent.pysignal_agent.pyrisk_agent.pyexecution_agent.pyはすべて同じプロセスで実行され、明確に定義されたデータ契約(dataclassesまたはProtobufメッセージ)を使用して関数呼び出しを通じて通信します。

次の場合にのみ別々のサービスに抽出します:

  • スケーリングが独立したデプロイメントを必要とする(例:シグナル研究はGPUが必要だが実行は不要)
  • 異なるコンポーネントが根本的に異なるレイテンシ層を持つ(例:Rustでのサブミリ秒のリスクチェック vs. Pythonでの秒単位の研究)
  • チームサイズが独立したリリースサイクルを必要とする

重要な洞察:モジュール境界は設計決定です。サービス境界はインフラストラクチャ決定です。 最初に設計を正しくします。早すぎる分割は、ネットワークレイテンシ、分散デバッグの複雑さ、運用オーバーヘッドを追加します - いずれも戦略が機能するかどうかを検証するのに役立ちません。


21.2 ステップバイステップ実装

ステップ1:Data Manager

目標:市場データを取得して管理する。

Data Manager責任:
- 履歴相場を取得(バックテスト用)
- リアルタイム相場を取得(ライブ取引用)
- データクリーニングと標準化
- テクニカル指標を計算

紙設計:Data Interface

メソッド入力出力目的
get_historysymbol, daysDataFrame履歴データを取得
get_latestsymbolsDict最新価格を取得
calculate_indicatorsdfdf with indicatorsテクニカル指標を計算
validatedfbool, errorsデータ品質チェック
コードフレームワーク(エンジニアリファレンス)
import yfinance as yf
import pandas as pd
import numpy as np
from typing import List, Dict, Optional

class DataManager:
    """Data Manager"""

    def __init__(self, cache_dir: str = "./data_cache"):
        self.cache_dir = cache_dir

    def get_history(
        self,
        symbol: str,
        days: int = 252,
        end_date: Optional[str] = None
    ) -> pd.DataFrame:
        """履歴市場データを取得"""
        ticker = yf.Ticker(symbol)
        df = ticker.history(period=f"{days}d")

        # 列名を標準化
        df = df.rename(columns={
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Volume": "volume"
        })

        return df[["open", "high", "low", "close", "volume"]]

    def calculate_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """テクニカル指標を計算"""
        # 移動平均
        df["sma_20"] = df["close"].rolling(20).mean()
        df["sma_50"] = df["close"].rolling(50).mean()

        # ボラティリティ
        df["volatility"] = df["close"].pct_change().rolling(20).std() * np.sqrt(252)

        # RSI
        delta = df["close"].diff()
        gain = delta.where(delta > 0, 0).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        df["rsi"] = 100 - (100 / (1 + gain / loss))

        # ATR
        high_low = df["high"] - df["low"]
        high_close = (df["high"] - df["close"].shift()).abs()
        low_close = (df["low"] - df["close"].shift()).abs()
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        df["atr"] = tr.rolling(14).mean()

        return df

    def validate(self, df: pd.DataFrame) -> tuple:
        """データ品質チェック"""
        errors = []

        if df.empty:
            errors.append("DataFrame is empty")

        if df["close"].isnull().any():
            errors.append(f"Missing close prices: {df['close'].isnull().sum()}")

        if (df["close"] <= 0).any():
            errors.append("Invalid prices (<=0)")

        return len(errors) == 0, errors

ステップ2:Regime Agent

目標:現在の市場Regimeを特定する。

Regime Agent出力:
- regime: "trending" | "mean_reverting" | "crisis" | "uncertain"
- confidence: 0.0 - 1.0
- regime_weights: {"trending": 0.6, "mean_reverting": 0.3, "crisis": 0.1}

紙設計:Regime検出ルール

条件状態重み配分
ADX >25 and Vol < 25%TrendingTrend 80%、Mean reversion 15%、Defensive 5%
ADX < 20 and Vol < 20%Range-boundTrend 20%、Mean reversion 70%、Defensive 10%
Vol >30% and Corr >0.7CrisisTrend 10%、Mean reversion 10%、Defensive 80%
その他Uncertain各33%
コードフレームワーク(エンジニアリファレンス)
from dataclasses import dataclass
from typing import Dict

@dataclass
class RegimeState:
    regime: str
    confidence: float
    weights: Dict[str, float]

class RegimeAgent:
    """市場Regime検出Agent"""

    def __init__(self, config: dict = None):
        self.config = config or {}
        self.adx_threshold = self.config.get("adx_threshold", 25)
        self.vol_crisis = self.config.get("vol_crisis", 0.30)

    def detect(self, market_data: dict) -> RegimeState:
        """
        市場Regimeを検出

        market_data: {
            "adx": float,
            "volatility": float,
            "correlation": float,
            "trend_strength": float
        }
        """
        adx = market_data.get("adx", 20)
        vol = market_data.get("volatility", 0.15)
        corr = market_data.get("correlation", 0.5)

        # Crisis検出が優先
        if vol > self.vol_crisis and corr > 0.7:
            return RegimeState(
                regime="crisis",
                confidence=0.8,
                weights={"momentum": 0.1, "mean_revert": 0.1, "defensive": 0.8}
            )

        # Trend検出
        if adx > self.adx_threshold and vol < 0.25:
            return RegimeState(
                regime="trending",
                confidence=0.7,
                weights={"momentum": 0.7, "mean_revert": 0.2, "defensive": 0.1}
            )

        # Range-bound検出
        if adx < 20 and vol < 0.20:
            return RegimeState(
                regime="mean_reverting",
                confidence=0.6,
                weights={"momentum": 0.2, "mean_revert": 0.7, "defensive": 0.1}
            )

        # Uncertain
        return RegimeState(
            regime="uncertain",
            confidence=0.3,
            weights={"momentum": 0.33, "mean_revert": 0.33, "defensive": 0.34}
        )

ステップ3:Signal Agent

目標:戦略に基づいて取引シグナルを生成する。

Signal Agent出力:
- signals: [
    {"symbol": "AAPL", "direction": "long", "strength": 0.7, "source": "momentum"},
    {"symbol": "MSFT", "direction": "short", "strength": 0.5, "source": "mean_revert"}
  ]

紙設計:シグナル集約ロジック

シナリオ処理
単一戦略シグナル直接出力、strength = 戦略シグナル x Regime重み
複数戦略が一致シグナル強度を強化、加重平均を取る
複数戦略が衝突より高い重みの戦略を取る、または取引しない
コードフレームワーク(エンジニアリファレンス)
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Signal:
    symbol: str
    direction: str  # "long" | "short" | "close"
    strength: float  # 0.0 - 1.0
    source: str
    timestamp: str

class SignalAgent:
    """シグナル生成Agent"""

    def __init__(self, strategies: list):
        self.strategies = strategies

    def generate_signals(
        self,
        market_data: dict,
        regime_weights: dict
    ) -> List[Signal]:
        """すべての戦略からシグナルを集約"""
        all_signals = {}

        for strategy in self.strategies:
            strategy_name = strategy.name
            weight = regime_weights.get(strategy_name, 0.33)

            raw_signals = strategy.generate(market_data)

            for sig in raw_signals:
                key = (sig.symbol, sig.direction)
                weighted_strength = sig.strength * weight

                if key in all_signals:
                    # 同じ方向のシグナルをスタック
                    all_signals[key].strength += weighted_strength
                else:
                    all_signals[key] = Signal(
                        symbol=sig.symbol,
                        direction=sig.direction,
                        strength=weighted_strength,
                        source=strategy_name,
                        timestamp=sig.timestamp
                    )

        # 弱いシグナルをフィルター
        return [s for s in all_signals.values() if s.strength > 0.3]

ステップ4:Risk Agent

目標:シグナルをレビュー、リスクをコントロールする。

Risk Agent決定:
- APPROVE: 合格、実行可能
- REDUCE: 合格だがポジションサイズを削減
- REJECT: 拒否

紙設計:レビュールール

ルールチェック内容トリガーアクション
単一取引制限ポジション >10%10%に削減
シンボル制限同じシンボル >20%拒否または削減
総ポジション制限総エクスポージャー >80%拒否
Drawdownコントロール現在のDrawdown >10%すべての新規ポジションを拒否
Circuit breakerDrawdown >15%強制デレバレッジ
コードフレームワーク(エンジニアリファレンス)
from enum import Enum
from dataclasses import dataclass
from typing import Optional

class Decision(Enum):
    APPROVE = "approve"
    REDUCE = "reduce"
    REJECT = "reject"

@dataclass
class RiskDecision:
    decision: Decision
    reason: str
    adjusted_size: Optional[float] = None

class RiskAgent:
    """Risk Control Agent"""

    def __init__(self, config: dict):
        self.max_single = config.get("max_single_position", 0.10)
        self.max_symbol = config.get("max_symbol_exposure", 0.20)
        self.max_total = config.get("max_total_exposure", 0.80)
        self.drawdown_stop = config.get("drawdown_stop", 0.10)
        self.drawdown_circuit = config.get("drawdown_circuit", 0.15)

    def check(
        self,
        signal: Signal,
        proposed_size: float,
        portfolio: dict,
        current_drawdown: float
    ) -> RiskDecision:
        """取引シグナルをレビュー"""

        # Circuit breakerチェック
        if current_drawdown >= self.drawdown_circuit:
            return RiskDecision(Decision.REJECT, "Circuit breaker active")

        # Drawdownコントロール
        if current_drawdown >= self.drawdown_stop:
            if signal.direction != "close":
                return RiskDecision(Decision.REJECT, "Drawdown limit reached")

        # 単一取引制限
        if proposed_size > self.max_single:
            return RiskDecision(
                Decision.REDUCE,
                f"Size exceeds single limit",
                adjusted_size=self.max_single
            )

        # シンボル制限
        current_exposure = portfolio.get(signal.symbol, 0)
        if current_exposure + proposed_size > self.max_symbol:
            allowed = self.max_symbol - current_exposure
            if allowed <= 0:
                return RiskDecision(Decision.REJECT, "Symbol limit reached")
            return RiskDecision(
                Decision.REDUCE,
                "Symbol limit",
                adjusted_size=allowed
            )

        # 総エクスポージャー制限
        total_exposure = sum(portfolio.values()) + proposed_size
        if total_exposure > self.max_total:
            return RiskDecision(Decision.REJECT, "Total exposure limit")

        return RiskDecision(Decision.APPROVE, "Passed all checks")

ステップ5:Execution Agent

目標:取引注文を実行する。

Execution Agent責任:
- シグナルを注文に変換
- 注文タイプを選択(market/limit)
- 注文ステータスを管理
- 実行結果を記録
コードフレームワーク(エンジニアリファレンス)
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
import uuid

class OrderStatus(Enum):
    PENDING = "pending"
    SUBMITTED = "submitted"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"

@dataclass
class Order:
    order_id: str
    symbol: str
    side: str  # "buy" | "sell"
    quantity: int
    order_type: str  # "market" | "limit"
    limit_price: float = None
    status: OrderStatus = OrderStatus.PENDING
    filled_price: float = None
    filled_time: str = None

class ExecutionAgent:
    """注文実行Agent"""

    def __init__(self, broker_client=None, is_paper: bool = True):
        self.broker = broker_client
        self.is_paper = is_paper
        self.orders = {}

    def create_order(
        self,
        symbol: str,
        direction: str,
        size: float,
        portfolio_value: float,
        current_price: float
    ) -> Order:
        """注文を作成"""
        # 株数を計算
        dollar_amount = portfolio_value * size
        quantity = int(dollar_amount / current_price)

        if quantity <= 0:
            return None

        side = "buy" if direction == "long" else "sell"

        order = Order(
            order_id=str(uuid.uuid4())[:8],
            symbol=symbol,
            side=side,
            quantity=quantity,
            order_type="market"
        )

        self.orders[order.order_id] = order
        return order

    def submit_order(self, order: Order) -> bool:
        """注文を送信"""
        if self.is_paper:
            # フィルをシミュレート
            order.status = OrderStatus.FILLED
            order.filled_time = datetime.now().isoformat()
            # Paper tradingは現在価格でのフィルを仮定
            return True

        # ライブ取引はブローカーAPIを呼び出す
        # response = self.broker.submit_order(order)
        # ...
        return True

ステップ6:Mainループ

目標:すべてのAgentを一緒に接続する。

# Mainループ疑似コード
def main_loop():
    # 1. データを取得
    market_data = data_manager.get_latest(symbols)

    # 2. Regimeを検出
    regime_state = regime_agent.detect(market_data)

    # 3. シグナルを生成
    signals = signal_agent.generate(market_data, regime_state.weights)

    # 4. リスクレビュー
    for signal in signals:
        proposed_size = calculate_position_size(signal)
        decision = risk_agent.check(signal, proposed_size, portfolio, drawdown)

        if decision.decision == Decision.APPROVE:
            size = proposed_size
        elif decision.decision == Decision.REDUCE:
            size = decision.adjusted_size
        else:
            continue

        # 5. 注文を実行
        order = execution_agent.create_order(
            signal.symbol, signal.direction, size, portfolio_value, current_price
        )
        execution_agent.submit_order(order)

    # 6. ポートフォリオを更新
    portfolio.update()

    # 7. ログ
    monitor_agent.log_daily_summary()

教育コードから本番システムへ

このレッスンは、アクセシビリティのためにすべてのAgentをPythonで実装しています。本番では、異なるコンポーネントは通常、レイテンシ要件に基づいて異なる言語を使用します:

  • Risk Engine -- Rust(不変のハードコードされた制限、ナノ秒レイテンシ)
  • Order Management -- Go(高並行性、goroutineモデル)
  • Research/ML -- Python(豊富なMLエコシステム)

重要なのは言語の選択ではなく、クリーンなモジュール境界と標準化された通信プロトコル(例:gRPC + Protobuf)です。PythonプロトタイプがAgent間に明確に定義されたインターフェースを持っている場合、パフォーマンスクリティカルなパスをRustやGoに後で移行することは、書き直しではなくリファクタリングです。


21.3 Backtest検証

Backtestingフレームワーク

Backtestingは戦略が機能することを証明するためではなく、戦略の問題を発見するためです。

Backtestチェックリスト(レッスン7 Quality Gate参照):

チェック項目合格基準
先見バイアスなしすべてのデータがT+1で利用可能
現実的なコストモデリングスリッページ、手数料を含む
Out-of-sample検証OOS Sharpe > IS Sharpe x 0.7
パラメータの安定性パラメータ+/-20%で戦略が崩壊しない
極端なシナリオテスト2008、2020を別々にテスト

主要Backtestメトリクス

メトリクス目標警告レベル
年次リターン>15%< 10%
Sharpe Ratio>1.0< 0.5
Max Drawdown< 20%>30%
Win Rate>50%< 40%
Profit/Loss Ratio>1.5< 1.0
Turnover< 500%/年>1000%/年

21.4 PaperからLive Tradingへ

段階的検証

ステージ1:Historical Backtest(2-4週間)
├── 戦略ロジックが正しいことを検証
├── システムが完全に実行できることを検証
└── 目標:Backtestメトリクスが基準を満たす

ステージ2:Paper Trading(2-4週間)
├── 実際の相場、偽の取引を使用
├── リアルタイムデータ処理を検証
└── 目標:実行がエラーフリー

ステージ3:Small Capital Live(4-8週間)
├── 計画資本の5-10%をデプロイ
├── 実際の実行を検証
└── 目標:ライブパフォーマンスがBacktestに近い

ステージ4:Gradual Scale-Up(継続中)
├── 月次評価、段階的に資本を増やす
├── 継続的な監視と調整
└── 目標:安定した収益性

Pre-Liveチェックリスト

[ ] システムチェック
  +-- すべてのAgentプロセスが安定して &gt;1週間実行
  +-- アラートシステムがテスト済み
  +-- ログが完全
  +-- 災害復旧プロセスが検証済み

[ ] 戦略チェック
  +-- BacktestがQuality Gateを通過
  +-- Paper trading &gt;2週間
  +-- スリッページ見積もりが合理的
  +-- 極端なシナリオの緊急時計画

[ ] 運用チェック
  +-- 市場前/後のチェックリストが準備完了
  +-- 連絡先情報が更新済み
  +-- バックアッププランが準備済み
  +-- 資本移転が確認済み

[ ] 精神的準備
  +-- リスク許容度が明確
  +-- 明確なストップロスラインを設定
  +-- Drawdownに直面する準備ができている

21.5 プロジェクト拡張方向

オプションの拡張機能

方向説明複雑さ
Multi-marketA株、HK株、暗号通貨をサポート
Multi-timeframe分レベルの戦略をサポート
LLM統合ニュース分析、調査レポート解析
Online Learning戦略自動進化
Webインターフェースビジュアル監視
分散デプロイメントマルチサーバー運用

継続的改善サイクル

+------------------------------------------------------------+
|                                                            |
|    Monitor ------> 問題を発見 ------> 原因を分析           |
|      ^                                          |          |
|      |                                          v          |
|    Deploy <------- テスト検証 <------- 改善計画            |
|                                                            |
+------------------------------------------------------------+

合格基準

プロジェクト合格チェックリスト

ステージ基準合格標準
コードシステムが実行されるpython main.py エラーなし
すべてのAgentが動作ログがAgent出力を表示
設定が変更可能settings.yamlへの変更が有効
BacktestBacktestが実行されるBacktestレポートを生成
メトリクスが正しく計算される2-3取引を手動で検証
明らかなバグなしBacktest曲線が合理的
Paper Tradingリアルタイムデータを取得できるログが最新価格を表示
シグナルを生成できるシグナルリストが空でない
リスクコントロールが機能制限超過注文が拒否される
ドキュメンテーションREADMEがある他の人が実行方法を理解できる
設定ドキュメントがあるパラメータの意味が明確

自己評価質問

プロジェクトを完了した後、これらの質問に答えてください:

  1. あなたのシステムはどの市場Regimeで最高/最悪のパフォーマンスを発揮しますか?
  2. 最大のDrawdownの原因は何でしたか?
  3. 明日ライブに移行する場合、最も心配することは何ですか?
  4. システムのどの側面がまだ改善できますか?

レッスン成果物

このレッスンを完了すると、以下が得られます:

  1. 完全なコードフレームワーク - 実行可能なMulti-Agentシステム
  2. Backtest検証結果 - 戦略パフォーマンス評価
  3. Pre-Liveチェックリスト - PaperからLive Tradingへのパス
  4. 拡張方向ガイド - 将来の改善のためのアイデア

重要なポイント

  • Multi-Agentシステム全体のアーキテクチャを理解
  • 各Agentの責任とインターフェース設計をマスター
  • 最初の20レッスンからの知識を1つのシステムに統合できる
  • BacktestからLive Tradingへの検証パスを理解

さらなる読み物


次回レッスンプレビュー

レッスン22:SummaryとAdvanced Directions

このコースはまもなく終了します。最後のレッスンでは、全体の学習の旅をレビューし、コアインサイトをまとめ、高度な方向を展望します - 個人プロジェクトからキャリア開発、技術的深さから業界の視点まで。

この章を引用する
Zhang, Wayland (2026). 第 21 課:プロジェクト実装. In AIクオンツ取引:ゼロからイチへ. https://waylandz.com/quant-book-ja/Lesson-21-Project-Implementation
@incollection{zhang2026quant_Lesson_21_Project_Implementation,
  author = {Zhang, Wayland},
  title = {第 21 課:プロジェクト実装},
  booktitle = {AIクオンツ取引:ゼロからイチへ},
  year = {2026},
  url = {https://waylandz.com/quant-book-ja/Lesson-21-Project-Implementation}
}