您的位置:首页 > 大数据 > Hadoop

flume坑之channel.transactionCapacity和HdfsSink.batchSize

2015-12-17 12:24 375 查看
  不说过程了,直接说结果!一对相连接的channel-HdfsSink,无意间配置如下:
...
agent.channels.common-channel.transactionCapacity=10
...
agent.sinks.hdfs-sink.hdfs.batchSize=20

  简单测试之后发现flume报如下异常,倒也正常……

[2015-12-17 11:42:09:694 ERROR][org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:467)]process failed
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or in creasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
[2015-12-17 11:42:09:696 ERROR][org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
... 3 more

  但是......但是老子发现停掉发送,channelSize一直不减,hdfs里的数据也一直在涨!!!而且永远停不下来,数据被永远重放!!!
  查看相关HdfsSink代码如下:

public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
List<BucketWriter> writers = Lists.newArrayList();
transaction.begin();
try {
int txnEventCount = 0;
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
Event event = channel.take();
if (event == null) {
break;
}
......

bucketWriter.append(event);
......

transaction.commit();

if (txnEventCount < 1) {
return Status.BACKOFF;
} else {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return Status.READY;
}
} catch (IOException eIO) {
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;
} catch (Throwable th) {
transaction.rollback();
LOG.error("process failed", th);
if (th instanceof Error) {
throw (Error) th;
} else {
throw new EventDeliveryException(th);
}
} finally {
transaction.close();
}
}


  看到了嘛!异常在取第11个数据时channel.take();出现,然后此次事物被回滚,但是,但是尼玛之前取出来的10个Event都被bucketWriter.append(event);了,也就是被写到hdfs了;
然后就是最初的现象了,事物一直不断的被回滚,但部分取到的数据却也写到hdfs了,这尼玛算是什么事务……
  虽然是不合理的配置参数,但flume启动时有一大陀检测参数的代码也没检测到这些,至少给报个错或WARN嘛!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: