"""Application configuration and secret resolution.

Settings are read from environment variables. A secret variable may hold
either the literal value or an ``op://vault/item/field`` reference, which is
resolved through the 1Password Connect REST API.
"""

import base64
import os
import re
from dataclasses import dataclass, field
from typing import Any, Optional

import requests

# Shape of a 1Password secret reference: op://<vault>/<item>/<field>.
_OP_REF_PATTERN = re.compile(r"op://([^/]+)/([^/]+)/(.+)")

# requests applies no timeout unless one is passed, so a stalled Connect
# server would otherwise block configuration loading forever.
_DEFAULT_OP_CONNECT_TIMEOUT = 10.0


def _op_connect_get(
    url: str,
    headers: dict,
    timeout: float,
    params: Optional[dict] = None,
) -> Any:
    """GET *url*, raise on an HTTP error status, and return the decoded JSON body."""
    response = requests.get(url, headers=headers, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json()


def fetch_from_op_connect(
    op_ref: str,
    connect_host: str,
    connect_token: str,
    timeout: float = _DEFAULT_OP_CONNECT_TIMEOUT,
) -> str:
    """Fetch a secret from 1Password Connect using an op:// reference.

    Args:
        op_ref: Reference of the form ``op://vault/item/field``.
        connect_host: Base URL of the Connect server; a trailing slash is
            tolerated.
        connect_token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds allowed for each of the three HTTP requests.

    Returns:
        The value of the first field whose label or id equals the requested
        field name (case-insensitive). A field without a value yields ``""``.

    Raises:
        ValueError: If the reference is malformed, or the vault, item or
            field cannot be found.
        requests.RequestException: On transport failures, timeouts or
            non-2xx responses.
    """
    match = _OP_REF_PATTERN.match(op_ref)
    if not match:
        raise ValueError(f"Invalid 1Password reference: {op_ref}")
    vault_name, item_name, field_name = match.groups()

    # Avoid "//v1/..." when the host is configured with a trailing slash.
    base_url = connect_host.rstrip("/")
    headers = {"Authorization": f"Bearer {connect_token}"}

    # Resolve the vault name to its id.
    vaults = _op_connect_get(f"{base_url}/v1/vaults", headers, timeout)
    vault_id = next(
        (vault.get("id") for vault in vaults if vault.get("name") == vault_name),
        None,
    )
    if not vault_id:
        raise ValueError(f"Vault not found: {vault_name}")

    # Look the item up by title; the first hit is used.
    items = _op_connect_get(
        f"{base_url}/v1/vaults/{vault_id}/items",
        headers,
        timeout,
        params={"filter": f'title eq "{item_name}"'},
    )
    if not items:
        raise ValueError(f"Item not found: {item_name}")

    # The listing above is only used for the item id; the fields are read
    # from the item detail response.
    item = _op_connect_get(
        f"{base_url}/v1/vaults/{vault_id}/items/{items[0]['id']}",
        headers,
        timeout,
    )

    wanted = field_name.lower()
    for entry in item.get("fields") or []:
        # "label"/"id"/"value" may be missing or JSON null; treat both as "".
        label = (entry.get("label") or "").lower()
        entry_id = (entry.get("id") or "").lower()
        if wanted in (label, entry_id):
            return entry.get("value") or ""

    raise ValueError(f"Field not found: {field_name}")


def resolve_secret(env_var: str, default: str = "") -> str:
    """Resolve a secret from an env var, following op:// references.

    Args:
        env_var: Name of the environment variable to read.
        default: Value used when the variable is unset.

    Returns:
        The variable's value, or the secret fetched from 1Password Connect
        when that value starts with ``op://``.

    Raises:
        ValueError: If the value is an op:// reference but OP_CONNECT_HOST
            or OP_CONNECT_TOKEN is not set.
    """
    value = os.environ.get(env_var, default)
    if not value.startswith("op://"):
        return value

    connect_host = os.environ.get("OP_CONNECT_HOST")
    connect_token = os.environ.get("OP_CONNECT_TOKEN")
    if not connect_host or not connect_token:
        raise ValueError(
            f"{env_var} uses op:// reference but OP_CONNECT_HOST/OP_CONNECT_TOKEN not set"
        )
    return fetch_from_op_connect(value, connect_host, connect_token)


def _decode_ssh_key(raw: str) -> str:
    """Return the SSH key text from *raw*, which is base64 or plain text.

    Whitespace (e.g. line wrapping) is removed before a strict base64
    decode. Anything that is not valid base64 of UTF-8 text is returned
    unchanged, so a plain-text key can be supplied during development.
    """
    if not raw:
        return ""
    compact = "".join(raw.split())
    try:
        # validate=True rejects non-alphabet characters instead of silently
        # dropping them, so a PEM key is never half-decoded into garbage.
        return base64.b64decode(compact, validate=True).decode("utf-8")
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        return raw


@dataclass
class Config:
    """Application configuration loaded from environment variables."""

    # Secret fields are excluded from the generated repr so that printing or
    # logging a Config does not leak credentials.
    ssh_private_key: str = field(repr=False)
    uptime_kuma_url: str
    uptime_kuma_api_key: str = field(repr=False)
    claude_api_key: str = field(repr=False)
    dev_mode: bool = False

    @classmethod
    def from_env(cls) -> "Config":
        """Load configuration from environment variables.

        Reads SSH_PRIVATE_KEY, UPTIME_KUMA_URL, UPTIME_KUMA_API_KEY,
        CLAUDE_API_KEY and DEV_MODE. The three secrets go through
        ``resolve_secret`` and may therefore be op:// references.
        """
        ssh_private_key = _decode_ssh_key(resolve_secret("SSH_PRIVATE_KEY", ""))
        dev_mode = os.environ.get("DEV_MODE", "false").strip().lower() == "true"

        return cls(
            ssh_private_key=ssh_private_key,
            uptime_kuma_url=os.environ.get("UPTIME_KUMA_URL", "http://localhost:3001"),
            uptime_kuma_api_key=resolve_secret("UPTIME_KUMA_API_KEY", ""),
            claude_api_key=resolve_secret("CLAUDE_API_KEY", ""),
            dev_mode=dev_mode,
        )

    def validate(self) -> list[str]:
        """Validate configuration and return a list of error messages.

        An empty list means the configuration is usable.
        """
        errors = []

        if not self.ssh_private_key:
            errors.append("SSH_PRIVATE_KEY is required")

        if not self.uptime_kuma_url:
            errors.append("UPTIME_KUMA_URL is required")

        # UPTIME_KUMA_API_KEY is optional - can login via web UI

        if not self.claude_api_key:
            errors.append("CLAUDE_API_KEY is required")

        return errors


# Global config instance, created on first use by get_config().
_config: Optional[Config] = None


def get_config() -> Config:
    """Get the global configuration instance, loading it on first call."""
    global _config
    if _config is None:
        _config = Config.from_env()
    return _config


def set_dev_mode(enabled: bool) -> None:
    """Update the dev mode setting on the global configuration."""
    config = get_config()
    config.dev_mode = enabled