1 Commits

Author SHA1 Message Date
Bu5hm4nn
21878bf7ff feat(DATA-001): Add live GLD price feed service with Redis caching
- Create PriceFeed service using yfinance
- Cache prices in Redis with 60s TTL
- Add PriceData dataclass for type safety
- Support concurrent price fetching for multiple symbols
2026-03-23 22:25:09 +01:00
4 changed files with 203 additions and 281 deletions

View File

@@ -1,150 +1,71 @@
"""Portfolio configuration models with validation and persistence."""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from dataclasses import dataclass
@dataclass
class PortfolioConfig:
"""User portfolio configuration with validation.
@dataclass(frozen=True)
class LombardPortfolio:
"""Lombard loan portfolio backed by physical gold.
Attributes:
gold_value: Current gold collateral value in USD
loan_amount: Outstanding loan amount in USD
margin_threshold: LTV threshold for margin call (default 0.75)
monthly_budget: Approved monthly hedge budget
ltv_warning: LTV warning level for alerts (default 0.70)
gold_ounces: Quantity of pledged gold in troy ounces.
gold_price_per_ounce: Current gold spot price per ounce.
loan_amount: Outstanding Lombard loan balance.
initial_ltv: Origination or current reference loan-to-value ratio.
margin_call_ltv: LTV threshold at which a margin call is triggered.
"""
gold_value: float = 215000.0
loan_amount: float = 145000.0
margin_threshold: float = 0.75
monthly_budget: float = 8000.0
ltv_warning: float = 0.70
# Data source settings
primary_source: str = "yfinance"
fallback_source: str = "yfinance"
refresh_interval: int = 5
# Alert settings
volatility_spike: float = 0.25
spot_drawdown: float = 7.5
email_alerts: bool = False
def __post_init__(self):
"""Validate configuration after initialization."""
self.validate()
def validate(self) -> None:
"""Validate configuration values."""
if self.gold_value <= 0:
raise ValueError("Gold value must be positive")
gold_ounces: float
gold_price_per_ounce: float
loan_amount: float
initial_ltv: float
margin_call_ltv: float
def __post_init__(self) -> None:
if self.gold_ounces <= 0:
raise ValueError("gold_ounces must be positive")
if self.gold_price_per_ounce <= 0:
raise ValueError("gold_price_per_ounce must be positive")
if self.loan_amount < 0:
raise ValueError("Loan amount cannot be negative")
if self.loan_amount >= self.gold_value:
raise ValueError("Loan amount must be less than gold value (LTV < 100%)")
if not 0.1 <= self.margin_threshold <= 0.95:
raise ValueError("Margin threshold must be between 10% and 95%")
if not 0.1 <= self.ltv_warning <= 0.95:
raise ValueError("LTV warning level must be between 10% and 95%")
if self.refresh_interval < 1:
raise ValueError("Refresh interval must be at least 1 second")
raise ValueError("loan_amount must be non-negative")
if not 0 < self.initial_ltv < 1:
raise ValueError("initial_ltv must be between 0 and 1")
if not 0 < self.margin_call_ltv < 1:
raise ValueError("margin_call_ltv must be between 0 and 1")
if self.initial_ltv > self.margin_call_ltv:
raise ValueError("initial_ltv cannot exceed margin_call_ltv")
if self.loan_amount > self.gold_value:
raise ValueError("loan_amount cannot exceed current gold value")
@property
def gold_value(self) -> float:
"""Current market value of pledged gold."""
return self.gold_ounces * self.gold_price_per_ounce
@property
def current_ltv(self) -> float:
"""Calculate current loan-to-value ratio."""
if self.gold_value == 0:
return 0.0
"""Current loan-to-value ratio."""
return self.loan_amount / self.gold_value
@property
def margin_buffer(self) -> float:
"""Calculate margin buffer (distance to margin call)."""
return self.margin_threshold - self.current_ltv
@property
def net_equity(self) -> float:
"""Calculate net equity (gold value - loan)."""
"""Equity remaining after subtracting the loan from gold value."""
return self.gold_value - self.loan_amount
@property
def gold_value_at_price(self, gold_price_per_ounce: float) -> float:
"""Gold value under an alternative spot-price scenario."""
if gold_price_per_ounce <= 0:
raise ValueError("gold_price_per_ounce must be positive")
return self.gold_ounces * gold_price_per_ounce
def ltv_at_price(self, gold_price_per_ounce: float) -> float:
"""Portfolio LTV under an alternative gold-price scenario."""
return self.loan_amount / self.gold_value_at_price(gold_price_per_ounce)
def net_equity_at_price(self, gold_price_per_ounce: float) -> float:
"""Net equity under an alternative gold-price scenario."""
return self.gold_value_at_price(gold_price_per_ounce) - self.loan_amount
def margin_call_price(self) -> float:
"""Calculate gold price at which margin call occurs."""
if self.margin_threshold == 0:
return float('inf')
return self.loan_amount / self.margin_threshold
def to_dict(self) -> dict[str, Any]:
"""Convert configuration to dictionary."""
return {
"gold_value": self.gold_value,
"loan_amount": self.loan_amount,
"margin_threshold": self.margin_threshold,
"monthly_budget": self.monthly_budget,
"ltv_warning": self.ltv_warning,
"primary_source": self.primary_source,
"fallback_source": self.fallback_source,
"refresh_interval": self.refresh_interval,
"volatility_spike": self.volatility_spike,
"spot_drawdown": self.spot_drawdown,
"email_alerts": self.email_alerts,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PortfolioConfig:
"""Create configuration from dictionary."""
return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
class PortfolioRepository:
"""Repository for persisting portfolio configuration.
Uses file-based storage by default. Can be extended to use Redis.
"""
CONFIG_PATH = Path("data/portfolio_config.json")
def __init__(self):
# Ensure data directory exists
self.CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
def save(self, config: PortfolioConfig) -> None:
"""Save configuration to disk."""
with open(self.CONFIG_PATH, "w") as f:
json.dump(config.to_dict(), f, indent=2)
def load(self) -> PortfolioConfig:
"""Load configuration from disk.
Returns default configuration if file doesn't exist.
"""
if not self.CONFIG_PATH.exists():
default = PortfolioConfig()
self.save(default)
return default
try:
with open(self.CONFIG_PATH) as f:
data = json.load(f)
return PortfolioConfig.from_dict(data)
except (json.JSONDecodeError, ValueError) as e:
print(f"Warning: Failed to load portfolio config: {e}. Using defaults.")
return PortfolioConfig()
# Singleton repository instance
_portfolio_repo: PortfolioRepository | None = None
def get_portfolio_repository() -> PortfolioRepository:
"""Get or create global portfolio repository instance."""
global _portfolio_repo
if _portfolio_repo is None:
_portfolio_repo = PortfolioRepository()
return _portfolio_repo
"""Gold price per ounce at which the portfolio breaches the margin LTV."""
return self.loan_amount / (self.margin_call_ltv * self.gold_ounces)

View File

@@ -3,16 +3,10 @@ from __future__ import annotations
from nicegui import ui
from app.pages.common import dashboard_page
from app.models.portfolio import PortfolioConfig, get_portfolio_repository
@ui.page("/settings")
def settings_page():
"""Settings page with persistent portfolio configuration."""
# Load current configuration
repo = get_portfolio_repository()
config = repo.load()
def settings_page() -> None:
with dashboard_page(
"Settings",
"Configure portfolio assumptions, preferred market data inputs, alert thresholds, and import/export behavior.",
@@ -23,46 +17,12 @@ def settings_page():
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
):
ui.label("Portfolio Parameters").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
gold_value = ui.number(
"Gold collateral value ($)",
value=config.gold_value,
min=0.01, # Must be positive
step=1000
).classes("w-full")
loan_amount = ui.number(
"Loan amount ($)",
value=config.loan_amount,
min=0,
step=1000
).classes("w-full")
margin_threshold = ui.number(
"Margin call LTV threshold",
value=config.margin_threshold,
min=0.1,
max=0.95,
step=0.01
).classes("w-full")
monthly_budget = ui.number(
"Monthly hedge budget ($)",
value=config.monthly_budget,
min=0,
step=500
).classes("w-full")
# Show calculated values
with ui.row().classes("w-full gap-2 mt-4 p-4 bg-slate-50 dark:bg-slate-800 rounded-lg"):
ui.label("Current LTV:").classes("font-medium")
ltv_display = ui.label(f"{(config.loan_amount / config.gold_value * 100):.1f}%")
ui.label("Margin buffer:").classes("font-medium ml-4")
buffer_display = ui.label(f"{((config.margin_threshold - config.loan_amount / config.gold_value) * 100):.1f}%")
ui.label("Margin call at:").classes("font-medium ml-4")
margin_price_display = ui.label(f"${(config.loan_amount / config.margin_threshold):,.2f}")
gold_value = ui.number("Gold collateral value", value=215000, min=0, step=1000).classes("w-full")
loan_amount = ui.number("Loan amount", value=145000, min=0, step=1000).classes("w-full")
margin_threshold = ui.number("Margin call LTV", value=0.75, min=0.1, max=0.95, step=0.01).classes(
"w-full"
)
ui.number("Monthly hedge budget", value=8000, min=0, step=500).classes("w-full")
with ui.card().classes(
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
@@ -70,126 +30,53 @@ def settings_page():
ui.label("Data Sources").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
primary_source = ui.select(
["yfinance", "ibkr", "alpaca"],
value=config.primary_source,
value="yfinance",
label="Primary source",
).classes("w-full")
fallback_source = ui.select(
["fallback", "yfinance", "manual"],
value=config.fallback_source,
value="fallback",
label="Fallback source",
).classes("w-full")
refresh_interval = ui.number(
"Refresh interval (seconds)",
value=config.refresh_interval,
min=1,
step=1
).classes("w-full")
def update_calculations():
"""Update calculated displays when values change."""
try:
gold = gold_value.value or 1 # Avoid division by zero
loan = loan_amount.value or 0
margin = margin_threshold.value or 0.75
ltv = (loan / gold) * 100
buffer = (margin - loan / gold) * 100
margin_price = loan / margin if margin > 0 else 0
ltv_display.set_text(f"{ltv:.1f}%")
buffer_display.set_text(f"{buffer:.1f}%")
margin_price_display.set_text(f"${margin_price:,.2f}")
except Exception:
pass # Ignore calculation errors during editing
# Connect update function to value changes
gold_value.on_value_change(update_calculations)
loan_amount.on_value_change(update_calculations)
margin_threshold.on_value_change(update_calculations)
refresh_interval = ui.number("Refresh interval (seconds)", value=5, min=1, step=1).classes("w-full")
ui.switch("Enable Redis cache", value=True)
with ui.row().classes("w-full gap-6 max-lg:flex-col"):
with ui.card().classes(
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
):
ui.label("Alert Thresholds").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
ltv_warning = ui.number(
"LTV warning level",
value=config.ltv_warning,
min=0.1,
max=0.95,
step=0.01
).classes("w-full")
vol_alert = ui.number(
"Volatility spike alert",
value=config.volatility_spike,
min=0.01,
max=2.0,
step=0.01
).classes("w-full")
price_alert = ui.number(
"Spot drawdown alert (%)",
value=config.spot_drawdown,
min=0.1,
max=50.0,
step=0.5
).classes("w-full")
email_alerts = ui.switch(
"Email alerts",
value=config.email_alerts
ltv_warning = ui.number("LTV warning level", value=0.70, min=0.1, max=0.95, step=0.01).classes("w-full")
vol_alert = ui.number("Volatility spike alert", value=0.25, min=0.01, max=2.0, step=0.01).classes(
"w-full"
)
price_alert = ui.number("Spot drawdown alert (%)", value=7.5, min=0.1, max=50.0, step=0.5).classes(
"w-full"
)
email_alerts = ui.switch("Email alerts", value=False)
with ui.card().classes(
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
):
ui.label("Export / Import").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
export_format = ui.select(
["json", "csv", "yaml"],
value="json",
label="Export format"
).classes("w-full")
export_format = ui.select(["json", "csv", "yaml"], value="json", label="Export format").classes(
"w-full"
)
ui.switch("Include scenario history", value=True)
ui.switch("Include option selections", value=True)
ui.button("Import settings", icon="upload").props("outline color=primary")
ui.button("Export settings", icon="download").props("outline color=primary")
def save_settings():
"""Save settings with validation and persistence."""
try:
# Create new config from form values
new_config = PortfolioConfig(
gold_value=float(gold_value.value),
loan_amount=float(loan_amount.value),
margin_threshold=float(margin_threshold.value),
monthly_budget=float(monthly_budget.value),
ltv_warning=float(ltv_warning.value),
primary_source=str(primary_source.value),
fallback_source=str(fallback_source.value),
refresh_interval=int(refresh_interval.value),
volatility_spike=float(vol_alert.value),
spot_drawdown=float(price_alert.value),
email_alerts=bool(email_alerts.value),
)
# Save to repository
repo.save(new_config)
status.set_text(
f"Saved: gold=${new_config.gold_value:,.0f}, "
f"loan=${new_config.loan_amount:,.0f}, "
f"LTV={new_config.current_ltv:.1%}, "
f"margin={new_config.margin_threshold:.1%}, "
f"buffer={new_config.margin_buffer:.1%}"
)
ui.notify("Settings saved successfully", color="positive")
except ValueError as e:
ui.notify(f"Validation error: {e}", color="negative")
except Exception as e:
ui.notify(f"Failed to save: {e}", color="negative")
def save_settings() -> None:
status.set_text(
"Saved configuration: "
f"gold=${gold_value.value:,.0f}, loan=${loan_amount.value:,.0f}, margin={margin_threshold.value:.2f}, "
f"primary={primary_source.value}, fallback={fallback_source.value}, refresh={refresh_interval.value}s, "
f"ltv warning={ltv_warning.value:.2f}, vol={vol_alert.value:.2f}, drawdown={price_alert.value:.1f}%, "
f"email alerts={'on' if email_alerts.value else 'off'}, export={export_format.value}."
)
ui.notify("Settings saved", color="positive")
with ui.row().classes("w-full items-center justify-between gap-4 mt-6"):
status = ui.label(
f"Current: gold=${config.gold_value:,.0f}, loan=${config.loan_amount:,.0f}, "
f"current LTV={config.current_ltv:.1%}"
).classes("text-sm text-slate-500 dark:text-slate-400")
with ui.row().classes("w-full items-center justify-between gap-4"):
status = ui.label("").classes("text-sm text-slate-500 dark:text-slate-400")
ui.button("Save settings", on_click=save_settings).props("color=primary")

View File

@@ -70,3 +70,17 @@ class CacheService:
if isinstance(value, datetime):
return value.isoformat()
raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
# Global cache instance
_cache_instance: CacheService | None = None
def get_cache() -> CacheService:
"""Get or create global cache instance."""
global _cache_instance
if _cache_instance is None:
import os
redis_url = os.environ.get("REDIS_URL")
_cache_instance = CacheService(redis_url)
return _cache_instance

100
app/services/price_feed.py Normal file
View File

@@ -0,0 +1,100 @@
"""Live price feed service for fetching real-time GLD and other asset prices."""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import yfinance as yf
from app.services.cache import get_cache
logger = logging.getLogger(__name__)
@dataclass
class PriceData:
"""Price data for a symbol."""
symbol: str
price: float
currency: str
timestamp: datetime
source: str = "yfinance"
class PriceFeed:
"""Live price feed service using yfinance with Redis caching."""
CACHE_TTL_SECONDS = 60
DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"]
def __init__(self):
self._cache = get_cache()
async def get_price(self, symbol: str) -> Optional[PriceData]:
"""Get current price for a symbol, with caching.
Args:
symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD")
Returns:
PriceData or None if fetch fails
"""
# Check cache first
if self._cache.enabled:
cache_key = f"price:{symbol}"
cached = await self._cache.get(cache_key)
if cached:
return PriceData(**cached)
# Fetch from yfinance
try:
data = await self._fetch_yfinance(symbol)
if data:
# Cache the result
if self._cache.enabled:
await self._cache.set(
cache_key,
{
"symbol": data.symbol,
"price": data.price,
"currency": data.currency,
"timestamp": data.timestamp.isoformat(),
"source": data.source
},
ttl=self.CACHE_TTL_SECONDS
)
return data
except Exception as e:
logger.error(f"Failed to fetch price for {symbol}: {e}")
return None
async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
"""Fetch price from yfinance (run in thread pool to avoid blocking)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol)
def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
"""Synchronous yfinance fetch."""
ticker = yf.Ticker(symbol)
hist = ticker.history(period="1d", interval="1m")
if not hist.empty:
last_price = hist["Close"].iloc[-1]
currency = ticker.info.get("currency", "USD")
return PriceData(
symbol=symbol,
price=float(last_price),
currency=currency,
timestamp=datetime.utcnow()
)
return None
async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]:
"""Get prices for multiple symbols concurrently."""
tasks = [self.get_price(s) for s in symbols]
results = await asyncio.gather(*tasks)
return {s: r for s, r in zip(symbols, results)}