706 lines
29 KiB
Python
706 lines
29 KiB
Python
"""Market data access layer with caching support."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import math
|
|
from datetime import date, datetime, timezone
|
|
from typing import Any
|
|
|
|
from app.core.calculations import option_row_greeks
|
|
from app.domain.instruments import gld_ounces_per_share
|
|
from app.services.cache import CacheService
|
|
from app.strategies.engine import StrategySelectionEngine
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import yfinance as yf
|
|
except ImportError: # pragma: no cover - optional dependency
|
|
yf = None
|
|
|
|
|
|
class DataService:
|
|
"""Fetches portfolio and market data, using Redis when available."""
|
|
|
|
def __init__(self, cache: CacheService, default_symbol: str = "GLD") -> None:
|
|
self.cache = cache
|
|
self.default_symbol = default_symbol
|
|
self.gc_f_symbol = "GC=F" # COMEX Gold Futures
|
|
|
|
async def get_portfolio(self, symbol: str | None = None) -> dict[str, Any]:
|
|
ticker = (symbol or self.default_symbol).upper()
|
|
cache_key = f"portfolio:{ticker}"
|
|
|
|
cached = await self.cache.get_json(cache_key)
|
|
if cached and isinstance(cached, dict):
|
|
return cached
|
|
|
|
quote = await self.get_quote(ticker)
|
|
portfolio = {
|
|
"symbol": ticker,
|
|
"spot_price": quote["price"],
|
|
"portfolio_value": round(quote["price"] * 1000, 2),
|
|
"loan_amount": 600_000.0,
|
|
"ltv_ratio": round(600_000.0 / max(quote["price"] * 1000, 1), 4),
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"source": quote["source"],
|
|
}
|
|
await self.cache.set_json(cache_key, portfolio)
|
|
return portfolio
|
|
|
|
async def get_quote(self, symbol: str) -> dict[str, Any]:
|
|
normalized_symbol = symbol.upper()
|
|
cache_key = f"quote:{normalized_symbol}"
|
|
cached = await self.cache.get_json(cache_key)
|
|
if cached and isinstance(cached, dict):
|
|
try:
|
|
normalized_cached = self._normalize_quote_payload(cached, normalized_symbol)
|
|
except ValueError:
|
|
normalized_cached = None
|
|
if normalized_cached is not None:
|
|
if normalized_cached != cached:
|
|
await self.cache.set_json(cache_key, normalized_cached)
|
|
return normalized_cached
|
|
|
|
quote = self._normalize_quote_payload(await self._fetch_quote(normalized_symbol), normalized_symbol)
|
|
await self.cache.set_json(cache_key, quote)
|
|
return quote
|
|
|
|
async def get_option_expirations(self, symbol: str | None = None) -> dict[str, Any]:
|
|
ticker_symbol = (symbol or self.default_symbol).upper()
|
|
cache_key = f"options:{ticker_symbol}:expirations"
|
|
|
|
cached = await self.cache.get_json(cache_key)
|
|
if cached and isinstance(cached, dict):
|
|
malformed_list_shape = (
|
|
not isinstance(cached.get("expirations"), list) and cached.get("expirations") is not None
|
|
)
|
|
try:
|
|
normalized_cached = self._normalize_option_expirations_payload(cached, ticker_symbol)
|
|
except ValueError as exc:
|
|
logger.warning("Discarding cached option expirations payload for %s: %s", ticker_symbol, exc)
|
|
normalized_cached = None
|
|
if malformed_list_shape:
|
|
logger.warning("Discarding malformed cached option expirations payload for %s", ticker_symbol)
|
|
normalized_cached = None
|
|
if normalized_cached is not None:
|
|
if normalized_cached != cached:
|
|
await self.cache.set_json(cache_key, normalized_cached)
|
|
return normalized_cached
|
|
|
|
quote = await self.get_quote(ticker_symbol)
|
|
if yf is None:
|
|
payload = self._fallback_option_expirations(
|
|
ticker_symbol,
|
|
quote,
|
|
source="fallback",
|
|
error="yfinance is not installed",
|
|
)
|
|
await self.cache.set_json(cache_key, payload)
|
|
return payload
|
|
|
|
try:
|
|
ticker = yf.Ticker(ticker_symbol)
|
|
expirations = await asyncio.to_thread(lambda: list(ticker.options or []))
|
|
if not expirations:
|
|
payload = self._fallback_option_expirations(
|
|
ticker_symbol,
|
|
quote,
|
|
source="fallback",
|
|
error="No option expirations returned by yfinance",
|
|
)
|
|
await self.cache.set_json(cache_key, payload)
|
|
return payload
|
|
|
|
payload = self._normalize_option_expirations_payload(
|
|
{
|
|
"symbol": ticker_symbol,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"expirations": expirations,
|
|
"underlying_price": quote["price"],
|
|
"source": "yfinance",
|
|
},
|
|
ticker_symbol,
|
|
)
|
|
await self.cache.set_json(cache_key, payload)
|
|
return payload
|
|
except Exception as exc: # pragma: no cover - network dependent
|
|
logger.warning("Failed to fetch option expirations for %s from yfinance: %s", ticker_symbol, exc)
|
|
payload = self._fallback_option_expirations(
|
|
ticker_symbol,
|
|
quote,
|
|
source="fallback",
|
|
error=str(exc),
|
|
)
|
|
await self.cache.set_json(cache_key, payload)
|
|
return payload
|
|
|
|
async def get_options_chain_for_expiry(
|
|
self, symbol: str | None = None, expiry: str | None = None
|
|
) -> dict[str, Any]:
|
|
ticker_symbol = (symbol or self.default_symbol).upper()
|
|
expirations_data = await self.get_option_expirations(ticker_symbol)
|
|
expirations = list(expirations_data.get("expirations") or [])
|
|
target_expiry = expiry or (expirations[0] if expirations else None)
|
|
quote = await self.get_quote(ticker_symbol)
|
|
|
|
if not target_expiry:
|
|
return self._fallback_options_chain(
|
|
ticker_symbol,
|
|
quote,
|
|
expirations=expirations,
|
|
selected_expiry=None,
|
|
source=expirations_data.get("source", quote.get("source", "fallback")),
|
|
error=expirations_data.get("error"),
|
|
)
|
|
|
|
cache_key = f"options:{ticker_symbol}:{target_expiry}"
|
|
cached = await self.cache.get_json(cache_key)
|
|
if cached and isinstance(cached, dict):
|
|
malformed_list_shape = any(
|
|
not isinstance(cached.get(field), list) and cached.get(field) is not None
|
|
for field in ("expirations", "calls", "puts", "rows")
|
|
)
|
|
try:
|
|
normalized_cached = self._normalize_options_chain_payload(cached, ticker_symbol)
|
|
except ValueError as exc:
|
|
logger.warning(
|
|
"Discarding cached options chain payload for %s %s: %s", ticker_symbol, target_expiry, exc
|
|
)
|
|
normalized_cached = None
|
|
if malformed_list_shape:
|
|
logger.warning(
|
|
"Discarding malformed cached options chain payload for %s %s", ticker_symbol, target_expiry
|
|
)
|
|
normalized_cached = None
|
|
if normalized_cached is not None:
|
|
if normalized_cached != cached:
|
|
await self.cache.set_json(cache_key, normalized_cached)
|
|
return normalized_cached
|
|
|
|
if yf is None:
|
|
payload = self._fallback_options_chain(
|
|
ticker_symbol,
|
|
quote,
|
|
expirations=expirations,
|
|
selected_expiry=target_expiry,
|
|
source="fallback",
|
|
error="yfinance is not installed",
|
|
)
|
|
await self.cache.set_json(cache_key, payload)
|
|
return payload
|
|
|
|
try:
|
|
ticker = yf.Ticker(ticker_symbol)
|
|
chain = await asyncio.to_thread(ticker.option_chain, target_expiry)
|
|
calls = self._normalize_option_rows(chain.calls, ticker_symbol, target_expiry, "call", quote["price"])
|
|
puts = self._normalize_option_rows(chain.puts, ticker_symbol, target_expiry, "put", quote["price"])
|
|
|
|
if not calls and not puts:
|
|
payload = self._fallback_options_chain(
|
|
ticker_symbol,
|
|
quote,
|
|
expirations=expirations,
|
|
selected_expiry=target_expiry,
|
|
source="fallback",
|
|
error="No option contracts returned by yfinance",
|
|
)
|
|
await self.cache.set_json(cache_key, payload)
|
|
return payload
|
|
|
|
payload = self._normalize_options_chain_payload(
|
|
{
|
|
"symbol": ticker_symbol,
|
|
"selected_expiry": target_expiry,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"expirations": expirations,
|
|
"calls": calls,
|
|
"puts": puts,
|
|
"rows": sorted(calls + puts, key=lambda row: (row["strike"], row["type"])),
|
|
"underlying_price": quote["price"],
|
|
"source": "yfinance",
|
|
},
|
|
ticker_symbol,
|
|
)
|
|
await self.cache.set_json(cache_key, payload)
|
|
return payload
|
|
except Exception as exc: # pragma: no cover - network dependent
|
|
logger.warning(
|
|
"Failed to fetch options chain for %s %s from yfinance: %s", ticker_symbol, target_expiry, exc
|
|
)
|
|
payload = self._fallback_options_chain(
|
|
ticker_symbol,
|
|
quote,
|
|
expirations=expirations,
|
|
selected_expiry=target_expiry,
|
|
source="fallback",
|
|
error=str(exc),
|
|
)
|
|
await self.cache.set_json(cache_key, payload)
|
|
return payload
|
|
|
|
async def get_options_chain(self, symbol: str | None = None) -> dict[str, Any]:
|
|
ticker_symbol = (symbol or self.default_symbol).upper()
|
|
expirations_data = await self.get_option_expirations(ticker_symbol)
|
|
expirations = list(expirations_data.get("expirations") or [])
|
|
if not expirations:
|
|
quote = await self.get_quote(ticker_symbol)
|
|
return self._fallback_options_chain(
|
|
ticker_symbol,
|
|
quote,
|
|
expirations=[],
|
|
selected_expiry=None,
|
|
source=expirations_data.get("source", quote.get("source", "fallback")),
|
|
error=expirations_data.get("error"),
|
|
)
|
|
return await self.get_options_chain_for_expiry(ticker_symbol, expirations[0])
|
|
|
|
async def get_gc_futures(self) -> dict[str, Any]:
|
|
"""Fetch GC=F (COMEX Gold Futures) quote.
|
|
|
|
Returns a quote dict similar to get_quote but for gold futures.
|
|
Falls back gracefully if GC=F is unavailable.
|
|
"""
|
|
cache_key = f"quote:{self.gc_f_symbol}"
|
|
cached = await self.cache.get_json(cache_key)
|
|
if cached and isinstance(cached, dict):
|
|
try:
|
|
normalized_cached = self._normalize_quote_payload(cached, self.gc_f_symbol)
|
|
except ValueError:
|
|
normalized_cached = None
|
|
if normalized_cached is not None:
|
|
if normalized_cached != cached:
|
|
await self.cache.set_json(cache_key, normalized_cached)
|
|
return normalized_cached
|
|
|
|
quote = self._normalize_quote_payload(await self._fetch_gc_futures(), self.gc_f_symbol)
|
|
await self.cache.set_json(cache_key, quote)
|
|
return quote
|
|
|
|
async def _fetch_gc_futures(self) -> dict[str, Any]:
|
|
"""Fetch GC=F from yfinance with graceful fallback."""
|
|
if yf is None:
|
|
return self._fallback_gc_futures(source="fallback", error="yfinance is not installed")
|
|
|
|
try:
|
|
ticker = yf.Ticker(self.gc_f_symbol)
|
|
history = await asyncio.to_thread(ticker.history, period="5d", interval="1d")
|
|
if history.empty:
|
|
return self._fallback_gc_futures(source="fallback", error="No history returned for GC=F")
|
|
|
|
closes = history["Close"]
|
|
last = float(closes.iloc[-1])
|
|
previous = float(closes.iloc[-2]) if len(closes) > 1 else last
|
|
change = round(last - previous, 4)
|
|
change_percent = round((change / previous) * 100, 4) if previous else 0.0
|
|
|
|
# Try to get more recent price from fast_info if available
|
|
try:
|
|
fast_price = ticker.fast_info.get("lastPrice", last)
|
|
if fast_price and fast_price > 0:
|
|
last = float(fast_price)
|
|
except Exception:
|
|
pass # Keep history close if fast_info unavailable
|
|
|
|
return {
|
|
"symbol": self.gc_f_symbol,
|
|
"price": round(last, 4),
|
|
"quote_unit": "ozt", # Gold futures are per troy ounce
|
|
"change": change,
|
|
"change_percent": change_percent,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"source": "yfinance",
|
|
}
|
|
except Exception as exc: # pragma: no cover - network dependent
|
|
logger.warning("Failed to fetch %s from yfinance: %s", self.gc_f_symbol, exc)
|
|
return self._fallback_gc_futures(source="fallback", error=str(exc))
|
|
|
|
@staticmethod
|
|
def _fallback_gc_futures(source: str, error: str | None = None) -> dict[str, Any]:
|
|
"""Fallback GC=F quote when live data unavailable."""
|
|
payload = {
|
|
"symbol": "GC=F",
|
|
"price": 2700.0, # Fallback estimate
|
|
"quote_unit": "ozt",
|
|
"change": 0.0,
|
|
"change_percent": 0.0,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"source": source,
|
|
}
|
|
if error:
|
|
payload["error"] = error
|
|
return payload
|
|
|
|
async def get_basis_data(self) -> dict[str, Any]:
|
|
"""Get GLD/GC=F basis data for comparison.
|
|
|
|
Returns:
|
|
Dict with GLD implied spot, GC=F adjusted price, basis in bps, and status info.
|
|
"""
|
|
gld_quote = await self.get_quote("GLD")
|
|
gc_f_quote = await self.get_gc_futures()
|
|
|
|
# Use current date for GLD ounces calculation
|
|
ounces_per_share = float(gld_ounces_per_share(date.today()))
|
|
|
|
# GLD implied spot = GLD_price / ounces_per_share
|
|
gld_price = gld_quote.get("price", 0.0)
|
|
gld_implied_spot = gld_price / ounces_per_share if ounces_per_share > 0 and gld_price > 0 else 0.0
|
|
|
|
# GC=F adjusted = (GC=F - contango_estimate) / 10 for naive comparison
|
|
# But actually GC=F is already per oz, so we just adjust for contango
|
|
gc_f_price = gc_f_quote.get("price", 0.0)
|
|
contango_estimate = 10.0 # Typical contango ~$10/oz
|
|
gc_f_adjusted = gc_f_price - contango_estimate if gc_f_price > 0 else 0.0
|
|
|
|
# Basis in bps = (GLD_implied_spot / GC=F_adjusted - 1) * 10000
|
|
basis_bps = 0.0
|
|
if gc_f_adjusted > 0 and gld_implied_spot > 0:
|
|
basis_bps = (gld_implied_spot / gc_f_adjusted - 1) * 10000
|
|
|
|
# Determine basis status
|
|
abs_basis = abs(basis_bps)
|
|
if abs_basis < 25:
|
|
basis_status = "green"
|
|
basis_label = "Normal"
|
|
elif abs_basis < 50:
|
|
basis_status = "yellow"
|
|
basis_label = "Elevated"
|
|
else:
|
|
basis_status = "red"
|
|
basis_label = "Warning"
|
|
|
|
# After-hours check: compare timestamps
|
|
gld_updated = gld_quote.get("updated_at", "")
|
|
gc_f_updated = gc_f_quote.get("updated_at", "")
|
|
after_hours = False
|
|
after_hours_note = ""
|
|
|
|
try:
|
|
gld_time = datetime.fromisoformat(gld_updated.replace("Z", "+00:00"))
|
|
gc_f_time = datetime.fromisoformat(gc_f_updated.replace("Z", "+00:00"))
|
|
# If GC=F updated much more recently, likely after-hours
|
|
time_diff = (gc_f_time - gld_time).total_seconds()
|
|
if time_diff > 3600: # More than 1 hour difference
|
|
after_hours = True
|
|
after_hours_note = "GLD quote may be stale (after-hours)"
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"gld_implied_spot": round(gld_implied_spot, 2),
|
|
"gld_price": round(gld_price, 2),
|
|
"gld_ounces_per_share": round(ounces_per_share, 4),
|
|
"gc_f_price": round(gc_f_price, 2),
|
|
"gc_f_adjusted": round(gc_f_adjusted, 2),
|
|
"contango_estimate": contango_estimate,
|
|
"basis_bps": round(basis_bps, 1),
|
|
"basis_status": basis_status,
|
|
"basis_label": basis_label,
|
|
"after_hours": after_hours,
|
|
"after_hours_note": after_hours_note,
|
|
"gld_updated_at": gld_updated,
|
|
"gc_f_updated_at": gc_f_updated,
|
|
"gld_source": gld_quote.get("source", "unknown"),
|
|
"gc_f_source": gc_f_quote.get("source", "unknown"),
|
|
}
|
|
|
|
async def get_strategies(self, symbol: str | None = None) -> dict[str, Any]:
|
|
ticker = (symbol or self.default_symbol).upper()
|
|
quote = await self.get_quote(ticker)
|
|
engine = StrategySelectionEngine(spot_price=quote["price"] if ticker != "GLD" else 460.0)
|
|
|
|
return {
|
|
"symbol": ticker,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"paper_parameters": {
|
|
"portfolio_value": engine.portfolio_value,
|
|
"loan_amount": engine.loan_amount,
|
|
"margin_call_threshold": engine.margin_call_threshold,
|
|
"spot_price": engine.spot_price,
|
|
"volatility": engine.volatility,
|
|
"risk_free_rate": engine.risk_free_rate,
|
|
},
|
|
"strategies": engine.compare_all_strategies(),
|
|
"recommendations": {
|
|
profile: engine.recommend(profile) # type: ignore[arg-type]
|
|
for profile in ("conservative", "balanced", "cost_sensitive")
|
|
},
|
|
"sensitivity_analysis": engine.sensitivity_analysis(),
|
|
}
|
|
|
|
async def _fetch_quote(self, symbol: str) -> dict[str, Any]:
|
|
if yf is None:
|
|
return self._fallback_quote(symbol, source="fallback")
|
|
|
|
try:
|
|
ticker = yf.Ticker(symbol)
|
|
history = await asyncio.to_thread(ticker.history, period="5d", interval="1d")
|
|
if history.empty:
|
|
return self._fallback_quote(symbol, source="fallback")
|
|
|
|
closes = history["Close"]
|
|
last = float(closes.iloc[-1])
|
|
previous = float(closes.iloc[-2]) if len(closes) > 1 else last
|
|
change = round(last - previous, 4)
|
|
change_percent = round((change / previous) * 100, 4) if previous else 0.0
|
|
return {
|
|
"symbol": symbol,
|
|
"price": round(last, 4),
|
|
"quote_unit": "share",
|
|
"change": change,
|
|
"change_percent": change_percent,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"source": "yfinance",
|
|
}
|
|
except Exception as exc: # pragma: no cover - network dependent
|
|
logger.warning("Failed to fetch %s from yfinance: %s", symbol, exc)
|
|
return self._fallback_quote(symbol, source="fallback")
|
|
|
|
def _fallback_option_expirations(
|
|
self,
|
|
symbol: str,
|
|
quote: dict[str, Any],
|
|
*,
|
|
source: str,
|
|
error: str | None = None,
|
|
) -> dict[str, Any]:
|
|
payload = {
|
|
"symbol": symbol,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"expirations": [],
|
|
"underlying_price": quote["price"],
|
|
"source": source,
|
|
}
|
|
if error:
|
|
payload["error"] = error
|
|
return payload
|
|
|
|
def _fallback_options_chain(
|
|
self,
|
|
symbol: str,
|
|
quote: dict[str, Any],
|
|
*,
|
|
expirations: list[str],
|
|
selected_expiry: str | None,
|
|
source: str,
|
|
error: str | None = None,
|
|
) -> dict[str, Any]:
|
|
options_chain = {
|
|
"symbol": symbol,
|
|
"selected_expiry": selected_expiry,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"expirations": expirations,
|
|
"calls": [],
|
|
"puts": [],
|
|
"rows": [],
|
|
"underlying_price": quote["price"],
|
|
"source": source,
|
|
}
|
|
if error:
|
|
options_chain["error"] = error
|
|
return options_chain
|
|
|
|
@staticmethod
|
|
def _normalize_option_expirations_payload(payload: dict[str, Any], symbol: str) -> dict[str, Any]:
|
|
"""Normalize option expirations payload to explicit contract.
|
|
|
|
This is the named boundary adapter between external provider/cache
|
|
payloads and internal option expirations handling. It ensures:
|
|
|
|
- symbol is always present and uppercased
|
|
- expirations is always a list (empty if None/missing)
|
|
- Explicit symbol mismatches are rejected (fail-closed)
|
|
|
|
Args:
|
|
payload: Raw expirations dict from cache or provider
|
|
symbol: Expected symbol (used as fallback if missing from payload)
|
|
|
|
Returns:
|
|
Normalized expirations dict with explicit symbol and list type
|
|
|
|
Raises:
|
|
ValueError: If payload symbol explicitly conflicts with requested symbol
|
|
"""
|
|
normalized: dict[str, Any] = dict(payload)
|
|
normalized_symbol = symbol.upper()
|
|
|
|
# Ensure symbol is always present and normalized.
|
|
# Missing symbol is repaired from the requested key; explicit mismatches are rejected.
|
|
raw_symbol = normalized.get("symbol", normalized_symbol)
|
|
normalized_payload_symbol = str(raw_symbol).upper() if raw_symbol is not None else normalized_symbol
|
|
if raw_symbol is not None and normalized_payload_symbol != normalized_symbol:
|
|
raise ValueError(
|
|
f"Option expirations symbol mismatch: expected {normalized_symbol}, got {normalized_payload_symbol}"
|
|
)
|
|
normalized["symbol"] = normalized_payload_symbol
|
|
|
|
# Ensure expirations is always a list
|
|
expirations = normalized.get("expirations")
|
|
if not isinstance(expirations, list):
|
|
logger.warning(
|
|
"Repairing malformed option expirations payload for %s: expirations was %r",
|
|
normalized_symbol,
|
|
type(expirations).__name__,
|
|
)
|
|
normalized["expirations"] = []
|
|
|
|
return normalized
|
|
|
|
@staticmethod
|
|
def _normalize_options_chain_payload(payload: dict[str, Any], symbol: str) -> dict[str, Any]:
|
|
"""Normalize options chain payload to explicit contract.
|
|
|
|
This is the named boundary adapter between external provider/cache
|
|
payloads and internal options chain handling. It ensures:
|
|
|
|
- symbol is always present and uppercased
|
|
- calls, puts, rows, and expirations are always lists (empty if None/missing)
|
|
- Explicit symbol mismatches are rejected (fail-closed)
|
|
|
|
Args:
|
|
payload: Raw options chain dict from cache or provider
|
|
symbol: Expected symbol (used as fallback if missing from payload)
|
|
|
|
Returns:
|
|
Normalized options chain dict with explicit symbol and list types
|
|
|
|
Raises:
|
|
ValueError: If payload symbol explicitly conflicts with requested symbol
|
|
"""
|
|
normalized: dict[str, Any] = dict(payload)
|
|
normalized_symbol = symbol.upper()
|
|
|
|
# Ensure symbol is always present and normalized.
|
|
# Missing symbol is repaired from the requested key; explicit mismatches are rejected.
|
|
raw_symbol = normalized.get("symbol", normalized_symbol)
|
|
normalized_payload_symbol = str(raw_symbol).upper() if raw_symbol is not None else normalized_symbol
|
|
if raw_symbol is not None and normalized_payload_symbol != normalized_symbol:
|
|
raise ValueError(
|
|
f"Options chain symbol mismatch: expected {normalized_symbol}, got {normalized_payload_symbol}"
|
|
)
|
|
normalized["symbol"] = normalized_payload_symbol
|
|
|
|
# Ensure list fields are always lists
|
|
for field in ("expirations", "calls", "puts", "rows"):
|
|
if not isinstance(normalized.get(field), list):
|
|
logger.warning(
|
|
"Repairing malformed options chain payload for %s: %s was %r",
|
|
normalized_symbol,
|
|
field,
|
|
type(normalized.get(field)).__name__,
|
|
)
|
|
normalized[field] = []
|
|
|
|
return normalized
|
|
|
|
def _normalize_option_rows(
|
|
self,
|
|
frame: Any,
|
|
symbol: str,
|
|
expiry: str,
|
|
option_type: str,
|
|
underlying_price: float,
|
|
) -> list[dict[str, Any]]:
|
|
if frame is None or getattr(frame, "empty", True):
|
|
return []
|
|
|
|
rows: list[dict[str, Any]] = []
|
|
for item in frame.to_dict(orient="records"):
|
|
strike = self._safe_float(item.get("strike"))
|
|
if strike <= 0:
|
|
continue
|
|
|
|
bid = self._safe_float(item.get("bid"))
|
|
ask = self._safe_float(item.get("ask"))
|
|
last_price = self._safe_float(item.get("lastPrice"))
|
|
implied_volatility = self._safe_float(item.get("impliedVolatility"))
|
|
contract_symbol = str(item.get("contractSymbol") or "").strip()
|
|
|
|
row = {
|
|
"contractSymbol": contract_symbol,
|
|
"symbol": contract_symbol or f"{symbol} {expiry} {option_type.upper()} {strike:.2f}",
|
|
"strike": strike,
|
|
"bid": bid,
|
|
"ask": ask,
|
|
"premium": last_price or self._midpoint(bid, ask),
|
|
"lastPrice": last_price,
|
|
"impliedVolatility": implied_volatility,
|
|
"expiry": expiry,
|
|
"type": option_type,
|
|
"openInterest": int(self._safe_float(item.get("openInterest"))),
|
|
"volume": int(self._safe_float(item.get("volume"))),
|
|
}
|
|
row.update(option_row_greeks(row, underlying_price))
|
|
rows.append(row)
|
|
return rows
|
|
|
|
@staticmethod
|
|
def _safe_float(value: Any) -> float:
|
|
try:
|
|
result = float(value)
|
|
except (TypeError, ValueError):
|
|
return 0.0
|
|
return 0.0 if math.isnan(result) else result
|
|
|
|
@staticmethod
|
|
def _midpoint(bid: float, ask: float) -> float:
|
|
if bid > 0 and ask > 0:
|
|
return round((bid + ask) / 2, 4)
|
|
return max(bid, ask, 0.0)
|
|
|
|
@staticmethod
|
|
def _normalize_quote_payload(payload: dict[str, Any], symbol: str) -> dict[str, Any]:
|
|
"""Normalize provider/cache quote payload to explicit contract.
|
|
|
|
This is the named boundary adapter between external float-heavy provider
|
|
payloads and internal quote handling. It ensures:
|
|
|
|
- symbol is always present and uppercased
|
|
- GLD quotes have explicit quote_unit='share' metadata
|
|
- Non-GLD symbols pass through without auto-assigned units
|
|
|
|
Fail-closed: missing/invalid fields are preserved for upstream handling
|
|
rather than silently defaulted. Type conversion is not performed here.
|
|
|
|
Args:
|
|
payload: Raw quote dict from cache or provider (float-heavy)
|
|
symbol: Expected symbol (used as fallback if missing from payload)
|
|
|
|
Returns:
|
|
Normalized quote dict with explicit symbol and GLD quote_unit
|
|
"""
|
|
normalized: dict[str, Any] = dict(payload)
|
|
normalized_symbol = symbol.upper()
|
|
|
|
# Ensure symbol is always present and normalized.
|
|
# Missing symbol is repaired from the requested key; explicit mismatches are rejected.
|
|
raw_symbol = normalized.get("symbol", normalized_symbol)
|
|
normalized_payload_symbol = str(raw_symbol).upper() if raw_symbol is not None else normalized_symbol
|
|
if raw_symbol is not None and normalized_payload_symbol != normalized_symbol:
|
|
raise ValueError(
|
|
f"Quote payload symbol mismatch: expected {normalized_symbol}, got {normalized_payload_symbol}"
|
|
)
|
|
normalized["symbol"] = normalized_payload_symbol
|
|
|
|
# Add explicit quote_unit for GLD (CORE-002A/B compatibility)
|
|
# Repair missing or empty unit metadata, but preserve explicit non-empty values
|
|
if normalized["symbol"] == "GLD" and not normalized.get("quote_unit"):
|
|
normalized["quote_unit"] = "share"
|
|
|
|
return normalized
|
|
|
|
@staticmethod
|
|
def _fallback_quote(symbol: str, source: str) -> dict[str, Any]:
|
|
return {
|
|
"symbol": symbol,
|
|
"price": 215.0,
|
|
"quote_unit": "share",
|
|
"change": 0.0,
|
|
"change_percent": 0.0,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"source": source,
|
|
}
|