private void commitPerThreadBytes(ThreadState perThread) { final long delta = perThread.dwpt.bytesUsed() - perThread.bytesUsed; perThread.bytesUsed += delta; /* * We need to differentiate here if we are pending since setFlushPending * moves the perThread memory to the flushBytes and we could be set to * pending during a delete */ if (perThread.flushPending) { flushBytes += delta; } else { activeBytes += delta; } assert updatePeaks(delta); }
DocumentsWriterPerThread nextPendingFlush() { int numPending; boolean fullFlush; synchronized (this) { final DocumentsWriterPerThread poll; if ((poll = flushQueue.poll()) != null) { updateStallState(); return poll; } fullFlush = this.fullFlush; numPending = this.numPending; } if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush final int limit = perThreadPool.getActiveThreadState(); for (int i = 0; i < limit && numPending > 0; i++) { final ThreadState next = perThreadPool.getThreadState(i); if (next.flushPending) { final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next); if (dwpt != null) { return dwpt; } } } } return null; }
private Iterator<ThreadState> getPerThreadsIterator(final int upto) { return new Iterator<ThreadState>() { int i = 0; @Override public boolean hasNext() { return i < upto; } @Override public ThreadState next() { return perThreadPool.getThreadState(i++); } @Override public void remove() { throw new UnsupportedOperationException("remove() not supported."); } }; }
ThreadState obtainAndLock() { final ThreadState perThread = perThreadPool.getAndLock(Thread .currentThread(), documentsWriter); boolean success = false; try { if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { // There is a flush-all in process and this DWPT is // now stale -- enroll it for flush and try for // another DWPT: addFlushableState(perThread); } success = true; // simply return the ThreadState even in a flush all case sine we already hold the lock return perThread; } finally { if (!success) { // make sure we unlock if this fails perThreadPool.release(perThread); } } }
void addFlushableState(ThreadState perThread) { if (infoStream.isEnabled("DWFC")) { infoStream.message("DWFC", "addFlushableState " + perThread.dwpt); } final DocumentsWriterPerThread dwpt = perThread.dwpt; assert perThread.isHeldByCurrentThread(); assert perThread.isInitialized(); assert fullFlush; assert dwpt.deleteQueue != documentsWriter.deleteQueue; if (dwpt.getNumDocsInRAM() > 0) { synchronized(this) { if (!perThread.flushPending) { setFlushPending(perThread); } final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread); assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents"; assert dwpt == flushingDWPT : "flushControl returned different DWPT"; fullFlushBuffer.add(flushingDWPT); } } else { perThreadPool.reset(perThread, closed); // make this state inactive } }
@Override public void onDelete(DocumentsWriterFlushControl control, ThreadState state) { if (flushOnDeleteTerms()) { // Flush this state by num del terms final int maxBufferedDeleteTerms = indexWriterConfig .getMaxBufferedDeleteTerms(); if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) { control.setApplyAllDeletes(); } } if ((flushOnRAM() && control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) { control.setApplyAllDeletes(); if (infoStream.isEnabled("FP")) { infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB())); } } }
@Override public void onInsert(DocumentsWriterFlushControl control, ThreadState state) { if (flushOnDocCount() && state.dwpt.getNumDocsInRAM() >= indexWriterConfig .getMaxBufferedDocs()) { // Flush this state by num docs control.setFlushPending(state); } else if (flushOnRAM()) {// flush by RAM final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); final long totalRam = control.activeBytes() + control.getDeleteBytesUsed(); if (totalRam >= limit) { if (infoStream.isEnabled("FP")) { infoStream.message("FP", "trigger flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit); } markLargestWriterPending(control, state, totalRam); } } }
private final void abortThreadState(final ThreadState perThread, Set<String> newFiles) { assert perThread.isHeldByCurrentThread(); if (perThread.isActive()) { // we might be closed if (perThread.isInitialized()) { try { subtractFlushedNumDocs(perThread.dwpt.getNumDocsInRAM()); perThread.dwpt.abort(newFiles); } finally { perThread.dwpt.checkAndResetHasAborted(); flushControl.doOnAbort(perThread); } } else { flushControl.doOnAbort(perThread); } } else { assert closed; } }
private final int abortThreadState(final ThreadState perThread, Set<String> newFiles) { assert perThread.isHeldByCurrentThread(); if (perThread.isActive()) { // we might be closed if (perThread.isInitialized()) { try { int abortedDocCount = perThread.dwpt.getNumDocsInRAM(); subtractFlushedNumDocs(abortedDocCount); perThread.dwpt.abort(newFiles); return abortedDocCount; } finally { perThread.dwpt.checkAndResetHasAborted(); flushControl.doOnAbort(perThread); } } else { flushControl.doOnAbort(perThread); // This DWPT was never initialized so it has no indexed documents: return 0; } } else { assert closed; return 0; } }
ThreadState obtainAndLock() { final ThreadState perThread = perThreadPool.getAndLock(Thread .currentThread(), documentsWriter); boolean success = false; try { if (perThread.isActive() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { // There is a flush-all in process and this DWPT is // now stale -- enroll it for flush and try for // another DWPT: addFlushableState(perThread); } success = true; // simply return the ThreadState even in a flush all case sine we already hold the lock return perThread; } finally { if (!success) { // make sure we unlock if this fails perThread.unlock(); } } }
@Override public void onDelete(DocumentsWriterFlushControl control, ThreadState state) { if (flushOnDeleteTerms()) { // Flush this state by num del terms final int maxBufferedDeleteTerms = indexWriterConfig .getMaxBufferedDeleteTerms(); if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) { control.setApplyAllDeletes(); } } final DocumentsWriter writer = this.writer.get(); if ((flushOnRAM() && control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) { control.setApplyAllDeletes(); if (writer.infoStream.isEnabled("FP")) { writer.infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB())); } } }
@Override public void onInsert(DocumentsWriterFlushControl control, ThreadState state) { if (flushOnDocCount() && state.dwpt.getNumDocsInRAM() >= indexWriterConfig .getMaxBufferedDocs()) { // Flush this state by num docs control.setFlushPending(state); } else if (flushOnRAM()) {// flush by RAM final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); final long totalRam = control.activeBytes() + control.getDeleteBytesUsed(); if (totalRam >= limit) { final DocumentsWriter writer = this.writer.get(); if (writer.infoStream.isEnabled("FP")) { writer.infoStream.message("FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit); } markLargestWriterPending(control, state, totalRam); } } }
/** * Returns the current most RAM consuming non-pending {@link ThreadState} with * at least one indexed document. * <p> * This method will never return <code>null</code> */ protected ThreadState findLargestNonPendingWriter( DocumentsWriterFlushControl control, ThreadState perThreadState) { assert perThreadState.dwpt.getNumDocsInRAM() > 0; long maxRamSoFar = perThreadState.bytesUsed; // the dwpt which needs to be flushed eventually ThreadState maxRamUsingThreadState = perThreadState; assert !perThreadState.flushPending : "DWPT should have flushed"; Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreadStates(); while (activePerThreadsIterator.hasNext()) { ThreadState next = activePerThreadsIterator.next(); if (!next.flushPending) { final long nextRam = next.bytesUsed; if (nextRam > maxRamSoFar && next.dwpt.getNumDocsInRAM() > 0) { maxRamSoFar = nextRam; maxRamUsingThreadState = next; } } } assert assertMessage("set largest ram consuming thread pending on lower watermark"); return maxRamUsingThreadState; }
ThreadState obtainAndLock() { final ThreadState perThread = perThreadPool.getAndLock(Thread .currentThread(), documentsWriter); boolean success = false; try { if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { // There is a flush-all in process and this DWPT is // now stale -- enroll it for flush and try for // another DWPT: addFlushableState(perThread); } success = true; // simply return the ThreadState even in a flush all case sine we already hold the lock return perThread; } finally { if (!success) { // make sure we unlock if this fails perThread.unlock(); } } }
@Override public void onInsert(DocumentsWriterFlushControl control, ThreadState state) { if (flushOnDocCount() && state.dwpt.getNumDocsInRAM() >= indexWriterConfig .getMaxBufferedDocs()) { // Flush this state by num docs control.setFlushPending(state); } else if (flushOnRAM()) {// flush by RAM final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); final long totalRam = control.activeBytes() + control.getDeleteBytesUsed(); if (totalRam >= limit) { if (infoStream.isEnabled("FP")) { infoStream.message("FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit); } markLargestWriterPending(control, state, totalRam); } } }