244 lines
8.3 KiB
Python
244 lines
8.3 KiB
Python
import argparse, json, os, re, shlex, subprocess, sys
|
|
from pathlib import Path
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
try:
|
|
from PIL import Image
|
|
PIL_OK = True
|
|
except Exception:
|
|
PIL_OK = False
|
|
|
|
DEFAULT_SUFFIX = {
|
|
"albedo": ["_albedo", "_basecolor", "_base_colour", "_base_color", "_base", "baseColor", "BaseColor"],
|
|
"mr": ["_mr", "_orm", "_metalrough", "_metallicroughness", "metallicRoughness", "Metallic"],
|
|
"normal": ["_normal", "_norm", "_nrm", "_normalgl", "Normal"]
|
|
}
|
|
|
|
SUPPORTED_IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".tga", ".tif", ".tiff"}
|
|
|
|
def which_or_die(cmd):
|
|
from shutil import which
|
|
if which(cmd) is None:
|
|
sys.exit(f"ERROR: `{cmd}` not found in PATH")
|
|
|
|
def build_suffix_regex(tokens):
|
|
# ex) Foo_BaseColor -> _basecolor
|
|
alt = "|".join([re.escape(t.lower()) for t in tokens])
|
|
return re.compile(rf"(^|[_\-.])({alt})$", re.IGNORECASE)
|
|
|
|
def detect_role_by_suffix(stem, rx):
|
|
s = stem.lower()
|
|
for role, r in rx.items():
|
|
if r.search(s):
|
|
return role
|
|
return None
|
|
|
|
def parse_gltf_roles(gltf_path: Path):
|
|
"""glTF(.gltf JSON) to get image role"""
|
|
roles = {} # uri-> role (albedo/mr/normal)
|
|
if not gltf_path.exists():
|
|
return roles
|
|
if gltf_path.suffix.lower() == ".gltf":
|
|
data = json.loads(gltf_path.read_text(encoding="utf-8"))
|
|
else:
|
|
return roles
|
|
|
|
images = data.get("images", [])
|
|
textures = data.get("textures", [])
|
|
materials = data.get("materials", [])
|
|
|
|
# texture index -> image uri
|
|
tex_to_uri = {}
|
|
for i, tex in enumerate(textures):
|
|
src = tex.get("source")
|
|
if src is not None and 0 <= src < len(images):
|
|
uri = images[src].get("uri")
|
|
if uri:
|
|
tex_to_uri[i] = uri
|
|
|
|
def mark(uri, role):
|
|
if not uri:
|
|
return
|
|
|
|
prio = {"normal": 3, "albedo": 2, "mr": 1}
|
|
old = roles.get(uri)
|
|
if old is None or prio[role] > prio.get(old, 0):
|
|
roles[uri] = role
|
|
|
|
for mat in materials:
|
|
pbr = mat.get("pbrMetallicRoughness", {})
|
|
base = pbr.get("baseColorTexture", {})
|
|
mr = pbr.get("metallicRoughnessTexture", {})
|
|
nor = mat.get("normalTexture", {})
|
|
|
|
if "index" in base and base["index"] in tex_to_uri:
|
|
mark(tex_to_uri[base["index"]], "albedo")
|
|
if "index" in mr and mr["index"] in tex_to_uri:
|
|
mark(tex_to_uri[mr["index"]], "mr")
|
|
if "index" in nor and nor["index"] in tex_to_uri:
|
|
mark(tex_to_uri[nor["index"]], "normal")
|
|
|
|
return roles
|
|
|
|
def has_meaningful_alpha(img_path: Path) -> bool:
|
|
if not PIL_OK:
|
|
return False
|
|
try:
|
|
with Image.open(img_path) as im:
|
|
if ("A" in im.getbands()) or ("transparency" in im.info):
|
|
im = im.convert("RGBA")
|
|
alpha = im.getchannel("A")
|
|
extrema = alpha.getextrema()
|
|
return bool(extrema and extrema != (255, 255))
|
|
return False
|
|
except Exception:
|
|
return False
|
|
return False
|
|
|
|
def decide_targets(role, albedo_target, img_path):
|
|
""" return transcode target(BCn), OETF(srgb/linear)"""
|
|
if role == "normal":
|
|
return "bc5", "linear"
|
|
if role == "mr":
|
|
return "bc7", "linear"
|
|
# albedo
|
|
if albedo_target == "auto":
|
|
if has_meaningful_alpha(img_path):
|
|
return "bc3", "srgb"
|
|
else:
|
|
return "bc1", "srgb"
|
|
elif albedo_target in ("bc1", "bc3", "bc7"):
|
|
return albedo_target, "srgb"
|
|
else:
|
|
return "bc7", "srgb"
|
|
|
|
def run_cmd(args_list, dry_run=False):
|
|
cmd = " ".join(shlex.quote(a) for a in args_list)
|
|
if dry_run:
|
|
print(f"[DRY] {cmd}")
|
|
return 0
|
|
try:
|
|
subprocess.run(args_list, check=True)
|
|
return 0
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"[ERR] {cmd}\n -> exit {e.returncode}", file=sys.stderr)
|
|
return e.returncode
|
|
|
|
def process_one(img_path: Path, out_dir: Path, role, opts):
|
|
stem = img_path.stem
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
tmp_dir = out_dir / ".intermediate"
|
|
tmp_dir.mkdir(parents=True, exist_ok=True)
|
|
tmp_ktx2 = tmp_dir / f"{stem}.uastc.ktx2"
|
|
|
|
# 1) PNG -> KTX2(UASTC)
|
|
target_bc, oetf = decide_targets(role, opts.albedo_target, img_path)
|
|
toktx = [
|
|
"toktx",
|
|
"--t2",
|
|
"--encode", "uastc",
|
|
"--uastc_quality", str(opts.uastc_quality),
|
|
]
|
|
if role == "normal":
|
|
toktx += ["--normal_mode", "--normalize"]
|
|
if opts.mipmaps:
|
|
toktx += ["--genmipmap"]
|
|
if opts.flip_y:
|
|
toktx += ["--lower_left_maps_to_s0t0"]
|
|
# albedo: srgb, else linear
|
|
toktx += ["--assign_oetf", oetf]
|
|
toktx += [str(tmp_ktx2), str(img_path)]
|
|
rc = run_cmd(toktx, dry_run=opts.dry_run)
|
|
if rc != 0: return rc
|
|
|
|
# 2) UASTC KTX2 -> BCn KTX2
|
|
out_ktx2 = out_dir / f"{stem}.ktx2"
|
|
ktx_trans = [
|
|
"ktx", "transcode",
|
|
"--target", target_bc,
|
|
str(tmp_ktx2), str(out_ktx2)
|
|
]
|
|
rc = run_cmd(ktx_trans, dry_run=opts.dry_run)
|
|
if rc != 0: return rc
|
|
|
|
if not opts.keep_temp and not opts.dry_run:
|
|
try:
|
|
tmp_ktx2.unlink()
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
|
|
def main():
|
|
p = argparse.ArgumentParser(description="Image → KTX2(BCn) encoder (toktx + ktx transcode)")
|
|
p.add_argument("-i", "--input", required=True, help="Input folder(recursive) or image file")
|
|
p.add_argument("-o", "--output", required=True, help="Output folder")
|
|
p.add_argument("--gltf", help=".gltf File path (optional). glTF first, suffix last")
|
|
p.add_argument("--suffix-albedo", default=",".join(DEFAULT_SUFFIX["albedo"]),
|
|
help="albedo suffix CSV (Base: %s)" % ",".join(DEFAULT_SUFFIX["albedo"]))
|
|
p.add_argument("--suffix-mr", default=",".join(DEFAULT_SUFFIX["mr"]))
|
|
p.add_argument("--suffix-normal", default=",".join(DEFAULT_SUFFIX["normal"]))
|
|
p.add_argument("--albedo-target", choices=["auto", "bc1", "bc3", "bc7"], default="bc7",
|
|
help="albedo BC format(auto=non alpha BC1, alpha BC3)")
|
|
p.add_argument("--uastc-quality", type=int, default=2, help="UASTC quality(0~4)")
|
|
p.add_argument("--mipmaps", action="store_true", help="mipmap generation")
|
|
p.add_argument("--flip-y", action="store_true", help="Y flip(t0 to bottom)")
|
|
p.add_argument("--keep-temp", action="store_true", help="Preserve temporal UASTC")
|
|
p.add_argument("-j", "--jobs", type=int, default=os.cpu_count() or 4, help="Concurrent task size")
|
|
p.add_argument("--dry-run", action="store_true", help="Don't execute")
|
|
opts = p.parse_args()
|
|
|
|
which_or_die("toktx")
|
|
which_or_die("ktx")
|
|
|
|
rx = {
|
|
"albedo": build_suffix_regex([s.strip() for s in opts.suffix_albedo.split(",") if s.strip()]),
|
|
"mr": build_suffix_regex([s.strip() for s in opts.suffix_mr.split(",") if s.strip()]),
|
|
"normal": build_suffix_regex([s.strip() for s in opts.suffix_normal.split(",") if s.strip()]),
|
|
}
|
|
|
|
gltf_roles = {}
|
|
if opts.gltf:
|
|
gltf_roles = parse_gltf_roles(Path(opts.gltf))
|
|
|
|
in_path = Path(opts.input)
|
|
img_files = []
|
|
if in_path.is_file() and in_path.suffix.lower() in SUPPORTED_IMAGE_EXTS:
|
|
img_files = [in_path]
|
|
else:
|
|
for ext in SUPPORTED_IMAGE_EXTS:
|
|
img_files.extend(in_path.rglob(f"*{ext}"))
|
|
if not img_files:
|
|
sys.exit("No input images (supported: .png .jpg .jpeg .tga .tif .tiff).")
|
|
|
|
out_dir = Path(opts.output)
|
|
|
|
def decide_role_for_path(p: Path):
|
|
if gltf_roles:
|
|
for uri, role in gltf_roles.items():
|
|
if Path(uri).name.lower() == p.name.lower():
|
|
return role
|
|
|
|
by_suffix = detect_role_by_suffix(p.stem, rx)
|
|
return by_suffix or "albedo"
|
|
|
|
tasks = []
|
|
with ThreadPoolExecutor(max_workers=opts.jobs) as ex:
|
|
futs = {}
|
|
for img in img_files:
|
|
role = decide_role_for_path(img)
|
|
fut = ex.submit(process_one, img, out_dir, role, opts)
|
|
futs[fut] = (img, role)
|
|
any_err = False
|
|
for fut in as_completed(futs):
|
|
img, role = futs[fut]
|
|
rc = fut.result()
|
|
status = "OK" if rc == 0 else f"ERR({rc})"
|
|
print(f"[{status}] {img.name} -> role={role}")
|
|
if rc != 0:
|
|
any_err = True
|
|
if any_err:
|
|
sys.exit(2)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|