PR #244, continued #256

Open · wants to merge 16 commits into master
2 changes: 1 addition & 1 deletion spark/src/cgcloud/spark/__init__.py
@@ -1,5 +1,5 @@
def roles( ):
from cgcloud.spark.spark_box import SparkBox, SparkSlave, SparkMaster
from cgcloud.spark.spark_box import SparkBox, Spark2Box, SparkSlave, SparkMaster
return sorted( locals( ).values( ), key=lambda cls: cls.__name__ )


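The effect of this one-line change: roles( ) collects whatever classes have been imported into its local scope, so adding Spark2Box to the import is all it takes to register the new role. A quick sketch of what the function returns afterwards, assuming only these four classes end up in locals( ) (ordering follows the sort key shown above):

# Sketch of the resulting role list; sorting by class name puts Spark2Box
# ahead of SparkBox because '2' sorts before 'B'.
from cgcloud.spark.spark_box import SparkBox, Spark2Box, SparkSlave, SparkMaster

role_classes = sorted( [ SparkBox, Spark2Box, SparkSlave, SparkMaster ],
                       key=lambda cls: cls.__name__ )
# [ Spark2Box, SparkBox, SparkMaster, SparkSlave ]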
36 changes: 27 additions & 9 deletions spark/src/cgcloud/spark/spark_box.py
@@ -25,10 +25,6 @@
persistent_dir = '/mnt/persistent'
var_dir = '/var/lib/sparkbox'
hdfs_replication = 1
hadoop_version = '2.6.0'
spark_version = '1.6.2'
# The major version of Hadoop that the Spark binaries were built against
spark_hadoop_version = '2.6'

Service = namedtuple( 'Service', [
'init_name',
@@ -82,6 +78,11 @@ class SparkBox( ApacheSoftwareBox,
typically seconds after the reservation has been submitted, the slaves can be started up.
"""

hadoop_version = '2.6.0'
spark_version = '1.6.2'
# The major version of Hadoop that the Spark binaries were built against
spark_hadoop_version = '2.6'

@classmethod
def get_role_options( cls ):
return super( SparkBox, cls ).get_role_options( ) + [
@@ -107,7 +108,11 @@ def _populate_security_group( self, group_id ):
dict( ip_protocol='tcp', from_port=0, to_port=65535,
src_security_group_group_id=group_id ),
dict( ip_protocol='udp', from_port=0, to_port=65535,
src_security_group_group_id=group_id ) ]
src_security_group_group_id=group_id ),
dict( ip_protocol='tcp', from_port=8080, to_port=8081,
cidr_ip='0.0.0.0/0' ),
dict( ip_protocol='tcp', from_port=4040, to_port=4045,
cidr_ip='0.0.0.0/0' ) ]

def _get_iam_ec2_role( self ):
iam_role_name, policies = super( SparkBox, self )._get_iam_ec2_role( )
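The two ingress rules added in the hunk above open Spark's default web UI ports to the world: 8080 and 8081 serve the standalone master and worker UIs, and 4040-4045 covers per-application driver UIs (Spark increments from 4040 when the port is taken). If a wide-open 0.0.0.0/0 is undesirable, a narrower variant could reuse the same rule shape; the CIDR below is a placeholder, not something this PR proposes:

# Hypothetical tightening of the same rules: identical dict structure,
# but restricted to a trusted address range instead of 0.0.0.0/0.
trusted_cidr = '203.0.113.0/24'  # placeholder CIDR
ui_rules = [
    dict( ip_protocol='tcp', from_port=8080, to_port=8081, cidr_ip=trusted_cidr ),
    dict( ip_protocol='tcp', from_port=4040, to_port=4045, cidr_ip=trusted_cidr ) ]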
@@ -189,7 +194,7 @@ def __ec2_keypair_name( self, ctx ):
@fabric_task
def __install_hadoop( self ):
# Download and extract Hadoop
path = fmt( 'hadoop/common/hadoop-{hadoop_version}/hadoop-{hadoop_version}.tar.gz' )
path = fmt( 'hadoop/common/hadoop-{self.hadoop_version}/hadoop-{self.hadoop_version}.tar.gz' )
self._install_apache_package( path, install_dir )

# Add environment variables to hadoop_env.sh
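The path template above gained a self. prefix because hadoop_version now lives on the class (see the attributes added to SparkBox earlier in this diff) rather than at module level; the same adjustment appears in __install_spark below. A minimal sketch of the attribute lookup involved, using plain str.format with a stand-in class, under the assumption that cgcloud's fmt( ) resolves names from the caller's scope in the same way:

# Stand-in demonstrating '{self.attr}' replacement fields; VersionedBox is
# hypothetical and only illustrates the lookup, not cgcloud's fmt( ) itself.
class VersionedBox( object ):
    hadoop_version = '2.6.0'

box = VersionedBox( )
path = ( 'hadoop/common/hadoop-{self.hadoop_version}/'
         'hadoop-{self.hadoop_version}.tar.gz' ).format( self=box )
assert path == 'hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz'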
@@ -258,7 +263,7 @@ def __to_hadoop_xml_config( properties ):
@fabric_task
def __install_spark( self ):
# Download and extract Spark
path = fmt( 'spark/spark-{spark_version}/spark-{spark_version}-bin-hadoop{spark_hadoop_version}.tgz' )
path = fmt( 'spark/spark-{self.spark_version}/spark-{self.spark_version}-bin-hadoop{self.spark_hadoop_version}.tgz' )
self._install_apache_package( path, install_dir )

spark_dir = var_dir + "/spark"
@@ -271,8 +276,9 @@ def __install_spark( self ):
SPARK_WORKER_DIR=self._lazy_mkdir( spark_dir, "work" ),
SPARK_LOCAL_DIRS=self._lazy_mkdir( spark_dir, "local" ),
JAVA_HOME='/usr/lib/jvm/java-8-oracle',
SPARK_MASTER_IP='spark-master',
HADOOP_CONF_DIR=fmt( "{install_dir}/hadoop/etc/hadoop" ) )
SPARK_MASTER_HOST='spark-master',
HADOOP_CONF_DIR=fmt( "{install_dir}/hadoop/etc/hadoop" ),
SPARK_PUBLIC_DNS="$(curl -s http://169.254.169.254/latest/meta-data/public-hostname)" )
with remote_open( spark_env_sh_path, use_sudo=True ) as spark_env_sh:
spark_env_sh.write( '\n' )
for name, value in spark_env.iteritems( ):
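Two of the new entries deserve a note: SPARK_MASTER_HOST supersedes SPARK_MASTER_IP, which is the pre-2.0 name for the same setting, and SPARK_PUBLIC_DNS is read from the EC2 instance metadata service so that links in the web UI use the instance's public hostname. The body of the write loop is collapsed out of this hunk; one plausible completion, assuming each entry becomes a simple NAME=value line in spark-env.sh, would be:

# Hedged completion of the loop above; the actual write format is not
# shown in this hunk, so the '{name}={value}' shape is an assumption.
with remote_open( spark_env_sh_path, use_sudo=True ) as spark_env_sh:
    spark_env_sh.write( '\n' )
    for name, value in spark_env.iteritems( ):
        spark_env_sh.write( fmt( '{name}={value}\n' ) )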
@@ -430,6 +436,18 @@ def __setup_path( self ):
# Spark's and Hadoop's sbin
f.write( fmt( 'PATH="$PATH:{install_dir}/{package}/bin"\n' ) )

class Spark2Box( SparkBox ):
"""
A node in a Spark v2.x cluster; used only to create an image for master and worker boxes
"""

hadoop_version = '2.7.3'
spark_version = '2.1.0'
# The major version of Hadoop that the Spark binaries were built against
spark_hadoop_version = '2.7'

def __init__( self, ctx ):
super( Spark2Box, self ).__init__( ctx )

class SparkMaster( SparkBox, ClusterLeader ):
"""
45 changes: 38 additions & 7 deletions spark/src/cgcloud/spark/test/test_spark.py
@@ -4,39 +4,49 @@
import time
import logging
import unittest
from abc import ABCMeta, abstractmethod

from cgcloud.core.test import CoreTestCase
from cgcloud.spark.spark_box import install_dir, SparkBox, SparkMaster, SparkSlave
from cgcloud.spark.spark_box import install_dir, SparkBox, Spark2Box, SparkMaster, SparkSlave

log = logging.getLogger( __name__ )

master = SparkMaster.role( )
slave = SparkSlave.role( )
node = SparkBox.role( )

num_slaves = 2


class SparkClusterTests( CoreTestCase ):
class BaseSparkClusterTests( CoreTestCase ):
"""
Covers the creation of a Spark cluster from scratch and running a simple Spark job on it.
Also covers persistent HDFS between two cluster incarnations.
"""
__metaclass__ = ABCMeta

node = NotImplemented
cleanup = True
create_image = True

@classmethod
@abstractmethod
def initNode( cls ):
raise NotImplementedError( 'Abstract method' )

@classmethod
def setUpClass( cls ):
cls.initNode( )
os.environ[ 'CGCLOUD_PLUGINS' ] = 'cgcloud.spark'
super( SparkClusterTests, cls ).setUpClass( )
super( BaseSparkClusterTests, cls ).setUpClass( )
if cls.create_image:
cls._cgcloud( 'create', node, '-IT' )
cls._cgcloud( 'create', cls.node, '-IT' )

@classmethod
def tearDownClass( cls ):
if cls.cleanup and cls.create_image:
cls._cgcloud( 'delete-image', node )
super( SparkClusterTests, cls ).tearDownClass( )
cls._cgcloud( 'delete-image', cls.node )
super( BaseSparkClusterTests, cls ).tearDownClass( )


def test_wordcount( self ):
self._create_cluster( )
@@ -131,3 +141,24 @@ def word_count( ):

def _delete_volumes( self ):
pass

class SparkClusterTests( BaseSparkClusterTests ):
"""
Covers the creation of a Spark v1.x cluster from scratch and running a simple Spark job on it.
Also covers persistent HDFS between two cluster incarnations.
"""

@classmethod
def initNode( cls ):
cls.node = SparkBox.role( )


class Spark2ClusterTests( BaseSparkClusterTests ):
"""
Covers the creation of a Spark v2.x cluster from scratch and running a simple Spark job on it.
Also covers persistent HDFS between two cluster incarnations.
"""

@classmethod
def initNode( cls ):
cls.node = Spark2Box.role( )
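
A closing note on the test refactor: since node already exists as a class attribute defaulting to NotImplemented, a subclass could also pin it directly and skip the initNode( ) hook, letting setUpClass read cls.node as-is. A minimal sketch of that alternative (hypothetical, not what this PR does; the class name is invented):

# Hypothetical alternative to the abstract initNode( ) hook: override the
# class attribute at definition time, much as the module-level
# node = SparkBox.role( ) assignment used to do.
class Spark2ClusterTestsAlt( BaseSparkClusterTests ):
    node = Spark2Box.role( )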