实时通信 WebSocket(x/websocket)
实时通信 WebSocket
Section titled “实时通信 WebSocket”本指南展示如何使用 x/websocket 为 Plumego 服务添加 WebSocket 支持。你将升级 HTTP 连接、将客户端加入命名房间,并向已连接客户端广播消息。
x/websocket 为 experimental 状态。在生产中使用前请参阅发布策略和扩展成熟度。
边界说明见 x/websocket Primer。
第一步 — 创建 Server
Section titled “第一步 — 创建 Server”x/websocket.New 返回一个 *Server,它包含 Hub、认证处理器和路由注册:
import ( "os"
"github.com/spcent/plumego/x/websocket")
cfg := websocket.DefaultWebSocketConfig()cfg.WSRoutePath = "/ws" // WebSocket 升级端点cfg.Secret = []byte(os.Getenv("WS_SECRET"))cfg.MaxRoomRegistrations = 1000cfg.MaxRoomConnections = 100// admin broadcast 默认关闭。启用时使用独立 secret 或 BroadcastAuthorizer,// 不要复用 cfg.Secret。cfg.BroadcastEnabled = truecfg.BroadcastSecret = []byte(os.Getenv("WS_BROADCAST_SECRET"))
ws, err := websocket.New(cfg)if err != nil { return err}DefaultWebSocketConfig() 为 worker 数量、队列大小和超时设置了安全默认值。它不会读取环境变量;请在应用 wiring 中显式传入 secret。
第二步 — 注册路由
Section titled “第二步 — 注册路由”在应用路由器上挂载 WebSocket 端点:
import "github.com/spcent/plumego/router"
r := router.NewRouter()if err := ws.RegisterRoutes(r); err != nil { log.Fatal(err)}RegisterRoutes 在 cfg.WSRoutePath 挂载升级 handler。客户端通过标准 WebSocket 握手连接到该路径。
第三步 — 向房间广播
Section titled “第三步 — 向房间广播”获取 Hub 并从任意 goroutine 发送消息:
hub := ws.Hub()
// 向 "chat:general" 房间的所有连接广播文本消息hub.BroadcastRoom("chat:general", websocket.OpcodeText, []byte(`{"type":"message","text":"Hello"}`))
// 向所有房间的所有连接广播hub.BroadcastAll(websocket.OpcodeText, []byte(`{"type":"ping"}`))对 JSON 或 UTF-8 字符串使用 websocket.OpcodeText,对二进制帧使用 websocket.OpcodeBinary。
第四步 — 带房间和认证的自定义 handler
Section titled “第四步 — 带房间和认证的自定义 handler”如需完全控制 — 自定义房间名称、JWT 认证、连接元数据 — 直接在自己的 handler 中调用 ServeWSWithConfig:
import ( "net/http" "time" "github.com/spcent/plumego/x/websocket")
type ChatHandler struct { hub *websocket.Hub token websocket.TokenAuthenticator room websocket.RoomAuthorizer}
func (h *ChatHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { websocket.ServeWSWithConfig(w, r, websocket.ServerConfig{ Hub: h.hub, TokenAuth: h.token, RoomAuth: h.room, QueueSize: 256, // 每个连接的发送队列大小 SendTimeout: 5*time.Second, // 发送超时 SendBehavior: websocket.SendDrop, // 队列满时丢弃消息 AllowedOrigins: []string{"https://app.example.com"}, })}在启动时 wiring auth 和 hub:
tokenAuth, err := websocket.NewSimpleHS256TokenAuth([]byte(os.Getenv("WS_SECRET")))if err != nil { return err}roomAuth := websocket.NewSimpleRoomAuth()hub, err := websocket.NewHubWithConfig(websocket.HubConfig{WorkerCount: 4, JobQueueSize: 512})if err != nil { return err}
r.Get("/ws/chat", &ChatHandler{hub: hub, token: tokenAuth, room: roomAuth})用每个房间的密码保护特定房间:
auth.SetRoomPassword("vip-room", "s3cr3t")客户端在升级请求时通过 X-WebSocket-Room-Password header 提供密码。
第五步 — 查看 Hub 状态
Section titled “第五步 — 查看 Hub 状态”// 某个房间的活跃连接数count := hub.GetRoomCount("chat:general")
// 所有房间的连接-房间注册数registrations := hub.GetRoomRegistrationCount()
// 列出所有房间名称rooms := hub.GetRooms()第六步 — 优雅关闭
Section titled “第六步 — 优雅关闭”在应用关闭序列中关闭 WebSocket Server:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()
if err := ws.Shutdown(ctx); err != nil { logger.Error("websocket shutdown error", log.Fields{"err": err.Error()})}Shutdown 停止接受新连接,排空任务队列,并在返回前关闭所有已打开的连接。
SendBehavior 选项
Section titled “SendBehavior 选项”| 常量 | 客户端队列满时的行为 |
|---|---|
websocket.SendBlock | 阻塞直到有空间 |
websocket.SendDrop | 静默丢弃消息 |
websocket.SendClose | 关闭连接 |
对于慢客户端不应阻塞广播方的实时信息流,使用 SendDrop。当消息投递是关键需求且延迟有界时,使用 SendBlock。