您的位置:首页 > 编程语言 > Java开发

SpringBoot学习-(十四)SpringBoot中建立WebSocket连接(STOMP实现发送消息给指定用户)

2017-09-29 15:15 696 查看
使用STOMP实现发送消息给指定用户步骤如下:

添加pom文件依赖

书写客户端用户实体类

书写客户端渠道拦截适配器

配置websocket stomp

书写控制层

书写客户端

1.添加pom文件依赖

<!-- springboot websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>


2.书写客户端用户实体类

自定义客户端用户实体类,封装来自于客户端的信息,相当于为每一个客户端提供唯一的标识

package com.ahut.entity;

import java.security.Principal;

/**
*
* @ClassName: User
* @Description: 客户端用户
* @author cheng
* @date 2017年9月29日 下午3:02:54
*/
public final class User implements Principal {

private final String name;

public User(String name) {
this.name = name;
}

@Override
public String getName() {
return name;
}
}


3.书写客户端渠道拦截适配器

利用拦截的方式,获取包含在stomp中的用户信息,并将认证的用户信息设置到当前的访问器中

package com.ahut.websocket;

import java.util.LinkedList;
import java.util.Map;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.MessageHeaderAccessor;

import com.ahut.entity.User;

/**
*
* @ClassName: UserInterceptor
* @Description: 客户端渠道拦截适配器
* @author cheng
* @date 2017年9月29日 下午2:40:12
*/
public class UserInterceptor extends ChannelInterceptorAdapter {

/**
* 获取包含在stomp中的用户信息
*/
@SuppressWarnings("rawtypes")
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
if (raw instanceof Map) {
Object name = ((Map) raw).get("name");
if (name instanceof LinkedList) {
// 设置当前访问器的认证用户
accessor.setUser(new User(((LinkedList) name).get(0).toString()));
}
}
}
return message;
}
}


4.配置websocket stomp

package com.ahut.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import com.ahut.websocket.UserInterceptor;

/**
*
* @ClassName: WebSocketStompConfig
* @Description: springboot websocket stomp配置
* @author cheng
* @date 2017年9月27日 下午3:45:36
*/

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer {

/**
* 注册stomp的端点
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域
// 在网页上我们就可以通过这个链接
// http://localhost:8080/webSocketServer // 来和服务器的WebSocket连接
registry.addEndpoint("/webSocketServer").setAllowedOrigins("*").withSockJS();
}

/**
* 配置信息代理
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 订阅Broker名称
registry.enableSimpleBroker("/queue", "/topic");
// 全局使用的消息前缀(客户端订阅路径上会体现出来)
registry.setApplicationDestinationPrefixes("/app");
// 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
// registry.setUserDestinationPrefix("/user/");
}

/**
* 配置客户端入站通道拦截器
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.setInterceptors(createUserInterceptor());
}

/**
*
* @Title: createUserInterceptor
* @Description: 将客户端渠道拦截器加入spring ioc容器
* @return
*/
@Bean
public UserInterceptor createUserInterceptor() {
return new UserInterceptor();
}

}


5.书写控制层

package com.ahut.action;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import com.ahut.entity.ServerMessage;

/**
*
* @ClassName: WebSocketAction
* @Description: websocket控制层
* @author cheng
* @date 2017年9月27日 下午4:20:58
*/
@Controller
public class WebSocketAction {

private Logger logger = LoggerFactory.getLogger(this.getClass());

//spring提供的发送消息模板
@Autowired
private SimpMessagingTemplate messagingTemplate;

@Autowired
private SimpUserRegistry userRegistry;

@RequestMapping(value = "/templateTest")
public void templateTest() {
logger.info("当前在线人数:" + userRegistry.getUserCount());
int i = 1;
for (SimpUser user : userRegistry.getUsers()) {
logger.info("用户" + i++ + "---" + user);
}
//发送消息给指定用户
messagingTemplate.convertAndSendToUser("test", "/queue/message", new ServerMessage("服务器主动推的数据"));
}

}


代码分析:

SimpUserRegistry用来获取连接的客户端信息

userRegistry.getUsers()将返回一个用户列表

模拟发送信息给指定用户,浏览器访问

localhost:8080/templateTest


使用test作为连接用户名,并且订阅了/user/queue/message主题的客户端就会收到服务器主动推送的消息

查看convertAndSendToUser的源码如下:

@Override
public void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers,
MessagePostProcessor postProcessor) throws MessagingException {

Assert.notNull(user, "User must not be null");
user = StringUtils.replace(user, "/", "%2F");
super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}


可以发现messagingTemplate.convertAndSendToUser(“test”, “/queue/message”, new ServerMessage(“服务器主动推的数据”));最终发送的目的地地址为:

/user/test/queue/message


若用户名中包含”/”,则替换成”%2F”

6.书写客户端

<!DOCTYPE html>
<html>

<head>
<title>stomp</title>
</head>

<body>
Welcome<br/><input id="text" type="text" />
<button onclick="send()">发送消息</button>
<button onclick="subscribe3()">订阅消息/user/queue/message</button>
<hr/>
<div id="message"></div>
</body>

<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script type="text/javascript">
// 建立连接对象(还未发起连接)
var socket = new SockJS("http://localhost:8080/webSocketServer");

// 获取 STOMP 子协议的客户端对象
var stompClient = Stomp.over(socket);

// 向服务器发起websocket连接并发送CONNECT帧
stompClient.connect(
{
name: 'test' // 携带客户端信息
},
function connectCallback(frame) {
// 连接成功时(服务器响应 CONNECTED 帧)的回调方法
setMessageInnerHTML("连接成功");
},
function errorCallBack(error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
setMessageInnerHTML("连接失败");
}
);

//订阅消息
function subscribe3() {
stompClient.subscribe('/user/queue/message', function (response) {
var returnData = JSON.parse(response.body);
setMessageInnerHTML("/user/queue/message 你接收到的消息为:" + returnData.responseMessage);
});
}

//将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}

</script>

</html>


代码分析:

当你的客户端连接时,他们必须提供他们的用户名:

// 向服务器发起websocket连接并发送CONNECT帧
stompClient.connect(
{
name: 'test' // 携带客户端信息
},
function connectCallback(frame) {
// 连接成功时(服务器响应 CONNECTED 帧)的回调方法
setMessageInnerHTML("连接成功");
},
function errorCallBack(error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
setMessageInnerHTML("连接失败");
}
);


用户需要先订阅/user/queue/message主题,才能收到发送给自己的消息

总结:

客户端订阅:/user/queue/message

服务器推送指定用户:/user/客户端用户名/queue/message
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐