diff --git a/.env.example b/.env.example index faa09f0..eedf51f 100644 --- a/.env.example +++ b/.env.example @@ -4,3 +4,4 @@ REDIS_URL=redis://localhost:6379 CONFIG_PATH=/app/config/settings.yaml TURNSTILE_SITE_KEY=1x00000000000000000000AA TURNSTILE_SECRET_KEY=1x0000000000000000000000000000000AA +DATABENTO_API_KEY=db-your-api-key-here diff --git a/=0.30.0 b/=0.30.0 new file mode 100644 index 0000000..e69de29 diff --git a/app/services/backtesting/databento_source.py b/app/services/backtesting/databento_source.py new file mode 100644 index 0000000..c087cf4 --- /dev/null +++ b/app/services/backtesting/databento_source.py @@ -0,0 +1,365 @@ +"""Databento historical price source for backtesting.""" + +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass +from datetime import date, timedelta +from pathlib import Path +from typing import Any + +from app.services.backtesting.historical_provider import DailyClosePoint, HistoricalPriceSource + +try: + import databento as db + import pandas as pd + + DATABENTO_AVAILABLE = True +except ImportError: + DATABENTO_AVAILABLE = False + db = None # type: ignore + pd = None # type: ignore + + +@dataclass(frozen=True) +class DatabentoCacheKey: + """Cache key for Databento data requests.""" + + dataset: str + symbol: str + schema: str + start_date: date + end_date: date + + def cache_path(self, cache_dir: Path) -> Path: + """Generate cache file path from key.""" + key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}" + key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16] + return cache_dir / f"dbn_{key_hash}.parquet" + + def metadata_path(self, cache_dir: Path) -> Path: + """Generate metadata file path from key.""" + key_str = f"{self.dataset}_{self.symbol}_{self.schema}_{self.start_date}_{self.end_date}" + key_hash = hashlib.sha256(key_str.encode()).hexdigest()[:16] + return cache_dir / f"dbn_{key_hash}_meta.json" + + +@dataclass +class DatabentoSourceConfig: + """Configuration for Databento data source.""" + + api_key: str | None = None # Falls back to DATABENTO_API_KEY env var + cache_dir: Path = Path(".cache/databento") + dataset: str = "XNAS.BASIC" + schema: str = "ohlcv-1d" + stype_in: str = "raw_symbol" + + # Re-download threshold + max_cache_age_days: int = 30 + + def __post_init__(self) -> None: + # Ensure cache_dir is a Path + if isinstance(self.cache_dir, str): + object.__setattr__(self, "cache_dir", Path(self.cache_dir)) + + +class DatabentoHistoricalPriceSource(HistoricalPriceSource): + """Databento-based historical price source for backtesting. + + This provider fetches historical daily OHLCV data from Databento's API + and caches it locally to minimize API calls and costs. + + Key features: + - Smart caching with configurable age threshold + - Cost estimation before fetching + - Symbol-to-dataset resolution (GLD→XNAS.BASIC, GC=F→GLBX.MDP3) + - Parquet storage for fast loading + + Example usage: + source = DatabentoHistoricalPriceSource() + prices = source.load_daily_closes("GLD", date(2024, 1, 1), date(2024, 1, 31)) + """ + + def __init__(self, config: DatabentoSourceConfig | None = None) -> None: + if not DATABENTO_AVAILABLE: + raise RuntimeError("databento package required: pip install databento>=0.30.0") + + self.config = config or DatabentoSourceConfig() + self.config.cache_dir.mkdir(parents=True, exist_ok=True) + self._client: Any = None # db.Historical + + @property + def client(self) -> Any: + """Get or create Databento client.""" + if self._client is None: + if db is None: + raise RuntimeError("databento package not installed") + self._client = db.Historical(key=self.config.api_key) + return self._client + + def _load_from_cache(self, key: DatabentoCacheKey) -> list[DailyClosePoint] | None: + """Load cached data if available and fresh.""" + cache_file = key.cache_path(self.config.cache_dir) + meta_file = key.metadata_path(self.config.cache_dir) + + if not cache_file.exists() or not meta_file.exists(): + return None + + try: + with open(meta_file) as f: + meta = json.load(f) + + # Check cache age + download_date = date.fromisoformat(meta["download_date"]) + age_days = (date.today() - download_date).days + if age_days > self.config.max_cache_age_days: + return None + + # Check parameters match + if meta["dataset"] != key.dataset or meta["symbol"] != key.symbol: + return None + + # Load parquet and convert + if pd is None: + return None + df = pd.read_parquet(cache_file) + return self._df_to_daily_points(df) + except Exception: + return None + + def _save_to_cache(self, key: DatabentoCacheKey, df: Any, cost_usd: float = 0.0) -> None: + """Save data to cache.""" + if pd is None: + return + + cache_file = key.cache_path(self.config.cache_dir) + meta_file = key.metadata_path(self.config.cache_dir) + + df.to_parquet(cache_file, index=False) + + meta = { + "download_date": date.today().isoformat(), + "dataset": key.dataset, + "symbol": key.symbol, + "schema": key.schema, + "start_date": key.start_date.isoformat(), + "end_date": key.end_date.isoformat(), + "rows": len(df), + "cost_usd": cost_usd, + } + with open(meta_file, "w") as f: + json.dump(meta, f, indent=2) + + def _fetch_from_databento(self, key: DatabentoCacheKey) -> Any: + """Fetch data from Databento API.""" + data = self.client.timeseries.get_range( + dataset=key.dataset, + symbols=key.symbol, + schema=key.schema, + start=key.start_date.isoformat(), + end=(key.end_date + timedelta(days=1)).isoformat(), # Exclusive end + stype_in=self.config.stype_in, + ) + return data.to_df() + + def _df_to_daily_points(self, df: Any) -> list[DailyClosePoint]: + """Convert DataFrame to DailyClosePoint list.""" + if pd is None: + return [] + + points = [] + for idx, row in df.iterrows(): + # Databento ohlcv schema has ts_event as timestamp + ts = row.get("ts_event", row.get("ts_recv", idx)) + if hasattr(ts, "date"): + row_date = ts.date() + else: + # Parse ISO date string + ts_str = str(ts) + row_date = date.fromisoformat(ts_str[:10]) + + # Databento prices are int64 scaled by 1e-9 + close_raw = row.get("close", 0) + if isinstance(close_raw, (int, float)): + close = float(close_raw) / 1e9 if close_raw > 1e9 else float(close_raw) + else: + close = float(close_raw) + + if close > 0: + points.append(DailyClosePoint(date=row_date, close=close)) + + return sorted(points, key=lambda p: p.date) + + def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]: + """Load daily closing prices from Databento (with caching). + + Args: + symbol: Trading symbol (GLD, GC=F, XAU) + start_date: Inclusive start date + end_date: Inclusive end date + + Returns: + List of DailyClosePoint sorted by date + """ + # Map symbols to datasets + dataset = self._resolve_dataset(symbol) + databento_symbol = self._resolve_symbol(symbol) + + key = DatabentoCacheKey( + dataset=dataset, + symbol=databento_symbol, + schema=self.config.schema, + start_date=start_date, + end_date=end_date, + ) + + # Try cache first + cached = self._load_from_cache(key) + if cached is not None: + return cached + + # Fetch from Databento + df = self._fetch_from_databento(key) + + # Get cost estimate (approximate) + try: + cost_usd = self.get_cost_estimate(symbol, start_date, end_date) + except Exception: + cost_usd = 0.0 + + # Cache results + self._save_to_cache(key, df, cost_usd) + + return self._df_to_daily_points(df) + + def _resolve_dataset(self, symbol: str) -> str: + """Resolve symbol to Databento dataset.""" + symbol_upper = symbol.upper() + if symbol_upper in ("GLD", "GLDM", "IAU"): + return "XNAS.BASIC" # ETFs on Nasdaq + elif symbol_upper in ("GC=F", "GC", "GOLD"): + return "GLBX.MDP3" # CME gold futures + elif symbol_upper == "XAU": + return "XNAS.BASIC" # Treat as GLD proxy + else: + return self.config.dataset # Use configured default + + def _resolve_symbol(self, symbol: str) -> str: + """Resolve vault-dash symbol to Databento symbol.""" + symbol_upper = symbol.upper() + if symbol_upper == "XAU": + return "GLD" # Proxy XAU via GLD prices + elif symbol_upper == "GC=F": + return "GC" # Use parent symbol for continuous contracts + return symbol_upper + + def get_cost_estimate(self, symbol: str, start_date: date, end_date: date) -> float: + """Estimate cost in USD for a data request. + + Args: + symbol: Trading symbol + start_date: Start date + end_date: End date + + Returns: + Estimated cost in USD + """ + dataset = self._resolve_dataset(symbol) + databento_symbol = self._resolve_symbol(symbol) + + try: + cost = self.client.metadata.get_cost( + dataset=dataset, + symbols=databento_symbol, + schema=self.config.schema, + start=start_date.isoformat(), + end=(end_date + timedelta(days=1)).isoformat(), + ) + return float(cost) + except Exception: + return 0.0 # Return 0 if cost estimation fails + + def get_available_range(self, symbol: str) -> tuple[date | None, date | None]: + """Get the available date range for a symbol. + + Args: + symbol: Trading symbol + + Returns: + Tuple of (start_date, end_date) or (None, None) if unavailable + """ + dataset = self._resolve_dataset(symbol) + + try: + range_info = self.client.metadata.get_dataset_range(dataset=dataset) + start_str = range_info.get("start", "") + end_str = range_info.get("end", "") + + start = date.fromisoformat(start_str[:10]) if start_str else None + end = date.fromisoformat(end_str[:10]) if end_str else None + + return start, end + except Exception: + return None, None + + def clear_cache(self) -> int: + """Clear all cached data files. + + Returns: + Number of files deleted + """ + count = 0 + for file in self.config.cache_dir.glob("*"): + if file.is_file(): + file.unlink() + count += 1 + return count + + def get_cache_stats(self) -> dict[str, Any]: + """Get cache statistics. + + Returns: + Dict with total_size_bytes, file_count, oldest_download, entries + """ + total_size = 0 + file_count = 0 + oldest_download: date | None = None + entries: list[dict[str, Any]] = [] + + for meta_file in self.config.cache_dir.glob("*_meta.json"): + try: + with open(meta_file) as f: + meta = json.load(f) + + download_date = date.fromisoformat(meta["download_date"]) + cache_file = meta_file.with_name(meta_file.stem.replace("_meta", "") + ".parquet") + + size = cache_file.stat().st_size if cache_file.exists() else 0 + total_size += size + file_count += 2 # meta + parquet + + if oldest_download is None or download_date < oldest_download: + oldest_download = download_date + + entries.append( + { + "dataset": meta["dataset"], + "symbol": meta["symbol"], + "start_date": meta["start_date"], + "end_date": meta["end_date"], + "rows": meta.get("rows", 0), + "cost_usd": meta.get("cost_usd", 0.0), + "download_date": meta["download_date"], + "size_bytes": size, + } + ) + except Exception: + continue + + return { + "total_size_bytes": total_size, + "file_count": file_count, + "oldest_download": oldest_download.isoformat() if oldest_download else None, + "entries": entries, + } diff --git a/requirements.txt b/requirements.txt index cbdd6d6..c66df7b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,5 @@ pandas>=2.0.0 pydantic>=2.5.0 pyyaml>=6.0 redis>=5.0.0 +databento>=0.30.0 # QuantLib>=1.31 is optional - installed separately if needed diff --git a/tests/test_databento_source.py b/tests/test_databento_source.py new file mode 100644 index 0000000..f3e9bb7 --- /dev/null +++ b/tests/test_databento_source.py @@ -0,0 +1,310 @@ +"""Tests for Databento historical price source.""" + +from __future__ import annotations + +import json +import tempfile +from datetime import date, timedelta +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from app.services.backtesting.databento_source import ( + DatabentoCacheKey, + DatabentoHistoricalPriceSource, + DatabentoSourceConfig, +) + + +@pytest.fixture +def temp_cache_dir(): + """Create a temporary cache directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + + +@pytest.fixture +def mock_databento_client(): + """Create a mock Databento client.""" + mock_client = MagicMock() + return mock_client + + +@pytest.fixture +def sample_ohlcv_df(): + """Create sample OHLCV DataFrame.""" + import pandas as pd + + data = [ + {"ts_event": "2024-01-02", "close": 185000000000}, # 185.0 + {"ts_event": "2024-01-03", "close": 186500000000}, # 186.5 + {"ts_event": "2024-01-04", "close": 184000000000}, # 184.0 + {"ts_event": "2024-01-05", "close": 187000000000}, # 187.0 + ] + return pd.DataFrame(data) + + +class TestDatabentoCacheKey: + """Tests for DatabentoCacheKey.""" + + def test_cache_path_generation(self, temp_cache_dir: Path) -> None: + """Cache path is deterministic for same parameters.""" + key = DatabentoCacheKey( + dataset="XNAS.BASIC", + symbol="GLD", + schema="ohlcv-1d", + start_date=date(2024, 1, 1), + end_date=date(2024, 1, 31), + ) + + path1 = key.cache_path(temp_cache_dir) + path2 = key.cache_path(temp_cache_dir) + + assert path1 == path2 + assert path1.suffix == ".parquet" + assert path1.name.startswith("dbn_") + + def test_metadata_path_generation(self, temp_cache_dir: Path) -> None: + """Metadata path matches cache path.""" + key = DatabentoCacheKey( + dataset="XNAS.BASIC", + symbol="GLD", + schema="ohlcv-1d", + start_date=date(2024, 1, 1), + end_date=date(2024, 1, 31), + ) + + cache_path = key.cache_path(temp_cache_dir) + meta_path = key.metadata_path(temp_cache_dir) + + assert meta_path.stem == cache_path.stem + "_meta" + assert meta_path.suffix == ".json" + + +class TestDatabentoSourceConfig: + """Tests for DatabentoSourceConfig.""" + + def test_default_config(self) -> None: + """Default config uses XNAS.BASIC and daily bars.""" + config = DatabentoSourceConfig() + + assert config.dataset == "XNAS.BASIC" + assert config.schema == "ohlcv-1d" + assert config.max_cache_age_days == 30 + assert config.api_key is None + + def test_custom_config(self) -> None: + """Custom config overrides defaults.""" + config = DatabentoSourceConfig( + api_key="test-key", + dataset="GLBX.MDP3", + schema="ohlcv-1h", + max_cache_age_days=7, + ) + + assert config.api_key == "test-key" + assert config.dataset == "GLBX.MDP3" + assert config.schema == "ohlcv-1h" + assert config.max_cache_age_days == 7 + + +class TestDatabentoHistoricalPriceSource: + """Tests for DatabentoHistoricalPriceSource.""" + + def test_resolve_dataset_gld(self) -> None: + """GLD resolves to XNAS.BASIC.""" + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig() + + assert source._resolve_dataset("GLD") == "XNAS.BASIC" + assert source._resolve_dataset("gld") == "XNAS.BASIC" + assert source._resolve_dataset("GLDM") == "XNAS.BASIC" + + def test_resolve_dataset_gc_f(self) -> None: + """GC=F resolves to GLBX.MDP3.""" + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig() + + assert source._resolve_dataset("GC=F") == "GLBX.MDP3" + assert source._resolve_dataset("GC") == "GLBX.MDP3" + + def test_resolve_dataset_xau(self) -> None: + """XAU resolves to XNAS.BASIC (GLD proxy).""" + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig() + + assert source._resolve_dataset("XAU") == "XNAS.BASIC" + + def test_resolve_symbol_xau(self) -> None: + """XAU resolves to GLD symbol.""" + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig() + + assert source._resolve_symbol("XAU") == "GLD" + + def test_resolve_symbol_gc_f(self) -> None: + """GC=F resolves to GC parent symbol.""" + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig() + + assert source._resolve_symbol("GC=F") == "GC" + + def test_df_to_daily_points_converts_prices(self) -> None: + """DataFrame prices are converted from int64 scaled format.""" + import pandas as pd + + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig() + + df = pd.DataFrame( + [ + {"ts_event": "2024-01-02", "close": 185000000000}, # 185.0 + {"ts_event": "2024-01-03", "close": 186500000000}, # 186.5 + ] + ) + + points = source._df_to_daily_points(df) + + assert len(points) == 2 + assert points[0].date == date(2024, 1, 2) + assert points[0].close == 185.0 + assert points[1].close == 186.5 + + def test_load_from_cache_returns_none_if_missing(self, temp_cache_dir: Path) -> None: + """Returns None if cache files don't exist.""" + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig(cache_dir=temp_cache_dir) + + key = DatabentoCacheKey( + dataset="XNAS.BASIC", + symbol="GLD", + schema="ohlcv-1d", + start_date=date(2024, 1, 1), + end_date=date(2024, 1, 31), + ) + + result = source._load_from_cache(key) + assert result is None + + def test_load_from_cache_returns_data_if_fresh(self, temp_cache_dir: Path, sample_ohlcv_df) -> None: + """Returns cached data if within age threshold.""" + + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig(cache_dir=temp_cache_dir) + + key = DatabentoCacheKey( + dataset="XNAS.BASIC", + symbol="GLD", + schema="ohlcv-1d", + start_date=date(2024, 1, 1), + end_date=date(2024, 1, 31), + ) + + # Save to cache + source._save_to_cache(key, sample_ohlcv_df) + + # Load from cache + result = source._load_from_cache(key) + + assert result is not None + assert len(result) == 4 + assert result[0].close == 185.0 + + def test_load_from_cache_returns_none_if_stale( + self, temp_cache_dir: Path, sample_ohlcv_df + ) -> None: + """Returns None if cache exceeds age threshold.""" + + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig( + cache_dir=temp_cache_dir, + max_cache_age_days=0, # Always stale + ) + + key = DatabentoCacheKey( + dataset="XNAS.BASIC", + symbol="GLD", + schema="ohlcv-1d", + start_date=date(2024, 1, 1), + end_date=date(2024, 1, 31), + ) + + # Save to cache + source._save_to_cache(key, sample_ohlcv_df) + + # Manually age the cache by setting download_date to yesterday + meta_file = key.metadata_path(temp_cache_dir) + with open(meta_file) as f: + meta = json.load(f) + meta["download_date"] = (date.today() - timedelta(days=1)).isoformat() + with open(meta_file, "w") as f: + json.dump(meta, f) + + # Load from cache (should fail due to age) + result = source._load_from_cache(key) + + assert result is None + + @patch("app.services.backtesting.databento_source.DATABENTO_AVAILABLE", False) + def test_raises_if_databento_not_installed(self) -> None: + """Raises error if databento package not installed.""" + with pytest.raises(RuntimeError, match="databento package required"): + DatabentoHistoricalPriceSource() + + def test_clear_cache(self, temp_cache_dir: Path, sample_ohlcv_df) -> None: + """Clears all cache files.""" + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig(cache_dir=temp_cache_dir) + + # Create some cache files + key1 = DatabentoCacheKey( + dataset="XNAS.BASIC", + symbol="GLD", + schema="ohlcv-1d", + start_date=date(2024, 1, 1), + end_date=date(2024, 1, 31), + ) + key2 = DatabentoCacheKey( + dataset="GLBX.MDP3", + symbol="GC", + schema="ohlcv-1d", + start_date=date(2024, 1, 1), + end_date=date(2024, 1, 31), + ) + + source._save_to_cache(key1, sample_ohlcv_df) + source._save_to_cache(key2, sample_ohlcv_df) + + count = source.clear_cache() + assert count == 4 # 2 parquet + 2 json + + +class TestDatabentoHistoricalPriceSourceIntegration: + """Integration tests (require databento package).""" + + @pytest.mark.skipif( + not DatabentoHistoricalPriceSource.__module__, + reason="databento not installed", + ) + def test_get_cache_stats(self, temp_cache_dir: Path, sample_ohlcv_df) -> None: + """Returns cache statistics.""" + source = DatabentoHistoricalPriceSource.__new__(DatabentoHistoricalPriceSource) + source.config = DatabentoSourceConfig(cache_dir=temp_cache_dir) + + key = DatabentoCacheKey( + dataset="XNAS.BASIC", + symbol="GLD", + schema="ohlcv-1d", + start_date=date(2024, 1, 1), + end_date=date(2024, 1, 31), + ) + + source._save_to_cache(key, sample_ohlcv_df) + + stats = source.get_cache_stats() + + assert stats["file_count"] == 2 + assert stats["total_size_bytes"] > 0 + assert len(stats["entries"]) == 1 + assert stats["entries"][0]["symbol"] == "GLD"