【Go语言实战】基于 WebSocket + MongoDB 的IM即时聊天Demo
文章目录
写在前面
这个项目是基于WebSocket + MongoDB + MySQL + Redis。
业务逻辑很简单,只是两人的聊天。
MySQL
用来存储用户基本信息MongoDB 20000
用来存放用户聊天信息Redis
用来存储处理过期信息
github地址
https://github.com/CocaineCong/gin-chat-demo
1. WebSocket原理
WebSocket是应用层第七层上的一个应用层协议,它必须依赖 HTTP 协议进行一次握手。
握手成功后,数据就直接从TCP通道传输,与HTTP无关了。即:WebSocket分为握手和数据传输阶段。
即进行了HTTP握手 + 双工的TCP连接。
WebSocket 是一种在单个TCP连接上进行
全双工通信的协议。
WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
如果只是想左图这样的不断发送http请求,轮询的效率是非常低,非常浪费资源,所以就有了websocket协议了,建立在 TCP 协议之上,服务器端的实现比较容易。
WebSocket协议一旦建立之后,互相沟通所消耗的请求头是很小的,服务器向客户端推送消息的功耗就小了。
2. 具体流程
2.1 定义类型
- 发送消息的结构体
type SendMsg struct { Type int `json:"type"` Content string `json:"content"` }
- 回复消息的结构体
type ReplyMsg struct { From string `json:"from"` Code int `json:"code"` Content string `json:"content"` }
- 用户结构体
type Client struct { ID string SendID string Socket *websocket.Conn Send chan []byte }
- 广播类(包括广播内容和源用户)
type Broadcast struct { Client *Client Message []byte Type int }
- 用户管理
type ClientManager struct { Clients map[string]*Client Broadcast chan *Broadcast Reply chan *Client Register chan *Client Unregister chan *Client }
- 信息转JSON (包括:发送者、接收者、内容)
type Message struct { Sender string `json:"sender,omitempty"` Recipient string `json:"recipient,omitempty"` Content string `json:"content,omitempty"` }
2.2 进行连接
- 定义一个管理
Manager
var Manager = ClientManager{ Clients : make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数 Broadcast: make(chan *Broadcast), Register : make(chan *Client), Reply : make(chan *Client), Unregister: make(chan *Client), }
2.2.1 服务器监听连接
用 for 不断进行监听查看哪个用户进入通道通信,对用户一旦有用户进来,就 Register 进行注册
for { case conn := <- Manager.Register: log.Printf("建立新连接: %v", conn.ID) Manager.Clients[conn.ID] = conn replyMsg := &ReplyMsg{ Code: e.WebsocketSuccess, Content: "已连接至服务器", } msg , _ := json.Marshal(replyMsg) _ = conn.Socket.WriteMessage(websocket.TextMessage, msg) }
2.2.2 服务器监听断开连接
同样的,也可以用来对服务器和用户之间连接的断开。
case conn := <-Manager.Unregister: // 断开连接 log.Printf("连接失败:%v", conn.ID) if _, ok := Manager.Clients[conn.ID]; ok { replyMsg := &ReplyMsg{ Code: e.WebsocketEnd, Content: "连接已断开", } msg , _ := json.Marshal(replyMsg) _ = conn.Socket.WriteMessage(websocket.TextMessage, msg) close(conn.Send) delete(Manager.Clients, conn.ID) }
2.2.3 用户连接服务器
我们采用的是
gin框架,所以这里我们可以先引入路由
r := gin.Default() r.Use(gin.Recovery(),gin.Logger()) v1 := r.Group("/") { v1.GET("ping", func(c *gin.Context) { c.JSON(200,"SUCCESS") }) v1.GET("ws",service.WsHandler) }
再在
service层创建一个
handler处理
- 读取两人的id
uid:=c.Query("uid") // 自己的id toUid:=c.Query("toUid") // 对方的id
- 升级ws协议
conn, err := (&websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // CheckOrigin解决跨域问题 return true }}).Upgrade(c.Writer, c.Request, nil) // 升级成ws协议
- 创建用户实例
client := &Client{ ID : createId(uid,toUid), SendID : createId(toUid,uid), Socket : conn, Send : make(chan []byte), }
- 用户注册到用户管理上面
Manager.Register <- client
- 开通两个协程, 一个读,一个写
go client.Read() go client.Write()
2.3 写入
2.3.1 定义类型
我们定义的接受类型是json形式,结构体如下
我们这里设计了几个
type
-
type = 1 接受消息
-
type = 2 获取历史消息
type SendMsg struct { Type int `json:"type"` Content string `json:"content"` }
2.3.2 读取数据
先用
PongHandler返回当前的
socket对象
c.Socket.PongHandler() sendMsg := new(SendMsg) // _,msg,_:=c.Socket.ReadMessage() // 不是json格式用这个 err := c.Socket.ReadJSON(&sendMsg) // json格式就用这个
2.3.3 接受消息
如果传过来的
type=1的话,那么我们就可以先去redis上面查询一下当前有多少人进行了连接。
r1 ,_ := cache.RedisClient.Get(c.ID).Result() r2 ,_ := cache.RedisClient.Get(c.SendID).Result()
如果有三个人在线上,并且没有接受消息的话,就拒绝访问。
replyMsg := &ReplyMsg{ Code: e.WebsocketLimit, Content: "达到限制", } msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage, msg)
如果没有的话,就先记录到
redis中进行缓存
cache.RedisClient.Incr(c.ID) _ , _ =cache.RedisClient.Expire(c.ID,time.Hour*24*30*3).Result()
之后,我们再进行广播消息
Manager.Broadcast <- &Broadcast{ Client:c, Message:[]byte(sendMsg.Content), }
2.3.3 获取历史消息
那这个时候我们传来的 type 就等于
2,Content就是
时间戳了
我们设置的话,是只保存三个月的,三个月过后我们就可以删除了。
timeT, err := strconv.Atoi(sendMsg.Content) // 传送来时间 if err != nil { timeT = 9999999 } results, _ := FindManyMsg(conf.MongoDBName,c.SendID,c.ID,int64(timeT),10)
这个
FindManyMsg后面再说
返回前十条
if len(results) > 10 { results = results[:10] }else if len(results) == 0{ replyMsg := &ReplyMsg{ Code:e.WebsocketEnd, Content:"到底了", }
写入返回
msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage,msg)
2.4 读取
我们用一个for循环进行消息的读取。
如果有消息的话,就
WriteMessage写下来。发送出去。
for{ select { case message, ok := <-c.Send : if !ok { _=c.Socket.WriteMessage(websocket.CloseMessage,[]byte{}) return } log.Println(c.ID,"接受消息:",string(message)) replyMsg := &ReplyMsg{ Code:e.WebsocketSuccessMessage, Content:fmt.Sprintf("%s",string(message)), } msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage, msg) } }
2.5 插入与查询
2.5.1 插入数据
我们使用的是mongoDB进行消息的存储,MongoDB的插入非常简单,文档数据库,插入json格式即可。
- 定义一个存储的数据类型
type Trainer struct { Content string `bson:"content"` // 内容 StartTime int64 `bson:"startTime"` // 创建时间 EndTime int64 `bson:"endTime"` // 过期时间 Read uint `bson:"read"` // 已读 }
- 传入数据库,用户ID,内容,是否已读,过期时间
func InsertMsg(database string, id string, content string, read uint, expire int64) (err error) { collection := conf.MongoDBClient.Database(database).Collection(id) comment := ws.Trainer{ Content: content, StartTime: time.Now().Unix(), EndTime: time.Now().Unix() + expire, Read: read, } _, err = collection.InsertOne(context.TODO(),comment) return }
2.5.2 查询数据
MongoDB的查询也非常容易,按照json格式进行查询。
- 定义一个存储对象的切片
var resultsMe []ws.Trainer
- 通过用户id查询所有的用户消息
idCollection := conf.MongoDBClient.Database(database).Collection(id)
- 根据传入的
time
定义一个过滤器,进行这个时间内的查询。
filter := bson.M{"startTime": bson.M{"$lt": time}}
- 根据
filter
进行查询,然后再通过时间进行倒序排序
,并且限定页数
。
sendIdTimeCursor, err := sendIdCollection.Find(context.TODO(), filter, options.Find().SetSort(bson.D{{"StartTime", -1}}), options.Find(). SetLimit(int64(pageSize)))
- 把数据查询数据传入到resultsMe中
err = idTimeCurcor.All(context.TODO(), &resultsMe)
2.6 对方不在线
- 广播信息
case broadcast := <-Manager.Broadcast: message := broadcast.Message sendId := broadcast.Client.SendID flag := false // 默认对方不在线
- 如果没有这个人的话就一直找就可以了
for id, conn := range Manager.Clients { if id != sendId { continue } select { case conn.Send <- message: flag = true default: close(conn.Send) delete(Manager.Clients, conn.ID) } }
- 还是找到的话
就可以当作已读信息,存储
if flag { log.Println("对方在线应答") replyMsg := &ReplyMsg{ Code: e.WebsocketOnlineReply, Content: "对方在线应答", } msg , err := json.Marshal(replyMsg) _ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg) err = InsertMsg(conf.MongoDBName, id, string(message), 1, int64(3*month)) if err != nil { fmt.Println("InsertOneMsg Err", err) } }
- 如果没有找到的话,就是未读消息了。
else { log.Println("对方不在线") replyMsg := ReplyMsg{ Code: e.WebsocketOfflineReply, Content: "对方不在线应答", } msg , err := json.Marshal(replyMsg) _ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg) err = InsertMsg(conf.MongoDBName, id, string(message), 0, int64(3*month)) if err != nil { fmt.Println("InsertOneMsg Err", err) } }
3. 演示
- 测试http连接
- 进行ws连接,连接服务器
- 当id=1上线,但是id=2没上线的时候发送消息
- 当id=2上线之后
- 再次发消息,就是在线应答了
- 这边就实时接受到消息了
- 获取历史信息
4. 源码地址
github地址
https://github.com/CocaineCong/gin-chat-demo
- 基于node.js+Express.js+Jade+MongoDB开发Web即时聊天系统
- 基于node.js+Express.js+Jade+MongoDB开发Web即时聊天系统
- 基于Express+Socket.io+MongoDB的即时聊天系统的设计与实现
- 实现基于vue全家桶+mint-ui+node.js+socket.io+MongoDB模仿微信的网页即时聊天项目
- golang实战使用gin+xorm搭建go语言web框架restgo详解2 框架基本架构
- 基于 Web 的 Go 语言 IDE - Wide 1.4.0 发布!
- 基于 Web 的 Go 语言 IDE - Wide 1.5.1 发布!
- node-socket实现web的即时聊天系统
- [置顶] Web端即时聊天项目实现(基于WebSocket)
- tornado+websocket+mongodb实现在线视屏文字聊天
- 基于 Web 的 Go 语言 IDE - Wide 1.5.2 发布!
- golang实战使用gin+xorm搭建go语言web框架restgo详解3 系统常用配置参数
- golang实战使用gin+xorm搭建go语言web框架restgo详解9 session、日志、鉴权、验证码等
- Android基于环信SDK开发IM即时聊天
- 基于 Web 的 Go 语言 IDE - Wide 1.5.1 发布!
- netty学习三:基于socket的聊天小demo
- 基于 Web 的 Go 语言 IDE - Wide 1.1.0 发布!
- 基于 Web 的 Go 语言 IDE - Wide 1.5.2 发布!
- 基于 Web 的 Go 语言 IDE - Wide 1.2.0 发布!
- golang实战使用gin+xorm搭建go语言web框架restgo详解4 路由配置