from __future__ import annotations

import hashlib
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Protocol

from services.ai.apimart_image_client import APIMartRequestError
from services.ai.openai_image_client import OpenAIRequestError
from services.ai.qwen_client import QwenPhotoAnalyzer, QwenRequestError
from services.ai.model_router import load_image_generator, load_visual_reviewer
from services.media_pipeline.photo_inputs import prepare_photo_inputs
from services.media_pipeline.png_transparency import ensure_png_alpha
from services.pet_builder.action_semantics import evaluate_action_semantics, score_candidate_frame
from services.pet_builder.action_atlas import (
    FRAMES_PER_ACTION,
    _read_rgba_png,
    _write_rgba_png,
    compose_action_frame_atlas,
    extract_action_strip_frames,
)
from services.pet_builder.layout_guides import build_action_strip_layout_guide_png
from services.pet_builder.qa_media import write_animation_previews, write_contact_sheet_png
from services.pet_builder.size_proportion_qa import evaluate_size_proportions
from packages.pet_package_schema import ALLOWED_ACTIONS

# Set copy of every allowed action.
# NOTE(review): not referenced in the part of the module reviewed here — confirm where it is consumed.
FORCED_PER_FRAME_ACTIONS = set(ALLOWED_ACTIONS)
# Actions for which the sequential per-frame path keeps the previously selected
# frame as the reference image; for every other action the reference is None.
PREVIOUS_FRAME_REFERENCE_ACTIONS: set[str] = {"sleep"}
# Actions eligible for use_previous_frame_copy_fallback, which re-submits the
# latest accepted frame of the same action as a candidate.
PROVIDER_PREVIOUS_FRAME_COPY_ACTIONS = {"idle", "sleep", "look", "sit", "tail_wag"}
# Passed to ensure_png_alpha as chroma_key / chroma_threshold for action frames and strips.
# presumably an (R, G, B) triple, i.e. pure blue — TODO confirm against ensure_png_alpha
ACTION_FRAME_CHROMA_KEY = (0, 0, 255)
ACTION_FRAME_CHROMA_THRESHOLD = 180.0
# Number of frames generated for the "walk" action in per-frame mode; all other
# actions generate FRAMES_PER_ACTION frames.
WALK_GENERATED_FRAME_COUNT = 2
# NOTE(review): the three values below are not referenced in the part of the
# module reviewed here; names suggest a walk-loop strategy label and per-action
# caps on strip attempts — confirm against their consumers.
WALK_LOOP_MODE = "two_frame_copy"
MAX_ACTION_STRIP_ATTEMPTS = 3
ACTION_STRIP_ATTEMPT_LIMITS = {
    "walk": 1,
}


class PhotoAnalyzer(Protocol):
    """Structural interface for objects that analyze a pet's source photos."""

    def analyze_pet_photos(self, *, pet_name: str, notes: str, images: list[dict[str, Any]]) -> dict[str, Any]:
        """Return an analysis mapping for the given pet name, notes and prepared images.

        ``build_pet_package_from_photos`` aborts the build when the returned
        mapping's ``safe_for_generation`` entry is ``False``.
        """


class ImageGenerator(Protocol):
    """Structural interface for the image provider used by the package builder.

    Besides the two methods below, ``build_pet_package_from_photos`` reads the
    optional ``provider`` and ``model`` attributes and an optional
    ``generate_action_strip`` callable through ``getattr``.
    """

    def generate_canonical_pet(
        self,
        *,
        pet_name: str,
        notes: str,
        analysis: dict[str, Any],
        images: list[dict[str, Any]],
        size: str = "1024*1024",
    ) -> dict[str, Any]:
        """Return a generation mapping for the pet's canonical image.

        The builder requires a non-empty ``bytes`` value under ``"image_bytes"``.
        """

    def generate_action_frame(
        self,
        *,
        action: str,
        frame_index: int,
        pet_name: str,
        notes: str,
        analysis: dict[str, Any],
        images: list[dict[str, Any]],
        canonical_image_bytes: bytes,
        previous_frame_image_bytes: bytes | None = None,
        candidate_index: int = 0,
        size: str = "1024*1024",
    ) -> dict[str, Any]:
        """Return a generation mapping for one frame of ``action``.

        A missing or empty ``"image_bytes"`` value makes the builder reject the
        candidate.
        """


class ActionReviewer(Protocol):
    """Structural interface for an optional visual reviewer of generated artwork.

    NOTE(review): the keys expected in the returned mappings are not visible in
    the part of the module reviewed here — confirm against the call sites.
    """

    def review_action_frames(
        self,
        *,
        pet_name: str,
        action: str,
        image_bytes: bytes,
        image_mime: str,
    ) -> dict[str, Any]:
        """Review an image showing the frames of ``action`` and return a report mapping."""

    def review_contact_sheet(
        self,
        *,
        pet_name: str,
        image_bytes: bytes,
        image_mime: str,
    ) -> dict[str, Any]:
        """Review a contact-sheet image for ``pet_name`` and return a report mapping."""


class LocalPetPhotoAnalyzer:
    """Offline photo analyzer that reports placeholder appearance data.

    Only the species is derived, from the pet name and notes text; every other
    appearance field is reported as unknown and the result is always marked
    safe for generation.
    """

    model = "local_photo_analysis_v1"
    provider = "local"

    def analyze_pet_photos(self, *, pet_name: str, notes: str, images: list[dict[str, Any]]) -> dict[str, Any]:
        """Return a fixed-shape analysis mapping; ``images`` is accepted but not inspected."""
        inferred_species = _infer_species_from_text(f"{pet_name} {notes}")
        analysis: dict[str, Any] = {"provider": self.provider, "species": inferred_species}
        # Pixels are never examined here, so each visual trait is a placeholder.
        for trait in ("base_color", "accent_color", "eye_color", "body_shape"):
            analysis[trait] = "unknown"
        analysis["distinctive_marks"] = []
        analysis["confidence"] = 0.5
        analysis["safe_for_generation"] = True
        analysis["user_visible_summary"] = "本地素材元数据检查通过；外貌还原以授权原图和 canonical 主形象为准。"
        return analysis


def build_pet_package_from_photos(
    *,
    pet_name: str,
    notes: str,
    photo_paths: list[str | Path],
    output_root: str | Path,
    analyzer: PhotoAnalyzer | None = None,
    image_generator: ImageGenerator | None = None,
    action_reviewer: ActionReviewer | None = None,
    candidates_per_frame: int = 1,
    stop_after_first_acceptable_candidate: bool | None = None,
    resume_build_dir: str | Path | None = None,
    action_frame_generation_mode: str | None = None,
) -> Path:
    if not pet_name.strip():
        raise ValueError("pet_name is required")
    if not photo_paths:
        raise ValueError("at least one photo is required")

    image_generator_was_provided = image_generator is not None
    analyzer_was_provided = analyzer is not None
    image_generator = image_generator or load_image_generator()
    provider_slug = _slugify(str(getattr(image_generator, "provider", "qwen")) or "qwen")
    default_visual_review_allowed = action_reviewer is None and _should_load_default_visual_reviewer(
        provider_slug=provider_slug,
        image_generator_was_provided=image_generator_was_provided,
        analyzer_was_provided=analyzer_was_provided,
    )
    analyzer = analyzer or _default_photo_analyzer(provider_slug)
    if default_visual_review_allowed:
        action_reviewer = load_visual_reviewer(provider_slug=provider_slug)
    final_visual_review_required = action_reviewer is not None
    if stop_after_first_acceptable_candidate is None:
        stop_after_first_acceptable_candidate = provider_slug in {"apimart", "apimart-gpt-image-2", "gpt-image-2"}
    action_frame_generation_mode = _normalize_action_frame_generation_mode(
        action_frame_generation_mode or os.environ.get("PET_ACTION_FRAME_GENERATION_MODE", "provider")
    )
    force_deterministic_action_frames = action_frame_generation_mode == "deterministic"
    if force_deterministic_action_frames:
        raise ValueError(
            "deterministic action frame mode is disabled for user-facing builds because action semantics cannot be verified"
        )
    images = prepare_photo_inputs(photo_paths)
    analysis = analyzer.analyze_pet_photos(pet_name=pet_name.strip(), notes=notes.strip(), images=images)
    if analysis.get("safe_for_generation") is False:
        raise ValueError("photo analysis marked this build unsafe for generation")

    pet_id = _slugify(pet_name)
    resume_build_path = Path(resume_build_dir) if resume_build_dir is not None else None
    build_id = resume_build_path.name if resume_build_path else f"build-{provider_slug}-{pet_id}-{_utc_stamp()}"
    package_id = f"package-{provider_slug}-{pet_id}-{_utc_stamp()}"
    build_dir = resume_build_path or (Path(output_root) / build_id)
    qa_dir = build_dir / "qa"
    candidates_per_frame = max(1, min(int(candidates_per_frame), 3))
    action_frame_dir = qa_dir / "action-frames"
    candidate_dir = qa_dir / "action-frame-candidates"
    raw_candidate_dir = qa_dir / "action-frame-raw-candidates"
    action_review_dir = qa_dir / "action-reviews"
    action_contact_dir = qa_dir / "action-contact-sheets"
    layout_guide_dir = qa_dir / "layout-guides"
    action_strip_dir = qa_dir / "action-strips"
    action_frame_dir.mkdir(parents=True, exist_ok=True)
    candidate_dir.mkdir(parents=True, exist_ok=True)
    raw_candidate_dir.mkdir(parents=True, exist_ok=True)
    action_review_dir.mkdir(parents=True, exist_ok=True)
    action_contact_dir.mkdir(parents=True, exist_ok=True)
    layout_guide_dir.mkdir(parents=True, exist_ok=True)
    _write_json(
        build_dir / "pet_request.json",
        _build_pet_request(
            pet_id=pet_id,
            pet_name=pet_name.strip(),
            image_refs=images,
        ),
    )

    canonical_reference_path = qa_dir / "canonical-reference.png"
    if resume_build_path and canonical_reference_path.exists():
        canonical_reference_bytes = canonical_reference_path.read_bytes()
        canonical_generation = _resumed_generation_payload(
            action="canonical",
            frame_index=None,
            provider=provider_slug,
            model=str(getattr(image_generator, "model", "")),
        )
        canonical_generation["image_bytes"] = canonical_reference_bytes
        canonical_generation["image_mime"] = "image/png"
        canonical_png_path = qa_dir / "canonical.png"
        if canonical_png_path.exists():
            canonical_bytes = canonical_png_path.read_bytes()
            canonical_transparency = {
                "method": "resumed_existing_alpha_png",
                "alpha_capable": True,
                "source": "qa/canonical.png",
            }
        else:
            canonical_bytes, canonical_transparency = ensure_png_alpha(canonical_reference_bytes)
            canonical_png_path.write_bytes(canonical_bytes)
    else:
        canonical_generation = image_generator.generate_canonical_pet(
            pet_name=pet_name.strip(),
            notes=notes.strip(),
            analysis=analysis,
            images=images,
            size="1024*1024",
        )
        canonical_reference_bytes = canonical_generation.get("image_bytes")
        if not isinstance(canonical_reference_bytes, bytes) or not canonical_reference_bytes:
            raise ValueError("canonical image generation did not return image bytes")
        canonical_reference_path.write_bytes(canonical_reference_bytes)
        canonical_bytes, canonical_transparency = ensure_png_alpha(canonical_reference_bytes)
        (qa_dir / "canonical.png").write_bytes(canonical_bytes)

    action_generations: dict[str, list[dict[str, Any]]] = {}
    action_candidate_generations: dict[str, list[list[dict[str, Any]]]] = {}
    action_candidate_pools: dict[str, dict[int, list[dict[str, Any]]]] = {}
    action_frames: dict[str, list[bytes]] = {}
    action_transparency: dict[str, list[dict[str, Any]]] = {}
    candidate_failures: list[dict[str, Any]] = []
    strip_fallback_actions: list[str] = []
    semantic_repair_actions: list[str] = []
    final_semantic_repair_actions: list[str] = []
    final_visual_repair_actions: list[str] = []
    size_proportion_repair_actions: list[str] = []
    forced_per_frame_actions: list[str] = []
    canonical_only_last_resort_frames = 0
    minimal_last_resort_frames = 0
    deterministic_action_fallback_frames = 0
    provider_previous_frame_copy_fallback_frames = 0
    local_sleep_variant_frames = 0
    concurrent_action_frame_batches = 0
    concurrent_action_frame_jobs = 0
    prefetched_action_frame_results: dict[tuple[str, int, int], dict[str, Any] | Exception] = {}
    generation_strategy = "canonical_plus_per_action_frames"
    resume_regenerate_actions: set[str] = set()

    def record_candidate_failure(failure: dict[str, Any]) -> None:
        """Log one rejected candidate and persist the whole failure list.

        ``failure`` is appended to the build-wide ``candidate_failures`` list and
        the complete list is then handed to ``_write_json`` for
        ``candidate-failures.json`` inside the QA directory.
        """
        candidate_failures.append(failure)
        failures_report_path = qa_dir / "candidate-failures.json"
        failures_payload = {"failures": candidate_failures}
        _write_json(failures_report_path, failures_payload)

    def stop_if_provider_request_blocked(
        *,
        action: str,
        frame_index: int,
        candidate_index: int | None,
        exc: Exception,
    ) -> None:
        """Abort the build when ``exc`` is classified as a blocked provider request.

        Does nothing unless ``_provider_request_blocked(exc)`` is truthy. Otherwise
        the failure is logged (provider error text capped at 500 characters) and
        a ``ValueError`` chained to ``exc`` is raised.
        """
        if _provider_request_blocked(exc):
            blocked_failure = {
                "action": action,
                "frame_index": frame_index,
                "candidate_index": candidate_index,
                "reason": "provider_request_blocked",
                "provider_error": str(exc)[:500],
            }
            record_candidate_failure(blocked_failure)
            raise ValueError(f"image provider unavailable during {action} frame {frame_index}") from exc

    def process_frame_candidate(
        *,
        action: str,
        frame_index: int,
        candidate_index: int,
        action_generation: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Validate one generated frame and return a scored candidate record.

        The raw provider bytes are written to ``raw_candidate_dir`` and the
        alpha-processed bytes to ``candidate_dir`` (the latter even when the
        frame is subsequently rejected).

        Returns:
            A mapping holding the original generation payload, the processed
            image bytes, the transparency report, the frame QA report and the
            scores, or ``None`` when the candidate is rejected. Every rejection
            is logged through ``record_candidate_failure``.
        """
        action_bytes = action_generation.get("image_bytes")
        if not isinstance(action_bytes, bytes) or not action_bytes:
            record_candidate_failure(
                {
                    "action": action,
                    "frame_index": frame_index,
                    "candidate_index": candidate_index,
                    "reason": "empty_image_bytes",
                }
            )
            return None
        # Keep the untouched provider output next to the processed candidate.
        (raw_candidate_dir / f"{action}-{frame_index}-{candidate_index}.png").write_bytes(action_bytes)
        action_bytes, frame_transparency = ensure_png_alpha(
            action_bytes,
            chroma_key=ACTION_FRAME_CHROMA_KEY,
            chroma_threshold=ACTION_FRAME_CHROMA_THRESHOLD,
        )
        transparency_rejection = _transparency_rejection_reason(frame_transparency)
        if transparency_rejection == "chroma_key_background_missing":
            # The chroma-key pass found no key background: redo the conversion on
            # the original provider bytes without a chroma key and re-evaluate.
            action_bytes, frame_transparency = ensure_png_alpha(action_generation["image_bytes"])
            transparency_rejection = _transparency_rejection_reason(frame_transparency)
        candidate_path = candidate_dir / f"{action}-{frame_index}-{candidate_index}.png"
        candidate_path.write_bytes(action_bytes)
        if transparency_rejection:
            record_candidate_failure(
                {
                    "action": action,
                    "frame_index": frame_index,
                    "candidate_index": candidate_index,
                    "reason": transparency_rejection,
                    "transparency": _safe_transparency_report(frame_transparency),
                }
            )
            return None
        # Compose a one-frame atlas only to obtain this frame's QA report; the
        # QA rows are indexed by the action's position in ALLOWED_ACTIONS.
        candidate_report = compose_action_frame_atlas(
            {action: [action_bytes]},
            allow_stable_pet_parts=True,
        ).qa["rows"][
            ALLOWED_ACTIONS.index(action)
        ]["frames"]
        # An empty report is treated as a frame with no detected foreground.
        frame_report = candidate_report[0] if candidate_report else {"source_bbox": None, "foreground_pixels": 0}
        rejection_reason = _frame_rejection_reason(frame_report)
        if rejection_reason:
            record_candidate_failure(
                {
                    "action": action,
                    "frame_index": frame_index,
                    "candidate_index": candidate_index,
                    "reason": rejection_reason,
                    "frame_report": _safe_frame_report(frame_report),
                }
            )
            return None
        base_score = score_candidate_frame(
            action=action,
            frame_report=frame_report,
            candidate_index=candidate_index,
        )
        visual_detail_score = _candidate_visual_detail_score(action_bytes)
        # "score" is the sum of the two components, which are also reported separately.
        return {
            "generation": action_generation,
            "image_bytes": action_bytes,
            "transparency": frame_transparency,
            "candidate_index": candidate_index,
            "frame_report": frame_report,
            "score": base_score + visual_detail_score,
            "base_score": base_score,
            "visual_detail_score": visual_detail_score,
        }

    def deterministic_action_generation(
        *,
        action: str,
        frame_index: int,
        candidate_index: int,
        source_reason: str,
    ) -> dict[str, Any]:
        nonlocal deterministic_action_fallback_frames, generation_strategy
        deterministic_action_fallback_frames += 1
        generation_strategy = (
            "canonical_plus_deterministic_action_frames"
            if force_deterministic_action_frames
            else "canonical_plus_deterministic_action_fallback"
        )
        return {
            "image_bytes": _deterministic_motion_frame_from_canonical(
                canonical_bytes,
                action=action,
                frame_index=frame_index,
            ),
            "image_mime": "image/png",
            "provider": provider_slug,
            "request_id": f"deterministic-fallback:{action}:{frame_index}",
            "model": f"{getattr(image_generator, 'model', 'unknown')}:canonical-motion-fallback",
            "usage": {
                "source": "canonical_reference",
                "external_image_count": 0,
                "fallback_reason": source_reason[:120],
            },
            "prompt_sha256": str(canonical_generation.get("prompt_sha256", "")),
            "fallback": "deterministic_canonical_motion",
        }

    def local_sleep_variant_generation(
        *,
        frame_index: int,
        candidate_index: int,
        source_frame_image_bytes: bytes,
    ) -> dict[str, Any]:
        nonlocal local_sleep_variant_frames
        local_sleep_variant_frames += 1
        return {
            "image_bytes": _direction_locked_sleep_variant_frame(
                source_frame_image_bytes,
                frame_index=frame_index,
            ),
            "image_mime": "image/png",
            "provider": provider_slug,
            "request_id": f"local-sleep-direction-variant:{frame_index}",
            "model": f"{getattr(image_generator, 'model', 'unknown')}:local-sleep-direction-lock",
            "usage": {
                "source": "provider_first_sleep_pose",
                "source_frame_index": 0,
                "external_image_count": 0,
                "source_mode": "local_sleep_direction_locked_variant",
            },
            "prompt_sha256": str(canonical_generation.get("prompt_sha256", "")),
            "source_mode": "local_sleep_direction_locked_variant",
            "fallback": "local_sleep_direction_locked_variant",
        }

    def use_deterministic_action_fallback(
        *,
        action: str,
        frame_index: int,
        candidate_index: int,
        reason: str,
    ) -> dict[str, Any] | None:
        """Log a provider failure and return a locally derived candidate in its place.

        The failure is logged first (``reason`` capped at 220 characters); a
        deterministic generation payload is then built and validated through
        ``process_frame_candidate``, whose result (possibly ``None``) is returned.
        """
        unavailable_failure = {
            "action": action,
            "frame_index": frame_index,
            "candidate_index": candidate_index,
            "reason": "provider_action_frame_unavailable",
            "provider_error": reason[:220],
        }
        record_candidate_failure(unavailable_failure)
        fallback_generation = deterministic_action_generation(
            action=action,
            frame_index=frame_index,
            candidate_index=candidate_index,
            source_reason=reason,
        )
        return process_frame_candidate(
            action=action,
            frame_index=frame_index,
            candidate_index=candidate_index,
            action_generation=fallback_generation,
        )

    def use_previous_frame_copy_fallback(
        *,
        action: str,
        frame_index: int,
        candidate_index: int,
        reason: str,
    ) -> dict[str, Any] | None:
        nonlocal provider_previous_frame_copy_fallback_frames
        if action not in PROVIDER_PREVIOUS_FRAME_COPY_ACTIONS:
            return None
        previous_frames = action_frames.get(action) or []
        previous_generations = action_generations.get(action) or []
        if not previous_frames or not previous_generations:
            return None
        source_frame_index = len(previous_frames) - 1
        source_generation = dict(previous_generations[source_frame_index])
        source_usage = source_generation.get("usage") if isinstance(source_generation.get("usage"), dict) else {}
        provider_previous_frame_copy_fallback_frames += 1
        record_candidate_failure(
            {
                "action": action,
                "frame_index": frame_index,
                "candidate_index": candidate_index,
                "reason": "provider_previous_frame_copy_fallback",
                "source_frame_index": source_frame_index,
                "provider_error": reason[:220],
            }
        )
        fallback_generation = {
            **source_generation,
            "image_bytes": previous_frames[source_frame_index],
            "image_mime": "image/png",
            "request_id": f"{source_generation.get('request_id', 'provider')}:{action}:{frame_index}:copy-fallback",
            "usage": {
                **source_usage,
                "source": "previous_same_action_frame",
                "source_frame_index": source_frame_index,
                "fallback_reason": reason[:120],
            },
            "fallback": "provider_previous_frame_copy",
        }
        return process_frame_candidate(
            action=action,
            frame_index=frame_index,
            candidate_index=candidate_index,
            action_generation=fallback_generation,
        )

    strip_generator = getattr(image_generator, "generate_action_strip", None)
    use_action_strips = callable(strip_generator) and _should_use_action_strips_by_default(provider_slug)
    if not use_action_strips:
        forced_per_frame_actions = list(ALLOWED_ACTIONS)

    def generate_action_strip_frames(action: str, *, layout_guide_image_bytes: bytes, strip_candidate_index: int = 0) -> bool:
        nonlocal generation_strategy
        if not callable(strip_generator):
            return False
        generation_strategy = "canonical_plus_per_action_strips"
        try:
            strip_kwargs = {
                "action": action,
                "pet_name": pet_name.strip(),
                "notes": notes.strip(),
                "analysis": analysis,
                "images": [],
                "canonical_image_bytes": canonical_reference_bytes,
                "layout_guide_image_bytes": layout_guide_image_bytes,
                "size": "1280*720",
            }
            if strip_candidate_index:
                strip_kwargs["candidate_index"] = strip_candidate_index
            try:
                strip_generation = strip_generator(**strip_kwargs)
            except TypeError:
                strip_kwargs.pop("candidate_index", None)
                strip_generation = strip_generator(**strip_kwargs)
        except Exception as exc:
            stop_if_provider_request_blocked(
                action=action,
                frame_index=-1,
                candidate_index=None,
                exc=exc,
            )
            record_candidate_failure(
                {
                    "action": action,
                    "frame_index": None,
                    "candidate_index": None,
                    "reason": "action_strip_generation_failed",
                    "provider_error": str(exc)[:220],
                }
            )
            if action not in strip_fallback_actions:
                strip_fallback_actions.append(action)
            return False

        strip_bytes = strip_generation.get("image_bytes")
        if not isinstance(strip_bytes, bytes) or not strip_bytes:
            record_candidate_failure(
                {
                    "action": action,
                    "frame_index": None,
                    "candidate_index": None,
                    "reason": "action_strip_empty_image_bytes",
                }
            )
            if action not in strip_fallback_actions:
                strip_fallback_actions.append(action)
            return False
        action_strip_dir.mkdir(parents=True, exist_ok=True)
        (action_strip_dir / f"{action}-raw.png").write_bytes(strip_bytes)
        strip_bytes, strip_transparency = ensure_png_alpha(
            strip_bytes,
            chroma_key=ACTION_FRAME_CHROMA_KEY,
            chroma_threshold=ACTION_FRAME_CHROMA_THRESHOLD,
        )
        transparency_rejection = _transparency_rejection_reason(strip_transparency)
        if transparency_rejection == "chroma_key_background_missing":
            strip_bytes, strip_transparency = ensure_png_alpha(strip_generation["image_bytes"])
            transparency_rejection = _transparency_rejection_reason(strip_transparency)
        (action_strip_dir / f"{action}.png").write_bytes(strip_bytes)
        if transparency_rejection:
            record_candidate_failure(
                {
                    "action": action,
                    "frame_index": None,
                    "candidate_index": None,
                    "reason": transparency_rejection,
                    "transparency": _safe_transparency_report(strip_transparency),
                }
            )
            if action not in strip_fallback_actions:
                strip_fallback_actions.append(action)
            return False

        extracted = extract_action_strip_frames(action, strip_bytes, allow_stable_pet_parts=True)
        _write_json(action_strip_dir / f"{action}.json", {key: value for key, value in extracted.items() if key != "frames"})
        if not extracted.get("ok"):
            record_candidate_failure(
                {
                    "action": action,
                    "frame_index": None,
                    "candidate_index": None,
                    "reason": "action_strip_structure_invalid",
                    "strip_failure_reason": extracted.get("failure_reason"),
                }
            )
            if action not in strip_fallback_actions:
                strip_fallback_actions.append(action)
            return False

        action_generations[action] = []
        action_candidate_generations[action] = []
        action_frames[action] = []
        action_transparency[action] = []
        strip_request_id = str(strip_generation.get("request_id", ""))
        for frame_index, frame_bytes in enumerate(extracted["frames"]):
            frame_generation = dict(strip_generation)
            frame_generation["image_bytes"] = frame_bytes
            frame_generation["image_mime"] = "image/png"
            frame_generation["request_id"] = f"{strip_request_id}:frame-{frame_index}"
            frame_generation["strip_request_id"] = strip_request_id
            frame_generation["strip_frame_index"] = frame_index
            candidate = process_frame_candidate(
                action=action,
                frame_index=frame_index,
                candidate_index=0,
                action_generation=frame_generation,
            )
            if not candidate:
                action_generations[action] = []
                action_candidate_generations[action] = []
                action_frames[action] = []
                action_transparency[action] = []
                if action not in strip_fallback_actions:
                    strip_fallback_actions.append(action)
                return False
            selected_generation = dict(candidate["generation"])
            selected_generation["selected_candidate_index"] = candidate["candidate_index"]
            selected_generation["candidate_score"] = candidate["score"]
            selected_generation["base_score"] = candidate["base_score"]
            selected_generation["visual_detail_score"] = candidate["visual_detail_score"]
            action_generations[action].append(selected_generation)
            action_candidate_generations[action].append([selected_generation])
            action_frames[action].append(candidate["image_bytes"])
            action_transparency[action].append(candidate["transparency"])
            (action_frame_dir / f"{action}-{frame_index}.png").write_bytes(candidate["image_bytes"])
        return True

    def generate_per_frame_action(action: str, *, candidate_index_offset: int = 0) -> None:
        nonlocal canonical_only_last_resort_frames, minimal_last_resort_frames
        nonlocal concurrent_action_frame_batches, concurrent_action_frame_jobs
        action_generations[action] = []
        action_candidate_generations[action] = []
        action_frames[action] = []
        action_transparency[action] = []
        pooled_candidates = action_candidate_pools.setdefault(action, {})
        previous_frame_image_bytes: bytes | None = None
        generated_frame_count = WALK_GENERATED_FRAME_COUNT if action == "walk" else FRAMES_PER_ACTION
        concurrent_provider_results: dict[tuple[int, int], dict[str, Any] | Exception] = {}
        if (
            _should_generate_action_frames_concurrently(provider_slug, action)
            and not force_deterministic_action_frames
            and candidates_per_frame > 0
            and not any(key[0] == action for key in prefetched_action_frame_results)
        ):
            job_specs: list[tuple[int, int]] = []
            for frame_index in range(generated_frame_count):
                existing_frame_path = action_frame_dir / f"{action}-{frame_index}.png"
                if resume_build_path and action not in resume_regenerate_actions and existing_frame_path.exists():
                    continue
                for candidate_offset in range(candidates_per_frame):
                    job_specs.append((frame_index, candidate_index_offset + candidate_offset))
            if job_specs:
                concurrent_action_frame_batches += 1
                concurrent_action_frame_jobs += len(job_specs)
                max_workers = min(len(job_specs), _concurrent_action_frame_worker_count(provider_slug))
                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    futures = {
                        executor.submit(
                            image_generator.generate_action_frame,
                            action=action,
                            frame_index=frame_index,
                            pet_name=pet_name.strip(),
                            notes=notes.strip(),
                            analysis=analysis,
                            images=[],
                            canonical_image_bytes=canonical_reference_bytes,
                            previous_frame_image_bytes=None,
                            candidate_index=candidate_index,
                            size="1024*1024",
                        ): (frame_index, candidate_index)
                        for frame_index, candidate_index in job_specs
                    }
                    for future in as_completed(futures):
                        frame_index, candidate_index = futures[future]
                        try:
                            concurrent_provider_results[(frame_index, candidate_index)] = future.result()
                        except Exception as exc:
                            concurrent_provider_results[(frame_index, candidate_index)] = exc
        for frame_index in range(generated_frame_count):
            existing_frame_path = action_frame_dir / f"{action}-{frame_index}.png"
            if resume_build_path and action not in resume_regenerate_actions and existing_frame_path.exists():
                selected_bytes = existing_frame_path.read_bytes()
                selected_generation = _resumed_generation_payload(
                    action=action,
                    frame_index=frame_index,
                    provider=provider_slug,
                    model=str(getattr(image_generator, "model", "")),
                )
                action_generations[action].append(selected_generation)
                action_candidate_generations[action].append([selected_generation])
                action_frames[action].append(selected_bytes)
                action_transparency[action].append(
                    {
                        "method": "resumed_existing_alpha_png",
                        "alpha_capable": True,
                        "source": f"qa/action-frames/{action}-{frame_index}.png",
                    }
                )
                previous_frame_image_bytes = selected_bytes
                continue
            candidates = pooled_candidates.setdefault(frame_index, [])
            provider_fallback_used = False
            if (
                _should_use_local_sleep_direction_variants(provider_slug)
                and action == "sleep"
                and frame_index > 0
                and action_frames.get(action)
            ):
                candidate = process_frame_candidate(
                    action=action,
                    frame_index=frame_index,
                    candidate_index=candidate_index_offset,
                    action_generation=local_sleep_variant_generation(
                        frame_index=frame_index,
                        candidate_index=candidate_index_offset,
                        source_frame_image_bytes=action_frames[action][0],
                    ),
                )
                if candidate:
                    candidates.append(candidate)
                    provider_fallback_used = True
            if force_deterministic_action_frames:
                candidate = process_frame_candidate(
                    action=action,
                    frame_index=frame_index,
                    candidate_index=candidate_index_offset,
                    action_generation=deterministic_action_generation(
                        action=action,
                        frame_index=frame_index,
                        candidate_index=candidate_index_offset,
                        source_reason="action_frame_generation_mode:deterministic",
                    ),
                )
                if candidate:
                    candidates.append(candidate)
                    provider_fallback_used = True
            for candidate_offset in range(candidates_per_frame):
                if provider_fallback_used:
                    break
                candidate_index = candidate_index_offset + candidate_offset
                try:
                    prefetched_key = (action, frame_index, candidate_index)
                    local_prefetched_key = (frame_index, candidate_index)
                    if prefetched_key in prefetched_action_frame_results:
                        prefetched = prefetched_action_frame_results[prefetched_key]
                    elif local_prefetched_key in concurrent_provider_results:
                        prefetched = concurrent_provider_results[local_prefetched_key]
                    else:
                        prefetched = None
                    if isinstance(prefetched, Exception):
                        raise prefetched
                    if prefetched is not None:
                        action_generation = prefetched
                    else:
                        previous_reference = (
                            previous_frame_image_bytes if action in PREVIOUS_FRAME_REFERENCE_ACTIONS else None
                        )
                        action_generation = image_generator.generate_action_frame(
                            action=action,
                            frame_index=frame_index,
                            pet_name=pet_name.strip(),
                            notes=notes.strip(),
                            analysis=analysis,
                            images=[],
                            canonical_image_bytes=canonical_reference_bytes,
                            previous_frame_image_bytes=previous_reference,
                            candidate_index=candidate_index,
                            size="1024*1024",
                        )
                except Exception as exc:
                    if _provider_request_transient(exc):
                        record_candidate_failure(
                            {
                                "action": action,
                                "frame_index": frame_index,
                                "candidate_index": candidate_index,
                                "reason": "provider_action_frame_unavailable",
                                "provider_error": str(exc)[:220],
                            }
                        )
                        continue
                    stop_if_provider_request_blocked(
                        action=action,
                        frame_index=frame_index,
                        candidate_index=candidate_index,
                        exc=exc,
                    )
                    record_candidate_failure(
                        {
                            "action": action,
                            "frame_index": frame_index,
                            "candidate_index": candidate_index,
                            "reason": str(exc)[:220],
                        }
                    )
                    continue
                candidate = process_frame_candidate(
                    action=action,
                    frame_index=frame_index,
                    candidate_index=candidate_index,
                    action_generation=action_generation,
                )
                if candidate:
                    candidates.append(candidate)
                    if stop_after_first_acceptable_candidate:
                        break
            if not candidates:
                fallback_candidate_index = candidate_index_offset + candidates_per_frame
                try:
                    previous_reference = (
                        previous_frame_image_bytes if action in PREVIOUS_FRAME_REFERENCE_ACTIONS else None
                    )
                    action_generation = image_generator.generate_action_frame(
                        action=action,
                        frame_index=frame_index,
                        pet_name=pet_name.strip(),
                        notes=notes.strip(),
                        analysis=analysis,
                        images=[],
                        canonical_image_bytes=canonical_reference_bytes,
                        previous_frame_image_bytes=previous_reference,
                        candidate_index=fallback_candidate_index,
                        size="1024*1024",
                    )
                except Exception as exc:
                    if _provider_request_transient(exc):
                        record_candidate_failure(
                            {
                                "action": action,
                                "frame_index": frame_index,
                                "candidate_index": fallback_candidate_index,
                                "reason": "provider_action_frame_unavailable",
                                "provider_error": str(exc)[:220],
                            }
                        )
                    else:
                        stop_if_provider_request_blocked(
                            action=action,
                            frame_index=frame_index,
                            candidate_index=fallback_candidate_index,
                            exc=exc,
                        )
                        record_candidate_failure(
                            {
                                "action": action,
                                "frame_index": frame_index,
                                "candidate_index": fallback_candidate_index,
                                "reason": str(exc)[:220],
                            }
                        )
                    last_resort_candidate_index = fallback_candidate_index + 1
                    used_minimal_last_resort = False
                    action_generation = None
                    try:
                        previous_reference = (
                            previous_frame_image_bytes if action in PREVIOUS_FRAME_REFERENCE_ACTIONS else None
                        )
                        action_generation = image_generator.generate_action_frame(
                            action=action,
                            frame_index=frame_index,
                            pet_name=pet_name.strip(),
                            notes=notes.strip(),
                            analysis=analysis,
                            images=[],
                            canonical_image_bytes=canonical_reference_bytes,
                            previous_frame_image_bytes=previous_reference,
                            candidate_index=last_resort_candidate_index,
                            size="1024*1024",
                        )
                    except Exception as last_resort_exc:
                        if _provider_request_transient(last_resort_exc):
                            record_candidate_failure(
                                {
                                    "action": action,
                                    "frame_index": frame_index,
                                    "candidate_index": last_resort_candidate_index,
                                    "reason": "provider_action_frame_unavailable",
                                    "provider_error": str(last_resort_exc)[:220],
                                }
                            )
                        else:
                            stop_if_provider_request_blocked(
                                action=action,
                                frame_index=frame_index,
                                candidate_index=last_resort_candidate_index,
                                exc=last_resort_exc,
                            )
                            record_candidate_failure(
                                {
                                    "action": action,
                                    "frame_index": frame_index,
                                    "candidate_index": last_resort_candidate_index,
                                    "reason": str(last_resort_exc)[:220],
                                }
                            )
                        minimal_generator = getattr(image_generator, "generate_action_frame_minimal", None)
                        if callable(minimal_generator):
                            minimal_candidate_index = last_resort_candidate_index + 1
                            try:
                                action_generation = minimal_generator(
                                    action=action,
                                    frame_index=frame_index,
                                    pet_name=pet_name.strip(),
                                    notes=notes.strip(),
                                    analysis=analysis,
                                    canonical_image_bytes=canonical_reference_bytes,
                                    candidate_index=minimal_candidate_index,
                                    size="1024*1024",
                                )
                            except Exception as minimal_exc:
                                stop_if_provider_request_blocked(
                                    action=action,
                                    frame_index=frame_index,
                                    candidate_index=minimal_candidate_index,
                                    exc=minimal_exc,
                                )
                                record_candidate_failure(
                                    {
                                        "action": action,
                                        "frame_index": frame_index,
                                        "candidate_index": minimal_candidate_index,
                                        "reason": str(minimal_exc)[:220],
                                    }
                                )
                                copy_candidate = use_previous_frame_copy_fallback(
                                    action=action,
                                    frame_index=frame_index,
                                    candidate_index=minimal_candidate_index + 1,
                                    reason=str(minimal_exc),
                                )
                                if copy_candidate:
                                    candidates.append(copy_candidate)
                                else:
                                    record_candidate_failure(
                                        {
                                            "action": action,
                                            "frame_index": frame_index,
                                            "candidate_index": None,
                                            "reason": "no_acceptable_frame_candidate",
                                        }
                                    )
                                    raise ValueError(f"frame QA failed for {action} frame {frame_index}") from minimal_exc
                            else:
                                used_minimal_last_resort = True
                                last_resort_candidate_index = minimal_candidate_index
                        else:
                            copy_candidate = use_previous_frame_copy_fallback(
                                action=action,
                                frame_index=frame_index,
                                candidate_index=last_resort_candidate_index + 1,
                                reason=str(last_resort_exc),
                            )
                            if copy_candidate:
                                candidates.append(copy_candidate)
                            else:
                                record_candidate_failure(
                                    {
                                        "action": action,
                                        "frame_index": frame_index,
                                        "candidate_index": None,
                                        "reason": "no_acceptable_frame_candidate",
                                    }
                                )
                                raise ValueError(f"frame QA failed for {action} frame {frame_index}") from last_resort_exc
                    if action_generation:
                        candidate = process_frame_candidate(
                            action=action,
                            frame_index=frame_index,
                            candidate_index=last_resort_candidate_index,
                            action_generation=action_generation,
                        )
                        if candidate:
                            if used_minimal_last_resort:
                                minimal_last_resort_frames += 1
                            else:
                                canonical_only_last_resort_frames += 1
                            candidates.append(candidate)
                else:
                    candidate = process_frame_candidate(
                        action=action,
                        frame_index=frame_index,
                        candidate_index=fallback_candidate_index,
                        action_generation=action_generation,
                    )
                    if candidate:
                        candidates.append(candidate)
            if not candidates:
                copy_candidate = use_previous_frame_copy_fallback(
                    action=action,
                    frame_index=frame_index,
                    candidate_index=candidate_index_offset + candidates_per_frame + 3,
                    reason="no acceptable provider candidate",
                )
                if copy_candidate:
                    candidates.append(copy_candidate)
                else:
                    record_candidate_failure(
                        {
                            "action": action,
                            "frame_index": frame_index,
                            "candidate_index": None,
                            "reason": "no_acceptable_frame_candidate",
                        }
                    )
                    raise ValueError(f"frame QA failed for {action} frame {frame_index}")
            selected = max(candidates, key=lambda item: item["score"])
            selected_generation = dict(selected["generation"])
            if _should_use_local_sleep_direction_variants(provider_slug) and action == "sleep" and frame_index == 0:
                selected_generation["source_mode"] = "provider_first_sleep_pose"
                selected_generation["usage"] = {
                    **(
                        selected_generation.get("usage", {})
                        if isinstance(selected_generation.get("usage"), dict)
                        else {}
                    ),
                    "source_mode": "provider_first_sleep_pose",
                }
            selected_generation["selected_candidate_index"] = selected["candidate_index"]
            selected_generation["candidate_score"] = selected["score"]
            selected_generation["base_score"] = selected["base_score"]
            selected_generation["visual_detail_score"] = selected["visual_detail_score"]
            action_generations[action].append(selected_generation)
            action_candidate_generations[action].append([dict(item["generation"]) for item in candidates])
            action_frames[action].append(selected["image_bytes"])
            action_transparency[action].append(selected["transparency"])
            (action_frame_dir / f"{action}-{frame_index}.png").write_bytes(selected["image_bytes"])
            previous_frame_image_bytes = selected["image_bytes"]
        if action == "walk":
            for target_frame_index in range(WALK_GENERATED_FRAME_COUNT, FRAMES_PER_ACTION):
                source_frame_index = target_frame_index % WALK_GENERATED_FRAME_COUNT
                selected_generation = _copy_walk_generation_payload(
                    action_generations[action][source_frame_index],
                    source_frame_index=source_frame_index,
                    target_frame_index=target_frame_index,
                )
                selected_transparency = dict(action_transparency[action][source_frame_index])
                selected_transparency["source_mode"] = WALK_LOOP_MODE
                selected_transparency["derived_from_frame_index"] = source_frame_index
                selected_bytes = action_frames[action][source_frame_index]
                action_generations[action].append(selected_generation)
                action_candidate_generations[action].append([selected_generation])
                action_frames[action].append(selected_bytes)
                action_transparency[action].append(selected_transparency)
                (action_frame_dir / f"{action}-{target_frame_index}.png").write_bytes(selected_bytes)

    def review_generated_action(action: str, *, attempt: int) -> dict[str, Any]:
        """Review the frames currently selected for one action and persist the result.

        The action's frames are composed into a single-action atlas, then
        checked by frame QA, semantic QA and style-consistency QA. Only when
        all three pass and an ``action_reviewer`` is configured is the reviewer
        asked for a visual verdict; that call is retried once on any exception,
        and a second failure yields an error payload that fails the review.

        The atlas image is written to ``action_contact_dir`` and the review
        payload to ``action_review_dir`` before returning.

        Args:
            action: Action whose entries in ``action_frames`` are reviewed.
            attempt: Attempt number stored verbatim in the returned payload.

        Returns:
            The review payload; its ``"ok"`` entry is truthy only when every
            executed check passed.
        """
        single_action_atlas = compose_action_frame_atlas(
            {action: action_frames[action]},
            allow_stable_pet_parts=True,
        )
        frame_report = _single_action_frame_qa(action=action, frame_qa=single_action_atlas.qa)
        _annotate_selected_candidates(frame_report, {action: action_generations[action]})
        _annotate_walk_loop_mode(frame_report)
        semantic_report = evaluate_action_semantics(frame_report)
        style_report = _evaluate_action_style_consistency(action_frames[action])

        # The non-visual checks gate both the (costly) visual review and the final verdict.
        checks_ok = (
            bool(frame_report.get("ok"))
            and bool(semantic_report.get("ok"))
            and bool(style_report.get("ok"))
        )

        reviewer_response = None
        reviewer_ok = True
        if action_reviewer and checks_ok:
            final_error = None
            for _ in range(2):
                try:
                    reviewer_response = action_reviewer.review_action_frames(
                        pet_name=pet_name.strip(),
                        action=action,
                        image_bytes=single_action_atlas.image_bytes,
                        image_mime="image/png",
                    )
                    reviewer_ok = _visual_review_passed(reviewer_response, action=action)
                except Exception as exc:
                    # Keep the error: the name bound by `except` is cleared on block exit.
                    final_error = exc
                    continue
                break
            else:
                # Both tries raised: report a failed review carrying the last error text.
                reviewer_response = {
                    "status": "error",
                    "provider": "qwen",
                    "review": {
                        "passed": False,
                        "notes": "action visual review failed before receiving a usable response",
                        "risks": [str(final_error)[:220]],
                    },
                }
                reviewer_ok = False

        review = {
            "action": action,
            "attempt": attempt,
            "ok": checks_ok and reviewer_ok,
            "frame_qa": frame_report,
            "semantic_qa": semantic_report,
            "style_qa": style_report,
            "visual_review": reviewer_response,
        }
        contact_path = action_contact_dir / f"{action}.png"
        contact_path.write_bytes(single_action_atlas.image_bytes)
        _write_json(action_review_dir / f"{action}.json", review)
        return review

    def prefetch_independent_action_frames_for_first_pass() -> None:
        nonlocal concurrent_action_frame_batches, concurrent_action_frame_jobs
        if use_action_strips or force_deterministic_action_frames or candidates_per_frame <= 0:
            return
        job_specs: list[tuple[str, int, int]] = []
        for action in ALLOWED_ACTIONS:
            if not _should_generate_action_frames_concurrently(provider_slug, action):
                continue
            generated_frame_count = WALK_GENERATED_FRAME_COUNT if action == "walk" else FRAMES_PER_ACTION
            for frame_index in range(generated_frame_count):
                existing_frame_path = action_frame_dir / f"{action}-{frame_index}.png"
                if resume_build_path and action not in resume_regenerate_actions and existing_frame_path.exists():
                    continue
                for candidate_offset in range(candidates_per_frame):
                    candidate_index = candidate_offset
                    job_specs.append((action, frame_index, candidate_index))
        if not job_specs:
            return

        concurrent_action_frame_batches += 1
        concurrent_action_frame_jobs += len(job_specs)
        max_workers = min(len(job_specs), _concurrent_action_frame_worker_count(provider_slug))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(
                    image_generator.generate_action_frame,
                    action=action,
                    frame_index=frame_index,
                    pet_name=pet_name.strip(),
                    notes=notes.strip(),
                    analysis=analysis,
                    images=[],
                    canonical_image_bytes=canonical_reference_bytes,
                    previous_frame_image_bytes=None,
                    candidate_index=candidate_index,
                    size="1024*1024",
                ): (action, frame_index, candidate_index)
                for action, frame_index, candidate_index in job_specs
            }
            for future in as_completed(futures):
                action, frame_index, candidate_index = futures[future]
                try:
                    prefetched_action_frame_results[(action, frame_index, candidate_index)] = future.result()
                except Exception as exc:
                    prefetched_action_frame_results[(action, frame_index, candidate_index)] = exc

    prefetch_independent_action_frames_for_first_pass()

    # Generate and review every allowed action in turn. An action that never
    # passes review aborts the whole build with ValueError at the end of its iteration.
    for action in ALLOWED_ACTIONS:
        # Write this action's layout guide image into layout_guide_dir.
        layout_guide_image_bytes = build_action_strip_layout_guide_png(action=action)
        (layout_guide_dir / f"{action}.png").write_bytes(layout_guide_image_bytes)
        action_ok = False
        if use_action_strips:
            # Strip path: retry up to the per-action limit, defaulting to MAX_ACTION_STRIP_ATTEMPTS.
            strip_attempt = 0
            max_strip_attempts = ACTION_STRIP_ATTEMPT_LIMITS.get(action, MAX_ACTION_STRIP_ATTEMPTS)
            while strip_attempt < max_strip_attempts and not action_ok:
                strip_generated = generate_action_strip_frames(
                    action,
                    layout_guide_image_bytes=layout_guide_image_bytes,
                    strip_candidate_index=strip_attempt,
                )
                if not strip_generated:
                    # Falsy result: stop retrying strips and fall through to per-frame generation.
                    break
                # NOTE(review): attempt is always 1 here, even when strip_attempt > 0 — confirm intended.
                action_review = review_generated_action(action, attempt=1)
                if action_review["ok"]:
                    action_ok = True
                    break
                else:
                    visual_rejected = _visual_review_rejected_generated_action(action_review, action=action)
                    # Reason priority: visual rejection, then style QA failure, otherwise weak semantics.
                    failure_payload = {
                        "action": action,
                        "frame_index": None,
                        "candidate_index": None,
                        "reason": "action_visual_review_repair"
                        if visual_rejected
                        else "action_style_consistency_repair"
                        if not bool(action_review["style_qa"].get("ok"))
                        else "action_strip_weak_action_semantics",
                        "semantic_failure_reasons": action_review["semantic_qa"].get("failure_reasons", []),
                        "style_failure_reasons": action_review["style_qa"].get("failure_reasons", []),
                    }
                    visual_summary = _action_visual_review_failure_summary(action_review)
                    if visual_summary:
                        failure_payload.update(visual_summary)
                    record_candidate_failure(failure_payload)
                    if action not in semantic_repair_actions:
                        semantic_repair_actions.append(action)
                    # Drop pooled candidates for this action; on a resumed build, mark it for regeneration.
                    action_candidate_pools.pop(action, None)
                    if resume_build_path:
                        resume_regenerate_actions.add(action)
                    strip_attempt += 1
            if not action_ok:
                # Strips did not yield an accepted action; record that per-frame generation takes over.
                if action not in forced_per_frame_actions:
                    forced_per_frame_actions.append(action)
        if action_ok:
            continue
        # Per-frame path: at most two passes. The second pass shifts candidate
        # indices by max(candidates_per_frame, 1).
        for attempt in range(2):
            generate_per_frame_action(action, candidate_index_offset=attempt * max(candidates_per_frame, 1))
            action_review = review_generated_action(action, attempt=attempt + 1)
            if action_review["ok"]:
                action_ok = True
                break
            visual_rejected = _visual_review_rejected_generated_action(action_review, action=action)
            # Reason priority: visual rejection, then style QA failure, otherwise semantics.
            # The first pass records a "*_repair" reason, the second a "*_failed_after_repair" one.
            failure_payload = {
                "action": action,
                "frame_index": None,
                "candidate_index": None,
                "reason": "action_visual_review_repair"
                if visual_rejected and attempt == 0
                else "action_visual_review_failed_after_repair"
                if visual_rejected
                else "action_style_consistency_repair"
                if not bool(action_review["style_qa"].get("ok")) and attempt == 0
                else "action_style_consistency_failed_after_repair"
                if not bool(action_review["style_qa"].get("ok"))
                else "action_semantics_repair"
                if attempt == 0
                else "action_semantics_failed_after_repair",
                "semantic_failure_reasons": action_review["semantic_qa"].get("failure_reasons", []),
                "style_failure_reasons": action_review["style_qa"].get("failure_reasons", []),
            }
            visual_summary = _action_visual_review_failure_summary(action_review)
            if visual_summary:
                failure_payload.update(visual_summary)
            record_candidate_failure(
                failure_payload
            )
            if attempt == 0:
                # Before the second pass: if any QA stage failed or the visual review rejected
                # the action, drop its pooled candidates and, when resuming, mark it for regeneration.
                if (
                    not bool(action_review["frame_qa"].get("ok"))
                    or not bool(action_review["semantic_qa"].get("ok"))
                    or not bool(action_review["style_qa"].get("ok"))
                    or visual_rejected
                ):
                    action_candidate_pools.pop(action, None)
                    if resume_build_path:
                        resume_regenerate_actions.add(action)
                # Recorded for every failed first pass, whichever check failed.
                final_semantic_repair_actions.append(action)
        if not action_ok:
            raise ValueError(f"action review failed for {action}")
    if strip_fallback_actions and generation_strategy == "canonical_plus_per_action_strips":
        generation_strategy = "canonical_plus_action_strips_with_per_frame_fallback"
    if semantic_repair_actions and generation_strategy == "canonical_plus_per_action_strips":
        generation_strategy = "canonical_plus_action_strips_with_semantic_repair"
    spritesheet_filename = "spritesheet.png"
    spritesheet_path = build_dir / spritesheet_filename
    contact_sheet_path = qa_dir / "contact-sheet.png"
    checksum = ""
    contact_sheet_media: dict[str, Any] = {}
    final_visual_review = None
    review_contact_sheet = getattr(action_reviewer, "review_contact_sheet", None)
    if final_visual_review_required and not callable(review_contact_sheet):
        raise ValueError("final visual QA reviewer must support review_contact_sheet")
    max_final_visual_attempts = 4
    max_size_proportion_attempts = 3
    size_proportion_qa: dict[str, Any] = {}
    for size_attempt in range(max_size_proportion_attempts):
        for final_attempt in range(max_final_visual_attempts):
            atlas = compose_action_frame_atlas(action_frames, allow_stable_pet_parts=True)
            _annotate_selected_candidates(atlas.qa, action_generations)
            _annotate_walk_loop_mode(atlas.qa)
            semantic_qa = evaluate_action_semantics(atlas.qa)
            failed_final_actions = _failed_semantic_actions(semantic_qa)
            if failed_final_actions:
                for action in failed_final_actions:
                    record_candidate_failure(
                        {
                            "action": action,
                            "frame_index": None,
                            "candidate_index": None,
                            "reason": "final_action_semantics_failed",
                        }
                    )
                _write_json(qa_dir / "action-semantic-qa.json", semantic_qa)
                raise ValueError(f"final action semantic QA failed for {', '.join(failed_final_actions)}")

            spritesheet_path.write_bytes(atlas.image_bytes)
            checksum = hashlib.sha256(spritesheet_path.read_bytes()).hexdigest()
            contact_sheet_media = write_contact_sheet_png(
                spritesheet_bytes=atlas.image_bytes,
                output_path=contact_sheet_path,
            )
            if not callable(review_contact_sheet):
                break
            for review_attempt in range(2):
                try:
                    final_visual_review = review_contact_sheet(
                        pet_name=pet_name.strip(),
                        image_bytes=contact_sheet_path.read_bytes(),
                        image_mime="image/png",
                    )
                    break
                except Exception as exc:
                    if review_attempt == 0:
                        continue
                    final_visual_review = {
                        "status": "error",
                        "provider": "qwen",
                        "review": {
                            "passed": False,
                            "identity_ok": False,
                            "row_semantics_ok": False,
                            "direction_ok": False,
                            "row_distinct_ok": False,
                            "repair_actions": list(ALLOWED_ACTIONS),
                            "notes": "final contact sheet visual review failed before receiving a usable response",
                            "risks": [str(exc)[:220]],
                        },
                    }
            _write_json(qa_dir / "final-visual-review.json", final_visual_review)
            if _final_contact_sheet_visual_review_passed(final_visual_review):
                break

            review_payload = final_visual_review.get("review") if isinstance(final_visual_review, dict) else {}
            repair_actions = review_payload.get("repair_actions") if isinstance(review_payload, dict) else []
            normalized_repair_actions = [
                str(action)
                for action in repair_actions
                if str(action) in ALLOWED_ACTIONS
            ] if isinstance(repair_actions, list) else []
            if final_attempt < max_final_visual_attempts - 1 and normalized_repair_actions:
                record_candidate_failure(
                    {
                        "action": None,
                        "frame_index": None,
                        "candidate_index": None,
                        "reason": "final_contact_sheet_visual_qa_repair",
                        "repair_actions": normalized_repair_actions,
                        "repair_round": final_attempt + 1,
                    }
                )
                for repair_action in normalized_repair_actions:
                    if repair_action not in final_visual_repair_actions:
                        final_visual_repair_actions.append(repair_action)
                    action_candidate_pools.pop(repair_action, None)
                    generate_per_frame_action(repair_action, candidate_index_offset=(final_attempt + 1) * 10)
                    repaired_action_review = review_generated_action(repair_action, attempt=final_attempt + 3)
                    if not repaired_action_review["ok"]:
                        failure_payload = {
                            "action": repair_action,
                            "frame_index": None,
                            "candidate_index": None,
                            "reason": "final_visual_repair_action_failed",
                            "semantic_failure_reasons": repaired_action_review["semantic_qa"].get("failure_reasons", []),
                        }
                        visual_summary = _action_visual_review_failure_summary(repaired_action_review)
                        if visual_summary:
                            failure_payload.update(visual_summary)
                        record_candidate_failure(failure_payload)
                        raise ValueError(f"action review failed for {repair_action}")
                continue
            record_candidate_failure(
                {
                    "action": None,
                    "frame_index": None,
                    "candidate_index": None,
                    "reason": "final_contact_sheet_visual_qa_failed",
                    "repair_actions": normalized_repair_actions,
                }
            )
            raise ValueError("final contact sheet visual QA failed")

        size_proportion_qa = evaluate_size_proportions(atlas.qa)
        _write_json(qa_dir / "size-proportion-qa.json", size_proportion_qa)
        if size_proportion_qa.get("ok") is True:
            break
        size_repair_actions = _failed_size_proportion_actions(size_proportion_qa)
        if size_attempt < max_size_proportion_attempts - 1 and size_repair_actions:
            record_candidate_failure(
                {
                    "action": None,
                    "frame_index": None,
                    "candidate_index": None,
                    "reason": "size_proportion_qa_repair",
                    "repair_actions": size_repair_actions,
                    "repair_round": size_attempt + 1,
                    "failure_reasons": size_proportion_qa.get("failure_reasons", []),
                }
            )
            for repair_action in size_repair_actions:
                if repair_action not in size_proportion_repair_actions:
                    size_proportion_repair_actions.append(repair_action)
                action_candidate_pools.pop(repair_action, None)
                generate_per_frame_action(repair_action, candidate_index_offset=(size_attempt + 1) * 30)
                repaired_action_review = review_generated_action(repair_action, attempt=max_final_visual_attempts + size_attempt + 3)
                if not repaired_action_review["ok"]:
                    record_candidate_failure(
                        {
                            "action": repair_action,
                            "frame_index": None,
                            "candidate_index": None,
                            "reason": "size_proportion_repair_action_failed",
                            "semantic_failure_reasons": repaired_action_review["semantic_qa"].get("failure_reasons", []),
                        }
                    )
                    raise ValueError(f"action review failed for {repair_action}")
            generation_strategy = "canonical_plus_action_frames_with_size_repair"
            continue
        record_candidate_failure(
            {
                "action": None,
                "frame_index": None,
                "candidate_index": None,
                "reason": "size_proportion_qa_failed",
                "repair_actions": size_repair_actions,
                "failure_reasons": size_proportion_qa.get("failure_reasons", []),
            }
        )
        break
    preview_media = write_animation_previews(
        action_frames=action_frames,
        output_dir=qa_dir / "previews",
    )
    package_manifest = _write_install_package(
        build_dir=build_dir,
        pet_id=pet_id,
        pet_name=pet_name.strip(),
        spritesheet_bytes=atlas.image_bytes,
    )
    review = _build_review(frame_qa=atlas.qa, action_semantic_qa=semantic_qa)

    manifest = _build_manifest(
        pet_id=pet_id,
        pet_name=pet_name.strip(),
        build_id=build_id,
        package_id=package_id,
        checksum=checksum,
        spritesheet_filename=spritesheet_filename,
        image_refs=images,
        analyzer_model=getattr(analyzer, "model", None),
        image_generation_provider=str(
            canonical_generation.get("provider") or getattr(image_generator, "provider", "") or "unknown"
        ),
        image_model=str(canonical_generation.get("model") or getattr(image_generator, "model", "")),
        canonical_request_id=str(canonical_generation.get("request_id", "")),
        action_request_ids={
            action: [str(payload.get("request_id", "")) for payload in payloads]
            for action, payloads in action_generations.items()
        },
        action_candidate_request_ids={
            action: [
                [str(candidate.get("request_id", "")) for candidate in frame_candidates]
                for frame_candidates in action_candidate_generations[action]
            ]
            for action in ALLOWED_ACTIONS
        },
        generation_prompt_sha256=str(canonical_generation.get("prompt_sha256", "")),
        action_prompt_sha256s={
            action: [str(payload.get("prompt_sha256", "")) for payload in payloads]
            for action, payloads in action_generations.items()
        },
        generation_usage={
            "canonical": _safe_generation_usage(canonical_generation.get("usage", {})),
            "actions": {
                action: [_safe_generation_usage(payload.get("usage", {})) for payload in payloads]
                for action, payloads in action_generations.items()
            },
        },
        transparency_processing={
            "applied": False,
            "alpha_capable": True,
            "reason": "atlas_composed_rgba",
            "canonical": canonical_transparency,
            "actions": action_transparency,
        },
        frame_qa=atlas.qa,
        action_semantic_qa=semantic_qa,
        size_proportion_qa=size_proportion_qa,
        candidate_policy={
            "generation_mode": _generation_mode(generation_strategy),
            "candidates_per_frame": candidates_per_frame,
            "selection": "deterministic_action_semantic_score",
            "fallback_candidate_index": candidates_per_frame,
            "strip_fallback_actions": strip_fallback_actions,
            "semantic_repair_actions": semantic_repair_actions,
            "final_semantic_repair_actions": final_semantic_repair_actions,
            "final_visual_repair_actions": final_visual_repair_actions,
            "size_proportion_repair_actions": size_proportion_repair_actions,
            "forced_per_frame_actions": forced_per_frame_actions,
            "walk_loop_mode": WALK_LOOP_MODE,
            "walk_generated_frames": WALK_GENERATED_FRAME_COUNT,
            "previous_frame_reference_actions": sorted(PREVIOUS_FRAME_REFERENCE_ACTIONS),
            "canonical_only_last_resort_frames": canonical_only_last_resort_frames,
            "minimal_last_resort_frames": minimal_last_resort_frames,
            "deterministic_action_fallback_frames": deterministic_action_fallback_frames,
            "provider_previous_frame_copy_fallback_frames": provider_previous_frame_copy_fallback_frames,
            "local_sleep_variant_frames": local_sleep_variant_frames,
            "concurrent_action_frame_batches": concurrent_action_frame_batches,
            "concurrent_action_frame_jobs": concurrent_action_frame_jobs,
            "action_frame_generation_mode": action_frame_generation_mode,
            "stop_after_first_acceptable_candidate": bool(stop_after_first_acceptable_candidate),
            "resume_build_dir": str(resume_build_path) if resume_build_path else None,
            "candidate_failures": len(candidate_failures),
        },
        generation_strategy=generation_strategy,
        image_mime="image/png",
        analysis=analysis,
    )
    manifest["package_ref"] = "package/pet.json"
    manifest["source"]["hatch_pet_pipeline"] = {
        "reference": "hatch-pet",
        "job_manifest": "imagegen-jobs.json",
        "pet_request": "pet_request.json",
        "deterministic_review": "qa/review.json",
        "final_visual_review": "qa/final-visual-review.json" if final_visual_review else None,
        "contact_sheet_png": "qa/contact-sheet.png",
        "preview_dir": "qa/previews",
        "package_dir": "package",
        "package_manifest": package_manifest,
        "repair_mode": "smallest_failed_action",
        "visual_qa_required_before_user_approval": final_visual_review_required,
        "final_visual_review_required": final_visual_review_required,
        "final_visual_review_provider": final_visual_review.get("provider") if isinstance(final_visual_review, dict) else None,
        "final_visual_review_model": final_visual_review.get("model") if isinstance(final_visual_review, dict) else None,
        "contact_sheet_media": contact_sheet_media,
        "preview_media": preview_media,
    }
    validation = _build_validation(manifest)
    summary = _build_summary(manifest, analysis, canonical_generation, action_generations)

    _write_json(build_dir / "manifest.json", manifest)
    _write_json(build_dir / "validation.json", validation)
    _write_json(
        build_dir / "imagegen-jobs.json",
        _build_job_manifest(
            pet_id=pet_id,
            pet_name=pet_name.strip(),
            image_refs=images,
            canonical_generation=canonical_generation,
            action_generations=action_generations,
            strip_fallback_actions=strip_fallback_actions,
            semantic_repair_actions=semantic_repair_actions,
            final_semantic_repair_actions=final_semantic_repair_actions,
            size_proportion_repair_actions=size_proportion_repair_actions,
            forced_per_frame_actions=forced_per_frame_actions,
        ),
    )
    (qa_dir / "contact-sheet.html").write_text(_contact_sheet_html(manifest), encoding="utf-8")
    _write_json(qa_dir / "frame-qa.json", atlas.qa)
    _write_json(qa_dir / "action-semantic-qa.json", semantic_qa)
    _write_json(qa_dir / "candidate-failures.json", {"failures": candidate_failures})
    _write_json(qa_dir / "review.json", review)
    _write_json(qa_dir / "run-summary.json", summary)
    return build_dir


def _build_pet_request(
    *,
    pet_id: str,
    pet_name: str,
    image_refs: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the pet request payload describing the prepared input photos.

    Only the hash, MIME details and conversion flag are copied from each
    image ref. The two privacy flags in the payload are hard-coded to
    ``False``.

    Raises:
        KeyError: If an image ref lacks a ``"sha256"`` or ``"mime"`` entry.
    """
    media_refs: list[dict[str, Any]] = []
    for ref in image_refs:
        digest = ref["sha256"]
        prepared_mime = ref["mime"]
        media_refs.append(
            {
                "sha256": digest,
                # The prepared MIME doubles as the source MIME when none was recorded.
                "mime": ref.get("source_mime", prepared_mime),
                "prepared_mime": prepared_mime,
                "converted": ref.get("converted", False),
            }
        )

    request: dict[str, Any] = {
        "pet_id": pet_id,
        "display_name": pet_name,
        "source_kind": "raw_pet_photo",
        "photo_count": len(image_refs),
        "states": list(ALLOWED_ACTIONS),
        "memory_text_stored": False,
        "raw_media_included": False,
        "input_media_refs": media_refs,
    }
    return request


def _default_photo_analyzer(provider_slug: str) -> PhotoAnalyzer:
    """Pick the analyzer used when the caller did not supply one.

    The ``"qwen"`` provider gets an environment-configured Qwen analyzer;
    every other provider slug falls back to the local analyzer.
    """
    wants_qwen = provider_slug == "qwen"
    return QwenPhotoAnalyzer.from_env() if wants_qwen else LocalPetPhotoAnalyzer()


def _should_load_default_visual_reviewer(
    *,
    provider_slug: str,
    image_generator_was_provided: bool,
    analyzer_was_provided: bool,
) -> bool:
    """Decide whether the default visual reviewer should be loaded.

    Returns ``True`` when the provider is ``"qwen"`` and no analyzer was
    supplied, or whenever no image generator was supplied.
    """
    qwen_without_analyzer = provider_slug == "qwen" and not analyzer_was_provided
    return qwen_without_analyzer or not image_generator_was_provided


def _infer_species_from_text(value: str) -> str:
    """Guess a species label from free text using plain substring matching.

    The text is lower-cased first. Cat markers are checked before dog
    markers, and ``"pet"`` is returned when neither is found.
    """
    lowered = value.lower()
    species_markers = (
        ("cat", ("猫", "cat")),
        ("dog", ("狗", "dog")),
    )
    for species, markers in species_markers:
        if any(marker in lowered for marker in markers):
            return species
    return "pet"


def _build_job_manifest(
    *,
    pet_id: str,
    pet_name: str,
    image_refs: list[dict[str, Any]],
    canonical_generation: dict[str, Any],
    action_generations: dict[str, list[dict[str, Any]]],
    strip_fallback_actions: list[str],
    semantic_repair_actions: list[str],
    final_semantic_repair_actions: list[str],
    size_proportion_repair_actions: list[str],
    forced_per_frame_actions: list[str],
) -> dict[str, Any]:
    """Build the job manifest: one canonical job plus one job per allowed action.

    An action job is marked ``"complete"`` only when it has exactly
    ``FRAMES_PER_ACTION`` generation payloads; otherwise ``"incomplete"``.
    Each job records its own ``completed_at`` timestamp, taken at the moment
    the job entry is built.
    """

    def utc_timestamp() -> str:
        # ISO-8601 in UTC with the "+00:00" suffix rewritten as "Z".
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    source_photos = [
        {
            "role": "raw_pet_photo",
            "sha256": image["sha256"],
            "mime": image.get("source_mime", image["mime"]),
        }
        for image in image_refs
    ]
    canonical_job: dict[str, Any] = {
        "id": "canonical",
        "kind": "canonical_base",
        "status": "complete",
        "depends_on": [],
        "output_path": "qa/canonical.png",
        "input_images": source_photos,
        "request_id": canonical_generation.get("request_id"),
        "model": canonical_generation.get("model"),
        "completed_at": utc_timestamp(),
    }
    jobs = [canonical_job]

    for action in ALLOWED_ACTIONS:
        frame_payloads = action_generations.get(action, [])
        # Payloads without a source mode count as "per_frame".
        distinct_modes = {str(item.get("source_mode") or "per_frame") for item in frame_payloads}
        has_all_frames = len(frame_payloads) == FRAMES_PER_ACTION
        action_job: dict[str, Any] = {
            "id": action,
            "kind": "action_row",
            "status": "complete" if has_all_frames else "incomplete",
            "depends_on": ["canonical"],
            "output_path": f"qa/action-frames/{action}-*.png",
            "input_images": [
                {"role": "canonical_identity_reference", "path": "qa/canonical-reference.png"},
            ],
            "layout_guide": f"qa/layout-guides/{action}.png",
            "source_modes": sorted(distinct_modes),
            "request_ids": [item.get("request_id") for item in frame_payloads],
            "prompt_sha256s": [item.get("prompt_sha256") for item in frame_payloads],
            "repair_reason": _job_repair_reason(
                action=action,
                strip_fallback_actions=strip_fallback_actions,
                semantic_repair_actions=semantic_repair_actions,
                final_semantic_repair_actions=final_semantic_repair_actions,
                size_proportion_repair_actions=size_proportion_repair_actions,
                forced_per_frame_actions=forced_per_frame_actions,
            ),
            "completed_at": utc_timestamp(),
        }
        jobs.append(action_job)

    privacy = {
        "raw_media_included": False,
        "memory_text_stored": False,
    }
    return {
        "version": "hatch_pet_style_photo_worker.v1",
        "pet_id": pet_id,
        "display_name": pet_name,
        "job_count": len(jobs),
        "jobs": jobs,
        "privacy": privacy,
    }


def _job_repair_reason(
    *,
    action: str,
    strip_fallback_actions: list[str],
    semantic_repair_actions: list[str],
    final_semantic_repair_actions: list[str],
    size_proportion_repair_actions: list[str],
    forced_per_frame_actions: list[str],
) -> str | None:
    """Return the highest-priority repair reason recorded for ``action``.

    The collections are consulted in a fixed order and the first one that
    contains ``action`` decides the label. ``None`` means the action appears
    in none of them.
    """
    prioritized_reasons = (
        (final_semantic_repair_actions, "final_action_semantics_repair"),
        (size_proportion_repair_actions, "size_proportion_qa_repair"),
        (semantic_repair_actions, "action_strip_weak_action_semantics"),
        (strip_fallback_actions, "action_strip_structure_invalid"),
        (forced_per_frame_actions, "forced_per_frame"),
    )
    for members, reason in prioritized_reasons:
        if action in members:
            return reason
    return None


def _generation_mode(generation_strategy: str) -> str:
    """Translate a generation strategy name into its candidate-policy mode.

    Three strategy names map to dedicated modes. Any other strategy that
    contains ``"action_strips"`` is ``"mixed"``, and everything else is
    ``"per_frame"``.
    """
    exact_modes = (
        ("canonical_plus_deterministic_action_frames", "deterministic_motion"),
        ("canonical_plus_deterministic_action_fallback", "deterministic_motion_fallback"),
        ("canonical_plus_per_action_strips", "action_strip"),
    )
    for strategy_name, mode in exact_modes:
        if generation_strategy == strategy_name:
            return mode
    return "mixed" if "action_strips" in generation_strategy else "per_frame"


def _normalize_action_frame_generation_mode(value: str | None) -> str:
    """Collapse a user-supplied mode string to ``"deterministic"`` or ``"provider"``.

    The value is stripped, lower-cased and has dashes turned into
    underscores before matching. Empty or missing values, and anything not
    recognised as a deterministic alias, resolve to ``"provider"``.
    """
    deterministic_aliases = {"deterministic", "local", "canonical_motion", "canonical_only"}
    token = str(value or "provider").strip().lower().replace("-", "_")
    return "deterministic" if token in deterministic_aliases else "provider"


def _should_use_action_strips_by_default(provider_slug: str) -> bool:
    """Return ``True`` unless ``provider_slug`` names one of the OpenAI providers."""
    # OpenAI currently gives good single-frame edits but unreliable multi-slot
    # strips for this desktop-pet contract. Skipping strips avoids slow
    # hatch-pet-style repair loops without weakening the downstream QA gates.
    strip_averse_providers = {"openai", "official-openai", "openai-gpt-image"}
    return provider_slug not in strip_averse_providers


def _should_use_local_sleep_direction_variants(provider_slug: str) -> bool:
    """Return ``True`` only for the OpenAI provider slugs."""
    openai_providers = {"openai", "official-openai", "openai-gpt-image"}
    return provider_slug in openai_providers


def _should_generate_action_frames_concurrently(provider_slug: str, action: str) -> bool:
    """Allow concurrent frame generation for OpenAI providers, except for ``"sleep"``."""
    openai_providers = {"openai", "official-openai", "openai-gpt-image"}
    if provider_slug not in openai_providers:
        return False
    return action != "sleep"


def _concurrent_action_frame_worker_count(provider_slug: str) -> int:
    """Return 3 workers for the OpenAI provider slugs and 1 for any other provider."""
    openai_providers = {"openai", "official-openai", "openai-gpt-image"}
    return 3 if provider_slug in openai_providers else 1


def _failed_semantic_actions(action_semantic_qa: dict[str, Any]) -> list[str]:
    """List the allowed actions whose semantic QA row is not explicitly ``ok``.

    Rows are read from ``action_semantic_qa["rows"]`` in order. A row counts
    as failed unless its ``"ok"`` value is exactly ``True``. Rows naming an
    action outside ``ALLOWED_ACTIONS`` are ignored.
    """
    verdicts = (
        (str(row.get("action", "")), row.get("ok"))
        for row in action_semantic_qa.get("rows", [])
    )
    return [name for name, ok in verdicts if name in ALLOWED_ACTIONS and ok is not True]


def _failed_size_proportion_actions(size_proportion_qa: dict[str, Any]) -> list[str]:
    """Collect, in row order, the allowed actions that failed size-proportion QA.

    A row passes only when its ``"ok"`` value is exactly ``True``. Rows whose
    action is not in ``ALLOWED_ACTIONS`` are skipped.
    """
    failing_actions: list[str] = []
    for row in size_proportion_qa.get("rows", []):
        action_name = str(row.get("action", ""))
        if action_name not in ALLOWED_ACTIONS:
            continue
        if row.get("ok") is True:
            continue
        failing_actions.append(action_name)
    return failing_actions


def _single_action_frame_qa(*, action: str, frame_qa: dict[str, Any]) -> dict[str, Any]:
    """Reduce a multi-action frame-QA report to a report for one action.

    Args:
        action: Action whose row should be extracted.
        frame_qa: Frame-QA report; its ``"rows"`` list is searched for the
            first dict row whose ``"action"`` equals ``action``.

    Returns:
        A report with the same top-level keys as the input report, holding
        only the matching row. ``failure_reason`` is ``"missing_action_row"``
        when no row matches, the first frame rejection reason when a frame is
        rejected, ``"not_enough_frames"`` when the frame count or the row's
        ``selected_frames`` differs from ``FRAMES_PER_ACTION``, and ``None``
        otherwise.
    """

    def build_report(
        failure_reason: str | None,
        selected_frame_count: int,
        rows: list[dict[str, Any]],
    ) -> dict[str, Any]:
        # Both outcomes share this envelope; only these three values differ.
        return {
            "ok": failure_reason is None,
            "failure_reason": failure_reason,
            "cell_width": frame_qa.get("cell_width"),
            "cell_height": frame_qa.get("cell_height"),
            "frames_per_action": FRAMES_PER_ACTION,
            "action_count": 1,
            "selected_frame_count": selected_frame_count,
            "rows": rows,
        }

    # Malformed entries (non-dict rows, or "rows" explicitly set to None) are
    # treated as absent so they produce a "missing_action_row" report rather
    # than raising while the report is searched.
    candidate_rows = frame_qa.get("rows") or []
    row = next(
        (item for item in candidate_rows if isinstance(item, dict) and item.get("action") == action),
        None,
    )
    if row is None:
        placeholder_row = {"action": action, "input_frames": 0, "selected_frames": 0, "frames": []}
        return build_report("missing_action_row", 0, [placeholder_row])

    frame_entries = row.get("frames") or []
    frames = [frame for frame in frame_entries if isinstance(frame, dict)]

    # The first rejected frame decides the failure reason.
    failure_reason = next(
        (reason for reason in map(_frame_rejection_reason, frames) if reason),
        None,
    )
    has_full_set = (
        len(frames) == FRAMES_PER_ACTION
        and int(row.get("selected_frames") or 0) == FRAMES_PER_ACTION
    )
    if not has_full_set and failure_reason is None:
        failure_reason = "not_enough_frames"
    return build_report(failure_reason, len(frames), [row])


def _frame_rejection_reason(frame_report: dict[str, Any]) -> str | None:
    """Return why a frame report is unusable, or ``None`` when it is acceptable.

    Checks run in a fixed order and the first failing one wins: missing
    foreground bbox, fragmented foreground, more than 12 components, fewer
    than 800 foreground pixels, a near-solid large foreground (more than
    120,000 pixels with a bbox fill ratio above 0.95), and a bbox fill ratio
    below 0.22. A missing or falsy fill ratio is treated as 1.0.
    """
    if not (frame_report and frame_report.get("source_bbox")):
        return "missing_frame_foreground"
    if frame_report.get("fragmented_foreground") is True:
        return "fragmented_frame_foreground"
    if int(frame_report.get("component_count") or 0) > 12:
        return "too_many_frame_fragments"

    pixel_count = int(frame_report.get("foreground_pixels") or 0)
    if pixel_count < 800:
        return "weak_frame_foreground"

    coverage = float(frame_report.get("bbox_fill_ratio") or 1.0)
    looks_opaque = pixel_count > 120_000 and coverage > 0.95
    if looks_opaque:
        return "opaque_background_not_removed"
    return "sparse_frame_foreground" if coverage < 0.22 else None


def _transparency_rejection_reason(transparency: dict[str, Any]) -> str | None:
    """Check an explicit chroma-key transparency report for signs of failure.

    Reports produced by any other method are never rejected. For chroma-key
    reports the dimensions must be positive, at least 20% of the pixels must
    be transparent, and the chroma residue may not exceed the larger of
    5000 pixels or 0.5% of the image area.
    """
    if transparency.get("method") != "explicit_chroma_key":
        return None

    image_width = int(transparency.get("width") or 0)
    image_height = int(transparency.get("height") or 0)
    cleared_pixels = int(transparency.get("transparent_pixels") or 0)
    if not (image_width > 0 and image_height > 0):
        return "chroma_key_background_unverified"

    area = image_width * image_height
    if cleared_pixels < int(area * 0.20):
        return "chroma_key_background_missing"

    residue = int(transparency.get("chroma_residue_pixels") or 0)
    residue_allowance = max(5000, int(area * 0.005))
    return "chroma_key_residue_remaining" if residue > residue_allowance else None


def _candidate_visual_detail_score(image_bytes: bytes) -> float:
    """Score how much colour detail a candidate PNG has, clamped to [-50, 35].

    Visible pixels are sampled at a stride that keeps roughly 160,000
    positions. The score drops when one colour (exact, or quantised to
    16-level buckets) dominates the sample or when few distinct colours
    occur, and rises when colours are spread out. Unreadable images and
    samples with fewer than 800 visible pixels score ``0.0``.
    """
    try:
        _, _, rgba_pixels = _read_rgba_png(image_bytes)
    except Exception:
        return 0.0

    stride = max(1, len(rgba_pixels) // 160_000)
    sampled: list[tuple[int, int, int]] = []
    for position, (red, green, blue, alpha) in enumerate(rgba_pixels):
        if alpha > 0 and position % stride == 0:
            sampled.append((red, green, blue))
    if len(sampled) < 800:
        return 0.0

    total = float(len(sampled))
    bucket_counts = Counter((red // 16, green // 16, blue // 16) for red, green, blue in sampled)
    exact_counts = Counter(sampled)
    dominant_bucket_share = max(bucket_counts.values()) / total
    dominant_color_share = max(exact_counts.values()) / total
    distinct_colors = len(exact_counts)

    score = 0.0

    # Share of the most common quantised colour bucket.
    if dominant_bucket_share > 0.35:
        score -= (dominant_bucket_share - 0.35) * 90.0
    elif dominant_bucket_share < 0.22:
        score += 12.0

    # Share of the single most common exact colour.
    if dominant_color_share > 0.08:
        score -= (dominant_color_share - 0.08) * 120.0
    elif dominant_color_share < 0.03:
        score += 10.0

    # Number of distinct exact colours in the sample.
    if distinct_colors < 12_000:
        score -= (12_000 - distinct_colors) / 600.0
    elif distinct_colors > 24_000:
        score += min((distinct_colors - 24_000) / 1_500.0, 12.0)

    return min(35.0, max(-50.0, score))


def _deterministic_motion_frame_from_canonical(canonical_image_bytes: bytes, *, action: str, frame_index: int) -> bytes:
    """Render one action frame by rescaling the canonical foreground onto a 144x112 canvas.

    The foreground bounding box of the canonical image is cropped, resampled
    with nearest-neighbour lookups to the size given by
    ``_deterministic_frame_geometry`` and placed at the offset it returns.
    Fully transparent source pixels and pixels falling outside the canvas
    are skipped. An unreadable image, or one without any foreground, is
    replaced by a single opaque grey pixel.
    """
    fallback_pixel = (120, 120, 120, 255)
    try:
        src_width, src_height, src_pixels = _read_rgba_png(canonical_image_bytes)
    except Exception:
        src_width, src_height, src_pixels = 1, 1, [fallback_pixel]

    bounds = _foreground_bbox(src_pixels, src_width, src_height)
    if bounds is None:
        src_width, src_height, src_pixels = 1, 1, [fallback_pixel]
        bounds = (0, 0, 0, 0)
    left, top, right, bottom = bounds

    crop_w = max(1, right - left + 1)
    crop_h = max(1, bottom - top + 1)
    crop_rows = [
        [src_pixels[(top + row) * src_width + left + col] for col in range(crop_w)]
        for row in range(crop_h)
    ]

    canvas_w, canvas_h = 144, 112
    target_w, target_h, origin_x, origin_y = _deterministic_frame_geometry(action=action, frame_index=frame_index)
    # The source column for each target column is the same on every row.
    column_lookup = [
        min(crop_w - 1, int(col * crop_w / max(1, target_w)))
        for col in range(target_w)
    ]

    canvas = [(0, 0, 0, 0)] * (canvas_w * canvas_h)
    for row in range(target_h):
        dest_y = origin_y + row
        if not 0 <= dest_y < canvas_h:
            continue
        source_row = crop_rows[min(crop_h - 1, int(row * crop_h / max(1, target_h)))]
        row_start = dest_y * canvas_w
        for col, source_col in enumerate(column_lookup):
            red, green, blue, alpha = source_row[source_col]
            if alpha <= 0:
                continue
            dest_x = origin_x + col
            if 0 <= dest_x < canvas_w:
                canvas[row_start + dest_x] = (red, green, blue, alpha)
    return _write_rgba_png(canvas_w, canvas_h, canvas)


def _direction_locked_sleep_variant_frame(source_image_bytes: bytes, *, frame_index: int) -> bytes:
    """Derive a sleep-frame variant by slightly rescaling and shifting the foreground.

    The foreground bounding box is cropped, scaled by a per-frame factor and
    redrawn on a transparent canvas of the original size. It stays centred
    on the original horizontal centre and anchored to the original bottom
    edge, plus a small per-frame vertical offset. The input bytes are
    returned unchanged when the image cannot be read or has no foreground.
    """
    try:
        width, height, pixels = _read_rgba_png(source_image_bytes)
    except Exception:
        return source_image_bytes
    bounds = _foreground_bbox(pixels, width, height)
    if bounds is None:
        return source_image_bytes
    left, top, right, bottom = bounds

    crop_w = max(1, right - left + 1)
    crop_h = max(1, bottom - top + 1)
    crop: list[tuple[int, int, int, int]] = []
    for row in range(crop_h):
        row_start = (top + row) * width + left
        crop.extend(pixels[row_start + col] for col in range(crop_w))

    variant = frame_index % FRAMES_PER_ACTION
    scales = [1.0, 0.94, 1.08, 0.98]
    breath_offsets = [0, 2, -1, 1]
    scale = scales[variant % len(scales)]
    breath = breath_offsets[variant % len(breath_offsets)]

    # The scaled foreground is never larger than the source image.
    target_w = max(1, min(width, round(crop_w * scale)))
    target_h = max(1, min(height, round(crop_h * scale)))
    scaled = _resize_nearest(crop, crop_w, crop_h, target_w, target_h)

    center_x = (left + right) / 2
    target_left = round(center_x - target_w / 2)
    target_top = round(bottom - target_h + 1 + breath)
    # Clamp the placement so the whole scaled foreground stays on the canvas.
    target_left = max(0, min(width - target_w, target_left))
    target_top = max(0, min(height - target_h, target_top))

    canvas = [(0, 0, 0, 0)] * (width * height)
    for position, pixel in enumerate(scaled):
        if pixel[3] == 0:
            continue
        row, col = divmod(position, target_w)
        canvas[(target_top + row) * width + target_left + col] = pixel
    return _write_rgba_png(width, height, canvas)


def _resize_nearest(
    pixels: list[tuple[int, int, int, int]],
    width: int,
    height: int,
    target_width: int,
    target_height: int,
) -> list[tuple[int, int, int, int]]:
    """Resample a row-major pixel list to a new size with nearest-neighbour lookups.

    A copy of ``pixels`` is returned when the size is unchanged; the input
    list is never modified.
    """
    if (width, height) == (target_width, target_height):
        return list(pixels)

    # The source column for each target column is the same on every row.
    column_lookup = [
        min(width - 1, int(col * width / target_width))
        for col in range(target_width)
    ]
    resized: list[tuple[int, int, int, int]] = []
    for row in range(target_height):
        row_start = min(height - 1, int(row * height / target_height)) * width
        resized.extend(pixels[row_start + source_col] for source_col in column_lookup)
    return resized


def _foreground_bbox(
    pixels: list[tuple[int, int, int, int]],
    width: int,
    height: int,
) -> tuple[int, int, int, int] | None:
    """Return the inclusive ``(left, top, right, bottom)`` box around visible pixels.

    A pixel is visible when its alpha is greater than zero. ``pixels`` is
    read as row-major with ``width`` columns; ``height`` is accepted but not
    consulted. Returns ``None`` when there is no visible pixel.
    """
    seen_any = False
    min_x = min_y = max_x = max_y = 0
    for position, (_red, _green, _blue, alpha) in enumerate(pixels):
        if alpha <= 0:
            continue
        y, x = divmod(position, width)
        if not seen_any:
            min_x = max_x = x
            min_y = max_y = y
            seen_any = True
            continue
        if x < min_x:
            min_x = x
        elif x > max_x:
            max_x = x
        if y < min_y:
            min_y = y
        elif y > max_y:
            max_y = y
    if not seen_any:
        return None
    return min_x, min_y, max_x, max_y


def _deterministic_frame_geometry(*, action: str, frame_index: int) -> tuple[int, int, int, int]:
    """Look up ``(width, height, x_offset, y_offset)`` for a deterministic action frame.

    ``frame_index`` wraps around ``FRAMES_PER_ACTION``. Actions without a
    dedicated layout use the default layout. Every table lists four entries
    per dimension.
    """
    # Each layout is (widths, heights, x offsets, y offset).
    layouts = {
        "sleep": ((96, 116, 116, 120), (34, 30, 30, 28), (8, 12, 12, 20), 58),
        "walk": ((100, 120, 100, 120), (44, 36, 44, 36), (8, 18, 8, 18), 44),
        "tail_wag": ((78, 106, 82, 110), (58, 50, 58, 50), (24, 12, 28, 10), 28),
        # x offsets advance by 2 per frame starting at 32.
        "sit": ((72, 76, 72, 78), (84, 80, 84, 78), (32, 34, 36, 38), 18),
        # x offsets advance by 4 per frame starting at 28.
        "look": ((76, 82, 78, 84), (74, 72, 74, 72), (28, 32, 36, 40), 24),
    }
    # x offsets advance by 3 per frame starting at 30.
    default_layout = ((80, 84, 80, 86), (76, 74, 76, 74), (30, 33, 36, 39), 24)

    slot = frame_index % FRAMES_PER_ACTION
    widths, heights, x_offsets, y_offset = layouts.get(action, default_layout)
    return widths[slot], heights[slot], x_offsets[slot], y_offset


def _evaluate_action_style_consistency(frame_images: list[bytes]) -> dict[str, Any]:
    """Flag frames whose colour style deviates from the other frames of an action.

    Each frame gets a style signature tagged with its index. With fewer than
    two usable signatures the result is always ``ok``. Otherwise a frame is
    an outlier when its blue bias is more than 16 away from the median, or
    when its dark-pixel ratio exceeds both 0.055 and the median plus 0.045.
    Usable signatures are annotated in place with their deltas.
    """
    signatures: list[dict[str, Any]] = []
    for position, frame_bytes in enumerate(frame_images):
        entry = _frame_style_signature(frame_bytes)
        entry["frame"] = position
        signatures.append(entry)

    usable = [entry for entry in signatures if entry.get("ok")]
    if len(usable) < 2:
        # Not enough comparable frames to define an outlier.
        return {
            "ok": True,
            "failure_reasons": [],
            "bad_frames": [],
            "frames": signatures,
        }

    median_blue_bias = _median_float([float(entry["blue_bias"]) for entry in usable])
    median_dark_ratio = _median_float([float(entry["dark_ratio"]) for entry in usable])
    dark_ratio_limit = max(0.055, median_dark_ratio + 0.045)

    outliers: list[int] = []
    for entry in usable:
        bias_gap = abs(float(entry["blue_bias"]) - median_blue_bias)
        darkness = float(entry["dark_ratio"])
        entry["blue_bias_delta"] = round(bias_gap, 6)
        entry["dark_ratio_delta"] = round(darkness - median_dark_ratio, 6)
        if bias_gap > 16.0 or darkness > dark_ratio_limit:
            outliers.append(int(entry["frame"]))

    consistent = not outliers
    return {
        "ok": consistent,
        "failure_reasons": [] if consistent else ["frame_style_outlier"],
        "bad_frames": outliers,
        "median_blue_bias": round(median_blue_bias, 6),
        "median_dark_ratio": round(median_dark_ratio, 6),
        "frames": signatures,
    }


def _frame_style_signature(image_bytes: bytes) -> dict[str, Any]:
    """Summarise the colour style of a frame's visible pixels.

    Returns ``{"ok": False, "reason": ...}`` when the PNG cannot be read or
    has fewer than 800 visible pixels. Otherwise returns the visible pixel
    count, the mean RGB, the blue bias (mean blue minus the average of mean
    red and mean green) and the share of pixels whose channel sum is below
    120.
    """
    try:
        _, _, rgba_pixels = _read_rgba_png(image_bytes)
    except Exception:
        return {"ok": False, "reason": "unreadable_png"}

    visible = 0
    dark = 0
    red_total = green_total = blue_total = 0
    for red, green, blue, alpha in rgba_pixels:
        if alpha > 0:
            visible += 1
            red_total += red
            green_total += green
            blue_total += blue
            if red + green + blue < 120:
                dark += 1
    if visible < 800:
        return {"ok": False, "reason": "not_enough_foreground"}

    count = float(visible)
    mean_red = red_total / count
    mean_green = green_total / count
    mean_blue = blue_total / count
    return {
        "ok": True,
        "foreground_pixels": visible,
        "mean_rgb": [round(mean_red, 3), round(mean_green, 3), round(mean_blue, 3)],
        "blue_bias": round(mean_blue - ((mean_red + mean_green) / 2), 6),
        "dark_ratio": round(dark / count, 6),
    }


def _median_float(values: list[float]) -> float:
    """Return the median of ``values``, or ``0.0`` for an empty list.

    For an even number of values the two middle values are averaged. The
    input list is left unsorted.
    """
    ranked = sorted(values)
    size = len(ranked)
    if size == 0:
        return 0.0
    half = size // 2
    upper_middle = ranked[half]
    if size % 2 == 1:
        return upper_middle
    return (ranked[half - 1] + upper_middle) / 2


def _safe_transparency_report(transparency: dict[str, Any]) -> dict[str, Any]:
    """Copy only the whitelisted fields of a transparency report.

    Fields missing from the input appear in the result as ``None``.
    """
    exported_fields = (
        "method",
        "width",
        "height",
        "transparent_pixels",
        "chroma_residue_pixels",
        "chroma_key",
        "chroma_threshold",
    )
    return {field: transparency.get(field) for field in exported_fields}


def _safe_frame_report(frame_report: dict[str, Any]) -> dict[str, Any]:
    return {
        "component_count": frame_report.get("component_count"),
        "source_bbox": frame_report.get("source_bbox"),
        "foreground_pixels": frame_report.get("foreground_pixels"),
        "bbox_fill_ratio": frame_report.get("bbox_fill_ratio"),
    }


def _visual_review_passed(payload: dict[str, Any], *, action: str = "") -> bool:
    review = payload.get("review") if isinstance(payload, dict) else None
    if not isinstance(review, dict):
        return False
    passed = review.get("passed")
    action_ok = review.get("action_ok", True)
    identity_ok = review.get("identity_ok", True)
    if action == "walk" and review.get("gait_cycle_ok") is not True and review.get("two_frame_loop_ok") is not True:
        return False
    return (
        passed is True
        and action_ok is not False
        and identity_ok is not False
        and not _visual_review_notes_block_action(action=action, review=review)
    )


def _provider_request_blocked(exc: Exception) -> bool:
    """Tell whether a provider error message contains a known "blocked" marker.

    Only ``QwenRequestError`` and ``APIMartRequestError`` instances are
    inspected; the lower-cased message is searched for the markers below.
    """
    inspected_error_types = (QwenRequestError, APIMartRequestError)
    if not isinstance(exc, inspected_error_types):
        return False
    message = str(exc).lower()
    blocked_markers = (
        "arrearage",
        "account is in good standing",
        "access denied",
        "overdue-payment",
        "insufficient balance",
    )
    for marker in blocked_markers:
        if marker in message:
            return True
    return False


def _provider_request_transient(exc: Exception) -> bool:
    """Tell whether a provider error message contains a known transient marker.

    Errors already classified by ``_provider_request_blocked`` are never
    transient. Otherwise only ``QwenRequestError``, ``APIMartRequestError`` and
    ``OpenAIRequestError`` instances are inspected; the lower-cased message is
    searched for the markers below.
    """
    if _provider_request_blocked(exc):
        return False
    inspected_error_types = (QwenRequestError, APIMartRequestError, OpenAIRequestError)
    if not isinstance(exc, inspected_error_types):
        return False
    message = str(exc).lower()
    transient_markers = (
        "timed out",
        "timeout",
        "polling failed",
        "before receiving a response",
        "remote end closed connection",
        "temporarily unavailable",
        "connection reset",
        "connection aborted",
    )
    for marker in transient_markers:
        if marker in message:
            return True
    return False


def _final_contact_sheet_visual_review_passed(payload: dict[str, Any] | None) -> bool:
    review = payload.get("review") if isinstance(payload, dict) else None
    if not isinstance(review, dict):
        return False
    return (
        review.get("passed") is True
        and review.get("identity_ok", True) is not False
        and review.get("row_semantics_ok", True) is not False
        and review.get("direction_ok", True) is not False
    )


def _visual_review_rejected_generated_action(action_review: dict[str, Any], *, action: str = "") -> bool:
    payload = action_review.get("visual_review")
    if not isinstance(payload, dict) or payload.get("status") != "ok":
        return False
    review = payload.get("review")
    if not isinstance(review, dict):
        return False
    return (
        review.get("passed") is False
        or review.get("action_ok") is False
        or review.get("identity_ok") is False
        or (action == "walk" and review.get("gait_cycle_ok") is not True and review.get("two_frame_loop_ok") is not True)
        or _visual_review_notes_block_action(action=action, review=review)
    )


def _action_visual_review_failure_summary(action_review: dict[str, Any]) -> dict[str, Any]:
    payload = action_review.get("visual_review")
    if not isinstance(payload, dict):
        return {}
    review = payload.get("review")
    if not isinstance(review, dict):
        return {}
    risks = review.get("risks", [])
    return {
        "visual_review_score": review.get("score"),
        "visual_review_passed": review.get("passed"),
        "visual_review_action_ok": review.get("action_ok"),
        "visual_review_identity_ok": review.get("identity_ok"),
        "visual_review_gait_cycle_ok": review.get("gait_cycle_ok"),
        "visual_review_two_frame_loop_ok": review.get("two_frame_loop_ok"),
        "visual_review_front_paw_sequence": review.get("front_paw_sequence"),
        "visual_review_hind_paw_sequence": review.get("hind_paw_sequence"),
        "visual_review_bad_frames": review.get("bad_frames"),
        "visual_review_notes": str(review.get("notes") or "")[:240],
        "visual_review_risks": [str(item)[:120] for item in risks[:4]] if isinstance(risks, list) else [],
    }


def _visual_review_notes_block_action(*, action: str, review: dict[str, Any]) -> bool:
    notes = str(review.get("notes") or "").lower()
    risks = " ".join(str(item).lower() for item in review.get("risks", []) if item is not None)
    text = f"{notes} {risks}"
    if action == "sit":
        safe_negations = ("no standing", "without standing", "not standing", "no transition")
        if any(negation in text for negation in safe_negations):
            return False
        blockers = (
            "one standing frame",
            "standing frame included",
            "standing pose",
            "not sitting",
            "not seated",
            "from standing",
            "transition logic",
        )
        return any(blocker in text for blocker in blockers)
    return False


def _build_review(*, frame_qa: dict[str, Any], action_semantic_qa: dict[str, Any]) -> dict[str, Any]:
    errors: list[dict[str, Any]] = []
    if frame_qa.get("ok") is not True:
        errors.append(
            {
                "scope": "frames",
                "reason": frame_qa.get("failure_reason") or "frame_review_failed",
            }
        )
    for row in action_semantic_qa.get("rows", []):
        if row.get("ok") is True:
            continue
        errors.append(
            {
                "scope": "action_semantics",
                "action": row.get("action"),
                "reasons": row.get("reasons", []),
            }
        )
    return {
        "ok": not errors,
        "kind": "deterministic_hatch_pet_style_review",
        "visual_qa_required_before_user_approval": True,
        "errors": errors,
        "frame_qa_ok": frame_qa.get("ok") is True,
        "action_semantic_qa_ok": action_semantic_qa.get("ok") is True,
    }


def _write_install_package(
    *,
    build_dir: Path,
    pet_id: str,
    pet_name: str,
    spritesheet_bytes: bytes,
) -> dict[str, Any]:
    """Write the install package (spritesheet and ``pet.json``) under ``build_dir``.

    Creates ``build_dir/package`` if needed, stores the spritesheet bytes as
    ``spritesheet.png`` and writes a ``pet.json`` with one animation entry per
    allowed action, using the action's position as its row. The package is
    written with ``userApproved`` set to ``False``.

    Returns the paths of the written files relative to ``build_dir``.
    """
    package_dir = build_dir / "package"
    package_dir.mkdir(parents=True, exist_ok=True)
    spritesheet_target = package_dir / "spritesheet.png"
    spritesheet_target.write_bytes(spritesheet_bytes)

    animations: dict[str, dict[str, Any]] = {}
    for row_index, action in enumerate(ALLOWED_ACTIONS):
        animations[action] = {"row": row_index, "frames": FRAMES_PER_ACTION, "mode": "low_disturbance"}

    source_info = {
        "fixtureKind": "private_generated",
        "sensitiveContentStored": False,
    }
    payload = {
        "id": pet_id,
        "displayName": pet_name,
        "description": "纪念桌宠动作包，需用户确认后安装。",
        "runtimeVersion": "0.1.0",
        "spritesheetPath": "spritesheet.png",
        "animations": animations,
        "userApproved": False,
        "source": source_info,
    }
    _write_json(package_dir / "pet.json", payload)

    return {
        "path": "package/pet.json",
        "spritesheet": "package/spritesheet.png",
        "format": "pet.json",
    }


def _read_image_ref(path: Path) -> dict[str, Any]:
    if not path.exists():
        raise ValueError("photo file does not exist")
    suffix = path.suffix.lower()
    mime = "image/png" if suffix == ".png" else "image/webp" if suffix == ".webp" else "image/jpeg"
    image_bytes = path.read_bytes()
    return {
        "name": path.name,
        "mime": mime,
        "path": path,
        "bytes": image_bytes,
        "sha256": hashlib.sha256(image_bytes).hexdigest(),
    }


def _spritesheet_filename(image: dict[str, Any]) -> str:
    mime = str(image.get("mime", "")).lower()
    if mime == "image/webp":
        return "spritesheet.webp"
    if mime == "image/png":
        return "spritesheet.png"
    return "spritesheet.jpg"


def _generated_spritesheet_filename(image_mime: str) -> str:
    mime = image_mime.lower()
    if mime == "image/webp":
        return "spritesheet.webp"
    if mime == "image/jpeg":
        return "spritesheet.jpg"
    return "spritesheet.png"


def _build_manifest(
    *,
    pet_id: str,
    pet_name: str,
    build_id: str,
    package_id: str,
    checksum: str,
    spritesheet_filename: str,
    image_refs: list[dict[str, Any]],
    analyzer_model: str | None,
    image_generation_provider: str,
    image_model: str,
    canonical_request_id: str,
    action_request_ids: dict[str, list[str]],
    action_candidate_request_ids: dict[str, list[list[str]]],
    generation_prompt_sha256: str,
    action_prompt_sha256s: dict[str, list[str]],
    generation_usage: dict[str, Any],
    transparency_processing: dict[str, Any],
    frame_qa: dict[str, Any],
    action_semantic_qa: dict[str, Any],
    size_proportion_qa: dict[str, Any],
    candidate_policy: dict[str, Any],
    generation_strategy: str,
    image_mime: str,
    analysis: dict[str, Any],
) -> dict[str, Any]:
    """Assemble the package manifest for a generated pet build.

    The manifest lists one state per allowed action, records the spritesheet
    name and its SHA-256 checksum, stamps the creation time in UTC with a
    ``Z`` suffix, and is always created with ``user_approved`` set to
    ``False``. The ``source`` section carries the generation provenance and QA
    reports passed in by the caller. Input photos are referenced only by
    hashes and MIME types, and the analysis summary is cut to 180 characters.
    """
    states = []
    for state in ALLOWED_ACTIONS:
        states.append({"name": state, "frames": FRAMES_PER_ACTION, "mode": "low_disturbance"})

    created_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    input_media_refs = []
    for image in image_refs:
        prepared_mime = image["mime"]
        input_media_refs.append(
            {
                "sha256": image["sha256"],
                # Prefer the original upload's MIME type when it was recorded.
                "mime": image.get("source_mime", prepared_mime),
                "prepared_mime": prepared_mime,
                "prepared_sha256": image.get("prepared_sha256"),
                "converted": image.get("converted", False),
            }
        )

    source = {
        "kind": "photo_to_image_worker",
        "input_kind": "raw_pet_photo",
        "asset_style": "high_fidelity_reference_preserving",
        "generator_version": "photo_generation_worker.image_provider.v3",
        "generation_strategy": generation_strategy,
        "analyzer_model": analyzer_model,
        "image_generation_provider": image_generation_provider,
        "image_model": image_model,
        # The canonical request id is recorded under both keys.
        "image_request_id": canonical_request_id,
        "canonical_request_id": canonical_request_id,
        "action_request_ids": action_request_ids,
        "action_candidate_request_ids": action_candidate_request_ids,
        "generation_prompt_sha256": generation_prompt_sha256,
        "action_prompt_sha256s": action_prompt_sha256s,
        "generation_size": "1024*1024",
        "generation_usage": generation_usage,
        "transparency_processing": transparency_processing,
        "frame_qa": frame_qa,
        "action_semantic_qa": action_semantic_qa,
        "size_proportion_qa": size_proportion_qa,
        "candidate_policy": candidate_policy,
        "input_media_refs": input_media_refs,
        "output_asset_ref": spritesheet_filename,
        "output_mime": image_mime,
        "raw_media_included": False,
        "memory_text_included": False,
        "analysis_summary": str(analysis.get("user_visible_summary", ""))[:180],
        "analysis_confidence": analysis.get("confidence"),
    }

    return {
        "package_id": package_id,
        "build_id": build_id,
        "pet_id": pet_id,
        "display_name": pet_name,
        "runtime_version": "0.1.0",
        "schema_version": "0.1.0",
        "cell_width": 192,
        "cell_height": 208,
        "states": states,
        "spritesheet_path": spritesheet_filename,
        "checksum_algorithm": "sha256",
        "checksum": checksum,
        "created_at": created_at,
        "user_approved": False,
        "source": source,
    }


def _annotate_selected_candidates(frame_qa: dict[str, Any], action_generations: dict[str, list[dict[str, Any]]]) -> None:
    for row in frame_qa.get("rows", []):
        action = str(row.get("action", ""))
        selected_payloads = action_generations.get(action, [])
        for frame in row.get("frames", []):
            frame_index = int(frame.get("frame", 0))
            if frame_index >= len(selected_payloads):
                continue
            payload = selected_payloads[frame_index]
            frame["selected_candidate_index"] = int(payload.get("selected_candidate_index", 0))
            frame["candidate_score"] = float(payload.get("candidate_score", 0.0))


def _annotate_walk_loop_mode(frame_qa: dict[str, Any]) -> None:
    for row in frame_qa.get("rows", []):
        if row.get("action") == "walk":
            row["loop_mode"] = WALK_LOOP_MODE
            row["generated_frame_count"] = WALK_GENERATED_FRAME_COUNT
            row["derived_frames"] = [
                {"frame": frame_index, "derived_from_frame": frame_index % WALK_GENERATED_FRAME_COUNT}
                for frame_index in range(WALK_GENERATED_FRAME_COUNT, FRAMES_PER_ACTION)
            ]


def _copy_walk_generation_payload(
    source_payload: dict[str, Any],
    *,
    source_frame_index: int,
    target_frame_index: int,
) -> dict[str, Any]:
    """Build the generation payload for a walk frame copied from another frame.

    Returns a shallow copy of ``source_payload`` with a ``derived-copy-of:``
    request id (falling back to ``walk-<source index>`` when the source has no
    request id), the walk loop mode as ``source_mode``, both frame indices,
    numeric scores coerced to ``int`` / ``float``, and a usage dict marked as
    a derived copy with zero external images. ``source_payload`` itself is not
    modified.
    """
    origin_request_id = str(source_payload.get("request_id", ""))
    fallback_id = f"walk-{source_frame_index}"

    source_usage = source_payload.get("usage")
    usage = dict(source_usage) if isinstance(source_usage, dict) else {}
    usage.update(derived_copy=True, external_image_count=0)

    clone = {**source_payload}
    clone.update(
        {
            "request_id": f"derived-copy-of:{origin_request_id or fallback_id}",
            "source_mode": WALK_LOOP_MODE,
            "derived_from_frame_index": source_frame_index,
            "copied_to_frame_index": target_frame_index,
            "selected_candidate_index": int(source_payload.get("selected_candidate_index", 0)),
            "candidate_score": float(source_payload.get("candidate_score", 0.0)),
            "base_score": float(source_payload.get("base_score", 0.0)),
            "visual_detail_score": float(source_payload.get("visual_detail_score", 0.0)),
            "usage": usage,
        }
    )
    return clone


def _resumed_generation_payload(
    *,
    action: str,
    frame_index: int | None,
    provider: str,
    model: str,
) -> dict[str, Any]:
    suffix = "canonical" if frame_index is None else f"{action}-{frame_index}"
    return {
        "status": "ok",
        "provider": provider,
        "model": model,
        "request_id": f"resumed-existing:{suffix}",
        "image_mime": "image/png",
        "usage": {
            "resumed_existing": True,
            "external_image_count": 0,
        },
        "prompt_sha256": "",
        "selected_candidate_index": 0,
        "candidate_score": 0.0,
        "base_score": 0.0,
        "visual_detail_score": 0.0,
        "source_mode": "resumed_existing",
    }


def _build_validation(manifest: dict[str, Any]) -> dict[str, Any]:
    """Run the manifest-level validation checks and report the overall result.

    Returns ``{"ok", "checks", "failure_reason"}`` where ``ok`` is true only
    when every check is truthy. ``failure_reason`` is always ``None`` here.
    The final visual QA check is only enforced when the hatch-pet pipeline
    section marks visual QA as required before user approval.
    """
    source = manifest["source"]
    asset_style = source.get("asset_style")
    hatch_pipeline = source.get("hatch_pet_pipeline", {})
    final_visual_qa_required = bool(hatch_pipeline.get("visual_qa_required_before_user_approval"))

    checks: dict[str, bool] = {}
    # These three are recorded unconditionally.
    checks["manifest_complete"] = True
    checks["spritesheet_exists"] = True
    checks["checksum_sha256"] = len(manifest["checksum"]) == 64
    state_names = [state["name"] for state in manifest["states"]]
    checks["states_allowed"] = state_names == list(ALLOWED_ACTIONS)
    checks["high_fidelity_asset"] = asset_style in {"high_fidelity_reference_preserving"}
    checks["image_generation_tracked"] = bool(source.get("image_request_id")) and bool(source.get("image_model"))
    checks["alpha_capable_asset"] = source.get("transparency_processing", {}).get("alpha_capable") is True
    checks["frame_qa_passed"] = source.get("frame_qa", {}).get("ok") is True
    checks["action_semantics_passed"] = source.get("action_semantic_qa", {}).get("ok") is True
    checks["size_proportion_qa_passed"] = source.get("size_proportion_qa", {}).get("ok") is True
    checks["hatch_pet_review_artifacts_ready"] = all(
        bool(hatch_pipeline.get(artifact))
        for artifact in ("deterministic_review", "contact_sheet_png", "preview_dir")
    )
    checks["final_visual_qa_passed"] = (not final_visual_qa_required) or bool(
        hatch_pipeline.get("final_visual_review")
    )
    checks["package_manifest_written"] = manifest.get("package_ref") == "package/pet.json"
    checks["user_approval_required"] = manifest["user_approved"] is False
    checks["no_sensitive_payloads"] = True

    return {"ok": all(checks.values()), "checks": checks, "failure_reason": None}


def _build_summary(
    manifest: dict[str, Any],
    analysis: dict[str, Any],
    canonical_generation: dict[str, Any],
    action_generations: dict[str, list[dict[str, Any]]],
) -> dict[str, Any]:
    return {
        "ok": True,
        "package_id": manifest["package_id"],
        "build_id": manifest["build_id"],
        "pet_id": manifest["pet_id"],
        "photo_analysis": {
            "provider": analysis.get("provider", "external"),
            "species": analysis.get("species"),
            "base_color": analysis.get("base_color"),
            "accent_color": analysis.get("accent_color"),
            "confidence": analysis.get("confidence"),
            "safe_for_generation": analysis.get("safe_for_generation", True),
            "summary": analysis.get("user_visible_summary", ""),
        },
        "image_generation": {
            "provider": manifest["source"].get("image_generation_provider")
            or canonical_generation.get("provider")
            or "unknown",
            "model": canonical_generation.get("model"),
            "generation_strategy": manifest["source"].get("generation_strategy"),
            "canonical_request_id": canonical_generation.get("request_id"),
            "action_request_ids": {
                action: [payload.get("request_id") for payload in payloads]
                for action, payloads in action_generations.items()
            },
            "output_mime": "image/png",
            "usage": manifest["source"].get("generation_usage", {}),
        },
        "frame_qa": manifest["source"].get("frame_qa", {}),
        "action_semantic_qa": manifest["source"].get("action_semantic_qa", {}),
        "size_proportion_qa": manifest["source"].get("size_proportion_qa", {}),
        "manifest": "manifest.json",
        "spritesheet": manifest["spritesheet_path"],
        "validation": "validation.json",
        "contact_sheet": "qa/contact-sheet.png",
        "contact_sheet_html": "qa/contact-sheet.html",
        "previews": "qa/previews",
        "review": "qa/review.json",
        "package": manifest.get("package_ref"),
    }


def _contact_sheet_html(manifest: dict[str, Any]) -> str:
    rows = "\n".join(
        f"<li><strong>{state['name']}</strong><span>{state['frames']} frames · {state['mode']}</span></li>"
        for state in manifest["states"]
    )
    return f"""<!doctype html>
<html lang="zh-CN">
<meta charset="utf-8" />
<title>{manifest['display_name']} photo worker QA</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, "PingFang SC", sans-serif; background: #fbf7f0; color: #2e2a26; }}
main {{ max-width: 880px; margin: 32px auto; padding: 24px; background: #fff; border-radius: 18px; }}
img {{ width: 256px; height: 256px; object-fit: none; object-position: left top; }}
li {{ margin: 8px 0; display: flex; gap: 12px; }}
</style>
<main>
<h1>{manifest['display_name']} 照片生成 worker QA</h1>
<p>本页展示从授权照片经本地素材检查和配置的图片生成 provider 产出的 P0 高还原桌宠包证据。</p>
<img src="../{manifest['spritesheet_path']}" alt="{manifest['display_name']} generated spritesheet first frame" />
<ul>{rows}</ul>
</main>
</html>
"""


def _safe_generation_usage(value: Any) -> dict[str, Any]:
    if not isinstance(value, dict):
        return {}
    safe: dict[str, Any] = {}
    for key, item in value.items():
        if isinstance(item, (str, int, float, bool)) or item is None:
            safe[str(key)[:80]] = item
    return safe


def _slugify(value: str) -> str:
    cleaned = "".join(ch.lower() if ch.isalnum() else "-" for ch in value.strip())
    return "-".join(part for part in cleaned.split("-") if part) or "pet"


def _utc_stamp() -> str:
    return datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")


def _write_json(path: Path, payload: dict[str, Any]) -> None:
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
