AIエージェント入門

ストリーミング応答 実装ガイド|SSE・トークン逐次表示【2026】

この記事の結論

LLM/AIエージェントのストリーミング応答を実装で使いこなす。SSE・トークン逐次表示の仕組み、Anthropic/OpenAIの最小実装、ツール進捗の可視化、200応答後のエラー処理とUXまで2026年版で解説。

LLM・AIエージェントの応答は、最終的な出力が同じでも「一気にドンと出る」か「文字が流れるように出る」かで、体感速度がまったく変わります。本記事は、ストリーミング応答(SSE・トークン逐次表示・ツール実行の進捗可視化)を、開発者が実装レベルで使いこなすためのガイドです。Anthropic Messages API / OpenAI のストリーミング仕様を2026年6月時点の公式ドキュメントで確認したうえで、最小実装からエージェント特有の難所、エラー処理、UXの工夫までを通しで解説します。

ストリーミング応答の流れを示す図解。モデルがトークンを生成し、SSEでチャンク(delta)を逐次送信、画面に逐次表示して完了する流れ。
ストリーミング応答の流れ(モデルがトークン生成→SSEでchunk逐次送信→画面に逐次表示→完了)

なぜストリーミングが体感速度に効くのか

ストリーミングが効くのは、ユーザーが「待たされている」と感じる時間の正体が、合計の応答時間ではなく「最初の何かが出るまでの時間」だからです。指標で言えば TTFT(Time To First Token、最初のトークンが届くまでの時間)が支配的に効きます。

非ストリーミング(一括レスポンス)の場合、サーバーはモデルの生成が完全に終わるまでレスポンスを返しません。500トークンの回答なら、生成が終わる数秒〜十数秒のあいだ、画面は「考え中」のスピナーのまま固まります。一方ストリーミングでは、モデルが最初の数トークンを出した瞬間にそれを画面へ流せます。総生成時間は変わらなくても、「最初の文字が出るまでの時間」は数十分の一に縮みます。

この差はエージェントでさらに広がります。エージェントは「思考 → ツール呼び出し → ツール実行 → 結果を読んで再思考 → 最終回答」と複数ステップを踏むため、一括レスポンスだと全ステップ完了まで画面は無言のまま。ユーザーは「止まっているのか、動いているのか」さえわかりません。ストリーミングで途中経過(どのツールを、どの引数で呼んでいるか)を流せば、待ち時間そのものは変わらなくても「ちゃんと動いている」という安心感を提供できます。これが離脱率と満足度を直接左右します。

仕組み:SSE・チャンク/delta・event型

LLM ストリーミングの土台は Server-Sent Events(SSE)です。SSE は HTTP の上で動く片方向(サーバー → クライアント)のストリーム配信プロトコルで、レスポンスの Content-Typetext/event-stream にし、接続を開いたまま小さなイベントを少しずつ送り続けます。WebSocket と違い双方向通信はできませんが、LLM の「サーバーが一方的にトークンを送る」用途には過不足なく、HTTP/プロキシとの相性も良好です。

SSE の1イベントは、行ベースのテキストで表現されます。代表的なフィールドは次のとおりです。

  • event: — イベントの種類名(例: content_block_delta)。省略時は message 扱い。
  • data: — ペイロード本体。LLM API では JSON 文字列が入ることが多い。
  • 空行 — 1つのイベントの区切り。空行が来たら「このイベントは終わり」を意味する。

LLM API は、この SSE の上に独自の「イベント型」を載せています。大きく2系統あります。

1) Anthropic Messages API(イベント名付き・構造化)。各 SSE イベントに名前が付き(event: message_start など)、data の JSON にも対応する type が入ります。テキスト・ツール入力・思考(thinking)などが「content block」という単位で index を持って流れてくるのが特徴です。

2) OpenAI(2系統)。Chat Completions API では、各チャンクが完全な message ではなく choices[].delta という差分を運び、最後に data: [DONE] という番兵(センチネル)で終了します。新しい Responses API では、response.created / response.output_text.delta / response.completed のような意味づけされた(semantic)イベント型で流れ、各イベントが type を持ちます。

つまり「チャンク/delta」は差分の運び方、「event型」は差分の種類の見分け方だと理解すると整理しやすくなります。

最小実装:Anthropic / OpenAI のストリーミング

まずは公式 SDK を使った最小構成から押さえます。SDK が SSE のパースとイベント accumulate(蓄積)を肩代わりしてくれるので、最短で「文字が流れる」状態を作れます。手順は次のとおりです。

  1. API キーを環境変数に置き、公式 SDK をインストールする(pip install anthropic または pip install openai)。
  2. リクエストで stream=true(SDK ではストリーミング用メソッド)を指定する。
  3. 返ってくるイベント/テキストをループで受け取り、届いた断片をそのまま出力先(標準出力やフロント)へ流す。
  4. テキスト断片だけが欲しいなら、SDK が提供する「テキスト専用イテレータ」を使うとイベント判定を省ける。

Anthropic Messages API(Python SDK)。テキスト断片だけを取り出す最短形です。

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=1024,
    messages=[{"role": "user", "content": "ストリーミングを一言で説明して"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

イベントを自分で見分けたい場合は、typedelta.type で分岐します。テキストは content_block_delta の中の text_delta に入っています。

with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            print(event.delta.text, end="", flush=True)

OpenAI(Python SDK・Chat Completions)。stream=True を渡すと、各チャンクの choices[0].delta.content にトークン断片が入ります。最後は None になるので分岐します。

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta is not None:
        print(delta, end="", flush=True)

どちらも SDK が SSE のパースを内部で処理するため、生の data: 行を自前で切り出す必要はありません。直接 HTTP を叩く場合のみ、SSE のフレーミング(空行区切り・data: プレフィックス除去・JSON パース)を自分で書くことになります。

エージェント特有の難所:途中経過・ツール実行・中断

単純なチャットなら「テキストを流すだけ」で済みますが、エージェントになると一気に難所が増えます。テキストの隙間にツール呼び出し・複数ステップ・思考が挟まるためです。

難所1:ツール引数は「部分 JSON」で流れてくる。Anthropic のツール使用ストリーミングでは、ツール入力の delta は input_json_delta という型で、完成した JSON ではなく断片文字列(partial_jsonとして届きます。たとえば {"location": "San Francisc … のように壊れた JSON 片が連続します。これをそのまま json.loads() に渡すと当然エラーになります。正しくは、断片を順に連結し、content_block_stop を受け取ってからまとめてパースします。

buffers = {}  # index ごとに partial_json を貯める

with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=1024,
    tools=tools,
    messages=messages,
) as stream:
    for event in stream:
        if event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            idx = event.index
            buffers[idx] = buffers.get(idx, "") + event.delta.partial_json
        elif event.type == "content_block_stop":
            raw = buffers.pop(event.index, None)
            if raw:
                import json
                tool_input = json.loads(raw)  # ここで初めて完全な JSON になる
                print("tool call:", tool_input)

なお Anthropic のドキュメントには、現行モデルは input の「キーと値を1組ずつ」まとめて出すため、ツール使用中はストリーミングイベントの間に遅延が生じることがあると明記されています(より細かい粒度で流す fine-grained tool streaming もオプションで用意されています)。UI 側は「ツールの引数を組み立て中」という状態を表現できると親切です。

難所2:複数ステップの途中経過。Anthropic では各出力ブロックが index を持ち、content_block_start でそのブロックの種別(text / tool_use / thinking / server_tool_use など)がわかります。エージェント UI では、この content_block_start をフックに「テキストを書いています」「get_weather を呼んでいます」と表示を切り替えると、ステップの進行が可視化できます。

難所3:中断/キャンセル。ユーザーが「ストップ」を押したとき、ストリームを安全に止める必要があります。実装上は、HTTP 接続(リクエスト)をクライアント側でクローズする=SSE の読み取りループを抜けるのが基本です。Python SDK の with ... as stream: ブロックを抜ければ接続は閉じられます。Web フロントなら AbortControllerfetch を中断します。中断時点までに受け取ったテキストは捨てずに残す(部分応答として表示しておく)と、ユーザー体験が崩れません。

フロントエンドへの流し方:SSE と WebSocket、バッファリング

サーバーで受けた LLM のストリームを、さらにブラウザへ中継する必要があります。選択肢は主に2つです。

  • SSE(推奨・素直):サーバーが text/event-stream でレスポンスを開き、LLM から届いた断片を即座に data: イベントとしてブラウザへ書き出す。ブラウザは EventSource で受ける(ただし EventSource は GET のみ・カスタムヘッダ不可なので、POST やトークン送付が必要なら fetch + ReadableStream で SSE を読む構成にする)。LLM のストリーミングは片方向なので、SSE がそのまま当てはまります。
  • WebSocket:双方向が必要な場合(例: 生成中にユーザー入力を割り込ませる、音声と同時にやり取りする)に選ぶ。ただし接続管理・再接続・スケールの負担が SSE より重い。「テキストを流すだけ」なら過剰になりがち。

ブラウザ側で fetch + ReadableStream から SSE を読む最小例は次のようになります。

const res = await fetch("/api/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message }),
  signal: abortController.signal, // 中断用
});

const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = "";

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buf += decoder.decode(value, { stream: true });

  // SSE はイベントごとに空行(\n\n)で区切られる
  const parts = buf.split("\n\n");
  buf = parts.pop(); // 末尾は未完成かもしれないので持ち越す
  for (const part of parts) {
    const line = part.split("\n").find((l) => l.startsWith("data:"));
    if (!line) continue;
    const data = line.slice(5).trim();
    if (data === "[DONE]") return;
    const json = JSON.parse(data);
    appendToUI(json.text); // 画面に追記
  }
}

ここで重要なのが バッファリング(buf の持ち越し)です。ネットワークのチャンク境界は SSE のイベント境界と一致しないため、data: の途中で read() が切れることがあります。「空行が来るまでイベントは未完成」という前提で、半端な断片を次の読み取りまで持ち越さないと、JSON パースが壊れます。これはストリーミング実装で最も踏みやすい落とし穴の一つです。

エラー処理:200後のストリーム中エラー・再接続・部分応答

ストリーミングのエラー処理が普通の API と決定的に違うのは、HTTP 200 を返した「後」にもエラーが起こりうる点です。接続が確立した時点でステータスコードは 200 で確定しているため、その後にモデル側で過負荷やネットワーク断が起きても、ステータスコードでは検知できません。

Anthropic Messages API は、これをストリーム内の error イベントで通知します。たとえば高負荷時には、非ストリーミングなら HTTP 529 に相当する overloaded_error が、ストリームの途中で次のように流れてきます。

event: error
data: {"type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"}}

したがって、受信ループ内で type == "error" を必ずハンドリングし、ステータスコードだけに頼らないことが鉄則です。実装時の指針は次のとおりです。

  1. ストリーム受信ループの中で、error 型イベントを専用に分岐して捕捉する。
  2. エラーを受けたら、それまでに受信済みのテキストを「部分応答」として保持する(捨てない)。
  3. ネットワーク断・タイムアウトの場合は、捕捉した部分応答をもとに継続リクエストを組み立てて再開(resume)する。
  4. 未知のイベント型は無視して落とさない(API のバージョニング方針上、新しいイベント型が追加されうるため、知らない型は graceful にスキップする)。

再接続・継続については、Anthropic のドキュメントが「capture-and-resume(部分応答を保存してから続きを要求する)」戦略を示しています。受信済みのテキストを起点に続きを要求することで、応答全体を最初から作り直さずに済みます。ただしツール使用ブロックや思考(thinking)ブロックは部分的には復元できず、再開できるのは直近のテキストブロックからである点に注意が必要です。冪等性やリトライの設計と合わせて、エージェント全体のリトライ戦略の中で扱うべきテーマです(関連: AIエージェントの堅牢化|リトライ・冪等性・タイムアウト設計ガイド)。

UXの工夫:タイピング表示・ツール実行の可視化・ストップボタン

仕組みが動いたら、最後は「速く感じさせる」UI 設計です。同じストリームでも、見せ方で体感は大きく変わります。

  • タイピング表示とカーソル:届いた文字を即追記し、末尾に点滅カーソル(▍)を置くと「生成中」が直感的に伝わります。逆に、断片を貯めて一括描画すると、せっかくのストリーミングが台無しになります。
  • 過度な再レンダリングを避ける:トークンごとに重い Markdown 全体パースを走らせると、長文で UI がカクつきます。一定間隔(例: 数十ミリ秒)でまとめて描画する、差分のみ追記する、といった軽量化が有効です。
  • ツール実行の可視化content_block_start でツールを検知したら、「🔧 get_weather を実行中…」のようなチップ(ステータス表示)を出す。引数が固まったら(content_block_stop 後に)どんな引数で呼んだかを折りたたみで見せると、エージェントの動きが「ブラックボックス」でなくなります。
  • ストップボタン:生成中は送信ボタンを停止ボタンに切り替え、押下で AbortController によりストリームを中断。中断時点までのテキストは残し、「中断しました」と明示する。これにより「暴走しても止められる」という安心感を与えられます。
  • 思考(thinking)の扱い:拡張思考をストリーミングすると thinking_delta が流れます。これを本文と混ぜず、折りたたみの「思考中…」領域に分離して表示すると、最終回答の可読性を保てます。

こうした UX の積み重ねは、エージェントが扱うコンテキストや状態の設計とも密接に関わります。長い対話・多段ステップを破綻なく見せるには、応答の流し方だけでなく文脈管理の設計も重要です(関連: コンテキストエンジニアリング|AIエージェントの文脈管理・圧縮ガイド)。

まとめ

ストリーミングは「速くする」技術ではなく、「速いと感じさせる」技術です。総生成時間は同じでも、最初の文字が出るまでの時間(TTFT)を縮め、途中経過を見せることで、待ち時間の体験を根本から変えます。

実装の勘所を最後に整理します。仕組みは SSE が土台で、Anthropic は message_startcontent_block_deltatext_delta / input_json_delta / thinking_delta)〜 message_stop の構造化イベント、OpenAI は choices[].delta + [DONE](Chat Completions)または semantic イベント(Responses API)。最小実装は SDK に任せ、エージェント化したらツール引数の部分 JSON 連結・ステップの可視化・中断対応を足す。そして200 応答後のストリーム中エラーを必ず捕捉し、部分応答を捨てずに再接続する——ここまで設計して初めて「堅牢で速く感じる」エージェント体験が完成します。

この記事を読んで導入イメージが固まってきた方へ

UravationではAIエージェント導入の研修・コンサルを行っています。ストリーミング UI を含む実装設計から、社内導入の伴走まで対応します。

参考・出典

関連記事(AIエージェント実装ガイド)

Need help moving from reading to rollout?

この記事を読んで導入イメージが固まってきた方へ

Uravationでは、AIエージェントの要件整理、PoC設計、社内導入、研修まで一気通貫で支援しています。

この記事をシェア

X Facebook LINE

※ 本記事の情報は2026年6月時点のものです。サービスの料金・仕様は変更される可能性があります。最新情報は各サービスの公式サイトをご確認ください。

関連記事