Skip to content

Commit

Permalink
initial commit of storm metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
RBeaudoin committed May 19, 2016
0 parents commit 4fcca97
Show file tree
Hide file tree
Showing 8 changed files with 674 additions and 0 deletions.
18 changes: 18 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Taken from http://gary-rowe.com/agilestack/2012/10/12/a-gitignore-file-for-intellij-and-eclipse-with-maven/

# Eclipse
.classpath
.project
.settings/

# Intellij
.idea/
*.iml
*.iws

# Mac
.DS_Store

# Maven
log/
target/
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Storm Metrics

A collection of metrics and metric consumers for use with the [Storm Metrics API](http://storm.apache.org/releases/1.0.1/Metrics.html)

## Building

Storm Metrics is a Maven project and can be built and tested by running:

`mvn test`

## License

MIT
86 changes: 86 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>org.rbeaudoin</groupId>
<artifactId>storm_metrics</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>Storm Metrics</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<log4j.version>1.2.16</log4j.version>
<storm.version>1.0.0</storm.version>
</properties>

<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>

<!-- Storm Dependencies -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Kafka Producer Dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<inherited>true</inherited>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
100 changes: 100 additions & 0 deletions src/main/java/org/rbeaudoin/storm/metrics/KafkaMetricsConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.rbeaudoin.storm.metrics;

import com.google.gson.Gson;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
* Implementation of IMetricsConsumer used to consume Storm toplology metrics
*/
public class KafkaMetricsConsumer implements IMetricsConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaMetricsConsumer.class);

    // Keys looked up in the registration argument passed to prepare().
    final String brokerKey = "metrics.consumer.kafka.brokers";
    final String topicKey = "metrics.consumer.kafka.topic";
    final String metricPrefixKey = "metrics.consumer.metric.prefix";

    // Gson is thread-safe and stateless as used here; build it once instead of
    // allocating a new instance on every handleDataPoints() invocation.
    private final Gson gson = new Gson();

    String metricPrefix;
    String kafkaOutputTopic;
    Producer<String, String> kafkaProducer;

    /**
     * Validates the registration argument and creates the Kafka producer.
     *
     * @param stormConf            topology configuration (unused)
     * @param registrationArgument expected to be a Map&lt;String, String&gt; containing
     *                             broker, topic, and metric-prefix entries
     * @param context              topology context (unused)
     * @param errorReporter        error reporter (unused)
     * @throws IllegalArgumentException if any required configuration key is missing
     */
    @Override
    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
        Map<String, String> metricsConsumerConfig = (Map<String, String>) registrationArgument;

        // broker, topic, and prefix configuration are all required
        if (!metricsConsumerConfig.containsKey(brokerKey)) {
            throw new IllegalArgumentException("Registration argument for metrics consumer is missing broker configuration");
        } else if (!metricsConsumerConfig.containsKey(topicKey)) {
            throw new IllegalArgumentException("Registration argument for metrics consumer is missing topic configuration");
        } else if (!metricsConsumerConfig.containsKey(metricPrefixKey)) {
            throw new IllegalArgumentException("Registration argument for metrics consumer is missing metrics prefix configuration");
        }

        ProducerConfig kafkaProducerProps = getKafkaProducerProps(metricsConsumerConfig);

        kafkaOutputTopic = metricsConsumerConfig.get(topicKey);
        metricPrefix = metricsConsumerConfig.get(metricPrefixKey);

        log.info("Configuring KafkaMetricsConsumer with topic properties: {}", kafkaOutputTopic);

        kafkaProducer = new Producer<>(kafkaProducerProps);
    }

    /**
     * Serializes each data point whose name starts with the configured prefix to
     * JSON and publishes it to the configured Kafka topic. Data points with other
     * names (built-in Storm metrics) are ignored.
     */
    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint dataPoint : dataPoints) {
            // only process metrics with the specified prefix, ignore other storm metrics
            if (dataPoint.name.startsWith(metricPrefix)) {
                KafkaMetric kafkaMetric = new KafkaMetric();
                kafkaMetric.metricName = dataPoint.name;
                // NOTE(review): assumes every matching metric's value is a
                // Map<String, Integer> (as produced by RateMetric); a metric of
                // another shape registered under this prefix would fail here.
                kafkaMetric.metricValue = (Map<String, Integer>) dataPoint.value;
                kafkaMetric.taskInfo = taskInfo;

                log.info("Processing metric: {} with value: {}", dataPoint.name, dataPoint.value);

                KeyedMessage<String, String> metricProducerRecord = new KeyedMessage<>(kafkaOutputTopic, gson.toJson(kafkaMetric));
                kafkaProducer.send(metricProducerRecord);
            }
        }
    }

    /** Closes the Kafka producer, if one was successfully created in prepare(). */
    @Override
    public void cleanup() {
        // Guard against cleanup() being invoked when prepare() never ran or failed
        // before the producer was constructed.
        if (kafkaProducer != null) {
            kafkaProducer.close();
        }
    }

    /**
     * Builds the Kafka producer configuration from the registration argument.
     *
     * @param metricsConsumerConfig validated consumer configuration map
     * @return producer config pointing at the configured broker list, using
     *         String serialization and acks=1
     */
    ProducerConfig getKafkaProducerProps(Map<String, String> metricsConsumerConfig) {
        // Get metric consumer config from registrationArgument
        String kafkaBrokers = metricsConsumerConfig.get(brokerKey);

        log.info("Configuring KafkaMetricsConsumer with brokers: {}", kafkaBrokers);

        // Setup kafka producer
        Properties props = new Properties();
        props.put("metadata.broker.list", kafkaBrokers);
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        log.info("Configuring KafkaMetricsConsumer with producer properties: {}", props);

        return new ProducerConfig(props);
    }

    /** JSON payload shape published to Kafka for each consumed data point. */
    static class KafkaMetric {
        String metricName;
        Map<String, Integer> metricValue;
        TaskInfo taskInfo;
    }
}
76 changes: 76 additions & 0 deletions src/main/java/org/rbeaudoin/storm/metrics/RateMetric.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.rbeaudoin.storm.metrics;


import org.apache.storm.metric.api.IMetric;

import java.util.HashMap;
import java.util.Map;

/**
* Implementation of IMetrics used to track counts and percentages for topology events
*/
public class RateMetric implements IMetric {
    // Current counts (and, transiently, the computed percentage) keyed by the
    // caller-supplied key names.
    Map<String, Integer> metricValues = new HashMap<>();

    final String totalCountKey;
    final String partialCountKey;
    final String percentKey;

    /**
     * Creates a rate metric that tracks a total count, a partial count, and the
     * partial-of-total percentage under the three given (distinct) key names.
     *
     * @param totalCountKey   key for the total event count
     * @param partialCountKey key for the subset-of-interest count
     * @param percentKey      key for the computed percentage
     * @throws IllegalArgumentException if any key is null/empty or keys are not unique
     */
    public RateMetric(String totalCountKey, String partialCountKey,
                      String percentKey) {

        if (totalCountKey == null || totalCountKey.isEmpty()) {
            throw new IllegalArgumentException("totalCount key must be non-null and non-empty");
        } else if (partialCountKey == null || partialCountKey.isEmpty()) {
            throw new IllegalArgumentException("partialCount key must be non-null and non-empty");
        } else if (percentKey == null || percentKey.isEmpty()) {
            throw new IllegalArgumentException("percentage key must be non-null and non-empty");
        }

        // Keys for each value must be unique
        if (totalCountKey.equals(partialCountKey) || totalCountKey.equals(percentKey)
                || partialCountKey.equals(percentKey)) {
            throw new IllegalArgumentException("Keys must be different for totalCount, partialCount, and percent");
        }

        this.totalCountKey = totalCountKey;
        this.partialCountKey = partialCountKey;
        this.percentKey = percentKey;

        resetMetricValues();
    }

    /** Increments the total event count by one. */
    public void incrTotal() {
        this.metricValues.put(this.totalCountKey, this.metricValues.get(this.totalCountKey) + 1);
    }

    /** Increments the partial (subset-of-interest) count by one. */
    public void incrPartial() {
        this.metricValues.put(this.partialCountKey, this.metricValues.get(this.partialCountKey) + 1);
    }

    /**
     * Computes the percentage, snapshots all three values, resets the counters,
     * and returns the snapshot as a Map&lt;String, Integer&gt;.
     *
     * @return map of totalCountKey/partialCountKey/percentKey to their values
     *         for the interval just ended
     */
    @Override
    public Object getValueAndReset() {
        double totalCount = this.metricValues.get(this.totalCountKey);
        double partialCount = this.metricValues.get(this.partialCountKey);

        // Guard the zero-total interval: the original expression partial/0.0
        // yields Infinity (cast to Integer.MAX_VALUE) when partial > 0, or NaN
        // when partial == 0. Report 0% instead for an interval with no events.
        int rate = totalCount == 0 ? 0 : (int) ((partialCount / totalCount) * 100);
        this.metricValues.put(this.percentKey, rate);

        // Snapshot before resetting so the returned map is not mutated.
        Map<String, Integer> currentMetricValues = new HashMap<>(metricValues);

        resetMetricValues();

        return currentMetricValues;
    }

    /** Zeroes both counters (and drops any stale percentage entry). */
    private void resetMetricValues() {
        this.metricValues.clear();

        metricValues.put(this.totalCountKey, 0);
        metricValues.put(this.partialCountKey, 0);
    }
}
Loading

0 comments on commit 4fcca97

Please sign in to comment.