from typing import Optional from dataclasses import dataclass from uptime_kuma_api import UptimeKumaApi, MonitorType from config import get_config # Map our monitor types to Uptime Kuma types TYPE_MAP = { "http": MonitorType.HTTP, "tcp": MonitorType.PORT, "ping": MonitorType.PING, "docker": MonitorType.DOCKER, "keyword": MonitorType.KEYWORD, "push": MonitorType.PUSH, } @dataclass class Monitor: """Uptime Kuma monitor configuration.""" type: str # http, tcp, ping, docker, keyword, push name: str url: Optional[str] = None # For HTTP monitors hostname: Optional[str] = None # For TCP/Ping monitors port: Optional[int] = None # For TCP monitors interval: int = 60 keyword: Optional[str] = None # For keyword monitors docker_container: Optional[str] = None # For Docker monitors docker_host: Optional[str] = None # For Docker monitors push_token: Optional[str] = None # For Push monitors retries: int = 3 retry_interval: int = 60 max_redirects: int = 10 accepted_statuscodes: list[str] = None def __post_init__(self): if self.accepted_statuscodes is None: self.accepted_statuscodes = ["200-299"] class UptimeKumaClient: """Client for Uptime Kuma using Socket.io API.""" def __init__(self): config = get_config() self.base_url = config.uptime_kuma_url.rstrip("/") self.api_key = config.uptime_kuma_api_key self._api = None def _get_api(self) -> UptimeKumaApi: """Get connected API instance.""" if self._api is None: self._api = UptimeKumaApi(self.base_url) # Login with API key as token self._api.login_by_token(self.api_key) return self._api def _disconnect(self): """Disconnect API.""" if self._api: try: self._api.disconnect() except Exception: pass self._api = None def get_monitors(self) -> list[dict]: """Get all monitors.""" try: api = self._get_api() return api.get_monitors() except Exception as e: self._disconnect() raise Exception(f"Failed to get monitors: {str(e)}") def get_monitor(self, monitor_id: int) -> dict: """Get a specific monitor.""" try: api = self._get_api() return api.get_monitor(monitor_id) except Exception as e: self._disconnect() raise Exception(f"Failed to get monitor {monitor_id}: {str(e)}") def create_monitor(self, monitor: Monitor) -> dict: """Create a new monitor.""" try: api = self._get_api() monitor_type = TYPE_MAP.get(monitor.type) if not monitor_type: raise ValueError(f"Unknown monitor type: {monitor.type}") kwargs = { "type": monitor_type, "name": monitor.name, "interval": monitor.interval, "maxretries": monitor.retries, "retryInterval": monitor.retry_interval, } # Add type-specific fields if monitor.type == "http": kwargs["url"] = monitor.url kwargs["maxredirects"] = monitor.max_redirects kwargs["accepted_statuscodes"] = monitor.accepted_statuscodes elif monitor.type == "keyword": kwargs["url"] = monitor.url kwargs["keyword"] = monitor.keyword elif monitor.type == "tcp": kwargs["hostname"] = monitor.hostname kwargs["port"] = monitor.port elif monitor.type == "ping": kwargs["hostname"] = monitor.hostname elif monitor.type == "docker": kwargs["docker_container"] = monitor.docker_container if monitor.docker_host: kwargs["docker_host"] = monitor.docker_host elif monitor.type == "push": # Push monitors are created and get a token back pass result = api.add_monitor(**kwargs) return result except Exception as e: self._disconnect() raise Exception(f"Failed to create monitor: {str(e)}") def delete_monitor(self, monitor_id: int) -> dict: """Delete a monitor.""" try: api = self._get_api() result = api.delete_monitor(monitor_id) return result except Exception as e: self._disconnect() raise Exception(f"Failed to delete monitor {monitor_id}: {str(e)}") def pause_monitor(self, monitor_id: int) -> dict: """Pause a monitor.""" try: api = self._get_api() result = api.pause_monitor(monitor_id) return result except Exception as e: self._disconnect() raise Exception(f"Failed to pause monitor {monitor_id}: {str(e)}") def resume_monitor(self, monitor_id: int) -> dict: """Resume a paused monitor.""" try: api = self._get_api() result = api.resume_monitor(monitor_id) return result except Exception as e: self._disconnect() raise Exception(f"Failed to resume monitor {monitor_id}: {str(e)}") def test_connection(self) -> bool: """Test connection to Uptime Kuma.""" try: api = self._get_api() api.get_monitors() return True except Exception: self._disconnect() return False # Global client instance _kuma_client: Optional[UptimeKumaClient] = None def get_kuma_client() -> UptimeKumaClient: """Get the global Uptime Kuma client instance.""" global _kuma_client if _kuma_client is None: _kuma_client = UptimeKumaClient() return _kuma_client