您的位置:首页 > 其它

Storm入门到精通(五)----Storm Grouping机制详解

2018-03-01 10:49 501 查看
所谓的grouping策略就是在Spout与Bolt、Bolt与Bolt之间传递Tuple的方式。总共有七种方式:

      1)shuffleGrouping(随机分组)

      2)fieldsGrouping(按照字段分组,在这里即是同一个单词只能发送给一个Bolt)

      3)allGrouping(广播发送,即每一个Tuple,每一个Bolt都会收到)

      4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)

      5)noneGrouping(随机分派)

      6)directGrouping(直接分组,指定Tuple与Bolt的对应发送关系)

      7)Local or shuffle Grouping

      8)customGrouping (自定义的Grouping)
参考资料1:

shuffleGrouping

将流分组定义为混排。这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。

fieldsGrouping

这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。

All grouping

广播发送, 对于每一个tuple将会复制到每一个bolt中处理。

Global grouping

Stream中的所有的tuple都会发送给同一个bolt任务处理,所有的tuple将会发送给拥有最小task_id的bolt任务处理。

None grouping

不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下。

Direct grouping

由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)


fieldsGrouping

上面的资料我摘抄自:http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

如果你了解Storm,我想你能明白其中的大多数Grouping。这里的Grouping策略我想着重介绍一下fieldsGrouping,也最难理解的。

fieldsGrouping是按照数据中字段Field的值分组的。下面是我的测试代码:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2);
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
.fieldsGrouping("words", new Fields("word"));


测试的例子Spout是Storm自带的例子,Blot代码如下:
public void execute(Tuple tuple) {
log.info("rev a message: " + tuple.getString(0));
collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}


Storm自带的例子Spout能随机的返回
new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”};
列表中的几个字符串。这也是测试FieldGroup的好例子。

按照我最早做Storm开始前的理解,既然是按照Field分组,那么是所有相同的Field值得数据都会到达一个Blot的。我测试很多次,其结果并不是这样,一个Blot会收到多个不同的值。我没有仔细探究Storm这样分组有什么特别的地方,以至于自己对Storm的学习停滞了很长时间。

Storm能保证所有相同Field值的数据到达的是相同的Blot,但是不保证一个Blot只处理一个值域。

也就是说,所有值是nathan能到达到一个Blot,但是到达同一个Blot的值可能有多个,如"nathan”, “mike"的数据都到达。

理解到这点上,fieldsGrouping就算是理解了。

下面是测试日志:
9144 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9234 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9245 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9335 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9346 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9437 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9447 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9537 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9548 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9639 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9649 [Thread-35-exclaim2] INFO  cn.poin
fb9f
tways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9740 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9749 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9841 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9850 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda


由上面的日志可以看出,golda这个值的数据,的确归并到一个Blot处理的。线程编号:Thread-26-exclaim2。 其它值也都是相同值都是在一个线程内被处理的。

参考资料2:

最近研究Storm的Stream Grouping的时候,对Field Grouping和Shuffle Grouping理解不是很透彻。去看WordCountTopology也不怎么理解,后来脑洞一开,加了一行代码再次运行,彻底顿悟。只能说自己对Storm的基本概念还是没吃透啊。(WordCountTopology这个例子请自行参考Storm-Starter)

[java] view
plaincopy



public void execute(Tuple tuple, BasicOutputCollector collector) {  

    String word = tuple.getString(0);  

  

    // 添加这行代码的作用是看看值相等的word是不是同一个实例执行的,实时证明确实如此  

    System.out.println(this + "====" + word);  

      

    Integer count = counts.get(word);  

    if (count == null)  

        count = 0;  

    count++;  

    counts.put(word, count);  

    collector.emit(new Values(word, count));  

}  

经过反复测试,下面是我个人的一些总结,如果有缺少或者错误我会及时改正。

官方文档里有这么一句话:“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”

一个task就是一个处理逻辑的实例,所以fields能根据tuple
stream的id,也就是下面定义的xxx

[java] view
plaincopy



public void declareOutputFields(OutputFieldsDeclarer declarer) {  

        declarer.declare(new Fields("xxx"));  

}  

xxx所代表的具体内容会由某一个task来处理,并且同一个xxx对应的内容,处理这个内容的task实例是同一个。

【关联到Strom里面Field的概念】

比如说:

bolt第一次emit三个流,即xxx有luonq pangyang qinnl三个值,假设分别建立三个task实例来处理:

[plain] view
plaincopy



luonq -> instance1  

pangyang -> instance2  

qinnl -> instance3  

然后第二次emit四个流,即xxx有luonq qinnanluo py pangyang四个值,假设还是由刚才的三个task实例来处理:

[plain] view
plaincopy



luonq -> instance1  

qinnanluo -> instance2  

py -> instance3  

pangyang -> instance2  

然后第三次emit两个流,即xxx有py qinnl两个值,假设还是由刚才的三个task实例来处理:

[plain] view
plaincopy



py -> instance3  

qinnl -> instance3  

最后我们看看三个task实例都处理了哪些值,分别处理了多少次:

instance1: luonq(处理2次)
instance2: pangyang(处理2次) qinnanluo(处理1次)
instance3: qinnl(处理2次) py(处理2次)

结论:
1. emit发出的值第一次由哪个task实例处理是随机的,此后再次出现这个值,就固定由最初处理他的那个task实例再次处理,直到topology结束

2. 一个task实例可以处理多个emit发出的值

3. 和shuffle Grouping的区别就在于,当emit发出同样的值时,处理他的task是随机的
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: