feat(PORT-003): add historical ltv charts

This commit is contained in:
Bu5hm4nn
2026-03-27 16:39:33 +01:00
parent b3418eed2e
commit 1a6760bee3
8 changed files with 670 additions and 15 deletions

134
app/services/ltv_history.py Normal file
View File

@@ -0,0 +1,134 @@
from __future__ import annotations
import csv
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from io import StringIO
from typing import Mapping
from app.models.ltv_history import LtvHistoryRepository, LtvSnapshot
from app.services.boundary_values import boundary_decimal
@dataclass(frozen=True)
class LtvHistoryChartModel:
title: str
labels: tuple[str, ...]
ltv_values: tuple[float, ...]
threshold_values: tuple[float, ...]
class LtvHistoryService:
def __init__(self, repository: LtvHistoryRepository | None = None) -> None:
self.repository = repository or LtvHistoryRepository()
def record_workspace_snapshot(self, workspace_id: str, portfolio: Mapping[str, object]) -> list[LtvSnapshot]:
snapshots = self.repository.load(workspace_id)
snapshot = self._build_snapshot(portfolio)
updated: list[LtvSnapshot] = []
replaced = False
for existing in snapshots:
if existing.snapshot_date == snapshot.snapshot_date:
updated.append(snapshot)
replaced = True
else:
updated.append(existing)
if not replaced:
updated.append(snapshot)
updated.sort(key=lambda item: (item.snapshot_date, item.captured_at))
self.repository.save(workspace_id, updated)
return updated
@staticmethod
def chart_model(
snapshots: list[LtvSnapshot],
*,
days: int,
current_margin_threshold: Decimal | float | str | None = None,
) -> LtvHistoryChartModel:
if days <= 0:
raise ValueError("days must be positive")
title = f"{days} Day"
if not snapshots:
return LtvHistoryChartModel(title=title, labels=(), ltv_values=(), threshold_values=())
latest_date = max(datetime.fromisoformat(item.snapshot_date).date() for item in snapshots)
cutoff_date = latest_date - timedelta(days=days - 1)
filtered = [item for item in snapshots if datetime.fromisoformat(item.snapshot_date).date() >= cutoff_date]
threshold = (
boundary_decimal(current_margin_threshold, field_name="current_margin_threshold")
if current_margin_threshold is not None
else filtered[-1].margin_threshold
)
threshold_value = round(float(threshold * Decimal("100")), 1)
return LtvHistoryChartModel(
title=title,
labels=tuple(item.snapshot_date for item in filtered),
ltv_values=tuple(round(float(item.ltv_ratio * Decimal("100")), 1) for item in filtered),
threshold_values=tuple(threshold_value for _ in filtered),
)
@staticmethod
def export_csv(snapshots: list[LtvSnapshot]) -> str:
output = StringIO()
writer = csv.DictWriter(
output,
fieldnames=[
"snapshot_date",
"captured_at",
"ltv_ratio_pct",
"margin_threshold_pct",
"loan_amount_usd",
"collateral_value_usd",
"spot_price_usd_per_ozt",
"source",
],
)
writer.writeheader()
for snapshot in snapshots:
writer.writerow(
{
"snapshot_date": snapshot.snapshot_date,
"captured_at": snapshot.captured_at,
"ltv_ratio_pct": f"{float(snapshot.ltv_ratio * Decimal('100')):.1f}",
"margin_threshold_pct": f"{float(snapshot.margin_threshold * Decimal('100')):.1f}",
"loan_amount_usd": _decimal_text(snapshot.loan_amount),
"collateral_value_usd": _decimal_text(snapshot.collateral_value),
"spot_price_usd_per_ozt": _decimal_text(snapshot.spot_price),
"source": snapshot.source,
}
)
return output.getvalue()
@staticmethod
def _build_snapshot(portfolio: Mapping[str, object]) -> LtvSnapshot:
captured_at = _normalize_timestamp(str(portfolio.get("quote_updated_at", "")))
return LtvSnapshot(
snapshot_date=captured_at[:10],
captured_at=captured_at,
ltv_ratio=boundary_decimal(portfolio.get("ltv_ratio"), field_name="portfolio.ltv_ratio"),
margin_threshold=boundary_decimal(
portfolio.get("margin_call_ltv"),
field_name="portfolio.margin_call_ltv",
),
loan_amount=boundary_decimal(portfolio.get("loan_amount"), field_name="portfolio.loan_amount"),
collateral_value=boundary_decimal(portfolio.get("gold_value"), field_name="portfolio.gold_value"),
spot_price=boundary_decimal(portfolio.get("spot_price"), field_name="portfolio.spot_price"),
source=str(portfolio.get("quote_source", "unknown")) or "unknown",
)
def _normalize_timestamp(value: str) -> str:
if value:
try:
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC).isoformat()
except ValueError:
pass
return datetime.now(UTC).replace(microsecond=0).isoformat()
def _decimal_text(value: Decimal) -> str:
if value == value.to_integral():
return str(value.quantize(Decimal("1")))
normalized = value.normalize()
return format(normalized, "f") if normalized.as_tuple().exponent < 0 else str(normalized)