"""
This is a more up-to-date version of gpt_utils.py that I have been using in other projects. I don't want to update the old usages, so I am keeping that
module for compat, but this one is more robust and should be used going forward (and honestly, this should have been spun off into its own actual library at this point).
"""
import hashlib
import json
import os
import re
import tempfile
import time
from io import StringIO
import openai
import pandas as pd
import pyperclip
g_model = "gpt-4o-mini"
# g_model = "gpt-3.5-turbo-16k"
# g_model = "gpt-3.5-turbo-0301"
# g_model = "gpt-4"
total_cost = 0
seed = ""
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

def set_seed(new_seed):
    global seed
    seed = new_seed

def get_digest(input_string):
    global seed
    return hashlib.sha256(f"{input_string}{seed}".encode('utf-8')).hexdigest()

def print_cost():
    print(f"${total_cost:0.4f}")

def reset_cost():
    global total_cost
    total_cost = 0

def get_cost():
    return total_cost

def get_models():
    openai.api_key = os.getenv("OPENAI_API_KEY")
    models = openai.Model.list()
    print(models)

def backup_response(prompt_string, messages, prompt_tokens, response_tokens, total_tokens, total_prompts):
    digest = get_digest(str(prompt_string))
    # make sure the cache directory exists before writing the backup
    os.makedirs("gpt_caches", exist_ok=True)
    with open(f"gpt_caches/{digest}.json", "w", encoding="utf-8") as fp:
        json.dump({
            "messages": messages,
            "prompt_tokens": prompt_tokens,
            "response_tokens": response_tokens,
            "total_prompts": total_prompts,
            "total_tokens": total_tokens
        }, fp, indent=2)

def load_response(prompts):
    digest = get_digest(str(prompts))
    if os.path.exists(f"gpt_caches/{digest}.json"):
        with open(f"gpt_caches/{digest}.json", "r", encoding="utf-8") as fp:
            response = json.load(fp)
        return response, digest
    return None, digest

def load_response_digest(digest):
    if os.path.exists(f"gpt_caches/{digest}.json"):
        with open(f"gpt_caches/{digest}.json", "r", encoding="utf-8") as fp:
            response = json.load(fp)
        return response, digest
    return None, digest

def delete_response_digest(digest):
    if os.path.exists(f"gpt_caches/{digest}.json"):
        os.remove(f"gpt_caches/{digest}.json")
    else:
        print("response does not exist")

def get_proper_digest(model=g_model, system_prompt=None, user_prompts=()):
    if isinstance(user_prompts, str):
        user_prompts = [user_prompts]
    prompt_string = f"{model}-{system_prompt}-{user_prompts}"
    digest = get_digest(prompt_string)
    return digest, prompt_string
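
# A quick sketch of how the cache key behaves: the digest covers the model,
# the system prompt, the full prompt list, and the global seed, so changing
# any of them produces a fresh cache entry.
#   digest, key = get_proper_digest("gpt-4o-mini", "Be terse.", ["Hello"])
#   # key == "gpt-4o-mini-Be terse.-['Hello']"
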
def calculate_cost(model, in_tokens, out_tokens=0):
    # given a model and a number of tokens, calculate the estimated cost in USD
    # gpt4: 0.03 per 1k tokens input, 0.06 per 1k tokens output
    # gpt3: 0.0015 per 1k tokens input, 0.002 per 1k tokens output
    model_pricing = {
        "gpt-3.5-turbo": {"in": 0.0015, "out": 0.002},
        "gpt-3.5-turbo-16k": {"in": 0.003, "out": 0.004},
        "gpt-3.5-turbo-0301": {"in": 0.0015, "out": 0.002},
        "gpt-4": {"in": 0.03, "out": 0.06},
        "gpt-4o": {"in": 0.005, "out": 0.015},
        "gpt-4o-mini": {"in": 0.00015, "out": 0.0006},
    }
    cost = in_tokens * model_pricing[model]["in"] + out_tokens * model_pricing[model]["out"]
    return cost / 1000
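
# Worked example: at these rates, 1,000 prompt tokens plus 500 completion
# tokens on gpt-4o-mini cost (1000 * 0.00015 + 500 * 0.0006) / 1000 = $0.00045.
#   calculate_cost("gpt-4o-mini", 1000, 500)  # -> 0.00045
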
def query_gpt(user_prompts=(), system_prompt=None, model=g_model, note=None, no_print=False, fake=False, bypass_cache=False, json_mode=False, expected_schema=None):
    global total_cost
    # I also want to be able to use this with just a single string
    if isinstance(user_prompts, str):
        user_prompts = [user_prompts]
    digest, prompt_string = get_proper_digest(model, system_prompt, user_prompts)
    response, digest = load_response_digest(digest)
    # notes are just so I know which query is running at a glance
    note_text = f"starting with '{user_prompts[0][:40]}...'" if note is None else f"({note})"
    query_desc = f"{digest[:20]}..., {len(user_prompts): >3} prompt(s), {note_text} -> {model}"
    # load a cached response if it exists, and if we are not bypassing it
    if response is not None and not bypass_cache:
        prompt_cost = calculate_cost(model, response["prompt_tokens"], response["response_tokens"])
        total_cost += prompt_cost
        if not no_print:
            print(
                f'Found existing: {query_desc} | used {response["total_prompts"]} prompts and {response["total_tokens"]} tokens for ${prompt_cost:02.6f}',
                flush=True
            )
        return response["messages"], response["messages"][-1]
    if not no_print:
        print(f"Generating new: {query_desc} ", end="", flush=True)
    # load openai key
    openai.api_key = os.getenv("OPENAI_API_KEY")
    # stats
    prompt_tokens = 0
    response_tokens = 0
    total_tokens = 0
    total_prompts = 0
    # build the initial messages object
    messages = [{"role": "system", "content": system_prompt}] if system_prompt is not None else []
    if model == "gpt-4":
        # gpt-4 is always forced into manual (fake) mode here
        fake = True
    timer_start = time.time()
    completion = None
    for prompt in user_prompts:
        if not no_print:
            print(total_prompts, end=",")
        # append the next prompt to the messages
        messages.append({"role": "user", "content": prompt})
        if fake:
            # manual mode: copy the prompt to the clipboard and drop into the
            # debugger so the exchange can be driven by hand
            pyperclip.copy(prompt)
            breakpoint()
        # contact openai and get a response
        do_json_mode = {}
        if json_mode:
            do_json_mode["response_format"] = {"type": "json_object"}
        completion = try_gpt(
            openai.ChatCompletion.create,
            expected_schema=expected_schema,
            model=model,
            messages=messages,
            **do_json_mode,
        )
        # extract the response from the completion
        response = completion["choices"][-1]["message"]
        # append the response to the messages
        messages.append(response)
        # add to stats
        prompt_tokens += completion.usage.prompt_tokens
        response_tokens += completion.usage.completion_tokens
        total_prompts += 1
        total_tokens += completion.get("usage", {}).get("total_tokens", 0)
    timer_end = time.time()
    prompt_cost = calculate_cost(model, prompt_tokens, response_tokens)
    total_cost += prompt_cost
    if not no_print:
        print(f" | used {total_prompts} prompts and {total_tokens} tokens for ${prompt_cost:06.06f}, {timer_end - timer_start:05.02f}s", flush=True)
    backup_response(prompt_string, messages, prompt_tokens, response_tokens, total_tokens, total_prompts)
    return messages, messages[-1]
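
# Example usage (a sketch; assumes OPENAI_API_KEY is set). A repeat call with
# identical model/system/prompt arguments is served from the gpt_caches/ dir.
#   messages, reply = query_gpt(
#       user_prompts=["Summarize Hamlet in one sentence."],
#       system_prompt="You are a terse assistant.",
#       note="hamlet summary",
#   )
#   print(reply["content"])
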
def get_cached(user_prompts=(), system_prompt=None, model=g_model):
    # I also want to be able to use this with just a single string
    if isinstance(user_prompts, str):
        user_prompts = [user_prompts]
    # load a cached response if it exists
    digest, prompt_string = get_proper_digest(model, system_prompt, user_prompts)
    response, digest = load_response_digest(digest)
    if response is not None:
        return response["messages"], response["messages"][-1], digest
    return None, None, digest

def check_schema(schema, obj, path=""):
    missing = []
    extra = []
    if isinstance(schema, dict):
        if not isinstance(obj, dict):
            return [path or "root"], []  # Missing entire dict structure
        # Check for missing keys
        for key, sub_schema in schema.items():
            if key not in obj:
                missing.append(f"{path + '.' if path else ''}{key}")
            else:
                # Recursively check sub-objects
                sub_missing, sub_extra = check_schema(sub_schema, obj[key], f"{path + '.' if path else ''}{key}")
                missing.extend(sub_missing)
                extra.extend(sub_extra)
        # Check for extra keys
        for key in obj:
            if key not in schema:
                extra.append(f"{path + '.' if path else ''}{key}")
    elif isinstance(schema, list):
        if not isinstance(obj, list):
            return [path or "root"], []  # Expected a list but got something else
        # Check list elements (assuming all elements should match the first schema item)
        for i, item in enumerate(obj):
            sub_missing, sub_extra = check_schema(schema[0], item, f"{path}[{i}]")
            missing.extend(sub_missing)
            extra.extend(sub_extra)
    else:
        # Check for type mismatch
        if not isinstance(obj, schema):
            missing.append(path)
    return missing, extra
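
# Example: schemas use Python types as leaves and single-element lists for
# homogeneous arrays, mirroring how check_schema recurses.
#   schema = {"name": str, "tags": [str], "meta": {"score": int}}
#   check_schema(schema, {"name": "x", "tags": ["a"], "meta": {}, "id": 1})
#   # -> (["meta.score"], ["id"])
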
# automatic retries because I am tired of openai timing out.
# todo: pass a pre-bound function rather than the function and its parameters
def try_gpt(call, *args, expected_schema=None, **kwargs):
    total_attempts = 10
    output = None
    success = False
    err = None
    for attempt_num in range(total_attempts):
        try:
            time.sleep(1)
            output = call(*args, **kwargs)
            # quickly check the schema if needed
            if expected_schema is not None:
                missing, extra = check_schema(expected_schema, json.loads(extract_code_block(output["choices"][-1]["message"]["content"])))
                assert len(missing) == 0 and len(extra) == 0
            success = True
            break
        except (openai.error.ServiceUnavailableError, openai.error.APIError, openai.error.Timeout) as err1:
            err = err1
            print(f"\n---===>>> Could not complete, waiting 10 seconds and trying again... ({attempt_num + 1}/{total_attempts}) <<<===---")
            time.sleep(10)
            continue
        except AssertionError as err1:
            err = err1
            print(f"\n---===>>> Did not follow required json schema, trying again... ({attempt_num + 1}/{total_attempts}) <<<===---")
            continue
    if not success:
        raise err
    return output
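
# Example usage (a sketch): retry the chat call and insist the reply parses as
# JSON with a top-level "summary" string.
#   completion = try_gpt(
#       openai.ChatCompletion.create,
#       expected_schema={"summary": str},
#       model=g_model,
#       messages=[{"role": "user", "content": 'Describe Hamlet as {"summary": "..."}'}],
#   )
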
def extract_code_block(text):
    code_block = re.search(r'```(?:json\r?\n|csv\r?\n)?(.*?)```', text, re.DOTALL)
    return code_block.group(1).strip() if code_block else text
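
# What the regex handles: an optional ```json or ```csv fence; anything else
# falls through unchanged.
#   extract_code_block('```json\n{"a": 1}\n```')  # -> '{"a": 1}'
#   extract_code_block('no fences here')          # -> 'no fences here'
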
def get_role_messages(messages, role="assistant"):
    return [message for message in messages if message["role"] == role]

def get_last_content(messages):
    return messages[-1]["content"]

def get_all_content(messages):
    return [message["content"] for message in messages]

def csv_to_df(text, headers):
    # to keep consistent between windows/linux we remove carriage return chars
    text = text.replace("\r", "")
    # sometimes gpt includes the header text even though I do not want it to,
    # so here we test whether it did to inform pd.read_csv
    includes_header = set([cell.strip(' "\'') for cell in text.split("\n")[0].split(",")]) == set(headers)
    # remove trailing commas, GPT seems to love to sometimes include those.
    text = "\n".join([line.strip(",") for line in text.split("\n")])
    # replace escaped " characters with fancy quote characters because it helps
    # the csv parser cope with quotes inside fields
    # todo: undo this in the df
    text = text.replace(r"\"", "“")
    # create the df, but drop any row that is all nan.
    try:
        df = pd.read_csv(
            StringIO(text),
            encoding="utf-8",
            header=0 if includes_header else None,
            names=headers,
            quotechar='"',
            sep=',',
            skipinitialspace=True
        ).dropna(how='all')
    except pd.errors.ParserError:
        # a row has stray commas inside a field: protect the real
        # [quote comma space quote] separators with a placeholder, swap every
        # remaining comma for an unused stand-in, then restore the separators
        unused1 = "[[[[[[[[["
        unused_comma = "，"  # fullwidth comma, so it survives the csv parse
        text = text.replace("\", \"", unused1)
        text = text.replace(",", unused_comma)
        text = text.replace(unused1, "\", \"")
        df = pd.read_csv(
            StringIO(text),
            encoding="utf-8",
            header=0 if includes_header else None,
            names=headers,
            quotechar='"',
            sep=',',
            skipinitialspace=True
        ).dropna(how='all')
        # todo: undo the weird stuff I did earlier
    # sometimes gpt numbers the entries even though I tell it not to.
    # To combat that I could look in the first column and strip out any
    # leading "N." numbering (left disabled for now):
    # df[headers[0]] = df[headers[0]].apply(lambda x: re.sub(r'^\s*\d+\.', '', x).strip())
    # sometimes gpt uses a dash for blanks, these should be removed too
    df = df.replace("-", "")
    df = df.fillna("")
    return df
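
# Example (a sketch of the kind of model output this cleans up):
#   text = "Alice, 30,\nBob, 25,"
#   df = csv_to_df(text, headers=["name", "age"])
#   # trailing commas are stripped and the missing header row is supplied
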
if __name__ == '__main__':
    get_models()