Files
QuaternionEngine/texture_compression.py
2025-11-09 18:27:25 +09:00

245 lines
8.3 KiB
Python

import argparse, json, os, re, shlex, subprocess, sys
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
# Pillow is an optional dependency; PIL_OK records whether `Image` could be
# imported so that callers can degrade gracefully when it is missing.
try:
    from PIL import Image
except Exception:
    PIL_OK = False
else:
    PIL_OK = True
# Default filename-suffix tokens per texture role; used as the defaults of
# the --suffix-* command line options.
DEFAULT_SUFFIX = dict(
    albedo=[
        "_albedo",
        "_basecolor",
        "_base_colour",
        "_base_color",
        "_base",
        "baseColor",
    ],
    mr=[
        "_mr",
        "_orm",
        "_metalrough",
        "_metallicroughness",
        "metallicRoughness",
    ],
    normal=[
        "_normal",
        "_norm",
        "_nrm",
        "_normalgl",
    ],
)

# Lower-case file extensions (including the dot) accepted as input images.
SUPPORTED_IMAGE_EXTS = {
    ".png",
    ".jpg",
    ".jpeg",
    ".tga",
    ".tif",
    ".tiff",
}
def which_or_die(cmd):
    """Terminate the process unless `cmd` can be resolved on PATH.

    Returns None when the executable is found; otherwise calls `sys.exit`
    with an error message (which raises SystemExit).
    """
    import shutil

    resolved = shutil.which(cmd)
    if resolved is not None:
        return
    sys.exit(f"ERROR: `{cmd}` not found in PATH")
def build_suffix_regex(tokens):
    """Compile a case-insensitive pattern matching a role suffix at the end of a stem.

    A token matches when it is the whole stem or when it is preceded by one
    of the separators `_`, `-` or `.`; e.g. with the token "_basecolor" the
    stem "Foo_BaseColor" matches.

    Tokens may be given with or without a leading separator ("_normal" and
    "normal" are equivalent). The leading separator is stripped here because
    the pattern itself already requires one; keeping it in the token would
    demand two consecutive separators and "brick_normal" would never match.

    Args:
        tokens: iterable of suffix strings.

    Returns:
        A compiled `re.Pattern`. When no usable token is supplied the pattern
        never matches anything.
    """
    separators = "_-."
    cores = []
    for token in tokens:
        core = token.strip().lower().lstrip(separators)
        if core and core not in cores:
            cores.append(core)
    if not cores:
        # An empty alternation would match the empty suffix, i.e. any stem
        # ending in a separator; use a pattern that can never match instead.
        return re.compile(r"(?!)")
    alt = "|".join(re.escape(core) for core in cores)
    return re.compile(rf"(^|[_\-.])({alt})$", re.IGNORECASE)
def detect_role_by_suffix(stem, rx):
    """Return the first role whose pattern matches the lower-cased stem.

    Args:
        stem: file name without extension.
        rx: mapping of role name -> compiled pattern; checked in mapping order.

    Returns:
        The matching role name, or None when no pattern matches.
    """
    lowered = stem.lower()
    matching_roles = (name for name, pattern in rx.items() if pattern.search(lowered))
    return next(matching_roles, None)
def parse_gltf_roles(gltf_path: Path):
    """Read a .gltf JSON file and map each referenced image URI to a texture role.

    Only the base color, metallic-roughness and normal texture slots of each
    material are inspected. When the same image is used in several slots the
    role with the highest priority wins: normal > albedo > mr.

    Image URIs are percent-decoded (e.g. "my%20tex.png" -> "my tex.png") so
    that they can be compared against file names on disk.

    Args:
        gltf_path: path to the glTF file.

    Returns:
        dict of decoded image URI -> "albedo" | "mr" | "normal". Empty when
        the file does not exist or does not have a ".gltf" extension (binary
        .glb files are not parsed).

    Raises:
        json.JSONDecodeError / UnicodeDecodeError / OSError: if the file
        cannot be read or is not valid UTF-8 JSON.
    """
    from urllib.parse import unquote

    roles = {}  # uri -> role (albedo/mr/normal)
    if not gltf_path.exists() or gltf_path.suffix.lower() != ".gltf":
        return roles

    data = json.loads(gltf_path.read_text(encoding="utf-8"))
    images = data.get("images", [])
    textures = data.get("textures", [])
    materials = data.get("materials", [])

    # texture index -> decoded image uri
    tex_to_uri = {}
    for tex_index, tex in enumerate(textures):
        src = tex.get("source")
        # Ignore missing, non-integer or out-of-range image references.
        if isinstance(src, int) and 0 <= src < len(images):
            uri = images[src].get("uri")
            if uri:
                tex_to_uri[tex_index] = unquote(uri)

    # Priority order: normal > albedo > mr.
    priority = {"normal": 3, "albedo": 2, "mr": 1}

    for mat in materials:
        pbr = mat.get("pbrMetallicRoughness", {})
        slots = (
            (pbr.get("baseColorTexture", {}), "albedo"),
            (pbr.get("metallicRoughnessTexture", {}), "mr"),
            (mat.get("normalTexture", {}), "normal"),
        )
        for tex_info, role in slots:
            uri = tex_to_uri.get(tex_info.get("index"))
            if uri is None:
                continue
            previous = roles.get(uri)
            if previous is None or priority[role] > priority.get(previous, 0):
                roles[uri] = role
    return roles
def has_meaningful_alpha(img_path: Path) -> bool:
    """Report whether the image's alpha channel is anything other than constant 255.

    The image is only inspected when it has an "A" band or a "transparency"
    entry in its info dict; it is then converted to RGBA and the extrema of
    the alpha channel are compared against (255, 255).

    Returns False when Pillow is unavailable, when the image has no alpha
    information, or when the file cannot be opened/decoded (best effort).
    """
    if not PIL_OK:
        return False
    try:
        with Image.open(img_path) as im:
            carries_alpha = "A" in im.getbands() or "transparency" in im.info
            if not carries_alpha:
                return False
            alpha_range = im.convert("RGBA").getchannel("A").getextrema()
            return bool(alpha_range and alpha_range != (255, 255))
    except Exception:
        # Deliberate best effort: an unreadable image counts as "no alpha".
        return False
def decide_targets(role, albedo_target, img_path):
    """Pick the BCn transcode target and the OETF for one texture.

    Args:
        role: "normal", "mr", or anything else (treated as albedo).
        albedo_target: "auto", "bc1", "bc3" or "bc7"; only used for albedo.
            Any other value falls back to "bc7".
        img_path: image path; only read when `albedo_target` is "auto", to
            check for a meaningful alpha channel.

    Returns:
        Tuple (target, oetf), e.g. ("bc5", "linear") or ("bc7", "srgb").
    """
    if role == "normal":
        return ("bc5", "linear")
    if role == "mr":
        return ("bc7", "linear")

    # Everything else is handled as albedo and is always sRGB.
    if albedo_target == "auto":
        fmt = "bc3" if has_meaningful_alpha(img_path) else "bc1"
    elif albedo_target in ("bc1", "bc3", "bc7"):
        fmt = albedo_target
    else:
        fmt = "bc7"
    return fmt, "srgb"
def run_cmd(args_list, dry_run=False):
    """Run an external command and return its exit status instead of raising.

    Args:
        args_list: argv as a list of strings (no shell is involved).
        dry_run: when True, only print the shell-quoted command line.

    Returns:
        0 on success or in dry-run mode; the process's non-zero return code
        when it fails; 127 when the executable does not exist and 126 when it
        cannot be launched for another OS-level reason (the values shells
        conventionally use for these cases).
    """
    cmd = " ".join(shlex.quote(a) for a in args_list)
    if dry_run:
        print(f"[DRY] {cmd}")
        return 0
    try:
        subprocess.run(args_list, check=True)
    except subprocess.CalledProcessError as e:
        print(f"[ERR] {cmd}\n -> exit {e.returncode}", file=sys.stderr)
        return e.returncode
    except OSError as e:
        # The process could not be started at all (missing binary, no
        # execute permission, ...). Report it like any other failure so a
        # single bad launch does not propagate out of a worker thread.
        print(f"[ERR] {cmd}\n -> {e}", file=sys.stderr)
        return 127 if isinstance(e, FileNotFoundError) else 126
    return 0
def process_one(img_path: Path, out_dir: Path, role, opts):
    """Encode one image to a BCn KTX2 file in two steps.

    1. `toktx` encodes the source image to an intermediate UASTC KTX2 file in
       `<out_dir>/.intermediate/<stem>.uastc.ktx2`.
    2. `ktx transcode` converts that file to the BCn target chosen by
       `decide_targets` and writes `<out_dir>/<stem>.ktx2` (zstd level 18).

    Args:
        img_path: source image.
        out_dir: output directory; created if needed (not in dry-run mode).
        role: texture role ("albedo", "mr" or "normal").
        opts: parsed options; reads `albedo_target`, `uastc_quality`,
            `mipmaps`, `flip_y`, `keep_temp` and `dry_run`.

    Returns:
        0 on success, otherwise the non-zero status of the failing step.

    NOTE(review): output and intermediate names are derived from the stem
    only, so two inputs sharing a stem (different folders or extensions)
    write to the same files — confirm whether callers need to guard this.
    """
    stem = img_path.stem
    tmp_dir = out_dir / ".intermediate"
    tmp_ktx2 = tmp_dir / f"{stem}.uastc.ktx2"
    out_ktx2 = out_dir / f"{stem}.ktx2"

    # A dry run only prints commands, so it must not touch the file system.
    if not opts.dry_run:
        out_dir.mkdir(parents=True, exist_ok=True)
        tmp_dir.mkdir(parents=True, exist_ok=True)

    target_bc, oetf = decide_targets(role, opts.albedo_target, img_path)

    # 1) image -> KTX2 (UASTC)
    toktx = [
        "toktx",
        "--t2",
        "--encode", "uastc",
        "--uastc_quality", str(opts.uastc_quality),
    ]
    if role == "normal":
        toktx += ["--normal_mode", "--normalize"]
    if opts.mipmaps:
        toktx += ["--genmipmap"]
    if opts.flip_y:
        toktx += ["--lower_left_maps_to_s0t0"]
    # albedo: srgb, everything else: linear
    toktx += ["--assign_oetf", oetf]
    toktx += [str(tmp_ktx2), str(img_path)]

    # 2) UASTC KTX2 -> BCn KTX2
    ktx_trans = [
        "ktx", "transcode",
        "--target", target_bc,
        "--zstd", "18",
        str(tmp_ktx2), str(out_ktx2),
    ]

    try:
        rc = run_cmd(toktx, dry_run=opts.dry_run)
        if rc != 0:
            return rc
        return run_cmd(ktx_trans, dry_run=opts.dry_run)
    finally:
        # Remove the intermediate file on success AND on failure so failed
        # runs do not leave stale files behind. Best effort: the file may
        # not exist if the first step failed.
        if not opts.keep_temp and not opts.dry_run:
            try:
                tmp_ktx2.unlink()
            except OSError:
                pass
def main():
    """Command line entry point: encode images to BCn KTX2 files in parallel.

    Parses the arguments, verifies that `toktx` and `ktx` are on PATH,
    collects the input images, decides each image's role (glTF reference
    first, filename suffix second, "albedo" as the fallback) and runs
    `process_one` for every image on a thread pool.

    Exits with a message when no input image is found, and with status 2
    when at least one image failed.
    """
    p = argparse.ArgumentParser(description="Image → KTX2(BCn) encoder (toktx + ktx transcode)")
    p.add_argument("-i", "--input", required=True, help="Input folder(recursive) or image file")
    p.add_argument("-o", "--output", required=True, help="Output folder")
    p.add_argument("--gltf", help=".gltf File path (optional). glTF first, suffix last")
    p.add_argument("--suffix-albedo", default=",".join(DEFAULT_SUFFIX["albedo"]),
                   help="albedo suffix CSV (Base: %s)" % ",".join(DEFAULT_SUFFIX["albedo"]))
    p.add_argument("--suffix-mr", default=",".join(DEFAULT_SUFFIX["mr"]))
    p.add_argument("--suffix-normal", default=",".join(DEFAULT_SUFFIX["normal"]))
    p.add_argument("--albedo-target", choices=["auto", "bc1", "bc3", "bc7"], default="bc7",
                   help="albedo BC format(auto=non alpha BC1, alpha BC3)")
    p.add_argument("--uastc-quality", type=int, default=2, help="UASTC quality(0~4)")
    p.add_argument("--mipmaps", action="store_true", help="mipmap generation")
    p.add_argument("--flip-y", action="store_true", help="Y flip(t0 to bottom)")
    p.add_argument("--keep-temp", action="store_true", help="Preserve temporal UASTC")
    p.add_argument("-j", "--jobs", type=int, default=os.cpu_count() or 4, help="Concurrent task size")
    p.add_argument("--dry-run", action="store_true", help="Don't execute")
    opts = p.parse_args()

    which_or_die("toktx")
    which_or_die("ktx")

    def split_csv(value):
        """Split a comma separated option value, dropping blank entries."""
        return [s.strip() for s in value.split(",") if s.strip()]

    rx = {
        "albedo": build_suffix_regex(split_csv(opts.suffix_albedo)),
        "mr": build_suffix_regex(split_csv(opts.suffix_mr)),
        "normal": build_suffix_regex(split_csv(opts.suffix_normal)),
    }

    gltf_roles = parse_gltf_roles(Path(opts.gltf)) if opts.gltf else {}
    # Lower-cased file name -> role, built once instead of scanning every
    # URI per image. setdefault keeps the first URI when names collide.
    role_by_name = {}
    for uri, role in gltf_roles.items():
        role_by_name.setdefault(Path(uri).name.lower(), role)

    in_path = Path(opts.input)
    if in_path.is_file() and in_path.suffix.lower() in SUPPORTED_IMAGE_EXTS:
        img_files = [in_path]
    else:
        # Compare lower-cased suffixes so "FOO.PNG" is found on case-sensitive
        # file systems too; sort for a reproducible processing order.
        img_files = sorted(
            f for f in in_path.rglob("*")
            if f.suffix.lower() in SUPPORTED_IMAGE_EXTS and f.is_file()
        )
    if not img_files:
        sys.exit("No input images (supported: .png .jpg .jpeg .tga .tif .tiff).")

    out_dir = Path(opts.output)

    def decide_role_for_path(p: Path):
        """glTF reference first, then filename suffix, then "albedo"."""
        from_gltf = role_by_name.get(p.name.lower())
        if from_gltf:
            return from_gltf
        return detect_role_by_suffix(p.stem, rx) or "albedo"

    any_err = False
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp user input.
    with ThreadPoolExecutor(max_workers=max(1, opts.jobs)) as ex:
        futs = {}
        for img in img_files:
            role = decide_role_for_path(img)
            futs[ex.submit(process_one, img, out_dir, role, opts)] = (img, role)

        for fut in as_completed(futs):
            img, role = futs[fut]
            try:
                rc = fut.result()
            except Exception as exc:
                # Top-level boundary: one failing image must not abort the
                # report for all the others.
                print(f"[ERR] {img.name}: {exc}", file=sys.stderr)
                rc = 1
            status = "OK" if rc == 0 else f"ERR({rc})"
            print(f"[{status}] {img.name} -> role={role}")
            if rc != 0:
                any_err = True

    if any_err:
        sys.exit(2)
# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    main()