Files
kuma-strapper/backend/app.py
Debian 7ecb6e396e
All checks were successful
Build and Push Container / build (push) Successful in 30s
Pass username/port to initial push monitor deployment
- Update create_from_suggestion to accept username and port
- Pass scan's username/port when creating suggested monitors
- Fixes first deploy using wrong username

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 10:13:18 +00:00

604 lines
19 KiB
Python

import os
import threading
from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
from flask_socketio import SocketIO, emit
from config import get_config, set_dev_mode
# Path to frontend build: <parent of this file's directory>/frontend/dist.
# serve_frontend() serves static files and index.html from this directory.
FRONTEND_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend", "dist")
from services.ssh_manager import get_ssh_manager
from services.discovery import get_discovery_service, DiscoveryResult
from services.claude_agent import create_agent, AgentResponse
from services.monitors import (
get_monitor_service,
parse_web_ports_from_scan,
parse_docker_containers_from_scan,
)
from services.kuma_client import get_kuma_client
from utils.approval import get_approval_queue, ApprovalStatus
# Flask application object; all HTTP routes in this module are registered on it.
app = Flask(__name__)
# HTTP CORS is limited to these two localhost origins.
CORS(app, origins=["http://localhost:5173", "http://localhost:3000"])
# Socket.IO server bound to the app, using the gevent async mode.
# NOTE(review): cors_allowed_origins="*" is broader than the HTTP CORS list
# above — confirm this difference is intended.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="gevent")
# Store for active scans and their Claude agents.
# Keyed by scan_id; each value is a dict with the keys "result", "agent" and
# "suggestions" (see start_scan / analyze_with_claude).
active_scans: dict[str, dict] = {}
# Setup approval queue callbacks for WebSocket notifications
def on_approval_added(request):
    """Approval-queue callback: emit an ``approval_request`` Socket.IO event.

    Args:
        request: The approval request object; its ``to_dict()`` result is
            sent as the event payload.
    """
    payload = request.to_dict()
    socketio.emit("approval_request", payload)
def on_approval_resolved(request):
    """Approval-queue callback: emit an ``approval_resolved`` Socket.IO event.

    Args:
        request: The approval request object; its ``to_dict()`` result is
            sent as the event payload.
    """
    payload = request.to_dict()
    socketio.emit("approval_resolved", payload)
# Module-level approval queue used by the command and approval endpoints.
approval_queue = get_approval_queue()
# Register the two callbacks that emit Socket.IO events for approval requests.
approval_queue.set_callbacks(on_approval_added, on_approval_resolved)
# Serve frontend static files
@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_frontend(path):
    """Serve a file from the frontend build, falling back to ``index.html``.

    Args:
        path: URL path relative to the site root (``""`` for ``/``).

    Returns:
        The file at ``path`` under ``FRONTEND_DIR`` when it names a regular
        file there; otherwise ``index.html`` from the same directory.
    """
    # Only regular files are served directly. A path that merely "exists"
    # (for example a directory inside the build output) falls through to
    # index.html instead of being handed to send_from_directory.
    if path and os.path.isfile(os.path.join(FRONTEND_DIR, path)):
        return send_from_directory(FRONTEND_DIR, path)
    return send_from_directory(FRONTEND_DIR, "index.html")
# Health check
@app.route("/api/health")
def health():
    """Health check endpoint; always responds with ``{"status": "ok"}``."""
    status_payload = {"status": "ok"}
    return jsonify(status_payload)
# Settings endpoints
@app.route("/api/settings", methods=["GET"])
def get_settings():
    """Return the current settings as JSON.

    The SSH key, Claude API key and Uptime Kuma API key are reported only as
    booleans (``has_*``) indicating whether a value is set.
    """
    cfg = get_config()
    settings = {
        "dev_mode": cfg.dev_mode,
        "uptime_kuma_url": cfg.uptime_kuma_url,
        "has_ssh_key": bool(cfg.ssh_private_key),
        "has_claude_key": bool(cfg.claude_api_key),
        "has_kuma_key": bool(cfg.uptime_kuma_api_key),
    }
    return jsonify(settings)
@app.route("/api/settings", methods=["PUT"])
def update_settings():
    """Apply settings from the JSON request body.

    Only the ``dev_mode`` key is read; when present its value is passed to
    ``set_dev_mode``. Other keys are ignored.

    Returns:
        ``{"status": "ok"}`` on success, or a 400 JSON error when the body is
        not a JSON object.
    """
    data = request.json
    # A body such as JSON `null` or a list parses fine but is not a mapping;
    # reject it explicitly instead of failing on the membership test below.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object body is required"}), 400

    if "dev_mode" in data:
        set_dev_mode(data["dev_mode"])
    return jsonify({"status": "ok"})
# Scan endpoints
@app.route("/api/scan", methods=["POST"])
def start_scan():
    """Start a host scan in a background thread.

    Request body (JSON object):
        hostname: Required. Host to scan.
        username: Optional, defaults to ``"root"``.
        port: Optional, defaults to ``22``.

    Progress and completion are reported through the ``scan_progress`` and
    ``scan_complete`` Socket.IO events. When the scan connected successfully,
    the result is then passed to ``analyze_with_claude``.

    Returns:
        ``{"status": "started", "hostname": ...}`` immediately, or a 400 JSON
        error when the body is not a JSON object or ``hostname`` is missing.
    """
    data = request.json
    # Guard against a non-object body (e.g. JSON `null`), which would
    # otherwise fail on `.get` and surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object body is required"}), 400

    hostname = data.get("hostname")
    username = data.get("username", "root")
    port = data.get("port", 22)
    if not hostname:
        return jsonify({"error": "hostname is required"}), 400

    # Start scan in background thread
    def run_scan():
        discovery = get_discovery_service()

        def on_progress(cmd_name, status):
            socketio.emit("scan_progress", {
                "hostname": hostname,
                "command": cmd_name,
                "status": status,
            })

        result = discovery.scan_host(hostname, username, port, on_progress)

        # Register the scan before announcing completion so that a client
        # reacting to scan_complete can already look it up by scan_id.
        active_scans[result.scan_id] = {
            "result": result,
            "agent": None,
            "suggestions": None,
        }
        socketio.emit("scan_complete", {
            "scan_id": result.scan_id,
            "hostname": hostname,
            "username": result.username,
            "port": result.port,
            "connected": result.connected,
            "error": result.error,
        })

        # If scan succeeded, analyze with Claude
        if result.connected:
            analyze_with_claude(result)

    thread = threading.Thread(target=run_scan)
    thread.start()
    return jsonify({"status": "started", "hostname": hostname})
def analyze_with_claude(result: DiscoveryResult):
    """Analyze scan results with a Claude agent and publish the outcome.

    Emits ``analysis_started`` first. On success the agent and its response
    are stored on the scan's ``active_scans`` entry and ``analysis_complete``
    is emitted; any exception raised along the way is reported through an
    ``analysis_error`` event instead.

    Args:
        result: The discovery result of a finished scan.
    """
    scan_id = result.scan_id
    socketio.emit("analysis_started", {"scan_id": scan_id})

    try:
        agent = create_agent()
        response = agent.analyze_host(result.to_dict(), result.hostname)

        scan_entry = active_scans[scan_id]
        scan_entry["agent"] = agent
        scan_entry["suggestions"] = response

        analysis = response.analysis

        monitors = []
        for m in response.monitors:
            monitors.append({
                "type": m.type,
                "name": m.name,
                "target": m.target,
                "port": m.port,
                "interval": m.interval,
                "reason": m.reason,
                "push_metric": m.push_metric,
            })

        additional_commands = []
        for c in response.additional_commands:
            additional_commands.append({"command": c.command, "reason": c.reason})

        payload = {
            "scan_id": scan_id,
            "analysis": analysis,
            "monitors": monitors,
            "additional_commands": additional_commands,
            "questions": response.questions,
        }
        socketio.emit("analysis_complete", payload)
    except Exception as exc:
        failure = {"scan_id": scan_id, "error": str(exc)}
        socketio.emit("analysis_error", failure)
@app.route("/api/scan/<scan_id>", methods=["GET"])
def get_scan(scan_id):
    """Return the stored result of a scan, plus suggestions when available.

    Args:
        scan_id: Key of the scan in ``active_scans``.

    Returns:
        JSON with the scan result fields and, if the scan entry has
        suggestions, a ``suggestions`` object; 404 JSON error when the scan
        is unknown.
    """
    entry = active_scans.get(scan_id)
    if not entry:
        return jsonify({"error": "Scan not found"}), 404

    result = entry["result"]
    suggestions = entry.get("suggestions")

    body = {
        "scan_id": result.scan_id,
        "hostname": result.hostname,
        "connected": result.connected,
        "error": result.error,
        "data": result.to_dict(),
    }

    if not suggestions:
        return jsonify(body)

    analysis = suggestions.analysis

    monitor_rows = []
    for m in suggestions.monitors:
        monitor_rows.append({
            "type": m.type,
            "name": m.name,
            "target": m.target,
            "port": m.port,
            "interval": m.interval,
            "reason": m.reason,
            "push_metric": m.push_metric,
        })

    command_rows = []
    for c in suggestions.additional_commands:
        command_rows.append({"command": c.command, "reason": c.reason})

    body["suggestions"] = {
        "analysis": analysis,
        "monitors": monitor_rows,
        "additional_commands": command_rows,
        "questions": suggestions.questions,
    }
    return jsonify(body)
# Execute additional command (requires approval in dev mode)
@app.route("/api/scan/<scan_id>/command", methods=["POST"])
def run_additional_command(scan_id):
    """Run an extra command on a scanned host, subject to the safety policy.

    Request body (JSON object):
        command: Required. Command to run.
        reason: Optional, defaults to ``"User requested"``.

    Behaviour:
        * Commands the discovery service reports as safe run immediately and
          their output is returned.
        * Other commands are queued for approval when dev mode is enabled.
        * Other commands are rejected with 403 when dev mode is disabled.

    Args:
        scan_id: Key of the scan in ``active_scans``.

    Returns:
        JSON describing the outcome (``completed``, ``pending_approval`` or
        ``blocked``); 404 when the scan is unknown; 400 when the body is not
        a JSON object or ``command`` is missing.
    """
    scan_data = active_scans.get(scan_id)
    if not scan_data:
        return jsonify({"error": "Scan not found"}), 404

    data = request.json
    # Guard against a non-object body (e.g. JSON `null`), which would
    # otherwise fail on `.get` and surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object body is required"}), 400

    command = data.get("command")
    reason = data.get("reason", "User requested")
    if not command:
        return jsonify({"error": "command is required"}), 400

    result = scan_data["result"]
    config = get_config()
    discovery = get_discovery_service()

    # Check if command is safe (built-in)
    if discovery.is_safe_command(command):
        # Execute immediately
        cmd_result = discovery.run_additional_command(
            result.hostname, command, result.username, result.port
        )
        return jsonify({
            "status": "completed",
            "output": cmd_result.stdout,
            "error": cmd_result.stderr,
            "exit_code": cmd_result.exit_code,
        })

    # In dev mode, require approval
    if config.dev_mode:
        approval_request = approval_queue.add_ssh_command(
            command, reason, result.hostname
        )
        return jsonify({
            "status": "pending_approval",
            "approval_id": approval_request.id,
            "message": "Command requires approval in dev mode",
        })

    # In production mode, block non-safe commands from Claude
    return jsonify({
        "status": "blocked",
        "message": "This command is not in the safe list and dev mode is disabled",
    }), 403
# Approval endpoints
@app.route("/api/approvals", methods=["GET"])
def get_pending_approvals():
    """Return the approval queue's pending requests as ``{"approvals": [...]}``."""
    serialized = []
    for pending_request in approval_queue.get_pending():
        serialized.append(pending_request.to_dict())
    return jsonify({"approvals": serialized})
@app.route("/api/approvals/<approval_id>/approve", methods=["POST"])
def approve_request(approval_id):
    """Approve a pending request and, for SSH commands, execute it.

    For an ``ssh_command`` approval, the first entry in ``active_scans`` whose
    result hostname matches is used to run the command. The command result is
    emitted as ``command_result``; if that scan has an agent, the output is
    fed back to it, the scan's suggestions are replaced, and an
    ``analysis_update`` event is emitted. If no scan matches the hostname,
    nothing is executed.

    Args:
        approval_id: Identifier of the approval request.

    Returns:
        ``{"status": "approved", "request": ...}``, or a 404 JSON error when
        the approval is unknown or already resolved.
    """
    request_obj = approval_queue.approve(approval_id)
    if not request_obj:
        return jsonify({"error": "Approval not found or already resolved"}), 404

    # If it was an SSH command, execute it now
    if request_obj.type.value == "ssh_command":
        hostname = request_obj.details["hostname"]
        command = request_obj.details["command"]

        # Find the scan for this hostname (first match in iteration order).
        match = next(
            (
                (sid, entry)
                for sid, entry in active_scans.items()
                if entry["result"].hostname == hostname
            ),
            None,
        )

        if match is not None:
            scan_id, scan_data = match
            discovery = get_discovery_service()
            scan_result = scan_data["result"]
            cmd_result = discovery.run_additional_command(
                hostname, command, scan_result.username, scan_result.port
            )

            # Send result via WebSocket
            command_payload = {
                "approval_id": approval_id,
                "command": command,
                "output": cmd_result.stdout,
                "error": cmd_result.stderr,
                "exit_code": cmd_result.exit_code,
            }
            socketio.emit("command_result", command_payload)

            # Feed result back to Claude if agent exists
            agent = scan_data.get("agent")
            if agent:
                if cmd_result.success:
                    output = cmd_result.stdout
                else:
                    output = cmd_result.stderr
                response = agent.process_command_results(command, output)
                scan_data["suggestions"] = response

                analysis = response.analysis

                monitor_rows = []
                for m in response.monitors:
                    monitor_rows.append({
                        "type": m.type,
                        "name": m.name,
                        "target": m.target,
                        "port": m.port,
                        "interval": m.interval,
                        "reason": m.reason,
                        "push_metric": m.push_metric,
                    })

                command_rows = []
                for c in response.additional_commands:
                    command_rows.append({"command": c.command, "reason": c.reason})

                socketio.emit("analysis_update", {
                    "scan_id": scan_id,
                    "analysis": analysis,
                    "monitors": monitor_rows,
                    "additional_commands": command_rows,
                    "questions": response.questions,
                })

    return jsonify({"status": "approved", "request": request_obj.to_dict()})
@app.route("/api/approvals/<approval_id>/reject", methods=["POST"])
def reject_request(approval_id):
    """Reject a pending approval request.

    Args:
        approval_id: Identifier of the approval request.

    Returns:
        ``{"status": "rejected", "request": ...}``, or a 404 JSON error when
        the approval is unknown or already resolved.
    """
    rejected = approval_queue.reject(approval_id)
    if rejected:
        return jsonify({"status": "rejected", "request": rejected.to_dict()})
    return jsonify({"error": "Approval not found or already resolved"}), 404
# Monitor endpoints
@app.route("/api/monitors", methods=["GET"])
def get_monitors():
    """Return the monitors reported by the Uptime Kuma client.

    Returns:
        ``{"monitors": ...}`` on success; a 500 JSON error carrying the
        exception message on any failure.
    """
    try:
        monitor_list = get_kuma_client().get_monitors()
        return jsonify({"monitors": monitor_list})
    except Exception as exc:
        failure = {"error": str(exc)}
        return jsonify(failure), 500
@app.route("/api/monitors/create-defaults", methods=["POST"])
def create_default_monitors():
    """Create the default set of monitors for a scanned host.

    Request body (JSON object):
        scan_id: Required. Key of the scan in ``active_scans``.

    Web ports and Docker containers are parsed from the scan result and
    passed to the monitor service together with the host's SSH port.

    Returns:
        ``{"created": ...}`` on success; 400 when the body is not a JSON
        object or ``scan_id`` is missing; 404 when the scan is unknown; a 500
        JSON error when monitor creation raises.
    """
    data = request.json
    # Guard against a non-object body (e.g. JSON `null`), which would
    # otherwise fail on `.get` and surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object body is required"}), 400

    scan_id = data.get("scan_id")
    if not scan_id:
        return jsonify({"error": "scan_id is required"}), 400

    scan_data = active_scans.get(scan_id)
    if not scan_data:
        return jsonify({"error": "Scan not found"}), 404

    result = scan_data["result"]
    monitor_service = get_monitor_service()

    # Parse discovered services
    web_ports = parse_web_ports_from_scan(result.open_ports)
    containers = parse_docker_containers_from_scan(result.docker_containers)
    # The scan output contains this marker text when Docker is unavailable.
    has_docker = "Docker not available" not in result.docker_containers

    # Report service failures as a JSON error, matching the other monitor
    # endpoints in this module.
    try:
        created = monitor_service.create_default_monitors(
            hostname=result.hostname,
            ssh_port=result.port,
            has_docker=has_docker,
            containers=containers,
            web_ports=web_ports,
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500

    return jsonify({"created": created})
@app.route("/api/monitors/create-suggested", methods=["POST"])
def create_suggested_monitors():
    """Create monitors from selected suggestions of a scan.

    Request body (JSON object):
        scan_id: Required. Key of the scan in ``active_scans``.
        monitors: Optional list of integer indices into the scan's suggested
            monitors. Defaults to an empty list.

    Indices that are out of range or not integers are skipped. Each selected
    suggestion is created with the scan's hostname, username and port.

    Returns:
        ``{"created": [...]}`` on success; 400 when the body is not a JSON
        object, ``scan_id`` is missing, no suggestions exist, or ``monitors``
        is not a list; 404 when the scan is unknown.
    """
    data = request.json
    # Guard against a non-object body (e.g. JSON `null`), which would
    # otherwise fail on `.get` and surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object body is required"}), 400

    scan_id = data.get("scan_id")
    monitor_indices = data.get("monitors", [])  # List of indices to create
    if not scan_id:
        return jsonify({"error": "scan_id is required"}), 400

    scan_data = active_scans.get(scan_id)
    if not scan_data:
        return jsonify({"error": "Scan not found"}), 404

    suggestions = scan_data.get("suggestions")
    if not suggestions:
        return jsonify({"error": "No suggestions available"}), 400

    if not isinstance(monitor_indices, list):
        return jsonify({"error": "monitors must be a list of indices"}), 400

    result = scan_data["result"]
    monitor_service = get_monitor_service()
    suggested = suggestions.monitors

    created = []
    for idx in monitor_indices:
        # Non-integer entries (strings, floats, null, ...) cannot index the
        # suggestion list; skip them like out-of-range indices.
        if not isinstance(idx, int):
            continue
        if 0 <= idx < len(suggested):
            monitor_result = monitor_service.create_from_suggestion(
                suggested[idx],
                result.hostname,
                username=result.username,
                port=result.port,
            )
            created.append(monitor_result)

    return jsonify({"created": created})
@app.route("/api/monitors/<int:monitor_id>/deploy-script", methods=["POST"])
def deploy_push_script(monitor_id):
    """Deploy a push monitoring script to a remote host for an existing push monitor.

    Request body (JSON object):
        hostname: Required. Target host.
        push_metric: Required. One of ``heartbeat``, ``disk``, ``memory``,
            ``cpu``, ``updates``.
        username: Optional, defaults to ``"root"``.
        port: Optional, defaults to ``22``.
        interval_minutes: Optional, defaults to ``5``.

    Args:
        monitor_id: Identifier of the monitor whose push token is used.

    Returns:
        The monitor service's deployment result as JSON; 400 for an invalid
        body, missing/invalid fields, or when no push token can be obtained;
        a 500 JSON error when the Kuma client or deployment raises.
    """
    data = request.json
    # Guard against a non-object body (e.g. JSON `null`), which would
    # otherwise fail on `.get` and surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object body is required"}), 400

    hostname = data.get("hostname")
    push_metric = data.get("push_metric")
    username = data.get("username", "root")
    port = data.get("port", 22)
    interval_minutes = data.get("interval_minutes", 5)

    if not hostname:
        return jsonify({"error": "hostname is required"}), 400
    if not push_metric:
        return jsonify({"error": "push_metric is required"}), 400

    valid_metrics = ["heartbeat", "disk", "memory", "cpu", "updates"]
    if push_metric not in valid_metrics:
        return jsonify({"error": f"push_metric must be one of: {', '.join(valid_metrics)}"}), 400

    try:
        kuma = get_kuma_client()
        push_token = kuma.get_monitor_push_token(monitor_id)
        if not push_token:
            return jsonify({"error": "Could not get push token. Is this a push monitor?"}), 400

        monitor_service = get_monitor_service()
        result = monitor_service.deploy_push_script(
            hostname=hostname,
            push_metric=push_metric,
            push_token=push_token,
            monitor_id=monitor_id,
            interval_minutes=interval_minutes,
            username=username,
            port=port,
        )
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/monitors/deploy-all-scripts", methods=["POST"])
def deploy_all_push_scripts():
    """Deploy scripts for multiple push monitors at once.

    Request body (JSON object):
        monitors: Required non-empty list of objects, each with
            ``monitor_id``, ``hostname`` and ``push_metric`` (required) and
            ``username`` (default ``"root"``), ``port`` (default ``22``) and
            ``interval_minutes`` (default ``5``).

    Each entry is processed independently: a failure is recorded in the
    results for that entry and processing continues with the next one.

    Returns:
        ``{"results": [...]}`` with one item per entry; 400 when the body is
        not a JSON object or ``monitors`` is missing, empty or not a list.
    """
    data = request.json
    # Guard against a non-object body (e.g. JSON `null`), which would
    # otherwise fail on `.get` and surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object body is required"}), 400

    monitors = data.get("monitors", [])
    if not isinstance(monitors, list) or not monitors:
        return jsonify({"error": "monitors array is required"}), 400

    results = []
    kuma = get_kuma_client()
    monitor_service = get_monitor_service()

    for monitor_config in monitors:
        # An entry that is not an object cannot carry the required fields;
        # record it as a failed item rather than aborting the whole batch.
        if not isinstance(monitor_config, dict):
            results.append({
                "monitor_id": None,
                "status": "failed",
                "error": "monitor_id, hostname, and push_metric are required",
            })
            continue

        monitor_id = monitor_config.get("monitor_id")
        hostname = monitor_config.get("hostname")
        push_metric = monitor_config.get("push_metric")
        username = monitor_config.get("username", "root")
        port = monitor_config.get("port", 22)
        interval_minutes = monitor_config.get("interval_minutes", 5)

        if not monitor_id or not hostname or not push_metric:
            results.append({
                "monitor_id": monitor_id,
                "status": "failed",
                "error": "monitor_id, hostname, and push_metric are required",
            })
            continue

        try:
            push_token = kuma.get_monitor_push_token(monitor_id)
            if not push_token:
                results.append({
                    "monitor_id": monitor_id,
                    "status": "failed",
                    "error": "Could not get push token",
                })
                continue

            result = monitor_service.deploy_push_script(
                hostname=hostname,
                push_metric=push_metric,
                push_token=push_token,
                monitor_id=monitor_id,
                interval_minutes=interval_minutes,
                username=username,
                port=port,
            )
            # Tag the service result so the caller can match it to its entry.
            result["monitor_id"] = monitor_id
            results.append(result)
        except Exception as e:
            results.append({
                "monitor_id": monitor_id,
                "status": "failed",
                "error": str(e),
            })

    return jsonify({"results": results})
# Uptime Kuma authentication endpoints
@app.route("/api/kuma/auth", methods=["GET"])
def kuma_auth_status():
    """Check if authenticated to Uptime Kuma.

    Returns:
        ``{"authenticated": ..., "url": ...}``; on any exception,
        ``{"authenticated": False, "error": ...}`` (still without an explicit
        error status code).
    """
    try:
        is_authed = get_kuma_client().is_authenticated()
        kuma_url = get_config().uptime_kuma_url
        status = {
            "authenticated": is_authed,
            "url": kuma_url,
        }
        return jsonify(status)
    except Exception as exc:
        return jsonify({"authenticated": False, "error": str(exc)})
@app.route("/api/kuma/login", methods=["POST"])
def kuma_login():
    """Login to Uptime Kuma with username/password/TOTP.

    Request body (JSON object):
        username: Required.
        password: Required.
        totp: Optional one-time code.

    Returns:
        The Kuma client's login result as JSON; 400 when the body is not a
        JSON object or credentials are missing; a 401 JSON error when the
        login call raises.
    """
    data = request.json
    # Guard against a non-object body (e.g. JSON `null`), which would
    # otherwise fail on `.get` and surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object body is required"}), 400

    username = data.get("username")
    password = data.get("password")
    totp = data.get("totp")  # Optional

    if not username or not password:
        return jsonify({"error": "username and password are required"}), 400

    try:
        kuma = get_kuma_client()
        result = kuma.login(username, password, totp)
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 401
@app.route("/api/kuma/logout", methods=["POST"])
def kuma_logout():
    """Logout from Uptime Kuma.

    Returns:
        ``{"status": "ok"}`` on success; a 500 JSON error carrying the
        exception message on failure.
    """
    try:
        get_kuma_client().logout()
        ok_payload = {"status": "ok"}
        return jsonify(ok_payload)
    except Exception as exc:
        failure = {"error": str(exc)}
        return jsonify(failure), 500
# Test Uptime Kuma connection
@app.route("/api/kuma/test", methods=["GET"])
def test_kuma_connection():
    """Report whether the Uptime Kuma client's connection test succeeds.

    Returns:
        ``{"connected": ...}``; on any exception,
        ``{"connected": False, "error": ...}``.
    """
    try:
        is_connected = get_kuma_client().test_connection()
        return jsonify({"connected": is_connected})
    except Exception as exc:
        failure = {"connected": False, "error": str(exc)}
        return jsonify(failure)
# WebSocket events
@socketio.on("connect")
def handle_connect():
    """On a Socket.IO ``connect`` event, emit ``connected`` with an ok status."""
    ack = {"status": "ok"}
    emit("connected", ack)
@socketio.on("disconnect")
def handle_disconnect():
    """Handler for the Socket.IO ``disconnect`` event; currently a no-op."""
    return None
if __name__ == "__main__":
    # NOTE(review): the original indentation of this block was not available.
    # This layout assumes the "Dev mode" line belongs to the success branch
    # and that the server starts regardless of validation errors — confirm
    # against the repository.

    # Validate config on startup
    config = get_config()
    errors = config.validate()

    if not errors:
        print("Configuration OK")
        print(f"Dev mode: {'enabled' if config.dev_mode else 'disabled'}")
    else:
        print("Configuration errors:")
        for error in errors:
            print(f" - {error}")
        print("\nSet the required environment variables and restart.")

    # Debug mode is enabled only when the DEBUG env var is "true" (any case).
    debug_enabled = os.environ.get("DEBUG", "false").lower() == "true"
    socketio.run(app, host="0.0.0.0", port=5000, debug=debug_enabled)