Integrate AI (x/ai)
Integrate AI
Section titled “Integrate AI”This guide shows how to add AI completion endpoints to a Plumego service using x/ai. You will wire a provider, create a session, call the model, and stream the response back to the client.
For the module boundary rationale, see the x/ai Primer.
Stability note
Section titled “Stability note”The release matrix still treats the x/ai family as an experimental extension family. Inside that family, x/ai/provider, x/ai/session, x/ai/streaming, and x/ai/tool are stable tier and follow semantic versioning. Other subpackages (orchestration, semanticcache, marketplace) are experimental and may change without a deprecation period. Check Release Posture, Extension Maturity, and each module.yaml before depending on a subpackage in a production service path.
What this guide covers
Section titled “What this guide covers”- Initializing a provider (Claude or OpenAI)
- Sending a single completion
- Managing conversation sessions
- Streaming a completion via Server-Sent Events
Step 1 — Initialize a provider
Section titled “Step 1 — Initialize a provider”import ( "github.com/spcent/plumego/x/ai/provider")
// Claudep := provider.NewClaudeProvider(os.Getenv("ANTHROPIC_API_KEY"))
// OpenAIp := provider.NewOpenAIProvider(os.Getenv("OPENAI_API_KEY"))Inject the provider into your application’s dependency struct so every handler receives it explicitly:
type App struct { Core *core.App AI provider.Provider}
func New(cfg Config) (*App, error) { p := provider.NewClaudeProvider(cfg.AnthropicKey) // ... return &App{Core: app, AI: p}, nil}Step 2 — Send a completion
Section titled “Step 2 — Send a completion”func (h AIHandler) Complete(w http.ResponseWriter, r *http.Request) { var req struct { Model string `json:"model"` Message string `json:"message"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { contract.WriteError(w, r, contract.NewErrorBuilder(). Type(contract.TypeValidation). Code(contract.CodeInvalidJSON). Message("invalid request body"). Build()) return }
resp, err := h.provider.Complete(r.Context(), &provider.CompletionRequest{ Model: req.Model, System: "You are a helpful assistant.", Messages: []provider.Message{ {Role: provider.RoleUser, Content: []provider.ContentBlock{ {Type: provider.ContentTypeText, Text: req.Message}, }}, }, MaxTokens: 1024, }) if err != nil { contract.WriteError(w, r, contract.NewErrorBuilder(). Type(contract.TypeUnavailable).Message("AI provider error").Build()) return }
text := "" for _, block := range resp.Content { if block.Type == provider.ContentTypeText { text += block.Text } } contract.WriteResponse(w, r, http.StatusOK, map[string]string{"reply": text}, nil)}Step 3 — Manage conversation sessions
Section titled “Step 3 — Manage conversation sessions”Sessions persist conversation history so multi-turn exchanges work across requests.
import ( "github.com/spcent/plumego/x/ai/session" "github.com/spcent/plumego/x/ai/provider")
// Build the session manager once (e.g. in app.New)storage := session.NewMemoryStorage()mgr := session.NewManager(storage, session.WithConfig(session.DefaultConfig()))
// Create a session for a new conversationsess, err := mgr.Create(ctx, session.CreateOptions{ Model: "claude-sonnet-4-6", System: "You are a helpful assistant.",})
// Append a user turnmgr.AppendMessage(ctx, sess.ID, provider.Message{ Role: provider.RoleUser, Content: []provider.ContentBlock{ {Type: provider.ContentTypeText, Text: userMessage}, },})
// Read the conversation window (respects token budget)messages, _ := mgr.GetActiveContext(ctx, sess.ID, 4096)
// Call the provider with the full contextresp, _ := h.provider.Complete(ctx, &provider.CompletionRequest{ Model: sess.Model, System: sess.SystemPrompt, Messages: messages,})
// Append the assistant replyfor _, block := range resp.Content { if block.Type == provider.ContentTypeText { mgr.AppendMessage(ctx, sess.ID, provider.Message{ Role: provider.RoleAssistant, Content: []provider.ContentBlock{block}, }) }}Pass sess.ID back to the client in the response so subsequent requests can resume the same conversation.
Step 4 — Stream completions via SSE
Section titled “Step 4 — Stream completions via SSE”func (h AIHandler) Stream(w http.ResponseWriter, r *http.Request) { // Set SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.WriteHeader(http.StatusOK)
flusher, ok := w.(http.Flusher) if !ok { return }
reader, err := h.provider.CompleteStream(r.Context(), &provider.CompletionRequest{ Model: "claude-sonnet-4-6", Messages: []provider.Message{ {Role: provider.RoleUser, Content: []provider.ContentBlock{ {Type: provider.ContentTypeText, Text: r.URL.Query().Get("q")}, }}, }, MaxTokens: 2048, Stream: true, }) if err != nil { fmt.Fprintf(w, "data: {\"error\":\"stream failed\"}\n\n") flusher.Flush() return }
for { chunk, err := reader.Parse() if err == io.EOF { fmt.Fprintf(w, "data: [DONE]\n\n") flusher.Flush() return } if err != nil { return } if chunk == nil { continue } for _, block := range chunk.Content { if block.Text != "" { fmt.Fprintf(w, "data: %s\n\n", block.Text) flusher.Flush() } } }}Wire the routes
Section titled “Wire the routes”aiH := AIHandler{provider: a.AI, sessions: a.SessionMgr}
app.Post("/api/ai/complete", http.HandlerFunc(aiH.Complete))app.Post("/api/ai/chat", http.HandlerFunc(aiH.Chat))app.Get("/api/ai/stream", http.HandlerFunc(aiH.Stream))