【Flume】flume于transactionCapacity和batchSize进行详细的分析和质疑的概念
2015-09-24 16:04
357 查看
我不知道你用flume读者熟悉无论这两个概念
一开始我是有点困惑,?
没感觉到transactionCapacity的作用啊?
batchSize又是干啥的啊?
……
……
带着这些问题,我们深入源代码来看一下:
![](http://img.blog.csdn.net/20150312145457373?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvc2ltb25jaGk=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
![](http://img.blog.csdn.net/20150312145558556?<br/><br/>watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvc2ltb25jaGk=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
Exec Source
![](http://img.blog.csdn.net/20150312145847240?<br/><br/>watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvc2ltb25jaGk=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
通过上面这三张图,相信大家应该知道batchSize从哪来的了
batchSize是针对Source和Sink提出的一个概念,它用来限制source和sink对event批量处理的。
即一次性你能够处理batchSize个event,这个一次性就是指在一个事务中。
当你处理的event数量超出了batchSize。那么事务就会提交了。
注意,这里有一个隐晦的地方,就是batchSize一定不能大于transactionCapacity
![](http://img.blog.csdn.net/20150312150638277?<br/><br/>watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvc2ltb25jaGk=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
首先。从这个图中我们就能够看出transactionCapacity这个概念的来源了,它来自于通道中。不同于batchSize(Source,Sink)
那么。在通道中是怎样使用该事务容量的呢??
内存通道中有个内部类MemoryTransaction
putList就是用来存放put操作带来的event channel的put
takeList存放的event是用来被take操作消耗的,返回拿到的一个event channel的take
两种情况:
![](http://img.blog.csdn.net/20150312153840431?<br/><br/>watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvc2ltb25jaGk=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
由于在take操作的时候就已经将event从queue中取出了。而queue中取出的event正是靠put的提交来的
![](http://img.blog.csdn.net/20150312154159172?<br/><br/>watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvc2ltb25jaGk=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
事务回滚针对take操作,你把event拿出去。结果处理失败了,那当然得丢回来,等待下一次处理了!
。
由于进入了rollback操作,说明commit操作出现异常,也就是commit操作失败了,那putList和takeList两个队列当然也没有被清空
不知道说。大家对这个更清楚它是否??
一开始我是有点困惑,?
没感觉到transactionCapacity的作用啊?
batchSize又是干啥的啊?
……
……
带着这些问题,我们深入源代码来看一下:
batchSize
batchSize这个概念首先它出如今哪里呢?kafkaSink的process方法
HDFS Sink
Exec Source
通过上面这三张图,相信大家应该知道batchSize从哪来的了
batchSize是针对Source和Sink提出的一个概念,它用来限制source和sink对event批量处理的。
即一次性你能够处理batchSize个event,这个一次性就是指在一个事务中。
当你处理的event数量超出了batchSize。那么事务就会提交了。
注意,这里有一个隐晦的地方,就是batchSize一定不能大于transactionCapacity
以下再来说说transactionCapacity
首先。从这个图中我们就能够看出transactionCapacity这个概念的来源了,它来自于通道中。不同于batchSize(Source,Sink)
那么。在通道中是怎样使用该事务容量的呢??
内存通道中有个内部类MemoryTransaction
private class MemoryTransaction extends BasicTransactionSemantics { private LinkedBlockingDeque<Event> takeList; private LinkedBlockingDeque<Event> putList; private final ChannelCounter channelCounter; private int putByteCounter = 0; private int takeByteCounter = 0; public MemoryTransaction(int transCapacity, ChannelCounter counter) { putList = new LinkedBlockingDeque<Event>(transCapacity); takeList = new LinkedBlockingDeque<Event>(transCapacity); channelCounter = counter; }这里就用到了事务容量,它就是putList和takeList的容量大小
putList就是用来存放put操作带来的event channel的put
if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); }每一次put前,都会预判put是否成功,从异常的提示信息就能够看出来。put不成功即事务容量满了
takeList存放的event是用来被take操作消耗的,返回拿到的一个event channel的take
if(takeList.remainingCapacity() == 0) { throw new ChannelException("Take list for MemoryTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count"); }take前也会预判,假设takeList已经满了。说明take操作太慢了,出现了event堆积的现象,这时候你应该调整事务容量
什么情况下。事务会提交呢,事务提交做了什么呢??
commit即事务提交两种情况:
1、put的event提交
while(!putList.isEmpty()) { if(!queue.offer(putList.removeFirst())) { throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); }event所有放到queue中。queue才是真正的flume中event的队列。它的容量是capacity。看上一张图就可以。
2、take的event提交
由于在take操作的时候就已经将event从queue中取出了。而queue中取出的event正是靠put的提交来的
最后。再看看事务是怎样回滚的??
事务回滚针对take操作,你把event拿出去。结果处理失败了,那当然得丢回来,等待下一次处理了!
。
由于进入了rollback操作,说明commit操作出现异常,也就是commit操作失败了,那putList和takeList两个队列当然也没有被清空
while(!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); }循环将event又一次加入到queue中。
不知道说。大家对这个更清楚它是否??
相关文章推荐
- 转载:MyEclipse启动Tomcat缓慢的原因及解决办法
- [iOS]UIProgressView的高度
- 黑马程序员-继承、接口与多态
- Win7_x86_64位连接vmclient 提示vmrc控制台连接已断开
- date_format()的使用
- 深度学习入门简介
- 你刚才在淘宝上买了一件东西【技术普及贴】
- POJ 1113:Wall
- leetcode Interleaving String
- stm32的pwm学习总结
- 关于省市区,编辑页面怎么显示原来用户填写的
- SOA系列之基本概念
- android 获取对权限的选择
- Apache和Nginx运行原理解析
- POJ 1113:Wall
- AndroidDevTools下载地址
- vc++ ^ 符号
- STM32学习笔记--按键输入实验
- 【Python】BeautifulSoup文档(windows-pyhton2.7.10)
- 小强的HTML5移动开发之路(20)——HTML5 Web SQL Database