您的位置:首页 > 其它

lucene4.7源码研究之索引建立过程(3)-2

2015-04-23 10:42 429 查看
接上篇

87       synchronized(this) {//构建索引管理对象
88         deleter = new IndexFileDeleter(directory,
89                                        config.getIndexDeletionPolicy(),//默认策略为KeepOnlyLastCommitDeleter
90                                        segmentInfos, infoStream, this,
91                                        initialIndexExists);
92       }
93
94       if (deleter.startingCommitDeleted) {
95         // Deletion policy deleted the "head" commit point.
96         // We have to mark ourself as changed so that if we
97         // are closed w/o any further changes we write a new
98         // segments_N file.
99         changed();//version++
100       }
101
102       if (infoStream.isEnabled("IW")) {
103         infoStream.message("IW", "init: create=" + create);
104         messageState();
105       }
106
107       success = true;
108
109     } finally {
110       if (!success) {
111         if (infoStream.isEnabled("IW")) {
112           infoStream.message("IW", "init: hit exception on init; releasing write lock");
113         }
114         IOUtils.closeWhileHandlingException(writeLock);
115         writeLock = null;
116       }
117     }
118   }


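第87行,构建索引文件管理对象IndexFileDeleter。所谓管理,就是对索引文件进行引用计数。IndexFileDeleter在构造时做了以下事情:使用SegmentInfos.read读取各个segments_N文件,检查其是否符合Lucene索引文件规则(即有效性),并初始化这些文件的引用计数;然后删除无引用的文件,删除失败的文件会在checkpoint中再次尝试删除;接着根据commit的删除策略对各个commit进行管理。注意segments.gen文件不参与引用计数;如果起始commit被删除策略删除(startingCommitDeleted为true),IndexWriter会调用changed()记录变更,以便在关闭时写出新的segments_N文件。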

/**
* Initialize the deleter: find all previous commits in
* the Directory, incref the files they reference, call
* the policy to let it delete commits.  This will remove
* any files not referenced by any of the commits.
*
* @param directory the directory whose files are listed and whose commits are read
* @param policy deletion policy; its onInit is invoked with the sorted commit list
* @param segmentInfos the incoming SegmentInfos; its segments file name and
*        generation identify the "current" commit, and it is always protected
*        through a non-commit checkpoint at the end
* @param infoStream receives "IFD" diagnostic messages when that component is enabled
* @param writer the IndexWriter stored in this deleter
* @param initialIndexExists when true and the current commit was not found in the
*        directory listing, the current segments file is opened explicitly
* @throws IOException if there is a low-level IO error
*/
public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos,
InfoStream infoStream, IndexWriter writer, boolean initialIndexExists) throws IOException {
this.infoStream = infoStream;
this.writer = writer;

// Name of the segments file that the incoming segmentInfos corresponds to
// (may be null, see the null check below).
final String currentSegmentsFile = segmentInfos.getSegmentsFileName();

// NOTE(review): the article's author notes that the default InfoStream is a
// no-op implementation, so this branch is presumably skipped by default - confirm.
if (infoStream.isEnabled("IFD")) {
infoStream.message("IFD", "init: current segments file is \"" + currentSegmentsFile + "\"; deletionPolicy=" + policy);
}

this.policy = policy;
this.directory = directory;

// First pass: walk the files and initialize our ref
// counts:
// Generation of the incoming segmentInfos.
long currentGen = segmentInfos.getGeneration();

// Commit point matching the incoming segmentInfos, once found.
CommitPoint currentCommitPoint = null;
String[] files = null;
try {
// List every file currently present in the directory.
files = directory.listAll();
} catch (NoSuchDirectoryException e) {
// it means the directory is empty, so ignore it.
files = new String[0];
}

// Only scan the listing when there is a current segments file.
if (currentSegmentsFile != null) {
// Reusable matcher for the codec file-name pattern; reset per file below.
Matcher m = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
for (String fileName : files) {
m.reset(fileName);
// Track a file only if it is (1) not the write lock, (2) not the
// segments.gen file, and (3) either matches the codec file pattern or
// starts with the segments prefix.
if (!fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN)
&& (m.matches() || fileName.startsWith(IndexFileNames.SEGMENTS))) {

// Add this file to refCounts with initial count 0:
getRefCount(fileName);

// Files starting with the segments prefix (segments.gen was already
// excluded above) are commit points.
if (fileName.startsWith(IndexFileNames.SEGMENTS)) {

// This is a commit (segments or segments_N), and
// it's valid (<= the max gen).  Load it, then
// incref all files it refers to:
if (infoStream.isEnabled("IFD")) {
infoStream.message("IFD", "init: load commit \"" + fileName + "\"");
}
SegmentInfos sis = new SegmentInfos();
try {
// Load the commit; a failure here decides whether the commit is
// skipped (sis = null) or the exception is propagated.
sis.read(directory, fileName);
} catch (FileNotFoundException e) {
// LUCENE-948: on NFS (and maybe others), if
// you have writers switching back and forth
// between machines, it's very likely that the
// dir listing will be stale and will claim a
// file segments_X exists when in fact it
// doesn't.  So, we catch this and handle it
// as if the file does not exist
if (infoStream.isEnabled("IFD")) {
infoStream.message("IFD", "init: hit FileNotFoundException when loading commit \"" + fileName + "\"; skipping this commit point");
}
sis = null;
} catch (IOException e) {
// A non-empty segments file whose generation is not newer than the
// current one should have been readable, so rethrow.
if (SegmentInfos.generationFromSegmentsFileName(fileName) <= currentGen && directory.fileLength(fileName) > 0) {
throw e;
} else {
// Most likely we are opening an index that
// has an aborted "future" commit, so suppress
// exc in this case
sis = null;
}
}
// The commit was loaded successfully.
if (sis != null) {
final CommitPoint commitPoint = new CommitPoint(commitsToDelete, directory, sis);
// Same generation as the incoming segmentInfos: this is the current commit.
if (sis.getGeneration() == segmentInfos.getGeneration()) {
currentCommitPoint = commitPoint;
}
commits.add(commitPoint);
// Incref the files of this commit.
// NOTE(review): the second argument presumably means "include the
// segments_N file itself" - confirm against incRef.
incRef(sis, true);

// Remember the SegmentInfos with the highest generation seen so far.
if (lastSegmentInfos == null || sis.getGeneration() > lastSegmentInfos.getGeneration()) {
lastSegmentInfos = sis;
}
}
}
}
}
}

// The current commit was not seen in the listing although an index exists:
// fall back to opening it explicitly (this is not about segments.gen).
if (currentCommitPoint == null && currentSegmentsFile != null && initialIndexExists) {
// We did not in fact see the segments_N file
// corresponding to the segmentInfos that was passed
// in.  Yet, it must exist, because our caller holds
// the write lock.  This can happen when the directory
// listing was stale (eg when index accessed via NFS
// client with stale directory listing cache).  So we
// try now to explicitly open this commit point:
SegmentInfos sis = new SegmentInfos();
try {
sis.read(directory, currentSegmentsFile);
} catch (IOException e) {
// NOTE(review): the original IOException is not attached as a cause here.
throw new CorruptIndexException("failed to locate current segments_N file \"" + currentSegmentsFile + "\"");
}
if (infoStream.isEnabled("IFD")) {
infoStream.message("IFD", "forced open of current segments file " + segmentInfos.getSegmentsFileName());
}
currentCommitPoint = new CommitPoint(commitsToDelete, directory, sis);
commits.add(currentCommitPoint);
// Incref the files of the explicitly opened current commit.
incRef(sis, true);
}

// We keep commits list in sorted order (oldest to newest):
CollectionUtil.timSort(commits);

// Now delete anything with ref count at 0.  These are
// presumably abandoned files eg due to crash of
// IndexWriter.
for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {
RefCount rc = entry.getValue();
final String fileName = entry.getKey();
if (0 == rc.count) {
if (infoStream.isEnabled("IFD")) {
infoStream.message("IFD", "init: removing unreferenced file \"" + fileName + "\"");
}
// Delete the unreferenced file via this deleter's deleteFile method.
deleteFile(fileName);
}
}

// Finally, give policy a chance to remove things on
// startup:
// NOTE(review): the article's author states the default policy keeps only
// the last commit - confirm against the configured IndexDeletionPolicy.
policy.onInit(commits);

// Always protect the incoming segmentInfos since
// sometime it may not be the most recent commit
// (checkpoint is called with isCommit=false here)
checkpoint(segmentInfos, false);

// Record whether the policy deleted the commit we started from.
startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted();

deleteCommits();
}


第43行,IndexFileDeleter中包含一个final的静态内部类RefCount,用来维护各个文件的引用数量。RefCount对象如何创建,接下来会在getRefCount中看到。

/**
 * Holds the reference count of one index file.
 */
private static final class RefCount {

  // fileName is kept only so assertion failures can name the file
  final String fileName;

  // becomes true on the first IncRef call
  boolean initDone;

  // current number of references
  int count;

  RefCount(String fileName) {
    this.fileName = fileName;
  }

  /**
   * Adds one reference and returns the new count. The very first call only
   * marks the counter as initialized; every later call asserts that the
   * count is still positive before incrementing.
   */
  public int IncRef() {
    if (initDone) {
      assert count > 0: Thread.currentThread().getName() + ": RefCount is 0 pre-increment for file \"" + fileName + "\"";
    } else {
      initDone = true;
    }
    count += 1;
    return count;
  }

  /**
   * Drops one reference and returns the new count; asserts that the count
   * is positive beforehand.
   */
  public int DecRef() {
    assert count > 0: Thread.currentThread().getName() + ": RefCount is 0 pre-decrement for file \"" + fileName + "\"";
    count -= 1;
    return count;
  }
}


接第43行代码,getRefCount方法:如果refCounts中还没有该文件的引用计数记录,就为它新建一个RefCount并放入refCounts,此时count的初始值为0;如果已经存在,则直接返回已有的RefCount。

/**
 * Returns the RefCount registered for {@code fileName} in refCounts,
 * creating and registering a fresh one (count 0) when none exists yet.
 */
private RefCount getRefCount(String fileName) {
  assert locked();
  if (refCounts.containsKey(fileName)) {
    // already tracked: hand back the existing counter
    return refCounts.get(fileName);
  }
  final RefCount created = new RefCount(fileName);
  refCounts.put(fileName, created);
  return created;
}


同样都是在IndexFileDeleter中

// Used only inside assert statements: true when there is no writer,
// or when the current thread holds the writer's monitor.
private boolean locked() {
  if (writer == null) {
    return true;
  }
  return Thread.holdsLock(writer);
}


第130行

/**
 * Asks the directory to delete {@code fileName}. If the delete fails with an
 * IOException while the file still exists, the name is queued in
 * {@code deletable} for a later retry; if the file no longer exists the
 * failure is ignored.
 *
 * @param fileName name of the file to delete
 * @throws IOException if checking for the file's existence fails
 */
void deleteFile(String fileName) throws IOException {
  assert locked();
  try {
    if (infoStream.isEnabled("IFD")) {
      infoStream.message("IFD", "delete \"" + fileName + "\"");
    }
    directory.deleteFile(fileName);
  } catch (IOException deleteFailure) {
    // Delete failed; nothing to queue if the file is gone anyway.
    if (!directory.fileExists(fileName)) {
      return;
    }

    // Some operating systems (e.g. Windows) don't
    // permit a file to be deleted while it is opened
    // for read (e.g. by another process or thread). So
    // we assume that when a delete fails it is because
    // the file is open in another process, and queue
    // the file for subsequent deletion.
    if (infoStream.isEnabled("IFD")) {
      infoStream.message("IFD", "unable to remove file \"" + fileName + "\": " + deleteFailure.toString() + "; Will re-try later.");
    }
    if (deletable == null) {
      deletable = new ArrayList<String>();
    }
    // remember the file so the deletion can be attempted again later
    deletable.add(fileName);
  }
}


FSDirectory

/**
 * Removes an existing file in the directory.
 *
 * @param name name of the file, resolved against this directory
 * @throws IOException if the file could not be deleted
 */
@Override
public void deleteFile(String name) throws IOException {
  ensureOpen();
  final File target = new File(directory, name);
  final boolean removed = target.delete();
  if (!removed) {
    throw new IOException("Cannot delete " + target);
  }
  // the file is gone, so drop its name from staleFiles
  staleFiles.remove(name);
}


第140行,checkpoint方法

/**
 * For definition of "check point" see IndexWriter comments:
 * "Clarification: Check Points (and commits)".
 *
 * Called by the writer after a "consistent change" to the index: new files
 * have been written and the in-memory SegmentInfos now point to them. The
 * change may or may not be a commit (segments_N may or may not exist yet).
 *
 * The files referenced by the new SegmentInfos are incref'd. For a commit,
 * a new CommitPoint is appended, the policy's onCommit is invoked and the
 * commits it deleted are processed via deleteCommits(). For a non-commit,
 * the files remembered from the previous checkpoint are decref'd and
 * replaced by the files of the new SegmentInfos.
 *
 * @param segmentInfos the SegmentInfos describing the new index state
 * @param isCommit true when this checkpoint is a commit
 * @throws IOException if there is a low-level IO error
 */
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
  assert locked();

  assert Thread.holdsLock(writer);
  long startNanos = 0;
  if (infoStream.isEnabled("IFD")) {
    startNanos = System.nanoTime();
    final String liveSegments = writer.segString(writer.toLiveInfos(segmentInfos));
    infoStream.message("IFD", "now checkpoint \"" + liveSegments + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
  }

  // Retry files that could not be deleted earlier
  // (because they were in use, on Windows):
  deletePendingFiles();

  // Incref the files of the new state (done for commits and non-commits alike):
  incRef(segmentInfos, isCommit);

  if (!isCommit) {
    // Release the files held since the previous checkpoint, if any:
    decRef(lastFiles);
    lastFiles.clear();

    // Remember the current files so the next checkpoint/commit can decref them:
    lastFiles.addAll(segmentInfos.files(directory, false));
  } else {
    // Record the new commit point:
    final CommitPoint newCommit = new CommitPoint(commitsToDelete, directory, segmentInfos);
    commits.add(newCommit);

    // Let the policy decide which commits to remove:
    policy.onCommit(commits);

    // Decref files of the commits the policy deleted:
    deleteCommits();
  }

  if (infoStream.isEnabled("IFD")) {
    final long endNanos = System.nanoTime();
    final long elapsedMsec = (endNanos - startNanos) / 1000000;
    infoStream.message("IFD", elapsedMsec + " msec to checkpoint");
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: