from __future__ import annotations

import hashlib
import os
import re
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any

from services.api.supabase_service_role import SupabaseServiceRoleRepository
from services.api.state_guards import (
    ALLOWED_ACTIONS,
    assert_can_approve_build,
    assert_can_download_package,
)
from services.payment.stripe_checkout import StripeCheckoutClient
from services.payment.stripe_webhook import StripeWebhookVerifier
from services.storage.aliyun_oss_delete import AliyunOssObjectDeleteService
from services.storage.aliyun_oss_download import AliyunOssSignedDownloadService
from services.storage.aliyun_oss_signed_upload import AliyunOssSignedUploadService


ALLOWED_CHANNELS = {"dev", "beta", "production"}
ALLOWED_CLIENT_SURFACES = {"web", "desktop_app"}
ALLOWED_IMAGE_MIME_TYPES = {"image/png", "image/jpeg", "image/webp", "image/gif"}
MAX_CLOUD_UPLOAD_BYTES = 15 * 1024 * 1024
UPLOAD_URL_TTL_SECONDS = 600
DEFAULT_PLAN_CODE = "sixxie_monthly_launch"
DEFAULT_PLAN_PRICE_CENTS = 999
MONTHLY_ENTITLEMENT_QUANTITIES = {
    "custom_pet_build": 1,
    "revision_credit": 3,
    "package_download": 1,
    "package_runtime_access": 1,
}


class CloudPetApi:
    def __init__(
        self,
        *,
        repository: SupabaseServiceRoleRepository,
        media_uploads: Any | None = None,
        media_deleter: Any | None = None,
        package_downloads: Any | None = None,
        stripe_checkout: Any | None = None,
        stripe_webhook: Any | None = None,
    ) -> None:
        self.repository = repository
        self.media_uploads = media_uploads
        self.media_deleter = media_deleter
        self.package_downloads = package_downloads
        self.stripe_checkout = stripe_checkout
        self.stripe_webhook = stripe_webhook

    @classmethod
    def from_env(cls) -> "CloudPetApi":
        supabase_url = os.environ.get("SUPABASE_URL", "")
        service_role_key = os.environ.get("SUPABASE_SERVICE_ROLE_KEY", "")
        missing = []
        if not supabase_url.strip():
            missing.append("SUPABASE_URL")
        if not service_role_key.strip():
            missing.append("SUPABASE_SERVICE_ROLE_KEY")
        if missing:
            raise ValueError(f"Missing cloud API configuration: {', '.join(missing)}")
        media_uploads = None
        media_deleter = None
        package_downloads = None
        stripe_checkout = None
        stripe_webhook = None
        if any(os.environ.get(name, "").strip() for name in _ALIYUN_OSS_ENV_NAMES):
            media_uploads = AliyunOssSignedUploadService.from_env()
            media_deleter = AliyunOssObjectDeleteService.from_env()
            package_downloads = AliyunOssSignedDownloadService.from_env()
        if any(os.environ.get(name, "").strip() for name in _STRIPE_CHECKOUT_ENV_NAMES):
            stripe_checkout = StripeCheckoutClient.from_env()
        if os.environ.get("STRIPE_WEBHOOK_SECRET", "").strip():
            stripe_webhook = StripeWebhookVerifier.from_env()
        return cls(
            repository=SupabaseServiceRoleRepository(
                supabase_url=supabase_url,
                service_role_key=service_role_key,
            ),
            media_uploads=media_uploads,
            media_deleter=media_deleter,
            package_downloads=package_downloads,
            stripe_checkout=stripe_checkout,
            stripe_webhook=stripe_webhook,
        )

    def health(self) -> dict[str, Any]:
        return {"ok": True, "store": "supabase", "storage": "not_checked", "output": "not_checked"}

    def create_cloud_session(
        self,
        *,
        auth_uid: str,
        email: str | None,
        device_id: str,
        app_version: str,
        channel: str,
    ) -> dict[str, Any]:
        _require_uuid(auth_uid, "auth_uid")
        _require_text(device_id, "device_id")
        _require_text(app_version, "app_version")
        if channel not in ALLOWED_CHANNELS:
            raise ValueError("channel must be dev, beta, or production")

        now = _now()
        user = self.repository.find_user_by_auth_uid(auth_uid=auth_uid)
        if user is None:
            user = self.repository.insert_user(
                {
                    "user_id": _new_id("usr"),
                    "auth_uid": auth_uid,
                    "email": _optional_text(email),
                    "status": "active",
                    "privacy_settings": {},
                    "created_at": now,
                    "updated_at": now,
                }
            )
        _assert_active_user(user)
        user_id = str(user["user_id"])
        session = self.repository.insert_api_session(
            {
                "session_id": _new_id("ses"),
                "user_id": user_id,
                "device_id": device_id.strip(),
                "app_version": app_version.strip(),
                "channel": channel,
                "status": "active",
                "created_at": now,
                "expires_at": _days_from_now(30),
            }
        )
        entitlements = self.repository.find_active_entitlements(user_id=user_id)
        return {
            "user_id": user_id,
            "session_id": session["session_id"],
            "user_visible_status": "session_active",
            "expires_at": session.get("expires_at") or _days_from_now(30),
            "entitlements": _summarize_entitlements(entitlements),
        }

    def get_cloud_me(self, *, session_id: str) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        user = self.repository.find_user_by_id(user_id=str(session["user_id"]))
        if user is None:
            raise PermissionError("user not found or not allowed")
        _assert_active_user(user)
        entitlements = self.repository.find_active_entitlements(user_id=str(session["user_id"]))
        return {
            "user_id": session["user_id"],
            "session_id": session["session_id"],
            "user_visible_status": "session_active",
            "expires_at": session.get("expires_at"),
            "entitlements": _summarize_entitlements(entitlements),
        }

    def redeem_cloud_invite(self, *, session_id: str, invite_code: str) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        _require_text(invite_code, "invite_code")
        now = _now()
        user_id = str(session["user_id"])
        invite = self.repository.find_beta_invite_by_code_hash(code_hash=_hash_invite_code(invite_code))
        if invite is None:
            raise ValueError("invite is not active")
        _assert_invite_can_be_redeemed(invite, now=now)
        existing_redemption = self.repository.find_beta_invite_redemption(
            invite_id=str(invite["invite_id"]),
            user_id=user_id,
        )
        if existing_redemption is not None:
            raise ValueError("invite already redeemed by this user")

        starts_at = now
        expires_at = _days_from_now(30 * int(invite.get("grant_months") or 1))
        subscription_id = _new_id("sub")
        redemption_id = _new_id("red")
        self.repository.insert_beta_invite_redemption(
            {
                "redemption_id": redemption_id,
                "invite_id": invite["invite_id"],
                "user_id": user_id,
                "subscription_id": subscription_id,
                "redeemed_at": now,
            }
        )
        subscription = self.repository.insert_subscription(
            {
                "subscription_id": subscription_id,
                "user_id": user_id,
                "plan_code": invite.get("grant_plan_code") or DEFAULT_PLAN_CODE,
                "provider": "invite",
                "source": "invite",
                "status": "active",
                "current_period_start": starts_at,
                "current_period_end": expires_at,
                "access_ends_at": expires_at,
                "cancel_at_period_end": False,
                "created_at": now,
                "updated_at": now,
            }
        )
        entitlement_rows = []
        for entitlement_type, quantity in MONTHLY_ENTITLEMENT_QUANTITIES.items():
            entitlement_rows.append(
                {
                    "entitlement_id": _new_id("ent"),
                    "user_id": user_id,
                    "subscription_id": subscription_id,
                    "order_id": None,
                    "pet_id": None,
                    "type": entitlement_type,
                    "status": "active",
                    "quantity_total": quantity,
                    "quantity_used": 0,
                    "source_type": "invite",
                    "source_id": invite["invite_id"],
                    "starts_at": starts_at,
                    "expires_at": expires_at,
                    "created_at": now,
                    "updated_at": now,
                }
            )
        self.repository.insert_entitlements(entitlement_rows)
        used_count = int(invite.get("used_count") or 0) + 1
        max_uses = int(invite.get("max_uses") or 1)
        invite_status = "used_up" if used_count >= max_uses else "active"
        self.repository.update_beta_invite_usage(
            invite_id=str(invite["invite_id"]),
            used_count=used_count,
            status=invite_status,
        )
        entitlements = self.repository.find_active_entitlements(user_id=user_id)
        return {
            "user_visible_status": "invite_redeemed",
            "subscription_id": subscription.get("subscription_id") or subscription_id,
            "subscription_status": subscription.get("status") or "active",
            "entitlements": _summarize_entitlements(entitlements),
        }

    def create_stripe_checkout_session(
        self,
        *,
        session_id: str,
        client_surface: str,
        success_url: str,
        cancel_url: str,
        amount_total_cents: int | None = None,
    ) -> dict[str, Any]:
        del amount_total_cents
        session = self._require_active_session(session_id=session_id)
        if client_surface not in ALLOWED_CLIENT_SURFACES:
            raise ValueError("client_surface must be web or desktop_app")
        if self.stripe_checkout is None:
            raise ValueError("stripe checkout is not configured")
        user_id = str(session["user_id"])
        checkout = self.stripe_checkout.create_subscription_checkout(
            user_id=user_id,
            plan_code=DEFAULT_PLAN_CODE,
            price_id=self.stripe_checkout.price_id,
            client_surface=client_surface,
            success_url=success_url,
            cancel_url=cancel_url,
        )
        now = _now()
        order = self.repository.insert_order(
            {
                "order_id": _new_id("ord"),
                "user_id": user_id,
                "pet_id": None,
                "provider": "stripe",
                "subscription_id": None,
                "client_surface": client_surface,
                "provider_checkout_session_id": checkout["id"],
                "provider_payment_id": None,
                "provider_event_id": None,
                "status": "checkout_started",
                "currency": str(checkout.get("currency") or "usd").lower(),
                "amount_total_cents": int(checkout.get("amount_total") or DEFAULT_PLAN_PRICE_CENTS),
                "product_code": DEFAULT_PLAN_CODE,
                "created_at": now,
            }
        )
        return {
            "user_visible_status": "checkout_created",
            "checkout_url": checkout["url"],
            "order_id": order.get("order_id"),
        }

    def handle_stripe_webhook(self, *, raw_body: str | bytes, stripe_signature: str) -> dict[str, Any]:
        if self.stripe_webhook is None:
            raise ValueError("stripe webhook is not configured")
        event = self.stripe_webhook.verify_event(raw_body=raw_body, stripe_signature=stripe_signature)
        event_id = str(event["id"])
        event_type = str(event["type"])
        existing = self.repository.find_webhook_event(provider_event_id=event_id)
        if existing is not None and existing.get("status") == "processed":
            return {
                "user_visible_status": "already_processed",
                "webhook_status": "processed",
            }

        now = _now()
        webhook_event_id = str(existing.get("webhook_event_id")) if existing else _new_id("wh")
        if existing is None:
            self.repository.insert_webhook_event(
                {
                    "webhook_event_id": webhook_event_id,
                    "provider": "stripe",
                    "provider_event_id": event_id,
                    "event_type": event_type,
                    "status": "received",
                    "safe_payload": _safe_stripe_event_payload(event),
                    "created_at": now,
                }
            )

        if event_type != "checkout.session.completed":
            self.repository.update_webhook_event_status(
                webhook_event_id=webhook_event_id,
                status="ignored",
                processed_at=now,
            )
            return {
                "user_visible_status": "ignored",
                "webhook_status": "ignored",
            }

        checkout_session = _stripe_event_object(event)
        user_id = _stripe_checkout_user_id(checkout_session)
        plan_code = _stripe_checkout_plan_code(checkout_session)
        starts_at = now
        expires_at = _days_from_now(30)
        subscription_id = _new_id("sub")
        order_id = _new_id("ord")
        subscription = self.repository.insert_subscription(
            {
                "subscription_id": subscription_id,
                "user_id": user_id,
                "plan_code": plan_code,
                "provider": "stripe",
                "provider_customer_id": _optional_text(checkout_session.get("customer")),
                "provider_subscription_id": _optional_text(checkout_session.get("subscription")),
                "source": "checkout",
                "status": "active",
                "current_period_start": starts_at,
                "current_period_end": expires_at,
                "access_ends_at": expires_at,
                "cancel_at_period_end": False,
                "created_at": now,
                "updated_at": now,
            }
        )
        order = self.repository.insert_order(
            {
                "order_id": order_id,
                "user_id": user_id,
                "pet_id": None,
                "provider": "stripe",
                "subscription_id": subscription.get("subscription_id") or subscription_id,
                "client_surface": _stripe_checkout_client_surface(checkout_session),
                "provider_checkout_session_id": checkout_session["id"],
                "provider_payment_id": _optional_text(checkout_session.get("payment_intent")),
                "provider_event_id": event_id,
                "status": "paid",
                "currency": str(checkout_session.get("currency") or "usd").lower(),
                "amount_total_cents": int(checkout_session.get("amount_total") or DEFAULT_PLAN_PRICE_CENTS),
                "product_code": plan_code,
                "created_at": now,
                "paid_at": now,
            }
        )
        self.repository.insert_entitlements(
            _monthly_entitlement_rows(
                user_id=user_id,
                subscription_id=str(subscription.get("subscription_id") or subscription_id),
                order_id=str(order.get("order_id") or order_id),
                pet_id=None,
                source_type="stripe",
                source_id=event_id,
                starts_at=starts_at,
                expires_at=expires_at,
                now=now,
            )
        )
        self.repository.update_webhook_event_status(
            webhook_event_id=webhook_event_id,
            status="processed",
            processed_at=now,
        )
        entitlements = self.repository.find_active_entitlements(user_id=user_id)
        return {
            "user_visible_status": "payment_verified",
            "webhook_status": "processed",
            "subscription_status": subscription.get("status") or "active",
            "entitlements": _summarize_entitlements(entitlements),
        }

    def create_cloud_pet_draft(
        self,
        *,
        auth_uid: str,
        email: str | None,
        device_id: str,
        app_version: str,
        channel: str,
        pet_name: str,
        species: str | None = None,
        breed: str | None = None,
        personality_notes: str | None = None,
        typical_gestures: str | None = None,
        memory_notes: str | None = None,
    ) -> dict[str, Any]:
        _require_uuid(auth_uid, "auth_uid")
        _require_text(device_id, "device_id")
        _require_text(app_version, "app_version")
        _require_text(pet_name, "pet_name")
        if channel not in ALLOWED_CHANNELS:
            raise ValueError("channel must be dev, beta, or production")

        now = _now()
        user = self.repository.find_user_by_auth_uid(auth_uid=auth_uid)
        if user is None:
            user = self.repository.insert_user(
                {
                    "user_id": _new_id("usr"),
                    "auth_uid": auth_uid,
                    "email": _optional_text(email),
                    "status": "active",
                    "privacy_settings": {},
                    "created_at": now,
                    "updated_at": now,
                }
            )
        user_id = str(user["user_id"])
        session = self.repository.insert_api_session(
            {
                "session_id": _new_id("ses"),
                "user_id": user_id,
                "device_id": device_id.strip(),
                "app_version": app_version.strip(),
                "channel": channel,
                "status": "active",
                "created_at": now,
                "expires_at": _days_from_now(30),
            }
        )
        pet = self.repository.insert_pet_profile(
            {
                "pet_id": _new_id("pet"),
                "user_id": user_id,
                "pet_name": pet_name.strip(),
                "species": _optional_text(species),
                "breed": _optional_text(breed),
                "personality_notes": _optional_text(personality_notes),
                "typical_gestures": _optional_text(typical_gestures),
                "memory_notes": _optional_text(memory_notes),
                "status": "ready_for_media",
                "created_at": now,
                "updated_at": now,
            }
        )
        self.repository.insert_event(
            {
                "event_id": _new_id("evt"),
                "user_id": user_id,
                "pet_id": pet["pet_id"],
                "event_name": "pet_profile_created",
                "event_properties": {"surface": "cloud_api", "channel": channel},
                "occurred_at": now,
            }
        )
        return {
            "user_id": user_id,
            "session_id": session["session_id"],
            "pet_id": pet["pet_id"],
            "display_name": pet.get("pet_name") or pet_name.strip(),
            "user_visible_status": "ready_for_media",
            "created_at": pet.get("created_at") or now,
        }

    def create_cloud_media_upload(
        self,
        *,
        user_id: str,
        pet_id: str,
        media_type: str,
        mime_type: str,
        size_bytes: int,
        sha256: str,
        original_filename: str | None = None,
    ) -> dict[str, Any]:
        if self.media_uploads is None:
            raise ValueError("cloud media upload storage is not configured")
        _require_text(user_id, "user_id")
        _require_text(pet_id, "pet_id")
        if media_type != "photo":
            raise ValueError("P0 cloud media upload only accepts photo media")
        cleaned_mime = str(mime_type or "").strip().lower()
        if cleaned_mime not in ALLOWED_IMAGE_MIME_TYPES:
            raise ValueError("mime_type must be a supported image type")
        size = int(size_bytes)
        if size <= 0:
            raise ValueError("size_bytes must be greater than zero")
        if size > MAX_CLOUD_UPLOAD_BYTES:
            raise ValueError("image upload exceeds the 15MB limit")
        cleaned_sha = str(sha256 or "").strip().lower()
        if re.fullmatch(r"[0-9a-f]{64}", cleaned_sha) is None:
            raise ValueError("sha256 must be a lowercase 64-character hex digest")

        pet = self.repository.find_pet_profile(user_id=user_id, pet_id=pet_id)
        if pet is None:
            raise PermissionError("pet not found or not allowed")
        if pet.get("status") == "deleted":
            raise ValueError("deleted pet cannot accept uploads")

        now = _now()
        media_id = _new_id("med")
        object_key = _cloud_media_object_key(
            user_id=user_id,
            pet_id=pet_id,
            media_id=media_id,
            original_filename=original_filename,
            mime_type=cleaned_mime,
        )
        upload = self.media_uploads.create_upload(
            object_key=object_key,
            mime_type=cleaned_mime,
            size_bytes=size,
            expires_in_seconds=UPLOAD_URL_TTL_SECONDS,
        )
        media = self.repository.insert_pet_media(
            {
                "media_id": media_id,
                "user_id": user_id,
                "pet_id": pet_id,
                "media_type": "photo",
                "storage_ref": f"oss://{self.media_uploads.bucket}/{object_key}",
                "storage_provider": self.media_uploads.storage_provider,
                "storage_region": self.media_uploads.storage_region,
                "bucket": self.media_uploads.bucket,
                "object_key": object_key,
                "original_filename": _optional_text(original_filename),
                "mime_type": cleaned_mime,
                "size_bytes": size,
                "sha256": cleaned_sha,
                "source_quality": "unchecked",
                "deletion_status": "active",
                "created_at": now,
                "updated_at": now,
            }
        )
        self.repository.insert_event(
            {
                "event_id": _new_id("evt"),
                "user_id": user_id,
                "pet_id": pet_id,
                "event_name": "pet_media_upload_created",
                "event_properties": {
                    "surface": "cloud_api",
                    "media_type": "photo",
                    "mime_type": cleaned_mime,
                    "size_bytes": size,
                },
                "occurred_at": now,
            }
        )
        return {
            "media_id": media.get("media_id") or media_id,
            "user_visible_status": "upload_url_created",
            "quality_status": "unchecked",
            "upload": {
                "method": upload["method"],
                "url": upload["url"],
                "headers": dict(upload.get("headers") or {}),
                "expires_at": upload["expires_at"],
            },
            "max_upload_bytes": MAX_CLOUD_UPLOAD_BYTES,
        }

    def complete_cloud_media_upload(
        self,
        *,
        user_id: str,
        pet_id: str,
        media_id: str,
    ) -> dict[str, Any]:
        if self.media_uploads is None:
            raise ValueError("cloud media upload storage is not configured")
        _require_text(user_id, "user_id")
        _require_text(pet_id, "pet_id")
        _require_text(media_id, "media_id")

        media = self.repository.find_pet_media(user_id=user_id, pet_id=pet_id, media_id=media_id)
        if media is None:
            raise PermissionError("media not found or not allowed")
        if media.get("deletion_status") != "active":
            raise ValueError("active media is required before upload completion")
        if media.get("source_quality") == "failed":
            raise ValueError("failed media cannot be completed")
        if str(media.get("bucket") or "") != str(self.media_uploads.bucket):
            raise ValueError("media storage bucket does not match configured storage")

        verified = self.media_uploads.verify_upload(
            object_key=str(media.get("object_key") or ""),
            mime_type=str(media.get("mime_type") or ""),
            size_bytes=int(media.get("size_bytes") or 0),
            sha256=str(media.get("sha256") or ""),
        )
        now = _now()
        updated = self.repository.update_pet_media_quality(
            media_id=media_id,
            source_quality="passed",
            quality_failure_reason=None,
            updated_at=now,
        )
        self.repository.insert_event(
            {
                "event_id": _new_id("evt"),
                "user_id": user_id,
                "pet_id": pet_id,
                "event_name": "pet_media_upload_completed",
                "event_properties": {
                    "surface": "cloud_api",
                    "media_id": media_id,
                    "size_bytes": verified["size_bytes"],
                },
                "occurred_at": now,
            }
        )
        return {
            "media_id": updated.get("media_id") or media_id,
            "user_visible_status": "upload_complete",
            "quality_status": updated.get("source_quality") or "passed",
            "uploaded_bytes": verified["size_bytes"],
            "updated_at": updated.get("updated_at") or now,
        }

    def create_cloud_build(
        self,
        *,
        session_id: str,
        pet_id: str,
        media_ids: list[str],
        image_provider: str,
        image_model: str,
    ) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        user_id = str(session["user_id"])
        _require_text(pet_id, "pet_id")
        provider = str(image_provider or "openai").strip().lower()
        if provider not in {"openai", "qwen", "apimart"}:
            raise ValueError("image_provider must be openai, qwen, or apimart")
        _require_text(image_model, "image_model")
        cleaned_media_ids = [str(media_id).strip() for media_id in media_ids if str(media_id or "").strip()]
        if not cleaned_media_ids:
            raise ValueError("at least one media item is required")

        pet = self.repository.find_pet_profile(user_id=user_id, pet_id=pet_id)
        if pet is None:
            raise PermissionError("pet not found or not allowed")
        if pet.get("status") == "deleted":
            raise ValueError("deleted pet cannot start generation")
        entitlements = self.repository.find_active_entitlements(user_id=user_id)
        _assert_entitlement_available(entitlements, entitlement_type="custom_pet_build")
        media_refs = []
        for media_id in cleaned_media_ids:
            media = self.repository.find_pet_media(user_id=user_id, pet_id=pet_id, media_id=media_id)
            if media is None:
                raise PermissionError("media not found or not allowed")
            if media.get("deletion_status") != "active":
                raise ValueError("active uploaded media is required")
            if media.get("source_quality") != "passed":
                raise ValueError("passed uploaded media is required")
            media_refs.append({"media_id": media_id})

        now = _now()
        build_id = _new_id("bld")
        build = self.repository.insert_pet_build(
            {
                "build_id": build_id,
                "user_id": user_id,
                "pet_id": pet_id,
                "build_type": "initial",
                "status": "queued",
                "input_media_refs": media_refs,
                "image_provider": provider,
                "image_model": image_model.strip(),
                "cost_cap_cents": 500,
                "cost_estimate_cents": 0,
                "user_approved": False,
                "approval_answers": {},
                "created_at": now,
                "updated_at": now,
            }
        )
        self.repository.insert_event(
            {
                "event_id": _new_id("evt"),
                "user_id": user_id,
                "pet_id": pet_id,
                "build_id": build.get("build_id") or build_id,
                "event_name": "pet_build_created",
                "event_properties": {"surface": "cloud_api", "image_provider": provider},
                "occurred_at": now,
            }
        )
        return {
            "build_id": build.get("build_id") or build_id,
            "user_visible_status": "queued",
            "created_at": build.get("created_at") or now,
        }

    def get_cloud_build(self, *, session_id: str, build_id: str) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        _require_text(build_id, "build_id")
        build = self.repository.find_pet_build(user_id=str(session["user_id"]), build_id=build_id)
        if build is None:
            raise PermissionError("build not found or not allowed")
        status = str(build.get("status") or "")
        return {
            "build_id": str(build.get("build_id") or build_id),
            "pet_id": str(build.get("pet_id") or ""),
            "user_visible_status": status or "unknown",
            "preview_available": status in {"preview_ready", "approved", "package_ready"},
            "user_approved": bool(build.get("user_approved")),
            "updated_at": build.get("updated_at"),
            "failure_code": _optional_text(build.get("failure_code")),
            "failure_reason": _safe_optional_failure_reason(build.get("failure_reason")),
        }

    def approve_cloud_build(
        self,
        *,
        session_id: str,
        build_id: str,
        approval_answers: dict[str, Any],
    ) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        user_id = str(session["user_id"])
        _require_text(build_id, "build_id")
        build = self.repository.find_pet_build(user_id=user_id, build_id=build_id)
        if build is None:
            raise PermissionError("build not found or not allowed")
        assert_can_approve_build(build_status=str(build.get("status") or ""), approval_answers=approval_answers)
        package = self.repository.find_pet_package_by_build(user_id=user_id, build_id=build_id)
        if package is None:
            raise ValueError("generated package is not ready")
        if package.get("status") not in {"created", "validated", "approved_for_download"}:
            raise ValueError("generated package is not ready")
        actions = package.get("allowed_actions") or ALLOWED_ACTIONS
        assert_can_download_package(
            package_status="approved_for_download",
            package_user_approved=True,
            entitlement_status="active",
            checksum_valid=bool(package.get("package_sha256")),
            runtime_compatible=bool(package.get("runtime_version")),
            allowed_actions=actions,
        )

        now = _now()
        approved_build = self.repository.update_pet_build_approval(
            build_id=build_id,
            status="approved",
            user_approved=True,
            approval_answers=dict(approval_answers),
            approved_at=now,
            updated_at=now,
        )
        approved_package = self.repository.update_pet_package_approval(
            package_id=str(package["package_id"]),
            status="approved_for_download",
            user_approved=True,
            updated_at=now,
        )
        self.repository.insert_event(
            {
                "event_id": _new_id("evt"),
                "user_id": user_id,
                "pet_id": approved_build.get("pet_id") or build.get("pet_id"),
                "build_id": build_id,
                "package_id": approved_package.get("package_id") or package.get("package_id"),
                "event_name": "pet_build_approved",
                "event_properties": {"surface": "cloud_api"},
                "occurred_at": now,
            }
        )
        return {
            "build_id": build_id,
            "package_id": approved_package.get("package_id") or package.get("package_id"),
            "user_visible_status": "package_ready",
            "approved_at": approved_build.get("approved_at") or now,
        }

    def delete_cloud_media(self, *, session_id: str, media_id: str) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        _require_text(media_id, "media_id")
        user_id = str(session["user_id"])
        media = self.repository.find_pet_media_by_id(media_id=media_id)
        if media is None:
            raise PermissionError("media not found or not allowed")
        if str(media.get("user_id") or "") != user_id:
            raise PermissionError("media not found or not allowed")
        deletion_status = str(media.get("deletion_status") or "active")
        if deletion_status == "deleted":
            return {
                "media_id": str(media.get("media_id") or media_id),
                "user_visible_status": "deleted",
                "deletion_status": "deleted",
            }
        if deletion_status == "active":
            self.repository.update_pet_media_deletion(
                media_id=media_id,
                deletion_status="deletion_requested",
                deleted_at=None,
                updated_at=_now(),
            )
        if self.media_deleter is None:
            raise ValueError("cloud media deletion storage is not configured")
        if str(media.get("bucket") or "") != str(self.media_deleter.bucket):
            raise ValueError("media storage bucket does not match configured storage")

        self.media_deleter.delete_object(object_key=str(media.get("object_key") or ""))
        now = _now()
        updated = self.repository.update_pet_media_deletion(
            media_id=media_id,
            deletion_status="deleted",
            deleted_at=now,
            updated_at=now,
        )
        self.repository.insert_event(
            {
                "event_id": _new_id("evt"),
                "user_id": user_id,
                "pet_id": media["pet_id"],
                "event_name": "pet_media_deleted",
                "event_properties": {
                    "surface": "cloud_api",
                    "media_id": media_id,
                },
                "occurred_at": now,
            }
        )
        return {
            "media_id": updated.get("media_id") or media_id,
            "user_visible_status": "deleted",
            "deletion_status": updated.get("deletion_status") or "deleted",
            "updated_at": updated.get("updated_at") or now,
        }

    def create_cloud_package_download_url(self, *, session_id: str, package_id: str) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        package = self._require_downloadable_cloud_package(
            user_id=str(session["user_id"]),
            package_id=package_id,
        )
        _assert_entitlement_available(
            self.repository.find_active_entitlements(user_id=str(session["user_id"])),
            entitlement_type="package_download",
        )
        if self.package_downloads is None:
            raise ValueError("cloud package download storage is not configured")
        if str(package.get("bucket") or "") != str(self.package_downloads.bucket):
            raise ValueError("package storage bucket does not match configured storage")
        signed = self.package_downloads.create_download_url(
            object_key=str(package.get("object_key") or ""),
            expires_in_seconds=UPLOAD_URL_TTL_SECONDS,
        )
        return {
            "package_id": package_id,
            "user_visible_status": "download_ready",
            "download": {
                "method": signed["method"],
                "url": signed["url"],
                "expires_at": signed["expires_at"],
            },
            "checksum_sha256": package["package_sha256"],
            "runtime_version": package["runtime_version"],
        }

    def delete_cloud_package(self, *, session_id: str, package_id: str) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        _require_text(package_id, "package_id")
        user_id = str(session["user_id"])
        package = self.repository.find_pet_package(package_id=package_id)
        if package is None or str(package.get("user_id") or "") != user_id:
            raise PermissionError("package not found or not allowed")
        if package.get("status") == "deleted":
            return {
                "package_id": package_id,
                "user_visible_status": "deleted",
                "deletion_status": "deleted",
            }
        now = _now()
        updated = self.repository.update_pet_package_deletion(
            package_id=package_id,
            status="deleted",
            deleted_at=now,
            updated_at=now,
        )
        self.repository.insert_event(
            {
                "event_id": _new_id("evt"),
                "user_id": user_id,
                "pet_id": package.get("pet_id"),
                "build_id": package.get("build_id"),
                "package_id": package_id,
                "event_name": "pet_package_deleted",
                "event_properties": {"surface": "cloud_api"},
                "occurred_at": now,
            }
        )
        return {
            "package_id": updated.get("package_id") or package_id,
            "user_visible_status": "deleted",
            "deletion_status": "deleted",
            "updated_at": updated.get("updated_at") or now,
        }

    def check_cloud_package_runtime(self, *, session_id: str, package_id: str) -> dict[str, Any]:
        session = self._require_active_session(session_id=session_id)
        package = self._require_downloadable_cloud_package(
            user_id=str(session["user_id"]),
            package_id=package_id,
        )
        entitlement = _assert_entitlement_available(
            self.repository.find_active_entitlements(user_id=str(session["user_id"])),
            entitlement_type="package_runtime_access",
        )
        return {
            "package_id": package_id,
            "user_visible_status": "runtime_access_granted",
            "runtime_access_expires_at": entitlement.get("expires_at"),
            "checksum_sha256": package["package_sha256"],
            "runtime_version": package["runtime_version"],
        }

    def _require_active_session(self, *, session_id: str) -> dict[str, Any]:
        _require_text(session_id, "session_id")
        session = self.repository.find_api_session(session_id=session_id)
        if session is None:
            raise PermissionError("session not found or not allowed")
        if session.get("status") != "active" or _is_expired(str(session.get("expires_at") or ""), _now()):
            raise PermissionError("session is not active")
        return session

    def _require_downloadable_cloud_package(self, *, user_id: str, package_id: str) -> dict[str, Any]:
        _require_text(package_id, "package_id")
        package = self.repository.find_pet_package(package_id=package_id)
        if package is None or str(package.get("user_id") or "") != user_id:
            raise PermissionError("package not found or not allowed")
        if package.get("status") != "approved_for_download" or package.get("user_approved") is not True:
            raise ValueError("approved package is required")
        checksum = str(package.get("package_sha256") or "")
        if re.fullmatch(r"[0-9a-f]{64}", checksum) is None:
            raise ValueError("package checksum is invalid")
        _require_text(str(package.get("runtime_version") or ""), "runtime_version")
        _require_text(str(package.get("object_key") or ""), "object_key")
        return package


def _require_text(value: str | None, field_name: str) -> None:
    if not str(value or "").strip():
        raise ValueError(f"{field_name} is required")


def _require_uuid(value: str, field_name: str) -> None:
    try:
        uuid.UUID(value)
    except ValueError as exc:
        raise ValueError(f"{field_name} must be a valid UUID") from exc


def _optional_text(value: str | None) -> str | None:
    if value is None:
        return None
    cleaned = str(value).strip()
    return cleaned or None


def _safe_optional_failure_reason(value: Any) -> str | None:
    reason = _optional_text(value)
    if reason is None:
        return None
    lowered = reason.lower()
    if "/users/" in lowered or "trace" in lowered or "stack" in lowered or "key" in lowered:
        return "Generation failed. Please try again."
    return reason[:160]


def _new_id(prefix: str) -> str:
    return f"{prefix}_{uuid.uuid4().hex}"


def _now() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def _days_from_now(days: int) -> str:
    return (datetime.now(timezone.utc) + timedelta(days=days)).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def _assert_active_user(user: dict[str, Any]) -> None:
    if user.get("status") not in {None, "active"}:
        raise PermissionError("user is not active")


def _assert_invite_can_be_redeemed(invite: dict[str, Any], *, now: str) -> None:
    if invite.get("status") != "active":
        raise ValueError("invite is not active")
    if _is_expired(str(invite.get("expires_at") or ""), now):
        raise ValueError("invite has expired")
    if int(invite.get("used_count") or 0) >= int(invite.get("max_uses") or 1):
        raise ValueError("invite has no remaining uses")


def _hash_invite_code(code: str) -> str:
    return hashlib.sha256(code.strip().encode("utf-8")).hexdigest()


def _summarize_entitlements(entitlements: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    summary: dict[str, dict[str, Any]] = {}
    for entitlement in entitlements:
        entitlement_type = str(entitlement.get("type") or "")
        if not entitlement_type:
            continue
        quantity_total = int(entitlement.get("quantity_total") or 0)
        quantity_used = int(entitlement.get("quantity_used") or 0)
        remaining = max(quantity_total - quantity_used, 0)
        current = summary.setdefault(
            entitlement_type,
            {
                "status": "active",
                "quantity_total": 0,
                "quantity_used": 0,
                "remaining": 0,
                "expires_at": entitlement.get("expires_at"),
            },
        )
        current["quantity_total"] = int(current["quantity_total"]) + quantity_total
        current["quantity_used"] = int(current["quantity_used"]) + quantity_used
        current["remaining"] = int(current["remaining"]) + remaining
        if entitlement.get("expires_at") and (
            not current.get("expires_at") or str(entitlement["expires_at"]) > str(current["expires_at"])
        ):
            current["expires_at"] = entitlement["expires_at"]
    return summary


def _assert_entitlement_available(entitlements: list[dict[str, Any]], *, entitlement_type: str) -> dict[str, Any]:
    now = _now()
    for entitlement in entitlements:
        if entitlement.get("type") != entitlement_type:
            continue
        if entitlement.get("status") != "active":
            continue
        if _is_expired(str(entitlement.get("expires_at") or ""), now):
            continue
        if int(entitlement.get("quantity_used") or 0) < int(entitlement.get("quantity_total") or 0):
            return entitlement
    raise PermissionError(f"active {entitlement_type} entitlement is required")


def _monthly_entitlement_rows(
    *,
    user_id: str,
    subscription_id: str,
    order_id: str | None,
    pet_id: str | None,
    source_type: str,
    source_id: str,
    starts_at: str,
    expires_at: str,
    now: str,
) -> list[dict[str, Any]]:
    rows = []
    for entitlement_type, quantity in MONTHLY_ENTITLEMENT_QUANTITIES.items():
        rows.append(
            {
                "entitlement_id": _new_id("ent"),
                "user_id": user_id,
                "subscription_id": subscription_id,
                "order_id": order_id,
                "pet_id": pet_id,
                "type": entitlement_type,
                "status": "active",
                "quantity_total": quantity,
                "quantity_used": 0,
                "source_type": source_type,
                "source_id": source_id,
                "starts_at": starts_at,
                "expires_at": expires_at,
                "created_at": now,
                "updated_at": now,
            }
        )
    return rows


def _safe_stripe_event_payload(event: dict[str, Any]) -> dict[str, Any]:
    obj = _stripe_event_object(event)
    return {
        "event_id": event.get("id"),
        "event_type": event.get("type"),
        "object_id": obj.get("id"),
        "checkout_session_id": obj.get("id") if obj.get("object") == "checkout.session" else None,
        "subscription_id": obj.get("subscription"),
        "amount_total_cents": obj.get("amount_total"),
        "currency": obj.get("currency"),
    }


def _stripe_event_object(event: dict[str, Any]) -> dict[str, Any]:
    data = event.get("data")
    if not isinstance(data, dict) or not isinstance(data.get("object"), dict):
        raise ValueError("stripe webhook event object is invalid")
    return data["object"]


def _stripe_checkout_user_id(checkout_session: dict[str, Any]) -> str:
    metadata = checkout_session.get("metadata") if isinstance(checkout_session.get("metadata"), dict) else {}
    user_id = str(metadata.get("user_id") or checkout_session.get("client_reference_id") or "").strip()
    _require_text(user_id, "user_id")
    return user_id


def _stripe_checkout_plan_code(checkout_session: dict[str, Any]) -> str:
    metadata = checkout_session.get("metadata") if isinstance(checkout_session.get("metadata"), dict) else {}
    return str(metadata.get("plan_code") or DEFAULT_PLAN_CODE).strip() or DEFAULT_PLAN_CODE


def _stripe_checkout_client_surface(checkout_session: dict[str, Any]) -> str:
    metadata = checkout_session.get("metadata") if isinstance(checkout_session.get("metadata"), dict) else {}
    client_surface = str(metadata.get("client_surface") or "web").strip()
    return client_surface if client_surface in ALLOWED_CLIENT_SURFACES else "web"


def _is_expired(expires_at: str, now: str) -> bool:
    return bool(expires_at and expires_at <= now)


_ALIYUN_OSS_ENV_NAMES = (
    "ALIYUN_OSS_ENDPOINT",
    "ALIYUN_OSS_BUCKET",
    "ALIYUN_OSS_REGION",
    "ALIYUN_OSS_ACCESS_KEY_ID",
    "ALIYUN_OSS_ACCESS_KEY_SECRET",
)

_STRIPE_CHECKOUT_ENV_NAMES = (
    "STRIPE_SECRET_KEY",
    "STRIPE_PRICE_ID_SIXXIE_MONTHLY_LAUNCH",
)


def _cloud_media_object_key(
    *,
    user_id: str,
    pet_id: str,
    media_id: str,
    original_filename: str | None,
    mime_type: str,
) -> str:
    suffix = _safe_upload_suffix(original_filename=original_filename, mime_type=mime_type)
    return (
        f"users/{_safe_segment(user_id)}/pets/{_safe_segment(pet_id)}/"
        f"media/{_safe_segment(media_id)}/source{suffix}"
    )


def _safe_upload_suffix(*, original_filename: str | None, mime_type: str) -> str:
    filename = str(original_filename or "")
    suffix = ""
    if "." in filename:
        suffix = "." + filename.rsplit(".", 1)[-1].lower()
    if suffix in {".png", ".jpg", ".jpeg", ".webp", ".gif"}:
        return suffix
    return {
        "image/png": ".png",
        "image/jpeg": ".jpg",
        "image/webp": ".webp",
        "image/gif": ".gif",
    }.get(mime_type, ".img")


def _safe_segment(value: str) -> str:
    cleaned = "".join(ch for ch in str(value) if ch.isalnum() or ch in {"_", "-"}).strip("._-")
    return cleaned or "item"
