feat(SEC-001): protect workspace bootstrap with turnstile
This commit is contained in:
@@ -2,3 +2,5 @@ APP_HOST=0.0.0.0
|
|||||||
APP_PORT=8000
|
APP_PORT=8000
|
||||||
REDIS_URL=redis://localhost:6379
|
REDIS_URL=redis://localhost:6379
|
||||||
CONFIG_PATH=/app/config/settings.yaml
|
CONFIG_PATH=/app/config/settings.yaml
|
||||||
|
TURNSTILE_SITE_KEY=1x00000000000000000000AA
|
||||||
|
TURNSTILE_SECRET_KEY=1x0000000000000000000000000000000AA
|
||||||
|
|||||||
@@ -47,7 +47,14 @@ jobs:
|
|||||||
pip install nicegui fastapi uvicorn yfinance polars pandas pydantic pyyaml
|
pip install nicegui fastapi uvicorn yfinance polars pandas pydantic pyyaml
|
||||||
pip list
|
pip list
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: pytest tests/test_pricing.py tests/test_strategies.py tests/test_portfolio.py -v --tb=short
|
run: |
|
||||||
|
pytest \
|
||||||
|
tests/test_pricing.py \
|
||||||
|
tests/test_strategies.py \
|
||||||
|
tests/test_portfolio.py \
|
||||||
|
tests/test_turnstile.py \
|
||||||
|
tests/test_workspace.py \
|
||||||
|
-v --tb=short
|
||||||
|
|
||||||
type-check:
|
type-check:
|
||||||
runs-on: [linux, docker]
|
runs-on: [linux, docker]
|
||||||
@@ -127,6 +134,8 @@ jobs:
|
|||||||
APP_ENV: production
|
APP_ENV: production
|
||||||
APP_NAME: Vault Dashboard
|
APP_NAME: Vault Dashboard
|
||||||
APP_PORT: "8000"
|
APP_PORT: "8000"
|
||||||
|
TURNSTILE_SITE_KEY: ${{ vars.TURNSTILE_SITE_KEY }}
|
||||||
|
TURNSTILE_SECRET_KEY: ${{ secrets.TURNSTILE_SECRET_KEY }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
|||||||
25
app/main.py
25
app/main.py
@@ -10,14 +10,15 @@ from contextlib import asynccontextmanager
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
|
from fastapi import FastAPI, Form, Request, WebSocket, WebSocketDisconnect
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi.responses import RedirectResponse
|
from fastapi.responses import RedirectResponse, Response
|
||||||
from nicegui import ui # type: ignore[attr-defined]
|
from nicegui import ui # type: ignore[attr-defined]
|
||||||
|
|
||||||
import app.pages # noqa: F401
|
import app.pages # noqa: F401
|
||||||
from app.api.routes import router as api_router
|
from app.api.routes import router as api_router
|
||||||
from app.models.workspace import WORKSPACE_COOKIE, get_workspace_repository
|
from app.models.workspace import WORKSPACE_COOKIE, get_workspace_repository
|
||||||
|
from app.services import turnstile as turnstile_service
|
||||||
from app.services.cache import CacheService
|
from app.services.cache import CacheService
|
||||||
from app.services.data_service import DataService
|
from app.services.data_service import DataService
|
||||||
from app.services.runtime import set_data_service
|
from app.services.runtime import set_data_service
|
||||||
@@ -37,11 +38,14 @@ class Settings:
|
|||||||
websocket_interval_seconds: int = 5
|
websocket_interval_seconds: int = 5
|
||||||
nicegui_mount_path: str = "/"
|
nicegui_mount_path: str = "/"
|
||||||
nicegui_storage_secret: str = "vault-dash-dev-secret"
|
nicegui_storage_secret: str = "vault-dash-dev-secret"
|
||||||
|
turnstile_site_key: str = ""
|
||||||
|
turnstile_secret_key: str = ""
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls) -> Settings:
|
def load(cls) -> Settings:
|
||||||
cls._load_dotenv()
|
cls._load_dotenv()
|
||||||
origins = os.getenv("CORS_ORIGINS", "*")
|
origins = os.getenv("CORS_ORIGINS", "*")
|
||||||
|
turnstile = turnstile_service.load_turnstile_settings()
|
||||||
return cls(
|
return cls(
|
||||||
app_name=os.getenv("APP_NAME", cls.app_name),
|
app_name=os.getenv("APP_NAME", cls.app_name),
|
||||||
environment=os.getenv("APP_ENV", os.getenv("ENVIRONMENT", cls.environment)),
|
environment=os.getenv("APP_ENV", os.getenv("ENVIRONMENT", cls.environment)),
|
||||||
@@ -52,6 +56,8 @@ class Settings:
|
|||||||
websocket_interval_seconds=int(os.getenv("WEBSOCKET_INTERVAL_SECONDS", cls.websocket_interval_seconds)),
|
websocket_interval_seconds=int(os.getenv("WEBSOCKET_INTERVAL_SECONDS", cls.websocket_interval_seconds)),
|
||||||
nicegui_mount_path=os.getenv("NICEGUI_MOUNT_PATH", cls.nicegui_mount_path),
|
nicegui_mount_path=os.getenv("NICEGUI_MOUNT_PATH", cls.nicegui_mount_path),
|
||||||
nicegui_storage_secret=os.getenv("NICEGUI_STORAGE_SECRET", cls.nicegui_storage_secret),
|
nicegui_storage_secret=os.getenv("NICEGUI_STORAGE_SECRET", cls.nicegui_storage_secret),
|
||||||
|
turnstile_site_key=turnstile.site_key,
|
||||||
|
turnstile_secret_key=turnstile.secret_key,
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@@ -149,7 +155,20 @@ async def health(request: Request) -> dict[str, Any]:
|
|||||||
|
|
||||||
|
|
||||||
@app.get("/workspaces/bootstrap", tags=["workspace"])
|
@app.get("/workspaces/bootstrap", tags=["workspace"])
|
||||||
async def bootstrap_workspace() -> RedirectResponse:
|
async def bootstrap_workspace_redirect() -> RedirectResponse:
|
||||||
|
return RedirectResponse(url="/", status_code=303)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/workspaces/bootstrap", tags=["workspace"])
|
||||||
|
async def bootstrap_workspace(
|
||||||
|
request: Request,
|
||||||
|
turnstile_response: str = Form(alias="cf-turnstile-response", default=""),
|
||||||
|
) -> Response:
|
||||||
|
if not turnstile_service.verify_turnstile_token(
|
||||||
|
turnstile_response, request.client.host if request.client else None
|
||||||
|
):
|
||||||
|
return RedirectResponse(url="/?captcha_error=1", status_code=303)
|
||||||
|
|
||||||
workspace_id = get_workspace_repository().create_workspace_id()
|
workspace_id = get_workspace_repository().create_workspace_id()
|
||||||
response = RedirectResponse(url=f"/{workspace_id}", status_code=303)
|
response = RedirectResponse(url=f"/{workspace_id}", status_code=303)
|
||||||
response.set_cookie(
|
response.set_cookie(
|
||||||
|
|||||||
@@ -63,8 +63,7 @@ def legacy_backtests_page(request: Request):
|
|||||||
def workspace_backtests_page(workspace_id: str) -> None:
|
def workspace_backtests_page(workspace_id: str) -> None:
|
||||||
repo = get_workspace_repository()
|
repo = get_workspace_repository()
|
||||||
if not repo.workspace_exists(workspace_id):
|
if not repo.workspace_exists(workspace_id):
|
||||||
render_workspace_recovery()
|
return RedirectResponse(url="/", status_code=307)
|
||||||
return
|
|
||||||
_render_backtests_page(workspace_id=workspace_id)
|
_render_backtests_page(workspace_id=workspace_id)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ def render_workspace_recovery(title: str = "Workspace not found", message: str |
|
|||||||
ui.label(title).classes("text-3xl font-bold text-slate-900 dark:text-slate-50")
|
ui.label(title).classes("text-3xl font-bold text-slate-900 dark:text-slate-50")
|
||||||
ui.label(resolved_message).classes("text-base text-slate-500 dark:text-slate-400")
|
ui.label(resolved_message).classes("text-base text-slate-500 dark:text-slate-400")
|
||||||
with ui.row().classes("mx-auto gap-3"):
|
with ui.row().classes("mx-auto gap-3"):
|
||||||
ui.link("Get started", "/workspaces/bootstrap").classes(
|
ui.link("Get started", "/").classes(
|
||||||
"rounded-lg bg-slate-900 px-5 py-3 text-sm font-semibold text-white no-underline dark:bg-slate-100 dark:text-slate-900"
|
"rounded-lg bg-slate-900 px-5 py-3 text-sm font-semibold text-white no-underline dark:bg-slate-100 dark:text-slate-900"
|
||||||
)
|
)
|
||||||
ui.link("Go to welcome page", "/").classes(
|
ui.link("Go to welcome page", "/").classes(
|
||||||
|
|||||||
@@ -41,8 +41,7 @@ def legacy_event_comparison_page(request: Request):
|
|||||||
def workspace_event_comparison_page(workspace_id: str) -> None:
|
def workspace_event_comparison_page(workspace_id: str) -> None:
|
||||||
repo = get_workspace_repository()
|
repo = get_workspace_repository()
|
||||||
if not repo.workspace_exists(workspace_id):
|
if not repo.workspace_exists(workspace_id):
|
||||||
render_workspace_recovery()
|
return RedirectResponse(url="/", status_code=307)
|
||||||
return
|
|
||||||
_render_event_comparison_page(workspace_id=workspace_id)
|
_render_event_comparison_page(workspace_id=workspace_id)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -70,8 +70,7 @@ def legacy_hedge_page(request: Request):
|
|||||||
def workspace_hedge_page(workspace_id: str) -> None:
|
def workspace_hedge_page(workspace_id: str) -> None:
|
||||||
repo = get_workspace_repository()
|
repo = get_workspace_repository()
|
||||||
if not repo.workspace_exists(workspace_id):
|
if not repo.workspace_exists(workspace_id):
|
||||||
render_workspace_recovery()
|
return RedirectResponse(url="/", status_code=307)
|
||||||
return
|
|
||||||
_render_hedge_page(workspace_id=workspace_id)
|
_render_hedge_page(workspace_id=workspace_id)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ from app.models.workspace import WORKSPACE_COOKIE, get_workspace_repository
|
|||||||
from app.pages.common import dashboard_page, quick_recommendations, recommendation_style, strategy_catalog
|
from app.pages.common import dashboard_page, quick_recommendations, recommendation_style, strategy_catalog
|
||||||
from app.services.alerts import AlertService, build_portfolio_alert_context
|
from app.services.alerts import AlertService, build_portfolio_alert_context
|
||||||
from app.services.runtime import get_data_service
|
from app.services.runtime import get_data_service
|
||||||
|
from app.services.turnstile import load_turnstile_settings
|
||||||
|
|
||||||
_DEFAULT_CASH_BUFFER = 18_500.0
|
_DEFAULT_CASH_BUFFER = 18_500.0
|
||||||
|
|
||||||
@@ -55,7 +56,7 @@ def _render_workspace_recovery(title: str, message: str) -> None:
|
|||||||
ui.label(title).classes("text-3xl font-bold text-slate-900 dark:text-slate-50")
|
ui.label(title).classes("text-3xl font-bold text-slate-900 dark:text-slate-50")
|
||||||
ui.label(message).classes("text-base text-slate-500 dark:text-slate-400")
|
ui.label(message).classes("text-base text-slate-500 dark:text-slate-400")
|
||||||
with ui.row().classes("mx-auto gap-3"):
|
with ui.row().classes("mx-auto gap-3"):
|
||||||
ui.link("Get started", "/workspaces/bootstrap").classes(
|
ui.link("Get started", "/").classes(
|
||||||
"rounded-lg bg-slate-900 px-5 py-3 text-sm font-semibold text-white no-underline dark:bg-slate-100 dark:text-slate-900"
|
"rounded-lg bg-slate-900 px-5 py-3 text-sm font-semibold text-white no-underline dark:bg-slate-100 dark:text-slate-900"
|
||||||
)
|
)
|
||||||
ui.link("Go to welcome page", "/").classes(
|
ui.link("Go to welcome page", "/").classes(
|
||||||
@@ -69,6 +70,7 @@ def welcome_page(request: Request):
|
|||||||
workspace_id = request.cookies.get(WORKSPACE_COOKIE, "")
|
workspace_id = request.cookies.get(WORKSPACE_COOKIE, "")
|
||||||
if workspace_id and repo.workspace_exists(workspace_id):
|
if workspace_id and repo.workspace_exists(workspace_id):
|
||||||
return RedirectResponse(url=f"/{workspace_id}", status_code=307)
|
return RedirectResponse(url=f"/{workspace_id}", status_code=307)
|
||||||
|
captcha_error = request.query_params.get("captcha_error") == "1"
|
||||||
|
|
||||||
with ui.column().classes("mx-auto mt-24 w-full max-w-3xl gap-8 px-6"):
|
with ui.column().classes("mx-auto mt-24 w-full max-w-3xl gap-8 px-6"):
|
||||||
with ui.card().classes(
|
with ui.card().classes(
|
||||||
@@ -79,10 +81,25 @@ def welcome_page(request: Request):
|
|||||||
ui.label(
|
ui.label(
|
||||||
"Start with a workspace-scoped overview and settings area. Your portfolio defaults are stored server-side and your browser keeps a workspace cookie for quick return visits."
|
"Start with a workspace-scoped overview and settings area. Your portfolio defaults are stored server-side and your browser keeps a workspace cookie for quick return visits."
|
||||||
).classes("text-base text-slate-500 dark:text-slate-400")
|
).classes("text-base text-slate-500 dark:text-slate-400")
|
||||||
with ui.row().classes("items-center gap-4 pt-4"):
|
if captcha_error:
|
||||||
ui.link("Get started", "/workspaces/bootstrap").classes(
|
ui.label("CAPTCHA verification failed. Please retry the Turnstile challenge.").classes(
|
||||||
"rounded-lg bg-slate-900 px-5 py-3 text-sm font-semibold text-white no-underline dark:bg-slate-100 dark:text-slate-900"
|
"rounded-lg border border-rose-200 bg-rose-50 px-4 py-3 text-sm font-medium text-rose-700 dark:border-rose-900/60 dark:bg-rose-950/30 dark:text-rose-300"
|
||||||
)
|
)
|
||||||
|
with ui.row().classes("items-center gap-4 pt-4"):
|
||||||
|
turnstile = load_turnstile_settings()
|
||||||
|
ui.add_body_html(
|
||||||
|
'<script src="https://challenges.cloudflare.com/turnstile/v0/api.js" async defer></script>'
|
||||||
|
)
|
||||||
|
hidden_token = (
|
||||||
|
'<input type="hidden" name="cf-turnstile-response" value="test-token" />'
|
||||||
|
if turnstile.uses_test_keys
|
||||||
|
else ""
|
||||||
|
)
|
||||||
|
ui.html(f"""<form method="post" action="/workspaces/bootstrap" class="flex items-center gap-4">
|
||||||
|
{hidden_token}
|
||||||
|
<div class="cf-turnstile" data-sitekey="{turnstile.site_key}"></div>
|
||||||
|
<button type="submit" class="rounded-lg bg-slate-900 px-5 py-3 text-sm font-semibold text-white no-underline dark:bg-slate-100 dark:text-slate-900">Get started</button>
|
||||||
|
</form>""")
|
||||||
ui.label("You can always create a fresh workspace later if a link is lost.").classes(
|
ui.label("You can always create a fresh workspace later if a link is lost.").classes(
|
||||||
"text-sm text-slate-500 dark:text-slate-400"
|
"text-sm text-slate-500 dark:text-slate-400"
|
||||||
)
|
)
|
||||||
@@ -102,11 +119,7 @@ def legacy_overview_page(request: Request):
|
|||||||
async def overview_page(workspace_id: str) -> None:
|
async def overview_page(workspace_id: str) -> None:
|
||||||
repo = get_workspace_repository()
|
repo = get_workspace_repository()
|
||||||
if not repo.workspace_exists(workspace_id):
|
if not repo.workspace_exists(workspace_id):
|
||||||
_render_workspace_recovery(
|
return RedirectResponse(url="/", status_code=307)
|
||||||
"Workspace not found",
|
|
||||||
"The workspace link looks missing or expired. Create a new workspace or return to the welcome page.",
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
config = repo.load_portfolio_config(workspace_id)
|
config = repo.load_portfolio_config(workspace_id)
|
||||||
data_service = get_data_service()
|
data_service = get_data_service()
|
||||||
@@ -224,15 +237,17 @@ async def overview_page(workspace_id: str) -> None:
|
|||||||
"w-full items-start justify-between gap-4 border-b border-slate-100 py-3 last:border-b-0 dark:border-slate-800"
|
"w-full items-start justify-between gap-4 border-b border-slate-100 py-3 last:border-b-0 dark:border-slate-800"
|
||||||
):
|
):
|
||||||
with ui.column().classes("gap-1"):
|
with ui.column().classes("gap-1"):
|
||||||
ui.label(event.message).classes("text-sm font-medium text-slate-900 dark:text-slate-100")
|
ui.label(event.message).classes(
|
||||||
|
"text-sm font-medium text-slate-900 dark:text-slate-100"
|
||||||
|
)
|
||||||
ui.label(
|
ui.label(
|
||||||
f"Logged {_format_timestamp(event.updated_at)} · Spot ${event.spot_price:,.2f} · LTV {event.ltv_ratio:.1%}"
|
f"Logged {_format_timestamp(event.updated_at)} · Spot ${event.spot_price:,.2f} · LTV {event.ltv_ratio:.1%}"
|
||||||
).classes("text-xs text-slate-500 dark:text-slate-400")
|
).classes("text-xs text-slate-500 dark:text-slate-400")
|
||||||
ui.label(event.severity.upper()).classes(_alert_badge_classes(event.severity))
|
ui.label(event.severity.upper()).classes(_alert_badge_classes(event.severity))
|
||||||
else:
|
else:
|
||||||
ui.label("No alert history yet. Alerts will be logged once the warning threshold is crossed.").classes(
|
ui.label(
|
||||||
"text-sm text-slate-500 dark:text-slate-400"
|
"No alert history yet. Alerts will be logged once the warning threshold is crossed."
|
||||||
)
|
).classes("text-sm text-slate-500 dark:text-slate-400")
|
||||||
|
|
||||||
with ui.card().classes(
|
with ui.card().classes(
|
||||||
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
|
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ def _render_workspace_recovery() -> None:
|
|||||||
"The requested workspace is unavailable. Start a new workspace or return to the welcome page."
|
"The requested workspace is unavailable. Start a new workspace or return to the welcome page."
|
||||||
).classes("text-base text-slate-500 dark:text-slate-400")
|
).classes("text-base text-slate-500 dark:text-slate-400")
|
||||||
with ui.row().classes("mx-auto gap-3"):
|
with ui.row().classes("mx-auto gap-3"):
|
||||||
ui.link("Get started", "/workspaces/bootstrap").classes(
|
ui.link("Get started", "/").classes(
|
||||||
"rounded-lg bg-slate-900 px-5 py-3 text-sm font-semibold text-white no-underline dark:bg-slate-100 dark:text-slate-900"
|
"rounded-lg bg-slate-900 px-5 py-3 text-sm font-semibold text-white no-underline dark:bg-slate-100 dark:text-slate-900"
|
||||||
)
|
)
|
||||||
ui.link("Go to welcome page", "/").classes(
|
ui.link("Go to welcome page", "/").classes(
|
||||||
@@ -49,8 +49,7 @@ def settings_page(workspace_id: str) -> None:
|
|||||||
"""Settings page with workspace-scoped persistent portfolio configuration."""
|
"""Settings page with workspace-scoped persistent portfolio configuration."""
|
||||||
workspace_repo = get_workspace_repository()
|
workspace_repo = get_workspace_repository()
|
||||||
if not workspace_repo.workspace_exists(workspace_id):
|
if not workspace_repo.workspace_exists(workspace_id):
|
||||||
_render_workspace_recovery()
|
return RedirectResponse(url="/", status_code=307)
|
||||||
return
|
|
||||||
|
|
||||||
config = workspace_repo.load_portfolio_config(workspace_id)
|
config = workspace_repo.load_portfolio_config(workspace_id)
|
||||||
alert_service = AlertService()
|
alert_service = AlertService()
|
||||||
|
|||||||
73
app/services/turnstile.py
Normal file
73
app/services/turnstile.py
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
TURNSTILE_VERIFY_URL = "https://challenges.cloudflare.com/turnstile/v0/siteverify"
|
||||||
|
DEFAULT_TURNSTILE_TEST_SITE_KEY = "1x00000000000000000000AA"
|
||||||
|
DEFAULT_TURNSTILE_TEST_SECRET_KEY = "1x0000000000000000000000000000000AA"
|
||||||
|
ALWAYS_FAIL_TURNSTILE_TEST_SITE_KEY = "2x00000000000000000000AB"
|
||||||
|
ALWAYS_FAIL_TURNSTILE_TEST_SECRET_KEY = "2x0000000000000000000000000000000AA"
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class TurnstileSettings:
|
||||||
|
site_key: str
|
||||||
|
secret_key: str
|
||||||
|
enabled: bool
|
||||||
|
uses_test_keys: bool
|
||||||
|
|
||||||
|
|
||||||
|
def _environment() -> str:
|
||||||
|
return os.getenv("APP_ENV", os.getenv("ENVIRONMENT", "development")).lower()
|
||||||
|
|
||||||
|
|
||||||
|
def load_turnstile_settings() -> TurnstileSettings:
|
||||||
|
site_key = os.getenv("TURNSTILE_SITE_KEY", "")
|
||||||
|
secret_key = os.getenv("TURNSTILE_SECRET_KEY", "")
|
||||||
|
enabled = os.getenv("TURNSTILE_ENABLED", "true").lower() not in {"0", "false", "no"}
|
||||||
|
env = _environment()
|
||||||
|
|
||||||
|
if not site_key or not secret_key:
|
||||||
|
if env in {"development", "test"}:
|
||||||
|
site_key = site_key or DEFAULT_TURNSTILE_TEST_SITE_KEY
|
||||||
|
secret_key = secret_key or DEFAULT_TURNSTILE_TEST_SECRET_KEY
|
||||||
|
else:
|
||||||
|
raise RuntimeError("Turnstile keys must be configured outside development/test environments")
|
||||||
|
|
||||||
|
uses_test_keys = site_key == DEFAULT_TURNSTILE_TEST_SITE_KEY and secret_key == DEFAULT_TURNSTILE_TEST_SECRET_KEY
|
||||||
|
return TurnstileSettings(
|
||||||
|
site_key=site_key,
|
||||||
|
secret_key=secret_key,
|
||||||
|
enabled=enabled,
|
||||||
|
uses_test_keys=uses_test_keys,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def verify_turnstile_token(token: str, remote_ip: str | None = None) -> bool:
|
||||||
|
settings = load_turnstile_settings()
|
||||||
|
if not settings.enabled:
|
||||||
|
return True
|
||||||
|
if not token.strip():
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
response = requests.post(
|
||||||
|
TURNSTILE_VERIFY_URL,
|
||||||
|
data={
|
||||||
|
"secret": settings.secret_key,
|
||||||
|
"response": token,
|
||||||
|
"remoteip": remote_ip or "",
|
||||||
|
},
|
||||||
|
timeout=10,
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
payload = response.json()
|
||||||
|
except (requests.RequestException, ValueError) as exc:
|
||||||
|
logger.warning("Turnstile verification failed: %s", exc)
|
||||||
|
return False
|
||||||
|
return bool(payload.get("success"))
|
||||||
@@ -15,6 +15,8 @@ services:
|
|||||||
NICEGUI_MOUNT_PATH: ${NICEGUI_MOUNT_PATH:-/}
|
NICEGUI_MOUNT_PATH: ${NICEGUI_MOUNT_PATH:-/}
|
||||||
NICEGUI_STORAGE_SECRET: ${NICEGUI_STORAGE_SECRET}
|
NICEGUI_STORAGE_SECRET: ${NICEGUI_STORAGE_SECRET}
|
||||||
CORS_ORIGINS: ${CORS_ORIGINS:-*}
|
CORS_ORIGINS: ${CORS_ORIGINS:-*}
|
||||||
|
TURNSTILE_SITE_KEY: ${TURNSTILE_SITE_KEY:-}
|
||||||
|
TURNSTILE_SECRET_KEY: ${TURNSTILE_SECRET_KEY:-}
|
||||||
ports:
|
ports:
|
||||||
- "${APP_BIND_ADDRESS:-127.0.0.1}:${APP_PORT:-8000}:8000"
|
- "${APP_BIND_ADDRESS:-127.0.0.1}:${APP_PORT:-8000}:8000"
|
||||||
networks:
|
networks:
|
||||||
|
|||||||
@@ -45,6 +45,8 @@ WEBSOCKET_INTERVAL_SECONDS=${WEBSOCKET_INTERVAL_SECONDS:-5}
|
|||||||
NICEGUI_MOUNT_PATH=${NICEGUI_MOUNT_PATH:-/}
|
NICEGUI_MOUNT_PATH=${NICEGUI_MOUNT_PATH:-/}
|
||||||
NICEGUI_STORAGE_SECRET=${NICEGUI_STORAGE_SECRET:-}
|
NICEGUI_STORAGE_SECRET=${NICEGUI_STORAGE_SECRET:-}
|
||||||
CORS_ORIGINS=${CORS_ORIGINS:-*}
|
CORS_ORIGINS=${CORS_ORIGINS:-*}
|
||||||
|
TURNSTILE_SITE_KEY=${TURNSTILE_SITE_KEY:-}
|
||||||
|
TURNSTILE_SECRET_KEY=${TURNSTILE_SECRET_KEY:-}
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
# Upload docker-compose file
|
# Upload docker-compose file
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ def test_homepage_and_options_page_render() -> None:
|
|||||||
page.goto(BASE_URL, wait_until="domcontentloaded", timeout=30000)
|
page.goto(BASE_URL, wait_until="domcontentloaded", timeout=30000)
|
||||||
expect(page).to_have_title("NiceGUI")
|
expect(page).to_have_title("NiceGUI")
|
||||||
expect(page.locator("text=Create a private workspace URL").first).to_be_visible(timeout=10000)
|
expect(page.locator("text=Create a private workspace URL").first).to_be_visible(timeout=10000)
|
||||||
page.get_by_role("link", name="Get started").click()
|
page.get_by_role("button", name="Get started").click()
|
||||||
page.wait_for_url(f"{BASE_URL}/*", timeout=15000)
|
page.wait_for_url(f"{BASE_URL}/*", timeout=15000)
|
||||||
workspace_url = page.url
|
workspace_url = page.url
|
||||||
workspace_id = workspace_url.removeprefix(f"{BASE_URL}/")
|
workspace_id = workspace_url.removeprefix(f"{BASE_URL}/")
|
||||||
@@ -157,7 +157,7 @@ def test_homepage_and_options_page_render() -> None:
|
|||||||
second_page = second_context.new_page()
|
second_page = second_context.new_page()
|
||||||
second_page.goto(BASE_URL, wait_until="domcontentloaded", timeout=30000)
|
second_page.goto(BASE_URL, wait_until="domcontentloaded", timeout=30000)
|
||||||
expect(second_page.locator("text=Create a private workspace URL").first).to_be_visible(timeout=10000)
|
expect(second_page.locator("text=Create a private workspace URL").first).to_be_visible(timeout=10000)
|
||||||
second_page.get_by_role("link", name="Get started").click()
|
second_page.get_by_role("button", name="Get started").click()
|
||||||
second_page.wait_for_url(f"{BASE_URL}/*", timeout=15000)
|
second_page.wait_for_url(f"{BASE_URL}/*", timeout=15000)
|
||||||
second_workspace_url = second_page.url
|
second_workspace_url = second_page.url
|
||||||
assert second_workspace_url != workspace_url
|
assert second_workspace_url != workspace_url
|
||||||
|
|||||||
153
tests/test_turnstile.py
Normal file
153
tests/test_turnstile.py
Normal file
@@ -0,0 +1,153 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
from app.main import app
|
||||||
|
|
||||||
|
|
||||||
|
def test_legacy_get_bootstrap_url_redirects_to_welcome_page() -> None:
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/workspaces/bootstrap", follow_redirects=False)
|
||||||
|
|
||||||
|
assert response.status_code in {302, 303, 307}
|
||||||
|
assert response.headers["location"] == "/"
|
||||||
|
|
||||||
|
|
||||||
|
def test_bootstrap_workspace_rejects_missing_turnstile_token(monkeypatch, tmp_path) -> None:
|
||||||
|
from app.models import workspace as workspace_module
|
||||||
|
from app.models.workspace import WorkspaceRepository
|
||||||
|
|
||||||
|
repo = WorkspaceRepository(base_path=tmp_path / "workspaces")
|
||||||
|
monkeypatch.setattr(workspace_module, "_workspace_repo", repo)
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.post("/workspaces/bootstrap", follow_redirects=False)
|
||||||
|
|
||||||
|
assert response.status_code == 303
|
||||||
|
assert response.headers["location"] == "/?captcha_error=1"
|
||||||
|
|
||||||
|
|
||||||
|
def test_bootstrap_workspace_creates_workspace_when_turnstile_verification_succeeds(monkeypatch, tmp_path) -> None:
|
||||||
|
from app.models import workspace as workspace_module
|
||||||
|
from app.models.workspace import WorkspaceRepository
|
||||||
|
from app.services import turnstile as turnstile_module
|
||||||
|
|
||||||
|
repo = WorkspaceRepository(base_path=tmp_path / "workspaces")
|
||||||
|
monkeypatch.setattr(workspace_module, "_workspace_repo", repo)
|
||||||
|
monkeypatch.setattr(turnstile_module, "verify_turnstile_token", lambda *args, **kwargs: True)
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.post(
|
||||||
|
"/workspaces/bootstrap",
|
||||||
|
data={"cf-turnstile-response": "token-ok"},
|
||||||
|
follow_redirects=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code in {302, 303, 307}
|
||||||
|
workspace_id = response.headers["location"].strip("/")
|
||||||
|
assert repo.workspace_exists(workspace_id)
|
||||||
|
assert response.cookies.get("workspace_id") == workspace_id
|
||||||
|
|
||||||
|
|
||||||
|
def test_bootstrap_workspace_rejects_failed_turnstile_verification(monkeypatch, tmp_path) -> None:
|
||||||
|
from app.models import workspace as workspace_module
|
||||||
|
from app.models.workspace import WorkspaceRepository
|
||||||
|
from app.services import turnstile as turnstile_module
|
||||||
|
|
||||||
|
repo = WorkspaceRepository(base_path=tmp_path / "workspaces")
|
||||||
|
monkeypatch.setattr(workspace_module, "_workspace_repo", repo)
|
||||||
|
monkeypatch.setattr(turnstile_module, "verify_turnstile_token", lambda *args, **kwargs: False)
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.post(
|
||||||
|
"/workspaces/bootstrap",
|
||||||
|
data={"cf-turnstile-response": "token-bad"},
|
||||||
|
follow_redirects=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 303
|
||||||
|
assert response.headers["location"] == "/?captcha_error=1"
|
||||||
|
assert not any(repo.base_path.iterdir())
|
||||||
|
|
||||||
|
|
||||||
|
def test_turnstile_settings_use_default_test_keys_when_env_is_missing(monkeypatch) -> None:
|
||||||
|
from app.services import turnstile as turnstile_module
|
||||||
|
|
||||||
|
monkeypatch.setenv("APP_ENV", "test")
|
||||||
|
monkeypatch.setenv("APP_ENV", "test")
|
||||||
|
monkeypatch.delenv("TURNSTILE_SITE_KEY", raising=False)
|
||||||
|
monkeypatch.delenv("TURNSTILE_SECRET_KEY", raising=False)
|
||||||
|
|
||||||
|
settings = turnstile_module.load_turnstile_settings()
|
||||||
|
|
||||||
|
assert settings.site_key == turnstile_module.DEFAULT_TURNSTILE_TEST_SITE_KEY
|
||||||
|
assert settings.secret_key == turnstile_module.DEFAULT_TURNSTILE_TEST_SECRET_KEY
|
||||||
|
assert settings.enabled is True
|
||||||
|
assert settings.uses_test_keys is True
|
||||||
|
assert settings.uses_test_keys is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_turnstile_settings_fail_loudly_without_keys_outside_dev(monkeypatch) -> None:
|
||||||
|
from app.services import turnstile as turnstile_module
|
||||||
|
|
||||||
|
monkeypatch.setenv("APP_ENV", "production")
|
||||||
|
monkeypatch.delenv("TURNSTILE_SITE_KEY", raising=False)
|
||||||
|
monkeypatch.delenv("TURNSTILE_SECRET_KEY", raising=False)
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError, match="Turnstile keys must be configured"):
|
||||||
|
turnstile_module.load_turnstile_settings()
|
||||||
|
|
||||||
|
|
||||||
|
def test_turnstile_verification_returns_false_on_transport_error(monkeypatch) -> None:
|
||||||
|
from app.services import turnstile as turnstile_module
|
||||||
|
|
||||||
|
monkeypatch.setenv("APP_ENV", "test")
|
||||||
|
|
||||||
|
def raise_error(*args, **kwargs):
|
||||||
|
raise requests.RequestException("boom")
|
||||||
|
|
||||||
|
monkeypatch.setattr(turnstile_module.requests, "post", raise_error)
|
||||||
|
|
||||||
|
assert turnstile_module.verify_turnstile_token("token") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_turnstile_settings_support_always_fail_test_keys(monkeypatch) -> None:
|
||||||
|
from app.services import turnstile as turnstile_module
|
||||||
|
|
||||||
|
monkeypatch.setenv("APP_ENV", "test")
|
||||||
|
monkeypatch.setenv("TURNSTILE_SITE_KEY", turnstile_module.ALWAYS_FAIL_TURNSTILE_TEST_SITE_KEY)
|
||||||
|
monkeypatch.setenv("TURNSTILE_SECRET_KEY", turnstile_module.ALWAYS_FAIL_TURNSTILE_TEST_SECRET_KEY)
|
||||||
|
|
||||||
|
settings = turnstile_module.load_turnstile_settings()
|
||||||
|
|
||||||
|
assert settings.site_key == turnstile_module.ALWAYS_FAIL_TURNSTILE_TEST_SITE_KEY
|
||||||
|
assert settings.secret_key == turnstile_module.ALWAYS_FAIL_TURNSTILE_TEST_SECRET_KEY
|
||||||
|
assert settings.enabled is True
|
||||||
|
assert settings.uses_test_keys is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_bootstrap_stays_blocked_under_always_fail_turnstile_test_keys(monkeypatch, tmp_path) -> None:
|
||||||
|
from app.models import workspace as workspace_module
|
||||||
|
from app.models.workspace import WorkspaceRepository
|
||||||
|
from app.services import turnstile as turnstile_module
|
||||||
|
|
||||||
|
repo = WorkspaceRepository(base_path=tmp_path / "workspaces")
|
||||||
|
monkeypatch.setattr(workspace_module, "_workspace_repo", repo)
|
||||||
|
monkeypatch.setenv("APP_ENV", "test")
|
||||||
|
monkeypatch.setenv("TURNSTILE_SITE_KEY", turnstile_module.ALWAYS_FAIL_TURNSTILE_TEST_SITE_KEY)
|
||||||
|
monkeypatch.setenv("TURNSTILE_SECRET_KEY", turnstile_module.ALWAYS_FAIL_TURNSTILE_TEST_SECRET_KEY)
|
||||||
|
monkeypatch.setattr(turnstile_module, "verify_turnstile_token", lambda *args, **kwargs: False)
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.post(
|
||||||
|
"/workspaces/bootstrap",
|
||||||
|
data={"cf-turnstile-response": "blocked-token"},
|
||||||
|
follow_redirects=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 303
|
||||||
|
assert response.headers["location"] == "/?captcha_error=1"
|
||||||
|
assert not any(repo.base_path.iterdir())
|
||||||
@@ -48,11 +48,20 @@ def test_root_without_workspace_cookie_shows_welcome_page(tmp_path, monkeypatch)
|
|||||||
assert "Get started" in response.text
|
assert "Get started" in response.text
|
||||||
|
|
||||||
|
|
||||||
def test_bootstrap_endpoint_creates_workspace_cookie_and_redirects(tmp_path, monkeypatch) -> None:
|
def test_bootstrap_endpoint_requires_turnstile_and_creates_workspace_cookie_and_redirects(
|
||||||
|
tmp_path, monkeypatch
|
||||||
|
) -> None:
|
||||||
|
from app.services import turnstile as turnstile_module
|
||||||
|
|
||||||
repo = _install_workspace_repo(tmp_path, monkeypatch)
|
repo = _install_workspace_repo(tmp_path, monkeypatch)
|
||||||
|
monkeypatch.setattr(turnstile_module, "verify_turnstile_token", lambda *args, **kwargs: True)
|
||||||
|
|
||||||
with TestClient(app) as client:
|
with TestClient(app) as client:
|
||||||
response = client.get("/workspaces/bootstrap", follow_redirects=False)
|
response = client.post(
|
||||||
|
"/workspaces/bootstrap",
|
||||||
|
data={"cf-turnstile-response": "token-ok"},
|
||||||
|
follow_redirects=False,
|
||||||
|
)
|
||||||
|
|
||||||
assert response.status_code in {302, 303, 307}
|
assert response.status_code in {302, 303, 307}
|
||||||
location = response.headers["location"]
|
location = response.headers["location"]
|
||||||
@@ -74,15 +83,24 @@ def test_root_with_valid_workspace_cookie_redirects_to_workspace(tmp_path, monke
|
|||||||
assert response.headers["location"] == f"/{workspace_id}"
|
assert response.headers["location"] == f"/{workspace_id}"
|
||||||
|
|
||||||
|
|
||||||
def test_unknown_workspace_route_shows_friendly_recovery_path(tmp_path, monkeypatch) -> None:
|
def test_unknown_workspace_route_redirects_to_welcome_page(tmp_path, monkeypatch) -> None:
|
||||||
_install_workspace_repo(tmp_path, monkeypatch)
|
_install_workspace_repo(tmp_path, monkeypatch)
|
||||||
|
|
||||||
with TestClient(app) as client:
|
with TestClient(app) as client:
|
||||||
response = client.get("/00000000-0000-4000-8000-000000000000")
|
response = client.get("/00000000-0000-4000-8000-000000000000")
|
||||||
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert "Workspace not found" in response.text
|
assert "Create a private workspace URL" in response.text
|
||||||
assert "Get started" in response.text
|
|
||||||
|
|
||||||
|
def test_arbitrary_fake_workspace_like_path_redirects_to_welcome_page(tmp_path, monkeypatch) -> None:
|
||||||
|
_install_workspace_repo(tmp_path, monkeypatch)
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
response = client.get("/arbitrary2371568path/")
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "Create a private workspace URL" in response.text
|
||||||
|
|
||||||
|
|
||||||
def test_workspace_settings_round_trip_uses_workspace_storage(tmp_path, monkeypatch) -> None:
|
def test_workspace_settings_round_trip_uses_workspace_storage(tmp_path, monkeypatch) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user