forked from nchammas/flintrock
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_acceptance.py
159 lines (124 loc) · 5.25 KB
/
test_acceptance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import json
import subprocess
import urllib.request
# Flintrock modules
from flintrock.exceptions import ClusterInvalidState
def test_describe_stopped_cluster(stopped_cluster):
    """`flintrock describe` on a stopped cluster exits 0 and leads with the cluster name."""
    describe_command = ['flintrock', 'describe', stopped_cluster]
    proc = subprocess.run(describe_command, stdout=subprocess.PIPE)
    assert proc.returncode == 0
    # Output is bytes; compare against the encoded cluster name.
    assert proc.stdout.startswith(stopped_cluster.encode())
def test_stop_stopped_cluster(stopped_cluster):
    """Stopping an already-stopped cluster is a harmless no-op with a friendly message."""
    stop_command = ['flintrock', 'stop', stopped_cluster]
    proc = subprocess.run(stop_command, stdout=subprocess.PIPE)
    assert proc.returncode == 0
    assert proc.stdout == b"Cluster is already stopped.\n"
def test_try_launching_duplicate_stopped_cluster(stopped_cluster):
    """Launching a cluster whose name is taken by a stopped cluster must fail."""
    launch_command = ['flintrock', 'launch', stopped_cluster]
    proc = subprocess.run(launch_command, stderr=subprocess.PIPE)
    assert proc.returncode == 1
    error_text = proc.stderr.decode('utf-8')
    assert error_text.startswith(
        "Cluster {c} already exists".format(c=stopped_cluster))
def test_start_running_cluster(running_cluster):
    """Starting an already-running cluster is a harmless no-op with a friendly message."""
    start_command = ['flintrock', 'start', running_cluster]
    proc = subprocess.run(start_command, stdout=subprocess.PIPE)
    assert proc.returncode == 0
    assert proc.stdout == b"Cluster is already running.\n"
def test_try_launching_duplicate_cluster(running_cluster):
    """Launching a cluster whose name is taken by a running cluster must fail."""
    launch_command = ['flintrock', 'launch', running_cluster]
    proc = subprocess.run(launch_command, stderr=subprocess.PIPE)
    assert proc.returncode == 1
    error_text = proc.stderr.decode('utf-8')
    assert error_text.startswith(
        "Cluster {c} already exists".format(c=running_cluster))
def test_describe_running_cluster(running_cluster):
    """`flintrock describe` on a running cluster exits 0 and leads with the cluster name."""
    describe_command = ['flintrock', 'describe', running_cluster]
    proc = subprocess.run(describe_command, stdout=subprocess.PIPE)
    assert proc.returncode == 0
    # Output is bytes; compare against the encoded cluster name.
    assert proc.stdout.startswith(running_cluster.encode())
def test_run_command_on_running_cluster(running_cluster):
    """A trivial remote command (`ls -l`) succeeds cluster-wide on a running cluster."""
    run_command = [
        'flintrock', 'run-command', running_cluster, '--', 'ls', '-l']
    proc = subprocess.run(run_command)
    assert proc.returncode == 0
def test_copy_file_on_running_cluster(running_cluster, local_file):
    """Copying a local file to the cluster succeeds."""
    copy_command = [
        'flintrock', 'copy-file', running_cluster,
        local_file, '/tmp/copied_from_local']
    proc = subprocess.run(copy_command)
    assert proc.returncode == 0
def test_hdfs_on_running_cluster(running_cluster, remote_file):
    """Round-trip a file through HDFS: put from the master, then cat from every node."""
    hdfs_path = '/hdfs_file'

    # The put only needs to happen once, on the master.
    put_proc = subprocess.run([
        'flintrock', 'run-command', running_cluster, '--master-only', '--',
        './hadoop/bin/hdfs', 'dfs', '-put', remote_file, hdfs_path])
    assert put_proc.returncode == 0

    # The cat runs cluster-wide, confirming HDFS is reachable from all nodes.
    cat_proc = subprocess.run([
        'flintrock', 'run-command', running_cluster, '--',
        './hadoop/bin/hdfs', 'dfs', '-cat', hdfs_path])
    assert cat_proc.returncode == 0
def test_spark_on_running_cluster(running_cluster, remote_file):
    """Sanity-check Spark: the pyspark CLI runs, and the master web UI reports ALIVE.

    `remote_file` is requested for its fixture side effects only — TODO confirm
    it is still needed here.
    """
    # TODO: Run a real query; e.g. sc.parallelize(range(10)).count()
    p = subprocess.run([
        'flintrock', 'run-command', running_cluster, '--',
        './spark/bin/pyspark', '--help'])
    assert p.returncode == 0

    p = subprocess.run([
        'flintrock', 'describe', running_cluster, '--master-hostname-only'],
        stdout=subprocess.PIPE)
    # Check the exit code BEFORE trusting stdout; otherwise a failed describe
    # yields a garbage hostname and a confusing URL error below instead of a
    # clean assertion failure.
    assert p.returncode == 0
    master_address = p.stdout.strip().decode('utf-8')
    assert master_address, "Could not get master hostname from `flintrock describe`."

    # The Spark master serves cluster status as JSON on port 8080.
    spark_master_ui = 'http://{m}:8080/json/'.format(m=master_address)
    spark_ui_info = json.loads(
        urllib.request.urlopen(spark_master_ui).read().decode('utf-8'))
    assert spark_ui_info['status'] == 'ALIVE'
def test_operations_against_non_existent_cluster():
    """Every cluster-targeting command must exit 1 with the standard "No cluster" error
    when pointed at a cluster that does not exist."""
    cluster_name = 'this_cluster_doesnt_exist_yo'
    expected_error_message = (
        b"No cluster " + cluster_name.encode('utf-8') + b" in region ")

    def check_fails_with_no_cluster(args):
        # Shared assertion: command exits 1 and reports the missing cluster.
        proc = subprocess.run(args, stderr=subprocess.PIPE)
        assert proc.returncode == 1
        assert proc.stderr.startswith(expected_error_message)

    for command in ['describe', 'stop', 'start', 'login', 'destroy']:
        check_fails_with_no_cluster(['flintrock', command, cluster_name])
    # run-command and copy-file take extra positional arguments.
    check_fails_with_no_cluster(
        ['flintrock', 'run-command', cluster_name, 'ls'])
    check_fails_with_no_cluster(
        ['flintrock', 'copy-file', cluster_name, __file__, '/remote/path'])
def test_operations_against_stopped_cluster(stopped_cluster):
    """Commands that require a running cluster must fail against a stopped one
    with the exact ClusterInvalidState error message."""
    cases = [
        ('run-command', ['ls']),
        ('copy-file', [__file__, '/remote/path']),
    ]
    for command, extra_args in cases:
        proc = subprocess.run(
            ['flintrock', command, stopped_cluster] + extra_args,
            stderr=subprocess.PIPE)
        # The CLI surfaces str(ClusterInvalidState(...)) on stderr verbatim.
        expected_error_message = str(
            ClusterInvalidState(
                attempted_command=command,
                state='stopped'))
        assert proc.returncode == 1
        assert proc.stderr.decode('utf-8').strip() == expected_error_message
def test_launch_with_bad_ami():
    """Launching with a bogus AMI ID must fail with a "Could not find" error."""
    launch_command = [
        'flintrock', 'launch', 'whatever-cluster',
        '--ec2-ami', 'ami-badbad00']
    proc = subprocess.run(launch_command, stderr=subprocess.PIPE)
    assert proc.returncode == 1
    assert proc.stderr.startswith(b"Error: Could not find")