vault-dash/docs/DATABENTO_INTEGRATION_PLAN.md

Databento Historical Data Integration Plan

Overview

Integrate the Databento historical API into the backtesting and scenario comparison pages, replacing yfinance as the historical data source there. The integration supports start prices/values configured independently of portfolio settings, with intelligent caching to avoid redundant downloads.

Architecture

Current State

  • Backtest page (app/pages/backtests.py): Uses YFinanceHistoricalPriceSource via BacktestPageService
  • Event comparison (app/pages/event_comparison.py): Uses seeded event presets with yfinance data
  • Historical provider (app/services/backtesting/historical_provider.py): Protocol-based architecture with YFinanceHistoricalPriceSource and SyntheticHistoricalProvider

Target State

  • Add DatabentoHistoricalPriceSource implementing HistoricalPriceSource protocol
  • Add DatabentoHistoricalOptionSource implementing OptionSnapshotSource protocol (future)
  • Smart caching layer: only re-download when parameters change
  • Pre-seeded scenario data via batch downloads
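
For reference, this is the protocol shape the new source must satisfy. The exact definitions live in historical_provider.py; this sketch infers them from how the plan's code uses them:

```python
from dataclasses import dataclass
from datetime import date
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class DailyClosePoint:
    """One daily closing price for an underlying."""
    date: date
    close: float


@runtime_checkable
class HistoricalPriceSource(Protocol):
    """Any object that can serve daily closes for a symbol and date range."""

    def load_daily_closes(
        self, symbol: str, start_date: date, end_date: date
    ) -> list[DailyClosePoint]:
        ...
```

Because this is a structural protocol, a Databento-backed source plugs in wherever YFinanceHistoricalPriceSource is accepted today, without touching call sites.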

Databento Data Sources

Underlyings and Datasets

| Instrument | Dataset | Symbol format | Notes |
| --- | --- | --- | --- |
| GLD (ETF) | XNAS.BASIC or EQUS.PLUS | GLD | Nasdaq Basic; EQUS.PLUS for consolidated US equities |
| GC=F (futures) | GLBX.MDP3 | GC (parent) or GC.c.0 (continuous front month) | CME gold futures |
| Gold options | OPRA.PILLAR | GLD underlying | Options on the GLD ETF |

Schemas

| Schema | Use case | Fields |
| --- | --- | --- |
| ohlcv-1d | Daily backtesting | open, high, low, close, volume |
| ohlcv-1h | Intraday scenarios | Hourly bars |
| trades | Tick-level analysis | Full trade data |
| definition | Instrument metadata | Expiries, strike prices, tick sizes |

Implementation Plan

Phase 1: Historical Price Source (DATA-DB-001)

File: app/services/backtesting/databento_source.py

from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path
import hashlib
import json

import pandas as pd

from app.services.backtesting.historical_provider import DailyClosePoint, HistoricalPriceSource

try:
    import databento as db
    DATABENTO_AVAILABLE = True
except ImportError:
    DATABENTO_AVAILABLE = False


@dataclass(frozen=True)
class DatabentoCacheKey:
    """Cache key for Databento data requests."""
    dataset: str
    symbol: str
    schema: str
    start_date: date
    end_date: date
    
    def cache_path(self, cache_dir: Path) -> Path:
        key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}"
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16]
        return cache_dir / f"dbn_{key_hash}.parquet"
    
    def metadata_path(self, cache_dir: Path) -> Path:
        key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}"
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16]
        return cache_dir / f"dbn_{key_hash}_meta.json"


@dataclass
class DatabentoSourceConfig:
    """Configuration for Databento data source."""
    api_key: str | None = None  # Falls back to DATABENTO_API_KEY env var
    cache_dir: Path = Path(".cache/databento")
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"
    stype_in: str = "raw_symbol"
    
    # Re-download threshold
    max_cache_age_days: int = 30


class DatabentoHistoricalPriceSource(HistoricalPriceSource):
    """Databento-based historical price source for backtesting."""
    
    def __init__(self, config: DatabentoSourceConfig | None = None) -> None:
        if not DATABENTO_AVAILABLE:
            raise RuntimeError("databento package required: pip install databento")
        
        self.config = config or DatabentoSourceConfig()
        self.config.cache_dir.mkdir(parents=True, exist_ok=True)
        self._client: db.Historical | None = None
    
    @property
    def client(self) -> db.Historical:
        if self._client is None:
            self._client = db.Historical(key=self.config.api_key)
        return self._client
    
    def _load_from_cache(self, key: DatabentoCacheKey) -> list[DailyClosePoint] | None:
        """Load cached data if available and fresh."""
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)
        
        if not cache_file.exists() or not meta_file.exists():
            return None
        
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            
            # Check cache age
            download_date = date.fromisoformat(meta["download_date"])
            age_days = (date.today() - download_date).days
            if age_days > self.config.max_cache_age_days:
                return None
            
            # Check parameters match
            if meta["dataset"] != key.dataset or meta["symbol"] != key.symbol:
                return None
            
            # Load parquet and convert
            import pandas as pd
            df = pd.read_parquet(cache_file)
            return self._df_to_daily_points(df)
        except Exception:
            return None
    
    def _save_to_cache(self, key: DatabentoCacheKey, df: pd.DataFrame) -> None:
        """Save data to cache."""
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)
        
        df.to_parquet(cache_file)  # keep the ts_event index; dropping it loses the dates on reload
        
        meta = {
            "download_date": date.today().isoformat(),
            "dataset": key.dataset,
            "symbol": key.symbol,
            "schema": key.schema,
            "start_date": key.start_date.isoformat(),
            "end_date": key.end_date.isoformat(),
            "rows": len(df),
        }
        with open(meta_file, "w") as f:
            json.dump(meta, f, indent=2)
    
    def _fetch_from_databento(self, key: DatabentoCacheKey) -> pd.DataFrame:
        """Fetch data from Databento API."""
        data = self.client.timeseries.get_range(
            dataset=key.dataset,
            symbols=key.symbol,
            schema=key.schema,
            start=key.start_date.isoformat(),
            end=(key.end_date + timedelta(days=1)).isoformat(),  # Exclusive end
            stype_in=self.config.stype_in,
        )
        df = data.to_df()
        return df
    
    def _df_to_daily_points(self, df: pd.DataFrame) -> list[DailyClosePoint]:
        """Convert DataFrame to DailyClosePoint list."""
        points = []
        for idx, row in df.iterrows():
            # Databento ohlcv schema has ts_event as timestamp
            ts = row.get("ts_event", row.get("ts_recv", idx))
            if hasattr(ts, "date"):
                row_date = ts.date()
            else:
                row_date = date.fromisoformat(str(ts)[:10])
            
            # to_df() already converts Databento's fixed-precision int64 prices
            # (1e-9 scale) to floats by default, so no manual scaling here
            close = float(row["close"])
            
            points.append(DailyClosePoint(date=row_date, close=close))
        
        return sorted(points, key=lambda p: p.date)
    
    def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        """Load daily closing prices from Databento (with caching)."""
        # Map symbols to datasets
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)
        
        key = DatabentoCacheKey(
            dataset=dataset,
            symbol=databento_symbol,
            schema=self.config.schema,
            start_date=start_date,
            end_date=end_date,
        )
        
        # Try cache first
        cached = self._load_from_cache(key)
        if cached is not None:
            return cached
        
        # Fetch from Databento
        df = self._fetch_from_databento(key)
        
        # Cache results
        self._save_to_cache(key, df)
        
        return self._df_to_daily_points(df)
    
    def _resolve_dataset(self, symbol: str) -> str:
        """Resolve symbol to Databento dataset."""
        symbol_upper = symbol.upper()
        if symbol_upper in ("GLD", "GLDM", "IAU"):
            return "XNAS.BASIC"  # ETFs on Nasdaq
        elif symbol_upper in ("GC=F", "GC", "GOLD"):
            return "GLBX.MDP3"  # CME gold futures
        elif symbol_upper == "XAU":
            return "XNAS.BASIC"  # Treat as GLD proxy
        else:
            return self.config.dataset  # Use configured default
    
    def _resolve_symbol(self, symbol: str) -> str:
        """Resolve vault-dash symbol to Databento symbol."""
        symbol_upper = symbol.upper()
        if symbol_upper == "XAU":
            return "GLD"  # Proxy XAU via GLD prices
        elif symbol_upper == "GC=F":
            return "GC"  # Use parent symbol for continuous contracts
        return symbol_upper
    
    def get_cost_estimate(self, symbol: str, start_date: date, end_date: date) -> float:
        """Estimate cost in USD for a data request."""
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)
        
        try:
            cost = self.client.metadata.get_cost(
                dataset=dataset,
                symbols=databento_symbol,
                schema=self.config.schema,
                start=start_date.isoformat(),
                end=(end_date + timedelta(days=1)).isoformat(),
            )
            return cost
        except Exception:
            return 0.0  # Return 0 if cost estimation fails


class DatabentoBacktestProvider:
    """Databento-backed historical provider for synthetic backtesting."""
    
    provider_id = "databento_v1"
    pricing_mode = "synthetic_bs_mid"
    
    def __init__(
        self,
        price_source: DatabentoHistoricalPriceSource,
        implied_volatility: float = 0.16,
        risk_free_rate: float = 0.045,
    ) -> None:
        self.price_source = price_source
        self.implied_volatility = implied_volatility
        self.risk_free_rate = risk_free_rate
    
    def load_history(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        return self.price_source.load_daily_closes(symbol, start_date, end_date)
    
    # ... rest delegates to SyntheticHistoricalProvider logic
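
The cache key above is deterministic: identical request parameters always hash to the same file, and any parameter change yields a new file rather than silently reusing stale data. A runnable illustration (repeating just the key class, no databento dependency):

```python
import hashlib
from dataclasses import dataclass
from datetime import date
from pathlib import Path


@dataclass(frozen=True)
class DatabentoCacheKey:
    dataset: str
    symbol: str
    schema: str
    start_date: date
    end_date: date

    def cache_path(self, cache_dir: Path) -> Path:
        key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}"
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16]
        return cache_dir / f"dbn_{key_hash}.parquet"


cache_dir = Path(".cache/databento")
a = DatabentoCacheKey("XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
b = DatabentoCacheKey("XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
c = DatabentoCacheKey("XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2025, 6, 30))

assert a.cache_path(cache_dir) == b.cache_path(cache_dir)  # same params, same file
assert a.cache_path(cache_dir) != c.cache_path(cache_dir)  # new end date, new file
```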

Phase 2: Backtest Settings Model (DATA-DB-002)

File: app/models/backtest_settings.py

from dataclasses import dataclass, field
from datetime import date
from uuid import UUID

from app.models.backtest import ProviderRef


@dataclass(frozen=True)
class BacktestSettings:
    """User-configurable backtest settings (independent of portfolio)."""
    
    # Scenario identification
    settings_id: UUID
    name: str
    
    # Data source configuration
    data_source: str = "databento"  # "databento", "yfinance", "synthetic"
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"
    
    # Date range
    start_date: date = date(2024, 1, 1)
    end_date: date = date(2024, 12, 31)
    
    # Independent scenario configuration (not derived from portfolio)
    underlying_symbol: str = "GLD"
    start_price: float = 0.0  # 0 = auto-derive from first close
    underlying_units: float = 1000.0  # Independent of portfolio
    loan_amount: float = 0.0  # Debt position for LTV analysis
    margin_call_ltv: float = 0.75
    
    # Templates to test
    template_slugs: tuple[str, ...] = field(default_factory=lambda: ("protective-put-atm-12m",))
    
    # Provider reference
    provider_ref: ProviderRef = field(default_factory=lambda: ProviderRef(
        provider_id="databento_v1",
        pricing_mode="synthetic_bs_mid",
    ))
    
    # Cache metadata
    cache_key: str = ""  # Populated when data is fetched
    data_cost_usd: float = 0.0  # Cost of last data fetch

Phase 3: Cache Management (DATA-DB-003)

File: app/services/backtesting/databento_cache.py

from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path
import hashlib
import json

from app.services.backtesting.databento_source import DatabentoCacheKey


@dataclass
class CacheEntry:
    """Metadata for a cached Databento dataset."""
    cache_key: DatabentoCacheKey
    file_path: Path
    download_date: date
    size_bytes: int
    cost_usd: float


class DatabentoCacheManager:
    """Manages Databento data cache lifecycle."""
    
    def __init__(self, cache_dir: Path = Path(".cache/databento")) -> None:
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
    
    def list_entries(self) -> list[CacheEntry]:
        """List all cached entries."""
        entries = []
        for meta_file in self.cache_dir.glob("*_meta.json"):
            with open(meta_file) as f:
                meta = json.load(f)
            
            cache_file = meta_file.with_name(meta_file.stem.replace("_meta", "") + ".parquet")
            if cache_file.exists():
                entries.append(CacheEntry(
                    cache_key=DatabentoCacheKey(
                        dataset=meta["dataset"],
                        symbol=meta["symbol"],
                        schema=meta["schema"],
                        start_date=date.fromisoformat(meta["start_date"]),
                        end_date=date.fromisoformat(meta["end_date"]),
                    ),
                    file_path=cache_file,
                    download_date=date.fromisoformat(meta["download_date"]),
                    size_bytes=cache_file.stat().st_size,
                    cost_usd=0.0,  # Would need to track separately
                ))
        return entries
    
    def invalidate_expired(self, max_age_days: int = 30) -> list[Path]:
        """Remove cache entries older than max_age_days."""
        removed = []
        cutoff = date.today() - timedelta(days=max_age_days)
        
        for entry in self.list_entries():
            if entry.download_date < cutoff:
                entry.file_path.unlink(missing_ok=True)
                meta_file = entry.file_path.with_name(entry.file_path.stem + "_meta.json")
                meta_file.unlink(missing_ok=True)
                removed.append(entry.file_path)
        
        return removed
    
    def clear_all(self) -> int:
        """Clear all cached data."""
        count = 0
        for file in self.cache_dir.glob("*"):
            if file.is_file():
                file.unlink()
                count += 1
        return count
    
    def get_cache_size(self) -> int:
        """Get total cache size in bytes."""
        return sum(f.stat().st_size for f in self.cache_dir.glob("*") if f.is_file())
    
    def should_redownload(self, key: DatabentoCacheKey, params_changed: bool, max_age_days: int = 30) -> bool:
        """Determine whether data should be re-downloaded."""
        cache_file = key.cache_path(self.cache_dir)
        meta_file = key.metadata_path(self.cache_dir)
        
        if params_changed:
            return True
        
        if not cache_file.exists() or not meta_file.exists():
            return True
        
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            download_date = date.fromisoformat(meta["download_date"])
            age_days = (date.today() - download_date).days
            return age_days > max_age_days
        except Exception:
            return True

Phase 4: Backtest Page UI Updates (DATA-DB-004)

Key changes to app/pages/backtests.py:

  1. Add Databento configuration section
  2. Add independent start price/units inputs
  3. Show estimated data cost before fetching
  4. Cache status indicator

# In backtests.py

with ui.card().classes("w-full ..."):
    ui.label("Data Source").classes("text-lg font-semibold")
    
    data_source = ui.select(
        {"databento": "Databento (historical market data)", "yfinance": "Yahoo Finance (free, limited)"},
        value="databento",
        label="Data source",
    ).classes("w-full")
    
    # Databento-specific settings
    with ui.column().classes("w-full gap-2").bind_visibility_from(data_source, "value", lambda v: v == "databento"):
        ui.label("Dataset configuration").classes("text-sm text-slate-500")
        
        dataset_select = ui.select(
            {"XNAS.BASIC": "Nasdaq Basic (GLD)", "GLBX.MDP3": "CME Globex (GC=F)"},
            value="XNAS.BASIC",
            label="Dataset",
        ).classes("w-full")
        
        schema_select = ui.select(
            {"ohlcv-1d": "Daily bars", "ohlcv-1h": "Hourly bars"},
            value="ohlcv-1d",
            label="Resolution",
        ).classes("w-full")
        
        # Cost estimate
        cost_label = ui.label("Estimated cost: $0.00").classes("text-sm text-slate-500")
        
        # Cache status
        cache_status = ui.label("").classes("text-xs text-slate-400")
    
    # Independent scenario settings
    with ui.card().classes("w-full ..."):
        ui.label("Scenario Configuration").classes("text-lg font-semibold")
        ui.label("Configure start values independent of portfolio settings").classes("text-sm text-slate-500")
        
        start_price_input = ui.number(
            "Start price",
            value=0.0,
            min=0.0,
            step=0.01,
        ).classes("w-full")
        ui.label("Set to 0 to auto-derive from first historical close").classes("text-xs text-slate-400 -mt-2")
        
        underlying_units_input = ui.number(
            "Underlying units",
            value=1000.0,
            min=0.0001,
            step=0.0001,
        ).classes("w-full")
        
        loan_amount_input = ui.number(
            "Loan amount ($)",
            value=0.0,
            min=0.0,
            step=1000,
        ).classes("w-full")

Phase 5: Scenario Pre-Seeding (DATA-DB-005)

File: app/services/backtesting/scenario_bulk_download.py

from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path

from app.services.backtesting.databento_source import DatabentoCacheKey

try:
    import databento as db
    DATABENTO_AVAILABLE = True
except ImportError:
    DATABENTO_AVAILABLE = False


@dataclass
class ScenarioPreset:
    """Pre-configured scenario ready for backtesting."""
    preset_id: str
    display_name: str
    symbol: str
    dataset: str
    window_start: date
    window_end: date
    default_start_price: float  # First close in window
    default_templates: tuple[str, ...]
    event_type: str
    tags: tuple[str, ...]
    description: str


def download_historical_presets(
    client: db.Historical,
    presets: list[ScenarioPreset],
    output_dir: Path,
) -> dict[str, Path]:
    """Bulk download historical data for all presets.
    
    Returns a mapping of preset_id to cached file path.
    """
    results: dict[str, Path] = {}
    
    for preset in presets:
        cache_key = DatabentoCacheKey(
            dataset=preset.dataset,
            symbol=preset.symbol,
            schema="ohlcv-1d",
            start_date=preset.window_start,
            end_date=preset.window_end,
        )
        
        cache_file = cache_key.cache_path(output_dir)
        
        # Download if not cached
        if not cache_file.exists():
            data = client.timeseries.get_range(
                dataset=preset.dataset,
                symbols=preset.symbol,
                schema="ohlcv-1d",
                start=preset.window_start.isoformat(),
                end=(preset.window_end + timedelta(days=1)).isoformat(),  # end is exclusive
            )
            # Match Phase 1's cache format so the price source can read these files
            data.to_df().to_parquet(cache_file)
        
        results[preset.preset_id] = cache_file
    
    return results


def create_default_presets() -> list[ScenarioPreset]:
    """Create default scenario presets for gold hedging research."""
    return [
        ScenarioPreset(
            preset_id="gld-2020-covid-crash",
            display_name="GLD March 2020 COVID Crash",
            symbol="GLD",
            dataset="XNAS.BASIC",
            window_start=date(2020, 2, 15),
            window_end=date(2020, 4, 15),
            default_start_price=143.0,  # Approx GLD close on 2020-02-15
            default_templates=("protective-put-atm-12m", "protective-put-95pct-12m"),
            event_type="crash",
            tags=("covid", "crash", "high-vol"),
            description="March 2020 COVID market crash - extreme volatility event",
        ),
        ScenarioPreset(
            preset_id="gld-2022-rate-hike-cycle",
            display_name="GLD 2022 Rate Hike Cycle",
            symbol="GLD",
            dataset="XNAS.BASIC",
            window_start=date(2022, 1, 1),
            window_end=date(2022, 12, 31),
            default_start_price=168.0,
            default_templates=("protective-put-atm-12m", "ladder-50-50-atm-95pct-12m"),
            event_type="rate_cycle",
            tags=("rates", "fed", "extended"),
            description="Full year 2022 - aggressive Fed rate hikes",
        ),
        ScenarioPreset(
            preset_id="gcf-2024-rally",
            display_name="GC=F 2024 Gold Rally",
            symbol="GC",
            dataset="GLBX.MDP3",
            window_start=date(2024, 1, 1),
            window_end=date(2024, 12, 31),
            default_start_price=2060.0,
            default_templates=("protective-put-atm-12m",),
            event_type="rally",
            tags=("gold", "futures", "rally"),
            description="Gold futures rally in 2024",
        ),
    ]

Phase 6: Settings Persistence (DATA-DB-006)

File: app/models/backtest_settings_repository.py

from dataclasses import asdict
from datetime import date
from pathlib import Path
from uuid import UUID, uuid4
import json

from app.models.backtest_settings import BacktestSettings


class BacktestSettingsRepository:
    """Persistence for backtest settings."""
    
    def __init__(self, base_path: Path | None = None) -> None:
        self.base_path = base_path or Path(".workspaces")
    
    def _settings_path(self, workspace_id: str) -> Path:
        return self.base_path / workspace_id / "backtest_settings.json"
    
    def load(self, workspace_id: str) -> BacktestSettings:
        """Load backtest settings, creating defaults if not found."""
        path = self._settings_path(workspace_id)
        
        if path.exists():
            with open(path) as f:
                data = json.load(f)
            return BacktestSettings(
                settings_id=UUID(data["settings_id"]),
                name=data.get("name", "Default Backtest"),
                data_source=data.get("data_source", "databento"),
                dataset=data.get("dataset", "XNAS.BASIC"),
                schema=data.get("schema", "ohlcv-1d"),
                start_date=date.fromisoformat(data["start_date"]),
                end_date=date.fromisoformat(data["end_date"]),
                underlying_symbol=data.get("underlying_symbol", "GLD"),
                start_price=data.get("start_price", 0.0),
                underlying_units=data.get("underlying_units", 1000.0),
                loan_amount=data.get("loan_amount", 0.0),
                margin_call_ltv=data.get("margin_call_ltv", 0.75),
                template_slugs=tuple(data.get("template_slugs", ("protective-put-atm-12m",))),
                cache_key=data.get("cache_key", ""),
                data_cost_usd=data.get("data_cost_usd", 0.0),
            )
        
        # Return defaults
        return BacktestSettings(
            settings_id=uuid4(),
            name="Default Backtest",
        )
    
    def save(self, workspace_id: str, settings: BacktestSettings) -> None:
        """Persist backtest settings."""
        path = self._settings_path(workspace_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        
        data = asdict(settings)
        data["settings_id"] = str(data["settings_id"])
        data["start_date"] = data["start_date"].isoformat()
        data["end_date"] = data["end_date"].isoformat()
        data["template_slugs"] = list(data["template_slugs"])
        data["provider_ref"] = {
            "provider_id": settings.provider_ref.provider_id,
            "pricing_mode": settings.provider_ref.pricing_mode,
        }
        
        with open(path, "w") as f:
            json.dump(data, f, indent=2)

Roadmap Items

DATA-DB-001: Databento Historical Price Source

Dependencies: None
Estimated effort: 2-3 days
Deliverables:

  • app/services/backtesting/databento_source.py
  • tests/test_databento_source.py (mocked API)
  • Environment variable DATABENTO_API_KEY support

DATA-DB-002: Backtest Settings Model

Dependencies: None
Estimated effort: 1 day
Deliverables:

  • app/models/backtest_settings.py
  • Repository for persistence

DATA-DB-003: Cache Management

Dependencies: DATA-DB-001
Estimated effort: 1 day
Deliverables:

  • app/services/backtesting/databento_cache.py
  • Cache cleanup CLI command

DATA-DB-004: Backtest Page UI Updates

Dependencies: DATA-DB-001, DATA-DB-002
Estimated effort: 2 days
Deliverables:

  • Updated app/pages/backtests.py
  • Updated app/pages/event_comparison.py
  • Cost estimation display

DATA-DB-005: Scenario Pre-Seeding

Dependencies: DATA-DB-001
Estimated effort: 1-2 days
Deliverables:

  • app/services/backtesting/scenario_bulk_download.py
  • Pre-configured presets for gold hedging research
  • Bulk download script

DATA-DB-007: Options Data Source (Future)

Dependencies: DATA-DB-001
Estimated effort: 3-5 days
Deliverables:

  • DatabentoHistoricalOptionSource implementing OptionSnapshotSource
  • OPRA.PILLAR integration for historical options chains

Configuration

Add to .env:

DATABENTO_API_KEY=db-xxxxxxxxxxxxxxxxxxxxxxxx

Add to requirements.txt:

databento>=0.30.0

Add to pyproject.toml:

[project.optional-dependencies]
databento = ["databento>=0.30.0"]

Testing Strategy

  1. Unit tests with mocked Databento responses (tests/test_databento_source.py)
  2. Integration tests with recorded VCR cassettes (tests/cassettes/*.yaml)
  3. E2E tests using cached data (tests/test_backtest_databento_playwright.py)
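
For the unit-test layer, the Databento client can be stubbed so no network access or API key is needed. A sketch of that style (make_stub_client is a hypothetical test helper; the real tests may structure this differently):

```python
from unittest.mock import MagicMock


def make_stub_client(df: object) -> MagicMock:
    """Stand-in for databento.Historical whose
    timeseries.get_range(...).to_df() returns a canned frame."""
    client = MagicMock()
    client.timeseries.get_range.return_value.to_df.return_value = df
    return client


fake_df = object()  # in a real test this would be a small pandas DataFrame
client = make_stub_client(fake_df)

result = client.timeseries.get_range(
    dataset="XNAS.BASIC", symbols="GLD", schema="ohlcv-1d",
    start="2024-01-01", end="2024-02-01",
).to_df()
assert result is fake_df
client.timeseries.get_range.assert_called_once()
```
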

Cost Management

  • Use metadata.get_cost() before fetching to show estimated cost
  • Default to cached data when available
  • Batch download for large historical ranges (>1 year)
  • Consider Databento flat rate plans for heavy usage
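
Combined, the rules above amount to a small decision function in front of any paid fetch (should_fetch and the default budget are illustrative, not part of the plan's API):

```python
def should_fetch(cost_estimate_usd: float, cache_hit: bool, budget_usd: float = 5.0) -> tuple[bool, str]:
    """Decide whether to hit the paid Databento API."""
    if cache_hit:
        return False, "serving cached data"
    if cost_estimate_usd > budget_usd:
        return False, f"estimated ${cost_estimate_usd:.2f} exceeds ${budget_usd:.2f} budget"
    return True, "fetching from Databento"


assert should_fetch(0.42, cache_hit=True) == (False, "serving cached data")
assert should_fetch(0.42, cache_hit=False)[0] is True
assert should_fetch(12.00, cache_hit=False)[0] is False
```
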

Security Considerations

  • API key stored in environment variable, never in code
  • Cache files contain only market data (no PII)
  • Rate limiting respected (100 requests/second per IP)