"""Market data access layer with caching support.""" from __future__ import annotations import asyncio import logging import math from datetime import date, datetime, timezone from typing import Any from app.core.calculations import option_row_greeks from app.domain.instruments import gld_ounces_per_share from app.services.cache import CacheService from app.strategies.engine import StrategySelectionEngine logger = logging.getLogger(__name__) try: import yfinance as yf except ImportError: # pragma: no cover - optional dependency yf = None class DataService: """Fetches portfolio and market data, using Redis when available.""" def __init__(self, cache: CacheService, default_symbol: str = "GLD") -> None: self.cache = cache self.default_symbol = default_symbol self.gc_f_symbol = "GC=F" # COMEX Gold Futures async def get_portfolio(self, symbol: str | None = None) -> dict[str, Any]: ticker = (symbol or self.default_symbol).upper() cache_key = f"portfolio:{ticker}" cached = await self.cache.get_json(cache_key) if cached and isinstance(cached, dict): return cached quote = await self.get_quote(ticker) portfolio = { "symbol": ticker, "spot_price": quote["price"], "portfolio_value": round(quote["price"] * 1000, 2), "loan_amount": 600_000.0, "ltv_ratio": round(600_000.0 / max(quote["price"] * 1000, 1), 4), "updated_at": datetime.now(timezone.utc).isoformat(), "source": quote["source"], } await self.cache.set_json(cache_key, portfolio) return portfolio async def get_quote(self, symbol: str) -> dict[str, Any]: normalized_symbol = symbol.upper() cache_key = f"quote:{normalized_symbol}" cached = await self.cache.get_json(cache_key) if cached and isinstance(cached, dict): try: normalized_cached = self._normalize_quote_payload(cached, normalized_symbol) except ValueError: normalized_cached = None if normalized_cached is not None: if normalized_cached != cached: await self.cache.set_json(cache_key, normalized_cached) return normalized_cached quote = self._normalize_quote_payload(await self._fetch_quote(normalized_symbol), normalized_symbol) await self.cache.set_json(cache_key, quote) return quote async def get_option_expirations(self, symbol: str | None = None) -> dict[str, Any]: ticker_symbol = (symbol or self.default_symbol).upper() cache_key = f"options:{ticker_symbol}:expirations" cached = await self.cache.get_json(cache_key) if cached and isinstance(cached, dict): malformed_list_shape = ( not isinstance(cached.get("expirations"), list) and cached.get("expirations") is not None ) try: normalized_cached = self._normalize_option_expirations_payload(cached, ticker_symbol) except ValueError as exc: logger.warning("Discarding cached option expirations payload for %s: %s", ticker_symbol, exc) normalized_cached = None if malformed_list_shape: logger.warning("Discarding malformed cached option expirations payload for %s", ticker_symbol) normalized_cached = None if normalized_cached is not None: if normalized_cached != cached: await self.cache.set_json(cache_key, normalized_cached) return normalized_cached quote = await self.get_quote(ticker_symbol) if yf is None: payload = self._fallback_option_expirations( ticker_symbol, quote, source="fallback", error="yfinance is not installed", ) await self.cache.set_json(cache_key, payload) return payload try: ticker = yf.Ticker(ticker_symbol) expirations = await asyncio.to_thread(lambda: list(ticker.options or [])) if not expirations: payload = self._fallback_option_expirations( ticker_symbol, quote, source="fallback", error="No option expirations returned by yfinance", ) await self.cache.set_json(cache_key, payload) return payload payload = self._normalize_option_expirations_payload( { "symbol": ticker_symbol, "updated_at": datetime.now(timezone.utc).isoformat(), "expirations": expirations, "underlying_price": quote["price"], "source": "yfinance", }, ticker_symbol, ) await self.cache.set_json(cache_key, payload) return payload except Exception as exc: # pragma: no cover - network dependent logger.warning("Failed to fetch option expirations for %s from yfinance: %s", ticker_symbol, exc) payload = self._fallback_option_expirations( ticker_symbol, quote, source="fallback", error=str(exc), ) await self.cache.set_json(cache_key, payload) return payload async def get_options_chain_for_expiry( self, symbol: str | None = None, expiry: str | None = None ) -> dict[str, Any]: ticker_symbol = (symbol or self.default_symbol).upper() expirations_data = await self.get_option_expirations(ticker_symbol) expirations = list(expirations_data.get("expirations") or []) target_expiry = expiry or (expirations[0] if expirations else None) quote = await self.get_quote(ticker_symbol) if not target_expiry: return self._fallback_options_chain( ticker_symbol, quote, expirations=expirations, selected_expiry=None, source=expirations_data.get("source", quote.get("source", "fallback")), error=expirations_data.get("error"), ) cache_key = f"options:{ticker_symbol}:{target_expiry}" cached = await self.cache.get_json(cache_key) if cached and isinstance(cached, dict): malformed_list_shape = any( not isinstance(cached.get(field), list) and cached.get(field) is not None for field in ("expirations", "calls", "puts", "rows") ) try: normalized_cached = self._normalize_options_chain_payload(cached, ticker_symbol) except ValueError as exc: logger.warning( "Discarding cached options chain payload for %s %s: %s", ticker_symbol, target_expiry, exc ) normalized_cached = None if malformed_list_shape: logger.warning( "Discarding malformed cached options chain payload for %s %s", ticker_symbol, target_expiry ) normalized_cached = None if normalized_cached is not None: if normalized_cached != cached: await self.cache.set_json(cache_key, normalized_cached) return normalized_cached if yf is None: payload = self._fallback_options_chain( ticker_symbol, quote, expirations=expirations, selected_expiry=target_expiry, source="fallback", error="yfinance is not installed", ) await self.cache.set_json(cache_key, payload) return payload try: ticker = yf.Ticker(ticker_symbol) chain = await asyncio.to_thread(ticker.option_chain, target_expiry) calls = self._normalize_option_rows(chain.calls, ticker_symbol, target_expiry, "call", quote["price"]) puts = self._normalize_option_rows(chain.puts, ticker_symbol, target_expiry, "put", quote["price"]) if not calls and not puts: payload = self._fallback_options_chain( ticker_symbol, quote, expirations=expirations, selected_expiry=target_expiry, source="fallback", error="No option contracts returned by yfinance", ) await self.cache.set_json(cache_key, payload) return payload payload = self._normalize_options_chain_payload( { "symbol": ticker_symbol, "selected_expiry": target_expiry, "updated_at": datetime.now(timezone.utc).isoformat(), "expirations": expirations, "calls": calls, "puts": puts, "rows": sorted(calls + puts, key=lambda row: (row["strike"], row["type"])), "underlying_price": quote["price"], "source": "yfinance", }, ticker_symbol, ) await self.cache.set_json(cache_key, payload) return payload except Exception as exc: # pragma: no cover - network dependent logger.warning( "Failed to fetch options chain for %s %s from yfinance: %s", ticker_symbol, target_expiry, exc ) payload = self._fallback_options_chain( ticker_symbol, quote, expirations=expirations, selected_expiry=target_expiry, source="fallback", error=str(exc), ) await self.cache.set_json(cache_key, payload) return payload async def get_options_chain(self, symbol: str | None = None) -> dict[str, Any]: ticker_symbol = (symbol or self.default_symbol).upper() expirations_data = await self.get_option_expirations(ticker_symbol) expirations = list(expirations_data.get("expirations") or []) if not expirations: quote = await self.get_quote(ticker_symbol) return self._fallback_options_chain( ticker_symbol, quote, expirations=[], selected_expiry=None, source=expirations_data.get("source", quote.get("source", "fallback")), error=expirations_data.get("error"), ) return await self.get_options_chain_for_expiry(ticker_symbol, expirations[0]) async def get_gc_futures(self) -> dict[str, Any]: """Fetch GC=F (COMEX Gold Futures) quote. Returns a quote dict similar to get_quote but for gold futures. Falls back gracefully if GC=F is unavailable. """ cache_key = f"quote:{self.gc_f_symbol}" cached = await self.cache.get_json(cache_key) if cached and isinstance(cached, dict): try: normalized_cached = self._normalize_quote_payload(cached, self.gc_f_symbol) except ValueError: normalized_cached = None if normalized_cached is not None: if normalized_cached != cached: await self.cache.set_json(cache_key, normalized_cached) return normalized_cached quote = self._normalize_quote_payload(await self._fetch_gc_futures(), self.gc_f_symbol) await self.cache.set_json(cache_key, quote) return quote async def _fetch_gc_futures(self) -> dict[str, Any]: """Fetch GC=F from yfinance with graceful fallback.""" if yf is None: return self._fallback_gc_futures(source="fallback", error="yfinance is not installed") try: ticker = yf.Ticker(self.gc_f_symbol) history = await asyncio.to_thread(ticker.history, period="5d", interval="1d") if history.empty: return self._fallback_gc_futures(source="fallback", error="No history returned for GC=F") closes = history["Close"] last = float(closes.iloc[-1]) previous = float(closes.iloc[-2]) if len(closes) > 1 else last change = round(last - previous, 4) change_percent = round((change / previous) * 100, 4) if previous else 0.0 # Try to get more recent price from fast_info if available try: fast_price = ticker.fast_info.get("lastPrice", last) if fast_price and fast_price > 0: last = float(fast_price) except Exception: pass # Keep history close if fast_info unavailable return { "symbol": self.gc_f_symbol, "price": round(last, 4), "quote_unit": "ozt", # Gold futures are per troy ounce "change": change, "change_percent": change_percent, "updated_at": datetime.now(timezone.utc).isoformat(), "source": "yfinance", } except Exception as exc: # pragma: no cover - network dependent logger.warning("Failed to fetch %s from yfinance: %s", self.gc_f_symbol, exc) return self._fallback_gc_futures(source="fallback", error=str(exc)) @staticmethod def _fallback_gc_futures(source: str, error: str | None = None) -> dict[str, Any]: """Fallback GC=F quote when live data unavailable.""" payload = { "symbol": "GC=F", "price": 2700.0, # Fallback estimate "quote_unit": "ozt", "change": 0.0, "change_percent": 0.0, "updated_at": datetime.now(timezone.utc).isoformat(), "source": source, } if error: payload["error"] = error return payload async def get_basis_data(self) -> dict[str, Any]: """Get GLD/GC=F basis data for comparison. Returns: Dict with GLD implied spot, GC=F adjusted price, basis in bps, and status info. """ gld_quote = await self.get_quote("GLD") gc_f_quote = await self.get_gc_futures() # Use current date for GLD ounces calculation ounces_per_share = float(gld_ounces_per_share(date.today())) # GLD implied spot = GLD_price / ounces_per_share gld_price = gld_quote.get("price", 0.0) gld_implied_spot = gld_price / ounces_per_share if ounces_per_share > 0 and gld_price > 0 else 0.0 # GC=F adjusted = (GC=F - contango_estimate) / 10 for naive comparison # But actually GC=F is already per oz, so we just adjust for contango gc_f_price = gc_f_quote.get("price", 0.0) contango_estimate = 10.0 # Typical contango ~$10/oz gc_f_adjusted = gc_f_price - contango_estimate if gc_f_price > 0 else 0.0 # Basis in bps = (GLD_implied_spot / GC=F_adjusted - 1) * 10000 basis_bps = 0.0 if gc_f_adjusted > 0 and gld_implied_spot > 0: basis_bps = (gld_implied_spot / gc_f_adjusted - 1) * 10000 # Determine basis status abs_basis = abs(basis_bps) if abs_basis < 25: basis_status = "green" basis_label = "Normal" elif abs_basis < 50: basis_status = "yellow" basis_label = "Elevated" else: basis_status = "red" basis_label = "Warning" # After-hours check: compare timestamps gld_updated = gld_quote.get("updated_at", "") gc_f_updated = gc_f_quote.get("updated_at", "") after_hours = False after_hours_note = "" try: gld_time = datetime.fromisoformat(gld_updated.replace("Z", "+00:00")) gc_f_time = datetime.fromisoformat(gc_f_updated.replace("Z", "+00:00")) # If GC=F updated much more recently, likely after-hours time_diff = (gc_f_time - gld_time).total_seconds() if time_diff > 3600: # More than 1 hour difference after_hours = True after_hours_note = "GLD quote may be stale (after-hours)" except Exception: pass return { "gld_implied_spot": round(gld_implied_spot, 2), "gld_price": round(gld_price, 2), "gld_ounces_per_share": round(ounces_per_share, 4), "gc_f_price": round(gc_f_price, 2), "gc_f_adjusted": round(gc_f_adjusted, 2), "contango_estimate": contango_estimate, "basis_bps": round(basis_bps, 1), "basis_status": basis_status, "basis_label": basis_label, "after_hours": after_hours, "after_hours_note": after_hours_note, "gld_updated_at": gld_updated, "gc_f_updated_at": gc_f_updated, "gld_source": gld_quote.get("source", "unknown"), "gc_f_source": gc_f_quote.get("source", "unknown"), } async def get_strategies(self, symbol: str | None = None) -> dict[str, Any]: ticker = (symbol or self.default_symbol).upper() quote = await self.get_quote(ticker) engine = StrategySelectionEngine(spot_price=quote["price"] if ticker != "GLD" else 460.0) return { "symbol": ticker, "updated_at": datetime.now(timezone.utc).isoformat(), "paper_parameters": { "portfolio_value": engine.portfolio_value, "loan_amount": engine.loan_amount, "margin_call_threshold": engine.margin_call_threshold, "spot_price": engine.spot_price, "volatility": engine.volatility, "risk_free_rate": engine.risk_free_rate, }, "strategies": engine.compare_all_strategies(), "recommendations": { profile: engine.recommend(profile) # type: ignore[arg-type] for profile in ("conservative", "balanced", "cost_sensitive") }, "sensitivity_analysis": engine.sensitivity_analysis(), } async def _fetch_quote(self, symbol: str) -> dict[str, Any]: if yf is None: return self._fallback_quote(symbol, source="fallback") try: ticker = yf.Ticker(symbol) history = await asyncio.to_thread(ticker.history, period="5d", interval="1d") if history.empty: return self._fallback_quote(symbol, source="fallback") closes = history["Close"] last = float(closes.iloc[-1]) previous = float(closes.iloc[-2]) if len(closes) > 1 else last change = round(last - previous, 4) change_percent = round((change / previous) * 100, 4) if previous else 0.0 return { "symbol": symbol, "price": round(last, 4), "quote_unit": "share", "change": change, "change_percent": change_percent, "updated_at": datetime.now(timezone.utc).isoformat(), "source": "yfinance", } except Exception as exc: # pragma: no cover - network dependent logger.warning("Failed to fetch %s from yfinance: %s", symbol, exc) return self._fallback_quote(symbol, source="fallback") def _fallback_option_expirations( self, symbol: str, quote: dict[str, Any], *, source: str, error: str | None = None, ) -> dict[str, Any]: payload = { "symbol": symbol, "updated_at": datetime.now(timezone.utc).isoformat(), "expirations": [], "underlying_price": quote["price"], "source": source, } if error: payload["error"] = error return payload def _fallback_options_chain( self, symbol: str, quote: dict[str, Any], *, expirations: list[str], selected_expiry: str | None, source: str, error: str | None = None, ) -> dict[str, Any]: options_chain = { "symbol": symbol, "selected_expiry": selected_expiry, "updated_at": datetime.now(timezone.utc).isoformat(), "expirations": expirations, "calls": [], "puts": [], "rows": [], "underlying_price": quote["price"], "source": source, } if error: options_chain["error"] = error return options_chain @staticmethod def _normalize_option_expirations_payload(payload: dict[str, Any], symbol: str) -> dict[str, Any]: """Normalize option expirations payload to explicit contract. This is the named boundary adapter between external provider/cache payloads and internal option expirations handling. It ensures: - symbol is always present and uppercased - expirations is always a list (empty if None/missing) - Explicit symbol mismatches are rejected (fail-closed) Args: payload: Raw expirations dict from cache or provider symbol: Expected symbol (used as fallback if missing from payload) Returns: Normalized expirations dict with explicit symbol and list type Raises: ValueError: If payload symbol explicitly conflicts with requested symbol """ normalized: dict[str, Any] = dict(payload) normalized_symbol = symbol.upper() # Ensure symbol is always present and normalized. # Missing symbol is repaired from the requested key; explicit mismatches are rejected. raw_symbol = normalized.get("symbol", normalized_symbol) normalized_payload_symbol = str(raw_symbol).upper() if raw_symbol is not None else normalized_symbol if raw_symbol is not None and normalized_payload_symbol != normalized_symbol: raise ValueError( f"Option expirations symbol mismatch: expected {normalized_symbol}, got {normalized_payload_symbol}" ) normalized["symbol"] = normalized_payload_symbol # Ensure expirations is always a list expirations = normalized.get("expirations") if not isinstance(expirations, list): logger.warning( "Repairing malformed option expirations payload for %s: expirations was %r", normalized_symbol, type(expirations).__name__, ) normalized["expirations"] = [] return normalized @staticmethod def _normalize_options_chain_payload(payload: dict[str, Any], symbol: str) -> dict[str, Any]: """Normalize options chain payload to explicit contract. This is the named boundary adapter between external provider/cache payloads and internal options chain handling. It ensures: - symbol is always present and uppercased - calls, puts, rows, and expirations are always lists (empty if None/missing) - Explicit symbol mismatches are rejected (fail-closed) Args: payload: Raw options chain dict from cache or provider symbol: Expected symbol (used as fallback if missing from payload) Returns: Normalized options chain dict with explicit symbol and list types Raises: ValueError: If payload symbol explicitly conflicts with requested symbol """ normalized: dict[str, Any] = dict(payload) normalized_symbol = symbol.upper() # Ensure symbol is always present and normalized. # Missing symbol is repaired from the requested key; explicit mismatches are rejected. raw_symbol = normalized.get("symbol", normalized_symbol) normalized_payload_symbol = str(raw_symbol).upper() if raw_symbol is not None else normalized_symbol if raw_symbol is not None and normalized_payload_symbol != normalized_symbol: raise ValueError( f"Options chain symbol mismatch: expected {normalized_symbol}, got {normalized_payload_symbol}" ) normalized["symbol"] = normalized_payload_symbol # Ensure list fields are always lists for field in ("expirations", "calls", "puts", "rows"): if not isinstance(normalized.get(field), list): logger.warning( "Repairing malformed options chain payload for %s: %s was %r", normalized_symbol, field, type(normalized.get(field)).__name__, ) normalized[field] = [] return normalized def _normalize_option_rows( self, frame: Any, symbol: str, expiry: str, option_type: str, underlying_price: float, ) -> list[dict[str, Any]]: if frame is None or getattr(frame, "empty", True): return [] rows: list[dict[str, Any]] = [] for item in frame.to_dict(orient="records"): strike = self._safe_float(item.get("strike")) if strike <= 0: continue bid = self._safe_float(item.get("bid")) ask = self._safe_float(item.get("ask")) last_price = self._safe_float(item.get("lastPrice")) implied_volatility = self._safe_float(item.get("impliedVolatility")) contract_symbol = str(item.get("contractSymbol") or "").strip() row = { "contractSymbol": contract_symbol, "symbol": contract_symbol or f"{symbol} {expiry} {option_type.upper()} {strike:.2f}", "strike": strike, "bid": bid, "ask": ask, "premium": last_price or self._midpoint(bid, ask), "lastPrice": last_price, "impliedVolatility": implied_volatility, "expiry": expiry, "type": option_type, "openInterest": int(self._safe_float(item.get("openInterest"))), "volume": int(self._safe_float(item.get("volume"))), } row.update(option_row_greeks(row, underlying_price)) rows.append(row) return rows @staticmethod def _safe_float(value: Any) -> float: try: result = float(value) except (TypeError, ValueError): return 0.0 return 0.0 if math.isnan(result) else result @staticmethod def _midpoint(bid: float, ask: float) -> float: if bid > 0 and ask > 0: return round((bid + ask) / 2, 4) return max(bid, ask, 0.0) @staticmethod def _normalize_quote_payload(payload: dict[str, Any], symbol: str) -> dict[str, Any]: """Normalize provider/cache quote payload to explicit contract. This is the named boundary adapter between external float-heavy provider payloads and internal quote handling. It ensures: - symbol is always present and uppercased - GLD quotes have explicit quote_unit='share' metadata - Non-GLD symbols pass through without auto-assigned units Fail-closed: missing/invalid fields are preserved for upstream handling rather than silently defaulted. Type conversion is not performed here. Args: payload: Raw quote dict from cache or provider (float-heavy) symbol: Expected symbol (used as fallback if missing from payload) Returns: Normalized quote dict with explicit symbol and GLD quote_unit """ normalized: dict[str, Any] = dict(payload) normalized_symbol = symbol.upper() # Ensure symbol is always present and normalized. # Missing symbol is repaired from the requested key; explicit mismatches are rejected. raw_symbol = normalized.get("symbol", normalized_symbol) normalized_payload_symbol = str(raw_symbol).upper() if raw_symbol is not None else normalized_symbol if raw_symbol is not None and normalized_payload_symbol != normalized_symbol: raise ValueError( f"Quote payload symbol mismatch: expected {normalized_symbol}, got {normalized_payload_symbol}" ) normalized["symbol"] = normalized_payload_symbol # Add explicit quote_unit for GLD (CORE-002A/B compatibility) # Repair missing or empty unit metadata, but preserve explicit non-empty values if normalized["symbol"] == "GLD" and not normalized.get("quote_unit"): normalized["quote_unit"] = "share" return normalized @staticmethod def _fallback_quote(symbol: str, source: str) -> dict[str, Any]: return { "symbol": symbol, "price": 215.0, "quote_unit": "share", "change": 0.0, "change_percent": 0.0, "updated_at": datetime.now(timezone.utc).isoformat(), "source": source, }