Compare commits
1 Commits
feature/PO
...
feature/DA
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
21878bf7ff |
@@ -70,3 +70,17 @@ class CacheService:
|
|||||||
if isinstance(value, datetime):
|
if isinstance(value, datetime):
|
||||||
return value.isoformat()
|
return value.isoformat()
|
||||||
raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
|
raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
|
||||||
|
|
||||||
|
|
||||||
|
# Global cache instance
|
||||||
|
_cache_instance: CacheService | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_cache() -> CacheService:
|
||||||
|
"""Get or create global cache instance."""
|
||||||
|
global _cache_instance
|
||||||
|
if _cache_instance is None:
|
||||||
|
import os
|
||||||
|
redis_url = os.environ.get("REDIS_URL")
|
||||||
|
_cache_instance = CacheService(redis_url)
|
||||||
|
return _cache_instance
|
||||||
|
|||||||
100
app/services/price_feed.py
Normal file
100
app/services/price_feed.py
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
"""Live price feed service for fetching real-time GLD and other asset prices."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import yfinance as yf
|
||||||
|
|
||||||
|
from app.services.cache import get_cache
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PriceData:
|
||||||
|
"""Price data for a symbol."""
|
||||||
|
symbol: str
|
||||||
|
price: float
|
||||||
|
currency: str
|
||||||
|
timestamp: datetime
|
||||||
|
source: str = "yfinance"
|
||||||
|
|
||||||
|
|
||||||
|
class PriceFeed:
|
||||||
|
"""Live price feed service using yfinance with Redis caching."""
|
||||||
|
|
||||||
|
CACHE_TTL_SECONDS = 60
|
||||||
|
DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"]
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._cache = get_cache()
|
||||||
|
|
||||||
|
async def get_price(self, symbol: str) -> Optional[PriceData]:
|
||||||
|
"""Get current price for a symbol, with caching.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
PriceData or None if fetch fails
|
||||||
|
"""
|
||||||
|
# Check cache first
|
||||||
|
if self._cache.enabled:
|
||||||
|
cache_key = f"price:{symbol}"
|
||||||
|
cached = await self._cache.get(cache_key)
|
||||||
|
if cached:
|
||||||
|
return PriceData(**cached)
|
||||||
|
|
||||||
|
# Fetch from yfinance
|
||||||
|
try:
|
||||||
|
data = await self._fetch_yfinance(symbol)
|
||||||
|
if data:
|
||||||
|
# Cache the result
|
||||||
|
if self._cache.enabled:
|
||||||
|
await self._cache.set(
|
||||||
|
cache_key,
|
||||||
|
{
|
||||||
|
"symbol": data.symbol,
|
||||||
|
"price": data.price,
|
||||||
|
"currency": data.currency,
|
||||||
|
"timestamp": data.timestamp.isoformat(),
|
||||||
|
"source": data.source
|
||||||
|
},
|
||||||
|
ttl=self.CACHE_TTL_SECONDS
|
||||||
|
)
|
||||||
|
return data
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to fetch price for {symbol}: {e}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
|
||||||
|
"""Fetch price from yfinance (run in thread pool to avoid blocking)."""
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol)
|
||||||
|
|
||||||
|
def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
|
||||||
|
"""Synchronous yfinance fetch."""
|
||||||
|
ticker = yf.Ticker(symbol)
|
||||||
|
hist = ticker.history(period="1d", interval="1m")
|
||||||
|
|
||||||
|
if not hist.empty:
|
||||||
|
last_price = hist["Close"].iloc[-1]
|
||||||
|
currency = ticker.info.get("currency", "USD")
|
||||||
|
|
||||||
|
return PriceData(
|
||||||
|
symbol=symbol,
|
||||||
|
price=float(last_price),
|
||||||
|
currency=currency,
|
||||||
|
timestamp=datetime.utcnow()
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]:
|
||||||
|
"""Get prices for multiple symbols concurrently."""
|
||||||
|
tasks = [self.get_price(s) for s in symbols]
|
||||||
|
results = await asyncio.gather(*tasks)
|
||||||
|
return {s: r for s, r in zip(symbols, results)}
|
||||||
Reference in New Issue
Block a user