lucene4.7源码研究之索引建立过程(3)-2
接上篇
87 synchronized(this) {//构建索引管理对象 88 deleter = new IndexFileDeleter(directory, 89 config.getIndexDeletionPolicy(),//默认策略为KeepOnlyLastCommitDeleter 90 segmentInfos, infoStream, this, 91 initialIndexExists); 92 } 93 94 if (deleter.startingCommitDeleted) { 95 // Deletion policy deleted the "head" commit point. 96 // We have to mark ourself as changed so that if we 97 // are closed w/o any further changes we write a new 98 // segments_N file. 99 changed(); 100 } 101 102 if (infoStream.isEnabled("IW")) { 103 infoStream.message("IW", "init: create=" + create); 104 messageState(); 105 } 106 107 success = true; 108 109 } finally { 110 if (!success) { 111 if (infoStream.isEnabled("IW")) { 112 infoStream.message("IW", "init: hit exception on init; releasing write lock"); 113 } 114 IOUtils.closeWhileHandlingException(writeLock); 115 writeLock = null; 116 } 117 } 118 }
第87行构建索引文件管理器IndexFileDeleter。所谓“管理”,就是对索引文件做引用计数:初始化时,引用数为0(即没有被任何提交点引用)的文件会被直接删除。
1   /**
2    * Initialize the deleter: find all previous commits in
3    * the Directory, incref the files they reference, call
4    * the policy to let it delete commits. This will remove
5    * any files not referenced by any of the commits.
6    * @throws IOException if there is a low-level IO error
7    */
8   public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos,
9       InfoStream infoStream, IndexWriter writer, boolean initialIndexExists) throws IOException {
10    this.infoStream = infoStream;
11    this.writer = writer;
12
13    final String currentSegmentsFile = segmentInfos.getSegmentsFileName(); // name of the incoming commit's segments_N file (not segments.gen); may be null, hence the check on line 35
14
15    if (infoStream.isEnabled("IFD")) { // only log when the configured InfoStream enables the "IFD" component
16      infoStream.message("IFD", "init: current segments file is \"" + currentSegmentsFile + "\"; deletionPolicy=" + policy);
17    }
18
19    this.policy = policy;
20    this.directory = directory;
21
22    // First pass: walk the files and initialize our ref
23    // counts:
24    long currentGen = segmentInfos.getGeneration(); // generation of the incoming segmentInfos
25
26    CommitPoint currentCommitPoint = null; // set below to the commit point whose generation matches segmentInfos
27    String[] files = null;
28    try {
29      files = directory.listAll(); // every file name in the index directory
30    } catch (NoSuchDirectoryException e) {
31      // it means the directory is empty, so ignore it.
32      files = new String[0];
33    }
34
35    if (currentSegmentsFile != null) {
36      Matcher m = IndexFileNames.CODEC_FILE_PATTERN.matcher(""); // reused for every file name; CODEC_FILE_PATTERN presumably matches all codec-written index files (confirm in IndexFileNames)
37      for (String fileName : files) {
38        m.reset(fileName);
39        if (!fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN) // skip the write lock and segments.gen; keep codec files and anything starting with "segments"
40            && (m.matches() || fileName.startsWith(IndexFileNames.SEGMENTS))) {
41
42          // Add this file to refCounts with initial count 0:
43          getRefCount(fileName); // only registers the file (count 0) the first time it is seen; it does NOT increment the count
44
45          if (fileName.startsWith(IndexFileNames.SEGMENTS)) { // a commit file: segments or segments_N
46
47            // This is a commit (segments or segments_N), and
48            // it's valid (<= the max gen). Load it, then
49            // incref all files it refers to:
50            if (infoStream.isEnabled("IFD")) {
51              infoStream.message("IFD", "init: load commit \"" + fileName + "\"");
52            }
53            SegmentInfos sis = new SegmentInfos();
54            try {
55              sis.read(directory, fileName); // load the commit so the files it references can be incref'd below; read failures are handled by the catch blocks
56            } catch (FileNotFoundException e) { // the directory listing can be stale (LUCENE-948, explained below): treat the commit as missing
57              // LUCENE-948: on NFS (and maybe others), if
58              // you have writers switching back and forth
59              // between machines, it's very likely that the
60              // dir listing will be stale and will claim a
61              // file segments_X exists when in fact it
62              // doesn't. So, we catch this and handle it
63              // as if the file does not exist
64              if (infoStream.isEnabled("IFD")) {
65                infoStream.message("IFD", "init: hit FileNotFoundException when loading commit \"" + fileName + "\"; skipping this commit point");
66              }
67              sis = null;
68            } catch (IOException e) {
69              if (SegmentInfos.generationFromSegmentsFileName(fileName) <= currentGen && directory.fileLength(fileName) > 0) { // a non-empty commit at or below the current generation failed to load: genuine error, rethrow
70                throw e;
71              } else {
72                // Most likely we are opening an index that
73                // has an aborted "future" commit, so suppress
74                // exc in this case
75                sis = null;
76              }
77            }
78            if (sis != null) { // the commit was loaded successfully
79              final CommitPoint commitPoint = new CommitPoint(commitsToDelete, directory, sis); // wrap the loaded commit
80              if (sis.getGeneration() == segmentInfos.getGeneration()) { // same generation as the incoming segmentInfos: this is the current commit
81                currentCommitPoint = commitPoint;
82              }
83              commits.add(commitPoint); // remember every successfully loaded commit
84              incRef(sis, true); // incref the files this commit references (NOTE(review): `true` presumably marks it as a commit so segments_N itself is counted too; confirm in incRef)
85
86              if (lastSegmentInfos == null || sis.getGeneration() > lastSegmentInfos.getGeneration()) { // keep the newest commit seen so far
87                lastSegmentInfos = sis;
88              }
89            }
90          }
91        }
92      }
93    }
94
95    if (currentCommitPoint == null && currentSegmentsFile != null && initialIndexExists) {
96      // We did not in fact see the segments_N file
97      // corresponding to the segmentInfos that was passed
98      // in. Yet, it must exist, because our caller holds
99      // the write lock. This can happen when the directory
100     // listing was stale (eg when index accessed via NFS
101     // client with stale directory listing cache). So we
102     // try now to explicitly open this commit point:
103     SegmentInfos sis = new SegmentInfos();
104     try {
105       sis.read(directory, currentSegmentsFile);
106     } catch (IOException e) {
107       throw new CorruptIndexException("failed to locate current segments_N file \"" + currentSegmentsFile + "\""); // NOTE(review): e is not attached as the cause, so the underlying error is lost
108     }
109     if (infoStream.isEnabled("IFD")) {
110       infoStream.message("IFD", "forced open of current segments file " + segmentInfos.getSegmentsFileName());
111     }
112     currentCommitPoint = new CommitPoint(commitsToDelete, directory, sis);
113     commits.add(currentCommitPoint);
114     incRef(sis, true); // incref the files referenced by the force-loaded current commit
115   }
116
117   // We keep commits list in sorted order (oldest to newest):
118   CollectionUtil.timSort(commits); // sort oldest to newest (NOTE(review): relies on CommitPoint's natural ordering, presumably by generation)
119
120   // Now delete anything with ref count at 0. These are
121   // presumably abandoned files eg due to crash of
122   // IndexWriter.
123   for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) { // files that no loaded commit references still have count 0
124     RefCount rc = entry.getValue();
125     final String fileName = entry.getKey();
126     if (0 == rc.count) {
127       if (infoStream.isEnabled("IFD")) {
128         infoStream.message("IFD", "init: removing unreferenced file \"" + fileName + "\"");
129       }
130       deleteFile(fileName); // if the delete fails and the file still exists, deleteFile queues it for a later retry
131     }
132   }
133
134   // Finally, give policy a chance to remove things on
135   // startup:
136   policy.onInit(commits); // the deletion policy may delete commits here; IndexWriterConfig's default, KeepOnlyLastCommitDeletionPolicy, keeps only the newest commit (see its Javadoc)
137
138   // Always protect the incoming segmentInfos since
139   // sometime it may not be the most recent commit
140   checkpoint(segmentInfos, false); // non-commit checkpoint: increfs the incoming segmentInfos' files and remembers them as lastFiles
141
142   startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted(); // did the policy delete the commit matching segmentInfos?
143
144   deleteCommits(); // NOTE(review): presumably decrefs/removes the commits the policy marked for deletion; confirm in deleteCommits()
145 }
第43行调用了getRefCount。IndexFileDeleter中有一个私有的final静态嵌套类RefCount,用来维护单个索引文件的引用数量,其定义如下:
/** * Tracks the reference count for a single index file: */ final private static class RefCount { // fileName used only for better assert error messages final String fileName; boolean initDone; RefCount(String fileName) { this.fileName = fileName; } int count;//数量 public int IncRef() { if (!initDone) { initDone = true; } else { assert count > 0: Thread.currentThread().getName() + ": RefCount is 0 pre-increment for file \"" + fileName + "\""; } return ++count; } public int DecRef() { assert count > 0: Thread.currentThread().getName() + ": RefCount is 0 pre-decrement for file \"" + fileName + "\""; return --count; } }
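回到第43行,getRefCount的实现如下。注意:它只在第一次遇到某个文件时创建一个计数为0的RefCount并放入refCounts,本身并不会把引用数加1;计数的增加发生在之后的incRef调用中(例如第84行)。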
private RefCount getRefCount(String fileName) { assert locked(); RefCount rc; if (!refCounts.containsKey(fileName)) {//refCounts Hashmap保存文件引用数量 rc = new RefCount(fileName); refCounts.put(fileName, rc); } else { rc = refCounts.get(fileName); } return rc; }
下面的locked()同样定义在IndexFileDeleter中。它只在assert语句中调用,用于确认writer为null,或当前线程持有writer的锁:
// called only from assert private boolean locked() { return writer == null || Thread.holdsLock(writer); }
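第130行调用的deleteFile方法如下。删除失败且文件仍然存在时,文件名会被放入deletable列表;checkpoint方法开头的deletePendingFiles()会再次尝试删除这些此前删不掉的文件。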
void deleteFile(String fileName) throws IOException { assert locked();//拿到写锁 try { if (infoStream.isEnabled("IFD")) { infoStream.message("IFD", "delete \"" + fileName + "\""); } directory.deleteFile(fileName);//调用Directory删除,代码如下 } catch (IOException e) { // if delete fails if (directory.fileExists(fileName)) { // Some operating systems (e.g. Windows) don‘t // permit a file to be deleted while it is opened // for read (e.g. by another process or thread). So // we assume that when a delete fails it is because // the file is open in another process, and queue // the file for subsequent deletion. if (infoStream.isEnabled("IFD")) { infoStream.message("IFD", "unable to remove file \"" + fileName + "\": " + e.toString() + "; Will re-try later."); } if (deletable == null) { deletable = new ArrayList<String>(); } deletable.add(fileName); // add to deletable 如果删除失败,将file全部放到可删除列表deletable,但是deletable会什么时候删除呢? } } }
以FSDirectory为例,Directory.deleteFile的实现如下(删除成功后会把文件名从staleFiles中移除,而不是放入):
/** Removes an existing file in the directory. */ @Override public void deleteFile(String name) throws IOException { ensureOpen(); File file = new File(directory, name); if (!file.delete())//删除 throw new IOException("Cannot delete " + file); staleFiles.remove(name);//并且把名字放入staleFiles,同步set }
第140行调用的checkpoint方法如下:
/** * For definition of "check point" see IndexWriter comments: * "Clarification: Check Points (and commits)". * * Writer calls this when it has made a "consistent * change" to the index, meaning new files are written to * the index and the in-memory SegmentInfos have been * modified to point to those files. * * This may or may not be a commit (segments_N may or may * not have been written). * * We simply incref the files referenced by the new * SegmentInfos and decref the files we had previously * seen (if any). * * If this is a commit, we also call the policy to give it * a chance to remove other commits. If any commits are * removed, we decref their files as well. */ public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException { assert locked(); assert Thread.holdsLock(writer); long t0 = 0; if (infoStream.isEnabled("IFD")) { t0 = System.nanoTime(); infoStream.message("IFD", "now checkpoint \"" + writer.segString(writer.toLiveInfos(segmentInfos)) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]"); } // Try again now to delete any previously un-deletable // files (because they were in use, on Windows): deletePendingFiles(); // Incref the files: incRef(segmentInfos, isCommit); if (isCommit) { // Append to our commits list: commits.add(new CommitPoint(commitsToDelete, directory, segmentInfos)); // Tell policy so it can remove commits: policy.onCommit(commits); // Decref files for commits that were deleted by the policy: deleteCommits(); } else { // DecRef old files from the last checkpoint, if any: decRef(lastFiles); lastFiles.clear(); // Save files so we can decr on next checkpoint/commit: lastFiles.addAll(segmentInfos.files(directory, false)); } if (infoStream.isEnabled("IFD")) { long t1 = System.nanoTime(); infoStream.message("IFD", ((t1-t0)/1000000) + " msec to checkpoint"); } }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。