public LindenCoreImpl(LindenConfig lindenConfig, String subIndexDirectory) throws IOException { this.config = lindenConfig; idFieldName = config.getSchema().getId(); facetsConfig = config.createFacetsConfig(); String directory = config.getIndexDirectory(); if (subIndexDirectory != null) { directory = FilenameUtils.concat(config.getIndexDirectory(), subIndexDirectory); } indexWriter = new IndexWriter(createIndexDirectory(directory, config.getIndexType()), config.createIndexWriterConfig()); trackingIndexWriter = new TrackingIndexWriter(indexWriter); taxoWriter = facetsConfig != null ? new DirectoryTaxonomyWriter(createTaxoIndexDirectory(directory, config.getIndexType())) : null; commitStrategy = new CommitStrategy(indexWriter, taxoWriter); commitStrategy.start(); lindenNRTSearcherManager = new LindenNRTSearcherManager(config, trackingIndexWriter, taxoWriter); snippetGenerator = new LindenSnippetGenerator(); }
public LuceneFiler(@Nonnull Filer delegate, @Nonnull Config config) throws IOException { super(delegate); String path = config.getString("index.path"); maxAge = config.getTime("index.maxAge", "-1"); double maxMergeMb = config.getDouble("index.maxMergeMb", 4); double maxCachedMb = config.getDouble("index.maxCacheMb", 64); long targetMaxStale = config.getTime("index.targetMaxStale", "5s"); long targetMinStale = config.getTime("index.targetMinStale", "1s"); Directory dir = FSDirectory.open(new File(path).toPath()); NRTCachingDirectory cachingDir = new NRTCachingDirectory(dir, maxMergeMb, maxCachedMb); IndexWriterConfig writerConfig = new IndexWriterConfig(null); writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND); writer = new TrackingIndexWriter(new IndexWriter(cachingDir, writerConfig)); manager = new SearcherManager(writer.getIndexWriter(), true, new SearcherFactory()); thread = new ControlledRealTimeReopenThread<>(writer, manager, targetMaxStale, targetMinStale); thread.start(); }
@Override protected void doAfterWriter(final ExecutorService es) throws Exception { final double minReopenSec = 0.01 + 0.05 * random().nextDouble(); final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble()); if (VERBOSE) { System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); } genWriter = new TrackingIndexWriter(writer); final SearcherFactory sf = new SearcherFactory() { @Override public IndexSearcher newSearcher(IndexReader r) throws IOException { TestControlledRealTimeReopenThread.this.warmCalled = true; IndexSearcher s = new IndexSearcher(r, es); s.search(new TermQuery(new Term("body", "united")), 10); return s; } }; nrtNoDeletes = new SearcherManager(writer, false, sf); nrtDeletes = new SearcherManager(writer, true, sf); nrtDeletesThread = new ControlledRealTimeReopenThread<>(genWriter, nrtDeletes, maxReopenSec, minReopenSec); nrtDeletesThread.setName("NRTDeletes Reopen Thread"); nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); nrtDeletesThread.setDaemon(true); nrtDeletesThread.start(); nrtNoDeletesThread = new ControlledRealTimeReopenThread<>(genWriter, nrtNoDeletes, maxReopenSec, minReopenSec); nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread"); nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); nrtNoDeletesThread.setDaemon(true); nrtNoDeletesThread.start(); }
/** * Constructor based on an instance of the type responsible of the lucene index persistence */ @Inject public LuceneIndex(final Directory luceneDirectory, final Analyzer analyzer) { try { // [1]: Create the indexWriter _indexWriter = new IndexWriter(luceneDirectory, new IndexWriterConfig(LuceneConstants.VERSION, analyzer)); // [2a]: Create the TrackingIndexWriter to track changes to the delegated previously created IndexWriter _trackingIndexWriter = new TrackingIndexWriter(_indexWriter); // [2b]: Create an IndexSearcher ReferenceManager to safely share IndexSearcher instances across // multiple threads _indexSearcherReferenceManager = new SearcherManager(_indexWriter, true, null); // [3]: Create the ControlledRealTimeReopenThread that reopens the index periodically having into // account the changes made to the index and tracked by the TrackingIndexWriter instance // The index is refreshed every 60sc when nobody is waiting // and every 100 millis whenever is someone waiting (see search method) // (see http://lucene.apache.org/core/4_3_0/core/org/apache/lucene/search/NRTManagerReopenThread.html) _indexSearcherReopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(_trackingIndexWriter, _indexSearcherReferenceManager, 60.00, // when there is nobody waiting 0.1); // when there is someone waiting _indexSearcherReopenThread.start(); // start the refresher thread } catch (IOException ioEx) { throw new IllegalStateException("Lucene index could not be created: " + ioEx.getMessage()); } }
private void reOpen(final ODocument metadata) throws IOException { ODatabaseDocumentInternal database = getDatabase(); final OAbstractPaginatedStorage storageLocalAbstract = (OAbstractPaginatedStorage) database.getStorage().getUnderlying(); Directory dir = null; if (storageLocalAbstract instanceof OLocalPaginatedStorage) { String pathname = getIndexPath((OLocalPaginatedStorage) storageLocalAbstract); OLogManager.instance().debug(this, "Opening NIOFS Lucene db=%s, path=%s", database.getName(), pathname); dir = NIOFSDirectory.open(new File(pathname)); } else { OLogManager.instance().debug(this, "Opening RAM Lucene index db=%s", database.getName()); dir = new RAMDirectory(); } final IndexWriter indexWriter = createIndexWriter(dir, metadata); mgrWriter = new TrackingIndexWriter(indexWriter); searcherManager = new SearcherManager(indexWriter, true, null); if (nrt != null) { nrt.close(); } nrt = new ControlledRealTimeReopenThread(mgrWriter, searcherManager, 60.00, 0.1); nrt.setDaemon(true); nrt.start(); flush(); }
@Override protected void doAfterWriter(final ExecutorService es) throws Exception { final double minReopenSec = 0.01 + 0.05 * random().nextDouble(); final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble()); if (VERBOSE) { System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); } genWriter = new TrackingIndexWriter(writer); final SearcherFactory sf = new SearcherFactory() { @Override public IndexSearcher newSearcher(IndexReader r) throws IOException { TestControlledRealTimeReopenThread.this.warmCalled = true; IndexSearcher s = new IndexSearcher(r, es); s.search(new TermQuery(new Term("body", "united")), 10); return s; } }; nrtNoDeletes = new SearcherManager(writer, false, sf); nrtDeletes = new SearcherManager(writer, true, sf); nrtDeletesThread = new ControlledRealTimeReopenThread<IndexSearcher>(genWriter, nrtDeletes, maxReopenSec, minReopenSec); nrtDeletesThread.setName("NRTDeletes Reopen Thread"); nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); nrtDeletesThread.setDaemon(true); nrtDeletesThread.start(); nrtNoDeletesThread = new ControlledRealTimeReopenThread<IndexSearcher>(genWriter, nrtNoDeletes, maxReopenSec, minReopenSec); nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread"); nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY)); nrtNoDeletesThread.setDaemon(true); nrtNoDeletesThread.start(); }
/** * Builds a new {@link FSIndex}. * * @param name * the index name * @param mbeanName * the JMX MBean object name * @param path * the directory path * @param analyzer * the index writer analyzer * @param refresh * the index reader refresh frequency in seconds * @param ramBufferMB * the index writer RAM buffer size in MB * @param maxMergeMB * the directory max merge size in MB * @param maxCachedMB * the directory max cache size in MB * @param refreshTask * action to be done during refresh */ public void init(String name, String mbeanName, Path path, Analyzer analyzer, double refresh, int ramBufferMB, int maxMergeMB, int maxCachedMB, Runnable refreshTask) { try { this.path = path; this.name = name; // Open or create directory FSDirectory fsDirectory = FSDirectory.open(path); this.directory = new NRTCachingDirectory(fsDirectory, maxMergeMB, maxCachedMB); // Setup index writer IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer); indexWriterConfig.setRAMBufferSizeMB(ramBufferMB); indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); indexWriterConfig.setUseCompoundFile(true); indexWriterConfig.setMergePolicy(new TieredMergePolicy()); this.indexWriter = new IndexWriter(this.directory, indexWriterConfig); // Setup NRT search SearcherFactory searcherFactory = new SearcherFactory() { @Override public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) { if (refreshTask != null) { refreshTask.run(); } IndexSearcher searcher = new IndexSearcher(reader); searcher.setSimilarity(new NoIDFSimilarity()); return searcher; } }; TrackingIndexWriter trackingWriter = new TrackingIndexWriter(this.indexWriter); this.searcherManager = new SearcherManager(this.indexWriter, true, searcherFactory); this.searcherReopener = new ControlledRealTimeReopenThread<>(trackingWriter, this.searcherManager, refresh, refresh); this.searcherReopener.start(); // Register JMX MBean // mbean = new ObjectName(mbeanName); // ManagementFactory.getPlatformMBeanServer().registerMBean(service, // this.mbean); } catch (Exception e) { throw new FhirIndexException(e, "Error while creating index %s", name); } }
public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException { IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random())); conf.setMergePolicy(NoMergePolicy.INSTANCE); Directory d = newDirectory(); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch signal = new CountDownLatch(1); LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal); final TrackingIndexWriter writer = new TrackingIndexWriter(_writer); final SearcherManager manager = new SearcherManager(_writer, false, null); Document doc = new Document(); doc.add(newTextField("test", "test", Field.Store.YES)); writer.addDocument(doc); manager.maybeRefresh(); Thread t = new Thread() { @Override public void run() { try { signal.await(); manager.maybeRefresh(); writer.deleteDocuments(new TermQuery(new Term("foo", "barista"))); manager.maybeRefresh(); // kick off another reopen so we inc. the internal gen } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); // let the add below finish } } }; t.start(); _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue IndexSearcher searcher = manager.acquire(); try { assertEquals(2, searcher.getIndexReader().numDocs()); } finally { manager.release(searcher); } final ControlledRealTimeReopenThread<IndexSearcher> thread = new ControlledRealTimeReopenThread<>(writer, manager, 0.01, 0.01); thread.start(); // start reopening if (VERBOSE) { System.out.println("waiting now for generation " + lastGen); } final AtomicBoolean finished = new AtomicBoolean(false); Thread waiter = new Thread() { @Override public void run() { try { thread.waitForGeneration(lastGen); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException(ie); } finished.set(true); } }; waiter.start(); manager.maybeRefresh(); waiter.join(1000); if (!finished.get()) { waiter.interrupt(); fail("thread deadlocked on waitForGeneration"); } thread.close(); thread.join(); IOUtils.close(manager, _writer, d); }
public TrackingIndexWriter getWriter() { return writer; }
public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException { IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); conf.setMergePolicy(random().nextBoolean() ? NoMergePolicy.COMPOUND_FILES : NoMergePolicy.NO_COMPOUND_FILES); Directory d = newDirectory(); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch signal = new CountDownLatch(1); LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal); final TrackingIndexWriter writer = new TrackingIndexWriter(_writer); final SearcherManager manager = new SearcherManager(_writer, false, null); Document doc = new Document(); doc.add(newTextField("test", "test", Field.Store.YES)); writer.addDocument(doc); manager.maybeRefresh(); Thread t = new Thread() { @Override public void run() { try { signal.await(); manager.maybeRefresh(); writer.deleteDocuments(new TermQuery(new Term("foo", "barista"))); manager.maybeRefresh(); // kick off another reopen so we inc. the internal gen } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); // let the add below finish } } }; t.start(); _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue IndexSearcher searcher = manager.acquire(); try { assertEquals(2, searcher.getIndexReader().numDocs()); } finally { manager.release(searcher); } final ControlledRealTimeReopenThread<IndexSearcher> thread = new ControlledRealTimeReopenThread<IndexSearcher>(writer, manager, 0.01, 0.01); thread.start(); // start reopening if (VERBOSE) { System.out.println("waiting now for generation " + lastGen); } final AtomicBoolean finished = new AtomicBoolean(false); Thread waiter = new Thread() { @Override public void run() { try { thread.waitForGeneration(lastGen); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException(ie); } finished.set(true); } }; waiter.start(); manager.maybeRefresh(); waiter.join(1000); if (!finished.get()) { waiter.interrupt(); fail("thread deadlocked on waitForGeneration"); } thread.close(); thread.join(); IOUtils.close(manager, _writer, d); }
/** * Create ControlledRealTimeReopenThread, to periodically * reopen the a {@link ReferenceManager}. * * @param targetMaxStaleSec Maximum time until a new * reader must be opened; this sets the upper bound * on how slowly reopens may occur, when no * caller is waiting for a specific generation to * become visible. * * @param targetMinStaleSec Mininum time until a new * reader can be opened; this sets the lower bound * on how quickly reopens may occur, when a caller * is waiting for a specific generation to * become visible. */ public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) { if (targetMaxStaleSec < targetMinStaleSec) { throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")"); } this.writer = writer; this.manager = manager; this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec); this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec); manager.addListener(new HandleRefresh()); }