Example source code for the Java class org.apache.hadoop.mapred.IFile.Reader

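The snippets collected on this page come from a number of Hadoop-derived projects, and they all use IFile.Reader in the same way: open the on-disk (or in-memory) IFile, construct a Reader over it, and iterate with nextRawKey()/nextRawValue(), deserializing each record into Text objects through a DataInputBuffer. Below is a minimal, self-contained sketch of that read loop, assuming an uncompressed and unencrypted file; the class and variable names (IFileReadSketch, mapOutputPath) are illustrative only, not part of the Hadoop API.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.IFile;

public class IFileReadSketch {
  /** Reads every key/value pair of the IFile at mapOutputPath into two lists. */
  static void dumpIFile(Configuration conf, FileSystem fs, Path mapOutputPath,
      List<String> keys, List<String> values) throws IOException {
    FSDataInputStream in = fs.open(mapOutputPath);
    // Null codec and null counter: no compression, no read statistics.
    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
        fs.getFileStatus(mapOutputPath).getLen(), null, null);
    DataInputBuffer keyBuf = new DataInputBuffer();
    DataInputBuffer valueBuf = new DataInputBuffer();
    Text key = new Text();
    Text value = new Text();
    try {
      // nextRawKey() returns false once the trailing EOF markers are reached.
      while (reader.nextRawKey(keyBuf)) {
        key.readFields(keyBuf);
        keys.add(key.toString());
        reader.nextRawValue(valueBuf);
        value.readFields(valueBuf);
        values.add(value.toString());
      }
    } finally {
      reader.close();
    }
  }
}

The examples below differ mainly in which Reader constructor they use: newer branches pass an FSDataInputStream (optionally wrapped by CryptoUtils.wrapIfNecessary) together with an explicit length, while older branches pass the FileSystem and Path directly.
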
Project: hadoop
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
      fs.getFileStatus(path).getLen(), null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: hadoop
File: TestMerger.java

private Answer<?> getKeyAnswer(final String segmentName,
    final boolean isCompressedInput) {
  return new Answer<Object>() {
    int i = 0;

    @SuppressWarnings("unchecked")
    public Boolean answer(InvocationOnMock invocation) {
      if (i++ == 3) {
        return false;
      }
      Reader<Text, Text> mock = (Reader<Text, Text>) invocation.getMock();
      int multiplier = isCompressedInput ? 100 : 1;
      mock.bytesRead += 10 * multiplier;
      Object[] args = invocation.getArguments();
      DataInputBuffer key = (DataInputBuffer) args[0];
      key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
      return true;
    }
  };
}

Project: aliyun-oss-hadoop-fs
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
      fs.getFileStatus(path).getLen(), null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: aliyun-oss-hadoop-fs
File: TestMerger.java

private Answer<?> getKeyAnswer(final String segmentName,
    final boolean isCompressedInput) {
  return new Answer<Object>() {
    int i = 0;

    @SuppressWarnings("unchecked")
    public Boolean answer(InvocationOnMock invocation) {
      if (i++ == 3) {
        return false;
      }
      Reader<Text, Text> mock = (Reader<Text, Text>) invocation.getMock();
      int multiplier = isCompressedInput ? 100 : 1;
      mock.bytesRead += 10 * multiplier;
      Object[] args = invocation.getArguments();
      DataInputBuffer key = (DataInputBuffer) args[0];
      key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
      return true;
    }
  };
}

Project: big-c
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
      fs.getFileStatus(path).getLen(), null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: big-c
File: TestMerger.java

private Answer<?> getKeyAnswer(final String segmentName,
    final boolean isCompressedInput) {
  return new Answer<Object>() {
    int i = 0;

    @SuppressWarnings("unchecked")
    public Boolean answer(InvocationOnMock invocation) {
      if (i++ == 3) {
        return false;
      }
      Reader<Text, Text> mock = (Reader<Text, Text>) invocation.getMock();
      int multiplier = isCompressedInput ? 100 : 1;
      mock.bytesRead += 10 * multiplier;
      Object[] args = invocation.getArguments();
      DataInputBuffer key = (DataInputBuffer) args[0];
      key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
      return true;
    }
  };
}

Project: hadoop-2.6.0-cdh5.4.3
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
      fs.getFileStatus(path).getLen(), null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: hadoop-2.6.0-cdh5.4.3
File: TestMerger.java

private Answer<?> getKeyAnswer(final String segmentName,
    final boolean isCompressedInput) {
  return new Answer<Object>() {
    int i = 0;

    @SuppressWarnings("unchecked")
    public Boolean answer(InvocationOnMock invocation) {
      if (i++ == 3) {
        return false;
      }
      Reader<Text, Text> mock = (Reader<Text, Text>) invocation.getMock();
      int multiplier = isCompressedInput ? 100 : 1;
      mock.bytesRead += 10 * multiplier;
      Object[] args = invocation.getArguments();
      DataInputBuffer key = (DataInputBuffer) args[0];
      key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
      return true;
    }
  };
}

Project: hadoop-plus
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
      path, null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: FlexMap
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
      fs.getFileStatus(path).getLen(), null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: FlexMap
File: TestMerger.java

private Answer<?> getKeyAnswer(final String segmentName,
    final boolean isCompressedInput) {
  return new Answer<Object>() {
    int i = 0;

    @SuppressWarnings("unchecked")
    public Boolean answer(InvocationOnMock invocation) {
      if (i++ == 3) {
        return false;
      }
      Reader<Text, Text> mock = (Reader<Text, Text>) invocation.getMock();
      int multiplier = isCompressedInput ? 100 : 1;
      mock.bytesRead += 10 * multiplier;
      Object[] args = invocation.getArguments();
      DataInputBuffer key = (DataInputBuffer) args[0];
      key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
      return true;
    }
  };
}

Project: hops
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
      fs.getFileStatus(path).getLen(), null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: hops
File: TestMerger.java

private Answer<?> getKeyAnswer(final String segmentName,
    final boolean isCompressedInput) {
  return new Answer<Object>() {
    int i = 0;

    @SuppressWarnings("unchecked")
    public Boolean answer(InvocationOnMock invocation) {
      if (i++ == 3) {
        return false;
      }
      Reader<Text, Text> mock = (Reader<Text, Text>) invocation.getMock();
      int multiplier = isCompressedInput ? 100 : 1;
      mock.bytesRead += 10 * multiplier;
      Object[] args = invocation.getArguments();
      DataInputBuffer key = (DataInputBuffer) args[0];
      key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
      return true;
    }
  };
}

Project: hadoop-TCP
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
      path, null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: hardfs
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
      path, null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: hadoop-on-lustre2
File: TestMerger.java

private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
      path, null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}

Project: hadoop
File: Merger.java

public Segment(Reader<K, V> reader, boolean preserve,
    Counters.Counter mapOutputsCounter) {
  this.reader = reader;
  this.preserve = preserve;
  this.segmentLength = reader.getLength();
  this.mapOutputsCounter = mapOutputsCounter;
}

Project: hadoop
File: Merger.java

void init(Counters.Counter readsCounter) throws IOException {
  if (reader == null) {
    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    in = CryptoUtils.wrapIfNecessary(conf, in);
    reader = new Reader<K, V>(conf, in,
        segmentLength - CryptoUtils.cryptoPadding(conf),
        codec, readsCounter);
  }
  if (mapOutputsCounter != null) {
    mapOutputsCounter.increment(1);
  }
}

Project: hadoop
File: BackupStore.java

/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment() throws IOException {
  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  usedSize += EOF_MARKER_SIZE;
  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader =
      new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
          (org.apache.hadoop.mapred.TaskAttemptID) tid,
          dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " +
      segmentList.size());
}

Project: hadoop
File: TestMerger.java

@SuppressWarnings("unchecked")
private Reader<Text, Text> getReader(int i, boolean isCompressedInput)
    throws IOException {
  Reader<Text, Text> readerMock = mock(Reader.class);
  when(readerMock.getLength()).thenReturn(30l);
  when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l)
      .thenReturn(20l);
  when(readerMock.nextRawKey(any(DataInputBuffer.class)))
      .thenAnswer(getKeyAnswer("Segment" + i, isCompressedInput));
  doAnswer(getValueAnswer("Segment" + i)).when(readerMock)
      .nextRawValue(any(DataInputBuffer.class));
  return readerMock;
}

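In TestMerger these mocked readers are not read directly; they are wrapped in Merger.Segment objects (see the Segment constructors further down the page) before being handed to the merge. A minimal sketch of that wiring, shown as a fragment in the same style as the snippets on this page, with the segment count and loop variable chosen purely for illustration:

// Sketch only: wrap mocked readers in Merger.Segment instances, the form
// that Merger actually consumes. getReader(...) is the helper defined above.
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
for (int i = 0; i < 2; i++) {
  // preserve = false, so the merger is free to clean up each segment
  // once it has been consumed.
  segments.add(new Segment<Text, Text>(getReader(i, false), false));
}
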
Project: aliyun-oss-hadoop-fs
File: Merger.java

public Segment(Reader<K, V> reader, boolean preserve,
    Counters.Counter mapOutputsCounter) {
  this.reader = reader;
  this.preserve = preserve;
  this.segmentLength = reader.getLength();
  this.mapOutputsCounter = mapOutputsCounter;
}

Project: aliyun-oss-hadoop-fs
File: Merger.java

void init(Counters.Counter readsCounter) throws IOException {
  if (reader == null) {
    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    in = CryptoUtils.wrapIfNecessary(conf, in);
    reader = new Reader<K, V>(conf, in,
        segmentLength - CryptoUtils.cryptoPadding(conf),
        codec, readsCounter);
  }
  if (mapOutputsCounter != null) {
    mapOutputsCounter.increment(1);
  }
}

Project: aliyun-oss-hadoop-fs
File: BackupStore.java

/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment() throws IOException {
  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  usedSize += EOF_MARKER_SIZE;
  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader =
      new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
          (org.apache.hadoop.mapred.TaskAttemptID) tid,
          dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " +
      segmentList.size());
}

Project: aliyun-oss-hadoop-fs
File: TestMerger.java

@SuppressWarnings("unchecked")
private Reader<Text, Text> getReader(int i, boolean isCompressedInput)
    throws IOException {
  Reader<Text, Text> readerMock = mock(Reader.class);
  when(readerMock.getLength()).thenReturn(30l);
  when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l)
      .thenReturn(20l);
  when(readerMock.nextRawKey(any(DataInputBuffer.class)))
      .thenAnswer(getKeyAnswer("Segment" + i, isCompressedInput));
  doAnswer(getValueAnswer("Segment" + i)).when(readerMock)
      .nextRawValue(any(DataInputBuffer.class));
  return readerMock;
}

Project: big-c
File: Merger.java

public Segment(Reader<K, V> reader, boolean preserve,
    Counters.Counter mapOutputsCounter) {
  this.reader = reader;
  this.preserve = preserve;
  this.segmentLength = reader.getLength();
  this.mapOutputsCounter = mapOutputsCounter;
}

Project: big-c
File: Merger.java

void init(Counters.Counter readsCounter) throws IOException {
  if (reader == null) {
    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    in = CryptoUtils.wrapIfNecessary(conf, in);
    reader = new Reader<K, V>(conf, in,
        segmentLength - CryptoUtils.cryptoPadding(conf),
        codec, readsCounter);
  }
  if (mapOutputsCounter != null) {
    mapOutputsCounter.increment(1);
  }
}

Project: big-c
File: BackupStore.java

/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment() throws IOException {
  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  usedSize += EOF_MARKER_SIZE;
  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader =
      new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
          (org.apache.hadoop.mapred.TaskAttemptID) tid,
          dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " +
      segmentList.size());
}

Project: big-c
File: TestMerger.java

@SuppressWarnings("unchecked")
private Reader<Text, Text> getReader(int i, boolean isCompressedInput)
    throws IOException {
  Reader<Text, Text> readerMock = mock(Reader.class);
  when(readerMock.getLength()).thenReturn(30l);
  when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l)
      .thenReturn(20l);
  when(readerMock.nextRawKey(any(DataInputBuffer.class)))
      .thenAnswer(getKeyAnswer("Segment" + i, isCompressedInput));
  doAnswer(getValueAnswer("Segment" + i)).when(readerMock)
      .nextRawValue(any(DataInputBuffer.class));
  return readerMock;
}

Project: hadoop-2.6.0-cdh5.4.3
File: Merger.java

public Segment(Reader<K, V> reader, boolean preserve,
    Counters.Counter mapOutputsCounter) {
  this.reader = reader;
  this.preserve = preserve;
  this.segmentLength = reader.getLength();
  this.mapOutputsCounter = mapOutputsCounter;
}

Project: hadoop-2.6.0-cdh5.4.3
File: Merger.java

void init(Counters.Counter readsCounter) throws IOException {
  if (reader == null) {
    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    in = CryptoUtils.wrapIfNecessary(conf, in);
    reader = new Reader<K, V>(conf, in,
        segmentLength - CryptoUtils.cryptoPadding(conf),
        codec, readsCounter);
  }
  if (mapOutputsCounter != null) {
    mapOutputsCounter.increment(1);
  }
}

Project: hadoop-2.6.0-cdh5.4.3
File: BackupStore.java

/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment() throws IOException {
  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  usedSize += EOF_MARKER_SIZE;
  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader =
      new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
          (org.apache.hadoop.mapred.TaskAttemptID) tid,
          dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " +
      segmentList.size());
}

Project: hadoop-2.6.0-cdh5.4.3
File: TestMerger.java

@SuppressWarnings("unchecked")
private Reader<Text, Text> getReader(int i, boolean isCompressedInput)
    throws IOException {
  Reader<Text, Text> readerMock = mock(Reader.class);
  when(readerMock.getLength()).thenReturn(30l);
  when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l)
      .thenReturn(20l);
  when(readerMock.nextRawKey(any(DataInputBuffer.class)))
      .thenAnswer(getKeyAnswer("Segment" + i, isCompressedInput));
  doAnswer(getValueAnswer("Segment" + i)).when(readerMock)
      .nextRawValue(any(DataInputBuffer.class));
  return readerMock;
}

Project: hadoop-2.6.0-cdh5.4.3
File: Merger.java

public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
  this.reader = reader;
  this.preserve = preserve;
  this.segmentLength = reader.getLength();
  this.rawDataLength = rawDataLength;
}

Project: hadoop-2.6.0-cdh5.4.3
File: Merger.java

private void init(Counters.Counter readsCounter) throws IOException {
  if (reader == null) {
    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
  }
}

Project: hadoop-2.6.0-cdh5.4.3
File: TestMerger.java

@SuppressWarnings("unchecked")
private Reader<Text, Text> getReader(int i) throws IOException {
  Reader<Text, Text> readerMock = mock(Reader.class);
  when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l)
      .thenReturn(20l);
  when(readerMock.next(any(DataInputBuffer.class), any(DataInputBuffer.class)))
      .thenAnswer(getAnswer("Segment" + i));
  return readerMock;
}

Project: hadoop-EAR
File: Merger.java

private void init(Counters.Counter readsCounter) throws IOException {
  if (reader == null) {
    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
  }
}

Project: hadoop-plus
File: Merger.java

public Segment(Reader<K, V> reader, boolean preserve,
    Counters.Counter mapOutputsCounter) {
  this.reader = reader;
  this.preserve = preserve;
  this.segmentLength = reader.getLength();
  this.mapOutputsCounter = mapOutputsCounter;
}

Project: hadoop-plus
File: Merger.java

void init(Counters.Counter readsCounter) throws IOException {
  if (reader == null) {
    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
  }
  if (mapOutputsCounter != null) {
    mapOutputsCounter.increment(1);
  }
}

Project: hadoop-plus
File: BackupStore.java

/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment() throws IOException {
  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  usedSize += EOF_MARKER_SIZE;
  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader =
      new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
          (org.apache.hadoop.mapred.TaskAttemptID) tid,
          dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " +
      segmentList.size());
}

Project: hadoop-plus
File: TestMerger.java

@SuppressWarnings("unchecked")
private Reader<Text, Text> getReader(int i) throws IOException {
  Reader<Text, Text> readerMock = mock(Reader.class);
  when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l)
      .thenReturn(20l);
  when(readerMock.nextRawKey(any(DataInputBuffer.class)))
      .thenAnswer(getKeyAnswer("Segment" + i));
  doAnswer(getValueAnswer("Segment" + i)).when(readerMock)
      .nextRawValue(any(DataInputBuffer.class));
  return readerMock;
}