应用程序通过WebSocket自行推送业务消息给Subscriber的实现
2017-07-12 17:01
573 查看
首先是使用 Spring Boot 构建包含 WebSocket 的工程。然后定义一个 Java-Config 的 WebSocket :
如果没有客户端需要发消息给服务端,或者懒得写一个前缀,那么 ApplicationDestinationPrefixes 也可以不设置。
剩下的就是定制一个暴露 WebSocket 接口的 Controller 即可:
大多数都是将这个 Controller 直接标记为 Spring 的 @Controller, 而我需要暴露一个 RESTFul 的接口,也就是这里的pushMessage() ,所以就标记为 @RestController。在这个方法里面,借助 SimpMessagingTemplate 可直接将 Rest 请求过来的信息广播给所有订阅了 "/backend/broadcast" 的客户端。
我们的业务很简单,页面上传了一个文件包,应用程序批处理文件包里的所有文件,产生的所有日志通过 WebSocket 实时地显示在页面上,让用户知晓处理过程。程序把日志文本当作请求体,调用 pushMessage() 就可以单方面推送消息给客户端。服务端创建 Socket Server, 监听 Socket 消息的实现:
@Component
public class LogServer implements InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(LogServer.class);
@Autowired
private RestTemplate restTemp;
@Autowired
private Environment env;
@Value("${websocket.port}")
private Integer wsPort;
// 当Spring应用重启时,需要关掉当前的websocket连接释放端口,所以把websocket句柄设置为实例变量
private ServerSocket serverSocket;
@Override
public void destroy() {
logger.info("Shutdown Socket Service.........");
try {
serverSocket.close();
} catch (IOException e) {
logger.error("There is an exception when close socket::{}", e);
}
}
@Override
public void afterPropertiesSet() throws Exception {
logger.info("进入 LogServer.afterPropertiesSet() 启动 Socket 服务");
String wsServerUrl = getWebSocketServerUrl();
// 必须用线程让 socket 不占用主线程去监听端口,否则主程序没办法起来
new Thread() {
public void run() {
ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
ServerSocket serverSocket = null;
try {
serverSocket = serverSocketFactory.createServerSocket(5000);
} catch (IOException ignored) {
logger.error("Unable to create server");
System.exit(-1);
}
logger.info("LogServer running on port: {}", 5000);
while (true) {
List<String> list = new ArrayList<>();
try {
handleSocket(serverSocket, wsServerUrl, list);
} catch (Exception e) {
logger.error("Socket 线程被打断,原因是::", e);
throw new RuntimeException(e);
}
}
}
}.start();
}
private String getWebSocketServerUrl() {
// 获取应用部署的服务器,端口号,构造 pushMessage 的请求路径
}
private void handleSocket(ServerSocket serverSocket, String wsServerUrl, List<String> list) throws Exception {
try (Socket socket = serverSocket.accept()) {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String line = null;
while ((line = br.readLine()) != null) {
line = line.trim();
int contentStartInd = line.indexOf(">") + 1;
int contentEndInd = line.lastIndexOf("<");
if (line.contains("message")) {
list.add(line.substring(contentStartInd, contentEndInd));
}
if (list.size() == 3) {
String message = StringUtils.join(list, " - ");
// 用RestTemplate调用封装Socket处理逻辑的REST接口
restTemp.postForEntity(wsServerUrl, message, Integer.class);
list = new ArrayList<>(); // 一条日志处理完,清空容器准备接收下一条
}
}
}
}
}
由于 Socket 服务启动后会一直监听给定的端口,占用当前线程资源,所以需要新开一个线程去做,主线程继续 Spring Boot 应用资源的加载。
客户端多开一个 Log 的 Socket 输出源,可以参照 Socket Logging 链接的实现,作为测试的 main(),需要增加一行 handler 的 close() 方法,否则会报 java.net.SocketException: Connection reset 错误。
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) { stompEndpointRegistry.addEndpoint("/platoEndpoint") // 客户端连接服务端的端点 .setAllowedOrigins("*") // 不设置前台连接时报 403 错误 .withSockJS(); // 开启SockJS支持 } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/backend"); // 客户端订阅地址的前缀 registry.setApplicationDestinationPrefixes("/frontend"); // 客户端请求服务端的前缀 } }
如果没有客户端需要发消息给服务端,或者懒得写一个前缀,那么 ApplicationDestinationPrefixes 也可以不设置。
剩下的就是定制一个暴露 WebSocket 接口的 Controller 即可:
/** * @MessageMapping:需要在此 value 的值前加上WebSocketConfig注册的 * ApplicationDestinationPrefixes(如果有),就构成了整个请求的路径。 * @SendTo: value 是指服务端将把消息发送到订阅了这个路径的所有客户端 * * 用法:1. 根据当前配置,客户端使用Stomp,stompClient.send("/frontend/input", {}, obj); * 发送 obj 给服务端,服务端调用 showLog()处理,然后将处理的结果转发给所有通过 * stompClient.subscribe('/backend/output', function (response) { ... }) * 订阅了服务端暴露的接口的客户端。 * 2. 如果服务端需要在运行时,根据需要自行把信息推送给前端,则需要使用 * SimpMessagingTemplate的convertAndSend()主动调用广播端口,也就是@SendTo的值 */ @RestController public class PlatoWebSocketController { @MessageMapping("/request") @SendTo("/backend/broadcast") public String showLog(String fileName) { return "Message:::filename=" + fileName; } //----------服务端自己直接调用,达到主动推送消息给客户端---------- @Autowired private SimpMessagingTemplate template; @RequestMapping(value = "ws/message", method = RequestMethod.POST) public void pushMessage(@RequestBody String fileName){ template.convertAndSend("/backend/broadcast", fileName); } }
大多数都是将这个 Controller 直接标记为 Spring 的 @Controller, 而我需要暴露一个 RESTFul 的接口,也就是这里的pushMessage() ,所以就标记为 @RestController。在这个方法里面,借助 SimpMessagingTemplate 可直接将 Rest 请求过来的信息广播给所有订阅了 "/backend/broadcast" 的客户端。
我们的业务很简单,页面上传了一个文件包,应用程序批处理文件包里的所有文件,产生的所有日志通过 WebSocket 实时地显示在页面上,让用户知晓处理过程。程序把日志文本当作请求体,调用 pushMessage() 就可以单方面推送消息给客户端。服务端创建 Socket Server, 监听 Socket 消息的实现:
@Component
public class LogServer implements InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(LogServer.class);
@Autowired
private RestTemplate restTemp;
@Autowired
private Environment env;
@Value("${websocket.port}")
private Integer wsPort;
// 当Spring应用重启时,需要关掉当前的websocket连接释放端口,所以把websocket句柄设置为实例变量
private ServerSocket serverSocket;
@Override
public void destroy() {
logger.info("Shutdown Socket Service.........");
try {
serverSocket.close();
} catch (IOException e) {
logger.error("There is an exception when close socket::{}", e);
}
}
@Override
public void afterPropertiesSet() throws Exception {
logger.info("进入 LogServer.afterPropertiesSet() 启动 Socket 服务");
String wsServerUrl = getWebSocketServerUrl();
// 必须用线程让 socket 不占用主线程去监听端口,否则主程序没办法起来
new Thread() {
public void run() {
ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
ServerSocket serverSocket = null;
try {
serverSocket = serverSocketFactory.createServerSocket(5000);
} catch (IOException ignored) {
logger.error("Unable to create server");
System.exit(-1);
}
logger.info("LogServer running on port: {}", 5000);
while (true) {
List<String> list = new ArrayList<>();
try {
handleSocket(serverSocket, wsServerUrl, list);
} catch (Exception e) {
logger.error("Socket 线程被打断,原因是::", e);
throw new RuntimeException(e);
}
}
}
}.start();
}
private String getWebSocketServerUrl() {
// 获取应用部署的服务器,端口号,构造 pushMessage 的请求路径
}
private void handleSocket(ServerSocket serverSocket, String wsServerUrl, List<String> list) throws Exception {
try (Socket socket = serverSocket.accept()) {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String line = null;
while ((line = br.readLine()) != null) {
line = line.trim();
int contentStartInd = line.indexOf(">") + 1;
int contentEndInd = line.lastIndexOf("<");
if (line.contains("message")) {
list.add(line.substring(contentStartInd, contentEndInd));
}
if (list.size() == 3) {
String message = StringUtils.join(list, " - ");
// 用RestTemplate调用封装Socket处理逻辑的REST接口
restTemp.postForEntity(wsServerUrl, message, Integer.class);
list = new ArrayList<>(); // 一条日志处理完,清空容器准备接收下一条
}
}
}
}
}
由于 Socket 服务启动后会一直监听给定的端口,占用当前线程资源,所以需要新开一个线程去做,主线程继续 Spring Boot 应用资源的加载。
客户端多开一个 Log 的 Socket 输出源,可以参照 Socket Logging 链接的实现,作为测试的 main(),需要增加一行 handler 的 close() 方法,否则会报 java.net.SocketException: Connection reset 错误。
public static void main(String argv[]) throws IOException { final Logger logger = Logger.getLogger(Test.class.getName()); Handler handler = new SocketHandler("192.168.1.82", 5000); logger.addHandler(handler); logger.log(Level.SEVERE, "Hello, 中国2"); logger.log(Level.INFO, "Welcome Home"); handler.close(); }
相关文章推荐
- iPhone消息推送机制(Push)实现及通过.net应用程序发送消息给ios应用程序
- RabbitMQ订阅发布的消息,通过WebSocket实现数据实时推送到前端
- iPhone消息推送机制(Push)实现及通过.net应用程序发送消息给ios应用程序
- iPhone消息推送机制(Push)实现及通过.net应用程序发送消息给ios应用程序
- RabbitMQ订阅发布的消息,通过WebSocket实现数据实时推送到前端上显
- iPhone消息推送机制(Push)实现及通过.net应用程序发送消息给ios应用程序
- android利用WebSocket实现消息推送
- android通过服务实现消息推送
- 通过Socket.IO与nodeJs实现即时消息推送
- resin4.0.44+websocket 实现私信功能服务端消息推送
- websocket实现android消息推送
- android通过服务实现消息推送
- Spring+Websocket集群实现广告消息的推送
- Grails 中使用 grails-events-push 实现 ajax/comet/websocket 消息推送
- 使用EventSource实现页面消息推送 与 websocket 的区别
- 一种通过xmpp实现离线消息推送的方法及系统
- 利用websocket实现android消息推送
- Android 通过消息栏实现应用程序前后台切换效果Notification
- 利用websocket实现安卓消息推送
- Spring+Websocket实现消息的推送