-
Notifications
You must be signed in to change notification settings - Fork 0
/
write_images.py
226 lines (185 loc) · 7.48 KB
/
write_images.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import asyncio
import os
import time
from io import BytesIO
from random import random
from typing import Tuple, Optional
import aiohttp
from PIL import Image
from PIL import ImageFile
from dotenv import load_dotenv
from storage3.utils import StorageException
from supabase import create_client
# Ask Pillow to load image files whose data is truncated instead of raising
# (NOTE(review): relies on Pillow's documented LOAD_TRUNCATED_IMAGES flag).
ImageFile.LOAD_TRUNCATED_IMAGES = True
# Read variables from a local .env file into the process environment so the
# os.getenv() calls below can see them.
load_dotenv()
# Module-wide Supabase client used for the table queries and storage uploads
# in this file. os.getenv() returns None when SUPABASE_URL / SUPABASE_KEY are
# unset, in which case None is passed straight to create_client.
supabase_client = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY"))
async def adjust_image(image_url: str, session) -> Image.Image:
    """Download the image at ``image_url`` and return a square thumbnail.

    The image is centre-cropped to a 1:1 aspect ratio and then resized to at
    most 150x150 pixels; a source whose shorter side is below 150 pixels
    keeps that smaller size.

    Args:
        image_url: Direct URL of the image file to download.
        session: HTTP session whose ``get(url, headers=...)`` returns an async
            context manager yielding a response with ``raise_for_status()``
            and an awaitable ``read()`` (``main`` passes an
            ``aiohttp.ClientSession``).

    Returns:
        The cropped and resized thumbnail. Images that are not already in
        ``RGB`` or ``L`` mode are converted to ``RGB`` so the result can be
        written as JPEG by the caller.

    Raises:
        aiohttp.ClientResponseError: If the server answers with an HTTP error
            status, instead of passing an error page to Pillow as image data.
    """
    thumbnail_side = 150

    # Download the image bytes.
    headers = {"User-Agent": "Thumbnail collection animal app; [email protected]"}
    async with session.get(image_url, headers=headers) as response:
        response.raise_for_status()
        data = await response.read()

    image = Image.open(BytesIO(data))
    # Palette / alpha / CMYK sources cannot be saved reliably as JPEG.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")

    # Centre-crop to a square: the shorter side decides the crop size and the
    # surplus on the longer side is trimmed equally from both ends.
    image_width, image_height = image.size
    side = min(image_width, image_height)
    x0 = (image_width - side) // 2
    y0 = (image_height - side) // 2
    square = image.crop((x0, y0, x0 + side, y0 + side))

    # Never upscale: small sources keep their cropped size.
    target = min(thumbnail_side, side)
    return square.resize((target, target))
async def get_file_info(filename: str, session) -> Optional[Tuple[str, str, int, int]]:
    """Fetch URL, license and pixel dimensions of a Wikimedia Commons file.

    Args:
        filename: Page title of the file as passed in the ``titles`` query
            parameter (callers in this file pass titles starting with
            ``"File:"``).
        session: HTTP session whose ``get(url, params=...)`` returns an async
            context manager yielding a response with an awaitable ``json()``.

    Returns:
        A tuple ``(url, license_short_name, width, height)``, or ``None`` when
        the API response does not contain all of those fields (for example
        when the page has no ``imageinfo`` entry or no license metadata).
    """
    api_url = "https://commons.wikimedia.org/w/api.php"

    # Query parameters for the imageinfo lookup.
    params = {
        "action": "query",
        "format": "json",
        "prop": "imageinfo",
        "titles": filename,
        "iiprop": "url|extmetadata|dimensions",
    }
    async with session.get(api_url, params=params) as response:
        data = await response.json()

    # Every lookup lives inside the try: a page without "imageinfo" or a
    # response without "query" yields None instead of an uncaught KeyError.
    try:
        pages = data["query"]["pages"]
        image_info = list(pages.values())[0]["imageinfo"][0]
        original_url = image_info["url"]
        license_name = image_info["extmetadata"]["LicenseShortName"]["value"]
        width = image_info["width"]
        height = image_info["height"]
    except (KeyError, IndexError):
        return None
    return original_url, license_name, width, height
async def get_animal_image_url(species_id: int, animal_name: str, session, fill_null=False):
    """Find, thumbnail and store a Wikimedia Commons image for one species.

    Steps:
      1. Return early if a ``species_images`` row already exists for
         ``species_id``, unless ``fill_null`` is set and that row has a null
         ``thumbnail_name`` or ``cover_url``.
      2. Search the Commons file namespace for ``animal_name`` (up to 10
         hits) and take the first JPEG with a CC / public-domain license and
         roughly square proportions.
      3. If nothing qualifies, upsert a row with null image fields.
      4. Otherwise build a thumbnail with ``adjust_image``, upload it to the
         ``animal-images`` bucket and upsert the row.

    Args:
        species_id: Key of the species in the ``species_images`` table.
        animal_name: Search term; also used as the thumbnail file name.
        session: HTTP session shared with ``get_file_info`` and
            ``adjust_image``.
        fill_null: When true, existing rows with null image fields are
            retried instead of skipped.

    Returns:
        The URL of the selected original image when the thumbnail was
        uploaded and the row written; ``None`` in every other case.

    NOTE(review): the Supabase calls below are not awaited, so they presumably
    block the event loop while they run — confirm against the client in use.
    """
    # Skip species that already have a (complete) record.
    existing = (
        supabase_client.table("species_images")
        .select("*")
        .eq("species_id", species_id)
        .execute()
    )
    record = existing.data[0] if existing.data else None
    if record:
        has_null = None in (record.get("thumbnail_name"), record.get("cover_url"))
        if not (fill_null and has_null):
            print(f"{species_id}: Record already exists")
            return None
        print(f"{species_id}: Try to fill record with null values")

    # Search query for images with the animal name.
    base_url = "https://commons.wikimedia.org/w/api.php"
    payload = {
        'action': 'query',
        'list': 'search',
        'srsearch': animal_name,
        'srlimit': '10',
        'prop': 'imageinfo',
        'srnamespace': '6',
        'format': 'json',
    }
    headers = {
        'Content-Type': 'application/json;charset=UTF-8',
    }

    # Random jitter BEFORE the request, so concurrent tasks do not all hit the
    # API at once and no connection is held open while sleeping.
    await asyncio.sleep(random())
    async with session.get(base_url, params=payload, headers=headers) as response:
        response_dict = await response.json()

    # Take the first search hit that satisfies every requirement.
    img_url = None
    for img_result in response_dict["query"]["search"]:
        title = img_result["title"]
        # Match on the real extension, case-insensitively, rather than on
        # "jpg" appearing anywhere in the title.
        if not title.startswith("File:") or not title.lower().endswith((".jpg", ".jpeg")):
            continue

        print(f"{species_id}: Sending requests for file info")
        file_info = await get_file_info(title, session)
        if not file_info:
            continue
        temp_url, license_name, width, height = file_info
        print(f"{species_id}: Got requests for file info:")

        # Accept CC licenses and public-domain files; the public-domain label
        # is compared case-insensitively so "Public domain" also matches.
        is_free = "CC" in license_name or "public domain" in license_name.lower()
        if not is_free:
            continue
        # Squarish: sides may differ by at most 40% of the width.
        if abs(width - height) > width * 0.4:
            continue

        img_url = temp_url
        print(f"{species_id}: found url:", img_url, license_name, width, height)
        break

    # No usable image: store nulls so the species is marked as attempted.
    if not img_url:
        supabase_client.table("species_images").upsert({
            "species_id": species_id,
            "cover_url": None,
            "thumbnail_name": None,
        }).execute()
        print(f"{species_id}: No url found, empty record added")
        return None

    # Build the thumbnail and serialise it to JPEG bytes.
    thumbnail = await adjust_image(img_url, session)
    with BytesIO() as bytestream:
        thumbnail.save(bytestream, format='JPEG')
        image_bytes = bytestream.getvalue()

    # Upload the thumbnail; a storage error is reported as an existing image
    # and leaves the table untouched.
    image_path = f"/thumbnail/{animal_name}.jpg"
    bucket = supabase_client.storage.from_("animal-images")
    try:
        bucket.upload(image_path, image_bytes, {"content-type": "image/jpeg"})
    except StorageException:
        print(f"{species_id}: Image already exists")
        return None
    print(f"{species_id}: Image uploaded")

    supabase_client.table("species_images").upsert({
        "species_id": species_id,
        "cover_url": img_url,
        "thumbnail_name": image_path,
    }).execute()
    print(f"{species_id}: Record added")
    return img_url
async def main(range_start: int, range_end: int):
    """Process every species whose id lies in ``[range_start, range_end)``.

    For each id that exists in ``species_view`` a ``get_animal_image_url``
    task is started (with ``fill_null=True``); all tasks share one
    ``aiohttp.ClientSession`` and are awaited together.

    A task that raises is reported and does not abort the remaining tasks of
    the batch.

    Args:
        range_start: First species id to process (inclusive).
        range_end: End of the id range (exclusive).
    """
    pending = []  # (species_id, task) pairs, kept to label failures
    async with aiohttp.ClientSession() as session:
        for candidate_id in range(range_start, range_end):
            result = (
                supabase_client.from_("species_view")
                .select("species_id", "common_name", "genus", "species")
                .eq("species_id", candidate_id)
                .execute()
            )
            if not result.data:
                continue

            record = result.data[0]
            species_id = record.get("species_id")
            # Prefer the common name; fall back to "Genus species". Stripping
            # keeps a missing part from leaving a blank-only search term.
            binomial = ((record.get("genus") or "") + " " + (record.get("species") or "")).strip()
            animal_name = record.get("common_name") or binomial
            if not animal_name:
                print(f"{species_id}: No name available, skipping")
                continue

            print('\n', species_id, ": starting task for ", animal_name)
            task = asyncio.create_task(
                get_animal_image_url(species_id, animal_name, session, fill_null=True)
            )
            pending.append((species_id, task))

        # Await inside the session context so the session stays open for the
        # tasks; exceptions come back as results instead of propagating.
        outcomes = await asyncio.gather(
            *(task for _, task in pending), return_exceptions=True
        )

    for (species_id, _), outcome in zip(pending, outcomes):
        if isinstance(outcome, BaseException):
            print(f"{species_id}: Task failed: {outcome!r}")
if __name__ == "__main__":
    # Walk the species id range in fixed-size batches; each batch gets its own
    # event loop and HTTP session via main().
    start = time.time()
    start_range = 90000
    end_range = 95800
    step_size = 50
    for batch_start in range(start_range, end_range, step_size):
        # Clamp the last batch so it never runs past end_range.
        batch_end = min(batch_start + step_size, end_range)
        asyncio.run(main(batch_start, batch_end))
    # Elapsed wall-clock seconds (now minus start, so the value is positive).
    print("TIME", time.time() - start)