Files
vault-dash/app/main.py
Bu5hm4nn 7dc5b3d734 Fix type hints and dependency issues for CI
- Add -r requirements.txt to requirements-dev.txt
- Fix mypy errors:
  - Remove slots=True from Settings dataclass
  - Add explicit list[float] type annotations in hedge.py
  - Add type ignore comments for optional QuantLib imports
  - Use Sequence instead of list in GreeksTable for covariance
  - Fix dict type annotation in options.py
  - Add type ignore for nicegui attr-defined errors
- Disable attr-defined error code in mypy config
2026-03-22 10:36:58 +01:00

178 lines
5.5 KiB
Python

"""FastAPI application entry point with NiceGUI integration."""
from __future__ import annotations
import asyncio
import contextlib
import logging
import os
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from nicegui import ui # type: ignore[attr-defined]
import app.pages # noqa: F401
from app.api.routes import router as api_router
from app.services.cache import CacheService
from app.services.data_service import DataService
# Configure root logging once at import time.  ``logging`` only recognises
# upper-case level names ("INFO", "DEBUG", ...), so the configured value is
# normalised; an unset or empty LOG_LEVEL falls back to INFO instead of
# raising ValueError while the module is being imported.
logging.basicConfig(level=(os.getenv("LOG_LEVEL") or "INFO").upper())
logger = logging.getLogger(__name__)
@dataclass
class Settings:
    """Application configuration read from environment variables.

    The field defaults below are also the fallbacks :meth:`load` uses when
    the corresponding environment variable is not set.
    """

    app_name: str = "Vault Dashboard"  # APP_NAME
    environment: str = "development"  # APP_ENV, falling back to ENVIRONMENT
    cors_origins: list[str] | None = None  # CORS_ORIGINS, comma-separated
    redis_url: str | None = None  # REDIS_URL
    cache_ttl: int = 300  # CACHE_TTL
    default_symbol: str = "GLD"  # DEFAULT_SYMBOL
    websocket_interval_seconds: int = 5  # WEBSOCKET_INTERVAL_SECONDS
    nicegui_mount_path: str = "/"  # NICEGUI_MOUNT_PATH
    nicegui_storage_secret: str = "vault-dash-dev-secret"  # NICEGUI_STORAGE_SECRET

    @classmethod
    def load(cls) -> Settings:
        """Build a ``Settings`` instance from the process environment.

        A ``.env`` file is loaded first when ``python-dotenv`` is importable.

        Raises:
            ValueError: if CACHE_TTL or WEBSOCKET_INTERVAL_SECONDS is set to
                something ``int()`` cannot parse.
        """
        cls._load_dotenv()
        read = os.getenv

        # Split the comma-separated origin list and drop blank entries.
        raw_origins = read("CORS_ORIGINS", "*")
        trimmed = (chunk.strip() for chunk in raw_origins.split(","))
        origin_list = [chunk for chunk in trimmed if chunk]

        # APP_ENV takes precedence over ENVIRONMENT.
        env_name = read("APP_ENV", read("ENVIRONMENT", cls.environment))

        ttl = int(read("CACHE_TTL", cls.cache_ttl))
        interval = int(
            read("WEBSOCKET_INTERVAL_SECONDS", cls.websocket_interval_seconds)
        )

        return cls(
            app_name=read("APP_NAME", cls.app_name),
            environment=env_name,
            cors_origins=origin_list,
            redis_url=read("REDIS_URL"),
            cache_ttl=ttl,
            default_symbol=read("DEFAULT_SYMBOL", cls.default_symbol),
            websocket_interval_seconds=interval,
            nicegui_mount_path=read("NICEGUI_MOUNT_PATH", cls.nicegui_mount_path),
            nicegui_storage_secret=read(
                "NICEGUI_STORAGE_SECRET", cls.nicegui_storage_secret
            ),
        )

    @staticmethod
    def _load_dotenv() -> None:
        """Load a ``.env`` file if ``python-dotenv`` is installed; otherwise do nothing."""
        try:
            from dotenv import load_dotenv
        except ImportError:
            pass
        else:
            load_dotenv()
# Module-level settings instance, built once at import time from the environment.
settings = Settings.load()
class ConnectionManager:
    """Track accepted WebSocket connections and broadcast JSON payloads to them."""

    def __init__(self) -> None:
        self._connections: set[WebSocket] = set()

    async def connect(self, websocket: WebSocket) -> None:
        """Accept *websocket* and start tracking it."""
        await websocket.accept()
        self._connections.add(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        """Stop tracking *websocket*; a socket that is not tracked is ignored."""
        self._connections.discard(websocket)

    async def broadcast_json(self, payload: dict[str, Any]) -> None:
        """Send *payload* to every tracked connection.

        Delivery is best-effort: a connection whose ``send_json`` raises is
        dropped from the manager and the broadcast continues with the rest.
        """
        # Iterate over a snapshot.  Every ``await`` hands control back to the
        # event loop, so connect()/disconnect() can mutate the live set while
        # a broadcast is in flight; iterating the set directly would then
        # raise "RuntimeError: Set changed size during iteration".
        for websocket in tuple(self._connections):
            if websocket not in self._connections:
                # Disconnected while earlier sends were being awaited.
                continue
            try:
                await websocket.send_json(payload)
            except Exception:
                # A failed send means the peer is gone; forget the socket.
                self.disconnect(websocket)

    @property
    def count(self) -> int:
        """Number of connections currently tracked."""
        return len(self._connections)
async def publish_updates(app: FastAPI) -> None:
    """Broadcast a ``portfolio_update`` message to all WebSocket clients in a loop.

    Each cycle fetches the portfolio for the configured default symbol,
    broadcasts it together with the current connection count, then sleeps for
    ``websocket_interval_seconds``.  The loop runs until the task is cancelled.

    A failure while fetching or broadcasting is logged and the loop carries on
    with the next cycle, so one transient error does not stop updates for the
    remaining lifetime of the process.

    Raises:
        asyncio.CancelledError: re-raised after logging when the task is cancelled.
    """
    state = app.state
    try:
        while True:
            try:
                payload = {
                    "type": "portfolio_update",
                    "connections": state.ws_manager.count,
                    "portfolio": await state.data_service.get_portfolio(
                        state.settings.default_symbol
                    ),
                }
                await state.ws_manager.broadcast_json(payload)
            except Exception:
                # CancelledError derives from BaseException, so cancellation
                # still reaches the outer handler below.
                logger.exception("Failed to publish portfolio update")
            await asyncio.sleep(state.settings.websocket_interval_seconds)
    except asyncio.CancelledError:
        logger.info("WebSocket publisher stopped")
        raise
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Set up shared services on ``app.state`` and tear them down on shutdown.

    Startup connects the cache, creates the data service and the WebSocket
    connection manager, and starts the background publisher task.  Shutdown
    cancels the publisher and closes the cache.

    The cache is closed even when startup fails after the connection was
    opened, or when the publisher task had already ended with an error.
    """
    app.state.settings = settings
    app.state.cache = CacheService(settings.redis_url, default_ttl=settings.cache_ttl)
    await app.state.cache.connect()
    try:
        app.state.data_service = DataService(
            app.state.cache, default_symbol=settings.default_symbol
        )
        app.state.ws_manager = ConnectionManager()
        app.state.publisher_task = asyncio.create_task(publish_updates(app))
        logger.info("Application startup complete")
        try:
            yield
        finally:
            app.state.publisher_task.cancel()
            try:
                with contextlib.suppress(asyncio.CancelledError):
                    await app.state.publisher_task
            except Exception:
                # The task had already finished with an error; awaiting it
                # re-raises that error.  Log it rather than abort shutdown.
                logger.exception("WebSocket publisher terminated with an error")
    finally:
        await app.state.cache.close()
        logger.info("Application shutdown complete")
app = FastAPI(title=settings.app_name, lifespan=lifespan)
app.include_router(api_router)

# An empty origin list falls back to allowing every origin.
_cors_origins = settings.cors_origins or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Credentialed cross-origin requests are only enabled for an explicit
    # origin list.  Combining them with the "*" wildcard would let any site
    # issue requests carrying the user's cookies, and the FastAPI CORS
    # documentation states the two must not be used together.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/health", tags=["health"])
async def health(request: Request) -> dict[str, Any]:
    """Return a static ``ok`` status with the configured environment and the cache's ``enabled`` flag."""
    state = request.app.state
    environment = state.settings.environment
    redis_enabled = state.cache.enabled
    return {"status": "ok", "environment": environment, "redis_enabled": redis_enabled}
@app.websocket("/ws/updates")
async def websocket_updates(websocket: WebSocket) -> None:
    """Register a client with the connection manager and hold the socket open.

    After a one-off ``connected`` greeting, incoming text frames are read and
    discarded until the client disconnects.  The socket is removed from the
    manager on every exit path.
    """
    ws_manager: ConnectionManager = websocket.app.state.ws_manager
    await ws_manager.connect(websocket)
    try:
        # A client disconnect is the normal way out of the receive loop.
        with contextlib.suppress(WebSocketDisconnect):
            await websocket.send_json(
                {"type": "connected", "message": "Real-time updates enabled"}
            )
            while True:
                await websocket.receive_text()
    finally:
        ws_manager.disconnect(websocket)
# Attach the NiceGUI frontend to the existing FastAPI app.  ``show`` is an
# option of ``ui.run`` (which starts its own server); ``ui.run_with`` does not
# define that keyword, so passing it fails with TypeError at import time.
ui.run_with(
    app,
    mount_path=settings.nicegui_mount_path,
    storage_secret=settings.nicegui_storage_secret,
)
# Start the server only when this file is executed as a script.  uvicorn's
# reloader launches its worker through multiprocessing "spawn", which
# re-imports the launching script under the name "__mp_main__"; matching that
# name here would make the worker call uvicorn.run() again during import
# instead of simply serving "app.main:app".
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=settings.environment == "development",
    )