/** * Test that that writes to an incomplete block are available to a reader */ @Test (timeout = 30000) public void testUnfinishedBlockRead() throws IOException { // create a new file in the root, write data, do no close Path file1 = new Path("/unfinished-block"); FSDataOutputStream stm = TestFileCreation.createFile(fileSystem, file1, 1); // write partial block and sync int partialBlockSize = blockSize / 2; writeFileAndSync(stm, partialBlockSize); // Make sure a client can read it before it is closed checkCanRead(fileSystem, file1, partialBlockSize); stm.close(); }
/**
 * Creates {@code name} on {@code fs}, fills it with {@code length} bytes drawn
 * from the shared {@code ran} generator, and feeds the same bytes to the
 * shared {@code md5} digest.
 *
 * @param fs          file system to create the file on
 * @param name        path of the file to create
 * @param length      number of bytes to write
 * @param replication replication passed to {@code fs.create}
 * @param blocksize   block size passed to {@code fs.create}
 * @return the result of {@code md5.digest()} after all bytes have been written
 * @throws IOException if creating or writing the file fails
 */
private static byte[] createFile(FileSystem fs, Path name, long length,
    short replication, long blocksize) throws IOException {
  final FSDataOutputStream out = fs.create(name, false, 4096, replication, blocksize);
  try {
    long remaining = length;
    while (remaining > 0) {
      ran.nextBytes(buffer);
      // Write a full buffer, or only the tail that is still missing.
      final int chunk = (int) Math.min(remaining, (long) buffer.length);
      out.write(buffer, 0, chunk);
      md5.update(buffer, 0, chunk);
      remaining -= chunk;
    }
  } finally {
    IOUtils.closeStream(out);
  }
  return md5.digest();
}
/**
 * Saves a matrix partition to the given stream.
 *
 * <p>The row count is written first, followed by each row as its integer key
 * and then the row's own serialized form. When {@code partitionMeta} is
 * non-null, the stream position at which each row starts is recorded on it.
 *
 * @param output        the stream to write to
 * @param partitionMeta the partition meta; may be {@code null}, in which case
 *                      positions start at 0 and no row offsets are recorded
 * @throws IOException if writing fails
 */
public void save(DataOutputStream output, ModelPartitionMeta partitionMeta)
    throws IOException {
  final long startPosition = (partitionMeta == null) ? 0 : partitionMeta.getOffset();
  FSDataOutputStream positioned = new FSDataOutputStream(output, null, startPosition);

  positioned.writeInt(rows.size());
  for (Map.Entry<Integer, ServerRow> entry : rows.entrySet()) {
    // Capture the position before the row key is written.
    final long rowStart = positioned.getPos();
    final Integer rowId = entry.getKey();

    positioned.writeInt(rowId);
    entry.getValue().writeTo(positioned);

    if (partitionMeta != null) {
      partitionMeta.setRowMeta(new RowOffset(rowId, rowStart));
    }
  }
}
private void copyPartitions(Path mapOutputPath, Path indexPath) throws IOException { FileSystem localFs = FileSystem.getLocal(jobConf); FileSystem rfs = ((LocalFileSystem)localFs).getRaw(); FSDataOutputStream rawOutput = rfs.create(mapOutputPath, true, BUF_SIZE); SpillRecord spillRecord = new SpillRecord(numberOfPartitions); IndexRecord indexRecord = new IndexRecord(); for (int i = 0; i < numberOfPartitions; i++) { indexRecord.startOffset = rawOutput.getPos(); byte buffer[] = outStreams[i].toByteArray(); IFileOutputStream checksumOutput = new IFileOutputStream(rawOutput); checksumOutput.write(buffer); // Write checksum. checksumOutput.finish(); // Write index record indexRecord.rawLength = (long)buffer.length; indexRecord.partLength = rawOutput.getPos() - indexRecord.startOffset; spillRecord.putIndex(indexRecord, i); reporter.progress(); } rawOutput.close(); spillRecord.writeToFile(indexPath, jobConf); }
@Override public void run() { System.out.println("Workload starting "); for (int i = 0; i < numberOfFiles; i++) { Path filename = new Path(id + "." + i); try { System.out.println("Workload processing file " + filename); FSDataOutputStream stm = createFile(fs, filename, replication); DFSOutputStream dfstream = (DFSOutputStream) (stm.getWrappedStream()); dfstream.setArtificialSlowdown(1000); writeFile(stm, myseed); stm.close(); checkFile(fs, filename, replication, numBlocks, fileSize, myseed); } catch (Throwable e) { System.out.println("Workload exception " + e); assertTrue(e.toString(), false); } // increment the stamp to indicate that another file is done. synchronized (this) { stamp++; } } }
public void makeRenamePending(FileFolder dst) throws IOException { // Propose (but don't do) the rename. Path home = fs.getHomeDirectory(); String relativeHomeDir = getRelativePath(home.toString()); NativeAzureFileSystem.FolderRenamePending pending = new NativeAzureFileSystem.FolderRenamePending( relativeHomeDir + "/" + this.getName(), relativeHomeDir + "/" + dst.getName(), null, (NativeAzureFileSystem) fs); // Get the rename pending file contents. String renameDescription = pending.makeRenamePendingFileContents(); // Create a rename-pending file and write rename information to it. final String renamePendingStr = this.getName() + "-RenamePending.json"; Path renamePendingFile = new Path(renamePendingStr); FSDataOutputStream out = fs.create(renamePendingFile, true); assertTrue(out != null); writeString(out, renameDescription); }
/**
 * Writes a random UUID with {@code writeUTF} into the cluster-id file under
 * the root dir before starting the mini HBase cluster, then checks that the
 * master reports exactly one online server.
 */
@Test
public void testRewritingClusterIdToPB() throws Exception {
  TEST_UTIL.startMiniZKCluster();
  TEST_UTIL.startMiniDFSCluster(1);
  TEST_UTIL.createRootDir();
  TEST_UTIL.getConfiguration().setBoolean("hbase.replication", true);

  Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
  FileSystem fs = rootDir.getFileSystem(TEST_UTIL.getConfiguration());
  Path clusterIdFile = new Path(rootDir, HConstants.CLUSTER_ID_FILE_NAME);

  // If create() itself fails there is nothing to close.
  FSDataOutputStream idStream = fs.create(clusterIdFile);
  try {
    idStream.writeUTF(UUID.randomUUID().toString());
  } finally {
    idStream.close();
  }

  TEST_UTIL.startMiniHBaseCluster(1, 1);
  HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
  assertEquals(1, master.getServerManager().getOnlineServersList().size());
}
FSDataOutputStream create(PathData item, boolean lazyPersist) throws IOException { try { if (lazyPersist) { EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST); return create(item.path, FsPermission.getFileDefault().applyUMask( FsPermission.getUMask(getConf())), createFlags, getConf().getInt("io.file.buffer.size", 4096), lazyPersist ? 1 : getDefaultReplication(item.path), getDefaultBlockSize(), null, null); } else { return create(item.path, true); } } finally { // might have been created but stream was interrupted deleteOnExit(item.path); } }
public static void writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker) throws IOException { long offset = stream.getPos(); // Write EOF Entry ProcedureWALEntry.newBuilder() .setType(ProcedureWALEntry.Type.EOF) .build().writeDelimitedTo(stream); // Write Tracker tracker.writeTo(stream); stream.write(TRAILER_VERSION); StreamUtils.writeLong(stream, TRAILER_MAGIC); StreamUtils.writeLong(stream, offset); }
/**
 * Resolves {@code f} through {@code fsState} and delegates the create to the
 * resolved target file system, using the remaining path.
 *
 * <p>If resolution fails with {@link FileNotFoundException} and
 * {@code createParent} is set, the exception from
 * {@code readOnlyMountTable("create", f)} is thrown instead; otherwise the
 * original exception is rethrown.
 */
@Override
public FSDataOutputStream createInternal(final Path f,
    final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
    final int bufferSize, final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt,
    final boolean createParent) throws AccessControlException,
    FileAlreadyExistsException, FileNotFoundException,
    ParentNotDirectoryException, UnsupportedFileSystemException,
    UnresolvedLinkException, IOException {
  InodeTree.ResolveResult<AbstractFileSystem> res;
  try {
    res = fsState.resolve(getUriPath(f), false);
  } catch (FileNotFoundException e) {
    if (!createParent) {
      throw e;
    }
    throw readOnlyMountTable("create", f);
  }
  assert(res.remainingPath != null);
  return res.targetFileSystem.createInternal(res.remainingPath, flag,
      absolutePermission, bufferSize, replication,
      blockSize, progress, checksumOpt,
      createParent);
}
/**
 * Tries to open a file for append, retrying up to 10 times.
 *
 * <p>A {@code RemoteException} whose class name is
 * {@code RecoveryInProgressException} is logged and followed by a one-second
 * sleep before the next attempt; any other {@code RemoteException} is
 * rethrown immediately.
 *
 * @throws IOException if all attempts fail
 */
private static FSDataOutputStream append(FileSystem fs, Path p) throws Exception {
  for (int attempt = 0; attempt < 10; attempt++) {
    try {
      return fs.append(p);
    } catch (RemoteException re) {
      final boolean recovering =
          re.getClassName().equals(RecoveryInProgressException.class.getName());
      if (!recovering) {
        throw re;
      }
      AppendTestUtil.LOG.info("Will sleep and retry, i=" + attempt + ", p=" + p, re);
      Thread.sleep(1000);
    }
  }
  throw new IOException("Cannot append to " + p);
}
/**
 * Wraps a given FSDataOutputStream with a CryptoOutputStream. The size of the
 * data buffer required for the stream is specified by the
 * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
 * variable.
 *
 * <p>When encrypted spill is enabled, the current stream position is first
 * written as an 8-byte long, followed by a newly created IV, and the crypto
 * wrapper is returned. Otherwise {@code out} is returned unchanged.
 *
 * @param conf the configuration consulted for the encryption settings
 * @param out  the stream to wrap
 * @return the wrapping stream, or {@code out} itself when encrypted spill is
 *         not enabled
 * @throws IOException if writing the offset or the IV fails
 */
public static FSDataOutputStream wrapIfNecessary(Configuration conf,
    FSDataOutputStream out) throws IOException {
  if (!isEncryptedSpillEnabled(conf)) {
    return out;
  }

  final byte[] startOffset = ByteBuffer.allocate(8).putLong(out.getPos()).array();
  out.write(startOffset);

  final byte[] iv = createIV(conf);
  out.write(iv);
  if (LOG.isDebugEnabled()) {
    LOG.debug("IV written to Stream ["
        + Base64.encodeBase64URLSafeString(iv) + "]");
  }
  return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf),
      getBufferSize(conf), getEncryptionKey(), iv);
}
/** * Write the snapshot description into the working directory of a snapshot * @param snapshot description of the snapshot being taken * @param workingDir working directory of the snapshot * @param fs {@link FileSystem} on which the snapshot should be taken * @throws IOException if we can't reach the filesystem and the file cannot be cleaned up on * failure */ public static void writeSnapshotInfo(SnapshotDescription snapshot, Path workingDir, FileSystem fs) throws IOException { FsPermission perms = FSUtils.getFilePermissions(fs, fs.getConf(), HConstants.DATA_FILE_UMASK_KEY); Path snapshotInfo = new Path(workingDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE); try { FSDataOutputStream out = FSUtils.create(fs, snapshotInfo, perms, true); try { snapshot.writeTo(out); } finally { out.close(); } } catch (IOException e) { // if we get an exception, try to remove the snapshot info if (!fs.delete(snapshotInfo, false)) { String msg = "Couldn't delete snapshot info file: " + snapshotInfo; LOG.error(msg); throw new IOException(msg); } } }
@Test public void testCreatedFileIsImmediatelyVisible() throws Throwable { describe("verify that a newly created file exists as soon as open returns"); Path path = path("testCreatedFileIsImmediatelyVisible"); FSDataOutputStream out = null; try { out = getFileSystem().create(path, false, 4096, (short) 1, 1024); if (!getFileSystem().exists(path)) { if (isSupported(IS_BLOBSTORE)) { // object store: downgrade to a skip so that the failure is visible // in test results skip("Filesystem is an object store and newly created files are not immediately visible"); } assertPathExists("expected path to be visible before anything written", path); } } finally { IOUtils.closeStream(out); } }
/**
 * Supplies the test parameters: every method declared directly on
 * {@code FSDataOutputStream} that is public, non-static, and lists
 * {@link IOException} itself among its declared exception types.
 *
 * @return the matching methods as an array
 */
@Parameters(name = "method: {0}")
public static Object[] methodsToTest() {
  Predicate<Method> publicInstanceThrowingIo = new Predicate<Method>() {
    @Override
    public boolean apply(Method input) {
      final int modifiers = input.getModifiers();
      return !Modifier.isStatic(modifiers)
          && Modifier.isPublic(modifiers)
          && Arrays.asList(input.getExceptionTypes()).contains(IOException.class);
    }
  };
  List<Method> methods = FluentIterable
      .of(FSDataOutputStream.class.getDeclaredMethods())
      .filter(publicInstanceThrowingIo)
      .toList();
  return methods.toArray();
}
/**
 * (Re)initializes {@code conf}, {@code path} and {@code fs}, then writes
 * {@code count} key/value pairs to a new file through a {@code Writer}.
 *
 * <p>The file name is {@code outputFile + "." + compress} under {@code ROOT}.
 * Keys come from {@code composeSortedKey(KEY, count, i)} and values are
 * {@code VALUE + i}.
 *
 * @param count    number of key/value pairs to append
 * @param compress compression name; also used as the file name suffix
 * @throws IOException if creating or writing the file fails
 */
void createFile(int count, String compress) throws IOException {
  conf = new Configuration();
  path = new Path(ROOT, outputFile + "." + compress);
  fs = path.getFileSystem(conf);

  FSDataOutputStream out = fs.create(path);
  Writer writer = new Writer(out, BLOCK_SIZE, compress, comparator, conf);
  for (int row = 0; row < count; row++) {
    writer.append(
        composeSortedKey(KEY, count, row).getBytes(),
        (VALUE + row).getBytes());
  }
  writer.close();
  out.close();
}
/**
 * Writes every value in the {@code byte} range with {@code Utils.writeVLong},
 * checks the resulting file length, and reads the values back in order.
 *
 * @throws IOException if the file cannot be written, read, or deleted
 */
@Test
public void testVLongByte() throws IOException {
  FSDataOutputStream out = fs.create(path);
  for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; ++i) {
    Utils.writeVLong(out, i);
  }
  out.close();
  Assert.assertEquals("Incorrect encoded size", (1 << Byte.SIZE) + 96,
      fs.getFileStatus(path).getLen());

  FSDataInputStream in = fs.open(path);
  for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; ++i) {
    long n = Utils.readVLong(in);
    // expected value first, so a failure message reports the right way round
    Assert.assertEquals(i, n);
  }
  in.close();
  fs.delete(path, false);
}
/**
 * Writes every value in the {@code short} range, shifted left by
 * {@code shift} bits, with {@code Utils.writeVLong}; reads the values back
 * and checks them; then deletes the file.
 *
 * @param shift number of bits each value is shifted left before being written
 * @return the length of the written file, taken before it is deleted
 * @throws IOException if the file cannot be written, read, or deleted
 */
private long writeAndVerify(int shift) throws IOException {
  FSDataOutputStream out = fs.create(path);
  for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; ++i) {
    Utils.writeVLong(out, ((long) i) << shift);
  }
  out.close();

  FSDataInputStream in = fs.open(path);
  for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; ++i) {
    long n = Utils.readVLong(in);
    // expected value first, so a failure message reports the right way round
    Assert.assertEquals(((long) i) << shift, n);
  }
  in.close();

  long ret = fs.getFileStatus(path).getLen();
  fs.delete(path, false);
  return ret;
}
/** * The idea for making sure that there is no more than one instance * running in an HDFS is to create a file in the HDFS, writes the hostname * of the machine on which the instance is running to the file, but did not * close the file until it exits. * * This prevents the second instance from running because it can not * creates the file while the first one is running. * * This method checks if there is any running instance. If no, mark yes. * Note that this is an atomic operation. * * @return null if there is a running instance; * otherwise, the output stream to the newly created file. */ private OutputStream checkAndMarkRunning() throws IOException { try { if (fs.exists(idPath)) { // try appending to it so that it will fail fast if another balancer is // running. IOUtils.closeStream(fs.append(idPath)); fs.delete(idPath, true); } final FSDataOutputStream fsout = fs.create(idPath, false); // mark balancer idPath to be deleted during filesystem closure fs.deleteOnExit(idPath); if (write2IdFile) { fsout.writeBytes(InetAddress.getLocalHost().getHostName()); fsout.hflush(); } return fsout; } catch(RemoteException e) { if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ return null; } else { throw e; } } }
/** * Create an FSDataOutputStream at the indicated Path with write-progress * reporting. * @param f the file name to open * @param permission * @param overwrite if a file with this name already exists, then if true, * the file will be overwritten, and if false an error will be thrown. * @param bufferSize the size of the buffer to be used. * @param replication required block replication for the file. * @param blockSize * @param progress * @throws IOException * @see #setPermission(Path, FsPermission) */ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { String key = pathToKey(f); if (!overwrite && exists(f)) { throw new FileAlreadyExistsException(f + " already exists"); } if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, key, progress, statistics, cannedACL, serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold, threadPoolExecutor), statistics); } // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this, bucket, key, progress, cannedACL, statistics, serverSideEncryptionAlgorithm), null); }
@Test public void testAvailable() throws IOException { // write FILE_SIZE bytes to page blob FSDataOutputStream out = fs.create(PATH); byte[] data = new byte[FILE_SIZE]; Arrays.fill(data, (byte) 5); out.write(data, 0, FILE_SIZE); out.close(); // Test available() for different read sizes verifyAvailable(1); verifyAvailable(100); verifyAvailable(5000); verifyAvailable(FILE_SIZE); verifyAvailable(MAX_STRIDE); fs.delete(PATH, false); }
private FSDataOutputStream writeIncompleteFile(FileSystem fileSys, Path name, short repl) throws IOException { // create and write a file that contains three blocks of data FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf() .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), repl, blockSize); byte[] buffer = new byte[fileSize]; Random rand = new Random(seed); rand.nextBytes(buffer); stm.write(buffer); // need to make sure that we actually write out both file blocks // (see FSOutputSummer#flush) stm.flush(); // Do not close stream, return it // so that it is not garbage collected return stm; }
private void metablocks(final String compress) throws Exception { if (cacheConf == null) cacheConf = new CacheConfig(conf); Path mFile = new Path(ROOT_DIR, "meta.hfile"); FSDataOutputStream fout = createFSOutput(mFile); HFileContext meta = new HFileContextBuilder() .withCompression(AbstractHFileWriter.compressionByName(compress)) .withBlockSize(minBlockSize).build(); Writer writer = HFile.getWriterFactory(conf, cacheConf) .withOutputStream(fout) .withFileContext(meta) .create(); someTestingWithMetaBlock(writer); writer.close(); fout.close(); FSDataInputStream fin = fs.open(mFile); Reader reader = HFile.createReaderFromStream(mFile, fs.open(mFile), this.fs.getFileStatus(mFile).getLen(), cacheConf, conf); reader.loadFileInfo(); // No data -- this should return false. assertFalse(reader.getScanner(false, false).seekTo()); someReadingWithMetaBlock(reader); fs.delete(mFile, true); reader.close(); fin.close(); }
/**
 * Opens {@code f} for append on the underlying file system and returns the
 * stream wrapped by {@code newFSDataOutputStreamWrapper}. An {@code FSError}
 * raised while doing so is rethrown as whatever {@code propagateFSError}
 * returns.
 */
@Override
public FSDataOutputStream append(Path f) throws IOException {
  try {
    final FSDataOutputStream raw = underlyingFs.append(f);
    return newFSDataOutputStreamWrapper(raw);
  } catch (FSError e) {
    throw propagateFSError(e);
  }
}
/**
 * Creates a fresh output stream for {@code name}, recursively deleting
 * anything that already exists at that path first.
 *
 * @param name path to create
 * @param fs   file system to create it on
 * @return the stream returned by {@code fs.create(name)}
 * @throws IOException if the delete or create fails
 */
private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
    throws IOException {
  final boolean alreadyThere = fs.exists(name);
  if (alreadyThere) {
    fs.delete(name, true);
  }
  return fs.create(name);
}
/**
 * Writes one DATA block containing the integers {@code 0..size-1} to
 * {@code os} and logs the resulting position and block sizes.
 *
 * @param os          stream the block is written to
 * @param fileContext file context handed to the block writer
 * @param size        number of integers to put in the block
 * @return the block's on-disk size including its header
 * @throws IOException if writing fails
 */
private int writeBlock(FSDataOutputStream os, HFileContext fileContext, int size)
    throws IOException {
  HFileBlock.Writer hbw = new HFileBlock.Writer(null, fileContext);
  DataOutputStream dos = hbw.startWriting(BlockType.DATA);
  for (int j = 0; j < size; j++) {
    dos.writeInt(j);
  }
  hbw.writeHeaderAndData(os);
  // Each label names the getter whose value it prints.
  LOG.info("Wrote a block at " + os.getPos() + " with"
      + " onDiskSizeWithHeader=" + hbw.getOnDiskSizeWithHeader()
      + " onDiskSizeWithoutHeader=" + hbw.getOnDiskSizeWithoutHeader()
      + " uncompressedSizeWithoutHeader=" + hbw.getUncompressedSizeWithoutHeader());
  return hbw.getOnDiskSizeWithHeader();
}
/**
 * Writes the header and data through the overload selected by the
 * {@code DataOutputStream} cast, after recording the stream position at which
 * this block starts so that it can be referenced in the next block of the
 * same type.
 *
 * <p>If a start offset has already been recorded and the stream is now at a
 * different position, the block is rejected as having been written twice.
 *
 * @param out the stream to write the block to
 * @throws IOException if the block was already written at a different offset,
 *           or if writing fails
 */
public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
  final long currentPos = out.getPos();
  final boolean offsetRecorded = startOffset != -1;
  if (offsetRecorded && currentPos != startOffset) {
    throw new IOException("A " + blockType + " block written to a "
        + "stream twice, first at offset " + startOffset + ", then at "
        + currentPos);
  }
  startOffset = currentPos;

  writeHeaderAndData((DataOutputStream) out);
}
/** * tests functionality for big files ( > 5Gb) upload */ @Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT) public void testFilePartUpload() throws Throwable { final Path path = new Path("/test/testFilePartUpload"); int len = 8192; final byte[] src = SwiftTestUtils.dataset(len, 32, 144); FSDataOutputStream out = fs.create(path, false, getBufferSize(), (short) 1, BLOCK_SIZE); try { int totalPartitionsToWrite = len / PART_SIZE_BYTES; assertPartitionsWritten("Startup", out, 0); //write 2048 int firstWriteLen = 2048; out.write(src, 0, firstWriteLen); //assert long expected = getExpectedPartitionsWritten(firstWriteLen, PART_SIZE_BYTES, false); SwiftUtils.debug(LOG, "First write: predict %d partitions written", expected); assertPartitionsWritten("First write completed", out, expected); //write the rest int remainder = len - firstWriteLen; SwiftUtils.debug(LOG, "remainder: writing: %d bytes", remainder); out.write(src, firstWriteLen, remainder); expected = getExpectedPartitionsWritten(len, PART_SIZE_BYTES, false); assertPartitionsWritten("Remaining data", out, expected); out.close(); expected = getExpectedPartitionsWritten(len, PART_SIZE_BYTES, true); assertPartitionsWritten("Stream closed", out, expected); Header[] headers = fs.getStore().getObjectHeaders(path, true); for (Header header : headers) { LOG.info(header.toString()); } byte[] dest = readDataset(fs, path, len); LOG.info("Read dataset from " + path + ": data length =" + len); //compare data SwiftTestUtils.compareByteArrays(src, dest, len); FileStatus status; final Path qualifiedPath = path.makeQualified(fs); status = fs.getFileStatus(qualifiedPath); //now see what block location info comes back. 
//This will vary depending on the Swift version, so the results //aren't checked -merely that the test actually worked BlockLocation[] locations = fs.getFileBlockLocations(status, 0, len); assertNotNull("Null getFileBlockLocations()", locations); assertTrue("empty array returned for getFileBlockLocations()", locations.length > 0); //last bit of test -which seems to play up on partitions, which we download //to a skip try { validatePathLen(path, len); } catch (AssertionError e) { //downgrade to a skip throw new AssumptionViolatedException(e, null); } } finally { IOUtils.closeStream(out); } }
@Test(timeout=300000) public void testFileSizeExtension() throws IOException { final int writeSize = 1024 * 1024; final int numWrites = 129; final byte dataByte = 5; byte[] data = new byte[writeSize]; Arrays.fill(data, dataByte); FSDataOutputStream output = fs.create(PATH); try { for (int i = 0; i < numWrites; i++) { output.write(data); output.hflush(); LOG.debug("total writes = " + (i + 1)); } } finally { output.close(); } // Show that we wrote more than the default page blob file size. assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE); // Verify we can list the new size. That will prove we expanded the file. FileStatus[] status = fs.listStatus(PATH); assertTrue(status[0].getLen() == numWrites * writeSize); LOG.debug("Total bytes written to " + PATH + " = " + status[0].getLen()); fs.delete(PATH, false); }
@Test /** Abandon a block while creating a file */ public void testAbandonBlock() throws IOException { String src = FILE_NAME_PREFIX + "foo"; // Start writing a file but do not close it FSDataOutputStream fout = fs.create(new Path(src), true, 4096, (short)1, 512L); for (int i = 0; i < 1024; i++) { fout.write(123); } fout.hflush(); long fileId = ((DFSOutputStream)fout.getWrappedStream()).getFileId(); // Now abandon the last block DFSClient dfsclient = DFSClientAdapter.getDFSClient(fs); LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE); int orginalNumBlocks = blocks.locatedBlockCount(); LocatedBlock b = blocks.getLastLocatedBlock(); dfsclient.getNamenode().abandonBlock(b.getBlock(), fileId, src, dfsclient.clientName); // call abandonBlock again to make sure the operation is idempotent dfsclient.getNamenode().abandonBlock(b.getBlock(), fileId, src, dfsclient.clientName); // And close the file fout.close(); // Close cluster and check the block has been abandoned after restart cluster.restartNameNode(); blocks = dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE); Assert.assertEquals("Blocks " + b + " has not been abandoned.", orginalNumBlocks, blocks.locatedBlockCount() + 1); }
@Test public void testPutOpWithRedirect() { Future<String> future1 = contentLengthFuture(redirectResponse); Future<String> future2 = contentLengthFuture(errResponse); try { FSDataOutputStream os = fs.create(p); os.write(new byte[]{0}); os.close(); Assert.fail(); } catch (IOException ioe) {} // expected Assert.assertEquals("0", getContentLength(future1)); Assert.assertEquals("chunked", getContentLength(future2)); }
/**
 * Copies {@code inputFile} to {@code outputFile} (both under {@code dir}),
 * inserting {@code prefix} in front of the second tab-separated field of
 * every line.
 *
 * <p>Each output line is {@code field0 + "\t" + prefix + field1 + "\n"};
 * fields after the second are dropped. Every input line must contain at least
 * one tab, otherwise indexing the second field fails. Text is decoded and
 * encoded with the platform default charset, because the reader and writer
 * are constructed without an explicit one.
 *
 * @param dir        directory that holds both files
 * @param prefix     text to insert before the second field
 * @param inputFile  name of the file to read, relative to {@code dir}
 * @param outputFile name of the file to create, relative to {@code dir}
 * @throws IOException if either file cannot be opened, read, or written
 */
public static void appendPrefix(Path dir, String prefix, String inputFile,
    String outputFile) throws IOException {
  FileSystem fileSystem = dir.getFileSystem(new Configuration());

  Path in = new Path(dir, inputFile);
  Path out = new Path(dir, outputFile);

  // Closing the outermost reader/writer also closes the streams it wraps.
  BufferedReader reader =
      new BufferedReader(new InputStreamReader(fileSystem.open(in)));
  try {
    BufferedWriter writer =
        new BufferedWriter(new OutputStreamWriter(fileSystem.create(out)));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        String[] keyVal = line.split("\\t");
        writer.write(keyVal[0] + "\t" + prefix + keyVal[1] + "\n");
      }
    } finally {
      // close() flushes; nested finally blocks guarantee that the reader is
      // closed even if this close throws.
      writer.close();
    }
  } finally {
    reader.close();
  }
}
/**
 * Creates {@code f} on the underlying file system and returns the stream
 * wrapped by {@code newFSDataOutputStreamWrapper}. An {@code FSError} raised
 * while doing so is rethrown as whatever {@code propagateFSError} returns.
 */
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize)
    throws IOException {
  try {
    final FSDataOutputStream raw = underlyingFs.create(f, overwrite, bufferSize);
    return newFSDataOutputStreamWrapper(raw);
  } catch (FSError e) {
    throw propagateFSError(e);
  }
}
/**
 * Checks that scheduling a move of the same block replica twice succeeds the
 * first time and is refused the second time.
 *
 * @throws IOException if the cluster or file operations fail
 */
@Test
public void testScheduleSameBlock() throws IOException {
  final Configuration conf = new HdfsConfiguration();
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(4).build();
  try {
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final String file = "/testScheduleSameBlock/file";

    // Write a small file so that there is a block to schedule.
    final FSDataOutputStream fileOut = dfs.create(new Path(file));
    fileOut.writeChars("testScheduleSameBlock");
    fileOut.close();

    final Mover mover = newMover(conf);
    mover.init();
    final Mover.Processor processor = mover.new Processor();

    final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
    final List<MLocation> locations = MLocation.toLocations(lb);
    final MLocation firstLocation = locations.get(0);
    final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations);

    final List<StorageType> storageTypes = new ArrayList<StorageType>(
        Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
    Assert.assertTrue(processor.scheduleMoveReplica(db, firstLocation, storageTypes));
    Assert.assertFalse(processor.scheduleMoveReplica(db, firstLocation, storageTypes));
  } finally {
    cluster.shutdown();
  }
}
/**
 * Writes inline index blocks for as long as the index writer reports that one
 * should be written.
 *
 * <p>For each block the stream position is captured first, the index writer
 * fills a block started on {@code hbw}, the block is written to the stream,
 * and the index writer is told the offset and sizes of what was written.
 *
 * @param hbw          block writer used to build and emit each block
 * @param outputStream stream the blocks are written to
 * @param biw          index writer that produces the inline blocks
 * @param isClosing    passed to {@code biw.shouldWriteBlock}
 * @throws IOException if writing fails
 */
private void writeInlineBlocks(HFileBlock.Writer hbw,
    FSDataOutputStream outputStream, HFileBlockIndex.BlockIndexWriter biw,
    boolean isClosing) throws IOException {
  while (biw.shouldWriteBlock(isClosing)) {
    final long blockStart = outputStream.getPos();

    DataOutputStream blockOut = hbw.startWriting(biw.getInlineBlockType());
    biw.writeInlineBlock(blockOut);
    hbw.writeHeaderAndData(outputStream);

    biw.blockWritten(blockStart, hbw.getOnDiskSizeWithHeader(),
        hbw.getUncompressedSizeWithoutHeader());
    LOG.info("Wrote an inline index block at " + blockStart + ", size "
        + hbw.getOnDiskSizeWithHeader());
  }
}
/**
 * Creates a file by delegating to the superclass with {@code f} translated
 * through {@code fullPath}; all other arguments are passed on unchanged.
 */
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final boolean overwrite, final int bufferSize, final short replication,
    final long blockSize, final Progressable progress) throws IOException {
  final Path translated = fullPath(f);
  return super.create(translated, permission, overwrite, bufferSize,
      replication, blockSize, progress);
}
/**
 * Delegates to {@code myFs.createInternal} with {@code f} translated through
 * {@code fullPath}; all other arguments are passed on unchanged.
 */
@Override
public FSDataOutputStream createInternal(final Path f,
    final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
    final int bufferSize, final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt,
    final boolean createParent) throws IOException, UnresolvedLinkException {
  final Path translated = fullPath(f);
  return myFs.createInternal(translated, flag,
      absolutePermission, bufferSize,
      replication, blockSize, progress, checksumOpt, createParent);
}
/** * Regression test for HDFS-2742. The issue in this bug was: * - DN does a block report while file is open. This BR contains * the block in RBW state. * - Standby queues the RBW state in PendingDatanodeMessages * - Standby processes edit logs during failover. Before fixing * this bug, it was mistakenly applying the RBW reported state * after the block had been completed, causing the block to get * marked corrupt. Instead, we should now be applying the RBW * message on OP_ADD, and then the FINALIZED message on OP_CLOSE. */ @Test public void testBlockReportsWhileFileBeingWritten() throws Exception { FSDataOutputStream out = fs.create(TEST_FILE_PATH); try { AppendTestUtil.write(out, 0, 10); out.hflush(); // Block report will include the RBW replica, but will be // queued on the StandbyNode. cluster.triggerBlockReports(); } finally { IOUtils.closeStream(out); } cluster.transitionToStandby(0); cluster.transitionToActive(1); // Verify that no replicas are marked corrupt, and that the // file is readable from the failed-over standby. BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager()); BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager()); assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks()); assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks()); DFSTestUtil.readFile(fs, TEST_FILE_PATH); }
/**
 * Always rejects the create: throws the exception produced by
 * {@code readOnlyMountTable("create", f)}. No argument other than {@code f}
 * is used.
 */
@Override
public FSDataOutputStream createInternal(
    final Path f,
    final EnumSet<CreateFlag> flag,
    final FsPermission absolutePermission,
    final int bufferSize,
    final short replication,
    final long blockSize,
    final Progressable progress,
    final ChecksumOpt checksumOpt,
    final boolean createParent)
    throws AccessControlException, FileAlreadyExistsException,
    FileNotFoundException, ParentNotDirectoryException,
    UnsupportedFileSystemException, UnresolvedLinkException, IOException {
  throw readOnlyMountTable("create", f);
}