AIエージェント入門

AIエージェント長時間実行・中断再開設計ガイド2026|Checkpoint・Durable Execution比較

AIエージェント長時間実行・中断再開設計ガイド2026|Checkpoint・Durable Execution比較

この記事の結論

AIエージェントの長時間実行・中断再開を実装するための設計パターン。LangGraph・Temporal・Bedrock AgentCoreを横断比較し、Checkpoint粒度の決め方とPythonコード例を解説します。

結論:AIエージェントの長時間実行は「失敗前提の設計」と「状態永続化」がカギ。LangGraph・Temporal・Trigger.dev・Bedrock AgentCoreを用途別に使い分け、Checkpoint粒度を正しく設定することで、数十分〜数時間のタスクも安定して完走できます。

  • 要点1:長時間タスクの主な失敗原因はタイムアウト・OOM・コンテキスト切れの3つ
  • 要点2:LangGraph v1.0ではMemorySaverがInMemorySaverにリネーム(2026年6月時点・公式確認済み)
  • 要点3:フレームワーク横断比較表で自分のスタックに合ったツールを即座に選べる

対象読者:AIエージェントを実務に導入している開発者・PM・MLエンジニア

今日やること:既存エージェントのタイムアウト上限を確認し、LangGraph InMemorySaverを使ったCheckpointを1箇所追加してみる

AIエージェントを実際に業務で動かしていると、必ず直面する壁があります。

「30分かかるデータ収集タスクを実行したら、途中でタイムアウトして最初からやり直し」「数千件のAPIコールをするバッチ処理が、800件目でネットワークエラー→全件再実行のループ」——こうした失敗を経験した開発者は少なくないはずです。

本記事では、長時間実行エージェントが失敗しやすい構造的な理由を整理した上で、Checkpoint設計・Durable Executionのパターンと、主要フレームワーク(LangGraph・Trigger.dev・Temporal・Amazon Bedrock AgentCore)の横断比較をPythonコード付きで解説します。

なお、Trigger.devの具体的な実装手順については既存の関連記事「Trigger.dev完全ガイド|AIエージェント長時間タスク実装」で詳しく扱っています。本記事は設計思想とフレームワーク横断比較に焦点を当て、棲み分けています。

なぜ長時間AIエージェントは失敗しやすいのか

「なんとなく不安定」と感じている場合でも、失敗の原因は大抵4パターンのいずれかに収まります。

原因1:環境タイムアウト

実行環境によって上限は大きく異なります。AWS Lambda(最大15分)・Vercel Edge Functions(デフォルト30秒)・一般的なHTTPリクエスト(30秒〜数分)は、長時間バッチ処理には向きません。エージェントの設計段階で「実行環境のタイムアウト上限」を必ず確認してください。

原因2:コンテキストウィンドウの上限

Anthropic Claude 3.5系は最大200Kトークン(2026年6月時点・Anthropic公式)ですが、長時間の会話やツール呼び出しが積み重なると上限に達します。モデルの文脈長を超えた時点でエージェントは「今何をしていたか」を忘れます。状態を外部に保存するCheckpoint設計はこの問題への直接的な対処です。

原因3:OOM(メモリ不足)による強制終了

大量のデータをメモリに展開しながら処理するエージェントは、使用メモリが増え続けてOOMで終了することがあります。ストリーミング処理・ページネーション・定期的なガベージコレクション呼び出しで対処しますが、根本解決には「処理済みを捨てる」設計が必要です。

原因4:ネットワーク・外部API障害

数百〜数千件のAPIコールをするエージェントは、外部サービスの一時的な障害の影響を受けやすいです。再試行ロジック(指数バックオフ)と組み合わせた冪等性設計が必須です。冪等性の実装パターンについては「AIエージェントの冪等性・タイムアウト設計ガイド」も参照してください。

Checkpoint設計の基本原則

Checkpointとは何か

Checkpointとは、エージェントの処理状態(ステップ番号・中間結果・会話履歴など)を外部ストレージに保存することです。失敗時に最後のCheckpointから再開できるため、「最初からやり直し」を防げます。

Checkpoint粒度の決め方

粒度が細かすぎるとストレージI/Oが増えてボトルネックになり、粗すぎると再実行コストが大きくなります。判断基準は以下の2軸です。

  • 副作用の有無:外部への書き込み(DB更新・メール送信・API POST)の直前に必ずCheckpointを置く
  • ステップの処理時間:1ステップの処理時間が実測で5〜10秒を超える場合は粒度を細かくする目安(実測値の目安・環境により異なる)

再開時の冪等性担保

Checkpointから再開するとき、同じステップを「2度実行しても問題ない」状態にしておく必要があります。

# 冪等性担保の基本パターン
# 注意: 本番環境では必ずテスト環境で動作確認後に使用してください

import hashlib

def process_with_idempotency(item_id: str, action_fn):
    """
    idempotency_keyで処理済みを管理し、二重実行を防ぐ

    動作環境: Python 3.10+
    """
    idempotency_key = hashlib.sha256(f"action:{item_id}".encode()).hexdigest()

    # すでに処理済みならスキップ
    if is_already_processed(idempotency_key):
        print(f"[スキップ] {item_id} は処理済みです")
        return get_cached_result(idempotency_key)

    # 処理実行
    result = action_fn(item_id)

    # 処理済みとして記録
    mark_as_processed(idempotency_key, result)
    return result

フレームワーク横断比較:LangGraph vs Temporal vs Trigger.dev vs Bedrock AgentCore

2026年6月時点で主要な4フレームワークを用途・特徴・向き不向きで比較します。

フレームワーク 言語 Checkpoint方式 向いているケース 向かないケース
LangGraph Python(主) InMemorySaver / SqliteSaver / PostgresSaver LangChainベースの既存エージェント、グラフ型ワークフロー マイクロサービス間をまたぐ複雑な分散処理
Temporal Python・Go・Java・TypeScript Temporalサーバーに自動永続化 分散マイクロサービス間の複雑なワークフロー、金融・EC等の高信頼性要件 シンプルなシングルエージェント処理
Trigger.dev TypeScript / Node.js ジョブキュー方式(中断点でスリープ→自動再開) Node.js環境のバックグラウンドジョブ、スケジューリング Pythonベースのエージェント、複雑な状態グラフ
Bedrock AgentCore 任意(マネージド) セッション継続・最大8時間対応 AWS環境でのエージェントデプロイ、フレームワーク非依存の長時間タスク オンプレ環境、AWSから独立したい場合

選択フロー(3つの質問で決める)

  • Q1. スタックはPythonかTypeScript/Node.jsか?
    Python → LangGraphまたはTemporal / Node.js → Trigger.dev
  • Q2. マイクロサービス間をまたぐ複雑な分散処理か?
    Yes → Temporal / No → LangGraph(Pythonなら)
  • Q3. AWS環境でフレームワーク非依存のデプロイを望むか?
    Yes → Bedrock AgentCore

LangGraph InMemorySaverの実装(v1.0対応・2026年6月時点)

LangGraph v1.0ではMemorySaverがInMemorySaverにリネームされています(公式ドキュメント準拠・2026年6月確認済み)。以下は実験・開発環境向けのサンプルです。本番環境ではSqliteSaverまたはPostgresSaverを使用してください。

# LangGraph v1.0 InMemorySaver 基本実装
# 動作環境: Python 3.10+, langgraph>=1.0
# 注意: InMemorySaverはプロセス終了で状態が消える。本番はSqliteSaverかPostgresSaverを使うこと

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    step_count: int
    results: list

def process_step(state: AgentState) -> AgentState:
    """1ステップの処理(ここに実際のロジックを実装)"""
    step = state["step_count"]
    print(f"[ステップ {step}] 処理中...")

    # 実際の処理(API呼び出し等)をここに記述
    result = f"ステップ{step}の結果"

    return {
        "step_count": step + 1,
        "results": state["results"] + [result],
        "messages": state["messages"]
    }

def should_continue(state: AgentState) -> str:
    """処理を続けるか終了するかを判断"""
    if state["step_count"] >= 5:  # 5ステップで終了
        return "end"
    return "continue"

# グラフ構築
workflow = StateGraph(AgentState)
workflow.add_node("process", process_step)
workflow.add_conditional_edges("process", should_continue, {
    "continue": "process",
    "end": END
})
workflow.set_entry_point("process")

# InMemorySaverでコンパイル
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# 実行(thread_idで状態を識別)
config = {"configurable": {"thread_id": "batch-job-001"}}
initial_state = {"messages": [], "step_count": 0, "results": []}

result = graph.invoke(initial_state, config)
print(f"完了: {len(result['results'])}件処理")

PostgresSaverで本番対応する

プロセスをまたいで状態を永続化するには、PostgresSaverを使います。

# 本番環境向け PostgresSaver
# 動作環境: Python 3.10+, langgraph>=1.0, psycopg[binary]
# 注意: 接続文字列は環境変数から読み込む。ハードコード禁止

import os
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import Connection

DB_URI = os.environ["DATABASE_URL"]  # 例: postgresql://user:pass@host/db

with Connection.connect(DB_URI, autocommit=True) as conn:
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()  # テーブル初期化(初回のみ)

    graph = workflow.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "production-job-001"}}

    # 初回実行
    result = graph.invoke(initial_state, config)

# ---- プロセス再起動後 ----
with Connection.connect(DB_URI, autocommit=True) as conn:
    checkpointer = PostgresSaver(conn)
    graph = workflow.compile(checkpointer=checkpointer)

    # 同じthread_idで再実行すると最後のCheckpointから再開
    config = {"configurable": {"thread_id": "production-job-001"}}
    state = graph.get_state(config)  # 前回の状態を取得
    print(f"前回のステップ数: {state.values.get('step_count', 0)}")

Temporalで分散ワークフローを実装する(Python SDK v1.28.0)

Temporalは実行状態をTemporalサーバーに自動で永続化します。プロセスが落ちても、別のWorkerが引き継いで処理を続けます。

# Temporal Python SDK v1.28.0 サンプル
# 動作環境: Python 3.10+, temporalio>=1.28.0
# 注意: Temporalサーバーの起動が必要(ローカル開発: temporal server start-dev)

import asyncio
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

@activity.defn
async def process_batch(batch_id: str) -> str:
    """外部API呼び出し等の実際の処理(失敗時はTemporalが自動リトライ)"""
    print(f"バッチ {batch_id} を処理中...")
    # 実際のAPI呼び出しや重い処理をここに
    return f"バッチ {batch_id} 完了"

@workflow.defn
class LongRunningAgentWorkflow:
    @workflow.run
    async def run(self, total_batches: int) -> list:
        results = []
        for i in range(total_batches):
            # 各Activityはタイムアウト・リトライを個別設定
            result = await workflow.execute_activity(
                process_batch,
                f"batch-{i}",
                start_to_close_timeout=timedelta(minutes=5),  # 1バッチの上限時間
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(seconds=1),
                    maximum_attempts=3
                )
            )
            results.append(result)
        return results

async def main():
    client = await Client.connect("localhost:7233")

    # ワークフロー開始(workflow_idで一意識別・冪等性担保)
    result = await client.execute_workflow(
        LongRunningAgentWorkflow.run,
        args=[10],  # 10バッチ処理
        id="agent-workflow-001",
        task_queue="agent-tasks"
    )
    print(f"処理結果: {result}")

設計分岐フロー:バッチ型 vs リアルタイム型

長時間タスクの設計は「バッチ処理として中断OK」か「リアルタイム応答必須」かで大きく変わります。

バッチ型(中断・再開OK)

  • 対象: データ収集・分析・レポート生成・大量API呼び出し
  • 設計: CheckpointごとにDBに状態保存 → 失敗時は最後の保存点から再開
  • 推奨: LangGraph(PostgresSaver)/ Temporal / Bedrock AgentCore

リアルタイム型(即時応答必須)

  • 対象: ユーザーが待機しているチャット・API応答
  • 設計: 処理を非同期化し、結果をポーリングまたはWebhookで通知
  • 推奨: タスクIDを返す→フロントがポーリング、またはBedrock AgentCoreの非同期API

Amazon Bedrock AgentCoreの長時間実行サポート(2026年3月GA)

Amazon Bedrock AgentCoreは2026年3月に一般提供(GA)となり、最大8時間の非同期ワークロードに対応しています(AWSドキュメント準拠・2026年6月時点)。

アイドルタイムアウトを回避するには、エージェントコンテナの/pingエンドポイントがHealthyBusyを返し続けることで、バックグラウンド処理の継続をAgentCoreに伝えます。フレームワーク非依存なので、LangGraph・Temporalで構築したエージェントをそのままデプロイできるのが特徴です。

よくある失敗パターンと回避策

失敗1:Checkpointの粒度が粗すぎる

「完了したらまとめて保存」という設計は、途中失敗時の再実行コストが最大になります。副作用のある処理の直前に必ずCheckpointを置く習慣をつけてください。

失敗2:再実行時に副作用が2回発生する

メール送信・DB INSERT・外部APIへのPOSTは、再実行で重複しやすい典型例です。idempotency_keyを使った処理済みチェックを必ず実装してください。関連実装パターンは「AIエージェントの冪等性・タイムアウト設計ガイド」を参照してください。

失敗3:状態管理をメモリだけに頼る

InMemorySaverはプロセス終了で状態が消えます。「試せた!」で満足せず、本番前には必ずSqliteSaverまたはPostgresSaverに切り替えてください。会話履歴の管理パターンについては「AIエージェントの状態・会話履歴管理ガイド」も参考にしてください。

失敗4:thread_idが衝突する

LangGraphのthread_idが別のジョブと衝突すると、間違った状態が引き継がれます。f"job-{uuid4()}"のようにUUIDを含めた一意なIDを使ってください。

状態スキーマの設計指針

長時間タスクの状態スキーマは「シリアライズ可能」かつ「差分更新しやすい」設計が重要です。

# 長時間タスクに適した状態スキーマ設計
# 動作環境: Python 3.10+

from typing import TypedDict, Annotated, Optional
import operator

class BatchAgentState(TypedDict):
    # リスト型はoperator.addで差分追記(全量置換より効率的)
    processed_ids: Annotated[list[str], operator.add]
    failed_ids: Annotated[list[str], operator.add]

    # スカラー値は上書き
    current_cursor: Optional[str]       # ページネーションのカーソル
    total_count: int                     # 処理すべき総件数
    error_count: int                     # エラー件数(閾値超えで停止判断)

    # メタデータ
    job_id: str                          # 一意のジョブID
    started_at: str                      # ISO8601形式
    last_checkpoint_at: Optional[str]   # 最後のCheckpoint時刻

# ポイント:
# - Annotated[list, operator.add] を使うとCheckpoint間の差分だけ保存
# - スカラー値は最後の値で上書き(シンプル)
# - メタデータを含めると監視・デバッグが楽になる

コスト最適化:Checkpointのストレージコストを抑える

Checkpointを頻繁に保存するとストレージI/Oコストが上昇します。長時間タスクのコスト設計全般については「AIエージェントのコスト最適化7原則」を参照してください。

Checkpoint観点での主な対策は以下の2点です。

  • Checkpointのライフサイクル管理:完了済みジョブのCheckpointは一定期間後に削除する(例:30日保持→削除)
  • 圧縮:状態データが大きい場合はgzip圧縮してからDBに保存し、読み出し時に展開する

まとめ:長時間エージェント設計の3原則

  1. 失敗前提の設計:「必ず途中で失敗する」と仮定し、どのステップから再開できるかを最初に設計する
  2. 副作用の直前にCheckpoint:書き込み系の処理の直前に状態を保存し、再実行時の重複副作用を防ぐ
  3. フレームワークはスタック優先で選ぶ:LangGraph(Python・グラフ型)、Temporal(分散マイクロサービス)、Trigger.dev(Node.js・ジョブキュー)、Bedrock AgentCore(AWS環境)

公式ドキュメント・参考リソース

この記事を読んでAIエージェントの長時間実行設計に取り組もうと思った方へ。

Uravationでは、AIエージェント設計・導入の研修・コンサルティングを行っています。設計レビューや実装支援のご相談はお気軽にどうぞ。

Need help moving from reading to rollout?

この記事を読んで導入イメージが固まってきた方へ

Uravationでは、AIエージェントの要件整理、PoC設計、社内導入、研修まで一気通貫で支援しています。

この記事をシェア

X Facebook LINE

※ 本記事の情報は2026年6月時点のものです。サービスの料金・仕様は変更される可能性があります。最新情報は各サービスの公式サイトをご確認ください。

関連記事