feat(CORE-001D1): harden unit-aware workspace persistence
This commit is contained in:
@@ -3,7 +3,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -214,6 +216,24 @@ class PortfolioConfig:
|
||||
return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
|
||||
|
||||
|
||||
def _coerce_persisted_decimal(value: Any) -> Decimal:
|
||||
if isinstance(value, bool):
|
||||
raise TypeError("Boolean values are not valid decimal persistence inputs")
|
||||
if isinstance(value, Decimal):
|
||||
amount = value
|
||||
elif isinstance(value, int):
|
||||
amount = Decimal(value)
|
||||
elif isinstance(value, float):
|
||||
amount = Decimal(str(value))
|
||||
elif isinstance(value, str):
|
||||
amount = Decimal(value)
|
||||
else:
|
||||
raise TypeError(f"Unsupported persisted decimal input type: {type(value)!r}")
|
||||
if not amount.is_finite():
|
||||
raise ValueError("Decimal persistence value must be finite")
|
||||
return amount
|
||||
|
||||
|
||||
class PortfolioRepository:
|
||||
"""Repository for persisting portfolio configuration.
|
||||
|
||||
@@ -221,6 +241,36 @@ class PortfolioRepository:
|
||||
"""
|
||||
|
||||
CONFIG_PATH = Path("data/portfolio_config.json")
|
||||
SCHEMA_VERSION = 2
|
||||
PERSISTENCE_CURRENCY = "USD"
|
||||
PERSISTENCE_WEIGHT_UNIT = "ozt"
|
||||
_WEIGHT_FACTORS = {
|
||||
"g": Decimal("1"),
|
||||
"kg": Decimal("1000"),
|
||||
"ozt": Decimal("31.1034768"),
|
||||
}
|
||||
_MONEY_FIELDS = {"gold_value", "loan_amount", "monthly_budget"}
|
||||
_WEIGHT_FIELDS = {"gold_ounces"}
|
||||
_PRICE_PER_WEIGHT_FIELDS = {"entry_price"}
|
||||
_RATIO_FIELDS = {"margin_threshold", "ltv_warning", "volatility_spike"}
|
||||
_PERCENT_FIELDS = {"spot_drawdown"}
|
||||
_INTEGER_FIELDS = {"refresh_interval"}
|
||||
_PERSISTED_FIELDS = {
|
||||
"gold_value",
|
||||
"entry_price",
|
||||
"gold_ounces",
|
||||
"entry_basis_mode",
|
||||
"loan_amount",
|
||||
"margin_threshold",
|
||||
"monthly_budget",
|
||||
"ltv_warning",
|
||||
"primary_source",
|
||||
"fallback_source",
|
||||
"refresh_interval",
|
||||
"volatility_spike",
|
||||
"spot_drawdown",
|
||||
"email_alerts",
|
||||
}
|
||||
|
||||
def __init__(self, config_path: Path | None = None) -> None:
|
||||
self.config_path = config_path or self.CONFIG_PATH
|
||||
@@ -228,8 +278,13 @@ class PortfolioRepository:
|
||||
|
||||
def save(self, config: PortfolioConfig) -> None:
|
||||
"""Save configuration to disk."""
|
||||
with open(self.config_path, "w") as f:
|
||||
json.dump(config.to_dict(), f, indent=2)
|
||||
payload = self._to_persistence_payload(config)
|
||||
tmp_path = self.config_path.with_name(f"{self.config_path.name}.tmp")
|
||||
with open(tmp_path, "w") as f:
|
||||
json.dump(payload, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp_path, self.config_path)
|
||||
|
||||
def load(self) -> PortfolioConfig:
|
||||
"""Load configuration from disk.
|
||||
@@ -244,10 +299,178 @@ class PortfolioRepository:
|
||||
try:
|
||||
with open(self.config_path) as f:
|
||||
data = json.load(f)
|
||||
return PortfolioConfig.from_dict(data)
|
||||
except (json.JSONDecodeError, ValueError) as e:
|
||||
print(f"Warning: Failed to load portfolio config: {e}. Using defaults.")
|
||||
return PortfolioConfig()
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"Invalid portfolio config JSON: {e}") from e
|
||||
|
||||
return self._config_from_payload(data)
|
||||
|
||||
@classmethod
|
||||
def _to_persistence_payload(cls, config: PortfolioConfig) -> dict[str, Any]:
|
||||
return {
|
||||
"schema_version": cls.SCHEMA_VERSION,
|
||||
"portfolio": {key: cls._serialize_value(key, value) for key, value in config.to_dict().items()},
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _config_from_payload(cls, data: dict[str, Any]) -> PortfolioConfig:
|
||||
if not isinstance(data, dict):
|
||||
raise TypeError("portfolio config payload must be an object")
|
||||
schema_version = data.get("schema_version")
|
||||
if schema_version != cls.SCHEMA_VERSION:
|
||||
raise ValueError(f"Unsupported portfolio schema_version: {schema_version}")
|
||||
portfolio = data.get("portfolio")
|
||||
if not isinstance(portfolio, dict):
|
||||
raise TypeError("portfolio payload must be an object")
|
||||
cls._validate_portfolio_fields(portfolio)
|
||||
return PortfolioConfig.from_dict(cls._deserialize_portfolio_payload(portfolio))
|
||||
|
||||
@classmethod
|
||||
def _validate_portfolio_fields(cls, payload: dict[str, Any]) -> None:
|
||||
keys = set(payload.keys())
|
||||
missing = sorted(cls._PERSISTED_FIELDS - keys)
|
||||
unknown = sorted(keys - cls._PERSISTED_FIELDS)
|
||||
if missing or unknown:
|
||||
details: list[str] = []
|
||||
if missing:
|
||||
details.append(f"missing={missing}")
|
||||
if unknown:
|
||||
details.append(f"unknown={unknown}")
|
||||
raise ValueError(f"Invalid portfolio payload fields: {'; '.join(details)}")
|
||||
|
||||
@classmethod
|
||||
def _deserialize_portfolio_payload(cls, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
return {key: cls._deserialize_value(key, value) for key, value in payload.items()}
|
||||
|
||||
@classmethod
|
||||
def _serialize_value(cls, key: str, value: Any) -> Any:
|
||||
if key in cls._MONEY_FIELDS:
|
||||
return {"value": cls._decimal_to_string(value), "currency": cls.PERSISTENCE_CURRENCY}
|
||||
if key in cls._WEIGHT_FIELDS:
|
||||
return {"value": cls._decimal_to_string(value), "unit": cls.PERSISTENCE_WEIGHT_UNIT}
|
||||
if key in cls._PRICE_PER_WEIGHT_FIELDS:
|
||||
return {
|
||||
"value": cls._decimal_to_string(value),
|
||||
"currency": cls.PERSISTENCE_CURRENCY,
|
||||
"per_weight_unit": cls.PERSISTENCE_WEIGHT_UNIT,
|
||||
}
|
||||
if key in cls._RATIO_FIELDS:
|
||||
return {"value": cls._decimal_to_string(value), "unit": "ratio"}
|
||||
if key in cls._PERCENT_FIELDS:
|
||||
return {"value": cls._decimal_to_string(value), "unit": "percent"}
|
||||
if key in cls._INTEGER_FIELDS:
|
||||
return cls._serialize_integer(value, unit="seconds")
|
||||
return value
|
||||
|
||||
@classmethod
|
||||
def _deserialize_value(cls, key: str, value: Any) -> Any:
|
||||
if key in cls._MONEY_FIELDS:
|
||||
return float(cls._deserialize_money(value))
|
||||
if key in cls._WEIGHT_FIELDS:
|
||||
return float(cls._deserialize_weight(value))
|
||||
if key in cls._PRICE_PER_WEIGHT_FIELDS:
|
||||
return float(cls._deserialize_price_per_weight(value))
|
||||
if key in cls._RATIO_FIELDS:
|
||||
return float(cls._deserialize_ratio(value))
|
||||
if key in cls._PERCENT_FIELDS:
|
||||
return float(cls._deserialize_percent(value))
|
||||
if key in cls._INTEGER_FIELDS:
|
||||
return cls._deserialize_integer(value, expected_unit="seconds")
|
||||
return value
|
||||
|
||||
@classmethod
|
||||
def _deserialize_money(cls, value: Any) -> Decimal:
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError("money field must be an object")
|
||||
currency = value.get("currency")
|
||||
if currency != cls.PERSISTENCE_CURRENCY:
|
||||
raise ValueError(f"Unsupported currency: {currency!r}")
|
||||
return _coerce_persisted_decimal(value.get("value"))
|
||||
|
||||
@classmethod
|
||||
def _deserialize_weight(cls, value: Any) -> Decimal:
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError("weight field must be an object")
|
||||
amount = _coerce_persisted_decimal(value.get("value"))
|
||||
unit = value.get("unit")
|
||||
return cls._convert_weight(amount, unit, cls.PERSISTENCE_WEIGHT_UNIT)
|
||||
|
||||
@classmethod
|
||||
def _deserialize_price_per_weight(cls, value: Any) -> Decimal:
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError("price-per-weight field must be an object")
|
||||
currency = value.get("currency")
|
||||
if currency != cls.PERSISTENCE_CURRENCY:
|
||||
raise ValueError(f"Unsupported currency: {currency!r}")
|
||||
amount = _coerce_persisted_decimal(value.get("value"))
|
||||
unit = value.get("per_weight_unit")
|
||||
return cls._convert_price_per_weight(amount, unit, cls.PERSISTENCE_WEIGHT_UNIT)
|
||||
|
||||
@classmethod
|
||||
def _deserialize_ratio(cls, value: Any) -> Decimal:
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError("ratio field must be an object")
|
||||
amount = _coerce_persisted_decimal(value.get("value"))
|
||||
unit = value.get("unit")
|
||||
if unit == "ratio":
|
||||
return amount
|
||||
if unit == "percent":
|
||||
return amount / Decimal("100")
|
||||
raise ValueError(f"Unsupported ratio unit: {unit!r}")
|
||||
|
||||
@classmethod
|
||||
def _deserialize_percent(cls, value: Any) -> Decimal:
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError("percent field must be an object")
|
||||
amount = _coerce_persisted_decimal(value.get("value"))
|
||||
unit = value.get("unit")
|
||||
if unit == "percent":
|
||||
return amount
|
||||
if unit == "ratio":
|
||||
return amount * Decimal("100")
|
||||
raise ValueError(f"Unsupported percent unit: {unit!r}")
|
||||
|
||||
@staticmethod
|
||||
def _serialize_integer(value: Any, *, unit: str) -> dict[str, Any]:
|
||||
if isinstance(value, bool) or not isinstance(value, int):
|
||||
raise TypeError("integer field value must be an int")
|
||||
return {"value": value, "unit": unit}
|
||||
|
||||
@staticmethod
|
||||
def _deserialize_integer(value: Any, *, expected_unit: str) -> int:
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError("integer field must be an object")
|
||||
unit = value.get("unit")
|
||||
if unit != expected_unit:
|
||||
raise ValueError(f"Unsupported integer unit: {unit!r}")
|
||||
raw = value.get("value")
|
||||
if isinstance(raw, bool) or not isinstance(raw, int):
|
||||
raise TypeError("integer field value must be an int")
|
||||
return raw
|
||||
|
||||
@classmethod
|
||||
def _convert_weight(cls, amount: Decimal, from_unit: Any, to_unit: str) -> Decimal:
|
||||
if from_unit not in cls._WEIGHT_FACTORS or to_unit not in cls._WEIGHT_FACTORS:
|
||||
raise ValueError(f"Unsupported weight unit conversion: {from_unit!r} -> {to_unit!r}")
|
||||
if from_unit == to_unit:
|
||||
return amount
|
||||
grams = amount * cls._WEIGHT_FACTORS[from_unit]
|
||||
return grams / cls._WEIGHT_FACTORS[to_unit]
|
||||
|
||||
@classmethod
|
||||
def _convert_price_per_weight(cls, amount: Decimal, from_unit: Any, to_unit: str) -> Decimal:
|
||||
if from_unit not in cls._WEIGHT_FACTORS or to_unit not in cls._WEIGHT_FACTORS:
|
||||
raise ValueError(f"Unsupported price-per-weight unit conversion: {from_unit!r} -> {to_unit!r}")
|
||||
if from_unit == to_unit:
|
||||
return amount
|
||||
return amount * cls._WEIGHT_FACTORS[to_unit] / cls._WEIGHT_FACTORS[from_unit]
|
||||
|
||||
@staticmethod
|
||||
def _decimal_to_string(value: Any) -> str:
|
||||
decimal_value = _coerce_persisted_decimal(value)
|
||||
normalized = format(decimal_value, "f")
|
||||
if "." not in normalized:
|
||||
normalized = f"{normalized}.0"
|
||||
return normalized
|
||||
|
||||
|
||||
_portfolio_repo: PortfolioRepository | None = None
|
||||
|
||||
@@ -26,7 +26,14 @@ class WorkspaceRepository:
|
||||
def workspace_exists(self, workspace_id: str) -> bool:
|
||||
if not self.is_valid_workspace_id(workspace_id):
|
||||
return False
|
||||
return self._portfolio_path(workspace_id).exists()
|
||||
portfolio_path = self._portfolio_path(workspace_id)
|
||||
if not portfolio_path.exists():
|
||||
return False
|
||||
try:
|
||||
PortfolioRepository(portfolio_path).load()
|
||||
except (ValueError, TypeError, FileNotFoundError):
|
||||
return False
|
||||
return True
|
||||
|
||||
def create_workspace(self, workspace_id: str | None = None) -> PortfolioConfig:
|
||||
resolved_workspace_id = workspace_id or str(uuid4())
|
||||
|
||||
Reference in New Issue
Block a user