172 lines
6.4 KiB
Python
172 lines
6.4 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import json
|
|
from decimal import Decimal
|
|
from io import StringIO
|
|
from uuid import uuid4
|
|
|
|
from app.models.ltv_history import LtvHistoryRepository
|
|
from app.services.ltv_history import LtvHistoryService
|
|
|
|
|
|
def test_ltv_history_repository_persists_structured_workspace_snapshots(tmp_path) -> None:
    """Check the on-disk shape of a recorded snapshot.

    One snapshot is recorded through the service, then the JSON file at
    ``<tmp_path>/workspaces/<workspace_id>/ltv_history.json`` is read back
    and its first entry is compared field by field against the expected
    structured (value plus unit/currency) representation.
    """
    workspace_id = str(uuid4())
    base_dir = tmp_path / "workspaces"
    repository = LtvHistoryRepository(base_path=base_dir)
    service = LtvHistoryService(repository=repository)

    snapshot_input = {
        "ltv_ratio": Decimal("0.74"),
        "margin_call_ltv": Decimal("0.80"),
        "loan_amount": Decimal("222000"),
        "gold_value": Decimal("300000"),
        "spot_price": Decimal("4041.9"),
        "quote_source": "yfinance",
        "quote_updated_at": "2026-03-20T00:00:00+00:00",
    }
    service.record_workspace_snapshot(workspace_id, snapshot_input)

    history_file = base_dir / workspace_id / "ltv_history.json"
    payload = json.loads(history_file.read_text())
    entry = payload[0]

    # Input keys "margin_call_ltv" and "gold_value" are expected to be stored
    # under "margin_threshold" and "collateral_value" respectively.
    assert entry["ltv_ratio"] == {"value": "0.74", "unit": "ratio"}
    assert entry["margin_threshold"] == {"value": "0.80", "unit": "ratio"}
    assert entry["spot_price"] == {"value": "4041.9", "currency": "USD", "per_weight_unit": "ozt"}
    assert entry["loan_amount"] == {"value": "222000", "currency": "USD"}
    assert entry["collateral_value"] == {"value": "300000", "currency": "USD"}
|
|
|
|
|
|
def test_ltv_history_service_replaces_same_day_snapshot_and_builds_range_models(tmp_path) -> None:
    """Check same-day replacement and the 7/30/90-day chart models.

    Four snapshots are recorded in order; two of them are quoted on
    2026-03-15. The test expects three snapshots back, with the 2026-03-15
    entry carrying the later reading (0.76), and then checks which dates
    each chart range includes.
    """
    workspace_id = str(uuid4())
    service = LtvHistoryService(repository=LtvHistoryRepository(base_path=tmp_path / "workspaces"))

    # (ltv_ratio, loan_amount, spot_price, quote_updated_at) per recording;
    # the remaining fields are the same for every snapshot.
    readings = (
        ("0.70", "210000", "4100", "2026-01-01T00:00:00+00:00"),
        ("0.75", "225000", "4000", "2026-03-15T00:00:00+00:00"),
        ("0.76", "228000", "3990", "2026-03-15T12:00:00+00:00"),
        ("0.78", "234000", "3950", "2026-03-20T00:00:00+00:00"),
    )

    snapshots = None
    for ltv, loan, spot, quoted_at in readings:
        # Only the result of the final recording is inspected below.
        snapshots = service.record_workspace_snapshot(
            workspace_id,
            {
                "ltv_ratio": Decimal(ltv),
                "margin_call_ltv": Decimal("0.80"),
                "loan_amount": Decimal(loan),
                "gold_value": Decimal("300000"),
                "spot_price": Decimal(spot),
                "quote_source": "seed",
                "quote_updated_at": quoted_at,
            },
        )

    recorded_dates = [item.snapshot_date for item in snapshots]
    assert recorded_dates == ["2026-01-01", "2026-03-15", "2026-03-20"]
    assert str(snapshots[1].ltv_ratio) == "0.76"

    # Build all three chart models up front, in 7 / 30 / 90 order.
    charts = {
        days: service.chart_model(snapshots, days=days, current_margin_threshold=Decimal("0.80"))
        for days in (7, 30, 90)
    }
    week_chart = charts[7]
    month_chart = charts[30]
    quarter_chart = charts[90]

    assert week_chart.title == "7 Day"
    assert week_chart.labels == ("2026-03-15", "2026-03-20")
    assert week_chart.ltv_values == (76.0, 78.0)
    assert week_chart.threshold_values == (80.0, 80.0)
    assert month_chart.labels == ("2026-03-15", "2026-03-20")
    assert month_chart.threshold_values == (80.0, 80.0)
    assert quarter_chart.labels == ("2026-01-01", "2026-03-15", "2026-03-20")
|
|
|
|
|
|
def test_ltv_history_repository_rejects_invalid_numeric_and_date_payloads(tmp_path) -> None:
    """Check that loading a corrupt history file raises ``LtvHistoryLoadError``.

    A history file is written by hand with a non-date ``snapshot_date`` and
    a non-numeric ``ltv_ratio`` value. ``repo.load`` is then expected to
    raise ``LtvHistoryLoadError`` with "invalid" in its message.
    """
    workspace_id = str(uuid4())
    base_dir = tmp_path / "workspaces"
    repo = LtvHistoryRepository(base_path=base_dir)

    history_path = base_dir / workspace_id / "ltv_history.json"
    history_path.parent.mkdir(parents=True, exist_ok=True)

    corrupt_entry = {
        "snapshot_date": "not-a-date",
        "captured_at": "2026-03-20T00:00:00+00:00",
        "ltv_ratio": {"value": "bad", "unit": "ratio"},
        "margin_threshold": {"value": "0.80", "unit": "ratio"},
        "loan_amount": {"value": "234000", "currency": "USD"},
        "collateral_value": {"value": "300000", "currency": "USD"},
        "spot_price": {"value": "3950", "currency": "USD", "per_weight_unit": "ozt"},
        "source": "seed",
    }
    history_path.write_text(json.dumps([corrupt_entry]))

    from app.models.ltv_history import LtvHistoryLoadError

    missing_error_message = "Expected invalid LTV history payload to raise LtvHistoryLoadError"
    try:
        repo.load(workspace_id)
    except LtvHistoryLoadError as load_error:
        assert "invalid" in str(load_error)
    else:
        # Reaching here means load() accepted the corrupt payload.
        raise AssertionError(missing_error_message)
|
|
|
|
|
|
def test_ltv_history_service_exports_csv(tmp_path) -> None:
    """Check the CSV export of a single recorded snapshot.

    The exported text is parsed with ``csv.DictReader`` and compared against
    one expected row, so both the column names and the string-formatted
    values are pinned.
    """
    workspace_id = str(uuid4())
    repository = LtvHistoryRepository(base_path=tmp_path / "workspaces")
    service = LtvHistoryService(repository=repository)

    snapshot_input = {
        "ltv_ratio": Decimal("0.78"),
        "margin_call_ltv": Decimal("0.80"),
        "loan_amount": Decimal("234000"),
        "gold_value": Decimal("300000"),
        "spot_price": Decimal("3950"),
        "quote_source": "seed",
        "quote_updated_at": "2026-03-20T00:00:00+00:00",
    }
    snapshots = service.record_workspace_snapshot(workspace_id, snapshot_input)

    csv_content = service.export_csv(snapshots)
    reader = csv.DictReader(StringIO(csv_content))
    rows = list(reader)

    # Ratios are expected as percentages; monetary values keep their input digits.
    expected_row = {
        "snapshot_date": "2026-03-20",
        "captured_at": "2026-03-20T00:00:00+00:00",
        "ltv_ratio_pct": "78.0",
        "margin_threshold_pct": "80.0",
        "loan_amount_usd": "234000",
        "collateral_value_usd": "300000",
        "spot_price_usd_per_ozt": "3950",
        "source": "seed",
    }
    assert rows == [expected_row]
|