Files
kuma-strapper/backend/services/monitors.py
Debian a65997a391
All checks were successful
Build and Push Container / build (push) Successful in 34s
Add debug logging for interval values in monitor creation
This helps trace the interval values at each step:
- Claude agent logs what interval it suggested
- Monitor service logs the interval when creating and deploying

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 20:26:10 +00:00

454 lines
16 KiB
Python

import logging
import re
import shlex
from dataclasses import dataclass
from typing import Optional

from services import push_scripts
from services.claude_agent import MonitorSuggestion
from services.kuma_client import get_kuma_client, Monitor
from services.ssh_manager import get_ssh_manager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@dataclass
class DefaultMonitorProfile:
    """Named bundle of built-in monitors that needs no approval to apply."""

    # Label for the profile.
    name: str
    # Free-text explanation of the profile.
    description: str
    # Monitor definitions that belong to the profile.
    monitors: list[Monitor]
def create_host_health_monitors(hostname: str, ssh_port: int = 22) -> list[Monitor]:
    """Build the baseline health checks for a host.

    Args:
        hostname: Host to monitor; also used as the monitor name prefix.
        ssh_port: TCP port checked by the SSH monitor.

    Returns:
        Two monitors, in order: a ping check (interval 60) and a TCP check
        against ``ssh_port`` (interval 120).
    """
    ping_check = Monitor(
        type="ping",
        name=f"{hostname} - Ping",
        hostname=hostname,
        interval=60,
    )
    ssh_check = Monitor(
        type="tcp",
        name=f"{hostname} - SSH",
        hostname=hostname,
        port=ssh_port,
        interval=120,
    )
    return [ping_check, ssh_check]
def create_web_server_monitors(hostname: str, port: int = 80, https: bool = False) -> list[Monitor]:
    """Build the HTTP check for a web server found on a host.

    Args:
        hostname: Host serving the site.
        port: Port the web server listens on.
        https: When true the URL uses the ``https`` scheme, otherwise ``http``.

    Returns:
        A single-element list holding an ``http`` monitor (interval 60) for
        the root path of ``hostname:port``.
    """
    scheme = "http"
    if https:
        scheme = "https"

    web_check = Monitor(
        type="http",
        name=f"{hostname} - Web ({port})",
        url=f"{scheme}://{hostname}:{port}/",
        interval=60,
    )
    return [web_check]
def create_docker_container_monitors(hostname: str, containers: list[dict]) -> list[Monitor]:
    """Create monitors for detected Docker containers.

    Args:
        hostname: Host the containers run on; used as the monitor name prefix
            and as each monitor's ``docker_host``.
        containers: Container records; each is read for ``"name"`` and ``"id"``.

    Returns:
        One ``docker`` monitor (interval 60) per container, in input order.
    """
    monitors = []
    for container in containers:
        # Treat a blank value like a missing one: a record whose "name" is ""
        # must fall back to its id rather than produce a monitor that targets
        # an empty container name.
        name = container.get("name") or container.get("id") or "unknown"
        monitors.append(
            Monitor(
                type="docker",
                name=f"{hostname} - Container: {name}",
                docker_container=name,
                docker_host=hostname,
                interval=60,
            )
        )
    return monitors
class MonitorService:
    """Service for managing monitors in Uptime Kuma.

    Keeps an in-memory list (``created_monitors``) of the outcome records
    produced by :meth:`create_default_monitors`.
    """

    def __init__(self):
        # Outcome dicts accumulated across create_default_monitors() calls.
        self.created_monitors: list[dict] = []

    @staticmethod
    def _create_and_record(kuma, monitors: list) -> list[dict]:
        """Create each monitor via ``kuma`` and return one outcome dict per monitor."""
        outcomes = []
        for monitor in monitors:
            try:
                result = kuma.create_monitor(monitor)
            except Exception as e:
                # Best effort: one failing monitor must not stop the others.
                outcomes.append({
                    "monitor": monitor.name,
                    "type": monitor.type,
                    "status": "failed",
                    "error": str(e),
                })
            else:
                outcomes.append({
                    "monitor": monitor.name,
                    "type": monitor.type,
                    "status": "created",
                    "result": result,
                })
        return outcomes

    @staticmethod
    def _failure_detail(result) -> str:
        """Return the most informative failure text of an SSH command result."""
        return result.stderr or result.stdout or f"exit code {result.exit_code}"

    @staticmethod
    def _heredoc_write_cmd(path: str, content: str, marker: str) -> str:
        """Build a ``sudo tee`` command writing ``content`` to ``path`` via a quoted heredoc."""
        # The terminator must sit on a line of its own; without a trailing
        # newline it would be glued to the last content line and the heredoc
        # would never end.
        if not content.endswith("\n"):
            content += "\n"
        # tee (not a redirect) because sudo does not apply to shell redirects.
        return f"sudo tee {shlex.quote(path)} > /dev/null << '{marker}'\n{content}{marker}"

    def _install_systemd_timer(
        self,
        ssh,
        hostname: str,
        username: str,
        port: int,
        push_metric: str,
        monitor_id: int,
        interval_minutes: int,
        script_path: str,
    ) -> Optional[str]:
        """Install and start a systemd timer for the script.

        Returns the timer unit name on success, or ``None`` if any step fails.
        """
        timer_name = f"kuma-push-{push_metric}-{monitor_id}"
        timer_content = (
            "[Unit]\n"
            f"Description=Kuma Push Monitor - {push_metric}\n"
            "[Timer]\n"
            "OnBootSec=1min\n"
            f"OnUnitActiveSec={interval_minutes}min\n"
            "AccuracySec=1s\n"
            "[Install]\n"
            "WantedBy=timers.target\n"
        )
        service_content = (
            "[Unit]\n"
            f"Description=Kuma Push Monitor - {push_metric}\n"
            "[Service]\n"
            "Type=oneshot\n"
            f"ExecStart={script_path}\n"
        )
        unit_files = (
            (f"/etc/systemd/system/{timer_name}.timer", timer_content, "KUMA_TIMER_EOF"),
            (f"/etc/systemd/system/{timer_name}.service", service_content, "KUMA_SERVICE_EOF"),
        )
        # Write the timer first, then the service; stop at the first failure.
        for unit_path, unit_content, marker in unit_files:
            write_cmd = self._heredoc_write_cmd(unit_path, unit_content, marker)
            if not ssh.execute(hostname, write_cmd, username, port).success:
                return None

        timer_unit = f"{timer_name}.timer"
        enable_cmd = (
            "sudo systemctl daemon-reload && "
            f"sudo systemctl enable --now {shlex.quote(timer_unit)}"
        )
        if ssh.execute(hostname, enable_cmd, username, port).success:
            return timer_unit
        return None

    def create_default_monitors(
        self,
        hostname: str,
        ssh_port: int = 22,
        has_docker: bool = False,
        containers: Optional[list[dict]] = None,
        web_ports: Optional[list[int]] = None,
    ) -> list[dict]:
        """
        Create default monitors for a host.

        These are built-in and never require approval.

        Args:
            hostname: Host to monitor.
            ssh_port: Port used for the SSH TCP check.
            has_docker: Whether Docker was detected on the host.
            containers: Container records; only used when ``has_docker`` is true.
            web_ports: Ports to create HTTP checks for; 443 and 8443 use HTTPS.

        Returns:
            One outcome dict per attempted monitor, with ``status`` set to
            ``"created"`` (plus ``result``) or ``"failed"`` (plus ``error``).
            The same records are appended to ``self.created_monitors``.
        """
        kuma = get_kuma_client()

        # Host health monitors
        created = self._create_and_record(
            kuma, create_host_health_monitors(hostname, ssh_port)
        )

        # Web server monitors
        for port in web_ports or []:
            https = port in (443, 8443)
            created += self._create_and_record(
                kuma, create_web_server_monitors(hostname, port, https)
            )

        # Docker container monitors
        if has_docker and containers:
            created += self._create_and_record(
                kuma, create_docker_container_monitors(hostname, containers)
            )

        self.created_monitors.extend(created)
        return created

    def create_from_suggestion(
        self,
        suggestion: MonitorSuggestion,
        hostname: str,
        username: str = "root",
        port: int = 22,
    ) -> dict:
        """
        Create a monitor from a Claude suggestion.

        In production mode, this executes automatically.
        In dev mode, this should only be called after approval.

        Args:
            suggestion: The suggested monitor (type, name, target, interval, ...).
            hostname: Host the suggestion applies to; used as the Docker host
                and as the deployment target for push scripts.
            username: SSH username for push-script deployment.
            port: SSH port for push-script deployment.

        Returns:
            An outcome dict with ``status`` ``"created"`` or ``"failed"``. For
            push monitors with a ``push_metric`` it also carries a
            ``deployment`` entry describing the script deployment.
        """
        kuma = get_kuma_client()

        # The script scheduler works in whole minutes, with a floor of one.
        interval_minutes = max(1, suggestion.interval // 60)
        logger.info(
            "Creating monitor '%s': type=%s, interval=%ss (%smin)",
            suggestion.name,
            suggestion.type,
            suggestion.interval,
            interval_minutes,
        )

        # Build monitor from suggestion
        monitor = Monitor(
            type=suggestion.type,
            name=suggestion.name,
            interval=suggestion.interval,
        )

        # Set type-specific fields ("push" monitors need none here).
        if suggestion.type in ("http", "keyword"):
            monitor.url = suggestion.target
            if suggestion.keyword:
                monitor.keyword = suggestion.keyword
        elif suggestion.type == "tcp":
            monitor.hostname = suggestion.target
            monitor.port = suggestion.port
        elif suggestion.type == "ping":
            monitor.hostname = suggestion.target
        elif suggestion.type == "docker":
            monitor.docker_container = suggestion.target
            monitor.docker_host = hostname

        try:
            result = kuma.create_monitor(monitor)

            response = {
                "monitor": monitor.name,
                "type": monitor.type,
                "status": "created",
                "result": result,
                "reason": suggestion.reason,
                "push_metric": suggestion.push_metric,
            }

            # For push monitors, deploy the script to the remote host
            if suggestion.type == "push" and suggestion.push_metric:
                push_token = result.get("pushToken")
                monitor_id = result.get("monitorID")

                if push_token and monitor_id:
                    response["deployment"] = self.deploy_push_script(
                        hostname=hostname,
                        push_metric=suggestion.push_metric,
                        push_token=push_token,
                        monitor_id=monitor_id,
                        interval_minutes=interval_minutes,
                        username=username,
                        port=port,
                    )
                else:
                    response["deployment"] = {
                        "status": "failed",
                        "error": "No push token returned from Uptime Kuma",
                    }

            return response
        except Exception as e:
            return {
                "monitor": monitor.name,
                "type": monitor.type,
                "status": "failed",
                "error": str(e),
                "reason": suggestion.reason,
            }

    def deploy_push_script(
        self,
        hostname: str,
        push_metric: str,
        push_token: str,
        monitor_id: int,
        interval_minutes: int = 5,
        username: str = "root",
        port: int = 22,
    ) -> dict:
        """
        Deploy a push monitoring script to a remote host via SSH.

        The script is written with ``sudo tee``, made executable, scheduled
        through crontab (falling back to a systemd timer), and then run once.
        The result of that first run is reported but does not change the
        overall ``"deployed"`` status.

        Args:
            hostname: The remote host to deploy to
            push_metric: The metric type (heartbeat, disk, memory, cpu, updates)
            push_token: The Uptime Kuma push token
            monitor_id: The Uptime Kuma monitor ID
            interval_minutes: Cronjob interval in minutes
            username: SSH username
            port: SSH port

        Returns:
            Dict with status and any error messages. Failures, including
            unexpected exceptions, are reported as ``{"status": "failed",
            "error": ...}`` rather than raised.
        """
        logger.info(
            "Deploying push script: metric=%s, monitor_id=%s, interval_minutes=%s",
            push_metric,
            monitor_id,
            interval_minutes,
        )

        try:
            kuma = get_kuma_client()
            ssh = get_ssh_manager()

            # Build the push URL and script
            push_url = kuma.get_push_url(push_token)
            script_content = push_scripts.generate_script(push_metric, push_url)
            if not script_content:
                return {
                    "status": "failed",
                    "error": f"Unknown push metric type: {push_metric}",
                }

            script_path = push_scripts.get_script_path(push_metric, monitor_id)
            script_filename = push_scripts.get_script_filename(push_metric, monitor_id)
            cronjob_entry = push_scripts.get_cronjob_entry(
                push_metric, monitor_id, interval_minutes
            )
            quoted_script_path = shlex.quote(script_path)

            # Ensure SSH connection
            if not ssh.is_connected(hostname, username, port):
                if not ssh.connect(hostname, username, port):
                    return {
                        "status": "failed",
                        "error": f"Could not connect to {hostname}",
                    }

            # Write the script to the remote host (sudo tee works for non-root users)
            write_cmd = self._heredoc_write_cmd(
                script_path, script_content, "KUMA_SCRIPT_EOF"
            )
            write_result = ssh.execute(hostname, write_cmd, username, port)
            if not write_result.success:
                return {
                    "status": "failed",
                    "error": f"Failed to write script: {self._failure_detail(write_result)}",
                }

            # Make the script executable
            chmod_result = ssh.execute(
                hostname, f"sudo chmod +x {quoted_script_path}", username, port
            )
            if not chmod_result.success:
                return {
                    "status": "failed",
                    "error": f"Failed to make script executable: {self._failure_detail(chmod_result)}",
                }

            # Try to add the cronjob entry (crontab may not be available on all
            # systems). Any existing line mentioning this script is dropped
            # first so re-deploying does not create duplicates.
            cron_cmd = (
                f"(crontab -l 2>/dev/null | grep -vF {shlex.quote(script_filename)}; "
                f"echo {shlex.quote(cronjob_entry)}) | crontab -"
            )
            cron_result = ssh.execute(hostname, cron_cmd, username, port)

            if cron_result.success:
                scheduling = {"method": "crontab", "info": cronjob_entry}
            else:
                # Try systemd timer as fallback
                timer_unit = self._install_systemd_timer(
                    ssh,
                    hostname,
                    username,
                    port,
                    push_metric,
                    monitor_id,
                    interval_minutes,
                    script_path,
                )
                if timer_unit:
                    scheduling = {"method": "systemd", "info": timer_unit}
                else:
                    scheduling = {
                        "method": "none",
                        "warning": "Neither crontab nor systemd available. Script deployed but not scheduled.",
                    }

            # Run the script once immediately to verify it works
            run_result = ssh.execute(
                hostname, quoted_script_path, username, port, timeout=30
            )

            return {
                "status": "deployed",
                "script_path": script_path,
                "initial_run": {
                    "success": run_result.success,
                    "stdout": run_result.stdout,
                    "stderr": run_result.stderr,
                },
                "scheduling": scheduling,
            }

        except Exception as e:
            logger.exception("Failed to deploy push script to %s", hostname)
            return {
                "status": "failed",
                "error": str(e),
            }

    def get_existing_monitors(self) -> list[dict]:
        """Get all existing monitors from Uptime Kuma."""
        kuma = get_kuma_client()
        return kuma.get_monitors()
def parse_web_ports_from_scan(open_ports: str) -> list[int]:
    """Extract web server ports from port scan output.

    A port counts as found when it appears as a complete number either
    directly after a colon (``0.0.0.0:8080``) or as a whitespace-delimited
    token, optionally followed by ``/`` (``80/tcp``). Matching complete
    numbers keeps ``:8080`` from also being reported as port 80.

    Args:
        open_ports: Raw text of the port scan; empty or ``None`` yields ``[]``.

    Returns:
        The detected ports, in the order of the candidate list
        (80, 443, 8080, 8443, 3000, 5000, 8000).
    """
    common_web_ports = [80, 443, 8080, 8443, 3000, 5000, 8000]
    if not open_ports:
        return []

    # First alternative: every digit run right after ":" (greedy, so ":8080"
    # yields "8080", never "80"). Second: a digit run that starts a
    # whitespace-delimited token and ends at whitespace, "/" or end of text.
    number_tokens = set(
        re.findall(r"(?<=:)\d+|(?<!\S)\d+(?=[\s/]|$)", open_ports)
    )
    return [port for port in common_web_ports if str(port) in number_tokens]
def parse_docker_containers_from_scan(docker_output: str) -> list[dict]:
    """Turn tab-separated Docker scan output into container records.

    Each non-blank line is split on tabs into, in order, ``id``, ``name``,
    ``image``, ``status`` and ``ports``. Lines with fewer than two columns
    are ignored; columns missing beyond the second are filled with ``""``.
    Output that is blank or contains "Docker not available" yields ``[]``.
    """
    field_names = ("id", "name", "image", "status", "ports")

    if "Docker not available" in docker_output:
        return []
    trimmed = docker_output.strip()
    if not trimmed:
        return []

    records = []
    for raw_line in trimmed.split("\n"):
        if not raw_line.strip():
            continue
        columns = raw_line.split("\t")
        if len(columns) < 2:
            continue
        # Keep at most one value per field and pad short rows with "".
        values = columns[:len(field_names)]
        values += [""] * (len(field_names) - len(values))
        records.append(dict(zip(field_names, values)))
    return records
# Process-wide MonitorService, created on first request.
_monitor_service: Optional[MonitorService] = None


def get_monitor_service() -> MonitorService:
    """Return the shared MonitorService, building it on the first call."""
    global _monitor_service
    if _monitor_service is not None:
        return _monitor_service
    _monitor_service = MonitorService()
    return _monitor_service