From 21878bf7ff5bb006a5349b91363d75ca98613dbd Mon Sep 17 00:00:00 2001 From: Bu5hm4nn Date: Mon, 23 Mar 2026 22:25:09 +0100 Subject: [PATCH] feat(DATA-001): Add live GLD price feed service with Redis caching - Create PriceFeed service using yfinance - Cache prices in Redis with 60s TTL - Add PriceData dataclass for type safety - Support concurrent price fetching for multiple symbols --- app/services/cache.py | 14 ++++++ app/services/price_feed.py | 100 +++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 app/services/price_feed.py diff --git a/app/services/cache.py b/app/services/cache.py index 66ade3a..317cc79 100644 --- a/app/services/cache.py +++ b/app/services/cache.py @@ -70,3 +70,17 @@ class CacheService: if isinstance(value, datetime): return value.isoformat() raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable") + + +# Global cache instance +_cache_instance: CacheService | None = None + + +def get_cache() -> CacheService: + """Get or create global cache instance.""" + global _cache_instance + if _cache_instance is None: + import os + redis_url = os.environ.get("REDIS_URL") + _cache_instance = CacheService(redis_url) + return _cache_instance diff --git a/app/services/price_feed.py b/app/services/price_feed.py new file mode 100644 index 0000000..283eb78 --- /dev/null +++ b/app/services/price_feed.py @@ -0,0 +1,100 @@ +"""Live price feed service for fetching real-time GLD and other asset prices.""" + +import asyncio +import logging +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Optional + +import yfinance as yf + +from app.services.cache import get_cache + +logger = logging.getLogger(__name__) + + +@dataclass +class PriceData: + """Price data for a symbol.""" + symbol: str + price: float + currency: str + timestamp: datetime + source: str = "yfinance" + + +class PriceFeed: + """Live price feed service using yfinance with Redis caching.""" + + CACHE_TTL_SECONDS = 60 + DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"] + + def __init__(self): + self._cache = get_cache() + + async def get_price(self, symbol: str) -> Optional[PriceData]: + """Get current price for a symbol, with caching. + + Args: + symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD") + + Returns: + PriceData or None if fetch fails + """ + # Check cache first + if self._cache.enabled: + cache_key = f"price:{symbol}" + cached = await self._cache.get(cache_key) + if cached: + return PriceData(**cached) + + # Fetch from yfinance + try: + data = await self._fetch_yfinance(symbol) + if data: + # Cache the result + if self._cache.enabled: + await self._cache.set( + cache_key, + { + "symbol": data.symbol, + "price": data.price, + "currency": data.currency, + "timestamp": data.timestamp.isoformat(), + "source": data.source + }, + ttl=self.CACHE_TTL_SECONDS + ) + return data + except Exception as e: + logger.error(f"Failed to fetch price for {symbol}: {e}") + + return None + + async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]: + """Fetch price from yfinance (run in thread pool to avoid blocking).""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol) + + def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]: + """Synchronous yfinance fetch.""" + ticker = yf.Ticker(symbol) + hist = ticker.history(period="1d", interval="1m") + + if not hist.empty: + last_price = hist["Close"].iloc[-1] + currency = ticker.info.get("currency", "USD") + + return PriceData( + symbol=symbol, + price=float(last_price), + currency=currency, + timestamp=datetime.utcnow() + ) + return None + + async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]: + """Get prices for multiple symbols concurrently.""" + tasks = [self.get_price(s) for s in symbols] + results = await asyncio.gather(*tasks) + return {s: r for s, r in zip(symbols, results)}