"""Sync service for reconciling local database with Uptime Kuma."""

import logging
from datetime import datetime
from typing import Optional

from services.database import get_database, TrackedMonitor, Host
from services.kuma_client import get_kuma_client

logger = logging.getLogger(__name__)

TAG_NAME = "kuma-strapper"
TAG_COLOR = "#5865F2"  # Discord blurple

# Hostname recorded for managed monitors whose tag carries no usable value.
_UNKNOWN_HOSTNAME = "unknown"

# Interval (seconds) assumed when a Kuma monitor dict has no "interval" key.
_DEFAULT_INTERVAL_SECONDS = 60

# Local status given to tracked monitors that were not seen in Uptime Kuma.
_STATUS_DELETED_IN_KUMA = "deleted_in_kuma"

# Normalized Kuma type names that are stored locally under a different name.
_KUMA_TYPE_ALIASES = {"port": "tcp"}


class SyncService:
    """Service for syncing local database with Uptime Kuma."""

    def __init__(self):
        self.db = get_database()
        self.kuma = get_kuma_client()
        # Cached ID of the kuma-strapper tag; resolved on first use.
        self._tag_id: Optional[int] = None

    def ensure_tag_exists(self) -> int:
        """Get or create the kuma-strapper tag in Uptime Kuma.

        The ID is cached on the instance, so Uptime Kuma is only queried
        until a tag ID has been resolved.

        Returns:
            The tag ID

        Raises:
            Exception: Whatever the Kuma client raised; it is logged and
                re-raised unchanged.
        """
        if self._tag_id is not None:
            return self._tag_id

        try:
            for tag in self.kuma.get_tags():
                if tag.get("name") == TAG_NAME:
                    self._tag_id = tag["id"]
                    logger.info(f"Found existing tag '{TAG_NAME}' with id {self._tag_id}")
                    return self._tag_id

            # Create tag if it doesn't exist
            result = self.kuma.add_tag(name=TAG_NAME, color=TAG_COLOR)
            self._tag_id = result.get("id")
            logger.info(f"Created tag '{TAG_NAME}' with id {self._tag_id}")
            return self._tag_id
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Failed to ensure tag exists: {e}")
            raise

    def get_tag_id(self) -> int:
        """Get the kuma-strapper tag ID, creating it if needed."""
        # ensure_tag_exists() already short-circuits on the cached ID.
        return self.ensure_tag_exists()

    def add_tag_to_monitor(self, monitor_id: int, hostname: str):
        """Add the kuma-strapper tag to a monitor.

        Args:
            monitor_id: The Uptime Kuma monitor ID
            hostname: The source hostname (stored as tag value)
        """
        tag_id = self.get_tag_id()
        self.kuma.add_monitor_tag(tag_id=tag_id, monitor_id=monitor_id, value=hostname)
        logger.info(f"Added tag to monitor {monitor_id} with hostname '{hostname}'")

    def full_sync(self) -> dict:
        """Sync all monitors with kuma-strapper tag from Uptime Kuma.

        Tagged monitors are imported or updated locally; tracked monitors
        that were not seen in Uptime Kuma are marked "deleted_in_kuma".
        Failures are caught and reported in the result rather than raised;
        the counts then reflect the work completed before the failure.

        Returns:
            Summary dict with added, updated, removed counts and any errors
        """
        logger.info("Starting full sync with Uptime Kuma")

        counts = {"added": 0, "updated": 0, "removed": 0}
        errors = []

        try:
            self.ensure_tag_exists()
            monitors = self.kuma.get_monitors()

            # Track which Kuma IDs we've seen
            seen_kuma_ids = set()

            for monitor in monitors:
                tag_info = self._get_kuma_strapper_tag(monitor)
                if not tag_info:
                    continue  # Not managed by kuma-strapper

                seen_kuma_ids.add(monitor["id"])
                hostname = self._tag_hostname(tag_info)

                # Get or create host
                host = self.db.get_or_create_host(hostname)
                self._reconcile_monitor(monitor, host, hostname, counts)

            # Mark monitors deleted in Kuma
            self._mark_missing_as_deleted(
                self.db.get_all_monitors(), seen_kuma_ids, counts
            )
        except Exception as e:
            logger.exception(f"Full sync failed: {e}")
            errors.append(str(e))

        result = {**counts, "errors": errors}
        logger.info(f"Full sync complete: {result}")
        return result

    def sync_host(self, hostname: str) -> dict:
        """Sync monitors for a specific host.

        Only monitors whose kuma-strapper tag value resolves to *hostname*
        are considered. Failures are caught and reported in the result
        rather than raised.

        Args:
            hostname: The hostname to sync

        Returns:
            Summary dict with hostname, added, updated, removed counts and
            any errors
        """
        logger.info(f"Starting host sync for '{hostname}'")

        counts = {"added": 0, "updated": 0, "removed": 0}
        errors = []

        try:
            self.ensure_tag_exists()
            monitors = self.kuma.get_monitors()
            host = self.db.get_or_create_host(hostname)

            seen_kuma_ids = set()

            for monitor in monitors:
                tag_info = self._get_kuma_strapper_tag(monitor)
                if not tag_info:
                    continue

                # Same normalization as full_sync, so monitors it filed
                # under the fallback hostname are matched here too instead
                # of being flagged as deleted.
                if self._tag_hostname(tag_info) != hostname:
                    continue

                seen_kuma_ids.add(monitor["id"])
                self._reconcile_monitor(monitor, host, hostname, counts)

            # Mark host's monitors as deleted if not in Kuma
            self._mark_missing_as_deleted(
                self.db.get_monitors_for_host(host.id), seen_kuma_ids, counts
            )

            self.db.update_host_sync_time(host.id)
        except Exception as e:
            logger.exception(f"Host sync failed for '{hostname}': {e}")
            errors.append(str(e))

        result = {"hostname": hostname, **counts, "errors": errors}
        logger.info(f"Host sync complete for '{hostname}': {result}")
        return result

    def _tag_hostname(self, tag_info: dict) -> str:
        """Return the hostname stored in a tag, or "unknown" if missing/empty."""
        return tag_info.get("value") or _UNKNOWN_HOSTNAME

    def _reconcile_monitor(self, kuma_monitor: dict, host: Host, hostname: str, counts: dict):
        """Import or update one managed Kuma monitor and bump *counts*.

        Args:
            kuma_monitor: Monitor dict from Uptime Kuma
            host: Local Host object the monitor belongs to
            hostname: The hostname string
            counts: Running totals; "added" or "updated" is incremented
        """
        existing = self.db.get_monitor_by_kuma_id(kuma_monitor["id"])
        if existing:
            # Update if changed
            if self._monitor_changed(existing, kuma_monitor):
                self._update_tracked_monitor(existing, kuma_monitor)
                counts["updated"] += 1
            self.db.mark_monitor_synced(existing.id)
        else:
            # New monitor - import it
            self._import_monitor(kuma_monitor, host, hostname)
            counts["added"] += 1

    def _mark_missing_as_deleted(self, local_monitors, seen_kuma_ids: set, counts: dict):
        """Flag tracked monitors that were not seen in Uptime Kuma.

        Args:
            local_monitors: Iterable of local TrackedMonitor objects
            seen_kuma_ids: Kuma monitor IDs encountered during this sync
            counts: Running totals; "removed" is incremented per newly
                flagged monitor
        """
        for local in local_monitors:
            if local.kuma_monitor_id in seen_kuma_ids:
                continue
            if local.status == _STATUS_DELETED_IN_KUMA:
                continue  # Already flagged; don't count it again
            self.db.update_monitor_status(local.id, _STATUS_DELETED_IN_KUMA)
            counts["removed"] += 1
            logger.info(
                f"Marked monitor '{local.name}' (kuma_id={local.kuma_monitor_id}) "
                "as deleted_in_kuma"
            )

    def _get_kuma_strapper_tag(self, monitor: dict) -> Optional[dict]:
        """Extract kuma-strapper tag info from monitor if present.

        Args:
            monitor: Monitor dict from Uptime Kuma

        Returns:
            The first tag dict named "kuma-strapper", or None if not found
        """
        # "or []" also covers a "tags" key that is present but None.
        for tag in monitor.get("tags") or []:
            if tag.get("name") == TAG_NAME:
                return tag
        return None

    def _import_monitor(self, kuma_monitor: dict, host: Host, hostname: str):
        """Import a monitor from Uptime Kuma into local DB.

        Args:
            kuma_monitor: Monitor dict from Uptime Kuma
            host: Local Host object
            hostname: The hostname string
        """
        monitor = TrackedMonitor(
            id=None,
            kuma_monitor_id=kuma_monitor["id"],
            host_id=host.id,
            name=kuma_monitor.get("name", "Unknown"),
            type=self._kuma_type_to_string(kuma_monitor.get("type")),
            target=self._extract_target(kuma_monitor),
            port=kuma_monitor.get("port"),
            interval_seconds=kuma_monitor.get("interval", _DEFAULT_INTERVAL_SECONDS),
            push_metric=None,  # Can't determine from Kuma API
            status=self._status_from_kuma(kuma_monitor),
            # NOTE(review): naive UTC timestamp; datetime.utcnow() is
            # deprecated since Python 3.12 - confirm the DB layer accepts
            # timezone-aware values before switching.
            synced_at=datetime.utcnow(),
        )
        self.db.add_monitor(monitor)
        logger.info(f"Imported monitor '{monitor.name}' (kuma_id={monitor.kuma_monitor_id})")

    def _status_from_kuma(self, kuma: dict) -> str:
        """Return the local status ("active"/"paused") for a Kuma monitor dict."""
        return "active" if kuma.get("active", True) else "paused"

    def _monitor_changed(self, local: TrackedMonitor, kuma: dict) -> bool:
        """Check if a monitor has changed in Uptime Kuma.

        Compares name, interval and status. The status check compares the
        stored status string with the one an update would write, so a
        monitor flagged "deleted_in_kuma" that shows up again is reported
        as changed even when it is paused in Kuma.

        Args:
            local: Local TrackedMonitor
            kuma: Monitor dict from Uptime Kuma

        Returns:
            True if the monitor has changed
        """
        # NOTE(review): target and port are rewritten on update but are not
        # compared here - confirm whether a target-only change in Kuma
        # should also trigger an update.
        if local.name != kuma.get("name"):
            return True
        if local.interval_seconds != kuma.get("interval", _DEFAULT_INTERVAL_SECONDS):
            return True
        return local.status != self._status_from_kuma(kuma)

    def _update_tracked_monitor(self, local: TrackedMonitor, kuma: dict):
        """Update a tracked monitor with data from Uptime Kuma.

        Args:
            local: Local TrackedMonitor to update
            kuma: Monitor dict from Uptime Kuma
        """
        local.name = kuma.get("name", local.name)
        local.interval_seconds = kuma.get("interval", _DEFAULT_INTERVAL_SECONDS)
        local.status = self._status_from_kuma(kuma)
        local.target = self._extract_target(kuma)
        local.port = kuma.get("port")
        self.db.update_monitor(local)
        logger.info(f"Updated monitor '{local.name}' (kuma_id={local.kuma_monitor_id})")

    def _kuma_type_to_string(self, kuma_type) -> str:
        """Convert Uptime Kuma monitor type to string.

        The uptime-kuma-api returns types as MonitorType enum values. The
        value is stringified, lower-cased and stripped of the
        "monitortype." prefix; "port" is then stored as "tcp". Plain
        strings such as "port" go through the same normalization.

        Args:
            kuma_type: MonitorType enum member, string, or None

        Returns:
            The local type name, or "unknown" when kuma_type is None
        """
        if kuma_type is None:
            return "unknown"

        normalized = str(kuma_type).lower().replace("monitortype.", "")
        return _KUMA_TYPE_ALIASES.get(normalized, normalized)

    def _extract_target(self, monitor: dict) -> Optional[str]:
        """Extract the target from a monitor based on its type.

        Args:
            monitor: Monitor dict from Uptime Kuma

        Returns:
            The first non-empty value among "url", "hostname" and
            "docker_container", or None if all are missing/empty
        """
        for key in ("url", "hostname", "docker_container"):
            value = monitor.get(key)
            if value:
                return value
        return None


# Global sync service instance
_sync_service: Optional[SyncService] = None


def get_sync_service() -> SyncService:
    """Get the global sync service instance, creating it on first use."""
    global _sync_service
    if _sync_service is None:
        _sync_service = SyncService()
    return _sync_service