156 lines
5.2 KiB
Python
156 lines
5.2 KiB
Python
"""Alert evaluation and history persistence."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from decimal import Decimal
|
|
from typing import Mapping
|
|
|
|
from app.domain.portfolio_math import build_alert_context
|
|
from app.models.alerts import AlertEvent, AlertHistoryLoadError, AlertHistoryRepository, AlertStatus
|
|
from app.models.portfolio import PortfolioConfig
|
|
from app.services.boundary_values import boundary_decimal
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class AlertEvaluationInput:
|
|
ltv_ratio: Decimal
|
|
spot_price: Decimal
|
|
updated_at: str
|
|
warning_threshold: Decimal
|
|
critical_threshold: Decimal
|
|
email_alerts_enabled: bool
|
|
|
|
|
|
def _normalize_alert_evaluation_input(
    config: PortfolioConfig,
    portfolio: Mapping[str, object],
) -> AlertEvaluationInput:
    """Coerce raw config and portfolio values into an ``AlertEvaluationInput``.

    Every numeric value is passed through ``boundary_decimal`` together with
    a dotted ``field_name`` identifying where the value came from. Any
    exception raised by ``boundary_decimal`` propagates unchanged.

    Args:
        config: Portfolio configuration; ``ltv_warning``, ``margin_threshold``
            and ``email_alerts`` are read from it.
        portfolio: Mapping from which ``"ltv_ratio"``, ``"spot_price"`` and
            ``"quote_updated_at"`` are read.

    Returns:
        The normalized, immutable evaluation input.
    """
    # A key that is present but holds ``None`` must not be rendered as the
    # literal text "None"; treat it the same as a missing key.
    raw_updated_at = portfolio.get("quote_updated_at")
    updated_at = "" if raw_updated_at is None else str(raw_updated_at)

    return AlertEvaluationInput(
        ltv_ratio=boundary_decimal(
            portfolio.get("ltv_ratio"),
            field_name="portfolio.ltv_ratio",
        ),
        spot_price=boundary_decimal(
            portfolio.get("spot_price"),
            field_name="portfolio.spot_price",
        ),
        updated_at=updated_at,
        warning_threshold=boundary_decimal(
            config.ltv_warning,
            field_name="config.ltv_warning",
        ),
        critical_threshold=boundary_decimal(
            config.margin_threshold,
            field_name="config.margin_threshold",
        ),
        email_alerts_enabled=bool(config.email_alerts),
    )
|
|
|
|
|
|
def _ratio_text(value: Decimal) -> str:
|
|
return f"{float(value):.1%}"
|
|
|
|
|
|
def build_portfolio_alert_context(
    config: PortfolioConfig,
    *,
    spot_price: float,
    source: str,
    updated_at: str,
) -> dict[str, float | str]:
    """Build the alert context for a portfolio.

    Thin pass-through to ``build_alert_context``: the config is forwarded
    positionally and the remaining arguments by keyword, unchanged.

    Args:
        config: Portfolio configuration forwarded as-is.
        spot_price: Spot price forwarded as-is.
        source: Quote source label forwarded as-is.
        updated_at: Quote timestamp text forwarded as-is.

    Returns:
        Whatever ``build_alert_context`` returns for these arguments.
    """
    quote_details = {
        "spot_price": spot_price,
        "source": source,
        "updated_at": updated_at,
    }
    return build_alert_context(config, **quote_details)
|
|
|
|
|
|
class AlertService:
    """Evaluate portfolio LTV against configured thresholds and keep alert history."""

    def __init__(self, history_path=None) -> None:
        """Create the service and its backing history repository.

        Args:
            history_path: Forwarded unchanged to ``AlertHistoryRepository``.
        """
        self.repository = AlertHistoryRepository(history_path=history_path)

    def evaluate(
        self, config: PortfolioConfig, portfolio: Mapping[str, object], *, persist: bool = True
    ) -> AlertStatus:
        """Classify the current LTV and, optionally, record the alert.

        Args:
            config: Portfolio configuration supplying thresholds and the
                email-alert flag.
            portfolio: Mapping supplying the current LTV ratio, spot price
                and quote timestamp.
            persist: When true, a new non-"ok" event is appended to the stored
                history if its severity differs from the latest stored event.
                When false, nothing is saved and the returned history holds
                only the would-be event (or is empty for "ok").

        Returns:
            An ``AlertStatus`` describing severity, message, thresholds and
            history. With ``persist=True`` the history is returned newest
            first, or empty when the stored history could not be loaded.
        """
        history: list[AlertEvent] = []
        history_unavailable = False
        history_notice: str | None = None
        try:
            history = self.repository.load()
        except AlertHistoryLoadError as exc:
            # Degrade gracefully: still report the current severity, but flag
            # that history cannot be shown or extended.
            history_unavailable = True
            history_notice = (
                "Alert history is temporarily unavailable due to a storage error. New alerts are not being recorded."
            )
            logger.warning("Alert history unavailable at %s: %s", exc.history_path, exc)

        evaluation = _normalize_alert_evaluation_input(config, portfolio)
        severity, message = self._classify(evaluation)

        preview_history: list[AlertEvent] = []
        if severity != "ok":
            event = AlertEvent(
                severity=severity,
                message=message,
                ltv_ratio=float(evaluation.ltv_ratio),
                warning_threshold=float(evaluation.warning_threshold),
                critical_threshold=float(evaluation.critical_threshold),
                spot_price=float(evaluation.spot_price),
                updated_at=evaluation.updated_at,
                email_alerts_enabled=evaluation.email_alerts_enabled,
            )
            if not persist:
                preview_history = [event]
            elif not history_unavailable and self._should_record(history, event):
                history.append(event)
                self.repository.save(history)

        if not persist:
            resolved_history = preview_history
        elif history_unavailable:
            resolved_history = []
        elif severity != "ok":
            resolved_history = self._reload_newest_first(history)
        else:
            # Same newest-first ordering as the alert branch, so consumers see
            # one consistent order regardless of the current severity.
            resolved_history = list(reversed(history))

        return AlertStatus(
            severity=severity,
            message=message,
            ltv_ratio=float(evaluation.ltv_ratio),
            warning_threshold=float(evaluation.warning_threshold),
            critical_threshold=float(evaluation.critical_threshold),
            email_alerts_enabled=evaluation.email_alerts_enabled,
            history=resolved_history,
            history_unavailable=history_unavailable,
            history_notice=history_notice,
        )

    @staticmethod
    def _classify(evaluation: AlertEvaluationInput) -> tuple[str, str]:
        """Return ``(severity, message)`` for the given evaluation input."""
        # The critical threshold is checked first so it wins when both apply.
        if evaluation.ltv_ratio >= evaluation.critical_threshold:
            return (
                "critical",
                f"Current LTV {_ratio_text(evaluation.ltv_ratio)} is above the critical threshold of "
                f"{_ratio_text(evaluation.critical_threshold)}.",
            )
        if evaluation.ltv_ratio >= evaluation.warning_threshold:
            return (
                "warning",
                f"Current LTV {_ratio_text(evaluation.ltv_ratio)} is above the warning threshold of "
                f"{_ratio_text(evaluation.warning_threshold)}.",
            )
        return "ok", "LTV is within configured thresholds."

    def _reload_newest_first(self, fallback: list[AlertEvent]) -> list[AlertEvent]:
        """Re-read stored history newest first, using ``fallback`` if the read fails."""
        try:
            stored = self.repository.load()
        except AlertHistoryLoadError as exc:
            # The initial load succeeded, so the in-memory list is the best
            # available view; do not let a failed re-read abort evaluation.
            logger.warning(
                "Alert history reload failed at %s: %s; using in-memory history",
                exc.history_path,
                exc,
            )
            stored = fallback
        return list(reversed(stored))

    @staticmethod
    def _should_record(history: list[AlertEvent], event: AlertEvent) -> bool:
        """Return whether ``event`` should be appended to ``history``.

        An event is recorded when the history is empty or when its severity
        differs from that of the most recent (last) stored event, so repeated
        evaluations at the same severity do not add duplicates.
        """
        return not history or history[-1].severity != event.severity
|