"""Databento historical price source for backtesting.""" from __future__ import annotations import hashlib import json import logging from dataclasses import dataclass from datetime import date, timedelta from pathlib import Path from typing import Any logger = logging.getLogger(__name__) # Try to import databento, gracefully degrade if not available try: import databento as db import pandas as pd DATABENTO_AVAILABLE = True except ImportError: db = None pd = None DATABENTO_AVAILABLE = False @dataclass class DatabentoSourceConfig: """Configuration for Databento data source.""" api_key: str | None = None # Falls back to DATABENTO_API_KEY env var cache_dir: Path = Path(".cache/databento") dataset: str = "XNAS.BASIC" schema: str = "ohlcv-1d" stype_in: str = "raw_symbol" # Re-download threshold max_cache_age_days: int = 30 def __post_init__(self) -> None: # Ensure cache_dir is a Path if isinstance(self.cache_dir, str): object.__setattr__(self, "cache_dir", Path(self.cache_dir)) @dataclass(frozen=True) class DatabentoCacheKey: """Cache key for Databento data.""" dataset: str symbol: str schema: str start_date: date end_date: date def cache_path(self, cache_dir: Path) -> Path: key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}" key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16] return cache_dir / f"dbn_{key_hash}.parquet" def metadata_path(self, cache_dir: Path) -> Path: key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}" key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16] return cache_dir / f"dbn_{key_hash}_meta.json" class DatabentoHistoricalPriceSource: """Databento-based historical price source for backtesting. This provider fetches historical daily OHLCV data from Databento's API and caches it locally to minimize API calls and costs. Key features: - Smart caching with configurable age threshold - Cost estimation before fetching - Symbol-to-dataset resolution (GLD→XNAS.BASIC, GC=F→GLBX.MDP3) - Parquet storage for fast loading Example usage: source = DatabentoHistoricalPriceSource() prices = source.load_daily_closes("GLD", date(2024, 1, 1), date(2024, 1, 31)) """ def __init__(self, config: DatabentoSourceConfig | None = None) -> None: if not DATABENTO_AVAILABLE: raise RuntimeError("databento package required: pip install databento>=0.30.0") self.config = config or DatabentoSourceConfig() self.config.cache_dir.mkdir(parents=True, exist_ok=True) self._client: Any = None # db.Historical @property def client(self) -> Any: """Get or create Databento client.""" if self._client is None: if db is None: raise RuntimeError("databento package not installed") self._client = db.Historical(key=self.config.api_key) return self._client def _load_from_cache(self, key: DatabentoCacheKey) -> list[dict[str, Any]] | None: """Load cached data if available and fresh.""" cache_file = key.cache_path(self.config.cache_dir) meta_file = key.metadata_path(self.config.cache_dir) if not cache_file.exists() or not meta_file.exists(): return None try: with open(meta_file) as f: meta = json.load(f) # Check dataset and symbol match (for cache invalidation) if meta.get("dataset") != key.dataset or meta.get("symbol") != key.symbol: return None cache_age = (date.today() - date.fromisoformat(meta["download_date"])).days if cache_age > self.config.max_cache_age_days: return None if meta.get("start_date") != key.start_date.isoformat() or meta.get("end_date") != key.end_date.isoformat(): return None if meta.get("dataset") != key.dataset or meta.get("symbol") != key.symbol: return None # Load parquet and convert df = pd.read_parquet(cache_file) return self._df_to_daily_points(df) except Exception: return None def _save_to_cache(self, key: DatabentoCacheKey, df: Any, cost_usd: float = 0.0) -> None: """Save data to cache.""" if pd is None: return cache_file = key.cache_path(self.config.cache_dir) meta_file = key.metadata_path(self.config.cache_dir) df.to_parquet(cache_file, index=False) meta = { "download_date": date.today().isoformat(), "dataset": key.dataset, "symbol": key.symbol, "schema": key.schema, "start_date": key.start_date.isoformat(), "end_date": key.end_date.isoformat(), "rows": len(df), "cost_usd": cost_usd, } with open(meta_file, "w") as f: json.dump(meta, f, indent=2) def _fetch_from_databento(self, key: DatabentoCacheKey) -> Any: """Fetch data from Databento API.""" data = self.client.timeseries.get_range( dataset=key.dataset, symbols=key.symbol, schema=key.schema, start=key.start_date.isoformat(), end=(key.end_date + timedelta(days=1)).isoformat(), # Exclusive end stype_in=self.config.stype_in, ) return data.to_df() def _df_to_daily_points(self, df: Any) -> list[Any]: """Convert DataFrame to DailyClosePoint list with OHLC data.""" from app.services.backtesting.historical_provider import DailyClosePoint if pd is None: return [] def parse_price(raw_val: Any) -> float | None: """Parse Databento price (int64 scaled by 1e9).""" if raw_val is None or (isinstance(raw_val, float) and pd.isna(raw_val)): return None if isinstance(raw_val, (int, float)): return float(raw_val) / 1e9 if raw_val > 1e9 else float(raw_val) return float(raw_val) if raw_val else None points = [] for idx, row in df.iterrows(): # Databento ohlcv schema has ts_event as timestamp ts = row.get("ts_event", row.get("ts_recv", idx)) if hasattr(ts, "date"): row_date = ts.date() else: # Parse ISO date string ts_str = str(ts) row_date = date.fromisoformat(ts_str[:10]) close = parse_price(row.get("close")) low = parse_price(row.get("low")) high = parse_price(row.get("high")) open_price = parse_price(row.get("open")) if close and close > 0: points.append( DailyClosePoint( date=row_date, close=close, low=low, high=high, open=open_price, ) ) return sorted(points, key=lambda p: p.date) from app.services.backtesting.historical_provider import DailyClosePoint def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]: """Load daily closing prices from Databento (with caching). Args: symbol: Trading symbol (GLD, GC=F, XAU) start_date: Inclusive start date end_date: Inclusive end date Returns: List of DailyClosePoint sorted by date """ # Map symbols to datasets dataset = self._resolve_dataset(symbol) databento_symbol = self._resolve_symbol(symbol) key = DatabentoCacheKey( dataset=dataset, symbol=databento_symbol, schema=self.config.schema, start_date=start_date, end_date=end_date, ) # Try cache first cached = self._load_from_cache(key) if cached is not None: return cached # Fetch from Databento df = self._fetch_from_databento(key) # Get cost estimate (approximate) try: cost_usd = self.get_cost_estimate(symbol, start_date, end_date) except Exception: cost_usd = 0.0 # Cache results self._save_to_cache(key, df, cost_usd) return self._df_to_daily_points(df) def _resolve_dataset(self, symbol: str) -> str: """Resolve symbol to Databento dataset.""" symbol_upper = symbol.upper() if symbol_upper in ("GLD", "GLDM", "IAU"): return "XNAS.BASIC" # ETFs on Nasdaq elif symbol_upper in ("GC=F", "GC", "GOLD"): return "GLBX.MDP3" # CME gold futures elif symbol_upper == "XAU": return "XNAS.BASIC" # Treat as GLD proxy else: return self.config.dataset # Use configured default def _resolve_symbol(self, symbol: str) -> str: """Resolve vault-dash symbol to Databento symbol.""" symbol_upper = symbol.upper() if symbol_upper == "XAU": return "GLD" # Proxy XAU via GLD prices elif symbol_upper == "GC=F": return "GC" # Use parent symbol for continuous contracts return symbol_upper def get_cost_estimate(self, symbol: str, start_date: date, end_date: date) -> float: """Estimate cost in USD for a data request. Args: symbol: Trading symbol start_date: Start date end_date: End date Returns: Estimated cost in USD """ dataset = self._resolve_dataset(symbol) databento_symbol = self._resolve_symbol(symbol) try: cost = self.client.metadata.get_cost( dataset=dataset, symbols=databento_symbol, schema=self.config.schema, start=start_date.isoformat(), end=(end_date + timedelta(days=1)).isoformat(), ) return float(cost) except Exception: return 0.0 # Return 0 if cost estimation fails def get_available_range(self, symbol: str) -> tuple[date | None, date | None]: """Get the available date range for a symbol from Databento. Args: symbol: Trading symbol Returns: Tuple of (start_date, end_date) or (None, None) if unavailable """ # Note: Databento availability depends on the dataset # For now, return None to indicate we should try fetching return None, None def get_cache_stats(self) -> dict[str, Any]: """Get cache statistics.""" cache_dir = self.config.cache_dir if not cache_dir.exists(): return {"status": "empty", "entries": [], "file_count": 0, "total_size_bytes": 0} entries = [] total_size = 0 file_count = 0 for meta_file in cache_dir.glob("*_meta.json"): try: with open(meta_file) as f: meta = json.load(f) entries.append( { "symbol": meta.get("symbol"), "dataset": meta.get("dataset"), "start_date": meta.get("start_date"), "end_date": meta.get("end_date"), "download_date": meta.get("download_date"), "rows": meta.get("rows"), "cost_usd": meta.get("cost_usd"), } ) total_size += meta_file.stat().st_size file_count += 1 except Exception: continue # Count parquet files too for parquet_file in cache_dir.glob("dbn_*.parquet"): total_size += parquet_file.stat().st_size file_count += 1 return { "status": "populated" if entries else "empty", "entries": entries, "file_count": file_count, "total_size_bytes": total_size, } def clear_cache(self) -> int: """Clear all cache files. Returns: Number of files deleted. """ cache_dir = self.config.cache_dir if not cache_dir.exists(): return 0 count = 0 for cache_file in cache_dir.glob("dbn_*.parquet"): cache_file.unlink() count += 1 for meta_file in cache_dir.glob("dbn_*_meta.json"): meta_file.unlink() count += 1 return count