跳转到内容

实时通信 WebSocket(x/websocket)

本指南展示如何使用 x/websocket 为 Plumego 服务添加 WebSocket 支持。你将升级 HTTP 连接、将客户端加入命名房间,并向已连接客户端广播消息。

x/websocket 为 experimental 状态。在生产中使用前请参阅发布策略扩展成熟度

边界说明见 x/websocket Primer

x/websocket.New 返回一个 *Server,它包含 Hub、认证处理器和路由注册:

import (
"os"
"github.com/spcent/plumego/x/websocket"
)
cfg := websocket.DefaultWebSocketConfig()
cfg.WSRoutePath = "/ws" // WebSocket 升级端点
cfg.Secret = []byte(os.Getenv("WS_SECRET"))
cfg.MaxRoomRegistrations = 1000
cfg.MaxRoomConnections = 100
// admin broadcast 默认关闭。启用时使用独立 secret 或 BroadcastAuthorizer,
// 不要复用 cfg.Secret。
cfg.BroadcastEnabled = true
cfg.BroadcastSecret = []byte(os.Getenv("WS_BROADCAST_SECRET"))
ws, err := websocket.New(cfg)
if err != nil {
return err
}

DefaultWebSocketConfig() 为 worker 数量、队列大小和超时设置了安全默认值。它不会读取环境变量;请在应用 wiring 中显式传入 secret。

在应用路由器上挂载 WebSocket 端点:

import "github.com/spcent/plumego/router"
r := router.NewRouter()
if err := ws.RegisterRoutes(r); err != nil {
log.Fatal(err)
}

RegisterRoutescfg.WSRoutePath 挂载升级 handler。客户端通过标准 WebSocket 握手连接到该路径。

获取 Hub 并从任意 goroutine 发送消息:

hub := ws.Hub()
// 向 "chat:general" 房间的所有连接广播文本消息
hub.BroadcastRoom("chat:general", websocket.OpcodeText, []byte(`{"type":"message","text":"Hello"}`))
// 向所有房间的所有连接广播
hub.BroadcastAll(websocket.OpcodeText, []byte(`{"type":"ping"}`))

对 JSON 或 UTF-8 字符串使用 websocket.OpcodeText,对二进制帧使用 websocket.OpcodeBinary

第四步 — 带房间和认证的自定义 handler

Section titled “第四步 — 带房间和认证的自定义 handler”

如需完全控制 — 自定义房间名称、JWT 认证、连接元数据 — 直接在自己的 handler 中调用 ServeWSWithConfig

import (
"net/http"
"time"
"github.com/spcent/plumego/x/websocket"
)
type ChatHandler struct {
hub *websocket.Hub
token websocket.TokenAuthenticator
room websocket.RoomAuthorizer
}
func (h *ChatHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
websocket.ServeWSWithConfig(w, r, websocket.ServerConfig{
Hub: h.hub,
TokenAuth: h.token,
RoomAuth: h.room,
QueueSize: 256, // 每个连接的发送队列大小
SendTimeout: 5*time.Second, // 发送超时
SendBehavior: websocket.SendDrop, // 队列满时丢弃消息
AllowedOrigins: []string{"https://app.example.com"},
})
}

在启动时 wiring auth 和 hub:

tokenAuth, err := websocket.NewSimpleHS256TokenAuth([]byte(os.Getenv("WS_SECRET")))
if err != nil {
return err
}
roomAuth := websocket.NewSimpleRoomAuth()
hub, err := websocket.NewHubWithConfig(websocket.HubConfig{WorkerCount: 4, JobQueueSize: 512})
if err != nil {
return err
}
r.Get("/ws/chat", &ChatHandler{hub: hub, token: tokenAuth, room: roomAuth})

用每个房间的密码保护特定房间:

auth.SetRoomPassword("vip-room", "s3cr3t")

客户端在升级请求时通过 X-WebSocket-Room-Password header 提供密码。

// 某个房间的活跃连接数
count := hub.GetRoomCount("chat:general")
// 所有房间的连接-房间注册数
registrations := hub.GetRoomRegistrationCount()
// 列出所有房间名称
rooms := hub.GetRooms()

在应用关闭序列中关闭 WebSocket Server:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := ws.Shutdown(ctx); err != nil {
logger.Error("websocket shutdown error", log.Fields{"err": err.Error()})
}

Shutdown 停止接受新连接,排空任务队列,并在返回前关闭所有已打开的连接。

常量客户端队列满时的行为
websocket.SendBlock阻塞直到有空间
websocket.SendDrop静默丢弃消息
websocket.SendClose关闭连接

对于慢客户端不应阻塞广播方的实时信息流,使用 SendDrop。当消息投递是关键需求且延迟有界时,使用 SendBlock