lucene4.7源码研究之索引建立过程(3)-2

接上篇

 87       synchronized(this) {//构建索引管理对象
 88         deleter = new IndexFileDeleter(directory,
 89                                        config.getIndexDeletionPolicy(),//默认策略为KeepOnlyLastCommitDeleter
 90                                        segmentInfos, infoStream, this,
 91                                        initialIndexExists);
 92       }
 93 
 94       if (deleter.startingCommitDeleted) {
 95         // Deletion policy deleted the "head" commit point.
 96         // We have to mark ourself as changed so that if we
 97         // are closed w/o any further changes we write a new
 98         // segments_N file.
 99         changed();
100       }
101 
102       if (infoStream.isEnabled("IW")) {
103         infoStream.message("IW", "init: create=" + create);
104         messageState();
105       }
106 
107       success = true;
108 
109     } finally {
110       if (!success) {
111         if (infoStream.isEnabled("IW")) {
112           infoStream.message("IW", "init: hit exception on init; releasing write lock");
113         }
114         IOUtils.closeWhileHandlingException(writeLock);
115         writeLock = null;
116       }
117     }
118   }

 第87行,构建索引管理IndexFileDeleter,所谓管理就是对索引文件进行引用计数

  1  /**
  2    * Initialize the deleter: find all previous commits in
  3    * the Directory, incref the files they reference, call
  4    * the policy to let it delete commits.  This will remove
  5    * any files not referenced by any of the commits.
  6    * @throws IOException if there is a low-level IO error
  7    */
  8   public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos,
  9                           InfoStream infoStream, IndexWriter writer, boolean initialIndexExists) throws IOException {
 10     this.infoStream = infoStream;
 11     this.writer = writer;
 12 
 13     final String currentSegmentsFile = segmentInfos.getSegmentsFileName();// name of the current segments file; may be null (checked below)
 14 
 15     if (infoStream.isEnabled("IFD")) {// article author's note: when the InfoStream is the NoOutput implementation, isEnabled returns false
 16       infoStream.message("IFD", "init: current segments file is \"" + currentSegmentsFile + "\"; deletionPolicy=" + policy);
 17     }
 18 
 19     this.policy = policy;
 20     this.directory = directory;
 21 
 22     // First pass: walk the files and initialize our ref
 23     // counts:
 24     long currentGen = segmentInfos.getGeneration();// generation of the SegmentInfos that was passed in
 25 
 26     CommitPoint currentCommitPoint = null;// commit point matching the incoming segmentInfos, once found
 27     String[] files = null;
 28     try {
 29       files = directory.listAll();// list every file name in the directory
 30     } catch (NoSuchDirectoryException e) {
 31       // it means the directory is empty, so ignore it.
 32       files = new String[0];
 33     }
 34     
 35     if (currentSegmentsFile != null) {
 36       Matcher m = IndexFileNames.CODEC_FILE_PATTERN.matcher("");// one matcher for the codec file-name pattern, reset for each file
 37       for (String fileName : files) {
 38         m.reset(fileName);
 39         if (!fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN)// keep only: not the write lock, not SEGMENTS_GEN, and either matching the codec pattern or starting with the SEGMENTS prefix
 40             && (m.matches() || fileName.startsWith(IndexFileNames.SEGMENTS))) {
 41           
 42           // Add this file to refCounts with initial count 0:
 43           getRefCount(fileName);// only registers the file with count 0; it does NOT increment the count (see getRefCount)
 44           
 45           if (fileName.startsWith(IndexFileNames.SEGMENTS)) {// any file whose name starts with the SEGMENTS prefix
 46             
 47             // This is a commit (segments or segments_N), and
 48             // it's valid (<= the max gen).  Load it, then
 49             // incref all files it refers to:
 50             if (infoStream.isEnabled("IFD")) {
 51               infoStream.message("IFD", "init: load commit \"" + fileName + "\"");
 52             }
 53             SegmentInfos sis = new SegmentInfos();
 54             try {
 55               sis.read(directory, fileName);// load this commit; read failures are handled by the catch clauses below
 56             } catch (FileNotFoundException e) {// the listing named a segments file that cannot be opened; see LUCENE-948 note below
 57               // LUCENE-948: on NFS (and maybe others), if
 58               // you have writers switching back and forth
 59               // between machines, it's very likely that the
 60               // dir listing will be stale and will claim a
 61               // file segments_X exists when in fact it
 62               // doesn't.  So, we catch this and handle it
 63               // as if the file does not exist
 64               if (infoStream.isEnabled("IFD")) {
 65                 infoStream.message("IFD", "init: hit FileNotFoundException when loading commit \"" + fileName + "\"; skipping this commit point");
 66               }
 67               sis = null;
 68             } catch (IOException e) {
 69               if (SegmentInfos.generationFromSegmentsFileName(fileName) <= currentGen && directory.fileLength(fileName) > 0) {// a non-empty commit at or before the current generation must be readable, so rethrow
 70                 throw e;
 71               } else {
 72                 // Most likely we are opening an index that
 73                 // has an aborted "future" commit, so suppress
 74                 // exc in this case
 75                 sis = null;
 76               }
 77             }
 78             if (sis != null) {// the commit was read without an exception
 79               final CommitPoint commitPoint = new CommitPoint(commitsToDelete, directory, sis);// wrap the loaded commit in a CommitPoint
 80               if (sis.getGeneration() == segmentInfos.getGeneration()) {// same generation as the incoming segmentInfos: this is the current commit
 81                 currentCommitPoint = commitPoint;
 82               }
 83               commits.add(commitPoint);// add to the list of known commits
 84               incRef(sis, true);// incref the files this commit refers to
 85               
 86               if (lastSegmentInfos == null || sis.getGeneration() > lastSegmentInfos.getGeneration()) {// remember the commit with the highest generation seen so far
 87                 lastSegmentInfos = sis;
 88               }
 89             }
 90           }
 91         }
 92       }
 93     }
 94 
 95     if (currentCommitPoint == null && currentSegmentsFile != null && initialIndexExists) {
 96       // We did not in fact see the segments_N file
 97       // corresponding to the segmentInfos that was passed
 98       // in.  Yet, it must exist, because our caller holds
 99       // the write lock.  This can happen when the directory
100       // listing was stale (eg when index accessed via NFS
101       // client with stale directory listing cache).  So we
102       // try now to explicitly open this commit point:
103       SegmentInfos sis = new SegmentInfos();
104       try {
105         sis.read(directory, currentSegmentsFile);
106       } catch (IOException e) {
107         throw new CorruptIndexException("failed to locate current segments_N file \"" + currentSegmentsFile + "\"");
108       }
109       if (infoStream.isEnabled("IFD")) {
110         infoStream.message("IFD", "forced open of current segments file " + segmentInfos.getSegmentsFileName());
111       }
112       currentCommitPoint = new CommitPoint(commitsToDelete, directory, sis);
113       commits.add(currentCommitPoint);
114       incRef(sis, true);// incref the files this commit refers to
115     }
116 
117     // We keep commits list in sorted order (oldest to newest):
118     CollectionUtil.timSort(commits);// sort oldest to newest
119 
120     // Now delete anything with ref count at 0.  These are
121     // presumably abandoned files eg due to crash of
122     // IndexWriter.
123     for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {// delete every tracked file whose ref count is still 0
124       RefCount rc = entry.getValue();
125       final String fileName = entry.getKey();
126       if (0 == rc.count) {
127         if (infoStream.isEnabled("IFD")) {
128           infoStream.message("IFD", "init: removing unreferenced file \"" + fileName + "\"");
129         }
130         deleteFile(fileName);// see deleteFile
131       }
132     }
133 
134     // Finally, give policy a chance to remove things on
135     // startup:
136     policy.onInit(commits);// hand the sorted commit list to the deletion policy so it can remove commits
137 
138     // Always protect the incoming segmentInfos since
139     // sometime it may not be the most recent commit
140     checkpoint(segmentInfos, false);// isCommit=false: a plain check point, not a commit (see checkpoint)
141 
142     startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted();
143 
144     deleteCommits();
145   }

 

第43行:IndexFileDeleter中包含一个final的静态嵌套类RefCount,用来维护单个索引文件的引用计数,定义如下(RefCount对象的创建过程见其后的getRefCount方法):

  /**
   * Reference counter for one index file. The count starts at 0 and is
   * changed only through {@link #IncRef()} and {@link #DecRef()}.
   */
  final private static class RefCount {

    // Kept solely so that assertion failures can name the file.
    final String fileName;

    // Becomes true on the first IncRef(); until then a count of 0 is legal.
    boolean initDone;

    // Current number of references to the file.
    int count;

    RefCount(String fileName) {
      this.fileName = fileName;
    }

    /**
     * Adds one reference. Every call after the first asserts that the
     * count is still above 0 before incrementing.
     *
     * @return the count after the increment
     */
    public int IncRef() {
      if (initDone) {
        assert count > 0: Thread.currentThread().getName() + ": RefCount is 0 pre-increment for file \"" + fileName + "\"";
      }
      initDone = true;
      count += 1;
      return count;
    }

    /**
     * Drops one reference; asserts that at least one is currently held.
     *
     * @return the count after the decrement
     */
    public int DecRef() {
      assert count > 0: Thread.currentThread().getName() + ": RefCount is 0 pre-decrement for file \"" + fileName + "\"";
      count -= 1;
      return count;
    }
  }

 

接第43行代码:getRefCount只是在refCounts中查找该文件对应的RefCount,不存在时新建一个初始计数为0的RefCount并放入refCounts,它本身并不会把引用计数加1:

  /**
   * Returns the {@link RefCount} tracking {@code fileName}, creating and
   * registering a fresh one the first time the file is seen. This only
   * looks up or creates the counter; it never changes the count itself.
   *
   * @param fileName name of the index file
   * @return the counter stored in {@code refCounts} for that file
   */
  private RefCount getRefCount(String fileName) {
    assert locked();
    // Single lookup instead of containsKey + get: this method only ever
    // stores non-null counters, so null means "not registered yet".
    RefCount rc = refCounts.get(fileName);
    if (rc == null) {
      rc = new RefCount(fileName);
      refCounts.put(fileName, rc);
    }
    return rc;
  }

 

同样都是在IndexFileDeleter中

  // Used only inside assert statements: true when there is no writer, or
  // when the calling thread holds the writer's monitor.
  private boolean locked() {
    if (writer == null) {
      return true;
    }
    return Thread.holdsLock(writer);
  }

 

第130行

  /**
   * Deletes {@code fileName} through the directory. If the delete throws
   * an IOException and the file is still present, the name is queued in
   * {@code deletable} instead of propagating the failure; if the file no
   * longer exists the exception is dropped.
   *
   * @param fileName name of the file to delete
   * @throws IOException if checking whether the file still exists fails
   */
  void deleteFile(String fileName)
       throws IOException {
    assert locked(); // caller must hold the writer's monitor (or there is no writer)
    try {
      if (infoStream.isEnabled("IFD")) {
        infoStream.message("IFD", "delete \"" + fileName + "\"");
      }
      directory.deleteFile(fileName);
    } catch (IOException ioe) {
      if (!directory.fileExists(fileName)) {
        // The file is gone despite the exception: nothing to retry.
        return;
      }

      // Some operating systems (e.g. Windows) don't permit a file to be
      // deleted while it is open for read (e.g. by another process or
      // thread). A failed delete is therefore assumed to mean the file is
      // open elsewhere, and the file is queued for a later attempt.
      if (infoStream.isEnabled("IFD")) {
        infoStream.message("IFD", "unable to remove file \"" + fileName + "\": " + ioe.toString() + "; Will re-try later.");
      }
      if (deletable == null) {
        deletable = new ArrayList<String>();
      }
      // NOTE(review): presumably retried by deletePendingFiles(), which
      // checkpoint() calls first - confirm against that method.
      deletable.add(fileName);
    }
  }

 

FSDirectory

  /**
   * Deletes the named file from this directory.
   *
   * @param name name of the file, resolved against this directory
   * @throws IOException if the file could not be deleted
   */
  @Override
  public void deleteFile(String name) throws IOException {
    ensureOpen();
    final File target = new File(directory, name);
    final boolean deleted = target.delete();
    if (!deleted) {
      throw new IOException("Cannot delete " + target);
    }
    // The file no longer exists, so drop its name from staleFiles.
    staleFiles.remove(name);
  }

 

第140行,checkpoint

  /**
   * Records a "check point"; for the definition see the IndexWriter
   * comments: "Clarification: Check Points (and commits)".
   *
   * The writer calls this after a "consistent change" to the index: new
   * files have been written and the in-memory SegmentInfos already point
   * to them. The change may or may not be a commit (segments_N may or may
   * not have been written).
   *
   * The files referenced by the new SegmentInfos are incref'd. For a
   * commit, a new commit point is appended, the policy is asked which
   * commits to remove, and the files of removed commits are decref'd. For
   * a non-commit, the files saved at the previous check point are decref'd
   * and replaced by the files of the new SegmentInfos.
   *
   * @param segmentInfos the SegmentInfos describing the new index state
   * @param isCommit true if this check point is also a commit
   * @throws IOException if there is a low-level IO error
   */
  public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
    assert locked();

    assert Thread.holdsLock(writer);
    long startNS = 0;
    if (infoStream.isEnabled("IFD")) {
      startNS = System.nanoTime();
      infoStream.message("IFD", "now checkpoint \"" + writer.segString(writer.toLiveInfos(segmentInfos)) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
    }

    // Retry files that could not be deleted earlier (because they were in
    // use, on Windows).
    deletePendingFiles();

    // Take references on the new files before releasing any old ones.
    incRef(segmentInfos, isCommit);

    if (!isCommit) {
      // Release the files held since the previous check point, if any...
      decRef(lastFiles);
      lastFiles.clear();

      // ...and remember the new ones so the next check point/commit can
      // release them in turn.
      lastFiles.addAll(segmentInfos.files(directory, false));
    } else {
      // Record the commit, let the policy choose commits to remove, then
      // decref the files of whichever commits it deleted.
      commits.add(new CommitPoint(commitsToDelete, directory, segmentInfos));
      policy.onCommit(commits);
      deleteCommits();
    }
    if (infoStream.isEnabled("IFD")) {
      final long endNS = System.nanoTime();
      final long elapsedMsec = (endNS - startNS) / 1000000;
      infoStream.message("IFD", elapsedMsec + " msec to checkpoint");
    }
  }

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。