- Create PriceFeed service using yfinance
- Cache prices in Redis with 60s TTL
- Add PriceData dataclass for type safety
- Support concurrent price fetching for multiple symbols
101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
"""Live price feed service for fetching real-time GLD and other asset prices."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
import yfinance as yf
|
|
|
|
from app.services.cache import get_cache
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class PriceData:
    """A single price observation for one symbol.

    Instances are built positionally or by keyword in the field order
    below; only ``source`` has a default.
    """

    # Ticker symbol the price belongs to (e.g. "GLD", "BTC-USD").
    symbol: str
    # Price value for the symbol.
    price: float
    # Currency code the price is expressed in.
    currency: str
    # Moment at which the observation was recorded.
    timestamp: datetime
    # Name of the provider that supplied the price.
    source: str = "yfinance"
|
|
|
|
|
|
class PriceFeed:
    """Live price feed service using yfinance with Redis caching.

    Prices are looked up in the cache returned by ``get_cache()`` first and
    fetched from yfinance on a miss. Freshly fetched prices are written back
    to the cache with a TTL of ``CACHE_TTL_SECONDS``. The cache is treated as
    best-effort: cache errors are logged and never prevent a price from
    being fetched or returned.
    """

    CACHE_TTL_SECONDS = 60
    DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"]

    def __init__(self):
        self._cache = get_cache()

    async def get_price(self, symbol: str) -> Optional[PriceData]:
        """Get current price for a symbol, with caching.

        Args:
            symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD")

        Returns:
            PriceData or None if fetch fails
        """
        # Built unconditionally so the read and write paths always agree on
        # the key, even if the cache's ``enabled`` flag flips in between.
        cache_key = f"price:{symbol}"

        # Check cache first
        if self._cache.enabled:
            cached = await self._read_cache(cache_key)
            if cached is not None:
                return cached

        # Fetch from yfinance
        try:
            data = await self._fetch_yfinance(symbol)
        except Exception as e:
            logger.error("Failed to fetch price for %s: %s", symbol, e)
            return None

        if data is None:
            return None

        # Cache the result. This is deliberately outside the fetch ``try``:
        # a cache failure must not discard a price that was fetched fine.
        if self._cache.enabled:
            await self._write_cache(cache_key, data)
        return data

    async def _read_cache(self, cache_key: str) -> Optional[PriceData]:
        """Return the cached PriceData for ``cache_key``, or None.

        None is returned on a miss and also when the cache raises or holds
        an entry that cannot be turned back into a PriceData; the caller
        then falls through to a live fetch.
        """
        try:
            cached = await self._cache.get(cache_key)
            if not cached:
                return None
            return self._from_cache_payload(cached)
        except Exception as e:
            logger.warning("Ignoring cache read for %s: %s", cache_key, e)
            return None

    async def _write_cache(self, cache_key: str, data: PriceData) -> None:
        """Store ``data`` under ``cache_key``; log and swallow cache errors."""
        payload = {
            "symbol": data.symbol,
            "price": data.price,
            "currency": data.currency,
            # Stored as an ISO-8601 string; _from_cache_payload reverses this.
            "timestamp": data.timestamp.isoformat(),
            "source": data.source,
        }
        try:
            await self._cache.set(cache_key, payload, ttl=self.CACHE_TTL_SECONDS)
        except Exception as e:
            logger.warning("Failed to cache price under %s: %s", cache_key, e)

    @staticmethod
    def _from_cache_payload(cached: dict) -> PriceData:
        """Rebuild a PriceData from the dict written by ``_write_cache``.

        The timestamp is cached as an ISO-8601 string, so it is parsed back
        into a ``datetime`` to keep ``PriceData.timestamp`` the same type on
        cache hits as on live fetches.
        """
        fields = dict(cached)
        stamp = fields.get("timestamp")
        if isinstance(stamp, str):
            fields["timestamp"] = datetime.fromisoformat(stamp)
        return PriceData(**fields)

    async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
        """Fetch price from yfinance (run in thread pool to avoid blocking)."""
        # get_running_loop() is the supported call from inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol)

    def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
        """Synchronous yfinance fetch.

        Requests one day of 1-minute history and uses the most recent
        non-missing close as the price.

        Returns:
            PriceData, or None when no usable close is available.
        """
        ticker = yf.Ticker(symbol)
        hist = ticker.history(period="1d", interval="1m")
        if hist.empty:
            return None

        # Skip missing closes so a NaN never becomes the reported price.
        closes = hist["Close"].dropna()
        if closes.empty:
            return None

        # The currency lookup is separate from the price history; if it
        # fails, keep the price and fall back to the original default.
        try:
            currency = ticker.info.get("currency") or "USD"
        except Exception as e:
            logger.warning(
                "Could not read currency for %s, defaulting to USD: %s", symbol, e
            )
            currency = "USD"

        return PriceData(
            symbol=symbol,
            price=float(closes.iloc[-1]),
            currency=currency,
            # NOTE: naive UTC timestamp, kept as-is so values (and their
            # cached isoformat strings) stay compatible with existing callers.
            timestamp=datetime.utcnow(),
        )

    async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]:
        """Get prices for multiple symbols concurrently.

        Returns:
            Mapping of each requested symbol to its PriceData, or to None
            when that symbol's price could not be obtained.
        """
        results = await asyncio.gather(*(self.get_price(s) for s in symbols))
        return dict(zip(symbols, results))
|