flume LineDeserializer Line length exceeds max (2048), truncating line!扩大一行数据量大小的采集上限
2017-08-15 17:36
1196 查看
简介
在一次使用flume+kafka+sparkstreaming架构处理日志时,出现一个很奇怪的问题:日志中的某一行数据总会被切分成了多行,总的输出日志行数也比原始日志文件多出了几十行,导致具体的处理逻辑中出现各种错误。经过排查,定位到问题是出现在flume上。 在flume中采集的一行数据量是有大小限制的,默认为2048,即2KB,而我的日志中有些行的大小已经超过了这个限制,然后flume就会将之切分成多行。同时flume也会在采集的时候弹出警告信息:
[WARN - org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:143)] Line length exceeds max (2048), truncating line!
下面是flume1.7的源码中出现问题的地方(本段代码,位于flume源码包org\apache\flume\serialization类中)。
public static final String MAXLINE_KEY = "maxLineLength";
public static final int MAXLINE_DFLT = 2048; //默认的大小
LineDeserializer(Context context, ResettableInputStream in) {
this.in = in;
this.outputCharset = Charset.forName(
context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT);
this.isOpen = true;
}
/********/
private String readLine() throws IOException {
StringBuilder sb = new StringBuilder();
int c;
int readChars = 0;
while ((c = in.readChar()) != -1) {
readChars++;
// FIXME: support \r\n
if (c == '\n') {
break;
}
sb.append((char)c);
if (readChars >= maxLineLength) {//这里就会进行大小判断
logger.warn("Line length exceeds max ({}), truncating line!",
maxLineLength);
break;
}
}
if (readChars > 0) {
return sb.toString();
} else {
return null;
}
}
修改及测试
因此,接下来就可以修改源码,将上面代码中的2048,改成更大的值,我改成4096,暂时够用了。然后编译即可。我为了方便,只编译org\apache\flume\serialization这个类。也可以用maven等去编译,同时注意编译使用的JDK要和flume运行环境的JDK大版本一致。然后放入flume的core核心包里替换原有的class(可以先备份core包)
要修改的jar包:
修改值:
替换class:
接下来重新采集文件,警告信息消失,sparkstreaming中消费的数据也正常了。
在一次使用flume+kafka+sparkstreaming架构处理日志时,出现一个很奇怪的问题:日志中的某一行数据总会被切分成了多行,总的输出日志行数也比原始日志文件多出了几十行,导致具体的处理逻辑中出现各种错误。经过排查,定位到问题是出现在flume上。 在flume中采集的一行数据量是有大小限制的,默认为2048,即2KB,而我的日志中有些行的大小已经超过了这个限制,然后flume就会将之切分成多行。同时flume也会在采集的时候弹出警告信息:
[WARN - org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:143)] Line length exceeds max (2048), truncating line!
下面是flume1.7的源码中出现问题的地方(本段代码,位于flume源码包org\apache\flume\serialization类中)。
public static final String MAXLINE_KEY = "maxLineLength";
public static final int MAXLINE_DFLT = 2048; //默认的大小
LineDeserializer(Context context, ResettableInputStream in) {
this.in = in;
this.outputCharset = Charset.forName(
context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT);
this.isOpen = true;
}
/********/
private String readLine() throws IOException {
StringBuilder sb = new StringBuilder();
int c;
int readChars = 0;
while ((c = in.readChar()) != -1) {
readChars++;
// FIXME: support \r\n
if (c == '\n') {
break;
}
sb.append((char)c);
if (readChars >= maxLineLength) {//这里就会进行大小判断
logger.warn("Line length exceeds max ({}), truncating line!",
maxLineLength);
break;
}
}
if (readChars > 0) {
return sb.toString();
} else {
return null;
}
}
修改及测试
因此,接下来就可以修改源码,将上面代码中的2048,改成更大的值,我改成4096,暂时够用了。然后编译即可。我为了方便,只编译org\apache\flume\serialization这个类。也可以用maven等去编译,同时注意编译使用的JDK要和flume运行环境的JDK大版本一致。然后放入flume的core核心包里替换原有的class(可以先备份core包)
要修改的jar包:
修改值:
替换class:
接下来重新采集文件,警告信息消失,sparkstreaming中消费的数据也正常了。
相关文章推荐
- android:maxLines="1"没满一行显示省略号,用android:singleLine="true"解决
- flume 数据采集
- 分布式日志收集框架Flume:从指定网端口采集数据输出到控制台
- Sqoop Flume 数据采集引擎
- oracle sqlldr 数据导入错误Field in data file exceeds maximum length解决
- Flume+HBase采集和存储日志数据
- oracle sqlldr数据导入错误Field in data file exceeds maximum length解决
- 使用FineUploader 上传大文件设置IIS7文件上传的最大大小 maxAllowedContentLength,maxRequestLength
- flume采集数据到kafka和hive
- Flume采集数据到HDFS时,文件中有乱码
- Flume和Kafka完成实时数据的采集
- flume采集数据到hdfs
- Flume + HDFS Sink采集数据及如何添加第三方JAR
- Flume简介与使用(二)——Thrift Source采集数据
- flume使用(二):采集远程日志数据到MySql数据库
- 读取 XML 数据时,超出最大字符串内容长度配额 (8192)。通过更改在创建 XML 读取器时所使用的 XmlDictionaryReaderQuotas 对象的 MaxStringContentLength 属性,可增加此配额。
- 模拟nginx+flume数据采集+Kafka集群
- WCF:读取 XML 数据时,超出最大字符串内容长度配额 (8192)。通过更改在创建 XML 读取器时所使用的 XmlDictionaryReaderQuotas 对象的 MaxStringContentLength 属性,可增加此配额。
- The length of the string exceeds the value set on the maxJsonLength property
- 开源数据采集组件比较: scribe、chukwa、kafka、flume