chore: enforce linting as part of build
This commit is contained in:
@@ -64,7 +64,7 @@ class LombardPortfolio:
|
||||
@dataclass
|
||||
class PortfolioConfig:
|
||||
"""User portfolio configuration with validation.
|
||||
|
||||
|
||||
Attributes:
|
||||
gold_value: Current gold collateral value in USD
|
||||
loan_amount: Outstanding loan amount in USD
|
||||
@@ -72,27 +72,27 @@ class PortfolioConfig:
|
||||
monthly_budget: Approved monthly hedge budget
|
||||
ltv_warning: LTV warning level for alerts (default 0.70)
|
||||
"""
|
||||
|
||||
|
||||
gold_value: float = 215000.0
|
||||
loan_amount: float = 145000.0
|
||||
margin_threshold: float = 0.75
|
||||
monthly_budget: float = 8000.0
|
||||
ltv_warning: float = 0.70
|
||||
|
||||
|
||||
# Data source settings
|
||||
primary_source: str = "yfinance"
|
||||
fallback_source: str = "yfinance"
|
||||
refresh_interval: int = 5
|
||||
|
||||
|
||||
# Alert settings
|
||||
volatility_spike: float = 0.25
|
||||
spot_drawdown: float = 7.5
|
||||
email_alerts: bool = False
|
||||
|
||||
|
||||
def __post_init__(self):
|
||||
"""Validate configuration after initialization."""
|
||||
self.validate()
|
||||
|
||||
|
||||
def validate(self) -> None:
|
||||
"""Validate configuration values."""
|
||||
if self.gold_value <= 0:
|
||||
@@ -107,31 +107,31 @@ class PortfolioConfig:
|
||||
raise ValueError("LTV warning level must be between 10% and 95%")
|
||||
if self.refresh_interval < 1:
|
||||
raise ValueError("Refresh interval must be at least 1 second")
|
||||
|
||||
|
||||
@property
|
||||
def current_ltv(self) -> float:
|
||||
"""Calculate current loan-to-value ratio."""
|
||||
if self.gold_value == 0:
|
||||
return 0.0
|
||||
return self.loan_amount / self.gold_value
|
||||
|
||||
|
||||
@property
|
||||
def margin_buffer(self) -> float:
|
||||
"""Calculate margin buffer (distance to margin call)."""
|
||||
return self.margin_threshold - self.current_ltv
|
||||
|
||||
|
||||
@property
|
||||
def net_equity(self) -> float:
|
||||
"""Calculate net equity (gold value - loan)."""
|
||||
return self.gold_value - self.loan_amount
|
||||
|
||||
|
||||
@property
|
||||
def margin_call_price(self) -> float:
|
||||
"""Calculate gold price at which margin call occurs."""
|
||||
if self.margin_threshold == 0:
|
||||
return float('inf')
|
||||
return float("inf")
|
||||
return self.loan_amount / self.margin_threshold
|
||||
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert configuration to dictionary."""
|
||||
return {
|
||||
@@ -147,7 +147,7 @@ class PortfolioConfig:
|
||||
"spot_drawdown": self.spot_drawdown,
|
||||
"email_alerts": self.email_alerts,
|
||||
}
|
||||
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict[str, Any]) -> PortfolioConfig:
|
||||
"""Create configuration from dictionary."""
|
||||
@@ -156,31 +156,31 @@ class PortfolioConfig:
|
||||
|
||||
class PortfolioRepository:
|
||||
"""Repository for persisting portfolio configuration.
|
||||
|
||||
|
||||
Uses file-based storage by default. Can be extended to use Redis.
|
||||
"""
|
||||
|
||||
|
||||
CONFIG_PATH = Path("data/portfolio_config.json")
|
||||
|
||||
|
||||
def __init__(self):
|
||||
# Ensure data directory exists
|
||||
self.CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def save(self, config: PortfolioConfig) -> None:
|
||||
"""Save configuration to disk."""
|
||||
with open(self.CONFIG_PATH, "w") as f:
|
||||
json.dump(config.to_dict(), f, indent=2)
|
||||
|
||||
|
||||
def load(self) -> PortfolioConfig:
|
||||
"""Load configuration from disk.
|
||||
|
||||
|
||||
Returns default configuration if file doesn't exist.
|
||||
"""
|
||||
if not self.CONFIG_PATH.exists():
|
||||
default = PortfolioConfig()
|
||||
self.save(default)
|
||||
return default
|
||||
|
||||
|
||||
try:
|
||||
with open(self.CONFIG_PATH) as f:
|
||||
data = json.load(f)
|
||||
|
||||
@@ -81,6 +81,7 @@ def get_cache() -> CacheService:
|
||||
global _cache_instance
|
||||
if _cache_instance is None:
|
||||
import os
|
||||
|
||||
redis_url = os.environ.get("REDIS_URL")
|
||||
_cache_instance = CacheService(redis_url)
|
||||
return _cache_instance
|
||||
|
||||
@@ -110,7 +110,9 @@ class DataService:
|
||||
await self.cache.set_json(cache_key, payload)
|
||||
return payload
|
||||
|
||||
async def get_options_chain_for_expiry(self, symbol: str | None = None, expiry: str | None = None) -> dict[str, Any]:
|
||||
async def get_options_chain_for_expiry(
|
||||
self, symbol: str | None = None, expiry: str | None = None
|
||||
) -> dict[str, Any]:
|
||||
ticker_symbol = (symbol or self.default_symbol).upper()
|
||||
expirations_data = await self.get_option_expirations(ticker_symbol)
|
||||
expirations = list(expirations_data.get("expirations") or [])
|
||||
@@ -176,7 +178,9 @@ class DataService:
|
||||
await self.cache.set_json(cache_key, payload)
|
||||
return payload
|
||||
except Exception as exc: # pragma: no cover - network dependent
|
||||
logger.warning("Failed to fetch options chain for %s %s from yfinance: %s", ticker_symbol, target_expiry, exc)
|
||||
logger.warning(
|
||||
"Failed to fetch options chain for %s %s from yfinance: %s", ticker_symbol, target_expiry, exc
|
||||
)
|
||||
payload = self._fallback_options_chain(
|
||||
ticker_symbol,
|
||||
quote,
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
import yfinance as yf
|
||||
@@ -16,6 +16,7 @@ logger = logging.getLogger(__name__)
|
||||
@dataclass
|
||||
class PriceData:
|
||||
"""Price data for a symbol."""
|
||||
|
||||
symbol: str
|
||||
price: float
|
||||
currency: str
|
||||
@@ -25,19 +26,19 @@ class PriceData:
|
||||
|
||||
class PriceFeed:
|
||||
"""Live price feed service using yfinance with Redis caching."""
|
||||
|
||||
|
||||
CACHE_TTL_SECONDS = 60
|
||||
DEFAULT_SYMBOLS = ["GLD", "TLT", "BTC-USD"]
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self._cache = get_cache()
|
||||
|
||||
|
||||
async def get_price(self, symbol: str) -> Optional[PriceData]:
|
||||
"""Get current price for a symbol, with caching.
|
||||
|
||||
|
||||
Args:
|
||||
symbol: Yahoo Finance symbol (e.g., "GLD", "BTC-USD")
|
||||
|
||||
|
||||
Returns:
|
||||
PriceData or None if fetch fails
|
||||
"""
|
||||
@@ -47,7 +48,7 @@ class PriceFeed:
|
||||
cached = await self._cache.get(cache_key)
|
||||
if cached:
|
||||
return PriceData(**cached)
|
||||
|
||||
|
||||
# Fetch from yfinance
|
||||
try:
|
||||
data = await self._fetch_yfinance(symbol)
|
||||
@@ -55,44 +56,39 @@ class PriceFeed:
|
||||
# Cache the result
|
||||
if self._cache.enabled:
|
||||
await self._cache.set(
|
||||
cache_key,
|
||||
cache_key,
|
||||
{
|
||||
"symbol": data.symbol,
|
||||
"price": data.price,
|
||||
"currency": data.currency,
|
||||
"timestamp": data.timestamp.isoformat(),
|
||||
"source": data.source
|
||||
"source": data.source,
|
||||
},
|
||||
ttl=self.CACHE_TTL_SECONDS
|
||||
ttl=self.CACHE_TTL_SECONDS,
|
||||
)
|
||||
return data
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch price for {symbol}: {e}")
|
||||
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def _fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
|
||||
"""Fetch price from yfinance (run in thread pool to avoid blocking)."""
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self._sync_fetch_yfinance, symbol)
|
||||
|
||||
|
||||
def _sync_fetch_yfinance(self, symbol: str) -> Optional[PriceData]:
|
||||
"""Synchronous yfinance fetch."""
|
||||
ticker = yf.Ticker(symbol)
|
||||
hist = ticker.history(period="1d", interval="1m")
|
||||
|
||||
|
||||
if not hist.empty:
|
||||
last_price = hist["Close"].iloc[-1]
|
||||
currency = ticker.info.get("currency", "USD")
|
||||
|
||||
return PriceData(
|
||||
symbol=symbol,
|
||||
price=float(last_price),
|
||||
currency=currency,
|
||||
timestamp=datetime.utcnow()
|
||||
)
|
||||
|
||||
return PriceData(symbol=symbol, price=float(last_price), currency=currency, timestamp=datetime.utcnow())
|
||||
return None
|
||||
|
||||
|
||||
async def get_prices(self, symbols: list[str]) -> dict[str, Optional[PriceData]]:
|
||||
"""Get prices for multiple symbols concurrently."""
|
||||
tasks = [self.get_price(s) for s in symbols]
|
||||
|
||||
Reference in New Issue
Block a user