1 Commits

Author SHA1 Message Date
Bu5hm4nn
21878bf7ff feat(DATA-001): Add live GLD price feed service with Redis caching
- Create PriceFeed service using yfinance
- Cache prices in Redis with 60s TTL
- Add PriceData dataclass for type safety
- Support concurrent price fetching for multiple symbols
2026-03-23 22:25:09 +01:00
4 changed files with 203 additions and 281 deletions

View File

@@ -1,150 +1,71 @@
"""Portfolio configuration models with validation and persistence."""
from __future__ import annotations from __future__ import annotations
import json from dataclasses import dataclass
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@dataclass @dataclass(frozen=True)
class PortfolioConfig: class LombardPortfolio:
"""User portfolio configuration with validation. """Lombard loan portfolio backed by physical gold.
Attributes: Attributes:
gold_value: Current gold collateral value in USD gold_ounces: Quantity of pledged gold in troy ounces.
loan_amount: Outstanding loan amount in USD gold_price_per_ounce: Current gold spot price per ounce.
margin_threshold: LTV threshold for margin call (default 0.75) loan_amount: Outstanding Lombard loan balance.
monthly_budget: Approved monthly hedge budget initial_ltv: Origination or current reference loan-to-value ratio.
ltv_warning: LTV warning level for alerts (default 0.70) margin_call_ltv: LTV threshold at which a margin call is triggered.
""" """
gold_value: float = 215000.0 gold_ounces: float
loan_amount: float = 145000.0 gold_price_per_ounce: float
margin_threshold: float = 0.75 loan_amount: float
monthly_budget: float = 8000.0 initial_ltv: float
ltv_warning: float = 0.70 margin_call_ltv: float
# Data source settings def __post_init__(self) -> None:
primary_source: str = "yfinance" if self.gold_ounces <= 0:
fallback_source: str = "yfinance" raise ValueError("gold_ounces must be positive")
refresh_interval: int = 5 if self.gold_price_per_ounce <= 0:
raise ValueError("gold_price_per_ounce must be positive")
# Alert settings
volatility_spike: float = 0.25
spot_drawdown: float = 7.5
email_alerts: bool = False
def __post_init__(self):
"""Validate configuration after initialization."""
self.validate()
def validate(self) -> None:
"""Validate configuration values."""
if self.gold_value <= 0:
raise ValueError("Gold value must be positive")
if self.loan_amount < 0: if self.loan_amount < 0:
raise ValueError("Loan amount cannot be negative") raise ValueError("loan_amount must be non-negative")
if self.loan_amount >= self.gold_value: if not 0 < self.initial_ltv < 1:
raise ValueError("Loan amount must be less than gold value (LTV < 100%)") raise ValueError("initial_ltv must be between 0 and 1")
if not 0.1 <= self.margin_threshold <= 0.95: if not 0 < self.margin_call_ltv < 1:
raise ValueError("Margin threshold must be between 10% and 95%") raise ValueError("margin_call_ltv must be between 0 and 1")
if not 0.1 <= self.ltv_warning <= 0.95: if self.initial_ltv > self.margin_call_ltv:
raise ValueError("LTV warning level must be between 10% and 95%") raise ValueError("initial_ltv cannot exceed margin_call_ltv")
if self.refresh_interval < 1: if self.loan_amount > self.gold_value:
raise ValueError("Refresh interval must be at least 1 second") raise ValueError("loan_amount cannot exceed current gold value")
@property
def gold_value(self) -> float:
"""Current market value of pledged gold."""
return self.gold_ounces * self.gold_price_per_ounce
@property @property
def current_ltv(self) -> float: def current_ltv(self) -> float:
"""Calculate current loan-to-value ratio.""" """Current loan-to-value ratio."""
if self.gold_value == 0:
return 0.0
return self.loan_amount / self.gold_value return self.loan_amount / self.gold_value
@property
def margin_buffer(self) -> float:
"""Calculate margin buffer (distance to margin call)."""
return self.margin_threshold - self.current_ltv
@property @property
def net_equity(self) -> float: def net_equity(self) -> float:
"""Calculate net equity (gold value - loan).""" """Equity remaining after subtracting the loan from gold value."""
return self.gold_value - self.loan_amount return self.gold_value - self.loan_amount
@property def gold_value_at_price(self, gold_price_per_ounce: float) -> float:
"""Gold value under an alternative spot-price scenario."""
if gold_price_per_ounce <= 0:
raise ValueError("gold_price_per_ounce must be positive")
return self.gold_ounces * gold_price_per_ounce
def ltv_at_price(self, gold_price_per_ounce: float) -> float:
"""Portfolio LTV under an alternative gold-price scenario."""
return self.loan_amount / self.gold_value_at_price(gold_price_per_ounce)
def net_equity_at_price(self, gold_price_per_ounce: float) -> float:
"""Net equity under an alternative gold-price scenario."""
return self.gold_value_at_price(gold_price_per_ounce) - self.loan_amount
def margin_call_price(self) -> float: def margin_call_price(self) -> float:
"""Calculate gold price at which margin call occurs.""" """Gold price per ounce at which the portfolio breaches the margin LTV."""
if self.margin_threshold == 0: return self.loan_amount / (self.margin_call_ltv * self.gold_ounces)
return float('inf')
return self.loan_amount / self.margin_threshold
def to_dict(self) -> dict[str, Any]:
"""Convert configuration to dictionary."""
return {
"gold_value": self.gold_value,
"loan_amount": self.loan_amount,
"margin_threshold": self.margin_threshold,
"monthly_budget": self.monthly_budget,
"ltv_warning": self.ltv_warning,
"primary_source": self.primary_source,
"fallback_source": self.fallback_source,
"refresh_interval": self.refresh_interval,
"volatility_spike": self.volatility_spike,
"spot_drawdown": self.spot_drawdown,
"email_alerts": self.email_alerts,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PortfolioConfig:
"""Create configuration from dictionary."""
return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
class PortfolioRepository:
"""Repository for persisting portfolio configuration.
Uses file-based storage by default. Can be extended to use Redis.
"""
CONFIG_PATH = Path("data/portfolio_config.json")
def __init__(self):
# Ensure data directory exists
self.CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
def save(self, config: PortfolioConfig) -> None:
"""Save configuration to disk."""
with open(self.CONFIG_PATH, "w") as f:
json.dump(config.to_dict(), f, indent=2)
def load(self) -> PortfolioConfig:
"""Load configuration from disk.
Returns default configuration if file doesn't exist.
"""
if not self.CONFIG_PATH.exists():
default = PortfolioConfig()
self.save(default)
return default
try:
with open(self.CONFIG_PATH) as f:
data = json.load(f)
return PortfolioConfig.from_dict(data)
except (json.JSONDecodeError, ValueError) as e:
print(f"Warning: Failed to load portfolio config: {e}. Using defaults.")
return PortfolioConfig()
# Singleton repository instance
_portfolio_repo: PortfolioRepository | None = None
def get_portfolio_repository() -> PortfolioRepository:
"""Get or create global portfolio repository instance."""
global _portfolio_repo
if _portfolio_repo is None:
_portfolio_repo = PortfolioRepository()
return _portfolio_repo

View File

@@ -3,16 +3,10 @@ from __future__ import annotations
from nicegui import ui from nicegui import ui
from app.pages.common import dashboard_page from app.pages.common import dashboard_page
from app.models.portfolio import PortfolioConfig, get_portfolio_repository
@ui.page("/settings") @ui.page("/settings")
def settings_page(): def settings_page() -> None:
"""Settings page with persistent portfolio configuration."""
# Load current configuration
repo = get_portfolio_repository()
config = repo.load()
with dashboard_page( with dashboard_page(
"Settings", "Settings",
"Configure portfolio assumptions, preferred market data inputs, alert thresholds, and import/export behavior.", "Configure portfolio assumptions, preferred market data inputs, alert thresholds, and import/export behavior.",
@@ -23,46 +17,12 @@ def settings_page():
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900" "w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
): ):
ui.label("Portfolio Parameters").classes("text-lg font-semibold text-slate-900 dark:text-slate-100") ui.label("Portfolio Parameters").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
gold_value = ui.number("Gold collateral value", value=215000, min=0, step=1000).classes("w-full")
gold_value = ui.number( loan_amount = ui.number("Loan amount", value=145000, min=0, step=1000).classes("w-full")
"Gold collateral value ($)", margin_threshold = ui.number("Margin call LTV", value=0.75, min=0.1, max=0.95, step=0.01).classes(
value=config.gold_value, "w-full"
min=0.01, # Must be positive )
step=1000 ui.number("Monthly hedge budget", value=8000, min=0, step=500).classes("w-full")
).classes("w-full")
loan_amount = ui.number(
"Loan amount ($)",
value=config.loan_amount,
min=0,
step=1000
).classes("w-full")
margin_threshold = ui.number(
"Margin call LTV threshold",
value=config.margin_threshold,
min=0.1,
max=0.95,
step=0.01
).classes("w-full")
monthly_budget = ui.number(
"Monthly hedge budget ($)",
value=config.monthly_budget,
min=0,
step=500
).classes("w-full")
# Show calculated values
with ui.row().classes("w-full gap-2 mt-4 p-4 bg-slate-50 dark:bg-slate-800 rounded-lg"):
ui.label("Current LTV:").classes("font-medium")
ltv_display = ui.label(f"{(config.loan_amount / config.gold_value * 100):.1f}%")
ui.label("Margin buffer:").classes("font-medium ml-4")
buffer_display = ui.label(f"{((config.margin_threshold - config.loan_amount / config.gold_value) * 100):.1f}%")
ui.label("Margin call at:").classes("font-medium ml-4")
margin_price_display = ui.label(f"${(config.loan_amount / config.margin_threshold):,.2f}")
with ui.card().classes( with ui.card().classes(
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900" "w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
@@ -70,126 +30,53 @@ def settings_page():
ui.label("Data Sources").classes("text-lg font-semibold text-slate-900 dark:text-slate-100") ui.label("Data Sources").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
primary_source = ui.select( primary_source = ui.select(
["yfinance", "ibkr", "alpaca"], ["yfinance", "ibkr", "alpaca"],
value=config.primary_source, value="yfinance",
label="Primary source", label="Primary source",
).classes("w-full") ).classes("w-full")
fallback_source = ui.select( fallback_source = ui.select(
["fallback", "yfinance", "manual"], ["fallback", "yfinance", "manual"],
value=config.fallback_source, value="fallback",
label="Fallback source", label="Fallback source",
).classes("w-full") ).classes("w-full")
refresh_interval = ui.number( refresh_interval = ui.number("Refresh interval (seconds)", value=5, min=1, step=1).classes("w-full")
"Refresh interval (seconds)", ui.switch("Enable Redis cache", value=True)
value=config.refresh_interval,
min=1,
step=1
).classes("w-full")
def update_calculations():
"""Update calculated displays when values change."""
try:
gold = gold_value.value or 1 # Avoid division by zero
loan = loan_amount.value or 0
margin = margin_threshold.value or 0.75
ltv = (loan / gold) * 100
buffer = (margin - loan / gold) * 100
margin_price = loan / margin if margin > 0 else 0
ltv_display.set_text(f"{ltv:.1f}%")
buffer_display.set_text(f"{buffer:.1f}%")
margin_price_display.set_text(f"${margin_price:,.2f}")
except Exception:
pass # Ignore calculation errors during editing
# Connect update function to value changes
gold_value.on_value_change(update_calculations)
loan_amount.on_value_change(update_calculations)
margin_threshold.on_value_change(update_calculations)
with ui.row().classes("w-full gap-6 max-lg:flex-col"): with ui.row().classes("w-full gap-6 max-lg:flex-col"):
with ui.card().classes( with ui.card().classes(
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900" "w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
): ):
ui.label("Alert Thresholds").classes("text-lg font-semibold text-slate-900 dark:text-slate-100") ui.label("Alert Thresholds").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
ltv_warning = ui.number( ltv_warning = ui.number("LTV warning level", value=0.70, min=0.1, max=0.95, step=0.01).classes("w-full")
"LTV warning level", vol_alert = ui.number("Volatility spike alert", value=0.25, min=0.01, max=2.0, step=0.01).classes(
value=config.ltv_warning, "w-full"
min=0.1,
max=0.95,
step=0.01
).classes("w-full")
vol_alert = ui.number(
"Volatility spike alert",
value=config.volatility_spike,
min=0.01,
max=2.0,
step=0.01
).classes("w-full")
price_alert = ui.number(
"Spot drawdown alert (%)",
value=config.spot_drawdown,
min=0.1,
max=50.0,
step=0.5
).classes("w-full")
email_alerts = ui.switch(
"Email alerts",
value=config.email_alerts
) )
price_alert = ui.number("Spot drawdown alert (%)", value=7.5, min=0.1, max=50.0, step=0.5).classes(
"w-full"
)
email_alerts = ui.switch("Email alerts", value=False)
with ui.card().classes( with ui.card().classes(
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900" "w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
): ):
ui.label("Export / Import").classes("text-lg font-semibold text-slate-900 dark:text-slate-100") ui.label("Export / Import").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
export_format = ui.select( export_format = ui.select(["json", "csv", "yaml"], value="json", label="Export format").classes(
["json", "csv", "yaml"], "w-full"
value="json", )
label="Export format"
).classes("w-full")
ui.switch("Include scenario history", value=True) ui.switch("Include scenario history", value=True)
ui.switch("Include option selections", value=True) ui.switch("Include option selections", value=True)
ui.button("Import settings", icon="upload").props("outline color=primary") ui.button("Import settings", icon="upload").props("outline color=primary")
ui.button("Export settings", icon="download").props("outline color=primary") ui.button("Export settings", icon="download").props("outline color=primary")
def save_settings(): def save_settings() -> None:
"""Save settings with validation and persistence."""
try:
# Create new config from form values
new_config = PortfolioConfig(
gold_value=float(gold_value.value),
loan_amount=float(loan_amount.value),
margin_threshold=float(margin_threshold.value),
monthly_budget=float(monthly_budget.value),
ltv_warning=float(ltv_warning.value),
primary_source=str(primary_source.value),
fallback_source=str(fallback_source.value),
refresh_interval=int(refresh_interval.value),
volatility_spike=float(vol_alert.value),
spot_drawdown=float(price_alert.value),
email_alerts=bool(email_alerts.value),
)
# Save to repository
repo.save(new_config)
status.set_text( status.set_text(
f"Saved: gold=${new_config.gold_value:,.0f}, " "Saved configuration: "
f"loan=${new_config.loan_amount:,.0f}, " f"gold=${gold_value.value:,.0f}, loan=${loan_amount.value:,.0f}, margin={margin_threshold.value:.2f}, "
f"LTV={new_config.current_ltv:.1%}, " f"primary={primary_source.value}, fallback={fallback_source.value}, refresh={refresh_interval.value}s, "
f"margin={new_config.margin_threshold:.1%}, " f"ltv warning={ltv_warning.value:.2f}, vol={vol_alert.value:.2f}, drawdown={price_alert.value:.1f}%, "
f"buffer={new_config.margin_buffer:.1%}" f"email alerts={'on' if email_alerts.value else 'off'}, export={export_format.value}."
) )
ui.notify("Settings saved successfully", color="positive") ui.notify("Settings saved", color="positive")
except ValueError as e: with ui.row().classes("w-full items-center justify-between gap-4"):
ui.notify(f"Validation error: {e}", color="negative") status = ui.label("").classes("text-sm text-slate-500 dark:text-slate-400")
except Exception as e:
ui.notify(f"Failed to save: {e}", color="negative")
with ui.row().classes("w-full items-center justify-between gap-4 mt-6"):
status = ui.label(
f"Current: gold=${config.gold_value:,.0f}, loan=${config.loan_amount:,.0f}, "
f"current LTV={config.current_ltv:.1%}"
).classes("text-sm text-slate-500 dark:text-slate-400")
ui.button("Save settings", on_click=save_settings).props("color=primary") ui.button("Save settings", on_click=save_settings).props("color=primary")

View File

@@ -70,3 +70,17 @@ class CacheService:
if isinstance(value, datetime): if isinstance(value, datetime):
return value.isoformat() return value.isoformat()
raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable") raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
# Global cache instance
_cache_instance: CacheService | None = None
def get_cache() -> CacheService:
"""Get or create global cache instance."""
global _cache_instance
if _cache_instance is None:
import os
redis_url = os.environ.get("REDIS_URL")
_cache_instance = CacheService(redis_url)
return _cache_instance

100
app/services/price_feed.py Normal file
View File

@@ -0,0 +1,100 @@
"""Live price feed service for fetching real-time GLD and other asset prices."""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import yfinance as yf
from app.services.cache import get_cache
logger = logging.getLogger(__name__)
@dataclass
class PriceData:
"""Price data for a symbol."""
symbol: str
price: float
currency: str
timestamp: datetime
source: str = "yfinance"
class PriceFeed:
"""Live price feed service using yfinance with Redis caching."""
CACHE_TTL_SECONDS = 60
DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"]
def __init__(self):
self._cache = get_cache()
async def get_price(self, symbol: str) -> Optional[PriceData]:
"""Get current price for a symbol, with caching.
Args:
symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD")
Returns:
PriceData or None if fetch fails
"""
# Check cache first
if self._cache.enabled:
cache_key = f"price:{symbol}"
cached = await self._cache.get(cache_key)
if cached:
return PriceData(**cached)
# Fetch from yfinance
try:
data = await self._fetch_yfinance(symbol)
if data:
# Cache the result
if self._cache.enabled:
await self._cache.set(
cache_key,
{
"symbol": data.symbol,
"price": data.price,
"currency": data.currency,
"timestamp": data.timestamp.isoformat(),
"source": data.source
},
ttl=self.CACHE_TTL_SECONDS
)
return data
except Exception as e:
logger.error(f"Failed to fetch price for {symbol}: {e}")
return None
async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
"""Fetch price from yfinance (run in thread pool to avoid blocking)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol)
def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
"""Synchronous yfinance fetch."""
ticker = yf.Ticker(symbol)
hist = ticker.history(period="1d", interval="1m")
if not hist.empty:
last_price = hist["Close"].iloc[-1]
currency = ticker.info.get("currency", "USD")
return PriceData(
symbol=symbol,
price=float(last_price),
currency=currency,
timestamp=datetime.utcnow()
)
return None
async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]:
"""Get prices for multiple symbols concurrently."""
tasks = [self.get_price(s) for s in symbols]
results = await asyncio.gather(*tasks)
return {s: r for s, r in zip(symbols, results)}