您的位置:首页 > 编程语言 > Java开发

[ Spring Boot ] 整合 Websocket 实现消息推送框架的设计笔记

2017-12-14 09:44 1121 查看

前段时间,项目中用Websocket实现了一套后台向前端推送的Service层搭建,感兴趣的童鞋可以了解下^_^

Maven pom

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

Configuration

package cn.com.showclear.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

/**
* Websocket 整合Springboot 开发
* <p>
* Target:
* > 1、预案删除后台推送预案运行界面自动刷新
*
* @author YF-XIACHAOYANG
* @date 2017/11/23 13:49
*/
@Configuration
//EnableWebSocketMessageBroker注解表示开启使用STOMP协议来传输基于代理的消息,Broker就是代理的意思。
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

/**
* 注册STOMP协议节点,同时指定使用SockJS协议
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/endpointSang").withSockJS();
}

/**
* 配置消息代理,由于我们是实现推送功能,这里的消息代理是/msg/...
* @param registry
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/msg/");
}
}

Controller

消息体
1、RequestMessage
package cn.com.showclear.common.utils;

/**
* 浏览器发送消息的接收类
* 浏览器发送来的消息用这个类来接收
* @author YF-XIACHAOYANG
* @date 2017/11/23 14:30
*/
public class RequestMessage {
private String name;

public String getName() {
return name;
}
}
2、ResponseMessage
package cn.com.showclear.common.utils;

/**
* 响应消息类
* 服务器返回给浏览器的消息由这个类来承载
* @author YF-XIACHAOYANG
* @date 2017/11/23 14:30
*/
public class ResponseMessage {
private String responseMessage;

public ResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}

public String getResponseMessage() {
return responseMessage;
}
}
请求控制器
package cn.com.showclear.common.controller;

import cn.com.showclear.common.utils.RequestMessage;
import cn.com.showclear.common.utils.ResponseMessage;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

/**
* Websocket 请求控制器
* 负责后台接收推送过来的消息并完成推送的交互
* @author YF-XIACHAOYANG
* @date 2017/11/23 14:26
*/
@Controller
public class WsController {

//接收主题
@MessageMapping("/hello")
//推送主题
@SendTo("/msg/resp")
public ResponseMessage say(RequestMessage message) {
System.out.println(message.getName());
return new ResponseMessage("hello," + message.getName() + " !");
}
}

前端脚本

1、STOMP协议的客户端脚本stomp.js、
2、SockJS的客户端脚本sock.js
3、jQuery
建立连接和主题推送
/*连接、订阅主题回调*/
function connect() {
var socket = new SockJS('/endpointSang');
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
setConnected(true);
console.log('Connected:' + frame);
stompClient.subscribe('/msg/resp', function (response) {
showResponse(JSON.parse(response.body).responseMessage);
})
});
}

/*推送*/
function sendName() {
var name = $('#name').val();
console.log('name:' + name);
stompClient.send("/hello", {}, JSON.stringify({'name': name}));
}

推送框架设计

不使用@SendTo注解,通过SimpMessagingTemplate完成消息推送服务层的搭建,
1、@SendTo 适合放在WebsocketListener[@Controller]中监听指定指定消息代理并完成任务转发
2、notifyService 适合在不同控制层[@Controller]中直接完成推送服务

一、后端推送服务搭建

service
/**
* 消息推送服务
*/
interface NOTIFY{
/**
* 通知消息
* @param noticeVO
*/
void notice(NoticeVO noticeVO);
}
impl
package cn.com.showclear.plan.impl.common;

import cn.com.showclear.plan.pojo.common.NoticeVO;
import cn.com.showclear.plan.service.common.BaseServices;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

/**
* 消息推送服务
* @author YF-XIACHAOYANG
* @date 2017/11/23 15:36
*/
@Service
public class NotificationServiceImpl implements BaseServices.NOTIFY {

private static final Logger log = LoggerFactory.getLogger(NotificationServiceImpl.class);

@Autowired
private SimpMessagingTemplate messagingTemplate;

/**
* 通知消息发送
*
* @param noticeVO
*/
@Override
public void notice(NoticeVO noticeVO) {
try {
messagingTemplate.convertAndSend(noticeVO.getSubject(), noticeVO.getData());
} catch (Exception e) {
log.error("notice msg error.", e);
}
}
}
NoticeVO 消息体基本对象
package cn.com.showclear.plan.pojo.common;

/**
* 消息通知对象
*
* @author LuLihong
* @date 2017-08-30
**/
public class NoticeVO {
private final String subject;
private final Object data;

public NoticeVO(String subject, Object data) {
this.subject = subject;
this.data = data;
}

public Object getData() {
return data;
}

public String getSubject() {
return subject;
}
}
eg
......

@Autowired
private BaseServices.NOTIFY notifyService;

......

/**
* 预案结束
*
* @return
*/
@RequestMapping(value = "/finishPlan", method = RequestMethod.POST)
public RespMapJson finishPlan(Integer planReId) {
DataSourceTypeManager.set(DataSources.PLAN);
RespMapJson resp = psmRunService.finishPlan(planReId);
if (resp.getCode() == 0) {
//通知预案在运行界面刷新
resp.put("isRefresh", true);
notifyService.notice(new NoticeVO(NoticeSubject.MSG_REFRESH, resp.getResp()));
}
return resp;
}

二、前端推送管理和回调监听

amd & require
/**
* require 通用配置
* @author Yiyuery
*/
require.config({
baseUrl: window.main.contextPath,
paths: {

jquery: "js/lib/jquery/jquery-1.9.1",

//websocket
stomp: "js/websocket/stomp",
sockjs: "js/websocket/sockjs.min",

//ws-utils
'scooper-notice': 'js/scooper/scooper.notice',
'msg-ws':'js/websocket/msg-websocket',

//提示组件
layer: 'js/lib/layer/layer',

//自定义组件
capsule: 'js/lib/capsule/capsule.util',
},
/*定义模块依赖*/
shim: {
layer: { deps: ['jquery'] },
capsule: { deps: ['jquery', 'layer', 'pager'] }
}
});
推送管理
/**
* Created by LuLihong on 2017/8/30.
*/
window.scooper = window.scooper || {};
/**
* 消息通知
* @type {{}}
*/
window.scooper.notice = {
/**
* 主题
*/
subjects: {
/**预案运行数目变更通知主题*/
refresh: '/msg/refresh',
/**测试监听主题**/
test:'/msg/test',
/**测试通道占用**/
topic:'/topic/resp'
},

/**
* 主题监听器
*/
listeners: {},

/**
* 获取需要监听的主题
* @returns {[*,*,*]}
*/
getSubjects: function() {
return [this.subjects.refresh,this.subjects.test,this.subjects.topic];
},

/**
* 添加监听器
* @param subject
* @param listener
*/
addListener: function(subject, listener) {
this.listeners[subject] = listener;
},

/**
* 删除监听器
* @param subject
*/
removeListener: function(subject) {
delete this.listeners[subject];
},

/**
* 获取监听器
* @param subject
* @returns {*}
*/
getListener: function(subject) {
return this.listeners[subject];
},

/**
* 接收到通知
* @param subject
* @param notice
*/
recvNotice: function(subject, notice) {
window.console.log('recv notice: ' + subject + '=' + notice);
var listener = this.listeners[subject];
if (listener) {
var noticeObj = JSON.parse(notice);
listener(noticeObj);

}
}
};
推送核心模块
/**
* 获取后台的通知消息,以websocket方式获取。
* Created by LuLihong on 2017/8/30.
*/
define(['jquery', 'scooper-notice', 'stomp', 'sockjs'], function ($) {

var stompClient = null;

/**
* 定义模块
* @type {{}}
*/
var MSGWS = {
/**
* 创建SocKJS实例并获取websocket连接
*/
connect: function (fn) {
//申明连接的SockJS的endpoint名称:与后台WebsocketConfig保持一致
var socket = new SockJS(window.main.contextPath + '/endpointSang', null, {rtt: 5000});
//使用STOMP来创建WebSocket客户端
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
var subjects = window.scooper.notice.getSubjects();
$.each(subjects, function (i, subject) {
/**
* 订阅/msg/resp等主题发送来的消息,分发事件
* Controller中的say方法上添加的@SendTo注解的参数
* stompClient中的send方法表示发送一条消息到服务端
*/
stompClient.subscribe(subject, function (resp) {
window.scooper.notice.recvNotice(subject, resp.body);
});
});
});
if (fn instanceof Function) {
fn();
}
},
/**
* 断开连接
*/
disconnect: function () {
if (stompClient != null) {
stompClient.disconnect();
}

window.console.log('Disconnected');
},
/**
* 检查连接状态
*/
checkState: function () {
if (stompClient == null || !stompClient.connected) {
MSGWS.connect();
}
},
/**
* 连接保持定时监听
*/
keepListenerTimer: function () {
setInterval(MSGWS.checkState, 5000);
},
/**
* 初始化
*/
init: function () {
MSGWS.connect(function () {
MSGWS.keepListenerTimer();
});

}
};

/*立即执行函数,完成连接*/
MSGWS.init();

/**
* 对外开放部分接口
*/
return {
/**
* 关闭连接
*/
disconnect: MSGWS.disconnect
}
});
注册回调监听
/*头部引入模块*/
define(["require", "exports", "jquery", "avalon", "capsule", "layer", 'msg-ws'], function (require, exports, $, avalon, capsule, layer, msgWs)

/**
* 注册websocket回调主题
*/
function regWebSocketListener() {
window.scooper.notice.addListener('/msg/refresh', wsNotify);
},

/**
* websocket消息通知
* @param msg
*/
function wsNotify(isRefresh) {
if (isRefresh) {
layer.msg('执行中预案数目发生变更,正在重新加载...');
capsule.baseUtil.delay(2, function () {
window.location.reload();
});
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: