您的位置:首页 > 其它

SpoolDirectorySource使用及源码分析

2015-11-07 16:58 387 查看

Spooling Directory Source简介

Spooling Directory Source可以获取硬盘上“spooling”目录的数据,这个Source将监视指定目录是否有新文件,如果有新文件的话,就解析这个新文件。事件的解析逻辑是可插拔的。在文件的内容所有的都读取到Channel之后,Spooling Directory Source会重名或者是删除该文件以表示文件已经读取完成。

不像Exec Source,这个Source是可靠的,且不会丢失数据。即使Flume重启或者被Kill。但是需要注意如下两点:

        1,如果文件在放入spooling目录之后还在写,那么Flume会打印错误日志,并且停止处理该文件。

        2,如果文件之后重复使用,Flume将打印错误日志,并且停止处理。

为了避免以上问题,我们可以使用唯一的标识符来命令文件,例如:时间戳。
尽管这个Source是可靠的,但是如果下游发生故障,也会导致Event重复,这种情况就需要通过Flume的其他组件提供保障了。

属性名默认描述
channels
type组件名:
spooldir
.
spoolDir读取文件的目录。
fileSuffix.COMPLETEDSpooling读取过的文件,添加的后缀。
deletePolicynever完成后的文件是否删除。
never:不删除
 或 
immediate:立即删除
fileHeaderfalse是不把路径加入到Heander
fileHeaderKeyfile路径加入到Header的Key是什么
basenameHeaderfalse是不把文件名加入到Heander
basenameHeaderKeybasename文件名加入到Header的Key是什么
ignorePattern^$采用正则表达是去过滤一些文件。只有符合正则表达式的文件才会被使用。
trackerDir.flumespool被处理文件的元数据的存储目录,如果不是绝对路径,就被会解析到spoolDir目录下。
consumeOrderoldest消费spooling目录文件的规则,分别有:oldest,youngest和random。在oldest 和 youngest的情况下,

通过文件的最后修改时间来比较文件。如果最后修改时间相同,就根据字典的序列从小开始。在随机的情况

下,就随意读取文件。如果文件列表很长,采用oldest/youngest可能会很慢,因为用oldest/youngest要

扫描文件。但是如果采用random的话,就可能造成新的文件消耗的很快,老的文件一直都没有被消费。
maxBackoff4000如果Channel已经满了,那么该Source连续尝试写入该Channel的最长时间(单位:毫秒)。
batchSize100批量传输到Channel的粒度。
inputCharsetUTF-8字符集
decodeErrorPolicy
FAIL
在文件中有不可解析的字符时的解析策略。
FAIL
: 抛出一个异常,并且不能解析该文件。
REPLACE
:
取代不可

解析的字符,通常用Unicode U+FFFD. 
IGNORE
: 丢弃不可能解析字符序列。
deserializer
LINE
自定序列化的方式,自定的话,必须实现
EventDeserializer.Builder
.
deserializer.*
bufferMaxLines已废弃。
bufferMaxLineLength5000(不推荐使用) 一行中最大的长度,可以使用deserializer.maxLineLength代替。
selector.typereplicatingreplicating(复制) 或 multiplexing(复用)
selector.*取决于selector.type的值
interceptors空格分割的interceptor列表。
interceptors.*

SpoolDirectorySource示例

读取文件写入到file_roll中

a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

#resources
a1.sources.source1.type = spooldir
a1.sources.source1.channels = channel1
a1.sources.source1.spoolDir = E:\\home\\spooling
a1.sources.source1.fileHeader = true
a1.sources.source1.fileHeaderKey = fishfile
a1.sources.source1.basenameHeader = true
a1.sources.source1.basenameHeaderKey = fishbasename

a1.sinks.sink1.type = file_roll
a1.sinks.sink1.sink.directory = E:\\home\\file_roll
a1.sinks.sink1.sink.rollInterval = 300
a1.sinks.sink1.sink.serializer = TEXT
a1.sinks.sink1.sink.batchSize = 100

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1


SpoolDirectorySource源码分析

一,调用configure(Context context)方法初始化:

@Override
public synchronized void configure(Context context) {
//spool目录
spoolDirectory = context.getString(SPOOL_DIRECTORY);
Preconditions.checkState(spoolDirectory != null,
"Configuration must specify a spooling directory");
//完成后的文件后缀
completedSuffix = context.getString(SPOOLED_FILE_SUFFIX,
DEFAULT_SPOOLED_FILE_SUFFIX);
//删除策略,never:不删除 或 immediate:立即删除
deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
//以下四个参数是是否在header中加入文件名和文件路径。
fileHeader = context.getBoolean(FILENAME_HEADER,
DEFAULT_FILE_HEADER);
fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
DEFAULT_FILENAME_HEADER_KEY);
basenameHeader = context.getBoolean(BASENAME_HEADER,
DEFAULT_BASENAME_HEADER);
basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
DEFAULT_BASENAME_HEADER_KEY);
//批量处理的数量
batchSize = context.getInteger(BATCH_SIZE,
DEFAULT_BATCH_SIZE);
//字符集
inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
//在文件中有不可解析的字符时的解析策略
decodeErrorPolicy = DecodeErrorPolicy.valueOf(
context.getString(DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY)
.toUpperCase(Locale.ENGLISH));
//过滤文件的正则表达式
ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
//被处理文件的元数据的存储目录
trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);

//序列化
deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
deserializerContext = new Context(context.getSubProperties(DESERIALIZER +
"."));
//消费spooling目录文件的规则
consumeOrder = ConsumeOrder.valueOf(context.getString(CONSUME_ORDER,
DEFAULT_CONSUME_ORDER.toString()).toUpperCase(Locale.ENGLISH));

// "Hack" to support backwards compatibility with previous generation of
// spooling directory source, which did not support deserializers
Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);
if (bufferMaxLineLength != null && deserializerType != null &&
deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) {
deserializerContext.put(LineDeserializer.MAXLINE_KEY,
bufferMaxLineLength.toString());
}

maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF);
if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
}
}


start方法:

@Override
public synchronized void start() {
logger.info("SpoolDirectorySource source starting with directory: {}",
spoolDirectory);

executor = Executors.newSingleThreadScheduledExecutor();

File directory = new File(spoolDirectory);
//构建ReliableSpoolingFileEventReader对象
try {
reader = new ReliableSpoolingFileEventReader.Builder()
.spoolDirectory(directory)
.completedSuffix(completedSuffix)
.ignorePattern(ignorePattern)
.trackerDirPath(trackerDirPath)
.annotateFileName(fileHeader)
.fileNameHeader(fileHeaderKey)
.annotateBaseName(basenameHeader)
.baseNameHeader(basenameHeaderKey)
.deserializerType(deserializerType)
.deserializerContext(deserializerContext)
.deletePolicy(deletePolicy)
.inputCharset(inputCharset)
.decodeErrorPolicy(decodeErrorPolicy)
.consumeOrder(consumeOrder)
.build();
} catch (IOException ioe) {
throw new FlumeException("Error instantiating spooling event parser",
ioe);
}
//构建SpoolDirectoryRunnable线程。
Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
//每隔POLL_DELAY_MS(500ms)执行以下SpoolDirectoryRunnable线程。
executor.scheduleWithFixedDelay(
runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

super.start();
logger.debug("SpoolDirectorySource source started");
sourceCounter.start();
}
构建ReliableSpoolingFileEventReader类,构造方法功能:

1,spooling目录是否存在,是否是目录

2,通过创建零时文件测试spooling目录的权限。

3,创建trackerDir目录和.flumespool-main.meta文件

SpoolDirectoryRunnable线程,主要用于发送和读取Event:

private class SpoolDirectoryRunnable implements Runnable {
private ReliableSpoolingFileEventReader reader;
private SourceCounter sourceCounter;

public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
SourceCounter sourceCounter) {
this.reader = reader;
this.sourceCounter = sourceCounter;
}

@Override
public void run() {
int backoffInterval = 250;
try {
while (!Thread.interrupted()) {
//ReliableSpoolingFileEventReader读取batchSize大小的Event
List<Event> events = reader.readEvents(batchSize);
if (events.isEmpty()) {
break;
}
//统计
sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();

try {
//将Event数组发送到Channel
getChannelProcessor().processEventBatch(events);
//commit会记录最后一次读取的行数,以便下次知道从哪里开始读
reader.commit();
} catch (ChannelException ex) {
//ChannelProcessor批量提交Event出错,会抛出ChannelException异常,此时reader.commit是没有执行的
//所以在接下来的continue后,继续通过reader读取文件的话,还是从原来的位置读取,以保证数据不会丢失。
logger.warn("The channel is full, and cannot write data now. The " +
"source will try again after " + String.valueOf(backoffInterval) +
" milliseconds");
hitChannelException = true;
if (backoff) {
TimeUnit.MILLISECONDS.sleep(backoffInterval);
backoffInterval = backoffInterval << 1;
backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
backoffInterval;
}
continue;
}
backoffInterval = 250;
sourceCounter.addToEventAcceptedCount(events.size());
sourceCounter.incrementAppendBatchAcceptedCount();
}
} catch (Throwable t) {
logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
"Uncaught exception in SpoolDirectorySource thread. " +
"Restart or reconfigure Flume to continue processing.", t);
hasFatalError = true;
Throwables.propagate(t);
}
}
}
ReliableSpoolingFileEventReader读取Event:

public List<Event> readEvents(int numEvents) throws IOException {
//committed初始化为true
if (!committed) {
if (!currentFile.isPresent()) {
throw new IllegalStateException("File should not roll when " +
"commit is outstanding.");
}
logger.info("Last read was never committed - resetting mark position.");
//正常情况下,会在SpoolDirectorySource类中记录读取的字节数之后,将commited设置为true
//没有设置为true,可能是因为发送到Channel异常了,调用下面reset方法可以保证数据不丢失。
currentFile.get().getDeserializer().reset();
} else {
// Check if new files have arrived since last call
if (!currentFile.isPresent()) {
//读取文件,读取文件过程中使用FileFilter过滤掉completedSuffix后缀的文件,然后根据消费文件的规则(consumeOrder)去消费文件。
currentFile = getNextFile();
}
// Return empty list if no new files
if (!currentFile.isPresent()) {
return Collections.emptyList();
}
}

EventDeserializer des = currentFile.get().getDeserializer();
//根据序列化类读取Event
List<Event> events = des.readEvents(numEvents);

/* It's possible that the last read took us just up to a file boundary.
* If so, try to roll to the next file, if there is one.
* Loop until events is not empty or there is no next file in case of 0 byte files */
while (events.isEmpty()) {
logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
retireCurrentFile();
currentFile = getNextFile();
if (!currentFile.isPresent()) {
return Collections.emptyList();
}
events = currentFile.get().getDeserializer().readEvents(numEvents);
}
//添加<span style="font-family:Microsoft Yahei;">文件路径到Header</span>
if (annotateFileName) {
String filename = currentFile.get().getFile().getAbsolutePath();
for (Event event : events) {
event.getHeaders().put(fileNameHeader, filename);
}
}
//添加文件名到Header
if (annotateBaseName) {
String basename = currentFile.get().getFile().getName();
for (Event event : events) {
event.getHeaders().put(baseNameHeader, basename);
}
}

committed = false;
lastFileRead = currentFile;
return events;
}
消费文件的规则,以OLDEST为例:

for (File candidateFile: candidateFiles) {
//已选择文件的最后修改时间,减去文件列表取的文件最后修改时间
long compare = selectedFile.lastModified() -
candidateFile.lastModified();
if (compare == 0) { // ts is same pick smallest lexicographically.
//时间一样就根据字典序列排序
selectedFile = smallerLexicographical(selectedFile, candidateFile);
} else if (compare > 0) { // candidate is older (cand-ts < selec-ts).
selectedFile = candidateFile;
}
}


在ReliableSpoolingFileEventReader类读取Events之后,调用commit方法提交,这里有个问题,batchSize为200,我们每次也就读取200行,那么SpoolingDirectorySource是如何标记我们读到文件的那个位置的呢?

其实SpoolingDirectorySource在commit时,会调用EventDeserializer的mark方法标记此时读取在文件的什么位置。源码如下:

public synchronized void storePosition(long position) throws IOException {
metaCache.setOffset(position);
writer.append(metaCache);
writer.sync();
writer.flush();
}
上面的方法会把文件的读取到什么位置记录到.flumespool\.flumespool-main.meta(默认情况下)文件中。

public void commit() throws IOException {
if (!committed && currentFile.isPresent()) {
currentFile.get().getDeserializer().mark();
//mark成功后,将committed设置为true
committed = true;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息