Java class org.apache.hadoop.hdfs.util.DataTransferThrottler — usage examples (source code)
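The snippets below, collected from open-source Hadoop-family projects, show how org.apache.hadoop.hdfs.util.DataTransferThrottler is used. The class enforces a bandwidth cap: construct it with a rate in bytes per second, then call throttle(numBytes) after each chunk is transferred, and it sleeps just long enough to keep the running rate at or below the cap. A minimal sketch, not taken from any project below (`in` and `out` stand for any open streams):

DataTransferThrottler throttler = new DataTransferThrottler(1024 * 1024); // 1 MB/s cap
byte[] buf = new byte[4096];
int n;
while ((n = in.read(buf)) > 0) {
  out.write(buf, 0, n);
  throttler.throttle(n); // blocks when we are ahead of the byte budget
}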

Project: hadoop    File: TransferFsImage.java (identical copies in aliyun-oss-hadoop-fs, big-c, hadoop-2.6.0-cdh5.4.3, FlexMap, and hadoop-on-lustre2)
static MD5Hash handleUploadImageRequest(HttpServletRequest request,
    long imageTxId, Storage dstStorage, InputStream stream,
    long advertisedSize, DataTransferThrottler throttler) throws IOException {

  String fileName = NNStorage.getCheckpointImageFileName(imageTxId);

  List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
  if (dstFiles.isEmpty()) {
    throw new IOException("No targets in destination storage!");
  }

  MD5Hash advertisedDigest = parseMD5Header(request);
  MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
      advertisedSize, advertisedDigest, fileName, stream, throttler);
  LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
      + dstFiles.get(0).length() + " bytes.");
  return hash;
}
Project: hadoop    File: TestBlockReplacement.java (the same test appears in aliyun-oss-hadoop-fs and big-c; fork variants are noted after the listing)
@Test
public void testThrottler() throws IOException {
  Configuration conf = new HdfsConfiguration();
  FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
  long bandwidthPerSec = 1024*1024L;
  final long TOTAL_BYTES = 6 * bandwidthPerSec;
  long bytesToSend = TOTAL_BYTES;
  long start = Time.monotonicNow();
  DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec);
  long bytesSent = 1024*512L; // 0.5MB
  throttler.throttle(bytesSent);
  bytesToSend -= bytesSent;
  bytesSent = 1024*768L; // 0.75MB
  throttler.throttle(bytesSent);
  bytesToSend -= bytesSent;
  try {
    Thread.sleep(1000);
  } catch (InterruptedException ignored) {}
  throttler.throttle(bytesToSend);
  long end = Time.monotonicNow();
  // Upstream copies assert on an unused `totalBytes` variable (always zero),
  // which makes the check vacuous; asserting on TOTAL_BYTES exercises the
  // intended bound: the average rate must not exceed the configured limit.
  assertTrue(TOTAL_BYTES * 1000 / (end - start) <= bandwidthPerSec);
}
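Pushing all six megabytes through a 1 MB/s throttler should take roughly six seconds, so the assertion above only holds if throttle() actually blocked. The explicit one-second sleep in the middle verifies that idle time is absorbed into the bandwidth budget rather than confusing the accounting.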
Near-identical copies of testThrottler also appear in hadoop-2.6.0-cdh5.4.3, hadoop-plus, hops, FlexMap, hadoop-TCP, hardfs, and hadoop-on-lustre2 (with Time.now() in place of Time.monotonicNow()), and in cumulus and RDFS (with Util.now() and without the @Test annotation; the RDFS copy also builds a plain Configuration instead of an HdfsConfiguration).
Project: RDFS    File: BlockXCodingMerger.java
public BlockXCodingMerger(Block block, int namespaceId,
        DataInputStream[] childInputStreams, long offsetInBlock,
        long length, String[] childAddrs, String myAddr,
        DataTransferThrottler throttler,
        int mergerLevel) throws IOException {
    super();
    this.block = block;
    this.namespaceId = namespaceId;
    this.childInputStreams = childInputStreams;
    this.offsetInBlock = offsetInBlock;
    this.length = length;
    this.childAddrs = childAddrs;
    this.myAddr = myAddr;
    this.throttler = throttler;
    this.mergerLevel = mergerLevel;
    Configuration conf = new Configuration();
    this.packetSize = conf.getInt("raid.blockreconstruct.packetsize", 4096);
    this.bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
    this.checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
            bytesPerChecksum, new PureJavaCrc32());
    this.checksumSize = checksum.getChecksumSize();
}
Project: RDFS    File: FSImage.java
/**
 * Constructor
 * @param conf Configuration
 */
FSImage(Configuration conf) throws IOException {
  this();
  setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
      FSImage.getCheckpointEditsDirs(conf, null));
  long transferBandwidth = conf.getLong(
      HdfsConstants.DFS_IMAGE_TRANSFER_RATE_KEY,
      HdfsConstants.DFS_IMAGE_TRANSFER_RATE_DEFAULT);

  if (transferBandwidth > 0) {
    this.imageTransferThrottler = new DataTransferThrottler(transferBandwidth);
  }
}
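The constructor above only creates a throttler when a positive rate is configured. A hedged sketch of enabling it, assuming HdfsConstants.DFS_IMAGE_TRANSFER_RATE_KEY resolves to the usual "dfs.image.transfer.bandwidthPerSec" name (zero or less, the default, leaves image transfers unthrottled):

Configuration conf = new Configuration();
// Hypothetical value: cap checkpoint image transfers at 10 MB/s.
conf.setLong("dfs.image.transfer.bandwidthPerSec", 10L * 1024 * 1024);
// An FSImage built from this conf now routes image transfers through a
// DataTransferThrottler at the configured rate.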
Project: RDFS    File: BlockXCodingMerger.java
public InternalBlockXCodingMerger(Block block, int namespaceId,
        DataInputStream[] childInputStreams, long offsetInBlock,
        long length, String[] childAddrs, String myAddr,
        DataTransferThrottler throttler,
        int mergerLevel, String parentAddr,
        DataOutputStream parentOut) throws IOException {
    super(block, namespaceId, childInputStreams, offsetInBlock, length,
            childAddrs, myAddr, throttler,
            mergerLevel);
    this.parentAddr = parentAddr;
    this.parentOut = parentOut;
}
Project: RDFS    File: BlockXCodingMerger.java
public BufferBlockXCodingMerger(Block block, int namespaceId,
        DataInputStream[] childInputStreams, long offsetInBlock,
        long length, String[] childAddrs, String myAddr,
        DataTransferThrottler throttler,int mergerLevel,
        byte[] buffer, int offsetInBuffer) throws IOException {
    super(block, namespaceId, childInputStreams, offsetInBlock, length,
            childAddrs, myAddr, throttler, mergerLevel);
    this.buffer = buffer;
    this.offsetInBuffer = offsetInBuffer;
    this.currentOffsetInBlock = offsetInBlock;
}
Project: hadoop    File: TransferFsImage.java (this overload is identical in big-c, hadoop-2.6.0-cdh5.4.3, and FlexMap)
/**
 * A server-side method to respond to a getfile HTTP request.
 * Copies the contents of the local file into the output stream.
 */
public static void copyFileToStream(OutputStream out, File localfile,
    FileInputStream infile, DataTransferThrottler throttler)
  throws IOException {
  copyFileToStream(out, localfile, infile, throttler, null);
}
Project: hadoop    File: TransferFsImage.java (identical in big-c and hadoop-2.6.0-cdh5.4.3)
private static void copyFileToStream(OutputStream out, File localfile,
    FileInputStream infile, DataTransferThrottler throttler,
    Canceler canceler) throws IOException {
  byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
  try {
    CheckpointFaultInjector.getInstance()
        .aboutToSendFile(localfile);

    if (CheckpointFaultInjector.getInstance().
          shouldSendShortFile(localfile)) {
        // Test sending image shorter than localfile
        long len = localfile.length();
        buf = new byte[(int)Math.min(len/2, HdfsConstants.IO_FILE_BUFFER_SIZE)];
        // This will read at most half of the image
        // and the rest of the image will be sent over the wire
        infile.read(buf);
    }
    int num = 1;
    while (num > 0) {
      if (canceler != null && canceler.isCancelled()) {
        throw new SaveNamespaceCancelledException(
          canceler.getCancellationReason());
      }
      num = infile.read(buf);
      if (num <= 0) {
        break;
      }
      if (CheckpointFaultInjector.getInstance()
            .shouldCorruptAByte(localfile)) {
        // Simulate a corrupted byte on the wire
        LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!");
        buf[0]++;
      }

      out.write(buf, 0, num);
      if (throttler != null) {
        throttler.throttle(num, canceler);
      }
    }
  } catch (EofException e) {
    LOG.info("Connection closed by client");
    out = null; // so we don't close in the finally
  } finally {
    if (out != null) {
      out.close();
    }
  }
}
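Note the two-argument throttle(num, canceler) call: unlike the single-argument overload used in the test above, it lets a throttled transfer notice cancellation instead of sleeping out its full delay, which is why the Canceler is threaded through alongside the stream.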
Project: hadoop    File: BlockSender.java (identical in big-c)
private long doSendBlock(DataOutputStream out, OutputStream baseStream,
      DataTransferThrottler throttler) throws IOException {
  if (out == null) {
    throw new IOException( "out stream is null" );
  }
  initialOffset = offset;
  long totalRead = 0;
  OutputStream streamForSendChunks = out;

  lastCacheDropOffset = initialOffset;

  if (isLongRead() && blockInFd != null) {
    // Advise that this file descriptor will be accessed sequentially.
    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
        block.getBlockName(), blockInFd, 0, 0,
        NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
  }

  // Trigger readahead of beginning of file if configured.
  manageOsCache();

  final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
  try {
    int maxChunksPerPacket;
    int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
    boolean transferTo = transferToAllowed && !verifyChecksum
        && baseStream instanceof SocketOutputStream
        && blockIn instanceof FileInputStream;
    if (transferTo) {
      FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
      blockInPosition = fileChannel.position();
      streamForSendChunks = baseStream;
      maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);

      // Smaller packet size to only hold checksum when doing transferTo
      pktBufSize += checksumSize * maxChunksPerPacket;
    } else {
      maxChunksPerPacket = Math.max(1,
          numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
      // Packet size includes both checksum and data
      pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
    }

    ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);

    while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
      manageOsCache();
      long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
          transferTo, throttler);
      offset += len;
      totalRead += len + (numberOfChunks(len) * checksumSize);
      seqno++;
    }
    // If this thread was interrupted, then it did not send the full block.
    if (!Thread.currentThread().isInterrupted()) {
      try {
        // send an empty packet to mark the end of the block
        sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
            throttler);
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }

      sentEntireByteRange = true;
    }
  } finally {
    if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
      final long endTime = System.nanoTime();
      ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
          initialOffset, endTime - startTime));
    }
    close();
  }
  return totalRead;
}
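Throttling here is per packet: the throttler is handed to each sendPacket() call, so the pacing granularity is the packet size computed above rather than individual disk reads.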
aliyun-oss-hadoop-fs carries the same copyFileToStream overloads and BlockSender.doSendBlock, differing only in statically importing IO_FILE_BUFFER_SIZE and POSIX_FADV_SEQUENTIAL rather than qualifying them through HdfsConstants and NativeIO.POSIX.
Project: aliyun-oss-hadoop-fs    File: GetJournalEditServlet.java (identical in big-c and hadoop-2.6.0-cdh5.4.3)
@Override
public void doGet(final HttpServletRequest request,
    final HttpServletResponse response) throws ServletException, IOException {
  FileInputStream editFileIn = null;
  try {
    final ServletContext context = getServletContext();
    final Configuration conf = (Configuration) getServletContext()
        .getAttribute(JspHelper.CURRENT_CONF);
    final String journalId = request.getParameter(JOURNAL_ID_PARAM);
    QuorumJournalManager.checkJournalId(journalId);
    final JNStorage storage = JournalNodeHttpServer
        .getJournalFromContext(context, journalId).getStorage();

    // Check security
    if (!checkRequestorOrSendError(conf, request, response)) {
      return;
    }

    // Check that the namespace info is correct
    if (!checkStorageInfoOrSendError(storage, request, response)) {
      return;
    }

    long segmentTxId = ServletUtil.parseLongParam(request,
        SEGMENT_TXID_PARAM);

    FileJournalManager fjm = storage.getJournalManager();
    File editFile;

    synchronized (fjm) {
      // Synchronize on the FJM so that the file doesn't get finalized
      // out from underneath us while we're in the process of opening
      // it up.
      EditLogFile elf = fjm.getLogFile(
          segmentTxId);
      if (elf == null) {
        response.sendError(HttpServletResponse.SC_NOT_FOUND,
            "No edit log found starting at txid " + segmentTxId);
        return;
      }
      editFile = elf.getFile();
      ImageServlet.setVerificationHeadersForGet(response, editFile);
      ImageServlet.setFileNameHeaders(response, editFile);
      editFileIn = new FileInputStream(editFile);
    }

    DataTransferThrottler throttler = ImageServlet.getThrottler(conf);

    // send edits
    TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
        editFileIn, throttler);

  } catch (Throwable t) {
    String errMsg = "getedit failed. " + StringUtils.stringifyException(t);
    response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg);
    throw new IOException(errMsg);
  } finally {
    IOUtils.closeStream(editFileIn);
  }
}
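ImageServlet.getThrottler(conf) follows the same pattern as the FSImage constructor above: it returns a DataTransferThrottler built from the configured image-transfer bandwidth, or null when no positive rate is set, which is why copyFileToStream guards every throttle() call with a null check.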
Project: hadoop-plus    File: TransferFsImage.java
/**
 * A server-side method to respond to a getfile HTTP request.
 * Copies the contents of the local file into the output stream.
 */
public static void getFileServer(ServletResponse response, File localfile,
    FileInputStream infile,
    DataTransferThrottler throttler) 
  throws IOException {
  byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
  ServletOutputStream out = null;
  try {
    CheckpointFaultInjector.getInstance()
        .aboutToSendFile(localfile);
    out = response.getOutputStream();

    if (CheckpointFaultInjector.getInstance().
          shouldSendShortFile(localfile)) {
        // Test sending image shorter than localfile
        long len = localfile.length();
        buf = new byte[(int)Math.min(len/2, HdfsConstants.IO_FILE_BUFFER_SIZE)];
        // This will read at most half of the image
        // and the rest of the image will be sent over the wire
        infile.read(buf);
    }
    int num = 1;
    while (num > 0) {
      num = infile.read(buf);
      if (num <= 0) {
        break;
      }
      if (CheckpointFaultInjector.getInstance()
            .shouldCorruptAByte(localfile)) {
        // Simulate a corrupted byte on the wire
        LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!");
        buf[0]++;
      }

      out.write(buf, 0, num);
      if (throttler != null) {
        throttler.throttle(num);
      }
    }
  } finally {
    if (out != null) {
      out.close();
    }
  }
}
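This is the older, pre-Canceler form of the image-serving path: the same read-write loop as copyFileToStream above, but writing to the ServletResponse directly and calling the single-argument throttle(num), with no way to abandon a transfer midway.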
Project: hadoop-plus    File: BlockReceiver.java
void receiveBlock(
    DataOutputStream mirrOut, // output to next datanode
    DataInputStream mirrIn,   // input from next datanode
    DataOutputStream replyOut,  // output to previous datanode
    String mirrAddr, DataTransferThrottler throttlerArg,
    DatanodeInfo[] downstreams) throws IOException {

  syncOnClose = datanode.getDnConf().syncOnClose;
  boolean responderClosed = false;
  mirrorOut = mirrOut;
  mirrorAddr = mirrAddr;
  throttler = throttlerArg;

  try {
    if (isClient && !isTransfer) {
      responder = new Daemon(datanode.threadGroup, 
          new PacketResponder(replyOut, mirrIn, downstreams));
      responder.start(); // start thread to process responses
    }

    /* 
     * Receive until the last packet.
     */
    while (receivePacket() >= 0) {}

    // wait for all outstanding packet responses. And then
    // indicate responder to gracefully shutdown.
    // Mark that responder has been closed for future processing
    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).close();
      responderClosed = true;
    }

    // If this write is for a replication or transfer-RBW/Finalized,
    // then finalize block or convert temporary to RBW.
    // For client-writes, the block is finalized in the PacketResponder.
    if (isDatanode || isTransfer) {
      // close the block/crc files
      close();
      block.setNumBytes(replicaInfo.getNumBytes());

      if (stage == BlockConstructionStage.TRANSFER_RBW) {
        // for TRANSFER_RBW, convert temporary to RBW
        datanode.data.convertTemporaryToRbw(block);
      } else {
        // for isDatanode or TRANSFER_FINALIZED
        // Finalize the block.
        datanode.data.finalizeBlock(block);
      }
      datanode.metrics.incrBlocksWritten();
    }

  } catch (IOException ioe) {
    LOG.info("Exception for " + block, ioe);
    throw ioe;
  } finally {
    if (!responderClosed) { // Abnormal termination of the flow above
      IOUtils.closeStream(this);
      if (responder != null) {
        responder.interrupt();
      }
      cleanupBlock();
    }
    if (responder != null) {
      try {
        responder.join(datanode.getDnConf().getXceiverStopTimeout());
        if (responder.isAlive()) {
          String msg = "Join on responder thread " + responder
              + " timed out";
          LOG.warn(msg + "\n" + StringUtils.getStackTrace(responder));
          throw new IOException(msg);
        }
      } catch (InterruptedException e) {
        responder.interrupt();
        throw new IOException("Interrupted receiveBlock");
      }
      responder = null;
    }
  }
}
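Here the throttler is simply stashed in a field; the actual throttle() calls happen inside receivePacket() (not shown), pacing replication and transfer writes as packets arrive.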
Project: hadoop-plus    File: GetJournalEditServlet.java (an older variant of the aliyun-oss-hadoop-fs servlet above, using GetImageServlet helpers and TransferFsImage.getFileServer)
@Override
public void doGet(final HttpServletRequest request,
    final HttpServletResponse response) throws ServletException, IOException {
  FileInputStream editFileIn = null;
  try {
    final ServletContext context = getServletContext();
    final Configuration conf = (Configuration) getServletContext()
        .getAttribute(JspHelper.CURRENT_CONF);
    final String journalId = request.getParameter(JOURNAL_ID_PARAM);
    QuorumJournalManager.checkJournalId(journalId);
    final JNStorage storage = JournalNodeHttpServer
        .getJournalFromContext(context, journalId).getStorage();

    // Check security
    if (!checkRequestorOrSendError(conf, request, response)) {
      return;
    }

    // Check that the namespace info is correct
    if (!checkStorageInfoOrSendError(storage, request, response)) {
      return;
    }

    long segmentTxId = ServletUtil.parseLongParam(request,
        SEGMENT_TXID_PARAM);

    FileJournalManager fjm = storage.getJournalManager();
    File editFile;

    synchronized (fjm) {
      // Synchronize on the FJM so that the file doesn't get finalized
      // out from underneath us while we're in the process of opening
      // it up.
      EditLogFile elf = fjm.getLogFile(
          segmentTxId);
      if (elf == null) {
        response.sendError(HttpServletResponse.SC_NOT_FOUND,
            "No edit log found starting at txid " + segmentTxId);
        return;
      }
      editFile = elf.getFile();
      GetImageServlet.setVerificationHeaders(response, editFile);
      GetImageServlet.setFileNameHeaders(response, editFile);
      editFileIn = new FileInputStream(editFile);
    }

    DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);

    // send edits
    TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);

  } catch (Throwable t) {
    String errMsg = "getedit failed. " + StringUtils.stringifyException(t);
    response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg);
    throw new IOException(errMsg);
  } finally {
    IOUtils.closeStream(editFileIn);
  }
}