#!/usr/bin/env python3
from __future__ import annotations

import argparse
import sys
import uuid
from pathlib import Path
from typing import Any

# Directory one level above the folder that contains this file
# (presumably the project root -- TODO confirm against the repo layout).
ROOT = Path(__file__).resolve().parents[1]
# Prepend it to sys.path once; this looks intended to let the `services.*`
# imports below resolve when the file is executed directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from services.api.local_api import LocalPetApi
from services.api.production_app import create_production_api
from services.ai.provider_metrics import sanitize_cloud_provider_call_metrics


def _now() -> str:
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def run_one_queued_build(*, api: LocalPetApi, worker_id: str = "local_worker") -> dict[str, Any]:
    """Claim the next queued build through ``api`` and try to complete it.

    Args:
        api: Object used to claim, complete and fail builds.
        worker_id: Identifier forwarded to every API call made here.

    Returns:
        The value returned by ``api.complete_claimed_build``; if that call
        raises, the value returned by ``api.fail_claimed_build`` instead.
    """
    claimed_id = api.claim_next_build(worker_id=worker_id)["build_id"]
    try:
        outcome = api.complete_claimed_build(claimed_id, worker_id=worker_id)
    except Exception as error:
        # A completion error is recorded on the build rather than propagated.
        failure_details = {
            "worker_id": worker_id,
            "failure_code": "worker_error",
            "failure_reason": str(error),
        }
        outcome = api.fail_claimed_build(claimed_id, **failure_details)
    return outcome


def run_one_cloud_queued_build(
    *,
    repository: Any,
    build_runner: Any,
    package_storage: Any,
    media_loader: Any,
    output_root: Path,
    worker_id: str = "cloud_worker",
) -> dict[str, Any]:
    """Claim one cloud build, run it, and record the outcome in ``repository``.

    After claiming, the build is validated, its pet profile, entitlements and
    input media are looked up and checked, ``build_runner`` is invoked, and
    the result is handed to ``package_storage``. Sanitized provider metrics,
    the success update and (when all required storage fields are present) a
    package row are then written through ``repository``.

    Any exception raised after the claim is caught: the build is marked as
    failed with code ``worker_error`` and a failure summary is returned.
    Errors raised by the claim itself propagate to the caller.

    Args:
        repository: Data-access object providing the claim/find/insert/update
            calls used below.
        build_runner: Callable invoked with ``build``, ``pet``, ``media`` and
            ``output_root`` keyword arguments.
        package_storage: Object whose ``store_build_result`` persists the
            runner's result.
        media_loader: Object whose ``load_media`` is called once per media item.
        output_root: Directory created (with parents) before the runner starts.
        worker_id: Identifier passed to the claim call.

    Returns:
        A summary dict with ``build_id``, ``user_visible_status`` and
        ``updated_at``; plus ``preview_available`` on success or
        ``failure_code`` on failure.
    """
    claimed_at = _now()
    claimed = repository.claim_next_cloud_build(worker_id=worker_id, now=claimed_at)
    build_id = str(claimed["build_id"])
    try:
        _assert_cloud_build_can_run(claimed)
        user_id = str(claimed["user_id"])
        pet_id = str(claimed["pet_id"])

        pet = repository.find_pet_profile(user_id=user_id, pet_id=pet_id)
        if pet is None:
            raise ValueError("pet not found or not allowed")

        active_entitlements = repository.find_active_entitlements(user_id=user_id)
        _assert_cloud_build_entitlement(active_entitlements)

        # Every referenced media item must exist, pass the checks and load
        # before the runner is started.
        loaded_media = []
        for media_id in _cloud_build_media_ids(claimed):
            record = repository.find_pet_media(user_id=user_id, pet_id=pet_id, media_id=media_id)
            if record is None:
                raise ValueError("media not found or not allowed")
            _assert_cloud_media_can_generate(record)
            media_loader.load_media(record)
            loaded_media.append(dict(record))

        output_root.mkdir(parents=True, exist_ok=True)
        outcome = build_runner(
            build=dict(claimed),
            pet=dict(pet),
            media=loaded_media,
            output_root=output_root,
        )
        persisted = package_storage.store_build_result(build_id=build_id, result=outcome)
        finished_at = _now()

        metrics = sanitize_cloud_provider_call_metrics(
            outcome.get("provider_call_metrics"),
            user_id=user_id,
            pet_id=pet_id,
            build_id=build_id,
            created_at=finished_at,
        )
        if metrics:
            repository.insert_provider_call_metrics(metrics)

        # Values reported by storage take precedence over the runner's own.
        success_fields = {
            "build_id": build_id,
            "status": "preview_ready",
            "output_package_ref": persisted.get("output_package_ref") or outcome.get("package_ref"),
            "preview_ref": persisted.get("preview_ref") or outcome.get("preview_ref"),
            "user_approved": False,
            "allowed_actions": list(outcome.get("actions") or []),
            "checksum_valid": bool(outcome.get("checksum_valid", True)),
            "runtime_compatible": bool(outcome.get("runtime_compatible", True)),
            "updated_at": finished_at,
            "completed_at": finished_at,
        }
        success_row = repository.update_cloud_build_success(**success_fields)

        package_row = _cloud_package_row_from_stored(
            stored=persisted,
            result=outcome,
            build=claimed,
            build_id=build_id,
            user_id=user_id,
            pet_id=pet_id,
            now=finished_at,
        )
        if package_row is not None:
            repository.insert_pet_package(package_row)

        # Built inside the try so an error here is also recorded as a failure.
        summary = {
            "build_id": build_id,
            "user_visible_status": "preview_ready",
            "preview_available": True,
            "updated_at": success_row.get("updated_at") or finished_at,
        }
        return summary
    except Exception as error:
        failed_at = _now()
        repository.update_cloud_build_failure(
            build_id=build_id,
            failure_code="worker_error",
            failure_reason=_safe_worker_failure_reason(str(error)),
            updated_at=failed_at,
            completed_at=failed_at,
        )
        failure_summary = {
            "build_id": build_id,
            "user_visible_status": "failed",
            "failure_code": "worker_error",
            "updated_at": failed_at,
        }
        return failure_summary


def _assert_cloud_build_can_run(build: dict[str, Any]) -> None:
    if build.get("user_approved") is True:
        raise ValueError("worker cannot process an already approved build")
    if int(build.get("cost_estimate_cents") or 0) > int(build.get("cost_cap_cents") or 0):
        raise ValueError("model cost cap exceeded")


def _assert_cloud_build_entitlement(entitlements: list[dict[str, Any]]) -> None:
    for entitlement in entitlements:
        if entitlement.get("type") != "custom_pet_build":
            continue
        if entitlement.get("status") != "active":
            continue
        if int(entitlement.get("quantity_used") or 0) < int(entitlement.get("quantity_total") or 0):
            return
    raise ValueError("active custom_pet_build entitlement is required")


def _assert_cloud_media_can_generate(media: dict[str, Any]) -> None:
    if media.get("deletion_status") != "active":
        raise ValueError("active uploaded media is required")
    if media.get("source_quality") != "passed":
        raise ValueError("passed uploaded media is required")


def _cloud_build_media_ids(build: dict[str, Any]) -> list[str]:
    refs = build.get("input_media_refs") or []
    media_ids: list[str] = []
    if isinstance(refs, list):
        for item in refs:
            if isinstance(item, dict):
                media_id = item.get("media_id")
            else:
                media_id = item
            if media_id:
                media_ids.append(str(media_id))
    if not media_ids:
        raise ValueError("build has no input media")
    return media_ids


def _cloud_package_row_from_stored(
    *,
    stored: dict[str, Any],
    result: dict[str, Any],
    build: dict[str, Any],
    build_id: str,
    user_id: str,
    pet_id: str,
    now: str,
) -> dict[str, Any] | None:
    package_storage_ref = stored.get("package_storage_ref") or stored.get("output_package_ref")
    required = {
        "runtime_version": stored.get("runtime_version"),
        "package_storage_ref": package_storage_ref,
        "bucket": stored.get("bucket"),
        "object_key": stored.get("object_key"),
        "asset_manifest_ref": stored.get("asset_manifest_ref"),
        "animation_manifest_ref": stored.get("animation_manifest_ref"),
        "spritesheet_ref": stored.get("spritesheet_ref"),
        "spritesheet_sha256": stored.get("spritesheet_sha256"),
        "pet_json_sha256": stored.get("pet_json_sha256"),
        "package_sha256": stored.get("package_sha256"),
    }
    if any(not str(value or "").strip() for value in required.values()):
        return None
    return {
        "package_id": stored.get("package_id") or f"pkg_{uuid.uuid4().hex}",
        "user_id": user_id,
        "pet_id": pet_id,
        "build_id": build_id,
        "status": "validated",
        "runtime_version": str(required["runtime_version"]),
        "asset_manifest_ref": str(required["asset_manifest_ref"]),
        "animation_manifest_ref": str(required["animation_manifest_ref"]),
        "package_storage_ref": str(required["package_storage_ref"]),
        "storage_provider": str(stored.get("storage_provider") or "aliyun_oss"),
        "storage_region": str(stored.get("storage_region") or ""),
        "bucket": str(required["bucket"]),
        "object_key": str(required["object_key"]),
        "spritesheet_ref": str(required["spritesheet_ref"]),
        "spritesheet_sha256": str(required["spritesheet_sha256"]),
        "pet_json_sha256": str(required["pet_json_sha256"]),
        "package_sha256": str(required["package_sha256"]),
        "allowed_actions": list(result.get("actions") or build.get("allowed_actions") or []),
        "user_approved": False,
        "created_at": now,
        "updated_at": now,
    }


def _safe_worker_failure_reason(reason: str) -> str:
    lowered = str(reason or "").lower()
    if "/users/" in lowered or "trace" in lowered or "stack" in lowered or "key" in lowered:
        return "Generation failed. Please try again."
    return str(reason or "Generation failed. Please try again.")[:160]


def main() -> int:
    """Parse the command line and run the local worker.

    With ``--once`` a single queued build is processed, its result is
    printed and ``0`` is returned. Without it, builds are processed in an
    endless loop and the function does not return normally.
    """
    parser = argparse.ArgumentParser(description="Run the local production pet generation worker.")
    parser.add_argument("--once", action="store_true", help="Process one queued build and exit.")
    parser.add_argument("--worker-id", default="local_worker")
    # The three directory options share the same shape: required paths.
    for directory_flag in ("--state-dir", "--storage-dir", "--output-dir"):
        parser.add_argument(directory_flag, type=Path, required=True)
    options = parser.parse_args()

    production_api = create_production_api(
        state_dir=options.state_dir,
        storage_dir=options.storage_dir,
        output_dir=options.output_dir,
    )

    if not options.once:
        while True:
            run_one_queued_build(api=production_api, worker_id=options.worker_id)

    outcome = run_one_queued_build(api=production_api, worker_id=options.worker_id)
    print(outcome)
    return 0

# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
