結論:AIエージェントの長時間実行は「失敗前提の設計」と「状態永続化」がカギ。LangGraph・Temporal・Trigger.dev・Bedrock AgentCoreを用途別に使い分け、Checkpoint粒度を正しく設定することで、数十分〜数時間のタスクも安定して完走できます。
- 要点1:長時間タスクの主な失敗原因はタイムアウト・OOM・コンテキスト切れの3つ
- 要点2:LangGraph v1.0ではMemorySaverが
InMemorySaverにリネーム(2026年6月時点・公式確認済み) - 要点3:フレームワーク横断比較表で自分のスタックに合ったツールを即座に選べる
対象読者:AIエージェントを実務に導入している開発者・PM・MLエンジニア
今日やること:既存エージェントのタイムアウト上限を確認し、LangGraph InMemorySaverを使ったCheckpointを1箇所追加してみる
AIエージェントを実際に業務で動かしていると、必ず直面する壁があります。
「30分かかるデータ収集タスクを実行したら、途中でタイムアウトして最初からやり直し」「数千件のAPIコールをするバッチ処理が、800件目でネットワークエラー→全件再実行のループ」——こうした失敗を経験した開発者は少なくないはずです。
本記事では、長時間実行エージェントが失敗しやすい構造的な理由を整理した上で、Checkpoint設計・Durable Executionのパターンと、主要フレームワーク(LangGraph・Trigger.dev・Temporal・Amazon Bedrock AgentCore)の横断比較をPythonコード付きで解説します。
なお、Trigger.devの具体的な実装手順については既存の関連記事「Trigger.dev完全ガイド|AIエージェント長時間タスク実装」で詳しく扱っています。本記事は設計思想とフレームワーク横断比較に焦点を当て、棲み分けています。
なぜ長時間AIエージェントは失敗しやすいのか
「なんとなく不安定」と感じている場合でも、失敗の原因は大抵4パターンのいずれかに収まります。
原因1:環境タイムアウト
実行環境によって上限は大きく異なります。AWS Lambda(最大15分)・Vercel Edge Functions(デフォルト30秒)・一般的なHTTPリクエスト(30秒〜数分)は、長時間バッチ処理には向きません。エージェントの設計段階で「実行環境のタイムアウト上限」を必ず確認してください。
原因2:コンテキストウィンドウの上限
Anthropic Claude 3.5系は最大200Kトークン(2026年6月時点・Anthropic公式)ですが、長時間の会話やツール呼び出しが積み重なると上限に達します。モデルの文脈長を超えた時点でエージェントは「今何をしていたか」を忘れます。状態を外部に保存するCheckpoint設計はこの問題への直接的な対処です。
原因3:OOM(メモリ不足)による強制終了
大量のデータをメモリに展開しながら処理するエージェントは、使用メモリが増え続けてOOMで終了することがあります。ストリーミング処理・ページネーション・定期的なガベージコレクション呼び出しで対処しますが、根本解決には「処理済みを捨てる」設計が必要です。
原因4:ネットワーク・外部API障害
数百〜数千件のAPIコールをするエージェントは、外部サービスの一時的な障害の影響を受けやすいです。再試行ロジック(指数バックオフ)と組み合わせた冪等性設計が必須です。冪等性の実装パターンについては「AIエージェントの冪等性・タイムアウト設計ガイド」も参照してください。
Checkpoint設計の基本原則
Checkpointとは何か
Checkpointとは、エージェントの処理状態(ステップ番号・中間結果・会話履歴など)を外部ストレージに保存することです。失敗時に最後のCheckpointから再開できるため、「最初からやり直し」を防げます。
Checkpoint粒度の決め方
粒度が細かすぎるとストレージI/Oが増えてボトルネックになり、粗すぎると再実行コストが大きくなります。判断基準は以下の2軸です。
- 副作用の有無:外部への書き込み(DB更新・メール送信・API POST)の直前に必ずCheckpointを置く
- ステップの処理時間:1ステップの処理時間が実測で5〜10秒を超える場合は粒度を細かくする目安(実測値の目安・環境により異なる)
再開時の冪等性担保
Checkpointから再開するとき、同じステップを「2度実行しても問題ない」状態にしておく必要があります。
# 冪等性担保の基本パターン
# 注意: 本番環境では必ずテスト環境で動作確認後に使用してください
import hashlib
def process_with_idempotency(item_id: str, action_fn):
"""
idempotency_keyで処理済みを管理し、二重実行を防ぐ
動作環境: Python 3.10+
"""
idempotency_key = hashlib.sha256(f"action:{item_id}".encode()).hexdigest()
# すでに処理済みならスキップ
if is_already_processed(idempotency_key):
print(f"[スキップ] {item_id} は処理済みです")
return get_cached_result(idempotency_key)
# 処理実行
result = action_fn(item_id)
# 処理済みとして記録
mark_as_processed(idempotency_key, result)
return result
フレームワーク横断比較:LangGraph vs Temporal vs Trigger.dev vs Bedrock AgentCore
2026年6月時点で主要な4フレームワークを用途・特徴・向き不向きで比較します。
| フレームワーク | 言語 | Checkpoint方式 | 向いているケース | 向かないケース |
|---|---|---|---|---|
| LangGraph | Python(主) | InMemorySaver / SqliteSaver / PostgresSaver | LangChainベースの既存エージェント、グラフ型ワークフロー | マイクロサービス間をまたぐ複雑な分散処理 |
| Temporal | Python・Go・Java・TypeScript | Temporalサーバーに自動永続化 | 分散マイクロサービス間の複雑なワークフロー、金融・EC等の高信頼性要件 | シンプルなシングルエージェント処理 |
| Trigger.dev | TypeScript / Node.js | ジョブキュー方式(中断点でスリープ→自動再開) | Node.js環境のバックグラウンドジョブ、スケジューリング | Pythonベースのエージェント、複雑な状態グラフ |
| Bedrock AgentCore | 任意(マネージド) | セッション継続・最大8時間対応 | AWS環境でのエージェントデプロイ、フレームワーク非依存の長時間タスク | オンプレ環境、AWSから独立したい場合 |
選択フロー(3つの質問で決める)
- Q1. スタックはPythonかTypeScript/Node.jsか?
Python → LangGraphまたはTemporal / Node.js → Trigger.dev - Q2. マイクロサービス間をまたぐ複雑な分散処理か?
Yes → Temporal / No → LangGraph(Pythonなら) - Q3. AWS環境でフレームワーク非依存のデプロイを望むか?
Yes → Bedrock AgentCore
LangGraph InMemorySaverの実装(v1.0対応・2026年6月時点)
LangGraph v1.0ではMemorySaverがInMemorySaverにリネームされています(公式ドキュメント準拠・2026年6月確認済み)。以下は実験・開発環境向けのサンプルです。本番環境ではSqliteSaverまたはPostgresSaverを使用してください。
# LangGraph v1.0 InMemorySaver 基本実装
# 動作環境: Python 3.10+, langgraph>=1.0
# 注意: InMemorySaverはプロセス終了で状態が消える。本番はSqliteSaverかPostgresSaverを使うこと
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
step_count: int
results: list
def process_step(state: AgentState) -> AgentState:
"""1ステップの処理(ここに実際のロジックを実装)"""
step = state["step_count"]
print(f"[ステップ {step}] 処理中...")
# 実際の処理(API呼び出し等)をここに記述
result = f"ステップ{step}の結果"
return {
"step_count": step + 1,
"results": state["results"] + [result],
"messages": state["messages"]
}
def should_continue(state: AgentState) -> str:
"""処理を続けるか終了するかを判断"""
if state["step_count"] >= 5: # 5ステップで終了
return "end"
return "continue"
# グラフ構築
workflow = StateGraph(AgentState)
workflow.add_node("process", process_step)
workflow.add_conditional_edges("process", should_continue, {
"continue": "process",
"end": END
})
workflow.set_entry_point("process")
# InMemorySaverでコンパイル
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
# 実行(thread_idで状態を識別)
config = {"configurable": {"thread_id": "batch-job-001"}}
initial_state = {"messages": [], "step_count": 0, "results": []}
result = graph.invoke(initial_state, config)
print(f"完了: {len(result['results'])}件処理")
PostgresSaverで本番対応する
プロセスをまたいで状態を永続化するには、PostgresSaverを使います。
# 本番環境向け PostgresSaver
# 動作環境: Python 3.10+, langgraph>=1.0, psycopg[binary]
# 注意: 接続文字列は環境変数から読み込む。ハードコード禁止
import os
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import Connection
DB_URI = os.environ["DATABASE_URL"] # 例: postgresql://user:pass@host/db
with Connection.connect(DB_URI, autocommit=True) as conn:
checkpointer = PostgresSaver(conn)
checkpointer.setup() # テーブル初期化(初回のみ)
graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "production-job-001"}}
# 初回実行
result = graph.invoke(initial_state, config)
# ---- プロセス再起動後 ----
with Connection.connect(DB_URI, autocommit=True) as conn:
checkpointer = PostgresSaver(conn)
graph = workflow.compile(checkpointer=checkpointer)
# 同じthread_idで再実行すると最後のCheckpointから再開
config = {"configurable": {"thread_id": "production-job-001"}}
state = graph.get_state(config) # 前回の状態を取得
print(f"前回のステップ数: {state.values.get('step_count', 0)}")
Temporalで分散ワークフローを実装する(Python SDK v1.28.0)
Temporalは実行状態をTemporalサーバーに自動で永続化します。プロセスが落ちても、別のWorkerが引き継いで処理を続けます。
# Temporal Python SDK v1.28.0 サンプル
# 動作環境: Python 3.10+, temporalio>=1.28.0
# 注意: Temporalサーバーの起動が必要(ローカル開発: temporal server start-dev)
import asyncio
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
@activity.defn
async def process_batch(batch_id: str) -> str:
"""外部API呼び出し等の実際の処理(失敗時はTemporalが自動リトライ)"""
print(f"バッチ {batch_id} を処理中...")
# 実際のAPI呼び出しや重い処理をここに
return f"バッチ {batch_id} 完了"
@workflow.defn
class LongRunningAgentWorkflow:
@workflow.run
async def run(self, total_batches: int) -> list:
results = []
for i in range(total_batches):
# 各Activityはタイムアウト・リトライを個別設定
result = await workflow.execute_activity(
process_batch,
f"batch-{i}",
start_to_close_timeout=timedelta(minutes=5), # 1バッチの上限時間
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_attempts=3
)
)
results.append(result)
return results
async def main():
client = await Client.connect("localhost:7233")
# ワークフロー開始(workflow_idで一意識別・冪等性担保)
result = await client.execute_workflow(
LongRunningAgentWorkflow.run,
args=[10], # 10バッチ処理
id="agent-workflow-001",
task_queue="agent-tasks"
)
print(f"処理結果: {result}")
設計分岐フロー:バッチ型 vs リアルタイム型
長時間タスクの設計は「バッチ処理として中断OK」か「リアルタイム応答必須」かで大きく変わります。
バッチ型(中断・再開OK)
- 対象: データ収集・分析・レポート生成・大量API呼び出し
- 設計: CheckpointごとにDBに状態保存 → 失敗時は最後の保存点から再開
- 推奨: LangGraph(PostgresSaver)/ Temporal / Bedrock AgentCore
リアルタイム型(即時応答必須)
- 対象: ユーザーが待機しているチャット・API応答
- 設計: 処理を非同期化し、結果をポーリングまたはWebhookで通知
- 推奨: タスクIDを返す→フロントがポーリング、またはBedrock AgentCoreの非同期API
Amazon Bedrock AgentCoreの長時間実行サポート(2026年3月GA)
Amazon Bedrock AgentCoreは2026年3月に一般提供(GA)となり、最大8時間の非同期ワークロードに対応しています(AWSドキュメント準拠・2026年6月時点)。
アイドルタイムアウトを回避するには、エージェントコンテナの/pingエンドポイントがHealthyBusyを返し続けることで、バックグラウンド処理の継続をAgentCoreに伝えます。フレームワーク非依存なので、LangGraph・Temporalで構築したエージェントをそのままデプロイできるのが特徴です。
よくある失敗パターンと回避策
失敗1:Checkpointの粒度が粗すぎる
「完了したらまとめて保存」という設計は、途中失敗時の再実行コストが最大になります。副作用のある処理の直前に必ずCheckpointを置く習慣をつけてください。
失敗2:再実行時に副作用が2回発生する
メール送信・DB INSERT・外部APIへのPOSTは、再実行で重複しやすい典型例です。idempotency_keyを使った処理済みチェックを必ず実装してください。関連実装パターンは「AIエージェントの冪等性・タイムアウト設計ガイド」を参照してください。
失敗3:状態管理をメモリだけに頼る
InMemorySaverはプロセス終了で状態が消えます。「試せた!」で満足せず、本番前には必ずSqliteSaverまたはPostgresSaverに切り替えてください。会話履歴の管理パターンについては「AIエージェントの状態・会話履歴管理ガイド」も参考にしてください。
失敗4:thread_idが衝突する
LangGraphのthread_idが別のジョブと衝突すると、間違った状態が引き継がれます。f"job-{uuid4()}"のようにUUIDを含めた一意なIDを使ってください。
状態スキーマの設計指針
長時間タスクの状態スキーマは「シリアライズ可能」かつ「差分更新しやすい」設計が重要です。
# 長時間タスクに適した状態スキーマ設計
# 動作環境: Python 3.10+
from typing import TypedDict, Annotated, Optional
import operator
class BatchAgentState(TypedDict):
# リスト型はoperator.addで差分追記(全量置換より効率的)
processed_ids: Annotated[list[str], operator.add]
failed_ids: Annotated[list[str], operator.add]
# スカラー値は上書き
current_cursor: Optional[str] # ページネーションのカーソル
total_count: int # 処理すべき総件数
error_count: int # エラー件数(閾値超えで停止判断)
# メタデータ
job_id: str # 一意のジョブID
started_at: str # ISO8601形式
last_checkpoint_at: Optional[str] # 最後のCheckpoint時刻
# ポイント:
# - Annotated[list, operator.add] を使うとCheckpoint間の差分だけ保存
# - スカラー値は最後の値で上書き(シンプル)
# - メタデータを含めると監視・デバッグが楽になる
コスト最適化:Checkpointのストレージコストを抑える
Checkpointを頻繁に保存するとストレージI/Oコストが上昇します。長時間タスクのコスト設計全般については「AIエージェントのコスト最適化7原則」を参照してください。
Checkpoint観点での主な対策は以下の2点です。
- Checkpointのライフサイクル管理:完了済みジョブのCheckpointは一定期間後に削除する(例:30日保持→削除)
- 圧縮:状態データが大きい場合はgzip圧縮してからDBに保存し、読み出し時に展開する
まとめ:長時間エージェント設計の3原則
- 失敗前提の設計:「必ず途中で失敗する」と仮定し、どのステップから再開できるかを最初に設計する
- 副作用の直前にCheckpoint:書き込み系の処理の直前に状態を保存し、再実行時の重複副作用を防ぐ
- フレームワークはスタック優先で選ぶ:LangGraph(Python・グラフ型)、Temporal(分散マイクロサービス)、Trigger.dev(Node.js・ジョブキュー)、Bedrock AgentCore(AWS環境)
公式ドキュメント・参考リソース
- LangGraph Persistence 公式ドキュメント(langgraph v1.0 InMemorySaver・PostgresSaver仕様・2026年6月時点)
- Temporal 公式ドキュメント(Python SDK v1.28.0 Durable Execution・Workflow/Activity設計)
- Amazon Bedrock AgentCore 長時間実行ガイド(AWS公式・最大8時間対応・2026年3月GA)
この記事を読んでAIエージェントの長時間実行設計に取り組もうと思った方へ。
Uravationでは、AIエージェント設計・導入の研修・コンサルティングを行っています。設計レビューや実装支援のご相談はお気軽にどうぞ。
