
zbus message queue: a usage example

2017-04-10 10:23
Maven dependency:

<!-- zbus -->
<dependency>
<groupId>org.zbus</groupId>
<artifactId>zbus</artifactId>
<version>6.2.6</version>
</dependency>

Producer:

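import javax.annotation.PostConstruct;

import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
// zbus package names assume the 6.2.x layout; verify them against your version.
import org.zbus.broker.Broker;
import org.zbus.broker.BrokerConfig;
import org.zbus.broker.SingleBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;

@Component
public class ZbusProducerHolder {

    private static final org.slf4j.Logger Logger = LoggerFactory.getLogger(ZbusProducerHolder.class);

    @Value("${zbus.broker.address}")
    private String mqAddress;
    @Value("${zbus.broker.name}")
    private String brokerName;
    private Broker broker;
    private Producer producer;

    @PostConstruct
    public void init() {
        try {
            BrokerConfig config = new BrokerConfig();
            config.setServerAddress(mqAddress);
            broker = new SingleBroker(config);
            producer = new Producer(broker, brokerName);
            // Create the queue on the broker.
            producer.createMQ();
        } catch (Exception e) {
            // Log the failure instead of swallowing it silently.
            Logger.error("zbus producer initialization failed", e);
        }
    }

    /**
     * Sends a message synchronously.
     */
    public void sendMsg(String datas) throws Exception {
        Message msg = new Message();
        msg.setBody(datas);
        producer.sendSync(msg);
    }
}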
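
Because sendMsg declares `throws Exception`, every caller has to decide what a failed send means. The following is only a minimal sketch of one way to wrap the call, not part of the original code. `MessageSender` and `trySend` are hypothetical names introduced here; `MessageSender` mirrors the sendMsg signature so the sketch runs without a zbus broker.

```java
import java.util.ArrayList;
import java.util.List;

public class SendMsgUsageSketch {

    /** Hypothetical stand-in with the same signature as ZbusProducerHolder.sendMsg. */
    @FunctionalInterface
    public interface MessageSender {
        void sendMsg(String datas) throws Exception;
    }

    /**
     * Sends one payload and reports success as a boolean instead of
     * propagating the checked exception to the caller.
     */
    public static boolean trySend(MessageSender sender, String payload) {
        if (payload == null || payload.isEmpty()) {
            return false; // nothing to send
        }
        try {
            sender.sendMsg(payload);
            return true;
        } catch (Exception e) {
            System.err.println("send failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // In-memory sender used only for this demonstration.
        List<String> delivered = new ArrayList<>();
        MessageSender inMemory = delivered::add;

        // Sender that always fails, simulating an unreachable broker.
        MessageSender failing = datas -> {
            throw new IllegalStateException("broker unreachable");
        };

        System.out.println(trySend(inMemory, "{\"userId\":1}"));
        System.out.println(trySend(failing, "{\"userId\":2}"));
        System.out.println(delivered);
    }
}
```

In a Spring application the injected holder could be passed as `zbusProducerHolder::sendMsg`, assuming a field of that name exists.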

Consumer:

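import java.io.IOException;
import java.util.Date;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;

import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
// zbus package names assume the 6.2.x layout; verify them against your version.
import org.zbus.broker.Broker;
import org.zbus.broker.BrokerConfig;
import org.zbus.broker.SingleBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.MqConfig;
// MsgSendBiz and UserInfoBiz are the application's own services; import them from your project.

@Component
public class ZbusConsumer {
    private static final org.slf4j.Logger Logger = LoggerFactory.getLogger(ZbusConsumer.class);
    // Two consumers per available processor for each queue.
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors() * 2;

    @Value("${zbus.broker.address}")
    private String mqAddress;
    private Broker broker;
    @Value("${zbus.broker.message.send.name}")
    private String messageSendBrokerName;
    @Value("${zbus.broker.user.info.name}")
    private String userInfoBrokerName;
    @Inject
    private MsgSendBiz msgSendBiz;
    @Inject
    private UserInfoBiz userInfoBiz;

    @PostConstruct
    public void init() {
        try {
            // Create the broker handle shared by all consumers.
            BrokerConfig brokerConfig = new BrokerConfig();
            brokerConfig.setServerAddress(mqAddress);
            broker = new SingleBroker(brokerConfig);

            MqConfig config1 = new MqConfig();
            config1.setBroker(broker);
            config1.setMq(messageSendBrokerName);

            MqConfig config2 = new MqConfig();
            config2.setBroker(broker);
            config2.setMq(userInfoBrokerName);

            for (int i = 0; i < CPU_COUNT; i++) {
                final Consumer c1 = new Consumer(config1);
                c1.onMessage((msg, sess) -> msgSendBiz.sendTemplateMsg(msg.getBodyString()));
                c1.start();

                final Consumer c2 = new Consumer(config2);
                c2.onMessage((msg, sess) -> userInfoBiz.saveUserInfo(msg.getBodyString()));
                c2.start();
            }
        } catch (Exception e) {
            // Pass the exception as the last argument so SLF4J logs the stack trace.
            Logger.error("zbus consumer initialization failed", e);
        }
    }

    @PreDestroy
    public void destroy() {
        try {
            // broker stays null if init() failed before creating it.
            if (broker != null) {
                broker.close();
            }
            Logger.info("zbus consumer service stopped: {}", new Date());
        } catch (IOException e) {
            Logger.error("zbus consumer service failed to stop", e);
        }
    }
}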
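
The loop in init() starts several consumers against the same queue, so the consumers compete for messages and each message is handled by one of them. The sketch below illustrates that pattern in memory with a `BlockingQueue` and a thread pool. It does not use zbus; `CompetingConsumersSketch` and `drain` are hypothetical names introduced here. Unlike real zbus consumers, which keep waiting for new messages, these workers stop once the queue is empty.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class CompetingConsumersSketch {

    /**
     * Starts workerCount workers that all poll the same queue and returns
     * how many messages the handler processed without throwing.
     */
    public static int drain(BlockingQueue<String> queue, int workerCount, Consumer<String> handler)
            throws InterruptedException {
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            pool.submit(() -> {
                String msg;
                // poll() hands each message to exactly one worker and
                // returns null once the queue is empty, ending this worker.
                while ((msg = queue.poll()) != null) {
                    try {
                        handler.accept(msg);
                        handled.incrementAndGet();
                    } catch (RuntimeException e) {
                        // One bad message should not stop the worker.
                        System.err.println("handler failed for " + msg + ": " + e);
                    }
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            pool.shutdownNow();
        }
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 100; i++) {
            queue.add("msg-" + i);
        }
        // Same sizing rule as the consumer class: two workers per processor.
        int workers = Runtime.getRuntime().availableProcessors() * 2;
        Set<String> seen = ConcurrentHashMap.newKeySet();
        int handled = drain(queue, workers, seen::add);
        System.out.println(handled + " handled, " + seen.size() + " distinct");
    }
}
```

Because several workers run the handler concurrently, the handler itself must be thread-safe. The same applies to the msgSendBiz and userInfoBiz calls in the consumer class.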