この記事でわかること
AIエージェントの多くは「リクエストを受け取り→処理→レスポンスを返す」同期モデルで動いている。しかし処理が重くなるほど、タイムアウト・スループット不足・サービス間結合の強さという3つの壁にぶつかる。
本記事では、イベント駆動アーキテクチャ(EDA)を使って非同期・疎結合のAIエージェントパイプラインを組む設計パターンを、Kafka・AWS SQS・Google Cloud Pub/Subの3経路でコード付きで解説する。最後に「いつ同期・いつ非同期か」の判断フローも示す。
- イベント駆動AIエージェントの基本構造と同期との違い
- Kafka Streams・AWS SQS+Lambda・Google Cloud Pub/Subの3経路実装例(Pythonコード付き)
- SLA・副作用・スループットで判断する「同期vs非同期」フロー
なぜ同期型エージェントでは限界があるのか
LangGraphやオーケストレーター-ワーカーパターンで実装された同期型エージェントは、リクエスト-レスポンスのHTTPサイクルを前提とする。この構造が持つ根本的な問題は3点だ。
同期型の3つの限界
1. タイムアウトの壁
LLM推論は数秒〜数十秒かかる。複数のエージェントが直列に連鎖すると、合計待ち時間がHTTPタイムアウト(多くの場合30〜60秒)を超える。ウェブ画面が返答を待ち続け、ユーザーはエラー画面を見る。
2. スループットの上限
同期処理では1リクエストが詰まると後続が全て待たされる。トラフィックが集中した瞬間に処理キューがブロックし、全エージェントが停止に近い状態になる。
3. サービス間の強結合
呼び出し元と呼び出し先が直接つながる同期構造では、下流のエージェントがダウンすると上流も道連れになる。スケールアウトも下流に強依存する。
イベント駆動アーキテクチャはこれらの問題を構造で解消する。エージェントは「誰かが読んでいるか」を気にせずイベントをトピックに書き込み、別のエージェントが非同期で取り出して処理する。サービスは疎結合で独立スケール可能になる。
なお、マルチエージェントのオーケストレーター-ワーカーパターンでは同期前提の設計が多い。本記事の非同期パターンはその補完として位置づけるとよい。
イベント駆動AIエージェントの基本構造
イベント駆動パイプラインは3つのロールで構成される。
- Producer(生産者):ユーザーリクエスト・センサー・外部APIなどがイベントをトピック/キューに書き込む
- Broker(仲介者):Kafka・SQS・Pub/Subがイベントを永続保持・配送する
- Consumer(消費者):AIエージェントがイベントを取り出して処理し、結果を別のトピックに書き込む
「タスクキューにジョブを積んで非同期処理→結果をトピックにpublish→別エージェントがsubscribeして連鎖」というパターンが基本形だ。以下のPython疑似コードでイメージをつかんでほしい。
# 基本構造のイメージ(疑似コード)
# Producer: タスクをキューに積む
def submit_task(task_payload: dict) -> str:
task_id = generate_uuid()
broker.publish(topic="agent.tasks.input", message={
"task_id": task_id,
"payload": task_payload,
"timestamp": utcnow()
})
return task_id # 即時レスポンス(処理完了は待たない)
# Consumer: 非同期でタスクを処理
def process_task(message: dict):
result = ai_agent.run(message["payload"])
broker.publish(topic="agent.tasks.output", message={
"task_id": message["task_id"],
"result": result
})
【経路1】Apache Kafka Streams でエージェントパイプラインを組む
Apache Kafka 4.0でZooKeeperが完全廃止となり、KRaftモードが標準になった(2026年2月リリースの4.2.0でさらに安定)。KRaftは1クラスターで最大150万パーティションをサポートし、大規模なイベントパイプラインに耐える。
AIエージェントとの組み合わせでは、入力トピック→エージェント処理→出力トピックの連鎖が基本パターンだ。Kafka 4.0ではQueues for Kafka(KIP-932)が導入され、従来のpub-subに加えてキュー型メッセージングも使えるようになっている(Kafka公式ドキュメント参照)。
# Kafka × AIエージェント実装例(Python / kafka-python 2.x)
from kafka import KafkaProducer, KafkaConsumer
from openai import OpenAI
import json, time
# Producer: タスクを入力トピックに書き込む
producer = KafkaProducer(
bootstrap_servers=["kafka-broker:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
def enqueue_analysis_task(user_query: str, task_id: str):
producer.send("agent.input", value={
"task_id": task_id,
"query": user_query,
"created_at": time.time(),
})
producer.flush()
print(f"Task {task_id} enqueued.")
# Consumer: 入力トピックを購読してLLMで処理
consumer = KafkaConsumer(
"agent.input",
bootstrap_servers=["kafka-broker:9092"],
group_id="agent-worker-group",
value_deserializer=lambda b: json.loads(b.decode("utf-8")),
auto_offset_reset="earliest",
enable_auto_commit=False,
)
client = OpenAI()
for message in consumer:
task = message.value
try:
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": task["query"]}],
)
result = response.choices[0].message.content
# 結果を出力トピックにpublish(別エージェントがsubscribeして連鎖可能)
producer.send("agent.output", value={
"task_id": task["task_id"],
"result": result,
"processed_at": time.time(),
})
consumer.commit() # 手動コミット(処理成功を確認してからオフセット更新)
except Exception as e:
# エラーはDLT(Dead Letter Topic)に退避
producer.send("agent.input.dlt", value={
"task_id": task["task_id"],
"error": str(e),
})
consumer.commit()
ポイントは手動コミットだ。LLM処理が成功した後に consumer.commit() を呼ぶことで、失敗時に同じメッセージを再処理できる。処理失敗時はDead Letter Topic(DLT)に退避して、メインフローを止めない。
【経路2】AWS SQS + Lambda でサーバーレス非同期エージェントを組む
SQSはAWSエコシステムとの親和性が高く、Lambda関数と組み合わせるだけでサーバーレスの非同期エージェントが動く。LambdaはSQSをポーリングして自動でスケールするため、インフラ管理が不要だ。
料金はスタンダードキューが100万リクエストあたり$0.40(月間最初の100万件は無料枠あり)。FIFOキューは100万リクエストあたり$0.50。詳細はAWS SQS料金ページを確認してほしい(料金は変更される場合がある)。
# AWS SQS + Lambda × AIエージェント実装例
# ① プロデューサー: SQSにタスクを投入(FastAPI等から呼ぶ想定)
import boto3, json, uuid
sqs = boto3.client("sqs", region_name="ap-northeast-1")
QUEUE_URL = "https://sqs.ap-northeast-1.amazonaws.com/123456789/agent-tasks"
def submit_agent_task(user_query: str) -> dict:
task_id = str(uuid.uuid4())
response = sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps({
"task_id": task_id,
"query": user_query,
}),
MessageGroupId="agent-group", # FIFOキューの場合のみ
)
return {"task_id": task_id, "message_id": response["MessageId"]}
# ② Lambda ハンドラー: SQSイベントを受け取ってLLM推論
from openai import OpenAI
client = OpenAI()
result_sqs = boto3.client("sqs", region_name="ap-northeast-1")
RESULT_QUEUE_URL = "https://sqs.ap-northeast-1.amazonaws.com/123456789/agent-results"
def lambda_handler(event, context):
for record in event["Records"]:
body = json.loads(record["body"])
task_id = body["task_id"]
query = body["query"]
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}],
max_tokens=1024,
)
result = response.choices[0].message.content
# 結果を別のSQSキューにpublish(連鎖エージェントが取り出す)
result_sqs.send_message(
QueueUrl=RESULT_QUEUE_URL,
MessageBody=json.dumps({"task_id": task_id, "result": result}),
)
except Exception as e:
# Lambda SQSトリガーは例外を投げるとDLQに自動退避(maxReceiveCount設定後)
raise e
return {"statusCode": 200, "body": "OK"}
Lambda側のSQSイベントソースマッピングでは visibility timeout を処理時間の3倍以上に設定する(LLM推論が30秒なら90秒以上)。また Dead Letter Queue(DLQ)を設定し maxReceiveCount を3〜5にしておくことで、失敗メッセージが無限ループしない。
本番デプロイ時の設定全体についてはAIエージェント本番デプロイ・サービング完全ガイドも参照してほしい。
【経路3】Google Cloud Pub/Sub でマルチエージェント連鎖を実装する
Google Cloud Pub/Subは完全マネージドで、Push配信(Cloud RunやCloud Functions を直接トリガー)とPull配信の両方に対応する。配信レイテンシは約100msで、exactly-once配信保証・デッドレターキュー・スキーマ検証をサポートする(Cloud Pub/Sub公式ドキュメント参照)。
Pub/SubはGoogle Cloudエコシステムとの相性が良く、Vertex AI・BigQuery・Cloud Runとの統合が容易だ。
# Google Cloud Pub/Sub × AIエージェント実装例
# pip install google-cloud-pubsub openai
from google.cloud import pubsub_v1
from openai import OpenAI
import json, time, base64
PROJECT_ID = "my-gcp-project"
INPUT_TOPIC = "agent-tasks-input"
OUTPUT_TOPIC = "agent-tasks-output"
SUBSCRIPTION_ID = "agent-worker-sub"
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
openai_client = OpenAI()
# ① プロデューサー: Pub/Subトピックにメッセージをpublish
def publish_task(query: str, task_id: str):
topic_path = publisher.topic_path(PROJECT_ID, INPUT_TOPIC)
message = json.dumps({"task_id": task_id, "query": query}).encode("utf-8")
future = publisher.publish(topic_path, data=message)
print(f"Published task {task_id}: message_id={future.result()}")
# ② コンシューマー: Pullサブスクリプションでメッセージを受信・処理
def process_messages():
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
def callback(message: pubsub_v1.subscriber.message.Message):
try:
body = json.loads(message.data.decode("utf-8"))
task_id = body["task_id"]
query = body["query"]
# LLM推論
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}],
)
result = response.choices[0].message.content
# 結果を出力トピックにpublish
out_topic = publisher.topic_path(PROJECT_ID, OUTPUT_TOPIC)
out_msg = json.dumps({"task_id": task_id, "result": result}).encode("utf-8")
publisher.publish(out_topic, data=out_msg)
message.ack() # 処理成功後にACK
except Exception as e:
print(f"Error processing task: {e}")
message.nack() # 失敗時はNACKしてデッドレターキューへ退避
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening on {subscription_path}...")
try:
streaming_pull_future.result(timeout=60)
except Exception as e:
streaming_pull_future.cancel()
if __name__ == "__main__":
process_messages()
「いつ同期・いつ非同期か」判断フロー
イベント駆動が万能というわけではない。コンテキストに応じた使い分けが重要だ。以下の3軸で判断する。
| 判断軸 | 同期(リクエスト-レスポンス)を選ぶ | 非同期(イベント駆動)を選ぶ |
|---|---|---|
| SLA(応答時間) | 3秒以内の即時応答が必要(チャットUI、検索補完) | 数秒〜数分の遅延を許容できる(バッチ処理、通知、分析) |
| 副作用 | 結果をリクエスト側が即座に使う(フォーム送信後の確認画面) | 複数システムへの伝播が必要(在庫更新→通知→ログ記録) |
| スループット | 同時リクエスト数が少なく安定(社内管理ツール) | 突発的に大量のタスクが発生する(ユーザーアップロード処理) |
実際には「フロントは同期、バックグラウンド処理は非同期」というハイブリッドが最もよく機能する。ユーザーリクエストを受け取ったら即座に task_id を返し(同期)、実際の処理はキューで非同期に進める。WebSocketやServer-Sent Eventsで処理完了を通知するパターンだ。
冪等性(同じメッセージを複数回処理しても結果が同じになる設計)についてはAIエージェントの冪等性・リトライ・タイムアウト設計ガイドを参照してほしい。
3経路の選び方まとめ
| 項目 | Apache Kafka | AWS SQS | Google Cloud Pub/Sub |
|---|---|---|---|
| 最新バージョン | 4.2(KRaft標準) | マネージド(バージョン不問) | マネージド(バージョン不問) |
| スループット | 数百万msg/秒(自己管理クラスター) | 無制限(SQS側でスケール) | 1トピックあたり最大10,000msg/秒(デフォルト) |
| 配信保証 | At-least-once / Exactly-once(設定次第) | At-least-once(FIFOはExactly-once) | At-least-once / Exactly-once(設定次第) |
| 料金モデル | セルフホスト:インフラ費用 / Confluent Cloud:従量課金 | 約$0.40/100万リクエスト(スタンダード) | 約$0.04/GB(メッセージングデータ量で課金) |
| 向いているケース | 大規模ストリーム処理、オンプレ/ハイブリッド構成 | AWSネイティブ構成、Lambda連携 | GCPネイティブ構成、Vertex AI連携 |
既存インフラがAWSならSQS+Lambda、GCPならPub/Sub+Cloud Runが最も低コストで始められる。クラウド横断・大規模ストリーム処理が要件ならKafkaを選択する。
本番運用時の注意点
イベント駆動パイプラインを本番に乗せる前に確認すべき3点を挙げる。
1. Dead Letter Queue(DLQ)の設置
処理失敗したメッセージが無限ループしないよう、全てのキュー・サブスクリプションにDLQを設定する。SQSなら maxReceiveCount を3〜5、Pub/Subなら dead_letter_policy の max_delivery_attempts を5前後に設定するのが標準だ。
2. メッセージの冪等性設計
At-least-once配信では同じメッセージが複数回届く可能性がある。task_id をDBに記録し、処理済みなら二重実行をスキップするロジックが必須だ。
3. 可観測性の確保
非同期パイプラインはデバッグが難しい。KafkaならConsumer Lag、SQSなら ApproximateNumberOfMessagesNotVisible、Pub/Subなら未配信メッセージ数をCloudMonitoringで監視する。コスト最適化の観点についてはAIエージェントのコスト最適化7原則も参考にしてほしい。
まとめ
イベント駆動AIエージェントのポイントを整理する。
- 同期型の限界:タイムアウト・スループット・強結合の3つが問題になるとき、非同期化を検討する
- Kafka:KRaft標準化(v4.0〜)で大規模ストリーム処理に強い。セルフホストが必要な場合に選択
- SQS+Lambda:AWSネイティブで最も簡単に非同期エージェントを組める。サーバーレスで運用コストが低い
- Pub/Sub:GCPエコシステムとの親和性が高く、Vertex AI連携に最適
- 判断基準:SLA・副作用・スループットの3軸で同期か非同期かを決める
この記事を読んで導入イメージが固まってきた方へ
UravationではAIエージェント導入の研修・コンサルを行っています。イベント駆動アーキテクチャを含む実装支援もお気軽にご相談ください。
