from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any


class LtvHistoryLoadError(RuntimeError):
    """Raised when a stored LTV history file cannot be read or is malformed.

    Attributes:
        history_path: Path of the history file that failed to load.
    """

    def __init__(self, history_path: Path, message: str) -> None:
        super().__init__(message)
        self.history_path = history_path


@dataclass(frozen=True)
class LtvSnapshot:
    """Immutable, validated LTV snapshot record.

    Validation runs in ``__post_init__``:

    * ``snapshot_date``, ``captured_at`` and ``source`` must be non-empty
      strings; ``snapshot_date`` must parse with ``date.fromisoformat`` and
      ``captured_at`` with ``datetime.fromisoformat`` (after ``"Z"`` is
      rewritten to ``"+00:00"``).
    * every numeric field must be a finite ``Decimal``.
    * ``ltv_ratio >= 0``, ``0 < margin_threshold < 1``, ``loan_amount >= 0``,
      ``collateral_value > 0`` and ``spot_price > 0``.

    Raises:
        ValueError: a string field is empty or unparsable, or a numeric field
            is out of range.
        TypeError: a numeric field is not a finite ``Decimal``.
    """

    snapshot_date: str
    captured_at: str
    ltv_ratio: Decimal
    margin_threshold: Decimal
    loan_amount: Decimal
    collateral_value: Decimal
    spot_price: Decimal
    source: str

    def __post_init__(self) -> None:
        for field_name in ("snapshot_date", "captured_at", "source"):
            value = getattr(self, field_name)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(f"{field_name} must be a non-empty string")

        # Parsed only for validation; both calls raise ValueError on bad text.
        date.fromisoformat(self.snapshot_date)
        datetime.fromisoformat(self.captured_at.replace("Z", "+00:00"))

        for field_name in (
            "ltv_ratio",
            "margin_threshold",
            "loan_amount",
            "collateral_value",
            "spot_price",
        ):
            value = getattr(self, field_name)
            # Rejecting NaN/Infinity here keeps the comparisons below safe.
            if not isinstance(value, Decimal) or not value.is_finite():
                raise TypeError(f"{field_name} must be a finite Decimal")

        if self.ltv_ratio < 0:
            raise ValueError("ltv_ratio must be zero or greater")
        if not Decimal("0") < self.margin_threshold < Decimal("1"):
            raise ValueError("margin_threshold must be between 0 and 1")
        if self.loan_amount < 0:
            raise ValueError("loan_amount must be zero or greater")
        if self.collateral_value <= 0:
            raise ValueError("collateral_value must be positive")
        if self.spot_price <= 0:
            raise ValueError("spot_price must be positive")

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serialisable dict with tagged numeric payloads."""
        return {
            "snapshot_date": self.snapshot_date,
            "captured_at": self.captured_at,
            "ltv_ratio": _structured_ratio_payload(self.ltv_ratio),
            "margin_threshold": _structured_ratio_payload(self.margin_threshold),
            "loan_amount": _structured_money_payload(self.loan_amount),
            "collateral_value": _structured_money_payload(self.collateral_value),
            "spot_price": _structured_price_payload(self.spot_price),
            "source": self.source,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "LtvSnapshot":
        """Build a snapshot from the structure produced by ``to_dict``.

        Raises:
            TypeError: a numeric payload is not a dict.
            ValueError: a field is missing, mis-tagged, non-numeric, or fails
                the validation in ``__post_init__``.
        """
        return cls(
            snapshot_date=_require_non_empty_string(data, "snapshot_date"),
            captured_at=_require_non_empty_string(data, "captured_at"),
            ltv_ratio=_parse_ratio_payload(data.get("ltv_ratio"), field_name="ltv_ratio"),
            margin_threshold=_parse_ratio_payload(
                data.get("margin_threshold"), field_name="margin_threshold"
            ),
            loan_amount=_parse_money_payload(data.get("loan_amount"), field_name="loan_amount"),
            collateral_value=_parse_money_payload(
                data.get("collateral_value"), field_name="collateral_value"
            ),
            spot_price=_parse_price_payload(data.get("spot_price"), field_name="spot_price"),
            source=_require_non_empty_string(data, "source"),
        )


class LtvHistoryRepository:
    """Stores snapshot lists as ``<base_path>/<workspace_id>/ltv_history.json``."""

    def __init__(self, base_path: Path | str = Path("data/workspaces")) -> None:
        """Remember ``base_path`` and create the directory if it is missing."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def load(self, workspace_id: str) -> list[LtvSnapshot]:
        """Load every snapshot stored for ``workspace_id``.

        Returns:
            The stored snapshots in file order, or ``[]`` when no history file
            exists for the workspace.

        Raises:
            LtvHistoryLoadError: the file cannot be read, is not valid UTF-8,
                is not valid JSON, is not a JSON list, or contains an entry
                that is not a valid snapshot object.
        """
        history_path = self.history_path(workspace_id)
        if not history_path.exists():
            return []

        try:
            # JSON files are UTF-8; do not depend on the platform default.
            raw_text = history_path.read_text(encoding="utf-8")
        except UnicodeDecodeError as exc:
            # UnicodeDecodeError is a ValueError, not an OSError, so without
            # this clause it would escape unwrapped.
            raise LtvHistoryLoadError(
                history_path, f"LTV history is not valid UTF-8: {exc}"
            ) from exc
        except OSError as exc:
            raise LtvHistoryLoadError(
                history_path, f"LTV history could not be read: {exc}"
            ) from exc

        try:
            payload = json.loads(raw_text)
        except json.JSONDecodeError as exc:
            raise LtvHistoryLoadError(
                history_path, f"LTV history is not valid JSON: {exc}"
            ) from exc

        if not isinstance(payload, list):
            raise LtvHistoryLoadError(history_path, "LTV history payload must be a list")

        snapshots: list[LtvSnapshot] = []
        for index, item in enumerate(payload):
            if not isinstance(item, dict):
                raise LtvHistoryLoadError(
                    history_path, f"LTV history entry {index} must be an object"
                )
            try:
                snapshots.append(LtvSnapshot.from_dict(item))
            except (TypeError, ValueError, KeyError) as exc:
                raise LtvHistoryLoadError(
                    history_path, f"LTV history entry {index} is invalid: {exc}"
                ) from exc
        return snapshots

    def save(self, workspace_id: str, snapshots: list[LtvSnapshot]) -> None:
        """Overwrite the workspace's history file with ``snapshots``.

        The workspace directory is created if needed. The file is written as
        UTF-8 JSON with two-space indentation.
        """
        history_path = self.history_path(workspace_id)
        history_path.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps([snapshot.to_dict() for snapshot in snapshots], indent=2)
        history_path.write_text(serialized, encoding="utf-8")

    def history_path(self, workspace_id: str) -> Path:
        """Return the history file path for ``workspace_id``.

        ``workspace_id`` is joined onto ``base_path`` as-is; no validation or
        normalisation of the identifier happens here.
        """
        return self.base_path / workspace_id / "ltv_history.json"


def _require_non_empty_string(data: dict[str, Any], field_name: str) -> str:
    """Return ``data[field_name]`` if it is a non-blank string.

    Raises:
        ValueError: the key is missing, not a string, or only whitespace.
    """
    value = data.get(field_name)
    if not isinstance(value, str) or not value.strip():
        raise ValueError(f"{field_name} must be a non-empty string")
    return value


def _decimal_text(value: Decimal) -> str:
    """Render ``value`` in plain positional notation without trailing zeros.

    Examples: ``Decimal("100.00")`` -> ``"100"``, ``Decimal("1.50")`` ->
    ``"1.5"``, ``Decimal("1E+3")`` -> ``"1000"``.

    The text comes from ``format(value, "f")`` with no precision, which does
    not consult the decimal context. ``quantize`` and ``normalize`` do: the
    former raises ``InvalidOperation`` and the latter silently rounds once a
    value needs more digits than the context precision. Callers in this
    module only pass finite values.
    """
    text = format(value, "f")
    if "." in text:
        # Drop insignificant fractional zeros, then a dangling decimal point.
        text = text.rstrip("0").rstrip(".")
    return text


def _parse_decimal_payload(
    payload: object,
    *,
    field_name: str,
    expected_tag_key: str,
    expected_tag_value: str,
    expected_currency: str | None = None,
    expected_per_weight_unit: str | None = None,
) -> Decimal:
    """Validate a tagged ``{"value": "<number>", ...}`` payload.

    Args:
        payload: Candidate payload; must be a dict.
        field_name: Name used in error messages.
        expected_tag_key: Key whose value must equal ``expected_tag_value``.
        expected_tag_value: Required value for ``expected_tag_key``.
        expected_currency: When given, required value of ``"currency"``.
        expected_per_weight_unit: When given, required value of
            ``"per_weight_unit"``.

    Returns:
        The finite ``Decimal`` parsed from ``payload["value"]``.

    Raises:
        TypeError: ``payload`` is not a dict.
        ValueError: a tag does not match, or ``value`` is missing, blank,
            non-numeric, or non-finite.
    """
    if not isinstance(payload, dict):
        raise TypeError(f"{field_name} must be an object")
    if payload.get(expected_tag_key) != expected_tag_value:
        raise ValueError(f"{field_name} must declare {expected_tag_key}={expected_tag_value!r}")
    if expected_currency is not None and payload.get("currency") != expected_currency:
        raise ValueError(f"{field_name} must declare currency={expected_currency!r}")
    if (
        expected_per_weight_unit is not None
        and payload.get("per_weight_unit") != expected_per_weight_unit
    ):
        raise ValueError(
            f"{field_name} must declare per_weight_unit={expected_per_weight_unit!r}"
        )

    raw_value = payload.get("value")
    if not isinstance(raw_value, str) or not raw_value.strip():
        raise ValueError(f"{field_name}.value must be a non-empty string")
    try:
        value = Decimal(raw_value)
    except InvalidOperation as exc:
        raise ValueError(f"{field_name}.value must be numeric") from exc
    # Decimal() accepts "NaN" and "Infinity"; those are not usable amounts.
    if not value.is_finite():
        raise ValueError(f"{field_name}.value must be finite")
    return value


def _parse_ratio_payload(payload: object, *, field_name: str) -> Decimal:
    """Parse a payload tagged ``unit="ratio"``."""
    return _parse_decimal_payload(
        payload,
        field_name=field_name,
        expected_tag_key="unit",
        expected_tag_value="ratio",
    )


def _parse_money_payload(payload: object, *, field_name: str) -> Decimal:
    """Parse a payload tagged ``currency="USD"``."""
    return _parse_decimal_payload(
        payload,
        field_name=field_name,
        expected_tag_key="currency",
        expected_tag_value="USD",
        expected_currency="USD",
    )


def _parse_price_payload(payload: object, *, field_name: str) -> Decimal:
    """Parse a payload tagged ``currency="USD"`` and ``per_weight_unit="ozt"``."""
    return _parse_decimal_payload(
        payload,
        field_name=field_name,
        expected_tag_key="currency",
        expected_tag_value="USD",
        expected_currency="USD",
        expected_per_weight_unit="ozt",
    )


def _structured_ratio_payload(value: Decimal) -> dict[str, str]:
    """Serialise a ratio; uses ``str(value)``, so trailing zeros are kept."""
    return {"value": str(value), "unit": "ratio"}


def _structured_money_payload(value: Decimal) -> dict[str, str]:
    """Serialise a USD amount using ``_decimal_text`` for the value."""
    return {"value": _decimal_text(value), "currency": "USD"}


def _structured_price_payload(value: Decimal) -> dict[str, str]:
    """Serialise a USD price tagged ``per_weight_unit="ozt"``."""
    return {"value": _decimal_text(value), "currency": "USD", "per_weight_unit": "ozt"}