From 1a6760bee317294a25b7606c28f10f4750f73380 Mon Sep 17 00:00:00 2001 From: Bu5hm4nn Date: Fri, 27 Mar 2026 16:39:33 +0100 Subject: [PATCH] feat(PORT-003): add historical ltv charts --- app/models/ltv_history.py | 194 ++++++++++++++++++ app/pages/overview.py | 93 +++++++++ app/services/ltv_history.py | 134 ++++++++++++ docs/roadmap/ROADMAP.yaml | 4 +- .../PORT-003-historical-ltv-chart.yaml | 13 -- .../done/PORT-003-historical-ltv-chart.yaml | 19 ++ tests/test_ltv_history.py | 171 +++++++++++++++ tests/test_overview_ltv_history_playwright.py | 57 +++++ 8 files changed, 670 insertions(+), 15 deletions(-) create mode 100644 app/models/ltv_history.py create mode 100644 app/services/ltv_history.py delete mode 100644 docs/roadmap/backlog/PORT-003-historical-ltv-chart.yaml create mode 100644 docs/roadmap/done/PORT-003-historical-ltv-chart.yaml create mode 100644 tests/test_ltv_history.py create mode 100644 tests/test_overview_ltv_history_playwright.py diff --git a/app/models/ltv_history.py b/app/models/ltv_history.py new file mode 100644 index 0000000..e84ef89 --- /dev/null +++ b/app/models/ltv_history.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import date, datetime +from decimal import Decimal, InvalidOperation +from pathlib import Path +from typing import Any + + +class LtvHistoryLoadError(RuntimeError): + def __init__(self, history_path: Path, message: str) -> None: + super().__init__(message) + self.history_path = history_path + + +@dataclass(frozen=True) +class LtvSnapshot: + snapshot_date: str + captured_at: str + ltv_ratio: Decimal + margin_threshold: Decimal + loan_amount: Decimal + collateral_value: Decimal + spot_price: Decimal + source: str + + def __post_init__(self) -> None: + for field_name in ("snapshot_date", "captured_at", "source"): + value = getattr(self, field_name) + if not isinstance(value, str) or not value.strip(): + raise ValueError(f"{field_name} must be a non-empty string") + date.fromisoformat(self.snapshot_date) + datetime.fromisoformat(self.captured_at.replace("Z", "+00:00")) + for field_name in ( + "ltv_ratio", + "margin_threshold", + "loan_amount", + "collateral_value", + "spot_price", + ): + value = getattr(self, field_name) + if not isinstance(value, Decimal) or not value.is_finite(): + raise TypeError(f"{field_name} must be a finite Decimal") + if self.ltv_ratio < 0: + raise ValueError("ltv_ratio must be zero or greater") + if not Decimal("0") < self.margin_threshold < Decimal("1"): + raise ValueError("margin_threshold must be between 0 and 1") + if self.loan_amount < 0: + raise ValueError("loan_amount must be zero or greater") + if self.collateral_value <= 0: + raise ValueError("collateral_value must be positive") + if self.spot_price <= 0: + raise ValueError("spot_price must be positive") + + def to_dict(self) -> dict[str, Any]: + return { + "snapshot_date": self.snapshot_date, + "captured_at": self.captured_at, + "ltv_ratio": _structured_ratio_payload(self.ltv_ratio), + "margin_threshold": _structured_ratio_payload(self.margin_threshold), + "loan_amount": _structured_money_payload(self.loan_amount), + "collateral_value": _structured_money_payload(self.collateral_value), + "spot_price": _structured_price_payload(self.spot_price), + "source": self.source, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "LtvSnapshot": + return cls( + snapshot_date=_require_non_empty_string(data, "snapshot_date"), + captured_at=_require_non_empty_string(data, "captured_at"), + ltv_ratio=_parse_ratio_payload(data.get("ltv_ratio"), field_name="ltv_ratio"), + margin_threshold=_parse_ratio_payload(data.get("margin_threshold"), field_name="margin_threshold"), + loan_amount=_parse_money_payload(data.get("loan_amount"), field_name="loan_amount"), + collateral_value=_parse_money_payload(data.get("collateral_value"), field_name="collateral_value"), + spot_price=_parse_price_payload(data.get("spot_price"), field_name="spot_price"), + source=_require_non_empty_string(data, "source"), + ) + + +class LtvHistoryRepository: + def __init__(self, base_path: Path | str = Path("data/workspaces")) -> None: + self.base_path = Path(base_path) + self.base_path.mkdir(parents=True, exist_ok=True) + + def load(self, workspace_id: str) -> list[LtvSnapshot]: + history_path = self.history_path(workspace_id) + if not history_path.exists(): + return [] + try: + payload = json.loads(history_path.read_text()) + except json.JSONDecodeError as exc: + raise LtvHistoryLoadError(history_path, f"LTV history is not valid JSON: {exc}") from exc + except OSError as exc: + raise LtvHistoryLoadError(history_path, f"LTV history could not be read: {exc}") from exc + if not isinstance(payload, list): + raise LtvHistoryLoadError(history_path, "LTV history payload must be a list") + snapshots: list[LtvSnapshot] = [] + for index, item in enumerate(payload): + if not isinstance(item, dict): + raise LtvHistoryLoadError(history_path, f"LTV history entry {index} must be an object") + try: + snapshots.append(LtvSnapshot.from_dict(item)) + except (TypeError, ValueError, KeyError) as exc: + raise LtvHistoryLoadError(history_path, f"LTV history entry {index} is invalid: {exc}") from exc + return snapshots + + def save(self, workspace_id: str, snapshots: list[LtvSnapshot]) -> None: + history_path = self.history_path(workspace_id) + history_path.parent.mkdir(parents=True, exist_ok=True) + history_path.write_text(json.dumps([snapshot.to_dict() for snapshot in snapshots], indent=2)) + + def history_path(self, workspace_id: str) -> Path: + return self.base_path / workspace_id / "ltv_history.json" + + +def _require_non_empty_string(data: dict[str, Any], field_name: str) -> str: + value = data.get(field_name) + if not isinstance(value, str) or not value.strip(): + raise ValueError(f"{field_name} must be a non-empty string") + return value + + +def _decimal_text(value: Decimal) -> str: + if value == value.to_integral(): + return str(value.quantize(Decimal("1"))) + return format(value.normalize(), "f") if value.normalize().as_tuple().exponent < 0 else str(value) + + +def _parse_decimal_payload( + payload: object, + *, + field_name: str, + expected_tag_key: str, + expected_tag_value: str, + expected_currency: str | None = None, + expected_per_weight_unit: str | None = None, +) -> Decimal: + if not isinstance(payload, dict): + raise TypeError(f"{field_name} must be an object") + if payload.get(expected_tag_key) != expected_tag_value: + raise ValueError(f"{field_name} must declare {expected_tag_key}={expected_tag_value!r}") + if expected_currency is not None and payload.get("currency") != expected_currency: + raise ValueError(f"{field_name} must declare currency={expected_currency!r}") + if expected_per_weight_unit is not None and payload.get("per_weight_unit") != expected_per_weight_unit: + raise ValueError(f"{field_name} must declare per_weight_unit={expected_per_weight_unit!r}") + raw_value = payload.get("value") + if not isinstance(raw_value, str) or not raw_value.strip(): + raise ValueError(f"{field_name}.value must be a non-empty string") + try: + value = Decimal(raw_value) + except InvalidOperation as exc: + raise ValueError(f"{field_name}.value must be numeric") from exc + if not value.is_finite(): + raise ValueError(f"{field_name}.value must be finite") + return value + + +def _parse_ratio_payload(payload: object, *, field_name: str) -> Decimal: + return _parse_decimal_payload(payload, field_name=field_name, expected_tag_key="unit", expected_tag_value="ratio") + + +def _parse_money_payload(payload: object, *, field_name: str) -> Decimal: + return _parse_decimal_payload( + payload, + field_name=field_name, + expected_tag_key="currency", + expected_tag_value="USD", + expected_currency="USD", + ) + + +def _parse_price_payload(payload: object, *, field_name: str) -> Decimal: + return _parse_decimal_payload( + payload, + field_name=field_name, + expected_tag_key="currency", + expected_tag_value="USD", + expected_currency="USD", + expected_per_weight_unit="ozt", + ) + + +def _structured_ratio_payload(value: Decimal) -> dict[str, str]: + return {"value": str(value), "unit": "ratio"} + + +def _structured_money_payload(value: Decimal) -> dict[str, str]: + return {"value": _decimal_text(value), "currency": "USD"} + + +def _structured_price_payload(value: Decimal) -> dict[str, str]: + return {"value": _decimal_text(value), "currency": "USD", "per_weight_unit": "ozt"} diff --git a/app/pages/overview.py b/app/pages/overview.py index aae033c..cf59949 100644 --- a/app/pages/overview.py +++ b/app/pages/overview.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from datetime import datetime, timezone from fastapi import Request @@ -8,6 +9,7 @@ from nicegui import ui from app.components import PortfolioOverview from app.domain.portfolio_math import resolve_portfolio_spot_from_quote +from app.models.ltv_history import LtvHistoryRepository from app.models.workspace import WORKSPACE_COOKIE, get_workspace_repository from app.pages.common import ( dashboard_page, @@ -17,9 +19,12 @@ from app.pages.common import ( strategy_catalog, ) from app.services.alerts import AlertService, build_portfolio_alert_context +from app.services.ltv_history import LtvHistoryChartModel, LtvHistoryService from app.services.runtime import get_data_service from app.services.turnstile import load_turnstile_settings +logger = logging.getLogger(__name__) + _DEFAULT_CASH_BUFFER = 18_500.0 @@ -47,6 +52,31 @@ def _alert_badge_classes(severity: str) -> str: }.get(severity, "rounded-full bg-slate-100 px-3 py-1 text-xs font-semibold text-slate-700") +def _ltv_chart_options(model: LtvHistoryChartModel) -> dict: + return { + "tooltip": {"trigger": "axis", "valueFormatter": "function (value) { return value + '%'; }"}, + "legend": {"data": ["LTV", "Margin threshold"]}, + "xAxis": {"type": "category", "data": list(model.labels)}, + "yAxis": {"type": "value", "name": "LTV %", "axisLabel": {"formatter": "{value}%"}}, + "series": [ + { + "name": "LTV", + "type": "line", + "smooth": True, + "data": list(model.ltv_values), + "lineStyle": {"width": 3}, + }, + { + "name": "Margin threshold", + "type": "line", + "data": list(model.threshold_values), + "lineStyle": {"type": "dashed", "width": 2}, + "symbol": "none", + }, + ], + } + + def _render_workspace_recovery(title: str, message: str) -> None: with ui.column().classes("mx-auto mt-24 w-full max-w-2xl gap-6 px-6 text-center"): ui.icon("folder_off").classes("mx-auto text-6xl text-slate-400") @@ -126,6 +156,25 @@ async def overview_page(workspace_id: str) -> None: portfolio["cash_buffer"] = max(float(portfolio["gold_value"]) - configured_gold_value, 0.0) + _DEFAULT_CASH_BUFFER portfolio["hedge_budget"] = float(config.monthly_budget) alert_status = AlertService().evaluate(config, portfolio) + ltv_history_service = LtvHistoryService(repository=LtvHistoryRepository(base_path=repo.base_path)) + ltv_history_notice: str | None = None + try: + ltv_history = ltv_history_service.record_workspace_snapshot(workspace_id, portfolio) + ltv_chart_models = tuple( + ltv_history_service.chart_model( + ltv_history, + days=days, + current_margin_threshold=config.margin_threshold, + ) + for days in (7, 30, 90) + ) + ltv_history_csv = ltv_history_service.export_csv(ltv_history) if ltv_history else "" + except Exception: + logger.exception("Failed to prepare LTV history for workspace %s", workspace_id) + ltv_history = [] + ltv_chart_models = () + ltv_history_csv = "" + ltv_history_notice = "Historical LTV is temporarily unavailable due to a storage error." if portfolio["quote_source"] == "configured_entry_price": quote_status = "Live quote source: configured entry price fallback ยท Last updated Unavailable" else: @@ -248,6 +297,50 @@ async def overview_page(workspace_id: str) -> None: "Warning: if GLD approaches the margin-call price, collateral remediation or hedge monetization will be required." ).classes("text-sm font-medium text-amber-700 dark:text-amber-300") + with ui.card().classes( + "w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900" + ): + with ui.row().classes( + "w-full items-center justify-between gap-3 max-sm:flex-col max-sm:items-start" + ): + with ui.column().classes("gap-1"): + ui.label("Historical LTV").classes( + "text-lg font-semibold text-slate-900 dark:text-slate-100" + ) + ui.label( + "Stored workspace snapshots show how LTV trended against the current margin threshold over 7, 30, and 90 day windows." + ).classes("text-sm text-slate-500 dark:text-slate-400") + if ltv_history: + ui.button( + "Export CSV", + icon="download", + on_click=lambda: ui.download.content( + ltv_history_csv, + filename=f"{workspace_id}-ltv-history.csv", + media_type="text/csv", + ), + ).props("outline color=primary") + if ltv_history_notice: + ui.label(ltv_history_notice).classes("text-sm text-amber-700 dark:text-amber-300") + elif ltv_history: + with ui.grid(columns=1).classes("w-full gap-4 xl:grid-cols-3"): + for chart_model, chart_testid in zip( + ltv_chart_models, + ("ltv-history-chart-7d", "ltv-history-chart-30d", "ltv-history-chart-90d"), + strict=True, + ): + with ui.card().classes( + "rounded-xl border border-slate-200 bg-slate-50 p-4 shadow-none dark:border-slate-800 dark:bg-slate-950" + ): + ui.label(chart_model.title).classes( + "text-base font-semibold text-slate-900 dark:text-slate-100" + ) + ui.echart(_ltv_chart_options(chart_model)).props( + f"data-testid={chart_testid}" + ).classes("h-56 w-full") + else: + ui.label("No LTV snapshots recorded yet.").classes("text-sm text-slate-500 dark:text-slate-400") + with ui.card().classes( "w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900" ): diff --git a/app/services/ltv_history.py b/app/services/ltv_history.py new file mode 100644 index 0000000..12b75bc --- /dev/null +++ b/app/services/ltv_history.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +import csv +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from decimal import Decimal +from io import StringIO +from typing import Mapping + +from app.models.ltv_history import LtvHistoryRepository, LtvSnapshot +from app.services.boundary_values import boundary_decimal + + +@dataclass(frozen=True) +class LtvHistoryChartModel: + title: str + labels: tuple[str, ...] + ltv_values: tuple[float, ...] + threshold_values: tuple[float, ...] + + +class LtvHistoryService: + def __init__(self, repository: LtvHistoryRepository | None = None) -> None: + self.repository = repository or LtvHistoryRepository() + + def record_workspace_snapshot(self, workspace_id: str, portfolio: Mapping[str, object]) -> list[LtvSnapshot]: + snapshots = self.repository.load(workspace_id) + snapshot = self._build_snapshot(portfolio) + updated: list[LtvSnapshot] = [] + replaced = False + for existing in snapshots: + if existing.snapshot_date == snapshot.snapshot_date: + updated.append(snapshot) + replaced = True + else: + updated.append(existing) + if not replaced: + updated.append(snapshot) + updated.sort(key=lambda item: (item.snapshot_date, item.captured_at)) + self.repository.save(workspace_id, updated) + return updated + + @staticmethod + def chart_model( + snapshots: list[LtvSnapshot], + *, + days: int, + current_margin_threshold: Decimal | float | str | None = None, + ) -> LtvHistoryChartModel: + if days <= 0: + raise ValueError("days must be positive") + title = f"{days} Day" + if not snapshots: + return LtvHistoryChartModel(title=title, labels=(), ltv_values=(), threshold_values=()) + latest_date = max(datetime.fromisoformat(item.snapshot_date).date() for item in snapshots) + cutoff_date = latest_date - timedelta(days=days - 1) + filtered = [item for item in snapshots if datetime.fromisoformat(item.snapshot_date).date() >= cutoff_date] + threshold = ( + boundary_decimal(current_margin_threshold, field_name="current_margin_threshold") + if current_margin_threshold is not None + else filtered[-1].margin_threshold + ) + threshold_value = round(float(threshold * Decimal("100")), 1) + return LtvHistoryChartModel( + title=title, + labels=tuple(item.snapshot_date for item in filtered), + ltv_values=tuple(round(float(item.ltv_ratio * Decimal("100")), 1) for item in filtered), + threshold_values=tuple(threshold_value for _ in filtered), + ) + + @staticmethod + def export_csv(snapshots: list[LtvSnapshot]) -> str: + output = StringIO() + writer = csv.DictWriter( + output, + fieldnames=[ + "snapshot_date", + "captured_at", + "ltv_ratio_pct", + "margin_threshold_pct", + "loan_amount_usd", + "collateral_value_usd", + "spot_price_usd_per_ozt", + "source", + ], + ) + writer.writeheader() + for snapshot in snapshots: + writer.writerow( + { + "snapshot_date": snapshot.snapshot_date, + "captured_at": snapshot.captured_at, + "ltv_ratio_pct": f"{float(snapshot.ltv_ratio * Decimal('100')):.1f}", + "margin_threshold_pct": f"{float(snapshot.margin_threshold * Decimal('100')):.1f}", + "loan_amount_usd": _decimal_text(snapshot.loan_amount), + "collateral_value_usd": _decimal_text(snapshot.collateral_value), + "spot_price_usd_per_ozt": _decimal_text(snapshot.spot_price), + "source": snapshot.source, + } + ) + return output.getvalue() + + @staticmethod + def _build_snapshot(portfolio: Mapping[str, object]) -> LtvSnapshot: + captured_at = _normalize_timestamp(str(portfolio.get("quote_updated_at", ""))) + return LtvSnapshot( + snapshot_date=captured_at[:10], + captured_at=captured_at, + ltv_ratio=boundary_decimal(portfolio.get("ltv_ratio"), field_name="portfolio.ltv_ratio"), + margin_threshold=boundary_decimal( + portfolio.get("margin_call_ltv"), + field_name="portfolio.margin_call_ltv", + ), + loan_amount=boundary_decimal(portfolio.get("loan_amount"), field_name="portfolio.loan_amount"), + collateral_value=boundary_decimal(portfolio.get("gold_value"), field_name="portfolio.gold_value"), + spot_price=boundary_decimal(portfolio.get("spot_price"), field_name="portfolio.spot_price"), + source=str(portfolio.get("quote_source", "unknown")) or "unknown", + ) + + +def _normalize_timestamp(value: str) -> str: + if value: + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC).isoformat() + except ValueError: + pass + return datetime.now(UTC).replace(microsecond=0).isoformat() + + +def _decimal_text(value: Decimal) -> str: + if value == value.to_integral(): + return str(value.quantize(Decimal("1"))) + normalized = value.normalize() + return format(normalized, "f") if normalized.as_tuple().exponent < 0 else str(normalized) diff --git a/docs/roadmap/ROADMAP.yaml b/docs/roadmap/ROADMAP.yaml index 8297d62..1f8e679 100644 --- a/docs/roadmap/ROADMAP.yaml +++ b/docs/roadmap/ROADMAP.yaml @@ -13,7 +13,6 @@ notes: - Pre-alpha policy: we may cut or replace old features without backward compatibility until alpha is declared. - Alpha migration policy: once alpha is declared, compatibility only needs to move forward; backward migrations are not required. priority_queue: - - PORT-003 - BT-002 - BT-001C - EXEC-001 @@ -23,6 +22,7 @@ priority_queue: - OPS-001 - BT-003 recently_completed: + - PORT-003 - BT-003B - CORE-001D - CORE-001D3C @@ -41,7 +41,6 @@ states: - DATA-002A - DATA-001A - OPS-001 - - PORT-003 - EXEC-001 - EXEC-002 - BT-002 @@ -55,6 +54,7 @@ states: - PORT-001 - PORT-001A - PORT-002 + - PORT-003 - PORT-004 - SEC-001 - SEC-001A diff --git a/docs/roadmap/backlog/PORT-003-historical-ltv-chart.yaml b/docs/roadmap/backlog/PORT-003-historical-ltv-chart.yaml deleted file mode 100644 index 62b51dc..0000000 --- a/docs/roadmap/backlog/PORT-003-historical-ltv-chart.yaml +++ /dev/null @@ -1,13 +0,0 @@ -id: PORT-003 -title: Historical LTV Chart -status: backlog -priority: P2 -effort: M -depends_on: - - PORT-001 -tags: [portfolio, history, charts] -summary: Record and display historical LTV snapshots. -acceptance_criteria: - - Store LTV snapshots over time. - - Display 7/30/90 day charts with the margin threshold line. - - Allow export as CSV. diff --git a/docs/roadmap/done/PORT-003-historical-ltv-chart.yaml b/docs/roadmap/done/PORT-003-historical-ltv-chart.yaml new file mode 100644 index 0000000..43f4b16 --- /dev/null +++ b/docs/roadmap/done/PORT-003-historical-ltv-chart.yaml @@ -0,0 +1,19 @@ +id: PORT-003 +title: Historical LTV Chart +status: done +priority: P2 +effort: M +depends_on: + - PORT-001 +tags: + - portfolio + - history + - charts +summary: Workspace-scoped LTV snapshots are now recorded over time and rendered on the overview page with CSV export. +completed_notes: + - Added structured workspace-scoped LTV snapshot persistence in `app/models/ltv_history.py`. + - Added chart-range and CSV export support in `app/services/ltv_history.py`. + - Updated `app/pages/overview.py` to record snapshots, display 7/30/90 day LTV charts with the margin threshold line, and expose `Export CSV`. + - Added focused regression coverage in `tests/test_ltv_history.py` for persistence, same-day replacement, range filtering, and CSV export. + - Added a Playwright regression test in `tests/test_overview_ltv_history_playwright.py` covering the exact changed route, live rendered chart series on the overview page, and CSV download path on `/{workspace_id}`. + - Manual local Docker validation confirmed the stack starts cleanly, `/health` returns OK, and the Docker-served overview route renders the LTV history UI without visible runtime errors. diff --git a/tests/test_ltv_history.py b/tests/test_ltv_history.py new file mode 100644 index 0000000..bef0b5e --- /dev/null +++ b/tests/test_ltv_history.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import csv +import json +from decimal import Decimal +from io import StringIO +from uuid import uuid4 + +from app.models.ltv_history import LtvHistoryRepository +from app.services.ltv_history import LtvHistoryService + + +def test_ltv_history_repository_persists_structured_workspace_snapshots(tmp_path) -> None: + workspace_id = str(uuid4()) + service = LtvHistoryService(repository=LtvHistoryRepository(base_path=tmp_path / "workspaces")) + + service.record_workspace_snapshot( + workspace_id, + { + "ltv_ratio": Decimal("0.74"), + "margin_call_ltv": Decimal("0.80"), + "loan_amount": Decimal("222000"), + "gold_value": Decimal("300000"), + "spot_price": Decimal("4041.9"), + "quote_source": "yfinance", + "quote_updated_at": "2026-03-20T00:00:00+00:00", + }, + ) + + payload = json.loads((tmp_path / "workspaces" / workspace_id / "ltv_history.json").read_text()) + + assert payload[0]["ltv_ratio"] == {"value": "0.74", "unit": "ratio"} + assert payload[0]["margin_threshold"] == {"value": "0.80", "unit": "ratio"} + assert payload[0]["spot_price"] == {"value": "4041.9", "currency": "USD", "per_weight_unit": "ozt"} + assert payload[0]["loan_amount"] == {"value": "222000", "currency": "USD"} + assert payload[0]["collateral_value"] == {"value": "300000", "currency": "USD"} + + +def test_ltv_history_service_replaces_same_day_snapshot_and_builds_range_models(tmp_path) -> None: + workspace_id = str(uuid4()) + service = LtvHistoryService(repository=LtvHistoryRepository(base_path=tmp_path / "workspaces")) + + service.record_workspace_snapshot( + workspace_id, + { + "ltv_ratio": Decimal("0.70"), + "margin_call_ltv": Decimal("0.80"), + "loan_amount": Decimal("210000"), + "gold_value": Decimal("300000"), + "spot_price": Decimal("4100"), + "quote_source": "seed", + "quote_updated_at": "2026-01-01T00:00:00+00:00", + }, + ) + service.record_workspace_snapshot( + workspace_id, + { + "ltv_ratio": Decimal("0.75"), + "margin_call_ltv": Decimal("0.80"), + "loan_amount": Decimal("225000"), + "gold_value": Decimal("300000"), + "spot_price": Decimal("4000"), + "quote_source": "seed", + "quote_updated_at": "2026-03-15T00:00:00+00:00", + }, + ) + service.record_workspace_snapshot( + workspace_id, + { + "ltv_ratio": Decimal("0.76"), + "margin_call_ltv": Decimal("0.80"), + "loan_amount": Decimal("228000"), + "gold_value": Decimal("300000"), + "spot_price": Decimal("3990"), + "quote_source": "seed", + "quote_updated_at": "2026-03-15T12:00:00+00:00", + }, + ) + snapshots = service.record_workspace_snapshot( + workspace_id, + { + "ltv_ratio": Decimal("0.78"), + "margin_call_ltv": Decimal("0.80"), + "loan_amount": Decimal("234000"), + "gold_value": Decimal("300000"), + "spot_price": Decimal("3950"), + "quote_source": "seed", + "quote_updated_at": "2026-03-20T00:00:00+00:00", + }, + ) + + assert [snapshot.snapshot_date for snapshot in snapshots] == ["2026-01-01", "2026-03-15", "2026-03-20"] + assert str(snapshots[1].ltv_ratio) == "0.76" + + chart_7 = service.chart_model(snapshots, days=7, current_margin_threshold=Decimal("0.80")) + chart_30 = service.chart_model(snapshots, days=30, current_margin_threshold=Decimal("0.80")) + chart_90 = service.chart_model(snapshots, days=90, current_margin_threshold=Decimal("0.80")) + + assert chart_7.title == "7 Day" + assert chart_7.labels == ("2026-03-15", "2026-03-20") + assert chart_7.ltv_values == (76.0, 78.0) + assert chart_7.threshold_values == (80.0, 80.0) + assert chart_30.labels == ("2026-03-15", "2026-03-20") + assert chart_30.threshold_values == (80.0, 80.0) + assert chart_90.labels == ("2026-01-01", "2026-03-15", "2026-03-20") + + +def test_ltv_history_repository_rejects_invalid_numeric_and_date_payloads(tmp_path) -> None: + workspace_id = str(uuid4()) + repo = LtvHistoryRepository(base_path=tmp_path / "workspaces") + history_path = tmp_path / "workspaces" / workspace_id / "ltv_history.json" + history_path.parent.mkdir(parents=True, exist_ok=True) + history_path.write_text( + json.dumps( + [ + { + "snapshot_date": "not-a-date", + "captured_at": "2026-03-20T00:00:00+00:00", + "ltv_ratio": {"value": "bad", "unit": "ratio"}, + "margin_threshold": {"value": "0.80", "unit": "ratio"}, + "loan_amount": {"value": "234000", "currency": "USD"}, + "collateral_value": {"value": "300000", "currency": "USD"}, + "spot_price": {"value": "3950", "currency": "USD", "per_weight_unit": "ozt"}, + "source": "seed", + } + ] + ) + ) + + from app.models.ltv_history import LtvHistoryLoadError + + try: + repo.load(workspace_id) + except LtvHistoryLoadError as exc: + assert "invalid" in str(exc) + else: + raise AssertionError("Expected invalid LTV history payload to raise LtvHistoryLoadError") + + +def test_ltv_history_service_exports_csv(tmp_path) -> None: + workspace_id = str(uuid4()) + service = LtvHistoryService(repository=LtvHistoryRepository(base_path=tmp_path / "workspaces")) + + snapshots = service.record_workspace_snapshot( + workspace_id, + { + "ltv_ratio": Decimal("0.78"), + "margin_call_ltv": Decimal("0.80"), + "loan_amount": Decimal("234000"), + "gold_value": Decimal("300000"), + "spot_price": Decimal("3950"), + "quote_source": "seed", + "quote_updated_at": "2026-03-20T00:00:00+00:00", + }, + ) + + csv_content = service.export_csv(snapshots) + rows = list(csv.DictReader(StringIO(csv_content))) + + assert rows == [ + { + "snapshot_date": "2026-03-20", + "captured_at": "2026-03-20T00:00:00+00:00", + "ltv_ratio_pct": "78.0", + "margin_threshold_pct": "80.0", + "loan_amount_usd": "234000", + "collateral_value_usd": "300000", + "spot_price_usd_per_ozt": "3950", + "source": "seed", + } + ] diff --git a/tests/test_overview_ltv_history_playwright.py b/tests/test_overview_ltv_history_playwright.py new file mode 100644 index 0000000..c9a2659 --- /dev/null +++ b/tests/test_overview_ltv_history_playwright.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from pathlib import Path + +from playwright.sync_api import expect, sync_playwright + +BASE_URL = "http://127.0.0.1:8000" +ARTIFACTS = Path("tests/artifacts") +ARTIFACTS.mkdir(parents=True, exist_ok=True) + + +def test_overview_shows_ltv_history_and_exports_csv() -> None: + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + page = browser.new_page(viewport={"width": 1440, "height": 1000}) + + page.goto(BASE_URL, wait_until="domcontentloaded", timeout=30000) + expect(page.locator("text=Create a private workspace URL").first).to_be_visible(timeout=15000) + page.get_by_role("button", name="Get started").click() + page.wait_for_url(f"{BASE_URL}/*", timeout=15000) + + expect(page.locator("text=Overview").first).to_be_visible(timeout=15000) + expect(page.locator("text=Historical LTV").first).to_be_visible(timeout=15000) + expect(page.locator("text=7 Day").first).to_be_visible(timeout=15000) + expect(page.locator("text=30 Day").first).to_be_visible(timeout=15000) + expect(page.locator("text=90 Day").first).to_be_visible(timeout=15000) + expect(page.get_by_role("button", name="Export CSV")).to_be_visible(timeout=15000) + + series_names = page.evaluate(""" + async () => { + const importMap = JSON.parse(document.querySelector('script[type="importmap"]').textContent).imports; + const mod = await import(importMap['nicegui-echart']); + const chart = mod.echarts.getInstanceByDom(document.querySelector('.nicegui-echart')); + return chart ? chart.getOption().series.map(series => series.name) : []; + } + """) + assert series_names == ["LTV", "Margin threshold"] + + with page.expect_download() as download_info: + page.get_by_role("button", name="Export CSV").click() + download = download_info.value + assert download.suggested_filename.endswith("-ltv-history.csv") + download_path = ARTIFACTS / "ltv-history-export.csv" + download.save_as(str(download_path)) + csv_content = download_path.read_text() + assert ( + "snapshot_date,captured_at,ltv_ratio_pct,margin_threshold_pct,loan_amount_usd,collateral_value_usd,spot_price_usd_per_ozt,source" + in csv_content + ) + + body = page.locator("body").inner_text(timeout=15000) + assert "RuntimeError" not in body + assert "Server error" not in body + assert "Traceback" not in body + page.screenshot(path=str(ARTIFACTS / "overview-ltv-history.png"), full_page=True) + + browser.close()