Files
kuma-strapper/backend/config.py
Debian 1fd29e449f
All checks were successful
Build and Push Container / build (push) Successful in 1m4s
Add local SQLite database and sync with Uptime Kuma
Features:
- SQLite database to track monitors and hosts locally
- Uses Uptime Kuma tags to mark monitors as managed by Kuma Strapper
- Sync on startup, before each scan, and on-demand via API
- Shows existing monitors when re-scanning a host

New files:
- backend/services/database.py - SQLite database service
- backend/services/sync.py - Sync service for Uptime Kuma reconciliation

API endpoints:
- POST /api/sync - Full sync with Uptime Kuma
- POST /api/sync/host/<hostname> - Sync specific host
- GET /api/hosts - List tracked hosts
- GET /api/hosts/<hostname>/monitors - Get monitors for host
- GET /api/monitors/tracked - Get all tracked monitors

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 09:10:26 +00:00

127 lines
4.2 KiB
Python

import os
import base64
import re
import requests
from dataclasses import dataclass
from typing import Optional
def fetch_from_op_connect(
    op_ref: str,
    connect_host: str,
    connect_token: str,
    timeout: float = 10.0,
) -> str:
    """Fetch a secret from 1Password Connect using an op:// reference.

    The reference must look like ``op://<vault>/<item>/<field>``. The vault is
    resolved by name, the item by title, and the field by its label or id
    (compared case-insensitively).

    Args:
        op_ref: Secret reference, e.g. ``op://Infra/kuma/api-key``.
        connect_host: Base URL of the 1Password Connect server.
        connect_token: Bearer token used to authenticate against the server.
        timeout: Seconds to wait on each HTTP request before giving up.
            Without it a stalled server would block the caller forever.

    Returns:
        The value of the matching field, or ``""`` if the field has no value.

    Raises:
        ValueError: If the reference is malformed, or the vault, item, or
            field cannot be found.
        requests.RequestException: If a request fails, times out, or the
            server answers with an error status.
    """
    match = re.match(r"op://([^/]+)/([^/]+)/(.+)", op_ref)
    if not match:
        raise ValueError(f"Invalid 1Password reference: {op_ref}")
    vault_name, item_name, field_name = match.groups()

    # Tolerate a trailing slash on the configured host so URLs never get "//".
    base_url = connect_host.rstrip("/")
    headers = {"Authorization": f"Bearer {connect_token}"}

    # Get vault ID
    vaults_resp = requests.get(
        f"{base_url}/v1/vaults", headers=headers, timeout=timeout
    )
    vaults_resp.raise_for_status()
    vault_id = next(
        (v["id"] for v in vaults_resp.json() if v["name"] == vault_name), None
    )
    if not vault_id:
        raise ValueError(f"Vault not found: {vault_name}")

    # Get item (the list endpoint is filtered by title; the first hit is used)
    items_resp = requests.get(
        f"{base_url}/v1/vaults/{vault_id}/items",
        headers=headers,
        params={"filter": f'title eq "{item_name}"'},
        timeout=timeout,
    )
    items_resp.raise_for_status()
    items = items_resp.json()
    if not items:
        raise ValueError(f"Item not found: {item_name}")

    # Get item details with fields
    item_resp = requests.get(
        f"{base_url}/v1/vaults/{vault_id}/items/{items[0]['id']}",
        headers=headers,
        timeout=timeout,
    )
    item_resp.raise_for_status()
    item = item_resp.json()

    # Find field; `or ""` guards against JSON nulls for label/id/value.
    wanted = field_name.lower()
    for field in item.get("fields", []):
        label = (field.get("label") or "").lower()
        field_id = (field.get("id") or "").lower()
        if wanted in (label, field_id):
            return field.get("value") or ""
    raise ValueError(f"Field not found: {field_name}")
def resolve_secret(env_var: str, default: str = "") -> str:
    """Return the secret held in ``env_var``, dereferencing op:// values.

    The environment variable (or ``default`` when it is unset) is returned
    as-is unless it starts with ``op://``; in that case it is looked up via
    1Password Connect using ``OP_CONNECT_HOST`` and ``OP_CONNECT_TOKEN``.

    Raises:
        ValueError: If the value is an op:// reference but either Connect
            variable is missing or empty.
    """
    secret = os.environ.get(env_var, default)

    # Literal values need no further work.
    if not secret.startswith("op://"):
        return secret

    op_host = os.environ.get("OP_CONNECT_HOST")
    op_token = os.environ.get("OP_CONNECT_TOKEN")
    if not (op_host and op_token):
        raise ValueError(f"{env_var} uses op:// reference but OP_CONNECT_HOST/OP_CONNECT_TOKEN not set")

    return fetch_from_op_connect(secret, op_host, op_token)
@dataclass
class Config:
    """Application configuration loaded from environment variables."""

    # SSH private key text (already base64-decoded when loaded via from_env).
    ssh_private_key: str
    # Base URL of the Uptime Kuma instance.
    uptime_kuma_url: str
    # Uptime Kuma API key; may be empty (validate() does not require it).
    uptime_kuma_api_key: str
    # Claude API key.
    claude_api_key: str
    # True when DEV_MODE is "true" (case-insensitive).
    dev_mode: bool = False
    # Filesystem path of the local SQLite database.
    database_path: str = "/app/data/kuma_strapper.db"

    @staticmethod
    def _decode_ssh_key(raw: str) -> str:
        """Return the SSH key text for a base64-encoded or plain-text value.

        A value that already carries PEM armour (``-----BEGIN``) is returned
        untouched instead of relying on base64 decoding happening to fail.
        Anything else is base64-decoded as UTF-8; if that fails the raw value
        is returned so plain text keeps working for development.
        """
        if not raw:
            return ""
        if raw.lstrip().startswith("-----BEGIN"):
            return raw
        try:
            return base64.b64decode(raw).decode("utf-8")
        except ValueError:
            # binascii.Error and UnicodeDecodeError both derive from
            # ValueError; unrelated failures are no longer swallowed.
            return raw  # Allow plain text for development

    @classmethod
    def from_env(cls) -> "Config":
        """Load configuration from environment variables.

        Reads SSH_PRIVATE_KEY, UPTIME_KUMA_URL, UPTIME_KUMA_API_KEY,
        CLAUDE_API_KEY, DEV_MODE and DATABASE_PATH. The three secrets go
        through ``resolve_secret`` and may therefore be op:// references.

        Raises:
            ValueError: Propagated from ``resolve_secret`` when an op://
                reference cannot be resolved.
        """
        ssh_key_b64 = resolve_secret("SSH_PRIVATE_KEY", "")
        return cls(
            ssh_private_key=cls._decode_ssh_key(ssh_key_b64),
            uptime_kuma_url=os.environ.get("UPTIME_KUMA_URL", "http://localhost:3001"),
            uptime_kuma_api_key=resolve_secret("UPTIME_KUMA_API_KEY", ""),
            claude_api_key=resolve_secret("CLAUDE_API_KEY", ""),
            dev_mode=os.environ.get("DEV_MODE", "false").lower() == "true",
            database_path=os.environ.get("DATABASE_PATH", "/app/data/kuma_strapper.db"),
        )

    def validate(self) -> list[str]:
        """Validate configuration and return a list of error messages.

        Returns:
            One message per missing required setting, in the order
            SSH_PRIVATE_KEY, UPTIME_KUMA_URL, CLAUDE_API_KEY; an empty list
            when everything required is present.
        """
        errors = []
        if not self.ssh_private_key:
            errors.append("SSH_PRIVATE_KEY is required")
        if not self.uptime_kuma_url:
            errors.append("UPTIME_KUMA_URL is required")
        # UPTIME_KUMA_API_KEY is optional - can login via web UI
        if not self.claude_api_key:
            errors.append("CLAUDE_API_KEY is required")
        return errors
# Global config instance: module-level cache that stays None until
# get_config() populates it from the environment on first use.
_config: Optional[Config] = None
def get_config() -> Config:
    """Return the shared Config, building it from the environment on first use."""
    global _config
    # Fast path: reuse the instance created by an earlier call.
    if _config is not None:
        return _config
    _config = Config.from_env()
    return _config
def set_dev_mode(enabled: bool) -> None:
    """Set the ``dev_mode`` flag on the shared configuration instance."""
    get_config().dev_mode = enabled