private void publishFlushedSegments(boolean forced) throws IOException {
  docWriter.purgeFlushTickets(
      forced,
      ticket -> {
        DocumentsWriterPerThread.FlushedSegment newSegment = ticket.getFlushedSegment();
        FrozenBufferedUpdates bufferedUpdates = ticket.getFrozenUpdates();
        ticket.markPublished();
        // global deletes
        if (newSegment == null) { // this is a flushed global deletes package - not a segment
          if (bufferedUpdates != null && bufferedUpdates.any()) { // TODO why can this be null?
            // push the delete packet onto the eventQueue
            publishFrozenUpdates(bufferedUpdates);
            if (infoStream.isEnabled("IW")) {
              infoStream.message("IW", "flush: push buffered updates: " + bufferedUpdates);
            }
          }
        }
        // segment-private deletes
        else {
          assert newSegment.segmentInfo != null;
          if (infoStream.isEnabled("IW")) {
            infoStream.message(
                "IW", "publishFlushedSegment seg-private updates=" + newSegment.segmentUpdates);
          }
          if (newSegment.segmentUpdates != null && infoStream.isEnabled("DW")) {
            infoStream.message(
                "IW", "flush: push buffered seg private updates: " + newSegment.segmentUpdates);
          }
          // now publish!
          publishFlushedSegment(
              newSegment.segmentInfo,
              newSegment.fieldInfos,
              // newSegment's private deletes
              newSegment.segmentUpdates,
              // the global deletes
              bufferedUpdates,
              newSegment.sortMap);
        }
      });
}
private FlushedSegment(
    InfoStream infoStream,
    SegmentCommitInfo segmentInfo,
    FieldInfos fieldInfos,
    BufferedUpdates segmentUpdates,
    FixedBitSet liveDocs,
    int delCount,
    Sorter.DocMap sortMap) {
  this.segmentInfo = segmentInfo;
  this.fieldInfos = fieldInfos;
  this.segmentUpdates =
      segmentUpdates != null && segmentUpdates.any()
          // this is where the FrozenBufferedUpdates is created
          ? new FrozenBufferedUpdates(infoStream, segmentUpdates, segmentInfo)
          : null;
  this.liveDocs = liveDocs;
  this.delCount = delCount;
  this.sortMap = sortMap;
}
private long finishDocuments(DocumentsWriterDeleteQueue.Node<?> deleteNode, int docIdUpTo) {
  /*
   * here we actually finish the document in two steps 1. push the delete into
   * the queue and update our slice. 2. increment the DWPT private document
   * id.
   *
   * the updated slice we get from 1. holds all the deletes that have occurred
   * since we updated the slice the last time.
   */
  // Apply delTerm only after all indexing has
  // succeeded, but apply it only to docs prior to when
  // this batch started:
  long seqNo;
  if (deleteNode != null) {
    seqNo = deleteQueue.add(deleteNode, deleteSlice);
    assert deleteSlice.isTail(deleteNode) : "expected the delete term as the tail item";
    deleteSlice.apply(pendingUpdates, docIdUpTo);
    return seqNo;
  } else {
    seqNo = deleteQueue.updateSlice(deleteSlice);
    if (seqNo < 0) {
      // a negative seqNo signals that the slice was advanced, i.e. deletes
      // arrived on the queue since we last looked, so apply them to this
      // DWPT's pendingUpdates
      seqNo = -seqNo;
      deleteSlice.apply(pendingUpdates, docIdUpTo);
    } else {
      deleteSlice.reset();
    }
  }

  return seqNo;
}
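The slice mechanics are easier to see in a stripped-down model. Below is a minimal sketch with toy classes (not Lucene's actual DocumentsWriterDeleteQueue, which is lock-free and far more involved): a global linked list of delete operations, where each DWPT's slice is a pair of pointers into that list, and applying a slice walks exactly the nodes that arrived since the last apply.

import java.util.Map;

// Toy delete queue + slice (hypothetical classes, for illustration only).
final class ToyDeleteQueue {
  static final class Node {
    final String deleteTerm;
    Node next;
    Node(String t) { deleteTerm = t; }
  }

  static final class Slice {
    Node head, tail; // the slice covers the nodes strictly after head, up to tail
    Slice(Node n) { head = tail = n; }
  }

  private final Node sentinel = new Node(null);
  private Node tail = sentinel;
  private long seqNo = 0;

  Slice newSlice() { return new Slice(tail); }

  // Append a delete and extend the caller's slice so it covers the new node.
  synchronized long add(String term, Slice slice) {
    Node n = new Node(term);
    tail.next = n;
    tail = n;
    slice.tail = n;
    return ++seqNo;
  }

  // Apply every delete in (head, tail] to docs < docIdUpTo, then advance head
  // so the same deletes are never applied twice.
  static void apply(Slice slice, Map<String, Integer> pendingDeletes, int docIdUpTo) {
    for (Node n = slice.head; n != slice.tail; ) {
      n = n.next;
      pendingDeletes.merge(n.deleteTerm, docIdUpTo, Math::max);
    }
    slice.head = slice.tail;
  }
}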
org.apache.lucene.index.IndexWriter#forceApply
segStates = openSegmentStates(infos, seenSegments, updates.delGen());
Every FrozenBufferedUpdates packet is walked across all matching segments in the index.
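A minimal sketch (toy types, not Lucene's real classes) of the filtering openSegmentStates performs per iteration: a packet with generation delGen only touches segments whose bufferedDeletesGen is <= delGen (segments with a newer gen were flushed after the packet and are not affected by it), and the seenSegments set keeps retry iterations from handling the same segment twice.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy version of the openSegmentStates segment filter (hypothetical types).
final class ToySegmentPicker {
  static final class SegInfo {
    final long bufferedDeletesGen;
    SegInfo(long gen) { bufferedDeletesGen = gen; }
  }

  static List<SegInfo> pick(List<SegInfo> all, Set<SegInfo> seen, long delGen) {
    List<SegInfo> out = new ArrayList<>();
    for (SegInfo info : all) {
      // skip segments flushed after this packet, and segments already
      // handled in an earlier retry iteration (seen.add returns false)
      if (info.bufferedDeletesGen <= delGen && seen.add(info)) {
        out.add(info);
      }
    }
    return out;
  }
}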
org.apache.lucene.index.IndexWriter#publishFlushedSegment
/**
 * Atomically adds the segment private delete packet and publishes the flushed segment's
 * SegmentInfo to the index writer.
 */
private synchronized void publishFlushedSegment(
    SegmentCommitInfo newSegment,
    FieldInfos fieldInfos,
    FrozenBufferedUpdates packet,
    FrozenBufferedUpdates globalPacket,
    Sorter.DocMap sortMap)
    throws IOException {
  boolean published = false;
  try {
    // Lock order IW -> BDS
    ensureOpen(false);

    if (infoStream.isEnabled("IW")) {
      infoStream.message("IW", "publishFlushedSegment " + newSegment);
    }

    if (globalPacket != null && globalPacket.any()) {
      publishFrozenUpdates(globalPacket);
    }

    // Publishing the segment must be sync'd on IW -> BDS to make sure
    // that no merge prunes away the seg. private delete packet
    final long nextGen;
    if (packet != null && packet.any()) {
      // pushing the packet hands out a new gen
      nextGen = publishFrozenUpdates(packet);
    } else {
      // Since we don't have a delete packet to apply we can get a new
      // generation right away
      nextGen = bufferedUpdatesStream.getNextGen();
      // No deletes/updates here, so mark it finished immediately:
      bufferedUpdatesStream.finishedSegment(nextGen);
    }
    if (infoStream.isEnabled("IW")) {
      infoStream.message(
          "IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
    }
    newSegment.setBufferedDeletesGen(nextGen);
    segmentInfos.add(newSegment);
    published = true;
    checkpoint();
    if (packet != null && packet.any() && sortMap != null) {
      // TODO: not great we do this heavyish op while holding IW's monitor lock,
      // but it only applies if you are using sorted indices and updating doc values:
      ReadersAndUpdates rld = getPooledInstance(newSegment, true);
      rld.sortMap = sortMap;
      // DON'T release this ReadersAndUpdates; we need to stick with that sortMap
    }
    FieldInfo fieldInfo =
        fieldInfos.fieldInfo(
            config.softDeletesField); // will return null if no soft deletes are present
    // this is a corner case where documents delete themselves with soft deletes. This is used to
    // build delete tombstones etc. in this case we haven't seen any updates to the DV in this
    // fresh flushed segment.
    // if we have seen updates the update code checks if the segment is fully deleted.
    boolean hasInitialSoftDeleted =
        (fieldInfo != null
            && fieldInfo.getDocValuesGen() == -1
            && fieldInfo.getDocValuesType() != DocValuesType.NONE);
    final boolean isFullyHardDeleted = newSegment.getDelCount() == newSegment.info.maxDoc();
    // we either have a fully hard-deleted segment or one or more docs are soft-deleted. In both
    // cases we need to go and check if they are fully deleted. This has the nice side-effect
    // that we now have accurate numbers for the soft deletes right after we flushed to disk.
    if (hasInitialSoftDeleted || isFullyHardDeleted) {
      // this operation is only really executed if needed; if soft-deletes are not configured it
      // will only be executed if we deleted all docs in this newly flushed segment.
      ReadersAndUpdates rld = getPooledInstance(newSegment, true);
      try {
        if (isFullyDeleted(rld)) {
          dropDeletedSegment(newSegment);
          checkpoint();
        }
      } finally {
        release(rld);
      }
    }

  } finally {
    if (published == false) {
      adjustPendingNumDocs(-newSegment.info.maxDoc());
    }
    flushCount.incrementAndGet();
    doAfterFlush();
  }
}
The delGen values are handed out in queue order:
org.apache.lucene.index.DocumentsWriterFlushQueue#queue
private final Queue<FlushTicket> queue = new ArrayDeque<>();
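A minimal sketch (toy names, not Lucene's actual fields) of why delGen follows queue order: the push is synchronized and hands out a monotonically increasing counter, so packets pulled from the FIFO ArrayDeque receive ascending generations in enqueue order.

// Toy model of BufferedUpdatesStream.push-style gen assignment; the real class
// keeps more state, but the ordering argument only needs the synchronized counter.
final class ToyUpdatesStream {
  private long nextGen = 1;

  // Called while purging tickets from the FIFO queue, so packets are pushed
  // in the order they were enqueued and gens are ascending in that order.
  synchronized long push(ToyFrozenPacket packet) {
    packet.delGen = nextGen++;
    return packet.delGen;
  }
}

final class ToyFrozenPacket {
  long delGen = -1;
}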
org.apache.lucene.index.IndexWriter#getInfosToApply
/**
 * Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or
 * null if the private segment was already merged away.
 */
private synchronized List<SegmentCommitInfo> getInfosToApply(FrozenBufferedUpdates updates) {
  final List<SegmentCommitInfo> infos;
  if (updates.privateSegment != null) {
    if (segmentInfos.contains(updates.privateSegment)) {
      infos = Collections.singletonList(updates.privateSegment);
    } else {
      if (infoStream.isEnabled("BD")) {
        infoStream.message("BD", "private segment already gone; skip processing updates");
      }
      infos = null;
    }
  } else {
    infos = segmentInfos.asList();
  }
  return infos;
}
FlushTickets are created along two paths:
1. org.apache.lucene.index.DocumentsWriter#applyAllDeletes
   org.apache.lucene.index.DocumentsWriterFlushQueue#addDeletes
2. org.apache.lucene.index.DocumentsWriter#doFlush
   org.apache.lucene.index.DocumentsWriterFlushQueue#addFlushTicket
Both paths create a FlushTicket (the flush path via
final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
) and enqueue it into
org.apache.lucene.index.DocumentsWriterFlushQueue#queue
The tickets are then drained along the chain
org.apache.lucene.index.IndexWriter#publishFlushedSegments ->
org.apache.lucene.index.DocumentsWriterFlushQueue#forcePurge ->
org.apache.lucene.index.DocumentsWriterFlushQueue#innerPurge
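The point of the ticket queue is ordering. Here is a minimal sketch (hypothetical, simplified names): DWPTs may finish flushing out of order, but an innerPurge-style drain only ever publishes the head ticket once it is complete, so publication order always equals ticket-creation order.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Toy flush-ticket queue: tickets are enqueued when a flush starts and marked
// done when it finishes; purge publishes strictly from the head, so a slow
// early flush blocks publication of later, already-finished ones.
final class ToyTicketQueue {
  static final class Ticket {
    volatile boolean done;
    final String segmentName;
    Ticket(String name) { segmentName = name; }
  }

  private final Queue<Ticket> queue = new ArrayDeque<>();

  synchronized Ticket addTicket(String segmentName) {
    Ticket t = new Ticket(segmentName);
    queue.add(t);
    return t;
  }

  // Publish completed tickets in FIFO order; stop at the first unfinished one.
  synchronized void purge(Consumer<Ticket> publish) {
    for (Ticket head = queue.peek(); head != null && head.done; head = queue.peek()) {
      queue.poll();
      publish.accept(head);
    }
  }
}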
org.apache.lucene.index.IndexWriter#publishFrozenUpdates
private synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) {
  assert packet != null && packet.any();
  long nextGen = bufferedUpdatesStream.push(packet);
  // Do this as an event so it applies higher in the stack when we are not holding
  // DocumentsWriterFlushQueue.purgeLock:
  eventQueue.add(
      w -> {
        try {
          // we call tryApply here since we don't want to block if a refresh or a flush is
          // already applying the packet. The flush will retry this packet anyway to ensure
          // all of them are applied
          tryApply(packet);
        } catch (Throwable t) {
          try {
            w.onTragicEvent(t, "applyUpdatesPacket");
          } catch (Throwable t1) {
            t.addSuppressed(t1);
          }
          throw t;
        }
        w.flushDeletesCount.incrementAndGet();
      });
  return nextGen;
}
The event is placed on the eventQueue; when it runs, it executes
org.apache.lucene.index.IndexWriter#tryApply
final boolean tryApply(FrozenBufferedUpdates updates) throws IOException {
  if (updates.tryLock()) {
    try {
      forceApply(updates);
      return true;
    } finally {
      updates.unlock();
    }
  }
  return false;
}
org.apache.lucene.index.IndexWriter#forceApply
/**
 * Translates a frozen packet of delete term/query, or doc values updates, into their actual
 * docIDs in the index, and applies the change. This is a heavy operation and is done concurrently
 * by incoming indexing threads.
 */
final void forceApply(FrozenBufferedUpdates updates) throws IOException {
  updates.lock();
  try {
    if (updates.isApplied()) {
      // already done
      return;
    }
    long startNS = System.nanoTime();

    assert updates.any();

    Set<SegmentCommitInfo> seenSegments = new HashSet<>();

    int iter = 0;
    int totalSegmentCount = 0;
    long totalDelCount = 0;

    boolean finished = false;

    // Optimistic concurrency: assume we are free to resolve the deletes against all current
    // segments in the index, despite that concurrent merges are running. Once we are done, we
    // check to see if a merge completed while we were running. If so, we must retry resolving
    // against the newly merged segment(s). Eventually no merge finishes while we were running
    // and we are done.
    while (true) {
      String messagePrefix;
      if (iter == 0) {
        messagePrefix = "";
      } else {
        messagePrefix = "iter " + iter;
      }

      long iterStartNS = System.nanoTime();

      long mergeGenStart = mergeFinishedGen.get();

      Set<String> delFiles = new HashSet<>();
      BufferedUpdatesStream.SegmentState[] segStates;

      synchronized (this) {
        List<SegmentCommitInfo> infos = getInfosToApply(updates);
        if (infos == null) {
          break;
        }

        for (SegmentCommitInfo info : infos) {
          delFiles.addAll(info.files());
        }

        // Must open while holding IW lock so that e.g. segments are not merged
        // away, dropped from 100% deletions, etc., before we can open the readers
        segStates = openSegmentStates(infos, seenSegments, updates.delGen());

        if (segStates.length == 0) {
          if (infoStream.isEnabled("BD")) {
            infoStream.message("BD", "packet matches no segments");
          }
          break;
        }

        if (infoStream.isEnabled("BD")) {
          infoStream.message(
              "BD",
              String.format(
                  Locale.ROOT,
                  messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
                  this,
                  segStates.length,
                  mergeGenStart));
        }

        totalSegmentCount += segStates.length;

        // Important, else IFD may try to delete our files while we are still using them,
        // if e.g. a merge finishes on some of the segments we are resolving on:
        deleter.incRef(delFiles);
      }

      AtomicBoolean success = new AtomicBoolean();
      long delCount;
      try (Closeable finalizer = () -> finishApply(segStates, success.get(), delFiles)) {
        assert finalizer != null; // access the finalizer to prevent a warning
        // don't hold IW monitor lock here so threads are free to concurrently resolve
        // deletes/updates:
        delCount = updates.apply(segStates);
        success.set(true);
      }

      // Since we just resolved some more deletes/updates, now is a good time to write them:
      writeSomeDocValuesUpdates();

      // It's OK to add this here, even if the while loop retries, because delCount only
      // includes newly deleted documents, on the segments we didn't already do in previous
      // iterations:
      totalDelCount += delCount;

      if (infoStream.isEnabled("BD")) {
        infoStream.message(
            "BD",
            String.format(
                Locale.ROOT,
                messagePrefix
                    + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
                this,
                segStates.length,
                delCount,
                (System.nanoTime() - iterStartNS) / 1000000000.));
      }
      if (updates.privateSegment != null) {
        // No need to retry for a segment-private packet: the merge that folds in our private
        // segment already waits for all deletes to be applied before it kicks off, so this
        // private segment must already not be in the set of merging segments
        break;
      }

      // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that
      // if we exit, we know mergeCommit will succeed in pulling all our delGens into a merge:
      synchronized (this) {
        long mergeGenCur = mergeFinishedGen.get();

        if (mergeGenCur == mergeGenStart) {
          // Must do this while still holding IW lock else a merge could finish and skip
          // carrying over our updates:

          // Record that this packet is finished:
          bufferedUpdatesStream.finished(updates);

          finished = true;

          // No merge finished while we were applying, so we are done!
          break;
        }
      }

      if (infoStream.isEnabled("BD")) {
        infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
      }

      // A merge completed while we were running. In this case, that merge may have picked up
      // some of the updates we did, but not necessarily all of them, so we cycle again,
      // re-applying all our updates to the newly merged segment.

      iter++;
    }

    if (finished == false) {
      // Record that this packet is finished:
      bufferedUpdatesStream.finished(updates);
    }

    if (infoStream.isEnabled("BD")) {
      String message =
          String.format(
              Locale.ROOT,
              "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
              this,
              totalSegmentCount,
              totalDelCount,
              (System.nanoTime() - startNS) / 1000000000.);
      if (iter > 0) {
        message += "; " + (iter + 1) + " iters due to concurrent merges";
      }
      message += "; " + bufferedUpdatesStream.getPendingUpdatesCount() + " packets remain";
      infoStream.message("BD", message);
    }
  } finally {
    updates.unlock();
  }
}
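Stripped of the logging, the retry loop boils down to this pattern (a sketch with made-up names, not the real IndexWriter fields): snapshot a "merge finished" generation, resolve the deletes without holding the monitor, then re-check the generation under the lock; if a merge completed in between, loop and re-apply against the newly merged segments.

// Skeleton of the optimistic-concurrency loop in forceApply (toy names).
final class ToyApplier {
  private long mergeFinishedGen; // bumped each time a merge commits

  void applyWithRetry(Runnable resolveDeletes) {
    while (true) {
      long mergeGenStart;
      synchronized (this) {
        mergeGenStart = mergeFinishedGen; // snapshot before doing the work
      }

      resolveDeletes.run(); // heavy work, monitor not held

      synchronized (this) {
        if (mergeFinishedGen == mergeGenStart) {
          return; // no merge raced us; the packet is done
        }
        // A merge finished while we were resolving: its merged segment may be
        // missing some of our deletes, so loop and re-apply against it.
      }
    }
  }

  synchronized void onMergeFinished() {
    mergeFinishedGen++;
  }
}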
forceApply ends up calling the following two methods:
org.apache.lucene.index.FrozenBufferedUpdates#apply
/**
 * Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the
 * index, returning the number of new deleted or updated documents.
 */
long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
  assert applyLock.isHeldByCurrentThread();
  if (delGen == -1) {
    // we were not yet pushed
    throw new IllegalArgumentException(
        "gen is not yet set; call BufferedUpdatesStream.push first");
  }

  assert applied.getCount() != 0;

  if (privateSegment != null) {
    assert segStates.length == 1;
    assert privateSegment == segStates[0].reader.getOriginalSegmentInfo();
  }

  totalDelCount += applyTermDeletes(segStates);
  totalDelCount += applyQueryDeletes(segStates);
  totalDelCount += applyDocValuesUpdates(segStates);

  return totalDelCount;
}
org.apache.lucene.index.IndexWriter#openSegmentStates
/** Opens SegmentReader and inits SegmentState for each segment. */
private BufferedUpdatesStream.SegmentState[] openSegmentStates(
    List<SegmentCommitInfo> infos, Set<SegmentCommitInfo> alreadySeenSegments, long delGen)
    throws IOException {
  List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
  try {
    for (SegmentCommitInfo info : infos) {
      if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
        segStates.add(
            new BufferedUpdatesStream.SegmentState(
                getPooledInstance(info, true), this::release, info));
        alreadySeenSegments.add(info);
      }
    }
  } catch (Throwable t) {
    try {
      IOUtils.close(segStates);
    } catch (Throwable t1) {
      t.addSuppressed(t1);
    }
    throw t;
  }

  return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
}