private static InotifyProtos.MetadataUpdateType metadataUpdateTypeConvert(
    Event.MetadataUpdateEvent.MetadataType type) {
  switch (type) {
  case TIMES:
    return InotifyProtos.MetadataUpdateType.META_TYPE_TIMES;
  case REPLICATION:
    return InotifyProtos.MetadataUpdateType.META_TYPE_REPLICATION;
  case OWNER:
    return InotifyProtos.MetadataUpdateType.META_TYPE_OWNER;
  case PERMS:
    return InotifyProtos.MetadataUpdateType.META_TYPE_PERMS;
  case ACLS:
    return InotifyProtos.MetadataUpdateType.META_TYPE_ACLS;
  case XATTRS:
    return InotifyProtos.MetadataUpdateType.META_TYPE_XATTRS;
  default:
    return null;
  }
}
private static Event.MetadataUpdateEvent.MetadataType metadataUpdateTypeConvert(
    InotifyProtos.MetadataUpdateType type) {
  switch (type) {
  case META_TYPE_TIMES:
    return Event.MetadataUpdateEvent.MetadataType.TIMES;
  case META_TYPE_REPLICATION:
    return Event.MetadataUpdateEvent.MetadataType.REPLICATION;
  case META_TYPE_OWNER:
    return Event.MetadataUpdateEvent.MetadataType.OWNER;
  case META_TYPE_PERMS:
    return Event.MetadataUpdateEvent.MetadataType.PERMS;
  case META_TYPE_ACLS:
    return Event.MetadataUpdateEvent.MetadataType.ACLS;
  case META_TYPE_XATTRS:
    return Event.MetadataUpdateEvent.MetadataType.XATTRS;
  default:
    return null;
  }
}
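The two converters above are intended to be exact inverses of each other. A minimal sanity-check sketch (illustrative only, not part of the source; assumes JVM assertions are enabled and that both overloads are in scope):

// Illustrative round-trip check: mapping each MetadataType to its protobuf
// form and back should yield the original value. Overload resolution picks
// the proto-to-event converter for the outer call.
for (Event.MetadataUpdateEvent.MetadataType t :
    Event.MetadataUpdateEvent.MetadataType.values()) {
  assert metadataUpdateTypeConvert(metadataUpdateTypeConvert(t)) == t;
}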
static InotifyProtos.MetadataUpdateType metadataUpdateTypeConvert(
    Event.MetadataUpdateEvent.MetadataType type) {
  switch (type) {
  case TIMES:
    return InotifyProtos.MetadataUpdateType.META_TYPE_TIMES;
  case REPLICATION:
    return InotifyProtos.MetadataUpdateType.META_TYPE_REPLICATION;
  case OWNER:
    return InotifyProtos.MetadataUpdateType.META_TYPE_OWNER;
  case PERMS:
    return InotifyProtos.MetadataUpdateType.META_TYPE_PERMS;
  case ACLS:
    return InotifyProtos.MetadataUpdateType.META_TYPE_ACLS;
  case XATTRS:
    return InotifyProtos.MetadataUpdateType.META_TYPE_XATTRS;
  default:
    return null;
  }
}
/**
 * Returns the next event in the stream, waiting up to the specified amount of
 * time for a new event. Returns null if a new event is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public Event poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  long initialTime = Time.monotonicNow();
  long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
  long nextWait = INITIAL_WAIT_MS;
  Event next = null;
  while ((next = poll()) == null) {
    long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
    if (timeLeft <= 0) {
      LOG.debug("timed poll(): timed out");
      break;
    } else if (timeLeft < nextWait * 2) {
      nextWait = timeLeft;
    } else {
      nextWait *= 2;
    }
    LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
        nextWait);
    Thread.sleep(nextWait);
  }
  return next;
}
/**
 * Returns the next event in the stream, waiting indefinitely if a new event
 * is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public Event take() throws IOException, InterruptedException,
    MissingEventsException {
  Event next = null;
  int nextWaitMin = INITIAL_WAIT_MS;
  while ((next = poll()) == null) {
    // sleep for a random period between nextWaitMin and nextWaitMin * 2
    // to avoid stampedes at the NN if there are multiple clients
    int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
    LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
    Thread.sleep(sleepTime);
    // the maximum sleep is 2 minutes
    nextWaitMin = Math.min(60000, nextWaitMin * 2);
  }
  return next;
}
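For context, a minimal consumer sketch built on take() (illustrative, not from the source): handleEvent is a hypothetical application callback, and eis is assumed to come from DFSClient#getInotifyEventStream().

// Illustrative consumer loop: take() blocks with randomized exponential
// backoff between internal polls, so the caller can simply loop forever.
void consume(DFSInotifyEventInputStream eis) throws IOException,
    InterruptedException, MissingEventsException {
  while (true) {
    Event event = eis.take();
    handleEvent(event); // hypothetical application callback
  }
}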
private static Event.CreateEvent.INodeType createTypeConvert(
    InotifyProtos.INodeType type) {
  switch (type) {
  case I_TYPE_DIRECTORY:
    return Event.CreateEvent.INodeType.DIRECTORY;
  case I_TYPE_FILE:
    return Event.CreateEvent.INodeType.FILE;
  case I_TYPE_SYMLINK:
    return Event.CreateEvent.INodeType.SYMLINK;
  default:
    return null;
  }
}
private static InotifyProtos.INodeType createTypeConvert(
    Event.CreateEvent.INodeType type) {
  switch (type) {
  case DIRECTORY:
    return InotifyProtos.INodeType.I_TYPE_DIRECTORY;
  case FILE:
    return InotifyProtos.INodeType.I_TYPE_FILE;
  case SYMLINK:
    return InotifyProtos.INodeType.I_TYPE_SYMLINK;
  default:
    return null;
  }
}
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs(
        cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() ==
          Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath()
          .equals("/dir" + i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() ==
        Event.EventType.CREATE);
    Assert.assertEquals("/dir",
        ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
static InotifyProtos.INodeType createTypeConvert(
    Event.CreateEvent.INodeType type) {
  switch (type) {
  case DIRECTORY:
    return InotifyProtos.INodeType.I_TYPE_DIRECTORY;
  case FILE:
    return InotifyProtos.INodeType.I_TYPE_FILE;
  case SYMLINK:
    return InotifyProtos.INodeType.I_TYPE_SYMLINK;
  default:
    return null;
  }
}
/**
 * Returns the next event in the stream or null if no new events are
 * currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next event in the
 * stream because the data for the event (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of events behind the current state of the NameNode). It is safe to
 * continue reading from the stream after this exception is thrown -- the
 * next available event will be returned.
 */
public Event poll() throws IOException, MissingEventsException {
  // need to keep retrying until the NN sends us the latest committed txid
  if (lastReadTxid == -1) {
    LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
    lastReadTxid = namenode.getCurrentEditLogTxid();
    return null;
  }
  if (!it.hasNext()) {
    EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1);
    if (el.getLastTxid() != -1) {
      // we only want to set syncTxid when we were actually able to read some
      // edits on the NN -- otherwise it will seem like edits are being
      // generated faster than we can read them when the problem is really
      // that we are temporarily unable to read edits
      syncTxid = el.getSyncTxid();
      it = el.getEvents().iterator();
      long formerLastReadTxid = lastReadTxid;
      lastReadTxid = el.getLastTxid();
      if (el.getFirstTxid() != formerLastReadTxid + 1) {
        throw new MissingEventsException(formerLastReadTxid + 1,
            el.getFirstTxid());
      }
    } else {
      LOG.debug("poll(): read no edits from the NN when requesting edits " +
          "after txid {}", lastReadTxid);
      return null;
    }
  }

  if (it.hasNext()) {
    // can be empty if el.getLastTxid != -1 but none of the
    // newly seen edit log ops actually got converted to events
    return it.next();
  } else {
    return null;
  }
}
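A sketch of the retry pattern the Javadoc above recommends (illustrative only; MAX_RETRIES and the fixed one-second backoff are assumptions, not values from the source):

// Illustrative wrapper: retry poll() a few times on IOException, since
// transient JournalNode unresponsiveness can surface as an IOException.
Event pollWithRetries(DFSInotifyEventInputStream eis)
    throws IOException, InterruptedException, MissingEventsException {
  final int MAX_RETRIES = 3; // assumed retry budget
  for (int attempt = 0; ; attempt++) {
    try {
      return eis.poll(); // may legitimately return null (no new events)
    } catch (IOException e) {
      if (attempt >= MAX_RETRIES) {
        throw e; // exhausted retries; propagate
      }
      Thread.sleep(1000L); // assumed fixed backoff between attempts
    }
  }
}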
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs(
        cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    Event next = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      next = waitForNextEvent(eis);
      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) next).getPath()
          .equals("/dir" + i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    Event next = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertTrue(next != null);
    Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
    Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
  } finally {
    cluster.shutdown();
  }
}
@Test
public void testPreserveEditLogs() throws Exception {
  conf = new HdfsConfiguration();
  conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
  String[] nameNodeDirs = conf.getStrings(
      DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
  conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
      false);

  log("Normal NameNode upgrade", 1);
  File[] created =
      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
  for (final File createdDir : created) {
    List<String> fileNameList =
        IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
    for (String fileName : fileNameList) {
      String tmpFileName = fileName + ".tmp";
      File existingFile = new File(createdDir, fileName);
      File tmpFile = new File(createdDir, tmpFileName);
      Files.move(existingFile.toPath(), tmpFile.toPath());
      File newFile = new File(createdDir, fileName);
      Preconditions.checkState(newFile.createNewFile(),
          "Cannot create new edits log file in " + createdDir);
      EditLogFileInputStream in = new EditLogFileInputStream(tmpFile,
          HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, false);
      EditLogFileOutputStream out = new EditLogFileOutputStream(conf, newFile,
          (int) tmpFile.length());
      out.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1);
      FSEditLogOp logOp = in.readOp();
      while (logOp != null) {
        out.write(logOp);
        logOp = in.readOp();
      }
      out.setReadyToFlush();
      out.flushAndSync(true);
      out.close();
      Files.delete(tmpFile.toPath());
    }
  }

  cluster = createCluster();
  DFSInotifyEventInputStream ieis =
      cluster.getFileSystem().getInotifyEventStream(0);
  EventBatch batch = ieis.poll();
  Event[] events = batch.getEvents();
  assertTrue("Should be able to get transactions before the upgrade.",
      events.length > 0);
  assertEquals(Event.EventType.CREATE, events[0].getEventType());
  assertEquals("/TestUpgrade", ((CreateEvent) events[0]).getPath());
  cluster.shutdown();
  UpgradeUtilities.createEmptyDirs(nameNodeDirs);
}
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() ==
          Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath()
          .equals("/dir" + i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}
public static EventsList convert(GetEditsFromTxidResponseProto resp)
    throws IOException {
  List<Event> events = Lists.newArrayList();
  for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) {
    switch (p.getType()) {
    case EVENT_CLOSE:
      InotifyProtos.CloseEventProto close =
          InotifyProtos.CloseEventProto.parseFrom(p.getContents());
      events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(),
          close.getTimestamp()));
      break;
    case EVENT_CREATE:
      InotifyProtos.CreateEventProto create =
          InotifyProtos.CreateEventProto.parseFrom(p.getContents());
      events.add(new Event.CreateEvent.Builder()
          .iNodeType(createTypeConvert(create.getType()))
          .path(create.getPath())
          .ctime(create.getCtime())
          .ownerName(create.getOwnerName())
          .groupName(create.getGroupName())
          .perms(convert(create.getPerms()))
          .replication(create.getReplication())
          .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
              create.getSymlinkTarget())
          .overwrite(create.getOverwrite()).build());
      break;
    case EVENT_METADATA:
      InotifyProtos.MetadataUpdateEventProto meta =
          InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
      events.add(new Event.MetadataUpdateEvent.Builder()
          .path(meta.getPath())
          .metadataType(metadataUpdateTypeConvert(meta.getType()))
          .mtime(meta.getMtime())
          .atime(meta.getAtime())
          .replication(meta.getReplication())
          .ownerName(
              meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
          .groupName(
              meta.getGroupName().isEmpty() ? null : meta.getGroupName())
          .perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
          .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
              meta.getAclsList()))
          .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
              meta.getXAttrsList()))
          .xAttrsRemoved(meta.getXAttrsRemoved())
          .build());
      break;
    case EVENT_RENAME:
      InotifyProtos.RenameEventProto rename =
          InotifyProtos.RenameEventProto.parseFrom(p.getContents());
      events.add(new Event.RenameEvent(rename.getSrcPath(),
          rename.getDestPath(), rename.getTimestamp()));
      break;
    case EVENT_APPEND:
      InotifyProtos.AppendEventProto reopen =
          InotifyProtos.AppendEventProto.parseFrom(p.getContents());
      events.add(new Event.AppendEvent(reopen.getPath()));
      break;
    case EVENT_UNLINK:
      InotifyProtos.UnlinkEventProto unlink =
          InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
      events.add(new Event.UnlinkEvent(unlink.getPath(),
          unlink.getTimestamp()));
      break;
    default:
      throw new RuntimeException("Unexpected inotify event type: " +
          p.getType());
    }
  }
  return new EventsList(events, resp.getEventsList().getFirstTxid(),
      resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid());
}