forked from MrHiraiwa/LineBotForGPTPlus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
langchainagent.py
212 lines (174 loc) · 6.79 KB
/
langchainagent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType
from langchain_community.chat_models import ChatOpenAI
from langchain_community.utilities.google_search import GoogleSearchAPIWrapper
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from openai import OpenAI
from datetime import datetime, time, timedelta
import pytz
import requests
from bs4 import BeautifulSoup
from google.cloud import storage
from PIL import Image
import io
public_url = []
public_url_original = []
public_url_preview = []
user_id = []
message_id = []
bucket_name = []
file_age = []
llm = ChatOpenAI(model="gpt-3.5-turbo")
google_search = GoogleSearchAPIWrapper()
wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper(lang='ja', doc_content_chars_max=1000, load_all_available_meta=True))
def google_search_results(query):
return google_search.results(query, 5)
def clock(dummy):
jst = pytz.timezone('Asia/Tokyo')
nowDate = datetime.now(jst)
nowDateStr = nowDate.strftime('%Y/%m/%d %H:%M:%S %Z')
return nowDateStr
def scraping(links):
contents = []
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36" ,
}
for link in links:
try:
response = requests.get(link, headers=headers, timeout=5) # Use headers
response.raise_for_status()
response.encoding = response.apparent_encoding
html = response.text
except requests.RequestException:
html = "<html></html>"
soup = BeautifulSoup(html, features="html.parser")
# Remove all 'a' tags
for a in soup.findAll('a'):
a.decompose()
content = soup.select_one("article, .post, .content")
if content is None or content.text.strip() == "":
content = soup.select_one("body")
if content is not None:
text = ' '.join(content.text.split()).replace("。 ", "。\n").replace("! ", "!\n").replace("? ", "?\n").strip()
contents.append(text)
return contents
def set_bucket_lifecycle(bucket_name, age):
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
rule = {
'action': {'type': 'Delete'},
'condition': {'age': age} # The number of days after object creation
}
bucket.lifecycle_rules = [rule]
bucket.patch()
#print(f"Lifecycle rule set for bucket {bucket_name}.")
def bucket_exists(bucket_name):
"""Check if a bucket exists."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
return bucket.exists()
def download_image(image_url):
""" PNG画像をダウンロードする """
response = requests.get(image_url)
return io.BytesIO(response.content)
def create_preview_image(original_image_stream):
""" 画像のサイズを縮小してプレビュー用画像を生成する """
image = Image.open(original_image_stream)
image.thumbnail((640, 640)) # 画像の最大サイズを1024x1024に制限
preview_image = io.BytesIO()
image.save(preview_image, format='PNG')
preview_image.seek(0)
return preview_image
def upload_blob(bucket_name, source_stream, destination_blob_name, content_type='image/png'):
"""Uploads a file to the bucket from a byte stream."""
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_file(source_stream, content_type=content_type)
public_url = f"https://storage.googleapis.com/{bucket_name}/{destination_blob_name}"
return public_url
except Exception as e:
print(f"Failed to upload file: {e}")
raise
def generate_image(prompt):
global public_url_original
global public_url_preview
blob_path = f'{user_id}/{message_id}.png'
preview_blob_path = f'{user_id}/{message_id}_s.png'
client = OpenAI()
try:
response = client.images.generate(
model="dall-e-3",
prompt=prompt,
size="1024x1024",
quality="standard",
n=1,
)
image_result = response.data[0].url
except Exception as e:
return e
if bucket_exists(bucket_name):
set_bucket_lifecycle(bucket_name, file_age)
else:
print(f"Bucket {bucket_name} does not exist.")
return 'OK'
# PNG画像をダウンロード
png_image = download_image(image_result)
# プレビュー画像を生成
preview_image = create_preview_image(png_image)
png_image.seek(0) # ストリームをリセット
# 元のPNG画像をアップロード
public_url_original = upload_blob(bucket_name, png_image, blob_path)
# プレビュー用のPNG画像をアップロード
public_url_preview = upload_blob(bucket_name, preview_image, preview_blob_path)
return 'generated the image. The image you generated has already been displayed to the user. No need to send URL.'
tools = [
Tool(
name = "Search",
func=google_search_results,
description="useful for when you need to answer questions about current events. it is single-input tool Search."
),
Tool(
name = "Clock",
func=clock,
description="useful for when you need to know what time it is. it is single-input tool."
),
Tool(
name = "Scraping",
func=scraping,
description="useful for when you need to read a web page by specifying the URL. it is single-input tool."
),
Tool(
name = "Wikipedia",
func=wikipedia,
description="useful for when you need to Read dictionary page by specifying the word. it is single-input tool."
),
Tool(
name = "Painting",
func= generate_image,
description="It is a useful tool that can generate image based on the Sentence by specifying the Sentence."
),
]
mrkl = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS, verbose=True)
def langchain_agent(question, USER_ID, MESSAGE_ID, BUCKET_NAME=None, FILE_AGE=None):
global user_id
global message_id
global bucket_name
global file_age
global public_url_original
global public_url_preview
public_url_original = []
public_url_preview = []
user_id = USER_ID
message_id = MESSAGE_ID
bucket_name = BUCKET_NAME
file_age = FILE_AGE
try:
result = mrkl.run(question)
return result, public_url_original, public_url_preview
except Exception as e:
print(f"An error occurred: {e}")
# 何らかのデフォルト値やエラーメッセージを返す
return "An error occurred while processing the question"