93 lines
3.1 KiB
Python
93 lines
3.1 KiB
Python
from __future__ import annotations

import logging
import os
from dataclasses import dataclass, field

import requests
|
|
|
|
# Module-level logger used for configuration notices and verification failures.
logger = logging.getLogger(__name__)

# Endpoint that receives the server-side token verification POST.
TURNSTILE_VERIFY_URL = "https://challenges.cloudflare.com/turnstile/v0/siteverify"

# Default test credentials: substituted when no usable keys are configured in
# the development/test environments. Verification short-circuits to success
# when exactly this pair is in use.
DEFAULT_TURNSTILE_TEST_SITE_KEY = "1x00000000000000000000AA"
DEFAULT_TURNSTILE_TEST_SECRET_KEY = "1x0000000000000000000000000000000AA"

# "Always fail" test credentials: when this pair is configured in the test
# environment, verification is rejected without contacting the endpoint.
ALWAYS_FAIL_TURNSTILE_TEST_SITE_KEY = "2x00000000000000000000AB"
ALWAYS_FAIL_TURNSTILE_TEST_SECRET_KEY = "2x0000000000000000000000000000000AA"
|
|
|
|
|
|
@dataclass(frozen=True)
class TurnstileSettings:
    """Immutable snapshot of the Turnstile configuration.

    Instances are produced by ``load_turnstile_settings`` from environment
    variables. The secret key is deliberately left out of ``repr()`` so that
    logging or printing a settings object cannot leak it; it still takes part
    in equality and hashing.
    """

    # Site key resolved from TURNSTILE_SITE_KEY (or a substituted test key).
    site_key: str
    # Secret key sent to the verification endpoint; hidden from repr().
    secret_key: str = field(repr=False)
    # False when TURNSTILE_ENABLED is "0", "false" or "no" (case-insensitive);
    # verification is skipped entirely in that case.
    enabled: bool
    # True only when both keys equal the DEFAULT_TURNSTILE_TEST_* pair.
    uses_test_keys: bool
|
|
|
|
|
|
def _environment() -> str:
    """Return the lower-cased name of the current deployment environment.

    ``APP_ENV`` takes precedence. ``ENVIRONMENT`` is consulted only when
    ``APP_ENV`` is not set at all, and ``"development"`` is the fallback when
    neither variable exists. A variable that is set to an empty string is
    returned as-is (no fallback is applied).
    """
    name = os.environ.get("APP_ENV")
    if name is None:
        name = os.environ.get("ENVIRONMENT", "development")
    return name.lower()
|
|
|
|
|
|
def load_turnstile_settings() -> TurnstileSettings:
    """Resolve the Turnstile configuration from environment variables.

    Reads ``TURNSTILE_SITE_KEY``, ``TURNSTILE_SECRET_KEY`` and
    ``TURNSTILE_ENABLED``. Surrounding whitespace is stripped from all three,
    so a whitespace-only key counts as missing and a stray trailing newline
    never becomes part of a key.

    Environment-specific rules (environment name from ``_environment()``):

    * ``test``: unless the configured keys form one of the two known test
      pairs, they are replaced by the default test pair (an info message is
      logged if real-looking values were discarded).
    * ``development``: any missing key is filled in from the default test
      pair; a configured key is kept.
    * anything else: both keys are required.

    Returns:
        A ``TurnstileSettings`` instance. ``enabled`` is False only when
        ``TURNSTILE_ENABLED`` is ``0``, ``false`` or ``no`` (case-insensitive).
        ``uses_test_keys`` is True only for the default test pair.

    Raises:
        RuntimeError: if a key is missing outside the development and test
            environments.
    """
    # Strip so blank values are treated as unset and copy/paste or secrets-file
    # newlines do not corrupt the key sent to the verification endpoint.
    site_key = os.getenv("TURNSTILE_SITE_KEY", "").strip()
    secret_key = os.getenv("TURNSTILE_SECRET_KEY", "").strip()
    enabled_flag = os.getenv("TURNSTILE_ENABLED", "true").strip().lower()
    enabled = enabled_flag not in {"0", "false", "no"}
    env = _environment()

    known_test_pairs = {
        (DEFAULT_TURNSTILE_TEST_SITE_KEY, DEFAULT_TURNSTILE_TEST_SECRET_KEY),
        (ALWAYS_FAIL_TURNSTILE_TEST_SITE_KEY, ALWAYS_FAIL_TURNSTILE_TEST_SECRET_KEY),
    }

    if env == "test":
        # Tests must never run against real credentials: only the known test
        # pairs are honoured, everything else falls back to the default pair.
        if (site_key, secret_key) not in known_test_pairs:
            if site_key or secret_key:
                logger.info("Ignoring configured Turnstile credentials in test environment and using test keys")
            site_key = DEFAULT_TURNSTILE_TEST_SITE_KEY
            secret_key = DEFAULT_TURNSTILE_TEST_SECRET_KEY
    elif not site_key or not secret_key:
        if env != "development":
            raise RuntimeError("Turnstile keys must be configured outside development/test environments")
        # Development convenience: fill only the missing half from the test pair.
        site_key = site_key or DEFAULT_TURNSTILE_TEST_SITE_KEY
        secret_key = secret_key or DEFAULT_TURNSTILE_TEST_SECRET_KEY

    # Only the default (always-pass) pair counts; the always-fail pair does not.
    uses_test_keys = (
        site_key == DEFAULT_TURNSTILE_TEST_SITE_KEY
        and secret_key == DEFAULT_TURNSTILE_TEST_SECRET_KEY
    )
    return TurnstileSettings(
        site_key=site_key,
        secret_key=secret_key,
        enabled=enabled,
        uses_test_keys=uses_test_keys,
    )
|
|
|
|
|
|
def verify_turnstile_token(token: str, remote_ip: str | None = None) -> bool:
    """Check a client-supplied Turnstile token against the siteverify endpoint.

    Args:
        token: The response token submitted by the client. ``None``, a
            non-string value, or a blank string is rejected without a
            network call.
        remote_ip: Optional client IP address; forwarded as ``remoteip`` only
            when a non-empty value is given.

    Returns:
        True when Turnstile is disabled, when the default test key pair is in
        use, or when the endpoint reports ``"success": true``. False in every
        other case, including network errors, non-2xx responses and
        malformed response bodies (those are logged as warnings).

    Raises:
        RuntimeError: propagated from ``load_turnstile_settings`` when the
            keys are not configured outside development/test.
    """
    settings = load_turnstile_settings()
    if not settings.enabled:
        return True

    # A missing form field commonly arrives as None; treat it as an empty
    # token instead of failing on ``.strip()``.
    if not isinstance(token, str) or not token.strip():
        return False

    # In the test environment the always-fail pair is rejected locally.
    if (
        _environment() == "test"
        and settings.site_key == ALWAYS_FAIL_TURNSTILE_TEST_SITE_KEY
        and settings.secret_key == ALWAYS_FAIL_TURNSTILE_TEST_SECRET_KEY
    ):
        return False

    # NOTE(review): this short-circuit applies in every environment, not only
    # "test" - confirm that is intended.
    if settings.uses_test_keys:
        return True

    form = {"secret": settings.secret_key, "response": token}
    if remote_ip:
        # remoteip is optional; omit it rather than sending an empty value.
        form["remoteip"] = remote_ip

    try:
        response = requests.post(TURNSTILE_VERIFY_URL, data=form, timeout=10)
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        logger.warning("Turnstile verification failed: %s", exc)
        return False

    if not isinstance(payload, dict):
        # Valid JSON but not an object (e.g. a list or null): fail closed.
        logger.warning("Turnstile verification failed: %s", "unexpected response payload")
        return False

    # Require a real JSON ``true``; truthy strings such as "false" must not pass.
    return payload.get("success") is True
|