【笔记】Gin项目整合WebSocket

前言

本文记录如何在Go语言的Gin项目中，借助gorilla/websocket库整合WebSocket

添加依赖

1
go get github.com/gorilla/websocket

定义WebSocket客户端

ws/client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package ws

import (
"bytes"
"github.com/gorilla/websocket"
"log"
"time"
)

// Timing and size limits applied to each WebSocket connection.
const (
// writeWait is how far ahead writePump pushes the write deadline before each write.
writeWait = 10 * time.Second
// pongWait is how far ahead readPump pushes the read deadline; it is renewed on every pong.
pongWait = 60 * time.Second
// pingPeriod is the interval between pings sent by writePump (nine tenths of pongWait).
pingPeriod = (pongWait * 9) / 10
// maxMessageSize is the read limit that readPump sets on the connection.
maxMessageSize = 512
)

// Byte-slice constants shared by the pumps.
var (
// newline is replaced by space in incoming messages (readPump) and is written
// between messages that writePump batches into one frame.
newline = []byte{'\n'}
// space is the replacement for newline in incoming messages.
space = []byte{' '}
)

// upgrader is a websocket.Upgrader with 1024-byte read and write buffers.
//
// NOTE(review): nothing in the code shown here references upgrader (the HTTP
// handler uses wsupgrader instead) — confirm whether it can be removed.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

// Client ties one WebSocket connection to the Hub it is registered with.
type Client struct {
// Hub receives the messages this client reads and its unregister request.
Hub *Hub
// Conn is the WebSocket connection served by readPump and writePump.
Conn *websocket.Conn
// Send queues outbound messages; writePump drains it and writes them to Conn.
Send chan []byte
}

// readPump forwards every message read from the connection to the hub's
// Broadcast channel until a read fails.
//
// Before forwarding, newlines inside a message are replaced by spaces and the
// surrounding whitespace is trimmed. When the loop ends, the client is sent to
// the hub's Unregister channel and the connection is closed.
func (client *Client) readPump() {
	conn := client.Conn
	defer func() {
		client.Hub.Unregister <- client
		conn.Close()
	}()

	// Each received pong pushes the read deadline out by another pongWait.
	extendDeadline := func() {
		conn.SetReadDeadline(time.Now().Add(pongWait))
	}

	conn.SetReadLimit(maxMessageSize)
	extendDeadline()
	conn.SetPongHandler(func(string) error {
		extendDeadline()
		return nil
	})

	for {
		_, raw, readErr := conn.ReadMessage()
		if readErr != nil {
			// Only closes other than "going away" / "abnormal closure" are logged.
			if websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", readErr)
			}
			return
		}
		flattened := bytes.Replace(raw, newline, space, -1)
		client.Hub.Broadcast <- bytes.TrimSpace(flattened)
	}
}

// writePump writes queued messages from client.Send to the connection and
// sends a ping every pingPeriod.
//
// Messages already waiting in Send when a write starts are appended to the
// same text frame, separated by newlines. The pump stops on the first write
// error, or after sending a close message once Send has been closed; on exit
// the ticker is stopped and the connection is closed.
func (client *Client) writePump() {
	conn := client.Conn
	pinger := time.NewTicker(pingPeriod)
	defer func() {
		pinger.Stop()
		conn.Close()
	}()

	for {
		select {
		case <-pinger.C:
			conn.SetWriteDeadline(time.Now().Add(writeWait))
			if pingErr := conn.WriteMessage(websocket.PingMessage, nil); pingErr != nil {
				return
			}

		case first, open := <-client.Send:
			conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !open {
				// Send was closed: tell the peer we are done and stop.
				conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			frame, openErr := conn.NextWriter(websocket.TextMessage)
			if openErr != nil {
				return
			}
			frame.Write(first)

			// Drain whatever was queued at this moment into the same frame.
			for queued := len(client.Send); queued > 0; queued-- {
				frame.Write(newline)
				frame.Write(<-client.Send)
			}

			if frame.Close() != nil {
				return
			}
		}
	}
}

定义WebSocket处理中心

ws/hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package ws

import "sync"

// Hub keeps the set of registered clients and fans broadcast messages out to them.
type Hub struct {
// Clients is the set of registered clients; Run reads and modifies it.
Clients map[*Client]bool
// Broadcast carries messages that Run delivers to every registered client.
Broadcast chan []byte
// Register carries clients that Run adds to Clients.
Register chan *Client
// Unregister carries clients that Run removes from Clients.
Unregister chan *Client
}

// NewHub returns a Hub with an empty client set and unbuffered Broadcast,
// Register and Unregister channels.
func NewHub() *Hub {
	hub := new(Hub)
	hub.Clients = make(map[*Client]bool)
	hub.Broadcast = make(chan []byte)
	hub.Register = make(chan *Client)
	hub.Unregister = make(chan *Client)
	return hub
}

// NOTE(review): once and singleton are not used by any code shown here;
// presumably left over from a lazily initialised shared Hub — confirm or remove.
var once sync.Once
var singleton *Hub

// Run is the hub's event loop; it never returns.
//
// It handles three kinds of events:
//   - Register: the client is added to Clients.
//   - Unregister: a known client is removed and its Send channel is closed.
//   - Broadcast: the message is offered to every client's Send channel; a
//     client whose channel cannot accept it immediately is dropped and its
//     Send channel is closed.
func (hub *Hub) Run() {
	for {
		select {
		case joining := <-hub.Register:
			hub.Clients[joining] = true

		case leaving := <-hub.Unregister:
			_, known := hub.Clients[leaving]
			if !known {
				continue
			}
			delete(hub.Clients, leaving)
			close(leaving.Send)

		case payload := <-hub.Broadcast:
			for receiver := range hub.Clients {
				select {
				case receiver.Send <- payload:
					// Delivered; move on to the next client.
					continue
				default:
				}
				// The send would have blocked: drop this client.
				close(receiver.Send)
				delete(hub.Clients, receiver)
			}
		}
	}
}

定义WebSocket控制器

ws/ws_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package ws

import (
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
)

// wsupgrader upgrades HTTP requests to WebSocket connections using 1024-byte
// read and write buffers.
var wsupgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// CheckOrigin returns true for every request, so the Origin header is never
// validated and pages from any site may open a connection.
// NOTE(review): restrict this to trusted origins before production use.
CheckOrigin: func(r *http.Request) bool {
return true
},
}

// wshandler upgrades the HTTP request to a WebSocket connection, registers a
// new Client with hub, and starts the client's write and read pumps.
//
// The handler returns as soon as the pumps are running. readPump is the only
// reader of the connection (a WebSocket connection supports a single
// concurrent reader); when a read fails it unregisters the client and closes
// the connection, so no additional read loop is needed here.
func wshandler(hub *Hub, writer http.ResponseWriter, request *http.Request) {
	conn, err := wsupgrader.Upgrade(writer, request, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error, so a
		// failed handshake only needs to be logged, not turned into a panic.
		log.Println(err)
		return
	}

	// Send is buffered so the hub can queue messages without blocking on a
	// momentarily slow connection.
	client := &Client{Hub: hub, Conn: conn, Send: make(chan []byte, 256)}
	client.Hub.Register <- client

	go client.writePump()
	go client.readPump()
}

// WebSocketController is the Gin-facing entry point: it passes the request's
// response writer and HTTP request, together with hub, to wshandler.
func WebSocketController(context *gin.Context, hub *Hub) {
	writer, request := context.Writer, context.Request
	wshandler(hub, writer, request)
}

添加路由

main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"fmt"
"github.com/gin-gonic/gin"
)

// main wires the WebSocket hub into a Gin engine and serves it on :8080.
//
// NOTE(review): ws refers to this project's ws package; it has to be imported
// using the project's own module path, which is not visible here.
func main() {
	app := gin.Default()

	// One hub is shared by every connection; its event loop runs in its own goroutine.
	hub := ws.NewHub()
	go hub.Run()

	app.GET("/api/webSocket/Connect", func(context *gin.Context) {
		ws.WebSocketController(context, hub)
	})

	// Run only returns when the server fails to start or stops, so report the
	// reason instead of silently dropping it.
	if err := app.Run(":8080"); err != nil {
		fmt.Println("server stopped:", err)
	}
}

完成

参考文献

哔哩哔哩——方应杭讲编程
WebSocket在线测试工具
CSDN——曲江涛