Streaming AI Responses via gRPC + SSE: Python FastAPI → Go Backend → Browser
When building an AI-powered feature, you often end up with a Python service running the model (because the ML ecosystem lives in Python) and a Go backend serving your API. The challenge: how do you stream AI-generated tokens all the way from Python to the browser in real time?
This post walks through the full stack: gRPC server-side streaming from Python FastAPI to Go, then Server-Sent Events (SSE) from Go to the browser — including heartbeats so long-running AI jobs don’t drop the connection.
Architecture Overview
```text
Browser ──(SSE/HTTP)──► Go Backend ──(gRPC stream)──► Python FastAPI (AI)
```
Three layers, two protocols:
- gRPC between Go and Python — HTTP/2, strongly typed, efficient binary framing
- SSE between Go and the browser — simple HTTP/1.1 text stream, natively supported in browsers via `EventSource`
The Python service runs the AI model and yields tokens one by one as they’re generated. Go receives them over a gRPC stream and forwards each token to the browser as an SSE event.
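Concretely, the browser-facing leg is plain newline-delimited text. With the handler from Step 4, a short stream (two tokens, one heartbeat during an idle gap, then completion) looks like this on the wire (the token values are illustrative):

```text
data: Hello

data: world

: ping

event: done
data: [DONE]
```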
Step 1: Define the Proto
The key detail here is `returns (stream GenerateResponse)` — this is a server-side streaming RPC, not the unary RPC you see in most gRPC tutorials. Without streaming, you'd have to wait for the entire AI response before sending anything back.
```protobuf
// proto/ai.proto
syntax = "proto3";

package ai;

option go_package = "yourmodule/proto/ai";

message GenerateRequest {
  string prompt = 1;
  string session_id = 2;
}

message GenerateResponse {
  string token = 1;
  bool done = 2;
}

service AIService {
  // Server-side streaming RPC — Python yields, Go reads
  rpc Generate(GenerateRequest) returns (stream GenerateResponse);
}
```
Generate stubs for both sides:
```bash
# Go
protoc \
  --go_out=. --go_opt=paths=source_relative \
  --go-grpc_out=. --go-grpc_opt=paths=source_relative \
  proto/ai.proto

# Python
python -m grpc_tools.protoc \
  -I ./proto \
  --python_out=. \
  --grpc_python_out=. \
  proto/ai.proto
```
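This assumes the code generators are already on your PATH; if not, the usual installs are:

```bash
# Go plugins (protoc-gen-go, protoc-gen-go-grpc)
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Python: grpcio-tools bundles protoc and the gRPC plugin
pip install grpcio grpcio-tools
```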
Step 2: Python FastAPI — gRPC Streaming Server
The Python service acts as the gRPC server. It runs the AI model and yields tokens as they're produced; the important part is yielding each token immediately from inside the streaming loop (the `async for` over the SDK's stream) rather than buffering the entire response. Note that the server itself is plain `grpc.aio`, so it can run standalone or alongside your FastAPI app in the same process.
```python
# server.py
import asyncio

import grpc
from grpc import aio

import ai_pb2
import ai_pb2_grpc

# Example with Anthropic SDK — swap for OpenAI, Ollama, etc.
# from anthropic import AsyncAnthropic
# client = AsyncAnthropic()


class AIServicer(ai_pb2_grpc.AIServiceServicer):
    async def Generate(self, request, context):
        prompt = request.prompt
        print(f"[gRPC] Received prompt: {prompt}")

        # --- Replace with your actual LLM streaming call ---
        # async with client.messages.stream(
        #     model="claude-opus-4-5",
        #     max_tokens=1024,
        #     messages=[{"role": "user", "content": prompt}],
        # ) as stream:
        #     async for text in stream.text_stream:
        #         yield ai_pb2.GenerateResponse(token=text, done=False)

        # Simulated token stream for demo
        words = f"Streaming response to: {prompt}".split()
        for word in words:
            await asyncio.sleep(0.1)
            yield ai_pb2.GenerateResponse(token=word + " ", done=False)

        # Signal completion
        yield ai_pb2.GenerateResponse(token="", done=True)


async def serve():
    options = [
        # Match these with the Go client keepalive settings (see Step 3)
        ("grpc.keepalive_time_ms", 20000),
        ("grpc.keepalive_timeout_ms", 10000),
        ("grpc.keepalive_permit_without_calls", 0),
        ("grpc.http2.min_recv_ping_interval_without_data_ms", 15000),
    ]
    server = aio.server(options=options)
    ai_pb2_grpc.add_AIServiceServicer_to_server(AIServicer(), server)

    listen_addr = "[::]:50051"
    server.add_insecure_port(listen_addr)
    print(f"[gRPC] Python server started on {listen_addr}")
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())
```
Note: No heartbeat is needed on the Python side. The Go client simply blocks on `stream.Recv()` while Python is processing; the gRPC layer (HTTP/2) handles the connection at the transport level.
Step 3: Go — gRPC Client Wrapper
Create the gRPC client once at startup and reuse it. `grpc.ClientConn` is safe for concurrent use, so a single connection handles all in-flight requests.
```go
// internal/aiclient/client.go
package aiclient

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"

	pb "yourmodule/proto/ai"
)

type Client struct {
	conn   *grpc.ClientConn
	client pb.AIServiceClient
}

func New(addr string) (*Client, error) {
	// Keepalive: important when Go and Python sit behind a proxy or
	// Kubernetes ingress — idle TCP connections can be silently dropped
	// while the AI model is thinking.
	kaParams := keepalive.ClientParameters{
		Time:                20 * time.Second, // send ping after 20s of inactivity
		Timeout:             10 * time.Second, // consider dead if no pong in 10s
		PermitWithoutStream: false,            // only ping during active streams
	}

	conn, err := grpc.Dial(addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(kaParams),
	)
	if err != nil {
		return nil, err
	}

	return &Client{
		conn:   conn,
		client: pb.NewAIServiceClient(conn),
	}, nil
}

func (c *Client) Close() {
	c.conn.Close()
}

func (c *Client) StreamGenerate(ctx context.Context, prompt, sessionID string) (pb.AIService_GenerateClient, error) {
	return c.client.Generate(ctx, &pb.GenerateRequest{
		Prompt:    prompt,
		SessionId: sessionID,
	})
}
```
Step 4: Go — SSE Handler with Heartbeat
This is the core of the whole setup. The handler:
- Opens the SSE connection to the browser
- Starts a goroutine to read from the gRPC stream
- Forwards tokens as SSE `data:` events
- Sends a heartbeat comment (`: ping`) every 15 seconds when no tokens arrive, keeping the browser connection alive during long AI processing
```go
// internal/handler/sse.go
package handler

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"time"

	"yourmodule/internal/aiclient"
)

const heartbeatInterval = 15 * time.Second

type SSEHandler struct {
	ai *aiclient.Client
}

func NewSSEHandler(ai *aiclient.Client) *SSEHandler {
	return &SSEHandler{ai: ai}
}

type streamEvent struct {
	token string
	done  bool
	err   error
}

func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Required SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	prompt := r.URL.Query().Get("prompt")
	sessionID := r.URL.Query().Get("session_id")
	if prompt == "" {
		http.Error(w, "prompt is required", http.StatusBadRequest)
		return
	}

	// Hard timeout — adjust to your worst-case AI processing time
	ctx, cancel := context.WithTimeout(r.Context(), 3*time.Minute)
	defer cancel()

	stream, err := h.ai.StreamGenerate(ctx, prompt, sessionID)
	if err != nil {
		log.Printf("failed to start gRPC stream: %v", err)
		fmt.Fprintf(w, "event: error\ndata: failed to connect to AI service\n\n")
		flusher.Flush()
		return
	}

	// Buffered channel so the gRPC reader doesn't block immediately
	// if the SSE writer is briefly busy flushing
	eventCh := make(chan streamEvent, 8)

	// Goroutine: read from gRPC stream → push to channel. Every send is
	// guarded by ctx.Done() so the goroutine can't leak if the handler
	// returns while the channel buffer is full.
	go func() {
		defer close(eventCh)
		send := func(evt streamEvent) bool {
			select {
			case eventCh <- evt:
				return true
			case <-ctx.Done():
				return false
			}
		}
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				send(streamEvent{done: true})
				return
			}
			if err != nil {
				send(streamEvent{err: err})
				return
			}
			if resp.Done {
				send(streamEvent{done: true})
				return
			}
			if !send(streamEvent{token: resp.Token}) {
				return
			}
		}
	}()

	// Ticker resets on real activity — heartbeats only fire during idle gaps
	heartbeat := time.NewTicker(heartbeatInterval)
	defer heartbeat.Stop()

	for {
		select {
		case evt, ok := <-eventCh:
			if !ok {
				return
			}
			if evt.err != nil {
				log.Printf("stream error: %v", evt.err)
				fmt.Fprintf(w, "event: error\ndata: stream interrupted\n\n")
				flusher.Flush()
				return
			}
			if evt.done {
				fmt.Fprintf(w, "event: done\ndata: [DONE]\n\n")
				flusher.Flush()
				return
			}
			// Forward token to browser. SSE data must not contain raw
			// newlines, so multi-line tokens become multiple data: lines;
			// EventSource rejoins them with "\n" on the client.
			for _, line := range strings.Split(evt.token, "\n") {
				fmt.Fprintf(w, "data: %s\n", line)
			}
			fmt.Fprint(w, "\n")
			flusher.Flush()
			// Reset heartbeat — no ping needed while tokens are flowing
			heartbeat.Reset(heartbeatInterval)

		case <-heartbeat.C:
			// SSE comment line — browsers ignore it, proxies/nginx see activity
			fmt.Fprintf(w, ": ping\n\n")
			flusher.Flush()
			log.Printf("[SSE] heartbeat sent for session=%s", sessionID)

		case <-ctx.Done():
			fmt.Fprintf(w, "event: error\ndata: request timeout or cancelled\n\n")
			flusher.Flush()
			return
		}
	}
}
```
Step 5: Wire It Up in main.go
```go
// main.go
package main

import (
	"log"
	"net/http"

	"yourmodule/internal/aiclient"
	"yourmodule/internal/handler"
)

func main() {
	// Single connection, created once, reused across all requests
	ai, err := aiclient.New("localhost:50051")
	if err != nil {
		log.Fatalf("failed to connect to AI gRPC service: %v", err)
	}
	defer ai.Close()

	sseHandler := handler.NewSSEHandler(ai)

	mux := http.NewServeMux()
	mux.Handle("/api/ai/stream", sseHandler)

	log.Println("Go HTTP server listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", mux))
}
```
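With both processes up, you can watch the raw SSE stream without a browser; `curl -N` disables curl's buffering so tokens appear as they arrive:

```bash
# Terminal 1: Python gRPC server
python server.py

# Terminal 2: Go backend
go run main.go

# Terminal 3: stream tokens as they are generated
curl -N "http://localhost:8080/api/ai/stream?prompt=hello&session_id=test1"
```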
Step 6: Frontend — Consuming SSE
```javascript
const prompt = "Explain gRPC streaming in simple terms";
const evtSource = new EventSource(
  `/api/ai/stream?prompt=${encodeURIComponent(prompt)}&session_id=abc123`
);

const output = document.getElementById("output");

// Default message event — receives tokens
evtSource.onmessage = (e) => {
  output.innerText += e.data;
};

// Named "done" event — AI finished
evtSource.addEventListener("done", () => {
  evtSource.close();
  console.log("Stream complete");
});

// Named "error" event — something went wrong. Note this listener also
// fires for native EventSource connection errors, not just the server's
// custom "event: error" messages; closing here prevents the automatic
// reconnect EventSource would otherwise attempt.
evtSource.addEventListener("error", (e) => {
  console.error("SSE error", e);
  evtSource.close();
});
```
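If you ever want those automatic reconnects to resume instead of restart, the SSE spec has a hook for it: frames tagged with an `id:` field are echoed back by the browser in a `Last-Event-ID` header when it reconnects. A hypothetical sketch of the server side (this helper is not part of the handler above):

```go
// Hypothetical helper: emit one SSE frame with a sequence id so a
// reconnecting EventSource can report where it left off (Last-Event-ID).
package handler

import (
	"fmt"
	"io"
)

func writeToken(w io.Writer, seq int, token string) {
	fmt.Fprintf(w, "id: %d\ndata: %s\n\n", seq, token)
}
```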
Tip: The heartbeat (`: ping\n\n`) is a comment line in the SSE spec. Browsers silently ignore it — you don't need any client-side handler for it. It exists purely to prevent proxies and load balancers from closing what they perceive as an idle connection.
Heartbeat: Where and Why
A common question: do you need heartbeats on the Python gRPC server side too? Short answer: no — but it depends on your infrastructure.
| Layer | Protocol | Needs heartbeat? | Why |
|---|---|---|---|
| Browser ↔ Go | SSE / HTTP/1.1 | ✅ Yes | Proxies drop idle HTTP connections |
| Go ↔ Python | gRPC / HTTP/2 | ⚠️ Only behind proxies | HTTP/2 handles keepalive at transport level |
| Python (internal) | — | ❌ No | Just keep yielding tokens |
If Go and Python run in the same cluster with no intermediate proxy, the gRPC connection is fine with no extra config. If there's a load balancer (AWS ALB, nginx, Kubernetes ingress) between them, add the `keepalive.ClientParameters` on the Go side (Step 3) and matching `grpc.keepalive_*` options on the Python server (Step 2) — otherwise the LB may silently reset the TCP connection mid-stream.
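One more proxy gotcha on the browser-facing leg: nginx buffers upstream responses by default, which turns a token stream into a single flush at the end. If nginx fronts the Go service, disable buffering for the SSE route, either with `proxy_buffering off;` in the nginx config or by having the handler request it per response:

```go
// In ServeHTTP, alongside the other SSE headers: asks nginx not to
// buffer this response (servers that don't recognize it ignore it).
w.Header().Set("X-Accel-Buffering", "no")
```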
Key Takeaways
- Use server-side streaming in your proto. `returns (stream GenerateResponse)` is what makes this work. Unary RPCs wait for the full response before returning, which is useless for LLM token streaming.
- One gRPC connection, many streams. Create `aiclient.Client` once at startup. HTTP/2 multiplexes many concurrent streams over a single TCP connection; don't dial a new connection per request.
- Context cancellation is automatic. When the browser closes the SSE tab, `r.Context()` is cancelled, which propagates to the gRPC stream: `stream.Recv()` returns an error and the goroutine exits cleanly. No manual cleanup needed.
- Heartbeat resets on activity. The `heartbeat.Reset(heartbeatInterval)` call means the ping only fires during true idle gaps (model thinking), not while tokens are actively streaming.
- Hard timeout as a safety net. `context.WithTimeout(3 * time.Minute)` ensures a stuck Python worker can't hold a goroutine and browser connection indefinitely.