Example source code for the Java class org.apache.hadoop.hdfs.inotify.MissingEventsException
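
Before the per-project excerpts, a minimal sketch of how a client typically obtains one of these event streams, assuming the public HdfsAdmin API; the NameNode URI is a placeholder, not taken from the sources below.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;

// Sketch: opening an HDFS inotify event stream. "hdfs://nn-host:8020" is a
// placeholder for a real NameNode address.
public class InotifyStreamSetup {
  public static DFSInotifyEventInputStream open() throws IOException {
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://nn-host:8020"), conf);
    return admin.getInotifyEventStream();
  }
}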

Project: hadoop    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream, waiting indefinitely if
 * a new batch is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch take() throws IOException, InterruptedException,
    MissingEventsException {
  TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
  EventBatch next = null;
  try {
    int nextWaitMin = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      // sleep for a random period between nextWaitMin and nextWaitMin * 2
      // to avoid stampedes at the NN if there are multiple clients
      int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
      LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
      Thread.sleep(sleepTime);
      // the maximum sleep is 2 minutes
      nextWaitMin = Math.min(60000, nextWaitMin * 2);
    }
  } finally {
    scope.close();
  }

  return next;
}
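
A hedged usage sketch of the blocking take() shown above: the loop blocks until a batch arrives (backing off randomly, with any single sleep capped at two minutes per the code above) and keeps reading after a MissingEventsException, since the stream stays usable past the gap. The eis parameter and the handling policy are illustrative assumptions.

// Sketch: draining a stream with take(). `eis` and the logging policy are
// assumptions for illustration, not part of the excerpt above.
void drain(DFSInotifyEventInputStream eis)
    throws IOException, InterruptedException {
  while (!Thread.currentThread().isInterrupted()) {
    try {
      EventBatch batch = eis.take();
      for (Event event : batch.getEvents()) {
        System.out.println(event.getEventType() + " @ txid " + batch.getTxid());
      }
    } catch (MissingEventsException mee) {
      // Events were purged on the NameNode; the stream remains readable,
      // but everything inside the reported gap is gone.
      System.err.println("Skipped events: " + mee);
    }
  }
}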
Project: aliyun-oss-hadoop-fs    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event batch in the stream, waiting up to the specified
 * amount of time for a new batch. Returns null if one is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  EventBatch next;
  try (TraceScope ignored = tracer.newScope("inotifyPollWithTimeout")) {
    long initialTime = Time.monotonicNow();
    long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
    long nextWait = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
      if (timeLeft <= 0) {
        LOG.debug("timed poll(): timed out");
        break;
      } else if (timeLeft < nextWait * 2) {
        nextWait = timeLeft;
      } else {
        nextWait *= 2;
      }
      LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
          nextWait);
      Thread.sleep(nextWait);
    }
  }
  return next;
}
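
A small usage sketch for the timed poll() above: a null return simply means the timeout elapsed with no new batch, not an error, and the actual wait can overshoot by up to one NameNode RPC. The 30-second budget, eis, and the handler are assumptions.

// Sketch: bounded wait. A null result means 'timed out', not a failure.
EventBatch batch = eis.poll(30, TimeUnit.SECONDS);
if (batch != null) {
  process(batch.getEvents()); // process(...) is a hypothetical handler
}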
Project: aliyun-oss-hadoop-fs    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream, waiting indefinitely if
 * a new batch is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch take() throws IOException, InterruptedException,
    MissingEventsException {
  EventBatch next;
  try (TraceScope ignored = tracer.newScope("inotifyTake")) {
    int nextWaitMin = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      // sleep for a random period between nextWaitMin and nextWaitMin * 2
      // to avoid stampedes at the NN if there are multiple clients
      int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
      LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
      Thread.sleep(sleepTime);
      // the maximum sleep is 2 minutes
      nextWaitMin = Math.min(60000, nextWaitMin * 2);
    }
  }

  return next;
}
Project: big-c    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream, waiting indefinitely if
 * a new batch is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch take() throws IOException, InterruptedException,
    MissingEventsException {
  TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
  EventBatch next = null;
  try {
    int nextWaitMin = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      // sleep for a random period between nextWaitMin and nextWaitMin * 2
      // to avoid stampedes at the NN if there are multiple clients
      int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
      LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
      Thread.sleep(sleepTime);
      // the maximum sleep is 2 minutes
      nextWaitMin = Math.min(60000, nextWaitMin * 2);
    }
  } finally {
    scope.close();
  }

  return next;
}
Project: hadoop-2.6.0-cdh5.4.3    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event batch in the stream, waiting up to the specified
 * amount of time for a new batch. Returns null if one is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  long initialTime = Time.monotonicNow();
  long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
  long nextWait = INITIAL_WAIT_MS;
  EventBatch next = null;
  while ((next = poll()) == null) {
    long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
    if (timeLeft <= 0) {
      LOG.debug("timed poll(): timed out");
      break;
    } else if (timeLeft < nextWait * 2) {
      nextWait = timeLeft;
    } else {
      nextWait *= 2;
    }
    LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
        nextWait);
    Thread.sleep(nextWait);
  }

  return next;
}
Project: hadoop-2.6.0-cdh5.4.3    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream, waiting indefinitely if
 * a new batch is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch take() throws IOException, InterruptedException,
    MissingEventsException {
  EventBatch next = null;
  int nextWaitMin = INITIAL_WAIT_MS;
  while ((next = poll()) == null) {
    // sleep for a random period between nextWaitMin and nextWaitMin * 2
    // to avoid stampedes at the NN if there are multiple clients
    int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
    LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
    Thread.sleep(sleepTime);
    // the maximum sleep is 2 minutes
    nextWaitMin = Math.min(60000, nextWaitMin * 2);
  }

  return next;
}
Project: FlexMap    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event in the stream, waiting up to the specified amount of
 * time for a new event. Returns null if a new event is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public Event poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  long initialTime = Time.monotonicNow();
  long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
  long nextWait = INITIAL_WAIT_MS;
  Event next = null;
  while ((next = poll()) == null) {
    long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
    if (timeLeft <= 0) {
      LOG.debug("timed poll(): timed out");
      break;
    } else if (timeLeft < nextWait * 2) {
      nextWait = timeLeft;
    } else {
      nextWait *= 2;
    }
    LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
        nextWait);
    Thread.sleep(nextWait);
  }

  return next;
}
Project: FlexMap    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event in the stream, waiting indefinitely if a new event
 * is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public Event take() throws IOException, InterruptedException,
    MissingEventsException {
  Event next = null;
  int nextWaitMin = INITIAL_WAIT_MS;
  while ((next = poll()) == null) {
    // sleep for a random period between nextWaitMin and nextWaitMin * 2
    // to avoid stampedes at the NN if there are multiple clients
    int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
    LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
    Thread.sleep(sleepTime);
    // the maximum sleep is 2 minutes
    nextWaitMin = Math.min(60000, nextWaitMin * 2);
  }

  return next;
}
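
Unlike the EventBatch variants above, this FlexMap stream yields single Event objects. A minimal consumer sketch under that assumption:

// Sketch for the single-Event API: one Event per take(). `eis` is assumed.
Event event = eis.take();
if (event.getEventType() == Event.EventType.CREATE) {
  System.out.println("created: " + ((Event.CreateEvent) event).getPath());
}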
Project: hadoop    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event batch in the stream, waiting up to the specified
 * amount of time for a new batch. Returns null if one is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
  EventBatch next = null;
  try {
    long initialTime = Time.monotonicNow();
    long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
    long nextWait = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
      if (timeLeft <= 0) {
        LOG.debug("timed poll(): timed out");
        break;
      } else if (timeLeft < nextWait * 2) {
        nextWait = timeLeft;
      } else {
        nextWait *= 2;
      }
      LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
          nextWait);
      Thread.sleep(nextWait);
    }
  } finally {
    scope.close();
  }
  return next;
}
Project: hadoop    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
    Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
Project: aliyun-oss-hadoop-fs    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream or null if no new
 * batches are currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next batch in the
 * stream because the data for the events (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of transactions behind the current state of the NameNode). It is
 * safe to continue reading from the stream after this exception is thrown;
 * the next available batch of events will be returned.
 */
public EventBatch poll() throws IOException, MissingEventsException {
  try (TraceScope ignored = tracer.newScope("inotifyPoll")) {
    // need to keep retrying until the NN sends us the latest committed txid
    if (lastReadTxid == -1) {
      LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
      lastReadTxid = namenode.getCurrentEditLogTxid();
      return null;
    }
    if (!it.hasNext()) {
      EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
      if (el.getLastTxid() != -1) {
        // we only want to set syncTxid when we were actually able to read some
        // edits on the NN -- otherwise it will seem like edits are being
        // generated faster than we can read them when the problem is really
        // that we are temporarily unable to read edits
        syncTxid = el.getSyncTxid();
        it = el.getBatches().iterator();
        long formerLastReadTxid = lastReadTxid;
        lastReadTxid = el.getLastTxid();
        if (el.getFirstTxid() != formerLastReadTxid + 1) {
          throw new MissingEventsException(formerLastReadTxid + 1,
              el.getFirstTxid());
        }
      } else {
        LOG.debug("poll(): read no edits from the NN when requesting edits " +
            "after txid {}", lastReadTxid);
        return null;
      }
    }

    if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
      // newly seen edit log ops actually got converted to events
      return it.next();
    } else {
      return null;
    }
  }
}
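
Putting the @throws contract above into practice, here is a hedged sketch of a resilient consumer: transient IOExceptions (for example, unresponsive JournalNodes) are retried a few times, and a MissingEventsException is logged and skipped, since reading can safely continue past the gap. The retry budget, sleep interval, and the getExpectedTxid()/getActualTxid() accessors (inferred from the exception's constructor arguments) are assumptions.

// Sketch: resilient wrapper around the non-blocking poll() above.
static EventBatch nextBatch(DFSInotifyEventInputStream eis)
    throws IOException, InterruptedException {
  int ioFailures = 0;
  while (true) {
    try {
      EventBatch batch = eis.poll();
      if (batch != null) {
        return batch;
      }
      Thread.sleep(1000); // nothing new yet; don't hammer the NN with RPCs
    } catch (IOException ioe) {
      if (++ioFailures > 3) {
        throw ioe; // persistent failure; give up
      }
    } catch (MissingEventsException mee) {
      System.err.println("Events purged between txid " + mee.getExpectedTxid()
          + " and " + mee.getActualTxid() + "; continuing past the gap");
    }
  }
}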
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
    Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
Project: big-c    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event batch in the stream, waiting up to the specified
 * amount of time for a new batch. Returns null if one is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
  EventBatch next = null;
  try {
    long initialTime = Time.monotonicNow();
    long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
    long nextWait = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
      if (timeLeft <= 0) {
        LOG.debug("timed poll(): timed out");
        break;
      } else if (timeLeft < nextWait * 2) {
        nextWait = timeLeft;
      } else {
        nextWait *= 2;
      }
      LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
          nextWait);
      Thread.sleep(nextWait);
    }
  } finally {
    scope.close();
  }
  return next;
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
    Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream or null if no new
 * batches are currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next batch in the
 * stream because the data for the events (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of transactions behind the current state of the NameNode). It is
 * safe to continue reading from the stream after this exception is thrown;
 * the next available batch of events will be returned.
 */
public EventBatch poll() throws IOException, MissingEventsException {
  // need to keep retrying until the NN sends us the latest committed txid
  if (lastReadTxid == -1) {
    LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
    lastReadTxid = namenode.getCurrentEditLogTxid();
    return null;
  }
  if (!it.hasNext()) {
    EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
    if (el.getLastTxid() != -1) {
      // we only want to set syncTxid when we were actually able to read some
      // edits on the NN -- otherwise it will seem like edits are being
      // generated faster than we can read them when the problem is really
      // that we are temporarily unable to read edits
      syncTxid = el.getSyncTxid();
      it = el.getBatches().iterator();
      long formerLastReadTxid = lastReadTxid;
      lastReadTxid = el.getLastTxid();
      if (el.getFirstTxid() != formerLastReadTxid + 1) {
        throw new MissingEventsException(formerLastReadTxid + 1,
            el.getFirstTxid());
      }
    } else {
      LOG.debug("poll(): read no edits from the NN when requesting edits " +
        "after txid {}", lastReadTxid);
      return null;
    }
  }

  if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
    // newly seen edit log ops actually got converted to events
    return it.next();
  } else {
    return null;
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
    Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
Project: FlexMap    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event in the stream or null if no new events are currently
 * available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next event in the
 * stream because the data for the event (and possibly some subsequent events)
 * has been deleted (generally because this stream is a very large number of
 * events behind the current state of the NameNode). It is safe to continue
 * reading from the stream after this exception is thrown -- the next
 * available event will be returned.
 */
public Event poll() throws IOException, MissingEventsException {
  // need to keep retrying until the NN sends us the latest committed txid
  if (lastReadTxid == -1) {
    LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
    lastReadTxid = namenode.getCurrentEditLogTxid();
    return null;
  }
  if (!it.hasNext()) {
    EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1);
    if (el.getLastTxid() != -1) {
      // we only want to set syncTxid when we were actually able to read some
      // edits on the NN -- otherwise it will seem like edits are being
      // generated faster than we can read them when the problem is really
      // that we are temporarily unable to read edits
      syncTxid = el.getSyncTxid();
      it = el.getEvents().iterator();
      long formerLastReadTxid = lastReadTxid;
      lastReadTxid = el.getLastTxid();
      if (el.getFirstTxid() != formerLastReadTxid + 1) {
        throw new MissingEventsException(formerLastReadTxid + 1,
            el.getFirstTxid());
      }
    } else {
      LOG.debug("poll(): read no edits from the NN when requesting edits " +
        "after txid {}", lastReadTxid);
      return null;
    }
  }

  if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
    // newly seen edit log ops actually got converted to events
    return it.next();
  } else {
    return null;
  }
}
Project: FlexMap    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    Event next = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      next = waitForNextEvent(eis);
      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: FlexMap    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    Event next = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertTrue(next != null);
    Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
    Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream or null if no new
 * batches are currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next batch in the
 * stream because the data for the events (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of transactions behind the current state of the NameNode). It is
 * safe to continue reading from the stream after this exception is thrown;
 * the next available batch of events will be returned.
 */
public EventBatch poll() throws IOException, MissingEventsException {
  TraceScope scope =
      Trace.startSpan("inotifyPoll", traceSampler);
  try {
    // need to keep retrying until the NN sends us the latest committed txid
    if (lastReadTxid == -1) {
      LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
      lastReadTxid = namenode.getCurrentEditLogTxid();
      return null;
    }
    if (!it.hasNext()) {
      EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
      if (el.getLastTxid() != -1) {
        // we only want to set syncTxid when we were actually able to read some
        // edits on the NN -- otherwise it will seem like edits are being
        // generated faster than we can read them when the problem is really
        // that we are temporarily unable to read edits
        syncTxid = el.getSyncTxid();
        it = el.getBatches().iterator();
        long formerLastReadTxid = lastReadTxid;
        lastReadTxid = el.getLastTxid();
        if (el.getFirstTxid() != formerLastReadTxid + 1) {
          throw new MissingEventsException(formerLastReadTxid + 1,
              el.getFirstTxid());
        }
      } else {
        LOG.debug("poll(): read no edits from the NN when requesting edits " +
          "after txid {}", lastReadTxid);
        return null;
      }
    }

    if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
      // newly seen edit log ops actually got converted to events
      return it.next();
    } else {
      return null;
    }
  } finally {
    scope.close();
  }
}
Project: hadoop    File: TestDFSInotifyEventInputStream.java
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  EventBatch batch = null;
  while ((batch = eis.poll()) == null);
  return batch;
}
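
The helper above busy-spins on poll(), and each poll() can cost a NameNode RPC; that is tolerable in a short test but unkind elsewhere. A variant with a small sleep between polls (the 10 ms delay is an arbitrary assumption):

private static EventBatch waitForNextEventsPolitely(DFSInotifyEventInputStream eis)
    throws IOException, InterruptedException, MissingEventsException {
  EventBatch batch;
  while ((batch = eis.poll()) == null) {
    Thread.sleep(10); // brief pause between NameNode RPCs
  }
  return batch;
}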
Project: hadoop    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
public static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  EventBatch batch = null;
  while ((batch = eis.poll()) == null);
  return batch;
}
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}
Project: big-c    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream or null if no new
 * batches are currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next batch in the
 * stream because the data for the events (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of transactions behind the current state of the NameNode). It is
 * safe to continue reading from the stream after this exception is thrown;
 * the next available batch of events will be returned.
 */
public EventBatch poll() throws IOException, MissingEventsException {
  TraceScope scope =
      Trace.startSpan("inotifyPoll", traceSampler);
  try {
    // need to keep retrying until the NN sends us the latest committed txid
    if (lastReadTxid == -1) {
      LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
      lastReadTxid = namenode.getCurrentEditLogTxid();
      return null;
    }
    if (!it.hasNext()) {
      EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
      if (el.getLastTxid() != -1) {
        // we only want to set syncTxid when we were actually able to read some
        // edits on the NN -- otherwise it will seem like edits are being
        // generated faster than we can read them when the problem is really
        // that we are temporarily unable to read edits
        syncTxid = el.getSyncTxid();
        it = el.getBatches().iterator();
        long formerLastReadTxid = lastReadTxid;
        lastReadTxid = el.getLastTxid();
        if (el.getFirstTxid() != formerLastReadTxid + 1) {
          throw new MissingEventsException(formerLastReadTxid + 1,
              el.getFirstTxid());
        }
      } else {
        LOG.debug("poll(): read no edits from the NN when requesting edits " +
          "after txid {}", lastReadTxid);
        return null;
      }
    }

    if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
      // newly seen edit log ops actually got converted to events
      return it.next();
    } else {
      return null;
    }
  } finally {
    scope.close();
  }
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  EventBatch batch = null;
  while ((batch = eis.poll()) == null);
  return batch;
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  EventBatch batch = null;
  while ((batch = eis.poll()) == null);
  return batch;
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}
Project: FlexMap    File: TestDFSInotifyEventInputStream.java
private static Event waitForNextEvent(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  Event next = null;
  while ((next = eis.poll()) == null);
  return next;
}
Project: FlexMap    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    Event next = null;
    for (int i = 0; i < 10; i++) {
      next = waitForNextEvent(eis);
      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}