feat(BT-002): add historical snapshot provider

This commit is contained in:
Bu5hm4nn
2026-03-27 18:31:28 +01:00
parent 1a6760bee3
commit 477514f838
15 changed files with 822 additions and 82 deletions

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from app.models.backtest import (
BacktestDailyPoint,
@@ -9,21 +8,21 @@ from app.models.backtest import (
BacktestSummaryMetrics,
TemplateBacktestResult,
)
from app.models.strategy_template import StrategyTemplate, TemplateLeg
from app.services.backtesting.historical_provider import DailyClosePoint, SyntheticHistoricalProvider
from app.models.strategy_template import StrategyTemplate
from app.services.backtesting.historical_provider import (
BacktestHistoricalProvider,
DailyClosePoint,
HistoricalOptionPosition,
)
@dataclass
class OpenSyntheticPosition:
position_id: str
leg: TemplateLeg
strike: float
expiry: date
quantity: float
class OpenSyntheticPosition(HistoricalOptionPosition):
pass
class SyntheticBacktestEngine:
def __init__(self, provider: SyntheticHistoricalProvider) -> None:
def __init__(self, provider: BacktestHistoricalProvider) -> None:
self.provider = provider
def run_template(
@@ -36,9 +35,9 @@ class SyntheticBacktestEngine:
cash_balance = scenario.initial_portfolio.cash_balance
total_hedge_cost = 0.0
total_option_payoff_realized = 0.0
warnings: list[str] = []
open_positions = self._open_positions(scenario, template, history, start_day)
opening_quotes = [self._mark_position(position, start_day) for position in open_positions]
opening_cost = sum(quote.mark * quote.quantity for quote in opening_quotes)
opening_cost = sum(position.entry_price * position.quantity for position in open_positions)
cash_balance -= opening_cost
total_hedge_cost += opening_cost
@@ -48,23 +47,23 @@ class SyntheticBacktestEngine:
realized_option_cashflow = 0.0
option_market_value = 0.0
active_position_ids: list[str] = []
remaining_positions: list[OpenSyntheticPosition] = []
remaining_positions: list[HistoricalOptionPosition] = []
for position in open_positions:
if day.date >= position.expiry:
intrinsic = self.provider.intrinsic_value(
option_type=position.leg.option_type,
spot=day.close,
strike=position.strike,
)
payoff = intrinsic * position.quantity
cash_balance += payoff
realized_option_cashflow += payoff
total_option_payoff_realized += payoff
valuation = self.provider.mark_position(
position,
symbol=scenario.symbol,
as_of_date=day.date,
spot=day.close,
)
self._append_warning(warnings, valuation.warning)
if not valuation.is_active:
cash_balance += valuation.realized_cashflow
realized_option_cashflow += valuation.realized_cashflow
total_option_payoff_realized += valuation.realized_cashflow
continue
quote = self._mark_position(position, day)
option_market_value += quote.mark * position.quantity
option_market_value += valuation.mark * position.quantity
active_position_ids.append(position.position_id)
remaining_positions.append(position)
@@ -113,6 +112,7 @@ class SyntheticBacktestEngine:
template_name=template.display_name,
summary_metrics=summary,
daily_path=tuple(daily_points),
warnings=tuple(warnings),
)
def _open_positions(
@@ -121,30 +121,25 @@ class SyntheticBacktestEngine:
template: StrategyTemplate,
history: list[DailyClosePoint],
start_day: DailyClosePoint,
) -> list[OpenSyntheticPosition]:
positions: list[OpenSyntheticPosition] = []
) -> list[HistoricalOptionPosition]:
positions: list[HistoricalOptionPosition] = []
for index, leg in enumerate(template.legs, start=1):
expiry = self.provider.resolve_expiry(history, start_day.date, leg.target_expiry_days)
positions.append(
OpenSyntheticPosition(
position_id=f"{template.slug}-position-{index}",
self.provider.open_position(
symbol=scenario.symbol,
leg=leg,
strike=start_day.close * leg.strike_rule.value,
expiry=expiry,
quantity=scenario.initial_portfolio.underlying_units
* leg.allocation_weight
* leg.target_coverage_pct,
position_id=f"{template.slug}-position-{index}",
quantity=(
scenario.initial_portfolio.underlying_units * leg.allocation_weight * leg.target_coverage_pct
),
as_of_date=start_day.date,
spot=start_day.close,
trading_days=history,
)
)
return positions
def _mark_position(self, position: OpenSyntheticPosition, day: DailyClosePoint):
return self.provider.price_option(
position_id=position.position_id,
leg=position.leg,
spot=day.close,
strike=position.strike,
expiry=position.expiry,
quantity=position.quantity,
valuation_date=day.date,
)
@staticmethod
def _append_warning(warnings: list[str], warning: str | None) -> None:
if warning and warning not in warnings:
warnings.append(warning)

View File

@@ -123,6 +123,7 @@ class TemplateBacktestResult:
template_name: str
summary_metrics: BacktestSummaryMetrics
daily_path: tuple[BacktestDailyPoint, ...]
warnings: tuple[str, ...] = field(default_factory=tuple)
@dataclass(frozen=True)

View File

@@ -207,16 +207,13 @@ def _render_event_comparison_page(workspace_id: str | None = None) -> None:
try:
preview_units = float(units_input.value or 0.0)
if workspace_id and config is not None and reseed_units:
preview_scenario = service.preview_scenario(
preview_entry_spot = service.derive_entry_spot(
preset_slug=str(option["slug"]),
template_slugs=template_slugs,
underlying_units=1.0,
loan_amount=float(loan_input.value or 0.0),
margin_call_ltv=float(ltv_input.value or 0.0),
)
preview_units = asset_quantity_from_workspace_config(
config,
entry_spot=float(preview_scenario.initial_portfolio.entry_spot),
entry_spot=preview_entry_spot,
symbol="GLD",
)
syncing_controls["value"] = True
@@ -536,7 +533,7 @@ def _render_event_comparison_page(workspace_id: str | None = None) -> None:
if syncing_controls["value"]:
return
validation_label.set_text("")
preview_error = refresh_preview(reset_templates=True, reseed_units=True)
preview_error = refresh_preview(reset_templates=True, reseed_units=False)
if preview_error:
validation_label.set_text(preview_error)
render_result_state("Scenario validation failed", preview_error, tone="warning")

View File

@@ -55,11 +55,130 @@ class SyntheticOptionQuote:
raise ValueError("mark must be non-negative")
@dataclass(frozen=True)
class DailyOptionSnapshot:
contract_key: str
symbol: str
snapshot_date: date
expiry: date
option_type: str
strike: float
mid: float
def __post_init__(self) -> None:
if not self.contract_key:
raise ValueError("contract_key is required")
if not self.symbol:
raise ValueError("symbol is required")
if self.option_type not in {"put", "call"}:
raise ValueError("unsupported option_type")
if self.strike <= 0:
raise ValueError("strike must be positive")
if self.mid < 0:
raise ValueError("mid must be non-negative")
@dataclass
class HistoricalOptionPosition:
position_id: str
leg_id: str
contract_key: str
option_type: str
strike: float
expiry: date
quantity: float
entry_price: float
current_mark: float
last_mark_date: date
source_snapshot_date: date
def __post_init__(self) -> None:
for field_name in ("position_id", "leg_id", "contract_key"):
value = getattr(self, field_name)
if not isinstance(value, str) or not value:
raise ValueError(f"{field_name} is required")
if self.option_type not in {"put", "call"}:
raise ValueError("unsupported option_type")
for field_name in ("strike", "quantity", "entry_price", "current_mark"):
value = getattr(self, field_name)
if not isinstance(value, (int, float)) or isinstance(value, bool) or not isfinite(float(value)):
raise TypeError(f"{field_name} must be a finite number")
if self.strike <= 0:
raise ValueError("strike must be positive")
if self.quantity <= 0:
raise ValueError("quantity must be positive")
if self.entry_price < 0:
raise ValueError("entry_price must be non-negative")
if self.current_mark < 0:
raise ValueError("current_mark must be non-negative")
@dataclass(frozen=True)
class HistoricalOptionMark:
contract_key: str
mark: float
source: str
is_active: bool
realized_cashflow: float = 0.0
warning: str | None = None
def __post_init__(self) -> None:
if not self.contract_key:
raise ValueError("contract_key is required")
for field_name in ("mark", "realized_cashflow"):
value = getattr(self, field_name)
if not isinstance(value, (int, float)) or isinstance(value, bool) or not isfinite(float(value)):
raise TypeError(f"{field_name} must be a finite number")
if self.mark < 0:
raise ValueError("mark must be non-negative")
if self.realized_cashflow < 0:
raise ValueError("realized_cashflow must be non-negative")
class HistoricalPriceSource(Protocol):
def load_daily_closes(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
raise NotImplementedError
class OptionSnapshotSource(Protocol):
def load_option_chain(self, symbol: str, snapshot_date: date) -> list[DailyOptionSnapshot]:
raise NotImplementedError
class BacktestHistoricalProvider(Protocol):
provider_id: str
pricing_mode: str
def load_history(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
raise NotImplementedError
def validate_provider_ref(self, provider_ref: ProviderRef) -> None:
raise NotImplementedError
def open_position(
self,
*,
symbol: str,
leg: TemplateLeg,
position_id: str,
quantity: float,
as_of_date: date,
spot: float,
trading_days: list[DailyClosePoint],
) -> HistoricalOptionPosition:
raise NotImplementedError
def mark_position(
self,
position: HistoricalOptionPosition,
*,
symbol: str,
as_of_date: date,
spot: float,
) -> HistoricalOptionMark:
raise NotImplementedError
class YFinanceHistoricalPriceSource:
@staticmethod
def _normalize_daily_close_row(*, row_date: object, close: object) -> DailyClosePoint | None:
@@ -121,6 +240,79 @@ class SyntheticHistoricalProvider:
return day.date
return target_date
def open_position(
self,
*,
symbol: str,
leg: TemplateLeg,
position_id: str,
quantity: float,
as_of_date: date,
spot: float,
trading_days: list[DailyClosePoint],
) -> HistoricalOptionPosition:
expiry = self.resolve_expiry(trading_days, as_of_date, leg.target_expiry_days)
strike = spot * leg.strike_rule.value
quote = self.price_option(
position_id=position_id,
leg=leg,
spot=spot,
strike=strike,
expiry=expiry,
quantity=quantity,
valuation_date=as_of_date,
)
return HistoricalOptionPosition(
position_id=position_id,
leg_id=leg.leg_id,
contract_key=f"{symbol}-{expiry.isoformat()}-{leg.option_type}-{strike:.4f}",
option_type=leg.option_type,
strike=strike,
expiry=expiry,
quantity=quantity,
entry_price=quote.mark,
current_mark=quote.mark,
last_mark_date=as_of_date,
source_snapshot_date=as_of_date,
)
def mark_position(
self,
position: HistoricalOptionPosition,
*,
symbol: str,
as_of_date: date,
spot: float,
) -> HistoricalOptionMark:
if as_of_date >= position.expiry:
intrinsic = self.intrinsic_value(option_type=position.option_type, spot=spot, strike=position.strike)
return HistoricalOptionMark(
contract_key=position.contract_key,
mark=0.0,
source="intrinsic_expiry",
is_active=False,
realized_cashflow=intrinsic * position.quantity,
)
quote = self.price_option_by_type(
position_id=position.position_id,
leg_id=position.leg_id,
option_type=position.option_type,
spot=spot,
strike=position.strike,
expiry=position.expiry,
quantity=position.quantity,
valuation_date=as_of_date,
)
position.current_mark = quote.mark
position.last_mark_date = as_of_date
return HistoricalOptionMark(
contract_key=position.contract_key,
mark=quote.mark,
source="synthetic_bs_mid",
is_active=True,
)
def price_option(
self,
*,
@@ -131,6 +323,29 @@ class SyntheticHistoricalProvider:
expiry: date,
quantity: float,
valuation_date: date,
) -> SyntheticOptionQuote:
return self.price_option_by_type(
position_id=position_id,
leg_id=leg.leg_id,
option_type=leg.option_type,
spot=spot,
strike=strike,
expiry=expiry,
quantity=quantity,
valuation_date=valuation_date,
)
def price_option_by_type(
self,
*,
position_id: str,
leg_id: str,
option_type: str,
spot: float,
strike: float,
expiry: date,
quantity: float,
valuation_date: date,
) -> SyntheticOptionQuote:
remaining_days = max(1, expiry.toordinal() - valuation_date.toordinal())
mark = black_scholes_price_and_greeks(
@@ -140,13 +355,13 @@ class SyntheticHistoricalProvider:
time_to_expiry=remaining_days / 365.0,
risk_free_rate=self.risk_free_rate,
volatility=self.implied_volatility,
option_type=leg.option_type,
option_type=option_type,
valuation_date=valuation_date,
)
).price
return SyntheticOptionQuote(
position_id=position_id,
leg_id=leg.leg_id,
leg_id=leg_id,
spot=spot,
strike=strike,
expiry=expiry,
@@ -161,3 +376,149 @@ class SyntheticHistoricalProvider:
if option_type == "call":
return max(spot - strike, 0.0)
raise ValueError(f"Unsupported option type: {option_type}")
class EmptyOptionSnapshotSource:
def load_option_chain(self, symbol: str, snapshot_date: date) -> list[DailyOptionSnapshot]:
return []
class DailyOptionsSnapshotProvider:
provider_id = "daily_snapshots_v1"
pricing_mode = "snapshot_mid"
def __init__(
self,
price_source: HistoricalPriceSource | None = None,
snapshot_source: OptionSnapshotSource | None = None,
) -> None:
self.price_source = price_source or YFinanceHistoricalPriceSource()
self.snapshot_source = snapshot_source or EmptyOptionSnapshotSource()
def load_history(self, symbol: str, start_date: date, end_date: date) -> list[DailyClosePoint]:
rows = self.price_source.load_daily_closes(symbol, start_date, end_date)
filtered = [row for row in rows if start_date <= row.date <= end_date]
return sorted(filtered, key=lambda row: row.date)
def validate_provider_ref(self, provider_ref: ProviderRef) -> None:
if provider_ref.provider_id != self.provider_id or provider_ref.pricing_mode != self.pricing_mode:
raise ValueError(
"Unsupported provider/pricing combination for historical snapshot engine: "
f"{provider_ref.provider_id}/{provider_ref.pricing_mode}"
)
def open_position(
self,
*,
symbol: str,
leg: TemplateLeg,
position_id: str,
quantity: float,
as_of_date: date,
spot: float,
trading_days: list[DailyClosePoint],
) -> HistoricalOptionPosition:
del trading_days # selection must use only the entry-day snapshot, not future state
selected_snapshot = self._select_entry_snapshot(symbol=symbol, leg=leg, as_of_date=as_of_date, spot=spot)
return HistoricalOptionPosition(
position_id=position_id,
leg_id=leg.leg_id,
contract_key=selected_snapshot.contract_key,
option_type=selected_snapshot.option_type,
strike=selected_snapshot.strike,
expiry=selected_snapshot.expiry,
quantity=quantity,
entry_price=selected_snapshot.mid,
current_mark=selected_snapshot.mid,
last_mark_date=as_of_date,
source_snapshot_date=as_of_date,
)
def mark_position(
self,
position: HistoricalOptionPosition,
*,
symbol: str,
as_of_date: date,
spot: float,
) -> HistoricalOptionMark:
if as_of_date >= position.expiry:
intrinsic = SyntheticHistoricalProvider.intrinsic_value(
option_type=position.option_type,
spot=spot,
strike=position.strike,
)
return HistoricalOptionMark(
contract_key=position.contract_key,
mark=0.0,
source="intrinsic_expiry",
is_active=False,
realized_cashflow=intrinsic * position.quantity,
)
exact_snapshot = next(
(
snapshot
for snapshot in self.snapshot_source.load_option_chain(symbol, as_of_date)
if snapshot.contract_key == position.contract_key
),
None,
)
if exact_snapshot is not None:
position.current_mark = exact_snapshot.mid
position.last_mark_date = as_of_date
return HistoricalOptionMark(
contract_key=position.contract_key,
mark=exact_snapshot.mid,
source="snapshot_mid",
is_active=True,
)
if position.current_mark < 0:
raise ValueError(f"Missing historical mark for {position.contract_key} on {as_of_date.isoformat()}")
return HistoricalOptionMark(
contract_key=position.contract_key,
mark=position.current_mark,
source="carry_forward",
is_active=True,
warning=(
f"Missing historical mark for {position.contract_key} on {as_of_date.isoformat()}; "
f"carrying forward prior mark from {position.last_mark_date.isoformat()}."
),
)
def _select_entry_snapshot(
self,
*,
symbol: str,
leg: TemplateLeg,
as_of_date: date,
spot: float,
) -> DailyOptionSnapshot:
target_expiry = date.fromordinal(as_of_date.toordinal() + leg.target_expiry_days)
target_strike = spot * leg.strike_rule.value
chain = [
snapshot
for snapshot in self.snapshot_source.load_option_chain(symbol, as_of_date)
if snapshot.symbol.strip().upper() == symbol.strip().upper() and snapshot.option_type == leg.option_type
]
eligible_expiries = [snapshot for snapshot in chain if snapshot.expiry >= target_expiry]
if not eligible_expiries:
raise ValueError(
f"No eligible historical option snapshots found for {symbol} on {as_of_date.isoformat()} "
f"at or beyond target expiry {target_expiry.isoformat()}"
)
selected_expiry = min(
eligible_expiries,
key=lambda snapshot: ((snapshot.expiry - target_expiry).days, snapshot.expiry),
).expiry
expiry_matches = [snapshot for snapshot in eligible_expiries if snapshot.expiry == selected_expiry]
return min(
expiry_matches, key=lambda snapshot: self._strike_sort_key(snapshot.strike, target_strike, leg.option_type)
)
@staticmethod
def _strike_sort_key(strike: float, target_strike: float, option_type: str) -> tuple[float, float]:
if option_type == "put":
return (abs(strike - target_strike), -strike)
return (abs(strike - target_strike), strike)

View File

@@ -5,7 +5,7 @@ from math import isclose
from app.backtesting.engine import SyntheticBacktestEngine
from app.models.backtest import BacktestRunResult, BacktestScenario
from app.models.strategy_template import StrategyTemplate
from app.services.backtesting.historical_provider import SyntheticHistoricalProvider
from app.services.backtesting.historical_provider import BacktestHistoricalProvider, SyntheticHistoricalProvider
from app.services.strategy_templates import StrategyTemplateService
@@ -15,7 +15,7 @@ class BacktestService:
def __init__(
self,
provider: SyntheticHistoricalProvider | None = None,
provider: BacktestHistoricalProvider | None = None,
template_service: StrategyTemplateService | None = None,
) -> None:
self.provider = provider or SyntheticHistoricalProvider()
@@ -59,15 +59,15 @@ class BacktestService:
return BacktestRunResult(scenario_id=scenario.scenario_id, template_results=tuple(template_results))
@staticmethod
def _validate_template_for_mvp(template: StrategyTemplate) -> None:
def _validate_template_for_mvp(self, template: StrategyTemplate) -> None:
provider_label = (
"historical snapshot engine" if self.provider.pricing_mode == "snapshot_mid" else "synthetic MVP engine"
)
if template.contract_mode != "continuous_units":
raise ValueError(f"Unsupported contract_mode for synthetic MVP engine: {template.contract_mode}")
raise ValueError(f"Unsupported contract_mode for {provider_label}: {template.contract_mode}")
if template.roll_policy.policy_type != "hold_to_expiry":
raise ValueError("Unsupported roll_policy for synthetic MVP engine: " f"{template.roll_policy.policy_type}")
raise ValueError(f"Unsupported roll_policy for {provider_label}: {template.roll_policy.policy_type}")
if template.entry_policy.entry_timing != "scenario_start_close":
raise ValueError(
"Unsupported entry_timing for synthetic MVP engine: " f"{template.entry_policy.entry_timing}"
)
raise ValueError(f"Unsupported entry_timing for {provider_label}: {template.entry_policy.entry_timing}")
if template.entry_policy.stagger_days is not None:
raise ValueError("Unsupported entry_policy configuration for synthetic MVP engine")
raise ValueError(f"Unsupported entry_policy configuration for {provider_label}")

View File

@@ -15,6 +15,8 @@ from app.models.backtest import (
)
from app.services.backtesting.historical_provider import (
DailyClosePoint,
HistoricalOptionMark,
HistoricalOptionPosition,
SyntheticHistoricalProvider,
SyntheticOptionQuote,
)
@@ -88,6 +90,12 @@ class FixtureBoundHistoricalProvider:
def price_option(self, **kwargs: object) -> SyntheticOptionQuote:
return self.base_provider.price_option(**kwargs)
def open_position(self, **kwargs: object) -> HistoricalOptionPosition:
return self.base_provider.open_position(**kwargs)
def mark_position(self, position: HistoricalOptionPosition, **kwargs: object) -> HistoricalOptionMark:
return self.base_provider.mark_position(position, **kwargs)
@staticmethod
def intrinsic_value(*, option_type: str, spot: float, strike: float) -> float:
return SyntheticHistoricalProvider.intrinsic_value(option_type=option_type, spot=spot, strike=strike)

View File

@@ -135,6 +135,18 @@ class EventComparisonPageService:
preset = self.event_preset_service.get_preset(preset_slug)
return tuple(preset.scenario_overrides.default_template_slugs)
def derive_entry_spot(self, *, preset_slug: str, template_slugs: tuple[str, ...]) -> float:
if not template_slugs:
raise ValueError("Select at least one strategy template.")
scenario = self.comparison_service.preview_scenario_from_inputs(
preset_slug=preset_slug,
template_slugs=template_slugs,
underlying_units=1.0,
loan_amount=0.0,
margin_call_ltv=0.75,
)
return float(scenario.initial_portfolio.entry_spot)
def preview_scenario(
self,
*,