こんにちは、MNTSQでアルゴリズムエンジニアをやっている平田です。 MNTSQではAIで企業の契約業務を変革するプロダクトを開発しています。
ところでみなさん、MCP(Model Context Protocol)使っていますか?
2024年11月にAnthropicがMCPを提唱してから半年しか経っていないのに、MCPを取り巻くAIエージェント開発のエコシステムは爆発的なスピードで成長を遂げています。 (実際、この記事を書いている最中にアップデートがあって、何度か書き直しています🫠)
先日MCPがStreamable HTTPをサポートしたため、MNTSQでも自社プロダクトへのMCP導入を検討し始めました。 Streamable HTTPではサーバーをステートレスにできるので、アーキテクチャがシンプルになり、水平スケーリングが容易になります。これはMNTSQのようなSaaSでのMCP活用において非常に重要です。
この記事では、具体的なアプリケーションの実装を通じて、SaaSでの利用を想定したMCPの使い方を学びます。
アプリケーションの主な要件は次の通りです:
- MCPサーバーをステートレスにする: Streamable HTTPでステートレスなMCPサーバーを構築します。これにより、SSE(Server-Sent Events)よりもアーキテクチャがシンプルになり、保守性やスケーラビリティが向上します。
- 生成されたツールの情報を検証する: Function Callingで誤ったリソースにアクセスすることを防ぐため、ツールの自動実行を無効化して、生成されたツールの情報を検証・修正してから実行します。
- Gemini APIを使う: MNTSQの契約データを扱うには、非常に長いコンテキストウィンドウを持つGeminiが適しています。
- LangChainを使わない: LangChainは便利ですが、実際には使用しない機能が依存関係に含まれます。依存関係の脆弱性や競合によるメンテナンスコストを下げるため、シンプルかつ軽量な構成を保ちます。
アプリケーションの概要
MCPサーバーで提供するツールは何でも良いので、今回はシンプルにElasticsearchをバックエンドとするRAGアプリケーションを実装します。
リポジトリ
この記事で紹介するソースコードや実行方法はすべて次のリポジトリにあります。
アーキテクチャ
graph TD A[Application] -->|ツール実行| M[MCPサーバー<br/>(Streamable HTTP)] A -->|Function Calling| G[Gemini API] M -->|データ取得/検索| E[Elasticsearch] class A appClass class M mcpClass class G apiClass class E dbClass
処理の流れ
sequenceDiagram participant App as Application participant Gemini as Gemini API participant MCP as MCPサーバー Note over App: 初期化 App->>MCP: セッション開始 Activate MCP App->>MCP: ツール一覧取得 (list_tools) MCP-->>App: 利用可能なツール一覧<br/>(get_indices, get_mapping, search) Note over App: 初回リクエスト App->>Gemini: コンテンツ生成リクエスト<br/>(プロンプト + ツール定義) Gemini-->>App: レスポンス loop レスポンスにfunction_callが含まれる限り Note over App: Function Calling App->>MCP: ツール実行 MCP-->>App: ツール実行結果 Note over App: 2回目以降のリクエスト App->>Gemini: 次のコンテンツ生成リクエスト<br/>(履歴 + ツール実行結果) Gemini-->>App: レスポンス end Note over App: 最終回答 App->>App: 最終回答を出力 App->>MCP: セッション終了 Deactivate MCP
例えば、「昨日の売上をカテゴリ別に集計してください。」というプロンプトに対して、次のように動作します。
- セッション開始
- MCPサーバーからツール一覧を取得
- ツール
get_indices
によりインデックス一覧を取得 - ツール
get_mapping
によりkibana_sample_data_ecommerce
のマッピングを取得 - ツール
search
によりプロンプトをElasticsearchのDSLに変換して検索を実行 - 検索結果から回答を生成
回答例:
昨日のカテゴリ別の売上は以下の通りです。 * **Men's Clothing**: 3999.13 * **Women's Clothing**: 3924.91 * **Women's Shoes**: 3360.66 * **Men's Shoes**: 2197.89 * **Men's Accessories**: 1669.72 * **Women's Accessories**: 1292.59
解説
MCPサーバー
MCPサーバーの実装は、主にMCP Python SDKの公式ドキュメントを参考にしています。
MCPサーバーは、次の3つのツールを提供します。
get_indices
: インデックス一覧取得get_mapping
: マッピング取得search
: 検索
@mcp.tool(description="Elasticsearchで検索を実行するツール") def search(index: str, query_body: dict[str, Any], ctx: Context) -> Any: logger.info("search tool called") logger.debug(f"index: {index}, query: {query_body}") es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.search(index=index, body=query_body) return response @mcp.tool(description="Elasticsearchのインデックスを取得するツール") def get_indices(ctx: Context) -> Any: logger.info("get_indices tool called") es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.indices.get_alias("*") return response @mcp.tool(description="Elasticsearchで指定したインデックスのマッピングを取得するツール") def get_mapping(index: str, ctx: Context) -> Any: logger.info("get_mapping tool called") logger.debug(f"index: {index}") es_client: Elasticsearch = ctx.request_context.lifespan_context.es_client response = es_client.indices.get_mapping(index=index) return response
ステートレスを有効化するには FastMCP
で stateless_http=True
を指定します。
mcp = FastMCP(name="SearchServer", stateless_http=True, lifespan=lifespan)
MCPサーバーは FastAPI
にマウントできます。
app = FastAPI(lifespan=lifespan)
app.mount("/search", search.mcp.streamable_http_app())
MCPクライアント
MCPクライアントの実装は、主にGoogle Gen AI SDK( google-genai==1.19.0
)の公式ドキュメントを参考にしています。
Google Gen AI SDKの公式ドキュメントとの差分は次の3点です。
stdio_client
ではなくstreamablehttp_client
を使う- ツールの自動実行を無効化する
- 最終回答に至るまでFunction Callingを繰り返す
Streamable HTTPを使用するため、 streamablehttp_client
を使ってセッションを開始します。
async with streamablehttp_client( f"http://{mcp_server_host}:{mcp_server_port}/search/mcp/" ) as ( read_stream, write_stream, _, ): async with ClientSession(read_stream, write_stream) as session: await session.initialize()
Function Callingのために、MCPサーバーで提供されるツールの情報と、ツールの自動実行を無効化する設定をGemini APIに渡します。
MCPサーバーで提供されるツールの情報は、 tools=[session]
で渡します。( google-genai==1.15.0
以前は types.FunctionDeclaration
オブジェクトに変換する必要がありました。)
ツールの自動実行を無効化する設定は、 automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True)
で渡します。無効化する理由は、主に認可の目的で、生成されたツールの情報を検証・修正してから実行するためです。
config = types.GenerateContentConfig( system_instruction=SYSTEM_INSTRUCTION, temperature=0, tools=[session], # type: ignore[arg-type] automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True), )
ツールの自動実行を無効化しているので、ツールは自分で実行する必要があります。ツールの実行に関する情報(ツール名と引数)は response.candidates[0].content.parts[-1].function_call
に出力されます。
Gemini APIの公式ドキュメントでは parts[0]
を使用していますが、次のように text
と function_call
が混在する場合があるため parts[-1]
を使用するほうが確実です。この順番はGoogleが保証するものではないですが、ツールの自動実行を無効化するとユーザーの types.FunctionResponse
を待ち受ける状態になるので、末尾になると考えるのが自然です。
{ "parts": [ { "text": "昨日の売上をカテゴリ別に集計するために、まず売上情報が含まれていそうなインデックスを特定する必要があります。\n利用可能なインデックスをリストアップするために、`get_indices()` を実行します。\n" }, { "function_call": { "args": {}, "name": "get_indices" } } ] }
ツールはMCPサーバーが提供しているので、MCPサーバーにリクエストしてツールを実行します。
# NOTE: ツールの引数を修正する場合は、 `function_call.args` を直接書き換える # function_call.args["index"] = "kibana_sample_data_ecommerce" result = await session.call_tool(function_call.name, arguments=function_call.args)
まとめ
この記事では、RAGの実装を通じて、Gemini Function CallingとMCP Streamable HTTPの使い方を学びました。 MCPがStreamable HTTPをサポートしたおかげで、MCPサーバーのアーキテクチャがシンプルになりました。 MCPがJSON-RPCを採用しているおかげで、生成されたツールの情報を柔軟に検証・修正することができます。
余談ですが、実装したアプリケーションを実際に動かしてみると、「売上」に対してちゃんと kibana_sample_data_ecommerce
インデックスを参照したり、DSLを教えていないのにElasticsearchという情報だけで正しくクエリを生成したりと、Geminiの生成能力に驚かされます。
MCPを取り巻くエコシステムはまだ発展途上なので、この記事の情報もすぐ古くなってしまうかもしれませんが、みなさんのAIエージェント開発の一助となれば幸いです。
もしMNTSQの仕事にご興味を持っていただけたら、ぜひ以下のページもご覧ください。