您的位置:首页 > Web前端 > JavaScript

Jstorm与RocketMQ整合

2016-07-04 22:09 375 查看
如果是经常关注阿里巴巴的朋友们,看到我这篇博客的题目,就知道我在参加今年的中间件比赛。

好了,废话不说,开始了。

首先我们知道,rocketmq的consumer有两种,一种是DefaultMQPushConsumer另外一个是DefaultMQPullConsumer

两个有什么区别呢?

对我们自己写的代码来说,使用push就是被动接受mq的消息,而使用pull就是需要主动的去mq上拉取消息。

那么再与jstorm集成的时候,选择哪个呢?

我最开始选择的是pull,后来遇到各种问题,放弃了。

选择push才是真爱呀。

具体怎么做呢?

在spot的open里初始化DefaultMQPushConsumer,registerMessageListener的时候填入自己,当然我们的spot实现了MessageListenerConcurrently,

然后在spot里面的consumeMessage里面写自己的逻辑,合适的时候,用collector发射消息就是了。

能上点干货么?

public class EmitPaymentSpot extends BaseRichSpout
implements MessageListenerConcurrently{
private static final long serialVersionUID = -3085994102089532269L;
private SpoutOutputCollector collector;
private transient DefaultMQPushConsumer consumer;

@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context,	SpoutOutputCollector collector) {
log.error("init DefaultMQPushConsumer");
consumer = new DefaultMQPushConsumer(RaceConfig.MetaConsumerGroup);

//   consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr("ip:port")

try {
consumer.subscribe(RaceConfig.MqTmallTradeTopic, "*");
consumer.subscribe(RaceConfig.MqTaobaoTradeTopic, "*");
consumer.subscribe(RaceConfig.MqPayTopic, "*");
} catch (MQClientException e) {
e.printStackTrace();
}
consumer.registerMessageListener(this);
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}

log.error("Consumer Started.");
this.collector = collector;
}

@Override
public void nextTuple() {
//do nothing
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//...
}

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {

for (MessageExt msg : msgs) {
byte[] body = msg.getBody();
if (body.length == 2 && body[0] == 0 && body[1] == 0) {

log.error("Got the end signal");
collector.emit("stop",new Values("stop"));
continue;
}
if (msg.getTopic().equals(RaceConfig.MqPayTopic)) {
return doPayTopic(body);
}else if (msg.getTopic().equals(RaceConfig.MqTaobaoTradeTopic)) {
putTaobaoTradeToTair(body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else if (msg.getTopic().equals(RaceConfig.MqTmallTradeTopic)) {
putTmallTradeToTair(body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}


当然还有第二种方式,是官方推荐的。

其实我觉得和我的方式差不多..

就是在生成consumer的时候使用工厂模式而已。

相关的代码,比较麻烦大家见
https://github.com/alibaba/jstorm/blob/master/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: