4 Commits

Author SHA1 Message Date
Bu5hm4nn
70ec625146 feat(DATA-002): add live GLD options chain data via yfinance 2026-03-23 22:53:08 +01:00
Bu5hm4nn
c14ff83adc Merge PORT-001: Portfolio configuration persistence 2026-03-23 22:27:28 +01:00
Bu5hm4nn
77456c0cb4 Merge DATA-001: Live price feed integration 2026-03-23 22:27:28 +01:00
Bu5hm4nn
21878bf7ff feat(DATA-001): Add live GLD price feed service with Redis caching
- Create PriceFeed service using yfinance
- Cache prices in Redis with 60s TTL
- Add PriceData dataclass for type safety
- Support concurrent price fetching for multiple symbols
2026-03-23 22:25:09 +01:00
7 changed files with 375 additions and 60 deletions

View File

@@ -18,6 +18,7 @@ import app.pages # noqa: F401
from app.api.routes import router as api_router
from app.services.cache import CacheService
from app.services.data_service import DataService
from app.services.runtime import set_data_service
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
logger = logging.getLogger(__name__)
@@ -110,6 +111,7 @@ async def lifespan(app: FastAPI):
app.state.cache = CacheService(settings.redis_url, default_ttl=settings.cache_ttl)
await app.state.cache.connect()
app.state.data_service = DataService(app.state.cache, default_symbol=settings.default_symbol)
set_data_service(app.state.data_service)
app.state.ws_manager = ConnectionManager()
app.state.publisher_task = asyncio.create_task(publish_updates(app))
logger.info("Application startup complete")

View File

@@ -1,14 +1,66 @@
"""Portfolio configuration models with validation and persistence."""
"""Portfolio configuration and domain portfolio models."""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class LombardPortfolio:
"""Lombard loan portfolio backed by physical gold."""
gold_ounces: float
gold_price_per_ounce: float
loan_amount: float
initial_ltv: float
margin_call_ltv: float
def __post_init__(self) -> None:
if self.gold_ounces <= 0:
raise ValueError("gold_ounces must be positive")
if self.gold_price_per_ounce <= 0:
raise ValueError("gold_price_per_ounce must be positive")
if self.loan_amount < 0:
raise ValueError("loan_amount must be non-negative")
if not 0 < self.initial_ltv < 1:
raise ValueError("initial_ltv must be between 0 and 1")
if not 0 < self.margin_call_ltv < 1:
raise ValueError("margin_call_ltv must be between 0 and 1")
if self.initial_ltv > self.margin_call_ltv:
raise ValueError("initial_ltv cannot exceed margin_call_ltv")
if self.loan_amount > self.gold_value:
raise ValueError("loan_amount cannot exceed current gold value")
@property
def gold_value(self) -> float:
return self.gold_ounces * self.gold_price_per_ounce
@property
def current_ltv(self) -> float:
return self.loan_amount / self.gold_value
@property
def net_equity(self) -> float:
return self.gold_value - self.loan_amount
def gold_value_at_price(self, gold_price_per_ounce: float) -> float:
if gold_price_per_ounce <= 0:
raise ValueError("gold_price_per_ounce must be positive")
return self.gold_ounces * gold_price_per_ounce
def ltv_at_price(self, gold_price_per_ounce: float) -> float:
return self.loan_amount / self.gold_value_at_price(gold_price_per_ounce)
def net_equity_at_price(self, gold_price_per_ounce: float) -> float:
return self.gold_value_at_price(gold_price_per_ounce) - self.loan_amount
def margin_call_price(self) -> float:
return self.loan_amount / (self.margin_call_ltv * self.gold_ounces)
@dataclass
class PortfolioConfig:
"""User portfolio configuration with validation.

View File

@@ -5,16 +5,22 @@ from typing import Any
from nicegui import ui
from app.components import GreeksTable
from app.pages.common import dashboard_page, option_chain, strategy_catalog
from app.pages.common import dashboard_page, strategy_catalog
from app.services.runtime import get_data_service
@ui.page("/options")
def options_page() -> None:
chain = option_chain()
expiries = sorted({row["expiry"] for row in chain})
strike_values = sorted({row["strike"] for row in chain})
selected_expiry = {"value": expiries[0]}
strike_range = {"min": strike_values[0], "max": strike_values[-1]}
async def options_page() -> None:
chain_data = await get_data_service().get_options_chain("GLD")
chain = list(chain_data.get("rows") or [*chain_data.get("calls", []), *chain_data.get("puts", [])])
expiries = list(chain_data.get("expirations") or sorted({row["expiry"] for row in chain}))
strike_values = sorted({float(row["strike"]) for row in chain})
selected_expiry = {"value": expiries[0] if expiries else None}
strike_range = {
"min": strike_values[0] if strike_values else 0.0,
"max": strike_values[-1] if strike_values else 0.0,
}
selected_strategy = {"value": strategy_catalog()[0]["label"]}
chosen_contracts: list[dict[str, Any]] = []
@@ -32,15 +38,15 @@ def options_page() -> None:
min_strike = ui.number(
"Min strike",
value=strike_range["min"],
min=strike_values[0],
max=strike_values[-1],
min=strike_values[0] if strike_values else 0.0,
max=strike_values[-1] if strike_values else 0.0,
step=5,
).classes("w-full")
max_strike = ui.number(
"Max strike",
value=strike_range["max"],
min=strike_values[0],
max=strike_values[-1],
min=strike_values[0] if strike_values else 0.0,
max=strike_values[-1] if strike_values else 0.0,
step=5,
).classes("w-full")
strategy_select = ui.select(
@@ -49,6 +55,15 @@ def options_page() -> None:
label="Add to hedge strategy",
).classes("w-full")
source_label = f"Source: {chain_data.get('source', 'unknown')}"
if chain_data.get("updated_at"):
source_label += f" · Updated {chain_data['updated_at']}"
ui.label(source_label).classes("text-xs text-slate-500 dark:text-slate-400")
if chain_data.get("error"):
ui.label(f"Options data unavailable: {chain_data['error']}").classes(
"text-xs text-amber-700 dark:text-amber-300"
)
selection_card = ui.card().classes(
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
)
@@ -56,12 +71,14 @@ def options_page() -> None:
chain_table = ui.html("").classes("w-full")
greeks = GreeksTable([])
def filtered_rows() -> list[dict]:
def filtered_rows() -> list[dict[str, Any]]:
if not selected_expiry["value"]:
return []
return [
row
for row in chain
if row["expiry"] == selected_expiry["value"]
and strike_range["min"] <= row["strike"] <= strike_range["max"]
and strike_range["min"] <= float(row["strike"]) <= strike_range["max"]
]
def render_selection() -> None:
@@ -76,10 +93,10 @@ def options_page() -> None:
return
for contract in chosen_contracts[-3:]:
ui.label(
f"{contract['symbol']} · premium ${contract['premium']:.2f} · Δ {contract['delta']:+.3f}"
f"{contract['symbol']} · premium ${float(contract['premium']):.2f} · IV {float(contract.get('impliedVolatility', 0.0)):.1%}"
).classes("text-sm text-slate-600 dark:text-slate-300")
def add_to_strategy(contract: dict) -> None:
def add_to_strategy(contract: dict[str, Any]) -> None:
chosen_contracts.append(contract)
render_selection()
greeks.set_options(chosen_contracts[-6:])
@@ -100,6 +117,8 @@ def options_page() -> None:
<th class='px-4 py-3 text-left text-xs font-semibold uppercase tracking-wide text-slate-500 dark:text-slate-300'>Type</th>
<th class='px-4 py-3 text-left text-xs font-semibold uppercase tracking-wide text-slate-500 dark:text-slate-300'>Strike</th>
<th class='px-4 py-3 text-left text-xs font-semibold uppercase tracking-wide text-slate-500 dark:text-slate-300'>Bid / Ask</th>
<th class='px-4 py-3 text-left text-xs font-semibold uppercase tracking-wide text-slate-500 dark:text-slate-300'>Last</th>
<th class='px-4 py-3 text-left text-xs font-semibold uppercase tracking-wide text-slate-500 dark:text-slate-300'>IV</th>
<th class='px-4 py-3 text-left text-xs font-semibold uppercase tracking-wide text-slate-500 dark:text-slate-300'>Greeks</th>
<th class='px-4 py-3 text-left text-xs font-semibold uppercase tracking-wide text-slate-500 dark:text-slate-300'>Action</th>
</tr>
@@ -110,16 +129,18 @@ def options_page() -> None:
<tr class='border-b border-slate-200 dark:border-slate-800'>
<td class='px-4 py-3 font-medium text-slate-900 dark:text-slate-100'>{row['symbol']}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'>{row['type'].upper()}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'>${row['strike']:.2f}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'>${row['bid']:.2f} / ${row['ask']:.2f}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'>Δ {row['delta']:+.3f} · Γ {row['gamma']:.3f} · Θ {row['theta']:+.3f}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'>${float(row['strike']):.2f}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'>${float(row['bid']):.2f} / ${float(row['ask']):.2f}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'>${float(row.get('lastPrice', row.get('premium', 0.0))):.2f}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'>{float(row.get('impliedVolatility', 0.0)):.1%}</td>
<td class='px-4 py-3 text-slate-600 dark:text-slate-300'{float(row.get('delta', 0.0)):+.3f} · Γ {float(row.get('gamma', 0.0)):.3f} · Θ {float(row.get('theta', 0.0)):+.3f}</td>
<td class='px-4 py-3 text-sky-600 dark:text-sky-300'>Use quick-add buttons below</td>
</tr>
""" for row in rows)
+ (
""
if rows
else "<tr><td colspan='6' class='px-4 py-6 text-center text-slate-500 dark:text-slate-400'>No contracts match the current filter.</td></tr>"
else "<tr><td colspan='8' class='px-4 py-6 text-center text-slate-500 dark:text-slate-400'>No contracts match the current filter.</td></tr>"
)
+ """
</tbody>
@@ -136,7 +157,7 @@ def options_page() -> None:
with ui.row().classes("w-full gap-2 max-sm:flex-col"):
for row in rows[:6]:
ui.button(
f"Add {row['type'].upper()} {row['strike']:.0f}",
f"Add {row['type'].upper()} {float(row['strike']):.0f}",
on_click=lambda _, contract=row: add_to_strategy(contract),
).props("outline color=primary")
greeks.set_options(rows[:6])
@@ -147,8 +168,8 @@ def options_page() -> None:
def update_filters() -> None:
selected_expiry["value"] = expiry_select.value
strike_range["min"] = float(min_strike.value)
strike_range["max"] = float(max_strike.value)
strike_range["min"] = float(min_strike.value or 0.0)
strike_range["max"] = float(max_strike.value or 0.0)
if strike_range["min"] > strike_range["max"]:
strike_range["min"], strike_range["max"] = (
strike_range["max"],
@@ -161,6 +182,7 @@ def options_page() -> None:
expiry_select.on_value_change(lambda _: update_filters())
min_strike.on_value_change(lambda _: update_filters())
max_strike.on_value_change(lambda _: update_filters())
def on_strategy_change(event) -> None:
selected_strategy["value"] = event.value # type: ignore[assignment]
render_selection()

View File

@@ -70,3 +70,17 @@ class CacheService:
if isinstance(value, datetime):
return value.isoformat()
raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
# Global cache instance
_cache_instance: CacheService | None = None
def get_cache() -> CacheService:
"""Get or create global cache instance."""
global _cache_instance
if _cache_instance is None:
import os
redis_url = os.environ.get("REDIS_URL")
_cache_instance = CacheService(redis_url)
return _cache_instance

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
import logging
import math
from datetime import UTC, datetime
from typing import Any
@@ -57,46 +58,77 @@ class DataService:
return quote
async def get_options_chain(self, symbol: str | None = None) -> dict[str, Any]:
ticker = (symbol or self.default_symbol).upper()
cache_key = f"options:{ticker}"
ticker_symbol = (symbol or self.default_symbol).upper()
cache_key = f"options:{ticker_symbol}"
cached = await self.cache.get_json(cache_key)
if cached and isinstance(cached, dict):
return cached
quote = await self.get_quote(ticker)
base_price = quote["price"]
quote = await self.get_quote(ticker_symbol)
if yf is None:
options_chain = self._fallback_options_chain(ticker_symbol, quote, source="fallback")
await self.cache.set_json(cache_key, options_chain)
return options_chain
try:
ticker = yf.Ticker(ticker_symbol)
expirations = await asyncio.to_thread(lambda: list(ticker.options or []))
if not expirations:
options_chain = self._fallback_options_chain(
ticker_symbol,
quote,
source="fallback",
error="No option expirations returned by yfinance",
)
await self.cache.set_json(cache_key, options_chain)
return options_chain
calls: list[dict[str, Any]] = []
puts: list[dict[str, Any]] = []
for expiry in expirations:
try:
chain = await asyncio.to_thread(ticker.option_chain, expiry)
except Exception as exc: # pragma: no cover - network dependent
logger.warning("Failed to fetch option chain for %s %s: %s", ticker_symbol, expiry, exc)
continue
calls.extend(self._normalize_option_rows(chain.calls, ticker_symbol, expiry, "call"))
puts.extend(self._normalize_option_rows(chain.puts, ticker_symbol, expiry, "put"))
if not calls and not puts:
options_chain = self._fallback_options_chain(
ticker_symbol,
quote,
source="fallback",
error="No option contracts returned by yfinance",
)
await self.cache.set_json(cache_key, options_chain)
return options_chain
options_chain = {
"symbol": ticker,
"symbol": ticker_symbol,
"updated_at": datetime.now(UTC).isoformat(),
"calls": [
{
"strike": round(base_price * 1.05, 2),
"premium": round(base_price * 0.03, 2),
"expiry": "2026-06-19",
},
{
"strike": round(base_price * 1.10, 2),
"premium": round(base_price * 0.02, 2),
"expiry": "2026-09-18",
},
],
"puts": [
{
"strike": round(base_price * 0.95, 2),
"premium": round(base_price * 0.028, 2),
"expiry": "2026-06-19",
},
{
"strike": round(base_price * 0.90, 2),
"premium": round(base_price * 0.018, 2),
"expiry": "2026-09-18",
},
],
"source": quote["source"],
"expirations": expirations,
"calls": calls,
"puts": puts,
"rows": sorted(calls + puts, key=lambda row: (row["expiry"], row["strike"], row["type"])),
"underlying_price": quote["price"],
"source": "yfinance",
}
await self.cache.set_json(cache_key, options_chain)
return options_chain
except Exception as exc: # pragma: no cover - network dependent
logger.warning("Failed to fetch options chain for %s from yfinance: %s", ticker_symbol, exc)
options_chain = self._fallback_options_chain(
ticker_symbol,
quote,
source="fallback",
error=str(exc),
)
await self.cache.set_json(cache_key, options_chain)
return options_chain
async def get_strategies(self, symbol: str | None = None) -> dict[str, Any]:
ticker = (symbol or self.default_symbol).upper()
@@ -149,6 +181,81 @@ class DataService:
logger.warning("Failed to fetch %s from yfinance: %s", symbol, exc)
return self._fallback_quote(symbol, source="fallback")
def _fallback_options_chain(
self,
symbol: str,
quote: dict[str, Any],
*,
source: str,
error: str | None = None,
) -> dict[str, Any]:
options_chain = {
"symbol": symbol,
"updated_at": datetime.now(UTC).isoformat(),
"expirations": [],
"calls": [],
"puts": [],
"rows": [],
"underlying_price": quote["price"],
"source": source,
}
if error:
options_chain["error"] = error
return options_chain
def _normalize_option_rows(self, frame: Any, symbol: str, expiry: str, option_type: str) -> list[dict[str, Any]]:
if frame is None or getattr(frame, "empty", True):
return []
rows: list[dict[str, Any]] = []
for item in frame.to_dict(orient="records"):
strike = self._safe_float(item.get("strike"))
if strike <= 0:
continue
bid = self._safe_float(item.get("bid"))
ask = self._safe_float(item.get("ask"))
last_price = self._safe_float(item.get("lastPrice"))
implied_volatility = self._safe_float(item.get("impliedVolatility"))
contract_symbol = str(item.get("contractSymbol") or "").strip()
rows.append(
{
"contractSymbol": contract_symbol,
"symbol": contract_symbol or f"{symbol} {expiry} {option_type.upper()} {strike:.2f}",
"strike": strike,
"bid": bid,
"ask": ask,
"premium": last_price or self._midpoint(bid, ask),
"lastPrice": last_price,
"impliedVolatility": implied_volatility,
"expiry": expiry,
"type": option_type,
"openInterest": int(self._safe_float(item.get("openInterest"))),
"volume": int(self._safe_float(item.get("volume"))),
"delta": 0.0,
"gamma": 0.0,
"theta": 0.0,
"vega": 0.0,
"rho": 0.0,
}
)
return rows
@staticmethod
def _safe_float(value: Any) -> float:
try:
result = float(value)
except (TypeError, ValueError):
return 0.0
return 0.0 if math.isnan(result) else result
@staticmethod
def _midpoint(bid: float, ask: float) -> float:
if bid > 0 and ask > 0:
return round((bid + ask) / 2, 4)
return max(bid, ask, 0.0)
@staticmethod
def _fallback_quote(symbol: str, source: str) -> dict[str, Any]:
return {

100
app/services/price_feed.py Normal file
View File

@@ -0,0 +1,100 @@
"""Live price feed service for fetching real-time GLD and other asset prices."""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import yfinance as yf
from app.services.cache import get_cache
logger = logging.getLogger(__name__)
@dataclass
class PriceData:
"""Price data for a symbol."""
symbol: str
price: float
currency: str
timestamp: datetime
source: str = "yfinance"
class PriceFeed:
"""Live price feed service using yfinance with Redis caching."""
CACHE_TTL_SECONDS = 60
DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"]
def __init__(self):
self._cache = get_cache()
async def get_price(self, symbol: str) -> Optional[PriceData]:
"""Get current price for a symbol, with caching.
Args:
symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD")
Returns:
PriceData or None if fetch fails
"""
# Check cache first
if self._cache.enabled:
cache_key = f"price:{symbol}"
cached = await self._cache.get(cache_key)
if cached:
return PriceData(**cached)
# Fetch from yfinance
try:
data = await self._fetch_yfinance(symbol)
if data:
# Cache the result
if self._cache.enabled:
await self._cache.set(
cache_key,
{
"symbol": data.symbol,
"price": data.price,
"currency": data.currency,
"timestamp": data.timestamp.isoformat(),
"source": data.source
},
ttl=self.CACHE_TTL_SECONDS
)
return data
except Exception as e:
logger.error(f"Failed to fetch price for {symbol}: {e}")
return None
async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
"""Fetch price from yfinance (run in thread pool to avoid blocking)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol)
def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
"""Synchronous yfinance fetch."""
ticker = yf.Ticker(symbol)
hist = ticker.history(period="1d", interval="1m")
if not hist.empty:
last_price = hist["Close"].iloc[-1]
currency = ticker.info.get("currency", "USD")
return PriceData(
symbol=symbol,
price=float(last_price),
currency=currency,
timestamp=datetime.utcnow()
)
return None
async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]:
"""Get prices for multiple symbols concurrently."""
tasks = [self.get_price(s) for s in symbols]
results = await asyncio.gather(*tasks)
return {s: r for s, r in zip(symbols, results)}

18
app/services/runtime.py Normal file
View File

@@ -0,0 +1,18 @@
"""Runtime service registry for UI pages and background tasks."""
from __future__ import annotations
from app.services.data_service import DataService
_data_service: DataService | None = None
def set_data_service(service: DataService) -> None:
global _data_service
_data_service = service
def get_data_service() -> DataService:
if _data_service is None:
raise RuntimeError("DataService has not been initialized")
return _data_service