ZBus消息中间件和WebSocket的联合使用
2017-09-29 14:46
471 查看
1、ZBusconfig.java, zbus的启动、生产、回调处理消息的方法。
2、WebSocketStompConfig.java, websocket的配置
3、页面javascript调用代码如下:
package com.accenture.icc.zbus.config; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.zbus.broker.Broker; import org.zbus.broker.ZbusBroker; import org.zbus.mq.Consumer; import org.zbus.mq.Consumer.ConsumerHandler; import org.zbus.mq.MqAdmin; import org.zbus.mq.Producer; import org.zbus.net.http.Message; import com.accenture.icc.pojo.AnalogInputData; import com.accenture.icc.pojo.DataUnit; import com.accenture.icc.pojo.TripleDataWrapper; @Configuration @PropertySource("classpath:data.properties") public class ZbusConfig { private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class); @Value("${zbus.mq.sub}") private String subMq; @Value("${zbus.mq.recv}") private String recvMq; @Value("${zbus.mq.alarm}") private String alarmMq; @Value("${zbus.mq.subCommonData}") private String subIndexDataMq; @Value("${zbus.mq.recvCommonData}") private String indexDataMq; @Value("${csv.power.maxsize}") private int powerMaxListSize; @Value("${csv.power.period}") private int powerPeriod; @Value("${csv.consumption.maxsize}") private int consumptionMaxListSize; @Value("${csv.consumption.period}") private int consumptionPeriod; @Autowired private TripleDataWrapper powerWrapper; @Autowired private TripleDataWrapper consumptionWrapper; @Autowired private TripleDataWrapper transformerWrapper; @Autowired private TripleDataWrapper powerFactorWrapper; @Autowired private TripleDataWrapper unbalanceWrapper; @Autowired private TripleDataWrapper stationParamWrapper; /** * 回调函数 * @param messaging * @return */ @Bean public ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) { ConsumerHandler consumerHandler = new ConsumerHandler() { @Override public void handle(Message msg, Consumer consumer) throws IOException { logger.info("RECEIVING MESSAGE: {}", msg.getBodyString()); String[] fields = msg.getBodyString().split(","); if(fields.length < 5){ return; } String tag = fields[0]; int tableId = 0; int recordId = 0; int fieldId = 0; double value = 0; String timeStamp = ""; try{ tableId = Integer.parseInt(fields[1]); recordId = Integer.parseInt(fields[2]); fieldId = Integer.parseInt(fields[3]); value = Double.parseDouble(fields[4]); if(value < 0.01 || value == 128) { value = 0; } timeStamp = fields[6]; } catch (NumberFormatException e) { logger.info("message format error."); return; } AnalogInputData analogInputData = new AnalogInputData(tag, tableId, recordId, fieldId, value, timeStamp); if(powerWrapper.containsRecord(recordId)){ List<AnalogInputData> datalist = powerWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(consumptionWrapper.containsRecord(recordId)){ List<AnalogInputData> datalist = consumptionWrapper.getDataListByRecordid(recordId); if(datalist.size() == consumptionMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(transformerWrapper.containsRecord(recordId)) { List<AnalogInputData> datalist = transformerWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(powerFactorWrapper.containsRecord(recordId)) { List<AnalogInputData> datalist = powerFactorWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } if(unbalanceWrapper.containsRecord(recordId)) { List<AnalogInputData> datalist = unbalanceWrapper.getDataListByRecordid(recordId); if(datalist.size() == powerMaxListSize) { datalist.remove(0); } datalist.add(analogInputData); } broadcastAnalog(recordId, messaging); } }; return consumerHandler; } private void broadcastAnalog(int recordId, SimpMessageSendingOperations messaging) { Map<String, Object> resMap = new HashMap<>(); resMap.put("status", 1000); int groupid = -1; int factoryid = -1; List<DataUnit> dataUnits = null; String destination = ""; if(powerWrapper.containsRecord(recordId)){ groupid = powerWrapper.getGroupIdByRecord(recordId); dataUnits = powerWrapper.getDataUnitsByGroupid(groupid); factoryid = powerWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/1"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(consumptionWrapper.containsRecord(recordId)){ groupid = consumptionWrapper.getGroupIdByRecord(recordId); dataUnits = consumptionWrapper.getDataUnitsByGroupid(groupid); factoryid = consumptionWrapper.getFactoryIdByRecord(recordId); destination = "/topic/consumption/"+factoryid+"/1"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(transformerWrapper.containsRecord(recordId)) { groupid = transformerWrapper.getGroupIdByRecord(recordId); dataUnits = transformerWrapper.getDataUnitsByGroupid(groupid); factoryid = transformerWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/2"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(powerFactorWrapper.containsRecord(recordId)) { groupid = powerFactorWrapper.getGroupIdByRecord(recordId); dataUnits = powerFactorWrapper.getDataUnitsByGroupid(groupid); factoryid = powerFactorWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/3"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } if(unbalanceWrapper.containsRecord(recordId)) { groupid = unbalanceWrapper.getGroupIdByRecord(recordId); dataUnits = unbalanceWrapper.getDataUnitsByGroupid(groupid); factoryid = unbalanceWrapper.getFactoryIdByRecord(recordId); destination = "/topic/analogs/"+factoryid+"/4"; resMap.put("dataUnits", dataUnits); resMap.put("groupid", groupid); messaging.convertAndSend(destination, resMap); } } /** * @return * @throws IOException */ @Bean(destroyMethod="close") public Broker broker(@Value("${zbus.address}") String zbusAddress) throws IOException { Broker broker = new ZbusBroker(zbusAddress); return broker; } /** * @param consumerHandler * @return * @throws IOException * @throws InterruptedException */ @Bean(initMethod="start") public Consumer zbusConsumer(Broker broker, ConsumerHandler consumerHandler) throws IOException, InterruptedException { MqAdmin mqAdmin = new MqAdmin(broker, recvMq); mqAdmin.removeMQ(); Consumer consumer = new Consumer(broker, recvMq); consumer.onMessage(consumerHandler); return consumer; } @Bean(initMethod="start") public Consumer zbusConsumerAlarm(Broker broker, ConsumerHandler consumerHandlerAlarm) throws IOException, InterruptedException { MqAdmin mqAdminAlarm = new MqAdmin(broker, alarmMq); mqAdminAlarm.removeMQ(); Consumer consumerAlarm = new Consumer(broker, alarmMq); consumerAlarm.onMessage(consumerHandlerAlarm); return consumerAlarm; } @Bean(initMethod="start") public Consumer zbusConsumerIndexData(Broker broker, ConsumerHandler consumerHandlerIndexData) throws IOException, InterruptedException{ MqAdmin mqAdmin2 = new MqAdmin(broker, indexDataMq); mqAdmin2.removeMQ(); Consumer consumerIndexData = new Consumer(broker, indexDataMq); consumerIndexData.onMessage(consumerHandlerIndexData); return consumerIndexData; } /** * d560 @return * @throws IOException * @throws InterruptedException */ @Bean public Producer zbusProducer(Broker broker) throws IOException, InterruptedException { List<Integer> powerRecords = powerWrapper.getRecordIds(); List<Integer> consumptionRecords = consumptionWrapper.getRecordIds(); List<Integer> transformerRecords = transformerWrapper.getRecordIds(); List<Integer> stationParamRecords = stationParamWrapper.getRecordIds(); Producer producer = new Producer(broker, subMq); producer.createMQ(); subscribeData(producer, powerRecords, powerPeriod); subscribeData(producer, consumptionRecords, consumptionPeriod); subscribeData(producer, transformerRecords, powerPeriod); producer.setMq(subIndexDataMq); producer.createMQ(); subscribeData(producer, stationParamRecords, powerPeriod); return producer; } /** * @param producer * @param records * @param period * @throws IOException * @throws InterruptedException */ private void subscribeData(Producer producer, List<Integer> records, int period) throws IOException, InterruptedException { for(Integer recordid : records) { long periodMillis = period * 60000; String msgbody = String.format("sub:%d,101,%d,7,%d", recordid, recordid, periodMillis); Message message = new Message(); message.setBody(msgbody); message = producer.sendSync(message); } } @Bean public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() { return new PropertySourcesPlaceholderConfigurer(); } }
2、WebSocketStompConfig.java, websocket的配置
package com.accenture.icc.zbus.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean; /** * STOMP 配置 */ @Configuration @EnableWebSocketMessageBroker public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer{ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/message").setAllowedOrigins("*").withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic"); registry.setApplicationDestinationPrefixes("/app"); } @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); container.setMaxTextMessageBufferSize(5242880); //5MB container.setMaxBinaryMessageBufferSize(5242880); return container; } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { super.configureWebSocketTransport(registration); registration.setMessageSizeLimit(5242880);//5mb, default 64kb registration.setSendBufferSizeLimit(5242880); } }
3、页面javascript调用代码如下:
<script type="text/javascript" src="<%=basePath%>resources/js/sockjs.min.js"></script> <script type="text/javascript" src="<%=basePath%>resources/js/stomp.min.js"></script>
function initUnbalanceDataStomp(){ var sockurl = "<%=basePath %>views/message"; var socket = new SockJS(sockurl); var stompClient = Stomp.over(socket); // callback function to be called when stomp client is connected to server var connectCallback = function() { console.log('connected!'); // 订阅analog数据 stompClient.subscribe('/topic/analogs/'+'${sessionScope.station.rtuId}'+'/4', function(data){ var body = JSON.parse(data.body); if(body.status==1000) { console.log("do some thing"); } }); }; // callback function to be called when stomp client could not connect to server var errorCallback = function(error) { // display the error's message header: console.log(error); }; stompClient.connect("guest", "guest", connectCallback, errorCallback); }
相关文章推荐
- 【Java消息中间件】Java消息中间件( 第4章 使用activemq - 安装activemq )
- WebSocket使用:整合Spring实现消息推送
- 以中间件,路由,跨进程事件的姿势使用WebSocket--Node.js篇
- [中间件] 消息处理利器 ActiveMQ 的介绍 & Stomp 协议的使用
- 消息中间件 ActiveMQ的简单使用
- Java消息中间件学习笔记七 -- Spring中使用JMS
- 使用Websocket实现消息推送(上)
- 详解在Spring Boot框架下使用WebSocket实现消息推送
- 在Spring Boot框架下使用WebSocket实现消息推送
- 在Spring Boot框架下使用WebSocket实现消息推送
- android 使用 websocket 进行长链接的一个简单的 demo,可以用来收发消息或别的操作,里面用到了 autobahn 的 jar 包
- 使用websocket 实现即时消息
- 在Spring Boot框架下使用WebSocket实现消息推送
- spring boot / cloud (九) 使用rabbitmq消息中间件
- 在Spring Boot框架下使用WebSocket实现消息推送
- SpringBoot使用RabbitMQ做消息中间件
- JMS消息中间件原理及ActiveMQ使用方法
- struts2框架如何使用websocket实现web程序消息推送
- 【转载】使用 Apache MINA2 实现 Web 系统的消息中间件
- Apache ActiveMQ消息中间件的基本使用