from dataclasses import dataclass from typing import Optional import logging from services.kuma_client import get_kuma_client, Monitor from services.claude_agent import MonitorSuggestion from services.ssh_manager import get_ssh_manager from services import push_scripts logger = logging.getLogger(__name__) @dataclass class DefaultMonitorProfile: """A default monitoring profile that doesn't require approval.""" name: str description: str monitors: list[Monitor] def create_host_health_monitors(hostname: str, ssh_port: int = 22) -> list[Monitor]: """Create default host health monitors.""" return [ Monitor( type="ping", name=f"{hostname} - Ping", hostname=hostname, interval=60, ), Monitor( type="tcp", name=f"{hostname} - SSH", hostname=hostname, port=ssh_port, interval=120, ), ] def create_web_server_monitors(hostname: str, port: int = 80, https: bool = False) -> list[Monitor]: """Create monitors for a detected web server.""" protocol = "https" if https else "http" return [ Monitor( type="http", name=f"{hostname} - Web ({port})", url=f"{protocol}://{hostname}:{port}/", interval=60, ), ] def create_docker_container_monitors(hostname: str, containers: list[dict]) -> list[Monitor]: """Create monitors for detected Docker containers.""" monitors = [] for container in containers: name = container.get("name", container.get("id", "unknown")) monitors.append( Monitor( type="docker", name=f"{hostname} - Container: {name}", docker_container=name, docker_host=hostname, interval=60, ) ) return monitors class MonitorService: """Service for managing monitors in Uptime Kuma.""" def __init__(self): self.created_monitors: list[dict] = [] def create_default_monitors( self, hostname: str, ssh_port: int = 22, has_docker: bool = False, containers: Optional[list[dict]] = None, web_ports: Optional[list[int]] = None, ) -> list[dict]: """ Create default monitors for a host. These are built-in and never require approval. """ kuma = get_kuma_client() created = [] # Host health monitors health_monitors = create_host_health_monitors(hostname, ssh_port) for monitor in health_monitors: try: result = kuma.create_monitor(monitor) created.append({ "monitor": monitor.name, "type": monitor.type, "status": "created", "result": result, }) except Exception as e: created.append({ "monitor": monitor.name, "type": monitor.type, "status": "failed", "error": str(e), }) # Web server monitors if web_ports: for port in web_ports: https = port == 443 or port == 8443 web_monitors = create_web_server_monitors(hostname, port, https) for monitor in web_monitors: try: result = kuma.create_monitor(monitor) created.append({ "monitor": monitor.name, "type": monitor.type, "status": "created", "result": result, }) except Exception as e: created.append({ "monitor": monitor.name, "type": monitor.type, "status": "failed", "error": str(e), }) # Docker container monitors if has_docker and containers: docker_monitors = create_docker_container_monitors(hostname, containers) for monitor in docker_monitors: try: result = kuma.create_monitor(monitor) created.append({ "monitor": monitor.name, "type": monitor.type, "status": "created", "result": result, }) except Exception as e: created.append({ "monitor": monitor.name, "type": monitor.type, "status": "failed", "error": str(e), }) self.created_monitors.extend(created) return created def create_from_suggestion(self, suggestion: MonitorSuggestion, hostname: str) -> dict: """ Create a monitor from a Claude suggestion. In production mode, this executes automatically. In dev mode, this should only be called after approval. """ kuma = get_kuma_client() # Build monitor from suggestion monitor = Monitor( type=suggestion.type, name=suggestion.name, interval=suggestion.interval, ) # Set type-specific fields if suggestion.type == "http" or suggestion.type == "keyword": monitor.url = suggestion.target if suggestion.keyword: monitor.keyword = suggestion.keyword elif suggestion.type == "tcp": monitor.hostname = suggestion.target monitor.port = suggestion.port elif suggestion.type == "ping": monitor.hostname = suggestion.target elif suggestion.type == "docker": monitor.docker_container = suggestion.target monitor.docker_host = hostname elif suggestion.type == "push": # Push monitors need the push_metric field pass try: result = kuma.create_monitor(monitor) response = { "monitor": monitor.name, "type": monitor.type, "status": "created", "result": result, "reason": suggestion.reason, "push_metric": suggestion.push_metric, } # For push monitors, deploy the script to the remote host if suggestion.type == "push" and suggestion.push_metric: push_token = result.get("pushToken") monitor_id = result.get("monitorID") if push_token and monitor_id: deploy_result = self.deploy_push_script( hostname=hostname, push_metric=suggestion.push_metric, push_token=push_token, monitor_id=monitor_id, interval_minutes=max(1, suggestion.interval // 60), ) response["deployment"] = deploy_result else: response["deployment"] = { "status": "failed", "error": "No push token returned from Uptime Kuma", } return response except Exception as e: return { "monitor": monitor.name, "type": monitor.type, "status": "failed", "error": str(e), "reason": suggestion.reason, } def deploy_push_script( self, hostname: str, push_metric: str, push_token: str, monitor_id: int, interval_minutes: int = 5, username: str = "root", port: int = 22, ) -> dict: """ Deploy a push monitoring script to a remote host via SSH. Args: hostname: The remote host to deploy to push_metric: The metric type (heartbeat, disk, memory, cpu, updates) push_token: The Uptime Kuma push token monitor_id: The Uptime Kuma monitor ID interval_minutes: Cronjob interval in minutes username: SSH username port: SSH port Returns: Dict with status and any error messages """ kuma = get_kuma_client() ssh = get_ssh_manager() # Build the push URL and script push_url = kuma.get_push_url(push_token) script_content = push_scripts.generate_script(push_metric, push_url) if not script_content: return { "status": "failed", "error": f"Unknown push metric type: {push_metric}", } script_path = push_scripts.get_script_path(push_metric, monitor_id) script_filename = push_scripts.get_script_filename(push_metric, monitor_id) cronjob_entry = push_scripts.get_cronjob_entry(push_metric, monitor_id, interval_minutes) try: # Ensure SSH connection if not ssh.is_connected(hostname, username, port): connected = ssh.connect(hostname, username, port) if not connected: return { "status": "failed", "error": f"Could not connect to {hostname}", } # Write the script to the remote host using heredoc # Escape any single quotes in the script content escaped_content = script_content.replace("'", "'\"'\"'") write_cmd = f"cat > {script_path} << 'KUMA_SCRIPT_EOF'\n{script_content}KUMA_SCRIPT_EOF" result = ssh.execute(hostname, write_cmd, username, port) if not result.success: return { "status": "failed", "error": f"Failed to write script: {result.stderr}", } # Make the script executable chmod_result = ssh.execute(hostname, f"chmod +x {script_path}", username, port) if not chmod_result.success: return { "status": "failed", "error": f"Failed to make script executable: {chmod_result.stderr}", } # Add cronjob entry (remove existing entry first to avoid duplicates) cron_cmd = f"(crontab -l 2>/dev/null | grep -v '{script_filename}'; echo '{cronjob_entry}') | crontab -" cron_result = ssh.execute(hostname, cron_cmd, username, port) if not cron_result.success: return { "status": "failed", "error": f"Failed to add cronjob: {cron_result.stderr}", } # Run the script once immediately to verify it works run_result = ssh.execute(hostname, script_path, username, port, timeout=30) return { "status": "deployed", "script_path": script_path, "cronjob": cronjob_entry, "initial_run": { "success": run_result.success, "stdout": run_result.stdout, "stderr": run_result.stderr, }, } except Exception as e: logger.exception(f"Failed to deploy push script to {hostname}") return { "status": "failed", "error": str(e), } def get_existing_monitors(self) -> list[dict]: """Get all existing monitors from Uptime Kuma.""" kuma = get_kuma_client() return kuma.get_monitors() def parse_web_ports_from_scan(open_ports: str) -> list[int]: """Extract web server ports from port scan output.""" common_web_ports = [80, 443, 8080, 8443, 3000, 5000, 8000] found_ports = [] for port in common_web_ports: if f":{port}" in open_ports or f" {port} " in open_ports: found_ports.append(port) return found_ports def parse_docker_containers_from_scan(docker_output: str) -> list[dict]: """Parse Docker container info from scan output.""" containers = [] if "Docker not available" in docker_output or not docker_output.strip(): return containers for line in docker_output.strip().split("\n"): if not line.strip(): continue parts = line.split("\t") if len(parts) >= 2: containers.append({ "id": parts[0] if len(parts) > 0 else "", "name": parts[1] if len(parts) > 1 else "", "image": parts[2] if len(parts) > 2 else "", "status": parts[3] if len(parts) > 3 else "", "ports": parts[4] if len(parts) > 4 else "", }) return containers # Global monitor service instance _monitor_service: Optional[MonitorService] = None def get_monitor_service() -> MonitorService: """Get the global monitor service instance.""" global _monitor_service if _monitor_service is None: _monitor_service = MonitorService() return _monitor_service