您的位置:首页 > 大数据

getting start with storm 翻译 第四章 part-3

2013-08-14 16:07 323 查看

转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/9966239

队列消息

第二个方法是连接你的spouts到一个排队系统,该系统将从消息发射器接收消息并将消息供spouts消费。使用排队系统的优点是它可以作为spouts和数据源之间的中间件。很多时候你可以通过排队来做多队列的消息回放。这意味着你不需要知道任何关于消息发射器的



图4-3 直接连接协调器

东西,添加和删除发射器比直接连接更简单。这种架构的问题是队列将成为你的失败点,并且你将你的处理流增加了一层。

图4-4显示了架构的模式。



图4-4 使用排队系统


你可以使用循环拉或者哈希队列(用哈希来划分消息队列并发送到spout或者建立多个队列)来在队列间并行的处理,在多个spouts间划分消息。

 

你将建立一个用Redis及它们的Java库Jedis做排队系统的示例。在这个例子中,你将建立一个日志处理器从一个未知的源收集日志,它使用lpush插入消息到队列中,使用blpop来允许你等待一个消息。如果你有多个处理,使用blpop将使你以循环的方式接收消息。

 

为从Redis中接收消息,你将使用一个在open spout中创建的线程(使用一个线程来避免锁定nextTuple方法所在的主循环)。

new Thread(newRunnable(){
@Override
public voidrun(){
while(true){
try{
Jedis client =newJedis(redisHost,redisPort);
List<String>res=client.blpop(Integer.MAX_VALUE,queues);
messages.offer(res.get(1));
}catch(Exception e){
LOG.error("Error reading queues from redis",e);
try {
Thread.sleep(100);
} catch(InterruptedException e1) {}
}
}
}
}).start()

这个线程的唯一目的是建立连接并且执行blpop命令。当消息被接收到时,它被添加到一个内部的会被nextTuple方法消费的消息队列。这里你可以看到Redis队列是源并且你不知道谁是消息的发射者以及它们的数量。


我们推荐不要在spout中创建许多线程,因为每个spout运行在一个不同的线程中。与其创建很多线程,更好的方法是增加并行度。这将以分布式的方式在storm集群中创建更多的线程。

 

在nextTuple方法中,你唯一要做的是接受消息并再次发射它们。

public voidnextTuple(){
while(!messages.isEmpty()){
collector.emit(newValues(messages.poll()));
}
}


你可以通过修改spout使其通过Redis具有重发消息的能力来使整个topology变得更加可靠。

DRPC

DPRCSpout是实现了接收来自DRPC服务器的函数调用流并且处理它的spout(见第三章中的示例)。在最常用的情况下,backtype.storm.drpc.DRPCSpout是足够的,但有可能使用来自storm包的DRPC类来创建你自己的实现。

结论

你已经看到了最常用的spout实现的模型,它们的优点,以及怎样使得消息可靠。基于你致力解决的问题来设计spout通信是重要的。没有一种适合所有topologues的架构。如果你了解源并且可以控制这些源,你可以使用直接连接,如果你需要添加未知源的能力或者从多个源接收消息的能力,使用列队连接是更好的。如果你需要在线处理,你将需要使用DRPCSpouts或者类似的实现。

尽管你已经了解了三种主要类型的连接,仍有无限的实现它的方式,这取决于你的需求。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Storm 大数据