# Databento Historical Data Integration Plan

## Overview

Integrate the Databento historical API into the backtesting and scenario comparison pages, replacing yfinance for historical data on those pages. The integration supports configurable start prices/values independent of portfolio settings, with intelligent caching to avoid redundant downloads.
## Architecture

### Current State

- Backtest page (`app/pages/backtests.py`): uses `YFinanceHistoricalPriceSource` via `BacktestPageService`
- Event comparison (`app/pages/event_comparison.py`): uses seeded event presets with yfinance data
- Historical provider (`app/services/backtesting/historical_provider.py`): protocol-based architecture with `YFinanceHistoricalPriceSource` and `SyntheticHistoricalProvider`

### Target State

- Add `DatabentoHistoricalPriceSource` implementing the `HistoricalPriceSource` protocol
- Add `DatabentoHistoricalOptionSource` implementing the `OptionSnapshotSource` protocol (future)
- Smart caching layer: only re-download when parameters change
- Pre-seeded scenario data via batch downloads
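For context, the protocol the new source must satisfy is assumed to look roughly like the following. This is a sketch; the real definitions live in `app/services/backtesting/historical_provider.py` and may differ in detail:

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class DailyClosePoint:
    """One daily closing-price observation (assumed field names)."""
    date: date
    close: float


@runtime_checkable
class HistoricalPriceSource(Protocol):
    """Anything that can serve daily closes for a symbol over a date range."""

    def load_daily_closes(
        self, symbol: str, start_date: date, end_date: date
    ) -> list[DailyClosePoint]: ...
```

Because this is a structural `Protocol`, any class that defines a matching `load_daily_closes` satisfies it without inheriting from it, which is what lets Databento, yfinance, and synthetic sources be swapped freely.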
## Databento Data Sources

### Underlyings and Datasets

| Instrument | Dataset | Symbol Format | Notes |
|---|---|---|---|
| GLD ETF | XNAS.BASIC or EQUS.PLUS | `GLD` | US equities consolidated |
| GC=F futures | GLBX.MDP3 | `GC` + continuous, or `GC=F` raw | Gold futures |
| Gold options | OPRA.PILLAR | `GLD` underlying | Options on GLD ETF |

### Schemas

| Schema | Use Case | Fields |
|---|---|---|
| `ohlcv-1d` | Daily backtesting | open, high, low, close, volume |
| `ohlcv-1h` | Intraday scenarios | Hourly bars |
| `trades` | Tick-level analysis | Full trade data |
| `definition` | Instrument metadata | Expiries, strike prices, tick sizes |
## Implementation Plan

### Phase 1: Historical Price Source (DATA-DB-001)

File: `app/services/backtesting/databento_source.py`
```python
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path

import pandas as pd

from app.services.backtesting.historical_provider import DailyClosePoint, HistoricalPriceSource

try:
    import databento as db

    DATABENTO_AVAILABLE = True
except ImportError:
    DATABENTO_AVAILABLE = False


@dataclass(frozen=True)
class DatabentoCacheKey:
    """Cache key for Databento data requests."""

    dataset: str
    symbol: str
    schema: str
    start_date: date
    end_date: date

    def _key_hash(self) -> str:
        key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}"
        return hashlib.sha256(key_str.encode()).hexdigest()[:16]

    def cache_path(self, cache_dir: Path) -> Path:
        return cache_dir / f"dbn_{self._key_hash()}.parquet"

    def metadata_path(self, cache_dir: Path) -> Path:
        return cache_dir / f"dbn_{self._key_hash()}_meta.json"


@dataclass
class DatabentoSourceConfig:
    """Configuration for the Databento data source."""

    api_key: str | None = None  # Falls back to DATABENTO_API_KEY env var
    cache_dir: Path = Path(".cache/databento")
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"
    stype_in: str = "raw_symbol"
    # Re-download threshold
    max_cache_age_days: int = 30


class DatabentoHistoricalPriceSource(HistoricalPriceSource):
    """Databento-based historical price source for backtesting."""

    def __init__(self, config: DatabentoSourceConfig | None = None) -> None:
        if not DATABENTO_AVAILABLE:
            raise RuntimeError("databento package required: pip install databento")
        self.config = config or DatabentoSourceConfig()
        self.config.cache_dir.mkdir(parents=True, exist_ok=True)
        self._client: db.Historical | None = None

    @property
    def client(self) -> db.Historical:
        if self._client is None:
            self._client = db.Historical(key=self.config.api_key)
        return self._client

    def _load_from_cache(self, key: DatabentoCacheKey) -> list[DailyClosePoint] | None:
        """Load cached data if available and fresh."""
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)
        if not cache_file.exists() or not meta_file.exists():
            return None
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            # Check cache age
            download_date = date.fromisoformat(meta["download_date"])
            age_days = (date.today() - download_date).days
            if age_days > self.config.max_cache_age_days:
                return None
            # Check that parameters match
            if meta["dataset"] != key.dataset or meta["symbol"] != key.symbol:
                return None
            # Load parquet and convert
            df = pd.read_parquet(cache_file)
            return self._df_to_daily_points(df)
        except Exception:
            return None

    def _save_to_cache(self, key: DatabentoCacheKey, df: pd.DataFrame) -> None:
        """Save data to the cache."""
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)
        # Keep the index: to_df() typically stores ts_event there, and
        # dropping it would lose the bar timestamps on the round-trip.
        df.to_parquet(cache_file)
        meta = {
            "download_date": date.today().isoformat(),
            "dataset": key.dataset,
            "symbol": key.symbol,
            "schema": key.schema,
            "start_date": key.start_date.isoformat(),
            "end_date": key.end_date.isoformat(),
            "rows": len(df),
        }
        with open(meta_file, "w") as f:
            json.dump(meta, f, indent=2)

    def _fetch_from_databento(self, key: DatabentoCacheKey) -> pd.DataFrame:
        """Fetch data from the Databento API."""
        data = self.client.timeseries.get_range(
            dataset=key.dataset,
            symbols=key.symbol,
            schema=key.schema,
            start=key.start_date.isoformat(),
            end=(key.end_date + timedelta(days=1)).isoformat(),  # End is exclusive
            stype_in=self.config.stype_in,
        )
        # to_df() converts fixed-precision int64 prices (1e-9 scale) to
        # float dollars by default (pretty_px=True); no manual scaling here.
        return data.to_df()

    def _df_to_daily_points(self, df: pd.DataFrame) -> list[DailyClosePoint]:
        """Convert a DataFrame to a DailyClosePoint list."""
        points = []
        for idx, row in df.iterrows():
            # Databento ohlcv schemas carry the bar timestamp in ts_event,
            # which to_df() typically sets as the DataFrame index.
            ts = row.get("ts_event", row.get("ts_recv", idx))
            if hasattr(ts, "date"):
                row_date = ts.date()
            else:
                row_date = date.fromisoformat(str(ts)[:10])
            close = float(row["close"])
            if close > 1e6:  # Raw fixed-point int64 slipped through (pretty_px=False)
                close /= 1e9
            points.append(DailyClosePoint(date=row_date, close=close))
        return sorted(points, key=lambda p: p.date)

    def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        """Load daily closing prices from Databento (with caching)."""
        # Map the symbol to a dataset
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)
        key = DatabentoCacheKey(
            dataset=dataset,
            symbol=databento_symbol,
            schema=self.config.schema,
            start_date=start_date,
            end_date=end_date,
        )
        # Try the cache first
        cached = self._load_from_cache(key)
        if cached is not None:
            return cached
        # Fetch from Databento and cache the result
        df = self._fetch_from_databento(key)
        self._save_to_cache(key, df)
        return self._df_to_daily_points(df)

    def _resolve_dataset(self, symbol: str) -> str:
        """Resolve a symbol to a Databento dataset."""
        symbol_upper = symbol.upper()
        if symbol_upper in ("GLD", "GLDM", "IAU"):
            return "XNAS.BASIC"  # ETFs on Nasdaq
        elif symbol_upper in ("GC=F", "GC", "GOLD"):
            return "GLBX.MDP3"  # CME gold futures
        elif symbol_upper == "XAU":
            return "XNAS.BASIC"  # Treat as GLD proxy
        else:
            return self.config.dataset  # Use the configured default

    def _resolve_symbol(self, symbol: str) -> str:
        """Resolve a vault-dash symbol to a Databento symbol."""
        symbol_upper = symbol.upper()
        if symbol_upper == "XAU":
            return "GLD"  # Proxy XAU via GLD prices
        elif symbol_upper == "GC=F":
            return "GC"  # Use the parent symbol for continuous contracts
        return symbol_upper

    def get_cost_estimate(self, symbol: str, start_date: date, end_date: date) -> float:
        """Estimate the cost in USD of a data request."""
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)
        try:
            return self.client.metadata.get_cost(
                dataset=dataset,
                symbols=databento_symbol,
                schema=self.config.schema,
                start=start_date.isoformat(),
                end=(end_date + timedelta(days=1)).isoformat(),
            )
        except Exception:
            return 0.0  # Treat cost-estimation failures as $0


class DatabentoBacktestProvider:
    """Databento-backed historical provider for synthetic backtesting."""

    provider_id = "databento_v1"
    pricing_mode = "synthetic_bs_mid"

    def __init__(
        self,
        price_source: DatabentoHistoricalPriceSource,
        implied_volatility: float = 0.16,
        risk_free_rate: float = 0.045,
    ) -> None:
        self.price_source = price_source
        self.implied_volatility = implied_volatility
        self.risk_free_rate = risk_free_rate

    def load_history(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        return self.price_source.load_daily_closes(symbol, start_date, end_date)

    # ... rest delegates to SyntheticHistoricalProvider logic
```
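The caching behavior can be illustrated in isolation: identical request parameters always hash to the same file path, so an unchanged backtest run is served entirely from cache. A standalone sketch mirroring `DatabentoCacheKey.cache_path`:

```python
import hashlib
from datetime import date
from pathlib import Path


def cache_path(cache_dir: Path, dataset: str, symbol: str, schema: str,
               start: date, end: date) -> Path:
    """Deterministic cache file path, mirroring DatabentoCacheKey.cache_path."""
    key_str = f"{dataset}_{symbol}_{schema}_{start}_{end}"
    key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16]
    return cache_dir / f"dbn_{key_hash}.parquet"


a = cache_path(Path(".cache"), "XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
b = cache_path(Path(".cache"), "XNAS.BASIC", "GLD", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
c = cache_path(Path(".cache"), "GLBX.MDP3", "GC", "ohlcv-1d", date(2024, 1, 1), date(2024, 12, 31))
assert a == b and a != c  # same params → same file; different params → different file
```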
### Phase 2: Backtest Settings Model (DATA-DB-002)

File: `app/models/backtest_settings.py`
```python
from dataclasses import dataclass, field
from datetime import date
from uuid import UUID

from app.models.backtest import ProviderRef


@dataclass(frozen=True)
class BacktestSettings:
    """User-configurable backtest settings (independent of portfolio)."""

    # Scenario identification
    settings_id: UUID
    name: str

    # Data source configuration
    data_source: str = "databento"  # "databento", "yfinance", "synthetic"
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"

    # Date range
    start_date: date = date(2024, 1, 1)
    end_date: date = date(2024, 12, 31)

    # Independent scenario configuration (not derived from portfolio)
    underlying_symbol: str = "GLD"
    start_price: float = 0.0  # 0 = auto-derive from first close
    underlying_units: float = 1000.0  # Independent of portfolio
    loan_amount: float = 0.0  # Debt position for LTV analysis
    margin_call_ltv: float = 0.75

    # Templates to test
    template_slugs: tuple[str, ...] = field(default_factory=lambda: ("protective-put-atm-12m",))

    # Provider reference
    provider_ref: ProviderRef = field(default_factory=lambda: ProviderRef(
        provider_id="databento_v1",
        pricing_mode="synthetic_bs_mid",
    ))

    # Cache metadata
    cache_key: str = ""  # Populated when data is fetched
    data_cost_usd: float = 0.0  # Cost of the last data fetch
```
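The `start_price = 0.0` sentinel implies a small resolution step before each run. A hypothetical helper (`resolve_start_price` is illustrative, not a name from the codebase) showing the intended auto-derivation:

```python
def resolve_start_price(configured: float, closes: list[float]) -> float:
    """Return the configured start price, or the first historical close when 0."""
    if configured > 0:
        return configured
    if not closes:
        raise ValueError("no historical closes available to derive a start price")
    return closes[0]  # 0 = auto-derive from the first close in the window


assert resolve_start_price(150.0, [143.2, 144.0]) == 150.0  # explicit value wins
assert resolve_start_price(0.0, [143.2, 144.0]) == 143.2    # sentinel → first close
```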
### Phase 3: Cache Management (DATA-DB-003)

File: `app/services/backtesting/databento_cache.py`
```python
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path

from app.services.backtesting.databento_source import DatabentoCacheKey


@dataclass
class CacheEntry:
    """Metadata for a cached Databento dataset."""

    cache_key: DatabentoCacheKey
    file_path: Path
    download_date: date
    size_bytes: int
    cost_usd: float


class DatabentoCacheManager:
    """Manages the Databento data cache lifecycle."""

    def __init__(self, cache_dir: Path = Path(".cache/databento")) -> None:
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def list_entries(self) -> list[CacheEntry]:
        """List all cached entries."""
        entries = []
        for meta_file in self.cache_dir.glob("*_meta.json"):
            with open(meta_file) as f:
                meta = json.load(f)
            cache_file = meta_file.with_name(meta_file.stem.replace("_meta", "") + ".parquet")
            if cache_file.exists():
                entries.append(CacheEntry(
                    cache_key=DatabentoCacheKey(
                        dataset=meta["dataset"],
                        symbol=meta["symbol"],
                        schema=meta["schema"],
                        start_date=date.fromisoformat(meta["start_date"]),
                        end_date=date.fromisoformat(meta["end_date"]),
                    ),
                    file_path=cache_file,
                    download_date=date.fromisoformat(meta["download_date"]),
                    size_bytes=cache_file.stat().st_size,
                    cost_usd=0.0,  # Would need to be tracked separately
                ))
        return entries

    def invalidate_expired(self, max_age_days: int = 30) -> list[Path]:
        """Remove cache entries older than max_age_days."""
        removed = []
        cutoff = date.today() - timedelta(days=max_age_days)
        for entry in self.list_entries():
            if entry.download_date < cutoff:
                entry.file_path.unlink(missing_ok=True)
                meta_file = entry.file_path.with_name(entry.file_path.stem + "_meta.json")
                meta_file.unlink(missing_ok=True)
                removed.append(entry.file_path)
        return removed

    def clear_all(self) -> int:
        """Clear all cached data."""
        count = 0
        for file in self.cache_dir.glob("*"):
            if file.is_file():
                file.unlink()
                count += 1
        return count

    def get_cache_size(self) -> int:
        """Get the total cache size in bytes."""
        return sum(f.stat().st_size for f in self.cache_dir.glob("*") if f.is_file())

    def should_redownload(self, key: DatabentoCacheKey, params_changed: bool) -> bool:
        """Determine whether data should be re-downloaded."""
        cache_file = key.cache_path(self.cache_dir)
        meta_file = key.metadata_path(self.cache_dir)
        if params_changed:
            return True
        if not cache_file.exists() or not meta_file.exists():
            return True
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            download_date = date.fromisoformat(meta["download_date"])
            age_days = (date.today() - download_date).days
            return age_days > 30
        except Exception:
            return True
```
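The staleness rule used by both `should_redownload` and `_load_from_cache` reduces to a date comparison; a standalone sketch of just that check:

```python
from datetime import date, timedelta


def is_stale(download_date: date, today: date, max_age_days: int = 30) -> bool:
    """Mirror of the age check: older than max_age_days means re-download."""
    return (today - download_date).days > max_age_days


today = date(2024, 6, 30)
assert not is_stale(date(2024, 6, 15), today)            # 15 days old → keep
assert is_stale(date(2024, 5, 1), today)                 # 60 days old → re-download
assert not is_stale(today - timedelta(days=30), today)   # exactly 30 days → still fresh
```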
### Phase 4: Backtest Page UI Updates (DATA-DB-004)

Key changes to `app/pages/backtests.py`:

- Add a Databento configuration section
- Add independent start price/units inputs
- Show the estimated data cost before fetching
- Add a cache status indicator
```python
# In backtests.py
with ui.card().classes("w-full ..."):
    ui.label("Data Source").classes("text-lg font-semibold")
    data_source = ui.select(
        {"databento": "Databento (historical market data)", "yfinance": "Yahoo Finance (free, limited)"},
        value="databento",
        label="Data source",
    ).classes("w-full")

    # Databento-specific settings
    with ui.column().classes("w-full gap-2").bind_visibility_from(data_source, "value", lambda v: v == "databento"):
        ui.label("Dataset configuration").classes("text-sm text-slate-500")
        dataset_select = ui.select(
            {"XNAS.BASIC": "Nasdaq Basic (GLD)", "GLBX.MDP3": "CME Globex (GC=F)"},
            value="XNAS.BASIC",
            label="Dataset",
        ).classes("w-full")
        schema_select = ui.select(
            {"ohlcv-1d": "Daily bars", "ohlcv-1h": "Hourly bars"},
            value="ohlcv-1d",
            label="Resolution",
        ).classes("w-full")
        # Cost estimate
        cost_label = ui.label("Estimated cost: $0.00").classes("text-sm text-slate-500")
        # Cache status
        cache_status = ui.label("").classes("text-xs text-slate-400")

# Independent scenario settings
with ui.card().classes("w-full ..."):
    ui.label("Scenario Configuration").classes("text-lg font-semibold")
    ui.label("Configure start values independent of portfolio settings").classes("text-sm text-slate-500")
    start_price_input = ui.number(
        "Start price",
        value=0.0,
        min=0.0,
        step=0.01,
    ).classes("w-full")
    ui.label("Set to 0 to auto-derive from first historical close").classes("text-xs text-slate-400 -mt-2")
    underlying_units_input = ui.number(
        "Underlying units",
        value=1000.0,
        min=0.0001,
        step=0.0001,
    ).classes("w-full")
    loan_amount_input = ui.number(
        "Loan amount ($)",
        value=0.0,
        min=0.0,
        step=1000,
    ).classes("w-full")
```
### Phase 5: Scenario Pre-Seeding (DATA-DB-005)

File: `app/services/backtesting/scenario_bulk_download.py`
```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from pathlib import Path

from app.services.backtesting.databento_source import DatabentoCacheKey

try:
    import databento as db

    DATABENTO_AVAILABLE = True
except ImportError:
    DATABENTO_AVAILABLE = False


@dataclass
class ScenarioPreset:
    """Pre-configured scenario ready for backtesting."""

    preset_id: str
    display_name: str
    symbol: str
    dataset: str
    window_start: date
    window_end: date
    default_start_price: float  # First close in window
    default_templates: tuple[str, ...]
    event_type: str
    tags: tuple[str, ...]
    description: str


def download_historical_presets(
    client: db.Historical,
    presets: list[ScenarioPreset],
    output_dir: Path,
) -> dict[str, Path]:
    """Bulk download historical data for all presets.

    Returns a mapping of preset_id to cached file path.
    """
    results = {}
    for preset in presets:
        cache_key = DatabentoCacheKey(
            dataset=preset.dataset,
            symbol=preset.symbol,
            schema="ohlcv-1d",
            start_date=preset.window_start,
            end_date=preset.window_end,
        )
        cache_file = cache_key.cache_path(output_dir)
        # Download only if not already cached
        if not cache_file.exists():
            data = client.timeseries.get_range(
                dataset=preset.dataset,
                symbols=preset.symbol,
                schema="ohlcv-1d",
                start=preset.window_start.isoformat(),
                end=(preset.window_end + timedelta(days=1)).isoformat(),  # End is exclusive
            )
            # Store as parquet in the same format Phase 1 reads back
            data.to_df().to_parquet(cache_file)
        results[preset.preset_id] = cache_file
    return results


def create_default_presets() -> list[ScenarioPreset]:
    """Create default scenario presets for gold hedging research."""
    return [
        ScenarioPreset(
            preset_id="gld-2020-covid-crash",
            display_name="GLD March 2020 COVID Crash",
            symbol="GLD",
            dataset="XNAS.BASIC",
            window_start=date(2020, 2, 15),
            window_end=date(2020, 4, 15),
            default_start_price=143.0,  # Approx. GLD close on 2020-02-15
            default_templates=("protective-put-atm-12m", "protective-put-95pct-12m"),
            event_type="crash",
            tags=("covid", "crash", "high-vol"),
            description="March 2020 COVID market crash - extreme volatility event",
        ),
        ScenarioPreset(
            preset_id="gld-2022-rate-hike-cycle",
            display_name="GLD 2022 Rate Hike Cycle",
            symbol="GLD",
            dataset="XNAS.BASIC",
            window_start=date(2022, 1, 1),
            window_end=date(2022, 12, 31),
            default_start_price=168.0,
            default_templates=("protective-put-atm-12m", "ladder-50-50-atm-95pct-12m"),
            event_type="rate_cycle",
            tags=("rates", "fed", "extended"),
            description="Full year 2022 - aggressive Fed rate hikes",
        ),
        ScenarioPreset(
            preset_id="gcf-2024-rally",
            display_name="GC=F 2024 Gold Rally",
            symbol="GC",
            dataset="GLBX.MDP3",
            window_start=date(2024, 1, 1),
            window_end=date(2024, 12, 31),
            default_start_price=2060.0,
            default_templates=("protective-put-atm-12m",),
            event_type="rally",
            tags=("gold", "futures", "rally"),
            description="Gold futures rally in 2024",
        ),
    ]
```
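Once presets exist, the comparison page will need to select among them. A hypothetical tag-lookup helper (the trimmed `_Preset` stand-in is illustrative, not the real `ScenarioPreset`):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class _Preset:  # Trimmed stand-in for ScenarioPreset
    preset_id: str
    tags: tuple[str, ...]


def presets_with_tag(presets: list[_Preset], tag: str) -> list[_Preset]:
    """Return presets carrying the given tag (hypothetical helper)."""
    return [p for p in presets if tag in p.tags]


catalog = [
    _Preset("gld-2020-covid-crash", ("covid", "crash", "high-vol")),
    _Preset("gld-2022-rate-hike-cycle", ("rates", "fed", "extended")),
    _Preset("gcf-2024-rally", ("gold", "futures", "rally")),
]
assert [p.preset_id for p in presets_with_tag(catalog, "crash")] == ["gld-2020-covid-crash"]
```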
### Phase 6: Settings Persistence (DATA-DB-006)

File: `app/models/backtest_settings_repository.py`
```python
import json
from dataclasses import asdict
from datetime import date
from pathlib import Path
from uuid import UUID, uuid4

from app.models.backtest_settings import BacktestSettings


class BacktestSettingsRepository:
    """Persistence for backtest settings."""

    def __init__(self, base_path: Path | None = None) -> None:
        self.base_path = base_path or Path(".workspaces")

    def _settings_path(self, workspace_id: str) -> Path:
        return self.base_path / workspace_id / "backtest_settings.json"

    def load(self, workspace_id: str) -> BacktestSettings:
        """Load backtest settings, creating defaults if not found."""
        path = self._settings_path(workspace_id)
        if path.exists():
            with open(path) as f:
                data = json.load(f)
            return BacktestSettings(
                settings_id=UUID(data["settings_id"]),
                name=data.get("name", "Default Backtest"),
                data_source=data.get("data_source", "databento"),
                dataset=data.get("dataset", "XNAS.BASIC"),
                schema=data.get("schema", "ohlcv-1d"),
                start_date=date.fromisoformat(data["start_date"]),
                end_date=date.fromisoformat(data["end_date"]),
                underlying_symbol=data.get("underlying_symbol", "GLD"),
                start_price=data.get("start_price", 0.0),
                underlying_units=data.get("underlying_units", 1000.0),
                loan_amount=data.get("loan_amount", 0.0),
                margin_call_ltv=data.get("margin_call_ltv", 0.75),
                template_slugs=tuple(data.get("template_slugs", ("protective-put-atm-12m",))),
                cache_key=data.get("cache_key", ""),
                data_cost_usd=data.get("data_cost_usd", 0.0),
            )
        # Return defaults
        return BacktestSettings(
            settings_id=uuid4(),
            name="Default Backtest",
        )

    def save(self, workspace_id: str, settings: BacktestSettings) -> None:
        """Persist backtest settings."""
        path = self._settings_path(workspace_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        data = asdict(settings)
        # JSON cannot serialize UUID, date, or tuple values directly
        data["settings_id"] = str(data["settings_id"])
        data["start_date"] = data["start_date"].isoformat()
        data["end_date"] = data["end_date"].isoformat()
        data["template_slugs"] = list(data["template_slugs"])
        data["provider_ref"] = {
            "provider_id": settings.provider_ref.provider_id,
            "pricing_mode": settings.provider_ref.pricing_mode,
        }
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
```
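The repository's main subtlety is that `json` cannot serialize `UUID` or `date` values directly. A minimal standalone round-trip showing the conversions it applies:

```python
import json
from datetime import date
from uuid import uuid4

settings_id = uuid4()
payload = {
    "settings_id": str(settings_id),                      # UUID → str
    "start_date": date(2024, 1, 1).isoformat(),           # date → ISO-8601 str
    "end_date": date(2024, 12, 31).isoformat(),
    "template_slugs": ["protective-put-atm-12m"],         # tuple → list on save
}
restored = json.loads(json.dumps(payload))
assert date.fromisoformat(restored["start_date"]) == date(2024, 1, 1)
assert restored["settings_id"] == str(settings_id)
```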
## Roadmap Items

### DATA-DB-001: Databento Historical Price Source

Dependencies: None
Estimated effort: 2-3 days
Deliverables:

- `app/services/backtesting/databento_source.py`
- `tests/test_databento_source.py` (mocked API)
- Environment variable `DATABENTO_API_KEY` support

### DATA-DB-002: Backtest Settings Model

Dependencies: None
Estimated effort: 1 day
Deliverables:

- `app/models/backtest_settings.py`
- Repository for persistence

### DATA-DB-003: Cache Management

Dependencies: DATA-DB-001
Estimated effort: 1 day
Deliverables:

- `app/services/backtesting/databento_cache.py`
- Cache cleanup CLI command

### DATA-DB-004: Backtest Page UI Updates

Dependencies: DATA-DB-001, DATA-DB-002
Estimated effort: 2 days
Deliverables:

- Updated `app/pages/backtests.py`
- Updated `app/pages/event_comparison.py`
- Cost estimation display

### DATA-DB-005: Scenario Pre-Seeding

Dependencies: DATA-DB-001
Estimated effort: 1-2 days
Deliverables:

- `app/services/backtesting/scenario_bulk_download.py`
- Pre-configured presets for gold hedging research
- Bulk download script

### DATA-DB-006: Options Data Source (Future)

Dependencies: DATA-DB-001
Estimated effort: 3-5 days
Deliverables:

- `DatabentoOptionSnapshotSource` implementing `OptionSnapshotSource`
- OPRA.PILLAR integration for historical options chains
## Configuration

Add to `.env`:

```
DATABENTO_API_KEY=db-xxxxxxxxxxxxxxxxxxxxxxxx
```

Add to `requirements.txt`:

```
databento>=0.30.0
```

Add to `pyproject.toml`:

```toml
[project.optional-dependencies]
databento = ["databento>=0.30.0"]
```
## Testing Strategy

- Unit tests with mocked Databento responses (`tests/test_databento_source.py`)
- Integration tests with recorded VCR cassettes (`tests/cassettes/*.yaml`)
- E2E tests using cached data (`tests/test_backtest_databento_playwright.py`)
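Because the price source is defined against a structural protocol, the mocked unit tests can avoid the network entirely with a test double. An illustrative pattern (the `DailyClosePoint` here is a stand-in for the app's dataclass):

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class DailyClosePoint:  # Stand-in for the app's dataclass
    date: date
    close: float


class StubPriceSource:
    """Test double returning canned closes; no network access required."""

    def __init__(self, closes: list[DailyClosePoint]) -> None:
        self._closes = closes

    def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        return [p for p in self._closes if start_date <= p.date <= end_date]


stub = StubPriceSource([
    DailyClosePoint(date(2024, 1, 2), 184.2),
    DailyClosePoint(date(2024, 1, 3), 185.0),
    DailyClosePoint(date(2024, 2, 1), 190.5),
])
jan = stub.load_daily_closes("GLD", date(2024, 1, 1), date(2024, 1, 31))
assert [p.close for p in jan] == [184.2, 185.0]  # only January bars returned
```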
## Cost Management

- Use `metadata.get_cost()` before fetching to show the estimated cost
- Default to cached data when available
- Batch downloads for large historical ranges (>1 year)
- Consider Databento flat-rate plans for heavy usage
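The first two points combine into a simple cost gate before any fetch. A sketch under stated assumptions (`fetch_with_cost_guard` is a hypothetical helper; the two callables stand in for `client.metadata.get_cost` and `client.timeseries.get_range`):

```python
def fetch_with_cost_guard(estimate_cost, fetch, max_cost_usd: float = 5.0):
    """Call fetch() only when the estimated cost is within budget (sketch)."""
    cost = estimate_cost()
    if cost > max_cost_usd:
        raise RuntimeError(f"Estimated cost ${cost:.2f} exceeds budget ${max_cost_usd:.2f}")
    return fetch()


# Stub callables standing in for the real Databento client calls:
result = fetch_with_cost_guard(lambda: 0.42, lambda: "data")
assert result == "data"

blocked = False
try:
    fetch_with_cost_guard(lambda: 9.99, lambda: "data", max_cost_usd=5.0)
except RuntimeError:
    blocked = True
assert blocked  # over-budget request never reaches the API
```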
## Security Considerations

- API key stored in an environment variable, never in code
- Cache files contain only market data (no PII)
- Rate limits respected (100 requests/second per IP)