lucene4.7源码研究之索引建立过程(3)-2
2015-04-23 10:42
429 查看
接上篇
第87行,构建索引文件管理器IndexFileDeleter。所谓管理就是对索引文件进行引用计数。IndexFileDeleter构造时做了以下事情:使用SegmentInfos.read来检查符合lucene索引命名规则的文件的有效性,并初始化这些文件的引用计数,然后删除无引用的文件;删除失败的文件会在checkpoint中再次尝试删除;根据commit策略(IndexDeletionPolicy)对commit进行管理,并对segments.gen文件进行change记录
第43行,IndexFileDeleter中包含一个final的静态内部类RefCount,用来维护各个文件的引用数量;其创建过程见下文
接第43行代码:为尚未登记引用的文件创建引用计数对象RefCount,引用计数count初始化为0
同样都是在IndexFileDeleter中
第130行
FSDirectory
第140行,checkPoint
87 synchronized(this) {//构建索引管理对象 88 deleter = new IndexFileDeleter(directory, 89 config.getIndexDeletionPolicy(),//默认策略为KeepOnlyLastCommitDeleter 90 segmentInfos, infoStream, this, 91 initialIndexExists); 92 } 93 94 if (deleter.startingCommitDeleted) { 95 // Deletion policy deleted the "head" commit point. 96 // We have to mark ourself as changed so that if we 97 // are closed w/o any further changes we write a new 98 // segments_N file. 99 changed();//version++ 100 } 101 102 if (infoStream.isEnabled("IW")) { 103 infoStream.message("IW", "init: create=" + create); 104 messageState(); 105 } 106 107 success = true; 108 109 } finally { 110 if (!success) { 111 if (infoStream.isEnabled("IW")) { 112 infoStream.message("IW", "init: hit exception on init; releasing write lock"); 113 } 114 IOUtils.closeWhileHandlingException(writeLock); 115 writeLock = null; 116 } 117 } 118 }
第87行,构建索引文件管理器IndexFileDeleter。所谓管理就是对索引文件进行引用计数。IndexFileDeleter构造时做了以下事情:使用SegmentInfos.read来检查符合lucene索引命名规则的文件的有效性,并初始化这些文件的引用计数,然后删除无引用的文件;删除失败的文件会在checkpoint中再次尝试删除;根据commit策略(IndexDeletionPolicy)对commit进行管理,并对segments.gen文件进行change记录
/**
 * Initialize the deleter: find all previous commits in
 * the Directory, incref the files they reference, call
 * the policy to let it delete commits.  This will remove
 * any files not referenced by any of the commits.
 * @throws IOException if there is a low-level IO error
 */
public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos,
                        InfoStream infoStream, IndexWriter writer,
                        boolean initialIndexExists) throws IOException {
  this.infoStream = infoStream;
  this.writer = writer;

  // Name of the current segments_N file (see the "failed to locate current
  // segments_N file" message below) — NOT segments.gen.
  final String currentSegmentsFile = segmentInfos.getSegmentsFileName();

  // In the walkthrough's scenario the InfoStream implementation is NoOutput,
  // so isEnabled returns false and these messages are skipped.
  if (infoStream.isEnabled("IFD")) {
    infoStream.message("IFD", "init: current segments file is \"" + currentSegmentsFile + "\"; deletionPolicy=" + policy);
  }
  this.policy = policy;
  this.directory = directory;

  // First pass: walk the files and initialize our ref
  // counts:
  long currentGen = segmentInfos.getGeneration(); // generation of the commit passed in

  CommitPoint currentCommitPoint = null; // will point at the commit matching segmentInfos, if found
  String[] files = null;
  try {
    files = directory.listAll(); // list every file in the index directory
  } catch (NoSuchDirectoryException e) {
    // it means the directory is empty, so ignore it.
    files = new String[0];
  }

  if (currentSegmentsFile != null) { // only scan when a segments_N file exists
    // Any per-codec index file matches this pattern; matcher is reused below.
    Matcher m = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
    for (String fileName : files) {
      m.reset(fileName);
      // Track a file when it is: 1) not the write lock, 2) not segments.gen,
      // and 3) either a codec file or a segments/segments_N file.
      if (!fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN)
          && (m.matches() || fileName.startsWith(IndexFileNames.SEGMENTS))) {

        // Add this file to refCounts with initial count 0:
        getRefCount(fileName);

        if (fileName.startsWith(IndexFileNames.SEGMENTS)) { // segments_N (segments.gen already excluded)
          // This is a commit (segments or segments_N), and
          // it's valid (<= the max gen).  Load it, then
          // incref all files it refers to:
          if (infoStream.isEnabled("IFD")) {
            infoStream.message("IFD", "init: load commit \"" + fileName + "\"");
          }
          SegmentInfos sis = new SegmentInfos();
          try {
            // Read the commit; failures below decide whether it is usable.
            sis.read(directory, fileName);
          } catch (FileNotFoundException e) {
            // LUCENE-948: on NFS (and maybe others), if
            // you have writers switching back and forth
            // between machines, it's very likely that the
            // dir listing will be stale and will claim a
            // file segments_X exists when in fact it
            // doesn't. So, we catch this and handle it
            // as if the file does not exist
            if (infoStream.isEnabled("IFD")) {
              infoStream.message("IFD", "init: hit FileNotFoundException when loading commit \"" + fileName + "\"; skipping this commit point");
            }
            sis = null;
          } catch (IOException e) {
            // A non-empty commit at or below our generation must be readable;
            // rethrow the failure in that case.
            if (SegmentInfos.generationFromSegmentsFileName(fileName) <= currentGen && directory.fileLength(fileName) > 0) {
              throw e;
            } else {
              // Most likely we are opening an index that
              // has an aborted "future" commit, so suppress
              // exc in this case
              sis = null;
            }
          }
          if (sis != null) { // commit loaded without error
            final CommitPoint commitPoint = new CommitPoint(commitsToDelete, directory, sis);
            if (sis.getGeneration() == segmentInfos.getGeneration()) {
              // Same generation as the commit passed in: this is "our" commit point.
              currentCommitPoint = commitPoint;
            }
            commits.add(commitPoint); // collect every commit point found
            // Incref every file this commit references, so it is now managed
            // by this IndexFileDeleter (segments.gen is never tracked).
            incRef(sis, true);

            if (lastSegmentInfos == null || sis.getGeneration() > lastSegmentInfos.getGeneration()) {
              // Remember the newest SegmentInfos seen so far.
              lastSegmentInfos = sis;
            }
          }
        }
      }
    }
  }

  // The directory listing did not show our own segments_N — force-load it.
  if (currentCommitPoint == null && currentSegmentsFile != null && initialIndexExists) {
    // We did not in fact see the segments_N file
    // corresponding to the segmentInfos that was passed
    // in. Yet, it must exist, because our caller holds
    // the write lock.  This can happen when the directory
    // listing was stale (eg when index accessed via NFS
    // client with stale directory listing cache). So we
    // try now to explicitly open this commit point:
    SegmentInfos sis = new SegmentInfos();
    try {
      sis.read(directory, currentSegmentsFile);
    } catch (IOException e) {
      throw new CorruptIndexException("failed to locate current segments_N file \"" + currentSegmentsFile + "\"");
    }
    if (infoStream.isEnabled("IFD")) {
      infoStream.message("IFD", "forced open of current segments file " + segmentInfos.getSegmentsFileName());
    }
    currentCommitPoint = new CommitPoint(commitsToDelete, directory, sis);
    commits.add(currentCommitPoint);
    // Incref the files referenced by this commit (note: this does NOT
    // touch segments.gen, contrary to the original blog annotation).
    incRef(sis, true);
  }

  // We keep commits list in sorted order (oldest to newest):
  CollectionUtil.timSort(commits);

  // Now delete anything with ref count at 0.  These are
  // presumably abandoned files eg due to crash of
  // IndexWriter.
  for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {
    RefCount rc = entry.getValue();
    final String fileName = entry.getKey();
    if (0 == rc.count) {
      if (infoStream.isEnabled("IFD")) {
        infoStream.message("IFD", "init: removing unreferenced file \"" + fileName + "\"");
      }
      deleteFile(fileName); // see deleteFile below for the failure/retry handling
    }
  }

  // Finally, give policy a chance to remove things on
  // startup: by default this is KeepOnlyLastCommitDeletionPolicy,
  // which keeps only the most recent commit.
  policy.onInit(commits);

  // Always protect the incoming segmentInfos since
  // sometime it may not be the most recent commit
  checkpoint(segmentInfos, false); // see checkpoint below

  startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted();

  deleteCommits();
}
第43行,IndexFileDeleter中包含一个final的静态内部类RefCount,用来维护各个文件的引用数量;其创建过程见下文
/** * Tracks the reference count for a single index file: */ final private static class RefCount { // fileName used only for better assert error messages final String fileName; boolean initDone; RefCount(String fileName) { this.fileName = fileName; } int count;//数量 public int IncRef() { if (!initDone) { initDone = true; } else { assert count > 0: Thread.currentThread().getName() + ": RefCount is 0 pre-increment for file \"" + fileName + "\""; } return ++count; } public int DecRef() { assert count > 0: Thread.currentThread().getName() + ": RefCount is 0 pre-decrement for file \"" + fileName + "\""; return --count; } }
接第43行代码:为尚未登记引用的文件创建引用计数对象RefCount,引用计数count初始化为0
private RefCount getRefCount(String fileName) { assert locked(); RefCount rc; if (!refCounts.containsKey(fileName)) {//refCounts Hashmap保存文件引用数量 rc = new RefCount(fileName); refCounts.put(fileName, rc); } else { rc = refCounts.get(fileName); } return rc; }
同样都是在IndexFileDeleter中
// called only from assert private boolean locked() { return writer == null || Thread.holdsLock(writer); }
第130行
/**
 * Best-effort delete of fileName.  If the underlying delete fails
 * while the file still exists, the name is queued in {@code deletable}
 * and retried later (deletePendingFiles(), called from checkpoint).
 */
void deleteFile(String fileName) throws IOException {
  // NOTE: this only *asserts* the writer lock is held — it does not
  // acquire it (the original blog comment was misleading here).
  assert locked();
  try {
    if (infoStream.isEnabled("IFD")) {
      infoStream.message("IFD", "delete \"" + fileName + "\"");
    }
    directory.deleteFile(fileName); // delegate to Directory; FSDirectory shown below
  } catch (IOException e) { // if delete fails
    if (directory.fileExists(fileName)) {
      // Some operating systems (e.g. Windows) don't
      // permit a file to be deleted while it is opened
      // for read (e.g. by another process or thread). So
      // we assume that when a delete fails it is because
      // the file is open in another process, and queue
      // the file for subsequent deletion.
      if (infoStream.isEnabled("IFD")) {
        infoStream.message("IFD", "unable to remove file \"" + fileName + "\": " + e.toString() + "; Will re-try later.");
      }
      if (deletable == null) {
        deletable = new ArrayList<String>();
      }
      // Queue the name; the next checkpoint retries these deletes.
      deletable.add(fileName);
    }
  }
}
FSDirectory
/** Removes an existing file in the directory. */
@Override
public void deleteFile(String name) throws IOException {
  ensureOpen();
  File file = new File(directory, name);
  if (!file.delete()) // physical delete; throw so IndexFileDeleter can queue a retry
    throw new IOException("Cannot delete " + file);
  // NOTE: this *removes* the name from staleFiles (a synchronized set,
  // presumably of files pending sync — confirm against FSDirectory);
  // the original blog comment incorrectly said the name is added.
  staleFiles.remove(name);
}
第140行,checkPoint
/** * For definition of "check point" see IndexWriter comments: * "Clarification: Check Points (and commits)". * * Writer calls this when it has made a "consistent * change" to the index, meaning new files are written to * the index and the in-memory SegmentInfos have been * modified to point to those files. * * This may or may not be a commit (segments_N may or may * not have been written). * * We simply incref the files referenced by the new * SegmentInfos and decref the files we had previously * seen (if any). * * If this is a commit, we also call the policy to give it * a chance to remove other commits. If any commits are * removed, we decref their files as well. */ public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException { assert locked(); assert Thread.holdsLock(writer); long t0 = 0; if (infoStream.isEnabled("IFD")) { t0 = System.nanoTime(); infoStream.message("IFD", "now checkpoint \"" + writer.segString(writer.toLiveInfos(segmentInfos)) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]"); } // Try again now to delete any previously un-deletable // files (because they were in use, on Windows): deletePendingFiles();//再次删除之前删除失败的文件 // Incref the files: incRef(segmentInfos, isCommit);//如果被提交,则引用数加1 if (isCommit) { // Append to our commits list: commits.add(new CommitPoint(commitsToDelete, directory, segmentInfos)); // Tell policy so it can remove commits: policy.onCommit(commits);//删除commit只剩最近一次commit // Decref files for commits that were deleted by the policy: deleteCommits(); } else { // DecRef old files from the last checkpoint, if any: decRef(lastFiles);//每次checkpoint都会重新引用lastFiles lastFiles.clear(); // Save files so we can decr on next checkpoint/commit: lastFiles.addAll(segmentInfos.files(directory, false)); } if (infoStream.isEnabled("IFD")) { long t1 = System.nanoTime(); infoStream.message("IFD", ((t1-t0)/1000000) + " msec to checkpoint"); } }
相关文章推荐
- lucene4.7源码研究之索引建立过程(3)-1
- lucene4.7源码研究之索引建立过程(2)
- lucene4.7源码研究之索引建立过程(1)
- lucene(全文搜索)_根据内容建立索引_源码下载
- lucene(全文搜索)_建立索引_根据关键字全文搜索_源码下载
- lucene建立索引的过程
- Lucene 3.0.0的细节初窥(2)-研究在索引过程中的缓存
- 针对Lucene 建立索引过程与搜索过程耗费时间的矛盾一点思路
- lucene索引建立的效率研究
- Lucene建立索引的过程学习
- lucene 建立索引的过程
- lucene4.5源码分析系列:索引的创建过程
- Lucene源码解析--索引创建过程
- lucene源码-倒排索引的读过程
- lucene建立索引的过程
- lucene4.7索引源码研究之Segment元文件
- Openstack Cinder中建立volume过程的源码解析(5)----以及taskflow相关解析
- 利用lucene对整个数据库建立索引
- 通过Lucene索引文件学习Lucene索引过程
- live555源码研究(十)------在编译过程中遇到的问题及解决方法