#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import os
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable
from urllib.parse import urlparse


# Repository root: the parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]

# Shared directory prefixes for the default inputs below.
_INSTALLER_DIR = ROOT / "outputs" / "installers" / "mac-beta-0.3"
_TEST_REPORT_DIR = ROOT / "outputs" / "test_reports"

# Default locations of the artifact and of the JSON reports read by the gates.
DEFAULT_ARTIFACT = _INSTALLER_DIR / "AI桌宠-mac-beta-0.3.dmg"
DEFAULT_PACKAGE_REPORT = _INSTALLER_DIR / "package-report.json"
DEFAULT_NOTARIZATION_REPORT = _INSTALLER_DIR / "notarization-report.json"
DEFAULT_LIVE_TEST_REPORT = _TEST_REPORT_DIR / "mac-beta-cloud-live-test.json"
DEFAULT_CLEAN_MAC_SMOKE_REPORT = _TEST_REPORT_DIR / "mac-beta-clean-mac-smoke.json"

# Keychain profile name passed to ``notarytool --keychain-profile`` by default.
DEFAULT_KEYCHAIN_PROFILE = "sixxie-notary"

# Injectable command executor: takes an argv list and returns a result mapping
# (the gates read its "returncode", "stderr" and "command" entries).
CommandRunner = Callable[[list[str]], dict[str, Any]]


def main() -> int:
    """Command-line entry point.

    Parses the CLI options, runs every release gate, prints the resulting
    report as JSON on stdout and, when ``--output-report`` is given, also
    writes the same JSON (plus a trailing newline) to that file.

    Returns:
        ``0`` when the report's ``ok`` flag is truthy, ``1`` otherwise.
    """
    parser = argparse.ArgumentParser(description="Verify Mac public beta release gates without exposing secrets.")
    # Path-valued inputs all share the same shape: a flag plus a default path.
    path_flags = (
        ("--artifact", DEFAULT_ARTIFACT),
        ("--package-report", DEFAULT_PACKAGE_REPORT),
        ("--notarization-report", DEFAULT_NOTARIZATION_REPORT),
        ("--live-test-report", DEFAULT_LIVE_TEST_REPORT),
        ("--clean-mac-smoke-report", DEFAULT_CLEAN_MAC_SMOKE_REPORT),
    )
    for flag, fallback in path_flags:
        parser.add_argument(flag, type=Path, default=fallback)
    parser.add_argument("--keychain-profile", default=DEFAULT_KEYCHAIN_PROFILE)
    parser.add_argument("--output-report", type=Path)
    options = parser.parse_args()

    report = verify_release_readiness(
        artifact=options.artifact.resolve(),
        package_report=options.package_report.resolve(),
        notarization_report=options.notarization_report.resolve(),
        live_test_report=options.live_test_report.resolve(),
        clean_mac_smoke_report=options.clean_mac_smoke_report.resolve(),
        keychain_profile=options.keychain_profile,
    )

    rendered = json.dumps(report, ensure_ascii=False, indent=2)
    destination = options.output_report
    if destination:
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered + "\n", encoding="utf-8")
    print(rendered)

    if report["ok"]:
        return 0
    return 1


def verify_release_readiness(
    *,
    artifact: Path,
    package_report: Path,
    notarization_report: Path,
    live_test_report: Path,
    clean_mac_smoke_report: Path,
    keychain_profile: str = DEFAULT_KEYCHAIN_PROFILE,
    env: dict[str, str] | None = None,
    command_runner: CommandRunner | None = None,
) -> dict[str, Any]:
    """Evaluate every release gate and return a JSON-serialisable report.

    Args:
        artifact: Path of the disk image to check.
        package_report: JSON report read for the package-related gates.
        notarization_report: JSON report read for the notarization gate.
        live_test_report: JSON report read for the provider end-to-end gate.
        clean_mac_smoke_report: JSON report read for the clean-Mac smoke gate.
        keychain_profile: Profile name handed to ``notarytool``.
        env: Environment mapping consulted for ``SIXXIE_API_BASE_URL``;
            ``os.environ`` is used when omitted.
        command_runner: Callable used to execute external commands; the
            module's ``_run`` is used when omitted.

    Returns:
        A dict with an overall ``ok`` flag (true only if every gate passed),
        a UTC ``created_at`` timestamp, display paths of the inputs, the
        keychain profile name and the list of individual gate results.
    """
    env = dict(os.environ if env is None else env)
    run = command_runner or _run
    package = _read_json(package_report)
    notarization = _read_json(notarization_report)
    live_test = _read_json(live_test_report)
    clean_smoke = _read_json(clean_mac_smoke_report)

    # Check the file system once so the gate's ``ok`` flag and its ``reason``
    # cannot disagree if the artifact appears or vanishes between two checks.
    artifact_present = artifact.exists()

    gates = [
        _gate("artifact_exists", artifact_present, reason=None if artifact_present else "missing"),
        _package_gate(package),
        _cloud_create_gate(package),
        _real_api_base_url_gate(env.get("SIXXIE_API_BASE_URL", ""), package),
        _notary_profile_gate(run, keychain_profile),
        _notarization_gate(notarization),
        _gatekeeper_gate(run, artifact),
        _provider_e2e_gate(live_test),
        _clean_mac_smoke_gate(clean_smoke),
    ]
    return {
        "ok": all(gate["ok"] for gate in gates),
        # ISO-8601 in UTC, with the "+00:00" offset rewritten as "Z".
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "artifact": _display_path(artifact),
        "package_report": _display_path(package_report),
        "notarization_report": _display_path(notarization_report),
        "live_test_report": _display_path(live_test_report),
        "clean_mac_smoke_report": _display_path(clean_mac_smoke_report),
        "keychain_profile": keychain_profile,
        "gates": gates,
    }


def _package_gate(package: dict[str, Any] | None) -> dict[str, Any]:
    """Gate: the package report is present, ok, and signed with a Developer ID.

    A missing/empty report fails with ``missing_or_invalid``. Otherwise the
    gate passes only when the report's ``ok`` is truthy and its
    ``signing_status`` equals ``"developer_id"``; selected report fields are
    echoed in ``details`` either way.
    """
    gate_name = "package_report_ok"
    if not package:
        return _gate(gate_name, False, reason="missing_or_invalid")

    signing_status = package.get("signing_status")
    ready = bool(package.get("ok")) and signing_status == "developer_id"
    failure_reason = None if ready else "package_not_developer_id_ready"
    summary = {
        "launch_mode": package.get("launch_mode"),
        "signing_status": signing_status,
        "api_base_url_configured": bool(package.get("api_base_url_configured")),
        "api_base_url_host": package.get("api_base_url_host"),
    }
    return _gate(gate_name, ready, reason=failure_reason, details=summary)


def _cloud_create_gate(package: dict[str, Any] | None) -> dict[str, Any]:
    """Gate: the package is a cloud-create build with the local service disabled.

    Passes only when the report's ``launch_mode`` is ``"cloud-create"`` and
    ``local_service_disabled`` is exactly ``True``. With a missing/empty
    report both values are reported as ``None`` and the gate fails.
    """
    if package:
        launch_mode = package.get("launch_mode")
        local_service_disabled = package.get("local_service_disabled")
    else:
        launch_mode = None
        local_service_disabled = None

    within_boundary = launch_mode == "cloud-create" and local_service_disabled is True
    failure_reason = None if within_boundary else "not_cloud_create_or_local_service_enabled"
    return _gate(
        "cloud_create_package_boundary",
        within_boundary,
        reason=failure_reason,
        details={
            "launch_mode": launch_mode,
            "local_service_disabled": local_service_disabled,
        },
    )


def _real_api_base_url_gate(raw_url: str, package: dict[str, Any] | None) -> dict[str, Any]:
    """Gate: the configured API base URL is a real HTTPS endpoint matching the package.

    Args:
        raw_url: The API base URL to validate (surrounding whitespace is ignored).
        package: Parsed package report, or ``None``; its ``api_base_url_host``
            is compared against the URL's host.

    Returns:
        A gate result. Failure reasons, in the order they are checked:
        ``missing`` (empty URL), ``invalid_url`` (the URL cannot be parsed),
        ``package_host_missing``, ``package_host_mismatch`` and
        ``placeholder_or_non_https`` (non-https scheme, empty host, a loopback
        name/address, or a host containing ``example``).
    """
    gate_name = "real_api_base_url"
    value = raw_url.strip()
    if not value:
        return _gate(gate_name, False, reason="missing")

    # urlparse raises ValueError on malformed input (e.g. an unterminated
    # IPv6 literal such as "https://[::1"); report that as a failed gate
    # instead of letting it abort the whole verification run.
    try:
        parsed = urlparse(value)
        hostname = (parsed.hostname or "").lower()
    except ValueError:
        return _gate(gate_name, False, reason="invalid_url")

    placeholder = (
        parsed.scheme != "https"
        or not hostname
        or hostname in {"localhost", "127.0.0.1", "::1"}
        # Substring match also covers hosts ending in ".example".
        or "example" in hostname
    )
    package_host = str((package or {}).get("api_base_url_host") or "").strip().lower()
    details = {"env_host": hostname, "package_host": package_host, "scheme": parsed.scheme}
    if not package_host:
        return _gate(gate_name, False, reason="package_host_missing", details=details)
    if package_host != hostname:
        return _gate(gate_name, False, reason="package_host_mismatch", details=details)
    return _gate(
        gate_name,
        not placeholder,
        reason="placeholder_or_non_https" if placeholder else None,
        details=details,
    )


def _notary_profile_gate(run: CommandRunner, keychain_profile: str) -> dict[str, Any]:
    """Gate: ``notarytool history`` succeeds with the given keychain profile.

    The command is executed through ``run``; the gate passes when the
    reported return code is ``0``. Only a sanitised summary of the command
    result is included in ``details``.
    """
    command = [
        "xcrun",
        "notarytool",
        "history",
        "--keychain-profile",
        keychain_profile,
        "--output-format",
        "json",
    ]
    outcome = run(command)
    succeeded = outcome.get("returncode") == 0
    failure_reason = None if succeeded else "missing_or_invalid_profile"
    return _gate(
        "notary_keychain_profile",
        succeeded,
        reason=failure_reason,
        details=_safe_command_result(outcome),
    )


def _notarization_gate(notarization: dict[str, Any] | None) -> dict[str, Any]:
    """Gate: the notarization report exists and its ``ok`` field is exactly ``True``.

    A missing/empty report fails with ``missing_or_not_accepted`` and no
    details. A present report that is not ok fails with
    ``notarization_failed``. For any present report the keychain profile and
    the ``notarytool_submit`` return code are echoed in ``details``.
    """
    gate_name = "notarization_accepted"
    if not notarization:
        return _gate(gate_name, False, reason="missing_or_not_accepted")

    accepted = notarization.get("ok") is True
    summary: dict[str, Any] = {
        "keychain_profile": notarization.get("keychain_profile"),
        "notarytool_returncode": _nested_returncode(notarization, "notarytool_submit"),
    }
    return _gate(
        gate_name,
        accepted,
        reason=None if accepted else "notarization_failed",
        details=summary,
    )


def _gatekeeper_gate(run: CommandRunner, artifact: Path) -> dict[str, Any]:
    """Gate: ``spctl`` assessment of the artifact exits with status ``0``.

    The command is executed through ``run``; only a sanitised summary of its
    result is included in ``details``.
    """
    command = ["spctl", "-a", "-vv", "-t", "install", str(artifact)]
    outcome = run(command)
    accepted = outcome.get("returncode") == 0
    failure_reason = None if accepted else "gatekeeper_rejected"
    return _gate(
        "gatekeeper_accepts_dmg",
        accepted,
        reason=failure_reason,
        details=_safe_command_result(outcome),
    )


def _provider_e2e_gate(live_test: dict[str, Any] | None) -> dict[str, Any]:
    """Gate: the live test report is ok and was produced with the ``openai`` provider.

    Args:
        live_test: Parsed live test report, or ``None`` when unavailable.

    Returns:
        A gate result. The provider is taken from the report's top-level
        ``provider`` field, falling back to the first nested ``provider``
        value found. Failure reasons: ``missing_report``,
        ``status_<status>`` (or ``status_unknown``) when the status is not
        ``"ok"``, and ``provider_not_openai`` when the status is ok but the
        provider differs.
    """
    gate_name = "real_provider_quality_e2e"
    if not live_test:
        return _gate(gate_name, False, reason="missing_report")

    status = live_test.get("status")
    provider = live_test.get("provider") or _find_value(live_test, "provider")

    # Name the check that actually failed: an ok status with the wrong
    # provider must not be reported as "status_ok" on a failing gate.
    if status != "ok":
        reason: str | None = f"status_{status or 'unknown'}"
    elif provider != "openai":
        reason = "provider_not_openai"
    else:
        reason = None

    return _gate(
        gate_name,
        reason is None,
        reason=reason,
        details={"status": status, "provider": provider},
    )


def _clean_mac_smoke_gate(clean_smoke: dict[str, Any] | None) -> dict[str, Any]:
    """Gate: the clean-Mac smoke report exists and its ``status`` is ``"ok"``.

    Fails with ``missing_report`` for a missing/empty report, otherwise with
    ``status_<status>`` (``status_unknown`` when the status is falsy).
    """
    gate_name = "clean_mac_smoke"
    if not clean_smoke:
        return _gate(gate_name, False, reason="missing_report")

    status = clean_smoke.get("status")
    if status == "ok":
        return _gate(gate_name, True, reason=None)
    return _gate(gate_name, False, reason=f"status_{status or 'unknown'}")


def _gate(name: str, ok: bool, *, reason: str | None = None, details: dict[str, Any] | None = None) -> dict[str, Any]:
    gate: dict[str, Any] = {"name": name, "ok": bool(ok)}
    if reason:
        gate["reason"] = reason
    if details:
        gate["details"] = details
    return gate


def _safe_command_result(result: dict[str, Any]) -> dict[str, Any]:
    """Reduce a command result to its return code and a sanitised stderr summary.

    Path-like arguments of the executed command are replaced by their display
    form inside the stderr text before it is summarised.
    """
    path_redactions = _command_path_redactions(result.get("command", []))
    stderr_text = result.get("stderr", "")
    sanitized: dict[str, Any] = {"returncode": result.get("returncode")}
    sanitized["stderr_summary"] = _safe_summary(stderr_text, redactions=path_redactions)
    return sanitized


def _safe_summary(value: Any, *, redactions: dict[str, str] | None = None) -> str:
    text = str(value or "")
    if not text:
        return ""
    for original, replacement in (redactions or {}).items():
        text = text.replace(original, replacement)
    secret_markers = ["sk-", "OPENAI_API_KEY", "STRIPE_SECRET", "SUPABASE_SERVICE_ROLE_KEY"]
    if any(marker in text for marker in secret_markers):
        return "redacted"
    return text[:240]


def _command_path_redactions(command: Any) -> dict[str, str]:
    """Map each path-like argument of ``command`` to its display form.

    Only string items containing ``/`` are considered; anything that is not a
    list produces an empty mapping.
    """
    if not isinstance(command, list):
        return {}
    return {
        argument: _display_path(Path(argument))
        for argument in command
        if isinstance(argument, str) and "/" in argument
    }


def _nested_returncode(payload: dict[str, Any], key: str) -> int | None:
    value = payload.get(key)
    return value.get("returncode") if isinstance(value, dict) else None


def _find_value(value: Any, key: str) -> Any:
    if isinstance(value, dict):
        if key in value:
            return value[key]
        for child in value.values():
            found = _find_value(child, key)
            if found is not None:
                return found
    if isinstance(value, list):
        for child in value:
            found = _find_value(child, key)
            if found is not None:
                return found
    return None


def _read_json(path: Path) -> dict[str, Any] | None:
    if not path.exists():
        return None
    try:
        value = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None
    return value if isinstance(value, dict) else None


def _display_path(path: Path) -> str:
    """Render ``path`` for reports without exposing directories outside the repo.

    Paths that resolve to a location under ``ROOT`` are shown relative to it;
    any other path is reduced to its final component.
    """
    resolved = path.resolve()
    try:
        inside_root = resolved.relative_to(ROOT)
    except ValueError:
        return path.name
    return str(inside_root)


def _run(command: list[str]) -> dict[str, Any]:
    """Execute ``command`` from the repository root and capture its output.

    Args:
        command: Argument vector; it is run without a shell.

    Returns:
        A dict with the original ``command``, its ``returncode`` and the
        stripped ``stdout``/``stderr`` text. If the process cannot be started
        at all (for example the executable is not installed), the result has
        return code ``127``, empty stdout and a short error text in
        ``stderr`` instead of an exception being raised.
    """
    try:
        completed = subprocess.run(
            command,
            cwd=ROOT,
            text=True,
            # Undecodable bytes in tool output must not abort verification.
            errors="replace",
            capture_output=True,
            check=False,
        )
    except OSError as exc:
        # A tool that cannot be launched counts as a failed gate, not a crash.
        # Only strerror is reported so no absolute path ends up in the output.
        program = command[0] if command else "command"
        return {
            "command": command,
            "returncode": 127,
            "stdout": "",
            "stderr": f"{program}: {exc.strerror or type(exc).__name__}",
        }
    return {
        "command": command,
        "returncode": completed.returncode,
        "stdout": completed.stdout.strip(),
        "stderr": completed.stderr.strip(),
    }


# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
