"""Approval queue: actions are parked here until a user approves or rejects them."""

import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Callable, Optional


class ApprovalStatus(Enum):
    """Lifecycle state of an approval request."""

    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPIRED = "expired"


class ApprovalType(Enum):
    """Kind of action an approval request is asking permission for."""

    SSH_COMMAND = "ssh_command"
    CREATE_MONITOR = "create_monitor"


@dataclass
class ApprovalRequest:
    """A request waiting for user approval."""

    id: str
    type: ApprovalType
    description: str
    details: dict
    status: ApprovalStatus = ApprovalStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)
    resolved_at: Optional[datetime] = None
    reason: str = ""  # Why Claude wants to do this

    def to_dict(self) -> dict:
        """Convert to a dictionary suitable for JSON serialization.

        Enum members are emitted as their string values and datetimes as
        ISO-8601 strings; ``resolved_at`` is ``None`` while unresolved.
        """
        return {
            "id": self.id,
            "type": self.type.value,
            "description": self.description,
            "details": self.details,
            "status": self.status.value,
            "created_at": self.created_at.isoformat(),
            "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None,
            "reason": self.reason,
        }


class ApprovalQueue:
    """Queue for managing approval requests in dev mode.

    The request table is guarded by a lock. Callbacks are always invoked
    *after* the lock has been released, so a callback may safely call back
    into the queue (e.g. ``get_pending``) without deadlocking.
    """

    def __init__(self):
        self._requests: dict[str, ApprovalRequest] = {}
        self._lock = threading.Lock()
        self._on_request_added: Optional[Callable[[ApprovalRequest], None]] = None
        self._on_request_resolved: Optional[Callable[[ApprovalRequest], None]] = None

    def set_callbacks(
        self,
        on_added: Optional[Callable[[ApprovalRequest], None]] = None,
        on_resolved: Optional[Callable[[ApprovalRequest], None]] = None,
    ) -> None:
        """Set callbacks for queue events.

        Both callbacks are replaced on every call; passing ``None`` (the
        default) clears the corresponding callback.
        """
        self._on_request_added = on_added
        self._on_request_resolved = on_resolved

    def _enqueue(self, request: ApprovalRequest) -> ApprovalRequest:
        """Store *request* and fire the on-added callback, if any."""
        with self._lock:
            self._requests[request.id] = request

        # The lock is not reentrant: notify only after releasing it.
        callback = self._on_request_added
        if callback is not None:
            callback(request)
        return request

    def _resolve(
        self, request_id: str, status: ApprovalStatus
    ) -> Optional[ApprovalRequest]:
        """Move a pending request to *status* and fire the on-resolved callback."""
        with self._lock:
            request = self._requests.get(request_id)
            if request is None or request.status != ApprovalStatus.PENDING:
                return None
            request.status = status
            request.resolved_at = datetime.now()

        # The lock is not reentrant: notify only after releasing it.
        callback = self._on_request_resolved
        if callback is not None:
            callback(request)
        return request

    def add_ssh_command(self, command: str, reason: str, hostname: str) -> ApprovalRequest:
        """Add an SSH command approval request.

        Returns the newly created pending request.
        """
        return self._enqueue(
            ApprovalRequest(
                id=str(uuid.uuid4()),
                type=ApprovalType.SSH_COMMAND,
                description=f"Execute SSH command on {hostname}",
                details={
                    "command": command,
                    "hostname": hostname,
                },
                reason=reason,
            )
        )

    def add_monitor_creation(
        self,
        monitor_name: str,
        monitor_type: str,
        target: str,
        reason: str,
    ) -> ApprovalRequest:
        """Add a monitor creation approval request.

        Returns the newly created pending request.
        """
        return self._enqueue(
            ApprovalRequest(
                id=str(uuid.uuid4()),
                type=ApprovalType.CREATE_MONITOR,
                description=f"Create {monitor_type} monitor: {monitor_name}",
                details={
                    "name": monitor_name,
                    "type": monitor_type,
                    "target": target,
                },
                reason=reason,
            )
        )

    def approve(self, request_id: str) -> Optional[ApprovalRequest]:
        """Approve a request.

        Returns the request, or ``None`` if it is unknown or no longer pending.
        """
        return self._resolve(request_id, ApprovalStatus.APPROVED)

    def reject(self, request_id: str) -> Optional[ApprovalRequest]:
        """Reject a request.

        Returns the request, or ``None`` if it is unknown or no longer pending.
        """
        return self._resolve(request_id, ApprovalStatus.REJECTED)

    def get_pending(self) -> list[ApprovalRequest]:
        """Get all pending requests."""
        with self._lock:
            return [
                r for r in self._requests.values()
                if r.status == ApprovalStatus.PENDING
            ]

    def get_request(self, request_id: str) -> Optional[ApprovalRequest]:
        """Get a specific request, or ``None`` if the id is unknown."""
        with self._lock:
            return self._requests.get(request_id)

    def clear_resolved(self) -> int:
        """Clear all resolved (non-pending) requests.

        Returns count of cleared requests.
        """
        with self._lock:
            to_remove = [
                rid
                for rid, req in self._requests.items()
                if req.status != ApprovalStatus.PENDING
            ]
            for rid in to_remove:
                del self._requests[rid]
            return len(to_remove)

    def wait_for_approval(
        self,
        request_id: str,
        timeout: float = 300.0,
        check_interval: float = 0.5,
    ) -> Optional[ApprovalRequest]:
        """Wait for a request to be approved or rejected.

        Args:
            request_id: The request ID to wait for
            timeout: Maximum time to wait in seconds
            check_interval: How often to check status, in seconds

        Returns:
            The request once it is no longer pending. On timeout the request
            is marked ``EXPIRED`` and returned. ``None`` is returned only if
            the request is not (or no longer) in the queue.
        """
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        deadline = time.monotonic() + timeout
        while True:
            request = self.get_request(request_id)
            if request is None:
                return None
            if request.status != ApprovalStatus.PENDING:
                return request

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline; clamp at 0 so a negative
            # check_interval cannot make time.sleep raise.
            time.sleep(max(0.0, min(check_interval, remaining)))

        # Timeout - mark as expired, unless it was resolved in the meantime.
        with self._lock:
            request = self._requests.get(request_id)
            if request is not None and request.status == ApprovalStatus.PENDING:
                request.status = ApprovalStatus.EXPIRED
                request.resolved_at = datetime.now()
        return request


# Global approval queue instance
_approval_queue: Optional[ApprovalQueue] = None
# Guards lazy creation so concurrent first callers share one instance.
_approval_queue_lock = threading.Lock()


def get_approval_queue() -> ApprovalQueue:
    """Get the global approval queue instance, creating it on first use."""
    global _approval_queue
    if _approval_queue is None:
        with _approval_queue_lock:
            # Re-check under the lock: another thread may have created it.
            if _approval_queue is None:
                _approval_queue = ApprovalQueue()
    return _approval_queue