Files
vault-dash/app/services/ltv_history.py
Bu5hm4nn 887565be74 fix(types): resolve all mypy type errors (CORE-003)
- Fix return type annotation for get_default_premium_for_product
- Add type narrowing for Weight|Money union using _as_money helper
- Add isinstance checks before float() calls for object types
- Add type guard for Decimal.exponent comparison
- Use _unit_typed and _currency_typed properties for type narrowing
- Cast option_type to OptionType Literal after validation
- Fix provider type hierarchy in backtesting services
- Add types-requests to dev dependencies
- Remove '|| true' from CI type-check job

All 36 mypy errors resolved across 15 files.
2026-03-30 00:05:09 +02:00

138 lines
5.4 KiB
Python

from __future__ import annotations
import csv
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from io import StringIO
from typing import Mapping
from app.models.ltv_history import LtvHistoryRepository, LtvSnapshot
from app.services.boundary_values import boundary_decimal
@dataclass(frozen=True)
class LtvHistoryChartModel:
    """Immutable, render-ready series for an LTV history chart.

    The three tuples are parallel: index ``i`` of each describes the same
    plotted point.
    """

    # Chart heading, e.g. "7 Day".
    title: str
    # One x-axis label per plotted point.
    labels: tuple[str, ...]
    # LTV values expressed as percentages, one per label.
    ltv_values: tuple[float, ...]
    # Margin threshold as a percentage, one per label.
    threshold_values: tuple[float, ...]
class LtvHistoryService:
    """Record, chart, and export per-workspace loan-to-value (LTV) snapshots.

    History is kept as at most one snapshot per calendar date; recording a
    second snapshot for a date replaces the earlier one.
    """

    def __init__(self, repository: LtvHistoryRepository | None = None) -> None:
        """Create the service.

        Args:
            repository: Snapshot store. A default ``LtvHistoryRepository`` is
                created when the argument is omitted (or falsy).
        """
        self.repository = repository or LtvHistoryRepository()

    def record_workspace_snapshot(self, workspace_id: str, portfolio: Mapping[str, object]) -> list[LtvSnapshot]:
        """Store a snapshot of ``portfolio`` in the workspace's history.

        Every stored snapshot sharing the new snapshot's date is dropped and
        the new one is added exactly once, so same-day duplicates that may
        already be present in the stored history are collapsed instead of
        each being overwritten with another copy of the new snapshot.

        Args:
            workspace_id: Key passed to the repository's ``load``/``save``.
            portfolio: Mapping read by ``_build_snapshot``.

        Returns:
            The saved history, sorted by ``(snapshot_date, captured_at)``.
        """
        snapshots = self.repository.load(workspace_id)
        snapshot = self._build_snapshot(portfolio)
        updated: list[LtvSnapshot] = [
            existing for existing in snapshots if existing.snapshot_date != snapshot.snapshot_date
        ]
        updated.append(snapshot)
        updated.sort(key=lambda item: (item.snapshot_date, item.captured_at))
        self.repository.save(workspace_id, updated)
        return updated

    @staticmethod
    def chart_model(
        snapshots: list[LtvSnapshot],
        *,
        days: int,
        current_margin_threshold: Decimal | float | str | None = None,
    ) -> LtvHistoryChartModel:
        """Build chart series for the trailing ``days``-day window.

        The window ends at the most recent snapshot date found in
        ``snapshots`` (not at today's date) and spans ``days`` calendar days
        including that date.

        Args:
            snapshots: History to plot; the input order is preserved.
            days: Window length in days; must be positive.
            current_margin_threshold: Threshold ratio to draw. When ``None``,
                the ``margin_threshold`` of the last snapshot inside the
                window is used.

        Returns:
            A chart model whose values are percentages rounded to one decimal.
            All series are empty when ``snapshots`` is empty.

        Raises:
            ValueError: If ``days`` is not positive.
        """
        if days <= 0:
            raise ValueError("days must be positive")
        title = f"{days} Day"
        if not snapshots:
            return LtvHistoryChartModel(title=title, labels=(), ltv_values=(), threshold_values=())

        # Parse each snapshot date once and reuse it for both the max and the filter.
        dated = [(datetime.fromisoformat(item.snapshot_date).date(), item) for item in snapshots]
        latest_date = max(day for day, _ in dated)
        cutoff_date = latest_date - timedelta(days=days - 1)
        # Never empty: the snapshot carrying latest_date always passes the cutoff.
        filtered = [item for day, item in dated if day >= cutoff_date]

        if current_margin_threshold is not None:
            threshold = boundary_decimal(current_margin_threshold, field_name="current_margin_threshold")
        else:
            # "Last" means last in input order; this is the newest snapshot when the
            # caller passes history in the sorted order record_workspace_snapshot saves.
            threshold = filtered[-1].margin_threshold

        hundred = Decimal("100")
        threshold_value = round(float(threshold * hundred), 1)
        return LtvHistoryChartModel(
            title=title,
            labels=tuple(item.snapshot_date for item in filtered),
            ltv_values=tuple(round(float(item.ltv_ratio * hundred), 1) for item in filtered),
            threshold_values=tuple(threshold_value for _ in filtered),
        )

    @staticmethod
    def export_csv(snapshots: list[LtvSnapshot]) -> str:
        """Serialize ``snapshots`` to CSV text with a header row.

        Ratios are written as percentages with one decimal place; monetary
        values are written via ``_decimal_text``.

        Args:
            snapshots: Rows to export, written in the given order.

        Returns:
            The CSV document; only the header row when ``snapshots`` is empty.
        """
        hundred = Decimal("100")
        output = StringIO()
        writer = csv.DictWriter(
            output,
            fieldnames=[
                "snapshot_date",
                "captured_at",
                "ltv_ratio_pct",
                "margin_threshold_pct",
                "loan_amount_usd",
                "collateral_value_usd",
                "spot_price_usd_per_ozt",
                "source",
            ],
        )
        writer.writeheader()
        for snapshot in snapshots:
            writer.writerow(
                {
                    "snapshot_date": snapshot.snapshot_date,
                    "captured_at": snapshot.captured_at,
                    "ltv_ratio_pct": f"{float(snapshot.ltv_ratio * hundred):.1f}",
                    "margin_threshold_pct": f"{float(snapshot.margin_threshold * hundred):.1f}",
                    "loan_amount_usd": _decimal_text(snapshot.loan_amount),
                    "collateral_value_usd": _decimal_text(snapshot.collateral_value),
                    "spot_price_usd_per_ozt": _decimal_text(snapshot.spot_price),
                    "source": snapshot.source,
                }
            )
        return output.getvalue()

    @staticmethod
    def _build_snapshot(portfolio: Mapping[str, object]) -> LtvSnapshot:
        """Convert a portfolio mapping into an ``LtvSnapshot``.

        Keys read: ``quote_updated_at``, ``ltv_ratio``, ``margin_call_ltv``,
        ``loan_amount``, ``gold_value``, ``spot_price`` and ``quote_source``.
        A missing, ``None`` or empty ``quote_source`` is recorded as
        ``"unknown"``; a missing or ``None`` ``quote_updated_at`` is handed to
        ``_normalize_timestamp`` as an empty string.
        """
        raw_timestamp = portfolio.get("quote_updated_at")
        captured_at = _normalize_timestamp("" if raw_timestamp is None else str(raw_timestamp))

        # str(None) is the truthy text "None", so None has to be mapped to the
        # fallback before stringifying.
        raw_source = portfolio.get("quote_source")
        source = ("" if raw_source is None else str(raw_source)) or "unknown"

        return LtvSnapshot(
            # The first ten characters of the ISO timestamp are its YYYY-MM-DD date.
            snapshot_date=captured_at[:10],
            captured_at=captured_at,
            ltv_ratio=boundary_decimal(portfolio.get("ltv_ratio"), field_name="portfolio.ltv_ratio"),
            margin_threshold=boundary_decimal(
                portfolio.get("margin_call_ltv"),
                field_name="portfolio.margin_call_ltv",
            ),
            loan_amount=boundary_decimal(portfolio.get("loan_amount"), field_name="portfolio.loan_amount"),
            collateral_value=boundary_decimal(portfolio.get("gold_value"), field_name="portfolio.gold_value"),
            spot_price=boundary_decimal(portfolio.get("spot_price"), field_name="portfolio.spot_price"),
            source=source,
        )
def _normalize_timestamp(value: str) -> str:
if value:
try:
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC).isoformat()
except ValueError:
pass
return datetime.now(UTC).replace(microsecond=0).isoformat()
def _decimal_text(value: Decimal) -> str:
    """Render ``value`` in plain fixed-point notation without trailing zeros.

    Integral values are written without a decimal point (``100.00`` ->
    ``"100"``) and exponent notation is never used for finite values
    (``1.5E-7`` -> ``"0.00000015"``). Formatting works on the digits as
    given, so it neither rounds to the active decimal context precision nor
    fails on very large integral values.

    Args:
        value: The number to render.

    Returns:
        The fixed-point text, or ``str(value)`` for NaN and infinities.
    """
    if not value.is_finite():
        return str(value)
    text = format(value, "f")
    if "." in text:
        # Strip zeros only after a decimal point so "100" is never shortened.
        text = text.rstrip("0").rstrip(".")
    return text