您的位置:首页 > 编程语言 > Java开发

应用程序通过WebSocket自行推送业务消息给Subscriber的实现

2017-07-12 17:01 573 查看
首先是使用 Spring Boot 构建包含 WebSocket 的工程。然后定义一个 Java-Config 的 WebSocket : 

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
stompEndpointRegistry.addEndpoint("/platoEndpoint")  // 客户端连接服务端的端点
.setAllowedOrigins("*") // 不设置前台连接时报 403 错误
.withSockJS(); // 开启SockJS支持
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/backend"); // 客户端订阅地址的前缀
registry.setApplicationDestinationPrefixes("/frontend"); // 客户端请求服务端的前缀
}
}

如果没有客户端需要发消息给服务端,或者懒得写一个前缀,那么 ApplicationDestinationPrefixes 也可以不设置。

剩下的就是定制一个暴露 WebSocket 接口的 Controller 即可:

/**
* @MessageMapping:需要在此 value 的值前加上WebSocketConfig注册的
* 		ApplicationDestinationPrefixes(如果有),就构成了整个请求的路径。
* @SendTo: value 是指服务端将把消息发送到订阅了这个路径的所有客户端
*
* 用法:1. 根据当前配置,客户端使用Stomp,stompClient.send("/frontend/input", {}, obj);
* 		发送 obj 给服务端,服务端调用 showLog()处理,然后将处理的结果转发给所有通过
* 		stompClient.subscribe('/backend/output', function (response) { ... })
* 		订阅了服务端暴露的接口的客户端。
* 		2. 如果服务端需要在运行时,根据需要自行把信息推送给前端,则需要使用
* 		SimpMessagingTemplate的convertAndSend()主动调用广播端口,也就是@SendTo的值
*/
@RestController
public class PlatoWebSocketController {
@MessageMapping("/request")
@SendTo("/backend/broadcast")
public String showLog(String fileName) {
return "Message:::filename=" + fileName;
}

//----------服务端自己直接调用,达到主动推送消息给客户端----------
@Autowired
private SimpMessagingTemplate template;

@RequestMapping(value = "ws/message", method = RequestMethod.POST)
public void pushMessage(@RequestBody String fileName){
template.convertAndSend("/backend/broadcast", fileName);
}
}

大多数都是将这个 Controller 直接标记为 Spring 的 @Controller, 而我需要暴露一个 RESTFul 的接口,也就是这里的pushMessage() ,所以就标记为 @RestController。在这个方法里面,借助 SimpMessagingTemplate 可直接将 Rest 请求过来的信息广播给所有订阅了 "/backend/broadcast" 的客户端。

我们的业务很简单,页面上传了一个文件包,应用程序批处理文件包里的所有文件,产生的所有日志通过 WebSocket 实时地显示在页面上,让用户知晓处理过程。程序把日志文本当作请求体,调用 pushMessage() 就可以单方面推送消息给客户端。服务端创建 Socket Server, 监听 Socket 消息的实现:

@Component
public class LogServer implements InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(LogServer.class);

@Autowired
private RestTemplate restTemp;
@Autowired
private Environment env;

@Value("${websocket.port}")
private Integer wsPort;

// 当Spring应用重启时,需要关掉当前的websocket连接释放端口,所以把websocket句柄设置为实例变量
private ServerSocket serverSocket;

@Override
public void destroy() {
logger.info("Shutdown Socket Service.........");
try {
serverSocket.close();
} catch (IOException e) {
logger.error("There is an exception when close socket::{}", e);
}
}

@Override
public void afterPropertiesSet() throws Exception {
logger.info("进入 LogServer.afterPropertiesSet() 启动 Socket 服务");

String wsServerUrl = getWebSocketServerUrl();
// 必须用线程让 socket 不占用主线程去监听端口,否则主程序没办法起来
new Thread() {
public void run() {
ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
ServerSocket serverSocket = null;
try {
serverSocket = serverSocketFactory.createServerSocket(5000);
} catch (IOException ignored) {
logger.error("Unable to create server");
System.exit(-1);
}
logger.info("LogServer running on port: {}", 5000);

while (true) {
List<String> list = new ArrayList<>();
try {
handleSocket(serverSocket, wsServerUrl, list);
} catch (Exception e) {
logger.error("Socket 线程被打断,原因是::", e);
throw new RuntimeException(e);
}
}
}
}.start();
}

private String getWebSocketServerUrl() {
// 获取应用部署的服务器,端口号,构造 pushMessage 的请求路径
}

private void handleSocket(ServerSocket serverSocket, String wsServerUrl, List<String> list) throws Exception {
try (Socket socket = serverSocket.accept()) {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String line = null;
while ((line = br.readLine()) != null) {
line = line.trim();
int contentStartInd = line.indexOf(">") + 1;
int contentEndInd = line.lastIndexOf("<");

if (line.contains("message")) {
list.add(line.substring(contentStartInd, contentEndInd));
}
if (list.size() == 3) {
String message = StringUtils.join(list, " - ");

// 用RestTemplate调用封装Socket处理逻辑的REST接口
restTemp.postForEntity(wsServerUrl, message, Integer.class);

list = new ArrayList<>(); // 一条日志处理完,清空容器准备接收下一条
}
}
}
}
}

由于 Socket 服务启动后会一直监听给定的端口,占用当前线程资源,所以需要新开一个线程去做,主线程继续 Spring Boot 应用资源的加载。

客户端多开一个 Log 的 Socket 输出源,可以参照 Socket Logging 链接的实现,作为测试的 main(),需要增加一行 handler 的 close() 方法,否则会报 java.net.SocketException: Connection reset 错误。

public static void main(String argv[]) throws IOException {
final Logger logger = Logger.getLogger(Test.class.getName());
Handler handler = new SocketHandler("192.168.1.82", 5000);
logger.addHandler(handler);
logger.log(Level.SEVERE, "Hello, 中国2");
logger.log(Level.INFO, "Welcome Home");
handler.close();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring boot websocket