Flume-ng生产环境实践(二)flume-ng 测试过程中event丢失部分body数据
2012-12-07 16:27
495 查看
经过测试发现,当source端单event的body数据大于16字节后,输出到目标只剩下16字节。进过多源代码的分析,发现,源代码中进行了截取。
在LoggerSink.java中:
if (event != null) {
if (logger.isInfoEnabled()) {
logger.info("Event: " + EventHelper.dumpEvent(event));
}
}
我们去看EventHelper.java的dumpEvent方法:
private static final int DEFAULT_MAX_BYTES = 16;
public static String dumpEvent(Event event) {
return dumpEvent(event, DEFAULT_MAX_BYTES);
}
public static String dumpEvent(Event event, int maxBytes) {
StringBuilder buffer = new StringBuilder();
if (event == null || event.getBody() == null) {
buffer.append("null");
} else if (event.getBody().length == 0) {
// do nothing... in this case, HexDump.dump() will throw an exception
} else {
byte[] body = event.getBody();
byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
HexDump.dump(data, 0, out, 0);
String hexDump = new String(out.toByteArray());
// remove offset since it's not relevant for such a small dataset
if(hexDump.startsWith(HEXDUMP_OFFSET)) {
hexDump = hexDump.substring(HEXDUMP_OFFSET.length());
}
buffer.append(hexDump);
} catch (Exception e) {
if(LOGGER.isInfoEnabled()) {
LOGGER.info("Exception while dumping event", e);
}
buffer.append("...Exception while dumping: ").append(e.getMessage());
}
String result = buffer.toString();
if(result.endsWith(EOL) && buffer.length() > EOL.length()) {
buffer.delete(buffer.length() - EOL.length(), buffer.length()).toString();
}
}
return "{ headers:" + event.getHeaders() + " body:" + buffer + " }";
}
不难看出,在event处理过程中,发生了数据截取操作。
在LoggerSink.java中:
if (event != null) {
if (logger.isInfoEnabled()) {
logger.info("Event: " + EventHelper.dumpEvent(event));
}
}
我们去看EventHelper.java的dumpEvent方法:
private static final int DEFAULT_MAX_BYTES = 16;
public static String dumpEvent(Event event) {
return dumpEvent(event, DEFAULT_MAX_BYTES);
}
public static String dumpEvent(Event event, int maxBytes) {
StringBuilder buffer = new StringBuilder();
if (event == null || event.getBody() == null) {
buffer.append("null");
} else if (event.getBody().length == 0) {
// do nothing... in this case, HexDump.dump() will throw an exception
} else {
byte[] body = event.getBody();
byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
HexDump.dump(data, 0, out, 0);
String hexDump = new String(out.toByteArray());
// remove offset since it's not relevant for such a small dataset
if(hexDump.startsWith(HEXDUMP_OFFSET)) {
hexDump = hexDump.substring(HEXDUMP_OFFSET.length());
}
buffer.append(hexDump);
} catch (Exception e) {
if(LOGGER.isInfoEnabled()) {
LOGGER.info("Exception while dumping event", e);
}
buffer.append("...Exception while dumping: ").append(e.getMessage());
}
String result = buffer.toString();
if(result.endsWith(EOL) && buffer.length() > EOL.length()) {
buffer.delete(buffer.length() - EOL.length(), buffer.length()).toString();
}
}
return "{ headers:" + event.getHeaders() + " body:" + buffer + " }";
}
不难看出,在event处理过程中,发生了数据截取操作。
相关文章推荐
- Flume-ng生产环境实践(二)flume-ng 测试过程中event丢失部分body数据
- Flume-ng生产环境实践(一)Flume-ng生产环境编译
- Flume-ng生产环境实践(三)实现文件sink,按照固定格式目录输出
- Flume-ng生产环境实践(四)实现log格式化interceptor
- Flume-ng生产环境实践(三)实现文件sink,按照固定格式目录输出
- Flume-ng生产环境实践(四)实现log格式化interceptor
- Flume-ng生产环境实践(一)Flume-ng生产环境编译
- Flume-ng生产环境实践(三)实现文件sink,按照固定格式目录输出
- Flume-ng生产环境实践(四)实现log格式化interceptor
- 测试环境运行正常的SQL到生产上奇慢无比,最终导致UI访问超时;确定SQL效率无问题,那么就极有可能使生产环境的表数据量较大而且没有做分析。
- 警惕生产环境与测试环境数据不统一
- 一步一步学Streams:第二部分(13)实践之创建Streams全库复制环境(3)测试
- 生产环境消息队列ActiveMQ的数据积压优化过程
- 生产环境中如何恢复丢失的数据
- 大数据学习[08]:基于HDFS存储的Flume-ng1.7.0集群实践
- DI v6.2,从测试环境移植到生产环境,如何清理以前运行过的垃圾数据(作业监控记录等信息及后台日志记录相关数据)
- 生产数据导入测试环境碰见的一些问题
- 微信公众号测试生产环境分离实践
- 什么是JAVA开发环境、测试环境及生产环境,及它的过程
- matconvnet环境下训练自己的数据集及模型测试-mnist网络结构-cifar10部分数据集