FSDataOutputStream create(PathData item, boolean lazyPersist, boolean direct) throws IOException { try { if (lazyPersist) { EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST); return create(item.path, FsPermission.getFileDefault().applyUMask( FsPermission.getUMask(getConf())), createFlags, getConf().getInt("io.file.buffer.size", 4096), lazyPersist ? 1 : getDefaultReplication(item.path), getDefaultBlockSize(), null, null); } else { return create(item.path, true); } } finally { // might have been created but stream was interrupted if (!direct) { deleteOnExit(item.path); } } }
/**
 * Resolves {@code f} against the mount state and forwards the create request
 * to the resolved target file system, using the path left over after
 * resolution.
 *
 * <p>If resolution reports that the path was not found and
 * {@code createParent} is set, the error produced by
 * {@code readOnlyMountTable("create", f)} is thrown instead; without
 * {@code createParent} the original exception is rethrown.
 */
@Override
public FSDataOutputStream createInternal(final Path f,
    final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
    final int bufferSize, final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt,
    final boolean createParent) throws AccessControlException,
    FileAlreadyExistsException, FileNotFoundException,
    ParentNotDirectoryException, UnsupportedFileSystemException,
    UnresolvedLinkException, IOException {
  InodeTree.ResolveResult<AbstractFileSystem> resolved;
  try {
    resolved = fsState.resolve(getUriPath(f), false);
  } catch (FileNotFoundException notFound) {
    if (!createParent) {
      throw notFound;
    }
    throw readOnlyMountTable("create", f);
  }
  assert(resolved.remainingPath != null);
  return resolved.targetFileSystem.createInternal(
      resolved.remainingPath,
      flag,
      absolutePermission,
      bufferSize,
      replication,
      blockSize,
      progress,
      checksumOpt,
      createParent);
}
/**
 * Create a file with a length of <code>fileSize</code>, filled by repeatedly
 * writing from {@code WRITE_CONTENTS}.
 *
 * <p>The elapsed time of the create call is added to
 * {@code executionTime[CREATE]}; the time spent closing the stream after the
 * last write is added to {@code executionTime[WRITE_CLOSE]}. Both operation
 * counters are incremented once per successfully written file.
 *
 * @param file path of the file to create (parents are created as needed)
 * @param fileSize number of bytes to write
 * @throws IOException if the create or a write fails
 */
private void genFile(Path file, long fileSize) throws IOException {
  long startTime = Time.now();
  FSDataOutputStream out = null;
  try {
    out = fc.create(file,
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
        CreateOpts.createParent(), CreateOpts.bufferSize(4096),
        CreateOpts.repFac((short) 3));
    executionTime[CREATE] += (Time.now() - startTime);
    numOfOps[CREATE]++;

    long remaining = fileSize;
    while (remaining > 0) {
      // Bound each chunk by what is still owed. Bounding by the total
      // fileSize made the final chunk a full buffer and overshot the
      // requested length whenever fileSize was not a multiple of it.
      int chunk = (int) Math.min(remaining, WRITE_CONTENTS.length);
      out.write(WRITE_CONTENTS, 0, chunk);
      remaining -= chunk;
    }

    startTime = Time.now();
  } finally {
    IOUtils.cleanup(LOG, out);
  }
  // Recorded after the stream is closed so the interval actually covers the
  // close; previously the clock was read twice back-to-back before closing.
  executionTime[WRITE_CLOSE] += (Time.now() - startTime);
  numOfOps[WRITE_CLOSE]++;
}
public OutputStream open(String path, EnumSet<CreateFlag> flags, short mode, boolean recursive, int bufsize, long stripeSize, int stripeCount, int stripeOffset, String poolName) throws IOException { File f = new File(path); boolean exists = f.exists(); CreateFlag.validate(path, exists, flags); if(!exists) { File parent = f.getParentFile(); if(!(recursive || parent.exists())) { throw new IOException("Parent directory does not exist: " + parent + "."); } else if (!mkdirs(parent.getAbsolutePath(), (short)-1)) { throw new IOException("Mkdirs failed to create " + parent); } // Stripe must be set first. Cannot be set on an existing file. setstripe(path, stripeSize, stripeCount, stripeOffset, poolName); } OutputStream out = new FileOutputStream(f, flags.contains(CreateFlag.APPEND)); if (mode != (short) -1) { chmod(path, mode); } return new BufferedOutputStream(out, bufsize); }
@Override public void close() throws IOException { // Output the result to a file Results in the output dir FileContext fc; try { fc = FileContext.getFileContext(jobConf); } catch (IOException ioe) { System.err.println("Can not initialize the file system: " + ioe.getLocalizedMessage()); return; } FSDataOutputStream o = fc.create(FileOutputFormat.getTaskOutputPath(jobConf, "Results"), EnumSet.of(CreateFlag.CREATE)); PrintStream out = new PrintStream(o); printResults(out); out.close(); o.close(); }
/** Construct a new output stream for append. */ private DFSOutputStream(DFSClient dfsClient, String src, EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum) throws IOException { this(dfsClient, src, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK); // The last partial block of the file has to be filled. if (!toNewBlock && lastBlock != null) { // indicate that we are appending to an existing block bytesCurBlock = lastBlock.getBlockSize(); streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum); } else { computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null); } this.fileEncryptionInfo = stat.getFileEncryptionInfo(); }
/**
 * Builds and starts an append stream for {@code src} inside a
 * "newStreamForAppend" trace scope, which is closed before returning.
 *
 * <p>A non-empty {@code favoredNodes} array is handed to the stream's
 * streamer before the stream is started.
 */
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
    EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
    LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
    String[] favoredNodes) throws IOException {
  final TraceScope traceScope =
      dfsClient.getPathTraceScope("newStreamForAppend", src);
  try {
    final DFSOutputStream appendStream = new DFSOutputStream(dfsClient, src,
        flags, progress, lastBlock, stat, checksum);
    final boolean noFavoredNodes =
        favoredNodes == null || favoredNodes.length == 0;
    if (!noFavoredNodes) {
      appendStream.streamer.setFavoredNodes(favoredNodes);
    }
    appendStream.start();
    return appendStream;
  } finally {
    traceScope.close();
  }
}
/**
 * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
 * Progressable, int, ChecksumOpt)} with the addition of favoredNodes, a hint
 * to where the namenode should place the file blocks.
 *
 * <p>The favored nodes hint is not persisted in HDFS, hence it may be honored
 * at creation time only. HDFS could move the blocks away from the favored
 * nodes during balancing or replication. A value of null means no favored
 * nodes for this create.
 *
 * <p>A null {@code permission} is replaced by the file default before the
 * client's umask is applied.
 */
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
  checkOpen();

  final FsPermission requested =
      (permission != null) ? permission : FsPermission.getFileDefault();
  final FsPermission masked = requested.applyUMask(dfsClientConf.uMask);
  if (LOG.isDebugEnabled()) {
    LOG.debug(src + ": masked=" + masked);
  }

  final DFSOutputStream created = DFSOutputStream.newStreamForCreate(
      this,
      src,
      masked,
      flag,
      createParent,
      replication,
      blockSize,
      progress,
      buffersize,
      dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes));
  beginFileLease(created.getFileId(), created);
  return created;
}
/** * Append to an existing file if {@link CreateFlag#APPEND} is present */ private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag, int buffersize, Progressable progress) throws IOException { if (flag.contains(CreateFlag.APPEND)) { HdfsFileStatus stat = getFileInfo(src); if (stat == null) { // No file to append to // New file needs to be created if create option is present if (!flag.contains(CreateFlag.CREATE)) { throw new FileNotFoundException("failed to append to non-existent file " + src + " on client " + clientName); } return null; } return callAppend(src, buffersize, flag, progress, null); } return null; }
/**
 * Same as {@link #create(String, FsPermission, EnumSet, short, long,
 * Progressable, int, ChecksumOpt)} except that the permission is absolute
 * (i.e. it has already been masked with the umask).
 *
 * <p>After validating the flags, an append is attempted first; a new stream
 * is created only when the append attempt yields no stream. A file lease is
 * begun for whichever stream is returned.
 */
public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt) throws IOException, UnresolvedLinkException {
  checkOpen();
  CreateFlag.validate(flag);

  final DFSOutputStream appended =
      primitiveAppend(src, flag, buffersize, progress);
  final DFSOutputStream stream;
  if (appended != null) {
    stream = appended;
  } else {
    final DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
    stream = DFSOutputStream.newStreamForCreate(this, src, absPermission,
        flag, createParent, replication, blockSize, progress, buffersize,
        checksum, null);
  }

  beginFileLease(stream.getFileId(), stream);
  return stream;
}
/**
 * Issues the append call to the namenode and wraps the returned last block
 * and file status in an append stream.
 *
 * <p>The flags are validated for append first. A {@link RemoteException} is
 * unwrapped into one of the listed exception types before being rethrown.
 */
private DFSOutputStream callAppend(String src, int buffersize,
    EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
    throws IOException {
  CreateFlag.validateForAppend(flag);
  try {
    final EnumSetWritable<CreateFlag> wireFlags =
        new EnumSetWritable<>(flag, CreateFlag.class);
    final LastBlockWithStatus appendResult =
        namenode.append(src, clientName, wireFlags);
    return DFSOutputStream.newStreamForAppend(
        this,
        src,
        flag,
        buffersize,
        progress,
        appendResult.getLastBlock(),
        appendResult.getFileStatus(),
        dfsClientConf.createChecksum(),
        favoredNodes);
  } catch (RemoteException remote) {
    throw remote.unwrapRemoteException(
        AccessControlException.class,
        FileNotFoundException.class,
        SafeModeException.class,
        DSQuotaExceededException.class,
        UnsupportedOperationException.class,
        UnresolvedPathException.class,
        SnapshotAccessControlException.class);
  }
}
/**
 * Create a new file entry in the namespace.
 *
 * <p>For a description of parameters and exceptions thrown see
 * {@link ClientProtocol#create}, except that this method returns the file
 * status on success. An {@link AccessControlException} is recorded as a
 * failed "create" audit event before being rethrown.
 */
HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
    throws AccessControlException, SafeModeException,
    FileAlreadyExistsException, UnresolvedLinkException,
    FileNotFoundException, ParentNotDirectoryException, IOException {
  try {
    return startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize, supportedVersions,
        logRetryCache);
  } catch (AccessControlException denied) {
    logAuditEvent(false, "create", src);
    throw denied;
  }
}
/**
 * Handles a create request: sends the continue header, opens an HDFS output
 * stream for {@code path} using the request parameters, and replaces this
 * handler in the pipeline with an {@code HdfsWriter} that carries the
 * prepared CREATED response.
 *
 * <p>The file is created with CREATE, plus OVERWRITE when the request asks
 * for it. The response's Location header is the HDFS URI of the new file and
 * its content length is 0.
 *
 * @param ctx channel context of the current request
 * @throws IOException if the HDFS client or stream cannot be created
 * @throws URISyntaxException if the Location URI cannot be built
 */
private void onCreate(ChannelHandlerContext ctx)
    throws IOException, URISyntaxException {
  writeContinueHeader(ctx);

  final String nnId = params.namenodeId();
  final int bufferSize = params.bufferSize();
  final short replication = params.replication();
  final long blockSize = params.blockSize();
  final FsPermission permission = params.permission();

  EnumSet<CreateFlag> flags = params.overwrite()
      ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
      : EnumSet.of(CreateFlag.CREATE);

  // Build the Location URI before touching HDFS. It depends only on nnId
  // and path, and building it afterwards meant a URISyntaxException left an
  // already-opened client and output stream with no owner to close them.
  final URI uri = new URI(HDFS_URI_SCHEME, nnId, path, null, null);

  final DFSClient dfsClient = newDfsClient(nnId, confForCreate);
  OutputStream out = dfsClient.createWrappedOutputStream(dfsClient.create(
      path, permission, flags, replication, blockSize, null, bufferSize,
      null), null);

  DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, CREATED);
  resp.headers().set(LOCATION, uri.toString());
  resp.headers().set(CONTENT_LENGTH, 0);
  ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
      new HdfsWriter(dfsClient, out, resp));
}
/**
 * Encodes a set of create flags as an int by OR-ing together the protobuf
 * numbers of APPEND, CREATE, OVERWRITE, LAZY_PERSIST and NEW_BLOCK for each
 * of those flags present in {@code flag}.
 *
 * @param flag the flags to encode
 * @return the combined protobuf flag value; 0 if none of them is present
 */
public static int convertCreateFlag(EnumSetWritable<CreateFlag> flag) {
  // Parallel tables: source[i] maps to wire[i].
  final CreateFlag[] source = {
      CreateFlag.APPEND,
      CreateFlag.CREATE,
      CreateFlag.OVERWRITE,
      CreateFlag.LAZY_PERSIST,
      CreateFlag.NEW_BLOCK,
  };
  final CreateFlagProto[] wire = {
      CreateFlagProto.APPEND,
      CreateFlagProto.CREATE,
      CreateFlagProto.OVERWRITE,
      CreateFlagProto.LAZY_PERSIST,
      CreateFlagProto.NEW_BLOCK,
  };

  int bits = 0;
  for (int i = 0; i < source.length; i++) {
    if (flag.contains(source[i])) {
      bits |= wire[i].getNumber();
    }
  }
  return bits;
}
/**
 * Decodes an int flag value into a set of create flags. A flag is included
 * when all bits of its protobuf value are set in {@code flag}; the flags
 * checked are APPEND, CREATE, OVERWRITE, LAZY_PERSIST and NEW_BLOCK.
 *
 * @param flag the encoded flag value
 * @return a writable wrapper around the decoded flag set
 */
public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) {
  // Parallel tables: masks[i] selects decoded[i].
  final int[] masks = {
      CreateFlagProto.APPEND_VALUE,
      CreateFlagProto.CREATE_VALUE,
      CreateFlagProto.OVERWRITE_VALUE,
      CreateFlagProto.LAZY_PERSIST_VALUE,
      CreateFlagProto.NEW_BLOCK_VALUE,
  };
  final CreateFlag[] decoded = {
      CreateFlag.APPEND,
      CreateFlag.CREATE,
      CreateFlag.OVERWRITE,
      CreateFlag.LAZY_PERSIST,
      CreateFlag.NEW_BLOCK,
  };

  final EnumSet<CreateFlag> present = EnumSet.noneOf(CreateFlag.class);
  for (int i = 0; i < masks.length; i++) {
    if ((flag & masks[i]) == masks[i]) {
      present.add(decoded[i]);
    }
  }
  return new EnumSetWritable<CreateFlag>(present, CreateFlag.class);
}
/**
 * Server-side translation of an append request: decodes the flags (falling
 * back to just APPEND when the request carries none), invokes the server,
 * and copies the last block and file status into the response when they are
 * non-null.
 *
 * @throws ServiceException wrapping any {@link IOException} from the server
 */
@Override
public AppendResponseProto append(RpcController controller,
    AppendRequestProto req) throws ServiceException {
  try {
    final EnumSetWritable<CreateFlag> flags;
    if (req.hasFlag()) {
      flags = PBHelper.convertCreateFlag(req.getFlag());
    } else {
      flags = new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));
    }

    final LastBlockWithStatus appended =
        server.append(req.getSrc(), req.getClientName(), flags);

    final AppendResponseProto.Builder response =
        AppendResponseProto.newBuilder();
    if (appended.getLastBlock() != null) {
      response.setBlock(PBHelper.convert(appended.getLastBlock()));
    }
    if (appended.getFileStatus() != null) {
      response.setStat(PBHelper.convert(appended.getFileStatus()));
    }
    return response.build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * Client-side translation of an append call: builds the request from the
 * source path, client name and encoded flags, sends it over the RPC proxy,
 * and converts the optional block and status fields of the response.
 *
 * @return the last block and file status; each is {@code null} when the
 *         response does not carry it
 * @throws IOException the remote exception extracted from a
 *         {@link ServiceException}
 */
@Override
public LastBlockWithStatus append(String src, String clientName,
    EnumSetWritable<CreateFlag> flag) throws AccessControlException,
    DSQuotaExceededException, FileNotFoundException, SafeModeException,
    UnresolvedLinkException, IOException {
  final AppendRequestProto.Builder request = AppendRequestProto.newBuilder();
  request.setSrc(src);
  request.setClientName(clientName);
  request.setFlag(PBHelper.convertCreateFlag(flag));
  final AppendRequestProto req = request.build();

  try {
    final AppendResponseProto res = rpcProxy.append(null, req);

    LocatedBlock lastBlock = null;
    if (res.hasBlock()) {
      lastBlock = PBHelper.convert(res.getBlock());
    }
    HdfsFileStatus stat = null;
    if (res.hasStat()) {
      stat = PBHelper.convert(res.getStat());
    }
    return new LastBlockWithStatus(lastBlock, stat);
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
@Override void prepare() throws Exception { final Path filePath = new Path(file); DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0); // append to the file and leave the last block under construction out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND), null, null); byte[] appendContent = new byte[100]; new Random().nextBytes(appendContent); out.write(appendContent); ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); LocatedBlocks blks = dfs.getClient() .getLocatedBlocks(file, BlockSize + 1); assertEquals(1, blks.getLocatedBlocks().size()); nodes = blks.get(0).getLocations(); oldBlock = blks.get(0).getBlock(); LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline( oldBlock, client.getClientName()); newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(), oldBlock.getNumBytes(), newLbk.getBlock().getGenerationStamp()); }
@Test(timeout = 180000) public void testFavoredNodesEndToEndForAppend() throws Exception { // create 10 files with random preferred nodes for (int i = 0; i < NUM_FILES; i++) { Random rand = new Random(System.currentTimeMillis() + i); // pass a new created rand so as to get a uniform distribution each time // without too much collisions (look at the do-while loop in getDatanodes) InetSocketAddress datanode[] = getDatanodes(rand); Path p = new Path("/filename" + i); // create and close the file. dfs.create(p, FsPermission.getDefault(), true, 4096, (short) 3, 4096L, null, null).close(); // re-open for append FSDataOutputStream out = dfs.append(p, EnumSet.of(CreateFlag.APPEND), 4096, null, datanode); out.write(SOME_BYTES); out.close(); BlockLocation[] locations = getBlockLocations(p); // verify the files got created in the right nodes for (BlockLocation loc : locations) { String[] hosts = loc.getNames(); String[] hosts1 = getStringForInetSocketAddrs(datanode); assertTrue(compareNodes(hosts, hosts1)); } } }
/** * Do file create. */ @Override long executeOp(int daemonId, int inputIdx, String clientName) throws IOException { long start = Time.now(); // dummyActionNoSynch(fileIdx); nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(), clientName, new EnumSetWritable<CreateFlag>(EnumSet .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE, null); long end = Time.now(); for(boolean written = !closeUponCreate; !written; written = nameNodeProto.complete(fileNames[daemonId][inputIdx], clientName, null, INodeId.GRANDFATHER_INODE_ID)); return end-start; }
/** * Test for create file */ @Test public void testCreate() throws Exception { String src = "/testNamenodeRetryCache/testCreate/file"; // Two retried calls succeed newCall(); HdfsFileStatus status = nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null); Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null)); Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null)); // A non-retried call fails newCall(); try { nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null); Assert.fail("testCreate - expected exception is not thrown"); } catch (IOException e) { // expected } }
@Test public void testAddBlockRetryShouldReturnBlockWithLocations() throws Exception { final String src = "/testAddBlockRetryShouldReturnBlockWithLocations"; NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc(); // create file nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 3, 1024, null); // start first addBlock() LOG.info("Starting first addBlock for " + src); LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null); assertTrue("Block locations should be present", lb1.getLocations().length > 0); cluster.restartNameNode(); nameNodeRpc = cluster.getNameNodeRpc(); LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null); assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock()); assertTrue("Wrong locations with retry", lb2.getLocations().length > 0); }
private void testPlacement(String clientMachine, String clientRack) throws IOException { // write 5 files and check whether all times block placed for (int i = 0; i < 5; i++) { String src = "/test-" + i; // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null, null, fileStatus.getFileId(), null); assertEquals("Block should be allocated sufficient locations", REPLICATION_FACTOR, locatedBlock.getLocations().length); if (clientRack != null) { assertEquals("First datanode should be rack local", clientRack, locatedBlock.getLocations()[0].getNetworkLocation()); } nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(), src, clientMachine); } }
/**
 * Append to lazy persist file is denied.
 *
 * <p>The test passes only if opening the file for append (or closing the
 * resulting stream) throws; any throwable from that call is logged and
 * treated as the expected denial.
 *
 * @throws IOException if cluster start-up or test file creation fails
 */
@Test
public void testAppendIsDenied() throws IOException {
  startUpCluster(true, -1);
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path = new Path("/" + METHOD_NAME + ".dat");

  makeTestFile(path, BLOCK_SIZE, true);

  boolean appendSucceeded = false;
  try {
    client.append(path.toString(), BUFFER_LENGTH,
        EnumSet.of(CreateFlag.APPEND), null, null).close();
    appendSucceeded = true;
  } catch (Throwable t) {
    LOG.info("Got expected exception ", t);
  }
  // fail() must run outside the try block: it signals failure by throwing
  // an AssertionError, which the catch (Throwable) above would otherwise
  // swallow, making the test pass unconditionally.
  if (appendSucceeded) {
    fail("Append to LazyPersist file did not fail as expected");
  }
}
/**
 * Stubs {@code mcp.create(...)} so that any invocation returns a fixed
 * {@link HdfsFileStatus} whose {@link FileEncryptionInfo} is built from the
 * given cipher suite and protocol version, with zero-filled byte arrays of
 * the suite's algorithm block size and the names "fakeKey"/"fakeVersion".
 *
 * @param mcp the mocked client protocol to stub
 * @param suite cipher suite placed in the returned encryption info
 * @param version crypto protocol version placed in the encryption info
 * @throws Exception declared because the stubbed {@code create} call does
 */
@SuppressWarnings("unchecked")
private static void mockCreate(ClientProtocol mcp,
    CipherSuite suite, CryptoProtocolVersion version) throws Exception {
  Mockito.doReturn(
      new HdfsFileStatus(0, false, 1, 1024, 0, 0,
          // Octal literal: a decimal 777 is 01411 in octal, which is not
          // the rwxrwxrwx mode the digits were meant to express.
          new FsPermission((short) 0777),
          "owner", "group", new byte[0], new byte[0],
          1010, 0, new FileEncryptionInfo(suite,
          version, new byte[suite.getAlgorithmBlockSize()],
          new byte[suite.getAlgorithmBlockSize()],
          "fakeKey", "fakeVersion"),
          (byte) 0))
      .when(mcp)
      .create(anyString(), (FsPermission) anyObject(), anyString(),
          (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
          anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject());
}
@Override @SuppressWarnings("deprecation") public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { // Check if file should be appended or overwritten. Assume that the file // is overwritten on if the CREATE and OVERWRITE create flags are set. Note // that any other combinations of create flags will result in an open new or // open with append. final EnumSet<CreateFlag> createflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); boolean overwrite = flags.containsAll(createflags); // Delegate the create non-recursive call. return this.createNonRecursive(f, permission, overwrite, bufferSize, replication, blockSize, progress); }
FSDataOutputStream create(PathData item, boolean lazyPersist) throws IOException { try { if (lazyPersist) { EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST); return create(item.path, FsPermission.getFileDefault().applyUMask( FsPermission.getUMask(getConf())), createFlags, getConf().getInt("io.file.buffer.size", 4096), lazyPersist ? 1 : getDefaultReplication(item.path), getDefaultBlockSize(), null, null); } else { return create(item.path, true); } } finally { // might have been created but stream was interrupted deleteOnExit(item.path); } }
/**
 * Builds and starts an append stream for {@code src} inside a
 * "newStreamForAppend" trace scope.
 *
 * @throws IOException if the file status carries an erasure coding policy
 *         (appending to such files is rejected), or if constructing the
 *         stream fails
 */
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
    EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
    HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
    throws IOException {
  final boolean striped = stat.getErasureCodingPolicy() != null;
  if (striped) {
    throw new IOException(
        "Not support appending to a striping layout file yet.");
  }

  try (TraceScope ignored =
           dfsClient.newPathTraceScope("newStreamForAppend", src)) {
    final DFSOutputStream appendStream = new DFSOutputStream(
        dfsClient, src, flags, progress, lastBlock, stat, checksum,
        favoredNodes);
    appendStream.start();
    return appendStream;
  }
}
/**
 * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
 * Progressable, int, ChecksumOpt)} with the addition of favoredNodes, a hint
 * to where the namenode should place the file blocks.
 *
 * <p>The favored nodes hint is not persisted in HDFS, hence it may be honored
 * at creation time only. HDFS could move the blocks away from the favored
 * nodes during balancing or replication. A value of null means no favored
 * nodes for this create.
 */
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
  checkOpen();

  final FsPermission masked = applyUMask(permission);
  LOG.debug("{}: masked={}", src, masked);

  final DFSOutputStream created = DFSOutputStream.newStreamForCreate(
      this,
      src,
      masked,
      flag,
      createParent,
      replication,
      blockSize,
      progress,
      dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes));
  beginFileLease(created.getFileId(), created);
  return created;
}
/** * Append to an existing file if {@link CreateFlag#APPEND} is present */ private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag, Progressable progress) throws IOException { if (flag.contains(CreateFlag.APPEND)) { HdfsFileStatus stat = getFileInfo(src); if (stat == null) { // No file to append to // New file needs to be created if create option is present if (!flag.contains(CreateFlag.CREATE)) { throw new FileNotFoundException( "failed to append to non-existent file " + src + " on client " + clientName); } return null; } return callAppend(src, flag, progress, null); } return null; }
/**
 * Same as {@link #create(String, FsPermission, EnumSet, short, long,
 * Progressable, int, ChecksumOpt)} except that the permission is absolute
 * (i.e. it has already been masked with the umask).
 *
 * <p>After validating the flags, an append is attempted first; a new stream
 * is created only when the append attempt yields no stream. A file lease is
 * begun for whichever stream is returned.
 */
public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt) throws IOException {
  checkOpen();
  CreateFlag.validate(flag);

  final DFSOutputStream appended = primitiveAppend(src, flag, progress);
  final DFSOutputStream stream;
  if (appended != null) {
    stream = appended;
  } else {
    final DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
    stream = DFSOutputStream.newStreamForCreate(this, src, absPermission,
        flag, createParent, replication, blockSize, progress, checksum,
        null);
  }

  beginFileLease(stream.getFileId(), stream);
  return stream;
}
/**
 * Decodes an int flag value into a set of create flags. A flag is included
 * when all bits of its protobuf value are set in {@code flag}; the flags
 * checked are APPEND, CREATE, OVERWRITE, LAZY_PERSIST and NEW_BLOCK.
 *
 * @param flag the encoded flag value
 * @return a writable wrapper around the decoded flag set
 */
public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) {
  // Parallel tables: wireValues[i] selects flags[i].
  final int[] wireValues = {
      CreateFlagProto.APPEND_VALUE,
      CreateFlagProto.CREATE_VALUE,
      CreateFlagProto.OVERWRITE_VALUE,
      CreateFlagProto.LAZY_PERSIST_VALUE,
      CreateFlagProto.NEW_BLOCK_VALUE,
  };
  final CreateFlag[] flags = {
      CreateFlag.APPEND,
      CreateFlag.CREATE,
      CreateFlag.OVERWRITE,
      CreateFlag.LAZY_PERSIST,
      CreateFlag.NEW_BLOCK,
  };

  final EnumSet<CreateFlag> decoded = EnumSet.noneOf(CreateFlag.class);
  for (int i = 0; i < wireValues.length; i++) {
    final int mask = wireValues[i];
    if ((flag & mask) == mask) {
      decoded.add(flags[i]);
    }
  }
  return new EnumSetWritable<>(decoded, CreateFlag.class);
}
/**
 * Client-side translation of a create call: packs the arguments into a
 * {@code CreateRequestProto}, sends it over the RPC proxy, and converts the
 * file status in the response.
 *
 * @return the converted file status, or {@code null} when the response does
 *         not carry one
 * @throws IOException the remote exception extracted from a
 *         {@link ServiceException}
 */
@Override
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions) throws IOException {
  final CreateRequestProto.Builder request = CreateRequestProto.newBuilder();
  request.setSrc(src);
  request.setMasked(PBHelperClient.convert(masked));
  request.setClientName(clientName);
  request.setCreateFlag(PBHelperClient.convertCreateFlag(flag));
  request.setCreateParent(createParent);
  request.setReplication(replication);
  request.setBlockSize(blockSize);
  request.addAllCryptoProtocolVersion(
      PBHelperClient.convert(supportedVersions));
  final CreateRequestProto req = request.build();

  try {
    final CreateResponseProto res = rpcProxy.create(null, req);
    if (!res.hasFs()) {
      return null;
    }
    return PBHelperClient.convert(res.getFs());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Client-side translation of an append call: builds the request from the
 * source path, client name and encoded flags, sends it over the RPC proxy,
 * and converts the optional block and status fields of the response.
 *
 * @return the last block and file status; each is {@code null} when the
 *         response does not carry it
 * @throws IOException the remote exception extracted from a
 *         {@link ServiceException}
 */
@Override
public LastBlockWithStatus append(String src, String clientName,
    EnumSetWritable<CreateFlag> flag) throws IOException {
  final AppendRequestProto.Builder request = AppendRequestProto.newBuilder();
  request.setSrc(src);
  request.setClientName(clientName);
  request.setFlag(PBHelperClient.convertCreateFlag(flag));
  final AppendRequestProto req = request.build();

  try {
    final AppendResponseProto res = rpcProxy.append(null, req);

    LocatedBlock lastBlock = null;
    if (res.hasBlock()) {
      lastBlock = PBHelperClient.convertLocatedBlockProto(res.getBlock());
    }
    HdfsFileStatus stat = null;
    if (res.hasStat()) {
      stat = PBHelperClient.convert(res.getStat());
    }
    return new LastBlockWithStatus(lastBlock, stat);
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Server-side translation of an append request: decodes the flags (falling
 * back to just APPEND when the request carries none), invokes the server,
 * and copies the last block and file status into the response when they are
 * non-null.
 *
 * @throws ServiceException wrapping any {@link IOException} from the server
 */
@Override
public AppendResponseProto append(RpcController controller,
    AppendRequestProto req) throws ServiceException {
  try {
    final EnumSetWritable<CreateFlag> flags;
    if (req.hasFlag()) {
      flags = PBHelperClient.convertCreateFlag(req.getFlag());
    } else {
      flags = new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));
    }

    final LastBlockWithStatus appended =
        server.append(req.getSrc(), req.getClientName(), flags);

    final AppendResponseProto.Builder response =
        AppendResponseProto.newBuilder();
    if (appended.getLastBlock() != null) {
      response.setBlock(
          PBHelperClient.convertLocatedBlock(appended.getLastBlock()));
    }
    if (appended.getFileStatus() != null) {
      response.setStat(PBHelperClient.convert(appended.getFileStatus()));
    }
    return response.build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}