LLM → TTS streaming pipeline
The core latency pattern for voice agents: pipe LLM tokens into TTS as they arrive instead of waiting for the full response. This saves roughly 200-400 ms of time-to-first-audio and makes conversations feel more natural.
Why it matters
| Aspect | Full-text pipeline | Token-streaming pipeline |
|---|---|---|
| Time to first audio | LLM ~500 ms + TTS ~300 ms = ~800 ms | First sentence ~150 ms + TTS ~200 ms = ~350 ms |
| Memory requirement | Full response buffered | Constant, chunk by chunk |
| Complexity | Simple | Requires sentence buffering |
| Best for | Short, known text | Voice agents, conversational AI |
The pattern
Sentence-buffering strategy
Flush to TTS at sentence boundaries, not word boundaries. Word-by-word flushes kill prosody: the model needs full sentences to produce natural intonation.
- Accumulate incoming tokens into a string buffer.
- Flush when a sentence-ending punctuation mark appears: `.` `!` `?` `;`
- Apply a minimum buffer length (e.g. 15 characters) to avoid flushing fragments like "Yes."
- After the LLM stream ends, always flush the remaining buffer, because LLMs often end mid-sentence.
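The buffering rules above can be sketched as a small helper that is independent of any LLM or TTS SDK. The class name `SentenceBuffer`, its `feed` and `flush` methods, and the 15-character default are illustrative assumptions, not part of the Shunyalabs SDK.

```python
from typing import Optional

SENTENCE_ENDINGS = (".", "!", "?", ";")


class SentenceBuffer:
    """Accumulates streamed text and releases it at sentence boundaries.

    Hypothetical helper for illustration; not part of any SDK.
    """

    def __init__(self, min_chars: int = 15) -> None:
        self.min_chars = min_chars
        self._buffer = ""

    def feed(self, token: str) -> Optional[str]:
        """Add a token; return the buffered text if a sentence is ready."""
        self._buffer += token
        text = self._buffer.strip()
        # Flush only when the buffer ends a sentence AND is long enough,
        # so short fragments like "Yes." ride along with the next sentence.
        if text.endswith(SENTENCE_ENDINGS) and len(text) >= self.min_chars:
            self._buffer = ""
            return text
        return None

    def flush(self) -> Optional[str]:
        """Return whatever is left once the LLM stream has ended."""
        text, self._buffer = self._buffer.strip(), ""
        return text or None
```

Call `feed()` for every token and send any non-`None` result to TTS, then call `flush()` once after the stream ends. Checking the end of the whole buffer, rather than the latest token, matters because streamed tokens often bundle punctuation with other characters.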
LLM → Shunyalabs pipeline
Pick your LLM provider. The pattern is the same for both: stream tokens out of the LLM, buffer them until a sentence boundary, then flush each complete sentence into Shunya TTS.
```python
# OpenAI variant
from openai import AsyncOpenAI
from shunyalabs import AsyncShunyaClient
from shunyalabs.tts import TTSConfig

SENTENCE_END = (".", "!", "?", ";")


async def gpt_to_tts(user_message: str):
    # `speaker` is a placeholder for your audio output sink; it is not defined here.
    oai = AsyncOpenAI()
    shunya = AsyncShunyaClient()
    config = TTSConfig(model="zero-indic", voice="Sunita", response_format="pcm")
    buffer = ""

    stream = await oai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue  # some chunks carry no choices (e.g. usage-only chunks)
        buffer += chunk.choices[0].delta.content or ""
        # Test the buffer, not the token: tokens often bundle punctuation with
        # other characters (" day.", "?\n"), so an exact match rarely fires.
        if buffer.rstrip().endswith(SENTENCE_END) and len(buffer) > 15:
            async for audio in await shunya.tts.stream(buffer, config=config):
                speaker.write(audio)
            buffer = ""

    # Flush remaining buffer after LLM stream ends
    if buffer.strip():
        async for audio in await shunya.tts.stream(buffer, config=config):
            speaker.write(audio)
```

```python
# Anthropic variant
import anthropic
from shunyalabs import AsyncShunyaClient
from shunyalabs.tts import TTSConfig

SENTENCE_END = (".", "!", "?", ";")


async def claude_to_tts(user_message: str):
    # `speaker` is a placeholder for your audio output sink; it is not defined here.
    claude = anthropic.AsyncAnthropic()
    shunya = AsyncShunyaClient()
    config = TTSConfig(model="zero-indic", voice="Rajesh", response_format="pcm")
    buffer = ""

    async with claude.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        async for text in stream.text_stream:
            buffer += text
            # Text deltas can span several tokens, so test the end of the buffer.
            if buffer.rstrip().endswith(SENTENCE_END) and len(buffer) > 15:
                async for audio in await shunya.tts.stream(buffer, config=config):
                    speaker.write(audio)
                buffer = ""

    # Flush remaining buffer after LLM stream ends
    if buffer.strip():
        async for audio in await shunya.tts.stream(buffer, config=config):
            speaker.write(audio)
```
Interruption handling
In voice agents, users often talk over the bot. When your ASR detects speech_start from the user, cancel the in-flight TTS stream so the user gets the floor immediately.
```python
import asyncio
from shunyalabs.tts import TTSConfig


class VoiceAgent:
    def __init__(self):
        self.tts_task = None
        self.interrupted = asyncio.Event()

    async def on_user_speech_detected(self):
        """Called by ASR when user starts speaking."""
        self.interrupted.set()
        if self.tts_task and not self.tts_task.done():
            self.tts_task.cancel()

    async def speak(self, text: str, client):
        self.interrupted.clear()
        config = TTSConfig(model="zero-indic", voice="Varun", response_format="pcm")

        async def _stream():
            # `speaker` is a placeholder for your audio output sink.
            async for audio in await client.tts.stream(text, config=config):
                if self.interrupted.is_set():
                    break
                speaker.write(audio)

        self.tts_task = asyncio.create_task(_stream())
        try:
            await self.tts_task
        except asyncio.CancelledError:
            # Swallow only cancellations caused by a user interruption;
            # re-raise if speak() itself was cancelled from outside.
            if not self.interrupted.is_set():
                raise
        finally:
            self.tts_task = None
```
Best practices
Minimising time to first audio
- Pre-open the WebSocket before the LLM call. This eliminates ~200 ms of connection setup.
- Use PCM output format, which needs no client-side decoding.
- Shorter sentences mean a faster response. Consider instructing your LLM to be concise in conversational contexts.
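To check whether these optimisations help in your setup, it can be useful to measure time to first audio directly. The following is a minimal sketch: `timed_first_chunk` is a hypothetical helper that wraps any async iterator of audio chunks and reports how long the first chunk took to arrive. It assumes nothing about the TTS SDK beyond the stream being an async iterator.

```python
import time
from typing import AsyncIterator, Callable, TypeVar

T = TypeVar("T")


async def timed_first_chunk(
    chunks: AsyncIterator[T],
    on_first: Callable[[float], None],
) -> AsyncIterator[T]:
    """Pass chunks through unchanged, reporting the latency of the first one.

    Hypothetical helper: `on_first` receives the seconds elapsed between the
    start of iteration and the arrival of the first chunk.
    """
    start = time.perf_counter()
    first_seen = False
    async for chunk in chunks:
        if not first_seen:
            first_seen = True
            on_first(time.perf_counter() - start)
        yield chunk
```

Wrapping the TTS stream this way only captures the TTS portion. To include LLM latency, start your own timer when the user's turn ends and stop it inside `on_first`.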
Handling LLM response edge cases
- LLM ends mid-sentence. Always flush the remaining buffer after the LLM stream ends.
- Markdown tokens in voice output. Strip `**`, `##`, `*`, etc. before sending to TTS.
- Empty strings. Check `buffer.strip()` before synthesizing to avoid empty calls.
- Function calls. Don't send tool-call tokens to TTS. Filter to content-only tokens.
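The Markdown and empty-string checks above could be combined into one cleaning step applied to each sentence before synthesis. This is a rough sketch: `clean_for_tts` is a hypothetical name, and the patterns are deliberately conservative and will not cover every Markdown construct (links and tables, for example, are left alone).

```python
import re

_HEADING = re.compile(r"^[ \t]{0,3}#{1,6}[ \t]+", re.MULTILINE)  # "## Title" -> "Title"
_BULLET = re.compile(r"^[ \t]*[-*+][ \t]+", re.MULTILINE)        # "- item" -> "item"
_INLINE = re.compile(r"[*`]+")                                   # **bold**, *italic*, `code`


def clean_for_tts(text: str) -> str:
    """Remove common Markdown markers so they are not read aloud."""
    text = _HEADING.sub("", text)
    text = _BULLET.sub("", text)
    text = _INLINE.sub("", text)
    # Collapse runs of spaces left behind by the removals.
    return re.sub(r"[ \t]+", " ", text).strip()
```

An empty return value means there is nothing to speak, so the caller can skip the TTS call. Note that this removes every asterisk, including ones used as a multiplication sign, which is usually acceptable for spoken output.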
Graceful degradation
- If TTS fails mid-stream, fall back to a pre-recorded "sorry, let me try again" clip.
- If the LLM is slow, send a pre-recorded filler like "Let me check that for you..." so the user hears something within 500 ms.
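The slow-LLM case can be sketched with plain asyncio, without assuming any SDK. In this example `with_filler`, `play_filler`, and `deadline_s` are hypothetical names: the wrapper passes tokens through unchanged and calls `play_filler` once if the first token has not arrived by the deadline.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

_END = object()  # sentinel: the token stream finished without yielding


async def _next_or_end(iterator: AsyncIterator[str]) -> object:
    """Fetch the next token, or return _END if the stream is exhausted."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _END


async def with_filler(
    tokens: AsyncIterator[str],
    play_filler: Callable[[], Awaitable[None]],
    deadline_s: float = 0.5,
) -> AsyncIterator[str]:
    """Yield tokens unchanged; play a filler clip if the first one is late."""
    iterator = tokens.__aiter__()
    # Run the first fetch as a task: asyncio.wait() with a timeout leaves the
    # task running, whereas asyncio.wait_for() would cancel the LLM request.
    first = asyncio.create_task(_next_or_end(iterator))
    done, _ = await asyncio.wait({first}, timeout=deadline_s)
    if not done:
        await play_filler()  # the user hears something while the LLM catches up
    token = await first
    if token is _END:
        return
    yield token
    async for token in iterator:
        yield token
```

The wrapper can sit between the LLM stream and the sentence buffer, so the rest of the pipeline stays unchanged. Because `play_filler` is awaited, keep the clip short or have it return as soon as playback has been queued.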
Full voice agent skeleton
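```python
import asyncio
import json

import websockets
from shunyalabs import AsyncShunyaClient
from shunyalabs.tts import TTSConfig

SENTENCE_END = (".", "!", "?", ";")

# API_KEY (your Shunyalabs key) and `speaker` (your audio output sink) are
# placeholders that your application must define. Streaming microphone audio
# to the ASR socket is omitted from this skeleton.


class VoiceAgent:
    def __init__(self, llm):
        self.llm = llm
        self.shunya = AsyncShunyaClient()
        self.tts_cfg = TTSConfig(model="zero-indic", voice="Sunita", response_format="pcm")
        self.speaking = False
        self.interrupt = asyncio.Event()
        self.respond_task = None

    async def listen_loop(self):
        async with websockets.connect("wss://asr.shunyalabs.ai/ws") as asr:
            await asr.send(json.dumps({
                "api_key": API_KEY, "model": "zero-indic", "language": "hi",
                "sample_rate": 16000, "dtype": "int16",
            }))
            async for raw in asr:
                msg = json.loads(raw)
                if msg.get("type") == "speech_start" and self.speaking:
                    self.interrupt.set()  # user cut in
                elif msg.get("type") == "final_segment":
                    # Respond in a background task: awaiting respond() here would
                    # block this loop, so speech_start could never interrupt it.
                    if self.respond_task and not self.respond_task.done():
                        self.interrupt.set()
                        await self.respond_task
                    self.respond_task = asyncio.create_task(self.respond(msg["text"]))

    async def respond(self, user_text):
        self.speaking = True
        self.interrupt.clear()
        buffer = ""
        try:
            async for token in self.llm.stream(user_text):
                if self.interrupt.is_set():
                    break
                buffer += token
                # Test the buffer end: tokens often bundle punctuation with text.
                if buffer.rstrip().endswith(SENTENCE_END) and len(buffer) > 15:
                    async for chunk in await self.shunya.tts.stream(buffer, config=self.tts_cfg):
                        if self.interrupt.is_set():
                            break
                        speaker.write(chunk)
                    buffer = ""
            # Flush the remainder unless the user interrupted
            if buffer.strip() and not self.interrupt.is_set():
                async for chunk in await self.shunya.tts.stream(buffer, config=self.tts_cfg):
                    if self.interrupt.is_set():
                        break
                    speaker.write(chunk)
        finally:
            self.speaking = False
```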