Skip to content

Integrate AI (x/ai)

This guide shows how to add AI completion endpoints to a Plumego service using x/ai. You will wire a provider, create a session, call the model, and stream the response back to the client.

For the module boundary rationale, see the x/ai Primer.

The release matrix still treats the x/ai family as an experimental extension family. Inside that family, x/ai/provider, x/ai/session, x/ai/streaming, and x/ai/tool are stable tier and follow semantic versioning. Other subpackages (orchestration, semanticcache, marketplace) are experimental and may change without a deprecation period. Check Release Posture, Extension Maturity, and each module.yaml before depending on a subpackage in a production service path.

  • Initializing a provider (Claude or OpenAI)
  • Sending a single completion
  • Managing conversation sessions
  • Streaming a completion via Server-Sent Events
import (
"github.com/spcent/plumego/x/ai/provider"
)
// Claude
p := provider.NewClaudeProvider(os.Getenv("ANTHROPIC_API_KEY"))
// OpenAI
p := provider.NewOpenAIProvider(os.Getenv("OPENAI_API_KEY"))

Inject the provider into your application’s dependency struct so every handler receives it explicitly:

type App struct {
Core *core.App
AI provider.Provider
}
func New(cfg Config) (*App, error) {
p := provider.NewClaudeProvider(cfg.AnthropicKey)
// ...
return &App{Core: app, AI: p}, nil
}
func (h AIHandler) Complete(w http.ResponseWriter, r *http.Request) {
var req struct {
Model string `json:"model"`
Message string `json:"message"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
contract.WriteError(w, r, contract.NewErrorBuilder().
Type(contract.TypeValidation).
Code(contract.CodeInvalidJSON).
Message("invalid request body").
Build())
return
}
resp, err := h.provider.Complete(r.Context(), &provider.CompletionRequest{
Model: req.Model,
System: "You are a helpful assistant.",
Messages: []provider.Message{
{Role: provider.RoleUser, Content: []provider.ContentBlock{
{Type: provider.ContentTypeText, Text: req.Message},
}},
},
MaxTokens: 1024,
})
if err != nil {
contract.WriteError(w, r, contract.NewErrorBuilder().
Type(contract.TypeUnavailable).Message("AI provider error").Build())
return
}
text := ""
for _, block := range resp.Content {
if block.Type == provider.ContentTypeText {
text += block.Text
}
}
contract.WriteResponse(w, r, http.StatusOK, map[string]string{"reply": text}, nil)
}

Sessions persist conversation history so multi-turn exchanges work across requests.

import (
"github.com/spcent/plumego/x/ai/session"
"github.com/spcent/plumego/x/ai/provider"
)
// Build the session manager once (e.g. in app.New)
storage := session.NewMemoryStorage()
mgr := session.NewManager(storage, session.WithConfig(session.DefaultConfig()))
// Create a session for a new conversation
sess, err := mgr.Create(ctx, session.CreateOptions{
Model: "claude-sonnet-4-6",
System: "You are a helpful assistant.",
})
// Append a user turn
mgr.AppendMessage(ctx, sess.ID, provider.Message{
Role: provider.RoleUser,
Content: []provider.ContentBlock{
{Type: provider.ContentTypeText, Text: userMessage},
},
})
// Read the conversation window (respects token budget)
messages, _ := mgr.GetActiveContext(ctx, sess.ID, 4096)
// Call the provider with the full context
resp, _ := h.provider.Complete(ctx, &provider.CompletionRequest{
Model: sess.Model,
System: sess.SystemPrompt,
Messages: messages,
})
// Append the assistant reply
for _, block := range resp.Content {
if block.Type == provider.ContentTypeText {
mgr.AppendMessage(ctx, sess.ID, provider.Message{
Role: provider.RoleAssistant,
Content: []provider.ContentBlock{block},
})
}
}

Pass sess.ID back to the client in the response so subsequent requests can resume the same conversation.

func (h AIHandler) Stream(w http.ResponseWriter, r *http.Request) {
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.WriteHeader(http.StatusOK)
flusher, ok := w.(http.Flusher)
if !ok {
return
}
reader, err := h.provider.CompleteStream(r.Context(), &provider.CompletionRequest{
Model: "claude-sonnet-4-6",
Messages: []provider.Message{
{Role: provider.RoleUser, Content: []provider.ContentBlock{
{Type: provider.ContentTypeText, Text: r.URL.Query().Get("q")},
}},
},
MaxTokens: 2048,
Stream: true,
})
if err != nil {
fmt.Fprintf(w, "data: {\"error\":\"stream failed\"}\n\n")
flusher.Flush()
return
}
for {
chunk, err := reader.Parse()
if err == io.EOF {
fmt.Fprintf(w, "data: [DONE]\n\n")
flusher.Flush()
return
}
if err != nil {
return
}
if chunk == nil {
continue
}
for _, block := range chunk.Content {
if block.Text != "" {
fmt.Fprintf(w, "data: %s\n\n", block.Text)
flusher.Flush()
}
}
}
}
aiH := AIHandler{provider: a.AI, sessions: a.SessionMgr}
app.Post("/api/ai/complete", http.HandlerFunc(aiH.Complete))
app.Post("/api/ai/chat", http.HandlerFunc(aiH.Chat))
app.Get("/api/ai/stream", http.HandlerFunc(aiH.Stream))