from __future__ import annotations

import base64
import json
import os
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

from packages.pet_package_schema import DEFAULT_FRAMES_PER_ACTION
from services.ai.apimart_image_client import (
    _action_visual_review_requirements,
    _build_action_frame_prompt,
    _build_action_strip_prompt,
    _build_canonical_pet_prompt,
    _chat_completions_url,
    _parse_review_content,
    _sha256_text,
)


class OpenAIConfigError(RuntimeError):
    pass


class OpenAIRequestError(RuntimeError):
    pass


@dataclass
class OpenAIConfig:
    api_key: str
    base_url: str
    model_image: str
    model_image_edit: str
    model_review: str
    quality: str = "low"
    size: str = "1024x1024"
    request_timeout_seconds: float = 240.0
    max_retries: int = 4

    @property
    def masked_api_key(self) -> str:
        return _mask_secret(self.api_key)

    def __repr__(self) -> str:
        return (
            "OpenAIConfig("
            f"api_key={self.masked_api_key!r}, "
            f"base_url={self.base_url!r}, "
            f"model_image={self.model_image!r}, "
            f"model_image_edit={self.model_image_edit!r}, "
            f"model_review={self.model_review!r}, "
            f"quality={self.quality!r}, "
            f"size={self.size!r})"
        )


def load_openai_config(env_path: str | Path = ".env.local") -> OpenAIConfig:
    env_values = _read_env_file(Path(env_path))
    merged = {
        **env_values,
        **{
            key: value
            for key, value in os.environ.items()
            if key.startswith("OPENAI_")
        },
    }
    api_key = merged.get("OPENAI_API_KEY", "").strip()
    base_url = merged.get("OPENAI_BASE_URL", "https://api.openai.com/v1").strip().rstrip("/")
    model_image = merged.get("OPENAI_MODEL_IMAGE", "gpt-image-2").strip()
    model_image_edit = merged.get("OPENAI_MODEL_IMAGE_EDIT", model_image).strip()
    model_review = merged.get("OPENAI_MODEL_REVIEW", "gpt-4.1-mini").strip()
    if not api_key or not base_url or not model_image or not model_image_edit or not model_review:
        raise OpenAIConfigError(
            "OpenAI config is incomplete. Set OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL_IMAGE, OPENAI_MODEL_IMAGE_EDIT, and OPENAI_MODEL_REVIEW."
        )
    return OpenAIConfig(
        api_key=api_key,
        base_url=base_url,
        model_image=model_image,
        model_image_edit=model_image_edit,
        model_review=model_review,
        quality=merged.get("OPENAI_IMAGE_QUALITY", "low").strip() or "low",
        size=_normalize_openai_size(merged.get("OPENAI_IMAGE_SIZE", "1024x1024")),
        request_timeout_seconds=_env_float(merged, "OPENAI_IMAGE_REQUEST_TIMEOUT_SECONDS", 240.0),
        max_retries=max(0, int(_env_float(merged, "OPENAI_IMAGE_MAX_RETRIES", 4))),
    )


class OpenAIImageGenerationClient:
    provider = "openai"

    def __init__(
        self,
        *,
        api_key: str,
        base_url: str,
        model_image: str = "gpt-image-2",
        model_image_edit: str = "gpt-image-2",
        quality: str = "low",
        size: str = "1024x1024",
        opener: Callable[..., Any] = urlopen,
        sleep: Callable[[float], Any] = time.sleep,
        clock: Callable[[], float] = time.monotonic,
        boundary_factory: Callable[[], str] | None = None,
        request_timeout_seconds: float = 240.0,
        max_retries: int = 4,
        reference_limit: int = 4,
    ) -> None:
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model_image = model_image
        self.model_image_edit = model_image_edit
        self.model = model_image
        self.quality = quality
        self.size = _normalize_openai_size(size)
        self.opener = opener
        self.sleep = sleep
        self.clock = clock
        self.boundary_factory = boundary_factory or (lambda: f"----ai-pet-openai-{uuid.uuid4().hex}")
        self.request_timeout_seconds = max(5.0, float(request_timeout_seconds))
        self.max_retries = max(0, int(max_retries))
        self.reference_limit = max(1, min(int(reference_limit), 16))

    @classmethod
    def from_env(
        cls,
        env_path: str | Path = ".env.local",
        *,
        opener: Callable[..., Any] = urlopen,
    ) -> "OpenAIImageGenerationClient":
        config = load_openai_config(env_path)
        return cls(
            api_key=config.api_key,
            base_url=config.base_url,
            model_image=config.model_image,
            model_image_edit=config.model_image_edit,
            quality=config.quality,
            size=config.size,
            opener=opener,
            request_timeout_seconds=config.request_timeout_seconds,
            max_retries=config.max_retries,
        )

    def generate_canonical_pet(
        self,
        *,
        pet_name: str,
        notes: str,
        analysis: dict[str, Any],
        images: list[dict[str, Any]],
        size: str = "1024*1024",
    ) -> dict[str, Any]:
        if not images:
            raise ValueError("at least one image is required for OpenAI image reference generation")
        prompt = _build_canonical_pet_prompt(pet_name=pet_name, notes=notes, analysis=analysis)
        image_parts = [
            _image_part(
                field_name="image[]",
                filename=f"reference-{index + 1}.png",
                image_bytes=image["bytes"],
                mime=image["mime"],
            )
            for index, image in enumerate(images[: self.reference_limit])
        ]
        return self._edit_image(
            prompt=prompt,
            image_parts=image_parts,
            size=size,
            model=self.model_image_edit,
        )

    def generate_action_frame(
        self,
        *,
        action: str,
        frame_index: int,
        candidate_index: int = 0,
        pet_name: str,
        notes: str,
        analysis: dict[str, Any],
        images: list[dict[str, Any]],
        canonical_image_bytes: bytes,
        previous_frame_image_bytes: bytes | None = None,
        size: str = "1024*1024",
    ) -> dict[str, Any]:
        if not canonical_image_bytes:
            raise ValueError("canonical image is required for OpenAI action generation")
        if frame_index < 0 or frame_index >= DEFAULT_FRAMES_PER_ACTION:
            raise ValueError(f"frame_index must be between 0 and {DEFAULT_FRAMES_PER_ACTION - 1}")
        attach_previous_frame = bool(previous_frame_image_bytes) and action != "walk"
        prompt = _build_action_frame_prompt(
            action=action,
            frame_index=frame_index,
            candidate_index=candidate_index,
            pet_name=pet_name,
            notes=notes,
            analysis=analysis,
            has_previous_frame=attach_previous_frame,
        )
        image_parts = [
            _image_part(
                field_name="image[]",
                filename="canonical.png",
                image_bytes=canonical_image_bytes,
                mime="image/png",
            )
        ]
        if attach_previous_frame:
            image_parts.append(
                _image_part(
                    field_name="image[]",
                    filename="previous-frame.png",
                    image_bytes=previous_frame_image_bytes or b"",
                    mime="image/png",
                )
            )
        return self._edit_image(
            prompt=prompt,
            image_parts=image_parts,
            size=size,
            model=self.model_image_edit,
        )

    def generate_action_frame_minimal(
        self,
        *,
        action: str,
        frame_index: int,
        candidate_index: int = 0,
        pet_name: str,
        notes: str,
        analysis: dict[str, Any],
        canonical_image_bytes: bytes | None = None,
        size: str = "1024*1024",
    ) -> dict[str, Any]:
        if frame_index < 0 or frame_index >= DEFAULT_FRAMES_PER_ACTION:
            raise ValueError(f"frame_index must be between 0 and {DEFAULT_FRAMES_PER_ACTION - 1}")
        prompt = _build_action_frame_prompt(
            action=action,
            frame_index=frame_index,
            candidate_index=candidate_index,
            pet_name=pet_name,
            notes=notes,
            analysis=analysis,
            has_previous_frame=False,
        )
        if canonical_image_bytes:
            return self._edit_image(
                prompt=prompt,
                image_parts=[
                    _image_part(
                        field_name="image[]",
                        filename="canonical.png",
                        image_bytes=canonical_image_bytes,
                        mime="image/png",
                    )
                ],
                size=size,
                model=self.model_image_edit,
            )
        return self._generate_image(prompt=prompt, size=size, model=self.model_image)

    def generate_action_strip(
        self,
        *,
        action: str,
        candidate_index: int = 0,
        pet_name: str,
        notes: str,
        analysis: dict[str, Any],
        images: list[dict[str, Any]],
        canonical_image_bytes: bytes,
        layout_guide_image_bytes: bytes,
        size: str = "1280*720",
    ) -> dict[str, Any]:
        if not canonical_image_bytes:
            raise ValueError("canonical image is required for OpenAI action strip generation")
        if not layout_guide_image_bytes:
            raise ValueError("layout guide image is required for OpenAI action strip generation")
        prompt = _build_action_strip_prompt(
            action=action,
            candidate_index=candidate_index,
            pet_name=pet_name,
            notes=notes,
            analysis=analysis,
        )
        image_parts = [
            _image_part(
                field_name="image[]",
                filename="layout-guide.png",
                image_bytes=layout_guide_image_bytes,
                mime="image/png",
            ),
            _image_part(
                field_name="image[]",
                filename="canonical.png",
                image_bytes=canonical_image_bytes,
                mime="image/png",
            ),
        ]
        return self._edit_image(
            prompt=prompt,
            image_parts=image_parts,
            size=size,
            model=self.model_image_edit,
        )

    def _generate_image(self, *, prompt: str, size: str, model: str) -> dict[str, Any]:
        started_at = self.clock()
        payload = {
            "model": model,
            "prompt": prompt,
            "n": 1,
            "size": _normalize_openai_size(size, fallback=self.size),
            "quality": self.quality,
        }
        raw = self._post_json(f"{self.base_url}/images/generations", payload)
        return self._normalized_result(raw=raw, model=model, prompt=prompt, started_at=started_at)

    def _edit_image(self, *, prompt: str, image_parts: list[dict[str, Any]], size: str, model: str) -> dict[str, Any]:
        started_at = self.clock()
        fields = [
            ("model", model),
            ("prompt", prompt),
            ("n", "1"),
            ("size", _normalize_openai_size(size, fallback=self.size)),
            ("quality", self.quality),
        ]
        raw = self._post_multipart(f"{self.base_url}/images/edits", fields=fields, image_parts=image_parts)
        return self._normalized_result(raw=raw, model=model, prompt=prompt, started_at=started_at)

    def _normalized_result(self, *, raw: dict[str, Any], model: str, prompt: str, started_at: float) -> dict[str, Any]:
        image_bytes = _extract_b64_image(raw)
        usage = raw.get("usage", {}) if isinstance(raw.get("usage"), dict) else {}
        usage = dict(usage)
        usage["observed_duration_seconds"] = round(max(0.0, self.clock() - started_at), 3)
        return {
            "status": "ok",
            "provider": "openai",
            "model": model,
            "request_id": str(raw.get("id") or raw.get("created") or ""),
            "image_bytes": image_bytes,
            "image_mime": "image/png",
            "usage": usage,
            "prompt_sha256": _sha256_text(prompt),
        }

    def _post_json(self, url: str, payload: dict[str, Any]) -> dict[str, Any]:
        body = json.dumps(payload).encode("utf-8")
        for attempt in range(self.max_retries + 1):
            request = Request(
                url,
                data=body,
                method="POST",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
            )
            try:
                with self.opener(request, timeout=self.request_timeout_seconds) as response:
                    return json.loads(response.read().decode("utf-8"))
            except HTTPError as exc:
                if _transient_http_status(exc.code) and attempt < self.max_retries:
                    self.sleep(_http_retry_delay_seconds(exc, attempt=attempt))
                    continue
                raise OpenAIRequestError(f"OpenAI image request failed with HTTP {exc.code}") from exc
            except TimeoutError as exc:
                if attempt < self.max_retries:
                    self.sleep(1.0)
                    continue
                raise OpenAIRequestError("OpenAI image request timed out before receiving a response") from exc
            except URLError as exc:
                if attempt < self.max_retries:
                    self.sleep(1.0)
                    continue
                raise OpenAIRequestError("OpenAI image request failed before receiving a response") from exc
            except json.JSONDecodeError as exc:
                raise OpenAIRequestError("OpenAI image response was not valid JSON") from exc
        raise OpenAIRequestError("OpenAI image request failed before receiving a response")

    def _post_multipart(self, url: str, *, fields: list[tuple[str, str]], image_parts: list[dict[str, Any]]) -> dict[str, Any]:
        body, content_type = _encode_multipart(
            fields=fields,
            image_parts=image_parts,
            boundary=self.boundary_factory(),
        )
        for attempt in range(self.max_retries + 1):
            request = Request(
                url,
                data=body,
                method="POST",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": content_type,
                },
            )
            try:
                with self.opener(request, timeout=self.request_timeout_seconds) as response:
                    return json.loads(response.read().decode("utf-8"))
            except HTTPError as exc:
                if _transient_http_status(exc.code) and attempt < self.max_retries:
                    self.sleep(_http_retry_delay_seconds(exc, attempt=attempt))
                    continue
                raise OpenAIRequestError(f"OpenAI image edit request failed with HTTP {exc.code}") from exc
            except TimeoutError as exc:
                if attempt < self.max_retries:
                    self.sleep(1.0)
                    continue
                raise OpenAIRequestError("OpenAI image edit request timed out before receiving a response") from exc
            except URLError as exc:
                if attempt < self.max_retries:
                    self.sleep(1.0)
                    continue
                raise OpenAIRequestError("OpenAI image edit request failed before receiving a response") from exc
            except json.JSONDecodeError as exc:
                raise OpenAIRequestError("OpenAI image edit response was not valid JSON") from exc
        raise OpenAIRequestError("OpenAI image edit request failed before receiving a response")


class OpenAIVisualReviewClient:
    provider = "openai"

    def __init__(
        self,
        *,
        api_key: str,
        base_url: str,
        model: str,
        opener: Callable[..., Any] = urlopen,
        request_timeout_seconds: float = 60.0,
    ) -> None:
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.opener = opener
        self.request_timeout_seconds = max(5.0, float(request_timeout_seconds))

    @classmethod
    def from_env(
        cls,
        env_path: str | Path = ".env.local",
        *,
        opener: Callable[..., Any] = urlopen,
    ) -> "OpenAIVisualReviewClient":
        config = load_openai_config(env_path)
        return cls(
            api_key=config.api_key,
            base_url=config.base_url,
            model=config.model_review,
            opener=opener,
            request_timeout_seconds=config.request_timeout_seconds,
        )

    def review_action_frames(
        self,
        *,
        pet_name: str,
        action: str,
        image_bytes: bytes,
        image_mime: str,
    ) -> dict[str, Any]:
        data_url = _to_data_url(image_bytes, image_mime)
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": (
                        "You review one generated action row for a memorial desktop pet. Return compact JSON only. "
                        "Check same pet identity, full-body visibility, clean background removal readiness, and action semantics. "
                        "Reject wrong species, unrelated simplified animals, cropped or broken bodies, or unclear requested action."
                    ),
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": (
                                "Please review one generated action row. "
                                f"Pet display name: {pet_name}. "
                                f"Action to verify: {action}. "
                                f"The {DEFAULT_FRAMES_PER_ACTION} frames must look like the same pet identity and visibly match the action semantics. "
                                f"{_action_visual_review_requirements(action)} "
                                "Return JSON with keys score (0-1), passed (boolean), action_ok (boolean), identity_ok (boolean), "
                                f"notes (short), risks (array). For walk, also return front_paw_sequence ({DEFAULT_FRAMES_PER_ACTION} short strings), "
                                f"hind_paw_sequence ({DEFAULT_FRAMES_PER_ACTION} short strings), gait_cycle_ok (boolean), two_frame_loop_ok (boolean), "
                                "and bad_frames (array of 1-based frame numbers)."
                            ),
                        },
                        {"type": "image_url", "image_url": {"url": data_url}},
                    ],
                },
            ],
            "temperature": 0,
            "response_format": {"type": "json_object"},
        }
        raw = self._post_chat_completions(payload)
        content = _message_content(raw)
        return {
            "status": "ok",
            "provider": self.provider,
            "model": self.model,
            "response_id": raw.get("id"),
            "review": _parse_review_content(content),
        }

    def review_contact_sheet(
        self,
        *,
        pet_name: str,
        image_bytes: bytes,
        image_mime: str,
    ) -> dict[str, Any]:
        data_url = _to_data_url(image_bytes, image_mime)
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": (
                        "You review the final 6-row contact sheet for a memorial desktop pet. Return compact JSON only. "
                        "Reject packages with wrong-facing frames or action rows that do not match hard required labels. "
                        "Tail_wag may be a subtle low-disturbance row; do not reject solely because tail_wag resembles idle or look."
                    ),
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": (
                                "Please review the final 6-row contact sheet before installation. "
                                f"Pet display name: {pet_name}. "
                                "Rows are fixed: Row 1 idle, Row 2 sleep, Row 3 walk, Row 4 look, Row 5 sit, Row 6 tail_wag. "
                                f"Row 2 sleep: all {DEFAULT_FRAMES_PER_ACTION} frames must keep the same sleeping direction/orientation; reject any flipped, "
                                "opposite-facing, upright, or non-sleeping frame. minor breathing scale changes are not direction flips "
                                "when the head remains on the same side and the body is not mirrored. "
                                "Row 3 walk: this project intentionally uses a two-frame walk loop. "
                                f"All {DEFAULT_FRAMES_PER_ACTION} frames must face and walk right in side view; reject any left-facing, front-facing, sitting, "
                                "or loafing frame. Row 3 frames 1 and 2 must be two distinct walking key poses; "
                                f"Row 3 frames 3 through {DEFAULT_FRAMES_PER_ACTION} intentionally repeat Row 3 frames 1 and 2 for looping. "
                                "Reject if frames 1 and 2 are the same paw stance or only body translation. "
                                "Row 4 look should use a stable body with head/eye direction changes. "
                                "Row 6 tail_wag may be subtle; a small visible tail or rear-body variation is acceptable for a calm desktop pet. "
                                "Do not reject solely because tail_wag resembles idle or look when identity, full-body visibility, "
                                "walk direction, and sleep direction are acceptable. "
                                "Also check same pet identity, full-body visibility, calm low-disturbance tone, and no text/scenery. "
                                "Return JSON with keys score (0-1), passed (boolean), identity_ok (boolean), "
                                "row_semantics_ok (boolean), direction_ok (boolean), row_distinct_ok (boolean), "
                                "gait_cycle_ok (boolean for Row 3 walk), two_frame_loop_ok (boolean for Row 3 walk), "
                                "repair_actions (array of action names needing regeneration), notes (short), risks (array)."
                            ),
                        },
                        {"type": "image_url", "image_url": {"url": data_url}},
                    ],
                },
            ],
            "temperature": 0,
            "response_format": {"type": "json_object"},
        }
        raw = self._post_chat_completions(payload)
        content = _message_content(raw)
        return {
            "status": "ok",
            "provider": self.provider,
            "model": self.model,
            "response_id": raw.get("id"),
            "review": _parse_review_content(content),
        }

    def _post_chat_completions(self, payload: dict[str, Any]) -> dict[str, Any]:
        body = json.dumps(payload).encode("utf-8")
        request = Request(
            _chat_completions_url(self.base_url),
            data=body,
            method="POST",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
        )
        try:
            with self.opener(request, timeout=self.request_timeout_seconds) as response:
                return json.loads(response.read().decode("utf-8"))
        except HTTPError as exc:
            raise OpenAIRequestError(f"OpenAI review request failed with HTTP {exc.code}") from exc
        except TimeoutError as exc:
            raise OpenAIRequestError("OpenAI review request timed out before receiving a response") from exc
        except URLError as exc:
            raise OpenAIRequestError("OpenAI review request failed before receiving a response") from exc
        except json.JSONDecodeError as exc:
            raise OpenAIRequestError("OpenAI review response was not valid JSON") from exc


def _image_part(*, field_name: str, filename: str, image_bytes: bytes, mime: str) -> dict[str, Any]:
    if not isinstance(image_bytes, bytes) or not image_bytes:
        raise ValueError(f"{filename} image bytes are required")
    return {
        "field_name": field_name,
        "filename": filename,
        "bytes": image_bytes,
        "mime": mime or "image/png",
    }


def _transient_http_status(status_code: int) -> bool:
    return int(status_code) in {408, 409, 429, 500, 502, 503, 504}


def _http_retry_delay_seconds(exc: HTTPError, *, attempt: int) -> float:
    retry_after = ""
    headers = getattr(exc, "headers", None)
    if headers is not None:
        retry_after = str(headers.get("Retry-After", "") or "").strip()
    try:
        if retry_after:
            return max(1.0, min(float(retry_after), 10.0))
    except ValueError:
        pass
    return min(2.0**max(0, attempt), 10.0)


def _encode_multipart(*, fields: list[tuple[str, str]], image_parts: list[dict[str, Any]], boundary: str) -> tuple[bytes, str]:
    chunks: list[bytes] = []
    for name, value in fields:
        chunks.extend(
            [
                f"--{boundary}\r\n".encode("ascii"),
                f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode("utf-8"),
                str(value).encode("utf-8"),
                b"\r\n",
            ]
        )
    for part in image_parts:
        chunks.extend(
            [
                f"--{boundary}\r\n".encode("ascii"),
                (
                    f'Content-Disposition: form-data; name="{part["field_name"]}"; '
                    f'filename="{part["filename"]}"\r\n'
                ).encode("utf-8"),
                f'Content-Type: {part["mime"]}\r\n\r\n'.encode("ascii"),
                part["bytes"],
                b"\r\n",
            ]
        )
    chunks.append(f"--{boundary}--\r\n".encode("ascii"))
    return b"".join(chunks), f"multipart/form-data; boundary={boundary}"


def _extract_b64_image(raw: dict[str, Any]) -> bytes:
    data = raw.get("data")
    if isinstance(data, list) and data:
        b64_json = data[0].get("b64_json") if isinstance(data[0], dict) else None
        if b64_json:
            try:
                return base64.b64decode(str(b64_json))
            except ValueError as exc:
                raise OpenAIRequestError("OpenAI image response contained invalid base64 image data") from exc
    raise OpenAIRequestError("OpenAI image response did not include b64_json image data")


def _message_content(raw: dict[str, Any]) -> str:
    content = raw.get("choices", [{}])[0].get("message", {}).get("content", "")
    if isinstance(content, list):
        parts = []
        for part in content:
            if isinstance(part, dict) and "text" in part:
                parts.append(str(part["text"]))
            else:
                parts.append(str(part))
        return "\n".join(parts)
    return str(content)


def _to_data_url(image_bytes: bytes, mime: str) -> str:
    return f"data:{mime};base64,{base64.b64encode(image_bytes).decode('ascii')}"


def _normalize_openai_size(size: str | None, *, fallback: str = "1024x1024") -> str:
    clean = str(size or "").strip().lower().replace("*", "x")
    if clean in {"1024x1024", "1024x1536", "1536x1024", "auto"}:
        return clean
    if "x" in clean:
        width_text, height_text = clean.split("x", 1)
        try:
            width = int(width_text)
            height = int(height_text)
        except ValueError:
            return fallback
        if width > height:
            return "1536x1024"
        if height > width:
            return "1024x1536"
        return "1024x1024"
    return fallback


def _read_env_file(path: Path) -> dict[str, str]:
    if not path.exists():
        return {}
    values: dict[str, str] = {}
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def _env_float(values: dict[str, str], key: str, default: float) -> float:
    try:
        return float(str(values.get(key, default)).strip())
    except (TypeError, ValueError):
        return default


def _mask_secret(value: str) -> str:
    if len(value) < 12:
        return "***"
    return f"{value[:4]}...{value[-4:]}"
