- Extend DailyClosePoint to include low, high, open (optional)
- Update Databento source to extract OHLC data from the ohlcv-1d schema
- Update YFinance source to extract Low, High, Open from history
- Modify backtest engine to use the worst-case (low) price for margin call detection

This ensures margin calls are evaluated at the day's worst price, not just the closing price, providing a more realistic risk assessment.
343 lines
12 KiB
Python
343 lines
12 KiB
Python
"""Databento historical price source for backtesting."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import date, timedelta
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
logger = logging.getLogger(__name__)

# databento (and the pandas it relies on) is an optional dependency: when
# either import fails, the module still loads and exposes None placeholders
# plus a False availability flag so callers can degrade gracefully.
try:
    import databento as db
    import pandas as pd
except ImportError:
    db = None
    pd = None
    DATABENTO_AVAILABLE = False
else:
    DATABENTO_AVAILABLE = True
|
|
|
|
|
|
@dataclass
class DatabentoSourceConfig:
    """Settings controlling how Databento data is requested and cached.

    ``cache_dir`` may be supplied as a plain string; it is converted to a
    ``Path`` after initialisation so the rest of the code can rely on it.
    """

    # API credentials; per the original note, None falls back to the
    # DATABENTO_API_KEY environment variable.
    api_key: str | None = None
    # Directory holding the cached parquet/metadata files.
    cache_dir: Path = Path(".cache/databento")
    # Default dataset, request schema and input symbology type.
    dataset: str = "XNAS.BASIC"
    schema: str = "ohlcv-1d"
    stype_in: str = "raw_symbol"

    # Cached entries older than this many days are re-downloaded.
    max_cache_age_days: int = 30

    def __post_init__(self) -> None:
        """Coerce a string ``cache_dir`` into a ``Path``."""
        if isinstance(self.cache_dir, str):
            self.cache_dir = Path(self.cache_dir)
|
|
|
|
|
|
@dataclass(frozen=True)
class DatabentoCacheKey:
    """Immutable identity of one cached Databento request.

    The five fields are folded into a short SHA-256 digest that names both
    the parquet data file and its JSON metadata side-car.
    """

    dataset: str
    symbol: str
    schema: str
    start_date: date
    end_date: date

    def _digest(self) -> str:
        """Return the first 16 hex characters of the key's SHA-256 hash."""
        identity = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}"
        return hashlib.sha256(identity.encode()).hexdigest()[:16]

    def cache_path(self, cache_dir: Path) -> Path:
        """Return the parquet data file location inside ``cache_dir``."""
        filename = f"dbn_{self._digest()}.parquet"
        return cache_dir / filename

    def metadata_path(self, cache_dir: Path) -> Path:
        """Return the JSON metadata file location inside ``cache_dir``."""
        filename = f"dbn_{self._digest()}_meta.json"
        return cache_dir / filename
|
|
|
|
|
|
class DatabentoHistoricalPriceSource:
    """Databento-based historical price source for backtesting.

    This provider fetches historical daily OHLCV data from Databento's API
    and caches it locally to minimize API calls and costs.

    Key features:
    - Smart caching with configurable age threshold
    - Cost estimation via ``get_cost_estimate``
    - Symbol-to-dataset resolution (GLD→XNAS.BASIC, GC=F→GLBX.MDP3)
    - Parquet storage for fast loading

    Example usage:
        source = DatabentoHistoricalPriceSource()
        prices = source.load_daily_closes("GLD", date(2024, 1, 1), date(2024, 1, 31))
    """

    # Upper-cased input symbol -> Databento dataset. Anything not listed
    # falls back to ``config.dataset``.
    _DATASET_BY_SYMBOL: dict[str, str] = {
        "GLD": "XNAS.BASIC",  # ETFs on Nasdaq
        "GLDM": "XNAS.BASIC",
        "IAU": "XNAS.BASIC",
        "GC=F": "GLBX.MDP3",  # CME gold futures
        "GC": "GLBX.MDP3",
        "GOLD": "GLBX.MDP3",
        "XAU": "XNAS.BASIC",  # Treat as GLD proxy
    }

    # Upper-cased input symbol -> symbol actually requested from Databento.
    # NOTE(review): "GOLD" is routed to GLBX.MDP3 above but is not rewritten
    # here, so it is requested verbatim — confirm that is intended.
    _SYMBOL_ALIASES: dict[str, str] = {
        "XAU": "GLD",  # Proxy XAU via GLD prices
        "GC=F": "GC",  # Use parent symbol for continuous contracts
    }

    def __init__(self, config: DatabentoSourceConfig | None = None) -> None:
        """Create the source and make sure the cache directory exists.

        Args:
            config: Source configuration; a default one is built when omitted.

        Raises:
            RuntimeError: If the optional ``databento`` dependency is missing.
        """
        if not DATABENTO_AVAILABLE:
            raise RuntimeError("databento package required: pip install databento>=0.30.0")

        self.config = config or DatabentoSourceConfig()
        self.config.cache_dir.mkdir(parents=True, exist_ok=True)
        self._client: Any = None  # db.Historical, created lazily

    @property
    def client(self) -> Any:
        """Get or create the Databento historical client.

        Raises:
            RuntimeError: If the ``databento`` package is not installed.
        """
        if self._client is None:
            if db is None:
                raise RuntimeError("databento package not installed")
            self._client = db.Historical(key=self.config.api_key)
        return self._client

    def _load_from_cache(self, key: DatabentoCacheKey) -> list[Any] | None:
        """Load cached points for ``key`` if the entry exists, matches and is fresh.

        Returns:
            The cached points, or ``None`` when there is no usable entry
            (missing files, mismatching metadata, stale, or unreadable).
        """
        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)

        if not cache_file.exists() or not meta_file.exists():
            return None

        try:
            with open(meta_file, encoding="utf-8") as f:
                meta = json.load(f)

            # The file name is only a truncated hash, so confirm the recorded
            # request really is the one being asked for.
            if meta.get("dataset") != key.dataset or meta.get("symbol") != key.symbol:
                return None
            if (
                meta.get("start_date") != key.start_date.isoformat()
                or meta.get("end_date") != key.end_date.isoformat()
            ):
                return None

            cache_age = (date.today() - date.fromisoformat(meta["download_date"])).days
            if cache_age > self.config.max_cache_age_days:
                return None

            df = pd.read_parquet(cache_file)
            return self._df_to_daily_points(df)
        except Exception:
            # The cache is best-effort: a corrupt entry must not break the
            # caller, but it should be visible instead of silently ignored.
            logger.warning("Ignoring unreadable Databento cache entry %s", cache_file, exc_info=True)
            return None

    def _save_to_cache(self, key: DatabentoCacheKey, df: Any, cost_usd: float = 0.0) -> None:
        """Write ``df`` and a JSON metadata side-car to the cache directory.

        Args:
            key: Cache key identifying the request.
            df: DataFrame returned by the Databento client.
            cost_usd: Estimated cost of the request, stored for reporting.
        """
        if pd is None:
            return

        cache_file = key.cache_path(self.config.cache_dir)
        meta_file = key.metadata_path(self.config.cache_dir)

        # Keep the DataFrame index: Databento's ``to_df()`` carries the bar
        # timestamp there, and dropping it would leave the cached rows
        # without dates, making every later cache load fail.
        df.to_parquet(cache_file)

        meta = {
            "download_date": date.today().isoformat(),
            "dataset": key.dataset,
            "symbol": key.symbol,
            "schema": key.schema,
            "start_date": key.start_date.isoformat(),
            "end_date": key.end_date.isoformat(),
            "rows": len(df),
            "cost_usd": cost_usd,
        }
        with open(meta_file, "w", encoding="utf-8") as f:
            json.dump(meta, f, indent=2)

    def _fetch_from_databento(self, key: DatabentoCacheKey) -> Any:
        """Fetch the data described by ``key`` from the Databento API as a DataFrame."""
        data = self.client.timeseries.get_range(
            dataset=key.dataset,
            symbols=key.symbol,
            schema=key.schema,
            start=key.start_date.isoformat(),
            end=(key.end_date + timedelta(days=1)).isoformat(),  # Exclusive end
            stype_in=self.config.stype_in,
        )
        return data.to_df()

    def _df_to_daily_points(self, df: Any) -> list[Any]:
        """Convert a DataFrame to a date-sorted DailyClosePoint list with OHLC data.

        Rows whose close is missing or not positive are skipped; missing
        low/high/open values are passed through as ``None``.
        """
        # Imported lazily so this module can be loaded without the provider module.
        from app.services.backtesting.historical_provider import DailyClosePoint

        if pd is None:
            return []

        def parse_price(raw_val: Any) -> float | None:
            """Parse a price that may be fixed-point (scaled by 1e9) or already decimal."""
            if raw_val is None or (isinstance(raw_val, float) and pd.isna(raw_val)):
                return None
            if isinstance(raw_val, (int, float)):
                # Heuristic: values above 1e9 are taken to be unscaled
                # fixed-point integers; anything else is already a price.
                return float(raw_val) / 1e9 if raw_val > 1e9 else float(raw_val)
            return float(raw_val) if raw_val else None

        points = []
        for idx, row in df.iterrows():
            # The timestamp may be a column (ts_event / ts_recv) or the index.
            ts = row.get("ts_event", row.get("ts_recv", idx))
            if hasattr(ts, "date"):
                row_date = ts.date()
            else:
                # Parse the leading YYYY-MM-DD of an ISO date string
                row_date = date.fromisoformat(str(ts)[:10])

            close = parse_price(row.get("close"))
            if not close or close <= 0:
                continue

            points.append(
                DailyClosePoint(
                    date=row_date,
                    close=close,
                    low=parse_price(row.get("low")),
                    high=parse_price(row.get("high")),
                    open=parse_price(row.get("open")),
                )
            )

        return sorted(points, key=lambda p: p.date)

    # Kept at class level as in the original so the DailyClosePoint name used
    # in the annotation below stays resolvable.
    from app.services.backtesting.historical_provider import DailyClosePoint

    def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
        """Load daily closing prices from Databento (with caching).

        Args:
            symbol: Trading symbol (GLD, GC=F, XAU)
            start_date: Inclusive start date
            end_date: Inclusive end date

        Returns:
            List of DailyClosePoint sorted by date
        """
        # Map symbols to datasets
        key = DatabentoCacheKey(
            dataset=self._resolve_dataset(symbol),
            symbol=self._resolve_symbol(symbol),
            schema=self.config.schema,
            start_date=start_date,
            end_date=end_date,
        )

        # Try cache first
        cached = self._load_from_cache(key)
        if cached is not None:
            return cached

        # Fetch from Databento
        df = self._fetch_from_databento(key)

        # Cost is informational only (stored in the cache metadata).
        try:
            cost_usd = self.get_cost_estimate(symbol, start_date, end_date)
        except Exception:
            cost_usd = 0.0

        # Caching is best-effort: the data has already been fetched, so a
        # failed write must not throw it away.
        try:
            self._save_to_cache(key, df, cost_usd)
        except Exception:
            logger.warning("Could not write Databento cache entry for %s", key.symbol, exc_info=True)

        return self._df_to_daily_points(df)

    def _resolve_dataset(self, symbol: str) -> str:
        """Resolve symbol to Databento dataset, defaulting to the configured one."""
        return self._DATASET_BY_SYMBOL.get(symbol.upper(), self.config.dataset)

    def _resolve_symbol(self, symbol: str) -> str:
        """Resolve vault-dash symbol to Databento symbol (upper-cased)."""
        symbol_upper = symbol.upper()
        return self._SYMBOL_ALIASES.get(symbol_upper, symbol_upper)

    def get_cost_estimate(self, symbol: str, start_date: date, end_date: date) -> float:
        """Estimate cost in USD for a data request.

        Args:
            symbol: Trading symbol
            start_date: Start date
            end_date: End date

        Returns:
            Estimated cost in USD, or 0.0 if the estimate could not be obtained
        """
        dataset = self._resolve_dataset(symbol)
        databento_symbol = self._resolve_symbol(symbol)

        try:
            cost = self.client.metadata.get_cost(
                dataset=dataset,
                symbols=databento_symbol,
                schema=self.config.schema,
                start=start_date.isoformat(),
                end=(end_date + timedelta(days=1)).isoformat(),
            )
            return float(cost)
        except Exception:
            logger.debug("Databento cost estimate failed for %s", databento_symbol, exc_info=True)
            return 0.0  # Return 0 if cost estimation fails

    def get_available_range(self, symbol: str) -> tuple[date | None, date | None]:
        """Get the available date range for a symbol from Databento.

        Args:
            symbol: Trading symbol

        Returns:
            Tuple of (start_date, end_date) or (None, None) if unavailable
        """
        # Note: Databento availability depends on the dataset
        # For now, return None to indicate we should try fetching
        return None, None

    def get_cache_stats(self) -> dict[str, Any]:
        """Get cache statistics.

        Returns:
            A dict with ``status`` ("populated" or "empty") and ``entries``,
            one summary per readable metadata file in the cache directory.
        """
        cache_dir = self.config.cache_dir
        if not cache_dir.exists():
            return {"status": "empty", "entries": []}

        entries = []
        # Sorted so the report order does not depend on directory listing order.
        for meta_file in sorted(cache_dir.glob("*_meta.json")):
            try:
                with open(meta_file, encoding="utf-8") as f:
                    meta = json.load(f)
                entries.append(
                    {
                        "symbol": meta.get("symbol"),
                        "dataset": meta.get("dataset"),
                        "start_date": meta.get("start_date"),
                        "end_date": meta.get("end_date"),
                        "download_date": meta.get("download_date"),
                        "rows": meta.get("rows"),
                        "cost_usd": meta.get("cost_usd"),
                    }
                )
            except Exception:
                # Skip unreadable metadata but leave a trace for debugging.
                logger.debug("Skipping unreadable cache metadata %s", meta_file, exc_info=True)
                continue

        return {"status": "populated" if entries else "empty", "entries": entries}
|