@Override void writePacket(DataTransferPacket packet) throws IOException { if (!writtenHeader) { writeChecksumHeader(checksum); writtenHeader = true; } boolean forceSync = packet.isForceSync(); int dataLength = packet.dataLength; if (dataLength <= 0) { LOG.warn("NTar: writePacket: Receiving empty packet:" + packet + " for block " + block); } else { dataLength = (int) Math.min(dataLength, this.length - this.writtenLength); setBlockPosition(offsetInBlock); // adjust file position offsetInBlock += dataLength; this.writtenLength += dataLength; int checksumLength = (dataLength + bytesPerChecksum -1)/bytesPerChecksum * checksumSize; byte[] pktBuf = new byte[checksumLength + dataLength]; Arrays.fill(pktBuf, (byte)0); System.arraycopy(packet.buffer, 0, pktBuf, checksumLength, dataLength); ChecksumUtil.updateChunkChecksum(pktBuf, 0, checksumLength, dataLength, checksum); try { if (!finalized) { long writeStartTime = System.currentTimeMillis(); //finally write to the disk : out.write(pktBuf, checksumLength, dataLength); // If this is a partial chunk, then verify that this is the only // chunk in the packet. Calculate new crc for this chunk. if (partialCrc != null) { if (dataLength > bytesPerChecksum) { throw new IOException("Got wrong length during mergeBlock(" + block + ") from " + childAddrs + " " + "A packet can have only one partial chunk."+ " len = " + dataLength + " bytesPerChecksum " + bytesPerChecksum); } partialCrc.update(pktBuf, checksumLength, dataLength); byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); checksumOut.write(buf); LOG.debug("Writing out partial crc for data len " + dataLength); partialCrc = null; } else { checksumOut.write(pktBuf, 0, checksumLength); } datanode.myMetrics.bytesWritten.inc(dataLength); flush(forceSync); this.replicaBeingWritten.setBytesOnDisk(offsetInBlock); // Record time taken to write packet long writePacketDuration = System.currentTimeMillis() - writeStartTime; datanode.myMetrics.writePacketLatency.inc(writePacketDuration); } } catch (ClosedByInterruptException cix) { LOG.warn( "NTar: Thread interrupted when flushing bytes to disk." + "Might cause inconsistent sates", cix); throw cix; } catch (InterruptedIOException iix) { LOG.warn( "NTar: InterruptedIOException when flushing bytes to disk." + "Might cause inconsistent sates", iix); throw iix; } catch (IOException iex) { datanode.checkDiskError(iex); throw iex; } } }