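"""Mark duplicate texture PNGs in a texture atlas.

Reads atlas.json from the given PNG directory, compares candidate textures
with ffmpeg's SSIM filter (first across missions within the same package,
then across packages within the same mission), tags each duplicate texture
info with a "duplicate" entry, and writes the result to dedupe.json.
"""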
import json
import re
import math
import subprocess
from argparse import ArgumentParser
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict

from tqdm import tqdm

FFMPEG_BIN = "ffmpeg"

SSIM_REGEX = re.compile(
    r"^\[Parsed_ssim_0 @ 0x[0-9a-fA-F]+\] SSIM "
    r"R:(?P<r1>-?\d+\.\d+) \((?P<r2>inf|-?\d+\.\d+)\) "
    r"G:(?P<g1>-?\d+\.\d+) \((?P<g2>inf|-?\d+\.\d+)\) "
    r"B:(?P<b1>-?\d+\.\d+) \((?P<b2>inf|-?\d+\.\d+)\) "
    r"All:(?P<all1>-?\d+\.\d+) \((?P<all2>inf|-?\d+\.\d+)\)$"
)
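# Illustrative example of a line this regex should match (the address and the
# values below are made up):
#   [Parsed_ssim_0 @ 0x557f3e1b2c40] SSIM R:1.000000 (inf) G:1.000000 (inf) B:1.000000 (inf) All:1.000000 (inf)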


def ffmpeg_ssim(a: Path, b: Path) -> float:
    """Return the overall ("All") SSIM score between two images, via ffmpeg."""
    cmd = [
        FFMPEG_BIN,
        "-hide_banner",
        "-i",
        str(a),
        "-i",
        str(b),
        "-lavfi",
        "ssim",
        "-f",
        "null",
        "-",
    ]
    completed = subprocess.run(cmd, check=True, capture_output=True, encoding="utf-8")
    # ffmpeg writes its output to stderr; the SSIM summary is the last line
    lines = completed.stderr.splitlines()
    ssim_result = lines[-1]
    m = SSIM_REGEX.match(ssim_result)
    # if this fails, it's a bug because we couldn't parse the output
    assert m, ssim_result
    return float(m.group("all1"))


def mission_number(mission: str) -> int:
    assert mission.startswith("m")
    return int(mission[1:])


# package names, ordered from "best" to "worst"
PACKAGE_NAMES = [
    "rtexture4",
    "rtexture2",
    "texture8",
    "texture6",
    "texture4",
    "texture2",
]
PACKAGE_SCORE_MIN = {p: i for (i, p) in enumerate(PACKAGE_NAMES)}
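# i.e. PACKAGE_SCORE_MIN == {"rtexture4": 0, "rtexture2": 1, "texture8": 2,
#                            "texture6": 3, "texture4": 4, "texture2": 5}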


def package_number(package: str) -> int:
    return PACKAGE_SCORE_MIN[package]


def maybe_same(a: Any, b: Any) -> bool:
    # we want to test as many conditions here as possible,
    # because the ffmpeg SSIM test is expensive. at this stage,
    # we are only checking for very similar images. however,
    # even with all these conditions it isn't enough to
    # actually say the images are the same.
    return bool(
        a["width"] == b["width"]
        and a["height"] == b["height"]
        and a["stretch"] == b["stretch"]
        and a["alpha"] == b["alpha"]
        and a["colors"] == b["colors"]
    )
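# For reference, each texture info record is expected to look roughly like this
# (field names inferred from the accesses in this script; the values shown are
# illustrative only):
#   {"name": "...", "mission": "m1", "package": "texture8",
#    "width": 64, "height": 64, "stretch": ..., "alpha": ..., "colors": ...}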


def dedupe_missions(png_dir: Path, texture_atlas: Any) -> None:
    for texture_name, texture_infos in tqdm(sorted(texture_atlas.items())):
        # group texture infos by package
        ti_by_package = defaultdict(list)
        for ti in texture_infos:
            # ignore duplicates
            if "duplicate" in ti:
                continue
            ti_by_package[ti["package"]].append(ti)
        # sort them by mission number
        for package_tis in ti_by_package.values():
            package_tis.sort(key=lambda ti: mission_number(ti["mission"]))
        # check for duplicates with the same package name
        for package_name, package_tis in ti_by_package.items():
            # zero or one textures cannot have duplicates
            if len(package_tis) < 2:
                continue
            # keep track of duplicates. the key is the second texture's
            # mission, and the value is the first texture's mission
            duplicates: Dict[str, str] = {}
            # try all combinations (we can shortcut some of these later)
            for a in package_tis:
                for b in package_tis:
                    # check we are working on the expected textures...
                    assert a["name"] == texture_name
                    assert b["name"] == texture_name
                    assert a["package"] == package_name
                    assert b["package"] == package_name
                    a_mission = a["mission"]
                    b_mission = b["mission"]
                    # don't compare the same mission
                    if a_mission == b_mission:
                        continue
                    # don't compare if we have already found a duplicate
                    if a_mission in duplicates or b_mission in duplicates:
                        continue
                    if maybe_same(a, b):
                        a_filename = f"{texture_name}-{a_mission}-{package_name}.png"
                        b_filename = f"{texture_name}-{b_mission}-{package_name}.png"
                        a_path = png_dir / texture_name / a_filename
                        b_path = png_dir / texture_name / b_filename
                        all_ssim = ffmpeg_ssim(a_path, b_path)
                        ssim_same = math.isclose(all_ssim, 1.0)
                        if ssim_same:
                            duplicates[b_mission] = a_mission
            for ti in package_tis:
                mission = ti["mission"]
                # there may be a chain of duplicates; follow it to the end,
                # and guard against a cycle
                maybe_dupe = mission
                try:
                    for _ in range(30):
                        maybe_dupe = duplicates[maybe_dupe]
                    raise RuntimeError(
                        f"Duplicate cycle for '{texture_name}-{mission}-{package_name}'"
                    )
                except KeyError:
                    pass
                # if `maybe_dupe` is different from the original mission,
                # record the de-dupe information
                if maybe_dupe != mission:
                    ti["duplicate"] = {
                        "texture": texture_name,
                        "mission": maybe_dupe,
                        "package": package_name,
                    }


def dedupe_packages(png_dir: Path, texture_atlas: Any) -> None:
    for texture_name, texture_infos in tqdm(sorted(texture_atlas.items())):
        # group texture infos by mission
        ti_by_mission = defaultdict(list)
        for ti in texture_infos:
            # ignore duplicates
            if "duplicate" in ti:
                continue
            ti_by_mission[ti["mission"]].append(ti)
        # sort them by package
        for mission_tis in ti_by_mission.values():
            mission_tis.sort(key=lambda ti: package_number(ti["package"]))
        # check for duplicates with the same mission name
        for mission_name, mission_tis in ti_by_mission.items():
            # zero or one textures cannot have duplicates
            if len(mission_tis) < 2:
                continue
            # keep track of duplicates. the key is the second texture's
            # package, and the value is the first texture's package
            duplicates: Dict[str, str] = {}
            # try all combinations (we can shortcut some of these later)
            for a in mission_tis:
                for b in mission_tis:
                    # check we are working on the expected textures...
                    assert a["name"] == texture_name
                    assert b["name"] == texture_name
                    assert a["mission"] == mission_name
                    assert b["mission"] == mission_name
                    a_package = a["package"]
                    b_package = b["package"]
                    # don't compare the same package
                    if a_package == b_package:
                        continue
                    # don't compare if we have already found a duplicate
                    if a_package in duplicates or b_package in duplicates:
                        continue
                    if maybe_same(a, b):
                        a_filename = f"{texture_name}-{mission_name}-{a_package}.png"
                        b_filename = f"{texture_name}-{mission_name}-{b_package}.png"
                        a_path = png_dir / texture_name / a_filename
                        b_path = png_dir / texture_name / b_filename
                        all_ssim = ffmpeg_ssim(a_path, b_path)
                        ssim_same = math.isclose(all_ssim, 1.0)
                        if ssim_same:
                            duplicates[b_package] = a_package
            for ti in mission_tis:
                package = ti["package"]
                # there may be a chain of duplicates; follow it to the end,
                # and guard against a cycle
                maybe_dupe = package
                try:
                    for _ in range(30):
                        maybe_dupe = duplicates[maybe_dupe]
                    raise RuntimeError(
                        f"Duplicate cycle for '{texture_name}-{mission_name}-{package}'"
                    )
                except KeyError:
                    pass
                # if `maybe_dupe` is different from the original package,
                # record the de-dupe information
                if maybe_dupe != package:
                    ti["duplicate"] = {
                        "texture": texture_name,
                        "mission": mission_name,
                        "package": maybe_dupe,
                    }


def main() -> None:
    parser = ArgumentParser()
    parser.add_argument(
        "png_dir",
        type=lambda value: Path(value).resolve(strict=True),
    )
    args = parser.parse_args()
    atlas_path = args.png_dir / "atlas.json"
    with atlas_path.open("r") as f:
        texture_atlas = json.load(f)
    dedupe_missions(args.png_dir, texture_atlas)
    dedupe_packages(args.png_dir, texture_atlas)
    dedupe_path = args.png_dir / "dedupe.json"
    with dedupe_path.open("w") as f:
        json.dump(texture_atlas, f, indent=2)


if __name__ == "__main__":
    main()
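# Example invocation (the directory path here is hypothetical):
#   python dedupe.py /path/to/pngs
# The script expects /path/to/pngs/atlas.json to exist, and writes the updated
# atlas, with "duplicate" entries filled in, to /path/to/pngs/dedupe.json.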