"""Async backtest job execution with progress tracking. This module provides a non-blocking backtest execution system: 1. Jobs are submitted and run in background threads 2. Progress is tracked with stages (validating, fetching_prices, calculating, complete) 3. UI polls for status updates 4. Results are cached for retrieval """ from __future__ import annotations import logging import threading import uuid from dataclasses import dataclass, field from datetime import date, datetime from enum import Enum from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from app.services.backtesting.ui_service import BacktestPageService logger = logging.getLogger(__name__) class JobStatus(str, Enum): """Status of a backtest job.""" PENDING = "pending" RUNNING = "running" COMPLETE = "complete" FAILED = "failed" class JobStage(str, Enum): """Execution stages with user-friendly labels.""" VALIDATING = "validating" FETCHING_PRICES = "fetching_prices" CALCULATING = "calculating" COMPLETE = "complete" FAILED = "failed" @property def label(self) -> str: """Human-readable stage label.""" labels = { JobStage.VALIDATING: "Validating inputs...", JobStage.FETCHING_PRICES: "Fetching historical prices...", JobStage.CALCULATING: "Running backtest calculations...", JobStage.COMPLETE: "Complete", JobStage.FAILED: "Failed", } return labels.get(self, self.value) @dataclass class BacktestJob: """Represents a backtest job with progress tracking.""" job_id: str status: JobStatus = JobStatus.PENDING stage: JobStage = JobStage.VALIDATING progress: int = 0 # 0-100 message: str = "" result: dict[str, Any] | None = None error: str | None = None created_at: datetime = field(default_factory=datetime.utcnow) completed_at: datetime | None = None def to_dict(self) -> dict[str, Any]: """Serialize job for JSON response.""" return { "job_id": self.job_id, "status": self.status.value, "stage": self.stage.value, "stage_label": self.stage.label, "progress": self.progress, "message": self.message, "has_result": 
self.result is not None, "error": self.error, "created_at": self.created_at.isoformat(), "completed_at": self.completed_at.isoformat() if self.completed_at else None, } class BacktestJobStore: """In-memory store for backtest jobs. Jobs are stored in a dict keyed by workspace_id -> job_id. Each workspace only has one active job at a time (latest replaces previous). """ def __init__(self) -> None: self._jobs: dict[str, BacktestJob] = {} # workspace_id -> job self._results: dict[str, dict[str, Any]] = {} # job_id -> result self._lock = threading.Lock() def create_job(self, workspace_id: str) -> BacktestJob: """Create a new job for a workspace, replacing any existing one.""" job = BacktestJob(job_id=str(uuid.uuid4())) with self._lock: self._jobs[workspace_id] = job logger.info(f"Created job {job.job_id} for workspace {workspace_id}") return job def get_job(self, workspace_id: str) -> BacktestJob | None: """Get the current job for a workspace.""" with self._lock: return self._jobs.get(workspace_id) def update_job( self, workspace_id: str, *, status: JobStatus | None = None, stage: JobStage | None = None, progress: int | None = None, message: str | None = None, result: dict[str, Any] | None = None, error: str | None = None, ) -> None: """Update job state.""" with self._lock: job = self._jobs.get(workspace_id) if not job: return if status: job.status = status if stage: job.stage = stage if progress is not None: job.progress = progress if message is not None: job.message = message if result is not None: job.result = result self._results[job.job_id] = result if error: job.error = error if stage == JobStage.COMPLETE or stage == JobStage.FAILED: job.completed_at = datetime.utcnow() def get_result(self, job_id: str) -> dict[str, Any] | None: """Get cached result by job ID.""" with self._lock: return self._results.get(job_id) def clear_job(self, workspace_id: str) -> None: """Remove job from store.""" with self._lock: if workspace_id in self._jobs: del self._jobs[workspace_id] 
# Global job store singleton
job_store = BacktestJobStore()


def _serialize_result(result: Any) -> dict[str, Any]:
    """Convert a scenario result object into a JSON-serializable dict.

    Dates are rendered as ISO strings; missing event/price collections are
    treated as empty.
    """
    return {
        "scenario_name": result.scenario_name,
        "entry_date": result.entry_date.isoformat() if result.entry_date else None,
        "entry_spot": result.entry_spot,
        "underlying_units": result.underlying_units,
        "loan_amount": result.loan_amount,
        "margin_call_ltv": result.margin_call_ltv,
        "total_pnl": result.total_pnl,
        "total_pnl_pct": result.total_pnl_pct,
        "hedging_cost": result.hedging_cost,
        "hedging_cost_pct": result.hedging_cost_pct,
        "unhedged_pnl": result.unhedged_pnl,
        "unhedged_pnl_pct": result.unhedged_pnl_pct,
        "margin_calls": result.margin_calls,
        "margin_call_events": [
            {
                "date": event.date.isoformat(),
                "price": event.price,
                "ltv": event.ltv,
                "action": event.action,
            }
            for event in (result.margin_call_events or [])
        ],
        "prices": [{"date": p.date.isoformat(), "close": p.close} for p in (result.prices or [])],
    }


def _humanize_error(error_msg: str) -> str:
    """Map known data-provider (Databento) errors to user-friendly text.

    Unknown errors are returned unchanged.
    """
    if "data_start_before_available_start" in error_msg:
        import re

        # Provider message embeds the dataset's available start date; pull
        # it out so the user knows what date to pick.
        match = re.search(r"available start of dataset [^(]+\('([^']+)'", error_msg)
        if match:
            available_start = match.group(1).split()[0]
            return (
                f"Data not available before {available_start}. "
                f"Please set start date to {available_start} or later."
            )
        return "Selected start date is before data is available for this dataset."
    if "BentoClientError" in error_msg or "422" in error_msg:
        return f"Data source error: {error_msg}"
    return error_msg


def run_backtest_job(
    workspace_id: str,
    job: BacktestJob,
    service: "BacktestPageService",
    symbol: str,
    start_date: date,
    end_date: date,
    template_slug: str,
    underlying_units: float,
    loan_amount: float,
    margin_call_ltv: float,
    data_source: str,
) -> None:
    """Execute backtest in background thread with progress updates.

    This function runs in a background thread and updates the job state in
    the global ``job_store`` as it progresses through stages. On failure
    the job is marked FAILED with a user-friendly error message.
    """
    try:
        # Stage 1: Validating
        job_store.update_job(
            workspace_id,
            status=JobStatus.RUNNING,
            stage=JobStage.VALIDATING,
            progress=10,
            message="Validating inputs...",
        )

        # Stage 2: Fetching prices
        job_store.update_job(
            workspace_id,
            stage=JobStage.FETCHING_PRICES,
            progress=30,
            message=f"Fetching prices for {symbol}...",
        )

        # The service call performs both price fetching and the backtest
        # calculations, so the CALCULATING stage below is reported after
        # the work is done rather than live.
        result = service.run_read_only_scenario(
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            template_slug=template_slug,
            underlying_units=underlying_units,
            loan_amount=loan_amount,
            margin_call_ltv=margin_call_ltv,
            data_source=data_source,
        )

        # Stage 3: Calculating (already done by run_read_only_scenario)
        job_store.update_job(
            workspace_id,
            stage=JobStage.CALCULATING,
            progress=70,
            message="Processing results...",
        )

        # Stage 4: Complete — cache the serialized result for retrieval.
        job_store.update_job(
            workspace_id,
            status=JobStatus.COMPLETE,
            stage=JobStage.COMPLETE,
            progress=100,
            message="Backtest complete!",
            result=_serialize_result(result),
        )
        logger.info("Job %s completed successfully", job.job_id)

    except Exception as e:
        logger.exception("Job %s failed: %s", job.job_id, e)
        job_store.update_job(
            workspace_id,
            status=JobStatus.FAILED,
            stage=JobStage.FAILED,
            progress=0,
            error=_humanize_error(str(e)),
        )


def start_backtest_job(
    workspace_id: str,
    service: "BacktestPageService",
    symbol: str,
    start_date: date,
    end_date: date,
    template_slug: str,
    underlying_units: float,
    loan_amount: float,
    margin_call_ltv: float,
    data_source: str,
) -> BacktestJob:
    """Start a backtest job in a background thread.

    Returns immediately with the job ID. The job runs in the background
    (daemon thread, so it will not block interpreter shutdown) and can be
    polled for status via ``job_store``.
    """
    job = job_store.create_job(workspace_id)

    thread = threading.Thread(
        target=run_backtest_job,
        kwargs={
            "workspace_id": workspace_id,
            "job": job,
            "service": service,
            "symbol": symbol,
            "start_date": start_date,
            "end_date": end_date,
            "template_slug": template_slug,
            "underlying_units": underlying_units,
            "loan_amount": loan_amount,
            "margin_call_ltv": margin_call_ltv,
            "data_source": data_source,
        },
        daemon=True,
    )
    thread.start()
    return job