@PostConstruct public void initialize() { for (Metric<?> metric : systemPublicMetrics.metrics()) { Gauge<Long> metricGauge = () -> metric.getValue().longValue(); String name = metric.getName(); if (!name.contains(".")) { name += ".total"; } registry.register(name, metricGauge); } final MackerelSender sender = new MackerelSender(serviceName, apiKey); final MackerelReporter reporter = MackerelReporter .forRegistry(registry) .build(sender); reporter.start(1, TimeUnit.MINUTES); }
@Override public void serialize(Metric metric, JsonGenerator json, SerializerProvider serializerProvider) throws IOException { json.writeStartObject(); InternalFormatters formatters = formattersQueue.poll(); if (formatters == null) { formatters = new InternalFormatters(); } json.writeStringField("name", metric.getName()); try { json.writeNumberField("value", formatters.getDecimalFormat() .parse(formatters.getDecimalFormat().format(metric.getValue().doubleValue())).doubleValue()); } catch (ParseException e) { e.printStackTrace(); } json.writeStringField("timestamp", formatters.getDateFormat().format(metric.getTimestamp())); json.writeEndObject(); formattersQueue.offer(formatters); }
@Override public Metric deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException { JsonNode node = p.getCodec().readTree(p); String name = node.get("name").asText(); Number value = node.get("value").asDouble(); Date timestamp = null; InternalFormatters formatters = formattersQueue.poll(); if (formatters == null) { formatters = new InternalFormatters(); } try { timestamp = formatters.getDateFormat().parse(node.get("timestamp").asText()); } catch (ParseException e) { } finally { formattersQueue.offer(formatters); } Metric<Number> metric = new Metric(name, value, timestamp); return metric; }
@Test public void includeOneMetric() throws Exception { Long now = System.currentTimeMillis(); Metric<Double> inputSendCount = new Metric<Double>("integration.channel.input.sendCount",10.0, new Date(now)); ApplicationMetrics app = createMetrics("httpIngest", "http", "foo",0); app.getMetrics().add(inputSendCount); Cache<String, LinkedList<ApplicationMetrics>> rawCache = Caffeine.newBuilder().build(); ApplicationMetricsService service = new ApplicationMetricsService(rawCache); MetricsAggregator aggregator = new MetricsAggregator(service); MetricsCollectorEndpoint endpoint = new MetricsCollectorEndpoint(service); aggregator.receive(app); Assert.assertEquals(1, rawCache.estimatedSize()); StreamMetrics streamMetrics = endpoint.fetchMetrics("").getBody().iterator().next(); Application application = streamMetrics.getApplications().get(0); Assert.assertNotNull(streamMetrics); Assert.assertEquals("http", application.getName()); Instance instance = application.getInstances().get(0); Assert.assertEquals(app.getName(),instance.getKey()); Assert.assertEquals("foo", instance.getGuid()); Metric<Double> computed = instance.getMetrics().stream().filter(metric -> metric.getName().equals("integration.channel.input.send.mean")).findFirst().get(); Assert.assertEquals(0, computed.getValue(),0.0); }
@Override public Collection<Metric<?>> metrics() { final ThreadPoolExecutor executor = this.threadPoolTaskExecutor.getThreadPoolExecutor(); final BlockingQueue<Runnable> queue = executor.getQueue(); return asList( new Metric<>(prefix + THREAD_POOL + ".poolSize", executor.getPoolSize()), new Metric<>(prefix + THREAD_POOL + ".corePoolSize", executor.getCorePoolSize()), new Metric<>(prefix + THREAD_POOL + ".largestPoolSize", executor.getLargestPoolSize()), new Metric<>(prefix + THREAD_POOL + ".maximumPoolSize", executor.getMaximumPoolSize()), new Metric<>(prefix + THREAD_POOL + ".activeCount", executor.getActiveCount()), new Metric<>(prefix + THREAD_POOL + ".completedTaskCount", executor.getCompletedTaskCount()), new Metric<>(prefix + THREAD_POOL + ".taskPoolSize", executor.getTaskCount()), new Metric<>(prefix + THREAD_POOL + ".keepAliveTimeNanoSeconds", executor.getKeepAliveTime(NANOSECONDS)), new Metric<>(prefix + THREAD_POOL + QUEUE + ".size", queue.size()), new Metric<>(prefix + THREAD_POOL + QUEUE + ".remainingCapacity", queue.remainingCapacity()) ); }
@Test public void shouldIngestMetricWithDefaultGlobalTags() { // Given when(statfulMetricProcessor.validate(any())).thenReturn(true); List<ProcessedMetric> processedMetrics = Lists.newArrayList(getProcessedMetric()); when(statfulMetricProcessor.process(any())).thenReturn(processedMetrics); Metric metric = new Metric<>(METRIC_NAME, 1L); // When subject.ingestMetric(metric); // Then ArgumentCaptor<Tags> tags = ArgumentCaptor.forClass(Tags.class); verify(statfulClient).put(eq(INGESTED_METRIC_NAME), eq(Double.toString(1L)), tags.capture(), eq(null), eq(null), anyInt(), eq(NAMESPACE), anyInt()); assertEquals(GLOBAL_TAGS.getValue(), tags.getValue().getTagValue(GLOBAL_TAGS.getName())); }
@Test public void shouldIngestMetricWithAggregations() { // Given when(statfulMetricProcessor.validate(any())).thenReturn(true); List<ProcessedMetric> processedMetrics = Lists.newArrayList(getProcessedMetricWithAggregations()); when(statfulMetricProcessor.process(any())).thenReturn(processedMetrics); Metric metric = new Metric<>(METRIC_NAME, 1L); // When subject.ingestMetric(metric); // Then ArgumentCaptor<Aggregations> aggregations = ArgumentCaptor.forClass(Aggregations.class); verify(statfulClient).put(eq(INGESTED_METRIC_NAME), eq(Double.toString(1L)), any(Tags.class), aggregations.capture(), eq(null), anyInt(), eq(NAMESPACE), anyInt()); assertEquals(Aggregation.AVG, aggregations.getValue().getAggregations().stream().findFirst().get()); }
@Theory public void reader(String input) throws Exception { iterate("writeReader"); double rate = number / watch.getLastTaskTimeMillis() * 1000; System.err.println("Rate(" + count + ")=" + rate + ", " + watch); watch.start("readReader" + count); this.reader.findAll().forEach(new Consumer<Metric<?>>() { @Override public void accept(Metric<?> metric) { err.println(metric); } }); final LongAdder total = new LongAdder(); this.reader.findAll().forEach(new Consumer<Metric<?>>() { @Override public void accept(Metric<?> value) { total.add(value.getValue().intValue()); } }); watch.stop(); System.err.println("Read(" + count + ")=" + watch.getLastTaskTimeMillis() + "ms"); assertThat(total.longValue()).isEqualTo(number * threadCount); }
@Test public void testMetrics() { rabbitQueueMetrics.addQueue(generateQueue("test.queue")); propertiesManager.request(EasyMock.anyObject(Queue.class)); PowerMock.expectLastCall().andAnswer(() -> { RabbitQueueProperties properties = new RabbitQueueProperties(); properties.setMessageCount(5432); properties.setConsumerCount(2); return properties; }); PowerMock.replayAll(); List<Metric<?>> metrics = new ArrayList<>(rabbitQueueMetrics.metrics()); PowerMock.verifyAll(); Assert.assertEquals(2, metrics.size()); Assert.assertEquals("rabbit.queue.test_queue.currentMessageCount", metrics.get(0).getName()); Assert.assertEquals(5432, metrics.get(0).getValue()); Assert.assertEquals("rabbit.queue.test_queue.currentConsumerCount", metrics.get(1).getName()); Assert.assertEquals(2, metrics.get(1).getValue()); }
@Test public void testSetHystrixThreadPool() { Metric<Double> metric = new Metric<>("hystrix.HystrixThreadPool.foo.bar", 42D); writer.set(metric); List<Event> events = recorder.play(); assertThat(events).hasSize(1); Event event = events.get(0); assertThat(event.getMessage()).isEqualTo(SpectatorLogMetricWriter.SET_MESSAGE); Object[] args = event.getArguments(); assertThat(args).hasSize(5); assertThat(args[0]).isEqualTo("hystrix.HystrixThreadPool"); assertThat(args[1]).isEqualTo("foo"); assertThat(args[2]).isEqualTo("bar"); assertThat(args[3]).isEqualTo("foo.bar"); assertThat(args[4]).isEqualTo(42D); }
@Override public Iterable<Metric<?>> findAll() { return new Iterable<Metric<?>>() { @Override public Iterator<Metric<?>> iterator() { Set<Metric<?>> metrics = new HashSet<Metric<?>>(); for (String name : MetricRegistryMetricReader.this.names.keySet()) { Metric<?> metric = findOne(name); if (metric != null) { metrics.add(metric); } } return metrics.iterator(); } }; }
/** * Add metrics from ManagementFactory if possible. Note that ManagementFactory is not * available on Google App Engine. * @param result the result */ private void addManagementMetrics(Collection<Metric<?>> result) { try { // Add JVM up time in ms result.add(new Metric<Long>("uptime", ManagementFactory.getRuntimeMXBean().getUptime())); result.add(new Metric<Double>("systemload.average", ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage())); addHeapMetrics(result); addNonHeapMetrics(result); addThreadMetrics(result); addClassLoadingMetrics(result); addGarbageCollectionMetrics(result); } catch (NoClassDefFoundError ex) { // Expected on Google App Engine } }
@Override public Iterable<Metric<?>> findAll(String group) { BoundZSetOperations<String, String> zSetOperations = this.redisOperations .boundZSetOps(keyFor(group)); Set<String> keys = zSetOperations.range(0, -1); Iterator<String> keysIt = keys.iterator(); List<Metric<?>> result = new ArrayList<Metric<?>>(keys.size()); List<String> values = this.redisOperations.opsForValue().multiGet(keys); for (String v : values) { String key = keysIt.next(); result.add(deserialize(group, key, v, zSetOperations.score(key))); } return result; }
@Test public void provideAdditionalWriter() { this.context = new AnnotationConfigApplicationContext(WriterConfig.class, MetricRepositoryAutoConfiguration.class, MetricExportAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class); GaugeService gaugeService = this.context.getBean(GaugeService.class); assertThat(gaugeService).isNotNull(); gaugeService.submit("foo", 2.7); MetricExporters exporters = this.context.getBean(MetricExporters.class); MetricCopyExporter exporter = (MetricCopyExporter) exporters.getExporters() .get("writer"); exporter.setIgnoreTimestamps(true); exporter.export(); MetricWriter writer = this.context.getBean("writer", MetricWriter.class); Mockito.verify(writer, Mockito.atLeastOnce()).set(Matchers.any(Metric.class)); }
@JsonCreator public ApplicationMetrics(@JsonProperty("name") String name, @JsonProperty("metrics") Collection<Metric<Double>> metrics) { this.name = name; this.metrics = metrics; this.createdTime = new Date(); }
public Collection<Metric<Double>> getAggregateMetrics() { return getInstances().stream().map(instance -> instance.getMetrics()).flatMap(metrics -> metrics.stream()) .filter(metric -> metric.getName().matches("integration\\.channel\\.(\\w*)\\.send\\.mean")) .collect(Collectors.groupingBy(Metric::getName, Collectors.summingDouble(Metric::getValue))).entrySet() .stream().map(entry -> new Metric<Double>(entry.getKey(), entry.getValue(), new Date())) .collect(Collectors.toList()); }
private List<Metric<Double>> computeRate(List<ApplicationMetrics> applicationMetricsList) { List<Metric<Double>> result = new ArrayList<>(); ApplicationMetrics applicationMetrics = applicationMetricsList.get(0); for (Metric<Double> metric : applicationMetrics.getMetrics()) { Matcher matcher = pattern.matcher(metric.getName()); if (matcher.matches()) { Metric previous = applicationMetricsList.size() < 2 ? null : findMetric(applicationMetricsList.get(1).getMetrics(), metric.getName()); result.add(new Metric<Double>("integration.channel." + matcher.group(1) + ".send.mean", delta(metric, previous))); } } return result; }
private Double delta(Metric<Double> current, Metric<Double> previous) { if (previous == null) { return 0.0; } else { return (current.getValue() - previous.getValue()) / (current.getTimestamp().getTime() - previous.getTimestamp().getTime()) * 1000; } }
private Metric<Double> findMetric(Collection<Metric<Double>> metrics, String name) { Metric<Double> result = null; Optional<Metric<Double>> optinal = metrics.stream().filter(metric -> metric.getName().equals(name)).findFirst(); if (optinal.isPresent()) { result = optinal.get(); } else { result = new Metric<Double>(name, 0.0); } return result; }
@Test public void incrementMetric() throws Exception { Long now = System.currentTimeMillis(); Metric<Double> inputSendCount = new Metric<Double>("integration.channel.input.sendCount",10.0, new Date(now)); ApplicationMetrics app = createMetrics("httpIngest", "http", "foo", 0); app.getMetrics().add(inputSendCount); Cache<String, LinkedList<ApplicationMetrics>> rawCache = Caffeine.newBuilder().build(); ApplicationMetricsService service = new ApplicationMetricsService(rawCache); MetricsAggregator aggregator = new MetricsAggregator(service); MetricsCollectorEndpoint endpoint = new MetricsCollectorEndpoint(service); aggregator.receive(app); Assert.assertEquals(1, rawCache.estimatedSize()); StreamMetrics streamMetrics = endpoint.fetchMetrics("").getBody().iterator().next(); Application application = streamMetrics.getApplications().get(0); Assert.assertNotNull(streamMetrics); Assert.assertEquals("http", application.getName()); Instance instance = application.getInstances().get(0); Assert.assertEquals("foo", instance.getGuid()); Metric<Double> inputSendCount2 = new Metric<Double>("integration.channel.input.sendCount",110.0, new Date(now+5000)); ApplicationMetrics app2 = createMetrics("httpIngest", "http", "foo", 0); app2.getMetrics().add(inputSendCount2); aggregator.receive(app2); Assert.assertEquals(1, rawCache.estimatedSize()); streamMetrics = endpoint.fetchMetrics("").getBody().iterator().next(); application = streamMetrics.getApplications().get(0); Assert.assertNotNull(streamMetrics); Assert.assertEquals("http", application.getName()); instance = application.getInstances().get(0); Assert.assertEquals("foo", instance.getGuid()); Metric<Double> computed = instance.getMetrics().stream().filter(metric -> metric.getName().equals("integration.channel.input.send.mean")).findFirst().get(); Assert.assertEquals(20.0, computed.getValue(),0.0); }
@Test public void aggregateMetricsTest() throws Exception { Cache<String, LinkedList<ApplicationMetrics>> rawCache = Caffeine.newBuilder().build(); ApplicationMetricsService service = new ApplicationMetricsService(rawCache); MetricsAggregator aggregator = new MetricsAggregator(service); MetricsCollectorEndpoint endpoint = new MetricsCollectorEndpoint(service); Long now = System.currentTimeMillis(); Metric<Double> inputSendCount = new Metric<Double>("integration.channel.input.sendCount",0.0, new Date(now)); ApplicationMetrics app = createMetrics("httpIngest", "http", "foo", 0); app.getMetrics().add(inputSendCount); ApplicationMetrics app2 = createMetrics("httpIngest", "http", "bar", 1); app2.getMetrics().add(inputSendCount); aggregator.receive(app); aggregator.receive(app2); Metric<Double> inputSendCount2 = new Metric<Double>("integration.channel.input.sendCount",10.0, new Date(now+5000)); ApplicationMetrics app3 = createMetrics("httpIngest", "http", "foo", 0); app3.getMetrics().add(inputSendCount2); ApplicationMetrics app4 = createMetrics("httpIngest", "http", "bar", 1); app4.getMetrics().add(inputSendCount2); aggregator.receive(app3); aggregator.receive(app4); StreamMetrics streamMetrics = endpoint.fetchMetrics("").getBody().iterator().next(); Metric<Double> aggregate = streamMetrics.getApplications().get(0).getAggregateMetrics().iterator().next(); Assert.assertEquals(4.0,aggregate.getValue(),0.0); }
private ApplicationMetrics createMetrics(String streamName, String applicationName, String appGuid, Integer index, List<Metric<Double>> metrics) { ApplicationMetrics applicationMetrics = new ApplicationMetrics( streamName + "." + applicationName + "." + appGuid, new LinkedList<>()); Map<String, Object> properties = new HashMap<>(); properties.put(ApplicationMetrics.STREAM_NAME, streamName); properties.put(ApplicationMetrics.APPLICATION_NAME, applicationName); properties.put(ApplicationMetrics.APPLICATION_GUID, appGuid); properties.put(ApplicationMetrics.INSTANCE_INDEX, index.toString()); applicationMetrics.setProperties(properties); applicationMetrics.setMetrics(metrics); return applicationMetrics; }
@Test public void sprechenSieDeutsch() throws Exception { Locale.setDefault(Locale.GERMANY); ObjectMapper mapper = new ObjectMapper(); SimpleModule module = new SimpleModule(); module.addDeserializer(Metric.class,new MetricJsonSerializer.Deserializer()); module.addSerializer(Metric.class,new MetricJsonSerializer.Serializer()); mapper.registerModule(module); Metric sample = new Metric("sample",3.14159, new Date()); String json = mapper.writeValueAsString(sample); Metric m = mapper.readValue(json,Metric.class); Assert.assertEquals(3.14, m.getValue().doubleValue(),0.0); }
@Override public void set(Metric<?> metric) { String metricContent = metric.getName(); String[] metricSplit = metricContent.split("\\."); String hystrixType=""; String serviceName=""; String methodName=""; String metricName=metricContent; // format different types of hystrix metrics if (metricSplit[2].equals("RibbonCommand")){ hystrixType = "hystrix.HystrixCommand.RibbonCommand"; serviceName = metricSplit[3]; // remove prefix metricName = metricContent.substring(37); } else { if (metricSplit[1].equals("HystrixCommand")){ hystrixType = "hystrix.HystrixCommand"; serviceName = metricSplit[2]; methodName= metricSplit[3]; metricName= metricContent.substring(23); } if (metricSplit[1].equals("HystrixThreadPool")){ hystrixType = "hystrix.HystrixThreadPool"; serviceName = metricSplit[2]; methodName= metricSplit[3]; metricName= metricContent.substring(26); } } log.info("type=GAUGE, hystrix_type={}, service={}, method={}, name={}, value={}", hystrixType, serviceName, methodName, metricName, metric.getValue()); }
@Test public void exportExcludesDefaultIncludes() { this.exporter.setExcludes("foo"); this.reader.set(new Metric<Number>("foo", 2.3)); this.reader.set(new Metric<Number>("bar", 2.4)); this.exporter.export(); assertThat(this.writer.count()).isEqualTo(1); }
@Test public void setAndGetMultiple() { this.repository.set("foo", Arrays.<Metric<?>>asList(new Metric<Number>("foo.val", 12.3), new Metric<Number>("foo.bar", 11.3))); assertThat(Iterables.collection(this.repository.findAll("foo"))).hasSize(2); }
@Test public void shouldIngestMetricOnSet() { // When Metric metric = new Metric<>("bar", 2L); subject.set(metric); // Then verify(statfulClientProxy).ingestMetric(metric); }
@Test public void onlyPrefixedMetricsCopied() { this.reader.set(new Metric<Number>("foo.bar", 2.3)); this.reader.set(new Metric<Number>("foo.spam", 1.3)); this.reader.set(new Metric<Number>("foobar.spam", 1.3)); this.exporter.setGroups(Collections.singleton("foo")); this.exporter.export(); assertThat(Iterables.collection(this.writer.groups())).hasSize(1); }
@Test public void multiMetricGroupsCopiedAsDefault() { this.reader.set("foo", Arrays.<Metric<?>>asList(new Metric<Number>("bar", 2.3), new Metric<Number>("spam", 1.3))); this.exporter.export(); assertThat(this.writer.countGroups()).isEqualTo(1); assertThat(Iterables.collection(this.writer.findAll("foo"))).hasSize(2); }
@Bean public MetricsEndpoint endpoint() { final Metric<Float> metric = new Metric<Float>("a", 0.5f); PublicMetrics metrics = new PublicMetrics() { @Override public Collection<Metric<?>> metrics() { return Collections.<Metric<?>>singleton(metric); } }; return new MetricsEndpoint(metrics); }
@Test public void shouldNotIngestUnknownMetricProcessor() { // Given when(statfulMetricProcessor.validate(any())).thenReturn(false); // When subject.ingestMetric(new Metric<Number>("unknown", 1L)); // Then verifyNoMoreInteractions(statfulClient); }
@Test public void unprefixedMetricsNotCopied() { this.reader.set(new Metric<Number>("foo.bar", 2.3)); this.reader.set(new Metric<Number>("foo.spam", 1.3)); this.exporter.setGroups(Collections.singleton("bar")); this.exporter.export(); assertThat(Iterables.collection(this.writer.groups())).isEmpty(); }
/** * Add JVM heap metrics. * @param result the result */ protected void addHeapMetrics(Collection<Metric<?>> result) { MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean() .getHeapMemoryUsage(); result.add(newMemoryMetric("heap.committed", memoryUsage.getCommitted())); result.add(newMemoryMetric("heap.init", memoryUsage.getInit())); result.add(newMemoryMetric("heap.used", memoryUsage.getUsed())); result.add(newMemoryMetric("heap", memoryUsage.getMax())); }
@Test public void detectsCounters() throws Exception { PublicMetrics publicMetrics = () -> Collections.singleton(new Metric<Number>("counter_mem.free", 1024)); ResponseEntity<String> response = responseForMetrics(publicMetrics); String body = response.getBody(); assertThat(body, equalTo( "#TYPE mem_free counter\n" + "#HELP mem_free mem_free\n" + "mem_free 1024.0\n")); }
@Test public void detectsGauges() throws Exception { PublicMetrics publicMetrics = () -> Collections.singleton(new Metric<Number>("gauge_mem.free", 1024)); ResponseEntity<String> response = responseForMetrics(publicMetrics); String body = response.getBody(); assertThat(body, equalTo( "#TYPE mem_free gauge\n" + "#HELP mem_free mem_free\n" + "mem_free 1024.0\n")); }
private void assertHasMetric(Collection<Metric<?>> metrics, Metric<?> metric) { for (Metric<?> m : metrics) { if (m.getValue().equals(metric.getValue()) && m.getName().equals(metric.getName())) { return; } } fail("Metric " + metric.toString() + " not found in " + metrics.toString()); }
@Test public void prefixWithPeriod() { this.repository.increment(new Delta<Number>("foo.bar", 1)); Set<String> names = new HashSet<String>(); for (Metric<?> metric : this.repository.findAll("foo.")) { names.add(metric.getName()); } assertThat(names).hasSize(1); assertThat(names.contains("foo.bar")).isTrue(); }
private Collection<? extends Metric<?>> getStatistics(String name, Statistics statistic) { List<Metric<?>> metrics = new ArrayList<Metric<?>>(); metrics.add(new Metric<Double>(name + ".mean", statistic.getMean())); metrics.add(new Metric<Double>(name + ".max", statistic.getMax())); metrics.add(new Metric<Double>(name + ".min", statistic.getMin())); metrics.add( new Metric<Double>(name + ".stdev", statistic.getStandardDeviation())); metrics.add(new Metric<Long>(name + ".count", statistic.getCountLong())); return metrics; }