spring mvc 4整合spring websocket,点对点式通讯,stomp监听连接
2017-04-20 15:49
369 查看
spring boot整合spring websocket和本例子一致,只是不需要在xml加入扫描配置(需要引入相应的依赖)
先引入spring websocket需要的两个依赖
在mvc 的xml加入一下代码,扫描注解方式的配置
配置类
webSocketConfig:
HttpSessionIdHandshakeInterceptor:
点对对式发送消息到某一个用户,需要把某一个用户的id存放在session中(项目使用accountId表示用户的唯一性,已在登录的时候,把这个accountId保存到HttpSession中),这里直接获取accountId就可以
监听用户连接情况PresenceChannelInterceptor:
说明:这里用到的CacheManager的缓存技术是ehcache,大家请结合实际项目保存。(CacheManager代码就不给了)
StompSubscribeEventListener:
常量类:
Controller:
下面的两个方法不要放到service层,亲测在controller层才能够使用
消息实体:
MsgWeixin(请根据项目修改)
前端页面JS代码(需引进jquery)
说明:
${???}等都是后台页面模板语言的参数值
${resLocal!} :资源路径
${accountId}:后台设置好的当前用户id,spring mvc设置到页面
${chatId}:后台设置好的聊天对话的id
${receiverId}:接收人用户id
发生的错误说明
1: 若页面初始化websocket出现错误,打开浏览器的控制台看地址带有info地址,是否显示200。
2: 若不是200证明是路径不对,针对后台websocketConfig配置类和页面的连接地址进行检查。
比如:
websocketConfig配置类
stompEndpointRegistry.addEndpoint("/contactChatSocket")
页面
${webPath}:是后台设置好的路径,请修改项目配置路径
var sock = new SockJS("${webPath}contactChatSocket");
3: 若是200,后面还是报错,检查浏览器的页面地址端口是否填上。
4: 若是200连接成功,发送消息没有提示是否成功或者失败的(没有接收到消息的)
如:
解决方法:
请在后台用logback的debug级别打印spring websocket,会有一个详细的地址,检查页面接收消息的地址是否和这个地址一样。如
后台日志:
15:24:51.394 [clientInboundChannel-2] DEBUG org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler - Processing MESSAGE destination=/user/2016091817033128922/queue/contactMessage2017041515541301674
session=null payload={"isSelf":true,"content":"15","msgType":1,"createtime":"2017-04-20 15:24:51.383"...(truncated)
15:24:51.395 [clientInboundChannel-2] DEBUG org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler - Processing MESSAGE destination=/user/2017021210143438633/queue/contactMessage2017041515541301674
session=null payload={"isSelf":false,"content":"15","msgType":1,"createtime":"2017-04-20 15:24:51.383...(truncated)
蓝色标记部分对应
stomp.subscribe事件的address:
stomp.subscribe(address, function (message) {}
);
检查蓝色标记部分和address是否一致,若一致,就可以接收其他用户发送过来的消息
5:运行项目时,异常信息含有
(spring mvc整合才有可能发生的异常,spring boot没有)
<async-supported>true</async-supported>
这段代码等字样,在web.xml的<filter>和<servlet>两个标签里面加入这段代码就可以解决
<async-supported>true</async-supported>
先引入spring websocket需要的两个依赖
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>4.1.5.RELEASE</version> </dependency>
在mvc 的xml加入一下代码,扫描注解方式的配置
<!--扫描配置--> <context:component-scan base-package="com.demo.websocket.config" />
配置类
webSocketConfig:
import com.demo.websocket.config.HttpSessionIdHandshakeInterceptor; import com.demo.websocket.config.PresenceChannelInterceptor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.*; /** * web socket配置 * Created by earl on 2017/4/11. */ @Configuration //开启对websocket的支持,使用stomp协议传输代理消息, // 这时控制器使用@MessageMapping和@RequestMaping一样 @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { /** * 服务器要监听的端口,message会从这里进来,要对这里加一个Handler * 这样在网页中就可以通过websocket连接上服务了 */ @Override public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) { //注册stomp的节点,映射到指定的url,并指定使用sockjs协议 stompEndpointRegistry.addEndpoint("/contactChatSocket").withSockJS().setInterceptors(httpSessionIdHandshakeInterceptor()); ; } //配置消息代理 @Override public void configureMessageBroker(MessageBrokerRegistry registry) { // queue、topic、user代理 registry.enableSimpleBroker("/queue", "/topic","/user"); registry.setUserDestinationPrefix("/user/"); } /** * 消息传输参数配置 */ @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { registry.setMessageSizeLimit(8192) //设置消息字节数大小 .setSendBufferSizeLimit(8192)//设置消息缓存大小 .setSendTimeLimit(10000); //设置消息发送时间限制毫秒 } /** * 输入通道参数设置 */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(4) //设置消息输入通道的线程池线程数 .maxPoolSize(8)//最大线程数 .keepAliveSeconds(60);//线程活动时间 registration.setInterceptors(presenceChannelInterceptor()); } /** * 输出通道参数设置 */ @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(4).maxPoolSize(8); registration.setInterceptors(presenceChannelInterceptor()); } @Bean public HttpSessionIdHandshakeInterceptor httpSessionIdHandshakeInterceptor() { return new HttpSessionIdHandshakeInterceptor(); } @Bean public PresenceChannelInterceptor presenceChannelInterceptor() { return new PresenceChannelInterceptor(); } }
HttpSessionIdHandshakeInterceptor:
点对对式发送消息到某一个用户,需要把某一个用户的id存放在session中(项目使用accountId表示用户的唯一性,已在登录的时候,把这个accountId保存到HttpSession中),这里直接获取accountId就可以
import com.demo.websocket.constants.Constants; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; import javax.servlet.http.HttpSession; import java.util.Map; /** * websocket握手(handshake)接口 * Created by earl on 2017/4/17. */ @Component public class HttpSessionIdHandshakeInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { //解决The extension [x-webkit-deflate-frame] is not supported问题 if(request.getHeaders().containsKey("Sec-WebSocket-Extensions")) { request.getHeaders().set("Sec-WebSocket-Extensions", "permessage-deflate"); } //检查session的值是否存在 if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; HttpSession session = servletRequest.getServletRequest().getSession(false); String accountId = (String) session.getAttribute(Constants.SKEY_ACCOUNT_ID); //把session和accountId存放起来 attributes.put(Constants.SESSIONID, session.getId()); attributes.put(Constants.SKEY_ACCOUNT_ID, accountId); } return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { super.afterHandshake(request, response, wsHandler, ex); } }
监听用户连接情况PresenceChannelInterceptor:
说明:这里用到的CacheManager的缓存技术是ehcache,大家请结合实际项目保存。(CacheManager代码就不给了)
import club.yelv.component.CacheManager; import com.demo.websocket.constants.CacheConstant; import com.demo.websocket.constants.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptorAdapter; import org.springframework.stereotype.Component; /** * stomp连接处理类 * Created by earl on 2017/4/19. */ @Component public class PresenceChannelInterceptor extends ChannelInterceptorAdapter { private static final Logger logger = LoggerFactory.getLogger(PresenceChannelInterceptor.class); @Override public void postSend(Message<?> message, MessageChannel channel, boolean sent) { StompHeaderAccessor sha = StompHeaderAccessor.wrap(message); // ignore non-STOMP messages like heartbeat messages if(sha.getCommand() == null) { return; } //这里的sessionId和accountId对应HttpSessionIdHandshakeInterceptor拦截器的存放key String sessionId = sha.getSessionAttributes().get(Constants.SESSIONID).toString(); String accountId = sha.getSessionAttributes().get(Constants.SKEY_ACCOUNT_ID).toString(); //判断客户端的连接状态 switch(sha.getCommand()) { case CONNECT: connect(sessionId,accountId); break; case CONNECTED: break; case DISCONNECT: disconnect(sessionId,accountId,sha); break; default: break; } } //连接成功 private void connect(String sessionId,String accountId){ logger.debug(" STOMP Connect [sessionId: " + sessionId + "]"); //存放至ehcache String cacheName = CacheConstant.WEBSOCKET_ACCOUNT; //若在多个浏览器登录,直接覆盖保存 CacheManager.put(cacheName ,cacheName+accountId,sessionId); } //断开连接 private void disconnect(String sessionId,String accountId, StompHeaderAccessor sha){ logger.debug("STOMP Disconnect [sessionId: " + sessionId + "]"); sha.getSessionAttributes().remove(Constants.SESSIONID); sha.getSessionAttributes().remove(Constants.SKEY_ACCOUNT_ID); //ehcache移除 String cacheName = CacheConstant.WEBSOCKET_ACCOUNT; if (CacheManager.containsKey(cacheName,cacheName+accountId) ){ CacheManager.remove(cacheName ,cacheName+accountId); } } }
StompSubscribeEventListener:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationListener; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.stereotype.Component; import org.springframework.web.socket.messaging.SessionSubscribeEvent; /** * 监听订阅地址的用户 */ @Component public class StompSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> { private static final Logger logger = LoggerFactory.getLogger(StompSubscribeEventListener.class); @Override public void onApplicationEvent(SessionSubscribeEvent sessionSubscribeEvent) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(sessionSubscribeEvent.getMessage()); //这里的sessionId对应HttpSessionIdHandshakeInterceptor拦截器的存放key // String sessionId = headerAccessor.getSessionAttributes().get(Constants.SESSIONID).toString(); logger.info("stomp Subscribe : "+headerAccessor.getMessageHeaders() ); } }
常量类:
public class Constants { private Constants(){} /** * SessionId */ public static final String SESSIONID = "sessionid"; /** * Session对象Key, 用户id */ public static final String SKEY_ACCOUNT_ID = "accountId"; } /** * 缓存常量名称 * Created by earl on 2017/4/17. */ public class CacheConstant { private CacheConstant(){} /** * websocket用户accountId */ public static final String WEBSOCKET_ACCOUNT = "websocket_account"; }
Controller:
下面的两个方法不要放到service层,亲测在controller层才能够使用
//通过SimpMessagingTemplate向用户发送消息 @Autowiredprivate SimpMessagingTemplate messagingTemplate; /** * 发送消息 *@MessageMapping是stomp提供发送消息的注解,类似@RequestMappeing * @param json json文本 */ @MessageMapping("/sendChatMsg") public void sendChatMsg( String json)throws Exception{ if (StringUtils.isNotBlank(json) ) { //json转换为实体,JsonUtils代码不给了,自己到网上拿一个 MsgWeixin po = JsonUtil.toObject(json, MsgWeixin.class); /*浏览器用户通讯订阅的地址.拼接聊天对话id,准确将消息发到某一个聊天中 *防止一个对话能接收多个联系人的消息 */ String chatAddress = "/queue/contactMessage" + po.getChatId(); //保存到数据库 msgWeixinService .insert(); //若缓存已不存在accountId,证明是连接断开,不推送消息 String cacheName = CacheConstant.WEBSOCKET_ACCOUNT; if (!CacheManager.containsKey(cacheName,cacheName+po.getToUserId()) ){ return ; } //向用户发送消息,第一个参数是接收人、第二个参数是浏览器订阅的地址,第三个是消息本身 messagingTemplate.convertAndSendToUser(po.getFromUserId(),chatAddress,po.getContent()); } } //处理发送消息的错误,json是sendChatMsg方法的参数,传递到这里 @MessageExceptionHandler public void handleExceptions(Exception e,String json) { log.error("Error handling message: " + e.getMessage()); MsgWeixin po = JsonUtil.toObject(json,MsgWeixin.class); String errorJson = ResultUtil.getError(ResultCode.UN_KNOW_EXCEPTION); //把错误信息发回给发送人 messagingTemplate.convertAndSendToUser( po.getFromUserId(),"/queue/contactErrors"+po.getChatId(),errorJson); }
消息实体:
MsgWeixin(请根据项目修改)
public class MsgWeixin { private Date createtime ; private String schoolId ; private String wxMsgId ; private String toUserId ; private String appId ; private String content ; private String fromUserId ; private String status ; private String chatId ; private String msgType ; public Date getCreatetime() { return createtime; } public void setCreatetime(Date createtime) { this.createtime = createtime; } public String getSchoolId() { return schoolId; } public void setSchoolId(String schoolId) { this.schoolId = schoolId; } public String getWxMsgId() { return wxMsgId; } public void setWxMsgId(String wxMsgId) { this.wxMsgId = wxMsgId; } public String getToUserId() { return toUserId; } public void setToUserId(String toUserId) { this.toUserId = toUserId; } public String getAppId() { return appId; } public void setAppId(String appId) { this.appId = appId; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public String getFromUserId() { return fromUserId; } public void setFromUserId(String fromUserId) { this.fromUserId = fromUserId; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getChatId() { return chatId; } public void setChatId(String chatId) { this.chatId = chatId; } public String getMsgType() { return msgType; } public void setMsgType(String msgType) { this.msgType = msgType; } }
前端页面JS代码(需引进jquery)
说明:
${???}等都是后台页面模板语言的参数值
${resLocal!} :资源路径
${accountId}:后台设置好的当前用户id,spring mvc设置到页面
${chatId}:后台设置好的聊天对话的id
${receiverId}:接收人用户id
<script src="${resLocal!}js/plugins/socket/sockjs.js"></script> <script src="${resLocal!}js/plugins/socket/stomp.js"></script> <script type="text/javascript"> var stomp,sock; $(document).ready(function () { if (window.WebSocket){ websocketConfig(); } else { alert("错误","浏览器不支持websocket技术通讯."); } }); //websocket配置 function websocketConfig() { //连接url为endpointChat的endpoint,对应后台WebSoccketConfig的配置 sock = new SockJS("${webPath}contactChatSocket"); sockHandle(); stompConfig(); } function stompConfig() { //使用STOMP子协议的websocket客户端 stomp = Stomp.over(sock); stomp.connect({accountId:'${accountId}'}, function(frame) { //这里的订阅地址和controller的convertAndSendToUser地址一致,多出的user必须的,指的是用户之间发送消息 //这里的地址和后台的Controller sendChatMsg方法的chatAddress 一致 stomp.subscribe("/user/${accountId}/queue/contactMessage${chatId}", function (message) { //处理响应消息 var json = JSON.parse(message.body); console.log(json); }); //订阅处理错误的消息地址,接收错误消息 stomp.subscribe("/user/${accountId}/queue/contactErrors${chatId}", function (message) { var response = JSON.parse(message.body); var error = response.error; alert("发送失败"+error); }); }); } //发送消息 function sendMessage() { //文本框的内容 var content = $("#content").val(); if ($.trim($("#content").val()).length <= 0){ return ; } //拼接成json传到后台 var data = { chatId:'${chatId}', schoolId:'${schoolId}', toUserId:'${receiverId}', content:content, fromUserId:'${accountId}', appId:'${appId}', msgType:1 }; //发送 stomp.send("/sendChatMsg",{},JSON.stringify(data) ); } function sockHandle() { sock.onopen = function () { console.log("------连接成功------"); }; sock.onmessage = function (event) { console.log('-------Received: ' + event.data); }; sock.onclose = function (event) { console.log('--------Info: connection closed.------'); }; //连接发生错误 sock.onerror = function () { alert("连接错误", "网络超时或通讯地址错误."); disconnect(); } ; } //关闭websocket function disconnect() { if (sock != null) { sock.close(); sock = null; } }
发生的错误说明
1: 若页面初始化websocket出现错误,打开浏览器的控制台看地址带有info地址,是否显示200。
2: 若不是200证明是路径不对,针对后台websocketConfig配置类和页面的连接地址进行检查。
比如:
websocketConfig配置类
stompEndpointRegistry.addEndpoint("/contactChatSocket")
页面
${webPath}:是后台设置好的路径,请修改项目配置路径
var sock = new SockJS("${webPath}contactChatSocket");
3: 若是200,后面还是报错,检查浏览器的页面地址端口是否填上。
4: 若是200连接成功,发送消息没有提示是否成功或者失败的(没有接收到消息的)
如:
解决方法:
请在后台用logback的debug级别打印spring websocket,会有一个详细的地址,检查页面接收消息的地址是否和这个地址一样。如
后台日志:
15:24:51.394 [clientInboundChannel-2] DEBUG org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler - Processing MESSAGE destination=/user/2016091817033128922/queue/contactMessage2017041515541301674
session=null payload={"isSelf":true,"content":"15","msgType":1,"createtime":"2017-04-20 15:24:51.383"...(truncated)
15:24:51.395 [clientInboundChannel-2] DEBUG org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler - Processing MESSAGE destination=/user/2017021210143438633/queue/contactMessage2017041515541301674
session=null payload={"isSelf":false,"content":"15","msgType":1,"createtime":"2017-04-20 15:24:51.383...(truncated)
蓝色标记部分对应
stomp.subscribe事件的address:
stomp.subscribe(address, function (message) {}
);
检查蓝色标记部分和address是否一致,若一致,就可以接收其他用户发送过来的消息
5:运行项目时,异常信息含有
(spring mvc整合才有可能发生的异常,spring boot没有)
<async-supported>true</async-supported>
这段代码等字样,在web.xml的<filter>和<servlet>两个标签里面加入这段代码就可以解决
<async-supported>true</async-supported>
相关文章推荐
- Spring4.3.3 WebSocket-STOMP协议集成 (1)-WebSocket协议通讯小栗子
- Spring4.3.3 WebSocket-STOMP协议集成 (2)-WebSocket-stomp子协议通讯小栗子
- spring websocket stomp 连接开启了用户名和密码认证的ActiveMQ
- Spring4.3.3 WebSocket-STOMP协议集成 (1.1)-WebSocket协议通讯小栗子
- Spring+STOMP实现WebSocket广播订阅、权限认证、一对一通讯(附源码)
- Spring4.3.3 WebSocket-STOMP协议集成 (2.1)-WebSocket-stomp子协议通讯小栗子
- spring+websocket整合(springMVC+spring+MyBatis即SSM框架和websocket技术的整合)
- springMVC+MyBatis+Spring 整合(4) ---解决Spring MVC 对AOP不起作用的问题
- spring与ibatis的整合及多数据库连接的解决方案
- spring与ibatis的整合及多数据库连接
- java框架整合例子(spring、spring mvc、spring data jpa、hibernate)
- spring监听与IBM MQ JMS整合
- 整合spring mvc和mybatis,spring,maven的pom脚本
- spring mvc系列文章 - springmvc spring mybatis ibatis freemark整合开发(1.0版)
- spring+websocket整合(springMVC+spring+MyBatis即SSM框架和websocket技术的整合)
- spring+websocket整合(springMVC+spring+MyBatis即SSM框架和websocket技术的整合)
- 基于注解的Spring MVC整合Hibernate(所需jar包,spring和Hibernate整合配置,springMVC配置,重定向,批量删除)
- Activemq Spring 嵌入整合及通过数据库来验证连接权限
- spring3 jsp页面使用<form:form modelAttribute="xxxx" action="xxxx">报错,附连接数据库的spring MVC annotation 案例
- spring 3 + hibernate 3 + spring mvc 整合入门