• 删除的流程


    // Purges the flush tickets held by docWriter and publishes each one. A ticket with no flushed
    // segment carries only a global deletes/updates packet; a ticket with a segment is published
    // together with its segment-private packet and the global packet.
    // `forced` is handed straight to docWriter.purgeFlushTickets.
    1. private void publishFlushedSegments(boolean forced) throws IOException {
    2. docWriter.purgeFlushTickets(
    3. forced,
    4. ticket -> {
    5. DocumentsWriterPerThread.FlushedSegment newSegment = ticket.getFlushedSegment();
    6. FrozenBufferedUpdates bufferedUpdates = ticket.getFrozenUpdates();
    7. ticket.markPublished();
    8. // global deletes
    9. if (newSegment == null) { // this is a flushed global deletes package - not a segments
    10. if (bufferedUpdates != null && bufferedUpdates.any()) { // TODO why can this be null?
    11. // put the delete operation onto the eventQueue
    12. publishFrozenUpdates(bufferedUpdates);
    13. if (infoStream.isEnabled("IW")) {
    14. infoStream.message("IW", "flush: push buffered updates: " + bufferedUpdates);
    15. }
    16. }
    17. }
    18. // segment-private deletes
    19. else {
    20. assert newSegment.segmentInfo != null;
    21. if (infoStream.isEnabled("IW")) {
    22. infoStream.message(
    23. "IW", "publishFlushedSegment seg-private updates=" + newSegment.segmentUpdates);
    24. }
    // NOTE(review): the guard below checks the "DW" component but the message is emitted under
    // "IW" - confirm which component is intended.
    25. if (newSegment.segmentUpdates != null && infoStream.isEnabled("DW")) {
    26. infoStream.message(
    27. "IW", "flush: push buffered seg private updates: " + newSegment.segmentUpdates);
    28. }
    29. // now publish!
    30. publishFlushedSegment(
    31. newSegment.segmentInfo,
    32. newSegment.fieldInfos,
    33. // deletes private to newSegment
    34. newSegment.segmentUpdates,
    35. // global deletes
    36. bufferedUpdates,
    37. newSegment.sortMap);
    38. }
    39. });
    40. }

    // Constructor: stores the given values on the instance. segmentUpdates is kept only when it is
    // non-null and reports any(); in that case it is wrapped in a FrozenBufferedUpdates bound to
    // segmentInfo, otherwise the field is set to null.
    1. private FlushedSegment(
    2. InfoStream infoStream,
    3. SegmentCommitInfo segmentInfo,
    4. FieldInfos fieldInfos,
    5. BufferedUpdates segmentUpdates,
    6. FixedBitSet liveDocs,
    7. int delCount,
    8. Sorter.DocMap sortMap) {
    9. this.segmentInfo = segmentInfo;
    10. this.fieldInfos = fieldInfos;
    11. this.segmentUpdates =
    12. segmentUpdates != null && segmentUpdates.any()
    13. // initialization of the FrozenBufferedUpdates
    14. ? new FrozenBufferedUpdates(infoStream, segmentUpdates, segmentInfo)
    15. : null;
    16. this.liveDocs = liveDocs;
    17. this.delCount = delCount;
    18. this.sortMap = sortMap;
    19. }
    // Finishes the current batch of documents and returns a sequence number. When deleteNode is
    // non-null it is added to deleteQueue and the slice is applied to pendingUpdates with
    // docIdUpTo; otherwise the slice is refreshed from the queue and applied or reset.
    1. private long finishDocuments(DocumentsWriterDeleteQueue.Node<?> deleteNode, int docIdUpTo) {
    2. /*
    3. * here we actually finish the document in two steps 1. push the delete into
    4. * the queue and update our slice. 2. increment the DWPT private document
    5. * id.
    6. *
    7. * the updated slice we get from 1. holds all the deletes that have occurred
    8. * since we updated the slice the last time.
    9. */
    10. // Apply delTerm only after all indexing has
    11. // succeeded, but apply it only to docs prior to when
    12. // this batch started:
    13. long seqNo;
    14. if (deleteNode != null) {
    15. seqNo = deleteQueue.add(deleteNode, deleteSlice);
    16. assert deleteSlice.isTail(deleteNode) : "expected the delete term as the tail item";
    17. deleteSlice.apply(pendingUpdates, docIdUpTo);
    18. return seqNo;
    19. } else {
    20. seqNo = deleteQueue.updateSlice(deleteSlice);
    // A negative result is negated before being returned and triggers apply(); presumably the sign
    // flags that the slice picked up new deletes - confirm in DocumentsWriterDeleteQueue.
    21. if (seqNo < 0) {
    22. seqNo = -seqNo;
    23. // update pendingUpdates
    24. deleteSlice.apply(pendingUpdates, docIdUpTo);
    25. } else {
    26. deleteSlice.reset();
    27. }
    28. }
    29. return seqNo;
    30. }

    org.apache.lucene.index.IndexWriter#forceApply 

    segStates = openSegmentStates(infos, seenSegments, updates.delGen());

    把每个 FrozenBufferedUpdates 依次应用到所有符合条件的 segment 上（即 getBufferedDeletesGen() <= delGen 且尚未处理过的 segment）

    // Non-blocking attempt to apply `updates`: if updates.tryLock() succeeds, forceApply is run and
    // true is returned, with the lock released in the finally block; if the lock cannot be
    // acquired, returns false without applying anything.
    1. final boolean tryApply(FrozenBufferedUpdates updates) throws IOException {
    2. if (updates.tryLock()) {
    3. try {
    4. forceApply(updates);
    5. return true;
    6. } finally {
    7. updates.unlock();
    8. }
    9. }
    10. return false;
    11. }

    org.apache.lucene.index.IndexWriter#publishFlushedSegment 

    1. /**
    2. * Atomically adds the segment private delete packet and publishes the flushed segments
    3. * SegmentInfo to the index writer.
    4. */
    // packet is the segment-private update packet and globalPacket the global one; both are
    // null-checked below, so either may be absent. sortMap is only consulted when packet has updates.
    5. private synchronized void publishFlushedSegment(
    6. SegmentCommitInfo newSegment,
    7. FieldInfos fieldInfos,
    8. FrozenBufferedUpdates packet,
    9. FrozenBufferedUpdates globalPacket,
    10. Sorter.DocMap sortMap)
    11. throws IOException {
    12. boolean published = false;
    13. try {
    14. // Lock order IW -> BDS
    15. ensureOpen(false);
    16. if (infoStream.isEnabled("IW")) {
    17. infoStream.message("IW", "publishFlushedSegment " + newSegment);
    18. }
    // The global packet is published before the segment-private one.
    19. if (globalPacket != null && globalPacket.any()) {
    20. publishFrozenUpdates(globalPacket);
    21. }
    22. // Publishing the segment must be sync'd on IW -> BDS to make the sure
    23. // that no merge prunes away the seg. private delete packet
    24. final long nextGen;
    25. if (packet != null && packet.any()) {
    26. // generates the new gen
    27. nextGen = publishFrozenUpdates(packet);
    28. } else {
    29. // Since we don't have a delete packet to apply we can get a new
    30. // generation right away
    31. nextGen = bufferedUpdatesStream.getNextGen();
    32. // No deletes/updates here, so marked finished immediately:
    33. bufferedUpdatesStream.finishedSegment(nextGen);
    34. }
    35. if (infoStream.isEnabled("IW")) {
    36. infoStream.message(
    37. "IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
    38. }
    39. newSegment.setBufferedDeletesGen(nextGen);
    40. segmentInfos.add(newSegment);
    // From here on the segment is part of segmentInfos; the finally block uses this flag.
    41. published = true;
    42. checkpoint();
    43. if (packet != null && packet.any() && sortMap != null) {
    44. // TODO: not great we do this heavyish op while holding IW's monitor lock,
    45. // but it only applies if you are using sorted indices and updating doc values:
    46. ReadersAndUpdates rld = getPooledInstance(newSegment, true);
    47. rld.sortMap = sortMap;
    48. // DON't release this ReadersAndUpdates we need to stick with that sortMap
    49. }
    50. FieldInfo fieldInfo =
    51. fieldInfos.fieldInfo(
    52. config.softDeletesField); // will return null if no soft deletes are present
    53. // this is a corner case where documents delete them-self with soft deletes. This is used to
    54. // build delete tombstones etc. in this case we haven't seen any updates to the DV in this
    55. // fresh flushed segment.
    56. // if we have seen updates the update code checks if the segment is fully deleted.
    57. boolean hasInitialSoftDeleted =
    58. (fieldInfo != null
    59. && fieldInfo.getDocValuesGen() == -1
    60. && fieldInfo.getDocValuesType() != DocValuesType.NONE);
    61. final boolean isFullyHardDeleted = newSegment.getDelCount() == newSegment.info.maxDoc();
    62. // we either have a fully hard-deleted segment or one or more docs are soft-deleted. In both
    63. // cases we need
    64. // to go and check if they are fully deleted. This has the nice side-effect that we now have
    65. // accurate numbers
    66. // for the soft delete right after we flushed to disk.
    67. if (hasInitialSoftDeleted || isFullyHardDeleted) {
    68. // this operation is only really executed if needed an if soft-deletes are not configured it
    69. // only be executed
    70. // if we deleted all docs in this newly flushed segment.
    71. ReadersAndUpdates rld = getPooledInstance(newSegment, true);
    72. try {
    73. if (isFullyDeleted(rld)) {
    74. dropDeletedSegment(newSegment);
    75. checkpoint();
    76. }
    77. } finally {
    78. release(rld);
    79. }
    80. }
    81. } finally {
    // Not published: subtract this segment's maxDoc from the pending doc count.
    82. if (published == false) {
    83. adjustPendingNumDocs(-newSegment.info.maxDoc());
    84. }
    // Runs on both the success and the failure path.
    85. flushCount.incrementAndGet();
    86. doAfterFlush();
    87. }
    88. }
    gen的大小是按照queue的顺序来的
    

    org.apache.lucene.index.DocumentsWriterFlushQueue#queue

    // Holds FlushTicket instances; backed by an ArrayDeque and exposed through the Queue interface.
    private final Queue<FlushTicket> queue = new ArrayDeque<>();

    org.apache.lucene.index.IndexWriter#openSegmentStates（由 forceApply 调用）

    1. /** Opens SegmentReader and inits SegmentState for each segment. */
    // Returns one SegmentState per segment in `infos` that passes the filter below; every selected
    // segment is also added to alreadySeenSegments, so the caller's set is mutated.
    2. private BufferedUpdatesStream.SegmentState[] openSegmentStates(
    3. List<SegmentCommitInfo> infos, Set<SegmentCommitInfo> alreadySeenSegments, long delGen)
    4. throws IOException {
    5. List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
    6. try {
    7. for (SegmentCommitInfo info : infos) {
    // Select only segments whose buffered-deletes gen is <= delGen and that were not seen before.
    8. if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
    9. segStates.add(
    10. new BufferedUpdatesStream.SegmentState(
    11. getPooledInstance(info, true), this::release, info));
    12. alreadySeenSegments.add(info);
    13. }
    14. }
    15. } catch (Throwable t) {
    // On failure, close the states opened so far; a failure while closing is attached to the
    // original throwable as suppressed, and the original is rethrown.
    16. try {
    17. IOUtils.close(segStates);
    18. } catch (Throwable t1) {
    19. t.addSuppressed(t1);
    20. }
    21. throw t;
    22. }
    23. return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
    24. }

    org.apache.lucene.index.IndexWriter#getInfosToApply 

    1. /**
    2. * Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or
    3. * null if the private segment was already merged away.
    4. */
    5. private synchronized List<SegmentCommitInfo> getInfosToApply(FrozenBufferedUpdates updates) {
    6. final List<SegmentCommitInfo> infos;
    // Segment-private packet: targets only its own segment, and only while that segment is still
    // contained in segmentInfos.
    7. if (updates.privateSegment != null) {
    8. if (segmentInfos.contains(updates.privateSegment)) {
    9. infos = Collections.singletonList(updates.privateSegment);
    10. } else {
    11. if (infoStream.isEnabled("BD")) {
    12. infoStream.message("BD", "private segment already gone; skip processing updates");
    13. }
    14. infos = null;
    15. }
    16. } else {
    // No private segment: the packet targets the full list returned by segmentInfos.asList().
    17. infos = segmentInfos.asList();
    18. }
    19. return infos;
    20. }

    1.

    org.apache.lucene.index.DocumentsWriter#applyAllDeletes

    org.apache.lucene.index.DocumentsWriterFlushQueue#addDeletes

    2.

    org.apache.lucene.index.DocumentsWriter#doFlush

    org.apache.lucene.index.DocumentsWriterFlushQueue#addFlushTicket

    以上会调用

    final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);

    生成FlushTicket

    放入

    org.apache.lucene.index.DocumentsWriterFlushQueue#queue

    org.apache.lucene.index.IndexWriter#publishFlushedSegments ->
    org.apache.lucene.index.DocumentsWriterFlushQueue#forcePurge ->

    org.apache.lucene.index.DocumentsWriterFlushQueue#innerPurge

    org.apache.lucene.index.IndexWriter#publishFrozenUpdates

    // Pushes `packet` onto bufferedUpdatesStream, enqueues an event on eventQueue that will call
    // tryApply(packet) later, and returns the generation returned by push. The packet must be
    // non-null and report any() (asserted).
    1. private synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) {
    2. assert packet != null && packet.any();
    3. long nextGen = bufferedUpdatesStream.push(packet);
    4. // Do this as an event so it applies higher in the stack when we are not holding
    5. // DocumentsWriterFlushQueue.purgeLock:
    6. eventQueue.add(
    7. w -> {
    8. try {
    9. // we call tryApply here since we don't want to block if a refresh or a flush is already
    10. // applying the
    11. // packet. The flush will retry this packet anyway to ensure all of them are applied
    12. tryApply(packet);
    13. } catch (Throwable t) {
    // Report the failure through onTragicEvent, then rethrow the original throwable; a failure
    // inside onTragicEvent is attached as suppressed.
    14. try {
    15. w.onTragicEvent(t, "applyUpdatesPacket");
    16. } catch (Throwable t1) {
    17. t.addSuppressed(t1);
    18. }
    19. throw t;
    20. }
    // tryApply's boolean result is ignored, so the counter is incremented even when the lock was
    // not obtained and nothing was applied by this event.
    21. w.flushDeletesCount.incrementAndGet();
    22. });
    23. return nextGen;
    24. }

    被放入eventQueue中

    执行

    org.apache.lucene.index.IndexWriter#tryApply

    // Non-blocking attempt to apply `updates`: if updates.tryLock() succeeds, forceApply is run and
    // true is returned, with the lock released in the finally block; if the lock cannot be
    // acquired, returns false without applying anything.
    1. final boolean tryApply(FrozenBufferedUpdates updates) throws IOException {
    2. if (updates.tryLock()) {
    3. try {
    4. forceApply(updates);
    5. return true;
    6. } finally {
    7. updates.unlock();
    8. }
    9. }
    10. return false;
    11. }

     org.apache.lucene.index.IndexWriter#forceApply

    1. /**
    2. * Translates a frozen packet of delete term/query, or doc values updates, into their actual
    3. * docIDs in the index, and applies the change. This is a heavy operation and is done concurrently
    4. * by incoming indexing threads.
    5. */
    6. final void forceApply(FrozenBufferedUpdates updates) throws IOException {
    // Acquired unconditionally and released in the outer finally. NOTE(review): tryApply calls this
    // method while already holding the same lock - presumably the lock is reentrant; confirm.
    7. updates.lock();
    8. try {
    9. if (updates.isApplied()) {
    10. // already done
    11. return;
    12. }
    13. long startNS = System.nanoTime();
    14. assert updates.any();
    // Segments already handled in an earlier iteration; openSegmentStates skips and extends it.
    15. Set<SegmentCommitInfo> seenSegments = new HashSet<>();
    16. int iter = 0;
    17. int totalSegmentCount = 0;
    18. long totalDelCount = 0;
    19. boolean finished = false;
    20. // Optimistic concurrency: assume we are free to resolve the deletes against all current
    21. // segments in the index, despite that
    22. // concurrent merges are running. Once we are done, we check to see if a merge completed
    23. // while we were running. If so, we must retry
    24. // resolving against the newly merged segment(s). Eventually no merge finishes while we were
    25. // running and we are done.
    26. while (true) {
    27. String messagePrefix;
    28. if (iter == 0) {
    29. messagePrefix = "";
    30. } else {
    31. messagePrefix = "iter " + iter;
    32. }
    33. long iterStartNS = System.nanoTime();
    // Snapshot of the merge generation; compared again after applying to detect finished merges.
    34. long mergeGenStart = mergeFinishedGen.get();
    35. Set<String> delFiles = new HashSet<>();
    36. BufferedUpdatesStream.SegmentState[] segStates;
    37. synchronized (this) {
    38. List<SegmentCommitInfo> infos = getInfosToApply(updates);
    39. if (infos == null) {
    40. break;
    41. }
    42. for (SegmentCommitInfo info : infos) {
    43. delFiles.addAll(info.files());
    44. }
    45. // Must open while holding IW lock so that e.g. segments are not merged
    46. // away, dropped from 100% deletions, etc., before we can open the readers
    47. segStates = openSegmentStates(infos, seenSegments, updates.delGen());
    48. if (segStates.length == 0) {
    49. if (infoStream.isEnabled("BD")) {
    50. infoStream.message("BD", "packet matches no segments");
    51. }
    52. break;
    53. }
    // NOTE(review): the %s argument in the log calls of this method is `this` (the enclosing
    // object), not `updates` - confirm the message is meant to print the writer, not the packet.
    54. if (infoStream.isEnabled("BD")) {
    55. infoStream.message(
    56. "BD",
    57. String.format(
    58. Locale.ROOT,
    59. messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
    60. this,
    61. segStates.length,
    62. mergeGenStart));
    63. }
    64. totalSegmentCount += segStates.length;
    65. // Important, else IFD may try to delete our files while we are still using them,
    66. // if e.g. a merge finishes on some of the segments we are resolving on:
    67. deleter.incRef(delFiles);
    68. }
    69. AtomicBoolean success = new AtomicBoolean();
    70. long delCount;
    // finishApply runs when the try-with-resources block exits, on success or failure, and is told
    // through success.get() whether updates.apply completed.
    71. try (Closeable finalizer = () -> finishApply(segStates, success.get(), delFiles)) {
    72. assert finalizer != null; // access the finalizer to prevent a warning
    73. // don't hold IW monitor lock here so threads are free concurrently resolve
    74. // deletes/updates:
    75. delCount = updates.apply(segStates);
    76. success.set(true);
    77. }
    78. // Since we just resolved some more deletes/updates, now is a good time to write them:
    79. writeSomeDocValuesUpdates();
    80. // It's OK to add this here, even if the while loop retries, because delCount only includes
    81. // newly
    82. // deleted documents, on the segments we didn't already do in previous iterations:
    83. totalDelCount += delCount;
    84. if (infoStream.isEnabled("BD")) {
    85. infoStream.message(
    86. "BD",
    87. String.format(
    88. Locale.ROOT,
    89. messagePrefix
    90. + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
    91. this,
    92. segStates.length,
    93. delCount,
    94. (System.nanoTime() - iterStartNS) / 1000000000.));
    95. }
    96. if (updates.privateSegment != null) {
    97. // No need to retry for a segment-private packet: the merge that folds in our private
    98. // segment already waits for all deletes to
    99. // be applied before it kicks off, so this private segment must already not be in the set
    100. // of merging segments
    101. break;
    102. }
    103. // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if
    104. // we exit, we know mergeCommit will succeed
    105. // in pulling all our delGens into a merge:
    106. synchronized (this) {
    107. long mergeGenCur = mergeFinishedGen.get();
    108. if (mergeGenCur == mergeGenStart) {
    109. // Must do this while still holding IW lock else a merge could finish and skip carrying
    110. // over our updates:
    111. // Record that this packet is finished:
    112. bufferedUpdatesStream.finished(updates);
    113. finished = true;
    114. // No merge finished while we were applying, so we are done!
    115. break;
    116. }
    117. }
    118. if (infoStream.isEnabled("BD")) {
    119. infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
    120. }
    121. // A merge completed while we were running. In this case, that merge may have picked up
    122. // some of the updates we did, but not
    123. // necessarily all of them, so we cycle again, re-applying all our updates to the newly
    124. // merged segment.
    125. iter++;
    126. }
    // Reached through any of the `break`s above that did not already call finished(updates).
    127. if (finished == false) {
    128. // Record that this packet is finished:
    129. bufferedUpdatesStream.finished(updates);
    130. }
    131. if (infoStream.isEnabled("BD")) {
    132. String message =
    133. String.format(
    134. Locale.ROOT,
    135. "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
    136. this,
    137. totalSegmentCount,
    138. totalDelCount,
    139. (System.nanoTime() - startNS) / 1000000000.);
    140. if (iter > 0) {
    141. message += "; " + (iter + 1) + " iters due to concurrent merges";
    142. }
    143. message += "; " + bufferedUpdatesStream.getPendingUpdatesCount() + " packets remain";
    144. infoStream.message("BD", message);
    145. }
    146. } finally {
    147. updates.unlock();
    148. }
    149. }

    会调用到下面两个方法: 

    org.apache.lucene.index.FrozenBufferedUpdates#apply

    1. /**
    2. * Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the
    3. * index, returning the number of new deleted or updated documents.
    4. */
    5. long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
    // The caller must already hold applyLock.
    6. assert applyLock.isHeldByCurrentThread();
    7. if (delGen == -1) {
    8. // we were not yet pushed
    9. throw new IllegalArgumentException(
    10. "gen is not yet set; call BufferedUpdatesStream.push first");
    11. }
    // presumably `applied` is a latch whose count drops to zero once the packet has been applied -
    // confirm against its declaration.
    12. assert applied.getCount() != 0;
    13. if (privateSegment != null) {
    14. assert segStates.length == 1;
    15. assert privateSegment == segStates[0].reader.getOriginalSegmentInfo();
    16. }
    // NOTE(review): totalDelCount is not declared in this method, so if it is a field the returned
    // value accumulates across calls - confirm callers expect a running total, not a per-call count.
    17. totalDelCount += applyTermDeletes(segStates);
    18. totalDelCount += applyQueryDeletes(segStates);
    19. totalDelCount += applyDocValuesUpdates(segStates);
    20. return totalDelCount;
    21. }

    org.apache.lucene.index.IndexWriter#openSegmentStates

    1. /** Opens SegmentReader and inits SegmentState for each segment. */
    // Returns one SegmentState per segment in `infos` that passes the filter below; every selected
    // segment is also added to alreadySeenSegments, so the caller's set is mutated.
    2. private BufferedUpdatesStream.SegmentState[] openSegmentStates(
    3. List<SegmentCommitInfo> infos, Set<SegmentCommitInfo> alreadySeenSegments, long delGen)
    4. throws IOException {
    5. List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
    6. try {
    7. for (SegmentCommitInfo info : infos) {
    // Select only segments whose buffered-deletes gen is <= delGen and that were not seen before.
    8. if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
    9. segStates.add(
    10. new BufferedUpdatesStream.SegmentState(
    11. getPooledInstance(info, true), this::release, info));
    12. alreadySeenSegments.add(info);
    13. }
    14. }
    15. } catch (Throwable t) {
    // On failure, close the states opened so far; a failure while closing is attached to the
    // original throwable as suppressed, and the original is rethrown.
    16. try {
    17. IOUtils.close(segStates);
    18. } catch (Throwable t1) {
    19. t.addSuppressed(t1);
    20. }
    21. throw t;
    22. }
    23. return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
    24. }

  • 相关阅读:
    Adobe Acrobat Reader 中的漏洞
    网络和Linux网络_2(套接字编程)socket+UDP网络通信代码
    SpringBoot集成Tomcat服务
    17-数据结构-查找-(顺序、折半、分块)
    Jasypt加解密、信息脱敏
    Scala的隐式转换
    mysql主从库Slave_SQL_Running: No问题经验分享
    MySQL——基本查询语句
    js实现短信验证码一键登录
    【每日一题】倍数求和
  • 原文地址:https://blog.csdn.net/chuanyangwang/article/details/125555345