"""Databento historical price source for backtesting.""" from __future__ import annotations import hashlib import json from dataclasses import dataclass from datetime import date, timedelta from pathlib import Path from typing import Any from app.services.backtesting.historical_provider import DailyClosePoint, HistoricalPriceSource try: import databento as db import pandas as pd DATABENTO_AVAILABLE = True except ImportError: DATABENTO_AVAILABLE = False db = None # type: ignore pd = None # type: ignore @dataclass(frozen=True) class DatabentoCacheKey: """Cache key for Databento data requests.""" dataset: str symbol: str schema: str start_date: date end_date: date def cache_path(self, cache_dir: Path) -> Path: """Generate cache file path from key.""" key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}" key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16] return cache_dir / f"dbn_{key_hash}.parquet" def metadata_path(self, cache_dir: Path) -> Path: """Generate metadata file path from key.""" key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}" key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16] return cache_dir / f"dbn_{key_hash}_meta.json" @dataclass class DatabentoSourceConfig: """Configuration for Databento data source.""" api_key: str | None = None # Falls back to DATABENTO_API_KEY env var cache_dir: Path = Path(".cache/databento") dataset: str = "XNAS.BASIC" schema: str = "ohlcv-1d" stype_in: str = "raw_symbol" # Re-download threshold max_cache_age_days: int = 30 def __post_init__(self) -> None: # Ensure cache_dir is a Path if isinstance(self.cache_dir, str): object.__setattr__(self, "cache_dir", Path(self.cache_dir)) class DatabentoHistoricalPriceSource(HistoricalPriceSource): """Databento-based historical price source for backtesting. This provider fetches historical daily OHLCV data from Databento's API and caches it locally to minimize API calls and costs. Key features: - Smart caching with configurable age threshold - Cost estimation before fetching - Symbol-to-dataset resolution (GLD→XNAS.BASIC, GC=F→GLBX.MDP3) - Parquet storage for fast loading Example usage: source = DatabentoHistoricalPriceSource() prices = source.load_daily_closes("GLD", date(2024, 1, 1), date(2024, 1, 31)) """ def __init__(self, config: DatabentoSourceConfig | None = None) -> None: if not DATABENTO_AVAILABLE: raise RuntimeError("databento package required: pip install databento>=0.30.0") self.config = config or DatabentoSourceConfig() self.config.cache_dir.mkdir(parents=True, exist_ok=True) self._client: Any = None # db.Historical @property def client(self) -> Any: """Get or create Databento client.""" if self._client is None: if db is None: raise RuntimeError("databento package not installed") self._client = db.Historical(key=self.config.api_key) return self._client def _load_from_cache(self, key: DatabentoCacheKey) -> list[DailyClosePoint] | None: """Load cached data if available and fresh.""" cache_file = key.cache_path(self.config.cache_dir) meta_file = key.metadata_path(self.config.cache_dir) if not cache_file.exists() or not meta_file.exists(): return None try: with open(meta_file) as f: meta = json.load(f) # Check cache age download_date = date.fromisoformat(meta["download_date"]) age_days = (date.today() - download_date).days if age_days > self.config.max_cache_age_days: return None # Check parameters match if meta["dataset"] != key.dataset or meta["symbol"] != key.symbol: return None # Load parquet and convert if pd is None: return None df = pd.read_parquet(cache_file) return self._df_to_daily_points(df) except Exception: return None def _save_to_cache(self, key: DatabentoCacheKey, df: Any, cost_usd: float = 0.0) -> None: """Save data to cache.""" if pd is None: return cache_file = key.cache_path(self.config.cache_dir) meta_file = key.metadata_path(self.config.cache_dir) df.to_parquet(cache_file, index=False) meta = { "download_date": date.today().isoformat(), "dataset": key.dataset, "symbol": key.symbol, "schema": key.schema, "start_date": key.start_date.isoformat(), "end_date": key.end_date.isoformat(), "rows": len(df), "cost_usd": cost_usd, } with open(meta_file, "w") as f: json.dump(meta, f, indent=2) def _fetch_from_databento(self, key: DatabentoCacheKey) -> Any: """Fetch data from Databento API.""" data = self.client.timeseries.get_range( dataset=key.dataset, symbols=key.symbol, schema=key.schema, start=key.start_date.isoformat(), end=(key.end_date + timedelta(days=1)).isoformat(), # Exclusive end stype_in=self.config.stype_in, ) return data.to_df() def _df_to_daily_points(self, df: Any) -> list[DailyClosePoint]: """Convert DataFrame to DailyClosePoint list.""" if pd is None: return [] points = [] for idx, row in df.iterrows(): # Databento ohlcv schema has ts_event as timestamp ts = row.get("ts_event", row.get("ts_recv", idx)) if hasattr(ts, "date"): row_date = ts.date() else: # Parse ISO date string ts_str = str(ts) row_date = date.fromisoformat(ts_str[:10]) # Databento prices are int64 scaled by 1e-9 close_raw = row.get("close", 0) if isinstance(close_raw, (int, float)): close = float(close_raw) / 1e9 if close_raw > 1e9 else float(close_raw) else: close = float(close_raw) if close > 0: points.append(DailyClosePoint(date=row_date, close=close)) return sorted(points, key=lambda p: p.date) def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]: """Load daily closing prices from Databento (with caching). Args: symbol: Trading symbol (GLD, GC=F, XAU) start_date: Inclusive start date end_date: Inclusive end date Returns: List of DailyClosePoint sorted by date """ # Map symbols to datasets dataset = self._resolve_dataset(symbol) databento_symbol = self._resolve_symbol(symbol) key = DatabentoCacheKey( dataset=dataset, symbol=databento_symbol, schema=self.config.schema, start_date=start_date, end_date=end_date, ) # Try cache first cached = self._load_from_cache(key) if cached is not None: return cached # Fetch from Databento df = self._fetch_from_databento(key) # Get cost estimate (approximate) try: cost_usd = self.get_cost_estimate(symbol, start_date, end_date) except Exception: cost_usd = 0.0 # Cache results self._save_to_cache(key, df, cost_usd) return self._df_to_daily_points(df) def _resolve_dataset(self, symbol: str) -> str: """Resolve symbol to Databento dataset.""" symbol_upper = symbol.upper() if symbol_upper in ("GLD", "GLDM", "IAU"): return "XNAS.BASIC" # ETFs on Nasdaq elif symbol_upper in ("GC=F", "GC", "GOLD"): return "GLBX.MDP3" # CME gold futures elif symbol_upper == "XAU": return "XNAS.BASIC" # Treat as GLD proxy else: return self.config.dataset # Use configured default def _resolve_symbol(self, symbol: str) -> str: """Resolve vault-dash symbol to Databento symbol.""" symbol_upper = symbol.upper() if symbol_upper == "XAU": return "GLD" # Proxy XAU via GLD prices elif symbol_upper == "GC=F": return "GC" # Use parent symbol for continuous contracts return symbol_upper def get_cost_estimate(self, symbol: str, start_date: date, end_date: date) -> float: """Estimate cost in USD for a data request. Args: symbol: Trading symbol start_date: Start date end_date: End date Returns: Estimated cost in USD """ dataset = self._resolve_dataset(symbol) databento_symbol = self._resolve_symbol(symbol) try: cost = self.client.metadata.get_cost( dataset=dataset, symbols=databento_symbol, schema=self.config.schema, start=start_date.isoformat(), end=(end_date + timedelta(days=1)).isoformat(), ) return float(cost) except Exception: return 0.0 # Return 0 if cost estimation fails def get_available_range(self, symbol: str) -> tuple[date | None, date | None]: """Get the available date range for a symbol. Args: symbol: Trading symbol Returns: Tuple of (start_date, end_date) or (None, None) if unavailable """ dataset = self._resolve_dataset(symbol) try: range_info = self.client.metadata.get_dataset_range(dataset=dataset) start_str = range_info.get("start", "") end_str = range_info.get("end", "") start = date.fromisoformat(start_str[:10]) if start_str else None end = date.fromisoformat(end_str[:10]) if end_str else None return start, end except Exception: return None, None def clear_cache(self) -> int: """Clear all cached data files. Returns: Number of files deleted """ count = 0 for file in self.config.cache_dir.glob("*"): if file.is_file(): file.unlink() count += 1 return count def get_cache_stats(self) -> dict[str, Any]: """Get cache statistics. Returns: Dict with total_size_bytes, file_count, oldest_download, entries """ total_size = 0 file_count = 0 oldest_download: date | None = None entries: list[dict[str, Any]] = [] for meta_file in self.config.cache_dir.glob("*_meta.json"): try: with open(meta_file) as f: meta = json.load(f) download_date = date.fromisoformat(meta["download_date"]) cache_file = meta_file.with_name(meta_file.stem.replace("_meta", "") + ".parquet") size = cache_file.stat().st_size if cache_file.exists() else 0 total_size += size file_count += 2 # meta + parquet if oldest_download is None or download_date < oldest_download: oldest_download = download_date entries.append( { "dataset": meta["dataset"], "symbol": meta["symbol"], "start_date": meta["start_date"], "end_date": meta["end_date"], "rows": meta.get("rows", 0), "cost_usd": meta.get("cost_usd", 0.0), "download_date": meta["download_date"], "size_bytes": size, } ) except Exception: continue return { "total_size_bytes": total_size, "file_count": file_count, "oldest_download": oldest_download.isoformat() if oldest_download else None, "entries": entries, }