import os import threading from flask import Flask, jsonify, request, send_from_directory from flask_cors import CORS from flask_socketio import SocketIO, emit from config import get_config, set_dev_mode # Path to frontend build FRONTEND_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend", "dist") from services.ssh_manager import get_ssh_manager from services.discovery import get_discovery_service, DiscoveryResult from services.claude_agent import create_agent, AgentResponse from services.monitors import ( get_monitor_service, parse_web_ports_from_scan, parse_docker_containers_from_scan, ) from services.kuma_client import get_kuma_client from services.database import get_database from services.sync import get_sync_service from utils.approval import get_approval_queue, ApprovalStatus app = Flask(__name__) CORS(app, origins=["http://localhost:5173", "http://localhost:3000"]) socketio = SocketIO(app, cors_allowed_origins="*", async_mode="gevent") # Store for active scans and their Claude agents active_scans: dict[str, dict] = {} # Setup approval queue callbacks for WebSocket notifications def on_approval_added(request): socketio.emit("approval_request", request.to_dict()) def on_approval_resolved(request): socketio.emit("approval_resolved", request.to_dict()) approval_queue = get_approval_queue() approval_queue.set_callbacks(on_approval_added, on_approval_resolved) # Serve frontend static files @app.route("/", defaults={"path": ""}) @app.route("/") def serve_frontend(path): if path and os.path.exists(os.path.join(FRONTEND_DIR, path)): return send_from_directory(FRONTEND_DIR, path) return send_from_directory(FRONTEND_DIR, "index.html") # Health check @app.route("/api/health") def health(): return jsonify({"status": "ok"}) # Settings endpoints @app.route("/api/settings", methods=["GET"]) def get_settings(): config = get_config() return jsonify({ "dev_mode": config.dev_mode, "uptime_kuma_url": config.uptime_kuma_url, "has_ssh_key": bool(config.ssh_private_key), "has_claude_key": bool(config.claude_api_key), "has_kuma_key": bool(config.uptime_kuma_api_key), }) @app.route("/api/settings", methods=["PUT"]) def update_settings(): data = request.json if "dev_mode" in data: set_dev_mode(data["dev_mode"]) return jsonify({"status": "ok"}) # Scan endpoints @app.route("/api/scan", methods=["POST"]) def start_scan(): data = request.json hostname = data.get("hostname") username = data.get("username", "root") port = data.get("port", 22) if not hostname: return jsonify({"error": "hostname is required"}), 400 # Start scan in background thread def run_scan(): # Sync this host's monitors before scanning try: sync = get_sync_service() sync_result = sync.sync_host(hostname) db = get_database() existing_monitors = db.get_monitors_for_hostname(hostname) socketio.emit("host_sync_complete", { "hostname": hostname, "sync_result": sync_result, "existing_monitors": [m.to_dict() for m in existing_monitors], }) except Exception as e: print(f"Pre-scan sync failed (non-fatal): {e}") discovery = get_discovery_service() def on_progress(cmd_name, status): socketio.emit("scan_progress", { "hostname": hostname, "command": cmd_name, "status": status, }) result = discovery.scan_host(hostname, username, port, on_progress) active_scans[result.scan_id] = { "result": result, "agent": None, "suggestions": None, } socketio.emit("scan_complete", { "scan_id": result.scan_id, "hostname": hostname, "username": result.username, "port": result.port, "connected": result.connected, "error": result.error, }) # If scan succeeded, analyze with Claude if result.connected: analyze_with_claude(result) thread = threading.Thread(target=run_scan) thread.start() return jsonify({"status": "started", "hostname": hostname}) def analyze_with_claude(result: DiscoveryResult): """Analyze scan results with Claude agent.""" socketio.emit("analysis_started", {"scan_id": result.scan_id}) try: agent = create_agent() response = agent.analyze_host(result.to_dict(), result.hostname) active_scans[result.scan_id]["agent"] = agent active_scans[result.scan_id]["suggestions"] = response socketio.emit("analysis_complete", { "scan_id": result.scan_id, "analysis": response.analysis, "monitors": [ { "type": m.type, "name": m.name, "target": m.target, "port": m.port, "interval": m.interval, "reason": m.reason, "push_metric": m.push_metric, } for m in response.monitors ], "additional_commands": [ {"command": c.command, "reason": c.reason} for c in response.additional_commands ], "questions": response.questions, }) except Exception as e: socketio.emit("analysis_error", { "scan_id": result.scan_id, "error": str(e), }) @app.route("/api/scan/", methods=["GET"]) def get_scan(scan_id): scan_data = active_scans.get(scan_id) if not scan_data: return jsonify({"error": "Scan not found"}), 404 result = scan_data["result"] suggestions = scan_data.get("suggestions") response_data = { "scan_id": result.scan_id, "hostname": result.hostname, "connected": result.connected, "error": result.error, "data": result.to_dict(), } if suggestions: response_data["suggestions"] = { "analysis": suggestions.analysis, "monitors": [ { "type": m.type, "name": m.name, "target": m.target, "port": m.port, "interval": m.interval, "reason": m.reason, "push_metric": m.push_metric, } for m in suggestions.monitors ], "additional_commands": [ {"command": c.command, "reason": c.reason} for c in suggestions.additional_commands ], "questions": suggestions.questions, } return jsonify(response_data) # Execute additional command (requires approval in dev mode) @app.route("/api/scan//command", methods=["POST"]) def run_additional_command(scan_id): scan_data = active_scans.get(scan_id) if not scan_data: return jsonify({"error": "Scan not found"}), 404 data = request.json command = data.get("command") reason = data.get("reason", "User requested") if not command: return jsonify({"error": "command is required"}), 400 result = scan_data["result"] config = get_config() discovery = get_discovery_service() # Check if command is safe (built-in) if discovery.is_safe_command(command): # Execute immediately cmd_result = discovery.run_additional_command( result.hostname, command, result.username, result.port ) return jsonify({ "status": "completed", "output": cmd_result.stdout, "error": cmd_result.stderr, "exit_code": cmd_result.exit_code, }) # In dev mode, require approval if config.dev_mode: approval_request = approval_queue.add_ssh_command( command, reason, result.hostname ) return jsonify({ "status": "pending_approval", "approval_id": approval_request.id, "message": "Command requires approval in dev mode", }) # In production mode, block non-safe commands from Claude return jsonify({ "status": "blocked", "message": "This command is not in the safe list and dev mode is disabled", }), 403 # Approval endpoints @app.route("/api/approvals", methods=["GET"]) def get_pending_approvals(): pending = approval_queue.get_pending() return jsonify({ "approvals": [r.to_dict() for r in pending], }) @app.route("/api/approvals//approve", methods=["POST"]) def approve_request(approval_id): request_obj = approval_queue.approve(approval_id) if not request_obj: return jsonify({"error": "Approval not found or already resolved"}), 404 # If it was an SSH command, execute it now if request_obj.type.value == "ssh_command": hostname = request_obj.details["hostname"] command = request_obj.details["command"] # Find the scan for this hostname for scan_id, scan_data in active_scans.items(): if scan_data["result"].hostname == hostname: discovery = get_discovery_service() result = scan_data["result"] cmd_result = discovery.run_additional_command( hostname, command, result.username, result.port ) # Send result via WebSocket socketio.emit("command_result", { "approval_id": approval_id, "command": command, "output": cmd_result.stdout, "error": cmd_result.stderr, "exit_code": cmd_result.exit_code, }) # Feed result back to Claude if agent exists agent = scan_data.get("agent") if agent: output = cmd_result.stdout if cmd_result.success else cmd_result.stderr response = agent.process_command_results(command, output) scan_data["suggestions"] = response socketio.emit("analysis_update", { "scan_id": scan_id, "analysis": response.analysis, "monitors": [ { "type": m.type, "name": m.name, "target": m.target, "port": m.port, "interval": m.interval, "reason": m.reason, "push_metric": m.push_metric, } for m in response.monitors ], "additional_commands": [ {"command": c.command, "reason": c.reason} for c in response.additional_commands ], "questions": response.questions, }) break return jsonify({"status": "approved", "request": request_obj.to_dict()}) @app.route("/api/approvals//reject", methods=["POST"]) def reject_request(approval_id): request_obj = approval_queue.reject(approval_id) if not request_obj: return jsonify({"error": "Approval not found or already resolved"}), 404 return jsonify({"status": "rejected", "request": request_obj.to_dict()}) # Monitor endpoints @app.route("/api/monitors", methods=["GET"]) def get_monitors(): try: kuma = get_kuma_client() monitors = kuma.get_monitors() return jsonify({"monitors": monitors}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/monitors/create-defaults", methods=["POST"]) def create_default_monitors(): data = request.json scan_id = data.get("scan_id") if not scan_id: return jsonify({"error": "scan_id is required"}), 400 scan_data = active_scans.get(scan_id) if not scan_data: return jsonify({"error": "Scan not found"}), 404 result = scan_data["result"] monitor_service = get_monitor_service() # Parse discovered services web_ports = parse_web_ports_from_scan(result.open_ports) containers = parse_docker_containers_from_scan(result.docker_containers) has_docker = "Docker not available" not in result.docker_containers created = monitor_service.create_default_monitors( hostname=result.hostname, ssh_port=result.port, has_docker=has_docker, containers=containers, web_ports=web_ports, ) return jsonify({"created": created}) @app.route("/api/monitors/create-suggested", methods=["POST"]) def create_suggested_monitors(): data = request.json scan_id = data.get("scan_id") monitor_indices = data.get("monitors", []) # List of indices to create if not scan_id: return jsonify({"error": "scan_id is required"}), 400 scan_data = active_scans.get(scan_id) if not scan_data: return jsonify({"error": "Scan not found"}), 404 suggestions = scan_data.get("suggestions") if not suggestions: return jsonify({"error": "No suggestions available"}), 400 result = scan_data["result"] monitor_service = get_monitor_service() created = [] for idx in monitor_indices: if 0 <= idx < len(suggestions.monitors): suggestion = suggestions.monitors[idx] monitor_result = monitor_service.create_from_suggestion( suggestion, result.hostname, username=result.username, port=result.port, ) created.append(monitor_result) return jsonify({"created": created}) @app.route("/api/monitors//deploy-script", methods=["POST"]) def deploy_push_script(monitor_id): """Deploy a push monitoring script to a remote host for an existing push monitor.""" data = request.json hostname = data.get("hostname") push_metric = data.get("push_metric") username = data.get("username", "root") port = data.get("port", 22) interval_minutes = data.get("interval_minutes", 5) if not hostname: return jsonify({"error": "hostname is required"}), 400 if not push_metric: return jsonify({"error": "push_metric is required"}), 400 valid_metrics = ["heartbeat", "disk", "memory", "cpu", "updates"] if push_metric not in valid_metrics: return jsonify({"error": f"push_metric must be one of: {', '.join(valid_metrics)}"}), 400 try: kuma = get_kuma_client() push_token = kuma.get_monitor_push_token(monitor_id) if not push_token: return jsonify({"error": "Could not get push token. Is this a push monitor?"}), 400 monitor_service = get_monitor_service() result = monitor_service.deploy_push_script( hostname=hostname, push_metric=push_metric, push_token=push_token, monitor_id=monitor_id, interval_minutes=interval_minutes, username=username, port=port, ) return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/monitors/deploy-all-scripts", methods=["POST"]) def deploy_all_push_scripts(): """Deploy scripts for multiple push monitors at once.""" data = request.json monitors = data.get("monitors", []) if not monitors: return jsonify({"error": "monitors array is required"}), 400 results = [] kuma = get_kuma_client() monitor_service = get_monitor_service() for monitor_config in monitors: monitor_id = monitor_config.get("monitor_id") hostname = monitor_config.get("hostname") push_metric = monitor_config.get("push_metric") username = monitor_config.get("username", "root") port = monitor_config.get("port", 22) interval_minutes = monitor_config.get("interval_minutes", 5) if not monitor_id or not hostname or not push_metric: results.append({ "monitor_id": monitor_id, "status": "failed", "error": "monitor_id, hostname, and push_metric are required", }) continue try: push_token = kuma.get_monitor_push_token(monitor_id) if not push_token: results.append({ "monitor_id": monitor_id, "status": "failed", "error": "Could not get push token", }) continue result = monitor_service.deploy_push_script( hostname=hostname, push_metric=push_metric, push_token=push_token, monitor_id=monitor_id, interval_minutes=interval_minutes, username=username, port=port, ) result["monitor_id"] = monitor_id results.append(result) except Exception as e: results.append({ "monitor_id": monitor_id, "status": "failed", "error": str(e), }) return jsonify({"results": results}) # Uptime Kuma authentication endpoints @app.route("/api/kuma/auth", methods=["GET"]) def kuma_auth_status(): """Check if authenticated to Uptime Kuma.""" try: kuma = get_kuma_client() authenticated = kuma.is_authenticated() config = get_config() return jsonify({ "authenticated": authenticated, "url": config.uptime_kuma_url, }) except Exception as e: return jsonify({"authenticated": False, "error": str(e)}) @app.route("/api/kuma/login", methods=["POST"]) def kuma_login(): """Login to Uptime Kuma with username/password/TOTP.""" data = request.json username = data.get("username") password = data.get("password") totp = data.get("totp") # Optional if not username or not password: return jsonify({"error": "username and password are required"}), 400 try: kuma = get_kuma_client() result = kuma.login(username, password, totp) return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 401 @app.route("/api/kuma/logout", methods=["POST"]) def kuma_logout(): """Logout from Uptime Kuma.""" try: kuma = get_kuma_client() kuma.logout() return jsonify({"status": "ok"}) except Exception as e: return jsonify({"error": str(e)}), 500 # Test Uptime Kuma connection @app.route("/api/kuma/test", methods=["GET"]) def test_kuma_connection(): try: kuma = get_kuma_client() connected = kuma.test_connection() return jsonify({"connected": connected}) except Exception as e: return jsonify({"connected": False, "error": str(e)}) # Sync endpoints @app.route("/api/sync", methods=["POST"]) def trigger_sync(): """Trigger a full sync with Uptime Kuma.""" try: sync = get_sync_service() result = sync.full_sync() return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/sync/host/", methods=["POST"]) def sync_host(hostname): """Sync monitors for a specific host.""" try: sync = get_sync_service() result = sync.sync_host(hostname) return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/hosts", methods=["GET"]) def get_hosts(): """Get all tracked hosts.""" try: db = get_database() hosts = db.get_all_hosts() return jsonify({ "hosts": [h.to_dict() for h in hosts], "count": len(hosts) }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/hosts//monitors", methods=["GET"]) def get_host_monitors(hostname): """Get all tracked monitors for a host.""" try: db = get_database() monitors = db.get_monitors_for_hostname(hostname) return jsonify({ "hostname": hostname, "monitors": [m.to_dict() for m in monitors], "count": len(monitors) }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/monitors/tracked", methods=["GET"]) def get_tracked_monitors(): """Get all tracked monitors across all hosts.""" try: db = get_database() monitors = db.get_all_monitors() return jsonify({ "monitors": [m.to_dict() for m in monitors], "count": len(monitors) }) except Exception as e: return jsonify({"error": str(e)}), 500 # WebSocket events @socketio.on("connect") def handle_connect(): emit("connected", {"status": "ok"}) @socketio.on("disconnect") def handle_disconnect(): pass def startup_sync(): """Run initial sync with Uptime Kuma on startup.""" try: print("Running startup sync with Uptime Kuma...") sync = get_sync_service() result = sync.full_sync() print(f"Startup sync complete: added={result['added']}, updated={result['updated']}, removed={result['removed']}") if result['errors']: print(f"Sync errors: {result['errors']}") except Exception as e: print(f"Startup sync failed (non-fatal): {e}") if __name__ == "__main__": # Validate config on startup config = get_config() errors = config.validate() if errors: print("Configuration errors:") for error in errors: print(f" - {error}") print("\nSet the required environment variables and restart.") else: print("Configuration OK") print(f"Dev mode: {'enabled' if config.dev_mode else 'disabled'}") # Run startup sync in background after short delay def delayed_sync(): import time time.sleep(2) # Wait for app to fully start startup_sync() sync_thread = threading.Thread(target=delayed_sync, daemon=True) sync_thread.start() socketio.run(app, host="0.0.0.0", port=5000, debug=os.environ.get("DEBUG", "false").lower() == "true")