Java class org.apache.hadoop.io.ObjectWritable example source code
Project: spark_deep
File: RPC.java
/**
* Construct & cache an IPC client with the user-provided SocketFactory
* if no cached client exists.
*
* @param conf Configuration
* @return an IPC client
*/
private synchronized Client getClient(Configuration conf,
SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout for all
// configurations. Since the IPC is usually intended globally, not
// per-job, we choose (a).
Client client = clients.get(factory);
if (client == null) {
client = new Client(ObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
client.incCount();
}
return client;
}
Project: spark_deep
File: RPC.java
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean logDebug = LOG.isDebugEnabled();
long startTime = 0;
if (logDebug) {
startTime = System.currentTimeMillis();
}
ObjectWritable value = (ObjectWritable)
//
client.call(new Invocation(method, args), remoteId);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
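In this proxy invoke(), the return value comes back from the RPC layer wrapped in an ObjectWritable and is unwrapped with value.get(). Below is a minimal standalone sketch of that wrap/serialize/unwrap cycle; the class name ObjectWritableRoundTrip and the Text payload are illustrative only and not part of the spark_deep project.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;

public class ObjectWritableRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // Wrap an arbitrary value (here a Text) together with its declared class.
    ObjectWritable wrapped = new ObjectWritable(new Text("hello"));
    wrapped.setConf(conf);

    // Serialize: the class name is written ahead of the value.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    wrapped.write(new DataOutputStream(bytes));

    // Deserialize into a fresh ObjectWritable, then unwrap with get(),
    // just as invoke() does with the RPC response above.
    ObjectWritable copy = new ObjectWritable();
    copy.setConf(conf);
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    Text restored = (Text) copy.get();
    System.out.println(restored); // hello
  }
}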
Project: anthelion
File: Loops.java
/**
* Wrap values in ObjectWritable.
*/
public void map(Text key, Writable value,
OutputCollector<Text, ObjectWritable> output, Reporter reporter)
throws IOException {
ObjectWritable objWrite = new ObjectWritable();
Writable cloned = null;
if (value instanceof LinkDatum) {
cloned = new Text(((LinkDatum)value).getUrl());
}
else {
cloned = WritableUtils.clone(value, conf);
}
objWrite.set(cloned);
output.collect(key, objWrite);
}
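The clone in the else branch matters because Hadoop reuses the Writable instance it passes to map(); keeping a reference without copying would see the object mutated on the next record. A small illustrative sketch of that behaviour (standalone, with a hypothetical Text value and a fresh Configuration):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;

public class CloneBeforeKeeping {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Text reused = new Text("first");

    // Deep-copy the value before keeping it around.
    Text kept = WritableUtils.clone(reused, conf);

    // The framework would now overwrite the reused instance for the next record.
    reused.set("second");

    System.out.println(kept);   // first  (unaffected by the reuse)
    System.out.println(reused); // second
  }
}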
Project: hadoop-on-lustre
File: RPC.java
/**
* Construct & cache an IPC client with the user-provided SocketFactory
* if no cached client exists.
*
* @param conf Configuration
* @return an IPC client
*/
private synchronized Client getClient(Configuration conf,
SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout for all
// configurations. Since the IPC is usually intended globally, not
// per-job, we choose (a).
Client client = clients.get(factory);
if (client == null) {
client = new Client(ObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
client.incCount();
}
return client;
}
Project: hadoop-on-lustre
File: RPC.java
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean logDebug = LOG.isDebugEnabled();
long startTime = 0;
if (logDebug) {
startTime = System.currentTimeMillis();
}
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
Project: gora
File: MapFieldValueFilter.java
@Override
public void readFields(DataInput in) throws IOException {
fieldName = Text.readString(in);
mapKey = new Utf8(Text.readString(in));
filterOp = WritableUtils.readEnum(in, FilterOp.class);
operands.clear();
int operandsSize = WritableUtils.readVInt(in);
for (int i = 0; i < operandsSize; i++) {
Object operand = ObjectWritable.readObject(in, conf);
if (operand instanceof String) {
operand = new Utf8((String) operand);
}
operands.add(operand);
}
filterIfMissing = in.readBoolean();
}
Project: geowave
File: RasterTileResizeCombiner.java
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context )
throws IOException,
InterruptedException {
final GridCoverage mergedCoverage = helper.getMergedCoverage(
key,
values);
if (mergedCoverage != null) {
context.write(
key,
mergedCoverage);
}
}
Project: geowave
File: RasterTileResizeReducer.java
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, GridCoverage>.Context context )
throws IOException,
InterruptedException {
final GridCoverage mergedCoverage = helper.getMergedCoverage(
key,
values);
if (mergedCoverage != null) {
context.write(
helper.getGeoWaveOutputKey(),
mergedCoverage);
}
}
Project: geowave
File: ConvexHullMapReduce.java
@Override
protected void mapWritableValue(
final GeoWaveInputKey key,
final ObjectWritable value,
final Mapper<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, ObjectWritable>.Context context )
throws IOException,
InterruptedException {
// cached for efficiency since the output is the input object
// the de-serialized input object is only used for sampling.
// For simplicity, allow the de-serialization to occur in all cases,
// even though some sampling
// functions do not inspect the input object.
currentValue = value;
super.mapWritableValue(
key,
value,
context);
}
Project: geowave
File: ConvexHullMapReduce.java
@Override
protected void mapNativeValue(
final GeoWaveInputKey key,
final Object value,
final org.apache.hadoop.mapreduce.Mapper<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, ObjectWritable>.Context context )
throws IOException,
InterruptedException {
@SuppressWarnings("unchecked")
final AnalyticItemWrapper<T> wrapper = itemWrapperFactory.create((T) value);
outputKey.setAdapterId(key.getAdapterId());
outputKey.setDataId(new ByteArrayId(
StringUtils.stringToBinary(nestedGroupCentroidAssigner.getGroupForLevel(wrapper))));
outputKey.setInsertionId(key.getInsertionId());
context.write(
outputKey,
currentValue);
}
Project: geowave
File: InputToOutputKeyReducer.java
@Override
protected void setup(
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, Object>.Context context )
throws IOException,
InterruptedException {
super.setup(context);
final ScopedJobConfiguration config = new ScopedJobConfiguration(
context.getConfiguration(),
InputToOutputKeyReducer.class,
LOGGER);
final ByteArrayId indexId = new ByteArrayId(
config.getString(
OutputParameters.Output.INDEX_ID,
"na"));
final List<ByteArrayId> indexIds = new ArrayList<ByteArrayId>();
indexIds.add(indexId);
outputKey = new GeoWaveOutputKey(
new ByteArrayId(
"na"),
indexIds);
}
Project: geowave
File: SimpleFeatureOutputReducer.java
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context )
throws IOException,
InterruptedException {
final Iterator<Object> valIt = values.iterator();
if (valIt.hasNext()) {
key.setAdapterId(outputAdapter.getAdapterId());
final SimpleFeature feature = getSimpleFeature(
key,
valIt.next());
context.write(
key,
feature);
}
}
Project: geowave
File: GeoWaveInputLoadJobRunner.java
@Override
public void configure(
final Job job )
throws Exception {
job.setMapperClass(Mapper.class);
job.setReducerClass(InputToOutputKeyReducer.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
job.setSpeculativeExecution(false);
job.setJobName("GeoWave Input to Output");
job.setReduceSpeculativeExecution(false);
}
Project: geowave
File: KSamplerMapReduce.java
@Override
protected void mapWritableValue(
final GeoWaveInputKey key,
final ObjectWritable value,
final Mapper<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, ObjectWritable>.Context context )
throws IOException,
InterruptedException {
// cached for efficiency since the output is the input object
// the de-serialized input object is only used for sampling.
// For simplicity, allow the de-serialization to occur in all cases,
// even though some sampling
// functions do not inspect the input object.
currentValue = value;
super.mapWritableValue(
key,
value,
context);
}
Project: geowave
File: KSamplerMapReduce.java
@Override
protected void mapNativeValue(
final GeoWaveInputKey key,
final Object value,
final org.apache.hadoop.mapreduce.Mapper<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, ObjectWritable>.Context context )
throws IOException,
InterruptedException {
@SuppressWarnings("unchecked")
final double rank = samplingFunction.rank(
sampleSize,
(T) value);
if (rank > 0.0000000001) {
final AnalyticItemWrapper<Object> wrapper = itemWrapperFactory.create(value);
outputKey.setDataId(new ByteArrayId(
keyManager.putData(
nestedGroupCentroidAssigner.getGroupForLevel(wrapper),
1.0 - rank, // sorts in ascending order
key.getDataId().getBytes())));
outputKey.setAdapterId(key.getAdapterId());
outputKey.setInsertionId(key.getInsertionId());
context.write(
outputKey,
currentValue);
}
}
Project: geowave
File: KMeansMapReduce.java
@Override
protected void mapNativeValue(
final GeoWaveInputKey key,
final Object value,
final org.apache.hadoop.mapreduce.Mapper<GeoWaveInputKey, ObjectWritable, GroupIDText, BytesWritable>.Context context )
throws IOException,
InterruptedException {
final AnalyticItemWrapper<Object> item = itemWrapperFactory.create(value);
nestedGroupCentroidAssigner.findCentroidForLevel(
item,
centroidAssociationFn);
final byte[] outData = association.toBinary();
outputValWritable.set(
outData,
0,
outData.length);
context.write(
outputKeyWritable,
outputValWritable);
}
Project: geowave
File: UpdateCentroidCostMapReduce.java
@Override
protected void mapNativeValue(
final GeoWaveInputKey key,
final Object value,
final Mapper<GeoWaveInputKey, ObjectWritable, GroupIDText, CountofDoubleWritable>.Context context )
throws IOException,
InterruptedException {
final AnalyticItemWrapper<Object> wrappedItem = itemWrapperFactory.create(value);
dw.set(
nestedGroupCentroidAssigner.findCentroidForLevel(
wrappedItem,
centroidAssociationFn),
1.0);
context.write(
outputWritable,
dw);
}
Project: geowave
File: BasicMapReduceIT.java
@Override
protected void mapNativeValue(
final GeoWaveInputKey key,
final Object value,
final Mapper<GeoWaveInputKey, ObjectWritable, NullWritable, NullWritable>.Context context )
throws IOException,
InterruptedException {
ResultCounterType resultType = ResultCounterType.ERROR;
if (value instanceof SimpleFeature) {
final SimpleFeature result = (SimpleFeature) value;
final Geometry geometry = (Geometry) result.getDefaultGeometry();
if (!geometry.isEmpty()) {
resultType = expectedHashedCentroids.contains(TestUtils.hashCentroid(geometry)) ? ResultCounterType.EXPECTED
: ResultCounterType.UNEXPECTED;
}
}
context.getCounter(
resultType).increment(
1);
}
Project: geowave
File: BasicMapReduceIT.java
@Override
protected void setup(
final Mapper<GeoWaveInputKey, ObjectWritable, NullWritable, NullWritable>.Context context )
throws IOException,
InterruptedException {
super.setup(context);
final Configuration config = GeoWaveConfiguratorBase.getConfiguration(context);
final String expectedResults = config.get(MapReduceTestUtils.EXPECTED_RESULTS_KEY);
if (expectedResults != null) {
expectedHashedCentroids = new HashSet<Long>();
final byte[] expectedResultsBinary = ByteArrayUtils.byteArrayFromString(expectedResults);
final ByteBuffer buf = ByteBuffer.wrap(expectedResultsBinary);
final int count = buf.getInt();
for (int i = 0; i < count; i++) {
expectedHashedCentroids.add(buf.getLong());
}
}
}
Project: geowave
File: GeoWaveReducer.java
protected void reduceWritableValues(
final GeoWaveInputKey key,
final Iterable<ObjectWritable> values,
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, ObjectWritable>.Context context )
throws IOException,
InterruptedException {
final HadoopWritableSerializer<?, Writable> serializer = serializationTool
.getHadoopWritableSerializerForAdapter(key.getAdapterId());
final Iterable<Object> transformedValues = Iterables.transform(
values,
new Function<ObjectWritable, Object>() {
@Override
public Object apply(
final ObjectWritable writable ) {
final Object innerObj = writable.get();
return innerObj instanceof Writable ? serializer.fromWritable((Writable) innerObj) : innerObj;
}
});
reduceNativeValues(
key,
transformedValues,
new NativeReduceContext(
context,
serializationTool));
}
Project: geowave
File: GeoWaveDedupeCombiner.java
@Override
protected void reduce(
final GeoWaveInputKey key,
final Iterable<ObjectWritable> values,
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, ObjectWritable>.Context context )
throws IOException,
InterruptedException {
final Iterator<ObjectWritable> it = values.iterator();
while (it.hasNext()) {
final ObjectWritable next = it.next();
if (next != null) {
context.write(
key,
next);
return;
}
}
}
Project: geowave
File: StoreCopyReducer.java
@Override
protected void reduceNativeValues(
GeoWaveInputKey key,
Iterable<Object> values,
Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, Object>.Context context )
throws IOException,
InterruptedException {
final Iterator<Object> objects = values.iterator();
while (objects.hasNext()) {
final AdapterToIndexMapping mapping = store.getIndicesForAdapter(key.getAdapterId());
context.write(
new GeoWaveOutputKey<>(
mapping.getAdapterId(),
Arrays.asList(mapping.getIndexIds())),
objects.next());
}
}
Project: geowave
File: GeoWaveWritableInputReducer.java
protected void reduceWritableValues(
final GeoWaveInputKey key,
final Iterable<ObjectWritable> values,
final Reducer<GeoWaveInputKey, ObjectWritable, KEYOUT, VALUEOUT>.Context context )
throws IOException,
InterruptedException {
final HadoopWritableSerializer<?, Writable> serializer = serializationTool
.getHadoopWritableSerializerForAdapter(key.getAdapterId());
final Iterable<Object> transformedValues = Iterables.transform(
values,
new Function<ObjectWritable, Object>() {
@Override
public Object apply(
final ObjectWritable writable ) {
final Object innerObj = writable.get();
return (innerObj instanceof Writable) ? serializer.fromWritable((Writable) innerObj) : innerObj;
}
});
reduceNativeValues(
key,
transformedValues,
context);
}
Project: hortonworks-extension
File: RPC.java
/**
* Construct & cache an IPC client with the user-provided SocketFactory
* if no cached client exists.
*
* @param conf Configuration
* @return an IPC client
*/
private synchronized Client getClient(Configuration conf,
SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout for all
// configurations. Since the IPC is usually intended globally, not
// per-job, we choose (a).
Client client = clients.get(factory);
if (client == null) {
client = new Client(ObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
client.incCount();
}
return client;
}
Project: hortonworks-extension
File: RPC.java
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean logDebug = LOG.isDebugEnabled();
long startTime = 0;
if (logDebug) {
startTime = System.currentTimeMillis();
}
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
Project: hbase-secondary-index
File: IndexSpecification.java
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
indexId = in.readUTF();
int numIndexedCols = in.readInt();
indexedColumns = new byte[numIndexedCols][];
for (int i = 0; i < numIndexedCols; i++) {
indexedColumns[i] = Bytes.readByteArray(in);
}
int numAdditionalCols = in.readInt();
additionalColumns = new byte[numAdditionalCols][];
for (int i = 0; i < numAdditionalCols; i++) {
additionalColumns[i] = Bytes.readByteArray(in);
}
makeAllColumns();
keyGenerator = (IndexKeyGenerator) ObjectWritable.readObject(in, CONF);
// FIXME this is to read the deprecated comparator, in existing data
ObjectWritable.readObject(in, CONF);
}
Project: spark_deep
File: RPC.java
public void readFields(DataInput in) throws IOException {
methodName = UTF8.readString(in);
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
for(int i = 0; i < parameters.length; i++){
parameters[i] = ObjectWritable.readObject(in, objectWritable, conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
Project: spark_deep
File: RPC.java
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, methodName);
out.writeInt(parameterClasses.length);
for(int i = 0; i < parameterClasses.length; i++){
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], conf);
}
}
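readFields and write above marshal each RPC parameter with the static ObjectWritable.readObject/writeObject pair, so the declared class travels alongside the value and primitives and Writables can share one stream. Below is a minimal round-trip of a single int parameter; it is a standalone sketch, and the class name ParameterRoundTrip is not part of spark_deep.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;

public class ParameterRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // Write one "parameter" the way Invocation.write() does.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    ObjectWritable.writeObject(out, 42, Integer.TYPE, conf);

    // Read it back the way Invocation.readFields() does, recovering both
    // the value and its declared class.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    ObjectWritable holder = new ObjectWritable();
    Object param = ObjectWritable.readObject(in, holder, conf);

    System.out.println(param + " : " + holder.getDeclaredClass()); // 42 : int
  }
}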
Project: GeoCrawler
File: ScoreUpdater.java
/**
* Changes input into ObjectWritables.
*/
public void map(Text key, Writable value,
OutputCollector<Text, ObjectWritable> output, Reporter reporter)
throws IOException {
ObjectWritable objWrite = new ObjectWritable();
objWrite.set(value);
output.collect(key, objWrite);
}
Project: GeoCrawler
File: LinkRank.java
/**
* Runs the inverter job. The inverter job flips outlinks to inlinks to be
* passed into the analysis job.
*
* @param nodeDb
* The node database to use.
* @param outlinkDb
* The outlink database to use.
* @param output
* The output directory.
*
* @throws IOException
* If an error occurs while running the inverter job.
*/
private void runInverter(Path nodeDb, Path outlinkDb, Path output)
throws IOException {
// configure the inverter
JobConf inverter = new NutchJob(getConf());
inverter.setJobName("LinkAnalysis Inverter");
FileInputFormat.addInputPath(inverter, nodeDb);
FileInputFormat.addInputPath(inverter, outlinkDb);
FileOutputFormat.setOutputPath(inverter, output);
inverter.setInputFormat(SequenceFileInputFormat.class);
inverter.setMapperClass(Inverter.class);
inverter.setReducerClass(Inverter.class);
inverter.setMapOutputKeyClass(Text.class);
inverter.setMapOutputValueClass(ObjectWritable.class);
inverter.setOutputKeyClass(Text.class);
inverter.setOutputValueClass(LinkDatum.class);
inverter.setOutputFormat(SequenceFileOutputFormat.class);
inverter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
// run the inverter job
LOG.info("Starting inverter job");
try {
JobClient.runJob(inverter);
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished inverter job.");
}
Project: GeoCrawler
File: LinkRank.java
/**
* Runs the link analysis job. The link analysis job applies the link rank
* formula to create a score per url and stores that score in the NodeDb.
*
* Typically the link analysis job is run a number of times to allow the link
* rank scores to converge.
*
* @param nodeDb
* The node database from which we are getting previous link rank
* scores.
* @param inverted
* The inverted inlinks
* @param output
* The link analysis output.
* @param iteration
* The current iteration number.
* @param numIterations
* The total number of link analysis iterations
*
* @throws IOException
* If an error occurs during link analysis.
*/
private void runAnalysis(Path nodeDb, Path inverted, Path output,
int iteration, int numIterations, float rankOne) throws IOException {
JobConf analyzer = new NutchJob(getConf());
analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+ " of " + numIterations);
FileInputFormat.addInputPath(analyzer, nodeDb);
FileInputFormat.addInputPath(analyzer, inverted);
FileOutputFormat.setOutputPath(analyzer, output);
analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
analyzer.setMapOutputKeyClass(Text.class);
analyzer.setMapOutputValueClass(ObjectWritable.class);
analyzer.setInputFormat(SequenceFileInputFormat.class);
analyzer.setMapperClass(Analyzer.class);
analyzer.setReducerClass(Analyzer.class);
analyzer.setOutputKeyClass(Text.class);
analyzer.setOutputValueClass(Node.class);
analyzer.setOutputFormat(MapFileOutputFormat.class);
analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
LOG.info("Starting analysis job");
try {
JobClient.runJob(analyzer);
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished analysis job.");
}
Project: GeoCrawler
File: LinkRank.java
/**
* Convert values to ObjectWritable
*/
public void map(Text key, Writable value,
OutputCollector<Text, ObjectWritable> output, Reporter reporter)
throws IOException {
ObjectWritable objWrite = new ObjectWritable();
objWrite.set(value);
output.collect(key, objWrite);
}
Project: GeoCrawler
File: LinkRank.java
/**
* Convert values to ObjectWritable
*/
public void map(Text key, Writable value,
OutputCollector<Text, ObjectWritable> output, Reporter reporter)
throws IOException {
ObjectWritable objWrite = new ObjectWritable();
objWrite.set(WritableUtils.clone(value, conf));
output.collect(key, objWrite);
}
Project: GeoCrawler
File: LinkDumper.java
/**
* Wraps all values in ObjectWritables.
*/
public void map(Text key, Writable value,
OutputCollector<Text, ObjectWritable> output, Reporter reporter)
throws IOException {
ObjectWritable objWrite = new ObjectWritable();
objWrite.set(value);
output.collect(key, objWrite);
}
Project: GeoCrawler
File: LinkDumper.java
/**
* Inverts outlinks to inlinks while attaching node information to the
* outlink.
*/
public void reduce(Text key, Iterator<ObjectWritable> values,
OutputCollector<Text, LinkNode> output, Reporter reporter)
throws IOException {
String fromUrl = key.toString();
List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
Node node = null;
// loop through all values aggregating outlinks, saving node
while (values.hasNext()) {
ObjectWritable write = values.next();
Object obj = write.get();
if (obj instanceof Node) {
node = (Node) obj;
} else if (obj instanceof LinkDatum) {
outlinks.add(WritableUtils.clone((LinkDatum) obj, conf));
}
}
// only collect if there are outlinks
int numOutlinks = node.getNumOutlinks();
if (numOutlinks > 0) {
for (int i = 0; i < outlinks.size(); i++) {
LinkDatum outlink = outlinks.get(i);
String toUrl = outlink.getUrl();
// collect the outlink as an inlink with the node
output.collect(new Text(toUrl), new LinkNode(fromUrl, node));
}
}
}
Project: gora-boot
File: MapFieldValueFilter.java
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, fieldName);
Text.writeString(out, mapKey.toString());
WritableUtils.writeEnum(out, filterOp);
WritableUtils.writeVInt(out, operands.size());
for (Object operand : operands) {
if (operand instanceof String) {
throw new IllegalStateException("Use Utf8 instead of String for operands");
}
if (operand instanceof Utf8) {
operand = operand.toString();
}
if (operand instanceof Boolean) {
ObjectWritable.writeObject(out, operand, Boolean.TYPE, conf);
} else if (operand instanceof Character) {
ObjectWritable.writeObject(out, operand, Character.TYPE, conf);
} else if (operand instanceof Byte) {
ObjectWritable.writeObject(out, operand, Byte.TYPE, conf);
} else if (operand instanceof Short) {
ObjectWritable.writeObject(out, operand, Short.TYPE, conf);
} else if (operand instanceof Integer) {
ObjectWritable.writeObject(out, operand, Integer.TYPE, conf);
} else if (operand instanceof Long) {
ObjectWritable.writeObject(out, operand, Long.TYPE, conf);
} else if (operand instanceof Float) {
ObjectWritable.writeObject(out, operand, Float.TYPE, conf);
} else if (operand instanceof Double) {
ObjectWritable.writeObject(out, operand, Double.TYPE, conf);
} else if (operand instanceof Void) {
ObjectWritable.writeObject(out, operand, Void.TYPE, conf);
} else {
ObjectWritable.writeObject(out, operand, operand.getClass(), conf);
}
}
out.writeBoolean(filterIfMissing);
}
Project: gora-boot
File: SingleFieldValueFilter.java
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, fieldName);
WritableUtils.writeEnum(out, filterOp);
WritableUtils.writeVInt(out, operands.size());
for (Object operand : operands) {
if (operand instanceof String) {
throw new IllegalStateException("Use Utf8 instead of String for operands");
}
if (operand instanceof Utf8) {
operand = operand.toString();
}
if (operand instanceof Boolean) {
ObjectWritable.writeObject(out, operand, Boolean.TYPE, conf);
} else if (operand instanceof Character) {
ObjectWritable.writeObject(out, operand, Character.TYPE, conf);
} else if (operand instanceof Byte) {
ObjectWritable.writeObject(out, operand, Byte.TYPE, conf);
} else if (operand instanceof Short) {
ObjectWritable.writeObject(out, operand, Short.TYPE, conf);
} else if (operand instanceof Integer) {
ObjectWritable.writeObject(out, operand, Integer.TYPE, conf);
} else if (operand instanceof Long) {
ObjectWritable.writeObject(out, operand, Long.TYPE, conf);
} else if (operand instanceof Float) {
ObjectWritable.writeObject(out, operand, Float.TYPE, conf);
} else if (operand instanceof Double) {
ObjectWritable.writeObject(out, operand, Double.TYPE, conf);
} else if (operand instanceof Void) {
ObjectWritable.writeObject(out, operand, Void.TYPE, conf);
} else {
ObjectWritable.writeObject(out, operand, operand.getClass(), conf);
}
}
out.writeBoolean(filterIfMissing);
}
Project: anthelion
File: ScoreUpdater.java
/**
* Changes input into ObjectWritables.
*/
public void map(Text key, Writable value,
OutputCollector<Text, ObjectWritable> output, Reporter reporter)
throws IOException {
ObjectWritable objWrite = new ObjectWritable();
objWrite.set(value);
output.collect(key, objWrite);
}