Home
By
- Melika Sajadian
- Panagiotis Arapakis
- Rob de Groot
This blog post presents our findings from the second lab of the course ET4310 - Supercomputing for Big Data at the Faculty of Electrical Engineering, Mathematics and Computer Science at TU Delft. The goal of the lab was to scale a previously made application to analyze the entire Global Database of Events, Language and Tone 2.0 (GDELT) dataset, summarizing the top ten discussed topics for each day from the 19th of February 2015 up until the current date.
To achieve this goal, we wrote a script in Scala that is executed through Apache Spark, running on several Amazon AWS clusters of 9 to 20 nodes at a time. The main goal is to create a functioning application that processes the entire dataset in under 30 minutes. Once this is achieved, the next goal is to make the application as cheap as possible while it still processes everything in under 30 minutes.
For those interested: All benchmark graphs and the Scala script that we used can be found in the Appendix at the bottom of this page. We'd first like to go through the resources used, our initial results and our further optimizations based on improvements that we found out about on-the-go.
- Amazon S3, Amazon's object-based storage system, which is compatible with Amazon's other services.
- Amazon EC2, Elastic Compute Cloud, which gives access to Amazon's different machines.
- Amazon EMR, Elastic MapReduce, a layer that allows deployment of MapReduce applications like Spark.
- FoxyProxy, a Google Chrome extension for proxy management.
- PuTTY, an SSH client that we used to connect to Ganglia services.
Number of nodes | Cost per node per hour | Time (min) | Total cost |
---|---|---|---|
9 | € 0.60 | 22.8 | € 2.10 |
12 | € 0.60 | 19.5 | € 2.35 |
15 | € 0.60 | 15.5 | € 2.32 |
16 | € 0.60 | 14.8 | € 2.37 |
18 | € 0.60 | 14.9 | € 2.69 |
19 | € 0.60 | 14.7 | € 2.79 |
20 | € 0.60 | 14.5 | € 2.90 |
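The total cost column appears to follow from nodes × hourly rate × runtime. The sketch below recomputes it under that assumption; the object and function names are ours and purely illustrative. It reproduces most rows to within about a cent, while the 9-node row comes out at roughly 2.05 instead of the reported 2.10, so the reported figures may include rounding or billing details that this simple formula does not capture.

```scala
object ClusterCost {
  // Price per node per hour, taken from the table above.
  val ratePerNodeHour = 0.60

  // Assumed formula: nodes x hourly rate x runtime expressed in hours.
  def totalCost(nodes: Int, minutes: Double): Double =
    nodes * ratePerNodeHour * minutes / 60.0

  def main(args: Array[String]): Unit = {
    // (nodes, runtime in minutes) pairs copied from the table above.
    val runs = Seq((9, 22.8), (12, 19.5), (15, 15.5), (16, 14.8), (18, 14.9), (19, 14.7), (20, 14.5))
    for ((nodes, minutes) <- runs)
      println(f"$nodes%2d nodes: ${totalCost(nodes, minutes)}%.2f")
  }
}
```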
Time vs Number of Nodes | Cost vs Number of Nodes |
---|---|
Our benchmark led us to believe that 9 nodes is the optimal solution for the data analysis. Looking at the plot on the left, you might ask whether 8 nodes would still manage to finish in under 30 minutes, and we thought the same thing. However, when using only 8 nodes, the analysis did not complete because there were cases of memory overload on certain cores.
This benchmark also gives insight into the effects of scaling the number of nodes. Adding nodes has a big effect when the cluster is small, but the gain shrinks once the cluster grows past a certain size: beyond roughly 16 nodes the runtime barely changes.
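To make the diminishing returns concrete, the following sketch computes how many minutes each additional node saves between consecutive rows of the benchmark table. The object and value names are illustrative, and the figures are only as precise as the single measurements in the table.

```scala
object ScalingGain {
  // (nodes, runtime in minutes) pairs copied from the benchmark table.
  val runs = Seq((9, 22.8), (12, 19.5), (15, 15.5), (16, 14.8), (18, 14.9), (19, 14.7), (20, 14.5))

  // Minutes saved per extra node between consecutive benchmark runs.
  val minutesSavedPerNode: Seq[(Int, Int, Double)] =
    runs.zip(runs.tail).map { case ((n1, t1), (n2, t2)) =>
      (n1, n2, (t1 - t2) / (n2 - n1))
    }

  def main(args: Array[String]): Unit =
    minutesSavedPerNode.foreach { case (from, to, gain) =>
      println(f"$from%2d -> $to%2d nodes: $gain%.2f min saved per added node")
    }
}
```

Between 9 and 15 nodes each added node saves more than a minute, whereas from 16 nodes onwards the saving is a fraction of a minute at best.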
CPU Usage - Cluster with 9 nodes | CPU Usage - Cluster with 20 nodes |
---|---|
The CPU usage graphs above clearly show that the CPU usage of the 9-node cluster doesn't drop below 30%, whereas the CPU usage of the 20-node cluster drops below 20%.
By monitoring “Cluster Node Metric” on the YARN interface, we observed that only 9 vCores on each node were in use at any time, while 27 vCores remained available. Initially, we took this to mean that the cluster was not working optimally.
So, we searched for documentation on how AWS allocates memory and cores. Unfortunately, we could not find an explanation. Therefore, we tried to manually allocate memory, number of cores and executors for our Spark application. The implementation is quite straightforward: when creating the steps, in the spark-submit options field, we specify the number of executors (x), the number of cores per executor (y), and the memory per executor (z), as follows:
--class <CLASS_NAME> --master yarn --num-executors x --executor-cores y --executor-memory z |
---|
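As a small illustration, the options string above can be rendered from concrete values. The helper below is hypothetical (it is not part of our script), the class name is taken from the script in the Appendix, and the example values 132, 5 and 6800 are the ones derived in the worked example later in this post.

```scala
object SubmitOptions {
  // Hypothetical helper: renders the spark-submit options for one EMR step.
  def render(className: String, executors: Int, coresPerExecutor: Int, memoryMb: Int): String =
    Seq(
      "--class", className,
      "--master", "yarn",
      "--num-executors", executors.toString,
      "--executor-cores", coresPerExecutor.toString,
      "--executor-memory", s"${memoryMb}m" // memory given in megabytes
    ).mkString(" ")

  def main(args: Array[String]): Unit =
    println(render("RDDimplementation.RddImplement", 132, 5, 6800))
}
```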
Five considerations need to be taken into account before calculating x, y and z. They stem from three sources: spoddotur, Richa Kandelwal and Omid Vahdaty:
- As the Spark application uses YARN as its cluster manager, some background processes are running, such as NameNode, Secondary NameNode, DataNode, JobTracker and TaskTracker. Therefore, it is essential to reserve 1 core per node for these processes.
- The YARN ApplicationMaster, which negotiates resources from the ResourceManager and monitors resource consumption, needs around 1024 MB of memory plus one executor.
- When using Spark with YARN, we should be aware that the full memory requested from YARN per executor equals spark.executor.memory + spark.yarn.executor.memoryOverhead. This overhead is max(384 MB, 7% of spark.executor.memory).
- The total memory that is available on each node for YARN containers, which will ultimately be used to allocate memory to executors, is set by EMR. Therefore, the usable memory is usually less than the nominal memory. For example, for c4.8xlarge the nominal memory is 60 GB; however, based on the EMR documentation, it is around 53 GB. If we then check the total memory on the YARN UI, we see it is even less, around 52 GB per node.
- When considering number of nodes, we should exclude the master node.
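The third consideration can be sketched as a small calculation. This is a minimal sketch that assumes the 384 MB minimum and the 7% factor quoted above; the object and function names are ours, and integer arithmetic is used for the percentage so the result rounds down.

```scala
object YarnRequest {
  // Values quoted in the list above; treated as assumptions here.
  val minOverheadMb = 384
  val overheadPercent = 7

  // Overhead = max(384 MB, 7% of the executor memory), rounded down.
  def overheadMb(executorMemoryMb: Int): Int =
    math.max(minOverheadMb, executorMemoryMb * overheadPercent / 100)

  // Memory that YARN must provide for one executor container.
  def containerMb(executorMemoryMb: Int): Int =
    executorMemoryMb + overheadMb(executorMemoryMb)

  def main(args: Array[String]): Unit = {
    println(containerMb(1024)) // small executor: the 384 MB minimum applies
    println(containerMb(6800)) // larger executor: the 7% rule applies
  }
}
```

For small executors the fixed 384 MB minimum dominates the request, which is one reason why very many tiny executors use memory inefficiently.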
Taking considerations 1, 2, 4, and 5 into account, we have 19 instances of c4.8xlarge with 35 vCores and 51 GB per node. However, with 51 GB per node we still got memory overload in practice, so we took 50 GB per node. Before determining the optimized configuration, let’s look at the two extreme configurations and the issues associated with each of them:
- For each node, we consider the highest possible number of executors, which is 35 (35×19 executors in total). This gives 1 core per executor and (50/35)×1024 - 384 ≈ 1079 MB, i.e. about 1 GB, of memory per executor. This is not a good strategy, because we cannot run multiple tasks in the same JVM.
- For each node, we consider the highest number of cores per executor, i.e. 35 cores. The number of executors is then 1 per node, with (50/1) - (50/1)×0.07 ≈ 46 GB of memory each. This is also a bad strategy, because HDFS throughput suffers when running many concurrent threads; by experience, executors running more than 5 concurrent tasks may lead to poor HDFS I/O throughput.
So, the optimized solution is somewhere in between. In this second benchmark, we tried different numbers of cores per executor. It should be noted that the number of used vCores shown on the YARN UI is misleading, since it actually shows the number of executors per node.
Now, let’s see an example: with 5 cores per executor we should have 7 executors per node and 7×19 = 133 in total. Leaving one executor for the driver, we have 132 executors. The memory is then calculated as ((50/7) - (50/7)×0.07)×1024 ≈ 6800 MB. Even with these calculations, the step sometimes fails because of memory overload. In these cases, we used trial and error around the estimated memory until the step completed.
Table 3 shows our five experiments. Looking at the first two rows, we see that having more than 5 cores per executor is indeed not optimal at all. The second row shows the optimal configuration. The third row is quite similar to what EMR does by default, i.e. 9 executors per node with 4 cores each (but even more optimized). This configuration and the one after it are also not optimal, since they get closer to extreme case number 1.
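The sizing arithmetic above can be sketched as follows. This mirrors the calculation used in this post (the overhead is subtracted from the per-executor share of the node memory); the 19 worker nodes, 35 vCores and 50 GB come from the text, while the object, case class and function names are illustrative assumptions.

```scala
object ExecutorSizing {
  // Values quoted in the text: 19 worker nodes, 35 usable vCores and 50 GB per node.
  val workerNodes = 19
  val usableCoresPerNode = 35
  val usableMemoryPerNodeMb = 50 * 1024

  case class Layout(executorsPerNode: Int, totalExecutors: Int, executorMemoryMb: Int)

  def layout(coresPerExecutor: Int): Layout = {
    val perNode = usableCoresPerNode / coresPerExecutor
    // Share of the node memory available to each executor container.
    val containerMb = usableMemoryPerNodeMb.toDouble / perNode
    // Overhead as used in the text: max(384 MB, 7% of the share).
    val overheadMb = math.max(384.0, 0.07 * containerMb)
    // One executor slot across the cluster is left for the driver.
    Layout(perNode, perNode * workerNodes - 1, (containerMb - overheadMb).toInt)
  }

  def main(args: Array[String]): Unit =
    Seq(1, 4, 5, 7, 35).foreach(c => println(s"$c cores per executor: ${layout(c)}"))
}
```

With 5 cores per executor this gives 7 executors per node, 132 executors in total and roughly 6800 MB each, matching the worked example. As noted above, the estimate is only a starting point and may still need adjusting by trial and error.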
Dynamic allocation (spark.dynamicAllocation.enabled) is not enabled in Spark by default; however, it is automatically enabled on EMR clusters. So apparently, up to now we had been benefiting from this automatic optimization. EMR provides another automatic approach to optimization: by setting maximizeResourceAllocation to true when configuring the cluster (Edit software settings), EMR automatically optimizes the resources:
[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
In our experience, however, this is not optimal at all, since it yields a configuration close to extreme case number 2, in which the number of executors per node is 1. Moreover, according to Richa Kandelwal, “it can potentially waste the cluster resources as it sets the default parallelism to 2 x number of CPU cores available to YARN containers”.
There are quite a few other options that we did not try. One, for example, would be changing the instance type. We actually had that in mind and requested 40 instances of m4.2xlarge, which has approximately half of the resource capacity of c4.8xlarge. Unfortunately, we got the increased limit only a few hours before the deadline. Additionally, our credits ran out, so we could not test further. We also wanted to test the different configurations on 9 nodes to see whether combining the optimized number of nodes with the optimized resource configuration would work, but we could not test this either. Finally, we wanted to test the application in client mode, whose benefit is that the driver can run on the master node.
In conclusion, a minimum of 9 nodes was required to run the application in under 30 minutes, at a minimum cost of 2.1 $. Also, if we were to run the application with 20 nodes, the optimized configuration would be 7 executors per node, each having 5 cores and 6800 MB of memory.
In this section we display all graphs from the various benchmarks that were done for this lab. The first set of benchmarks is about cluster optimization based on the number of nodes.
[Figures: CPU, Memory, Load and Network graphs for each of the seven benchmark runs]
The second set of benchmarks was done with the 20-node setup. Our optimizations here are based on different configurations of three parameters: the number of executors, the number of cores per executor and the memory per executor.
[Figures: CPU, Memory, Load and Network graphs for each of the five benchmark runs]
package RDDimplementation

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object RddImplement {
  def main(args: Array[String]): Unit = {
    // Reduce log noise so that the results are easy to spot in the step output.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.FATAL)

    val conf = new SparkConf().setAppName("RddImplement")
    val sc = new SparkContext(conf)

    // Read the tab-separated GKG files and drop rows too short to contain column 23.
    val rawData = sc.textFile("s3n://gdelt-open-data/v2/gkg/2015021*.gkg.csv")
      .filter(_.split("\t").length > 23)
    val splitRDD = rawData.map(_.split("\t", -1))

    // Keep (timestamp, names) and turn the first eight characters (yyyyMMdd) into a yyyy-MM-dd key.
    val dateTopic = splitRDD.map(a => (a(1), a(23)))
    val rdd = dateTopic.map(a => (s"${a._1.slice(0, 4)}-${a._1.slice(4, 6)}-${a._1.slice(6, 8)}", a._2))

    // Collect all records of a day, split them on ';' and keep the part before the first comma of each entry.
    val grouped = rdd.groupByKey()
    val topics = grouped.map(x => (x._1, x._2.map(_.split(';'))))
    val topicsFiltered = topics.map(x => (x._1, x._2.flatMap(_.map(_.split(",")(0)))))

    // Count how often each topic occurs per day.
    val mapped = topicsFiltered.map(x => (x._1, x._2.map(word => (word, 1))))
    val reduced = mapped.map(x => (x._1, x._2.groupBy(_._1).mapValues(_.map(_._2).sum).toList))

    // Drop empty topics and "ParentCategory" entries, then keep the ten most frequent topics per day.
    val sorted = reduced.map(x => (x._1, x._2.filterNot(_._1 == "").filterNot(_._1.contains("ParentCategory")).sortBy(-_._2).take(10)))

    sorted.foreach(println)
    sorted.saveAsTextFile("s3://prayer14/results")
    sc.stop()
  }
}