/** * * @throws Exception */ @Test public void testReadActivelyUpdatedLog() throws Exception { final TestAppender appender = new TestAppender(); LogManager.getRootLogger().addAppender(appender); Configuration conf = new HdfsConfiguration(); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); // Set single handler thread, so all transactions hit same thread-local ops. conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 1); MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); FSImage fsimage = cluster.getNamesystem().getFSImage(); StorageDirectory sd = fsimage.getStorage().getStorageDir(0); final DistributedFileSystem fileSys = cluster.getFileSystem(); DFSInotifyEventInputStream events = fileSys.getInotifyEventStream(); fileSys.mkdirs(new Path("/test")); fileSys.mkdirs(new Path("/test/dir1")); fileSys.delete(new Path("/test/dir1"), true); fsimage.getEditLog().logSync(); fileSys.mkdirs(new Path("/test/dir2")); final File inProgressEdit = NNStorage.getInProgressEditsFile(sd, 1); assertTrue(inProgressEdit.exists()); EditLogFileInputStream elis = new EditLogFileInputStream(inProgressEdit); FSEditLogOp op; long pos = 0; while (true) { op = elis.readOp(); if (op != null && op.opCode != FSEditLogOpCodes.OP_INVALID) { pos = elis.getPosition(); } else { break; } } elis.close(); assertTrue(pos > 0); RandomAccessFile rwf = new RandomAccessFile(inProgressEdit, "rw"); rwf.seek(pos); assertEquals(rwf.readByte(), (byte) -1); rwf.seek(pos + 1); rwf.writeByte(2); rwf.close(); events.poll(); String pattern = "Caught exception after reading (.*) ops"; Pattern r = Pattern.compile(pattern); final List<LoggingEvent> log = appender.getLog(); for (LoggingEvent event : log) { Matcher m = r.matcher(event.getRenderedMessage()); if (m.find()) { fail("Should not try to read past latest syned edit log op"); } } } finally { if (cluster != null) { cluster.shutdown(); } 
LogManager.getRootLogger().removeAppender(appender); } }
/**
 * Opens a stream of namesystem events. The stream only delivers events that
 * occur after it has been created.
 * <p>
 * Stream usage is described in
 * {@link org.apache.hadoop.hdfs.DFSInotifyEventInputStream}; the available
 * event types are described in {@link org.apache.hadoop.hdfs.inotify.Event}.
 * <p>
 * Clients that fall behind the current namespace state can only catch up
 * while the corresponding HDFS edits are still retained. When edits are
 * deleted before their events have been read, calls on
 * {@link org.apache.hadoop.hdfs.DFSInotifyEventInputStream} raise a
 * {@link org.apache.hadoop.hdfs.inotify.MissingEventsException}. The default
 * settings should generally be reasonable; inotify users who need more
 * headroom may want to tune the parameters below.
 * <p>
 * Tuning these is generally sufficient:
 * <ul>
 *   <li>dfs.namenode.num.extra.edits.retained</li>
 *   <li>dfs.namenode.max.extra.edits.segments.retained</li>
 * </ul>
 * These affect how many segments are created and how many edits are
 * considered necessary (i.e. do not count towards the
 * dfs.namenode.num.extra.edits.retained quota):
 * <ul>
 *   <li>dfs.namenode.checkpoint.period</li>
 *   <li>dfs.namenode.checkpoint.txns</li>
 *   <li>dfs.namenode.num.checkpoints.retained</li>
 *   <li>dfs.ha.log-roll.period</li>
 * </ul>
 * <p>
 * Configuring local journaling (dfs.namenode.edits.dir) in addition to a
 * shared journal is recommended for inotify, so that edit transfers from the
 * shared journal can be avoided.
 *
 * @return the event stream returned by the underlying {@code dfs} handle
 * @throws IOException If there was an error obtaining the stream.
 */
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
  final DFSInotifyEventInputStream eventStream = dfs.getInotifyEventStream();
  return eventStream;
}
/**
 * Variant of {@link HdfsAdmin#getInotifyEventStream()} for advanced users
 * who already know the HDFS edits up to and including {@code lastReadTxid}
 * (for example because they hold an FSImage inclusive of that transaction)
 * and only want to read the events that come after it.
 *
 * @param lastReadTxid transaction id up to which the caller is already aware
 *          of edits; only events after this point are wanted
 * @return the event stream returned by the underlying {@code dfs} handle
 * @throws IOException declared by the delegated call; presumably raised when
 *           the stream cannot be obtained
 */
public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
    throws IOException {
  final DFSInotifyEventInputStream eventStream =
      dfs.getInotifyEventStream(lastReadTxid);
  return eventStream;
}