add benchmark for Storm Trident API #5

Open · wants to merge 3 commits into master
Changes from 2 commits
README.markdown (3 changes: 2 additions & 1 deletion)
@@ -7,7 +7,7 @@
Code licensed under the Apache 2.0 license. See LICENSE file for terms.

### Background
-At Yahoo we have adopted [Apache Storm](https://storm.apache.org) as our stream processing platform of choice. But that was in 2012 and the landscape has changed significantly since then. Because of this we really want to know what Storm is good at, where it needs to be improved compared to other systems, and what its limitations are compared to other tools so we can recommend the best tool for the job to our customers. To do this we started to look for stream processing benchmarks that we could use to do this evaluation, but all of them ended up lacking in several fundamental areas. Primarily they did not test anything close to a read world use case, so we decided to write a simple one. This is the first round of these tests. The tool here is not polished and only covers three tools and one specific use case. We hope to expand this in the future in terms of the tools tested, the variety of processing tested, and the metrics gathered.
+At Yahoo we have adopted [Apache Storm](https://storm.apache.org) as our stream processing platform of choice. But that was in 2012 and the landscape has changed significantly since then. Because of this we really want to know what Storm is good at, where it needs to be improved compared to other systems, and what its limitations are compared to other tools so we can recommend the best tool for the job to our customers. To do this we started to look for stream processing benchmarks that we could use to do this evaluation, but all of them ended up lacking in several fundamental areas. Primarily they did not test anything close to a real world use case, so we decided to write a simple one. This is the first round of these tests. The tool here is not polished and only covers three tools and one specific use case. We hope to expand this in the future in terms of the tools tested, the variety of processing tested, and the metrics gathered.

### Setup
We provide a script, stream-bench.sh, to set up and run the tests on a single node, and to act as an example of what to do when running the tests on a multi-node system; a sample invocation follows the operations list below.
@@ -17,6 +17,7 @@ It takes a list of operations to perform, and options are passed into the script
#### Operations
* SETUP - downloads dependencies (Storm, Spark, Flink, Redis, and Kafka), cleans out any temp files, and compiles everything
* STORM_TEST - Run the test using Storm on a single node
+* TRIDENT_TEST - Run the test using Storm Trident on a single node
* SPARK_TEST - Run the test using Spark on a single node
* FLINK_TEST - Run the test using Flink on a single node
* STOP_ALL - If something goes wrong, stop all processes that were launched for the test.
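
For context, a minimal single-node run of the new Trident test might look like the sketch below (assuming the operations are passed as positional arguments, as described above; TEST_TIME is the environment variable the script sleeps on while the load runs, and 240 seconds is only an illustrative value):

    TEST_TIME=240 ./stream-bench.sh SETUP TRIDENT_TEST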
storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTopology.java
@@ -8,7 +8,6 @@
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -17,25 +16,24 @@
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-//import backtype.storm.utils.Utils;
import benchmark.common.Utils;
import benchmark.common.advertising.CampaignProcessorCommon;
import benchmark.common.advertising.RedisAdCampaignCache;
-import java.util.Map;
-import java.util.UUID;
-import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.log4j.Logger;
import org.json.JSONObject;
import redis.clients.jedis.Jedis;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

+import java.util.List;
+import java.util.Map;
+import java.util.UUID;


/**
* This is a basic example of a Storm topology.
*/
@@ -149,8 +147,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

public static class CampaignProcessor extends BaseRichBolt {

-private static final Logger LOG = Logger.getLogger(CampaignProcessor.class);

private OutputCollector _collector;
transient private CampaignProcessorCommon campaignProcessorCommon;
private String redisServerHost;
@@ -180,20 +176,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}

-private static String joinHosts(List<String> hosts, String port) {
-String joined = null;
-for(String s : hosts) {
-if(joined == null) {
-joined = "";
-}
-else {
-joined += ",";
-}
-
-joined += s + ":" + port;
-}
-return joined;
-}

public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
@@ -206,7 +189,7 @@ public static void main(String[] args) throws Exception {
String configPath = cmd.getOptionValue("conf");
Map commonConfig = Utils.findAndReadConfigFile(configPath, true);

-String zkServerHosts = joinHosts((List<String>)commonConfig.get("zookeeper.servers"),
+String zkServerHosts = StormUtils.joinHosts((List<String>)commonConfig.get("zookeeper.servers"),
Integer.toString((Integer)commonConfig.get("zookeeper.port")));
String redisServerHost = (String)commonConfig.get("redis.host");
String kafkaTopic = (String)commonConfig.get("kafka.topic");
storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTridentTopology.java (new file, 179 additions)
@@ -0,0 +1,179 @@
package storm.benchmark;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.FailedException;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import benchmark.common.Utils;
import benchmark.common.advertising.CampaignProcessorCommon;
import benchmark.common.advertising.RedisAdCampaignCache;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

import java.util.List;
import java.util.Map;
import java.util.UUID;

public class AdvertisingTridentTopology {
private static final Logger LOG = LoggerFactory.getLogger(AdvertisingTridentTopology.class);


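// Parses the JSON event string from the Kafka spout (field "str") into its individual fields.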
static class DeserializeFunction extends BaseFunction {

static Fields outputFields = new Fields("user_id", "page_id", "ad_id", "ad_type", "event_type", "event_time", "ip_address");

public void execute(TridentTuple tuple, TridentCollector collector) {
JSONObject obj = new JSONObject(tuple.getString(0));
collector.emit(new Values(obj.getString("user_id"),
obj.getString("page_id"),
obj.getString("ad_id"),
obj.getString("ad_type"),
obj.getString("event_type"),
obj.getString("event_time"),
obj.getString("ip_address")));
}
}

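// Keeps only tuples whose event_type is "view"; everything else is dropped.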
static class EventFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getStringByField("event_type").equals("view");
}
}

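// Looks up the campaign_id for each ad_id via the Redis-backed cache; throwing
// FailedException fails the Trident batch so it will be replayed.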
static class RedisJoinFunction extends BaseFunction {
transient RedisAdCampaignCache redisAdCampaignCache;
private String redisServerHost;

static Fields outputFields = new Fields("campaign_id");

RedisJoinFunction(String redisServerHost) {
this.redisServerHost = redisServerHost;
}

@Override
public void prepare(Map conf, TridentOperationContext context) {
redisAdCampaignCache = new RedisAdCampaignCache(redisServerHost);
this.redisAdCampaignCache.prepare();
}

public void execute(TridentTuple tuple, TridentCollector collector) {
String ad_id = tuple.getStringByField("ad_id");
String campaign_id = this.redisAdCampaignCache.execute(ad_id);
if (campaign_id == null) {
throw new FailedException();
}
collector.emit(new Values(campaign_id));
}
}


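// Feeds (campaign_id, event_time) pairs to CampaignProcessorCommon, which
// maintains the per-campaign, per-event-time-window counts (backed by Redis).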
static class CampaignProcessorFunction extends BaseFunction {
transient private CampaignProcessorCommon campaignProcessorCommon;
private String redisServerHost;

CampaignProcessorFunction(String redisServerHost) {
this.redisServerHost = redisServerHost;
}

@Override
public void prepare(Map conf, TridentOperationContext context) {
campaignProcessorCommon = new CampaignProcessorCommon(redisServerHost);
this.campaignProcessorCommon.prepare();
}

public void execute(TridentTuple tuple, TridentCollector collector) {
String campaign_id = tuple.getStringByField("campaign_id");
String event_time = tuple.getStringByField("event_time");
this.campaignProcessorCommon.execute(campaign_id, event_time);
}
}

public static void main(String[] args) throws Exception {

Options opts = new Options();
opts.addOption("conf", true, "Path to the config file.");

CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(opts, args);
String configPath = cmd.getOptionValue("conf");
Map commonConfig = Utils.findAndReadConfigFile(configPath, true);

String zkServerHosts = StormUtils.joinHosts((List<String>) commonConfig.get("zookeeper.servers"),
Integer.toString((Integer) commonConfig.get("zookeeper.port")));
String redisServerHost = (String) commonConfig.get("redis.host");
String kafkaTopic = (String) commonConfig.get("kafka.topic");
int kafkaPartitions = ((Number) commonConfig.get("kafka.partitions")).intValue();
int workers = ((Number) commonConfig.get("storm.workers")).intValue();
int ackers = ((Number) commonConfig.get("storm.ackers")).intValue();
int cores = ((Number) commonConfig.get("process.cores")).intValue();
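// Parallelism heuristic: at least one task per stage; the divisor 7 presumably
// reflects the number of pipeline stages sharing the available cores.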
int parallel = Math.max(1, cores / 7);

ZkHosts hosts = new ZkHosts(zkServerHosts);

TridentKafkaConfig spoutConfig = new TridentKafkaConfig(hosts, kafkaTopic, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConfig);

TridentTopology topology = new TridentTopology();

Stream stream = topology.newStream("benchmark", kafkaSpout);

Fields projectFields = new Fields("ad_id", "event_time");

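// Pipeline sketch: Kafka spout -> shuffle -> deserialize JSON -> keep only
// "view" events -> project to (ad_id, event_time) -> look up campaign_id in
// Redis -> partition by campaign_id -> update per-campaign window counts.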
stream
.parallelismHint(kafkaPartitions)
.shuffle()
// deserialize
.each(new Fields("str"), new DeserializeFunction(), DeserializeFunction.outputFields)
// event filter
.each(DeserializeFunction.outputFields, new EventFilter())
// project
.project(projectFields)
// redis join
.each(projectFields, new RedisJoinFunction(redisServerHost), RedisJoinFunction.outputFields)
.parallelismHint(parallel * 4)
.partitionBy(new Fields("campaign_id"))
// campaign processor
.each(new Fields("ad_id", "event_time", "campaign_id"), new CampaignProcessorFunction(redisServerHost), new Fields())
.parallelismHint(parallel * 2);


Config conf = new Config();

if (args != null && args.length > 0) {
conf.setNumWorkers(workers);
conf.setNumAckers(ackers);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, topology.build());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, topology.build());
backtype.storm.utils.Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
storm-benchmarks/src/main/java/storm/benchmark/StormUtils.java (22 changes: 22 additions & 0 deletions)
@@ -0,0 +1,22 @@
package storm.benchmark;

import java.util.List;

public class StormUtils {
private StormUtils(){}

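/**
 * Joins host names and a port into a comma-separated connect string,
 * e.g. hosts ["zk1", "zk2"] with port "2181" yields "zk1:2181,zk2:2181".
 * Returns null for an empty host list.
 */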
static String joinHosts(List<String> hosts, String port) {
String joined = null;
for(String s : hosts) {
if(joined == null) {
joined = "";
}
else {
joined += ",";
}

joined += s + ":" + port;
}
return joined;
}
}
stream-bench.sh (20 changes: 20 additions & 0 deletions)
@@ -218,6 +218,10 @@ run() {
then
"$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology test-topo -conf $CONF_FILE
sleep 15
elif [ "START_TRIDENT_TOPOLOGY" = "$OPERATION" ];
then
"$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTridentTopology test-topo -conf $CONF_FILE
sleep 15
elif [ "STOP_STORM_TOPOLOGY" = "$OPERATION" ];
then
"$STORM_DIR/bin/storm" kill -w 0 test-topo || true
@@ -258,6 +262,21 @@ run() {
run "STOP_KAFKA"
run "STOP_REDIS"
run "STOP_ZK"
elif [ "TRIDENT_TEST" = "$OPERATION" ];
Collaborator: Can you add this to the help echoes below?

Author: Done.

+then
+run "START_ZK"
+run "START_REDIS"
+run "START_KAFKA"
+run "START_STORM"
+run "START_TRIDENT_TOPOLOGY"
+run "START_LOAD"
+sleep $TEST_TIME
+run "STOP_LOAD"
+run "STOP_STORM_TOPOLOGY"
+run "STOP_STORM"
+run "STOP_KAFKA"
+run "STOP_REDIS"
+run "STOP_ZK"
elif [ "FLINK_TEST" = "$OPERATION" ];
then
run "START_ZK"
@@ -324,6 +343,7 @@ run() {
echo "STOP_SPARK: kill spark processes"
echo
echo "START_STORM_TOPOLOGY: run the storm test topology"
echo "START_TRIDENT_TOPOLOGY: run the storm test topology"
echo "STOP_STORM_TOPOLOGY: kill the storm test topology"
echo "START_FLINK_PROCESSING: run the flink test processing"
echo "STOP_FLINK_PROCESSSING: kill the flink test processing"