Files
kuma-strapper/backend/services/kuma_client.py
Debian 1fd29e449f
All checks were successful
Build and Push Container / build (push) Successful in 1m4s
Add local SQLite database and sync with Uptime Kuma
Features:
- SQLite database to track monitors and hosts locally
- Uses Uptime Kuma tags to mark monitors as managed by Kuma Strapper
- Sync on startup, before each scan, and on-demand via API
- Shows existing monitors when re-scanning a host

New files:
- backend/services/database.py - SQLite database service
- backend/services/sync.py - Sync service for Uptime Kuma reconciliation

API endpoints:
- POST /api/sync - Full sync with Uptime Kuma
- POST /api/sync/host/<hostname> - Sync specific host
- GET /api/hosts - List tracked hosts
- GET /api/hosts/<hostname>/monitors - Get monitors for host
- GET /api/monitors/tracked - Get all tracked monitors

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 09:10:26 +00:00

367 lines
12 KiB
Python

import os
import json
from typing import Optional
from dataclasses import dataclass
from uptime_kuma_api import UptimeKumaApi, MonitorType
from config import get_config
# Translation table: the short type names used by Kuma Strapper (the values
# accepted by ``Monitor.type``) -> the MonitorType enum members that the
# uptime_kuma_api library expects.
TYPE_MAP = {
    "http": MonitorType.HTTP,
    "tcp": MonitorType.PORT,  # our "tcp" is Uptime Kuma's PORT monitor
    "ping": MonitorType.PING,
    "docker": MonitorType.DOCKER,
    "keyword": MonitorType.KEYWORD,
    "push": MonitorType.PUSH,
}
# Where the Uptime Kuma auth token is persisted between runs.
# Override with the KUMA_TOKEN_FILE environment variable.
TOKEN_FILE = os.getenv("KUMA_TOKEN_FILE", "/tmp/kuma_token.json")
@dataclass
class Monitor:
    """Uptime Kuma monitor configuration.

    Only ``type`` and ``name`` are required.  The remaining fields are
    optional; the trailing comment on each one says which monitor type it
    is meant for.
    """

    type: str  # http, tcp, ping, docker, keyword, push
    name: str
    url: Optional[str] = None  # For HTTP monitors
    hostname: Optional[str] = None  # For TCP/Ping monitors
    port: Optional[int] = None  # For TCP monitors
    interval: int = 60
    keyword: Optional[str] = None  # For keyword monitors
    docker_container: Optional[str] = None  # For Docker monitors
    docker_host: Optional[str] = None  # For Docker monitors
    push_token: Optional[str] = None  # For Push monitors
    retries: int = 3
    retry_interval: int = 60
    max_redirects: int = 10
    # None means "use the default"; __post_init__ replaces it with a fresh
    # ["200-299"] list so instances never share one mutable default.
    accepted_statuscodes: Optional[list[str]] = None

    def __post_init__(self):
        """Fill in the default accepted status codes when none were given."""
        if self.accepted_statuscodes is None:
            self.accepted_statuscodes = ["200-299"]
def load_saved_token() -> Optional[str]:
    """Load saved token from file.

    Returns:
        The value stored under ``"token"`` in the JSON document at
        ``TOKEN_FILE``, or ``None`` when the file is missing, unreadable,
        not valid JSON, not a JSON object, or has no ``"token"`` key.
    """
    # Open directly instead of checking os.path.exists() first: the file can
    # disappear between the check and the open.
    try:
        with open(TOKEN_FILE, "r") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file.  ValueError: malformed JSON or
        # undecodable bytes.  Either way there is no usable saved token.
        return None
    if not isinstance(data, dict):
        return None
    return data.get("token")
def save_token(token: str) -> None:
    """Save token to file.

    Writes ``{"token": token}`` as JSON to ``TOKEN_FILE``, replacing any
    previous content.  Saving is best-effort: any failure is reported with a
    printed warning and never raised to the caller.

    Args:
        token: The authentication token to persist.
    """
    try:
        # The token is a credential, so create the file readable and writable
        # by the owner only.  The mode applies only when the file is created;
        # an already existing file keeps its current permissions.
        fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            json.dump({"token": token}, f)
    except Exception as e:
        print(f"Warning: Could not save token: {e}")
def clear_saved_token() -> None:
    """Clear saved token.

    Deletes ``TOKEN_FILE``.  A file that does not exist is not an error.
    Any other failure to delete it is reported with a printed warning and
    never raised, because a credential left on disk should not go unnoticed.
    """
    # Remove directly instead of checking os.path.exists() first: the file
    # can disappear between the check and the removal.
    try:
        os.remove(TOKEN_FILE)
    except FileNotFoundError:
        pass  # Nothing saved, nothing to clear.
    except OSError as e:
        print(f"Warning: Could not clear saved token: {e}")
class UptimeKumaClient:
    """Client for Uptime Kuma using Socket.io API.

    A single ``UptimeKumaApi`` connection is opened lazily by the first call
    that needs it and reused afterwards.  Whenever an operation fails, the
    connection is dropped so that the next call reconnects and logs in again.

    The token used to authenticate is, in order of preference: the token
    obtained by :meth:`login` on this instance, the configured API key, or
    the token saved on disk.

    Annotations that name ``UptimeKumaApi`` or ``Monitor`` are written as
    strings so they are not evaluated when the class is defined.
    """

    def __init__(self):
        """Read the Uptime Kuma URL and API key from the application config."""
        config = get_config()
        self.base_url = config.uptime_kuma_url.rstrip("/")
        self.api_key = config.uptime_kuma_api_key  # Can be empty if using saved token
        self._api = None
        self._token = None

    # Connection and authentication helpers

    def _get_token(self) -> Optional[str]:
        """Get token from this instance, the config, or the saved file."""
        if self._token:
            return self._token
        if self.api_key:
            return self.api_key
        return load_saved_token()

    @staticmethod
    def _close(api) -> None:
        """Disconnect ``api``, ignoring any error raised while doing so."""
        try:
            api.disconnect()
        except Exception:
            pass

    def _get_api(self) -> "UptimeKumaApi":
        """Get connected API instance.

        Returns:
            The cached connection, created and logged in on first use.

        Raises:
            Exception: If no token is available, or if logging in with the
                token fails.  In the latter case the saved token file is
                cleared as well.
        """
        if self._api is None:
            token = self._get_token()
            if not token:
                raise Exception("Not authenticated. Please login to Uptime Kuma first.")
            api = UptimeKumaApi(self.base_url)
            try:
                api.login_by_token(token)
            except Exception as e:
                # Close the half-initialised connection instead of leaking it.
                self._close(api)
                clear_saved_token()
                raise Exception(f"Authentication failed. Please login again: {e}") from e
            # Cache the connection only once it is authenticated.
            self._api = api
        return self._api

    def _disconnect(self):
        """Disconnect API and forget the cached connection."""
        if self._api:
            self._close(self._api)
            self._api = None

    def _call(self, action: str, operation):
        """Run ``operation(api)`` on the shared connection.

        Args:
            action: Short description used in the error message, e.g.
                ``"get monitors"``.
            operation: Callable that receives the connected API object.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            Exception: ``"Failed to <action>: <error>"`` when connecting or
                the operation fails.  The connection is dropped first.
        """
        try:
            return operation(self._get_api())
        except Exception as e:
            self._disconnect()
            raise Exception(f"Failed to {action}: {e}") from e

    # Authentication

    def login(self, username: str, password: str, totp: Optional[str] = None) -> dict:
        """Login with username/password and optional TOTP.

        On success the received token is saved to disk and kept on this
        instance.  The temporary connection used for the login is always
        closed, whether the login succeeds or fails.

        Returns:
            ``{"success": True, "message": "Login successful"}``.

        Raises:
            Exception: ``"2FA token required. Please enter your TOTP code."``
                when the server asks for a TOTP code, or ``"Login failed:
                ..."`` for any other failure.
        """
        needs_totp = "2FA token required. Please enter your TOTP code."
        self._disconnect()
        try:
            api = UptimeKumaApi(self.base_url)
            try:
                if totp:
                    result = api.login(username, password, totp)
                else:
                    result = api.login(username, password)
            finally:
                self._close(api)
            token_required = bool(result.get("tokenRequired"))
            token = result.get("token")
        except Exception as e:
            error_msg = str(e)
            # Re-raise with clear message for 2FA requirement
            if "tokenRequired" in error_msg or "2fa" in error_msg.lower():
                raise Exception(needs_totp) from e
            raise Exception(f"Login failed: {error_msg}") from e

        # These errors are raised outside the try block above so they are not
        # wrapped a second time ("Login failed: Login failed: ...").
        if token_required:
            raise Exception(needs_totp)
        if not token:
            raise Exception("Login failed: No authentication token received")
        save_token(token)
        self._token = token
        return {"success": True, "message": "Login successful"}

    def is_authenticated(self) -> bool:
        """Check if we have valid authentication.

        Opens a temporary connection and logs in with the current token.
        Any failure, including a failure to connect, clears the saved token
        file and returns ``False``.
        """
        token = self._get_token()
        if not token:
            return False
        api = None
        try:
            api = UptimeKumaApi(self.base_url)
            api.login_by_token(token)
            return True
        except Exception:
            clear_saved_token()
            return False
        finally:
            # Close the probe connection on success and on failure alike.
            if api is not None:
                self._close(api)

    def logout(self) -> None:
        """Clear saved authentication."""
        self._disconnect()
        self._token = None
        clear_saved_token()

    # Monitor operations

    def get_monitors(self) -> list[dict]:
        """Get all monitors."""
        return self._call("get monitors", lambda api: api.get_monitors())

    def get_monitor(self, monitor_id: int) -> dict:
        """Get a specific monitor."""
        return self._call(
            f"get monitor {monitor_id}", lambda api: api.get_monitor(monitor_id)
        )

    @staticmethod
    def _build_monitor_kwargs(monitor: "Monitor") -> dict:
        """Translate a ``Monitor`` into keyword arguments for ``add_monitor``.

        Raises:
            ValueError: If ``monitor.type`` is not a key of ``TYPE_MAP``.
        """
        monitor_type = TYPE_MAP.get(monitor.type)
        if monitor_type is None:
            raise ValueError(f"Unknown monitor type: {monitor.type}")

        kwargs = {
            "type": monitor_type,
            "name": monitor.name,
            "interval": monitor.interval,
            "maxretries": monitor.retries,
            "retryInterval": monitor.retry_interval,
        }

        # Add type-specific fields
        if monitor.type == "http":
            kwargs["url"] = monitor.url
            kwargs["maxredirects"] = monitor.max_redirects
            kwargs["accepted_statuscodes"] = monitor.accepted_statuscodes
        elif monitor.type == "keyword":
            kwargs["url"] = monitor.url
            kwargs["keyword"] = monitor.keyword
        elif monitor.type == "tcp":
            kwargs["hostname"] = monitor.hostname
            kwargs["port"] = monitor.port
        elif monitor.type == "ping":
            kwargs["hostname"] = monitor.hostname
        elif monitor.type == "docker":
            kwargs["docker_container"] = monitor.docker_container
            if monitor.docker_host:
                kwargs["docker_host"] = monitor.docker_host
        # Push monitors don't need additional fields; the push token is
        # returned in the result.

        # Add conditions field for Uptime Kuma v2 compatibility
        kwargs["conditions"] = []
        return kwargs

    def create_monitor(self, monitor: "Monitor") -> dict:
        """Create a new monitor.

        Returns:
            The result of ``add_monitor``.  For push monitors the monitor's
            ``pushToken`` is added to it when it can be looked up.

        Raises:
            Exception: ``"Failed to create monitor: ..."`` on any failure,
                including an unknown ``monitor.type``.
        """

        def _create(api):
            result = api.add_monitor(**self._build_monitor_kwargs(monitor))
            # For push monitors, extract the push token from the result
            if monitor.type == "push" and "monitorID" in result:
                monitor_data = api.get_monitor(result["monitorID"])
                if "pushToken" in monitor_data:
                    result["pushToken"] = monitor_data["pushToken"]
            return result

        return self._call("create monitor", _create)

    def delete_monitor(self, monitor_id: int) -> dict:
        """Delete a monitor."""
        return self._call(
            f"delete monitor {monitor_id}", lambda api: api.delete_monitor(monitor_id)
        )

    def pause_monitor(self, monitor_id: int) -> dict:
        """Pause a monitor."""
        return self._call(
            f"pause monitor {monitor_id}", lambda api: api.pause_monitor(monitor_id)
        )

    def resume_monitor(self, monitor_id: int) -> dict:
        """Resume a paused monitor."""
        return self._call(
            f"resume monitor {monitor_id}", lambda api: api.resume_monitor(monitor_id)
        )

    def test_connection(self) -> bool:
        """Test connection to Uptime Kuma.

        Returns:
            ``True`` if the monitor list could be fetched, ``False`` on any
            failure (the connection is dropped in that case).
        """
        try:
            api = self._get_api()
            api.get_monitors()
            return True
        except Exception:
            self._disconnect()
            return False

    # Tag management methods

    def get_tags(self) -> list[dict]:
        """Get all tags."""
        return self._call("get tags", lambda api: api.get_tags())

    def add_tag(self, name: str, color: str) -> dict:
        """Create a new tag."""
        return self._call("add tag", lambda api: api.add_tag(name=name, color=color))

    def add_monitor_tag(self, tag_id: int, monitor_id: int, value: str = "") -> dict:
        """Add a tag to a monitor.

        Args:
            tag_id: The tag ID
            monitor_id: The monitor ID
            value: Optional value for the tag (e.g., hostname)
        """
        return self._call(
            "add tag to monitor",
            lambda api: api.add_monitor_tag(
                tag_id=tag_id, monitor_id=monitor_id, value=value
            ),
        )

    def delete_monitor_tag(self, tag_id: int, monitor_id: int, value: str = "") -> dict:
        """Remove a tag from a monitor."""
        return self._call(
            "remove tag from monitor",
            lambda api: api.delete_monitor_tag(
                tag_id=tag_id, monitor_id=monitor_id, value=value
            ),
        )

    # Push monitor helpers

    def get_push_url(self, push_token: str) -> str:
        """Build the full push URL for a push monitor.

        Args:
            push_token: The push token from the monitor

        Returns:
            Full push URL like 'https://kuma.example.com/api/push/abc123'
        """
        return f"{self.base_url}/api/push/{push_token}"

    def get_monitor_push_token(self, monitor_id: int) -> Optional[str]:
        """Get the push token for an existing push monitor.

        Args:
            monitor_id: The Uptime Kuma monitor ID

        Returns:
            The push token, or None if not a push monitor or not found
        """
        try:
            monitor_data = self.get_monitor(monitor_id)
            return monitor_data.get("pushToken")
        except Exception:
            return None
# Module-wide client shared by all callers; stays None until first requested.
_kuma_client: Optional[UptimeKumaClient] = None


def get_kuma_client() -> UptimeKumaClient:
    """Return the shared Uptime Kuma client, constructing it on first use."""
    global _kuma_client
    client = _kuma_client
    if client is None:
        # First request: build the client once and remember it.
        client = _kuma_client = UptimeKumaClient()
    return client