Java 类org.apache.lucene.index.TrackingIndexWriter 实例源码

项目:linden    文件:LindenCoreImpl.java   
public LindenCoreImpl(LindenConfig lindenConfig, String subIndexDirectory) throws IOException {
  this.config = lindenConfig;
  idFieldName = config.getSchema().getId();
  facetsConfig = config.createFacetsConfig();

  String directory = config.getIndexDirectory();
  if (subIndexDirectory != null) {
    directory = FilenameUtils.concat(config.getIndexDirectory(), subIndexDirectory);
  }

  indexWriter =
      new IndexWriter(createIndexDirectory(directory, config.getIndexType()), config.createIndexWriterConfig());
  trackingIndexWriter = new TrackingIndexWriter(indexWriter);

  taxoWriter =
      facetsConfig != null ? new DirectoryTaxonomyWriter(createTaxoIndexDirectory(directory, config.getIndexType()))
                           : null;
  commitStrategy = new CommitStrategy(indexWriter, taxoWriter);
  commitStrategy.start();

  lindenNRTSearcherManager = new LindenNRTSearcherManager(config,
                                                          trackingIndexWriter, taxoWriter);
  snippetGenerator = new LindenSnippetGenerator();
}
项目:flow    文件:LuceneFiler.java   
public LuceneFiler(@Nonnull Filer delegate, @Nonnull Config config) throws IOException {
    super(delegate);

    String path = config.getString("index.path");
    maxAge = config.getTime("index.maxAge", "-1");
    double maxMergeMb = config.getDouble("index.maxMergeMb", 4);
    double maxCachedMb = config.getDouble("index.maxCacheMb", 64);
    long targetMaxStale = config.getTime("index.targetMaxStale", "5s");
    long targetMinStale = config.getTime("index.targetMinStale", "1s");

    Directory dir = FSDirectory.open(new File(path).toPath());
    NRTCachingDirectory cachingDir = new NRTCachingDirectory(dir, maxMergeMb, maxCachedMb);
    IndexWriterConfig writerConfig = new IndexWriterConfig(null);
    writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND);

    writer = new TrackingIndexWriter(new IndexWriter(cachingDir, writerConfig));
    manager = new SearcherManager(writer.getIndexWriter(), true, new SearcherFactory());
    thread = new ControlledRealTimeReopenThread<>(writer, manager, targetMaxStale, targetMinStale);
    thread.start();
}
项目:search    文件:TestControlledRealTimeReopenThread.java   
@Override
protected void doAfterWriter(final ExecutorService es) throws Exception {
  final double minReopenSec = 0.01 + 0.05 * random().nextDouble();
  final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble());

  if (VERBOSE) {
    System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
  }

  genWriter = new TrackingIndexWriter(writer);

  final SearcherFactory sf = new SearcherFactory() {
      @Override
      public IndexSearcher newSearcher(IndexReader r) throws IOException {
        TestControlledRealTimeReopenThread.this.warmCalled = true;
        IndexSearcher s = new IndexSearcher(r, es);
        s.search(new TermQuery(new Term("body", "united")), 10);
        return s;
      }
    };

  nrtNoDeletes = new SearcherManager(writer, false, sf);
  nrtDeletes = new SearcherManager(writer, true, sf);

  nrtDeletesThread = new ControlledRealTimeReopenThread<>(genWriter, nrtDeletes, maxReopenSec, minReopenSec);
  nrtDeletesThread.setName("NRTDeletes Reopen Thread");
  nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
  nrtDeletesThread.setDaemon(true);
  nrtDeletesThread.start();

  nrtNoDeletesThread = new ControlledRealTimeReopenThread<>(genWriter, nrtNoDeletes, maxReopenSec, minReopenSec);
  nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread");
  nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
  nrtNoDeletesThread.setDaemon(true);
  nrtNoDeletesThread.start();
}
项目:r01fb    文件:LuceneIndex.java   
/**
 * Constructor based on an instance of the type responsible of the lucene index persistence
 */
@Inject
public LuceneIndex(final Directory luceneDirectory,
                   final Analyzer analyzer) {
    try {
        // [1]: Create the indexWriter
        _indexWriter = new IndexWriter(luceneDirectory,
                                       new IndexWriterConfig(LuceneConstants.VERSION,
                                                             analyzer));

        // [2a]: Create the TrackingIndexWriter to track changes to the delegated previously created IndexWriter 
        _trackingIndexWriter = new TrackingIndexWriter(_indexWriter);

        // [2b]: Create an IndexSearcher ReferenceManager to safely share IndexSearcher instances across
        //       multiple threads
        _indexSearcherReferenceManager = new SearcherManager(_indexWriter,
                                                             true,
                                                             null);

        // [3]: Create the ControlledRealTimeReopenThread that reopens the index periodically having into 
        //      account the changes made to the index and tracked by the TrackingIndexWriter instance
        //      The index is refreshed every 60sc when nobody is waiting 
        //      and every 100 millis whenever is someone waiting (see search method)
        //      (see http://lucene.apache.org/core/4_3_0/core/org/apache/lucene/search/NRTManagerReopenThread.html)
        _indexSearcherReopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(_trackingIndexWriter,
                                                                                       _indexSearcherReferenceManager,
                                                                                       60.00,   // when there is nobody waiting
                                                                                       0.1);    // when there is someone waiting
        _indexSearcherReopenThread.start(); // start the refresher thread
    } catch (IOException ioEx) {
        throw new IllegalStateException("Lucene index could not be created: " + ioEx.getMessage());
    }
}
项目:orientdb-lucene    文件:OLuceneIndexManagerAbstract.java   
private void reOpen(final ODocument metadata) throws IOException {
  ODatabaseDocumentInternal database = getDatabase();

  final OAbstractPaginatedStorage storageLocalAbstract = (OAbstractPaginatedStorage) database.getStorage().getUnderlying();
  Directory dir = null;
  if (storageLocalAbstract instanceof OLocalPaginatedStorage) {
    String pathname = getIndexPath((OLocalPaginatedStorage) storageLocalAbstract);

    OLogManager.instance().debug(this, "Opening NIOFS Lucene db=%s, path=%s", database.getName(), pathname);

    dir = NIOFSDirectory.open(new File(pathname));
  } else {

    OLogManager.instance().debug(this, "Opening RAM Lucene index db=%s", database.getName());
    dir = new RAMDirectory();

  }

  final IndexWriter indexWriter = createIndexWriter(dir, metadata);
  mgrWriter = new TrackingIndexWriter(indexWriter);
  searcherManager = new SearcherManager(indexWriter, true, null);
  if (nrt != null) {
    nrt.close();
  }

  nrt = new ControlledRealTimeReopenThread(mgrWriter, searcherManager, 60.00, 0.1);
  nrt.setDaemon(true);
  nrt.start();
  flush();
}
项目:Maskana-Gestor-de-Conocimiento    文件:TestControlledRealTimeReopenThread.java   
@Override
protected void doAfterWriter(final ExecutorService es) throws Exception {
  final double minReopenSec = 0.01 + 0.05 * random().nextDouble();
  final double maxReopenSec = minReopenSec * (1.0 + 10 * random().nextDouble());

  if (VERBOSE) {
    System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
  }

  genWriter = new TrackingIndexWriter(writer);

  final SearcherFactory sf = new SearcherFactory() {
      @Override
      public IndexSearcher newSearcher(IndexReader r) throws IOException {
        TestControlledRealTimeReopenThread.this.warmCalled = true;
        IndexSearcher s = new IndexSearcher(r, es);
        s.search(new TermQuery(new Term("body", "united")), 10);
        return s;
      }
    };

  nrtNoDeletes = new SearcherManager(writer, false, sf);
  nrtDeletes = new SearcherManager(writer, true, sf);

  nrtDeletesThread = new ControlledRealTimeReopenThread<IndexSearcher>(genWriter, nrtDeletes, maxReopenSec, minReopenSec);
  nrtDeletesThread.setName("NRTDeletes Reopen Thread");
  nrtDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
  nrtDeletesThread.setDaemon(true);
  nrtDeletesThread.start();

  nrtNoDeletesThread = new ControlledRealTimeReopenThread<IndexSearcher>(genWriter, nrtNoDeletes, maxReopenSec, minReopenSec);
  nrtNoDeletesThread.setName("NRTNoDeletes Reopen Thread");
  nrtNoDeletesThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
  nrtNoDeletesThread.setDaemon(true);
  nrtNoDeletesThread.start();
}
项目:cassandra-fhir-index    文件:LuceneService.java   
/**
 * Builds a new {@link FSIndex}.
 *
 * @param name
 *            the index name
 * @param mbeanName
 *            the JMX MBean object name
 * @param path
 *            the directory path
 * @param analyzer
 *            the index writer analyzer
 * @param refresh
 *            the index reader refresh frequency in seconds
 * @param ramBufferMB
 *            the index writer RAM buffer size in MB
 * @param maxMergeMB
 *            the directory max merge size in MB
 * @param maxCachedMB
 *            the directory max cache size in MB
 * @param refreshTask
 *            action to be done during refresh
 */
public void init(String name, String mbeanName, Path path, Analyzer analyzer, double refresh, int ramBufferMB,
        int maxMergeMB, int maxCachedMB, Runnable refreshTask) {
    try {

        this.path = path;
        this.name = name;

        // Open or create directory
        FSDirectory fsDirectory = FSDirectory.open(path);
        this.directory = new NRTCachingDirectory(fsDirectory, maxMergeMB, maxCachedMB);

        // Setup index writer
        IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
        indexWriterConfig.setRAMBufferSizeMB(ramBufferMB);
        indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
        indexWriterConfig.setUseCompoundFile(true);
        indexWriterConfig.setMergePolicy(new TieredMergePolicy());
        this.indexWriter = new IndexWriter(this.directory, indexWriterConfig);

        // Setup NRT search
        SearcherFactory searcherFactory = new SearcherFactory() {
            @Override
            public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) {
                if (refreshTask != null) {
                    refreshTask.run();
                }
                IndexSearcher searcher = new IndexSearcher(reader);
                searcher.setSimilarity(new NoIDFSimilarity());
                return searcher;
            }
        };
        TrackingIndexWriter trackingWriter = new TrackingIndexWriter(this.indexWriter);
        this.searcherManager = new SearcherManager(this.indexWriter, true, searcherFactory);
        this.searcherReopener = new ControlledRealTimeReopenThread<>(trackingWriter, this.searcherManager, refresh,
                refresh);
        this.searcherReopener.start();

        // Register JMX MBean
        // mbean = new ObjectName(mbeanName);
        // ManagementFactory.getPlatformMBeanServer().registerMBean(service,
        // this.mbean);

    } catch (Exception e) {
        throw new FhirIndexException(e, "Error while creating index %s", name);
    }
}
项目:search    文件:TestControlledRealTimeReopenThread.java   
public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException {
  IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
  conf.setMergePolicy(NoMergePolicy.INSTANCE);
  Directory d = newDirectory();
  final CountDownLatch latch = new CountDownLatch(1);
  final CountDownLatch signal = new CountDownLatch(1);

  LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
  final TrackingIndexWriter writer = new TrackingIndexWriter(_writer);
  final SearcherManager manager = new SearcherManager(_writer, false, null);
  Document doc = new Document();
  doc.add(newTextField("test", "test", Field.Store.YES));
  writer.addDocument(doc);
  manager.maybeRefresh();
  Thread t = new Thread() {
    @Override
    public void run() {
      try {
        signal.await();
        manager.maybeRefresh();
        writer.deleteDocuments(new TermQuery(new Term("foo", "barista")));
        manager.maybeRefresh(); // kick off another reopen so we inc. the internal gen
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        latch.countDown(); // let the add below finish
      }
    }
  };
  t.start();
  _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
  final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen

  assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue

  IndexSearcher searcher = manager.acquire();
  try {
    assertEquals(2, searcher.getIndexReader().numDocs());
  } finally {
    manager.release(searcher);
  }
  final ControlledRealTimeReopenThread<IndexSearcher> thread = new ControlledRealTimeReopenThread<>(writer, manager, 0.01, 0.01);
  thread.start(); // start reopening
  if (VERBOSE) {
    System.out.println("waiting now for generation " + lastGen);
  }

  final AtomicBoolean finished = new AtomicBoolean(false);
  Thread waiter = new Thread() {
    @Override
    public void run() {
      try {
        thread.waitForGeneration(lastGen);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ie);
      }
      finished.set(true);
    }
  };
  waiter.start();
  manager.maybeRefresh();
  waiter.join(1000);
  if (!finished.get()) {
    waiter.interrupt();
    fail("thread deadlocked on waitForGeneration");
  }
  thread.close();
  thread.join();
  IOUtils.close(manager, _writer, d);
}
项目:gerrit    文件:AbstractLuceneIndex.java   
public TrackingIndexWriter getWriter() {
  return writer;
}
项目:Maskana-Gestor-de-Conocimiento    文件:TestControlledRealTimeReopenThread.java   
public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException {
  IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
  conf.setMergePolicy(random().nextBoolean() ? NoMergePolicy.COMPOUND_FILES : NoMergePolicy.NO_COMPOUND_FILES);
  Directory d = newDirectory();
  final CountDownLatch latch = new CountDownLatch(1);
  final CountDownLatch signal = new CountDownLatch(1);

  LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
  final TrackingIndexWriter writer = new TrackingIndexWriter(_writer);
  final SearcherManager manager = new SearcherManager(_writer, false, null);
  Document doc = new Document();
  doc.add(newTextField("test", "test", Field.Store.YES));
  writer.addDocument(doc);
  manager.maybeRefresh();
  Thread t = new Thread() {
    @Override
    public void run() {
      try {
        signal.await();
        manager.maybeRefresh();
        writer.deleteDocuments(new TermQuery(new Term("foo", "barista")));
        manager.maybeRefresh(); // kick off another reopen so we inc. the internal gen
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        latch.countDown(); // let the add below finish
      }
    }
  };
  t.start();
  _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
  final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen

  assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue

  IndexSearcher searcher = manager.acquire();
  try {
    assertEquals(2, searcher.getIndexReader().numDocs());
  } finally {
    manager.release(searcher);
  }
  final ControlledRealTimeReopenThread<IndexSearcher> thread = new ControlledRealTimeReopenThread<IndexSearcher>(writer, manager, 0.01, 0.01);
  thread.start(); // start reopening
  if (VERBOSE) {
    System.out.println("waiting now for generation " + lastGen);
  }

  final AtomicBoolean finished = new AtomicBoolean(false);
  Thread waiter = new Thread() {
    @Override
    public void run() {
      try {
        thread.waitForGeneration(lastGen);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ie);
      }
      finished.set(true);
    }
  };
  waiter.start();
  manager.maybeRefresh();
  waiter.join(1000);
  if (!finished.get()) {
    waiter.interrupt();
    fail("thread deadlocked on waitForGeneration");
  }
  thread.close();
  thread.join();
  IOUtils.close(manager, _writer, d);
}
项目:lams    文件:ControlledRealTimeReopenThread.java   
/**
 * Create ControlledRealTimeReopenThread, to periodically
 * reopen the a {@link ReferenceManager}.
 *
 * @param targetMaxStaleSec Maximum time until a new
 *        reader must be opened; this sets the upper bound
 *        on how slowly reopens may occur, when no
 *        caller is waiting for a specific generation to
 *        become visible.
 *
 * @param targetMinStaleSec Mininum time until a new
 *        reader can be opened; this sets the lower bound
 *        on how quickly reopens may occur, when a caller
 *        is waiting for a specific generation to
 *        become visible.
 */
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
  if (targetMaxStaleSec < targetMinStaleSec) {
    throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
  }
  this.writer = writer;
  this.manager = manager;
  this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
  this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
  manager.addListener(new HandleRefresh());
}
项目:search    文件:ControlledRealTimeReopenThread.java   
/**
 * Create ControlledRealTimeReopenThread, to periodically
 * reopen the a {@link ReferenceManager}.
 *
 * @param targetMaxStaleSec Maximum time until a new
 *        reader must be opened; this sets the upper bound
 *        on how slowly reopens may occur, when no
 *        caller is waiting for a specific generation to
 *        become visible.
 *
 * @param targetMinStaleSec Mininum time until a new
 *        reader can be opened; this sets the lower bound
 *        on how quickly reopens may occur, when a caller
 *        is waiting for a specific generation to
 *        become visible.
 */
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
  if (targetMaxStaleSec < targetMinStaleSec) {
    throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
  }
  this.writer = writer;
  this.manager = manager;
  this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
  this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
  manager.addListener(new HandleRefresh());
}
项目:read-open-source-code    文件:ControlledRealTimeReopenThread.java   
/**
 * Create ControlledRealTimeReopenThread, to periodically
 * reopen the a {@link ReferenceManager}.
 *
 * @param targetMaxStaleSec Maximum time until a new
 *        reader must be opened; this sets the upper bound
 *        on how slowly reopens may occur, when no
 *        caller is waiting for a specific generation to
 *        become visible.
 *
 * @param targetMinStaleSec Mininum time until a new
 *        reader can be opened; this sets the lower bound
 *        on how quickly reopens may occur, when a caller
 *        is waiting for a specific generation to
 *        become visible.
 */
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
  if (targetMaxStaleSec < targetMinStaleSec) {
    throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
  }
  this.writer = writer;
  this.manager = manager;
  this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
  this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
  manager.addListener(new HandleRefresh());
}
项目:read-open-source-code    文件:ControlledRealTimeReopenThread.java   
/**
 * Create ControlledRealTimeReopenThread, to periodically
 * reopen the a {@link ReferenceManager}.
 *
 * @param targetMaxStaleSec Maximum time until a new
 *        reader must be opened; this sets the upper bound
 *        on how slowly reopens may occur, when no
 *        caller is waiting for a specific generation to
 *        become visible.
 *
 * @param targetMinStaleSec Mininum time until a new
 *        reader can be opened; this sets the lower bound
 *        on how quickly reopens may occur, when a caller
 *        is waiting for a specific generation to
 *        become visible.
 */
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
  if (targetMaxStaleSec < targetMinStaleSec) {
    throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
  }
  this.writer = writer;
  this.manager = manager;
  this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
  this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
  manager.addListener(new HandleRefresh());
}
项目:read-open-source-code    文件:ControlledRealTimeReopenThread.java   
/**
 * Create ControlledRealTimeReopenThread, to periodically
 * reopen the a {@link ReferenceManager}.
 *
 * @param targetMaxStaleSec Maximum time until a new
 *        reader must be opened; this sets the upper bound
 *        on how slowly reopens may occur, when no
 *        caller is waiting for a specific generation to
 *        become visible.
 *
 * @param targetMinStaleSec Mininum time until a new
 *        reader can be opened; this sets the lower bound
 *        on how quickly reopens may occur, when a caller
 *        is waiting for a specific generation to
 *        become visible.
 */
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
  if (targetMaxStaleSec < targetMinStaleSec) {
    throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
  }
  this.writer = writer;
  this.manager = manager;
  this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
  this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
  manager.addListener(new HandleRefresh());
}
项目:Maskana-Gestor-de-Conocimiento    文件:ControlledRealTimeReopenThread.java   
/**
 * Create ControlledRealTimeReopenThread, to periodically
 * reopen the a {@link ReferenceManager}.
 *
 * @param targetMaxStaleSec Maximum time until a new
 *        reader must be opened; this sets the upper bound
 *        on how slowly reopens may occur, when no
 *        caller is waiting for a specific generation to
 *        become visible.
 *
 * @param targetMinStaleSec Mininum time until a new
 *        reader can be opened; this sets the lower bound
 *        on how quickly reopens may occur, when a caller
 *        is waiting for a specific generation to
 *        become visible.
 */
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
  if (targetMaxStaleSec < targetMinStaleSec) {
    throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
  }
  this.writer = writer;
  this.manager = manager;
  this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
  this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
  manager.addListener(new HandleRefresh());
}