/**
 * Resolves the {@link IndexDeletionPolicy} named by the {@code deletion.policy} config property.
 *
 * <p>The property is looked up with
 * {@code org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy} passed as the default value.
 * If the resulting name is that of {@link NoDeletionPolicy}, its shared {@code INSTANCE} is
 * returned instead of creating a new object. Any other class is loaded by name, checked to be an
 * {@link IndexDeletionPolicy} subtype and created through its no-argument constructor.
 *
 * @param config configuration from which {@code deletion.policy} is read
 * @return the deletion policy to use
 * @throws RuntimeException if the named class cannot be loaded, is not an
 *     {@link IndexDeletionPolicy}, or cannot be instantiated; the underlying failure is the cause
 */
public static IndexDeletionPolicy getIndexDeletionPolicy(Config config) {
  String deletionPolicyName = config.get("deletion.policy", "org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy");
  if (deletionPolicyName.equals(NoDeletionPolicy.class.getName())) {
    return NoDeletionPolicy.INSTANCE;
  }
  try {
    // Class.newInstance() is deprecated: it rethrows checked constructor exceptions without
    // declaring them. Going through the Constructor wraps them in InvocationTargetException.
    return Class.forName(deletionPolicyName)
        .asSubclass(IndexDeletionPolicy.class)
        .getDeclaredConstructor()
        .newInstance();
  } catch (ReflectiveOperationException | RuntimeException e) {
    // RuntimeException covers e.g. the ClassCastException raised by asSubclass for a wrong type.
    throw new RuntimeException("unable to instantiate class '" + deletionPolicyName + "' as IndexDeletionPolicy", e);
  }
}
public void testConcurrentWritesAndCommits() throws Exception { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { final int numIndexingThreads = scaledRandomIntBetween(3, 6); final int numDocsPerThread = randomIntBetween(500, 1000); final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1); final List<Thread> indexingThreads = new ArrayList<>(); // create N indexing threads to index documents simultaneously for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) { final int threadIdx = threadNum; Thread indexingThread = new Thread(() -> { try { barrier.await(); // wait for all threads to start at the same time // index random number of docs for (int i = 0; i < numDocsPerThread; i++) { final String id = "thread" + threadIdx + "#" + i; ParsedDocument doc = testParsedDocument(id, "test", null, testDocument(), B_1, null); engine.index(indexForDoc(doc)); } } catch (Exception e) { throw new RuntimeException(e); } }); indexingThreads.add(indexingThread); } // start the indexing threads for (Thread thread : indexingThreads) { thread.start(); } barrier.await(); // wait for indexing threads to all be ready to start // create random commit points boolean doneIndexing; do { doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0; //engine.flush(); // flush and commit } while (doneIndexing == false); // now, verify all the commits have the correct docs according to the user commit data long prevLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; long prevMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) { Map<String, String> userData = commit.getUserData(); long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ? 
Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) : SequenceNumbersService.NO_OPS_PERFORMED; long maxSeqNo = userData.containsKey(SequenceNumbers.MAX_SEQ_NO) ? Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) : SequenceNumbersService.UNASSIGNED_SEQ_NO; // local checkpoint and max seq no shouldn't go backwards assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint)); assertThat(maxSeqNo, greaterThanOrEqualTo(prevMaxSeqNo)); try (IndexReader reader = DirectoryReader.open(commit)) { FieldStats stats = SeqNoFieldMapper.SeqNoDefaults.FIELD_TYPE.stats(reader); final long highestSeqNo; if (stats != null) { highestSeqNo = (long) stats.getMaxValue(); } else { highestSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; } // make sure localCheckpoint <= highest seq no found <= maxSeqNo assertThat(highestSeqNo, greaterThanOrEqualTo(localCheckpoint)); assertThat(highestSeqNo, lessThanOrEqualTo(maxSeqNo)); // make sure all sequence numbers up to and including the local checkpoint are in the index FixedBitSet seqNosBitSet = getSeqNosSet(reader, highestSeqNo); for (int i = 0; i <= localCheckpoint; i++) { assertTrue("local checkpoint [" + localCheckpoint + "], _seq_no [" + i + "] should be indexed", seqNosBitSet.get(i)); } } prevLocalCheckpoint = localCheckpoint; prevMaxSeqNo = maxSeqNo; } } }
/**
 * Smoke test: sets {@code deletion.policy} to {@link NoDeletionPolicy} and runs a create-index
 * task followed by a close-index task. There are no explicit assertions; the test passes as long
 * as neither task throws.
 */
public void testNoDeletionPolicy() throws Exception {
  final PerfRunData perfRunData = createPerfRunData(null);
  final String policyClassName = NoDeletionPolicy.class.getName();
  perfRunData.getConfig().set("deletion.policy", policyClassName);

  // each task is built only after the previous one has run
  final CreateIndexTask createIndex = new CreateIndexTask(perfRunData);
  createIndex.doLogic();

  final CloseIndexTask closeIndex = new CloseIndexTask(perfRunData);
  closeIndex.doLogic();
}