# tests.py
import gym
import random
import jsonpickle
# NOTE: to run this code, the OpenLockAgents must be in your PYTHONPATH
from openlockagents.common.agent import Agent, ROOT_DIR
from openlockagents.common.logger_agent import SubjectWriter
class ActionTest:
    """An action name paired with the reward observed for it.

    Instances are compared by value (``name`` and ``reward``) so that lists of
    them can be checked for equality against lists loaded from a reward file.
    """

    def __init__(self, name, reward=None):
        """Store the action ``name`` and its ``reward`` (``None`` until set)."""
        self.name = name
        self.reward = reward

    def __eq__(self, other):
        """Return True when ``other`` carries the same ``name`` and ``reward``.

        Objects that lack either attribute are not comparable; returning
        ``NotImplemented`` lets Python fall back to its default handling
        (``==`` is False, ``!=`` is True) instead of raising AttributeError.
        """
        if not (hasattr(other, "name") and hasattr(other, "reward")):
            return NotImplemented
        return self.name == other.name and self.reward == other.reward

    # Defining __eq__ without __hash__ already makes instances unhashable;
    # spell that out because the fields are mutable.
    __hash__ = None

    def __str__(self):
        """Render as ``"<name>,<reward>"``."""
        return "{},{}".format(self.name, self.reward)

    def __repr__(self):
        """Use the same text as ``__str__`` so printed lists stay readable."""
        return str(self)
def test_ce3(agent):
    """Run the solution check for scenario CE3 on trial1 through trial6."""
    test_scenario(
        agent,
        scenario_name="CE3",
        trials_to_verify=["trial1", "trial2", "trial3", "trial4", "trial5", "trial6"],
    )
def test_ce4(agent):
    """Run the solution check for scenario CE4 on trial7 through trial11."""
    test_scenario(
        agent,
        scenario_name="CE4",
        trials_to_verify=["trial7", "trial8", "trial9", "trial10", "trial11"],
    )
def test_cc3(agent):
    """Run the solution check for scenario CC3 on trial1 through trial6."""
    test_scenario(
        agent,
        scenario_name="CC3",
        trials_to_verify=["trial1", "trial2", "trial3", "trial4", "trial5", "trial6"],
    )
def test_cc4(agent):
    """Run the solution check for scenario CC4 on trial7 through trial11."""
    test_scenario(
        agent,
        scenario_name="CC4",
        trials_to_verify=["trial7", "trial8", "trial9", "trial10", "trial11"],
    )
def test_scenario(agent, scenario_name, trials_to_verify):
    """Execute every listed solution of each trial and assert that it succeeds.

    For each name in ``trials_to_verify`` a trial is set up with an action
    limit of 3 and an attempt limit of 5. Each entry of
    ``agent.env.scenario.solutions`` is then played as its own attempt; the
    attempt must add to the trial's completed solutions and be marked
    successful. After all solutions ran, the trial is finished and the last
    logged trial must be marked successful as well.
    """
    agent.env.use_physics = True

    for trial_name in trials_to_verify:
        trial_selected = agent.setup_trial(
            scenario_name=scenario_name,
            action_limit=3,
            attempt_limit=5,
            specified_trial=trial_name,
        )

        for solution in agent.env.scenario.solutions:
            agent.env.reset()
            # Snapshot the count first so the attempt can be shown to add one.
            solved_before = len(agent.env.cur_trial.completed_solutions)
            execute_solution(agent, solution)
            agent.finish_attempt()

            assert len(agent.env.cur_trial.completed_solutions) > solved_before
            assert agent.env.cur_trial.attempt_seq[-1].success is True

        agent.finish_trial(trial_selected, False)
        assert agent.logger.trial_seq[-1].success is True
def execute_solution(agent, action_seq):
    """Play the solution ``action_seq`` by delegating to ``execute_action_seq``."""
    execute_action_seq(agent=agent, action_seq=action_seq)
def execute_action_seq(agent, action_seq):
    """Step the agent's environment once per entry of ``action_seq``.

    Each entry's ``name`` is looked up in ``agent.env.action_map`` and the
    resulting action is passed to ``agent.env.step``. The step results are
    unpacked but not used.
    """
    for logged_action in action_seq:
        env_action = agent.env.action_map[logged_action.name]
        _state, _reward, _done, _opt = agent.env.step(env_action)
def verify_file_output_matches(env):
    """Placeholder: does nothing yet and returns ``None``."""
    return None
def verify_simulator_fsm_match(agent, num_attempts_per_scenario):
    """Drive random actions and check the FSM against the simulator each step.

    For each of CE3, CE4, CC3 and CC4 a trial is set up with an action limit
    of 3 and ``num_attempts_per_scenario`` attempts. Every attempt resets the
    environment and keeps stepping with randomly chosen actions until the
    environment reports ``done``, calling
    ``agent.verify_fsm_matches_simulator`` after every step.
    """
    for scenario_name in ("CE3", "CE4", "CC3", "CC4"):
        agent.env.use_physics = True
        trial_selected = agent.setup_trial(
            scenario_name=scenario_name,
            action_limit=3,
            attempt_limit=num_attempts_per_scenario,
        )

        for _attempt in range(num_attempts_per_scenario):
            agent.env.reset()
            while True:
                # The index is drawn over the size of action_map, then used to
                # pick a key out of action_space.
                choice = random.randrange(len(agent.env.action_map))
                action = agent.env.action_map[agent.env.action_space[choice]]
                _state, _reward, done, _opt = agent.env.step(action)
                agent.verify_fsm_matches_simulator(agent.env.observation_space)
                if done:
                    break
            agent.finish_attempt()

        agent.finish_trial(trial_selected, False)
def test_rewards(agent, save_rewards=True):
    """Check the rewards of fixed action sequences against saved reward files.

    For each scenario (CE3, CE4, CC3, CC4) and each reward function, a trial is
    set up, every predefined three-action sequence is played as one attempt via
    ``run_reward_test``, and the collected rewards are compared with the
    contents of ``./test-output/rewards/<scenario>/<reward_function>.json``.

    Args:
        agent: the test agent whose ``env`` is stepped.
        save_rewards: when True (the default, matching the previous
            behaviour) the freshly computed rewards are written to the reward
            file before it is loaded, so the comparison only exercises the
            save/load round trip. Pass False to compare against reward files
            that already exist on disk.

    Raises:
        AssertionError: if the computed rewards differ from the loaded ones.
    """
    data_dir = "./test-output/rewards"
    scenarios_to_test = ["CE3", "CE4", "CC3", "CC4"]
    reward_functions = [
        "basic",
        "change_state",
        "unique_solutions",
        "change_state_unique_solutions",
        "negative_immovable_unique_solutions",
        "negative_immovable",
        "negative_immovable_partial_action_seq",
        "negative_immovable_negative_repeat",
        "negative_immovable_solution_multiplier",
        "negative_immovable_partial_action_seq_solution_multiplier",
        "negative_change_state_partial_action_seq_solution_multiplier",
    ]

    def _build(name_table):
        # Fresh ActionTest objects for every name so no instance is shared
        # between sequences.
        return [[ActionTest(name) for name in names] for names in name_table]

    action_seqs_ce = _build(
        [
            # all three actions do nothing
            ("push_inactive0", "push_inactive1", "push_inactive0"),
            # one action moves one lever
            ("push_l2", "push_inactive1", "push_inactive0"),
            ("push_l1", "push_inactive1", "push_inactive0"),
            # move two levers
            ("push_l2", "push_l1", "push_inactive0"),
            ("push_l1", "push_l2", "push_inactive0"),
            # unlock the door but don't open
            ("push_l2", "push_l0", "push_inactive0"),
            ("push_l2", "push_l0", "push_inactive0"),
            ("push_l1", "push_l0", "push_inactive0"),
            ("push_l1", "push_l0", "push_inactive0"),
            # repeated actions
            ("push_l0", "push_l0", "push_inactive0"),
            ("push_l1", "push_l1", "push_inactive0"),
            # push 3 levers
            ("push_l2", "push_l0", "push_l1"),
            ("push_l1", "push_l0", "push_l2"),
            # open the door (repeat solutions)
            ("push_l2", "push_l0", "push_door"),
            ("push_l2", "push_l0", "push_door"),
            ("push_l1", "push_l0", "push_door"),
            ("push_l1", "push_l0", "push_door"),
        ]
    )
    action_seqs_cc = _build(
        [
            # all three actions do nothing
            ("push_inactive0", "push_inactive1", "push_inactive0"),
            # one action moves one lever
            ("push_l0", "push_inactive1", "push_inactive0"),
            ("push_l0", "push_inactive1", "push_inactive0"),
            # move two levers
            ("push_l0", "push_inactive0", "push_l2"),
            ("push_l0", "push_inactive0", "push_l1"),
            # unlock the door but don't open
            ("push_l0", "push_l1", "push_inactive0"),
            ("push_l0", "push_l1", "push_inactive0"),
            ("push_l0", "push_l2", "push_inactive0"),
            ("push_l0", "push_l2", "push_inactive0"),
            # repeated actions
            ("push_l0", "push_l0", "push_inactive0"),
            ("push_l1", "push_l1", "push_inactive0"),
            # push 3 levers
            ("push_l0", "push_l1", "push_l2"),
            ("push_l0", "push_l2", "push_l1"),
            # open the door
            ("push_l0", "push_l1", "push_door"),
            ("push_l0", "push_l1", "push_door"),
            ("push_l0", "push_l2", "push_door"),
            ("push_l0", "push_l2", "push_door"),
        ]
    )

    for scenario_name in scenarios_to_test:
        scenario_data_dir = data_dir + "/" + scenario_name
        # Set on every scenario, overriding whatever the caller configured.
        agent.env.use_physics = True

        # Exactly one table applies to each scenario in the list above.
        if scenario_name in ("CE3", "CE4"):
            action_seqs = action_seqs_ce
        else:
            action_seqs = action_seqs_cc

        for reward_function in reward_functions:
            trial_selected = agent.setup_trial(
                scenario_name=scenario_name, action_limit=3, attempt_limit=10000
            )
            reward_filepath = scenario_data_dir + "/" + reward_function + ".json"

            rewards = []
            for action_seq in action_seqs:
                agent.env.reset()
                action_seq_rewards = run_reward_test(agent, action_seq, reward_function)
                agent.finish_attempt()
                print("Rewards: {}".format(str(action_seq_rewards)))
                rewards.append(action_seq_rewards)

            if save_rewards:
                save_reward_file(reward_filepath, rewards, action_seqs)

            reward_file = load_reward_file(reward_filepath)
            print("Loading reward file: {}".format(reward_file))

            if rewards != reward_file:
                # Pair the entries up positionally and keep only those that
                # differ, so the message shows just the offending sequences.
                differing = [
                    (actual, expected)
                    for actual, expected in zip(rewards, reward_file)
                    if actual != expected
                ]
                rewards_mismatches = [actual for actual, _ in differing]
                reward_file_mismatches = [expected for _, expected in differing]
                assert_err = "Reward does not match in {} reward function. Received reward of {}. Expected reward of {}".format(
                    reward_function, rewards_mismatches, reward_file_mismatches
                )
                # Raised explicitly so the check also fires under ``python -O``.
                raise AssertionError(assert_err)

            agent.finish_trial(trial_selected, False)
def save_reward_file(path, rewards, action_seqs):
    """Encode ``rewards`` with jsonpickle and write the result to ``path``.

    ``action_seqs`` is only used for a sanity check: there must be exactly one
    rewards entry per action sequence. The write itself goes through
    ``SubjectWriter.pretty_write``.
    """
    assert len(action_seqs) == len(rewards)
    # An interactive "enter 'y' to overwrite" confirmation used to sit here;
    # it is disabled, so the file is overwritten without asking.
    encoded = jsonpickle.encode(rewards)
    SubjectWriter.pretty_write(path, encoded)
def load_reward_file(path):
    """Read the file at ``path`` and return its jsonpickle-decoded contents."""
    # NOTE(review): jsonpickle decoding can reconstruct arbitrary Python
    # objects - only point this at reward files produced by this test suite.
    with open(path, "r") as reward_fp:
        serialized = reward_fp.read()
    return jsonpickle.decode(serialized)
def run_reward_test(agent, action_seq, reward_function):
    """Play ``action_seq`` under ``reward_function`` and return the rewards.

    Sets ``agent.env.reward_mode`` to ``reward_function``, then steps the
    environment once per entry of ``action_seq`` (looking each entry's ``name``
    up in ``agent.env.action_map``).

    Args:
        agent: object exposing ``env`` with ``action_map``, ``reward_mode`` and
            ``step``.
        action_seq: iterable of objects with ``name`` and ``reward`` attributes.
        reward_function: value assigned to ``agent.env.reward_mode``.

    Returns:
        A list with one shallow copy of each input entry, in order, whose
        ``reward`` attribute holds the reward returned by the step. The input
        entries themselves are left untouched, so results from one run are not
        overwritten when the same sequence is replayed under another reward
        function.
    """
    import copy  # local import: the module does not import ``copy`` at top level

    agent.env.reward_mode = reward_function
    rewards = []
    for action_test in action_seq:
        action = agent.env.action_map[action_test.name]
        _next_state, reward, _done, _opt = agent.env.step(action)
        recorded = copy.copy(action_test)
        recorded.reward = reward
        rewards.append(recorded)
    return rewards
def main():
    """Create the OpenLock test agent and run the currently enabled checks.

    Builds the ``openlock-v1`` gym environment, wraps it in an ``Agent`` named
    "unit tester" whose data directory is ``ROOT_DIR + "/../OpenLockUnitTests"``,
    sets up the subject, and then runs the file-output placeholder and the
    reward test, printing progress along the way.
    """
    env = gym.make("openlock-v1")

    # create session/trial/experiment manager
    agent = Agent(
        "unit tester",
        {"data_dir": ROOT_DIR + "/../OpenLockUnitTests"},
        env,
    )
    agent.setup_subject()

    print("Starting unit tests.")
    # Currently disabled: the per-scenario solution checks
    # test_ce3(agent), test_cc3(agent), test_cc4(agent) and test_ce4(agent).

    # todo: implement verifying file output (json) against a known, correct output
    verify_file_output_matches(agent)

    print("Verifying physics simulator and FSM output matches.")
    # Currently disabled: verify_simulator_fsm_match(agent, 100)

    print("Verifying rewards match saved values.")
    # bypass physics sim
    agent.env.use_physics = False
    test_rewards(agent)

    print("All tests passed")
# Run the test driver only when this file is executed as a script.
if __name__ == "__main__":
    main()