LLM → TTS streaming pipeline

The core latency pattern for voice agents: pipe LLM tokens into TTS as they arrive instead of waiting for the full response. This saves 200-400 ms of time-to-first-audio and makes conversations feel natural.

Why it matters

                      Full-text pipeline                     Token-streaming pipeline
Time to first audio   LLM ~500 ms + TTS ~300 ms = ~800 ms    First sentence ~150 ms + TTS ~200 ms = ~350 ms
Memory requirement    Full response buffered                 Constant, chunk by chunk
Complexity            Simple                                 Requires sentence buffering
Best for              Short, known text                      Voice agents, conversational AI

The pattern

Sentence-buffering strategy

Flush to TTS at sentence boundaries, not word boundaries. Word-by-word flushes kill prosody because the TTS model needs full sentences to produce natural intonation.

  • Accumulate incoming tokens into a string buffer.
  • Flush when a sentence-ending punctuation mark appears: . ! ? ;
  • Apply a minimum buffer length (e.g. 15 characters) to avoid flushing fragments like "Yes."
  • After the LLM stream ends, always flush the remaining buffer, because LLMs often end mid-sentence.
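
The four rules above can be sketched as a small helper that is independent of any LLM or TTS provider. This is a minimal illustration, not part of any SDK: the class name SentenceBuffer and its feed/flush methods are hypothetical, and the 15-character threshold is simply the example value from the list.

```python
from typing import Optional

SENTENCE_ENDINGS = (".", "!", "?", ";")


class SentenceBuffer:
    """Accumulates streamed tokens and releases text at sentence boundaries."""

    def __init__(self, min_chars: int = 15):
        self.min_chars = min_chars
        self._buffer = ""

    def feed(self, token: str) -> Optional[str]:
        """Add a token; return a chunk to synthesize, or None to keep buffering."""
        self._buffer += token
        text = self._buffer.strip()
        # Inspect the end of the buffer rather than the token itself:
        # tokens often carry whitespace or several characters (" that.").
        if text.endswith(SENTENCE_ENDINGS) and len(text) >= self.min_chars:
            self._buffer = ""
            return text
        return None

    def flush(self) -> Optional[str]:
        """Return whatever is left once the LLM stream has ended."""
        text = self._buffer.strip()
        self._buffer = ""
        return text or None
```

Call feed() for every token and send any non-None result to TTS; call flush() once after the stream ends. A short reply such as "Yes." stays in the buffer until more text arrives or flush() releases it. Note that this simple check also fires on a trailing period in an abbreviation or decimal, so a production version may want a stricter boundary test.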

LLM → Shunyalabs pipeline

Pick your LLM provider. The pattern is the same for both: stream tokens out of the LLM, buffer them until a sentence boundary, then flush each complete sentence into Shunya TTS.

python
from openai import AsyncOpenAI
from shunyalabs import AsyncShunyaClient
from shunyalabs.tts import TTSConfig

SENTENCE_ENDINGS = (".", "!", "?", ";")

# NOTE: `speaker` is your audio output sink (anything with a write(bytes)
# method); it is not defined in this snippet.

async def gpt_to_tts(user_message: str):
    oai    = AsyncOpenAI()
    shunya = AsyncShunyaClient()
    config = TTSConfig(model="zero-indic", voice="Sunita", response_format="pcm")

    buffer = ""
    stream = await oai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:      # some chunks carry no choices (e.g. usage-only)
            continue
        buffer += chunk.choices[0].delta.content or ""
        # Check the end of the buffer, not the token: tokens rarely equal "."
        # exactly (they look like " world." or ".\n").
        if buffer.rstrip().endswith(SENTENCE_ENDINGS) and len(buffer) > 15:
            async for audio in await shunya.tts.stream(buffer, config=config):
                speaker.write(audio)
            buffer = ""

    # Flush remaining buffer after LLM stream ends
    if buffer.strip():
        async for audio in await shunya.tts.stream(buffer, config=config):
            speaker.write(audio)

python
import anthropic
from shunyalabs import AsyncShunyaClient
from shunyalabs.tts import TTSConfig

SENTENCE_ENDINGS = (".", "!", "?", ";")

# NOTE: `speaker` is your audio output sink (anything with a write(bytes)
# method); it is not defined in this snippet.

async def claude_to_tts(user_message: str):
    claude = anthropic.AsyncAnthropic()
    shunya = AsyncShunyaClient()
    config = TTSConfig(model="zero-indic", voice="Rajesh", response_format="pcm")

    buffer = ""
    async with claude.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        async for text in stream.text_stream:
            buffer += text
            # Check the end of the buffer, not the delta: text deltas rarely
            # equal "." exactly (they look like " world." or ".\n").
            if buffer.rstrip().endswith(SENTENCE_ENDINGS) and len(buffer) > 15:
                async for audio in await shunya.tts.stream(buffer, config=config):
                    speaker.write(audio)
                buffer = ""

    # Flush remaining buffer after LLM stream ends
    if buffer.strip():
        async for audio in await shunya.tts.stream(buffer, config=config):
            speaker.write(audio)

Interruption handling

In voice agents, users often talk over the bot. When your ASR detects speech_start from the user, cancel the in-flight TTS stream so the user gets the floor immediately.

python
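import asyncio
from shunyalabs.tts import TTSConfig

# NOTE: `speaker` is your audio output sink; it is not defined in this snippet.

class VoiceAgent:
    def __init__(self):
        self.tts_task    = None
        self.interrupted = asyncio.Event()

    async def on_user_speech_detected(self):
        """Called by ASR when user starts speaking."""
        self.interrupted.set()
        if self.tts_task and not self.tts_task.done():
            self.tts_task.cancel()

    async def speak(self, text: str, client):
        self.interrupted.clear()
        config = TTSConfig(model="zero-indic", voice="Varun", response_format="pcm")

        async def _stream():
            async for audio in await client.tts.stream(text, config=config):
                if self.interrupted.is_set():
                    break
                speaker.write(audio)

        self.tts_task = asyncio.create_task(_stream())
        try:
            await self.tts_task
        except asyncio.CancelledError:
            # Swallow only the cancellation triggered by an interruption;
            # if speak() itself was cancelled from outside, propagate it.
            if not self.interrupted.is_set():
                raise
        finally:
            self.tts_task = None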

Best practices

Minimising time to first audio

  • Pre-open the WebSocket before the LLM call. This eliminates ~200 ms of connection setup.
  • Use PCM output format, which needs no client-side decoding.
  • Shorter sentences = faster response. Consider instructing your LLM to be concise in conversational contexts.
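
The first bullet amounts to overlapping the TTS connection handshake with the LLM request instead of running them back to back. A minimal sketch of that overlap follows. Both open_tts and start_llm are hypothetical callables that you would supply (for example, thin wrappers around your TTS connection setup and your LLM streaming call); they are not Shunyalabs SDK functions.

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple


async def start_turn(
    open_tts: Callable[[], Awaitable[Any]],
    start_llm: Callable[[], Awaitable[Any]],
) -> Tuple[Any, Any]:
    """Open the TTS connection while the LLM request is in flight."""
    # gather() runs both awaitables concurrently, so the total wait is
    # roughly the slower of the two rather than their sum.
    tts_conn, llm_stream = await asyncio.gather(open_tts(), start_llm())
    return tts_conn, llm_stream
```

If the connection can be kept open across turns, opening it once at session start avoids the handshake cost entirely on later turns.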

Handling LLM response edge cases

  • LLM ends mid-sentence. Always flush the remaining buffer after the LLM stream ends.
  • Markdown tokens in voice output. Strip **, ##, *, etc. before sending to TTS.
  • Empty strings. Check buffer.strip() before synthesizing to avoid empty TTS calls.
  • Function calls. Don't send tool-call tokens to TTS. Filter to content-only tokens.
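
The Markdown and empty-string checks above can be combined in one cleanup step that runs just before each TTS call. The sketch below uses only the standard library; the function name clean_for_tts is hypothetical, and the patterns cover common markers (headings, bullets, links, asterisks) rather than the full Markdown grammar.

```python
import re


def clean_for_tts(text: str) -> str:
    """Strip common Markdown markers so they are not read aloud."""
    # Heading markers at the start of a line: "## Title" -> "Title"
    text = re.sub(r"^\s*#{1,6}\s+", "", text, flags=re.MULTILINE)
    # Bullet markers at the start of a line: "* item" or "- item" -> "item"
    text = re.sub(r"^\s*[*\-+]\s+", "", text, flags=re.MULTILINE)
    # Links: keep the label, drop the URL
    text = re.sub(r"\[([^\]]+)\]\([^)]*\)", r"\1", text)
    # Remaining emphasis asterisks
    text = text.replace("*", "")
    # Collapse runs of whitespace, including newlines
    return re.sub(r"\s+", " ", text).strip()
```

An empty return value means there is nothing to speak, so the caller can skip the TTS request, for example with `if cleaned := clean_for_tts(buffer): ...`.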

Graceful degradation

  • If TTS fails mid-stream, fall back to a pre-recorded "sorry, let me try again" clip.
  • If LLM is slow, send a pre-recorded filler like "Let me check that for you..." so the user hears something within 500 ms.
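
Both fallbacks can be sketched with plain asyncio, independent of any provider. All names here are hypothetical: tokens_with_filler wraps any async token stream and plays a filler if the first token misses the deadline, and speak_or_fallback plays a pre-recorded clip if the TTS stream raises. The play_filler and write_audio callables stand in for your own audio output.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional


async def _next_or_none(iterator: AsyncIterator[str]) -> Optional[str]:
    """Return the next token, or None once the stream is exhausted."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return None


async def tokens_with_filler(
    tokens: AsyncIterator[str],
    play_filler: Callable[[], Awaitable[None]],
    deadline_s: float = 0.5,
) -> AsyncIterator[str]:
    """Yield LLM tokens, playing a filler clip if the first one is late."""
    iterator = tokens.__aiter__()
    first = asyncio.ensure_future(_next_or_none(iterator))
    # asyncio.wait leaves the task running on timeout (wait_for would
    # cancel it), so the LLM keeps generating while the filler plays.
    done, _pending = await asyncio.wait({first}, timeout=deadline_s)
    if not done:
        await play_filler()
    token = await first
    if token is None:
        return
    yield token
    async for token in iterator:
        yield token


async def speak_or_fallback(
    audio_chunks: AsyncIterator[bytes],
    write_audio: Callable[[bytes], None],
    fallback_clip: bytes,
) -> bool:
    """Play a TTS stream; on failure, play the pre-recorded clip instead."""
    try:
        async for chunk in audio_chunks:
            write_audio(chunk)
        return True
    except Exception:
        # asyncio.CancelledError is a BaseException (Python 3.8+), so a
        # user interruption is not mistaken for a TTS failure here.
        write_audio(fallback_clip)
        return False
```

The 0.5 s default mirrors the 500 ms target in the bullet above. The filler is awaited before the first token is yielded, so the real answer never overlaps the filler audio.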

Full voice agent skeleton

python
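import asyncio, json, websockets
from shunyalabs import AsyncShunyaClient
from shunyalabs.tts import TTSConfig

# NOTE: API_KEY (your Shunyalabs key) and `speaker` (your audio output sink)
# are not defined in this skeleton; supply them from your application.
SENTENCE_ENDINGS = (".", "!", "?", ";")

class VoiceAgent:
    def __init__(self, llm):
        self.llm    = llm
        self.shunya = AsyncShunyaClient()
        self.tts_cfg = TTSConfig(model="zero-indic", voice="Sunita", response_format="pcm")
        self.speaking = False
        self.interrupt = asyncio.Event()
        self.reply_task = None

    async def listen_loop(self):
        async with websockets.connect("wss://asr.shunyalabs.ai/ws") as asr:
            await asr.send(json.dumps({
                "api_key": API_KEY, "model": "zero-indic", "language": "hi",
                "sample_rate": 16000, "dtype": "int16",
            }))
            async for raw in asr:
                msg = json.loads(raw)
                if msg["type"] == "speech_start" and self.speaking:
                    self.interrupt.set()        # user cut in
                elif msg["type"] == "final_segment":
                    # Stop any reply still in progress before starting a new one.
                    if self.reply_task and not self.reply_task.done():
                        self.interrupt.set()
                        await asyncio.gather(self.reply_task, return_exceptions=True)
                    # Respond in a background task so this loop keeps reading ASR
                    # events; awaiting respond() here would block speech_start.
                    self.reply_task = asyncio.create_task(self.respond(msg["text"]))

    async def respond(self, user_text):
        self.speaking = True
        self.interrupt.clear()
        buffer = ""
        try:
            async for token in self.llm.stream(user_text):
                if self.interrupt.is_set():
                    break
                buffer += token
                # Check the end of the buffer; tokens rarely equal "." exactly.
                if buffer.rstrip().endswith(SENTENCE_ENDINGS) and len(buffer) > 15:
                    await self._say(buffer)
                    buffer = ""
            # Flush the remainder unless the user has taken the floor.
            if buffer.strip() and not self.interrupt.is_set():
                await self._say(buffer)
        finally:
            self.speaking = False

    async def _say(self, text):
        """Stream one sentence to the speaker, stopping on interruption."""
        async for chunk in await self.shunya.tts.stream(text, config=self.tts_cfg):
            if self.interrupt.is_set():
                break
            speaker.write(chunk)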