# Databento Historical Data Integration Plan

## Overview

Integrate the Databento historical API for the backtesting and scenario comparison pages, replacing yfinance for historical data on those pages. The integration will support configurable start prices/values independent of portfolio settings, with intelligent caching to avoid redundant downloads.

## Architecture

### Current State

- **Backtest page** (`app/pages/backtests.py`): uses `YFinanceHistoricalPriceSource` via `BacktestPageService`
- **Event comparison** (`app/pages/event_comparison.py`): uses seeded event presets with yfinance data
- **Historical provider** (`app/services/backtesting/historical_provider.py`): protocol-based architecture with `YFinanceHistoricalPriceSource` and `SyntheticHistoricalProvider`

### Target State

- Add `DatabentoHistoricalPriceSource` implementing the `HistoricalPriceSource` protocol
- Add `DatabentoOptionSnapshotSource` implementing the `OptionSnapshotSource` protocol (future)
- Smart caching layer: only re-download when parameters change
- Pre-seeded scenario data via batch downloads

## Databento Data Sources

### Underlyings and Datasets

| Instrument | Dataset | Symbol Format | Notes |
|------------|---------|---------------|-------|
| GLD ETF | `XNAS.BASIC` or `EQUS.PLUS` | `GLD` | US equities consolidated |
| GC=F Futures | `GLBX.MDP3` | `GC` + continuous or `GC=F` raw | Gold futures |
| Gold Options | `OPRA.PILLAR` | `GLD` underlying | Options on the GLD ETF |

### Schemas

| Schema | Use Case | Fields |
|--------|----------|--------|
| `ohlcv-1d` | Daily backtesting | open, high, low, close, volume |
| `ohlcv-1h` | Intraday scenarios | Hourly bars |
| `trades` | Tick-level analysis | Full trade data |
| `definition` | Instrument metadata | Expiries, strike prices, tick sizes |

## Implementation Plan

### Phase 1: Historical Price Source (DATA-DB-001)

**File:** `app/services/backtesting/databento_source.py`

```python
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path

import pandas as pd

from app.services.backtesting.historical_provider import DailyClosePoint, HistoricalPriceSource

try:
    import databento as db

    DATABENTO_AVAILABLE = True
except ImportError:
    DATABENTO_AVAILABLE = False


@dataclass(frozen=True)
class DatabentoCacheKey:
    """Cache key for Databento data requests."""

    dataset: str
    symbol: str
    schema: str
    start_date: date
    end_date: date

    @property
    def _key_hash(self) -> str:
        key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}"
        return hashlib.sha256(key_str.encode()).hexdigest()[:16]

    def cache_path(self, cache_dir: Path) -> Path:
        return cache_dir / f"dbn_{self._key_hash}.parquet"

    def metadata_path(self, cache_dir: Path) -> Path:
        return cache_dir / f"dbn_{self._key_hash}_meta.json"


@dataclass
class DatabentoSourceConfig:
    """Configuration for the Databento data source."""

    api_key: str | None = None  # Falls back to DATABENTO_API_KEY env var
    cache_dir: Path = Path(".cache/databento")
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"
    stype_in: str = "raw_symbol"
    # Re-download threshold
    max_cache_age_days: int = 30


class DatabentoHistoricalPriceSource(HistoricalPriceSource):
    """Databento-based historical price source for backtesting."""

    def __init__(self, config: DatabentoSourceConfig | None = None) -> None:
        if not DATABENTO_AVAILABLE:
            raise RuntimeError("databento package required: pip install databento")
        self.config = config or DatabentoSourceConfig()
        self.config.cache_dir.mkdir(parents=True, exist_ok=True)
        self._client: db.Historical | None = None

    @property
    def client(self) -> db.Historical:
        if self._client is None:
            self._client = db.Historical(key=self.config.api_key)
        return self._client

    def _load_from_cache(self, key: DatabentoCacheKey) -> list[DailyClosePoint] | None:
        """Load cached data if available and fresh."""
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)
        if not cache_file.exists() or not meta_file.exists():
            return None
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            # Check cache age
            download_date = date.fromisoformat(meta["download_date"])
            age_days = (date.today() - download_date).days
            if age_days > self.config.max_cache_age_days:
                return None
            # Check that the cached parameters match
            if meta["dataset"] != key.dataset or meta["symbol"] != key.symbol:
                return None
            # Load parquet and convert
            df = pd.read_parquet(cache_file)
            return self._df_to_daily_points(df)
        except Exception:
            return None

    def _save_to_cache(self, key: DatabentoCacheKey, df: pd.DataFrame) -> None:
        """Save data to the cache."""
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)
        df.to_parquet(cache_file, index=False)
        meta = {
            "download_date": date.today().isoformat(),
            "dataset": key.dataset,
            "symbol": key.symbol,
            "schema": key.schema,
            "start_date": key.start_date.isoformat(),
            "end_date": key.end_date.isoformat(),
            "rows": len(df),
        }
        with open(meta_file, "w") as f:
            json.dump(meta, f, indent=2)

    def _fetch_from_databento(self, key: DatabentoCacheKey) -> pd.DataFrame:
        """Fetch data from the Databento API."""
        data = self.client.timeseries.get_range(
            dataset=key.dataset,
            symbols=key.symbol,
            schema=key.schema,
            start=key.start_date.isoformat(),
            end=(key.end_date + timedelta(days=1)).isoformat(),  # End is exclusive
            stype_in=self.config.stype_in,
        )
        return data.to_df()

    def _df_to_daily_points(self, df: pd.DataFrame) -> list[DailyClosePoint]:
        """Convert a DataFrame to a list of DailyClosePoint."""
        points = []
        for idx, row in df.iterrows():
            # The Databento ohlcv schema carries ts_event as the bar timestamp;
            # to_df() usually sets it as the index.
            ts = row.get("ts_event", row.get("ts_recv", idx))
            if hasattr(ts, "date"):
                row_date = ts.date()
            else:
                row_date = date.fromisoformat(str(ts)[:10])
            # to_df() converts Databento's fixed-precision int64 prices
            # (scaled by 1e-9) to float dollars by default, so no manual
            # scaling is needed here.
            close = float(row["close"])
            points.append(DailyClosePoint(date=row_date, close=close))
        return sorted(points, key=lambda p: p.date)

    def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        """Load daily closing prices from Databento (with caching)."""
        # Map symbols to datasets
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)
        key = DatabentoCacheKey(
            dataset=dataset,
            symbol=databento_symbol,
            schema=self.config.schema,
            start_date=start_date,
            end_date=end_date,
        )
        # Try the cache first
        cached = self._load_from_cache(key)
        if cached is not None:
            return cached
        # Fetch from Databento and cache the result
        df = self._fetch_from_databento(key)
        self._save_to_cache(key, df)
        return self._df_to_daily_points(df)

    def _resolve_dataset(self, symbol: str) -> str:
        """Resolve a symbol to its Databento dataset."""
        symbol_upper = symbol.upper()
        if symbol_upper in ("GLD", "GLDM", "IAU"):
            return "XNAS.BASIC"  # ETFs on Nasdaq
        elif symbol_upper in ("GC=F", "GC", "GOLD"):
            return "GLBX.MDP3"  # CME gold futures
        elif symbol_upper == "XAU":
            return "XNAS.BASIC"  # Treat as GLD proxy
        else:
            return self.config.dataset  # Use the configured default

    def _resolve_symbol(self, symbol: str) -> str:
        """Resolve a vault-dash symbol to a Databento symbol."""
        symbol_upper = symbol.upper()
        if symbol_upper == "XAU":
            return "GLD"  # Proxy XAU via GLD prices
        elif symbol_upper == "GC=F":
            return "GC"  # Use the parent symbol for continuous contracts
        return symbol_upper

    def get_cost_estimate(self, symbol: str, start_date: date, end_date: date) -> float:
        """Estimate the cost in USD of a data request."""
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)
        try:
            return self.client.metadata.get_cost(
                dataset=dataset,
                symbols=databento_symbol,
                schema=self.config.schema,
                start=start_date.isoformat(),
                end=(end_date + timedelta(days=1)).isoformat(),
            )
        except Exception:
            return 0.0  # Fall back to 0 if cost estimation fails
```
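The cache-key scheme above can be illustrated standalone: identical request parameters always hash to the same Parquet filename, and changing any parameter produces a new one, which is what makes "only re-download when parameters change" work. The `CacheKey` class below is a minimal stand-in for `DatabentoCacheKey`, not the production class.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass
from datetime import date
from pathlib import Path


@dataclass(frozen=True)
class CacheKey:
    """Minimal stand-in for DatabentoCacheKey (illustrative only)."""

    dataset: str
    symbol: str
    schema: str
    start_date: date
    end_date: date

    def cache_path(self, cache_dir: Path) -> Path:
        # Hash all request parameters so any change produces a new file.
        key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}"
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16]
        return cache_dir / f"dbn_{key_hash}.parquet"


a = CacheKey("XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
b = CacheKey("XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
c = CacheKey("XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2025, 1, 31))

assert a.cache_path(Path(".cache")) == b.cache_path(Path(".cache"))  # same params, same file
assert a.cache_path(Path(".cache")) != c.cache_path(Path(".cache"))  # new end date, new file
```

Truncating the SHA-256 digest to 16 hex characters keeps filenames short while leaving collisions practically impossible for this workload.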
```python
# app/services/backtesting/databento_source.py (continued)

class DatabentoBacktestProvider:
    """Databento-backed historical provider for synthetic backtesting."""

    provider_id = "databento_v1"
    pricing_mode = "synthetic_bs_mid"

    def __init__(
        self,
        price_source: DatabentoHistoricalPriceSource,
        implied_volatility: float = 0.16,
        risk_free_rate: float = 0.045,
    ) -> None:
        self.price_source = price_source
        self.implied_volatility = implied_volatility
        self.risk_free_rate = risk_free_rate

    def load_history(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        return self.price_source.load_daily_closes(symbol, start_date, end_date)

    # ... rest delegates to SyntheticHistoricalProvider logic
```

### Phase 2: Backtest Settings Model (DATA-DB-002)

**File:** `app/models/backtest_settings.py`

```python
from dataclasses import dataclass, field
from datetime import date
from uuid import UUID

from app.models.backtest import ProviderRef


@dataclass(frozen=True)
class BacktestSettings:
    """User-configurable backtest settings (independent of the portfolio)."""

    # Scenario identification
    settings_id: UUID
    name: str

    # Data source configuration
    data_source: str = "databento"  # "databento", "yfinance", "synthetic"
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"

    # Date range
    start_date: date = date(2024, 1, 1)
    end_date: date = date(2024, 12, 31)

    # Independent scenario configuration (not derived from the portfolio)
    underlying_symbol: str = "GLD"
    start_price: float = 0.0  # 0 = auto-derive from the first close
    underlying_units: float = 1000.0  # Independent of the portfolio
    loan_amount: float = 0.0  # Debt position for LTV analysis
    margin_call_ltv: float = 0.75

    # Templates to test
    template_slugs: tuple[str, ...] = field(default_factory=lambda: ("protective-put-atm-12m",))

    # Provider reference
    provider_ref: ProviderRef = field(
        default_factory=lambda: ProviderRef(
            provider_id="databento_v1",
            pricing_mode="synthetic_bs_mid",
        )
    )

    # Cache metadata
    cache_key: str = ""  # Populated when data is fetched
    data_cost_usd: float = 0.0  # Cost of the last data fetch
```

### Phase 3: Cache Management (DATA-DB-003)

**File:** `app/services/backtesting/databento_cache.py`

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path

from app.services.backtesting.databento_source import DatabentoCacheKey


@dataclass
class CacheEntry:
    """Metadata for a cached Databento dataset."""

    cache_key: DatabentoCacheKey
    file_path: Path
    download_date: date
    size_bytes: int
    cost_usd: float


class DatabentoCacheManager:
    """Manages the Databento data cache lifecycle."""

    def __init__(self, cache_dir: Path = Path(".cache/databento")) -> None:
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def list_entries(self) -> list[CacheEntry]:
        """List all cached entries."""
        entries = []
        for meta_file in self.cache_dir.glob("*_meta.json"):
            with open(meta_file) as f:
                meta = json.load(f)
            cache_file = meta_file.with_name(meta_file.stem.replace("_meta", "") + ".parquet")
            if cache_file.exists():
                entries.append(
                    CacheEntry(
                        cache_key=DatabentoCacheKey(
                            dataset=meta["dataset"],
                            symbol=meta["symbol"],
                            schema=meta["schema"],
                            start_date=date.fromisoformat(meta["start_date"]),
                            end_date=date.fromisoformat(meta["end_date"]),
                        ),
                        file_path=cache_file,
                        download_date=date.fromisoformat(meta["download_date"]),
                        size_bytes=cache_file.stat().st_size,
                        cost_usd=0.0,  # Would need to be tracked separately
                    )
                )
        return entries

    def invalidate_expired(self, max_age_days: int = 30) -> list[Path]:
        """Remove cache entries older than max_age_days."""
        removed = []
        cutoff = date.today() - timedelta(days=max_age_days)
        for entry in self.list_entries():
            if entry.download_date < cutoff:
                entry.file_path.unlink(missing_ok=True)
                meta_file = entry.file_path.with_name(entry.file_path.stem + "_meta.json")
                meta_file.unlink(missing_ok=True)
                removed.append(entry.file_path)
        return removed

    def clear_all(self) -> int:
        """Clear all cached data."""
        count = 0
        for file in self.cache_dir.glob("*"):
            if file.is_file():
                file.unlink()
                count += 1
        return count

    def get_cache_size(self) -> int:
        """Get the total cache size in bytes."""
        return sum(f.stat().st_size for f in self.cache_dir.glob("*") if f.is_file())

    def should_redownload(
        self, key: DatabentoCacheKey, params_changed: bool, max_age_days: int = 30
    ) -> bool:
        """Determine whether data should be re-downloaded."""
        cache_file = key.cache_path(self.cache_dir)
        meta_file = key.metadata_path(self.cache_dir)
        if params_changed:
            return True
        if not cache_file.exists() or not meta_file.exists():
            return True
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            download_date = date.fromisoformat(meta["download_date"])
            return (date.today() - download_date).days > max_age_days
        except Exception:
            return True
```

### Phase 4: Backtest Page UI Updates (DATA-DB-004)

**Key changes to `app/pages/backtests.py`:**

1. Add a Databento configuration section
2. Add independent start price/units inputs
3. Show the estimated data cost before fetching
4. Add a cache status indicator
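Items 3 and 4 surface cache and cost state in the UI; the decision behind the cache indicator is the Phase 3 re-download policy, which reduces to a small pure function over `(params_changed, cache_exists, download_date)`. A standalone sketch — the function shape is illustrative, not the production `DatabentoCacheManager.should_redownload` signature:

```python
from __future__ import annotations

from datetime import date, timedelta


def should_redownload(
    params_changed: bool,
    cache_exists: bool,
    download_date: date | None,
    today: date,
    max_age_days: int = 30,
) -> bool:
    """Pure-function version of the Phase 3 cache policy (illustrative)."""
    # Any parameter change or missing cache forces a fresh download;
    # otherwise only staleness beyond max_age_days does.
    if params_changed or not cache_exists or download_date is None:
        return True
    return (today - download_date).days > max_age_days


today = date(2025, 6, 1)
assert should_redownload(False, True, today - timedelta(days=5), today) is False   # fresh cache
assert should_redownload(False, True, today - timedelta(days=45), today) is True   # stale cache
assert should_redownload(True, True, today - timedelta(days=5), today) is True     # params changed
assert should_redownload(False, False, None, today) is True                        # no cache
```

Keeping the policy pure makes the UI indicator and the cache manager trivially unit-testable without touching the filesystem.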
```python
# In backtests.py

with ui.card().classes("w-full ..."):
    ui.label("Data Source").classes("text-lg font-semibold")

    data_source = ui.select(
        {
            "databento": "Databento (historical market data)",
            "yfinance": "Yahoo Finance (free, limited)",
        },
        value="databento",
        label="Data source",
    ).classes("w-full")

    # Databento-specific settings
    with ui.column().classes("w-full gap-2").bind_visibility_from(
        data_source, "value", lambda v: v == "databento"
    ):
        ui.label("Dataset configuration").classes("text-sm text-slate-500")

        dataset_select = ui.select(
            {"XNAS.BASIC": "Nasdaq Basic (GLD)", "GLBX.MDP3": "CME Globex (GC=F)"},
            value="XNAS.BASIC",
            label="Dataset",
        ).classes("w-full")

        schema_select = ui.select(
            {"ohlcv-1d": "Daily bars", "ohlcv-1h": "Hourly bars"},
            value="ohlcv-1d",
            label="Resolution",
        ).classes("w-full")

        # Cost estimate
        cost_label = ui.label("Estimated cost: $0.00").classes("text-sm text-slate-500")

        # Cache status
        cache_status = ui.label("").classes("text-xs text-slate-400")

# Independent scenario settings
with ui.card().classes("w-full ..."):
    ui.label("Scenario Configuration").classes("text-lg font-semibold")
    ui.label("Configure start values independent of portfolio settings").classes(
        "text-sm text-slate-500"
    )

    start_price_input = ui.number(
        "Start price",
        value=0.0,
        min=0.0,
        step=0.01,
    ).classes("w-full")
    ui.label("Set to 0 to auto-derive from the first historical close").classes(
        "text-xs text-slate-400 -mt-2"
    )

    underlying_units_input = ui.number(
        "Underlying units",
        value=1000.0,
        min=0.0001,
        step=0.0001,
    ).classes("w-full")

    loan_amount_input = ui.number(
        "Loan amount ($)",
        value=0.0,
        min=0.0,
        step=1000,
    ).classes("w-full")
```

### Phase 5: Scenario Pre-Seeding (DATA-DB-005)

**File:** `app/services/backtesting/scenario_bulk_download.py`

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path

from app.services.backtesting.databento_source import DatabentoCacheKey

try:
    import databento as db

    DATABENTO_AVAILABLE = True
except ImportError:
    DATABENTO_AVAILABLE = False


@dataclass
class ScenarioPreset:
    """Pre-configured scenario ready for backtesting."""

    preset_id: str
    display_name: str
    symbol: str
    dataset: str
    window_start: date
    window_end: date
    default_start_price: float  # First close in the window
    default_templates: tuple[str, ...]
    event_type: str
    tags: tuple[str, ...]
    description: str


def download_historical_presets(
    client: db.Historical,
    presets: list[ScenarioPreset],
    output_dir: Path,
) -> dict[str, Path]:
    """Bulk-download historical data for all presets.

    Returns a mapping of preset_id to cached file path.
    """
    results = {}
    for preset in presets:
        cache_key = DatabentoCacheKey(
            dataset=preset.dataset,
            symbol=preset.symbol,
            schema="ohlcv-1d",
            start_date=preset.window_start,
            end_date=preset.window_end,
        )
        cache_file = cache_key.cache_path(output_dir)
        # Download only if not already cached
        if not cache_file.exists():
            data = client.timeseries.get_range(
                dataset=preset.dataset,
                symbols=preset.symbol,
                schema="ohlcv-1d",
                start=preset.window_start.isoformat(),
                end=(preset.window_end + timedelta(days=1)).isoformat(),  # End is exclusive
            )
            data.to_parquet(cache_file)
        results[preset.preset_id] = cache_file
    return results


def create_default_presets() -> list[ScenarioPreset]:
    """Create default scenario presets for gold hedging research."""
    return [
        ScenarioPreset(
            preset_id="gld-2020-covid-crash",
            display_name="GLD March 2020 COVID Crash",
            symbol="GLD",
            dataset="XNAS.BASIC",
            window_start=date(2020, 2, 15),
            window_end=date(2020, 4, 15),
            default_start_price=143.0,  # Approx. GLD close on 2020-02-15
            default_templates=("protective-put-atm-12m", "protective-put-95pct-12m"),
            event_type="crash",
            tags=("covid", "crash", "high-vol"),
            description="March 2020 COVID market crash - extreme volatility event",
        ),
        ScenarioPreset(
            preset_id="gld-2022-rate-hike-cycle",
            display_name="GLD 2022 Rate Hike Cycle",
            symbol="GLD",
            dataset="XNAS.BASIC",
            window_start=date(2022, 1, 1),
            window_end=date(2022, 12, 31),
            default_start_price=168.0,
            default_templates=("protective-put-atm-12m", "ladder-50-50-atm-95pct-12m"),
            event_type="rate_cycle",
            tags=("rates", "fed", "extended"),
            description="Full year 2022 - aggressive Fed rate hikes",
        ),
        ScenarioPreset(
            preset_id="gcf-2024-rally",
            display_name="GC=F 2024 Gold Rally",
            symbol="GC",
            dataset="GLBX.MDP3",
            window_start=date(2024, 1, 1),
            window_end=date(2024, 12, 31),
            default_start_price=2060.0,
            default_templates=("protective-put-atm-12m",),
            event_type="rally",
            tags=("gold", "futures", "rally"),
            description="Gold futures rally in 2024",
        ),
    ]
```

### Phase 6: Settings Persistence (DATA-DB-002)

**File:** `app/models/backtest_settings_repository.py`

```python
from __future__ import annotations

import json
from dataclasses import asdict
from datetime import date
from pathlib import Path
from uuid import UUID, uuid4

from app.models.backtest_settings import BacktestSettings


class BacktestSettingsRepository:
    """Persistence for backtest settings."""

    def __init__(self, base_path: Path | None = None) -> None:
        self.base_path = base_path or Path(".workspaces")

    def _settings_path(self, workspace_id: str) -> Path:
        return self.base_path / workspace_id / "backtest_settings.json"

    def load(self, workspace_id: str) -> BacktestSettings:
        """Load backtest settings, creating defaults if not found."""
        path = self._settings_path(workspace_id)
        if path.exists():
            with open(path) as f:
                data = json.load(f)
            return BacktestSettings(
                settings_id=UUID(data["settings_id"]),
                name=data.get("name", "Default Backtest"),
                data_source=data.get("data_source", "databento"),
                dataset=data.get("dataset", "XNAS.BASIC"),
                schema=data.get("schema", "ohlcv-1d"),
                start_date=date.fromisoformat(data["start_date"]),
                end_date=date.fromisoformat(data["end_date"]),
                underlying_symbol=data.get("underlying_symbol", "GLD"),
                start_price=data.get("start_price", 0.0),
                underlying_units=data.get("underlying_units", 1000.0),
                loan_amount=data.get("loan_amount", 0.0),
                margin_call_ltv=data.get("margin_call_ltv", 0.75),
                template_slugs=tuple(data.get("template_slugs", ("protective-put-atm-12m",))),
                cache_key=data.get("cache_key", ""),
                data_cost_usd=data.get("data_cost_usd", 0.0),
            )
        # Return defaults
        return BacktestSettings(
            settings_id=uuid4(),
            name="Default Backtest",
        )

    def save(self, workspace_id: str, settings: BacktestSettings) -> None:
        """Persist backtest settings."""
        path = self._settings_path(workspace_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        data = asdict(settings)
        data["settings_id"] = str(data["settings_id"])
        data["start_date"] = data["start_date"].isoformat()
        data["end_date"] = data["end_date"].isoformat()
        data["template_slugs"] = list(data["template_slugs"])
        data["provider_ref"] = {
            "provider_id": settings.provider_ref.provider_id,
            "pricing_mode": settings.provider_ref.pricing_mode,
        }
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
```

## Roadmap Items

### DATA-DB-001: Databento Historical Price Source

**Dependencies:** None
**Estimated effort:** 2-3 days
**Deliverables:**
- `app/services/backtesting/databento_source.py`
- `tests/test_databento_source.py` (mocked API)
- Environment variable `DATABENTO_API_KEY` support

### DATA-DB-002: Backtest Settings Model

**Dependencies:** None
**Estimated effort:** 1 day
**Deliverables:**
- `app/models/backtest_settings.py`
- Repository for persistence

### DATA-DB-003: Cache Management

**Dependencies:** DATA-DB-001
**Estimated effort:** 1 day
**Deliverables:**
- `app/services/backtesting/databento_cache.py`
- Cache cleanup CLI command

### DATA-DB-004: Backtest Page UI Updates

**Dependencies:** DATA-DB-001, DATA-DB-002
**Estimated effort:** 2 days
**Deliverables:**
- Updated `app/pages/backtests.py`
- Updated `app/pages/event_comparison.py`
- Cost estimation display

### DATA-DB-005: Scenario Pre-Seeding

**Dependencies:** DATA-DB-001
**Estimated effort:** 1-2 days
**Deliverables:**
- `app/services/backtesting/scenario_bulk_download.py`
- Pre-configured presets for gold hedging research
- Bulk download script

### DATA-DB-006: Options Data Source (Future)

**Dependencies:** DATA-DB-001
**Estimated effort:** 3-5 days
**Deliverables:**
- `DatabentoOptionSnapshotSource` implementing `OptionSnapshotSource`
- OPRA.PILLAR integration for historical options chains

## Configuration

Add to `.env`:

```
DATABENTO_API_KEY=db-xxxxxxxxxxxxxxxxxxxxxxxx
```

Add to `requirements.txt`:

```
databento>=0.30.0
```

Add to `pyproject.toml`:

```toml
[project.optional-dependencies]
databento = ["databento>=0.30.0"]
```

## Testing Strategy

1. **Unit tests** with mocked Databento responses (`tests/test_databento_source.py`)
2. **Integration tests** with recorded VCR cassettes (`tests/cassettes/*.yaml`)
3. **E2E tests** using cached data (`tests/test_backtest_databento_playwright.py`)

## Cost Management

- Use `metadata.get_cost()` before fetching to show the estimated cost
- Default to cached data when available
- Batch-download large historical ranges (>1 year)
- Consider Databento flat-rate plans for heavy usage

## Security Considerations

- API key stored in an environment variable, never in code
- Cache files contain only market data (no PII)
- Rate limiting respected (100 requests/second per IP)
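The mocked unit-test layer from the testing strategy can be sketched with the standard library alone: stub the Databento client so no API key or network access is needed, then assert on the converted output and on how the client was called. The `fetch_closes` helper and the record shape here are illustrative — the real tests would mock `databento.Historical` and assert on `DailyClosePoint` values.

```python
from __future__ import annotations

from datetime import date
from unittest.mock import MagicMock


def fetch_closes(client, dataset: str, symbol: str) -> list[tuple[date, float]]:
    """Illustrative helper: fetch daily bars and keep (date, close) pairs."""
    bars = client.timeseries.get_range(dataset=dataset, symbols=symbol, schema="ohlcv-1d")
    return [(bar["date"], bar["close"]) for bar in bars]


# Stub the client so the test needs no API key and makes no network calls.
client = MagicMock()
client.timeseries.get_range.return_value = [
    {"date": date(2024, 1, 2), "close": 191.30},
    {"date": date(2024, 1, 3), "close": 189.95},
]

points = fetch_closes(client, "XNAS.BASIC", "GLD")

assert points == [(date(2024, 1, 2), 191.30), (date(2024, 1, 3), 189.95)]
client.timeseries.get_range.assert_called_once_with(
    dataset="XNAS.BASIC", symbols="GLD", schema="ohlcv-1d"
)
```

The same pattern extends to the caching tests: point `cache_dir` at a `tmp_path`, run the fetch twice, and assert the stubbed `get_range` was called exactly once.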