AIエージェント入門

イベント駆動AIエージェント設計ガイド2026|Kafka・SQS・Pub-Subで非同期パイプラインを組む

イベント駆動AIエージェント設計ガイド2026|Kafka・SQS・Pub-Subで非同期パイプラインを組む

この記事の結論

Kafka・AWS SQS・Google Cloud Pub/Subを使ったイベント駆動AIエージェントの非同期パイプライン設計を、Pythonコード付きで解説。同期vs非同期の判断フローも紹介します。

この記事でわかること

AIエージェントの多くは「リクエストを受け取り→処理→レスポンスを返す」同期モデルで動いている。しかし処理が重くなるほど、タイムアウト・スループット不足・サービス間結合の強さという3つの壁にぶつかる。

本記事では、イベント駆動アーキテクチャ(EDA)を使って非同期・疎結合のAIエージェントパイプラインを組む設計パターンを、Kafka・AWS SQS・Google Cloud Pub/Subの3経路でコード付きで解説する。最後に「いつ同期・いつ非同期か」の判断フローも示す。

  • イベント駆動AIエージェントの基本構造と同期との違い
  • Kafka Streams・AWS SQS+Lambda・Google Cloud Pub/Subの3経路実装例(Pythonコード付き)
  • SLA・副作用・スループットで判断する「同期vs非同期」フロー

なぜ同期型エージェントでは限界があるのか

LangGraphやオーケストレーター-ワーカーパターンで実装された同期型エージェントは、リクエスト-レスポンスのHTTPサイクルを前提とする。この構造が持つ根本的な問題は3点だ。

同期型の3つの限界

1. タイムアウトの壁
LLM推論は数秒〜数十秒かかる。複数のエージェントが直列に連鎖すると、合計待ち時間がHTTPタイムアウト(多くの場合30〜60秒)を超える。ウェブ画面が返答を待ち続け、ユーザーはエラー画面を見る。

2. スループットの上限
同期処理では1リクエストが詰まると後続が全て待たされる。トラフィックが集中した瞬間に処理キューがブロックし、全エージェントが停止に近い状態になる。

3. サービス間の強結合
呼び出し元と呼び出し先が直接つながる同期構造では、下流のエージェントがダウンすると上流も道連れになる。スケールアウトも下流に強依存する。

イベント駆動アーキテクチャはこれらの問題を構造で解消する。エージェントは「誰かが読んでいるか」を気にせずイベントをトピックに書き込み、別のエージェントが非同期で取り出して処理する。サービスは疎結合で独立スケール可能になる。

なお、マルチエージェントのオーケストレーター-ワーカーパターンでは同期前提の設計が多い。本記事の非同期パターンはその補完として位置づけるとよい。

イベント駆動AIエージェントの基本構造

イベント駆動パイプラインは3つのロールで構成される。

  • Producer(生産者):ユーザーリクエスト・センサー・外部APIなどがイベントをトピック/キューに書き込む
  • Broker(仲介者):Kafka・SQS・Pub/Subがイベントを永続保持・配送する
  • Consumer(消費者):AIエージェントがイベントを取り出して処理し、結果を別のトピックに書き込む

「タスクキューにジョブを積んで非同期処理→結果をトピックにpublish→別エージェントがsubscribeして連鎖」というパターンが基本形だ。以下のPython疑似コードでイメージをつかんでほしい。

# 基本構造のイメージ(疑似コード)
# Producer: タスクをキューに積む
def submit_task(task_payload: dict) -> str:
    task_id = generate_uuid()
    broker.publish(topic="agent.tasks.input", message={
        "task_id": task_id,
        "payload": task_payload,
        "timestamp": utcnow()
    })
    return task_id  # 即時レスポンス(処理完了は待たない)

# Consumer: 非同期でタスクを処理
def process_task(message: dict):
    result = ai_agent.run(message["payload"])
    broker.publish(topic="agent.tasks.output", message={
        "task_id": message["task_id"],
        "result": result
    })

【経路1】Apache Kafka Streams でエージェントパイプラインを組む

Apache Kafka 4.0でZooKeeperが完全廃止となり、KRaftモードが標準になった(2026年2月リリースの4.2.0でさらに安定)。KRaftは1クラスターで最大150万パーティションをサポートし、大規模なイベントパイプラインに耐える。

AIエージェントとの組み合わせでは、入力トピック→エージェント処理→出力トピックの連鎖が基本パターンだ。Kafka 4.0ではQueues for Kafka(KIP-932)が導入され、従来のpub-subに加えてキュー型メッセージングも使えるようになっている(Kafka公式ドキュメント参照)。

# Kafka × AIエージェント実装例(Python / kafka-python 2.x)
from kafka import KafkaProducer, KafkaConsumer
from openai import OpenAI
import json, time

# Producer: タスクを入力トピックに書き込む
producer = KafkaProducer(
    bootstrap_servers=["kafka-broker:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def enqueue_analysis_task(user_query: str, task_id: str):
    producer.send("agent.input", value={
        "task_id": task_id,
        "query": user_query,
        "created_at": time.time(),
    })
    producer.flush()
    print(f"Task {task_id} enqueued.")

# Consumer: 入力トピックを購読してLLMで処理
consumer = KafkaConsumer(
    "agent.input",
    bootstrap_servers=["kafka-broker:9092"],
    group_id="agent-worker-group",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    auto_offset_reset="earliest",
    enable_auto_commit=False,
)

client = OpenAI()

for message in consumer:
    task = message.value
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": task["query"]}],
        )
        result = response.choices[0].message.content

        # 結果を出力トピックにpublish(別エージェントがsubscribeして連鎖可能)
        producer.send("agent.output", value={
            "task_id": task["task_id"],
            "result": result,
            "processed_at": time.time(),
        })
        consumer.commit()  # 手動コミット(処理成功を確認してからオフセット更新)

    except Exception as e:
        # エラーはDLT(Dead Letter Topic)に退避
        producer.send("agent.input.dlt", value={
            "task_id": task["task_id"],
            "error": str(e),
        })
        consumer.commit()

ポイントは手動コミットだ。LLM処理が成功した後に consumer.commit() を呼ぶことで、失敗時に同じメッセージを再処理できる。処理失敗時はDead Letter Topic(DLT)に退避して、メインフローを止めない。

【経路2】AWS SQS + Lambda でサーバーレス非同期エージェントを組む

SQSはAWSエコシステムとの親和性が高く、Lambda関数と組み合わせるだけでサーバーレスの非同期エージェントが動く。LambdaはSQSをポーリングして自動でスケールするため、インフラ管理が不要だ。

料金はスタンダードキューが100万リクエストあたり$0.40(月間最初の100万件は無料枠あり)。FIFOキューは100万リクエストあたり$0.50。詳細はAWS SQS料金ページを確認してほしい(料金は変更される場合がある)。

# AWS SQS + Lambda × AIエージェント実装例

# ① プロデューサー: SQSにタスクを投入(FastAPI等から呼ぶ想定)
import boto3, json, uuid

sqs = boto3.client("sqs", region_name="ap-northeast-1")
QUEUE_URL = "https://sqs.ap-northeast-1.amazonaws.com/123456789/agent-tasks"

def submit_agent_task(user_query: str) -> dict:
    task_id = str(uuid.uuid4())
    response = sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({
            "task_id": task_id,
            "query": user_query,
        }),
        MessageGroupId="agent-group",  # FIFOキューの場合のみ
    )
    return {"task_id": task_id, "message_id": response["MessageId"]}

# ② Lambda ハンドラー: SQSイベントを受け取ってLLM推論
from openai import OpenAI

client = OpenAI()
result_sqs = boto3.client("sqs", region_name="ap-northeast-1")
RESULT_QUEUE_URL = "https://sqs.ap-northeast-1.amazonaws.com/123456789/agent-results"

def lambda_handler(event, context):
    for record in event["Records"]:
        body = json.loads(record["body"])
        task_id = body["task_id"]
        query = body["query"]

        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": query}],
                max_tokens=1024,
            )
            result = response.choices[0].message.content

            # 結果を別のSQSキューにpublish(連鎖エージェントが取り出す)
            result_sqs.send_message(
                QueueUrl=RESULT_QUEUE_URL,
                MessageBody=json.dumps({"task_id": task_id, "result": result}),
            )
        except Exception as e:
            # Lambda SQSトリガーは例外を投げるとDLQに自動退避(maxReceiveCount設定後)
            raise e

    return {"statusCode": 200, "body": "OK"}

Lambda側のSQSイベントソースマッピングでは visibility timeout を処理時間の3倍以上に設定する(LLM推論が30秒なら90秒以上)。また Dead Letter Queue(DLQ)を設定し maxReceiveCount を3〜5にしておくことで、失敗メッセージが無限ループしない。

本番デプロイ時の設定全体についてはAIエージェント本番デプロイ・サービング完全ガイドも参照してほしい。

【経路3】Google Cloud Pub/Sub でマルチエージェント連鎖を実装する

Google Cloud Pub/Subは完全マネージドで、Push配信(Cloud RunやCloud Functions を直接トリガー)とPull配信の両方に対応する。配信レイテンシは約100msで、exactly-once配信保証・デッドレターキュー・スキーマ検証をサポートする(Cloud Pub/Sub公式ドキュメント参照)。

Pub/SubはGoogle Cloudエコシステムとの相性が良く、Vertex AI・BigQuery・Cloud Runとの統合が容易だ。

# Google Cloud Pub/Sub × AIエージェント実装例
# pip install google-cloud-pubsub openai

from google.cloud import pubsub_v1
from openai import OpenAI
import json, time, base64

PROJECT_ID = "my-gcp-project"
INPUT_TOPIC = "agent-tasks-input"
OUTPUT_TOPIC = "agent-tasks-output"
SUBSCRIPTION_ID = "agent-worker-sub"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
openai_client = OpenAI()

# ① プロデューサー: Pub/Subトピックにメッセージをpublish
def publish_task(query: str, task_id: str):
    topic_path = publisher.topic_path(PROJECT_ID, INPUT_TOPIC)
    message = json.dumps({"task_id": task_id, "query": query}).encode("utf-8")
    future = publisher.publish(topic_path, data=message)
    print(f"Published task {task_id}: message_id={future.result()}")

# ② コンシューマー: Pullサブスクリプションでメッセージを受信・処理
def process_messages():
    subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

    def callback(message: pubsub_v1.subscriber.message.Message):
        try:
            body = json.loads(message.data.decode("utf-8"))
            task_id = body["task_id"]
            query = body["query"]

            # LLM推論
            response = openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": query}],
            )
            result = response.choices[0].message.content

            # 結果を出力トピックにpublish
            out_topic = publisher.topic_path(PROJECT_ID, OUTPUT_TOPIC)
            out_msg = json.dumps({"task_id": task_id, "result": result}).encode("utf-8")
            publisher.publish(out_topic, data=out_msg)

            message.ack()  # 処理成功後にACK

        except Exception as e:
            print(f"Error processing task: {e}")
            message.nack()  # 失敗時はNACKしてデッドレターキューへ退避

    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f"Listening on {subscription_path}...")
    try:
        streaming_pull_future.result(timeout=60)
    except Exception as e:
        streaming_pull_future.cancel()

if __name__ == "__main__":
    process_messages()

「いつ同期・いつ非同期か」判断フロー

イベント駆動が万能というわけではない。コンテキストに応じた使い分けが重要だ。以下の3軸で判断する。

判断軸 同期(リクエスト-レスポンス)を選ぶ 非同期(イベント駆動)を選ぶ
SLA(応答時間) 3秒以内の即時応答が必要(チャットUI、検索補完) 数秒〜数分の遅延を許容できる(バッチ処理、通知、分析)
副作用 結果をリクエスト側が即座に使う(フォーム送信後の確認画面) 複数システムへの伝播が必要(在庫更新→通知→ログ記録)
スループット 同時リクエスト数が少なく安定(社内管理ツール) 突発的に大量のタスクが発生する(ユーザーアップロード処理)

実際には「フロントは同期、バックグラウンド処理は非同期」というハイブリッドが最もよく機能する。ユーザーリクエストを受け取ったら即座に task_id を返し(同期)、実際の処理はキューで非同期に進める。WebSocketやServer-Sent Eventsで処理完了を通知するパターンだ。

冪等性(同じメッセージを複数回処理しても結果が同じになる設計)についてはAIエージェントの冪等性・リトライ・タイムアウト設計ガイドを参照してほしい。

3経路の選び方まとめ

項目 Apache Kafka AWS SQS Google Cloud Pub/Sub
最新バージョン 4.2(KRaft標準) マネージド(バージョン不問) マネージド(バージョン不問)
スループット 数百万msg/秒(自己管理クラスター) 無制限(SQS側でスケール) 1トピックあたり最大10,000msg/秒(デフォルト)
配信保証 At-least-once / Exactly-once(設定次第) At-least-once(FIFOはExactly-once) At-least-once / Exactly-once(設定次第)
料金モデル セルフホスト:インフラ費用 / Confluent Cloud:従量課金 約$0.40/100万リクエスト(スタンダード) 約$0.04/GB(メッセージングデータ量で課金)
向いているケース 大規模ストリーム処理、オンプレ/ハイブリッド構成 AWSネイティブ構成、Lambda連携 GCPネイティブ構成、Vertex AI連携

既存インフラがAWSならSQS+Lambda、GCPならPub/Sub+Cloud Runが最も低コストで始められる。クラウド横断・大規模ストリーム処理が要件ならKafkaを選択する。

本番運用時の注意点

イベント駆動パイプラインを本番に乗せる前に確認すべき3点を挙げる。

1. Dead Letter Queue(DLQ)の設置
処理失敗したメッセージが無限ループしないよう、全てのキュー・サブスクリプションにDLQを設定する。SQSなら maxReceiveCount を3〜5、Pub/Subなら dead_letter_policymax_delivery_attempts を5前後に設定するのが標準だ。

2. メッセージの冪等性設計
At-least-once配信では同じメッセージが複数回届く可能性がある。task_id をDBに記録し、処理済みなら二重実行をスキップするロジックが必須だ。

3. 可観測性の確保
非同期パイプラインはデバッグが難しい。KafkaならConsumer Lag、SQSなら ApproximateNumberOfMessagesNotVisible、Pub/Subなら未配信メッセージ数をCloudMonitoringで監視する。コスト最適化の観点についてはAIエージェントのコスト最適化7原則も参考にしてほしい。

まとめ

イベント駆動AIエージェントのポイントを整理する。

  • 同期型の限界:タイムアウト・スループット・強結合の3つが問題になるとき、非同期化を検討する
  • Kafka:KRaft標準化(v4.0〜)で大規模ストリーム処理に強い。セルフホストが必要な場合に選択
  • SQS+Lambda:AWSネイティブで最も簡単に非同期エージェントを組める。サーバーレスで運用コストが低い
  • Pub/Sub:GCPエコシステムとの親和性が高く、Vertex AI連携に最適
  • 判断基準:SLA・副作用・スループットの3軸で同期か非同期かを決める

この記事を読んで導入イメージが固まってきた方へ

UravationではAIエージェント導入の研修・コンサルを行っています。イベント駆動アーキテクチャを含む実装支援もお気軽にご相談ください。

Need help moving from reading to rollout?

この記事を読んで導入イメージが固まってきた方へ

Uravationでは、AIエージェントの要件整理、PoC設計、社内導入、研修まで一気通貫で支援しています。

この記事をシェア

X Facebook LINE

※ 本記事の情報は2026年6月時点のものです。サービスの料金・仕様は変更される可能性があります。最新情報は各サービスの公式サイトをご確認ください。

関連記事