Real-Time with WebSockets (x/websocket)
Real-Time with WebSockets
Section titled “Real-Time with WebSockets”This guide shows how to add WebSocket support to a Plumego service using x/websocket. You will upgrade HTTP connections, join clients to named rooms, and broadcast messages to connected clients.
x/websocket is experimental. See Release Posture and Extension Maturity before depending on it in production.
Boundary rationale is in the x/websocket Primer.
Step 1 — Create the Server
Section titled “Step 1 — Create the Server”x/websocket.New returns a *Server that owns a Hub, an auth handler, and its own route registration:
import ( "os"
"github.com/spcent/plumego/x/websocket")
cfg := websocket.DefaultWebSocketConfig()cfg.WSRoutePath = "/ws" // WebSocket upgrade endpointcfg.Secret = []byte(os.Getenv("WS_SECRET"))cfg.MaxRoomRegistrations = 1000cfg.MaxRoomConnections = 100// Admin broadcast is disabled by default. If you enable it, use a dedicated// secret or BroadcastAuthorizer instead of reusing cfg.Secret.cfg.BroadcastEnabled = truecfg.BroadcastSecret = []byte(os.Getenv("WS_BROADCAST_SECRET"))
ws, err := websocket.New(cfg)if err != nil { return err}DefaultWebSocketConfig() sets safe defaults for worker count, queue sizes, and timeouts. It does not read environment variables; pass secrets from your application wiring.
Step 2 — Register routes
Section titled “Step 2 — Register routes”Mount the WebSocket endpoint on your application router:
import "github.com/spcent/plumego/router"
r := router.NewRouter()if err := ws.RegisterRoutes(r); err != nil { log.Fatal(err)}RegisterRoutes mounts the upgrade handler at cfg.WSRoutePath. Clients connect to that path using the standard WebSocket handshake.
Step 3 — Broadcast to a room
Section titled “Step 3 — Broadcast to a room”Retrieve the Hub and send messages from any goroutine:
hub := ws.Hub()
// Broadcast a text message to all connections in "chat:general"hub.BroadcastRoom("chat:general", websocket.OpcodeText, []byte(`{"type":"message","text":"Hello"}`))
// Broadcast to every connected client across all roomshub.BroadcastAll(websocket.OpcodeText, []byte(`{"type":"ping"}`))Use websocket.OpcodeText for JSON or UTF-8 strings and websocket.OpcodeBinary for binary frames.
Step 4 — Custom handler with rooms and auth
Section titled “Step 4 — Custom handler with rooms and auth”For full control — custom room names, JWT auth, connection metadata — call ServeWSWithConfig directly from your own handler:
import ( "net/http" "time" "github.com/spcent/plumego/x/websocket")
type ChatHandler struct { hub *websocket.Hub token websocket.TokenAuthenticator room websocket.RoomAuthorizer}
func (h *ChatHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { websocket.ServeWSWithConfig(w, r, websocket.ServerConfig{ Hub: h.hub, TokenAuth: h.token, RoomAuth: h.room, QueueSize: 256, // send queue size per connection SendTimeout: 5*time.Second, // send timeout SendBehavior: websocket.SendDrop, // drop message if queue full AllowedOrigins: []string{"https://app.example.com"}, })}Wire the auth and hub at startup:
tokenAuth, err := websocket.NewSimpleHS256TokenAuth([]byte(os.Getenv("WS_SECRET")))if err != nil { return err}roomAuth := websocket.NewSimpleRoomAuth()hub, err := websocket.NewHubWithConfig(websocket.HubConfig{WorkerCount: 4, JobQueueSize: 512})if err != nil { return err}
r.Get("/ws/chat", &ChatHandler{hub: hub, token: tokenAuth, room: roomAuth})Room passwords
Section titled “Room passwords”Protect specific rooms with a per-room password:
auth.SetRoomPassword("vip-room", "s3cr3t")Clients include the password in a X-WebSocket-Room-Password header during the upgrade request.
Step 5 — Inspect hub state
Section titled “Step 5 — Inspect hub state”// Number of active connections in a roomcount := hub.GetRoomCount("chat:general")
// Connection-room registrations across all roomsregistrations := hub.GetRoomRegistrationCount()
// List all room namesrooms := hub.GetRooms()Step 6 — Graceful shutdown
Section titled “Step 6 — Graceful shutdown”Shut down the WebSocket server as part of your application shutdown sequence:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()
if err := ws.Shutdown(ctx); err != nil { logger.Error("websocket shutdown error", log.Fields{"err": err.Error()})}Shutdown stops accepting new connections, drains the job queue, and closes all open connections before returning.
SendBehavior options
Section titled “SendBehavior options”| Constant | Effect when client queue is full |
|---|---|
websocket.SendBlock | Block until space is available |
websocket.SendDrop | Silently drop the message |
websocket.SendClose | Close the connection |
Use SendDrop for real-time feeds where a slow client should not stall the broadcaster. Use SendBlock when message delivery is critical and bounded latency is acceptable.