embodied_tasks.py
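"""Run and evaluate embodied tasks with a large model (LM) API.

EmbodiedTasks.run() walks the sample folders under dataset/imgs, sends each
task query from query_dict (a single-turn string or a multi-turn list) to the
LM together with the folder's images, and writes the answers to
results/<task>_<model>.csv. EmbodiedTasks.evaluate() then compares those
answers against the ground-truth CSVs under dataset/imgs_label using
evaluate_texts.
"""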
import pandas as pd
import traceback
from natsort import natsorted
from utils import encode_image, get_image_paths, LM_client, evaluate_texts
import os
import time
import ast
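
# Note: utils is a local helper module whose implementation is not shown in this
# file. From the call sites below it is assumed to provide:
#   LM_client(model, api_key)                     - wrapper around the LM API
#   LM_client.query(messages, prompt, imgs=...)   - returns (updated messages, answer);
#                                                   the imgs argument appears to be optional
#   get_image_paths(folder_path)                  - image file paths for one sample folder
#   evaluate_texts(groundtruth, label)            - text-similarity scores, indexable by 'mean'
#   encode_image(path)                            - image encoding helper (imported but unused here)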


class EmbodiedTasks:
    def __init__(self, query_dict, model, api_key):
        self.query_dict = query_dict
        self.model = model
        self.api_key = api_key
    def run(self):
        """
        Derive the responses from the LM API for every task and sample
        """
        query_dict = self.query_dict
        with open("prompts/prompt1.txt", "r") as f:
            prompt_ = f.read()
        # Initialize the large model client
        client = LM_client(self.model, self.api_key)
        for qa in query_dict.keys():
            # Path of the images in the dataset
            path = 'dataset/imgs'
            # Subfolder names corresponding to samples
            list_dir = os.listdir(path)
            list_dir = natsorted(list_dir)
            if isinstance(query_dict[qa], str):
                # Perform only a single round of interaction with the large model
                # Initialize the prompt
                prompt = prompt_ + query_dict[qa]
                # Output from the LM
                label = pd.DataFrame(columns=['folder_name', 'claude3'])
                # Run each sample one by one and collect the output of the LM
                for count in range(len(list_dir)):
                    error_time = 0
                    while True:
                        try:
                            # Read the embodied image observations
                            folder_name = list_dir[count]
                            folder_path = os.path.join(path, folder_name)
                            imgs = get_image_paths(folder_path)
                            # Store the history of the large model dialogue
                            messages = []
                            # Run the LM to obtain its output
                            messages, answer = client.query(messages, prompt, imgs)
                            print(answer)
                            # Append the LM output
                            label.loc[label.shape[0], :] = [folder_name, answer]
                            break
                        except Exception as e:
                            # Handle failures caused by frequent LM access (e.g. rate limits)
                            print(f"An error occurred: {e}")
                            traceback.print_exc()
                            error_time += 1
                            time.sleep(10)
                            if error_time == 3:
                                label.loc[label.shape[0], :] = [folder_name, str(e)]
                                break
                            else:
                                continue
            elif isinstance(query_dict[qa], list):
                # Perform multiple rounds of interaction with the large model
                # Initialize the prompts
                prompt1 = prompt_ + query_dict[qa][0]
                prompt2 = query_dict[qa][1]
                # Output from the LM
                label = pd.DataFrame(columns=['folder_name', '1', '2'])
                # Run each sample one by one and collect the output of the LM
                for count in range(len(list_dir)):
                    error_time = 0
                    while True:
                        try:
                            # Read the embodied image observations
                            folder_name = list_dir[count]
                            folder_path = os.path.join(path, folder_name)
                            imgs = get_image_paths(folder_path)
                            # Store the history of the large model dialogue
                            messages = []
                            # Run the LM to obtain its output for both dialogue turns
                            messages, answer1 = client.query(messages, prompt1, imgs)
                            messages, answer2 = client.query(messages, prompt2)
                            # Append the LM outputs
                            label.loc[label.shape[0], :] = [folder_name, answer1, answer2]
                            break
                        except Exception as e:
                            # Handle failures caused by frequent LM access (e.g. rate limits)
                            print(f"An error occurred: {e}")
                            traceback.print_exc()
                            error_time += 1
                            time.sleep(10)
                            if error_time == 3:
                                label.loc[label.shape[0], :] = [folder_name, None, None]
                                break
                            else:
                                continue
            # Save the results
            save_path = 'results/%s_%s.csv' % (qa, self.model)
            label.to_csv(save_path)
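
    # Note on the ground-truth CSVs read by evaluate() (layout inferred from the
    # column indexing below; it is not documented elsewhere in this file):
    #   scene / qa* : columns 0-2 hold three reference answers
    #   dialog*     : columns 0-1 hold two reference strings, and columns 2-3 hold
    #                 stringified two-element lists parsed with ast.literal_eval
    #   tp*         : columns 1-3 hold three reference answers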
    def evaluate(self):
        """
        Evaluate the model's responses against the ground-truth labels
        """
        for qa in self.query_dict.keys():
            if qa == 'scene' or qa[:2] == 'qa':
                groundtruth_df = pd.read_csv('dataset/imgs_label/%s.csv' % qa, index_col=0)
                label = pd.read_csv('results/%s_%s.csv' % (qa, self.model), index_col=0)
                label = label.iloc[:, 1]
                groundtruth = groundtruth_df.apply(lambda row: [row[0], row[1], row[2]], axis=1)
                results_df = evaluate_texts(groundtruth, label)
            elif qa[:6] == 'dialog':
                groundtruth_df = pd.read_csv('dataset/imgs_label/%s.csv' % qa, index_col=0)
                label = pd.read_csv('results/%s_%s.csv' % (qa, self.model), index_col=0)
                df_extract = label.apply(lambda row: str(row[1]) + ' ' + str(row[2]), axis=1)
                label = df_extract
                # Convert the stringified list columns back into Python lists
                groundtruth_df.iloc[:, 2] = groundtruth_df.iloc[:, 2].apply(ast.literal_eval)
                groundtruth_df.iloc[:, 3] = groundtruth_df.iloc[:, 3].apply(ast.literal_eval)
                # Merge the reference columns into one list per row, producing a new Series
                groundtruth = groundtruth_df.apply(
                    lambda row: [row[0] + ' ' + row[1]]
                                + [row[2][0] + ' ' + row[2][1]]
                                + [row[3][0] + ' ' + row[3][1]],
                    axis=1)
                results_df = evaluate_texts(groundtruth, label)
            elif qa[:2] == 'tp':
                groundtruth_df = pd.read_csv('dataset/imgs_label/%s.csv' % qa, index_col=0)
                label = pd.read_csv('results/%s_%s.csv' % (qa, self.model), index_col=0)
                label = label.iloc[:, 1]
                groundtruth = groundtruth_df.apply(lambda row: [row[1], row[2], row[3]], axis=1)
                results_df = evaluate_texts(groundtruth, label)
            print(qa, ': ')
            print(results_df['mean'])


if __name__ == '__main__':
    # Task-specific parts of the prompt for the LM:
    # Embodied first-view scene understanding ('scene')
    # Embodied question answering ('qa1', ..., 'qa10')
    # Embodied dialogue ('dialog1', 'dialog2', 'dialog3')
    # Embodied task planning ('tp1', 'tp2', 'tp3')
    query_dict = {
        'scene': 'please describe your current location, including the surrounding environment, your relationship to the environment, and any relevant spatial information.',
        'qa1': 'How many traffic lights can be observed around in total?',
        'qa2': 'Is there a building on the left side? What color is it?',
        'qa3': 'Are you facing the road, the building, or the greenery?',
        'dialog1': ['May I ask if there are any prominent waypoints around?',
                    'Where are they located respectively?'],
        'dialog2': ['May I ask what color the building on the left is?',
                    'Where is it located relative to the road ahead?'],
        'dialog3': ['How many trees are there in the rear view?', 'What colors are they respectively?'],
        'tp1': 'I want to have a cup of coffee at ALL-Star coffee shop, but I have not brought any money. What should I do? Please give a chain-like plan.',
        'tp2': 'I need to get an emergency medicine from the pharmacy, but I do not know the way. What should I do? Please give a chain-like plan.',
    }
    model = "xxxx"  # LM model name, for example: "claude-3-haiku-20240307" or "gpt-4o"
    api_key = "xxxxxxx"  # Fill in the API key
    embodied_tasks = EmbodiedTasks(query_dict, model, api_key)
    embodied_tasks.run()
    embodied_tasks.evaluate()
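
# Example invocation (assuming dataset/imgs, dataset/imgs_label, prompts/prompt1.txt
# and a results/ directory exist, and a valid model name and API key are filled in above):
#   python embodied_tasks.py
# Per-task answers are written to results/<task>_<model>.csv and the mean
# evaluation score for each task key is printed at the end.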