Java 类org.apache.hadoop.mapreduce.Counter 实例源码
项目:hadoop
文件:ReduceContextImpl.java
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputKeyCounter,
Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator<KEYIN> comparator,
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
this.serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(buffer);
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer);
hasMore = input.next();
this.keyClass = keyClass;
this.valueClass = valueClass;
this.conf = conf;
this.taskid = taskid;
}
项目:circus-train
文件:JobMetrics.java
public JobMetrics(Job job, String bytesReplicatedKey) {
Builder<String, Long> builder = ImmutableMap.builder();
if (job != null) {
Counters counters;
try {
counters = job.getCounters();
} catch (IOException e) {
throw new CircusTrainException("Unable to get counters from job.", e);
}
if (counters != null) {
for (CounterGroup group : counters) {
for (Counter counter : group) {
builder.put(DotJoiner.join(group.getName(), counter.getName()), counter.getValue());
}
}
}
}
metrics = builder.build();
Long bytesReplicatedValue = metrics.get(bytesReplicatedKey);
if (bytesReplicatedValue != null) {
bytesReplicated = bytesReplicatedValue;
} else {
bytesReplicated = 0L;
}
}
项目:hadoop
文件:JobHistoryEventHandler.java
@Private
public JsonNode countersToJSON(Counters counters) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode nodes = mapper.createArrayNode();
if (counters != null) {
for (CounterGroup counterGroup : counters) {
ObjectNode groupNode = nodes.addObject();
groupNode.put("NAME", counterGroup.getName());
groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
ArrayNode countersNode = groupNode.putArray("COUNTERS");
for (Counter counter : counterGroup) {
ObjectNode counterNode = countersNode.addObject();
counterNode.put("NAME", counter.getName());
counterNode.put("DISPLAY_NAME", counter.getDisplayName());
counterNode.put("VALUE", counter.getValue());
}
}
}
return nodes;
}
项目:hadoop
文件:AbstractCounterGroup.java
/**
* GenericGroup ::= displayName #counter counter*
*/
@Override
public synchronized void write(DataOutput out) throws IOException {
Text.writeString(out, displayName);
WritableUtils.writeVInt(out, counters.size());
for(Counter counter: counters.values()) {
counter.write(out);
}
}
项目:hadoop
文件:AbstractCounters.java
/**
* Construct from another counters object.
* @param <C1> type of the other counter
* @param <G1> type of the other counter group
* @param counters the counters object to copy
* @param groupFactory the factory for new groups
*/
@InterfaceAudience.Private
public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
AbstractCounters(AbstractCounters<C1, G1> counters,
CounterGroupFactory<C, G> groupFactory) {
this.groupFactory = groupFactory;
for(G1 group: counters) {
String name = group.getName();
G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
(isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
for(Counter counter: group) {
newGroup.addCounter(counter.getName(), counter.getDisplayName(),
counter.getValue());
}
}
}
项目:hadoop
文件:AbstractCounters.java
/**
* Return textual representation of the counter values.
* @return the string
*/
@Override
public synchronized String toString() {
StringBuilder sb = new StringBuilder("Counters: " + countCounters());
for (G group: this) {
sb.append("\n\t").append(group.getDisplayName());
for (Counter counter: group) {
sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
.append(counter.getValue());
}
}
return sb.toString();
}
项目:hadoop
文件:FileSystemCounterGroup.java
/**
* FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
*/
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, map.size()); // #scheme
for (Map.Entry<String, Object[]> entry : map.entrySet()) {
WritableUtils.writeString(out, entry.getKey()); // scheme
// #counter for the above scheme
WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
for (Object counter : entry.getValue()) {
if (counter == null) continue;
@SuppressWarnings("unchecked")
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
WritableUtils.writeVInt(out, c.key.ordinal()); // key
WritableUtils.writeVLong(out, c.getValue()); // value
}
}
}
项目:hadoop
文件:EventWriter.java
static JhCounters toAvro(Counters counters, String name) {
JhCounters result = new JhCounters();
result.name = new Utf8(name);
result.groups = new ArrayList<JhCounterGroup>(0);
if (counters == null) return result;
for (CounterGroup group : counters) {
JhCounterGroup g = new JhCounterGroup();
g.name = new Utf8(group.getName());
g.displayName = new Utf8(group.getDisplayName());
g.counts = new ArrayList<JhCounter>(group.size());
for (Counter counter : group) {
JhCounter c = new JhCounter();
c.name = new Utf8(counter.getName());
c.displayName = new Utf8(counter.getDisplayName());
c.value = counter.getValue();
g.counts.add(c);
}
result.groups.add(g);
}
return result;
}
项目:ditb
文件:TableRecordReaderImpl.java
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
Method getCounter, TaskAttemptContext context, long numStale) {
// we can get access to counters only if hbase uses new mapreduce APIs
if (getCounter == null) {
return;
}
try {
for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
Counter ct = (Counter)getCounter.invoke(context,
HBASE_COUNTER_GROUP_NAME, entry.getKey());
ct.increment(entry.getValue());
}
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCAN_RESULTS_STALE")).increment(numStale);
} catch (Exception e) {
LOG.debug("can't update counter." + StringUtils.stringifyException(e));
}
}
项目:ditb
文件:IntegrationTestBigLinkedList.java
/**
* Verify the values in the Counters against the expected number of entries written.
*
* @param expectedReferenced
* Expected number of referenced entrires
* @param counters
* The Job's Counters object
* @return True if the values match what's expected, false otherwise
*/
protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
final Counter referenced = counters.findCounter(Counts.REFERENCED);
final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
boolean success = true;
if (expectedReferenced != referenced.getValue()) {
LOG.error("Expected referenced count does not match with actual referenced count. " +
"expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
success = false;
}
if (unreferenced.getValue() > 0) {
final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
+ (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
success = false;
}
return success;
}
项目:ditb
文件:IntegrationTestBigLinkedList.java
/**
* Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
*
* @param counters
* The Job's counters
* @return True if the "bad" counter objects are 0, false otherwise
*/
protected boolean verifyUnexpectedValues(Counters counters) {
final Counter undefined = counters.findCounter(Counts.UNDEFINED);
final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
boolean success = true;
if (undefined.getValue() > 0) {
LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
success = false;
}
if (lostfamilies.getValue() > 0) {
LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
success = false;
}
return success;
}
项目:aliyun-oss-hadoop-fs
文件:TaskCounterGroupInfo.java
public TaskCounterGroupInfo(String name, CounterGroup group) {
this.counterGroupName = name;
this.counter = new ArrayList<TaskCounterInfo>();
for (Counter c : group) {
TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue());
this.counter.add(cinfo);
}
}
项目:aliyun-oss-hadoop-fs
文件:JobHistoryEventHandler.java
@Private
public JsonNode countersToJSON(Counters counters) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode nodes = mapper.createArrayNode();
if (counters != null) {
for (CounterGroup counterGroup : counters) {
ObjectNode groupNode = nodes.addObject();
groupNode.put("NAME", counterGroup.getName());
groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
ArrayNode countersNode = groupNode.putArray("COUNTERS");
for (Counter counter : counterGroup) {
ObjectNode counterNode = countersNode.addObject();
counterNode.put("NAME", counter.getName());
counterNode.put("DISPLAY_NAME", counter.getDisplayName());
counterNode.put("VALUE", counter.getValue());
}
}
}
return nodes;
}
项目:aliyun-oss-hadoop-fs
文件:AbstractCounterGroup.java
/**
* GenericGroup ::= displayName #counter counter*
*/
@Override
public synchronized void write(DataOutput out) throws IOException {
Text.writeString(out, displayName);
WritableUtils.writeVInt(out, counters.size());
for(Counter counter: counters.values()) {
counter.write(out);
}
}
项目:aliyun-oss-hadoop-fs
文件:AbstractCounters.java
/**
* Construct from another counters object.
* @param <C1> type of the other counter
* @param <G1> type of the other counter group
* @param counters the counters object to copy
* @param groupFactory the factory for new groups
*/
@InterfaceAudience.Private
public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
AbstractCounters(AbstractCounters<C1, G1> counters,
CounterGroupFactory<C, G> groupFactory) {
this.groupFactory = groupFactory;
for(G1 group: counters) {
String name = group.getName();
G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
(isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
for(Counter counter: group) {
newGroup.addCounter(counter.getName(), counter.getDisplayName(),
counter.getValue());
}
}
}
项目:aliyun-oss-hadoop-fs
文件:FileSystemCounterGroup.java
/**
* FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
*/
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, map.size()); // #scheme
for (Map.Entry<String, Object[]> entry : map.entrySet()) {
WritableUtils.writeString(out, entry.getKey()); // scheme
// #counter for the above scheme
WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
for (Object counter : entry.getValue()) {
if (counter == null) continue;
@SuppressWarnings("unchecked")
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
WritableUtils.writeVInt(out, c.key.ordinal()); // key
WritableUtils.writeVLong(out, c.getValue()); // value
}
}
}
项目:aliyun-oss-hadoop-fs
文件:EventWriter.java
static JhCounters toAvro(Counters counters, String name) {
JhCounters result = new JhCounters();
result.setName(new Utf8(name));
result.setGroups(new ArrayList<JhCounterGroup>(0));
if (counters == null) return result;
for (CounterGroup group : counters) {
JhCounterGroup g = new JhCounterGroup();
g.setName(new Utf8(group.getName()));
g.setDisplayName(new Utf8(group.getDisplayName()));
g.setCounts(new ArrayList<JhCounter>(group.size()));
for (Counter counter : group) {
JhCounter c = new JhCounter();
c.setName(new Utf8(counter.getName()));
c.setDisplayName(new Utf8(counter.getDisplayName()));
c.setValue(counter.getValue());
g.getCounts().add(c);
}
result.getGroups().add(g);
}
return result;
}
项目:big-c
文件:JobHistoryEventHandler.java
@Private
public JsonNode countersToJSON(Counters counters) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode nodes = mapper.createArrayNode();
if (counters != null) {
for (CounterGroup counterGroup : counters) {
ObjectNode groupNode = nodes.addObject();
groupNode.put("NAME", counterGroup.getName());
groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
ArrayNode countersNode = groupNode.putArray("COUNTERS");
for (Counter counter : counterGroup) {
ObjectNode counterNode = countersNode.addObject();
counterNode.put("NAME", counter.getName());
counterNode.put("DISPLAY_NAME", counter.getDisplayName());
counterNode.put("VALUE", counter.getValue());
}
}
}
return nodes;
}
项目:TopPI
文件:TopPIoverHadoop.java
private boolean bigItemCount(String output) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(this.getConf(), "Counting items from " + this.input);
job.setJarByClass(TopPIoverHadoop.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(this.input));
FileOutputFormat.setOutputPath(job, new Path(output));
job.setMapperClass(ItemBigCountingMapper.class);
job.setReducerClass(ItemBigCountingReducer.class);
boolean success = job.waitForCompletion(true);
if (success) {
Counter rebasingMaxID = job.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
this.getConf().setInt(KEY_REBASING_MAX_ID, (int) rebasingMaxID.getValue());
}
return success;
}
项目:big-c
文件:AbstractCounterGroup.java
/**
* GenericGroup ::= displayName #counter counter*
*/
@Override
public synchronized void write(DataOutput out) throws IOException {
Text.writeString(out, displayName);
WritableUtils.writeVInt(out, counters.size());
for(Counter counter: counters.values()) {
counter.write(out);
}
}
项目:big-c
文件:AbstractCounters.java
/**
* Construct from another counters object.
* @param <C1> type of the other counter
* @param <G1> type of the other counter group
* @param counters the counters object to copy
* @param groupFactory the factory for new groups
*/
@InterfaceAudience.Private
public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
AbstractCounters(AbstractCounters<C1, G1> counters,
CounterGroupFactory<C, G> groupFactory) {
this.groupFactory = groupFactory;
for(G1 group: counters) {
String name = group.getName();
G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
(isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
for(Counter counter: group) {
newGroup.addCounter(counter.getName(), counter.getDisplayName(),
counter.getValue());
}
}
}
项目:big-c
文件:FileSystemCounterGroup.java
/**
* FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
*/
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, map.size()); // #scheme
for (Map.Entry<String, Object[]> entry : map.entrySet()) {
WritableUtils.writeString(out, entry.getKey()); // scheme
// #counter for the above scheme
WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
for (Object counter : entry.getValue()) {
if (counter == null) continue;
@SuppressWarnings("unchecked")
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
WritableUtils.writeVInt(out, c.key.ordinal()); // key
WritableUtils.writeVLong(out, c.getValue()); // value
}
}
}
项目:big-c
文件:EventWriter.java
static JhCounters toAvro(Counters counters, String name) {
JhCounters result = new JhCounters();
result.name = new Utf8(name);
result.groups = new ArrayList<JhCounterGroup>(0);
if (counters == null) return result;
for (CounterGroup group : counters) {
JhCounterGroup g = new JhCounterGroup();
g.name = new Utf8(group.getName());
g.displayName = new Utf8(group.getDisplayName());
g.counts = new ArrayList<JhCounter>(group.size());
for (Counter counter : group) {
JhCounter c = new JhCounter();
c.name = new Utf8(counter.getName());
c.displayName = new Utf8(counter.getDisplayName());
c.value = counter.getValue();
g.counts.add(c);
}
result.groups.add(g);
}
return result;
}
项目:hadoop
文件:TaskCounterGroupInfo.java
public TaskCounterGroupInfo(String name, CounterGroup group) {
this.counterGroupName = name;
this.counter = new ArrayList<TaskCounterInfo>();
for (Counter c : group) {
TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue());
this.counter.add(cinfo);
}
}
项目:hadoop
文件:CounterGroupInfo.java
public CounterGroupInfo(String name, CounterGroup group, CounterGroup mg,
CounterGroup rg) {
this.counterGroupName = name;
this.counter = new ArrayList<CounterInfo>();
for (Counter c : group) {
Counter mc = mg == null ? null : mg.findCounter(c.getName());
Counter rc = rg == null ? null : rg.findCounter(c.getName());
CounterInfo cinfo = new CounterInfo(c, mc, rc);
this.counter.add(cinfo);
}
}
项目:hadoop
文件:TaskAttemptImpl.java
private void updateProgressSplits() {
double newProgress = reportedStatus.progress;
newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
Counters counters = reportedStatus.counters;
if (counters == null)
return;
WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
if (splitsBlock != null) {
long now = clock.getTime();
long start = getLaunchTime(); // TODO Ensure not 0
if (start != 0 && now - start <= Integer.MAX_VALUE) {
splitsBlock.getProgressWallclockTime().extend(newProgress,
(int) (now - start));
}
Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
}
Counter virtualBytes = counters
.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
Counter physicalBytes = counters
.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
}
}
项目:hadoop
文件:JobHistoryEventHandler.java
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
if (slotMillisMapCounter != null) {
summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000);
}
Counter slotMillisReduceCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
if (slotMillisReduceCounter != null) {
summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000);
}
}
项目:hadoop
文件:AbstractCounterGroup.java
@Override
public void incrAllCounters(CounterGroupBase<T> rightGroup) {
try {
for (Counter right : rightGroup) {
Counter left = findCounter(right.getName(), right.getDisplayName());
left.increment(right.getValue());
}
} catch (LimitExceededException e) {
counters.clear();
throw e;
}
}
项目:hadoop
文件:AbstractCounter.java
@Override
public synchronized boolean equals(Object genericRight) {
if (genericRight instanceof Counter) {
synchronized (genericRight) {
Counter right = (Counter) genericRight;
return getName().equals(right.getName()) &&
getDisplayName().equals(right.getDisplayName()) &&
getValue() == right.getValue();
}
}
return false;
}
项目:hadoop
文件:FrameworkCounterGroup.java
@Override
@SuppressWarnings("rawtypes")
public void incrAllCounters(CounterGroupBase<C> other) {
if (checkNotNull(other, "other counter group")
instanceof FrameworkCounterGroup<?, ?>) {
for (Counter counter : other) {
C c = findCounter(((FrameworkCounter) counter).key.name());
if (c != null) {
c.increment(counter.getValue());
}
}
}
}
项目:hadoop
文件:FrameworkCounterGroup.java
/**
* FrameworkGroup ::= #counter (key value)*
*/
@Override
@SuppressWarnings("unchecked")
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, size());
for (int i = 0; i < counters.length; ++i) {
Counter counter = (C) counters[i];
if (counter != null) {
WritableUtils.writeVInt(out, i);
WritableUtils.writeVLong(out, counter.getValue());
}
}
}
项目:hadoop
文件:FrameworkCounterGroup.java
@Override
public void readFields(DataInput in) throws IOException {
clear();
int len = WritableUtils.readVInt(in);
T[] enums = enumClass.getEnumConstants();
for (int i = 0; i < len; ++i) {
int ord = WritableUtils.readVInt(in);
Counter counter = newCounter(enums[ord]);
counter.setValue(WritableUtils.readVLong(in));
counters[ord] = counter;
}
}
项目:hadoop
文件:FileSystemCounterGroup.java
@Override
@SuppressWarnings("unchecked")
public void incrAllCounters(CounterGroupBase<C> other) {
if (checkNotNull(other.getUnderlyingGroup(), "other group")
instanceof FileSystemCounterGroup<?>) {
for (Counter counter : other) {
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
findCounter(c.scheme, c.key) .increment(counter.getValue());
}
}
}
项目:hadoop
文件:CountersStrings.java
/**
* Make the pre 0.21 counter string (for e.g. old job history files)
* [(actual-name)(display-name)(value)]
* @param counter to stringify
* @return the stringified result
*/
public static String toEscapedCompactString(Counter counter) {
// First up, obtain the strings that need escaping. This will help us
// determine the buffer length apriori.
String escapedName, escapedDispName;
long currentValue;
synchronized(counter) {
escapedName = escape(counter.getName());
escapedDispName = escape(counter.getDisplayName());
currentValue = counter.getValue();
}
int length = escapedName.length() + escapedDispName.length() + 4;
length += 8; // For the following delimiting characters
StringBuilder builder = new StringBuilder(length);
builder.append(COUNTER_OPEN);
// Add the counter name
builder.append(UNIT_OPEN);
builder.append(escapedName);
builder.append(UNIT_CLOSE);
// Add the display name
builder.append(UNIT_OPEN);
builder.append(escapedDispName);
builder.append(UNIT_CLOSE);
// Add the value
builder.append(UNIT_OPEN);
builder.append(currentValue);
builder.append(UNIT_CLOSE);
builder.append(COUNTER_CLOSE);
return builder.toString();
}
项目:ditb
文件:TestRowCounter.java
/**
* Run the RowCounter map reduce job and verify the row count.
*
* @param args the command line arguments to be used for rowcounter job.
* @param expectedCount the expected row count (result of map reduce job).
* @throws Exception
*/
private void runRowCount(String[] args, int expectedCount)
throws Exception {
GenericOptionsParser opts = new GenericOptionsParser(TEST_UTIL.getConfiguration(), args);
Configuration conf = opts.getConfiguration();
args = opts.getRemainingArgs();
Job job = RowCounter.createSubmittableJob(conf, args);
job.waitForCompletion(true);
assertTrue(job.isSuccessful());
Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
assertEquals(expectedCount, counter.getValue());
}
项目:ditb
文件:IntegrationTestWithCellVisibilityLoadAndVerify.java
private Counter getCounter(int idx) {
switch (idx) {
case 0:
return rowsExp1;
case 1:
return rowsExp2;
case 2:
return rowsExp3;
case 3:
return rowsexp4;
default:
return null;
}
}
项目:ditb
文件:IntegrationTestWithCellVisibilityLoadAndVerify.java
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
byte[] row = value.getRow();
Counter c = getCounter(row);
c.increment(1);
}
项目:ditb
文件:IntegrationTestWithCellVisibilityLoadAndVerify.java
private Counter getCounter(byte[] row) {
Counter c = null;
if (Bytes.indexOf(row, Bytes.toBytes(VISIBILITY_EXPS[0])) != -1) {
c = rowsExp1;
} else if (Bytes.indexOf(row, Bytes.toBytes(VISIBILITY_EXPS[1])) != -1) {
c = rowsExp2;
} else if (Bytes.indexOf(row, Bytes.toBytes(VISIBILITY_EXPS[2])) != -1) {
c = rowsExp3;
} else if (Bytes.indexOf(row, Bytes.toBytes(VISIBILITY_EXPS[3])) != -1) {
c = rowsExp4;
}
return c;
}
项目:aliyun-oss-hadoop-fs
文件:CounterGroupInfo.java
public CounterGroupInfo(String name, CounterGroup group, CounterGroup mg,
CounterGroup rg) {
this.counterGroupName = name;
this.counter = new ArrayList<CounterInfo>();
for (Counter c : group) {
Counter mc = mg == null ? null : mg.findCounter(c.getName());
Counter rc = rg == null ? null : rg.findCounter(c.getName());
CounterInfo cinfo = new CounterInfo(c, mc, rc);
this.counter.add(cinfo);
}
}
项目:aliyun-oss-hadoop-fs
文件:TaskAttemptImpl.java
private void updateProgressSplits() {
double newProgress = reportedStatus.progress;
newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
Counters counters = reportedStatus.counters;
if (counters == null)
return;
WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
if (splitsBlock != null) {
long now = clock.getTime();
long start = getLaunchTime(); // TODO Ensure not 0
if (start != 0 && now - start <= Integer.MAX_VALUE) {
splitsBlock.getProgressWallclockTime().extend(newProgress,
(int) (now - start));
}
Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
}
Counter virtualBytes = counters
.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
Counter physicalBytes = counters
.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
}
}