generate_instructions.py
import functools
import json
import multiprocessing
import os
import random
from typing import Callable, Dict, List, Optional

import backoff
import numpy as np
import openai
import torch
import yaml
from openai import OpenAI
from pydantic import BaseModel
from tqdm import tqdm
from transformers import AutoTokenizer, CLIPTextModelWithProjection


def backoff_hdlr(details):
    """Log a message each time a backoff retry is triggered."""
    print("Backing off {wait:0.1f} seconds after {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))


def pairwise_cosine_similarity(x1, x2):
    """
    Computes pairwise cosine similarity between two tensors.

    Args:
        x1 (torch.Tensor): First tensor of shape (N, D)
        x2 (torch.Tensor): Second tensor of shape (M, D)

    Returns:
        torch.Tensor: Pairwise cosine similarity matrix of shape (N, M)
    """
    x1 = x1 / x1.norm(dim=1, keepdim=True)
    x2 = x2 / x2.norm(dim=1, keepdim=True)
    return torch.matmul(x1, x2.t())
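
# A minimal usage sketch (shapes are hypothetical): for x1 of shape (3, 512)
# and x2 of shape (5, 512), pairwise_cosine_similarity(x1, x2) returns a
# (3, 5) matrix; pairwise_cosine_similarity(x, x) has ones on its diagonal.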


class CLIPEmbeddingModel:
    """Wraps the CLIP text encoder to embed a batch of instruction strings."""

    def __init__(self, device="cuda"):
        self.device = device
        self.model = CLIPTextModelWithProjection.from_pretrained(
            "openai/clip-vit-base-patch32").to(device)
        self.tokenizer = AutoTokenizer.from_pretrained(
            "openai/clip-vit-base-patch32", clean_up_tokenization_spaces=True)

    def __call__(self, text: List[str]) -> torch.Tensor:
        with torch.no_grad():
            inputs = self.tokenizer(text, padding=True, return_tensors="pt").to(self.device)
            outputs = self.model(**inputs)
            text_embeds = outputs.text_embeds
        return text_embeds
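
# A minimal sketch of how the embedding model is used (assumes a CUDA device
# is available; the example strings are hypothetical):
#   embedder = CLIPEmbeddingModel("cuda")
#   embeds = embedder(["pick up the red block", "grab the crimson cube"])
#   # embeds has shape (2, D), where D is the CLIP projection dimension
#   # (512 for clip-vit-base-patch32); see pairwise_cosine_similarity above.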


class InstructionSet(BaseModel):
    """Structured-output schema: the model must return a list of instructions."""
    instructions: List[str]


class EmbodiedRedTeamModel:
    """
    Takes the task description and the initial-state image (which define the
    feasible set of instructions) and generates a set of instructions that
    elicit failures from the robot.
    """

    def __init__(self,
                 embedding_model: Callable,
                 model: str = "gpt-4o-2024-08-06",
                 num_rejection_samples: int = 5):
        """
        num_rejection_samples: number of candidate sets drawn for diversity
        rejection sampling.
        """
        self.client = OpenAI()
        self.model = model
        self.num_rejection_samples = num_rejection_samples
        self.embedding_model = embedding_model

    @backoff.on_exception(backoff.expo,
                          (openai.RateLimitError, openai.BadRequestError),
                          on_backoff=backoff_hdlr,
                          )
    def __call__(self,
                 task: str,
                 image_url: Optional[str] = None,
                 examples: Optional[List[str]] = None,
                 num_instructions: int = 10,
                 return_all_annotations: bool = False):
        # Compose the example block from previously generated instructions, if any.
        examples = examples or []
        if len(examples) > 0:
            examples_message = " Here are the example instructions that met your goal:\n"
            for example in examples:
                examples_message += f"- {example}\n"
        else:
            examples_message = " "

        # Compose the prompt depending on whether an initial-state image is provided.
        if image_url:
            content = [
                {
                    "type": "text",
                    "text": f"The attached image is an example image of the initial state of a robot that will perform the task: {task}.{examples_message} Generate a diverse set of exactly {num_instructions} instructions."
                },
                {
                    "type": "image_url",
                    "image_url": {"url": image_url},
                },
            ]
        else:
            content = [
                {
                    "type": "text",
                    "text": f"The robot will perform the task: {task}.{examples_message} Generate a diverse set of exactly {num_instructions} instructions."
                },
            ]

        chat_completion = self.client.beta.chat.completions.parse(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": "You are a quality assurance engineer for a robot. Your goal is to come up with instructions that describe the given task correctly, are similar to what human users would plausibly give, and yet challenge the robot's capability to accomplish the task."
                },
                {
                    "role": "user",
                    "content": content
                }
            ],
            response_format=InstructionSet,
            n=self.num_rejection_samples,
        )

        all_annotations: List[List[str]] = [
            choice.message.parsed.instructions for choice in chat_completion.choices
        ]
        if self.num_rejection_samples > 1:
            # Diversity rejection sampling: score each candidate set by the mean
            # pairwise cosine similarity of its instruction embeddings and keep
            # the least similar (most diverse) set.
            all_sim: List[float] = []
            for annotations in all_annotations:
                embeds = self.embedding_model(annotations)
                all_sim.append(pairwise_cosine_similarity(embeds, embeds).mean().item())
        else:
            all_sim = [1.0]  # Dummy score when there is a single candidate set.
        if return_all_annotations:
            return all_annotations[np.argmin(all_sim)], all_annotations
        return all_annotations[np.argmin(all_sim)]
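
# A minimal usage sketch (the task string and image URL are hypothetical):
#   red_team = EmbodiedRedTeamModel(embedding_model=CLIPEmbeddingModel("cuda"))
#   best, candidates = red_team("pick up the blue cup",
#                               image_url="https://example.com/initial_state.png",
#                               return_all_annotations=True)
#   # `best` is the most diverse candidate list; `candidates` holds every sample.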


def _vlm_worker(task_and_links, examples):
    """Generate instructions for one task, conditioning on a randomly sampled image."""
    task, links = task_and_links
    embedding_model = CLIPEmbeddingModel("cuda")
    red_team = EmbodiedRedTeamModel(embedding_model=embedding_model)
    annotations = red_team(task,
                           image_url=random.choice(links),
                           examples=examples.get(task, []))
    return task, annotations


def _lm_worker(task, examples):
    """Generate instructions for one task from the task description alone."""
    embedding_model = CLIPEmbeddingModel("cuda")
    red_team = EmbodiedRedTeamModel(embedding_model=embedding_model)
    annotations = red_team(task, examples=examples.get(task, []))
    return task, annotations


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Generate instructions for red teaming")
    parser.add_argument("--output_path", required=True, type=str,
                        help="Output YAML file for the generated instructions.")
    parser.add_argument("--examples_path", type=str,
                        help="YAML file of previously generated task-annotation pairs")
    parser.add_argument("--task_images", type=str, default="vlm_initial_state_links.json",
                        help="JSON file mapping each task to its initial-state image links")
    parser.add_argument("--use_image", action="store_true", default=False,
                        help="Condition the prompt on an initial-state image")
    parser.add_argument("--max_num_workers", type=int, default=8,
                        help="Number of parallel workers")
    args = parser.parse_args()

    output_dir = os.path.dirname(args.output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    multiprocessing.set_start_method('spawn', force=True)

    with open(args.task_images, "r") as f:
        vlm_task_to_links = json.load(f)

    examples = {}
    if args.examples_path and os.path.exists(args.examples_path):
        with open(args.examples_path, "r") as f:
            examples: Dict[str, List[str]] = yaml.safe_load(f)

    with multiprocessing.Pool(args.max_num_workers) as pool:
        if args.use_image:
            results = list(tqdm(pool.imap(functools.partial(_vlm_worker, examples=examples),
                                          vlm_task_to_links.items()),
                                total=len(vlm_task_to_links)))
        else:
            results = list(tqdm(pool.imap(functools.partial(_lm_worker, examples=examples),
                                          vlm_task_to_links.keys()),
                                total=len(vlm_task_to_links)))

    with open(args.output_path, "w") as f:
        yaml.dump(dict(results), f)
    print(f"Saved the output at: {args.output_path}")