"""Live price feed service for fetching real-time GLD and other asset prices.""" import asyncio import logging from dataclasses import dataclass from datetime import datetime from typing import Optional import yfinance as yf from app.services.cache import get_cache logger = logging.getLogger(__name__) @dataclass class PriceData: """Price data for a symbol.""" symbol: str price: float currency: str timestamp: datetime source: str = "yfinance" class PriceFeed: """Live price feed service using yfinance with Redis caching.""" CACHE_TTL_SECONDS = 60 DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"] def __init__(self): self._cache = get_cache() async def get_price(self, symbol: str) -> Optional[PriceData]: """Get current price for a symbol, with caching. Args: symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD") Returns: PriceData or None if fetch fails """ # Check cache first if self._cache.enabled: cache_key = f"price:{symbol}" cached = await self._cache.get(cache_key) if cached: return PriceData(**cached) # Fetch from yfinance try: data = await self._fetch_yfinance(symbol) if data: # Cache the result if self._cache.enabled: await self._cache.set( cache_key, { "symbol": data.symbol, "price": data.price, "currency": data.currency, "timestamp": data.timestamp.isoformat(), "source": data.source, }, ttl=self.CACHE_TTL_SECONDS, ) return data except Exception as e: logger.error(f"Failed to fetch price for {symbol}: {e}") return None async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]: """Fetch price from yfinance (run in thread pool to avoid blocking).""" loop = asyncio.get_event_loop() return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol) def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]: """Synchronous yfinance fetch.""" ticker = yf.Ticker(symbol) hist = ticker.history(period="1d", interval="1m") if not hist.empty: last_price = hist["Close"].iloc[-1] currency = ticker.info.get("currency", "USD") return PriceData(symbol=symbol, price=float(last_price), currency=currency, timestamp=datetime.utcnow()) return None async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]: """Get prices for multiple symbols concurrently.""" tasks = [self.get_price(s) for s in symbols] results = await asyncio.gather(*tasks) return {s: r for s, r in zip(symbols, results)}