Files
vault-dash/app/services/backtesting/jobs.py
Bu5hm4nn 6b8336ab7e feat: add Portfolio Value, Option Value, and Contracts columns to daily results
- Add option_contracts field to BacktestDailyPoint (number of contracts held)
- Update engine to calculate total option contracts from positions
- Update job serialization to include underlying_value, option_market_value, net_portfolio_value, option_contracts
- Update both render_result and render_job_result tables to show:
  - Low, High, Close (from previous commit)
  - Portfolio value (net_portfolio_value)
  - Option value (option_market_value)
  - Contracts (option_contracts)
  - LTV hedged
  - Margin call status
2026-04-05 08:54:38 +02:00

359 lines
13 KiB
Python

"""Async backtest job execution with progress tracking.
This module provides a non-blocking backtest execution system:
1. Jobs are submitted and run in background threads
2. Progress is tracked with stages (validating, fetching_prices, calculating, complete)
3. UI polls for status updates
4. Results are cached for retrieval
"""
from __future__ import annotations
import logging
import threading
import uuid
from dataclasses import dataclass, field
from datetime import date, datetime
from enum import Enum
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from app.services.backtesting.ui_service import BacktestPageService
logger = logging.getLogger(__name__)
class JobStatus(str, Enum):
    """Status of a backtest job.

    Subclasses str so values serialize directly to JSON and compare
    against plain strings (see BacktestJob.to_dict).
    """
    PENDING = "pending"    # created, worker thread not yet reporting
    RUNNING = "running"    # worker thread is executing the backtest
    COMPLETE = "complete"  # finished successfully; result is cached
    FAILED = "failed"      # raised an exception; error message is set
class JobStage(str, Enum):
    """Execution stages with user-friendly labels.

    Subclasses str so stage values drop straight into JSON payloads.
    """
    VALIDATING = "validating"
    FETCHING_PRICES = "fetching_prices"
    CALCULATING = "calculating"
    COMPLETE = "complete"
    FAILED = "failed"

    @property
    def label(self) -> str:
        """Human-readable stage label (falls back to the raw value)."""
        return {
            "validating": "Validating inputs...",
            "fetching_prices": "Fetching historical prices...",
            "calculating": "Running backtest calculations...",
            "complete": "Complete",
            "failed": "Failed",
        }.get(self.value, self.value)
@dataclass
class BacktestJob:
    """A single backtest run with progress and lifecycle tracking."""

    job_id: str
    status: JobStatus = JobStatus.PENDING
    stage: JobStage = JobStage.VALIDATING
    progress: int = 0  # 0-100
    message: str = ""
    result: dict[str, Any] | None = None
    error: str | None = None
    created_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: datetime | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize job for JSON response."""
        finished_at = self.completed_at
        payload: dict[str, Any] = {
            "job_id": self.job_id,
            "status": self.status.value,
            "stage": self.stage.value,
            "stage_label": self.stage.label,
            "progress": self.progress,
            "message": self.message,
            # Callers check has_result rather than shipping the (large) result
            # payload on every poll.
            "has_result": self.result is not None,
            "error": self.error,
            "created_at": self.created_at.isoformat(),
            "completed_at": finished_at.isoformat() if finished_at else None,
        }
        return payload
class BacktestJobStore:
    """In-memory store for backtest jobs.

    Jobs are stored in a dict keyed by workspace_id -> job.
    Each workspace only has one active job at a time (latest replaces
    previous). All state is guarded by a single lock so the store is safe
    to share between request handlers and worker threads.
    """

    def __init__(self) -> None:
        # workspace_id -> latest job for that workspace
        self._jobs: dict[str, BacktestJob] = {}
        # job_id -> cached result payload, kept until the job is replaced
        # or cleared (see create_job / clear_job).
        self._results: dict[str, dict[str, Any]] = {}
        self._lock = threading.Lock()

    def create_job(self, workspace_id: str) -> BacktestJob:
        """Create a new job for a workspace, replacing any existing one.

        Also evicts the replaced job's cached result: previously stale
        results accumulated in ``_results`` forever, leaking memory as
        workspaces re-ran backtests.
        """
        job = BacktestJob(job_id=str(uuid.uuid4()))
        with self._lock:
            previous = self._jobs.get(workspace_id)
            if previous is not None:
                # Drop the orphaned result; nothing can reach it once the
                # workspace's job pointer moves on.
                self._results.pop(previous.job_id, None)
            self._jobs[workspace_id] = job
        logger.info(f"Created job {job.job_id} for workspace {workspace_id}")
        return job

    def get_job(self, workspace_id: str) -> BacktestJob | None:
        """Get the current job for a workspace."""
        with self._lock:
            return self._jobs.get(workspace_id)

    def update_job(
        self,
        workspace_id: str,
        *,
        status: JobStatus | None = None,
        stage: JobStage | None = None,
        progress: int | None = None,
        message: str | None = None,
        result: dict[str, Any] | None = None,
        error: str | None = None,
    ) -> None:
        """Update job state.

        Only the keyword arguments that are provided are applied; a missing
        job (e.g. already cleared) is silently ignored so late worker-thread
        updates cannot crash.
        """
        with self._lock:
            job = self._jobs.get(workspace_id)
            if not job:
                return
            if status:
                job.status = status
            if stage:
                job.stage = stage
            if progress is not None:
                job.progress = progress
            if message is not None:
                job.message = message
            if result is not None:
                job.result = result
                # Cache by job_id so the result survives polling cycles.
                self._results[job.job_id] = result
            if error:
                job.error = error
            if stage == JobStage.COMPLETE or stage == JobStage.FAILED:
                # utcnow() kept (naive UTC) for consistency with
                # BacktestJob.created_at's default factory.
                job.completed_at = datetime.utcnow()

    def get_result(self, job_id: str) -> dict[str, Any] | None:
        """Get cached result by job ID."""
        with self._lock:
            return self._results.get(job_id)

    def clear_job(self, workspace_id: str) -> None:
        """Remove job from store, including its cached result.

        Previously the result entry was left behind in ``_results`` forever
        (memory leak); it is unreachable once the job is gone.
        """
        with self._lock:
            job = self._jobs.pop(workspace_id, None)
            if job is not None:
                self._results.pop(job.job_id, None)
# Global job store singleton
# Module-level instance shared by request handlers and worker threads;
# BacktestJobStore serializes access with its internal lock.
job_store = BacktestJobStore()
def _serialize_daily_point(dp: Any) -> dict[str, Any]:
    """Serialize one daily path point; low/high fall back to close when absent."""
    return {
        "date": dp.date.isoformat(),
        "spot_close": dp.spot_close,
        "spot_low": dp.spot_low if dp.spot_low is not None else dp.spot_close,
        "spot_high": dp.spot_high if dp.spot_high is not None else dp.spot_close,
        "underlying_value": dp.underlying_value,
        "option_market_value": dp.option_market_value,
        "net_portfolio_value": dp.net_portfolio_value,
        "option_contracts": dp.option_contracts,
        "ltv_hedged": dp.ltv_hedged,
        "ltv_unhedged": dp.ltv_unhedged,
        "margin_call_hedged": dp.margin_call_hedged,
        "margin_call_unhedged": dp.margin_call_unhedged,
    }


def _serialize_template_result(tr: Any) -> dict[str, Any]:
    """Serialize one template result with its summary metrics and full daily path."""
    sm = tr.summary_metrics
    return {
        "template_slug": tr.template_slug,
        "template_name": tr.template_name,
        "summary_metrics": {
            "start_value": sm.start_value,
            "end_value_hedged_net": sm.end_value_hedged_net,
            "total_hedge_cost": sm.total_hedge_cost,
            "max_ltv_hedged": sm.max_ltv_hedged,
            "max_ltv_unhedged": sm.max_ltv_unhedged,
            "margin_call_days_hedged": sm.margin_call_days_hedged,
            "margin_call_days_unhedged": sm.margin_call_days_unhedged,
        },
        "daily_path": [_serialize_daily_point(dp) for dp in tr.daily_path],
    }


def _serialize_run_result(result: Any) -> dict[str, Any]:
    """Flatten a BacktestPageRunResult into a JSON-serializable dict.

    BacktestPageRunResult has: scenario, run_result, entry_spot, data_source,
    data_cost_usd. Top-level summary metrics are taken from the FIRST template
    result (zeros/False when there are none).
    """
    template_results = result.run_result.template_results
    first_template = template_results[0] if template_results else None
    summary = first_template.summary_metrics if first_template else None
    return {
        "scenario_id": result.scenario.scenario_id,
        "scenario_name": result.scenario.display_name,
        "symbol": result.scenario.symbol,
        "start_date": result.scenario.start_date.isoformat(),
        "end_date": result.scenario.end_date.isoformat(),
        "entry_spot": result.entry_spot,
        "underlying_units": result.scenario.initial_portfolio.underlying_units,
        "loan_amount": result.scenario.initial_portfolio.loan_amount,
        "margin_call_ltv": result.scenario.initial_portfolio.margin_call_ltv,
        "data_source": result.data_source,
        "data_cost_usd": result.data_cost_usd,
        # Summary metrics from first template result
        "start_value": summary.start_value if summary else 0.0,
        "end_value_hedged_net": summary.end_value_hedged_net if summary else 0.0,
        "total_hedge_cost": summary.total_hedge_cost if summary else 0.0,
        "max_ltv_hedged": summary.max_ltv_hedged if summary else 0.0,
        "max_ltv_unhedged": summary.max_ltv_unhedged if summary else 0.0,
        "margin_call_days_hedged": summary.margin_call_days_hedged if summary else 0,
        "margin_call_days_unhedged": summary.margin_call_days_unhedged if summary else 0,
        "margin_threshold_breached_hedged": summary.margin_threshold_breached_hedged if summary else False,
        "margin_threshold_breached_unhedged": summary.margin_threshold_breached_unhedged if summary else False,
        # Template results with full daily path
        "template_results": [_serialize_template_result(tr) for tr in template_results],
    }


def _humanize_backtest_error(error_msg: str) -> str:
    """Map raw provider/engine error text to a user-facing message.

    Handles Databento "start before available" errors (extracting the
    dataset's available start date when present) and generic client/422
    errors; anything unrecognized is returned unchanged.
    """
    if "data_start_before_available_start" in error_msg:
        import re

        # Databento embeds the available start like:
        #   ...available start of dataset XYZ ('2018-05-21 ...')...
        match = re.search(r"available start of dataset [^(]+\('([^']+)'", error_msg)
        if match:
            available_start = match.group(1).split()[0]
            return (
                f"Data not available before {available_start}. Please set start date to {available_start} or later."
            )
        return "Selected start date is before data is available for this dataset."
    if "BentoClientError" in error_msg or "422" in error_msg:
        return f"Data source error: {error_msg}"
    return error_msg


def run_backtest_job(
    workspace_id: str,
    job: BacktestJob,
    service: "BacktestPageService",
    symbol: str,
    start_date: date,
    end_date: date,
    template_slug: str,
    underlying_units: float,
    loan_amount: float,
    margin_call_ltv: float,
    data_source: str,
) -> None:
    """Execute backtest in background thread with progress updates.

    This function runs in a background thread (see start_backtest_job) and
    updates the workspace's job state in the global job_store as it
    progresses through stages.

    NOTE(review): updates are keyed by workspace_id, not job_id — if a newer
    job replaces this one mid-run, this thread updates the replacement job's
    state. Confirm that is the intended "latest job wins" behavior.
    """
    try:
        # Stage 1: Validating
        job_store.update_job(
            workspace_id,
            status=JobStatus.RUNNING,
            stage=JobStage.VALIDATING,
            progress=10,
            message="Validating inputs...",
        )
        # Stage 2: Fetching prices (the fetch itself happens inside
        # run_read_only_scenario; this surfaces progress to the UI first).
        job_store.update_job(
            workspace_id,
            stage=JobStage.FETCHING_PRICES,
            progress=30,
            message=f"Fetching prices for {symbol}...",
        )
        # Run the backtest (this includes price fetching)
        result = service.run_read_only_scenario(
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            template_slug=template_slug,
            underlying_units=underlying_units,
            loan_amount=loan_amount,
            margin_call_ltv=margin_call_ltv,
            data_source=data_source,
        )
        # Stage 3: Calculating (already done by run_read_only_scenario;
        # remaining work is serialization below).
        job_store.update_job(
            workspace_id,
            stage=JobStage.CALCULATING,
            progress=70,
            message="Processing results...",
        )
        result_dict = _serialize_run_result(result)
        # Stage 4: Complete — caches result_dict in the store by job_id.
        job_store.update_job(
            workspace_id,
            status=JobStatus.COMPLETE,
            stage=JobStage.COMPLETE,
            progress=100,
            message="Backtest complete!",
            result=result_dict,
        )
        logger.info(f"Job {job.job_id} completed successfully")
    except Exception as e:
        logger.exception(f"Job {job.job_id} failed: {e}")
        job_store.update_job(
            workspace_id,
            status=JobStatus.FAILED,
            stage=JobStage.FAILED,
            progress=0,
            error=_humanize_backtest_error(str(e)),
        )
def start_backtest_job(
    workspace_id: str,
    service: "BacktestPageService",
    symbol: str,
    start_date: date,
    end_date: date,
    template_slug: str,
    underlying_units: float,
    loan_amount: float,
    margin_call_ltv: float,
    data_source: str,
) -> BacktestJob:
    """Start a backtest job in a background thread.

    Registers a fresh job for the workspace in the global job_store, spawns
    a daemon thread running run_backtest_job, and returns immediately with
    the job so callers can poll its status.
    """
    job = job_store.create_job(workspace_id)
    worker = threading.Thread(
        target=run_backtest_job,
        # Positional order matches run_backtest_job's signature exactly.
        args=(
            workspace_id,
            job,
            service,
            symbol,
            start_date,
            end_date,
            template_slug,
            underlying_units,
            loan_amount,
            margin_call_ltv,
            data_source,
        ),
        daemon=True,  # don't block interpreter shutdown on a running backtest
    )
    worker.start()
    return job