/**
 * Parse a single hlog and push every edit it contains into entryBuffers.
 *
 * @param in the hlog reader
 * @param path the path of the log file
 * @param entryBuffers the buffer to hold the parsed edits
 * @param fs the file system
 * @param conf the configuration
 * @param skipErrors indicator if CorruptedLogFileException should be thrown instead of IOException
 * @throws IOException on unrecoverable read failure or interruption
 * @throws CorruptedLogFileException if hlog is corrupted
 */
private void parseHLog(final Reader in, Path path, EntryBuffers entryBuffers,
    final FileSystem fs, final Configuration conf, boolean skipErrors)
    throws IOException, CorruptedLogFileException {
  int pushedCount = 0;
  try {
    for (Entry entry = getNextLogLine(in, path, skipErrors); entry != null;
        entry = getNextLogLine(in, path, skipErrors)) {
      entryBuffers.appendEntry(entry);
      pushedCount++;
    }
  } catch (InterruptedException ie) {
    // Convert interruption into the IOException flavor callers expect.
    IOException iioe = new InterruptedIOException();
    iioe.initCause(ie);
    throw iioe;
  } finally {
    LOG.debug("Pushed=" + pushedCount + " entries from " + path);
  }
}
/**
 * Append a log entry into the corresponding region buffer.
 * Blocks if the total heap usage has crossed the specified threshold.
 *
 * @param entry the WAL entry to buffer; its key selects the target region buffer
 * @throws InterruptedException if interrupted while waiting for buffer space
 * @throws IOException rethrown via checkForErrors() if a writer thread failed
 */
void appendEntry(Entry entry) throws InterruptedException, IOException {
  HLogKey key = entry.getKey();

  RegionEntryBuffer buffer;
  long incrHeap;
  synchronized (this) {
    // Lazily create the per-region buffer, keyed by encoded region name.
    buffer = buffers.get(key.getEncodedRegionName());
    if (buffer == null) {
      buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
      buffers.put(key.getEncodedRegionName(), buffer);
    }
    incrHeap= buffer.appendEntry(entry);
  }

  // If we crossed the chunk threshold, wait for more space to be available
  synchronized (dataAvailable) {
    totalBuffered += incrHeap;
    // Back-pressure: block until writer threads drain buffered edits below
    // maxHeapUsage, but give up waiting if a writer recorded an error.
    // The 3s timeout re-checks the condition even without a notify.
    while (totalBuffered > maxHeapUsage && thrown.get() == null) {
      LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
      dataAvailable.wait(3000);
    }
    dataAvailable.notifyAll();
  }
  // Surface any failure recorded asynchronously by a writer thread.
  checkForErrors();
}
/**
 * Select the buffered region with the largest heap footprint that is not
 * already being written, remove it from the pool, and mark it in-flight.
 *
 * @return the chosen region buffer, or null if no eligible region is pending
 */
synchronized RegionEntryBuffer getChunkToWrite() {
  byte[] chosenKey = null;
  long chosenSize = 0;
  for (Map.Entry<byte[], RegionEntryBuffer> candidate : buffers.entrySet()) {
    // Skip regions a writer thread is currently draining.
    if (currentlyWriting.contains(candidate.getKey())) {
      continue;
    }
    long candidateSize = candidate.getValue().heapSize();
    if (candidateSize > chosenSize) {
      chosenSize = candidateSize;
      chosenKey = candidate.getKey();
    }
  }
  if (chosenKey == null) {
    return null;
  }
  currentlyWriting.add(chosenKey);
  return buffers.remove(chosenKey);
}
/**
 * Create a recovered-edits writer for the region named in the entry, deleting
 * any leftover edits file from an earlier failed split attempt first.
 *
 * @return a WriterAndPath for the new edits file, or null when the region's
 *         directory no longer exists (region already split)
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
    Configuration conf) throws IOException {
  Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (regionedits == null) {
    return null;
  }
  if (fs.exists(regionedits)) {
    LOG.warn("Found existing old edits file. It could be the "
        + "result of a previous failed split attempt. Deleting " + regionedits
        + ", length=" + fs.getFileStatus(regionedits).getLen());
    boolean deleted = HBaseFileSystem.deleteFileFromFileSystem(fs, regionedits);
    if (!deleted) {
      LOG.warn("Failed delete of old " + regionedits);
    }
  }
  Writer w = createWriter(fs, regionedits, conf);
  LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
  return new WriterAndPath(regionedits, w);
}
/** * Get a writer and path for a log starting at the given entry. * * This function is threadsafe so long as multiple threads are always * acting on different regions. * * @return null if this region shouldn't output any logs */ WriterAndPath getWriterAndPath(Entry entry) throws IOException { byte region[] = entry.getKey().getEncodedRegionName(); WriterAndPath ret = logWriters.get(region); if (ret != null) { return ret; } // If we already decided that this region doesn't get any output // we don't need to check again. if (blacklistedRegions.contains(region)) { return null; } ret = createWAP(region, entry, rootDir, fs, conf); if (ret == null) { blacklistedRegions.add(region); return null; } logWriters.put(region, ret); return ret; }
/**
 * Copy a WAL file, flipping its compression setting: reads {@code input},
 * writes every entry to {@code output} with WAL compression toggled to the
 * opposite of what the input file uses.
 *
 * @param input path of the WAL file to read
 * @param output path of the WAL file to write
 * @throws IOException on any read/write/init failure
 */
private static void transformFile(Path input, Path output) throws IOException {
  SequenceFileLogReader in = new SequenceFileLogReader();
  SequenceFileLogWriter out = new SequenceFileLogWriter();

  try {
    Configuration conf = HBaseConfiguration.create();

    FileSystem inFS = input.getFileSystem(conf);
    FileSystem outFS = output.getFileSystem(conf);

    in.init(inFS, input, conf);
    boolean compress = in.reader.isWALCompressionEnabled();

    // Write the output with the opposite compression setting.
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
    out.init(outFS, output, conf);

    Entry e = null;
    while ((e = in.next()) != null) {
      out.append(e);
    }
  } finally {
    // BUGFIX: close both streams even if the first close() throws.
    // The original `in.close(); out.close();` leaked the writer whenever
    // closing the reader failed.
    try {
      in.close();
    } finally {
      out.close();
    }
  }
}
/**
 * Verify that recovered-edits paths for the meta region land under the meta
 * region's directory.
 * @throws IOException
 * @see https://issues.apache.org/jira/browse/HBASE-3020
 */
@Test
public void testRecoveredEditsPathForMeta() throws IOException {
  FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
  byte[] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
  Path tdir = new Path(hbaseDir, Bytes.toString(HConstants.META_TABLE_NAME));
  Path regiondir = new Path(tdir, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
  fs.mkdirs(regiondir);
  long now = System.currentTimeMillis();
  HLog.Entry entry = new HLog.Entry(
      new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
      new WALEdit());
  Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir, true);
  String parentOfParent = p.getParent().getParent().getName();
  // BUGFIX: JUnit's assertEquals takes (expected, actual); the original passed
  // them reversed, which produces misleading failure messages.
  assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
}
/** * Test old recovered edits file doesn't break HLogSplitter. * This is useful in upgrading old instances. */ @Test public void testOldRecoveredEditsFileSidelined() throws IOException { FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); Path tdir = new Path(hbaseDir, Bytes.toString(HConstants.META_TABLE_NAME)); Path regiondir = new Path(tdir, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); fs.mkdirs(regiondir); long now = System.currentTimeMillis(); HLog.Entry entry = new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); Path parent = HLog.getRegionDirRecoveredEditsDir(regiondir); assertEquals(parent.getName(), HLog.RECOVERED_EDITS_DIR); fs.createNewFile(parent); // create a recovered.edits file Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir, true); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); HLog.createWriter(fs, p, conf).close(); }
/**
 * Verify the content of the WAL file.
 * Checks that sequenceids are strictly ascending and counts the edits.
 *
 * @param wal path of the WAL file to read
 * @param verbose whether to log each seqid as it is read
 * @return Count of edits.
 * @throws IOException on read failure
 * @throws IllegalStateException if a seqid is not strictly greater than its predecessor
 */
private long verify(final Path wal, final boolean verbose) throws IOException {
  HLog.Reader reader = HLog.getReader(wal.getFileSystem(getConf()), wal, getConf());
  long count = 0;
  long previousSeqid = -1;
  try {
    Entry e;
    while ((e = reader.next()) != null) {
      count++;
      long seqid = e.getKey().getLogSeqNum();
      if (verbose) {
        LOG.info("seqid=" + seqid);
      }
      // Sequence ids must be strictly increasing within a single WAL.
      if (previousSeqid >= seqid) {
        throw new IllegalStateException("wal=" + wal.getName() + ", previousSeqid="
            + previousSeqid + ", seqid=" + seqid);
      }
      previousSeqid = seqid;
    }
  } finally {
    reader.close();
  }
  return count;
}
/**
 * Append a log entry into the corresponding region buffer.
 * Blocks if the total heap usage has crossed the specified threshold.
 *
 * @param entry the WAL entry to buffer; its key selects the target region buffer
 * @throws InterruptedException if interrupted while waiting for buffer space
 * @throws IOException rethrown via checkForErrors() if a writer thread failed
 */
void appendEntry(Entry entry) throws InterruptedException, IOException {
  HLogKey key = entry.getKey();

  RegionEntryBuffer buffer;
  long incrHeap;
  synchronized (this) {
    // Lazily create the per-region buffer, keyed by encoded region name.
    buffer = buffers.get(key.getEncodedRegionName());
    if (buffer == null) {
      buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
      buffers.put(key.getEncodedRegionName(), buffer);
    }
    incrHeap= buffer.appendEntry(entry);
  }

  // If we crossed the chunk threshold, wait for more space to be available
  synchronized (dataAvailable) {
    totalBuffered += incrHeap;
    // Back-pressure: block until writer threads drain buffered edits below
    // maxHeapUsage, but give up waiting if a writer recorded an error.
    // The 2s timeout re-checks the condition even without a notify.
    while (totalBuffered > maxHeapUsage && thrown.get() == null) {
      LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
      dataAvailable.wait(2000);
    }
    dataAvailable.notifyAll();
  }
  // Surface any failure recorded asynchronously by a writer thread.
  checkForErrors();
}
/**
 * @return RegionEntryBuffer a buffer of edits to be written or replayed;
 *         null when every pending region is already being written.
 */
synchronized RegionEntryBuffer getChunkToWrite() {
  byte[] winnerKey = null;
  long winnerSize = 0;
  // Pick the largest buffer whose region is not already in flight.
  for (Map.Entry<byte[], RegionEntryBuffer> e : buffers.entrySet()) {
    if (currentlyWriting.contains(e.getKey())) {
      continue;
    }
    long size = e.getValue().heapSize();
    if (size > winnerSize) {
      winnerSize = size;
      winnerKey = e.getKey();
    }
  }
  if (winnerKey == null) {
    return null;
  }
  // Claim the region before handing its buffer to the caller.
  currentlyWriting.add(winnerKey);
  return buffers.remove(winnerKey);
}
/** * Get a writer and path for a log starting at the given entry. This function is threadsafe so * long as multiple threads are always acting on different regions. * @return null if this region shouldn't output any logs */ private WriterAndPath getWriterAndPath(Entry entry) throws IOException { byte region[] = entry.getKey().getEncodedRegionName(); WriterAndPath ret = (WriterAndPath) writers.get(region); if (ret != null) { return ret; } // If we already decided that this region doesn't get any output // we don't need to check again. if (blacklistedRegions.contains(region)) { return null; } ret = createWAP(region, entry, rootDir, fs, conf); if (ret == null) { blacklistedRegions.add(region); return null; } writers.put(region, ret); return ret; }
/**
 * Create a recovered-edits writer for the region named in the entry. If an
 * edits file from an earlier failed split attempt exists, delete it first.
 *
 * @return a WriterAndPath for the new edits file, or null when the region's
 *         directory no longer exists (region already split)
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
    Configuration conf) throws IOException {
  Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (regionedits == null) {
    return null;
  }
  if (fs.exists(regionedits)) {
    LOG.warn("Found old edits file. It could be the "
        + "result of a previous failed split attempt. Deleting " + regionedits
        + ", length=" + fs.getFileStatus(regionedits).getLen());
    boolean deleted = fs.delete(regionedits, false);
    if (!deleted) {
      LOG.warn("Failed delete of old " + regionedits);
    }
  }
  Writer w = createWriter(fs, regionedits, conf);
  LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
  return new WriterAndPath(regionedits, w);
}
private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions) throws IOException { RegionServerWriter rsw = null; long startTime = System.nanoTime(); try { rsw = getRegionServerWriter(key); rsw.sink.replayEntries(actions); // Pass along summary statistics rsw.incrementEdits(actions.size()); rsw.incrementNanoTime(System.nanoTime() - startTime); } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); LOG.fatal(" Got while writing log entry to log", e); throw e; } }
/**
 * Drain at most one buffered per-server queue: pick the first location with
 * pending entries, remove its queue from the map, and replay those entries.
 *
 * @return true if a non-empty queue was found and processed, false otherwise
 * @throws IOException if replaying the claimed entries fails
 */
@Override
protected boolean flush() throws IOException {
  String curLoc = null;
  int curSize = 0;
  List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
  synchronized (this.serverToBufferQueueMap) {
    // Find the first location key whose queue has pending entries.
    for (String locationKey : this.serverToBufferQueueMap.keySet()) {
      curQueue = this.serverToBufferQueueMap.get(locationKey);
      if (!curQueue.isEmpty()) {
        curSize = curQueue.size();
        curLoc = locationKey;
        break;
      }
    }
    if (curSize > 0) {
      // Claim the queue by removing it from the map before releasing the lock,
      // so no other flusher processes the same entries.
      this.serverToBufferQueueMap.remove(curLoc);
    }
  }

  if (curSize > 0) {
    this.processWorkItems(curLoc, curQueue);
    // NOTE(review): notifyAll() is invoked here without an enclosing
    // synchronized (dataAvailable) block; unless every caller already holds
    // dataAvailable's monitor this throws IllegalMonitorStateException —
    // verify call sites.
    dataAvailable.notifyAll();
    return true;
  }
  return false;
}
/**
 * Verify that recovered-edits paths for the meta region land under the meta
 * region's directory.
 * @throws IOException
 * @see https://issues.apache.org/jira/browse/HBASE-3020
 */
@Test (timeout=300000)
public void testRecoveredEditsPathForMeta() throws IOException {
  FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
  byte[] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
  Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
  Path regiondir = new Path(tdir, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
  fs.mkdirs(regiondir);
  long now = System.currentTimeMillis();
  HLog.Entry entry = new HLog.Entry(
      new HLogKey(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
      new WALEdit());
  Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
  String parentOfParent = p.getParent().getParent().getName();
  // BUGFIX: JUnit's assertEquals takes (expected, actual); the original passed
  // them reversed, which produces misleading failure messages.
  assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
}
/** * Test old recovered edits file doesn't break HLogSplitter. * This is useful in upgrading old instances. */ @Test (timeout=300000) public void testOldRecoveredEditsFileSidelined() throws IOException { FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); Path regiondir = new Path(tdir, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); fs.mkdirs(regiondir); long now = System.currentTimeMillis(); HLog.Entry entry = new HLog.Entry(new HLogKey(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); Path parent = HLogUtil.getRegionDirRecoveredEditsDir(regiondir); assertEquals(parent.getName(), HConstants.RECOVERED_EDITS_DIR); fs.createNewFile(parent); // create a recovered.edits file Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); HLogFactory.createRecoveredEditsWriter(fs, p, conf).close(); }
/** * Path to a file under RECOVERED_EDITS_DIR directory of the region found in * <code>logEntry</code> named for the sequenceid in the passed * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332. * This method also ensures existence of RECOVERED_EDITS_DIR under the region * creating it if necessary. * @param fs * @param logEntry * @param rootDir HBase root dir. * @return Path to file into which to dump split log edits. * @throws IOException */ static Path getRegionSplitEditsPath(final FileSystem fs, final Entry logEntry, final Path rootDir, boolean isCreate) throws IOException { Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey() .getTablename()); Path regiondir = HRegion.getRegionDir(tableDir, Bytes.toString(logEntry.getKey().getEncodedRegionName())); Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir); if (!fs.exists(regiondir)) { LOG.info("This region's directory doesn't exist: " + regiondir.toString() + ". It is very likely that it was" + " already split so it's safe to discard those edits."); return null; } if (isCreate && !fs.exists(dir)) { if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir); } // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure // region's replayRecoveredEdits will not delete it String fileName = formatRecoveredEditsFileName(logEntry.getKey() .getLogSeqNum()); fileName = getTmpRecoveredEditsFileName(fileName); return new Path(dir, fileName); }
/**
 * Read every entry from the given hlog reader and append it to entryBuffers.
 *
 * @param in the hlog reader
 * @param path the path of the log file (used for logging and error reporting)
 * @param entryBuffers sink for the parsed edits
 * @param fs the file system
 * @param conf the configuration
 * @param skipErrors if true, corruption is reported as CorruptedLogFileException
 * @throws IOException on read failure or interruption
 * @throws CorruptedLogFileException if the hlog is corrupted
 */
private void parseHLog(final Reader in, Path path, EntryBuffers entryBuffers,
    final FileSystem fs, final Configuration conf, boolean skipErrors)
    throws IOException, CorruptedLogFileException {
  int editsCount = 0;
  try {
    Entry nextEntry = getNextLogLine(in, path, skipErrors);
    while (nextEntry != null) {
      entryBuffers.appendEntry(nextEntry);
      editsCount++;
      nextEntry = getNextLogLine(in, path, skipErrors);
    }
  } catch (InterruptedException ie) {
    // Surface interruption through the IOException hierarchy.
    IOException converted = new InterruptedIOException();
    converted.initCause(ie);
    throw converted;
  } finally {
    LOG.debug("Pushed=" + editsCount + " entries from " + path);
  }
}
/** * Get a writer and path for a log starting at the given entry. * * This function is threadsafe so long as multiple threads are always * acting on different regions. * * @return null if this region shouldn't output any logs */ WriterAndPath getWriterAndPath(Entry entry) throws IOException { byte region[] = entry.getKey().getEncodedRegionName(); WriterAndPath ret = logWriters.get(region); if (ret != null) { return ret; } // If we already decided that this region doesn't get any output // we don't need to check again. if (blacklistedRegions.contains(region)) { return null; } ret = createWAP(region, entry, rootDir, null, fs, conf); if (ret == null) { blacklistedRegions.add(region); return null; } logWriters.put(region, ret); return ret; }
/**
 * Create a recovered-edits writer for the region named in the entry, first
 * removing any stale edits file left by a previous failed split attempt.
 *
 * @return a WriterAndPath for the new edits file, or null when the region's
 *         directory no longer exists (region already split)
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
    Configuration conf) throws IOException {
  Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (regionedits == null) {
    return null;
  }
  if (fs.exists(regionedits)) {
    LOG.warn("Found old edits file. It could be the "
        + "result of a previous failed split attempt. Deleting " + regionedits
        + ", length=" + fs.getFileStatus(regionedits).getLen());
    boolean deleted = fs.delete(regionedits, false);
    if (!deleted) {
      LOG.warn("Failed delete of old " + regionedits);
    }
  }
  Writer w = createWriter(fs, regionedits, conf);
  LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
  return new WriterAndPath(regionedits, w);
}