您的位置:首页 > 编程语言 > Java开发

spring mvc 4整合spring websocket,点对点式通讯,stomp监听连接

2017-04-20 15:49 369 查看
spring boot整合spring websocket和本例子一致,只是不需要在xml加入扫描配置(需要引入相应的依赖)

先引入spring websocket需要的两个依赖

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>4.1.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>4.1.5.RELEASE</version>
</dependency>


在mvc 的xml加入一下代码,扫描注解方式的配置

<!--扫描配置-->
<context:component-scan base-package="com.demo.websocket.config" />

配置类

webSocketConfig:

import com.demo.websocket.config.HttpSessionIdHandshakeInterceptor;
import com.demo.websocket.config.PresenceChannelInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;

/**
* web socket配置
* Created by earl on 2017/4/11.
*/
@Configuration
//开启对websocket的支持,使用stomp协议传输代理消息,
// 这时控制器使用@MessageMapping和@RequestMaping一样
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

/**
*  服务器要监听的端口,message会从这里进来,要对这里加一个Handler
*  这样在网页中就可以通过websocket连接上服务了
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
//注册stomp的节点,映射到指定的url,并指定使用sockjs协议
stompEndpointRegistry.addEndpoint("/contactChatSocket").withSockJS().setInterceptors(httpSessionIdHandshakeInterceptor()); ;
}

//配置消息代理
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// queue、topic、user代理
registry.enableSimpleBroker("/queue", "/topic","/user");
registry.setUserDestinationPrefix("/user/");
}

/**
* 消息传输参数配置
*/
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(8192) //设置消息字节数大小
.setSendBufferSizeLimit(8192)//设置消息缓存大小
.setSendTimeLimit(10000); //设置消息发送时间限制毫秒
}

/**
* 输入通道参数设置
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(4) //设置消息输入通道的线程池线程数
.maxPoolSize(8)//最大线程数
.keepAliveSeconds(60);//线程活动时间
registration.setInterceptors(presenceChannelInterceptor());
}

/**
* 输出通道参数设置
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
registration.setInterceptors(presenceChannelInterceptor());
}

@Bean
public HttpSessionIdHandshakeInterceptor httpSessionIdHandshakeInterceptor() {
return new HttpSessionIdHandshakeInterceptor();
}

@Bean
public PresenceChannelInterceptor presenceChannelInterceptor() {
return new PresenceChannelInterceptor();
}

}

HttpSessionIdHandshakeInterceptor:

点对对式发送消息到某一个用户,需要把某一个用户的id存放在session中(项目使用accountId表示用户的唯一性,已在登录的时候,把这个accountId保存到HttpSession中),这里直接获取accountId就可以

import com.demo.websocket.constants.Constants;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import javax.servlet.http.HttpSession;
import java.util.Map;

/**
* websocket握手(handshake)接口
* Created by earl on 2017/4/17.
*/
@Component
public class HttpSessionIdHandshakeInterceptor extends HttpSessionHandshakeInterceptor  {

@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
//解决The extension [x-webkit-deflate-frame] is not supported问题
if(request.getHeaders().containsKey("Sec-WebSocket-Extensions")) {
request.getHeaders().set("Sec-WebSocket-Extensions", "permessage-deflate");
}
//检查session的值是否存在
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession(false);
String accountId = (String) session.getAttribute(Constants.SKEY_ACCOUNT_ID);
//把session和accountId存放起来
attributes.put(Constants.SESSIONID, session.getId());
attributes.put(Constants.SKEY_ACCOUNT_ID, accountId);
}
return super.beforeHandshake(request, response, wsHandler, attributes);
}

@Override
public void afterHandshake(ServerHttpRequest request,
ServerHttpResponse response, WebSocketHandler wsHandler,
Exception ex) {
super.afterHandshake(request, response, wsHandler, ex);
}

}


监听用户连接情况PresenceChannelInterceptor:

说明:这里用到的CacheManager的缓存技术是ehcache,大家请结合实际项目保存。(CacheManager代码就不给了)

import club.yelv.component.CacheManager;
import com.demo.websocket.constants.CacheConstant;
import com.demo.websocket.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.stereotype.Component;

/**
* stomp连接处理类
* Created by earl on 2017/4/19.
*/
@Component
public class PresenceChannelInterceptor extends ChannelInterceptorAdapter {

private static final Logger logger = LoggerFactory.getLogger(PresenceChannelInterceptor.class);

@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
StompHeaderAccessor sha = StompHeaderAccessor.wrap(message);
// ignore non-STOMP messages like heartbeat messages
if(sha.getCommand() == null) {
return;
}
//这里的sessionId和accountId对应HttpSessionIdHandshakeInterceptor拦截器的存放key
String sessionId = sha.getSessionAttributes().get(Constants.SESSIONID).toString();
String accountId = sha.getSessionAttributes().get(Constants.SKEY_ACCOUNT_ID).toString();
//判断客户端的连接状态
switch(sha.getCommand()) {
case CONNECT:
connect(sessionId,accountId);
break;
case CONNECTED:
break;
case DISCONNECT:
disconnect(sessionId,accountId,sha);
break;
default:
break;
}
}

//连接成功
private void connect(String sessionId,String accountId){
logger.debug(" STOMP Connect [sessionId: " + sessionId + "]");
//存放至ehcache
String cacheName = CacheConstant.WEBSOCKET_ACCOUNT;
//若在多个浏览器登录,直接覆盖保存
CacheManager.put(cacheName ,cacheName+accountId,sessionId);
}

//断开连接
private void disconnect(String sessionId,String accountId, StompHeaderAccessor sha){
logger.debug("STOMP Disconnect [sessionId: " + sessionId + "]");
sha.getSessionAttributes().remove(Constants.SESSIONID);
sha.getSessionAttributes().remove(Constants.SKEY_ACCOUNT_ID);
//ehcache移除
String cacheName = CacheConstant.WEBSOCKET_ACCOUNT;
if (CacheManager.containsKey(cacheName,cacheName+accountId) ){
CacheManager.remove(cacheName ,cacheName+accountId);
}

}

}


StompSubscribeEventListener:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;

/**
* 监听订阅地址的用户
*/
@Component
public class StompSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {

private static final Logger logger = LoggerFactory.getLogger(StompSubscribeEventListener.class);

@Override
public void onApplicationEvent(SessionSubscribeEvent sessionSubscribeEvent) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(sessionSubscribeEvent.getMessage());
//这里的sessionId对应HttpSessionIdHandshakeInterceptor拦截器的存放key
// String sessionId = headerAccessor.getSessionAttributes().get(Constants.SESSIONID).toString();
logger.info("stomp Subscribe : "+headerAccessor.getMessageHeaders()  );
}
}


常量类:

public class Constants {

private Constants(){}

/**
* SessionId
*/
public static final String SESSIONID = "sessionid";

/**
* Session对象Key, 用户id
*/
public static final String SKEY_ACCOUNT_ID = "accountId";

}

/**
* 缓存常量名称
* Created by earl on 2017/4/17.
*/
public class CacheConstant {

private CacheConstant(){}

/**
* websocket用户accountId
*/
public static final String WEBSOCKET_ACCOUNT = "websocket_account";

}


Controller:

下面的两个方法不要放到service层,亲测在controller层才能够使用

//通过SimpMessagingTemplate向用户发送消息
@Autowiredprivate
SimpMessagingTemplate messagingTemplate;

/**
* 发送消息
*@MessageMapping是stomp提供发送消息的注解,类似@RequestMappeing
* @param json json文本
*/
@MessageMapping("/sendChatMsg")
public void sendChatMsg( String json)throws Exception{
if (StringUtils.isNotBlank(json) ) {
//json转换为实体,JsonUtils代码不给了,自己到网上拿一个
MsgWeixin po = JsonUtil.toObject(json, MsgWeixin.class);
/*浏览器用户通讯订阅的地址.拼接聊天对话id,准确将消息发到某一个聊天中
*防止一个对话能接收多个联系人的消息
*/
String chatAddress = "/queue/contactMessage" + po.getChatId();
//保存到数据库
msgWeixinService .insert();

//若缓存已不存在accountId,证明是连接断开,不推送消息
String cacheName = CacheConstant.WEBSOCKET_ACCOUNT;
if (!CacheManager.containsKey(cacheName,cacheName+po.getToUserId()) ){
return ;
}
//向用户发送消息,第一个参数是接收人、第二个参数是浏览器订阅的地址,第三个是消息本身
messagingTemplate.convertAndSendToUser(po.getFromUserId(),chatAddress,po.getContent());
}
}

//处理发送消息的错误,json是sendChatMsg方法的参数,传递到这里
@MessageExceptionHandler
public  void handleExceptions(Exception e,String json) {
log.error("Error handling message: " + e.getMessage());
MsgWeixin po = JsonUtil.toObject(json,MsgWeixin.class);
String errorJson = ResultUtil.getError(ResultCode.UN_KNOW_EXCEPTION);
//把错误信息发回给发送人
messagingTemplate.convertAndSendToUser( po.getFromUserId(),"/queue/contactErrors"+po.getChatId(),errorJson);
}


消息实体:

MsgWeixin(请根据项目修改)

public class MsgWeixin  {

private Date createtime  ;
private   String schoolId  ;
private   String wxMsgId   ;
private   String toUserId   ;
private   String appId  ;
private   String content ;
private   String fromUserId  ;
private   String status  ;
private   String chatId  ;
private   String msgType  ;

public Date getCreatetime() {
return createtime;
}

public void setCreatetime(Date createtime) {
this.createtime = createtime;
}

public String getSchoolId() {
return schoolId;
}

public void setSchoolId(String schoolId) {
this.schoolId = schoolId;
}

public String getWxMsgId() {
return wxMsgId;
}

public void setWxMsgId(String wxMsgId) {
this.wxMsgId = wxMsgId;
}

public String getToUserId() {
return toUserId;
}

public void setToUserId(String toUserId) {
this.toUserId = toUserId;
}

public String getAppId() {
return appId;
}

public void setAppId(String appId) {
this.appId = appId;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}

public String getFromUserId() {
return fromUserId;
}

public void setFromUserId(String fromUserId) {
this.fromUserId = fromUserId;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public String getChatId() {
return chatId;
}

public void setChatId(String chatId) {
this.chatId = chatId;
}

public String getMsgType() {
return msgType;
}

public void setMsgType(String msgType) {
this.msgType = msgType;
}

}


前端页面JS代码(需引进jquery)

说明:

${???}等都是后台页面模板语言的参数值

${resLocal!} :资源路径

${accountId}:后台设置好的当前用户id,spring mvc设置到页面

${chatId}:后台设置好的聊天对话的id

${receiverId}:接收人用户id


<script src="${resLocal!}js/plugins/socket/sockjs.js"></script>
<script src="${resLocal!}js/plugins/socket/stomp.js"></script>

<script type="text/javascript">

var stomp,sock;

$(document).ready(function () {
if (window.WebSocket){
websocketConfig();
} else {
alert("错误","浏览器不支持websocket技术通讯.");
}
});

//websocket配置
function websocketConfig() {
//连接url为endpointChat的endpoint,对应后台WebSoccketConfig的配置
sock = new SockJS("${webPath}contactChatSocket");
sockHandle();
stompConfig();
}

function stompConfig() {
//使用STOMP子协议的websocket客户端
stomp = Stomp.over(sock);
stomp.connect({accountId:'${accountId}'}, function(frame) {
//这里的订阅地址和controller的convertAndSendToUser地址一致,多出的user必须的,指的是用户之间发送消息
//这里的地址和后台的Controller sendChatMsg方法的chatAddress 一致
stomp.subscribe("/user/${accountId}/queue/contactMessage${chatId}", function (message) {
//处理响应消息
var json = JSON.parse(message.body);
console.log(json);

});

//订阅处理错误的消息地址,接收错误消息
stomp.subscribe("/user/${accountId}/queue/contactErrors${chatId}", function (message) {
var response = JSON.parse(message.body);
var error = response.error;
alert("发送失败"+error);
});
});
}

//发送消息
function sendMessage() {
//文本框的内容
var content =  $("#content").val();
if ($.trim($("#content").val()).length <= 0){
return ;
}
//拼接成json传到后台
var data = {
chatId:'${chatId}',
schoolId:'${schoolId}',
toUserId:'${receiverId}',
content:content,
fromUserId:'${accountId}',
appId:'${appId}',
msgType:1
};
//发送
stomp.send("/sendChatMsg",{},JSON.stringify(data) );
}

function sockHandle() {
sock.onopen = function () {
console.log("------连接成功------");
};
sock.onmessage = function (event) {
console.log('-------Received: ' + event.data);
};
sock.onclose = function (event) {
console.log('--------Info: connection closed.------');
};
//连接发生错误
sock.onerror = function () {
alert("连接错误", "网络超时或通讯地址错误.");
disconnect();
} ;
}

//关闭websocket
function disconnect() {
if (sock != null) {
sock.close();
sock = null;
}
}


发生的错误说明

1: 若页面初始化websocket出现错误,打开浏览器的控制台看地址带有info地址,是否显示200。

2: 若不是200证明是路径不对,针对后台websocketConfig配置类和页面的连接地址进行检查。

比如:

websocketConfig配置类

stompEndpointRegistry.addEndpoint("/contactChatSocket")

页面

${webPath}:是后台设置好的路径,请修改项目配置路径

        var sock = new SockJS("${webPath}contactChatSocket");

3: 若是200,后面还是报错,检查浏览器的页面地址端口是否填上。

4: 若是200连接成功,发送消息没有提示是否成功或者失败的(没有接收到消息的)


如:



解决方法

请在后台用logback的debug级别打印spring websocket,会有一个详细的地址,检查页面接收消息的地址是否和这个地址一样。如

后台日志:

15:24:51.394 [clientInboundChannel-2] DEBUG org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler - Processing MESSAGE destination=/user/2016091817033128922/queue/contactMessage2017041515541301674
session=null payload={"isSelf":true,"content":"15","msgType":1,"createtime":"2017-04-20 15:24:51.383"...(truncated)

15:24:51.395 [clientInboundChannel-2] DEBUG org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler - Processing MESSAGE destination=/user/2017021210143438633/queue/contactMessage2017041515541301674
session=null payload={"isSelf":false,"content":"15","msgType":1,"createtime":"2017-04-20 15:24:51.383...(truncated)

蓝色标记部分对应

stomp.subscribe事件的address

stomp.subscribe(address, function (message) {}

);

检查蓝色标记部分和address是否一致,若一致,就可以接收其他用户发送过来的消息

 5:运行项目时,异常信息含有

(spring mvc整合才有可能发生的异常,spring boot没有)

<async-supported>true</async-supported>

这段代码等字样,在web.xml的<filter>和<servlet>两个标签里面加入这段代码就可以解决


<async-supported>true</async-supported>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐