feat(backtest): async job queue for non-blocking backtest execution

BREAKING CHANGE: Complete redesign of backtest execution

- Add BacktestJob system with progress stages (validating, fetching_prices, calculating)
- Run backtests in background threads, UI polls for status
- Show progress label with current stage during execution
- Remove synchronous Databento API calls from page load
- Use static default entry spot for initial render (defers API call)
- Make refresh_workspace_seeded_units async with run.io_bound

This fixes:
- 'Connection lost' WebSocket timeout errors
- Slow page load (30s initial load)
- Backtest never completing

The job system provides:
- Non-blocking execution
- Progress tracking with stages
- Error handling with user-friendly messages
- Result caching for retrieval after completion
This commit is contained in:
Bu5hm4nn
2026-04-01 09:31:53 +02:00
parent 66d6eb3df2
commit 27ade507cd
2 changed files with 475 additions and 115 deletions

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import logging import logging
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from typing import Any from typing import TYPE_CHECKING, Any
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
from nicegui import run, ui from nicegui import run, ui
@@ -14,8 +14,16 @@ from app.models.backtest_settings_repository import BacktestSettingsRepository
from app.models.workspace import get_workspace_repository from app.models.workspace import get_workspace_repository
from app.pages.common import dashboard_page, split_page_panes from app.pages.common import dashboard_page, split_page_panes
from app.services.backtesting.databento_source import DatabentoHistoricalPriceSource from app.services.backtesting.databento_source import DatabentoHistoricalPriceSource
from app.services.backtesting.jobs import (
JobStatus,
job_store,
start_backtest_job,
)
from app.services.backtesting.ui_service import BacktestPageRunResult, BacktestPageService from app.services.backtesting.ui_service import BacktestPageRunResult, BacktestPageService
if TYPE_CHECKING:
pass # for forward references
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Dataset and schema options for Databento # Dataset and schema options for Databento
@@ -54,9 +62,10 @@ DATABENTO_DATASET_MIN_DATES = {
"GLBX.MDP3": date(2010, 1, 1), # GLBX.MDP3 futures data from 2010 "GLBX.MDP3": date(2010, 1, 1), # GLBX.MDP3 futures data from 2010
} }
def get_default_backtest_dates() -> tuple[date, date]: def get_default_backtest_dates() -> tuple[date, date]:
"""Get default backtest date range (~2 years ending on most recent Friday or earlier). """Get default backtest date range (~2 years ending on most recent Friday or earlier).
Returns dates (start, end) where: Returns dates (start, end) where:
- end is the most recent Friday (including today if today is Friday) - end is the most recent Friday (including today if today is Friday)
- start is ~730 days before end - start is ~730 days before end
@@ -80,7 +89,7 @@ def validate_date_range_for_symbol(start_date: date, end_date: date, symbol: str
"""Validate date range is within available data for symbol. """Validate date range is within available data for symbol.
Returns error message if invalid, None if valid. Returns error message if invalid, None if valid.
Validation order: Validation order:
1. Logical order (start <= end) 1. Logical order (start <= end)
2. End not in future 2. End not in future
@@ -260,19 +269,9 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
default_symbol = "GLD" default_symbol = "GLD"
default_start_price = 0.0 default_start_price = 0.0
# Derive entry spot from default date range # Use a reasonable default GLD price for initial render (will be derived async)
# Fall back to a reasonable default if data source doesn't support the date range # This prevents blocking page load with Databento API call
try: default_entry_spot = 230.0 # Approximate GLD price
default_entry_spot = service.derive_entry_spot(
"GLD",
date.fromisoformat(default_start_date),
date.fromisoformat(default_end_date),
data_source="databento",
)
except Exception:
# Data source may not support the default date range or API error
# Fall back to a reasonable GLD price (recent ~$230/oz)
default_entry_spot = 230.0
default_units = ( default_units = (
asset_quantity_from_workspace_config(config, entry_spot=default_entry_spot, symbol="GLD") asset_quantity_from_workspace_config(config, entry_spot=default_entry_spot, symbol="GLD")
if config is not None and default_entry_spot > 0 if config is not None and default_entry_spot > 0
@@ -428,6 +427,7 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
): ):
entry_spot_hint = ui.label("").classes("text-sm text-slate-500 dark:text-slate-400") entry_spot_hint = ui.label("").classes("text-sm text-slate-500 dark:text-slate-400")
validation_label = ui.label("").classes("text-sm text-rose-600 dark:text-rose-300") validation_label = ui.label("").classes("text-sm text-rose-600 dark:text-rose-300")
progress_label = ui.label("").classes("text-sm text-sky-600 dark:text-sky-300")
run_button = ( run_button = (
ui.button("Run backtest").props("color=primary data-testid=run-backtest-button").classes("mt-2") ui.button("Run backtest").props("color=primary data-testid=run-backtest-button").classes("mt-2")
) )
@@ -456,23 +456,19 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
"""Update the date range hint based on selected symbol and data source.""" """Update the date range hint based on selected symbol and data source."""
symbol = get_symbol_from_dataset() symbol = get_symbol_from_dataset()
data_source = str(data_source_select.value) data_source = str(data_source_select.value)
# Use dataset-specific minimum for Databento # Use dataset-specific minimum for Databento
if data_source == "databento": if data_source == "databento":
dataset = str(dataset_select.value) dataset = str(dataset_select.value)
min_date = DATABENTO_DATASET_MIN_DATES.get(dataset) min_date = DATABENTO_DATASET_MIN_DATES.get(dataset)
if min_date: if min_date:
date_range_hint.set_text( date_range_hint.set_text(f"{dataset} data available from {min_date.strftime('%Y-%m-%d')}")
f"{dataset} data available from {min_date.strftime('%Y-%m-%d')}"
)
return return
# Fall back to symbol minimum # Fall back to symbol minimum
min_date = SYMBOL_MIN_DATES.get(symbol) min_date = SYMBOL_MIN_DATES.get(symbol)
if min_date: if min_date:
date_range_hint.set_text( date_range_hint.set_text(f"{symbol} data available from {min_date.strftime('%Y-%m-%d')}")
f"{symbol} data available from {min_date.strftime('%Y-%m-%d')}"
)
else: else:
date_range_hint.set_text(f"{symbol} data availability unknown") date_range_hint.set_text(f"{symbol} data availability unknown")
@@ -782,25 +778,31 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
logger.warning(f"Failed to save backtest settings for workspace {workspace_id}: {e}") logger.warning(f"Failed to save backtest settings for workspace {workspace_id}: {e}")
return None return None
def refresh_workspace_seeded_units() -> None: async def refresh_workspace_seeded_units() -> None:
"""Derive entry spot asynchronously and update units when dates change."""
validation_label.set_text("") validation_label.set_text("")
entry_spot, entry_error = derive_entry_spot() progress_label.set_text("Fetching entry spot...")
if workspace_id and config is not None and config.gold_value is not None and entry_spot is not None:
units_input.value = asset_quantity_from_workspace_config( try:
config, entry_spot=entry_spot, symbol=get_symbol_from_dataset() # Run derive_entry_spot in background thread
) entry_spot, entry_error = await run.io_bound(derive_entry_spot)
update_cost_estimate()
render_seeded_summary(entry_spot=entry_spot, entry_spot_error=entry_error) if workspace_id and config is not None and config.gold_value is not None and entry_spot is not None:
if entry_error: units_input.value = asset_quantity_from_workspace_config(
validation_label.set_text(entry_error) config, entry_spot=entry_spot, symbol=get_symbol_from_dataset()
render_result_state("Scenario validation failed", entry_error, tone="warning") )
return update_cost_estimate()
validation_error = validate_current_scenario(entry_spot=entry_spot) render_seeded_summary(entry_spot=entry_spot, entry_spot_error=entry_error)
if validation_error: if entry_error:
validation_label.set_text(validation_error) validation_label.set_text(entry_error)
render_result_state("Scenario validation failed", validation_error, tone="warning") render_result_state("Scenario validation failed", entry_error, tone="warning")
else: return
mark_results_stale() validation_error = validate_current_scenario(entry_spot=entry_spot)
if validation_error:
validation_label.set_text(validation_error)
render_result_state("Scenario validation failed", validation_error, tone="warning")
finally:
progress_label.set_text("")
def on_form_change() -> None: def on_form_change() -> None:
"""Handle form changes with minimal API calls.""" """Handle form changes with minimal API calls."""
@@ -811,21 +813,18 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
# Keep existing entry spot, don't re-derive # Keep existing entry spot, don't re-derive
mark_results_stale() mark_results_stale()
async def run_backtest() -> None: def start_backtest() -> None:
"""Run the backtest asynchronously to avoid blocking the WebSocket.""" """Submit backtest job and start polling for results."""
validation_label.set_text("") validation_label.set_text("")
run_button.props("loading")
# Show loading state progress_label.set_text("Validating inputs...")
run_button.props('loading')
validation_label.set_text("Running backtest...")
ui.notify("Running backtest...", type="info")
try: try:
# Validate date range for symbol # Validate date range for symbol
start_date = parse_iso_date(start_input.value, "Start date") start_date = parse_iso_date(start_input.value, "Start date")
end_date = parse_iso_date(end_input.value, "End date") end_date = parse_iso_date(end_input.value, "End date")
symbol = get_symbol_from_dataset() symbol = get_symbol_from_dataset()
# Validate dataset-specific minimum dates for Databento # Validate dataset-specific minimum dates for Databento
data_source = str(data_source_select.value) data_source = str(data_source_select.value)
if data_source == "databento": if data_source == "databento":
@@ -837,14 +836,16 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
f"Selected start date {start_date.strftime('%Y-%m-%d')} is before available data." f"Selected start date {start_date.strftime('%Y-%m-%d')} is before available data."
) )
render_result_state("Invalid start date", validation_label.text, tone="warning") render_result_state("Invalid start date", validation_label.text, tone="warning")
run_button.props(remove='loading') run_button.props(remove="loading")
progress_label.set_text("")
return return
date_range_error = validate_date_range_for_symbol(start_date, end_date, symbol) date_range_error = validate_date_range_for_symbol(start_date, end_date, symbol)
if date_range_error: if date_range_error:
validation_label.set_text(date_range_error) validation_label.set_text(date_range_error)
render_result_state("Scenario validation failed", date_range_error, tone="warning") render_result_state("Scenario validation failed", date_range_error, tone="warning")
run_button.props(remove='loading') run_button.props(remove="loading")
progress_label.set_text("")
return return
# Validate numeric inputs # Validate numeric inputs
@@ -855,15 +856,17 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
if numeric_error: if numeric_error:
validation_label.set_text(numeric_error) validation_label.set_text(numeric_error)
render_result_state("Input validation failed", numeric_error, tone="warning") render_result_state("Input validation failed", numeric_error, tone="warning")
run_button.props(remove='loading') run_button.props(remove="loading")
progress_label.set_text("")
return return
# Save settings before running # Save settings before running
save_backtest_settings() save_backtest_settings()
# Run backtest in background thread to avoid blocking WebSocket # Start background job
result = await run.io_bound( job = start_backtest_job(
service.run_read_only_scenario, workspace_id=workspace_id,
service=service,
symbol=symbol, symbol=symbol,
start_date=start_date, start_date=start_date,
end_date=end_date, end_date=end_date,
@@ -873,65 +876,101 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
margin_call_ltv=ltv, margin_call_ltv=ltv,
data_source=str(data_source_select.value), data_source=str(data_source_select.value),
) )
# Update cost in saved settings after successful run
if str(data_source_select.value) == "databento": # Start polling for job status
update_cost_estimate() ui.timer(1.0, lambda: poll_job_status(job.job_id), once=True)
render_result(result)
run_button.props(remove='loading')
validation_label.set_text("")
ui.notify("Backtest completed!", type="positive")
except (ValueError, KeyError) as exc: except (ValueError, KeyError) as exc:
run_button.props(remove='loading') run_button.props(remove="loading")
entry_spot, entry_error = derive_entry_spot() progress_label.set_text("")
render_seeded_summary(entry_spot=entry_spot, entry_spot_error=entry_error)
if entry_spot is None:
entry_spot_hint.set_text("Entry spot unavailable until the scenario dates are valid.")
validation_label.set_text(str(exc)) validation_label.set_text(str(exc))
render_result_state("Scenario validation failed", str(exc), tone="warning") render_result_state("Validation failed", str(exc), tone="warning")
except Exception:
except Exception as exc: run_button.props(remove="loading")
run_button.props(remove='loading') progress_label.set_text("")
entry_spot, entry_error = derive_entry_spot() logger.exception("Failed to start backtest job")
render_seeded_summary(entry_spot=entry_spot, entry_spot_error=entry_error) validation_label.set_text("Failed to start backtest. Please try again.")
if entry_spot is None: render_result_state("Error", "Failed to start backtest. Please try again.", tone="error")
entry_spot_hint.set_text("Entry spot unavailable until the scenario dates are valid.")
def poll_job_status(job_id: str) -> None:
# Check for Databento API errors """Poll for job status and update UI."""
error_msg = str(exc) job = job_store.get_job(workspace_id)
if "data_start_before_available_start" in error_msg: if not job or job.job_id != job_id:
# Extract the available start date from the error message run_button.props(remove="loading")
import re progress_label.set_text("")
match = re.search(r"available start of dataset [^(]+\('([^']+)'\)", error_msg) render_result_state("Error", "Job not found", tone="error")
if match: return
available_start = match.group(1).split()[0] # Extract date part
validation_label.set_text( # Update progress display
f"Data not available before {available_start}. Please set start date to {available_start} or later." progress_label.set_text(job.stage.label)
)
else: if job.status == JobStatus.COMPLETE:
validation_label.set_text( # Job complete, render results
"Selected start date is before data is available for this dataset. Please choose a later date." run_button.props(remove="loading")
) progress_label.set_text("")
render_result_state("Invalid start date", validation_label.text, tone="warning") if job.result:
elif "BentoClientError" in error_msg or "422" in error_msg: render_job_result(job.result)
validation_label.set_text(f"Data source error: {error_msg}") ui.notify("Backtest completed!", type="positive")
render_result_state("Data unavailable", validation_label.text, tone="warning") elif job.status == JobStatus.FAILED:
else: # Job failed, show error
message = "Backtest failed. Please verify the scenario inputs and try again." run_button.props(remove="loading")
logger.exception( progress_label.set_text("")
"Backtest page run failed for workspace=%s symbol=%s start=%s end=%s template=%s units=%s loan=%s margin_call_ltv=%s", error_msg = job.error or "Unknown error occurred"
workspace_id, validation_label.set_text(error_msg)
symbol_select.value, render_result_state("Backtest failed", error_msg, tone="error")
start_input.value, else:
end_input.value, # Still running, poll again in 1 second
template_select.value, ui.timer(1.0, lambda: poll_job_status(job_id), once=True)
units_input.value,
loan_input.value, def render_job_result(result: dict[str, Any]) -> None:
ltv_input.value, """Render backtest result from job store."""
# Update entry spot hint
configured_start_price = float(start_price_input.value or 0.0)
if configured_start_price > 0:
entry_spot_hint.set_text(f"Using configured start price: ${configured_start_price:,.2f}")
else:
entry_spot_hint.set_text(f"Auto-derived entry spot: ${result.get('entry_spot', 0):,.2f}")
if result.get("entry_spot"):
render_seeded_summary(entry_spot=result["entry_spot"])
result_panel.clear()
with result_panel:
with ui.card().classes(
"w-full rounded-2xl border border-slate-200 bg-white shadow-sm dark:border-slate-800 dark:bg-slate-900"
):
ui.label("Scenario Results").classes("text-lg font-semibold text-slate-900 dark:text-slate-100")
ui.label(f"Template: {result.get('scenario_name', 'Unknown')}").classes(
"text-sm text-slate-500 dark:text-slate-400"
) )
validation_label.set_text(message)
render_result_state("Backtest failed", message, tone="error") # Summary metrics
summary_data = [
("Start value", f"${result.get('total_pnl', 0):,.0f}"),
("End value hedged", f"${result.get('total_pnl_pct', 0)*100:.1f}%"),
("Hedge cost", f"${result.get('hedging_cost', 0):,.0f}"),
("Hedge cost %", f"{result.get('hedging_cost_pct', 0)*100:.2f}%"),
]
with ui.grid(columns=4).classes("w-full gap-4 max-lg:grid-cols-2 max-sm:grid-cols-1"):
for label, value in summary_data:
with ui.card().classes(
"rounded-xl border border-slate-200 bg-slate-50 p-4 shadow-none dark:border-slate-800 dark:bg-slate-950"
):
ui.label(label).classes("text-sm text-slate-500 dark:text-slate-400")
ui.label(value).classes("text-2xl font-bold text-slate-900 dark:text-slate-100")
# Margin call info
if result.get("margin_call_events"):
with ui.card().classes(
"w-full mt-4 rounded-xl border border-amber-200 bg-amber-50 p-4 dark:border-amber-900/60 dark:bg-amber-950/30"
):
ui.label(f"Margin calls: {len(result['margin_call_events'])}").classes(
"text-amber-800 dark:text-amber-200"
)
# Update cost estimate for Databento
if str(data_source_select.value) == "databento":
update_cost_estimate()
# Wire up event handlers # Wire up event handlers
# Only call expensive derive_entry_spot on date changes # Only call expensive derive_entry_spot on date changes
@@ -947,7 +986,7 @@ def _render_backtests_page(workspace_id: str | None = None) -> None:
units_input.on_value_change(lambda e: mark_results_stale()) units_input.on_value_change(lambda e: mark_results_stale())
loan_input.on_value_change(lambda e: mark_results_stale()) loan_input.on_value_change(lambda e: mark_results_stale())
ltv_input.on_value_change(lambda e: mark_results_stale()) ltv_input.on_value_change(lambda e: mark_results_stale())
run_button.on_click(lambda: run_backtest()) run_button.on_click(lambda: start_backtest())
# Initial render # Initial render
render_seeded_summary(entry_spot=float(default_start_price) if default_start_price > 0 else None) render_seeded_summary(entry_spot=float(default_start_price) if default_start_price > 0 else None)

View File

@@ -0,0 +1,321 @@
"""Async backtest job execution with progress tracking.
This module provides a non-blocking backtest execution system:
1. Jobs are submitted and run in background threads
2. Progress is tracked with stages (validating, fetching_prices, calculating, complete)
3. UI polls for status updates
4. Results are cached for retrieval
"""
from __future__ import annotations
import logging
import threading
import uuid
from dataclasses import dataclass, field
from datetime import date, datetime
from enum import Enum
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from app.services.backtesting.ui_service import BacktestPageService
logger = logging.getLogger(__name__)
class JobStatus(str, Enum):
"""Status of a backtest job."""
PENDING = "pending"
RUNNING = "running"
COMPLETE = "complete"
FAILED = "failed"
class JobStage(str, Enum):
"""Execution stages with user-friendly labels."""
VALIDATING = "validating"
FETCHING_PRICES = "fetching_prices"
CALCULATING = "calculating"
COMPLETE = "complete"
FAILED = "failed"
@property
def label(self) -> str:
"""Human-readable stage label."""
labels = {
JobStage.VALIDATING: "Validating inputs...",
JobStage.FETCHING_PRICES: "Fetching historical prices...",
JobStage.CALCULATING: "Running backtest calculations...",
JobStage.COMPLETE: "Complete",
JobStage.FAILED: "Failed",
}
return labels.get(self, self.value)
@dataclass
class BacktestJob:
"""Represents a backtest job with progress tracking."""
job_id: str
status: JobStatus = JobStatus.PENDING
stage: JobStage = JobStage.VALIDATING
progress: int = 0 # 0-100
message: str = ""
result: dict[str, Any] | None = None
error: str | None = None
created_at: datetime = field(default_factory=datetime.utcnow)
completed_at: datetime | None = None
def to_dict(self) -> dict[str, Any]:
"""Serialize job for JSON response."""
return {
"job_id": self.job_id,
"status": self.status.value,
"stage": self.stage.value,
"stage_label": self.stage.label,
"progress": self.progress,
"message": self.message,
"has_result": self.result is not None,
"error": self.error,
"created_at": self.created_at.isoformat(),
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
}
class BacktestJobStore:
"""In-memory store for backtest jobs.
Jobs are stored in a dict keyed by workspace_id -> job_id.
Each workspace only has one active job at a time (latest replaces previous).
"""
def __init__(self) -> None:
self._jobs: dict[str, BacktestJob] = {} # workspace_id -> job
self._results: dict[str, dict[str, Any]] = {} # job_id -> result
self._lock = threading.Lock()
def create_job(self, workspace_id: str) -> BacktestJob:
"""Create a new job for a workspace, replacing any existing one."""
job = BacktestJob(job_id=str(uuid.uuid4()))
with self._lock:
self._jobs[workspace_id] = job
logger.info(f"Created job {job.job_id} for workspace {workspace_id}")
return job
def get_job(self, workspace_id: str) -> BacktestJob | None:
"""Get the current job for a workspace."""
with self._lock:
return self._jobs.get(workspace_id)
def update_job(
self,
workspace_id: str,
*,
status: JobStatus | None = None,
stage: JobStage | None = None,
progress: int | None = None,
message: str | None = None,
result: dict[str, Any] | None = None,
error: str | None = None,
) -> None:
"""Update job state."""
with self._lock:
job = self._jobs.get(workspace_id)
if not job:
return
if status:
job.status = status
if stage:
job.stage = stage
if progress is not None:
job.progress = progress
if message is not None:
job.message = message
if result is not None:
job.result = result
self._results[job.job_id] = result
if error:
job.error = error
if stage == JobStage.COMPLETE or stage == JobStage.FAILED:
job.completed_at = datetime.utcnow()
def get_result(self, job_id: str) -> dict[str, Any] | None:
"""Get cached result by job ID."""
with self._lock:
return self._results.get(job_id)
def clear_job(self, workspace_id: str) -> None:
"""Remove job from store."""
with self._lock:
if workspace_id in self._jobs:
del self._jobs[workspace_id]
# Global job store singleton
job_store = BacktestJobStore()
def run_backtest_job(
workspace_id: str,
job: BacktestJob,
service: "BacktestPageService",
symbol: str,
start_date: date,
end_date: date,
template_slug: str,
underlying_units: float,
loan_amount: float,
margin_call_ltv: float,
data_source: str,
) -> None:
"""Execute backtest in background thread with progress updates.
This function runs in a background thread and updates the job state
as it progresses through stages.
"""
try:
# Stage 1: Validating
job_store.update_job(
workspace_id,
status=JobStatus.RUNNING,
stage=JobStage.VALIDATING,
progress=10,
message="Validating inputs...",
)
# Stage 2: Fetching prices
job_store.update_job(
workspace_id,
stage=JobStage.FETCHING_PRICES,
progress=30,
message=f"Fetching prices for {symbol}...",
)
# Run the backtest (this includes price fetching)
result = service.run_read_only_scenario(
symbol=symbol,
start_date=start_date,
end_date=end_date,
template_slug=template_slug,
underlying_units=underlying_units,
loan_amount=loan_amount,
margin_call_ltv=margin_call_ltv,
data_source=data_source,
)
# Stage 3: Calculating (already done by run_read_only_scenario)
job_store.update_job(
workspace_id,
stage=JobStage.CALCULATING,
progress=70,
message="Processing results...",
)
# Convert result to dict for serialization
result_dict = {
"scenario_name": result.scenario_name,
"entry_date": result.entry_date.isoformat() if result.entry_date else None,
"entry_spot": result.entry_spot,
"underlying_units": result.underlying_units,
"loan_amount": result.loan_amount,
"margin_call_ltv": result.margin_call_ltv,
"total_pnl": result.total_pnl,
"total_pnl_pct": result.total_pnl_pct,
"hedging_cost": result.hedging_cost,
"hedging_cost_pct": result.hedging_cost_pct,
"unhedged_pnl": result.unhedged_pnl,
"unhedged_pnl_pct": result.unhedged_pnl_pct,
"margin_calls": result.margin_calls,
"margin_call_events": [
{
"date": event.date.isoformat(),
"price": event.price,
"ltv": event.ltv,
"action": event.action,
}
for event in (result.margin_call_events or [])
],
"prices": [{"date": p.date.isoformat(), "close": p.close} for p in (result.prices or [])],
}
# Stage 4: Complete
job_store.update_job(
workspace_id,
status=JobStatus.COMPLETE,
stage=JobStage.COMPLETE,
progress=100,
message="Backtest complete!",
result=result_dict,
)
logger.info(f"Job {job.job_id} completed successfully")
except Exception as e:
logger.exception(f"Job {job.job_id} failed: {e}")
error_msg = str(e)
# Check for Databento API errors
if "data_start_before_available_start" in error_msg:
import re
match = re.search(r"available start of dataset [^(]+\('([^']+)'", error_msg)
if match:
available_start = match.group(1).split()[0]
error_msg = (
f"Data not available before {available_start}. Please set start date to {available_start} or later."
)
else:
error_msg = "Selected start date is before data is available for this dataset."
elif "BentoClientError" in error_msg or "422" in error_msg:
error_msg = f"Data source error: {error_msg}"
job_store.update_job(
workspace_id,
status=JobStatus.FAILED,
stage=JobStage.FAILED,
progress=0,
error=error_msg,
)
def start_backtest_job(
workspace_id: str,
service: "BacktestPageService",
symbol: str,
start_date: date,
end_date: date,
template_slug: str,
underlying_units: float,
loan_amount: float,
margin_call_ltv: float,
data_source: str,
) -> BacktestJob:
"""Start a backtest job in a background thread.
Returns immediately with the job ID. The job runs in the background
and can be polled for status.
"""
job = job_store.create_job(workspace_id)
thread = threading.Thread(
target=run_backtest_job,
kwargs={
"workspace_id": workspace_id,
"job": job,
"service": service,
"symbol": symbol,
"start_date": start_date,
"end_date": end_date,
"template_slug": template_slug,
"underlying_units": underlying_units,
"loan_amount": loan_amount,
"margin_call_ltv": margin_call_ltv,
"data_source": data_source,
},
daemon=True,
)
thread.start()
return job