diff --git a/README.markdown b/README.markdown index dbebb8d8b..0ea47236f 100644 --- a/README.markdown +++ b/README.markdown @@ -7,7 +7,7 @@ Code licensed under the Apache 2.0 license. See LICENSE file for terms. ### Background -At Yahoo we have adopted [Apache Storm](https://storm.apache.org) as our stream processing platform of choice. But that was in 2012 and the landscape has changed significantly since then. Because of this we really want to know what Storm is good at, where it needs to be improved compared to other systems, and what its limitations are compared to other tools so we can recommend the best tool for the job to our customers. To do this we started to look for stream processing benchmarks that we could use to do this evaluation, but all of them ended up lacking in several fundamental areas. Primarily they did not test anything close to a read world use case, so we decided to write a simple one. This is the first round of these tests. The tool here is not polished and only covers three tools and one specific use case. We hope to expand this in the future in terms of the tools tested, the variety of processing tested, and the metrics gathered. +At Yahoo we have adopted [Apache Storm](https://storm.apache.org) as our stream processing platform of choice. But that was in 2012 and the landscape has changed significantly since then. Because of this we really want to know what Storm is good at, where it needs to be improved compared to other systems, and what its limitations are compared to other tools so we can recommend the best tool for the job to our customers. To do this we started to look for stream processing benchmarks that we could use to do this evaluation, but all of them ended up lacking in several fundamental areas. Primarily they did not test anything close to a real world use case, so we decided to write a simple one. This is the first round of these tests. The tool here is not polished and only covers three tools and one specific use case. We hope to expand this in the future in terms of the tools tested, the variety of processing tested, and the metrics gathered. ### Setup We provide a script stream-bench.sh to setup and run the tests on a single node, and to act as an example of what to do when running the tests on a multi-node system. @@ -17,6 +17,7 @@ It takes a list of operations to perform, and options are passed into the script #### Operations * SETUP - download dependencies (Storm, Spark, Flink, Redis, and Kafka) cleans out any temp files and compiles everything * STORM_TEST - Run the test using Storm on a single node + * TRIDENT_TEST - Run the test using Storm Trident on a single node * SPARK_TEST - Run the test using Spark on a single node * FLINK_TEST - Run the test using Flink on a single node * STOP_ALL - If something goes wrong stop all processes that were launched for the test. diff --git a/storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTopology.java b/storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTopology.java index 60e9a93b0..33b85f3db 100644 --- a/storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTopology.java +++ b/storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTopology.java @@ -8,7 +8,6 @@ import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; @@ -17,25 +16,24 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; -//import backtype.storm.utils.Utils; import benchmark.common.Utils; import benchmark.common.advertising.CampaignProcessorCommon; import benchmark.common.advertising.RedisAdCampaignCache; -import java.util.Map; -import java.util.UUID; -import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; -import org.apache.log4j.Logger; import org.json.JSONObject; -import redis.clients.jedis.Jedis; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; +import java.util.List; +import java.util.Map; +import java.util.UUID; + + /** * This is a basic example of a Storm topology. */ @@ -149,8 +147,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { public static class CampaignProcessor extends BaseRichBolt { - private static final Logger LOG = Logger.getLogger(CampaignProcessor.class); - private OutputCollector _collector; transient private CampaignProcessorCommon campaignProcessorCommon; private String redisServerHost; @@ -180,20 +176,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { } } - private static String joinHosts(List hosts, String port) { - String joined = null; - for(String s : hosts) { - if(joined == null) { - joined = ""; - } - else { - joined += ","; - } - joined += s + ":" + port; - } - return joined; - } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); @@ -206,7 +189,7 @@ public static void main(String[] args) throws Exception { String configPath = cmd.getOptionValue("conf"); Map commonConfig = Utils.findAndReadConfigFile(configPath, true); - String zkServerHosts = joinHosts((List)commonConfig.get("zookeeper.servers"), + String zkServerHosts = StormUtils.joinHosts((List)commonConfig.get("zookeeper.servers"), Integer.toString((Integer)commonConfig.get("zookeeper.port"))); String redisServerHost = (String)commonConfig.get("redis.host"); String kafkaTopic = (String)commonConfig.get("kafka.topic"); diff --git a/storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTridentTopology.java b/storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTridentTopology.java new file mode 100644 index 000000000..4c3f1c476 --- /dev/null +++ b/storm-benchmarks/src/main/java/storm/benchmark/AdvertisingTridentTopology.java @@ -0,0 +1,179 @@ +package storm.benchmark; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.spout.SchemeAsMultiScheme; +import backtype.storm.topology.FailedException; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import benchmark.common.Utils; +import benchmark.common.advertising.CampaignProcessorCommon; +import benchmark.common.advertising.RedisAdCampaignCache; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.kafka.StringScheme; +import storm.kafka.ZkHosts; +import storm.kafka.trident.TransactionalTridentKafkaSpout; +import storm.kafka.trident.TridentKafkaConfig; +import storm.trident.Stream; +import storm.trident.TridentTopology; +import storm.trident.operation.BaseFilter; +import storm.trident.operation.BaseFunction; +import storm.trident.operation.TridentCollector; +import storm.trident.operation.TridentOperationContext; +import storm.trident.tuple.TridentTuple; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class AdvertisingTridentTopology { + private static final Logger LOG = LoggerFactory.getLogger(AdvertisingTridentTopology.class); + + + static class DeserializeFunction extends BaseFunction { + + static Fields outputFields = new Fields("user_id", "page_id", "ad_id", "ad_type", "event_type", "event_time", "ip_address"); + + public void execute(TridentTuple tuple, TridentCollector collector) { + JSONObject obj = new JSONObject(tuple.getString(0)); + collector.emit(new Values(obj.getString("user_id"), + obj.getString("page_id"), + obj.getString("ad_id"), + obj.getString("ad_type"), + obj.getString("event_type"), + obj.getString("event_time"), + obj.getString("ip_address"))); + } + } + + static class EventFilter extends BaseFilter { + public boolean isKeep(TridentTuple tuple) { + if (tuple.getStringByField("event_type").equals("view")) { + return true; + } else { + return false; + } + } + } + + static class RedisJoinFunction extends BaseFunction { + transient RedisAdCampaignCache redisAdCampaignCache; + private String redisServerHost; + + static Fields outputFields = new Fields("campaign_id"); + + RedisJoinFunction(String redisServerHost) { + this.redisServerHost = redisServerHost; + } + + @Override + public void prepare(Map conf, TridentOperationContext context) { + redisAdCampaignCache = new RedisAdCampaignCache(redisServerHost); + this.redisAdCampaignCache.prepare(); + } + + public void execute(TridentTuple tuple, TridentCollector collector) { + String ad_id = tuple.getStringByField("ad_id"); + String campaign_id = this.redisAdCampaignCache.execute(ad_id); + if (campaign_id == null) { + throw new FailedException(); + } + collector.emit(new Values(campaign_id)); + } + } + + + static class CampaignProcessorFunction extends BaseFunction { + transient private CampaignProcessorCommon campaignProcessorCommon; + private String redisServerHost; + + CampaignProcessorFunction(String redisServerHost) { + this.redisServerHost = redisServerHost; + } + + @Override + public void prepare(Map conf, TridentOperationContext context) { + campaignProcessorCommon = new CampaignProcessorCommon(redisServerHost); + this.campaignProcessorCommon.prepare(); + } + + public void execute(TridentTuple tuple, TridentCollector collector) { + String campaign_id = tuple.getStringByField("campaign_id"); + String event_time = tuple.getStringByField("event_time"); + this.campaignProcessorCommon.execute(campaign_id, event_time); + } + } + + public static void main(String[] args) throws Exception { + + Options opts = new Options(); + opts.addOption("conf", true, "Path to the config file."); + + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(opts, args); + String configPath = cmd.getOptionValue("conf"); + Map commonConfig = Utils.findAndReadConfigFile(configPath, true); + + String zkServerHosts = StormUtils.joinHosts((List) commonConfig.get("zookeeper.servers"), + Integer.toString((Integer) commonConfig.get("zookeeper.port"))); + String redisServerHost = (String) commonConfig.get("redis.host"); + String kafkaTopic = (String) commonConfig.get("kafka.topic"); + int kafkaPartitions = ((Number) commonConfig.get("kafka.partitions")).intValue(); + int workers = ((Number) commonConfig.get("storm.workers")).intValue(); + int ackers = ((Number) commonConfig.get("storm.ackers")).intValue(); + int cores = ((Number) commonConfig.get("process.cores")).intValue(); + int parallel = Math.max(1, cores / 7); + + ZkHosts hosts = new ZkHosts(zkServerHosts); + + TridentKafkaConfig spoutConfig = new TridentKafkaConfig(hosts, kafkaTopic, UUID.randomUUID().toString()); + spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); + + TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConfig); + + TridentTopology topology = new TridentTopology(); + + Stream stream = topology.newStream("benchmark", kafkaSpout); + + Fields projectFields = new Fields("ad_id", "event_time"); + + stream + .parallelismHint(kafkaPartitions) + .shuffle() + // deserialize + .each(new Fields("str"), new DeserializeFunction(), DeserializeFunction.outputFields) + // event filter + .each(DeserializeFunction.outputFields, new EventFilter()) + // project + .project(projectFields) + // redis join + .each(projectFields, new RedisJoinFunction(redisServerHost), RedisJoinFunction.outputFields) + .parallelismHint(parallel * 4) + .partitionBy(new Fields("campaign_id")) + // campaign processor + .each(new Fields("ad_id", "event_time", "campaign_id"), new CampaignProcessorFunction(redisServerHost), new Fields()) + .parallelismHint(parallel * 2); + + + Config conf = new Config(); + + if (args != null && args.length > 0) { + conf.setNumWorkers(workers); + conf.setNumAckers(ackers); + StormSubmitter.submitTopologyWithProgressBar(args[0], conf, topology.build()); + } else { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", conf, topology.build()); + backtype.storm.utils.Utils.sleep(10000); + cluster.killTopology("test"); + cluster.shutdown(); + } + } +} diff --git a/storm-benchmarks/src/main/java/storm/benchmark/StormUtils.java b/storm-benchmarks/src/main/java/storm/benchmark/StormUtils.java new file mode 100644 index 000000000..147282e22 --- /dev/null +++ b/storm-benchmarks/src/main/java/storm/benchmark/StormUtils.java @@ -0,0 +1,22 @@ +package storm.benchmark; + +import java.util.List; + +public class StormUtils { + private StormUtils(){} + + static String joinHosts(List hosts, String port) { + String joined = null; + for(String s : hosts) { + if(joined == null) { + joined = ""; + } + else { + joined += ","; + } + + joined += s + ":" + port; + } + return joined; + } +} diff --git a/stream-bench.sh b/stream-bench.sh index d2cb3d57c..abac12f49 100755 --- a/stream-bench.sh +++ b/stream-bench.sh @@ -218,6 +218,10 @@ run() { then "$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology test-topo -conf $CONF_FILE sleep 15 + elif [ "START_TRIDENT_TOPOLOGY" = "$OPERATION" ]; + then + "$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTridentTopology test-topo -conf $CONF_FILE + sleep 15 elif [ "STOP_STORM_TOPOLOGY" = "$OPERATION" ]; then "$STORM_DIR/bin/storm" kill -w 0 test-topo || true @@ -258,6 +262,21 @@ run() { run "STOP_KAFKA" run "STOP_REDIS" run "STOP_ZK" + elif [ "TRIDENT_TEST" = "$OPERATION" ]; + then + run "START_ZK" + run "START_REDIS" + run "START_KAFKA" + run "START_STORM" + run "START_TRIDENT_TOPOLOGY" + run "START_LOAD" + sleep $TEST_TIME + run "STOP_LOAD" + run "STOP_STORM_TOPOLOGY" + run "STOP_STORM" + run "STOP_KAFKA" + run "STOP_REDIS" + run "STOP_ZK" elif [ "FLINK_TEST" = "$OPERATION" ]; then run "START_ZK" @@ -324,6 +343,7 @@ run() { echo "STOP_SPARK: kill spark processes" echo echo "START_STORM_TOPOLOGY: run the storm test topology" + echo "START_TRIDENT_TOPOLOGY: run the storm test topology" echo "STOP_STORM_TOPOLOGY: kill the storm test topology" echo "START_FLINK_PROCESSING: run the flink test processing" echo "STOP_FLINK_PROCESSSING: kill the flink test processing" @@ -331,6 +351,7 @@ run() { echo "STOP_SPARK_PROCESSSING: kill the spark test processing" echo echo "STORM_TEST: run storm test (assumes SETUP is done)" + echo "TRIDENT_TEST: run storm trident test (assumes SETUP is done)" echo "FLINK_TEST: run flink test (assumes SETUP is done)" echo "SPARK_TEST: run spark test (assumes SETUP is done)" echo "STOP_ALL: stop everything"