"""Live price feed service for fetching real-time GLD and other asset prices.""" import asyncio import logging from dataclasses import dataclass from datetime import datetime, timedelta from typing import Optional import yfinance as yf from app.services.cache import get_cache logger = logging.getLogger(__name__) @dataclass class PriceData: """Price data for a symbol.""" symbol: str price: float currency: str timestamp: datetime source: str = "yfinance" class PriceFeed: """Live price feed service using yfinance with Redis caching.""" CACHE_TTL_SECONDS = 60 DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"] def __init__(self): self._cache = get_cache() async def get_price(self, symbol: str) -> Optional[PriceData]: """Get current price for a symbol, with caching. Args: symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD") Returns: PriceData or None if fetch fails """ # Check cache first if self._cache.enabled: cache_key = f"price:{symbol}" cached = await self._cache.get(cache_key) if cached: return PriceData(**cached) # Fetch from yfinance try: data = await self._fetch_yfinance(symbol) if data: # Cache the result if self._cache.enabled: await self._cache.set( cache_key, { "symbol": data.symbol, "price": data.price, "currency": data.currency, "timestamp": data.timestamp.isoformat(), "source": data.source }, ttl=self.CACHE_TTL_SECONDS ) return data except Exception as e: logger.error(f"Failed to fetch price for {symbol}: {e}") return None async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]: """Fetch price from yfinance (run in thread pool to avoid blocking).""" loop = asyncio.get_event_loop() return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol) def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]: """Synchronous yfinance fetch.""" ticker = yf.Ticker(symbol) hist = ticker.history(period="1d", interval="1m") if not hist.empty: last_price = hist["Close"].iloc[-1] currency = ticker.info.get("currency", "USD") return PriceData( symbol=symbol, price=float(last_price), currency=currency, timestamp=datetime.utcnow() ) return None async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]: """Get prices for multiple symbols concurrently.""" tasks = [self.get_price(s) for s in symbols] results = await asyncio.gather(*tasks) return {s: r for s, r in zip(symbols, results)}