Files
vault-dash/app/services/data_service.py

386 lines
15 KiB
Python

"""Market data access layer with caching support."""
from __future__ import annotations
import asyncio
import logging
import math
from datetime import datetime, timezone
from typing import Any
from app.core.calculations import option_row_greeks
from app.services.cache import CacheService
from app.strategies.engine import StrategySelectionEngine
logger = logging.getLogger(__name__)
try:
import yfinance as yf
except ImportError: # pragma: no cover - optional dependency
yf = None
class DataService:
"""Fetches portfolio and market data, using Redis when available."""
def __init__(self, cache: CacheService, default_symbol: str = "GLD") -> None:
self.cache = cache
self.default_symbol = default_symbol
async def get_portfolio(self, symbol: str | None = None) -> dict[str, Any]:
ticker = (symbol or self.default_symbol).upper()
cache_key = f"portfolio:{ticker}"
cached = await self.cache.get_json(cache_key)
if cached and isinstance(cached, dict):
return cached
quote = await self.get_quote(ticker)
portfolio = {
"symbol": ticker,
"spot_price": quote["price"],
"portfolio_value": round(quote["price"] * 1000, 2),
"loan_amount": 600_000.0,
"ltv_ratio": round(600_000.0 / max(quote["price"] * 1000, 1), 4),
"updated_at": datetime.now(timezone.utc).isoformat(),
"source": quote["source"],
}
await self.cache.set_json(cache_key, portfolio)
return portfolio
async def get_quote(self, symbol: str) -> dict[str, Any]:
normalized_symbol = symbol.upper()
cache_key = f"quote:{normalized_symbol}"
cached = await self.cache.get_json(cache_key)
if cached and isinstance(cached, dict):
normalized_cached = self._normalize_quote_payload(cached, normalized_symbol)
if normalized_cached != cached:
await self.cache.set_json(cache_key, normalized_cached)
return normalized_cached
quote = self._normalize_quote_payload(await self._fetch_quote(normalized_symbol), normalized_symbol)
await self.cache.set_json(cache_key, quote)
return quote
async def get_option_expirations(self, symbol: str | None = None) -> dict[str, Any]:
ticker_symbol = (symbol or self.default_symbol).upper()
cache_key = f"options:{ticker_symbol}:expirations"
cached = await self.cache.get_json(cache_key)
if cached and isinstance(cached, dict):
return cached
quote = await self.get_quote(ticker_symbol)
if yf is None:
payload = self._fallback_option_expirations(
ticker_symbol,
quote,
source="fallback",
error="yfinance is not installed",
)
await self.cache.set_json(cache_key, payload)
return payload
try:
ticker = yf.Ticker(ticker_symbol)
expirations = await asyncio.to_thread(lambda: list(ticker.options or []))
if not expirations:
payload = self._fallback_option_expirations(
ticker_symbol,
quote,
source="fallback",
error="No option expirations returned by yfinance",
)
await self.cache.set_json(cache_key, payload)
return payload
payload = {
"symbol": ticker_symbol,
"updated_at": datetime.now(timezone.utc).isoformat(),
"expirations": expirations,
"underlying_price": quote["price"],
"source": "yfinance",
}
await self.cache.set_json(cache_key, payload)
return payload
except Exception as exc: # pragma: no cover - network dependent
logger.warning("Failed to fetch option expirations for %s from yfinance: %s", ticker_symbol, exc)
payload = self._fallback_option_expirations(
ticker_symbol,
quote,
source="fallback",
error=str(exc),
)
await self.cache.set_json(cache_key, payload)
return payload
async def get_options_chain_for_expiry(
self, symbol: str | None = None, expiry: str | None = None
) -> dict[str, Any]:
ticker_symbol = (symbol or self.default_symbol).upper()
expirations_data = await self.get_option_expirations(ticker_symbol)
expirations = list(expirations_data.get("expirations") or [])
target_expiry = expiry or (expirations[0] if expirations else None)
quote = await self.get_quote(ticker_symbol)
if not target_expiry:
return self._fallback_options_chain(
ticker_symbol,
quote,
expirations=expirations,
selected_expiry=None,
source=expirations_data.get("source", quote.get("source", "fallback")),
error=expirations_data.get("error"),
)
cache_key = f"options:{ticker_symbol}:{target_expiry}"
cached = await self.cache.get_json(cache_key)
if cached and isinstance(cached, dict):
return cached
if yf is None:
payload = self._fallback_options_chain(
ticker_symbol,
quote,
expirations=expirations,
selected_expiry=target_expiry,
source="fallback",
error="yfinance is not installed",
)
await self.cache.set_json(cache_key, payload)
return payload
try:
ticker = yf.Ticker(ticker_symbol)
chain = await asyncio.to_thread(ticker.option_chain, target_expiry)
calls = self._normalize_option_rows(chain.calls, ticker_symbol, target_expiry, "call", quote["price"])
puts = self._normalize_option_rows(chain.puts, ticker_symbol, target_expiry, "put", quote["price"])
if not calls and not puts:
payload = self._fallback_options_chain(
ticker_symbol,
quote,
expirations=expirations,
selected_expiry=target_expiry,
source="fallback",
error="No option contracts returned by yfinance",
)
await self.cache.set_json(cache_key, payload)
return payload
payload = {
"symbol": ticker_symbol,
"selected_expiry": target_expiry,
"updated_at": datetime.now(timezone.utc).isoformat(),
"expirations": expirations,
"calls": calls,
"puts": puts,
"rows": sorted(calls + puts, key=lambda row: (row["strike"], row["type"])),
"underlying_price": quote["price"],
"source": "yfinance",
}
await self.cache.set_json(cache_key, payload)
return payload
except Exception as exc: # pragma: no cover - network dependent
logger.warning(
"Failed to fetch options chain for %s %s from yfinance: %s", ticker_symbol, target_expiry, exc
)
payload = self._fallback_options_chain(
ticker_symbol,
quote,
expirations=expirations,
selected_expiry=target_expiry,
source="fallback",
error=str(exc),
)
await self.cache.set_json(cache_key, payload)
return payload
async def get_options_chain(self, symbol: str | None = None) -> dict[str, Any]:
ticker_symbol = (symbol or self.default_symbol).upper()
expirations_data = await self.get_option_expirations(ticker_symbol)
expirations = list(expirations_data.get("expirations") or [])
if not expirations:
quote = await self.get_quote(ticker_symbol)
return self._fallback_options_chain(
ticker_symbol,
quote,
expirations=[],
selected_expiry=None,
source=expirations_data.get("source", quote.get("source", "fallback")),
error=expirations_data.get("error"),
)
return await self.get_options_chain_for_expiry(ticker_symbol, expirations[0])
async def get_strategies(self, symbol: str | None = None) -> dict[str, Any]:
ticker = (symbol or self.default_symbol).upper()
quote = await self.get_quote(ticker)
engine = StrategySelectionEngine(spot_price=quote["price"] if ticker != "GLD" else 460.0)
return {
"symbol": ticker,
"updated_at": datetime.now(timezone.utc).isoformat(),
"paper_parameters": {
"portfolio_value": engine.portfolio_value,
"loan_amount": engine.loan_amount,
"margin_call_threshold": engine.margin_call_threshold,
"spot_price": engine.spot_price,
"volatility": engine.volatility,
"risk_free_rate": engine.risk_free_rate,
},
"strategies": engine.compare_all_strategies(),
"recommendations": {
profile: engine.recommend(profile) # type: ignore[arg-type]
for profile in ("conservative", "balanced", "cost_sensitive")
},
"sensitivity_analysis": engine.sensitivity_analysis(),
}
async def _fetch_quote(self, symbol: str) -> dict[str, Any]:
if yf is None:
return self._fallback_quote(symbol, source="fallback")
try:
ticker = yf.Ticker(symbol)
history = await asyncio.to_thread(ticker.history, period="5d", interval="1d")
if history.empty:
return self._fallback_quote(symbol, source="fallback")
closes = history["Close"]
last = float(closes.iloc[-1])
previous = float(closes.iloc[-2]) if len(closes) > 1 else last
change = round(last - previous, 4)
change_percent = round((change / previous) * 100, 4) if previous else 0.0
return {
"symbol": symbol,
"price": round(last, 4),
"quote_unit": "share",
"change": change,
"change_percent": change_percent,
"updated_at": datetime.now(timezone.utc).isoformat(),
"source": "yfinance",
}
except Exception as exc: # pragma: no cover - network dependent
logger.warning("Failed to fetch %s from yfinance: %s", symbol, exc)
return self._fallback_quote(symbol, source="fallback")
def _fallback_option_expirations(
self,
symbol: str,
quote: dict[str, Any],
*,
source: str,
error: str | None = None,
) -> dict[str, Any]:
payload = {
"symbol": symbol,
"updated_at": datetime.now(timezone.utc).isoformat(),
"expirations": [],
"underlying_price": quote["price"],
"source": source,
}
if error:
payload["error"] = error
return payload
def _fallback_options_chain(
self,
symbol: str,
quote: dict[str, Any],
*,
expirations: list[str],
selected_expiry: str | None,
source: str,
error: str | None = None,
) -> dict[str, Any]:
options_chain = {
"symbol": symbol,
"selected_expiry": selected_expiry,
"updated_at": datetime.now(timezone.utc).isoformat(),
"expirations": expirations,
"calls": [],
"puts": [],
"rows": [],
"underlying_price": quote["price"],
"source": source,
}
if error:
options_chain["error"] = error
return options_chain
def _normalize_option_rows(
self,
frame: Any,
symbol: str,
expiry: str,
option_type: str,
underlying_price: float,
) -> list[dict[str, Any]]:
if frame is None or getattr(frame, "empty", True):
return []
rows: list[dict[str, Any]] = []
for item in frame.to_dict(orient="records"):
strike = self._safe_float(item.get("strike"))
if strike <= 0:
continue
bid = self._safe_float(item.get("bid"))
ask = self._safe_float(item.get("ask"))
last_price = self._safe_float(item.get("lastPrice"))
implied_volatility = self._safe_float(item.get("impliedVolatility"))
contract_symbol = str(item.get("contractSymbol") or "").strip()
row = {
"contractSymbol": contract_symbol,
"symbol": contract_symbol or f"{symbol} {expiry} {option_type.upper()} {strike:.2f}",
"strike": strike,
"bid": bid,
"ask": ask,
"premium": last_price or self._midpoint(bid, ask),
"lastPrice": last_price,
"impliedVolatility": implied_volatility,
"expiry": expiry,
"type": option_type,
"openInterest": int(self._safe_float(item.get("openInterest"))),
"volume": int(self._safe_float(item.get("volume"))),
}
row.update(option_row_greeks(row, underlying_price))
rows.append(row)
return rows
@staticmethod
def _safe_float(value: Any) -> float:
try:
result = float(value)
except (TypeError, ValueError):
return 0.0
return 0.0 if math.isnan(result) else result
@staticmethod
def _midpoint(bid: float, ask: float) -> float:
if bid > 0 and ask > 0:
return round((bid + ask) / 2, 4)
return max(bid, ask, 0.0)
@staticmethod
def _normalize_quote_payload(payload: dict[str, Any], symbol: str) -> dict[str, Any]:
normalized = dict(payload)
normalized_symbol = symbol.upper()
normalized["symbol"] = str(normalized.get("symbol", normalized_symbol)).upper()
if normalized["symbol"] == "GLD" and not normalized.get("quote_unit"):
normalized["quote_unit"] = "share"
return normalized
@staticmethod
def _fallback_quote(symbol: str, source: str) -> dict[str, Any]:
return {
"symbol": symbol,
"price": 215.0,
"quote_unit": "share",
"change": 0.0,
"change_percent": 0.0,
"updated_at": datetime.now(timezone.utc).isoformat(),
"source": source,
}