Files
vault-dash/app/services/data_service.py
Bu5hm4nn 7dc5b3d734 Fix type hints and dependency issues for CI
- Add -r requirements.txt to requirements-dev.txt
- Fix mypy errors:
  - Remove slots=True from Settings dataclass
  - Add explicit list[float] type annotations in hedge.py
  - Add type ignore comments for optional QuantLib imports
  - Use Sequence instead of list in GreeksTable for covariance
  - Fix dict type annotation in options.py
  - Add type ignore for nicegui attr-defined errors
- Disable attr-defined error code in mypy config
2026-03-22 10:36:58 +01:00

162 lines
5.9 KiB
Python

"""Market data access layer with caching support."""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime
from typing import Any
from app.services.cache import CacheService
from app.strategies.engine import StrategySelectionEngine
logger = logging.getLogger(__name__)
# yfinance is an optional dependency: when it cannot be imported, ``yf`` is
# set to None and DataService._fetch_quote serves placeholder quotes instead.
try:
    import yfinance as yf
except ImportError:  # pragma: no cover - optional dependency
    yf = None
class DataService:
"""Fetches portfolio and market data, using Redis when available."""
def __init__(self, cache: CacheService, default_symbol: str = "GLD") -> None:
self.cache = cache
self.default_symbol = default_symbol
async def get_portfolio(self, symbol: str | None = None) -> dict[str, Any]:
ticker = (symbol or self.default_symbol).upper()
cache_key = f"portfolio:{ticker}"
cached = await self.cache.get_json(cache_key)
if cached and isinstance(cached, dict):
return cached
quote = await self.get_quote(ticker)
portfolio = {
"symbol": ticker,
"spot_price": quote["price"],
"portfolio_value": round(quote["price"] * 1000, 2),
"loan_amount": 600_000.0,
"ltv_ratio": round(600_000.0 / max(quote["price"] * 1000, 1), 4),
"updated_at": datetime.now(UTC).isoformat(),
"source": quote["source"],
}
await self.cache.set_json(cache_key, portfolio)
return portfolio
async def get_quote(self, symbol: str) -> dict[str, Any]:
cache_key = f"quote:{symbol}"
cached = await self.cache.get_json(cache_key)
if cached and isinstance(cached, dict):
return cached
quote = await self._fetch_quote(symbol)
await self.cache.set_json(cache_key, quote)
return quote
async def get_options_chain(self, symbol: str | None = None) -> dict[str, Any]:
ticker = (symbol or self.default_symbol).upper()
cache_key = f"options:{ticker}"
cached = await self.cache.get_json(cache_key)
if cached and isinstance(cached, dict):
return cached
quote = await self.get_quote(ticker)
base_price = quote["price"]
options_chain = {
"symbol": ticker,
"updated_at": datetime.now(UTC).isoformat(),
"calls": [
{
"strike": round(base_price * 1.05, 2),
"premium": round(base_price * 0.03, 2),
"expiry": "2026-06-19",
},
{
"strike": round(base_price * 1.10, 2),
"premium": round(base_price * 0.02, 2),
"expiry": "2026-09-18",
},
],
"puts": [
{
"strike": round(base_price * 0.95, 2),
"premium": round(base_price * 0.028, 2),
"expiry": "2026-06-19",
},
{
"strike": round(base_price * 0.90, 2),
"premium": round(base_price * 0.018, 2),
"expiry": "2026-09-18",
},
],
"source": quote["source"],
}
await self.cache.set_json(cache_key, options_chain)
return options_chain
async def get_strategies(self, symbol: str | None = None) -> dict[str, Any]:
ticker = (symbol or self.default_symbol).upper()
quote = await self.get_quote(ticker)
engine = StrategySelectionEngine(spot_price=quote["price"] if ticker != "GLD" else 460.0)
return {
"symbol": ticker,
"updated_at": datetime.now(UTC).isoformat(),
"paper_parameters": {
"portfolio_value": engine.portfolio_value,
"loan_amount": engine.loan_amount,
"margin_call_threshold": engine.margin_call_threshold,
"spot_price": engine.spot_price,
"volatility": engine.volatility,
"risk_free_rate": engine.risk_free_rate,
},
"strategies": engine.compare_all_strategies(),
"recommendations": {
profile: engine.recommend(profile) # type: ignore[arg-type]
for profile in ("conservative", "balanced", "cost_sensitive")
},
"sensitivity_analysis": engine.sensitivity_analysis(),
}
async def _fetch_quote(self, symbol: str) -> dict[str, Any]:
if yf is None:
return self._fallback_quote(symbol, source="fallback")
try:
ticker = yf.Ticker(symbol)
history = await asyncio.to_thread(ticker.history, period="5d", interval="1d")
if history.empty:
return self._fallback_quote(symbol, source="fallback")
closes = history["Close"]
last = float(closes.iloc[-1])
previous = float(closes.iloc[-2]) if len(closes) > 1 else last
change = round(last - previous, 4)
change_percent = round((change / previous) * 100, 4) if previous else 0.0
return {
"symbol": symbol,
"price": round(last, 4),
"change": change,
"change_percent": change_percent,
"updated_at": datetime.now(UTC).isoformat(),
"source": "yfinance",
}
except Exception as exc: # pragma: no cover - network dependent
logger.warning("Failed to fetch %s from yfinance: %s", symbol, exc)
return self._fallback_quote(symbol, source="fallback")
@staticmethod
def _fallback_quote(symbol: str, source: str) -> dict[str, Any]:
return {
"symbol": symbol,
"price": 215.0,
"change": 0.0,
"change_percent": 0.0,
"updated_at": datetime.now(UTC).isoformat(),
"source": source,
}