`vault-dash/docs/DATABENTO_INTEGRATION_PLAN.md`

# Databento Historical Data Integration Plan
## Overview
Integrate Databento historical API for backtesting and scenario comparison pages, replacing yfinance for historical data on these pages. The integration will support configurable start prices/values independent of portfolio settings, with intelligent caching to avoid redundant downloads.
## Architecture
### Current State
- **Backtest page** (`app/pages/backtests.py`): Uses `YFinanceHistoricalPriceSource` via `BacktestPageService`
- **Event comparison** (`app/pages/event_comparison.py`): Uses seeded event presets with yfinance data
- **Historical provider** (`app/services/backtesting/historical_provider.py`): Protocol-based architecture with `YFinanceHistoricalPriceSource` and `SyntheticHistoricalProvider`
### Target State
- Add `DatabentoHistoricalPriceSource` implementing `HistoricalPriceSource` protocol
- Add `DatabentoOptionSnapshotSource` implementing `OptionSnapshotSource` protocol (future)
- Smart caching layer: only re-download when parameters change
- Pre-seeded scenario data via batch downloads
## Databento Data Sources
### Underlyings and Datasets
| Instrument | Dataset | Symbol Format | Notes |
|------------|---------|----------------|-------|
| GLD ETF | `XNAS.BASIC` or `EQUS.PLUS` | `GLD` | US equities consolidated |
| GC=F Futures | `GLBX.MDP3` | `GC.c.0` (continuous, `stype_in="continuous"`) or raw contract symbols | CME gold futures; `GC=F` is the Yahoo ticker |
| Gold Options | `OPRA.PILLAR` | `GLD.OPT` (parent symbology) | Options on the GLD ETF |
### Schemas
| Schema | Use Case | Fields |
|--------|----------|--------|
| `ohlcv-1d` | Daily backtesting | open, high, low, close, volume |
| `ohlcv-1h` | Intraday scenarios | open, high, low, close, volume (hourly bars) |
| `trades` | Tick-level analysis | Full trade data |
| `definition` | Instrument metadata | Expiries, strike prices, tick sizes |
## Implementation Plan
### Phase 1: Historical Price Source (DATA-DB-001)
**File:** `app/services/backtesting/databento_source.py`
```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path
import hashlib
import json

import pandas as pd

from app.services.backtesting.historical_provider import DailyClosePoint, HistoricalPriceSource

try:
    import databento as db

    DATABENTO_AVAILABLE = True
except ImportError:
    DATABENTO_AVAILABLE = False


@dataclass(frozen=True)
class DatabentoCacheKey:
    """Cache key for Databento data requests."""

    dataset: str
    symbol: str
    schema: str
    start_date: date
    end_date: date

    def _key_hash(self) -> str:
        key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}"
        return hashlib.sha256(key_str.encode()).hexdigest()[:16]

    def cache_path(self, cache_dir: Path) -> Path:
        return cache_dir / f"dbn_{self._key_hash()}.parquet"

    def metadata_path(self, cache_dir: Path) -> Path:
        return cache_dir / f"dbn_{self._key_hash()}_meta.json"


@dataclass
class DatabentoSourceConfig:
    """Configuration for the Databento data source."""

    api_key: str | None = None  # Falls back to the DATABENTO_API_KEY env var
    cache_dir: Path = Path(".cache/databento")
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"
    stype_in: str = "raw_symbol"
    # Re-download threshold
    max_cache_age_days: int = 30


class DatabentoHistoricalPriceSource(HistoricalPriceSource):
    """Databento-based historical price source for backtesting."""

    def __init__(self, config: DatabentoSourceConfig | None = None) -> None:
        if not DATABENTO_AVAILABLE:
            raise RuntimeError("databento package required: pip install databento")
        self.config = config or DatabentoSourceConfig()
        self.config.cache_dir.mkdir(parents=True, exist_ok=True)
        self._client: db.Historical | None = None

    @property
    def client(self) -> db.Historical:
        if self._client is None:
            self._client = db.Historical(key=self.config.api_key)
        return self._client

    def _load_from_cache(self, key: DatabentoCacheKey) -> list[DailyClosePoint] | None:
        """Load cached data if available and fresh."""
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)
        if not cache_file.exists() or not meta_file.exists():
            return None
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            # Check cache age
            download_date = date.fromisoformat(meta["download_date"])
            age_days = (date.today() - download_date).days
            if age_days > self.config.max_cache_age_days:
                return None
            # Check that parameters match
            if meta["dataset"] != key.dataset or meta["symbol"] != key.symbol:
                return None
            # Load parquet and convert
            df = pd.read_parquet(cache_file)
            return self._df_to_daily_points(df)
        except Exception:
            return None

    def _save_to_cache(self, key: DatabentoCacheKey, df: pd.DataFrame) -> None:
        """Save data to cache."""
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)
        df.to_parquet(cache_file, index=False)
        meta = {
            "download_date": date.today().isoformat(),
            "dataset": key.dataset,
            "symbol": key.symbol,
            "schema": key.schema,
            "start_date": key.start_date.isoformat(),
            "end_date": key.end_date.isoformat(),
            "rows": len(df),
        }
        with open(meta_file, "w") as f:
            json.dump(meta, f, indent=2)

    def _fetch_from_databento(self, key: DatabentoCacheKey) -> pd.DataFrame:
        """Fetch data from the Databento API."""
        data = self.client.timeseries.get_range(
            dataset=key.dataset,
            symbols=key.symbol,
            schema=key.schema,
            start=key.start_date.isoformat(),
            end=(key.end_date + timedelta(days=1)).isoformat(),  # Exclusive end
            stype_in=self.config.stype_in,
        )
        return data.to_df()

    def _df_to_daily_points(self, df: pd.DataFrame) -> list[DailyClosePoint]:
        """Convert a DataFrame to a sorted list of DailyClosePoint."""
        points = []
        for idx, row in df.iterrows():
            # The ohlcv schemas carry the bar timestamp in ts_event
            # (often the index after to_df())
            ts = row.get("ts_event", row.get("ts_recv", idx))
            if hasattr(ts, "date"):
                row_date = ts.date()
            else:
                row_date = date.fromisoformat(str(ts)[:10])
            # to_df() converts fixed-precision int64 prices (scaled 1e-9) to
            # floats by default; divide by 1e9 only when reading raw DBN records.
            close = float(row["close"])
            points.append(DailyClosePoint(date=row_date, close=close))
        return sorted(points, key=lambda p: p.date)

    def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        """Load daily closing prices from Databento (with caching)."""
        # Map the requested symbol to a Databento dataset and symbol
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)
        key = DatabentoCacheKey(
            dataset=dataset,
            symbol=databento_symbol,
            schema=self.config.schema,
            start_date=start_date,
            end_date=end_date,
        )
        # Try the cache first
        cached = self._load_from_cache(key)
        if cached is not None:
            return cached
        # Fetch from Databento and cache the result
        df = self._fetch_from_databento(key)
        self._save_to_cache(key, df)
        return self._df_to_daily_points(df)

    def _resolve_dataset(self, symbol: str) -> str:
        """Resolve a symbol to a Databento dataset."""
        symbol_upper = symbol.upper()
        if symbol_upper in ("GLD", "GLDM", "IAU"):
            return "XNAS.BASIC"  # ETFs on Nasdaq
        if symbol_upper in ("GC=F", "GC", "GOLD"):
            return "GLBX.MDP3"  # CME gold futures
        if symbol_upper == "XAU":
            return "XNAS.BASIC"  # Treat as a GLD proxy
        return self.config.dataset  # Configured default

    def _resolve_symbol(self, symbol: str) -> str:
        """Resolve a vault-dash symbol to a Databento symbol."""
        symbol_upper = symbol.upper()
        if symbol_upper == "XAU":
            return "GLD"  # Proxy XAU via GLD prices
        if symbol_upper == "GC=F":
            return "GC"  # Parent symbol; continuous contracts need stype_in="continuous"
        return symbol_upper

    def get_cost_estimate(self, symbol: str, start_date: date, end_date: date) -> float:
        """Estimate the cost in USD of a data request."""
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)
        try:
            return self.client.metadata.get_cost(
                dataset=dataset,
                symbols=databento_symbol,
                schema=self.config.schema,
                start=start_date.isoformat(),
                end=(end_date + timedelta(days=1)).isoformat(),
            )
        except Exception:
            return 0.0  # Treat estimation failure as "unknown", not an error


class DatabentoBacktestProvider:
    """Databento-backed historical provider for synthetic backtesting."""

    provider_id = "databento_v1"
    pricing_mode = "synthetic_bs_mid"

    def __init__(
        self,
        price_source: DatabentoHistoricalPriceSource,
        implied_volatility: float = 0.16,
        risk_free_rate: float = 0.045,
    ) -> None:
        self.price_source = price_source
        self.implied_volatility = implied_volatility
        self.risk_free_rate = risk_free_rate

    def load_history(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        return self.price_source.load_daily_closes(symbol, start_date, end_date)

    # ... rest delegates to SyntheticHistoricalProvider logic
```
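The cache-key scheme above is deterministic: identical request parameters always map to the same file, which is what lets repeated backtest runs skip the API entirely. A standalone sketch mirroring the hashing in `DatabentoCacheKey.cache_path` (no filesystem access, stdlib only):

```python
import hashlib
from datetime import date
from pathlib import Path

def cache_path(cache_dir: Path, dataset: str, symbol: str, schema: str,
               start_date: date, end_date: date) -> Path:
    """Mirror of DatabentoCacheKey.cache_path: hash all request parameters."""
    key_str = f"{dataset}_{symbol}_{schema}_{start_date}_{end_date}"
    key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16]
    return cache_dir / f"dbn_{key_hash}.parquet"

cache_dir = Path(".cache/databento")
a = cache_path(cache_dir, "XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
b = cache_path(cache_dir, "XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
c = cache_path(cache_dir, "XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2025, 12, 31))
assert a == b  # same parameters -> same cache file, no re-download
assert a != c  # changed end date -> new key, triggers a fresh fetch
```

Because the dates are part of the key, extending a backtest window produces a new download rather than an incremental append; that is the trade-off this design accepts for simplicity.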
### Phase 2: Backtest Settings Model (DATA-DB-002)
**File:** `app/models/backtest_settings.py`
```python
from dataclasses import dataclass, field
from datetime import date
from uuid import UUID

from app.models.backtest import ProviderRef


@dataclass(frozen=True)
class BacktestSettings:
    """User-configurable backtest settings (independent of the portfolio)."""

    # Scenario identification
    settings_id: UUID
    name: str
    # Data source configuration
    data_source: str = "databento"  # "databento", "yfinance", or "synthetic"
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"
    # Date range
    start_date: date = date(2024, 1, 1)
    end_date: date = date(2024, 12, 31)
    # Independent scenario configuration (not derived from the portfolio)
    underlying_symbol: str = "GLD"
    start_price: float = 0.0  # 0 = auto-derive from the first close
    underlying_units: float = 1000.0  # Independent of the portfolio
    loan_amount: float = 0.0  # Debt position for LTV analysis
    margin_call_ltv: float = 0.75
    # Templates to test
    template_slugs: tuple[str, ...] = field(default_factory=lambda: ("protective-put-atm-12m",))
    # Provider reference
    provider_ref: ProviderRef = field(
        default_factory=lambda: ProviderRef(
            provider_id="databento_v1",
            pricing_mode="synthetic_bs_mid",
        )
    )
    # Cache metadata
    cache_key: str = ""  # Populated when data is fetched
    data_cost_usd: float = 0.0  # Cost of the last data fetch
```
### Phase 3: Cache Management (DATA-DB-003)
**File:** `app/services/backtesting/databento_cache.py`
```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path
import json

from app.services.backtesting.databento_source import DatabentoCacheKey


@dataclass
class CacheEntry:
    """Metadata for a cached Databento dataset."""

    cache_key: DatabentoCacheKey
    file_path: Path
    download_date: date
    size_bytes: int
    cost_usd: float


class DatabentoCacheManager:
    """Manages the Databento data cache lifecycle."""

    def __init__(self, cache_dir: Path = Path(".cache/databento")) -> None:
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def list_entries(self) -> list[CacheEntry]:
        """List all cached entries."""
        entries = []
        for meta_file in self.cache_dir.glob("*_meta.json"):
            with open(meta_file) as f:
                meta = json.load(f)
            cache_file = meta_file.with_name(meta_file.stem.replace("_meta", "") + ".parquet")
            if cache_file.exists():
                entries.append(
                    CacheEntry(
                        cache_key=DatabentoCacheKey(
                            dataset=meta["dataset"],
                            symbol=meta["symbol"],
                            schema=meta["schema"],
                            start_date=date.fromisoformat(meta["start_date"]),
                            end_date=date.fromisoformat(meta["end_date"]),
                        ),
                        file_path=cache_file,
                        download_date=date.fromisoformat(meta["download_date"]),
                        size_bytes=cache_file.stat().st_size,
                        cost_usd=0.0,  # Would need to be tracked separately
                    )
                )
        return entries

    def invalidate_expired(self, max_age_days: int = 30) -> list[Path]:
        """Remove cache entries older than max_age_days."""
        removed = []
        cutoff = date.today() - timedelta(days=max_age_days)
        for entry in self.list_entries():
            if entry.download_date < cutoff:
                entry.file_path.unlink(missing_ok=True)
                meta_file = entry.file_path.with_name(entry.file_path.stem + "_meta.json")
                meta_file.unlink(missing_ok=True)
                removed.append(entry.file_path)
        return removed

    def clear_all(self) -> int:
        """Clear all cached data; returns the number of files removed."""
        count = 0
        for file in self.cache_dir.glob("*"):
            if file.is_file():
                file.unlink()
                count += 1
        return count

    def get_cache_size(self) -> int:
        """Total cache size in bytes."""
        return sum(f.stat().st_size for f in self.cache_dir.glob("*") if f.is_file())

    def should_redownload(self, key: DatabentoCacheKey, params_changed: bool,
                          max_age_days: int = 30) -> bool:
        """Determine whether the data should be re-downloaded."""
        if params_changed:
            return True
        cache_file = key.cache_path(self.cache_dir)
        meta_file = key.metadata_path(self.cache_dir)
        if not cache_file.exists() or not meta_file.exists():
            return True
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            download_date = date.fromisoformat(meta["download_date"])
            return (date.today() - download_date).days > max_age_days
        except Exception:
            return True  # Unreadable metadata: treat as a cache miss
```
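The age-based invalidation in `should_redownload` can be checked in isolation. A small sketch of the same staleness rule (note the boundary is strictly greater-than, so an entry exactly `max_age_days` old is still considered fresh):

```python
from datetime import date, timedelta

def is_stale(download_date: date, today: date, max_age_days: int = 30) -> bool:
    """Mirror of the age check used by should_redownload and _load_from_cache."""
    return (today - download_date).days > max_age_days

today = date(2025, 6, 1)
assert not is_stale(today, today)                       # downloaded today: fresh
assert not is_stale(today - timedelta(days=30), today)  # exactly at threshold: fresh
assert is_stale(today - timedelta(days=31), today)      # one day past: re-download
```

A 30-day TTL is generous for daily bars: closed historical windows never change, so staleness really only matters for date ranges that end near today.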
### Phase 4: Backtest Page UI Updates (DATA-DB-004)
**Key changes to `app/pages/backtests.py`:**
1. Add Databento configuration section
2. Add independent start price/units inputs
3. Show estimated data cost before fetching
4. Cache status indicator
```python
# In backtests.py
with ui.card().classes("w-full ..."):
    ui.label("Data Source").classes("text-lg font-semibold")
    data_source = ui.select(
        {"databento": "Databento (historical market data)", "yfinance": "Yahoo Finance (free, limited)"},
        value="databento",
        label="Data source",
    ).classes("w-full")

    # Databento-specific settings
    with ui.column().classes("w-full gap-2").bind_visibility_from(
        data_source, "value", lambda v: v == "databento"
    ):
        ui.label("Dataset configuration").classes("text-sm text-slate-500")
        dataset_select = ui.select(
            {"XNAS.BASIC": "Nasdaq Basic (GLD)", "GLBX.MDP3": "CME Globex (GC=F)"},
            value="XNAS.BASIC",
            label="Dataset",
        ).classes("w-full")
        schema_select = ui.select(
            {"ohlcv-1d": "Daily bars", "ohlcv-1h": "Hourly bars"},
            value="ohlcv-1d",
            label="Resolution",
        ).classes("w-full")
        # Cost estimate
        cost_label = ui.label("Estimated cost: $0.00").classes("text-sm text-slate-500")
        # Cache status
        cache_status = ui.label("").classes("text-xs text-slate-400")

# Independent scenario settings
with ui.card().classes("w-full ..."):
    ui.label("Scenario Configuration").classes("text-lg font-semibold")
    ui.label("Configure start values independent of portfolio settings").classes("text-sm text-slate-500")
    start_price_input = ui.number(
        "Start price",
        value=0.0,
        min=0.0,
        step=0.01,
    ).classes("w-full")
    ui.label("Set to 0 to auto-derive from first historical close").classes("text-xs text-slate-400 -mt-2")
    underlying_units_input = ui.number(
        "Underlying units",
        value=1000.0,
        min=0.0001,
        step=0.0001,
    ).classes("w-full")
    loan_amount_input = ui.number(
        "Loan amount ($)",
        value=0.0,
        min=0.0,
        step=1000,
    ).classes("w-full")
```
### Phase 5: Scenario Pre-Seeding (DATA-DB-005)
**File:** `app/services/backtesting/scenario_bulk_download.py`
```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path

from app.services.backtesting.databento_source import DatabentoCacheKey

try:
    import databento as db

    DATABENTO_AVAILABLE = True
except ImportError:
    DATABENTO_AVAILABLE = False


@dataclass
class ScenarioPreset:
    """Pre-configured scenario ready for backtesting."""

    preset_id: str
    display_name: str
    symbol: str
    dataset: str
    window_start: date
    window_end: date
    default_start_price: float  # First close in the window
    default_templates: tuple[str, ...]
    event_type: str
    tags: tuple[str, ...]
    description: str


def download_historical_presets(
    client: db.Historical,
    presets: list[ScenarioPreset],
    output_dir: Path,
) -> dict[str, Path]:
    """Bulk download historical data for all presets.

    Returns a mapping of preset_id to cached file path.
    """
    results = {}
    for preset in presets:
        cache_key = DatabentoCacheKey(
            dataset=preset.dataset,
            symbol=preset.symbol,
            schema="ohlcv-1d",
            start_date=preset.window_start,
            end_date=preset.window_end,
        )
        cache_file = cache_key.cache_path(output_dir)
        # Download only if not already cached
        if not cache_file.exists():
            data = client.timeseries.get_range(
                dataset=preset.dataset,
                symbols=preset.symbol,
                schema="ohlcv-1d",
                start=preset.window_start.isoformat(),
                end=(preset.window_end + timedelta(days=1)).isoformat(),  # Exclusive end
            )
            data.to_parquet(cache_file)
        results[preset.preset_id] = cache_file
    return results


def create_default_presets() -> list[ScenarioPreset]:
    """Default scenario presets for gold hedging research."""
    return [
        ScenarioPreset(
            preset_id="gld-2020-covid-crash",
            display_name="GLD March 2020 COVID Crash",
            symbol="GLD",
            dataset="XNAS.BASIC",
            window_start=date(2020, 2, 15),
            window_end=date(2020, 4, 15),
            default_start_price=143.0,  # Approx. GLD close around 2020-02-15
            default_templates=("protective-put-atm-12m", "protective-put-95pct-12m"),
            event_type="crash",
            tags=("covid", "crash", "high-vol"),
            description="March 2020 COVID market crash - extreme volatility event",
        ),
        ScenarioPreset(
            preset_id="gld-2022-rate-hike-cycle",
            display_name="GLD 2022 Rate Hike Cycle",
            symbol="GLD",
            dataset="XNAS.BASIC",
            window_start=date(2022, 1, 1),
            window_end=date(2022, 12, 31),
            default_start_price=168.0,
            default_templates=("protective-put-atm-12m", "ladder-50-50-atm-95pct-12m"),
            event_type="rate_cycle",
            tags=("rates", "fed", "extended"),
            description="Full year 2022 - aggressive Fed rate hikes",
        ),
        ScenarioPreset(
            preset_id="gcf-2024-rally",
            display_name="GC=F 2024 Gold Rally",
            symbol="GC",
            dataset="GLBX.MDP3",
            window_start=date(2024, 1, 1),
            window_end=date(2024, 12, 31),
            default_start_price=2060.0,
            default_templates=("protective-put-atm-12m",),
            event_type="rally",
            tags=("gold", "futures", "rally"),
            description="Gold futures rally in 2024",
        ),
    ]
```
### Phase 6: Settings Persistence (part of DATA-DB-002)
**File:** `app/models/backtest_settings_repository.py`
```python
from dataclasses import asdict
from datetime import date
from pathlib import Path
from uuid import UUID, uuid4
import json

from app.models.backtest_settings import BacktestSettings


class BacktestSettingsRepository:
    """Persistence for backtest settings."""

    def __init__(self, base_path: Path | None = None) -> None:
        self.base_path = base_path or Path(".workspaces")

    def _settings_path(self, workspace_id: str) -> Path:
        return self.base_path / workspace_id / "backtest_settings.json"

    def load(self, workspace_id: str) -> BacktestSettings:
        """Load backtest settings, creating defaults if none are found."""
        path = self._settings_path(workspace_id)
        if path.exists():
            with open(path) as f:
                data = json.load(f)
            return BacktestSettings(
                settings_id=UUID(data["settings_id"]),
                name=data.get("name", "Default Backtest"),
                data_source=data.get("data_source", "databento"),
                dataset=data.get("dataset", "XNAS.BASIC"),
                schema=data.get("schema", "ohlcv-1d"),
                start_date=date.fromisoformat(data["start_date"]),
                end_date=date.fromisoformat(data["end_date"]),
                underlying_symbol=data.get("underlying_symbol", "GLD"),
                start_price=data.get("start_price", 0.0),
                underlying_units=data.get("underlying_units", 1000.0),
                loan_amount=data.get("loan_amount", 0.0),
                margin_call_ltv=data.get("margin_call_ltv", 0.75),
                template_slugs=tuple(data.get("template_slugs", ("protective-put-atm-12m",))),
                cache_key=data.get("cache_key", ""),
                data_cost_usd=data.get("data_cost_usd", 0.0),
            )
        # Return defaults
        return BacktestSettings(
            settings_id=uuid4(),
            name="Default Backtest",
        )

    def save(self, workspace_id: str, settings: BacktestSettings) -> None:
        """Persist backtest settings."""
        path = self._settings_path(workspace_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        # asdict() recurses into nested dataclasses; the non-JSON-serializable
        # fields are converted explicitly below
        data = asdict(settings)
        data["settings_id"] = str(settings.settings_id)
        data["start_date"] = settings.start_date.isoformat()
        data["end_date"] = settings.end_date.isoformat()
        data["template_slugs"] = list(settings.template_slugs)
        data["provider_ref"] = {
            "provider_id": settings.provider_ref.provider_id,
            "pricing_mode": settings.provider_ref.pricing_mode,
        }
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
```
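The repository's JSON round-trip hinges on the type conversions in `save()` and `load()`: UUIDs become strings, dates become ISO strings, and tuples become lists. The conversions can be sanity-checked in isolation with plain dicts standing in for `BacktestSettings`:

```python
import json
from datetime import date
from uuid import uuid4

# Stand-in for the BacktestSettings fields that JSON cannot hold directly
settings = {
    "settings_id": uuid4(),
    "start_date": date(2024, 1, 1),
    "template_slugs": ("protective-put-atm-12m",),
}
# save(): convert to JSON-safe types
payload = {
    "settings_id": str(settings["settings_id"]),
    "start_date": settings["start_date"].isoformat(),
    "template_slugs": list(settings["template_slugs"]),
}
# load(): parse back and restore the original types
restored = json.loads(json.dumps(payload))
assert date.fromisoformat(restored["start_date"]) == settings["start_date"]
assert tuple(restored["template_slugs"]) == settings["template_slugs"]
assert restored["settings_id"] == str(settings["settings_id"])
```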
## Roadmap Items
### DATA-DB-001: Databento Historical Price Source
**Dependencies:** None
**Estimated effort:** 2-3 days
**Deliverables:**
- `app/services/backtesting/databento_source.py`
- `tests/test_databento_source.py` (mocked API)
- Environment variable `DATABENTO_API_KEY` support
### DATA-DB-002: Backtest Settings Model
**Dependencies:** None
**Estimated effort:** 1 day
**Deliverables:**
- `app/models/backtest_settings.py`
- Repository for persistence
### DATA-DB-003: Cache Management
**Dependencies:** DATA-DB-001
**Estimated effort:** 1 day
**Deliverables:**
- `app/services/backtesting/databento_cache.py`
- Cache cleanup CLI command
### DATA-DB-004: Backtest Page UI Updates
**Dependencies:** DATA-DB-001, DATA-DB-002
**Estimated effort:** 2 days
**Deliverables:**
- Updated `app/pages/backtests.py`
- Updated `app/pages/event_comparison.py`
- Cost estimation display
### DATA-DB-005: Scenario Pre-Seeding
**Dependencies:** DATA-DB-001
**Estimated effort:** 1-2 days
**Deliverables:**
- `app/services/backtesting/scenario_bulk_download.py`
- Pre-configured presets for gold hedging research
- Bulk download script
### DATA-DB-006: Options Data Source (Future)
**Dependencies:** DATA-DB-001
**Estimated effort:** 3-5 days
**Deliverables:**
- `DatabentoOptionSnapshotSource` implementing `OptionSnapshotSource`
- OPRA.PILLAR integration for historical options chains
## Configuration
Add to `.env`:
```
DATABENTO_API_KEY=db-xxxxxxxxxxxxxxxxxxxxxxxx
```
Add to `requirements.txt`:
```
databento>=0.30.0
```
Add to `pyproject.toml`:
```toml
[project.optional-dependencies]
databento = ["databento>=0.30.0"]
```
## Testing Strategy
1. **Unit tests** with mocked Databento responses (`tests/test_databento_source.py`)
2. **Integration tests** with recorded VCR cassettes (`tests/cassettes/*.yaml`)
3. **E2E tests** using cached data (`tests/test_backtest_databento_playwright.py`)
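For item 1, the API boundary can be mocked with `unittest.mock` so no API key is needed. A minimal sketch of the approach; `fetch_daily` here is a local stand-in for `_fetch_from_databento`, and the `MagicMock` stands in for `db.Historical`:

```python
from datetime import date, timedelta
from unittest.mock import MagicMock

# Stand-in mirroring _fetch_from_databento's call contract, so the request
# parameters can be asserted without a real Databento client.
def fetch_daily(client, dataset: str, symbol: str, start: date, end: date):
    return client.timeseries.get_range(
        dataset=dataset,
        symbols=symbol,
        schema="ohlcv-1d",
        start=start.isoformat(),
        end=(end + timedelta(days=1)).isoformat(),  # exclusive end, as in Phase 1
    )

client = MagicMock()
fetch_daily(client, "XNAS.BASIC", "GLD", date(2024, 1, 2), date(2024, 1, 31))
client.timeseries.get_range.assert_called_once_with(
    dataset="XNAS.BASIC",
    symbols="GLD",
    schema="ohlcv-1d",
    start="2024-01-02",
    end="2024-02-01",
)
```

In the real test suite the mock would replace `DatabentoHistoricalPriceSource.client` (e.g. via `patch.object`) and return a canned DataFrame from `get_range(...).to_df()`.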
## Cost Management
- Use `metadata.get_cost()` before fetching to show estimated cost
- Default to cached data when available
- Batch download for large historical ranges (>1 year)
- Consider Databento flat rate plans for heavy usage
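Batch downloading a large range could mean splitting it into calendar-year chunks, so each chunk caches independently and a failed or abandoned download loses at most one year. A hypothetical helper (`yearly_chunks` is not part of the plan's modules above):

```python
from datetime import date, timedelta

def yearly_chunks(start: date, end: date) -> list[tuple[date, date]]:
    """Split an inclusive [start, end] range into calendar-year sub-ranges."""
    chunks = []
    chunk_start = start
    while chunk_start <= end:
        year_end = date(chunk_start.year, 12, 31)
        chunk_end = min(year_end, end)  # clamp the final chunk to the range end
        chunks.append((chunk_start, chunk_end))
        chunk_start = chunk_end + timedelta(days=1)
    return chunks

chunks = yearly_chunks(date(2020, 6, 1), date(2022, 3, 31))
assert chunks == [
    (date(2020, 6, 1), date(2020, 12, 31)),
    (date(2021, 1, 1), date(2021, 12, 31)),
    (date(2022, 1, 1), date(2022, 3, 31)),
]
```

Each chunk would then go through the normal `DatabentoCacheKey` path, so re-running a partially completed batch only fetches the missing years.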
## Security Considerations
- API key stored in environment variable, never in code
- Cache files contain only market data (no PII)
- Rate limiting respected (100 requests/second per IP)