diff --git a/app/pages/backtests.py b/app/pages/backtests.py index 11e5b85..3e8828a 100644 --- a/app/pages/backtests.py +++ b/app/pages/backtests.py @@ -2,7 +2,7 @@ from __future__ import annotations import logging from datetime import date, datetime, timedelta -from typing import Any +from typing import TYPE_CHECKING, Any from fastapi.responses import RedirectResponse from nicegui import run, ui @@ -14,8 +14,16 @@ from app.models.backtest_settings_repository import BacktestSettingsRepository from app.models.workspace import get_workspace_repository from app.pages.common import dashboard_page, split_page_panes from app.services.backtesting.databento_source import DatabentoHistoricalPriceSource +from app.services.backtesting.jobs import ( + JobStatus, + job_store, + start_backtest_job, +) from app.services.backtesting.ui_service import BacktestPageRunResult, BacktestPageService +if TYPE_CHECKING: + pass # for forward references + logger = logging.getLogger(__name__) # Dataset and schema options for Databento @@ -54,9 +62,10 @@ DATABENTO_DATASET_MIN_DATES = { "GLBX.MDP3": date(2010, 1, 1), # GLBX.MDP3 futures data from 2010 } + def get_default_backtest_dates() -> tuple[date, date]: """Get default backtest date range (~2 years ending on most recent Friday or earlier). - + Returns dates (start, end) where: - end is the most recent Friday (including today if today is Friday) - start is ~730 days before end @@ -80,7 +89,7 @@ def validate_date_range_for_symbol(start_date: date, end_date: date, symbol: str """Validate date range is within available data for symbol. Returns error message if invalid, None if valid. - + Validation order: 1. Logical order (start <= end) 2. 
End not in future @@ -260,19 +269,9 @@ def _render_backtests_page(workspace_id: str | None = None) -> None: default_symbol = "GLD" default_start_price = 0.0 - # Derive entry spot from default date range - # Fall back to a reasonable default if data source doesn't support the date range - try: - default_entry_spot = service.derive_entry_spot( - "GLD", - date.fromisoformat(default_start_date), - date.fromisoformat(default_end_date), - data_source="databento", - ) - except Exception: - # Data source may not support the default date range or API error - # Fall back to a reasonable GLD price (recent ~$230/oz) - default_entry_spot = 230.0 + # Use a reasonable default GLD price for initial render (will be derived async) + # This prevents blocking page load with Databento API call + default_entry_spot = 230.0 # Approximate GLD price default_units = ( asset_quantity_from_workspace_config(config, entry_spot=default_entry_spot, symbol="GLD") if config is not None and default_entry_spot > 0 @@ -428,6 +427,7 @@ def _render_backtests_page(workspace_id: str | None = None) -> None: ): entry_spot_hint = ui.label("").classes("text-sm text-slate-500 dark:text-slate-400") validation_label = ui.label("").classes("text-sm text-rose-600 dark:text-rose-300") + progress_label = ui.label("").classes("text-sm text-sky-600 dark:text-sky-300") run_button = ( ui.button("Run backtest").props("color=primary data-testid=run-backtest-button").classes("mt-2") ) @@ -456,23 +456,19 @@ def _render_backtests_page(workspace_id: str | None = None) -> None: """Update the date range hint based on selected symbol and data source.""" symbol = get_symbol_from_dataset() data_source = str(data_source_select.value) - + # Use dataset-specific minimum for Databento if data_source == "databento": dataset = str(dataset_select.value) min_date = DATABENTO_DATASET_MIN_DATES.get(dataset) if min_date: - date_range_hint.set_text( - f"{dataset} data available from {min_date.strftime('%Y-%m-%d')}" - ) + 
async def refresh_workspace_seeded_units() -> None:
    """Re-derive the entry spot off the event loop and refresh dependent UI state.

    ``derive_entry_spot`` is executed through ``run.io_bound`` so the page stays
    responsive while it runs. Afterwards the seeded units (only when the
    workspace config carries a gold value), the cost estimate, the seeded
    summary and the validation messages are refreshed. When the scenario is
    valid, previously rendered results are marked stale because they were
    computed for the old inputs.
    """
    validation_label.set_text("")
    progress_label.set_text("Fetching entry spot...")

    try:
        # Run derive_entry_spot in a background thread.
        entry_spot, entry_error = await run.io_bound(derive_entry_spot)

        can_seed_units = (
            workspace_id and config is not None and config.gold_value is not None and entry_spot is not None
        )
        if can_seed_units:
            units_input.value = asset_quantity_from_workspace_config(
                config, entry_spot=entry_spot, symbol=get_symbol_from_dataset()
            )
        # NOTE(review): the collapsed diff does not show whether this call was
        # nested under the condition above — confirm against the real file.
        update_cost_estimate()
        render_seeded_summary(entry_spot=entry_spot, entry_spot_error=entry_error)

        if entry_error:
            validation_label.set_text(entry_error)
            render_result_state("Scenario validation failed", entry_error, tone="warning")
            return

        validation_error = validate_current_scenario(entry_spot=entry_spot)
        if validation_error:
            validation_label.set_text(validation_error)
            render_result_state("Scenario validation failed", validation_error, tone="warning")
        else:
            # Restored from the synchronous version: inputs changed and are
            # valid, so any results on screen no longer match the form.
            mark_results_stale()
    finally:
        # Always clear the progress text, including on early return or error.
        progress_label.set_text("")
) render_result_state("Invalid start date", validation_label.text, tone="warning") - run_button.props(remove='loading') + run_button.props(remove="loading") + progress_label.set_text("") return - + date_range_error = validate_date_range_for_symbol(start_date, end_date, symbol) if date_range_error: validation_label.set_text(date_range_error) render_result_state("Scenario validation failed", date_range_error, tone="warning") - run_button.props(remove='loading') + run_button.props(remove="loading") + progress_label.set_text("") return # Validate numeric inputs @@ -855,15 +856,17 @@ def _render_backtests_page(workspace_id: str | None = None) -> None: if numeric_error: validation_label.set_text(numeric_error) render_result_state("Input validation failed", numeric_error, tone="warning") - run_button.props(remove='loading') + run_button.props(remove="loading") + progress_label.set_text("") return # Save settings before running save_backtest_settings() - # Run backtest in background thread to avoid blocking WebSocket - result = await run.io_bound( - service.run_read_only_scenario, + # Start background job + job = start_backtest_job( + workspace_id=workspace_id, + service=service, symbol=symbol, start_date=start_date, end_date=end_date, @@ -873,65 +876,101 @@ def _render_backtests_page(workspace_id: str | None = None) -> None: margin_call_ltv=ltv, data_source=str(data_source_select.value), ) - # Update cost in saved settings after successful run - if str(data_source_select.value) == "databento": - update_cost_estimate() - - render_result(result) - run_button.props(remove='loading') - validation_label.set_text("") - ui.notify("Backtest completed!", type="positive") - + + # Start polling for job status + ui.timer(1.0, lambda: poll_job_status(job.job_id), once=True) + except (ValueError, KeyError) as exc: - run_button.props(remove='loading') - entry_spot, entry_error = derive_entry_spot() - render_seeded_summary(entry_spot=entry_spot, entry_spot_error=entry_error) - if 
entry_spot is None: - entry_spot_hint.set_text("Entry spot unavailable until the scenario dates are valid.") + run_button.props(remove="loading") + progress_label.set_text("") validation_label.set_text(str(exc)) - render_result_state("Scenario validation failed", str(exc), tone="warning") - - except Exception as exc: - run_button.props(remove='loading') - entry_spot, entry_error = derive_entry_spot() - render_seeded_summary(entry_spot=entry_spot, entry_spot_error=entry_error) - if entry_spot is None: - entry_spot_hint.set_text("Entry spot unavailable until the scenario dates are valid.") - - # Check for Databento API errors - error_msg = str(exc) - if "data_start_before_available_start" in error_msg: - # Extract the available start date from the error message - import re - match = re.search(r"available start of dataset [^(]+\('([^']+)'\)", error_msg) - if match: - available_start = match.group(1).split()[0] # Extract date part - validation_label.set_text( - f"Data not available before {available_start}. Please set start date to {available_start} or later." - ) - else: - validation_label.set_text( - "Selected start date is before data is available for this dataset. Please choose a later date." - ) - render_result_state("Invalid start date", validation_label.text, tone="warning") - elif "BentoClientError" in error_msg or "422" in error_msg: - validation_label.set_text(f"Data source error: {error_msg}") - render_result_state("Data unavailable", validation_label.text, tone="warning") - else: - message = "Backtest failed. Please verify the scenario inputs and try again." 
def poll_job_status(job_id: str) -> None:
    """Check the workspace's job once and update the page accordingly.

    While the job is neither complete nor failed, a one-shot 1-second timer is
    armed to call this function again.

    Args:
        job_id: Identifier of the job this poller was started for. If the
            workspace's current job is missing or has a different id, the
            poller stops and reports "Job not found".
    """
    job = job_store.get_job(workspace_id)
    if not job or job.job_id != job_id:
        run_button.props(remove="loading")
        progress_label.set_text("")
        render_result_state("Error", "Job not found", tone="error")
        return

    # Update progress display
    progress_label.set_text(job.stage.label)

    if job.status == JobStatus.COMPLETE:
        run_button.props(remove="loading")
        progress_label.set_text("")
        # The job object is read here without the store lock while the worker
        # thread may still be filling it in; get_result() takes the lock, so use
        # it as a fallback when the result attribute is not visible yet.
        result = job.result or job_store.get_result(job_id)
        if result:
            render_job_result(result)
        ui.notify("Backtest completed!", type="positive")
    elif job.status == JobStatus.FAILED:
        run_button.props(remove="loading")
        progress_label.set_text("")
        error_msg = job.error or "Unknown error occurred"
        validation_label.set_text(error_msg)
        render_result_state("Backtest failed", error_msg, tone="error")
    else:
        # Still running, poll again in 1 second
        ui.timer(1.0, lambda: poll_job_status(job_id), once=True)


def render_job_result(result: dict[str, Any]) -> None:
    """Render a finished backtest from the serialized job result.

    Args:
        result: Result dictionary stored on the job. Keys read here are
            ``entry_spot``, ``scenario_name``, ``total_pnl``, ``total_pnl_pct``,
            ``hedging_cost``, ``hedging_cost_pct`` and ``margin_call_events``.
            Missing or ``None`` numeric values are displayed as zero.
    """

    def _number(key: str) -> float:
        # A key that is present with a None value would break the numeric
        # format specs below, so coerce it to 0.0 like a missing key.
        value = result.get(key)
        return 0.0 if value is None else float(value)

    # Update entry spot hint
    configured_start_price = float(start_price_input.value or 0.0)
    if configured_start_price > 0:
        entry_spot_hint.set_text(f"Using configured start price: ${configured_start_price:,.2f}")
    else:
        entry_spot_hint.set_text(f"Auto-derived entry spot: ${_number('entry_spot'):,.2f}")

    if result.get("entry_spot"):
        render_seeded_summary(entry_spot=result["entry_spot"])

    # Labels describe the values actually shown (P&L figures, not start/end
    # values), and the percentage no longer carries a currency prefix.
    summary_data = [
        ("Total P&L", f"${_number('total_pnl'):,.0f}"),
        ("Total P&L %", f"{_number('total_pnl_pct') * 100:.1f}%"),
        ("Hedge cost", f"${_number('hedging_cost'):,.0f}"),
        ("Hedge cost %", f"{_number('hedging_cost_pct') * 100:.2f}%"),
    ]
    margin_call_events = result.get("margin_call_events") or []

    result_panel.clear()
    with result_panel:
        with ui.card().classes(
            "w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
        ):
            ui.label("Scenario Results").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
            ui.label(f"Template: {result.get('scenario_name', 'Unknown')}").classes(
                "text-sm text-slate-500 dark:text-slate-400"
            )

            # Summary metrics
            with ui.grid(columns=4).classes("w-full gap-4 max-lg:grid-cols-2 max-sm:grid-cols-1"):
                for label, value in summary_data:
                    with ui.card().classes(
                        "rounded-xl border border-slate-200 bg-slate-50 p-4 shadow-none dark:border-slate-800 dark:bg-slate-950"
                    ):
                        ui.label(label).classes("text-sm text-slate-500 dark:text-slate-400")
                        ui.label(value).classes("text-2xl font-bold text-slate-900 dark:text-slate-100")

            # Margin call info
            if margin_call_events:
                with ui.card().classes(
                    "w-full mt-4 rounded-xl border border-amber-200 bg-amber-50 p-4 dark:border-amber-900/60 dark:bg-amber-950/30"
                ):
                    ui.label(f"Margin calls: {len(margin_call_events)}").classes(
                        "text-amber-800 dark:text-amber-200"
                    )

    # Update cost estimate for Databento
    if str(data_source_select.value) == "databento":
        update_cost_estimate()
"""Async backtest job execution with progress tracking.

This module provides a non-blocking backtest execution system:
1. Jobs are submitted and run in background threads
2. Progress is tracked with stages (validating, fetching_prices, calculating, complete)
3. UI polls for status updates
4. The result of each workspace's current job is cached for retrieval
"""

from __future__ import annotations

import logging
import re
import threading
import uuid
from dataclasses import dataclass, field
from datetime import date, datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from app.services.backtesting.ui_service import BacktestPageService

logger = logging.getLogger(__name__)

# Extracts the quoted "available start" timestamp from a Databento error message.
_AVAILABLE_START_RE = re.compile(r"available start of dataset [^(]+\('([^']+)'")


def _utc_now() -> datetime:
    """Return the current UTC time as a naive datetime.

    Produces the same kind of value as the deprecated ``datetime.utcnow()``, so
    ``isoformat()`` output keeps its shape (no UTC offset suffix).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class JobStatus(str, Enum):
    """Status of a backtest job."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"


class JobStage(str, Enum):
    """Execution stages with user-friendly labels."""

    VALIDATING = "validating"
    FETCHING_PRICES = "fetching_prices"
    CALCULATING = "calculating"
    COMPLETE = "complete"
    FAILED = "failed"

    @property
    def label(self) -> str:
        """Human-readable stage label."""
        labels = {
            JobStage.VALIDATING: "Validating inputs...",
            JobStage.FETCHING_PRICES: "Fetching historical prices...",
            JobStage.CALCULATING: "Running backtest calculations...",
            JobStage.COMPLETE: "Complete",
            JobStage.FAILED: "Failed",
        }
        return labels.get(self, self.value)


_TERMINAL_STATUSES = frozenset({JobStatus.COMPLETE, JobStatus.FAILED})
_TERMINAL_STAGES = frozenset({JobStage.COMPLETE, JobStage.FAILED})


@dataclass
class BacktestJob:
    """Represents a backtest job with progress tracking."""

    job_id: str
    status: JobStatus = JobStatus.PENDING
    stage: JobStage = JobStage.VALIDATING
    progress: int = 0  # 0-100
    message: str = ""
    result: dict[str, Any] | None = None
    error: str | None = None
    created_at: datetime = field(default_factory=_utc_now)
    completed_at: datetime | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize job for JSON response (the result payload itself is omitted)."""
        return {
            "job_id": self.job_id,
            "status": self.status.value,
            "stage": self.stage.value,
            "stage_label": self.stage.label,
            "progress": self.progress,
            "message": self.message,
            "has_result": self.result is not None,
            "error": self.error,
            "created_at": self.created_at.isoformat(),
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
        }


class BacktestJobStore:
    """In-memory store for backtest jobs.

    Jobs are stored in a dict keyed by workspace_id.
    Each workspace only has one active job at a time (latest replaces previous).
    Results are cached by job_id for as long as the job is the workspace's
    current job; replacing or clearing a job drops its cached result so the
    cache cannot grow without bound.
    """

    def __init__(self) -> None:
        self._jobs: dict[str, BacktestJob] = {}  # workspace_id -> job
        self._results: dict[str, dict[str, Any]] = {}  # job_id -> result
        self._lock = threading.Lock()

    def create_job(self, workspace_id: str) -> BacktestJob:
        """Create a new job for a workspace, replacing any existing one."""
        job = BacktestJob(job_id=str(uuid.uuid4()))
        with self._lock:
            previous = self._jobs.get(workspace_id)
            if previous is not None:
                # The replaced job can no longer be reached through get_job();
                # release its cached result as well.
                self._results.pop(previous.job_id, None)
            self._jobs[workspace_id] = job
        logger.info("Created job %s for workspace %s", job.job_id, workspace_id)
        return job

    def get_job(self, workspace_id: str) -> BacktestJob | None:
        """Get the current job for a workspace."""
        with self._lock:
            return self._jobs.get(workspace_id)

    def update_job(
        self,
        workspace_id: str,
        *,
        status: JobStatus | None = None,
        stage: JobStage | None = None,
        progress: int | None = None,
        message: str | None = None,
        result: dict[str, Any] | None = None,
        error: str | None = None,
        job_id: str | None = None,
    ) -> None:
        """Update the state of the workspace's current job.

        Only arguments that are provided are applied. Unknown workspaces are
        ignored.

        Args:
            workspace_id: Workspace whose current job is updated.
            status: New status, if any.
            stage: New stage, if any.
            progress: New progress value (0-100), if any.
            message: New progress message, if any.
            result: Result payload; also cached under the job's id.
            error: Error message; an empty string leaves the error unchanged.
            job_id: When given, the update is applied only if it matches the
                current job's id. This keeps a worker whose job has been
                replaced from overwriting the newer job's state.
        """
        with self._lock:
            job = self._jobs.get(workspace_id)
            if job is None:
                return
            if job_id is not None and job.job_id != job_id:
                return

            # Payload fields are written before stage/status: get_job() hands
            # out the live object, so a reader that observes a terminal status
            # should also find the result/error already in place.
            if progress is not None:
                job.progress = progress
            if message is not None:
                job.message = message
            if result is not None:
                job.result = result
                self._results[job.job_id] = result
            if error:
                job.error = error
            if stage in _TERMINAL_STAGES or status in _TERMINAL_STATUSES:
                job.completed_at = _utc_now()
            if stage:
                job.stage = stage
            if status:
                job.status = status

    def get_result(self, job_id: str) -> dict[str, Any] | None:
        """Get cached result by job ID."""
        with self._lock:
            return self._results.get(job_id)

    def clear_job(self, workspace_id: str) -> None:
        """Remove the workspace's job, and its cached result, from the store."""
        with self._lock:
            job = self._jobs.pop(workspace_id, None)
            if job is not None:
                self._results.pop(job.job_id, None)


# Global job store singleton
job_store = BacktestJobStore()


def _serialize_result(result: Any) -> dict[str, Any]:
    """Convert a scenario result object into a plain dict of JSON-friendly values."""
    return {
        "scenario_name": result.scenario_name,
        "entry_date": result.entry_date.isoformat() if result.entry_date else None,
        "entry_spot": result.entry_spot,
        "underlying_units": result.underlying_units,
        "loan_amount": result.loan_amount,
        "margin_call_ltv": result.margin_call_ltv,
        "total_pnl": result.total_pnl,
        "total_pnl_pct": result.total_pnl_pct,
        "hedging_cost": result.hedging_cost,
        "hedging_cost_pct": result.hedging_cost_pct,
        "unhedged_pnl": result.unhedged_pnl,
        "unhedged_pnl_pct": result.unhedged_pnl_pct,
        "margin_calls": result.margin_calls,
        "margin_call_events": [
            {
                "date": event.date.isoformat(),
                "price": event.price,
                "ltv": event.ltv,
                "action": event.action,
            }
            for event in (result.margin_call_events or [])
        ],
        "prices": [{"date": p.date.isoformat(), "close": p.close} for p in (result.prices or [])],
    }


def _friendly_error_message(exc: BaseException) -> str:
    """Translate an exception into the message shown to the user."""
    raw = str(exc)
    if "data_start_before_available_start" in raw:
        match = _AVAILABLE_START_RE.search(raw)
        date_parts = match.group(1).split() if match else []
        if date_parts:
            available_start = date_parts[0]  # keep the date, drop any time part
            return f"Data not available before {available_start}. Please set start date to {available_start} or later."
        return "Selected start date is before data is available for this dataset."
    if "BentoClientError" in raw or "422" in raw:
        return f"Data source error: {raw}"
    return raw


def run_backtest_job(
    workspace_id: str,
    job: BacktestJob,
    service: "BacktestPageService",
    symbol: str,
    start_date: date,
    end_date: date,
    template_slug: str,
    underlying_units: float,
    loan_amount: float,
    margin_call_ltv: float,
    data_source: str,
) -> None:
    """Execute a backtest and record its progress and outcome in ``job_store``.

    Intended to run in a background thread. Every store update is tagged with
    this job's id, so if the workspace has started a newer job in the meantime
    the updates from this run are discarded instead of clobbering it. All
    exceptions are caught and recorded on the job as a FAILED state.
    """
    try:
        # Stage 1: Validating
        job_store.update_job(
            workspace_id,
            job_id=job.job_id,
            status=JobStatus.RUNNING,
            stage=JobStage.VALIDATING,
            progress=10,
            message="Validating inputs...",
        )

        # Stage 2: Fetching prices
        job_store.update_job(
            workspace_id,
            job_id=job.job_id,
            stage=JobStage.FETCHING_PRICES,
            progress=30,
            message=f"Fetching prices for {symbol}...",
        )

        # Run the backtest (this includes price fetching)
        result = service.run_read_only_scenario(
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            template_slug=template_slug,
            underlying_units=underlying_units,
            loan_amount=loan_amount,
            margin_call_ltv=margin_call_ltv,
            data_source=data_source,
        )

        # Stage 3: Calculating (already done by run_read_only_scenario)
        job_store.update_job(
            workspace_id,
            job_id=job.job_id,
            stage=JobStage.CALCULATING,
            progress=70,
            message="Processing results...",
        )

        # Convert result to dict for serialization
        result_dict = _serialize_result(result)

        # Stage 4: Complete
        job_store.update_job(
            workspace_id,
            job_id=job.job_id,
            status=JobStatus.COMPLETE,
            stage=JobStage.COMPLETE,
            progress=100,
            message="Backtest complete!",
            result=result_dict,
        )

        logger.info("Job %s completed successfully", job.job_id)

    except Exception as exc:
        # Thread boundary: nothing above this frame can handle the error, so
        # log it with traceback and surface a readable message on the job.
        logger.exception("Job %s failed: %s", job.job_id, exc)
        job_store.update_job(
            workspace_id,
            job_id=job.job_id,
            status=JobStatus.FAILED,
            stage=JobStage.FAILED,
            progress=0,
            error=_friendly_error_message(exc),
        )


def start_backtest_job(
    workspace_id: str,
    service: "BacktestPageService",
    symbol: str,
    start_date: date,
    end_date: date,
    template_slug: str,
    underlying_units: float,
    loan_amount: float,
    margin_call_ltv: float,
    data_source: str,
) -> BacktestJob:
    """Start a backtest job in a background thread.

    Registers a new job for the workspace (replacing any existing one) and
    returns it immediately. The job runs in a daemon thread and can be polled
    for status through ``job_store``.
    """
    job = job_store.create_job(workspace_id)

    thread = threading.Thread(
        target=run_backtest_job,
        kwargs={
            "workspace_id": workspace_id,
            "job": job,
            "service": service,
            "symbol": symbol,
            "start_date": start_date,
            "end_date": end_date,
            "template_slug": template_slug,
            "underlying_units": underlying_units,
            "loan_amount": loan_amount,
            "margin_call_ltv": margin_call_ltv,
            "data_source": data_source,
        },
        daemon=True,
    )
    thread.start()

    return job