- Fix return type annotation for get_default_premium_for_product - Add type narrowing for Weight|Money union using _as_money helper - Add isinstance checks before float() calls for object types - Add type guard for Decimal.exponent comparison - Use _unit_typed and _currency_typed properties for type narrowing - Cast option_type to OptionType Literal after validation - Fix provider type hierarchy in backtesting services - Add types-requests to dev dependencies - Remove '|| true' from CI type-check job All 36 mypy errors resolved across 15 files.
138 lines
5.4 KiB
Python
138 lines
5.4 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime, timedelta
|
|
from decimal import Decimal
|
|
from io import StringIO
|
|
from typing import Mapping
|
|
|
|
from app.models.ltv_history import LtvHistoryRepository, LtvSnapshot
|
|
from app.services.boundary_values import boundary_decimal
|
|
|
|
|
|
@dataclass(frozen=True)
class LtvHistoryChartModel:
    """Immutable bundle of parallel series describing an LTV history chart.

    ``labels``, ``ltv_values`` and ``threshold_values`` are produced with the
    same length by ``LtvHistoryService.chart_model`` (one entry per snapshot).
    """

    # Chart caption, e.g. "30 Day".
    title: str
    # Snapshot date strings, one per plotted point.
    labels: tuple[str, ...]
    # LTV expressed as a percentage, one value per label.
    ltv_values: tuple[float, ...]
    # Margin threshold percentage, repeated once per label.
    threshold_values: tuple[float, ...]
|
|
|
|
|
|
class LtvHistoryService:
    """Record, chart, and export loan-to-value (LTV) snapshots for a workspace.

    Storage is delegated to an ``LtvHistoryRepository``. The chart and CSV
    helpers are static and operate purely on the snapshot list they are given.
    """

    def __init__(self, repository: LtvHistoryRepository | None = None) -> None:
        """Store *repository*, creating a default ``LtvHistoryRepository`` if omitted."""
        # Compare against None explicitly so a repository object that happens
        # to be falsy (e.g. defines __len__ and is empty) is not thrown away.
        self.repository = repository if repository is not None else LtvHistoryRepository()

    def record_workspace_snapshot(self, workspace_id: str, portfolio: Mapping[str, object]) -> list[LtvSnapshot]:
        """Build a snapshot from *portfolio* and upsert it into the workspace history.

        At most one snapshot is kept per ``snapshot_date``: every stored entry
        carrying the new snapshot's date is dropped and the new snapshot is
        added exactly once. The history is then sorted by
        ``(snapshot_date, captured_at)``, saved through the repository, and
        returned.

        Args:
            workspace_id: Key passed to the repository's ``load`` and ``save``.
            portfolio: Mapping the snapshot fields are read from
                (see ``_build_snapshot`` for the keys used).

        Returns:
            The full, sorted snapshot list that was saved.
        """
        stored = self.repository.load(workspace_id)
        snapshot = self._build_snapshot(portfolio)
        # Filtering (rather than replacing in place) guarantees a single entry
        # for the date even if the stored history already contained duplicates.
        history = [existing for existing in stored if existing.snapshot_date != snapshot.snapshot_date]
        history.append(snapshot)
        history.sort(key=lambda item: (item.snapshot_date, item.captured_at))
        self.repository.save(workspace_id, history)
        return history

    @staticmethod
    def chart_model(
        snapshots: list[LtvSnapshot],
        *,
        days: int,
        current_margin_threshold: Decimal | float | str | None = None,
    ) -> LtvHistoryChartModel:
        """Build the chart series for the last *days* days of *snapshots*.

        The window ends at the most recent ``snapshot_date`` present in
        *snapshots* (not at today's date) and spans *days* calendar days
        inclusive. Ratios are multiplied by 100 and rounded to one decimal.

        Args:
            snapshots: Snapshots to chart; their order is preserved in the output.
            days: Window length in days; must be positive.
            current_margin_threshold: Threshold ratio to draw. When ``None``,
                the ``margin_threshold`` of the last snapshot in the window is used.

        Returns:
            An ``LtvHistoryChartModel``; all series are empty when *snapshots* is empty.

        Raises:
            ValueError: If *days* is not positive, or a ``snapshot_date``
                cannot be parsed by ``datetime.fromisoformat``.
        """
        if days <= 0:
            raise ValueError("days must be positive")
        title = f"{days} Day"
        if not snapshots:
            return LtvHistoryChartModel(title=title, labels=(), ltv_values=(), threshold_values=())

        # Parse each snapshot date once and reuse it for both max() and the filter.
        dated = [(datetime.fromisoformat(item.snapshot_date).date(), item) for item in snapshots]
        latest_date = max(day for day, _ in dated)
        cutoff_date = latest_date - timedelta(days=days - 1)
        # Never empty: the snapshot holding latest_date always passes the cutoff.
        filtered = [item for day, item in dated if day >= cutoff_date]

        if current_margin_threshold is not None:
            threshold = boundary_decimal(current_margin_threshold, field_name="current_margin_threshold")
        else:
            threshold = filtered[-1].margin_threshold

        hundred = Decimal("100")
        threshold_value = round(float(threshold * hundred), 1)
        return LtvHistoryChartModel(
            title=title,
            labels=tuple(item.snapshot_date for item in filtered),
            ltv_values=tuple(round(float(item.ltv_ratio * hundred), 1) for item in filtered),
            threshold_values=(threshold_value,) * len(filtered),
        )

    @staticmethod
    def export_csv(snapshots: list[LtvSnapshot]) -> str:
        """Serialize *snapshots* to CSV text with a header row.

        Ratio columns are written as percentages with one decimal place;
        monetary and price columns are formatted by ``_decimal_text``.

        Returns:
            The CSV document as a string (header only when *snapshots* is empty).
        """
        output = StringIO()
        writer = csv.DictWriter(
            output,
            fieldnames=[
                "snapshot_date",
                "captured_at",
                "ltv_ratio_pct",
                "margin_threshold_pct",
                "loan_amount_usd",
                "collateral_value_usd",
                "spot_price_usd_per_ozt",
                "source",
            ],
        )
        writer.writeheader()
        hundred = Decimal("100")
        for snapshot in snapshots:
            writer.writerow(
                {
                    "snapshot_date": snapshot.snapshot_date,
                    "captured_at": snapshot.captured_at,
                    "ltv_ratio_pct": f"{float(snapshot.ltv_ratio * hundred):.1f}",
                    "margin_threshold_pct": f"{float(snapshot.margin_threshold * hundred):.1f}",
                    "loan_amount_usd": _decimal_text(snapshot.loan_amount),
                    "collateral_value_usd": _decimal_text(snapshot.collateral_value),
                    "spot_price_usd_per_ozt": _decimal_text(snapshot.spot_price),
                    "source": snapshot.source,
                }
            )
        return output.getvalue()

    @staticmethod
    def _build_snapshot(portfolio: Mapping[str, object]) -> LtvSnapshot:
        """Create an ``LtvSnapshot`` from the keys of *portfolio*.

        Reads ``quote_updated_at``, ``ltv_ratio``, ``margin_call_ltv``,
        ``loan_amount``, ``gold_value``, ``spot_price`` and ``quote_source``.
        Numeric fields go through ``boundary_decimal``; the timestamp goes
        through ``_normalize_timestamp`` and its first ten characters become
        ``snapshot_date``.
        """
        raw_timestamp = portfolio.get("quote_updated_at")
        captured_at = _normalize_timestamp("" if raw_timestamp is None else str(raw_timestamp))

        # An explicit None must fall back to "unknown"; str(None) would
        # otherwise store the literal text "None" as the source.
        raw_source = portfolio.get("quote_source")
        source = "unknown" if raw_source is None else (str(raw_source) or "unknown")

        return LtvSnapshot(
            snapshot_date=captured_at[:10],
            captured_at=captured_at,
            ltv_ratio=boundary_decimal(portfolio.get("ltv_ratio"), field_name="portfolio.ltv_ratio"),
            margin_threshold=boundary_decimal(
                portfolio.get("margin_call_ltv"),
                field_name="portfolio.margin_call_ltv",
            ),
            loan_amount=boundary_decimal(portfolio.get("loan_amount"), field_name="portfolio.loan_amount"),
            collateral_value=boundary_decimal(portfolio.get("gold_value"), field_name="portfolio.gold_value"),
            spot_price=boundary_decimal(portfolio.get("spot_price"), field_name="portfolio.spot_price"),
            source=source,
        )
|
|
|
|
|
|
def _normalize_timestamp(value: str) -> str:
|
|
if value:
|
|
try:
|
|
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC).isoformat()
|
|
except ValueError:
|
|
pass
|
|
return datetime.now(UTC).replace(microsecond=0).isoformat()
|
|
|
|
|
|
def _decimal_text(value: Decimal) -> str:
|
|
if value == value.to_integral():
|
|
return str(value.quantize(Decimal("1")))
|
|
normalized = value.normalize()
|
|
exponent = normalized.as_tuple().exponent
|
|
if isinstance(exponent, int) and exponent < 0:
|
|
return format(normalized, "f")
|
|
return str(normalized)
|