from __future__ import annotations import csv import json from decimal import Decimal from io import StringIO from uuid import uuid4 from app.models.ltv_history import LtvHistoryRepository from app.services.ltv_history import LtvHistoryService def test_ltv_history_repository_persists_structured_workspace_snapshots(tmp_path) -> None: workspace_id = str(uuid4()) service = LtvHistoryService(repository=LtvHistoryRepository(base_path=tmp_path / "workspaces")) service.record_workspace_snapshot( workspace_id, { "ltv_ratio": Decimal("0.74"), "margin_call_ltv": Decimal("0.80"), "loan_amount": Decimal("222000"), "gold_value": Decimal("300000"), "spot_price": Decimal("4041.9"), "quote_source": "yfinance", "quote_updated_at": "2026-03-20T00:00:00+00:00", }, ) payload = json.loads((tmp_path / "workspaces" / workspace_id / "ltv_history.json").read_text()) assert payload[0]["ltv_ratio"] == {"value": "0.74", "unit": "ratio"} assert payload[0]["margin_threshold"] == {"value": "0.80", "unit": "ratio"} assert payload[0]["spot_price"] == {"value": "4041.9", "currency": "USD", "per_weight_unit": "ozt"} assert payload[0]["loan_amount"] == {"value": "222000", "currency": "USD"} assert payload[0]["collateral_value"] == {"value": "300000", "currency": "USD"} def test_ltv_history_service_replaces_same_day_snapshot_and_builds_range_models(tmp_path) -> None: workspace_id = str(uuid4()) service = LtvHistoryService(repository=LtvHistoryRepository(base_path=tmp_path / "workspaces")) service.record_workspace_snapshot( workspace_id, { "ltv_ratio": Decimal("0.70"), "margin_call_ltv": Decimal("0.80"), "loan_amount": Decimal("210000"), "gold_value": Decimal("300000"), "spot_price": Decimal("4100"), "quote_source": "seed", "quote_updated_at": "2026-01-01T00:00:00+00:00", }, ) service.record_workspace_snapshot( workspace_id, { "ltv_ratio": Decimal("0.75"), "margin_call_ltv": Decimal("0.80"), "loan_amount": Decimal("225000"), "gold_value": Decimal("300000"), "spot_price": Decimal("4000"), "quote_source": "seed", "quote_updated_at": "2026-03-15T00:00:00+00:00", }, ) service.record_workspace_snapshot( workspace_id, { "ltv_ratio": Decimal("0.76"), "margin_call_ltv": Decimal("0.80"), "loan_amount": Decimal("228000"), "gold_value": Decimal("300000"), "spot_price": Decimal("3990"), "quote_source": "seed", "quote_updated_at": "2026-03-15T12:00:00+00:00", }, ) snapshots = service.record_workspace_snapshot( workspace_id, { "ltv_ratio": Decimal("0.78"), "margin_call_ltv": Decimal("0.80"), "loan_amount": Decimal("234000"), "gold_value": Decimal("300000"), "spot_price": Decimal("3950"), "quote_source": "seed", "quote_updated_at": "2026-03-20T00:00:00+00:00", }, ) assert [snapshot.snapshot_date for snapshot in snapshots] == ["2026-01-01", "2026-03-15", "2026-03-20"] assert str(snapshots[1].ltv_ratio) == "0.76" chart_7 = service.chart_model(snapshots, days=7, current_margin_threshold=Decimal("0.80")) chart_30 = service.chart_model(snapshots, days=30, current_margin_threshold=Decimal("0.80")) chart_90 = service.chart_model(snapshots, days=90, current_margin_threshold=Decimal("0.80")) assert chart_7.title == "7 Day" assert chart_7.labels == ("2026-03-15", "2026-03-20") assert chart_7.ltv_values == (76.0, 78.0) assert chart_7.threshold_values == (80.0, 80.0) assert chart_30.labels == ("2026-03-15", "2026-03-20") assert chart_30.threshold_values == (80.0, 80.0) assert chart_90.labels == ("2026-01-01", "2026-03-15", "2026-03-20") def test_ltv_history_repository_rejects_invalid_numeric_and_date_payloads(tmp_path) -> None: workspace_id = str(uuid4()) repo = LtvHistoryRepository(base_path=tmp_path / "workspaces") history_path = tmp_path / "workspaces" / workspace_id / "ltv_history.json" history_path.parent.mkdir(parents=True, exist_ok=True) history_path.write_text( json.dumps( [ { "snapshot_date": "not-a-date", "captured_at": "2026-03-20T00:00:00+00:00", "ltv_ratio": {"value": "bad", "unit": "ratio"}, "margin_threshold": {"value": "0.80", "unit": "ratio"}, "loan_amount": {"value": "234000", "currency": "USD"}, "collateral_value": {"value": "300000", "currency": "USD"}, "spot_price": {"value": "3950", "currency": "USD", "per_weight_unit": "ozt"}, "source": "seed", } ] ) ) from app.models.ltv_history import LtvHistoryLoadError try: repo.load(workspace_id) except LtvHistoryLoadError as exc: assert "invalid" in str(exc) else: raise AssertionError("Expected invalid LTV history payload to raise LtvHistoryLoadError") def test_ltv_history_service_exports_csv(tmp_path) -> None: workspace_id = str(uuid4()) service = LtvHistoryService(repository=LtvHistoryRepository(base_path=tmp_path / "workspaces")) snapshots = service.record_workspace_snapshot( workspace_id, { "ltv_ratio": Decimal("0.78"), "margin_call_ltv": Decimal("0.80"), "loan_amount": Decimal("234000"), "gold_value": Decimal("300000"), "spot_price": Decimal("3950"), "quote_source": "seed", "quote_updated_at": "2026-03-20T00:00:00+00:00", }, ) csv_content = service.export_csv(snapshots) rows = list(csv.DictReader(StringIO(csv_content))) assert rows == [ { "snapshot_date": "2026-03-20", "captured_at": "2026-03-20T00:00:00+00:00", "ltv_ratio_pct": "78.0", "margin_threshold_pct": "80.0", "loan_amount_usd": "234000", "collateral_value_usd": "300000", "spot_price_usd_per_ozt": "3950", "source": "seed", } ]