您的位置:首页 > 数据库 > Mongodb

【Go语言实战】基于 WebSocket + MongoDB 的IM即时聊天Demo

2021-12-09 09:46 1431 查看

文章目录

  • 2.3 写入
  • 2.4 读取
  • 2.5 插入与查询
  • 2.6 对方不在线
  • 3. 演示
  • 4. 源码地址
  • 写在前面

    这个项目是基于WebSocket + MongoDB + MySQL + Redis。
    业务逻辑很简单,只是两人的聊天。

    • MySQL
      用来存储用户基本信息
    • MongoDB
      20000
      用来存放用户聊天信息
    • Redis
      用来存储处理过期信息

    github地址

    https://github.com/CocaineCong/gin-chat-demo

    1. WebSocket原理

    WebSocket
    是应用层第七层上的一个应用层协议,它必须依赖 HTTP 协议进行一次握手。
    握手成功后,数据就直接从
    TCP
    通道传输,与
    HTTP
    无关了。即:
    WebSocket
    分为握手和数据传输阶段。
    即进行了HTTP握手 + 双工的TCP连接。

    WebSocket 是一种在单个TCP连接上进行

    全双工通信
    的协议。

    WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。


    如果只是想左图这样的不断发送http请求,轮询的效率是非常低,非常浪费资源,所以就有了websocket协议了,建立在 TCP 协议之上,服务器端的实现比较容易。

    WebSocket协议一旦建立之后,互相沟通所消耗的请求头是很小的,服务器向客户端推送消息的功耗就小了。

    2. 具体流程

    2.1 定义类型

    • 发送消息的结构体
    type SendMsg struct {
    Type int `json:"type"`
    Content string `json:"content"`
    }
    • 回复消息的结构体
    type ReplyMsg struct {
    From 	string  `json:"from"`
    Code 	int 	`json:"code"`
    Content string  `json:"content"`
    }
    • 用户结构体
    type Client struct {
    ID 		string
    SendID 	string
    Socket  *websocket.Conn
    Send chan []byte
    }
    • 广播类(包括广播内容和源用户)
    type Broadcast struct {
    Client 	*Client
    Message []byte
    Type 	int
    }
    • 用户管理
    type ClientManager struct {
    Clients 	map[string]*Client
    Broadcast 	chan *Broadcast
    Reply 		chan *Client
    Register 	chan *Client
    Unregister  chan *Client
    }
    • 信息转JSON (包括:发送者、接收者、内容)
    type Message struct {
    Sender 		string 		`json:"sender,omitempty"`
    Recipient 	string 		`json:"recipient,omitempty"`
    Content 	string 		`json:"content,omitempty"`
    }

    2.2 进行连接

    • 定义一个管理
      Manager
    var Manager  = ClientManager{
    Clients	 : 	make(map[string]*Client),   // 参与连接的用户,出于性能的考虑,需要设置最大连接数
    Broadcast:	make(chan *Broadcast),
    Register : 	make(chan *Client),
    Reply 	 : 	make(chan *Client),
    Unregister:	make(chan *Client),
    }

    2.2.1 服务器监听连接

    用 for 不断进行监听查看哪个用户进入通道通信,对用户一旦有用户进来,就 Register 进行注册

    for {
    case conn := <- Manager.Register:
    log.Printf("建立新连接: %v", conn.ID)
    Manager.Clients[conn.ID] = conn
    replyMsg := &ReplyMsg{
    Code:    e.WebsocketSuccess,
    Content: "已连接至服务器",
    }
    msg , _ := json.Marshal(replyMsg)
    _ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
    }

    2.2.2 服务器监听断开连接

    同样的,也可以用来对服务器和用户之间连接的断开。

    case conn := <-Manager.Unregister: // 断开连接
    log.Printf("连接失败:%v", conn.ID)
    if _, ok := Manager.Clients[conn.ID]; ok {
    replyMsg := &ReplyMsg{
    Code:    e.WebsocketEnd,
    Content: "连接已断开",
    }
    msg , _ := json.Marshal(replyMsg)
    _ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
    close(conn.Send)
    delete(Manager.Clients, conn.ID)
    }

    2.2.3 用户连接服务器

    我们采用的是

    gin框架
    ,所以这里我们可以先引入路由

    r := gin.Default()
    r.Use(gin.Recovery(),gin.Logger())
    v1 := r.Group("/")
    {
    v1.GET("ping", func(c *gin.Context) {
    c.JSON(200,"SUCCESS")
    })
    v1.GET("ws",service.WsHandler)
    }

    再在

    service层
    创建一个
    handler
    处理

    • 读取两人的id
    uid:=c.Query("uid") // 自己的id
    toUid:=c.Query("toUid") // 对方的id
    • 升级ws协议
    conn, err := (&websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool { // CheckOrigin解决跨域问题
    return true
    }}).Upgrade(c.Writer, c.Request, nil) // 升级成ws协议
    • 创建用户实例
    client := &Client{
    ID 		: createId(uid,toUid),
    SendID	: createId(toUid,uid),
    Socket	: conn,
    Send	: make(chan []byte),
    }
    • 用户注册到用户管理上面
    Manager.Register <- client
    • 开通两个协程, 一个读,一个写
    go client.Read()
    go client.Write()

    2.3 写入

    2.3.1 定义类型

    我们定义的接受类型是json形式,结构体如下
    我们这里设计了几个

    type

    • type = 1 接受消息

    • type = 2 获取历史消息

    type SendMsg struct {
    Type    int `json:"type"`
    Content string `json:"content"`
    }

    2.3.2 读取数据

    先用

    PongHandler
    返回当前的
    socket
    对象

    c.Socket.PongHandler()
    sendMsg := new(SendMsg)
    // _,msg,_:=c.Socket.ReadMessage() // 不是json格式用这个
    err := c.Socket.ReadJSON(&sendMsg)  // json格式就用这个

    2.3.3 接受消息

    如果传过来的

    type=1
    的话,那么我们就可以先去redis上面查询一下当前有多少人进行了连接。

    r1 ,_ := cache.RedisClient.Get(c.ID).Result()
    r2 ,_ := cache.RedisClient.Get(c.SendID).Result()

    如果有三个人在线上,并且没有接受消息的话,就拒绝访问。

    replyMsg := &ReplyMsg{
    Code:    e.WebsocketLimit,
    Content: "达到限制",
    }
    msg , _ := json.Marshal(replyMsg)
    _ = c.Socket.WriteMessage(websocket.TextMessage, msg)

    如果没有的话,就先记录到

    redis
    中进行缓存

    cache.RedisClient.Incr(c.ID)
    _ , _ =cache.RedisClient.Expire(c.ID,time.Hour*24*30*3).Result()

    之后,我们再进行广播消息

    Manager.Broadcast <- &Broadcast{
    Client:c,
    Message:[]byte(sendMsg.Content),
    }

    2.3.3 获取历史消息

    那这个时候我们传来的 type 就等于

    2
    ,Content就是
    时间戳

    我们设置的话,是只保存三个月的,三个月过后我们就可以删除了。

    timeT, err := strconv.Atoi(sendMsg.Content) // 传送来时间
    if err != nil {
    timeT = 9999999
    }
    results, _ := FindManyMsg(conf.MongoDBName,c.SendID,c.ID,int64(timeT),10)

    这个

    FindManyMsg
    后面再说
    返回前十条

    if len(results) > 10 {
    results = results[:10]
    }else if len(results) == 0{
    replyMsg := &ReplyMsg{
    Code:e.WebsocketEnd,
    Content:"到底了",
    }

    写入返回

    msg , _ := json.Marshal(replyMsg)
    _ = c.Socket.WriteMessage(websocket.TextMessage,msg)

    2.4 读取

    我们用一个for循环进行消息的读取。

    如果有消息的话,就

    WriteMessage
    写下来。发送出去。

    for{
    select {
    case message, ok := <-c.Send :
    if !ok {
    _=c.Socket.WriteMessage(websocket.CloseMessage,[]byte{})
    return
    }
    log.Println(c.ID,"接受消息:",string(message))
    replyMsg := &ReplyMsg{
    Code:e.WebsocketSuccessMessage,
    Content:fmt.Sprintf("%s",string(message)),
    }
    msg , _ := json.Marshal(replyMsg)
    _ = c.Socket.WriteMessage(websocket.TextMessage, msg)
    }
    }

    2.5 插入与查询

    2.5.1 插入数据

    我们使用的是mongoDB进行消息的存储,MongoDB的插入非常简单,文档数据库,插入json格式即可。

    • 定义一个存储的数据类型
    type Trainer struct {
    Content 	string `bson:"content"`   // 内容
    StartTime 	int64  `bson:"startTime"` // 创建时间
    EndTime 	int64  `bson:"endTime"`   // 过期时间
    Read 		uint   `bson:"read"`      // 已读
    }
    • 传入数据库,用户ID,内容,是否已读,过期时间
    func InsertMsg(database string, id string, content string, read uint, expire int64) (err error) {
    collection := conf.MongoDBClient.Database(database).Collection(id)
    comment := ws.Trainer{
    Content:   content,
    StartTime: time.Now().Unix(),
    EndTime:   time.Now().Unix() + expire,
    Read:      read,
    }
    _, err = collection.InsertOne(context.TODO(),comment)
    return
    }

    2.5.2 查询数据

    MongoDB的查询也非常容易,按照json格式进行查询。

    • 定义一个存储对象的切片
    var resultsMe []ws.Trainer
    • 通过用户id查询所有的用户消息
    idCollection := conf.MongoDBClient.Database(database).Collection(id)
    • 根据传入的
      time
      定义一个过滤器,进行这个时间内的查询。
    filter := bson.M{"startTime": bson.M{"$lt": time}}
    • 根据
      filter
      进行查询,然后再通过时间进行
      倒序排序
      ,并且
      限定页数
    sendIdTimeCursor, err := sendIdCollection.Find(context.TODO(), filter,
    options.Find().SetSort(bson.D{{"StartTime", -1}}), options.Find().
    SetLimit(int64(pageSize)))
    • 把数据查询数据传入到resultsMe中
    err = idTimeCurcor.All(context.TODO(), &resultsMe)

    2.6 对方不在线

    • 广播信息
    case broadcast := <-Manager.Broadcast:
    message := broadcast.Message
    sendId := broadcast.Client.SendID
    flag := false // 默认对方不在线
    • 如果没有这个人的话就一直找就可以了
    for id, conn := range Manager.Clients {
    if id != sendId {
    continue
    }
    select {
    case conn.Send <- message:
    flag = true
    default:
    close(conn.Send)
    delete(Manager.Clients, conn.ID)
    }
    }
    • 还是找到的话

    就可以当作已读信息,存储

    if flag {
    log.Println("对方在线应答")
    replyMsg := &ReplyMsg{
    Code:    e.WebsocketOnlineReply,
    Content: "对方在线应答",
    }
    msg , err := json.Marshal(replyMsg)
    _ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg)
    err = InsertMsg(conf.MongoDBName, id, string(message), 1, int64(3*month))
    if err != nil {
    fmt.Println("InsertOneMsg Err", err)
    }
    }
    • 如果没有找到的话,就是未读消息了。
    else {
    log.Println("对方不在线")
    replyMsg := ReplyMsg{
    Code:    e.WebsocketOfflineReply,
    Content: "对方不在线应答",
    }
    msg , err := json.Marshal(replyMsg)
    _ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg)
    err = InsertMsg(conf.MongoDBName, id, string(message), 0, int64(3*month))
    if err != nil {
    fmt.Println("InsertOneMsg Err", err)
    }
    }

    3. 演示

    • 测试http连接

    • 进行ws连接,连接服务器

    • 当id=1上线,但是id=2没上线的时候发送消息
    • 当id=2上线之后

    • 再次发消息,就是在线应答了

    • 这边就实时接受到消息了
    • 获取历史信息

    4. 源码地址

    github地址

    https://github.com/CocaineCong/gin-chat-demo

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: