Java 类org.apache.hadoop.fs.FSOutputSummer 实例源码

项目:RDFS    文件:BlockXCodingMerger.java   
/**
 * Writes the data carried by one packet to the block output stream and the
 * corresponding checksum bytes to the checksum output stream.
 *
 * <p>The checksum header is emitted once, before the first packet is handled.
 * A packet whose {@code dataLength} is not positive is logged and otherwise
 * ignored. The number of bytes taken from the packet is capped at
 * {@code this.length - this.writtenLength}. {@code offsetInBlock} and
 * {@code writtenLength} are advanced before the write is attempted, and also
 * when {@code finalized} is set, in which case nothing is written.
 *
 * @param packet packet whose {@code buffer} holds the data bytes starting at
 *               index 0 and whose {@code dataLength} gives their count
 * @throws IOException if a pending partial chunk is followed by a packet
 *         longer than {@code bytesPerChecksum}, or if writing or flushing
 *         fails; interruption-related exceptions are logged and rethrown
 *         as-is, any other {@code IOException} is first handed to
 *         {@code datanode.checkDiskError}
 */
@Override
void writePacket(DataTransferPacket packet) throws IOException {

  if (!writtenHeader) {
    writeChecksumHeader(checksum);
    writtenHeader = true;
  }

  boolean forceSync = packet.isForceSync();
  int dataLength = packet.dataLength;

  if (dataLength <= 0) {
    LOG.warn("NTar: writePacket: Receiving empty packet:" + 
            packet + " for block " + block);
    return;
  }

  // Never accept more bytes than are still expected for this block.
  // NOTE(review): if writtenLength already equals length this yields 0 and the
  // code below still runs with an empty payload - confirm that is intended.
  dataLength = (int) Math.min(dataLength, this.length - this.writtenLength);
  setBlockPosition(offsetInBlock);  // adjust file position
  offsetInBlock += dataLength;
  this.writtenLength += dataLength;

  // One checksum of checksumSize bytes per (possibly partial) chunk.
  int checksumLength = (dataLength + bytesPerChecksum - 1) / bytesPerChecksum
          * checksumSize;

  // Buffer layout: [checksum bytes | data bytes]. A new array is already
  // zero-initialised by the JVM, so no explicit fill is needed.
  byte[] pktBuf = new byte[checksumLength + dataLength];
  System.arraycopy(packet.buffer, 0, pktBuf, checksumLength, dataLength);
  // Presumably fills pktBuf[0, checksumLength) with the checksums of the data
  // region - verify against ChecksumUtil.
  ChecksumUtil.updateChunkChecksum(pktBuf, 0,
          checksumLength, dataLength, checksum);

  try {
    if (!finalized) {
      // nanoTime is monotonic, so the measured latency cannot go negative or
      // jump when the wall clock is adjusted during the write.
      long writeStartNanos = System.nanoTime();
      //finally write to the disk :
      out.write(pktBuf, checksumLength, dataLength);

      // If this is a partial chunk, then verify that this is the only
      // chunk in the packet. Calculate new crc for this chunk.
      if (partialCrc != null) {
        if (dataLength > bytesPerChecksum) {
          throw new IOException("Got wrong length during mergeBlock(" + 
                                block + ") from " + childAddrs + " " +
                                "A packet can have only one partial chunk."+
                                " len = " + dataLength + 
                                " bytesPerChecksum " + bytesPerChecksum);
        }
        partialCrc.update(pktBuf, checksumLength, dataLength);
        byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
        checksumOut.write(buf);
        LOG.debug("Writing out partial crc for data len " + dataLength);
        partialCrc = null;
      } else {
        checksumOut.write(pktBuf, 0, checksumLength);
      }
      datanode.myMetrics.bytesWritten.inc(dataLength);

      flush(forceSync);

      this.replicaBeingWritten.setBytesOnDisk(offsetInBlock);
      // Record time taken to write packet (milliseconds)
      long writePacketDuration = (System.nanoTime() - writeStartNanos) / 1000000L;
      datanode.myMetrics.writePacketLatency.inc(writePacketDuration);
    }
  } catch (ClosedByInterruptException cix) {
    // Caught before the generic IOException handler so that an interruption
    // is not passed to checkDiskError.
    LOG.warn( "NTar: Thread interrupted when flushing bytes to disk."
            + "Might cause inconsistent sates", cix);
    throw cix;
  } catch (InterruptedIOException iix) {
    LOG.warn(
        "NTar: InterruptedIOException when flushing bytes to disk."
            + "Might cause inconsistent sates", iix);
    throw iix;
  } catch (IOException iex) {
    datanode.checkDiskError(iex);
    throw iex;
  }
}