-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
executable file
·225 lines (181 loc) · 6.22 KB
/
Copy pathtest.py
File metadata and controls
executable file
·225 lines (181 loc) · 6.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
#!env/bin/python
import os
import subprocess
import argparse
import json
import time
import fnmatch
from typing import Sequence, Mapping
import elasticsearch
from elastipy import connections
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument(
"--live", type=bool, default=False, nargs="?", const=True,
help="Test against elasticsearch backend"
)
parser.add_argument(
"--failfast", type=bool, default=False, nargs="?", const=True,
help="Stop at first error"
)
parser.add_argument(
"-es", "--elasticsearch", type=str, default=None,
help="Json representation of elasticsearch server settings"
)
parser.add_argument(
"-c", "--coverage", type=bool, default=False, nargs="?", const=True,
help="Show coverage report"
)
parser.add_argument(
"-m", "--missing", type=bool, default=False, nargs="?", const=True,
help="Show missing line numbers in coverage report"
)
parser.add_argument(
"-i", "--include", type=str, nargs="+",
help="wildcard patterns to match specific tests"
)
parser.add_argument(
"-e", "--exclude", type=str, nargs="+",
help="wildcard patterns to exclude specific tests"
)
parser.add_argument(
"-l", "--list", type=bool, nargs="?", default=False, const=True,
help="Simply list all tests, do not run."
)
return parser.parse_args()
def check_cluster_ready(params: str = None, max_seconds: int = 60, interval: int = 5):
"""
Wait that elasticsearch cluster is connected and ready
"""
# override default connection
if params:
connections.set("default", json.loads(params))
for i in range(0, max_seconds, interval):
try:
print(connections.get())
health = connections.get("default").cat.health(format="json")
health = health[0] # expect at least one node
if health["status"] != "red":
return
print("waiting for elasticsearch status change:", health["status"])
except elasticsearch.ConnectionError as e:
print(e)
print("waiting for elasticsearch server")
time.sleep(interval)
print("elasticsearch server not available")
exit(1)
def run_test(
package_names: Sequence[str],
extra_args,
extra_env: Mapping = None,
coverage: bool = False,
):
env = os.environ.copy()
if extra_env:
env.update(extra_env)
if coverage:
return subprocess.call(
["coverage", "run", "-m", "unittest", *extra_args, *package_names],
env=env
)
else:
return subprocess.call(
["python", "-m", "unittest", *extra_args, *package_names],
env=env
)
def get_test_names(package_names: list, include: list, exclude: list) -> list:
"""
Return a list of strings passable as arguments to `python -m unittest`
which contain all tests that match any one of the wildcard patterns
:param package_names: `list[str]`
:param include: `list[str]` or None
:param exclude: `list[str]` or None
:return: `list[str]`
"""
import unittest
import inspect
import importlib
if include is not None:
include = [
f"*{p}*" if "*" not in p and "?" not in p else p
for p in include
]
if exclude is not None:
exclude = [
f"*{p}*" if "*" not in p and "?" not in p else p
for p in exclude
]
classes = set()
for package_name in package_names:
module = importlib.import_module(package_name)
for key in dir(module):
thing = getattr(module, key)
if inspect.isclass(thing) and issubclass(thing, unittest.TestCase):
classes.add(thing)
names = []
for klass in classes:
for key in dir(klass):
thing = getattr(klass, key)
if key.startswith("test_") and callable(thing):
name = f"{klass.__name__}.{thing.__name__}"
filename = inspect.getfile(inspect.getmodule(klass))[:-3]
path = filename.split("/")
path = path[path.index("tests"):]
path = ".".join(path)
name = f"{path}.{name}"
if include is not None:
has_match = False
for p in include:
if fnmatch.fnmatchcase(name, p):
has_match = True
break
if not has_match:
continue
if exclude is not None:
has_match = False
for p in exclude:
if fnmatch.fnmatchcase(name, p):
has_match = True
break
if has_match:
continue
names.append(name)
return sorted(names)
if __name__ == "__main__":
options = parse_arguments()
extra_args = []
extra_env = dict()
package_names = ["tests"]
if options.live:
package_names.append("tests.live")
check_cluster_ready(options.elasticsearch)
if options.failfast:
extra_args.append("--failfast")
if options.elasticsearch:
extra_env["ELASTIPY_UNITTEST_SERVER"] = options.elasticsearch
if options.include or options.exclude or options.list:
all_test_names = get_test_names(
package_names, options.include, options.exclude
)
if not all_test_names:
print("No matches found")
exit(1)
if options.list:
for n in sorted(all_test_names):
print(n)
exit(0)
extra_args += all_test_names
package_names = []
code = run_test(
package_names=package_names,
extra_args=extra_args,
extra_env=extra_env,
coverage=options.coverage or options.missing,
)
if code:
exit(code)
if options.coverage or options.missing:
report_args = []
if options.missing:
report_args.append("--show-missing")
subprocess.call(["coverage", "report", *report_args])